Skip to content

Commit

Permalink
Add review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mariapython committed Nov 30, 2017
1 parent b265af2 commit 0f103d9
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
20 changes: 11 additions & 9 deletions sdks/python/apache_beam/runners/direct/clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,27 @@


class Clock(object):
def current_time(self):
def time(self):
"""Returns the number of milliseconds since epoch."""
raise NotImplementedError()

def advance_time(self):
def advance_time(self, advance_by):
"""Advances the clock by a number of miliseconds."""
raise NotImplementedError()


class RealClock(object):
def current_time(self):
return time.time()
def time(self):
return int(time.time() * 1000)


class TestClock(object):
"""Clock used for Testing"""
def __init__(self, current=0):
self._current = current
def __init__(self, current_time=0):
self._current_time = current_time

def current_time(self):
return self._current
def time(self):
return self._current_time

def advance_time(self, advance_by):
self._current += advance_by
self._current_time += advance_by
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/runners/direct/watermark_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ def refresh(self):

@property
def synchronized_processing_output_time(self):
return self._clock.current_time()
return self._clock.time()

def extract_transform_timers(self):
"""Extracts fired timers and reports of any timers set per transform."""
Expand All @@ -259,7 +259,7 @@ def extract_transform_timers(self):
for encoded_key, state in self._keyed_states.iteritems():
timers, had_realtime_timer = state.get_timers(
watermark=self._input_watermark,
current_time=self._clock.current_time())
current_time=self._clock.time())
if had_realtime_timer:
has_realtime_timer = True
for expired in timers:
Expand Down

0 comments on commit 0f103d9

Please sign in to comment.