Flink S3 FS Multiple
Normally, Flink can only access one S3 endpoint at runtime, but we need to process files from multiple MinIO instances simultaneously. So I modified the original flink-s3-fs-hadoop to enable Flink to do so.
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
env.setParallelism(1);
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///./checkpoints");

// Each s3u:// path carries its own credentials and endpoint.
final FileSource<String> source =
        FileSource.forRecordStreamFormat(
                        new TextLineInputFormat(),
                        new Path(
                                "s3u://admin:ZrwpsezF1Lt85dxl@10.11.33.132:9000/user-data/home/conti/2024-02-08--10"))
                .build();
final FileSource<String> source2 =
        FileSource.forRecordStreamFormat(
                        new TextLineInputFormat(),
                        new Path(
                                "s3u://minioadmin:minioadmin@10.101.16.72:9000/user-data/home/conti"))
                .build();

// Read from both MinIO instances in one job and union the streams.
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source")
        .union(env.fromSource(source2, WatermarkStrategy.noWatermarks(), "file-source2"))
        .print("union-result");
env.execute();
```
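An `s3u://` URI packs the access key, secret key, endpoint, bucket, and object path into a single string. The sketch below shows how such a URI can be decomposed with `java.net.URI`; the class and field names here are illustrative, not the actual classes inside the modified flink-s3-fs-hadoop.

```java
import java.net.URI;

// Hypothetical helper that splits an s3u:// URI into credentials,
// endpoint, bucket, and object path. Illustrative only.
public class S3uLocation {
    public final String accessKey;
    public final String secretKey;
    public final String endpoint;   // host:port of the MinIO instance
    public final String bucket;
    public final String objectPath;

    public S3uLocation(String uriString) {
        URI uri = URI.create(uriString);
        // user-info part holds "accessKey:secretKey"
        String[] userInfo = uri.getUserInfo().split(":", 2);
        this.accessKey = userInfo[0];
        this.secretKey = userInfo[1];
        this.endpoint = uri.getHost() + ":" + uri.getPort();
        // First path segment is the bucket, the remainder is the object path.
        String[] parts = uri.getPath().substring(1).split("/", 2);
        this.bucket = parts[0];
        this.objectPath = parts.length > 1 ? parts[1] : "";
    }

    public static void main(String[] args) {
        S3uLocation loc = new S3uLocation(
                "s3u://minioadmin:minioadmin@10.101.16.72:9000/user-data/home/conti");
        System.out.println(loc.accessKey + " @ " + loc.endpoint
                + " bucket=" + loc.bucket + " path=" + loc.objectPath);
    }
}
```

Because the credentials travel with each path, two sources in the same job can point at different MinIO clusters without sharing a global configuration.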
Usage
There are two ways to get it: download the prebuilt jar, or wait for the upstream PR.
Install from the jar
For now, you can download flink-s3-fs-hadoop-$VERSION.jar directly and add it to your project, where $VERSION is the Flink version you are using.
Gradle:

```kotlin
implementation(files("flink-s3-fs-hadoop-$flinkVersion.jar"))
```
Maven:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-s3-fs-hadoop</artifactId>
    <version>${flinkVersion}</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/flink-s3-fs-hadoop-${flinkVersion}.jar</systemPath>
</dependency>
```
Or you can wait for the PR: once it is merged into Flink master, you won't need to do anything except update your Flink version and use s3u:// paths directly.
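For comparison, the stock flink-s3-fs-hadoop reads a single global endpoint and one set of credentials from the Flink configuration, which is exactly the limitation the s3u:// scheme works around. A typical single-endpoint setup in flink-conf.yaml (values taken from the first MinIO instance above) looks like:

```yaml
# Stock setup: one global S3 endpoint for the whole cluster.
s3.endpoint: http://10.11.33.132:9000
s3.access-key: admin
s3.secret-key: ZrwpsezF1Lt85dxl
s3.path.style.access: true   # MinIO usually requires path-style access
```

With this configuration every `s3://` path in the job resolves against the same endpoint, so a second MinIO cluster cannot be addressed in the same job.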