Skip to content

Doris Writer

DorisWriter 插件用于向 Doris 数据库以流式方式写入数据。 其实现上是通过访问 Doris http 连接(8030),然后通过 stream load 加载数据到数据中,相比 insert into 方式效率要高不少,也是官方推荐的生产环境下的数据加载方式。

Doris 是一个兼容 MySQL 协议的数据库后端,因此 Doris 读取可以使用 MySQL Reader 进行访问。

示例

假定要写入的表的建表语句如下:

sql
CREATE DATABASE example_db;
CREATE TABLE example_db.table1
(
  siteid INT DEFAULT '10',
  citycode SMALLINT,
  username VARCHAR(32) DEFAULT '',
  pv BIGINT SUM DEFAULT '0'
) AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

下面配置一个从内存读取数据,然后写入到 doris 表的配置文件

json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 2
      }
    },
    "content": {
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "random": "1,500",
              "type": "long"
            },
            {
              "random": "1,127",
              "type": "long"
            },
            {
              "value": "this is a text",
              "type": "string"
            },
            {
              "random": "5,200",
              "type": "long"
            }
          ],
          "sliceRecordCount": 100
        }
      },
      "writer": {
        "name": "doriswriter",
        "parameter": {
          "loadUrl": [
            "127.0.0.1:8030"
          ],
          "username": "test",
          "password": "123456",
          "batchSize": 1024,
          "column": [
            "siteid",
            "citycode",
            "username",
            "pv"
          ],
          "connection": {
            "table": "table1",
            "database": "example_db",
            "jdbcUrl": "jdbc:mysql://localhost:9030/example_db"
          },
          "loadProps": {
            "format": "json",
            "strip_outer_array": true
          }
        }
      }
    }
  }
}

将上述配置文件保存为 job/stream2doris.json

执行下面的命令

bash
bin/addax.sh job/stream2doris.json

输出类似如下:

Details
txt
2021-02-23 15:22:57.851 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-02-23 15:22:57.871 [main] INFO  Engine -
{
"content":{
"reader":{
    "parameter":{
            "column":[
                    {
                            "random":"1,500",
                            "type":"long"
                    },
                    {
                            "random":"1,127",
                            "type":"long"
                    },
                    {
                            "type":"string",
                            "value":"username"
                    }
            ],
            "sliceRecordCount":100
    },
    "name":"streamreader"
},
"writer":{
    "parameter":{
            "password":"*****",
            "batchSize":1024,
            "connection":[
                    {
                            "database":"example_db",
                            "endpoint":"http://127.0.0.1:8030/",
                            "table":"table1"
                    }
            ],
            "username":"test"
    },
    "name":"doriswriter"
 }
},
"setting":{
"speed":{
"channel":2
}
}
}

2021-02-23 15:22:57.886 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-02-23 15:22:57.886 [main] INFO  JobContainer - Addax jobContainer starts job.
2021-02-23 15:22:57.920 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2021-02-23 15:22:57.928 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [2] channels for [2] tasks.
2021-02-23 15:22:57.935 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-02-23 15:22:57.936 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2021-02-23 15:22:57.970 [0-0-1-writer] INFO  DorisWriterTask - connect DorisDB with http://127.0.0.1:8030//api/example_db/table1/_stream_load
2021-02-23 15:22:57.970 [0-0-0-writer] INFO  DorisWriterTask - connect DorisDB with http://127.0.0.1:8030//api/example_db/table1/_stream_load

2021-02-23 15:23:00.941 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-02-23 15:23:00.946 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-02-23 15:22:57
任务结束时刻                    : 2021-02-23 15:23:00
任务总计耗时                    :                  3s
任务平均流量                    :            1.56KB/s
记录写入速度                    :             66rec/s
读出记录总数                    :                 200
读写失败总数                    :                   0

参数说明

配置项是否必须类型默认值描述
loadUrlstringStream Load 的连接目标 |
usernamestring访问Doris数据库的用户名
passwordstring访问Doris数据库的密码
flushIntervalint3000数据写入到目标表的间隔时间,单位为毫秒,即每隔多少毫秒写入一次数据
flushQueueLengthint1上传数据的队列长度
tablestring所选取的需要同步的表名
columnlist所配置的表中需要同步的列名集合,详细描述见 RBDMS Writer
batchSizeint2048每批次导入数据的最大行数
loadPropsmapcsvstreamLoad 的请求参数,详情参照StreamLoad介绍页面
preSqllist写入数据到目标表前要执行的 SQL 语句
postSqllist数据写完后要执行的 SQL 语句

loadUrl

作为 Stream Load 的连接目标。格式为 "ip:port"。其中 IP 是 FE 节点 IP,port 是 FE 节点的 http_port。可以填写多个,当填写多个时,插件会每个批次随机选择一个有效 FE 节点进行连接。

column

允许配置为 ["*"] , 如果是 "*" , 则尝试从 Doris 数据库中直接读取表字段,然后进行拼装。

loadProps

StreamLoad 的请求参数,详情参照StreamLoad介绍页面。Stream load - Apache Doris

这里包括导入的数据格式:format等,导入数据格式默认我们使用csv,支持JSON,具体可以参照下面类型转换部分,也可以参照上面Stream load 官方信息

类型转换

默认传入的数据均会被转为字符串,并以 \t 作为列分隔符,\n 作为行分隔符,组成 csv 文件进行 StreamLoad 导入操作。

默认是 csv 格式导入,如需更改列分隔符, 则正确配置 loadProps 即可

json
{
  "loadProps": {
    "column_separator": "\\x01",
    "line_delimiter": "\\x02"
  }
}

如需更改导入格式为json, 则正确配置 loadProps 即可:

json
{
  "loadProps": {
    "format": "json",
    "strip_outer_array": true
  }
}
``