[BEAM-3153] Add Python processing-time timers and clock injection #4148

Merged
merged 6 commits into from
Dec 15, 2017

mariapython
Contributor

  • Add processing-time timers
  • Add clock to runners/direct
  • Clock injection and propagation:
    DirectRunner → EvaluationContext → WatermarkManager
  • Use RealClock() for production and TestClock() for tests
  • Add mechanism to detect not-yet fired realtime timers
  • Refactoring:
    _TransformWatermarks’ extract_fired_timers → extract_transform_timers
    WatermarkManager’s extract_fired_timers → extract_all_timers
    EvaluationContext’s extract_fired_timers → extract_all_timers
  • Add and improve comments
  • Tests
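
To illustrate the clock-injection bullets above, here is a minimal sketch, not the code from this PR. The class names RealClock and TestClock come from the description; the method names time() and advance_time() and the use of seconds follow what the review discussion settles on. The constructor argument and the EvaluationContextSketch holder are assumptions made purely for illustration.

```python
import time


class RealClock(object):
  """Wall clock for production use (sketch)."""

  def time(self):
    # Seconds since the epoch, as returned by time.time().
    return time.time()


class TestClock(object):
  """Manually advanced clock for tests (sketch)."""

  def __init__(self, current_time=0):
    # `current_time` is a hypothetical constructor argument, in seconds.
    self._current_time = current_time

  def time(self):
    return self._current_time

  def advance_time(self, advance_by):
    # Move the clock forward by `advance_by` seconds.
    self._current_time += advance_by


class EvaluationContextSketch(object):
  """Hypothetical stand-in for an object that receives the injected clock."""

  def __init__(self, clock):
    self._clock = clock

  def now(self):
    return self._clock.time()


# Production code would pass RealClock(); a test passes TestClock().
context = EvaluationContextSketch(TestClock(current_time=100))
```

Because the consumer only calls time(), a test can advance the clock deterministically instead of sleeping.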

@asfgit

asfgit commented Nov 18, 2017

FAILURE

--none--

@asfgit

asfgit commented Nov 18, 2017

SUCCESS

--none--

@asfgit

asfgit commented Nov 18, 2017

FAILURE

--none--

@asfgit

asfgit commented Nov 18, 2017

SUCCESS

--none--

@mariapython
Contributor Author

R: @charlesccychen

@asfgit

asfgit commented Nov 19, 2017

SUCCESS

--none--

1 similar comment
@asfgit

asfgit commented Nov 19, 2017

SUCCESS

--none--

@asfgit

asfgit commented Nov 19, 2017

FAILURE

--none--

@mariapython
Contributor Author

retest this please

@asfgit

asfgit commented Nov 20, 2017

SUCCESS

--none--

1 similar comment
@asfgit

asfgit commented Nov 20, 2017

SUCCESS

--none--

@asfgit

asfgit commented Nov 20, 2017

FAILURE

--none--

@mariapython
Contributor Author

Test timing out, will attempt a 3rd run. Perhaps related to https://issues.apache.org/jira/browse/BEAM-3040

@mariapython
Contributor Author

retest this please

Contributor

@charlesccychen left a comment

Thanks.

# Currently this only works for overrides where the input and output types do
# not change.
# For internal SDK use only. This should not be updated by Beam pipeline
# authors.
Contributor

Why the change here?

@@ -143,13 +143,18 @@ def _refresh_watermarks(self, applied_ptransform):
for consumer in consumers:
self._refresh_watermarks(consumer)

-def extract_fired_timers(self):
+def extract_all_timers(self):
Contributor

Can we use the previous method name? This name is misleading because we still only extract fired timers.

def time(self):
"""Returns the number of milliseconds since epoch."""
return int(time.time() * 1000)
def advance_time(self):
Contributor

Please add duration argument.

def __init__(self, now_in_ms):
self._now_in_ms = now_in_ms
class TestClock(object):
"""Clock used for Testing"""
Contributor

"testing".

Please document the unit used in current_time and advance_by.

from __future__ import absolute_import

import time


class Clock(object):
"""For internal use only; no backwards-compatibility guarantees."""
def current_time(self):
Contributor

Can we rename this time() to be consistent with the way time.time() is called?

It will check for successful completion by checking the watermarks of all
transforms. If they all reached the maximum watermark it means that
pipeline successfully reached to completion.
If there is anything in the queue of tasks to do, do not shut down.
Contributor

Are we checking whether there are pending processing time timers?

_PTRANSFORM_OVERRIDES = []

def __init__(self):
self._cache = None
self._use_test_clock = False # use RealClock() in production
Contributor

Can you document that this is used for testing when TestStream is used in the pipeline?

Contributor Author

@mariapython left a comment

PTAL. Thanks!

# Currently this only works for overrides where the input and output types do
# not change.
# For internal SDK use only. This should not be updated by Beam pipeline
# authors.
Contributor Author

charlesccychen wrote:
Why the change here?

At the time of the 2.0 release we tried to normalize the message to "For internal use only; no backwards-compatibility guarantees." (https://issues.apache.org/jira/browse/BEAM-2241)

@@ -143,13 +143,18 @@ def _refresh_watermarks(self, applied_ptransform):
for consumer in consumers:
self._refresh_watermarks(consumer)

-def extract_fired_timers(self):
+def extract_all_timers(self):
Contributor Author

charlesccychen wrote:
Can we use the previous method name? This name is misleading because we still only extract fired timers.

I meant to make the following changes:
WatermarkManager's extract_fired_timers --> extract_all_timers (meaning for all transforms)
_TransformWatermarks' extract_fired_timers --> extract_transform_timers (meaning per transform).
I wanted to accomplish two things: 1) differentiate between "for all transforms" and "per transform", and 2) reflect that we are no longer extracting only the fired timers. I understand 2) is not crucial, as the other return value is basically a flag.
Another (more verbose) option is: extract_fired_timers and extract_fired_timers_per_transform.
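
The split described in this reply can be sketched as follows. This is an illustration only: the two method names are the ones proposed above, and the two-element return value of extract_all_timers() matches how the shutdown check quoted later in this thread unpacks it. The class names, constructor arguments, and the shape of the fired-timer list are assumptions.

```python
class TransformWatermarksSketch(object):
  """Hypothetical per-transform holder of timer information."""

  def __init__(self, fired_timers, has_unfired_realtime_timer):
    self._fired_timers = fired_timers
    self._has_unfired_realtime_timer = has_unfired_realtime_timer

  def extract_transform_timers(self):
    # Per transform: the fired timers plus a flag for pending real-time timers.
    return self._fired_timers, self._has_unfired_realtime_timer


class WatermarkManagerSketch(object):
  """Hypothetical manager that aggregates over all transforms."""

  def __init__(self, transform_to_watermarks):
    self._transform_to_watermarks = transform_to_watermarks

  def extract_all_timers(self):
    all_timers = []
    any_unfired_realtime_timers = False
    for transform, tw in self._transform_to_watermarks.items():
      fired, unfired = tw.extract_transform_timers()
      if fired:
        all_timers.append((transform, fired))
      if unfired:
        # One transform with a pending real-time timer is enough to set the flag.
        any_unfired_realtime_timers = True
    return all_timers, any_unfired_realtime_timers
```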

It will check for successful completion by checking the watermarks of all
transforms. If they all reached the maximum watermark it means that
pipeline successfully reached to completion.
If there is anything in the queue of tasks to do, do not shut down.
Contributor Author

charlesccychen wrote:
Are we checking whether there are pending processing time timers?

Yes, and I consider that a "task to do," which is looked at in _is_executing() and that is where I added a comment about timers being set.
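
A rough sketch of that idea, under the assumption that the check reduces to two inputs: the queue of pending tasks and a flag for unfired processing-time timers. The function name and its parameters are hypothetical and do not reproduce _is_executing().

```python
def has_work_remaining(pending_tasks, any_unfired_realtime_timers):
  """Returns True while the pipeline must not shut down (sketch)."""
  if pending_tasks:
    # Queued tasks always keep the executor alive.
    return True
  # A processing-time timer that is set but not yet due counts as a
  # "task to do", even when the task queue is momentarily empty.
  return bool(any_unfired_realtime_timers)
```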

def __init__(self, now_in_ms):
self._now_in_ms = now_in_ms
class TestClock(object):
"""Clock used for Testing"""
Contributor Author

charlesccychen wrote:
"testing".

Please document the unit used in current_time and advance_by.

Done.

def time(self):
"""Returns the number of milliseconds since epoch."""
return int(time.time() * 1000)
def advance_time(self):
Contributor Author

charlesccychen wrote:
Please add duration argument.

Done.

from __future__ import absolute_import

import time


class Clock(object):
"""For internal use only; no backwards-compatibility guarantees."""
def current_time(self):
Contributor Author

charlesccychen wrote:
Can we rename this time() to be consistent with the way time.time() is called?

Done.

_PTRANSFORM_OVERRIDES = []

def __init__(self):
self._cache = None
self._use_test_clock = False # use RealClock() in production
Contributor Author

charlesccychen wrote:
Can you document that this is used for testing when TestStream is used in the pipeline?

I mention that a few lines below, when I turn on the flag.

@mariapython
Contributor Author

retest this please

@aaltay
Member

aaltay commented Dec 6, 2017

What is the next action on this PR? @charlesccychen could you please make another pass?

Contributor

@charlesccychen left a comment

Thanks.

"""For internal use only; no backwards-compatibility guarantees.
class RealClock(object):
def time(self):
return int(time.time() * 1000)
Contributor

Can you please change this back to seconds?

@@ -1138,20 +1138,36 @@ def clear_state(self, window, tag):
if not self.state[window]:
self.state.pop(window, None)

-def get_timers(self, clear=False, watermark=MAX_TIMESTAMP):
+def get_timers(self, clear=False, watermark=MAX_TIMESTAMP, current_time=None):
Contributor

Let's change this argument name to "processing_time".

for window, timers in list(self.timers.items()):
for (name, time_domain), timestamp in list(timers.items()):
if timestamp <= watermark:
if time_domain == 'REAL_TIME':
Contributor

Please use the TimeDomain enum (i.e. TimeDomain.WATERMARK, etc) here.

@@ -1138,20 +1138,36 @@ def clear_state(self, window, tag):
if not self.state[window]:
self.state.pop(window, None)

-def get_timers(self, clear=False, watermark=MAX_TIMESTAMP):
+def get_timers(self, clear=False, watermark=MAX_TIMESTAMP, current_time=None):
+"""Gets expired timers and reports if there
Contributor

Let's try to fit in one line:

"""Returns expired timers and existence of REAL_TIME timers."""

@mariapython
Contributor Author

retest this please

Contributor Author

@mariapython left a comment

Thanks! PTAL

"""For internal use only; no backwards-compatibility guarantees.
class RealClock(object):
def time(self):
return int(time.time() * 1000)
Contributor Author

charlesccychen wrote:
Can you please change this back to seconds?

Done.

@@ -1138,20 +1138,36 @@ def clear_state(self, window, tag):
if not self.state[window]:
self.state.pop(window, None)

-def get_timers(self, clear=False, watermark=MAX_TIMESTAMP):
+def get_timers(self, clear=False, watermark=MAX_TIMESTAMP, current_time=None):
Contributor Author

charlesccychen wrote:
Let's change this argument name to "processing_time".

Done.

for window, timers in list(self.timers.items()):
for (name, time_domain), timestamp in list(timers.items()):
if timestamp <= watermark:
if time_domain == 'REAL_TIME':
Contributor Author

charlesccychen wrote:
Please use the TimeDomain enum (i.e. TimeDomain.WATERMARK, etc) here.

Done.

@@ -1138,20 +1138,36 @@ def clear_state(self, window, tag):
if not self.state[window]:
self.state.pop(window, None)

-def get_timers(self, clear=False, watermark=MAX_TIMESTAMP):
+def get_timers(self, clear=False, watermark=MAX_TIMESTAMP, current_time=None):
+"""Gets expired timers and reports if there
Contributor Author

charlesccychen wrote:
Let's try to fit in one line:

"""Returns expired timers and existence of REAL_TIME timers."""

Yes, it can be done for the first sentence, but I think the second is more important (explaining the addition of 'time_marker'). When I added the docstring initially, I considered writing that second sentence by the 'if' but I found it more clunky and, since the expiration is mentioned in the first sentence, referring to it in the second flowed better.
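
For readers following the docstring discussion, here is a minimal standalone sketch of the behaviour being described: event-time timers expire against the watermark, processing-time timers expire against the injected processing time, and a flag reports whether any real-time timer is set. The name time_marker is taken from the reply above and processing_time is the argument name agreed on earlier in this review. Everything else is an assumption: TimeDomain here is a local stand-in with string values rather than Beam's class, the function is free-standing instead of a method, and the timers layout (window -> {(name, time_domain): timestamp}) is inferred from the loop quoted in the diff.

```python
MAX_TIMESTAMP = float('inf')  # stand-in for the SDK's MAX_TIMESTAMP constant


class TimeDomain(object):
  """Local stand-in for the TimeDomain enum mentioned in the review."""
  WATERMARK = 'WATERMARK'
  REAL_TIME = 'REAL_TIME'


def get_timers(timers, watermark=MAX_TIMESTAMP, processing_time=None):
  """Returns expired timers and whether any REAL_TIME timer is set."""
  expired = []
  has_realtime_timer = False
  for window, window_timers in list(timers.items()):
    for (name, time_domain), timestamp in list(window_timers.items()):
      if time_domain == TimeDomain.REAL_TIME:
        # Processing-time timers are compared against the clock reading.
        time_marker = processing_time
        has_realtime_timer = True
      elif time_domain == TimeDomain.WATERMARK:
        # Event-time timers are compared against the watermark.
        time_marker = watermark
      else:
        continue  # unknown domain: ignored in this sketch
      if time_marker is not None and timestamp <= time_marker:
        expired.append((window, (name, time_domain, timestamp)))
  return expired, has_realtime_timer
```

In this sketch the flag is set for every REAL_TIME timer that is present, whether or not it has expired, which is what lets a caller notice timers that are set but not yet due.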

Contributor

@charlesccychen left a comment

Thanks.


mariapython wrote:
Thanks! PTAL

Done.

raise NotImplementedError()

def advance_time(self, advance_by):
"""Advances the clock by a number of miliseconds."""
Contributor

Please fix comment to indicate seconds.

from __future__ import absolute_import

import time


class Clock(object):
"""For internal use only; no backwards-compatibility guarantees."""

def time(self):
"""Returns the number of milliseconds since epoch."""
Contributor

Please fix comment to indicate seconds.


watermark_manager = self._executor.evaluation_context._watermark_manager
_, any_unfired_timers = watermark_manager.extract_all_timers()
if any_unfired_timers:
Contributor

Can you more clearly indicate (by naming or comment) that this is for unfired real time timers?

Contributor Author

@mariapython left a comment

Thanks. PTAL.

raise NotImplementedError()

def advance_time(self, advance_by):
"""Advances the clock by a number of miliseconds."""
Contributor Author

charlesccychen wrote:
Please fix comment to indicate seconds.

Done.

from __future__ import absolute_import

import time


class Clock(object):
"""For internal use only; no backwards-compatibility guarantees."""

def time(self):
"""Returns the number of milliseconds since epoch."""
Contributor Author

charlesccychen wrote:
Please fix comment to indicate seconds.

Done.


watermark_manager = self._executor.evaluation_context._watermark_manager
_, any_unfired_timers = watermark_manager.extract_all_timers()
if any_unfired_timers:
Contributor Author

charlesccychen wrote:
Can you more clearly indicate (by naming or comment) that this is for unfired real time timers?

Done.

Contributor

@charlesccychen left a comment

Thanks, LGTM.


mariapython wrote:
Thanks. PTAL.

Done.

Contributor

@charlesccychen left a comment

mariapython wrote:
PTAL. Thanks!

Done.

@chamikaramj
Contributor

Thanks. Merging.

@chamikaramj chamikaramj merged commit 734b2be into apache:master Dec 15, 2017
This pull request was closed.