Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
No-op if TANGLE_BUGSNAG_API_KEY is not set.

Environment variables:
TANGLE_BUGSNAG_API_KEY Required to enable Bugsnag reporting.
TANGLE_ENV Release stage (e.g. "staging", "production").
TANGLE_SERVICE_VERSION App version tag (e.g. git SHA). Optional.
TANGLE_BUGSNAG_NOTIFY_ENDPOINT Custom notify URL. Optional.
TANGLE_BUGSNAG_SESSIONS_ENDPOINT Custom sessions URL. Optional.
TANGLE_BUGSNAG_API_KEY Required to enable Bugsnag reporting.
TANGLE_ENV Release stage (e.g. "staging", "production").
TANGLE_SERVICE_VERSION App version tag (e.g. git SHA). Optional.
TANGLE_BUGSNAG_NOTIFY_ENDPOINT Custom notify URL. Optional.
TANGLE_BUGSNAG_SESSIONS_ENDPOINT Custom sessions URL. Optional.
TANGLE_BUGSNAG_CUSTOM_GROUPING_KEY Metadata key for normalized error grouping. Optional.
"""

import logging
Expand All @@ -21,6 +22,7 @@
import bugsnag.event as bugsnag_event

from . import contextual_logging
from . import error_normalization

_logger = logging.getLogger(__name__)

Expand All @@ -29,6 +31,7 @@
_SERVICE_VERSION = os.environ.get("TANGLE_SERVICE_VERSION")
_NOTIFY_ENDPOINT = os.environ.get("TANGLE_BUGSNAG_NOTIFY_ENDPOINT")
_SESSIONS_ENDPOINT = os.environ.get("TANGLE_BUGSNAG_SESSIONS_ENDPOINT")
_CUSTOM_GROUPING_KEY = os.environ.get("TANGLE_BUGSNAG_CUSTOM_GROUPING_KEY")

IS_BUGSNAG_ENABLED: bool = bool(_BUGSNAG_API_KEY)
_setup_called: bool = False
Expand All @@ -39,6 +42,21 @@ def _before_notify(event: bugsnag_event.Event) -> None:
context = contextual_logging.get_all_context_metadata()
if context:
event.add_tab("tangle_context", context)
if _CUSTOM_GROUPING_KEY and event.original_error:
normalized = error_normalization.normalize_error_message(
exception=event.original_error
)
prefix = (event.metadata.get("extra") or {}).get("grouping_prefix")
key_value = f"{prefix}: {normalized}" if prefix else normalized
event.add_tab("custom", {_CUSTOM_GROUPING_KEY: key_value})
if prefix and event.errors:
try:
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For extra safety against potential package changes in the future, this is wrapped in a try catch with a fallback to no prefixing.

for error in event.errors:
error.error_class = f"{prefix}: {error.error_class}"
except Exception:
_logger.debug(
"Could not prepend grouping prefix to errorClass", exc_info=True
)


def setup(*, service_name: str | None = None) -> None:
Expand Down
100 changes: 100 additions & 0 deletions cloud_pipelines_backend/instrumentation/error_normalization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""
Normalizes exception messages into stable strings for error grouping.

Strips instance-specific values (pod names, IDs, memory addresses, byte
offsets) so that structurally identical errors produce the same key
regardless of which specific resource was involved.
"""

import json
import re

_POD_NAME_PATTERN = re.compile(r"(?:task|tangle(?:-ce)?)-[a-zA-Z0-9]+-[a-zA-Z0-9]+")
_OBJECT_REPR_PATTERN = re.compile(r"<[^>]+ object at 0x[0-9a-fA-F]+>")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually, we might want to fix such address strings (if they are not informative).

_HEX_ADDRESS_PATTERN = re.compile(r"\b0x[0-9a-fA-F]+\b")
_UUID_PATTERN = re.compile(
r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", re.IGNORECASE
)
_LONG_ALNUM_ID_PATTERN = re.compile(r"\b[a-zA-Z0-9]{16,}\b")


def _strip_generic(*, message: str) -> str:
    """Replace instance-specific substrings in *message* with stable placeholders.

    Object reprs, hex addresses, UUIDs, and long alphanumeric IDs are each
    collapsed to a fixed token so structurally identical messages compare equal.
    """
    substitutions = (
        (_OBJECT_REPR_PATTERN, "{object}"),
        (_HEX_ADDRESS_PATTERN, "{addr}"),
        (_UUID_PATTERN, "{uuid}"),
        (_LONG_ALNUM_ID_PATTERN, "{id}"),
    )
    for pattern, placeholder in substitutions:
        message = pattern.sub(placeholder, message)
    return message.strip()


def _normalize_k8s_api_exception(*, exception: BaseException) -> str | None:
try:
from kubernetes.client import exceptions as k8s_exceptions

if not isinstance(exception, k8s_exceptions.ApiException):
return None
except ImportError:
return None

status_code = exception.status
try:
body = json.loads(exception.body)
reason = body.get("reason", "")
message = body.get("message", "")
except (json.JSONDecodeError, TypeError):
reason = ""
message = str(exception)

message = _POD_NAME_PATTERN.sub("{pod}", message)
parts = [f"kubernetes ApiException ({status_code})"]
if reason:
parts.append(reason)
if message:
parts.append(message)
return ": ".join(parts)


def _normalize_max_retry_error(*, exception: BaseException) -> str | None:
try:
from urllib3.exceptions import MaxRetryError

if not isinstance(exception, MaxRetryError):
return None
except ImportError:
return None

cause = type(exception.reason).__name__ if exception.reason else "unknown"
return f"MaxRetryError: k8s connection pool max retries exceeded ({cause})"


def _normalize_unicode_decode_error(*, exception: BaseException) -> str | None:
if not isinstance(exception, UnicodeDecodeError):
return None
return f"UnicodeDecodeError: '{exception.encoding}' codec can't decode byte at position {{n}}"


def _normalize_orchestrator_error(*, exception: BaseException) -> str | None:
try:
from ..orchestrator_sql import OrchestratorError

if not isinstance(exception, OrchestratorError):
return None
except ImportError:
return None

message = _OBJECT_REPR_PATTERN.sub("{object}", str(exception))
return f"OrchestratorError: {message}"


def normalize_error_message(*, exception: BaseException) -> str:
    """Return a stable normalized string for error grouping.

    Each specialized normalizer is tried in order; the first non-None result
    wins. Otherwise a generic "TypeName: stripped message" form is returned.
    """
    specialized_normalizers = [
        _normalize_k8s_api_exception,
        _normalize_max_retry_error,
        _normalize_unicode_decode_error,
        _normalize_orchestrator_error,
    ]
    for normalize in specialized_normalizers:
        normalized = normalize(exception=exception)
        if normalized is not None:
            return normalized

    generic_message = _strip_generic(message=str(exception))
    return f"{type(exception).__name__}: {generic_message}"
6 changes: 5 additions & 1 deletion cloud_pipelines_backend/orchestrator_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1067,7 +1067,11 @@ def _retry(

def record_system_error_exception(execution: bts.ExecutionNode, exception: Exception):
app_metrics.execution_system_errors.add(1)
bugsnag_instrumentation.notify(exception=exception, execution_id=str(execution.id))
bugsnag_instrumentation.notify(
exception=exception,
execution_id=str(execution.id),
grouping_prefix="SYSTEM_ERROR",
)

if execution.extra_data is None:
execution.extra_data = {}
Expand Down
79 changes: 79 additions & 0 deletions tests/instrumentation/test_bugsnag.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,85 @@ def test_before_notify_attaches_context_metadata(monkeypatch):
)


def test_before_notify_prefixes_error_class_when_grouping_prefix_set(monkeypatch):
    """errorClass is prefixed when event metadata carries a grouping_prefix."""
    monkeypatch.setenv("TANGLE_BUGSNAG_API_KEY", "test-api-key")
    monkeypatch.setenv("TANGLE_ENV", "staging")
    monkeypatch.setenv("TANGLE_BUGSNAG_CUSTOM_GROUPING_KEY", "my_key")

    import importlib
    import cloud_pipelines_backend.instrumentation.bugsnag_instrumentation as bugsnag_module

    # Reload so the module-level env-var reads pick up the values set above.
    importlib.reload(bugsnag_module)

    from cloud_pipelines_backend.instrumentation import contextual_logging
    from bugsnag.error import Error

    contextual_logging.clear_context_metadata()

    bugsnag_error = Error("ValueError", "boom", [])
    event = mock.MagicMock()
    event.original_error = ValueError("boom")
    event.metadata = {"extra": {"grouping_prefix": "SYSTEM_ERROR"}}
    event.errors = [bugsnag_error]

    bugsnag_module._before_notify(event)

    assert bugsnag_error.error_class == "SYSTEM_ERROR: ValueError"


def test_before_notify_skips_error_class_prefix_when_no_grouping_prefix(monkeypatch):
    """Without a grouping_prefix in metadata, errorClass stays untouched."""
    monkeypatch.setenv("TANGLE_BUGSNAG_API_KEY", "test-api-key")
    monkeypatch.setenv("TANGLE_ENV", "staging")
    monkeypatch.setenv("TANGLE_BUGSNAG_CUSTOM_GROUPING_KEY", "my_key")

    import importlib
    import cloud_pipelines_backend.instrumentation.bugsnag_instrumentation as bugsnag_module

    # Reload so the module-level env-var reads pick up the values set above.
    importlib.reload(bugsnag_module)

    from cloud_pipelines_backend.instrumentation import contextual_logging
    from bugsnag.error import Error

    contextual_logging.clear_context_metadata()

    bugsnag_error = Error("ValueError", "boom", [])
    event = mock.MagicMock()
    event.original_error = ValueError("boom")
    event.metadata = {"extra": {}}
    event.errors = [bugsnag_error]

    bugsnag_module._before_notify(event)

    assert bugsnag_error.error_class == "ValueError"


def test_before_notify_skips_error_class_prefix_gracefully_on_bad_errors_structure(
    monkeypatch,
):
    """A malformed event.errors must not raise — prefixing is best-effort."""
    monkeypatch.setenv("TANGLE_BUGSNAG_API_KEY", "test-api-key")
    monkeypatch.setenv("TANGLE_ENV", "staging")
    monkeypatch.setenv("TANGLE_BUGSNAG_CUSTOM_GROUPING_KEY", "my_key")

    import importlib
    import cloud_pipelines_backend.instrumentation.bugsnag_instrumentation as bugsnag_module

    # Reload so the module-level env-var reads pick up the values set above.
    importlib.reload(bugsnag_module)

    from cloud_pipelines_backend.instrumentation import contextual_logging

    contextual_logging.clear_context_metadata()

    event = mock.MagicMock()
    event.original_error = ValueError("boom")
    event.metadata = {"extra": {"grouping_prefix": "SYSTEM_ERROR"}}
    # SDK structure changed — not a list of Error objects.
    event.errors = "unexpected"

    # Should not raise.
    bugsnag_module._before_notify(event)


def test_before_notify_skips_empty_context(monkeypatch):
monkeypatch.setenv("TANGLE_BUGSNAG_API_KEY", "test-api-key")
monkeypatch.setenv("TANGLE_ENV", "staging")
Expand Down
Loading