From 0d9ceb10ea140137dfd0ce66d4663e28d2f0d365 Mon Sep 17 00:00:00 2001 From: Silas Lenihan Date: Thu, 2 Jul 2026 13:48:01 -0400 Subject: [PATCH 1/4] fix trigger bug --- chain_capabilities/solana/trigger/trigger.go | 85 +++++++++---- .../solana/trigger/trigger_test.go | 120 +++++++++++++++--- 2 files changed, 167 insertions(+), 38 deletions(-) diff --git a/chain_capabilities/solana/trigger/trigger.go b/chain_capabilities/solana/trigger/trigger.go index 44bc2b0f9..9df2af4ff 100644 --- a/chain_capabilities/solana/trigger/trigger.go +++ b/chain_capabilities/solana/trigger/trigger.go @@ -3,6 +3,7 @@ package trigger import ( "context" "fmt" + "slices" "strconv" "strings" "time" @@ -89,15 +90,19 @@ func (lts *SolanaLogTriggerService) ToLogPollerFilter(triggerID string, config * }, nil } -// BuildQueryExpressions builds query expressions including subkey filters -func BuildQueryExpressions(config *solanacappb.FilterLogTriggerRequest, lastProcessedBlock int64) ([]query.Expression, error) { +// BuildQueryExpressions builds query expressions including subkey filters. +// When includeStartingBlockFilter is true and startingBlock >= 0, results are limited to logs +// after the registration-time finalized block. Progression uses log-poller sequence numbers instead +// of advancing a block cursor, so includeStartingBlockFilter should be false once events have +// been delivered. +func BuildQueryExpressions(config *solanacappb.FilterLogTriggerRequest, startingBlock int64, includeStartingBlockFilter bool) ([]query.Expression, error) { expressions := []query.Expression{ solprimitives.NewAddressFilter(solana.PublicKey(config.Address)), solprimitives.NewEventSigFilter(getEventSig(config.EventName)), } - if lastProcessedBlock >= 0 { - blockStr := strconv.FormatInt(lastProcessedBlock, 10) + if includeStartingBlockFilter && startingBlock >= 0 { + blockStr := strconv.FormatInt(startingBlock, 10) expressions = append(expressions, query.Block(blockStr, primitives.Gt)) } @@ -482,8 +487,9 @@ func (lts *SolanaLogTriggerService) startPolling(ctx context.Context, telemetryC defer ticker.Stop() defer close(logCh) - // Use the finalized block number from registration as the starting point - lastProcessedBlock := startingBlock + // startingBlock is the finalized block at registration; lastProcessedSequenceNum tracks the + // log-poller ingest cursor and only advances after successful delivery. + var lastProcessedSequenceNum int64 for { select { @@ -491,8 +497,13 @@ func (lts *SolanaLogTriggerService) startPolling(ctx context.Context, telemetryC lts.lggr.Debugf("Context cancelled for triggerID: %s, stopping polling", triggerID) return case <-ticker.C: - lts.lggr.Debugf("Awake, polling for triggerID: %s, currentOffset: %d", triggerID, lastProcessedBlock) - expressions, err := BuildQueryExpressions(config, lastProcessedBlock) + lts.lggr.Debugw("Polling for trigger logs", + "triggerID", triggerID, + "lastProcessedSequenceNum", lastProcessedSequenceNum, + "startingBlock", startingBlock, + ) + includeStartingBlockFilter := lastProcessedSequenceNum == 0 + expressions, err := BuildQueryExpressions(config, startingBlock, includeStartingBlockFilter) if err != nil { summary := fmt.Sprintf("Failed to build query expressions for trigger %s: %v", triggerID, err) lts.logAndEmitError(ctx, telemetryContext, triggerID, summary, err.Error()) @@ -512,39 +523,69 @@ func (lts *SolanaLogTriggerService) startPolling(ctx context.Context, telemetryC } sentCount := 0 - calculatedLatestBlock := lastProcessedBlock - for _, log := range logs { + var latestDeliveredBlock int64 = startingBlock + pendingLogs := pendingLogsAfterSequence(logs, lastProcessedSequenceNum) + for _, log := range pendingLogs { if log == nil { lts.lggr.Warnw("Received nil log from QueryTrackedLogs, skipping", "triggerID", triggerID) continue } - // Track the highest block number from logs (all logs from log poller are already finalized) - if log.BlockNumber > calculatedLatestBlock { - calculatedLatestBlock = log.BlockNumber - } - protoLog := solanacappb.ConvertLogToProto(log) eventID := lts.generateLogIdentifier(log) checksLimitsOk := lts.checkLimitsOnLog(ctx, telemetryContext, protoLog, triggerID, eventID, log) if !checksLimitsOk { - continue + break } if ctx.Err() != nil { return } - if lts.deliverLogReliably(ctx, telemetryContext, triggerID, protoLog, eventID, log) { - sentCount++ + if !lts.deliverLogReliably(ctx, telemetryContext, triggerID, protoLog, eventID, log) { + break + } + + sentCount++ + lastProcessedSequenceNum = log.SequenceNum + if log.BlockNumber > latestDeliveredBlock { + latestDeliveredBlock = log.BlockNumber } } - lastProcessedBlock = calculatedLatestBlock - successMessage := fmt.Sprintf("Finished updating BlockNumber for triggerID: %s, BlockNumber: %d, sent logs: %d", triggerID, calculatedLatestBlock, sentCount) - lts.logAndEmitSuccess(ctx, successMessage, telemetryContext, triggerID, config, sentCount, calculatedLatestBlock) + successMessage := fmt.Sprintf( + "Finished updating sequence cursor for triggerID: %s, SequenceNum: %d, BlockNumber: %d, sent logs: %d", + triggerID, lastProcessedSequenceNum, latestDeliveredBlock, sentCount, + ) + lts.logAndEmitSuccess(ctx, successMessage, telemetryContext, triggerID, config, sentCount, latestDeliveredBlock) + } + } +} + +func pendingLogsAfterSequence(logs []*solana.Log, lastProcessedSequenceNum int64) []*solana.Log { + pending := make([]*solana.Log, 0, len(logs)) + for _, log := range logs { + if log == nil { + continue + } + if log.SequenceNum <= lastProcessedSequenceNum { + continue } + pending = append(pending, log) } + + slices.SortFunc(pending, func(a, b *solana.Log) int { + switch { + case a.SequenceNum < b.SequenceNum: + return -1 + case a.SequenceNum > b.SequenceNum: + return 1 + default: + return 0 + } + }) + + return pending } func getEventSig(eventName string) solana.EventSignature { @@ -585,7 +626,7 @@ func (lts *SolanaLogTriggerService) deliverLogReliably( Payload: anyPayload, } - lts.lggr.Infow("Sending log event to pipe", "triggerID", triggerID, "eventID", eventID, "blockNumber", log.BlockNumber, "txHash", log.TxHash) + lts.lggr.Infow("Sending log event to pipe", "triggerID", triggerID, "eventID", eventID, "blockNumber", log.BlockNumber, "sequenceNum", log.SequenceNum, "txHash", log.TxHash) deliverCtx := capcommon.ContextWithOrgForDelivery(ctx, lts.lggr, lts.orgResolver, telemetryContext.RequestMetadata) if err := lts.baseTrigger.DeliverEvent(deliverCtx, te, triggerID); err != nil { summary := fmt.Sprintf("failed to persist/deliver event (triggerID=%s, eventID=%s): %v", triggerID, eventID, err) diff --git a/chain_capabilities/solana/trigger/trigger_test.go b/chain_capabilities/solana/trigger/trigger_test.go index 14ae54a85..2a3d44d74 100644 --- a/chain_capabilities/solana/trigger/trigger_test.go +++ b/chain_capabilities/solana/trigger/trigger_test.go @@ -127,6 +127,21 @@ func startPollingAsync( t.Cleanup(wg.Wait) } +func waitForLog( + t *testing.T, + logCh <-chan capabilities.TriggerAndId[*solanacappb.Log], + timeout time.Duration, +) capabilities.TriggerAndId[*solanacappb.Log] { + t.Helper() + select { + case response := <-logCh: + return response + case <-time.After(timeout): + t.Fatal("Timeout waiting for log") + return capabilities.TriggerAndId[*solanacappb.Log]{} + } +} + func setupTest(t *testing.T) (*SolanaLogTriggerService, *mocks.SolanaService) { mockSolanaService := mocks.NewSolanaService(t) @@ -166,10 +181,15 @@ func createTestRequest() *solanacappb.FilterLogTriggerRequest { } func createTestLog(blockNumber int64, address solana.PublicKey) *solana.Log { + return createTestLogWithSequence(blockNumber, blockNumber, address) +} + +func createTestLogWithSequence(blockNumber int64, sequenceNum int64, address solana.PublicKey) *solana.Log { return &solana.Log{ Address: address, EventSig: testEventSig, BlockNumber: blockNumber, + SequenceNum: sequenceNum, TxHash: solana.Signature{}, Data: []byte("test log data"), } @@ -390,13 +410,41 @@ func TestToLogPollerFilter(t *testing.T) { }) } +func TestPendingLogsAfterSequence(t *testing.T) { + t.Parallel() + + logs := []*solana.Log{ + createTestLogWithSequence(101, 1, testPublicKey), + createTestLogWithSequence(103, 3, testPublicKey), + createTestLogWithSequence(102, 2, testPublicKey), + nil, + } + + pending := pendingLogsAfterSequence(logs, 1) + require.Len(t, pending, 2) + require.Equal(t, int64(2), pending[0].SequenceNum) + require.Equal(t, int64(3), pending[1].SequenceNum) +} + func TestBuildQueryExpressions(t *testing.T) { + t.Run("omits starting block filter after first delivery", func(t *testing.T) { + request := createTestRequest() + + withBlock, err := BuildQueryExpressions(request, 99, true) + require.NoError(t, err) + require.Len(t, withBlock, 4) + + withoutBlock, err := BuildQueryExpressions(request, 99, false) + require.NoError(t, err) + require.Len(t, withoutBlock, 3) + }) + t.Run("builds basic expressions", func(t *testing.T) { request := &solanacappb.FilterLogTriggerRequest{ Address: testPublicKey[:], } - expressions, err := BuildQueryExpressions(request, 99) + expressions, err := BuildQueryExpressions(request, 99, true) require.NoError(t, err) require.Len(t, expressions, 3) @@ -408,7 +456,7 @@ func TestBuildQueryExpressions(t *testing.T) { Subkeys: testSubkeys, } - expressions, err := BuildQueryExpressions(request, 99) + expressions, err := BuildQueryExpressions(request, 99, true) require.NoError(t, err) require.Len(t, expressions, 4) @@ -422,7 +470,7 @@ func TestBuildQueryExpressions(t *testing.T) { }, } - expressions, err := BuildQueryExpressions(request, 99) + expressions, err := BuildQueryExpressions(request, 99, true) require.NoError(t, err) require.Len(t, expressions, 3) @@ -447,7 +495,7 @@ func TestLogTriggerSubkeyFilters(t *testing.T) { }, } - expressions, err := BuildQueryExpressions(request, 99) + expressions, err := BuildQueryExpressions(request, 99, true) require.NoError(t, err) require.NotNil(t, expressions) @@ -493,7 +541,7 @@ func TestLogTriggerSubkeyFilters(t *testing.T) { }, } - expressions, err := BuildQueryExpressions(request, 49) + expressions, err := BuildQueryExpressions(request, 49, true) require.NoError(t, err) require.NotNil(t, expressions) @@ -526,7 +574,7 @@ func TestLogTriggerSubkeyFilters(t *testing.T) { }, } - expressions, err := BuildQueryExpressions(request, 0) + expressions, err := BuildQueryExpressions(request, 0, true) require.NoError(t, err) require.NotNil(t, expressions) @@ -558,7 +606,7 @@ func TestLogTriggerSubkeyFilters(t *testing.T) { }, } - expressions, err := BuildQueryExpressions(request, 0) + expressions, err := BuildQueryExpressions(request, 0, true) require.NoError(t, err) require.NotNil(t, expressions) @@ -685,7 +733,7 @@ func TestStartPolling(t *testing.T) { mockSolanaService.AssertExpectations(t) }) - t.Run("updates lastProcessedBlock correctly", func(t *testing.T) { + t.Run("updates sequence cursor on successful delivery", func(t *testing.T) { service, mockSolana := setupTest(t) baseCtx, cancel := context.WithTimeout(t.Context(), 100*time.Millisecond) defer cancel() @@ -806,7 +854,7 @@ func TestStartPolling(t *testing.T) { mockSolanaService.AssertExpectations(t) }) - t.Run("updates lastProcessedBlock when inbox is full", func(t *testing.T) { + t.Run("does not advance sequence cursor when inbox is full", func(t *testing.T) { mockSolanaService := mocks.NewSolanaService(t) store := NewSolanaLogTriggerStore() @@ -868,6 +916,46 @@ func TestStartPolling(t *testing.T) { tests.AssertEventually(t, func() bool { return queryCallCount.Load() >= 2 }) }) + t.Run("delivers later-ingested lower block after higher block", func(t *testing.T) { + service, mockSolana := setupTest(t) + + meta := testRequestMetadata() + ctx := meta.ContextWithCRE(t.Context()) + + config := createTestRequest() + triggerID := "test-trigger" + startingBlock := int64(100) + logCh := make(chan capabilities.TriggerAndId[*solanacappb.Log], 10) + + // Higher block ingested first (sequence 1), lower block arrives later (sequence 2). + firstBatch := []*solana.Log{ + createTestLogWithSequence(102, 1, testPublicKey), + } + secondBatch := []*solana.Log{ + createTestLogWithSequence(101, 2, testPublicKey), + } + + var queryCallCount atomic.Int32 + mockSolana.EXPECT().QueryTrackedLogs(mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(_ context.Context, _ []query.Expression, _ query.LimitAndSort) ([]*solana.Log, error) { + if queryCallCount.Add(1) == 1 { + return firstBatch, nil + } + return secondBatch, nil + }).Maybe() + + telemetryContext := createTestTelemetryContext() + startPollingAsync(ctx, t, service, telemetryContext, config, triggerID, startingBlock, logCh) + + first := waitForLog(t, logCh, 100*time.Millisecond) + assert.Equal(t, int64(102), first.Trigger.BlockNumber) + + second := waitForLog(t, logCh, 200*time.Millisecond) + assert.Equal(t, int64(101), second.Trigger.BlockNumber) + + mockSolana.AssertExpectations(t) + }) + t.Run("continues polling after transient errors", func(t *testing.T) { mockSolanaService := mocks.NewSolanaService(t) store := NewSolanaLogTriggerStore() @@ -968,7 +1056,7 @@ func TestErrorHandling(t *testing.T) { }, } - expressions, err := BuildQueryExpressions(request, 99) + expressions, err := BuildQueryExpressions(request, 99, true) if err != nil { assert.Contains(t, err.Error(), "failed to create subkey filter") } else { @@ -978,7 +1066,7 @@ func TestErrorHandling(t *testing.T) { t.Run("block range validation", func(t *testing.T) { request := createTestRequest() - expressions, err := BuildQueryExpressions(request, 199) + expressions, err := BuildQueryExpressions(request, 199, true) if err == nil { assert.NotNil(t, expressions) } @@ -998,7 +1086,7 @@ func TestErrorHandling(t *testing.T) { }, } - expressions, err := BuildQueryExpressions(request, 99) + expressions, err := BuildQueryExpressions(request, 99, true) if err == nil { assert.NotNil(t, expressions) } @@ -1006,7 +1094,7 @@ func TestErrorHandling(t *testing.T) { t.Run("zero block range", func(t *testing.T) { request := createTestRequest() - expressions, err := BuildQueryExpressions(request, -1) + expressions, err := BuildQueryExpressions(request, -1, true) require.NoError(t, err) assert.GreaterOrEqual(t, len(expressions), 2) }) @@ -1099,7 +1187,7 @@ func BenchmarkSolanaLogTriggerService_BuildQueryExpressions(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := BuildQueryExpressions(request, 99) + _, err := BuildQueryExpressions(request, 99, true) if err != nil { b.Fatal(err) } @@ -1320,7 +1408,7 @@ func TestSolanaLogTriggerService_EdgeCases(t *testing.T) { t.Run("very large block numbers", func(t *testing.T) { request := createTestRequest() - expressions, err := BuildQueryExpressions(request, 9223372036854775805) + expressions, err := BuildQueryExpressions(request, 9223372036854775805, true) require.NoError(t, err) require.NotEmpty(t, expressions) @@ -1329,7 +1417,7 @@ func TestSolanaLogTriggerService_EdgeCases(t *testing.T) { t.Run("negative block numbers", func(t *testing.T) { request := createTestRequest() - expressions, err := BuildQueryExpressions(request, -2) + expressions, err := BuildQueryExpressions(request, -2, true) require.NoError(t, err) require.NotEmpty(t, expressions) From 5c6edefe7b1e3ea4e5fbb8b9f4ed5974cdd8a764 Mon Sep 17 00:00:00 2001 From: Silas Lenihan Date: Thu, 2 Jul 2026 13:56:51 -0400 Subject: [PATCH 2/4] simplify --- chain_capabilities/solana/trigger/trigger.go | 57 +++++++------ .../solana/trigger/trigger_test.go | 79 +++---------------- 2 files changed, 47 insertions(+), 89 deletions(-) diff --git a/chain_capabilities/solana/trigger/trigger.go b/chain_capabilities/solana/trigger/trigger.go index 9df2af4ff..ba5b4787b 100644 --- a/chain_capabilities/solana/trigger/trigger.go +++ b/chain_capabilities/solana/trigger/trigger.go @@ -2,6 +2,7 @@ package trigger import ( "context" + "cmp" "fmt" "slices" "strconv" @@ -92,9 +93,8 @@ func (lts *SolanaLogTriggerService) ToLogPollerFilter(triggerID string, config * // BuildQueryExpressions builds query expressions including subkey filters. // When includeStartingBlockFilter is true and startingBlock >= 0, results are limited to logs -// after the registration-time finalized block. Progression uses log-poller sequence numbers instead -// of advancing a block cursor, so includeStartingBlockFilter should be false once events have -// been delivered. +// after the registration-time finalized block. Once delivery starts, progression uses the last +// successfully delivered block/log-index position. func BuildQueryExpressions(config *solanacappb.FilterLogTriggerRequest, startingBlock int64, includeStartingBlockFilter bool) ([]query.Expression, error) { expressions := []query.Expression{ solprimitives.NewAddressFilter(solana.PublicKey(config.Address)), @@ -487,9 +487,13 @@ func (lts *SolanaLogTriggerService) startPolling(ctx context.Context, telemetryC defer ticker.Stop() defer close(logCh) - // startingBlock is the finalized block at registration; lastProcessedSequenceNum tracks the - // log-poller ingest cursor and only advances after successful delivery. - var lastProcessedSequenceNum int64 + // startingBlock is the finalized block at registration. The delivery cursor only advances to the + // last successfully delivered log and never jumps ahead within a batch. + var ( + lastDeliveredBlock = startingBlock + lastDeliveredLogIndex int64 + hasDelivered bool + ) for { select { @@ -499,10 +503,11 @@ func (lts *SolanaLogTriggerService) startPolling(ctx context.Context, telemetryC case <-ticker.C: lts.lggr.Debugw("Polling for trigger logs", "triggerID", triggerID, - "lastProcessedSequenceNum", lastProcessedSequenceNum, + "lastDeliveredBlock", lastDeliveredBlock, + "lastDeliveredLogIndex", lastDeliveredLogIndex, "startingBlock", startingBlock, ) - includeStartingBlockFilter := lastProcessedSequenceNum == 0 + includeStartingBlockFilter := !hasDelivered expressions, err := BuildQueryExpressions(config, startingBlock, includeStartingBlockFilter) if err != nil { summary := fmt.Sprintf("Failed to build query expressions for trigger %s: %v", triggerID, err) @@ -524,7 +529,7 @@ func (lts *SolanaLogTriggerService) startPolling(ctx context.Context, telemetryC sentCount := 0 var latestDeliveredBlock int64 = startingBlock - pendingLogs := pendingLogsAfterSequence(logs, lastProcessedSequenceNum) + pendingLogs := pendingLogsAfterPosition(logs, lastDeliveredBlock, lastDeliveredLogIndex) for _, log := range pendingLogs { if log == nil { lts.lggr.Warnw("Received nil log from QueryTrackedLogs, skipping", "triggerID", triggerID) @@ -547,47 +552,55 @@ func (lts *SolanaLogTriggerService) startPolling(ctx context.Context, telemetryC } sentCount++ - lastProcessedSequenceNum = log.SequenceNum + hasDelivered = true + lastDeliveredBlock = log.BlockNumber + lastDeliveredLogIndex = log.LogIndex if log.BlockNumber > latestDeliveredBlock { latestDeliveredBlock = log.BlockNumber } } successMessage := fmt.Sprintf( - "Finished updating sequence cursor for triggerID: %s, SequenceNum: %d, BlockNumber: %d, sent logs: %d", - triggerID, lastProcessedSequenceNum, latestDeliveredBlock, sentCount, + "Finished updating delivery cursor for triggerID: %s, BlockNumber: %d, LogIndex: %d, sent logs: %d", + triggerID, lastDeliveredBlock, lastDeliveredLogIndex, sentCount, ) lts.logAndEmitSuccess(ctx, successMessage, telemetryContext, triggerID, config, sentCount, latestDeliveredBlock) } } } -func pendingLogsAfterSequence(logs []*solana.Log, lastProcessedSequenceNum int64) []*solana.Log { +func pendingLogsAfterPosition(logs []*solana.Log, lastDeliveredBlock int64, lastDeliveredLogIndex int64) []*solana.Log { pending := make([]*solana.Log, 0, len(logs)) for _, log := range logs { if log == nil { continue } - if log.SequenceNum <= lastProcessedSequenceNum { + if isLogAtOrBeforePosition(log, lastDeliveredBlock, lastDeliveredLogIndex) { continue } pending = append(pending, log) } slices.SortFunc(pending, func(a, b *solana.Log) int { - switch { - case a.SequenceNum < b.SequenceNum: - return -1 - case a.SequenceNum > b.SequenceNum: - return 1 - default: - return 0 + if c := cmp.Compare(a.BlockNumber, b.BlockNumber); c != 0 { + return c } + return cmp.Compare(a.LogIndex, b.LogIndex) }) return pending } +func isLogAtOrBeforePosition(log *solana.Log, lastDeliveredBlock int64, lastDeliveredLogIndex int64) bool { + if log.BlockNumber < lastDeliveredBlock { + return true + } + if log.BlockNumber > lastDeliveredBlock { + return false + } + return log.LogIndex <= lastDeliveredLogIndex +} + func getEventSig(eventName string) solana.EventSignature { sig := lptypes.NewEventSignatureFromName(eventName) return solana.EventSignature(sig[:]) @@ -626,7 +639,7 @@ func (lts *SolanaLogTriggerService) deliverLogReliably( Payload: anyPayload, } - lts.lggr.Infow("Sending log event to pipe", "triggerID", triggerID, "eventID", eventID, "blockNumber", log.BlockNumber, "sequenceNum", log.SequenceNum, "txHash", log.TxHash) + lts.lggr.Infow("Sending log event to pipe", "triggerID", triggerID, "eventID", eventID, "blockNumber", log.BlockNumber, "logIndex", log.LogIndex, "txHash", log.TxHash) deliverCtx := capcommon.ContextWithOrgForDelivery(ctx, lts.lggr, lts.orgResolver, telemetryContext.RequestMetadata) if err := lts.baseTrigger.DeliverEvent(deliverCtx, te, triggerID); err != nil { summary := fmt.Sprintf("failed to persist/deliver event (triggerID=%s, eventID=%s): %v", triggerID, eventID, err) diff --git a/chain_capabilities/solana/trigger/trigger_test.go b/chain_capabilities/solana/trigger/trigger_test.go index 2a3d44d74..1dadae53d 100644 --- a/chain_capabilities/solana/trigger/trigger_test.go +++ b/chain_capabilities/solana/trigger/trigger_test.go @@ -127,21 +127,6 @@ func startPollingAsync( t.Cleanup(wg.Wait) } -func waitForLog( - t *testing.T, - logCh <-chan capabilities.TriggerAndId[*solanacappb.Log], - timeout time.Duration, -) capabilities.TriggerAndId[*solanacappb.Log] { - t.Helper() - select { - case response := <-logCh: - return response - case <-time.After(timeout): - t.Fatal("Timeout waiting for log") - return capabilities.TriggerAndId[*solanacappb.Log]{} - } -} - func setupTest(t *testing.T) (*SolanaLogTriggerService, *mocks.SolanaService) { mockSolanaService := mocks.NewSolanaService(t) @@ -181,15 +166,15 @@ func createTestRequest() *solanacappb.FilterLogTriggerRequest { } func createTestLog(blockNumber int64, address solana.PublicKey) *solana.Log { - return createTestLogWithSequence(blockNumber, blockNumber, address) + return createTestLogWithIndex(blockNumber, blockNumber, address) } -func createTestLogWithSequence(blockNumber int64, sequenceNum int64, address solana.PublicKey) *solana.Log { +func createTestLogWithIndex(blockNumber int64, logIndex int64, address solana.PublicKey) *solana.Log { return &solana.Log{ Address: address, EventSig: testEventSig, BlockNumber: blockNumber, - SequenceNum: sequenceNum, + LogIndex: logIndex, TxHash: solana.Signature{}, Data: []byte("test log data"), } @@ -410,20 +395,20 @@ func TestToLogPollerFilter(t *testing.T) { }) } -func TestPendingLogsAfterSequence(t *testing.T) { +func TestPendingLogsAfterPosition(t *testing.T) { t.Parallel() logs := []*solana.Log{ - createTestLogWithSequence(101, 1, testPublicKey), - createTestLogWithSequence(103, 3, testPublicKey), - createTestLogWithSequence(102, 2, testPublicKey), + createTestLogWithIndex(101, 1, testPublicKey), + createTestLogWithIndex(103, 3, testPublicKey), + createTestLogWithIndex(102, 2, testPublicKey), nil, } - pending := pendingLogsAfterSequence(logs, 1) + pending := pendingLogsAfterPosition(logs, 101, 1) require.Len(t, pending, 2) - require.Equal(t, int64(2), pending[0].SequenceNum) - require.Equal(t, int64(3), pending[1].SequenceNum) + require.Equal(t, int64(102), pending[0].BlockNumber) + require.Equal(t, int64(103), pending[1].BlockNumber) } func TestBuildQueryExpressions(t *testing.T) { @@ -733,7 +718,7 @@ func TestStartPolling(t *testing.T) { mockSolanaService.AssertExpectations(t) }) - t.Run("updates sequence cursor on successful delivery", func(t *testing.T) { + t.Run("updates delivery cursor on successful delivery", func(t *testing.T) { service, mockSolana := setupTest(t) baseCtx, cancel := context.WithTimeout(t.Context(), 100*time.Millisecond) defer cancel() @@ -854,7 +839,7 @@ func TestStartPolling(t *testing.T) { mockSolanaService.AssertExpectations(t) }) - t.Run("does not advance sequence cursor when inbox is full", func(t *testing.T) { + t.Run("does not advance delivery cursor when inbox is full", func(t *testing.T) { mockSolanaService := mocks.NewSolanaService(t) store := NewSolanaLogTriggerStore() @@ -916,46 +901,6 @@ func TestStartPolling(t *testing.T) { tests.AssertEventually(t, func() bool { return queryCallCount.Load() >= 2 }) }) - t.Run("delivers later-ingested lower block after higher block", func(t *testing.T) { - service, mockSolana := setupTest(t) - - meta := testRequestMetadata() - ctx := meta.ContextWithCRE(t.Context()) - - config := createTestRequest() - triggerID := "test-trigger" - startingBlock := int64(100) - logCh := make(chan capabilities.TriggerAndId[*solanacappb.Log], 10) - - // Higher block ingested first (sequence 1), lower block arrives later (sequence 2). - firstBatch := []*solana.Log{ - createTestLogWithSequence(102, 1, testPublicKey), - } - secondBatch := []*solana.Log{ - createTestLogWithSequence(101, 2, testPublicKey), - } - - var queryCallCount atomic.Int32 - mockSolana.EXPECT().QueryTrackedLogs(mock.Anything, mock.Anything, mock.Anything). - RunAndReturn(func(_ context.Context, _ []query.Expression, _ query.LimitAndSort) ([]*solana.Log, error) { - if queryCallCount.Add(1) == 1 { - return firstBatch, nil - } - return secondBatch, nil - }).Maybe() - - telemetryContext := createTestTelemetryContext() - startPollingAsync(ctx, t, service, telemetryContext, config, triggerID, startingBlock, logCh) - - first := waitForLog(t, logCh, 100*time.Millisecond) - assert.Equal(t, int64(102), first.Trigger.BlockNumber) - - second := waitForLog(t, logCh, 200*time.Millisecond) - assert.Equal(t, int64(101), second.Trigger.BlockNumber) - - mockSolana.AssertExpectations(t) - }) - t.Run("continues polling after transient errors", func(t *testing.T) { mockSolanaService := mocks.NewSolanaService(t) store := NewSolanaLogTriggerStore() From 29e8d186be458a51a3ee956aa86c89c95bed19e2 Mon Sep 17 00:00:00 2001 From: Silas Lenihan Date: Thu, 2 Jul 2026 15:48:21 -0400 Subject: [PATCH 3/4] bump solana --- chain_capabilities/solana/go.mod | 10 +- chain_capabilities/solana/go.sum | 20 +-- chain_capabilities/solana/trigger/trigger.go | 124 +++++++++++++++--- .../solana/trigger/trigger_test.go | 90 ++++++++++++- 4 files changed, 208 insertions(+), 36 deletions(-) diff --git a/chain_capabilities/solana/go.mod b/chain_capabilities/solana/go.mod index decd378d7..8dca4cd34 100644 --- a/chain_capabilities/solana/go.mod +++ b/chain_capabilities/solana/go.mod @@ -9,10 +9,10 @@ require ( github.com/smartcontractkit/capabilities/chain_capabilities/common v0.0.0-20260615195421-fb87220e503f github.com/smartcontractkit/capabilities/libs v0.0.0-20260609124022-2749e4a32bfb github.com/smartcontractkit/chain-selectors v1.0.100 - github.com/smartcontractkit/chainlink-common v0.11.2-0.20260602135221-cc7a5b50532a - github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260326180413-c69f27e37a13 - github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7 - github.com/smartcontractkit/chainlink-solana v1.1.2-0.20260420191419-ea62f88cbdb4 + github.com/smartcontractkit/chainlink-common v0.11.2-0.20260610124317-1a3c32c46eaa + github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243 + github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260604171908-6734db2d444f + github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260702194516-0c9666a3e925 github.com/smartcontractkit/chainlink-solana/contracts v0.0.0-20260421131224-c46cbfe7bc6c github.com/smartcontractkit/libocr v0.0.0-20260403184524-b6409238958d github.com/stretchr/testify v1.11.1 @@ -132,7 +132,7 @@ require ( github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20260310183131-8d0f0e383288 // indirect github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20260310183131-8d0f0e383288 // indirect github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260601211238-9f526774fef0 // indirect - github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20251210101658-1c5c8e4c4f15 // indirect + github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260521164805-26d78d5e1243 // indirect github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b // indirect github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260319180422-b5808c964785 // indirect github.com/smartcontractkit/freeport v0.1.3-0.20250828155247-add56fa28aad // indirect diff --git a/chain_capabilities/solana/go.sum b/chain_capabilities/solana/go.sum index 70052a53a..5add77dc6 100644 --- a/chain_capabilities/solana/go.sum +++ b/chain_capabilities/solana/go.sum @@ -629,28 +629,28 @@ github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20260310183131-8 github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20260310183131-8d0f0e383288/go.mod h1:jPHlo/IN2YAArI001JJixmm6ZHQwgnAVJXY8VBFiFTc= github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20260310183131-8d0f0e383288 h1:eEjTgIQn4RW0ZPRepUDYTdgGwaRCMawMwgXkHItUc9U= github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20260310183131-8d0f0e383288/go.mod h1:67YbnoglYD61Pz/jTVCgav9wFq7S35OU8UyQSvPllRw= -github.com/smartcontractkit/chainlink-common v0.11.2-0.20260602135221-cc7a5b50532a h1:DHdGiDkMe+WS2e+Cy6ooNxGVX4gwiRRcgperFWmR4R4= -github.com/smartcontractkit/chainlink-common v0.11.2-0.20260602135221-cc7a5b50532a/go.mod h1:6jgqiFXFJHqjkvFFmuf8gvoUFa6Ygx/D1tKnIL+CCF8= +github.com/smartcontractkit/chainlink-common v0.11.2-0.20260610124317-1a3c32c46eaa h1:bd1aJVR6fPjyMsJKq5hEsR6SgpxacbMwG8LABkC5qq8= +github.com/smartcontractkit/chainlink-common v0.11.2-0.20260610124317-1a3c32c46eaa/go.mod h1:fP9RqD25/gTx3XqRstN8o4lAI3jp42vwJBLRZwRoOOM= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260601211238-9f526774fef0 h1:NExKM/D0HneOq/N5LGTbkV4VOa0UHCvfTNEb4GqYpto= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260601211238-9f526774fef0/go.mod h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20251215152504-b1e41f508340 h1:PsjEI+5jZIz9AS4eOsLS5VpSWJINf38clXV3wryPyMk= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20251215152504-b1e41f508340/go.mod h1:P/0OSXUlFaxxD4B/P6HWbxYtIRmmWGDJAvanq19879c= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20251210101658-1c5c8e4c4f15 h1:IXF7+k8I1YY/yvXC1wnS3FAAggtCy6ByEQ9hv/F2FvQ= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20251210101658-1c5c8e4c4f15/go.mod h1:HG/aei0MgBOpsyRLexdKGtOUO8yjSJO3iUu0Uu8KBm4= -github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260326180413-c69f27e37a13 h1:3KLLkTCIAy9CvT35Ey0k6pcWX/u+qsm3Y/58TI5VSAg= -github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260326180413-c69f27e37a13/go.mod h1:Y7h84PqCe/Vimf2h1Nc6tMiOJStDbtM33fEUeaaF5xk= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260521164805-26d78d5e1243 h1:vaFBupfFfImQgqOeuC7Muk2GflbYP6Gpi0Y/TLroFU8= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260521164805-26d78d5e1243/go.mod h1:HG/aei0MgBOpsyRLexdKGtOUO8yjSJO3iUu0Uu8KBm4= +github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243 h1:71PGTkjdFZ0JrloEC2Fs8eHl1b1gmUuH+bq7q23usKk= +github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243/go.mod h1:7ketk4ischPQW/JQgmyHz6zdzLUJv1VC29SiSgosydQ= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 h1:GCzrxDWn3b7jFfEA+WiYRi8CKoegsayiDoJBCjYkneE= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7 h1:iljEJss3WOwcsMkWy72Yn2zvjw7Gyxc+RXL7r8YKM6g= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7/go.mod h1:vTFHTCbLui4Vn8fTmAadfE3rdnvfrDwOmMujmW857D0= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260604171908-6734db2d444f h1:t+OoYaXLdH0WHK2pbWKjTSnSQa5JBQD1+gf0yISYfQk= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260604171908-6734db2d444f/go.mod h1:vTFHTCbLui4Vn8fTmAadfE3rdnvfrDwOmMujmW857D0= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY= github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260319180422-b5808c964785 h1:oli+2uLU6jcrJGCuYFqk3475hiwL17SWlITWLv+tx/w= github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260319180422-b5808c964785/go.mod h1:dkR2uYg9XYJuT1JASkPzWE51jjFkVb86P7a/yXe5/GM= github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528173149-f5b8336b19d9 h1:LQy2j2+TdKLSWsUTUYuqmQPn8kjqCLjGI3ZJYGtDc08= github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528173149-f5b8336b19d9/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= -github.com/smartcontractkit/chainlink-solana v1.1.2-0.20260420191419-ea62f88cbdb4 h1:WaQzRyMCmi7gddZraPtHSqCjZ5dnVYFYXqEtq2DsQjw= -github.com/smartcontractkit/chainlink-solana v1.1.2-0.20260420191419-ea62f88cbdb4/go.mod h1:sUsEwLtVPBlz0wPcysaolS+HVj9cOAt4jYhwE6J8dXg= +github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260702194516-0c9666a3e925 h1:9t7KPRezfZvFiLCG/ny7IeJ0K8HRY3oP26VlfEs1wvI= +github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260702194516-0c9666a3e925/go.mod h1:LXWUlBkE+f+FxPeMtaa669kUxJWYhkAzGrwW9affaKU= github.com/smartcontractkit/chainlink-solana/contracts v0.0.0-20260421131224-c46cbfe7bc6c h1:Hn/80PyYFrQhRlNSaq9HY4cjc/7AuP9zyWLle22t34A= github.com/smartcontractkit/chainlink-solana/contracts v0.0.0-20260421131224-c46cbfe7bc6c/go.mod h1:C5pZsbYX3qkhZTYWr1aYJi9QMfonFAun+Jl1npQ7UJA= github.com/smartcontractkit/freeport v0.1.3-0.20250828155247-add56fa28aad h1:lgHxTHuzJIF3Vj6LSMOnjhqKgRqYW+0MV2SExtCYL1Q= diff --git a/chain_capabilities/solana/trigger/trigger.go b/chain_capabilities/solana/trigger/trigger.go index ba5b4787b..9bd285ac0 100644 --- a/chain_capabilities/solana/trigger/trigger.go +++ b/chain_capabilities/solana/trigger/trigger.go @@ -487,12 +487,14 @@ func (lts *SolanaLogTriggerService) startPolling(ctx context.Context, telemetryC defer ticker.Stop() defer close(logCh) - // startingBlock is the finalized block at registration. The delivery cursor only advances to the - // last successfully delivered log and never jumps ahead within a batch. + // startingBlock is the finalized block at registration. The delivery cursor only advances on + // successful delivery. When log poller returns sequence numbers, progression is contiguous by + // sequence; otherwise it falls back to block/log-index position. var ( - lastDeliveredBlock = startingBlock - lastDeliveredLogIndex int64 - hasDelivered bool + lastDeliveredSequenceNum int64 + lastDeliveredBlock = startingBlock + lastDeliveredLogIndex int64 + hasDelivered bool ) for { @@ -503,6 +505,7 @@ func (lts *SolanaLogTriggerService) startPolling(ctx context.Context, telemetryC case <-ticker.C: lts.lggr.Debugw("Polling for trigger logs", "triggerID", triggerID, + "lastDeliveredSequenceNum", lastDeliveredSequenceNum, "lastDeliveredBlock", lastDeliveredBlock, "lastDeliveredLogIndex", lastDeliveredLogIndex, "startingBlock", startingBlock, @@ -529,7 +532,15 @@ func (lts *SolanaLogTriggerService) startPolling(ctx context.Context, telemetryC sentCount := 0 var latestDeliveredBlock int64 = startingBlock - pendingLogs := pendingLogsAfterPosition(logs, lastDeliveredBlock, lastDeliveredLogIndex) + useSequenceCursor := logsHaveSequenceNumbers(logs) + pendingLogs := deliverableLogsAfterCursor( + logs, + useSequenceCursor, + lastDeliveredSequenceNum, + lastDeliveredBlock, + lastDeliveredLogIndex, + hasDelivered, + ) for _, log := range pendingLogs { if log == nil { lts.lggr.Warnw("Received nil log from QueryTrackedLogs, skipping", "triggerID", triggerID) @@ -553,29 +564,64 @@ func (lts *SolanaLogTriggerService) startPolling(ctx context.Context, telemetryC sentCount++ hasDelivered = true - lastDeliveredBlock = log.BlockNumber - lastDeliveredLogIndex = log.LogIndex + if useSequenceCursor { + lastDeliveredSequenceNum = log.SequenceNum + } else { + lastDeliveredBlock = log.BlockNumber + lastDeliveredLogIndex = log.LogIndex + } if log.BlockNumber > latestDeliveredBlock { latestDeliveredBlock = log.BlockNumber } } - successMessage := fmt.Sprintf( - "Finished updating delivery cursor for triggerID: %s, BlockNumber: %d, LogIndex: %d, sent logs: %d", - triggerID, lastDeliveredBlock, lastDeliveredLogIndex, sentCount, - ) + var successMessage string + if useSequenceCursor { + successMessage = fmt.Sprintf( + "Finished updating delivery cursor for triggerID: %s, SequenceNum: %d, sent logs: %d", + triggerID, lastDeliveredSequenceNum, sentCount, + ) + } else { + successMessage = fmt.Sprintf( + "Finished updating delivery cursor for triggerID: %s, BlockNumber: %d, LogIndex: %d, sent logs: %d", + triggerID, lastDeliveredBlock, lastDeliveredLogIndex, sentCount, + ) + } lts.logAndEmitSuccess(ctx, successMessage, telemetryContext, triggerID, config, sentCount, latestDeliveredBlock) } } } -func pendingLogsAfterPosition(logs []*solana.Log, lastDeliveredBlock int64, lastDeliveredLogIndex int64) []*solana.Log { +func logsHaveSequenceNumbers(logs []*solana.Log) bool { + for _, log := range logs { + if log != nil && log.SequenceNum > 0 { + return true + } + } + return false +} + +func deliverableLogsAfterCursor( + logs []*solana.Log, + useSequenceCursor bool, + lastDeliveredSequenceNum int64, + lastDeliveredBlock int64, + lastDeliveredLogIndex int64, + hasDelivered bool, +) []*solana.Log { + if useSequenceCursor { + return deliverableLogsAfterSequence(logs, lastDeliveredSequenceNum, hasDelivered) + } + return pendingLogsAfterBlockPosition(logs, lastDeliveredBlock, lastDeliveredLogIndex) +} + +func pendingLogsAfterBlockPosition(logs []*solana.Log, lastDeliveredBlock int64, lastDeliveredLogIndex int64) []*solana.Log { pending := make([]*solana.Log, 0, len(logs)) for _, log := range logs { if log == nil { continue } - if isLogAtOrBeforePosition(log, lastDeliveredBlock, lastDeliveredLogIndex) { + if isLogAtOrBeforeBlockPosition(log, lastDeliveredBlock, lastDeliveredLogIndex) { continue } pending = append(pending, log) @@ -591,7 +637,7 @@ func pendingLogsAfterPosition(logs []*solana.Log, lastDeliveredBlock int64, last return pending } -func isLogAtOrBeforePosition(log *solana.Log, lastDeliveredBlock int64, lastDeliveredLogIndex int64) bool { +func isLogAtOrBeforeBlockPosition(log *solana.Log, lastDeliveredBlock int64, lastDeliveredLogIndex int64) bool { if log.BlockNumber < lastDeliveredBlock { return true } @@ -601,6 +647,54 @@ func isLogAtOrBeforePosition(log *solana.Log, lastDeliveredBlock int64, lastDeli return log.LogIndex <= lastDeliveredLogIndex } +func pendingLogsAfterSequence(logs []*solana.Log, lastDeliveredSequenceNum int64) []*solana.Log { + pending := make([]*solana.Log, 0, len(logs)) + for _, log := range logs { + if log == nil { + continue + } + if log.SequenceNum <= lastDeliveredSequenceNum { + continue + } + pending = append(pending, log) + } + + slices.SortFunc(pending, func(a, b *solana.Log) int { + return cmp.Compare(a.SequenceNum, b.SequenceNum) + }) + + return pending +} + +// deliverableLogsAfterSequence returns logs ready to deliver in contiguous sequence order. +// If the next expected sequence number is missing from the query result, delivery stops at the gap +// so that later-indexed logs are not delivered before earlier ones appear in the log poller. +func deliverableLogsAfterSequence(logs []*solana.Log, lastDeliveredSequenceNum int64, hasDelivered bool) []*solana.Log { + pending := pendingLogsAfterSequence(logs, lastDeliveredSequenceNum) + if len(pending) == 0 { + return nil + } + + expectedNext := lastDeliveredSequenceNum + 1 + if !hasDelivered { + expectedNext = pending[0].SequenceNum + } + + deliverable := make([]*solana.Log, 0, len(pending)) + for _, log := range pending { + if log.SequenceNum > expectedNext { + break + } + if log.SequenceNum < expectedNext { + continue + } + deliverable = append(deliverable, log) + expectedNext++ + } + + return deliverable +} + func getEventSig(eventName string) solana.EventSignature { sig := lptypes.NewEventSignatureFromName(eventName) return solana.EventSignature(sig[:]) diff --git a/chain_capabilities/solana/trigger/trigger_test.go b/chain_capabilities/solana/trigger/trigger_test.go index 1dadae53d..b8c3d24dc 100644 --- a/chain_capabilities/solana/trigger/trigger_test.go +++ b/chain_capabilities/solana/trigger/trigger_test.go @@ -166,15 +166,20 @@ func createTestRequest() *solanacappb.FilterLogTriggerRequest { } func createTestLog(blockNumber int64, address solana.PublicKey) *solana.Log { - return createTestLogWithIndex(blockNumber, blockNumber, address) + return createTestLogWithSequence(blockNumber, blockNumber, blockNumber, address) } func createTestLogWithIndex(blockNumber int64, logIndex int64, address solana.PublicKey) *solana.Log { + return createTestLogWithSequence(blockNumber, logIndex, blockNumber, address) +} + +func createTestLogWithSequence(blockNumber int64, logIndex int64, sequenceNum int64, address solana.PublicKey) *solana.Log { return &solana.Log{ Address: address, EventSig: testEventSig, BlockNumber: blockNumber, LogIndex: logIndex, + SequenceNum: sequenceNum, TxHash: solana.Signature{}, Data: []byte("test log data"), } @@ -395,22 +400,95 @@ func TestToLogPollerFilter(t *testing.T) { }) } -func TestPendingLogsAfterPosition(t *testing.T) { +func TestPendingLogsAfterBlockPosition(t *testing.T) { t.Parallel() logs := []*solana.Log{ - createTestLogWithIndex(101, 1, testPublicKey), - createTestLogWithIndex(103, 3, testPublicKey), - createTestLogWithIndex(102, 2, testPublicKey), + createTestLogWithSequence(101, 1, 1, testPublicKey), + createTestLogWithSequence(103, 3, 3, testPublicKey), + createTestLogWithSequence(102, 2, 2, testPublicKey), nil, } - pending := pendingLogsAfterPosition(logs, 101, 1) + pending := pendingLogsAfterBlockPosition(logs, 101, 1) require.Len(t, pending, 2) require.Equal(t, int64(102), pending[0].BlockNumber) require.Equal(t, int64(103), pending[1].BlockNumber) } +func TestPendingLogsAfterSequence(t *testing.T) { + t.Parallel() + + logs := []*solana.Log{ + createTestLogWithSequence(101, 1, 1, testPublicKey), + createTestLogWithSequence(103, 3, 3, testPublicKey), + createTestLogWithSequence(102, 2, 2, testPublicKey), + nil, + } + + pending := pendingLogsAfterSequence(logs, 1) + require.Len(t, pending, 2) + require.Equal(t, int64(2), pending[0].SequenceNum) + require.Equal(t, int64(3), pending[1].SequenceNum) +} + +func TestDeliverableLogsAfterSequence(t *testing.T) { + t.Parallel() + + t.Run("delivers contiguous sequences sorted by sequence number", func(t *testing.T) { + logs := []*solana.Log{ + createTestLogWithSequence(103, 3, 2, testPublicKey), + createTestLogWithSequence(101, 1, 1, testPublicKey), + } + + deliverable := deliverableLogsAfterSequence(logs, 0, false) + require.Len(t, deliverable, 2) + require.Equal(t, int64(1), deliverable[0].SequenceNum) + require.Equal(t, int64(2), deliverable[1].SequenceNum) + }) + + t.Run("does not skip ahead when later sequence is indexed first", func(t *testing.T) { + logs := []*solana.Log{ + createTestLogWithSequence(103, 3, 2, testPublicKey), + } + + deliverable := deliverableLogsAfterSequence(logs, 0, true) + require.Empty(t, deliverable) + + logs = append(logs, createTestLogWithSequence(101, 1, 1, testPublicKey)) + deliverable = deliverableLogsAfterSequence(logs, 0, true) + require.Len(t, deliverable, 2) + require.Equal(t, int64(101), deliverable[0].BlockNumber) + require.Equal(t, int64(103), deliverable[1].BlockNumber) + }) +} + +func TestDeliverableLogsAfterCursor(t *testing.T) { + t.Parallel() + + t.Run("uses block cursor when sequence numbers are unset", func(t *testing.T) { + logs := []*solana.Log{ + createTestLogWithSequence(101, 1, 0, testPublicKey), + createTestLogWithSequence(102, 2, 0, testPublicKey), + } + + deliverable := deliverableLogsAfterCursor(logs, false, 0, 100, 0, false) + require.Len(t, deliverable, 2) + require.Equal(t, int64(101), deliverable[0].BlockNumber) + }) + + t.Run("uses sequence cursor when sequence numbers are present", func(t *testing.T) { + logs := []*solana.Log{ + createTestLogWithSequence(103, 3, 2, testPublicKey), + createTestLogWithSequence(101, 1, 1, testPublicKey), + } + + deliverable := deliverableLogsAfterCursor(logs, true, 0, 100, 0, false) + require.Len(t, deliverable, 2) + require.Equal(t, int64(1), deliverable[0].SequenceNum) + }) +} + func TestBuildQueryExpressions(t *testing.T) { t.Run("omits starting block filter after first delivery", func(t *testing.T) { request := createTestRequest() From 20f8882a6bd5771c69ed2ec3b2478689bbc1caf5 Mon Sep 17 00:00:00 2001 From: Silas Lenihan Date: Thu, 2 Jul 2026 15:52:58 -0400 Subject: [PATCH 4/4] fix broken interface --- chain_capabilities/solana/actions/actions.go | 7 +++ chain_capabilities/solana/trigger/trigger.go | 9 +--- .../solana/trigger/trigger_test.go | 44 +++++++++++++------ 3 files changed, 40 insertions(+), 20 deletions(-) diff --git a/chain_capabilities/solana/actions/actions.go b/chain_capabilities/solana/actions/actions.go index d5b9a0036..395eb9dc7 100644 --- a/chain_capabilities/solana/actions/actions.go +++ b/chain_capabilities/solana/actions/actions.go @@ -168,6 +168,13 @@ func (s *Solana) GetMultipleAccountsWithOpts( return nil, GetError(errors.New("unimplemented"), false) } +func (s *Solana) GetProgramAccounts( + ctx context.Context, + metadata capabilities.RequestMetadata, + input *solcap.GetProgramAccountsRequest) (*capabilities.ResponseAndMetadata[*solcap.GetProgramAccountsReply], caperrors.Error) { + return nil, GetError(errors.New("unimplemented"), false) +} + func (s *Solana) GetSignatureStatuses( ctx context.Context, metadata capabilities.RequestMetadata, diff --git a/chain_capabilities/solana/trigger/trigger.go b/chain_capabilities/solana/trigger/trigger.go index 9bd285ac0..602cca12c 100644 --- a/chain_capabilities/solana/trigger/trigger.go +++ b/chain_capabilities/solana/trigger/trigger.go @@ -539,7 +539,6 @@ func (lts *SolanaLogTriggerService) startPolling(ctx context.Context, telemetryC lastDeliveredSequenceNum, lastDeliveredBlock, lastDeliveredLogIndex, - hasDelivered, ) for _, log := range pendingLogs { if log == nil { @@ -607,10 +606,9 @@ func deliverableLogsAfterCursor( lastDeliveredSequenceNum int64, lastDeliveredBlock int64, lastDeliveredLogIndex int64, - hasDelivered bool, ) []*solana.Log { if useSequenceCursor { - return deliverableLogsAfterSequence(logs, lastDeliveredSequenceNum, hasDelivered) + return deliverableLogsAfterSequence(logs, lastDeliveredSequenceNum) } return pendingLogsAfterBlockPosition(logs, lastDeliveredBlock, lastDeliveredLogIndex) } @@ -669,16 +667,13 @@ func pendingLogsAfterSequence(logs []*solana.Log, lastDeliveredSequenceNum int64 // deliverableLogsAfterSequence returns logs ready to deliver in contiguous sequence order. // If the next expected sequence number is missing from the query result, delivery stops at the gap // so that later-indexed logs are not delivered before earlier ones appear in the log poller. -func deliverableLogsAfterSequence(logs []*solana.Log, lastDeliveredSequenceNum int64, hasDelivered bool) []*solana.Log { +func deliverableLogsAfterSequence(logs []*solana.Log, lastDeliveredSequenceNum int64) []*solana.Log { pending := pendingLogsAfterSequence(logs, lastDeliveredSequenceNum) if len(pending) == 0 { return nil } expectedNext := lastDeliveredSequenceNum + 1 - if !hasDelivered { - expectedNext = pending[0].SequenceNum - } deliverable := make([]*solana.Log, 0, len(pending)) for _, log := range pending { diff --git a/chain_capabilities/solana/trigger/trigger_test.go b/chain_capabilities/solana/trigger/trigger_test.go index b8c3d24dc..6cbf7e675 100644 --- a/chain_capabilities/solana/trigger/trigger_test.go +++ b/chain_capabilities/solana/trigger/trigger_test.go @@ -135,7 +135,7 @@ func setupTest(t *testing.T) (*SolanaLogTriggerService, *mocks.SolanaService) { lggr := logger.Test(t) opts := LogTriggerServiceOpts{ - SolanaService: mockSolanaService, + SolanaService: mocks.WrapSolanaService(mockSolanaService), Logger: lggr, BeholderProcessor: NopBeholderProcessor{}, MessageBuilder: monitoring.NewMessageBuilder(types.ChainInfo{}, capabilities.CapabilityInfo{}, ""), @@ -166,11 +166,11 @@ func createTestRequest() *solanacappb.FilterLogTriggerRequest { } func createTestLog(blockNumber int64, address solana.PublicKey) *solana.Log { - return createTestLogWithSequence(blockNumber, blockNumber, blockNumber, address) + return createTestLogWithSequence(blockNumber, blockNumber, 0, address) } func createTestLogWithIndex(blockNumber int64, logIndex int64, address solana.PublicKey) *solana.Log { - return createTestLogWithSequence(blockNumber, logIndex, blockNumber, address) + return createTestLogWithSequence(blockNumber, logIndex, 0, address) } func createTestLogWithSequence(blockNumber int64, logIndex int64, sequenceNum int64, address solana.PublicKey) *solana.Log { @@ -441,7 +441,7 @@ func TestDeliverableLogsAfterSequence(t *testing.T) { createTestLogWithSequence(101, 1, 1, testPublicKey), } - deliverable := deliverableLogsAfterSequence(logs, 0, false) + deliverable := deliverableLogsAfterSequence(logs, 0) require.Len(t, deliverable, 2) require.Equal(t, int64(1), deliverable[0].SequenceNum) require.Equal(t, int64(2), deliverable[1].SequenceNum) @@ -452,15 +452,33 @@ func TestDeliverableLogsAfterSequence(t *testing.T) { createTestLogWithSequence(103, 3, 2, testPublicKey), } - deliverable := deliverableLogsAfterSequence(logs, 0, true) + deliverable := deliverableLogsAfterSequence(logs, 0) require.Empty(t, deliverable) logs = append(logs, createTestLogWithSequence(101, 1, 1, testPublicKey)) - deliverable = deliverableLogsAfterSequence(logs, 0, true) + deliverable = deliverableLogsAfterSequence(logs, 0) require.Len(t, deliverable, 2) require.Equal(t, int64(101), deliverable[0].BlockNumber) require.Equal(t, int64(103), deliverable[1].BlockNumber) }) + + t.Run("does not jump to first pending sequence on cold start", func(t *testing.T) { + logs := []*solana.Log{ + createTestLogWithSequence(103, 3, 8, testPublicKey), + createTestLogWithSequence(104, 4, 9, testPublicKey), + } + + deliverable := deliverableLogsAfterSequence(logs, 0) + require.Empty(t, deliverable) + + for seq := int64(1); seq <= 9; seq++ { + logs = append(logs, createTestLogWithSequence(100+seq, seq, seq, testPublicKey)) + } + deliverable = deliverableLogsAfterSequence(logs, 0) + require.Len(t, deliverable, 9) + require.Equal(t, int64(1), deliverable[0].SequenceNum) + require.Equal(t, int64(9), deliverable[8].SequenceNum) + }) } func TestDeliverableLogsAfterCursor(t *testing.T) { @@ -472,7 +490,7 @@ func TestDeliverableLogsAfterCursor(t *testing.T) { createTestLogWithSequence(102, 2, 0, testPublicKey), } - deliverable := deliverableLogsAfterCursor(logs, false, 0, 100, 0, false) + deliverable := deliverableLogsAfterCursor(logs, false, 0, 100, 0) require.Len(t, deliverable, 2) require.Equal(t, int64(101), deliverable[0].BlockNumber) }) @@ -483,7 +501,7 @@ func TestDeliverableLogsAfterCursor(t *testing.T) { createTestLogWithSequence(101, 1, 1, testPublicKey), } - deliverable := deliverableLogsAfterCursor(logs, true, 0, 100, 0, false) + deliverable := deliverableLogsAfterCursor(logs, true, 0, 100, 0) require.Len(t, deliverable, 2) require.Equal(t, int64(1), deliverable[0].SequenceNum) }) @@ -754,7 +772,7 @@ func TestStartPolling(t *testing.T) { store := NewSolanaLogTriggerStore() opts := LogTriggerServiceOpts{ - SolanaService: mockSolanaService, + SolanaService: mocks.WrapSolanaService(mockSolanaService), Logger: logger.Nop(), BeholderProcessor: NopBeholderProcessor{}, MessageBuilder: monitoring.NewMessageBuilder(types.ChainInfo{}, capabilities.CapabilityInfo{}, ""), @@ -866,7 +884,7 @@ func TestStartPolling(t *testing.T) { // Create service with very small buffer lggr := logger.Test(t) opts := LogTriggerServiceOpts{ - SolanaService: mockSolanaService, + SolanaService: mocks.WrapSolanaService(mockSolanaService), Logger: lggr, Triggers: store, LogTriggerPollInterval: 1 * time.Millisecond, @@ -923,7 +941,7 @@ func TestStartPolling(t *testing.T) { lggr := logger.Test(t) opts := LogTriggerServiceOpts{ - SolanaService: mockSolanaService, + SolanaService: mocks.WrapSolanaService(mockSolanaService), Logger: lggr, Triggers: store, LogTriggerPollInterval: 10 * time.Millisecond, @@ -985,7 +1003,7 @@ func TestStartPolling(t *testing.T) { lggr := logger.Test(t) opts := LogTriggerServiceOpts{ - SolanaService: mockSolanaService, + SolanaService: mocks.WrapSolanaService(mockSolanaService), Logger: lggr, Triggers: store, LogTriggerPollInterval: 5 * time.Millisecond, @@ -1280,7 +1298,7 @@ func TestSolanaLogTriggerService_NewLogTriggerService(t *testing.T) { lggr := logger.Test(t) opts := LogTriggerServiceOpts{ - SolanaService: mockService, + SolanaService: mocks.WrapSolanaService(mockService), Logger: lggr, Triggers: store, LogTriggerPollInterval: 5 * time.Second,