
Discrepancy in behavior of DoFn.process() when yield is combined with return statement, or vice versa #22969

Closed
toransahu opened this issue Aug 31, 2022 · 9 comments
Labels
bug core done & done Issue has been reviewed after it was closed for verification, followups, etc. P1 python


@toransahu
Contributor

toransahu commented Aug 31, 2022

What happened?

Apache Beam SDK Version: 2.40.0
SDK Language: Python
Runner: All (DirectRunner, DataflowRunner, PortableRunner etc.)

TL;DR: If a DoFn.process contains both yield <some value> and return <some iterable> statements, the elements of the iterable in return <some iterable> are not emitted.

The Apache Beam Programming Guide, section 4.2.1.2. Creating a DoFn, states:

Your process method should accept an argument element, which is the input element, and return an iterable with its output values. You can accomplish this by emitting individual elements with yield statements. You can also use a return statement with an iterable, like a list or a generator.

That statement holds when DoFn.process emits elements:

  1. either only with yield <some value>,
  2. or only with return <some iterable>.

If yield and return are combined in the DoFn.process() definition, the behavior no longer matches the documentation.
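A minimal sketch of the Python rule behind this, using plain functions rather than a DoFn (the names `returns_list` and `mixed` are illustrative): a single `yield` anywhere in a function body makes it a generator function, so calling it returns a generator object, and a `return <value>` inside it only ends iteration instead of supplying outputs.

```python
import inspect


def returns_list(element):
    # No `yield` anywhere: an ordinary function that returns a list.
    return [element]


def mixed(element):
    # One `yield` anywhere in the body turns this into a generator function.
    if element is None:
        return ["default"]  # only ends the generator; the list is never iterated
    yield element


print(inspect.isgeneratorfunction(returns_list))  # False
print(inspect.isgeneratorfunction(mixed))  # True
print(list(mixed(None)))  # []
print(list(mixed(3)))  # [3]
```

Iterating `mixed(None)` produces nothing, which is the same silent drop seen in the pipeline output for the mixed cases.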

See this example pipeline:

# dofn_issue.py

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


logger = logging.getLogger(__name__)
logger.setLevel("INFO")


class Pipeline:
    def run(self, args=None):
        parser = argparse.ArgumentParser()
        _, extra_args = parser.parse_known_args(args)
        pipeline_options = PipelineOptions(extra_args, save_main_session=True)
        with beam.Pipeline(options=pipeline_options) as pipeline:
            data_to_process = pipeline | beam.Create(
                [
                    {"key": 1},
                    {"key": 2},
                    {"key": None},
                    {"key": 4},
                ],
            )
            data_to_process | beam.ParDo(SetDefaultValFn1()) | "1" >> beam.ParDo(LogElementsFn(), 1)
            data_to_process | beam.ParDo(SetDefaultValFn2()) | "2" >> beam.ParDo(LogElementsFn(), 2)
            data_to_process | beam.ParDo(SetDefaultValFn3()) | "3" >> beam.ParDo(LogElementsFn(), 3)
            data_to_process | beam.ParDo(SetDefaultValFn4()) | "4" >> beam.ParDo(LogElementsFn(), 4)


# NOT EXPECTED - `return` statement doesn't emit data
class SetDefaultValFn1(beam.DoFn):
    def process(self, element):
        data = element.get("key")
        if not data:
            return [{"key": -9999}]
        yield element


# NOT EXPECTED - `yield` statement only emits data
class SetDefaultValFn2(beam.DoFn):
    def process(self, element):
        data = element.get("key")
        if not data:
            yield {"key": -9999}
            return
        return [element]


# EXPECTED
class SetDefaultValFn3(beam.DoFn):
    def process(self, element):
        data = element.get("key")
        if not data:
            return [{"key": -9999}]
        return [element]


# EXPECTED
class SetDefaultValFn4(beam.DoFn):
    def process(self, element):
        data = element.get("key")
        if not data:
            yield {"key": -9999}
            return
        yield element


class LogElementsFn(beam.DoFn):
    def process(self, element, where):
        logger.info(f"From {where} found {element}")
        yield element


if __name__ == "__main__":
    pipeline = Pipeline()
    pipeline.run()

Actual Output:

$ python dofn_issue.py \
--runner DirectRunner

INFO:__main__:From 1 found {'key': 1}
INFO:__main__:From 1 found {'key': 2}
# NOTE: SetDefaultValFn1 silently skipped element {'key': -9999}
INFO:__main__:From 1 found {'key': 4}
# NOTE: SetDefaultValFn2 silently skipped element {'key': 1}
# NOTE: SetDefaultValFn2 silently skipped element {'key': 2}
INFO:__main__:From 2 found {'key': -9999}
# NOTE: SetDefaultValFn2 silently skipped element {'key': 4}
INFO:__main__:From 3 found {'key': 1}
INFO:__main__:From 3 found {'key': 2}
INFO:__main__:From 3 found {'key': -9999}
INFO:__main__:From 3 found {'key': 4}
INFO:__main__:From 4 found {'key': 1}
INFO:__main__:From 4 found {'key': 2}
INFO:__main__:From 4 found {'key': -9999}
INFO:__main__:From 4 found {'key': 4}

PS: Output is manually ordered for ease of interpretation.

Expected Output:

$ python dofn_issue.py \
--runner DirectRunner

INFO:__main__:From 1 found {'key': 1}
INFO:__main__:From 1 found {'key': 2}
INFO:__main__:From 1 found {'key': -9999}
INFO:__main__:From 1 found {'key': 4}
INFO:__main__:From 2 found {'key': 1}
INFO:__main__:From 2 found {'key': 2}
INFO:__main__:From 2 found {'key': -9999}
INFO:__main__:From 2 found {'key': 4}
INFO:__main__:From 3 found {'key': 1}
INFO:__main__:From 3 found {'key': 2}
INFO:__main__:From 3 found {'key': -9999}
INFO:__main__:From 3 found {'key': 4}
INFO:__main__:From 4 found {'key': 1}
INFO:__main__:From 4 found {'key': 2}
INFO:__main__:From 4 found {'key': -9999}
INFO:__main__:From 4 found {'key': 4}

PS: Output is manually ordered for ease of interpretation.

If DoFn.process is analogous to a Python generator, consider similar code written purely with generators:

# generator_eg.py

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class SetDefaultVal1:
    @staticmethod
    def process(element):
        data = element.get("key")
        if not data:
            return [{"key": -9999}]
        yield element


class SetDefaultVal2:
    @staticmethod
    def process(element):
        data = element.get("key")
        if not data:
            yield {"key": -9999}
            return
        return [element]


class SetDefaultVal3:
    @staticmethod
    def process(element):
        data = element.get("key")
        if not data:
            return [{"key": -9999}]
        return [element]


class SetDefaultVal4:
    @staticmethod
    def process(element):
        data = element.get("key")
        if not data:
            yield {"key": -9999}
            return
        yield element


def run_and_log(Generator, element, where):
    generator = Generator.process(element)
    try:
        processed_element = next(generator)
        logger.info(f"From {where} found {processed_element}")
    except Exception:
        pass


if __name__ == "__main__":
    for element in [
        {"key": 1},
        {"key": 2},
        {"key": None},
        {"key": 4},
    ]:
        run_and_log(SetDefaultVal1, element, 1)
        run_and_log(SetDefaultVal2, element, 2)
        run_and_log(SetDefaultVal3, element, 3)
        run_and_log(SetDefaultVal4, element, 4)

Outputs:

$ python generator_eg.py
INFO:__main__:From 1 found {'key': 1}
INFO:__main__:From 1 found {'key': 2}
INFO:__main__:From 1 found {'key': 4}
INFO:__main__:From 2 found {'key': -9999}
INFO:__main__:From 4 found {'key': 1}
INFO:__main__:From 4 found {'key': 2}
INFO:__main__:From 4 found {'key': -9999}
INFO:__main__:From 4 found {'key': 4}

# PS: Output is manually ordered for ease of interpretation.
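Note that run_and_log calls next() once and swallows every exception, which hides why case 3 prints nothing: SetDefaultVal3.process returns a plain list, and next() on a list raises TypeError. A fairer comparison iterates over whatever process returns. The sketch below assumes a runner simply iterates the return value of process (a simplification, not Beam's actual implementation); the functions are illustrative plain-function rewrites of the four cases.

```python
def set_default_val_1(element):
    if not element.get("key"):
        return [{"key": -9999}]
    yield element


def set_default_val_2(element):
    if not element.get("key"):
        yield {"key": -9999}
        return
    return [element]


def set_default_val_3(element):
    if not element.get("key"):
        return [{"key": -9999}]
    return [element]


def set_default_val_4(element):
    if not element.get("key"):
        yield {"key": -9999}
        return
    yield element


ELEMENTS = [{"key": 1}, {"key": 2}, {"key": None}, {"key": 4}]
CASES = [set_default_val_1, set_default_val_2, set_default_val_3, set_default_val_4]


def drain_all(process, elements):
    # Iterate whatever `process` returns (a list or a generator) with a for-loop.
    return [out for element in elements for out in process(element)]


if __name__ == "__main__":
    for case, process in enumerate(CASES, start=1):
        print(case, drain_all(process, ELEMENTS))
```

Draining fully gives the same pattern as the DirectRunner output above: case 1 loses {'key': -9999}, case 2 keeps only {'key': -9999}, and cases 3 and 4 emit all four elements.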

Comparing the behavior of DoFn.process with a Python generator across all 4 cases:

Case 4.

  1. Output is expected for dofn_issue.py & generator_eg.py.
  2. Output is same for dofn_issue.py & generator_eg.py.

Case 3.

  1. Output is expected for dofn_issue.py & generator_eg.py.
  2. Output is NOT same for dofn_issue.py & generator_eg.py.

Case 2.

  1. a) Output is NOT expected for dofn_issue.py. b) Output is expected for generator_eg.py.
  2. Output is same for dofn_issue.py & generator_eg.py.

Case 1.

  1. a) Output is NOT expected for dofn_issue.py. b) Output is expected for generator_eg.py.
  2. Output is same for dofn_issue.py & generator_eg.py.

Discrepancy

If dofn_issue.py emits the elements of return <some iterable> in case 3 (which generator_eg.py does NOT), why does it NOT emit them for the return <some iterable> statements in cases 1 and 2?

Issue Priority

Priority: 1

Issue Component

Component: sdk-py-core

@toransahu toransahu changed the title [Bug]: Discrepancy in behavior of DoFn.process() when yield is combined with return statement, or vice versa Discrepancy in behavior of DoFn.process() when yield is combined with return statement, or vice versa Aug 31, 2022
@TheNeuralBit
Member

Thanks for the detailed report, @toransahu.

In the short term, we should at least update the docs to state that mixing return and yield leads to undefined behavior. Medium term (optional): detect this situation and fail at construction time. Long term: enable mixing yield and return.

CC: @tvalentyn @robertwb
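The medium-term option (fail at construction time) could look roughly like the following. This is a minimal sketch, assuming the check runs on the source text of a process method; `mixes_yield_and_return_value` is a hypothetical helper, not part of Beam's API. It flags a function that contains `yield` or `yield from` together with a `return <value>`, ignoring nested functions, lambdas, and classes, and treating a bare `return` or `return None` as harmless.

```python
import ast
import textwrap


class _YieldReturnFinder(ast.NodeVisitor):
    """Records `yield` and `return <value>` in one function body, skipping nested scopes."""

    def __init__(self):
        self.has_yield = False
        self.has_return_value = False

    def visit_Yield(self, node):
        self.has_yield = True
        self.generic_visit(node)

    def visit_YieldFrom(self, node):
        self.has_yield = True
        self.generic_visit(node)

    def visit_Return(self, node):
        value = node.value
        # A bare `return` or `return None` only ends a generator, so it is harmless.
        if value is not None and not (isinstance(value, ast.Constant) and value.value is None):
            self.has_return_value = True
        self.generic_visit(node)

    def visit_FunctionDef(self, node):
        pass  # nested function: its yields/returns belong to a different scope

    visit_AsyncFunctionDef = visit_FunctionDef
    visit_Lambda = visit_FunctionDef
    visit_ClassDef = visit_FunctionDef


def mixes_yield_and_return_value(source):
    """Return True if the single function in `source` mixes yield with return <value>."""
    tree = ast.parse(textwrap.dedent(source))
    func = tree.body[0]
    if not isinstance(func, (ast.FunctionDef, ast.AsyncFunctionDef)):
        raise ValueError("expected the source of a single function definition")
    finder = _YieldReturnFinder()
    for stmt in func.body:
        finder.visit(stmt)
    return finder.has_yield and finder.has_return_value
```

In a pipeline-construction hook one might call it as `mixes_yield_and_return_value(inspect.getsource(SetDefaultValFn1.process))`, which requires the DoFn's source file to be available; cases 1 and 2 from the report would be flagged, cases 3 and 4 would not.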

@TheNeuralBit
Member

I think P1 is appropriate here since this could lead to data loss. I'm not sure updating docs is enough to drop to P2, since users could easily miss that.

@toransahu
Contributor Author

@TheNeuralBit Thanks for promptly triaging the issue.

@tvalentyn tvalentyn removed their assignment Oct 21, 2022
@kennknowles
Member

Has anything been done to mitigate this? It seems quite severe!

@tvalentyn
Contributor

tvalentyn commented Jan 18, 2023

This is in the backlog, awaiting available hands; a doc update is in flight. I think we can recommend yield for individual elements and yield from for iterables, and avoid return altogether; we should check that the issue isn't reproducible in that case.
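A minimal sketch of that recommendation applied to cases 1 and 2: each `return <iterable>` becomes `yield from <iterable>`, and any remaining `return` is bare. In a pipeline these bodies would live in DoFn.process; they are written as plain, illustratively named functions here so the sketch runs without Beam installed.

```python
def set_default_val_1_fixed(element):
    # Case 1 rewritten: `yield from` replaces `return [{"key": -9999}]`.
    if not element.get("key"):
        yield from [{"key": -9999}]
        return  # bare return only ends the generator; nothing is lost
    yield element


def set_default_val_2_fixed(element):
    # Case 2 rewritten: `yield from` replaces `return [element]`.
    if not element.get("key"):
        yield {"key": -9999}
        return
    yield from [element]


ELEMENTS = [{"key": 1}, {"key": 2}, {"key": None}, {"key": 4}]

for process in (set_default_val_1_fixed, set_default_val_2_fixed):
    # Iterate each generator fully, roughly as a runner consumes process() output.
    print(process.__name__, [out for e in ELEMENTS for out in process(e)])
```

Both rewritten functions emit all four elements, with {'key': -9999} in place of {'key': None}.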

@liferoad
Collaborator

liferoad commented Mar 5, 2023

.take-issue

@tvalentyn
Contributor

Taking a closer look, the unexpected behavior is not caused by Beam but by how Python interprets a function that mixes yield and return statements. See https://stackoverflow.com/questions/16780002/return-in-generator-together-with-yield, which summarizes it.

For example:

def foo(c):
  if (c):
    yield "foo"
  else:
    return "bar"
list(foo(True))
Out: ['foo']
list(foo(False))
Out: []

next(foo(False)) raises StopIteration: bar

It seems that such mixing was a SyntaxError in Python 2.
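Python 3 stores a generator's return value on the StopIteration exception that ends it; list() and for-loops discard that value, which is why nothing is emitted. The sketch below reuses the foo example from this comment (the helper names are illustrative) to show where the value goes and how `yield from` can recover it.

```python
def foo(c):
    if c:
        yield "foo"
    else:
        return "bar"  # becomes StopIteration("bar"), not an emitted item


def first_or_return_value(gen):
    # Advance once; if the generator finishes instead, report its return value.
    try:
        return ("yielded", next(gen))
    except StopIteration as stop:
        return ("returned", stop.value)


def reemit_return_value(c):
    # `yield from` evaluates to the inner generator's return value,
    # so the caller can choose to emit it explicitly.
    value = yield from foo(c)
    if value is not None:
        yield value


print(first_or_return_value(foo(True)))  # ('yielded', 'foo')
print(first_or_return_value(foo(False)))  # ('returned', 'bar')
print(list(reemit_return_value(False)))  # ['bar']
print(list(reemit_return_value(True)))  # ['foo']
```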

@toransahu
Contributor Author

@liferoad Thanks for the PR. @tvalentyn Thanks for PR review & sharing the root cause.

@liferoad
Collaborator

.close-issue

@damccorm damccorm added the done & done Issue has been reviewed after it was closed for verification, followups, etc. label Apr 4, 2023
jimmytobin2425 added a commit to jimmytobin2425/beam that referenced this issue Mar 6, 2024
The issue this warning message points to has been resolved showing it was an issue with Python 2 and not with beam.

apache#22969

6 participants