Conversation
There was a problem hiding this comment.
Pull request overview
Refactors the Kafka pipeline task to avoid shared mutable state across concurrent task_concurrency workers by moving retry/cancellation-related variables to per-run scope, and updates a test pipeline to exercise Kafka group consumption with concurrency.
Changes:
- Remove
ctxand retry counters from thekafkastruct; make retry counters local to eachread()invocation. - Update Kafka
read/writehelpers to accept acontext.Contextparameter rather than relying on struct state. - Update
test/pipelines/kafka_group_read.yamlto run the Kafka task withtask_concurrency: 3(plus minor whitespace cleanups elsewhere).
Reviewed changes
Copilot reviewed 3 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
internal/pkg/pipeline/task/kafka/kafka.go |
Removes shared mutable state from the Kafka task and makes retry tracking per worker run. |
test/pipelines/kafka_group_read.yaml |
Enables concurrent execution for the Kafka group-read test pipeline. |
internal/pkg/pipeline/task/converter/sst.go |
Removes trailing whitespace. |
internal/pkg/pipeline/task/compress/compress.go |
Removes an extra blank line. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| GroupID string `yaml:"group_id,omitempty" json:"group_id,omitempty"` // the consumer group id (optional) | ||
| BatchSize int `yaml:"batch_size,omitempty" json:"batch_size,omitempty"` // number of messages to read/write in a batch | ||
| RetryLimit *int `yaml:"retry_limit,omitempty" json:"retry_limit,omitempty"` // number of retries for read errors | ||
| ExitOnEmpty bool `yaml:"exit_on_empty,omitempty" json:"exit_on_empty,omitempty"` // exit when no more messages are available |
There was a problem hiding this comment.
Any reason we are removing this?
There was a problem hiding this comment.
The major reason for that, is the fact that ExitOnEmpty is "a syntax sugar" and this state can be emulated with a timeout 100d and 5 retries. It simplifies code and person understanding of taks
Description
Handles the concurrent task runs by removing variables from state that are shared between goroutines.
Changes
This pull request mainly refactors the Kafka pipeline task to improve context handling and retry logic, making the retry counters local to each run instead of being stored on the struct. It also removes unnecessary struct fields and updates method signatures for clarity and correctness. Additionally, a test pipeline is updated to set task concurrency.
Kafka Task Refactoring:
ctx,readErrorRetries, andemptyReadRetriesfields from thekafkastruct, and moved retry counters to be local variables within thereadmethod. This makes the retry logic per-run and avoids shared state issues. [1] [2]writeandreadto accept acontext.Contextparameter (runCtx) instead of using a struct field, improving context propagation and cancellation handling. [1] [2] [3] [4]handleReadErrorto accept pointers to the local retry counters, and updated its usage accordingly. [1] [2]Test Pipeline Update:
task_concurrency: 3to the Kafka task intest/pipelines/kafka_group_read.yamlto enable concurrent task execution in tests.Minor Code Cleanup:
Types of changes
Tests
Checklist