19
19
import io .modelcontextprotocol .spec .McpSchema .ClientCapabilities ;
20
20
import io .modelcontextprotocol .spec .McpSchema .InitializeResult ;
21
21
import io .modelcontextprotocol .spec .McpSchema .Root ;
22
- import org .junit .jupiter .api .Disabled ;
23
22
import org .junit .jupiter .api .Test ;
24
23
import reactor .core .publisher .Mono ;
25
24
30
29
class McpAsyncClientResponseHandlerTests {
31
30
32
31
private InitializeResult initialization (McpAsyncClient asyncMcpClient , MockMcpTransport transport ) {
33
-
34
32
// Create mock server response
35
- McpSchema .ServerCapabilities mockServerCapabilities = McpSchema .ServerCapabilities .builder ()
36
- .tools (true )
37
- .resources (true , true ) // Enable both resources and resource templates
38
- .build ();
39
- McpSchema .Implementation mockServerInfo = new McpSchema .Implementation ("test-server" , "1.0.0" );
40
33
McpSchema .InitializeResult mockInitResult = new McpSchema .InitializeResult (McpSchema .LATEST_PROTOCOL_VERSION ,
41
- mockServerCapabilities , mockServerInfo , "Test instructions" );
42
-
43
- Mono <McpSchema .InitializeResult > initMono = asyncMcpClient .initialize ();
44
-
45
- new Thread (new Runnable () {
46
- @ Override
47
- public void run () {
34
+ McpSchema .ServerCapabilities .builder ()
35
+ .tools (true )
36
+ .resources (true , true ) // Enable both resources and resource templates
37
+ .build (),
38
+ new McpSchema .Implementation ("test-server" , "1.0.0" ), "Test instructions" );
39
+
40
+ // Use CountDownLatch to coordinate between threads
41
+ java .util .concurrent .CountDownLatch latch = new java .util .concurrent .CountDownLatch (1 );
42
+
43
+ // Create a Mono that will handle the initialization and response simulation
44
+ return asyncMcpClient .initialize ().doOnSubscribe (subscription -> {
45
+ // Run in a separate reactive context to avoid blocking the main subscription
46
+ Mono .fromRunnable (() -> {
48
47
McpSchema .JSONRPCRequest initRequest = transport .getLastSentMessageAsRequest ();
49
48
assertThat (initRequest .method ()).isEqualTo (McpSchema .METHOD_INITIALIZE );
50
49
51
50
// Send mock server response
52
51
McpSchema .JSONRPCResponse initResponse = new McpSchema .JSONRPCResponse (McpSchema .JSONRPC_VERSION ,
53
52
initRequest .id (), mockInitResult , null );
54
53
transport .simulateIncomingMessage (initResponse );
54
+ latch .countDown ();
55
+ }).subscribeOn (reactor .core .scheduler .Schedulers .boundedElastic ()).subscribe ();
56
+ }).doOnTerminate (() -> {
57
+ try {
58
+ // Wait for the response simulation to complete
59
+ latch .await (5 , java .util .concurrent .TimeUnit .SECONDS );
55
60
}
56
- }).start ();
57
-
58
- return initMono .block ();
61
+ catch (InterruptedException e ) {
62
+ Thread .currentThread ().interrupt ();
63
+ throw new RuntimeException ("Interrupted while waiting for initialization" , e );
64
+ }
65
+ }).block ();
59
66
}
60
67
61
68
@ Test
@@ -75,23 +82,32 @@ void testSuccessfulInitialization() {
75
82
McpSchema .InitializeResult mockInitResult = new McpSchema .InitializeResult (McpSchema .LATEST_PROTOCOL_VERSION ,
76
83
mockServerCapabilities , mockServerInfo , "Test instructions" );
77
84
78
- // Start initialization
79
- Mono < McpSchema . InitializeResult > initMono = asyncMcpClient . initialize ( );
85
+ // Use CountDownLatch to coordinate between threads
86
+ java . util . concurrent . CountDownLatch latch = new java . util . concurrent . CountDownLatch ( 1 );
80
87
81
- new Thread (new Runnable () {
82
- @ Override
83
- public void run () {
88
+ // Start initialization with reactive handling
89
+ InitializeResult result = asyncMcpClient .initialize ().doOnSubscribe (subscription -> {
90
+ // Run in a separate reactive context to avoid blocking the main subscription
91
+ Mono .fromRunnable (() -> {
84
92
McpSchema .JSONRPCRequest initRequest = transport .getLastSentMessageAsRequest ();
85
93
assertThat (initRequest .method ()).isEqualTo (McpSchema .METHOD_INITIALIZE );
86
94
87
95
// Send mock server response
88
96
McpSchema .JSONRPCResponse initResponse = new McpSchema .JSONRPCResponse (McpSchema .JSONRPC_VERSION ,
89
97
initRequest .id (), mockInitResult , null );
90
98
transport .simulateIncomingMessage (initResponse );
99
+ latch .countDown ();
100
+ }).subscribeOn (reactor .core .scheduler .Schedulers .boundedElastic ()).subscribe ();
101
+ }).doOnTerminate (() -> {
102
+ try {
103
+ // Wait for the response simulation to complete
104
+ latch .await (5 , java .util .concurrent .TimeUnit .SECONDS );
91
105
}
92
- }).start ();
93
-
94
- InitializeResult result = initMono .block ();
106
+ catch (InterruptedException e ) {
107
+ Thread .currentThread ().interrupt ();
108
+ throw new RuntimeException ("Interrupted while waiting for initialization" , e );
109
+ }
110
+ }).block ();
95
111
96
112
// Verify initialized notification was sent
97
113
McpSchema .JSONRPCMessage notificationMessage = transport .getLastSentMessage ();
0 commit comments