2020import org .junit .jupiter .api .BeforeAll ;
2121
2222public class BasicTest {
23+ private static boolean environmentInitialized ;
2324 protected static UserRequestObject testUserRequestObject ;
2425 protected static List <UserRequestObject > testUsersRequestObjects = new ArrayList <>();
2526 protected static ChannelGetResponse testChannelGetResponse ;
2627 protected static Channel testChannel ;
2728 protected static Message testMessage ;
2829
2930 @ BeforeAll
30- static void setup () throws StreamException , SecurityException , IllegalArgumentException {
31+ static synchronized void setup ()
32+ throws StreamException , SecurityException , IllegalArgumentException {
3133 // failOnUnknownProperties();
3234 setProperties ();
33- cleanChannels ();
34- cleanChannelTypes ();
35- cleanBlocklists ();
36- cleanCommands ();
37- cleanUsers ();
35+ if (!environmentInitialized ) {
36+ cleanChannels ();
37+ cleanChannelTypes ();
38+ cleanBlocklists ();
39+ cleanCommands ();
40+ cleanUsers ();
41+ environmentInitialized = true ;
42+ }
3843 upsertUsers ();
3944 createTestChannel ();
4045 createTestMessage ();
@@ -74,6 +79,11 @@ private static void cleanChannels() throws StreamException {
7479 // wait for the channels to delete
7580 Assertions .assertDoesNotThrow (() -> java .lang .Thread .sleep (500 ));
7681 }
82+
83+ waitFor (
84+ () -> Assertions .assertDoesNotThrow (() -> Channel .list ().request ().getChannels ().isEmpty ()),
85+ 1000L ,
86+ 60000L );
7787 }
7888 }
7989
@@ -111,6 +121,11 @@ private static void cleanUsers() throws StreamException {
111121 // wait for the channels to delete
112122 Assertions .assertDoesNotThrow (() -> java .lang .Thread .sleep (500 ));
113123 }
124+
125+ waitFor (
126+ () -> Assertions .assertDoesNotThrow (() -> User .list ().request ().getUsers ().isEmpty ()),
127+ 1000L ,
128+ 60000L );
114129 }
115130 }
116131
@@ -165,12 +180,32 @@ private static void cleanCommands() throws StreamException {
165180 }
166181
167182 private static void createTestMessage () throws StreamException {
168- testMessage = sendTestMessage ();
183+ waitFor (
184+ () -> {
185+ try {
186+ testMessage = sendTestMessage ();
187+ return testMessage != null ;
188+ } catch (StreamException e ) {
189+ return false ;
190+ }
191+ },
192+ 1000L ,
193+ 60000L );
169194 }
170195
171196 private static void createTestChannel () throws StreamException {
172- testChannelGetResponse = createRandomChannel ();
173- testChannel = testChannelGetResponse .getChannel ();
197+ waitFor (
198+ () -> {
199+ try {
200+ testChannelGetResponse = createRandomChannel ();
201+ testChannel = testChannelGetResponse .getChannel ();
202+ return testChannel != null ;
203+ } catch (StreamException e ) {
204+ return false ;
205+ }
206+ },
207+ 1000L ,
208+ 60000L );
174209 }
175210
176211 static void upsertUsers () throws StreamException {
@@ -199,6 +234,20 @@ static void upsertUsers() throws StreamException {
199234 UserUpsertRequest usersUpsertRequest = User .upsert ();
200235 testUsersRequestObjects .forEach (user -> usersUpsertRequest .user (user ));
201236 usersUpsertRequest .request ();
237+ waitFor (
238+ () -> {
239+ var existingUsers = Assertions .assertDoesNotThrow (() -> User .list ().request ().getUsers ());
240+ return testUsersRequestObjects .stream ()
241+ .allMatch (
242+ expectedUser ->
243+ existingUsers .stream ()
244+ .anyMatch (
245+ existingUser ->
246+ expectedUser .getId ().equals (existingUser .getId ())
247+ && existingUser .getDeletedAt () == null ));
248+ },
249+ 1000L ,
250+ 60000L );
202251 }
203252
204253 static void setProperties () {
@@ -255,14 +304,23 @@ protected static void waitFor(Supplier<Boolean> predicate) {
255304
256305 protected static void waitFor (Supplier <Boolean > predicate , Long askInterval , Long timeout ) {
257306 var start = System .currentTimeMillis ();
307+ Throwable lastError = null ;
258308
259309 while (true ) {
260310 if (timeout < (System .currentTimeMillis () - start )) {
311+ if (lastError != null ) {
312+ Assertions .fail (lastError );
313+ }
261314 Assertions .fail (new TimeoutException ());
262315 }
263316
264- if (Assertions .assertDoesNotThrow (predicate ::get )) {
265- return ;
317+ try {
318+ if (predicate .get ()) {
319+ return ;
320+ }
321+ lastError = null ;
322+ } catch (Throwable t ) {
323+ lastError = t ;
266324 }
267325
268326 Assertions .assertDoesNotThrow (() -> java .lang .Thread .sleep (askInterval ));
@@ -273,7 +331,7 @@ protected static void waitForTaskCompletion(String taskId) {
273331 var start = System .currentTimeMillis ();
274332 TaskStatusGetResponse lastResponse = null ;
275333 var askInterval = 500L ;
276- var timeout = 15000L ;
334+ var timeout = 120000L ;
277335
278336 System .out .printf ("Waiting for task %s to complete...\n " , taskId );
279337
@@ -291,17 +349,53 @@ protected static void waitForTaskCompletion(String taskId) {
291349 lastResponse = Assertions .assertDoesNotThrow (() -> TaskStatus .get (taskId ).request ());
292350 var status = lastResponse .getStatus ();
293351 System .out .printf ("Task %s status=%s result=%s\n " , taskId , status , lastResponse .getResult ());
294- if ("completed" .equals (status ) || "ok" .equals (status )) {
295- return ;
296- }
297352 if ("failed" .equals (status ) || "error" .equals (status )) {
298353 Assertions .fail (
299354 String .format (
300355 "Task %s failed with status=%s result=%s" ,
301356 taskId , status , lastResponse .getResult ()));
302357 }
303358
359+ if (isTaskResultFailed (lastResponse )) {
360+ Assertions .fail (
361+ String .format (
362+ "Task %s failed with status=%s result=%s" ,
363+ taskId , status , lastResponse .getResult ()));
364+ }
365+
366+ if (("completed" .equals (status ) || "ok" .equals (status )) && isTaskResultTerminal (lastResponse )) {
367+ return ;
368+ }
369+
304370 Assertions .assertDoesNotThrow (() -> java .lang .Thread .sleep (askInterval ));
305371 }
306372 }
373+
374+ private static boolean isTaskResultTerminal (TaskStatusGetResponse response ) {
375+ if (response == null || response .getResult () == null || response .getResult ().isEmpty ()) {
376+ return true ;
377+ }
378+
379+ var resultStatus = response .getResult ().get ("status" );
380+ if (!(resultStatus instanceof String )) {
381+ return true ;
382+ }
383+
384+ var normalizedStatus = ((String ) resultStatus ).toLowerCase ();
385+ return !"started" .equals (normalizedStatus ) && !"pending" .equals (normalizedStatus );
386+ }
387+
388+ private static boolean isTaskResultFailed (TaskStatusGetResponse response ) {
389+ if (response == null || response .getResult () == null ) {
390+ return false ;
391+ }
392+
393+ var resultStatus = response .getResult ().get ("status" );
394+ if (!(resultStatus instanceof String )) {
395+ return false ;
396+ }
397+
398+ var normalizedStatus = ((String ) resultStatus ).toLowerCase ();
399+ return "failed" .equals (normalizedStatus ) || "error" .equals (normalizedStatus );
400+ }
307401}
0 commit comments