Массовая вставка с SQLAlchemy ORM
Есть ли способ заставить SQLAlchemy делать объемную вставку, а не вставлять каждый отдельный объект. то есть.,
делание:
INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)
а не:
INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)
Я только что преобразовал некоторый код, чтобы использовать sqlalchemy, а не raw sql, и хотя теперь с ним гораздо удобнее работать, кажется, что он медленнее (до 10 раз), мне интересно, причина.
Могу ли я улучшить ситуацию, используя сеансы, более эффективно. На данный момент у меня есть autoCommit=False
и делаю a session.commit()
после того, как добавил некоторые вещи. Хотя это, похоже, заставляет данные устаревать, если DB изменен в другом месте, например, даже если я делаю новый запрос, я все еще получаю старые результаты?
Спасибо за вашу помощь!
Ответы
Ответ 1
SQLAlchemy представил это в версии 1.0.0
:
Массовые операции - документы по SQLAlchemy
С помощью этих операций вы теперь можете делать массовые вставки или обновления!
Например, вы можете сделать:
s = Session()
objects = [
User(name="u1"),
User(name="u2"),
User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()
Здесь будет сделана основная вставка.
Ответ 2
Насколько я знаю, нет способа заставить ORM выдавать массовые вставки. Я полагаю, что основная причина заключается в том, что SQLAlchemy необходимо отслеживать идентичность каждого объекта (т.е. Новые первичные ключи), и массовые вставки мешают этому. Например, предположим, что ваша таблица foo
содержит столбец id
и сопоставлена с классом Foo
:
x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1
Поскольку SQLAlchemy выбрал значение для x.id
без выполнения другого запроса, мы можем сделать вывод, что оно получило значение непосредственно из оператора INSERT
. Если вам не нужен последующий доступ к созданным объектам через те же экземпляры, вы можете пропустить слой ORM для вставки:
Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))
SQLAlchemy не может сопоставить эти новые строки с какими-либо существующими объектами, поэтому вам придется заново запрашивать их для любых последующих операций.
Что касается устаревших данных, полезно помнить, что у сеанса нет встроенного способа узнать, когда база данных изменяется вне сеанса. Чтобы получить доступ к измененным извне данным через существующие экземпляры, они должны быть помечены как просроченные. Это происходит по умолчанию в session.commit()
, но это можно сделать вручную, вызвав session.expire_all()
или session.expire(instance)
. Пример (SQL опущен):
x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42
session.commit()
истекает x
, поэтому первый оператор print неявно открывает новую транзакцию и повторно запрашивает атрибуты x
. Если вы закомментируете первый оператор печати, вы заметите, что второй теперь выбирает правильное значение, потому что новый запрос не генерируется до окончания обновления.
Это имеет смысл с точки зрения изоляции транзакций - вам следует выбирать только внешние модификации между транзакциями. Если это вызывает у вас проблемы, я бы предложил уточнить или переосмыслить границы транзакций вашего приложения, вместо того, чтобы сразу обратиться к session.expire_all()
.
Ответ 3
Документы sqlalchemy содержат описание производительности различных методов, которые можно использовать для массовых вставок:
ORM в основном не предназначены для высокопроизводительных массовых вставок - вот почему SQLAlchemy предлагает Core в дополнение к ORM в качестве первоклассного компонента.
В случае использования быстрых массовых вставок система генерации и исполнения SQL, на основе которой строится ORM, является частью ядра. Используя эту систему напрямую, мы можем создать INSERT, который будет конкурировать с непосредственным использованием API необработанных баз данных.
В качестве альтернативы, SQLAlchemy ORM предлагает набор методов Bulk Operations, которые обеспечивают привязки к подразделам единицы рабочего процесса для создания конструкций INSERT и UPDATE на уровне ядра с небольшой степенью автоматизации на основе ORM.
Пример ниже иллюстрирует основанные на времени тесты для нескольких различных способов вставки строк, переходя от наиболее автоматизированного к наименьшему. В cPython 2.7 время выполнения:
classics-MacBook-Pro:sqlalchemy classic$ python test.py
SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs
SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs
SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs
SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs
sqlite3: Total time for 100000 records 0.487842082977 sec
Автор сценария:
import time
import sqlite3
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None
class Customer(Base):
__tablename__ = "customer"
id = Column(Integer, primary_key=True)
name = Column(String(255))
def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
global engine
engine = create_engine(dbname, echo=False)
DBSession.remove()
DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
def test_sqlalchemy_orm(n=100000):
init_sqlalchemy()
t0 = time.time()
for i in xrange(n):
customer = Customer()
customer.name = 'NAME ' + str(i)
DBSession.add(customer)
if i % 1000 == 0:
DBSession.flush()
DBSession.commit()
print(
"SQLAlchemy ORM: Total time for " + str(n) +
" records " + str(time.time() - t0) + " secs")
def test_sqlalchemy_orm_pk_given(n=100000):
init_sqlalchemy()
t0 = time.time()
for i in xrange(n):
customer = Customer(id=i+1, name="NAME " + str(i))
DBSession.add(customer)
if i % 1000 == 0:
DBSession.flush()
DBSession.commit()
print(
"SQLAlchemy ORM pk given: Total time for " + str(n) +
" records " + str(time.time() - t0) + " secs")
def test_sqlalchemy_orm_bulk_insert(n=100000):
init_sqlalchemy()
t0 = time.time()
n1 = n
while n1 > 0:
n1 = n1 - 10000
DBSession.bulk_insert_mappings(
Customer,
[
dict(name="NAME " + str(i))
for i in xrange(min(10000, n1))
]
)
DBSession.commit()
print(
"SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
" records " + str(time.time() - t0) + " secs")
def test_sqlalchemy_core(n=100000):
init_sqlalchemy()
t0 = time.time()
engine.execute(
Customer.__table__.insert(),
[{"name": 'NAME ' + str(i)} for i in xrange(n)]
)
print(
"SQLAlchemy Core: Total time for " + str(n) +
" records " + str(time.time() - t0) + " secs")
def init_sqlite3(dbname):
conn = sqlite3.connect(dbname)
c = conn.cursor()
c.execute("DROP TABLE IF EXISTS customer")
c.execute(
"CREATE TABLE customer (id INTEGER NOT NULL, "
"name VARCHAR(255), PRIMARY KEY(id))")
conn.commit()
return conn
def test_sqlite3(n=100000, dbname='sqlite3.db'):
conn = init_sqlite3(dbname)
c = conn.cursor()
t0 = time.time()
for i in xrange(n):
row = ('NAME ' + str(i),)
c.execute("INSERT INTO customer (name) VALUES (?)", row)
conn.commit()
print(
"sqlite3: Total time for " + str(n) +
" records " + str(time.time() - t0) + " sec")
if __name__ == '__main__':
test_sqlalchemy_orm(100000)
test_sqlalchemy_orm_pk_given(100000)
test_sqlalchemy_orm_bulk_insert(100000)
test_sqlalchemy_core(100000)
test_sqlite3(100000)
Ответ 4
Обычно я делаю это с помощью add_all
.
from app import session
from models import User
objects = [User(name="u1"), User(name="u2"), User(name="u3")]
session.add_all(objects)
session.commit()
Ответ 5
Прямая поддержка была добавлена в SQLAlchemy с версии 0.8
В соответствии с docs, connection.execute(table.insert().values(data))
должен сделать трюк. (Обратите внимание, что это не то же самое, что connection.execute(table.insert(), data)
, что приводит к множеству отдельных вставок строк при вызове executemany
). На чем угодно, кроме локального соединения, разница в производительности может быть огромной.
Ответ 6
SQLAlchemy представил это в версии 1.0.0
:
Массовые операции - документы по SQLAlchemy
С помощью этих операций вы теперь можете делать массовые вставки или обновления!
Например (если вы хотите минимальные издержки для простых INSERT-таблиц), вы можете использовать Session.bulk_insert_mappings()
:
loadme = [
(1, 'a')
, (2, 'b')
, (3, 'c')
]
dicts = []
for i in range(len(loadme)):
dicts.append(dict(bar=loadme[i][0], fly=loadme[i][1]))
s = Session()
s.bulk_insert_mappings(Foo, dicts)
s.commit()
Или, если хотите, пропустите кортежи loadme
и напишите словари непосредственно в dicts
(но я считаю, что проще исключить всю многословность данных и загрузить список словарей в цикле).
Ответ 7
Ответ Пьера правильный, но одна проблема в том, что bulk_save_objects
по умолчанию не возвращает первичные ключи объектов, если это вас беспокоит. Установите return_defaults
в True
чтобы получить это поведение.
Документация здесь.
foos = [Foo(bar='a',), Foo(bar='b'), Foo(bar='c')]
session.bulk_save_objects(foos, return_defaults=True)
for foo in foos:
assert foo.id is not None
session.commit()
Ответ 8
Это способ:
values = [1, 2, 3]
Foo.__table__.insert().execute([{'bar': x} for x in values])
Это будет выглядеть следующим образом:
INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)
Ссылка: SQLAlchemy FAQ включает эталонные тесты для различных методов фиксации.
Ответ 9
Все дороги ведут в Рим, но некоторые из них пересекают горы, требуются паромы, но если вы хотите быстро туда добраться, просто езжайте по автостраде.
В этом случае автомагистраль должна использовать функцию execute_batch() в psycopg2. Документация говорит это лучше всего:
Текущая реализация executemany()
(с чрезвычайно благотворительным преуменьшением) не особенно эффективна. Эти функции могут использоваться для ускорения повторного выполнения оператора с набором параметров. Благодаря уменьшению количества обращений к серверу производительность может быть на несколько порядков лучше, чем при использовании executemany()
.
В моем собственном тесте execute_batch()
примерно в два раза быстрее, чем executemany()
, и дает возможность настроить page_size для дальнейшей настройки (если вы хотите выжать последние 2-3% производительности из драйвера).
Эту же функцию можно легко включить, если вы используете SQLAlchemy, установив use_batch_mode=True
в качестве параметра, когда вы создаете движок с помощью create_engine()
Ответ 10
Лучший ответ, который я нашел, был в документации по sqlalchemy:
http://docs.sqlalchemy.org/en/latest/faq/performance.html#im-inserting-400-000-rows-with-the-orm-and-it-s-really-slow
Существует полный пример эталона возможных решений.
Как показано в документации:
bulk_save_objects - не лучшее решение, но его производительность верна.
Вторая лучшая реализация с точки зрения читабельности, я думаю, была с ядром SQLAlchemy:
def test_sqlalchemy_core(n=100000):
init_sqlalchemy()
t0 = time.time()
engine.execute(
Customer.__table__.insert(),
[{"name": 'NAME ' + str(i)} for i in xrange(n)]
)
Контекст этой функции приведен в статье документации.