Skip to content

Commit 2b0d0be

Browse files
rstoyanchevcesarhernandezgt
authored andcommitted
Ignore invalid STOMP frame
Closes spring-projectsgh-28444
1 parent 4d6dc0c commit 2b0d0be

File tree

3 files changed

+59
-14
lines changed

3 files changed

+59
-14
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java

+23-12
Original file line numberDiff line numberDiff line change
@@ -296,18 +296,29 @@ protected void handleMessageInternal(Message<?> message) {
296296
}
297297
else if (SimpMessageType.CONNECT.equals(messageType)) {
298298
logMessage(message);
299-
long[] clientHeartbeat = SimpMessageHeaderAccessor.getHeartbeat(headers);
300-
long[] serverHeartbeat = getHeartbeatValue();
301-
Principal user = SimpMessageHeaderAccessor.getUser(headers);
302-
this.sessions.put(sessionId, new SessionInfo(sessionId, user, clientHeartbeat, serverHeartbeat));
303-
SimpMessageHeaderAccessor connectAck = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
304-
initHeaders(connectAck);
305-
connectAck.setSessionId(sessionId);
306-
connectAck.setUser(SimpMessageHeaderAccessor.getUser(headers));
307-
connectAck.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);
308-
connectAck.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, serverHeartbeat);
309-
Message<byte[]> messageOut = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectAck.getMessageHeaders());
310-
getClientOutboundChannel().send(messageOut);
299+
300+
if (sessionId != null) {
301+
if (this.sessions.get(sessionId) != null) {
302+
if (logger.isWarnEnabled()) {
303+
logger.warn("Ignoring CONNECT in session " + sessionId + ". Already connected.");
304+
}
305+
return;
306+
}
307+
308+
long[] clientHeartbeat = SimpMessageHeaderAccessor.getHeartbeat(headers);
309+
long[] serverHeartbeat = getHeartbeatValue();
310+
Principal user = SimpMessageHeaderAccessor.getUser(headers);
311+
this.sessions.put(sessionId, new SessionInfo(sessionId, user, clientHeartbeat, serverHeartbeat));
312+
SimpMessageHeaderAccessor connectAck = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
313+
initHeaders(connectAck);
314+
connectAck.setSessionId(sessionId);
315+
connectAck.setUser(SimpMessageHeaderAccessor.getUser(headers));
316+
connectAck.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);
317+
connectAck.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, serverHeartbeat);
318+
Message<byte[]> messageOut =
319+
MessageBuilder.createMessage(EMPTY_PAYLOAD, connectAck.getMessageHeaders());
320+
getClientOutboundChannel().send(messageOut);
321+
}
311322
}
312323
else if (SimpMessageType.DISCONNECT.equals(messageType)) {
313324
logMessage(message);

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,13 @@ else if (accessor instanceof SimpMessageHeaderAccessor) {
493493
return;
494494
}
495495

496-
if (StompCommand.CONNECT.equals(command)) {
496+
if (StompCommand.CONNECT.equals(command) || StompCommand.STOMP.equals(command)) {
497+
if (this.connectionHandlers.get(sessionId) != null) {
498+
if (logger.isWarnEnabled()) {
499+
logger.warn("Ignoring CONNECT in session " + sessionId + ". Already connected.");
500+
}
501+
return;
502+
}
497503
if (logger.isDebugEnabled()) {
498504
logger.debug(stompAccessor.getShortLogMessage(EMPTY_PAYLOAD));
499505
}

spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java

+29-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2015 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -220,6 +220,34 @@ public void systemSubscription() throws Exception {
220220
assertSame(message, captor.getValue());
221221
}
222222

223+
224+
225+
@Test
226+
public void alreadyConnected() throws Exception{
227+
228+
this.brokerRelay.start();
229+
230+
Message<byte[]> connect = connectMessage("sess1", "joe");
231+
this.brokerRelay.handleMessage(connect);
232+
233+
assertEquals(2, this.tcpClient.getSentMessages().size());
234+
235+
StompHeaderAccessor headers1 = this.tcpClient.getSentHeaders(0);
236+
assertEquals(StompCommand.CONNECT, headers1.getCommand());
237+
assertEquals(StompBrokerRelayMessageHandler.SYSTEM_SESSION_ID, headers1.getSessionId());
238+
239+
240+
StompHeaderAccessor headers2 = this.tcpClient.getSentHeaders(1);
241+
assertEquals(StompCommand.CONNECT, headers2.getCommand());
242+
assertEquals("sess1", headers2.getSessionId());
243+
244+
this.brokerRelay.handleMessage(connect);
245+
246+
assertEquals(2, this.tcpClient.getSentMessages().size());
247+
assertTrue(this.outboundChannel.getMessages().isEmpty());
248+
}
249+
250+
223251
private Message<byte[]> connectMessage(String sessionId, String user) {
224252
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
225253
headers.setSessionId(sessionId);

0 commit comments

Comments
 (0)