From fc4b5d025d0b08afb18cdc9d1c64f1535cbc0385 Mon Sep 17 00:00:00 2001 From: Antoine Boucher Date: Fri, 19 Apr 2024 17:00:37 -0400 Subject: [PATCH] changes api with api_key --- api/app.py | 83 ++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 74 insertions(+), 9 deletions(-) diff --git a/api/app.py b/api/app.py index 0a13df5..9747413 100644 --- a/api/app.py +++ b/api/app.py @@ -1021,6 +1021,23 @@ class ClientSSLError(Exception): from starlette.responses import Response import httpx +from datetime import datetime +import hashlib +import os +from Crypto.PublicKey import RSA +from Crypto.Signature import pkcs1_15 +from Crypto.Hash import SHA256 +from fastapi import HTTPException, Security +from fastapi.security.api_key import APIKeyHeader + +security_basic = HTTPBasic() +API_KEY_NAME = "access_token" +api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=True) + +# Load RSA keys from environment variables +PRIVATE_KEY = RSA.import_key(os.getenv("RSA_PRIVATE_KEY")) +PUBLIC_KEY = RSA.import_key(os.getenv("RSA_PUBLIC_KEY")) + # Initialize FastAPI and Jinja2 app = FastAPI(docs_url="/docs", redoc_url=None) @@ -1036,8 +1053,6 @@ class ClientSSLError(Exception): allow_headers=["*"], ) -security = HTTPBasic() - class APIResponse(BaseModel): status: str @@ -1045,15 +1060,51 @@ class APIResponse(BaseModel): data: Optional[Any] = None -async def get_current_user(credentials: HTTPBasicCredentials = Depends(security)): +def generate_api_key(email: str, password: str) -> str: + """Generate a signed API key.""" + timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f") + salt = os.urandom(16).hex() + payload = f"{email}:{password}:{timestamp}:{salt}" + api_key = hashlib.sha256(payload.encode()).hexdigest() + + # Sign the API key + key_hash = SHA256.new(api_key.encode()) + signature = pkcs1_15.new(PRIVATE_KEY).sign(key_hash) + signed_api_key = api_key + ":" + signature.hex() + + return signed_api_key + +def decrypt_api_key(api_key: str) -> Optional[Dict[str, str]]: + """Decrypt API key to extract the email and password.""" try: + api_key, signature = api_key.rsplit(':', 1) + key_hash = SHA256.new(api_key.encode()) + # Verify the signature + pkcs1_15.new(PUBLIC_KEY).verify(key_hash, bytes.fromhex(signature)) + # If verification is successful, decode the email and password + decoded_data = hashlib.sha256().new(bytes.fromhex(api_key)).hexdigest().split(":") + return {"email": decoded_data[0], "password": decoded_data[1]} + except (ValueError, pkcs1_15.PKCS115_SigSchemeError) as e: + return None + +async def get_api_key(api_key: str = Depends(api_key_header)): + """Dependency that validates the API key.""" + user_credentials = decrypt_api_key(api_key) + if user_credentials: + return user_credentials + raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail="Invalid API key") + +async def get_current_user(credentials: Optional[HTTPBasicCredentials] = Depends(security_basic), api_key: Optional[str] = Depends(get_api_key)): + """Dependency that authenticates user either by basic auth or API key.""" + if credentials: user = RenphoWeight(email=credentials.username, password=credentials.password) - await user.auth() # Ensure that user can authenticate - return user - except Exception as e: - _LOGGER.error(f"Authentication failed: {e}") - raise HTTPException(status_code=401, detail="Authentication failed") from e - + if await user.auth(): + return user + elif api_key: + user = RenphoWeight(email=api_key["email"], password=api_key["password"]) + if await user.auth(): + return user + raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials or API key") @app.get("/") def read_root(request: Request): @@ -1064,6 +1115,20 @@ async def auth(renpho: RenphoWeight = Depends(get_current_user)): # If this point is reached, authentication was successful return APIResponse(status="success", message="Authentication successful.") +@app.get("/auth/apikey", response_model=APIResponse) +async def auth_api_key(api_key: str = Depends(get_api_key)): + """Endpoint to authenticate using an API key.""" + return APIResponse(status="success", message="Authenticated using API key.") + +@app.get("/generate_api_key", response_model=APIResponse) +def generate_key(request: Request, email: str, password: str): + """Generate API key for a user.""" + try: + api_key = generate_api_key(email, password) + return APIResponse(status="success", message="API key generated.", data={"api_key": api_key}) + except Exception as e: + return APIResponse(status="error", message="Failed to generate API key.", data=str(e)) + @app.get("/info", response_model=APIResponse) async def get_info(renpho: RenphoWeight = Depends(get_current_user)): try: