跳转至

ElasticSearch Writer

elasticsearchWriter 插件用于向 ElasticSearch 写入数据。其实现是通过 elasticsearch 的rest api接口, 批量把据写入elasticsearch

配置样例

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": {
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "random": "10,1000",
              "type": "long"
            },
            {
              "value": "1.1.1.1",
              "type": "string"
            },
            {
              "value": 19890604,
              "type": "double"
            },
            {
              "value": 19890604,
              "type": "long"
            },
            {
              "value": 19890604,
              "type": "long"
            },
            {
              "value": "hello world",
              "type": "string"
            },
            {
              "value": "long text",
              "type": "string"
            },
            {
              "value": "41.12,-71.34",
              "type": "string"
            },
            {
              "value": "2017-05-25 11:22:33",
              "type": "string"
            }
          ],
          "sliceRecordCount": 100
        }
      },
      "writer": {
        "name": "elasticsearchwriter",
        "parameter": {
          "endpoint": "http://localhost:9200",
          "index": "test-1",
          "type": "default",
          "cleanup": true,
          "settings": {
            "index": {
              "number_of_shards": 1,
              "number_of_replicas": 0
            }
          },
          "discovery": false,
          "batchSize": 1000,
          "splitter": ",",
          "column": [
            {
              "name": "pk",
              "type": "id"
            },
            {
              "name": "col_ip",
              "type": "ip"
            },
            {
              "name": "col_double",
              "type": "double"
            },
            {
              "name": "col_long",
              "type": "long"
            },
            {
              "name": "col_integer",
              "type": "integer"
            },
            {
              "name": "col_keyword",
              "type": "keyword"
            },
            {
              "name": "col_text",
              "type": "text",
              "analyzer": "ik_max_word"
            },
            {
              "name": "col_geo_point",
              "type": "geo_point"
            },
            {
              "name": "col_date",
              "type": "date",
              "format": "yyyy-MM-dd HH:mm:ss"
            },
            {
              "name": "col_nested1",
              "type": "nested"
            },
            {
              "name": "col_nested2",
              "type": "nested"
            },
            {
              "name": "col_object1",
              "type": "object"
            },
            {
              "name": "col_object2",
              "type": "object"
            },
            {
              "name": "col_integer_array",
              "type": "integer",
              "array": true
            },
            {
              "name": "col_geo_shape",
              "type": "geo_shape",
              "tree": "quadtree",
              "precision": "10m"
            }
          ]
        }
      }
    }
  }
}

参数说明

配置项 是否必须 默认值 描述
endpoint ElasticSearch的连接地址
accessId http auth中的user, 默认为空
accessKey http auth中的password
index index名
type index名 index的type名
cleanup false 是否删除原表
batchSize 1000 每次批量数据的条数
trySize 30 失败后重试的次数
timeout 600000 客户端超时时间,单位为毫秒(ms)
discovery false 启用节点发现将(轮询)并定期更新客户机中的服务器列表
compression true 否是开启http请求压缩
multiThread true 是否开启多线程http请求
ignoreWriteError false 写入错误时,是否重试,如果是 true 则表示一直重试,否则忽略该条数据
ignoreParseError true 解析数据格式错误时,是否继续写入
alias 数据导入完成后写入别名
aliasMode append 数据导入完成后增加别名的模式,append(增加模式), exclusive(只留这一个)
settings 创建index时候的settings, 与elasticsearch官方相同
splitter , 如果插入数据是array,就使用指定分隔符
column 字段类型,文档中给出的样例中包含了全部支持的字段类型
dynamic false 不使用addax的mappings,使用es自己的自动mappings

约束限制

  • 如果导入id,这样数据导入失败也会重试,重新导入也仅仅是覆盖,保证数据一致性
  • 如果不导入id,就是append_only模式,elasticsearch自动生成id,速度会提升20%左右,但数据无法修复,适合日志型数据(对数据精度要求不高的)