Skip to content

Commit 0d8ba54

Browse files
committed
adapt to connect retry mechanism
use the proper return type for the client interface fix stopConnector override with new retry mechanism
1 parent cb06d48 commit 0d8ba54

File tree

2 files changed

+21
-14
lines changed

2 files changed

+21
-14
lines changed

api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java

+20-3
Original file line numberDiff line numberDiff line change
@@ -233,14 +233,19 @@ public Mono<Void> pauseConnector(String connectorName) throws WebClientResponseE
233233
return withRetryOnConflictOrRebalance(super.pauseConnector(connectorName));
234234
}
235235

236+
@Override
237+
public Mono<ResponseEntity<Void>> pauseConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
238+
return withRetryOnConflictOrRebalance(super.pauseConnectorWithHttpInfo(connectorName));
239+
}
240+
236241
@Override
237242
public Mono<Void> stopConnector(String connectorName) throws WebClientResponseException {
238-
return withRetryOnConflict(super.stopConnector(connectorName));
243+
return withRetryOnConflictOrRebalance(super.stopConnector(connectorName));
239244
}
240245

241246
@Override
242-
public Mono<ResponseEntity<Void>> pauseConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
243-
return withRetryOnConflictOrRebalance(super.pauseConnectorWithHttpInfo(connectorName));
247+
public Mono<ResponseEntity<Void>> stopConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
248+
return withRetryOnConflictOrRebalance(super.stopConnectorWithHttpInfo(connectorName));
244249
}
245250

246251
@Override
@@ -266,6 +271,18 @@ public Mono<ResponseEntity<Void>> restartConnectorTaskWithHttpInfo(String connec
266271
return withRetryOnConflictOrRebalance(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
267272
}
268273

274+
@Override
275+
public Mono<Void> resetConnectorOffsets(String connectorName)
276+
throws WebClientResponseException {
277+
return withRetryOnConflictOrRebalance(super.resetConnectorOffsets(connectorName));
278+
}
279+
280+
@Override
281+
public Mono<ResponseEntity<Void>> resetConnectorOffsetsWithHttpInfo(String connectorName)
282+
throws WebClientResponseException {
283+
return withRetryOnConflictOrRebalance(super.resetConnectorOffsetsWithHttpInfo(connectorName));
284+
}
285+
269286
@Override
270287
public Mono<Void> resumeConnector(String connectorName) throws WebClientResponseException {
271288
return withRetryOnRebalance(super.resumeConnector(connectorName));

api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java

+1-11
Original file line numberDiff line numberDiff line change
@@ -59,20 +59,10 @@ public void setUp() {
5959
"file", "/tmp/test",
6060
"test.password", "test-credentials")))
6161
.exchange()
62+
.expectStatus().isOk()
6263
.expectBody()
6364
.returnResult();
6465

65-
webTestClient.get()
66-
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}",
67-
LOCAL, connectName, connectorName)
68-
.exchange()
69-
.expectStatus().isOk();
70-
71-
// Kafka Connect may return transient HTTP 500 errors during rebalances
72-
if (creationResult.getStatus() != HttpStatus.OK) {
73-
log.warn(
74-
"Ignoring a transient error while setting up the tested connector, because it has been created anyway.");
75-
}
7666
}
7767

7868
@AfterEach

0 commit comments

Comments
 (0)