🐿️ Apache Flink

On K8s Operator

Job Privileges

Template

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: flink
  name: flink-deployment-manager
rules:
- apiGroups: 
  - flink.apache.org
  resources: 
  - flinkdeployments
  verbs: 
  - 'get'
  - 'list'
  - 'create'
  - 'update'
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-deployment-manager-binding
  namespace: flink
subjects:
- kind: User
  name: "277293711358271379"  
  apiGroup: rbac.authorization.k8s.io
roleRef:
  kind: Role
  name: flink-deployment-manager
  apiGroup: rbac.authorization.k8s.io
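
After applying the manifest, you can sanity-check the binding with `kubectl auth can-i` (the file name below is illustrative; the user ID is the one from the template):

```shell
# Apply the Role and RoleBinding
kubectl apply -f flink-deployment-manager.yaml

# The bound user should be allowed to create FlinkDeployments in the flink namespace
kubectl auth can-i create flinkdeployments.flink.apache.org \
  --namespace flink --as "277293711358271379"

# The Role grants no delete verb, so this should answer "no"
kubectl auth can-i delete flinkdeployments.flink.apache.org \
  --namespace flink --as "277293711358271379"
```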

OSS Template

Template

apiVersion: "flink.apache.org/v1beta1"
kind: "FlinkDeployment"
metadata:
  name: "financial-job"
spec:
  image: "cr.registry.res.cloud.wuxi-yqgcy.cn/mirror/financial-topic:1.5-oss"
  flinkVersion: "v1_17"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "8"
    fs.oss.endpoint: http://ay-test.oss-cn-jswx-xuelang-d01-a.ops.cloud.wuxi-yqgcy.cn/
    fs.oss.accessKeyId: 4gqOVOfQqCsCUwaC
    fs.oss.accessKeySecret: xxx
  ingress:
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      cert-manager.io/cluster-issuer: "self-signed-ca-issuer"
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  serviceAccount: "flink"
  podTemplate:
    apiVersion: "v1"
    kind: "Pod"
    metadata:
      name: "financial-job"
    spec:
      containers:
        - name: "flink-main-container"
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-oss-fs-hadoop-1.17.2.jar
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "local:///app/application.jar"
    parallelism: 1
    upgradeMode: "stateless"
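
With the `flink-oss-fs-hadoop` plugin enabled as above, the same credentials can also back checkpoints and savepoints via the `oss://` scheme. A sketch of the extra keys you could merge into `flinkConfiguration` (bucket paths are illustrative):

```yaml
flinkConfiguration:
  # store state on OSS; paths below are placeholders for your bucket
  state.checkpoints.dir: oss://ay-test/flink/checkpoints
  state.savepoints.dir: oss://ay-test/flink/savepoints
  execution.checkpointing.interval: "60s"
```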

S3 Template

Template

apiVersion: "flink.apache.org/v1beta1"
kind: "FlinkDeployment"
metadata:
  name: "financial-job"
spec:
  image: "cr.registry.res.cloud.wuxi-yqgcy.cn/mirror/financial-topic:1.5"
  flinkVersion: "v1_17"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "8"
    s3a.endpoint: http://172.27.253.89:9000
    s3a.access-key: minioadmin
    s3a.secret-key: minioadmin
  ingress:
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      cert-manager.io/cluster-issuer: "self-signed-ca-issuer"
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  serviceAccount: "flink"
  podTemplate:
    apiVersion: "v1"
    kind: "Pod"
    metadata:
      name: "financial-job"
    spec:
      containers:
        - name: "flink-main-container"
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-s3-fs-hadoop-1.17.2.jar
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "local:///app/application.jar"
    parallelism: 1
    upgradeMode: "stateless"
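
Note that `upgradeMode: "stateless"` restarts the job from empty state on every upgrade. For a stateful job you would typically use `last-state` or `savepoint` instead, which require checkpointing to be configured (and, for `last-state`, Kubernetes HA). A sketch with illustrative paths:

```yaml
job:
  jarURI: "local:///app/application.jar"
  parallelism: 1
  upgradeMode: "last-state"   # or "savepoint"
flinkConfiguration:
  # last-state upgrades need checkpointing plus HA metadata
  execution.checkpointing.interval: "60s"
  high-availability: kubernetes
  high-availability.storageDir: s3://flink/ha
  state.checkpoints.dir: s3://flink/checkpoints
```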

CDC

MySQL CDC

More often than not, you can copy a minimal example straight from the CDC Connectors documentation, but there are still a few unavoidable problems to search for before it actually runs.

Prerequisites

Flink: 1.17, JDK: 11

| Flink CDC Version | Flink Version |
|---|---|
| 1.0.0 | 1.11.* |
| 1.1.0 | 1.11.* |
| 1.2.0 | 1.12.* |
| 1.3.0 | 1.12.* |
| 1.4.0 | 1.13.* |
| 2.0.* | 1.13.* |
| 2.1.* | 1.13.* |
| 2.2.* | 1.13.*, 1.14.* |
| 2.3.* | 1.13.*, 1.14.*, 1.15.* |
| 2.4.* | 1.13.*, 1.14.*, 1.15.* |
| 3.0.* | 1.14.*, 1.15.*, 1.16.* |

Usage for the DataStream API

Importing com.ververica:flink-connector-mysql-cdc alone is not enough:

implementation("com.ververica:flink-connector-mysql-cdc:2.4.0")

// you also need the following dependencies
implementation("org.apache.flink:flink-shaded-guava:30.1.1-jre-16.1")
implementation("org.apache.flink:flink-connector-base:1.17.1")
implementation("org.apache.flink:flink-table-planner_2.12:1.17.1")

Or, equivalently, with Maven:

<dependency>
  <groupId>com.ververica</groupId>
  <!-- add the dependency matching your database -->
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <!-- The dependency is available only for stable releases, SNAPSHOT dependencies need to be built based on master or release- branches by yourself. -->
  <version>2.4.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-guava -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-shaded-guava</artifactId>
  <version>30.1.1-jre-16.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-base</artifactId>
  <version>1.17.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.12</artifactId>
  <version>1.17.1</version>
</dependency>

Example Code

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

MySqlSource<String> mySqlSource =
    MySqlSource.<String>builder()
        .hostname("192.168.56.107")
        .port(3306)
        .databaseList("test") // set captured database
        .tableList("test.table_a") // set captured table
        .username("root")
        .password("mysql")
        .deserializer(
            new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .serverTimeZone("UTC")
        .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// enable checkpoint
env.enableCheckpointing(3000);

env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
    // set 4 parallel source tasks
    .setParallelism(4)
    .print()
    .setParallelism(1); // use parallelism 1 for sink to keep message ordering

env.execute("Print MySQL Snapshot + Binlog");
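
JsonDebeziumDeserializationSchema emits Debezium-style change events as JSON strings. An illustrative record for an UPDATE on test.table_a (field names and values here are made up) looks roughly like:

```json
{
  "before": { "id": 1, "name": "old" },
  "after":  { "id": 1, "name": "new" },
  "source": { "db": "test", "table": "table_a", "ts_ms": 1700000000000 },
  "op": "u",
  "ts_ms": 1700000000123
}
```

The `op` field is `c`/`u`/`d` for insert/update/delete, and `r` for rows read during the initial snapshot.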

Usage for the Table/SQL API

Connector