PySpark ETL Project on Azure: Batch and Streaming Guide

This page compiles a complete guide to building a PySpark ETL project with an Azure free account. It covers infrastructure setup, sample data, a Python script for batch processing, and an explanation of batch vs. streaming in industry use cases. All scripts are included inline so the guide is self-contained.

Azure Infrastructure Setup for Medallion Architecture (Bronze, Silver, Gold)

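In data warehousing and big data pipelines, the medallion architecture is commonly used to organize data lakes into layers:

Bronze: raw data, ingested as-is from the sources.
Silver: cleaned and validated data (schema enforced, missing values handled, invalid records removed).
Gold: aggregated, business-ready data for analytics and reporting.

To support this for large datasets, this guide uses:

Azure Data Lake Storage Gen2 (a storage account with hierarchical namespace), with one folder per layer.
Azure Databricks, which runs PySpark on a cluster to move data from bronze to silver to gold.

Everything is created with an Azure free account. Nothing runs locally: the scripts run in Databricks notebooks or jobs.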

Step 1: Create Supporting Azure Infrastructure

Use the Azure Portal or the Azure CLI (install it if needed). Run the commands below in Azure Cloud Shell or a local terminal after signing in with az login.

# Variables (customize)
RESOURCE_GROUP="data-lake-demo-rg"
LOCATION="eastus"  # Your region
STORAGE_ACCOUNT="datalakedemo$(date +%s)"  # Unique
FILE_SYSTEM="data-lake"  # Root container for Data Lake

# Create Resource Group
az group create --name $RESOURCE_GROUP --location $LOCATION

# Create Storage Account with Data Lake Gen2
az storage account create --name $STORAGE_ACCOUNT --resource-group $RESOURCE_GROUP --location $LOCATION --sku Standard_LRS --kind StorageV2 --enable-hierarchical-namespace true

# Get Account Key
ACCOUNT_KEY=$(az storage account keys list --resource-group $RESOURCE_GROUP --account-name $STORAGE_ACCOUNT --query "[0].value" -o tsv)

# Create File System (root container) and folders for layers
az storage fs create --name $FILE_SYSTEM --account-name $STORAGE_ACCOUNT --account-key $ACCOUNT_KEY
az storage fs directory create --name "bronze" -f $FILE_SYSTEM --account-name $STORAGE_ACCOUNT --account-key $ACCOUNT_KEY
az storage fs directory create --name "silver" -f $FILE_SYSTEM --account-name $STORAGE_ACCOUNT --account-key $ACCOUNT_KEY
az storage fs directory create --name "gold" -f $FILE_SYSTEM --account-name $STORAGE_ACCOUNT --account-key $ACCOUNT_KEY
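The STORAGE_ACCOUNT variable appends the Unix timestamp from date +%s so the name is likely to be unique. Azure storage account names must be 3 to 24 characters long and contain only lowercase letters and digits, so the prefix can be at most 14 characters while the timestamp has 10 digits. The sketch below mirrors that naming scheme in Python and checks the character and length rules; the function names are hypothetical helpers, not part of any Azure SDK, and they cannot check global uniqueness.

```python
import re
import time
from typing import Optional

# Azure storage account naming rule: 3-24 characters, lowercase letters and digits only
_VALID_NAME = re.compile(r"[a-z0-9]{3,24}")


def make_account_name(prefix: str = "datalakedemo", now: Optional[float] = None) -> str:
    """Mimic the shell variable: prefix followed by Unix epoch seconds (date +%s)."""
    seconds = int(time.time() if now is None else now)
    return f"{prefix}{seconds}"


def is_valid_account_name(name: str) -> bool:
    """Check only length and allowed characters; uniqueness needs a lookup in Azure."""
    return _VALID_NAME.fullmatch(name) is not None


name = make_account_name()
print(name, is_valid_account_name(name))
```

To check whether a valid name is still available, az storage account check-name --name <name> queries Azure directly.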

Azure Databricks Setup: Create a Databricks workspace in the Azure Portal, or with the CLI (the databricks command group is provided by an Azure CLI extension):

az extension add --name databricks
az databricks workspace create --name "etl-databricks" --resource-group $RESOURCE_GROUP --location $LOCATION --sku standard

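In the Databricks UI, create a cluster: a single-node cluster is enough for this demo, and a multi-node cluster with autoscaling suits large data. Access the Data Lake either through a DBFS mount or directly with abfss:// paths (the pipeline script uses direct paths). Store the storage account key ($ACCOUNT_KEY) in a Databricks secret scope so the script can read it with dbutils.secrets.get().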
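Direct access uses ADLS Gen2 URIs of the form abfss://<file-system>@<account>.dfs.core.windows.net/<path>. The Spark setting that authenticates them with an account key is fs.azure.account.key.<account>.dfs.core.windows.net. The sketch below builds both strings for the bronze, silver and gold folders. The helper names are hypothetical, and "datalakedemo123" is only the placeholder account name used later in the script.

```python
LAYERS = ("bronze", "silver", "gold")


def abfss_uri(account: str, file_system: str, *parts: str) -> str:
    """Build an ADLS Gen2 URI: abfss://<file-system>@<account>.dfs.core.windows.net/<path>."""
    path = "/".join(p.strip("/") for p in parts if p)
    return f"abfss://{file_system}@{account}.dfs.core.windows.net/{path}"


def layer_roots(account: str, file_system: str) -> dict:
    """Root folder URI for each medallion layer."""
    return {layer: abfss_uri(account, file_system, layer) for layer in LAYERS}


def account_key_conf(account: str) -> str:
    """Spark config key that authenticates abfss:// access with a storage account key."""
    return f"fs.azure.account.key.{account}.dfs.core.windows.net"


roots = layer_roots("datalakedemo123", "data-lake")
print(roots["bronze"])
print(abfss_uri("datalakedemo123", "data-lake", "bronze", "sales_data.csv"))
print(account_key_conf("datalakedemo123"))
```

In a notebook, the key setting would be applied with spark.conf.set(account_key_conf(account), key), where key comes from the secret scope.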

Step 2: Create Small Sample Datasets

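For the demo, use two small CSV files. They share customer_id, which the pipeline uses for the join. A few values are deliberately left empty to exercise missing-value handling: order 4 has no price, order 6 has no quantity, and customer 104 has no loyalty_score. Keep comments out of the files, because a trailing "# ..." note would be read as part of the last field.

sales_data.csv:

order_id,product,quantity,price,customer_id,date
1,Laptop,2,1000.50,101,2023-01-15
2,Phone,1,500.00,102,2023-01-16
3,Laptop,1,1000.50,101,2023-02-20
4,Tablet,3,,103,2023-02-21
5,Phone,2,500.00,102,2023-03-10
6,Tablet,,300.00,104,2023-03-11

customers_data.csv:

customer_id,name,region,loyalty_score
101,Alice,East,85
102,Bob,West,90
103,Charlie,North,75
104,David,South,

Save both files locally under these names. The local copies are only needed for the upload in Step 3.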
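If you prefer to generate the files rather than save them by hand, here is a minimal sketch using Python's standard csv module. It writes both samples with the missing values as empty fields and uses plain "\n" line endings. The write_samples function and its directory argument are hypothetical conveniences.

```python
from __future__ import annotations

import csv
from pathlib import Path

SALES = [
    ["order_id", "product", "quantity", "price", "customer_id", "date"],
    ["1", "Laptop", "2", "1000.50", "101", "2023-01-15"],
    ["2", "Phone", "1", "500.00", "102", "2023-01-16"],
    ["3", "Laptop", "1", "1000.50", "101", "2023-02-20"],
    ["4", "Tablet", "3", "", "103", "2023-02-21"],     # missing price
    ["5", "Phone", "2", "500.00", "102", "2023-03-10"],
    ["6", "Tablet", "", "300.00", "104", "2023-03-11"],  # missing quantity
]
CUSTOMERS = [
    ["customer_id", "name", "region", "loyalty_score"],
    ["101", "Alice", "East", "85"],
    ["102", "Bob", "West", "90"],
    ["103", "Charlie", "North", "75"],
    ["104", "David", "South", ""],  # missing loyalty_score
]


def write_samples(directory: str = ".") -> list[Path]:
    """Write sales_data.csv and customers_data.csv into directory and return their paths."""
    out = Path(directory)
    out.mkdir(parents=True, exist_ok=True)
    paths = []
    for name, rows in (("sales_data.csv", SALES), ("customers_data.csv", CUSTOMERS)):
        path = out / name
        with path.open("w", newline="") as f:
            csv.writer(f, lineterminator="\n").writerows(rows)
        paths.append(path)
    return paths


print(write_samples("."))
```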

Step 3: Upload Datasets to Bronze Layer

Upload the files to the bronze folder of the Data Lake with the Azure CLI or the Portal (Storage Explorer). There is no local processing: the raw files land as-is.

# Upload sales_data.csv to bronze
az storage fs file upload --account-name $STORAGE_ACCOUNT --account-key $ACCOUNT_KEY --file-system $FILE_SYSTEM --path "bronze/sales_data.csv" --source sales_data.csv

# Upload customers_data.csv to bronze
az storage fs file upload --account-name $STORAGE_ACCOUNT --account-key $ACCOUNT_KEY --file-system $FILE_SYSTEM --path "bronze/customers_data.csv" --source customers_data.csv

When/Where: Do this after creating the infrastructure and before running the pipeline script. The bronze/ folder is the raw landing zone. In production pipelines, ingestion is automated, for example with Azure Data Factory copying data from APIs or FTP servers.

Step 4: PySpark ETL Script Template for Batch Data Pipeline

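This script uses the PySpark DataFrame API, which runs on the same Spark SQL engine and optimizer as SQL queries, to process large datasets. Run it in Azure Databricks as a notebook or a job. It is a template: the storage names and paths are variables at the top so the script can be reused.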

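Pipeline Features:

Schema enforcement: explicit StructType schemas when reading the bronze CSVs.
Cleaning: missing values filled with defaults; invalid rows (zero quantity or price, missing customer name) filtered out.
Transformation: left join of sales and customers, revenue calculation, aggregation by product and region, conditional categories with when().
Load: partitioned Parquet writes to silver and gold.
Error handling: try/except around ingestion and the gold step.

For large datasets: Spark distributes processing across cluster nodes. partitionBy writes one folder per partition value, so later queries that filter on that column can skip the other folders (partition pruning). Parquet's columnar compression reduces storage and I/O.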
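Here is a plain-Python model of partition pruning. partitionBy("date") stores rows in Hive-style folders named date=<value>, and the partition column lives in the folder name rather than inside the data files. A filter on that column therefore only needs to open the matching folders. This sketch only models the folder layout and the folder selection; it is not Spark's implementation, and the row dicts are a hypothetical stand-in for the cleaned sales data.

```python
from collections import defaultdict
from typing import Callable, Dict, List


def partition_layout(rows: List[dict], partition_col: str) -> Dict[str, List[dict]]:
    """Group rows into Hive-style folders named '<col>=<value>', as partitionBy does.

    The partition column moves into the folder name and is dropped from each row."""
    folders: Dict[str, List[dict]] = defaultdict(list)
    for row in rows:
        rest = {k: v for k, v in row.items() if k != partition_col}
        folders[f"{partition_col}={row[partition_col]}"].append(rest)
    return dict(folders)


def prune(folders: Dict[str, List[dict]], partition_col: str,
          keep: Callable[[str], bool]) -> List[str]:
    """Return only the folders whose partition value passes the filter; the rest are never read."""
    prefix = f"{partition_col}="
    return sorted(name for name in folders if keep(name[len(prefix):]))


silver_sales = [
    {"order_id": 1, "date": "2023-01-15"},
    {"order_id": 2, "date": "2023-01-16"},
    {"order_id": 3, "date": "2023-02-20"},
    {"order_id": 5, "date": "2023-03-10"},
]
layout = partition_layout(silver_sales, "date")
# A date filter like the gold step's only touches 2 of the 4 folders
print(prune(layout, "date", lambda d: d > "2023-01-31"))
```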

Save as data_pipeline_notebook.py in Databricks.

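from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType
# sum and max come from Spark (used in the gold aggregation); they shadow Python's built-ins in this script
from pyspark.sql.functions import col, lit, when, sum, avg, count, max, current_timestamp
import pyspark.sql.utils as utils  # Provides AnalysisException for error handling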

# Configuration Variables (Template: Load from secrets/DBFS config in prod)
AZURE_ACCOUNT_NAME = "your_storage_account_name"  # e.g., datalakedemo123
AZURE_ACCOUNT_KEY = dbutils.secrets.get(scope="your-secret-scope", key="storage-key")  # Use Databricks secrets for security
FILE_SYSTEM = "data-lake"
BRONZE_PATH_SALES = f"abfss://{FILE_SYSTEM}@{AZURE_ACCOUNT_NAME}.dfs.core.windows.net/bronze/sales_data.csv"  # Note: abfss for secure access
BRONZE_PATH_CUSTOMERS = f"abfss://{FILE_SYSTEM}@{AZURE_ACCOUNT_NAME}.dfs.core.windows.net/bronze/customers_data.csv"
SILVER_PATH_SALES = f"abfss://{FILE_SYSTEM}@{AZURE_ACCOUNT_NAME}.dfs.core.windows.net/silver/sales_cleaned"
SILVER_PATH_CUSTOMERS = f"abfss://{FILE_SYSTEM}@{AZURE_ACCOUNT_NAME}.dfs.core.windows.net/silver/customers_cleaned"
GOLD_PATH = f"abfss://{FILE_SYSTEM}@{AZURE_ACCOUNT_NAME}.dfs.core.windows.net/gold/aggregated_sales"

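# Initialize Spark Session (Databricks already provides one and getOrCreate() returns it; tune executor sizes in the cluster configuration)
spark = SparkSession.builder \
    .appName("DataLakeETLPipeline") \
    .getOrCreate()

# Authenticate abfss:// access to the storage account with the account key from the secret scope
spark.conf.set(f"fs.azure.account.key.{AZURE_ACCOUNT_NAME}.dfs.core.windows.net", AZURE_ACCOUNT_KEY)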

# Mount Data Lake if needed (Databricks-specific; optional if using direct paths)
# dbutils.fs.mount(source=f"wasbs://{FILE_SYSTEM}@{AZURE_ACCOUNT_NAME}.blob.core.windows.net/", mount_point="/mnt/data-lake", extra_configs={f"fs.azure.account.key.{AZURE_ACCOUNT_NAME}.blob.core.windows.net": AZURE_ACCOUNT_KEY})

# Step 1: Ingest and Clean Sales Data (Bronze to Silver) - Schema enforcement, missing values, filter
# What it's doing: Reads raw CSV from bronze. Enforces schema to prevent bad data. Handles missing values (fill defaults). Filters invalid records. Writes to silver as Parquet for efficiency.
schema_sales = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("product", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("date", StringType(), True)  # Cast to date later
])
try:
    df_sales_raw = spark.read.csv(BRONZE_PATH_SALES, header=True, schema=schema_sales)
except utils.AnalysisException as e:
    print(f"Error ingesting sales: {e}")
    raise

df_sales_clean = df_sales_raw.na.fill({"quantity": 0, "price": 0.0}) \
    .withColumn("date", col("date").cast(DateType())) \
    .withColumn("ingestion_timestamp", current_timestamp()) \
    .filter((col("quantity") > 0) & (col("price") > 0))

df_sales_clean.write.mode("overwrite").partitionBy("date").parquet(SILVER_PATH_SALES)  # Partition for large data queries
print("Sales cleaned and written to silver.")

# Step 2: Ingest and Clean Customers Data (Bronze to Silver) - Similar cleaning
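# What it's doing: Reads the customers CSV with an explicit schema. Fills missing loyalty_score with 0, derives loyalty_category (High if score >= 80, else Low), and drops rows without a name. Writes Parquet partitioned by region to silver.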
schema_customers = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("region", StringType(), True),
    StructField("loyalty_score", IntegerType(), True)
])
df_customers_raw = spark.read.csv(BRONZE_PATH_CUSTOMERS, header=True, schema=schema_customers)

df_customers_clean = df_customers_raw.na.fill({"loyalty_score": 0}) \
    .withColumn("loyalty_category", when(col("loyalty_score") >= 80, "High").otherwise("Low")) \
    .filter(col("name").isNotNull())

df_customers_clean.write.mode("overwrite").partitionBy("region").parquet(SILVER_PATH_CUSTOMERS)
print("Customers cleaned and written to silver.")

# Step 3: Transform and Aggregate with Join (Silver to Gold) - Join, aggregate, etc.
# What it's doing: Reads the silver Parquet back. Left-joins sales with customers, computes revenue, and keeps orders after 2023-01-31. Aggregates per product and region (total revenue, average loyalty, order count, latest date) and adds a conditional revenue tier. Spark broadcasts the smaller join side automatically when it is below spark.sql.autoBroadcastJoinThreshold.
df_sales_clean = spark.read.parquet(SILVER_PATH_SALES)
df_customers_clean = spark.read.parquet(SILVER_PATH_CUSTOMERS)

try:
    df_joined = df_sales_clean.join(df_customers_clean, on="customer_id", how="left") \
        .withColumn("revenue", col("quantity") * col("price")) \
        .filter(col("date") > lit("2023-01-31"))

    df_aggregated = df_joined.groupBy("product", "region") \
        .agg(
            sum("revenue").alias("total_revenue"),
            avg("loyalty_score").alias("avg_loyalty"),
            count("order_id").alias("order_count"),
            max("date").alias("latest_date")
        ) \
        .filter(col("total_revenue") > 500) \
        .withColumn("revenue_tier", when(col("total_revenue") > 2000, "Premium").otherwise("Standard")) \
        .withColumn("process_timestamp", current_timestamp()) \
        .orderBy(col("total_revenue").desc())

    df_aggregated.write.mode("overwrite").partitionBy("product", "region").parquet(GOLD_PATH)  # Partition for warehouse queries
    df_aggregated.show()  # Preview in Databricks
    print("Gold layer created with aggregates.")
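except Exception as e:
    print(f"Pipeline error: {e}")
    # In prod: log to Azure Monitor or add retry logic
    raise  # Re-raise so a scheduled Databricks job is marked as failed instead of succeeding silently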

# Do not call spark.stop() here: Databricks manages the Spark session's lifecycle for notebooks and jobs

print("Pipeline Completed. Query gold layer in Databricks or Azure Synapse for analytics.")

Running in Databricks: Create a notebook, paste the script, attach the notebook to your cluster, and run it. For recurring or large workloads, schedule it as a Databricks job.

Explanations in Script: Each step has a "What it's doing" comment. Together they cover ingestion, cleaning (missing values and filters), transformation (join, aggregation, conditional columns), and load (partitioned writes).
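To check what the script should produce, here is a plain-Python replay of the same cleaning, join, filter and aggregation rules on the Step 2 sample data. It is a sketch for sanity-checking the expected gold output without a cluster, not a replacement for the PySpark job. The helper names are hypothetical, and it skips loyalty_category and the timestamp columns because gold does not use them.

```python
import csv
import io
from collections import defaultdict

SALES_CSV = """order_id,product,quantity,price,customer_id,date
1,Laptop,2,1000.50,101,2023-01-15
2,Phone,1,500.00,102,2023-01-16
3,Laptop,1,1000.50,101,2023-02-20
4,Tablet,3,,103,2023-02-21
5,Phone,2,500.00,102,2023-03-10
6,Tablet,,300.00,104,2023-03-11
"""
CUSTOMERS_CSV = """customer_id,name,region,loyalty_score
101,Alice,East,85
102,Bob,West,90
103,Charlie,North,75
104,David,South,
"""


def num_or_default(text, cast, default):
    """Like na.fill: an empty CSV field becomes the default value."""
    return cast(text) if text.strip() else default


def run_pipeline(sales_csv: str, customers_csv: str) -> list:
    # Bronze -> Silver (sales): fill missing quantity/price with 0, keep rows where both are > 0
    sales = []
    for r in csv.DictReader(io.StringIO(sales_csv)):
        qty = num_or_default(r["quantity"], int, 0)
        price = num_or_default(r["price"], float, 0.0)
        if qty > 0 and price > 0:
            sales.append({**r, "quantity": qty, "price": price})

    # Bronze -> Silver (customers): fill missing loyalty_score with 0, drop rows without a name
    customers = {}
    for r in csv.DictReader(io.StringIO(customers_csv)):
        if r["name"]:
            customers[r["customer_id"]] = {**r, "loyalty_score": num_or_default(r["loyalty_score"], int, 0)}

    # Silver -> Gold: left join on customer_id, keep orders after 2023-01-31, group by (product, region)
    groups = defaultdict(list)
    for s in sales:
        if s["date"] > "2023-01-31":
            c = customers.get(s["customer_id"], {})  # left join: unmatched sales get no customer fields
            groups[(s["product"], c.get("region"))].append((s, c))

    gold = []
    for (product, region), pairs in groups.items():
        total = sum(s["quantity"] * s["price"] for s, _ in pairs)
        if total <= 500:  # mirrors .filter(col("total_revenue") > 500)
            continue
        scores = [c["loyalty_score"] for _, c in pairs if c]
        gold.append({
            "product": product,
            "region": region,
            "total_revenue": total,
            "avg_loyalty": sum(scores) / len(scores) if scores else None,
            "order_count": len(pairs),
            "latest_date": max(s["date"] for s, _ in pairs),
            "revenue_tier": "Premium" if total > 2000 else "Standard",
        })
    return sorted(gold, key=lambda g: g["total_revenue"], reverse=True)


for row in run_pipeline(SALES_CSV, CUSTOMERS_CSV):
    print(row)
```

On this sample only two rows reach gold: Laptop/East with 1000.5 and Phone/West with 1000.0, both Standard. Orders 4 and 6 are dropped in silver because of their missing price or quantity. Orders 1 and 2 fall before the date cutoff, so the Premium tier never triggers here.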

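Scalability: For terabyte-scale data, add worker nodes or enable autoscaling. Spark splits the input into partitions and processes them in parallel across executors, so more cores mean more partitions processed at the same time.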
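A rough way to reason about cluster size is to estimate the number of scan tasks, then count how many rounds ("waves") of tasks the cores need. The sketch below assumes splittable input (such as Parquet or uncompressed CSV) and the default spark.sql.files.maxPartitionBytes of 128 MB, with one task per core at a time. Spark can create more splits than this estimate for many small files, and a gzip-compressed CSV is read as a single task, so treat the numbers as ballpark only. The function names are hypothetical.

```python
import math

DEFAULT_MAX_PARTITION_BYTES = 128 * 1024 * 1024  # spark.sql.files.maxPartitionBytes default (128 MB)


def estimate_input_partitions(total_bytes: int,
                              max_partition_bytes: int = DEFAULT_MAX_PARTITION_BYTES) -> int:
    """Rough number of scan tasks for splittable files: input size / max split size."""
    return math.ceil(total_bytes / max_partition_bytes)


def task_waves(num_tasks: int, workers: int, cores_per_worker: int) -> int:
    """Rounds of tasks needed when each core runs one task at a time."""
    return math.ceil(num_tasks / (workers * cores_per_worker))


tasks = estimate_input_partitions(1024 ** 4)  # 1 TiB of input
print(tasks, task_waves(tasks, workers=10, cores_per_worker=8), task_waves(tasks, workers=20, cores_per_worker=8))
# prints: 8192 103 52
```

Doubling the workers roughly halves the number of waves, which is why adding nodes shortens a scan-heavy batch job.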

Clean Up: When you are finished, run az group delete --name $RESOURCE_GROUP --yes to delete every resource in the group and avoid ongoing costs.

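This is a complete pipeline template. For ACID transactions, switch to Delta Lake by replacing .parquet(path) with .format("delta").save(path) in the writes, and read the tables back with spark.read.format("delta").load(path). Delta Lake is built into the Databricks Runtime.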

Batch Processing Confirmation

The ETL process above is a classic batch pipeline: each run processes a fixed set of files and then finishes. Bronze: raw ingest. Silver: cleaning. Gold: transformations and load.

Handling Data in Real Industry Use Cases

In real-world industry projects (e.g., finance, retail, healthcare, IoT), data pipelines evolve based on needs: batch for historical analysis/cost-efficiency, streaming for real-time insights, or hybrid (unified) approaches.

1. Batch Processing in Industry

Common Use Cases: End-of-day financial reconciliations, inventory reporting.

Input Handling: From databases, files, APIs via ADF or Databricks Auto Loader.

Transformation: Via Spark SQL in Databricks jobs, including deduplication and joins.

Output: Written to the gold layer for querying in Power BI, or exported to databases.
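Batch deduplication usually means keeping the latest version of each record by key, because sources can re-deliver or update rows. In Spark this is commonly done with row_number() over a Window partitioned by the key and ordered by a timestamp descending, keeping row 1. dropDuplicates(["order_id"]) also removes duplicates, but it does not guarantee which row survives. Here is the "latest per key" rule in plain Python. The status and updated_at columns are hypothetical and do not appear in the sample data.

```python
from typing import Dict, Iterable, List


def latest_per_key(records: Iterable[dict], key: str, ts: str) -> List[dict]:
    """Keep the most recent record per key (ties keep the first one seen), sorted by key."""
    latest: Dict[object, dict] = {}
    for rec in records:
        k = rec[key]
        if k not in latest or rec[ts] > latest[k][ts]:
            latest[k] = rec
    return sorted(latest.values(), key=lambda r: r[key])


rows = [
    {"order_id": 1, "status": "created", "updated_at": "2023-01-15T09:00"},
    {"order_id": 1, "status": "shipped", "updated_at": "2023-01-16T10:00"},
    {"order_id": 2, "status": "created", "updated_at": "2023-01-16T11:00"},
    {"order_id": 1, "status": "created", "updated_at": "2023-01-15T09:00"},  # re-delivered duplicate
]
for r in latest_per_key(rows, "order_id", "updated_at"):
    print(r)
```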

2. Streaming Data Processing in Industry

Streaming handles continuous data for low latency. Use Spark Structured Streaming in Databricks.

Common Use Cases: Real-time analytics, IoT, fraud detection.

Input Handling: From Event Hubs, Kafka, or IoT Hub. For files, Auto Loader incrementally ingests new files as they land in storage.

Transformation: Continuous queries with filtering, joins, windowed aggregations; stateful for sessions.
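A windowed aggregation groups events by event time into fixed intervals. In Structured Streaming this is written with the window() function in a groupBy, usually combined with withWatermark() so that state for old windows can be dropped. Here is a plain-Python sketch of what a tumbling (fixed, non-overlapping) window sum computes, aligning windows to the Unix epoch. It ignores watermarks and late data, and the events are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def tumbling_window_sum(events, window: timedelta) -> dict:
    """events: iterable of (event_time, value). Returns {(window_start, window_end): total}."""
    totals = defaultdict(float)
    epoch = datetime(1970, 1, 1)
    for ts, value in events:
        # Align each event to the start of its window, counting from the epoch
        start = ts - (ts - epoch) % window
        totals[(start, start + window)] += value
    return dict(sorted(totals.items()))


events = [
    (datetime(2023, 3, 10, 12, 1), 500.0),
    (datetime(2023, 3, 10, 12, 4), 1000.5),
    (datetime(2023, 3, 10, 12, 7), 300.0),
]
for (start, end), total in tumbling_window_sum(events, timedelta(minutes=5)).items():
    print(start.time(), end.time(), total)
```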

Output: To Delta tables, databases, or Event Hubs, feeding dashboards or ML models.

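Key Differences: Batch is cheaper for processing large volumes of historical data on a schedule. Streaming suits use cases that need results within seconds or minutes, usually at the cost of continuously running compute.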

Tools: Databricks for processing, ADF for orchestration.