🐿️ Apache Flink
On K8s Operator
Job Privileges
Template
```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: flink
  name: flink-deployment-manager
rules:
  - apiGroups:
      - flink.apache.org
    resources:
      - flinkdeployments
    verbs:
      - 'get'
      - 'list'
      - 'create'
      - 'update'
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-deployment-manager-binding
  namespace: flink
subjects:
  - kind: User
    name: "277293711358271379"
    apiGroup: rbac.authorization.k8s.io
roleRef:
  kind: Role
  name: flink-deployment-manager
  apiGroup: rbac.authorization.k8s.io
```
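Once the Role and RoleBinding are applied, the grant can be checked without switching users. A sketch assuming kubectl points at the right cluster and the manifest above is saved as role.yaml (a hypothetical filename):

```shell
# Apply the Role and RoleBinding
kubectl apply -f role.yaml

# Verify the user can manage FlinkDeployments in the flink namespace
kubectl auth can-i create flinkdeployments.flink.apache.org \
  --as "277293711358271379" -n flink

# Deleting is intentionally not in the Role's verbs list, so this should be denied
kubectl auth can-i delete flinkdeployments.flink.apache.org \
  --as "277293711358271379" -n flink
```

Both commands require a live cluster and only report yes/no; they do not modify anything.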
OSS Template
Template
```yaml
apiVersion: "flink.apache.org/v1beta1"
kind: "FlinkDeployment"
metadata:
  name: "financial-job"
spec:
  image: "cr.registry.res.cloud.wuxi-yqgcy.cn/mirror/financial-topic:1.5-oss"
  flinkVersion: "v1_17"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "8"
    fs.oss.endpoint: http://ay-test.oss-cn-jswx-xuelang-d01-a.ops.cloud.wuxi-yqgcy.cn/
    fs.oss.accessKeyId: 4gqOVOfQqCsCUwaC
    fs.oss.accessKeySecret: xxx
  ingress:
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      cert-manager.io/cluster-issuer: "self-signed-ca-issuer"
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  serviceAccount: "flink"
  podTemplate:
    apiVersion: "v1"
    kind: "Pod"
    metadata:
      name: "financial-job"
    spec:
      containers:
        - name: "flink-main-container"
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-oss-fs-hadoop-1.17.2.jar
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "local:///app/application.jar"
    parallelism: 1
    upgradeMode: "stateless"
```
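The ingress template and the rewrite-target annotation work as a pair: nginx matches `(/|$)(.*)` after `{{namespace}}/{{name}}`, captures the remainder into group 2, and rewrites the request path to `/$2`. A minimal sketch of that capture-group mechanics in plain Java regex (the concrete paths are illustrative):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class IngressRewriteDemo {
    // Mirrors the ingress path "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    // for namespace "flink" and name "financial-job".
    static final Pattern PATH = Pattern.compile("/flink/financial-job(/|$)(.*)");

    // Returns the path the upstream Flink UI actually receives ("/$2"),
    // or null when the request does not match the ingress rule.
    static String rewrite(String requestPath) {
        Matcher m = PATH.matcher(requestPath);
        return m.matches() ? "/" + m.group(2) : null;
    }

    public static void main(String[] args) {
        System.out.println(rewrite("/flink/financial-job/ui/overview")); // /ui/overview
        System.out.println(rewrite("/flink/other-job/ui"));              // null
    }
}
```

Without the rewrite, the Flink web UI would be asked for `/flink/financial-job/...` paths it does not serve.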
S3 Template
Template
```yaml
apiVersion: "flink.apache.org/v1beta1"
kind: "FlinkDeployment"
metadata:
  name: "financial-job"
spec:
  image: "cr.registry.res.cloud.wuxi-yqgcy.cn/mirror/financial-topic:1.5"
  flinkVersion: "v1_17"
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "8"
    s3a.endpoint: http://172.27.253.89:9000
    s3a.access-key: minioadmin
    s3a.secret-key: minioadmin
  ingress:
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      cert-manager.io/cluster-issuer: "self-signed-ca-issuer"
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  serviceAccount: "flink"
  podTemplate:
    apiVersion: "v1"
    kind: "Pod"
    metadata:
      name: "financial-job"
    spec:
      containers:
        - name: "flink-main-container"
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-s3-fs-hadoop-1.17.2.jar
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: "local:///app/application.jar"
    parallelism: 1
    upgradeMode: "stateless"
```
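With the s3 filesystem plugin loaded, the same flinkConfiguration block is typically also where checkpoint and savepoint storage is pointed at the bucket. A sketch with a hypothetical bucket name (state.checkpoints.dir and state.savepoints.dir are standard Flink options):

```yaml
flinkConfiguration:
  taskmanager.numberOfTaskSlots: "8"
  s3a.endpoint: http://172.27.253.89:9000
  s3a.access-key: minioadmin
  s3a.secret-key: minioadmin
  # hypothetical bucket; create it in MinIO before submitting the job
  state.checkpoints.dir: s3://flink-state/checkpoints
  state.savepoints.dir: s3://flink-state/savepoints
```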
CDC
MySQL CDC
More often than not, you can grab a minimal example from the CDC Connectors documentation, but there are still a few unavoidable problems people have to google before it works.
Prerequisites
Flink: 1.17
JDK: 11
Usage with the DataStream API
Importing com.ververica:flink-connector-mysql-cdc alone is not enough.
```groovy
implementation("com.ververica:flink-connector-mysql-cdc:2.4.0")
// you also need the following dependencies
implementation("org.apache.flink:flink-shaded-guava:30.1.1-jre-16.1")
implementation("org.apache.flink:flink-connector-base:1.17.1")
implementation("org.apache.flink:flink-table-planner_2.12:1.17.1")
```
```xml
<dependency>
    <groupId>com.ververica</groupId>
    <!-- add the dependency matching your database -->
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <!-- The dependency is available only for stable releases; SNAPSHOT dependencies need to be built from master or release- branches yourself. -->
    <version>2.4.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-guava -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-guava</artifactId>
    <version>30.1.1-jre-16.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.17.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.17.1</version>
</dependency>
```
Example Code
```java
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

MySqlSource<String> mySqlSource =
        MySqlSource.<String>builder()
                .hostname("192.168.56.107")
                .port(3306)
                .databaseList("test") // set captured database
                .tableList("test.table_a") // set captured table
                .username("root")
                .password("mysql")
                .deserializer(
                        new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .serverTimeZone("UTC")
                .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// enable checkpointing so the binlog offset can be committed
env.enableCheckpointing(3000);

env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
        // set 4 parallel source tasks
        .setParallelism(4)
        .print()
        .setParallelism(1); // use parallelism 1 for the sink to keep message ordering

env.execute("Print MySQL Snapshot + Binlog");
```
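Each record the job prints is a Debezium change-event JSON whose op field says what happened (c = insert, u = update, d = delete, r = snapshot read). A simplified sketch of routing on that code; a real job would parse the JSON with a library rather than hard-code the mapping inline:

```java
public class DebeziumOpRouter {
    // Translate a Debezium "op" code into the change type it represents.
    static String route(String op) {
        switch (op) {
            case "c": return "INSERT";        // row created after the snapshot
            case "u": return "UPDATE";        // row updated
            case "d": return "DELETE";        // row deleted
            case "r": return "SNAPSHOT_READ"; // row read during the initial snapshot
            default:  return "UNKNOWN";
        }
    }

    public static void main(String[] args) {
        System.out.println(route("r")); // SNAPSHOT_READ
        System.out.println(route("u")); // UPDATE
    }
}
```

Snapshot rows (r) arrive first, then the job switches to live binlog events, which is why the example is named "Print MySQL Snapshot + Binlog".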