跳转至

MongoDB Writer

MongoDB Writer 插件用于向 MongoDB 写入数据。

配置样例

该示例将流式数据写入到 MongoDB 表中

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": {
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "value": "unique_id",
              "type": "string"
            },
            {
              "value": "sid",
              "type": "string"
            },
            {
              "value": "user_id",
              "type": "string"
            },
            {
              "value": "auction_id",
              "type": "string"
            },
            {
              "value": "content_type",
              "type": "string"
            },
            {
              "value": "pool_type",
              "type": "string"
            },
            {
              "value": "a1 a2 a3",
              "type": "string"
            },
            {
              "value": "c1 c2 c3",
              "type": "string"
            },
            {
              "value": "2020-09-06",
              "type": "string"
            },
            {
              "value": "tag1 tag2 tag3",
              "type": "string"
            },
            {
              "value": "property",
              "type": "string"
            },
            {
              "value": 1984,
              "type": "long"
            },
            {
              "value": 1900,
              "type": "long"
            },
            {
              "value": 75,
              "type": "long"
            }
          ],
          "sliceRecordCount": 10
        }
      },
      "writer": {
        "name": "mongodbwriter",
        "parameter": {
          "connection": {
            "address": [
              "127.0.0.1:27017"
            ],
            "database": "my_database",
            "collection": "addax_writer",
            "authDb": "my_database"
          },
          "username": "my_user",
          "password": "password123",
          "column": [
            {
              "name": "unique_id",
              "type": "string"
            },
            {
              "name": "sid",
              "type": "string"
            },
            {
              "name": "user_id",
              "type": "string"
            },
            {
              "name": "auction_id",
              "type": "string"
            },
            {
              "name": "content_type",
              "type": "string"
            },
            {
              "name": "pool_type",
              "type": "string"
            },
            {
              "name": "frontcat_id",
              "type": "Array",
              "splitter": " "
            },
            {
              "name": "categoryid",
              "type": "Array",
              "splitter": " "
            },
            {
              "name": "gmt_create",
              "type": "string"
            },
            {
              "name": "taglist",
              "type": "Array",
              "splitter": " "
            },
            {
              "name": "property",
              "type": "string"
            },
            {
              "name": "scorea",
              "type": "int"
            },
            {
              "name": "scoreb",
              "type": "int"
            },
            {
              "name": "scorec",
              "type": "int"
            }
          ],
          "writeMode": "insert"
        }
      }
    }
  }
}

参数说明

配置项 是否必须 类型 默认值 描述
address list MongoDB 的数据地址信息
username string MongoDB 的用户名
password string MongoDB 的密码
collection string MongoDB 的集合名
column list<map> MongoDB 的文档列名
splitter string 特殊分隔符,详见下文
writeMode string insert 指定了传输数据时更新的信息,支持 insert, update 两种
batchSize int 2048 指定批次输入的数量
isUpsert boolean 当设置为 true 时,表示针对相同的 upsertKey 做更新操作
upsertKey string upsertKey 指定了没行记录的业务主键。用来做更新时使用

column

column 指定 mongo collection 的字段以及类型,如果是数组类型,还需要指定接收到的数据按照什么分割,一个 column 字段至少需要指定 name 以及 type,比如

{
  "column": [
    {
      "name": "user_id",
      "type": "string"
    }
  ]
}

如果是数组类型,则需要配置 splitter 来告知分隔符,类似如下:

{
  "column": {
    "name": "taglist",
    "type": "Array",
    "splitter": " "
  }
}

splitter

当且仅当要处理的字符串要用分隔符分隔为字符数组时,才使用这个参数,通过这个参数指定的分隔符,将字符串分隔存储到 MongoDB 的数组中

writeMode

不配置的情况下,默认采取直接插入记录的方式,如果希望实现插入更新(即记录存在则更新否则插入),可以指定为 update 模式,该模式下,必须同时更新的字段是哪个,比如:

{
  "writeMode": "update(unique_id)"
}

上述配置表示依据字段 unique_id 来决定当前记录是插入还是更新,当前暂不支持指定多个字段

类型转换

Addax 内部类型 MongoDB 数据类型
Long int, Long
Double double
String string, array
Date date
Boolean boolean
Bytes bytes