3838import io .grpc .MethodDescriptor ;
3939import io .grpc .Status ;
4040import io .grpc .StatusRuntimeException ;
41+ import java .time .Duration ;
4142import java .util .List ;
4243import java .util .Optional ;
44+ import java .util .Random ;
4345import java .util .concurrent .ExecutionException ;
4446import java .util .concurrent .ScheduledExecutorService ;
4547import java .util .concurrent .ScheduledFuture ;
5153
5254/**
5355 * Decorator for a Bigtable data plane connection to add channel warming via PingAndWarm. Channel
54- * warming will happen on creation and then every 3 minutes.
56+ * warming will happen on creation and then every 3 minutes (with jitter) .
5557 */
5658public class DataChannel extends ManagedChannel {
5759 private static final Logger LOGGER = LoggerFactory .getLogger (DataChannel .class );
5860
61+ private static final Duration WARM_PERIOD = Duration .ofMinutes (3 );
62+ private static final Duration MAX_JITTER = Duration .ofSeconds (10 );
63+
64+ private final Random random = new Random ();
5965 private final ManagedChannel inner ;
6066 private final Metrics metrics ;
6167 private final ResourceCollector resourceCollector ;
6268 private final CallCredentials callCredentials ;
63- private final ScheduledFuture <?> antiIdleTask ;
69+ private final ScheduledExecutorService warmingExecutor ;
70+ private volatile ScheduledFuture <?> antiIdleTask ;
6471
6572 private final AtomicBoolean closed = new AtomicBoolean ();
73+ private final Object scheduleLock = new Object ();
6674
6775 public DataChannel (
6876 ResourceCollector resourceCollector ,
@@ -83,6 +91,8 @@ public DataChannel(
8391 .keepAliveTime (30 , TimeUnit .SECONDS )
8492 .keepAliveTimeout (10 , TimeUnit .SECONDS )
8593 .build ();
94+
95+ this .warmingExecutor = warmingExecutor ;
8696 this .metrics = metrics ;
8797
8898 try {
@@ -96,16 +106,30 @@ public DataChannel(
96106 throw e ;
97107 }
98108
99- antiIdleTask = warmingExecutor .scheduleAtFixedRate (this ::warmQuietly , 3 , 3 , TimeUnit .MINUTES );
109+ antiIdleTask =
110+ warmingExecutor .schedule (this ::warmTask , nextWarmup ().toMillis (), TimeUnit .MILLISECONDS );
100111 metrics .updateChannelCount (1 );
101112 }
102113
103- private void warmQuietly () {
114+ private Duration nextWarmup () {
115+ return WARM_PERIOD .minus (
116+ Duration .ofMillis ((long ) (MAX_JITTER .toMillis () * random .nextDouble ())));
117+ }
118+
119+ private void warmTask () {
104120 try {
105121 warm ();
106122 } catch (RuntimeException e ) {
107123 LOGGER .warn ("anti idle ping failed, forcing reconnect" , e );
108124 inner .enterIdle ();
125+ } finally {
126+ synchronized (scheduleLock ) {
127+ if (!closed .get ()) {
128+ antiIdleTask =
129+ warmingExecutor .schedule (
130+ this ::warmTask , nextWarmup ().toMillis (), TimeUnit .MILLISECONDS );
131+ }
132+ }
109133 }
110134 }
111135
@@ -204,10 +228,16 @@ public void onClose(Status status, Metadata trailers) {
204228
205229 @ Override
206230 public ManagedChannel shutdown () {
207- if (closed .compareAndSet (false , true )) {
231+ final boolean closing ;
232+
233+ synchronized (scheduleLock ) {
234+ closing = closed .compareAndSet (false , true );
235+ antiIdleTask .cancel (true );
236+ }
237+ if (closing ) {
208238 metrics .updateChannelCount (-1 );
209239 }
210- antiIdleTask . cancel ( true );
240+
211241 return inner .shutdown ();
212242 }
213243
@@ -223,10 +253,17 @@ public boolean isTerminated() {
223253
224254 @ Override
225255 public ManagedChannel shutdownNow () {
226- if (closed .compareAndSet (false , true )) {
256+ final boolean closing ;
257+
258+ synchronized (scheduleLock ) {
259+ closing = closed .compareAndSet (false , true );
260+ antiIdleTask .cancel (true );
261+ }
262+
263+ if (closing ) {
227264 metrics .updateChannelCount (-1 );
228265 }
229- antiIdleTask . cancel ( true );
266+
230267 return inner .shutdownNow ();
231268 }
232269
0 commit comments