Building Bronze Layer of Medallion Architecture in Fabric Lakehouse using WAL2JSON

by info.odysseyx@gmail.com, September 10, 2024

Introduction

If you work in data engineering, you may have come across the term "medallion architecture." This design pattern organizes data within a lakehouse into separate layers to facilitate efficient processing and analytics (learn more here), and it is also the recommended design approach for Microsoft Fabric. To make a lakehouse usable, data passes through several layers: Bronze, Silver, and Gold, each focusing on progressively improving data cleanliness and quality. In this article, we look specifically at how to build the Bronze layer using real-time change streaming from an existing PostgreSQL database. This approach enables real-time analytics and supplies raw, unprocessed data to support AI applications.

Image source: https://www.databricks.com/glossary/medallion-architecture

What is the Bronze layer?

This layer is often called the raw zone; data is stored here in its original form and structure. By common definition, data in this layer is append-only and immutable, but that can be misleading. The intention is to preserve the data as originally collected, not to rule out deletes or updates. When deletes or updates do occur, the original values are preserved as previous versions, so earlier data remains accessible and unchanged. Delta Lake is commonly used to manage this data because it supports versioning and maintains a full history of changes.

PostgreSQL as a source for the Bronze layer

Suppose you have multiple PostgreSQL databases running different applications and you want to integrate their data into Delta Lake. There are a few ways to achieve this. The first is a copy activity that extracts data from individual tables and stores it in a Delta table; this can be inefficient because it requires either a watermark column to track changes or a full reload on every run. The second approach is to set up change data capture (CDC) in PostgreSQL to continuously capture and stream data changes, which allows real-time synchronization and efficient updates to OneLake. In this blog, we walk through a proof of concept for the CDC-based approach.

How do you create a continuously replicated Bronze layer using PostgreSQL logical decoding, wal2json, and Fabric Delta Lake?

The idea is to capture changes with PostgreSQL logical replication and the wal2json plugin, and apply them to Delta Lake with PySpark. In PostgreSQL, logical replication is a method for replicating data changes from one PostgreSQL instance to another instance or to a different system. wal2json is a PostgreSQL output plugin for logical decoding that converts write-ahead log (WAL) changes into JSON.

Setting up Azure PostgreSQL

Log in to the Azure Portal, navigate to "Server Parameters" for your Azure Database for PostgreSQL server, and change the following server parameters:

Parameter name           Value
wal_level                logical
max_replication_slots    > 0 (e.g., 4 or 8)
max_wal_senders          > 0 (e.g., 4 or 8)

Create a publication for the tables. Publications are a feature of logical replication that lets you define which table changes should be streamed to subscribers.

CREATE PUBLICATION cdc_publication FOR ALL TABLES;
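Before creating the replication slot in the next step, you can optionally confirm that the server parameters and the publication are in place. The sketch below is not part of the original walkthrough; it assumes the psycopg2 client library and placeholder connection details, and any PostgreSQL client would work just as well.

# Optional sanity check: confirm the logical replication settings and the publication.
# Assumes the psycopg2 client library and placeholder connection details.
import psycopg2

conn = psycopg2.connect(
    host="your_postgres_db.postgres.database.azure.com",  # placeholder server name
    dbname="postgres",
    user="postgres",
    password="your_password",  # placeholder
    sslmode="require",
)

with conn, conn.cursor() as cur:
    # Logical replication needs wal_level = logical and non-zero slot/sender limits.
    cur.execute(
        "SELECT name, setting FROM pg_settings "
        "WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders')"
    )
    for name, setting in cur.fetchall():
        print(f"{name} = {setting}")

    # The publication created above should appear here.
    cur.execute("SELECT pubname FROM pg_publication")
    print("publications:", [row[0] for row in cur.fetchall()])

conn.close()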
Next, create a replication slot using wal2json as the plugin name. A slot represents a stream of changes that can be replayed to a client in the order they were made on the origin server. Each slot streams a sequence of changes from a single database.

Note: the wal2json plugin comes pre-installed on Azure Database for PostgreSQL.

SELECT * FROM pg_create_logical_replication_slot('cdc_slot', 'wal2json');

You can now update some test data and verify that replication is working by running the following command:

SELECT * FROM pg_logical_slot_get_changes('cdc_slot', NULL, NULL, 'include-xids', 'true', 'include-timestamp', 'true');

Now that we have tested replication, let's look at the output format. The main components of the wal2json output are described below, followed by examples.

Field          Description
xid            Transaction ID.
timestamp      The timestamp when the transaction was committed.
kind           Operation type (insert, update, delete).
schema         Schema of the table.
table          The name of the table where the change occurred.
columnnames    An array of the column names affected by the change.
columntypes    An array of column data types corresponding to the column names.
columnvalues   An array of new values for the columns (present for insert and update operations).
oldkeys        An object containing the primary or unique key values before the change (present for update and delete operations).

For an INSERT statement:

{
  "xid": 8362757,
  "timestamp": "2024-08-01 15:09:34.086064+05:30",
  "change": [
    {
      "kind": "insert",
      "schema": "public",
      "table": "employees_synapse_test",
      "columnnames": ["EMPLOYEE_ID", "FIRST_NAME", "LAST_NAME", "EMAIL", "PHONE_NUMBER", "HIRE_DATE", "JOB_ID", "SALARY", "COMMISSION_PCT", "MANAGER_ID", "DEPARTMENT_ID"],
      "columntypes": ["numeric(10,0)", "text", "text", "text", "text", "timestamp without time zone", "text", "numeric(8,2)", "numeric(2,2)", "numeric(6,0)", "numeric(4,0)"],
      "columnvalues": [327, "3275FIRST NAME111", "3275LAST NAME", "3275EMAIL3275EMAIL", "3275", "2024-07-31 00:00:00", "IT_PROG", 32750, 0, 100, 60]
    }
  ]
}

For an UPDATE statement:

{
  "xid": 8362759,
  "timestamp": "2024-08-01 15:09:37.228446+05:30",
  "change": [
    {
      "kind": "update",
      "schema": "public",
      "table": "employees_synapse_test",
      "columnnames": ["EMPLOYEE_ID", "FIRST_NAME", "LAST_NAME", "EMAIL", "PHONE_NUMBER", "HIRE_DATE", "JOB_ID", "SALARY", "COMMISSION_PCT", "MANAGER_ID", "DEPARTMENT_ID"],
      "columntypes": ["numeric(10,0)", "text", "text", "text", "text", "timestamp without time zone", "text", "numeric(8,2)", "numeric(2,2)", "numeric(6,0)", "numeric(4,0)"],
      "columnvalues": [100, "Third1111", "BLOB", "SKING", "515.123.4567", "2024-08-01 00:00:00", "AD_PRES", 24000, null, null, 90],
      "oldkeys": {
        "keynames": ["EMPLOYEE_ID"],
        "keytypes": ["numeric(10,0)"],
        "keyvalues": [100]
      }
    }
  ]
}

For a DELETE statement:

{
  "xid": 8362756,
  "timestamp": "2024-08-01 15:09:29.552539+05:30",
  "change": [
    {
      "kind": "delete",
      "schema": "public",
      "table": "employees_synapse_test",
      "oldkeys": {
        "keynames": ["EMPLOYEE_ID"],
        "keytypes": ["numeric(10,0)"],
        "keyvalues": [327]
      }
    }
  ]
}
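To make this structure concrete, here is a minimal sketch (not part of the original article) that parses a wal2json payload with Python's standard json module and pulls out the fields described above. The sample payload is abbreviated from the INSERT example; real payloads come from pg_logical_slot_get_changes.

# Minimal sketch: parse a wal2json payload and extract its fields.
import json

payload = '''
{
  "xid": 8362757,
  "timestamp": "2024-08-01 15:09:34.086064+05:30",
  "change": [
    {
      "kind": "insert",
      "schema": "public",
      "table": "employees_synapse_test",
      "columnnames": ["EMPLOYEE_ID", "FIRST_NAME", "SALARY"],
      "columnvalues": [327, "3275FIRST NAME111", 32750]
    }
  ]
}
'''

doc = json.loads(payload)
print("transaction:", doc["xid"], "committed at", doc["timestamp"])

# A single transaction can contain several entries in the "change" array,
# one per row affected.
for change in doc["change"]:
    kind = change["kind"]
    print(kind, "on", f'{change["schema"]}.{change["table"]}')
    if kind in ("insert", "update"):
        row = dict(zip(change["columnnames"], change["columnvalues"]))
        print("new values:", row)
    if kind in ("update", "delete"):
        keys = change.get("oldkeys", {})
        print("old keys:", dict(zip(keys.get("keynames", []), keys.get("keyvalues", []))))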
Create a Lakehouse in Fabric OneLake. Detailed instructions can be found here. Then create a Delta table with an initial load of the data using Spark.

# PostgreSQL connection details
jdbc_url = "jdbc:postgresql://your_postgres_db.postgres.database.azure.com:5432/postgres"
jdbc_properties = {
    "user": "postgres",
    "password": "your_password",  # placeholder; supply the database password or another auth mechanism
    "driver": "org.postgresql.Driver"
}

# Read data from the PostgreSQL employees table
employee_df = spark.read.jdbc(url=jdbc_url, table="employees", properties=jdbc_properties)

# Define the path for the Delta table (abfss path to your storage)
delta_table_path = "abfss://your_container@your_storage_account.dfs.core.windows.net/delta/employees"

# Write the DataFrame to a Delta table
employee_df.write.format("delta").mode("overwrite").save(delta_table_path)

# Read it back to confirm the initial load
delta_df = spark.read.format("delta").load(delta_table_path)
delta_df.show()

Now, by running the following code continuously, the data in Delta Lake stays in sync with the underlying PostgreSQL database.

import json
import time

import pandas as pd
from delta.tables import DeltaTable

# PostgreSQL connection details
jdbc_url = "jdbc:postgresql://your_postgres_db.postgres.database.azure.com:5432/postgres"
jdbc_properties = {
    "user": "postgres",
    "password": "your_password",  # placeholder; supply the database password
    "driver": "org.postgresql.Driver"
}

# Delta table details
delta_table_path = "abfss://your_container@your_storage_account.dfs.core.windows.net/delta/employees"
delta_table = DeltaTable.forPath(spark, delta_table_path)

# Poll the replication slot continuously and apply each change to the Delta table
while True:
    cdc_df = spark.read.jdbc(
        url=jdbc_url,
        table="(SELECT data FROM pg_logical_slot_get_changes('cdc_slot', NULL, NULL, "
              "'include-xids', 'true', 'include-timestamp', 'true')) as cdc",
        properties=jdbc_properties
    )

    for row in cdc_df.collect():
        # Each payload is one transaction; its "change" array holds one entry per affected row
        for changedData in json.loads(row['data'])['change']:
            print(changedData)

            schema_name = changedData['schema']
            table_name = changedData['table']
            DMLtype = changedData['kind']

            if DMLtype in ("insert", "update"):
                column_names = changedData['columnnames']
                column_values = changedData['columnvalues']
                source_data = {col: [val] for col, val in zip(column_names, column_values)}
                change_df = spark.createDataFrame(pd.DataFrame(source_data))

            if DMLtype == "insert":
                # Append the new row to the Delta table
                change_df.write.format("delta").mode("append").save(delta_table_path)

            if DMLtype == "update":
                # Merge on the old key values so the matching row is updated in place
                old_keys = changedData['oldkeys']
                condition = " AND ".join(
                    [f"target.{key} = source.{key}" for key in old_keys['keynames']]
                )
                delta_table.alias("target").merge(
                    change_df.alias("source"),
                    condition
                ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

            if DMLtype == "delete":
                # Delete the row identified by the old key values
                condition = " AND ".join([
                    f"{key} = '{value}'"
                    for key, value in zip(changedData["oldkeys"]["keynames"],
                                          changedData["oldkeys"]["keyvalues"])
                ])
                delta_table.delete(condition)

    # Wait before polling for the next batch of changes
    time.sleep(10)
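One operational caveat: pg_logical_slot_get_changes consumes the changes it returns, so rows that were read but not successfully applied cannot be fetched from the slot again. While developing or debugging, pg_logical_slot_peek_changes returns the same output without advancing the slot. A minimal sketch (not part of the original walkthrough), reusing jdbc_url and jdbc_properties from the snippets above:

# Inspect pending changes without consuming them, reusing jdbc_url and
# jdbc_properties defined in the snippets above.
peek_df = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT data FROM pg_logical_slot_peek_changes('cdc_slot', NULL, NULL, "
          "'include-xids', 'true', 'include-timestamp', 'true')) as peek",
    properties=jdbc_properties
)
peek_df.show(truncate=False)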
Conclusion

In conclusion, building the Bronze layer of the medallion architecture using PostgreSQL's wal2json output as a source for Fabric OneLake provides a robust and scalable approach to raw data ingestion. This setup leverages PostgreSQL's logical replication capabilities to capture and stream changes in real time, keeping the data lake up to date with the latest transactional data. It ensures that the base layer is well structured and supports real-time analytics, advanced data processing, and AI applications, while providing a solid foundation for the Silver layer. By adopting this strategy, organizations can achieve greater data consistency, reduce data processing latency, and improve the overall efficiency of their data pipelines.

References

https://learn.microsoft.com/en-us/fabric/onelake/onelake-medallion-lakehouse-architecture
https://learn.microsoft.com/en-us/azure/databricks/lakehouse/medallion
https://blog.fabric.microsoft.com/en-us/blog/eventhouse-onelake-availability-is-now-generally-availa...
https://learn.microsoft.com/en-us/fabric/data-engineering/lakehouse-and-delta-tables

Feedback and suggestions

If you have any feedback or suggestions to improve this data migration asset, please email the Database Platform Engineering Team.