
Building Bronze Layer of Medallion Architecture in Fabric Lakehouse using WAL2JSON



Introduction

If you work in data engineering, you have probably come across the term “medallion architecture.” This design pattern organizes data within a lakehouse into separate layers to facilitate efficient processing and analytics (see the references at the end of this article). It is also the recommended design approach for Microsoft Fabric. To make a lakehouse usable, data must pass through several tiers: Bronze, Silver, and Gold. Each tier focuses on progressively improving data cleansing and quality. In this article, we will look specifically at how to build the Bronze tier using real-time data streaming from an existing PostgreSQL database. This approach enables real-time analytics and makes raw, unprocessed data available in near real time to support AI applications.

[Image: the Bronze, Silver, and Gold layers of the medallion architecture. Source: https://www.databricks.com/glossary/medallion-architecture]

What is a Bronze layer?

This layer is often called the raw zone, where data is stored in its original form and structure. By common definition, data in this layer is append-only and immutable, but this can be misleading. The intention is to preserve the original data as collected, not to guarantee that deletions or updates never happen. Instead, when deletions or updates occur, the original values are preserved as previous versions. This approach keeps earlier data accessible and unchanged. Delta Lake is commonly used to manage this data, as it supports versioning and maintains a full history of changes.
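
To make this concrete, here is a minimal PySpark sketch of Delta Lake's versioning, assuming a Delta table already exists at a placeholder path of the same style used later in this article; earlier versions stay queryable even after updates or deletes:

    from delta.tables import DeltaTable

    # Placeholder path - the same style of path is used in the steps below
    delta_table_path = "abfss://your_container@your_storage_account.dfs.core.windows.net/delta/employees"

    # Inspect the table's change history (one row per write, merge, delete, ...)
    DeltaTable.forPath(spark, delta_table_path).history().select("version", "timestamp", "operation").show()

    # Time travel: read the table as it looked at an earlier version
    original_df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
    original_df.show()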

PostgreSQL as a source for the Bronze Layer

Let’s say you have multiple PostgreSQL databases running different applications and you want to integrate that data into Delta Lake. There are a few options to achieve this. The first is to create a copy activity that extracts data from individual tables and stores it in a Delta table (a sketch of this approach follows below). However, this approach can be inefficient because it requires either a watermark column to track changes or a full reload of the data each time.
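
For illustration only, a minimal sketch of this copy-style approach might look like the following; the connection placeholders match those used later in this article, and the last_modified watermark column is a hypothetical assumption (each source table would need such a column):

    # Placeholder connection details (same style as used later in this article)
    jdbc_url = "jdbc:postgresql://your_postgres_db.postgres.database.azure.com:5432/postgres"
    jdbc_properties = {"user": "postgres", "password": "your_password", "driver": "org.postgresql.Driver"}
    delta_table_path = "abfss://your_container@your_storage_account.dfs.core.windows.net/delta/employees"

    # Hypothetical watermark: the highest last_modified value already loaded
    last_watermark = "2024-08-01 00:00:00"

    # Pull only rows changed since the last load (assumes the table has a last_modified column)
    incremental_query = f"(SELECT * FROM employees WHERE last_modified > '{last_watermark}') AS inc"
    incremental_df = spark.read.jdbc(url=jdbc_url, table=incremental_query, properties=jdbc_properties)

    # Append the new and changed rows to the Bronze Delta table
    incremental_df.write.format("delta").mode("append").save(delta_table_path)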

The second approach is to set up Change Data Capture in PostgreSQL to continuously capture and stream data changes. This approach allows for real-time data synchronization and efficient updates to OneLake. In this blog, we will look at a proof of concept for implementing this CDC-based approach.


How do you create a continuously replicated Bronze layer using PostgreSQL logical decoding, wal2json, and Fabric Delta Lake?

The idea is to capture and apply changes to Delta Lake using PostgreSQL logical replication, the wal2json plugin, and PySpark. In PostgreSQL, logical replication is a method for replicating data changes from one PostgreSQL instance to another instance or to an external system. wal2json is a logical decoding output plugin that converts Write-Ahead Log (WAL) changes into JSON format.

Setting up Azure Database for PostgreSQL

Log in to the Azure portal, navigate to “Server parameters” for your PostgreSQL server, and change the following parameters:

Parameter name            Value
wal_level                 logical
max_replication_slots     >0 (e.g., 4 or 8)
max_wal_senders           >0 (e.g., 4 or 8)
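
Once these parameters are saved (a server restart may be required for wal_level to take effect), you can optionally confirm them before continuing with the steps below. A minimal sketch using Spark over JDBC, with the same placeholder connection details introduced again in step 6:

    # Placeholder connection details (the same placeholders reappear in step 6 below)
    jdbc_url = "jdbc:postgresql://your_postgres_db.postgres.database.azure.com:5432/postgres"
    jdbc_properties = {"user": "postgres", "password": "your_password", "driver": "org.postgresql.Driver"}

    # Read the relevant settings back from pg_settings over JDBC
    settings_query = (
        "(SELECT name, setting FROM pg_settings "
        "WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders')) AS s"
    )
    spark.read.jdbc(url=jdbc_url, table=settings_query, properties=jdbc_properties).show()
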
  1. Create a publication. Publications are a logical replication feature that lets you define which table changes should be streamed to subscribers; here we publish all tables.
    CREATE PUBLICATION cdc_publication FOR ALL TABLES;
  2. Create a replication slot using wal2json as the plugin name. A slot represents a stream of changes that can be replayed to a client in the order they were made on the source server. Each slot streams a sequence of changes from a single database. Note: the wal2json plugin comes pre-installed on Azure Database for PostgreSQL.

    SELECT * FROM pg_create_logical_replication_slot('cdc_slot', 'wal2json');
  3. You can update some test data and verify that replication is working by running the following command. Note that pg_logical_slot_get_changes consumes the changes it returns; a non-consuming alternative is sketched after this list.
    SELECT * FROM pg_logical_slot_get_changes('cdc_slot', NULL, NULL, 'include-xids', 'true', 'include-timestamp', 'true');
  4. Now that we’ve tested replication, let’s look at the output format. Here are the main components of the wal2json output, followed by examples.
    Field           Description
    xid             Transaction ID.
    timestamp       The timestamp when the transaction was committed.
    kind            Operation type (insert, update, delete).
    schema          Schema of the table.
    table           The name of the table where the change occurred.
    columnnames     An array of column names affected by the change.
    columntypes     An array of column data types corresponding to the column names.
    columnvalues    An array of new values for the columns (present for insert and update operations).
    oldkeys         An object containing the primary or unique key values before the change (present for update and delete operations).
    
    For an INSERT statement:
    
    {
      "xid": 8362757,
      "timestamp": "2024-08-01 15:09:34.086064+05:30",
      "change": [
        {
          "kind": "insert",
          "schema": "public",
          "table": "employees_synapse_test",
          "columnnames": [
            "EMPLOYEE_ID",
            "FIRST_NAME",
            "LAST_NAME",
            "EMAIL",
            "PHONE_NUMBER",
            "HIRE_DATE",
            "JOB_ID",
            "SALARY",
            "COMMISSION_PCT",
            "MANAGER_ID",
            "DEPARTMENT_ID"
          ],
          "columntypes": [
            "numeric(10,0)",
            "text",
            "text",
            "text",
            "text",
            "timestamp without time zone",
            "text",
            "numeric(8,2)",
            "numeric(2,2)",
            "numeric(6,0)",
            "numeric(4,0)"
          ],
          "columnvalues": [
            327,
            "3275FIRST NAME111",
            "3275LAST NAME",
            "3275EMAIL3275EMAIL",
            "3275",
            "2024-07-31 00:00:00",
            "IT_PROG",
            32750,
            0,
            100,
            60
          ]
        }
      ]
    }
    
    For an UPDATE statement:
    
    {
      "xid": 8362759,
      "timestamp": "2024-08-01 15:09:37.228446+05:30",
      "change": [
        {
          "kind": "update",
          "schema": "public",
          "table": "employees_synapse_test",
          "columnnames": [
            "EMPLOYEE_ID",
            "FIRST_NAME",
            "LAST_NAME",
            "EMAIL",
            "PHONE_NUMBER",
            "HIRE_DATE",
            "JOB_ID",
            "SALARY",
            "COMMISSION_PCT",
            "MANAGER_ID",
            "DEPARTMENT_ID"
          ],
          "columntypes": [
            "numeric(10,0)",
            "text",
            "text",
            "text",
            "text",
            "timestamp without time zone",
            "text",
            "numeric(8,2)",
            "numeric(2,2)",
            "numeric(6,0)",
            "numeric(4,0)"
          ],
          "columnvalues": [
            100,
            "Third1111",
            "BLOB",
            "SKING",
            "515.123.4567",
            "2024-08-01 00:00:00",
            "AD_PRES",
            24000,
            null,
            null,
            90
          ],
          "oldkeys": {
            "keynames": [
              "EMPLOYEE_ID"
            ],
            "keytypes": [
              "numeric(10,0)"
            ],
            "keyvalues": [
              100
            ]
          }
        }
      ]
    }
    
    For a DELETE statement:
    
    {
      "xid": 8362756,
      "timestamp": "2024-08-01 15:09:29.552539+05:30",
      "change": [
        {
          "kind": "delete",
          "schema": "public",
          "table": "employees_synapse_test",
          "oldkeys": {
            "keynames": [
              "EMPLOYEE_ID"
            ],
            "keytypes": [
              "numeric(10,0)"
            ],
            "keyvalues": [
              327
            ]
          }
        }
      ]
    }
    
  5. Create a Lakehouse in Fabric OneLake. Detailed instructions can be found in the references at the end of this article.
  6. Create a Delta table with an initial load of data using Spark.

    # PostgreSQL connection details
    jdbc_url = "jdbc:postgresql://your_postgres_db.postgres.database.azure.com:5432/postgres"
    jdbc_properties = {
        "user": "postgres",
        "password": "your_password",  # placeholder - supply the credentials for your server
        "driver": "org.postgresql.Driver"
    }
    
    # Read data from PostgreSQL employees table
    employee_df = spark.read.jdbc(url=jdbc_url, table="employees", properties=jdbc_properties)
    
    # Define the path for the Delta table in ADLS
    delta_table_path = "abfss://your_container@your_storage_account.dfs.core.windows.net/delta/employees"
    
    # Write DataFrame to Delta table
    employee_df.write.format("delta").mode("overwrite").save(delta_table_path)
    
    delta_df = spark.read.format("delta").load(delta_table_path)
    delta_df.show()
    

  7. Now, by continuously running the following code, the data in Delta Lake will stay in sync with the underlying PostgreSQL database.
    import json
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import lit
    from delta.tables import DeltaTable
    import pandas as pd
    
    # PostgreSQL connection details
    jdbc_url = "jdbc:postgresql://your_postgres_db.postgres.database.azure.com:5432/postgres"
    jdbc_properties = {
        "user": "postgres",
        "password": "your_password",  # placeholder - supply the credentials for your server
        "driver": "org.postgresql.Driver"
    }
    
    #Delta table details
    delta_table_path = "abfss://your_container@your_storage_account.dfs.core.windows.net/delta/employees"
    delta_table = DeltaTable.forPath(spark, delta_table_path)
    delta_df = spark.read.format("delta").load(delta_table_path)
    schema = delta_df.schema
    
    # Poll the replication slot repeatedly; pg_logical_slot_get_changes consumes the changes it returns
    while True:
        cdc_df = spark.read.jdbc(url=jdbc_url, table="(SELECT data FROM pg_logical_slot_get_changes('cdc_slot', NULL, NULL, 'include-xids', 'true', 'include-timestamp', 'true')) as cdc", properties=jdbc_properties)
    
        cdc_array = cdc_df.collect()
    
        for i in cdc_array:
            print(i)
            # Note: only the first entry in each transaction's 'change' array is processed here
            changedData = json.loads(i['data'])['change'][0]
            print(changedData)
    
            source_schema = changedData['schema']  # avoid shadowing the Delta table schema read above
            table = changedData['table']
            DMLtype = changedData['kind']
            if DMLtype == "insert" or DMLtype == "update":
                column_names = changedData['columnnames']
                column_values = changedData['columnvalues']
                source_data = {col: [val] for col, val in zip(column_names, column_values)}
                print(source_data)
                change_df = spark.createDataFrame(pd.DataFrame(source_data))
    
            if DMLtype == "insert":
                change_df.write.format("delta").mode("append").save(delta_table_path)
    
            if DMLtype == "update":
                old_keys = changedData['oldkeys']
    
                condition = " AND ".join(
                [f"target.{key} = source.{key}" for key in old_keys['keynames']]
                )
                print(condition)
    
                delta_table.alias("target").merge(
                            change_df.alias("source"),
                            condition
                        ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
    
            if DMLtype == "delete":
                condition = " AND ".join([
                            f"{key} = '{value}'"
                            for key, value in zip(changedData["oldkeys"]["keynames"], changedData["oldkeys"]["keyvalues"])
                        ])
                delta_table.delete(condition)
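
One caveat about testing (step 3 above): pg_logical_slot_get_changes consumes the changes it returns, so rows read that way are not delivered again to the sync loop. If you only want to inspect what is pending in the slot, a minimal sketch using the non-consuming pg_logical_slot_peek_changes function, reusing the jdbc_url and jdbc_properties placeholders from step 7, would be:

    # Peek at pending changes without consuming them from the replication slot
    peek_query = (
        "(SELECT data FROM pg_logical_slot_peek_changes('cdc_slot', NULL, NULL, "
        "'include-xids', 'true', 'include-timestamp', 'true')) AS peek"
    )
    pending_df = spark.read.jdbc(url=jdbc_url, table=peek_query, properties=jdbc_properties)
    pending_df.show(truncate=False)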

Conclusion

In conclusion, building the Bronze tier of the Medallion Architecture using PostgreSQL’s wal2json as a source for Fabric OneLake provides a robust and scalable approach to handling raw data ingestion. This setup leverages PostgreSQL’s logical replication capabilities to capture and stream changes in real time, ensuring that the data lake is up to date with the latest transactional data.

Implementing this architecture ensures that the Bronze layer is well structured and supports real-time analytics, advanced data processing, and AI applications, while also providing a solid foundation for the layers above it.

By adopting this strategy, organizations can achieve greater data consistency, reduce data processing latency, and improve the overall efficiency of their data pipeline.

References
https://learn.microsoft.com/en-us/fabric/onelake/onelake-medallion-lakehouse-architecture
https://learn.microsoft.com/en-us/azure/databricks/lakehouse/medallion
https://blog.fabric.microsoft.com/en-us/blog/eventhouse-onelake-availability-is-now-generally-availa...
https://learn.microsoft.com/en-us/fabric/data-engineering/lakehouse-and-delta-tables

Feedback and Suggestions

If you have any feedback or suggestions to improve this data migration asset, please email the Database Platform Engineering Team.




