
dimifontaine/simple-data-pipeline-python


Simple Data Pipeline (Python)

A simple end-to-end data pipeline built as part of a technical assessment, modeling ingestion, validation, and persistence of customer data.


Context

This repository was created as part of a technical assessment.

The goal was to implement a small but complete data pipeline within a limited time frame, focusing on clarity, correctness, and structure rather than production-level completeness.


Overview

This repository implements a three-service customer data pipeline:

  • mock-server: Flask API serving customer records from a JSON dataset
  • pipeline-service: FastAPI service responsible for ingestion, validation, and read APIs
  • postgres: PostgreSQL for persistence

Flow:

Flask JSON API → FastAPI ingestion → PostgreSQL → FastAPI read API


Design Notes

  • The pipeline is intentionally synchronous and minimal
  • Error handling is kept simple given the scope
  • The focus is on clarity of flow and separation of concerns over abstraction

In a production setting, this would likely evolve toward:

  • streaming or incremental ingestion instead of full in-memory batches
  • retry and failure handling strategies
  • idempotent processing guarantees
  • separation via queues or event-driven pipelines
  • observability (logging, metrics, tracing)

The goal is to provide a clear baseline that can be extended, rather than a production-ready system.


Running the project

Start all services:

docker compose up --build -d

Test endpoints:

curl "http://localhost:5000/api/health"  
curl "http://localhost:5000/api/customers?page=1&limit=5"  
curl -X POST "http://localhost:8000/api/ingest"  
curl "http://localhost:8000/api/customers?page=1&limit=5"  
curl "http://localhost:8000/api/customers/{customer_id}"

Testing

Install dev dependencies:

uv pip install --python .venv/bin/python -r requirements-dev.txt -r mock-server/requirements.txt -r pipeline-service/requirements.txt

Run tests:

env PYTHONPYCACHEPREFIX=/tmp .venv/bin/python -m pytest -q

Tests focus on:

  • API contract validation
  • pagination behavior
  • ingestion validation (invalid data, conflicts)

System Design

Data generation

  • Dataset: ~10,000 customers
  • Generated using Faker
  • Deterministic UUIDs ensure reproducibility
  • Generator script is included (mock-server/scripts/generate_customers.py)
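As a rough illustration of how deterministic UUIDs make a generated dataset reproducible, the sketch below derives each ID from the record's index with `uuid.uuid5`. This is an assumption about the approach, not a copy of `generate_customers.py`; the namespace string and the `customer_uuid` helper are hypothetical.

```python
import uuid

# Hypothetical fixed namespace: any constant UUID works, as long as it never changes.
CUSTOMER_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "customers.example.invalid")


def customer_uuid(index: int) -> str:
    """Derive a stable UUID from a record's position in the dataset."""
    return str(uuid.uuid5(CUSTOMER_NAMESPACE, f"customer-{index}"))


# Re-running the generator yields the same IDs, so ingestion results are comparable
# across runs.
first_ids = [customer_uuid(i) for i in range(3)]
```

Seeding Faker's random generator would be the analogous step for the remaining fields, so that names and emails also repeat across runs.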

API contract

  • Shared response shape: data, total, page, limit
  • Defaults: page=1, limit=10
  • Invalid page or limit values return HTTP 400
  • limit > 100 is rejected
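The contract above can be sketched framework-independently as follows. The function and exception names are hypothetical, and two details are assumptions rather than documented behavior: values below 1 are treated as invalid, and a limit above 100 raises the same error that a handler would map to HTTP 400.

```python
from typing import Any, Optional, Sequence

DEFAULT_PAGE = 1
DEFAULT_LIMIT = 10
MAX_LIMIT = 100


class PaginationError(ValueError):
    """Raised for values an API handler would answer with HTTP 400."""


def parse_pagination(page: Optional[str] = None, limit: Optional[str] = None) -> tuple:
    """Apply defaults and validate raw query-string values."""
    try:
        page_num = DEFAULT_PAGE if page is None else int(page)
        limit_num = DEFAULT_LIMIT if limit is None else int(limit)
    except ValueError as exc:
        raise PaginationError("page and limit must be integers") from exc
    if page_num < 1 or limit_num < 1:
        raise PaginationError("page and limit must be >= 1")  # assumed rule
    if limit_num > MAX_LIMIT:
        raise PaginationError(f"limit must be <= {MAX_LIMIT}")
    return page_num, limit_num


def paginate(records: Sequence[Any], page: int, limit: int) -> dict:
    """Build the shared response shape: data, total, page, limit."""
    start = (page - 1) * limit
    return {
        "data": list(records[start:start + limit]),
        "total": len(records),
        "page": page,
        "limit": limit,
    }
```

Keeping the validation in one small function would let both services share the same rules, while each framework maps `PaginationError` to its own 400 response.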

Ingestion

  • Uses dlt to load into PostgreSQL
  • Data is staged before merging into the main table
  • Staging allows explicit validation and failure tracking

Notes:

  • Source API is fetched page-by-page
  • Current implementation accumulates data in memory before loading

This is acceptable for the current dataset (~10k rows), but larger datasets would require incremental ingestion.
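A minimal sketch of the page-by-page fetch, written as a generator so the caller decides whether to accumulate rows or process them incrementally. `iter_customers` and `make_fake_fetch` are hypothetical names; the in-memory fake stands in for an HTTP call to the mock server's `/api/customers` endpoint and only assumes the shared response shape (`data`, `total`, `page`, `limit`).

```python
from typing import Callable, Iterator, List


def iter_customers(fetch_page: Callable[[int, int], dict], limit: int = 100) -> Iterator[dict]:
    """Yield customer rows one page at a time until the source is exhausted."""
    page, seen = 1, 0
    while True:
        body = fetch_page(page, limit)
        rows = body["data"]
        if not rows:
            break  # defensive stop if the source returns an empty page
        yield from rows
        seen += len(rows)
        if seen >= body["total"]:
            break
        page += 1


def make_fake_fetch(records: List[dict]) -> Callable[[int, int], dict]:
    """In-memory stand-in for the HTTP request, using the shared response shape."""
    def fetch_page(page: int, limit: int) -> dict:
        start = (page - 1) * limit
        return {
            "data": records[start:start + limit],
            "total": len(records),
            "page": page,
            "limit": limit,
        }
    return fetch_page


# Accumulating everything in memory, as the current implementation is described:
records = [{"customer_id": i} for i in range(250)]
all_rows = list(iter_customers(make_fake_fetch(records)))
```

Replacing `list(...)` with a loop that loads each page as it arrives would be one way to move toward incremental ingestion without changing the fetch logic.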


Conflict handling

  • customer_id is the primary key; incoming rows are upserted on it
  • Existing rows are updated while preserving created_at
  • updated_at is set only on updates
  • Email conflicts across different customer_id values are skipped and logged
  • Invalid data is skipped and recorded in ingest_failures
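The rules above can be illustrated with an in-memory sketch. This is not the repository's dlt or SQL merge: `merge_customers` is a hypothetical function, a plain dict stands in for the main table, a list stands in for ingest_failures, and the validity check (both `customer_id` and `email` present) is an assumed rule.

```python
import logging
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional

logger = logging.getLogger("ingest")


def merge_customers(
    table: Dict[str, dict],
    incoming: Iterable[dict],
    failures: List[dict],
    now: Optional[datetime] = None,
) -> None:
    """Upsert rows keyed by customer_id, following the conflict rules above."""
    now = now or datetime.now(timezone.utc)
    email_owner = {row["email"]: cid for cid, row in table.items()}
    for record in incoming:
        cid, email = record.get("customer_id"), record.get("email")
        if not cid or not email:
            # Invalid data is skipped and recorded (assumed validity rule).
            failures.append({"record": record, "reason": "missing customer_id or email"})
            continue
        owner = email_owner.get(email)
        if owner is not None and owner != cid:
            # Email already belongs to another customer: skip and log.
            logger.warning("email conflict for %s (owned by %s)", cid, owner)
            continue
        existing = table.get(cid)
        if existing is None:
            # Insert: created_at is set, updated_at stays empty.
            table[cid] = {**record, "created_at": now, "updated_at": None}
        else:
            # Update: preserve created_at, set updated_at.
            if existing["email"] != email:
                email_owner.pop(existing["email"], None)
            table[cid] = {**record, "created_at": existing["created_at"], "updated_at": now}
        email_owner[email] = cid
```

In PostgreSQL the same behavior would typically be expressed with a primary-key upsert plus a uniqueness check on email, but the exact statements used here are not shown in this README.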

Service boundaries

  • mock-server is independent from PostgreSQL
  • pipeline-service owns ingestion, validation, persistence, and read APIs
  • SQLAlchemy is used for schema modeling and queries
  • The mock server loads the full dataset in memory at startup for simplicity

Notes

This repository is intentionally scoped to a small dataset and a simple pipeline, but reflects the trade-offs involved when evolving toward more robust data systems.
