Поиск лучшей стратегии для SQLAlchemy bulk upsert
У меня есть Flask приложение с RESTful API. Один из вызовов API - это вызов "массового upsert" с полезной нагрузкой JSON. Я борюсь с работой.
Первое, что я пробовал, это использовать merge-result
для объекта Query
, потому что...
Это оптимизированный метод, который объединит все сопоставленные экземпляры, сохраняя структуру строк результата и неотображаемых столбцов с меньшими затратами на методы, чем метод вызова Session.merge() явно для каждого значения.
Это был начальный код:
class AdminApiUpdateTasks(Resource):
"""Bulk task creation / update endpoint"""
def put(self, slug):
taskdata = json.loads(request.data)
existing = db.session.query(Task).filter_by(challenge_slug=slug)
existing.merge_result(
[task_from_json(slug, **task) for task in taskdata])
db.session.commit()
return {}, 200
Запрос на эту конечную точку с ~ 5000 записями, все из которых уже существуют в базе данных, возвращает более 11 м:
real 11m36.459s
user 0m3.660s
sys 0m0.391s
Поскольку это было бы довольно типичным вариантом использования, я начал изучать альтернативы для повышения производительности. Против моего лучшего суждения я попытался merge
сеанс для каждой отдельной записи:
class AdminApiUpdateTasks(Resource):
"""Bulk task creation / update endpoint"""
def put(self, slug):
# Get the posted data
taskdata = json.loads(request.data)
for task in taskdata:
db.session.merge(task_from_json(slug, **task))
db.session.commit()
return {}, 200
К моему удивлению, это оказалось более чем в два раза быстрее:
real 4m33.945s
user 0m3.608s
sys 0m0.258s
У меня есть два вопроса:
- Почему вторая стратегия использует
merge
быстрее, чем предположительно оптимизированная первая, которая использует merge_result
?
- Какие еще стратегии следует использовать для оптимизации этого, если таковые имеются?
Ответы
Ответ 1
Я думаю, что это вызвало вашу медлительность в первом запросе:
existing = db.session.query(Task).filter_by(challenge_slug=slug)
Также вы должны, вероятно, изменить это:
existing.merge_result(
[task_from_json(slug, **task) for task in taskdata])
To:
existing.merge_result(
(task_from_json(slug, **task) for task in taskdata))
Так как это должно сохранить вам некоторую память и время, так как список не будет сгенерирован в памяти перед отправкой его методу merge_result.
Ответ 2
Это старый вопрос, но я надеюсь, что этот ответ все еще может помочь людям.
Я использовал ту же идею, что и этот пример, установленный SQLAlchemy, но я добавил бенчмаркинг для выполнения операций UPSERT (вставьте, если существует, в противном случае обновите существующую запись). Я добавил результаты в базу данных PostgreSQL 11 ниже:
Tests to run: test_customer_individual_orm_select, test_customer_batched_orm_select, test_customer_batched_orm_select_add_all, test_customer_batched_orm_merge_result
test_customer_individual_orm_select : UPSERT statements via individual checks on whether objects exist and add new objects individually (10000 iterations); total time 9.359603 sec
test_customer_batched_orm_select : UPSERT statements via batched checks on whether objects exist and add new objects individually (10000 iterations); total time 1.553555 sec
test_customer_batched_orm_select_add_all : UPSERT statements via batched checks on whether objects exist and add new objects in bulk (10000 iterations); total time 1.358680 sec
test_customer_batched_orm_merge_result : UPSERT statements using batched merge_results (10000 iterations); total time 7.191284 sec
Как видите, слияние-результат далеко не самый эффективный вариант. Я бы посоветовал проверять в пакетах, есть ли результаты и нужно ли их обновлять. Надеюсь это поможет!
"""
This series of tests illustrates different ways to UPSERT
or INSERT ON CONFLICT UPDATE a large number of rows in bulk.
"""
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
from profiler import Profiler
Base = declarative_base()
engine = None
class Customer(Base):
__tablename__ = "customer"
id = Column(Integer, primary_key=True)
name = Column(String(255))
description = Column(String(255))
Profiler.init("bulk_upserts", num=100000)
@Profiler.setup
def setup_database(dburl, echo, num):
global engine
engine = create_engine(dburl, echo=echo)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
s = Session(engine)
for chunk in range(0, num, 10000):
# Insert half of the customers we want to merge
s.bulk_insert_mappings(
Customer,
[
{
"id": i,
"name": "customer name %d" % i,
"description": "customer description %d" % i,
}
for i in range(chunk, chunk + 10000, 2)
],
)
s.commit()
@Profiler.profile
def test_customer_individual_orm_select(n):
"""
UPSERT statements via individual checks on whether objects exist
and add new objects individually
"""
session = Session(bind=engine)
for i in range(0, n):
customer = session.query(Customer).get(i)
if customer:
customer.description += "updated"
else:
session.add(Customer(
id=i,
name=f"customer name {i}",
description=f"customer description {i} new"
))
session.flush()
session.commit()
@Profiler.profile
def test_customer_batched_orm_select(n):
"""
UPSERT statements via batched checks on whether objects exist
and add new objects individually
"""
session = Session(bind=engine)
for chunk in range(0, n, 1000):
customers = {
c.id: c for c in
session.query(Customer)\
.filter(Customer.id.between(chunk, chunk + 1000))
}
for i in range(chunk, chunk + 1000):
if i in customers:
customers[i].description += "updated"
else:
session.add(Customer(
id=i,
name=f"customer name {i}",
description=f"customer description {i} new"
))
session.flush()
session.commit()
@Profiler.profile
def test_customer_batched_orm_select_add_all(n):
"""
UPSERT statements via batched checks on whether objects exist
and add new objects in bulk
"""
session = Session(bind=engine)
for chunk in range(0, n, 1000):
customers = {
c.id: c for c in
session.query(Customer)\
.filter(Customer.id.between(chunk, chunk + 1000))
}
to_add = []
for i in range(chunk, chunk + 1000):
if i in customers:
customers[i].description += "updated"
else:
to_add.append({
"id": i,
"name": "customer name %d" % i,
"description": "customer description %d new" % i,
})
if to_add:
session.bulk_insert_mappings(
Customer,
to_add
)
to_add = []
session.flush()
session.commit()
@Profiler.profile
def test_customer_batched_orm_merge_result(n):
"UPSERT statements using batched merge_results"
session = Session(bind=engine)
for chunk in range(0, n, 1000):
customers = session.query(Customer)\
.filter(Customer.id.between(chunk, chunk + 1000))
customers.merge_result(
Customer(
id=i,
name=f"customer name {i}",
description=f"customer description {i} new"
) for i in range(chunk, chunk + 1000)
)
session.flush()
session.commit()