跳转至

Kudu Reader

Kudu Reader 插件利用 Kudu 的 java客户端 KuduClient 进行 Kudu 的读操作。

配置示例

我们通过 Trinokudu connector 连接 kudu 服务,然后进行表创建以及数据插入

建表语句以及数据插入语句

CREATE TABLE kudu.default.users (
  user_id int WITH (primary_key = true),
  user_name varchar with (nullable=true),
  age int with (nullable=true),
  salary double with (nullable=true),
  longtitue decimal(18,6) with (nullable=true),
  latitude decimal(18,6) with (nullable=true),
  p decimal(21,20) with (nullable=true),
  mtime timestamp with (nullable=true)
) WITH (
  partition_by_hash_columns = ARRAY['user_id'],
  partition_by_hash_buckets = 2
);

insert into kudu.default.users 
values 
(1, cast('wgzhao' as varchar), 18, cast(18888.88 as double), 
 cast(123.282424 as decimal(18,6)), cast(23.123456 as decimal(18,6)),
 cast(1.12345678912345678912 as decimal(21,20)), 
 timestamp '2021-01-10 14:40:41'),
(2, cast('anglina' as varchar), 16, cast(23456.12 as double), 
 cast(33.192123 as decimal(18,6)), cast(56.654321 as decimal(18,6)), 
 cast(1.12345678912345678912 as decimal(21,20)), 
 timestamp '2021-01-10 03:40:41');
-- ONLY insert primary key value
 insert into kudu.default.users(user_id) values  (3);

配置

以下是读取kudu表并输出到终端的配置

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": {
      "reader": {
        "name": "kudureader",
        "parameter": {
          "masterAddress": "localhost:7051,localhost:7151,localhost:7251",
          "table": "users",
          "splitPk": "user_id",
          "lowerBound": 1,
          "upperBound": 100,
          "readTimeout": 5,
          "scanTimeout": 10
        }
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }
  }
}

把上述配置文件保存为 job/kudu2stream.json

执行

执行下面的命令进行采集

bin/addax.sh job/kudu2stream.json

参数说明

配置项 是否必须 类型 默认值 描述
masterAddress string Kudu Master 集群RPC地址,多个地址用逗号(,)分隔
table string kudu 表名
splitPk string 并行读取数据分片字段
lowerBound string 并行读取数据分片范围下界
upperBound string 并行读取数据分片范围上界
readTimeout int 10 读取数据超时(秒)
scanTimeout int 20 数据扫描请求超时(秒)
column list 指定要获取的字段
where list 指定其他过滤条件,详见下面描述

where

where 用来定制更多的过滤条件,他是一个数组类型,数组的每个元素都是一个过滤条件,比如

{
  "where": ["age > 1", "user_name = 'wgzhao'"] 
}

上述定义了两个过滤条件,每个过滤条件由三部分组成,格式为 column operator value

  • column: 要过滤的字段
  • operator: 比较符号,当前仅支持 =, >, >=, <, <= , != 其他操作符号当前还不支持
  • value: 比较值

这里还有其他一些限定,在使用时,要特别注意:

  1. 多个过滤条件之间的逻辑与关系(AND),暂不支持逻辑或(OR)关系

类型转换

Addax 内部类型 Kudu 数据类型
Long byte, short, int, long
Double float, double, decimal
String string
Date timestamp
Boolean boolean
Bytes binary