跳转至

插件开发

本指南主要面向那些需要开发符合自己需求的 Addax 插件开发人员。

插件机制

Addax 为了应对不同数据源的差异、同时提供一致地同步原语和扩展能力,采用了 框架 + 插件 的模式:

  • 插件只需关心数据的读取或者写入本身。
  • 而同步的共性问题,比如:类型转换、性能、统计,则交由框架来处理。

作为插件开发人员,则需要关注两个问题:

  1. 数据源本身的读写数据正确性。
  2. 如何与框架沟通、合理正确地使用框架。

插件视角看框架

逻辑执行模型

插件开发者不用关心太多,基本只需要关注特定系统读和写,以及自己的代码在逻辑上是怎样被执行的,哪一个方法是在什么时候被调用的。在此之前,需要明确以下概念:

  • Job: 用以描述从一个源头到一个目的端的同步作业,是数据同步的最小业务单元。比如:从一张 MySQL 的表同步到 PostgreSQL 的一个表。
  • Task: 为最性能大化而把 Job 拆分得到的最小执行单元。比如:读一张有 1024 个分表的 MySQL 分库分表的 Job,拆分成 1024 个读 Task,用若干个并发执行。
  • TaskGroup: 一组 Task 集合。在同一个 TaskGroupContainer 执行下的 Task 集合称之为 TaskGroup
  • JobContainer: Job 执行器,负责 Job 全局拆分、调度、前置语句和后置语句等工作的工作单元。类似 Yarn 中的 JobTracker
  • TaskGroupContainer: TaskGroup 执行器,负责执行一组 Task 的工作单元,类似 Yarn 中的 TaskTracker。

简而言之, Job拆分成Task,分别在框架提供的容器中执行,插件只需要实现 JobTask 两部分逻辑。

编程接口

那么,JobTask 的逻辑应是怎么对应到具体的代码中的?

首先,插件的入口类必须扩展 ReaderWriter 抽象类,并且实现分别实现 JobTask 两个内部抽象类,JobTask 的实现必须是 内部类 的形式,原因见 加载原理 一节。以 Reader 为例:

public class SomeReader
        extends Reader
{
    public static class Job
            extends Reader.Job
    {
        @Override
        public void init()
        {
        }

        @Override
        public void prepare()
        {
        }

        @Override
        public List<Configuration> split(int adviceNumber)
        {
            return null;
        }

        @Override
        public void post()
        {
        }

        @Override
        public void destroy()
        {
        }
    }

    public static class Task
            extends Reader.Task
    {

        @Override
        public void init()
        {
        }

        @Override
        public void prepare()
        {
        }

        @Override
        public void startRead(RecordSender recordSender)
        {
        }

        @Override
        public void post()
        {
        }

        @Override
        public void destroy()
        {
        }
    }
}

Job 接口功能如下:

  • init: Job对象初始化工作,此时可以通过 super.getPluginJobConf() 获取与本插件相关的配置。读插件获得配置中 reader 部分,写插件获得 writer 部分。
  • prepare: 全局准备工作,比如 MySQL 清空目标表。
  • split: 拆分 Task。参数 adviceNumber 框架建议的拆分数,一般是运行时所配置的并发度。值返回的是 Task 的配置列表。
  • post: 全局的后置工作,比如 MySQL writer 同步完影子表后的 rename 操作。
  • destroy: Job对象自身的销毁工作。

Task 接口功能如下:

  • init:Task 对象的初始化。此时可以通过 super.getPluginJobConf() 获取与本 Task 相关的配置。这里的配置是 Job#split 方法返回的配置列表中的其中一个。
  • prepare:局部的准备工作。
  • startRead: 从数据源读数据,写入到 RecordSender 中。RecordSender 会把数据写入连接 ReaderWriter 的缓存队列。
  • startWrite:从 RecordReceiver 中读取数据,写入目标数据源。RecordReceiver 中的数据来自 ReaderWriter 之间的缓存队列。
  • post: 局部的后置工作。
  • destroy: Task 对象自身的销毁工作。

需要注意的是:

  • JobTask 之间一定不能有共享变量,因为分布式运行时不能保证共享变量会被正确初始化。两者之间只能通过配置文件进行依赖。
  • preparepostJobTask 中都存在,插件需要根据实际情况确定在什么地方执行操作。

框架按照如下的顺序执行 JobTask 的接口:

AddaxReaderWriter

上图中,黄色表示 Job 部分的执行阶段,蓝色表示 Task 部分的执行阶段,绿色表示框架执行阶段。

相关类关系如下:

Addax

插件定义

在每个插件的项目中,都有一个plugin.json文件,这个文件定义了插件的相关信息,包括入口类。例如:

{
  "name": "mysqlwriter",
  "class": "com.wgzhao.addax.plugin.writer.mysqlwriter.MysqlWriter",
  "description": "Use Jdbc connect to database, execute insert sql.",
  "developer": "wgzhao"
}
  • name: 插件名称,大小写敏感。框架根据用户在配置文件中指定的名称来搜寻插件。 十分重要
  • class: 入口类的全限定名称,框架通过反射创建入口类的实例。十分重要
  • description: 描述信息。
  • developer: 开发人员。

打包发布

Addax 使用 assembly 打包,打包命令如下:

mvn clean package 
mvn package assembly:single

Addax 插件需要遵循统一的目录结构:

${ADDAX_HOME}
├── bin
│     ├── addax.sh
├── conf
│     ├── core.json
│     └── logback.xml
├── job
├── lib
│     ├── addax-common-4.0.5.jar
│     ├── addax-core-4.0.7-SNAPSHOT.jar
│     ├── addax-rdbms-4.0.5.jar
│     ├── addax-storage-4.0.5.jar
│     ├── addax-transformer-4.0.5.jar
│     ├── aircompressor-0.21.jar
│     ├── annotations-2.0.3.jar
│     ├── checker-qual-2.11.1.jar
│     ├── commons-beanutils-1.9.4.jar
├── log
├── plugin
│     ├── reader
│     │     ├── cassandrareader
│     │     │     ├── cassandrareader-4.0.5.jar
│     │     │     ├── libs
│     │     │     │     ├── <symbol link to shared folder>
│     │     │     ├── plugin.json
│     │     │     └── plugin_job_template.json
│     └── writer
│         ├── cassandrawriter
│         │     ├── cassandrawriter-4.0.5.jar
│         │     ├── libs
│         │     │     ├── <symbol link to shared folder>
│         │     ├── plugin.json
│         │     └── plugin_job_template.json
  • ${ADDAX_HOME}/bin: 可执行程序目录
  • ${ADDAX_HOME}/conf: 框架配置目录
  • ${ADDAX_HOME}/lib: 框架依赖库目录
  • ${ADDAX_HOME}/shared: 插件依赖目录
  • ${ADDAX_HOME}/plugin: 插件目录

插件目录分为 readerwriter 子目录,读写插件分别存放。插件目录规范如下:

  • ${PLUGIN_HOME}/libs: 插件的依赖库,为了减少程序包大小,这些依赖包都是指向 shared 目录的符号链接
  • ${PLUGIN_HOME}/plugin-name-version.jar: 插件本身的jar。
  • ${PLUGIN_HOME}/plugin.json: 插件描述文件。

尽管框架加载插件时,会把 ${PLUGIN_HOME} 下所有的 jar 包添加到 classpath 环境变量中,但还是推荐依赖库的jar和插件本身的jar分开存放。

特别提醒

插件的目录名字必须和 plugin.json 中定义的插件名称一致。

配置文件

Addax 使用 json 作为配置文件的格式。一个典型的 Addax 任务配置如下:

{
  "job": {
    "setting": {
      "speed": {
        "byte": -1,
        "channel": 1
      }
    },
    "content": {
      "reader": {
        "name": "postgresqlreader",
        "parameter": {
          "username": "pgtest",
          "password": "pgtest",
          "column": [
            "*"
          ],
          "connection": [
            {
              "table": [
                "addax_tbl"
              ],
              "jdbcUrl": [
                "jdbc:postgresql://localhost:5432/pgtest"
              ]
            }
          ]
        }
      },
      "writer": {
        "name": "postgresqlwriter",
        "parameter": {
          "column": [
            "*"
          ],
          "preSql": [
            "truncate table @table"
          ],
          "connection": [
            {
              "jdbcUrl": "jdbc:postgresql://127.0.0.1:5432/pgtest",
              "table": [
                "addax_tbl1"
              ]
            }
          ],
          "username": "pgtest",
          "password": "pgtest",
          "writeMode": "insert"
        }
      }
    }
  }
}

Addax 框架有 core.json 配置文件,指定了框架的默认行为。任务的配置里头可以指定框架中已经存在的配置项,而且具有更高的优先级,会覆盖 core.json 中的默认值。

配置中job.content.reader.parametervalue 部分会传给 Reader.Jobjob.content.writer.parametervalue 部分会传给Writer.JobReader.JobWriter.Job 可以通过 super.getPluginJobConf() 来获取。

如何设计配置参数

配置文件的设计是插件开发的第一步!

任务配置中 readerwriterparameter 部分是插件的配置参数,插件的配置参数应当遵循以下原则:

  • 驼峰命名:所有配置项采用小驼峰命名法,首字母小写。
  • 正交原则:配置项必须正交,功能没有重复,没有潜规则。
  • 富类型:合理使用json的类型,减少无谓的处理逻辑,减少出错的可能。
    • 使用正确的数据类型。比如,bool 类型的值使用 true/false,而非 "yes"/"true"/0 等。
    • 合理使用集合类型,比如,用数组替代有分隔符的字符串。
  • 类似通用:遵守同一类型的插件的习惯,比如关系型数据库的 connection 参数都是如下结构:
{
  "connection": [
    {
      "table": [
        "table_1",
        "table_2"
      ],
      "jdbcUrl": [
        "jdbc:mysql://127.0.0.1:3306/database_1",
        "jdbc:mysql://127.0.0.2:3306/database_1_slave"
      ]
    },
    {
      "table": [
        "table_3",
        "table_4"
      ],
      "jdbcUrl": [
        "jdbc:mysql://127.0.0.3:3306/database_2",
        "jdbc:mysql://127.0.0.4:3306/database_2_slave"
      ]
    }
  ]
}

如何使用 Configuration

为了简化对 json 的操作,Addax 提供了简单的 DSL 配合 Configuration 类使用。

Configuration 提供了常见的 get, 带类型get带默认值getset 等读写配置项的操作,以及 clone, toJSON 等方法。配置项读写操作都需要传入一个 path 做为参数, 这个 path 就是 Addax 定义的 DSL。语法有两条:

  1. 子map用 .key 表示,path 的第一个点省略。
  2. 数组元素用 [index] 表示。

比如操作如下json:

{
  "a": {
    "b": {
      "c": 2
    },
    "f": [
      1,
      2,
      {
        "g": true,
        "h": false
      },
      4
    ]
  },
  "x": 4
}

比如调用 configuration.get(path) 方法,当path为如下值的时候得到的结果为:

  • x4
  • a.b.c2
  • a.b.c.dnull
  • a.b.f[0]1
  • a.b.f[2].gtrue

注意,因为插件看到的配置只是整个配置的一部分。使用 Configuration 对象时,需要注意当前的根路径是什么。

更多 Configuration 的操作请参考 Configuration.java

插件数据传输

跟一般的 生产者-消费者 模式一样,Reader 插件和 Writer 插件之间也是通过 channel 来实现数据的传输的。channel 可以是内存的,也可能是持久化的,插件不必关心。 插件通过 RecordSenderchannel 写入数据,通过 RecordReceiverchannel 读取数据。

channel 中的一条数据为一个 Record 的对象,Record 中可以放多个 Column 对象,这可以简单理解为数据库中的记录和列。

Record 有如下方法:

public interface Record
{
    // 加入一个列,放在最后的位置
    void addColumn(Column column);

    // 在指定下标处放置一个列
    void setColumn(int i, final Column column);

    // 获取一个列
    Column getColumn(int i);

    // 转换为json String
    String toString();

    // 获取总列数
    int getColumnNumber();

    // 计算整条记录在内存中占用的字节数
    int getByteSize();
}

因为 Record 是一个接口,Reader 插件首先调用 RecordSender.createRecord() 创建一个 Record 实例,然后把 Column 一个个添加到 Record 中。

Writer 插件调用 RecordReceiver.getFromReader() 方法获取 Record,然后把 Column 遍历出来,写入目标存储中。当 Reader 尚未退出,传输还在进行时,如果暂时没有数据 RecordReceiver.getFromReader() 方法会阻塞直到有数据。 如果传输已经结束,会返回nullWriter 插件可以据此判断是否结束 startWrite 方法。

类型转换

为了规范源端和目的端类型转换操作,保证数据不失真,Addax 支持六种内部数据类型:

  • Long:定点数(Int、Short、Long、BigInteger等)。
  • Double:浮点数(Float、Double、BigDecimal(无限精度)等)。
  • String:字符串类型,底层不限长,使用通用字符集(Unicode)。
  • Date:日期类型。
  • Timestamp: 时间戳
  • Bool:布尔值。
  • Bytes:二进制,可以存放诸如MP3等非结构化数据。

对应地,有 DateColumnLongColumnDoubleColumnBytesColumnStringColumnBoolColumnTimestampColumn 七种 Column 的实现。

Column 除了提供数据相关的方法外,还提供一系列以 as 开头的数据类型转换转换方法。

Columns

Addax的内部类型在实现上会选用不同的java类型:

内部类型 实现类型 备注
Date java.util.Date
Timestamp java.sql.Timestamp 可以精确到纳秒
Long java.math.BigInteger 使用无限精度的大整数,保证不失真
Double java.lang.String 用String表示,保证不失真
Bytes byte[]
String java.lang.String
Bool java.lang.Boolean

类型之间相互转换的关系如下:

from/to Date Long Double Bytes String Bool
Date - 使用毫秒时间戳 不支持 不支持 使用系统配置的date/time/datetime格式转换 不支持
Long 作为毫秒时间戳构造Date - BigInteger转为BigDecimal,然后BigDecimal.doubleValue() 不支持 BigInteger.toString() 0为false,否则true
Double 不支持 内部String构造BigDecimal,然后BigDecimal.longValue() - 不支持 直接返回内部String
Bytes 不支持 不支持 不支持 - 按照common.column.encoding配置的编码转换为String,默认utf-8 不支持
String 按照配置的date/time/datetime/extra格式解析 用String构造BigDecimal,然后取longValue() 用String构造BigDecimal,然后取doubleValue(),会正确处理NaN/Infinity/-Infinity 按照common.column.encoding配置的编码转换为byte[],默认utf-8 - "true"为true, "false"为false,大小写不敏感。其他字符串不支持
Bool 不支持 true1L,否则0L true1.0,否则0.0 不支持 -

脏数据处理

什么是脏数据

目前主要有三类脏数据:

  1. Reader读到不支持的类型、不合法的值。
  2. 不支持的类型转换,比如:Bytes 转换为 Date
  3. 写入目标端失败,比如:写 MySQL 整型长度超长。

如何处理脏数据

Reader.TaskWriter.Task 中,通过 AbstractTaskPlugin.getPluginCollector() 可以拿到一个 TaskPluginCollector,它提供了一系列 collectDirtyRecord 的方法。 当脏数据出现时,只需要调用合适的 collectDirtyRecord 方法,把被认为是脏数据的 Record 传入即可。

用户可以在任务的配置中指定脏数据限制条数或者百分比限制,当脏数据超出限制时,框架会结束同步任务,退出。插件需要保证脏数据都被收集到,其他工作交给框架就好。

加载原理

  1. 框架扫描 plugin/readerplugin/writer目录,加载每个插件的 plugin.json 文件。
  2. plugin.json 文件中 name 为 key,索引所有的插件配置。如果发现重名的插件,框架会异常退出。
  3. 用户在插件中在 reader/writer 配置的 name 字段指定插件名字。框架根据插件的类型(reader/writer)和插件名称去插件的路径下扫描所有的jar,加入 classpath
  4. 根据插件配置中定义的入口类,框架通过反射实例化对应的 JobTask 对象。