AI SaaS Authentication: OAuth & Security
Authentication is the gatekeeper of your AI SaaS. A weak implementation leaks user credentials, allows account takeover, and opens your LLM API to abuse. This article covers OAuth 2.0 for user sign-up, JWTs for stateless sessions, secure API key generation, and token refresh strategies that balance security with user experience.
What is OAuth 2.0 and Why Use It?
OAuth 2.0 is an open standard for delegated authorization; sign-in with providers such as Google adds OpenID Connect, an identity layer built on top of it. Instead of collecting passwords yourself, your app redirects users to a trusted identity provider (Google, GitHub, or your own identity server), which authenticates them and sends them back with a short-lived authorization code. Your backend exchanges that code for tokens that identify the user and grant access to protected resources on their behalf. This is safer than handling passwords because users never type their password into your app, and you have no password database to leak.
Implementing OAuth 2.0 Flow with Authorization Code Grant
The Authorization Code Grant flow is the most common for web applications:
- User clicks "Sign up with Google"
- Your app redirects to Google with your client ID
- Google shows a login screen and consent dialog
- User grants permission; Google redirects back with an authorization code
- Your backend exchanges the code for tokens (an access token and an ID token)
- Your app creates a session and returns a JWT
```python
# Backend: OAuth 2.0 authorization code grant flow
import os
import secrets
from datetime import datetime, timedelta
from urllib.parse import urlencode

import httpx
import jwt  # PyJWT
import redis
from fastapi import FastAPI, HTTPException

app = FastAPI()

# OAuth configuration
GOOGLE_CLIENT_ID = os.environ["GOOGLE_CLIENT_ID"]
GOOGLE_CLIENT_SECRET = os.environ["GOOGLE_CLIENT_SECRET"]
GOOGLE_REDIRECT_URI = "https://yourdomain.com/api/v1/auth/google/callback"
JWT_SECRET = os.environ["JWT_SECRET"]

# Assumption: the state cache is Redis; REDIS_URL is a placeholder variable name
cache = redis.Redis.from_url(os.environ["REDIS_URL"])


@app.get("/api/v1/auth/google/login")
async def initiate_google_login():
    """
    Build the URL that sends the user to Google's OAuth endpoint.
    The 'state' parameter prevents CSRF attacks.
    """
    # Always generate the state server-side rather than accepting it from the caller
    state = secrets.token_urlsafe(32)

    # urlencode escapes the redirect URI and the spaces in the scope list
    query = urlencode({
        "client_id": GOOGLE_CLIENT_ID,
        "redirect_uri": GOOGLE_REDIRECT_URI,
        "response_type": "code",
        "scope": "openid email profile",
        "state": state,
    })
    google_auth_url = f"https://accounts.google.com/o/oauth2/v2/auth?{query}"

    # Store state in Redis with 10-minute expiry to prevent CSRF
    cache.setex(f"oauth_state:{state}", 600, "valid")
    return {"redirect_url": google_auth_url}


@app.get("/api/v1/auth/google/callback")
async def handle_google_callback(code: str, state: str):
    """
    Google redirects the browser here (a GET request) with the authorization code.
    Exchange the code for tokens and create a user session.
    """
    # Verify the state token to prevent CSRF. delete() returns the number of
    # keys removed, so a known state is consumed and cannot be replayed.
    if not cache.delete(f"oauth_state:{state}"):
        raise HTTPException(status_code=400, detail="Invalid or expired state")

    # Exchange authorization code for tokens
    async with httpx.AsyncClient() as client:
        token_response = await client.post(
            "https://oauth2.googleapis.com/token",
            data={
                "client_id": GOOGLE_CLIENT_ID,
                "client_secret": GOOGLE_CLIENT_SECRET,
                "code": code,
                "grant_type": "authorization_code",
                "redirect_uri": GOOGLE_REDIRECT_URI,
            },
        )
    if token_response.status_code != 200:
        raise HTTPException(status_code=400, detail="Failed to exchange code")
    tokens = token_response.json()

    # Decode Google's ID token (contains user info).
    # Signature verification is skipped here only for brevity. In production,
    # verify the signature against Google's published keys and check the
    # 'aud' and 'iss' claims.
    id_token = tokens["id_token"]
    user_info = jwt.decode(id_token, options={"verify_signature": False})

    # Create or update user in database
    # (create_or_get_user and generate_refresh_token are application helpers
    # defined elsewhere)
    email = user_info["email"]
    user = create_or_get_user(email, user_info.get("name", ""))

    # Generate your own JWT token
    access_token = generate_jwt_token(
        user_id=user.id,
        organization_id=user.organization_id,
        expires_in=3600,  # 1 hour
    )
    refresh_token = generate_refresh_token(user_id=user.id)
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "Bearer",
        "expires_in": 3600,
    }


def generate_jwt_token(user_id: str, organization_id: str, expires_in: int) -> str:
    """Create a signed JWT token."""
    now = datetime.utcnow()
    payload = {
        "user_id": user_id,
        "organization_id": organization_id,
        "exp": now + timedelta(seconds=expires_in),
        "iat": now,
    }
    return jwt.encode(payload, JWT_SECRET, algorithm="HS256")
```
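The callback above reads the ID token's claims without checking them. As a minimal sketch of the claim checks that would normally follow signature verification, the helper below validates issuer, audience, expiry, and email verification on an already-decoded claims dictionary. The function name `validate_id_token_claims` is hypothetical, and the sketch assumes the signature has been verified separately.

```python
import time
from typing import Optional

# Issuer values Google uses in its ID tokens
GOOGLE_ISSUERS = {"https://accounts.google.com", "accounts.google.com"}


def validate_id_token_claims(
    claims: dict, client_id: str, now: Optional[float] = None
) -> dict:
    """Check issuer, audience, expiry, and email verification.

    Hypothetical helper: assumes the token signature was already verified.
    Raises ValueError on the first failed check, otherwise returns the claims.
    """
    now = time.time() if now is None else now

    if claims.get("iss") not in GOOGLE_ISSUERS:
        raise ValueError("unexpected issuer")

    # 'aud' may be a single string or a list of audiences
    aud = claims.get("aud")
    audiences = aud if isinstance(aud, list) else [aud]
    if client_id not in audiences:
        raise ValueError("token was issued for a different client")

    exp = claims.get("exp")
    if not isinstance(exp, (int, float)) or exp <= now:
        raise ValueError("token has expired")

    if claims.get("email_verified") is not True:
        raise ValueError("email address is not verified")

    return claims
```

Calling this before `create_or_get_user` would reject tokens minted for another application or carrying an unverified email address.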
Secure API Key Management
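API keys allow programmatic access (for third-party integrations or CLI tools). Never store plaintext keys; store only a hash. Because the keys are long random strings, a fast hash such as SHA-256 is adequate here, unlike user-chosen passwords, which need a slow hash such as bcrypt or Argon2.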
```python
# API key generation and validation
import hashlib
import secrets
from datetime import datetime

from fastapi import HTTPException, Request

# `app`, `db`, and the `APIKey` model are assumed to be defined elsewhere


@app.post("/api/v1/api-keys")
async def create_api_key(request: Request, name: str):
    """Generate a new API key for the authenticated user."""
    # organization_id is expected to be set on request.state by auth middleware;
    # getattr avoids an AttributeError when it is missing
    organization_id = getattr(request.state, "organization_id", None)
    if not organization_id:
        raise HTTPException(status_code=401, detail="Unauthorized")

    # Generate a random secret
    secret = secrets.token_urlsafe(32)

    # Hash the secret so the plaintext never reaches the database
    secret_hash = hashlib.sha256(secret.encode()).hexdigest()

    # Store only the hash in the database
    key_record = APIKey(
        organization_id=organization_id,
        name=name,
        key_hash=secret_hash,
        created_at=datetime.utcnow(),
    )
    db.add(key_record)
    db.commit()

    # Return the secret only once (user must save it)
    # Format: sk_<org_id>_<secret>
    api_key = f"sk_{organization_id}_{secret}"
    return {
        "api_key": api_key,
        "warning": "Save this key securely. You won't see it again.",
    }


def validate_api_key(api_key_string: str) -> dict | None:
    """Validate an API key and return the organization."""
    # The secret may itself contain underscores, so split at most twice.
    # This parsing assumes organization IDs contain no underscores.
    parts = api_key_string.split("_", 2)
    if len(parts) != 3 or parts[0] != "sk":
        return None
    _, org_id, secret = parts

    # Hash the secret and compare with stored hash
    secret_hash = hashlib.sha256(secret.encode()).hexdigest()
    key_record = db.query(APIKey).filter(
        APIKey.key_hash == secret_hash,
        APIKey.organization_id == org_id,
    ).first()
    if not key_record or not key_record.is_active:
        return None

    return {
        "organization_id": org_id,
        "key_id": key_record.id,
        "rate_limit": key_record.rate_limit_per_minute,
    }
```
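To see the whole key lifecycle without a database, here is a minimal, self-contained sketch that issues, verifies, and revokes keys against an in-memory dictionary. It is a variant of the scheme above, not a drop-in replacement: it embeds a random hexadecimal key ID instead of the organization ID (so the prefix never contains an underscore) and compares hashes with `hmac.compare_digest`. The names `create_key`, `verify_key`, `revoke_key`, and `_KEYS` are all hypothetical.

```python
import hashlib
import hmac
import secrets
from typing import Optional

# In-memory stand-in for the api_keys table: key_id -> record
_KEYS: dict = {}


def create_key(organization_id: str) -> str:
    """Return a new key of the form sk_<key_id>_<secret>; only the hash is kept."""
    key_id = secrets.token_hex(8)       # hex only, so it never contains "_"
    secret = secrets.token_urlsafe(32)
    _KEYS[key_id] = {
        "organization_id": organization_id,
        "key_hash": hashlib.sha256(secret.encode()).hexdigest(),
        "is_active": True,
    }
    return f"sk_{key_id}_{secret}"


def verify_key(api_key: str) -> Optional[str]:
    """Return the owning organization ID, or None if the key is invalid."""
    parts = api_key.split("_", 2)       # the secret itself may contain "_"
    if len(parts) != 3 or parts[0] != "sk":
        return None
    _, key_id, secret = parts
    record = _KEYS.get(key_id)
    if record is None or not record["is_active"]:
        return None
    candidate = hashlib.sha256(secret.encode()).hexdigest()
    # compare_digest does not short-circuit at the first differing character
    if not hmac.compare_digest(candidate, record["key_hash"]):
        return None
    return record["organization_id"]


def revoke_key(key_id: str) -> None:
    """Deactivate a key, for example after a leak."""
    if key_id in _KEYS:
        _KEYS[key_id]["is_active"] = False
```

Looking a key up by its ID and then comparing hashes keeps the lookup indexed while leaving the secret itself out of any query.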
Token Refresh and Revocation
Short-lived access tokens (one hour in the examples here) limit the damage of a stolen token, but users should not have to re-authenticate every hour. A refresh token solves this: the client exchanges it for a new access token when the old one expires.
```python
import hashlib
from datetime import datetime

from fastapi import Body, HTTPException

# `app`, `db`, the `RefreshToken` model, and generate_jwt_token are assumed
# to be defined elsewhere


@app.post("/api/v1/auth/refresh")
async def refresh_access_token(refresh_token: str = Body(..., embed=True)):
    """Exchange a refresh token for a new access token."""
    # The token arrives in the JSON body, not the query string, so it stays
    # out of URLs and access logs.
    # Verify the refresh token (stored as a hash in the database)
    token_hash = hashlib.sha256(refresh_token.encode()).hexdigest()
    token_record = db.query(RefreshToken).filter(
        RefreshToken.token_hash == token_hash,
        RefreshToken.revoked_at.is_(None),
    ).first()
    if not token_record or token_record.expires_at < datetime.utcnow():
        raise HTTPException(status_code=401, detail="Invalid or expired refresh token")

    user = token_record.user

    # Generate a new access token
    new_access_token = generate_jwt_token(
        user_id=user.id,
        organization_id=user.organization_id,
        expires_in=3600,
    )
    return {
        "access_token": new_access_token,
        "token_type": "Bearer",
        "expires_in": 3600,
    }


@app.post("/api/v1/auth/revoke")
async def revoke_token(refresh_token: str = Body(..., embed=True)):
    """Revoke a refresh token (logout)."""
    token_hash = hashlib.sha256(refresh_token.encode()).hexdigest()
    token_record = db.query(RefreshToken).filter(
        RefreshToken.token_hash == token_hash
    ).first()
    if token_record:
        token_record.revoked_at = datetime.utcnow()
        db.commit()
    # Respond the same way whether or not the token existed
    return {"status": "revoked"}
```
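The refresh endpoint above keeps accepting the same refresh token until it expires or is revoked. One common hardening, not shown in the original code, is rotation: each refresh token is valid for a single use and is replaced on every refresh. The sketch below illustrates the idea with an in-memory dictionary standing in for the database; `issue_refresh_token`, `rotate_refresh_token`, and `_TOKENS` are hypothetical names.

```python
import hashlib
import secrets
import time
from typing import Optional

REFRESH_TTL_SECONDS = 30 * 24 * 3600  # 30 days

# In-memory stand-in for the refresh_tokens table: token_hash -> record
_TOKENS: dict = {}


def _hash(token: str) -> str:
    return hashlib.sha256(token.encode()).hexdigest()


def issue_refresh_token(user_id: str, now: Optional[float] = None) -> str:
    """Create a refresh token and store only its hash."""
    now = time.time() if now is None else now
    token = secrets.token_urlsafe(32)
    _TOKENS[_hash(token)] = {
        "user_id": user_id,
        "expires_at": now + REFRESH_TTL_SECONDS,
        "revoked": False,
    }
    return token


def rotate_refresh_token(token: str, now: Optional[float] = None) -> Optional[str]:
    """Revoke the presented token and return its replacement, or None if invalid."""
    now = time.time() if now is None else now
    record = _TOKENS.get(_hash(token))
    if record is None or record["revoked"] or record["expires_at"] <= now:
        return None
    record["revoked"] = True  # each refresh token works exactly once
    return issue_refresh_token(record["user_id"], now)
```

With rotation, a stolen refresh token stops working as soon as the legitimate client refreshes, at the cost of the client having to store the replacement after every refresh.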
Authentication Security Checklist
| Requirement | Implementation |
|---|---|
| HTTPS only | Enforce HTTPS in production; never send tokens over HTTP |
| Secure cookies | Mark cookies as HttpOnly, Secure, SameSite=Strict |
| Token expiration | Access tokens expire in 1 hour; refresh tokens in 30 days |
| CSRF protection | Use state parameter in OAuth flows; validate origin headers |
| Password hashing | Use bcrypt or Argon2, not SHA-256 for passwords |
| Key rotation | Rotate signing keys annually; support key versioning |
| Rate limiting | Limit login attempts to 5 per minute per IP |
| MFA | Offer TOTP or WebAuthn for sensitive operations |
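As an illustration of the rate-limiting row, here is a minimal sliding-window limiter that allows five attempts per minute per IP. It is a single-process, in-memory sketch; the class name `LoginRateLimiter` is hypothetical, and a deployment with several servers would need a shared store such as Redis instead.

```python
import time
from collections import defaultdict, deque
from typing import Optional


class LoginRateLimiter:
    """Allow at most `limit` attempts per `window` seconds for each client IP."""

    def __init__(self, limit: int = 5, window: float = 60.0):
        self.limit = limit
        self.window = window
        self._attempts = defaultdict(deque)  # ip -> timestamps of recent attempts

    def allow(self, ip: str, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        attempts = self._attempts[ip]
        # Drop timestamps that have aged out of the window
        while attempts and now - attempts[0] >= self.window:
            attempts.popleft()
        if len(attempts) >= self.limit:
            return False
        attempts.append(now)
        return True
```

A login handler would call `allow(client_ip)` first and respond with HTTP 429 when it returns `False`.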
Key Takeaways
- Use OAuth 2.0 with a trusted identity provider (Google, GitHub) instead of storing passwords directly.
- Store refresh tokens in the database with expiration and revocation flags; short-lived access tokens can be stateless JWTs.
- Hash API keys and secrets before storing them; never store plaintext credentials.
- Validate state tokens in OAuth flows to prevent CSRF attacks.
- Rotate signing keys and API keys regularly; support key versioning for zero-downtime updates.
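Key versioning can be sketched with nothing more than HMAC: each token names the key version that signed it, new tokens use the current version, and older versions stay verifiable until their tokens expire. The token format, key ring, and function names below are hypothetical and only illustrate the idea; JWT libraries carry the same information in the `kid` header.

```python
import hashlib
import hmac
from typing import Optional

# Hypothetical key ring: version -> secret. Placeholder values only.
# Older versions stay listed until every token signed with them has expired.
SIGNING_KEYS = {"v1": b"old-secret-change-me", "v2": b"new-secret-change-me"}
CURRENT_VERSION = "v2"


def sign(message: str) -> str:
    """Return '<version>.<message>.<hex signature>' using the current key."""
    mac = hmac.new(SIGNING_KEYS[CURRENT_VERSION], message.encode(), hashlib.sha256)
    return f"{CURRENT_VERSION}.{message}.{mac.hexdigest()}"


def verify(token: str) -> Optional[str]:
    """Return the message if the signature matches the key named in the token."""
    try:
        version, rest = token.split(".", 1)
        message, signature = rest.rsplit(".", 1)
    except ValueError:
        return None
    key = SIGNING_KEYS.get(version)
    if key is None:  # unknown or retired key version
        return None
    expected = hmac.new(key, message.encode(), hashlib.sha256).hexdigest()
    # Compare as bytes so non-ASCII input cannot raise a TypeError
    if hmac.compare_digest(expected.encode(), signature.encode()):
        return message
    return None
```

Rotating then means adding a `v3` entry, switching `CURRENT_VERSION`, and deleting `v1` once its last token has expired, with no downtime in between.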
Frequently Asked Questions
Should I use JWTs or session cookies for authentication?
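JWTs are stateless and scale well across distributed systems, but they cannot be revoked before they expire; session cookies need a central store (such as Redis) to check validity, which makes revocation immediate. For SaaS, JWTs are more common. For browser clients, prefer a Secure, HttpOnly cookie: localStorage is readable by any script on the page, so a single XSS bug exposes the token.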
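For the cookie option, the attributes from the checklist can be set with the standard library alone. The sketch below builds a `Set-Cookie` header value; the function name `build_session_cookie` and the cookie name `access_token` are assumptions for illustration.

```python
from http.cookies import SimpleCookie


def build_session_cookie(token: str, max_age: int = 3600) -> str:
    """Return a Set-Cookie header value carrying the token with hardened attributes."""
    cookie = SimpleCookie()
    cookie["access_token"] = token
    morsel = cookie["access_token"]
    morsel["httponly"] = True       # not readable from JavaScript
    morsel["secure"] = True         # only sent over HTTPS
    morsel["samesite"] = "Strict"   # not sent on cross-site requests
    morsel["path"] = "/"
    morsel["max-age"] = max_age     # expire together with the access token
    return morsel.OutputString()
```

Web frameworks expose the same attributes through their own response helpers, so in practice this is usually a single call on the response object.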
What if a user's API key is leaked?
Immediately revoke the key by marking it as inactive in the database. No more requests with that key will be accepted. Advise the user to generate a new key. Store API key usage logs to detect anomalous activity (e.g., requests from unusual IPs).
Can I use the same token for web and mobile clients?
Yes, but mobile apps should not store tokens in localStorage (which is browser-specific). Use secure storage (Keychain on iOS, Keystore on Android) for tokens. Web apps can use HTTP-only cookies (more secure than localStorage).
How do I handle token expiration on the frontend?
Intercept HTTP 401 responses and automatically refresh the token using the refresh endpoint, then retry the original request once. If the refresh fails (e.g., the refresh token has expired), redirect the user to login. axios supports interceptors for this; the browser's fetch API has none built in, so wrap it in a small helper function.
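The retry-once pattern is the same in any client. It is sketched here in Python for consistency with the rest of the article; `request_with_refresh`, `SessionExpired`, and the `send` and `refresh` callables are hypothetical names supplied by the caller, not part of any library.

```python
from typing import Callable, Tuple


class SessionExpired(Exception):
    """Raised when refreshing no longer works and the user must log in again."""


def request_with_refresh(
    send: Callable[[str], Tuple[int, object]],
    refresh: Callable[[], str],
    access_token: str,
) -> Tuple[object, str]:
    """Call send(token); on a 401, refresh once and retry.

    `send` returns (status_code, body). `refresh` returns a new access token
    or raises SessionExpired. Returns the response body together with the
    access token that ended up being used, so the caller can store it.
    """
    status, body = send(access_token)
    if status != 401:
        return body, access_token

    access_token = refresh()           # may raise SessionExpired
    status, body = send(access_token)  # retry exactly once to avoid loops
    if status == 401:
        raise SessionExpired("request rejected even after refresh")
    return body, access_token
```

Limiting the retry to a single attempt matters: without it, a permanently rejected token would cause an endless refresh loop.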