Customer Master Data Platform with Data Quality & Deduplication (Microsoft Fabric) — End-to-End
This post documents a complete, production-style build of a Customer Master Data Platform in Microsoft Fabric: ingestion from the RandomUser.me API with Fabric Pipelines (Copy Data), transformations in Fabric Notebooks (PySpark), and a Gold dimensional model with SCD Type 2 loaded into a Fabric Warehouse from a T-SQL notebook.
What you will build
A reusable platform that ingests raw user profiles, standardises and validates them, detects duplicates, applies survivorship rules, and publishes a trusted dimensional model for analytics and Power BI.
| Layer | Storage | Content |
|---|---|---|
| Bronze | Lakehouse (Files + Delta) | Raw JSON from API (no schema enforcement) |
| Silver | Lakehouse (Delta tables) | Flattened, cleaned, standardised, DQ checks, dedup candidates |
| Gold | Warehouse | Dimensional model with SCD Type 2 + analytics facts |
Table of contents
- Repository & project structure
- Fabric workspace, Lakehouse, Warehouse setup
- API ingestion with Fabric Pipelines (Copy Data)
- Bronze: raw storage layout & naming
- Silver: PySpark flattening + DQ rules
- Duplicate detection & survivorship
- Gold: dimensional modelling + SCD Type 2 (T-SQL)
- Incremental loads & watermarking
- Power BI semantic model & visuals
- Operational best practices (what to do / not do)
1) Version Control (GitHub) — reproducible, reviewable engineering
I keep all artefacts in a single Git repository to make the build reproducible and easy to review: https://github.com/the-mama/randomuser_project_ms_fabric. This repo includes pipeline configuration notes, notebook sources, SQL scripts, and Power BI model conventions.
mkdir randomuser_project_ms_fabric
cd randomuser_project_ms_fabric
git init
git checkout -b main
# recommended structure
mkdir -p docs notebooks/pyspark notebooks/tsql pipelines sql models powerbi samples
git add .
git commit -m "Initial project structure"
| Folder | Purpose |
|---|---|
| pipelines/ | Pipeline definitions, parameters, environment notes |
| notebooks/pyspark/ | Bronze→Silver transformations, DQ, dedup logic |
| notebooks/tsql/ | SCD Type 2 merges into Warehouse |
| sql/ | DDL, views, stored procedures (if used) |
| powerbi/ | Semantic model notes + measures |
| samples/ | Sample API payloads for documentation |
- Do version control notebooks and SQL scripts (auditable changes).
- Do keep environment-specific config in parameters (not hard-coded).
- Do not commit secrets, keys, or tokens. Use Fabric connections and workspace security.
2) Microsoft Fabric Setup — Workspace, Lakehouse, Warehouse
In Fabric, I create one workspace, then two core items: Lakehouse for Bronze/Silver and Warehouse for Gold dimensional modelling.
| Item | Name (example) | Notes |
|---|---|---|
| Workspace | ws_randomuser_masterdata | Dev first; later add Test/Prod if needed |
| Lakehouse | lh_randomuser | Bronze raw files + Silver delta tables |
| Warehouse | wh_masterdata | Gold dims/facts + semantic layer |
- Use consistent naming: lh_ for Lakehouse, wh_ for Warehouse, nb_ for notebooks, pl_ for pipelines.
- Keep Bronze and Silver in the Lakehouse; publish curated Gold to the Warehouse for BI performance and governance.
3) Ingestion with Fabric Pipelines — Copy Data from RandomUser.me API
The ingestion is implemented with a Microsoft Fabric Data Pipeline and a Copy Data activity. The API does not require authentication, so the connection setup is straightforward.
| Setting | Value |
|---|---|
| Connection (Base URL) | https://randomuser.me/api/ |
| Relative URL (example) | ?results=100&page=@{pipeline().parameters.page}&seed=fabric_masterdata&inc=gender,name,location,email,dob,registered,phone,cell,id,nat&format=json |
| Pagination approach | Loop pages 1..N using a pipeline ForEach |
| Sink | Lakehouse Files (Bronze) as JSON |
| File path | Files/bronze/randomuser/load_date=YYYY-MM-DD/page=0001/ |
| File name | randomuser_YYYYMMDD_page0001.json |
{
"load_date": "2026-01-24",
"results_per_page": 100,
"max_pages": 20,
"seed": "fabric_masterdata"
}
1) Set variable: pages = range(1, max_pages+1)
2) ForEach page in pages:
* Copy Data activity
Source:
Base URL: https://randomuser.me/api/
Relative URL:
?results=@{pipeline().parameters.results_per_page}
&page=@{item()}
&seed=@{pipeline().parameters.seed}
&inc=gender,name,location,email,dob,registered,phone,cell,id,nat
&format=json
Sink:
Lakehouse Files:
Files/bronze/randomuser/load_date=@{pipeline().parameters.load_date}/page=@{formatNumber(item(),'0000')}/
- Do use a stable seed for repeatability in dev/test.
- Do store each page as a separate file for audit and replay.
- Do not flatten JSON in the pipeline; keep ingestion minimal (raw-first principle).
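Before wiring the expressions into the Copy Data activity, it can help to smoke-test the query parameters outside Fabric. The sketch below is not part of the pipeline itself; it simply replays the same paginated calls locally in Python (the requests dependency and the local output paths are assumptions for illustration only).

import json
import pathlib
import requests

BASE_URL = "https://randomuser.me/api/"
load_date = "2026-01-24"
results_per_page = 100
max_pages = 20
seed = "fabric_masterdata"

for page in range(1, max_pages + 1):
    # Same query parameters the Copy Data activity builds via pipeline expressions
    params = {
        "results": results_per_page,
        "page": page,
        "seed": seed,
        "inc": "gender,name,location,email,dob,registered,phone,cell,id,nat",
        "format": "json",
    }
    resp = requests.get(BASE_URL, params=params, timeout=30)
    resp.raise_for_status()
    # Mirror the Bronze layout: one file per page under load_date/page folders
    out_dir = pathlib.Path(f"bronze/randomuser/load_date={load_date}/page={page:04d}")
    out_dir.mkdir(parents=True, exist_ok=True)
    out_file = out_dir / f"randomuser_{load_date.replace('-', '')}_page{page:04d}.json"
    out_file.write_text(json.dumps(resp.json()), encoding="utf-8")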
4) Bronze Layer — Raw JSON (no schema enforcement)
Bronze is intentionally “as-is”. I store the exact payload from RandomUser.me along with operational metadata (load date/time, pipeline run id, page number).
{
"results": [
{
"gender": "male",
"name": { "title": "Mr", "first": "Aiden", "last": "Smith" },
"location": {
"street": { "number": 221, "name": "Baker Street" },
"city": "London",
"state": "England",
"country": "United Kingdom",
"postcode": "NW1 6XE"
},
"email": "[aiden.smith@example.com](mailto:aiden.smith@example.com)",
"dob": { "date": "1988-02-12T10:15:30.000Z", "age": 37 },
"registered": { "date": "2014-06-01T09:10:20.000Z", "age": 11 },
"phone": "020 7946 0011",
"cell": "07911 123456",
"id": { "name": "NINO", "value": "QQ123456C" },
"nat": "GB"
}
],
"info": { "seed": "fabric_masterdata", "results": 1, "page": 1, "version": "1.4" }
}
- Partition by load_date and page.
- Add ingestion metadata columns later in Silver (do not mutate Bronze); a sketch follows below.
- Keep immutable files for traceability and reprocessing.
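As a sketch of the "metadata later in Silver" point: the load date and page can be recovered from the Bronze file path at read time instead of being written into the raw files. The regex patterns below assume the folder layout shown above.

from pyspark.sql.functions import input_file_name, regexp_extract, current_timestamp

bronze_all = "Files/bronze/randomuser/load_date=*/page=*/*.json"
bronze_raw = spark.read.option("multiLine", True).json(bronze_all)

bronze_with_meta = (bronze_raw
    .withColumn("_source_file", input_file_name())
    .withColumn("_load_date", regexp_extract("_source_file", r"load_date=(\d{4}-\d{2}-\d{2})", 1))
    .withColumn("_page", regexp_extract("_source_file", r"page=(\d{4})", 1).cast("int"))
    .withColumn("_ingested_at", current_timestamp())
)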
5) Silver Layer — PySpark flattening, standardisation, data quality
Silver is where I convert semi-structured JSON into structured Delta tables, standardise formats, and compute DQ metrics. This is implemented in a Fabric Notebook (PySpark).
from pyspark.sql.functions import col, explode, lit, to_timestamp, sha2, concat_ws, upper, trim, regexp_replace
load_date = "2026-01-24" # in practice: pass as notebook parameter
bronze_path = f"Files/bronze/randomuser/load_date={load_date}/*/*.json"
raw = spark.read.option("multiLine", True).json(bronze_path)
df = raw.select(explode(col("results")).alias("r"), col("info").alias("info"))
silver = df.select(
lit(load_date).alias("load_date"),
col("info.seed").alias("api_seed"),
col("info.page").alias("api_page"),
col("r.gender").alias("gender"),
col("r.name.title").alias("name_title"),
col("r.name.first").alias("first_name"),
col("r.name.last").alias("last_name"),
col("r.email").alias("email"),
to_timestamp(col("r.dob.date")).alias("dob_utc"),
col("r.dob.age").alias("age"),
to_timestamp(col("r.registered.date")).alias("registered_utc"),
col("r.phone").alias("phone"),
col("r.cell").alias("cell"),
col("r.nat").alias("nationality"),
col("r.location.street.number").alias("street_no"),
col("r.location.street.name").alias("street_name"),
col("r.location.city").alias("city"),
col("r.location.state").alias("state"),
col("r.location.country").alias("country"),
col("r.location.postcode").cast("string").alias("postcode")
)
# standardise strings
silver = (silver
.withColumn("email_std", upper(trim(col("email"))))
.withColumn("postcode_std", upper(regexp_replace(trim(col("postcode")), r"\s+", "")))
)
# create business hash keys (stable)
silver = silver.withColumn(
"person_nk",
sha2(concat_ws("||", col("email_std"), col("dob_utc").cast("string"), col("postcode_std")), 256)
)
from pyspark.sql.functions import count, sum as _sum, when
dq = silver.agg(
count("*").alias("row_count"),
(_sum(when(col("email").isNotNull(), 1).otherwise(0)) / count("*") * 100).alias("email_completeness_pct"),
(_sum(when(col("dob_utc").isNotNull(), 1).otherwise(0)) / count("*") * 100).alias("dob_completeness_pct"),
(_sum(when(col("postcode").isNotNull(), 1).otherwise(0)) / count("*") * 100).alias("postcode_completeness_pct")
)
# uniqueness of natural key candidate
unique_nk = (silver.groupBy("person_nk").count()
.agg((_sum(when(col("count")==1,1).otherwise(0))/count("*")*100).alias("person_nk_uniqueness_pct")))
display(dq)
display(unique_nk)
silver.write.mode("append").format("delta").saveAsTable("lh_randomuser.silver_person_staging")
- Do standardise key fields before hashing (email/postcode).
- Do store both raw and standardised versions (audit + matching).
- Do not rely on API “id.value” as a primary key (it can be null or inconsistent).
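The DQ metrics above can also act as a gate. A minimal sketch, assuming the dq DataFrame from this section and an illustrative 95% threshold: if completeness drops below the threshold, the notebook raises and the pipeline run fails instead of publishing weak data.

# Collect the single aggregate row computed above and compare against thresholds
dq_row = dq.collect()[0]
thresholds = {
    "email_completeness_pct": 95.0,      # illustrative thresholds, tune per source
    "dob_completeness_pct": 95.0,
    "postcode_completeness_pct": 95.0,
}
failures = []
for metric, minimum in thresholds.items():
    value = dq_row[metric]
    if value is None or value < minimum:
        failures.append(f"{metric}={value} (minimum {minimum}%)")
if failures:
    raise ValueError("Data quality gate failed: " + "; ".join(failures))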
6) Duplicate detection & survivorship rules
The core matching rule is: Email + DOB + Postcode similarity. For this synthetic dataset, I use a deterministic approach:
standardise these fields, then generate a person_nk hash key. Any duplicates share the same person_nk.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc
w = Window.partitionBy("person_nk").orderBy(desc("registered_utc"), desc("load_date"))
survivors = (silver
.withColumn("rn", row_number().over(w))
.filter(col("rn")==1)
.drop("rn")
)
survivors.write.mode("append").format("delta").saveAsTable("lh_randomuser.silver_person_survivor")
- Rule used here: the latest registered timestamp wins. In real MDM, you may use source priority + confidence scoring.
- Keep a separate duplicates table for traceability ("survivor_id ↔ duplicate_ids"); a sketch follows below.
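For that traceability table, a sketch that links every non-surviving row to its survivor via person_nk; the output table name silver_person_dedup_map and the selected columns are assumptions for illustration.

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc, col

w = Window.partitionBy("person_nk").orderBy(desc("registered_utc"), desc("load_date"))
ranked = silver.withColumn("rn", row_number().over(w))

# rn == 1 is the survivor; everything else is a duplicate of that survivor
survivor_side = (ranked.filter(col("rn") == 1)
    .select("person_nk",
            col("email").alias("survivor_email"),
            col("registered_utc").alias("survivor_registered_utc")))

duplicate_side = (ranked.filter(col("rn") > 1)
    .select("person_nk",
            col("email").alias("duplicate_email"),
            col("registered_utc").alias("duplicate_registered_utc")))

dedup_map = duplicate_side.join(survivor_side, on="person_nk", how="inner")
dedup_map.write.mode("append").format("delta").saveAsTable("lh_randomuser.silver_person_dedup_map")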
7) Gold Layer — Dimensional model + SCD Type 2 (T-SQL Notebook)
Gold lives in the Fabric Warehouse for strong BI performance and governance. I publish: dim_person, dim_address (both SCD2), and fact_person_snapshot.
| Table | Type | Grain |
|---|---|---|
| dim_person | SCD Type 2 | One row per person version |
| dim_address | SCD Type 2 | One row per address version |
| fact_person_snapshot | Fact | One row per load_date per person |
-- dim_person (SCD2)
IF OBJECT_ID('dbo.dim_person', 'U') IS NULL
CREATE TABLE dbo.dim_person
(
person_sk BIGINT IDENTITY(1,1) NOT NULL,
person_nk VARCHAR(64) NOT NULL, -- sha256 hex
email_std VARCHAR(320) NULL,
first_name VARCHAR(100) NULL,
last_name VARCHAR(100) NULL,
gender VARCHAR(20) NULL,
dob_date DATE NULL,
nationality VARCHAR(10) NULL,
-- SCD2
effective_from DATETIME2 NOT NULL,
effective_to DATETIME2 NOT NULL,
is_current BIT NOT NULL,
record_hash VARCHAR(64) NOT NULL, -- hash of tracked attributes
CONSTRAINT PK_dim_person PRIMARY KEY (person_sk)
);
CREATE INDEX IX_dim_person_nk_current ON dbo.dim_person(person_nk, is_current);
-- Source: survivors from Lakehouse (exposed to Warehouse via shortcuts or a staging table; see the sketch at the end of this section)
-- Assume staging table exists in Warehouse: dbo.stg_person_survivor
DECLARE @asOf DATETIME2 = SYSUTCDATETIME();
-- 1) Expire changed current rows
UPDATE tgt
SET tgt.effective_to = DATEADD(SECOND, -1, @asOf),
tgt.is_current = 0
FROM dbo.dim_person tgt
JOIN dbo.stg_person_survivor src
ON tgt.person_nk = src.person_nk
WHERE tgt.is_current = 1
AND tgt.record_hash <> src.record_hash;
-- 2) Insert new rows for:
-- a) new natural keys
-- b) changed rows (after expiring old version)
INSERT INTO dbo.dim_person
(
person_nk, email_std, first_name, last_name, gender, dob_date, nationality,
effective_from, effective_to, is_current, record_hash
)
SELECT
src.person_nk,
src.email_std,
src.first_name,
src.last_name,
src.gender,
CAST(src.dob_utc AS DATE) AS dob_date,
src.nationality,
@asOf AS effective_from,
'9999-12-31' AS effective_to,
1 AS is_current,
src.record_hash
FROM dbo.stg_person_survivor src
LEFT JOIN dbo.dim_person tgt
ON tgt.person_nk = src.person_nk
AND tgt.is_current = 1
WHERE tgt.person_nk IS NULL
OR tgt.record_hash <> src.record_hash;
- Do use record_hash to detect attribute changes efficiently.
- Do keep SCD2 dates in UTC and use a consistent "infinite" end date (9999-12-31).
- Do not MERGE with complex "WHEN MATCHED THEN UPDATE + INSERT" if your platform has edge-case behaviour; explicit UPDATE then INSERT is safer.
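One option (of several) for populating dbo.stg_person_survivor: write a narrow Delta table from the Lakehouse survivors and expose it to the Warehouse through a shortcut or a cross-database query. The table name lh_randomuser.stg_person_survivor below is an assumption, not a fixed part of the build; record_hash is computed the same way as in the incremental-load section that follows.

from pyspark.sql.functions import sha2, concat_ws, col

survivors = spark.table("lh_randomuser.silver_person_survivor")

tracked_cols = ["email_std", "first_name", "last_name", "gender", "dob_utc",
                "nationality", "postcode_std", "city", "state", "country",
                "street_no", "street_name"]

stg = (survivors
    .withColumn("record_hash",
                sha2(concat_ws("||", *[col(c).cast("string") for c in tracked_cols]), 256))
    .select("person_nk", "email_std", "first_name", "last_name",
            "gender", "dob_utc", "nationality", "record_hash"))

# Overwrite so re-running the load for the same day rebuilds the staging snapshot
stg.write.mode("overwrite").format("delta").saveAsTable("lh_randomuser.stg_person_survivor")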
8) Incremental Loads — watermarking and idempotency
RandomUser.me does not provide true CDC, so the incremental design here is simply daily ingestion + dedup + SCD2. The incremental behaviour comes from:
(1) tracking load_date as a partition,
(2) using deterministic natural keys,
(3) only applying changes when record_hash differs.
from pyspark.sql.functions import sha2, concat_ws, col
tracked_cols = ["email_std","first_name","last_name","gender","dob_utc","nationality","postcode_std","city","state","country","street_no","street_name"]
silver = silver.withColumn("record_hash", sha2(concat_ws("||", *[col(c).cast("string") for c in tracked_cols]), 256))
- Make your pipeline idempotent: re-running a day should not duplicate Gold rows (see the sketch below).
- Use dedup + record_hash so only real changes create new SCD versions.
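A minimal idempotency sketch for the Silver write, assuming the table and variable names used above: delete any rows already loaded for the current load_date before appending, so a rerun of the same day cannot introduce duplicates that would ripple into Gold.

from delta.tables import DeltaTable

staging_table = "lh_randomuser.silver_person_staging"

if spark.catalog.tableExists(staging_table):
    # Remove any rows previously written for this load_date, then append fresh ones
    DeltaTable.forName(spark, staging_table).delete(f"load_date = '{load_date}'")

silver.write.mode("append").format("delta").saveAsTable(staging_table)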
9) Masking sensitive fields (safe-by-default)
Even though RandomUser data is synthetic, I still implement masking patterns as if this were a real customer platform. In Gold, expose masked values in reporting views, and restrict raw fields to privileged roles.
CREATE OR ALTER VIEW dbo.vw_dim_person_masked AS
SELECT
person_sk,
person_nk,
CASE
WHEN email_std IS NULL THEN NULL
ELSE CONCAT(LEFT(email_std, 2), '***', RIGHT(email_std, 6))
END AS email_masked,
first_name,
last_name,
gender,
dob_date,
nationality,
is_current
FROM dbo.dim_person;
- Do apply least privilege: analysts consume masked views by default.
- Do separate raw vs curated access paths.
- Do not publish raw email/phone to Power BI unless explicitly required and approved.
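The same masking rule can also be expressed in PySpark if masked copies are wanted in the Lakehouse as well; this is optional and simply mirrors the CASE expression in the view above (first two characters, a literal ***, last six characters).

from pyspark.sql.functions import col, concat, lit, substring, when

masked = silver.withColumn(
    "email_masked",
    when(col("email_std").isNull(), lit(None))
    .otherwise(concat(substring(col("email_std"), 1, 2),
                      lit("***"),
                      substring(col("email_std"), -6, 6)))
)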
10) Power BI — semantic model & visuals
With Gold tables in the Warehouse, I build a semantic model and a small set of operational + business dashboards. This is the part stakeholders typically see first, so I include both quality and business outcomes.
| Visual | What it proves | Data |
|---|---|---|
| DQ Scorecard | Completeness/uniqueness trends over time | Silver DQ metrics table |
| Duplicates Trend | How many duplicates detected per load | Dedup groups |
| Current Customer Count | Mastered customers (current versions) | dim_person where is_current=1 |
| Change Volume | SCD2 new versions per day | dim_person effective_from |
| Geo Distribution | Customer distribution by country/state/city | dim_address |
- Prefer a star schema: dims relate to facts via surrogate keys.
- Use incremental refresh only if needed; otherwise keep semantic model simple.
- Always surface data quality metrics alongside business metrics.