Ist mein Setup robust für die Quant-Berechnungs-Engine und sollte ich SQLAlchemy 2.0 oder eine Alternative verwenden?Python

Python-Programme
Anonymous
 Ist mein Setup robust für die Quant-Berechnungs-Engine und sollte ich SQLAlchemy 2.0 oder eine Alternative verwenden?

Post by Anonymous »

Ich erstelle eine FastAPI-Anwendung, die als Kern-Engine für mein System fungiert und häufige CRUD-Vorgänge (hohe Lese-/Schreibhäufigkeit, gleichzeitige Anforderungen) abwickelt.
Ich möchte Ratschläge dazu, ob meine aktuelle Datenbankkonfiguration und mein Nutzungsmuster robust sind und ob es bei der Skalierung der Anwendung zu Engpässen im Zusammenhang mit der Verbindungsverarbeitung, der Leistung oder der Protokollierung kommen kann.
Aktuelles Setup (hohes Niveau)
  • FastAPI-Backend
  • Relationale Datenbank (z. B. PostgreSQL / MySQL)
  • SQLAlchemy für ORM-/DB-Zugriff
  • Sitzungsbasierte DB-Interaktion pro Anfrage
  • Protokollierung für Abfragen und Anwendungsereignisse aktiviert
Ziel der App: Es ist eine Berechnungs-Engine für den Derivatehandel und quantitative Operationen
Einstiegspunkt

Code: Select all

""" Application entry point. """

from __future__ import annotations

import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.config import settings
from app.database import check_database_health, dispose_engine, get_engine
from app.api.v1 import api_router

# Configure module logger
logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan handler: verify the database on startup, dispose the engine on shutdown."""
    logger.info("Starting %s in %s mode...", settings.app_name, settings.environment)

    # --- STARTUP PHASE ---
    try:
        db_ok = check_database_health()
        if db_ok:
            # Expose the engine through app.state so request handlers can reach it.
            app.state.db_engine = get_engine()
            logger.info(
                "Database connection established: %s",
                settings.db_host or "from DATABASE_URL",
            )
        elif settings.environment == "production":
            # A quant calculation engine must not serve without its database.
            logger.critical("Startup Failed: Database unreachable.")
            raise RuntimeError("Database connection required in production mode.")
        else:
            logger.warning("Database unavailable.  Running in limited mode.")

        logger.info("Application Startup Complete.")
    except Exception as exc:
        logger.critical("Startup Failed: %s", exc)
        raise

    # --- RUNNING PHASE --- (suspended here while the app serves requests)
    yield

    # --- SHUTDOWN PHASE ---
    logger.info("Shutting down application...")
    dispose_engine()
    logger.info("Goodbye!")

def create_application() -> FastAPI:
    """Build and configure the FastAPI application.

    Wires up CORS, the versioned API router, and two health-check
    endpoints: one at /health (outside the API prefix, for k8s probes)
    and one under the context path. Both report database reachability.

    Returns:
        The fully configured FastAPI instance.
    """
    app = FastAPI(
        title=settings.app_name,
        version=settings.app_version,
        description="Production-grade derivatives pricing calculation engine",
        lifespan=lifespan,
        docs_url=f"{settings.api_base_path}/docs",
        redoc_url=f"{settings.api_base_path}/redoc",
        openapi_url=f"{settings.api_base_path}/openapi.json",
    )

    # --- CORS Middleware ---
    if settings.allowed_origins:
        app.add_middleware(
            CORSMiddleware,
            allow_origins=settings.allowed_origins,
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )
        logger.info("CORS enabled for %d origins.", len(settings.allowed_origins))

    # --- Router Registration ---
    app.include_router(
        api_router,
        prefix=settings.api_base_path,
    )
    logger.info("API routes registered under: %s", settings.api_base_path)

    def _health_payload() -> dict:
        # Shared body for both health endpoints (was duplicated verbatim before).
        db_healthy = check_database_health()
        return {
            "status": "healthy" if db_healthy else "degraded",
            "database": "connected" if db_healthy else "disconnected",
            "version": settings.app_version,
        }

    # --- Health Check Endpoint (outside API prefix for k8s probes) ---
    @app.get("/health", tags=["Health"])
    def health_check():
        return _health_payload()

    # Same probe exposed under the context path (e.g. behind a path-routing proxy).
    @app.get(f"{settings.context_path}/health", tags=["Health"])
    def health_check_with_context():
        return _health_payload()

    return app

# Create the application instance.
# Module-level ASGI object consumed by the server (e.g. `uvicorn main:app`).
app = create_application()
DB-Konfiguration (Postgres)

Code: Select all

from __future__ import annotations

import logging
from contextlib import contextmanager
from typing import Generator

from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.orm import sessionmaker, Session

from app.config import settings

logger = logging.getLogger(__name__)

def create_db_engine() -> Engine | None:
    """Create the SQLAlchemy engine from the configured database URI.

    Returns:
        A pooled Engine, or None when no database URI is configured.
        (The previous annotation claimed a bare ``Engine``, but the
        no-URI path has always returned None.)
    """
    db_uri = settings.sqlalchemy_database_uri

    if not db_uri:
        logger.warning("No database URI configured. Engine creation skipped.")
        return None

    logger.info("Creating database engine for host: %s", settings.db_host or "from DATABASE_URL")

    engine = create_engine(
        db_uri,
        pool_size=5,          # baseline pooled connections
        max_overflow=10,      # extra burst connections beyond pool_size
        pool_pre_ping=True,   # validate connections before use; drops stale ones
        pool_recycle=1800,    # recycle connections after 30 minutes
        echo=settings.debug,  # SQL logging only in debug mode
    )

    logger.info("Database engine created successfully.")
    return engine

# Module-level engine instance (lazy initialization).
# Shared singleton: populated by the first get_engine() call, reset to None
# by dispose_engine() so a later call can rebuild it.
_engine: Engine | None = None

def get_engine() -> Engine | None:
    """Return the shared engine, creating it lazily on first use.

    Returns:
        The module-level Engine, or None when no database URI is
        configured (create_db_engine returns None in that case, so the
        previous ``-> Engine`` annotation was inaccurate).
    """
    global _engine
    if _engine is None:
        _engine = create_db_engine()
    return _engine

def get_session_factory() -> sessionmaker:
    """Build a sessionmaker bound to the shared engine.

    Raises:
        RuntimeError: if no engine could be created (missing DB configuration).
    """
    bound_engine = get_engine()
    if bound_engine is None:
        raise RuntimeError("Database engine not initialized.  Check database configuration.")
    return sessionmaker(autocommit=False, autoflush=False, bind=bound_engine)

@contextmanager
def get_db_session() -> Generator[Session, None, None]:
    """Transactional session scope: commit on success, roll back on failure.

    Yields a session, commits when the with-block exits cleanly, and
    rolls back on any exception. (Previously only SQLAlchemyError got an
    explicit rollback; other exceptions relied on close() to discard the
    pending transaction.)

    Raises:
        Whatever the with-block raised, after rollback.
    """
    SessionLocal = get_session_factory()
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except SQLAlchemyError as e:
        session.rollback()
        logger.error("Database session error: %s", e)
        raise
    except Exception:
        # Non-DB errors raised inside the with-block: still roll back explicitly.
        session.rollback()
        raise
    finally:
        session.close()

def get_db() -> Generator[Session, None, None]:
    """FastAPI dependency yielding one session per request.

    Commit responsibility stays with the caller; any exception triggers
    a rollback, and the session is always closed afterwards.
    """
    factory = get_session_factory()
    session = factory()
    try:
        yield session
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

def check_database_health() -> bool:
    """Ping the database with ``SELECT 1``.

    Returns:
        True when a connection can be established and the query runs;
        False otherwise, including when no engine is configured.
    """
    engine = get_engine()
    if engine is None:
        logger.warning("Health check skipped: No database engine configured.")
        return False

    try:
        with engine.connect() as connection:
            connection.execute(text("SELECT 1"))
        # Probes hit this endpoint frequently (k8s liveness/readiness), so log
        # success at DEBUG to avoid flooding production logs.
        logger.debug("Database health check passed.")
        return True
    except OperationalError as e:
        logger.error("Database health check failed (OperationalError): %s", e)
        return False
    except Exception as e:
        logger.error("Database health check failed (unexpected): %s", e)
        return False

def dispose_engine() -> None:
    """Dispose the shared engine's connection pool and reset the singleton."""
    global _engine
    if _engine is None:
        return
    try:
        _engine.dispose()
        logger.info("Database engine disposed successfully.")
    except Exception as e:
        logger.error("Error disposing database engine: %s", e)
    finally:
        # Always drop the reference so a later get_engine() rebuilds cleanly.
        _engine = None
Konfiguration kommt von .env

Code: Select all

import logging
import json # Import at top level
from typing import List, Union
from urllib.parse import quote_plus

from pydantic import Field, SecretStr, field_validator, computed_field, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict

# 1.  Configure Logging so INFO messages actually show up
# NOTE(review): basicConfig runs at import time and configures the ROOT logger;
# any application importing this settings module inherits (or fights over) this
# setup. Consider moving logging configuration to the application entry point.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

class Settings(BaseSettings):
    """Application configuration loaded from environment variables / .env.

    Database access is configured either via a single DATABASE_URL or via
    the individual db_* component fields; production deployments must
    provide one of the two (enforced in verify_configuration).
    """

    # Application Settings
    app_name: str = "Pricing Calculation Engine"
    app_version: str = "1.0.0"
    environment: str = Field(default="production")
    debug: bool = Field(default=False)

    # Routing
    context_path: str = "/calculation-engine"
    api_prefix: str = "/api/v1"

    @computed_field
    @property
    def api_base_path(self) -> str:
        """Full API prefix: context path + versioned API prefix."""
        return f"{self.context_path}{self.api_prefix}"

    # Database Creds
    # Either set DATABASE_URL as one complete connection string...
    database_url: SecretStr = SecretStr("")

    # ...or provide the individual connection components below.
    db_host: str | None = None
    db_port: int = 5432
    db_name: str | None = None
    db_username: str | None = None
    db_password: SecretStr | None = None

    @computed_field
    @property
    def sqlalchemy_database_uri(self) -> str:
        """Resolve the SQLAlchemy connection URI.

        Prefers DATABASE_URL; otherwise assembles a postgresql:// URI
        from the component fields. Outside production, an incomplete
        configuration yields "" so the app can run in limited mode.

        Raises:
            ValueError: in production when neither form is fully set.
        """
        if self.database_url.get_secret_value():
            return self.database_url.get_secret_value()

        if not all([self.db_host, self.db_name, self.db_username, self.db_password]):
            if self.environment == "production":
                raise ValueError("Missing Database Config: Set DATABASE_URL or component fields.")
            return ""

        # quote_plus guards against special characters in credentials.
        user = quote_plus(self.db_username)
        password = quote_plus(self.db_password.get_secret_value())

        return f"postgresql://{user}:{password}@{self.db_host}:{self.db_port}/{self.db_name}"

    # Security
    secret_key: SecretStr = SecretStr("")
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    # CORS
    allowed_origins: List[str] = Field(default_factory=list)

    @field_validator("allowed_origins", mode="before")
    @classmethod
    def parse_cors_origins(cls, v: Union[str, List[str], None]) -> List[str]:
        """Accept a real list, a JSON array string, or a comma-separated string."""
        if v is None:
            return []

        if isinstance(v, list):
            return v

        if isinstance(v, str):
            v = v.strip()
            if v.startswith("[") and v.endswith("]"):
                try:
                    return json.loads(v)
                except json.JSONDecodeError:
                    # Malformed JSON: fall through to comma splitting below.
                    pass

            # Fallback to comma separation
            return [origin.strip() for origin in v.split(",") if origin.strip()]

        return []

    # Validation
    @model_validator(mode='after')
    def verify_configuration(self) -> 'Settings':
        """Fail fast in production when critical secrets/config are missing."""
        if self.environment == "production" and not self.secret_key.get_secret_value():
            raise ValueError("CRITICAL: 'SECRET_KEY' is missing.")

        if self.environment == "production" and not self.database_url.get_secret_value():
            if not all([self.db_host, self.db_name, self.db_username, self.db_password]):
                raise ValueError("Database configuration incomplete.")

        # Missing CORS config is suspicious but not fatal: warn only.
        if self.environment == "production" and not self.allowed_origins:
            logging.warning("No CORS origins configured for production.")

        return self

    model_config = SettingsConfigDict(
        env_file=".env",
        case_sensitive=False,
        extra="ignore"
    )

# Execution Block
# Instantiate the settings singleton at import time so misconfiguration
# surfaces immediately, before the application starts serving.
try:
    settings = Settings()
    logging.info(f"Server starting in {settings.environment} mode.")
    logging.info(f"CORS Origins loaded: {len(settings.allowed_origins)}")
except ValueError as e:
    logging.error(f"Configuration Error: {e}")
    # raise SystemExit instead of exit(): the exit() builtin is injected by the
    # `site` module and is not guaranteed to exist (e.g. under `python -S`).
    raise SystemExit(1)
except Exception as e:
    logging.error(f"Unexpected Error during startup: {e}")
    raise SystemExit(1)
Frage
Angesichts meiner aktuellen Konfiguration:
  • Ist dieses Datenbank-Setup produktionsbereit und skalierbar?
  • Gibt es Anti-Patterns oder Risiken, die ich jetzt ansprechen sollte?
  • Was wäre die Best-Practice-Empfehlung für den Datenbankzugriff in FastAPI für häufige CRUD-Vorgänge?
Für jede Anleitung oder Referenz wäre ich sehr dankbar.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post