初步使用
基本使用
切换catalog
bin/sql-client.sh -s yarn-session -i conf/sql-client-init.sql
切换catalog
USE CATALOG fs_catalog;
show catalogs;
show current catalog;
DDL
管理表
在 Paimon Catalog中创建的表就是Paimon的管理表,由Catalog管理。当表从Catalog中删除时,其表文件也将被删除,类似于Hive的内部表。
创建表
CREATE TABLE test(
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED 声明了表的主键,但允许在插入数据时不强制执行主键约束。这对于某些特定的数据处理和存储需求可能是有用的。如果你需要强制执行主键约束,可以使用 ENFORCED 选项,这将阻止插入重复的主键值。
创建分区表
CREATE TABLE test_p(
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);
Create Table As
CREATE TABLE test1(
user_id BIGINT,
item_id BIGINT
);
CREATE TABLE test2 AS SELECT * FROM test1;
-- 指定分区
CREATE TABLE test2_p WITH ('partition' = 'dt') AS SELECT * FROM test_p;
-- 指定配置
CREATE TABLE test3(
user_id BIGINT,
item_id BIGINT
) WITH ('file.format' = 'orc');
CREATE TABLE test3_op WITH ('file.format' = 'parquet') AS SELECT * FROM test3;
-- 指定主键
CREATE TABLE test_pk WITH ('primary-key' = 'dt,hh') AS SELECT * FROM test;
-- 指定主键和分区
CREATE TABLE test_all WITH ('primary-key' = 'dt,hh', 'partition' = 'dt') AS SELECT * FROM test_p;
Create Table Like
create table as 会创建一个实时任务,下面这个不会。下面就是创建相同的schema。
CREATE TABLE test_ctl LIKE test;
表属性
用户可以指定表属性来启用Paimon的功能或提高Paimon的性能。
https://paimon.apache.org/docs/master/maintenance/configurations/
CREATE TABLE tbl(
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh)
WITH (
'bucket' = '2',
'bucket-key' = 'user_id'
);
外部表
hdfs dfs -mkdir -p /paimon/external/ex
外部表由Catalog记录但不管理。如果删除外部表,其表文件不会被删除,类似于Hive的外部表。Paimon 外部表可以在任何Catalog中使用。如果您不想创建Paimon Catalog而只想读/写表,则可以考虑外部表。
CREATE TABLE ex(
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) WITH (
'connector' = 'paimon',
'path' = 'hdfs://bigdatacluster/paimon/external/ex',
'auto-create' = 'true'
);
临时表
仅 Flink 支持临时表。与外部表一样,临时表只是记录,但不由当前 Flink SQL 会话管理。如果临时表被删除,其资源将不会被删除。当 Flink SQL 会话关闭时,临时表也会被删除。与外部表的区别在于,临时表在Paimon Catalog中创建。
CREATE TEMPORARY TABLE temp(
k INT,
v STRING
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://bigdatacluster/temp.csv',
'format' = 'csv'
);
修改表
- 更改/添加表属性
ALTER TABLE test SET (
'write-buffer-size' = '256 MB'
);
- 重命名表名称
ALTER TABLE test1 RENAME TO test_new;
删除表属性 ALTER TABLE test RESET ('write-buffer-size');
修改列
- 添加新列
ALTER TABLE test ADD (c1 INT, c2 STRING);
- 重命名列名称
ALTER TABLE test RENAME c1 TO c0;
- 删除列
ALTER TABLE test DROP (c0, c2);
- 更改列的可为空性
CREATE TABLE test_null(
id INT PRIMARY KEY NOT ENFORCED,
coupon_info FLOAT NOT NULL
);
-- 列coupon_info修改成允许为null
ALTER TABLE test_null MODIFY coupon_info FLOAT;
-- 列coupon_info修改成不允许为null
-- 如果表中已经有null值, 修改之前先设置如下参数删除null值
SET 'table.exec.sink.not-null-enforcer' = 'DROP';
ALTER TABLE test_null MODIFY coupon_info FLOAT NOT NULL;更改列注释
ALTER TABLE test MODIFY user_id BIGINT COMMENT 'user id';
- 添加列位置
ALTER TABLE test ADD a INT FIRST;
ALTER TABLE test ADD b INT AFTER a;
- 更改列位置
ALTER TABLE test MODIFY b INT FIRST;
ALTER TABLE test MODIFY a INT AFTER user_id;
- 更改列类型
ALTER TABLE test MODIFY a DOUBLE;
修改水印
- 添加水印
CREATE TABLE test_wm (
id INT,
name STRING,
ts BIGINT
);
ALTER TABLE test_wm ADD(
et AS to_timestamp_ltz(ts,3),
WATERMARK FOR et AS et - INTERVAL '1' SECOND
);- 修改水印
ALTER TABLE test_wm MODIFY WATERMARK FOR et AS et - INTERVAL '2' SECOND;
- 去掉水印
ALTER TABLE test_wm DROP WATERMARK;
DML
插入数据
INSERT 语句向表中插入新行或覆盖表中的现有数据。插入的行可以由值表达式或查询结果指定,跟标准的sql语法一致。目前,Flink 不支持直接使用 NULL,因此需要将 NULL 转换为实际数据类型值,比如“CAST (NULL AS STRING)”,不能将另一个表的可为空列插入到一个表的非空列中。Flink可以使用COALESCE函数来处理,比如A表的key1是not null,B表的key2nullable:
INSERT INTO A key1 SELECT COALESCE(key2, <non-null expression>) FROM B
案例
INSERT INTO test VALUES(1,1,'order','2023-07-01','1'), (2,2,'pay','2023-07-01','2');
INSERT INTO test_p PARTITION(dt='2023-07-01',hh='1') VALUES(3,3, 'pv');
-- 执行模式区分流、批
INSERT INTO test_p SELECT * from test;
Paimon支持在sink阶段通过partition和bucket对数据进行shuffle。
覆盖数据
覆盖数据只支持batch模式。默认情况下,流式读取将忽略 INSERT OVERWRITE 生成的提交。如果你想读取OVERWRITE的提交,你可以配置streaming-read-overwrite。
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';
- 覆盖未分区的表
INSERT OVERWRITE test VALUES(3,3,'pay','2023-07-01','2');
- 覆盖分区表
对于分区表,Paimon默认的覆盖模式是动态分区覆盖(即Paimon只删除insert overwrite数据中出现的分区)。您可以配置动态分区覆盖来更改它。
INSERT OVERWRITE test_p SELECT * from test;
- 覆盖指定分区:
INSERT OVERWRITE test_p PARTITION (dt = '2023-07-01', hh = '2') SELECT user_id,item_id,behavior from test;
- 清空表
可以使用 INSERT OVERWRITE 通过插入空值来清除表(关闭动态分区覆盖)。
INSERT OVERWRITE test_p/*+ OPTIONS('dynamic-partition-overwrite'='false') */ SELECT * FROM test_p WHERE false;
更新数据
目前,Paimon 在 Flink 1.17 及后续版本中支持使用 UPDATE 更新记录。您可以在Flink的批处理模式下执行UPDATE。只有主键表支持此功能。不支持更新主键。MergeEngine 需要deduplicate或partial-update才能支持此功能。(默认deduplicate)
UPDATE test SET item_id = 4, behavior = 'pv' WHERE user_id = 3;
删除数据
从表中删除(Flink 1.17):
- 只有写入模式设置为change-log的表支持此功能。(有主键默认就是change-log)
- 如果表有主键,MergeEngine需要为deduplicate。(默认deduplicate)
DELETE FROM test WHERE user_id = 3;
DQL
批量查询
Paimon的批量读取返回表快照中的所有数据。默认情况下,批量读取返回最新快照。在sql-client中,设置执行模式为批即可:
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';
时间旅行
- 读取指定id的快照
SELECT * FROM test /*+ OPTIONS('scan.snapshot-id' = '1') */;
SELECT * FROM test /*+ OPTIONS('scan.snapshot-id' = '2') */;
- 读取指定时间戳的快照
-- 查看快照信息
SELECT * FROM test&snapshots;
SELECT * FROM test /*+ OPTIONS('scan.timestamp-millis' = '1688369660841') */;
- 读取指定标签
SELECT * FROM test /*+ OPTIONS('scan.tag-name' = 'my-tag') */;
测试时间旅行
CREATE TABLE luxing(
user_id BIGINT
);
写入数据
INSERT INTO luxing values(1);
查询数据
SELECT * FROM luxing /*+ OPTIONS('scan.snapshot-id' = '1') */;
SELECT * FROM luxing /*+ OPTIONS('scan.snapshot-id' = '2') */;
增量查询
读取开始快照(不包括)和结束快照之间的增量更改。例如,“3,5”表示快照 3 和快照 5 之间的更改:
SELECT * FROM test /*+ OPTIONS('incremental-between' = '3,5') */;
在batch模式中,不返回DELETE记录,因此-D的记录将被删除。如果你想查看DELETE记录,可以查询audit_log表:
SELECT * FROM test$audit_log /*+ OPTIONS('incremental-between' = '3,5') */;
流式查询
默认情况下,Streaming read 在第一次启动时会生成表上的最新快照,并继续读取最新的更改。
SET 'execution.checkpointing.interval'='30s';
SET 'execution.runtime-mode' = 'streaming';
也可以从最新读取,设置扫描模式:
SELECT * FROM ws_t /*+ OPTIONS('scan.mode' = 'latest') */