Skip to content

Commit 2cab170

Browse files
committed
Sonar Fixes
1 parent 1875a6c commit 2cab170

14 files changed

+43
-32
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/BatchErrorHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import org.apache.kafka.clients.consumer.Consumer;
2020
import org.apache.kafka.clients.consumer.ConsumerRecords;
2121

22+
import org.springframework.lang.Nullable;
23+
2224
/**
2325
* Handles errors thrown during the execution of a {@link BatchMessageListener}.
2426
* The listener should communicate which position(s) in the list failed in the
@@ -37,7 +39,7 @@ public interface BatchErrorHandler extends GenericErrorHandler<ConsumerRecords<?
3739
* @param consumer the consumer.
3840
* @param container the container.
3941
*/
40-
default void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
42+
default void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
4143
MessageListenerContainer container) {
4244
handle(thrownException, data);
4345
}
@@ -51,7 +53,7 @@ default void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consu
5153
* @param invokeListener a callback to re-invoke the listener.
5254
* @since 2.3.7
5355
*/
54-
default void handle(Exception thrownException, ConsumerRecords<?, ?> data,
56+
default void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data,
5557
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
5658

5759
handle(thrownException, data);

spring-kafka/src/main/java/org/springframework/kafka/listener/ConditionalDelegatingBatchErrorHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
* @since 2.7.4
3535
*
3636
*/
37-
public class ConditionalDelegatingBatchErrorHandler implements ContainerAwareBatchErrorHandler {
37+
public class ConditionalDelegatingBatchErrorHandler implements ListenerInvokingBatchErrorHandler {
3838

3939
private final ContainerAwareBatchErrorHandler defaultErrorHandler;
4040

@@ -70,21 +70,21 @@ public void addDelegate(Class<? extends Throwable> throwable, ContainerAwareBatc
7070
}
7171

7272
@Override
73-
public void handle(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
73+
public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
7474
MessageListenerContainer container) {
7575

7676
// Never called but, just in case
7777
doHandle(thrownException, records, consumer, container, null);
7878
}
7979

8080
@Override
81-
public void handle(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
81+
public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
8282
MessageListenerContainer container, Runnable invokeListener) {
8383

8484
doHandle(thrownException, records, consumer, container, invokeListener);
8585
}
8686

87-
protected void doHandle(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
87+
protected void doHandle(Exception thrownException, @Nullable ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
8888
MessageListenerContainer container, @Nullable Runnable invokeListener) {
8989

9090
Throwable cause = thrownException;

spring-kafka/src/main/java/org/springframework/kafka/listener/ConditionalDelegatingErrorHandler.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ public void addDelegate(Class<? extends Throwable> throwable, ContainerAwareErro
7474
public void handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
7575
MessageListenerContainer container) {
7676

77-
boolean handled = false;
7877
Throwable cause = thrownException;
7978
if (cause instanceof ListenerExecutionFailedException) {
8079
cause = thrownException.getCause();
@@ -83,7 +82,6 @@ public void handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?
8382
Class<? extends Throwable> causeClass = cause.getClass();
8483
for (Entry<Class<? extends Throwable>, ContainerAwareErrorHandler> entry : this.delegates.entrySet()) {
8584
if (entry.getKey().isAssignableFrom(causeClass)) {
86-
handled = true;
8785
entry.getValue().handle(thrownException, records, consumer, container);
8886
return;
8987
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareBatchErrorHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ default void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> d
3838
}
3939

4040
@Override
41-
void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer);
41+
void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer);
4242

4343
@Override
4444
default void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerAwareBatchErrorHandler.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import org.apache.kafka.clients.consumer.Consumer;
2020
import org.apache.kafka.clients.consumer.ConsumerRecords;
2121

22+
import org.springframework.lang.Nullable;
23+
2224
/**
2325
* An error handler that has access to the batch of records from the last poll the
2426
* consumer, and the container.
@@ -31,12 +33,12 @@
3133
public interface ContainerAwareBatchErrorHandler extends ConsumerAwareBatchErrorHandler {
3234

3335
@Override
34-
default void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer) {
36+
default void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer) {
3537
throw new UnsupportedOperationException("Container should never call this");
3638
}
3739

3840
@Override
39-
void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
41+
void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
4042
MessageListenerContainer container);
4143

4244
/**
@@ -50,8 +52,8 @@ void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?
5052
*/
5153
@Override
5254
@SuppressWarnings("unused")
53-
default void handle(Exception thrownException, ConsumerRecords<?, ?> data,
54-
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
55+
default void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data,
56+
Consumer<?, ?> consumer, MessageListenerContainer container, @Nullable Runnable invokeListener) {
5557

5658
handle(thrownException, data, consumer, container);
5759
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerStoppingBatchErrorHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.springframework.core.task.SimpleAsyncTaskExecutor;
2525
import org.springframework.kafka.KafkaException;
26+
import org.springframework.lang.Nullable;
2627
import org.springframework.util.Assert;
2728

2829
/**
@@ -55,7 +56,7 @@ public ContainerStoppingBatchErrorHandler(Executor executor) {
5556
}
5657

5758
@Override
58-
public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
59+
public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
5960
MessageListenerContainer container) {
6061
this.executor.execute(() -> container.stop());
6162
// isRunning is false before the container.stop() waits for listener thread

spring-kafka/src/main/java/org/springframework/kafka/listener/GenericErrorHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public interface GenericErrorHandler<T> {
4545
* @param data the data.
4646
* @param consumer the consumer.
4747
*/
48-
default void handle(Exception thrownException, T data, Consumer<?, ?> consumer) {
48+
default void handle(Exception thrownException, @Nullable T data, Consumer<?, ?> consumer) {
4949
handle(thrownException, data);
5050
}
5151

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1534,16 +1534,19 @@ private void doResumeConsumerIfNeccessary() {
15341534

15351535
private void pausePartitionsIfNecessary() {
15361536
Set<TopicPartition> pausedConsumerPartitions = this.consumer.paused();
1537-
List<TopicPartition> partitionsToPause = getAssignedPartitions()
1538-
.stream()
1539-
.filter(tp -> isPartitionPauseRequested(tp)
1540-
&& !pausedConsumerPartitions.contains(tp))
1541-
.collect(Collectors.toList());
1542-
if (partitionsToPause.size() > 0) {
1543-
this.consumer.pause(partitionsToPause);
1544-
this.pausedPartitions.addAll(partitionsToPause);
1545-
this.logger.debug(() -> "Paused consumption from " + partitionsToPause);
1546-
partitionsToPause.forEach(KafkaMessageListenerContainer.this::publishConsumerPartitionPausedEvent);
1537+
Collection<TopicPartition> partitions = getAssignedPartitions();
1538+
if (partitions != null) {
1539+
List<TopicPartition> partitionsToPause = partitions
1540+
.stream()
1541+
.filter(tp -> isPartitionPauseRequested(tp)
1542+
&& !pausedConsumerPartitions.contains(tp))
1543+
.collect(Collectors.toList());
1544+
if (partitionsToPause.size() > 0) {
1545+
this.consumer.pause(partitionsToPause);
1546+
this.pausedPartitions.addAll(partitionsToPause);
1547+
this.logger.debug(() -> "Paused consumption from " + partitionsToPause);
1548+
partitionsToPause.forEach(KafkaMessageListenerContainer.this::publishConsumerPartitionPausedEvent);
1549+
}
15471550
}
15481551
}
15491552

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerInvokingBatchErrorHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import org.apache.kafka.clients.consumer.Consumer;
2020
import org.apache.kafka.clients.consumer.ConsumerRecords;
2121

22+
import org.springframework.lang.Nullable;
23+
2224
/**
2325
* A batch error handler that is capable of invoking the listener during error handling.
2426
*
@@ -30,14 +32,14 @@
3032
public interface ListenerInvokingBatchErrorHandler extends ContainerAwareBatchErrorHandler {
3133

3234
@Override
33-
default void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
35+
default void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
3436
MessageListenerContainer container) {
3537

3638
throw new UnsupportedOperationException("Container should never call this");
3739
}
3840

3941
@Override
40-
void handle(Exception thrownException, ConsumerRecords<?, ?> records,
42+
void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> records,
4143
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener);
4244

4345
}

spring-kafka/src/main/java/org/springframework/kafka/listener/RecoveringBatchErrorHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,12 @@ private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
155155
return i;
156156
}
157157

158-
private void seekOrRecover(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
158+
private void seekOrRecover(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
159159
MessageListenerContainer container, int indexArg) {
160160

161+
if (data == null) {
162+
return;
163+
}
161164
Iterator<?> iterator = data.iterator();
162165
List<ConsumerRecord<?, ?>> toCommit = new ArrayList<>();
163166
List<ConsumerRecord<?, ?>> remaining = new ArrayList<>();

spring-kafka/src/main/java/org/springframework/kafka/listener/RemainingRecordsErrorHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ default void handle(Exception thrownException, @Nullable ConsumerRecord<?, ?> da
4747
* @param records the remaining records including the one that failed.
4848
* @param consumer the consumer.
4949
*/
50-
void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer);
50+
void handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer);
5151

5252
@Override
5353
default void handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records,

spring-kafka/src/main/java/org/springframework/kafka/listener/RetryingBatchErrorHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
9393
}
9494

9595
@Override
96-
public void handle(Exception thrownException, ConsumerRecords<?, ?> records,
96+
public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> records,
9797
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
9898

9999
if (records == null || records.count() == 0) {

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
109109
public void handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records,
110110
Consumer<?, ?> consumer, MessageListenerContainer container) {
111111

112-
SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(),
112+
SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
113113
getRecoveryStrategy(records, thrownException), this.logger, getLogLevel());
114114
}
115115

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ public static void seekOrRecover(Exception thrownException, @Nullable List<Consu
202202
}
203203
}
204204

205-
if (records == null || !doSeeks(records, consumer, thrownException, true, recovery, container, logger)) {
205+
if (records == null || !doSeeks(records, consumer, thrownException, true, recovery, container, logger)) { // NOSONAR
206206
throw new KafkaException("Seek to current after exception", level, thrownException);
207207
}
208208
if (commitRecovered) {

0 commit comments

Comments
 (0)