Skip to content

Commit c780aae

Browse files
authored
Ensure connection closed when non 200 status for request with Expect header and incoming data not completed (#2919)
Fixes #2910
1 parent cd099a7 commit c780aae

File tree

3 files changed

+120
-4
lines changed

3 files changed

+120
-4
lines changed

reactor-netty-core/src/main/java/reactor/netty/channel/ChannelOperations.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -386,14 +386,27 @@ public final boolean isInboundCancelled() {
386386
}
387387

388388
/**
389-
* Return true if inbound traffic is not incoming or expected anymore
389+
* Return true if inbound traffic is not incoming or expected anymore.
390+
* The buffered data is consumed.
390391
*
391-
* @return true if inbound traffic is not incoming or expected anymore
392+
* @return true if inbound traffic is not incoming or expected anymore.
393+
* The buffered data is consumed
392394
*/
393395
public final boolean isInboundDisposed() {
394396
return inbound.isDisposed();
395397
}
396398

399+
/**
400+
* Return true if inbound traffic is not incoming or expected anymore.
401+
* The buffered data might still not be consumed.
402+
*
403+
* @return true if inbound traffic is not incoming or expected anymore.
404+
* The buffered data might still not be consumed.
405+
*/
406+
protected final boolean isInboundComplete() {
407+
return inbound.inboundDone;
408+
}
409+
397410
/**
398411
* React on inbound {@link Channel#read}
399412
*

reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerOperations.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
113113
final ServerCookieEncoder cookieEncoder;
114114
final ServerCookies cookieHolder;
115115
final HttpServerFormDecoderProvider formDecoderProvider;
116+
final boolean is100ContinueExpected;
116117
final boolean isHttp2;
117118
final BiFunction<? super Mono<Void>, ? super Connection, ? extends Mono<Void>> mapHandle;
118119
final HttpRequest nettyRequest;
@@ -138,6 +139,7 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
138139
this.cookieHolder = replaced.cookieHolder;
139140
this.currentContext = replaced.currentContext;
140141
this.formDecoderProvider = replaced.formDecoderProvider;
142+
this.is100ContinueExpected = replaced.is100ContinueExpected;
141143
this.isHttp2 = replaced.isHttp2;
142144
this.mapHandle = replaced.mapHandle;
143145
this.nettyRequest = replaced.nettyRequest;
@@ -186,6 +188,7 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
186188
this.cookieHolder = ServerCookies.newServerRequestHolder(nettyRequest.headers(), decoder);
187189
this.currentContext = Context.empty();
188190
this.formDecoderProvider = formDecoderProvider;
191+
this.is100ContinueExpected = HttpUtil.is100ContinueExpected(nettyRequest);
189192
this.isHttp2 = isHttp2;
190193
this.mapHandle = mapHandle;
191194
this.nettyRequest = nettyRequest;
@@ -406,7 +409,7 @@ public Flux<?> receiveObject() {
406409
// and discard the traffic or close the connection.
407410
// No need to notify the upstream handlers - just log.
408411
// If decoding a response, just throw an error.
409-
if (HttpUtil.is100ContinueExpected(nettyRequest)) {
412+
if (is100ContinueExpected) {
410413
return FutureMono.deferFuture(() -> {
411414
if (!hasSentHeaders()) {
412415
return channel().writeAndFlush(CONTINUE);
@@ -674,7 +677,12 @@ protected void afterMarkSentHeaders() {
674677

675678
@Override
676679
protected void beforeMarkSentHeaders() {
677-
//noop
680+
if (is100ContinueExpected && !isInboundComplete()) {
681+
int code = status().code();
682+
if (code < 200 || code > 299) {
683+
keepAlive(false);
684+
}
685+
}
678686
}
679687

680688
@Override

reactor-netty-http/src/test/java/reactor/netty/http/HttpProtocolsTests.java

+95
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,14 @@
4646
import org.junit.jupiter.params.ParameterizedTest;
4747
import org.junit.jupiter.params.provider.MethodSource;
4848
import org.mockito.Mockito;
49+
import org.reactivestreams.Publisher;
4950
import org.slf4j.LoggerFactory;
5051
import reactor.core.publisher.Flux;
5152
import reactor.core.publisher.Mono;
5253
import reactor.core.scheduler.Schedulers;
5354
import reactor.netty.BaseHttpTest;
5455
import reactor.netty.ByteBufFlux;
56+
import reactor.netty.ByteBufMono;
5557
import reactor.netty.Connection;
5658
import reactor.netty.LogTracker;
5759
import reactor.netty.NettyPipeline;
@@ -60,6 +62,8 @@
6062
import reactor.netty.http.client.HttpClientResponse;
6163
import reactor.netty.http.server.HttpServer;
6264
import reactor.netty.http.server.HttpServerConfig;
65+
import reactor.netty.http.server.HttpServerRequest;
66+
import reactor.netty.http.server.HttpServerResponse;
6367
import reactor.netty.http.server.logging.AccessLog;
6468
import reactor.netty.resources.ConnectionProvider;
6569
import reactor.test.StepVerifier;
@@ -79,6 +83,8 @@
7983
import java.util.concurrent.TimeUnit;
8084
import java.util.concurrent.atomic.AtomicBoolean;
8185
import java.util.concurrent.atomic.AtomicLong;
86+
import java.util.concurrent.atomic.AtomicReference;
87+
import java.util.function.BiFunction;
8288
import java.util.function.Function;
8389
import java.util.function.Predicate;
8490

@@ -789,6 +795,95 @@ void test100Continue(HttpServer server, HttpClient client) throws Exception {
789795
assertThat(content.getT2()).isEqualTo(200);
790796
}
791797

798+
@ParameterizedCompatibleCombinationsCustomPoolTest
799+
void test100ContinueConnectionClose(HttpServer server, HttpClient client) throws Exception {
800+
doTest100ContinueConnection(server, client,
801+
(req, res) -> res.status(400).sendString(Mono.just("ERROR")),
802+
ByteBufFlux.fromString(Flux.just("1", "2", "3", "4", "5").delaySubscription(Duration.ofMillis(100))),
803+
false);
804+
}
805+
806+
@ParameterizedCompatibleCombinationsCustomPoolTest
807+
void test100ContinueConnectionKeepAlive(HttpServer server, HttpClient client) throws Exception {
808+
doTest100ContinueConnection(server, client,
809+
(req, res) -> res.status(400).sendString(Mono.just("ERROR").delaySubscription(Duration.ofMillis(100))),
810+
ByteBufMono.fromString(Mono.just("12345")),
811+
true);
812+
}
813+
814+
private void doTest100ContinueConnection(
815+
HttpServer server,
816+
HttpClient client,
817+
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> postHandler,
818+
Publisher<ByteBuf> sendBody,
819+
boolean isKeepAlive) throws Exception {
820+
HttpProtocol[] serverProtocols = server.configuration().protocols();
821+
HttpProtocol[] clientProtocols = client.configuration().protocols();
822+
823+
CountDownLatch latch = new CountDownLatch(2);
824+
AtomicReference<List<Channel>> channels = new AtomicReference<>(new ArrayList<>(2));
825+
disposableServer =
826+
server.doOnConnection(conn -> {
827+
channels.get().add(conn.channel());
828+
conn.onTerminate().subscribe(null, t -> latch.countDown(), latch::countDown);
829+
})
830+
.route(r ->
831+
r.post("/post", postHandler)
832+
.get("/get", (req, res) -> res.sendString(Mono.just("OK"))))
833+
.bindNow();
834+
835+
Mono<Tuple2<String, HttpClientResponse>> content1 =
836+
client.port(disposableServer.port())
837+
.headers(h -> h.add(HttpHeaderNames.EXPECT, HttpHeaderValues.CONTINUE))
838+
.post()
839+
.uri("/post")
840+
.send(sendBody)
841+
.responseSingle((res, bytes) -> bytes.asString().zipWith(Mono.just(res)));
842+
843+
Mono<Tuple2<String, HttpClientResponse>> content2 =
844+
client.port(disposableServer.port())
845+
.get()
846+
.uri("/get")
847+
.responseSingle((res, bytes) -> bytes.asString().zipWith(Mono.just(res)));
848+
849+
List<Tuple2<String, HttpClientResponse>> responses =
850+
Flux.concat(content1, content2)
851+
.collectList()
852+
.block(Duration.ofSeconds(5));
853+
854+
assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue();
855+
856+
assertThat(responses).isNotNull();
857+
assertThat(responses.size()).isEqualTo(2);
858+
assertThat(responses.get(0).getT1()).isEqualTo("ERROR");
859+
assertThat(responses.get(0).getT2().status().code()).isEqualTo(400);
860+
assertThat(responses.get(1).getT1()).isEqualTo("OK");
861+
assertThat(responses.get(1).getT2().status().code()).isEqualTo(200);
862+
863+
assertThat(channels.get().size()).isEqualTo(2);
864+
if ((serverProtocols.length == 1 && serverProtocols[0] == HttpProtocol.HTTP11) ||
865+
(clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11)) {
866+
if (isKeepAlive) {
867+
assertThat(channels.get().get(0)).isEqualTo(channels.get().get(1));
868+
869+
assertThat(responses.get(0).getT2().responseHeaders().get(HttpHeaderNames.CONNECTION))
870+
.isNull();
871+
}
872+
else {
873+
assertThat(channels.get()).doesNotHaveDuplicates();
874+
875+
assertThat(responses.get(0).getT2().responseHeaders().get(HttpHeaderNames.CONNECTION))
876+
.isEqualTo(HttpHeaderValues.CLOSE.toString());
877+
}
878+
}
879+
else {
880+
assertThat(channels.get()).doesNotHaveDuplicates();
881+
882+
assertThat(responses.get(0).getT2().responseHeaders().get(HttpHeaderNames.CONNECTION))
883+
.isNull();
884+
}
885+
}
886+
792887
static final class IdleTimeoutTestChannelInboundHandler extends ChannelInboundHandlerAdapter {
793888

794889
final CountDownLatch latch = new CountDownLatch(1);

0 commit comments

Comments
 (0)