跳转至

StarRocks Writer

StarRocks Writer 插件用于向 Starrocks 数据库以流式方式写入数据。 其实现上是通过访问 Doris http 连接(8030) ,然后通过 stream load 加载数据到数据中,相比 insert into 方式效率要高不少,也是官方推荐的生产环境下的数据加载方式。

StarRocks 是一个兼容 MySQL 协议的数据库后端,因此 Doris 读取可以使用 MySQLReader 进行访问。

示例

假定要写入的表的建表语句如下:

CREATE DATABASE example_db;
CREATE TABLE example_db.table1
(
    siteid   INT         DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv       BIGINT SUM DEFAULT '0'
) AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

下面配置一个从内存读取数据,然后写入到 doris 表的配置文件

{
  "job": {
    "setting": {
      "speed": {
        "channel": 2
      }
    },
    "content": {
      "writer": {
        "name": "starrockswriter",
        "parameter": {
          "username": "test",
          "password": "123456",
          "database": "example_db",
          "table": "table1",
          "jdbcUrl": "jdbc:mysql://172.28.17.100:9030/",
          "loadUrl": ["172.28.17.100:8030", "172.28.17.100:8030"],
          "loadProps": {
               "column_separator": "\\x01",
               "row_delimiter": "\\x02"
           },
          "column": ["siteid", "citycode", "username", "pv"]
        }
      },
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "random": "1,500",
              "type": "long"
            },
            {
              "random": "1,127",
              "type": "long"
            },
            {
              "value": "this is a text",
              "type": "string"
            },
            {
              "random": "5,200",
              "type": "long"
            }
          ],
          "sliceRecordCount": 100
        }
      }
    }
  }
}

将上述配置文件保存为 job/stream2starrocks.json

执行下面的命令

bin/addax.sh job/stream2starrocks.json

参数说明

该插件基于 RDBMS Writer 实现,因此可以参考 RDBMS Writer 的所有配置项,并增加了下面的配置项。

配置项 是否必须 类型 默认值 描述
loadUrl string StarRocks FE 的地址用于StreamLoad1,可以为多个fe地址,fe_ip:fe_http_port
maxBatchRows int 500000 单次StreamLoad导入的最大行数
maxBatchSize int 104857600 单次StreamLoad导入的最大字节数
flushInterval int 300000 上一次StreamLoad结束至下一次开始的时间间隔(单位:ms)
loadProps map streamLoad 的请求参数,详情参照StreamLoad介绍页面

类型转换

默认传入的数据均会被转为字符串,并以\t作为列分隔符,\n作为行分隔符,组成csv文件进行StreamLoad导入操作。 如需更改列分隔符, 则正确配置 loadProps 即可:

"loadProps": {
    "column_separator": "\\x01",
    "row_delimiter": "\\x02"
}

如需更改导入格式为json, 则正确配置 loadProps 即可:

"loadProps": {
    "format": "json",
    "strip_outer_array": true
}