Kafka Writer

The KafkaWriter plugin writes data to Kafka in JSON format. It was introduced in version 4.0.9.

Example

The following configuration demonstrates how to read data from memory and write it to a specified Kafka topic.

Create the job file

First, create a job file stream2kafka.json with the following content:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {"random": "10,1000", "type": "long"},
              {"value": "1.1.1.1", "type": "string"},
              {"value": 19890604.0, "type": "double"},
              {"value": 19890604, "type": "long"},
              {"value": 19890604, "type": "long"},
              {"value": "hello world", "type": "string"},
              {"value": "long text", "type": "string"},
              {"value": "41.12,-71.34", "type": "string"},
              {"value": "2017-05-25 11:22:33", "type": "string"}
            ],
            "sliceRecordCount": 100
          }
        },
        "writer": {
          "name": "kafkawriter",
          "parameter": {
            "brokerList": "localhost:9092",
            "topic": "test-1",
            "partitions": 0,
            "batchSize": 1000,
            "column": ["col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9"]
          }
        }
      }
    ]
  }
}
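
If your broker does not auto-create topics, create test-1 before running the job. A minimal sketch using Kafka's own CLI (Kafka 2.2+ syntax; older releases take --zookeeper instead of --bootstrap-server):

$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
    --topic test-1 --partitions 1 --replication-factor 1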

Run

Execute bin/addax.sh stream2kafka.json; the output will look similar to the following:

2022-02-26 21:59:22.975 [        main] INFO  VMInfo               - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2022-02-26 21:59:22.985 [        main] INFO  Engine               - 
{
    "content":{
        "reader":{
            "parameter":{
                "column":[
                    {
                        "random":"10,1000",
                        "type":"long"
                    },
                    {
                        "type":"string",
                        "value":"1.1.1.1"
                    },
                    {
                        "type":"double",
                        "value":19890604.0
                    },
                    {
                        "type":"long",
                        "value":19890604
                    },
                    {
                        "type":"long",
                        "value":19890604
                    },
                    {
                        "type":"string",
                        "value":"hello world"
                    },
                    {
                        "type":"string",
                        "value":"long text"
                    },
                    {
                        "type":"string",
                        "value":"41.12,-71.34"
                    },
                    {
                        "type":"string",
                        "value":"2017-05-25 11:22:33"
                    }
                ],
                "sliceRecordCount":100
            },
            "name":"streamreader"
        },
        "writer":{
            "parameter":{
                "partitions":0,
                "column":[
                    "col1",
                    "col2",
                    "col3",
                    "col4",
                    "col5",
                    "col6",
                    "col7",
                    "col8",
                    "col9"
                ],
                "topic":"test-1",
                "batchSize":1000,
                "brokerList":"localhost:9092"
            },
            "name":"kafkawriter"
        }
    },
    "setting":{
        "speed":{
            "channel":1
        }
    }
}

2022-02-26 21:59:23.002 [        main] INFO  PerfTrace            - PerfTrace traceId=job_-1, isEnable=false, priority=0
2022-02-26 21:59:23.003 [        main] INFO  JobContainer         - Addax jobContainer starts job.
2022-02-26 21:59:23.004 [        main] INFO  JobContainer         - Set jobId = 0
2022-02-26 21:59:23.017 [       job-0] INFO  JobContainer         - Addax Reader.Job [streamreader] do prepare work .
2022-02-26 21:59:23.017 [       job-0] INFO  JobContainer         - Addax Writer.Job [kafkawriter] do prepare work .
2022-02-26 21:59:23.017 [       job-0] INFO  JobContainer         - Job set Channel-Number to 1 channel(s).
2022-02-26 21:59:23.018 [       job-0] INFO  JobContainer         - Addax Reader.Job [streamreader] splits to [1] tasks.
2022-02-26 21:59:23.019 [       job-0] INFO  JobContainer         - Addax Writer.Job [kafkawriter] splits to [1] tasks.
2022-02-26 21:59:23.039 [       job-0] INFO  JobContainer         - Scheduler starts [1] taskGroups.
2022-02-26 21:59:23.047 [ taskGroup-0] INFO  TaskGroupContainer   - taskGroupId=[0] start [1] channels for [1] tasks.
2022-02-26 21:59:23.050 [ taskGroup-0] INFO  Channel              - Channel set byte_speed_limit to -1, No bps activated.
2022-02-26 21:59:23.050 [ taskGroup-0] INFO  Channel              - Channel set record_speed_limit to -1, No tps activated.
2022-02-26 21:59:23.082 [0-0-0-writer] INFO  ProducerConfig       - ProducerConfig values: 
    acks = 1
    batch.size = 1000
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.id = addax-kafka-writer
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2022-02-26 21:59:23.412 [0-0-0-writer] INFO  AppInfoParser        - Kafka version : 2.0.0
2022-02-26 21:59:23.413 [0-0-0-writer] INFO  AppInfoParser        - Kafka commitId : 3402a8361b734732
2022-02-26 21:59:23.534 [kafka-producer-network-thread | addax-kafka-writer] INFO  Metadata             - Cluster ID: xPAQZFNDTp6y63nZO4LACA
2022-02-26 21:59:26.061 [       job-0] INFO  AbstractScheduler    - Scheduler accomplished all tasks.
2022-02-26 21:59:26.062 [       job-0] INFO  JobContainer         - Addax Writer.Job [kafkawriter] do post work.
2022-02-26 21:59:26.062 [       job-0] INFO  JobContainer         - Addax Reader.Job [streamreader] do post work.
2022-02-26 21:59:26.063 [       job-0] INFO  JobContainer         - PerfTrace not enable!
2022-02-26 21:59:26.064 [       job-0] INFO  StandAloneJobContainerCommunicator - Total 100 records, 9200 bytes | Speed 2.99KB/s, 33 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-02-26 21:59:26.065 [       job-0] INFO  JobContainer         - 
Job start time                  : 2022-02-26 21:59:23
Job end time                    : 2022-02-26 21:59:26
Total elapsed time              :                  3s
Average throughput              :            2.99KB/s
Record write speed              :             33rec/s
Total records read              :                 100
Total read/write failures       :                   0

We can use Kafka's bundled kafka-console-consumer.sh to read the data back; each message is a JSON object keyed by the configured column names:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-1 --from-beginning

{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":916}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":572}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":88}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":33}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":697}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":381}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":304}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":103}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":967}
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":147}

Parameters

| Option     | Required | Type   | Default | Description |
| ---------- | -------- | ------ | ------- | ----------- |
| brokerList | yes      | string | none    | Kafka broker address(es), e.g. localhost:9092; separate multiple brokers with commas (,) |
| topic      | yes      | string | none    | the topic to write to |
| batchSize  | no       | int    | 1024    | sets the Kafka producer's batch.size parameter |
| column     | yes      | list   | none    | the columns of the configured table to synchronize; * is not allowed |
| properties | no       | dict   | none    | additional Kafka connection parameters to set |
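
The properties map supplies extra settings to the underlying Kafka producer. A minimal sketch of a writer block using it, assuming the entries are passed through verbatim to the producer configuration as the description above suggests (the compression.type, linger.ms, and acks values are illustrative choices, not defaults):

"writer": {
  "name": "kafkawriter",
  "parameter": {
    "brokerList": "localhost:9092",
    "topic": "test-1",
    "batchSize": 1024,
    "column": ["col1", "col2"],
    "properties": {
      "compression.type": "lz4",
      "linger.ms": "50",
      "acks": "all"
    }
  }
}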

Limitations

  1. Only Kafka 1.0 and later are supported; whether earlier versions can be written to has not been verified.
  2. Kafka services with Kerberos authentication enabled are not currently supported.