Skip to content

Commit

Permalink
changes api with api_key
Browse files Browse the repository at this point in the history
  • Loading branch information
antoinebou12 authored Apr 19, 2024
1 parent a9b5505 commit fc4b5d0
Showing 1 changed file with 74 additions and 9 deletions.
83 changes: 74 additions & 9 deletions api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,23 @@ class ClientSSLError(Exception):

from starlette.responses import Response
import httpx
from datetime import datetime
import hashlib
import os
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256
from fastapi import HTTPException, Security
from fastapi.security.api_key import APIKeyHeader

security_basic = HTTPBasic()
API_KEY_NAME = "access_token"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=True)

# Load RSA keys from environment variables
PRIVATE_KEY = RSA.import_key(os.getenv("RSA_PRIVATE_KEY"))
PUBLIC_KEY = RSA.import_key(os.getenv("RSA_PUBLIC_KEY"))


# Initialize FastAPI and Jinja2
app = FastAPI(docs_url="/docs", redoc_url=None)
Expand All @@ -1036,24 +1053,58 @@ class ClientSSLError(Exception):
allow_headers=["*"],
)

security = HTTPBasic()


class APIResponse(BaseModel):
status: str
message: str
data: Optional[Any] = None


async def get_current_user(credentials: HTTPBasicCredentials = Depends(security)):
def generate_api_key(email: str, password: str) -> str:
"""Generate a signed API key."""
timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
salt = os.urandom(16).hex()
payload = f"{email}:{password}:{timestamp}:{salt}"
api_key = hashlib.sha256(payload.encode()).hexdigest()

# Sign the API key
key_hash = SHA256.new(api_key.encode())
signature = pkcs1_15.new(PRIVATE_KEY).sign(key_hash)
signed_api_key = api_key + ":" + signature.hex()

return signed_api_key

def decrypt_api_key(api_key: str) -> Optional[Dict[str, str]]:
"""Decrypt API key to extract the email and password."""
try:
api_key, signature = api_key.rsplit(':', 1)
key_hash = SHA256.new(api_key.encode())
# Verify the signature
pkcs1_15.new(PUBLIC_KEY).verify(key_hash, bytes.fromhex(signature))
# If verification is successful, decode the email and password
decoded_data = hashlib.sha256().new(bytes.fromhex(api_key)).hexdigest().split(":")
return {"email": decoded_data[0], "password": decoded_data[1]}
except (ValueError, pkcs1_15.PKCS115_SigSchemeError) as e:
return None

async def get_api_key(api_key: str = Depends(api_key_header)):
"""Dependency that validates the API key."""
user_credentials = decrypt_api_key(api_key)
if user_credentials:
return user_credentials
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail="Invalid API key")

async def get_current_user(credentials: Optional[HTTPBasicCredentials] = Depends(security_basic), api_key: Optional[str] = Depends(get_api_key)):
"""Dependency that authenticates user either by basic auth or API key."""
if credentials:
user = RenphoWeight(email=credentials.username, password=credentials.password)
await user.auth() # Ensure that user can authenticate
return user
except Exception as e:
_LOGGER.error(f"Authentication failed: {e}")
raise HTTPException(status_code=401, detail="Authentication failed") from e

if await user.auth():
return user
elif api_key:
user = RenphoWeight(email=api_key["email"], password=api_key["password"])
if await user.auth():
return user
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials or API key")

@app.get("/")
def read_root(request: Request):
Expand All @@ -1064,6 +1115,20 @@ async def auth(renpho: RenphoWeight = Depends(get_current_user)):
# If this point is reached, authentication was successful
return APIResponse(status="success", message="Authentication successful.")

@app.get("/auth/apikey", response_model=APIResponse)
async def auth_api_key(api_key: str = Depends(get_api_key)):
"""Endpoint to authenticate using an API key."""
return APIResponse(status="success", message="Authenticated using API key.")

@app.get("/generate_api_key", response_model=APIResponse)
def generate_key(request: Request, email: str, password: str):
"""Generate API key for a user."""
try:
api_key = generate_api_key(email, password)
return APIResponse(status="success", message="API key generated.", data={"api_key": api_key})
except Exception as e:
return APIResponse(status="error", message="Failed to generate API key.", data=str(e))

@app.get("/info", response_model=APIResponse)
async def get_info(renpho: RenphoWeight = Depends(get_current_user)):
try:
Expand Down

0 comments on commit fc4b5d0

Please sign in to comment.