Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 254 additions & 24 deletions cmd/mithril/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"runtime/pprof"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -83,9 +84,11 @@ var (

loadFromSnapshot bool
loadFromAccountsDb bool
bootstrapMode string // "auto", "snapshot", or "accountsdb"
bootstrapMode string // "auto", "snapshot", "local-snapshot", or "accountsdb"
snapshotArchivePath string
incrementalSnapshotFilename string
localFullSnapshotPath string // Path to existing full snapshot file (for local-snapshot mode)
localIncrSnapshotPath string // Path to existing incremental snapshot file (optional)
accountsPath string
scratchDirectory string
rpcEndpoints []string
Expand Down Expand Up @@ -167,7 +170,9 @@ func init() {

// flags for 'mithril run' (live full node mode)
// [bootstrap] section flags
Run.Flags().StringVar(&bootstrapMode, "bootstrap-mode", "auto", "Bootstrap mode: 'auto' (use AccountsDB if exists, else snapshot), 'accountsdb' (require existing), 'snapshot' (rebuild from snapshot), 'new-snapshot' (always download fresh)")
Run.Flags().StringVar(&bootstrapMode, "bootstrap-mode", "auto", "Bootstrap mode: 'auto', 'accountsdb', 'snapshot', 'new-snapshot', or 'local-snapshot'")
Run.Flags().StringVar(&localFullSnapshotPath, "full-snapshot", "", "Path to existing full snapshot file (triggers local-snapshot mode)")
Run.Flags().StringVar(&localIncrSnapshotPath, "incremental-snapshot", "", "Path to existing incremental snapshot file (optional, used with --full-snapshot)")

// [ledger] section flags
Run.Flags().StringVarP(&accountsPath, "accounts-path", "o", "", "Output path for writing AccountsDB data to")
Expand Down Expand Up @@ -1087,6 +1092,23 @@ func runLive(c *cobra.Command, args []string) {
// Use configured snapshot directory (storage.snapshots / snapshot.download_path), not scratch
snapshotDownloadPath := snapshotDlPath

// Auto-detect local-snapshot mode when --full-snapshot is provided
if localFullSnapshotPath != "" {
if bootstrapMode != "auto" && bootstrapMode != "local-snapshot" {
mlog.Log.Infof("WARNING: --full-snapshot provided, overriding --bootstrap-mode=%s with local-snapshot", bootstrapMode)
}
bootstrapMode = "local-snapshot"
// Validate the snapshot file exists
if _, err := os.Stat(localFullSnapshotPath); os.IsNotExist(err) {
klog.Fatalf("full snapshot file not found: %s", localFullSnapshotPath)
}
if localIncrSnapshotPath != "" {
if _, err := os.Stat(localIncrSnapshotPath); os.IsNotExist(err) {
klog.Fatalf("incremental snapshot file not found: %s", localIncrSnapshotPath)
}
}
}

// Prune old history entries if needed (keeps last 100)
if accountsPath != "" {
if err := state.PruneHistory(accountsPath); err != nil {
Expand Down Expand Up @@ -1242,6 +1264,70 @@ func runLive(c *cobra.Command, args []string) {
// Record bootstrap in history
state.RecordBootstrap(accountsPath, manifest.Bank.Slot, "", replay.CurrentRunID, getVersion(), getCommit())

case "local-snapshot":
// Mode: Build from user-provided snapshot files (no download)
if localFullSnapshotPath == "" {
klog.Fatalf("mode=local-snapshot requires --full-snapshot flag")
}
if accountsPath == "" {
klog.Fatalf("mode=local-snapshot requires --accounts-path flag")
}

mlog.Log.Infof("mode=local-snapshot: building AccountsDB from local snapshot files")
mlog.Log.Infof(" full snapshot: %s", localFullSnapshotPath)
if localIncrSnapshotPath != "" {
mlog.Log.Infof(" incremental snapshot: %s", localIncrSnapshotPath)
} else {
mlog.Log.Infof(" incremental snapshot: (will fetch from network)")
}

// Clean previous AccountsDB
if mithrilState != nil {
state.RecordRebuild(accountsPath, mithrilState.LastSlot, mithrilState.LastBankhash, getVersion(), getCommit(), "local-snapshot mode")
} else {
state.RecordRebuild(accountsPath, 0, "", getVersion(), getCommit(), "local-snapshot mode (no prior state)")
}
mlog.Log.Infof("cleaning up previous AccountsDB artifacts in %s", accountsPath)
snapshot.CleanAccountsDbDir(accountsPath)

// Build from local snapshot
// BuildAccountsDb accepts an optional incremental as second param
var incrPath string
if localIncrSnapshotPath != "" {
// Both full and incremental provided
mlog.Log.Infof("building AccountsDB from full + incremental snapshots")
incrPath = localIncrSnapshotPath
} else {
// Only full provided - try to fetch incremental
mlog.Log.Infof("building AccountsDB from full snapshot (will attempt to fetch incremental)")
fullSlot := parseSlotFromSnapshotName(filepath.Base(localFullSnapshotPath))
if fullSlot > 0 && snapshotDownloadPath != "" {
fetchedIncr, incrErr := downloadIncrementalForFullSnapshot(ctx, rpcEndpoints, fullSlot, snapshotDownloadPath)
if incrErr == nil && fetchedIncr != "" {
mlog.Log.Infof("found matching incremental snapshot at %s", fetchedIncr)
incrPath = fetchedIncr
} else if incrErr != nil {
mlog.Log.Infof("could not fetch incremental: %v (building from full snapshot only)", incrErr)
}
}
}

accountsDb, manifest, err = snapshot.BuildAccountsDb(ctx, localFullSnapshotPath, incrPath, accountsPath)
if err != nil {
klog.Fatalf("failed to build AccountsDB from local snapshot: %v", err)
}

// Write state file
var snapshotEpoch uint64
if sealevel.SysvarCache.EpochSchedule.Sysvar != nil {
snapshotEpoch = sealevel.SysvarCache.EpochSchedule.Sysvar.GetEpoch(manifest.Bank.Slot)
}
mithrilState = state.NewReadyState(manifest.Bank.Slot, snapshotEpoch, "", "", 0, 0)
if err := mithrilState.Save(accountsPath); err != nil {
mlog.Log.Errorf("failed to save state file: %v", err)
}
state.RecordBootstrap(accountsPath, manifest.Bank.Slot, "", replay.CurrentRunID, getVersion(), getCommit())

case "auto":
fallthrough
default:
Expand Down Expand Up @@ -1773,9 +1859,11 @@ func printStartupInfo(commandName string) {
case "auto":
bootstrapDesc = "use existing AccountsDB if valid, else download snapshot"
case "snapshot":
bootstrapDesc = "rebuild from local snapshot"
bootstrapDesc = "rebuild from snapshot (reuse if fresh)"
case "new-snapshot":
bootstrapDesc = "download fresh snapshot from network"
case "local-snapshot":
bootstrapDesc = "build from user-provided snapshot file"
case "accountsdb":
bootstrapDesc = "require existing AccountsDB"
default:
Expand Down Expand Up @@ -1968,7 +2056,8 @@ func detectExistingAccountsDB(path string) (bool, uint64) {
return true, manifest.Bank.Slot
}

// detectExistingSnapshots finds snapshot files in the given directory
// detectExistingSnapshots finds snapshot files in the given directory.
// It skips .partial files (incomplete downloads from crashed runs).
func detectExistingSnapshots(dir string) []snapshotInfo {
if dir == "" {
return nil
Expand All @@ -1987,6 +2076,11 @@ func detectExistingSnapshots(dir string) []snapshotInfo {
}
name := entry.Name()

// Skip partial downloads (incomplete files from crashed runs)
if strings.HasSuffix(name, ".partial") {
continue
}

// Full snapshot: snapshot-{slot}-{hash}.tar.zst
if len(name) > 9 && name[:9] == "snapshot-" && filepath.Ext(name) == ".zst" {
slot := parseSlotFromSnapshotName(name)
Expand Down Expand Up @@ -2065,6 +2159,30 @@ func parseSlotFromIncrementalName(name string) uint64 {
return 0
}

// downloadIncrementalForFullSnapshot tries to obtain an incremental snapshot
// for the full snapshot taken at fullSlot, storing it under downloadPath.
//
// It first asks the given RPC endpoints for the current slot, then hands both
// slots to snapshotdl.DownloadIncrementalSnapshot. On success it returns the
// path reported by that call. An empty downloadPath, a failed slot query, or a
// failed download each produce an empty path and a descriptive error.
func downloadIncrementalForFullSnapshot(ctx context.Context, rpcEndpoints []string, fullSlot uint64, downloadPath string) (string, error) {
	// Nothing can be stored without a destination directory.
	if len(downloadPath) == 0 {
		return "", fmt.Errorf("no download path specified")
	}

	// The current slot is passed to the downloader alongside the full
	// snapshot's slot.
	tipSlot, slotErr := queryCurrentSlot(ctx, rpcEndpoints)
	if slotErr != nil {
		return "", fmt.Errorf("could not query current slot: %w", slotErr)
	}
	mlog.Log.Infof("searching for incremental snapshot (full=%d, current=%d)", fullSlot, tipSlot)

	// Only the path is needed here; the two middle return values are ignored.
	referenceSlot, baseSlot := int(tipSlot), int(fullSlot)
	archivePath, _, _, dlErr := snapshotdl.DownloadIncrementalSnapshot(rpcEndpoints, downloadPath, referenceSlot, baseSlot)
	if dlErr != nil {
		return "", fmt.Errorf("failed to download incremental: %w", dlErr)
	}
	return archivePath, nil
}

// detectFreshSnapshot checks for an existing snapshot file within the freshness threshold.
// Returns the snapshotInfo if found, nil otherwise.
func detectFreshSnapshot(snapshotDir string, fullThreshold int, rpcEndpoints []string, ctx context.Context) *snapshotInfo {
Expand Down Expand Up @@ -2141,7 +2259,7 @@ func buildFromExistingSnapshot(ctx context.Context, snap *snapshotInfo, snapshot
// Create progress display for extract
dp := progress.NewDualProgress()

accountsDb, manifest, err := snapshot.BuildAccountsDbWithIncr(ctx, fullSnapshotPath, snapshotDir, int(snap.slot), int(snap.slot), accountsPath, rpcEndpoints, blockstorePath, snapCfg, dp)
accountsDb, manifest, err := snapshot.BuildAccountsDbWithIncr(ctx, fullSnapshotPath, snapshotDir, int(snap.slot), int(snap.slot), accountsPath, rpcEndpoints, blockstorePath, snapCfg, dp, nil)
if err != nil {
return nil, nil, fmt.Errorf("failed to build AccountsDB from snapshot: %w", err)
}
Expand All @@ -2150,38 +2268,150 @@ func buildFromExistingSnapshot(ctx context.Context, snap *snapshotInfo, snapshot
return accountsDb, manifest, nil
}

// downloadAndBuildFromSnapshot finds, downloads, and builds AccountsDB from a snapshot.
// Supports interactive source switching during download - press 'n' to try the next source.
//
// Candidate sources come from snapshotdl.GetRankedSnapshotSources and are tried
// one at a time, starting with the selector's current entry. An attempt ends in
// one of four ways:
//
//   - success: the built AccountsDb and manifest are returned;
//   - the user asks for another source: the attempt is cancelled and the next
//     source is tried;
//   - the parent ctx is cancelled: ctx.Err() is returned;
//   - any other failure: the error is logged and the next source is tried
//     automatically.
//
// Before moving to another source, accountsPath is cleaned with
// snapshot.CleanAccountsDbDir. When the selector has no further source, an
// error is returned; after an automatic retry it wraps the last build error.
//
// rpcEndpoints, snapshotDownloadPath, accountsPath and blockstorePath are
// passed through unchanged to snapshot.BuildAccountsDbWithIncr.
func downloadAndBuildFromSnapshot(ctx context.Context, rpcEndpoints []string, snapshotDownloadPath, accountsPath, blockstorePath string) (*accountsdb.AccountsDb, *snapshot.SnapshotManifest, error) {
	snapCfg := buildSnapshotConfig(rpcEndpoints)

	// Set logging info for detailed speed test log
	snapCfg.LogDir = logDir
	snapCfg.RunID = replay.CurrentRunID

	// Get all ranked snapshot sources (runs Stage 1 + Stage 2 testing)
	sourceSelector, err := snapshotdl.GetRankedSnapshotSources(ctx, snapCfg)
	if err != nil {
		return nil, nil, fmt.Errorf("error getting snapshot sources: %w", err)
	}
	defer sourceSelector.Close()

	// Get initial source
	currentSource := sourceSelector.Current()
	if currentSource == nil {
		return nil, nil, fmt.Errorf("no snapshot sources available")
	}

	// Print a clean summary of the selected snapshot source
	progress.PrintSnapshotSourceSummary(
		currentSource.NodeIP,
		currentSource.Slot,
		currentSource.ReferenceSlot,
		currentSource.Version,
		currentSource.SpeedMBs,
		currentSource.RTTMs,
		sourceSelector.SearchTime,
	)

	// Create progress display for snapshot download and extract
	dp := progress.NewDualProgress()

	// Set by the progress-bar callback when the user asks for another source.
	var sourceSwitchRequested atomic.Bool

	// Try sources with interactive switching support
	for {
		// Check if parent context was cancelled
		if ctx.Err() != nil {
			return nil, nil, ctx.Err()
		}

		currentSource = sourceSelector.Current()
		if currentSource == nil {
			return nil, nil, fmt.Errorf("exhausted all %d snapshot sources", sourceSelector.TotalSources())
		}

		// Create a cancellable context for this download attempt
		downloadCtx, cancelDownload := context.WithCancel(ctx)

		// Format source info for display
		sourceInfo := fmt.Sprintf("Source %d/%d: %s (%.1f MB/s)",
			sourceSelector.CurrentIndex()+1,
			sourceSelector.TotalSources(),
			currentSource.NodeIP,
			currentSource.SpeedMBs,
		)

		// Enable source switching UI on progress bar
		sourceSwitchRequested.Store(false)
		dp.EnableSourceSwitching(sourceInfo, func() {
			if sourceSelector.HasMore() {
				sourceSwitchRequested.Store(true)
				cancelDownload()
				mlog.Log.Infof("User requested source switch - cancelling current download...")
			} else {
				mlog.Log.Infof("No more sources available to switch to")
			}
		})

		// Attempt download from current source
		accountsDb, manifest, buildErr := snapshot.BuildAccountsDbWithIncr(
			downloadCtx,
			currentSource.URL,
			snapshotDownloadPath,
			currentSource.Slot,
			currentSource.Slot,
			accountsPath,
			rpcEndpoints,
			blockstorePath,
			snapCfg,
			dp,
			sourceSelector, // Pass selector for cached incremental source lookup
		)

		// Sample the attempt's cancellation state BEFORE the cleanup cancel
		// below. Once cancelDownload() has run, downloadCtx.Err() is always
		// non-nil, which would make every ordinary failure look like a
		// cancelled (switched) download and hide the real error.
		attemptCancelled := downloadCtx.Err() != nil

		// Disable source switching after this attempt
		dp.DisableSourceSwitching()
		cancelDownload() // Clean up context

		if buildErr == nil {
			// Success!
			mlog.Log.Infof("finished building AccountsDB")
			return accountsDb, manifest, nil
		}

		// A switch is either an explicit user request, or the attempt was
		// cancelled while the parent context is still alive.
		switched := sourceSwitchRequested.Load() || (attemptCancelled && ctx.Err() == nil)

		if !switched {
			// Actual error (not source switch).
			// Check if parent context was cancelled
			if ctx.Err() != nil {
				return nil, nil, ctx.Err()
			}
			mlog.Log.Errorf("Download from %s failed: %v", currentSource.NodeIP, buildErr)
		}

		// Advance to the next candidate; give up if there is none.
		nextSource := sourceSelector.Next()
		if nextSource == nil {
			if switched {
				return nil, nil, fmt.Errorf("exhausted all %d snapshot sources after user-initiated switch", sourceSelector.TotalSources())
			}
			return nil, nil, fmt.Errorf("failed to download from all %d sources, last error: %w", sourceSelector.TotalSources(), buildErr)
		}

		// Clean up any partial download
		snapshot.CleanAccountsDbDir(accountsPath)

		if switched {
			mlog.Log.Infof("Switching to source %d/%d: %s (%.1f MB/s)",
				sourceSelector.CurrentIndex()+1,
				sourceSelector.TotalSources(),
				nextSource.NodeIP,
				nextSource.SpeedMBs,
			)
		} else {
			mlog.Log.Infof("Trying next source %d/%d: %s (%.1f MB/s)",
				sourceSelector.CurrentIndex()+1,
				sourceSelector.TotalSources(),
				nextSource.NodeIP,
				nextSource.SpeedMBs,
			)
		}
	}
}

// killExistingMithrilProcesses finds and kills any other running mithril processes.
Expand Down
Loading