diff --git a/chain_capabilities/stellar/actions/actions.go b/chain_capabilities/stellar/actions/actions.go index 82bcab648..6f768066d 100644 --- a/chain_capabilities/stellar/actions/actions.go +++ b/chain_capabilities/stellar/actions/actions.go @@ -85,11 +85,11 @@ func (s *Stellar) Close() error { func (s *Stellar) GetLatestLedger(ctx context.Context, _ capabilities.RequestMetadata, _ *stellarcap.GetLatestLedgerRequest) (*capabilities.ResponseAndMetadata[*stellarcap.GetLatestLedgerResponse], caperrors.Error) { resp, err := s.StellarService.GetLatestLedger(ctx) if err != nil { - return nil, GetError(err, false) + return nil, capcommon.GetError(err, false) } protoResp, err := stellarcap.ConvertGetLatestLedgerResponseToProto(resp) if err != nil { - return nil, GetError(err, false) + return nil, capcommon.GetError(err, false) } return &capabilities.ResponseAndMetadata[*stellarcap.GetLatestLedgerResponse]{Response: protoResp}, nil } @@ -196,6 +196,3 @@ func isUserError(err error) bool { func isStellarNodeInfraError(err error) bool { return errors.Is(err, multinode.ErrNodeError) || strings.Contains(err.Error(), multinode.ErrNodeError.Error()) } - -var GetError = capcommon.GetError -var NewUserError = caperrors.NewPublicUserError diff --git a/chain_capabilities/stellar/actions/actions_test.go b/chain_capabilities/stellar/actions/actions_test.go index 71e726bd3..ed83b0227 100644 --- a/chain_capabilities/stellar/actions/actions_test.go +++ b/chain_capabilities/stellar/actions/actions_test.go @@ -14,6 +14,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities" caperrors "github.com/smartcontractkit/chainlink-common/pkg/capabilities/errors" stellarcap "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/stellar" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/stellar/scval" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" "github.com/smartcontractkit/chainlink-common/pkg/types" @@ -281,6 +282,17 @@ func TestConvertReadContractRequestFromProto(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "function is required") }) + + t.Run("invalid arg conversion", func(t *testing.T) { + t.Parallel() + _, err := convertReadContractRequestFromProto(&stellarcap.ReadContractRequest{ + ContractId: testForwarderAddress, + Function: "balance", + Args: []*scval.ScVal{nil}, + }) + require.Error(t, err) + require.Contains(t, err.Error(), "args[0]") + }) } func TestIsStellarNodeInfraError_MessageSubstring(t *testing.T) { diff --git a/chain_capabilities/stellar/actions/tx_hash_retriever.go b/chain_capabilities/stellar/actions/tx_hash_retriever.go index cdfa2d762..0262f6356 100644 --- a/chain_capabilities/stellar/actions/tx_hash_retriever.go +++ b/chain_capabilities/stellar/actions/tx_hash_retriever.go @@ -4,37 +4,67 @@ import ( "context" "errors" "fmt" + "math" "strings" + "time" + "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/logger" capcommon "github.com/smartcontractkit/capabilities/chain_capabilities/common" + "github.com/smartcontractkit/capabilities/chain_capabilities/stellar/monitoring" ) const ( reportProcessedTopicPrefix = "forwarder_ReportProcessed" defaultForwarderLookbackLedgers = int64(100) failedToRetrieveTxHashErrorMsg = "failed to retrieve tx hash for report" + + txHashLookupTypeSuccessful = "SuccessfulTransmission" + txHashLookupTypeFailed = "FailedTransmission" + txHashRetrievalPhase = "EventPoll" ) var ErrUnexpectedSuccessfulTransmission = errors.New("unexpected successful transmission") type TxHashRetriever struct { - forwarderClient CREForwarderClient - transmissionID TransmissionID - lggr logger.SugaredLogger + forwarderClient CREForwarderClient + transmissionID TransmissionID + lggr logger.SugaredLogger + beholderProcessor beholder.ProtoProcessor + messageBuilder *monitoring.MessageBuilder + telemetryContext monitoring.TelemetryContext +} + +type TxHashRetrieverOption func(*TxHashRetriever) + +func WithTxHashRetrieverMonitoring( + beholderProcessor beholder.ProtoProcessor, + messageBuilder *monitoring.MessageBuilder, + telemetryContext monitoring.TelemetryContext, +) TxHashRetrieverOption { + return func(r *TxHashRetriever) { + r.beholderProcessor = beholderProcessor + r.messageBuilder = messageBuilder + r.telemetryContext = telemetryContext + } } func NewTxHashRetriever( forwarderClient CREForwarderClient, lggr logger.SugaredLogger, transmissionID TransmissionID, + opts ...TxHashRetrieverOption, ) TxHashRetriever { - return TxHashRetriever{ + r := TxHashRetriever{ forwarderClient: forwarderClient, transmissionID: transmissionID, lggr: lggr, } + for _, opt := range opts { + opt(&r) + } + return r } type eventDetails struct { @@ -65,7 +95,7 @@ func (l eventDetailsList) String() string { } func (r *TxHashRetriever) GetSuccessfulTransmissionHash(ctx context.Context) (string, error) { - details, err := r.fetchAndParseEvents(ctx) + details, err := r.fetchAndParseEvents(ctx, txHashLookupTypeSuccessful) if err != nil { return "", err } @@ -75,6 +105,7 @@ func (r *TxHashRetriever) GetSuccessfulTransmissionHash(ctx context.Context) (st } } r.lggr.Errorw("No successful transmission found", "txCount", len(details), "transactions", details.String()) + r.emitTxHashRetrievalPhase(ctx, txHashLookupTypeSuccessful, "NotFound", time.Now(), "") return "", fmt.Errorf("no successful transmission found. Found %d transactions (all failed): %s", len(details), details) } @@ -85,16 +116,21 @@ func (r *TxHashRetriever) GetFailedTransmissionHash(ctx context.Context) (string } func (r *TxHashRetriever) GetFailedTransmissionHashWithCount(ctx context.Context) (string, int, error) { - details, err := r.fetchAndParseEvents(ctx) + details, err := r.fetchAndParseEvents(ctx, txHashLookupTypeFailed) if err != nil { return "", 0, err } for _, d := range details { if d.isSuccess { + r.emitTxHashRetrievalPhase(ctx, txHashLookupTypeFailed, "UnexpectedSuccess", time.Now(), d.txHash) return "", len(details), fmt.Errorf("%w, successful tx hash: %s", ErrUnexpectedSuccessfulTransmission, d.txHash) } } + if len(details) == 0 { + r.emitTxHashRetrievalPhase(ctx, txHashLookupTypeFailed, "NotFound", time.Now(), "") + return "", 0, fmt.Errorf("no failed transmission found") + } earliestIdx := 0 for i, d := range details { @@ -113,7 +149,8 @@ func (r *TxHashRetriever) GetFailedTransmissionHashWithCount(ctx context.Context return details[earliestIdx].txHash, len(details), nil } -func (r *TxHashRetriever) fetchAndParseEvents(ctx context.Context) (eventDetailsList, error) { +func (r *TxHashRetriever) fetchAndParseEvents(ctx context.Context, lookupType string) (eventDetailsList, error) { + phaseStart := time.Now() events, err := capcommon.WithPollingRetry(ctx, r.lggr, func(ctx context.Context) ([]ReportProcessedEvent, error) { events, fetchErr := r.forwarderClient.GetReportProcessedEvents(ctx, r.transmissionID) if fetchErr != nil { @@ -125,10 +162,31 @@ func (r *TxHashRetriever) fetchAndParseEvents(ctx context.Context) (eventDetails return events, nil }) if err != nil { + r.emitTxHashRetrievalPhase(ctx, lookupType, "FetchError", phaseStart, "") return nil, fmt.Errorf("%s: %w", failedToRetrieveTxHashErrorMsg, err) } - return buildEventDetails(events), nil + details := buildEventDetails(events) + selectedHash := "" + if len(details) > 0 { + selectedHash = details[0].txHash + } + r.emitTxHashRetrievalPhase(ctx, lookupType, "Found", phaseStart, selectedHash) + return details, nil +} + +func (r *TxHashRetriever) emitTxHashRetrievalPhase(ctx context.Context, lookupType, result string, phaseStart time.Time, txHash string) { + if r.beholderProcessor == nil || r.messageBuilder == nil { + return + } + monitoring.EmitInitiated(ctx, r.lggr, r.beholderProcessor, r.messageBuilder.BuildWriteReportTxHashRetrievalPhase( + r.telemetryContext, + txHashRetrievalPhase, + result, + int64(math.Max(float64(time.Since(phaseStart).Milliseconds()), 0)), + txHash, + lookupType, + )) } func buildEventDetails(events []ReportProcessedEvent) eventDetailsList { diff --git a/chain_capabilities/stellar/actions/tx_hash_retriever_test.go b/chain_capabilities/stellar/actions/tx_hash_retriever_test.go index 298156c55..fb5aee82d 100644 --- a/chain_capabilities/stellar/actions/tx_hash_retriever_test.go +++ b/chain_capabilities/stellar/actions/tx_hash_retriever_test.go @@ -8,27 +8,45 @@ import ( "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/types" stellartypes "github.com/smartcontractkit/chainlink-common/pkg/types/chains/stellar" "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" + + "github.com/smartcontractkit/capabilities/chain_capabilities/stellar/monitoring" ) type stubForwarderClient struct { - events []ReportProcessedEvent - err error + events []ReportProcessedEvent + eventsErr error + transmissionInfoFn func(call int) (TransmissionInfo, error) + invokeOnReportResp *stellartypes.SubmitTransactionResponse + invokeOnReportErr error + transmissionCalls int } func (s *stubForwarderClient) InvokeOnReport(context.Context, string, *sdk.ReportResponse) (*stellartypes.SubmitTransactionResponse, error) { - panic("not implemented") + if s.invokeOnReportErr != nil { + return nil, s.invokeOnReportErr + } + if s.invokeOnReportResp != nil { + return s.invokeOnReportResp, nil + } + panic("stubForwarderClient.InvokeOnReport not configured") } func (s *stubForwarderClient) GetTransmissionInfo(context.Context, TransmissionID) (TransmissionInfo, error) { - panic("not implemented") + s.transmissionCalls++ + if s.transmissionInfoFn != nil { + return s.transmissionInfoFn(s.transmissionCalls) + } + panic("stubForwarderClient.GetTransmissionInfo not configured") } func (s *stubForwarderClient) GetReportProcessedEvents(context.Context, TransmissionID) ([]ReportProcessedEvent, error) { - if s.err != nil { - return nil, s.err + if s.eventsErr != nil { + return nil, s.eventsErr } return s.events, nil } @@ -116,7 +134,7 @@ func TestTxHashRetriever_GetSuccessfulTransmissionHash(t *testing.T) { t.Run("returns error when event fetch fails", func(t *testing.T) { t.Parallel() - client := &stubForwarderClient{err: errors.New("rpc down")} + client := &stubForwarderClient{eventsErr: errors.New("rpc down")} retriever := NewTxHashRetriever(client, lggr, transmissionID) ctx, cancel := context.WithTimeout(t.Context(), 300*time.Millisecond) @@ -158,6 +176,28 @@ func TestTxHashRetriever_GetFailedTransmissionHash(t *testing.T) { require.ErrorIs(t, err, ErrUnexpectedSuccessfulTransmission) }) + t.Run("emits unexpected success phase telemetry", func(t *testing.T) { + t.Parallel() + processor := &recordingWriteReportProcessor{} + client := &stubForwarderClient{events: []ReportProcessedEvent{ + {TxHash: testTxHash, Ledger: 100, Success: true}, + }} + retriever := NewTxHashRetriever( + client, + lggr, + transmissionID, + WithTxHashRetrieverMonitoring( + processor, + monitoring.NewMessageBuilder(types.ChainInfo{}, capabilities.CapabilityInfo{}, ""), + monitoring.TelemetryContext{}, + ), + ) + + _, err := retriever.GetFailedTransmissionHash(t.Context()) + require.ErrorIs(t, err, ErrUnexpectedSuccessfulTransmission) + requireTxHashPhaseEvent(t, processor.messages, "FailedTransmission", txHashRetrievalPhase, "UnexpectedSuccess", testTxHash) + }) + t.Run("GetFailedTransmissionHashWithCount returns count", func(t *testing.T) { t.Parallel() client := &stubForwarderClient{events: []ReportProcessedEvent{ diff --git a/chain_capabilities/stellar/actions/write_report.go b/chain_capabilities/stellar/actions/write_report.go index 042a04d72..ed9adb0d7 100644 --- a/chain_capabilities/stellar/actions/write_report.go +++ b/chain_capabilities/stellar/actions/write_report.go @@ -5,11 +5,13 @@ import ( "encoding/hex" "errors" "fmt" + "math" "strings" "time" "github.com/stellar/go-stellar-sdk/strkey" + "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" caperrors "github.com/smartcontractkit/chainlink-common/pkg/capabilities/errors" stellarcap "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/stellar" @@ -23,6 +25,7 @@ import ( capcommon "github.com/smartcontractkit/capabilities/chain_capabilities/common" ts "github.com/smartcontractkit/capabilities/chain_capabilities/common/transmission_schedule" "github.com/smartcontractkit/capabilities/chain_capabilities/stellar/metering" + "github.com/smartcontractkit/capabilities/chain_capabilities/stellar/monitoring" ) const ( @@ -39,6 +42,8 @@ type writeReport struct { chainSelector uint64 reportSizeLimit limits.BoundLimiter[commoncfg.Size] transmissionScheduler ts.TransmissionScheduler + messageBuilder *monitoring.MessageBuilder + beholderProcessor beholder.ProtoProcessor } func (s *Stellar) WriteReport( @@ -48,35 +53,59 @@ func (s *Stellar) WriteReport( ) (*capabilities.ResponseAndMetadata[*stellarcap.WriteReportReply], caperrors.Error) { ctx = metadata.ContextWithCRE(ctx) + telemetryContext := monitoring.TelemetryContext{TsStart: time.Now().UnixMilli(), RequestMetadata: metadata} + monitoring.EmitInitiated(ctx, s.lggr, s.beholderProcessor, s.messageBuilder.BuildWriteReportInitiated(telemetryContext, input)) + if err := s.validateWriteReportInputs(metadata, input); err != nil { - return nil, NewUserError(err, caperrors.InvalidArgument) + capErr := caperrors.NewPublicUserError(err, caperrors.InvalidArgument) + monitoring.LogAndEmitError(ctx, s.lggr, s.beholderProcessor, + s.messageBuilder.BuildWriteReportError(telemetryContext, input, "Failed to WriteReport, user error due to invalid request", capErr)) + return nil, capErr + } + + reply, meteringMeta, err := s.executeWriteReport(ctx, input, metadata, telemetryContext) + if err != nil { + isUserError := s.isUserErrorWriteReport(err) + capErr := capcommon.GetError(err, isUserError) + monitoring.LogAndEmitError(ctx, s.lggr, s.beholderProcessor, + s.messageBuilder.BuildWriteReportError(telemetryContext, input, "Failed to WriteReport while checking if the report exists or trying to publish on chain", capErr)) + return nil, capErr } + monitoring.LogAndEmitSuccess(ctx, "Successfully executed WriteReport", s.lggr, s.beholderProcessor, + s.messageBuilder.BuildWriteReportSuccess(telemetryContext, input)) + + return &capabilities.ResponseAndMetadata[*stellarcap.WriteReportReply]{ + Response: reply, + ResponseMetadata: meteringMeta, + }, nil +} + +func (s *Stellar) executeWriteReport( + ctx context.Context, + request *stellarcap.WriteReportRequest, + metadata capabilities.RequestMetadata, + telemetryContext monitoring.TelemetryContext, +) (*stellarcap.WriteReportReply, capabilities.ResponseMetadata, error) { wr := &writeReport{ service: s.StellarService, forwarderClient: s.forwarderClient, - lggr: s.lggr, + lggr: s.messageBuilder.RequestLggr(s.lggr, telemetryContext), forwarderLookbackLedgers: s.forwarderLookbackLedgers, chainSelector: s.chainSelector, reportSizeLimit: s.reportSizeLimit, transmissionScheduler: s.transmissionScheduler, + messageBuilder: s.messageBuilder, + beholderProcessor: s.beholderProcessor, } - - reply, meteringMeta, err := wr.execute(ctx, metadata, input) - if err != nil { - return nil, GetError(err, s.isUserErrorWriteReport(err)) - } - - return &capabilities.ResponseAndMetadata[*stellarcap.WriteReportReply]{ - Response: reply, - ResponseMetadata: meteringMeta, - }, nil + return wr.execute(ctx, request, metadata, telemetryContext) } func (wr *writeReport) execute( ctx context.Context, - metadata capabilities.RequestMetadata, request *stellarcap.WriteReportRequest, + metadata capabilities.RequestMetadata, + telemetryContext monitoring.TelemetryContext, ) (*stellarcap.WriteReportReply, capabilities.ResponseMetadata, error) { ctx = contexts.WithChainSelector(ctx, wr.chainSelector) @@ -93,14 +122,15 @@ func (wr *writeReport) execute( queuePosition := wr.transmissionScheduler.GetQueuePosition(hex.EncodeToString(scheduleKey[:])) wr.lggr = wr.lggr.With(append([]any{"queuePosition", queuePosition, "forwarder", wr.forwarderClient.ForwarderAddress()}, transmissionID.LogAttrs()...)...) - txHashRetriever := NewTxHashRetriever(wr.forwarderClient, wr.lggr, transmissionID) + txHashRetriever := NewTxHashRetriever(wr.forwarderClient, wr.lggr, transmissionID, + WithTxHashRetrieverMonitoring(wr.beholderProcessor, wr.messageBuilder, telemetryContext)) // TODO(follow-up): Consider simulating the on_report transaction before polling when the // transmission has not yet been attempted. If simulation predicts a terminal failure we // could skip delta-stage polling and/or submission. Open questions: simulation may fail on // only some DON nodes (stale ledger, timing), so every node must still return the same // WriteReportReply and metering semantics before enabling this shortcut. - info, err := wr.pollTransmissionInfo(ctx, transmissionID, queuePosition) + info, err := wr.pollTransmissionInfo(ctx, request, telemetryContext, transmissionID, queuePosition) if err != nil { return nil, capabilities.ResponseMetadata{}, fmt.Errorf("failed to get transmission info: %w", err) } @@ -112,15 +142,19 @@ func (wr *writeReport) execute( wr.lggr.Errorw("Returning without a transmission attempt - prior transmission succeeded, but failed to retrieve its tx hash", "error", hashErr) return nil, capabilities.ResponseMetadata{}, hashErr } - reply, err := wr.buildSuccessReply(ctx, txHash) + reply, err := wr.buildSuccessReply(ctx, request, telemetryContext, txHash) return reply, capabilities.ResponseMetadata{}, err case TransmissionStateInvalidReceiver: txHash, hashErr := txHashRetriever.GetFailedTransmissionHash(ctx) if hashErr != nil { - wr.lggr.Errorw("Returning without a transmission attempt - prior transmission marked receiver invalid, but failed to retrieve its tx hash", "error", hashErr) + if errors.Is(hashErr, ErrUnexpectedSuccessfulTransmission) { + wr.emitInvalidTransmissionState(ctx, request, telemetryContext, info, transmissionID, "WriteReport unexpected successful transmission", hashErr.Error()) + } else { + wr.lggr.Errorw("Returning without a transmission attempt - prior transmission marked receiver invalid, but failed to retrieve its tx hash", "error", hashErr) + } return nil, capabilities.ResponseMetadata{}, hashErr } - reply, err := wr.buildRevertReplyFromTx(ctx, txHash, info, transmissionID) + reply, err := wr.buildRevertReplyFromTx(ctx, request, telemetryContext, txHash, info, transmissionID) if err != nil { return nil, capabilities.ResponseMetadata{}, revertReplyBuildError(info, transmissionID, err) } @@ -129,19 +163,20 @@ func (wr *writeReport) execute( txHash, hashErr := txHashRetriever.GetFailedTransmissionHash(ctx) if hashErr != nil { if errors.Is(hashErr, ErrUnexpectedSuccessfulTransmission) { - wr.lggr.Errorw("Returning without a transmission attempt - unexpected successful transmission while state is failed", "error", hashErr) + wr.emitInvalidTransmissionState(ctx, request, telemetryContext, info, transmissionID, "WriteReport unexpected successful transmission", hashErr.Error()) } else { wr.lggr.Errorw("Returning without a transmission attempt - prior transmission failed, but failed to retrieve its tx hash", "error", hashErr) } return nil, capabilities.ResponseMetadata{}, hashErr } - reply, err := wr.buildRevertReplyFromTx(ctx, txHash, info, transmissionID) + reply, err := wr.buildRevertReplyFromTx(ctx, request, telemetryContext, txHash, info, transmissionID) if err != nil { return nil, capabilities.ResponseMetadata{}, revertReplyBuildError(info, transmissionID, err) } return reply, capabilities.ResponseMetadata{}, nil case TransmissionStateNotAttempted: default: + wr.emitInvalidTransmissionState(ctx, request, telemetryContext, info, transmissionID, "WriteReport invalid transmission state during pre-submit poll", invalidTransmissionStateError(info.State).Error()) return nil, capabilities.ResponseMetadata{}, invalidTransmissionStateError(info.State) } @@ -149,10 +184,16 @@ func (wr *writeReport) execute( return nil, capabilities.ResponseMetadata{}, fmt.Errorf("%s report size exceeds limit: %w", capcommon.UserError, err) } + submitStart := time.Now() submitResp, err := wr.forwarderClient.InvokeOnReport(ctx, request.ContractId, request.Report) if err != nil { return nil, capabilities.ResponseMetadata{}, err } + monitoring.EmitInitiated(ctx, wr.lggr, wr.beholderProcessor, wr.messageBuilder.BuildWriteReportInvokeOnReportDuration( + telemetryContext, + int64(math.Max(float64(time.Since(submitStart).Milliseconds()), 0)), + submitTxStatusCode(submitResp), + )) // Poll for the canonical on-chain transmission state. The forwarder may record the // outcome after the tx confirms; retry until it is visible or the context expires. @@ -171,7 +212,7 @@ func (wr *writeReport) execute( // submit where another node's tx succeeded). Prefer the canonical event hash over local TXM data. wr.lggr.Warnw("Failed to poll transmission info after submit, attempting event-based tx hash lookup", "error", pollErr) if txHash, lookupErr := txHashRetriever.GetSuccessfulTransmissionHash(ctx); lookupErr == nil { - reply, buildErr := wr.buildSuccessReply(ctx, txHash) + reply, buildErr := wr.buildSuccessReply(ctx, request, telemetryContext, txHash) return reply, wr.meteringFromReply(reply), buildErr } wr.lggr.Warnw("Failed to poll transmission info after submit, returning reply from TXM outcome", "error", pollErr) @@ -188,14 +229,17 @@ func (wr *writeReport) execute( if submitResp.TxStatus != stellartypes.TxSuccess && submitResp.TxHash != "" && submitResp.TxHash != txHash { wr.lggr.Infow("Made a new transmission attempt - transmission succeeded, but local submit did not confirm (likely duplicate)", "localTxHash", submitResp.TxHash, "txHash", txHash) + monitoring.LogAndEmitSuccess(ctx, "Made a new transmission attempt - transmission succeeded, but local submit did not confirm (likely duplicate)", + wr.lggr, wr.beholderProcessor, + wr.messageBuilder.BuildWriteReportDuplicateTx(telemetryContext, request, submitResp.TxHash, txHash)) } - reply, err := wr.buildSuccessReply(ctx, txHash) + reply, err := wr.buildSuccessReply(ctx, request, telemetryContext, txHash) return reply, wr.meteringFromReply(reply), err case TransmissionStateFailed, TransmissionStateInvalidReceiver: txHash, err := txHashRetriever.GetFailedTransmissionHash(ctx) if err != nil { if errors.Is(err, ErrUnexpectedSuccessfulTransmission) { - wr.lggr.Errorw("Made a new transmission attempt - unexpected successful transmission while state is failed", "error", err) + wr.emitInvalidTransmissionState(ctx, request, telemetryContext, postInfo, transmissionID, "WriteReport unexpected successful transmission", err.Error()) } else { wr.lggr.Errorw("Made a new transmission attempt - transmission failed, unable to retrieve failed transmission tx hash", "error", err, "localTxHash", submitResp.TxHash) } @@ -204,15 +248,19 @@ func (wr *writeReport) execute( if submitResp.TxHash != "" && submitResp.TxHash != txHash { wr.lggr.Infow("Made a new transmission attempt - transmission failed, but local submit hash differs from canonical failed transmission", "localTxHash", submitResp.TxHash, "txHash", txHash) + monitoring.LogAndEmitSuccess(ctx, "Made a new transmission attempt - transmission failed, but local submit hash differs from canonical failed transmission", + wr.lggr, wr.beholderProcessor, + wr.messageBuilder.BuildWriteReportDuplicateTx(telemetryContext, request, submitResp.TxHash, txHash)) } wr.lggr.Errorw("Made a new transmission attempt - transmission failed", "txHash", txHash, "transmissionState", postInfo.State) - reply, err := wr.buildRevertReplyFromTx(ctx, txHash, postInfo, transmissionID) + reply, err := wr.buildRevertReplyFromTx(ctx, request, telemetryContext, txHash, postInfo, transmissionID) if err != nil { return nil, wr.meteringFromReply(reply), revertReplyBuildError(postInfo, transmissionID, err) } return reply, wr.meteringFromReply(reply), nil default: wr.lggr.Errorw("Invalid transmission state after submit", "state", postInfo.State, "localTxStatus", submitResp.TxStatus) + wr.emitInvalidTransmissionState(ctx, request, telemetryContext, postInfo, transmissionID, "WriteReport invalid transmission state after submit", invalidTransmissionStateError(postInfo.State).Error()) return nil, capabilities.ResponseMetadata{}, invalidTransmissionStateError(postInfo.State) } } @@ -284,6 +332,8 @@ func getTransmissionID(workflowExecutionID string, request *stellarcap.WriteRepo // failure can be observed without spending fees on a duplicate submit. func (wr *writeReport) pollTransmissionInfo( ctx context.Context, + request *stellarcap.WriteReportRequest, + telemetryContext monitoring.TelemetryContext, transmissionID TransmissionID, queuePosition int, ) (lastValidInfo TransmissionInfo, err error) { @@ -298,8 +348,16 @@ func (wr *writeReport) pollTransmissionInfo( attempt := 0 stageTimer := time.NewTimer(delay) + deltaStagePassed := false hadSuccessfulPoll := false - defer stageTimer.Stop() + defer func() { + stageTimer.Stop() + if wr.monitoringEnabled() && !deltaStagePassed && hadSuccessfulPoll { + monitoring.LogAndEmitSuccess(ctx, "Transmission found before delta stage has passed", + wr.lggr, wr.beholderProcessor, + wr.messageBuilder.BuildWriteReportSuccessfulEarlyReturn(telemetryContext)) + } + }() for { if info, infoErr := wr.forwarderClient.GetTransmissionInfo(ctx, transmissionID); infoErr != nil { @@ -312,7 +370,9 @@ func (wr *writeReport) pollTransmissionInfo( return lastValidInfo, nil case TransmissionStateNotAttempted: default: - wr.lggr.Warnw("Unexpected transmission state during polling, continuing", "state", lastValidInfo.State) + wr.emitInvalidTransmissionState(ctx, request, telemetryContext, lastValidInfo, transmissionID, + "Unexpected transmission state; continuing to poll", + invalidTransmissionStateError(lastValidInfo.State).Error()) } } @@ -326,6 +386,7 @@ func (wr *writeReport) pollTransmissionInfo( case <-ctx.Done(): return TransmissionInfo{}, fmt.Errorf("timed out waiting for transmission info") case <-stageTimer.C: + deltaStagePassed = true if lastValidInfo.State == TransmissionStateNotAttempted { if finalInfo, finalErr := wr.forwarderClient.GetTransmissionInfo(ctx, transmissionID); finalErr == nil { hadSuccessfulPoll = true @@ -356,13 +417,25 @@ func invalidTransmissionStateError(state TransmissionState) error { return fmt.Errorf("unexpected transmission state: %d", state) } -func (wr *writeReport) buildSuccessReply(ctx context.Context, txHash string) (*stellarcap.WriteReportReply, error) { - return wr.replyFromTransaction(ctx, txHash, stellarcap.ReceiverContractExecutionStatus_RECEIVER_CONTRACT_EXECUTION_STATUS_SUCCESS, nil) +func (wr *writeReport) buildSuccessReply( + ctx context.Context, + request *stellarcap.WriteReportRequest, + telemetryContext monitoring.TelemetryContext, + txHash string, +) (*stellarcap.WriteReportReply, error) { + return wr.replyFromTransaction(ctx, request, telemetryContext, txHash, stellarcap.ReceiverContractExecutionStatus_RECEIVER_CONTRACT_EXECUTION_STATUS_SUCCESS, nil) } -func (wr *writeReport) buildRevertReplyFromTx(ctx context.Context, txHash string, transmissionInfo TransmissionInfo, transmissionID TransmissionID) (*stellarcap.WriteReportReply, error) { +func (wr *writeReport) buildRevertReplyFromTx( + ctx context.Context, + request *stellarcap.WriteReportRequest, + telemetryContext monitoring.TelemetryContext, + txHash string, + transmissionInfo TransmissionInfo, + transmissionID TransmissionID, +) (*stellarcap.WriteReportReply, error) { errorMessage := revertReason(transmissionInfo, transmissionID) - return wr.replyFromTransaction(ctx, txHash, stellarcap.ReceiverContractExecutionStatus_RECEIVER_CONTRACT_EXECUTION_STATUS_REVERTED, &errorMessage) + return wr.replyFromTransaction(ctx, request, telemetryContext, txHash, stellarcap.ReceiverContractExecutionStatus_RECEIVER_CONTRACT_EXECUTION_STATUS_REVERTED, &errorMessage) } func revertReason(transmissionInfo TransmissionInfo, transmissionID TransmissionID) string { @@ -376,11 +449,22 @@ func revertReplyBuildError(transmissionInfo TransmissionInfo, transmissionID Tra return fmt.Errorf("%s %s: this is the root cause, but an additional error occurred while fetching more info: %w", capcommon.UserError, revertReason(transmissionInfo, transmissionID), err) } -func (wr *writeReport) replyFromTransaction(ctx context.Context, txHash string, receiverStatus stellarcap.ReceiverContractExecutionStatus, errorMessage *string) (*stellarcap.WriteReportReply, error) { +func (wr *writeReport) replyFromTransaction( + ctx context.Context, + request *stellarcap.WriteReportRequest, + telemetryContext monitoring.TelemetryContext, + txHash string, + receiverStatus stellarcap.ReceiverContractExecutionStatus, + errorMessage *string, +) (*stellarcap.WriteReportReply, error) { txResp, err := capcommon.WithQuickRetry(ctx, wr.lggr, func(ctx context.Context) (stellartypes.GetTransactionResponse, error) { return wr.service.GetTransaction(ctx, stellartypes.GetTransactionRequest{TxHash: txHash}) }) if err != nil { + if wr.monitoringEnabled() { + monitoring.LogAndEmitError(ctx, wr.lggr, wr.beholderProcessor, + wr.messageBuilder.BuildWriteReportTxInfoRetrievalError(telemetryContext, request, txHash, err.Error())) + } return nil, fmt.Errorf("failed to get transaction for tx hash %s: %w", txHash, err) } @@ -457,3 +541,52 @@ func populateReplyFromSubmit(reply *stellarcap.WriteReportReply, resp *stellarty reply.BlockTimestamp = resp.BlockTimestamp } } + +func transmissionDebugID(id TransmissionID) string { + return fmt.Sprintf("%s:%s:%s", id.Receiver, id.ReportIDHex(), id.WorkflowExecutionIDHex()) +} + +func submitTxStatusCode(resp *stellartypes.SubmitTransactionResponse) int32 { + if resp == nil { + return -1 + } + switch resp.TxStatus { + case stellartypes.TxSuccess: + return 0 + case stellartypes.TxFailed: + return 1 + case stellartypes.TxFatal: + return 2 + default: + return -1 + } +} + +func (wr *writeReport) monitoringEnabled() bool { + return wr.messageBuilder != nil && wr.beholderProcessor != nil +} + +func (wr *writeReport) emitInvalidTransmissionState( + ctx context.Context, + request *stellarcap.WriteReportRequest, + telemetryContext monitoring.TelemetryContext, + info TransmissionInfo, + transmissionID TransmissionID, + summary, cause string, +) { + if !wr.monitoringEnabled() { + return + } + monitoring.LogAndEmitError(ctx, wr.lggr, wr.beholderProcessor, + wr.messageBuilder.BuildWriteReportInvalidTransmissionState( + telemetryContext, + request, + uint32(info.State), + info.InvalidReceiver, + info.Success, + transmissionDebugID(transmissionID), + info.Transmitter, + summary, + cause, + )) +} diff --git a/chain_capabilities/stellar/actions/write_report_test.go b/chain_capabilities/stellar/actions/write_report_test.go index a1a23c375..8dd2bce5a 100644 --- a/chain_capabilities/stellar/actions/write_report_test.go +++ b/chain_capabilities/stellar/actions/write_report_test.go @@ -12,6 +12,8 @@ import ( "github.com/stellar/go-stellar-sdk/xdr" + "google.golang.org/protobuf/proto" + p2ptypes "github.com/smartcontractkit/libocr/ragep2p/types" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -696,6 +698,7 @@ func TestWriteReport_Submit(t *testing.T) { t.Run("submit superseded by prior success - post-submit succeeds", func(t *testing.T) { t.Parallel() h := newWriteReportHelper(t) + processor := h.withRecordingProcessor() rm, reqMeta, req := newWRReportFixture(t) h.expectSigningAccount(t) @@ -725,11 +728,13 @@ func TestWriteReport_Submit(t *testing.T) { require.NotNil(t, result.Response.TxHash) require.Equal(t, testTxHash, *result.Response.TxHash) require.NotEqual(t, "mytx", *result.Response.TxHash) + requireDuplicateTxTelemetry(t, processor.messages, "mytx", testTxHash) }) t.Run("submit superseded by prior invalid receiver - post-submit returns canonical failed hash", func(t *testing.T) { t.Parallel() h := newWriteReportHelper(t) + processor := h.withRecordingProcessor() rm, reqMeta, req := newWRReportFixture(t) h.expectSigningAccount(t) @@ -755,6 +760,7 @@ func TestWriteReport_Submit(t *testing.T) { require.NotEqual(t, "mytx", *result.Response.TxHash) require.NotNil(t, result.Response.ErrorMessage) require.Contains(t, *result.Response.ErrorMessage, "not a Wasm contract or missing on_report") + requireDuplicateTxTelemetry(t, processor.messages, "mytx", testTxHash) }) } @@ -874,6 +880,7 @@ func newPollTransmissionInfoHarness(t *testing.T, deltaStage time.Duration) ( *writeReport, *mocks.StellarService, TransmissionID, + *stellarcap.WriteReportRequest, ) { t.Helper() lggr := logger.Test(t) @@ -895,7 +902,7 @@ func newPollTransmissionInfoHarness(t *testing.T, deltaStage time.Duration) ( _, reqMeta, req := newWRReportFixture(t) transmissionID, err := getTransmissionID(reqMeta.WorkflowExecutionID, req) require.NoError(t, err) - return wr, mockSvc, transmissionID + return wr, mockSvc, transmissionID, req } func expectTransmissionInfoPoll(mockSvc *mocks.StellarService, xdrResult string, err error) { @@ -936,11 +943,11 @@ func TestPollTransmissionInfo_QueuePositionScenarios(t *testing.T) { for _, queuePosition := range []int{1, 2, 3} { t.Run("queue position "+strconv.Itoa(queuePosition), func(t *testing.T) { t.Parallel() - wr, mockSvc, transmissionID := newPollTransmissionInfoHarness(t, 5*time.Second) + wr, mockSvc, transmissionID, req := newPollTransmissionInfoHarness(t, 5*time.Second) expectTransmissionInfoPoll(mockSvc, tc.xdr(t), nil) start := time.Now() - info, err := wr.pollTransmissionInfo(ctx, transmissionID, queuePosition) + info, err := wr.pollTransmissionInfo(ctx, req, monitoring.TelemetryContext{}, transmissionID, queuePosition) require.NoError(t, err) require.Equal(t, tc.state, info.State) require.Less(t, time.Since(start), 500*time.Millisecond) @@ -953,11 +960,11 @@ func TestPollTransmissionInfo_QueuePositionScenarios(t *testing.T) { t.Run("not attempted waits until delta stage then returns", func(t *testing.T) { t.Parallel() const queuePosition = 2 - wr, mockSvc, transmissionID := newPollTransmissionInfoHarness(t, 150*time.Millisecond) + wr, mockSvc, transmissionID, req := newPollTransmissionInfoHarness(t, 150*time.Millisecond) expectTransmissionInfoPollMaybe(mockSvc, notAttemptedXDR(t), nil) start := time.Now() - info, err := wr.pollTransmissionInfo(ctx, transmissionID, queuePosition) + info, err := wr.pollTransmissionInfo(ctx, req, monitoring.TelemetryContext{}, transmissionID, queuePosition) require.NoError(t, err) require.Equal(t, TransmissionStateNotAttempted, info.State) require.GreaterOrEqual(t, time.Since(start), 100*time.Millisecond) @@ -965,10 +972,10 @@ func TestPollTransmissionInfo_QueuePositionScenarios(t *testing.T) { t.Run("position zero uses quick retry", func(t *testing.T) { t.Parallel() - wr, mockSvc, transmissionID := newPollTransmissionInfoHarness(t, 5*time.Second) + wr, mockSvc, transmissionID, req := newPollTransmissionInfoHarness(t, 5*time.Second) expectTransmissionInfoPoll(mockSvc, succeededXDR(t), nil) - info, err := wr.pollTransmissionInfo(ctx, transmissionID, 0) + info, err := wr.pollTransmissionInfo(ctx, req, monitoring.TelemetryContext{}, transmissionID, 0) require.NoError(t, err) require.Equal(t, TransmissionStateSucceeded, info.State) }) @@ -982,7 +989,7 @@ func TestPollTransmissionInfo_RaceConditions(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) defer cancel() - wr, mockSvc, transmissionID := newPollTransmissionInfoHarness(t, 150*time.Millisecond) + wr, mockSvc, transmissionID, req := newPollTransmissionInfoHarness(t, 150*time.Millisecond) var chainStateUpdated atomic.Bool go func() { time.Sleep(120 * time.Millisecond) @@ -1001,7 +1008,7 @@ func TestPollTransmissionInfo_RaceConditions(t *testing.T) { }). Maybe() - info, err := wr.pollTransmissionInfo(ctx, transmissionID, 1) + info, err := wr.pollTransmissionInfo(ctx, req, monitoring.TelemetryContext{}, transmissionID, 1) require.NoError(t, err) require.True(t, chainStateUpdated.Load()) require.Equal(t, TransmissionStateSucceeded, info.State) @@ -1012,7 +1019,7 @@ func TestPollTransmissionInfo_RaceConditions(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) defer cancel() - wr, mockSvc, transmissionID := newPollTransmissionInfoHarness(t, 50*time.Millisecond) + wr, mockSvc, transmissionID, req := newPollTransmissionInfoHarness(t, 50*time.Millisecond) var rpcCalls atomic.Int64 mockSvc.EXPECT(). SimulateTransaction(mock.Anything, mock.MatchedBy(func(req stellartypes.SimulateTransactionRequest) bool { @@ -1024,7 +1031,7 @@ func TestPollTransmissionInfo_RaceConditions(t *testing.T) { }). Maybe() - _, err := wr.pollTransmissionInfo(ctx, transmissionID, 2) + _, err := wr.pollTransmissionInfo(ctx, req, monitoring.TelemetryContext{}, transmissionID, 2) require.Greater(t, rpcCalls.Load(), int64(0)) require.Error(t, err) require.Contains(t, err.Error(), "all GetTransmissionInfo polls failed during delta stage window") @@ -1033,7 +1040,7 @@ func TestPollTransmissionInfo_RaceConditions(t *testing.T) { t.Run("context cancel returns timeout error", func(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(t.Context()) - wr, mockSvc, transmissionID := newPollTransmissionInfoHarness(t, 5*time.Second) + wr, mockSvc, transmissionID, req := newPollTransmissionInfoHarness(t, 5*time.Second) expectTransmissionInfoPollMaybe(mockSvc, notAttemptedXDR(t), nil) go func() { @@ -1041,7 +1048,7 @@ func TestPollTransmissionInfo_RaceConditions(t *testing.T) { cancel() }() - _, err := wr.pollTransmissionInfo(ctx, transmissionID, 2) + _, err := wr.pollTransmissionInfo(ctx, req, monitoring.TelemetryContext{}, transmissionID, 2) require.Error(t, err) require.Contains(t, err.Error(), "timed out waiting for transmission info") }) @@ -1118,7 +1125,7 @@ func TestReplyBuilders(t *testing.T) { LedgerCloseTime: int64(testBlockTimestamp / 1_000_000), }, nil).Once() - reply, err := wr.buildSuccessReply(t.Context(), testTxHash) + reply, err := wr.buildSuccessReply(t.Context(), req, monitoring.TelemetryContext{}, testTxHash) require.NoError(t, err) require.Equal(t, stellarcap.TxStatus_TX_STATUS_SUCCESS, reply.TxStatus) require.Equal(t, testTxHash, *reply.TxHash) @@ -1131,7 +1138,7 @@ func TestReplyBuilders(t *testing.T) { mockSvc.EXPECT().GetTransaction(mock.Anything, stellartypes.GetTransactionRequest{TxHash: testTxHash}). Return(stellartypes.GetTransactionResponse{FeeStroops: testFee}, nil).Once() - reply, err := wr.buildRevertReplyFromTx(t.Context(), testTxHash, TransmissionInfo{State: TransmissionStateInvalidReceiver}, transmissionID) + reply, err := wr.buildRevertReplyFromTx(t.Context(), req, monitoring.TelemetryContext{}, testTxHash, TransmissionInfo{State: TransmissionStateInvalidReceiver}, transmissionID) require.NoError(t, err) require.Equal(t, stellarcap.TxStatus_TX_STATUS_REVERTED, reply.TxStatus) require.Contains(t, *reply.ErrorMessage, "not a Wasm contract") @@ -1218,9 +1225,366 @@ func TestWriteReport_PostSubmitPollRecoversFromEvents(t *testing.T) { require.Equal(t, testTxHash, *result.Response.TxHash) } +func (h *writeReportHelper) withRecordingProcessor() *recordingWriteReportProcessor { + processor := &recordingWriteReportProcessor{} + h.stellar.beholderProcessor = processor + return processor +} + +func newQueuedWriteReportHelper(t *testing.T) *writeReportHelper { + t.Helper() + lggr := logger.Test(t) + mockSvc := mocks.NewStellarService(t) + scheduler := ts.NewTransmissionScheduler( + p2ptypes.PeerID{2}, + []p2ptypes.PeerID{{1}, {2}, {3}}, + 5*time.Second, + 0, + lggr, + ) + s := &Stellar{ + StellarService: mockSvc, + lggr: logger.Sugared(lggr), + chainSelector: testWRChainSelector, + forwarderClient: newForwarderClient(mockSvc, lggr, testForwarderAddress, 100), + forwarderLookbackLedgers: 100, + transmissionScheduler: scheduler, + messageBuilder: monitoring.NewMessageBuilder(types.ChainInfo{}, capabilities.CapabilityInfo{}, ""), + beholderProcessor: nopBeholderProcessor{}, + handler: testConsensusHandler{handle: runVolatileHashableHandle}, + } + require.NoError(t, s.initLimiters(limits.Factory{Logger: lggr})) + return &writeReportHelper{svc: mockSvc, stellar: s} +} + +func hasTelemetryMessage[T proto.Message](messages []proto.Message) bool { + for _, msg := range messages { + if _, ok := msg.(T); ok { + return true + } + } + return false +} + +func requireTxHashPhaseEvent( + t *testing.T, + messages []proto.Message, + lookupType, phase, result, txHash string, +) *monitoring.WriteReportTxHashRetrievalPhase { + t.Helper() + for _, msg := range messages { + phaseMsg, ok := msg.(*monitoring.WriteReportTxHashRetrievalPhase) + if !ok { + continue + } + if phaseMsg.GetLookupType() != lookupType || phaseMsg.GetResult() != result { + continue + } + if phase != "" && phaseMsg.GetPhase() != phase { + continue + } + if txHash != "" && phaseMsg.GetTxHash() != txHash { + continue + } + return phaseMsg + } + t.Fatalf("missing tx hash phase event lookupType=%s phase=%s result=%s txHash=%s", lookupType, phase, result, txHash) + return nil +} + +func requireInvokeOnReportDuration(t *testing.T, messages []proto.Message, txStatus int32) *monitoring.WriteReportInvokeOnReportDuration { + t.Helper() + for _, msg := range messages { + if duration, ok := msg.(*monitoring.WriteReportInvokeOnReportDuration); ok { + require.EqualValues(t, txStatus, duration.GetTxStatus()) + require.NotNil(t, duration.GetExecutionContext()) + return duration + } + } + t.Fatal("missing WriteReportInvokeOnReportDuration telemetry") + return nil +} + +func requireDuplicateTxTelemetry(t *testing.T, messages []proto.Message, duplicateHash, canonicalHash string) { + t.Helper() + for _, msg := range messages { + dup, ok := msg.(*monitoring.WriteReportDuplicateTx) + if !ok { + continue + } + require.Equal(t, duplicateHash, dup.GetDuplicateTxHash()) + require.Equal(t, canonicalHash, dup.GetCanonicalTxHash()) + return + } + t.Fatalf("missing WriteReportDuplicateTx telemetry duplicate=%s canonical=%s", duplicateHash, canonicalHash) +} + +type recordingWriteReportProcessor struct { + messages []proto.Message +} + +func (r *recordingWriteReportProcessor) Process(_ context.Context, msg proto.Message, _ ...any) error { + r.messages = append(r.messages, msg) + return nil +} + +func TestWriteReport_EmitsTxInfoRetrievalErrorTelemetry(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + processor := h.withRecordingProcessor() + rm, reqMeta, req := newWRReportFixture(t) + + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(failedXDR(t)), nil).Once() + h.svc.EXPECT().GetLatestLedger(mock.Anything). + Return(stellartypes.GetLatestLedgerResponse{Sequence: 200}, nil).Once() + h.svc.EXPECT().GetEvents(mock.Anything, mock.Anything). + Return(reportProcessedEventsForFixture(t, rm, req.ContractId, false), nil).Once() + h.svc.EXPECT().GetTransaction(mock.Anything, stellartypes.GetTransactionRequest{TxHash: testTxHash}). + Return(stellartypes.GetTransactionResponse{}, errors.New("rpc down")).Maybe() + + _, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.NotNil(t, capErr) + require.True(t, hasTelemetryMessage[*monitoring.WriteReportTxInfoRetrievalError](processor.messages)) +} + +func TestWriteReport_EmitsInvalidTransmissionStateOnUnexpectedSuccess(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + processor := h.withRecordingProcessor() + rm, reqMeta, req := newWRReportFixture(t) + + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(failedXDR(t)), nil).Once() + h.svc.EXPECT().GetLatestLedger(mock.Anything). + Return(stellartypes.GetLatestLedgerResponse{Sequence: 200}, nil).Once() + h.svc.EXPECT().GetEvents(mock.Anything, mock.Anything). + Return(reportProcessedEventsForFixture(t, rm, req.ContractId, true), nil).Once() + + _, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.NotNil(t, capErr) + require.True(t, hasTelemetryMessage[*monitoring.WriteReportInvalidTransmissionState](processor.messages)) +} + +func TestWriteReport_EmitsInvokeOnReportDurationTelemetry(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + processor := h.withRecordingProcessor() + rm, reqMeta, req := newWRReportFixture(t) + h.expectSigningAccount(t) + + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(notAttemptedXDR(t)), nil).Once() + h.svc.EXPECT().SubmitTransaction(mock.Anything, mock.Anything). + Return(successSubmitResp(), nil).Once() + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(succeededXDR(t)), nil).Once() + h.expectPostSubmitSuccessTxLookup(t, rm, req.ContractId) + + _, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.Nil(t, capErr) + requireInvokeOnReportDuration(t, processor.messages, 0) +} + +func TestWriteReport_EmitsSuccessfulEarlyReturnTelemetry(t *testing.T) { + t.Parallel() + h := newQueuedWriteReportHelper(t) + processor := h.withRecordingProcessor() + rm, reqMeta, req := newWRReportFixture(t) + + transmissionID, err := getTransmissionID(reqMeta.WorkflowExecutionID, req) + require.NoError(t, err) + scheduleKey, err := transmissionID.ScheduleKey() + require.NoError(t, err) + queuePosition := h.stellar.transmissionScheduler.GetQueuePosition(hex.EncodeToString(scheduleKey[:])) + if queuePosition <= 0 { + t.Skip("fixture resolves to queue position 0 in this 3-node schedule") + } + + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(succeededXDR(t)), nil).Once() + h.expectObservedTxHashLookup(t, rm, req.ContractId, true) + + _, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.Nil(t, capErr) + require.True(t, hasTelemetryMessage[*monitoring.WriteReportSuccessfulEarlyReturn](processor.messages)) +} + +func TestWriteReport_EmitsLifecycleTelemetry(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + processor := &recordingWriteReportProcessor{} + h.stellar.beholderProcessor = processor + + rm, reqMeta, req := newWRReportFixture(t) + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(succeededXDR(t)), nil).Once() + h.expectObservedTxHashLookup(t, rm, req.ContractId, true) + + _, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.Nil(t, capErr) + + var sawInitiated, sawSuccess bool + for _, msg := range processor.messages { + switch msg.(type) { + case *monitoring.WriteReportInitiated: + sawInitiated = true + case *monitoring.WriteReportSuccess: + sawSuccess = true + case *monitoring.WriteReportSuccessfulEarlyReturn: + t.Fatalf("unexpected early return telemetry on direct success path") + } + } + require.True(t, sawInitiated) + require.True(t, sawSuccess) +} + +func TestTxHashRetriever_EmitsRetrievalPhaseTelemetry(t *testing.T) { + t.Parallel() + processor := &recordingWriteReportProcessor{} + lggr := logger.Test(t) + mockSvc := mocks.NewStellarService(t) + rm, reqMeta, req := newWRReportFixture(t) + transmissionID, err := getTransmissionID(reqMeta.WorkflowExecutionID, req) + require.NoError(t, err) + + mockSvc.EXPECT().GetLatestLedger(mock.Anything). + Return(stellartypes.GetLatestLedgerResponse{Sequence: 200}, nil).Once() + mockSvc.EXPECT().GetEvents(mock.Anything, mock.Anything). + Return(reportProcessedEventsForFixture(t, rm, req.ContractId, true), nil).Once() + + retriever := NewTxHashRetriever( + newForwarderClient(mockSvc, lggr, testForwarderAddress, 100), + logger.Sugared(lggr), + transmissionID, + WithTxHashRetrieverMonitoring( + processor, + monitoring.NewMessageBuilder(types.ChainInfo{}, capabilities.CapabilityInfo{}, ""), + monitoring.TelemetryContext{}, + ), + ) + + hash, err := retriever.GetSuccessfulTransmissionHash(t.Context()) + require.NoError(t, err) + require.Equal(t, testTxHash, hash) + requireTxHashPhaseEvent(t, processor.messages, "SuccessfulTransmission", txHashRetrievalPhase, "Found", testTxHash) +} + func TestIsUserErrorWriteReport(t *testing.T) { t.Parallel() h := newWriteReportHelper(t) require.True(t, h.stellar.isUserErrorWriteReport(errors.New(capcommon.UserError+" invalid receiver"))) require.False(t, h.stellar.isUserErrorWriteReport(errors.New("system failure"))) } + +func TestSubmitTxStatusCode(t *testing.T) { + t.Parallel() + require.Equal(t, int32(-1), submitTxStatusCode(nil)) + require.Equal(t, int32(0), submitTxStatusCode(successSubmitResp())) + require.Equal(t, int32(1), submitTxStatusCode(&stellartypes.SubmitTransactionResponse{TxStatus: stellartypes.TxFailed})) + require.Equal(t, int32(2), submitTxStatusCode(&stellartypes.SubmitTransactionResponse{TxStatus: stellartypes.TxFatal})) + require.Equal(t, int32(-1), submitTxStatusCode(&stellartypes.SubmitTransactionResponse{TxStatus: stellartypes.TransactionStatus(99)})) +} + +func TestWriteReport_EmitsInvalidTransmissionStatePreSubmitWithStubForwarder(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + processor := h.withRecordingProcessor() + _, reqMeta, req := newWRReportFixture(t) + + h.stellar.forwarderClient = &stubForwarderClient{ + transmissionInfoFn: func(int) (TransmissionInfo, error) { + return TransmissionInfo{State: TransmissionState(99)}, nil + }, + } + + _, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.NotNil(t, capErr) + require.True(t, hasTelemetryMessage[*monitoring.WriteReportInvalidTransmissionState](processor.messages)) +} + +func TestWriteReport_EmitsInvalidTransmissionStateAfterSubmitWithStubForwarder(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + processor := h.withRecordingProcessor() + _, reqMeta, req := newWRReportFixture(t) + + call := 0 + h.stellar.forwarderClient = &stubForwarderClient{ + transmissionInfoFn: func(int) (TransmissionInfo, error) { + call++ + if call == 1 { + return TransmissionInfo{State: TransmissionStateNotAttempted}, nil + } + return TransmissionInfo{State: TransmissionState(99)}, nil + }, + invokeOnReportResp: successSubmitResp(), + } + + _, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.NotNil(t, capErr) + require.True(t, hasTelemetryMessage[*monitoring.WriteReportInvalidTransmissionState](processor.messages)) +} + +func TestPollTransmissionInfo_EmitsInvalidTransmissionStateWithStubForwarder(t *testing.T) { + t.Parallel() + lggr := logger.Test(t) + processor := &recordingWriteReportProcessor{} + scheduler := ts.NewTransmissionScheduler( + p2ptypes.PeerID{2}, + []p2ptypes.PeerID{{1}, {2}, {3}}, + 150*time.Millisecond, + 0, + lggr, + ) + call := 0 + stub := &stubForwarderClient{ + transmissionInfoFn: func(int) (TransmissionInfo, error) { + call++ + if call == 1 { + return TransmissionInfo{State: TransmissionState(99)}, nil + } + return TransmissionInfo{State: TransmissionStateNotAttempted}, nil + }, + } + wr := &writeReport{ + forwarderClient: stub, + lggr: logger.Sugared(lggr), + transmissionScheduler: scheduler, + messageBuilder: monitoring.NewMessageBuilder(types.ChainInfo{}, capabilities.CapabilityInfo{}, ""), + beholderProcessor: processor, + } + _, reqMeta, req := newWRReportFixture(t) + transmissionID, err := getTransmissionID(reqMeta.WorkflowExecutionID, req) + require.NoError(t, err) + + ctx, cancel := context.WithDeadline(t.Context(), time.Now().Add(500*time.Millisecond)) + defer cancel() + + _, err = wr.pollTransmissionInfo(ctx, req, monitoring.TelemetryContext{}, transmissionID, 2) + require.NoError(t, err) + require.True(t, hasTelemetryMessage[*monitoring.WriteReportInvalidTransmissionState](processor.messages)) +} + +func TestWriteReport_EmitsInvalidTransmissionStateOnPostSubmitUnexpectedSuccess(t *testing.T) { + t.Parallel() + h := newWriteReportHelper(t) + processor := h.withRecordingProcessor() + rm, reqMeta, req := newWRReportFixture(t) + h.expectSigningAccount(t) + + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(notAttemptedXDR(t)), nil).Once() + h.svc.EXPECT().SubmitTransaction(mock.Anything, mock.Anything). + Return(successSubmitResp(), nil).Once() + h.svc.EXPECT().SimulateTransaction(mock.Anything, mock.Anything). + Return(transmissionResp(failedXDR(t)), nil).Once() + h.svc.EXPECT().GetLatestLedger(mock.Anything). + Return(stellartypes.GetLatestLedgerResponse{Sequence: 200}, nil).Once() + h.svc.EXPECT().GetEvents(mock.Anything, mock.Anything). + Return(reportProcessedEventsForFixture(t, rm, req.ContractId, true), nil).Once() + + _, capErr := h.stellar.WriteReport(t.Context(), reqMeta, req) + require.NotNil(t, capErr) + require.True(t, hasTelemetryMessage[*monitoring.WriteReportInvalidTransmissionState](processor.messages)) +} diff --git a/chain_capabilities/stellar/go.mod b/chain_capabilities/stellar/go.mod index b56025c35..7b9ccf5ba 100644 --- a/chain_capabilities/stellar/go.mod +++ b/chain_capabilities/stellar/go.mod @@ -106,7 +106,7 @@ require ( go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.36.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.36.0 // indirect go.opentelemetry.io/otel/log v0.19.0 // indirect - go.opentelemetry.io/otel/metric v1.43.0 // indirect + go.opentelemetry.io/otel/metric v1.43.0 go.opentelemetry.io/otel/sdk v1.43.0 // indirect go.opentelemetry.io/otel/sdk/log v0.19.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.43.0 // indirect diff --git a/chain_capabilities/stellar/monitoring/generate.go b/chain_capabilities/stellar/monitoring/generate.go index ea92d4189..4e1d27579 100644 --- a/chain_capabilities/stellar/monitoring/generate.go +++ b/chain_capabilities/stellar/monitoring/generate.go @@ -1,2 +1,3 @@ //go:generate protoc --proto_path=../../../ --go_out=paths=source_relative:../../../ --go-grpc_out=paths=source_relative:../../../ chain_capabilities/stellar/monitoring/read_contract.proto +//go:generate protoc --proto_path=../../../ --go_out=paths=source_relative:../../../ --go-grpc_out=paths=source_relative:../../../ chain_capabilities/stellar/monitoring/write_report.proto package monitoring diff --git a/chain_capabilities/stellar/monitoring/messages.go b/chain_capabilities/stellar/monitoring/messages.go index 64cba7f1d..a7b1682c6 100644 --- a/chain_capabilities/stellar/monitoring/messages.go +++ b/chain_capabilities/stellar/monitoring/messages.go @@ -1,18 +1,25 @@ package monitoring import ( + "fmt" "strconv" "go.opentelemetry.io/otel/attribute" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" caperrors "github.com/smartcontractkit/chainlink-common/pkg/capabilities/errors" + stellarcap "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/stellar" "github.com/smartcontractkit/chainlink-common/pkg/types" stellartypes "github.com/smartcontractkit/chainlink-common/pkg/types/chains/stellar" commonmon "github.com/smartcontractkit/capabilities/chain_capabilities/common/monitoring" + capmonitoring "github.com/smartcontractkit/capabilities/libs/monitoring" ) +type TelemetryContext = commonmon.TelemetryContext +type Message = commonmon.Message +type ErrorMessage = commonmon.ErrorMessage + // MessageBuilder constructs telemetry messages for Stellar calls. // Embeds the common MessageBuilder for shared BuildExecutionContext and RequestLggr. type MessageBuilder struct { @@ -52,6 +59,117 @@ func (m *MessageBuilder) BuildReadContractError(tc commonmon.TelemetryContext, r } } +func (m *MessageBuilder) BuildWriteReportInitiated(tc TelemetryContext, req *stellarcap.WriteReportRequest) *WriteReportInitiated { + return &WriteReportInitiated{ + Req: convertWriteReportRequest(req), + ExecutionContext: m.BuildExecutionContext(tc), + } +} + +func (m *MessageBuilder) BuildWriteReportSuccess(tc TelemetryContext, req *stellarcap.WriteReportRequest) *WriteReportSuccess { + return &WriteReportSuccess{ + Req: convertWriteReportRequest(req), + ExecutionContext: m.BuildExecutionContext(tc), + } +} + +func (m *MessageBuilder) BuildWriteReportError(tc TelemetryContext, req *stellarcap.WriteReportRequest, summary string, err caperrors.Error) ErrorMessage { + return &WriteReportError{ + Req: convertWriteReportRequest(req), + Summary: summary, + Cause: err.Error(), + IsUserError: err.Origin() == caperrors.OriginUser, + ExecutionContext: m.BuildExecutionContext(tc), + } +} + +func (m *MessageBuilder) BuildWriteReportTxInfoRetrievalError(tc TelemetryContext, req *stellarcap.WriteReportRequest, txHash, cause string) ErrorMessage { + summary := "Failed to retrieve transaction info" + if txHash != "" { + summary = fmt.Sprintf("Failed to retrieve transaction info for tx: %s", txHash) + } + return &WriteReportTxInfoRetrievalError{ + Req: convertWriteReportRequest(req), + Summary: summary, + Cause: cause, + TxHash: txHash, + ExecutionContext: m.BuildExecutionContext(tc), + } +} + +func (m *MessageBuilder) BuildWriteReportInvalidTransmissionState( + tc TelemetryContext, + req *stellarcap.WriteReportRequest, + transmissionState uint32, + invalidReceiver, success bool, + transmissionID, transmitter, summary, cause string, +) ErrorMessage { + return &WriteReportInvalidTransmissionState{ + Req: convertWriteReportRequest(req), + Summary: summary, + Cause: cause, + TransmissionState: transmissionState, + InvalidReceiver: invalidReceiver, + Success: success, + TransmissionId: transmissionID, + Transmitter: transmitter, + ExecutionContext: m.BuildExecutionContext(tc), + } +} + +func (m *MessageBuilder) BuildWriteReportDuplicateTx(tc TelemetryContext, req *stellarcap.WriteReportRequest, duplicateTxHash, canonicalTxHash string) *WriteReportDuplicateTx { + return &WriteReportDuplicateTx{ + Req: convertWriteReportRequest(req), + DuplicateTxHash: duplicateTxHash, + CanonicalTxHash: canonicalTxHash, + ExecutionContext: m.BuildExecutionContext(tc), + } +} + +func (m *MessageBuilder) BuildWriteReportSuccessfulEarlyReturn(tc TelemetryContext) *WriteReportSuccessfulEarlyReturn { + return &WriteReportSuccessfulEarlyReturn{ + ExecutionContext: m.BuildExecutionContext(tc), + } +} + +func (m *MessageBuilder) BuildWriteReportTxHashRetrievalPhase( + tc TelemetryContext, + phase, result string, + phaseDurationMs int64, + txHash, lookupType string, +) *WriteReportTxHashRetrievalPhase { + return &WriteReportTxHashRetrievalPhase{ + Phase: phase, + Result: result, + PhaseDurationMs: phaseDurationMs, + TxHash: txHash, + LookupType: lookupType, + ExecutionContext: m.BuildExecutionContext(tc), + } +} + +func (m *MessageBuilder) BuildWriteReportInvokeOnReportDuration(tc TelemetryContext, durationMs int64, txStatus int32) *WriteReportInvokeOnReportDuration { + return &WriteReportInvokeOnReportDuration{ + DurationMs: durationMs, + TxStatus: txStatus, + ExecutionContext: m.BuildExecutionContext(tc), + } +} + +func convertWriteReportRequest(req *stellarcap.WriteReportRequest) *WriteReportRequest { + if req == nil { + return nil + } + msg := &WriteReportRequest{ + ContractId: req.GetContractId(), + } + if req.Report != nil { + msg.ReportSize = uint64(len(req.Report.GetRawReport())) + msg.SigsCount = uint32(len(req.Report.GetSigs())) //nolint:gosec // sig count is bounded by DON size + } + return msg +} + // convertReadContractRequest extracts the non-sensitive subset of the request for telemetry // (raw argument values are intentionally omitted; only the count is recorded). func convertReadContractRequest(req stellartypes.SimulateTransactionRequest) *ReadContractRequest { @@ -63,12 +181,45 @@ func convertReadContractRequest(req stellartypes.SimulateTransactionRequest) *Re } } +func executionMetricAttributes(ec *capmonitoring.ExecutionContext) []attribute.KeyValue { + return ec.MetricsAttributes() +} + +func appendExecutionLogAttributes(reqAttrs []attribute.KeyValue, ec *capmonitoring.ExecutionContext) []attribute.KeyValue { + return append(reqAttrs, ec.LogAttributes()...) +} + +func appendUserErrorLogAttributes(reqAttrs []attribute.KeyValue, summary string, isUserError bool, ec *capmonitoring.ExecutionContext) []attribute.KeyValue { + return append(append(reqAttrs, + attribute.String("summary", summary), + attribute.Bool("isUserError", isUserError), + ), ec.LogAttributes()...) +} + +func readContractRequestLogAttributes(req *ReadContractRequest) []attribute.KeyValue { + if req == nil { + return []attribute.KeyValue{ + attribute.String("contract_id", "nil request"), + attribute.String("function", ""), + } + } + attrs := []attribute.KeyValue{ + attribute.String("contract_id", req.GetContractId()), + attribute.String("function", req.GetFunction()), + attribute.String("args_count", strconv.FormatUint(req.GetArgsCount(), 10)), + } + if req.GetSourceAccount() != "" { + attrs = append(attrs, attribute.String("source_account", req.GetSourceAccount())) + } + return attrs +} + func (r *ReadContractInitiated) LogAttributes() []attribute.KeyValue { - return append(readContractRequestLogAttributes(r.Req), r.ExecutionContext.LogAttributes()...) + return appendExecutionLogAttributes(readContractRequestLogAttributes(r.Req), r.ExecutionContext) } func (r *ReadContractInitiated) MetricAttributes() []attribute.KeyValue { - return r.ExecutionContext.MetricsAttributes() + return executionMetricAttributes(r.ExecutionContext) } func (r *ReadContractSuccess) LogAttributes() []attribute.KeyValue { @@ -79,34 +230,114 @@ func (r *ReadContractSuccess) LogAttributes() []attribute.KeyValue { } func (r *ReadContractSuccess) MetricAttributes() []attribute.KeyValue { - return r.ExecutionContext.MetricsAttributes() + return executionMetricAttributes(r.ExecutionContext) } func (r *ReadContractError) LogAttributes() []attribute.KeyValue { - return append(append(readContractRequestLogAttributes(r.Req), - attribute.String("summary", r.GetSummary()), - attribute.Bool("isUserError", r.GetIsUserError()), - ), r.ExecutionContext.LogAttributes()...) + return appendUserErrorLogAttributes(readContractRequestLogAttributes(r.Req), r.GetSummary(), r.GetIsUserError(), r.ExecutionContext) } func (r *ReadContractError) MetricAttributes() []attribute.KeyValue { - return r.ExecutionContext.MetricsAttributes() + return executionMetricAttributes(r.ExecutionContext) } -func readContractRequestLogAttributes(req *ReadContractRequest) []attribute.KeyValue { +func writeReportRequestLogAttributes(req *WriteReportRequest) []attribute.KeyValue { if req == nil { - return []attribute.KeyValue{ - attribute.String("contract_id", "nil request"), - attribute.String("function", ""), - } + return []attribute.KeyValue{attribute.String("contract_id", "nil request")} } - attrs := []attribute.KeyValue{ + return []attribute.KeyValue{ attribute.String("contract_id", req.GetContractId()), - attribute.String("function", req.GetFunction()), - attribute.String("args_count", strconv.FormatUint(req.GetArgsCount(), 10)), + attribute.String("report_size", strconv.FormatUint(req.GetReportSize(), 10)), + attribute.String("sigs_count", strconv.FormatUint(uint64(req.GetSigsCount()), 10)), } - if req.GetSourceAccount() != "" { - attrs = append(attrs, attribute.String("source_account", req.GetSourceAccount())) +} + +func (r *WriteReportInitiated) LogAttributes() []attribute.KeyValue { + return appendExecutionLogAttributes(writeReportRequestLogAttributes(r.Req), r.ExecutionContext) +} + +func (r *WriteReportInitiated) MetricAttributes() []attribute.KeyValue { + return executionMetricAttributes(r.ExecutionContext) +} + +func (r *WriteReportSuccess) LogAttributes() []attribute.KeyValue { + return appendExecutionLogAttributes(writeReportRequestLogAttributes(r.Req), r.ExecutionContext) +} + +func (r *WriteReportSuccess) MetricAttributes() []attribute.KeyValue { + return executionMetricAttributes(r.ExecutionContext) +} + +func (r *WriteReportError) LogAttributes() []attribute.KeyValue { + return appendUserErrorLogAttributes(writeReportRequestLogAttributes(r.Req), r.GetSummary(), r.GetIsUserError(), r.ExecutionContext) +} + +func (r *WriteReportError) MetricAttributes() []attribute.KeyValue { + return executionMetricAttributes(r.ExecutionContext) +} + +func (r *WriteReportTxInfoRetrievalError) LogAttributes() []attribute.KeyValue { + attrs := append(writeReportRequestLogAttributes(r.Req), + attribute.String("summary", r.GetSummary()), + ) + if r.GetTxHash() != "" { + attrs = append(attrs, attribute.String("tx_hash", r.GetTxHash())) } - return attrs + return append(attrs, r.ExecutionContext.LogAttributes()...) +} + +func (r *WriteReportTxInfoRetrievalError) MetricAttributes() []attribute.KeyValue { + return executionMetricAttributes(r.ExecutionContext) +} + +func (r *WriteReportInvalidTransmissionState) LogAttributes() []attribute.KeyValue { + return append(append(writeReportRequestLogAttributes(r.Req), + attribute.String("summary", r.GetSummary()), + attribute.String("transmission_state", strconv.FormatUint(uint64(r.GetTransmissionState()), 10)), + attribute.Bool("invalid_receiver", r.GetInvalidReceiver()), + attribute.Bool("success", r.GetSuccess()), + attribute.String("transmission_id", r.GetTransmissionId()), + attribute.String("transmitter", r.GetTransmitter()), + ), r.ExecutionContext.LogAttributes()...) +} + +func (r *WriteReportInvalidTransmissionState) MetricAttributes() []attribute.KeyValue { + return executionMetricAttributes(r.ExecutionContext) +} + +func (r *WriteReportSuccessfulEarlyReturn) LogAttributes() []attribute.KeyValue { + return r.ExecutionContext.LogAttributes() +} + +func (r *WriteReportSuccessfulEarlyReturn) MetricAttributes() []attribute.KeyValue { + return executionMetricAttributes(r.ExecutionContext) +} + +func (r *WriteReportDuplicateTx) LogAttributes() []attribute.KeyValue { + attrs := writeReportRequestLogAttributes(r.Req) + if r.GetDuplicateTxHash() != "" { + attrs = append(attrs, attribute.String("duplicate_tx_hash", r.GetDuplicateTxHash())) + } + if r.GetCanonicalTxHash() != "" { + attrs = append(attrs, attribute.String("canonical_tx_hash", r.GetCanonicalTxHash())) + } + return append(attrs, r.ExecutionContext.LogAttributes()...) +} + +func (r *WriteReportDuplicateTx) MetricAttributes() []attribute.KeyValue { + return executionMetricAttributes(r.ExecutionContext) +} + +func (r *WriteReportTxHashRetrievalPhase) MetricAttributes() []attribute.KeyValue { + return append([]attribute.KeyValue{ + attribute.String("phase", r.GetPhase()), + attribute.String("result", r.GetResult()), + attribute.String("lookup_type", r.GetLookupType()), + }, r.ExecutionContext.MetricsAttributes()...) +} + +func (r *WriteReportInvokeOnReportDuration) MetricAttributes() []attribute.KeyValue { + return append([]attribute.KeyValue{ + attribute.String("tx_status", strconv.FormatInt(int64(r.GetTxStatus()), 10)), + }, r.ExecutionContext.MetricsAttributes()...) } diff --git a/chain_capabilities/stellar/monitoring/messages_test.go b/chain_capabilities/stellar/monitoring/messages_test.go new file mode 100644 index 000000000..044e6483e --- /dev/null +++ b/chain_capabilities/stellar/monitoring/messages_test.go @@ -0,0 +1,188 @@ +package monitoring_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + caperrors "github.com/smartcontractkit/chainlink-common/pkg/capabilities/errors" + stellarcap "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/stellar" + "github.com/smartcontractkit/chainlink-common/pkg/types" + stellartypes "github.com/smartcontractkit/chainlink-common/pkg/types/chains/stellar" + workflowpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" + + "github.com/smartcontractkit/capabilities/chain_capabilities/stellar/monitoring" +) + +func newMessageBuilder() *monitoring.MessageBuilder { + return monitoring.NewMessageBuilder( + types.ChainInfo{ + FamilyName: "stellar", + ChainID: "testnet", + NetworkName: "testnet", + NetworkNameFull: "Stellar Testnet", + }, + capabilities.CapabilityInfo{ + ID: "stellar:write-report@1.0.0", + CapabilityType: capabilities.CapabilityTypeAction, + }, + "node-1", + ) +} + +func newTelemetryContext() monitoring.TelemetryContext { + return monitoring.TelemetryContext{ + TsStart: 1234, + RequestMetadata: capabilities.RequestMetadata{ + WorkflowID: "workflow-id", + WorkflowOwner: "workflow-owner", + WorkflowExecutionID: "workflow-execution-id", + WorkflowName: "stellar workflow", + WorkflowDonID: 7, + ReferenceID: "step-1", + }, + } +} + +func attrsToMap(attrs []attribute.KeyValue) map[string]any { + out := make(map[string]any, len(attrs)) + for _, attr := range attrs { + if attr.Valid() { + out[string(attr.Key)] = attr.Value.AsInterface() + } + } + return out +} + +func TestMessageBuilder_ReadContractMessages(t *testing.T) { + builder := newMessageBuilder() + tc := newTelemetryContext() + req := stellartypes.SimulateTransactionRequest{ + ContractID: "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC", + Function: "get", + SourceAccount: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", + } + + initiated := builder.BuildReadContractInitiated(tc, req) + require.NotNil(t, initiated.Req) + assert.Equal(t, req.ContractID, initiated.Req.GetContractId()) + assert.EqualValues(t, 7, attrsToMap(initiated.MetricAttributes())["workflow_don_id"]) + + success := builder.BuildReadContractSuccess(tc, req, 12, 100) + successAttrs := attrsToMap(success.LogAttributes()) + assert.Equal(t, "12", successAttrs["result_len"]) + assert.Equal(t, "100", successAttrs["ledger_sequence"]) + + readErr := builder.BuildReadContractError(tc, req, "summary", caperrors.NewPublicUserError(assert.AnError, caperrors.InvalidArgument)) + errAttrs := attrsToMap(readErr.LogAttributes()) + assert.Equal(t, "summary", errAttrs["summary"]) + assert.Equal(t, true, errAttrs["isUserError"]) + assert.Equal(t, req.ContractID, errAttrs["contract_id"]) + assert.EqualValues(t, 7, attrsToMap(readErr.MetricAttributes())["workflow_don_id"]) + + readSystemErr := builder.BuildReadContractError(tc, req, "system failure", caperrors.NewPublicSystemError(assert.AnError, caperrors.Unknown)) + systemErrAttrs := attrsToMap(readSystemErr.LogAttributes()) + assert.Equal(t, false, systemErrAttrs["isUserError"]) + + nilRead := &monitoring.ReadContractInitiated{ + ExecutionContext: newMessageBuilder().BuildExecutionContext(newTelemetryContext()), + } + assert.Equal(t, "nil request", attrsToMap(nilRead.LogAttributes())["contract_id"]) + + reqNoSource := stellartypes.SimulateTransactionRequest{ + ContractID: req.ContractID, + Function: req.Function, + } + noSourceAttrs := attrsToMap(builder.BuildReadContractInitiated(tc, reqNoSource).LogAttributes()) + _, hasSource := noSourceAttrs["source_account"] + assert.False(t, hasSource) +} + +func TestMessageBuilder_WriteReportMessages(t *testing.T) { + builder := newMessageBuilder() + tc := newTelemetryContext() + req := &stellarcap.WriteReportRequest{ + ContractId: "CA7QYNF7SOWQ3GLR2BGMZEHXAVIRZA4KVWLTJJFC7MGXUA74P7UJUWDA", + Report: &workflowpb.ReportResponse{ + RawReport: []byte{1, 2, 3, 4}, + Sigs: []*workflowpb.AttributedSignature{{}, {}}, + }, + } + + initiated := builder.BuildWriteReportInitiated(tc, req) + require.NotNil(t, initiated.Req) + assert.Equal(t, req.ContractId, initiated.Req.GetContractId()) + assert.Equal(t, uint64(4), initiated.Req.GetReportSize()) + assert.Equal(t, uint32(2), initiated.Req.GetSigsCount()) + + initiatedAttrs := attrsToMap(initiated.LogAttributes()) + assert.Equal(t, req.ContractId, initiatedAttrs["contract_id"]) + assert.Equal(t, "4", initiatedAttrs["report_size"]) + assert.Equal(t, "2", initiatedAttrs["sigs_count"]) + assert.EqualValues(t, 7, attrsToMap(initiated.MetricAttributes())["workflow_don_id"]) + + success := builder.BuildWriteReportSuccess(tc, req) + assert.Equal(t, req.ContractId, attrsToMap(success.LogAttributes())["contract_id"]) + + writeErr := builder.BuildWriteReportError(tc, req, "summary", caperrors.NewPublicUserError(assert.AnError, caperrors.InvalidArgument)) + errAttrs := attrsToMap(writeErr.LogAttributes()) + assert.Equal(t, "summary", errAttrs["summary"]) + assert.Equal(t, true, errAttrs["isUserError"]) + + writeSystemErr := builder.BuildWriteReportError(tc, req, "execute failed", caperrors.NewPublicSystemError(assert.AnError, caperrors.Unknown)) + assert.Equal(t, false, attrsToMap(writeSystemErr.LogAttributes())["isUserError"]) + assert.EqualValues(t, 7, attrsToMap(writeSystemErr.MetricAttributes())["workflow_don_id"]) + + txInfoErr := builder.BuildWriteReportTxInfoRetrievalError(tc, req, "abc123", "rpc down") + txInfoAttrs := attrsToMap(txInfoErr.LogAttributes()) + assert.Equal(t, "abc123", txInfoAttrs["tx_hash"]) + assert.Equal(t, "Failed to retrieve transaction info for tx: abc123", txInfoAttrs["summary"]) + + txInfoNoHash := builder.BuildWriteReportTxInfoRetrievalError(tc, req, "", "rpc down") + assert.Equal(t, "Failed to retrieve transaction info", attrsToMap(txInfoNoHash.LogAttributes())["summary"]) + + duplicateTx := builder.BuildWriteReportDuplicateTx(tc, req, "local-hash", "canonical-hash") + dupAttrs := attrsToMap(duplicateTx.LogAttributes()) + assert.Equal(t, "local-hash", dupAttrs["duplicate_tx_hash"]) + assert.Equal(t, "canonical-hash", dupAttrs["canonical_tx_hash"]) + assert.Equal(t, req.ContractId, dupAttrs["contract_id"]) + + dupOnlyLocal := builder.BuildWriteReportDuplicateTx(tc, req, "local-hash", "") + dupOnlyLocalAttrs := attrsToMap(dupOnlyLocal.LogAttributes()) + assert.Equal(t, "local-hash", dupOnlyLocalAttrs["duplicate_tx_hash"]) + _, hasCanonical := dupOnlyLocalAttrs["canonical_tx_hash"] + assert.False(t, hasCanonical) + + earlyReturn := builder.BuildWriteReportSuccessfulEarlyReturn(tc) + earlyReturnAttrs := attrsToMap(earlyReturn.LogAttributes()) + assert.Equal(t, "node-1", earlyReturnAttrs["source_id"]) + assert.NotNil(t, earlyReturn.ExecutionContext) + + invalidState := builder.BuildWriteReportInvalidTransmissionState( + tc, req, 3, true, false, "receiver:report:exec", "transmitter", "summary", "cause", + ) + invalidAttrs := attrsToMap(invalidState.LogAttributes()) + assert.Equal(t, "3", invalidAttrs["transmission_state"]) + assert.Equal(t, true, invalidAttrs["invalid_receiver"]) + assert.Equal(t, false, invalidAttrs["success"]) + assert.Equal(t, "transmitter", invalidAttrs["transmitter"]) + assert.EqualValues(t, 7, attrsToMap(invalidState.MetricAttributes())["workflow_don_id"]) + + txHashPhase := builder.BuildWriteReportTxHashRetrievalPhase(tc, "EventPoll", "Found", 123, "hash", "SuccessfulTransmission") + phaseAttrs := attrsToMap(txHashPhase.MetricAttributes()) + assert.Equal(t, "EventPoll", phaseAttrs["phase"]) + assert.Equal(t, "Found", phaseAttrs["result"]) + assert.Equal(t, "SuccessfulTransmission", phaseAttrs["lookup_type"]) + + invokeDuration := builder.BuildWriteReportInvokeOnReportDuration(tc, 456, 1) + invokeAttrs := attrsToMap(invokeDuration.MetricAttributes()) + assert.Equal(t, "1", invokeAttrs["tx_status"]) + assert.EqualValues(t, 7, invokeAttrs["workflow_don_id"]) + + nilInitiated := builder.BuildWriteReportInitiated(tc, nil) + nilAttrs := attrsToMap(nilInitiated.LogAttributes()) + assert.Equal(t, "nil request", nilAttrs["contract_id"]) +} diff --git a/chain_capabilities/stellar/monitoring/metrics.go b/chain_capabilities/stellar/monitoring/metrics.go index f331db657..69a10f0ac 100644 --- a/chain_capabilities/stellar/monitoring/metrics.go +++ b/chain_capabilities/stellar/monitoring/metrics.go @@ -4,21 +4,68 @@ import ( "context" "fmt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "google.golang.org/protobuf/proto" + commonbeholder "github.com/smartcontractkit/chainlink-common/pkg/beholder" - commoncapbeholder "github.com/smartcontractkit/capabilities/libs/monitoring" + capmonitoring "github.com/smartcontractkit/capabilities/libs/monitoring" ) func ns(name string) string { return fmt.Sprintf("stellar_capability_%s", name) } -// Metrics holds the per-method instruments for Stellar consensus reads. Each MetricsCapBasic -// records both a count and the request latency (emit - start) as a histogram. +type basicCapEmitMessage interface { + GetExecutionContext() *capmonitoring.ExecutionContext + MetricAttributes() []attribute.KeyValue +} + +func newBasicCapMetric(metricName string, msg proto.Message) (capmonitoring.MetricsCapBasic, error) { + info := capmonitoring.NewMetricsInfoCapBasic(ns(metricName), commonbeholder.ToSchemaFullName(msg)) + basic, err := capmonitoring.NewMetricsCapBasic(info) + if err != nil { + return capmonitoring.MetricsCapBasic{}, fmt.Errorf("failed to create %s metric: %w", metricName, err) + } + return basic, nil +} + +func recordBasicCapEmit(ctx context.Context, basic capmonitoring.MetricsCapBasic, msg basicCapEmitMessage) { + ec := msg.GetExecutionContext() + basic.RecordEmit(ctx, ec.GetMetaCapabilityTimestampStart(), ec.GetMetaCapabilityTimestampEmit(), msg.MetricAttributes()...) +} + +// Metrics holds the per-method instruments for Stellar capability operations. type Metrics struct { ReadContractSuccess struct { - basic commoncapbeholder.MetricsCapBasic + basic capmonitoring.MetricsCapBasic } ReadContractError struct { - basic commoncapbeholder.MetricsCapBasic + basic capmonitoring.MetricsCapBasic + } + WriteReportSuccess struct { + basic capmonitoring.MetricsCapBasic + } + WriteReportError struct { + basic capmonitoring.MetricsCapBasic + } + WriteReportDuplicateTx struct { + basic capmonitoring.MetricsCapBasic + } + WriteReportTxInfoRetrievalError struct { + basic capmonitoring.MetricsCapBasic + } + WriteReportSuccessfulEarlyReturn struct { + basic capmonitoring.MetricsCapBasic + } + WriteReportInvalidTransmissionState struct { + basic capmonitoring.MetricsCapBasic + } + WriteReportTxHashRetrievalPhase struct { + count metric.Int64Counter + phaseDuration metric.Int64Histogram + } + WriteReportInvokeOnReportDuration struct { + duration metric.Int64Histogram } } @@ -27,29 +74,112 @@ func NewMetrics() (Metrics, error) { m := Metrics{} var err error - readSuccess := commoncapbeholder.NewMetricsInfoCapBasic(ns("read_contract_success"), commonbeholder.ToSchemaFullName(&ReadContractSuccess{})) - m.ReadContractSuccess.basic, err = commoncapbeholder.NewMetricsCapBasic(readSuccess) - if err != nil { - return Metrics{}, fmt.Errorf("failed to create read contract success metric: %w", err) + if m.ReadContractSuccess.basic, err = newBasicCapMetric("read_contract_success", &ReadContractSuccess{}); err != nil { + return Metrics{}, err + } + if m.ReadContractError.basic, err = newBasicCapMetric("read_contract_error", &ReadContractError{}); err != nil { + return Metrics{}, err + } + if m.WriteReportSuccess.basic, err = newBasicCapMetric("write_report_success", &WriteReportSuccess{}); err != nil { + return Metrics{}, err + } + if m.WriteReportError.basic, err = newBasicCapMetric("write_report_error", &WriteReportError{}); err != nil { + return Metrics{}, err + } + if m.WriteReportDuplicateTx.basic, err = newBasicCapMetric("write_report_duplicate_tx", &WriteReportDuplicateTx{}); err != nil { + return Metrics{}, err + } + if m.WriteReportTxInfoRetrievalError.basic, err = newBasicCapMetric("write_report_tx_info_retrieval_error", &WriteReportTxInfoRetrievalError{}); err != nil { + return Metrics{}, err + } + if m.WriteReportSuccessfulEarlyReturn.basic, err = newBasicCapMetric("write_report_successful_early_return", &WriteReportSuccessfulEarlyReturn{}); err != nil { + return Metrics{}, err + } + if m.WriteReportInvalidTransmissionState.basic, err = newBasicCapMetric("write_report_invalid_transmission_state", &WriteReportInvalidTransmissionState{}); err != nil { + return Metrics{}, err } - readErr := commoncapbeholder.NewMetricsInfoCapBasic(ns("read_contract_error"), commonbeholder.ToSchemaFullName(&ReadContractError{})) - m.ReadContractError.basic, err = commoncapbeholder.NewMetricsCapBasic(readErr) + meter := commonbeholder.GetMeter() + txHashPhaseCount := commonbeholder.MetricInfo{ + Name: ns("write_report_tx_hash_retrieval_phase_count"), + Unit: "", + Description: "The count of Stellar WriteReport tx hash retrieval phases by lookup type, phase, and result", + } + m.WriteReportTxHashRetrievalPhase.count, err = txHashPhaseCount.NewInt64Counter(meter) + if err != nil { + return Metrics{}, fmt.Errorf("failed to create write report tx hash retrieval phase count metric: %w", err) + } + txHashPhaseDuration := commonbeholder.MetricInfo{ + Name: ns("write_report_tx_hash_retrieval_phase_duration_ms"), + Unit: "ms", + Description: "The duration of Stellar WriteReport tx hash retrieval phases by lookup type, phase, and result", + } + m.WriteReportTxHashRetrievalPhase.phaseDuration, err = txHashPhaseDuration.NewInt64Histogram(meter) + if err != nil { + return Metrics{}, fmt.Errorf("failed to create write report tx hash retrieval phase duration metric: %w", err) + } + invokeOnReportDuration := commonbeholder.MetricInfo{ + Name: ns("write_report_invoke_on_report_duration_ms"), + Unit: "ms", + Description: "The duration of Stellar WriteReport InvokeOnReport calls by tx status", + } + m.WriteReportInvokeOnReportDuration.duration, err = invokeOnReportDuration.NewInt64Histogram(meter) if err != nil { - return Metrics{}, fmt.Errorf("failed to create read contract error metric: %w", err) + return Metrics{}, fmt.Errorf("failed to create write report invoke on report duration metric: %w", err) } return m, nil } func (m *Metrics) OnReadContractSuccess(ctx context.Context, msg *ReadContractSuccess) error { - start, emit := msg.ExecutionContext.MetaCapabilityTimestampStart, msg.ExecutionContext.MetaCapabilityTimestampEmit - m.ReadContractSuccess.basic.RecordEmit(ctx, start, emit, msg.MetricAttributes()...) + recordBasicCapEmit(ctx, m.ReadContractSuccess.basic, msg) return nil } func (m *Metrics) OnReadContractError(ctx context.Context, msg *ReadContractError) error { - start, emit := msg.ExecutionContext.MetaCapabilityTimestampStart, msg.ExecutionContext.MetaCapabilityTimestampEmit - m.ReadContractError.basic.RecordEmit(ctx, start, emit, msg.MetricAttributes()...) + recordBasicCapEmit(ctx, m.ReadContractError.basic, msg) + return nil +} + +func (m *Metrics) OnWriteReportSuccess(ctx context.Context, msg *WriteReportSuccess) error { + recordBasicCapEmit(ctx, m.WriteReportSuccess.basic, msg) + return nil +} + +func (m *Metrics) OnWriteReportError(ctx context.Context, msg *WriteReportError) error { + recordBasicCapEmit(ctx, m.WriteReportError.basic, msg) + return nil +} + +func (m *Metrics) OnWriteReportDuplicateTx(ctx context.Context, msg *WriteReportDuplicateTx) error { + recordBasicCapEmit(ctx, m.WriteReportDuplicateTx.basic, msg) + return nil +} + +func (m *Metrics) OnWriteReportTxInfoRetrievalError(ctx context.Context, msg *WriteReportTxInfoRetrievalError) error { + recordBasicCapEmit(ctx, m.WriteReportTxInfoRetrievalError.basic, msg) + return nil +} + +func (m *Metrics) OnWriteReportSuccessfulEarlyReturn(ctx context.Context, msg *WriteReportSuccessfulEarlyReturn) error { + recordBasicCapEmit(ctx, m.WriteReportSuccessfulEarlyReturn.basic, msg) + return nil +} + +func (m *Metrics) OnWriteReportInvalidTransmissionState(ctx context.Context, msg *WriteReportInvalidTransmissionState) error { + recordBasicCapEmit(ctx, m.WriteReportInvalidTransmissionState.basic, msg) + return nil +} + +func (m *Metrics) OnWriteReportTxHashRetrievalPhase(ctx context.Context, msg *WriteReportTxHashRetrievalPhase) error { + attrs := metric.WithAttributes(msg.MetricAttributes()...) + m.WriteReportTxHashRetrievalPhase.count.Add(ctx, 1, attrs) + m.WriteReportTxHashRetrievalPhase.phaseDuration.Record(ctx, msg.GetPhaseDurationMs(), attrs) + return nil +} + +func (m *Metrics) OnWriteReportInvokeOnReportDuration(ctx context.Context, msg *WriteReportInvokeOnReportDuration) error { + attrs := metric.WithAttributes(msg.MetricAttributes()...) + m.WriteReportInvokeOnReportDuration.duration.Record(ctx, msg.GetDurationMs(), attrs) return nil } diff --git a/chain_capabilities/stellar/monitoring/metrics_test.go b/chain_capabilities/stellar/monitoring/metrics_test.go new file mode 100644 index 000000000..055d29969 --- /dev/null +++ b/chain_capabilities/stellar/monitoring/metrics_test.go @@ -0,0 +1,87 @@ +package monitoring_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + capmonitoring "github.com/smartcontractkit/capabilities/libs/monitoring" + + "github.com/smartcontractkit/capabilities/chain_capabilities/stellar/monitoring" +) + +func testExecutionContext(t *testing.T) *capmonitoring.ExecutionContext { + t.Helper() + ec := newMessageBuilder().BuildExecutionContext(newTelemetryContext()) + ec.MetaCapabilityTimestampStart = 100 + ec.MetaCapabilityTimestampEmit = 250 + return ec +} + +func TestMetrics_OnBasicCapHandlers(t *testing.T) { + t.Parallel() + ctx := t.Context() + metrics, err := monitoring.NewMetrics() + require.NoError(t, err) + + ec := testExecutionContext(t) + readReq := &monitoring.ReadContractRequest{ + ContractId: "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC", + Function: "get", + ArgsCount: 2, + SourceAccount: "GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN7", + } + writeReq := &monitoring.WriteReportRequest{ + ContractId: "CA7QYNF7SOWQ3GLR2BGMZEHXAVIRZA4KVWLTJJFC7MGXUA74P7UJUWDA", + ReportSize: 4, + SigsCount: 2, + } + + require.NoError(t, metrics.OnReadContractSuccess(ctx, &monitoring.ReadContractSuccess{ + Req: readReq, ResultLen: 12, LedgerSequence: 100, ExecutionContext: ec, + })) + require.NoError(t, metrics.OnReadContractError(ctx, &monitoring.ReadContractError{ + Req: readReq, Summary: "summary", IsUserError: false, ExecutionContext: ec, + })) + require.NoError(t, metrics.OnWriteReportSuccess(ctx, &monitoring.WriteReportSuccess{ + Req: writeReq, ExecutionContext: ec, + })) + require.NoError(t, metrics.OnWriteReportError(ctx, &monitoring.WriteReportError{ + Req: writeReq, Summary: "summary", IsUserError: false, ExecutionContext: ec, + })) + require.NoError(t, metrics.OnWriteReportDuplicateTx(ctx, &monitoring.WriteReportDuplicateTx{ + Req: writeReq, DuplicateTxHash: "dup", CanonicalTxHash: "canonical", ExecutionContext: ec, + })) + require.NoError(t, metrics.OnWriteReportTxInfoRetrievalError(ctx, &monitoring.WriteReportTxInfoRetrievalError{ + Req: writeReq, Summary: "summary", TxHash: "hash", ExecutionContext: ec, + })) + require.NoError(t, metrics.OnWriteReportSuccessfulEarlyReturn(ctx, &monitoring.WriteReportSuccessfulEarlyReturn{ + ExecutionContext: ec, + })) + require.NoError(t, metrics.OnWriteReportInvalidTransmissionState(ctx, &monitoring.WriteReportInvalidTransmissionState{ + Req: writeReq, Summary: "summary", TransmissionState: 3, InvalidReceiver: true, + TransmissionId: "receiver:report:exec", Transmitter: "transmitter", ExecutionContext: ec, + })) +} + +func TestMetrics_OnHistogramHandlers(t *testing.T) { + t.Parallel() + ctx := t.Context() + metrics, err := monitoring.NewMetrics() + require.NoError(t, err) + ec := testExecutionContext(t) + + require.NoError(t, metrics.OnWriteReportTxHashRetrievalPhase(ctx, &monitoring.WriteReportTxHashRetrievalPhase{ + ExecutionContext: ec, + Phase: "EventPoll", + Result: "Found", + PhaseDurationMs: 42, + TxHash: "hash", + LookupType: "SuccessfulTransmission", + })) + require.NoError(t, metrics.OnWriteReportInvokeOnReportDuration(ctx, &monitoring.WriteReportInvokeOnReportDuration{ + ExecutionContext: ec, + DurationMs: 99, + TxStatus: 1, + })) +} diff --git a/chain_capabilities/stellar/monitoring/monitoring_test.go b/chain_capabilities/stellar/monitoring/monitoring_test.go new file mode 100644 index 000000000..a372d7cca --- /dev/null +++ b/chain_capabilities/stellar/monitoring/monitoring_test.go @@ -0,0 +1,138 @@ +package monitoring_test + +import ( + "testing" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protoreflect" + + capmonitoring "github.com/smartcontractkit/capabilities/libs/monitoring" + + "github.com/smartcontractkit/capabilities/chain_capabilities/stellar/monitoring" +) + +func newTestProcessor(t *testing.T) (monitoring.Metrics, beholder.ProtoProcessor) { + t.Helper() + + lggr := logger.Test(t) + metrics, err := monitoring.NewMetrics() + require.NoError(t, err) + return metrics, &monitoring.Processor{Lggr: lggr, Metrics: metrics} +} + +func TestProcessor_Process_InitiatedMessage(t *testing.T) { + ctx := t.Context() + ec := &capmonitoring.ExecutionContext{} + + initiatedMsgs := []struct { + name string + msg proto.Message + }{ + {"ReadContractInitiated", &monitoring.ReadContractInitiated{ExecutionContext: ec}}, + {"WriteReportInitiated", &monitoring.WriteReportInitiated{ExecutionContext: ec}}, + } + + for _, tc := range initiatedMsgs { + t.Run(tc.name, func(t *testing.T) { + _, p := newTestProcessor(t) + require.NoError(t, p.Process(ctx, tc.msg)) + }) + } +} + +func TestProcessor_Process_SuccessMessages(t *testing.T) { + ctx := t.Context() + ec := &capmonitoring.ExecutionContext{ + MetaCapabilityTimestampStart: 10, + MetaCapabilityTimestampEmit: 20, + } + readReq := &monitoring.ReadContractRequest{ContractId: "C123", Function: "get"} + writeReq := &monitoring.WriteReportRequest{ContractId: "C456", ReportSize: 1, SigsCount: 1} + + successMsgs := []struct { + name string + msg proto.Message + }{ + {"ReadContractSuccess", &monitoring.ReadContractSuccess{Req: readReq, ExecutionContext: ec, ResultLen: 1, LedgerSequence: 2}}, + {"WriteReportSuccess", &monitoring.WriteReportSuccess{Req: writeReq, ExecutionContext: ec}}, + {"WriteReportDuplicateTx", &monitoring.WriteReportDuplicateTx{Req: writeReq, ExecutionContext: ec, DuplicateTxHash: "a", CanonicalTxHash: "b"}}, + {"WriteReportSuccessfulEarlyReturn", &monitoring.WriteReportSuccessfulEarlyReturn{ExecutionContext: ec}}, + {"WriteReportTxHashRetrievalPhase", &monitoring.WriteReportTxHashRetrievalPhase{ExecutionContext: ec, LookupType: "SuccessfulTransmission", Phase: "EventPoll", Result: "Found", PhaseDurationMs: 5}}, + {"WriteReportInvokeOnReportDuration", &monitoring.WriteReportInvokeOnReportDuration{ExecutionContext: ec, DurationMs: 123, TxStatus: 2}}, + } + + for _, tc := range successMsgs { + t.Run(tc.name, func(t *testing.T) { + _, p := newTestProcessor(t) + require.NoError(t, p.Process(ctx, tc.msg)) + }) + } +} + +func TestProcessor_Process_ErrorMessages(t *testing.T) { + ctx := t.Context() + ec := &capmonitoring.ExecutionContext{ + MetaCapabilityTimestampStart: 10, + MetaCapabilityTimestampEmit: 20, + } + readReq := &monitoring.ReadContractRequest{ContractId: "C123", Function: "get"} + writeReq := &monitoring.WriteReportRequest{ContractId: "C456", ReportSize: 1, SigsCount: 1} + + errorMsgs := []struct { + name string + msg proto.Message + }{ + {"ReadContractError", &monitoring.ReadContractError{Req: readReq, ExecutionContext: ec, IsUserError: false, Summary: "fail"}}, + {"WriteReportError", &monitoring.WriteReportError{Req: writeReq, ExecutionContext: ec, IsUserError: false, Summary: "fail"}}, + {"WriteReportTxInfoRetrievalError", &monitoring.WriteReportTxInfoRetrievalError{Req: writeReq, ExecutionContext: ec, Summary: "fail", TxHash: "hash"}}, + {"WriteReportInvalidTransmissionState", &monitoring.WriteReportInvalidTransmissionState{Req: writeReq, ExecutionContext: ec, Summary: "fail"}}, + } + + for _, tc := range errorMsgs { + t.Run(tc.name, func(t *testing.T) { + _, p := newTestProcessor(t) + require.NoError(t, p.Process(ctx, tc.msg)) + }) + } +} + +func TestProcessor_Process_WriteReportError_UserError_SkipsMetrics(t *testing.T) { + _, p := newTestProcessor(t) + msg := &monitoring.WriteReportError{ + ExecutionContext: &capmonitoring.ExecutionContext{}, + IsUserError: true, + Summary: "user did something wrong", + Cause: "invalid input", + } + require.NoError(t, p.Process(t.Context(), msg)) +} + +func TestProcessor_Process_ReadContractError_UserError_SkipsMetrics(t *testing.T) { + _, p := newTestProcessor(t) + msg := &monitoring.ReadContractError{ + ExecutionContext: &capmonitoring.ExecutionContext{}, + IsUserError: true, + Summary: "user did something wrong", + Cause: "invalid input", + } + require.NoError(t, p.Process(t.Context(), msg)) +} + +type dummyProto struct{} + +func (d *dummyProto) ProtoReflect() protoreflect.Message { return nil } + +func TestProcessor_Process_UnknownMessage_Noop(t *testing.T) { + _, p := newTestProcessor(t) + require.NoError(t, p.Process(t.Context(), &dummyProto{})) +} + +func TestNewMetrics(t *testing.T) { + metrics, err := monitoring.NewMetrics() + require.NoError(t, err) + assert.NotNil(t, metrics) +} diff --git a/chain_capabilities/stellar/monitoring/processor.go b/chain_capabilities/stellar/monitoring/processor.go index 133769d4a..56174d4a5 100644 --- a/chain_capabilities/stellar/monitoring/processor.go +++ b/chain_capabilities/stellar/monitoring/processor.go @@ -21,8 +21,8 @@ type Processor struct { func (p *Processor) Process(ctx context.Context, m proto.Message, attrKVs ...any) error { switch msg := m.(type) { - case *ReadContractInitiated: - p.logMessage(msg) + case *ReadContractInitiated, *WriteReportInitiated: + p.logMessage(m) case *ReadContractSuccess: p.logMessage(msg) if err := p.Metrics.OnReadContractSuccess(ctx, msg); err != nil { @@ -30,13 +30,51 @@ func (p *Processor) Process(ctx context.Context, m proto.Message, attrKVs ...any } case *ReadContractError: p.logMessage(msg) - // User errors are caller mistakes, not capability/infra failures, so they are not - // counted in the error metric (mirrors EVM/Aptos). if !msg.GetIsUserError() { if err := p.Metrics.OnReadContractError(ctx, msg); err != nil { return fmt.Errorf("failed to publish ReadContractError metrics: %w", err) } } + case *WriteReportSuccess: + p.logMessage(msg) + if err := p.Metrics.OnWriteReportSuccess(ctx, msg); err != nil { + return fmt.Errorf("failed to publish WriteReportSuccess metrics: %w", err) + } + case *WriteReportError: + p.logMessage(msg) + if !msg.GetIsUserError() { + if err := p.Metrics.OnWriteReportError(ctx, msg); err != nil { + return fmt.Errorf("failed to publish WriteReportError metrics: %w", err) + } + } + case *WriteReportTxInfoRetrievalError: + p.logMessage(msg) + if err := p.Metrics.OnWriteReportTxInfoRetrievalError(ctx, msg); err != nil { + return fmt.Errorf("failed to publish WriteReportTxInfoRetrievalError metrics: %w", err) + } + case *WriteReportDuplicateTx: + p.logMessage(msg) + if err := p.Metrics.OnWriteReportDuplicateTx(ctx, msg); err != nil { + return fmt.Errorf("failed to publish WriteReportDuplicateTx metrics: %w", err) + } + case *WriteReportSuccessfulEarlyReturn: + p.logMessage(msg) + if err := p.Metrics.OnWriteReportSuccessfulEarlyReturn(ctx, msg); err != nil { + return fmt.Errorf("failed to publish WriteReportSuccessfulEarlyReturn metrics: %w", err) + } + case *WriteReportInvalidTransmissionState: + p.logMessage(msg) + if err := p.Metrics.OnWriteReportInvalidTransmissionState(ctx, msg); err != nil { + return fmt.Errorf("failed to publish WriteReportInvalidTransmissionState metrics: %w", err) + } + case *WriteReportTxHashRetrievalPhase: + if err := p.Metrics.OnWriteReportTxHashRetrievalPhase(ctx, msg); err != nil { + return fmt.Errorf("failed to publish WriteReportTxHashRetrievalPhase metrics: %w", err) + } + case *WriteReportInvokeOnReportDuration: + if err := p.Metrics.OnWriteReportInvokeOnReportDuration(ctx, msg); err != nil { + return fmt.Errorf("failed to publish WriteReportInvokeOnReportDuration metrics: %w", err) + } default: return nil } diff --git a/chain_capabilities/stellar/monitoring/write_report.pb.go b/chain_capabilities/stellar/monitoring/write_report.pb.go new file mode 100644 index 000000000..ffe3be9bc --- /dev/null +++ b/chain_capabilities/stellar/monitoring/write_report.pb.go @@ -0,0 +1,839 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.10 +// protoc v5.29.3 +// source: chain_capabilities/stellar/monitoring/write_report.proto + +package monitoring + +import ( + monitoring "github.com/smartcontractkit/capabilities/libs/monitoring" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// WriteReportRequest intentionally captures only the non-sensitive subset of the +// capability request that is useful for monitoring (no raw report bytes or signatures). +type WriteReportRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + ContractId string `protobuf:"bytes,1,opt,name=contract_id,json=contractId,proto3" json:"contract_id,omitempty"` + ReportSize uint64 `protobuf:"varint,2,opt,name=report_size,json=reportSize,proto3" json:"report_size,omitempty"` + SigsCount uint32 `protobuf:"varint,3,opt,name=sigs_count,json=sigsCount,proto3" json:"sigs_count,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WriteReportRequest) Reset() { + *x = WriteReportRequest{} + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WriteReportRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteReportRequest) ProtoMessage() {} + +func (x *WriteReportRequest) ProtoReflect() protoreflect.Message { + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteReportRequest.ProtoReflect.Descriptor instead. +func (*WriteReportRequest) Descriptor() ([]byte, []int) { + return file_chain_capabilities_stellar_monitoring_write_report_proto_rawDescGZIP(), []int{0} +} + +func (x *WriteReportRequest) GetContractId() string { + if x != nil { + return x.ContractId + } + return "" +} + +func (x *WriteReportRequest) GetReportSize() uint64 { + if x != nil { + return x.ReportSize + } + return 0 +} + +func (x *WriteReportRequest) GetSigsCount() uint32 { + if x != nil { + return x.SigsCount + } + return 0 +} + +type WriteReportInitiated struct { + state protoimpl.MessageState `protogen:"open.v1"` + Req *WriteReportRequest `protobuf:"bytes,1,opt,name=req,proto3" json:"req,omitempty"` + ExecutionContext *monitoring.ExecutionContext `protobuf:"bytes,20,opt,name=execution_context,json=executionContext,proto3" json:"execution_context,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WriteReportInitiated) Reset() { + *x = WriteReportInitiated{} + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WriteReportInitiated) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteReportInitiated) ProtoMessage() {} + +func (x *WriteReportInitiated) ProtoReflect() protoreflect.Message { + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteReportInitiated.ProtoReflect.Descriptor instead. +func (*WriteReportInitiated) Descriptor() ([]byte, []int) { + return file_chain_capabilities_stellar_monitoring_write_report_proto_rawDescGZIP(), []int{1} +} + +func (x *WriteReportInitiated) GetReq() *WriteReportRequest { + if x != nil { + return x.Req + } + return nil +} + +func (x *WriteReportInitiated) GetExecutionContext() *monitoring.ExecutionContext { + if x != nil { + return x.ExecutionContext + } + return nil +} + +type WriteReportSuccess struct { + state protoimpl.MessageState `protogen:"open.v1"` + Req *WriteReportRequest `protobuf:"bytes,1,opt,name=req,proto3" json:"req,omitempty"` + ExecutionContext *monitoring.ExecutionContext `protobuf:"bytes,20,opt,name=execution_context,json=executionContext,proto3" json:"execution_context,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WriteReportSuccess) Reset() { + *x = WriteReportSuccess{} + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WriteReportSuccess) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteReportSuccess) ProtoMessage() {} + +func (x *WriteReportSuccess) ProtoReflect() protoreflect.Message { + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteReportSuccess.ProtoReflect.Descriptor instead. +func (*WriteReportSuccess) Descriptor() ([]byte, []int) { + return file_chain_capabilities_stellar_monitoring_write_report_proto_rawDescGZIP(), []int{2} +} + +func (x *WriteReportSuccess) GetReq() *WriteReportRequest { + if x != nil { + return x.Req + } + return nil +} + +func (x *WriteReportSuccess) GetExecutionContext() *monitoring.ExecutionContext { + if x != nil { + return x.ExecutionContext + } + return nil +} + +type WriteReportError struct { + state protoimpl.MessageState `protogen:"open.v1"` + Req *WriteReportRequest `protobuf:"bytes,1,opt,name=req,proto3" json:"req,omitempty"` + Summary string `protobuf:"bytes,2,opt,name=summary,proto3" json:"summary,omitempty"` + Cause string `protobuf:"bytes,3,opt,name=cause,proto3" json:"cause,omitempty"` + IsUserError bool `protobuf:"varint,4,opt,name=is_user_error,json=isUserError,proto3" json:"is_user_error,omitempty"` + ExecutionContext *monitoring.ExecutionContext `protobuf:"bytes,20,opt,name=execution_context,json=executionContext,proto3" json:"execution_context,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WriteReportError) Reset() { + *x = WriteReportError{} + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WriteReportError) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteReportError) ProtoMessage() {} + +func (x *WriteReportError) ProtoReflect() protoreflect.Message { + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteReportError.ProtoReflect.Descriptor instead. +func (*WriteReportError) Descriptor() ([]byte, []int) { + return file_chain_capabilities_stellar_monitoring_write_report_proto_rawDescGZIP(), []int{3} +} + +func (x *WriteReportError) GetReq() *WriteReportRequest { + if x != nil { + return x.Req + } + return nil +} + +func (x *WriteReportError) GetSummary() string { + if x != nil { + return x.Summary + } + return "" +} + +func (x *WriteReportError) GetCause() string { + if x != nil { + return x.Cause + } + return "" +} + +func (x *WriteReportError) GetIsUserError() bool { + if x != nil { + return x.IsUserError + } + return false +} + +func (x *WriteReportError) GetExecutionContext() *monitoring.ExecutionContext { + if x != nil { + return x.ExecutionContext + } + return nil +} + +type WriteReportTxInfoRetrievalError struct { + state protoimpl.MessageState `protogen:"open.v1"` + Req *WriteReportRequest `protobuf:"bytes,1,opt,name=req,proto3" json:"req,omitempty"` + Summary string `protobuf:"bytes,2,opt,name=summary,proto3" json:"summary,omitempty"` + Cause string `protobuf:"bytes,3,opt,name=cause,proto3" json:"cause,omitempty"` + TxHash string `protobuf:"bytes,4,opt,name=tx_hash,json=txHash,proto3" json:"tx_hash,omitempty"` + ExecutionContext *monitoring.ExecutionContext `protobuf:"bytes,20,opt,name=execution_context,json=executionContext,proto3" json:"execution_context,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WriteReportTxInfoRetrievalError) Reset() { + *x = WriteReportTxInfoRetrievalError{} + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WriteReportTxInfoRetrievalError) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteReportTxInfoRetrievalError) ProtoMessage() {} + +func (x *WriteReportTxInfoRetrievalError) ProtoReflect() protoreflect.Message { + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteReportTxInfoRetrievalError.ProtoReflect.Descriptor instead. +func (*WriteReportTxInfoRetrievalError) Descriptor() ([]byte, []int) { + return file_chain_capabilities_stellar_monitoring_write_report_proto_rawDescGZIP(), []int{4} +} + +func (x *WriteReportTxInfoRetrievalError) GetReq() *WriteReportRequest { + if x != nil { + return x.Req + } + return nil +} + +func (x *WriteReportTxInfoRetrievalError) GetSummary() string { + if x != nil { + return x.Summary + } + return "" +} + +func (x *WriteReportTxInfoRetrievalError) GetCause() string { + if x != nil { + return x.Cause + } + return "" +} + +func (x *WriteReportTxInfoRetrievalError) GetTxHash() string { + if x != nil { + return x.TxHash + } + return "" +} + +func (x *WriteReportTxInfoRetrievalError) GetExecutionContext() *monitoring.ExecutionContext { + if x != nil { + return x.ExecutionContext + } + return nil +} + +type WriteReportInvalidTransmissionState struct { + state protoimpl.MessageState `protogen:"open.v1"` + Req *WriteReportRequest `protobuf:"bytes,1,opt,name=req,proto3" json:"req,omitempty"` + Summary string `protobuf:"bytes,2,opt,name=summary,proto3" json:"summary,omitempty"` + Cause string `protobuf:"bytes,3,opt,name=cause,proto3" json:"cause,omitempty"` + TransmissionState uint32 `protobuf:"varint,4,opt,name=transmission_state,json=transmissionState,proto3" json:"transmission_state,omitempty"` + InvalidReceiver bool `protobuf:"varint,5,opt,name=invalid_receiver,json=invalidReceiver,proto3" json:"invalid_receiver,omitempty"` + Success bool `protobuf:"varint,6,opt,name=success,proto3" json:"success,omitempty"` + TransmissionId string `protobuf:"bytes,7,opt,name=transmission_id,json=transmissionId,proto3" json:"transmission_id,omitempty"` + Transmitter string `protobuf:"bytes,8,opt,name=transmitter,proto3" json:"transmitter,omitempty"` + ExecutionContext *monitoring.ExecutionContext `protobuf:"bytes,20,opt,name=execution_context,json=executionContext,proto3" json:"execution_context,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WriteReportInvalidTransmissionState) Reset() { + *x = WriteReportInvalidTransmissionState{} + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WriteReportInvalidTransmissionState) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteReportInvalidTransmissionState) ProtoMessage() {} + +func (x *WriteReportInvalidTransmissionState) ProtoReflect() protoreflect.Message { + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteReportInvalidTransmissionState.ProtoReflect.Descriptor instead. +func (*WriteReportInvalidTransmissionState) Descriptor() ([]byte, []int) { + return file_chain_capabilities_stellar_monitoring_write_report_proto_rawDescGZIP(), []int{5} +} + +func (x *WriteReportInvalidTransmissionState) GetReq() *WriteReportRequest { + if x != nil { + return x.Req + } + return nil +} + +func (x *WriteReportInvalidTransmissionState) GetSummary() string { + if x != nil { + return x.Summary + } + return "" +} + +func (x *WriteReportInvalidTransmissionState) GetCause() string { + if x != nil { + return x.Cause + } + return "" +} + +func (x *WriteReportInvalidTransmissionState) GetTransmissionState() uint32 { + if x != nil { + return x.TransmissionState + } + return 0 +} + +func (x *WriteReportInvalidTransmissionState) GetInvalidReceiver() bool { + if x != nil { + return x.InvalidReceiver + } + return false +} + +func (x *WriteReportInvalidTransmissionState) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} + +func (x *WriteReportInvalidTransmissionState) GetTransmissionId() string { + if x != nil { + return x.TransmissionId + } + return "" +} + +func (x *WriteReportInvalidTransmissionState) GetTransmitter() string { + if x != nil { + return x.Transmitter + } + return "" +} + +func (x *WriteReportInvalidTransmissionState) GetExecutionContext() *monitoring.ExecutionContext { + if x != nil { + return x.ExecutionContext + } + return nil +} + +type WriteReportDuplicateTx struct { + state protoimpl.MessageState `protogen:"open.v1"` + Req *WriteReportRequest `protobuf:"bytes,1,opt,name=req,proto3" json:"req,omitempty"` + DuplicateTxHash string `protobuf:"bytes,2,opt,name=duplicate_tx_hash,json=duplicateTxHash,proto3" json:"duplicate_tx_hash,omitempty"` + CanonicalTxHash string `protobuf:"bytes,3,opt,name=canonical_tx_hash,json=canonicalTxHash,proto3" json:"canonical_tx_hash,omitempty"` + ExecutionContext *monitoring.ExecutionContext `protobuf:"bytes,20,opt,name=execution_context,json=executionContext,proto3" json:"execution_context,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WriteReportDuplicateTx) Reset() { + *x = WriteReportDuplicateTx{} + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WriteReportDuplicateTx) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteReportDuplicateTx) ProtoMessage() {} + +func (x *WriteReportDuplicateTx) ProtoReflect() protoreflect.Message { + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteReportDuplicateTx.ProtoReflect.Descriptor instead. +func (*WriteReportDuplicateTx) Descriptor() ([]byte, []int) { + return file_chain_capabilities_stellar_monitoring_write_report_proto_rawDescGZIP(), []int{6} +} + +func (x *WriteReportDuplicateTx) GetReq() *WriteReportRequest { + if x != nil { + return x.Req + } + return nil +} + +func (x *WriteReportDuplicateTx) GetDuplicateTxHash() string { + if x != nil { + return x.DuplicateTxHash + } + return "" +} + +func (x *WriteReportDuplicateTx) GetCanonicalTxHash() string { + if x != nil { + return x.CanonicalTxHash + } + return "" +} + +func (x *WriteReportDuplicateTx) GetExecutionContext() *monitoring.ExecutionContext { + if x != nil { + return x.ExecutionContext + } + return nil +} + +type WriteReportSuccessfulEarlyReturn struct { + state protoimpl.MessageState `protogen:"open.v1"` + ExecutionContext *monitoring.ExecutionContext `protobuf:"bytes,20,opt,name=execution_context,json=executionContext,proto3" json:"execution_context,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WriteReportSuccessfulEarlyReturn) Reset() { + *x = WriteReportSuccessfulEarlyReturn{} + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WriteReportSuccessfulEarlyReturn) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteReportSuccessfulEarlyReturn) ProtoMessage() {} + +func (x *WriteReportSuccessfulEarlyReturn) ProtoReflect() protoreflect.Message { + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteReportSuccessfulEarlyReturn.ProtoReflect.Descriptor instead. +func (*WriteReportSuccessfulEarlyReturn) Descriptor() ([]byte, []int) { + return file_chain_capabilities_stellar_monitoring_write_report_proto_rawDescGZIP(), []int{7} +} + +func (x *WriteReportSuccessfulEarlyReturn) GetExecutionContext() *monitoring.ExecutionContext { + if x != nil { + return x.ExecutionContext + } + return nil +} + +type WriteReportTxHashRetrievalPhase struct { + state protoimpl.MessageState `protogen:"open.v1"` + Phase string `protobuf:"bytes,1,opt,name=phase,proto3" json:"phase,omitempty"` + Result string `protobuf:"bytes,2,opt,name=result,proto3" json:"result,omitempty"` + PhaseDurationMs int64 `protobuf:"varint,3,opt,name=phase_duration_ms,json=phaseDurationMs,proto3" json:"phase_duration_ms,omitempty"` + TxHash string `protobuf:"bytes,4,opt,name=tx_hash,json=txHash,proto3" json:"tx_hash,omitempty"` + LookupType string `protobuf:"bytes,5,opt,name=lookup_type,json=lookupType,proto3" json:"lookup_type,omitempty"` + ExecutionContext *monitoring.ExecutionContext `protobuf:"bytes,20,opt,name=execution_context,json=executionContext,proto3" json:"execution_context,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WriteReportTxHashRetrievalPhase) Reset() { + *x = WriteReportTxHashRetrievalPhase{} + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WriteReportTxHashRetrievalPhase) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteReportTxHashRetrievalPhase) ProtoMessage() {} + +func (x *WriteReportTxHashRetrievalPhase) ProtoReflect() protoreflect.Message { + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteReportTxHashRetrievalPhase.ProtoReflect.Descriptor instead. +func (*WriteReportTxHashRetrievalPhase) Descriptor() ([]byte, []int) { + return file_chain_capabilities_stellar_monitoring_write_report_proto_rawDescGZIP(), []int{8} +} + +func (x *WriteReportTxHashRetrievalPhase) GetPhase() string { + if x != nil { + return x.Phase + } + return "" +} + +func (x *WriteReportTxHashRetrievalPhase) GetResult() string { + if x != nil { + return x.Result + } + return "" +} + +func (x *WriteReportTxHashRetrievalPhase) GetPhaseDurationMs() int64 { + if x != nil { + return x.PhaseDurationMs + } + return 0 +} + +func (x *WriteReportTxHashRetrievalPhase) GetTxHash() string { + if x != nil { + return x.TxHash + } + return "" +} + +func (x *WriteReportTxHashRetrievalPhase) GetLookupType() string { + if x != nil { + return x.LookupType + } + return "" +} + +func (x *WriteReportTxHashRetrievalPhase) GetExecutionContext() *monitoring.ExecutionContext { + if x != nil { + return x.ExecutionContext + } + return nil +} + +type WriteReportInvokeOnReportDuration struct { + state protoimpl.MessageState `protogen:"open.v1"` + DurationMs int64 `protobuf:"varint,1,opt,name=duration_ms,json=durationMs,proto3" json:"duration_ms,omitempty"` + TxStatus int32 `protobuf:"varint,2,opt,name=tx_status,json=txStatus,proto3" json:"tx_status,omitempty"` + ExecutionContext *monitoring.ExecutionContext `protobuf:"bytes,20,opt,name=execution_context,json=executionContext,proto3" json:"execution_context,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WriteReportInvokeOnReportDuration) Reset() { + *x = WriteReportInvokeOnReportDuration{} + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WriteReportInvokeOnReportDuration) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteReportInvokeOnReportDuration) ProtoMessage() {} + +func (x *WriteReportInvokeOnReportDuration) ProtoReflect() protoreflect.Message { + mi := &file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteReportInvokeOnReportDuration.ProtoReflect.Descriptor instead. +func (*WriteReportInvokeOnReportDuration) Descriptor() ([]byte, []int) { + return file_chain_capabilities_stellar_monitoring_write_report_proto_rawDescGZIP(), []int{9} +} + +func (x *WriteReportInvokeOnReportDuration) GetDurationMs() int64 { + if x != nil { + return x.DurationMs + } + return 0 +} + +func (x *WriteReportInvokeOnReportDuration) GetTxStatus() int32 { + if x != nil { + return x.TxStatus + } + return 0 +} + +func (x *WriteReportInvokeOnReportDuration) GetExecutionContext() *monitoring.ExecutionContext { + if x != nil { + return x.ExecutionContext + } + return nil +} + +var File_chain_capabilities_stellar_monitoring_write_report_proto protoreflect.FileDescriptor + +const file_chain_capabilities_stellar_monitoring_write_report_proto_rawDesc = "" + + "\n" + + "8chain_capabilities/stellar/monitoring/write_report.proto\x12\x1achain_capabilities.stellar\x1a'libs/monitoring/execution_context.proto\"u\n" + + "\x12WriteReportRequest\x12\x1f\n" + + "\vcontract_id\x18\x01 \x01(\tR\n" + + "contractId\x12\x1f\n" + + "\vreport_size\x18\x02 \x01(\x04R\n" + + "reportSize\x12\x1d\n" + + "\n" + + "sigs_count\x18\x03 \x01(\rR\tsigsCount\"\xa3\x01\n" + + "\x14WriteReportInitiated\x12@\n" + + "\x03req\x18\x01 \x01(\v2..chain_capabilities.stellar.WriteReportRequestR\x03req\x12I\n" + + "\x11execution_context\x18\x14 \x01(\v2\x1c.monitoring.ExecutionContextR\x10executionContext\"\xa1\x01\n" + + "\x12WriteReportSuccess\x12@\n" + + "\x03req\x18\x01 \x01(\v2..chain_capabilities.stellar.WriteReportRequestR\x03req\x12I\n" + + "\x11execution_context\x18\x14 \x01(\v2\x1c.monitoring.ExecutionContextR\x10executionContext\"\xf3\x01\n" + + "\x10WriteReportError\x12@\n" + + "\x03req\x18\x01 \x01(\v2..chain_capabilities.stellar.WriteReportRequestR\x03req\x12\x18\n" + + "\asummary\x18\x02 \x01(\tR\asummary\x12\x14\n" + + "\x05cause\x18\x03 \x01(\tR\x05cause\x12\"\n" + + "\ris_user_error\x18\x04 \x01(\bR\visUserError\x12I\n" + + "\x11execution_context\x18\x14 \x01(\v2\x1c.monitoring.ExecutionContextR\x10executionContext\"\xf7\x01\n" + + "\x1fWriteReportTxInfoRetrievalError\x12@\n" + + "\x03req\x18\x01 \x01(\v2..chain_capabilities.stellar.WriteReportRequestR\x03req\x12\x18\n" + + "\asummary\x18\x02 \x01(\tR\asummary\x12\x14\n" + + "\x05cause\x18\x03 \x01(\tR\x05cause\x12\x17\n" + + "\atx_hash\x18\x04 \x01(\tR\x06txHash\x12I\n" + + "\x11execution_context\x18\x14 \x01(\v2\x1c.monitoring.ExecutionContextR\x10executionContext\"\xa1\x03\n" + + "#WriteReportInvalidTransmissionState\x12@\n" + + "\x03req\x18\x01 \x01(\v2..chain_capabilities.stellar.WriteReportRequestR\x03req\x12\x18\n" + + "\asummary\x18\x02 \x01(\tR\asummary\x12\x14\n" + + "\x05cause\x18\x03 \x01(\tR\x05cause\x12-\n" + + "\x12transmission_state\x18\x04 \x01(\rR\x11transmissionState\x12)\n" + + "\x10invalid_receiver\x18\x05 \x01(\bR\x0finvalidReceiver\x12\x18\n" + + "\asuccess\x18\x06 \x01(\bR\asuccess\x12'\n" + + "\x0ftransmission_id\x18\a \x01(\tR\x0etransmissionId\x12 \n" + + "\vtransmitter\x18\b \x01(\tR\vtransmitter\x12I\n" + + "\x11execution_context\x18\x14 \x01(\v2\x1c.monitoring.ExecutionContextR\x10executionContext\"\xfd\x01\n" + + "\x16WriteReportDuplicateTx\x12@\n" + + "\x03req\x18\x01 \x01(\v2..chain_capabilities.stellar.WriteReportRequestR\x03req\x12*\n" + + "\x11duplicate_tx_hash\x18\x02 \x01(\tR\x0fduplicateTxHash\x12*\n" + + "\x11canonical_tx_hash\x18\x03 \x01(\tR\x0fcanonicalTxHash\x12I\n" + + "\x11execution_context\x18\x14 \x01(\v2\x1c.monitoring.ExecutionContextR\x10executionContext\"m\n" + + " WriteReportSuccessfulEarlyReturn\x12I\n" + + "\x11execution_context\x18\x14 \x01(\v2\x1c.monitoring.ExecutionContextR\x10executionContext\"\x80\x02\n" + + "\x1fWriteReportTxHashRetrievalPhase\x12\x14\n" + + "\x05phase\x18\x01 \x01(\tR\x05phase\x12\x16\n" + + "\x06result\x18\x02 \x01(\tR\x06result\x12*\n" + + "\x11phase_duration_ms\x18\x03 \x01(\x03R\x0fphaseDurationMs\x12\x17\n" + + "\atx_hash\x18\x04 \x01(\tR\x06txHash\x12\x1f\n" + + "\vlookup_type\x18\x05 \x01(\tR\n" + + "lookupType\x12I\n" + + "\x11execution_context\x18\x14 \x01(\v2\x1c.monitoring.ExecutionContextR\x10executionContext\"\xac\x01\n" + + "!WriteReportInvokeOnReportDuration\x12\x1f\n" + + "\vduration_ms\x18\x01 \x01(\x03R\n" + + "durationMs\x12\x1b\n" + + "\ttx_status\x18\x02 \x01(\x05R\btxStatus\x12I\n" + + "\x11execution_context\x18\x14 \x01(\v2\x1c.monitoring.ExecutionContextR\x10executionContextB[ZYgithub.com/smartcontractkit/capabilities/chain_capabilities/stellar/monitoring;monitoringb\x06proto3" + +var ( + file_chain_capabilities_stellar_monitoring_write_report_proto_rawDescOnce sync.Once + file_chain_capabilities_stellar_monitoring_write_report_proto_rawDescData []byte +) + +func file_chain_capabilities_stellar_monitoring_write_report_proto_rawDescGZIP() []byte { + file_chain_capabilities_stellar_monitoring_write_report_proto_rawDescOnce.Do(func() { + file_chain_capabilities_stellar_monitoring_write_report_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_chain_capabilities_stellar_monitoring_write_report_proto_rawDesc), len(file_chain_capabilities_stellar_monitoring_write_report_proto_rawDesc))) + }) + return file_chain_capabilities_stellar_monitoring_write_report_proto_rawDescData +} + +var file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes = make([]protoimpl.MessageInfo, 10) +var file_chain_capabilities_stellar_monitoring_write_report_proto_goTypes = []any{ + (*WriteReportRequest)(nil), // 0: chain_capabilities.stellar.WriteReportRequest + (*WriteReportInitiated)(nil), // 1: chain_capabilities.stellar.WriteReportInitiated + (*WriteReportSuccess)(nil), // 2: chain_capabilities.stellar.WriteReportSuccess + (*WriteReportError)(nil), // 3: chain_capabilities.stellar.WriteReportError + (*WriteReportTxInfoRetrievalError)(nil), // 4: chain_capabilities.stellar.WriteReportTxInfoRetrievalError + (*WriteReportInvalidTransmissionState)(nil), // 5: chain_capabilities.stellar.WriteReportInvalidTransmissionState + (*WriteReportDuplicateTx)(nil), // 6: chain_capabilities.stellar.WriteReportDuplicateTx + (*WriteReportSuccessfulEarlyReturn)(nil), // 7: chain_capabilities.stellar.WriteReportSuccessfulEarlyReturn + (*WriteReportTxHashRetrievalPhase)(nil), // 8: chain_capabilities.stellar.WriteReportTxHashRetrievalPhase + (*WriteReportInvokeOnReportDuration)(nil), // 9: chain_capabilities.stellar.WriteReportInvokeOnReportDuration + (*monitoring.ExecutionContext)(nil), // 10: monitoring.ExecutionContext +} +var file_chain_capabilities_stellar_monitoring_write_report_proto_depIdxs = []int32{ + 0, // 0: chain_capabilities.stellar.WriteReportInitiated.req:type_name -> chain_capabilities.stellar.WriteReportRequest + 10, // 1: chain_capabilities.stellar.WriteReportInitiated.execution_context:type_name -> monitoring.ExecutionContext + 0, // 2: chain_capabilities.stellar.WriteReportSuccess.req:type_name -> chain_capabilities.stellar.WriteReportRequest + 10, // 3: chain_capabilities.stellar.WriteReportSuccess.execution_context:type_name -> monitoring.ExecutionContext + 0, // 4: chain_capabilities.stellar.WriteReportError.req:type_name -> chain_capabilities.stellar.WriteReportRequest + 10, // 5: chain_capabilities.stellar.WriteReportError.execution_context:type_name -> monitoring.ExecutionContext + 0, // 6: chain_capabilities.stellar.WriteReportTxInfoRetrievalError.req:type_name -> chain_capabilities.stellar.WriteReportRequest + 10, // 7: chain_capabilities.stellar.WriteReportTxInfoRetrievalError.execution_context:type_name -> monitoring.ExecutionContext + 0, // 8: chain_capabilities.stellar.WriteReportInvalidTransmissionState.req:type_name -> chain_capabilities.stellar.WriteReportRequest + 10, // 9: chain_capabilities.stellar.WriteReportInvalidTransmissionState.execution_context:type_name -> monitoring.ExecutionContext + 0, // 10: chain_capabilities.stellar.WriteReportDuplicateTx.req:type_name -> chain_capabilities.stellar.WriteReportRequest + 10, // 11: chain_capabilities.stellar.WriteReportDuplicateTx.execution_context:type_name -> monitoring.ExecutionContext + 10, // 12: chain_capabilities.stellar.WriteReportSuccessfulEarlyReturn.execution_context:type_name -> monitoring.ExecutionContext + 10, // 13: chain_capabilities.stellar.WriteReportTxHashRetrievalPhase.execution_context:type_name -> monitoring.ExecutionContext + 10, // 14: chain_capabilities.stellar.WriteReportInvokeOnReportDuration.execution_context:type_name -> monitoring.ExecutionContext + 15, // [15:15] is the sub-list for method output_type + 15, // [15:15] is the sub-list for method input_type + 15, // [15:15] is the sub-list for extension type_name + 15, // [15:15] is the sub-list for extension extendee + 0, // [0:15] is the sub-list for field type_name +} + +func init() { file_chain_capabilities_stellar_monitoring_write_report_proto_init() } +func file_chain_capabilities_stellar_monitoring_write_report_proto_init() { + if File_chain_capabilities_stellar_monitoring_write_report_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_chain_capabilities_stellar_monitoring_write_report_proto_rawDesc), len(file_chain_capabilities_stellar_monitoring_write_report_proto_rawDesc)), + NumEnums: 0, + NumMessages: 10, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_chain_capabilities_stellar_monitoring_write_report_proto_goTypes, + DependencyIndexes: file_chain_capabilities_stellar_monitoring_write_report_proto_depIdxs, + MessageInfos: file_chain_capabilities_stellar_monitoring_write_report_proto_msgTypes, + }.Build() + File_chain_capabilities_stellar_monitoring_write_report_proto = out.File + file_chain_capabilities_stellar_monitoring_write_report_proto_goTypes = nil + file_chain_capabilities_stellar_monitoring_write_report_proto_depIdxs = nil +} diff --git a/chain_capabilities/stellar/monitoring/write_report.proto b/chain_capabilities/stellar/monitoring/write_report.proto new file mode 100644 index 000000000..947a55f16 --- /dev/null +++ b/chain_capabilities/stellar/monitoring/write_report.proto @@ -0,0 +1,77 @@ +syntax = "proto3"; +package chain_capabilities.stellar; +option go_package = "github.com/smartcontractkit/capabilities/chain_capabilities/stellar/monitoring;monitoring"; + +import "libs/monitoring/execution_context.proto"; + +// WriteReportRequest intentionally captures only the non-sensitive subset of the +// capability request that is useful for monitoring (no raw report bytes or signatures). +message WriteReportRequest { + string contract_id = 1; + uint64 report_size = 2; + uint32 sigs_count = 3; +} + +message WriteReportInitiated { + WriteReportRequest req = 1; + monitoring.ExecutionContext execution_context = 20; +} + +message WriteReportSuccess { + WriteReportRequest req = 1; + monitoring.ExecutionContext execution_context = 20; +} + +message WriteReportError { + WriteReportRequest req = 1; + string summary = 2; + string cause = 3; + bool is_user_error = 4; + monitoring.ExecutionContext execution_context = 20; +} + +message WriteReportTxInfoRetrievalError { + WriteReportRequest req = 1; + string summary = 2; + string cause = 3; + string tx_hash = 4; + monitoring.ExecutionContext execution_context = 20; +} + +message WriteReportInvalidTransmissionState { + WriteReportRequest req = 1; + string summary = 2; + string cause = 3; + uint32 transmission_state = 4; + bool invalid_receiver = 5; + bool success = 6; + string transmission_id = 7; + string transmitter = 8; + monitoring.ExecutionContext execution_context = 20; +} + +message WriteReportDuplicateTx { + WriteReportRequest req = 1; + string duplicate_tx_hash = 2; + string canonical_tx_hash = 3; + monitoring.ExecutionContext execution_context = 20; +} + +message WriteReportSuccessfulEarlyReturn { + monitoring.ExecutionContext execution_context = 20; +} + +message WriteReportTxHashRetrievalPhase { + string phase = 1; + string result = 2; + int64 phase_duration_ms = 3; + string tx_hash = 4; + string lookup_type = 5; + monitoring.ExecutionContext execution_context = 20; +} + +message WriteReportInvokeOnReportDuration { + int64 duration_ms = 1; + int32 tx_status = 2; + monitoring.ExecutionContext execution_context = 20; +}