# Kafka Writer
The KafkaWriter plugin writes data to Kafka in JSON format. It was introduced in version 4.0.9.
## Example
The following configuration demonstrates how to read data from memory and write it to a specified Kafka topic.
### Create the job file
First, create a job file named `stream2kafka.json` with the following content:
```json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {"random": "10,1000", "type": "long"},
              {"value": "1.1.1.1", "type": "string"},
              {"value": 19890604.0, "type": "double"},
              {"value": 19890604, "type": "long"},
              {"value": 19890604, "type": "long"},
              {"value": "hello world", "type": "string"},
              {"value": "long text", "type": "string"},
              {"value": "41.12,-71.34", "type": "string"},
              {"value": "2017-05-25 11:22:33", "type": "string"}
            ],
            "sliceRecordCount": 100
          }
        },
        "writer": {
          "name": "kafkawriter",
          "parameter": {
            "brokerList": "localhost:9092",
            "topic": "test-1",
            "partitions": 0,
            "batchSize": 1000,
            "column": ["col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9"]
          }
        }
      }
    ]
  }
}
```
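If automatic topic creation is disabled on the broker, create the topic before running the job. A minimal sketch, assuming Kafka 2.2 or later (earlier releases use `--zookeeper` instead of `--bootstrap-server`):

```shell
# create the target topic used by the job above
bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic test-1 \
  --partitions 1 \
  --replication-factor 1
```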
### Run
Execute the command `bin/addax.sh stream2kafka.json`; you should get output similar to the following:
```
2022-02-26 21:59:22.975 [ main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2022-02-26 21:59:22.985 [ main] INFO Engine -
{
"content":{
"reader":{
"parameter":{
"column":[
{
"random":"10,1000",
"type":"long"
},
{
"type":"string",
"value":"1.1.1.1"
},
{
"type":"double",
"value":19890604.0
},
{
"type":"long",
"value":19890604
},
{
"type":"long",
"value":19890604
},
{
"type":"string",
"value":"hello world"
},
{
"type":"string",
"value":"long text"
},
{
"type":"string",
"value":"41.12,-71.34"
},
{
"type":"string",
"value":"2017-05-25 11:22:33"
}
],
"sliceRecordCount":100
},
"name":"streamreader"
},
"writer":{
"parameter":{
"partitions":0,
"column":[
"col1",
"col2",
"col3",
"col4",
"col5",
"col6",
"col7",
"col8",
"col9"
],
"topic":"test-1",
"batchSize":1000,
"brokerList":"localhost:9092"
},
"name":"kafkawriter"
}
},
"setting":{
"speed":{
"channel":1
}
}
}
2022-02-26 21:59:23.002 [ main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2022-02-26 21:59:23.003 [ main] INFO JobContainer - Addax jobContainer starts job.
2022-02-26 21:59:23.004 [ main] INFO JobContainer - Set jobId = 0
2022-02-26 21:59:23.017 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader] do prepare work .
2022-02-26 21:59:23.017 [ job-0] INFO JobContainer - Addax Writer.Job [kafkawriter] do prepare work .
2022-02-26 21:59:23.017 [ job-0] INFO JobContainer - Job set Channel-Number to 1 channel(s).
2022-02-26 21:59:23.018 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader] splits to [1] tasks.
2022-02-26 21:59:23.019 [ job-0] INFO JobContainer - Addax Writer.Job [kafkawriter] splits to [1] tasks.
2022-02-26 21:59:23.039 [ job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2022-02-26 21:59:23.047 [ taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2022-02-26 21:59:23.050 [ taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2022-02-26 21:59:23.050 [ taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2022-02-26 21:59:23.082 [0-0-0-writer] INFO ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 1000
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id = addax-kafka-writer
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2022-02-26 21:59:23.412 [0-0-0-writer] INFO AppInfoParser - Kafka version : 2.0.0
2022-02-26 21:59:23.413 [0-0-0-writer] INFO AppInfoParser - Kafka commitId : 3402a8361b734732
2022-02-26 21:59:23.534 [kafka-producer-network-thread | addax-kafka-writer] INFO Metadata - Cluster ID: xPAQZFNDTp6y63nZO4LACA
2022-02-26 21:59:26.061 [ job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2022-02-26 21:59:26.062 [ job-0] INFO JobContainer - Addax Writer.Job [kafkawriter] do post work.
2022-02-26 21:59:26.062 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader] do post work.
2022-02-26 21:59:26.063 [ job-0] INFO JobContainer - PerfTrace not enable!
2022-02-26 21:59:26.064 [ job-0] INFO StandAloneJobContainerCommunicator - Total 100 records, 9200 bytes | Speed 2.99KB/s, 33 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-02-26 21:59:26.065 [ job-0] INFO JobContainer -
任务启动时刻 : 2022-02-26 21:59:23
任务结束时刻 : 2022-02-26 21:59:26
任务总计耗时 : 3s
任务平均流量 : 2.99KB/s
记录写入速度 : 33rec/s
读出记录总数 : 100
读写失败总数 : 0
```
We can use Kafka's bundled `kafka-console-consumer.sh` to read the data back; the output looks like this:
```shell
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-1 --from-beginning
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":916}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":572}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":88}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":33}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":697}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":381}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":304}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":103}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":967}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":147}
## Parameters
| Option     | Required | Type   | Default | Description                                                                                      |
| ---------- | -------- | ------ | ------- | ------------------------------------------------------------------------------------------------ |
| brokerList | yes      | string | none    | Kafka broker address(es), e.g. `localhost:9092`; separate multiple brokers with a comma (`,`)      |
| topic      | yes      | string | none    | the topic to write to                                                                              |
| batchSize  | no       | int    | 1024    | sets the Kafka `batch.size` parameter                                                              |
| column     | yes      | list   | none    | the list of column names to synchronize; `*` is not allowed                                        |
| properties | no       | dict   | none    | additional Kafka connection parameters to set                                                      |
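As a sketch of how `properties` might be used, the snippet below passes extra producer settings through it. The keys shown (`compression.type`, `acks`, `linger.ms`) are standard Kafka producer options that also appear in the `ProducerConfig` log above; whether a particular key takes effect depends on the producer client bundled with the plugin:

```json
"writer": {
  "name": "kafkawriter",
  "parameter": {
    "brokerList": "localhost:9092",
    "topic": "test-1",
    "batchSize": 1000,
    "column": ["col1", "col2"],
    "properties": {
      "compression.type": "lz4",
      "acks": "all",
      "linger.ms": "50"
    }
  }
}
```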
## Limitations
- Only Kafka 1.0 and later is supported; whether older versions can be written to has not been verified.
- Kafka services with Kerberos authentication enabled are not currently supported. For clusters secured with SASL/PLAIN rather than Kerberos, it may be possible to pass the standard client security settings through `properties`; see the untested sketch below.
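The keys in this sketch are standard Kafka client options, but the plugin's support for them is an assumption, and the username and password are placeholders:

```json
"properties": {
  "security.protocol": "SASL_PLAINTEXT",
  "sasl.mechanism": "PLAIN",
  "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"secret\";"
}
```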