From 871d2ff091d3f122ab47de215ba3e2edd0858292 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 8 May 2026 23:12:43 +0000 Subject: [PATCH] Guard against null TimeoutsHolder in NettyConnectListener.onSuccess --- .../netty/channel/NettyConnectListener.java | 26 +++- .../channel/NettyConnectListenerTest.java | 131 ++++++++++++++++++ 2 files changed, 151 insertions(+), 6 deletions(-) create mode 100644 client/src/test/java/org/asynchttpclient/netty/channel/NettyConnectListenerTest.java diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java b/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java index 4f5612223e..165e3de4ee 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java @@ -55,9 +55,11 @@ public NettyConnectListener(NettyResponseFuture future, NettyRequestSender re this.connectionSemaphore = connectionSemaphore; } - private boolean futureIsAlreadyCancelled(Channel channel) { - // If Future is cancelled then we will close the channel silently - if (future.isCancelled()) { + private boolean futureIsAlreadyCompleted(Channel channel) { + // Use isDone() (covers cancel + abort + done) rather than isCancelled() alone: + // abort() and done() set isDone but not isCancelled, so a future that has been + // aborted (e.g. by a request timeout) would otherwise slip past this check. + if (future.isDone()) { Channels.silentlyCloseChannel(channel); return true; } @@ -65,7 +67,7 @@ private boolean futureIsAlreadyCancelled(Channel channel) { } private void writeRequest(Channel channel) { - if (futureIsAlreadyCancelled(channel)) { + if (futureIsAlreadyCompleted(channel)) { return; } @@ -87,9 +89,21 @@ public void onSuccess(Channel channel, InetSocketAddress remoteAddress) { final Object partitionKeyLock = (connectionSemaphore != null) ? future.takePartitionKeyLock() : null; Channels.setActiveToken(channel); - TimeoutsHolder timeoutsHolder = future.getTimeoutsHolder(); - if (futureIsAlreadyCancelled(channel)) { + if (futureIsAlreadyCompleted(channel)) { + releaseSemaphoreImmediately(partitionKeyLock); + return; + } + + TimeoutsHolder timeoutsHolder = future.getTimeoutsHolder(); + if (timeoutsHolder == null) { + // The future is being terminated concurrently: cancelTimeouts() has nulled the + // holder but the isDone flag may not yet be visible on this thread. Per JMM, + // observing one volatile write does not require observing later ones, so the + // futureIsAlreadyCompleted check above can pass while the holder is already null. + // Drop this connection rather than NPE-ing on setResolvedRemoteAddress below. + Channels.silentlyCloseChannel(channel); + releaseSemaphoreImmediately(partitionKeyLock); return; } diff --git a/client/src/test/java/org/asynchttpclient/netty/channel/NettyConnectListenerTest.java b/client/src/test/java/org/asynchttpclient/netty/channel/NettyConnectListenerTest.java new file mode 100644 index 0000000000..1b325b1c85 --- /dev/null +++ b/client/src/test/java/org/asynchttpclient/netty/channel/NettyConnectListenerTest.java @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2014-2026 AsyncHttpClient Project. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.asynchttpclient.netty.channel; + +import io.netty.channel.embedded.EmbeddedChannel; +import org.asynchttpclient.AsyncCompletionHandler; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.asynchttpclient.Request; +import org.asynchttpclient.RequestBuilder; +import org.asynchttpclient.Response; +import org.asynchttpclient.channel.ChannelPoolPartitioning; +import org.asynchttpclient.netty.NettyResponseFuture; +import org.asynchttpclient.netty.timeout.TimeoutsHolder; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class NettyConnectListenerTest { + + private static NettyResponseFuture newFuture() { + Request request = new RequestBuilder().setUrl("http://example.com:12345").build(); + return new NettyResponseFuture<>( + request, + new AsyncCompletionHandler() { + @Override + public Object onCompleted(Response response) { + return null; + } + }, + null, + 0, + ChannelPoolPartitioning.PerHostChannelPoolPartitioning.INSTANCE, + null, + null); + } + + /** + * Reproduces the race in issue #2172: a TimeoutsHolder was previously installed + * on the future, but cancelTimeouts() has nulled it out before onSuccess fires + * on the IO event loop. The previous code would NPE on + * timeoutsHolder.setResolvedRemoteAddress(...). With the fix, the listener + * silently closes the freshly-connected channel and returns. + */ + @Test + public void onSuccessShouldNotThrowWhenTimeoutsHolderIsNull() { + NettyResponseFuture future = newFuture(); + TimeoutsHolder holder = new TimeoutsHolder(null, future, null, + new DefaultAsyncHttpClientConfig.Builder().build(), null); + future.setTimeoutsHolder(holder); + // Simulate the race: cancelTimeouts has nulled the holder, but isDone is not + // (yet) observable on this thread. + future.cancelTimeouts(); + + NettyConnectListener listener = new NettyConnectListener<>(future, null, null, null); + EmbeddedChannel channel = new EmbeddedChannel(); + + // Must not throw NPE. + listener.onSuccess(channel, new InetSocketAddress("127.0.0.1", 80)); + + // Listener should have closed the freshly-connected channel. + assertFalse(channel.isOpen(), "channel should be closed when holder is null"); + assertFalse(future.isDone(), + "future state was not modified by cancelTimeouts alone — still not done"); + } + + /** + * When the future has been aborted (e.g. by a request timeout firing while the + * connect was in flight), abort() calls terminateAndExit() which both nulls the + * holder and sets isDone=1. The early-out check must catch this — under the old + * isCancelled()-only check it would have fallen through to the holder NPE since + * abort() does not set isCancelled. + */ + @Test + public void onSuccessShouldExitEarlyWhenFutureWasAborted() { + NettyResponseFuture future = newFuture(); + TimeoutsHolder holder = new TimeoutsHolder(null, future, null, + new DefaultAsyncHttpClientConfig.Builder().build(), null); + future.setTimeoutsHolder(holder); + future.abort(new IOException("request timeout")); + + assertTrue(future.isDone(), "abort() should mark the future done"); + assertFalse(future.isCancelled(), + "abort() must not set isCancelled — that's the whole reason the old check was insufficient"); + + NettyConnectListener listener = new NettyConnectListener<>(future, null, null, null); + EmbeddedChannel channel = new EmbeddedChannel(); + + // Must not throw NPE. + listener.onSuccess(channel, new InetSocketAddress("127.0.0.1", 80)); + + assertFalse(channel.isOpen(), "channel should be closed when future is already done"); + } + + /** + * Cancelling the future also nulls the holder and sets isCancelled=1. + * Mirrors the abort case but via the explicit cancel path; guards against + * future regressions of the early-out for either flag. + */ + @Test + public void onSuccessShouldExitEarlyWhenFutureWasCancelled() { + NettyResponseFuture future = newFuture(); + TimeoutsHolder holder = new TimeoutsHolder(null, future, null, + new DefaultAsyncHttpClientConfig.Builder().build(), null); + future.setTimeoutsHolder(holder); + future.cancel(true); + + NettyConnectListener listener = new NettyConnectListener<>(future, null, null, null); + EmbeddedChannel channel = new EmbeddedChannel(); + + listener.onSuccess(channel, new InetSocketAddress("127.0.0.1", 80)); + + assertFalse(channel.isOpen()); + } +}