Kudu Reader¶
KuduReader 插件利用 Kudu 的java客户端KuduClient进行Kudu的读操作。
配置示例¶
我们通过 Trino 的 kudu connector
连接 kudu 服务,然后进行表创建以及数据插入
建表语句以及数据插入语句¶
CREATE TABLE kudu.default.users (
user_id int WITH (primary_key = true),
user_name varchar with (nullable=true),
age int with (nullable=true),
salary double with (nullable=true),
longtitue decimal(18,6) with (nullable=true),
latitude decimal(18,6) with (nullable=true),
p decimal(21,20) with (nullable=true),
mtime timestamp with (nullable=true)
) WITH (
partition_by_hash_columns = ARRAY['user_id'],
partition_by_hash_buckets = 2
);
insert into kudu.default.users
values
(1, cast('wgzhao' as varchar), 18, cast(18888.88 as double),
cast(123.282424 as decimal(18,6)), cast(23.123456 as decimal(18,6)),
cast(1.12345678912345678912 as decimal(21,20)),
timestamp '2021-01-10 14:40:41'),
(2, cast('anglina' as varchar), 16, cast(23456.12 as double),
cast(33.192123 as decimal(18,6)), cast(56.654321 as decimal(18,6)),
cast(1.12345678912345678912 as decimal(21,20)),
timestamp '2021-01-10 03:40:41');
-- ONLY insert primary key value
insert into kudu.default.users(user_id) values (3);
配置¶
以下是读取kudu表并输出到终端的配置
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": {
"reader": {
"name": "kudureader",
"parameter": {
"masterAddress": "localhost:7051,localhost:7151,localhost:7251",
"table": "users",
"splitPk": "user_id",
"lowerBound": 1,
"upperBound": 100,
"readTimeout": 5,
"scanTimeout": 10
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true
}
}
}
}
}
把上述配置文件保存为 job/kudu2stream.json
执行¶
执行下面的命令进行采集
bin/addax.sh job/kudu2stream.json
参数说明¶
配置项 | 是否必须 | 类型 | 默认值 | 描述 |
---|---|---|---|---|
masterAddress | 必须 | string | 无 | Kudu Master集群RPC地址,多个地址用逗号(,)分隔 |
table | 必须 | string | 无 | kudu 表名 |
splitPk | 否 | string | 无 | 并行读取数据分片字段 |
lowerBound | 否 | string | 无 | 并行读取数据分片范围下界 |
upperBound | 否 | string | 无 | 并行读取数据分片范围上界 |
readTimeout | 否 | int | 10 | 读取数据超时(秒) |
scanTimeout | 否 | int | 20 | 数据扫描请求超时(秒) |
column | 否 | list | 无 | 指定要获取的字段,多个字段用逗号分隔,比如 "column":["user_id","user_name","age"] |
where | 否 | list | 无 | 指定其他过滤条件,详见下面描述 |
where¶
where
用来定制更多的过滤条件,他是一个数组类型,数组的每个元素都是一个过滤条件,比如
{
"where": ["age > 1", "user_name = 'wgzhao'"]
}
上述定义了两个过滤条件,每个过滤条件由三部分组成,格式为 column operator value
column
: 要过滤的字段operator
: 比较符号,当前仅支持=
,>
, '>=',<
,<=
, 其他操作符号当前还不支持value
: 比较值,如果是字符串,可以加上单引号('
), 不加可以,因为实际类型会从数据库表中获取对应字段(column
)的类型,但如果值含有空格,则一定要加上单引号
这里还有其他一些限定,在使用时,要特别注意:
- 上述三个部分之间至少有一个空格
age>1
,age >1
这种均无效,这是因为我们实际上是把 SQL 风格的过滤提交转换为 Kudu 的 KuduPredicate 类 - 多个过滤条件之间的逻辑与关系(
AND
),暂不支持逻辑或(OR
)关系
类型转换¶
Addax 内部类型 | Kudu 数据类型 |
---|---|
Long | byte, short, int, long |
Double | float, double, decimal |
String | string |
Date | timestamp |
Boolean | boolean |
Bytes | binary |