Skip to content

Kudu Reader

Kudu Reader 插件利用 Kudu 的 java客户端 KuduClient 进行 Kudu 的读操作。

配置示例

我们通过 Trinokudu connector 连接 kudu 服务,然后进行表创建以及数据插入

建表语句以及数据插入语句

sql
CREATE TABLE kudu.default.users (
  user_id int WITH (primary_key = true),
  user_name varchar with (nullable=true),
  age int with (nullable=true),
  salary double with (nullable=true),
  longtitue decimal(18,6) with (nullable=true),
  latitude decimal(18,6) with (nullable=true),
  p decimal(21,20) with (nullable=true),
  mtime timestamp with (nullable=true)
) WITH (
  partition_by_hash_columns = ARRAY['user_id'],
  partition_by_hash_buckets = 2
);

insert into kudu.default.users 
values 
(1, cast('wgzhao' as varchar), 18, cast(18888.88 as double), 
 cast(123.282424 as decimal(18,6)), cast(23.123456 as decimal(18,6)),
 cast(1.12345678912345678912 as decimal(21,20)), 
 timestamp '2021-01-10 14:40:41'),
(2, cast('anglina' as varchar), 16, cast(23456.12 as double), 
 cast(33.192123 as decimal(18,6)), cast(56.654321 as decimal(18,6)), 
 cast(1.12345678912345678912 as decimal(21,20)), 
 timestamp '2021-01-10 03:40:41');
-- ONLY insert primary key value
 insert into kudu.default.users(user_id) values  (3);

配置

以下是读取kudu表并输出到终端的配置

json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": {
      "reader": {
        "name": "kudureader",
        "parameter": {
          "masterAddress": "localhost:7051,localhost:7151,localhost:7251",
          "table": "users",
          "splitPk": "user_id",
          "lowerBound": 1,
          "upperBound": 100,
          "readTimeout": 5,
          "scanTimeout": 10
        }
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }
  }
}

把上述配置文件保存为 job/kudu2stream.json

执行

执行下面的命令进行采集

bash
bin/addax.sh job/kudu2stream.json

参数说明

配置项是否必须类型默认值描述
masterAddressstringKudu Master 集群RPC地址,多个地址用逗号(,)分隔
tablestringkudu 表名
splitPkstring并行读取数据分片字段
lowerBoundstring并行读取数据分片范围下界
upperBoundstring并行读取数据分片范围上界
readTimeoutint10读取数据超时(秒)
scanTimeoutint20数据扫描请求超时(秒)
columnlist指定要获取的字段
wherelist指定其他过滤条件,详见下面描述
haveKerberosbooleanfalse是否启用 Kerberos 认证,如果启用,则需要同时配置以下两项
kerberosKeytabFilePathstring用于 Kerberos 认证的凭证文件路径, 比如 /your/path/addax.service.keytab
kerberosPrincipalstring用于 Kerberos 认证的凭证主体, 比如 addax/[email protected]

where

where 用来定制更多的过滤条件,他是一个数组类型,数组的每个元素都是一个过滤条件,比如

json
{
  "where": ["age > 1", "user_name = 'wgzhao'"]
}

上述定义了两个过滤条件,每个过滤条件由三部分组成,格式为 column operator value

  • column: 要过滤的字段
  • operator: 比较符号,当前仅支持 =, >, >=, <, <= , != 其他操作符号当前还不支持
  • value: 比较值

这里还有其他一些限定,在使用时,要特别注意:

  1. 多个过滤条件之间的逻辑与关系(AND),暂不支持逻辑或(OR)关系

类型转换

Addax 内部类型Kudu 数据类型
Longbyte, short, int, long
Doublefloat, double, decimal
Stringstring
Datetimestamp
Booleanboolean
Bytesbinary