From c23eccf67bdc17d8f71413615c8b8bd155af9967 Mon Sep 17 00:00:00 2001 From: Vaibhavgoel Date: Mon, 18 May 2026 16:51:54 +0530 Subject: [PATCH] implement_redis_kafka_rate_limit --- app.py | 48 +++++++++++++++++++++++++++++++++- routes/main_routes.py | 56 +++++++++++++++++++++++++++++++++++++++- utils/kafka_producer.py | 18 +++++++++++++ utils/redis_client.py | 10 +++++++ worker/kafka_consumer.py | 43 ++++++++++++++++++++++++++++++ 5 files changed, 173 insertions(+), 2 deletions(-) create mode 100644 utils/kafka_producer.py create mode 100644 utils/redis_client.py create mode 100644 worker/kafka_consumer.py diff --git a/app.py b/app.py index d7892bd..3cdc2cb 100644 --- a/app.py +++ b/app.py @@ -10,11 +10,57 @@ # Business logic, recommendation scoring, and data loading all live in # the utils/ and routes/ packages, not here. -from flask import Flask, render_template +from flask import Flask, render_template, request, jsonify from routes.main_routes import main +import time +from utils.redis_client import get_redis_client app = Flask(__name__) +WINDOW_SIZE_IN_SECONDS = 60 +MAX_REQUESTS = 100 + + +@app.before_request +def rate_limit(): + if request.path.startswith('/api'): + r = get_redis_client() + ip = request.headers.get('X-Forwarded-For', request.remote_addr) or 'anonymous' + + redis_key = f"rate_limit:{ip}" + now = int(time.time() * 1000) + clear_before = now - (WINDOW_SIZE_IN_SECONDS * 1000) + + try: + pipe = r.pipeline() + pipe.zremrangebyscore(redis_key, 0, clear_before) + pipe.zadd(redis_key, {str(now): now}) # Add current hit + pipe.zcard(redis_key) # Get current window count + pipe.expire(redis_key, WINDOW_SIZE_IN_SECONDS) # Slide window TTL + results = pipe.execute() + + request_count = results[2] + remaining = max(0, MAX_REQUESTS - request_count) + + # Injecting standards compliance limit attributes into the request context + request.rate_limit_headers = { + 'X-RateLimit-Limit': str(MAX_REQUESTS), + 'X-RateLimit-Remaining': str(remaining) + } + + if request_count > MAX_REQUESTS: + response = jsonify({'error': 'Too Many Requests', 'message': 'Rate limit exceeded.'}) + response.status_code = 429 + response.headers.update(request.rate_limit_headers) + response.headers['Retry-After'] = str(WINDOW_SIZE_IN_SECONDS) + return response + + except Exception as e: + app.logger.error(f"Rate Limiter Error: {e}") + # Fail-open design option so down-stream clients don't suffer complete outage + pass + + # Register all routes defined in the main Blueprint app.register_blueprint(main) diff --git a/routes/main_routes.py b/routes/main_routes.py index 4cce046..4b0ba2f 100644 --- a/routes/main_routes.py +++ b/routes/main_routes.py @@ -4,15 +4,69 @@ # and returns a response. No business logic lives here. from flask import Blueprint, render_template, request, jsonify, send_from_directory, abort - +import time from utils.recommender import get_recommendations, validate_recommendation_inputs from utils.data_loader import find_project_by_id, get_project_stats from utils.file_server import read_starter_code, resolve_starter_file, get_starter_code_dir +from utils.redis_client import get_redis_client +from utils.kafka_producer import get_kafka_producer + # Create the Blueprint that app.py will register main = Blueprint("main", __name__) +@main.route('/api/suggestions', methods=['GET']) +def get_suggestions(): + user_id = request.args.get('userId') + if not user_id: + return jsonify({'error': 'Missing userId parameter'}), 400 + + try: + r = get_redis_client() + redis_key = f"user:history:{user_id}" + + # Pull top 20 interactive IDs by descending score sorting logic + raw_history = r.zrevrange(redis_key, 0, 19) + + response = jsonify({'success': True, 'suggestions': raw_history}) + if hasattr(request, 'rate_limit_headers'): + response.headers.update(request.rate_limit_headers) + return response + + except Exception as e: + return jsonify({'error': 'Internal Caching Failure', 'details': str(e)}), 500 + +@main.route('/api/suggestions', methods=['POST']) +def log_interaction(): + data = request.get_json() or {} + user_id = data.get('userId') + project_id = data.get('projectId') + interaction_type = data.get('interactionType', 'view') + + if not user_id or not project_id: + return jsonify({'error': 'Missing required payload'}), 400 + + try: + producer = get_kafka_producer() + payload = { + 'userId': user_id, + 'projectId': project_id, + 'interactionType': interaction_type, + 'timestamp': int(time.time() * 1000) + } + + # Unblocking asynchronous push operation offloaded to broker topology + producer.send('user-activity', key=user_id, value=payload) + + response = jsonify({'success': True, 'message': 'Interaction logged.'}) + if hasattr(request, 'rate_limit_headers'): + response.headers.update(request.rate_limit_headers) + return response + + except Exception as e: + return jsonify({'error': 'Kafka Event Logging Pipeline Failure', 'details': str(e)}), 500 + @main.route("/") def index(): """Render the homepage with the skill input form and dynamic stats.""" diff --git a/utils/kafka_producer.py b/utils/kafka_producer.py new file mode 100644 index 0000000..7d1261d --- /dev/null +++ b/utils/kafka_producer.py @@ -0,0 +1,18 @@ +import os +import json +from kafka import KafkaProducer + +KAFKA_BROKERS = os.getenv("KAFKA_BROKERS", "localhost:9092") + +_producer = None + +def get_kafka_producer(): + global _producer + if _producer is None: + _producer = KafkaProducer( + bootstrap_servers=[KAFKA_BROKERS], + value_serializer=lambda v: json.dumps(v).encode("utf-8"), + key_serializer=lambda k: k.encode('utf-8') if k else None + ) + return _producer + diff --git a/utils/redis_client.py b/utils/redis_client.py new file mode 100644 index 0000000..96263e3 --- /dev/null +++ b/utils/redis_client.py @@ -0,0 +1,10 @@ +import os +import redis + +REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379") + +# Thread-safe connection pool for Flask +redis_pool = redis.ConnectionPool.from_url(REDIS_URL, decode_responses=True) + +def get_redis_client(): + return redis.Redis(connection_pool=redis_pool) \ No newline at end of file diff --git a/worker/kafka_consumer.py b/worker/kafka_consumer.py new file mode 100644 index 0000000..ffb851c --- /dev/null +++ b/worker/kafka_consumer.py @@ -0,0 +1,43 @@ +import os +import json +import redis +from kafka import KafkaConsumer + +REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379") +KAFKA_BROKERS = os.getenv("KAFKA_BROKERS", "localhost:9092") + +def run_worker(): + r = redis.from_url(REDIS_URL, decode_responses=True) + + consumer = KafkaConsumer( + 'user-activity', + bootstrap_servers=[KAFKA_BROKERS], + group_id='analytics-group', + value_serializer=lambda v: json.loads(v.decode('utf-8')), + auto_offset_reset='latest' + ) + + print("👷 Background Python Worker running and streaming 'user-activity' channel...") + + for message in consumer: + try: + payload = message.value + user_id = payload['userId'] + project_id = payload['projectId'] + timestamp = payload['timestamp'] + + redis_key = f"user:history:{user_id}" + + # Execute an atomic pipeline to update score and cap the list to the last 20 elements + pipe = r.pipeline() + pipe.zadd(redis_key, {project_id: timestamp}) + pipe.zremrangebyrank(redis_key, 0, -21) + pipe.execute() + + print(f"Processed interaction: User {user_id} -> Project {project_id}") + + except Exception as e: + print(f"Error compiling incoming event instance step data: {e}") + +if __name__ == '__main__': + run_worker() \ No newline at end of file