DataX 的简单试用

   本篇博客之起因是,在做技术选型时,DataX 成为了数据同步的候选项。然而并未在实际项目中使用(因项目原因)。仅仅是尝试了一遍,把技术路子走通,并未深入学习。

   博客内容,意在测试,不免简陋。如在真实项目中使用,不管是项目代码编写,还是作业脚本编写,都应该根据实际情况去深入学习、多多优化,以满足真实的使用场景。


1. DataX 简介

官方介绍:DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。

2. 下载部署

2.1 依赖环境:Windows/Linux、JDK 1.8+、Python(2 或 3)。

2.2 访问项目DataX,简单浏览项目文档。

2.2 两种下载方式:

  • 方式一:直接下载 DataX 工具包:DataX 下载地址
  • 方式二:下载 DataX 源码,自己编译。

2.2 为了方便,我们直接下载工具包,得到 datax.tar.gz 文件。

2.3 将下载文件加压到本地某个目录,进入 bin 目录,即可运行同步作业。(说明:DataX 解压后即可使用,所以部署很简单)

1
2
$ cd  {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}

2.4 执行自检脚本,看到如下运行结果,表示部署正常。

1
2
3
4
5
6
7
8
9
10
11
# 当前目录 datax/bin
$ python datax.py ../job/job.json
……
2024-02-29 15:52:09.374 [job-0] INFO JobContainer -
任务启动时刻 : 2024-02-29 15:51:59
任务结束时刻 : 2024-02-29 15:52:09
任务总计耗时 : 10s
任务平均流量 : 253.91KB/s
记录写入速度 : 10000rec/s
读出记录总数 : 100000
读写失败总数 : 0

3. 运行测试脚本

3.1 编写作业脚本,新建 json 文件,将模板中的示例配置改为自己的实际配置即可。

  • 方式一:通过命令查看配置模板,示例如下:
    1
    2
    # 命令格式:python datax.py -r {YOUR_READER} -w {YOUR_WRITER}
    $ python datax.py -r mysqlreader -w mysqlwriter
  • 方式二:直接查看源码中的文档MysqlReader

3.2 本人测试所写,用于 GreenPlum 数据库同步的作业脚本 gp2gp.json 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
{
"job": {
"setting": {
"speed": {
// 通道数
"channel": 3,
// 设置传输速度,单位为 byte/s,DataX 运行会尽可能达到该速度但是不超过它.
"bps": 1048576
},
// 出错限制
"errorLimit": {
// 出错的 record 条数上限,当大于该值即报错。
"record": 0,
// 出错的 record 百分比上限 1.0 表示 100%,0.02 表示 2%
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
// 数据库连接用户名
"username": "",
// 数据库连接密码
"password": "",
"column": [
"id",
"name"
],
// 切分主键
// "splitPk": "id",
"connection": [
{
"table": [
"cj_user"
],
"jdbcUrl": [
"jdbc:postgresql://ip:port/db?currentSchema=schema"
]
}
]
}
},
"writer": {
"name": "postgresqlwriter",
"parameter": {
"username": "",
"password": "",
"column": [
"id",
"name"
],
// "preSql": [
// "delete from test"
// ],
"connection": [
{
"jdbcUrl": "jdbc:postgresql://ip:port/db?currentSchema=schema",
"table": [
"zl_user_test"
]
}
]
}
}
}
]
}
}

3.2 运行作业脚本,命令如下:

1
2
# 进入 bin 目录
$ python datax.py ../job/gp2gp.json

3.3 执行成功后可以到数据库中进行验证,如果执行失败可以在 logs 目录中查看日志文件。

4. Java 项目调用

4.1 因 DataX 是通过 Python 脚本启动执行,所以在 Java 项目中,我们可以提供一个执行 Python 脚本的工具类方法。
4.2 工具类方法示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package main.java.com.xhp.misc;

import java.io.BufferedReader;
import java.io.InputStreamReader;

/**
* Java 执行 DataX 脚本
*
* @Author xhp
* @Date 2024/2/28 15:55
**/
public class DataxTest {
// datax 安装路径
private static final String dataxPath = "D:\\ProgramData\\BigData\\datax\\bin\\datax.py";

public static void main(String[] args) {
// 脚本路径
String scriptPath = "D:\\ProgramData\\BigData\\datax\\job\\gp2gp.json";
executeScript(scriptPath);
}

/**
* 执行脚本
*
* @param scriptPath
*/
private static void executeScript(String scriptPath) {

// cmd 命令
String[] cmd = new String[]{"python", dataxPath, scriptPath};

ProcessBuilder processBuilder = new ProcessBuilder(cmd);
try {
Process process = processBuilder.start();

// 从进程的标准输出读取数据
BufferedReader stdOutput = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line;
while ((line = stdOutput.readLine()) != null) {
System.out.println(" 标准输出: " + line);
}

// 从进程的错误输出读取数据
BufferedReader errorOutput = new BufferedReader(new InputStreamReader(process.getErrorStream()));
while ((line = errorOutput.readLine()) != null) {
System.err.println(" 错误输出: " + line);
}

int exitCode = process.waitFor();
// 根据 exitCode 判断任务是否成功
if (exitCode == 0) {
System.out.println("DataX 执行成功 ");
} else {
System.err.println("DataX 执行失败,exitCode: " + exitCode);
// 这里可以进一步处理错误,例如记录日志、发送告警等
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

5. 小结

5.1 DataX 还是比较简单易用的,核心工作是在作业 JSON 文件的编写上。在真实项目里,可以根据配置动态生成对应的脚本文件。
5.2 DataX 提供了精准的速度控制和丰富的数据转换功能,还可以自定义转换函数,应能满足常见的使用场景。