跳转至

DatabendWriter

Databend 插件用于向 Databend 数据库以流式方式写入数据。 其实现上是通过访问 Databend http 连接(8000) ,然后通过 stream load 加载数据到数据中,相比 insert into 方式效率要高不少,也是官方推荐的生产环境下的数据加载方式。

Databend 是一个兼容 MySQL 协议的数据库后端,因此 Databend 读取可以使用 MySQLReader 进行访问。

示例

假定要写入的表的建表语句如下:

CREATE
DATABASE example_db;
CREATE TABLE `example_db`.`table1` (
  `siteid` INT DEFAULT CAST(10 AS INT),
  `citycode` INT,
  `username` VARCHAR,
  `pv` BIGINT
);

下面配置一个从内存读取数据,然后写入到 databend 表的配置文件

{
  "job": {
    "setting": {
      "speed": {
        "channel": 2
      }
    },
    "content": {
      "writer": {
        "name": "databendwriter",
        "parameter": {
          "preSql": [
            "truncate table @table"
          ],
          "postSql": [
          ],
          "username": "u1",
          "password": "123",
          "database": "example_db",
          "table": "table1",
          "jdbcUrl": "jdbc:mysql://127.0.0.1:3307/example_db",
          "loadUrl": ["127.0.0.1:8000","127.0.0.1:8000"],
          "fieldDelimiter": "\\x01",
          "lineDelimiter": "\\x02",
          "column": ["*"],
          "format": "csv"
        }
      },
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "random": "1,500",
              "type": "long"
            },
            {
              "random": "1,127",
              "type": "long"
            },
            {
              "value": "this is a text",
              "type": "string"
            },
            {
              "random": "5,200",
              "type": "long"
            }
          ],
          "sliceRecordCount": 100
        }
      }
    }
  }
}

将上述配置文件保存为 job/stream2databend.json

执行下面的命令

bin/addax.sh job/stream2Databend.json

参数说明

配置项 是否必须 类型 默认值 描述
jdbcUrl string 目的数据库的 JDBC 连接信息,用于执行preSqlpostSql
loadUrl string Databend query 节点的地址用于StreamLoad,可以为多个 query 地址,query_ip:query_http_port,从多个地址轮循写入 |
username string HTTP 签名验证帐号
password string HTTP 签名验证密码
database string Databend表的数据库名称
table string Databend表的表名称
column list 所配置的表中需要同步的列名集合,详细描述见 rdbmswriter
maxBatchRows int 500000 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM或者目标数据库事务提交失败导致挂起
maxBatchSize int 104857600 单次StreamLoad导入的最大字节数
flushInterval int 300000 上一次StreamLoad结束至下一次开始的时间间隔(单位:ms)
endpoint string Databend 的HTTP连接方式,只需要写到主机和端口即可,具体路径插件会自动拼装 |
username string HTTP 签名验证帐号
password string HTTP 签名验证密码
table string 所选取的需要同步的表名
column list 所配置的表中需要同步的列名集合,详细描述见 rdbmswriter
batchSize int 1024
lineDelimiter string \n 每行的分隔符,支持高位字节, 例如 \\x02
filedDelimiter string \t 每列的分隔符,支持高位字节, 例如 \\x01
format string csv 被导入数据会被转换成 format 指定格式。

类型转换

默认传入的数据均会被转为字符串,并以\t作为列分隔符,\n作为行分隔符,组成csv文件进行StreamLoad导入操作。