# Kudu Reader
The Kudu Reader plugin uses Kudu's Java client, `KuduClient`, to read data from Kudu tables.
## Configuration Example
We connect to the Kudu service through Trino's Kudu connector, then create a table and insert data.
### Table Creation and Data Insertion Statements
```sql
CREATE TABLE kudu.default.users (
  user_id int WITH (primary_key = true),
  user_name varchar WITH (nullable = true),
  age int WITH (nullable = true),
  salary double WITH (nullable = true),
  longitude decimal(18,6) WITH (nullable = true),
  latitude decimal(18,6) WITH (nullable = true),
  p decimal(21,20) WITH (nullable = true),
  mtime timestamp WITH (nullable = true)
) WITH (
  partition_by_hash_columns = ARRAY['user_id'],
  partition_by_hash_buckets = 2
);

INSERT INTO kudu.default.users
VALUES
  (1, CAST('wgzhao' AS varchar), 18, CAST(18888.88 AS double),
   CAST(123.282424 AS decimal(18,6)), CAST(23.123456 AS decimal(18,6)),
   CAST(1.12345678912345678912 AS decimal(21,20)),
   TIMESTAMP '2021-01-10 14:40:41'),
  (2, CAST('anglina' AS varchar), 16, CAST(23456.12 AS double),
   CAST(33.192123 AS decimal(18,6)), CAST(56.654321 AS decimal(18,6)),
   CAST(1.12345678912345678912 AS decimal(21,20)),
   TIMESTAMP '2021-01-10 03:40:41');

-- insert the primary key value ONLY
INSERT INTO kudu.default.users(user_id) VALUES (3);
```

### Configuration
The following configuration reads the Kudu table created above and prints the records to the terminal:
```json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": {
      "reader": {
        "name": "kudureader",
        "parameter": {
          "masterAddress": "localhost:7051,localhost:7151,localhost:7251",
          "table": "users",
          "splitPk": "user_id",
          "lowerBound": 1,
          "upperBound": 100,
          "readTimeout": 5,
          "scanTimeout": 10
        }
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }
  }
}
```

Save the above configuration file as `job/kudu2stream.json`.
### Execution

Run the following command to start the collection job:

```bash
bin/addax.sh job/kudu2stream.json
```

## Parameters
| Configuration | Required | Type | Default Value | Description |
|---|---|---|---|---|
| masterAddress | Yes | string | None | Kudu Master cluster RPC address, multiple addresses separated by comma (,) |
| table | Yes | string | None | Kudu table name |
| splitPk | No | string | None | Field used to split the data for parallel reading |
| lowerBound | No | string | None | Lower bound of the split field range for parallel reading |
| upperBound | No | string | None | Upper bound of the split field range for parallel reading |
| readTimeout | No | int | 10 | Read data timeout (seconds) |
| scanTimeout | No | int | 20 | Data scan request timeout (seconds) |
| column | No | list | None | Columns to read |
| where | No | list | None | Additional filter conditions; see the description below |
| haveKerberos | No | boolean | false | Whether to enable Kerberos authentication; if enabled, the following two items must be configured |
| kerberosKeytabFilePath | No | string | None | Keytab file path for Kerberos authentication, e.g. /your/path/addax.service.keytab |
| kerberosPrincipal | No | string | None | Principal for Kerberos authentication, e.g. addax/[email protected] |
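Putting the optional parameters together, a reader block that selects specific columns, applies a filter, and enables Kerberos might look like the following sketch. The keytab path and principal are placeholders you would replace with your own values:

```json
{
  "name": "kudureader",
  "parameter": {
    "masterAddress": "localhost:7051,localhost:7151,localhost:7251",
    "table": "users",
    "column": ["user_id", "user_name", "age"],
    "where": ["age >= 18"],
    "readTimeout": 10,
    "scanTimeout": 20,
    "haveKerberos": true,
    "kerberosKeytabFilePath": "/your/path/addax.service.keytab",
    "kerberosPrincipal": "addax/[email protected]"
  }
}
```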
### where

`where` defines additional filter conditions. It is an array; each element is one filter condition, for example:
```json
{
  "where": ["age > 1", "user_name = 'wgzhao'"]
}
```

The above defines two filter conditions. Each filter condition consists of three parts in the form `column operator value`:

- `column`: the field to filter on
- `operator`: the comparison operator; currently only `=`, `>`, `>=`, `<`, `<=`, `!=` are supported
- `value`: the value to compare against
Note the following limitation:

- Multiple filter conditions are combined with logical AND; logical OR is not yet supported.
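For instance, given the `users` table above, to select users older than 18 whose salary is at most 20000, place both conditions in the array; they are applied together as a single AND predicate:

```json
{
  "where": ["age > 18", "salary <= 20000"]
}
```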
## Type Conversion
| Addax Internal Type | Kudu Data Type |
|---|---|
| Long | byte, short, int, long |
| Double | float, double, decimal |
| String | string |
| Date | timestamp |
| Boolean | boolean |
| Bytes | binary |