diff --git a/cmd/mithril/node/node.go b/cmd/mithril/node/node.go index edb7af08..72608a19 100644 --- a/cmd/mithril/node/node.go +++ b/cmd/mithril/node/node.go @@ -809,6 +809,8 @@ func runVerifyRange(c *cobra.Command, args []string) { snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot) } mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0) + // Populate manifest seed data so replay doesn't need manifest at runtime + snapshot.PopulateManifestSeed(mithrilState, manifest) if err := mithrilState.Save(accountsDbDir); err != nil { mlog.Log.Errorf("failed to save state file: %v", err) } @@ -1175,6 +1177,8 @@ func runLive(c *cobra.Command, args []string) { snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot) } mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0) + // Populate manifest seed data so replay doesn't need manifest at runtime + snapshot.PopulateManifestSeed(mithrilState, manifest) if err := mithrilState.Save(accountsPath); err != nil { mlog.Log.Errorf("failed to save state file: %v", err) } @@ -1243,6 +1247,8 @@ func runLive(c *cobra.Command, args []string) { snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot) } mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0) + // Populate manifest seed data so replay doesn't need manifest at runtime + snapshot.PopulateManifestSeed(mithrilState, manifest) if err := mithrilState.Save(accountsPath); err != nil { mlog.Log.Errorf("failed to save state file: %v", err) } @@ -1299,6 +1305,8 @@ func runLive(c *cobra.Command, args []string) { snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot) } mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0) + // Populate manifest seed data so replay doesn't need manifest at runtime + snapshot.PopulateManifestSeed(mithrilState, manifest) if err := mithrilState.Save(accountsPath); err != nil { mlog.Log.Errorf("failed to save state file: %v", err) } @@ -1369,6 +1377,8 @@ func runLive(c *cobra.Command, args []string) { snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot) } mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0) + // Populate manifest seed data so replay doesn't need manifest at runtime + snapshot.PopulateManifestSeed(mithrilState, manifest) if err := mithrilState.Save(accountsPath); err != nil { mlog.Log.Errorf("failed to save state file: %v", err) } @@ -1471,6 +1481,8 @@ func runLive(c *cobra.Command, args []string) { WriterVersion: getVersion(), WriterCommit: getCommit(), }) + // Populate manifest seed data so replay doesn't need manifest at runtime + snapshot.PopulateManifestSeed(mithrilState, manifest) if err := mithrilState.Save(accountsPath); err != nil { mlog.Log.Errorf("failed to save state file: %v", err) } @@ -1582,6 +1594,8 @@ postBootstrap: snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot) } mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0) + // Populate manifest seed data so replay doesn't need manifest at runtime + snapshot.PopulateManifestSeed(mithrilState, manifest) if err := mithrilState.Save(accountsPath); err != nil { mlog.Log.Errorf("failed to save state file: %v", err) } @@ -2604,6 +2618,6 @@ func runReplayWithRecovery( } }() - result = replay.ReplayBlocks(ctx, accountsDb, accountsDbPath, 
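// Illustrative sketch of the "seed the state file from the manifest" idea behind the repeated
// snapshot.PopulateManifestSeed calls above. Types and field names here (SeedState, BankInfo,
// populateSeed) are hypothetical stand-ins, not the patch's actual API; the real patch encodes
// the bankhash as base58 and the lattice hash as base64.
package sketch

import "encoding/base64"

type BankInfo struct {
	Slot             uint64
	Hash             []byte // 32-byte bank hash
	Capitalization   uint64
	TransactionCount uint64
	LtHash           []byte // lattice hash bytes
}

type SeedState struct {
	ManifestParentSlot       uint64
	ManifestParentBankhash   string
	ManifestCapitalization   uint64
	ManifestTransactionCount uint64
	ManifestAcctsLtHash      string
}

// populateSeed copies the manifest-derived fields into the persisted state so later runs can
// replay without re-parsing the (very large) snapshot manifest.
func populateSeed(dst *SeedState, bank *BankInfo) {
	dst.ManifestParentSlot = bank.Slot
	dst.ManifestParentBankhash = base64.StdEncoding.EncodeToString(bank.Hash) // real code uses base58
	dst.ManifestCapitalization = bank.Capitalization
	dst.ManifestTransactionCount = bank.TransactionCount
	dst.ManifestAcctsLtHash = base64.StdEncoding.EncodeToString(bank.LtHash)
}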
manifest, resumeState, startSlot, endSlot, rpcEndpoints, blockDir, txParallelism, isLive, useLightbringer, dbgOpts, metricsWriter, rpcServer, blockFetchOpts, onCancelWriteState) + result = replay.ReplayBlocks(ctx, accountsDb, accountsDbPath, mithrilState, resumeState, startSlot, endSlot, rpcEndpoints, blockDir, txParallelism, isLive, useLightbringer, dbgOpts, metricsWriter, rpcServer, blockFetchOpts, onCancelWriteState) return result } diff --git a/pkg/global/global_ctx.go b/pkg/global/global_ctx.go index 6ea685d2..7eef3409 100644 --- a/pkg/global/global_ctx.go +++ b/pkg/global/global_ctx.go @@ -6,13 +6,17 @@ import ( "fmt" "os" "path/filepath" + "runtime" "sync" + "sync/atomic" + "github.com/Overclock-Validator/mithril/pkg/accountsdb" "github.com/Overclock-Validator/mithril/pkg/epochstakes" "github.com/Overclock-Validator/mithril/pkg/forkchoice" "github.com/Overclock-Validator/mithril/pkg/leaderschedule" "github.com/Overclock-Validator/mithril/pkg/sealevel" "github.com/gagliardetto/solana-go" + "github.com/panjf2000/ants/v2" ) // StakePubkeyIndexFileName is the name of the stake pubkey index file @@ -27,6 +31,7 @@ type GlobalCtx struct { stakeCache map[solana.PublicKey]*sealevel.Delegation pendingNewStakePubkeys []solana.PublicKey // New stake pubkeys to append to index after block commit voteCache map[solana.PublicKey]*sealevel.VoteStateVersions + voteStakeTotals map[solana.PublicKey]uint64 // Aggregated stake totals per vote account (replaces full stake cache at startup) epochStakes *epochstakes.EpochStakesCache epochAuthorizedVoters *epochstakes.EpochAuthorizedVotersCache forkChoice *forkchoice.ForkChoiceService @@ -37,6 +42,7 @@ type GlobalCtx struct { manageBlockHeight bool stakeCacheMutex sync.Mutex // Changed from RWMutex - simpler, used for both cache and pending voteCacheMutex sync.RWMutex + voteStakeTotalsMu sync.RWMutex slotsConfirmedMutex sync.Mutex mu sync.Mutex } @@ -78,6 +84,8 @@ func IncrTransactionCount(num uint64) { // PutStakeCacheItem adds or updates a stake cache entry during replay. // If this is a NEW pubkey (not already in cache), it's added to pendingNewStakePubkeys // for later append to the index file via FlushPendingStakePubkeys. +// NOTE: With streaming rewards, this function may only be called during rewards distribution +// to maintain consistency. For normal transaction recording, use TrackNewStakePubkey instead. func PutStakeCacheItem(pubkey solana.PublicKey, delegation *sealevel.Delegation) { instance.stakeCacheMutex.Lock() defer instance.stakeCacheMutex.Unlock() @@ -92,6 +100,31 @@ func PutStakeCacheItem(pubkey solana.PublicKey, delegation *sealevel.Delegation) instance.stakeCache[pubkey] = delegation } +// TrackNewStakePubkey tracks a new stake pubkey for index append without populating the cache. +// Use this during normal transaction recording when streaming rewards is enabled. +// Returns true if the pubkey was newly added to pending list. 
+func TrackNewStakePubkey(pubkey solana.PublicKey) bool { + instance.stakeCacheMutex.Lock() + defer instance.stakeCacheMutex.Unlock() + + // Check if already in cache or pending list + // Since we're not populating the cache anymore, we need to track seen pubkeys differently + if instance.stakeCache == nil { + instance.stakeCache = make(map[solana.PublicKey]*sealevel.Delegation) + } + + // Check if already known (in cache from previous runs or pending) + _, exists := instance.stakeCache[pubkey] + if exists { + return false + } + + // Mark as seen with nil delegation (just for tracking) + instance.stakeCache[pubkey] = nil + instance.pendingNewStakePubkeys = append(instance.pendingNewStakePubkeys, pubkey) + return true +} + // PutStakeCacheItemBulk adds a stake cache entry during bulk population (startup). // Does NOT track new pubkeys - use this when loading cache from index/snapshot/scan // to avoid enqueueing the entire cache on rebuild. @@ -196,6 +229,37 @@ func VoteCacheSnapshot() map[solana.PublicKey]*sealevel.VoteStateVersions { return snapshot } +// SetVoteStakeTotals sets the aggregated stake totals per vote account. +// Called at startup after scanning all stake accounts. +func SetVoteStakeTotals(m map[solana.PublicKey]uint64) { + instance.voteStakeTotalsMu.Lock() + instance.voteStakeTotals = m + instance.voteStakeTotalsMu.Unlock() +} + +// VoteStakeTotalsCopy returns a copy of the vote stake totals map. +// Safe for concurrent use - callers can mutate the returned map. +func VoteStakeTotalsCopy() map[solana.PublicKey]uint64 { + instance.voteStakeTotalsMu.RLock() + defer instance.voteStakeTotalsMu.RUnlock() + if instance.voteStakeTotals == nil { + return nil + } + result := make(map[solana.PublicKey]uint64, len(instance.voteStakeTotals)) + for k, v := range instance.voteStakeTotals { + result[k] = v + } + return result +} + +// VoteStakeTotalsRef returns a direct reference to the vote stake totals map. +// Callers must NOT mutate the returned map. +func VoteStakeTotalsRef() map[solana.PublicKey]uint64 { + instance.voteStakeTotalsMu.RLock() + defer instance.voteStakeTotalsMu.RUnlock() + return instance.voteStakeTotals +} + func PutEpochStakesEntry(epoch uint64, pubkey solana.PublicKey, stake uint64, voteAcct *epochstakes.VoteAccount) { if instance.epochStakes == nil { instance.epochStakes = epochstakes.NewEpochStakesCache() @@ -487,3 +551,79 @@ func SaveStakePubkeyIndex(accountsDbDir string) error { return nil } + +// StreamStakeAccounts iterates all stake accounts from the pubkey index, +// calling fn for each valid delegation. Returns count of processed accounts. +// This streams directly from AccountsDB without building a full stake cache. 
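// Standalone sketch of the Copy-vs-Ref accessor pair introduced for voteStakeTotals above:
// Copy hands back a snapshot the caller may mutate, Ref avoids the allocation for read-only
// callers. The guarded type here is a simplified stand-in for GlobalCtx.voteStakeTotals.
package sketch

import "sync"

type stakeTotals struct {
	mu sync.RWMutex
	m  map[string]uint64
}

func (s *stakeTotals) Set(m map[string]uint64) {
	s.mu.Lock()
	s.m = m
	s.mu.Unlock()
}

// Copy returns a snapshot the caller may freely mutate.
func (s *stakeTotals) Copy() map[string]uint64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if s.m == nil {
		return nil
	}
	out := make(map[string]uint64, len(s.m))
	for k, v := range s.m {
		out[k] = v
	}
	return out
}

// Ref returns the live map; callers must treat it as read-only.
func (s *stakeTotals) Ref() map[string]uint64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.m
}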
+func StreamStakeAccounts( + acctsDb *accountsdb.AccountsDb, + slot uint64, + fn func(pubkey solana.PublicKey, delegation *sealevel.Delegation, creditsObserved uint64), +) (int, error) { + // Load stake pubkeys from index file + acctsDbDir := filepath.Join(acctsDb.AcctsDir, "..") + stakePubkeys, err := LoadStakePubkeyIndex(acctsDbDir) + if err != nil { + return 0, fmt.Errorf("loading stake pubkey index: %w", err) + } + + var processedCount atomic.Int64 + var wg sync.WaitGroup + + // Batched worker pool for parallel processing + const batchSize = 1000 + workerPool, err := ants.NewPoolWithFunc(runtime.NumCPU()*2, func(i interface{}) { + defer wg.Done() + + batch := i.([]solana.PublicKey) + + for _, pk := range batch { + // Read stake account from AccountsDB + stakeAcct, err := acctsDb.GetAccount(slot, pk) + if err != nil { + continue // Account not found or closed + } + + stakeState, err := sealevel.UnmarshalStakeState(stakeAcct.Data) + if err != nil { + continue // Invalid stake state + } + + // Only process delegated stake accounts (status must be "Stake") + if stakeState.Status != sealevel.StakeStateV2StatusStake { + continue + } + + // Call the callback with delegation data + delegation := &stakeState.Stake.Stake.Delegation + creditsObserved := stakeState.Stake.Stake.CreditsObserved + fn(pk, delegation, creditsObserved) + processedCount.Add(1) + } + }) + if err != nil { + return 0, fmt.Errorf("creating worker pool: %w", err) + } + defer workerPool.Release() + + // Submit stake pubkeys in batches + var invokeErr error + for i := 0; i < len(stakePubkeys); i += batchSize { + end := min(i+batchSize, len(stakePubkeys)) + wg.Add(1) + if err := workerPool.Invoke(stakePubkeys[i:end]); err != nil { + wg.Done() // balance the Add since worker won't run + invokeErr = fmt.Errorf("worker pool invoke failed: %w", err) + break // stop submitting new batches, but wait for queued ones + } + } + + // Always wait for all queued workers to finish before returning + wg.Wait() + + if invokeErr != nil { + return 0, invokeErr + } + + return int(processedCount.Load()), nil +} diff --git a/pkg/replay/block.go b/pkg/replay/block.go index 93f64c22..a62d51d9 100644 --- a/pkg/replay/block.go +++ b/pkg/replay/block.go @@ -3,6 +3,7 @@ package replay import ( "context" "crypto/rand" + "encoding/base64" "encoding/hex" "encoding/json" "fmt" @@ -25,7 +26,6 @@ import ( "github.com/Overclock-Validator/mithril/pkg/base58" b "github.com/Overclock-Validator/mithril/pkg/block" "github.com/Overclock-Validator/mithril/pkg/blockstream" - "github.com/Overclock-Validator/mithril/pkg/epochstakes" "github.com/Overclock-Validator/mithril/pkg/features" "github.com/Overclock-Validator/mithril/pkg/fees" "github.com/Overclock-Validator/mithril/pkg/global" @@ -37,7 +37,7 @@ import ( "github.com/Overclock-Validator/mithril/pkg/rpcclient" "github.com/Overclock-Validator/mithril/pkg/rpcserver" "github.com/Overclock-Validator/mithril/pkg/sealevel" - "github.com/Overclock-Validator/mithril/pkg/snapshot" + "github.com/Overclock-Validator/mithril/pkg/state" "github.com/Overclock-Validator/mithril/pkg/statsd" bin "github.com/gagliardetto/binary" "github.com/gagliardetto/solana-go" @@ -682,8 +682,9 @@ func scanAndEnableFeatures(acctsDb *accountsdb.AccountsDb, slot uint64, startOfE // 3. Extract delegation fields (VoterPubkey, StakeLamports, epochs, etc.) from AccountsDB // // This ensures the stake cache reflects the actual on-chain state, not potentially outdated -// manifest data. 
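// Standalone sketch of the batched ants worker-pool pattern used by StreamStakeAccounts:
// submit slices of keys rather than individual keys to cut wg.Add/Invoke overhead, count
// completed work with an atomic, and balance the WaitGroup if Invoke fails so wg.Wait cannot
// hang. loadAccount is a stand-in for the AccountsDB read done in the real code.
package sketch

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"

	"github.com/panjf2000/ants/v2"
)

func streamKeys(keys []string, loadAccount func(string) error) (int, error) {
	const batchSize = 1000
	var processed atomic.Int64
	var wg sync.WaitGroup

	pool, err := ants.NewPoolWithFunc(runtime.NumCPU()*2, func(i interface{}) {
		defer wg.Done()
		for _, k := range i.([]string) {
			if loadAccount(k) == nil {
				processed.Add(1)
			}
		}
	})
	if err != nil {
		return 0, fmt.Errorf("creating worker pool: %w", err)
	}
	defer pool.Release()

	var invokeErr error
	for i := 0; i < len(keys); i += batchSize {
		end := i + batchSize
		if end > len(keys) {
			end = len(keys)
		}
		wg.Add(1)
		if err := pool.Invoke(keys[i:end]); err != nil {
			wg.Done() // balance the Add since this batch never runs
			invokeErr = fmt.Errorf("invoke: %w", err)
			break
		}
	}
	wg.Wait() // always drain queued batches before returning
	if invokeErr != nil {
		return 0, invokeErr
	}
	return int(processed.Load()), nil
}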
Fatal error if index file is missing - indicates corrupt/incomplete AccountsDB. -func setupInitialVoteAcctsAndStakeAccts(acctsDb *accountsdb.AccountsDb, block *b.Block, snapshotManifest *snapshot.SnapshotManifest) { +// data. Fatal error if index file is missing - indicates corrupt/incomplete AccountsDB. +// NO manifest parameter - derives everything from AccountsDB. +func setupInitialVoteAcctsAndStakeAccts(acctsDb *accountsdb.AccountsDb, block *b.Block) { block.VoteTimestamps = make(map[solana.PublicKey]sealevel.BlockTimestamp) block.EpochStakesPerVoteAcct = make(map[solana.PublicKey]uint64) @@ -707,31 +708,28 @@ func setupInitialVoteAcctsAndStakeAccts(acctsDb *accountsdb.AccountsDb, block *b mlog.Log.Errorf("=======================================================") os.Exit(1) } - mlog.Log.Infof("Loading vote and stake caches") + mlog.Log.Infof("Loading vote and stake caches (aggregate-only mode)") var wg sync.WaitGroup - voteAcctWorkerPool, _ := ants.NewPoolWithFunc(1024, func(i interface{}) { - defer wg.Done() - pk := i.(solana.PublicKey) - voteAcct, err := acctsDb.GetAccount(block.Slot, pk) - if err == nil { - versionedVoteState, err := sealevel.UnmarshalVersionedVoteState(voteAcct.Data) - if err == nil { - global.PutVoteCacheItem(pk, versionedVoteState) - } - } - }) + // Shared aggregated stake totals - built directly from AccountsDB scan + // Thread-safe: each worker builds local map, then merges under mutex + voteAcctStakes := make(map[solana.PublicKey]uint64) + var voteAcctStakesMu sync.Mutex - // Stake worker pool reads ALL delegation fields from AccountsDB (not manifest) - // Uses batched processing to reduce wg.Add/Invoke overhead (1M pubkeys → ~1K batches) + // Stake worker pool reads stake accounts and aggregates totals directly + // Does NOT populate global.StakeCache - only builds vote account totals const stakeBatchSize = 1000 stakeAcctWorkerPool, _ := ants.NewPoolWithFunc(runtime.NumCPU()*2, func(i interface{}) { defer wg.Done() batch := i.([]solana.PublicKey) + + // Build local aggregation for this batch + localStakes := make(map[solana.PublicKey]uint64) + for _, pk := range batch { - // Read from AccountsDB - ALL fields, not manifest + // Read from AccountsDB stakeAcct, err := acctsDb.GetAccount(block.Slot, pk) if err != nil { continue // Account not found or closed @@ -742,39 +740,23 @@ func setupInitialVoteAcctsAndStakeAccts(acctsDb *accountsdb.AccountsDb, block *b continue // Invalid stake state } - // Only cache if this is a delegated stake account (status must be "Stake") + // Only count delegated stake accounts (status must be "Stake") if stakeState.Status != sealevel.StakeStateV2StatusStake { continue } - // Use delegation from AccountsDB, not manifest - // Use Bulk variant for startup loading - doesn't track as "new" for index append + // Aggregate stake by vote account delegation := stakeState.Stake.Stake.Delegation - global.PutStakeCacheItemBulk(pk, - &sealevel.Delegation{ - VoterPubkey: delegation.VoterPubkey, - StakeLamports: delegation.StakeLamports, - ActivationEpoch: delegation.ActivationEpoch, - DeactivationEpoch: delegation.DeactivationEpoch, - WarmupCooldownRate: delegation.WarmupCooldownRate, - CreditsObserved: stakeState.Stake.Stake.CreditsObserved, - }) + localStakes[delegation.VoterPubkey] += delegation.StakeLamports } - }) - wg.Add(1) - go func() { - defer wg.Done() - for _, va := range snapshotManifest.Bank.Stakes.VoteAccounts { - ts := sealevel.BlockTimestamp{Slot: va.Value.LastTimestampSlot, Timestamp: va.Value.LastTimestampTs} - 
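// Standalone sketch of the per-batch aggregation used in setupInitialVoteAcctsAndStakeAccts:
// each worker accumulates into a private map and takes the shared lock once per batch, not
// once per stake account. The delegation type here is a simplified stand-in.
package sketch

import "sync"

type delegation struct {
	Voter string
	Stake uint64
}

func aggregateBatches(batches [][]delegation) map[string]uint64 {
	totals := make(map[string]uint64)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for _, batch := range batches {
		wg.Add(1)
		go func(batch []delegation) {
			defer wg.Done()
			local := make(map[string]uint64) // no locking needed while building
			for _, d := range batch {
				local[d.Voter] += d.Stake
			}
			mu.Lock() // one lock acquisition per batch
			for voter, stake := range local {
				totals[voter] += stake
			}
			mu.Unlock()
		}(batch)
	}
	wg.Wait()
	return totals
}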
block.VoteTimestamps[va.Key] = ts - block.EpochStakesPerVoteAcct[va.Key] = va.Stake - block.TotalEpochStake += va.Stake - - wg.Add(1) - voteAcctWorkerPool.Invoke(va.Key) + // Merge local aggregation into shared map under lock + voteAcctStakesMu.Lock() + for voter, stake := range localStakes { + voteAcctStakes[voter] += stake } - }() + voteAcctStakesMu.Unlock() + }) // Submit stake pubkeys in batches (reduces wg.Add/Invoke calls from ~1M to ~1K) numBatches := (len(stakePubkeys) + stakeBatchSize - 1) / stakeBatchSize @@ -786,36 +768,81 @@ func setupInitialVoteAcctsAndStakeAccts(acctsDb *accountsdb.AccountsDb, block *b wg.Wait() stakeAcctWorkerPool.Release() - voteAcctWorkerPool.Release() - // After both caches are loaded, ensure vote cache has ALL vote accounts - // referenced by stake cache (catches any vote accounts not in manifest) - voteAcctStakes := make(map[solana.PublicKey]uint64) - for _, delegation := range global.StakeCache() { - voteAcctStakes[delegation.VoterPubkey] += delegation.StakeLamports - } + // Store aggregated totals in global for later use + global.SetVoteStakeTotals(voteAcctStakes) + + // Load vote accounts from AccountsDB into vote cache if err := RebuildVoteCacheFromAccountsDB(acctsDb, block.Slot, voteAcctStakes, 0); err != nil { mlog.Log.Warnf("vote cache rebuild had errors: %v", err) } + + // Derive EpochStakesPerVoteAcct and TotalEpochStake from aggregated totals + for pk, stake := range voteAcctStakes { + block.EpochStakesPerVoteAcct[pk] = stake + block.TotalEpochStake += stake + } + + // Derive VoteTimestamps from ALL vote accounts in cache (including zero-stake) + // This matches original manifest behavior where all vote accounts had timestamps populated + for pk, voteState := range global.VoteCache() { + if voteState != nil { + ts := voteState.LastTimestamp() + if ts != nil { + block.VoteTimestamps[pk] = *ts + } + } + } } func configureInitialBlock(acctsDb *accountsdb.AccountsDb, block *b.Block, - snapshotManifest *snapshot.SnapshotManifest, + mithrilState *state.MithrilState, epochCtx *ReplayCtx, epochSchedule *sealevel.SysvarEpochSchedule, rpcClient *rpcclient.RpcClient, auxBackupEndpoints []string) error { - block.ParentBankhash = snapshotManifest.Bank.Hash - block.ParentSlot = snapshotManifest.Bank.Slot - block.AcctsLtHash = snapshotManifest.LtHash + // Read from state file manifest_* fields (required) + if mithrilState.ManifestParentBankhash == "" { + return fmt.Errorf("state file missing manifest_parent_bankhash - delete AccountsDB and rebuild from snapshot") + } + + parentBankhash, err := base58.DecodeFromString(mithrilState.ManifestParentBankhash) + if err != nil { + return fmt.Errorf("corrupted state file: failed to decode manifest_parent_bankhash: %w", err) + } + block.ParentBankhash = parentBankhash + block.ParentSlot = mithrilState.ManifestParentSlot + + // LtHash: decode base64, restore with InitWithHash + if mithrilState.ManifestAcctsLtHash != "" { + ltHashBytes, err := base64.StdEncoding.DecodeString(mithrilState.ManifestAcctsLtHash) + if err != nil { + return fmt.Errorf("corrupted state file: failed to decode manifest_accts_lt_hash: %w", err) + } + block.AcctsLtHash = new(lthash.LtHash).InitWithHash(ltHashBytes) + } + + block.PrevFeeRateGovernor = reconstructFeeRateGovernor(mithrilState) + if block.PrevFeeRateGovernor == nil { + return fmt.Errorf("state file missing manifest_fee_rate_governor - delete AccountsDB and rebuild from snapshot") + } + block.PrevNumSignatures = mithrilState.ManifestSignatureCount + 
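// Standalone sketch of the fail-fast decoding done in configureInitialBlock above: required
// seed fields produce a "rebuild from snapshot" style error when empty and a "corrupted state
// file" error when they fail to decode; optional fields are restored only when present. The
// real code decodes base58 through the project's own helper; stdlib base64 is used here as a
// stand-in.
package sketch

import (
	"encoding/base64"
	"fmt"
)

type seed struct {
	ParentBankhash string // required
	AcctsLtHash    string // optional (older snapshots may lack it)
}

func decodeSeed(s seed) (bankhash, ltHash []byte, err error) {
	if s.ParentBankhash == "" {
		return nil, nil, fmt.Errorf("state file missing parent bankhash - rebuild from snapshot")
	}
	bankhash, err = base64.StdEncoding.DecodeString(s.ParentBankhash)
	if err != nil {
		return nil, nil, fmt.Errorf("corrupted state file: parent bankhash: %w", err)
	}
	if s.AcctsLtHash != "" {
		ltHash, err = base64.StdEncoding.DecodeString(s.AcctsLtHash)
		if err != nil {
			return nil, nil, fmt.Errorf("corrupted state file: accounts lt hash: %w", err)
		}
	}
	return bankhash, ltHash, nil
}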
block.InitialPreviousLamportsPerSignature = mithrilState.ManifestLamportsPerSignature + + if mithrilState.ManifestEvictedBlockhash == "" { + return fmt.Errorf("state file missing manifest_evicted_blockhash - delete AccountsDB and rebuild from snapshot") + } + evictedHash, err := base58.DecodeFromString(mithrilState.ManifestEvictedBlockhash) + if err != nil { + return fmt.Errorf("corrupted state file: failed to decode manifest_evicted_blockhash: %w", err) + } + block.LatestEvictedBlockhash = evictedHash + block.EpochAcctsHash = epochCtx.EpochAcctsHash - block.PrevFeeRateGovernor = &snapshotManifest.Bank.FeeRateGovernor - block.PrevNumSignatures = snapshotManifest.Bank.SignatureCount - block.InitialPreviousLamportsPerSignature = snapshotManifest.LamportsPerSignature - setupInitialVoteAcctsAndStakeAccts(acctsDb, block, snapshotManifest) + setupInitialVoteAcctsAndStakeAccts(acctsDb, block) configureGlobalCtx(block) if global.ManageLeaderSchedule() { @@ -835,18 +862,25 @@ func configureInitialBlock(acctsDb *accountsdb.AccountsDb, block.BlockHeight = global.BlockHeight() } - // we use the RecentBlockhashes sysvar to determine whether a tx has a blockhash of acceptable - // age, but due to how Agave's BlockhashQueue is implemented, the latest 151 blockhashes - // are valid, rather than 150. we therefore need the last blockhash that was evicted from - // the RecentBlockhashes sysvar, and that's what the code below does. - ages := snapshotManifest.Bank.BlockhashQueue.HashAndAge - sort.Slice(ages, func(i, j int) bool { return ages[i].Val.HashIndex > ages[j].Val.HashIndex }) - block.LatestEvictedBlockhash = ages[150].Key - - snapshotManifest = nil return nil } +// reconstructFeeRateGovernor creates a FeeRateGovernor from state file manifest_* fields +func reconstructFeeRateGovernor(s *state.MithrilState) *sealevel.FeeRateGovernor { + if s.ManifestFeeRateGovernor == nil { + return nil + } + return &sealevel.FeeRateGovernor{ + TargetLamportsPerSignature: s.ManifestFeeRateGovernor.TargetLamportsPerSignature, + TargetSignaturesPerSlot: s.ManifestFeeRateGovernor.TargetSignaturesPerSlot, + MinLamportsPerSignature: s.ManifestFeeRateGovernor.MinLamportsPerSignature, + MaxLamportsPerSignature: s.ManifestFeeRateGovernor.MaxLamportsPerSignature, + BurnPercent: s.ManifestFeeRateGovernor.BurnPercent, + LamportsPerSignature: s.ManifestLamportsPerSignature, + PrevLamportsPerSignature: s.ManifestLamportsPerSignature, // Initial = current for fresh start + } +} + func configureBlock(block *b.Block, epochCtx *ReplayCtx, lastSlotCtx *sealevel.SlotCtx, @@ -923,7 +957,7 @@ func ensureStakeHistorySysvarCached(acctsDb *accountsdb.AccountsDb, slot uint64) func configureInitialBlockFromResume(acctsDb *accountsdb.AccountsDb, block *b.Block, resumeState *ResumeState, - snapshotManifest *snapshot.SnapshotManifest, // Still needed for static FeeRateGovernor fields + mithrilState *state.MithrilState, epochCtx *ReplayCtx, epochSchedule *sealevel.SysvarEpochSchedule, rpcClient *rpcclient.RpcClient, @@ -935,8 +969,11 @@ func configureInitialBlockFromResume(acctsDb *accountsdb.AccountsDb, block.AcctsLtHash = resumeState.AcctsLtHash block.EpochAcctsHash = epochCtx.EpochAcctsHash - // Reconstruct PrevFeeRateGovernor from manifest static fields + resume dynamic fields - prevFeeRateGovernor := snapshotManifest.Bank.FeeRateGovernor.Clone() + // Reconstruct PrevFeeRateGovernor from state file static fields + resume dynamic fields + prevFeeRateGovernor := reconstructFeeRateGovernor(mithrilState) + if prevFeeRateGovernor == nil { + return 
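// Standalone sketch of how the fee-rate-governor seed is rebuilt above: static limits come
// from persisted manifest fields, the dynamic lamports-per-signature pair starts out equal on
// a fresh start, and a resume overlays that pair with the values saved at shutdown. The
// feeGovernor type and field names are simplified stand-ins for sealevel.FeeRateGovernor.
package sketch

type feeGovernor struct {
	TargetLamportsPerSignature uint64
	MinLamportsPerSignature    uint64
	MaxLamportsPerSignature    uint64
	BurnPercent                uint8
	LamportsPerSignature       uint64
	PrevLamportsPerSignature   uint64
}

type persistedSeed struct {
	Target, Min, Max     uint64
	Burn                 uint8
	LamportsPerSignature uint64
}

type resumeValues struct {
	LamportsPerSignature     uint64
	PrevLamportsPerSignature uint64
}

// rebuild restores the governor from the seed; a fresh start uses the snapshot value for both
// the current and previous lamports-per-signature, while a resume overlays the dynamic pair.
func rebuild(seed persistedSeed, resume *resumeValues) *feeGovernor {
	g := &feeGovernor{
		TargetLamportsPerSignature: seed.Target,
		MinLamportsPerSignature:    seed.Min,
		MaxLamportsPerSignature:    seed.Max,
		BurnPercent:                seed.Burn,
		LamportsPerSignature:       seed.LamportsPerSignature,
		PrevLamportsPerSignature:   seed.LamportsPerSignature,
	}
	if resume != nil {
		g.LamportsPerSignature = resume.LamportsPerSignature
		g.PrevLamportsPerSignature = resume.PrevLamportsPerSignature
	}
	return g
}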
fmt.Errorf("cannot resume: state file missing manifest_fee_rate_governor (rebuild AccountsDB required)") + } prevFeeRateGovernor.LamportsPerSignature = resumeState.LamportsPerSignature prevFeeRateGovernor.PrevLamportsPerSignature = resumeState.PrevLamportsPerSignature block.PrevFeeRateGovernor = prevFeeRateGovernor @@ -945,7 +982,7 @@ func configureInitialBlockFromResume(acctsDb *accountsdb.AccountsDb, // Load vote accounts and populate global caches - same as fresh start // This seeds both block.VoteAccts/VoteTimestamps AND global.VoteCache() from AccountsDB // Required because getTimestampEstimate reads from global.VoteCache() - setupInitialVoteAcctsAndStakeAccts(acctsDb, block, snapshotManifest) + setupInitialVoteAcctsAndStakeAccts(acctsDb, block) configureGlobalCtx(block) // On resume, epoch stakes will be loaded from the persisted state file (not manifest or AccountsDB). @@ -1022,35 +1059,42 @@ func configureGlobalCtx(block *b.Block) { global.SetBlockHeight(block.BlockHeight) } -// buildInitialEpochStakesCache seeds the epoch stakes cache from manifest. -// If persistedEpochs is non-nil, skips epochs already loaded from the state file. -func buildInitialEpochStakesCache(snapshotManifest *snapshot.SnapshotManifest, persistedEpochs map[uint64]bool) { - for _, epochStake := range snapshotManifest.VersionedEpochStakes { - // Skip epochs already loaded from persisted state file - if persistedEpochs != nil && persistedEpochs[epochStake.Epoch] { - mlog.Log.Debugf("skipping epoch %d stakes from manifest (already loaded from state file)", epochStake.Epoch) - continue - } +// buildInitialEpochStakesCache seeds the epoch stakes cache from state file or manifest. +// Priority: 1) State file ManifestEpochStakes, 2) Direct manifest (backwards compat) +func buildInitialEpochStakesCache(mithrilState *state.MithrilState) error { + // Require state file ManifestEpochStakes (PersistedEpochStakes JSON format) + if mithrilState == nil || len(mithrilState.ManifestEpochStakes) == 0 { + return fmt.Errorf("state file missing manifest_epoch_stakes - delete AccountsDB and rebuild from snapshot") + } - if epochStake.Epoch == snapshotManifest.Bank.Epoch { - for _, entry := range epochStake.Val.EpochAuthorizedVoters { - global.PutEpochAuthorizedVoter(entry.Key, entry.Val) - } + for epoch, data := range mithrilState.ManifestEpochStakes { + if loadedEpoch, err := global.DeserializeAndLoadEpochStakes([]byte(data)); err != nil { + return fmt.Errorf("failed to load manifest epoch %d stakes from state file: %w", epoch, err) + } else { + mlog.Log.Debugf("loaded epoch %d stakes from state file manifest_epoch_stakes", loadedEpoch) } + } - global.PutEpochTotalStake(epochStake.Epoch, epochStake.Val.TotalStake) - for _, entry := range epochStake.Val.Stakes.VoteAccounts { - voteAcct := &epochstakes.VoteAccount{Lamports: entry.Value.Lamports, - NodePubkey: entry.Value.NodePubkey, - LastTimestampTs: entry.Value.LastTimestampTs, - LastTimestampSlot: entry.Value.LastTimestampSlot, - Owner: entry.Value.Owner, - Executable: entry.Value.Executable, - RentEpoch: entry.Value.RentEpoch} - global.PutEpochStakesEntry(epochStake.Epoch, entry.Key, entry.Stake, voteAcct) + // Load EpochAuthorizedVoters from state file (required) + // Supports multiple authorized voters per vote account (matches original manifest behavior) + if len(mithrilState.ManifestEpochAuthorizedVoters) == 0 { + return fmt.Errorf("state file missing manifest_epoch_authorized_voters - delete AccountsDB and rebuild from snapshot") + } + for voteAcctStr, authorizedVoterStrs 
:= range mithrilState.ManifestEpochAuthorizedVoters { + voteAcct, err := base58.DecodeFromString(voteAcctStr) + if err != nil { + return fmt.Errorf("corrupted state file: failed to decode epoch_authorized_voters key %s: %w", voteAcctStr, err) + } + for _, authorizedVoterStr := range authorizedVoterStrs { + authorizedVoter, err := base58.DecodeFromString(authorizedVoterStr) + if err != nil { + return fmt.Errorf("corrupted state file: failed to decode epoch_authorized_voters value %s: %w", authorizedVoterStr, err) + } + global.PutEpochAuthorizedVoter(voteAcct, authorizedVoter) } - mlog.Log.Debugf("loaded epoch %d stakes from manifest", epochStake.Epoch) } + + return nil } type persistedTracker struct { @@ -1076,20 +1120,12 @@ func (t *persistedTracker) Get() (uint64, []byte) { return slot, out } -func nilifySnapshotManifest(manifest *snapshot.SnapshotManifest) { - manifest.Bank = nil - manifest.AccountsDb = nil - manifest.BankIncrementalSnapshotPersistence = nil - manifest.VersionedEpochStakes = nil - manifest.LtHash = nil -} - func ReplayBlocks( ctx context.Context, acctsDb *accountsdb.AccountsDb, acctsDbPath string, - snapshotManifest *snapshot.SnapshotManifest, - resumeState *ResumeState, // nil if not resuming, contains parent slot info when resuming + mithrilState *state.MithrilState, // State file with manifest_* seed fields + resumeState *ResumeState, // nil if not resuming, contains parent slot info when resuming startSlot, endSlot uint64, rpcEndpoints []string, // RPC endpoints in priority order (first = primary, rest = fallbacks) blockDir string, @@ -1147,16 +1183,21 @@ func ReplayBlocks( var featuresActivatedInFirstSlot []*accounts.Account var parentFeaturesActivatedInFirstSlot []*accounts.Account - // Pass resumeState if resuming, so ReplayCtx uses fresh values instead of stale manifest - replayCtx := newReplayCtx(snapshotManifest, resumeState) + // Pass mithrilState + resumeState so ReplayCtx uses state file for seed data + replayCtx, err := newReplayCtx(mithrilState, resumeState) + if err != nil { + result.Error = err + return result + } - global.IncrTransactionCount(snapshotManifest.Bank.TransactionCount) + // Use state file for transaction count (required) + global.IncrTransactionCount(mithrilState.ManifestTransactionCount) isFirstSlotInEpoch := epochSchedule.FirstSlotInEpoch(currentEpoch) == startSlot replayCtx.CurrentFeatures, featuresActivatedInFirstSlot, parentFeaturesActivatedInFirstSlot = scanAndEnableFeatures(acctsDb, startSlot, isFirstSlotInEpoch) partitionedEpochRewardsEnabled = replayCtx.CurrentFeatures.IsActive(features.EnablePartitionedEpochReward) || replayCtx.CurrentFeatures.IsActive(features.EnablePartitionedEpochRewardsSuperfeature) - // Load epoch stakes - persisted stakes on resume, manifest on fresh start - snapshotEpoch := epochSchedule.GetEpoch(snapshotManifest.Bank.Slot) + // Load epoch stakes - persisted stakes on resume, state file on fresh start + snapshotEpoch := epochSchedule.GetEpoch(mithrilState.ManifestParentSlot) if resumeState != nil { // Resume case - check if we've crossed epoch boundaries since snapshot epochsCrossed := currentEpoch > snapshotEpoch @@ -1187,12 +1228,18 @@ func ReplayBlocks( return result } } else { - // Resume in same epoch as snapshot, no boundaries crossed - manifest is still valid - buildInitialEpochStakesCache(snapshotManifest, nil) + // Resume in same epoch as snapshot, no boundaries crossed - state file epoch stakes still valid + if err := buildInitialEpochStakesCache(mithrilState); err != nil { + result.Error = err + 
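// Standalone sketch of the state-file epoch-stakes load performed by buildInitialEpochStakesCache:
// each epoch maps to an opaque serialized blob, and authorized voters map a vote account to one
// or more voter keys, with hard errors when either input is missing. loadEpoch and decodeKey are
// stand-ins for the project's DeserializeAndLoadEpochStakes and base58 decoding.
package sketch

import "fmt"

func loadSeedStakes(
	epochBlobs map[uint64]string,
	authorizedVoters map[string][]string,
	loadEpoch func([]byte) (uint64, error),
	decodeKey func(string) ([]byte, error),
	putVoter func(voteAcct, voter []byte),
) error {
	if len(epochBlobs) == 0 {
		return fmt.Errorf("state file missing epoch stakes - rebuild from snapshot")
	}
	for epoch, blob := range epochBlobs {
		if _, err := loadEpoch([]byte(blob)); err != nil {
			return fmt.Errorf("failed to load epoch %d stakes: %w", epoch, err)
		}
	}
	if len(authorizedVoters) == 0 {
		return fmt.Errorf("state file missing epoch authorized voters - rebuild from snapshot")
	}
	for acct, voters := range authorizedVoters {
		acctKey, err := decodeKey(acct)
		if err != nil {
			return fmt.Errorf("corrupted state file: vote account %s: %w", acct, err)
		}
		for _, v := range voters {
			voterKey, err := decodeKey(v)
			if err != nil {
				return fmt.Errorf("corrupted state file: authorized voter %s: %w", v, err)
			}
			putVoter(acctKey, voterKey)
		}
	}
	return nil
}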
return result + } } } else { - // Fresh start: load all epochs from manifest - buildInitialEpochStakesCache(snapshotManifest, nil) + // Fresh start: load all epochs from state file + if err := buildInitialEpochStakesCache(mithrilState); err != nil { + result.Error = err + return result + } } //forkChoice, err := forkchoice.NewForkChoiceService(currentEpoch, global.EpochStakes(currentEpoch), global.EpochTotalStake(currentEpoch), global.EpochAuthorizedVoters(), 4) //forkChoice.Start() @@ -1337,15 +1384,12 @@ func ReplayBlocks( // the first emitted block might have slot > startSlot. if lastSlotCtx == nil { if resumeState != nil { - // RESUME: Use resume state + manifest (for static fields) - configErr = configureInitialBlockFromResume(acctsDb, block, resumeState, snapshotManifest, replayCtx, epochSchedule, rpcc, rpcBackups) + // RESUME: Use resume state + state file (for static fields) + configErr = configureInitialBlockFromResume(acctsDb, block, resumeState, mithrilState, replayCtx, epochSchedule, rpcc, rpcBackups) } else { - // FRESH START: Use snapshot manifest - configErr = configureInitialBlock(acctsDb, block, snapshotManifest, replayCtx, epochSchedule, rpcc, rpcBackups) + // FRESH START: Use state file manifest_* fields + configErr = configureInitialBlock(acctsDb, block, mithrilState, replayCtx, epochSchedule, rpcc, rpcBackups) } - // We're done with the SnapshotManifest object. Since these objects are quite large, we hint to the GC to free - // the object's contents by nil'ing the struct's members. - nilifySnapshotManifest(snapshotManifest) } else { configErr = configureBlock(block, replayCtx, lastSlotCtx, epochSchedule, rpcc, rpcBackups) } @@ -1452,6 +1496,13 @@ func ReplayBlocks( replayCtx.Capitalization -= lastSlotCtx.LamportsBurnt + // Clear ManifestEpochStakes after first replayed slot past snapshot + // This frees memory and ensures we don't use stale manifest data on restart + if block.Slot > mithrilState.SnapshotSlot && len(mithrilState.ManifestEpochStakes) > 0 { + mithrilState.ClearManifestEpochStakes() + mlog.Log.Debugf("cleared manifest_epoch_stakes after replaying past snapshot slot") + } + // Check for cancellation immediately after block completes. // This minimizes the window between bankhash persistence and state file update, // preventing false "corruption" detection on graceful shutdown. diff --git a/pkg/replay/epoch.go b/pkg/replay/epoch.go index c4dad0a5..394aef6a 100644 --- a/pkg/replay/epoch.go +++ b/pkg/replay/epoch.go @@ -2,9 +2,9 @@ package replay import ( "bytes" + "encoding/base64" "fmt" "maps" - "runtime" "sync" "sync/atomic" @@ -18,10 +18,9 @@ import ( "github.com/Overclock-Validator/mithril/pkg/rewards" "github.com/Overclock-Validator/mithril/pkg/rpcclient" "github.com/Overclock-Validator/mithril/pkg/sealevel" - "github.com/Overclock-Validator/mithril/pkg/snapshot" + "github.com/Overclock-Validator/mithril/pkg/state" bin "github.com/gagliardetto/binary" "github.com/gagliardetto/solana-go" - "github.com/panjf2000/ants/v2" ) type ReplayCtx struct { @@ -35,10 +34,10 @@ type ReplayCtx struct { // newReplayCtx creates a new ReplayCtx, preferring values from resumeState if available. // This ensures resume uses fresh values instead of potentially stale manifest data. 
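// Minimal sketch of the "drop the seed once it can no longer be needed" step added above:
// after the first slot past the snapshot slot is replayed, the serialized epoch stakes are
// cleared so they stop occupying memory and cannot be re-read as stale data on a restart.
// seedState and its fields are stand-ins for the real MithrilState.
package sketch

type seedState struct {
	SnapshotSlot        uint64
	ManifestEpochStakes map[uint64]string
}

func (s *seedState) maybeClear(replayedSlot uint64) bool {
	if replayedSlot > s.SnapshotSlot && len(s.ManifestEpochStakes) > 0 {
		s.ManifestEpochStakes = nil // later restarts resume from persisted epoch stakes instead
		return true
	}
	return false
}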
-func newReplayCtx(snapshotManifest *snapshot.SnapshotManifest, resumeState *ResumeState) *ReplayCtx { +func newReplayCtx(mithrilState *state.MithrilState, resumeState *ResumeState) (*ReplayCtx, error) { epochCtx := new(ReplayCtx) - // Prefer resume state if available (has non-zero capitalization) + // Priority 1: Resume state (has most recent values) if resumeState != nil && resumeState.Capitalization > 0 { epochCtx.Capitalization = resumeState.Capitalization epochCtx.SlotsPerYear = resumeState.SlotsPerYear @@ -49,19 +48,35 @@ func newReplayCtx(snapshotManifest *snapshot.SnapshotManifest, resumeState *Resu FoundationVal: resumeState.InflationFoundation, FoundationTerm: resumeState.InflationFoundationTerm, } + } else if mithrilState != nil && mithrilState.ManifestCapitalization > 0 { + // Priority 2: State file manifest_* fields (fresh start) + epochCtx.Capitalization = mithrilState.ManifestCapitalization + epochCtx.SlotsPerYear = mithrilState.ManifestSlotsPerYear + epochCtx.Inflation = rewards.Inflation{ + Initial: mithrilState.ManifestInflationInitial, + Terminal: mithrilState.ManifestInflationTerminal, + Taper: mithrilState.ManifestInflationTaper, + FoundationVal: mithrilState.ManifestInflationFoundation, + FoundationTerm: mithrilState.ManifestInflationFoundationTerm, + } } else { - // Fallback to manifest (fresh start) - epochCtx.Capitalization = snapshotManifest.Bank.Capitalization - epochCtx.Inflation = snapshotManifest.Bank.Inflation - epochCtx.SlotsPerYear = snapshotManifest.Bank.SlotsPerYear + return nil, fmt.Errorf("state file missing manifest_capitalization - delete AccountsDB and rebuild from snapshot") } - if snapshotManifest.EpochAccountHash != [32]byte{} { - epochCtx.HasEpochAcctsHash = true - epochCtx.EpochAcctsHash = snapshotManifest.EpochAccountHash[:] + // Epoch account hash from state file (required) + if mithrilState != nil && mithrilState.ManifestEpochAcctsHash != "" { + epochAcctsHash, err := base64.StdEncoding.DecodeString(mithrilState.ManifestEpochAcctsHash) + if err != nil { + return nil, fmt.Errorf("corrupted state file: failed to decode manifest_epoch_accts_hash: %w", err) + } + if len(epochAcctsHash) == 32 { + epochCtx.HasEpochAcctsHash = true + epochCtx.EpochAcctsHash = epochAcctsHash + } } + // Note: epoch account hash may be empty for snapshots before SIMD-0160 - return epochCtx + return epochCtx, nil } func updateStakeHistorySysvar(acctsDb *accountsdb.AccountsDb, block *block.Block, prevSlotCtx *sealevel.SlotCtx, targetEpoch uint64, epochSchedule *sealevel.SysvarEpochSchedule, f *features.Features) *sealevel.SysvarStakeHistory { @@ -80,35 +95,28 @@ func updateStakeHistorySysvar(acctsDb *accountsdb.AccountsDb, block *block.Block newRateActivationEpoch := newWarmupCooldownRateEpoch(epochSchedule, f) - var wg sync.WaitGroup var effective atomic.Uint64 var activating atomic.Uint64 var deactivating atomic.Uint64 - workerPool, _ := ants.NewPoolWithFunc(runtime.GOMAXPROCS(0)*8, func(i interface{}) { - defer wg.Done() + // Stream stakes from AccountsDB instead of iterating cache + // Note: callback is called from multiple goroutines (worker pool) + _, err = global.StreamStakeAccounts(acctsDb, prevSlotCtx.Slot, + func(pk solana.PublicKey, delegation *sealevel.Delegation, creditsObs uint64) { + if delegation.StakeLamports == 0 { + return + } - delegation := i.(*sealevel.Delegation) - if delegation.StakeLamports == 0 { - return - } - - stakeHistoryEntry := delegation.StakeActivatingAndDeactivating(targetEpoch, &stakeHistory, newRateActivationEpoch) + stakeHistoryEntry 
:= delegation.StakeActivatingAndDeactivating(targetEpoch, &stakeHistory, newRateActivationEpoch) - effective.Add(stakeHistoryEntry.Effective) - activating.Add(stakeHistoryEntry.Activating) - deactivating.Add(stakeHistoryEntry.Deactivating) - }) - - for _, delegation := range global.StakeCache() { - wg.Add(1) - workerPool.Invoke(delegation) + effective.Add(stakeHistoryEntry.Effective) + activating.Add(stakeHistoryEntry.Activating) + deactivating.Add(stakeHistoryEntry.Deactivating) + }) + if err != nil { + panic(fmt.Sprintf("error streaming stake accounts for stake history: %s", err)) } - wg.Wait() - workerPool.Release() - ants.Release() - var accumulatorStakeHistoryEntry sealevel.StakeHistoryEntry accumulatorStakeHistoryEntry.Activating = activating.Load() accumulatorStakeHistoryEntry.Effective = effective.Load() @@ -217,13 +225,41 @@ func (esb *epochStakesBuilder) TotalEpochStake() uint64 { } func updateEpochStakesAndRefreshVoteCache(leaderScheduleEpoch uint64, b *block.Block, epochSchedule *sealevel.SysvarEpochSchedule, f *features.Features, acctsDb *accountsdb.AccountsDb, slot uint64) { - stakes := global.StakeCache() newRateActivationEpoch := newWarmupCooldownRateEpoch(epochSchedule, f) - // Build vote pubkey set for vote cache refresh (only needs pubkeys, not effective stakes) + // Check if we need to compute epoch stakes (skip on resume) + hasEpochStakes := global.HasEpochStakes(leaderScheduleEpoch) + + // Build vote totals by streaming from AccountsDB + // Use per-batch local maps + mutex merge for thread-safety voteAcctStakes := make(map[solana.PublicKey]uint64) - for _, delegation := range stakes { - voteAcctStakes[delegation.VoterPubkey] += delegation.StakeLamports + var voteAcctStakesMu sync.Mutex + + // For epoch stakes calculation, we'll also track effective stakes + // We pre-compute these even if hasEpochStakes is true (minor overhead vs 2 passes) + effectiveStakes := make(map[solana.PublicKey]uint64) + var effectiveStakesMu sync.Mutex + var totalEffectiveStake atomic.Uint64 + + // Single streaming pass to build both raw totals and effective stakes + _, err := global.StreamStakeAccounts(acctsDb, slot, + func(pk solana.PublicKey, delegation *sealevel.Delegation, creditsObs uint64) { + // Accumulate raw stake totals for vote cache refresh + voteAcctStakesMu.Lock() + voteAcctStakes[delegation.VoterPubkey] += delegation.StakeLamports + voteAcctStakesMu.Unlock() + + // Compute effective stake for epoch stakes + effectiveStake := delegation.Stake(leaderScheduleEpoch, sealevel.SysvarCache.StakeHistory.Sysvar, newRateActivationEpoch) + if effectiveStake > 0 { + effectiveStakesMu.Lock() + effectiveStakes[delegation.VoterPubkey] += effectiveStake + effectiveStakesMu.Unlock() + totalEffectiveStake.Add(effectiveStake) + } + }) + if err != nil { + mlog.Log.Errorf("failed to stream stake accounts: %v", err) } // ALWAYS refresh vote cache from AccountsDB, even if HasEpochStakes is true @@ -232,35 +268,22 @@ func updateEpochStakesAndRefreshVoteCache(leaderScheduleEpoch uint64, b *block.B mlog.Log.Errorf("failed to rebuild vote cache at epoch boundary: %v", err) } - // Skip epoch stakes calculation if already cached (resume) - hasEpochStakes := global.HasEpochStakes(leaderScheduleEpoch) + // Skip epoch stakes storage if already cached (resume) if hasEpochStakes { mlog.Log.Infof("already had EpochStakes for epoch %d", leaderScheduleEpoch) return } + // Store epoch stakes computed during streaming voteCache := global.VoteCache() - esb := newEpochStakesBuilder(leaderScheduleEpoch, voteCache) - 
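// Standalone sketch of the atomic accumulation used when streaming stakes into the stake
// history sysvar above: the callback may run concurrently on worker goroutines, so the three
// running sums are plain atomic counters rather than fields guarded by a mutex.
// streamDelegations and entryFor are stand-ins for StreamStakeAccounts and
// StakeActivatingAndDeactivating.
package sketch

import "sync/atomic"

type historyEntry struct{ Effective, Activating, Deactivating uint64 }

func sumStakeHistory(
	streamDelegations func(fn func(stakeLamports uint64)) error,
	entryFor func(stakeLamports uint64) historyEntry,
) (historyEntry, error) {
	var effective, activating, deactivating atomic.Uint64
	err := streamDelegations(func(stakeLamports uint64) {
		if stakeLamports == 0 {
			return // zero-lamport delegations contribute nothing
		}
		e := entryFor(stakeLamports)
		effective.Add(e.Effective)
		activating.Add(e.Activating)
		deactivating.Add(e.Deactivating)
	})
	return historyEntry{
		Effective:    effective.Load(),
		Activating:   activating.Load(),
		Deactivating: deactivating.Load(),
	}, err
}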
var wg sync.WaitGroup - - workerPool, _ := ants.NewPoolWithFunc(runtime.GOMAXPROCS(0)*8, func(i interface{}) { - defer wg.Done() - - delegation := i.(*sealevel.Delegation) - _, exists := voteCache[delegation.VoterPubkey] + for votePk, stake := range effectiveStakes { + voteAcct, exists := voteCache[votePk] if exists { - effectiveStake := delegation.Stake(esb.epoch, sealevel.SysvarCache.StakeHistory.Sysvar, newRateActivationEpoch) - esb.AddStakeForVoteAcct(delegation.VoterPubkey, effectiveStake) + global.PutEpochStakesEntry(leaderScheduleEpoch, votePk, stake, &epochstakes.VoteAccount{NodePubkey: voteAcct.NodePubkey()}) } - }) - - for _, entry := range stakes { - wg.Add(1) - workerPool.Invoke(entry) } - wg.Wait() - esb.Finish() + global.PutEpochTotalStake(leaderScheduleEpoch, totalEffectiveStake.Load()) maps.Copy(b.EpochStakesPerVoteAcct, global.EpochStakes(leaderScheduleEpoch)) - b.TotalEpochStake = esb.TotalEpochStake() + b.TotalEpochStake = totalEffectiveStake.Load() } diff --git a/pkg/replay/rewards.go b/pkg/replay/rewards.go index 806d8aec..dbfb0538 100644 --- a/pkg/replay/rewards.go +++ b/pkg/replay/rewards.go @@ -166,22 +166,28 @@ func beginPartitionedEpochRewardsDistribution(acctsDb *accountsdb.AccountsDb, sl totalRewards := partitionedRewardsInfo.TotalStakingRewards newWarmupCooldownRateEpoch := newWarmupCooldownRateEpoch(epochSchedule, f) - var points wide.Uint128 - var pointsPerStakeAcct map[solana.PublicKey]*rewards.CalculatedStakePoints - stakeCacheSnapshot := global.StakeCacheSnapshot() voteCacheSnapshot := global.VoteCacheSnapshot() - pointsPerStakeAcct, points = rewards.CalculateStakePoints(acctsDb, slotCtx, slot, stakeHistory, newWarmupCooldownRateEpoch, stakeCacheSnapshot, voteCacheSnapshot) - pointValue := rewards.PointValue{Rewards: totalRewards, Points: points} + // Use streaming rewards calculation - no full stake cache needed + pointValue := rewards.PointValue{Rewards: totalRewards, Points: wide.Uint128{}} + streamingResult, err := rewards.CalculateRewardsStreaming( + acctsDb, slot, stakeHistory, newWarmupCooldownRateEpoch, + voteCacheSnapshot, pointValue, epoch-1, slotCtx.Blockhash, slotCtx, f) + if err != nil { + panic(fmt.Sprintf("streaming rewards calculation failed: %s", err)) + } + + // Store spool info for distribution phase + partitionedRewardsInfo.SpoolDir = streamingResult.SpoolDir + partitionedRewardsInfo.SpoolSlot = streamingResult.SpoolSlot + partitionedRewardsInfo.NumRewardPartitionsRemaining = streamingResult.NumPartitions - var validatorRewards map[solana.PublicKey]*atomic.Uint64 - partitionedRewardsInfo.StakingRewards, validatorRewards, partitionedRewardsInfo.RewardPartitions = rewards.CalculateStakeRewardsAndPartitions(pointsPerStakeAcct, slotCtx, stakeHistory, slot, epoch-1, pointValue, newWarmupCooldownRateEpoch, slotCtx.Features, stakeCacheSnapshot, voteCacheSnapshot) - updatedAccts, parentUpdatedAccts, voteRewardsDistributed := rewards.DistributeVotingRewards(acctsDb, validatorRewards, slot) - partitionedRewardsInfo.NumRewardPartitionsRemaining = partitionedRewardsInfo.RewardPartitions.NumPartitions() + // Distribute voting rewards + updatedAccts, parentUpdatedAccts, voteRewardsDistributed := rewards.DistributeVotingRewards(acctsDb, streamingResult.ValidatorRewards, slot) newEpochRewards := sealevel.SysvarEpochRewards{DistributionStartingBlockHeight: block.BlockHeight + 1, - NumPartitions: partitionedRewardsInfo.NumRewardPartitionsRemaining, ParentBlockhash: block.LastBlockhash, - TotalRewards: totalRewards, DistributedRewards: 
voteRewardsDistributed, TotalPoints: points, Active: true} + NumPartitions: streamingResult.NumPartitions, ParentBlockhash: block.LastBlockhash, + TotalRewards: totalRewards, DistributedRewards: voteRewardsDistributed, TotalPoints: streamingResult.TotalPoints, Active: true} epochRewardsAcct, err := acctsDb.GetAccount(slot, sealevel.SysvarEpochRewardsAddr) if err != nil { @@ -204,6 +210,9 @@ func beginPartitionedEpochRewardsDistribution(acctsDb *accountsdb.AccountsDb, sl updatedAccts = append(updatedAccts, epochRewardsAcct.Clone()) epochCtx.Capitalization += voteRewardsDistributed + mlog.Log.Infof("streaming rewards: %d stake accounts, %d partitions (per-partition spool files in %s)", + streamingResult.NumStakeRewards, streamingResult.NumPartitions, streamingResult.SpoolDir) + return partitionedRewardsInfo, updatedAccts, parentUpdatedAccts } @@ -219,16 +228,16 @@ func distributePartitionedEpochRewardsForSlot(acctsDb *accountsdb.AccountsDb, ep partitionIdx := currentBlockHeight - epochRewards.DistributionStartingBlockHeight - // Initialize shared worker pool on first partition (reused across all 243 partitions) - if partitionedEpochRewardsInfo.WorkerPool == nil { - if err := partitionedEpochRewardsInfo.InitWorkerPool(); err != nil { - panic(fmt.Sprintf("unable to initialize reward distribution worker pool: %s", err)) - } - } - // Set flag to prevent stake account cache pollution during one-shot reward reads/writes acctsDb.InRewardsWindow.Store(true) - distributedAccts, parentDistributedAccts, distributedLamports := rewards.DistributeStakingRewardsForPartition(acctsDb, partitionedEpochRewardsInfo.RewardPartitions.Partition(partitionIdx), partitionedEpochRewardsInfo.StakingRewards, currentSlot, partitionedEpochRewardsInfo.WorkerPool) + + // Use spool-based distribution (sequential I/O from per-partition file) + distributedAccts, parentDistributedAccts, distributedLamports, err := rewards.DistributeStakingRewardsFromSpool( + acctsDb, partitionedEpochRewardsInfo.SpoolDir, partitionedEpochRewardsInfo.SpoolSlot, uint32(partitionIdx), currentSlot) + if err != nil { + panic(fmt.Sprintf("spool distribution failed for partition %d: %s", partitionIdx, err)) + } + parentDistributedAccts = append(parentDistributedAccts, epochRewardsAcct.Clone()) epochRewards.Distribute(distributedLamports) @@ -237,7 +246,11 @@ func distributePartitionedEpochRewardsForSlot(acctsDb *accountsdb.AccountsDb, ep if partitionedEpochRewardsInfo.NumRewardPartitionsRemaining == 0 { epochRewards.Active = false acctsDb.InRewardsWindow.Store(false) - partitionedEpochRewardsInfo.ReleaseWorkerPool() + // Clean up all per-partition spool files when done + rewards.CleanupPartitionedSpoolFiles( + partitionedEpochRewardsInfo.SpoolDir, + partitionedEpochRewardsInfo.SpoolSlot, + epochRewards.NumPartitions) } writer := new(bytes.Buffer) diff --git a/pkg/replay/transaction.go b/pkg/replay/transaction.go index 533bcfd5..5d18a33e 100644 --- a/pkg/replay/transaction.go +++ b/pkg/replay/transaction.go @@ -190,14 +190,16 @@ func recordStakeDelegation(acct *accounts.Account) { } if isEmpty || isUninitialized { - global.DeleteStakeCacheItem(acct.Key) - } else { - stakeState, err := sealevel.UnmarshalStakeState(acct.Data) - if err == nil { - delegation := stakeState.Stake.Stake.Delegation - delegation.CreditsObserved = stakeState.Stake.Stake.CreditsObserved - global.PutStakeCacheItem(acct.Key, &delegation) - } + // Account closed or uninitialized - no need to track + // (stake cache is not used with streaming rewards) + return + } + + // Only track 
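// Standalone sketch of the per-partition spool idea used above: rewards are written to one
// file per partition at calculation time and read back sequentially at distribution time,
// where the partition for a given block is simply
// currentBlockHeight - DistributionStartingBlockHeight. The record layout below
// (pubkey + lamports + credits) is a guess for illustration only, not the patch's actual
// SpoolRecord format.
package sketch

import (
	"encoding/binary"
	"fmt"
	"io"
	"os"
	"path/filepath"
)

type spoolRecord struct {
	Pubkey   [32]byte
	Lamports uint64
	Credits  uint64
}

func spoolPath(dir string, slot uint64, partition uint32) string {
	return filepath.Join(dir, fmt.Sprintf("rewards-%d-part-%d.spool", slot, partition))
}

func writeSpool(dir string, slot uint64, partition uint32, recs []spoolRecord) error {
	f, err := os.Create(spoolPath(dir, slot, partition))
	if err != nil {
		return err
	}
	defer f.Close()
	return binary.Write(f, binary.LittleEndian, recs) // fixed-size records round-trip cleanly
}

// readSpool streams records back in write order (sequential I/O).
func readSpool(dir string, slot uint64, partition uint32, fn func(spoolRecord)) error {
	f, err := os.Open(spoolPath(dir, slot, partition))
	if err != nil {
		return err
	}
	defer f.Close()
	for {
		var rec spoolRecord
		if err := binary.Read(f, binary.LittleEndian, &rec); err == io.EOF {
			return nil
		} else if err != nil {
			return err
		}
		fn(rec)
	}
}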
new stake pubkeys for index append + // We no longer populate the full stake cache since streaming reads from AccountsDB directly + stakeState, err := sealevel.UnmarshalStakeState(acct.Data) + if err == nil && stakeState.Status == sealevel.StakeStateV2StatusStake { + global.TrackNewStakePubkey(acct.Key) } } diff --git a/pkg/rewards/partitions.go b/pkg/rewards/partitions.go deleted file mode 100644 index ad1e6fc0..00000000 --- a/pkg/rewards/partitions.go +++ /dev/null @@ -1,46 +0,0 @@ -package rewards - -import ( - "sync" - - "github.com/gagliardetto/solana-go" -) - -type Partition struct { - pubkeys []solana.PublicKey - mu sync.Mutex - partitionIdx uint64 -} - -type Partitions []*Partition - -func NewPartitions(numPartitions uint64) Partitions { - p := make(Partitions, numPartitions) - for i := uint64(0); i < numPartitions; i++ { - p[i] = &Partition{pubkeys: make([]solana.PublicKey, 0, 2000), partitionIdx: i} - } - return p -} - -func (partitions Partitions) AddPubkey(partitionIdx uint64, pk solana.PublicKey) { - prt := partitions[partitionIdx] - prt.mu.Lock() - prt.pubkeys = append(prt.pubkeys, pk) - prt.mu.Unlock() -} - -func (partitions Partitions) Partition(partitionIdx uint64) *Partition { - return partitions[partitionIdx] -} - -func (partitions Partitions) NumPartitions() uint64 { - return uint64(len(partitions)) -} - -func (partition *Partition) NumPubkeys() uint64 { - return uint64(len(partition.pubkeys)) -} - -func (partition *Partition) Pubkeys() []solana.PublicKey { - return partition.pubkeys -} diff --git a/pkg/rewards/points.go b/pkg/rewards/points.go deleted file mode 100644 index 4221a076..00000000 --- a/pkg/rewards/points.go +++ /dev/null @@ -1,38 +0,0 @@ -package rewards - -import ( - "github.com/Overclock-Validator/wide" - "github.com/gagliardetto/solana-go" -) - -type CalculatedStakePointsAccumulator struct { - pubkeys []solana.PublicKey - pointsMap map[solana.PublicKey]*CalculatedStakePoints -} - -func NewCalculatedStakePointsAccumulator(pubkeys []solana.PublicKey) *CalculatedStakePointsAccumulator { - stakePointStructs := make([]CalculatedStakePoints, len(pubkeys)) - accum := &CalculatedStakePointsAccumulator{pubkeys: pubkeys, pointsMap: make(map[solana.PublicKey]*CalculatedStakePoints, len(pubkeys))} - for i, pk := range pubkeys { - accum.pointsMap[pk] = &stakePointStructs[i] - } - return accum -} - -func (accum *CalculatedStakePointsAccumulator) Add(pk solana.PublicKey, points CalculatedStakePoints) { - accum.pointsMap[pk].Points = points.Points - accum.pointsMap[pk].NewCreditsObserved = points.NewCreditsObserved - accum.pointsMap[pk].ForceCreditsUpdateWithSkippedReward = points.ForceCreditsUpdateWithSkippedReward -} - -func (accum CalculatedStakePointsAccumulator) TotalPoints() wide.Uint128 { - var totalPoints wide.Uint128 - for _, pk := range accum.pubkeys { - totalPoints = totalPoints.Add(accum.pointsMap[pk].Points) - } - return totalPoints -} - -func (accum CalculatedStakePointsAccumulator) CalculatedStakePoints() map[solana.PublicKey]*CalculatedStakePoints { - return accum.pointsMap -} diff --git a/pkg/rewards/rewards.go b/pkg/rewards/rewards.go index df72efc4..4fab1881 100644 --- a/pkg/rewards/rewards.go +++ b/pkg/rewards/rewards.go @@ -2,7 +2,9 @@ package rewards import ( "fmt" + "io" "math" + "path/filepath" "runtime" "sort" "sync" @@ -18,7 +20,6 @@ import ( "github.com/Overclock-Validator/wide" "github.com/dgryski/go-sip13" "github.com/gagliardetto/solana-go" - "github.com/gagliardetto/solana-go/rpc" "github.com/panjf2000/ants/v2" ) @@ -33,90 +34,8 @@ type 
PartitionedRewardDistributionInfo struct { TotalStakingRewards uint64 FirstStakingRewardSlot uint64 NumRewardPartitionsRemaining uint64 - Credits map[solana.PublicKey]CalculatedStakePoints - RewardPartitions Partitions - StakingRewards map[solana.PublicKey]*CalculatedStakeRewards - WorkerPool *ants.PoolWithFunc -} - -// rewardDistributionTask carries all context needed for processing one stake account. -// Used with the shared worker pool to avoid per-partition pool creation overhead. -type rewardDistributionTask struct { - acctsDb *accountsdb.AccountsDb - slot uint64 - stakingRewards map[solana.PublicKey]*CalculatedStakeRewards - accts []*accounts.Account - parentAccts []*accounts.Account - distributedLamports *atomic.Uint64 - wg *sync.WaitGroup - idx int - pubkey solana.PublicKey -} - -// rewardDistributionWorker is the shared worker function for stake reward distribution. -func rewardDistributionWorker(i interface{}) { - task := i.(*rewardDistributionTask) - defer task.wg.Done() - - reward, ok := task.stakingRewards[task.pubkey] - if !ok { - return - } - - stakeAcct, err := task.acctsDb.GetAccount(task.slot, task.pubkey) - if err != nil { - panic(fmt.Sprintf("unable to get acct %s from acctsdb for partitioned epoch rewards distribution in slot %d", task.pubkey, task.slot)) - } - task.parentAccts[task.idx] = stakeAcct.Clone() - - stakeState, err := sealevel.UnmarshalStakeState(stakeAcct.Data) - if err != nil { - return - } - - stakeState.Stake.Stake.CreditsObserved = reward.NewCreditsObserved - stakeState.Stake.Stake.Delegation.StakeLamports = safemath.SaturatingAddU64(stakeState.Stake.Stake.Delegation.StakeLamports, uint64(reward.StakerRewards)) - - err = sealevel.MarshalStakeStakeInto(stakeState, stakeAcct.Data) - if err != nil { - panic(fmt.Sprintf("unable to serialize new stake account state in distributing partitioned rewards: %s", err)) - } - - stakeAcct.Lamports, err = safemath.CheckedAddU64(stakeAcct.Lamports, uint64(reward.StakerRewards)) - if err != nil { - panic(fmt.Sprintf("overflow in partitioned epoch rewards distribution in slot %d to acct %s: %s", task.slot, task.pubkey, err)) - } - - task.accts[task.idx] = stakeAcct - task.distributedLamports.Add(reward.StakerRewards) - - // update the stake cache - delegationToCache := stakeState.Stake.Stake.Delegation - delegationToCache.CreditsObserved = stakeState.Stake.Stake.CreditsObserved - global.PutStakeCacheItem(task.pubkey, &delegationToCache) -} - -// InitWorkerPool creates the shared worker pool for reward distribution. -// Call once at the start of partitioned rewards, before processing any partition. -func (info *PartitionedRewardDistributionInfo) InitWorkerPool() error { - if info.WorkerPool != nil { - return nil - } - size := runtime.GOMAXPROCS(0) * 8 - pool, err := ants.NewPoolWithFunc(size, rewardDistributionWorker) - if err != nil { - return err - } - info.WorkerPool = pool - return nil -} - -// ReleaseWorkerPool releases the shared pool. Call when NumRewardPartitionsRemaining == 0. 
-func (info *PartitionedRewardDistributionInfo) ReleaseWorkerPool() { - if info.WorkerPool != nil { - info.WorkerPool.Release() - info.WorkerPool = nil - } + SpoolDir string // Base directory for per-partition spool files + SpoolSlot uint64 // Slot for spool file naming } type CalculatedStakePoints struct { @@ -187,11 +106,6 @@ func DeterminePartitionedStakingRewardsInfo(rpcc *rpcclient.RpcClient, rpcBackup return &PartitionedRewardDistributionInfo{TotalStakingRewards: totalStakingRewards, FirstStakingRewardSlot: firstSlotInEpoch + 1} } -type idxAndReward struct { - idx int - reward rpc.BlockReward -} - type idxAndRewardNew struct { idx int reward uint64 @@ -253,38 +167,6 @@ func DistributeVotingRewards(acctsDb *accountsdb.AccountsDb, validatorRewards ma return updatedAccts, parentUpdatedAccts, totalVotingRewards.Load() } -func DistributeStakingRewardsForPartition(acctsDb *accountsdb.AccountsDb, partition *Partition, stakingRewards map[solana.PublicKey]*CalculatedStakeRewards, slot uint64, workerPool *ants.PoolWithFunc) ([]*accounts.Account, []*accounts.Account, uint64) { - var distributedLamports atomic.Uint64 - accts := make([]*accounts.Account, partition.NumPubkeys()) - parentAccts := make([]*accounts.Account, partition.NumPubkeys()) - - var wg sync.WaitGroup - - for idx, stakePk := range partition.Pubkeys() { - task := &rewardDistributionTask{ - acctsDb: acctsDb, - slot: slot, - stakingRewards: stakingRewards, - accts: accts, - parentAccts: parentAccts, - distributedLamports: &distributedLamports, - wg: &wg, - idx: idx, - pubkey: stakePk, - } - wg.Add(1) - workerPool.Invoke(task) - } - wg.Wait() - - err := acctsDb.StoreAccounts(accts, slot, nil) - if err != nil { - panic(fmt.Sprintf("error updating accounts for partitioned epoch rewards in slot %d: %s", slot, err)) - } - - return accts, parentAccts, distributedLamports.Load() -} - func minimumStakeDelegation(slotCtx *sealevel.SlotCtx) uint64 { if !slotCtx.Features.IsActive(features.StakeMinimumDelegationForRewards) { return 0 @@ -322,77 +204,6 @@ type CalculatedStakeRewards struct { NewCreditsObserved uint64 } -func CalculateStakeRewardsAndPartitions(pointsPerStakeAcct map[solana.PublicKey]*CalculatedStakePoints, slotCtx *sealevel.SlotCtx, stakeHistory *sealevel.SysvarStakeHistory, slot uint64, rewardedEpoch uint64, pointValue PointValue, newRateActivationEpoch *uint64, f *features.Features, stakeCache map[solana.PublicKey]*sealevel.Delegation, voteCache map[solana.PublicKey]*sealevel.VoteStateVersions) (map[solana.PublicKey]*CalculatedStakeRewards, map[solana.PublicKey]*atomic.Uint64, Partitions) { - stakeInfoResults := make(map[solana.PublicKey]*CalculatedStakeRewards, 1500000) - validatorRewards := make(map[solana.PublicKey]*atomic.Uint64, 2000) - - minimumStakeDelegation := minimumStakeDelegation(slotCtx) - - var stakeMu sync.Mutex - var wg sync.WaitGroup - - workerPool, _ := ants.NewPoolWithFunc(runtime.GOMAXPROCS(0)*8, func(i interface{}) { - defer wg.Done() - - delegation := i.(*delegationAndPubkey) - - if delegation.delegation.StakeLamports < minimumStakeDelegation { - return - } - - voterPk := delegation.delegation.VoterPubkey - voteStateVersioned := voteCache[voterPk] - if voteStateVersioned == nil { - return - } - - pointsForStakeAcct := pointsPerStakeAcct[delegation.pubkey] - calculatedStakeRewards := CalculateStakeRewardsForAcct(delegation.pubkey, pointsForStakeAcct, delegation.delegation, voteStateVersioned, rewardedEpoch, pointValue, newRateActivationEpoch) - if calculatedStakeRewards != nil { - stakeMu.Lock() - 
stakeInfoResults[delegation.pubkey] = calculatedStakeRewards - stakeMu.Unlock() - - validatorRewards[voterPk].Add(calculatedStakeRewards.VoterRewards) - } - }) - - for _, delegation := range stakeCache { - _, exists := validatorRewards[delegation.VoterPubkey] - if !exists { - validatorRewards[delegation.VoterPubkey] = &atomic.Uint64{} - } - } - - for pk, delegation := range stakeCache { - d := &delegationAndPubkey{delegation: delegation, pubkey: pk} - wg.Add(1) - workerPool.Invoke(d) - } - wg.Wait() - workerPool.Release() - - numRewardPartitions := CalculateNumRewardPartitions(uint64(len(stakeInfoResults))) - partitions := NewPartitions(numRewardPartitions) - - partitionCalcWorkerPool, _ := ants.NewPoolWithFunc(runtime.GOMAXPROCS(0)*8, func(i interface{}) { - defer wg.Done() - - stakePk := i.(solana.PublicKey) - idx := CalculateRewardPartitionForPubkey(stakePk, slotCtx.Blockhash, numRewardPartitions) - partitions.AddPubkey(idx, stakePk) - }) - - for stakePk := range stakeInfoResults { - wg.Add(1) - partitionCalcWorkerPool.Invoke(stakePk) - } - wg.Wait() - partitionCalcWorkerPool.Release() - - return stakeInfoResults, validatorRewards, partitions -} - func CalculateStakeRewardsForAcct(pubkey solana.PublicKey, stakePointsResult *CalculatedStakePoints, delegation *sealevel.Delegation, voteState *sealevel.VoteStateVersions, rewardedEpoch uint64, pointValue PointValue, newRateActivationEpoch *uint64) *CalculatedStakeRewards { if pointValue.Rewards == 0 || delegation.ActivationEpoch == rewardedEpoch { stakePointsResult.ForceCreditsUpdateWithSkippedReward = true @@ -483,62 +294,6 @@ func voteCommissionSplit(voteState *sealevel.VoteStateVersions, rewards uint64) return result } -type delegationAndPubkey struct { - delegation *sealevel.Delegation - pubkey solana.PublicKey -} - -func CalculateStakePoints( - acctsDb *accountsdb.AccountsDb, - slotCtx *sealevel.SlotCtx, - slot uint64, - stakeHistory *sealevel.SysvarStakeHistory, - newWarmupCooldownRateEpoch *uint64, - stakeCache map[solana.PublicKey]*sealevel.Delegation, - voteCache map[solana.PublicKey]*sealevel.VoteStateVersions, -) (map[solana.PublicKey]*CalculatedStakePoints, wide.Uint128) { - minimum := minimumStakeDelegation(slotCtx) - - n := len(stakeCache) - pks := make([]solana.PublicKey, 0, n) - for pk := range stakeCache { - pks = append(pks, pk) - } - - pointsAccum := NewCalculatedStakePointsAccumulator(pks) - var wg sync.WaitGroup - - size := runtime.GOMAXPROCS(0) * 8 - workerPool, _ := ants.NewPoolWithFunc(size, func(i interface{}) { - defer wg.Done() - - t := i.(*delegationAndPubkey) - d := t.delegation - if d.StakeLamports < minimum { - return - } - - voterPk := d.VoterPubkey - voteState := voteCache[voterPk] - if voteState == nil { - return - } - - pcs := calculateStakePointsAndCredits(t.pubkey, stakeHistory, d, voteState, newWarmupCooldownRateEpoch) - pointsAccum.Add(t.pubkey, pcs) - }) - - for pk, delegation := range stakeCache { - wg.Add(1) - workerPool.Invoke(&delegationAndPubkey{delegation: delegation, pubkey: pk}) - } - - wg.Wait() - workerPool.Release() - - return pointsAccum.CalculatedStakePoints(), pointsAccum.TotalPoints() -} - func calculateStakePointsAndCredits( pubkey solana.PublicKey, stakeHistory *sealevel.SysvarStakeHistory, @@ -623,3 +378,333 @@ func CalculateNumRewardPartitions(numStakingRewards uint64) uint64 { return numRewardPartitions } + +// StreamingRewardsResult holds the results from streaming rewards calculation. 
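A minimal sketch of the producer side, for orientation (editor's illustration, not code from this change): the caller only fills in PointValue.Rewards with the epoch's total staking reward lamports, because pass 1 recomputes Points from the streamed stake accounts, and the spool location and partition count come back in the result type defined just below. The wrapper package name and the pkg/features import path are assumptions, and Rewards is treated here as a plain uint64 lamports total, matching how TotalStakingRewards is used in this file.

    package replaydriver // hypothetical home for this helper

    import (
        "github.com/Overclock-Validator/mithril/pkg/accountsdb"
        "github.com/Overclock-Validator/mithril/pkg/features" // import path assumed
        "github.com/Overclock-Validator/mithril/pkg/rewards"
        "github.com/Overclock-Validator/mithril/pkg/sealevel"
        "github.com/gagliardetto/solana-go"
    )

    // calculateEpochRewards runs the two-pass streaming calculation once at the
    // epoch boundary and returns the spool metadata needed later for distribution.
    func calculateEpochRewards(
        acctsDb *accountsdb.AccountsDb,
        slot uint64,
        stakeHistory *sealevel.SysvarStakeHistory,
        newWarmupCooldownRateEpoch *uint64,
        voteCache map[solana.PublicKey]*sealevel.VoteStateVersions,
        totalStakingRewards uint64,
        rewardedEpoch uint64,
        blockhash [32]byte,
        slotCtx *sealevel.SlotCtx,
        f *features.Features,
    ) (*rewards.StreamingRewardsResult, error) {
        // Points is left zero on purpose: pass 1 recomputes the epoch total.
        pv := rewards.PointValue{Rewards: totalStakingRewards}
        return rewards.CalculateRewardsStreaming(
            acctsDb, slot, stakeHistory, newWarmupCooldownRateEpoch, voteCache,
            pv, rewardedEpoch, blockhash, slotCtx, f)
    }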
+type StreamingRewardsResult struct { + SpoolDir string // Base directory for per-partition spool files + SpoolSlot uint64 // Slot for spool file naming + TotalPoints wide.Uint128 + ValidatorRewards map[solana.PublicKey]*atomic.Uint64 + NumStakeRewards uint64 + NumPartitions uint64 +} + +// spoolWriteRequest is sent to the single-writer goroutine for spool writes. +type spoolWriteRequest struct { + record *SpoolRecord +} + +// CalculateRewardsStreaming performs a two-pass streaming calculation of stake rewards. +// Pass 1: Stream stakes to calculate total points (no caching - flat RAM) +// Pass 2: Recompute points + calculate rewards + write to spool file +// Uses channel-based single writer to capture spool write errors. +func CalculateRewardsStreaming( + acctsDb *accountsdb.AccountsDb, + slot uint64, + stakeHistory *sealevel.SysvarStakeHistory, + newWarmupCooldownRateEpoch *uint64, + voteCache map[solana.PublicKey]*sealevel.VoteStateVersions, + pointValue PointValue, + rewardedEpoch uint64, + blockhash [32]byte, + slotCtx *sealevel.SlotCtx, + f *features.Features, +) (*StreamingRewardsResult, error) { + minimum := minimumStakeDelegation(slotCtx) + + // Pass 1: Stream stakes to calculate total points only (no caching) + var totalPoints wide.Uint128 + var totalPointsMu sync.Mutex + var eligibleCount atomic.Uint64 + + _, err := global.StreamStakeAccounts(acctsDb, slot, + func(pk solana.PublicKey, delegation *sealevel.Delegation, creditsObs uint64) { + if delegation.StakeLamports < minimum { + return + } + + voterPk := delegation.VoterPubkey + voteState := voteCache[voterPk] + if voteState == nil { + return + } + + // Calculate points for this stake account + delegWithCredits := *delegation + delegWithCredits.CreditsObserved = creditsObs + pcs := calculateStakePointsAndCredits(pk, stakeHistory, &delegWithCredits, voteState, newWarmupCooldownRateEpoch) + + // Accumulate total points + totalPointsMu.Lock() + totalPoints = totalPoints.Add(pcs.Points) + totalPointsMu.Unlock() + + // Count eligible accounts for partition calculation + zero128 := wide.Uint128FromUint64(0) + if !pcs.Points.Eq(zero128) || pcs.ForceCreditsUpdateWithSkippedReward { + eligibleCount.Add(1) + } + }) + if err != nil { + return nil, fmt.Errorf("pass 1 streaming stakes for points: %w", err) + } + + // Create point value with calculated total points + pv := PointValue{Rewards: pointValue.Rewards, Points: totalPoints} + + // Calculate number of partitions based on eligible stake count + numPartitions := CalculateNumRewardPartitions(eligibleCount.Load()) + + // Create per-partition spool writers for pass 2 + spoolDir := filepath.Join(acctsDb.AcctsDir, "..") + spoolWriters := NewPartitionedSpoolWriters(spoolDir, slot, numPartitions) + + // Channel-based single writer pattern to capture write errors + // All writes go through one goroutine to avoid file handle contention + writeChan := make(chan spoolWriteRequest, 10000) + var writeErr atomic.Pointer[error] + var writerWg sync.WaitGroup + writerWg.Add(1) + + go func() { + defer writerWg.Done() + for req := range writeChan { + if writeErr.Load() != nil { + continue // already failed, drain channel + } + if err := spoolWriters.WriteRecord(req.record); err != nil { + writeErr.Store(&err) + } + } + }() + + // Track validator rewards (for voting rewards distribution) + validatorRewards := make(map[solana.PublicKey]*atomic.Uint64) + var validatorRewardsMu sync.Mutex + var numStakeRewards atomic.Uint64 + + // Pass 2: Recompute points + calculate rewards + write to per-partition spool 
files + // (Recomputing points is cheap CPU vs 140MB RAM cache) + _, err = global.StreamStakeAccounts(acctsDb, slot, + func(pk solana.PublicKey, delegation *sealevel.Delegation, creditsObs uint64) { + if delegation.StakeLamports < minimum { + return + } + + voterPk := delegation.VoterPubkey + voteState := voteCache[voterPk] + if voteState == nil { + return + } + + // Recompute points (same as Pass 1 - cheap) + delegWithCredits := *delegation + delegWithCredits.CreditsObserved = creditsObs + pcs := calculateStakePointsAndCredits(pk, stakeHistory, &delegWithCredits, voteState, newWarmupCooldownRateEpoch) + + // Skip if no points and not forced update + zero128 := wide.Uint128FromUint64(0) + if pcs.Points.Eq(zero128) && !pcs.ForceCreditsUpdateWithSkippedReward { + return + } + + // Calculate rewards using recomputed points + calculatedRewards := CalculateStakeRewardsForAcct(pk, &pcs, &delegWithCredits, voteState, rewardedEpoch, pv, newWarmupCooldownRateEpoch) + if calculatedRewards == nil { + return + } + + // Calculate partition index + partitionIdx := CalculateRewardPartitionForPubkey(pk, blockhash, numPartitions) + + // Send to single writer (non-blocking if channel has room) + writeChan <- spoolWriteRequest{record: &SpoolRecord{ + StakePubkey: pk, + VotePubkey: delegation.VoterPubkey, + StakeLamports: delegation.StakeLamports, + CreditsObserved: calculatedRewards.NewCreditsObserved, + RewardLamports: calculatedRewards.StakerRewards, + PartitionIndex: uint32(partitionIdx), + }} + + numStakeRewards.Add(1) + + // Track validator rewards + if calculatedRewards.VoterRewards > 0 { + validatorRewardsMu.Lock() + if _, exists := validatorRewards[voterPk]; !exists { + validatorRewards[voterPk] = &atomic.Uint64{} + } + validatorRewards[voterPk].Add(calculatedRewards.VoterRewards) + validatorRewardsMu.Unlock() + } + }) + + // Close write channel and wait for writer to finish + close(writeChan) + writerWg.Wait() + + // Check for spool write errors + if werr := writeErr.Load(); werr != nil { + spoolWriters.Close() + CleanupPartitionedSpoolFiles(spoolDir, slot, numPartitions) + return nil, fmt.Errorf("spool write failed: %w", *werr) + } + + // Close all partition spool files and check for errors + if err := spoolWriters.Close(); err != nil { + CleanupPartitionedSpoolFiles(spoolDir, slot, numPartitions) + return nil, fmt.Errorf("spool close failed: %w", err) + } + + if err != nil { + CleanupPartitionedSpoolFiles(spoolDir, slot, numPartitions) + return nil, fmt.Errorf("pass 2 streaming stakes for rewards: %w", err) + } + + return &StreamingRewardsResult{ + SpoolDir: spoolDir, + SpoolSlot: slot, + TotalPoints: totalPoints, + ValidatorRewards: validatorRewards, + NumStakeRewards: numStakeRewards.Load(), + NumPartitions: numPartitions, + }, nil +} + +// spoolDistributionTask carries context for processing one spool record. +type spoolDistributionTask struct { + rec *SpoolRecord + acctsDb *accountsdb.AccountsDb + slot uint64 + accts *[]*accounts.Account + parentAccts *[]*accounts.Account + mu *sync.Mutex + distributed *atomic.Uint64 + firstError *atomic.Pointer[error] +} + +// DistributeStakingRewardsFromSpool reads rewards from a per-partition spool file and distributes them. +// Uses streaming I/O - reads records one at a time to keep RAM flat. +// STRICT MODE: Any account read/unmarshal/marshal failure is fatal - we cannot diverge from consensus. 
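A sketch of the caller side of the function below (editor's illustration): one spool partition applied per successive slot, with the slot-to-partition mapping chosen here purely for illustration. Because the error contract is strict, a failed partition aborts distribution instead of being skipped.

    package replaydriver // hypothetical, see the producer sketch above

    import (
        "github.com/Overclock-Validator/mithril/pkg/accountsdb"
        "github.com/Overclock-Validator/mithril/pkg/rewards"
    )

    // distributeAllPartitions applies one spool partition per successive slot and
    // removes the spool files once every partition has been stored.
    func distributeAllPartitions(
        acctsDb *accountsdb.AccountsDb,
        res *rewards.StreamingRewardsResult,
        firstDistributionSlot uint64,
    ) (uint64, error) {
        var total uint64
        for p := uint64(0); p < res.NumPartitions; p++ {
            slot := firstDistributionSlot + p
            _, _, lamports, err := rewards.DistributeStakingRewardsFromSpool(
                acctsDb, res.SpoolDir, res.SpoolSlot, uint32(p), slot)
            if err != nil {
                return total, err // strict: never skip a partition
            }
            total += lamports
        }
        rewards.CleanupPartitionedSpoolFiles(res.SpoolDir, res.SpoolSlot, res.NumPartitions)
        return total, nil
    }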
+func DistributeStakingRewardsFromSpool( + acctsDb *accountsdb.AccountsDb, + spoolDir string, + spoolSlot uint64, + partitionIndex uint32, + slot uint64, +) ([]*accounts.Account, []*accounts.Account, uint64, error) { + // Open partition-specific spool file for sequential reading + reader, err := NewPartitionReader(spoolDir, spoolSlot, partitionIndex) + if err != nil { + return nil, nil, 0, fmt.Errorf("opening partition %d reader: %w", partitionIndex, err) + } + if reader == nil { + // No records for this partition + return nil, nil, 0, nil + } + defer reader.Close() + + var distributedLamports atomic.Uint64 + var firstError atomic.Pointer[error] + var mu sync.Mutex + + // Dynamic slices - append as we process (no pre-allocation with nils) + var accts []*accounts.Account + var parentAccts []*accounts.Account + + var wg sync.WaitGroup + size := runtime.GOMAXPROCS(0) * 8 + workerPool, err := ants.NewPoolWithFunc(size, func(i interface{}) { + defer wg.Done() + + task := i.(*spoolDistributionTask) + + // Skip if we already have an error + if task.firstError.Load() != nil { + return + } + + stakeAcct, err := task.acctsDb.GetAccount(task.slot, task.rec.StakePubkey) + if err != nil { + newErr := fmt.Errorf("GetAccount %s: %w", task.rec.StakePubkey, err) + task.firstError.CompareAndSwap(nil, &newErr) + return + } + parentAcct := stakeAcct.Clone() + + stakeState, err := sealevel.UnmarshalStakeState(stakeAcct.Data) + if err != nil { + newErr := fmt.Errorf("UnmarshalStakeState %s: %w", task.rec.StakePubkey, err) + task.firstError.CompareAndSwap(nil, &newErr) + return + } + + // Apply reward + stakeState.Stake.Stake.CreditsObserved = task.rec.CreditsObserved + stakeState.Stake.Stake.Delegation.StakeLamports = safemath.SaturatingAddU64( + stakeState.Stake.Stake.Delegation.StakeLamports, task.rec.RewardLamports) + + err = sealevel.MarshalStakeStakeInto(stakeState, stakeAcct.Data) + if err != nil { + newErr := fmt.Errorf("MarshalStakeStakeInto %s: %w", task.rec.StakePubkey, err) + task.firstError.CompareAndSwap(nil, &newErr) + return + } + + stakeAcct.Lamports = safemath.SaturatingAddU64(stakeAcct.Lamports, task.rec.RewardLamports) + task.distributed.Add(task.rec.RewardLamports) + + // Append to result slices under lock + task.mu.Lock() + *task.accts = append(*task.accts, stakeAcct) + *task.parentAccts = append(*task.parentAccts, parentAcct) + task.mu.Unlock() + }) + if err != nil { + return nil, nil, 0, fmt.Errorf("creating worker pool: %w", err) + } + defer workerPool.Release() + + // Stream records from spool file - one at a time (flat RAM) + for { + rec, err := reader.Next() + if err == io.EOF { + break + } + if err != nil { + return nil, nil, 0, fmt.Errorf("reading partition %d record: %w", partitionIndex, err) + } + + // Submit to worker pool - check for invoke errors + wg.Add(1) + if invokeErr := workerPool.Invoke(&spoolDistributionTask{ + rec: rec, + acctsDb: acctsDb, + slot: slot, + accts: &accts, + parentAccts: &parentAccts, + mu: &mu, + distributed: &distributedLamports, + firstError: &firstError, + }); invokeErr != nil { + wg.Done() // balance the Add since worker won't run + return nil, nil, 0, fmt.Errorf("worker pool invoke failed: %w", invokeErr) + } + } + wg.Wait() + + // STRICT: Any failure is fatal - we cannot silently skip rewards and diverge from consensus + if ferr := firstError.Load(); ferr != nil { + return nil, nil, 0, fmt.Errorf("reward distribution partition %d failed: %w", partitionIndex, *ferr) + } + + if len(accts) > 0 { + err = acctsDb.StoreAccounts(accts, slot, nil) + if err 
!= nil { + return nil, nil, 0, fmt.Errorf("storing accounts: %w", err) + } + } + + return accts, parentAccts, distributedLamports.Load(), nil +} diff --git a/pkg/rewards/spool.go b/pkg/rewards/spool.go new file mode 100644 index 00000000..049611b3 --- /dev/null +++ b/pkg/rewards/spool.go @@ -0,0 +1,218 @@ +package rewards + +import ( + "bufio" + "encoding/binary" + "fmt" + "io" + "os" + "path/filepath" + "sync" + + "github.com/gagliardetto/solana-go" +) + +// SpoolRecordSize is the binary size of a spool record. +// Format: stake_pubkey(32) + vote_pubkey(32) + stake_lamports(8) + +// +// credits_observed(8) + reward_lamports(8) = 88 bytes +const SpoolRecordSize = 88 + +// SpoolRecord represents a single stake reward record. +type SpoolRecord struct { + StakePubkey solana.PublicKey + VotePubkey solana.PublicKey + StakeLamports uint64 + CreditsObserved uint64 + RewardLamports uint64 + PartitionIndex uint32 // Only used during calculation, not serialized +} + +// encodeRecord encodes a record into the buffer (without partition index). +func encodeRecord(rec *SpoolRecord, buf []byte) { + copy(buf[0:32], rec.StakePubkey[:]) + copy(buf[32:64], rec.VotePubkey[:]) + binary.LittleEndian.PutUint64(buf[64:72], rec.StakeLamports) + binary.LittleEndian.PutUint64(buf[72:80], rec.CreditsObserved) + binary.LittleEndian.PutUint64(buf[80:88], rec.RewardLamports) +} + +// decodeRecord decodes a record from the buffer. +func decodeRecord(buf []byte, rec *SpoolRecord) { + copy(rec.StakePubkey[:], buf[0:32]) + copy(rec.VotePubkey[:], buf[32:64]) + rec.StakeLamports = binary.LittleEndian.Uint64(buf[64:72]) + rec.CreditsObserved = binary.LittleEndian.Uint64(buf[72:80]) + rec.RewardLamports = binary.LittleEndian.Uint64(buf[80:88]) +} + +// PartitionedSpoolWriters manages per-partition spool files. +// Thread-safe - multiple goroutines can write concurrently. +// Uses buffered I/O for performance. +type PartitionedSpoolWriters struct { + baseDir string + slot uint64 + numPartitions uint64 + writers map[uint32]*partitionWriter + mu sync.Mutex + closed bool +} + +// partitionWriter is a buffered writer for a single partition file. +type partitionWriter struct { + file *os.File + bufw *bufio.Writer + count int +} + +// NewPartitionedSpoolWriters creates a new set of per-partition spool writers. +func NewPartitionedSpoolWriters(baseDir string, slot uint64, numPartitions uint64) *PartitionedSpoolWriters { + return &PartitionedSpoolWriters{ + baseDir: baseDir, + slot: slot, + numPartitions: numPartitions, + writers: make(map[uint32]*partitionWriter), + } +} + +// SpoolDir returns the base directory for spool files. +func (p *PartitionedSpoolWriters) SpoolDir() string { + return p.baseDir +} + +// Slot returns the slot this spool is for. +func (p *PartitionedSpoolWriters) Slot() uint64 { + return p.slot +} + +// WriteRecord writes a record to the appropriate partition file. +// Thread-safe - lazily opens partition files as needed. 
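Since the 88-byte layout is hand-rolled, a round-trip check is cheap insurance. A minimal in-package sketch (a hypothetical spool_test.go), which also pins down that PartitionIndex is deliberately not part of the encoding:

    package rewards

    import "testing"

    // TestSpoolRecordRoundTrip encodes one record into the fixed 88-byte layout
    // and decodes it back, comparing every serialized field.
    func TestSpoolRecordRoundTrip(t *testing.T) {
        var in SpoolRecord
        in.StakePubkey[0] = 0xAA
        in.VotePubkey[31] = 0xBB
        in.StakeLamports = 42_000_000_000
        in.CreditsObserved = 123_456
        in.RewardLamports = 7_890
        in.PartitionIndex = 3 // not serialized; expected to be lost

        var buf [SpoolRecordSize]byte
        encodeRecord(&in, buf[:])

        var out SpoolRecord
        decodeRecord(buf[:], &out)

        if out.StakePubkey != in.StakePubkey || out.VotePubkey != in.VotePubkey ||
            out.StakeLamports != in.StakeLamports ||
            out.CreditsObserved != in.CreditsObserved ||
            out.RewardLamports != in.RewardLamports {
            t.Fatalf("round-trip mismatch: got %+v want %+v", out, in)
        }
        if out.PartitionIndex != 0 {
            t.Fatalf("PartitionIndex should not survive encoding, got %d", out.PartitionIndex)
        }
    }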
+func (p *PartitionedSpoolWriters) WriteRecord(rec *SpoolRecord) error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.closed { + return fmt.Errorf("spool writers are closed") + } + + partition := rec.PartitionIndex + + // Get or create writer for this partition + w, exists := p.writers[partition] + if !exists { + path := partitionFilePath(p.baseDir, p.slot, partition) + f, err := os.Create(path) + if err != nil { + return fmt.Errorf("creating partition %d spool file: %w", partition, err) + } + // 1MB buffer for efficient sequential writes + w = &partitionWriter{file: f, bufw: bufio.NewWriterSize(f, 1<<20)} + p.writers[partition] = w + } + + // Write record to buffer + var buf [SpoolRecordSize]byte + encodeRecord(rec, buf[:]) + if _, err := w.bufw.Write(buf[:]); err != nil { + return fmt.Errorf("writing to partition %d: %w", partition, err) + } + w.count++ + return nil +} + +// Close flushes buffers, syncs, and closes all partition files. +// Returns the first error encountered. +func (p *PartitionedSpoolWriters) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.closed { + return nil + } + p.closed = true + + var firstErr error + for partition, w := range p.writers { + // Flush buffer first + if err := w.bufw.Flush(); err != nil && firstErr == nil { + firstErr = fmt.Errorf("flushing partition %d: %w", partition, err) + } + // Sync to disk + if err := w.file.Sync(); err != nil && firstErr == nil { + firstErr = fmt.Errorf("syncing partition %d: %w", partition, err) + } + // Close file + if err := w.file.Close(); err != nil && firstErr == nil { + firstErr = fmt.Errorf("closing partition %d: %w", partition, err) + } + } + return firstErr +} + +// TotalRecords returns the total number of records written across all partitions. +func (p *PartitionedSpoolWriters) TotalRecords() int { + p.mu.Lock() + defer p.mu.Unlock() + + total := 0 + for _, w := range p.writers { + total += w.count + } + return total +} + +// PartitionReader reads records sequentially from a partition spool file. +// Uses buffered I/O for efficient sequential reads. +type PartitionReader struct { + file *os.File + bufr *bufio.Reader + buf [SpoolRecordSize]byte +} + +// NewPartitionReader opens a partition file for sequential reading. +func NewPartitionReader(baseDir string, slot uint64, partition uint32) (*PartitionReader, error) { + path := partitionFilePath(baseDir, slot, partition) + f, err := os.Open(path) + if err != nil { + if os.IsNotExist(err) { + // No records for this partition + return nil, nil + } + return nil, fmt.Errorf("opening partition %d spool: %w", partition, err) + } + // 1MB buffer for efficient sequential reads + return &PartitionReader{file: f, bufr: bufio.NewReaderSize(f, 1<<20)}, nil +} + +// Next reads the next record. Returns io.EOF when done. +func (r *PartitionReader) Next() (*SpoolRecord, error) { + _, err := io.ReadFull(r.bufr, r.buf[:]) + if err == io.EOF { + return nil, io.EOF + } + if err != nil { + return nil, fmt.Errorf("reading spool record: %w", err) + } + + rec := &SpoolRecord{} + decodeRecord(r.buf[:], rec) + return rec, nil +} + +// Close closes the partition file. +func (r *PartitionReader) Close() error { + return r.file.Close() +} + +// partitionFilePath returns the path for a partition spool file. +func partitionFilePath(baseDir string, slot uint64, partition uint32) string { + return filepath.Join(baseDir, fmt.Sprintf("reward_spool_%d_p%d.bin", slot, partition)) +} + +// CleanupPartitionedSpoolFiles removes all partition spool files for a slot. 
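Putting the writer and reader halves together, a usage sketch (editor's illustration; the directory and slot are caller-chosen, and the final step uses the cleanup helper below):

    package rewards

    import "io"

    // spoolRoundTrip writes one record into partition 1 of a two-partition spool,
    // reads that partition back, and then removes the spool files.
    func spoolRoundTrip(dir string, slot uint64) (int, error) {
        w := NewPartitionedSpoolWriters(dir, slot, 2)
        rec := &SpoolRecord{StakeLamports: 1_000_000, RewardLamports: 42, PartitionIndex: 1}
        if err := w.WriteRecord(rec); err != nil {
            return 0, err
        }
        if err := w.Close(); err != nil {
            return 0, err
        }

        r, err := NewPartitionReader(dir, slot, 1)
        if err != nil {
            return 0, err
        }
        if r == nil {
            return 0, nil // nothing was written to this partition
        }
        defer r.Close()

        n := 0
        for {
            if _, err := r.Next(); err == io.EOF {
                break
            } else if err != nil {
                return n, err
            }
            n++
        }

        CleanupPartitionedSpoolFiles(dir, slot, 2)
        return n, nil
    }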
+func CleanupPartitionedSpoolFiles(baseDir string, slot uint64, numPartitions uint64) { + for p := uint64(0); p < numPartitions; p++ { + path := partitionFilePath(baseDir, slot, uint32(p)) + os.Remove(path) // Ignore errors - file may not exist + } +} diff --git a/pkg/snapshot/manifest_seed.go b/pkg/snapshot/manifest_seed.go new file mode 100644 index 00000000..f4609284 --- /dev/null +++ b/pkg/snapshot/manifest_seed.go @@ -0,0 +1,133 @@ +package snapshot + +import ( + "encoding/base64" + "encoding/json" + "sort" + + "github.com/Overclock-Validator/mithril/pkg/base58" + "github.com/Overclock-Validator/mithril/pkg/epochstakes" + "github.com/Overclock-Validator/mithril/pkg/state" +) + +// PopulateManifestSeed copies manifest data to state file for replay context. +// Called ONCE after AccountsDB build completes, before writing state file. +// This eliminates the need to read the manifest at runtime. +func PopulateManifestSeed(s *state.MithrilState, m *SnapshotManifest) { + // Block config + s.ManifestParentSlot = m.Bank.Slot + s.ManifestParentBankhash = base58.Encode(m.Bank.Hash[:]) + + // LtHash: use Hash() method, encode as base64 + if m.LtHash != nil { + s.ManifestAcctsLtHash = base64.StdEncoding.EncodeToString(m.LtHash.Hash()) + } + + // Fee rate governor (static fields only) + s.ManifestFeeRateGovernor = &state.ManifestFeeRateGovernorSeed{ + TargetLamportsPerSignature: m.Bank.FeeRateGovernor.TargetLamportsPerSignature, + TargetSignaturesPerSlot: m.Bank.FeeRateGovernor.TargetSignaturesPerSlot, + MinLamportsPerSignature: m.Bank.FeeRateGovernor.MinLamportsPerSignature, + MaxLamportsPerSignature: m.Bank.FeeRateGovernor.MaxLamportsPerSignature, + BurnPercent: m.Bank.FeeRateGovernor.BurnPercent, + } + + // Signature/fee state + s.ManifestSignatureCount = m.Bank.SignatureCount + s.ManifestLamportsPerSignature = m.LamportsPerSignature + + // Blockhash context (sort by hash_index descending) + ages := make([]HashAgePair, len(m.Bank.BlockhashQueue.HashAndAge)) + copy(ages, m.Bank.BlockhashQueue.HashAndAge) + sort.Slice(ages, func(i, j int) bool { + return ages[i].Val.HashIndex > ages[j].Val.HashIndex + }) + + // Store top 150 blockhashes + numBlockhashes := min(150, len(ages)) + s.ManifestRecentBlockhashes = make([]state.BlockhashEntry, numBlockhashes) + for i := 0; i < numBlockhashes; i++ { + s.ManifestRecentBlockhashes[i] = state.BlockhashEntry{ + Blockhash: base58.Encode(ages[i].Key[:]), + LamportsPerSignature: ages[i].Val.FeeCalculator.LamportsPerSignature, + } + } + + // Guard: only access ages[150] if we have at least 151 entries + if len(ages) > 150 { + s.ManifestEvictedBlockhash = base58.Encode(ages[150].Key[:]) + } + + // ReplayCtx seed + s.ManifestCapitalization = m.Bank.Capitalization + s.ManifestSlotsPerYear = m.Bank.SlotsPerYear + s.ManifestInflationInitial = m.Bank.Inflation.Initial + s.ManifestInflationTerminal = m.Bank.Inflation.Terminal + s.ManifestInflationTaper = m.Bank.Inflation.Taper + s.ManifestInflationFoundation = m.Bank.Inflation.FoundationVal + s.ManifestInflationFoundationTerm = m.Bank.Inflation.FoundationTerm + + // Epoch account hash (base64 for consistency with LtHash) + if m.EpochAccountHash != [32]byte{} { + s.ManifestEpochAcctsHash = base64.StdEncoding.EncodeToString(m.EpochAccountHash[:]) + } + + // Transaction count at snapshot + s.ManifestTransactionCount = m.Bank.TransactionCount + + // Epoch authorized voters (for snapshot epoch only) + // Supports multiple authorized voters per vote account (matches original manifest behavior) + 
s.ManifestEpochAuthorizedVoters = make(map[string][]string) + for _, epochStake := range m.VersionedEpochStakes { + if epochStake.Epoch == m.Bank.Epoch { + for _, entry := range epochStake.Val.EpochAuthorizedVoters { + voteAcctStr := base58.Encode(entry.Key[:]) + authorizedVoterStr := base58.Encode(entry.Val[:]) + s.ManifestEpochAuthorizedVoters[voteAcctStr] = append(s.ManifestEpochAuthorizedVoters[voteAcctStr], authorizedVoterStr) + } + } + } + + // Epoch stakes: convert VersionedEpochStakes to PersistedEpochStakes format + // This stores ONLY vote-account aggregates, NOT full stake account data + s.ManifestEpochStakes = convertVersionedEpochStakesToPersisted(m.VersionedEpochStakes) +} + +// convertVersionedEpochStakesToPersisted converts manifest epoch stakes to +// the same PersistedEpochStakes JSON format used by ComputedEpochStakes. +// Only stores vote-account stakes (aggregated), NOT full stake account data. +func convertVersionedEpochStakesToPersisted(stakes []VersionedEpochStakesPair) map[uint64]string { + result := make(map[uint64]string, len(stakes)) + + for _, epochStake := range stakes { + // Build PersistedEpochStakes from manifest data + persisted := epochstakes.PersistedEpochStakes{ + Epoch: epochStake.Epoch, + TotalStake: epochStake.Val.TotalStake, + Stakes: make(map[string]uint64), + VoteAccts: make(map[string]*epochstakes.VoteAccountJSON), + } + + // Extract vote accounts from Stakes.VoteAccounts (aggregated data) + for _, va := range epochStake.Val.Stakes.VoteAccounts { + pkStr := base58.Encode(va.Key[:]) + persisted.Stakes[pkStr] = va.Stake + persisted.VoteAccts[pkStr] = &epochstakes.VoteAccountJSON{ + Lamports: va.Value.Lamports, + NodePubkey: base58.Encode(va.Value.NodePubkey[:]), + LastTimestampTs: va.Value.LastTimestampTs, + LastTimestampSlot: va.Value.LastTimestampSlot, + Owner: base58.Encode(va.Value.Owner[:]), + Executable: va.Value.Executable, + RentEpoch: va.Value.RentEpoch, + } + } + + data, err := json.Marshal(persisted) + if err != nil { + continue + } + result[epochStake.Epoch] = string(data) + } + return result +} diff --git a/pkg/state/state.go b/pkg/state/state.go index df678cc9..83531581 100644 --- a/pkg/state/state.go +++ b/pkg/state/state.go @@ -16,7 +16,7 @@ const HistoryFileName = "mithril_state.history.jsonl" // CurrentStateSchemaVersion is the current version of the state file format. // Increment this when making breaking changes to the state file structure. -const CurrentStateSchemaVersion uint32 = 1 +const CurrentStateSchemaVersion uint32 = 2 // MithrilState tracks the current state of the mithril node. // The state file serves as an atomic marker of validity - AccountsDB is valid @@ -58,6 +58,52 @@ type MithrilState struct { CorruptionReason string `json:"corruption_reason,omitempty"` CorruptionDetectedAt time.Time `json:"corruption_detected_at,omitempty"` + // ========================================================================= + // Manifest Seed Data (copied from manifest at snapshot build time) + // Used ONLY for fresh-start replay. Resume uses Last* fields instead. 
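Because the seed reuses the PersistedEpochStakes JSON built above, a fresh start can decode an entry straight back into the structure the computed path already uses. A minimal sketch of that consuming side, which is not part of this diff (the hypothetical helper reads the ManifestEpochStakes field defined further below):

    package snapshot

    import (
        "encoding/json"
        "fmt"

        "github.com/Overclock-Validator/mithril/pkg/epochstakes"
        "github.com/Overclock-Validator/mithril/pkg/state"
    )

    // decodeSeededEpochStakes parses one seeded epoch-stakes entry from the state file.
    func decodeSeededEpochStakes(s *state.MithrilState, epoch uint64) (*epochstakes.PersistedEpochStakes, error) {
        raw, ok := s.ManifestEpochStakes[epoch]
        if !ok {
            return nil, fmt.Errorf("no seeded epoch stakes for epoch %d", epoch)
        }
        var persisted epochstakes.PersistedEpochStakes
        if err := json.Unmarshal([]byte(raw), &persisted); err != nil {
            return nil, fmt.Errorf("parsing seeded epoch stakes for epoch %d: %w", epoch, err)
        }
        return &persisted, nil
    }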
+ // ========================================================================= + + // Block configuration seed + ManifestParentSlot uint64 `json:"manifest_parent_slot,omitempty"` + ManifestParentBankhash string `json:"manifest_parent_bankhash,omitempty"` // base58 + ManifestAcctsLtHash string `json:"manifest_accts_lt_hash,omitempty"` // base64 + + // Fee rate governor seed (static fields only) + ManifestFeeRateGovernor *ManifestFeeRateGovernorSeed `json:"manifest_fee_rate_governor,omitempty"` + + // Signature/fee state at snapshot + ManifestSignatureCount uint64 `json:"manifest_signature_count,omitempty"` + ManifestLamportsPerSignature uint64 `json:"manifest_lamports_per_sig,omitempty"` + + // Blockhash context (150 recent + 1 evicted) + ManifestRecentBlockhashes []BlockhashEntry `json:"manifest_recent_blockhashes,omitempty"` + ManifestEvictedBlockhash string `json:"manifest_evicted_blockhash,omitempty"` // base58 + + // ReplayCtx seed (inflation/capitalization at snapshot) + ManifestCapitalization uint64 `json:"manifest_capitalization,omitempty"` + ManifestSlotsPerYear float64 `json:"manifest_slots_per_year,omitempty"` + ManifestInflationInitial float64 `json:"manifest_inflation_initial,omitempty"` + ManifestInflationTerminal float64 `json:"manifest_inflation_terminal,omitempty"` + ManifestInflationTaper float64 `json:"manifest_inflation_taper,omitempty"` + ManifestInflationFoundation float64 `json:"manifest_inflation_foundation,omitempty"` + ManifestInflationFoundationTerm float64 `json:"manifest_inflation_foundation_term,omitempty"` + + // Epoch account hash (base64 for consistency with LtHash) + ManifestEpochAcctsHash string `json:"manifest_epoch_accts_hash,omitempty"` // base64 + + // Transaction count at snapshot slot + ManifestTransactionCount uint64 `json:"manifest_transaction_count,omitempty"` + + // Epoch authorized voters (for current epoch only) + // Maps vote account pubkey (base58) -> list of authorized voter pubkeys (base58) + // Multiple authorized voters per vote account are supported (matches original manifest behavior) + ManifestEpochAuthorizedVoters map[string][]string `json:"manifest_epoch_authorized_voters,omitempty"` + + // Epoch stakes seed - AGGREGATED vote-account stakes only (NOT full VersionedEpochStakes) + // Same format as ComputedEpochStakes (PersistedEpochStakes JSON) + // Cleared after first replayed slot to save space. + ManifestEpochStakes map[uint64]string `json:"manifest_epoch_stakes,omitempty"` + // ========================================================================= // Current Position (where we left off) // ========================================================================= @@ -123,6 +169,17 @@ type BlockhashEntry struct { LamportsPerSignature uint64 `json:"lamports_per_sig"` } +// ManifestFeeRateGovernorSeed contains the static fields from FeeRateGovernor +// that do not change during replay. Dynamic fields (LamportsPerSignature, +// PrevLamportsPerSignature) are stored separately and updated on resume. 
+type ManifestFeeRateGovernorSeed struct { + TargetLamportsPerSignature uint64 `json:"target_lamports_per_sig"` + TargetSignaturesPerSlot uint64 `json:"target_sigs_per_slot"` + MinLamportsPerSignature uint64 `json:"min_lamports_per_sig"` + MaxLamportsPerSignature uint64 `json:"max_lamports_per_sig"` + BurnPercent byte `json:"burn_percent"` +} + // SlotHashEntry represents a single entry in the SlotHashes sysvar type SlotHashEntry struct { Slot uint64 `json:"slot"` @@ -155,15 +212,9 @@ func LoadState(accountsDbDir string) (*MithrilState, error) { return nil, fmt.Errorf("failed to parse state file: %w", err) } - // Migrate from older schema versions - if state.StateSchemaVersion == 0 { - // Version 0 → 1 migration: - // - Migrate LastCommit to LastWriterCommit - if state.LastCommit != "" && state.LastWriterCommit == "" { - state.LastWriterCommit = state.LastCommit - } - state.StateSchemaVersion = 1 - // Note: We don't save here - the state will be saved on next update + // Require schema version 2 - no migration from older versions + if state.StateSchemaVersion != CurrentStateSchemaVersion { + return nil, fmt.Errorf("state file schema version %d is not supported (requires v%d). Delete AccountsDB and rebuild from snapshot", state.StateSchemaVersion, CurrentStateSchemaVersion) } return &state, nil @@ -348,6 +399,12 @@ func (s *MithrilState) HasResumeData() bool { return s != nil && s.LastSlot > 0 && s.LastAcctsLtHash != "" } +// ClearManifestEpochStakes removes the manifest epoch stakes after they're no longer needed. +// This should be called after the first slot is replayed past the snapshot slot. +func (s *MithrilState) ClearManifestEpochStakes() { + s.ManifestEpochStakes = nil +} + // getWriterCommit returns the writer commit, preferring the new field but falling back to legacy. func (s *MithrilState) getWriterCommit() string { if s.LastWriterCommit != "" {