Правильная обработка SQLAlchemy сеанса в многопоточных приложениях
Мне трудно понять, как правильно открывать и закрывать сеансы базы данных эффективно, как я понял в документации sqlalchemy, если я использую scoped_session для создания объекта Session, а затем использую возвращаемый объект Session для создания сеансов, он потокобезопасен, поэтому в основном каждый поток будет иметь свою собственную сессию, и проблем с ней не будет. Теперь приведен пример ниже: я помещаю его в бесконечный цикл, чтобы проверить, правильно ли он закрывает сеансы, и если я правильно его отслеживаю (в mysql, выполняя "SHOW PROCESSLIST;" ), соединения просто продолжают расти, они не закрывают их, хотя я использовал session.close() и даже удалял объект scoped_session в конце каждого прогона. Что я делаю не так? Моя цель в более крупном приложении - использовать минимальное количество подключений к базе данных, потому что моя текущая рабочая реализация создает новый сеанс в каждом методе, где требуется, и закрывает его перед возвратом, что кажется неэффективным.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from threading import Thread
from Queue import Queue, Empty as QueueEmpty
from models import MyModel
DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname'
class MTWorker(object):
def __init__(self, worker_count=5):
self.task_queue = Queue()
self.worker_count = worker_count
self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
self.DBSession = scoped_session(
sessionmaker(
autoflush=True,
autocommit=False,
bind=self.db_engine
)
)
def _worker(self):
db_session = self.DBSession()
while True:
try:
task_id = self.task_queue.get(False)
try:
item = db_session.query(MyModel).filter(MyModel.id == task_id).one()
# do something with item
except Exception as exc:
# if an error occurrs we skip it
continue
finally:
db_session.commit()
self.task_queue.task_done()
except QueueEmpty:
db_session.close()
return
def start(self):
try:
db_session = self.DBSession()
all_items = db_session.query(MyModel).all()
for item in all_items:
self.task_queue.put(item.id)
for _i in range(self.worker_count):
t = Thread(target=self._worker)
t.start()
self.task_queue.join()
finally:
db_session.close()
self.DBSession.remove()
if __name__ == '__main__':
while True:
mt_worker = MTWorker(worker_count=50)
mt_worker.start()
Ответы
Ответ 1
Вы должны звонить только create_engine
и scoped_session
один раз за
процесса (для каждой базы данных). Каждый из них получит свой собственный пул соединений или сеансов
(соответственно), поэтому вы хотите убедиться, что вы создаете только один пул. Просто сделайте его модульным уровнем глобальным. если вам нужно более эффективно управлять своими сеансами, вы, вероятно, не должны использовать scoped_session
Еще одно изменение заключается в том, чтобы использовать DBSession
напрямую, как если бы это было
сессия. методы вызова сеанса на scoped_session будут прозрачно
при необходимости создайте поток-локальный сеанс и переадресуйте вызов метода на
сессии.
Еще одна вещь, о которой нужно знать, - это
pool_size
пула соединений, который
по умолчанию - 5. Для многих приложений это хорошо, но если вы создаете
много потоков, вам может потребоваться настроить этот параметр
DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname'
db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
DBSession = scoped_session(
sessionmaker(
autoflush=True,
autocommit=False,
bind=db_engine
)
)
class MTWorker(object):
def __init__(self, worker_count=5):
self.task_queue = Queue()
self.worker_count = worker_count
# snip