Add buffer pools, reduce allocations, and free up references for GC #40

Open
divyanshu-tiwari wants to merge 4 commits into context-at-pipeline-level from add-buffer-pools

Conversation

@divyanshu-tiwari
Contributor

This pull request introduces several improvements to the pipeline framework, focusing on memory efficiency, safer record handling, and consistent use of byte slices for data manipulation. The most important changes include the addition of a deep copy method for records to prevent shared references across pipeline branches, the adoption of bytes utilities instead of strings for handling record data, and the introduction of buffer pooling to optimize memory usage during record serialization and context processing.

Memory management and efficiency:

  • Added a Clone method to the Record struct for deep copying records, ensuring that parallel pipeline branches do not share references and allowing independent garbage collection. (internal/pkg/pipeline/record/record.go)
  • Introduced a buffer pool (byteBufferPool) using sync.Pool for efficient reuse of bytes.Buffer instances during record serialization and context value encoding, reducing memory allocations. (internal/pkg/pipeline/task/task.go)

Safer record distribution and processing:

  • Updated the distributeToChannels method in the pipeline to send the original record to the first branch and cloned records to other branches, preventing unintended data sharing between branches. (internal/pkg/pipeline/pipeline.go)
  • Modified the join task to explicitly nil out buffer elements after flushing, aiding garbage collection and preventing memory leaks. (internal/pkg/pipeline/task/join/join.go)
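The fan-out change can be illustrated as below; this is a sketch under assumptions (a simplified `Record` and plain channels), not the pipeline's real `distributeToChannels` signature. The first branch receives the original record, every other branch a deep copy, so no two branches hold the same backing slices.

```go
package main

// Record is a simplified stand-in with an assumed Data field.
type Record struct {
	Data []byte
}

// Clone deep-copies the record's data.
func (r *Record) Clone() *Record {
	data := make([]byte, len(r.Data))
	copy(data, r.Data)
	return &Record{Data: data}
}

// distributeToChannels sends the original record to the first branch
// and independent clones to the rest, mirroring the change above.
func distributeToChannels(rec *Record, outs []chan *Record) {
	for i, ch := range outs {
		if i == 0 {
			ch <- rec // first branch keeps the original
			continue
		}
		ch <- rec.Clone() // other branches get independent copies
	}
}
```

Sending the original to exactly one branch avoids one unnecessary copy while still guaranteeing isolation between branches.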

Consistent use of byte slices for data manipulation:

  • Replaced strings utilities with bytes utilities for delimiter-based splitting and joining in tasks such as split, join, replace, and sst, ensuring consistent handling of binary data and improving performance.
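The point of this switch is avoiding the []byte -> string -> []byte round trip: `strings.Split` forces a string conversion of the payload (one copy) and the results must be converted back (more copies), whereas the `bytes` equivalents operate on the record bytes directly. A minimal sketch, with illustrative function names rather than the tasks' actual APIs:

```go
package main

import "bytes"

// splitFields splits a payload on a delimiter using the bytes package
// directly; the returned sub-slices alias the input, so no per-field
// copies or string conversions are made.
func splitFields(data, delim []byte) [][]byte {
	return bytes.Split(data, delim)
}

// joinFields is the byte-slice counterpart of strings.Join.
func joinFields(fields [][]byte, delim []byte) []byte {
	return bytes.Join(fields, delim)
}
```

Note the aliasing: because `bytes.Split` returns views into the original slice, a task that mutates a field in place also mutates the source record, which is exactly why the `Clone` isolation above matters.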

Context value encoding improvements:

  • Changed context value encoding to use json.Encoder on pooled buffers, removing trailing newlines and ensuring clean JSON values in record metadata. (internal/pkg/pipeline/task/task.go)
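The subtlety here is that `json.Encoder.Encode` always appends a `'\n'` to its output, so the encoded value must be trimmed before being stored as metadata. A sketch of the pattern, assuming a pool-backed helper; `encodeValue` is an illustrative name, not the repository's actual function:

```go
package main

import (
	"bytes"
	"encoding/json"
	"sync"
)

var byteBufferPool = sync.Pool{
	New: func() any { return new(bytes.Buffer) },
}

// encodeValue JSON-encodes v into a pooled buffer and trims the
// trailing newline that json.Encoder.Encode appends, returning a
// clean JSON value suitable for record metadata.
func encodeValue(v any) ([]byte, error) {
	buf := byteBufferPool.Get().(*bytes.Buffer)
	defer func() {
		buf.Reset()
		byteBufferPool.Put(buf)
	}()
	if err := json.NewEncoder(buf).Encode(v); err != nil {
		return nil, err
	}
	trimmed := bytes.TrimSuffix(buf.Bytes(), []byte("\n"))
	// Copy out before the buffer is reset and returned to the pool.
	return append([]byte(nil), trimmed...), nil
}
```

The copy before `Put` is essential: `buf.Bytes()` aliases the pooled buffer, and returning it uncopied would hand the caller memory that the next `Get` can overwrite.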

Types of changes

  • Docs change / refactoring / dependency upgrade
  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)

Checklist

  • My code follows the code style of this project.
  • My change requires a change to the documentation and I have updated the documentation accordingly.
  • I have added tests to cover my changes.

Description

- record marshalling in SendData allocates heavily; reusing buffers from a pool reduces this overhead
- avoiding []byte -> string -> []byte allocations
- sending record clones to branch tasks so branches hold no GC references to the original record
- explicitly clearing references in task buffers after use to help GC
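The last point, clearing buffer references after a flush, can be sketched as follows. This is an illustration of the join-task change described above, with assumed names, not the actual code in join.go:

```go
package main

import "bytes"

// flush joins the buffered records, then nils out each slice element
// so the slice no longer pins the records' backing arrays and the GC
// can reclaim them even while the slice itself stays allocated.
func flush(buf [][]byte, delim []byte) []byte {
	out := bytes.Join(buf, delim)
	for i := range buf {
		buf[i] = nil // drop the reference; the joined copy survives
	}
	return out
}
```

Without the nil-out loop, a long-lived buffer slice (e.g. one reused via `buf = buf[:0]`) keeps every previously flushed record reachable until the next append overwrites its slot.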
Contributor

Copilot AI left a comment


Pull request overview

This PR improves pipeline memory behavior by reducing allocations during record/context JSON processing, preventing shared references across parallel branches, and switching several string-based transformations to byte-slice operations for more consistent binary handling.

Changes:

  • Add Record.Clone() and update pipeline fan-out to clone records for parallel branches.
  • Introduce a pooled bytes.Buffer for record serialization and context value encoding.
  • Replace strings operations with bytes/regexp byte-slice operations in several tasks, and explicitly clear join buffers to aid GC.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.

Summary per file:

  • internal/pkg/pipeline/task/task.go: Adds a sync.Pool for bytes.Buffer reuse and uses json.Encoder for record/context encoding.
  • internal/pkg/pipeline/task/split/split.go: Switches splitting to bytes APIs and iterator-based splitting.
  • internal/pkg/pipeline/task/replace/replace.go: Switches regexp replacement to []byte replacement.
  • internal/pkg/pipeline/task/join/join.go: Switches join building to bytes.Buffer and clears buffer elements after flush.
  • internal/pkg/pipeline/task/converter/sst.go: Switches parsing to bytes split APIs and avoids string-based splitting.
  • internal/pkg/pipeline/record/record.go: Adds Clone() deep copy for Record (data + meta).
  • internal/pkg/pipeline/pipeline.go: Updates parallel distribution to send the original to one branch and clones to the others.


divyanshu-tiwari and others added 2 commits February 12, 2026 10:39
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated no new comments.


