266 lines
No EOL
9.3 KiB
Python
266 lines
No EOL
9.3 KiB
Python
import os
|
|
import logging
|
|
from typing import Optional, Dict, Any, Tuple
|
|
from fastapi import HTTPException, Request
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from dotenv import load_dotenv
|
|
|
|
logger = logging.getLogger(__name__)
|
|
load_dotenv()
|
|
|
|
|
|
class ClaudeCodeAuthManager:
|
|
"""Manages authentication for Claude Code SDK integration."""
|
|
|
|
def __init__(self):
|
|
self.env_api_key = os.getenv("API_KEY") # Environment API key
|
|
self.auth_method = self._detect_auth_method()
|
|
self.auth_status = self._validate_auth_method()
|
|
|
|
def get_api_key(self):
|
|
"""Get the active API key (environment or runtime-generated)."""
|
|
# Try to import runtime_api_key from main module
|
|
try:
|
|
import main
|
|
if hasattr(main, 'runtime_api_key') and main.runtime_api_key:
|
|
return main.runtime_api_key
|
|
except ImportError:
|
|
pass
|
|
|
|
# Fall back to environment variable
|
|
return self.env_api_key
|
|
|
|
def _detect_auth_method(self) -> str:
|
|
"""Detect which Claude Code authentication method is configured."""
|
|
if os.getenv("CLAUDE_CODE_USE_BEDROCK") == "1":
|
|
return "bedrock"
|
|
elif os.getenv("CLAUDE_CODE_USE_VERTEX") == "1":
|
|
return "vertex"
|
|
elif os.getenv("ANTHROPIC_API_KEY"):
|
|
return "anthropic"
|
|
else:
|
|
# If no explicit method, assume Claude Code CLI is already authenticated
|
|
return "claude_cli"
|
|
|
|
def _validate_auth_method(self) -> Dict[str, Any]:
|
|
"""Validate the detected authentication method."""
|
|
method = self.auth_method
|
|
status = {
|
|
"method": method,
|
|
"valid": False,
|
|
"errors": [],
|
|
"config": {}
|
|
}
|
|
|
|
if method == "anthropic":
|
|
status.update(self._validate_anthropic_auth())
|
|
elif method == "bedrock":
|
|
status.update(self._validate_bedrock_auth())
|
|
elif method == "vertex":
|
|
status.update(self._validate_vertex_auth())
|
|
elif method == "claude_cli":
|
|
status.update(self._validate_claude_cli_auth())
|
|
else:
|
|
status["errors"].append("No Claude Code authentication method configured")
|
|
|
|
return status
|
|
|
|
def _validate_anthropic_auth(self) -> Dict[str, Any]:
|
|
"""Validate Anthropic API key authentication."""
|
|
api_key = os.getenv("ANTHROPIC_API_KEY")
|
|
if not api_key:
|
|
return {
|
|
"valid": False,
|
|
"errors": ["ANTHROPIC_API_KEY environment variable not set"],
|
|
"config": {}
|
|
}
|
|
|
|
if len(api_key) < 10: # Basic sanity check
|
|
return {
|
|
"valid": False,
|
|
"errors": ["ANTHROPIC_API_KEY appears to be invalid (too short)"],
|
|
"config": {}
|
|
}
|
|
|
|
return {
|
|
"valid": True,
|
|
"errors": [],
|
|
"config": {
|
|
"api_key_present": True,
|
|
"api_key_length": len(api_key)
|
|
}
|
|
}
|
|
|
|
def _validate_bedrock_auth(self) -> Dict[str, Any]:
|
|
"""Validate AWS Bedrock authentication."""
|
|
errors = []
|
|
config = {}
|
|
|
|
# Check if Bedrock is enabled
|
|
if os.getenv("CLAUDE_CODE_USE_BEDROCK") != "1":
|
|
errors.append("CLAUDE_CODE_USE_BEDROCK must be set to '1'")
|
|
|
|
# Check AWS credentials
|
|
aws_access_key = os.getenv("AWS_ACCESS_KEY_ID")
|
|
aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
|
|
aws_region = os.getenv("AWS_REGION", os.getenv("AWS_DEFAULT_REGION"))
|
|
|
|
if not aws_access_key:
|
|
errors.append("AWS_ACCESS_KEY_ID environment variable not set")
|
|
if not aws_secret_key:
|
|
errors.append("AWS_SECRET_ACCESS_KEY environment variable not set")
|
|
if not aws_region:
|
|
errors.append("AWS_REGION or AWS_DEFAULT_REGION environment variable not set")
|
|
|
|
config.update({
|
|
"aws_access_key_present": bool(aws_access_key),
|
|
"aws_secret_key_present": bool(aws_secret_key),
|
|
"aws_region": aws_region,
|
|
})
|
|
|
|
return {
|
|
"valid": len(errors) == 0,
|
|
"errors": errors,
|
|
"config": config
|
|
}
|
|
|
|
def _validate_vertex_auth(self) -> Dict[str, Any]:
|
|
"""Validate Google Vertex AI authentication."""
|
|
errors = []
|
|
config = {}
|
|
|
|
# Check if Vertex is enabled
|
|
if os.getenv("CLAUDE_CODE_USE_VERTEX") != "1":
|
|
errors.append("CLAUDE_CODE_USE_VERTEX must be set to '1'")
|
|
|
|
# Check required Vertex AI environment variables
|
|
project_id = os.getenv("ANTHROPIC_VERTEX_PROJECT_ID")
|
|
region = os.getenv("CLOUD_ML_REGION")
|
|
|
|
if not project_id:
|
|
errors.append("ANTHROPIC_VERTEX_PROJECT_ID environment variable not set")
|
|
if not region:
|
|
errors.append("CLOUD_ML_REGION environment variable not set")
|
|
|
|
config.update({
|
|
"project_id": project_id,
|
|
"region": region,
|
|
})
|
|
|
|
return {
|
|
"valid": len(errors) == 0,
|
|
"errors": errors,
|
|
"config": config
|
|
}
|
|
|
|
def _validate_claude_cli_auth(self) -> Dict[str, Any]:
|
|
"""Validate that Claude Code CLI is already authenticated."""
|
|
# For CLI authentication, we assume it's valid and let the SDK handle auth
|
|
# The actual validation will happen when we try to use the SDK
|
|
return {
|
|
"valid": True,
|
|
"errors": [],
|
|
"config": {
|
|
"method": "Claude Code CLI authentication",
|
|
"note": "Using existing Claude Code CLI authentication"
|
|
}
|
|
}
|
|
|
|
def get_claude_code_env_vars(self) -> Dict[str, str]:
|
|
"""Get environment variables needed for Claude Code SDK."""
|
|
env_vars = {}
|
|
|
|
if self.auth_method == "anthropic":
|
|
if os.getenv("ANTHROPIC_API_KEY"):
|
|
env_vars["ANTHROPIC_API_KEY"] = os.getenv("ANTHROPIC_API_KEY")
|
|
|
|
elif self.auth_method == "bedrock":
|
|
env_vars["CLAUDE_CODE_USE_BEDROCK"] = "1"
|
|
if os.getenv("AWS_ACCESS_KEY_ID"):
|
|
env_vars["AWS_ACCESS_KEY_ID"] = os.getenv("AWS_ACCESS_KEY_ID")
|
|
if os.getenv("AWS_SECRET_ACCESS_KEY"):
|
|
env_vars["AWS_SECRET_ACCESS_KEY"] = os.getenv("AWS_SECRET_ACCESS_KEY")
|
|
if os.getenv("AWS_REGION"):
|
|
env_vars["AWS_REGION"] = os.getenv("AWS_REGION")
|
|
|
|
elif self.auth_method == "vertex":
|
|
env_vars["CLAUDE_CODE_USE_VERTEX"] = "1"
|
|
if os.getenv("ANTHROPIC_VERTEX_PROJECT_ID"):
|
|
env_vars["ANTHROPIC_VERTEX_PROJECT_ID"] = os.getenv("ANTHROPIC_VERTEX_PROJECT_ID")
|
|
if os.getenv("CLOUD_ML_REGION"):
|
|
env_vars["CLOUD_ML_REGION"] = os.getenv("CLOUD_ML_REGION")
|
|
if os.getenv("GOOGLE_APPLICATION_CREDENTIALS"):
|
|
env_vars["GOOGLE_APPLICATION_CREDENTIALS"] = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
|
|
|
|
elif self.auth_method == "claude_cli":
|
|
# For CLI auth, don't set any environment variables
|
|
# Let Claude Code SDK use the existing CLI authentication
|
|
pass
|
|
|
|
return env_vars
|
|
|
|
|
|
# Initialize the auth manager
|
|
auth_manager = ClaudeCodeAuthManager()
|
|
|
|
# HTTP Bearer security scheme (for FastAPI endpoint protection)
|
|
security = HTTPBearer(auto_error=False)
|
|
|
|
|
|
async def verify_api_key(request: Request, credentials: Optional[HTTPAuthorizationCredentials] = None):
|
|
"""
|
|
Verify API key if one is configured for FastAPI endpoint protection.
|
|
This is separate from Claude Code authentication.
|
|
"""
|
|
# Get the active API key (environment or runtime-generated)
|
|
active_api_key = auth_manager.get_api_key()
|
|
|
|
# If no API key is configured, allow all requests
|
|
if not active_api_key:
|
|
return True
|
|
|
|
# Get credentials from Authorization header
|
|
if credentials is None:
|
|
credentials = await security(request)
|
|
|
|
# Check if credentials were provided
|
|
if not credentials:
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Missing API key",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
# Verify the API key
|
|
if credentials.credentials != active_api_key:
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Invalid API key",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
return True
|
|
|
|
|
|
def validate_claude_code_auth() -> Tuple[bool, Dict[str, Any]]:
|
|
"""
|
|
Validate Claude Code authentication and return status.
|
|
Returns (is_valid, status_info)
|
|
"""
|
|
status = auth_manager.auth_status
|
|
|
|
if not status["valid"]:
|
|
logger.error(f"Claude Code authentication failed: {status['errors']}")
|
|
return False, status
|
|
|
|
logger.info(f"Claude Code authentication validated: {status['method']}")
|
|
return True, status
|
|
|
|
|
|
def get_claude_code_auth_info() -> Dict[str, Any]:
|
|
"""Get Claude Code authentication information for diagnostics."""
|
|
return {
|
|
"method": auth_manager.auth_method,
|
|
"status": auth_manager.auth_status,
|
|
"environment_variables": list(auth_manager.get_claude_code_env_vars().keys())
|
|
} |