Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 84 additions & 53 deletions hookwise/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,69 @@ def _bp() -> Any:
def _register() -> None:
from .routes import main_bp

_register_login_routes(main_bp)
_register_2fa_routes(main_bp)


def _handle_otp_login(pending_user_id: int) -> Any:
"""Handle OTP submission during login."""
otp = request.form.get("otp")
user = User.query.get(pending_user_id)

if user and pyotp.TOTP(cast(str, user.otp_secret)).verify(cast(str, otp)):
# Success
session["user_id"] = user.id
session["username"] = user.username
session["role"] = user.role
session.pop("pending_user_id", None)
log_audit("login_2fa", None, f"User {user.username} logged in with 2FA")
return redirect(url_for("main.index"))

flash("Invalid 2FA code", "danger")
return render_template("login.html", step="2fa")


def _handle_credential_login() -> Any:
"""Handle username/password submission during login."""
username = request.form.get("username")
password = request.form.get("password")

user = User.query.filter_by(username=username).first()
if user and check_password_hash(cast(str, user.password_hash), cast(str, password)):
if user.is_2fa_enabled:
session["pending_user_id"] = user.id
return render_template("login.html", step="2fa")

session.clear()
session["user_id"] = user.id
session["username"] = user.username
session["role"] = user.role
log_audit("login", None, f"User {username} logged in")
return redirect(url_for("main.index"))

flash("Invalid username or password", "danger")
return None


def _handle_2fa_setup_post(user: User) -> Any:
"""Handle OTP verification during 2FA setup."""
otp = request.form.get("otp")
secret = session.get("pending_otp_secret")
if secret and pyotp.TOTP(cast(str, secret)).verify(cast(str, otp)):
user.otp_secret = secret
user.is_2fa_enabled = True
db.session.commit()
session.pop("pending_otp_secret")
log_audit("2fa_enabled", None, f"User {user.username} enabled 2FA")
flash("2FA has been enabled successfully!", "success")
return redirect(url_for("main.settings"))
flash("Invalid 2FA code", "danger")
return None


def _register_login_routes(main_bp: Any) -> None:
"""Register login and logout routes."""

@main_bp.route("/login", methods=["GET", "POST"])
@limiter.limit("5 per minute")
def login() -> Any:
Expand All @@ -37,49 +100,32 @@ def login() -> Any:
if request.method == "POST":
# Case 1: Submitting OTP (User is in pending state)
if pending_user_id and "otp" in request.form:
otp = request.form.get("otp")
user = User.query.get(pending_user_id)

if user and pyotp.TOTP(cast(str, user.otp_secret)).verify(cast(str, otp)):
# Success
session["user_id"] = user.id
session["username"] = user.username
session["role"] = user.role
session.pop("pending_user_id", None)
log_audit("login_2fa", None, f"User {user.username} logged in with 2FA")
return redirect(url_for("main.index"))

flash("Invalid 2FA code", "danger")
return render_template("login.html", step="2fa")
return _handle_otp_login(pending_user_id)

# Case 2: Submitting Credentials or restarting flow
# If attempting to login with new creds, clear old pending state
if pending_user_id:
session.pop("pending_user_id", None)

username = request.form.get("username")
password = request.form.get("password")
result = _handle_credential_login()
if result:
return result

user = User.query.filter_by(username=username).first()
if user and check_password_hash(cast(str, user.password_hash), cast(str, password)):
if user.is_2fa_enabled:
session["pending_user_id"] = user.id
return render_template("login.html", step="2fa")
# GET request or failed POST
if request.method == "GET" and "pending_user_id" in session:
session.pop("pending_user_id", None)

session.clear()
session["user_id"] = user.id
session["username"] = user.username
session["role"] = user.role
log_audit("login", None, f"User {username} logged in")
return redirect(url_for("main.index"))
return render_template("login.html")

flash("Invalid username or password", "danger")
@main_bp.route("/logout")
def logout() -> Any:
username = session.get("username")
session.clear()
log_audit("logout", None, f"User {username} logged out")
return redirect(url_for("main.login"))

# GET request - always reset pending state to ensure clean login flow
if "pending_user_id" in session:
session.pop("pending_user_id", None)

return render_template("login.html")
def _register_2fa_routes(main_bp: Any) -> None:
"""Register 2FA related routes."""

@main_bp.route("/settings/2fa/setup", methods=["GET", "POST"])
@auth_required
Expand All @@ -90,19 +136,11 @@ def setup_2fa() -> Any:
return redirect(url_for("main.settings"))

if request.method == "POST":
otp = request.form.get("otp")
secret = session.get("pending_otp_secret")
if secret and pyotp.TOTP(cast(str, secret)).verify(cast(str, otp)):
user.otp_secret = secret
user.is_2fa_enabled = True
db.session.commit()
session.pop("pending_otp_secret")
log_audit("2fa_enabled", None, f"User {user.username} enabled 2FA")
flash("2FA has been enabled successfully!", "success")
return redirect(url_for("main.settings"))
flash("Invalid 2FA code", "danger")

# GET: Generate secret and QR code
result = _handle_2fa_setup_post(user)
if result:
return result

# GET or failed POST: Generate secret and QR code
secret = pyotp.random_base32()
session["pending_otp_secret"] = secret
totp_uri = pyotp.totp.TOTP(secret).provisioning_uri(name=user.username, issuer_name="HookWise")
Expand All @@ -125,12 +163,5 @@ def disable_2fa() -> Any:
flash("2FA has been disabled.", "warning")
return redirect(url_for("main.settings"))

@main_bp.route("/logout")
def logout() -> Any:
username = session.get("username")
session.clear()
log_audit("logout", None, f"User {username} logged out")
return redirect(url_for("main.login"))


_register()