Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,57 @@
# Business logic, recommendation scoring, and data loading all live in
# the utils/ and routes/ packages, not here.

from flask import Flask, render_template
from flask import Flask, render_template, request, jsonify
from routes.main_routes import main
import time
from utils.redis_client import get_redis_client

app = Flask(__name__)

WINDOW_SIZE_IN_SECONDS = 60
MAX_REQUESTS = 100


@app.before_request
def rate_limit():
if request.path.startswith('/api'):
r = get_redis_client()
ip = request.headers.get('X-Forwarded-For', request.remote_addr) or 'anonymous'

redis_key = f"rate_limit:{ip}"
now = int(time.time() * 1000)
clear_before = now - (WINDOW_SIZE_IN_SECONDS * 1000)

try:
pipe = r.pipeline()
pipe.zremrangebyscore(redis_key, 0, clear_before)
pipe.zadd(redis_key, {str(now): now}) # Add current hit
pipe.zcard(redis_key) # Get current window count
pipe.expire(redis_key, WINDOW_SIZE_IN_SECONDS) # Slide window TTL
results = pipe.execute()

request_count = results[2]
remaining = max(0, MAX_REQUESTS - request_count)

# Injecting standards compliance limit attributes into the request context
request.rate_limit_headers = {
'X-RateLimit-Limit': str(MAX_REQUESTS),
'X-RateLimit-Remaining': str(remaining)
}

if request_count > MAX_REQUESTS:
response = jsonify({'error': 'Too Many Requests', 'message': 'Rate limit exceeded.'})
response.status_code = 429
response.headers.update(request.rate_limit_headers)
response.headers['Retry-After'] = str(WINDOW_SIZE_IN_SECONDS)
return response

except Exception as e:
app.logger.error(f"Rate Limiter Error: {e}")
# Fail-open design option so down-stream clients don't suffer complete outage
pass


# Register all routes defined in the main Blueprint
app.register_blueprint(main)

Expand Down
56 changes: 55 additions & 1 deletion routes/main_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,69 @@
# and returns a response. No business logic lives here.

from flask import Blueprint, render_template, request, jsonify, send_from_directory, abort

import time
from utils.recommender import get_recommendations, validate_recommendation_inputs
from utils.data_loader import find_project_by_id, get_project_stats
from utils.file_server import read_starter_code, resolve_starter_file, get_starter_code_dir

from utils.redis_client import get_redis_client
from utils.kafka_producer import get_kafka_producer

# Create the Blueprint that app.py will register
main = Blueprint("main", __name__)


@main.route('/api/suggestions', methods=['GET'])
def get_suggestions():
user_id = request.args.get('userId')
if not user_id:
return jsonify({'error': 'Missing userId parameter'}), 400

try:
r = get_redis_client()
redis_key = f"user:history:{user_id}"

# Pull top 20 interactive IDs by descending score sorting logic
raw_history = r.zrevrange(redis_key, 0, 19)

response = jsonify({'success': True, 'suggestions': raw_history})
if hasattr(request, 'rate_limit_headers'):
response.headers.update(request.rate_limit_headers)
return response

except Exception as e:
return jsonify({'error': 'Internal Caching Failure', 'details': str(e)}), 500

@main.route('/api/suggestions', methods=['POST'])
def log_interaction():
data = request.get_json() or {}
user_id = data.get('userId')
project_id = data.get('projectId')
interaction_type = data.get('interactionType', 'view')

if not user_id or not project_id:
return jsonify({'error': 'Missing required payload'}), 400

try:
producer = get_kafka_producer()
payload = {
'userId': user_id,
'projectId': project_id,
'interactionType': interaction_type,
'timestamp': int(time.time() * 1000)
}

# Unblocking asynchronous push operation offloaded to broker topology
producer.send('user-activity', key=user_id, value=payload)

response = jsonify({'success': True, 'message': 'Interaction logged.'})
if hasattr(request, 'rate_limit_headers'):
response.headers.update(request.rate_limit_headers)
return response

except Exception as e:
return jsonify({'error': 'Kafka Event Logging Pipeline Failure', 'details': str(e)}), 500

@main.route("/")
def index():
"""Render the homepage with the skill input form and dynamic stats."""
Expand Down
18 changes: 18 additions & 0 deletions utils/kafka_producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import os
import json
from kafka import KafkaProducer

KAFKA_BROKERS = os.getenv("KAFKA_BROKERS", "localhost:9092")

_producer = None

def get_kafka_producer():
global _producer
if _producer is None:
_producer = KafkaProducer(
bootstrap_servers=[KAFKA_BROKERS],
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: k.encode('utf-8') if k else None
)
return _producer

10 changes: 10 additions & 0 deletions utils/redis_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import os
import redis

REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")

# Thread-safe connection pool for Flask
redis_pool = redis.ConnectionPool.from_url(REDIS_URL, decode_responses=True)

def get_redis_client():
return redis.Redis(connection_pool=redis_pool)
43 changes: 43 additions & 0 deletions worker/kafka_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import os
import json
import redis
from kafka import KafkaConsumer

REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
KAFKA_BROKERS = os.getenv("KAFKA_BROKERS", "localhost:9092")

def run_worker():
r = redis.from_url(REDIS_URL, decode_responses=True)

consumer = KafkaConsumer(
'user-activity',
bootstrap_servers=[KAFKA_BROKERS],
group_id='analytics-group',
value_serializer=lambda v: json.loads(v.decode('utf-8')),
auto_offset_reset='latest'
)

print("👷 Background Python Worker running and streaming 'user-activity' channel...")

for message in consumer:
try:
payload = message.value
user_id = payload['userId']
project_id = payload['projectId']
timestamp = payload['timestamp']

redis_key = f"user:history:{user_id}"

# Execute an atomic pipeline to update score and cap the list to the last 20 elements
pipe = r.pipeline()
pipe.zadd(redis_key, {project_id: timestamp})
pipe.zremrangebyrank(redis_key, 0, -21)
pipe.execute()

print(f"Processed interaction: User {user_id} -> Project {project_id}")

except Exception as e:
print(f"Error compiling incoming event instance step data: {e}")

if __name__ == '__main__':
run_worker()
Loading