Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 2 additions & 46 deletions pkg/snapshot/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,13 @@ type shardRequest struct {
v accountsdb.AccountIndexEntry
}

// ShardProgressCallback is called with (bytesDone, totalBytes) to report shard flush progress
type ShardProgressCallback func(bytesDone, totalBytes int64)

// ShardLogger manages multiple sharded log files
type ShardLogger struct {
shards []*shard
filePrefix string
wg *sync.WaitGroup
flushSem *semaphore.Weighted

// Progress tracking
totalBytes atomic.Int64 // total bytes written to shard logs
bytesDone atomic.Int64 // bytes flushed to cache
onProgress ShardProgressCallback

// closed flag to prevent sends after Close is called (defensive)
closed atomic.Bool
}
Expand All @@ -58,7 +50,6 @@ type shard struct {
requests chan shardRequest
logSize int
flushSem *semaphore.Weighted
parent *ShardLogger // parent for progress reporting
}

// NewShardLogger creates a new ShardLogger with the specified number of
Expand All @@ -77,31 +68,15 @@ func NewShardLogger(numShards int, filePrefix string) *ShardLogger {

sl.wg.Add(numShards)
for i := range numShards {
sl.shards[i] = newShard(i, filePrefix, sl.flushSem, sl)
sl.shards[i] = newShard(i, filePrefix, sl.flushSem)
go sl.shards[i].processRequests(sl.wg)
}

return sl
}

// SetProgressCallback sets a callback to receive progress updates during shard flushes.
// The callback receives (bytesDone, totalBytes) and is called as bytes are flushed to cache.
func (sl *ShardLogger) SetProgressCallback(cb ShardProgressCallback) {
sl.onProgress = cb
}

// TotalBytes returns the total bytes written to shard logs
func (sl *ShardLogger) TotalBytes() int64 {
return sl.totalBytes.Load()
}

// BytesDone returns the bytes that have been flushed to cache
func (sl *ShardLogger) BytesDone() int64 {
return sl.bytesDone.Load()
}

// newShard creates a new shard with the given ID
func newShard(id int, filePrefix string, flushSem *semaphore.Weighted, parent *ShardLogger) *shard {
func newShard(id int, filePrefix string, flushSem *semaphore.Weighted) *shard {
filename := filepath.Join(filePrefix, fmt.Sprintf("%03d", id))
file, err := os.Create(filename)
if err != nil {
Expand All @@ -114,7 +89,6 @@ func newShard(id int, filePrefix string, flushSem *semaphore.Weighted, parent *S
file: file,
requests: make(chan shardRequest, 100),
flushSem: flushSem,
parent: parent,
}

return s
Expand All @@ -135,16 +109,6 @@ func (s *shard) processRequests(wg *sync.WaitGroup) {

bytesWritten := int64(len(req.k) + vlen)
s.logSize += int(bytesWritten)

// Track total bytes for progress reporting and notify callback
if s.parent != nil {
total := s.parent.totalBytes.Add(bytesWritten)
if s.parent.onProgress != nil {
// Notify with bytesDone=0 during streaming (before flush)
// The callback can use totalBytes to show indexing progress
s.parent.onProgress(0, total)
}
}
}
}

Expand Down Expand Up @@ -195,14 +159,6 @@ func (s *shard) logToSST(ctx context.Context) error {
pairs[i].k = solana.PublicKey(buf[:32])
pairs[i].v.Unmarshal((*[24]byte)(buf[32:56]))
i++

// Track progress
if s.parent != nil {
done := s.parent.bytesDone.Add(recordSize)
if s.parent.onProgress != nil {
s.parent.onProgress(done, s.parent.totalBytes.Load())
}
}
}

// Truncate file and replace file/writer pointers
Expand Down