Files

175 lines
8.8 KiB
Python

"""Secure API fixture demonstrating proper defensive patterns.
This file contains well-secured API endpoints that should produce
minimal findings when scanned by TMT. Used to validate that scanners
do not generate excessive false positives.
"""
import secrets
from datetime import datetime, timedelta, timezone
from functools import wraps
from flask import Flask, request, jsonify, session
from flask_limiter import Limiter
from werkzeug.security import generate_password_hash, check_password_hash
app = Flask(__name__)
app.secret_key = secrets.token_hex(32)
# ──────────────────────────────────────────────────────────────────────────────
# Secure session configuration
# ──────────────────────────────────────────────────────────────────────────────
app.config["SESSION_COOKIE_SECURE"] = True
app.config["SESSION_COOKIE_HTTPONLY"] = True
app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
# ──────────────────────────────────────────────────────────────────────────────
# Rate limiter setup
# ──────────────────────────────────────────────────────────────────────────────
limiter = Limiter(app=app, default_limits=["100 per hour"])
# ──────────────────────────────────────────────────────────────────────────────
# Strict CORS with explicit origin
# ──────────────────────────────────────────────────────────────────────────────
ALLOWED_ORIGINS = ["https://app.example.com"]
@app.after_request
def add_cors(response):
"""Add CORS headers with explicit origin allowlist."""
origin = request.headers.get("Origin", "")
if origin in ALLOWED_ORIGINS:
response.headers["Access-Control-Allow-Origin"] = origin
return response
# ──────────────────────────────────────────────────────────────────────────────
# Authentication decorator with login_required check
# ──────────────────────────────────────────────────────────────────────────────
def login_required(f):
"""Decorator that enforces authentication on protected routes."""
@wraps(f)
def decorated(*args, **kwargs):
"""Check session for authenticated user before proceeding."""
if "user_id" not in session:
return jsonify({"error": "Unauthorized"}), 401
return f(*args, **kwargs)
return decorated
# ──────────────────────────────────────────────────────────────────────────────
# Secure login with bcrypt-equivalent hashing and session regeneration
# ──────────────────────────────────────────────────────────────────────────────
@app.post("/api/login")
@limiter.limit("5 per minute")
def login():
"""Authenticate with rate limiting and session regeneration."""
schema = LoginSchema()
data = schema.validate(request.json)
user = db.users.find_one({"email": data["email"]})
if user and check_password_hash(user["password"], data["password"]):
session.regenerate()
session["user_id"] = str(user["_id"])
return jsonify({"status": "ok"})
return jsonify({"error": "Invalid credentials"}), 401
# ──────────────────────────────────────────────────────────────────────────────
# Secure invite with rate limit, expiry, and single-use enforcement
# ──────────────────────────────────────────────────────────────────────────────
@app.post("/api/invite")
@login_required
@limiter.limit("5 per hour")
def generate_invite():
"""Generate a time-limited, single-use invitation token."""
token = secrets.token_urlsafe(32)
expires_at = datetime.now(timezone.utc) + timedelta(hours=72)
db.invites.insert_one(
{
"token": token,
"created_by": session["user_id"],
"expires_at": expires_at,
"is_used": False,
"idempotency_key": request.headers.get("Idempotency-Key"),
}
)
return jsonify({"invite_token": token})
# ──────────────────────────────────────────────────────────────────────────────
# Atomic invite acceptance with transaction and single-use mark
# ──────────────────────────────────────────────────────────────────────────────
@app.post("/api/accept-invite")
@limiter.limit("10 per hour")
def accept_invite():
"""Accept an invitation atomically with single-use enforcement."""
schema = AcceptInviteSchema()
data = schema.validate(request.json)
with db.transaction():
invite = db.invites.find_one_and_update(
{
"token": data["token"],
"is_used": False,
"expires_at": {"$gt": datetime.now(timezone.utc)},
},
{"$set": {"is_used": True, "used_at": datetime.now(timezone.utc)}},
)
if not invite:
return jsonify({"error": "Invalid or expired invite"}), 400
db.users.insert_one({"email": data["email"], "role": "member"})
return jsonify({"status": "account created"})
# ──────────────────────────────────────────────────────────────────────────────
# Atomic balance transfer with select_for_update
# ──────────────────────────────────────────────────────────────────────────────
@app.post("/api/transfer")
@login_required
@limiter.limit("20 per hour")
def transfer():
"""Transfer balance atomically with proper locking."""
schema = TransferSchema()
data = schema.validate(request.json)
idempotency_key = request.headers.get("Idempotency-Key")
with db.transaction():
sender = db.accounts.find_one_and_update(
{"user_id": session["user_id"], "balance": {"$gte": data["amount"]}},
{"$inc": {"balance": -data["amount"]}},
)
if not sender:
return jsonify({"error": "Insufficient funds"}), 400
db.accounts.update(
{"user_id": data["to"]}, {"$inc": {"balance": data["amount"]}}
)
return jsonify({"status": "transferred"})
# ──────────────────────────────────────────────────────────────────────────────
# Secure logout with session destruction
# ──────────────────────────────────────────────────────────────────────────────
@app.post("/api/logout")
@login_required
def logout():
"""Destroy session and invalidate tokens on logout."""
user_id = session["user_id"]
db.tokens.delete_many({"user_id": user_id})
session.clear()
return jsonify({"status": "logged out"})