diff --git a/CHANGELOG.md b/CHANGELOG.md index bf40591d..a9b0cabb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ and start a new "In Progress" section above it. - Support experimental `corsa_compress_v2` and `corsa_decompress_v2` processes ([Open-EO/openeo-geotrellis-extensions#702](https://github.com/Open-EO/openeo-geotrellis-extensions/issues/702)) - Dry run: pass through `align` argument of `resample_spatial` operation ([Open-EO/openeo-geopyspark-driver#1662](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1662)) - `BoundingBox.round_to_resolution()`: add `offset_x` and `offset_y` parameters ([Open-EO/openeo-geopyspark-driver#1662](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1662)) +- Support logging added value for synchronous requests ([Open-EO/openeo-geopyspark-driver#1436](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1436)) ## 0.138.0 diff --git a/openeo_driver/_version.py b/openeo_driver/_version.py index b8778285..566f308b 100644 --- a/openeo_driver/_version.py +++ b/openeo_driver/_version.py @@ -1 +1 @@ -__version__ = "0.139.0a8" +__version__ = "0.139.0a9" diff --git a/openeo_driver/backend.py b/openeo_driver/backend.py index f6c805eb..980da246 100644 --- a/openeo_driver/backend.py +++ b/openeo_driver/backend.py @@ -19,6 +19,7 @@ from typing import List, Union, NamedTuple, Dict, Optional, Callable, Iterable, Container, Any, Tuple import flask +from openeo_driver.processgraph.definitions import ProcessGraphFlatDict import openeo_driver.util.view_helpers from openeo.utils.version import ComparableVersion @@ -786,7 +787,7 @@ class Processing(MicroService): def get_process_registry(self, api_version: Union[str, ComparableVersion]) -> ProcessRegistry: raise NotImplementedError - def evaluate(self, process_graph: dict, env: EvalEnv = None): + def evaluate(self, process_graph: dict, env: EvalEnv = None, do_dry_run: Union[bool, DryRunDataTracer] = True): """Evaluate given process graph (flat dict format).""" raise NotImplementedError @@ -1063,6 +1064,8 @@ def request_costs( job_options: Union[dict, None] = None, request_id: str, success: bool, + process_graph: Union[ProcessGraphFlatDict, None] = None, + tracer: Union[DryRunDataTracer, None] = None, ) -> Optional[float]: """ Report resource usage of (current) synchronous processing request and get associated cost. diff --git a/openeo_driver/processgraph/registry.py b/openeo_driver/processgraph/registry.py index 24819bea..556a76ba 100644 --- a/openeo_driver/processgraph/registry.py +++ b/openeo_driver/processgraph/registry.py @@ -13,6 +13,7 @@ from openeo.utils.version import ComparableVersion from openeo_driver.backend import OpenEoBackendImplementation, Processing +from openeo_driver.dry_run import DryRunDataTracer from openeo_driver.errors import OpenEOApiException from openeo_driver.processes import DEFAULT_NAMESPACE, ProcessArgs, ProcessRegistry, ProcessSpec from openeo_driver.specs import SPECS_ROOT @@ -238,7 +239,7 @@ def get_basic_env(self, api_version: str = OPENEO_API_VERSION_DEFAULT) -> EvalEn } ) - def evaluate(self, process_graph: dict, env: EvalEnv = None): + def evaluate(self, process_graph: dict, env: EvalEnv = None, do_dry_run: Union[bool, DryRunDataTracer] = True): from openeo_driver.processgraph.evaluator import evaluate return evaluate(process_graph=process_graph, env=env or self.get_basic_env(), do_dry_run=False) @@ -257,9 +258,10 @@ def get_process_registry(self, api_version: Union[str, ComparableVersion]) -> Pr else: raise OpenEOApiException(message=f"No process support for openEO version {api_version}") - def evaluate(self, process_graph: dict, env: EvalEnv = None): + def evaluate(self, process_graph: dict, env: EvalEnv = None, do_dry_run: Union[bool, DryRunDataTracer] = True): from openeo_driver.processgraph.evaluator import evaluate - return evaluate(process_graph=process_graph, env=env) + + return evaluate(process_graph=process_graph, env=env, do_dry_run=do_dry_run) def validate(self, process_graph: dict, env: EvalEnv = None): from openeo_driver.processgraph.evaluator import evaluate, _collect_end_nodes, convert_node @@ -304,5 +306,3 @@ def validate(self, process_graph: dict, env: EvalEnv = None): def extra_validation(self, process_graph, env, result, source_constraints): return [] - - diff --git a/openeo_driver/views.py b/openeo_driver/views.py index 846393b2..48f7e7ae 100644 --- a/openeo_driver/views.py +++ b/openeo_driver/views.py @@ -55,6 +55,7 @@ LINK_REL, ) from openeo_driver.datacube import DriverMlModel +from openeo_driver.dry_run import DryRunDataTracer from openeo_driver.errors import ( FeatureUnsupportedException, FilePathInvalidException, @@ -725,8 +726,22 @@ def result(user: User): } ) + tracer = DryRunDataTracer() + + request_costs = functools.partial( + backend_implementation.request_costs, + user=user, + job_options=job_options, + request_id=request_id, + process_graph=process_graph, + tracer=tracer, + ) + try: - result = backend_implementation.processing.evaluate(process_graph=process_graph, env=env) + result = backend_implementation.processing.evaluate( + process_graph=copy.deepcopy(process_graph), env=env, do_dry_run=tracer + ) + _log.info(f"`POST /result`: {type(result)}") if result is None: @@ -742,18 +757,14 @@ def result(user: User): result = to_save_result(data=result) response = result.create_flask_response() - costs = backend_implementation.request_costs( - success=True, user=user, request_id=request_id, job_options=job_options - ) + costs = request_costs(success=True) if costs: # TODO not all costs are accounted for so don't expose in "OpenEO-Costs" yet response.headers["OpenEO-Costs-experimental"] = costs except Exception: # TODO: also send "OpenEO-Costs" header on failure - backend_implementation.request_costs( - success=False, user=user, request_id=request_id, job_options=job_options - ) + request_costs(success=False) raise # Add request id as "OpenEO-Identifier" like we do for batch jobs. diff --git a/tests/test_views_execute.py b/tests/test_views_execute.py index faa522a0..8dd1e1a1 100644 --- a/tests/test_views_execute.py +++ b/tests/test_views_execute.py @@ -22,7 +22,7 @@ from openeo_driver.datacube import DriverDataCube, DriverVectorCube from openeo_driver.datastructs import ResolutionMergeArgs, SarBackscatterArgs -from openeo_driver.dry_run import ProcessType +from openeo_driver.dry_run import ProcessType, DryRunDataTracer from openeo_driver.dummy import dummy_backend from openeo_driver.dummy.dummy_backend import DummyVisitor from openeo_driver.errors import ( @@ -4308,13 +4308,13 @@ def test_vector_buffer_returns_error_on_empty_result_geometry(api): (None, None, None), # request_costs override ( - lambda user, request_id, success, job_options: 1234 + isinstance(user, User), + lambda user, job_options, request_id, success, process_graph, tracer: 1234 + isinstance(user, User), None, "1235", ), # Extra job options handling ( - lambda user, request_id, success, job_options: 1234 * job_options.get("extra", 0), + lambda user, job_options, request_id, success, process_graph, tracer: 1234 * job_options.get("extra", 0), {"extra": 2}, "2468", ), @@ -4363,6 +4363,8 @@ def test_synchronous_processing_request_costs( job_options=job_options, success=success, request_id="r-abc123", + process_graph=pg, + tracer=dirty_equals.IsInstance(DryRunDataTracer), )