开篇
最早接触DataX是在前阿里同事在现在的公司引入的时候提到的,一直想抽空好好看看这部分代码,因为DataX的代码框架设计的很好,非常适合二次开发。在熟悉DataX的代码过程中,没有时间针对每个数据源的读写部分代码进行研究(这部分代码非常值得研究,基本上主流数据源的读写操作都能看到),主要阅读的还是DataX的启动和工作部分代码。
DataX的框架的核心部分我个人看来就两大块,一块是配置贯穿DataX,all in configuration,将配置的json用到了极致;另一块是通过URLClassLoader实现插件的热加载。
DataX的github地址:https://github.com/alibaba/DataX
DataX介绍
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。
DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
Job&Task概念
在DataX的逻辑模型中包括job、task两个维度,通过将job进行task拆分,然后将task合并到taskGroup进行运行。
-
job实例运行在jobContainer容器中,它是所有任务的master,负责初始化、拆分、调度、运行、回收、监控和汇报,但它并不做实际的数据同步操作。
-
Job: Job是DataX用以描述从一个源头到一个目的端的同步作业,是DataX数据同步的最小业务单元。比如:从一张mysql的表同步到odps的一个表的特定分区。
-
Task: Task是为最大化而把Job拆分得到的最小执行单元。比如:读一张有1024个分表的mysql分库分表的Job,拆分成1024个读Task,用若干个并发执行。
-
TaskGroup: 描述的是一组Task集合。在同一个TaskGroupContainer执行下的Task集合称之为TaskGroup。
-
JobContainer: Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker。
-
TaskGroupContainer: TaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TaskTracker。
-
简而言之, Job拆分成Task,在分别在框架提供的容器中执行,插件只需要实现Job和Task两部分逻辑。
启动过程
说明:
- 上图中,黄色表示
Job
部分的执行阶段,蓝色表示Task
部分的执行阶段,绿色表示框架执行阶段。
说明:
- reader和writer的自定义插件内部需要实现job和task的接口即可
DataX开启Debug
阅读源码的最好方法是debug整个项目工程,在如何调试DataX项目的过程中还是花费了一些精力在里面的,现在一并共享出来供有兴趣的程序员一并研究。
整个debug过程需要按照下列步骤进行:
- 1、github上下载DataX的源码并通过以下命令进行编译,github官网有编译命令,如果遇到依赖包无法下载可以省去部分writer或reader插件,不影响debug。
(1)、下载DataX源码:
$ git clone git@github.com:alibaba/DataX.git
(2)、通过maven打包:
$ cd {DataX_source_code_home}
$ mvn -U clean package assembly:assembly -Dmaven.test.skip=true
打包成功,日志显示如下:
[INFO] BUILD SUCCESS
[INFO] -----------------------------------------------------------------
[INFO] Total time: 08:12 min
[INFO] Finished at: 2015-12-13T16:26:48+08:00
[INFO] Final Memory: 133M/960M
[INFO] -----------------------------------------------------------------
打包成功后的DataX包位于 {DataX_source_code_home}/target/datax/datax/ ,结构如下:
$ cd {DataX_source_code_home}
$ ls ./target/datax/datax/
bin conf job lib log log_perf plugin script tmp
- 2、由于DataX是通过python脚本进行启动的,所以在python脚本中把启动参数打印出来,核心在于print startCommand这句,继而我们就能够获取启动命令参数了。
if __name__ == "__main__":
printCopyright()
parser = getOptionParser()
options, args = parser.parse_args(sys.argv[1:])
if options.reader is not None and options.writer is not None:
generateJobConfigTemplate(options.reader,options.writer)
sys.exit(RET_STATE['OK'])
if len(args) != 1:
parser.print_help()
sys.exit(RET_STATE['FAIL'])
startCommand = buildStartCommand(options, args)
print startCommand
child_process = subprocess.Popen(startCommand, shell=True)
register_signal()
(stdout, stderr) = child_process.communicate()
sys.exit(child_process.returncode)
- 3、获取启动DataX的启动命令
java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=//Users/lebron374/Documents/github/DataX/target/datax/datax/log
-Dloglevel=info -Dfile.encoding=UTF-8
-Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener
-Djava.security.egd=file:///dev/urandom -Ddatax.home=//Users/lebron374/Documents/github/DataX/target/datax/datax
-Dlogback.configurationFile=//Users/lebron374/Documents/github/DataX/target/datax/datax/conf/logback.xml
-classpath //Users/lebron374/Documents/github/DataX/target/datax/datax/lib/*:.
-Dlog.file.name=s_datax_job_job_json
com.alibaba.datax.core.Engine
-mode standalone -jobid -1
-job //Users/lebron374/Documents/github/DataX/target/datax/datax/job/job.json
- 4、配置Idea启动脚本
以下配置在VM options当中
-server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=//Users/lebron374/Documents/github/DataX/target/datax/datax/log
-Dloglevel=info -Dfile.encoding=UTF-8
-Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener
-Djava.security.egd=file:///dev/urandom -Ddatax.home=//Users/lebron374/Documents/github/DataX/target/datax/datax
-Dlogback.configurationFile=//Users/lebron374/Documents/github/DataX/target/datax/datax/conf/logback.xml
-classpath //Users/lebron374/Documents/github/DataX/target/datax/datax/lib/*:.
-Dlog.file.name=s_datax_job_job_json
com.alibaba.datax.core.Engine
以下配置在Program arguments当中
-mode standalone -jobid -1
-job //Users/lebron374/Documents/github/DataX/target/datax/datax/job/job.json
启动步骤解析
-
1、解析配置,包括job.json、core.json、plugin.json三个配置
-
2、设置jobId到configuration当中
-
3、启动Engine,通过Engine.start()进入启动程序
-
4、设置RUNTIME_MODE奥configuration当中
-
5、通过JobContainer的start()方法启动
-
6、依次执行job的preHandler()、init()、prepare()、split()、schedule()、- post()、postHandle()等方法。
-
7、init()方法涉及到根据configuration来初始化reader和writer插件,这里涉及到jar包热加载以及调用插件init()操作方法,同时设置reader和writer的configuration信息
-
8、prepare()方法涉及到初始化reader和writer插件的初始化,通过调用插件的prepare()方法实现,每个插件都有自己的jarLoader,通过集成URLClassloader实现而来
-
9、split()方法通过adjustChannelNumber()方法调整channel个数,同时执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果,达到切分后数目相等,才能满足1:1的通道模型
-
10、channel的计数主要是根据byte和record的限速来实现的,在split()的函数中第一步就是计算channel的大小
-
11、split()方法reader插件会根据channel的值进行拆分,但是有些reader插件可能不会参考channel的值,writer插件会完全根据reader的插件1:1进行返回
-
12、split()方法内部的mergeReaderAndWriterTaskConfigs()负责合并reader、writer、以及transformer三者关系,生成task的配置,并且重写job.content的配置
-
13、schedule()方法根据split()拆分生成的task配置分配生成taskGroup对象,根据task的数量和单个taskGroup支持的task数量进行配置,两者相除就可以得出taskGroup的数量
-
14、schdule()内部通过AbstractScheduler的schedule()执行,继续执行startAllTaskGroup()方法创建所有的TaskGroupContainer组织相关的task,TaskGroupContainerRunner负责运行TaskGroupContainer执行分配的task。
-
15、taskGroupContainerExecutorService启动固定的线程池用以执行TaskGroupContainerRunner对象,TaskGroupContainerRunner的run()方法调用taskGroupContainer.start()方法,针对每个channel创建一个TaskExecutor,通过taskExecutor.doStart()启动任务
启动过程源码分析
入口main函数
public class Engine {
public static void main(String[] args) throws Exception {
int exitCode = 0;
try {
Engine.entry(args);
} catch (Throwable e) {
System.exit(exitCode);
}
}
public static void entry(final String[] args) throws Throwable {
// 省略相关参数的解析代码
// 获取job的配置路径信息
String jobPath = cl.getOptionValue("job");
// 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1
String jobIdString = cl.getOptionValue("jobid");
RUNTIME_MODE = cl.getOptionValue("mode");
// 解析配置信息
Configuration configuration = ConfigParser.parse(jobPath);
// 省略相关代码
boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);
// 根据配置启动参数
Engine engine = new Engine();
engine.start(configuration);
}
}
说明:
main函数主要做两件事情,分别是:
- 1、解析job相关配置生成configuration。
- 2、依据配置启动Engine。
configuration解析过程
public final class ConfigParser {
private static final Logger LOG = LoggerFactory.getLogger(ConfigParser.class);
/**
* 指定Job配置路径,ConfigParser会解析Job、Plugin、Core全部信息,并以Configuration返回
*/
public static Configuration parse(final String jobPath) {
// 加载任务的指定的配置文件,这个配置是有固定的json的固定模板格式的
Configuration configuration = ConfigParser.parseJobConfig(jobPath);
// 合并conf/core.json的配置文件
configuration.merge(
ConfigParser.parseCoreConfig(CoreConstant.DATAX_CONF_PATH),
false);
// todo config优化,只捕获需要的plugin
// 固定的节点路径 job.content[0].reader.name
String readerPluginName = configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_READER_NAME);
// 固定的节点路径 job.content[0].writer.name
String writerPluginName = configuration.getString(
CoreConstant.DATAX_JOB_CONTENT_WRITER_NAME);
// 固定的节点路径 job.preHandler.pluginName
String preHandlerName = configuration.getString(
CoreConstant.DATAX_JOB_PREHANDLER_PLUGINNAME);
// 固定的节点路径 job.postHandler.pluginName
String postHandlerName = configuration.getString(
CoreConstant.DATAX_JOB_POSTHANDLER_PLUGINNAME);
// 添加读写插件的列表待加载
Set<String> pluginList = new HashSet<String>();
pluginList.add(readerPluginName);
pluginList.add(writerPluginName);
if(StringUtils.isNotEmpty(preHandlerName)) {
pluginList.add(preHandlerName);
}
if(StringUtils.isNotEmpty(postHandlerName)) {
pluginList.add(postHandlerName);
}
try {
// parsePluginConfig(new ArrayList<String>(pluginList))加载指定的插件的配置信息,并且和全局的配置文件进行合并
configuration.merge(parsePluginConfig(new ArrayList<String>(pluginList)), false);
}catch (Exception e){
}
// configuration整合了三方的配置,包括 任务配置、core核心配置、指定插件的配置。
return configuration;
}
// 在指定的reader和writer目录获取指定的插件并解析其配置
public static Configuration parsePluginConfig(List<String> wantPluginNames) {
// 创建一个空的配置信息对象
Configuration configuration = Configuration.newDefault();
Set<String> replicaCheckPluginSet = new HashSet<String>();
int complete = 0;
// 所有的reader在/plugin/reader目录,遍历获取所有reader的目录
// 获取待加载插件的配资信息,并合并到上面创建的空配置对象
// //Users/lebron374/Documents/github/DataX/target/datax/datax/plugin/reader
for (final String each : ConfigParser
.getDirAsList(CoreConstant.DATAX_PLUGIN_READER_HOME)) {
// 解析单个reader目录,eachReaderConfig保存的是key是plugin.reader.pluginname,value是对应的plugin.json内容
Configuration eachReaderConfig = ConfigParser.parseOnePluginConfig(each, "reader", replicaCheckPluginSet, wantPluginNames);
if(eachReaderConfig!=null) {
// 采用覆盖式的合并
configuration.merge(eachReaderConfig, true);
complete += 1;
}
}
// //Users/lebron374/Documents/github/DataX/target/datax/datax/plugin/writer
for (final String each : ConfigParser
.getDirAsList(CoreConstant.DATAX_PLUGIN_WRITER_HOME)) {
Configuration eachWriterConfig = ConfigParser.parseOnePluginConfig(each, "writer", replicaCheckPluginSet, wantPluginNames