Skip to content

Commit 95d8481

Browse files
authored
Merge pull request #37798: Fix flaky TextIOWriteTest by loosening the shard count
2 parents 78061b8 + 4d9e7fc commit 95d8481

1 file changed

Lines changed: 57 additions & 93 deletions

File tree

sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java

Lines changed: 57 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.hamcrest.MatcherAssert.assertThat;
2727
import static org.hamcrest.Matchers.containsInAnyOrder;
2828
import static org.junit.Assert.assertEquals;
29+
import static org.junit.Assert.assertFalse;
2930
import static org.junit.Assert.assertTrue;
3031
import static org.junit.Assume.assumeFalse;
3132

@@ -47,7 +48,6 @@
4748
import java.util.Collections;
4849
import java.util.List;
4950
import java.util.stream.Collectors;
50-
import java.util.stream.StreamSupport;
5151
import org.apache.beam.sdk.coders.Coder;
5252
import org.apache.beam.sdk.coders.CoderException;
5353
import org.apache.beam.sdk.coders.CustomCoder;
@@ -197,23 +197,17 @@ private void testDynamicDestinations(boolean customType) throws Exception {
197197
p.run();
198198

199199
assertOutputFiles(
200-
Iterables.toArray(Iterables.filter(elements, new StartsWith("a")), String.class),
201-
null,
202-
null,
200+
Iterables.filter(elements, new StartsWith("a")),
203201
0,
204202
baseDir.resolve("file_a.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
205203
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
206204
assertOutputFiles(
207-
Iterables.toArray(Iterables.filter(elements, new StartsWith("b")), String.class),
208-
null,
209-
null,
205+
Iterables.filter(elements, new StartsWith("b")),
210206
0,
211207
baseDir.resolve("file_b.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
212208
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
213209
assertOutputFiles(
214-
Iterables.toArray(Iterables.filter(elements, new StartsWith("c")), String.class),
215-
null,
216-
null,
210+
Iterables.filter(elements, new StartsWith("c")),
217211
0,
218212
baseDir.resolve("file_c.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
219213
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
@@ -323,95 +317,71 @@ public void testDynamicDefaultFilenamePolicy() throws Exception {
323317
.withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
324318
p.run();
325319

326-
String[] aElements =
327-
Iterables.toArray(
328-
StreamSupport.stream(
329-
elements.stream()
330-
.filter(
331-
Predicates.compose(new StartsWith("a"), new ExtractWriteDestination())
332-
::apply)
333-
.collect(Collectors.toList())
334-
.spliterator(),
335-
false)
336-
.map(Functions.toStringFunction()::apply)
337-
.collect(Collectors.toList()),
338-
String.class);
339-
String[] bElements =
340-
Iterables.toArray(
341-
StreamSupport.stream(
342-
elements.stream()
343-
.filter(
344-
Predicates.compose(new StartsWith("b"), new ExtractWriteDestination())
345-
::apply)
346-
.collect(Collectors.toList())
347-
.spliterator(),
348-
false)
349-
.map(Functions.toStringFunction()::apply)
350-
.collect(Collectors.toList()),
351-
String.class);
352-
String[] cElements =
353-
Iterables.toArray(
354-
StreamSupport.stream(
355-
elements.stream()
356-
.filter(
357-
Predicates.compose(new StartsWith("c"), new ExtractWriteDestination())
358-
::apply)
359-
.collect(Collectors.toList())
360-
.spliterator(),
361-
false)
362-
.map(Functions.toStringFunction()::apply)
363-
.collect(Collectors.toList()),
364-
String.class);
320+
Iterable<String> aElements =
321+
elements.stream()
322+
.filter(Predicates.compose(new StartsWith("a"), new ExtractWriteDestination())::apply)
323+
.collect(Collectors.toList())
324+
.stream()
325+
.map(Functions.toStringFunction())
326+
.collect(Collectors.toList());
327+
Iterable<String> bElements =
328+
elements.stream()
329+
.filter(Predicates.compose(new StartsWith("b"), new ExtractWriteDestination())::apply)
330+
.collect(Collectors.toList())
331+
.stream()
332+
.map(Functions.toStringFunction())
333+
.collect(Collectors.toList());
334+
Iterable<String> cElements =
335+
elements.stream()
336+
.filter(Predicates.compose(new StartsWith("c"), new ExtractWriteDestination())::apply)
337+
.collect(Collectors.toList())
338+
.stream()
339+
.map(Functions.toStringFunction())
340+
.collect(Collectors.toList());
365341
assertOutputFiles(
366342
aElements,
367-
null,
368-
null,
369343
0,
370344
baseDir.resolve("file_a.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
371345
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
372346
assertOutputFiles(
373347
bElements,
374-
null,
375-
null,
376348
0,
377349
baseDir.resolve("file_b.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
378350
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
379351
assertOutputFiles(
380352
cElements,
381-
null,
382-
null,
383353
0,
384354
baseDir.resolve("file_c.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
385355
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
386356
}
387357

388-
private void runTestWrite(String[] elems) throws Exception {
358+
private void runTestWrite(Iterable<String> elems) throws Exception {
389359
runTestWrite(elems, null, null, 1);
390360
}
391361

392-
private void runTestWrite(String[] elems, int numShards) throws Exception {
362+
private void runTestWrite(Iterable<String> elems, int numShards) throws Exception {
393363
runTestWrite(elems, null, null, numShards);
394364
}
395365

396-
private void runTestWrite(String[] elems, String header, String footer) throws Exception {
366+
private void runTestWrite(Iterable<String> elems, String header, String footer) throws Exception {
397367
runTestWrite(elems, header, footer, 1);
398368
}
399369

400-
private void runTestWrite(String[] elems, String header, String footer, int numShards)
370+
private void runTestWrite(Iterable<String> elems, String header, String footer, int numShards)
401371
throws Exception {
402372
runTestWrite(elems, header, footer, numShards, false);
403373
}
404374

405375
private void runTestWrite(
406-
String[] elems, String header, String footer, int numShards, boolean skipIfEmpty)
376+
Iterable<String> elems, String header, String footer, int numShards, boolean skipIfEmpty)
407377
throws Exception {
408378
String outputName = "file.txt";
409379
Path baseDir = Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite");
410380
ResourceId baseFilename =
411381
FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString());
412382

413383
PCollection<String> input =
414-
p.apply("CreateInput", Create.of(Arrays.asList(elems)).withCoder(StringUtf8Coder.of()));
384+
p.apply("CreateInput", Create.of(elems).withCoder(StringUtf8Coder.of()));
415385

416386
TextIO.TypedWrite<String, Void> write =
417387
TextIO.write().to(baseFilename).withHeader(header).withFooter(footer).withOutputFilenames();
@@ -441,27 +411,22 @@ private void runTestWrite(
441411
}
442412

443413
private static void assertOutputFiles(
444-
String[] elems,
445-
final String header,
446-
final String footer,
447-
int numShards,
448-
ResourceId outputPrefix,
449-
String shardNameTemplate)
414+
Iterable<String> elems, int numShards, ResourceId outputPrefix, String shardNameTemplate)
450415
throws Exception {
451-
assertOutputFiles(elems, header, footer, numShards, outputPrefix, shardNameTemplate, false);
416+
assertOutputFiles(elems, null, null, numShards, outputPrefix, shardNameTemplate, false);
452417
}
453418

454419
private static void assertOutputFiles(
455-
String[] elems,
456-
final String header,
457-
final String footer,
420+
Iterable<String> elems,
421+
final @Nullable String header,
422+
final @Nullable String footer,
458423
int numShards,
459424
ResourceId outputPrefix,
460425
String shardNameTemplate,
461426
boolean skipIfEmpty)
462427
throws Exception {
463428
List<File> expectedFiles = new ArrayList<>();
464-
if (skipIfEmpty && elems.length == 0) {
429+
if (skipIfEmpty && Iterables.isEmpty(elems)) {
465430
String pattern = outputPrefix.toString() + "*";
466431
MatchResult matches =
467432
Iterables.getOnlyElement(FileSystems.match(Collections.singletonList(pattern)));
@@ -489,7 +454,7 @@ private static void assertOutputFiles(
489454
actual.add(currentFile);
490455
}
491456

492-
List<String> expectedElements = new ArrayList<>(elems.length);
457+
List<String> expectedElements = new ArrayList<>();
493458
for (String elem : elems) {
494459
byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem);
495460
String line = new String(encodedElem, StandardCharsets.UTF_8);
@@ -551,49 +516,49 @@ private static Predicate<List<String>> haveProperHeaderAndFooter(
551516
@Test
552517
@Category(NeedsRunner.class)
553518
public void testWriteStrings() throws Exception {
554-
runTestWrite(LINES.toArray(new String[0]));
519+
runTestWrite(LINES);
555520
}
556521

557522
@Test
558523
@Category(NeedsRunner.class)
559524
public void testWriteEmptyStringsNoSharding() throws Exception {
560-
runTestWrite(NO_LINES.toArray(new String[0]), 0);
525+
runTestWrite(NO_LINES, 0);
561526
}
562527

563528
@Test
564529
@Category(NeedsRunner.class)
565530
public void testWriteEmptyStrings() throws Exception {
566-
runTestWrite(NO_LINES.toArray(new String[0]));
531+
runTestWrite(NO_LINES);
567532
}
568533

569534
@Test
570535
@Category(NeedsRunner.class)
571536
public void testWriteEmptyStringsSkipIfEmpty() throws Exception {
572-
runTestWrite(NO_LINES.toArray(new String[0]), null, null, 0, true);
537+
runTestWrite(NO_LINES, null, null, 0, true);
573538
}
574539

575540
@Test
576541
@Category(NeedsRunner.class)
577542
public void testShardedWrite() throws Exception {
578-
runTestWrite(LINES.toArray(new String[0]), 5);
543+
runTestWrite(LINES, 5);
579544
}
580545

581546
@Test
582547
@Category(NeedsRunner.class)
583548
public void testWriteWithHeader() throws Exception {
584-
runTestWrite(LINES.toArray(new String[0]), MY_HEADER, null);
549+
runTestWrite(LINES, MY_HEADER, null);
585550
}
586551

587552
@Test
588553
@Category(NeedsRunner.class)
589554
public void testWriteWithFooter() throws Exception {
590-
runTestWrite(LINES.toArray(new String[0]), null, MY_FOOTER);
555+
runTestWrite(LINES, null, MY_FOOTER);
591556
}
592557

593558
@Test
594559
@Category(NeedsRunner.class)
595560
public void testWriteWithHeaderAndFooter() throws Exception {
596-
runTestWrite(LINES.toArray(new String[0]), MY_HEADER, MY_FOOTER);
561+
runTestWrite(LINES, MY_HEADER, MY_FOOTER);
597562
}
598563

599564
@Test
@@ -605,8 +570,7 @@ public void testWriteWithWritableByteChannelFactory() throws Exception {
605570
FileSystems.matchNewResource(
606571
Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite").toString(), true);
607572

608-
PCollection<String> input =
609-
p.apply(Create.of(Arrays.asList(LINES2.toArray(new String[0]))).withCoder(coder));
573+
PCollection<String> input = p.apply(Create.of(LINES2).withCoder(coder));
610574

611575
final WritableByteChannelFactory writableByteChannelFactory =
612576
new DrunkWritableByteChannelFactory();
@@ -625,15 +589,13 @@ public void testWriteWithWritableByteChannelFactory() throws Exception {
625589

626590
p.run();
627591

628-
final List<String> drunkElems = new ArrayList<>(LINES2.toArray(new String[0]).length * 2 + 2);
629-
for (String elem : LINES2.toArray(new String[0])) {
630-
drunkElems.add(elem);
631-
drunkElems.add(elem);
592+
List<String> expectedElems = new ArrayList<>(2 * LINES2.size());
593+
for (String elem : LINES2) {
594+
expectedElems.add(elem);
595+
expectedElems.add(elem);
632596
}
633597
assertOutputFiles(
634-
drunkElems.toArray(new String[0]),
635-
null,
636-
null,
598+
expectedElems,
637599
1,
638600
baseDir.resolve(
639601
outputName + writableByteChannelFactory.getSuggestedFilenameSuffix(),
@@ -716,7 +678,7 @@ public void testWriteUnboundedWithCustomBatchParameters() throws Exception {
716678
FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString());
717679

718680
PCollection<String> input =
719-
p.apply(Create.of(Arrays.asList(LINES2.toArray(new String[0]))).withCoder(coder))
681+
p.apply(Create.of(LINES2).withCoder(coder))
720682
.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED)
721683
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))));
722684

@@ -738,11 +700,13 @@ public void testWriteUnboundedWithCustomBatchParameters() throws Exception {
738700
String pattern = baseFilename.toString() + "*";
739701
List<MatchResult> matches = FileSystems.match(Collections.singletonList(pattern));
740702
List<Metadata> found = new ArrayList<>(Iterables.getOnlyElement(matches).metadata());
741-
assertEquals(3, found.size());
703+
// Sharding is random, so several elements may land in the same shard; expect between 1 and 3 files.
704+
assertFalse(found.isEmpty());
705+
assertTrue(found.size() <= 3);
742706

743707
// Now assert file contents irrespective of exact shard indices.
744708
assertOutputFiles(
745-
LINES2.toArray(new String[0]),
709+
LINES2,
746710
null,
747711
null,
748712
0, // match all files by prefix

0 commit comments

Comments
 (0)