48
48
import java .util .Map .Entry ;
49
49
import java .util .Properties ;
50
50
import java .util .Set ;
51
+ import java .util .concurrent .ConcurrentHashMap ;
51
52
import java .util .concurrent .CountDownLatch ;
52
53
import java .util .concurrent .Executors ;
53
54
import java .util .concurrent .TimeUnit ;
@@ -2562,11 +2563,11 @@ public void testPauseResumeAndConsumerSeekAware() throws Exception {
2562
2563
given (consumer .assignment ()).willReturn (records .keySet ());
2563
2564
final CountDownLatch pauseLatch1 = new CountDownLatch (2 ); // consumer, event publisher
2564
2565
final CountDownLatch pauseLatch2 = new CountDownLatch (2 ); // consumer, consumer
2565
- Set <TopicPartition > pausedParts = new HashSet <> ();
2566
+ Set <TopicPartition > pausedParts = ConcurrentHashMap . newKeySet ();
2566
2567
willAnswer (i -> {
2568
+ pausedParts .addAll (i .getArgument (0 ));
2567
2569
pauseLatch1 .countDown ();
2568
2570
pauseLatch2 .countDown ();
2569
- pausedParts .addAll (i .getArgument (0 ));
2570
2571
return null ;
2571
2572
}).given (consumer ).pause (records .keySet ());
2572
2573
given (consumer .paused ()).willReturn (pausedParts );
@@ -2584,8 +2585,8 @@ public void testPauseResumeAndConsumerSeekAware() throws Exception {
2584
2585
});
2585
2586
final CountDownLatch resumeLatch = new CountDownLatch (2 );
2586
2587
willAnswer (i -> {
2587
- resumeLatch .countDown ();
2588
2588
pausedParts .removeAll (i .getArgument (0 ));
2589
+ resumeLatch .countDown ();
2589
2590
return null ;
2590
2591
}).given (consumer ).resume (any ());
2591
2592
willAnswer (invoc -> {
@@ -2701,7 +2702,7 @@ public void dontResumePausedPartition() throws Exception {
2701
2702
given (consumer .assignment ()).willReturn (Set .of (new TopicPartition ("foo" , 0 ), new TopicPartition ("foo" , 1 )));
2702
2703
final CountDownLatch pauseLatch1 = new CountDownLatch (1 );
2703
2704
final CountDownLatch pauseLatch2 = new CountDownLatch (2 );
2704
- Set <TopicPartition > pausedParts = new HashSet <> ();
2705
+ Set <TopicPartition > pausedParts = ConcurrentHashMap . newKeySet ();
2705
2706
willAnswer (i -> {
2706
2707
pausedParts .addAll (i .getArgument (0 ));
2707
2708
pauseLatch1 .countDown ();
@@ -2715,8 +2716,8 @@ public void dontResumePausedPartition() throws Exception {
2715
2716
});
2716
2717
final CountDownLatch resumeLatch = new CountDownLatch (1 );
2717
2718
willAnswer (i -> {
2718
- resumeLatch .countDown ();
2719
2719
pausedParts .removeAll (i .getArgument (0 ));
2720
+ resumeLatch .countDown ();
2720
2721
return null ;
2721
2722
}).given (consumer ).resume (any ());
2722
2723
ContainerProperties containerProps = new ContainerProperties (new TopicPartitionOffset ("foo" , 0 ),
0 commit comments