Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-14303] Add a way to exclude output timestamp watermark holds #17359

Merged
merged 4 commits into from
May 5, 2022

Conversation

reuvenlax
Copy link
Contributor

No description provided.

@reuvenlax
Copy link
Contributor Author

R: @zhengbuqian

@reuvenlax
Copy link
Contributor Author

run flink validatesrunner

@reuvenlax
Copy link
Contributor Author

Run Java Flink PortableValidatesRunner Streaming

@reuvenlax
Copy link
Contributor Author

Run Java Spark v2 PortableValidatesRunner Streaming

@reuvenlax
Copy link
Contributor Author

Run Java Spark v3 PortableValidatesRunner Streaming

@reuvenlax
Copy link
Contributor Author

Run Spark ValidatesRunner Java 11

@reuvenlax
Copy link
Contributor Author

Run Java_PVR_Flink_Docker PreCommit

@reuvenlax
Copy link
Contributor Author

Run Spotless PreCommit

@reuvenlax
Copy link
Contributor Author

run flink validatesrunner

@reuvenlax
Copy link
Contributor Author

Run Java Flink PortableValidatesRunner Streaming

@@ -1251,7 +1262,7 @@ private void setAndVerifyOutputTimestamp() {
: PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
BoundedWindow.TIMESTAMP_MAX_VALUE));
}
} else if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
} else if (!noOutputTimestamp && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems if the user called withNoOutputTimestamp() on an event time timer, the else branch will be executed. Is this expected? I don't see any code preventing the user from doing so. I guess the answer is yes because if withNoOutputTimestamp() is called then the outputTimestamp value doesn't matter as we don't allow output from this timer. If so then the comment should also be updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a bug - good catch.

@@ -99,6 +99,12 @@ public interface Timer {
*/
Timer withOutputTimestamp(Instant outputTime);

/**
* Asserts that there is no output timestamp. The output watermark will not be held up, and it is
* illegal to output messages from this timer.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is illegal to output messages from this timer.

How is this enforced?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be precise, illegal to output messages using the default timestamp (outputWithTimestamp is still allowed).

@reuvenlax
Copy link
Contributor Author

Run Java PreCommit

2 similar comments
@reuvenlax
Copy link
Contributor Author

Run Java PreCommit

@reuvenlax
Copy link
Contributor Author

Run Java PreCommit

@reuvenlax
Copy link
Contributor Author

Run Java PreCommit

@reuvenlax
Copy link
Contributor Author

@zhengbuqian friendly ping

@reuvenlax
Copy link
Contributor Author

Run Java PreCommit

4 similar comments
@reuvenlax
Copy link
Contributor Author

Run Java PreCommit

@reuvenlax
Copy link
Contributor Author

Run Java PreCommit

@reuvenlax
Copy link
Contributor Author

Run Java PreCommit

@reuvenlax
Copy link
Contributor Author

Run Java PreCommit

@reuvenlax
Copy link
Contributor Author

@zhengbuqian friendly ping

@reuvenlax
Copy link
Contributor Author

Run Java PreCommit

@reuvenlax
Copy link
Contributor Author

friendly ping

@reuvenlax
Copy link
Contributor Author

Friendly ping - some customers are starting to complain about draining this sink

@reuvenlax
Copy link
Contributor Author

@aaltay do you know who would be a good reviewer for this?

Copy link
Contributor

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. LGTM.

}

@Override
public Timer withNoOutputTimestamp() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a test for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@reuvenlax
Copy link
Contributor Author

Run Kotlin_Examples PreCommit

@reuvenlax
Copy link
Contributor Author

Run Java_PVR_Flink_Docker PreCommit

@reuvenlax
Copy link
Contributor Author

Run Flink ValidatesRunner

@reuvenlax
Copy link
Contributor Author

Run Java Flink PortableValidatesRunner Streaming

@reuvenlax reuvenlax merged commit fa01615 into apache:master May 5, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants