Ускорение pandas.DataFrame.to_sql с fast_executemany из pyODBC
Я хотел бы отправить большой pandas.DataFrame
на удаленный сервер с MS SQL. То, как я делаю это сейчас, - это преобразовать объект data_frame
в список кортежей и затем отправить его с помощью executemany()
pyODBC executemany()
. Это выглядит примерно так:
import pyodbc as pdb
list_of_tuples = convert_df(data_frame)
connection = pdb.connect(cnxn_str)
cursor = connection.cursor()
cursor.fast_executemany = True
cursor.executemany(sql_statement, list_of_tuples)
connection.commit()
cursor.close()
connection.close()
Затем я начал задаваться вопросом, можно ли ускорить работу (или, по крайней мере, более читаемую), с помощью data_frame.to_sql()
. Я придумал следующее решение:
import sqlalchemy as sa
engine = sa.create_engine("mssql+pyodbc:///?odbc_connect=%s" % cnxn_str)
data_frame.to_sql(table_name, engine, index=False)
Теперь код более читабельен, но загрузка по меньшей мере в 150 раз медленнее...
Есть ли способ перевернуть fast_executemany
при использовании SQLAlchemy?
Я использую pandas-0.20.3, pyODBC-4.0.21 и sqlalchemy-1.1.13.
Ответы
Ответ 1
После обращения к разработчикам SQLAlchemy появился способ решить эту проблему. Большое спасибо им за отличную работу!
Нужно использовать событие выполнения курсора и проверить, был ли executemany
флаг executemany
. Если это действительно так, fast_executemany
опцию fast_executemany
. Например:
from sqlalchemy import event
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
if executemany:
cursor.fast_executemany = True
Более подробную информацию о событиях выполнения можно найти здесь.
ОБНОВЛЕНИЕ: Поддержка fast_executemany
pyodbc
была добавлена в SQLAlchemy 1.3.0, так что этот хак больше не нужен.
Ответ 2
РЕДАКТИРОВАТЬ (2019-03-08): Горд Томпсон прокомментировал ниже хорошие новости из журналов обновлений sqlalchemy: начиная с SQLAlchemy 1.3.0, выпущенного 2019-03-04, sqlalchemy теперь поддерживает engine = create_engine(sqlalchemy_url, fast_executemany=True)
для диалект mssql+pyodbc
. Т.е. больше нет необходимости определять функцию и использовать @event.listens_for(engine, 'before_cursor_execute')
означает, что нижеследующая функция может быть удалена, и в операторе create_engine должен быть установлен только флаг - и при этом сохраняется скорость вверх.
Исходное сообщение:
Просто сделал аккаунт чтобы опубликовать это. Я хотел прокомментировать вышеупомянутую ветку, поскольку это продолжение уже предоставленного ответа. Вышеупомянутое решение работало для меня с драйвером SQL версии 17 для записи в хранилище Microsft SQL из установки на основе Ubuntu.
Полный код, который я использовал для значительного ускорения (говорим> ускорение в 100 раз), приведен ниже. Это фрагмент кода "под ключ" при условии, что вы измените строку подключения, указав соответствующие данные. На постере выше, большое спасибо за решение, так как я уже довольно долго искал это.
import pandas as pd
import numpy as np
import time
from sqlalchemy import create_engine, event
from urllib.parse import quote_plus
conn = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=IP_ADDRESS;DATABASE=DataLake;UID=USER;PWD=PASS"
quoted = quote_plus(conn)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
engine = create_engine(new_con)
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
print("FUNC call")
if executemany:
cursor.fast_executemany = True
table_name = 'fast_executemany_test'
df = pd.DataFrame(np.random.random((10**4, 100)))
s = time.time()
df.to_sql(table_name, engine, if_exists = 'replace', chunksize = None)
print(time.time() - s)
Основываясь на комментариях ниже, я хотел потратить некоторое время, чтобы объяснить некоторые ограничения в реализации pandas to_sql
и способа обработки запроса. Есть две вещи, которые могут вызвать MemoryError
.
1) Предполагается, что вы пишете в удаленное хранилище SQL. Когда вы пытаетесь написать большой DataFrame для панд с to_sql
метода to_sql
он преобразует весь dataframe в список значений. Это преобразование занимает намного больше оперативной памяти, чем исходный DataFrame (поверх него, так как старый DataFrame все еще присутствует в RAM). Этот список предоставляется последнему вызову executemany
для вашего коннектора ODBC. Я думаю, что у соединителя ODBC есть некоторые проблемы, обрабатывающие такие большие запросы. Способ решения этой проблемы является предоставление to_sql
метод а chunksize аргумент (10 ** 5, кажется, вокруг оптимальной давая около 600 Мбит/с() скорость записи на 2 CPU 7GB барана приложения MSSQL Storage от Azure - не может рекомендую Azure кстати). Таким образом, первое ограничение - размер запроса - можно обойти, предоставив аргумент chunksize
. Однако это не позволит вам записать фрейм данных размером 10 ** 7 или больше (по крайней мере, не на той виртуальной машине, с которой я работаю, у которой ~ 55 ГБ ОЗУ), поскольку это номер 2.
Этого можно обойти, разбив DataFrame с помощью np.split
(составляющего 10 ** 6 блоков DataFrame размера). Они могут быть записаны итеративно. Я попытаюсь сделать пулл-запрос, когда у меня будет готово решение для метода to_sql
в ядре самой панды, так что вам не придется делать это предварительно каждый раз. В любом случае я написал функцию, похожую (не под ключ) на следующую:
import pandas as pd
import numpy as np
def write_df_to_sql(df, **kwargs):
chunks = np.split(df, df.shape()[0] / 10**6)
for chunk in chunks:
chunk.to_sql(**kwargs)
return True
Более полный пример приведенного выше фрагмента можно посмотреть здесь: https://gitlab.com/timelord/timelord/blob/master/timelord/utils/connector.py
Это класс, который я написал, который включает в себя патч и облегчает некоторые необходимые накладные расходы, связанные с настройкой соединений с SQL. Еще нужно написать какую-то документацию. Также я планировал добавить патч для самой панды, но пока не нашел хорошего способа сделать это.
Надеюсь, это поможет.
Ответ 3
Я просто хотел опубликовать этот полный пример в качестве дополнительного высокопроизводительного варианта для тех, кто может использовать новую библиотеку turbodbc: http://turbodbc.readthedocs.io/en/latest/
Ясно, что существует множество параметров между потоками между пандами .to_sql(), запускающих fast_executemany через sqlalchemy, непосредственного использования pyodbc с кортежами /lists/etc. Или даже попыткой BULK UPLOAD с плоскими файлами.
Надеемся, что следующее может сделать жизнь немного приятнее, так как функциональность развивается в текущем проекте Pandas или включает что-то вроде интеграции с Turbodbc в будущем.
import pandas as pd
import numpy as np
from turbodbc import connect, make_options
from io import StringIO
test_data = '''id,transaction_dt,units,measures
1,2018-01-01,4,30.5
1,2018-01-03,4,26.3
2,2018-01-01,3,12.7
2,2018-01-03,3,8.8'''
df_test = pd.read_csv(StringIO(test_data), sep=',')
df_test['transaction_dt'] = pd.to_datetime(df_test['transaction_dt'])
options = make_options(parameter_sets_to_buffer=1000)
conn = connect(driver='{SQL Server}', server='server_nm', database='db_nm', turbodbc_options=options)
test_query = '''DROP TABLE IF EXISTS [db_name].[schema].[test]
CREATE TABLE [db_name].[schema].[test]
(
id int NULL,
transaction_dt datetime NULL,
units int NULL,
measures float NULL
)
INSERT INTO [db_name].[schema].[test] (id,transaction_dt,units,measures)
VALUES (?,?,?,?) '''
cursor.executemanycolumns(test_query, [df_test['id'].values, df_test['transaction_dt'].values, df_test['units'].values, df_test['measures'].values]
turbodbc должен быть ОЧЕНЬ быстрым во многих случаях использования (особенно с массивами numpy). Пожалуйста, обратите внимание, насколько просто передать лежащие в основе массивы из столбцов данных в качестве параметров непосредственно в запрос. Я также считаю, что это помогает предотвратить создание промежуточных объектов, которые чрезмерно увеличивают потребление памяти. Надеюсь, что это полезно!
Ответ 4
Я столкнулся с той же проблемой, но с использованием PostgreSQL. Теперь они просто выпускают версию 0.24.0 для панд, и в функции to_sql
появился новый параметр method
который решил мою проблему.
from sqlalchemy import create_engine
engine = create_engine(your_options)
data_frame.to_sql(table_name, engine, method="multi")
Скорость загрузки в 100 раз выше для меня. Я также рекомендую установить параметр chunksize
если вы собираетесь отправлять много данных.
Ответ 5
Похоже, что Pandas 0.23.0 и 0.24.0 используют вставки нескольких значений с PyODBC, что не позволяет быстрому выполнению помочь - один оператор INSERT... VALUES...
испускается для каждого чанка. Чанки вставки с несколькими значениями являются улучшением по сравнению со старым по умолчанию медленным выполнением, но, по крайней мере, в простых тестах метод быстрого выполнения все еще преобладает, не говоря уже о том, что нет необходимости в ручных вычислениях chunksize
, как это требуется для вставок с несколькими значениями. Принудительное старое поведение может быть выполнено с помощью monkeypatching, если в будущем не будет предоставлена опция конфигурации:
import pandas.io.sql
def insert_statement(self, data, conn):
return self.table.insert(), data
pandas.io.sql.SQLTable.insert_statement = insert_statement
Будущее уже здесь, и, по крайней мере, в master
ветке метод вставки можно контролировать с помощью ключевого слова аргумент method=
of to_sql()
. По умолчанию None
, что вызывает метод executemany. Передача method='multi'
приводит к использованию вставки с несколькими значениями. Он может даже использоваться для реализации конкретных подходов к СУБД, таких как Postgresql COPY
.
Ответ 6
Как указано @Pylander
На сегодняшний день Turbodbc - лучший выбор для приема данных!
Я так обрадовался этому, что написал "блог" на своем github и medium: пожалуйста, проверьте https://medium.com/@erickfis/etl-process-with-turbodbc-1d19ed71510e
за рабочий пример и сравнение с pandas.to_sql
Короче,
с turbodbc у меня 10000 строк (77 столбцов) за 3 секунды
с pandas.to_sql я получил те же 10000 строк (77 столбцов) за 198 секунд...
И вот что я делаю в деталях
Импорт:
import sqlalchemy
import pandas as pd
import numpy as np
import turbodbc
import time
Загрузите и обработайте некоторые данные - замените мой sample.pkl своим:
df = pd.read_pickle('sample.pkl')
df.columns = df.columns.str.strip() # remove white spaces around column names
df = df.applymap(str.strip) # remove white spaces around values
df = df.replace('', np.nan) # map nans, to drop NAs rows and columns later
df = df.dropna(how='all', axis=0) # remove rows containing only NAs
df = df.dropna(how='all', axis=1) # remove columns containing only NAs
df = df.replace(np.nan, 'NA') # turbodbc hates null values...
Создайте таблицу с помощью sqlAlchemy
К сожалению, turbodbc требует больших накладных расходов с большим количеством ручного труда sql, для создания таблиц и для вставки данных в него.
К счастью, Python является чистой радостью, и мы можем автоматизировать этот процесс написания кода SQL.
Первым шагом является создание таблицы, которая будет получать наши данные. Однако создание таблицы с ручным написанием кода SQL может быть проблематичным, если в вашей таблице более нескольких столбцов. В моем случае очень часто таблицы имеют 240 столбцов!
В этом нам могут помочь sqlAlchemy и pandas: pandas плохо подходит для записи большого количества строк (в нашем примере это 10000), но как насчет всего 6 строк, заголовка таблицы? Таким образом, мы автоматизируем процесс создания таблиц.
Создать sqlAlchemy соединение:
mydb = 'someDB'
def make_con(db):
"""Connect to a specified db."""
database_connection = sqlalchemy.create_engine(
'mssql+pymssql://{0}:{1}@{2}/{3}'.format(
myuser, mypassword,
myhost, db
)
)
return database_connection
pd_connection = make_con(mydb)
Создать таблицу на SQL Server
Использование pandas + sqlAlchemy, но только для подготовки комнаты для turbodbc, как упоминалось ранее. Обратите внимание, что здесь df.head(): мы используем pandas + sqlAlchemy для вставки только 6 строк наших данных. Это будет выполняться довольно быстро и делается для автоматизации создания таблицы.
table = 'testing'
df.head().to_sql(table, con=pd_connection, index=False)
Теперь, когда стол уже на месте, давайте серьезно.
Turbodbc соединение:
def turbo_conn(mydb):
"""Connect to a specified db - turbo."""
database_connection = turbodbc.connect(
driver='ODBC Driver 17 for SQL Server',
server=myhost,
database=mydb,
uid=myuser,
pwd=mypassword
)
return database_connection
Подготовка sql команд и данных для turbodbc. Позволяет автоматизировать создание этого кода, будучи креативным:
def turbo_write(mydb, df, table):
"""Use turbodbc to insert data into sql."""
start = time.time()
# preparing columns
colunas = '('
colunas += ', '.join(df.columns)
colunas += ')'
# preparing value place holders
val_place_holder = ['?' for col in df.columns]
sql_val = '('
sql_val += ', '.join(val_place_holder)
sql_val += ')'
# writing sql query for turbodbc
sql = f"""
INSERT INTO {mydb}.dbo.{table} {colunas}
VALUES {sql_val}
"""
# writing array of values for turbodbc
valores_df = [df[col].values for col in df.columns]
# cleans the previous head insert
with connection.cursor() as cursor:
cursor.execute(f"delete from {mydb}.dbo.{table}")
connection.commit()
# inserts data, for real
with connection.cursor() as cursor:
try:
cursor.executemanycolumns(sql, valores_df)
connection.commit()
except Exception:
connection.rollback()
print('something went wrong')
stop = time.time() - start
return print(f'finished in {stop} seconds')
Запись данных с использованием turbodbc - Ive получил 10000 строк (77 столбцов) за 3 секунды:
turbo_write(mydb, df.sample(10000), table)
Сравнение метода Панд - у меня те же 10000 строк (77 столбцов) за 198 секунд...
table = 'pd_testing'
def pandas_comparisson(df, table):
"""Load data using pandas."""
start = time.time()
df.to_sql(table, con=pd_connection, index=False)
stop = time.time() - start
return print(f'finished in {stop} seconds')
pandas_comparisson(df.sample(10000), table)
Окружающая среда и условия
Python 3.6.7 :: Anaconda, Inc.
TURBODBC version ‘3.0.0
sqlAlchemy version ‘1.2.12
pandas version ‘0.23.4
Microsoft SQL Server 2014
user with bulk operations privileges
Пожалуйста, проверьте https://erickfis.github.io/loose-code/ для обновлений в этом коде!
Ответ 7
Производительность SQL Server INSERT: pyodbc против turbodbc
При использовании to_sql
для загрузки DataFrame pandas на SQL Server, turbodbc определенно будет быстрее, чем pyodbc без fast_executemany
. Однако при включенном fast_executemany
для pyodbc оба подхода дают практически одинаковую производительность.
Тестовые среды:
[Venv1_pyodbc]
pyodbc 2.0.25
[Venv2_turbodbc]
turbodbc 3.0.0
sqlalchemy-turbodbc 0.1.0
[общий для обоих]
Python 3.6.4 64-битный на Windows
SQLAlchemy 1.3.0b1
Панды 0.23.4
numpy 1.15.4
Тестовый код:
# for pyodbc
engine = create_engine('mssql+pyodbc://sa:[email protected]_panorama', fast_executemany=True)
# for turbodbc
# engine = create_engine('mssql+turbodbc://sa:[email protected]_panorama')
# test data
num_rows = 10000
num_cols = 100
df = pd.DataFrame(
[[f'row{x:04}col{y:03}' for y in range(num_cols)] for x in range(num_rows)],
columns=[f'col{y:03}' for y in range(num_cols)]
)
t0 = time.time()
df.to_sql("sqlalchemy_test", engine, if_exists='replace', index=None)
print(f"pandas wrote {num_rows} rows in {(time.time() - t0):0.1f} seconds")
Тесты проводились двенадцать (12) раз для каждой среды, отбрасывая лучшие и худшие времена для каждой среды. Результаты (в секундах):
rank pyodbc turbodbc
---- ------ --------
1 22.8 27.5
2 23.4 28.1
3 24.6 28.2
4 25.2 28.5
5 25.7 29.3
6 26.9 29.9
7 27.0 31.4
8 30.1 32.1
9 33.6 32.5
10 39.8 32.9
---- ------ --------
average 27.9 30.0
Ответ 8
Просто хотел добавить в ответ @JK.
Если вы используете этот подход:
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
if executemany:
cursor.fast_executemany = True
И вы получаете эту ошибку:
"sqlalchemy.exc.DBAPIError: (pyodbc.Error) ('HY010', '[HY010] [Microsoft] [собственный клиент SQL Server 11.0] Ошибка последовательности функций (0) (SQLParamData)') [SQL: 'INSERT INTO... (...) VALUES (?,?) '] [Parameters: ((...,...), (...,...)] (Справочная информация об этой ошибке по адресу: http://sqlalche.me/e/dbapi) "
Кодируйте строковые значения, например, так: 'yourStringValue'.encode('ascii')
Это решит вашу проблему.