1717 */
1818package org .apache .beam .runners .dataflow .worker .windmill .work .processing ;
1919
20- import static org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Preconditions .checkState ;
21-
2220import com .google .auto .value .AutoValue ;
2321import java .util .ArrayList ;
24- import java .util .Comparator ;
2522import java .util .HashMap ;
2623import java .util .List ;
2724import java .util .Map ;
28- import java .util .PriorityQueue ;
25+ import java .util .concurrent .ScheduledExecutorService ;
26+ import java .util .concurrent .ScheduledFuture ;
2927import java .util .concurrent .TimeUnit ;
30- import java .util .concurrent .locks .Condition ;
3128import java .util .concurrent .locks .ReentrantLock ;
3229import javax .annotation .Nullable ;
3330import javax .annotation .concurrent .GuardedBy ;
@@ -56,99 +53,82 @@ public abstract static class FinalizationInfo {
5653
5754 public abstract Runnable getCallback ();
5855
// Removed factory: built a FinalizationInfo from the id, expiry time and callback only.
59- public static FinalizationInfo create (Long id , Instant expiryTime , Runnable callback ) {
60- return new AutoValue_StreamingCommitFinalizer_FinalizationInfo (id , expiryTime , callback );
// Added accessor: the future of the expiry task scheduled for this entry. finalizeCommits()
// cancels it once the entry's callback has been picked up for execution.
56+ public abstract ScheduledFuture <?> getCleanupFuture ();
57+
// Added factory: same as before, but also records the cleanup future so that it travels with
// the entry stored in commitFinalizationCallbacks.
58+ public static FinalizationInfo create (
59+ Long id , Instant expiryTime , Runnable callback , ScheduledFuture <?> cleanupFuture ) {
60+ return new AutoValue_StreamingCommitFinalizer_FinalizationInfo (
61+ id , expiryTime , callback , cleanupFuture );
6162 }
6263 }
6364
// Guards commitFinalizationCallbacks (and, before this change, the cleanup queue state below).
6465 private final ReentrantLock lock = new ReentrantLock ();
// Removed: condition the old cleanup loop waited on until the earliest expiry changed.
65- private final Condition queueMinChanged = lock .newCondition ();
6666
// Pending finalization callbacks keyed by finalization id.
6767 @ GuardedBy ("lock" )
6868 private final HashMap <Long , FinalizationInfo > commitFinalizationCallbacks = new HashMap <>();
6969
// Removed: queue of pending entries ordered by expiry time, consumed by the old cleanup loop.
// Each entry now carries its own scheduled cleanup future instead.
70- @ GuardedBy ("lock" )
71- private final PriorityQueue <FinalizationInfo > cleanUpQueue =
72- new PriorityQueue <>(11 , Comparator .comparing (FinalizationInfo ::getExpiryTime ));
73-
// Removed: flag used to start the old cleanup loop lazily on the first cached callback.
74- @ GuardedBy ("lock" )
75- private boolean cleanUpThreadStarted = false ;
76-
// NOTE(review): presumably the executor the finalization callbacks run on; its remaining use is
// outside the visible part of this file. Before this change it also hosted the cleanup loop.
7770 private final BoundedQueueExecutor finalizationExecutor ;
7871
// Removed constructor: took only the executor, which was also used to run the cleanup loop.
79- private StreamingCommitFinalizer (BoundedQueueExecutor finalizationCleanupExecutor ) {
80- finalizationExecutor = finalizationCleanupExecutor ;
81- }
72+ // Expiry cleanup tasks are scheduled on their own executor, so they don't block processing.
73+ private final ScheduledExecutorService cleanupExecutor ;
8274
// Removed: body of the old cleanup loop. Holding the lock, it repeatedly looked at the entry with
// the earliest expiry, waited on queueMinChanged until that expiry (or until signalled), and then
// dropped the entry from both cleanUpQueue and commitFinalizationCallbacks. It exited only when
// interrupted. Per-entry tasks scheduled on cleanupExecutor replace this loop.
83- private void cleanupThreadBody () {
84- lock .lock ();
85- try {
86- while (true ) {
87- final @ Nullable FinalizationInfo minValue = cleanUpQueue .peek ();
88- if (minValue == null ) {
89- // Wait for an element to be added and loop to re-examine the min.
90- queueMinChanged .await ();
91- continue ;
92- }
93-
94- Instant now = Instant .now ();
95- Duration timeDifference = new Duration (now , minValue .getExpiryTime ());
96- if (timeDifference .getMillis () < 0
97- || (queueMinChanged .await (timeDifference .getMillis (), TimeUnit .MILLISECONDS )
98- && cleanUpQueue .peek () == minValue )) {
99- // The minimum element has an expiry time before now, either because it had elapsed when
100- // we pulled it or because we awaited it, and it is still the minimum.
101- checkState (minValue == cleanUpQueue .poll ());
102- checkState (commitFinalizationCallbacks .remove (minValue .getId ()) == minValue );
103- }
104- }
105- } catch (InterruptedException e ) {
106- // We're being shutdown.
107- } finally {
108- lock .unlock ();
109- }
// Added constructor: stores the two executors; no threads or tasks are started here.
75+ private StreamingCommitFinalizer (
76+ BoundedQueueExecutor finalizationExecutor , ScheduledExecutorService cleanupExecutor ) {
77+ this .finalizationExecutor = finalizationExecutor ;
78+ this .cleanupExecutor = cleanupExecutor ;
11079 }
11180
// Removed factory: wrapped the single-executor constructor.
112- static StreamingCommitFinalizer create (BoundedQueueExecutor workExecutor ) {
113- return new StreamingCommitFinalizer (workExecutor );
// Added factory: callers now also supply the scheduled executor on which expiry cleanup tasks
// are scheduled; both arguments are passed straight to the private constructor.
81+ static StreamingCommitFinalizer create (
82+ BoundedQueueExecutor workExecutor , ScheduledExecutorService cleanupExecutor ) {
83+ return new StreamingCommitFinalizer (workExecutor , cleanupExecutor );
11484 }
11585
11686 /**
11787 * Stores a map of user worker generated finalization ids and callbacks to execute once a commit
11888 * has been successfully committed to the backing state store.
11989 */
12090 public void cacheCommitFinalizers (Map <Long , Pair <Instant , Runnable >> callbacks ) {
121- for (Map .Entry <Long , Pair <Instant , Runnable >> entry : callbacks .entrySet ()) {
122- Long finalizeId = entry .getKey ();
123- final FinalizationInfo info =
124- FinalizationInfo .create (
125- finalizeId , entry .getValue ().getLeft (), entry .getValue ().getRight ());
126-
127- lock .lock ();
128- try {
129- FinalizationInfo existingInfo = commitFinalizationCallbacks .put (finalizeId , info );
91+ if (callbacks .isEmpty ()) {
92+ return ;
93+ }
94+ Instant now = Instant .now ();
95+ lock .lock ();
96+ try {
97+ for (Map .Entry <Long , Pair <Instant , Runnable >> entry : callbacks .entrySet ()) {
98+ Instant cleanupTime = entry .getValue ().getLeft ();
99+ // Ignore finalizers that have already expired.
100+ if (cleanupTime .isBefore (now )) {
101+ continue ;
102+ }
103+ ScheduledFuture <?> cleanupFuture =
104+ cleanupExecutor .schedule (
105+ () -> {
106+ lock .lock ();
107+ try {
108+ commitFinalizationCallbacks .remove (entry .getKey ());
109+ } finally {
110+ lock .unlock ();
111+ }
112+ },
113+ new Duration (now , cleanupTime ).getMillis (),
114+ TimeUnit .MILLISECONDS );
115+ FinalizationInfo info =
116+ FinalizationInfo .create (
117+ entry .getKey (),
118+ entry .getValue ().getLeft (),
119+ entry .getValue ().getRight (),
120+ cleanupFuture );
121+ FinalizationInfo existingInfo = commitFinalizationCallbacks .put (info .getId (), info );
130122 if (existingInfo != null ) {
131123 throw new IllegalStateException (
132124 "Expected to not have any past callbacks for bundle "
133- + finalizeId
125+ + info . getId ()
134126 + " but had "
135127 + existingInfo );
136128 }
137- if (!cleanUpThreadStarted ) {
138- // Start the cleanup thread lazily for pipelines that don't use finalization callbacks
139- // and some tests.
140- cleanUpThreadStarted = true ;
141- finalizationExecutor .execute (this ::cleanupThreadBody , 0 );
142- }
143- cleanUpQueue .add (info );
144- @ SuppressWarnings ("ReferenceEquality" )
145- boolean newMin = cleanUpQueue .peek () == info ;
146- if (newMin ) {
147- queueMinChanged .signal ();
148- }
149- } finally {
150- lock .unlock ();
151129 }
130+ } finally {
131+ lock .unlock ();
152132 }
153133 }
154134
@@ -167,8 +147,8 @@ public void finalizeCommits(Iterable<Long> finalizeIds) {
167147 for (long finalizeId : finalizeIds ) {
168148 @ Nullable FinalizationInfo info = commitFinalizationCallbacks .remove (finalizeId );
169149 if (info != null ) {
170- checkState (cleanUpQueue .remove (info ));
171150 callbacksToExecute .add (info .getCallback ());
151+ info .getCleanupFuture ().cancel (true );
172152 }
173153 }
174154 } finally {
@@ -186,10 +166,10 @@ public void finalizeCommits(Iterable<Long> finalizeIds) {
186166 }
187167
188168 @ VisibleForTesting
189- int cleanupQueueSize () {
169+ int pendingCallbacksSize () {
190170 lock .lock ();
191171 try {
192- return cleanUpQueue .size ();
172+ return commitFinalizationCallbacks .size ();
193173 } finally {
194174 lock .unlock ();
195175 }
0 commit comments