Skip to content

Commit dbf9a26

Browse files
committed
Take ClientBulkWriteException into account in OperationExecutor.execute, ClientSession.withTransaction
JAVA-5527
1 parent 4846e0b commit dbf9a26

File tree

4 files changed

+37
-10
lines changed

4 files changed

+37
-10
lines changed

driver-core/src/main/com/mongodb/internal/operation/OperationHelper.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
package com.mongodb.internal.operation;
1818

19+
import com.mongodb.ClientBulkWriteException;
1920
import com.mongodb.MongoClientException;
21+
import com.mongodb.MongoException;
2022
import com.mongodb.WriteConcern;
2123
import com.mongodb.client.cursor.TimeoutMode;
2224
import com.mongodb.client.model.Collation;
@@ -47,7 +49,10 @@
4749
import static com.mongodb.internal.operation.ServerVersionHelper.serverIsLessThanVersionFourDotTwo;
4850
import static java.lang.String.format;
4951

50-
final class OperationHelper {
52+
/**
53+
* This class is not part of the public API and may be removed or changed at any time.
54+
*/
55+
public final class OperationHelper {
5156
public static final Logger LOGGER = Loggers.getLogger("operation");
5257

5358
static void validateCollationAndWriteConcern(@Nullable final Collation collation, final WriteConcern writeConcern) {
@@ -202,6 +207,20 @@ static void setNonTailableCursorMaxTimeSupplier(final TimeoutMode timeoutMode, f
202207
}
203208
}
204209

210+
/**
211+
* Returns the {@link MongoException} that carries or should carry
212+
* the {@linkplain MongoException#getCode() error code} and {@linkplain MongoException#getErrorLabels() error labels}.
213+
* This method is needed because exceptions like {@link ClientBulkWriteException} do not carry that data themselves.
214+
*/
215+
public static MongoException unwrap(final MongoException exception) {
216+
MongoException result = exception;
217+
if (exception instanceof ClientBulkWriteException) {
218+
result = ((ClientBulkWriteException) exception).getError().orElse(exception);
219+
}
220+
return result;
221+
}
222+
223+
205224
/**
206225
* This internal exception is used to
207226
* <ul>

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext;
3434
import com.mongodb.internal.operation.AsyncReadOperation;
3535
import com.mongodb.internal.operation.AsyncWriteOperation;
36+
import com.mongodb.internal.operation.OperationHelper;
3637
import com.mongodb.lang.Nullable;
3738
import com.mongodb.reactivestreams.client.ClientSession;
3839
import com.mongodb.reactivestreams.client.ReactiveContextProvider;
@@ -96,8 +97,9 @@ public <T> Mono<T> execute(final AsyncReadOperation<T> operation, final ReadPref
9697
sinkToCallback(sink).onResult(result, t);
9798
}
9899
})).doOnError((t) -> {
99-
labelException(session, t);
100-
unpinServerAddressOnTransientTransactionError(session, t);
100+
Throwable exceptionToHandle = t instanceof MongoException ? OperationHelper.unwrap((MongoException) t) : t;
101+
labelException(session, exceptionToHandle);
102+
unpinServerAddressOnTransientTransactionError(session, exceptionToHandle);
101103
});
102104
}
103105
}).subscribe(subscriber)
@@ -126,8 +128,9 @@ public <T> Mono<T> execute(final AsyncWriteOperation<T> operation, final ReadCon
126128
sinkToCallback(sink).onResult(result, t);
127129
}
128130
})).doOnError((t) -> {
129-
labelException(session, t);
130-
unpinServerAddressOnTransientTransactionError(session, t);
131+
Throwable exceptionToHandle = t instanceof MongoException ? OperationHelper.unwrap((MongoException) t) : t;
132+
labelException(session, exceptionToHandle);
133+
unpinServerAddressOnTransientTransactionError(session, exceptionToHandle);
131134
})
132135
).subscribe(subscriber)
133136
);

driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.mongodb.internal.TimeoutContext;
3131
import com.mongodb.internal.operation.AbortTransactionOperation;
3232
import com.mongodb.internal.operation.CommitTransactionOperation;
33+
import com.mongodb.internal.operation.OperationHelper;
3334
import com.mongodb.internal.operation.ReadOperation;
3435
import com.mongodb.internal.operation.WriteConcernHelper;
3536
import com.mongodb.internal.operation.WriteOperation;
@@ -241,7 +242,8 @@ public <T> T withTransaction(final TransactionBody<T> transactionBody, final Tra
241242
abortTransaction();
242243
}
243244
if (e instanceof MongoException && !(e instanceof MongoOperationTimeoutException)) {
244-
if (((MongoException) e).hasErrorLabel(TRANSIENT_TRANSACTION_ERROR_LABEL)
245+
MongoException exceptionToHandle = OperationHelper.unwrap((MongoException) e);
246+
if (exceptionToHandle.hasErrorLabel(TRANSIENT_TRANSACTION_ERROR_LABEL)
245247
&& ClientSessionClock.INSTANCE.now() - startTime < MAX_RETRY_TIME_LIMIT_MS) {
246248
continue;
247249
}

driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import com.mongodb.internal.connection.Cluster;
5454
import com.mongodb.internal.connection.OperationContext;
5555
import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext;
56+
import com.mongodb.internal.operation.OperationHelper;
5657
import com.mongodb.internal.operation.ReadOperation;
5758
import com.mongodb.internal.operation.WriteOperation;
5859
import com.mongodb.internal.session.ServerSessionPool;
@@ -398,8 +399,9 @@ public <T> T execute(final ReadOperation<T> operation, final ReadPreference read
398399
}
399400
return operation.execute(binding);
400401
} catch (MongoException e) {
401-
labelException(actualClientSession, e);
402-
clearTransactionContextOnTransientTransactionError(session, e);
402+
MongoException exceptionToHandle = OperationHelper.unwrap(e);
403+
labelException(actualClientSession, exceptionToHandle);
404+
clearTransactionContextOnTransientTransactionError(session, exceptionToHandle);
403405
throw e;
404406
} finally {
405407
binding.release();
@@ -419,8 +421,9 @@ public <T> T execute(final WriteOperation<T> operation, final ReadConcern readCo
419421
try {
420422
return operation.execute(binding);
421423
} catch (MongoException e) {
422-
labelException(actualClientSession, e);
423-
clearTransactionContextOnTransientTransactionError(session, e);
424+
MongoException exceptionToHandle = OperationHelper.unwrap(e);
425+
labelException(actualClientSession, exceptionToHandle);
426+
clearTransactionContextOnTransientTransactionError(session, exceptionToHandle);
424427
throw e;
425428
} finally {
426429
binding.release();

0 commit comments

Comments
 (0)