16 changes: 15 additions & 1 deletion cmd/mithril/node/node.go
@@ -809,6 +809,8 @@ func runVerifyRange(c *cobra.Command, args []string) {
snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot)
}
mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0)
+// Populate manifest seed data so replay doesn't need manifest at runtime
+snapshot.PopulateManifestSeed(mithrilState, manifest)
if err := mithrilState.Save(accountsDbDir); err != nil {
mlog.Log.Errorf("failed to save state file: %v", err)
}
@@ -1175,6 +1177,8 @@ func runLive(c *cobra.Command, args []string) {
snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot)
}
mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0)
+// Populate manifest seed data so replay doesn't need manifest at runtime
+snapshot.PopulateManifestSeed(mithrilState, manifest)
if err := mithrilState.Save(accountsPath); err != nil {
mlog.Log.Errorf("failed to save state file: %v", err)
}
@@ -1243,6 +1247,8 @@ func runLive(c *cobra.Command, args []string) {
snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot)
}
mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0)
+// Populate manifest seed data so replay doesn't need manifest at runtime
+snapshot.PopulateManifestSeed(mithrilState, manifest)
if err := mithrilState.Save(accountsPath); err != nil {
mlog.Log.Errorf("failed to save state file: %v", err)
}
@@ -1299,6 +1305,8 @@ func runLive(c *cobra.Command, args []string) {
snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot)
}
mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0)
+// Populate manifest seed data so replay doesn't need manifest at runtime
+snapshot.PopulateManifestSeed(mithrilState, manifest)
if err := mithrilState.Save(accountsPath); err != nil {
mlog.Log.Errorf("failed to save state file: %v", err)
}
@@ -1369,6 +1377,8 @@ func runLive(c *cobra.Command, args []string) {
snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot)
}
mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0)
+// Populate manifest seed data so replay doesn't need manifest at runtime
+snapshot.PopulateManifestSeed(mithrilState, manifest)
if err := mithrilState.Save(accountsPath); err != nil {
mlog.Log.Errorf("failed to save state file: %v", err)
}
@@ -1471,6 +1481,8 @@ func runLive(c *cobra.Command, args []string) {
WriterVersion: getVersion(),
WriterCommit: getCommit(),
})
+// Populate manifest seed data so replay doesn't need manifest at runtime
+snapshot.PopulateManifestSeed(mithrilState, manifest)
if err := mithrilState.Save(accountsPath); err != nil {
mlog.Log.Errorf("failed to save state file: %v", err)
}
@@ -1582,6 +1594,8 @@ postBootstrap:
snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot)
}
mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0)
+// Populate manifest seed data so replay doesn't need manifest at runtime
+snapshot.PopulateManifestSeed(mithrilState, manifest)
if err := mithrilState.Save(accountsPath); err != nil {
mlog.Log.Errorf("failed to save state file: %v", err)
}
@@ -2604,6 +2618,6 @@ func runReplayWithRecovery(
}
}()

-result = replay.ReplayBlocks(ctx, accountsDb, accountsDbPath, manifest, resumeState, startSlot, endSlot, rpcEndpoints, blockDir, txParallelism, isLive, useLightbringer, dbgOpts, metricsWriter, rpcServer, blockFetchOpts, onCancelWriteState)
+result = replay.ReplayBlocks(ctx, accountsDb, accountsDbPath, mithrilState, resumeState, startSlot, endSlot, rpcEndpoints, blockDir, txParallelism, isLive, useLightbringer, dbgOpts, metricsWriter, rpcServer, blockFetchOpts, onCancelWriteState)
return result
}
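
Taken together, the node.go hunks apply one pattern: create the ready state, seed it from the manifest, persist it, then hand the seeded state (rather than the manifest) to replay. A condensed sketch, using only names that appear in the diff, with surrounding setup and error handling elided:

// Sketch only - a restatement of the pattern added above, not new code.
mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0)
// Seed the persisted state from the manifest so replay can later resume
// without re-loading the snapshot manifest at runtime.
snapshot.PopulateManifestSeed(mithrilState, manifest)
if err := mithrilState.Save(accountsPath); err != nil {
	mlog.Log.Errorf("failed to save state file: %v", err)
}
// ...and ReplayBlocks now takes mithrilState where it previously took manifest:
result = replay.ReplayBlocks(ctx, accountsDb, accountsDbPath, mithrilState, resumeState,
	startSlot, endSlot, rpcEndpoints, blockDir, txParallelism, isLive, useLightbringer,
	dbgOpts, metricsWriter, rpcServer, blockFetchOpts, onCancelWriteState)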
140 changes: 140 additions & 0 deletions pkg/global/global_ctx.go
@@ -6,13 +6,17 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"sync"
"sync/atomic"

"github.com/Overclock-Validator/mithril/pkg/accountsdb"
"github.com/Overclock-Validator/mithril/pkg/epochstakes"
"github.com/Overclock-Validator/mithril/pkg/forkchoice"
"github.com/Overclock-Validator/mithril/pkg/leaderschedule"
"github.com/Overclock-Validator/mithril/pkg/sealevel"
"github.com/gagliardetto/solana-go"
"github.com/panjf2000/ants/v2"
)

// StakePubkeyIndexFileName is the name of the stake pubkey index file
@@ -27,6 +31,7 @@ type GlobalCtx struct {
stakeCache map[solana.PublicKey]*sealevel.Delegation
pendingNewStakePubkeys []solana.PublicKey // New stake pubkeys to append to index after block commit
voteCache map[solana.PublicKey]*sealevel.VoteStateVersions
voteStakeTotals map[solana.PublicKey]uint64 // Aggregated stake totals per vote account (replaces full stake cache at startup)
epochStakes *epochstakes.EpochStakesCache
epochAuthorizedVoters *epochstakes.EpochAuthorizedVotersCache
forkChoice *forkchoice.ForkChoiceService
@@ -37,6 +42,7 @@ type GlobalCtx struct {
manageBlockHeight bool
stakeCacheMutex sync.Mutex // Changed from RWMutex - simpler, used for both cache and pending
voteCacheMutex sync.RWMutex
voteStakeTotalsMu sync.RWMutex
slotsConfirmedMutex sync.Mutex
mu sync.Mutex
}
@@ -78,6 +84,8 @@ func IncrTransactionCount(num uint64) {
// PutStakeCacheItem adds or updates a stake cache entry during replay.
// If this is a NEW pubkey (not already in cache), it's added to pendingNewStakePubkeys
// for later append to the index file via FlushPendingStakePubkeys.
// NOTE: With streaming rewards enabled, call this only during rewards distribution,
// so the cache stays consistent. For normal transaction recording, use TrackNewStakePubkey instead.
func PutStakeCacheItem(pubkey solana.PublicKey, delegation *sealevel.Delegation) {
instance.stakeCacheMutex.Lock()
defer instance.stakeCacheMutex.Unlock()
@@ -92,6 +100,31 @@ func PutStakeCacheItem(pubkey solana.PublicKey, delegation *sealevel.Delegation)
instance.stakeCache[pubkey] = delegation
}

// TrackNewStakePubkey tracks a new stake pubkey for a later index append without
// populating the cache. Use this during normal transaction recording when streaming
// rewards is enabled. Returns true if the pubkey was newly added to the pending list.
func TrackNewStakePubkey(pubkey solana.PublicKey) bool {
instance.stakeCacheMutex.Lock()
defer instance.stakeCacheMutex.Unlock()

// With streaming rewards the cache is no longer fully populated, so it also
// serves as a seen-set: nil entries mark pubkeys that are already tracked.
if instance.stakeCache == nil {
instance.stakeCache = make(map[solana.PublicKey]*sealevel.Delegation)
}

// Check if already known (in cache from previous runs or pending)
_, exists := instance.stakeCache[pubkey]
if exists {
return false
}

// Mark as seen with nil delegation (just for tracking)
instance.stakeCache[pubkey] = nil
instance.pendingNewStakePubkeys = append(instance.pendingNewStakePubkeys, pubkey)
return true
}
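
// Illustrative usage, not part of this diff: the intended split between the two
// entry points. recordStakeTouch is a hypothetical helper name; with streaming
// rewards enabled, normal transaction recording only tracks the pubkey for the
// index append, while rewards distribution still writes real delegations.
func recordStakeTouch(pk solana.PublicKey, duringRewards bool, d *sealevel.Delegation) {
	if duringRewards {
		// Rewards distribution: keep the cache entry itself consistent.
		PutStakeCacheItem(pk, d)
		return
	}
	// Transaction recording: only mark the pubkey for the index append.
	if TrackNewStakePubkey(pk) {
		// First sighting; FlushPendingStakePubkeys appends it to the
		// index file after block commit.
	}
}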

// PutStakeCacheItemBulk adds a stake cache entry during bulk population (startup).
// Does NOT track new pubkeys - use this when loading cache from index/snapshot/scan
// to avoid enqueueing the entire cache on rebuild.
@@ -196,6 +229,37 @@ func VoteCacheSnapshot() map[solana.PublicKey]*sealevel.VoteStateVersions {
return snapshot
}

// SetVoteStakeTotals sets the aggregated stake totals per vote account.
// Called at startup after scanning all stake accounts.
func SetVoteStakeTotals(m map[solana.PublicKey]uint64) {
instance.voteStakeTotalsMu.Lock()
instance.voteStakeTotals = m
instance.voteStakeTotalsMu.Unlock()
}

// VoteStakeTotalsCopy returns a copy of the vote stake totals map, or nil if the
// totals have not been set. Safe for concurrent use - callers may mutate a non-nil result.
func VoteStakeTotalsCopy() map[solana.PublicKey]uint64 {
instance.voteStakeTotalsMu.RLock()
defer instance.voteStakeTotalsMu.RUnlock()
if instance.voteStakeTotals == nil {
return nil
}
result := make(map[solana.PublicKey]uint64, len(instance.voteStakeTotals))
for k, v := range instance.voteStakeTotals {
result[k] = v
}
return result
}

// VoteStakeTotalsRef returns a direct reference to the vote stake totals map.
// Callers must NOT mutate the returned map.
func VoteStakeTotalsRef() map[solana.PublicKey]uint64 {
instance.voteStakeTotalsMu.RLock()
defer instance.voteStakeTotalsMu.RUnlock()
return instance.voteStakeTotals
}
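
// Illustrative usage, not part of this diff; sumObservedStake and adjustedTotals
// are hypothetical helpers. Because SetVoteStakeTotals swaps in a whole new map
// rather than mutating the old one in place, a reference from VoteStakeTotalsRef
// stays internally consistent even if the totals are replaced afterwards.
func sumObservedStake() uint64 {
	// Read-only pass: borrow the map, never write through it.
	var total uint64
	for _, stake := range VoteStakeTotalsRef() {
		total += stake
	}
	return total
}

func adjustedTotals(deltas map[solana.PublicKey]uint64) map[solana.PublicKey]uint64 {
	// Mutating consumer: take a private copy first (nil if never set).
	totals := VoteStakeTotalsCopy()
	if totals == nil {
		totals = make(map[solana.PublicKey]uint64)
	}
	for pk, d := range deltas {
		totals[pk] += d
	}
	return totals
}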

func PutEpochStakesEntry(epoch uint64, pubkey solana.PublicKey, stake uint64, voteAcct *epochstakes.VoteAccount) {
if instance.epochStakes == nil {
instance.epochStakes = epochstakes.NewEpochStakesCache()
@@ -487,3 +551,79 @@ func SaveStakePubkeyIndex(accountsDbDir string) error {

return nil
}

// StreamStakeAccounts iterates all stake accounts from the pubkey index, calling fn
// for each valid delegation. fn may be invoked concurrently from multiple workers,
// so it must be safe for concurrent use. Returns the count of processed accounts.
// This streams directly from AccountsDB without building a full stake cache.
func StreamStakeAccounts(
acctsDb *accountsdb.AccountsDb,
slot uint64,
fn func(pubkey solana.PublicKey, delegation *sealevel.Delegation, creditsObserved uint64),
) (int, error) {
// Load stake pubkeys from index file
acctsDbDir := filepath.Join(acctsDb.AcctsDir, "..")
stakePubkeys, err := LoadStakePubkeyIndex(acctsDbDir)
if err != nil {
return 0, fmt.Errorf("loading stake pubkey index: %w", err)
}

var processedCount atomic.Int64
var wg sync.WaitGroup

// Batched worker pool for parallel processing
const batchSize = 1000
workerPool, err := ants.NewPoolWithFunc(runtime.NumCPU()*2, func(i interface{}) {
defer wg.Done()

batch := i.([]solana.PublicKey)

for _, pk := range batch {
// Read stake account from AccountsDB
stakeAcct, err := acctsDb.GetAccount(slot, pk)
if err != nil {
continue // Account not found or closed
}

stakeState, err := sealevel.UnmarshalStakeState(stakeAcct.Data)
if err != nil {
continue // Invalid stake state
}

// Only process delegated stake accounts (status must be "Stake")
if stakeState.Status != sealevel.StakeStateV2StatusStake {
continue
}

// Call the callback with delegation data
delegation := &stakeState.Stake.Stake.Delegation
creditsObserved := stakeState.Stake.Stake.CreditsObserved
fn(pk, delegation, creditsObserved)
processedCount.Add(1)
}
})
if err != nil {
return 0, fmt.Errorf("creating worker pool: %w", err)
}
defer workerPool.Release()

// Submit stake pubkeys in batches
var invokeErr error
for i := 0; i < len(stakePubkeys); i += batchSize {
end := min(i+batchSize, len(stakePubkeys))
wg.Add(1)
if err := workerPool.Invoke(stakePubkeys[i:end]); err != nil {
wg.Done() // balance the Add since worker won't run
invokeErr = fmt.Errorf("worker pool invoke failed: %w", err)
break // stop submitting new batches, but wait for queued ones
}
}

// Always wait for all queued workers to finish before returning
wg.Wait()

if invokeErr != nil {
return 0, invokeErr
}

return int(processedCount.Load()), nil
}
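
// Illustrative startup flow, not part of this diff: aggregating stake per vote
// account without materializing the full stake cache, then publishing the result
// via SetVoteStakeTotals. StreamStakeAccounts invokes fn from multiple pool
// workers, so the callback synchronizes with a mutex. buildVoteStakeTotals is a
// hypothetical helper, and the Delegation field names (VoterPubkey, StakeLamports)
// are assumptions about pkg/sealevel made for illustration.
func buildVoteStakeTotals(acctsDb *accountsdb.AccountsDb, slot uint64) (int, error) {
	totals := make(map[solana.PublicKey]uint64)
	var mu sync.Mutex

	n, err := StreamStakeAccounts(acctsDb, slot,
		func(pk solana.PublicKey, d *sealevel.Delegation, creditsObserved uint64) {
			mu.Lock()
			totals[d.VoterPubkey] += d.StakeLamports // assumed field names
			mu.Unlock()
		})
	if err != nil {
		return 0, err
	}

	// Publish the aggregated totals for the rest of the node to read.
	SetVoteStakeTotals(totals)
	return n, nil
}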