Flink: Add support for Flink 2.0 #12527

Merged: 4 commits into apache:main on Apr 23, 2025

Conversation

mxm
Contributor

@mxm mxm commented Mar 14, 2025

This adds support for running Iceberg with Flink 2.0. There are no major changes, but several interfaces have been removed or moved to legacy packages. This raised the question of whether to remove some older implementations of these interfaces, e.g. FlinkSource and FlinkSink, and replace them with newer ones, e.g. IcebergSource and IcebergSink.

After some investigation and further discussion on the mailing list, we decided to rely on the legacy packages and keep support for FlinkSource and FlinkSink. There is an ongoing discussion about when to remove them.

To ease the review, it is broken down into several commits:

  1. Flink 2.0 build modules and scripts
  2. Copy over code from 1.20
  3. Adjust code for Flink 2.0
  4. Remove Flink 1.18 support (in accordance with the Flink release policy)

@mxm
Contributor Author

mxm commented Mar 17, 2025

CC @pvary @stevenzwu

@pvary
Contributor

pvary commented Mar 17, 2025

@mxm: We did it a bit differently in the past. See: #10881
We renamed the "old" master branch to the new one, and then copied the code back to the old branch in another commit.

Here, that would mean moving Flink 1.20 to Flink 2.0, and then copying back the original Flink 1.20 code in another PR.

The result is that the history of the main Flink branch is easier to read.

@mxm
Contributor Author

mxm commented Mar 17, 2025

@pvary I've updated the PR to mimic what we did in #10881.

Comment on lines 144 to 150
@Override
protected void setup(
    StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Void>> output) {
  super.setup(containingTask, config, output);
}

Contributor

Why do we need this override?

Contributor Author

The factory calls the setup method, and the method's visibility has changed, which requires an override here.

Contributor

@stevenzwu stevenzwu Mar 19, 2025

You are saying that AbstractStreamOperator changed the scope of this method from public to protected. Can you share which operator factory still calls this method?

If I understand this correctly, it doesn't seem to be the right change on the Flink side.

Contributor Author

Contributor

That was StreamingReadOperator. This is IcebergFilesCommitter. I don't quite see the connection here.

Contributor Author

True, that was an example for a different operator. Generally, calling setup is required for all operators to initialize the task, its config, and its output. Flink moved away from an explicit call to calling this in the constructor of the base class:
https://github.com/apache/flink/blob/b0607a15e62b664d15efbda0b0e991f72e45a467/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L166
We are not calling the super constructor, so we need to call the setup method manually.

Looking at the code base, Flink 2.0 seems to favor creating a StreamOperatorFactory. I've updated the code to use a factory and call the super constructor instead. The setup method overrides are now removed.
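
To illustrate the two approaches discussed in this thread, here is a minimal, self-contained sketch in plain Java. It does not use Flink's classes: `Params`, `BaseOperator`, `LegacyCommitter`, `Committer`, and `CommitterFactory` are hypothetical stand-ins that only mirror the shape of the pattern (a base class that runs setup from its constructor when given parameters, versus a subclass that skips the super constructor and needs setup called explicitly).

```java
public class OperatorSetupSketch {

    /** Hypothetical stand-in for the parameters a runtime hands to an operator. */
    static final class Params {
        final String taskName;

        Params(String taskName) {
            this.taskName = taskName;
        }
    }

    /** Hypothetical base class: setup runs from the constructor when parameters are given. */
    abstract static class BaseOperator {
        protected String taskName;

        /** Legacy path: no parameters, so nothing calls setup on our behalf. */
        protected BaseOperator() {}

        /** Newer path: the base constructor performs the setup itself. */
        protected BaseOperator(Params params) {
            if (params != null) {
                setup(params);
            }
        }

        protected void setup(Params params) {
            this.taskName = params.taskName;
        }
    }

    /** Legacy style: skips the parameterized super constructor, so setup must be exposed and called. */
    static final class LegacyCommitter extends BaseOperator {
        LegacyCommitter() {
            super();
        }

        @Override
        public void setup(Params params) {
            super.setup(params);
        }
    }

    /** Factory style: passes the parameters up, so no setup override is needed. */
    static final class Committer extends BaseOperator {
        Committer(Params params) {
            super(params);
        }
    }

    /** Hypothetical factory that builds the operator from the runtime-provided parameters. */
    static final class CommitterFactory {
        Committer create(Params params) {
            return new Committer(params);
        }
    }

    public static void main(String[] args) {
        Params params = new Params("task-1");

        LegacyCommitter legacy = new LegacyCommitter();
        legacy.setup(params); // explicit call required on the legacy path

        Committer viaFactory = new CommitterFactory().create(params);

        System.out.println(legacy.taskName.equals(viaFactory.taskName));
    }
}
```

In this sketch the factory-based variant needs no override because the setup work happens as part of construction, which is the reason the setup overrides could be dropped from the PR.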

@pvary
Contributor

pvary commented Mar 19, 2025

I think it would be good to decide on the general question:

  • When do we want to start including Flink 2.0 in the Iceberg code?

While it is good to test things out, I'm not entirely sure we want to rush ahead with supporting a preview version. Others in the community could have a different opinion, but it might be good to ask the users on the dev list.


/**
* Override Flink's internal FlinkScalaKryoInstantiator to avoid loading the Scala extensions for
* the KryoSerializer. This is a workaround until Kryo-related issues with the Scala extensions are
Contributor

Is there a Flink Jira ticket we can link here for the follow-up?

Contributor Author

Yes, there is this ticket: https://issues.apache.org/jira/browse/FLINK-37546

I've added it to the comment.

Contributor

@stevenzwu stevenzwu left a comment

LGTM

@mxm can you squash the last 2 fixup commits into the 1.20 change commit so that we can apply the rebase and merge?

@mxm
Contributor Author

mxm commented Apr 23, 2025

@stevenzwu Thanks for the review! I squashed the commits.

@mxm
Contributor Author

mxm commented Apr 23, 2025

(I renamed the Flink 2.0 commit to "Flink: Add support for Flink 2.0")

@stevenzwu stevenzwu merged commit d7f8e54 into apache:main Apr 23, 2025
20 checks passed
@stevenzwu
Contributor

thanks @mxm for the contribution and @pvary @ajantha-bhat for the reviews

@manuzhang
Collaborator

Is there a fourth commit that removed 1.18 support?

@mxm
Contributor Author

mxm commented Apr 24, 2025

@manuzhang Yes, there is.

@mxm
Contributor Author

mxm commented Apr 24, 2025

Thanks for reviewing / merging @stevenzwu.
