diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java index 98918c33e..aaedb0418 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java @@ -508,7 +508,13 @@ protected void trackEventsProcessedProgress(int recordCount) { } protected ConsumerRecords consumerPoll(long pollInterval) { - return _consumer.poll(pollInterval); + if (pollInterval == 0) { + // Brooklin calls poll with 0 pollInterval when a task is newly initialized. There's a behavior change between old + // and new poll in this case. We need to understand that behavior better before removing usages of deprecated API + return _consumer.poll(pollInterval); + } else { + return _consumer.poll(Duration.ofMillis(pollInterval)); + } } /** diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java index 676a170c9..e32b9ed8a 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java @@ -423,8 +423,12 @@ protected void maybeCommitOffsets(Consumer consumer, boolean hardCommit) { if (_enablePartitionAssignment && _consumerAssignment.isEmpty()) { // Kafka rejects a poll if there is empty assignment return ConsumerRecords.EMPTY; - } else { + } else if (pollInterval == 0) { + // BMM calls poll with 0 pollInterval when a task is newly initialized. There's a behavior change between old poll + // and new poll in this case. We need to understand that behavior better before removing usages of deprecated API return _consumer.poll(pollInterval); + } else { + return _consumer.poll(Duration.ofMillis(pollInterval)); } } diff --git a/gradle/maven.gradle b/gradle/maven.gradle index c1a52034c..91465b20e 100644 --- a/gradle/maven.gradle +++ b/gradle/maven.gradle @@ -1,5 +1,5 @@ allprojects { - version = "5.3.0-SNAPSHOT" + version = "5.3.2-SNAPSHOT" } subprojects {