Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,13 @@ protected void trackEventsProcessedProgress(int recordCount) {
}

protected ConsumerRecords<?, ?> consumerPoll(long pollInterval) {
return _consumer.poll(pollInterval);
if (pollInterval == 0) {
// Brooklin calls poll with 0 pollInterval when a task is newly initialized. There's a behavior change between old
// and new poll in this case. We need to understand that behavior better before removing usages of deprecated API
return _consumer.poll(pollInterval);
} else {
return _consumer.poll(Duration.ofMillis(pollInterval));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,12 @@ protected void maybeCommitOffsets(Consumer<?, ?> consumer, boolean hardCommit) {
if (_enablePartitionAssignment && _consumerAssignment.isEmpty()) {
// Kafka rejects a poll if there is empty assignment
return ConsumerRecords.EMPTY;
} else {
} else if (pollInterval == 0) {
// BMM calls poll with 0 pollInterval when a task is newly initialized. There's a behavior change between old poll
// and new poll in this case. We need to understand that behavior better before removing usages of deprecated API
return _consumer.poll(pollInterval);
} else {
return _consumer.poll(Duration.ofMillis(pollInterval));
}
}

Expand Down
2 changes: 1 addition & 1 deletion gradle/maven.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
allprojects {
version = "5.3.0-SNAPSHOT"
version = "5.3.2-SNAPSHOT"
}

subprojects {
Expand Down