[BEAM-3153] Add Python processing-time timers and clock injection #4148
FAILURE --none--
SUCCESS --none--
FAILURE --none--
SUCCESS --none--
SUCCESS --none--
SUCCESS --none--
FAILURE --none--
retest this please
SUCCESS --none--
SUCCESS --none--
FAILURE --none--
Test timing out, will attempt a 3rd run. Perhaps related to https://issues.apache.org/jira/browse/BEAM-3040
retest this please
Thanks.
# Currently this only works for overrides where the input and output types do
# not change.
# For internal SDK use only. This should not be updated by Beam pipeline
# authors.
Why the change here?
@@ -143,13 +143,18 @@ def _refresh_watermarks(self, applied_ptransform):
for consumer in consumers:
self._refresh_watermarks(consumer)

def extract_fired_timers(self):
def extract_all_timers(self):
Can we use the previous method name? The new name is misleading because we still only extract fired timers.
def time(self):
"""Returns the number of milliseconds since epoch."""
return int(time.time() * 1000)
def advance_time(self):
Please add duration argument.
def __init__(self, now_in_ms):
self._now_in_ms = now_in_ms
class TestClock(object):
"""Clock used for Testing"""
"testing".
Please document the unit used in current_time and advance_by.
from __future__ import absolute_import

import time


class Clock(object):
"""For internal use only; no backwards-compatibility guarantees."""
def current_time(self):
Can we rename this to time() to be consistent with the way time.time() is called?
It will check for successful completion by checking the watermarks of all
transforms. If they all reached the maximum watermark it means that
pipeline successfully reached to completion.
If there is anything in the queue of tasks to do, do not shut down.
Are we checking whether there are pending processing time timers?
_PTRANSFORM_OVERRIDES = []

def __init__(self):
self._cache = None
self._use_test_clock = False # use RealClock() in production
Can you document that this is used for testing when TestStream is used in the pipeline?
2ab61a1 to 0f103d9
PTAL. Thanks!
# Currently this only works for overrides where the input and output types do
# not change.
# For internal SDK use only. This should not be updated by Beam pipeline
# authors.
charlesccychen wrote:
Why the change here?
At the time of the 2.0 release we tried to normalize the message to "For internal use only; no backwards-compatibility guarantees." (https://issues.apache.org/jira/browse/BEAM-2241)
@@ -143,13 +143,18 @@ def _refresh_watermarks(self, applied_ptransform):
for consumer in consumers:
self._refresh_watermarks(consumer)

def extract_fired_timers(self):
def extract_all_timers(self):
charlesccychen wrote:
Can we use the previous method name? The new name is misleading because we still only extract fired timers.
I meant to do the following changes:
WatermarkManager's extract_fired_timers --> extract_all_timers (meaning for all transforms)
_TransformWatermarks' extract_fired_timers --> extract_transform_timers (meaning per transform).
I meant to accomplish two things: 1) differentiate between "for all" and "per transform" and 2) reflect that we are no longer just extracting the fired timers. I understand 2) is not crucial, as the other return value is basically a flag.
Another (verbose) option is: extract_fired_timers and extract_fired_timers_per_transform.
It will check for successful completion by checking the watermarks of all
transforms. If they all reached the maximum watermark it means that
pipeline successfully reached to completion.
If there is anything in the queue of tasks to do, do not shut down.
charlesccychen wrote:
Are we checking whether there are pending processing time timers?
Yes, and I consider that a "task to do," which is looked at in _is_executing() and that is where I added a comment about timers being set.
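As a rough illustration of the shutdown rule discussed here (pending processing-time timers count as a "task to do"), a minimal sketch follows. The function name, its parameters, and the MAX_WATERMARK stand-in are hypothetical and are not taken from the PR; the real check lives in the executor and is more involved.

```python
# Hypothetical stand-in for the SDK's maximum watermark value.
MAX_WATERMARK = float('inf')


def should_shut_down(transform_watermarks, pending_task_count,
                     any_unfired_real_time_timers):
  """Returns True only when nothing is left to do.

  Assumption: an unfired processing-time (REAL_TIME) timer is treated like
  a queued task, so it blocks shutdown even if the task queue is empty.
  """
  if pending_task_count > 0 or any_unfired_real_time_timers:
    return False
  # Successful completion: every transform reached the maximum watermark.
  return all(w >= MAX_WATERMARK for w in transform_watermarks)
```

With this shape, a pipeline whose watermarks have all reached the maximum but which still has a processing-time timer set keeps running until that timer fires.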
def __init__(self, now_in_ms):
self._now_in_ms = now_in_ms
class TestClock(object):
"""Clock used for Testing"""
charlesccychen wrote:
"testing". Please document the unit used in current_time and advance_by.
Done.
def time(self):
"""Returns the number of milliseconds since epoch."""
return int(time.time() * 1000)
def advance_time(self):
charlesccychen wrote:
Please add duration argument.
Done.
from __future__ import absolute_import

import time


class Clock(object):
"""For internal use only; no backwards-compatibility guarantees."""
def current_time(self):
charlesccychen wrote:
Can we rename this to time() to be consistent with the way time.time() is called?
Done.
_PTRANSFORM_OVERRIDES = []

def __init__(self):
self._cache = None
self._use_test_clock = False # use RealClock() in production
charlesccychen wrote:
Can you document that this is used for testing when TestStream is used in the pipeline?
I mention that a few lines below, when I turn on the flag.
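To make the clock-injection idea concrete, here is a small sketch of how a flag like _use_test_clock might select the clock. Only the flag name and the RealClock/TestClock class names come from the diff; RunnerSketch, choose_clock, and the way a TestStream is detected are assumptions made for illustration.

```python
import time


class RealClock(object):
  def time(self):
    return time.time()  # seconds since epoch


class TestClock(object):
  def __init__(self, current_time=0):
    self._current_time = current_time

  def time(self):
    return self._current_time


class RunnerSketch(object):
  """Hypothetical runner; only the flag name comes from the reviewed diff."""

  def __init__(self):
    self._use_test_clock = False  # use RealClock() in production

  def choose_clock(self, transform_labels):
    # Assumption: a TestStream anywhere in the pipeline turns the flag on.
    if 'TestStream' in transform_labels:
      self._use_test_clock = True
    return TestClock() if self._use_test_clock else RealClock()
```

Keeping the decision in one place means the rest of the runner only ever calls clock.time() and never needs to know which clock it was given.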
retest this please
What is the next action on this PR? @charlesccychen could you please make another pass?
Thanks.
"""For internal use only; no backwards-compatibility guarantees.
class RealClock(object):
def time(self):
return int(time.time() * 1000)
Can you please change this back to seconds?
@@ -1138,20 +1138,36 @@ def clear_state(self, window, tag):
if not self.state[window]:
self.state.pop(window, None)

def get_timers(self, clear=False, watermark=MAX_TIMESTAMP):
def get_timers(self, clear=False, watermark=MAX_TIMESTAMP, current_time=None):
Let's change this argument name to "processing_time".
for window, timers in list(self.timers.items()):
for (name, time_domain), timestamp in list(timers.items()):
if timestamp <= watermark:
if time_domain == 'REAL_TIME':
Please use the TimeDomain enum (e.g. TimeDomain.WATERMARK) here.
@@ -1138,20 +1138,36 @@ def clear_state(self, window, tag):
if not self.state[window]:
self.state.pop(window, None)

def get_timers(self, clear=False, watermark=MAX_TIMESTAMP):
def get_timers(self, clear=False, watermark=MAX_TIMESTAMP, current_time=None):
"""Gets expired timers and reports if there
Let's try to fit in one line:
"""Returns expired timers and existence of REAL_TIME timers."""
0f103d9 to c1b2152
retest this please
Thanks! PTAL
"""For internal use only; no backwards-compatibility guarantees.
class RealClock(object):
def time(self):
return int(time.time() * 1000)
charlesccychen wrote:
Can you please change this back to seconds?
Done.
@@ -1138,20 +1138,36 @@ def clear_state(self, window, tag):
if not self.state[window]:
self.state.pop(window, None)

def get_timers(self, clear=False, watermark=MAX_TIMESTAMP):
def get_timers(self, clear=False, watermark=MAX_TIMESTAMP, current_time=None):
charlesccychen wrote:
Let's change this argument name to "processing_time".
Done.
for window, timers in list(self.timers.items()):
for (name, time_domain), timestamp in list(timers.items()):
if timestamp <= watermark:
if time_domain == 'REAL_TIME':
charlesccychen wrote:
Please use the TimeDomain enum (e.g. TimeDomain.WATERMARK) here.
Done.
@@ -1138,20 +1138,36 @@ def clear_state(self, window, tag):
if not self.state[window]:
self.state.pop(window, None)

def get_timers(self, clear=False, watermark=MAX_TIMESTAMP):
def get_timers(self, clear=False, watermark=MAX_TIMESTAMP, current_time=None):
"""Gets expired timers and reports if there
charlesccychen wrote:
Let's try to fit in one line: """Returns expired timers and existence of REAL_TIME timers."""
Yes, it can be done for the first sentence, but I think the second is more important (explaining the addition of 'time_marker'). When I added the docstring initially, I considered writing that second sentence by the 'if' but I found it more clunky and, since the expiration is mentioned in the first sentence, referring to it in the second flowed better.
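For readers following the get_timers discussion, the sketch below shows one way the behavior described in this thread could fit together: event-time timers expire against the watermark, processing-time timers expire against processing_time, and the second return value reports whether any REAL_TIME timer is still unfired. InMemoryTimers, set_timer, and the local TimeDomain and MAX_TIMESTAMP stand-ins are hypothetical; this is not the merged implementation.

```python
MAX_TIMESTAMP = float('inf')  # stand-in for the SDK's MAX_TIMESTAMP


class TimeDomain(object):
  """Local stand-in for the SDK's TimeDomain enum."""
  WATERMARK = 'WATERMARK'
  REAL_TIME = 'REAL_TIME'


class InMemoryTimers(object):
  """Hypothetical timer store keyed by window, then (name, time_domain)."""

  def __init__(self):
    self.timers = {}

  def set_timer(self, window, name, time_domain, timestamp):
    self.timers.setdefault(window, {})[(name, time_domain)] = timestamp

  def get_timers(self, clear=False, watermark=MAX_TIMESTAMP,
                 processing_time=None):
    """Returns expired timers and whether unfired REAL_TIME timers remain."""
    expired = []
    has_unfired_real_time_timers = False
    for window, timers in list(self.timers.items()):
      for (name, time_domain), timestamp in list(timers.items()):
        # Pick the marker this timer's expiration is measured against.
        if time_domain == TimeDomain.REAL_TIME:
          time_marker = processing_time
        else:
          time_marker = watermark
        if time_marker is not None and timestamp <= time_marker:
          expired.append((window, (name, time_domain, timestamp)))
          if clear:
            del timers[(name, time_domain)]
        elif time_domain == TimeDomain.REAL_TIME:
          has_unfired_real_time_timers = True
      if clear and not timers:
        del self.timers[window]
    return expired, has_unfired_real_time_timers
```

In this sketch a processing_time of None means no REAL_TIME timer can fire, so any such timer is reported as still pending.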
Thanks.
mariapython wrote:
Thanks! PTAL
Done.
raise NotImplementedError()

def advance_time(self, advance_by):
"""Advances the clock by a number of miliseconds."""
Please fix comment to indicate seconds.
from __future__ import absolute_import

import time


class Clock(object):
"""For internal use only; no backwards-compatibility guarantees."""

def time(self):
"""Returns the number of milliseconds since epoch."""
Please fix comment to indicate seconds.
watermark_manager = self._executor.evaluation_context._watermark_manager
_, any_unfired_timers = watermark_manager.extract_all_timers()
if any_unfired_timers:
Can you more clearly indicate (by naming or comment) that this is for unfired real time timers?
d675132 to 79e167f
Thanks. PTAL.
raise NotImplementedError()

def advance_time(self, advance_by):
"""Advances the clock by a number of miliseconds."""
charlesccychen wrote:
Please fix comment to indicate seconds.
Done.
from __future__ import absolute_import

import time


class Clock(object):
"""For internal use only; no backwards-compatibility guarantees."""

def time(self):
"""Returns the number of milliseconds since epoch."""
charlesccychen wrote:
Please fix comment to indicate seconds.
Done.
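Putting the review requests together (rename to time(), return seconds, give advance_time a duration argument, document units), the clock classes could end up looking roughly like the sketch below. Class and method names follow the snippets quoted in this review; the method bodies and the TestClock constructor argument are assumptions, not the merged code.

```python
import time


class Clock(object):
  """Interface sketch: subclasses report the current time in seconds."""

  def time(self):
    """Returns the number of seconds since epoch."""
    raise NotImplementedError()

  def advance_time(self, advance_by):
    """Advances the clock by a number of seconds."""
    raise NotImplementedError()


class RealClock(Clock):
  def time(self):
    # Matches time.time(): seconds since epoch as a float.
    return time.time()


class TestClock(Clock):
  """Clock used for testing; time moves only when advance_time is called."""

  def __init__(self, current_time=0):
    self._current_time = current_time  # seconds

  def time(self):
    return self._current_time

  def advance_time(self, advance_by):
    self._current_time += advance_by  # advance_by is in seconds
```

A test can then set a processing-time timer for, say, 30 seconds ahead and call advance_time(30) instead of sleeping.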
watermark_manager = self._executor.evaluation_context._watermark_manager
_, any_unfired_timers = watermark_manager.extract_all_timers()
if any_unfired_timers:
charlesccychen wrote:
Can you more clearly indicate (by naming or comment) that this is for unfired real time timers?
Done.
Thanks, LGTM.
mariapython wrote:
Thanks. PTAL.
Done.
mariapython wrote:
PTAL. Thanks!
Done.
Thanks. Merging.
DirectRunner → EvaluationContext → WatermarkManager
_TransformWatermarks’ extract_fired_timers → extract_transform_timers
WatermarkManager’s extract_fired_timers → extract_all_timers
EvaluationContext’s extract_fired_timers → extract_all_timers