Skip to content

Commit 503a2a1

Browse files
committed
Add a dropdown option to reset the offsets of stopped connectors
1 parent 6ce7078 commit 503a2a1

File tree

8 files changed

+274
-146
lines changed

8 files changed

+274
-146
lines changed

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java

Lines changed: 70 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@
3434
@RequiredArgsConstructor
3535
@Slf4j
3636
public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
37-
private static final Set<ConnectorActionDTO> RESTART_ACTIONS
38-
= Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
37+
private static final Set<ConnectorActionDTO> RESTART_ACTIONS = Set.of(RESTART, RESTART_FAILED_TASKS,
38+
RESTART_ALL_TASKS);
3939
private static final String CONNECTOR_NAME = "connectorName";
4040

4141
private final KafkaConnectService kafkaConnectService;
4242

4343
@Override
4444
public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
45-
ServerWebExchange exchange) {
45+
ServerWebExchange exchange) {
4646

4747
Flux<ConnectDTO> availableConnects = kafkaConnectService.getConnects(getCluster(clusterName))
4848
.filterWhen(dto -> accessControlService.isConnectAccessible(dto, clusterName));
@@ -52,7 +52,7 @@ public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
5252

5353
@Override
5454
public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, String connectName,
55-
ServerWebExchange exchange) {
55+
ServerWebExchange exchange) {
5656

5757
var context = AccessContext.builder()
5858
.cluster(clusterName)
@@ -61,14 +61,15 @@ public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, Stri
6161
.build();
6262

6363
return validateAccess(context)
64-
.thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
64+
.thenReturn(
65+
ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
6566
.doOnEach(sig -> audit(context, sig));
6667
}
6768

6869
@Override
6970
public Mono<ResponseEntity<ConnectorDTO>> createConnector(String clusterName, String connectName,
70-
@Valid Mono<NewConnectorDTO> connector,
71-
ServerWebExchange exchange) {
71+
@Valid Mono<NewConnectorDTO> connector,
72+
ServerWebExchange exchange) {
7273

7374
var context = AccessContext.builder()
7475
.cluster(clusterName)
@@ -78,14 +79,14 @@ public Mono<ResponseEntity<ConnectorDTO>> createConnector(String clusterName, St
7879

7980
return validateAccess(context).then(
8081
kafkaConnectService.createConnector(getCluster(clusterName), connectName, connector)
81-
.map(ResponseEntity::ok)
82-
).doOnEach(sig -> audit(context, sig));
82+
.map(ResponseEntity::ok))
83+
.doOnEach(sig -> audit(context, sig));
8384
}
8485

8586
@Override
8687
public Mono<ResponseEntity<ConnectorDTO>> getConnector(String clusterName, String connectName,
87-
String connectorName,
88-
ServerWebExchange exchange) {
88+
String connectorName,
89+
ServerWebExchange exchange) {
8990

9091
var context = AccessContext.builder()
9192
.cluster(clusterName)
@@ -95,14 +96,14 @@ public Mono<ResponseEntity<ConnectorDTO>> getConnector(String clusterName, Strin
9596

9697
return validateAccess(context).then(
9798
kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
98-
.map(ResponseEntity::ok)
99-
).doOnEach(sig -> audit(context, sig));
99+
.map(ResponseEntity::ok))
100+
.doOnEach(sig -> audit(context, sig));
100101
}
101102

102103
@Override
103104
public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String connectName,
104-
String connectorName,
105-
ServerWebExchange exchange) {
105+
String connectorName,
106+
ServerWebExchange exchange) {
106107

107108
var context = AccessContext.builder()
108109
.cluster(clusterName)
@@ -113,19 +114,17 @@ public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String con
113114

114115
return validateAccess(context).then(
115116
kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
116-
.map(ResponseEntity::ok)
117-
).doOnEach(sig -> audit(context, sig));
117+
.map(ResponseEntity::ok))
118+
.doOnEach(sig -> audit(context, sig));
118119
}
119120

120-
121121
@Override
122122
public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
123123
String clusterName,
124124
String search,
125125
ConnectorColumnsToSortDTO orderBy,
126126
SortOrderDTO sortOrder,
127-
ServerWebExchange exchange
128-
) {
127+
ServerWebExchange exchange) {
129128
var context = AccessContext.builder()
130129
.cluster(clusterName)
131130
.operationName("getAllConnectors")
@@ -145,9 +144,9 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
145144

146145
@Override
147146
public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clusterName,
148-
String connectName,
149-
String connectorName,
150-
ServerWebExchange exchange) {
147+
String connectName,
148+
String connectorName,
149+
ServerWebExchange exchange) {
151150

152151
var context = AccessContext.builder()
153152
.cluster(clusterName)
@@ -158,15 +157,15 @@ public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clust
158157
return validateAccess(context).then(
159158
kafkaConnectService
160159
.getConnectorConfig(getCluster(clusterName), connectName, connectorName)
161-
.map(ResponseEntity::ok)
162-
).doOnEach(sig -> audit(context, sig));
160+
.map(ResponseEntity::ok))
161+
.doOnEach(sig -> audit(context, sig));
163162
}
164163

165164
@Override
166165
public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName, String connectName,
167-
String connectorName,
168-
Mono<Map<String, Object>> requestBody,
169-
ServerWebExchange exchange) {
166+
String connectorName,
167+
Mono<Map<String, Object>> requestBody,
168+
ServerWebExchange exchange) {
170169

171170
var context = AccessContext.builder()
172171
.cluster(clusterName)
@@ -176,22 +175,22 @@ public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName,
176175
.build();
177176

178177
return validateAccess(context).then(
179-
kafkaConnectService
180-
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
181-
.map(ResponseEntity::ok))
178+
kafkaConnectService
179+
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
180+
.map(ResponseEntity::ok))
182181
.doOnEach(sig -> audit(context, sig));
183182
}
184183

185184
@Override
186185
public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, String connectName,
187-
String connectorName,
188-
ConnectorActionDTO action,
189-
ServerWebExchange exchange) {
186+
String connectorName,
187+
ConnectorActionDTO action,
188+
ServerWebExchange exchange) {
190189
ConnectAction[] connectActions;
191190
if (RESTART_ACTIONS.contains(action)) {
192-
connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.RESTART};
191+
connectActions = new ConnectAction[] { ConnectAction.VIEW, ConnectAction.RESTART };
193192
} else {
194-
connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.EDIT};
193+
connectActions = new ConnectAction[] { ConnectAction.VIEW, ConnectAction.EDIT };
195194
}
196195

197196
var context = AccessContext.builder()
@@ -204,15 +203,15 @@ public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, Strin
204203
return validateAccess(context).then(
205204
kafkaConnectService
206205
.updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
207-
.map(ResponseEntity::ok)
208-
).doOnEach(sig -> audit(context, sig));
206+
.map(ResponseEntity::ok))
207+
.doOnEach(sig -> audit(context, sig));
209208
}
210209

211210
@Override
212211
public Mono<ResponseEntity<Flux<TaskDTO>>> getConnectorTasks(String clusterName,
213-
String connectName,
214-
String connectorName,
215-
ServerWebExchange exchange) {
212+
String connectName,
213+
String connectorName,
214+
ServerWebExchange exchange) {
216215
var context = AccessContext.builder()
217216
.cluster(clusterName)
218217
.connectActions(connectName, ConnectAction.VIEW)
@@ -223,14 +222,14 @@ public Mono<ResponseEntity<Flux<TaskDTO>>> getConnectorTasks(String clusterName,
223222
return validateAccess(context).thenReturn(
224223
ResponseEntity
225224
.ok(kafkaConnectService
226-
.getConnectorTasks(getCluster(clusterName), connectName, connectorName))
227-
).doOnEach(sig -> audit(context, sig));
225+
.getConnectorTasks(getCluster(clusterName), connectName, connectorName)))
226+
.doOnEach(sig -> audit(context, sig));
228227
}
229228

230229
@Override
231230
public Mono<ResponseEntity<Void>> restartConnectorTask(String clusterName, String connectName,
232-
String connectorName, Integer taskId,
233-
ServerWebExchange exchange) {
231+
String connectorName, Integer taskId,
232+
ServerWebExchange exchange) {
234233

235234
var context = AccessContext.builder()
236235
.cluster(clusterName)
@@ -242,8 +241,8 @@ public Mono<ResponseEntity<Void>> restartConnectorTask(String clusterName, Strin
242241
return validateAccess(context).then(
243242
kafkaConnectService
244243
.restartConnectorTask(getCluster(clusterName), connectName, connectorName, taskId)
245-
.map(ResponseEntity::ok)
246-
).doOnEach(sig -> audit(context, sig));
244+
.map(ResponseEntity::ok))
245+
.doOnEach(sig -> audit(context, sig));
247246
}
248247

249248
@Override
@@ -259,8 +258,8 @@ public Mono<ResponseEntity<Flux<ConnectorPluginDTO>>> getConnectorPlugins(
259258
return validateAccess(context).then(
260259
Mono.just(
261260
ResponseEntity.ok(
262-
kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName)))
263-
).doOnEach(sig -> audit(context, sig));
261+
kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName))))
262+
.doOnEach(sig -> audit(context, sig));
264263
}
265264

266265
@Override
@@ -285,4 +284,26 @@ private Comparator<FullConnectorInfoDTO> getConnectorsComparator(ConnectorColumn
285284
default -> defaultComparator;
286285
};
287286
}
287+
288+
@Override
289+
public Mono<ResponseEntity<Void>> resetConnectorOffsets(String clusterName, String connectName,
290+
String connectorName,
291+
ServerWebExchange exchange) {
292+
ConnectAction[] connectActions;
293+
294+
connectActions = new ConnectAction[] { ConnectAction.VIEW, ConnectAction.RESET_OFFSETS };
295+
296+
var context = AccessContext.builder()
297+
.cluster(clusterName)
298+
.connectActions(connectName, connectActions)
299+
.operationName("resetConnectorOffsets")
300+
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
301+
.build();
302+
303+
return validateAccess(context).then(
304+
kafkaConnectService
305+
.resetConnectorOffsets(getCluster(clusterName), connectName, connectorName)
306+
.map(ResponseEntity::ok))
307+
.doOnEach(sig -> audit(context, sig));
308+
}
288309
}

api/src/main/java/io/kafbat/ui/model/rbac/permission/ConnectAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ public enum ConnectAction implements PermissibleAction {
1010
EDIT(VIEW),
1111
CREATE(VIEW),
1212
RESTART(VIEW),
13-
DELETE(VIEW)
13+
DELETE(VIEW),
14+
RESET_OFFSETS(VIEW)
1415

1516
;
1617

@@ -20,7 +21,7 @@ public enum ConnectAction implements PermissibleAction {
2021
this.dependantActions = dependantActions;
2122
}
2223

23-
public static final Set<ConnectAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, RESTART);
24+
public static final Set<ConnectAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, RESTART, RESET_OFFSETS);
2425

2526
@Nullable
2627
public static ConnectAction fromString(String name) {

0 commit comments

Comments
 (0)