Skip to content

Paimon Writer

Paimon Writer 提供向 已有的paimon表写入数据的能力。

配置样例

json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0
      }
    },
    "content": [
      {
        "reader": {
          "name": "rdbmsreader",
          "parameter": {
            "username": "root",
            "password": "root",
            "column": [
              "*"
            ],
            "connection": [
              {
                "querySql": [
                  "select 1+0 id ,'test1' as name"
                ],
                "jdbcUrl": ["jdbc:mysql://localhost:3306/ruoyi_vue_camunda?allowPublicKeyRetrieval=true",]
              }
            ],
            "fetchSize": 1024
          }
        },
        "writer": {
          "name": "paimonwriter",
          "parameter": {
            "dbName": "test",
            "tableName": "test2",
            "writeMode": "truncate",
            "paimonConfig": {
              "warehouse": "file:///g:/paimon",
              "metastore": "filesystem"
            }
          }
        }
      }
    ]
  }
}

参数说明

配置项是否必须数据类型默认值说明
dbNamestring要写入的paimon数据库名
tableNamestring要写入的paimon表名
writeModestring写入模式,详述见下
paimonConfigjson{}里可以配置与 Paimon catalog和Hadoop 相关的一些高级参数,比如HA的配置

writeMode

写入前数据清理处理模式:

  • append,写入前不做任何处理,直接写入,不清除原来的数据。
  • truncate 写入前先清空表,再写入。

paimonConfig

paimonConfig 里可以配置与 Paimon catalog和Hadoop 相关的一些高级参数,比如HA的配置

本地目录创建paimon表

Details
xml
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

    <groupId>com.test</groupId>
    <artifactId>paimon-java-api-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop.version>3.2.4</hadoop.version>
        <woodstox.version>7.0.0</woodstox.version>
    </properties>

<dependencies>
    <dependency>
        <groupId>org.apache.paimon</groupId>
        <artifactId>paimon-bundle</artifactId>
        <version>1.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
        <exclusions>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-core-asl</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-mapper-asl</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.fasterxml.woodstox</groupId>
                <artifactId>woodstox-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
            </exclusion>
            <exclusion>
                <groupId>commons-net</groupId>
                <artifactId>commons-net</artifactId>
            </exclusion>
            <exclusion>
                <groupId>io.netty</groupId>
                <artifactId>netty</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>net.minidev</groupId>
                <artifactId>json-smart</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.codehaus.jettison</groupId>
                <artifactId>jettison</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.eclipse.jetty</groupId>
                <artifactId>jetty-server</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.xerial.snappy</groupId>
                <artifactId>snappy-java</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.eclipse.jetty</groupId>
                <artifactId>jetty-util</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>${hadoop.version}</version>
        <exclusions>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-core-asl</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-mapper-asl</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.fasterxml.woodstox</groupId>
                <artifactId>woodstox-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
            </exclusion>
            <exclusion>
                <groupId>commons-net</groupId>
                <artifactId>commons-net</artifactId>
            </exclusion>
            <exclusion>
                <groupId>io.netty</groupId>
                <artifactId>netty</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>net.minidev</groupId>
                <artifactId>json-smart</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.codehaus.jettison</groupId>
                <artifactId>jettison</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.eclipse.jetty</groupId>
                <artifactId>jetty-server</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.xerial.snappy</groupId>
                <artifactId>snappy-java</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.eclipse.jetty</groupId>
                <artifactId>jetty-util</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
        <exclusions>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </exclusion>
            <exclusion>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
            </exclusion>
            <exclusion>
                <groupId>io.netty</groupId>
                <artifactId>netty</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.eclipse.jetty</groupId>
                <artifactId>jetty-util</artifactId>
            </exclusion>
        </exclusions>
    </dependency>


    <dependency>
        <groupId>com.fasterxml.woodstox</groupId>
        <artifactId>woodstox-core</artifactId>
        <version>${woodstox.version}</version>
    </dependency>

</dependencies>
</project>

程序代码

Details
java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

import java.util.HashMap;
import java.util.Map;

public class CreatePaimonTable {

    public static Catalog createFilesystemCatalog() {
        CatalogContext context = CatalogContext.create(new Path("file:///g:/paimon"));
        return CatalogFactory.createCatalog(context);
    }
    /* 如果是minio则例子如下

     public static Catalog createFilesystemCatalog() {
        Options options = new Options();
        options.set("warehouse", "s3a://pvc-91d1e2cd-4d25-45c9-8613-6c4f7bf0a4cc/paimon");
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("fs.s3a.endpoint", "http://localhost:9000");
        hadoopConf.set("fs.s3a.access.key", "gy0dX5lALP176g6c9fYf");
        hadoopConf.set("fs.s3a.secret.key", "ReuUrCzzu5wKWAegtswoHIWV389BYl9AB1ZQbiKr");
        hadoopConf.set("fs.s3a.connection.ssl.enabled", "false");
        hadoopConf.set("fs.s3a.path.style.access", "true");
        hadoopConf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem");
        CatalogContext context = CatalogContext.create(options,hadoopConf);


        return CatalogFactory.createCatalog(context);
    }
     *
     *
     * */

    public static void main(String[] args) {
        Schema.Builder schemaBuilder = Schema.newBuilder();
        schemaBuilder.primaryKey("id");
        schemaBuilder.column("id", DataTypes.INT());
        schemaBuilder.column("name", DataTypes.STRING());
        Map<String, String> options = new HashMap<>();
        options.put("bucket", "1");//由于paimon java api 限制需要bucket>0
        options.put("bucket-key", "id");
        options.put("file.format", "orc");
        options.put("file.compression", "lz4");
        options.put("lookup.cache-spill-compression", "lz4");
        options.put("spill-compression", "LZ4");
        options.put("orc.compress", "lz4");
        options.put("manifest.format", "orc");

        schemaBuilder.options(options);
        Schema schema = schemaBuilder.build();

        Identifier identifier = Identifier.create("test", "test2");
        try {
            Catalog catalog = CreatePaimonTable.createFilesystemCatalog();
            catalog.createDatabase("test",true);
            catalog.createTable(identifier, schema, true);
        } catch (Catalog.TableAlreadyExistException e) {
            e.printStackTrace();
        } catch (Catalog.DatabaseNotExistException e) {
            e.printStackTrace();
        } catch (Catalog.DatabaseAlreadyExistException e) {
            throw new RuntimeException(e);
        }


    }
}

Spark 或者 flink 环境创建表

sql
CREATE TABLE if not exists test.test2(id int ,name string)  tblproperties (
    'primary-key' = 'id',
    'bucket' = '1',
    'bucket-key' = 'id'
    'file.format'='orc',
    'file.compression'='lz4',
    'lookup.cache-spill-compression'='lz4',
    'spill-compression'='LZ4',
    'orc.compress'='lz4',
    'manifest.format'='orc'
)

本地文件例子

json
{
  "name": "paimonwriter",
  "parameter": {
    "dbName": "test",
    "tableName": "test2",
    "writeMode": "truncate",
    "paimonConfig": {
      "warehouse": "file:///g:/paimon",
      "metastore": "filesystem"
    }
  }
}

s3 或者 minio catalog例子

json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0
      }
    },
    "content": [
      {
        "reader": {
          "name": "rdbmsreader",
          "parameter": {
            "username": "root",
            "password": "root",
            "column": ["*"],
            "connection": [
              {
                "querySql": ["select 1+0 id ,'test1' as name"],
                "jdbcUrl": [
                  "jdbc:mysql://localhost:3306/ruoyi_vue_camunda?allowPublicKeyRetrieval=true"
                ]
              }
            ],
            "fetchSize": 1024
          }
        },
        "writer": {
          "name": "paimonwriter",
          "parameter": {
            "dbName": "test",
            "tableName": "test2",
            "writeMode": "truncate",
            "paimonConfig": {
              "warehouse": "s3a://pvc-91d1e2cd-4d25-45c9-8613-6c4f7bf0a4cc/paimon",
              "metastore": "filesystem",
              "fs.s3a.endpoint": "http://localhost:9000",
              "fs.s3a.access.key": "gy0dX5lALP176g6c9fYf",
              "fs.s3a.secret.key": "ReuUrCzzu5wKWAegtswoHIWV389BYl9AB1ZQbiKr",
              "fs.s3a.connection.ssl.enabled": "false",
              "fs.s3a.path.style.access": "true",
              "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"
            }
          }
        }
      }
    ]
  }
}

hdfs catalog例子

json
{
  "paimonConfig": {
    "warehouse": "hdfs://nameservice1/user/hive/paimon",
    "metastore": "filesystem",
    "fs.defaultFS": "hdfs://nameservice1",
    "hadoop.security.authentication": "kerberos",
    "hadoop.kerberos.principal": "hive/[email protected]",
    "hadoop.kerberos.keytab": "/tmp/[email protected]",
    "ha.zookeeper.quorum": "test-pr-nn1:2181,test-pr-nn2:2181,test-pr-nn3:2181",
    "dfs.nameservices": "nameservice1",
    "dfs.namenode.rpc-address.nameservice1.namenode371": "test-pr-nn2:8020",
    "dfs.namenode.rpc-address.nameservice1.namenode265": "test-pr-nn1:8020",
    "dfs.namenode.keytab.file": "/tmp/[email protected]",
    "dfs.namenode.keytab.enabled": "true",
    "dfs.namenode.kerberos.principal": "hdfs/[email protected]",
    "dfs.namenode.kerberos.internal.spnego.principal": "HTTP/[email protected]",
    "dfs.ha.namenodes.nameservice1": "namenode265,namenode371",
    "dfs.datanode.keytab.file": "/tmp/[email protected]",
    "dfs.datanode.keytab.enabled": "true",
    "dfs.datanode.kerberos.principal": "hdfs/[email protected]",
    "dfs.client.use.datanode.hostname": "false",
    "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
    "dfs.balancer.keytab.file": "/tmp/[email protected]",
    "dfs.balancer.keytab.enabled": "true",
    "dfs.balancer.kerberos.principal": "hdfs/[email protected]"
  }
}

类型转换

Addax 内部类型Paimon 数据类型
IntegerTINYINT,SMALLINT,INT,INTEGER
LongBIGINT
DoubleFLOAT,DOUBLE,DECIMAL
StringSTRING,VARCHAR,CHAR
BooleanBOOLEAN
DateDATE,TIMESTAMP
BytesBINARY