大数据知识体系
首页
数据结构与算法
  • JVM
  • Java
  • Scala
  • Python
设计模式
  • MySQL
  • Redis
  • HDFS
  • HBase
  • ClickHouse
  • ElasticSearch
  • Iceberg
  • Hudi
  • Spark
  • Flink
  • Hive
  • Yarn
  • Zookeeper
  • Maven
  • Git
  • 数据仓库
  • 用户画像
  • 指标体系
数据治理
关于
首页
数据结构与算法
  • JVM
  • Java
  • Scala
  • Python
设计模式
  • MySQL
  • Redis
  • HDFS
  • HBase
  • ClickHouse
  • ElasticSearch
  • Iceberg
  • Hudi
  • Spark
  • Flink
  • Hive
  • Yarn
  • Zookeeper
  • Maven
  • Git
  • 数据仓库
  • 用户画像
  • 指标体系
数据治理
关于
  • 数据湖介绍
  • Iceberg

    • Iceberg 概述
    • Iceberg 数据类型
    • Iceberg 演化
    • Iceberg 分区
    • Iceberg 文件组织形式
    • Iceberg Spark 入门
    • Iceberg Spark DDL
    • Iceberg Spark DQL
    • Iceberg Spark SQL 存储过程
      • 语法
      • 使用
        • 快照管理
        • 元数据管理
        • 表迁移
  • Hudi

    • Hudi 概述
  • 数据湖
  • Iceberg
Will
2022-03-17
目录

Iceberg Spark SQL 存储过程

Iceberg 0.11.0 及之后的版本中,对原生的 Spark 进行了一些扩展,主要在 SQL 命令中新增了存储过程和部分Alter Table语法。如果要使用扩展功能,需要在 Spark 中新增以下配置项:

spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
1

本节介绍存储过程部分。

存储过程(Procedure)是数据库领域的概念,类似于编程语言中的方法或函数,是对实现特定操作的封装,原生的 Spark SQL 中是不支持存储过程的,Iceberg 对其进行了扩展,并提供了部分存储过程的实现。Iceberg 中提供的所有存储过程都在systemnamespace 下。

# 语法

call catalog_name.system.procedure_name(args...)
1

其中catalog_name为 Iceberg 配置的 catalog 名称;procedure_name为需要调用的存储过程名称;system为固定写法;args为存储过程参数,可以有多个。

有两种方式进行参数传递:一种是基于参数位置,一种是基于参数名称。如:

-- 基于参数位置
call catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n);

-- 基于参数名称
call catalog_name.system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1);
1
2
3
4
5

# 使用

Iceberg 原生支持的存储过程可以按照其功能分为三大类:

  • 快照管理
  • 元数据管理
  • 表迁移

# 快照管理

  1. rollback_to_snapshot

将表回滚到特定的快照 ID。

参数:

  • table:string,必须,表名。
  • snapshot_id:long,必须,快照 ID。

输出:

  • previous_snapshot_id:long,回滚前的快照 ID。
  • current_snapshot_id:long,回滚后的快照 ID。

示例:

call catalog_name.system.rollback_to_snapshot('db.sample', 1);
1
  1. rollback_to_timestamp

将表回滚到特定时间的快照。

参数:

  • table:string,必须,表名。
  • timestamp:timestamp,必须,回滚的时间戳。

输出:

  • previous_snapshot_id:long,回滚前的快照 ID;
  • current_snapshot_id:long,回滚后的快照 ID。

示例:

call catalog_name.system.rollback_to_timestamp('db.sample', TIMESTAMP '2021-06-30 00:00:00.000');
1
  1. set_current_snapshot

设置表的当前快照 ID。与回滚不同的是,set_current_snapshot可以在各个快照之间任意穿梭。

参数:

  • table:string,必须,表名。
  • snapshot_id:long,必须,快照 ID。

输出:

  • previous_snapshot_id:long,回滚前的快照 ID;
  • current_snapshot_id:long,回滚后的快照 ID。

示例:

call catalog_name.system.set_current_snapshot('db.sample', 1);
1
  1. cherrypick_snapshot

从快照 cherry-pick 到当前表状态。从现有快照创建新快照,而不更改或删除原始快照。只追加和动态覆盖可以 cherry-pick 的快照。

参数:

  • table:string,必须,表名
  • snapshot_id:long,必须,快照 ID。

输出:

  • source_snapshot_id:long,cherry-pick 之前的快照 ID;
  • current_snapshot_id:long,通过 cherry-pick 创建的快照 ID。
  • CALL catalog_name.system.cherrypick_snapshot('my_table', 1)

# 元数据管理

  1. expire_snapshots

删除过期快照和相关数据文件。

参数:

  • table:string,必须,表名。
  • older_than:timestamp,非必须,该时间戳之前的快照将被删除,默认为 5 天前。
  • retain_last:int,非必须,和 older_than 同时存在的时候,要保留的快照数(默认为 1)。
  • max_concurrent_deletes:int,非必须,用于删除文件操作的线程池大小(默认不使用线程池)。

输出:

  • deleted_data_files_count:long,删除的数据文件数。
  • deleted_manifest_files_count:long,删除的 manifest 文件数。
  • deleted_manifest_lists_count:删除的 manifest list 文件数。

示例:

call hive_prod.system.expire_snapshots('db.sample', TIMESTAMP '2021-06-30 00:00:00.000', 100);
1
  1. remove_orphan_files

删除 Iceberg 元数据文件中未被引用的“孤儿”文件。 参数:

  • table:string,必须,表名。
  • older_than:timestamp,非必须,删除在此时间戳之前创建的孤立文件(默认为 3 天前)。
  • location:string,非必须,查找文件的目录(默认为表的位置)。
  • dry_run:boolean,非必须,当为 true 时,实际上不删除文件(默认为 false)。
  • max_concurrent_deletes:int,非必须,用于删除文件操作的线程池大小(默认不使用线程池)。

输出:

  • orphan_file_location:string,被确定为孤立文件的每个文件的路径。

示例:

call catalog_name.system.remove_orphan_files(table => 'db.sample', location => 'tablelocation/data');
1
  1. rewrite_data_files

合并小文件,加速文件扫描。

参数:

  • table:string,必须,表名。
  • strategy:string,非必须,合并策略binpack或sort。默认为binpack。
  • sort_order:string,非必须,用以描述排序方式,多个字段之间用逗号分隔。如:name asc nulls last, age desc nulls first。
  • options:map<string, string>,非必须,用以重写文件时的其它参数。
  • where:string,非必须,指定过滤条件。

输出:

  • rewritten_data_files_count:被重写的文件数量。
  • added_data_files_count:此命令写入的新数据文件数。

示例:

call catalog_name.system.rewrite_data_files('db.sample');
call catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'id DESC NULLS LAST,name ASC NULLS FIRST');
call catalog_name.system.rewrite_data_files(table => 'db.sample', options => map('min-input-files','2'));
call catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"');
1
2
3
4
  1. rewrite_manifests

重写 manifest 文件来优化扫描计划。

参数:

  • table:string,必须,表名。
  • use_caching:boolean,非必须,是否使用 Spark 缓存(默认为 true)。

输出:

  • rewritten_manifests_count:int,被重写的 manifest 文件数。
  • added_mainfests_count:int,新生成的 manifest 文件数。

示例:

call catalog_name.system.rewrite_manifests('db.sample', false)
1
  1. ancestors_of

获取指定快照的血缘关系。

参数:

  • table:string,必须,表名。
  • snapshot_id:long,非必须,指定的快照 ID。

输出:

  • snapshot_id:long,祖先快照 ID。
  • timestamp:long,快照生成时间。

示例:

call spark_catalog.system.ancestors_of('db.tbl', 1);
call spark_catalog.system.ancestors_of(snapshot_id => 1, table => 'db.tbl');
1
2

# 表迁移

  1. snapshots

在不影响原始表的情况下创建一个新的轻量级快照表用以测试,测试结束之后可以通过drop table删除掉。如果在新的快照表中没有插入新数据,则依然使用的是原始表的数据文件,如果插入操作,则新的数据文件放在快照表的数据目录下,不会影响原始表。

在新表中任何只影响元数据的操作都是允许的,如 inset、delete。但是会影响到物理数据文件的操作是禁止的,如删除过期快照(expire_snapshots),因为会影响到原始表。

参数:

  • source_table:string,必须,原始表表名。
  • table:string,必须,新表(快照表)表名。
  • location:string,非必须,新表的存储目录,默认有 catalog 管理。
  • properties:map<string, string>,非必须,添加到新表中的属性。

输出:

  • imported_files_count:long,添加到新表的文件数。

示例:

call catalog_name.system.snapshot('db.sample', 'db.snap');
call catalog_name.system.snapshot('db.sample', 'db.snap', '/tmp/temptable/');
1
2
  1. migrate

将一个非 Iceberg 表转为 Iceberg 表,原始表中的文件会加载到新的 Iceberg 表中,原始表中的 schema 信息、partition 信息、属性信息以及位置都会拷贝至新表。

提示

迁移成功的前提是新表兼容原始表使用的文件格式。

参数:

  • table:string,必须,需要迁移的原始表。
  • properties:map<string, string>,非必须,新 Iceberg 表的属性。

输出:

  • migrated_files_count:long,添加到新 Iceberg 表中的数据文件数。

示例:

call catalog_name.system.migrate('db.sample');
1
  1. add_files

将 Hive 或其它基于文件的表中的数据文件添加到指定的 Iceberg 表中,可以从一个或多个分区导入文件。add_files只会为需要导入的数据文件增加元数据信息,并不会物理地移动数据文件,而且不会考虑导入文件的 Scheme 信息是否和 Iceberg 表匹配。

注意

  1. expire_snapshot等可能删除物理文件的操作同样会删除新导入的文件。
  2. 导入过程中不会验证表的 Schema 信息,如果源表和目标表的 Schema 不匹配可能会带来新的问题。

参数:

  • table:string,必须,目标表,数据被导入其中。
  • source_table:string,必须,源表,提供需要被导入的数据。如果是 Hive 或 Spark 中的表,可以为db.tbname;如果文件,可以为file_format.path。
  • partition_filter:map<string, string>,非必须,要从中导入的源表中的分区集合。

示例:

-- 将 db.src_tbl 表(非Iceberg表)中分区 year = 2022 的数据添加到 db.tbl 中
call spark_catalog.system.add_files(
    table => 'db.tbl',
    source_table => 'db.src_tbl',
    partition_filter => map('year', '2022')
)

-- 将 path/to/table 目录中 格式为 parquet 的文件添加到 db.tbl 表中,
call spark_catalog.system.add_files(
    table => 'db.tbl',
    source_table => '`parquet`.`path/to/table`'
)
1
2
3
4
5
6
7
8
9
10
11
12
上次更新: 2023/11/01, 03:11:44

← Iceberg Spark DQL Hudi 概述→

Theme by Vdoing | Copyright © 2022-2023 Will 蜀ICP备2022002285号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式