Skip to content

Commit fc3dd5e

Browse files
committed
Initialize KV/Watcher client during claim
If we instantiate these Etcd sub-clients during application shutdown, this will fail because of the registering of a shutdown-hook further down the stack. So for the `close()` of the resource-claims the relevant sub-client is initialized when the claim is acquired.
1 parent 180d137 commit fc3dd5e

2 files changed

Lines changed: 27 additions & 17 deletions

File tree

uniqueid-etcd/src/main/java/org/lable/oss/uniqueid/etcd/RegistryBasedResourceClaim.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@
1515
*/
1616
package org.lable.oss.uniqueid.etcd;
1717

18-
import io.etcd.jetcd.ByteSequence;
19-
import io.etcd.jetcd.Client;
20-
import io.etcd.jetcd.KeyValue;
21-
import io.etcd.jetcd.Watch;
18+
import io.etcd.jetcd.*;
2219
import io.etcd.jetcd.kv.GetResponse;
2320
import io.etcd.jetcd.kv.TxnResponse;
2421
import io.etcd.jetcd.lease.LeaseGrantResponse;
@@ -58,6 +55,7 @@ public class RegistryBasedResourceClaim {
5855
final int generatorId;
5956

6057
final int poolSize;
58+
final KV kvClient;
6159

6260
RegistryBasedResourceClaim(Supplier<Client> connectToEtcd,
6361
int maxGeneratorCount,
@@ -72,6 +70,12 @@ public class RegistryBasedResourceClaim {
7270

7371
Client etcd = connectToEtcd.get();
7472

73+
// Keep the KV client around, because if we try to instantiate it during shutdown of a Java application when
74+
// the resource is likely to be released, it will fail because further down the stack an attempt is made to
75+
// register a shutdown handler, which fails because the application is already shutting down. So we instantiate
76+
// this here and keep it.
77+
kvClient = etcd.getKVClient();
78+
7579
List<Integer> clusterIds = ClusterID.get(etcd);
7680

7781
Duration timeout = acquisitionTimeout == null
@@ -268,12 +272,11 @@ private void awaitLatchUnlessItTakesTooLong(CountDownLatch latch, Instant giveUp
268272
private void relinquishResource() {
269273
logger.debug("Relinquishing claimed registry resource {}:{}.", clusterId, generatorId);
270274

271-
Client etcd = connectToEtcd.get();
272275
String resourcePathString = resourceKey(clusterId, generatorId);
273276
ByteSequence resourcePath = ByteSequence.from(resourcePathString, StandardCharsets.UTF_8);
274277

275278
try {
276-
etcd.getKVClient().delete(resourcePath).get();
279+
kvClient.delete(resourcePath).get();
277280
} catch (InterruptedException e) {
278281
Thread.currentThread().interrupt();
279282
} catch (ExecutionException e) {

uniqueid-etcd/src/main/java/org/lable/oss/uniqueid/etcd/ResourceClaim.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@
1515
*/
1616
package org.lable.oss.uniqueid.etcd;
1717

18-
import io.etcd.jetcd.ByteSequence;
19-
import io.etcd.jetcd.Client;
20-
import io.etcd.jetcd.KeyValue;
21-
import io.etcd.jetcd.Watch;
18+
import io.etcd.jetcd.*;
2219
import io.etcd.jetcd.kv.GetResponse;
2320
import io.etcd.jetcd.kv.TxnResponse;
2421
import io.etcd.jetcd.lease.LeaseGrantResponse;
@@ -66,6 +63,7 @@ public class ResourceClaim implements Closeable {
6663

6764
final int poolSize;
6865
final Client etcd;
66+
final Lease leaseClient;
6967

7068
long leaseId;
7169

@@ -85,6 +83,15 @@ public class ResourceClaim implements Closeable {
8583

8684
this.poolSize = maxGeneratorCount;
8785
this.etcd = etcd;
86+
87+
// Keep the lease client around, because if we try to instantiate it during shutdown of a Java application when
88+
// the resource is likely to be released, it will fail because further down the stack an attempt is made to
89+
// register a shutdown handler, which fails because the application is already shutting down. So we instantiate
90+
// this here and keep it.
91+
this.leaseClient = etcd.getLeaseClient();
92+
93+
KV kvClient = etcd.getKVClient();
94+
8895
try {
8996
LeaseGrantResponse lease = etcd.getLeaseClient().grant(5).get();
9097
leaseId = lease.getID();
@@ -110,7 +117,7 @@ public class ResourceClaim implements Closeable {
110117
logger.debug("Acquired lock: {}.", lock.getKey().toString(StandardCharsets.UTF_8));
111118
}
112119

113-
ResourcePair resourcePair = claimResource(etcd, maxGeneratorCount, clusterIds, giveUpAfter);
120+
ResourcePair resourcePair = claimResource(kvClient, maxGeneratorCount, clusterIds, giveUpAfter);
114121
this.clusterId = resourcePair.clusterId;
115122
this.generatorId = resourcePair.generatorId;
116123

@@ -230,13 +237,13 @@ public void run() {
230237
/**
231238
* Try to claim an available resource from the resource pool.
232239
*
233-
* @param etcd Etcd connection.
240+
* @param kvClient Etcd KV client.
234241
* @param maxGeneratorCount Maximum number of generators possible.
235242
* @param clusterIds Cluster Ids available to use.
236243
* @param giveUpAfter Give up after this instant in time.
237244
* @return The claimed resource.
238245
*/
239-
ResourcePair claimResource(Client etcd, int maxGeneratorCount, List<Integer> clusterIds, Instant giveUpAfter)
246+
ResourcePair claimResource(KV kvClient, int maxGeneratorCount, List<Integer> clusterIds, Instant giveUpAfter)
240247
throws InterruptedException, IOException, ExecutionException {
241248

242249
logger.debug("Trying to claim a resource.");
@@ -247,7 +254,7 @@ ResourcePair claimResource(Client etcd, int maxGeneratorCount, List<Integer> clu
247254
.withKeysOnly(true)
248255
.withRange(OptionsUtil.prefixEndOf(POOL_KEY))
249256
.build();
250-
GetResponse get = etcd.getKVClient().get(POOL_KEY, getOptions).get();
257+
GetResponse get = kvClient.get(POOL_KEY, getOptions).get();
251258

252259
List<ByteSequence> claimedResources = get.getKvs().stream()
253260
.map(KeyValue::getKey)
@@ -267,7 +274,7 @@ ResourcePair claimResource(Client etcd, int maxGeneratorCount, List<Integer> clu
267274
);
268275
awaitLatchUnlessItTakesTooLong(latch, giveUpAfter);
269276
watcher.close();
270-
return claimResource(etcd, maxGeneratorCount, clusterIds, giveUpAfter);
277+
return claimResource(kvClient, maxGeneratorCount, clusterIds, giveUpAfter);
271278
}
272279

273280
// Try to claim an available resource.
@@ -310,7 +317,7 @@ ResourcePair claimResource(Client etcd, int maxGeneratorCount, List<Integer> clu
310317
}
311318
}
312319

313-
return claimResource(etcd, maxGeneratorCount, clusterIds, giveUpAfter);
320+
return claimResource(kvClient, maxGeneratorCount, clusterIds, giveUpAfter);
314321
}
315322

316323
/**
@@ -319,7 +326,7 @@ ResourcePair claimResource(Client etcd, int maxGeneratorCount, List<Integer> clu
319326
private void relinquishResource() {
320327
logger.debug("Relinquishing claimed resource {}:{}.", clusterId, generatorId);
321328
try {
322-
etcd.getLeaseClient().revoke(leaseId).get();
329+
leaseClient.revoke(leaseId).get();
323330
} catch (InterruptedException e) {
324331
Thread.currentThread().interrupt();
325332
} catch (ExecutionException e) {

0 commit comments

Comments
 (0)