Вставить данные в MySQL в несколько таблиц в С# эффективно
Мне нужно вставить огромный CSV файл в 2 таблицы с соотношением 1: n в базе данных mySQL.
CSV файл поступает еженедельно и имеет около 1 ГБ, что необходимо добавить к существующим данным.
Каждая из двух таблиц имеет первичный ключ Auto increment.
Я пробовал:
- Entity Framework (занимает большинство времени всех подходов)
- Наборы данных (такие же)
- Массовая загрузка (не поддерживает несколько таблиц)
- MySqlCommand с параметрами (должен быть вложен, мой текущий подход)
- MySqlCommand со StoredProcedure, включая транзакцию
Другие предложения?
Пусть упрощенно это моя структура данных:
public class User
{
public string FirstName { get; set; }
public string LastName { get; set; }
public List<string> Codes { get; set; }
}
Мне нужно вставить из csv в эту базу данных:
User (1-n) Code
+---+-----+-----+ +---+---+-----+
|PID|FName|LName| |CID|PID|Code |
+---+-----+-----+ +---+---+-----+
| 1 |Jon | Foo | | 1 | 1 | ed3 |
| 2 |Max | Foo | | 2 | 1 | wst |
| 3 |Paul | Foo | | 3 | 2 | xsd |
+---+-----+-----+ +---+---+-----+
Здесь пример строки CSV файла
Jon;Foo;ed3,wst
A Массовая загрузка, например LOAD DATA LOCAL INFILE
, невозможна, потому что у меня ограниченные права на запись
Ответы
Ответ 1
Учитывая большой размер данных, наилучшим подходом (с точки зрения производительности) является то, что большая часть данных обрабатывается в базе данных, а не в приложении.
Создайте временную таблицу, в которой данные из файла .csv будут временно сохранены.
CREATE TABLE `imported` (
`id` int(11) NOT NULL,
`firstname` varchar(45) DEFAULT NULL,
`lastname` varchar(45) DEFAULT NULL,
`codes` varchar(450) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Загрузка данных из .csv
в эту таблицу довольно проста. Я бы предложил использовать MySqlCommand
(который также является вашим текущим подходом). Кроме того, , используя тот же MySqlConnection
объект для всех операторов INSERT
, сократит общее время выполнения.
Затем, чтобы обработать данные, вы можете создать хранимую процедуру, которая будет обрабатывать ее.
Предполагая эти две таблицы (взятые из упрощенного примера):
CREATE TABLE `users` (
`PID` int(11) NOT NULL AUTO_INCREMENT,
`FName` varchar(45) DEFAULT NULL,
`LName` varchar(45) DEFAULT NULL,
PRIMARY KEY (`PID`)
) ENGINE=InnoDB AUTO_INCREMENT=3737 DEFAULT CHARSET=utf8;
и
CREATE TABLE `codes` (
`CID` int(11) NOT NULL AUTO_INCREMENT,
`PID` int(11) DEFAULT NULL,
`code` varchar(45) DEFAULT NULL,
PRIMARY KEY (`CID`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8;
у вас может быть следующая хранимая процедура.
CREATE DEFINER=`root`@`localhost` PROCEDURE `import_data`()
BEGIN
DECLARE fname VARCHAR(255);
DECLARE lname VARCHAR(255);
DECLARE codesstr VARCHAR(255);
DECLARE splitted_value VARCHAR(255);
DECLARE done INT DEFAULT 0;
DECLARE newid INT DEFAULT 0;
DECLARE occurance INT DEFAULT 0;
DECLARE i INT DEFAULT 0;
DECLARE cur CURSOR FOR SELECT firstname,lastname,codes FROM imported;
DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1;
OPEN cur;
import_loop:
LOOP FETCH cur INTO fname, lname, codesstr;
IF done = 1 THEN
LEAVE import_loop;
END IF;
INSERT INTO users (FName,LName) VALUES (fname, lname);
SET newid = LAST_INSERT_ID();
SET i=1;
SET occurance = (SELECT LENGTH(codesstr) - LENGTH(REPLACE(codesstr, ',', '')) + 1);
WHILE i <= occurance DO
SET splitted_value =
(SELECT REPLACE(SUBSTRING(SUBSTRING_INDEX(codesstr, ',', i),
LENGTH(SUBSTRING_INDEX(codesstr, ',', i - 1)) + 1), ',', ''));
INSERT INTO codes (PID, code) VALUES (newid, splitted_value);
SET i = i + 1;
END WHILE;
END LOOP;
CLOSE cur;
END
Для каждой строки исходных данных она делает оператор INSERT
для таблицы user
. Затем существует цикл WHILE
для разделения разделенных запятыми кодов и для каждого из них - оператор INSERT
для таблицы codes
.
Что касается использования LAST_INSERT_ID()
, он надежен на основе PER CONNECTION (см. документ здесь). Если соединение MySQL, используемое для запуска этой хранимой процедуры, не используется другими транзакциями, использование LAST_INSERT_ID()
безопасно.
Идентификатор, который был сгенерирован, поддерживается на сервере для каждого подключения. Это означает, что значение, возвращаемое функцией для данного клиента, является первым значением AUTO_INCREMENT, сгенерированным для самого последнего оператора, влияющим на столбец AUTO_INCREMENT этого клиента. Это значение не может повлиять на других клиентов, даже если они генерируют собственные значения AUTO_INCREMENT. Такое поведение гарантирует, что каждый клиент может получить свой собственный идентификатор, не заботясь о деятельности других клиентов, и без необходимости блокировок или транзакций.
Изменить. Здесь представлен вариант OP, который опускает временную таблицу imported
. Вместо того, чтобы вставлять данные из таблицы .csv в таблицу imported
, вы вызываете SP для непосредственного хранения их в вашей базе данных.
CREATE DEFINER=`root`@`localhost` PROCEDURE `import_data`(IN fname VARCHAR(255), IN lname VARCHAR(255),IN codesstr VARCHAR(255))
BEGIN
DECLARE splitted_value VARCHAR(255);
DECLARE done INT DEFAULT 0;
DECLARE newid INT DEFAULT 0;
DECLARE occurance INT DEFAULT 0;
DECLARE i INT DEFAULT 0;
INSERT INTO users (FName,LName) VALUES (fname, lname);
SET newid = LAST_INSERT_ID();
SET i=1;
SET occurance = (SELECT LENGTH(codesstr) - LENGTH(REPLACE(codesstr, ',', '')) + 1);
WHILE i <= occurance DO
SET splitted_value =
(SELECT REPLACE(SUBSTRING(SUBSTRING_INDEX(codesstr, ',', i),
LENGTH(SUBSTRING_INDEX(codesstr, ',', i - 1)) + 1), ',', ''));
INSERT INTO codes (PID, code) VALUES (newid, splitted_value);
SET i = i + 1;
END WHILE;
END
Примечание. Код для разделения кодов берется из здесь (MySQL не предоставляет функцию разделения для строки).
Ответ 2
Ссылаясь на ваш ответ, я бы заменил
using (MySqlCommand myCmdNested = new MySqlCommand(cCommand, mConnection))
{
foreach (string Code in item.Codes)
{
myCmdNested.Parameters.Add(new MySqlParameter("@UserID", UID));
myCmdNested.Parameters.Add(new MySqlParameter("@Code", Code));
myCmdNested.ExecuteNonQuery();
}
}
с
List<string> lCodes = new List<string>();
foreach (string code in item.Codes)
{
lCodes.Add(String.Format("('{0}','{1}')", UID, MySqlHelper.EscapeString(code)));
}
string cCommand = "INSERT INTO Code (UserID, Code) VALUES " + string.Join(",", lCodes);
using (MySqlCommand myCmdNested = new MySqlCommand(cCommand, mConnection))
{
myCmdNested.ExecuteNonQuery();
}
который генерирует один оператор insert вместо item.Count
Ответ 3
Я разработал приложение приложения WPF с использованием Entity Framework и использовал базу данных SQL Server и должен был читать данные из файла excel и должен был вставить эти данные в 2 таблицы, которые имеют отношения между ними. Примерно за 15000 строк в excel он занимал около 4 часов. Затем я использовал блок из 500 строк для каждой вставки, и это ускорило мою установку в unbelievalbe быстро, и теперь для импорта этих же данных требуется всего 3-5 секунд.
Итак, я предлагаю вам добавить ваши строки в контекст, например, 100/200/500, а затем вызвать метод SaveChanges (если вы действительно хотите использовать EF). Есть и другие полезные советы, чтобы ускорить работу EF. Пожалуйста, прочитайте этот для справки.
var totalRecords = TestPacksData.Rows.Count;
var totalPages = (totalRecords / ImportRecordsPerPage) + 1;
while (count <= totalPages)
{
var pageWiseRecords = TestPacksData.Rows.Cast<DataRow>().Skip(count * ImportRecordsPerPage).Take(ImportRecordsPerPage);
count++;
Project.CreateNewSheet(pageWiseRecords.ToList());
Project.CreateNewSpool(pageWiseRecords.ToList());
}
И вот метод CreateNewSheet
/// <summary>
/// Creates a new Sheet record in the database
/// </summary>
/// <param name="row">DataRow containing the Sheet record</param>
public void CreateNewSheet(List<DataRow> rows)
{
var tempSheetsList = new List<Sheet>();
foreach (var row in rows)
{
var sheetNo = row[SheetFields.Sheet_No.ToString()].ToString();
if (string.IsNullOrWhiteSpace(sheetNo))
continue;
var testPackNo = row[SheetFields.Test_Pack_No.ToString()].ToString();
TestPack testPack = null;
if (!string.IsNullOrWhiteSpace(testPackNo))
testPack = GetTestPackByTestPackNo(testPackNo);
var existingSheet = GetSheetBySheetNo(sheetNo);
if (existingSheet != null)
{
UpdateSheet(existingSheet, row);
continue;
}
var isometricNo = GetIsometricNoFromSheetNo(sheetNo);
var newSheet = new Sheet
{
sheet_no = sheetNo,
isometric_no = isometricNo,
ped_rev = row[SheetFields.PED_Rev.ToString()].ToString(),
gpc_rev = row[SheetFields.GPC_Rev.ToString()].ToString()
};
if (testPack != null)
{
newSheet.test_pack_id = testPack.id;
newSheet.test_pack_no = testPack.test_pack_no;
}
if (!tempSheetsList.Any(l => l.sheet_no == newSheet.sheet_no))
{
DataStore.Context.Sheets.Add(newSheet);
tempSheetsList.Add(newSheet);
}
}
try
{
DataStore.Context.SaveChanges();
**DataStore.Dispose();** This is very important. Dispose the context
}
catch (DbEntityValidationException ex)
{
// Create log for the exception here
}
}
CreateNewSpool - это тот же метод, за исключением имени поля и имени таблицы, поскольку он обновляет дочернюю таблицу. Но идея такая же
Ответ 4
1 - добавьте столбец VirtualId в таблицу User
и .
EDITED
2 - Назначьте числа в цикле для VirtualId (используйте отрицательные числа, начиная с -1, чтобы избежать столкновений в последнем шаге) в каждом объекте User
. Для каждого объекта Code c
, принадлежащего объекту User u
, установите c.UserId = u.VirtualId
.
3 - Массовая загрузка Пользователей в таблицу User
, Массовая загрузка Коды в таблицу Code
.
4- UPDATE CODE C,USER U SET C.UserId = U.Id WHERE C.UserId = U.VirtualId.
ПРИМЕЧАНИЕ. Если у вас есть ограничение FK на Code.UserId, вы можете удалить его и повторно добавить после вставки.
public class User
{
public int Id { get; set; }
public string FirstName { get; set; }
public string LastName { get; set; }
public int VirtualId { get; set; }
}
public class Code
{
public int Id { get; set; }
public string Code { get; set; }
public string UserId { get; set; }
}
Ответ 5
Можете ли вы разбить CSV на два файла?
например. Предположим, что ваш файл имеет следующие столбцы:
... A ... | ... B ...
a0 | b0
a0 | b1
a0 | b2 <-- data
a1 | b3
a1 | b4
Таким образом, один набор из A может иметь несколько записей B. После того, как вы разделите его, вы получите:
... A ...
a0
a1
... B ...
b0
b1
b2
b3
b4
Затем вы вставляете их отдельно.
Изменить: псевдокод
Основываясь на разговоре, что-то вроде:
DataTable tableA = ...; // query schema for TableA
DataTable tableB = ...; // query schmea for TableB
List<String> usernames = select distinct username from TableA;
Hashtable htUsername = new Hashtable(StringComparer.InvariantCultureIgnoreCase);
foreach (String username in usernames)
htUsername[username] = "";
int colUsername = ...;
foreach (String[] row in CSVFile) {
String un = row[colUsername] as String;
if (htUsername[un] == null) {
// add new row to tableA
DataRow row = tableA.NewRow();
row["Username"] = un;
// etc.
tableA.Rows.Add(row);
htUsername[un] = "";
}
}
// bulk insert TableA
select userid, username from TableA
Hashtable htUserId = new Hashtable(StringComparer.InvariantCultureIgnoreCase);
// htUserId[username] = userid;
int colUserId = ...;
foreach (String[] row in CSVFile) {
String un = row[colUsername] as String;
int userid = (int) htUserId[un];
DataRow row = tableB.NewRow();
row[colUserId] = userId;
// fill in other values
tableB.Rows.Add(row);
if (table.Rows.Count == 65000) {
// bulk insert TableB
var t = tableB.Clone();
tableB.Dispose();
tableB = t;
}
}
if (tableB.Rows.Count > 0)
// bulk insert TableB
Ответ 6
AFAIK вставки, сделанные в таблице, являются последовательными, а вставки в другой таблице могут выполняться параллельно. Откройте два отдельных новых подключения к одной и той же базе данных, а затем добавьте их параллельно, возможно, используя параллельную библиотеку задач.
Однако, если существуют ограничения целостности между отношениями 1: n между таблицами, тогда:
- Вставки могут завершиться ошибкой, и, следовательно, любой подход с параллельной вставкой будет неправильным. Ясно, что лучше всего будет делать только последовательные вставки, одну таблицу за другой.
- Вы можете попробовать и отсортировать данные обеих таблиц, напишите метод
InsertInto
, написанный ниже, так что вставка во вторую таблицу произойдет только после того, как вы закончите вставлять данные в первую.
Изменить:. Поскольку вы запросили, если есть возможность для параллельного выполнения вставок, следуйте за шаблоном кода, который вы можете использовать.
private void ParallelInserts()
{
..
//Other code in the method
..
//Read first csv into memory. It just a GB so should be fine
ReadFirstCSV();
//Read second csv into memory...
ReadSecondCSV();
//Because the inserts will last more than a few CPU cycles...
var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None)
//An array to hold the two parallel inserts
var insertTasks = new Task[2];
//Begin insert into first table...
insertTasks[0] = taskFactory.StartNew(() => InsertInto(commandStringFirst, connectionStringFirst));
//Begin insert into second table...
insertTasks[1] = taskFactory.StartNew(() => InsertInto(commandStringSecond, connectionStringSecond));
//Let them be done...
Task.WaitAll(insertTasks);
Console.WriteLine("Parallel insert finished.");
}
//Defining the InsertInto method which we are passing to the tasks in the method above
private static void InsertInto(string commandString, string connectionString)
{
using (/*open a new connection using the connectionString passed*/)
{
//In a while loop, iterate until you have 100/200/500 rows
while (fileIsNotExhausted)
{
using (/*commandString*/)
{
//Execute command to insert in bulk
}
}
}
}
Ответ 7
Когда вы говорите "эффективно", вы говорите о памяти или времени?
Что касается повышения скорости вставки, если вы можете сделать несколько блоков значений в инструкции вставки, вы можете получить 500% -ное улучшение скорости. В этом вопросе я сделал несколько тестов: Что происходит быстрее: несколько одиночных INSERT или один многострочный INSERT?
Мой подход описывается в ответе, но просто помещается, считывая до 50 "строк" (для вставки) сразу и связывая их с одним выражением типа INSERT INTO(...), VALUES(...),(...),(...)...(...),(...)
, похоже, действительно ускоряет работу. По крайней мере, если вам не разрешено загружать навалом.
Другой подход btw, если у вас есть живые данные, которые вы не можете удалить индексы во время загрузки, заключается в создании таблицы памяти на сервере mysql без индексов, выгружении данных там, а затем выполните INSERT INTO live SELECT * FROM mem
. Хотя это использует больше памяти на сервере, поэтому вопрос в начале этого ответа о том, "что вы подразумеваете под" эффективно "?":)
О, и, вероятно, нет ничего плохого в том, чтобы итерации через файл и сначала сделать все первые вставки таблицы, а затем сделать вторую таблицу. Думаю, если данные не будут использованы вживую. В этом случае вы определенно можете использовать наборный подход, но логика приложения для этого намного сложнее.
UPDATE:. Пример запроса кода С# для многозначных блоков вставки.
Примечание: этот код предполагает, что у вас уже настроено несколько структур:
- таблицы Список <string> - имена таблиц для вставки в
- список полей Словарь < string, List <String → → - список имен полей для каждой таблицы
- typeslist Словарь < string, List <MySqlDbType → → - список
MySqlDbType
для каждой таблицы, в том же порядке, что и имена полей.
- nullslist Словарь < string, List <Boolean → → - список флагов, указывающий, является ли поле нулевым или нет, для каждой таблицы (тот же порядок, что и имена полей).
- prikey Словарь < string, string > - список имени поля первичного ключа для каждой таблицы (обратите внимание: это не поддерживает несколько первичных ключей поля, хотя, если вам это нужно, вы могли бы взломать его - я думаю, где-то у меня есть версия, которая поддерживает это, но... meh).
- словарь Словарь < string, List < Dictionary < int, object → → → → - фактические данные, в виде списка словарей значения fieldnum, для каждой таблицы.
О да, и локальная команда - это MySqlCommand, созданная с помощью CreateCommand() на локальном объекте MySqlConnection.
Далее: я написал это довольно давно, когда я начинал. Если это заставляет глаза или мозг кровоточить, я заранее извиняюсь:)
const int perinsert = 50;
foreach (string table in tables)
{
string[] fields = fieldslist[table].ToArray();
MySqlDbType[] types = typeslist[table].ToArray();
bool[] nulls = nullslist[table].ToArray();
int thisblock = perinsert;
int rowstotal = theData[table].Count;
int rowsremainder = rowstotal % perinsert;
int rowscopied = 0;
// Do the bulk (multi-VALUES block) INSERTs, but only if we have more rows than there are in a single bulk insert to perform:
while (rowscopied < rowstotal)
{
if (rowstotal - rowscopied < perinsert)
thisblock = rowstotal - rowscopied;
// Generate a 'perquery' multi-VALUES prepared INSERT statement:
List<string> extravals = new List<string>();
for (int j = 0; j < thisblock; j++)
extravals.Add(String.Format("(@{0}_{1})", j, String.Join(String.Format(", @{0}_", j), fields)));
localcmd.CommandText = String.Format("INSERT INTO {0} VALUES{1}", tmptable, String.Join(",", extravals.ToArray()));
// Now create the parameters to match these:
for (int j = 0; j < thisblock; j++)
for (int i = 0; i < fields.Length; i++)
localcmd.Parameters.Add(String.Format("{0}_{1}", j, fields[i]), types[i]).IsNullable = nulls[i];
// Keep doing bulk INSERTs until there less rows left than we need for another one:
while (rowstotal - rowscopied >= thisblock)
{
// Queue up all the VALUES for this block INSERT:
for (int j = 0; j < thisblock; j++)
{
Dictionary<int, object> row = theData[table][rowscopied++];
for (int i = 0; i < fields.Length; i++)
localcmd.Parameters[String.Format("{0}_{1}", j, fields[i])].Value = row[i];
}
// Run the query:
localcmd.ExecuteNonQuery();
}
// Clear all the paramters - we're done here:
localcmd.Parameters.Clear();
}
}