Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 60 additions & 2 deletions backend/src/compliance.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::api_error::ApiError;
use crate::external_integrations::{AnchorIntegrationClient, ComplianceApiClient};
use crate::external_integrations::{
AnchorIntegrationClient, ComplianceApiClient, SanctionsApiClient,
};
use crate::notifications::{
audit_action, entity_type, notif_type, AuditLogService, NotificationService,
};
Expand All @@ -16,6 +18,7 @@ pub struct ComplianceEngine {
velocity_window_mins: i64, // e.g., 10 minutes
pub volume_threshold: Decimal, // e.g., $100k
compliance_api_client: Option<ComplianceApiClient>,
sanctions_client: Option<SanctionsApiClient>,
anchor_client: Option<AnchorIntegrationClient>,
}

Expand All @@ -32,6 +35,7 @@ impl ComplianceEngine {
velocity_window_mins,
volume_threshold,
compliance_api_client: ComplianceApiClient::from_env(),
sanctions_client: SanctionsApiClient::from_env(),
anchor_client: AnchorIntegrationClient::from_env(),
}
}
Expand All @@ -49,7 +53,11 @@ impl ComplianceEngine {
}

pub async fn scan_suspicious_activity(&self) -> Result<(), ApiError> {
info!("Compliance Engine: Scanning for suspicious borrowing patterns...");
info!("Compliance Engine: Scanning for suspicious borrowing patterns and sanctions screening...");

if let Err(e) = self.run_sanctions_screening().await {
warn!(error = %e, "Sanctions screening failed; continuing with internal compliance checks");
}

// 1. Detect High Velocity Borrowing
self.detect_high_velocity().await?;
Expand Down Expand Up @@ -177,6 +185,56 @@ impl ComplianceEngine {
Ok(())
}

async fn run_sanctions_screening(&self) -> Result<(), ApiError> {
let client = match &self.sanctions_client {
Some(client) => client,
None => return Ok(()),
};

#[derive(sqlx::FromRow)]
struct SanctionsCandidate {
plan_id: Uuid,
user_id: Uuid,
email: String,
wallet_address: Option<String>,
}

let candidates = sqlx::query_as::<_, SanctionsCandidate>(
r#"
SELECT p.id as plan_id, u.id as user_id, u.email, u.wallet_address
FROM plans p
JOIN users u ON u.id = p.user_id
WHERE NOT p.is_flagged
"#,
)
.fetch_all(&self.db)
.await?;

for candidate in candidates {
if candidate.email.is_empty() && candidate.wallet_address.is_none() {
continue;
}

if let Ok(Some(match_reason)) = client
.screen_user(
candidate.user_id,
&candidate.email,
candidate.wallet_address.as_deref(),
)
.await
{
self.flag_plan(
candidate.plan_id,
candidate.user_id,
format!("Sanctions screening hit: {}", match_reason),
)
.await?;
}
}

Ok(())
}

async fn flag_plan(
&self,
plan_id: Uuid,
Expand Down
100 changes: 99 additions & 1 deletion backend/src/external_integrations.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::api_error::ApiError;
use crate::circuit_breaker::CircuitBreaker;
use reqwest::Client;
use serde::Serialize;
use reqwest::header::AUTHORIZATION;
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Clone)]
Expand All @@ -18,13 +19,34 @@ pub struct ComplianceApiClient {
circuit_breaker: CircuitBreaker,
}

#[derive(Clone)]
pub struct SanctionsApiClient {
client: Client,
base_url: String,
api_key: String,
circuit_breaker: CircuitBreaker,
}

#[derive(Debug, Serialize)]
struct ComplianceFlagPayload {
plan_id: uuid::Uuid,
user_id: uuid::Uuid,
reason: String,
}

#[derive(Debug, Serialize)]
struct SanctionsScreenPayload<'a> {
user_id: uuid::Uuid,
email: &'a str,
wallet_address: Option<&'a str>,
}

#[derive(Debug, Deserialize)]
struct SanctionsScreenResponse {
flagged: bool,
reason: Option<String>,
}

impl AnchorIntegrationClient {
pub fn from_env() -> Option<Self> {
let base_url = std::env::var("ANCHOR_INTEGRATION_URL").ok()?;
Expand Down Expand Up @@ -153,6 +175,82 @@ impl ComplianceApiClient {
}
}

impl SanctionsApiClient {
pub fn from_env() -> Option<Self> {
let base_url = std::env::var("SANCTIONS_API_URL").ok()?;
let api_key = std::env::var("SANCTIONS_API_KEY").ok()?;
let failure_threshold = read_u32("CB_SANCTIONS_FAILURE_THRESHOLD", 5);
let recovery_timeout = read_u64("CB_SANCTIONS_RECOVERY_TIMEOUT_SECS", 30);

Some(Self {
client: Client::new(),
base_url,
api_key,
circuit_breaker: CircuitBreaker::new(
"sanctions_api",
failure_threshold,
Duration::from_secs(recovery_timeout),
),
})
}

pub async fn screen_user(
&self,
user_id: uuid::Uuid,
email: &str,
wallet_address: Option<&str>,
) -> Result<Option<String>, ApiError> {
let url = format!(
"{}/v1/sanctions/screen",
self.base_url.trim_end_matches('/')
);
let payload = SanctionsScreenPayload {
user_id,
email,
wallet_address,
};

self.circuit_breaker
.call(|| async {
let response = self
.client
.post(&url)
.timeout(Duration::from_secs(10))
.header(AUTHORIZATION, format!("Bearer {}", self.api_key))
.json(&payload)
.send()
.await
.map_err(|e| {
if e.is_timeout() {
ApiError::Timeout
} else {
ApiError::ExternalService(format!("Sanctions API request failed: {e}"))
}
})?;

if !response.status().is_success() {
return Err(ApiError::ExternalService(format!(
"Sanctions API returned status {}",
response.status()
)));
}

let screen_result: SanctionsScreenResponse = response.json().await.map_err(|e| {
ApiError::ExternalService(format!("Sanctions API response parse failed: {e}"))
})?;

if screen_result.flagged {
Ok(Some(screen_result.reason.unwrap_or_else(|| {
"Sanctions list match detected".to_string()
})))
} else {
Ok(None)
}
})
.await
}
}

fn read_u32(name: &str, default: u32) -> u32 {
std::env::var(name)
.ok()
Expand Down