# ElasticSearchReader
The ElasticSearchReader plugin reads indexes from Elasticsearch. It sends the configured query through the Elasticsearch REST API (port 9200 by default) and fetches the matching documents in batches.
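Batch fetching of this kind normally follows the Elasticsearch scroll protocol. An initial search request opens a scroll context. Follow-up requests to the scroll endpoint then return the next page, until a page comes back empty. The Python sketch below shows that loop only and is not the plugin's Java implementation. The assumptions are labeled as follows. `send` is a hypothetical stand-in for an HTTP client, injected so the loop runs without a cluster. `fake_cluster` is a hypothetical in-memory double. The `3m` keep-alive mirrors the sample job.

```python
from typing import Callable, Iterator, List, Tuple

# Hypothetical transport: (method, path, json_body) -> parsed JSON response.
Send = Callable[[str, str, dict], dict]


def scroll_all(send: Send, index: str, body: dict, scroll: str = "3m") -> Iterator[dict]:
    """Yield the _source of every matching hit, page by page, via the scroll API."""
    # The first search request opens a scroll context kept alive for `scroll`.
    page = send("POST", f"/{index}/_search?scroll={scroll}", body)
    try:
        while page["hits"]["hits"]:
            for hit in page["hits"]["hits"]:
                yield hit["_source"]
            # Follow-up requests send only the scroll id and a fresh keep-alive.
            page = send("POST", "/_search/scroll",
                        {"scroll": scroll, "scroll_id": page["_scroll_id"]})
    finally:
        # Clear the scroll context so the cluster can free its resources.
        send("DELETE", "/_search/scroll", {"scroll_id": page["_scroll_id"]})


def fake_cluster(pages: List[dict]) -> Tuple[Send, List[Tuple[str, str]]]:
    """Hypothetical in-memory cluster: returns canned pages and records calls."""
    calls: List[Tuple[str, str]] = []
    remaining = iter(pages)

    def send(method: str, path: str, body: dict) -> dict:
        calls.append((method, path))
        return {} if method == "DELETE" else next(remaining)

    return send, calls


pages = [
    {"_scroll_id": "s1", "hits": {"hits": [{"_source": {"col_ip": "1.1.1.1"}}]}},
    {"_scroll_id": "s1", "hits": {"hits": [{"_source": {"col_ip": "2.2.2.2"}}]}},
    {"_scroll_id": "s1", "hits": {"hits": []}},
]
send, calls = fake_cluster(pages)
docs = list(scroll_all(send, "test-1", {"query": {"match_all": {}}}))
print(docs)  # [{'col_ip': '1.1.1.1'}, {'col_ip': '2.2.2.2'}]
```

An empty page ends the loop. The `finally` block releases the scroll context even if the consumer stops iterating early.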
## Example

Assume a search against the index returns the following documents:
```json
{
  "took": 14,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 1,
    "hits": [
      {
        "_index": "test-1",
        "_type": "default",
        "_id": "38",
        "_score": 1,
        "_source": {
          "col_date": "2017-05-25T11:22:33.000+08:00",
          "col_integer": 19890604,
          "col_keyword": "hello world",
          "col_ip": "1.1.1.1",
          "col_text": "long text",
          "col_double": 19890604,
          "col_long": 19890604,
          "col_geo_point": "41.12,-71.34"
        }
      },
      {
        "_index": "test-1",
        "_type": "default",
        "_id": "103",
        "_score": 1,
        "_source": {
          "col_date": "2017-05-25T11:22:33.000+08:00",
          "col_integer": 19890604,
          "col_keyword": "hello world",
          "col_ip": "1.1.1.1",
          "col_text": "long text",
          "col_double": 19890604,
          "col_long": 19890604,
          "col_geo_point": "41.12,-71.34"
        }
      }
    ]
  }
}
```
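Each matching document sits under `hits.hits`, and its field values are in `_source`. The plain-Python snippet below is an illustration only, not part of Addax. It pulls the `_source` objects and the hit count out of a trimmed copy of the response above. `hits.total` is a plain number in this response. Elasticsearch 7 and later return it as an object such as `{"value": 2, "relation": "eq"}`, so the helper accepts both forms. The function names are hypothetical.

```python
import json

# Trimmed copy of the sample response: metadata and most fields dropped.
RESPONSE = json.loads("""
{
  "hits": {
    "total": 2,
    "hits": [
      {"_id": "38", "_source": {"col_ip": "1.1.1.1", "col_long": 19890604}},
      {"_id": "103", "_source": {"col_ip": "1.1.1.1", "col_long": 19890604}}
    ]
  }
}
""")


def total_hits(response: dict) -> int:
    """Read hits.total as a plain number (pre-7.x) or a {"value": ...} object (7.x+)."""
    total = response["hits"]["total"]
    return total["value"] if isinstance(total, dict) else total


def sources(response: dict) -> list:
    """Collect the _source document of every hit on this page."""
    return [hit["_source"] for hit in response["hits"]["hits"]]


print(total_hits(RESPONSE))              # 2
print(sources(RESPONSE)[1]["col_long"])  # 19890604
```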
Configure a job that reads data from Elasticsearch and prints it to the terminal:
```json
{
  "job": {
    "setting": {
      "speed": {
        "byte": -1,
        "channel": 1
      }
    },
    "content": {
      "reader": {
        "name": "elasticsearchreader",
        "parameter": {
          "endpoint": "http://127.0.0.1:9200",
          "accessId": "",
          "accesskey": "",
          "index": "test-1",
          "type": "default",
          "searchType": "dfs_query_then_fetch",
          "headers": {},
          "scroll": "3m",
          "search": [
            {
              "query": {
                "match": {
                  "col_ip": "1.1.1.1"
                }
              },
              "aggregations": {
                "top_10_states": {
                  "terms": {
                    "field": "col_date",
                    "size": 10
                  }
                }
              }
            }
          ],
          "column": [
            "col_ip",
            "col_double",
            "col_long",
            "col_integer",
            "col_keyword",
            "col_text",
            "col_geo_point",
            "col_date"
          ]
        }
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true,
          "encoding": "UTF-8"
        }
      }
    }
  }
}
```
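The sketch below gives a rough picture of what the reader asks the cluster for. It turns the reader `parameter` block of this job into Elasticsearch search requests, one per `search` entry. `searchType` and `scroll` become the `search_type` and `scroll` query-string parameters. This mapping is an assumption made for illustration, and `build_requests` is a hypothetical name. The plugin's HTTP client may lay out the URL differently, in particular in how it uses `type`.

```python
import json
from typing import List, Tuple
from urllib.parse import urlencode

# Reader parameters from the sample job (credentials, headers and aggregations omitted).
PARAMS = {
    "endpoint": "http://127.0.0.1:9200",
    "index": "test-1",
    "type": "default",
    "searchType": "dfs_query_then_fetch",
    "scroll": "3m",
    "search": [{"query": {"match": {"col_ip": "1.1.1.1"}}}],
}


def build_requests(params: dict) -> List[Tuple[str, str]]:
    """Hypothetical mapping: one (url, json_body) pair per entry in `search`."""
    path = f"/{params['index']}"
    if params.get("type"):
        # Typed endpoint, as accepted by pre-7.x clusters (assumption).
        path += f"/{params['type']}"
    query = {"search_type": params.get("searchType", "dfs_query_then_fetch")}
    if params.get("scroll"):
        query["scroll"] = params["scroll"]
    url = f"{params['endpoint'].rstrip('/')}{path}/_search?{urlencode(query)}"
    return [(url, json.dumps(body)) for body in params["search"]]


for url, body in build_requests(PARAMS):
    print("POST", url)
    print(body)
```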
Save the configuration above as `job/es2stream.json`, then run the following command to start the job:
```shell
bin/addax.sh job/es2stream.json
```
The output looks similar to the following (some output records have been omitted):
```
2021-02-19 13:38:15.860 [main] INFO  VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2021-02-19 13:38:15.895 [main] INFO  Engine -
{
    "content":
        {
            "reader":{
                "parameter":{
                    "accessId":"",
                    "headers":{},
                    "endpoint":"http://127.0.0.1:9200",
                    "search":[
                      {
                        "query": {
                          "match": {
                            "col_ip": "1.1.1.1"
                          }
                        },
                        "aggregations": {
                          "top_10_states": {
                            "terms": {
                              "field": "col_date",
                              "size": 10
                            }
                          }
                        }
                      }
                    ],
                    "accesskey":"*****",
                    "searchType":"dfs_query_then_fetch",
                    "scroll":"3m",
                    "column":[
                        "col_ip",
                        "col_double",
                        "col_long",
                        "col_integer",
                        "col_keyword",
                        "col_text",
                        "col_geo_point",
                        "col_date"
                    ],
                    "index":"test-1",
                    "type":"default"
                },
                "name":"elasticsearchreader"
            },
            "writer":{
                "parameter":{
                    "print":true,
                    "encoding":"UTF-8"
                },
                "name":"streamwriter"
            }
        },
    "setting":{
        "errorLimit":{
            "record":0,
            "percentage":0.02
        },
        "speed":{
            "byte":-1,
            "channel":1
        }
    }
}
2021-02-19 13:38:15.934 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-02-19 13:38:15.934 [main] INFO  JobContainer - Addax jobContainer starts job.
2021-02-19 13:38:15.937 [main] INFO  JobContainer - Set jobId = 0
2017-05-25T11:22:33.000+08:00   19890604    hello world 1.1.1.1 long text   19890604    19890604    41.12,-71.34
2017-05-25T11:22:33.000+08:00   19890604    hello world 1.1.1.1 long text   19890604    19890604    41.12,-71.34
2021-02-19 13:38:19.845 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2021-02-19 13:38:19.848 [job-0] INFO  JobContainer - Addax Writer.Job [streamwriter] do post work.
2021-02-19 13:38:19.849 [job-0] INFO  JobContainer - Addax Reader.Job [elasticsearchreader] do post work.
2021-02-19 13:38:19.855 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-02-19 13:38:19.858 [job-0] INFO  StandAloneJobContainerCommunicator - Total 95 records, 8740 bytes | Speed 2.84KB/s, 31 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.103s | Percentage 100.00%
2021-02-19 13:38:19.861 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-02-19 13:38:15
任务结束时刻                    : 2021-02-19 13:38:19
任务总计耗时                    :                  3s
任务平均流量                    :            2.84KB/s
记录写入速度                    :             31rec/s
读出记录总数                    :                   2
读写失败总数                    :                   0
```
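## Parameters

| Option | Required | Type | Default | Description |
|---|---|---|---|---|
| endpoint | Yes | string | none | Elasticsearch connection address |
| accessId | No | string | "" | User name for HTTP authentication |
| accessKey | No | string | "" | Password for HTTP authentication |
| index | Yes | string | none | Name of the Elasticsearch index |
| type | No | string | index name | Type name of the index in Elasticsearch |
| search | Yes | list | [] | Search request bodies, in JSON format, accepted by the Elasticsearch search API |
| column | Yes | list | none | Fields to read |
| timeout | No | int | 60 | Client timeout, in seconds |
| discovery | No | boolean | false | Enable node discovery (polling) and periodically refresh the client's server list |
| compression | No | boolean | true | Enable compression for HTTP requests |
| multiThread | No | boolean | true | Whether HTTP requests use multiple threads |
| searchType | No | string | dfs_query_then_fetch | Search type (see `searchType` below) |
| headers | No | map | {} | HTTP request headers |
| scroll | No | string | "" | Scroll pagination setting: how long the scroll context is kept alive, e.g. `3m` |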
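The table can be read as a small schema. Four options are required, and the rest fall back to defaults. `type` is a special case because it defaults to the index name rather than a constant. The sketch below applies the table's defaults to a reader `parameter` dict and rejects configurations that miss a required option. `with_defaults` is a hypothetical helper, not Addax's own validation code.

```python
import copy

# Defaults from the parameter table. `type` is handled separately because
# its default is the index name rather than a constant.
DEFAULTS = {
    "accessId": "",
    "accessKey": "",
    "timeout": 60,  # seconds
    "discovery": False,
    "compression": True,
    "multiThread": True,
    "searchType": "dfs_query_then_fetch",
    "headers": {},
    "scroll": "",
}
REQUIRED = ("endpoint", "index", "search", "column")


def with_defaults(param: dict) -> dict:
    """Hypothetical helper: check required options, then fill in table defaults."""
    missing = [key for key in REQUIRED if key not in param]
    if missing:
        raise ValueError("missing required option(s): " + ", ".join(missing))
    # Deep-copy so callers never share the mutable `headers` default.
    merged = {**copy.deepcopy(DEFAULTS), **param}
    merged.setdefault("type", param["index"])
    return merged


cfg = with_defaults({
    "endpoint": "http://127.0.0.1:9200",
    "index": "test-1",
    "search": [{"query": {"match_all": {}}}],
    "column": ["col_ip", "col_long"],
})
print(cfg["type"], cfg["timeout"], cfg["searchType"])  # test-1 60 dfs_query_then_fetch
```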
### search

The `search` option accepts any request body that satisfies the Elasticsearch search API, for example:
```json
{
  "query": {
    "match": {
      "message": "myProduct"
    }
  },
  "aggregations": {
    "top_10_states": {
      "terms": {
        "field": "state",
        "size": 10
      }
    }
  }
}
```
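In the sample job, `search` is a list, so a body like the one above goes in as one element of that list. When job files are produced by a script, you can build the body as a data structure and serialize it instead of hand-writing JSON. The snippet below is one way to do that, and the variable names are illustrative only.

```python
import json

# Search body from the example above: a match query plus a terms aggregation.
search_body = {
    "query": {"match": {"message": "myProduct"}},
    "aggregations": {
        "top_10_states": {"terms": {"field": "state", "size": 10}}
    },
}

# The reader's `search` option is a list, so wrap the body in one.
reader_parameter = {"search": [search_body]}

print(json.dumps(reader_parameter, indent=2))
```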
### searchType

`searchType` currently supports the following values:

- `dfs_query_then_fetch`
- `query_then_fetch`
- `count`
- `scan`
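A job generator may want to check `searchType` before submitting a job. The sketch below accepts only the four values listed above. It also flags `count` and `scan` when the target cluster is version 5 or later, because Elasticsearch removed both search types in 5.0. The function name `check_search_type` and the version argument are hypothetical. This documentation does not say how the plugin itself behaves with these values on newer clusters.

```python
SUPPORTED = {"dfs_query_then_fetch", "query_then_fetch", "count", "scan"}
# Both of these search types were removed from Elasticsearch in 5.0.
REMOVED_IN_5 = {"count", "scan"}


def check_search_type(search_type: str, es_major_version: int) -> str:
    """Hypothetical check: return search_type if usable, else raise ValueError."""
    if search_type not in SUPPORTED:
        raise ValueError(f"unsupported searchType: {search_type!r}")
    if es_major_version >= 5 and search_type in REMOVED_IN_5:
        raise ValueError(f"searchType {search_type!r} was removed in Elasticsearch 5.0")
    return search_type


print(check_search_type("query_then_fetch", 6))  # query_then_fetch
```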