Kudu Writer¶
KuduWriter 插件实现了将数据写入到 kudu 的能力,当前是通过调用原生RPC接口来实现的。 后期希望通过 impala 接口实现,从而增加更多的功能。
示例¶
以下示例演示了如何从内存读取样例数据并写入到 kudu 表中的。
表结构¶
我们用 trino 工具连接到 kudu 服务,然后通过下面的 SQL 语句创建表
CREATE TABLE kudu.default.users (
user_id int WITH (primary_key = true),
user_name varchar,
salary double
) WITH (
partition_by_hash_columns = ARRAY['user_id'],
partition_by_hash_buckets = 2
);
job 配置文件¶
创建 job/stream2kudu.json
文件,内容如下:
{
"job": {
"setting": {
"speed": {
"channel": 1,
"bytes": -1
}
},
"content": {
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"random": "1,1000",
"type": "long"
},
{
"random": "1,10",
"type": "string"
},
{
"random": "1000,50000",
"type": "double"
}
],
"sliceRecordCount": 1000
}
},
"writer": {
"name": "kuduwriter",
"parameter": {
"masterAddress": "127.0.0.1:7051,127.0.0.1:7151,127.0.0.1:7251",
"timeout": 60,
"table": "users",
"writeMode": "upsert",
"column": [ "user_id", "user_name", "salary"],
"batchSize": 1024,
"bufferSize": 2048,
"skipFail": false,
"encoding": "UTF-8"
}
}
}
}
}
运行¶
执行下下面的命令进行数据采集
bin/addax.sh job/stream2kudu.json
参数说明¶
配置项 | 是否必须 | 类型 | 默认值 | 描述 |
---|---|---|---|---|
masterAddress | 必须 | string | 无 | Kudu Master集群RPC地址,多个地址用逗号(,)分隔 |
table | 必须 | string | 无 | kudu 表名 |
writeMode | 否 | string | upsert | 表数据写入模式,支持 upsert, insert 两者 |
timeout | 否 | int | 100 | 写入数据超时时间(秒), 0 表示不受限制 |
column | 是 | list | 无 | 要写入的表字段及类型,配置方式见上示例 |
skipFail | 否 | boolean | false | 是否跳过插入失败的记录,如果设置为true,则插件不会把插入失败的当作异常 |
column¶
column
可以直接指定要写入的列,如同上述例子,也可以设置 ["*"]
来表示写入所有列。