# Kafka Writer

The Kafka Writer plugin writes data to Kafka in JSON format.

## Example

The following configuration demonstrates how to read data from memory and write it to a specified Kafka topic.

### Create the job file

First, create a job file named `stream2kafka.json` with the following content:

```json
{
  "job": {
    "setting": {
        "speed": {
            "channel": 1
        }
    },
    "content": [
      {
        "reader": {
            "name": "streamreader",
            "parameter": {
              "column": [
                    {"random": "10,1000", "type": "long"},
                    {"value": "1.1.1.1", "type": "string"},
                    {"value": 19890604.0, "type": "double"},
                    {"value": 19890604, "type": "long"},
                    {"value": 19890604, "type": "long"},
                    {"value": "hello world", "type": "string"},
                    {"value": "long text", "type": "string"},
                    {"value": "41.12,-71.34", "type": "string"},
                    {"value": "2017-05-25 11:22:33", "type": "string"}
                    ],
            "sliceRecordCount": 100
            }
        },
        "writer": {
          "name": "kafkawriter",
          "parameter": {
            "brokerList": "localhost:9092",
            "topic": "test-1",
            "partitions": 0,
            "batchSize": 1000,
            "column": ["col1", "col2","col3","col4","col5", "col6", "col7", "col8", "col9"]
          }
        }
      }
    ]
  }
}
```
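
Before running the job, make sure the target topic exists (or that the broker allows automatic topic creation). A minimal sketch for creating `test-1` with Kafka's bundled `kafka-topics.sh`, assuming a single-node cluster on `localhost:9092` and Kafka 2.2 or later (where `--bootstrap-server` is accepted):

```bash
# Hypothetical pre-step: create the target topic on a single-node cluster.
bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --topic test-1 \
    --partitions 1 \
    --replication-factor 1
```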

### Run

Run `bin/addax.sh stream2kafka.json`; you should get output similar to the following:

```txt
2022-02-26 21:59:22.975 [        main] INFO  VMInfo               - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2022-02-26 21:59:22.985 [        main] INFO  Engine               - 
{
	"content":{
		"reader":{
			"parameter":{
				"column":[
					{
						"random":"10,1000",
						"type":"long"
					},
					{
						"type":"string",
						"value":"1.1.1.1"
					},
					{
						"type":"double",
						"value":19890604.0
					},
					{
						"type":"long",
						"value":19890604
					},
					{
						"type":"long",
						"value":19890604
					},
					{
						"type":"string",
						"value":"hello world"
					},
					{
						"type":"string",
						"value":"long text"
					},
					{
						"type":"string",
						"value":"41.12,-71.34"
					},
					{
						"type":"string",
						"value":"2017-05-25 11:22:33"
					}
				],
				"sliceRecordCount":100
			},
			"name":"streamreader"
		},
		"writer":{
			"parameter":{
				"partitions":0,
				"column":[
					"col1",
					"col2",
					"col3",
					"col4",
					"col5",
					"col6",
					"col7",
					"col8",
					"col9"
				],
				"topic":"test-1",
				"batchSize":1000,
				"brokerList":"localhost:9092"
			},
			"name":"kafkawriter"
		}
	},
	"setting":{
		"speed":{
			"channel":1
		}
	}
}

2022-02-26 21:59:23.002 [        main] INFO  PerfTrace            - PerfTrace traceId=job_-1, isEnable=false, priority=0
2022-02-26 21:59:23.003 [        main] INFO  JobContainer         - Addax jobContainer starts job.
2022-02-26 21:59:23.004 [        main] INFO  JobContainer         - Set jobId = 0
2022-02-26 21:59:23.017 [       job-0] INFO  JobContainer         - Addax Reader.Job [streamreader] do prepare work .
2022-02-26 21:59:23.017 [       job-0] INFO  JobContainer         - Addax Writer.Job [kafkawriter] do prepare work .
2022-02-26 21:59:23.017 [       job-0] INFO  JobContainer         - Job set Channel-Number to 1 channel(s).
2022-02-26 21:59:23.018 [       job-0] INFO  JobContainer         - Addax Reader.Job [streamreader] splits to [1] tasks.
2022-02-26 21:59:23.019 [       job-0] INFO  JobContainer         - Addax Writer.Job [kafkawriter] splits to [1] tasks.
2022-02-26 21:59:23.039 [       job-0] INFO  JobContainer         - Scheduler starts [1] taskGroups.
2022-02-26 21:59:23.047 [ taskGroup-0] INFO  TaskGroupContainer   - taskGroupId=[0] start [1] channels for [1] tasks.
2022-02-26 21:59:23.050 [ taskGroup-0] INFO  Channel              - Channel set byte_speed_limit to -1, No bps activated.
2022-02-26 21:59:23.050 [ taskGroup-0] INFO  Channel              - Channel set record_speed_limit to -1, No tps activated.
2022-02-26 21:59:23.082 [0-0-0-writer] INFO  ProducerConfig       - ProducerConfig values: 
	acks = 1
	batch.size = 1000
	bootstrap.servers = [localhost:9092]
	buffer.memory = 33554432
	client.id = addax-kafka-writer
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 0
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2022-02-26 21:59:23.412 [0-0-0-writer] INFO  AppInfoParser        - Kafka version : 2.0.0
2022-02-26 21:59:23.413 [0-0-0-writer] INFO  AppInfoParser        - Kafka commitId : 3402a8361b734732
2022-02-26 21:59:23.534 [kafka-producer-network-thread | addax-kafka-writer] INFO  Metadata             - Cluster ID: xPAQZFNDTp6y63nZO4LACA
2022-02-26 21:59:26.061 [       job-0] INFO  AbstractScheduler    - Scheduler accomplished all tasks.
2022-02-26 21:59:26.062 [       job-0] INFO  JobContainer         - Addax Writer.Job [kafkawriter] do post work.
2022-02-26 21:59:26.062 [       job-0] INFO  JobContainer         - Addax Reader.Job [streamreader] do post work.
2022-02-26 21:59:26.063 [       job-0] INFO  JobContainer         - PerfTrace not enable!
2022-02-26 21:59:26.064 [       job-0] INFO  StandAloneJobContainerCommunicator - Total 100 records, 9200 bytes | Speed 2.99KB/s, 33 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-02-26 21:59:26.065 [       job-0] INFO  JobContainer         - 
Job start time                  : 2022-02-26 21:59:23
Job end time                    : 2022-02-26 21:59:26
Total elapsed time              :                  3s
Average job traffic             :            2.99KB/s
Record write speed              :             33rec/s
Total records read              :                 100
Total read/write failures       :                   0
```

We can use Kafka's bundled `kafka-console-consumer.sh` to read the data back; the output looks like this:

```bash
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-1 --from-beginning

{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":916}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":572}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":88}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":33}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":697}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":381}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":304}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":103}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":967}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":147}

## Parameters

| Option     | Required | Data Type | Default | Description                                                             |
| ---------- | -------- | --------- | ------- | ----------------------------------------------------------------------- |
| brokerList | yes      | string    | none    | Kafka broker address(es); separate multiple brokers with commas (`,`)    |
| topic      | yes      | string    | none    | the topic to write to                                                    |
| batchSize  | no       | int       | 1024    | sets Kafka's `batch.size` parameter                                      |
| column     | yes      | list      | none    | the columns of the configured table to synchronize; `*` is not allowed   |
| properties | no       | map       | none    | additional Kafka connection parameters to set                            |
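
The `properties` map carries extra Kafka connection settings. A hedged sketch, assuming the key/value pairs are forwarded verbatim to the Kafka producer configuration (verify the pass-through behavior against your Addax version):

```json
"writer": {
  "name": "kafkawriter",
  "parameter": {
    "brokerList": "localhost:9092",
    "topic": "test-1",
    "column": ["col1", "col2"],
    "properties": {
      "acks": "all",
      "linger.ms": "100"
    }
  }
}
```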

## Limitations

1. Only Kafka 1.0 and later are supported; it is unknown whether earlier versions can be written to.
2. Kafka services with Kerberos authentication enabled are not currently supported.