Skip to content

Commit b560364

Browse files
committed
feat(util): add WaitUtils for waiting on resource conditions
Add a utility class that provides fabric8-style wait functionality for Kubernetes resources. Features include: - waitUntilReady: wait for resource to pass Readiness check - waitUntilCondition: wait for custom predicate to be satisfied - waitUntilDeleted: wait for resource to be removed - Async variants returning CompletableFuture - Configurable timeout and poll intervals - Integration with GenericKubernetesApi Also adds comprehensive unit tests covering timeout behavior, condition polling, and async operations.
1 parent a4319a4 commit b560364

2 files changed

Lines changed: 631 additions & 0 deletions

File tree

Lines changed: 368 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,368 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.util;
14+
15+
import io.kubernetes.client.common.KubernetesObject;
16+
import io.kubernetes.client.openapi.ApiException;
17+
import io.kubernetes.client.util.generic.GenericKubernetesApi;
18+
import io.kubernetes.client.util.generic.KubernetesApiResponse;
19+
20+
import java.time.Duration;
21+
import java.util.Objects;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.ExecutionException;
24+
import java.util.concurrent.Executors;
25+
import java.util.concurrent.ScheduledExecutorService;
26+
import java.util.concurrent.ScheduledFuture;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.TimeoutException;
29+
import java.util.function.Predicate;
30+
import java.util.function.Supplier;
31+
32+
/**
33+
* Utilities for waiting on Kubernetes resources to reach desired conditions.
34+
* Provides fabric8-style waitUntilReady and waitUntilCondition functionality.
35+
*
36+
* <p>Example usage:
37+
* <pre>{@code
38+
* // Wait for a Pod to be ready
39+
* V1Pod pod = WaitUtils.waitUntilReady(
40+
* () -> coreV1Api.readNamespacedPod("my-pod", "default").execute(),
41+
* Duration.ofMinutes(5),
42+
* Duration.ofSeconds(1)
43+
* );
44+
*
45+
* // Wait for a custom condition
46+
* V1Pod runningPod = WaitUtils.waitUntilCondition(
47+
* () -> coreV1Api.readNamespacedPod("my-pod", "default").execute(),
48+
* pod -> "Running".equals(pod.getStatus().getPhase()),
49+
* Duration.ofMinutes(5),
50+
* Duration.ofSeconds(1)
51+
* );
52+
*
53+
* // Using GenericKubernetesApi
54+
* V1Pod readyPod = WaitUtils.waitUntilReady(
55+
* podApi,
56+
* "default",
57+
* "my-pod",
58+
* Duration.ofMinutes(5)
59+
* );
60+
* }</pre>
61+
*/
62+
public class WaitUtils {
63+
64+
private static final Duration DEFAULT_POLL_INTERVAL = Duration.ofSeconds(1);
65+
66+
private WaitUtils() {
67+
// Utility class
68+
}
69+
70+
/**
71+
* Waits until the resource is ready using the Readiness utility.
72+
*
73+
* @param <T> the resource type
74+
* @param resourceSupplier supplier that fetches the current state of the resource
75+
* @param timeout maximum time to wait
76+
* @param pollInterval time between checks
77+
* @return the ready resource
78+
* @throws TimeoutException if the resource doesn't become ready within the timeout
79+
* @throws InterruptedException if the thread is interrupted
80+
*/
81+
public static <T extends KubernetesObject> T waitUntilReady(
82+
Supplier<T> resourceSupplier,
83+
Duration timeout,
84+
Duration pollInterval) throws TimeoutException, InterruptedException {
85+
return waitUntilCondition(resourceSupplier, Readiness::isReady, timeout, pollInterval);
86+
}
87+
88+
/**
89+
* Waits until the resource is ready using the Readiness utility with default poll interval.
90+
*
91+
* @param <T> the resource type
92+
* @param resourceSupplier supplier that fetches the current state of the resource
93+
* @param timeout maximum time to wait
94+
* @return the ready resource
95+
* @throws TimeoutException if the resource doesn't become ready within the timeout
96+
* @throws InterruptedException if the thread is interrupted
97+
*/
98+
public static <T extends KubernetesObject> T waitUntilReady(
99+
Supplier<T> resourceSupplier,
100+
Duration timeout) throws TimeoutException, InterruptedException {
101+
return waitUntilReady(resourceSupplier, timeout, DEFAULT_POLL_INTERVAL);
102+
}
103+
104+
/**
105+
* Waits until the resource satisfies the given condition.
106+
*
107+
* @param <T> the resource type
108+
* @param resourceSupplier supplier that fetches the current state of the resource
109+
* @param condition predicate that tests if the condition is met
110+
* @param timeout maximum time to wait
111+
* @param pollInterval time between checks
112+
* @return the resource that satisfies the condition
113+
* @throws TimeoutException if the condition is not met within the timeout
114+
* @throws InterruptedException if the thread is interrupted
115+
*/
116+
public static <T extends KubernetesObject> T waitUntilCondition(
117+
Supplier<T> resourceSupplier,
118+
Predicate<T> condition,
119+
Duration timeout,
120+
Duration pollInterval) throws TimeoutException, InterruptedException {
121+
122+
Objects.requireNonNull(resourceSupplier, "resourceSupplier must not be null");
123+
Objects.requireNonNull(condition, "condition must not be null");
124+
Objects.requireNonNull(timeout, "timeout must not be null");
125+
Objects.requireNonNull(pollInterval, "pollInterval must not be null");
126+
127+
CompletableFuture<T> future = new CompletableFuture<>();
128+
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
129+
130+
try {
131+
ScheduledFuture<?> scheduledTask = executor.scheduleAtFixedRate(() -> {
132+
try {
133+
T resource = resourceSupplier.get();
134+
if (resource != null && condition.test(resource)) {
135+
future.complete(resource);
136+
}
137+
} catch (Exception e) {
138+
// Log but don't fail - resource might not exist yet
139+
// We'll keep polling until timeout
140+
}
141+
}, 0, pollInterval.toMillis(), TimeUnit.MILLISECONDS);
142+
143+
try {
144+
return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
145+
} finally {
146+
scheduledTask.cancel(true);
147+
}
148+
} catch (ExecutionException e) {
149+
throw new RuntimeException("Unexpected error while waiting", e.getCause());
150+
} finally {
151+
executor.shutdownNow();
152+
}
153+
}
154+
155+
/**
156+
* Waits until the resource satisfies the given condition with default poll interval.
157+
*
158+
* @param <T> the resource type
159+
* @param resourceSupplier supplier that fetches the current state of the resource
160+
* @param condition predicate that tests if the condition is met
161+
* @param timeout maximum time to wait
162+
* @return the resource that satisfies the condition
163+
* @throws TimeoutException if the condition is not met within the timeout
164+
* @throws InterruptedException if the thread is interrupted
165+
*/
166+
public static <T extends KubernetesObject> T waitUntilCondition(
167+
Supplier<T> resourceSupplier,
168+
Predicate<T> condition,
169+
Duration timeout) throws TimeoutException, InterruptedException {
170+
return waitUntilCondition(resourceSupplier, condition, timeout, DEFAULT_POLL_INTERVAL);
171+
}
172+
173+
/**
174+
* Waits until the resource is deleted (returns null or throws 404).
175+
*
176+
* @param <T> the resource type
177+
* @param resourceSupplier supplier that fetches the current state of the resource
178+
* @param timeout maximum time to wait
179+
* @param pollInterval time between checks
180+
* @throws TimeoutException if the resource is not deleted within the timeout
181+
* @throws InterruptedException if the thread is interrupted
182+
*/
183+
public static <T extends KubernetesObject> void waitUntilDeleted(
184+
Supplier<T> resourceSupplier,
185+
Duration timeout,
186+
Duration pollInterval) throws TimeoutException, InterruptedException {
187+
188+
Objects.requireNonNull(resourceSupplier, "resourceSupplier must not be null");
189+
Objects.requireNonNull(timeout, "timeout must not be null");
190+
Objects.requireNonNull(pollInterval, "pollInterval must not be null");
191+
192+
CompletableFuture<Void> future = new CompletableFuture<>();
193+
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
194+
195+
try {
196+
ScheduledFuture<?> scheduledTask = executor.scheduleAtFixedRate(() -> {
197+
try {
198+
T resource = resourceSupplier.get();
199+
if (resource == null) {
200+
future.complete(null);
201+
}
202+
} catch (Exception e) {
203+
// Treat any exception as deleted (typically 404)
204+
future.complete(null);
205+
}
206+
}, 0, pollInterval.toMillis(), TimeUnit.MILLISECONDS);
207+
208+
try {
209+
future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
210+
} finally {
211+
scheduledTask.cancel(true);
212+
}
213+
} catch (ExecutionException e) {
214+
throw new RuntimeException("Unexpected error while waiting for deletion", e.getCause());
215+
} finally {
216+
executor.shutdownNow();
217+
}
218+
}
219+
220+
/**
221+
* Waits until the resource is deleted with default poll interval.
222+
*
223+
* @param <T> the resource type
224+
* @param resourceSupplier supplier that fetches the current state of the resource
225+
* @param timeout maximum time to wait
226+
* @throws TimeoutException if the resource is not deleted within the timeout
227+
* @throws InterruptedException if the thread is interrupted
228+
*/
229+
public static <T extends KubernetesObject> void waitUntilDeleted(
230+
Supplier<T> resourceSupplier,
231+
Duration timeout) throws TimeoutException, InterruptedException {
232+
waitUntilDeleted(resourceSupplier, timeout, DEFAULT_POLL_INTERVAL);
233+
}
234+
235+
/**
236+
* Waits until a resource is ready using GenericKubernetesApi.
237+
*
238+
* @param <T> the resource type
239+
* @param <L> the list type
240+
* @param api the GenericKubernetesApi
241+
* @param namespace the namespace (null for cluster-scoped resources)
242+
* @param name the resource name
243+
* @param timeout maximum time to wait
244+
* @return the ready resource
245+
* @throws TimeoutException if the resource doesn't become ready within the timeout
246+
* @throws InterruptedException if the thread is interrupted
247+
*/
248+
public static <T extends KubernetesObject, L extends io.kubernetes.client.common.KubernetesListObject> T waitUntilReady(
249+
GenericKubernetesApi<T, L> api,
250+
String namespace,
251+
String name,
252+
Duration timeout) throws TimeoutException, InterruptedException {
253+
254+
return waitUntilCondition(
255+
() -> {
256+
KubernetesApiResponse<T> response = namespace != null
257+
? api.get(namespace, name)
258+
: api.get(name);
259+
return response.isSuccess() ? response.getObject() : null;
260+
},
261+
Readiness::isReady,
262+
timeout,
263+
DEFAULT_POLL_INTERVAL
264+
);
265+
}
266+
267+
/**
268+
* Waits until a resource satisfies the given condition using GenericKubernetesApi.
269+
*
270+
* @param <T> the resource type
271+
* @param <L> the list type
272+
* @param api the GenericKubernetesApi
273+
* @param namespace the namespace (null for cluster-scoped resources)
274+
* @param name the resource name
275+
* @param condition predicate that tests if the condition is met
276+
* @param timeout maximum time to wait
277+
* @return the resource that satisfies the condition
278+
* @throws TimeoutException if the condition is not met within the timeout
279+
* @throws InterruptedException if the thread is interrupted
280+
*/
281+
public static <T extends KubernetesObject, L extends io.kubernetes.client.common.KubernetesListObject> T waitUntilCondition(
282+
GenericKubernetesApi<T, L> api,
283+
String namespace,
284+
String name,
285+
Predicate<T> condition,
286+
Duration timeout) throws TimeoutException, InterruptedException {
287+
288+
return waitUntilCondition(
289+
() -> {
290+
KubernetesApiResponse<T> response = namespace != null
291+
? api.get(namespace, name)
292+
: api.get(name);
293+
return response.isSuccess() ? response.getObject() : null;
294+
},
295+
condition,
296+
timeout,
297+
DEFAULT_POLL_INTERVAL
298+
);
299+
}
300+
301+
/**
302+
* Waits until a cluster-scoped resource is ready.
303+
*
304+
* @param <T> the resource type
305+
* @param <L> the list type
306+
* @param api the GenericKubernetesApi
307+
* @param name the resource name
308+
* @param timeout maximum time to wait
309+
* @return the ready resource
310+
* @throws TimeoutException if the resource doesn't become ready within the timeout
311+
* @throws InterruptedException if the thread is interrupted
312+
*/
313+
public static <T extends KubernetesObject, L extends io.kubernetes.client.common.KubernetesListObject> T waitUntilReady(
314+
GenericKubernetesApi<T, L> api,
315+
String name,
316+
Duration timeout) throws TimeoutException, InterruptedException {
317+
return waitUntilReady(api, null, name, timeout);
318+
}
319+
320+
/**
321+
* Asynchronously waits until the resource is ready.
322+
*
323+
* @param <T> the resource type
324+
* @param resourceSupplier supplier that fetches the current state of the resource
325+
* @param timeout maximum time to wait
326+
* @param pollInterval time between checks
327+
* @return CompletableFuture that completes with the ready resource
328+
*/
329+
public static <T extends KubernetesObject> CompletableFuture<T> waitUntilReadyAsync(
330+
Supplier<T> resourceSupplier,
331+
Duration timeout,
332+
Duration pollInterval) {
333+
return waitUntilConditionAsync(resourceSupplier, Readiness::isReady, timeout, pollInterval);
334+
}
335+
336+
/**
337+
* Asynchronously waits until the resource satisfies the given condition.
338+
*
339+
* @param <T> the resource type
340+
* @param resourceSupplier supplier that fetches the current state of the resource
341+
* @param condition predicate that tests if the condition is met
342+
* @param timeout maximum time to wait
343+
* @param pollInterval time between checks
344+
* @return CompletableFuture that completes with the resource or exceptionally with TimeoutException
345+
*/
346+
public static <T extends KubernetesObject> CompletableFuture<T> waitUntilConditionAsync(
347+
Supplier<T> resourceSupplier,
348+
Predicate<T> condition,
349+
Duration timeout,
350+
Duration pollInterval) {
351+
352+
CompletableFuture<T> result = new CompletableFuture<>();
353+
354+
CompletableFuture.runAsync(() -> {
355+
try {
356+
T resource = waitUntilCondition(resourceSupplier, condition, timeout, pollInterval);
357+
result.complete(resource);
358+
} catch (TimeoutException e) {
359+
result.completeExceptionally(e);
360+
} catch (InterruptedException e) {
361+
Thread.currentThread().interrupt();
362+
result.completeExceptionally(e);
363+
}
364+
});
365+
366+
return result;
367+
}
368+
}

0 commit comments

Comments
 (0)