
Iceberg Data Lake on Amazon S3 with AWS Glue Catalog

Updated on Jul 09, 2024

Apache Iceberg is a high-performance open table format for analytic datasets. Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for engines like Spark, Trino, Flink, Presto, Hive and Impala to safely work with the same tables at the same time. Iceberg provides ACID transactions, schema evolution, and time travel for data lakes.

Spark is currently the most feature-rich compute engine for Iceberg operations. Iceberg supports several catalog back-ends for tracking tables, such as JDBC, Hive Metastore, and AWS Glue.

In this article, we will explore how to set up an Iceberg data lake on Amazon S3, using the AWS Glue Catalog for metadata management. We will demonstrate this with a local Spark engine, as described in the article Setup Apache Spark with Jupyter Notebook on MacOS.

Prerequisites

  • Spark Version: 3.5.1
  • Scala Version: 2.12

Iceberg allows the use of AWS Glue as the Catalog implementation, where an Iceberg namespace is stored as a Glue Database and an Iceberg table is stored as a Glue Table.

AWS S3 Bucket Structure

Our demo S3 bucket named iceberg-datalake-demo contains the following directories, created as shown below:

  • raw
  • stg
  • transform
  • warehouse
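A minimal boto3 sketch for creating the bucket and its prefixes (assuming the eu-central-1 region used throughout this demo; bucket and prefix names are the demo values):

import boto3

s3 = boto3.client("s3", region_name="eu-central-1")

# Create the demo bucket (regions other than us-east-1 need a LocationConstraint)
s3.create_bucket(
    Bucket="iceberg-datalake-demo",
    CreateBucketConfiguration={"LocationConstraint": "eu-central-1"},
)

# Create zero-byte "directory" markers for each prefix
for prefix in ["raw/", "stg/", "transform/", "warehouse/"]:
    s3.put_object(Bucket="iceberg-datalake-demo", Key=prefix)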

AWS Glue Data Catalog

  • Database Name: iceberg_stg
  • Location URI: s3://iceberg-datalake-demo/stg/
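Since an Iceberg namespace maps to a Glue database, this database can be created up front, either from the AWS console or, for instance, with boto3 (a minimal sketch using the demo values above):

import boto3

glue = boto3.client("glue", region_name="eu-central-1")

# Create the Glue database that will back the Iceberg namespace
glue.create_database(
    DatabaseInput={
        "Name": "iceberg_stg",
        "LocationUri": "s3://iceberg-datalake-demo/stg/",
    }
)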

IAM Policy

First, we create an IAM policy to grant necessary permissions:

iceberg-datalake-demo-policy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "glue:SearchTables",
                "glue:BatchCreatePartition",
                "glue:CreatePartitionIndex",
                "glue:DeleteDatabase",
                "glue:GetTableVersions",
                "glue:GetPartitions",
                "glue:DeleteTableVersion",
                "glue:UpdateTable",
                "glue:DeleteTable",
                "glue:DeletePartitionIndex",
                "glue:GetTableVersion",
                "glue:UpdateColumnStatisticsForTable",
                "glue:CreatePartition",
                "glue:UpdateDatabase",
                "glue:CreateTable",
                "glue:GetTables",
                "glue:GetDatabases",
                "glue:GetTable",
                "glue:GetDatabase",
                "glue:GetPartition",
                "glue:UpdateColumnStatisticsForPartition",
                "glue:CreateDatabase",
                "glue:BatchDeleteTableVersion",
                "glue:BatchDeleteTable",
                "glue:DeletePartition",
                "glue:GetUserDefinedFunctions"
            ],
            "Resource": [
                "arn:aws:glue:eu-central-1:123456789012:catalog",
                "arn:aws:glue:eu-central-1:123456789012:table/iceberg_stg/*",
                "arn:aws:glue:eu-central-1:123456789012:database/iceberg_stg"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "glue:SearchTables",
                "glue:GetTableVersions",
                "glue:GetPartitions",
                "glue:GetTableVersion",
                "glue:GetTables",
                "glue:GetDatabases",
                "glue:GetTable",
                "glue:GetDatabase",
                "glue:GetPartition"
            ],
            "Resource": [
                "arn:aws:glue:eu-central-1:123456789012:table/datalake_raw/*",
                "arn:aws:glue:eu-central-1:123456789012:database/datalake_raw",
                "arn:aws:glue:eu-central-1:123456789012:database/default",
                "arn:aws:glue:eu-central-1:123456789012:database/global_temp"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "s3:GetBucketLocation",
                "s3:ListBucket",
                "s3:PutObject",
                "s3:PutObjectAcl",
                "s3:GetObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::iceberg-datalake-demo"
            ],
            "Effect": "Allow"
        }
    ]
}

Create an IAM user iceberg-datalake-demo-user and attach the above policy. Generate the access keys.
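The user, policy, and access keys can also be created programmatically. A minimal boto3 sketch follows; it attaches the policy inline for brevity, and the policy_document variable is assumed to hold the JSON policy shown above as a Python dict:

import json
import boto3

iam = boto3.client("iam")

# Create the demo user and attach the policy above as an inline policy
iam.create_user(UserName="iceberg-datalake-demo-user")
iam.put_user_policy(
    UserName="iceberg-datalake-demo-user",
    PolicyName="iceberg-datalake-demo-policy",
    PolicyDocument=json.dumps(policy_document),  # assumed: the JSON policy from the previous section
)

# Generate access keys for programmatic access
keys = iam.create_access_key(UserName="iceberg-datalake-demo-user")
print(keys["AccessKey"]["AccessKeyId"], keys["AccessKey"]["SecretAccessKey"])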

Setting Up the Environment

Open a terminal, set the AWS credentials as environment variables, and launch JupyterLab:

export AWS_ACCESS_KEY="XXXX"
export AWS_SECRET_ACCESS_KEY="XXXX

jupyter lab

Jupyter Notebook Configuration

Import Python Packages

import pyspark
from pyspark.sql import SparkSession
import os
import warnings
from IPython.core.display import HTML

warnings.filterwarnings("ignore")
display(HTML("<style>pre { white-space: pre !important; }</style>"))

AWS Credentials

AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")

Variables

catalog_name = "iceberg_glue_catalog"
bucket_name = "iceberg-datalake-demo"
bucket_prefix = "stg"
database_name = "iceberg_stg"
table_name = "consultants"
warehouse_path = f"s3a://{bucket_name}/{bucket_prefix}"

Configure Spark & Iceberg

Iceberg provides integration with different AWS services through the iceberg-aws module.

conf = (
    pyspark.SparkConf()
        # App Name
        .setAppName("Spark_Iceberg_Glue_Aws_Demo")
        # Packages
        .set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.iceberg:iceberg-aws-bundle:1.5.2,software.amazon.awssdk:bundle:2.26.14,software.amazon.awssdk:url-connection-client:2.26.14,org.apache.hadoop:hadoop-aws:3.3.4")
        # SQL Extension
        .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        # Glue Catalog
        .set(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog")
        .set(f"spark.sql.catalog.{catalog_name}.warehouse", f"{warehouse_path}")
        .set(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
        .set(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
        # AWS Credentials
        .set("spark.hadoop.fs.s3a.access.key", f"{AWS_ACCESS_KEY}")
        .set("spark.hadoop.fs.s3a.secret.key", f"{AWS_SECRET_ACCESS_KEY}")
        .set("spark.hadoop.fs.s3a.endpoint.region", "eu-central-1")
        .set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
)

# Initialize Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

Create Iceberg Table

query = f"""
    CREATE TABLE {catalog_name}.{database_name}.{table_name} (
        id BIGINT,
        first_name STRING,
        last_name STRING,
        email STRING,
        rate DECIMAL(10,2),
        status STRING,
        created_at TIMESTAMP
    ) USING iceberg
    TBLPROPERTIES (
        'write.metadata.delete-after-commit.enabled'='true',
        'write.metadata.previous-versions-max'='5',
        'write.update.mode'='merge-on-read',
        'write.delete.mode'='merge-on-read',
        'write.merge.mode'='merge-on-read'
    )
"""
spark.sql(query)
spark.catalog.listTables(f"{catalog_name}.{database_name}")

Insert Data

The script inserts some sample data into the Iceberg table.

query = f"""
    INSERT INTO {catalog_name}.{database_name}.{table_name} VALUES 
        (1, 'John', 'Doe', 'john.doe@gmail.com', 3000.00, 'perm', current_timestamp()),
        (2, 'Tom', 'Hanks', 'tom.hanks@yahoo.com', 3500.75, 'contract', current_timestamp()),
        (3, 'Jane', 'Doe', 'jane.doe@moneybank.com', 3500.75, 'perm', current_timestamp()),
        (4, 'Duke', 'Johnson', 'duke@hello.com', 4500.25, 'contract', current_timestamp()),
        (5, 'Peter', 'Parker', 'peter@gmail.com', 5000, 'contract', current_timestamp()),
        (6, 'Rick', 'Nice', 'rick@gmail.com', 4900, 'contract', current_timestamp()),
        (7, 'Tommy', 'Hill', 'tommy@gmail.com', 4100, 'perm', current_timestamp()),
        (8, 'Jill', 'Stone', 'jill@gmail.com', 4250.50, 'contract', current_timestamp()),
        (9, 'Honey', 'Bee', 'honey@gmail.com', 3200, 'perm', current_timestamp()),
        (10, 'Bell', 'Doe', 'bell@gmail.com', 34000, 'contract', current_timestamp())
"""
spark.sql(query)

Query Table

Query the Iceberg table using Spark SQL:

Selecting all rows from the table

query = f"""
    SELECT * 
    FROM {catalog_name}.{database_name}.{table_name} ORDER BY id
"""
spark.sql(query).show(truncate=False)

Grouping by a column and calculating an aggregate (e.g., average rate)

query = f"""
    SELECT status, avg(rate) as average_rate 
    FROM {catalog_name}.{database_name}.{table_name}
    GROUP BY status ORDER BY status
"""
spark.sql(query).show()

Data Modification (ACID compliance)

spark.sql(f"""
    INSERT INTO {catalog_name}.{database_name}.{table_name} VALUES 
        (11, 'Saurav', 'Mitra', 'saurav.karate@gmail.com', 5000.00, 'perm', current_timestamp())
""")
spark.sql(f""" UPDATE {catalog_name}.{database_name}.{table_name} SET rate=6500.00 WHERE id=11 """)

spark.sql(f"""
    INSERT INTO {catalog_name}.{database_name}.{table_name} VALUES 
        (12, 'Tim', 'Smith', 'tim.smith@freelance.com', 3500.70, 'contract', current_timestamp())
""")
spark.sql(f""" DELETE FROM {catalog_name}.{database_name}.{table_name} WHERE id=12 """)

spark.sql(f"""
    INSERT INTO {catalog_name}.{database_name}.{table_name} VALUES 
        (13, 'Shane', 'Wilson', 'shane.wilson@freelance.com', 5000.00, 'perm', current_timestamp()),
        (14, 'John', 'Sinha', 'john.sinha@freelance.com', 9000.00, 'contract', current_timestamp())
""")

Query Table

spark.read.format("iceberg").load(f"{catalog_name}.{database_name}.{table_name}").show(truncate = False)
query = f"""
    SELECT status, avg(rate) as average_rate 
    FROM {catalog_name}.{database_name}.{table_name}
    GROUP BY status ORDER BY status
"""
spark.sql(query).show()

Iceberg Time Travel

Iceberg supports time travel in SQL queries using VERSION AS OF or TIMESTAMP AS OF clauses.

# Time Travel to snapshot with id 500856966152140191
query = f"""
    SELECT * 
    FROM {catalog_name}.{database_name}.{table_name} VERSION AS OF 500856966152140191 ORDER BY id
"""
spark.sql(query).show(truncate=False)
# Time Travel to timestamp 2024-07-08 21:06:02
query = f"""
    SELECT * 
    FROM {catalog_name}.{database_name}.{table_name} TIMESTAMP AS OF '2024-07-08 21:06:02' ORDER BY id
"""
spark.sql(query).show(truncate=False)
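Time travel is also available through the DataFrame reader via the snapshot-id or as-of-timestamp (milliseconds since epoch) read options. Here is a sketch reusing the illustrative snapshot id from above; snapshot ids and commit times can be looked up in the snapshots metadata table shown later in this article:

# Read the table as of a specific snapshot id
spark.read \
    .option("snapshot-id", 500856966152140191) \
    .format("iceberg") \
    .load(f"{catalog_name}.{database_name}.{table_name}") \
    .show(truncate=False)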

Iceberg Table Data and Metadata Organization

Advanced Iceberg Table Operations

To inspect a table's history, snapshots, and other metadata, Iceberg supports metadata tables. Metadata tables are identified by adding the metadata table name after the original table name.

Iceberg History

Shows the history of changes made to the table

spark.sql(f"SELECT * from {catalog_name}.{database_name}.{table_name}.history limit 10").show(truncate = False)

Iceberg Metadata Log Entries

Shows table metadata log entries

spark.sql(f"SELECT * from {catalog_name}.{database_name}.{table_name}.metadata_log_entries limit 10").show(truncate=False)

Iceberg Snapshots

Shows the valid snapshots taken for the table

spark.sql(f"SELECT * from {catalog_name}.{database_name}.{table_name}.snapshots limit 10").show(truncate=False)

We can also join snapshots to table history. For example, this query will show table history, with the application ID that wrote each snapshot:

spark.sql(f"""
    SELECT 
        h.made_current_at, s.operation, h.snapshot_id, h.is_current_ancestor, s.summary["spark.app.id"]
    FROM {catalog_name}.{database_name}.{table_name}.history h 
    JOIN {catalog_name}.{database_name}.{table_name}.snapshots s  
    ON h.snapshot_id = s.snapshot_id ORDER BY made_current_at
""").show(truncate = False)

Iceberg Files

Shows the table's current files

spark.sql(f"SELECT * from {catalog_name}.{database_name}.{table_name}.files limit 5").show(truncate=False)

Iceberg Manifests

Shows the manifests (i.e., file lists) for the table

spark.sql(f"SELECT * from {catalog_name}.{database_name}.{table_name}.manifests limit 10").show(truncate=False)

Iceberg Partitions

Shows the partitions defined for the table

spark.sql(f"SELECT * from {catalog_name}.{database_name}.{table_name}.partitions limit 10").show()

Iceberg Positional Delete Files

Shows the positional delete files from the current snapshot used to manage deletes

spark.sql(f"SELECT * from {catalog_name}.{database_name}.{table_name}.position_deletes limit 10").show(vertical=True, truncate=False)

Iceberg provides table maintenance APIs to expire snapshots, remove old metadata files, and delete orphan files for query optimisation.

Note: The operations below are for demonstration purposes only; they can have performance implications and should be used judiciously.

from datetime import datetime, timedelta
import pytz

# Past 2 Minutes
older_than = datetime.now(pytz.utc) - timedelta(minutes=2)

Rewrite Data Files (Compaction)

# rewrite_data_files = f"CALL {catalog_name}.system.rewrite_data_files('{database_name}.{table_name}', 'sort', 'status DESC NULLS LAST,id ASC NULLS FIRST')"
rewrite_data_files = f"""CALL {catalog_name}.system.rewrite_data_files(
    table => '{database_name}.{table_name}',
    options => map('delete-file-threshold','1')
)"""
spark.sql(rewrite_data_files).show(truncate = False)

Rewrite Position Delete Files

rewrite_position_delete_files = f"CALL {catalog_name}.system.rewrite_position_delete_files('{database_name}.{table_name}')"
spark.sql(rewrite_position_delete_files).show(truncate = False)

Rewrite Manifests

rewrite_manifests = f"CALL {catalog_name}.system.rewrite_manifests('{database_name}.{table_name}')"
spark.sql(rewrite_manifests).show(truncate = False)

Expire Snapshots

expire_snapshots = f"""CALL {catalog_name}.system.expire_snapshots(
    table => '{database_name}.{table_name}',
    older_than => TIMESTAMP '{older_than}'
)"""
spark.sql(expire_snapshots).show(truncate = False)

Remove Orphan Files

remove_orphan_files = f"CALL {catalog_name}.system.remove_orphan_files('{database_name}.{table_name}')"
spark.sql(remove_orphan_files).show()
spark.sql(f"""
    SELECT 
        h.made_current_at, s.operation, h.snapshot_id, h.is_current_ancestor, s.summary["spark.app.id"]
    FROM {catalog_name}.{database_name}.{table_name}.history h 
    JOIN {catalog_name}.{database_name}.{table_name}.snapshots s  
    ON h.snapshot_id = s.snapshot_id ORDER BY made_current_at
""").show(truncate = False)

Finally, stop the Spark session

spark.stop()

This guide provides a comprehensive setup for managing an Iceberg data lake on Amazon S3 with AWS Glue Catalog. By following these steps, you can effectively create, manage, and optimize your Iceberg tables using Spark, ensuring efficient data handling and querying capabilities.
