Skip to content

Commit ede2767

Browse files
authored
fix: correct snap edge case (#1147)
* debug snap metadata issues

* fix: add has_sector_key column to replace broken orig!=cur snap detection

  The assumption that orig_sealed_cid != cur_sealed_cid indicates a snap-upgraded sector breaks when deal data is all zeros: the encode formula out[i] = key[i] + data[i]*rho(i) produces key[i] when data[i]=0, leaving CommR unchanged and SealedCID == SectorKeyCID on chain.

  Add has_sector_key BOOLEAN to sectors_meta that mirrors chain SectorKeyCID state:
  - Set TRUE in transferUpdatedSectorData during snap submit
  - Reconciled from chain by SectorMetadata task (batched, 100 sectors/txn)
  - Seeded from orig!=cur in migration for common case; zero-data edge cases fixed by next metadata task run

  Replace all snap detection consumers:
  - is_cc triggers: use NOT has_sector_key instead of CID comparison
  - GC mark: query has_sector_key=TRUE instead of orig!=cur
  - Unseal decode: use HasSectorKey instead of commK!=commR (critical: was calling DecodeSDR instead of DecodeSnap for zero-data snaps)
  - Web UI: derive IsSnap from has_sector_key, fix mismatch detection
  - Guided setup: set has_sector_key from lotus UpdateSealed presence

  Also add encode-time guard rejecting snaps where output CommR equals input CommR, preventing future zero-data snaps from entering the pipeline.

* fix: downgrade identical CommR snap check from error to warning

  Zero-data snaps are valid on chain; has_sector_key handles the detection correctly so there is no reason to reject the encode.

* fix: add downgrade file for has_sector_key migration

  Reverts triggers to the 20260315 version (orig_sealed_cid check). Keeps the has_sector_key column to avoid data loss.
1 parent 12a0ec3 commit ede2767

File tree

10 files changed

+401
-18
lines changed

10 files changed

+401
-18
lines changed

cmd/curio/guidedsetup/shared.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -388,13 +388,14 @@ func MigrateSectors(ctx context.Context, maddr address.Address, mmeta datastore.
388388
_, err := tx.Exec(`
389389
INSERT INTO sectors_meta (sp_id, sector_num, reg_seal_proof, ticket_epoch, ticket_value,
390390
orig_sealed_cid, orig_unsealed_cid, cur_sealed_cid, cur_unsealed_cid,
391-
msg_cid_precommit, msg_cid_commit, msg_cid_update, seed_epoch, seed_value)
392-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
391+
msg_cid_precommit, msg_cid_commit, msg_cid_update, seed_epoch, seed_value, has_sector_key)
392+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
393393
ON CONFLICT (sp_id, sector_num) DO UPDATE
394394
SET reg_seal_proof = excluded.reg_seal_proof, ticket_epoch = excluded.ticket_epoch, ticket_value = excluded.ticket_value,
395395
orig_sealed_cid = excluded.orig_sealed_cid, orig_unsealed_cid = excluded.orig_unsealed_cid, cur_sealed_cid = excluded.cur_sealed_cid,
396396
cur_unsealed_cid = excluded.cur_unsealed_cid, msg_cid_precommit = excluded.msg_cid_precommit, msg_cid_commit = excluded.msg_cid_commit,
397-
msg_cid_update = excluded.msg_cid_update, seed_epoch = excluded.seed_epoch, seed_value = excluded.seed_value`,
397+
msg_cid_update = excluded.msg_cid_update, seed_epoch = excluded.seed_epoch, seed_value = excluded.seed_value,
398+
has_sector_key = excluded.has_sector_key`,
398399
mid,
399400
sectr.SectorNumber,
400401
sectr.SectorType,
@@ -409,6 +410,7 @@ func MigrateSectors(ctx context.Context, maddr address.Address, mmeta datastore.
409410
cidPtrToStrptr(sectr.ReplicaUpdateMessage),
410411
sectr.SeedEpoch,
411412
sectr.SeedValue,
413+
sectr.UpdateSealed != nil, // has_sector_key: true if sector was snap-upgraded in lotus
412414
)
413415
if err != nil {
414416
b, _ := json.MarshalIndent(sectr, "", " ")
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
-- Downgrade: Restore trigger functions to the 20260315 version that uses
2+
-- orig_sealed_cid = cur_sealed_cid instead of has_sector_key.
3+
-- The has_sector_key column is kept (harmless, avoids data loss).
4+
5+
CREATE OR REPLACE FUNCTION update_is_cc()
6+
RETURNS TRIGGER AS $$
7+
BEGIN
8+
NEW.is_cc := (NEW.orig_sealed_cid = NEW.cur_sealed_cid) AND NOT EXISTS (
9+
SELECT 1
10+
FROM sectors_snap_pipeline
11+
WHERE sectors_snap_pipeline.sp_id = NEW.sp_id
12+
AND sectors_snap_pipeline.sector_number = NEW.sector_num
13+
) AND EXISTS (
14+
SELECT 1
15+
FROM sectors_cc_values
16+
WHERE sectors_cc_values.reg_seal_proof = NEW.reg_seal_proof
17+
AND sectors_cc_values.cur_unsealed_cid = NEW.cur_unsealed_cid
18+
);
19+
20+
RETURN NEW;
21+
END;
22+
$$ LANGUAGE plpgsql;
23+
24+
CREATE OR REPLACE FUNCTION update_sectors_meta_is_cc()
25+
RETURNS TRIGGER AS $$
26+
DECLARE
27+
v_sp_id BIGINT;
28+
v_sector_number BIGINT;
29+
BEGIN
30+
IF TG_OP = 'DELETE' THEN
31+
v_sp_id := OLD.sp_id;
32+
v_sector_number := OLD.sector_number;
33+
ELSE
34+
v_sp_id := NEW.sp_id;
35+
v_sector_number := NEW.sector_number;
36+
END IF;
37+
38+
UPDATE sectors_meta
39+
SET is_cc = (sectors_meta.orig_sealed_cid = sectors_meta.cur_sealed_cid) AND NOT EXISTS (
40+
SELECT 1
41+
FROM sectors_snap_pipeline
42+
WHERE sectors_snap_pipeline.sp_id = sectors_meta.sp_id
43+
AND sectors_snap_pipeline.sector_number = sectors_meta.sector_num
44+
) AND EXISTS (
45+
SELECT 1
46+
FROM sectors_cc_values
47+
WHERE sectors_cc_values.reg_seal_proof = sectors_meta.reg_seal_proof
48+
AND sectors_cc_values.cur_unsealed_cid = sectors_meta.cur_unsealed_cid
49+
)
50+
WHERE sp_id = v_sp_id AND sector_num = v_sector_number;
51+
52+
IF TG_OP = 'DELETE' THEN
53+
RETURN OLD;
54+
END IF;
55+
RETURN NEW;
56+
END;
57+
$$ LANGUAGE plpgsql;
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
-- Add has_sector_key column to sectors_meta.
2+
-- This mirrors chain state: true when the on-chain sector has SectorKeyCID set
3+
-- (i.e. the sector has been snap-upgraded). It replaces the unreliable
4+
-- orig_sealed_cid != cur_sealed_cid heuristic which breaks when a snap-deal
5+
-- encodes all-zero data, producing an identical CommR.
6+
--
7+
-- Backfill of this column happens in the SectorMetadata reconciliation task
8+
-- which reads chain state for every sector. No exact SQL-level backfill is possible
9+
-- because the DB cannot call the chain API.
10+
11+
ALTER TABLE sectors_meta ADD COLUMN IF NOT EXISTS has_sector_key BOOLEAN NOT NULL DEFAULT FALSE;
12+
13+
-- Seed the column from the best heuristic available in SQL: if the CIDs differ
14+
-- the sector was definitely snapped. This covers the common case immediately;
15+
-- zero-data-snap sectors (where CIDs match) will be fixed by the metadata task.
16+
UPDATE sectors_meta SET has_sector_key = TRUE
17+
WHERE orig_sealed_cid != cur_sealed_cid;
18+
19+
-- Replace the BEFORE INSERT/UPDATE trigger on sectors_meta to use has_sector_key
20+
-- instead of orig_sealed_cid = cur_sealed_cid.
21+
CREATE OR REPLACE FUNCTION update_is_cc()
22+
RETURNS TRIGGER AS $$
23+
BEGIN
24+
NEW.is_cc := (NOT NEW.has_sector_key) AND NOT EXISTS (
25+
SELECT 1
26+
FROM sectors_snap_pipeline
27+
WHERE sectors_snap_pipeline.sp_id = NEW.sp_id
28+
AND sectors_snap_pipeline.sector_number = NEW.sector_num
29+
) AND EXISTS (
30+
SELECT 1
31+
FROM sectors_cc_values
32+
WHERE sectors_cc_values.reg_seal_proof = NEW.reg_seal_proof
33+
AND sectors_cc_values.cur_unsealed_cid = NEW.cur_unsealed_cid
34+
);
35+
36+
RETURN NEW;
37+
END;
38+
$$ LANGUAGE plpgsql;
39+
40+
-- Replace the AFTER INSERT/UPDATE/DELETE trigger on sectors_snap_pipeline
41+
CREATE OR REPLACE FUNCTION update_sectors_meta_is_cc()
42+
RETURNS TRIGGER AS $$
43+
DECLARE
44+
v_sp_id BIGINT;
45+
v_sector_number BIGINT;
46+
BEGIN
47+
IF TG_OP = 'DELETE' THEN
48+
v_sp_id := OLD.sp_id;
49+
v_sector_number := OLD.sector_number;
50+
ELSE
51+
v_sp_id := NEW.sp_id;
52+
v_sector_number := NEW.sector_number;
53+
END IF;
54+
55+
UPDATE sectors_meta
56+
SET is_cc = (NOT sectors_meta.has_sector_key) AND NOT EXISTS (
57+
SELECT 1
58+
FROM sectors_snap_pipeline
59+
WHERE sectors_snap_pipeline.sp_id = sectors_meta.sp_id
60+
AND sectors_snap_pipeline.sector_number = sectors_meta.sector_num
61+
) AND EXISTS (
62+
SELECT 1
63+
FROM sectors_cc_values
64+
WHERE sectors_cc_values.reg_seal_proof = sectors_meta.reg_seal_proof
65+
AND sectors_cc_values.cur_unsealed_cid = sectors_meta.cur_unsealed_cid
66+
)
67+
WHERE sp_id = v_sp_id AND sector_num = v_sector_number;
68+
69+
IF TG_OP = 'DELETE' THEN
70+
RETURN OLD;
71+
END IF;
72+
RETURN NEW;
73+
END;
74+
$$ LANGUAGE plpgsql;

lib/ffi/snap_funcs.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,11 @@ func (sb *SealCalls) EncodeUpdate(
340340
return cid.Undef, cid.Undef, xerrors.Errorf("compute sealed cid: %w", err)
341341
}
342342

343+
if sealedCid == sectorKeyCid {
344+
log.Warnw("snap encode produced identical CommR to sector key; deal data is likely all zeros",
345+
"sectorID", sector.ID, "taskID", taskID, "commR", sealedCid)
346+
}
347+
343348
// STEP 3: Generate update proofs
344349

345350
genVpsStart := time.Now()

tasks/gc/storage_gc_mark.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ func (s *StorageGCMark) Do(taskID harmonytask.TaskID, stillOwned func() bool) (d
370370
}
371371

372372
var minerIDs []int64
373-
if err = s.db.Select(ctx, &minerIDs, `SELECT DISTINCT sp_id FROM sectors_meta WHERE orig_sealed_cid != cur_sealed_cid`); err != nil {
373+
if err = s.db.Select(ctx, &minerIDs, `SELECT DISTINCT sp_id FROM sectors_meta WHERE has_sector_key = TRUE`); err != nil {
374374
return false, xerrors.Errorf("distinct miners from snap sectors: %w", err)
375375
}
376376

@@ -394,12 +394,11 @@ func (s *StorageGCMark) Do(taskID harmonytask.TaskID, stillOwned func() bool) (d
394394
finalityMinerStates[abi.ActorID(mID)] = mState
395395
}
396396

397-
// SELECT sp_id, sector_num FROM sectors_meta WHERE orig_sealed_cid != cur_sealed_cid
398397
var snapSectors []struct {
399398
SpID int64 `db:"sp_id"`
400399
SectorNum int64 `db:"sector_num"`
401400
}
402-
err = s.db.Select(ctx, &snapSectors, `SELECT sp_id, sector_num FROM sectors_meta WHERE orig_sealed_cid != cur_sealed_cid ORDER BY sp_id, sector_num`)
401+
err = s.db.Select(ctx, &snapSectors, `SELECT sp_id, sector_num FROM sectors_meta WHERE has_sector_key = TRUE ORDER BY sp_id, sector_num`)
403402
if err != nil {
404403
return false, xerrors.Errorf("select snap sectors: %w", err)
405404
}

tasks/metadata/task_sector_expirations.go

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ func (s *SectorMetadata) Do(taskID harmonytask.TaskID, stillOwned func() bool) (
7979
SectorNum uint64 `db:"sector_num"`
8080

8181
CurSealedCID string `db:"cur_sealed_cid"`
82+
HasSectorKey bool `db:"has_sector_key"`
8283

8384
Expiration *uint64 `db:"expiration_epoch"`
8485

@@ -88,7 +89,7 @@ func (s *SectorMetadata) Do(taskID harmonytask.TaskID, stillOwned func() bool) (
8889
InSnapPipeline bool `db:"in_snap_pipeline"`
8990
}
9091

91-
if err := s.db.Select(ctx, &sectors, `SELECT sm.sp_id, sm.sector_num, sm.cur_sealed_cid, sm.expiration_epoch, sm.partition, sm.deadline,
92+
if err := s.db.Select(ctx, &sectors, `SELECT sm.sp_id, sm.sector_num, sm.cur_sealed_cid, sm.has_sector_key, sm.expiration_epoch, sm.partition, sm.deadline,
9293
EXISTS(SELECT 1 FROM sectors_snap_pipeline snp WHERE snp.sp_id = sm.sp_id AND snp.sector_number = sm.sector_num) AS in_snap_pipeline
9394
FROM sectors_meta sm ORDER BY sm.sp_id, sm.sector_num`); err != nil {
9495
return false, xerrors.Errorf("get sector list: %w", err)
@@ -104,10 +105,54 @@ func (s *SectorMetadata) Do(taskID harmonytask.TaskID, stillOwned func() bool) (
104105
Deadline uint64
105106
}
106107

107-
const batchSize = 1000
108+
type sectorKeyUpdate struct {
109+
SpID uint64
110+
SectorNum uint64
111+
HasSectorKey bool
112+
}
113+
114+
const batchSize = 100
108115
updateBatch := make([]partitionUpdate, 0, batchSize)
109116
total := 0
110117

118+
sectorKeyBatch := make([]sectorKeyUpdate, 0, batchSize)
119+
sectorKeyTotal := 0
120+
121+
flushSectorKeyBatch := func() error {
122+
if len(sectorKeyBatch) == 0 {
123+
return nil
124+
}
125+
126+
sectorKeyTotal += len(sectorKeyBatch)
127+
log.Infow("updating has_sector_key", "total", sectorKeyTotal)
128+
129+
_, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
130+
batch := &pgx.Batch{}
131+
for _, update := range sectorKeyBatch {
132+
batch.Queue("UPDATE sectors_meta SET has_sector_key = $1 WHERE sp_id = $2 AND sector_num = $3",
133+
update.HasSectorKey, update.SpID, update.SectorNum)
134+
}
135+
136+
br, err := tx.SendBatch(ctx, batch)
137+
if err != nil {
138+
return false, xerrors.Errorf("failed to send has_sector_key batch: %w", err)
139+
}
140+
defer func() {
141+
_ = br.Close()
142+
}()
143+
144+
for i := 0; i < batch.Len(); i++ {
145+
_, err := br.Exec()
146+
if err != nil {
147+
return false, xerrors.Errorf("executing has_sector_key batch update %d: %w", i, err)
148+
}
149+
}
150+
151+
return true, nil
152+
}, harmonydb.OptionRetry())
153+
return err
154+
}
155+
111156
flushBatch := func() error {
112157
if len(updateBatch) == 0 {
113158
return nil
@@ -219,6 +264,30 @@ func (s *SectorMetadata) Do(taskID harmonytask.TaskID, stillOwned func() bool) (
219264
}
220265
}
221266

267+
// Reconcile has_sector_key from chain state.
268+
// SectorKeyCID is set on-chain when a sector has been snap-upgraded.
269+
// This is the authoritative source of truth; the DB column is kept in
270+
// sync here. Skip sectors currently in the snap pipeline.
271+
chainHasSectorKey := si.SectorKeyCID != nil
272+
if sector.HasSectorKey != chainHasSectorKey && !sector.InSnapPipeline {
273+
log.Infow("reconciling has_sector_key from chain",
274+
"sp", sector.SpID, "sector", sector.SectorNum,
275+
"db", sector.HasSectorKey, "chain", chainHasSectorKey)
276+
277+
sectorKeyBatch = append(sectorKeyBatch, sectorKeyUpdate{
278+
SpID: sector.SpID,
279+
SectorNum: sector.SectorNum,
280+
HasSectorKey: chainHasSectorKey,
281+
})
282+
283+
if len(sectorKeyBatch) >= batchSize {
284+
if err := flushSectorKeyBatch(); err != nil {
285+
return false, xerrors.Errorf("flushing has_sector_key batch: %w", err)
286+
}
287+
sectorKeyBatch = sectorKeyBatch[:0]
288+
}
289+
}
290+
222291
needsPartitionUpdate := false
223292

224293
if sector.Partition == nil || sector.Deadline == nil {
@@ -301,6 +370,9 @@ func (s *SectorMetadata) Do(taskID harmonytask.TaskID, stillOwned func() bool) (
301370
}
302371

303372
// Flush any remaining updates
373+
if err := flushSectorKeyBatch(); err != nil {
374+
return false, xerrors.Errorf("flushing final has_sector_key batch: %w", err)
375+
}
304376
if err := flushBatch(); err != nil {
305377
return false, xerrors.Errorf("flushing final batch: %w", err)
306378
}

tasks/snap/task_submit.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,24 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
239239
return false, xerrors.Errorf("Proof mismatch between on chain %d and local database %d for sector %d of miner %d", onChainInfo.SealProof, regProof, update.SectorNumber, update.SpID)
240240
}
241241

242+
// Check that the sector is a CC sector (SectorKeyCID must be nil for CC sectors).
243+
// If SectorKeyCID is set, the sector was already snapped and cannot be updated again.
244+
if onChainInfo.SectorKeyCID != nil {
245+
log.Errorw("sector is not CC on-chain (SectorKeyCID is set), skipping", "sp", update.SpID, "sector", update.SectorNumber, "sector_key_cid", onChainInfo.SectorKeyCID)
246+
247+
_, err := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET
248+
failed = TRUE, failed_at = NOW(), failed_reason = 'not-cc', failed_reason_msg = $1,
249+
task_id_submit = NULL, after_submit = FALSE
250+
WHERE sp_id = $2 AND sector_number = $3`,
251+
fmt.Sprintf("sector %d is not a CC sector on-chain: SectorKeyCID is set to %s", update.SectorNumber, onChainInfo.SectorKeyCID),
252+
update.SpID, update.SectorNumber)
253+
if err != nil {
254+
return false, xerrors.Errorf("marking sector as failed (not CC): %w", err)
255+
}
256+
257+
continue // Skip this sector
258+
}
259+
242260
sl, err := s.api.StateSectorPartition(ctx, maddr, snum, types.EmptyTSK)
243261
if err != nil {
244262
return false, xerrors.Errorf("getting sector location: %w", err)
@@ -366,7 +384,8 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
366384
}
367385

368386
if len(params.SectorUpdates) == 0 {
369-
return false, xerrors.Errorf("no sector updates")
387+
log.Warnw("no sector updates to submit after filtering, all sectors were skipped")
388+
return true, nil
370389
}
371390

372391
enc := new(bytes.Buffer)
@@ -444,7 +463,7 @@ func (s *SubmitTask) transferUpdatedSectorData(ctx context.Context, spID int64,
444463
for sectorNum, cids := range transferMap {
445464
sectorNum, cids := sectorNum, cids
446465
n, err := tx.Exec(`UPDATE sectors_meta SET cur_sealed_cid = $1,
447-
cur_unsealed_cid = $2, msg_cid_update = $3
466+
cur_unsealed_cid = $2, msg_cid_update = $3, has_sector_key = TRUE
448467
WHERE sp_id = $4 AND sector_num = $5`, cids.sealed.String(), cids.unsealed.String(), mcid.String(), spID, sectorNum)
449468

450469
if err != nil {

tasks/unseal/task_unseal_decode.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,10 @@ func (t *TaskUnsealDecode) Do(taskID harmonytask.TaskID, stillOwned func() bool)
7171
OrigSealedCID string `db:"orig_sealed_cid"`
7272
CurSealedCID string `db:"cur_sealed_cid"`
7373
CurUnsealedCID string `db:"cur_unsealed_cid"`
74+
HasSectorKey bool `db:"has_sector_key"`
7475
}
7576
err = t.db.Select(ctx, &sectorMeta, `
76-
SELECT ticket_value, orig_sealed_cid, cur_sealed_cid, cur_unsealed_cid
77+
SELECT ticket_value, orig_sealed_cid, cur_sealed_cid, cur_unsealed_cid, has_sector_key
7778
FROM sectors_meta
7879
WHERE sp_id = $1 AND sector_num = $2`, sectorParams.SpID, sectorParams.SectorNumber)
7980
if err != nil {
@@ -128,7 +129,7 @@ func (t *TaskUnsealDecode) Do(taskID harmonytask.TaskID, stillOwned func() bool)
128129
ProofType: abi.RegisteredSealProof(sectorParams.RegSealProof),
129130
}
130131

131-
isSnap := commK != commR
132+
isSnap := smeta.HasSectorKey
132133
log.Infow("unseal decode", "snap", isSnap, "task", taskID, "commK", commK, "commR", commR, "commD", commD)
133134
if isSnap {
134135
err := t.sc.DecodeSnap(ctx, taskID, commD, commK, sref)

0 commit comments

Comments
 (0)