Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions chain_capabilities/stellar/actions/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ func (s *Stellar) Close() error {
func (s *Stellar) GetLatestLedger(ctx context.Context, _ capabilities.RequestMetadata, _ *stellarcap.GetLatestLedgerRequest) (*capabilities.ResponseAndMetadata[*stellarcap.GetLatestLedgerResponse], caperrors.Error) {
resp, err := s.StellarService.GetLatestLedger(ctx)
if err != nil {
return nil, GetError(err, false)
return nil, capcommon.GetError(err, false)
}
protoResp, err := stellarcap.ConvertGetLatestLedgerResponseToProto(resp)
if err != nil {
return nil, GetError(err, false)
return nil, capcommon.GetError(err, false)
}
return &capabilities.ResponseAndMetadata[*stellarcap.GetLatestLedgerResponse]{Response: protoResp}, nil
}
Expand Down Expand Up @@ -196,6 +196,3 @@ func isUserError(err error) bool {
func isStellarNodeInfraError(err error) bool {
return errors.Is(err, multinode.ErrNodeError) || strings.Contains(err.Error(), multinode.ErrNodeError.Error())
}

var GetError = capcommon.GetError
var NewUserError = caperrors.NewPublicUserError
12 changes: 12 additions & 0 deletions chain_capabilities/stellar/actions/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
caperrors "github.com/smartcontractkit/chainlink-common/pkg/capabilities/errors"
stellarcap "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/stellar"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/stellar/scval"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/settings/limits"
"github.com/smartcontractkit/chainlink-common/pkg/types"
Expand Down Expand Up @@ -281,6 +282,17 @@ func TestConvertReadContractRequestFromProto(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "function is required")
})

t.Run("invalid arg conversion", func(t *testing.T) {
t.Parallel()
_, err := convertReadContractRequestFromProto(&stellarcap.ReadContractRequest{
ContractId: testForwarderAddress,
Function: "balance",
Args: []*scval.ScVal{nil},
})
require.Error(t, err)
require.Contains(t, err.Error(), "args[0]")
})
}

func TestIsStellarNodeInfraError_MessageSubstring(t *testing.T) {
Expand Down
74 changes: 66 additions & 8 deletions chain_capabilities/stellar/actions/tx_hash_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,67 @@ import (
"context"
"errors"
"fmt"
"math"
"strings"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/logger"

capcommon "github.com/smartcontractkit/capabilities/chain_capabilities/common"
"github.com/smartcontractkit/capabilities/chain_capabilities/stellar/monitoring"
)

const (
reportProcessedTopicPrefix = "forwarder_ReportProcessed"
defaultForwarderLookbackLedgers = int64(100)
failedToRetrieveTxHashErrorMsg = "failed to retrieve tx hash for report"

txHashLookupTypeSuccessful = "SuccessfulTransmission"
txHashLookupTypeFailed = "FailedTransmission"
txHashRetrievalPhase = "EventPoll"
)

var ErrUnexpectedSuccessfulTransmission = errors.New("unexpected successful transmission")

type TxHashRetriever struct {
forwarderClient CREForwarderClient
transmissionID TransmissionID
lggr logger.SugaredLogger
forwarderClient CREForwarderClient
transmissionID TransmissionID
lggr logger.SugaredLogger
beholderProcessor beholder.ProtoProcessor
messageBuilder *monitoring.MessageBuilder
telemetryContext monitoring.TelemetryContext
}

type TxHashRetrieverOption func(*TxHashRetriever)

func WithTxHashRetrieverMonitoring(
beholderProcessor beholder.ProtoProcessor,
messageBuilder *monitoring.MessageBuilder,
telemetryContext monitoring.TelemetryContext,
) TxHashRetrieverOption {
return func(r *TxHashRetriever) {
r.beholderProcessor = beholderProcessor
r.messageBuilder = messageBuilder
r.telemetryContext = telemetryContext
}
}

func NewTxHashRetriever(
forwarderClient CREForwarderClient,
lggr logger.SugaredLogger,
transmissionID TransmissionID,
opts ...TxHashRetrieverOption,
) TxHashRetriever {
return TxHashRetriever{
r := TxHashRetriever{
forwarderClient: forwarderClient,
transmissionID: transmissionID,
lggr: lggr,
}
for _, opt := range opts {
opt(&r)
}
return r
}

type eventDetails struct {
Expand Down Expand Up @@ -65,7 +95,7 @@ func (l eventDetailsList) String() string {
}

func (r *TxHashRetriever) GetSuccessfulTransmissionHash(ctx context.Context) (string, error) {
details, err := r.fetchAndParseEvents(ctx)
details, err := r.fetchAndParseEvents(ctx, txHashLookupTypeSuccessful)
if err != nil {
return "", err
}
Expand All @@ -75,6 +105,7 @@ func (r *TxHashRetriever) GetSuccessfulTransmissionHash(ctx context.Context) (st
}
}
r.lggr.Errorw("No successful transmission found", "txCount", len(details), "transactions", details.String())
r.emitTxHashRetrievalPhase(ctx, txHashLookupTypeSuccessful, "NotFound", time.Now(), "")
return "", fmt.Errorf("no successful transmission found. Found %d transactions (all failed): %s",
len(details), details)
}
Expand All @@ -85,16 +116,21 @@ func (r *TxHashRetriever) GetFailedTransmissionHash(ctx context.Context) (string
}

func (r *TxHashRetriever) GetFailedTransmissionHashWithCount(ctx context.Context) (string, int, error) {
details, err := r.fetchAndParseEvents(ctx)
details, err := r.fetchAndParseEvents(ctx, txHashLookupTypeFailed)
if err != nil {
return "", 0, err
}
for _, d := range details {
if d.isSuccess {
r.emitTxHashRetrievalPhase(ctx, txHashLookupTypeFailed, "UnexpectedSuccess", time.Now(), d.txHash)
return "", len(details), fmt.Errorf("%w, successful tx hash: %s",
ErrUnexpectedSuccessfulTransmission, d.txHash)
}
}
if len(details) == 0 {
r.emitTxHashRetrievalPhase(ctx, txHashLookupTypeFailed, "NotFound", time.Now(), "")
return "", 0, fmt.Errorf("no failed transmission found")
}

earliestIdx := 0
for i, d := range details {
Expand All @@ -113,7 +149,8 @@ func (r *TxHashRetriever) GetFailedTransmissionHashWithCount(ctx context.Context
return details[earliestIdx].txHash, len(details), nil
}

func (r *TxHashRetriever) fetchAndParseEvents(ctx context.Context) (eventDetailsList, error) {
func (r *TxHashRetriever) fetchAndParseEvents(ctx context.Context, lookupType string) (eventDetailsList, error) {
phaseStart := time.Now()
events, err := capcommon.WithPollingRetry(ctx, r.lggr, func(ctx context.Context) ([]ReportProcessedEvent, error) {
events, fetchErr := r.forwarderClient.GetReportProcessedEvents(ctx, r.transmissionID)
if fetchErr != nil {
Expand All @@ -125,10 +162,31 @@ func (r *TxHashRetriever) fetchAndParseEvents(ctx context.Context) (eventDetails
return events, nil
})
if err != nil {
r.emitTxHashRetrievalPhase(ctx, lookupType, "FetchError", phaseStart, "")
return nil, fmt.Errorf("%s: %w", failedToRetrieveTxHashErrorMsg, err)
}

return buildEventDetails(events), nil
details := buildEventDetails(events)
selectedHash := ""
if len(details) > 0 {
selectedHash = details[0].txHash
}
r.emitTxHashRetrievalPhase(ctx, lookupType, "Found", phaseStart, selectedHash)
return details, nil
}

func (r *TxHashRetriever) emitTxHashRetrievalPhase(ctx context.Context, lookupType, result string, phaseStart time.Time, txHash string) {
if r.beholderProcessor == nil || r.messageBuilder == nil {
return
}
monitoring.EmitInitiated(ctx, r.lggr, r.beholderProcessor, r.messageBuilder.BuildWriteReportTxHashRetrievalPhase(
r.telemetryContext,
txHashRetrievalPhase,
result,
int64(math.Max(float64(time.Since(phaseStart).Milliseconds()), 0)),
txHash,
lookupType,
))
}

func buildEventDetails(events []ReportProcessedEvent) eventDetailsList {
Expand Down
54 changes: 47 additions & 7 deletions chain_capabilities/stellar/actions/tx_hash_retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,45 @@ import (

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/types"
stellartypes "github.com/smartcontractkit/chainlink-common/pkg/types/chains/stellar"
"github.com/smartcontractkit/chainlink-protos/cre/go/sdk"

"github.com/smartcontractkit/capabilities/chain_capabilities/stellar/monitoring"
)

type stubForwarderClient struct {
events []ReportProcessedEvent
err error
events []ReportProcessedEvent
eventsErr error
transmissionInfoFn func(call int) (TransmissionInfo, error)
invokeOnReportResp *stellartypes.SubmitTransactionResponse
invokeOnReportErr error
transmissionCalls int
}

func (s *stubForwarderClient) InvokeOnReport(context.Context, string, *sdk.ReportResponse) (*stellartypes.SubmitTransactionResponse, error) {
panic("not implemented")
if s.invokeOnReportErr != nil {
return nil, s.invokeOnReportErr
}
if s.invokeOnReportResp != nil {
return s.invokeOnReportResp, nil
}
panic("stubForwarderClient.InvokeOnReport not configured")
}

func (s *stubForwarderClient) GetTransmissionInfo(context.Context, TransmissionID) (TransmissionInfo, error) {
panic("not implemented")
s.transmissionCalls++
if s.transmissionInfoFn != nil {
return s.transmissionInfoFn(s.transmissionCalls)
}
panic("stubForwarderClient.GetTransmissionInfo not configured")
}

func (s *stubForwarderClient) GetReportProcessedEvents(context.Context, TransmissionID) ([]ReportProcessedEvent, error) {
if s.err != nil {
return nil, s.err
if s.eventsErr != nil {
return nil, s.eventsErr
}
return s.events, nil
}
Expand Down Expand Up @@ -116,7 +134,7 @@ func TestTxHashRetriever_GetSuccessfulTransmissionHash(t *testing.T) {

t.Run("returns error when event fetch fails", func(t *testing.T) {
t.Parallel()
client := &stubForwarderClient{err: errors.New("rpc down")}
client := &stubForwarderClient{eventsErr: errors.New("rpc down")}
retriever := NewTxHashRetriever(client, lggr, transmissionID)

ctx, cancel := context.WithTimeout(t.Context(), 300*time.Millisecond)
Expand Down Expand Up @@ -158,6 +176,28 @@ func TestTxHashRetriever_GetFailedTransmissionHash(t *testing.T) {
require.ErrorIs(t, err, ErrUnexpectedSuccessfulTransmission)
})

t.Run("emits unexpected success phase telemetry", func(t *testing.T) {
t.Parallel()
processor := &recordingWriteReportProcessor{}
client := &stubForwarderClient{events: []ReportProcessedEvent{
{TxHash: testTxHash, Ledger: 100, Success: true},
}}
retriever := NewTxHashRetriever(
client,
lggr,
transmissionID,
WithTxHashRetrieverMonitoring(
processor,
monitoring.NewMessageBuilder(types.ChainInfo{}, capabilities.CapabilityInfo{}, ""),
monitoring.TelemetryContext{},
),
)

_, err := retriever.GetFailedTransmissionHash(t.Context())
require.ErrorIs(t, err, ErrUnexpectedSuccessfulTransmission)
requireTxHashPhaseEvent(t, processor.messages, "FailedTransmission", txHashRetrievalPhase, "UnexpectedSuccess", testTxHash)
})

t.Run("GetFailedTransmissionHashWithCount returns count", func(t *testing.T) {
t.Parallel()
client := &stubForwarderClient{events: []ReportProcessedEvent{
Expand Down
Loading
Loading