Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions chain_capabilities/solana/actions/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ func (s *Solana) GetMultipleAccountsWithOpts(
return nil, GetError(errors.New("unimplemented"), false)
}

func (s *Solana) GetProgramAccounts(
ctx context.Context,
metadata capabilities.RequestMetadata,
input *solcap.GetProgramAccountsRequest) (*capabilities.ResponseAndMetadata[*solcap.GetProgramAccountsReply], caperrors.Error) {
return nil, GetError(errors.New("unimplemented"), false)
}

func (s *Solana) GetSignatureStatuses(
ctx context.Context,
metadata capabilities.RequestMetadata,
Expand Down
10 changes: 5 additions & 5 deletions chain_capabilities/solana/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ require (
github.com/smartcontractkit/capabilities/chain_capabilities/common v0.0.0-20260615195421-fb87220e503f
github.com/smartcontractkit/capabilities/libs v0.0.0-20260609124022-2749e4a32bfb
github.com/smartcontractkit/chain-selectors v1.0.100
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260602135221-cc7a5b50532a
github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260326180413-c69f27e37a13
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7
github.com/smartcontractkit/chainlink-solana v1.1.2-0.20260420191419-ea62f88cbdb4
github.com/smartcontractkit/chainlink-common v0.11.2-0.20260610124317-1a3c32c46eaa
github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260521164805-26d78d5e1243
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260604171908-6734db2d444f
github.com/smartcontractkit/chainlink-solana v1.3.1-0.20260702194516-0c9666a3e925
github.com/smartcontractkit/chainlink-solana/contracts v0.0.0-20260421131224-c46cbfe7bc6c
github.com/smartcontractkit/libocr v0.0.0-20260403184524-b6409238958d
github.com/stretchr/testify v1.11.1
Expand Down Expand Up @@ -132,7 +132,7 @@ require (
github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20260310183131-8d0f0e383288 // indirect
github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20260310183131-8d0f0e383288 // indirect
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260601211238-9f526774fef0 // indirect
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20251210101658-1c5c8e4c4f15 // indirect
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20260521164805-26d78d5e1243 // indirect
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b // indirect
github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260319180422-b5808c964785 // indirect
github.com/smartcontractkit/freeport v0.1.3-0.20250828155247-add56fa28aad // indirect
Expand Down
20 changes: 10 additions & 10 deletions chain_capabilities/solana/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

187 changes: 165 additions & 22 deletions chain_capabilities/solana/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package trigger

import (
"context"
"cmp"
"fmt"
"slices"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -89,15 +91,18 @@ func (lts *SolanaLogTriggerService) ToLogPollerFilter(triggerID string, config *
}, nil
}

// BuildQueryExpressions builds query expressions including subkey filters
func BuildQueryExpressions(config *solanacappb.FilterLogTriggerRequest, lastProcessedBlock int64) ([]query.Expression, error) {
// BuildQueryExpressions builds query expressions including subkey filters.
// When includeStartingBlockFilter is true and startingBlock >= 0, results are limited to logs
// after the registration-time finalized block. Once delivery starts, progression uses the last
// successfully delivered block/log-index position.
func BuildQueryExpressions(config *solanacappb.FilterLogTriggerRequest, startingBlock int64, includeStartingBlockFilter bool) ([]query.Expression, error) {
expressions := []query.Expression{
solprimitives.NewAddressFilter(solana.PublicKey(config.Address)),
solprimitives.NewEventSigFilter(getEventSig(config.EventName)),
}

if lastProcessedBlock >= 0 {
blockStr := strconv.FormatInt(lastProcessedBlock, 10)
if includeStartingBlockFilter && startingBlock >= 0 {
blockStr := strconv.FormatInt(startingBlock, 10)
expressions = append(expressions, query.Block(blockStr, primitives.Gt))
}

Expand Down Expand Up @@ -482,17 +487,31 @@ func (lts *SolanaLogTriggerService) startPolling(ctx context.Context, telemetryC
defer ticker.Stop()
defer close(logCh)

// Use the finalized block number from registration as the starting point
lastProcessedBlock := startingBlock
// startingBlock is the finalized block at registration. The delivery cursor only advances on
// successful delivery. When log poller returns sequence numbers, progression is contiguous by
// sequence; otherwise it falls back to block/log-index position.
var (
lastDeliveredSequenceNum int64
lastDeliveredBlock = startingBlock
lastDeliveredLogIndex int64
hasDelivered bool
)

for {
select {
case <-ctx.Done():
lts.lggr.Debugf("Context cancelled for triggerID: %s, stopping polling", triggerID)
return
case <-ticker.C:
lts.lggr.Debugf("Awake, polling for triggerID: %s, currentOffset: %d", triggerID, lastProcessedBlock)
expressions, err := BuildQueryExpressions(config, lastProcessedBlock)
lts.lggr.Debugw("Polling for trigger logs",
"triggerID", triggerID,
"lastDeliveredSequenceNum", lastDeliveredSequenceNum,
"lastDeliveredBlock", lastDeliveredBlock,
"lastDeliveredLogIndex", lastDeliveredLogIndex,
"startingBlock", startingBlock,
)
includeStartingBlockFilter := !hasDelivered
expressions, err := BuildQueryExpressions(config, startingBlock, includeStartingBlockFilter)
if err != nil {
summary := fmt.Sprintf("Failed to build query expressions for trigger %s: %v", triggerID, err)
lts.logAndEmitError(ctx, telemetryContext, triggerID, summary, err.Error())
Expand All @@ -512,39 +531,163 @@ func (lts *SolanaLogTriggerService) startPolling(ctx context.Context, telemetryC
}

sentCount := 0
calculatedLatestBlock := lastProcessedBlock
for _, log := range logs {
var latestDeliveredBlock int64 = startingBlock
useSequenceCursor := logsHaveSequenceNumbers(logs)
pendingLogs := deliverableLogsAfterCursor(
logs,
useSequenceCursor,
lastDeliveredSequenceNum,
lastDeliveredBlock,
lastDeliveredLogIndex,
)
for _, log := range pendingLogs {
if log == nil {
lts.lggr.Warnw("Received nil log from QueryTrackedLogs, skipping", "triggerID", triggerID)
continue
}

// Track the highest block number from logs (all logs from log poller are already finalized)
if log.BlockNumber > calculatedLatestBlock {
calculatedLatestBlock = log.BlockNumber
}

protoLog := solanacappb.ConvertLogToProto(log)
eventID := lts.generateLogIdentifier(log)

checksLimitsOk := lts.checkLimitsOnLog(ctx, telemetryContext, protoLog, triggerID, eventID, log)
if !checksLimitsOk {
continue
break
}

if ctx.Err() != nil {
return
}
if lts.deliverLogReliably(ctx, telemetryContext, triggerID, protoLog, eventID, log) {
sentCount++
if !lts.deliverLogReliably(ctx, telemetryContext, triggerID, protoLog, eventID, log) {
break
}

sentCount++
hasDelivered = true
if useSequenceCursor {
lastDeliveredSequenceNum = log.SequenceNum
} else {
lastDeliveredBlock = log.BlockNumber
lastDeliveredLogIndex = log.LogIndex
}
if log.BlockNumber > latestDeliveredBlock {
latestDeliveredBlock = log.BlockNumber
}
}

lastProcessedBlock = calculatedLatestBlock
successMessage := fmt.Sprintf("Finished updating BlockNumber for triggerID: %s, BlockNumber: %d, sent logs: %d", triggerID, calculatedLatestBlock, sentCount)
lts.logAndEmitSuccess(ctx, successMessage, telemetryContext, triggerID, config, sentCount, calculatedLatestBlock)
var successMessage string
if useSequenceCursor {
successMessage = fmt.Sprintf(
"Finished updating delivery cursor for triggerID: %s, SequenceNum: %d, sent logs: %d",
triggerID, lastDeliveredSequenceNum, sentCount,
)
} else {
successMessage = fmt.Sprintf(
"Finished updating delivery cursor for triggerID: %s, BlockNumber: %d, LogIndex: %d, sent logs: %d",
triggerID, lastDeliveredBlock, lastDeliveredLogIndex, sentCount,
)
}
lts.logAndEmitSuccess(ctx, successMessage, telemetryContext, triggerID, config, sentCount, latestDeliveredBlock)
}
}
}

func logsHaveSequenceNumbers(logs []*solana.Log) bool {
for _, log := range logs {
if log != nil && log.SequenceNum > 0 {
return true
}
}
return false
}

func deliverableLogsAfterCursor(
logs []*solana.Log,
useSequenceCursor bool,
lastDeliveredSequenceNum int64,
lastDeliveredBlock int64,
lastDeliveredLogIndex int64,
) []*solana.Log {
if useSequenceCursor {
return deliverableLogsAfterSequence(logs, lastDeliveredSequenceNum)
}
return pendingLogsAfterBlockPosition(logs, lastDeliveredBlock, lastDeliveredLogIndex)
}

func pendingLogsAfterBlockPosition(logs []*solana.Log, lastDeliveredBlock int64, lastDeliveredLogIndex int64) []*solana.Log {
pending := make([]*solana.Log, 0, len(logs))
for _, log := range logs {
if log == nil {
continue
}
if isLogAtOrBeforeBlockPosition(log, lastDeliveredBlock, lastDeliveredLogIndex) {
continue
}
pending = append(pending, log)
}

slices.SortFunc(pending, func(a, b *solana.Log) int {
if c := cmp.Compare(a.BlockNumber, b.BlockNumber); c != 0 {
return c
}
return cmp.Compare(a.LogIndex, b.LogIndex)
})

return pending
}

func isLogAtOrBeforeBlockPosition(log *solana.Log, lastDeliveredBlock int64, lastDeliveredLogIndex int64) bool {
if log.BlockNumber < lastDeliveredBlock {
return true
}
if log.BlockNumber > lastDeliveredBlock {
return false
}
return log.LogIndex <= lastDeliveredLogIndex
}

func pendingLogsAfterSequence(logs []*solana.Log, lastDeliveredSequenceNum int64) []*solana.Log {
pending := make([]*solana.Log, 0, len(logs))
for _, log := range logs {
if log == nil {
continue
}
if log.SequenceNum <= lastDeliveredSequenceNum {
continue
}
pending = append(pending, log)
}

slices.SortFunc(pending, func(a, b *solana.Log) int {
return cmp.Compare(a.SequenceNum, b.SequenceNum)
})

return pending
}

// deliverableLogsAfterSequence returns logs ready to deliver in contiguous sequence order.
// If the next expected sequence number is missing from the query result, delivery stops at the gap
// so that later-indexed logs are not delivered before earlier ones appear in the log poller.
func deliverableLogsAfterSequence(logs []*solana.Log, lastDeliveredSequenceNum int64) []*solana.Log {
pending := pendingLogsAfterSequence(logs, lastDeliveredSequenceNum)
if len(pending) == 0 {
return nil
}

expectedNext := lastDeliveredSequenceNum + 1

deliverable := make([]*solana.Log, 0, len(pending))
for _, log := range pending {
if log.SequenceNum > expectedNext {
break
}
if log.SequenceNum < expectedNext {
continue
}
deliverable = append(deliverable, log)
expectedNext++
}

return deliverable
}

func getEventSig(eventName string) solana.EventSignature {
Expand Down Expand Up @@ -585,7 +728,7 @@ func (lts *SolanaLogTriggerService) deliverLogReliably(
Payload: anyPayload,
}

lts.lggr.Infow("Sending log event to pipe", "triggerID", triggerID, "eventID", eventID, "blockNumber", log.BlockNumber, "txHash", log.TxHash)
lts.lggr.Infow("Sending log event to pipe", "triggerID", triggerID, "eventID", eventID, "blockNumber", log.BlockNumber, "logIndex", log.LogIndex, "txHash", log.TxHash)
deliverCtx := capcommon.ContextWithOrgForDelivery(ctx, lts.lggr, lts.orgResolver, telemetryContext.RequestMetadata)
if err := lts.baseTrigger.DeliverEvent(deliverCtx, te, triggerID); err != nil {
summary := fmt.Sprintf("failed to persist/deliver event (triggerID=%s, eventID=%s): %v", triggerID, eventID, err)
Expand Down
Loading
Loading