Ich verwende eine Postgres -Datenbank. Eine Flask -Anwendung liest aus der Datenbank. Ein Präfektenjob verwendet dieselbe Codebasis und aktualisiert die Datenbank regelmäßig. /Offboarding von Mietern in einer Datenbank "Ein Schema pro Mieter" ist ziemlich schwierig. Wir haben beschlossen, mit freigegebenen Tabellen zu gehen (gemeinsame Tabellen haben einen Spaltenmandant_id ).
[*] Wie wir Kann zu einem anderen Datenbanksystem migrieren. Wir möchten die RLS -Funktion von Postgres nicht verwenden.
Jetzt frage ich Für dieses Shared-Tisch-Multiteenancy-Setup in Sqlalchemy?! Ich fand die alte Multialchemy Bibliothek, aber sie wurde 2022 archiviert und das letzte Commit war vor 9 Jahren: https://github.com/mwhite/multialchemy
Ich habe angefangen Schreiben Sie meinen eigenen Multitenancy ContextManager (unter Verwendung von ORM -globalen Kriterien), aber es ist ziemlich schwierig, dies zu tun ... < /p>
Code: Select all
class HasClientColumn(object):
"""Mixin that identifies a class as having a tenant/client column"""
client_id = Column(BigInteger, nullable=False)
class Testerle(HasClientColumn, Base):
__tablename__ = "testerle"
id = Column(BigInteger, primary_key=True, autoincrement=True)
secret = Column(Text)
class Testerle2(HasClientColumn, Base):
__tablename__ = "testerle2"
id = Column(BigInteger, primary_key=True, autoincrement=True)
secret = Column(Text)
db_sessionmaker = sessionmaker(
autocommit=False, autoflush=False, bind=db_engine, future=True
)
def get_client_scoped_db_session(client_id: int):
session = db_sessionmaker()
# Overwrite execute method so nobody uses it without ORM
session.execute = lambda x, *args, **kwargs: _raise(
Exception(
"client_scoped_db_session not meant to be used with execute command (unsafe). Use ORM!"
)
)
# Docs: https://docs.sqlalchemy.org/en/14/orm/session_events.html#do-orm-execute-global-criteria
@event.listens_for(session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
if (
orm_execute_state.is_select
and not orm_execute_state.is_column_load
and not orm_execute_state.is_relationship_load
):
orm_execute_state.statement = orm_execute_state.statement.options(
with_loader_criteria(
HasClientColumn,
lambda cls: cls.client_id == client_id,
include_aliases=True,
)
)
return session
@contextmanager
def with_client_scoped_db_session(client_id: int):
session = False
try:
logging.debug(f"Establishing DB session for client {client_id}")
session = get_client_scoped_db_session(client_id)
yield session
session.commit()
except Exception as e:
logging.exception(f"Exception in with_client_scoped_db({client_id}): {e}")
if session:
session.rollback()
raise e
finally:
logging.debug(f"Closing DB session for client {client_id}")
if session:
session.close()
< /code>
Dies funktioniert ziemlich gut, wenn Sie nur Dinge wie < /p>
aufrufenwith with_client_scoped_db_session(1) as session:
session.query(Testerle).all()
< /code>
Aber wann immer ich einen Join wie < /p>
machewith with_client_scoped_db_session(1) as session:
session.query(Testerle, Testerle2.secret).join(Testerle2, Testerle.id == Testerle2.id).all()
< /code>
Es wird in meiner benutzerdefinierten Ausnahme ausgeführt, dass die Ausführungsmethode verboten ist (client_scoped_db_session not meant to be used with execute command (unsafe). Use ORM!