diff --git a/src/labthings_fastapi/client/__init__.py b/src/labthings_fastapi/client/__init__.py index 566852c5..59711022 100644 --- a/src/labthings_fastapi/client/__init__.py +++ b/src/labthings_fastapi/client/__init__.py @@ -7,7 +7,7 @@ from __future__ import annotations import time -from typing import Any, Optional, Union +from typing import Any, Generic, Optional, TypeVar from typing_extensions import Self # 3.9, 3.10 compatibility from collections.abc import Mapping import httpx @@ -321,83 +321,64 @@ class PropertyClientDescriptor: path: str -def property_descriptor( - property_name: str, - model: Union[type, BaseModel], - description: Optional[str] = None, - readable: bool = True, - writeable: bool = True, - property_path: Optional[str] = None, -) -> PropertyClientDescriptor: - """Create a correctly-typed descriptor that gets and/or sets a property. - - The returned `.PropertyClientDescriptor` will have ``__get__`` and - (optionally) ``__set__`` methods that are typed according to the - supplied ``model``. The descriptor should be added to a `~lt.ThingClient` - subclass and used to access the relevant property via - `.ThingClient.get_property` and `.ThingClient.set_property`. - - :param property_name: should be the name of the property (i.e. the - name it takes in the thing description, and also the name it is - assigned to in the class). - :param model: the Python ``type`` or a ``pydantic.BaseModel`` that - represents the datatype of the property. - :param description: text to use for a docstring. - :param readable: whether the property may be read (i.e. has ``__get__``). - :param writeable: whether the property may be written to. - :param property_path: the URL of the ``getproperty`` and ``setproperty`` - HTTP endpoints. Currently these must both be the same. These are - relative to the ``base_url``, i.e. the URL of the Thing Description. - - :return: a descriptor allowing access to the specified property. - """ +Value = TypeVar("Value") - class P(PropertyClientDescriptor): - name = property_name - type = model - path = property_path or property_name - - if readable: - - def __get__( - self: PropertyClientDescriptor, - obj: Optional[ThingClient] = None, - _objtype: Optional[type[ThingClient]] = None, - ) -> Any: - if obj is None: - return self - return obj.get_property(self.name) - else: - - def __get__( - self: PropertyClientDescriptor, - obj: Optional[ThingClient] = None, - _objtype: Optional[type[ThingClient]] = None, - ) -> Any: - raise ClientPropertyError("This property may not be read.") - __get__.__annotations__["return"] = model - P.__get__ = __get__ # type: ignore[attr-defined] +class ClientProperty(Generic[Value]): + """A descriptor to make properties of ThingClient objects work.""" - # Set __set__ method based on whether writable - if writeable: + def __init__( + self, + name: str, + readable: bool = True, + writeable: bool = True, + doc: str | None = None, + ) -> None: + """Initialise a ClientProperty. - def __set__( - self: PropertyClientDescriptor, obj: ThingClient, value: Any - ) -> None: - obj.set_property(self.name, value) - else: + :param name: The name of the property. + :param writeable: whether the property should be writeable. + """ + self._name = name + self._readable = readable + self._writeable = writeable + if doc: + self.__doc__ = doc + + def __get__( + self, obj: ThingClient | None, cls: type[ThingClient] | None = None + ) -> Value | Self: + """Retrieve the property. + + :param obj: The client object on which the property is accessed. + """ + if obj is None: + return self + if self._readable: + return obj.get_property(self._name) + else: + raise ClientPropertyError("This property may not be read.") - def __set__( - self: PropertyClientDescriptor, obj: ThingClient, value: Any - ) -> None: + def __set__(self, obj: ThingClient, value: Value) -> None | Self: + """Retrieve the property. + + :param obj: The client object on which the property is accessed. + """ + if self._writeable: + return obj.set_property(self._name, value) + else: raise ClientPropertyError("This property may not be set.") - __set__.__annotations__["value"] = model - P.__set__ = __set__ # type: ignore[attr-defined] - if description: - P.__doc__ = description - return P() + +def client_property( + name: str, doc: str | None, writeable: bool = True, readable: bool = True +) -> Any: + return ClientProperty( + name=name, + doc=doc, + writeable=writeable, + readable=readable, + ) def add_action(cls: type[ThingClient], action_name: str, action: dict) -> None: @@ -440,13 +421,13 @@ def add_property(cls: type[ThingClient], property_name: str, property: dict) -> :param property: a dictionary representing the property, in :ref:`wot_td` format. """ + annotation = property.get("type", Any) setattr( cls, property_name, - property_descriptor( - property_name, - property.get("type", Any), - description=property.get("description", None), + ClientProperty[annotation]( + name=property_name, + doc=property.get("description", None), writeable=not property.get("readOnly", False), readable=not property.get("writeOnly", False), ), diff --git a/src/labthings_fastapi/code_generation/__init__.py b/src/labthings_fastapi/code_generation/__init__.py new file mode 100644 index 00000000..b8bd7f71 --- /dev/null +++ b/src/labthings_fastapi/code_generation/__init__.py @@ -0,0 +1,364 @@ +"""Generate client subclasses based on a Thing Description. + +This module generates a subclass of ThingClient specific to one Thing Description. +Said subclass will have a method corresponding to each Action, and a property +corresponding to each Property. + +The subclass is generated by first constructing an Abstract Syntax Tree +using `ast` and then evaluating it. The same code may also be used to write +a module to a file, allowing static analysis. +""" + +import ast +from inspect import cleandoc +import re +from types import NoneType +from typing import Optional, Sequence + +from labthings_fastapi.thing_description._model import ( + DataSchema, + ThingDescription, + Type, +) + + +def title_to_snake_case(title: str) -> str: + """Convert text to snake_case.""" + # First, look for CamelCase so it doesn't get ignored: + uncameled = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", title) + words = re.findall(r"[a-zA-Z0-9]+", uncameled) + return "_".join(w.lower() for w in words) + + +def snake_to_camel_case(snake: str) -> str: + """Convert snake_case to CamelCase.""" + words = snake.split("_") + return "".join(word.capitalize() for word in words) + + +def title_to_camel_case(title: str) -> str: + """Convert text to CamelCase.""" + return snake_to_camel_case(title_to_snake_case(title)) + + +def clean_code(code: str, prefix: str = "") -> str: + """Clean up code by removing leading/trailing whitespace and empty lines.""" + lines = cleandoc(code).split("\n") + return "\n".join([prefix + line for line in lines]) + + +def quoted_docstring(docstring: Optional[str], indent: int = 4) -> str: + """Wrap a docstring in triple quotes.""" + if docstring is None: + return "" + prefix = " " * indent + lines = docstring.split("\n") + lines[0] = f'"""{lines[0]}' + lines.append('"""') + return "".join([f"{prefix}{line}\n" for line in lines]) + + +def _name(name: str) -> ast.Name: + """Generate a Name object (shorthand for `ast.Name`).""" + return ast.Name(id=name, ctx=ast.Load()) + + +def _subscript(value: ast.expr, subscript: ast.expr) -> ast.Subscript: + """Generate a Subscript object (shorthand for `ast.Subscript`).""" + return ast.Subscript(value, subscript, ctx=ast.Load()) + + +def _tuple(*elements: ast.expr) -> ast.Tuple: + """Generate a Tuple (shorthand for `ast.Tuple`).""" + return ast.Tuple(elts=list(elements), ctx=ast.Load()) + + +def _constant(value: str | bool | int | float | NoneType) -> ast.Constant: + """Check a value may be rendered as a constant, and return it.""" + if not isinstance(value, (str, bool, int, float, NoneType)): + raise TypeError("Don't know how to return this as a constant!") + return ast.Constant(value=value) + + +def _import_from(module: str, names: list[str]) -> ast.ImportFrom: + """Import names from a module.""" + return ast.ImportFrom( + module=module, + names=[ast.alias(name=name) for name in names], + level=0, + ) + + +def dataschema_to_type( + schema: DataSchema, models: list[ast.ClassDef], name: str = "anonymous" +) -> ast.expr: + """Convert a DataSchema to a Python type.""" + if isinstance(schema.oneOf, Sequence) and len(schema.oneOf) > 0: + types = [dataschema_to_type(s, models) for s in schema.oneOf] + return _subscript(_name("Union"), _tuple(*types)) + if schema.type == Type.string: + return _name("str") + elif schema.type == Type.integer: + return _name("int") + elif schema.type == Type.number: + return _name("float") + elif schema.type == Type.boolean: + return _name("bool") + elif schema.type == Type.array: + if schema.items is None: + # A dataschema with no `items` member gets typed as a plain list + return _name("list") + if isinstance(schema.items, Sequence): + # The WoT spec is based on JSONSchema 2019 + # If `items` is a sequence, it behaves as `prefixItems` in the 2023 + # draft, i.e. it describes a tuple. + # https://json-schema.org/understanding-json-schema/reference/array#tupleValidation + types = [dataschema_to_type(s, models) for s in schema.items] + return _subscript(_name("tuple"), _tuple(*types)) + return _subscript(_name("list"), dataschema_to_type(schema.items, models)) + elif schema.type == Type.object: + # If the object has no properties, return a generic dict + if not schema.properties: + return _subscript(_name("dict"), _name("Any")) + # Objects with properties are converted to Pydantic models + if schema.title: + model_name = title_to_camel_case(schema.title + "_model") + else: + model_name = snake_to_camel_case(name + "_model") + model_names = [m.name for m in models] + if model_name in model_names: + i = 0 + while f"{model_name}_{i}" in models: + i += 1 + model_name = f"{model_name}_{i}" + models.append(dataschema_to_model(schema, models, model_name)) + return _name(model_name) + else: + return _name("Any") + + +def dataschema_to_model( + schema: DataSchema, models: list[ast.ClassDef], name: str +) -> ast.ClassDef: + """Convert a DataSchema to a Pydantic model.""" + if schema.properties is None: + msg = f"Can't generate a model from schema '{schema}' as it has no properties." + raise TypeError(msg) + + # The class body will consist of one line setting `_model_config` and + # then one annotated assignment for each property. + class_body: list[ast.stmt] = [] + class_body.append( + ast.Assign( + targets=[ast.Name(id="_model_config", ctx=ast.Store())], + value=ast.Dict(keys=[_constant("extra")], values=[_constant("allow")]), + ) + ) + for pname, prop in schema.properties.items(): + # The fields of the model will appear as assignments. If there's no default, + # it's still an assignment as far as `ast` is concerned, but there's no + # value (or equals sign). + has_default = "default" in prop.model_fields_set + class_body.append( + ast.AnnAssign( + target=ast.Name(id=pname, ctx=ast.Store()), + annotation=dataschema_to_type(prop, models, name), + value=_constant(prop.default) if has_default else None, + simple=1, + ) + ) + return ast.ClassDef( + name=name, + bases=[_name("BaseModel")], + keywords=[], + body=class_body, + decorator_list=[], + ) + + +def property_to_argument( + name: str, + property: DataSchema, + models: list[ast.ClassDef], +) -> tuple[ast.arg, ast.expr | None]: + """Convert a property to a function argument and default.""" + dtype = dataschema_to_type(property, models, name) + arg = ast.arg(arg=name, annotation=dtype) + if "default" in property.model_fields_set: + if property.default is None: + default = ast.Constant(None) + elif isinstance(property.default, (str, bool, int, float)): + default = ast.Constant(property.default) + else: + raise NotImplementedError(f"Unsupported default value: {property.default}") + else: + default = None + return arg, default + + +NO_ARGUMENTS = ast.arguments( + posonlyargs=[], + args=[], + vararg=None, + kwonlyargs=[], + kwarg=None, + defaults=[], + kw_defaults=[], +) + + +def input_model_to_arguments( + model: DataSchema, models: list[ast.ClassDef] +) -> ast.arguments: + """Convert an input model to an arguments object. + + :param model: A DataSchema describing the input (of an action). + :param models: A dictionary of Pydantic model definitions we'll need. + :return: an arguments object describing the properties of `model` as + keyword-only arguments. + """ + if model.type is None: + return NO_ARGUMENTS + if model.type != Type.object: + print(f"model.type: {model.type}") + raise TypeError("Only object models are supported") + if not model.properties: + return NO_ARGUMENTS + kwargs = [] + kwdefaults = [] + # Make sure required (i.e. no default) arguments come first. + arg_names = list(model.properties.keys()) + required_names = model.required or [] + for name in required_names: + arg_names.remove(name) + arg_names = required_names + arg_names + for name, property in model.properties.items(): + property = model.properties[name] + arg, default = property_to_argument(name, property, models) + kwargs.append(arg) + # It's possible for required args to have defaults in the schema, + # but we remove these from the method signature. + kwdefaults.append(None if name in required_names else default) + return ast.arguments( + kwonlyargs=kwargs, + kw_defaults=kwdefaults, + posonlyargs=[], + args=[], + vararg=None, + kwarg=None, + defaults=[], + ) + + +def generate_client(thing_description: ThingDescription) -> ast.Module: + """Generate a client from a Thing Description. + + :param thing_description: the Thing Description to use. + """ + # We'll populate this dictionary with auto-generated Pydantic models, + # needed for `DataSchema`s that are typed as `object` + models: list[ast.ClassDef] = [] + + class_name = title_to_camel_case(thing_description.title) + + # We create the body of the class as a list of statments. This will be + # included in the class definition later. + class_body: list[ast.stmt] = [] + + if thing_description.description: + doc = thing_description.description + else: + doc = f"A client for the {thing_description.title} Thing." + # The class docstring appears as a constant at the top + class_body.append(ast.Expr(value=_constant(doc))) + + # Each property will be an assignment of the form + # `propname: ValueType = client_property(...)` + properties = thing_description.properties or {} + for name, property in properties.items(): + pname = title_to_snake_case(name) + dtype = dataschema_to_type(property, models=models) + class_body.append( + ast.AnnAssign( + target=ast.Name(id=name, ctx=ast.Store()), + annotation=dtype, + value=ast.Call( + func=_name("client_property"), + args=[], + keywords=[ + ast.keyword(arg="name", value=_constant(pname)), + ast.keyword(arg="doc", value=_constant(property.description)), + ast.keyword( + arg="readable", value=_constant(not property.readOnly) + ), + ast.keyword( + arg="writeable", value=_constant(not property.writeOnly) + ), + ], + ), + simple=1, + ) + ) + + # Each action will be a method definition, with appropriately typed arguments and + # return values. These are then passed straight through to `self.invoke_action` + actions = thing_description.actions or {} + for name, action in actions.items(): + aname = title_to_snake_case(name) + if action.input: + args = input_model_to_arguments(action.input, models) + else: + args = NO_ARGUMENTS + if action.output: + rtype = dataschema_to_type(action.output, models) + else: + rtype = _name("Any") + # The function body simply passes the arguments through to `invoke_action`. + function_body: list[ast.stmt] = [ + ast.Return( + value=ast.Call( + func=ast.Attribute( + value=_name("self"), attr="invoke_action", ctx=ast.Load() + ), + args=[_constant(name)], + keywords=[ + ast.keyword(arg=arg.arg, value=_name(arg.arg)) + for arg in args.kwonlyargs + ], + ) + ) + ] + class_body.append( + ast.FunctionDef( + name=aname, + args=args, + body=function_body, + decorator_list=[], + returns=rtype, + ) + ) + + # The class definition is here: this includes `class_body` defined above, with the + # actions/properties. + class_definition = ast.ClassDef( + name=f"{class_name}Client", + bases=[_name("ThingClient")], + keywords=[], + body=class_body, + decorator_list=[], + ) + + # The module we want to create starts here: + # Import the symbols we'll need + code: list[ast.stmt] = [] + code.append( + _import_from("labthings_fastapi.client", ["ThingClient", "client_property"]) + ) + code.append(_import_from("typing", ["Any", "Union"])) + code.append(_import_from("pydantic", ["BaseModel"])) + + code += models + + code.append(class_definition) + + return ast.Module(body=code, type_ignores=[]) diff --git a/tests/test_client_generation.py b/tests/test_client_generation.py new file mode 100644 index 00000000..3a44bb8d --- /dev/null +++ b/tests/test_client_generation.py @@ -0,0 +1,87 @@ +import ast +import os +import tempfile +import importlib.util + +from pydantic import BaseModel + +from labthings_fastapi.code_generation import generate_client +import labthings_fastapi.code_generation as cg +from labthings_fastapi.example_things import MyThing +from labthings_fastapi.testing import create_thing_without_server +import labthings_fastapi as lt + + +def test_title_to_snake_case(): + assert cg.title_to_snake_case("CamelCase") == "camel_case" + assert cg.title_to_snake_case("Camel") == "camel" + assert cg.title_to_snake_case("camel") == "camel" + assert cg.title_to_snake_case("CAMEL") == "camel" + assert cg.title_to_snake_case("CamelCASE") == "camel_case" + + +def test_snake_to_camel_case(): + assert cg.snake_to_camel_case("snake_case") == "SnakeCase" + assert cg.snake_to_camel_case("snake") == "Snake" + assert cg.snake_to_camel_case("SNAKE") == "Snake" + assert cg.snake_to_camel_case("snakeCASE_word") == "SnakecaseWord" + + +def generate_and_verify(thing): + td = create_thing_without_server(thing).thing_description() + tree = generate_client(td) + ast.fix_missing_locations(tree) + code = ast.unparse(tree) + with tempfile.TemporaryDirectory() as d: + fname = os.path.join(d, "client.py") + with open(fname, "w") as f: + f.write(code) + spec = importlib.util.spec_from_file_location("client", f.name) + assert spec is not None + module = importlib.util.module_from_spec(spec) + assert spec.loader is not None + spec.loader.exec_module(module) + assert f"{thing.__name__}Client" in dir(module) + + +def test_mything_generation(): + generate_and_verify(MyThing) + + +class TestModel(BaseModel): + a: int + b: str + + +class NestedModel(BaseModel): + c: TestModel + + +class ThingWithModels(lt.Thing): + @lt.property + def prop1(self) -> TestModel: + return TestModel(a=1, b="test") + + @lt.action + def action1(self, arg1: TestModel) -> TestModel: + return arg1 + + @lt.property + def prop2(self) -> NestedModel: + return NestedModel(c=TestModel(a=1, b="test")) + + +def test_with_models(): + generate_and_verify(ThingWithModels) + + +if __name__ == "__main__": + td = create_thing_without_server(ThingWithModels).thing_description() + print("Thing Description:") + print(td.model_dump_json(indent=2, exclude_unset=True)) + print("\nGenerated AST:") + ast_module = generate_client(td) + print(ast.dump(ast_module, indent=4)) + print("\nGenerated module:") + ast.fix_missing_locations(ast_module) + print(ast.unparse(ast_module))