Description
I may be missing something obvious, but I can't get PAssert and TestPipeline to work with CoGroupByKey -- without it, everything works fine.
Here is a reference test file that reproduces the issue I'm facing. I tested with both Beam SDK 2.4 and 2.5.
(For the record this was posted on StackOverflow before.)
For comparison, testWorking works as intended, and testBroken has an additional step like this:
// The following lines cause the issue.
PCollectionTuple tuple = KeyedPCollectionTuple
    .of(inTag1, pc1.apply("String to KV<String, String>", ParDo.of(new String2KV())))
    .and(inTag2, pc2)
    .apply("CoGroupByKey", CoGroupByKey.<String>create())
    .apply("Some Merge DoFn", ParDo.of(new MergeDoFn(inTag1, inTag2, outTag2))
        .withOutputTags(outTag1, TupleTagList.of(outTag2)));
The error I get can be found after the code below.
Has anyone run into a similar issue with TestPipeline before? I haven't tested extensively yet, but I couldn't find any information on using CoGroupByKey and TestPipeline together. The same code works fine for my team in production; we wanted to add a few unit tests using TestPipeline and PAssert, which is how we ended up with this issue.
Any help will be appreciated!
NOTE: Resolved by adding 'implements Serializable' to the main test class, as shown below. Without it, the pipeline throws the exception listed at the end. I'll leave the original contents for reference.
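The root cause is visible in the "Caused by" line of the stack trace: the anonymous `new TupleTag<String>() {}` subclasses hold an implicit reference to their enclosing instance, so serializing a tag drags the whole test class with it; if that class is not Serializable, serialization fails. Below is a minimal plain-Java sketch of that mechanism (no Beam dependency; all class names here are illustrative stand-ins, not Beam types):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class OuterCaptureDemo {

    // Stand-in for TupleTag: a serializable base class that gets
    // subclassed anonymously, as in the Beam test.
    static class Tag implements Serializable {}

    // NOT Serializable -- plays the role of the original test class.
    static class NonSerializableOuter {
        Tag makeTag() {
            return new Tag() {}; // anonymous subclass captures `this` (the outer instance)
        }
    }

    // Serializable -- corresponds to the fix applied in the resolved test.
    static class SerializableOuter implements Serializable {
        Tag makeTag() {
            return new Tag() {};
        }
    }

    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // the captured outer instance was not serializable
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("non-serializable outer: " + canSerialize(new NonSerializableOuter().makeTag()));
        System.out.println("serializable outer: " + canSerialize(new SerializableOuter().makeTag()));
    }
}
```

An alternative to making the test class Serializable would be to create the tags in a static context (e.g. static fields), since anonymous classes declared in static scope capture no enclosing instance.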
public class ReferenceTest implements Serializable {
  @Rule
  public final transient TestPipeline pipe1 = TestPipeline.create();
  @Rule
  public final transient TestPipeline pipe2 = TestPipeline.create();

  public static class String2KV extends DoFn<String, KV<String, String>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      // "key1:value1" -> ["key1", "value1"]
      String[] tokens = c.element().split(":");
      c.output(KV.of(tokens[0], tokens[1]));
    }
  }

  public static class MergeDoFn extends DoFn<KV<String, CoGbkResult>, String> {
    final TupleTag<String> inTag1;
    final TupleTag<String> inTag2;
    final TupleTag<String> outTag2;

    public MergeDoFn(TupleTag<String> inTag1, TupleTag<String> inTag2, TupleTag<String> outTag2) {
      this.inTag1 = inTag1;
      this.inTag2 = inTag2;
      this.outTag2 = outTag2;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      String val1 = c.element().getValue().getOnly(inTag1);
      String val2 = c.element().getValue().getOnly(inTag2);
      // outTag1 = main output
      // outTag2 = side output
      c.output(outTag2, val1 + "," + val2);
    }
  }

  @Test
  public void testWorking() {
    // Create two PCs for test.
    PCollection<String> pc1 = pipe1.apply("create pc1",
        Create.<String>of("key1:value1").withCoder(StringUtf8Coder.of()));
    PCollection<KV<String, String>> pc2 = pipe1.apply("create pc2",
        Create.<KV<String, String>>of(KV.of("key1", "key1:value2"))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));

    // Sanity check.
    PAssert.that(pc1).containsInAnyOrder("key1:value1");
    PAssert.that(pc2).containsInAnyOrder(KV.of("key1", "key1:value2"));

    pipe1.run();
  }

  // Disabled as of 2018-07-13.
  // https://stackoverflow.com/questions/51334429/beam-java-sdk-2-4-2-5-passert-with-cogroupbykey
  @Test
  public void testBroken() {
    // Create two PCs for test.
    PCollection<String> pc1 = pipe2.apply("create pc1",
        Create.<String>of("key1:value1").withCoder(StringUtf8Coder.of()));
    PCollection<KV<String, String>> pc2 = pipe2.apply("create pc2",
        Create.<KV<String, String>>of(KV.of("key1", "value2"))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));

    // Sanity check.
    PAssert.that(pc1).containsInAnyOrder("key1:value1");
    PAssert.that(pc2).containsInAnyOrder(KV.of("key1", "value2"));

    TupleTag<String> inTag1 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> inTag2 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> outTag1 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };
    TupleTag<String> outTag2 = new TupleTag<String>() {
      private static final long serialVersionUID = 1L;
    };

    // The following lines cause the issue.
    PCollectionTuple tuple = KeyedPCollectionTuple
        .of(inTag1, pc1.apply("String to KV<String, String>", ParDo.of(new String2KV())))
        .and(inTag2, pc2)
        .apply("CoGroupByKey", CoGroupByKey.<String>create())
        .apply("Some Merge DoFn", ParDo.of(new MergeDoFn(inTag1, inTag2, outTag2))
            .withOutputTags(outTag1, TupleTagList.of(outTag2)));

    PAssert.that(tuple.get(outTag1)).empty();
    PAssert.that(tuple.get(outTag2)).containsInAnyOrder("value1,value2");

    pipe2.run();
  }
}
Here's the error:
java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.transforms.join.CoGbkResult$CoGbkResultCoder@217aa4f
	at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:57)
	at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toCustomCoder(CoderTranslation.java:121)
	at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toProto(CoderTranslation.java:85)
	at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:183)
	at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.registerComponents(CoderTranslation.java:107)
	at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toKnownCoder(CoderTranslation.java:91)
	at org.apache.beam.runners.direct.repackaged.runners.core.construction.CoderTranslation.toProto(CoderTranslation.java:83)
	at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerCoder(SdkComponents.java:183)
	at org.apache.beam.runners.direct.repackaged.runners.core.construction.PCollectionTranslation.toProto(PCollectionTranslation.java:36)
	at org.apache.beam.runners.direct.repackaged.runners.core.construction.SdkComponents.registerPCollection(SdkComponents.java:138)
	at org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:173)
	at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:515)
	at org.apache.beam.runners.direct.repackaged.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:525)
	at org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformMatchers$4.matches(PTransformMatchers.java:194)
	at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:278)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
	at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
	at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
	at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:256)
	at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:209)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:173)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:62)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
	at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:353)
	at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:335)
	at exp.moloco.dataflow2.ReferenceTest.testBroken(ReferenceTest.java:110)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:324)
	at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:324)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.io.NotSerializableException: exp.moloco.dataflow2.ReferenceTest
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at java.util.HashMap.internalWriteEntries(HashMap.java:1789)
	at java.util.HashMap.writeObject(HashMap.java:1363)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1128)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
	... 54 more

Process finished with exit code 255