diff --git a/cmd/mithril/node/node.go b/cmd/mithril/node/node.go index 0ad87901..c8ad6cf5 100644 --- a/cmd/mithril/node/node.go +++ b/cmd/mithril/node/node.go @@ -193,6 +193,7 @@ func init() { Run.Flags().BoolVar(&sbpf.UsePool, "use-pool", true, "Disable to allocate fresh slices") // [tuning.pprof] section flags + Run.Flags().Int64Var(&pprofPort, "pprof-port", -1, "Port to serve HTTP pprof endpoint") Run.Flags().StringVar(&cpuprofPath, "cpu-profile-path", "", "Filename to write CPU profile") // [debug] section flags @@ -855,7 +856,15 @@ func runVerifyRange(c *cobra.Command, args []string) { klog.Fatalf("end slot cannot be lower than start slot") } mlog.Log.Infof("will replay startSlot=%d endSlot=%d", startSlot, endSlot) - accountsDb.InitCaches() + accountsDb.InitCaches( + config.GetInt("tuning.cache.vote_acct_lru"), + config.GetInt("tuning.cache.stake_acct_lru"), + config.GetInt("tuning.cache.small_acct_lru"), + config.GetInt("tuning.cache.medium_acct_lru"), + config.GetInt("tuning.cache.huge_acct_lru"), + config.GetInt("tuning.cache.program_lru"), + config.GetInt("tuning.cache.seen_once_filter_size"), + ) metricsWriter, metricsWriterCleanup, err := createBufWriter(metricsPath) if err != nil { @@ -1075,6 +1084,11 @@ func runLive(c *cobra.Command, args []string) { // Now start the metrics server (after banner so errors don't appear first) statsd.StartMetricsServer() + // Start pprof HTTP server if configured + if pprofPort != -1 { + startPprofHandlers(int(pprofPort)) + } + // Determine if using Lightbringer based on block source // NOTE: Lightbringer mode is TEMPORARILY DISABLED. The background block downloader that // wrote Lightbringer blocks to disk was removed due to reliability issues (panics, race conditions). @@ -1160,7 +1174,7 @@ func runLive(c *cobra.Command, args []string) { // Handle explicit --snapshot flag (bypasses all auto-discovery, does NOT delete snapshot files) if snapshotArchivePath != "" { - mlog.Log.Infof("Using snapshot file: %s", snapshotArchivePath) + mlog.Log.Infof("Using full snapshot: %s", snapshotArchivePath) // Parse full snapshot slot from filename for validation fullSnapshotSlot := parseSlotFromSnapshotName(filepath.Base(snapshotArchivePath)) @@ -1610,7 +1624,15 @@ postBootstrap: } liveEndSlot := uint64(math.MaxUint64) - accountsDb.InitCaches() + accountsDb.InitCaches( + config.GetInt("tuning.cache.vote_acct_lru"), + config.GetInt("tuning.cache.stake_acct_lru"), + config.GetInt("tuning.cache.small_acct_lru"), + config.GetInt("tuning.cache.medium_acct_lru"), + config.GetInt("tuning.cache.huge_acct_lru"), + config.GetInt("tuning.cache.program_lru"), + config.GetInt("tuning.cache.seen_once_filter_size"), + ) metricsWriter, metricsWriterCleanup, err := createBufWriter(metricsPath) if err != nil { diff --git a/cmd/mithril/node/pprof.go b/cmd/mithril/node/pprof.go index 75030dd4..001ec46e 100644 --- a/cmd/mithril/node/pprof.go +++ b/cmd/mithril/node/pprof.go @@ -3,6 +3,7 @@ package node import ( "fmt" "net/http" + _ "net/http/pprof" // registers /debug/pprof/* handlers "runtime" "strconv" "time" diff --git a/config.example.toml b/config.example.toml index 488cf479..5d73152c 100644 --- a/config.example.toml +++ b/config.example.toml @@ -232,13 +232,13 @@ name = "mithril" port = 8899 # ============================================================================ -# [tuning] - Performance Tuning & Profiling +# [development] - Performance Tuning & Profiling # ============================================================================ # # Advanced settings for 
optimizing Mithril's performance. # The defaults work well for most deployments. -[tuning] +[development] # Zstd decoder concurrency (defaults to NumCPU) # zstd_decoder_concurrency = 16 @@ -254,8 +254,8 @@ name = "mithril" # Enable/disable pool allocator for slices use_pool = true - # [tuning.pprof] - CPU/Memory Profiling - [tuning.pprof] + # [development.pprof] - CPU/Memory Profiling + [development.pprof] # Port to serve HTTP pprof endpoint (-1 to disable) # Access at http://localhost:PORT/debug/pprof/ # port = 6060 @@ -263,6 +263,48 @@ name = "mithril" # Filename to write CPU profile (for offline analysis) # cpu_profile_path = "/tmp/cpuprof.pprof" + # [development.cache] - AccountsDB LRU Cache Sizes + # + # These control the LRU caches for fast account data reads during replay. + # Values are NUMBER OF ENTRIES, not bytes. + # + # NOTE: These are DIFFERENT from the global vote/stake caches used for + # leader schedule building. Those are unbounded maps that store vote STATE + # (voting history, credits) and stake DELEGATIONS. These LRU caches store + # full ACCOUNT data for frequently-accessed accounts. + # + # Larger caches = fewer disk reads, but more memory usage. + # Memory per entry is ~200-1000 bytes depending on account data size. + [development.cache] + # Vote account data cache - number of entries (frequently accessed during replay) + vote_acct_lru = 5000 + + # Stake account data cache - number of entries (separated to avoid evicting + # hot accounts during epoch rewards when ~1.25M stake accounts are touched once each) + stake_acct_lru = 2000 + + # Small account data cache - accounts ≤512 bytes (token accounts, etc.) + small_acct_lru = 50000 + + # Medium account data cache - accounts 512-64KB + medium_acct_lru = 20000 + + # Huge account data cache - accounts >64KB (mostly programs) + huge_acct_lru = 500 + + # Compiled BPF program cache - number of entries + program_lru = 5000 + + # Admit-on-second-hit LRU filter for common accounts (small/medium/huge) + # Only caches accounts seen twice within the filter window, filtering one-shot reads. + # 0 = disabled (cache everything immediately, like traditional LRU) + # >0 = enable filtering with LRU of this capacity + # Recommended: 50000 (roughly 0.5-1x of small+medium+huge total) + # Monitor SeenOnceAdmitted/SeenOnceFiltered ratio to tune: + # <5-10% admitted = too strict (filter too small) + # >40-50% admitted = too lenient (filter too large or most accesses are hot) + seen_once_filter_size = 0 + # ============================================================================ # [debug] - Debug Logging # ============================================================================ diff --git a/docs/TODO.md b/docs/TODO.md new file mode 100644 index 00000000..ee806050 --- /dev/null +++ b/docs/TODO.md @@ -0,0 +1,88 @@ +# TODO / Known Issues + +Identified on branch `perf/reward-distribution-optimizations` at commit `3b2ad67` +dev HEAD at time of identification: `a25b2e3` +Date: 2026-01-13 + +--- + +## Failing Tests + +### 1. Address Lookup Table Tests - `InstrErrUnsupportedProgramId` + +**File:** `pkg/sealevel/address_lookup_table_test.go` +**Test:** `TestExecute_AddrLookupTable_Program_Test_Create_Lookup_Table_Idempotent` (and likely all other ALT tests) + +**Root Cause:** `AddressLookupTableAddr` and `StakeProgramAddr` were accidentally removed from `resolveNativeProgramById` switch in `pkg/sealevel/native_programs_common.go`. 
+ +| Program | Removed In | Commit Date | Commit Message | +|---------|------------|-------------|----------------| +| `AddressLookupTableAddr` | `d47c16b` | May 16, 2025 | "many optimisations and changes" | +| `StakeProgramAddr` | `e890f9e` | Jul 26, 2025 | "snapshot download, stake program migration, refactoring" | + +**Fix:** Add these cases back to the switch in `resolveNativeProgramById`: +```go +case a.StakeProgramAddr: + return StakeProgramExecute, a.StakeProgramAddrStr, nil +case a.AddressLookupTableAddr: + return AddressLookupTableExecute, a.AddressLookupTableProgramAddrStr, nil +``` + +--- + +### 2. Bank Hash Test - Nil Pointer Dereference + +**File:** `pkg/replay/hash_test.go` +**Test:** `Test_Compute_Bank_Hash` + +**Error:** +``` +panic: runtime error: invalid memory address or nil pointer dereference +pkg/replay/hash.go:227 - shouldIncludeEah(0x0, 0x0) +``` + +**Root Cause:** Test passes `nil` for the first argument to `shouldIncludeEah`, which dereferences it without a nil check. + +**Fix:** Either add nil check in `shouldIncludeEah` or fix the test to pass valid arguments. + +--- + +## Agave/Firedancer Parity Issues + +### 3. Missing "Burned Rewards" Semantics in Reward Distribution + +**File:** `pkg/rewards/rewards.go` (lines 180-230) + +**Problem:** Mithril does not implement "burn" semantics for per-account failures during partitioned reward distribution. This diverges from both Agave and Firedancer. + +**Current Mithril behavior:** +- `GetAccount` error → panic (aborts replay) +- `UnmarshalStakeState` error → silent skip (reward lost, not counted) +- `MarshalStakeStakeInto` error → panic (aborts replay) +- Lamport overflow → panic (aborts replay) + +**Agave behavior** (`distribution.rs:260`): +- `build_updated_stake_reward` returns `DistributionError::UnableToSetState` or `AccountNotFound` +- Caller logs error and adds to `lamports_burned` +- Continues processing remaining accounts + +**Firedancer behavior** (`fd_rewards.c:958`): +- `distribute_epoch_reward_to_stake_acc` returns non-zero on decode/non-stake/etc. +- Caller increments `lamports_burned` and continues + +**Failure scenarios that should burn (not panic):** +- Account missing / not found +- Stake state decode fails (including short/invalid data) +- Account isn't a stake account +- Lamport add overflows +- `set_state`/encode fails (e.g., data too small) + +**Fix required:** +1. Add `lamports_burned` tracking to reward distribution +2. Change panics to log + burn + continue +3. `epochRewards.Distribute()` should receive `distributedLamports` (successful) separately from burned amount +4. Ensure `SysvarEpochRewards.DistributedRewards` advances correctly (may need to include burned in total) + +**Note:** The current silent skip on `UnmarshalStakeState` error reduces `distributedLamports` but doesn't track it as burned, which may cause `SysvarEpochRewards` to diverge from Agave/FD. 
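+
+**Sketch of the burn path** (hypothetical names only; `lamportsBurned` would be a new
+`atomic.Uint64` threaded through `DistributeStakingRewardsForPartition`, and the exact
+`epochRewards.Distribute()` plumbing above still has to be decided):
+
+```go
+// Inside the per-account worker, replace panic/silent-skip with log + burn + continue:
+stakeAcct, err := acctsDb.GetAccount(slot, stakePk)
+if err != nil {
+    mlog.Log.Errorf("rewards: stake account %s missing in slot %d, burning %d lamports",
+        stakePk, slot, reward.StakerRewards)
+    lamportsBurned.Add(reward.StakerRewards) // hypothetical shared counter
+    return
+}
+
+stakeState, err := sealevel.UnmarshalStakeState(stakeAcct.Data)
+if err != nil {
+    mlog.Log.Errorf("rewards: %s has undecodable stake state, burning %d lamports",
+        stakePk, reward.StakerRewards)
+    lamportsBurned.Add(reward.StakerRewards)
+    return
+}
+
+// ...same pattern for the MarshalStakeStakeInto and lamport-overflow branches,
+// so a single bad account no longer aborts replay for the whole partition.
+```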
+ +--- diff --git a/pkg/accountsdb/accountsdb.go b/pkg/accountsdb/accountsdb.go index 7cd2e866..711b878c 100644 --- a/pkg/accountsdb/accountsdb.go +++ b/pkg/accountsdb/accountsdb.go @@ -24,9 +24,18 @@ type AccountsDb struct { BankHashStore *pebble.DB AcctsDir string LargestFileId atomic.Uint64 - VoteAcctCache otter.Cache[solana.PublicKey, *accounts.Account] - CommonAcctsCache otter.Cache[solana.PublicKey, *accounts.Account] + VoteAcctCache otter.Cache[solana.PublicKey, *accounts.Account] // Vote accounts (frequently accessed) + StakeAcctCache otter.Cache[solana.PublicKey, *accounts.Account] // Stake accounts (small 2k cache) + SmallAcctCache otter.Cache[solana.PublicKey, *accounts.Account] // Small accounts ≤512 bytes (50k entries) + MediumAcctCache otter.Cache[solana.PublicKey, *accounts.Account] // Medium accounts 512-64KB (20k entries) + HugeAcctCache otter.Cache[solana.PublicKey, *accounts.Account] // Huge accounts >64KB (500 entries, mostly programs) ProgramCache otter.Cache[solana.PublicKey, *ProgramCacheEntry] + InRewardsWindow bool // When true, only update existing stake cache entries (don't add new ones) + + // Admit-on-second-hit LRU filter for common accounts (small/medium/huge) + // Only accounts seen twice within the filter window get cached, filtering one-shot reads. + // nil = disabled (cache everything immediately) + SeenOnceFilter *otter.Cache[solana.PublicKey, struct{}] } // silentLogger implements pebble.Logger but discards all messages. @@ -40,6 +49,132 @@ var ( ErrNoAccount = errors.New("ErrNoAccount") ) +// Cache hit/miss counters for profiling +var ( + // Cache hits per cache type + SmallCacheHits atomic.Uint64 // Small accounts ≤512 bytes + MediumCacheHits atomic.Uint64 // Medium accounts 512-64KB + HugeCacheHits atomic.Uint64 // Huge accounts >64KB + StakeCacheHits atomic.Uint64 + VoteCacheHits atomic.Uint64 + ProgramCacheHits atomic.Uint64 // Compiled BPF programs + + // Cache misses per cache type + SmallCacheMisses atomic.Uint64 // ≤512 bytes + MediumCacheMisses atomic.Uint64 // 512-64KB + HugeCacheMisses atomic.Uint64 // >64KB (total) + StakeCacheMisses atomic.Uint64 + VoteCacheMisses atomic.Uint64 + ProgramCacheMisses atomic.Uint64 // Compiled BPF programs + + // Granular miss breakdown within huge range (>64KB) + HugeMiss64Kto256K atomic.Uint64 // 64KB-256KB + HugeMiss256Kto1M atomic.Uint64 // 256KB-1MB + HugeMissOver1M atomic.Uint64 // >1MB + + // Program cache miss size breakdown + ProgramMissUnder1M atomic.Uint64 // <1MB programs + ProgramMissOver1M atomic.Uint64 // ≥1MB programs + + // Admit-on-second-hit filter stats + SeenOnceFiltered atomic.Uint64 // First hit, added to seen-once tracking + SeenOnceAdmitted atomic.Uint64 // Second hit, admitted to cache +) + +// CacheStats holds cache hit/miss counts for reporting +type CacheStats struct { + SmallHits, MediumHits, HugeHits, StakeHits, VoteHits uint64 + SmallMisses, MediumMisses, HugeMisses uint64 + StakeMisses, VoteMisses uint64 + // Program cache stats + ProgramHits, ProgramMisses uint64 + ProgramMissUnder1M uint64 // <1MB programs + ProgramMissOver1M uint64 // ≥1MB programs + // Granular breakdown within huge range + HugeMiss64Kto256K uint64 // 64KB-256KB + HugeMiss256Kto1M uint64 // 256KB-1MB + HugeMissOver1M uint64 // >1MB + // Admit-on-second-hit stats + SeenOnceFiltered uint64 // First hits (tracked but not cached) + SeenOnceAdmitted uint64 // Second hits (admitted to cache) +} + +// GetAndResetCacheStats returns current cache hit/miss counts and resets them +func GetAndResetCacheStats() 
CacheStats { + return CacheStats{ + SmallHits: SmallCacheHits.Swap(0), + MediumHits: MediumCacheHits.Swap(0), + HugeHits: HugeCacheHits.Swap(0), + StakeHits: StakeCacheHits.Swap(0), + VoteHits: VoteCacheHits.Swap(0), + ProgramHits: ProgramCacheHits.Swap(0), + SmallMisses: SmallCacheMisses.Swap(0), + MediumMisses: MediumCacheMisses.Swap(0), + HugeMisses: HugeCacheMisses.Swap(0), + StakeMisses: StakeCacheMisses.Swap(0), + VoteMisses: VoteCacheMisses.Swap(0), + ProgramMisses: ProgramCacheMisses.Swap(0), + ProgramMissUnder1M: ProgramMissUnder1M.Swap(0), + ProgramMissOver1M: ProgramMissOver1M.Swap(0), + HugeMiss64Kto256K: HugeMiss64Kto256K.Swap(0), + HugeMiss256Kto1M: HugeMiss256Kto1M.Swap(0), + HugeMissOver1M: HugeMissOver1M.Swap(0), + SeenOnceFiltered: SeenOnceFiltered.Swap(0), + SeenOnceAdmitted: SeenOnceAdmitted.Swap(0), + } +} + +// CacheFillStats holds current cache fill levels +type CacheFillStats struct { + SmallSize, SmallCap int + MediumSize, MediumCap int + HugeSize, HugeCap int + StakeSize, StakeCap int + VoteSize, VoteCap int + ProgramSize, ProgramCap int +} + +// GetCacheFillStats returns current cache fill levels (size/capacity) +func (accountsDb *AccountsDb) GetCacheFillStats() CacheFillStats { + return CacheFillStats{ + SmallSize: accountsDb.SmallAcctCache.Size(), + SmallCap: accountsDb.SmallAcctCache.Capacity(), + MediumSize: accountsDb.MediumAcctCache.Size(), + MediumCap: accountsDb.MediumAcctCache.Capacity(), + HugeSize: accountsDb.HugeAcctCache.Size(), + HugeCap: accountsDb.HugeAcctCache.Capacity(), + StakeSize: accountsDb.StakeAcctCache.Size(), + StakeCap: accountsDb.StakeAcctCache.Capacity(), + VoteSize: accountsDb.VoteAcctCache.Size(), + VoteCap: accountsDb.VoteAcctCache.Capacity(), + ProgramSize: accountsDb.ProgramCache.Size(), + ProgramCap: accountsDb.ProgramCache.Capacity(), + } +} + +// recordCacheMiss increments the appropriate cache miss counter based on owner and size +func recordCacheMiss(owner solana.PublicKey, dataLen uint64) { + if owner == addresses.VoteProgramAddr { + VoteCacheMisses.Add(1) + } else if owner == addresses.StakeProgramAddr { + StakeCacheMisses.Add(1) + } else if dataLen <= 512 { + SmallCacheMisses.Add(1) + } else if dataLen <= 65536 { + MediumCacheMisses.Add(1) + } else { + // Huge: >64KB - track total and granular breakdown + HugeCacheMisses.Add(1) + if dataLen <= 262144 { // 64KB-256KB + HugeMiss64Kto256K.Add(1) + } else if dataLen <= 1048576 { // 256KB-1MB + HugeMiss256Kto1M.Add(1) + } else { // >1MB + HugeMissOver1M.Add(1) + } + } +} + func OpenDb(accountsDbDir string) (*AccountsDb, error) { // check for existence of the 'accounts' directory, which holds the appendvecs appendVecsDir := fmt.Sprintf("%s/accounts", accountsDbDir) @@ -98,9 +233,34 @@ func (accountsDb *AccountsDb) CloseDb() { mlog.Log.Infof("CloseDb: done\n") // extra newline for spacing after close } -func (accountsDb *AccountsDb) InitCaches() { +// InitCaches initializes the LRU caches with the given sizes. +// Pass 0 for any size to use a reasonable builtin value. 
+// seenOnceFilterSize controls the admit-on-second-hit LRU filter for common accounts: +// - 0 = disabled (cache everything immediately, no filtering) +// - >0 = enable filtering with LRU of this capacity +func (accountsDb *AccountsDb) InitCaches(voteSize, stakeSize, smallSize, mediumSize, hugeSize, programSize int, seenOnceFilterSize int) { + // Apply builtin values when config not set + if voteSize <= 0 { + voteSize = 5000 + } + if stakeSize <= 0 { + stakeSize = 2000 + } + if smallSize <= 0 { + smallSize = 50000 // 50k small accounts ≤512 bytes + } + if mediumSize <= 0 { + mediumSize = 20000 // 20k medium accounts 512-64KB + } + if hugeSize <= 0 { + hugeSize = 500 // 500 huge accounts >64KB (mostly programs) + } + if programSize <= 0 { + programSize = 5000 + } + var err error - accountsDb.VoteAcctCache, err = otter.MustBuilder[solana.PublicKey, *accounts.Account](5000). + accountsDb.VoteAcctCache, err = otter.MustBuilder[solana.PublicKey, *accounts.Account](voteSize). Cost(func(key solana.PublicKey, acct *accounts.Account) uint32 { return 1 }). @@ -109,7 +269,7 @@ func (accountsDb *AccountsDb) InitCaches() { panic(err) } - accountsDb.ProgramCache, err = otter.MustBuilder[solana.PublicKey, *ProgramCacheEntry](5000). + accountsDb.ProgramCache, err = otter.MustBuilder[solana.PublicKey, *ProgramCacheEntry](programSize). Cost(func(key solana.PublicKey, progEntry *ProgramCacheEntry) uint32 { return 1 }). @@ -118,7 +278,7 @@ func (accountsDb *AccountsDb) InitCaches() { panic(err) } - accountsDb.CommonAcctsCache, err = otter.MustBuilder[solana.PublicKey, *accounts.Account](10000). + accountsDb.SmallAcctCache, err = otter.MustBuilder[solana.PublicKey, *accounts.Account](smallSize). Cost(func(key solana.PublicKey, acct *accounts.Account) uint32 { return 1 }). @@ -126,6 +286,54 @@ func (accountsDb *AccountsDb) InitCaches() { if err != nil { panic(err) } + + accountsDb.MediumAcctCache, err = otter.MustBuilder[solana.PublicKey, *accounts.Account](mediumSize). + Cost(func(key solana.PublicKey, acct *accounts.Account) uint32 { + return 1 + }). + Build() + if err != nil { + panic(err) + } + + accountsDb.HugeAcctCache, err = otter.MustBuilder[solana.PublicKey, *accounts.Account](hugeSize). + Cost(func(key solana.PublicKey, acct *accounts.Account) uint32 { + return 1 + }). + Build() + if err != nil { + panic(err) + } + + accountsDb.StakeAcctCache, err = otter.MustBuilder[solana.PublicKey, *accounts.Account](stakeSize). + Cost(func(key solana.PublicKey, acct *accounts.Account) uint32 { + return 1 + }). + Build() + if err != nil { + panic(err) + } + + // Initialize admit-on-second-hit LRU filter for common accounts + if seenOnceFilterSize > 0 { + // Otter v1 requires capacity >= 10 + if seenOnceFilterSize < 10 { + seenOnceFilterSize = 10 + } + filter, err := otter.MustBuilder[solana.PublicKey, struct{}](seenOnceFilterSize). + Cost(func(key solana.PublicKey, val struct{}) uint32 { return 1 }). 
+ Build() + if err != nil { + panic(err) + } + accountsDb.SeenOnceFilter = &filter + mlog.Log.Infof("AccountsDB caches initialized: vote=%d stake=%d small=%d medium=%d huge=%d program=%d seenOnceFilter=%d", + voteSize, stakeSize, smallSize, mediumSize, hugeSize, programSize, seenOnceFilterSize) + } else { + // SeenOnceFilter stays nil (disabled) + mlog.Log.Infof("AccountsDB caches initialized: vote=%d stake=%d small=%d medium=%d huge=%d program=%d (seen-once filter disabled)", + voteSize, stakeSize, smallSize, mediumSize, hugeSize, programSize) + } } type ProgramCacheEntry struct { @@ -134,7 +342,23 @@ type ProgramCacheEntry struct { } func (accountsDb *AccountsDb) MaybeGetProgramFromCache(pubkey solana.PublicKey) (*ProgramCacheEntry, bool) { - return accountsDb.ProgramCache.Get(pubkey) + entry, found := accountsDb.ProgramCache.Get(pubkey) + if found { + ProgramCacheHits.Add(1) + } else { + ProgramCacheMisses.Add(1) + } + return entry, found +} + +// RecordProgramCacheMissSize records the size breakdown for a program cache miss. +// Call this after loading the program to track <1MB vs ≥1MB breakdown. +func RecordProgramCacheMissSize(programBytes uint64) { + if programBytes >= 1024*1024 { + ProgramMissOver1M.Add(1) + } else { + ProgramMissUnder1M.Add(1) + } } func (accountsDb *AccountsDb) AddProgramToCache(pubkey solana.PublicKey, programEntry *ProgramCacheEntry) { @@ -145,14 +369,124 @@ func (accountsDb *AccountsDb) RemoveProgramFromCache(pubkey solana.PublicKey) { accountsDb.ProgramCache.Delete(pubkey) } +// cacheAccount evicts stale entries from other caches, then inserts into the correct +// cache based on owner and size. This prevents stale data when an account changes owner +// (e.g., stake account closed becomes system-owned). +// +// During rewards window (InRewardsWindow=true), stake accounts are only updated if +// already cached - new entries are not added. This prevents cache thrash from the +// ~1.25M one-shot stake account accesses while preserving hot entries. +// +// For common accounts (small/medium/huge), if SeenOnceFilter != nil, uses +// admit-on-second-hit: only caches accounts seen twice within the filter window. +// This filters one-shot reads that would pollute the cache. 
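+//
+// Example: with seen_once_filter_size=50000, the first access to a cold common
+// account only records its key in SeenOnceFilter; a second access while the key
+// is still resident in the filter admits the account into its size tier
+// (small/medium/huge) and removes the filter entry.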
+func (accountsDb *AccountsDb) cacheAccount(acct *accounts.Account) { + owner := solana.PublicKeyFromBytes(acct.Owner[:]) + + // OSCILLATION FIX: Check if already in a common cache BEFORE deleting + // This preserves "was hot" information so writes don't oscillate + wasCommonCached := false + if owner != addresses.VoteProgramAddr && owner != addresses.StakeProgramAddr { + // Use GetEntryQuietly to avoid inflating hit stats + _, wasCommonCached = accountsDb.SmallAcctCache.Extension().GetEntryQuietly(acct.Key) + if !wasCommonCached { + _, wasCommonCached = accountsDb.MediumAcctCache.Extension().GetEntryQuietly(acct.Key) + } + if !wasCommonCached { + _, wasCommonCached = accountsDb.HugeAcctCache.Extension().GetEntryQuietly(acct.Key) + } + } + + // Delete from all caches (prevents stale data if size tier changes) + accountsDb.VoteAcctCache.Delete(acct.Key) + accountsDb.StakeAcctCache.Delete(acct.Key) + accountsDb.SmallAcctCache.Delete(acct.Key) + accountsDb.MediumAcctCache.Delete(acct.Key) + accountsDb.HugeAcctCache.Delete(acct.Key) + + if owner == addresses.VoteProgramAddr { + accountsDb.VoteAcctCache.Set(acct.Key, acct) + } else if owner == addresses.StakeProgramAddr { + // During rewards: only update existing entries, don't add new ones + if accountsDb.InRewardsWindow { + if _, exists := accountsDb.StakeAcctCache.Get(acct.Key); exists { + accountsDb.StakeAcctCache.Set(acct.Key, acct) + } + } else { + accountsDb.StakeAcctCache.Set(acct.Key, acct) + } + } else { + // Common accounts - pass wasCommonCached to bypass filter if already hot + accountsDb.cacheCommonAccount(acct, wasCommonCached) + } +} + +// cacheCommonAccount handles caching for non-vote, non-stake accounts. +// If force=true or filter disabled, caches immediately. +// Otherwise applies admit-on-second-hit filter. 
+func (accountsDb *AccountsDb) cacheCommonAccount(acct *accounts.Account, force bool) { + // Direct cache if filter disabled or account was already cached (force=true) + if accountsDb.SeenOnceFilter == nil || force { + accountsDb.cacheCommonDirect(acct) + // Cleanup from filter if present (account is now hot) + if accountsDb.SeenOnceFilter != nil { + accountsDb.SeenOnceFilter.Delete(acct.Key) + } + return + } + + // Admit-on-second-hit: only cache if seen before + if _, seenBefore := accountsDb.SeenOnceFilter.Get(acct.Key); seenBefore { + // Second hit - admit to cache + SeenOnceAdmitted.Add(1) + accountsDb.SeenOnceFilter.Delete(acct.Key) // Delete AFTER checking + accountsDb.cacheCommonDirect(acct) + } else { + // First hit - track but don't cache + SeenOnceFiltered.Add(1) + accountsDb.SeenOnceFilter.Set(acct.Key, struct{}{}) + } +} + +// cacheCommonDirect inserts into the appropriate size-tiered cache +func (accountsDb *AccountsDb) cacheCommonDirect(acct *accounts.Account) { + if len(acct.Data) <= 512 { + accountsDb.SmallAcctCache.Set(acct.Key, acct) + } else if len(acct.Data) <= 65536 { + accountsDb.MediumAcctCache.Set(acct.Key, acct) + } else { + accountsDb.HugeAcctCache.Set(acct.Key, acct) + } +} + func (accountsDb *AccountsDb) GetAccount(slot uint64, pubkey solana.PublicKey) (*accounts.Account, error) { cachedAcct, hasAcct := accountsDb.VoteAcctCache.Get(pubkey) if hasAcct { + VoteCacheHits.Add(1) + return cachedAcct, nil + } + + cachedAcct, hasAcct = accountsDb.StakeAcctCache.Get(pubkey) + if hasAcct { + StakeCacheHits.Add(1) return cachedAcct, nil } - cachedAcct, hasAcct = accountsDb.CommonAcctsCache.Get(pubkey) + cachedAcct, hasAcct = accountsDb.SmallAcctCache.Get(pubkey) if hasAcct { + SmallCacheHits.Add(1) + return cachedAcct, nil + } + + cachedAcct, hasAcct = accountsDb.MediumAcctCache.Get(pubkey) + if hasAcct { + MediumCacheHits.Add(1) + return cachedAcct, nil + } + + cachedAcct, hasAcct = accountsDb.HugeAcctCache.Get(pubkey) + if hasAcct { + HugeCacheHits.Add(1) return cachedAcct, nil } @@ -196,11 +530,10 @@ func (accountsDb *AccountsDb) GetAccount(slot uint64, pubkey solana.PublicKey) ( acct.Slot = acctIdxEntry.Slot - if solana.PublicKeyFromBytes(acct.Owner[:]) == addresses.VoteProgramAddr { - accountsDb.VoteAcctCache.Set(pubkey, acct) - } else { - accountsDb.CommonAcctsCache.Set(pubkey, acct) - } + // Record cache miss by owner type and size bucket (for profiling) + recordCacheMiss(solana.PublicKeyFromBytes(acct.Owner[:]), uint64(len(acct.Data))) + + accountsDb.cacheAccount(acct) return acct, err } @@ -219,12 +552,7 @@ func (accountsDb *AccountsDb) StoreAccounts(accts []*accounts.Account, slot uint if acct == nil { continue } - // if vote account, do not serialize up and write into accountsdb - just save it in cache. - if solana.PublicKeyFromBytes(acct.Owner[:]) == addresses.VoteProgramAddr { - accountsDb.VoteAcctCache.Set(acct.Key, acct) - } else { - accountsDb.CommonAcctsCache.Set(acct.Key, acct) - } + accountsDb.cacheAccount(acct) } return nil diff --git a/pkg/config/config.go b/pkg/config/config.go index ef953e00..1e00579e 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -43,6 +43,20 @@ type DebugConfig struct { AccountWrites []string `toml:"account_writes" mapstructure:"account_writes"` // was: debugacctwrites } +// CacheConfig holds LRU cache sizing for AccountsDB +// These are DIFFERENT from the global vote/stake caches used for leader schedule - +// those are unbounded maps holding vote state and delegation info. 
+// These LRU caches store full account data for fast reads during replay. +type CacheConfig struct { + VoteAcctLRU int `toml:"vote_acct_lru" mapstructure:"vote_acct_lru"` // Vote account data (default: 5000) + StakeAcctLRU int `toml:"stake_acct_lru" mapstructure:"stake_acct_lru"` // Stake account data (default: 2000) + SmallAcctLRU int `toml:"small_acct_lru" mapstructure:"small_acct_lru"` // Small accounts ≤512 bytes (default: 50000) + MediumAcctLRU int `toml:"medium_acct_lru" mapstructure:"medium_acct_lru"` // Medium accounts 512-64KB (default: 20000) + HugeAcctLRU int `toml:"huge_acct_lru" mapstructure:"huge_acct_lru"` // Huge accounts >64KB (default: 500) + ProgramLRU int `toml:"program_lru" mapstructure:"program_lru"` // Compiled BPF programs (default: 5000) + SeenOnceFilterSize int `toml:"seen_once_filter_size" mapstructure:"seen_once_filter_size"` // Admit-on-second-hit LRU capacity (0=disabled, default: 0) +} + // DevelopmentConfig holds development/tuning configuration (matches Firedancer [development] section) type DevelopmentConfig struct { ZstdDecoderConcurrency int `toml:"zstd_decoder_concurrency" mapstructure:"zstd_decoder_concurrency"` // was: zstd-decoder-concurrency @@ -52,6 +66,7 @@ type DevelopmentConfig struct { UsePool bool `toml:"use_pool" mapstructure:"use_pool"` // was: use-pool Pprof PprofConfig `toml:"pprof" mapstructure:"pprof"` Debug DebugConfig `toml:"debug" mapstructure:"debug"` + Cache CacheConfig `toml:"cache" mapstructure:"cache"` } // ReportingConfig holds metrics/reporting configuration (matches Firedancer [reporting] section) diff --git a/pkg/replay/accounts.go b/pkg/replay/accounts.go index 7cbd518f..344f40b2 100644 --- a/pkg/replay/accounts.go +++ b/pkg/replay/accounts.go @@ -219,11 +219,16 @@ func loadAndValidateTxAcctsSimd186(slotCtx *sealevel.SlotCtx, acctMetasPerInstr return nil, err } - for _, pubkey := range acctKeys { + // Memoize accounts loaded in Pass 1 to avoid re-cloning in Pass 2 + // Use slice indexed by account position (same ordering as txAcctMetas) + acctCache := make([]*accounts.Account, len(acctKeys)) + + for i, pubkey := range acctKeys { acct, err := slotCtx.GetAccount(pubkey) if err != nil { panic("should be impossible - programming error") } + acctCache[i] = acct // Cache by index for reuse in Pass 2 err = accumulator.collectAcct(acct) if err != nil { return nil, err @@ -235,11 +240,15 @@ func loadAndValidateTxAcctsSimd186(slotCtx *sealevel.SlotCtx, acctMetasPerInstr return nil, err } - var programIdIdxs []uint64 + // Use boolean mask for O(1) program index lookup + isProgramIdx := make([]bool, len(acctKeys)) instructionAcctPubkeys := make(map[solana.PublicKey]struct{}) for instrIdx, instr := range tx.Message.Instructions { - programIdIdxs = append(programIdIdxs, uint64(instr.ProgramIDIndex)) + i := int(instr.ProgramIDIndex) + if i >= 0 && i < len(isProgramIdx) { + isProgramIdx[i] = true + } ias := acctMetasPerInstr[instrIdx] for _, ia := range ias { instructionAcctPubkeys[ia.Pubkey] = struct{}{} @@ -251,21 +260,17 @@ func loadAndValidateTxAcctsSimd186(slotCtx *sealevel.SlotCtx, acctMetasPerInstr for idx, acctMeta := range txAcctMetas { var acct *accounts.Account + cached := acctCache[idx] // Reuse account from Pass 1 _, instrContainsAcctMeta := instructionAcctPubkeys[acctMeta.PublicKey] if acctMeta.PublicKey == sealevel.SysvarInstructionsAddr { acct = instrsAcct - } else if !slotCtx.Features.IsActive(features.DisableAccountLoaderSpecialCase) && slices.Contains(programIdIdxs, uint64(idx)) && !acctMeta.IsWritable && 
!instrContainsAcctMeta { - tmp, err := slotCtx.GetAccount(acctMeta.PublicKey) - if err != nil { - return nil, err - } - acct = &accounts.Account{Key: acctMeta.PublicKey, Owner: tmp.Owner, Executable: true, IsDummy: true} + } else if !slotCtx.Features.IsActive(features.DisableAccountLoaderSpecialCase) && isProgramIdx[idx] && !acctMeta.IsWritable && !instrContainsAcctMeta { + // Dummy account case - only need owner from cached account + acct = &accounts.Account{Key: acctMeta.PublicKey, Owner: cached.Owner, Executable: true, IsDummy: true} } else { - acct, err = slotCtx.GetAccount(acctMeta.PublicKey) - if err != nil { - return nil, err - } + // Normal case - use cached account directly + acct = cached } acctsForTx = append(acctsForTx, *acct) @@ -278,16 +283,24 @@ func loadAndValidateTxAcctsSimd186(slotCtx *sealevel.SlotCtx, acctMetasPerInstr removeAcctsExecutableFlagChecks := slotCtx.Features.IsActive(features.RemoveAccountsExecutableFlagChecks) - for _, instr := range instrs { + for instrIdx, instr := range instrs { if instr.ProgramId == addresses.NativeLoaderAddr { continue } - programAcct, err := slotCtx.GetAccount(instr.ProgramId) - if err != nil { - programAcct, err = slotCtx.GetAccountFromAccountsDb(instr.ProgramId) + // Use cached account via ProgramIDIndex from tx.Message + programIdx := int(tx.Message.Instructions[instrIdx].ProgramIDIndex) + programAcct := acctCache[programIdx] + + // Fallback if not in cache (shouldn't happen for valid txs) + if programAcct == nil { + var err error + programAcct, err = slotCtx.GetAccount(instr.ProgramId) if err != nil { - return nil, TxErrProgramAccountNotFound + programAcct, err = slotCtx.GetAccountFromAccountsDb(instr.ProgramId) + if err != nil { + return nil, TxErrProgramAccountNotFound + } } } diff --git a/pkg/replay/block.go b/pkg/replay/block.go index fce171ed..ec14240c 100644 --- a/pkg/replay/block.go +++ b/pkg/replay/block.go @@ -1175,6 +1175,12 @@ func ReplayBlocks( var nonVoteTxCounts []uint64 // non-vote txns per block var justCrossedEpochBoundary bool + // Memory stats tracking for delta calculations + var lastMemStats runtime.MemStats + var lastSummaryTime time.Time + runtime.ReadMemStats(&lastMemStats) + lastSummaryTime = time.Now() + // Preallocate slices for 100 blocks const summaryInterval = 100 execTimes = make([]float64, 0, summaryInterval) @@ -1690,7 +1696,72 @@ func ReplayBlocks( mlog.Log.InfofPrecise(" execution: median %.3fs, min %.3fs, max %.3fs | wait: median %.3fs, min %.3fs, max %.3fs | replay total: median %.3fs", medExec, minExec, maxExec, medWait, minWait, maxWait, medTotal) - // Line 4: RPC/fetch debugging info + // Line 4: Cache hit/miss stats + cs := accountsdb.GetAndResetCacheStats() + totalHits := cs.SmallHits + cs.MediumHits + cs.HugeHits + cs.StakeHits + cs.VoteHits + totalMiss := cs.SmallMisses + cs.MediumMisses + cs.HugeMisses + cs.StakeMisses + cs.VoteMisses + if totalHits+totalMiss > 0 { + hitRate := float64(totalHits) / float64(totalHits+totalMiss) * 100 + // Show hits and misses per cache, with granular breakdown for huge (>64KB) + mlog.Log.InfofPrecise(" cache: %.1f%% hit | hits: small %d, medium %d, huge %d, stake %d, vote %d | miss: small %d, medium %d, huge %d [64K-256K:%d 256K-1M:%d >1M:%d], stake %d, vote %d", + hitRate, cs.SmallHits, cs.MediumHits, cs.HugeHits, cs.StakeHits, cs.VoteHits, + cs.SmallMisses, cs.MediumMisses, cs.HugeMisses, cs.HugeMiss64Kto256K, cs.HugeMiss256Kto1M, cs.HugeMissOver1M, + cs.StakeMisses, cs.VoteMisses) + + // Program cache stats (compiled BPF programs) + if 
cs.ProgramHits+cs.ProgramMisses > 0 { + progHitRate := float64(cs.ProgramHits) / float64(cs.ProgramHits+cs.ProgramMisses) * 100 + mlog.Log.InfofPrecise(" program cache: %.1f%% hit (%d/%d) | miss by size: <1M:%d ≥1M:%d", + progHitRate, cs.ProgramHits, cs.ProgramHits+cs.ProgramMisses, + cs.ProgramMissUnder1M, cs.ProgramMissOver1M) + } + + // Cache fill stats + cf := acctsDb.GetCacheFillStats() + mlog.Log.InfofPrecise(" cache fill: small %d/%d (%.0f%%), medium %d/%d (%.0f%%), huge %d/%d (%.0f%%), stake %d/%d, vote %d/%d, program %d/%d (%.0f%%)", + cf.SmallSize, cf.SmallCap, float64(cf.SmallSize)/float64(cf.SmallCap)*100, + cf.MediumSize, cf.MediumCap, float64(cf.MediumSize)/float64(cf.MediumCap)*100, + cf.HugeSize, cf.HugeCap, float64(cf.HugeSize)/float64(cf.HugeCap)*100, + cf.StakeSize, cf.StakeCap, cf.VoteSize, cf.VoteCap, + cf.ProgramSize, cf.ProgramCap, float64(cf.ProgramSize)/float64(cf.ProgramCap)*100) + + // Seen-once filter stats (only show if filter is active) + if cs.SeenOnceFiltered > 0 || cs.SeenOnceAdmitted > 0 { + total := cs.SeenOnceFiltered + cs.SeenOnceAdmitted + admitRate := float64(cs.SeenOnceAdmitted) / float64(total) * 100 + mlog.Log.InfofPrecise(" seen-once filter: %.1f%% admitted (%d/%d) | filtered %d", + admitRate, cs.SeenOnceAdmitted, total, cs.SeenOnceFiltered) + } + } + + // Memory stats for GC pressure monitoring (with deltas) + var m runtime.MemStats + runtime.ReadMemStats(&m) + elapsed := time.Since(lastSummaryTime).Seconds() + if elapsed < 0.001 { + elapsed = 0.001 // Avoid division by zero + } + + // Calculate deltas + deltaGC := m.NumGC - lastMemStats.NumGC + deltaPauseMs := float64(m.PauseTotalNs-lastMemStats.PauseTotalNs) / 1e6 + deltaAllocMB := float64(m.TotalAlloc-lastMemStats.TotalAlloc) / 1024 / 1024 + allocPerSec := deltaAllocMB / elapsed + deltaHeapMB := float64(m.HeapAlloc) - float64(lastMemStats.HeapAlloc) + deltaHeapMB = deltaHeapMB / 1024 / 1024 + + mlog.Log.InfofPrecise(" mem: heap %.1fMB (Δ%+.1fMB) | alloc %.1fMB/s | gc %d (Δ%d, Δ%.1fms) | sys %.1fMB | goroutines %d", + float64(m.HeapAlloc)/1024/1024, deltaHeapMB, + allocPerSec, + m.NumGC, deltaGC, deltaPauseMs, + float64(m.Sys)/1024/1024, + runtime.NumGoroutine()) + + // Update tracking for next interval + lastMemStats = m + lastSummaryTime = time.Now() + + // Line 5: RPC/fetch debugging info if fetchStats.Attempts > 0 { retryRate := float64(fetchStats.Retries) / float64(fetchStats.Attempts) * 100 prefetch := fetchStats.BufferDepth + fetchStats.ReorderBufLen diff --git a/pkg/replay/epoch.go b/pkg/replay/epoch.go index f84ebaad..fa596dca 100644 --- a/pkg/replay/epoch.go +++ b/pkg/replay/epoch.go @@ -150,7 +150,12 @@ func handleEpochTransition(acctsDb *accountsdb.AccountsDb, rpcc *rpcclient.RpcCl updateEpochStakesAndRefreshVoteCache(leaderScheduleEpoch, block, epochSchedule, f, acctsDb, prevSlotCtx.Slot) if global.ManageLeaderSchedule() { - _, err = PrepareLeaderScheduleLocalFromVoteCache(newEpoch, epochSchedule, "") + if len(global.EpochStakesVoteAccts(newEpoch)) > 0 { + _, err = PrepareLeaderScheduleLocal(newEpoch, epochSchedule, "") + } else { + _, err = PrepareLeaderScheduleLocalFromVoteCache(newEpoch, epochSchedule, "") + } + if err != nil { panic(err) } diff --git a/pkg/replay/rewards.go b/pkg/replay/rewards.go index fd24d045..f76a73cf 100644 --- a/pkg/replay/rewards.go +++ b/pkg/replay/rewards.go @@ -218,7 +218,19 @@ func distributePartitionedEpochRewardsForSlot(acctsDb *accountsdb.AccountsDb, ep epochRewards.MustUnmarshalWithDecoder(decoder) partitionIdx := currentBlockHeight - 
epochRewards.DistributionStartingBlockHeight - distributedAccts, parentDistributedAccts, distributedLamports := rewards.DistributeStakingRewardsForPartition(acctsDb, partitionedEpochRewardsInfo.RewardPartitions.Partition(partitionIdx), partitionedEpochRewardsInfo.StakingRewards, currentSlot) + + // Initialize shared worker pool on first partition (reused across all 243 partitions) + if partitionedEpochRewardsInfo.WorkerPool == nil { + if err := partitionedEpochRewardsInfo.InitWorkerPool(); err != nil { + panic(fmt.Sprintf("unable to initialize reward distribution worker pool: %s", err)) + } + } + + // During reward distribution, don't add new stake accounts to cache (prevents thrash). + // Only existing hot entries are updated. + acctsDb.InRewardsWindow = true + distributedAccts, parentDistributedAccts, distributedLamports := rewards.DistributeStakingRewardsForPartition(acctsDb, partitionedEpochRewardsInfo.RewardPartitions.Partition(partitionIdx), partitionedEpochRewardsInfo.StakingRewards, currentSlot, partitionedEpochRewardsInfo.WorkerPool) + acctsDb.InRewardsWindow = false parentDistributedAccts = append(parentDistributedAccts, epochRewardsAcct.Clone()) epochRewards.Distribute(distributedLamports) @@ -226,6 +238,7 @@ func distributePartitionedEpochRewardsForSlot(acctsDb *accountsdb.AccountsDb, ep if partitionedEpochRewardsInfo.NumRewardPartitionsRemaining == 0 { epochRewards.Active = false + partitionedEpochRewardsInfo.ReleaseWorkerPool() } writer := new(bytes.Buffer) diff --git a/pkg/replay/transaction.go b/pkg/replay/transaction.go index be9035ce..a9f58690 100644 --- a/pkg/replay/transaction.go +++ b/pkg/replay/transaction.go @@ -184,7 +184,6 @@ func recordStakeDelegation(acct *accounts.Account) { if isEmpty || isUninitialized { global.DeleteStakeCacheItem(acct.Key) } else { - //mlog.Log.Debugf("added stake delegation record for %s: %v", acct.Key, acct) stakeState, err := sealevel.UnmarshalStakeState(acct.Data) if err == nil { delegation := stakeState.Stake.Stake.Delegation diff --git a/pkg/rewards/rewards.go b/pkg/rewards/rewards.go index d83fcffa..8ae03406 100644 --- a/pkg/rewards/rewards.go +++ b/pkg/rewards/rewards.go @@ -11,6 +11,7 @@ import ( "github.com/Overclock-Validator/mithril/pkg/accounts" "github.com/Overclock-Validator/mithril/pkg/accountsdb" "github.com/Overclock-Validator/mithril/pkg/features" + "github.com/Overclock-Validator/mithril/pkg/global" "github.com/Overclock-Validator/mithril/pkg/rpcclient" "github.com/Overclock-Validator/mithril/pkg/safemath" "github.com/Overclock-Validator/mithril/pkg/sealevel" @@ -35,6 +36,87 @@ type PartitionedRewardDistributionInfo struct { Credits map[solana.PublicKey]CalculatedStakePoints RewardPartitions Partitions StakingRewards map[solana.PublicKey]*CalculatedStakeRewards + WorkerPool *ants.PoolWithFunc +} + +// rewardDistributionTask carries all context needed for processing one stake account. +// Used with the shared worker pool to avoid per-partition pool creation overhead. +type rewardDistributionTask struct { + acctsDb *accountsdb.AccountsDb + slot uint64 + stakingRewards map[solana.PublicKey]*CalculatedStakeRewards + accts []*accounts.Account + parentAccts []*accounts.Account + distributedLamports *atomic.Uint64 + wg *sync.WaitGroup + idx int + pubkey solana.PublicKey +} + +// rewardDistributionWorker is the shared worker function for stake reward distribution. 
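+// For each task it loads the stake account, bumps CreditsObserved and the delegated
+// stake lamports, re-serializes the state in place via MarshalStakeStakeInto, credits
+// the reward to the account balance, and refreshes the global stake cache entry.
+// Per-account failures currently panic or silently skip; see docs/TODO.md (item 3)
+// for the planned burn-and-continue semantics matching Agave/Firedancer.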
+func rewardDistributionWorker(i interface{}) { + task := i.(*rewardDistributionTask) + defer task.wg.Done() + + reward, ok := task.stakingRewards[task.pubkey] + if !ok { + return + } + + stakeAcct, err := task.acctsDb.GetAccount(task.slot, task.pubkey) + if err != nil { + panic(fmt.Sprintf("unable to get acct %s from acctsdb for partitioned epoch rewards distribution in slot %d", task.pubkey, task.slot)) + } + task.parentAccts[task.idx] = stakeAcct.Clone() + + stakeState, err := sealevel.UnmarshalStakeState(stakeAcct.Data) + if err != nil { + return + } + + stakeState.Stake.Stake.CreditsObserved = reward.NewCreditsObserved + stakeState.Stake.Stake.Delegation.StakeLamports = safemath.SaturatingAddU64(stakeState.Stake.Stake.Delegation.StakeLamports, uint64(reward.StakerRewards)) + + err = sealevel.MarshalStakeStakeInto(stakeState, stakeAcct.Data) + if err != nil { + panic(fmt.Sprintf("unable to serialize new stake account state in distributing partitioned rewards: %s", err)) + } + + stakeAcct.Lamports, err = safemath.CheckedAddU64(stakeAcct.Lamports, uint64(reward.StakerRewards)) + if err != nil { + panic(fmt.Sprintf("overflow in partitioned epoch rewards distribution in slot %d to acct %s: %s", task.slot, task.pubkey, err)) + } + + task.accts[task.idx] = stakeAcct + task.distributedLamports.Add(reward.StakerRewards) + + // update the stake cache + delegationToCache := stakeState.Stake.Stake.Delegation + delegationToCache.CreditsObserved = stakeState.Stake.Stake.CreditsObserved + global.PutStakeCacheItem(task.pubkey, &delegationToCache) +} + +// InitWorkerPool creates the shared worker pool for reward distribution. +// Call once at the start of partitioned rewards, before processing any partition. +func (info *PartitionedRewardDistributionInfo) InitWorkerPool() error { + if info.WorkerPool != nil { + return nil + } + size := runtime.GOMAXPROCS(0) * 8 + pool, err := ants.NewPoolWithFunc(size, rewardDistributionWorker) + if err != nil { + return err + } + info.WorkerPool = pool + return nil +} + +// ReleaseWorkerPool releases the shared pool. Call when NumRewardPartitionsRemaining == 0. 
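+// The pool is created lazily by distributePartitionedEpochRewardsForSlot on the first
+// partition of the epoch and reused for every subsequent partition until the final
+// partition releases it here.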
+func (info *PartitionedRewardDistributionInfo) ReleaseWorkerPool() { + if info.WorkerPool != nil { + info.WorkerPool.Release() + info.WorkerPool = nil + } } type CalculatedStakePoints struct { @@ -162,7 +244,6 @@ func DistributeVotingRewards(acctsDb *accountsdb.AccountsDb, validatorRewards ma wg.Wait() workerPool.Release() - ants.Release() err := acctsDb.StoreAccounts(updatedAccts, slot) if err != nil { @@ -172,72 +253,30 @@ func DistributeVotingRewards(acctsDb *accountsdb.AccountsDb, validatorRewards ma return updatedAccts, parentUpdatedAccts, totalVotingRewards.Load() } -type idxAndPubkey struct { - idx int - pubkey solana.PublicKey -} - -func DistributeStakingRewardsForPartition(acctsDb *accountsdb.AccountsDb, partition *Partition, stakingRewards map[solana.PublicKey]*CalculatedStakeRewards, slot uint64) ([]*accounts.Account, []*accounts.Account, uint64) { +func DistributeStakingRewardsForPartition(acctsDb *accountsdb.AccountsDb, partition *Partition, stakingRewards map[solana.PublicKey]*CalculatedStakeRewards, slot uint64, workerPool *ants.PoolWithFunc) ([]*accounts.Account, []*accounts.Account, uint64) { var distributedLamports atomic.Uint64 accts := make([]*accounts.Account, partition.NumPubkeys()) parentAccts := make([]*accounts.Account, partition.NumPubkeys()) var wg sync.WaitGroup - size := runtime.GOMAXPROCS(0) * 8 - workerPool, _ := ants.NewPoolWithFunc(size, func(i interface{}) { - defer wg.Done() - - ip := i.(idxAndPubkey) - idx := ip.idx - stakePk := ip.pubkey - - reward, ok := stakingRewards[stakePk] - if !ok { - return - } - - stakeAcct, err := acctsDb.GetAccount(slot, stakePk) - if err != nil { - panic(fmt.Sprintf("unable to get acct %s from acctsdb for partitioned epoch rewards distribution in slot %d", stakePk, slot)) - } - parentAccts[idx] = stakeAcct.Clone() - - // update the delegation in the stake account state - stakeState, err := sealevel.UnmarshalStakeState(stakeAcct.Data) - if err != nil { - return - } - - stakeState.Stake.Stake.CreditsObserved = reward.NewCreditsObserved - stakeState.Stake.Stake.Delegation.StakeLamports = safemath.SaturatingAddU64(stakeState.Stake.Stake.Delegation.StakeLamports, uint64(reward.StakerRewards)) - - newStakeStateBytes, err := sealevel.MarshalStakeStake(stakeState) - if err != nil { - panic(fmt.Sprintf("unable to serialize new stake account state in distributing partitioned rewards: %s", err)) - } - copy(stakeAcct.Data, newStakeStateBytes) - - // update lamports in stake account - stakeAcct.Lamports, err = safemath.CheckedAddU64(stakeAcct.Lamports, uint64(reward.StakerRewards)) - if err != nil { - panic(fmt.Sprintf("overflow in partitioned epoch rewards distribution in slot %d to acct %s: %s", slot, stakePk, err)) - } - - accts[idx] = stakeAcct - distributedLamports.Add(reward.StakerRewards) - }) - for idx, stakePk := range partition.Pubkeys() { - ip := idxAndPubkey{idx: idx, pubkey: stakePk} + task := &rewardDistributionTask{ + acctsDb: acctsDb, + slot: slot, + stakingRewards: stakingRewards, + accts: accts, + parentAccts: parentAccts, + distributedLamports: &distributedLamports, + wg: &wg, + idx: idx, + pubkey: stakePk, + } wg.Add(1) - workerPool.Invoke(ip) + workerPool.Invoke(task) } wg.Wait() - workerPool.Release() - ants.Release() - err := acctsDb.StoreAccounts(accts, slot) if err != nil { panic(fmt.Sprintf("error updating accounts for partitioned epoch rewards in slot %d: %s", slot, err)) @@ -350,7 +389,6 @@ func CalculateStakeRewardsAndPartitions(pointsPerStakeAcct map[solana.PublicKey] } wg.Wait() 
partitionCalcWorkerPool.Release() - ants.Release() return stakeInfoResults, validatorRewards, partitions } @@ -497,7 +535,6 @@ func CalculateStakePoints( wg.Wait() workerPool.Release() - ants.Release() return pointsAccum.CalculatedStakePoints(), pointsAccum.TotalPoints() } diff --git a/pkg/sealevel/bpf_loader.go b/pkg/sealevel/bpf_loader.go index 6cffc888..e04fa4e9 100644 --- a/pkg/sealevel/bpf_loader.go +++ b/pkg/sealevel/bpf_loader.go @@ -1169,6 +1169,7 @@ func BpfLoaderProgramExecute(execCtx *ExecutionCtx) error { programBytes = programAcct.Data() } programAcctKey = programAcct.Key() + accountsdb.RecordProgramCacheMissSize(uint64(len(programBytes))) } } else if programOwner == a.BpfLoaderUpgradeableAddr { var programAcctState *UpgradeableLoaderState @@ -1236,6 +1237,7 @@ func BpfLoaderProgramExecute(execCtx *ExecutionCtx) error { programAcctKey = programAcctState.Program.ProgramDataAddress programBytes = programDataAcct.Data[upgradeableLoaderSizeOfProgramDataMetaData:] metrics.GlobalBlockReplay.GetProgramDataUncachedMarshal.AddTimingSince(start) + accountsdb.RecordProgramCacheMissSize(uint64(len(programBytes))) } } else { return InstrErrUnsupportedProgramId diff --git a/pkg/sealevel/loader_v4.go b/pkg/sealevel/loader_v4.go index 5e029e46..6624a7f3 100644 --- a/pkg/sealevel/loader_v4.go +++ b/pkg/sealevel/loader_v4.go @@ -305,6 +305,7 @@ func LoaderV4Execute(execCtx *ExecutionCtx) error { } programBytes = programDataAcct.Data[loaderV4ProgramDataOffset:] + accountsdb.RecordProgramCacheMissSize(uint64(len(programBytes))) } syscallRegistry := sbpf.SyscallRegistry(func(u uint32) (sbpf.Syscall, bool) { diff --git a/pkg/sealevel/stake_state.go b/pkg/sealevel/stake_state.go index 873e6c9e..d45043f4 100644 --- a/pkg/sealevel/stake_state.go +++ b/pkg/sealevel/stake_state.go @@ -2,6 +2,7 @@ package sealevel import ( "bytes" + "fmt" "math" "github.com/Overclock-Validator/mithril/pkg/features" @@ -765,6 +766,31 @@ func MarshalStakeStake(state *StakeStateV2) ([]byte, error) { } } +// fixedSliceWriter implements io.Writer over a fixed-size byte slice, +// avoiding allocation during serialization. +type fixedSliceWriter struct { + buf []byte + pos int +} + +func (w *fixedSliceWriter) Write(p []byte) (int, error) { + if w.pos+len(p) > len(w.buf) { + return 0, fmt.Errorf("write exceeds buffer: pos=%d, write=%d, cap=%d", w.pos, len(p), len(w.buf)) + } + copy(w.buf[w.pos:], p) + w.pos += len(p) + return len(p), nil +} + +// MarshalStakeStakeInto writes the stake state directly into dst, avoiding allocation. +// dst should be at least StakeStateV2Size (200) bytes for valid stake accounts. 
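+// Callers are expected to serialize over the account's existing data buffer, e.g.
+// MarshalStakeStakeInto(stakeState, stakeAcct.Data), replacing the previous
+// MarshalStakeStake-then-copy pattern and its per-account allocation.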
+func MarshalStakeStakeInto(state *StakeStateV2, dst []byte) error { + writer := &fixedSliceWriter{buf: dst, pos: 0} + encoder := bin.NewBinEncoder(writer) + + return state.MarshalWithEncoder(encoder) +} + func setStakeAccountState(acct *BorrowedAccount, stakeState *StakeStateV2, f features.Features) error { stakeStateBytes, err := MarshalStakeStake(stakeState) if err != nil { diff --git a/pkg/snapshot/build_db.go b/pkg/snapshot/build_db.go index 31054b98..bc5e6f45 100644 --- a/pkg/snapshot/build_db.go +++ b/pkg/snapshot/build_db.go @@ -164,21 +164,21 @@ func BuildAccountsDbPaths( // Clean any leftover artifacts from previous incomplete runs (e.g., Ctrl+C) CleanAccountsDbDir(accountsDbDir) - mlog.Log.Infof("Parsing manifest from %s", snapshotFile) + mlog.Log.Infof("Parsing full snapshot manifest...") manifest, err := UnmarshalManifestFromSnapshot(ctx, snapshotFile, accountsDbDir) if err != nil { return nil, nil, fmt.Errorf("reading snapshot manifest: %v", err) } - mlog.Log.Infof("Parsed manifest from full snapshot") + mlog.Log.Infof("Parsed full snapshot manifest") var incrementalManifest *SnapshotManifest if incrementalSnapshotFile != "" { - mlog.Log.Infof("Parsing manifest from %s", incrementalSnapshotFile) + mlog.Log.Infof("Parsing incremental snapshot manifest...") incrementalManifest, err = UnmarshalManifestFromSnapshot(ctx, incrementalSnapshotFile, accountsDbDir) if err != nil { return nil, nil, fmt.Errorf("reading incremental snapshot manifest: %v", err) } - mlog.Log.Infof("Parsed manifest from incremental snapshot") + mlog.Log.Infof("Parsed incremental snapshot manifest") } start := time.Now() diff --git a/pkg/snapshot/build_db_with_incr.go b/pkg/snapshot/build_db_with_incr.go index f60a9bbd..c942685f 100644 --- a/pkg/snapshot/build_db_with_incr.go +++ b/pkg/snapshot/build_db_with_incr.go @@ -50,12 +50,12 @@ func BuildAccountsDbAuto( // Clean any leftover artifacts from previous incomplete runs (e.g., Ctrl+C) CleanAccountsDbDir(accountsDbDir) - mlog.Log.Infof("Parsing manifest from %s", fullSnapshotFile) + mlog.Log.Infof("Parsing full snapshot manifest...") manifest, err := UnmarshalManifestFromSnapshot(ctx, fullSnapshotFile, accountsDbDir) if err != nil { return nil, nil, fmt.Errorf("reading snapshot manifest: %v", err) } - mlog.Log.Infof("Parsed manifest from full snapshot") + mlog.Log.Infof("Parsed full snapshot manifest") start := time.Now() @@ -171,7 +171,7 @@ func BuildAccountsDbAuto( } incrSnapshotStart := time.Now() - mlog.Log.Infof("Parsing manifest from %s", incrementalSnapshotPath) + mlog.Log.Infof("Parsing incremental snapshot manifest...") incrementalManifestCopy, err := UnmarshalManifestFromSnapshot(ctx, incrementalSnapshotPath, accountsDbDir) if err != nil { mlog.Log.Errorf("reading incremental snapshot manifest: %v", err) @@ -179,7 +179,7 @@ func BuildAccountsDbAuto( } // Copy the manifest so the worker pool's pointer has the value. *incrementalManifest = *incrementalManifestCopy - mlog.Log.Infof("Parsed manifest from incremental snapshot") + mlog.Log.Infof("Parsed incremental snapshot manifest") // Determine save path for incremental snapshot if streaming from HTTP var incrSavePath string
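
Note on the `--pprof-port` wiring earlier in this diff: the blank `net/http/pprof` import added to `cmd/mithril/node/pprof.go` registers the `/debug/pprof/*` handlers on `http.DefaultServeMux`, so `startPprofHandlers` (already defined in that file, not shown here) only needs to serve that mux on the configured port. A minimal sketch consistent with that wiring — the function name and logging below are illustrative, not the actual implementation:

```go
package node

import (
	"fmt"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* on http.DefaultServeMux
)

// startPprofHandlersSketch is a hypothetical stand-in for the real startPprofHandlers:
// it serves the default mux (where the pprof handlers live) on localhost:<port>.
func startPprofHandlersSketch(port int) {
	go func() {
		addr := fmt.Sprintf("localhost:%d", port)
		// nil handler means http.DefaultServeMux.
		if err := http.ListenAndServe(addr, nil); err != nil {
			fmt.Printf("pprof server exited: %v\n", err)
		}
	}()
}
```

With `port = 6060` in `[development.pprof]` (or `--pprof-port 6060`), profiles are then available at `http://localhost:6060/debug/pprof/`.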