跳转至

ElasticSearchReader

ElasticSearchReader 插件实现了从 Elasticsearch 读取索引的功能, 它通过 Elasticsearch 提供的 Rest API (默认端口9200),执行指定的查询语句批量获取数据

示例

假定要获取的索引内容如下

{
  "took": 14,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 1,
    "hits": [
      {
        "_index": "test-1",
        "_type": "default",
        "_id": "38",
        "_score": 1,
        "_source": {
          "col_date": "2017-05-25T11:22:33.000+08:00",
          "col_integer": 19890604,
          "col_keyword": "hello world",
          "col_ip": "1.1.1.1",
          "col_text": "long text",
          "col_double": 19890604,
          "col_long": 19890604,
          "col_geo_point": "41.12,-71.34"
        }
      },
      {
        "_index": "test-1",
        "_type": "default",
        "_id": "103",
        "_score": 1,
        "_source": {
          "col_date": "2017-05-25T11:22:33.000+08:00",
          "col_integer": 19890604,
          "col_keyword": "hello world",
          "col_ip": "1.1.1.1",
          "col_text": "long text",
          "col_double": 19890604,
          "col_long": 19890604,
          "col_geo_point": "41.12,-71.34"
        }
      }
    ]
  }
}

配置一个从 Elasticsearch 读取数据并打印到终端的任务

{
  "job": {
    "setting": {
      "speed": {
        "byte": -1,
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "elasticsearchreader",
          "parameter": {
            "endpoint": "http://127.0.0.1:9200",
            "accessId": "",
            "accesskey": "",
            "index": "test-1",
            "type": "default",
            "searchType": "dfs_query_then_fetch",
            "headers": {},
            "scroll": "3m",
            "search": [
              {
                "query": {
                  "match": {
                    "col_ip": "1.1.1.1"
                  }
                },
                "aggregations": {
                  "top_10_states": {
                    "terms": {
                      "field": "col_date",
                      "size": 10
                    }
                  }
                }
              }
            ],
            "column": [
              "col_ip",
              "col_double",
              "col_long",
              "col_integer",
              "col_keyword",
              "col_text",
              "col_geo_point",
              "col_date"
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": true,
            "encoding": "UTF-8"
          }
        }
      }
    ]
  }
}

将上述内容保存为 job/es2stream.json

执行下面的命令进行采集

bin/addax.sh job/es2stream.json

其输出结果类似如下(输出记录数有删减)

2021-02-19 13:38:15.860 [main] INFO  VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2021-02-19 13:38:15.895 [main] INFO  Engine -
{
    "content":[
        {
            "reader":{
                "parameter":{
                    "accessId":"",
                    "headers":{},
                    "endpoint":"http://127.0.0.1:9200",
                    "search":[
                      {
                        "query": {
                          "match": {
                            "col_ip": "1.1.1.1"
                          }
                        },
                        "aggregations": {
                          "top_10_states": {
                            "terms": {
                              "field": "col_date",
                              "size": 10
                            }
                          }
                        }
                      }
                    ],
                    "accesskey":"*****",
                    "searchType":"dfs_query_then_fetch",
                    "scroll":"3m",
                    "column":[
                        "col_ip",
                        "col_double",
                        "col_long",
                        "col_integer",
                        "col_keyword",
                        "col_text",
                        "col_geo_point",
                        "col_date"
                    ],
                    "index":"test-1",
                    "type":"default"
                },
                "name":"elasticsearchreader"
            },
            "writer":{
                "parameter":{
                    "print":true,
                    "encoding":"UTF-8"
                },
                "name":"streamwriter"
            }
        }
    ],
    "setting":{
        "errorLimit":{
            "record":0,
            "percentage":0.02
        },
        "speed":{
            "byte":-1,
            "channel":1
        }
    }
}

2021-02-19 13:38:15.934 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-02-19 13:38:15.934 [main] INFO  JobContainer - Addax jobContainer starts job.
2021-02-19 13:38:15.937 [main] INFO  JobContainer - Set jobId = 0

2017-05-25T11:22:33.000+08:00   19890604    hello world 1.1.1.1 long text   19890604    19890604    41.12,-71.34
2017-05-25T11:22:33.000+08:00   19890604    hello world 1.1.1.1 long text   19890604    19890604    41.12,-71.34

2021-02-19 13:38:19.845 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2021-02-19 13:38:19.848 [job-0] INFO  JobContainer - Addax Writer.Job [streamwriter] do post work.
2021-02-19 13:38:19.849 [job-0] INFO  JobContainer - Addax Reader.Job [elasticsearchreader] do post work.
2021-02-19 13:38:19.855 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-02-19 13:38:19.858 [job-0] INFO  StandAloneJobContainerCommunicator - Total 95 records, 8740 bytes | Speed 2.84KB/s, 31 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.103s | Percentage 100.00%
2021-02-19 13:38:19.861 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-02-19 13:38:15
任务结束时刻                    : 2021-02-19 13:38:19
任务总计耗时                    :                  3s
任务平均流量                    :            2.84KB/s
记录写入速度                    :             31rec/s
读出记录总数                    :                   2
读写失败总数                    :                   0

参数说明

配置项 是否必须 类型 默认值 描述
endpoint string ElasticSearch的连接地址
accessId string "" http auth中的user
accessKey string "" http auth中的password
index string elasticsearch中的index名
type string index名 elasticsearch中index的type名
search list [] json格式api搜索数据体
column list 需要读取的字段
timeout int 60 客户端超时时间(单位:秒)
discovery boolean false 启用节点发现将(轮询)并定期更新客户机中的服务器列表
compression boolean true http请求,开启压缩
multiThread boolean true http请求,是否有多线程
searchType string dfs_query_then_fetch 搜索类型
headers map {} http请求头
scroll string "" 滚动分页配置

search 配置项允许配置为满足 Elasticsearch API 查询要求的内容,比如这样:

{
  "query": {
    "match": {
      "message": "myProduct"
    }
  },
  "aggregations": {
    "top_10_states": {
      "terms": {
        "field": "state",
        "size": 10
      }
    }
  }
}

searchType

searchType 目前支持以下几种:

  • dfs_query_then_fetch
  • query_then_fetch
  • count
  • scan
Back to top