Skip to content

Commit 4d9e7fc

Browse files
committed
Fix flaky TextIOWriteTest by loosening the shard count. Records may end up in the same shard as other records as it is random. Simplify the test to use Iterables instead of arrays.
1 parent 927ee2c commit 4d9e7fc

1 file changed

Lines changed: 57 additions & 93 deletions

File tree

sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java

Lines changed: 57 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.hamcrest.MatcherAssert.assertThat;
2727
import static org.hamcrest.Matchers.containsInAnyOrder;
2828
import static org.junit.Assert.assertEquals;
29+
import static org.junit.Assert.assertFalse;
2930
import static org.junit.Assert.assertTrue;
3031
import static org.junit.Assume.assumeFalse;
3132

@@ -47,7 +48,6 @@
4748
import java.util.Collections;
4849
import java.util.List;
4950
import java.util.stream.Collectors;
50-
import java.util.stream.StreamSupport;
5151
import org.apache.beam.sdk.coders.Coder;
5252
import org.apache.beam.sdk.coders.CoderException;
5353
import org.apache.beam.sdk.coders.CustomCoder;
@@ -197,23 +197,17 @@ private void testDynamicDestinations(boolean customType) throws Exception {
197197
p.run();
198198

199199
assertOutputFiles(
200-
Iterables.toArray(Iterables.filter(elements, new StartsWith("a")), String.class),
201-
null,
202-
null,
200+
Iterables.filter(elements, new StartsWith("a")),
203201
0,
204202
baseDir.resolve("file_a.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
205203
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
206204
assertOutputFiles(
207-
Iterables.toArray(Iterables.filter(elements, new StartsWith("b")), String.class),
208-
null,
209-
null,
205+
Iterables.filter(elements, new StartsWith("b")),
210206
0,
211207
baseDir.resolve("file_b.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
212208
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
213209
assertOutputFiles(
214-
Iterables.toArray(Iterables.filter(elements, new StartsWith("c")), String.class),
215-
null,
216-
null,
210+
Iterables.filter(elements, new StartsWith("c")),
217211
0,
218212
baseDir.resolve("file_c.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
219213
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
@@ -323,95 +317,71 @@ public void testDynamicDefaultFilenamePolicy() throws Exception {
323317
.withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
324318
p.run();
325319

326-
String[] aElements =
327-
Iterables.toArray(
328-
StreamSupport.stream(
329-
elements.stream()
330-
.filter(
331-
Predicates.compose(new StartsWith("a"), new ExtractWriteDestination())
332-
::apply)
333-
.collect(Collectors.toList())
334-
.spliterator(),
335-
false)
336-
.map(Functions.toStringFunction()::apply)
337-
.collect(Collectors.toList()),
338-
String.class);
339-
String[] bElements =
340-
Iterables.toArray(
341-
StreamSupport.stream(
342-
elements.stream()
343-
.filter(
344-
Predicates.compose(new StartsWith("b"), new ExtractWriteDestination())
345-
::apply)
346-
.collect(Collectors.toList())
347-
.spliterator(),
348-
false)
349-
.map(Functions.toStringFunction()::apply)
350-
.collect(Collectors.toList()),
351-
String.class);
352-
String[] cElements =
353-
Iterables.toArray(
354-
StreamSupport.stream(
355-
elements.stream()
356-
.filter(
357-
Predicates.compose(new StartsWith("c"), new ExtractWriteDestination())
358-
::apply)
359-
.collect(Collectors.toList())
360-
.spliterator(),
361-
false)
362-
.map(Functions.toStringFunction()::apply)
363-
.collect(Collectors.toList()),
364-
String.class);
320+
Iterable<String> aElements =
321+
elements.stream()
322+
.filter(Predicates.compose(new StartsWith("a"), new ExtractWriteDestination())::apply)
323+
.collect(Collectors.toList())
324+
.stream()
325+
.map(Functions.toStringFunction())
326+
.collect(Collectors.toList());
327+
Iterable<String> bElements =
328+
elements.stream()
329+
.filter(Predicates.compose(new StartsWith("b"), new ExtractWriteDestination())::apply)
330+
.collect(Collectors.toList())
331+
.stream()
332+
.map(Functions.toStringFunction())
333+
.collect(Collectors.toList());
334+
Iterable<String> cElements =
335+
elements.stream()
336+
.filter(Predicates.compose(new StartsWith("c"), new ExtractWriteDestination())::apply)
337+
.collect(Collectors.toList())
338+
.stream()
339+
.map(Functions.toStringFunction())
340+
.collect(Collectors.toList());
365341
assertOutputFiles(
366342
aElements,
367-
null,
368-
null,
369343
0,
370344
baseDir.resolve("file_a.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
371345
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
372346
assertOutputFiles(
373347
bElements,
374-
null,
375-
null,
376348
0,
377349
baseDir.resolve("file_b.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
378350
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
379351
assertOutputFiles(
380352
cElements,
381-
null,
382-
null,
383353
0,
384354
baseDir.resolve("file_c.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
385355
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
386356
}
387357

388-
private void runTestWrite(String[] elems) throws Exception {
358+
private void runTestWrite(Iterable<String> elems) throws Exception {
389359
runTestWrite(elems, null, null, 1);
390360
}
391361

392-
private void runTestWrite(String[] elems, int numShards) throws Exception {
362+
private void runTestWrite(Iterable<String> elems, int numShards) throws Exception {
393363
runTestWrite(elems, null, null, numShards);
394364
}
395365

396-
private void runTestWrite(String[] elems, String header, String footer) throws Exception {
366+
private void runTestWrite(Iterable<String> elems, String header, String footer) throws Exception {
397367
runTestWrite(elems, header, footer, 1);
398368
}
399369

400-
private void runTestWrite(String[] elems, String header, String footer, int numShards)
370+
private void runTestWrite(Iterable<String> elems, String header, String footer, int numShards)
401371
throws Exception {
402372
runTestWrite(elems, header, footer, numShards, false);
403373
}
404374

405375
private void runTestWrite(
406-
String[] elems, String header, String footer, int numShards, boolean skipIfEmpty)
376+
Iterable<String> elems, String header, String footer, int numShards, boolean skipIfEmpty)
407377
throws Exception {
408378
String outputName = "file.txt";
409379
Path baseDir = Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite");
410380
ResourceId baseFilename =
411381
FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString());
412382

413383
PCollection<String> input =
414-
p.apply("CreateInput", Create.of(Arrays.asList(elems)).withCoder(StringUtf8Coder.of()));
384+
p.apply("CreateInput", Create.of(elems).withCoder(StringUtf8Coder.of()));
415385

416386
TextIO.TypedWrite<String, Void> write =
417387
TextIO.write().to(baseFilename).withHeader(header).withFooter(footer).withOutputFilenames();
@@ -441,27 +411,22 @@ private void runTestWrite(
441411
}
442412

443413
private static void assertOutputFiles(
444-
String[] elems,
445-
final String header,
446-
final String footer,
447-
int numShards,
448-
ResourceId outputPrefix,
449-
String shardNameTemplate)
414+
Iterable<String> elems, int numShards, ResourceId outputPrefix, String shardNameTemplate)
450415
throws Exception {
451-
assertOutputFiles(elems, header, footer, numShards, outputPrefix, shardNameTemplate, false);
416+
assertOutputFiles(elems, null, null, numShards, outputPrefix, shardNameTemplate, false);
452417
}
453418

454419
private static void assertOutputFiles(
455-
String[] elems,
456-
final String header,
457-
final String footer,
420+
Iterable<String> elems,
421+
final @Nullable String header,
422+
final @Nullable String footer,
458423
int numShards,
459424
ResourceId outputPrefix,
460425
String shardNameTemplate,
461426
boolean skipIfEmpty)
462427
throws Exception {
463428
List<File> expectedFiles = new ArrayList<>();
464-
if (skipIfEmpty && elems.length == 0) {
429+
if (skipIfEmpty && Iterables.isEmpty(elems)) {
465430
String pattern = outputPrefix.toString() + "*";
466431
MatchResult matches =
467432
Iterables.getOnlyElement(FileSystems.match(Collections.singletonList(pattern)));
@@ -489,7 +454,7 @@ private static void assertOutputFiles(
489454
actual.add(currentFile);
490455
}
491456

492-
List<String> expectedElements = new ArrayList<>(elems.length);
457+
List<String> expectedElements = new ArrayList<>();
493458
for (String elem : elems) {
494459
byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem);
495460
String line = new String(encodedElem, StandardCharsets.UTF_8);
@@ -551,49 +516,49 @@ private static Predicate<List<String>> haveProperHeaderAndFooter(
551516
@Test
552517
@Category(NeedsRunner.class)
553518
public void testWriteStrings() throws Exception {
554-
runTestWrite(LINES.toArray(new String[0]));
519+
runTestWrite(LINES);
555520
}
556521

557522
@Test
558523
@Category(NeedsRunner.class)
559524
public void testWriteEmptyStringsNoSharding() throws Exception {
560-
runTestWrite(NO_LINES.toArray(new String[0]), 0);
525+
runTestWrite(NO_LINES, 0);
561526
}
562527

563528
@Test
564529
@Category(NeedsRunner.class)
565530
public void testWriteEmptyStrings() throws Exception {
566-
runTestWrite(NO_LINES.toArray(new String[0]));
531+
runTestWrite(NO_LINES);
567532
}
568533

569534
@Test
570535
@Category(NeedsRunner.class)
571536
public void testWriteEmptyStringsSkipIfEmpty() throws Exception {
572-
runTestWrite(NO_LINES.toArray(new String[0]), null, null, 0, true);
537+
runTestWrite(NO_LINES, null, null, 0, true);
573538
}
574539

575540
@Test
576541
@Category(NeedsRunner.class)
577542
public void testShardedWrite() throws Exception {
578-
runTestWrite(LINES.toArray(new String[0]), 5);
543+
runTestWrite(LINES, 5);
579544
}
580545

581546
@Test
582547
@Category(NeedsRunner.class)
583548
public void testWriteWithHeader() throws Exception {
584-
runTestWrite(LINES.toArray(new String[0]), MY_HEADER, null);
549+
runTestWrite(LINES, MY_HEADER, null);
585550
}
586551

587552
@Test
588553
@Category(NeedsRunner.class)
589554
public void testWriteWithFooter() throws Exception {
590-
runTestWrite(LINES.toArray(new String[0]), null, MY_FOOTER);
555+
runTestWrite(LINES, null, MY_FOOTER);
591556
}
592557

593558
@Test
594559
@Category(NeedsRunner.class)
595560
public void testWriteWithHeaderAndFooter() throws Exception {
596-
runTestWrite(LINES.toArray(new String[0]), MY_HEADER, MY_FOOTER);
561+
runTestWrite(LINES, MY_HEADER, MY_FOOTER);
597562
}
598563

599564
@Test
@@ -605,8 +570,7 @@ public void testWriteWithWritableByteChannelFactory() throws Exception {
605570
FileSystems.matchNewResource(
606571
Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite").toString(), true);
607572

608-
PCollection<String> input =
609-
p.apply(Create.of(Arrays.asList(LINES2.toArray(new String[0]))).withCoder(coder));
573+
PCollection<String> input = p.apply(Create.of(LINES2).withCoder(coder));
610574

611575
final WritableByteChannelFactory writableByteChannelFactory =
612576
new DrunkWritableByteChannelFactory();
@@ -625,15 +589,13 @@ public void testWriteWithWritableByteChannelFactory() throws Exception {
625589

626590
p.run();
627591

628-
final List<String> drunkElems = new ArrayList<>(LINES2.toArray(new String[0]).length * 2 + 2);
629-
for (String elem : LINES2.toArray(new String[0])) {
630-
drunkElems.add(elem);
631-
drunkElems.add(elem);
592+
List<String> expectedElems = new ArrayList<>(2 * LINES2.size());
593+
for (String elem : LINES2) {
594+
expectedElems.add(elem);
595+
expectedElems.add(elem);
632596
}
633597
assertOutputFiles(
634-
drunkElems.toArray(new String[0]),
635-
null,
636-
null,
598+
expectedElems,
637599
1,
638600
baseDir.resolve(
639601
outputName + writableByteChannelFactory.getSuggestedFilenameSuffix(),
@@ -716,7 +678,7 @@ public void testWriteUnboundedWithCustomBatchParameters() throws Exception {
716678
FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString());
717679

718680
PCollection<String> input =
719-
p.apply(Create.of(Arrays.asList(LINES2.toArray(new String[0]))).withCoder(coder))
681+
p.apply(Create.of(LINES2).withCoder(coder))
720682
.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED)
721683
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))));
722684

@@ -738,11 +700,13 @@ public void testWriteUnboundedWithCustomBatchParameters() throws Exception {
738700
String pattern = baseFilename.toString() + "*";
739701
List<MatchResult> matches = FileSystems.match(Collections.singletonList(pattern));
740702
List<Metadata> found = new ArrayList<>(Iterables.getOnlyElement(matches).metadata());
741-
assertEquals(3, found.size());
703+
// As sharding is random, the elements may end up in the same shards.
704+
assertFalse(found.isEmpty());
705+
assertTrue(found.size() <= 3);
742706

743707
// Now assert file contents irrespective of exact shard indices.
744708
assertOutputFiles(
745-
LINES2.toArray(new String[0]),
709+
LINES2,
746710
null,
747711
null,
748712
0, // match all files by prefix

0 commit comments

Comments
 (0)