跳转至

MySQL Reader

MysqlReader 插件实现了从 MySQL 读取数据的能力

示例

我们在 MySQL 的 test 库上创建如下表,并插入一条记录

CREATE TABLE addax_reader
(
    c_bigint     bigint,
    c_varchar    varchar(100),
    c_timestamp  timestamp,
    c_text       text,
    c_decimal    decimal(8, 3),
    c_mediumtext mediumtext,
    c_longtext   longtext,
    c_int        int,
    c_time       time,
    c_datetime   datetime,
    c_enum       enum('one', 'two', 'three'),
    c_float      float,
    c_smallint   smallint,
    c_bit        bit,
    c_double     double,
    c_blob       blob,
    c_char       char(5),
    c_varbinary  varbinary(100),
    c_tinyint    tinyint,
    c_json       json,
    c_set SET ('a', 'b', 'c', 'd'),
    c_binary     binary,
    c_longblob   longblob,
    c_mediumblob mediumblob
);
INSERT INTO addax_reader
VALUES (2E18,
        'a varchar data',
        '2021-12-12 12:12:12',
        'a long text',
        12345.122,
        'a medium text',
        'a long text',
         2 ^ 32 - 1,
        '12:13:14',
        '2021-12-12 12:13:14',
        'one',
        17.191,
        126,
        0,
        1114.1114,
        'blob',
        'a123b',
        'a var binary content',
        126,
        '{"k1":"val1","k2":"val2"}',
        'b',
        binary(1),
        x'89504E470D0A1A0A0000000D494844520000001000000010080200000090916836000000017352474200',
        x'89504E470D0A1A0A0000000D');

下面的配置是读取该表到终端的作业:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3,
        "bytes": -1
      }
    },
    "content": {
      "reader": {
        "name": "mysqlreader",
        "parameter": {
          "username": "root",
          "password": "root",
          "column": [
            "*"
          ],
          "connection": [
            {
              "table": [
                "addax_reader"
              ],
              "jdbcUrl": [
                "jdbc:mysql://127.0.0.1:3306/test"
              ],
              "driver": "com.mysql.jdbc.Driver"
            }
          ]
        }
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }
  }
}

将上述配置文件保存为 job/mysql2stream.json

执行采集命令

执行以下命令进行数据采集

bin/addax.sh job/mysql2stream.json

参数说明

MysqlReader 基于 rdbmsreader 实现,因此可以参考 rdbmsreader 的所有配置项。

driver

当前 Addax 采用的 MySQL JDBC 驱动为 8.0 以上版本,驱动类名使用的 com.mysql.cj.jdbc.Driver,而不是 com.mysql.jdbc.Driver。 如果你需要采集的 MySQL 服务低于 5.6,需要使用到 Connector/J 5.1 驱动,则可以采取下面的步骤:

替换插件内置的驱动

rm -f plugin/reader/mysqlreader/lib/mysql-connector-java-*.jar

拷贝老的驱动到插件目录

cp mysql-connector-java-5.1.48.jar plugin/reader/mysqlreader/lib/

指定驱动类名称

在你的 json 文件类,配置 "driver": "com.mysql.jdbc.Driver"

类型转换注意事项

  • tinyint(1) 会视为整形
  • year 被视为整形
  • bit 如果是 bit(1) 被视为布尔类型,否则当作二进制类型