Subsections of Plugins

Flink S3 F3 Multiple

Normally, Flink only can access only one S3 endpoint during the runtime. But we need to process some files from multiple minio simultaneously.

So I modified the original flink-s3-fs-hadoop and enable flink to do so.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
env.setParallelism(1);
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///./checkpoints");

final FileSource<String> source =
    FileSource.forRecordStreamFormat(
            new TextLineInputFormat(),
            new Path(
                "s3u://admin:ZrwpsezF1Lt85dxl@10.11.33.132:9000/user-data/home/conti/2024-02-08--10"))
        .build();

final FileSource<String> source2 =
    FileSource.forRecordStreamFormat(
            new TextLineInputFormat(),
            new Path(
                "s3u://minioadmin:minioadmin@10.101.16.72:9000/user-data/home/conti"))
        .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source")
    .union(env.fromSource(source2, WatermarkStrategy.noWatermarks(), "file-source2"))
    .print("union-result");
    
env.execute();

using default flink-s3-fs-hadoop, the configuration value will set into Hadoop configuration map. Only one value functioning at the same, there is no way for user to operate different in single one job context.

Configuration pluginConfiguration = new Configuration();
pluginConfiguration.setString("s3a.access-key", "admin");
pluginConfiguration.setString("s3a.secret-key", "ZrwpsezF1Lt85dxl");
pluginConfiguration.setString("s3a.connection.maximum", "1000");
pluginConfiguration.setString("s3a.endpoint", "http://10.11.33.132:9000");
pluginConfiguration.setBoolean("s3a.path.style.access", Boolean.TRUE);
FileSystem.initialize(
    pluginConfiguration, PluginUtils.createPluginManagerFromRootFolder(pluginConfiguration));

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
env.setParallelism(1);
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///./checkpoints");

final FileSource<String> source =
    FileSource.forRecordStreamFormat(
            new TextLineInputFormat(), new Path("s3a://user-data/home/conti/2024-02-08--10"))
        .build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source").print();

env.execute();

Usage

There

Install From

For now, you can directly download flink-s3-fs-hadoop-$VERSION.jar and load in your project.
$VERSION is the flink version you are using.

  implementation(files("flink-s3-fs-hadoop-$flinkVersion.jar"))
  <dependency>
      <groupId>org.apache</groupId>
      <artifactId>flink</artifactId>
      <version>$flinkVersion</version>
      <systemPath>${project.basedir}flink-s3-fs-hadoop-$flinkVersion.jar</systemPath>
  </dependency>
the jar we provided was based on original flink-s3-fs-hadoop plugin, so you should use original protocal prefix s3a://

Or maybe you can wait from the PR, after I mereged into flink-master, you don't need to do anything, just update your flink version.
and directly use s3u://

Repo

you can get code from github, gitlab