Subsections of Plugins
Flink S3 FS Multiple
Normally, Flink can access only one S3 endpoint at runtime, but sometimes we need to process files from multiple MinIO instances simultaneously. So I modified the original flink-s3-fs-hadoop plugin to enable Flink to do exactly that.
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
env.setParallelism(1);
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///./checkpoints");

// First MinIO endpoint: credentials and address are embedded in the s3u:// URI.
final FileSource<String> source =
        FileSource.forRecordStreamFormat(
                        new TextLineInputFormat(),
                        new Path(
                                "s3u://admin:ZrwpsezF1Lt85dxl@10.11.33.132:9000/user-data/home/conti/2024-02-08--10"))
                .build();

// Second MinIO endpoint, read within the same job.
final FileSource<String> source2 =
        FileSource.forRecordStreamFormat(
                        new TextLineInputFormat(),
                        new Path(
                                "s3u://minioadmin:minioadmin@10.101.16.72:9000/user-data/home/conti"))
                .build();

// Union the two streams and print the combined result.
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source")
        .union(env.fromSource(source2, WatermarkStrategy.noWatermarks(), "file-source2"))
        .print("union-result");
env.execute();
```
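What makes this work is that the `s3u://` scheme carries the endpoint and credentials inside the URI itself, so each source can target a different MinIO instance. As a rough sketch (using plain `java.net.URI` parsing for illustration, not the plugin's actual internals), the per-endpoint pieces that have to be extracted from such a URI look like this:

```java
import java.net.URI;

public class S3uUriDemo {
    public static void main(String[] args) {
        // Illustrative only: java.net.URI can already split an s3u:// URI into
        // the per-endpoint pieces (credentials, host, port, object path).
        URI uri = URI.create("s3u://minioadmin:minioadmin@10.101.16.72:9000/user-data/home/conti");

        String[] creds = uri.getUserInfo().split(":", 2); // access key / secret key
        String endpoint = uri.getHost() + ":" + uri.getPort(); // MinIO address
        String path = uri.getPath(); // bucket and object prefix

        System.out.println("accessKey=" + creds[0]); // prints "accessKey=minioadmin"
        System.out.println("endpoint=" + endpoint);  // prints "endpoint=10.101.16.72:9000"
        System.out.println("path=" + path);          // prints "path=/user-data/home/conti"
    }
}
```

Because every credential lives in its own URI rather than in the global Flink configuration, two sources in the same job no longer compete for a single set of S3 settings.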
Usage
Install
For now, you can directly download flink-s3-fs-hadoop-$VERSION.jar and load it in your project, where $VERSION is the Flink version you are using.
```kotlin
// Gradle (Kotlin DSL)
implementation(files("flink-s3-fs-hadoop-$flinkVersion.jar"))
```
```xml
<!-- Maven: reference the downloaded jar with system scope -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-s3-fs-hadoop</artifactId>
    <version>${flinkVersion}</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/flink-s3-fs-hadoop-${flinkVersion}.jar</systemPath>
</dependency>
```
Or you can wait for the PR: once it is merged into Flink's master branch, you won't need to do anything except update your Flink version and use s3u:// directly.