diff --git a/hookwise/auth.py b/hookwise/auth.py index 0389d54..b8cbf7c 100644 --- a/hookwise/auth.py +++ b/hookwise/auth.py @@ -28,6 +28,69 @@ def _bp() -> Any: def _register() -> None: from .routes import main_bp + _register_login_routes(main_bp) + _register_2fa_routes(main_bp) + + +def _handle_otp_login(pending_user_id: int) -> Any: + """Handle OTP submission during login.""" + otp = request.form.get("otp") + user = User.query.get(pending_user_id) + + if user and pyotp.TOTP(cast(str, user.otp_secret)).verify(cast(str, otp)): + # Success + session["user_id"] = user.id + session["username"] = user.username + session["role"] = user.role + session.pop("pending_user_id", None) + log_audit("login_2fa", None, f"User {user.username} logged in with 2FA") + return redirect(url_for("main.index")) + + flash("Invalid 2FA code", "danger") + return render_template("login.html", step="2fa") + + +def _handle_credential_login() -> Any: + """Handle username/password submission during login.""" + username = request.form.get("username") + password = request.form.get("password") + + user = User.query.filter_by(username=username).first() + if user and check_password_hash(cast(str, user.password_hash), cast(str, password)): + if user.is_2fa_enabled: + session["pending_user_id"] = user.id + return render_template("login.html", step="2fa") + + session.clear() + session["user_id"] = user.id + session["username"] = user.username + session["role"] = user.role + log_audit("login", None, f"User {username} logged in") + return redirect(url_for("main.index")) + + flash("Invalid username or password", "danger") + return None + + +def _handle_2fa_setup_post(user: User) -> Any: + """Handle OTP verification during 2FA setup.""" + otp = request.form.get("otp") + secret = session.get("pending_otp_secret") + if secret and pyotp.TOTP(cast(str, secret)).verify(cast(str, otp)): + user.otp_secret = secret + user.is_2fa_enabled = True + db.session.commit() + session.pop("pending_otp_secret") + log_audit("2fa_enabled", None, f"User {user.username} enabled 2FA") + flash("2FA has been enabled successfully!", "success") + return redirect(url_for("main.settings")) + flash("Invalid 2FA code", "danger") + return None + + +def _register_login_routes(main_bp: Any) -> None: + """Register login and logout routes.""" + @main_bp.route("/login", methods=["GET", "POST"]) @limiter.limit("5 per minute") def login() -> Any: @@ -37,49 +100,32 @@ def login() -> Any: if request.method == "POST": # Case 1: Submitting OTP (User is in pending state) if pending_user_id and "otp" in request.form: - otp = request.form.get("otp") - user = User.query.get(pending_user_id) - - if user and pyotp.TOTP(cast(str, user.otp_secret)).verify(cast(str, otp)): - # Success - session["user_id"] = user.id - session["username"] = user.username - session["role"] = user.role - session.pop("pending_user_id", None) - log_audit("login_2fa", None, f"User {user.username} logged in with 2FA") - return redirect(url_for("main.index")) - - flash("Invalid 2FA code", "danger") - return render_template("login.html", step="2fa") + return _handle_otp_login(pending_user_id) # Case 2: Submitting Credentials or restarting flow - # If attempting to login with new creds, clear old pending state if pending_user_id: session.pop("pending_user_id", None) - username = request.form.get("username") - password = request.form.get("password") + result = _handle_credential_login() + if result: + return result - user = User.query.filter_by(username=username).first() - if user and check_password_hash(cast(str, user.password_hash), cast(str, password)): - if user.is_2fa_enabled: - session["pending_user_id"] = user.id - return render_template("login.html", step="2fa") + # GET request or failed POST + if request.method == "GET" and "pending_user_id" in session: + session.pop("pending_user_id", None) - session.clear() - session["user_id"] = user.id - session["username"] = user.username - session["role"] = user.role - log_audit("login", None, f"User {username} logged in") - return redirect(url_for("main.index")) + return render_template("login.html") - flash("Invalid username or password", "danger") + @main_bp.route("/logout") + def logout() -> Any: + username = session.get("username") + session.clear() + log_audit("logout", None, f"User {username} logged out") + return redirect(url_for("main.login")) - # GET request - always reset pending state to ensure clean login flow - if "pending_user_id" in session: - session.pop("pending_user_id", None) - return render_template("login.html") +def _register_2fa_routes(main_bp: Any) -> None: + """Register 2FA related routes.""" @main_bp.route("/settings/2fa/setup", methods=["GET", "POST"]) @auth_required @@ -90,19 +136,11 @@ def setup_2fa() -> Any: return redirect(url_for("main.settings")) if request.method == "POST": - otp = request.form.get("otp") - secret = session.get("pending_otp_secret") - if secret and pyotp.TOTP(cast(str, secret)).verify(cast(str, otp)): - user.otp_secret = secret - user.is_2fa_enabled = True - db.session.commit() - session.pop("pending_otp_secret") - log_audit("2fa_enabled", None, f"User {user.username} enabled 2FA") - flash("2FA has been enabled successfully!", "success") - return redirect(url_for("main.settings")) - flash("Invalid 2FA code", "danger") - - # GET: Generate secret and QR code + result = _handle_2fa_setup_post(user) + if result: + return result + + # GET or failed POST: Generate secret and QR code secret = pyotp.random_base32() session["pending_otp_secret"] = secret totp_uri = pyotp.totp.TOTP(secret).provisioning_uri(name=user.username, issuer_name="HookWise") @@ -125,12 +163,5 @@ def disable_2fa() -> Any: flash("2FA has been disabled.", "warning") return redirect(url_for("main.settings")) - @main_bp.route("/logout") - def logout() -> Any: - username = session.get("username") - session.clear() - log_audit("logout", None, f"User {username} logged out") - return redirect(url_for("main.login")) - _register()