diff --git a/chain_capabilities/solana/actions/actions.go b/chain_capabilities/solana/actions/actions.go index d5b9a0036..395eb9dc7 100644 --- a/chain_capabilities/solana/actions/actions.go +++ b/chain_capabilities/solana/actions/actions.go @@ -168,6 +168,13 @@ func (s *Solana) GetMultipleAccountsWithOpts( return nil, GetError(errors.New("unimplemented"), false) } +func (s *Solana) GetProgramAccounts( + ctx context.Context, + metadata capabilities.RequestMetadata, + input *solcap.GetProgramAccountsRequest) (*capabilities.ResponseAndMetadata[*solcap.GetProgramAccountsReply], caperrors.Error) { + return nil, GetError(errors.New("unimplemented"), false) +} + func (s *Solana) GetSignatureStatuses( ctx context.Context, metadata capabilities.RequestMetadata, diff --git a/chain_capabilities/solana/go.mod b/chain_capabilities/solana/go.mod index decd378d7..8dca4cd34 100644 --- a/chain_capabilities/solana/go.mod +++ b/chain_capabilities/solana/go.mod @@ -9,10 +9,10 @@ require ( github.com/smartcontractkit/capabilities/chain_capabilities/common v0.0.0-20260615195421-fb87220e503f github.com/smartcontractkit/capabilities/libs v0.0.0-20260609124022-2749e4a32bfb github.com/smartcontractkit/chain-selectors v1.0.100 - github.com/smartcontractkit/chainlink-common v0.11.2-0.20260602135221-cc7a5b50532a - github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260326180413-c69f27e37a13 - github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7 - github.com/smartcontractkit/chainlink-solana v1.1.2-0.20260420191419-ea62f88cbdb4 + github.com/smartcontractkit/chainlink-common v0.11.2-0.20260610124317-1a3c32c46eaa + github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243 + github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260604171908-6734db2d444f + github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260702194516-0c9666a3e925 github.com/smartcontractkit/chainlink-solana/contracts v0.0.0-20260421131224-c46cbfe7bc6c github.com/smartcontractkit/libocr v0.0.0-20260403184524-b6409238958d github.com/stretchr/testify v1.11.1 @@ -132,7 +132,7 @@ require ( github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20260310183131-8d0f0e383288 // indirect github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20260310183131-8d0f0e383288 // indirect github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260601211238-9f526774fef0 // indirect - github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20251210101658-1c5c8e4c4f15 // indirect + github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260521164805-26d78d5e1243 // indirect github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b // indirect github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260319180422-b5808c964785 // indirect github.com/smartcontractkit/freeport v0.1.3-0.20250828155247-add56fa28aad // indirect diff --git a/chain_capabilities/solana/go.sum b/chain_capabilities/solana/go.sum index 70052a53a..5add77dc6 100644 --- a/chain_capabilities/solana/go.sum +++ b/chain_capabilities/solana/go.sum @@ -629,28 +629,28 @@ github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20260310183131-8 github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20260310183131-8d0f0e383288/go.mod h1:jPHlo/IN2YAArI001JJixmm6ZHQwgnAVJXY8VBFiFTc= github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20260310183131-8d0f0e383288 h1:eEjTgIQn4RW0ZPRepUDYTdgGwaRCMawMwgXkHItUc9U= github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20260310183131-8d0f0e383288/go.mod h1:67YbnoglYD61Pz/jTVCgav9wFq7S35OU8UyQSvPllRw= -github.com/smartcontractkit/chainlink-common v0.11.2-0.20260602135221-cc7a5b50532a h1:DHdGiDkMe+WS2e+Cy6ooNxGVX4gwiRRcgperFWmR4R4= -github.com/smartcontractkit/chainlink-common v0.11.2-0.20260602135221-cc7a5b50532a/go.mod h1:6jgqiFXFJHqjkvFFmuf8gvoUFa6Ygx/D1tKnIL+CCF8= +github.com/smartcontractkit/chainlink-common v0.11.2-0.20260610124317-1a3c32c46eaa h1:bd1aJVR6fPjyMsJKq5hEsR6SgpxacbMwG8LABkC5qq8= +github.com/smartcontractkit/chainlink-common v0.11.2-0.20260610124317-1a3c32c46eaa/go.mod h1:fP9RqD25/gTx3XqRstN8o4lAI3jp42vwJBLRZwRoOOM= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260601211238-9f526774fef0 h1:NExKM/D0HneOq/N5LGTbkV4VOa0UHCvfTNEb4GqYpto= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260601211238-9f526774fef0/go.mod h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20251215152504-b1e41f508340 h1:PsjEI+5jZIz9AS4eOsLS5VpSWJINf38clXV3wryPyMk= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20251215152504-b1e41f508340/go.mod h1:P/0OSXUlFaxxD4B/P6HWbxYtIRmmWGDJAvanq19879c= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20251210101658-1c5c8e4c4f15 h1:IXF7+k8I1YY/yvXC1wnS3FAAggtCy6ByEQ9hv/F2FvQ= -github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20251210101658-1c5c8e4c4f15/go.mod h1:HG/aei0MgBOpsyRLexdKGtOUO8yjSJO3iUu0Uu8KBm4= -github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260326180413-c69f27e37a13 h1:3KLLkTCIAy9CvT35Ey0k6pcWX/u+qsm3Y/58TI5VSAg= -github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260326180413-c69f27e37a13/go.mod h1:Y7h84PqCe/Vimf2h1Nc6tMiOJStDbtM33fEUeaaF5xk= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260521164805-26d78d5e1243 h1:vaFBupfFfImQgqOeuC7Muk2GflbYP6Gpi0Y/TLroFU8= +github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260521164805-26d78d5e1243/go.mod h1:HG/aei0MgBOpsyRLexdKGtOUO8yjSJO3iUu0Uu8KBm4= +github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243 h1:71PGTkjdFZ0JrloEC2Fs8eHl1b1gmUuH+bq7q23usKk= +github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243/go.mod h1:7ketk4ischPQW/JQgmyHz6zdzLUJv1VC29SiSgosydQ= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 h1:GCzrxDWn3b7jFfEA+WiYRi8CKoegsayiDoJBCjYkneE= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7 h1:iljEJss3WOwcsMkWy72Yn2zvjw7Gyxc+RXL7r8YKM6g= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7/go.mod h1:vTFHTCbLui4Vn8fTmAadfE3rdnvfrDwOmMujmW857D0= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260604171908-6734db2d444f h1:t+OoYaXLdH0WHK2pbWKjTSnSQa5JBQD1+gf0yISYfQk= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260604171908-6734db2d444f/go.mod h1:vTFHTCbLui4Vn8fTmAadfE3rdnvfrDwOmMujmW857D0= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY= github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260319180422-b5808c964785 h1:oli+2uLU6jcrJGCuYFqk3475hiwL17SWlITWLv+tx/w= github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260319180422-b5808c964785/go.mod h1:dkR2uYg9XYJuT1JASkPzWE51jjFkVb86P7a/yXe5/GM= github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528173149-f5b8336b19d9 h1:LQy2j2+TdKLSWsUTUYuqmQPn8kjqCLjGI3ZJYGtDc08= github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528173149-f5b8336b19d9/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= -github.com/smartcontractkit/chainlink-solana v1.1.2-0.20260420191419-ea62f88cbdb4 h1:WaQzRyMCmi7gddZraPtHSqCjZ5dnVYFYXqEtq2DsQjw= -github.com/smartcontractkit/chainlink-solana v1.1.2-0.20260420191419-ea62f88cbdb4/go.mod h1:sUsEwLtVPBlz0wPcysaolS+HVj9cOAt4jYhwE6J8dXg= +github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260702194516-0c9666a3e925 h1:9t7KPRezfZvFiLCG/ny7IeJ0K8HRY3oP26VlfEs1wvI= +github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260702194516-0c9666a3e925/go.mod h1:LXWUlBkE+f+FxPeMtaa669kUxJWYhkAzGrwW9affaKU= github.com/smartcontractkit/chainlink-solana/contracts v0.0.0-20260421131224-c46cbfe7bc6c h1:Hn/80PyYFrQhRlNSaq9HY4cjc/7AuP9zyWLle22t34A= github.com/smartcontractkit/chainlink-solana/contracts v0.0.0-20260421131224-c46cbfe7bc6c/go.mod h1:C5pZsbYX3qkhZTYWr1aYJi9QMfonFAun+Jl1npQ7UJA= github.com/smartcontractkit/freeport v0.1.3-0.20250828155247-add56fa28aad h1:lgHxTHuzJIF3Vj6LSMOnjhqKgRqYW+0MV2SExtCYL1Q= diff --git a/chain_capabilities/solana/trigger/trigger.go b/chain_capabilities/solana/trigger/trigger.go index 44bc2b0f9..602cca12c 100644 --- a/chain_capabilities/solana/trigger/trigger.go +++ b/chain_capabilities/solana/trigger/trigger.go @@ -2,7 +2,9 @@ package trigger import ( "context" + "cmp" "fmt" + "slices" "strconv" "strings" "time" @@ -89,15 +91,18 @@ func (lts *SolanaLogTriggerService) ToLogPollerFilter(triggerID string, config * }, nil } -// BuildQueryExpressions builds query expressions including subkey filters -func BuildQueryExpressions(config *solanacappb.FilterLogTriggerRequest, lastProcessedBlock int64) ([]query.Expression, error) { +// BuildQueryExpressions builds query expressions including subkey filters. +// When includeStartingBlockFilter is true and startingBlock >= 0, results are limited to logs +// after the registration-time finalized block. Once delivery starts, progression uses the last +// successfully delivered block/log-index position. +func BuildQueryExpressions(config *solanacappb.FilterLogTriggerRequest, startingBlock int64, includeStartingBlockFilter bool) ([]query.Expression, error) { expressions := []query.Expression{ solprimitives.NewAddressFilter(solana.PublicKey(config.Address)), solprimitives.NewEventSigFilter(getEventSig(config.EventName)), } - if lastProcessedBlock >= 0 { - blockStr := strconv.FormatInt(lastProcessedBlock, 10) + if includeStartingBlockFilter && startingBlock >= 0 { + blockStr := strconv.FormatInt(startingBlock, 10) expressions = append(expressions, query.Block(blockStr, primitives.Gt)) } @@ -482,8 +487,15 @@ func (lts *SolanaLogTriggerService) startPolling(ctx context.Context, telemetryC defer ticker.Stop() defer close(logCh) - // Use the finalized block number from registration as the starting point - lastProcessedBlock := startingBlock + // startingBlock is the finalized block at registration. The delivery cursor only advances on + // successful delivery. When log poller returns sequence numbers, progression is contiguous by + // sequence; otherwise it falls back to block/log-index position. + var ( + lastDeliveredSequenceNum int64 + lastDeliveredBlock = startingBlock + lastDeliveredLogIndex int64 + hasDelivered bool + ) for { select { @@ -491,8 +503,15 @@ func (lts *SolanaLogTriggerService) startPolling(ctx context.Context, telemetryC lts.lggr.Debugf("Context cancelled for triggerID: %s, stopping polling", triggerID) return case <-ticker.C: - lts.lggr.Debugf("Awake, polling for triggerID: %s, currentOffset: %d", triggerID, lastProcessedBlock) - expressions, err := BuildQueryExpressions(config, lastProcessedBlock) + lts.lggr.Debugw("Polling for trigger logs", + "triggerID", triggerID, + "lastDeliveredSequenceNum", lastDeliveredSequenceNum, + "lastDeliveredBlock", lastDeliveredBlock, + "lastDeliveredLogIndex", lastDeliveredLogIndex, + "startingBlock", startingBlock, + ) + includeStartingBlockFilter := !hasDelivered + expressions, err := BuildQueryExpressions(config, startingBlock, includeStartingBlockFilter) if err != nil { summary := fmt.Sprintf("Failed to build query expressions for trigger %s: %v", triggerID, err) lts.logAndEmitError(ctx, telemetryContext, triggerID, summary, err.Error()) @@ -512,39 +531,163 @@ func (lts *SolanaLogTriggerService) startPolling(ctx context.Context, telemetryC } sentCount := 0 - calculatedLatestBlock := lastProcessedBlock - for _, log := range logs { + var latestDeliveredBlock int64 = startingBlock + useSequenceCursor := logsHaveSequenceNumbers(logs) + pendingLogs := deliverableLogsAfterCursor( + logs, + useSequenceCursor, + lastDeliveredSequenceNum, + lastDeliveredBlock, + lastDeliveredLogIndex, + ) + for _, log := range pendingLogs { if log == nil { lts.lggr.Warnw("Received nil log from QueryTrackedLogs, skipping", "triggerID", triggerID) continue } - // Track the highest block number from logs (all logs from log poller are already finalized) - if log.BlockNumber > calculatedLatestBlock { - calculatedLatestBlock = log.BlockNumber - } - protoLog := solanacappb.ConvertLogToProto(log) eventID := lts.generateLogIdentifier(log) checksLimitsOk := lts.checkLimitsOnLog(ctx, telemetryContext, protoLog, triggerID, eventID, log) if !checksLimitsOk { - continue + break } if ctx.Err() != nil { return } - if lts.deliverLogReliably(ctx, telemetryContext, triggerID, protoLog, eventID, log) { - sentCount++ + if !lts.deliverLogReliably(ctx, telemetryContext, triggerID, protoLog, eventID, log) { + break + } + + sentCount++ + hasDelivered = true + if useSequenceCursor { + lastDeliveredSequenceNum = log.SequenceNum + } else { + lastDeliveredBlock = log.BlockNumber + lastDeliveredLogIndex = log.LogIndex + } + if log.BlockNumber > latestDeliveredBlock { + latestDeliveredBlock = log.BlockNumber } } - lastProcessedBlock = calculatedLatestBlock - successMessage := fmt.Sprintf("Finished updating BlockNumber for triggerID: %s, BlockNumber: %d, sent logs: %d", triggerID, calculatedLatestBlock, sentCount) - lts.logAndEmitSuccess(ctx, successMessage, telemetryContext, triggerID, config, sentCount, calculatedLatestBlock) + var successMessage string + if useSequenceCursor { + successMessage = fmt.Sprintf( + "Finished updating delivery cursor for triggerID: %s, SequenceNum: %d, sent logs: %d", + triggerID, lastDeliveredSequenceNum, sentCount, + ) + } else { + successMessage = fmt.Sprintf( + "Finished updating delivery cursor for triggerID: %s, BlockNumber: %d, LogIndex: %d, sent logs: %d", + triggerID, lastDeliveredBlock, lastDeliveredLogIndex, sentCount, + ) + } + lts.logAndEmitSuccess(ctx, successMessage, telemetryContext, triggerID, config, sentCount, latestDeliveredBlock) + } + } +} + +func logsHaveSequenceNumbers(logs []*solana.Log) bool { + for _, log := range logs { + if log != nil && log.SequenceNum > 0 { + return true + } + } + return false +} + +func deliverableLogsAfterCursor( + logs []*solana.Log, + useSequenceCursor bool, + lastDeliveredSequenceNum int64, + lastDeliveredBlock int64, + lastDeliveredLogIndex int64, +) []*solana.Log { + if useSequenceCursor { + return deliverableLogsAfterSequence(logs, lastDeliveredSequenceNum) + } + return pendingLogsAfterBlockPosition(logs, lastDeliveredBlock, lastDeliveredLogIndex) +} + +func pendingLogsAfterBlockPosition(logs []*solana.Log, lastDeliveredBlock int64, lastDeliveredLogIndex int64) []*solana.Log { + pending := make([]*solana.Log, 0, len(logs)) + for _, log := range logs { + if log == nil { + continue + } + if isLogAtOrBeforeBlockPosition(log, lastDeliveredBlock, lastDeliveredLogIndex) { + continue } + pending = append(pending, log) } + + slices.SortFunc(pending, func(a, b *solana.Log) int { + if c := cmp.Compare(a.BlockNumber, b.BlockNumber); c != 0 { + return c + } + return cmp.Compare(a.LogIndex, b.LogIndex) + }) + + return pending +} + +func isLogAtOrBeforeBlockPosition(log *solana.Log, lastDeliveredBlock int64, lastDeliveredLogIndex int64) bool { + if log.BlockNumber < lastDeliveredBlock { + return true + } + if log.BlockNumber > lastDeliveredBlock { + return false + } + return log.LogIndex <= lastDeliveredLogIndex +} + +func pendingLogsAfterSequence(logs []*solana.Log, lastDeliveredSequenceNum int64) []*solana.Log { + pending := make([]*solana.Log, 0, len(logs)) + for _, log := range logs { + if log == nil { + continue + } + if log.SequenceNum <= lastDeliveredSequenceNum { + continue + } + pending = append(pending, log) + } + + slices.SortFunc(pending, func(a, b *solana.Log) int { + return cmp.Compare(a.SequenceNum, b.SequenceNum) + }) + + return pending +} + +// deliverableLogsAfterSequence returns logs ready to deliver in contiguous sequence order. +// If the next expected sequence number is missing from the query result, delivery stops at the gap +// so that later-indexed logs are not delivered before earlier ones appear in the log poller. +func deliverableLogsAfterSequence(logs []*solana.Log, lastDeliveredSequenceNum int64) []*solana.Log { + pending := pendingLogsAfterSequence(logs, lastDeliveredSequenceNum) + if len(pending) == 0 { + return nil + } + + expectedNext := lastDeliveredSequenceNum + 1 + + deliverable := make([]*solana.Log, 0, len(pending)) + for _, log := range pending { + if log.SequenceNum > expectedNext { + break + } + if log.SequenceNum < expectedNext { + continue + } + deliverable = append(deliverable, log) + expectedNext++ + } + + return deliverable } func getEventSig(eventName string) solana.EventSignature { @@ -585,7 +728,7 @@ func (lts *SolanaLogTriggerService) deliverLogReliably( Payload: anyPayload, } - lts.lggr.Infow("Sending log event to pipe", "triggerID", triggerID, "eventID", eventID, "blockNumber", log.BlockNumber, "txHash", log.TxHash) + lts.lggr.Infow("Sending log event to pipe", "triggerID", triggerID, "eventID", eventID, "blockNumber", log.BlockNumber, "logIndex", log.LogIndex, "txHash", log.TxHash) deliverCtx := capcommon.ContextWithOrgForDelivery(ctx, lts.lggr, lts.orgResolver, telemetryContext.RequestMetadata) if err := lts.baseTrigger.DeliverEvent(deliverCtx, te, triggerID); err != nil { summary := fmt.Sprintf("failed to persist/deliver event (triggerID=%s, eventID=%s): %v", triggerID, eventID, err) diff --git a/chain_capabilities/solana/trigger/trigger_test.go b/chain_capabilities/solana/trigger/trigger_test.go index 14ae54a85..6cbf7e675 100644 --- a/chain_capabilities/solana/trigger/trigger_test.go +++ b/chain_capabilities/solana/trigger/trigger_test.go @@ -135,7 +135,7 @@ func setupTest(t *testing.T) (*SolanaLogTriggerService, *mocks.SolanaService) { lggr := logger.Test(t) opts := LogTriggerServiceOpts{ - SolanaService: mockSolanaService, + SolanaService: mocks.WrapSolanaService(mockSolanaService), Logger: lggr, BeholderProcessor: NopBeholderProcessor{}, MessageBuilder: monitoring.NewMessageBuilder(types.ChainInfo{}, capabilities.CapabilityInfo{}, ""), @@ -166,10 +166,20 @@ func createTestRequest() *solanacappb.FilterLogTriggerRequest { } func createTestLog(blockNumber int64, address solana.PublicKey) *solana.Log { + return createTestLogWithSequence(blockNumber, blockNumber, 0, address) +} + +func createTestLogWithIndex(blockNumber int64, logIndex int64, address solana.PublicKey) *solana.Log { + return createTestLogWithSequence(blockNumber, logIndex, 0, address) +} + +func createTestLogWithSequence(blockNumber int64, logIndex int64, sequenceNum int64, address solana.PublicKey) *solana.Log { return &solana.Log{ Address: address, EventSig: testEventSig, BlockNumber: blockNumber, + LogIndex: logIndex, + SequenceNum: sequenceNum, TxHash: solana.Signature{}, Data: []byte("test log data"), } @@ -390,13 +400,132 @@ func TestToLogPollerFilter(t *testing.T) { }) } +func TestPendingLogsAfterBlockPosition(t *testing.T) { + t.Parallel() + + logs := []*solana.Log{ + createTestLogWithSequence(101, 1, 1, testPublicKey), + createTestLogWithSequence(103, 3, 3, testPublicKey), + createTestLogWithSequence(102, 2, 2, testPublicKey), + nil, + } + + pending := pendingLogsAfterBlockPosition(logs, 101, 1) + require.Len(t, pending, 2) + require.Equal(t, int64(102), pending[0].BlockNumber) + require.Equal(t, int64(103), pending[1].BlockNumber) +} + +func TestPendingLogsAfterSequence(t *testing.T) { + t.Parallel() + + logs := []*solana.Log{ + createTestLogWithSequence(101, 1, 1, testPublicKey), + createTestLogWithSequence(103, 3, 3, testPublicKey), + createTestLogWithSequence(102, 2, 2, testPublicKey), + nil, + } + + pending := pendingLogsAfterSequence(logs, 1) + require.Len(t, pending, 2) + require.Equal(t, int64(2), pending[0].SequenceNum) + require.Equal(t, int64(3), pending[1].SequenceNum) +} + +func TestDeliverableLogsAfterSequence(t *testing.T) { + t.Parallel() + + t.Run("delivers contiguous sequences sorted by sequence number", func(t *testing.T) { + logs := []*solana.Log{ + createTestLogWithSequence(103, 3, 2, testPublicKey), + createTestLogWithSequence(101, 1, 1, testPublicKey), + } + + deliverable := deliverableLogsAfterSequence(logs, 0) + require.Len(t, deliverable, 2) + require.Equal(t, int64(1), deliverable[0].SequenceNum) + require.Equal(t, int64(2), deliverable[1].SequenceNum) + }) + + t.Run("does not skip ahead when later sequence is indexed first", func(t *testing.T) { + logs := []*solana.Log{ + createTestLogWithSequence(103, 3, 2, testPublicKey), + } + + deliverable := deliverableLogsAfterSequence(logs, 0) + require.Empty(t, deliverable) + + logs = append(logs, createTestLogWithSequence(101, 1, 1, testPublicKey)) + deliverable = deliverableLogsAfterSequence(logs, 0) + require.Len(t, deliverable, 2) + require.Equal(t, int64(101), deliverable[0].BlockNumber) + require.Equal(t, int64(103), deliverable[1].BlockNumber) + }) + + t.Run("does not jump to first pending sequence on cold start", func(t *testing.T) { + logs := []*solana.Log{ + createTestLogWithSequence(103, 3, 8, testPublicKey), + createTestLogWithSequence(104, 4, 9, testPublicKey), + } + + deliverable := deliverableLogsAfterSequence(logs, 0) + require.Empty(t, deliverable) + + for seq := int64(1); seq <= 9; seq++ { + logs = append(logs, createTestLogWithSequence(100+seq, seq, seq, testPublicKey)) + } + deliverable = deliverableLogsAfterSequence(logs, 0) + require.Len(t, deliverable, 9) + require.Equal(t, int64(1), deliverable[0].SequenceNum) + require.Equal(t, int64(9), deliverable[8].SequenceNum) + }) +} + +func TestDeliverableLogsAfterCursor(t *testing.T) { + t.Parallel() + + t.Run("uses block cursor when sequence numbers are unset", func(t *testing.T) { + logs := []*solana.Log{ + createTestLogWithSequence(101, 1, 0, testPublicKey), + createTestLogWithSequence(102, 2, 0, testPublicKey), + } + + deliverable := deliverableLogsAfterCursor(logs, false, 0, 100, 0) + require.Len(t, deliverable, 2) + require.Equal(t, int64(101), deliverable[0].BlockNumber) + }) + + t.Run("uses sequence cursor when sequence numbers are present", func(t *testing.T) { + logs := []*solana.Log{ + createTestLogWithSequence(103, 3, 2, testPublicKey), + createTestLogWithSequence(101, 1, 1, testPublicKey), + } + + deliverable := deliverableLogsAfterCursor(logs, true, 0, 100, 0) + require.Len(t, deliverable, 2) + require.Equal(t, int64(1), deliverable[0].SequenceNum) + }) +} + func TestBuildQueryExpressions(t *testing.T) { + t.Run("omits starting block filter after first delivery", func(t *testing.T) { + request := createTestRequest() + + withBlock, err := BuildQueryExpressions(request, 99, true) + require.NoError(t, err) + require.Len(t, withBlock, 4) + + withoutBlock, err := BuildQueryExpressions(request, 99, false) + require.NoError(t, err) + require.Len(t, withoutBlock, 3) + }) + t.Run("builds basic expressions", func(t *testing.T) { request := &solanacappb.FilterLogTriggerRequest{ Address: testPublicKey[:], } - expressions, err := BuildQueryExpressions(request, 99) + expressions, err := BuildQueryExpressions(request, 99, true) require.NoError(t, err) require.Len(t, expressions, 3) @@ -408,7 +537,7 @@ func TestBuildQueryExpressions(t *testing.T) { Subkeys: testSubkeys, } - expressions, err := BuildQueryExpressions(request, 99) + expressions, err := BuildQueryExpressions(request, 99, true) require.NoError(t, err) require.Len(t, expressions, 4) @@ -422,7 +551,7 @@ func TestBuildQueryExpressions(t *testing.T) { }, } - expressions, err := BuildQueryExpressions(request, 99) + expressions, err := BuildQueryExpressions(request, 99, true) require.NoError(t, err) require.Len(t, expressions, 3) @@ -447,7 +576,7 @@ func TestLogTriggerSubkeyFilters(t *testing.T) { }, } - expressions, err := BuildQueryExpressions(request, 99) + expressions, err := BuildQueryExpressions(request, 99, true) require.NoError(t, err) require.NotNil(t, expressions) @@ -493,7 +622,7 @@ func TestLogTriggerSubkeyFilters(t *testing.T) { }, } - expressions, err := BuildQueryExpressions(request, 49) + expressions, err := BuildQueryExpressions(request, 49, true) require.NoError(t, err) require.NotNil(t, expressions) @@ -526,7 +655,7 @@ func TestLogTriggerSubkeyFilters(t *testing.T) { }, } - expressions, err := BuildQueryExpressions(request, 0) + expressions, err := BuildQueryExpressions(request, 0, true) require.NoError(t, err) require.NotNil(t, expressions) @@ -558,7 +687,7 @@ func TestLogTriggerSubkeyFilters(t *testing.T) { }, } - expressions, err := BuildQueryExpressions(request, 0) + expressions, err := BuildQueryExpressions(request, 0, true) require.NoError(t, err) require.NotNil(t, expressions) @@ -643,7 +772,7 @@ func TestStartPolling(t *testing.T) { store := NewSolanaLogTriggerStore() opts := LogTriggerServiceOpts{ - SolanaService: mockSolanaService, + SolanaService: mocks.WrapSolanaService(mockSolanaService), Logger: logger.Nop(), BeholderProcessor: NopBeholderProcessor{}, MessageBuilder: monitoring.NewMessageBuilder(types.ChainInfo{}, capabilities.CapabilityInfo{}, ""), @@ -685,7 +814,7 @@ func TestStartPolling(t *testing.T) { mockSolanaService.AssertExpectations(t) }) - t.Run("updates lastProcessedBlock correctly", func(t *testing.T) { + t.Run("updates delivery cursor on successful delivery", func(t *testing.T) { service, mockSolana := setupTest(t) baseCtx, cancel := context.WithTimeout(t.Context(), 100*time.Millisecond) defer cancel() @@ -755,7 +884,7 @@ func TestStartPolling(t *testing.T) { // Create service with very small buffer lggr := logger.Test(t) opts := LogTriggerServiceOpts{ - SolanaService: mockSolanaService, + SolanaService: mocks.WrapSolanaService(mockSolanaService), Logger: lggr, Triggers: store, LogTriggerPollInterval: 1 * time.Millisecond, @@ -806,13 +935,13 @@ func TestStartPolling(t *testing.T) { mockSolanaService.AssertExpectations(t) }) - t.Run("updates lastProcessedBlock when inbox is full", func(t *testing.T) { + t.Run("does not advance delivery cursor when inbox is full", func(t *testing.T) { mockSolanaService := mocks.NewSolanaService(t) store := NewSolanaLogTriggerStore() lggr := logger.Test(t) opts := LogTriggerServiceOpts{ - SolanaService: mockSolanaService, + SolanaService: mocks.WrapSolanaService(mockSolanaService), Logger: lggr, Triggers: store, LogTriggerPollInterval: 10 * time.Millisecond, @@ -874,7 +1003,7 @@ func TestStartPolling(t *testing.T) { lggr := logger.Test(t) opts := LogTriggerServiceOpts{ - SolanaService: mockSolanaService, + SolanaService: mocks.WrapSolanaService(mockSolanaService), Logger: lggr, Triggers: store, LogTriggerPollInterval: 5 * time.Millisecond, @@ -968,7 +1097,7 @@ func TestErrorHandling(t *testing.T) { }, } - expressions, err := BuildQueryExpressions(request, 99) + expressions, err := BuildQueryExpressions(request, 99, true) if err != nil { assert.Contains(t, err.Error(), "failed to create subkey filter") } else { @@ -978,7 +1107,7 @@ func TestErrorHandling(t *testing.T) { t.Run("block range validation", func(t *testing.T) { request := createTestRequest() - expressions, err := BuildQueryExpressions(request, 199) + expressions, err := BuildQueryExpressions(request, 199, true) if err == nil { assert.NotNil(t, expressions) } @@ -998,7 +1127,7 @@ func TestErrorHandling(t *testing.T) { }, } - expressions, err := BuildQueryExpressions(request, 99) + expressions, err := BuildQueryExpressions(request, 99, true) if err == nil { assert.NotNil(t, expressions) } @@ -1006,7 +1135,7 @@ func TestErrorHandling(t *testing.T) { t.Run("zero block range", func(t *testing.T) { request := createTestRequest() - expressions, err := BuildQueryExpressions(request, -1) + expressions, err := BuildQueryExpressions(request, -1, true) require.NoError(t, err) assert.GreaterOrEqual(t, len(expressions), 2) }) @@ -1099,7 +1228,7 @@ func BenchmarkSolanaLogTriggerService_BuildQueryExpressions(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := BuildQueryExpressions(request, 99) + _, err := BuildQueryExpressions(request, 99, true) if err != nil { b.Fatal(err) } @@ -1169,7 +1298,7 @@ func TestSolanaLogTriggerService_NewLogTriggerService(t *testing.T) { lggr := logger.Test(t) opts := LogTriggerServiceOpts{ - SolanaService: mockService, + SolanaService: mocks.WrapSolanaService(mockService), Logger: lggr, Triggers: store, LogTriggerPollInterval: 5 * time.Second, @@ -1320,7 +1449,7 @@ func TestSolanaLogTriggerService_EdgeCases(t *testing.T) { t.Run("very large block numbers", func(t *testing.T) { request := createTestRequest() - expressions, err := BuildQueryExpressions(request, 9223372036854775805) + expressions, err := BuildQueryExpressions(request, 9223372036854775805, true) require.NoError(t, err) require.NotEmpty(t, expressions) @@ -1329,7 +1458,7 @@ func TestSolanaLogTriggerService_EdgeCases(t *testing.T) { t.Run("negative block numbers", func(t *testing.T) { request := createTestRequest() - expressions, err := BuildQueryExpressions(request, -2) + expressions, err := BuildQueryExpressions(request, -2, true) require.NoError(t, err) require.NotEmpty(t, expressions)