diff --git a/docs/content/en/open_source/upgrading/2.60.md b/docs/content/en/open_source/upgrading/2.60.md index e7811aa0b99..529dd8ce54c 100644 --- a/docs/content/en/open_source/upgrading/2.60.md +++ b/docs/content/en/open_source/upgrading/2.60.md @@ -2,6 +2,45 @@ title: 'Upgrading to DefectDojo Version 2.60.x' toc_hide: true weight: -20260601 -description: No special instructions. +description: New deduplication execution mode for import/reimport. --- -There are no special instructions for upgrading to 2.60.x. Check the [Release Notes](https://github.com/DefectDojo/django-DefectDojo/releases/tag/2.60.0) for the contents of the release. + +## Deduplication execution mode for import/reimport + +This release adds a new `deduplication_execution_mode` setting that controls how +import/reimport deduplication post-processing is dispatched and whether the API +response waits for it. It can be set per user (profile) and overridden per request +on the import and reimport endpoints. + +Modes: + +- `async` (default): deduplication and the rest of post-processing are dispatched + to the background and the response returns immediately. This is the historical + behavior; nothing changes for existing users. +- `async_wait`: post-processing is still dispatched to the background, but the + request waits for deduplication to finish before responding. As a result the + `scan_added` notification and the statistics in the import/reimport response + reflect the deduplicated state (findings that turned out to be duplicates are + no longer counted/listed as new). JIRA push, product grading and other + non-deduplication tasks remain asynchronous and are not awaited. +- `sync`: import deduplication runs inline in the web request. + +The wait in `async_wait` is bounded by the new `DD_DEDUPLICATION_ASYNC_WAIT_TIMEOUT` +environment variable (default `60` seconds). 
If deduplication has not finished within +the timeout (for example because no worker picked up the work), the request responds anyway (degrading to the `async` outcome) rather +than hanging. + +The import/reimport response now also includes a `deduplication_complete` boolean +indicating whether deduplication had finished by the time the response was produced. + +### Relationship to `block_execution` + +The existing `block_execution` profile flag is unchanged. It remains the global +switch that forces **all** of a user's asynchronous tasks (notifications, JIRA +push, product grading, deduplication, ...) to run in the foreground. +`deduplication_execution_mode` is independent and narrower — it only affects +import/reimport deduplication post-processing. A user who has `block_execution` +enabled continues to get fully synchronous imports; the upgrade migration seeds +their `deduplication_execution_mode` to `sync` so behavior is unchanged. + +No action is required to upgrade. Check the [Release Notes](https://github.com/DefectDojo/django-DefectDojo/releases/tag/2.60.0) for the contents of the release. diff --git a/dojo/api_v2/serializers.py b/dojo/api_v2/serializers.py index dc15ac6c2dc..42b2912f760 100644 --- a/dojo/api_v2/serializers.py +++ b/dojo/api_v2/serializers.py @@ -41,6 +41,7 @@ from dojo.jira import services as jira_services from dojo.location.models import Location, LocationFindingReference from dojo.models import ( + DEDUPLICATION_EXECUTION_MODE_CHOICES, IMPORT_ACTIONS, SEVERITIES, SEVERITY_CHOICES, @@ -1868,6 +1869,16 @@ class CommonImportScanSerializer(serializers.Serializer): allow_null=True, default=None, queryset=User.objects.all(), ) push_to_jira = serializers.BooleanField(default=False) + deduplication_execution_mode = serializers.ChoiceField( + required=False, + allow_null=True, + choices=DEDUPLICATION_EXECUTION_MODE_CHOICES, + help_text="Override how import post-processing (deduplication, jira push, grading, ...) is executed for " + "this request. 
'async' dispatches post-processing to the background and responds immediately (default). " + "'async_wait' dispatches to the background but waits for deduplication to finish before responding, so " + "notifications and the returned statistics reflect the deduplicated state. 'sync' runs everything inline. " + "If omitted, falls back to the user's profile setting (deduplication_execution_mode).", + ) environment = serializers.CharField(required=False) build_id = serializers.CharField( required=False, help_text="ID of the build that was scanned.", @@ -1913,6 +1924,14 @@ class CommonImportScanSerializer(serializers.Serializer): help_text=_("Also referred to as 'Organization' ID."), ) statistics = ImportStatisticsSerializer(read_only=True, required=False) + deduplication_complete = serializers.BooleanField( + read_only=True, + required=False, + help_text="Whether deduplication had finished by the time this response was produced. " + "True for 'sync' and for 'async_wait' when deduplication completed within the timeout; " + "False for 'async' (deduplication is still running in the background) or when an " + "'async_wait' import timed out waiting for it.", + ) pro = serializers.ListField(read_only=True, required=False) apply_tags_to_findings = serializers.BooleanField( help_text="If set to True, the tags will be applied to the findings", @@ -1971,6 +1990,7 @@ def process_scan( data["product_id"] = test.engagement.product.id data["product_type_id"] = test.engagement.product.prod_type.id data["statistics"] = {"after": test.statistics} + data["deduplication_complete"] = importer.deduplication_complete duration = time.perf_counter() - start_time LargeScanSizeProductAnnouncement(response_data=data, duration=duration) ScanTypeProductAnnouncement(response_data=data, scan_type=context.get("scan_type")) @@ -2069,6 +2089,14 @@ def setup_common_context(self, data: dict) -> dict: if eng_end_date: context["target_end"] = context.get("engagement_end_date") + # Resolve the effective 
import execution mode: request override (if any) + # takes precedence over the user's profile setting, otherwise default async. + request = self.context.get("request") + user = getattr(request, "user", None) + context["deduplication_execution_mode"] = Dojo_User.resolve_deduplication_execution_mode( + user, data.get("deduplication_execution_mode"), + ) + return context @@ -2242,11 +2270,11 @@ def process_scan( try: logger.debug(f"process_scan called with context: {context}") start_time = time.perf_counter() + processor = None if test := context.get("test"): statistics_before = test.statistics - context["test"], _, _, _, _, _, test_import = self.get_reimporter( - **context, - ).process_scan( + processor = self.get_reimporter(**context) + context["test"], _, _, _, _, _, test_import = processor.process_scan( context.pop("scan", None), ) if test_import: @@ -2258,9 +2286,10 @@ def process_scan( # Do not close old findings when creating a brand new test: there are no # existing findings to compare against, and close_old_findings would # incorrectly close findings from other tests in the same scope. 
- context["test"], _, _, _, _, _, _ = self.get_importer( + processor = self.get_importer( **{**context, "close_old_findings": False}, - ).process_scan( + ) + context["test"], _, _, _, _, _, _ = processor.process_scan( context.pop("scan", None), ) else: @@ -2279,6 +2308,8 @@ def process_scan( if statistics_delta: data["statistics"]["delta"] = statistics_delta data["statistics"]["after"] = test.statistics + if processor is not None: + data["deduplication_complete"] = processor.deduplication_complete duration = time.perf_counter() - start_time LargeScanSizeProductAnnouncement(response_data=data, duration=duration) ScanTypeProductAnnouncement(response_data=data, scan_type=context.get("scan_type")) diff --git a/dojo/celery_dispatch.py b/dojo/celery_dispatch.py index 38a3b4fa06c..c4eee2aef08 100644 --- a/dojo/celery_dispatch.py +++ b/dojo/celery_dispatch.py @@ -61,10 +61,11 @@ def dojo_dispatch_task(task_or_sig: _SupportsSi | _SupportsApplyAsync | Signatur - Inject `async_user_id` if missing. - Capture and inject pghistory context if available. - - Respect `force_sync=True` (foreground execution) and user `block_execution`. + - Respect `force_sync=True` (foreground execution) and the user's + block_execution flag. - Respect `force_async=True` (background execution even when the caller - would otherwise run synchronously, e.g. user has `block_execution`). - `force_async` wins over `force_sync` and `block_execution`. + would otherwise run synchronously, e.g. user has block_execution). + `force_async` wins over `force_sync` and block_execution. - Support `countdown=` for async dispatch. 
Returns: diff --git a/dojo/db_migrations/0269_usercontactinfo_deduplication_execution_mode.py b/dojo/db_migrations/0269_usercontactinfo_deduplication_execution_mode.py new file mode 100644 index 00000000000..94f1e29f255 --- /dev/null +++ b/dojo/db_migrations/0269_usercontactinfo_deduplication_execution_mode.py @@ -0,0 +1,16 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('dojo', '0268_release_authorization_to_pro'), + ] + + operations = [ + migrations.AddField( + model_name='usercontactinfo', + name='deduplication_execution_mode', + field=models.CharField(blank=True, choices=[('async', 'Async (do not wait)'), ('async_wait', 'Async, wait for deduplication'), ('sync', 'Synchronous (block)')], help_text="Controls how import/reimport deduplication post-processing is executed. 'Async' dispatches it to the background and returns immediately (default). 'Async, wait for deduplication' dispatches to the background but waits for deduplication to finish before responding, so notifications and statistics reflect the deduplicated state. 'Synchronous' runs the import deduplication inline. Can be overridden per request. Independent of block_execution, which forces all async tasks (notifications, jira, ...) to the foreground.", max_length=20, null=True), + ), + ] diff --git a/dojo/db_migrations/0270_seed_deduplication_execution_mode.py b/dojo/db_migrations/0270_seed_deduplication_execution_mode.py new file mode 100644 index 00000000000..7e56f1674f4 --- /dev/null +++ b/dojo/db_migrations/0270_seed_deduplication_execution_mode.py @@ -0,0 +1,30 @@ +from django.db import migrations + + +def seed_deduplication_execution_mode(apps, schema_editor): + """ + Seed the new import deduplication execution mode from the legacy block_execution flag. 
+ + block_execution remains the global "run all async tasks in the foreground" switch; + users who had it enabled get the synchronous deduplication mode so import behavior is + unchanged for them. + """ + UserContactInfo = apps.get_model("dojo", "UserContactInfo") + UserContactInfo.objects.filter(block_execution=True).update(deduplication_execution_mode="sync") + + +def unseed_deduplication_execution_mode(apps, schema_editor): + """Reverse: clear the seeded synchronous mode.""" + UserContactInfo = apps.get_model("dojo", "UserContactInfo") + UserContactInfo.objects.filter(deduplication_execution_mode="sync").update(deduplication_execution_mode=None) + + +class Migration(migrations.Migration): + + dependencies = [ + ('dojo', '0269_usercontactinfo_deduplication_execution_mode'), + ] + + operations = [ + migrations.RunPython(seed_deduplication_execution_mode, unseed_deduplication_execution_mode), + ] diff --git a/dojo/engagement/views.py b/dojo/engagement/views.py index 0154aa5d336..bba6323b260 100644 --- a/dojo/engagement/views.py +++ b/dojo/engagement/views.py @@ -954,6 +954,10 @@ def process_form( "create_finding_groups_for_all_findings": form.cleaned_data.get("create_finding_groups_for_all_findings", None), "environment": self.get_development_environment(environment_name=form.cleaned_data.get("environment")), }) + # Honor the user's profile deduplication_execution_mode for UI imports. The API resolves + # this in the serializer; the UI has no per-import selector, so fall back to the profile + # (or block_execution) instead of silently defaulting to async. + context["deduplication_execution_mode"] = Dojo_User.resolve_deduplication_execution_mode(request.user) # Create the engagement if necessary self.create_engagement(context) # close_old_findings_product_scope is a modifier of close_old_findings. 
diff --git a/dojo/finding/helper.py b/dojo/finding/helper.py index 7808fdd7cf5..d89c9908c91 100644 --- a/dojo/finding/helper.py +++ b/dojo/finding/helper.py @@ -456,7 +456,13 @@ def post_process_finding_save_internal(finding, dedupe_option=True, rules_option jira_services.push(finding.finding_group) -@app.task +# ignore_result=False so the 'async_wait' import execution mode can join on the +# dispatched batch via AsyncResult.get() even when CELERY_TASK_IGNORE_RESULT=True. +# NOTE: this override may not be strictly necessary — in the past .get()/await +# appears to have worked with the global CELERY_TASK_IGNORE_RESULT=True and a +# Redis broker. Needs verification against a real broker/worker setup; if join +# works without it, this override can be removed to avoid storing extra results. +@app.task(ignore_result=False) def post_process_findings_batch( finding_ids, *args, diff --git a/dojo/forms.py b/dojo/forms.py index e33d7ef51c4..599746763df 100644 --- a/dojo/forms.py +++ b/dojo/forms.py @@ -2393,7 +2393,7 @@ class Meta: # Swap order: password_last_reset before token_last_reset field_order = [ "title", "phone_number", "cell_number", "twitter_username", "github_username", - "slack_username", "ui_use_tailwind", "block_execution", "force_password_reset", "reset_api_token", + "slack_username", "ui_use_tailwind", "block_execution", "deduplication_execution_mode", "force_password_reset", "reset_api_token", "password_last_reset", "token_last_reset", ] diff --git a/dojo/importers/base_importer.py b/dojo/importers/base_importer.py index d87524185fe..281e9f9a125 100644 --- a/dojo/importers/base_importer.py +++ b/dojo/importers/base_importer.py @@ -16,6 +16,8 @@ from dojo.jira.services import is_keep_in_sync from dojo.location.models import Location from dojo.models import ( + DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, + DEDUPLICATION_EXECUTION_MODE_SYNC, # Import History States IMPORT_CLOSED_FINDING, IMPORT_CREATED_FINDING, @@ -77,6 +79,79 @@ def __init__( and will raise a 
`NotImplemented` exception """ ImporterOptions.__init__(self, *args, **kwargs) + # Handles for async post-processing tasks to await in 'async_wait' mode. + # Set after ImporterOptions.__init__ so it stays out of field_names + # (and the compress/decompress cycle used for async dispatch). + self.post_processing_results = [] + # Whether deduplication is known to be finished by the time the response + # is built. True for 'sync' (ran inline) and for 'async_wait' when all + # batches completed within the timeout; False for 'async' (dispatched, + # not awaited) or when an 'async_wait' join timed out/errored. + self.deduplication_complete = False + + def post_processing_dispatch_kwargs(self, **kwargs): + """ + Translate the resolved import execution mode into the force flags that + dojo_dispatch_task understands: + - SYNC: run inline in the web process (force_sync). + - ASYNC_WAIT: guarantee background dispatch (force_async) so we get a + handle to await, regardless of the user's profile mode. + - ASYNC (default): preserve historical behavior, honoring any externally + supplied force_sync and the user's sync mode via we_want_async. + """ + if self.deduplication_execution_mode == DEDUPLICATION_EXECUTION_MODE_SYNC: + return {"force_sync": True} + if self.deduplication_execution_mode == DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT: + return {"force_async": True} + return {"force_sync": kwargs.get("force_sync", False)} + + def record_post_processing_result(self, result): + """ + Remember an async post-processing dispatch handle so it can be awaited + later when running in the 'async_wait' execution mode. No-op for the + other modes (no handle is recorded by the caller). 
+ """ + if not hasattr(self, "post_processing_results"): + self.post_processing_results = [] + if result is not None: + self.post_processing_results.append(result) + + def wait_for_post_processing(self): + """ + Block until the deduplication (and other batch) post-processing tasks + dispatched during this import have finished, so notifications and the + returned statistics reflect the deduplicated state. + + Only relevant in the 'async_wait' execution mode; bounded by + settings.DEDUPLICATION_ASYNC_WAIT_TIMEOUT so a stuck/missing worker degrades + to the historical (respond-anyway) behavior instead of hanging. + """ + if self.deduplication_execution_mode == DEDUPLICATION_EXECUTION_MODE_SYNC: + # Batches ran inline during process_findings, so dedup is already done. + self.deduplication_complete = True + return + if self.deduplication_execution_mode != DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT: + # 'async': post-processing was dispatched but is not awaited. + self.deduplication_complete = False + return + results = getattr(self, "post_processing_results", None) or [] + if not results: + # Nothing was dispatched (e.g. empty import) — dedup is trivially done. 
+ self.deduplication_complete = True + return + timeout = getattr(settings, "DEDUPLICATION_ASYNC_WAIT_TIMEOUT", 60) + logger.debug("async_wait: waiting for %d post-processing task(s) (timeout=%ss)", len(results), timeout) + success = True + for result in results: + if result is None or not hasattr(result, "get"): + continue + try: + result.get(timeout=timeout, propagate=False) + except Exception as e: + logger.warning("async_wait: error/timeout while waiting for post-processing task: %s", e) + success = False + self.deduplication_complete = success + self.post_processing_results = [] def check_child_implementation_exception(self): """ @@ -864,10 +939,49 @@ def notify_scan_added( new_findings = [] logger.debug("Scan added notifications") + # When deduplication has finished (synchronous mode, or async_wait after the + # join), the in-memory findings still carry their pre-dedup duplicate=False + # flag because deduplication runs on separately-fetched instances. Refresh the + # flag from the database and split each list into "real" and duplicate findings + # so the notification reflects post-dedup reality instead of counting/listing + # deduplicated findings as brand new. In plain async mode dedup has not run yet, + # so we leave the lists untouched (best-effort, historical behavior). 
+ findings_new_duplicate: list[Finding] = [] + findings_reactivated_duplicate: list[Finding] = [] + findings_untouched_duplicate: list[Finding] = [] + if getattr(self, "deduplication_complete", False): + all_ids = [f.id for f in (*new_findings, *findings_reactivated, *findings_untouched)] + duplicate_ids = set() + if all_ids: + duplicate_ids = set( + Finding.objects.filter(id__in=all_ids, duplicate=True).values_list("id", flat=True), + ) + + def _split(findings): + kept, duplicates = [], [] + for finding in findings: + if finding.id in duplicate_ids: + # refresh the in-memory flag so any template logic is correct + finding.duplicate = True + duplicates.append(finding) + else: + kept.append(finding) + return kept, duplicates + + new_findings, findings_new_duplicate = _split(new_findings) + findings_reactivated, findings_reactivated_duplicate = _split(findings_reactivated) + findings_untouched, findings_untouched_duplicate = _split(findings_untouched) + # Recompute the headline count to exclude findings that turned out to be + # duplicates of an existing finding (they are not genuinely new activity). 
+ updated_count = len(new_findings) + len(findings_reactivated) + len(findings_mitigated) + new_findings = sorted(new_findings, key=lambda x: x.numerical_severity) findings_mitigated = sorted(findings_mitigated, key=lambda x: x.numerical_severity) findings_reactivated = sorted(findings_reactivated, key=lambda x: x.numerical_severity) findings_untouched = sorted(findings_untouched, key=lambda x: x.numerical_severity) + findings_new_duplicate = sorted(findings_new_duplicate, key=lambda x: x.numerical_severity) + findings_reactivated_duplicate = sorted(findings_reactivated_duplicate, key=lambda x: x.numerical_severity) + findings_untouched_duplicate = sorted(findings_untouched_duplicate, key=lambda x: x.numerical_severity) title = ( f"Created/Updated {updated_count} findings for {test.engagement.product}: {test.engagement.name}: {test}" @@ -884,6 +998,11 @@ def notify_scan_added( engagement=test.engagement, product=test.engagement.product, findings_untouched=findings_untouched, + # Findings deduplicated during post-processing, split by their import action. + # Populated only once deduplication has completed (sync / async_wait). 
+ findings_new_duplicate=findings_new_duplicate, + findings_reactivated_duplicate=findings_reactivated_duplicate, + findings_untouched_duplicate=findings_untouched_duplicate, url=reverse("view_test", args=(test.id,)), url_api=reverse("test-detail", args=(test.id,)), ) diff --git a/dojo/importers/default_importer.py b/dojo/importers/default_importer.py index 3a920577d2d..203d919ce67 100644 --- a/dojo/importers/default_importer.py +++ b/dojo/importers/default_importer.py @@ -12,6 +12,7 @@ from dojo.importers.options import ImporterOptions from dojo.jira import services as jira_services from dojo.models import ( + DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, Engagement, Finding, Test, @@ -134,6 +135,10 @@ def process_scan( new_findings=new_findings, closed_findings=closed_findings, ) + # In 'async_wait' mode, block until background deduplication has finished + # so notifications and statistics reflect the deduplicated state. + self.wait_for_post_processing() + # Send out some notifications to the user logger.debug("IMPORT_SCAN: Generating notifications") dojo_dispatch_task( @@ -290,7 +295,7 @@ def _process_findings_internal( batch_finding_ids.clear() logger.debug("process_findings: dispatching batch with push_to_jira=%s (batch_size=%d, is_final=%s)", push_to_jira, len(finding_ids_batch), is_final_finding) - dojo_dispatch_task( + result = dojo_dispatch_task( finding_helper.post_process_findings_batch, finding_ids_batch, dedupe_option=True, @@ -298,8 +303,10 @@ def _process_findings_internal( product_grading_option=True, issue_updater_option=True, push_to_jira=push_to_jira, - force_sync=kwargs.get("force_sync", False), + **self.post_processing_dispatch_kwargs(**kwargs), ) + if self.deduplication_execution_mode == DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT: + self.record_post_processing_result(result) # No chord: tasks are dispatched immediately above per batch diff --git a/dojo/importers/default_reimporter.py b/dojo/importers/default_reimporter.py index 
aa33c6153b0..83ee9386945 100644 --- a/dojo/importers/default_reimporter.py +++ b/dojo/importers/default_reimporter.py @@ -18,6 +18,7 @@ from dojo.importers.options import ImporterOptions from dojo.jira import services as jira_services from dojo.models import ( + DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, Development_Environment, Finding, Notes, @@ -141,6 +142,9 @@ def process_scan( ) # Send out som notifications to the user logger.debug("REIMPORT_SCAN: Generating notifications") + # In 'async_wait' mode, block until background deduplication has finished + # so notifications and statistics reflect the deduplicated state. + self.wait_for_post_processing() updated_count = ( len(closed_findings) + len(reactivated_findings) + len(new_findings) ) @@ -455,7 +459,7 @@ def _process_findings_internal( batch_findings.clear() finding_ids_batch = list(batch_finding_ids) batch_finding_ids.clear() - dojo_dispatch_task( + result = dojo_dispatch_task( finding_helper.post_process_findings_batch, finding_ids_batch, dedupe_option=True, @@ -464,8 +468,10 @@ def _process_findings_internal( issue_updater_option=True, push_to_jira=push_to_jira, jira_instance_id=getattr(self.jira_instance, "id", None), - force_sync=kwargs.get("force_sync", False), + **self.post_processing_dispatch_kwargs(**kwargs), ) + if self.deduplication_execution_mode == DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT: + self.record_post_processing_result(result) # No chord: tasks are dispatched immediately above per batch diff --git a/dojo/importers/options.py b/dojo/importers/options.py index 02cecff1113..d25f1a68b29 100644 --- a/dojo/importers/options.py +++ b/dojo/importers/options.py @@ -12,6 +12,9 @@ from dojo.jira.services import get_instance as get_jira_instance from dojo.models import ( + DEDUPLICATION_EXECUTION_MODE_ASYNC, + DEDUPLICATION_EXECUTION_MODE_SYNC, + DEDUPLICATION_EXECUTION_MODES, Development_Environment, Dojo_User, Endpoint, @@ -68,6 +71,7 @@ def load_base_options( self.engagement: Engagement | None = 
self.validate_engagement(*args, **kwargs) self.environment: Development_Environment | None = self.validate_environment(*args, **kwargs) self.group_by: str = self.validate_group_by(*args, **kwargs) + self.deduplication_execution_mode: str = self.validate_deduplication_execution_mode(*args, **kwargs) self.import_type: str = self.validate_import_type(*args, **kwargs) self.lead: Dojo_User | None = self.validate_lead(*args, **kwargs) self.minimum_severity: str = self.validate_minimum_severity(*args, **kwargs) @@ -345,6 +349,25 @@ def validate_do_not_reactivate( **kwargs, ) + def validate_deduplication_execution_mode( + self, + *args: list, + **kwargs: dict, + ) -> str: + mode = self.validate( + "deduplication_execution_mode", + expected_types=[str], + required=False, + default=DEDUPLICATION_EXECUTION_MODE_ASYNC, + **kwargs, + ) + if mode not in DEDUPLICATION_EXECUTION_MODES: + mode = DEDUPLICATION_EXECUTION_MODE_ASYNC + # An explicit force_sync from a non-serializer caller still wins. + if kwargs.get("force_sync"): + mode = DEDUPLICATION_EXECUTION_MODE_SYNC + return mode + def validate_commit_hash( self, *args: list, diff --git a/dojo/middleware.py b/dojo/middleware.py index a576244312a..127ab462a09 100644 --- a/dojo/middleware.py +++ b/dojo/middleware.py @@ -281,8 +281,8 @@ def _drain_search_context_to_async(objects, source): for model_name, pk_list in model_groups.items(): batches = [pk_list[i:i + batch_size] for i in range(0, len(pk_list), batch_size)] # force_async=True keeps indexing off the request path even for users - # with block_execution=True — index updates are slow and never need - # to be synchronous from the user's perspective. + # with block_execution=True — index updates are slow and + # never need to be synchronous from the user's perspective. 
for i, batch in enumerate(batches, 1): logger.debug(f"{source}: Triggering batch {i}/{len(batches)} for {model_name}: {len(batch)} instances") dojo_dispatch_task(update_watson_search_index_for_model, model_name, batch, force_async=True) diff --git a/dojo/models.py b/dojo/models.py index a41f5640889..f7881e6d358 100644 --- a/dojo/models.py +++ b/dojo/models.py @@ -199,6 +199,28 @@ def __str__(self): User = get_user_model() +# Import post-processing execution modes. +# - ASYNC: post-processing (dedup, jira, grading, ...) runs in the background; +# the API responds immediately (default, historical behavior). +# - ASYNC_WAIT: post-processing is dispatched to the background as usual, but the +# request waits for the deduplication batches to finish before responding, so +# notifications and the returned statistics reflect the deduplicated state. +# - SYNC: post-processing runs inline in the web process (legacy block_execution). +DEDUPLICATION_EXECUTION_MODE_ASYNC = "async" +DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT = "async_wait" +DEDUPLICATION_EXECUTION_MODE_SYNC = "sync" +DEDUPLICATION_EXECUTION_MODES = ( + DEDUPLICATION_EXECUTION_MODE_ASYNC, + DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, + DEDUPLICATION_EXECUTION_MODE_SYNC, +) +DEDUPLICATION_EXECUTION_MODE_CHOICES = ( + (DEDUPLICATION_EXECUTION_MODE_ASYNC, _("Async (do not wait)")), + (DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, _("Async, wait for deduplication")), + (DEDUPLICATION_EXECUTION_MODE_SYNC, _("Synchronous (block)")), +) + + # proxy class for convenience and UI class Dojo_User(User): class Meta: @@ -214,8 +236,31 @@ def __str__(self): @staticmethod def wants_block_execution(user): # this return False if there is no user, i.e. in celery processes, unittests, etc. + # block_execution is the global "run all async tasks in the foreground" switch and + # governs every dojo_dispatch_task/dojo_async_task call (notifications, jira, grading, + # deduplication, ...). 
It is distinct from deduplication_execution_mode, which only + # controls how import/reimport deduplication post-processing is dispatched/awaited. return hasattr(user, "usercontactinfo") and user.usercontactinfo.block_execution + @staticmethod + def resolve_deduplication_execution_mode(user, override=None): + """ + Resolve the effective import/reimport deduplication execution mode. + + Priority: explicit request override > user profile deduplication_execution_mode > + legacy block_execution (which forces everything sync) > default async. + Returns one of DEDUPLICATION_EXECUTION_MODE_ASYNC / _ASYNC_WAIT / _SYNC. + """ + if override in DEDUPLICATION_EXECUTION_MODES: + return override + info = getattr(user, "usercontactinfo", None) + if info is not None: + if info.deduplication_execution_mode in DEDUPLICATION_EXECUTION_MODES: + return info.deduplication_execution_mode + if info.block_execution: + return DEDUPLICATION_EXECUTION_MODE_SYNC + return DEDUPLICATION_EXECUTION_MODE_ASYNC + @staticmethod def force_password_reset(user): return hasattr(user, "usercontactinfo") and user.usercontactinfo.force_password_reset @@ -256,6 +301,21 @@ class UserContactInfo(models.Model): slack_username = models.CharField(blank=True, null=True, max_length=150, help_text=_("Email address associated with your slack account"), verbose_name=_("Slack Email Address")) slack_user_id = models.CharField(blank=True, null=True, max_length=25) block_execution = models.BooleanField(default=False, help_text=_("Instead of async deduping a finding the findings will be deduped synchronously and will 'block' the user until completion.")) + deduplication_execution_mode = models.CharField( + max_length=20, + choices=DEDUPLICATION_EXECUTION_MODE_CHOICES, + null=True, + blank=True, + help_text=_( + "Controls how import/reimport deduplication post-processing is executed. " + "'Async' dispatches it to the background and returns immediately (default). 
" + "'Async, wait for deduplication' dispatches to the background but waits for " + "deduplication to finish before responding, so notifications and statistics " + "reflect the deduplicated state. 'Synchronous' runs the import deduplication " + "inline. Can be overridden per request. Independent of block_execution, which " + "forces all async tasks (notifications, jira, ...) to the foreground.", + ), + ) force_password_reset = models.BooleanField(default=False, help_text=_("Forces this user to reset their password on next login.")) ui_use_tailwind = models.BooleanField(default=False, verbose_name=_("Use new UI (beta)"), help_text=_("Opt in to the new Tailwind-based UI. Leave off for the classic UI.")) token_last_reset = models.DateTimeField(null=True, blank=True, help_text=_("Timestamp of the most recent API token reset for this user.")) diff --git a/dojo/settings/settings.dist.py b/dojo/settings/settings.dist.py index abe31dec9f9..c554c7d3d76 100644 --- a/dojo/settings/settings.dist.py +++ b/dojo/settings/settings.dist.py @@ -96,6 +96,9 @@ DD_CELERY_BROKER_PARAMS=(str, ""), DD_CELERY_BROKER_TRANSPORT_OPTIONS=(str, ""), DD_CELERY_TASK_IGNORE_RESULT=(bool, True), + # Max seconds the 'async_wait' deduplication execution mode will wait for + # background deduplication/post-processing to finish before responding anyway. 
+ DD_DEDUPLICATION_ASYNC_WAIT_TIMEOUT=(int, 60), DD_CELERY_RESULT_BACKEND=(str, "django-db"), DD_CELERY_RESULT_EXPIRES=(int, 86400), DD_CELERY_BEAT_SCHEDULE_FILENAME=(str, root("dojo.celery.beat.db")), @@ -855,6 +858,7 @@ def generate_url(scheme, double_slashes, user, password, host, port, path, param params=env("DD_CELERY_BROKER_PARAMS"), ) CELERY_TASK_IGNORE_RESULT = env("DD_CELERY_TASK_IGNORE_RESULT") +DEDUPLICATION_ASYNC_WAIT_TIMEOUT = env("DD_DEDUPLICATION_ASYNC_WAIT_TIMEOUT") CELERY_RESULT_BACKEND = env("DD_CELERY_RESULT_BACKEND") CELERY_TIMEZONE = TIME_ZONE CELERY_RESULT_EXPIRES = env("DD_CELERY_RESULT_EXPIRES") diff --git a/dojo/templates/dojo/view_user.html b/dojo/templates/dojo/view_user.html index 4f1d0a3b04d..75c30fd6edf 100644 --- a/dojo/templates/dojo/view_user.html +++ b/dojo/templates/dojo/view_user.html @@ -288,6 +288,12 @@

+ + {% trans "Deduplication execution mode" %} + + {{ user.usercontactinfo.get_deduplication_execution_mode_display|default:_("Async (do not wait)") }} + + {% trans "Date Joined" %} {{ user.date_joined }} diff --git a/dojo/templates_classic/dojo/view_user.html b/dojo/templates_classic/dojo/view_user.html index e1fe9917722..ed9e90ee8cc 100644 --- a/dojo/templates_classic/dojo/view_user.html +++ b/dojo/templates_classic/dojo/view_user.html @@ -288,6 +288,12 @@

+ + {% trans "Deduplication execution mode" %} + + {{ user.usercontactinfo.get_deduplication_execution_mode_display|default:_("Async (do not wait)") }} + + {% trans "Date Joined" %} {{ user.date_joined }} diff --git a/dojo/templates_classic/notifications/mail/scan_added.tpl b/dojo/templates_classic/notifications/mail/scan_added.tpl index 263585246e0..567d80ee47e 100644 --- a/dojo/templates_classic/notifications/mail/scan_added.tpl +++ b/dojo/templates_classic/notifications/mail/scan_added.tpl @@ -26,6 +26,17 @@ {% endfor %}

+ {% if findings_new_duplicate %} +

+

+ {% blocktranslate %}New findings detected as duplicates{% endblocktranslate %} ({{ findings_new_duplicate | length }})
+ {% for finding in findings_new_duplicate %} + {% url 'view_finding' finding.id as finding_url %} + {{ finding.title }} ({{ finding.severity }})
+ {% endfor %} +
+

+ {% endif %}

{% blocktranslate %}Reactivated findings{% endblocktranslate %} ({{ findings_reactivated | length }})
@@ -37,6 +48,17 @@ {% endfor %}

+ {% if findings_reactivated_duplicate %} +

+

+ {% blocktranslate %}Reactivated findings detected as duplicates{% endblocktranslate %} ({{ findings_reactivated_duplicate | length }})
+ {% for finding in findings_reactivated_duplicate %} + {% url 'view_finding' finding.id as finding_url %} + {{ finding.title }} ({{ finding.severity }})
+ {% endfor %} +
+

+ {% endif %}

{% blocktranslate %}Closed findings{% endblocktranslate %} ({{ findings_mitigated | length }})
@@ -59,6 +81,17 @@ {% endfor %}

+ {% if findings_untouched_duplicate %} +

+

+ {% blocktranslate %}Existing findings detected as duplicates{% endblocktranslate %} ({{ findings_untouched_duplicate | length }})
+ {% for finding in findings_untouched_duplicate %} + {% url 'view_finding' finding.id as finding_url %} + {{ finding.title }} ({{ finding.severity }})
+ {% endfor %} +
+

+ {% endif %}

{% trans "Kind regards" %},

diff --git a/dojo/templates_classic/notifications/webhooks/scan_added.tpl b/dojo/templates_classic/notifications/webhooks/scan_added.tpl index b42096bfba2..0f68a72eb12 100644 --- a/dojo/templates_classic/notifications/webhooks/scan_added.tpl +++ b/dojo/templates_classic/notifications/webhooks/scan_added.tpl @@ -8,5 +8,11 @@ findings: {% include 'notifications/webhooks/subtemplates/findings_list.tpl' with findings=findings_reactivated %} mitigated: {% include 'notifications/webhooks/subtemplates/findings_list.tpl' with findings=findings_mitigated %} - untouched: + untouched: {% include 'notifications/webhooks/subtemplates/findings_list.tpl' with findings=findings_untouched %} + new_duplicate: +{% include 'notifications/webhooks/subtemplates/findings_list.tpl' with findings=findings_new_duplicate %} + reactivated_duplicate: +{% include 'notifications/webhooks/subtemplates/findings_list.tpl' with findings=findings_reactivated_duplicate %} + untouched_duplicate: +{% include 'notifications/webhooks/subtemplates/findings_list.tpl' with findings=findings_untouched_duplicate %} diff --git a/dojo/test/views.py b/dojo/test/views.py index 4e7f9c54dba..77666524ffd 100644 --- a/dojo/test/views.py +++ b/dojo/test/views.py @@ -47,6 +47,7 @@ from dojo.location.models import Location from dojo.models import ( BurpRawRequestResponse, + Dojo_User, Endpoint, Finding, Finding_Group, @@ -960,6 +961,10 @@ def process_form( "close_old_findings": form.cleaned_data.get("close_old_findings", None), "create_finding_groups_for_all_findings": form.cleaned_data.get("create_finding_groups_for_all_findings", None), }) + # Honor the user's profile deduplication_execution_mode for UI reimports. The API resolves + # this in the serializer; the UI has no per-import selector, so fall back to the profile + # (or block_execution) instead of silently defaulting to async. 
+ context["deduplication_execution_mode"] = Dojo_User.resolve_deduplication_execution_mode(request.user) # Override the form values of active and verified if activeChoice := form.cleaned_data.get("active", None): if activeChoice == "force_to_true": diff --git a/unittests/test_import_execution_mode.py b/unittests/test_import_execution_mode.py new file mode 100644 index 00000000000..e575b987c01 --- /dev/null +++ b/unittests/test_import_execution_mode.py @@ -0,0 +1,246 @@ +from unittest.mock import patch + +from django.test import override_settings + +from dojo.importers.default_importer import DefaultImporter +from dojo.models import ( + DEDUPLICATION_EXECUTION_MODE_ASYNC, + DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, + DEDUPLICATION_EXECUTION_MODE_SYNC, + Development_Environment, + Dojo_User, + Engagement, + Finding, + Test, + UserContactInfo, +) + +from .dojo_test_case import DojoAPITestCase, DojoTestCase, get_unit_tests_path, versioned_fixtures + + +@versioned_fixtures +class ImportExecutionModeResolverTest(DojoTestCase): + + """resolve_deduplication_execution_mode: request override > profile > default.""" + + fixtures = ["dojo_testdata.json"] + + def setUp(self): + self.user = Dojo_User.objects.get(username="admin") + UserContactInfo.objects.filter(user=self.user).delete() + + def _set_profile(self, *, mode=None): + UserContactInfo.objects.update_or_create( + user=self.user, + defaults={"deduplication_execution_mode": mode}, + ) + self.user.refresh_from_db() + + def test_default_is_async(self): + self.assertEqual(DEDUPLICATION_EXECUTION_MODE_ASYNC, Dojo_User.resolve_deduplication_execution_mode(self.user)) + + def test_request_override_wins_over_profile(self): + self._set_profile(mode=DEDUPLICATION_EXECUTION_MODE_SYNC) + self.assertEqual( + DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, + Dojo_User.resolve_deduplication_execution_mode(self.user, DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT), + ) + + def test_profile_mode_used_when_no_override(self): + 
self._set_profile(mode=DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT) + self.assertEqual(DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, Dojo_User.resolve_deduplication_execution_mode(self.user)) + + def test_empty_profile_falls_back_to_async(self): + self._set_profile(mode=None) + self.assertEqual(DEDUPLICATION_EXECUTION_MODE_ASYNC, Dojo_User.resolve_deduplication_execution_mode(self.user)) + + def test_invalid_override_ignored(self): + self.assertEqual(DEDUPLICATION_EXECUTION_MODE_ASYNC, Dojo_User.resolve_deduplication_execution_mode(self.user, "garbage")) + + def test_no_user(self): + self.assertEqual(DEDUPLICATION_EXECUTION_MODE_ASYNC, Dojo_User.resolve_deduplication_execution_mode(None)) + + def test_block_execution_falls_back_to_sync(self): + # legacy global block_execution flag implies synchronous deduplication + UserContactInfo.objects.update_or_create(user=self.user, defaults={"block_execution": True}) + self.user.refresh_from_db() + self.assertEqual(DEDUPLICATION_EXECUTION_MODE_SYNC, Dojo_User.resolve_deduplication_execution_mode(self.user)) + + def test_mode_takes_precedence_over_block_execution(self): + UserContactInfo.objects.update_or_create( + user=self.user, + defaults={"block_execution": True, "deduplication_execution_mode": DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT}, + ) + self.user.refresh_from_db() + self.assertEqual(DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT, Dojo_User.resolve_deduplication_execution_mode(self.user)) + + def test_wants_block_execution_reads_block_execution_not_mode(self): + # wants_block_execution is the global switch and is independent of the dedup mode + UserContactInfo.objects.update_or_create(user=self.user, defaults={"block_execution": True}) + self.user.refresh_from_db() + self.assertTrue(Dojo_User.wants_block_execution(self.user)) + UserContactInfo.objects.update_or_create( + user=self.user, + defaults={"block_execution": False, "deduplication_execution_mode": DEDUPLICATION_EXECUTION_MODE_SYNC}, + ) + self.user.refresh_from_db() + # a 
'sync' dedup mode alone does NOT force global foreground execution + self.assertFalse(Dojo_User.wants_block_execution(self.user)) + + +@versioned_fixtures +class ImporterDispatchKwargsTest(DojoTestCase): + + """deduplication_execution_mode -> dojo_dispatch_task force flags.""" + + fixtures = ["dojo_testdata.json"] + + def _importer(self, mode, **extra): + return DefaultImporter( + scan_type="ZAP Scan", + engagement=Engagement.objects.first(), + environment=Development_Environment.objects.first(), + deduplication_execution_mode=mode, + **extra, + ) + + def test_sync_mode_forces_sync(self): + self.assertEqual({"force_sync": True}, self._importer(DEDUPLICATION_EXECUTION_MODE_SYNC).post_processing_dispatch_kwargs()) + + def test_async_wait_mode_forces_async(self): + self.assertEqual({"force_async": True}, self._importer(DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT).post_processing_dispatch_kwargs()) + + def test_async_mode_preserves_external_force_sync(self): + importer = self._importer(DEDUPLICATION_EXECUTION_MODE_ASYNC) + self.assertEqual({"force_sync": False}, importer.post_processing_dispatch_kwargs()) + self.assertEqual({"force_sync": True}, importer.post_processing_dispatch_kwargs(force_sync=True)) + + def test_invalid_mode_defaults_to_async(self): + self.assertEqual(DEDUPLICATION_EXECUTION_MODE_ASYNC, self._importer("nonsense").deduplication_execution_mode) + + def test_external_force_sync_promotes_to_sync_mode(self): + importer = self._importer(DEDUPLICATION_EXECUTION_MODE_ASYNC, force_sync=True) + self.assertEqual(DEDUPLICATION_EXECUTION_MODE_SYNC, importer.deduplication_execution_mode) + + +@versioned_fixtures +@override_settings(CELERY_TASK_ALWAYS_EAGER=True) +class ImportExecutionModeAPITest(DojoAPITestCase): + + """ + End-to-end: the import endpoints accept and honor deduplication_execution_mode. 
+ + CELERY_TASK_ALWAYS_EAGER runs dispatched tasks inline against the test DB, so + 'async_wait' can actually join its deduplication batch (a real broker/worker + runs against a different DB and could never see the test transaction's data). + """ + + fixtures = ["dojo_testdata.json"] + + def setUp(self): + super().setUp() + self.login_as_admin() + + def _payload(self, mode): + return { + "minimum_severity": "Low", + "scan_type": "ZAP Scan", + "engagement": 1, + "deduplication_execution_mode": mode, + } + + def test_import_async_wait_returns_statistics(self): + with (get_unit_tests_path() / "scans/zap/0_zap_sample.xml").open(encoding="utf-8") as testfile: + payload = self._payload(DEDUPLICATION_EXECUTION_MODE_ASYNC_WAIT) + payload["file"] = testfile + result = self.import_scan(payload, 201) + self.assertIn("statistics", result) + self.assertIn("after", result["statistics"]) + # async_wait joins deduplication, so it must report completion + self.assertTrue(result["deduplication_complete"]) + + def test_import_async_does_not_await_deduplication(self): + with (get_unit_tests_path() / "scans/zap/0_zap_sample.xml").open(encoding="utf-8") as testfile: + payload = self._payload(DEDUPLICATION_EXECUTION_MODE_ASYNC) + payload["file"] = testfile + result = self.import_scan(payload, 201) + self.assertFalse(result["deduplication_complete"]) + + def test_import_rejects_invalid_mode(self): + with (get_unit_tests_path() / "scans/zap/0_zap_sample.xml").open(encoding="utf-8") as testfile: + payload = self._payload("not-a-mode") + payload["file"] = testfile + self.import_scan(payload, 400) + + +@versioned_fixtures +class NotificationDeduplicationRefreshTest(DojoTestCase): + + """notify_scan_added refreshes duplicate status from the DB once dedup is complete.""" + + fixtures = ["dojo_testdata.json"] + + def _importer(self): + test = Test.objects.first() + importer = DefaultImporter( + scan_type="ZAP Scan", + engagement=test.engagement, + 
environment=Development_Environment.objects.first(), + ) + return importer, test + + @patch("dojo.importers.base_importer.create_notification") + def test_deduplicated_new_findings_excluded_when_complete(self, mock_notify): + importer, test = self._importer() + importer.deduplication_complete = True + + real = Finding(test=test, title="real finding", severity="High") + real.save() + dupe = Finding(test=test, title="dupe finding", severity="High") + dupe.save() + # Simulate background deduplication having flagged the second finding. + Finding.objects.filter(pk=dupe.pk).update(duplicate=True) + + importer.notify_scan_added(test, updated_count=2, new_findings=[real, dupe]) + + kwargs = mock_notify.call_args.kwargs + self.assertEqual([f.id for f in kwargs["findings_new"]], [real.id]) + self.assertEqual([f.id for f in kwargs["findings_new_duplicate"]], [dupe.id]) + # headline count excludes the deduplicated finding + self.assertEqual(kwargs["finding_count"], 1) + self.assertEqual(kwargs["event"], "scan_added") + + @patch("dojo.importers.base_importer.create_notification") + def test_async_mode_does_not_refresh(self, mock_notify): + importer, test = self._importer() + importer.deduplication_complete = False # plain async: dedup not awaited + + dupe = Finding(test=test, title="async dupe", severity="High") + dupe.save() + Finding.objects.filter(pk=dupe.pk).update(duplicate=True) + + importer.notify_scan_added(test, updated_count=1, new_findings=[dupe]) + + kwargs = mock_notify.call_args.kwargs + # historical behavior: duplicate still listed/counted as new + self.assertEqual([f.id for f in kwargs["findings_new"]], [dupe.id]) + self.assertEqual(kwargs["findings_new_duplicate"], []) + self.assertEqual(kwargs["finding_count"], 1) + + @patch("dojo.importers.base_importer.create_notification") + def test_all_new_findings_duplicate_yields_empty_event(self, mock_notify): + importer, test = self._importer() + importer.deduplication_complete = True + + dupe = Finding(test=test, 
title="only dupe", severity="Low") + dupe.save() + Finding.objects.filter(pk=dupe.pk).update(duplicate=True) + + importer.notify_scan_added(test, updated_count=1, new_findings=[dupe]) + + kwargs = mock_notify.call_args.kwargs + self.assertEqual(kwargs["findings_new"], []) + self.assertEqual([f.id for f in kwargs["findings_new_duplicate"]], [dupe.id]) + self.assertEqual(kwargs["finding_count"], 0) + # net-new is zero -> empty scan notification + self.assertEqual(kwargs["event"], "scan_added_empty") diff --git a/unittests/test_importers_performance.py b/unittests/test_importers_performance.py index ce9133e4a6b..3eac03f4ffb 100644 --- a/unittests/test_importers_performance.py +++ b/unittests/test_importers_performance.py @@ -403,7 +403,7 @@ def test_import_reimport_reimport_performance_pghistory_no_async_with_product_gr ) # Deduplication is enabled in the tests above, but to properly test it we must run the same import twice and capture the results. - def _deduplication_performance(self, expected_num_queries1, expected_num_async_tasks1, expected_num_queries2, expected_num_async_tasks2, *, check_duplicates=True): + def _deduplication_performance(self, expected_num_queries1, expected_num_async_tasks1, expected_num_queries2, expected_num_async_tasks2, *, check_duplicates=True, dedup_mode=None): """ Test method to measure deduplication performance by importing the same scan twice. The second import should result in all findings being marked as duplicates. 
@@ -444,6 +444,7 @@ def _deduplication_performance(self, expected_num_queries1, expected_num_async_t "verified": True, "scan_type": STACK_HAWK_SCAN_TYPE, "engagement": engagement, + **({"deduplication_execution_mode": dedup_mode} if dedup_mode else {}), } importer = DefaultImporter(**import_options) _, _, len_new_findings1, len_closed_findings1, _, _, _ = importer.process_scan(scan) @@ -471,6 +472,7 @@ def _deduplication_performance(self, expected_num_queries1, expected_num_async_t "verified": True, "scan_type": STACK_HAWK_SCAN_TYPE, "engagement": engagement, + **({"deduplication_execution_mode": dedup_mode} if dedup_mode else {}), } importer = DefaultImporter(**import_options) _, _, len_new_findings2, len_closed_findings2, _, _, _ = importer.process_scan(scan) @@ -551,6 +553,40 @@ def test_deduplication_performance_pghistory_no_async(self): expected_num_async_tasks2=2, ) + @override_settings(ENABLE_AUDITLOG=True) + def test_deduplication_performance_pghistory_async_wait(self): + """ + Deduplication performance in the 'async_wait' execution mode: post-processing is + dispatched to a background worker, then the request joins on the result before + responding. The dedup queries run in the worker (a separate connection), NOT in + the web request, so the only web-side cost over the plain async path is the single + post-dedup notification refresh SELECT (+1). + + We do not use CELERY_TASK_ALWAYS_EAGER here — that would run the dispatched task + inline on the request's connection and wrongly count the worker's dedup queries. + Instead the dedup batch is dispatched async (not executed in-process) and the join + (AsyncResult.get) is mocked to return immediately, simulating a worker that has + finished. deduplication_complete is therefore True (so the refresh runs), but the + findings are not actually deduplicated in-test, so check_duplicates is False. 
+ """ + configure_audit_system() + configure_pghistory_triggers() + + # Enable deduplication + self.system_settings(enable_deduplication=True) + + # Simulate the background worker's post-processing having completed so the join + # returns instantly without executing dedup on the request's DB connection. + with patch("celery.result.AsyncResult.get", return_value=None): + self._deduplication_performance( + expected_num_queries1=110, + expected_num_async_tasks1=2, + expected_num_queries2=90, + expected_num_async_tasks2=2, + dedup_mode="async_wait", + check_duplicates=False, + ) + @tag("performance") @override_settings(V3_FEATURE_LOCATIONS=True)