Skip to content
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ var/
.installed.cfg
*.egg
**/_version.py

*.DS_Store

# PyInstaller
Expand Down Expand Up @@ -59,6 +58,7 @@ cov.xml
# Sphinx documentation
docs/_build/
docs/_api
plugins/

# PyBuilder
target/
Expand Down
5 changes: 3 additions & 2 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ cleanup:
plugins:
paths:
- ./plugins # local folder for plugins
github_repos: # list of GitHub repos with analysis code

github_repos: # list of Https GitHub repos with analysis code (ending with .git)
- https://github.com/DiamondLightSource/xrpd-toolbox.git
register_all: False # whether to register all analyses found in plugins or only those decorated
rabbitmq:
enabled: True
host: "i15-1-rabbitmq-daq.diamond.ac.uk"
Expand Down
1 change: 1 addition & 0 deletions helm/indigoapi/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ config:
paths:
- ./plugins
github_repos: []
register_all: True # whether to register all analyses found in plugins or only those decorated
rabbitmq:
enabled: true
host: "ixx-rabbitmq-daq.diamond.ac.uk"
Expand Down
19 changes: 13 additions & 6 deletions src/indigoapi/analyses/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,17 @@
from indigoapi.analyses.loader import load_analyses, load_plugins
from indigoapi.config import Config

# load built-in analyses
package = importlib.import_module(__name__)
MODULE_NAMES = load_analyses(package)
MODULE_NAMES = []

# load user plugins from config
config = Config.load_config()
load_plugins(config)

def initialize_analyses(register_all: bool = False):
"""Load built-in analyses and user plugins. Call during server startup."""
global MODULE_NAMES

# load built-in analyses
package = importlib.import_module(__name__)
MODULE_NAMES = load_analyses(package)

# load user plugins from config
config = Config.load_config()
load_plugins(config, register_all=register_all)
24 changes: 11 additions & 13 deletions src/indigoapi/analyses/decorator.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
import asyncio
import inspect
from functools import wraps
from collections.abc import Awaitable, Callable
from typing import ParamSpec, TypeVar

from indigoapi.analyses.loader import get_async_function
from indigoapi.analyses.registry import register_analysis

P = ParamSpec("P")
R = TypeVar("R")

def analysis(name: str | None = None):

def analysis(
name: str | None = None,
) -> Callable[[Callable[P, R]], Callable[P, Awaitable[R]]]:
"""Decorator to register a function as an analysis.
Converts sync functions to async."""

def decorator(func):
if inspect.iscoroutinefunction(func):
async_fn = func
else:

@wraps(func)
async def async_fn(*args, **kwargs):
return await asyncio.to_thread(func, *args, **kwargs)

def decorator(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
async_fn = get_async_function(func)
name_to_register = name or func.__name__

register_analysis(name_to_register, async_fn)
Expand Down
112 changes: 93 additions & 19 deletions src/indigoapi/analyses/loader.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import asyncio
import importlib
import inspect
import logging
import pkgutil
from collections.abc import Awaitable, Callable
from functools import wraps
from pathlib import Path
from typing import ParamSpec, TypeVar

from git import Repo

from indigoapi.analyses.registry import register_analysis
from indigoapi.config import Config

logger = logging.getLogger(__name__)
Expand All @@ -21,40 +27,108 @@ def load_analyses(package):
return module_names


def load_plugins_from_dir(path: str | Path):
"""Load user plugins from a folder"""
P = ParamSpec("P")
R = TypeVar("R")


def get_async_function(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
if inspect.iscoroutinefunction(func):
return func # type: ignore[return-value]

@wraps(func)
async def async_fn(*args: P.args, **kwargs: P.kwargs) -> R:
return await asyncio.to_thread(func, *args, **kwargs)

return async_fn


def register_module_functions(module):

for name, obj in vars(module).items():
if name.startswith("_"):
continue
if not inspect.isfunction(obj):
continue
if obj.__module__ != module.__name__:
continue
try:
register_analysis(name, get_async_function(obj))
except ValueError:
logger.debug(f"Analysis '{name}' already registered")
except Exception as e:
logger.error(f"Unable to register {name} from {module.__name__}: {e}")


def load_plugins_from_dir(path: str | Path, register_all: bool = False):
"""Load user plugins recursively from a folder and all subfolders."""
path = Path(path)
assert isinstance(path, Path)
if not path.exists():
if not path.exists() or not path.is_dir():
return
for pyfile in path.glob("*.py"):
spec = importlib.util.spec_from_file_location(pyfile.stem, pyfile) # type: ignore
module = importlib.util.module_from_spec(spec) # type: ignore
spec.loader.exec_module(module)

for pyfile in path.rglob("*.py"):
if pyfile.stem.startswith("_") or pyfile.stem.startswith("test_"):
continue

module_name = f"plugin.{pyfile.relative_to(path).with_suffix('').as_posix().replace('/', '.')}" # noqa
try:
spec = importlib.util.spec_from_file_location(module_name, pyfile) # type: ignore
module = importlib.util.module_from_spec(spec) # type: ignore
spec.loader.exec_module(module)
# logger.info(f"Loaded plugin: {pyfile}")
if register_all:
register_module_functions(module)

def clone_github_repo(repo_url: str, dest_dir: str):
"""Clone a repo if not already cloned"""
except Exception:
# logger.error(f"Failed to read plugin {pyfile}: {e}")
pass


def clone_github_repo(repo_url: str, dest_dir: str, force: bool = False) -> Path:
"""Clone a repo if not already cloned. Returns path to cloned repo."""
dest_path = Path(dest_dir) / Path(repo_url).stem
if dest_path.exists():
return dest_path

Repo.clone_from(repo_url, dest_path)
if not dest_path.exists() or force:
Repo.clone_from(repo_url, dest_path)

return dest_path


def load_plugins(config: Config):
"""Load all user plugins (local + GitHub)"""
# Local paths
def load_plugins(config: Config, register_all: bool = False):
"""
Load all user plugins from configured paths and GitHub repos.

Built-in analyses (in indigoapi.analyses) are already loaded via decorators.
This function loads external plugins.

Args:
config: Configuration object with plugin paths and GitHub repos
register_all: If False, only load @analysis-decorated functions.
If True, also auto-register any top-level functions.
"""
# Load from local plugin paths
for p in config.plugins.paths:
load_plugins_from_dir(p)
load_plugins_from_dir(p, register_all=register_all)

# GitHub repos
# Load from GitHub repos
if config.plugins.github_repos is not None:
for repo in config.plugins.github_repos:
logger.info(f"Loading from {repo}")

try:
repo_path = clone_github_repo(repo, "./plugins") # cloned into plugins/
load_plugins_from_dir(repo_path)
repo_path = clone_github_repo(
repo, config.plugins.paths[0]
) # cloned into plugins/
source_path = repo_path / "src"
load_plugins_from_dir(source_path, register_all=register_all)

except Exception as e:
logger.error(f"Unable to load {repo}: {e}")


# if __name__ == "__main__":
# from indigoapi.analyses.registry import list_analyses

# load_plugins(Config.load_config())

# print(list_analyses()[0:4])
25 changes: 13 additions & 12 deletions src/indigoapi/analyses/registry.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
import importlib
import logging
from collections.abc import Callable
from typing import Any

ANALYSIS_REGISTRY = {}
logger = logging.getLogger(__name__)


class AnalysisNotFoundError(Exception):
"""Raised when a requested analysis cannot be found or imported."""


ANALYSIS_REGISTRY: dict[str, Callable[..., Any]] = {}


def register_analysis(name: str, fn: Callable) -> None:
if name in ANALYSIS_REGISTRY:
raise ValueError(f"Analysis '{name}' already registered")
ANALYSIS_REGISTRY[name] = fn
logger.info(f"Registered analysis: {name}")


def list_analyses() -> list[str]:

return list(ANALYSIS_REGISTRY.keys())


def get_analysis(name: str) -> Callable:
if name not in ANALYSIS_REGISTRY:
try:
mod = importlib.import_module(f"indigoapi.analyses.{name}")
func = getattr(mod, name)
ANALYSIS_REGISTRY[name] = func
except Exception as e:
print(f"Unknown analysis '{name}': {e}")
print("Available analyses:")
for analysis in list_analyses():
print(analysis)
msg = f"Unknown analysis '{name}': analysis not found"
raise AnalysisNotFoundError(msg)

return ANALYSIS_REGISTRY[name]
15 changes: 13 additions & 2 deletions src/indigoapi/api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,26 @@ async def available_analyses() -> list[dict[str, Any]]:
params.append(
{
"name": p.name,
"default": p.default
"default": repr(p.default)
if p.default != inspect.Parameter.empty
else None,
"annotation": str(p.annotation)
if p.annotation != inspect.Parameter.empty
else "Any",
}
)
analyses_info.append({"name": name, "parameters": params})
return_annotation = (
str(sig.return_annotation)
if sig.return_annotation != inspect.Signature.empty
else "Any"
)
analyses_info.append(
{
"name": name,
"parameters": params,
"return_annotation": return_annotation,
}
)
return analyses_info


Expand Down
40 changes: 35 additions & 5 deletions src/indigoapi/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,36 @@ def __init__(
self.latest_request_id: UUID | None = None
self.session = session or requests.Session()

def list_analyses(self) -> list[dict[str, Any]]:
def list_analyses(
self, as_strings: bool = True
) -> list[dict[str, Any]] | list[str]:
resp = self.session.get(f"{self.base_url}{ANALYSES_ROUTE}")
resp.raise_for_status()
return resp.json()
analyses = resp.json()
if as_strings:
return [self._format_analysis_signature(analysis) for analysis in analyses]
return analyses

def _format_analysis_signature(self, analysis: dict[str, Any]) -> str:
params = []
for param in analysis.get("parameters", []):
param_str = f"{param['name']}: {param['annotation']}"
if param.get("default") is not None:
param_str += f" = {param['default']}"
params.append(param_str)

return_annotation = analysis.get("return_annotation", "Any")
if params:
params_block = ",\n ".join(params)
signature = (
f"{analysis['name']}(\n"
f" {params_block},\n"
f" ) -> {return_annotation}:"
)
else:
signature = f"{analysis['name']}() -> {return_annotation}:"

return signature

def health(self) -> dict[str, Any]:
resp = self.session.get(f"{self.base_url}{HEALTH_ROUTE}")
Expand Down Expand Up @@ -186,16 +212,20 @@ def get_request_id_result(
if __name__ == "__main__":
import numpy as np

from indigoapi.analyses.peak_fitting import gaussian, gaussian_fit
from indigoapi.analyses.peak_fitting import gaussian

x = np.linspace(0, 20, 200)

y = gaussian(x, 10, 5, 1) + (np.random.rand(x.shape[-1]) / 5)

client = AnalysisClient()

client.submit(gaussian_fit.__name__, x=x, y=y)
# client.submit(gaussian_fit.__name__, x=x, y=y)

client.submit("beam_energy_to_wavelength", beam_energy=15)

print(client.get_result())

print(client.get_endpoints())
# print(client.get_endpoints())
# for i in client.list_analyses()[0:4]:
# print(i)
1 change: 1 addition & 0 deletions src/indigoapi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class CleanupConfig(BaseModel):
class PluginsConfig(BaseModel):
paths: list[str] = []
github_repos: list[str] | None = []
register_all: bool = False


class Config(BaseSettings):
Expand Down
Loading
Loading