Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ OpenAPI: `backend/app/openapi.yaml`
- Expenses: CRUD `/expenses`
- Bills: CRUD `/bills`, pay/mark `/bills/{id}/pay`
- Reminders: CRUD `/reminders`, trigger `/reminders/run`
- Webhooks: `/webhooks` for subscription CRUD, plus CLI runner `flask run-webhooks`
- Insights: `/insights/monthly`, `/insights/budget-suggestion`

## MVP UI/UX Plan
Expand Down Expand Up @@ -173,9 +174,70 @@ finmind/
- request count by endpoint/status
- request duration histograms (latency, including dashboard p95 KPI)
- reminder event counters (engagement KPI)
- webhook event counters
- webhook delivery counters + latency histograms
- Logs are emitted as JSON with `request_id` and shipped to Loki via Promtail.
- Pre-provisioned Grafana dashboard: `FinMind Operations and KPI`.

## Outbound Webhooks
FinMind supports signed outbound webhooks for key product events.

### Supported events
- `expense.created`
- `expense.updated`
- `expense.deleted`
- `bill.created`
- `bill.paid`
- `reminder.created`
- `reminder.sent`

### Subscription API
Create a subscription with `POST /webhooks`:

```json
{
"target_url": "https://example.com/hooks/finmind",
"secret": "whsec_test_123",
"description": "Zapier bridge",
"subscribed_events": ["expense.created", "bill.paid"]
}
```

### Delivery semantics
- FinMind uses **at-least-once delivery**.
- Endpoints should treat `X-FinMind-Delivery-Id` as an idempotency key.
- Deliveries are retried with backoff on non-2xx responses and network failures.
- Use `python -m flask --app wsgi:app run-webhooks` (or the `run-webhooks` CLI command) from cron/worker context to process pending deliveries.

### Signature verification
Each request includes:
- `X-FinMind-Event`
- `X-FinMind-Event-Id`
- `X-FinMind-Delivery-Id`
- `X-FinMind-Timestamp`
- `X-FinMind-Signature`

Signature format:
- signed payload = `<timestamp>.<raw_json_body>`
- digest = `HMAC-SHA256(secret, signed payload)`
- header = `sha256=<hex_digest>`

Example verifier:

```python
import hashlib
import hmac


def verify_signature(secret: str, timestamp: str, raw_body: str, received: str) -> bool:
expected = hmac.new(
secret.encode("utf-8"),
f"{timestamp}.{raw_body}".encode("utf-8"),
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(f"sha256={expected}", received)
```

## Contribution Policy
- See `CONTRIBUTING.md` for fork-first contribution flow and PR requirements.

Expand Down
13 changes: 13 additions & 0 deletions packages/backend/app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import os
import logging
from datetime import timedelta
from .services.webhooks import run_pending_deliveries


def create_app(settings: Settings | None = None) -> Flask:
Expand Down Expand Up @@ -93,6 +94,18 @@ def init_db():
finally:
conn.close()

@app.cli.command("run-webhooks")
@click.option("--limit", default=100, type=int, show_default=True)
def run_webhooks(limit: int):
"""Process pending outbound webhook deliveries."""
with app.app_context():
result = run_pending_deliveries(limit=limit)
click.echo(
"processed={processed} succeeded={succeeded} retried={retried} failed={failed}".format(
**result
)
)

return app


Expand Down
45 changes: 45 additions & 0 deletions packages/backend/app/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,48 @@ CREATE TABLE IF NOT EXISTS audit_logs (
action VARCHAR(100) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS webhook_subscriptions (
id SERIAL PRIMARY KEY,
user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
target_url VARCHAR(1000) NOT NULL,
secret VARCHAR(255) NOT NULL,
description VARCHAR(255),
active BOOLEAN NOT NULL DEFAULT TRUE,
subscribed_events JSONB NOT NULL DEFAULT '[]'::jsonb,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
last_success_at TIMESTAMP NULL,
last_failure_at TIMESTAMP NULL,
failure_count INT NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_webhook_subscriptions_user_active ON webhook_subscriptions(user_id, active);

CREATE TABLE IF NOT EXISTS webhook_events (
id SERIAL PRIMARY KEY,
user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
event_type VARCHAR(100) NOT NULL,
resource_type VARCHAR(50) NOT NULL,
resource_id INT NULL,
payload_json JSONB NOT NULL,
occurred_at TIMESTAMP NOT NULL DEFAULT NOW(),
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_webhook_events_user_type ON webhook_events(user_id, event_type, occurred_at DESC);

CREATE TABLE IF NOT EXISTS webhook_deliveries (
id SERIAL PRIMARY KEY,
event_id INT NOT NULL REFERENCES webhook_events(id) ON DELETE CASCADE,
subscription_id INT NOT NULL REFERENCES webhook_subscriptions(id) ON DELETE CASCADE,
status VARCHAR(30) NOT NULL,
attempt_count INT NOT NULL DEFAULT 0,
next_attempt_at TIMESTAMP NOT NULL DEFAULT NOW(),
last_attempt_at TIMESTAMP NULL,
last_response_code INT NULL,
last_error TEXT NULL,
last_duration_ms INT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_ready ON webhook_deliveries(status, next_attempt_at);
CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_subscription ON webhook_deliveries(subscription_id, created_at DESC);
60 changes: 60 additions & 0 deletions packages/backend/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,63 @@ class AuditLog(db.Model):
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True)
action = db.Column(db.String(100), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)


class WebhookDeliveryStatus(str, Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
RETRY_SCHEDULED = "retry_scheduled"
SUCCEEDED = "succeeded"
FAILED = "failed"


class WebhookSubscription(db.Model):
__tablename__ = "webhook_subscriptions"
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
target_url = db.Column(db.String(1000), nullable=False)
secret = db.Column(db.String(255), nullable=False)
description = db.Column(db.String(255), nullable=True)
active = db.Column(db.Boolean, default=True, nullable=False)
subscribed_events = db.Column(db.JSON, default=list, nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
updated_at = db.Column(
db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False
)
last_success_at = db.Column(db.DateTime, nullable=True)
last_failure_at = db.Column(db.DateTime, nullable=True)
failure_count = db.Column(db.Integer, default=0, nullable=False)


class WebhookEvent(db.Model):
__tablename__ = "webhook_events"
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
event_type = db.Column(db.String(100), nullable=False)
resource_type = db.Column(db.String(50), nullable=False)
resource_id = db.Column(db.Integer, nullable=True)
payload_json = db.Column(db.JSON, nullable=False)
occurred_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)


class WebhookDelivery(db.Model):
__tablename__ = "webhook_deliveries"
id = db.Column(db.Integer, primary_key=True)
event_id = db.Column(db.Integer, db.ForeignKey("webhook_events.id"), nullable=False)
subscription_id = db.Column(
db.Integer, db.ForeignKey("webhook_subscriptions.id"), nullable=False
)
status = db.Column(
db.String(30), default=WebhookDeliveryStatus.PENDING.value, nullable=False
)
attempt_count = db.Column(db.Integer, default=0, nullable=False)
next_attempt_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
last_attempt_at = db.Column(db.DateTime, nullable=True)
last_response_code = db.Column(db.Integer, nullable=True)
last_error = db.Column(db.Text, nullable=True)
last_duration_ms = db.Column(db.Integer, nullable=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
updated_at = db.Column(
db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False
)
62 changes: 62 additions & 0 deletions packages/backend/app/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,25 @@ def __init__(self) -> None:
["event", "channel", "status"],
registry=self.registry,
)
self.webhook_events_total = Counter(
"finmind_webhook_events_total",
"Webhook events emitted by type.",
["event_type"],
registry=self.registry,
)
self.webhook_deliveries_total = Counter(
"finmind_webhook_deliveries_total",
"Webhook delivery attempts by event type and result.",
["event_type", "status_code_class", "result"],
registry=self.registry,
)
self.webhook_delivery_duration_seconds = Histogram(
"finmind_webhook_delivery_duration_seconds",
"Webhook delivery latency in seconds by event type.",
["event_type"],
buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10),
registry=self.registry,
)

def observe_http_request(
self, method: str, endpoint: str, status_code: int, duration_seconds: float
Expand All @@ -79,6 +98,25 @@ def record_reminder_event(
event=event, channel=channel, status=status
).inc()

def record_webhook_event(self, event_type: str) -> None:
self.webhook_events_total.labels(event_type=event_type).inc()

def record_webhook_delivery(
self,
event_type: str,
status_code_class: str,
result: str,
duration_seconds: float,
) -> None:
self.webhook_deliveries_total.labels(
event_type=event_type,
status_code_class=status_code_class,
result=result,
).inc()
self.webhook_delivery_duration_seconds.labels(event_type=event_type).observe(
duration_seconds
)

def metrics_response(self) -> Response:
if self.multiprocess_enabled:
registry = CollectorRegistry()
Expand Down Expand Up @@ -137,3 +175,27 @@ def track_reminder_event(event: str, channel: str, status: str = "ok") -> None:
obs = current_app.extensions.get("observability")
if obs:
obs.record_reminder_event(event=event, channel=channel, status=status)


def track_webhook_event(event_type: str) -> None:
obs = current_app.extensions.get("observability")
if obs:
obs.record_webhook_event(event_type=event_type)


def track_webhook_delivery(
event_type: str, response_code: int | None, result: str, duration_ms: int
) -> None:
obs = current_app.extensions.get("observability")
if not obs:
return
if response_code is None:
status_class = "error"
else:
status_class = f"{int(response_code) // 100}xx"
obs.record_webhook_delivery(
event_type=event_type,
status_code_class=status_class,
result=result,
duration_seconds=max(duration_ms, 0) / 1000.0,
)
Loading