Ich vermute also, dass der Conninfo-String beim Erstellen des Pools einmalig gespeichert und danach für jede Verbindung wiederverwendet wird. Ich müsste get_conninfo() aber jedes Mal aufrufen, wenn eine neue Verbindung hergestellt werden soll. Irgendwelche Ideen, wie man das lösen kann?
Hier ist ein Teil meiner Implementierung.
Code: Select all
@classmethod
async def connect(cls):
    """Initializes the connection pool.

    Creates the singleton instance on first use, copies the connection
    parameters from ``settings`` onto it, then builds and opens an
    ``AsyncConnectionPool``. If the instance already has a ``pool``
    attribute, nothing is done. The whole sequence runs under
    ``cls._lock``.
    """
    async with cls._lock:
        if cls._instance is None:
            cls._instance = cls()
        instance = cls._instance
        if not hasattr(instance, 'pool'):
            # Retrieve connection parameters
            instance.user = settings.user
            instance.host = settings.host
            instance.port = settings.port
            instance.dbname = settings.dbname
            instance.connection_type = settings.connection_type
            if instance.connection_type == "managed_identity":
                instance.credential = ManagedIdentityCredential()
            else:
                instance.password = settings.pgvector.pgvector_password.get_secret_value()

            # The conninfo string is evaluated exactly once, here, so with
            # managed identity the access token embedded in it is the one
            # fetched at startup.
            # NOTE(review): psycopg_pool >= 3.3 is documented to accept a
            # callable for `conninfo`; passing `instance.get_conninfo`
            # instead of its result would let the pool fetch a fresh token
            # for every new connection. Confirm the installed version
            # before switching.
            pool = AsyncConnectionPool(
                conninfo=await instance.get_conninfo(),
                # Do not open from the constructor (deprecated for async
                # pools); the pool is opened once, explicitly, below.
                open=False,
            )
            await pool.open()
            # Publish the pool only after open() succeeded, so a failed
            # attempt does not leave a `pool` attribute behind that would
            # make later connect() calls skip initialization.
            instance.pool = pool
            logger.info("Async database connection pool initialized in PostgresClient.")
@classmethod
async def get_conninfo(cls) -> str:
    """Returns connection info string based on the connection type.

    For ``connection_type == "managed_identity"`` an access token is
    requested from the instance's credential and used as the password in
    the returned conninfo string. Reads all parameters from
    ``cls._instance``, so ``connect()`` must have populated it first.
    """
    instance = cls._instance
    if instance.connection_type == "managed_identity":
        # retrieve token from Microsoft
        # NOTE(review): get_token() is called without `await`, so it is
        # presumably the synchronous credential and may block the event
        # loop while the token is fetched - confirm.
        access_token = instance.credential.get_token(
            "https://ossrdbms-aad.database.windows.net/.default"
        )
        return (
            f"dbname={instance.dbname} host={instance.host} "
            f"port={instance.port} sslmode=require user={instance.user} "
            f"password={access_token.token}"
        )
    else:
        ...  # non-managed-identity branch is elided in the original snippet