本篇博客之起因是,在做技术选型时,DataX 成为了数据同步的候选项。然而并未在实际项目中使用(因项目原因)。仅仅是尝试了一遍,把技术路子走通,并未深入学习。
博客内容,意在测试,不免简陋。如在真实项目中使用,不管是项目代码编写,还是作业脚本编写,都应该根据实际情况去深入学习、多多优化,以满足真实的使用场景。
1. DataX 简介
官方介绍:DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。
2. 下载部署
2.1 依赖环境:Windows/Linux、JDK 1.8+、Python(2 或 3)。
2.2 访问项目DataX,简单浏览项目文档。
2.2 两种下载方式:
- 方式一:直接下载 DataX 工具包:DataX 下载地址。
- 方式二:下载 DataX 源码,自己编译。
2.2 为了方便,我们直接下载工具包,得到 datax.tar.gz 文件。
2.3 将下载文件加压到本地某个目录,进入 bin 目录,即可运行同步作业。(说明:DataX 解压后即可使用,所以部署很简单)
1 2
| $ cd {YOUR_DATAX_HOME}/bin $ python datax.py {YOUR_JOB.json}
|
2.4 执行自检脚本,看到如下运行结果,表示部署正常。
1 2 3 4 5 6 7 8 9 10 11
| # 当前目录 datax/bin $ python datax.py ../job/job.json …… 2024-02-29 15:52:09.374 [job-0] INFO JobContainer - 任务启动时刻 : 2024-02-29 15:51:59 任务结束时刻 : 2024-02-29 15:52:09 任务总计耗时 : 10s 任务平均流量 : 253.91KB/s 记录写入速度 : 10000rec/s 读出记录总数 : 100000 读写失败总数 : 0
|
3. 运行测试脚本
3.1 编写作业脚本,新建 json 文件,将模板中的示例配置改为自己的实际配置即可。
- 方式一:通过命令查看配置模板,示例如下:
1 2
| # 命令格式:python datax.py -r {YOUR_READER} -w {YOUR_WRITER} $ python datax.py -r mysqlreader -w mysqlwriter
|
- 方式二:直接查看源码中的文档MysqlReader。
3.2 本人测试所写,用于 GreenPlum 数据库同步的作业脚本 gp2gp.json 如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| { "job": { "setting": { "speed": { "channel": 3, "bps": 1048576 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { "name": "postgresqlreader", "parameter": { "username": "", "password": "", "column": [ "id", "name" ], "connection": [ { "table": [ "cj_user" ], "jdbcUrl": [ "jdbc:postgresql://ip:port/db?currentSchema=schema" ] } ] } }, "writer": { "name": "postgresqlwriter", "parameter": { "username": "", "password": "", "column": [ "id", "name" ], "connection": [ { "jdbcUrl": "jdbc:postgresql://ip:port/db?currentSchema=schema", "table": [ "zl_user_test" ] } ] } } } ] } }
|
3.2 运行作业脚本,命令如下:
1 2
| # 进入 bin 目录 $ python datax.py ../job/gp2gp.json
|
3.3 执行成功后可以到数据库中进行验证,如果执行失败可以在 logs 目录中查看日志文件。
4. Java 项目调用
4.1 因 DataX 是通过 Python 脚本启动执行,所以在 Java 项目中,我们可以提供一个执行 Python 脚本的工具类方法。
4.2 工具类方法示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| package main.java.com.xhp.misc;
import java.io.BufferedReader; import java.io.InputStreamReader;
public class DataxTest { private static final String dataxPath = "D:\\ProgramData\\BigData\\datax\\bin\\datax.py";
public static void main(String[] args) { String scriptPath = "D:\\ProgramData\\BigData\\datax\\job\\gp2gp.json"; executeScript(scriptPath); }
private static void executeScript(String scriptPath) {
String[] cmd = new String[]{"python", dataxPath, scriptPath};
ProcessBuilder processBuilder = new ProcessBuilder(cmd); try { Process process = processBuilder.start();
BufferedReader stdOutput = new BufferedReader(new InputStreamReader(process.getInputStream())); String line; while ((line = stdOutput.readLine()) != null) { System.out.println(" 标准输出: " + line); }
BufferedReader errorOutput = new BufferedReader(new InputStreamReader(process.getErrorStream())); while ((line = errorOutput.readLine()) != null) { System.err.println(" 错误输出: " + line); }
int exitCode = process.waitFor(); if (exitCode == 0) { System.out.println("DataX 执行成功 "); } else { System.err.println("DataX 执行失败,exitCode: " + exitCode); } } catch (Exception e) { e.printStackTrace(); } } }
|
5. 小结
5.1 DataX 还是比较简单易用的,核心工作是在作业 JSON 文件的编写上。在真实项目里,可以根据配置动态生成对应的脚本文件。
5.2 DataX 提供了精准的速度控制和丰富的数据转换功能,还可以自定义转换函数,应能满足常见的使用场景。
原文链接: https://xinghuipeng.pages.dev/2024/02/29/work/bigdata/DataX的简单试用/
版权声明: 转载请注明出处.