Add buffer pools, reduce allocation and free-up reference for GC#40
Open
divyanshu-tiwari wants to merge 4 commits into context-at-pipeline-level from
Conversation
- Record marshalling in SendData allocates a lot of space; using a buffer pool to reuse buffers reduces this overhead.
- Avoids []byte -> string -> []byte allocations.
- Sends a record clone to branch tasks to avoid GC references to the original record.
- Explicitly clears references in task buffers after use to help GC.
Contributor
Pull request overview
This PR improves pipeline memory behavior by reducing allocations during record/context JSON processing, preventing shared references across parallel branches, and switching several string-based transformations to byte-slice operations for more consistent binary handling.
Changes:
- Add `Record.Clone()` and update pipeline fan-out to clone records for parallel branches.
- Introduce a pooled `bytes.Buffer` for record serialization and context value encoding.
- Replace `strings` operations with `bytes`/`regexp` byte-slice operations in several tasks, and explicitly clear join buffers to aid GC.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| internal/pkg/pipeline/task/task.go | Adds a sync.Pool for bytes.Buffer reuse and uses json.Encoder for record/context encoding. |
| internal/pkg/pipeline/task/split/split.go | Switches splitting to bytes APIs and iterator-based splitting. |
| internal/pkg/pipeline/task/replace/replace.go | Switches regexp replacement to []byte replacement. |
| internal/pkg/pipeline/task/join/join.go | Switches join building to bytes.Buffer and clears buffer elements after flush. |
| internal/pkg/pipeline/task/converter/sst.go | Switches parsing to bytes split APIs and avoids string-based splitting. |
| internal/pkg/pipeline/record/record.go | Adds Clone() deep copy for Record (data + meta). |
| internal/pkg/pipeline/pipeline.go | Updates parallel distribution to send original to one branch and clones to others. |
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Contributor
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated no new comments.
This pull request introduces several improvements to the pipeline framework, focusing on memory efficiency, safer record handling, and consistent use of byte slices for data manipulation. The most important changes include the addition of a deep copy method for records to prevent shared references across pipeline branches, the adoption of `bytes` utilities instead of `strings` for handling record data, and the introduction of buffer pooling to optimize memory usage during record serialization and context processing.

Memory management and efficiency:
- Added a `Clone` method to the `Record` struct for deep copying records, ensuring that parallel pipeline branches do not share references and allowing independent garbage collection. (internal/pkg/pipeline/record/record.go)
- Introduced a buffer pool (`byteBufferPool`) using `sync.Pool` for efficient reuse of `bytes.Buffer` instances during record serialization and context value encoding, reducing memory allocations. (internal/pkg/pipeline/task/task.go)

Safer record distribution and processing:
- Updated the `distributeToChannels` method in the pipeline to send the original record to the first branch and cloned records to the other branches, preventing unintended data sharing between branches. (internal/pkg/pipeline/pipeline.go)
- Explicitly cleared join buffer elements after flushing to aid garbage collection. (internal/pkg/pipeline/task/join/join.go)

Consistent use of byte slices for data manipulation:
- Replaced `strings` utilities with `bytes` utilities for delimiter-based splitting and joining in tasks such as `split`, `join`, `replace`, and `sst`, ensuring consistent handling of binary data and improving performance. [1] [2] [3] [4]

Context value encoding improvements:
- Context values are now encoded with `json.Encoder` on pooled buffers, removing trailing newlines and ensuring clean JSON values in record metadata. (internal/pkg/pipeline/task/task.go) [1] [2]