Skip to content

Commit c34eb76

Browse files
garyrussellartembilan
authored andcommitted
GH-1855: Fix Call to RetryingBatchErrorHandler
Resolves #1855 Unsupported op when a consumer exception occurs. **cherry-pick to all 2.x.x (down to and including 2.3.x)**
1 parent 23f0404 commit c34eb76

File tree

3 files changed

+41
-2
lines changed

3 files changed

+41
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1602,7 +1602,8 @@ protected void handleConsumerException(Exception e) {
16021602
}
16031603
else if (this.isBatchListener && this.batchErrorHandler != null) {
16041604
this.batchErrorHandler.handle(e, new ConsumerRecords<K, V>(Collections.emptyMap()), this.consumer,
1605-
KafkaMessageListenerContainer.this.thisOrParentContainer);
1605+
KafkaMessageListenerContainer.this.thisOrParentContainer, () -> {
1606+
});
16061607
}
16071608
else {
16081609
this.logger.error(e, "Consumer exception");

spring-kafka/src/main/java/org/springframework/kafka/listener/RetryingBatchErrorHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,10 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
9696
public void handle(Exception thrownException, ConsumerRecords<?, ?> records,
9797
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
9898

99+
if (records.count() == 0) {
100+
LOGGER.error(thrownException, "Called with no records; consumer exception");
101+
return;
102+
}
99103
BackOffExecution execution = this.backOff.start();
100104
long nextBackOff = execution.nextBackOff();
101105
String failed = null;

spring-kafka/src/test/java/org/springframework/kafka/listener/RetryingBatchErrorHandlerIntegrationTests.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,10 @@
1717
package org.springframework.kafka.listener;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.isNull;
22+
import static org.mockito.BDDMockito.given;
23+
import static org.mockito.Mockito.mock;
2024

2125
import java.util.List;
2226
import java.util.Map;
@@ -28,15 +32,18 @@
2832
import org.apache.kafka.clients.consumer.Consumer;
2933
import org.apache.kafka.clients.consumer.ConsumerConfig;
3034
import org.apache.kafka.clients.consumer.ConsumerRecord;
35+
import org.apache.kafka.clients.consumer.ConsumerRecords;
3136
import org.apache.kafka.common.TopicPartition;
3237
import org.junit.jupiter.api.BeforeAll;
3338
import org.junit.jupiter.api.Test;
3439

40+
import org.springframework.kafka.core.ConsumerFactory;
3541
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
3642
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
3743
import org.springframework.kafka.core.KafkaOperations;
3844
import org.springframework.kafka.core.KafkaTemplate;
3945
import org.springframework.kafka.event.ConsumerStoppedEvent;
46+
import org.springframework.kafka.support.TopicPartitionOffset;
4047
import org.springframework.kafka.test.EmbeddedKafkaBroker;
4148
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
4249
import org.springframework.kafka.test.context.EmbeddedKafka;
@@ -206,4 +213,31 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
206213
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
207214
}
208215

216+
@SuppressWarnings("unchecked")
217+
@Test
218+
void consumerEx() throws InterruptedException {
219+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
220+
Consumer<Integer, String> consumer = mock(Consumer.class);
221+
given(consumer.poll(any())).willThrow(new RuntimeException("test"));
222+
given(cf.createConsumer(any(), any(), isNull(), any())).willReturn(consumer);
223+
ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset("foo", 0));
224+
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
225+
containerProps);
226+
CountDownLatch called = new CountDownLatch(1);
227+
container.setBatchErrorHandler(new RetryingBatchErrorHandler() {
228+
229+
@Override
230+
public void handle(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
231+
MessageListenerContainer container, Runnable invokeListener) {
232+
233+
called.countDown();
234+
super.handle(thrownException, records, consumer, container, invokeListener);
235+
}
236+
});
237+
container.setupMessageListener((BatchMessageListener<Integer, String>) (recs -> { }));
238+
container.start();
239+
assertThat(called.await(10, TimeUnit.SECONDS)).isTrue();
240+
container.stop();
241+
}
242+
209243
}

0 commit comments

Comments
 (0)