Skip to content
110 changes: 110 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,16 @@ flowchart LR
API -->|reminder jobs| SCH
SCH --> TW
SCH --> SMTP
API -->|webhook events| WH[Webhook Dispatcher]
WH --> TW
WH --> SMTP
AI --> OAI
```

## PostgreSQL Schema (DDL)
See `backend/app/db/schema.sql`. Key tables:
- users, categories, expenses, bills, reminders
- `webhook_endpoints`, `webhook_delivery_attempts` (for event system)
- ad_impressions, subscription_plans, user_subscriptions
- refresh_tokens (optional if rotating), audit_logs

Expand All @@ -66,6 +70,11 @@ OpenAPI: `backend/app/openapi.yaml`
- Bills: CRUD `/bills`, pay/mark `/bills/{id}/pay`
- Reminders: CRUD `/reminders`, trigger `/reminders/run`
- Insights: `/insights/monthly`, `/insights/budget-suggestion`
- Webhooks:
- `/webhooks/endpoints` (POST to create, GET to list)
- `/webhooks/endpoints/{id}` (GET, PUT, DELETE)
- `/webhooks/endpoints/{id}/rotate-secret` (POST to generate new secret)
- `/webhooks/endpoints/{id}/delivery-attempts` (GET to view recent delivery attempts)

## MVP UI/UX Plan
- Auth screens: register/login.
Expand Down Expand Up @@ -168,11 +177,112 @@ finmind/
- Backend: pytest, flake8, black. Frontend: vitest, eslint.
- GitHub Actions `ci.yml` runs lint, tests, and builds both apps; optional docker build.

## Webhook Event System
FinMind can emit signed webhook events to external systems, enabling real-time integrations and automation.

### Available Event Types
- `expense.created`: A new expense has been added.
- `expense.updated`: An existing expense has been modified.
- `expense.deleted`: An existing expense has been deleted.
- `bill.created`: A new bill has been added.
- `bill.updated`: An existing bill has been modified.
- `bill.deleted`: An existing bill has been deleted.
- `bill.paid`: A bill has been marked as paid.

### Payload Structure
All webhook events are sent as `application/json` with the following common structure:
```json
{
"id": "event_uuid_v4", // Unique ID for this specific event instance
"event_type": "expense.created", // Type of the event
"timestamp": "2023-10-27T10:30:00.123456", // UTC timestamp in ISO 8601 format
"data": { /* resource object (e.g., Expense, Bill) in its current state */ },
"metadata": {
"finmind_app_id": "finmind_backend"
}
}
```

### Signed Delivery (Signature Verification)
To ensure the authenticity and integrity of webhook payloads, each request is signed using an HMAC-SHA256 algorithm.

1. **Secret Key**: Each webhook endpoint configured by the user has a unique secret key. This key is provided once when the endpoint is created or when its secret is rotated. **It should be stored securely and not shared.**
2. **Signature Header**: The webhook request includes an `X-FinMind-Signature` header in the format `t=<timestamp>,v1=<signature>`.
* `t`: The UTC timestamp (seconds since epoch) when the webhook was sent.
* `v1`: The HMAC-SHA256 signature.
3. **Verification Steps (on receiver's end)**:
* Extract the `timestamp` and `signature` from the `X-FinMind-Signature` header.
* **Concatenate the timestamp and the raw request body**: `signed_payload = f"{timestamp}.{request_body_string}"`.
* Compute an HMAC with SHA256 using your endpoint's `secret` as the key and the `signed_payload` as the message.
* Compare the computed signature with the `v1` signature from the header. If they match, the webhook is authentic.
* **Timestamp Verification (optional but recommended)**: Compare the `timestamp` from the header with the current time. If the difference is too large (e.g., more than 5 minutes), reject the request to prevent replay attacks.

Example Python verification:
```python
import hmac
import hashlib
import time
import json

def verify_webhook_signature(payload_body, header_signature, secret, tolerance=300):
try:
parts = header_signature.split(',')
timestamp_part = next(p for p in parts if p.startswith('t='))
signature_part = next(p for p in parts if p.startswith('v1='))

timestamp = int(timestamp_part.split('=')[1])
signature = signature_part.split('=')[1]

# Check timestamp for replay attacks
if abs(time.time() - timestamp) > tolerance:
return False, "Timestamp too old or too new"

signed_payload = f"{timestamp}.{payload_body}"
expected_signature = hmac.new(
secret.encode('utf-8'),
signed_payload.encode('utf-8'),
hashlib.sha256
).hexdigest()

return hmac.compare_digest(expected_signature, signature), None
except Exception as e:
return False, str(e)

# Example usage (assuming Flask request):
# webhook_secret = "your_endpoint_secret"
# payload_body = request.get_data(as_text=True)
# header_signature = request.headers.get('X-FinMind-Signature')
# is_valid, error = verify_webhook_signature(payload_body, header_signature, webhook_secret)
# if not is_valid:
# print(f"Webhook verification failed: {error}")
# return "Unauthorized", 401
```

### Retry & Failure Handling
FinMind implements a robust retry mechanism for webhook deliveries:
- **Initial Attempt**: Webhooks are dispatched shortly after an event occurs.
- **Retry Policy**: If a delivery fails (e.g., non-2xx HTTP status, network error, timeout), FinMind will retry the delivery multiple times using an exponential backoff strategy.
* Default initial delay: 1 minute
* Default backoff factor: 2 (e.g., 1m, 2m, 4m, 8m, ...)
* Default max attempts: 5
- **Persistence**: Retry attempts are stored in the database and managed by `APScheduler`, ensuring that retries persist across application restarts.
- **Failure Logging**: All delivery attempts and their outcomes (status code, error messages) are logged and can be viewed via the `/webhooks/endpoints/{id}/delivery-attempts` API. If all retries are exhausted, the delivery is marked as failed.

### Webhook Management API
Users can manage their webhook endpoints through dedicated API routes:
- **Create**: `POST /webhooks/endpoints` to register a new URL and event types. A unique secret is generated and returned.
- **List**: `GET /webhooks/endpoints` to view all configured endpoints.
- **Update**: `PUT /webhooks/endpoints/{id}` to modify endpoint details (URL, event types, active status).
- **Rotate Secret**: `POST /webhooks/endpoints/{id}/rotate-secret` to generate a new secret key for an endpoint.
- **Delete**: `DELETE /webhooks/endpoints/{id}` to remove an endpoint.
- **Delivery Attempts**: `GET /webhooks/endpoints/{id}/delivery-attempts` to review recent delivery logs.

## Monitoring (Grafana OSS)
- Backend exposes Prometheus metrics at `/metrics` with:
- request count by endpoint/status
- request duration histograms (latency, including dashboard p95 KPI)
- reminder event counters (engagement KPI)
- webhook delivery attempt counters (success/failure KPIs)
- Logs are emitted as JSON with `request_id` and shipped to Loki via Promtail.
- Pre-provisioned Grafana dashboard: `FinMind Operations and KPI`.

Expand Down
40 changes: 40 additions & 0 deletions backend/app/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from flask import Flask, jsonify
from flask_cors import CORS
from app.config import Config
from app.extensions import db, redis_client, jwt, scheduler
from app.routes.auth import auth_bp
from app.routes.expenses import expenses_bp
from app.routes.bills import bills_bp
from app.routes.reminders import reminders_bp
from app.routes.insights import insights_bp
from app.routes.webhooks import webhooks_bp # New import
from app.services.reminders import reminder_scheduler_job
from app.services.webhooks import webhook_scheduler_job # New import # Register blueprints
app.register_blueprint(auth_bp, url_prefix='/auth')
app.register_blueprint(expenses_bp, url_prefix='/expenses')
app.register_blueprint(bills_bp, url_prefix='/bills')
app.register_blueprint(reminders_bp, url_prefix='/reminders')
app.register_blueprint(insights_bp, url_prefix='/insights')
app.register_blueprint(webhooks_bp, url_prefix='/webhooks') # Register webhook blueprint with app.app_context():
# Add reminder job if not already scheduled
job_exists = scheduler.get_job('run_reminders')
if not job_exists:
scheduler.add_job(
id='run_reminders',
func=reminder_scheduler_job,
trigger='interval',
minutes=1,
max_instances=1,
coalesce=True
)
# Add webhook delivery retry job
webhook_job_exists = scheduler.get_job('retry_failed_webhooks')
if not webhook_job_exists:
scheduler.add_job(
id='retry_failed_webhooks',
func=webhook_scheduler_job,
trigger='interval',
seconds=30, # Check for failed webhooks every 30 seconds
max_instances=1,
coalesce=True
)
82 changes: 82 additions & 0 deletions backend/app/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import os
from datetime import timedelta

class Config:
SECRET_KEY = os.environ.get('SECRET_KEY') or 'you-will-never-guess'
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or 'postgresql://user:password@db:5432/finmind'
SQLALCHEMY_TRACK_MODIFICATIONS = False
REDIS_URL = os.environ.get('REDIS_URL') or 'redis://redis:6379/0'
JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY') or 'super-secret-jwt'
JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=1)
JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30)
CORS_HEADERS = 'Content-Type,Authorization'

# APScheduler configuration
SCHEDULER_API_ENABLED = True
SCHEDULER_JOBSTORES = {
'default': {'type': 'sqlalchemy', 'url': SQLALCHEMY_DATABASE_URI}
}
SCHEDULER_EXECUTORS = {
'default': {'type': 'threadpool', 'max_workers': 20}
}

# Webhook specific configuration
WEBHOOK_MAX_DELIVERY_ATTEMPTS = int(os.environ.get('WEBHOOK_MAX_DELIVERY_ATTEMPTS', 5))
WEBHOOK_INITIAL_RETRY_DELAY_SECONDS = int(os.environ.get('WEBHOOK_INITIAL_RETRY_DELAY_SECONDS', 60)) # 1 minute
WEBHOOK_RETRY_BACKOFF_FACTOR = int(os.environ.get('WEBHOOK_RETRY_BACKOFF_FACTOR', 2)) # Exponential backoff factor

# Third-party service credentials
TWILIO_ACCOUNT_SID = os.environ.get('TWILIO_ACCOUNT_SID')
TWILIO_AUTH_TOKEN = os.environ.get('TWILIO_AUTH_TOKEN')
TWILIO_WHATSAPP_NUMBER = os.environ.get('TWILIO_WHATSAPP_NUMBER')
SENDGRID_API_KEY = os.environ.get('SENDGRID_API_KEY')
SMTP_SERVER = os.environ.get('SMTP_SERVER')
SMTP_PORT = os.environ.get('SMTP_PORT')
SMTP_USERNAME = os.environ.get('SMTP_USERNAME')
SMTP_PASSWORD = os.environ.get('SMTP_PASSWORD')
SMTP_SENDER_EMAIL = os.environ.get('SMTP_SENDER_EMAIL')
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
OPENAI_MODEL = os.environ.get('OPENAI_MODEL', 'gpt-3.5-turbo')

# Flask-Caching config (if used, currently Flask-Redis for cache)
CACHE_TYPE = 'flask_redis'
CACHE_REDIS_URL = REDIS_URL# Database
DATABASE_URL=postgresql://user:password@db:5432/finmind

# Redis
REDIS_URL=redis://redis:6379/0

# Flask
SECRET_KEY=supersecretkeyforexample
FLASK_ENV=development # development or production

# JWT
JWT_SECRET_KEY=jwt_super_secret_key

# Webhook Configuration (optional, defaults are set in config.py)
# WEBHOOK_MAX_DELIVERY_ATTEMPTS=5
# WEBHOOK_INITIAL_RETRY_DELAY_SECONDS=60
# WEBHOOK_RETRY_BACKOFF_FACTOR=2

# Twilio (for WhatsApp reminders)
TWILIO_ACCOUNT_SID=ACxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
TWILIO_AUTH_TOKEN=your_auth_token
TWILIO_WHATSAPP_NUMBER=whatsapp:+14155238886 # Your Twilio Sandbox number

# Email (e.g., SendGrid, Mailgun, or direct SMTP)
# If using SendGrid:
SENDGRID_API_KEY=SG.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# If using direct SMTP:
# SMTP_SERVER=smtp.example.com
# SMTP_PORT=587
# SMTP_USERNAME=your_username
# SMTP_PASSWORD=your_password
# SMTP_SENDER_EMAIL=no-reply@finmind.com

# OpenAI (for insights)
OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

# Gunicorn (for production)
# GUNICORN_BIND=0.0.0.0:8000
# GUNICORN_WORKERS=4
# GUNICORN_TIMEOUT=60
22 changes: 22 additions & 0 deletions backend/app/extensions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from flask_sqlalchemy import SQLAlchemy
from flask_redis import FlaskRedis
from flask_jwt_extended import JWTManager
from flask_apscheduler import APScheduler
import logging

db = SQLAlchemy()
redis_client = FlaskRedis()
jwt = JWTManager()
scheduler = APScheduler()

# Initialize a logger for webhook specific events
webhook_logger = logging.getLogger('finmind.webhooks')
webhook_logger.setLevel(logging.INFO)
# Prevent propagation to the root logger if desired, to manage output specifically
# webhook_logger.propagate = False
# Example handler (could be configured to file, console, etc. in app setup)
if not webhook_logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
webhook_logger.addHandler(handler)
69 changes: 69 additions & 0 deletions backend/app/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from datetime import datetime
from app.extensions import db
from sqlalchemy.dialects.postgresql import JSONB
import uuid

class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
password_hash = db.Column(db.String(128), nullable=False)
role = db.Column(db.String(20), default='user')
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

webhook_endpoints = db.relationship('WebhookEndpoint', backref='user', lazy=True, cascade='all, delete-orphan')

class WebhookEndpoint(db.Model):
__tablename__ = 'webhook_endpoints'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('users.id', ondelete='CASCADE'), nullable=False)
url = db.Column(db.String(255), nullable=False)
secret = db.Column(db.String(255), nullable=False)
event_types = db.Column(JSONB, default=[]) # JSON array of event types
is_active = db.Column(db.Boolean, default=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

delivery_attempts = db.relationship('WebhookDeliveryAttempt', backref='endpoint', lazy=True, cascade='all, delete-orphan')

def to_dict(self):
return {
'id': self.id,
'user_id': self.user_id,
'url': self.url,
'event_types': self.event_types,
'is_active': self.is_active,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None
}

class WebhookDeliveryAttempt(db.Model):
__tablename__ = 'webhook_delivery_attempts'
id = db.Column(db.Integer, primary_key=True)
endpoint_id = db.Column(db.Integer, db.ForeignKey('webhook_endpoints.id', ondelete='CASCADE'), nullable=False)
event_type = db.Column(db.String(100), nullable=False)
event_id = db.Column(db.String(36), default=lambda: str(uuid.uuid4()), nullable=False) # UUID for the specific event instance
payload = db.Column(db.Text, nullable=False) # JSON payload as string
attempt_number = db.Column(db.Integer, default=1)
status_code = db.Column(db.Integer)
response_body = db.Column(db.Text)
error_message = db.Column(db.Text)
next_attempt_at = db.Column(db.DateTime)
is_successful = db.Column(db.Boolean, default=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
completed_at = db.Column(db.DateTime)

def to_dict(self):
return {
'id': self.id,
'endpoint_id': self.endpoint_id,
'event_type': self.event_type,
'event_id': self.event_id,
'attempt_number': self.attempt_number,
'status_code': self.status_code,
'is_successful': self.is_successful,
'created_at': self.created_at.isoformat() if self.created_at else None,
'completed_at': self.completed_at.isoformat() if self.completed_at else None
}
Loading