📈 StockPulse – Real‑Time Data Engineering Pipeline (Streaming)

🚀 Overview

StockPulse is a real‑time data engineering pipeline that simulates live stock index ticks and processes them end‑to‑end.

It demonstrates a modern streaming architecture using:

  • Kafka → Message broker for real‑time ingestion
  • Apache Spark Structured Streaming → Scalable stream processing & ETL
  • Postgres → Serving layer for analytics
  • Parquet Data Lake → Partitioned storage for historical queries
  • Apache Airflow (3.x) → Orchestration & scheduling
  • Streamlit + Altair → Interactive BI dashboard

The pipeline ingests simulated stock index data, enriches it with derived metrics, stores it in both Postgres and Parquet, and visualizes it in real time.


🏗️ Architecture

Architecture diagram: CSV producer → Kafka → Spark Structured Streaming → Parquet data lake + Postgres → Streamlit dashboard, orchestrated by Airflow.

📂 Project Structure

stock_pulse/
│
├── airflow/                      # Airflow home (local)
│   ├── dags/
│   │   └── stock_pipeline_dag.py # Airflow DAG definition
│   ├── logs/                     # Airflow logs
│   ├── airflow.cfg               # Airflow config
│   └── airflow.db                # Airflow metadata DB
│
├── chk/                          # Spark checkpoints
│   ├── parquet/
│   └── postgres/
│
├── consumer/
│   └── consumer.py               # Spark Structured Streaming consumer
│
├── data/
│   └── indexProcessed.csv        # Input dataset for producer
│
├── lake/
│   └── parquet/                  # Partitioned parquet sink (index/date)
│
├── postgres/
│   └── init.sql                  # DB init script (optional)
│
├── producer/
│   └── producer.py               # Kafka producer (simulated ticks)
│
├── clear_outputs.py              # Utility to clear outputs/checkpoints
├── requirements-lock.txt         # Pinned dependencies
├── streamlit_app.py              # Streamlit + Altair dashboard
└── venv/                         # Python virtual environment (local)

🛠️ Prerequisites & Installation

System Packages

  • Python 3.10+

    sudo apt update
    sudo apt install -y python3 python3-venv python3-dev
  • Java (JDK 11+) → required for PySpark

    sudo apt install -y openjdk-11-jdk
    java -version
  • Apache Spark

    wget https://downloads.apache.org/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
    tar xvf spark-3.4.1-bin-hadoop3.tgz
    mv spark-3.4.1-bin-hadoop3 ~/spark
    export SPARK_HOME=~/spark
    export PATH=$SPARK_HOME/bin:$PATH
    spark-shell --version
  • Kafka

    wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
    tar -xzf kafka_2.13-3.6.0.tgz
    mv kafka_2.13-3.6.0 ~/kafka

    Start services in separate terminals:

    ~/kafka/bin/zookeeper-server-start.sh ~/kafka/config/zookeeper.properties
    ~/kafka/bin/kafka-server-start.sh ~/kafka/config/server.properties
  • Postgres

    sudo apt install -y postgresql postgresql-contrib libpq-dev
    sudo systemctl start postgresql

    Create the database (from a psql session, e.g. sudo -u postgres psql):

    CREATE DATABASE stocks;
    \c stocks

    Create Table:

    -- create fresh table
    CREATE TABLE ticks_raw (
      stream_ts TIMESTAMPTZ NOT NULL,
      date DATE NOT NULL,
      index TEXT NOT NULL,
      open NUMERIC,
      high NUMERIC,
      low NUMERIC,
      close NUMERIC NOT NULL,
      adj_close NUMERIC,
      volume BIGINT NOT NULL,
      close_usd NUMERIC,
      trade_value NUMERIC
    );
  • Airflow (3.x)

    pip install apache-airflow
    airflow db migrate
    airflow standalone

⚙️ Setup Instructions

Clone repo

git clone https://github.com/Arjun-M-101/StockPulse.git
cd stock_pulse

Create virtual environment

python3 -m venv venv
source venv/bin/activate
pip install -r requirements-lock.txt

Start Kafka topic

~/kafka/bin/kafka-topics.sh --create --topic stock_ticks --bootstrap-server localhost:9092

Run Producer

python producer/producer.py

▶️ Running the Pipeline

You have two options to run the Spark consumer:

Option A: Run Consumer Manually

$SPARK_HOME/bin/spark-submit \
  --master local[*] \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,org.postgresql:postgresql:42.7.4 \
  consumer/consumer.py
  • Best for quick local testing and debugging.
  • Logs appear in the terminal.

Option B: Orchestrate via Airflow DAG

  • Open Airflow UI → enable stockpulse_pipeline.
  • Trigger the DAG to run the same spark-submit job as a scheduled/managed task.
  • Best for repeatable runs, retries, and monitoring.

Launch Dashboard

streamlit run streamlit_app.py

Reset Outputs (optional)

python clear_outputs.py
  • Clears: lake/parquet contents and Spark checkpoint directories (chk/parquet, chk/postgres)
  • Restores: .gitkeep to keep folders tracked by Git

📊 Dashboard Features

  • Raw Close Prices → Line chart of absolute price levels
  • % Change → Relative moves per index
  • Rebased to 100 → Growth from baseline
  • Volume Share → Market share by traded volume

Auto‑refreshes every 5 seconds for live updates.
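
These views are simple transformations of the raw ticks. Below is a minimal sketch of how % change, rebasing to 100, and volume share could be computed with pandas, assuming a DataFrame with the ticks_raw columns stream_ts, index, close, and volume; the exact logic in streamlit_app.py may differ.

import pandas as pd

def derive_views(df: pd.DataFrame) -> pd.DataFrame:
    # df holds the latest ticks from Postgres: stream_ts, index (ticker), close, volume
    df = df.sort_values("stream_ts")
    # % change relative to each index's first close in the window
    df["pct_change"] = df.groupby("index")["close"].transform(lambda s: (s / s.iloc[0] - 1) * 100)
    # rebased to 100: growth from the baseline tick
    df["rebased"] = df.groupby("index")["close"].transform(lambda s: s / s.iloc[0] * 100)
    # volume share: each index's share of total traded volume
    vol = df.groupby("index")["volume"].sum()
    df["volume_share"] = df["index"].map(vol / vol.sum())
    return df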


🔄 Data Flow

Producer

  • Reads indexProcessed.csv
  • Emits one row per index per tick, all sharing a synchronized timestamp
  • Publishes to Kafka topic stock_ticks
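
A minimal sketch of such a producer loop, assuming kafka-python and that the CSV has a Date column to group rows into ticks (both assumptions); the real producer/producer.py may pace and serialize the stream differently.

import json
import time
from datetime import datetime, timezone

import pandas as pd
from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v, default=str).encode("utf-8"),
)

df = pd.read_csv("data/indexProcessed.csv")

# One simulated tick = one row per index, all sharing the same stream timestamp
for _, tick in df.groupby("Date"):                # grouping column is an assumption
    stream_ts = datetime.now(timezone.utc).isoformat()
    for _, row in tick.iterrows():
        producer.send("stock_ticks", {"stream_ts": stream_ts, **row.to_dict()})
    producer.flush()
    time.sleep(1)                                 # pace the stream like live ticks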

Consumer (Spark)

  • Reads from Kafka in streaming mode
  • Parses JSON → applies schema
  • Adds derived column: trade_value = close * volume
  • Writes to:
    • Parquet (partitioned by index/date)
    • Postgres (ticks_raw table)
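
A minimal sketch of this consumer logic, assuming localhost connection strings and placeholder Postgres credentials; the real consumer/consumer.py applies the full ticks_raw schema and is launched with the spark-submit packages shown above.

from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.appName("stockpulse-consumer").getOrCreate()

# Subset of the ticks_raw schema; the remaining OHLC columns follow the same pattern
schema = T.StructType([
    T.StructField("stream_ts", T.TimestampType()),
    T.StructField("date", T.DateType()),
    T.StructField("index", T.StringType()),
    T.StructField("close", T.DoubleType()),
    T.StructField("volume", T.LongType()),
])

raw = (spark.readStream.format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "stock_ticks")
       .load())

ticks = (raw.selectExpr("CAST(value AS STRING) AS json")
         .select(F.from_json("json", schema).alias("t")).select("t.*")
         .withColumn("trade_value", F.col("close") * F.col("volume")))

# Sink 1: partitioned Parquet data lake
(ticks.writeStream.format("parquet")
 .option("path", "lake/parquet")
 .option("checkpointLocation", "chk/parquet")
 .partitionBy("index", "date")
 .start())

# Sink 2: Postgres serving table, written per micro-batch over JDBC
def write_to_postgres(batch_df, _batch_id):
    (batch_df.write.format("jdbc")
     .option("url", "jdbc:postgresql://localhost:5432/stocks")
     .option("dbtable", "ticks_raw")
     .option("user", "postgres")          # placeholder credentials
     .option("password", "postgres")
     .option("driver", "org.postgresql.Driver")
     .mode("append")
     .save())

(ticks.writeStream.foreachBatch(write_to_postgres)
 .option("checkpointLocation", "chk/postgres")
 .start())

spark.streams.awaitAnyTermination()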

Airflow DAG

  • Orchestrates Spark consumer job
  • Can be extended for retries, alerts, and scheduling
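
A minimal sketch of such a DAG, assuming the standard provider's BashOperator and a placeholder project path; the actual airflow/dags/stock_pipeline_dag.py may differ in schedule and task layout.

from datetime import datetime

from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator  # airflow.operators.bash in Airflow 2.x

SPARK_SUBMIT = (
    "$SPARK_HOME/bin/spark-submit --master 'local[*]' "
    "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,org.postgresql:postgresql:42.7.4 "
    "consumer/consumer.py"
)

with DAG(
    dag_id="stockpulse_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule=None,      # trigger manually from the Airflow UI
    catchup=False,
):
    BashOperator(
        task_id="run_spark_consumer",
        bash_command=f"cd /path/to/stock_pulse && {SPARK_SUBMIT}",  # placeholder project path
    )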

Dashboard

  • Connects to Postgres
  • Queries latest 500 ticks
  • Provides interactive Altair visualizations
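
A minimal sketch of the dashboard's data access and one Altair chart, assuming SQLAlchemy with placeholder credentials; streamlit_app.py adds the remaining views and the 5-second auto-refresh.

import altair as alt
import pandas as pd
import streamlit as st
from sqlalchemy import create_engine

# Placeholder credentials for the local stocks database
engine = create_engine("postgresql+psycopg2://postgres:postgres@localhost:5432/stocks")

st.title("StockPulse – Live Index Ticks")

# Latest 500 ticks, newest first
df = pd.read_sql(
    'SELECT stream_ts, "index", close, volume FROM ticks_raw ORDER BY stream_ts DESC LIMIT 500',
    engine,
)

close_chart = (
    alt.Chart(df)
    .mark_line()
    .encode(x="stream_ts:T", y="close:Q", color="index:N")
    .properties(title="Raw Close Prices")
)
st.altair_chart(close_chart, use_container_width=True)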

Clear Outputs Utility

  • clear_outputs.py wipes lake/parquet and chk/ directories
  • Restores .gitkeep to preserve folder structure
  • Useful for reproducibility and fresh pipeline runs
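
A minimal sketch of what this reset amounts to, assuming only the directories named in the project structure; the actual clear_outputs.py may cover additional paths.

import shutil
from pathlib import Path

# Output and checkpoint directories from the project layout
TARGETS = [Path("lake/parquet"), Path("chk/parquet"), Path("chk/postgres")]

for target in TARGETS:
    if target.exists():
        shutil.rmtree(target)              # wipe previous outputs / checkpoints
    target.mkdir(parents=True, exist_ok=True)
    (target / ".gitkeep").touch()          # keep the empty folder tracked by Git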

Sample Outputs (Airflow + Dashboard)

Screenshots: Airflow DAG run and the live Streamlit dashboard.


✅ Key Takeaways

  • Demonstrates a real‑time streaming pipeline using Kafka as the event bus and Spark Structured Streaming as the processing engine.
  • Uses a dual‑sink pattern: Parquet files as a historical data lake and Postgres as a low‑latency serving layer for dashboards.
  • Runs under an Airflow DAG that triggers the Spark consumer, showing how to orchestrate streaming jobs.
  • Exposes live metrics through a Streamlit dashboard, which queries Postgres for recruiter‑friendly visualizations.
  • Includes a clear‑outputs utility and checkpoint management to make the demo reproducible from a clean state.
  • Fully reproducible on a single machine, while the architecture maps naturally to cloud services like MSK/Kinesis, S3, EMR/Databricks, MWAA, and RDS/Redshift.

⚖️ Trade‑offs & Design Decisions

  • Kafka vs file‑based ingestion:
    Chose Kafka so producers and consumers are decoupled, events are durable, and the stream can be replayed from offsets for debugging and backfills.

  • Postgres vs data warehouse for serving:
    Used Postgres because it is easy to run locally and good enough for second‑level latency queries from Streamlit. In production, a warehouse or OLAP store (Redshift, Snowflake, ClickHouse) would handle higher volume and concurrency.

  • Parquet + partitioning for history:
    Wrote events to Parquet partitioned by index and date, which makes historical queries and downstream batch processing efficient and aligns with lake/lakehouse best practices.

  • Airflow standalone vs managed orchestration:
    Used standalone Airflow to keep the setup simple while still demonstrating scheduling and monitoring. In production, managed Airflow (MWAA) or a Kubernetes‑based orchestrator would provide better scaling and reliability.

  • Streamlit vs enterprise BI:
    Picked Streamlit for fast, code‑first dashboarding that showcases the pipeline end‑to‑end. In an organization, the same serving tables would likely feed Superset, Tableau, or Power BI for governed reporting.

  • Checkpoint and output clearing:
    In development, clearing checkpoints and lake outputs is useful to rerun the stream from the earliest offsets and demo the full flow. In production, checkpoints must be preserved to avoid duplicate processing and to maintain exactly‑once or at‑least‑once guarantees.

