Iceberg Data Lake on Amazon S3 with AWS Glue Catalog
Apache Iceberg is a high-performance open table format for analytic datasets. Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for engines like Spark, Trino, Flink, Presto, Hive, and Impala to safely work with the same tables at the same time. Iceberg provides ACID compliance, schema evolution, and time travel for data lakes.
Spark is currently the most feature-rich compute engine for Iceberg operations. Iceberg supports several catalog back-ends for tracking tables, such as JDBC, Hive Metastore, and AWS Glue.
In this article, we will explore how to set up an Iceberg data lake on Amazon S3, using the AWS Glue Catalog for metadata management. We will demonstrate this with a local Spark engine, set up as described in the article Setup Apache Spark with Jupyter Notebook on MacOS.
Prerequisites
- Spark Version: 3.5.1
- Scala Version: 2.12
Iceberg allows the use of AWS Glue as the Catalog implementation, where an Iceberg namespace is stored as a Glue Database and an Iceberg table is stored as a Glue Table.
AWS S3 Bucket Structure
Our demo S3 bucket, named iceberg-datalake-demo, contains the following directories:
- raw
- stg
- transform
- warehouse
AWS Glue Data Catalog
- Database Name: iceberg_stg
- Location URI: s3://iceberg-datalake-demo/stg/
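The database can be created in the AWS Glue console. Alternatively, as a minimal sketch (assuming boto3 is installed and the active credentials are allowed to call glue:CreateDatabase), the same database could be created programmatically:
import boto3

# Create the Glue database that serves as the Iceberg namespace
glue = boto3.client("glue", region_name="eu-central-1")
glue.create_database(
    DatabaseInput={
        "Name": "iceberg_stg",
        "LocationUri": "s3://iceberg-datalake-demo/stg/",
    }
)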
IAM Policy
First, we create an IAM policy named iceberg-datalake-demo-policy to grant the necessary permissions:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "glue:SearchTables",
                "glue:BatchCreatePartition",
                "glue:CreatePartitionIndex",
                "glue:DeleteDatabase",
                "glue:GetTableVersions",
                "glue:GetPartitions",
                "glue:DeleteTableVersion",
                "glue:UpdateTable",
                "glue:DeleteTable",
                "glue:DeletePartitionIndex",
                "glue:GetTableVersion",
                "glue:UpdateColumnStatisticsForTable",
                "glue:CreatePartition",
                "glue:UpdateDatabase",
                "glue:CreateTable",
                "glue:GetTables",
                "glue:GetDatabases",
                "glue:GetTable",
                "glue:GetDatabase",
                "glue:GetPartition",
                "glue:UpdateColumnStatisticsForPartition",
                "glue:CreateDatabase",
                "glue:BatchDeleteTableVersion",
                "glue:BatchDeleteTable",
                "glue:DeletePartition",
                "glue:GetUserDefinedFunctions"
            ],
            "Resource": [
                "arn:aws:glue:eu-central-1:123456789012:catalog",
                "arn:aws:glue:eu-central-1:123456789012:table/iceberg_stg/*",
                "arn:aws:glue:eu-central-1:123456789012:database/iceberg_stg"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "glue:SearchTables",
                "glue:GetTableVersions",
                "glue:GetPartitions",
                "glue:GetTableVersion",
                "glue:GetTables",
                "glue:GetDatabases",
                "glue:GetTable",
                "glue:GetDatabase",
                "glue:GetPartition"
            ],
            "Resource": [
                "arn:aws:glue:eu-central-1:123456789012:table/datalake_raw/*",
                "arn:aws:glue:eu-central-1:123456789012:database/datalake_raw",
                "arn:aws:glue:eu-central-1:123456789012:database/default",
                "arn:aws:glue:eu-central-1:123456789012:database/global_temp"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "s3:GetBucketLocation",
                "s3:ListBucket",
                "s3:PutObject",
                "s3:PutObjectAcl",
                "s3:GetObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::iceberg-datalake-demo",
                "arn:aws:s3:::iceberg-datalake-demo/*"
            ],
            "Effect": "Allow"
        }
    ]
}
Create an IAM user iceberg-datalake-demo-user and attach the above policy, then generate access keys for the user.
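These steps can be done in the IAM console. For reference, a minimal boto3 sketch might look like this (the policy_document variable, assumed to hold the JSON policy above, is hypothetical):
import json
import boto3

iam = boto3.client("iam")

# Create the demo user
iam.create_user(UserName="iceberg-datalake-demo-user")

# Create the managed policy from the JSON document above and attach it to the user
policy = iam.create_policy(
    PolicyName="iceberg-datalake-demo-policy",
    PolicyDocument=json.dumps(policy_document),  # policy_document: the JSON policy shown above
)
iam.attach_user_policy(
    UserName="iceberg-datalake-demo-user",
    PolicyArn=policy["Policy"]["Arn"],
)

# Generate access keys; store the secret key securely
keys = iam.create_access_key(UserName="iceberg-datalake-demo-user")
print(keys["AccessKey"]["AccessKeyId"], keys["AccessKey"]["SecretAccessKey"])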
Setting Up the Environment
Open a terminal, set the AWS credential environment variables, and launch JupyterLab:
export AWS_ACCESS_KEY="XXXX"
export AWS_SECRET_ACCESS_KEY="XXXX"
jupyter lab
Jupyter Notebook Configuration
Import Python Packages
import pyspark
from pyspark.sql import SparkSession
import os
import warnings
from IPython.core.display import HTML
warnings.filterwarnings("ignore")
display(HTML("<style>pre { white-space: pre !important; }</style>"))
AWS Credentials
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
Variables
catalog_name = "iceberg_glue_catalog"
bucket_name = "iceberg-datalake-demo"
bucket_prefix = "stg"
database_name = "iceberg_stg"
table_name = "consultants"
warehouse_path = f"s3a://{bucket_name}/{bucket_prefix}"
Configure Spark & Iceberg
Iceberg provides integration with different AWS services through the iceberg-aws module.
conf = (
pyspark.SparkConf()
# App Name
.setAppName("Spark_Iceberg_Glue_Aws_Demo")
# Packages
.set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.iceberg:iceberg-aws-bundle:1.5.2,software.amazon.awssdk:bundle:2.26.14,software.amazon.awssdk:url-connection-client:2.26.14,org.apache.hadoop:hadoop-aws:3.3.4")
# SQL Extension
.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
# Glue Catalog
.set(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog")
.set(f"spark.sql.catalog.{catalog_name}.warehouse", f"{warehouse_path}")
.set(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
.set(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
# AWS Credentials
.set("spark.hadoop.fs.s3a.access.key", f"{AWS_ACCESS_KEY}")
.set("spark.hadoop.fs.s3a.secret.key", f"{AWS_SECRET_ACCESS_KEY}")
.set("spark.hadoop.fs.s3a.endpoint.region", "eu-central-1")
.set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
)
# Initialize Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
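As an optional sanity check (assuming the Glue permissions above are in place), we can list the Glue databases visible through the new catalog:
# List the Glue databases (namespaces) visible through the Iceberg catalog
spark.sql(f"SHOW NAMESPACES IN {catalog_name}").show()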
Create Iceberg Table
query = f"""
CREATE TABLE {catalog_name}.{database_name}.{table_name} (
id BIGINT,
first_name STRING,
last_name STRING,
email STRING,
rate DECIMAL(10,2),
status STRING,
created_at TIMESTAMP
) USING iceberg
TBLPROPERTIES (
'write.metadata.delete-after-commit.enabled'='true',
'write.metadata.previous-versions-max'='5',
'write.update.mode'='merge-on-read',
'write.delete.mode'='merge-on-read',
'write.merge.mode'='merge-on-read'
)
"""
spark.sql(query)
spark.catalog.listTables(f"{catalog_name}.{database_name}")
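Optionally, we can verify the table properties set at creation time; this is a small sketch relying on Spark's generic SHOW TBLPROPERTIES support for v2 catalogs:
# Inspect the table properties registered for the new Iceberg table
spark.sql(f"SHOW TBLPROPERTIES {catalog_name}.{database_name}.{table_name}").show(truncate=False)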
Insert Data
Next, we insert some sample data into the Iceberg table:
query = f"""
INSERT INTO {catalog_name}.{database_name}.{table_name} VALUES
(1, 'John', 'Doe', 'john.doe@gmail.com', 3000.00, 'perm', current_timestamp()),
(2, 'Tom', 'Hanks', 'tom.hanks@yahoo.com', 3500.75, 'contract', current_timestamp()),
(3, 'Jane', 'Doe', 'jane.doe@moneybank.com', 3500.75, 'perm', current_timestamp()),
(4, 'Duke', 'Johnson', 'duke@hello.com', 4500.25, 'contract', current_timestamp()),
(5, 'Peter', 'Parker', 'peter@gmail.com', 5000, 'contract', current_timestamp()),
(6, 'Rick', 'Nice', 'rick@gmail.com', 4900, 'contract', current_timestamp()),
(7, 'Tommy', 'Hill', 'tommy@gmail.com', 4100, 'perm', current_timestamp()),
(8, 'Jill', 'Stone', 'jill@gmail.com', 4250.50, 'contract', current_timestamp()),
(9, 'Honey', 'Bee', 'honey@gmail.com', 3200, 'perm', current_timestamp()),
(10, 'Bell', 'Doe', 'bell@gmail.com', 34000, 'contract', current_timestamp())
"""
spark.sql(query)
Query Table
Query the Iceberg table using Spark SQL:
Selecting all rows from the table
query = f"""
SELECT *
FROM {catalog_name}.{database_name}.{table_name} ORDER BY id
"""
spark.sql(query).show(truncate=False)
Grouping by a column and calculating an aggregate (e.g., average rate)
query = f"""
SELECT status, avg(rate) as average_rate
FROM {catalog_name}.{database_name}.{table_name}
GROUP BY status ORDER BY status
"""
spark.sql(query).show()
Data Modification (ACID compliance)
spark.sql(f"""
INSERT INTO {catalog_name}.{database_name}.{table_name} VALUES
(11, 'Saurav', 'Mitra', 'saurav.karate@gmail.com', 5000.00, 'perm', current_timestamp())
""")
spark.sql(f""" UPDATE {catalog_name}.{database_name}.{table_name} SET rate=6500.00 WHERE id=11 """)
spark.sql(f"""
INSERT INTO {catalog_name}.{database_name}.{table_name} VALUES
(12, 'Tim', 'Smith', 'tim.smith@freelance.com', 3500.70, 'contract', current_timestamp())
""")
spark.sql(f""" DELETE FROM {catalog_name}.{database_name}.{table_name} WHERE id=12 """)
spark.sql(f"""
INSERT INTO {catalog_name}.{database_name}.{table_name} VALUES
(13, 'Shane', 'Wilson', 'shane.wilson@freelance.com', 5000.00, 'perm', current_timestamp()),
(14, 'John', 'Sinha', 'john.sinha@freelance.com', 9000.00, 'contract', current_timestamp())
""")
Query Table
spark.read.format("iceberg").load(f"{catalog_name}.{database_name}.{table_name}").show(truncate = False)
query = f"""
SELECT status, avg(rate) as average_rate
FROM {catalog_name}.{database_name}.{table_name}
GROUP BY status ORDER BY status
"""
spark.sql(query).show()
Iceberg Time Travel
Iceberg supports time travel in SQL queries using the VERSION AS OF or TIMESTAMP AS OF clauses.
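The snapshot ID and timestamp used below are specific to this demo run; your own values can be looked up from the table's snapshots metadata table (covered in more detail later in this article):
# Look up available snapshots (commit time, snapshot id, operation) for time travel
spark.sql(f"""
SELECT committed_at, snapshot_id, operation
FROM {catalog_name}.{database_name}.{table_name}.snapshots
ORDER BY committed_at
""").show(truncate=False)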
# Time Travel to snapshot with id 500856966152140191
query = f"""
SELECT *
FROM {catalog_name}.{database_name}.{table_name} VERSION AS OF 500856966152140191 ORDER BY id
"""
spark.sql(query).show(truncate=False)
# Time Travel to timestamp 2024-07-08 21:06:02
query = f"""
SELECT *
FROM {catalog_name}.{database_name}.{table_name} TIMESTAMP AS OF '2024-07-08 21:06:02' ORDER BY id
"""
spark.sql(query).show(truncate=False)
Iceberg Table Data and Metadata Organization
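Each Iceberg table keeps its Parquet data files and its metadata (metadata JSON files, manifest lists, and manifest files) under the table's location on S3, typically in data/ and metadata/ subdirectories beneath the configured warehouse or database location. As a small sketch, the table's exact location and other details can be inspected from Spark with standard DESCRIBE syntax:
# Show columns, partitioning and detailed table information, including the table location
spark.sql(f"DESCRIBE TABLE EXTENDED {catalog_name}.{database_name}.{table_name}").show(truncate=False)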
Advanced Iceberg Table Operations
To inspect a table's history, snapshots, and other metadata, Iceberg supports metadata tables. Metadata tables are identified by adding the metadata table name after the original table name.
Iceberg History
Shows the history of changes made to the table
spark.sql(f"SELECT * from {catalog_name}.{database_name}.{table_name}.history limit 10").show(truncate = False)
Iceberg Metadata Log Entries
Shows table metadata log entries
spark.sql(f"SELECT * from {catalog_name}.{database_name}.{table_name}.metadata_log_entries limit 10").show(truncate=False)
Iceberg Snapshots
Shows the valid snapshots taken for the table
spark.sql(f"SELECT * from {catalog_name}.{database_name}.{table_name}.snapshots limit 10").show(truncate=False)
We can also join snapshots to table history. For example, this query will show table history, with the application ID that wrote each snapshot:
spark.sql(f"""
SELECT
h.made_current_at, s.operation, h.snapshot_id, h.is_current_ancestor, s.summary["spark.app.id"]
FROM {catalog_name}.{database_name}.{table_name}.history h
JOIN {catalog_name}.{database_name}.{table_name}.snapshots s
ON h.snapshot_id = s.snapshot_id ORDER BY made_current_at
""").show(truncate = False)
Iceberg Files
Shows the table's current files
spark.sql(f"SELECT * from {catalog_name}.{database_name}.{table_name}.files limit 5").show(truncate=False)
Iceberg Manifests
Shows the manifests (i.e., file lists) for the table
spark.sql(f"SELECT * from {catalog_name}.{database_name}.{table_name}.manifests limit 10").show(truncate=False)
Iceberg Partitions
Shows the partitions defined for the table
spark.sql(f"SELECT * from {catalog_name}.{database_name}.{table_name}.partitions limit 10").show()
Iceberg Positional Delete Files
Shows the positional delete files from the current snapshot used to manage deletes
spark.sql(f"SELECT * from {catalog_name}.{database_name}.{table_name}.position_deletes limit 10").show(vertical=True, truncate=False)
Iceberg provides table maintenance APIs to compact data files, expire snapshots, remove old metadata files, and delete orphan files, which keeps tables healthy and queries fast.
Note: The operations below are for demonstration purposes only; they can have performance implications and should be used judiciously.
from datetime import datetime, timedelta
import pytz
# Past 2 Minutes
older_than = datetime.now(pytz.utc) - timedelta(minutes=2)
Rewrite Data Files (Compaction)
# rewrite_data_files = f"CALL {catalog_name}.system.rewrite_data_files('{database_name}.{table_name}', 'sort', 'status DESC NULLS LAST,id ASC NULLS FIRST')"
rewrite_data_files = f"""CALL {catalog_name}.system.rewrite_data_files(
table => '{database_name}.{table_name}',
options => map('delete-file-threshold','1')
)"""
spark.sql(rewrite_data_files).show(truncate = False)
Rewrite Position Delete Files
rewrite_position_delete_files = f"CALL {catalog_name}.system.rewrite_position_delete_files('{database_name}.{table_name}')"
spark.sql(rewrite_position_delete_files).show(truncate = False)
Rewrite Manifests
rewrite_manifests = f"CALL {catalog_name}.system.rewrite_manifests('{database_name}.{table_name}')"
spark.sql(rewrite_manifests).show(truncate = False)
Expire Snapshots
expire_snapshots = f"""CALL {catalog_name}.system.expire_snapshots(
table => '{database_name}.{table_name}',
older_than => TIMESTAMP '{older_than}'
)"""
spark.sql(expire_snapshots).show(truncate = False)
Remove Orphan Files
remove_orphan_files = f"CALL {catalog_name}.system.remove_orphan_files('{database_name}.{table_name}')"
spark.sql(remove_orphan_files).show()
spark.sql(f"""
SELECT
h.made_current_at, s.operation, h.snapshot_id, h.is_current_ancestor, s.summary["spark.app.id"]
FROM {catalog_name}.{database_name}.{table_name}.history h
JOIN {catalog_name}.{database_name}.{table_name}.snapshots s
ON h.snapshot_id = s.snapshot_id ORDER BY made_current_at
""").show(truncate = False)
Finally, stop the Spark session
spark.stop()
This guide provides a comprehensive setup for managing an Iceberg data lake on Amazon S3 with AWS Glue Catalog. By following these steps, you can effectively create, manage, and optimize your Iceberg tables using Spark, ensuring efficient data handling and querying capabilities.