[Kafka] remove mutable state items #44

Open
alephys26 wants to merge 4 commits into main from alephys26/handle-concurrent-kafka

Conversation

@alephys26
Contributor

Description

Handles concurrent task runs by removing the struct fields that were shared, and mutated, across goroutines.

Changes

This pull request mainly refactors the Kafka pipeline task to improve context handling and retry logic, making the retry counters local to each run instead of being stored on the struct. It also removes unnecessary struct fields and updates method signatures for clarity and correctness. Additionally, a test pipeline is updated to set task concurrency.

Kafka Task Refactoring:

  • Removed ctx, readErrorRetries, and emptyReadRetries fields from the kafka struct, and moved retry counters to be local variables within the read method. This makes the retry logic per-run and avoids shared state issues.
  • Updated method signatures for write and read to accept a context.Context parameter (runCtx) instead of using a struct field, improving context propagation and cancellation handling.
  • Refactored handleReadError to accept pointers to the local retry counters, and updated its usage accordingly.
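
The shape of this refactoring can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: the kafkaTask type, the fetch callback, errTransient, and runWorkers are hypothetical stand-ins, and only the names read, handleReadError, runCtx, readErrorRetries, and emptyReadRetries come from the description above. The real method bodies and signatures may differ.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errTransient is a hypothetical stand-in for a recoverable read error.
var errTransient = errors.New("transient read error")

// kafkaTask is a hypothetical, stripped-down task. It holds only immutable
// configuration, so several goroutines can share one instance.
type kafkaTask struct {
	retryLimit int
}

// handleReadError updates the caller's counters through pointers and reports
// whether the read loop should stop.
func (k *kafkaTask) handleReadError(err error, readErrorRetries, emptyReadRetries *int) bool {
	if err == nil {
		*readErrorRetries, *emptyReadRetries = 0, 0
		return false
	}
	*readErrorRetries++
	return *readErrorRetries > k.retryLimit
}

// read receives the run context as a parameter and keeps its retry counters
// as local variables, so every concurrent run has its own copy.
func (k *kafkaTask) read(runCtx context.Context, fetch func(context.Context) error) int {
	readErrorRetries, emptyReadRetries := 0, 0
	for {
		if runCtx.Err() != nil {
			return readErrorRetries
		}
		if k.handleReadError(fetch(runCtx), &readErrorRetries, &emptyReadRetries) {
			return readErrorRetries
		}
	}
}

// runWorkers starts n concurrent reads against one shared task and returns
// the number of failed reads each worker counted before giving up.
func runWorkers(n, retryLimit int) []int {
	task := &kafkaTask{retryLimit: retryLimit}
	results := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			alwaysFail := func(context.Context) error { return errTransient }
			results[i] = task.read(context.Background(), alwaysFail)
		}(i)
	}
	wg.Wait()
	return results
}

func main() {
	fmt.Println(runWorkers(3, 2)) // prints [3 3 3]
}
```

Because the counters live on each goroutine's stack, every worker counts its own failures independently. With the counters stored on the shared struct, the same three workers would increment one field concurrently, which is a data race.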

Test Pipeline Update:

  • Added task_concurrency: 3 to the Kafka task in test/pipelines/kafka_group_read.yaml to enable concurrent task execution in tests.

Minor Code Cleanup:

  • Removed an unnecessary blank line in the compress task for consistency.

Types of changes

  • Docs change / refactoring / dependency upgrade
  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)

Tests

  • Tested on local build

Checklist

  • My code follows the code style of this project.
  • My change requires a change to the documentation and I have updated the documentation accordingly.
  • I have added tests to cover my changes.

@alephys26 alephys26 requested a review from a team as a code owner February 18, 2026 19:02
Copilot AI review requested due to automatic review settings February 18, 2026 19:02
Contributor

Copilot AI left a comment

Pull request overview

Refactors the Kafka pipeline task to avoid shared mutable state across concurrent task_concurrency workers by moving retry/cancellation-related variables to per-run scope, and updates a test pipeline to exercise Kafka group consumption with concurrency.

Changes:

  • Remove ctx and retry counters from the kafka struct; make retry counters local to each read() invocation.
  • Update Kafka read/write helpers to accept a context.Context parameter rather than relying on struct state.
  • Update test/pipelines/kafka_group_read.yaml to run the Kafka task with task_concurrency: 3 (plus minor whitespace cleanups elsewhere).

Reviewed changes

Copilot reviewed 3 out of 4 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| internal/pkg/pipeline/task/kafka/kafka.go | Removes shared mutable state from the Kafka task and makes retry tracking per worker run. |
| test/pipelines/kafka_group_read.yaml | Enables concurrent execution for the Kafka group-read test pipeline. |
| internal/pkg/pipeline/task/converter/sst.go | Removes trailing whitespace. |
| internal/pkg/pipeline/task/compress/compress.go | Removes an extra blank line. |

GroupID string `yaml:"group_id,omitempty" json:"group_id,omitempty"` // the consumer group id (optional)
BatchSize int `yaml:"batch_size,omitempty" json:"batch_size,omitempty"` // number of messages to read/write in a batch
RetryLimit *int `yaml:"retry_limit,omitempty" json:"retry_limit,omitempty"` // number of retries for read errors
ExitOnEmpty bool `yaml:"exit_on_empty,omitempty" json:"exit_on_empty,omitempty"` // exit when no more messages are available
Contributor

Any reason we are removing this?

Contributor

The main reason is that ExitOnEmpty is syntactic sugar: the same behavior can be emulated with a timeout of 100d and 5 retries. Removing it simplifies the code and makes the task easier to understand.
