跳到主要内容

初步使用

基本使用

切换catalog

bin/sql-client.sh -s yarn-session -i conf/sql-client-init.sql

切换catalog

USE CATALOG fs_catalog;
show catalogs;
show current catalog;

DDL

管理表

在 Paimon Catalog中创建的表就是Paimon的管理表,由Catalog管理。当表从Catalog中删除时,其表文件也将被删除,类似于Hive的内部表。

创建表

CREATE TABLE test(
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);

PRIMARY KEY (dt, hh, user_id) NOT ENFORCED 声明了表的主键,但允许在插入数据时不强制执行主键约束。这对于某些特定的数据处理和存储需求可能是有用的。如果你需要强制执行主键约束,可以使用 ENFORCED 选项,这将阻止插入重复的主键值。

创建分区表

CREATE TABLE test_p(
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);

Create Table As

CREATE TABLE test1(
user_id BIGINT,
item_id BIGINT
);
CREATE TABLE test2 AS SELECT * FROM test1;

-- 指定分区
CREATE TABLE test2_p WITH ('partition' = 'dt') AS SELECT * FROM test_p;

-- 指定配置
CREATE TABLE test3(
user_id BIGINT,
item_id BIGINT
) WITH ('file.format' = 'orc');

CREATE TABLE test3_op WITH ('file.format' = 'parquet') AS SELECT * FROM test3;

-- 指定主键
CREATE TABLE test_pk WITH ('primary-key' = 'dt,hh') AS SELECT * FROM test;


-- 指定主键和分区
CREATE TABLE test_all WITH ('primary-key' = 'dt,hh', 'partition' = 'dt') AS SELECT * FROM test_p;

Create Table Like

create table as 会创建一个实时任务,下面这个不会。下面就是创建相同的schema。

CREATE TABLE test_ctl LIKE test;

表属性

用户可以指定表属性来启用Paimon的功能或提高Paimon的性能。

https://paimon.apache.org/docs/master/maintenance/configurations/

CREATE TABLE tbl(
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh)
WITH (
'bucket' = '2',
'bucket-key' = 'user_id'
);

外部表

hdfs dfs -mkdir -p /paimon/external/ex

外部表由Catalog记录但不管理。如果删除外部表,其表文件不会被删除,类似于Hive的外部表。Paimon 外部表可以在任何Catalog中使用。如果您不想创建Paimon Catalog而只想读/写表,则可以考虑外部表。

CREATE TABLE ex(
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) WITH (
'connector' = 'paimon',
'path' = 'hdfs://bigdatacluster/paimon/external/ex',
'auto-create' = 'true'
);

临时表

仅 Flink 支持临时表。与外部表一样,临时表只是记录,但不由当前 Flink SQL 会话管理。如果临时表被删除,其资源将不会被删除。当 Flink SQL 会话关闭时,临时表也会被删除。与外部表的区别在于,临时表在Paimon Catalog中创建。

CREATE TEMPORARY TABLE temp(
k INT,
v STRING
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://bigdatacluster/temp.csv',
'format' = 'csv'
);

修改表

  1. 更改/添加表属性
ALTER TABLE test SET (
'write-buffer-size' = '256 MB'
);
  1. 重命名表名称
ALTER TABLE test1 RENAME TO test_new;
  1. 删除表属性 ALTER TABLE test RESET ('write-buffer-size');

  2. 修改列

    • 添加新列
    ALTER TABLE test ADD (c1 INT, c2 STRING);
    • 重命名列名称
    ALTER TABLE test RENAME c1 TO c0;
    • 删除列
    ALTER TABLE test DROP (c0, c2);
    • 更改列的可为空性
    CREATE TABLE test_null(
    id INT PRIMARY KEY NOT ENFORCED,
    coupon_info FLOAT NOT NULL
    );

    -- 列coupon_info修改成允许为null
    ALTER TABLE test_null MODIFY coupon_info FLOAT;

    -- 列coupon_info修改成不允许为null

    -- 如果表中已经有null值, 修改之前先设置如下参数删除null值
    SET 'table.exec.sink.not-null-enforcer' = 'DROP';
    ALTER TABLE test_null MODIFY coupon_info FLOAT NOT NULL;
  3. 更改列注释

ALTER TABLE test MODIFY user_id BIGINT COMMENT 'user id';
  1. 添加列位置
ALTER TABLE test ADD a INT FIRST;

ALTER TABLE test ADD b INT AFTER a;
  1. 更改列位置
ALTER TABLE test MODIFY b INT FIRST;

ALTER TABLE test MODIFY a INT AFTER user_id;
  1. 更改列类型
ALTER TABLE test MODIFY a DOUBLE;
  1. 修改水印

    • 添加水印
    CREATE TABLE test_wm (
    id INT,
    name STRING,
    ts BIGINT
    );


    ALTER TABLE test_wm ADD(
    et AS to_timestamp_ltz(ts,3),
    WATERMARK FOR et AS et - INTERVAL '1' SECOND
    );
    • 修改水印
    ALTER TABLE test_wm MODIFY WATERMARK FOR et AS et - INTERVAL '2' SECOND;
    • 去掉水印
    ALTER TABLE test_wm DROP WATERMARK;

DML

插入数据

INSERT 语句向表中插入新行或覆盖表中的现有数据。插入的行可以由值表达式或查询结果指定,跟标准的sql语法一致。目前,Flink 不支持直接使用 NULL,因此需要将 NULL 转换为实际数据类型值,比如“CAST (NULL AS STRING)”,不能将另一个表的可为空列插入到一个表的非空列中。Flink可以使用COALESCE函数来处理,比如A表的key1是not null,B表的key2nullable:

INSERT INTO A key1 SELECT COALESCE(key2, <non-null expression>) FROM B

案例

INSERT INTO test VALUES(1,1,'order','2023-07-01','1'), (2,2,'pay','2023-07-01','2');
INSERT INTO test_p PARTITION(dt='2023-07-01',hh='1') VALUES(3,3, 'pv');

-- 执行模式区分流、批
INSERT INTO test_p SELECT * from test;

Paimon支持在sink阶段通过partition和bucket对数据进行shuffle。

覆盖数据

覆盖数据只支持batch模式。默认情况下,流式读取将忽略 INSERT OVERWRITE 生成的提交。如果你想读取OVERWRITE的提交,你可以配置streaming-read-overwrite。

RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';
  • 覆盖未分区的表
INSERT OVERWRITE test VALUES(3,3,'pay','2023-07-01','2');
  • 覆盖分区表

对于分区表,Paimon默认的覆盖模式是动态分区覆盖(即Paimon只删除insert overwrite数据中出现的分区)。您可以配置动态分区覆盖来更改它。

INSERT OVERWRITE test_p SELECT * from test;
  • 覆盖指定分区:
INSERT OVERWRITE test_p PARTITION (dt = '2023-07-01', hh = '2') SELECT user_id,item_id,behavior from test;
  • 清空表

可以使用 INSERT OVERWRITE 通过插入空值来清除表(关闭动态分区覆盖)。

INSERT OVERWRITE test_p/*+ OPTIONS('dynamic-partition-overwrite'='false') */ SELECT * FROM test_p WHERE false;

更新数据

目前,Paimon 在 Flink 1.17 及后续版本中支持使用 UPDATE 更新记录。您可以在Flink的批处理模式下执行UPDATE。只有主键表支持此功能。不支持更新主键。MergeEngine 需要deduplicate或partial-update才能支持此功能。(默认deduplicate)

UPDATE test SET item_id = 4, behavior = 'pv' WHERE user_id = 3;

删除数据

从表中删除(Flink 1.17):

  • 只有写入模式设置为change-log的表支持此功能。(有主键默认就是change-log)
  • 如果表有主键,MergeEngine需要为deduplicate。(默认deduplicate)
DELETE FROM test WHERE user_id = 3;

DQL

批量查询

Paimon的批量读取返回表快照中的所有数据。默认情况下,批量读取返回最新快照。在sql-client中,设置执行模式为批即可:

RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';

时间旅行

  • 读取指定id的快照
SELECT * FROM test /*+ OPTIONS('scan.snapshot-id' = '1') */;
SELECT * FROM test /*+ OPTIONS('scan.snapshot-id' = '2') */;
  • 读取指定时间戳的快照
-- 查看快照信息
SELECT * FROM test&snapshots;

SELECT * FROM test /*+ OPTIONS('scan.timestamp-millis' = '1688369660841') */;
  • 读取指定标签
SELECT * FROM test /*+ OPTIONS('scan.tag-name' = 'my-tag') */;

测试时间旅行

CREATE TABLE luxing(
user_id BIGINT
);

写入数据

INSERT INTO luxing values(1);

查询数据

SELECT * FROM luxing /*+ OPTIONS('scan.snapshot-id' = '1') */;
SELECT * FROM luxing /*+ OPTIONS('scan.snapshot-id' = '2') */;

增量查询

读取开始快照(不包括)和结束快照之间的增量更改。例如,“3,5”表示快照 3 和快照 5 之间的更改:

SELECT * FROM test /*+ OPTIONS('incremental-between' = '3,5') */;

在batch模式中,不返回DELETE记录,因此-D的记录将被删除。如果你想查看DELETE记录,可以查询audit_log表:

SELECT * FROM test$audit_log /*+ OPTIONS('incremental-between' = '3,5') */;

流式查询

默认情况下,Streaming read 在第一次启动时会生成表上的最新快照,并继续读取最新的更改。

SET 'execution.checkpointing.interval'='30s';
SET 'execution.runtime-mode' = 'streaming';

也可以从最新读取,设置扫描模式:

SELECT * FROM ws_t /*+ OPTIONS('scan.mode' = 'latest') */