跳转至

Doris Writer

DorisWriter 插件用于向 Doris 数据库以流式方式写入数据。 其实现上是通过访问 Doris http 连接(8030) ,然后通过 stream load 加载数据到数据中,相比 insert into 方式效率要高不少,也是官方推荐的生产环境下的数据加载方式。

Doris 是一个兼容 MySQL 协议的数据库后端,因此 Doris 读取可以使用 MySQLReader 进行访问。

示例

假定要写入的表的建表语句如下:

CREATE
DATABASE example_db;
CREATE TABLE example_db.table1
(
    siteid   INT         DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv       BIGINT SUM DEFAULT '0'
) AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

下面配置一个从内存读取数据,然后写入到 doris 表的配置文件

{
  "job": {
    "setting": {
      "speed": {
        "channel": 2
      }
    },
    "content": {
      "writer": {
        "name": "doriswriter",
        "parameter": {
          "username": "test",
          "password": "123456",
          "batchSize": 1024,
          "connection": [
            {
              "table": "table1",
              "database": "example_db",
              "endpoint": "http://127.0.0.1:8030/"
            }
          ],
          "loadProps": {},
          "lineDelimiter": "\n",
          "format": "csv"
        }
      },
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "random": "1,500",
              "type": "long"
            },
            {
              "random": "1,127",
              "type": "long"
            },
            {
              "value": "this is a text",
              "type": "string"
            },
            {
              "random": "5,200",
              "type": "long"
            }
          ],
          "sliceRecordCount": 100
        }
      }
    }
  }
}

将上述配置文件保存为 job/stream2doris.json

执行下面的命令

bin/addax.sh job/stream2doris.json

输出类似如下:

2021-02-23 15:22:57.851 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-02-23 15:22:57.871 [main] INFO  Engine -
{
"content":{
"reader":{
    "parameter":{
            "column":[
                    {
                            "random":"1,500",
                            "type":"long"
                    },
                    {
                            "random":"1,127",
                            "type":"long"
                    },
                    {
                            "type":"string",
                            "value":"username"
                    }
            ],
            "sliceRecordCount":100
    },
    "name":"streamreader"
},
"writer":{
    "parameter":{
            "password":"*****",
            "batchSize":1024,
            "connection":[
                    {
                            "database":"example_db",
                            "endpoint":"http://127.0.0.1:8030/",
                            "table":"table1"
                    }
            ],
            "username":"test"
    },
    "name":"doriswriter"
 }
},
"setting":{
"speed":{
"channel":2
}
}
}

2021-02-23 15:22:57.886 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-02-23 15:22:57.886 [main] INFO  JobContainer - Addax jobContainer starts job.
2021-02-23 15:22:57.920 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2021-02-23 15:22:57.928 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [2] channels for [2] tasks.
2021-02-23 15:22:57.935 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-02-23 15:22:57.936 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2021-02-23 15:22:57.970 [0-0-1-writer] INFO  DorisWriterTask - connect DorisDB with http://127.0.0.1:8030//api/example_db/table1/_stream_load
2021-02-23 15:22:57.970 [0-0-0-writer] INFO  DorisWriterTask - connect DorisDB with http://127.0.0.1:8030//api/example_db/table1/_stream_load

2021-02-23 15:23:00.941 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-02-23 15:23:00.946 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-02-23 15:22:57
任务结束时刻                    : 2021-02-23 15:23:00
任务总计耗时                    :                  3s
任务平均流量                    :            1.56KB/s
记录写入速度                    :             66rec/s
读出记录总数                    :                 200
读写失败总数                    :                   0

参数说明

配置项 是否必须 类型 默认值 描述
endpoint string Doris 的HTTP连接方式,只需要写到主机和端口即可,具体路径插件会自动拼装 |
username string HTTP 签名验证帐号
password string HTTP 签名验证密码
table string 所选取的需要同步的表名
column list 所配置的表中需要同步的列名集合,详细描述见 rdbmswriter
batchSize int 1024 定义了插件和数据库服务器端每次批量数据获取条数,调高该值可能导致 Addax 出现OOM或者目标数据库事务提交失败导致挂起
lineDelimiter string \n 每行的的分隔符,支持多个字节, 例如 \x02\x03
format string csv 导入数据的格式, 可以使是 json 或者 csv
loadProps map csv streamLoad 的请求参数,详情参照StreamLoad介绍页面
connectTimeout int -1 StreamLoad单次请求的超时时间, 单位毫秒(ms)

endpoint

endpoint 只是的任意一个 BE 的主机名及 webserver_port 端口,官方文档描述也可以填写 FE 主机名和 http_port 端口,但实际测试一直处于连接拒绝状态。

column

该插件中的 column 不是必须项,如果没有配置该项,或者配置为 ["*"] , 则按照 reader 插件获取的字段值进行顺序拼装。 否则可以按照如下方式指定需要插入的字段

{
  "column": [
    "siteid",
    "citycode",
    "username"
  ]
}