-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-14303] Add a way to exclude output timestamp watermark holds #17359
Conversation
R: @zhengbuqian |
run flink validatesrunner |
Run Java Flink PortableValidatesRunner Streaming |
Run Java Spark v2 PortableValidatesRunner Streaming |
Run Java Spark v3 PortableValidatesRunner Streaming |
Run Spark ValidatesRunner Java 11 |
Run Java_PVR_Flink_Docker PreCommit |
Run Spotless PreCommit |
run flink validatesrunner |
Run Java Flink PortableValidatesRunner Streaming |
a53e488
to
28f3ead
Compare
@@ -1251,7 +1262,7 @@ private void setAndVerifyOutputTimestamp() { | |||
: PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()), | |||
BoundedWindow.TIMESTAMP_MAX_VALUE)); | |||
} | |||
} else if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { | |||
} else if (!noOutputTimestamp && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems if the user called withNoOutputTimestamp()
on an event time timer, the else
branch will be executed. Is this expected? I don't see any code preventing the user from doing so. I guess the answer is yes because if withNoOutputTimestamp()
is called then the outputTimestamp value doesn't matter as we don't allow output from this timer. If so then the comment should also be updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was a bug - good catch.
@@ -99,6 +99,12 @@ public interface Timer { | |||
*/ | |||
Timer withOutputTimestamp(Instant outputTime); | |||
|
|||
/** | |||
* Asserts that there is no output timestamp. The output watermark will not be held up, and it is | |||
* illegal to output messages from this timer. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is illegal to output messages from this timer.
How is this enforced?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be precise, illegal to output messages using the default timestamp (outputWithTimestamp is still allowed).
Run Java PreCommit |
2 similar comments
Run Java PreCommit |
Run Java PreCommit |
94dc207
to
7908f84
Compare
Run Java PreCommit |
@zhengbuqian friendly ping |
Run Java PreCommit |
4 similar comments
Run Java PreCommit |
Run Java PreCommit |
Run Java PreCommit |
Run Java PreCommit |
@zhengbuqian friendly ping |
Run Java PreCommit |
friendly ping |
Friendly ping - some customers are starting to complain about draining this sink |
@aaltay do you know who would be a good reviewer for this? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. LGTM.
} | ||
|
||
@Override | ||
public Timer withNoOutputTimestamp() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a test for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Run Kotlin_Examples PreCommit |
Run Java_PVR_Flink_Docker PreCommit |
Run Flink ValidatesRunner |
Run Java Flink PortableValidatesRunner Streaming |
No description provided.