MySQL CDC
More often than not, we can get the simplest example from the CDC Connectors project, but people still have to google a few unavoidable problems before it actually runs.
Prerequisites
Flink: 1.17
JDK: 11
Usage with the DataStream API
Importing only com.ververica:flink-connector-mysql-cdc is not enough. With Gradle (Kotlin DSL):
implementation("com.ververica:flink-connector-mysql-cdc:2.4.0")
// you also need the following dependencies
implementation("org.apache.flink:flink-shaded-guava:30.1.1-jre-16.1")
implementation("org.apache.flink:flink-connector-base:1.17.1")
implementation("org.apache.flink:flink-table-planner_2.12:1.17.1")
With Maven:
<dependency>
    <groupId>com.ververica</groupId>
    <!-- add the dependency matching your database -->
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <!-- The dependency is available only for stable releases; SNAPSHOT dependencies need to be built yourself from the master or release- branches. -->
    <version>2.4.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-guava -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-guava</artifactId>
    <version>30.1.1-jre-16.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.17.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.17.1</version>
</dependency>
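When one of these artifacts is missing, the job typically fails with a ClassNotFoundException or NoClassDefFoundError. A quick way to see which jar is absent is to try loading one representative class per artifact. The helper below is a hypothetical sketch (DependencyCheck is not part of Flink or Flink CDC), and the class names in REQUIRED are assumptions picked from each jar; a class from flink-table-planner can be added the same way.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink or Flink CDC.
class DependencyCheck {

    // One representative class per artifact (assumed names; adjust as needed).
    static final List<String> REQUIRED = List.of(
            // flink-connector-mysql-cdc
            "com.ververica.cdc.connectors.mysql.source.MySqlSource",
            // flink-shaded-guava 30.x (Guava relocated under org.apache.flink.shaded.guava30)
            "org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList",
            // flink-connector-base
            "org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase");

    /** Returns the names that could not be loaded, in input order. */
    static List<String> missing(List<String> classNames) {
        List<String> result = new ArrayList<>();
        ClassLoader loader = DependencyCheck.class.getClassLoader();
        for (String name : classNames) {
            try {
                // initialize = false: only check that the class can be loaded,
                // without running its static initializers
                Class.forName(name, false, loader);
            } catch (ClassNotFoundException | LinkageError e) {
                // LinkageError covers a class that exists but whose own
                // dependencies (e.g. a superclass) are missing
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> notFound = missing(REQUIRED);
        if (notFound.isEmpty()) {
            System.out.println("All representative classes were found.");
        } else {
            notFound.forEach(name -> System.out.println("Missing: " + name));
        }
    }
}
```

Run it with the same classpath as the Flink job (for example from the IDE or the build tool's run task) so it sees exactly the dependencies declared above.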
Example Code
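import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlCdcExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource =
                MySqlSource.<String>builder()
                        .hostname("192.168.56.107")
                        .port(3306)
                        .databaseList("test") // set captured database
                        .tableList("test.table_a") // set captured table
                        .username("root")
                        .password("mysql")
                        // converts SourceRecord to JSON String
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .serverTimeZone("UTC")
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpointing every 3000 ms
        env.enableCheckpointing(3000);

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 4 parallel source tasks
                .setParallelism(4)
                .print()
                .setParallelism(1); // use parallelism 1 for the sink to keep message ordering

        env.execute("Print MySQL Snapshot + Binlog");
    }
}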
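The job prints each change as a JSON string in Debezium's change-event format, where the "op" field separates rows read during the initial snapshot ("r") from binlog inserts ("c"), updates ("u") and deletes ("d"). A minimal sketch of mapping those codes, assuming you have already extracted the "op" value with a JSON library of your choice; ChangeOp is a hypothetical name, not part of Flink CDC, and other codes (such as truncate events) are deliberately not covered.

```java
// Hypothetical enum, not part of Flink CDC: maps the Debezium "op" field.
enum ChangeOp {
    SNAPSHOT_READ("r"), // row read during the initial snapshot phase
    CREATE("c"),        // INSERT captured from the binlog
    UPDATE("u"),        // UPDATE captured from the binlog
    DELETE("d");        // DELETE captured from the binlog

    private final String code;

    ChangeOp(String code) {
        this.code = code;
    }

    String code() {
        return code;
    }

    /** Maps an envelope "op" value to the enum; throws for codes this sketch does not cover. */
    static ChangeOp fromCode(String code) {
        for (ChangeOp op : values()) {
            if (op.code.equals(code)) {
                return op;
            }
        }
        throw new IllegalArgumentException("Unknown Debezium op code: " + code);
    }

    /** True for rows that come from the snapshot rather than the binlog. */
    boolean isSnapshot() {
        return this == SNAPSHOT_READ;
    }
}
```

With this, a downstream map or filter function can, for instance, drop snapshot rows and keep only binlog changes by checking isSnapshot().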