跳转至

ElasticSearch Writer

elasticsearchWriter 插件用于向 ElasticSearch 写入数据。其实现是通过 elasticsearch 的 rest api 接口, 批量把据写入 elasticsearch

配置样例

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": {
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "random": "10,1000",
              "type": "long"
            },
            {
              "value": "1.1.1.1",
              "type": "string"
            },
            {
              "value": 19890604,
              "type": "double"
            },
            {
              "value": 19890604,
              "type": "long"
            },
            {
              "value": 19890604,
              "type": "long"
            },
            {
              "value": "hello world",
              "type": "string"
            },
            {
              "value": "long text",
              "type": "string"
            },
            {
              "value": "41.12,-71.34",
              "type": "string"
            },
            {
              "value": "2017-05-25 11:22:33",
              "type": "string"
            }
          ],
          "sliceRecordCount": 100
        }
      },
      "writer": {
        "name": "elasticsearchwriter",
        "parameter": {
          "endpoint": "http://localhost:9200",
          "index": "test-1",
          "type": "default",
          "cleanup": true,
          "settings": {
            "index": {
              "number_of_shards": 1,
              "number_of_replicas": 0
            }
          },
          "discovery": false,
          "batchSize": 1000,
          "splitter": ",",
          "column": [
            {
              "name": "pk",
              "type": "id"
            },
            {
              "name": "col_ip",
              "type": "ip"
            },
            {
              "name": "col_double",
              "type": "double"
            },
            {
              "name": "col_long",
              "type": "long"
            },
            {
              "name": "col_integer",
              "type": "integer"
            },
            {
              "name": "col_keyword",
              "type": "keyword"
            },
            {
              "name": "col_text",
              "type": "text",
              "analyzer": "ik_max_word"
            },
            {
              "name": "col_geo_point",
              "type": "geo_point"
            },
            {
              "name": "col_date",
              "type": "date",
              "format": "yyyy-MM-dd HH:mm:ss"
            },
            {
              "name": "col_nested1",
              "type": "nested"
            },
            {
              "name": "col_nested2",
              "type": "nested"
            },
            {
              "name": "col_object1",
              "type": "object"
            },
            {
              "name": "col_object2",
              "type": "object"
            },
            {
              "name": "col_integer_array",
              "type": "integer",
              "array": true
            },
            {
              "name": "col_geo_shape",
              "type": "geo_shape",
              "tree": "quadtree",
              "precision": "10m"
            }
          ]
        }
      }
    }
  }
}

参数说明

配置项 是否必须 默认值 描述
endpoint ElasticSearch 的连接地址,如果是集群,则多个地址用逗号(,)分割
accessId http auth 中的 user, 默认为空
accessKey http auth 中的 password
index index 名
type index 名 index 的 type 名
cleanup false 是否删除原表
batchSize 1000 每次批量数据的条数
trySize 30 失败后重试的次数
timeout 600000 客户端超时时间,单位为毫秒(ms)
discovery false 启用节点发现将(轮询)并定期更新客户机中的服务器列表
compression true 否是开启 http 请求压缩
multiThread true 是否开启多线程 http 请求
ignoreWriteError false 写入错误时,是否重试,如果是 true 则表示一直重试,否则忽略该条数据
ignoreParseError true 解析数据格式错误时,是否继续写入
alias 数据导入完成后写入别名
aliasMode append 数据导入完成后增加别名的模式,append(增加模式), exclusive(只留这一个)
settings 创建 index 时候的 settings, 与 elasticsearch 官方相同
splitter , 如果插入数据是 array,就使用指定分隔符
column 字段类型,文档中给出的样例中包含了全部支持的字段类型
dynamic false 不使用 addax 的 mappings,使用 es 自己的自动 mappings

约束限制

  • 如果导入 id,这样数据导入失败也会重试,重新导入也仅仅是覆盖,保证数据一致性
  • 如果不导入 id,就是 append_only 模式,elasticsearch 自动生成 id,速度会提升 20%左右,但数据无法修复,适合日志型数据(对数据精度要求不高的)