MySQL CDC

More often than not, the simplest example can be taken straight from the CDC Connectors documentation. Even so, there are a few unavoidable problems that people still have to google before they can actually use it.

Preliminary

Flink: 1.17
JDK: 11

Flink CDC Version    Flink Version
1.0.0                1.11.*
1.1.0                1.11.*
1.2.0                1.12.*
1.3.0                1.12.*
1.4.0                1.13.*
2.0.*                1.13.*
2.1.*                1.13.*
2.2.*                1.13.*, 1.14.*
2.3.*                1.13.*, 1.14.*, 1.15.*
2.4.*                1.13.*, 1.14.*, 1.15.*
3.0.*                1.14.*, 1.15.*, 1.16.*

Usage for DataStream API

Importing only com.ververica:flink-connector-mysql-cdc is not enough; a few more dependencies have to be added alongside it.

Gradle:

implementation("com.ververica:flink-connector-mysql-cdc:2.4.0")

// the following dependencies are also required
implementation("org.apache.flink:flink-shaded-guava:30.1.1-jre-16.1")
implementation("org.apache.flink:flink-connector-base:1.17.1")
implementation("org.apache.flink:flink-table-planner_2.12:1.17.1")

Maven:

<dependency>
  <groupId>com.ververica</groupId>
  <!-- add the dependency matching your database -->
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <!-- The dependency is available only for stable releases, SNAPSHOT dependencies need to be built based on master or release- branches by yourself. -->
  <version>2.4.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-guava -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-shaded-guava</artifactId>
  <version>30.1.1-jre-16.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-base</artifactId>
  <version>1.17.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.12</artifactId>
  <version>1.17.1</version>
</dependency>

Example Code

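import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource =
        MySqlSource.<String>builder()
            .hostname("192.168.56.107")
            .port(3306)
            .databaseList("test") // set captured database
            .tableList("test.table_a") // set captured table, in "database.table" form
            .username("root")
            .password("mysql")
            .deserializer(
                new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .serverTimeZone("UTC")
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // enable checkpointing every 3000 ms
    env.enableCheckpointing(3000);

    env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
        // set 4 parallel source tasks
        .setParallelism(4)
        .print()
        .setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute("Print MySQL Snapshot + Binlog");
  }
}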
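
The job above prints every change event as a Debezium-style JSON string. As a rough sketch of what a downstream step might do with such a string, the snippet below pulls out the top-level "op" field and maps it to a readable name (r = snapshot read, c = insert, u = update, d = delete). The class name ChangeEventOp and its methods are hypothetical, the sample payloads are hand-written and heavily trimmed (real events also carry "source", "ts_ms" and other fields), and the regex is only for illustration: it would also match a column that happens to be named "op", so a real job would more likely use a JSON library.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink CDC: classifies a change-event JSON string by its "op" code.
public class ChangeEventOp {
    // Matches a field such as "op":"c" anywhere in the string (illustration only).
    private static final Pattern OP_FIELD = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    /** Returns the raw op code if the string contains an "op" field. */
    public static Optional<String> extractOp(String json) {
        Matcher m = OP_FIELD.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** Maps a Debezium op code to a readable name. */
    public static String describe(String op) {
        switch (op) {
            case "r": return "snapshot read";
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            default:  return "unknown";
        }
    }

    public static void main(String[] args) {
        // Hand-written, trimmed sample payloads (assumed shape, not captured output).
        String insert = "{\"before\":null,\"after\":{\"id\":1},\"op\":\"c\"}";
        String delete = "{\"before\":{\"id\":1},\"after\":null,\"op\":\"d\"}";
        System.out.println(extractOp(insert).map(ChangeEventOp::describe).orElse("no op field"));
        System.out.println(extractOp(delete).map(ChangeEventOp::describe).orElse("no op field"));
    }
}
```

In the Flink job, a helper like this could be applied in a map step between fromSource(...) and print(), for example to label or filter events by operation type.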

Usage for Table/SQL API