Change Data Capture (CDC) is a technique that captures all changes made to a data store, collects them and prepares them for transfer and replication to other systems, either in batches or as a stream. This blog post looks at CDC in data lakehouses, using the example of Change Data Feed, a CDC variant that Databricks developed for lakehouses based on Delta Lake.

Introduction to data lakehouse and open table format technologies

A data lakehouse combines the advantages of data lakes and data warehouses by enabling both the scalable storage of raw data and the powerful analysis and management of structured and semi-structured data. A central element here is the Open Table Format (OTF), which stores structured data efficiently and enables users to work in the same way as in a traditional database.

Common OTFs such as Apache Iceberg, Apache Hudi and Delta Lake are open source and compatible with technologies such as Apache Spark and Presto. These formats provide efficient read, write and management operations with support for ACID transactions. They enable data partitioning to improve query performance, access to historical data versions for analysis and recovery, and native support for change data capture to track and process data changes.

Application of CDC in the medallion data architecture

A data lakehouse often organises data according to the medallion architecture, which is divided into three levels (bronze, silver and gold) so that data changes can be managed and propagated efficiently with CDC.

  • Bronze level: Stores the raw data. Changes and new data are captured by external CDC tools such as Debezium and Kafka and written to the bronze tables.
  • Silver level: Cleans, filters and enriches the raw data to create usable data sets. Changes from the bronze level are integrated into the silver tables via temporary views and merge operations.
  • Gold level: Aggregates and optimises data for business analyses and reports. Changes from the silver level are transferred to the gold tables via temporary views and merge operations.

This structure improves data quality and reliability, while CDC ensures that all data changes are efficiently propagated through the levels to provide up-to-date and analysable data.


Graphic representation of the medallion architecture, source: https://mattpalmer.io/posts/level-up-medallion-architecture/

Example of the use of CDC in a data lakehouse

An e-commerce company uses a traditional database to manage its day-to-day business and a data lakehouse for advanced analyses and reports. The company needs to manage large amounts of sales, customer and product data. Let's assume that initial sales data already exists in a sales table in a PostgreSQL database, with the columns sale_id, product_id, customer_id, amount and sale_date.

Changes now occur in the operational database. First, a new sale is entered:

	
		INSERT INTO sales (sale_id, product_id, customer_id, amount, sale_date)
		VALUES (3, 103, 1003, 300, '2023-01-03');
	

Debezium detects this insert and sends an event to Kafka. Kafka delivers the event to a consumer implemented with Apache Spark Structured Streaming, which processes the event and inserts the new record into the Bronze table. An existing record is also updated:

	
		UPDATE sales
		SET amount = 250
		WHERE sale_id = 2;
	

This change is also detected by Debezium and sent to Kafka as an event. The Structured Streaming consumer processes this event as well and appends the updated record to the Bronze table. Because the Bronze table is only appended to, it now holds two rows for sale_id 2 (the original record and the updated one) in addition to the new row for sale_id 3.
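
As a rough illustration of this capture path, the following sketch replays the two statements without any infrastructure. It assumes events shaped like Debezium's change event envelope (`op`, `before`, `after`) and uses a plain Python list as a stand-in for the Bronze table. The helper name `append_to_bronze` and the previous amount of sale 2 are hypothetical; in the real pipeline, Kafka and Spark Structured Streaming take the place of the loop.

```python
import json

# Stand-in for the Bronze table: an append-only list of rows.
bronze = []


def append_to_bronze(raw_event: str) -> None:
    """Append the row image carried by a Debezium-style event to bronze."""
    event = json.loads(raw_event)
    op = event["op"]  # "c" = create, "u" = update, "d" = delete
    # Deletes carry the row in "before"; creates and updates in "after".
    row = event["before"] if op == "d" else event["after"]
    bronze.append({**row, "op": op})


# Hypothetical, simplified events for the INSERT and UPDATE statements.
events = [
    json.dumps({"op": "c", "before": None,
                "after": {"sale_id": 3, "product_id": 103,
                          "customer_id": 1003, "amount": 300,
                          "sale_date": "2023-01-03"}}),
    # The previous amount (200) is an assumption made for this sketch.
    json.dumps({"op": "u", "before": {"sale_id": 2, "amount": 200},
                "after": {"sale_id": 2, "amount": 250}}),
]
for raw in events:
    append_to_bronze(raw)

print([(r["sale_id"], r["amount"], r["op"]) for r in bronze])
```

Nothing is overwritten at this stage: every event becomes a new row, which is why duplicates per sale_id are expected in the Bronze table.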

In the next phase (silver level), the data is cleansed and transformed. Duplicates are removed and missing values are corrected. In this example, the duplicate row with sale_id 2 is removed and any missing value for customer_id is corrected. If a correction is not possible, the corresponding row is removed. After cleansing and transformation, the silver table contains a single row for each sale_id.

Finally, the data is aggregated at the Gold level, where the total turnover per customer is calculated.

Example of CDC: The Change Data Feed (CDF) in Delta Lake

Delta Lake is an OTF that guarantees ACID transactions. Each data change operation, be it an insert, update or delete, is treated as a complete and independent transaction. A central feature of Delta Lake is the transaction log, in which all changes are recorded. This log makes it possible to restore previous versions of the data (time travel) and ensures that all changes are correctly recorded and applied. In addition, Delta Lake can automatically detect and apply changes to the data schema (schema evolution), which greatly simplifies the management of the data structure.
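
To make the role of the transaction log more concrete, here is a toy model in plain Python. It is not Delta Lake's actual log format; the class `ToyLog` and its methods are invented for illustration. It only shows the principle that an ordered, append-only list of commits is enough to reconstruct any earlier table version.

```python
class ToyLog:
    """Toy append-only commit log keyed by a primary key (illustrative only)."""

    def __init__(self):
        self.commits = []  # commit i produces table version i

    def commit(self, op, key, row=None):
        """Record one insert, update or delete as its own commit."""
        if op not in ("insert", "update", "delete"):
            raise ValueError(f"unknown operation: {op}")
        self.commits.append((op, key, row))
        return len(self.commits) - 1  # version number of this commit

    def snapshot(self, version=None):
        """Replay commits up to and including `version` (default: latest)."""
        last = len(self.commits) - 1 if version is None else version
        table = {}
        for op, key, row in self.commits[: last + 1]:
            if op == "delete":
                table.pop(key, None)
            else:
                table[key] = row
        return table


log = ToyLog()
v0 = log.commit("insert", 1, {"amount": 100})
v1 = log.commit("update", 1, {"amount": 250})
v2 = log.commit("delete", 1)

print(log.snapshot(v0))  # state after the first commit
print(log.snapshot(v1))  # state after the update
print(log.snapshot())    # latest state
```

In Delta Lake itself, earlier versions are read with the `versionAsOf` or `timestampAsOf` reader options rather than by replaying commits by hand.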

Delta tables use Apache Parquet as their storage format, complemented by techniques such as data indexing and advanced compression, to ensure efficient storage and performant read and write operations for structured and semi-structured data.

For CDC, Delta Lake offers a special function, the Change Data Feed (CDF), which considerably simplifies the implementation of CDC in data engineering pipelines. In the past, many developers had to manage CDC manually in the context of a medallion architecture, which involved considerable effort and cost. Databricks recognised this problem and integrated CDF into Delta Lake to eliminate the inefficiencies of manual CDC implementation. This integration improves performance, simplifies administration and increases the scalability of CDC operations in large data lakes.


How Delta Lake CDF works, source: https://www.databricks.com/blog/2021/06/09/how-to-simplify-cdc-with-delta-lakes-change-data-feed.html
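
When the feed is enabled on a table (via the table property `delta.enableChangeDataFeed`), each change row carries metadata columns such as `_change_type` and `_commit_version`, where `_change_type` is one of `insert`, `update_preimage`, `update_postimage` or `delete`. The plain-Python sketch below imitates that output for two snapshots of a table. The function `toy_change_feed` and the sample values are hypothetical; the sketch only mimics the shape of the feed, not how Delta Lake computes it.

```python
def toy_change_feed(old, new, commit_version):
    """Return CDF-like change rows between two {key: row} snapshots."""
    changes = []
    for key in sorted(old.keys() | new.keys()):
        before, after = old.get(key), new.get(key)
        if before == after:
            continue  # unchanged rows do not appear in the feed
        if before is None:
            kinds = [("insert", after)]
        elif after is None:
            kinds = [("delete", before)]
        else:
            kinds = [("update_preimage", before), ("update_postimage", after)]
        for change_type, row in kinds:
            changes.append({**row, "_change_type": change_type,
                            "_commit_version": commit_version})
    return changes


# Hypothetical table content before and after one commit.
v1 = {1: {"sale_id": 1, "amount": 100}, 2: {"sale_id": 2, "amount": 200}}
v2 = {2: {"sale_id": 2, "amount": 250}, 3: {"sale_id": 3, "amount": 300}}

for change in toy_change_feed(v1, v2, commit_version=2):
    print(change)
```

An update produces two rows, the image before and the image after the change, which matters for downstream consumers that only want the current state.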

The following code snippets show how Delta Lake's Change Data Feed (CDF) is implemented in PySpark to efficiently manage the data transfer between Bronze, Silver and Gold tables. The full implementation, including all code, is available in the GitHub repository https://github.com/cdelmonte-zg/delta-table-example.

The first code sample shows how to simulate data updates and new data entries in the silver table, taking into account only the last change for each sale_id. First, updates and new data are added to the bronze table. Then the changes are retrieved from the bronze table using the CDF. A window function is used to select only the latest record for each sale_id. Finally, these changes are integrated into the silver table with a merge. If a matching sale_id is found, the row is updated, otherwise a new row is inserted.

	
		from datetime import datetime
		from delta.tables import DeltaTable
		from pyspark.sql.functions import col, row_number
		from pyspark.sql.window import Window
		# NOTE: `schema` (the schema of the sales tables) is defined elsewhere
		# in the module, see the linked repository.
		def simulate_data_flow_to_silver(
		   spark_session, bronze_path, silver_path, starting_from_timestamp
		):
		   # Updates to be appended to the bronze table
		   updates = [
		       # Updated amount
		       (
		           1,
		           101,
		           1001,
		           250,
		           datetime.strptime("2020-08-21 10:00:00", "%Y-%m-%d %H:%M:%S"),
		       ),
		       # Another update of the same sale
		       (
		           1,
		           101,
		           1001,
		           260,
		           datetime.strptime("2020-08-21 11:00:00", "%Y-%m-%d %H:%M:%S"),
		       ),
		       # New sale
		       (
		           3,
		           103,
		           1001,
		           300,
		           datetime.strptime("2020-08-22 10:00:00", "%Y-%m-%d %H:%M:%S"),
		       ),
		   ]
		   df_updates = spark_session.createDataFrame(updates, schema)
		   df_updates.write.format("delta").mode("append").save(bronze_path)
		   # Read the changes from the bronze table using CDF
		   df_changes = (
		       spark_session.read.format("delta")
		       .option("readChangeData", "true")
		       .option("startingTimestamp", starting_from_timestamp)
		       .table("sales_bronze")
		   )
		   # Use a window function to keep only the latest record per sale_id
		   window_spec = Window.partitionBy("sale_id").orderBy(col("sale_date").desc())
		   df_latest_changes = (
		       df_changes.withColumn("rn", row_number().over(window_spec))
		       .filter("rn = 1")
		       .drop("rn")
		   )
		   # Apply the changes to the silver table with a merge (upsert)
		   silver_table = DeltaTable.forPath(spark_session, silver_path)
		   (
		       silver_table.alias("silver")
		       .merge(df_latest_changes.alias("updates"), "silver.sale_id = updates.sale_id")
		       .whenMatchedUpdateAll()
		       .whenNotMatchedInsertAll()
		       .execute()
		   )
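
The two core steps of this function, keeping only the latest change per `sale_id` and then upserting, can also be traced in plain Python. The sketch below reuses the three update tuples from the snippet; the initial content of the silver table and the helper names `latest_per_key` and `upsert` are assumptions made for illustration.

```python
from datetime import datetime


def latest_per_key(changes):
    """Keep the change with the newest sale_date for each sale_id."""
    latest = {}
    for row in changes:
        current = latest.get(row["sale_id"])
        if current is None or row["sale_date"] > current["sale_date"]:
            latest[row["sale_id"]] = row
    return latest


def upsert(target, updates):
    """Update matching keys and insert the rest, like the merge above."""
    for key, row in updates.items():
        target[key] = row
    return target


columns = ("sale_id", "product_id", "customer_id", "amount", "sale_date")
changes = [dict(zip(columns, values)) for values in [
    (1, 101, 1001, 250, datetime(2020, 8, 21, 10)),
    (1, 101, 1001, 260, datetime(2020, 8, 21, 11)),
    (3, 103, 1001, 300, datetime(2020, 8, 22, 10)),
]]

# Assumed initial silver content: an earlier version of sale 1.
silver = {1: dict(zip(columns, (1, 101, 1001, 200, datetime(2020, 8, 20, 10))))}

upsert(silver, latest_per_key(changes))
print({key: row["amount"] for key, row in silver.items()})  # → {1: 260, 3: 300}
```

Of the two updates to sale 1, only the later one (amount 260) reaches the silver table, and sale 3 is inserted as a new row.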
	

The second code example shows how data updates from the silver table to the gold table are simulated with CDF. Changes are read from the silver table and a window function is used to select only the most recent records for each sale_id. These latest changes are stored in a temporary view and then aggregated to calculate the total amount for each customer. The aggregated data is then merged into the gold table.

	
		def simulate_data_flow_to_gold(
		   spark_session, silver_path, gold_path, starting_from_timestamp
		):
		   # Read the changes from the silver table using CDF
		   df_silver_changes = (
		       spark_session.read.format("delta")
		       .option("readChangeData", "true")
		       .option("startingTimestamp", starting_from_timestamp)
		       .table("sales_silver")
		       # Skip pre-update images and deletes so that an outdated row image
		       # can never be selected as the "latest" record
		       .filter(col("_change_type").isin("insert", "update_postimage"))
		   )
		   # Use a window function to keep only the latest record per sale_id
		   window_spec = Window.partitionBy("sale_id").orderBy(col("sale_date").desc())
		   df_silver_latest = (
		       df_silver_changes.withColumn("rn", row_number().over(window_spec))
		       .filter("rn = 1")
		       .drop("rn")
		   )
		   # Create a temporary view with the latest changes
		   df_silver_latest.createOrReplaceTempView("temp_latest_changes")
		   # Aggregate the latest changes per customer
		   df_gold_aggregate = spark_session.sql(
		       """
		       SELECT customer_id, SUM(amount) AS total_amount
		       FROM temp_latest_changes
		       GROUP BY customer_id
		   """
		   )
		   # Merge the aggregated data into the gold table
		   gold_table = DeltaTable.forPath(spark_session, gold_path)
		   (
		       gold_table.alias("gold")
		       .merge(
		           df_gold_aggregate.alias("updates"), "gold.customer_id = updates.customer_id"
		       )
		       .whenMatchedUpdateAll()
		       .whenNotMatchedInsertAll()
		       .execute()
		   )
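
The aggregation and merge can be traced in plain Python as well. The sketch below assumes that the latest silver changes are the two rows that result from the update tuples in the first snippet; the previous content of the gold table and the helper name `aggregate_by_customer` are hypothetical.

```python
from collections import defaultdict


def aggregate_by_customer(rows):
    """Sum amount per customer_id, like the GROUP BY in the snippet."""
    totals = defaultdict(int)
    for row in rows:
        totals[row["customer_id"]] += row["amount"]
    return dict(totals)


# Latest change per sale_id, as read from the silver table's change feed.
latest_changes = [
    {"sale_id": 1, "customer_id": 1001, "amount": 260},
    {"sale_id": 3, "customer_id": 1001, "amount": 300},
]

# Hypothetical previous state of the gold table.
gold = {1001: 200, 1002: 150}

# Merge: matched customers are overwritten, new customers are inserted.
gold.update(aggregate_by_customer(latest_changes))
print(gold)  # → {1001: 560, 1002: 150}
```

As in the snippet, the new total covers only the rows present in the change set. If a customer also has sales that did not change in the current window, a production pipeline would probably need to recompute the totals of the affected customers from the full silver table.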
	

Conclusion

Change Data Capture is critical for the synchronisation of data stores in data lakes and enables the management of data changes at the three levels Bronze, Silver and Gold. The Change Data Feed in Delta Lake significantly simplifies the implementation of CDC and improves the performance and scalability of CDC operations in large data lakes.

You can find more exciting topics from the world of adesso in our previous blog posts.

Author Christian Del Monte

Christian Del Monte is a software architect and engineer with many years of experience. In various projects in the B2B and B2C sectors, he has worked with a variety of software architectures implemented with different IT technologies and frameworks. He is particularly interested in data lakes as well as highly available, real-time software systems and their implementation with means such as cloud, microservices and event-driven architectures.
