
Kudu Reader

The Kudu Reader plugin uses Kudu's Java client, KuduClient, to read data from Kudu tables.

Configuration Example

In this example, we connect to the Kudu service through Trino's Kudu connector, then create a table and insert some data.

Table Creation and Data Insertion Statements

sql
CREATE TABLE kudu.default.users (
  user_id int WITH (primary_key = true),
  user_name varchar with (nullable=true),
  age int with (nullable=true),
  salary double with (nullable=true),
  longtitue decimal(18,6) with (nullable=true),
  latitude decimal(18,6) with (nullable=true),
  p decimal(21,20) with (nullable=true),
  mtime timestamp with (nullable=true)
) WITH (
  partition_by_hash_columns = ARRAY['user_id'],
  partition_by_hash_buckets = 2
);

insert into kudu.default.users 
values 
(1, cast('wgzhao' as varchar), 18, cast(18888.88 as double), 
 cast(123.282424 as decimal(18,6)), cast(23.123456 as decimal(18,6)),
 cast(1.12345678912345678912 as decimal(21,20)), 
 timestamp '2021-01-10 14:40:41'),
(2, cast('anglina' as varchar), 16, cast(23456.12 as double), 
 cast(33.192123 as decimal(18,6)), cast(56.654321 as decimal(18,6)), 
 cast(1.12345678912345678912 as decimal(21,20)), 
 timestamp '2021-01-10 03:40:41');
-- Insert a row with only the primary key value
insert into kudu.default.users(user_id) values (3);

Configuration

The following configuration reads the Kudu table and prints the rows to the terminal:

json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": {
      "reader": {
        "name": "kudureader",
        "parameter": {
          "masterAddress": "localhost:7051,localhost:7151,localhost:7251",
          "table": "users",
          "splitPk": "user_id",
          "lowerBound": 1,
          "upperBound": 100,
          "readTimeout": 5,
          "scanTimeout": 10
        }
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true
        }
      }
    }
  }
}

Save the above configuration as job/kudu2stream.json.
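The splitPk, lowerBound, and upperBound settings in the configuration above describe a key range that can be read in parallel. As a rough illustration only, the Python sketch below divides such a range into one sub-range per channel. The function name split_range, the even-width strategy, and the half-open interval convention are assumptions made for this example; they do not describe how Addax actually splits the range.

```python
from typing import List, Tuple


def split_range(lower: int, upper: int, parts: int) -> List[Tuple[int, int]]:
    """Split the half-open range [lower, upper) into at most `parts` chunks.

    Hypothetical helper for illustration; not part of Addax.
    """
    if parts < 1:
        raise ValueError("parts must be >= 1")
    if upper <= lower:
        return []
    total = upper - lower
    parts = min(parts, total)           # never create empty chunks
    base, extra = divmod(total, parts)  # the first `extra` chunks get one more key
    ranges = []
    start = lower
    for i in range(parts):
        size = base + (1 if i < extra else 0)
        ranges.append((start, start + size))
        start += size
    return ranges


# lowerBound = 1, upperBound = 100, channel = 3, as in the job file above
print(split_range(1, 100, 3))  # [(1, 34), (34, 67), (67, 100)]
```

Each tuple would correspond to the key range handled by one reading task under these assumptions.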

Execution

Run the following command to start the data collection job:

bash
bin/addax.sh job/kudu2stream.json

Parameters

| Configuration | Required | Type | Default Value | Description |
| --- | --- | --- | --- | --- |
| masterAddress | Yes | string | None | RPC addresses of the Kudu master cluster; separate multiple addresses with commas (,) |
| table | Yes | string | None | Kudu table name |
| splitPk | No | string | None | Field used to split the data into shards for parallel reading |
| lowerBound | No | string | None | Lower bound of the shard range for parallel reading |
| upperBound | No | string | None | Upper bound of the shard range for parallel reading |
| readTimeout | No | int | 10 | Timeout for reading data (seconds) |
| scanTimeout | No | int | 20 | Timeout for a data scan request (seconds) |
| column | No | list | None | Fields to read |
| where | No | list | None | Additional filter conditions, see description below |
| haveKerberos | No | boolean | false | Whether to enable Kerberos authentication; if enabled, the following two items must be configured |
| kerberosKeytabFilePath | No | string | None | Path of the keytab file used for Kerberos authentication, e.g. /your/path/addax.service.keytab |
| kerberosPrincipal | No | string | None | Principal used for Kerberos authentication, e.g. addax/[email protected] |
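The required items, default values, and the Kerberos dependency listed above can be checked before a job is submitted. The following Python sketch is a hypothetical pre-flight check, not something Addax ships; the function name check_reader_parameter and its error messages are assumptions made for this example.

```python
from typing import Any, Dict

# Items and defaults taken from the parameter table above.
REQUIRED_KEYS = ("masterAddress", "table")
KERBEROS_KEYS = ("kerberosKeytabFilePath", "kerberosPrincipal")
DEFAULTS: Dict[str, Any] = {
    "readTimeout": 10,
    "scanTimeout": 20,
    "haveKerberos": False,
}


def check_reader_parameter(parameter: Dict[str, Any]) -> Dict[str, Any]:
    """Return the parameter block with defaults filled in, or raise ValueError.

    Hypothetical helper for illustration; not part of Addax.
    """
    missing = [key for key in REQUIRED_KEYS if not parameter.get(key)]
    if missing:
        raise ValueError("missing required item(s): " + ", ".join(missing))
    merged = {**DEFAULTS, **parameter}
    if merged["haveKerberos"]:
        # Kerberos needs both the keytab path and the principal.
        missing = [key for key in KERBEROS_KEYS if not merged.get(key)]
        if missing:
            raise ValueError("haveKerberos is true but missing: " + ", ".join(missing))
    return merged


checked = check_reader_parameter({
    "masterAddress": "localhost:7051,localhost:7151,localhost:7251",
    "table": "users",
    "readTimeout": 5,
})
print(checked["readTimeout"], checked["scanTimeout"])  # 5 20
```

Items that are set explicitly, such as readTimeout here, override the defaults; everything else falls back to the documented default value.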

where

The where parameter defines additional filter conditions. It is an array in which each element is one filter condition, for example:

json
{
  "where": ["age > 1", "user_name = 'wgzhao'"]
}

The above defines two filter conditions. Each filter condition consists of three parts in the format column operator value:

  • column: Field to filter
  • operator: Comparison operator; only =, >, >=, <, <=, and != are currently supported
  • value: Comparison value

Note the following limitation when using where:

  1. Multiple filter conditions are combined with logical AND; logical OR is not supported yet
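To make the column operator value format and the AND semantics concrete, here is a minimal Python sketch that parses where entries and applies them to rows held in memory. It only illustrates the rules described above and is not how the plugin evaluates filters. The names Condition, parse_condition, and matches are hypothetical, and treating a NULL column as a non-match is an assumption.

```python
import operator
import re
from typing import Any, Dict, Iterable, NamedTuple


class Condition(NamedTuple):
    column: str
    operator: str
    value: str


# Two-character operators come first so ">=" is not read as ">" then "=".
_PATTERN = re.compile(r"^\s*(\w+)\s*(>=|<=|!=|=|>|<)\s*(.+?)\s*$")
_OPS = {
    "=": operator.eq, "!=": operator.ne,
    ">": operator.gt, ">=": operator.ge,
    "<": operator.lt, "<=": operator.le,
}


def parse_condition(text: str) -> Condition:
    """Parse one 'column operator value' entry of the where array."""
    match = _PATTERN.match(text)
    if match is None:
        raise ValueError(f"unsupported filter condition: {text!r}")
    column, op, value = match.groups()
    return Condition(column, op, value.strip("'"))


def matches(row: Dict[str, Any], conditions: Iterable[Condition]) -> bool:
    """Return True only if every condition holds (logical AND)."""
    for cond in conditions:
        actual = row.get(cond.column)
        if actual is None:  # assumption: NULL never satisfies a condition
            return False
        expected = type(actual)(cond.value)  # compare using the column's type
        if not _OPS[cond.operator](actual, expected):
            return False
    return True


conditions = [parse_condition(c) for c in ["age > 1", "user_name = 'wgzhao'"]]
rows = [
    {"user_id": 1, "user_name": "wgzhao", "age": 18},
    {"user_id": 2, "user_name": "anglina", "age": 16},
    {"user_id": 3, "user_name": None, "age": None},
]
print([r["user_id"] for r in rows if matches(r, conditions)])  # [1]
```

An unsupported operator such as like makes parse_condition raise ValueError, which mirrors the restriction to the six comparison operators.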

Type Conversion

| Addax Internal Type | Kudu Data Type |
| --- | --- |
| Long | byte, short, int, long |
| Double | float, double, decimal |
| String | string |
| Date | timestamp |
| Boolean | boolean |
| Bytes | binary |
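The mapping above can be expressed as a simple lookup. The Python sketch below is only an illustration of the table; the names KUDU_TO_ADDAX and addax_type are hypothetical, and stripping a precision suffix such as (18,6) before the lookup is an assumption made for convenience.

```python
# Kudu data type -> Addax internal type, copied from the table above.
KUDU_TO_ADDAX = {
    "byte": "Long", "short": "Long", "int": "Long", "long": "Long",
    "float": "Double", "double": "Double", "decimal": "Double",
    "string": "String",
    "timestamp": "Date",
    "boolean": "Boolean",
    "binary": "Bytes",
}


def addax_type(kudu_type: str) -> str:
    """Look up the Addax internal type for a Kudu data type name."""
    # Drop any "(precision, scale)" suffix, e.g. "decimal(18,6)" -> "decimal".
    base = kudu_type.strip().lower().split("(", 1)[0].strip()
    try:
        return KUDU_TO_ADDAX[base]
    except KeyError:
        raise ValueError(f"no Addax type listed for Kudu type {kudu_type!r}") from None


for kudu_type in ["int", "double", "decimal(18,6)", "timestamp"]:
    print(kudu_type, "->", addax_type(kudu_type))
```

Note that decimal columns map to Double according to the table, so values such as decimal(21,20) may lose precision when they pass through this type.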