跳转至

Doris Writer

DorisWriter 插件用于向 Doris 数据库以流式方式写入数据。 其实现上是通过访问 Doris http 连接(8030),然后通过 stream load 加载数据到数据中,相比 insert into 方式效率要高不少,也是官方推荐的生产环境下的数据加载方式。

Doris 是一个兼容 MySQL 协议的数据库后端,因此 Doris 读取可以使用 MySQL Reader 进行访问。

示例

假定要写入的表的建表语句如下:

CREATE
DATABASE example_db;
CREATE TABLE example_db.table1
(
    siteid   INT         DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv       BIGINT SUM DEFAULT '0'
) AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

下面配置一个从内存读取数据,然后写入到 doris 表的配置文件

{
  "job": {
    "setting": {
      "speed": {
        "channel": 2
      }
    },
    "content": {
      "writer": {
        "name": "doriswriter",
        "parameter": {
          "loadUrl": [
            "127.0.0.1:8030"
          ],
          "username": "test",
          "password": "123456",
          "batchSize": 1024,
          "column": [
            "siteid",
            "citycode",
            "username",
            "pv"
          ],
          "connection": {
            "table": "table1",
            "database": "example_db",
            "jdbcUrl": "jdbc:mysql://localhost:9030/example_db"
          },
          "loadProps": {
            "format": "json",
            "strip_outer_array": true
          }
        }
      },
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "random": "1,500",
              "type": "long"
            },
            {
              "random": "1,127",
              "type": "long"
            },
            {
              "value": "this is a text",
              "type": "string"
            },
            {
              "random": "5,200",
              "type": "long"
            }
          ],
          "sliceRecordCount": 100
        }
      }
    }
  }
}

将上述配置文件保存为 job/stream2doris.json

执行下面的命令

bin/addax.sh job/stream2doris.json

输出类似如下:

2021-02-23 15:22:57.851 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-02-23 15:22:57.871 [main] INFO  Engine -
{
"content":{
"reader":{
    "parameter":{
            "column":[
                    {
                            "random":"1,500",
                            "type":"long"
                    },
                    {
                            "random":"1,127",
                            "type":"long"
                    },
                    {
                            "type":"string",
                            "value":"username"
                    }
            ],
            "sliceRecordCount":100
    },
    "name":"streamreader"
},
"writer":{
    "parameter":{
            "password":"*****",
            "batchSize":1024,
            "connection":[
                    {
                            "database":"example_db",
                            "endpoint":"http://127.0.0.1:8030/",
                            "table":"table1"
                    }
            ],
            "username":"test"
    },
    "name":"doriswriter"
 }
},
"setting":{
"speed":{
"channel":2
}
}
}

2021-02-23 15:22:57.886 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-02-23 15:22:57.886 [main] INFO  JobContainer - Addax jobContainer starts job.
2021-02-23 15:22:57.920 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2021-02-23 15:22:57.928 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [2] channels for [2] tasks.
2021-02-23 15:22:57.935 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-02-23 15:22:57.936 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2021-02-23 15:22:57.970 [0-0-1-writer] INFO  DorisWriterTask - connect DorisDB with http://127.0.0.1:8030//api/example_db/table1/_stream_load
2021-02-23 15:22:57.970 [0-0-0-writer] INFO  DorisWriterTask - connect DorisDB with http://127.0.0.1:8030//api/example_db/table1/_stream_load

2021-02-23 15:23:00.941 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-02-23 15:23:00.946 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-02-23 15:22:57
任务结束时刻                    : 2021-02-23 15:23:00
任务总计耗时                    :                  3s
任务平均流量                    :            1.56KB/s
记录写入速度                    :             66rec/s
读出记录总数                    :                 200
读写失败总数                    :                   0

参数说明

配置项 是否必须 类型 默认值 描述
loadUrl string Stream Load 的连接目标 |
username string 访问Doris数据库的用户名
password string 访问Doris数据库的密码
flushInterval int 3000 数据写入到目标表的间隔时间,单位为毫秒,即每隔多少毫秒写入一次数据
flushQueueLength int 1 上传数据的队列长度
table string 所选取的需要同步的表名
column list 所配置的表中需要同步的列名集合,详细描述见 RBDMS Writer
batchSize int 2048 每批次导入数据的最大行数
loadProps map csv streamLoad 的请求参数,详情参照StreamLoad介绍页面
preSql list 写入数据到目标表前要执行的 SQL 语句
postSql list 数据写完后要执行的 SQL 语句

loadUrl

作为 Stream Load 的连接目标。格式为 "ip:port"。其中 IP 是 FE 节点 IP,port 是 FE 节点的 http_port。可以填写多个,当填写多个时,插件会每个批次随机选择一个有效 FE 节点进行连接。

column

允许配置为 ["*"] , 如果是 "*" , 则尝试从 Doris 数据库中直接读取表字段,然后进行拼装。

loadProps

StreamLoad 的请求参数,详情参照StreamLoad介绍页面。Stream load - Apache Doris

这里包括导入的数据格式:format等,导入数据格式默认我们使用csv,支持JSON,具体可以参照下面类型转换部分,也可以参照上面Stream load 官方信息

类型转换

默认传入的数据均会被转为字符串,并以\t作为列分隔符,\n作为行分隔符,组成csv文件进行StreamLoad导入操作。

默认是csv格式导入,如需更改列分隔符, 则正确配置 loadProps 即可

{
  "loadProps": {
    "column_separator": "\\x01",
    "line_delimiter": "\\x02"
  }
}

如需更改导入格式为json, 则正确配置 loadProps 即可:

{
  "loadProps": {
    "format": "json",
    "strip_outer_array": true
  }
}