Skip to content

Commit 1f11735

Browse files
garyrussellartembilan
authored andcommitted
GH-1800: Add KafkaNullAwarePayloadArgumentResolver
Resolves #1800 Move the code to a top-level class and document its use.
1 parent f173887 commit 1f11735

File tree

3 files changed

+76
-35
lines changed

3 files changed

+76
-35
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4510,6 +4510,7 @@ This lets you further customize listener deserialization without changing the de
45104510

45114511
IMPORTANT: Setting a custom `MessageHandlerMethodFactory` on the `KafkaListenerEndpointRegistrar` through a `KafkaListenerConfigurer` bean disables this feature.
45124512

4513+
[[custom-arg-resolve]]
45134514
===== Adding custom `HandlerMethodArgumentResolver` to `@KafkaListener`
45144515

45154516
Starting with version 2.4.2 you are able to add your own `HandlerMethodArgumentResolver` and resolve custom method parameters.
@@ -4545,6 +4546,12 @@ class CustomKafkaConfig implements KafkaListenerConfigurer {
45454546
----
45464547
====
45474548

4549+
You can also completely replace the framework's argument resolution by adding a custom `MessageHandlerMethodFactory` to the `KafkaListenerEndpointRegistrar` bean.
4550+
If you do this, and your application needs to handle tombstone records, with a `null` `value()` (e.g. from a compacted topic), you should add a `KafkaNullAwarePayloadArgumentResolver` to the factory; it must be the last resolver because it supports all types and can match arguments without a `@Payload` annotation.
4551+
If you are using a `DefaultMessageHandlerMethodFactory`, set this resolver as the last custom resolver; the factory will ensure that this resolver will be used before the standard `PayloadMethodArgumentResolver`, which has no knowledge of `KafkaNull` payloads.
4552+
4553+
See also <<tombstones>>.
4554+
45484555
[[headers]]
45494556
==== Message Headers
45504557

@@ -4770,6 +4777,9 @@ Note that the argument is `null`, not `KafkaNull`.
47704777

47714778
TIP: See <<tip-assign-all-parts>>.
47724779

4780+
IMPORTANT: This feature requires the use of a `KafkaNullAwarePayloadArgumentResolver` which the framework will configure when using the default `MessageHandlerMethodFactory`.
4781+
When using a custom `MessageHandlerMethodFactory`, see <<custom-arg-resolve>>.
4782+
47734783
[[annotation-error-handling]]
47744784
==== Handling Exceptions
47754785

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
import org.springframework.context.ConfigurableApplicationContext;
6565
import org.springframework.context.expression.StandardBeanExpressionResolver;
6666
import org.springframework.core.MethodIntrospector;
67-
import org.springframework.core.MethodParameter;
6867
import org.springframework.core.OrderComparator;
6968
import org.springframework.core.Ordered;
7069
import org.springframework.core.annotation.AnnotatedElementUtils;
@@ -87,16 +86,12 @@
8786
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
8887
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
8988
import org.springframework.kafka.retrytopic.RetryTopicInternalBeanNames;
90-
import org.springframework.kafka.support.KafkaNull;
9189
import org.springframework.kafka.support.TopicPartitionOffset;
9290
import org.springframework.lang.Nullable;
93-
import org.springframework.messaging.Message;
9491
import org.springframework.messaging.converter.GenericMessageConverter;
95-
import org.springframework.messaging.converter.MessageConverter;
9692
import org.springframework.messaging.converter.SmartMessageConverter;
9793
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
9894
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
99-
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
10095
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
10196
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
10297
import org.springframework.util.Assert;
@@ -1112,36 +1107,6 @@ public String getConversationId() {
11121107

11131108
}
11141109

1115-
private static class KafkaNullAwarePayloadArgumentResolver extends PayloadMethodArgumentResolver {
1116-
1117-
KafkaNullAwarePayloadArgumentResolver(MessageConverter messageConverter, Validator validator) {
1118-
super(messageConverter, validator);
1119-
}
1120-
1121-
@Override
1122-
public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception { // NOSONAR
1123-
Object resolved = super.resolveArgument(parameter, message);
1124-
/*
1125-
* Replace KafkaNull list elements with null.
1126-
*/
1127-
if (resolved instanceof List) {
1128-
List<?> list = ((List<?>) resolved);
1129-
for (int i = 0; i < list.size(); i++) {
1130-
if (list.get(i) instanceof KafkaNull) {
1131-
list.set(i, null);
1132-
}
1133-
}
1134-
}
1135-
return resolved;
1136-
}
1137-
1138-
@Override
1139-
protected boolean isEmptyPayload(Object payload) {
1140-
return payload == null || payload instanceof KafkaNull;
1141-
}
1142-
1143-
}
1144-
11451110
/**
11461111
* Post processes each set of annotation attributes.
11471112
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.annotation;
18+
19+
import java.util.List;
20+
21+
import org.springframework.core.MethodParameter;
22+
import org.springframework.kafka.support.KafkaNull;
23+
import org.springframework.messaging.Message;
24+
import org.springframework.messaging.converter.MessageConverter;
25+
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
26+
import org.springframework.validation.Validator;
27+
28+
/**
29+
* {@link PayloadMethodArgumentResolver} that can properly decode {@link KafkaNull}
30+
* payloads, returning {@code null}. When using a custom
31+
* {@link org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory},
32+
* add this resolver if you need to handle tombstone records with null values.
33+
*
34+
* @author Gary Russell
35+
* @since 2.7.4
36+
*
37+
*/
38+
public class KafkaNullAwarePayloadArgumentResolver extends PayloadMethodArgumentResolver {
39+
40+
KafkaNullAwarePayloadArgumentResolver(MessageConverter messageConverter, Validator validator) {
41+
super(messageConverter, validator);
42+
}
43+
44+
@Override
45+
public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception { // NOSONAR
46+
Object resolved = super.resolveArgument(parameter, message);
47+
/*
48+
* Replace KafkaNull list elements with null.
49+
*/
50+
if (resolved instanceof List) {
51+
List<?> list = ((List<?>) resolved);
52+
for (int i = 0; i < list.size(); i++) {
53+
if (list.get(i) instanceof KafkaNull) {
54+
list.set(i, null);
55+
}
56+
}
57+
}
58+
return resolved;
59+
}
60+
61+
@Override
62+
protected boolean isEmptyPayload(Object payload) {
63+
return payload == null || payload instanceof KafkaNull;
64+
}
65+
66+
}

0 commit comments

Comments
 (0)