-
Notifications
You must be signed in to change notification settings - Fork 3.8k
fix: Terminate connection on closing the stream from the DirectDruidClient #19607
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -181,6 +181,9 @@ public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext conte | |
| private final AtomicLong channelSuspendedTime = new AtomicLong(0); | ||
| private final BlockingQueue<InputStreamHolder> queue = new LinkedBlockingQueue<>(); | ||
| private final AtomicBoolean done = new AtomicBoolean(false); | ||
| // Set when the consumer closes the result stream early (see the SequenceInputStream.close() override in | ||
| // handleResponse). Once set, incoming chunks are dropped rather than buffered. | ||
| private final AtomicBoolean discard = new AtomicBoolean(false); | ||
| private final AtomicBoolean nodeMetricsEmitted = new AtomicBoolean(false); | ||
| private final AtomicReference<String> fail = new AtomicReference<>(); | ||
| private final AtomicReference<TrafficCop> trafficCopRef = new AtomicReference<>(); | ||
|
|
@@ -202,6 +205,12 @@ private QueryMetrics<? super Query<T>> acquireResponseMetrics() | |
| */ | ||
| private boolean enqueue(ChannelBuffer buffer, long chunkNum) throws InterruptedException | ||
| { | ||
| // If the consumer has abandoned the response (see the SequenceInputStream.close() override below), drop the | ||
| // chunk instead of buffering it, and keep reads flowing (continueReading = true) so we never suspend the | ||
| // channel while it is being wound down. | ||
| if (discard.get()) { | ||
| return true; | ||
| } | ||
| // Increment queuedByteCount before queueing the object, so queuedByteCount is at least as high as | ||
| // the actual number of queued bytes at any particular time. | ||
| final InputStreamHolder holder = InputStreamHolder.fromChannelBuffer(buffer, chunkNum); | ||
|
|
@@ -282,6 +291,12 @@ public int read() throws IOException | |
| @Override | ||
| public boolean hasMoreElements() | ||
| { | ||
| // If the consumer abandoned the stream (close() ran), report end-of-stream. After discard is set | ||
| // enqueue() drops every chunk, so a further read would otherwise block in dequeue() until the | ||
| // query timeout and then throw a misleading QueryTimeoutException. | ||
| if (discard.get()) { | ||
| return false; | ||
| } | ||
| if (fail.get() != null) { | ||
| throw new RE(fail.get()); | ||
| } | ||
|
|
@@ -310,7 +325,33 @@ public InputStream nextElement() | |
| } | ||
| } | ||
| } | ||
| ), | ||
| ) | ||
| { | ||
| /** | ||
| * Closing this stream means the caller no longer needs the response. The default | ||
| * {@link SequenceInputStream#close()} would drain the entire remaining response off the wire first | ||
| * we want to avoid. Instead, abandon the response and force-close the connection | ||
| */ | ||
| @Override | ||
| public void close() | ||
| { | ||
| final TrafficCop trafficCop; | ||
| synchronized (done) { | ||
| if (done.get()) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [P2] Early close still needs to discard completed buffered response When the server has already finished sending the response, |
||
| return; | ||
| } | ||
| // Stop buffering further chunks (see enqueue()) and drop anything already buffered so the | ||
| // underlying Netty ChannelBuffers can be released. | ||
| discard.set(true); | ||
| queue.clear(); | ||
| trafficCop = trafficCopRef.get(); | ||
| } | ||
| if (trafficCop == null) { | ||
| return; | ||
| } | ||
| trafficCop.abort(); | ||
| } | ||
| }, | ||
| continueReading | ||
| ); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,184 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.druid.client; | ||
|
|
||
| import com.fasterxml.jackson.databind.ObjectMapper; | ||
| import com.google.common.collect.ImmutableMap; | ||
| import org.apache.druid.common.utils.SocketUtil; | ||
| import org.apache.druid.error.DruidException; | ||
| import org.apache.druid.java.util.common.DateTimes; | ||
| import org.apache.druid.java.util.common.StringUtils; | ||
| import org.apache.druid.java.util.common.concurrent.Execs; | ||
| import org.apache.druid.java.util.common.guava.Sequence; | ||
| import org.apache.druid.java.util.common.guava.Yielder; | ||
| import org.apache.druid.java.util.common.guava.Yielders; | ||
| import org.apache.druid.java.util.common.io.Closer; | ||
| import org.apache.druid.java.util.common.lifecycle.Lifecycle; | ||
| import org.apache.druid.java.util.http.client.HttpClientConfig; | ||
| import org.apache.druid.java.util.http.client.HttpClientInit; | ||
| import org.apache.druid.query.BaseQuery; | ||
| import org.apache.druid.query.Druids; | ||
| import org.apache.druid.query.QueryPlus; | ||
| import org.apache.druid.query.QueryRunnerFactoryConglomerate; | ||
| import org.apache.druid.query.QueryRunnerTestHelper; | ||
| import org.apache.druid.query.Result; | ||
| import org.apache.druid.segment.TestHelper; | ||
| import org.apache.druid.server.QueryStackTests; | ||
| import org.apache.druid.server.metrics.NoopServiceEmitter; | ||
| import org.eclipse.jetty.ee8.servlet.ServletContextHandler; | ||
| import org.eclipse.jetty.ee8.servlet.ServletHolder; | ||
| import org.eclipse.jetty.server.Server; | ||
| import org.junit.jupiter.api.Assertions; | ||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| import javax.servlet.ServletOutputStream; | ||
| import javax.servlet.http.HttpServlet; | ||
| import javax.servlet.http.HttpServletRequest; | ||
| import javax.servlet.http.HttpServletResponse; | ||
| import java.io.IOException; | ||
| import java.util.Map; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
|
|
||
| public class DirectDruidClientAbortHttpTest | ||
| { | ||
| @Test | ||
| public void testEarlyStreamClose() throws Exception | ||
| { | ||
| final ObjectMapper mapper = TestHelper.makeJsonMapper(); | ||
|
|
||
| final AtomicBoolean serverSawDisconnect = new AtomicBoolean(false); | ||
| final CountDownLatch responseSent = new CountDownLatch(1); | ||
| final CountDownLatch connectionTerminated = new CountDownLatch(1); | ||
| final CountDownLatch terminationDetected = new CountDownLatch(1); | ||
|
|
||
| final int port = SocketUtil.findOpenPort(0); | ||
| final Server server = new Server(port); | ||
| final ServletContextHandler handler = new ServletContextHandler(); | ||
| handler.addServlet( | ||
| new ServletHolder(new HttpServlet() | ||
| { | ||
| @Override | ||
| protected void doPost(HttpServletRequest req, HttpServletResponse resp) | ||
| { | ||
| final ServletOutputStream out; | ||
| try { | ||
| out = resp.getOutputStream(); | ||
| resp.setStatus(HttpServletResponse.SC_OK); | ||
| out.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\", \"result\": 42.0}]")); | ||
| out.flush(); | ||
| } | ||
| catch (Exception e) { | ||
| throw DruidException.defensive(e, "Encountered some issue while sending the real bytes"); | ||
| } | ||
|
|
||
| responseSent.countDown(); | ||
|
|
||
| try { | ||
| connectionTerminated.await(); | ||
| final long endTime = System.currentTimeMillis() + 100; | ||
| while (System.currentTimeMillis() < endTime) { | ||
| out.write(0); | ||
| out.flush(); | ||
| } | ||
| } | ||
| catch (IOException e) { | ||
| serverSawDisconnect.set(true); | ||
| terminationDetected.countDown(); | ||
| } | ||
| catch (Exception e) { | ||
| throw DruidException.defensive( | ||
| e, | ||
| "Encountered some issue while awaiting for the connection to be terminated" | ||
| ); | ||
| } | ||
| } | ||
| }), | ||
| "/*" | ||
| ); | ||
| server.setHandler(handler); | ||
|
|
||
| final Lifecycle lifecycle = new Lifecycle(); | ||
| final ScheduledExecutorService queryCancellationExecutor = | ||
| Execs.scheduledSingleThreaded("DirectDruidClientAbortHttpTest-cancel-%d"); | ||
| final Closer closer = Closer.create(); | ||
| try { | ||
| server.start(); | ||
| lifecycle.start(); | ||
|
|
||
| final QueryRunnerFactoryConglomerate conglomerate = | ||
| QueryStackTests.createQueryRunnerFactoryConglomerate(closer); | ||
|
|
||
| final DirectDruidClient directDruidClient = new DirectDruidClient( | ||
| conglomerate, | ||
| QueryRunnerTestHelper.NOOP_QUERYWATCHER, | ||
| mapper, | ||
| HttpClientInit.createClient( | ||
| HttpClientConfig.builder().withNumConnections(1).build(), | ||
| lifecycle | ||
| ), | ||
| "http", | ||
| "localhost:" + port, | ||
| new NoopServiceEmitter(), | ||
| queryCancellationExecutor | ||
| ); | ||
|
|
||
| final Map<String, Object> queryContext = ImmutableMap.of( | ||
| DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 60_000L, | ||
| BaseQuery.QUERY_ID, "abort-test" | ||
| ); | ||
|
|
||
| final Sequence results = directDruidClient.run( | ||
| QueryPlus.wrap(Druids.newTimeBoundaryQueryBuilder().dataSource("test").context(queryContext).build()), | ||
| DirectDruidClient.makeResponseContextForQuery() | ||
| ); | ||
|
|
||
| responseSent.await(); | ||
|
|
||
| final AtomicInteger resultCount = new AtomicInteger(0); | ||
| final Yielder yielder = Yielders.each(results); | ||
| try { | ||
| Assertions.assertFalse(yielder.isDone(), "expected at least one result before stopping"); | ||
| final Result result = (Result) yielder.get(); | ||
| Assertions.assertEquals(DateTimes.of("2014-01-01T01:02:03Z"), result.getTimestamp()); | ||
| resultCount.incrementAndGet(); | ||
| } | ||
| finally { | ||
| connectionTerminated.countDown(); | ||
| yielder.close(); | ||
| } | ||
|
|
||
| Assertions.assertEquals(1, resultCount.get(), "expected exactly one result before stopping"); | ||
| if (!terminationDetected.await(5, TimeUnit.SECONDS)) { | ||
| Assertions.fail("Test did not complete in 5 seconds!?"); | ||
| } | ||
| Assertions.assertTrue(serverSawDisconnect.get(), "server should have marked the connection as disconnected"); | ||
| } | ||
| finally { | ||
| queryCancellationExecutor.shutdownNow(); | ||
| closer.close(); | ||
| lifecycle.stop(); | ||
| server.stop(); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[P2] Keep TrafficCop source and binary compatible
Adding
abort()as a second abstract method makesTrafficCopno longer a functional interface and forces every out-of-tree mock or customHttpClientimplementation to add the method, with existing compiled implementations at risk ofAbstractMethodErrorif this hook is invoked. Since only the Netty implementation needs special behavior, makeabort()a default no-op and let Netty override it; that preserves existing lambda/test implementations while still enabling this PR's connection-abort path.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't want to add a default no-op for this modification. The callers must explicitly define what close() should do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for clarifying. I agree Netty should define the actual close behavior explicitly, and it does here. My concern is narrower: TrafficCop is a public nested interface that was previously SAM-compatible, so adding abort() as a required abstract method breaks source compatibility for lambdas/custom HttpClient fakes and can still throw AbstractMethodError for compiled implementations if this path is called. A default no-op does not stop Netty from overriding abort(); it just preserves the existing contract for implementations that cannot abort. If the intent is to require all implementations to make an explicit choice, I think this should be treated as an intentional API break and reflected accordingly. Reviewed 7 of 7 changed files.