22
22
import java .util .Objects ;
23
23
import javax .annotation .Nullable ;
24
24
import lombok .extern .slf4j .Slf4j ;
25
+ import org .apache .commons .lang3 .StringUtils ;
25
26
import org .springframework .http .ResponseEntity ;
26
27
import org .springframework .util .unit .DataSize ;
27
28
import org .springframework .web .client .RestClientException ;
@@ -51,14 +52,36 @@ private static Retry conflictCodeRetry() {
51
52
(WebClientResponseException .Conflict ) signal .failure ()));
52
53
}
53
54
54
- private static <T > Mono <T > withRetryOnConflict (Mono <T > publisher ) {
55
- return publisher .retryWhen (conflictCodeRetry ());
55
+ private static @ NotNull Retry retryOnRebalance () {
56
+ return Retry .fixedDelay (MAX_RETRIES , RETRIES_DELAY ).filter (e -> {
57
+
58
+ if (e instanceof WebClientResponseException .InternalServerError exception ) {
59
+ final var errorMessage = getMessage (exception );
60
+ return StringUtils .equals (errorMessage ,
61
+ // From https://github.com/apache/kafka/blob/dfc07e0e0c6e737a56a5402644265f634402b864/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2340
62
+ "Request cannot be completed because a rebalance is expected" );
63
+ }
64
+ return false ;
65
+ });
66
+ }
67
+
68
+ private static <T > Mono <T > withRetryOnConflictOrRebalance (Mono <T > publisher ) {
69
+ return publisher
70
+ .retryWhen (retryOnRebalance ())
71
+ .retryWhen (conflictCodeRetry ());
72
+ }
73
+
74
+ private static <T > Flux <T > withRetryOnConflictOrRebalance (Flux <T > publisher ) {
75
+ return publisher
76
+ .retryWhen (retryOnRebalance ())
77
+ .retryWhen (conflictCodeRetry ());
56
78
}
57
79
58
- private static <T > Flux <T > withRetryOnConflict ( Flux <T > publisher ) {
59
- return publisher .retryWhen (conflictCodeRetry ());
80
+ private static <T > Mono <T > withRetryOnRebalance ( Mono <T > publisher ) {
81
+ return publisher .retryWhen (retryOnRebalance ());
60
82
}
61
83
84
+
62
85
private static <T > Mono <T > withBadRequestErrorHandling (Mono <T > publisher ) {
63
86
return publisher
64
87
.onErrorResume (WebClientResponseException .BadRequest .class ,
@@ -73,197 +96,200 @@ private record ErrorMessage(@NotNull @JsonProperty("message") String message) {
73
96
}
74
97
75
98
private static <T > @ NotNull Mono <T > parseConnectErrorMessage (WebClientResponseException parseException ) {
99
+ return Mono .error (new ValidationException (getMessage (parseException )));
100
+ }
101
+
102
+ private static String getMessage (WebClientResponseException parseException ) {
76
103
final var errorMessage = parseException .getResponseBodyAs (ErrorMessage .class );
77
- return Mono .error (new ValidationException (
78
- Objects .requireNonNull (errorMessage ,
79
- // see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
80
- "This should not happen according to the ConnectExceptionMapper" )
81
- .message ()));
104
+ return Objects .requireNonNull (errorMessage ,
105
+ // see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
106
+ "This should not happen according to the ConnectExceptionMapper" )
107
+ .message ();
82
108
}
83
109
84
110
@ Override
85
111
public Mono <Connector > createConnector (NewConnector newConnector ) throws RestClientException {
86
112
return withBadRequestErrorHandling (
87
- super .createConnector (newConnector )
113
+ withRetryOnRebalance ( super .createConnector (newConnector ) )
88
114
);
89
115
}
90
116
91
117
@ Override
92
118
public Mono <Connector > setConnectorConfig (String connectorName , Map <String , Object > requestBody )
93
119
throws RestClientException {
94
120
return withBadRequestErrorHandling (
95
- super .setConnectorConfig (connectorName , requestBody )
121
+ withRetryOnRebalance ( super .setConnectorConfig (connectorName , requestBody ) )
96
122
);
97
123
}
98
124
99
125
@ Override
100
126
public Mono <ResponseEntity <Connector >> createConnectorWithHttpInfo (NewConnector newConnector )
101
127
throws WebClientResponseException {
102
- return withRetryOnConflict (super .createConnectorWithHttpInfo (newConnector ));
128
+ return withRetryOnConflictOrRebalance (super .createConnectorWithHttpInfo (newConnector ));
103
129
}
104
130
105
131
@ Override
106
132
public Mono <Void > deleteConnector (String connectorName ) throws WebClientResponseException {
107
- return withRetryOnConflict (super .deleteConnector (connectorName ));
133
+ return withRetryOnConflictOrRebalance (super .deleteConnector (connectorName ));
108
134
}
109
135
110
136
@ Override
111
137
public Mono <ResponseEntity <Void >> deleteConnectorWithHttpInfo (String connectorName )
112
138
throws WebClientResponseException {
113
- return withRetryOnConflict (super .deleteConnectorWithHttpInfo (connectorName ));
139
+ return withRetryOnConflictOrRebalance (super .deleteConnectorWithHttpInfo (connectorName ));
114
140
}
115
141
116
142
117
143
@ Override
118
144
public Mono <Connector > getConnector (String connectorName ) throws WebClientResponseException {
119
- return withRetryOnConflict (super .getConnector (connectorName ));
145
+ return withRetryOnConflictOrRebalance (super .getConnector (connectorName ));
120
146
}
121
147
122
148
@ Override
123
149
public Mono <ResponseEntity <Connector >> getConnectorWithHttpInfo (String connectorName )
124
150
throws WebClientResponseException {
125
- return withRetryOnConflict (super .getConnectorWithHttpInfo (connectorName ));
151
+ return withRetryOnConflictOrRebalance (super .getConnectorWithHttpInfo (connectorName ));
126
152
}
127
153
128
154
@ Override
129
155
public Mono <Map <String , Object >> getConnectorConfig (String connectorName ) throws WebClientResponseException {
130
- return withRetryOnConflict (super .getConnectorConfig (connectorName ));
156
+ return withRetryOnConflictOrRebalance (super .getConnectorConfig (connectorName ));
131
157
}
132
158
133
159
@ Override
134
160
public Mono <ResponseEntity <Map <String , Object >>> getConnectorConfigWithHttpInfo (String connectorName )
135
161
throws WebClientResponseException {
136
- return withRetryOnConflict (super .getConnectorConfigWithHttpInfo (connectorName ));
162
+ return withRetryOnConflictOrRebalance (super .getConnectorConfigWithHttpInfo (connectorName ));
137
163
}
138
164
139
165
@ Override
140
166
public Flux <ConnectorPlugin > getConnectorPlugins () throws WebClientResponseException {
141
- return withRetryOnConflict (super .getConnectorPlugins ());
167
+ return withRetryOnConflictOrRebalance (super .getConnectorPlugins ());
142
168
}
143
169
144
170
@ Override
145
171
public Mono <ResponseEntity <List <ConnectorPlugin >>> getConnectorPluginsWithHttpInfo ()
146
172
throws WebClientResponseException {
147
- return withRetryOnConflict (super .getConnectorPluginsWithHttpInfo ());
173
+ return withRetryOnConflictOrRebalance (super .getConnectorPluginsWithHttpInfo ());
148
174
}
149
175
150
176
@ Override
151
177
public Mono <ConnectorStatus > getConnectorStatus (String connectorName ) throws WebClientResponseException {
152
- return withRetryOnConflict (super .getConnectorStatus (connectorName ));
178
+ return withRetryOnConflictOrRebalance (super .getConnectorStatus (connectorName ));
153
179
}
154
180
155
181
@ Override
156
182
public Mono <ResponseEntity <ConnectorStatus >> getConnectorStatusWithHttpInfo (String connectorName )
157
183
throws WebClientResponseException {
158
- return withRetryOnConflict (super .getConnectorStatusWithHttpInfo (connectorName ));
184
+ return withRetryOnConflictOrRebalance (super .getConnectorStatusWithHttpInfo (connectorName ));
159
185
}
160
186
161
187
@ Override
162
188
public Mono <TaskStatus > getConnectorTaskStatus (String connectorName , Integer taskId )
163
189
throws WebClientResponseException {
164
- return withRetryOnConflict (super .getConnectorTaskStatus (connectorName , taskId ));
190
+ return withRetryOnConflictOrRebalance (super .getConnectorTaskStatus (connectorName , taskId ));
165
191
}
166
192
167
193
@ Override
168
194
public Mono <ResponseEntity <TaskStatus >> getConnectorTaskStatusWithHttpInfo (String connectorName , Integer taskId )
169
195
throws WebClientResponseException {
170
- return withRetryOnConflict (super .getConnectorTaskStatusWithHttpInfo (connectorName , taskId ));
196
+ return withRetryOnConflictOrRebalance (super .getConnectorTaskStatusWithHttpInfo (connectorName , taskId ));
171
197
}
172
198
173
199
@ Override
174
200
public Flux <ConnectorTask > getConnectorTasks (String connectorName ) throws WebClientResponseException {
175
- return withRetryOnConflict (super .getConnectorTasks (connectorName ));
201
+ return withRetryOnConflictOrRebalance (super .getConnectorTasks (connectorName ));
176
202
}
177
203
178
204
@ Override
179
205
public Mono <ResponseEntity <List <ConnectorTask >>> getConnectorTasksWithHttpInfo (String connectorName )
180
206
throws WebClientResponseException {
181
- return withRetryOnConflict (super .getConnectorTasksWithHttpInfo (connectorName ));
207
+ return withRetryOnConflictOrRebalance (super .getConnectorTasksWithHttpInfo (connectorName ));
182
208
}
183
209
184
210
@ Override
185
211
public Mono <Map <String , ConnectorTopics >> getConnectorTopics (String connectorName ) throws WebClientResponseException {
186
- return withRetryOnConflict (super .getConnectorTopics (connectorName ));
212
+ return withRetryOnConflictOrRebalance (super .getConnectorTopics (connectorName ));
187
213
}
188
214
189
215
@ Override
190
216
public Mono <ResponseEntity <Map <String , ConnectorTopics >>> getConnectorTopicsWithHttpInfo (String connectorName )
191
217
throws WebClientResponseException {
192
- return withRetryOnConflict (super .getConnectorTopicsWithHttpInfo (connectorName ));
218
+ return withRetryOnConflictOrRebalance (super .getConnectorTopicsWithHttpInfo (connectorName ));
193
219
}
194
220
195
221
@ Override
196
222
public Mono <List <String >> getConnectors (String search ) throws WebClientResponseException {
197
- return withRetryOnConflict (super .getConnectors (search ));
223
+ return withRetryOnConflictOrRebalance (super .getConnectors (search ));
198
224
}
199
225
200
226
@ Override
201
227
public Mono <ResponseEntity <List <String >>> getConnectorsWithHttpInfo (String search ) throws WebClientResponseException {
202
- return withRetryOnConflict (super .getConnectorsWithHttpInfo (search ));
228
+ return withRetryOnConflictOrRebalance (super .getConnectorsWithHttpInfo (search ));
203
229
}
204
230
205
231
@ Override
206
232
public Mono <Void > pauseConnector (String connectorName ) throws WebClientResponseException {
207
- return withRetryOnConflict (super .pauseConnector (connectorName ));
233
+ return withRetryOnConflictOrRebalance (super .pauseConnector (connectorName ));
208
234
}
209
235
210
236
@ Override
211
237
public Mono <ResponseEntity <Void >> pauseConnectorWithHttpInfo (String connectorName ) throws WebClientResponseException {
212
- return withRetryOnConflict (super .pauseConnectorWithHttpInfo (connectorName ));
238
+ return withRetryOnConflictOrRebalance (super .pauseConnectorWithHttpInfo (connectorName ));
213
239
}
214
240
215
241
@ Override
216
242
public Mono <Void > restartConnector (String connectorName , Boolean includeTasks , Boolean onlyFailed )
217
243
throws WebClientResponseException {
218
- return withRetryOnConflict (super .restartConnector (connectorName , includeTasks , onlyFailed ));
244
+ return withRetryOnConflictOrRebalance (super .restartConnector (connectorName , includeTasks , onlyFailed ));
219
245
}
220
246
221
247
@ Override
222
248
public Mono <ResponseEntity <Void >> restartConnectorWithHttpInfo (String connectorName , Boolean includeTasks ,
223
249
Boolean onlyFailed ) throws WebClientResponseException {
224
- return withRetryOnConflict (super .restartConnectorWithHttpInfo (connectorName , includeTasks , onlyFailed ));
250
+ return withRetryOnConflictOrRebalance (super .restartConnectorWithHttpInfo (connectorName , includeTasks , onlyFailed ));
225
251
}
226
252
227
253
@ Override
228
254
public Mono <Void > restartConnectorTask (String connectorName , Integer taskId ) throws WebClientResponseException {
229
- return withRetryOnConflict (super .restartConnectorTask (connectorName , taskId ));
255
+ return withRetryOnConflictOrRebalance (super .restartConnectorTask (connectorName , taskId ));
230
256
}
231
257
232
258
@ Override
233
259
public Mono <ResponseEntity <Void >> restartConnectorTaskWithHttpInfo (String connectorName , Integer taskId )
234
260
throws WebClientResponseException {
235
- return withRetryOnConflict (super .restartConnectorTaskWithHttpInfo (connectorName , taskId ));
261
+ return withRetryOnConflictOrRebalance (super .restartConnectorTaskWithHttpInfo (connectorName , taskId ));
236
262
}
237
263
238
264
@ Override
239
265
public Mono <Void > resumeConnector (String connectorName ) throws WebClientResponseException {
240
- return super .resumeConnector (connectorName );
266
+ return withRetryOnRebalance ( super .resumeConnector (connectorName ) );
241
267
}
242
268
243
269
@ Override
244
270
public Mono <ResponseEntity <Void >> resumeConnectorWithHttpInfo (String connectorName )
245
271
throws WebClientResponseException {
246
- return withRetryOnConflict (super .resumeConnectorWithHttpInfo (connectorName ));
272
+ return withRetryOnConflictOrRebalance (super .resumeConnectorWithHttpInfo (connectorName ));
247
273
}
248
274
249
275
@ Override
250
276
public Mono <ResponseEntity <Connector >> setConnectorConfigWithHttpInfo (String connectorName ,
251
277
Map <String , Object > requestBody )
252
278
throws WebClientResponseException {
253
- return withRetryOnConflict (super .setConnectorConfigWithHttpInfo (connectorName , requestBody ));
279
+ return withRetryOnConflictOrRebalance (super .setConnectorConfigWithHttpInfo (connectorName , requestBody ));
254
280
}
255
281
256
282
@ Override
257
283
public Mono <ConnectorPluginConfigValidationResponse > validateConnectorPluginConfig (String pluginName ,
258
284
Map <String , Object > requestBody )
259
285
throws WebClientResponseException {
260
- return withRetryOnConflict (super .validateConnectorPluginConfig (pluginName , requestBody ));
286
+ return withRetryOnConflictOrRebalance (super .validateConnectorPluginConfig (pluginName , requestBody ));
261
287
}
262
288
263
289
@ Override
264
290
public Mono <ResponseEntity <ConnectorPluginConfigValidationResponse >> validateConnectorPluginConfigWithHttpInfo (
265
291
String pluginName , Map <String , Object > requestBody ) throws WebClientResponseException {
266
- return withRetryOnConflict (super .validateConnectorPluginConfigWithHttpInfo (pluginName , requestBody ));
292
+ return withRetryOnConflictOrRebalance (super .validateConnectorPluginConfigWithHttpInfo (pluginName , requestBody ));
267
293
}
268
294
269
295
private static class RetryingApiClient extends ApiClient {
0 commit comments