In this guide, we will walk you through a lightweight data migration and synchronization solution from MySQL to OceanBase using Apache SeaTunnel (SeaTunnel for short). We will leverage its built-in Zeta engine, which supports full data synchronization, offline incremental synchronization, and Change Data Capture (CDC).
Preparing the Runtime Environment
Before we begin, ensure that your environment is ready.
Install Java
SeaTunnel requires Java 8 or higher. Java 8 is recommended, but later versions work as well.
After installation, verify that Java is correctly configured by running:
root:~# java -version
openjdk version "17.0.12" 2024-07-16
OpenJDK Runtime Environment (build 17.0.12+7-Debian-2deb11u1)
OpenJDK 64-Bit Server VM (build 17.0.12+7-Debian-2deb11u1, mixed mode, sharing)
Make sure that JAVA_HOME is properly set.
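If you want to verify the runtime from a script, the major version can be parsed out of the version string. This is a minimal sketch; the `java_major` helper is hypothetical and only illustrates the parsing (legacy `1.x` strings map to major version `x`):

```shell
# Hypothetical helper: derive the Java major version from a version string
# such as "17.0.12" or the legacy "1.8.0_392" format.
java_major() {
  ver="$1"
  major="${ver%%.*}"           # "17.0.12" -> "17", "1.8.0_392" -> "1"
  if [ "$major" = "1" ]; then  # legacy scheme: "1.8..." means Java 8
    rest="${ver#1.}"
    major="${rest%%.*}"
  fi
  echo "$major"
}

# Example: fail fast when the runtime is older than Java 8.
# ver=$(java -version 2>&1 | awk -F '"' '/version/ {print $2}')
# [ "$(java_major "$ver")" -ge 8 ] || echo "Java 8+ required" >&2
```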
Download and Install Apache SeaTunnel
Visit the official SeaTunnel website to download the latest version.
For this guide, we will use version 2.3.9:
# Download
wget https://dlcdn.apache.org/seatunnel/2.3.9/apache-seatunnel-2.3.9-bin.tar.gz
# Extract
tar -zxvf apache-seatunnel-2.3.9-bin.tar.gz
Installing Connector Plugins
SeaTunnel’s installation package only contains the core framework and the Zeta engine. To connect to various data sources, you need to manually download and configure the required plugins.
Automatic Plugin Installation
To automatically download the necessary connectors, modify the config/plugin_config file and specify the required connectors. By default, the file lists all connectors, but for this guide we only keep the essential ones:
connector-cdc-mysql
connector-jdbc
connector-fake
connector-console
Run the following command to install the plugins:
sh bin/install-plugin.sh 2.3.9
Manual Plugin Installation
Alternatively, you can manually download the required plugins from the Apache Maven Repository. Download the necessary .jar files, for example:
connector-cdc-mysql-2.3.9.jar
connector-console-2.3.9.jar
connector-fake-2.3.9.jar
connector-jdbc-2.3.9.jar
seatunnel-transforms-v2-2.3.9.jar
After downloading, move the files into the connectors directory.
Verifying Connector Installation
To check if the connectors are installed correctly, run:
./bin/seatunnel-connector.sh -l
Source
FakeSource MySQL-CDC Jdbc
Sink
Jdbc Console
Transform
Copy DynamicCompile FieldMapper Filter FilterRowKind JsonPath LLM Replace Split Sql
Since we will be using a JDBC MySQL connection to interact with OceanBase, you also need to download the MySQL JDBC driver from the official MySQL website.
Once downloaded, place the mysql-connector-j-9.0.0.jar file into {seatunnel}/lib.
Verifying SeaTunnel Installation
To confirm that SeaTunnel is installed correctly, execute a batch processing test using the default configuration template:
./bin/seatunnel.sh --config ./config/v2.batch.config.template -m local
Command Explanation
- seatunnel.sh → the standard SeaTunnel startup script
- --config → specifies the configuration file
- -m local → runs the job in local mode
If everything is working correctly, you should see output similar to this:
2022-12-19 11:01:45,417 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - output rowType: name<STRING>, age<INT>
2022-12-19 11:01:46,489 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CpiOd, 8520946
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: eQqTs, 1256802974
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: UsRgO, 2053193072
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jDQJj, 1993016602
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: rqdKp, 1392682764
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: wCoWN, 986999925
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: qomTU, 72775247
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jcqXR, 1074529204
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: AkWIO, 1961723427
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=10: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: hBoib, 929089763
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=11: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: GSvzm, 827085798
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=12: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: NNAYI, 94307133
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=13: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: EexFl, 1823689599
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=14: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CBXUb, 869582787
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=15: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: Wbxtm, 1469371353
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=16: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: mIJDt, 995616438
At the end of the job execution, you will see a summary log:
***********************************************
           Job Statistic Information
***********************************************
Start Time : 2024-08-29 22:45:29
End Time : 2024-08-29 22:45:33
Total Time(s) : 4
Total Read Count : 32
Total Write Count : 32
Total Failed Count : 0
***********************************************
This confirms that SeaTunnel is working correctly.
Full Data Synchronization
Creating the Test Table
To verify full data synchronization, we create test tables in both MySQL and OceanBase.
Step 1: Creating the MySQL Table
CREATE TABLE `table1` (
  `id` INT NOT NULL AUTO_INCREMENT,
  `value1` VARCHAR(255) NOT NULL,
  `value2` VARCHAR(255),
  `value3` VARCHAR(255),
  `value4` VARCHAR(255),
  `value5` VARCHAR(255),
  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE INDEX `idx_value1` (`value1`),
  INDEX `idx_value2_value3` (`value2`, `value3`),
  INDEX `idx_value3_value4_value5` (`value3`, `value4`, `value5`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `table2` (
  `id` INT NOT NULL AUTO_INCREMENT,
  `value1` VARCHAR(255) NOT NULL,
  `value2` VARCHAR(255),
  `value3` VARCHAR(255),
  `value4` VARCHAR(255),
  `value5` VARCHAR(255),
  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE INDEX `idx_value1` (`value1`),
  INDEX `idx_value2_value3` (`value2`, `value3`),
  INDEX `idx_value3_value4_value5` (`value3`, `value4`, `value5`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
We used Navicat to generate 100,000 records in each table.
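If you prefer not to use a GUI tool, test rows can also be generated directly in SQL. The snippet below is one possible approach using a MySQL 8.0 recursive CTE; the generated values are arbitrary, and `cte_max_recursion_depth` must be raised from its default of 1000:

```sql
-- Raise the recursion limit for this session (default is 1000).
SET SESSION cte_max_recursion_depth = 100000;

INSERT INTO table1 (value1, value2, value3, value4, value5)
WITH RECURSIVE seq (n) AS (
  SELECT 1
  UNION ALL
  SELECT n + 1 FROM seq WHERE n < 100000
)
SELECT CONCAT('v1-', n),       -- unique, satisfies idx_value1
       CONCAT('v2-', n % 100),
       CONCAT('v3-', n % 100),
       CONCAT('v4-', n % 10),
       CONCAT('v5-', n % 10)
FROM seq;
```

Repeat the same statement for `table2`.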
Configuring SeaTunnel for Full Synchronization
Full Data Synchronization Configuration File
_Note: We recommend manually migrating the table schema, since automatic migration may encounter issues and does not create indexes._
Single-Table Full Sync
env {
  parallelism = 5
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "xxx"
    password = "xxx"
    query = "select * from seatunnel.table1"
  }
}
sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:2883/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx@xxx"
    password = "xxx"
    # Automatically generate SQL statements
    generate_sink_sql = true
    database = seatunnel
    table = table1
  }
}
Result:
***********************************************
           Job Statistic Information
***********************************************
Start Time : 2024-08-30 15:05:39
End Time : 2024-08-30 15:05:47
Total Time(s) : 8
Total Read Count : 100000
Total Write Count : 100000
Total Failed Count : 0
***********************************************
Multi-Table Full Extraction
env {
  parallelism = 5
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "xxx"
    password = "xxx"
    table_list = [
      {
        table_path = "seatunnel.table1"
      },
      {
        table_path = "seatunnel.table2"
        query = "select * from seatunnel.table2 where id > 100"
      }
    ]
  }
}
sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:2883/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx@xxx"
    password = "xxx"
    # Automatically generate SQL statements
    generate_sink_sql = true
    database = seatunnel
    table_list = ["seatunnel.table1", "seatunnel.table2"]
  }
}
Result:
***********************************************
           Job Statistic Information
***********************************************
Start Time : 2024-08-30 15:10:09
End Time : 2024-08-30 15:10:20
Total Time(s) : 10
Total Read Count : 200000
Total Write Count : 200000
Total Failed Count : 0
***********************************************
Incremental Synchronization Configuration File
For incremental sync, a simple approach is to use a query that filters based on anid
orupdatetime
column.
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "xxx"
    password = "xxx"
    query = "SELECT * FROM seatunnel.table1 WHERE updated_at > '2024-01-01'"
  }
}
sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:2883/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx@xxx"
    password = "xxx"
    generate_sink_sql = true
    database = seatunnel
    table = table1
  }
}
_Note:_ The sink performs insert and update operations based on the primary key. However, manually updating the configuration file for each incremental run is cumbersome. We recommend using Apache DolphinScheduler together with SeaTunnel to create a workflow: DolphinScheduler can read the maximum timestamp or id from the sink and pass it to the job as a workflow variable.
Example configuration with a variable:
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "xxx"
    password = "xxx"
    query = "SELECT * FROM seatunnel.table1 WHERE id > ${max_id}"
  }
}
sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:2883/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx@xxx"
    password = "xxx"
    generate_sink_sql = true
    database = seatunnel
    table = table1
  }
}
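In that workflow, the step before the SeaTunnel job simply substitutes the stored watermark into the source query. As a sketch (the helper name is ours, not a SeaTunnel or DolphinScheduler API), the query construction looks like:

```shell
# Hypothetical helper: build the incremental source query from a table name
# and the watermark obtained from the sink in a previous workflow step.
build_incremental_query() {
  table="$1"
  watermark="$2"
  printf "SELECT * FROM %s WHERE updated_at > '%s'" "$table" "$watermark"
}

# Example: the watermark would normally come from running
#   SELECT MAX(updated_at) FROM seatunnel.table1
# against the sink database.
build_incremental_query seatunnel.table1 "2024-01-01 00:00:00"
```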
The multi-table configuration is similar.
CDC Synchronization Configuration File
Manual Table Schema Migration
Due to issues with SeaTunnel’s OceanBase component, schema migration can be error-prone. It is recommended to migrate the table schema manually.
Check MySQL Binlog Status
Grant the necessary privileges to the user:
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
mysql> FLUSH PRIVILEGES;
Verify that the binlog is enabled:
mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| binlog_format            | ROW   |
| binlog_row_image         | FULL  |
| enforce_gtid_consistency | ON    |
| gtid_mode                | ON    |
| log_bin                  | ON    |
+--------------------------+-------+
If the settings differ, adjust your mysql.cnf file accordingly. Note that creating a consistent snapshot of a large database may hit read timeouts; configure interactive_timeout and wait_timeout as needed.
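For example, the timeouts can be raised before the snapshot; the values below are illustrative (8 hours) and should be tuned to your data volume:

```sql
-- Illustrative values: allow long-running snapshot reads (8 hours).
SET GLOBAL interactive_timeout = 28800;
SET GLOBAL wait_timeout = 28800;
```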
After preparing the environment, write the configuration file.
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}
source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/mysql"
    username = "xxx"
    password = "xxx@xxx"
    table-names = ["seatunnel.table1", "seatunnel.table2"]
    startup.mode = "initial"
  }
}
sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:2883/mysql?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx@xxx"
    password = "xxx"
    database = "seatunnel"  # Target database
    table-names = ["seatunnel.table1", "seatunnel.table2"]
    generate_sink_sql = true  # Automatically generate SQL
  }
}
Once started, the job will first perform a historical data migration, then process CDC changes.
Important Note:
Upon startup, SeaTunnel will execute different operations based on the configured tables and the startup.mode setting. The startup.mode options are as follows:
- **initial**: Synchronizes historical data first, then incremental data.
- **earliest**: Starts from the earliest offset.
- **latest**: Starts from the latest offset.
- **specific**: Starts from a user-provided specific offset.
If you use specific, you must provide the binlog offset file (startup.specific-offset.file) and the offset position (startup.specific-offset.pos).
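For instance, a specific-offset source block might look like the following sketch; the binlog file name and position here are placeholders, and the real values should be read from your own server (e.g. via SHOW MASTER STATUS):

```hocon
source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/mysql"
    username = "xxx"
    password = "xxx@xxx"
    table-names = ["seatunnel.table1"]
    startup.mode = "specific"
    # Placeholder values -- read the real ones from SHOW MASTER STATUS
    startup.specific-offset.file = "mysql-bin.000003"
    startup.specific-offset.pos = 30000
  }
}
```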
Conclusion
This article has detailed how to configure full, incremental, and CDC synchronization using Apache SeaTunnel. We covered:
- Full sync configuration for single and multi-table extraction.
- Incremental sync configuration using query filters (with an option to integrate with Apache DolphinScheduler).
- CDC sync configuration, including prerequisites like binlog verification.
By following these steps, you can achieve a complete, end-to-end data migration and synchronization solution from MySQL to OceanBase. Thank you for reading, and please share your feedback! 🚀