@@ -66,7 +66,10 @@ public class RegistryBasedResourceClaim {
6666 this .registryEntry = registryEntry ;
6767 this .connectToEtcd = connectToEtcd ;
6868
69- logger .info ("Acquiring resource-claim…" );
69+ Duration timeout = acquisitionTimeout == null
70+ ? Duration .ofMinutes (5 )
71+ : acquisitionTimeout ;
72+ logger .info ("Acquiring resource-claim; timeout is set to {}." , timeout );
7073
7174 Client etcd = connectToEtcd .get ();
7275
@@ -78,22 +81,23 @@ public class RegistryBasedResourceClaim {
7881
7982 List <Integer > clusterIds = ClusterID .get (etcd );
8083
81- Duration timeout = acquisitionTimeout == null
82- ? Duration .ofMinutes (5 )
83- : acquisitionTimeout ;
84+
8485 Instant giveUpAfter = Instant .now ().plus (timeout );
86+ long timeoutSeconds = timeout .getSeconds ();
8587
8688 this .poolSize = maxGeneratorCount ;
8789
88- ResourcePair resourcePair = null ;
90+ ResourcePair resourcePair ;
91+ LockResponse lock ;
92+ long leaseId ;
8993 try {
90- logger .debug ("Acquiring lock, timeout is set to {}." , timeout );
94+ logger .debug ("Acquiring lock." );
9195 // Have the lease TTL just a bit after our timeout.
92- LeaseGrantResponse lease = etcd .getLeaseClient ().grant (timeout .plusSeconds (5 ).getSeconds ()).get ();
93- long leaseId = lease .getID ();
96+ LeaseGrantResponse lease = etcd .getLeaseClient ().grant (timeoutSeconds + 5 ).get (timeoutSeconds , TimeUnit .SECONDS );
97+ leaseId = lease .getID ();
98+ logger .debug ("Got lease {}." , leaseId );
9499
95100 // Acquire the lock. This makes sure we are the only process claiming a resource.
96- LockResponse lock ;
97101 try {
98102 lock = etcd .getLockClient ()
99103 .lock (LOCK_NAME , leaseId )
@@ -102,39 +106,44 @@ public class RegistryBasedResourceClaim {
102106 throw new IOException ("Process timed out." );
103107 }
104108
105- // Keep the lease alive for another period in order to safely finish claiming the resource.
106- etcd .getLeaseClient ().keepAliveOnce (leaseId ).get ();
107-
108109 if (logger .isDebugEnabled ()) {
109110 logger .debug ("Acquired lock: {}." , lock .getKey ().toString (StandardCharsets .UTF_8 ));
110111 }
111112
113+ // Keep the lease alive for another period in order to safely finish claiming the resource.
114+ etcd .getLeaseClient ().keepAliveOnce (leaseId ).get (timeoutSeconds , TimeUnit .SECONDS );
115+
116+ logger .debug ("Lease renewed." );
117+
112118 resourcePair = claimResource (
113119 etcd , maxGeneratorCount , clusterIds , giveUpAfter , waitWhenNoResourcesAvailable
114120 );
115121 this .clusterId = resourcePair .clusterId ;
116122 this .generatorId = resourcePair .generatorId ;
123+ } catch (TimeoutException | ExecutionException e ) {
124+ throw new IOException (e );
125+ } catch (InterruptedException e ) {
126+ Thread .currentThread ().interrupt ();
127+ throw new IOException (e );
128+ }
117129
130+ try {
118131 // Explicitly release the lock. If this line is not reached due to exceptions raised, the lock will
119132 // automatically be removed when the lease holding it expires.
120- etcd .getLockClient ().unlock (lock .getKey ()).get ();
133+ etcd .getLockClient ().unlock (lock .getKey ()).get (timeoutSeconds , TimeUnit . SECONDS );
121134 if (logger .isDebugEnabled ()) {
122135 logger .debug ("Released lock: {}." , lock .getKey ().toString (StandardCharsets .UTF_8 ));
123136 }
124137
125138 // Revoke the lease instead of letting it time out.
126- etcd .getLeaseClient ().revoke (leaseId ).get ();
127- } catch (ExecutionException e ) {
128- if ( resourcePair != null ) {
129- close ();
130- }
131- throw new IOException ( e );
139+ etcd .getLeaseClient ().revoke (leaseId ).get (timeoutSeconds , TimeUnit . SECONDS );
140+ } catch (TimeoutException | ExecutionException e ) {
141+ logger . warn (
142+ "Failed to release lock {} (will be released automatically by Etcd server). Resource-claims was successfully acquired though." ,
143+ lock . getKey (). toString ( StandardCharsets . UTF_8 )
144+ );
132145 } catch (InterruptedException e ) {
133- if (resourcePair != null ) {
134- close ();
135- }
136146 Thread .currentThread ().interrupt ();
137- throw new IOException (e );
138147 }
139148
140149 logger .debug ("Resource-claim acquired ({}/{})." , clusterId , generatorId );
@@ -276,10 +285,10 @@ private void relinquishResource() {
276285 ByteSequence resourcePath = ByteSequence .from (resourcePathString , StandardCharsets .UTF_8 );
277286
278287 try {
279- kvClient .delete (resourcePath ).get ();
288+ kvClient .delete (resourcePath ).get (5 , TimeUnit . SECONDS );
280289 } catch (InterruptedException e ) {
281290 Thread .currentThread ().interrupt ();
282- } catch (ExecutionException e ) {
291+ } catch (ExecutionException | TimeoutException e ) {
283292 logger .error ("Failed to revoke Etcd lease." , e );
284293 }
285294 }
0 commit comments