Skip to content

Commit 3d9e3c3

Browse files
mp911dechristophstrobl
authored andcommitted
Differentiate between initial exception handling, recovery and recovery after subscription.
We now differentiate exception handling regarding the recovery state. Initial listen fails if the connection is unavailable. Upon recovery after a preceeding subscription we now log the success to create a counterpart to our error logging. Closes: #2782 Original Pull Request: #2808 # Conflicts: # src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java # src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java
1 parent 95b0bde commit 3d9e3c3

File tree

2 files changed

+140
-12
lines changed

2 files changed

+140
-12
lines changed

src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java

+71-12
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ private void lazyListen() {
371371
State state = this.state.get();
372372

373373
CompletableFuture<Void> futureToAwait = state.isPrepareListening() ? containerListenFuture
374-
: lazyListen(this.backOff.start());
374+
: lazyListen(new InitialBackoffExecution(this.backOff.start()));
375375

376376
try {
377377
futureToAwait.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
@@ -537,8 +537,7 @@ private void awaitRegistrationTime(CompletableFuture<Void> future) {
537537
future.get(getMaxSubscriptionRegistrationWaitingTime(), TimeUnit.MILLISECONDS);
538538
} catch (InterruptedException ex) {
539539
Thread.currentThread().interrupt();
540-
} catch (ExecutionException | TimeoutException ignore) {
541-
}
540+
} catch (ExecutionException | TimeoutException ignore) {}
542541
}
543542

544543
@Override
@@ -890,8 +889,13 @@ protected void handleSubscriptionException(CompletableFuture<Void> future, BackO
890889

891890
Runnable recoveryFunction = () -> {
892891

893-
CompletableFuture<Void> lazyListen = lazyListen(backOffExecution);
894-
lazyListen.whenComplete(propagate(future));
892+
CompletableFuture<Void> lazyListen = lazyListen(new RecoveryBackoffExecution(backOffExecution));
893+
lazyListen.whenComplete(propagate(future)).thenRun(() -> {
894+
895+
if (backOffExecution instanceof RecoveryAfterSubscriptionBackoffExecution) {
896+
logger.info("Subscription(s) recovered");
897+
}
898+
});
895899
};
896900

897901
if (potentiallyRecover(loggingBackOffExecution, recoveryFunction)) {
@@ -984,7 +988,7 @@ private boolean hasTopics() {
984988
private Subscriber getRequiredSubscriber() {
985989

986990
Assert.state(this.subscriber != null,
987-
"Subscriber not created; Configure RedisConnectionFactory to create a Subscriber");
991+
"Subscriber not created; Configure RedisConnectionFactory to create a Subscriber. Make sure that afterPropertiesSet() has been called");
988992

989993
return this.subscriber;
990994
}
@@ -1015,6 +1019,54 @@ private void logTrace(Supplier<String> message) {
10151019
}
10161020
}
10171021

1022+
BackOffExecution nextBackoffExecution(BackOffExecution backOffExecution, boolean subscribed) {
1023+
1024+
if (subscribed) {
1025+
return new RecoveryAfterSubscriptionBackoffExecution(backOff.start());
1026+
}
1027+
1028+
return backOffExecution;
1029+
}
1030+
1031+
/**
1032+
* Marker for an initial backoff.
1033+
*
1034+
* @param delegate
1035+
*/
1036+
record InitialBackoffExecution(BackOffExecution delegate) implements BackOffExecution {
1037+
1038+
@Override
1039+
public long nextBackOff() {
1040+
return delegate.nextBackOff();
1041+
}
1042+
}
1043+
1044+
/**
1045+
* Marker for a recovery after a subscription has been active previously.
1046+
*
1047+
* @param delegate
1048+
*/
1049+
record RecoveryAfterSubscriptionBackoffExecution(BackOffExecution delegate) implements BackOffExecution {
1050+
1051+
@Override
1052+
public long nextBackOff() {
1053+
return delegate.nextBackOff();
1054+
}
1055+
}
1056+
1057+
/**
1058+
* Marker for a recovery execution.
1059+
*
1060+
* @param delegate
1061+
*/
1062+
record RecoveryBackoffExecution(BackOffExecution delegate) implements BackOffExecution {
1063+
1064+
@Override
1065+
public long nextBackOff() {
1066+
return delegate.nextBackOff();
1067+
}
1068+
}
1069+
10181070
/**
10191071
* Represents an operation that accepts three input arguments {@link SubscriptionListener},
10201072
* {@code channel or pattern}, and {@code count} and returns no result.
@@ -1189,10 +1241,15 @@ public CompletableFuture<Void> initialize(BackOffExecution backOffExecution, Col
11891241
try {
11901242
eventuallyPerformSubscription(connection, backOffExecution, initFuture, patterns, channels);
11911243
} catch (Throwable t) {
1192-
handleSubscriptionException(initFuture, backOffExecution, t);
1244+
handleSubscriptionException(initFuture, nextBackoffExecution(backOffExecution, connection.isSubscribed()),
1245+
t);
11931246
}
11941247
} catch (RuntimeException ex) {
1195-
initFuture.completeExceptionally(ex);
1248+
if (backOffExecution instanceof InitialBackoffExecution) {
1249+
initFuture.completeExceptionally(ex);
1250+
} else {
1251+
handleSubscriptionException(initFuture, backOffExecution, ex);
1252+
}
11961253
}
11971254

11981255
return initFuture;
@@ -1205,8 +1262,9 @@ public CompletableFuture<Void> initialize(BackOffExecution backOffExecution, Col
12051262
void eventuallyPerformSubscription(RedisConnection connection, BackOffExecution backOffExecution,
12061263
CompletableFuture<Void> subscriptionDone, Collection<byte[]> patterns, Collection<byte[]> channels) {
12071264

1208-
addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels,
1209-
() -> subscriptionDone.complete(null)));
1265+
addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels, () -> {
1266+
subscriptionDone.complete(null);
1267+
}));
12101268

12111269
doSubscribe(connection, patterns, channels);
12121270
}
@@ -1412,7 +1470,7 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff
14121470
try {
14131471
subscribeChannel(channels.toArray(new byte[0][]));
14141472
} catch (Exception ex) {
1415-
handleSubscriptionException(subscriptionDone, backOffExecution, ex);
1473+
handleSubscriptionException(subscriptionDone, nextBackoffExecution(backOffExecution, true), ex);
14161474
}
14171475
}));
14181476
} else {
@@ -1429,7 +1487,8 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff
14291487
closeConnection();
14301488
unsubscribeFuture.complete(null);
14311489
} catch (Throwable cause) {
1432-
handleSubscriptionException(subscriptionDone, backOffExecution, cause);
1490+
handleSubscriptionException(subscriptionDone,
1491+
nextBackoffExecution(backOffExecution, connection.isSubscribed()), cause);
14331492
}
14341493
});
14351494
}

src/test/java/org/springframework/data/redis/listener/RedisMessageListenerContainerUnitTests.java

+69
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919
import static org.mockito.Mockito.*;
2020

2121
import java.nio.charset.StandardCharsets;
22+
import java.util.concurrent.CountDownLatch;
2223
import java.util.concurrent.Executor;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.atomic.AtomicInteger;
2326

2427
import org.junit.jupiter.api.BeforeEach;
2528
import org.junit.jupiter.api.Test;
2629

30+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
2731
import org.springframework.core.task.SyncTaskExecutor;
2832
import org.springframework.data.redis.RedisConnectionFailureException;
2933
import org.springframework.data.redis.connection.RedisConnection;
@@ -33,6 +37,7 @@
3337
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
3438
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
3539
import org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException;
40+
import org.springframework.util.backoff.FixedBackOff;
3641

3742
/**
3843
* Unit tests for {@link RedisMessageListenerContainer}.
@@ -148,6 +153,70 @@ void containerListenShouldReportFailureOnRedisUnavailability() {
148153
assertThat(container.isListening()).isFalse();
149154
}
150155

156+
@Test // GH-2335
157+
void shouldRecoverFromConnectionFailure() throws Exception {
158+
159+
AtomicInteger requestCount = new AtomicInteger();
160+
AtomicBoolean shouldThrowSubscriptionException = new AtomicBoolean();
161+
162+
container = new RedisMessageListenerContainer();
163+
container.setConnectionFactory(connectionFactoryMock);
164+
container.setBeanName("container");
165+
container.setTaskExecutor(new SyncTaskExecutor());
166+
container.setSubscriptionExecutor(new SimpleAsyncTaskExecutor());
167+
container.setMaxSubscriptionRegistrationWaitingTime(1000);
168+
container.setRecoveryBackoff(new FixedBackOff(1, 5));
169+
container.afterPropertiesSet();
170+
171+
doAnswer(it -> {
172+
173+
int req = requestCount.incrementAndGet();
174+
if (req == 1 || req == 3) {
175+
return connectionMock;
176+
}
177+
178+
throw new RedisConnectionFailureException("Booh");
179+
}).when(connectionFactoryMock).getConnection();
180+
181+
CountDownLatch exceptionWait = new CountDownLatch(1);
182+
CountDownLatch armed = new CountDownLatch(1);
183+
CountDownLatch recoveryArmed = new CountDownLatch(1);
184+
185+
doAnswer(it -> {
186+
187+
SubscriptionListener listener = it.getArgument(0);
188+
when(connectionMock.isSubscribed()).thenReturn(true);
189+
190+
listener.onChannelSubscribed("a".getBytes(StandardCharsets.UTF_8), 1);
191+
192+
armed.countDown();
193+
exceptionWait.await();
194+
195+
if (shouldThrowSubscriptionException.compareAndSet(true, false)) {
196+
when(connectionMock.isSubscribed()).thenReturn(false);
197+
throw new RedisConnectionFailureException("Disconnected");
198+
}
199+
200+
recoveryArmed.countDown();
201+
202+
return null;
203+
}).when(connectionMock).subscribe(any(), any());
204+
205+
container.start();
206+
container.addMessageListener(new MessageListenerAdapter(handler), new ChannelTopic("a"));
207+
armed.await();
208+
209+
// let an exception happen
210+
shouldThrowSubscriptionException.set(true);
211+
exceptionWait.countDown();
212+
213+
// wait for subscription recovery
214+
recoveryArmed.await();
215+
216+
assertThat(recoveryArmed.getCount()).isZero();
217+
218+
}
219+
151220
@Test // GH-964
152221
void failsOnDuplicateInit() {
153222
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> container.afterPropertiesSet());

0 commit comments

Comments
 (0)