跳转至

ElasticSearch Writer

ElasticSearch Writer 插件用于向 ElasticSearch 写入数据。 其实现是通过 elasticsearch 的 rest api 接口, 批量把据写入 elasticsearch

配置样例

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": {
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "random": "10,1000",
              "type": "long"
            },
            {
              "value": "1.1.1.1",
              "type": "string"
            },
            {
              "value": 19890604,
              "type": "double"
            },
            {
              "value": 19890604,
              "type": "long"
            },
            {
              "value": 19890604,
              "type": "long"
            },
            {
              "value": "hello world",
              "type": "string"
            },
            {
              "value": "long text",
              "type": "string"
            },
            {
              "value": "41.12,-71.34",
              "type": "string"
            },
            {
              "value": "2017-05-25 11:22:33",
              "type": "string"
            }
          ],
          "sliceRecordCount": 100
        }
      },
      "writer": {
        "name": "elasticsearchwriter",
        "parameter": {
          "endpoint": "http://localhost:9200",
          "index": "test-1",
          "type": "default",
          "cleanup": true,
          "settings": {
            "index": {
              "number_of_shards": 1,
              "number_of_replicas": 0
            }
          },
          "discovery": false,
          "batchSize": 1000,
          "splitter": ",",
          "column": [
            {
              "name": "pk",
              "type": "id"
            },
            {
              "name": "col_ip",
              "type": "ip"
            },
            {
              "name": "col_double",
              "type": "double"
            },
            {
              "name": "col_long",
              "type": "long"
            },
            {
              "name": "col_integer",
              "type": "integer"
            },
            {
              "name": "col_keyword",
              "type": "keyword"
            },
            {
              "name": "col_text",
              "type": "text",
              "analyzer": "ik_max_word"
            },
            {
              "name": "col_geo_point",
              "type": "geo_point"
            },
            {
              "name": "col_date",
              "type": "date",
              "format": "yyyy-MM-dd HH:mm:ss"
            },
            {
              "name": "col_nested1",
              "type": "nested"
            },
            {
              "name": "col_nested2",
              "type": "nested"
            },
            {
              "name": "col_object1",
              "type": "object"
            },
            {
              "name": "col_object2",
              "type": "object"
            },
            {
              "name": "col_integer_array",
              "type": "integer",
              "array": true
            },
            {
              "name": "col_geo_shape",
              "type": "geo_shape",
              "tree": "quadtree",
              "precision": "10m"
            }
          ]
        }
      }
    }
  }
}

参数说明

配置项 是否必须 数据类型 默认值 描述
endpoint string ElasticSearch 的连接地址,如果是集群,则多个地址用逗号(,)分割
accessId string http auth 中的 user, 默认为空
accessKey string http auth 中的 password
index string index 名
type string default index 类型
cleanup boolean false 是否删除原表
batchSize int 1000 每次批量数据的条数
trySize int 30 失败后重试的次数
timeout int 600000 客户端超时时间,单位为毫秒(ms)
discovery boolean false 启用节点发现将(轮询)并定期更新客户机中的服务器列表
compression boolean true 否是开启 http 请求压缩
multiThread boolean true 是否开启多线程 http 请求
ignoreWriteError boolean false 写入错误时,是否重试,如果是 true 则表示一直重试,否则忽略该条数据
ignoreParseError boolean true 解析数据格式错误时,是否继续写入
alias string 数据导入完成后写入别名
aliasMode string append 数据导入完成后增加别名的模式,append(增加模式), exclusive(只留这一个)
settings map 创建 index 时候的 settings, 与 elasticsearch 官方相同
splitter string , 如果插入数据是 array,就使用指定分隔符
column list<map> 字段类型,文档中给出的样例中包含了全部支持的字段类型
dynamic boolean false 不使用 addax 的 mappings,使用 es 自己的自动 mappings

约束限制

  • 如果导入 id,这样数据导入失败也会重试,重新导入也仅仅是覆盖,保证数据一致性
  • 如果不导入 id,就是 append_only 模式,elasticsearch 自动生成 id,速度会提升 20%左右,但数据无法修复,适合日志型数据(对数据精度要求不高的)