Массовая вставка A Pandas DataFrame с использованием SQLAlchemy
У меня есть довольно большие pandas DataFrames, и я бы хотел использовать новые массовые сопоставления SQL для загрузки их на Microsoft SQL Server через SQL Alchemy. Метод pandas.to_sql, хотя и приятный, медленный.
У меня возникли проблемы с написанием кода...
Мне бы хотелось передать эту функцию pandas DataFrame, которую я вызываю table
, имя схемы, которую я вызываю schema
, и имя таблицы, которую я вызываю name
, В идеале, функция будет 1.) удалять таблицу, если она уже существует. 2.) создать новую таблицу 3.) создать картограф и 4.) объемную вставку с использованием данных mapper и pandas. Я застрял на третьей части.
Вот мой (правда, грубый) код. Я борюсь с тем, как заставить функцию mapper работать с моими первичными ключами. Мне действительно не нужны первичные ключи, но функция mapper требует этого.
Спасибо за понимание.
from sqlalchemy import create_engine Table, Column, MetaData
from sqlalchemy.orm import mapper, create_session
from sqlalchemy.ext.declarative import declarative_base
from pandas.io.sql import SQLTable, SQLDatabase
def bulk_upload(table, schema, name):
e = create_engine('mssql+pyodbc://MYDB')
s = create_session(bind=e)
m = MetaData(bind=e,reflect=True,schema=schema)
Base = declarative_base(bind=e,metadata=m)
t = Table(name,m)
m.remove(t)
t.drop(checkfirst=True)
sqld = SQLDatabase(e, schema=schema,meta=m)
sqlt = SQLTable(name, sqld, table).table
sqlt.metadata = m
m.create_all(bind=e,tables=[sqlt])
class MyClass(Base):
return
mapper(MyClass, sqlt)
s.bulk_insert_mappings(MyClass, table.to_dict(orient='records'))
return
Ответы
Ответ 1
Я столкнулся с аналогичной проблемой, когда pd.to_sql занимал часы для загрузки данных. В приведенной ниже сумме кода вставлялись одни и те же данные за несколько секунд.
from sqlalchemy import create_engine
import psycopg2 as pg
#load python script that batch loads pandas df to sql
import cStringIO
address = 'postgresql://<username>:<pswd>@<host>:<port>/<database>'
engine = create_engine(address)
connection = engine.raw_connection()
cursor = connection.cursor()
#df is the dataframe containing an index and the columns "Event" and "Day"
#create Index column to use as primary key
df.reset_index(inplace=True)
df.rename(columns={'index':'Index'}, inplace =True)
#create the table but first drop if it already exists
command = '''DROP TABLE IF EXISTS localytics_app2;
CREATE TABLE localytics_app2
(
"Index" serial primary key,
"Event" text,
"Day" timestamp without time zone,
);'''
cursor.execute(command)
connection.commit()
#stream the data using 'to_csv' and StringIO(); then use sql 'copy_from' function
output = cStringIO.StringIO()
#ignore the index
df.to_csv(output, sep='\t', header=False, index=False)
#jump to start of stream
output.seek(0)
contents = output.getvalue()
cur = connection.cursor()
#null values become ''
cur.copy_from(output, 'localytics_app2', null="")
connection.commit()
cur.close()
Ответ 2
К этому, возможно, ответили тогда, но я нашел решение, сопоставив разные ответы на этом сайте и совместив с документом SQLAlchemy.
- Стол должен существовать в db1; с индексом, установленным с auto_increment on.
- Класс Current должен согласовываться с файловой рамкой, импортированной в CSV, и таблицей в db1.
Надеюсь, что это поможет любому, кто приходит сюда, и хочет быстро перемещать Panda и SQLAlchemy.
from urllib import quote_plus as urlquote
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Numeric
from sqlalchemy.orm import sessionmaker
import pandas as pd
# Set up of the engine to connect to the database
# the urlquote is used for passing the password which might contain special characters such as "/"
engine = create_engine('mysql://root:%[email protected]/db1' % urlquote('weirdPassword*withsp€cialcharacters'), echo=False)
conn = engine.connect()
Base = declarative_base()
#Declaration of the class in order to write into the database. This structure is standard and should align with SQLAlchemy doc.
class Current(Base):
__tablename__ = 'tableName'
id = Column(Integer, primary_key=True)
Date = Column(String(500))
Type = Column(String(500))
Value = Column(Numeric())
def __repr__(self):
return "(id='%s', Date='%s', Type='%s', Value='%s')" % (self.id, self.Date, self.Type, self.Value)
# Set up of the table in db and the file to import
fileToRead = 'file.csv'
tableToWriteTo = 'tableName'
# Panda to create a lovely dataframe
df_to_be_written = pd.read_csv(fileToRead)
# The orient='records' is the key of this, it allows to align with the format mentioned in the doc to insert in bulks.
listToWrite = df_to_be_written.to_dict(orient='records')
metadata = sqlalchemy.schema.MetaData(bind=engine,reflect=True)
table = sqlalchemy.Table(tableToWriteTo, metadata, autoload=True)
# Open the session
Session = sessionmaker(bind=engine)
session = Session()
# Inser the dataframe into the database in one bulk
conn.execute(table.insert(), listToWrite)
# Commit the changes
session.commit()
# Close the session
session.close()
Ответ 3
На основании ответов @ansonw:
def to_sql(engine, df, table, if_exists='fail', sep='\t', encoding='utf8'):
# Create Table
df[:0].to_sql(table, engine, if_exists=if_exists)
# Prepare data
output = cStringIO.StringIO()
df.to_csv(output, sep=sep, header=False, encoding=encoding)
output.seek(0)
# Insert data
connection = engine.raw_connection()
cursor = connection.cursor()
cursor.copy_from(output, table, sep=sep, null='')
connection.commit()
cursor.close()
Я вставляю 200000 строк за 5 секунд вместо 4 минут
Ответ 4
Мое решение для postgres ниже автоматически создает таблицу базы данных, используя ваш фрейм данных pandas, и выполняет быструю COPY my_table FROM...
вставку с использованием postgres COPY my_table FROM...
import io
import pandas as pd
from sqlalchemy import create_engine
def write_to_table(df, db_engine, schema, table_name, if_exists='fail'):
string_data_io = io.StringIO()
df.to_csv(string_data_io, sep='|', index=False)
pd_sql_engine = pd.io.sql.pandasSQL_builder(db_engine, schema=schema)
table = pd.io.sql.SQLTable(table_name, pd_sql_engine, frame=df,
index=False, if_exists=if_exists, schema=schema)
table.create()
string_data_io.seek(0)
string_data_io.readline() # remove header
with db_engine.connect() as connection:
with connection.connection.cursor() as cursor:
copy_cmd = "COPY %s.%s FROM STDIN HEADER DELIMITER '|' CSV" % (schema, table_name)
cursor.copy_expert(copy_cmd, string_data_io)
connection.connection.commit()
Ответ 5
Поскольку это тяжелая рабочая нагрузка ввода-вывода, вы также можете использовать модуль потоковой передачи python через multiprocessing.dummy. Это ускорило меня:
import math
from multiprocessing.dummy import Pool as ThreadPool
...
def insert_df(df, *args, **kwargs):
nworkers = 4
chunksize = math.floor(df.shape[0] / nworkers)
chunks = [(chunksize * i, (chunksize * i) + chunksize) for i in range(nworkers)]
chunks.append((chunksize * nworkers, df.shape[0]))
pool = ThreadPool(nworkers)
def worker(chunk):
i, j = chunk
df.iloc[i:j, :].to_sql(*args, **kwargs)
pool.map(worker, chunks)
pool.close()
pool.join()
....
insert_df(df, "foo_bar", engine, if_exists='append')
Ответ 6
для таких как я, которые пытаются реализовать вышеупомянутые решения:
У Pandas 0.24.0 теперь есть to_sql с параметром chunksize и method = 'multi', который вставляет навалом...
Ответ 7
Вот простой метод
,
Загрузить драйверы для подключения к базе данных SQL
Для Linux и Mac OS:
https://docs.microsoft.com/en-us/sql/connect/odbc/linux-mac/installing-the-microsoft-odbc-driver-for-sql-server?view=sql-server-2017
Для Windows:
https://www.microsoft.com/en-us/download/details.aspx?id=56567
Создание соединения
from sqlalchemy import create_engine
import urllib
server = '*****'
database = '********'
username = '**********'
password = '*********'
params = urllib.parse.quote_plus(
'DRIVER={ODBC Driver 17 for SQL Server};'+
'SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password)
engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)
#Checking Connection
connected = pd.io.sql._is_sqlalchemy_connectable(engine)
print(connected) #Output is True if connection established successfully
Вставка данных
df.to_sql('Table_Name', con=engine, if_exists='append', index=False)
"""
if_exists: {'fail', 'replace', 'append'}, default 'fail'
fail: If table exists, do nothing.
replace: If table exists, drop it, recreate it, and insert data.
append: If table exists, insert data. Create if does not exist.
"""
Если есть много записей
# limit based on sp_prepexec parameter count
tsql_chunksize = 2097 // len(bd_pred_score_100.columns)
# cap at 1000 (limit for number of rows inserted by table-value constructor)
tsql_chunksize = 1000 if tsql_chunksize > 1000 else tsql_chunksize
print(tsql_chunksize)
df.to_sql('table_name', con = engine, if_exists = 'append', index= False, chunksize=tsql_chunksize)
PS: вы можете изменить параметры согласно вашему требованию.
Ответ 8
В Pandas 0.25.1 есть параметр для многократной вставки, поэтому больше нет необходимости обходить эту проблему с помощью SQLAlchemy.
Установите method='multi'
при вызове pandas.DataFrame.to_sql
.
В этом примере это было бы
df.to_sql(table, schema=schema, con=e, index=False, if_exists='replace', method='multi')
Ответ получен из документации здесь
Стоит отметить, что я проверял это только с Redshift. Пожалуйста, дайте мне знать, как идут дела с другими базами данных, чтобы я мог обновить этот ответ.
Ответ 9
Это помогло мне подключиться к базе данных Oracle, используя cx_Oracle и SQLALchemy.
import sqlalchemy
import cx_Oracle
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String
from sqlalchemy.orm import sessionmaker
import pandas as pd
# credentials
username = "username"
password = "password"
connectStr = "connection:/string"
tableName = "tablename"
t0 = time.time()
# connection
dsn = cx_Oracle.makedsn('host','port',service_name='servicename')
Base = declarative_base()
class LANDMANMINERAL(Base):
__tablename__ = 'tablename'
DOCUMENTNUM = Column(String(500), primary_key=True)
DOCUMENTTYPE = Column(String(500))
FILENUM = Column(String(500))
LEASEPAYOR = Column(String(500))
LEASESTATUS = Column(String(500))
PROSPECT = Column(String(500))
SPLIT = Column(String(500))
SPLITSTATUS = Column(String(500))
engine = create_engine('oracle+cx_oracle://%s:%[email protected]%s' % (username, password, dsn))
conn = engine.connect()
Base.metadata.bind = engine
# Creating the session
DBSession = sessionmaker(bind=engine)
session = DBSession()
# Bulk insertion
data = pd.read_csv('data.csv')
lists = data.to_dict(orient='records')
table = sqlalchemy.Table('landmanmineral', Base.metadata, autoreload=True)
conn.execute(table.insert(), lists)
session.commit()
session.close()
print("time taken %8.8f seconds" % (time.time() - t0) )
Ответ 10
Для тех, кто сталкивается с этой проблемой и имеет целевую БД в виде Redshift, обратите внимание, что Redshift не реализует полный набор команд Postgres, и поэтому некоторые ответы, использующие либо COPY FROM
либо copy_from()
Postgres, не будут работать. psycopg2.ProgrammingError: синтаксическая ошибка в или около ошибки "stdin" при попытке copy_fds красного смещения
Решение для ускорения вставки в Redshift заключается в использовании файла Inest или Odo.
Ссылка:
Об Одо http://odo.pydata.org/en/latest/perf.html
Одо с Redshift
https://github.com/blaze/odo/blob/master/docs/source/aws.rst
Redshift COPY (из файла S3)
https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html