mirror of
https://github.com/mytechnotalent/Threat-Modeling-Toolkit.git
synced 2026-03-31 21:10:15 +02:00
175 lines
8.8 KiB
Python
175 lines
8.8 KiB
Python
"""Secure API fixture demonstrating proper defensive patterns.
|
|
|
|
This file contains well-secured API endpoints that should produce
|
|
minimal findings when scanned by TMT. Used to validate that scanners
|
|
do not generate excessive false positives.
|
|
"""
|
|
|
|
import secrets
|
|
from datetime import datetime, timedelta, timezone
|
|
from functools import wraps
|
|
|
|
from flask import Flask, request, jsonify, session
|
|
from flask_limiter import Limiter
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
|
|
app = Flask(__name__)
|
|
app.secret_key = secrets.token_hex(32)
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# Secure session configuration
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
app.config["SESSION_COOKIE_SECURE"] = True
|
|
app.config["SESSION_COOKIE_HTTPONLY"] = True
|
|
app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# Rate limiter setup
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
limiter = Limiter(app=app, default_limits=["100 per hour"])
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# Strict CORS with explicit origin
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
ALLOWED_ORIGINS = ["https://app.example.com"]
|
|
|
|
|
|
@app.after_request
|
|
def add_cors(response):
|
|
"""Add CORS headers with explicit origin allowlist."""
|
|
origin = request.headers.get("Origin", "")
|
|
if origin in ALLOWED_ORIGINS:
|
|
response.headers["Access-Control-Allow-Origin"] = origin
|
|
return response
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# Authentication decorator with login_required check
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def login_required(f):
|
|
"""Decorator that enforces authentication on protected routes."""
|
|
|
|
@wraps(f)
|
|
def decorated(*args, **kwargs):
|
|
"""Check session for authenticated user before proceeding."""
|
|
if "user_id" not in session:
|
|
return jsonify({"error": "Unauthorized"}), 401
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# Secure login with bcrypt-equivalent hashing and session regeneration
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@app.post("/api/login")
|
|
@limiter.limit("5 per minute")
|
|
def login():
|
|
"""Authenticate with rate limiting and session regeneration."""
|
|
schema = LoginSchema()
|
|
data = schema.validate(request.json)
|
|
user = db.users.find_one({"email": data["email"]})
|
|
if user and check_password_hash(user["password"], data["password"]):
|
|
session.regenerate()
|
|
session["user_id"] = str(user["_id"])
|
|
return jsonify({"status": "ok"})
|
|
return jsonify({"error": "Invalid credentials"}), 401
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# Secure invite with rate limit, expiry, and single-use enforcement
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@app.post("/api/invite")
|
|
@login_required
|
|
@limiter.limit("5 per hour")
|
|
def generate_invite():
|
|
"""Generate a time-limited, single-use invitation token."""
|
|
token = secrets.token_urlsafe(32)
|
|
expires_at = datetime.now(timezone.utc) + timedelta(hours=72)
|
|
db.invites.insert_one(
|
|
{
|
|
"token": token,
|
|
"created_by": session["user_id"],
|
|
"expires_at": expires_at,
|
|
"is_used": False,
|
|
"idempotency_key": request.headers.get("Idempotency-Key"),
|
|
}
|
|
)
|
|
return jsonify({"invite_token": token})
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# Atomic invite acceptance with transaction and single-use mark
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@app.post("/api/accept-invite")
|
|
@limiter.limit("10 per hour")
|
|
def accept_invite():
|
|
"""Accept an invitation atomically with single-use enforcement."""
|
|
schema = AcceptInviteSchema()
|
|
data = schema.validate(request.json)
|
|
with db.transaction():
|
|
invite = db.invites.find_one_and_update(
|
|
{
|
|
"token": data["token"],
|
|
"is_used": False,
|
|
"expires_at": {"$gt": datetime.now(timezone.utc)},
|
|
},
|
|
{"$set": {"is_used": True, "used_at": datetime.now(timezone.utc)}},
|
|
)
|
|
if not invite:
|
|
return jsonify({"error": "Invalid or expired invite"}), 400
|
|
db.users.insert_one({"email": data["email"], "role": "member"})
|
|
return jsonify({"status": "account created"})
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# Atomic balance transfer with select_for_update
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@app.post("/api/transfer")
|
|
@login_required
|
|
@limiter.limit("20 per hour")
|
|
def transfer():
|
|
"""Transfer balance atomically with proper locking."""
|
|
schema = TransferSchema()
|
|
data = schema.validate(request.json)
|
|
idempotency_key = request.headers.get("Idempotency-Key")
|
|
with db.transaction():
|
|
sender = db.accounts.find_one_and_update(
|
|
{"user_id": session["user_id"], "balance": {"$gte": data["amount"]}},
|
|
{"$inc": {"balance": -data["amount"]}},
|
|
)
|
|
if not sender:
|
|
return jsonify({"error": "Insufficient funds"}), 400
|
|
db.accounts.update(
|
|
{"user_id": data["to"]}, {"$inc": {"balance": data["amount"]}}
|
|
)
|
|
return jsonify({"status": "transferred"})
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
# Secure logout with session destruction
|
|
# ──────────────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@app.post("/api/logout")
|
|
@login_required
|
|
def logout():
|
|
"""Destroy session and invalidate tokens on logout."""
|
|
user_id = session["user_id"]
|
|
db.tokens.delete_many({"user_id": user_id})
|
|
session.clear()
|
|
return jsonify({"status": "logged out"})
|