Paimon Writer¶
Paimon Writer provides the ability to write data into an existing Paimon table.
Configuration Example¶
{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0
      }
    },
    "content": [
      {
        "reader": {
          "name": "rdbmsreader",
          "parameter": {
            "username": "root",
            "password": "root",
            "column": [
              "*"
            ],
            "connection": [
              {
                "querySql": [
                  "select 1+0 id ,'test1' as name"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://localhost:3306/ruoyi_vue_camunda?allowPublicKeyRetrieval=true"
                ]
              }
            ],
            "fetchSize": 1024
          }
        },
        "writer": {
          "name": "paimonwriter",
          "parameter": {
            "dbName": "test",
            "tableName": "test2",
            "writeMode": "truncate",
            "paimonConfig": {
              "warehouse": "file:///g:/paimon",
              "metastore": "filesystem"
            }
          }
        }
      }
    ]
  }
}
Parameter Description¶
Option | Required | Data Type | Default | Description |
---|---|---|---|---|
dbName | yes | string | none | name of the Paimon database to write to |
tableName | yes | string | none | name of the Paimon table to write to |
writeMode | yes | string | none | write mode, described in detail below |
paimonConfig | yes | json | {} | advanced settings related to the Paimon catalog and Hadoop, such as HA configuration |
writeMode¶
How existing data is handled before writing:
- append: write directly without any pre-processing; existing data is kept.
- truncate: empty the table first, then write (see the sketch after this list).
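The two modes roughly correspond to an ordinary batch write versus an overwrite write in Paimon's Java API. Below is a minimal sketch of that distinction, assuming the `test.test2` table created further down in this page; it is only an illustration of the semantics, not the actual paimonwriter implementation.

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;

import java.util.List;

public class WriteModeSketch {
    // truncate == replace all existing data on commit, append == keep it (illustrative only)
    public static void write(Catalog catalog, boolean truncate) throws Exception {
        Table table = catalog.getTable(Identifier.create("test", "test2"));
        BatchWriteBuilder builder = table.newBatchWriteBuilder();
        if (truncate) {
            builder = builder.withOverwrite(); // overwrite existing data when committing
        }
        BatchTableWrite write = builder.newWrite();
        write.write(GenericRow.of(1, BinaryString.fromString("test1"))); // one sample row: id, name
        List<CommitMessage> messages = write.prepareCommit();
        BatchTableCommit commit = builder.newCommit();
        commit.commit(messages);
    }
}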
paimonConfig¶
paimonConfig can carry advanced settings related to the Paimon catalog and Hadoop, such as HA configuration.
Creating a Paimon table in a local directory
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>paimon-java-api-test</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>3.2.4</hadoop.version>
<woodstox.version>7.0.0</woodstox.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-bundle</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.woodstox</groupId>
<artifactId>woodstox-core</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>net.minidev</groupId>
<artifactId>json-smart</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.woodstox</groupId>
<artifactId>woodstox-core</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>net.minidev</groupId>
<artifactId>json-smart</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.woodstox</groupId>
<artifactId>woodstox-core</artifactId>
<version>${woodstox.version}</version>
</dependency>
</dependencies>
</project>
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

import java.util.HashMap;
import java.util.Map;

public class CreatePaimonTable {

    public static Catalog createFilesystemCatalog() {
        CatalogContext context = CatalogContext.create(new Path("file:///g:/paimon"));
        return CatalogFactory.createCatalog(context);
    }

    /* For MinIO, the catalog would be created like this instead
       (also requires org.apache.paimon.options.Options and org.apache.hadoop.conf.Configuration):
    public static Catalog createFilesystemCatalog() {
        Options options = new Options();
        options.set("warehouse", "s3a://pvc-91d1e2cd-4d25-45c9-8613-6c4f7bf0a4cc/paimon");
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("fs.s3a.endpoint", "http://localhost:9000");
        hadoopConf.set("fs.s3a.access.key", "gy0dX5lALP176g6c9fYf");
        hadoopConf.set("fs.s3a.secret.key", "ReuUrCzzu5wKWAegtswoHIWV389BYl9AB1ZQbiKr");
        hadoopConf.set("fs.s3a.connection.ssl.enabled", "false");
        hadoopConf.set("fs.s3a.path.style.access", "true");
        hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        CatalogContext context = CatalogContext.create(options, hadoopConf);
        return CatalogFactory.createCatalog(context);
    }
    */

    public static void main(String[] args) {
        Schema.Builder schemaBuilder = Schema.newBuilder();
        schemaBuilder.primaryKey("id");
        schemaBuilder.column("id", DataTypes.INT());
        schemaBuilder.column("name", DataTypes.STRING());

        Map<String, String> options = new HashMap<>();
        options.put("bucket", "1"); // the Paimon Java API requires bucket > 0
        options.put("bucket-key", "id");
        options.put("file.format", "orc");
        options.put("file.compression", "lz4");
        options.put("lookup.cache-spill-compression", "lz4");
        options.put("spill-compression", "LZ4");
        options.put("orc.compress", "lz4");
        options.put("manifest.format", "orc");
        schemaBuilder.options(options);

        Schema schema = schemaBuilder.build();
        Identifier identifier = Identifier.create("test", "test2");
        try {
            Catalog catalog = CreatePaimonTable.createFilesystemCatalog();
            catalog.createDatabase("test", true);
            catalog.createTable(identifier, schema, true);
        } catch (Catalog.TableAlreadyExistException e) {
            e.printStackTrace();
        } catch (Catalog.DatabaseNotExistException e) {
            e.printStackTrace();
        } catch (Catalog.DatabaseAlreadyExistException e) {
            throw new RuntimeException(e);
        }
    }
}
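To double-check the result, a small follow-up sketch (assuming the same warehouse path as above) can read the table back through the catalog:

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;

public class ShowPaimonTable {
    public static void main(String[] args) throws Exception {
        Catalog catalog = CreatePaimonTable.createFilesystemCatalog();
        // getTable throws Catalog.TableNotExistException if the table was not created
        Table table = catalog.getTable(Identifier.create("test", "test2"));
        System.out.println(table.name());    // table name
        System.out.println(table.rowType()); // column names and Paimon data types
        System.out.println(table.options()); // bucket, file.format, ... as set above
    }
}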
Creating the table in a Spark or Flink environment
CREATE TABLE IF NOT EXISTS test.test2 (id int, name string) tblproperties (
    'primary-key' = 'id',
    'bucket' = '1',
    'bucket-key' = 'id',
    'file.format' = 'orc',
    'file.compression' = 'lz4',
    'lookup.cache-spill-compression' = 'lz4',
    'spill-compression' = 'LZ4',
    'orc.compress' = 'lz4',
    'manifest.format' = 'orc'
)
Local filesystem example
{
  "name": "paimonwriter",
  "parameter": {
    "dbName": "test",
    "tableName": "test2",
    "writeMode": "truncate",
    "paimonConfig": {
      "warehouse": "file:///g:/paimon",
      "metastore": "filesystem"
    }
  }
}
S3 or MinIO catalog example
{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0
      }
    },
    "content": [
      {
        "reader": {
          "name": "rdbmsreader",
          "parameter": {
            "username": "root",
            "password": "root",
            "column": [
              "*"
            ],
            "connection": [
              {
                "querySql": [
                  "select 1+0 id ,'test1' as name"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://localhost:3306/ruoyi_vue_camunda?allowPublicKeyRetrieval=true"
                ]
              }
            ],
            "fetchSize": 1024
          }
        },
        "writer": {
          "name": "paimonwriter",
          "parameter": {
            "dbName": "test",
            "tableName": "test2",
            "writeMode": "truncate",
            "paimonConfig": {
              "warehouse": "s3a://pvc-91d1e2cd-4d25-45c9-8613-6c4f7bf0a4cc/paimon",
              "metastore": "filesystem",
              "fs.s3a.endpoint": "http://localhost:9000",
              "fs.s3a.access.key": "gy0dX5lALP176g6c9fYf",
              "fs.s3a.secret.key": "ReuUrCzzu5wKWAegtswoHIWV389BYl9AB1ZQbiKr",
              "fs.s3a.connection.ssl.enabled": "false",
              "fs.s3a.path.style.access": "true",
              "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"
            }
          }
        }
      }
    ]
  }
}
HDFS catalog example
{
  "paimonConfig": {
    "warehouse": "hdfs://nameservice1/user/hive/paimon",
    "metastore": "filesystem",
    "fs.defaultFS": "hdfs://nameservice1",
    "hadoop.security.authentication": "kerberos",
    "hadoop.kerberos.principal": "hive/_HOST@XXXX.COM",
    "hadoop.kerberos.keytab": "/tmp/hive@XXXX.COM.keytab",
    "ha.zookeeper.quorum": "test-pr-nn1:2181,test-pr-nn2:2181,test-pr-nn3:2181",
    "dfs.nameservices": "nameservice1",
    "dfs.namenode.rpc-address.nameservice1.namenode371": "test-pr-nn2:8020",
    "dfs.namenode.rpc-address.nameservice1.namenode265": "test-pr-nn1:8020",
    "dfs.namenode.keytab.file": "/tmp/hdfs@XXXX.COM.keytab",
    "dfs.namenode.keytab.enabled": "true",
    "dfs.namenode.kerberos.principal": "hdfs/_HOST@XXXX.COM",
    "dfs.namenode.kerberos.internal.spnego.principal": "HTTP/_HOST@XXXX.COM",
    "dfs.ha.namenodes.nameservice1": "namenode265,namenode371",
    "dfs.datanode.keytab.file": "/tmp/hdfs@XXXX.COM.keytab",
    "dfs.datanode.keytab.enabled": "true",
    "dfs.datanode.kerberos.principal": "hdfs/_HOST@XXXX.COM",
    "dfs.client.use.datanode.hostname": "false",
    "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
    "dfs.balancer.keytab.file": "/tmp/hdfs@XXXX.COM.keytab",
    "dfs.balancer.keytab.enabled": "true",
    "dfs.balancer.kerberos.principal": "hdfs/_HOST@XXXX.COM"
  }
}
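By analogy with the commented MinIO snippet in the Java example above, the following is a rough sketch of how such key/value pairs could be fed into a Paimon CatalogContext. It is only an assumption about the wiring, not how paimonwriter actually parses paimonConfig, and the helper name is made up for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.Options;

import java.util.Map;

public class CatalogFromConfig {
    // Hypothetical helper: split a paimonConfig-style map into Paimon catalog options
    // (warehouse, metastore) and Hadoop settings, then build a catalog from both.
    public static Catalog fromPaimonConfig(Map<String, String> paimonConfig) {
        Options options = new Options();
        Configuration hadoopConf = new Configuration();
        for (Map.Entry<String, String> e : paimonConfig.entrySet()) {
            String key = e.getKey();
            if (key.equals("warehouse") || key.equals("metastore")) {
                options.set(key, e.getValue());     // catalog-level options
            } else {
                hadoopConf.set(key, e.getValue());  // HDFS / HA / Kerberos settings
            }
        }
        CatalogContext context = CatalogContext.create(options, hadoopConf);
        return CatalogFactory.createCatalog(context);
    }
}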
Type Conversion¶
Addax internal type | Paimon data type |
---|---|
Integer | TINYINT, SMALLINT, INT, INTEGER |
Long | BIGINT |
Double | FLOAT, DOUBLE, DECIMAL |
String | STRING, VARCHAR, CHAR |
Boolean | BOOLEAN |
Date | DATE, TIMESTAMP |
Bytes | BINARY |
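As a rough illustration of this mapping, the Java values below would land in Paimon's internal row representation as follows. This is only a sketch using GenericRow; the actual conversion inside paimonwriter may differ, and DATE, TIMESTAMP, DECIMAL and BINARY require Paimon's dedicated internal classes.

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;

public class TypeMappingSketch {
    public static GenericRow sampleRow() {
        return GenericRow.of(
                1,                                // Integer -> INT / SMALLINT / TINYINT
                1L,                               // Long    -> BIGINT
                1.5d,                             // Double  -> DOUBLE / FLOAT
                BinaryString.fromString("test1"), // String  -> STRING / VARCHAR / CHAR
                true                              // Boolean -> BOOLEAN
        );
    }
}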