Вставить данные в MySQL в несколько таблиц в С# эффективно

Мне нужно вставить огромный CSV файл в 2 таблицы с соотношением 1: n в базе данных mySQL.

CSV файл поступает еженедельно и имеет около 1 ГБ, что необходимо добавить к существующим данным. Каждая из двух таблиц имеет первичный ключ Auto increment.

Я пробовал:

  • Entity Framework (занимает большинство времени всех подходов)
  • Наборы данных (такие же)
  • Массовая загрузка (не поддерживает несколько таблиц)
  • MySqlCommand с параметрами (должен быть вложен, мой текущий подход)
  • MySqlCommand со StoredProcedure, включая транзакцию

Другие предложения?

Пусть упрощенно это моя структура данных:

public class User
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public List<string> Codes { get; set; }
}

Мне нужно вставить из csv в эту базу данных:

       User   (1-n)   Code     
+---+-----+-----+ +---+---+-----+        
|PID|FName|LName| |CID|PID|Code | 
+---+-----+-----+ +---+---+-----+
| 1 |Jon  | Foo | | 1 | 1 | ed3 | 
| 2 |Max  | Foo | | 2 | 1 | wst | 
| 3 |Paul | Foo | | 3 | 2 | xsd | 
+---+-----+-----+ +---+---+-----+ 

Здесь пример строки CSV файла

Jon;Foo;ed3,wst

A Массовая загрузка, например LOAD DATA LOCAL INFILE, невозможна, потому что у меня ограниченные права на запись

Ответы

Ответ 1

Учитывая большой размер данных, наилучшим подходом (с точки зрения производительности) является то, что большая часть данных обрабатывается в базе данных, а не в приложении.

Создайте временную таблицу, в которой данные из файла .csv будут временно сохранены.

CREATE TABLE `imported` (
    `id` int(11) NOT NULL,
    `firstname` varchar(45) DEFAULT NULL,
    `lastname` varchar(45) DEFAULT NULL,
    `codes` varchar(450) DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Загрузка данных из .csv в эту таблицу довольно проста. Я бы предложил использовать MySqlCommand (который также является вашим текущим подходом). Кроме того, , используя тот же MySqlConnection объект для всех операторов INSERT , сократит общее время выполнения.

Затем, чтобы обработать данные, вы можете создать хранимую процедуру, которая будет обрабатывать ее.

Предполагая эти две таблицы (взятые из упрощенного примера):

CREATE TABLE `users` (
  `PID` int(11) NOT NULL AUTO_INCREMENT,
  `FName` varchar(45) DEFAULT NULL,
  `LName` varchar(45) DEFAULT NULL,
  PRIMARY KEY (`PID`)
) ENGINE=InnoDB AUTO_INCREMENT=3737 DEFAULT CHARSET=utf8;

и

CREATE TABLE `codes` (
  `CID` int(11) NOT NULL AUTO_INCREMENT,
  `PID` int(11) DEFAULT NULL,
  `code` varchar(45) DEFAULT NULL,
  PRIMARY KEY (`CID`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8;

у вас может быть следующая хранимая процедура.

CREATE DEFINER=`root`@`localhost` PROCEDURE `import_data`()
BEGIN
    DECLARE fname VARCHAR(255);
    DECLARE lname VARCHAR(255);
    DECLARE codesstr VARCHAR(255);
    DECLARE splitted_value VARCHAR(255);
    DECLARE done INT DEFAULT 0;
    DECLARE newid INT DEFAULT 0;
    DECLARE occurance INT DEFAULT 0;
    DECLARE i INT DEFAULT 0;

    DECLARE cur CURSOR FOR SELECT firstname,lastname,codes FROM imported;
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1;

    OPEN cur;

    import_loop: 
        LOOP FETCH cur INTO fname, lname, codesstr;
            IF done = 1 THEN
                LEAVE import_loop;
            END IF;

            INSERT INTO users (FName,LName) VALUES (fname, lname);
            SET newid = LAST_INSERT_ID();

            SET i=1;
            SET occurance = (SELECT LENGTH(codesstr) - LENGTH(REPLACE(codesstr, ',', '')) + 1);

            WHILE i <= occurance DO
                SET splitted_value =
                    (SELECT REPLACE(SUBSTRING(SUBSTRING_INDEX(codesstr, ',', i),
                    LENGTH(SUBSTRING_INDEX(codesstr, ',', i - 1)) + 1), ',', ''));

                INSERT INTO codes (PID, code) VALUES (newid, splitted_value);
                SET i = i + 1;
            END WHILE;
        END LOOP;
    CLOSE cur;
END

Для каждой строки исходных данных она делает оператор INSERT для таблицы user. Затем существует цикл WHILE для разделения разделенных запятыми кодов и для каждого из них - оператор INSERT для таблицы codes.

Что касается использования LAST_INSERT_ID(), он надежен на основе PER CONNECTION (см. документ здесь). Если соединение MySQL, используемое для запуска этой хранимой процедуры, не используется другими транзакциями, использование LAST_INSERT_ID() безопасно.

Идентификатор, который был сгенерирован, поддерживается на сервере для каждого подключения. Это означает, что значение, возвращаемое функцией для данного клиента, является первым значением AUTO_INCREMENT, сгенерированным для самого последнего оператора, влияющим на столбец AUTO_INCREMENT этого клиента. Это значение не может повлиять на других клиентов, даже если они генерируют собственные значения AUTO_INCREMENT. Такое поведение гарантирует, что каждый клиент может получить свой собственный идентификатор, не заботясь о деятельности других клиентов, и без необходимости блокировок или транзакций.

Изменить. Здесь представлен вариант OP, который опускает временную таблицу imported. Вместо того, чтобы вставлять данные из таблицы .csv в таблицу imported, вы вызываете SP для непосредственного хранения их в вашей базе данных.

CREATE DEFINER=`root`@`localhost` PROCEDURE `import_data`(IN fname VARCHAR(255), IN lname VARCHAR(255),IN codesstr VARCHAR(255))
BEGIN
    DECLARE splitted_value VARCHAR(255);
    DECLARE done INT DEFAULT 0;
    DECLARE newid INT DEFAULT 0;
    DECLARE occurance INT DEFAULT 0;
    DECLARE i INT DEFAULT 0;

    INSERT INTO users (FName,LName) VALUES (fname, lname);
    SET newid = LAST_INSERT_ID();

    SET i=1;
    SET occurance = (SELECT LENGTH(codesstr) - LENGTH(REPLACE(codesstr, ',', '')) + 1);

    WHILE i <= occurance DO
        SET splitted_value =
            (SELECT REPLACE(SUBSTRING(SUBSTRING_INDEX(codesstr, ',', i),
            LENGTH(SUBSTRING_INDEX(codesstr, ',', i - 1)) + 1), ',', ''));

        INSERT INTO codes (PID, code) VALUES (newid, splitted_value);
        SET i = i + 1;
    END WHILE;
END

Примечание. Код для разделения кодов берется из здесь (MySQL не предоставляет функцию разделения для строки).

Ответ 2

Ссылаясь на ваш ответ, я бы заменил

using (MySqlCommand myCmdNested = new MySqlCommand(cCommand, mConnection))
{
    foreach (string Code in item.Codes)
    {
        myCmdNested.Parameters.Add(new MySqlParameter("@UserID", UID));
        myCmdNested.Parameters.Add(new MySqlParameter("@Code", Code));
        myCmdNested.ExecuteNonQuery();
    }
}

с

List<string> lCodes = new List<string>();
foreach (string code in item.Codes)
{
    lCodes.Add(String.Format("('{0}','{1}')", UID, MySqlHelper.EscapeString(code)));
}
string cCommand = "INSERT INTO Code (UserID, Code) VALUES " + string.Join(",", lCodes);
using (MySqlCommand myCmdNested = new MySqlCommand(cCommand, mConnection))
{
    myCmdNested.ExecuteNonQuery();
}

который генерирует один оператор insert вместо item.Count

Ответ 3

Я разработал приложение приложения WPF с использованием Entity Framework и использовал базу данных SQL Server и должен был читать данные из файла excel и должен был вставить эти данные в 2 таблицы, которые имеют отношения между ними. Примерно за 15000 строк в excel он занимал около 4 часов. Затем я использовал блок из 500 строк для каждой вставки, и это ускорило мою установку в unbelievalbe быстро, и теперь для импорта этих же данных требуется всего 3-5 секунд.

Итак, я предлагаю вам добавить ваши строки в контекст, например, 100/200/500, а затем вызвать метод SaveChanges (если вы действительно хотите использовать EF). Есть и другие полезные советы, чтобы ускорить работу EF. Пожалуйста, прочитайте этот для справки.

var totalRecords = TestPacksData.Rows.Count;
var totalPages = (totalRecords / ImportRecordsPerPage) + 1;
while (count <= totalPages)
{
     var pageWiseRecords = TestPacksData.Rows.Cast<DataRow>().Skip(count * ImportRecordsPerPage).Take(ImportRecordsPerPage);
     count++;
     Project.CreateNewSheet(pageWiseRecords.ToList());
     Project.CreateNewSpool(pageWiseRecords.ToList());
}

И вот метод CreateNewSheet

/// <summary>
/// Creates a new Sheet record in the database
/// </summary>
/// <param name="row">DataRow containing the Sheet record</param>
public void CreateNewSheet(List<DataRow> rows)
{
     var tempSheetsList = new List<Sheet>();
     foreach (var row in rows)
     {
         var sheetNo = row[SheetFields.Sheet_No.ToString()].ToString();
         if (string.IsNullOrWhiteSpace(sheetNo))
              continue;
         var testPackNo = row[SheetFields.Test_Pack_No.ToString()].ToString();
         TestPack testPack = null;
         if (!string.IsNullOrWhiteSpace(testPackNo))
              testPack = GetTestPackByTestPackNo(testPackNo);

         var existingSheet = GetSheetBySheetNo(sheetNo);
         if (existingSheet != null)
         {
             UpdateSheet(existingSheet, row);
             continue;
         }

         var isometricNo = GetIsometricNoFromSheetNo(sheetNo);
         var newSheet = new Sheet
         {
             sheet_no = sheetNo,
             isometric_no = isometricNo,
             ped_rev = row[SheetFields.PED_Rev.ToString()].ToString(),
             gpc_rev = row[SheetFields.GPC_Rev.ToString()].ToString()
         };
         if (testPack != null)
         {
             newSheet.test_pack_id = testPack.id;
             newSheet.test_pack_no = testPack.test_pack_no;
         }
         if (!tempSheetsList.Any(l => l.sheet_no == newSheet.sheet_no))
         {
              DataStore.Context.Sheets.Add(newSheet);
              tempSheetsList.Add(newSheet);
         }
   }
   try
   {
        DataStore.Context.SaveChanges();
        **DataStore.Dispose();** This is very important. Dispose the context
   }
   catch (DbEntityValidationException ex)
   {
       // Create log for the exception here
   }
}

CreateNewSpool - это тот же метод, за исключением имени поля и имени таблицы, поскольку он обновляет дочернюю таблицу. Но идея такая же

Ответ 4

1 - добавьте столбец VirtualId в таблицу User и .

EDITED 2 - Назначьте числа в цикле для VirtualId (используйте отрицательные числа, начиная с -1, чтобы избежать столкновений в последнем шаге) в каждом объекте User. Для каждого объекта Code c, принадлежащего объекту User u, установите c.UserId = u.VirtualId.

3 - Массовая загрузка Пользователей в таблицу User, Массовая загрузка Коды в таблицу Code.

4- UPDATE CODE C,USER U SET C.UserId = U.Id WHERE C.UserId = U.VirtualId.

ПРИМЕЧАНИЕ. Если у вас есть ограничение FK на Code.UserId, вы можете удалить его и повторно добавить после вставки.

public class User
{
    public int Id { get; set; }
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public int VirtualId { get; set; }

}

public class Code
{
    public int Id { get; set; }
    public string Code { get; set; }
    public string UserId { get; set; }
}

Ответ 5

Можете ли вы разбить CSV на два файла?

например. Предположим, что ваш файл имеет следующие столбцы:

... A ... | ... B ... 
a0 | b0
a0 | b1
a0 | b2    <-- data
a1 | b3
a1 | b4

Таким образом, один набор из A может иметь несколько записей B. После того, как вы разделите его, вы получите:

... A ...
a0
a1

... B ...
b0
b1
b2
b3
b4

Затем вы вставляете их отдельно.

Изменить: псевдокод

Основываясь на разговоре, что-то вроде:

DataTable tableA = ...; // query schema for TableA
DataTable tableB = ...; // query schmea for TableB

List<String> usernames = select distinct username from TableA;
Hashtable htUsername = new Hashtable(StringComparer.InvariantCultureIgnoreCase);
foreach (String username in usernames)
     htUsername[username] = "";

int colUsername = ...;
foreach (String[] row in CSVFile) {
    String un = row[colUsername] as String;
    if (htUsername[un] == null) {
        // add new row to tableA
        DataRow row = tableA.NewRow();
        row["Username"] = un;
        // etc.
        tableA.Rows.Add(row);
        htUsername[un] = "";
    }
}

// bulk insert TableA

select userid, username from TableA
Hashtable htUserId = new Hashtable(StringComparer.InvariantCultureIgnoreCase);
// htUserId[username] = userid;

int colUserId = ...;

foreach (String[] row in CSVFile) {

    String un = row[colUsername] as String;
    int userid = (int) htUserId[un];
    DataRow row = tableB.NewRow();
    row[colUserId] = userId;
    // fill in other values
    tableB.Rows.Add(row);
    if (table.Rows.Count == 65000) {
        // bulk insert TableB
        var t = tableB.Clone();
        tableB.Dispose();
        tableB = t;
    }
}

if (tableB.Rows.Count > 0)
    // bulk insert TableB

Ответ 6

AFAIK вставки, сделанные в таблице, являются последовательными, а вставки в другой таблице могут выполняться параллельно. Откройте два отдельных новых подключения к одной и той же базе данных, а затем добавьте их параллельно, возможно, используя параллельную библиотеку задач.

Однако, если существуют ограничения целостности между отношениями 1: n между таблицами, тогда:

  • Вставки могут завершиться ошибкой, и, следовательно, любой подход с параллельной вставкой будет неправильным. Ясно, что лучше всего будет делать только последовательные вставки, одну таблицу за другой.
  • Вы можете попробовать и отсортировать данные обеих таблиц, напишите метод InsertInto, написанный ниже, так что вставка во вторую таблицу произойдет только после того, как вы закончите вставлять данные в первую.

Изменить:. Поскольку вы запросили, если есть возможность для параллельного выполнения вставок, следуйте за шаблоном кода, который вы можете использовать.

private void ParallelInserts()
{
    ..
    //Other code in the method
    ..

    //Read first csv into memory. It just a GB so should be fine
    ReadFirstCSV();

    //Read second csv into memory...
    ReadSecondCSV();

    //Because the inserts will last more than a few CPU cycles...
    var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None)

    //An array to hold the two parallel inserts
    var insertTasks = new Task[2];

    //Begin insert into first table...
    insertTasks[0] = taskFactory.StartNew(() => InsertInto(commandStringFirst, connectionStringFirst));

    //Begin insert into second table...
    insertTasks[1] = taskFactory.StartNew(() => InsertInto(commandStringSecond, connectionStringSecond));

    //Let them be done...
    Task.WaitAll(insertTasks);

    Console.WriteLine("Parallel insert finished.");
}


//Defining the InsertInto method which we are passing to the tasks in the method above
private static void InsertInto(string commandString, string connectionString)
{
    using (/*open a new connection using the connectionString passed*/)
    {
        //In a while loop, iterate until you have 100/200/500 rows
        while (fileIsNotExhausted)
        {
            using (/*commandString*/)
            {
                //Execute command to insert in bulk
            }
        }
    }
}

Ответ 7

Когда вы говорите "эффективно", вы говорите о памяти или времени?

Что касается повышения скорости вставки, если вы можете сделать несколько блоков значений в инструкции вставки, вы можете получить 500% -ное улучшение скорости. В этом вопросе я сделал несколько тестов: Что происходит быстрее: несколько одиночных INSERT или один многострочный INSERT?

Мой подход описывается в ответе, но просто помещается, считывая до 50 "строк" ​​(для вставки) сразу и связывая их с одним выражением типа INSERT INTO(...), VALUES(...),(...),(...)...(...),(...), похоже, действительно ускоряет работу. По крайней мере, если вам не разрешено загружать навалом.

Другой подход btw, если у вас есть живые данные, которые вы не можете удалить индексы во время загрузки, заключается в создании таблицы памяти на сервере mysql без индексов, выгружении данных там, а затем выполните INSERT INTO live SELECT * FROM mem. Хотя это использует больше памяти на сервере, поэтому вопрос в начале этого ответа о том, "что вы подразумеваете под" эффективно "?":)

О, и, вероятно, нет ничего плохого в том, чтобы итерации через файл и сначала сделать все первые вставки таблицы, а затем сделать вторую таблицу. Думаю, если данные не будут использованы вживую. В этом случае вы определенно можете использовать наборный подход, но логика приложения для этого намного сложнее.

UPDATE:. Пример запроса кода С# для многозначных блоков вставки.

Примечание: этот код предполагает, что у вас уже настроено несколько структур:

  • таблицы Список <string> - имена таблиц для вставки в
  • список полей Словарь < string, List <String → → - список имен полей для каждой таблицы
  • typeslist Словарь < string, List <MySqlDbType → → - список MySqlDbType для каждой таблицы, в том же порядке, что и имена полей.
  • nullslist Словарь < string, List <Boolean → → - список флагов, указывающий, является ли поле нулевым или нет, для каждой таблицы (тот же порядок, что и имена полей).
  • prikey Словарь < string, string > - список имени поля первичного ключа для каждой таблицы (обратите внимание: это не поддерживает несколько первичных ключей поля, хотя, если вам это нужно, вы могли бы взломать его - я думаю, где-то у меня есть версия, которая поддерживает это, но... meh).
  • словарь Словарь < string, List < Dictionary < int, object → → → → - фактические данные, в виде списка словарей значения fieldnum, для каждой таблицы.

О да, и локальная команда - это MySqlCommand, созданная с помощью CreateCommand() на локальном объекте MySqlConnection.

Далее: я написал это довольно давно, когда я начинал. Если это заставляет глаза или мозг кровоточить, я заранее извиняюсь:)

const int perinsert = 50;
foreach (string table in tables)
{
    string[] fields = fieldslist[table].ToArray();
    MySqlDbType[] types = typeslist[table].ToArray();
    bool[] nulls = nullslist[table].ToArray();

    int thisblock = perinsert;
    int rowstotal = theData[table].Count;
    int rowsremainder = rowstotal % perinsert;
    int rowscopied = 0;

    // Do the bulk (multi-VALUES block) INSERTs, but only if we have more rows than there are in a single bulk insert to perform:
    while (rowscopied < rowstotal)
    {
        if (rowstotal - rowscopied < perinsert)
            thisblock = rowstotal - rowscopied;
        // Generate a 'perquery' multi-VALUES prepared INSERT statement:
        List<string> extravals = new List<string>();
        for (int j = 0; j < thisblock; j++)
            extravals.Add(String.Format("(@{0}_{1})", j, String.Join(String.Format(", @{0}_", j), fields)));
        localcmd.CommandText = String.Format("INSERT INTO {0} VALUES{1}", tmptable, String.Join(",", extravals.ToArray()));
        // Now create the parameters to match these:
        for (int j = 0; j < thisblock; j++)
            for (int i = 0; i < fields.Length; i++)
                localcmd.Parameters.Add(String.Format("{0}_{1}", j, fields[i]), types[i]).IsNullable = nulls[i];

        // Keep doing bulk INSERTs until there less rows left than we need for another one:
        while (rowstotal - rowscopied >= thisblock)
        {
            // Queue up all the VALUES for this block INSERT:
            for (int j = 0; j < thisblock; j++)
            {
                Dictionary<int, object> row = theData[table][rowscopied++];
                for (int i = 0; i < fields.Length; i++)
                    localcmd.Parameters[String.Format("{0}_{1}", j, fields[i])].Value = row[i];
            }
            // Run the query:
            localcmd.ExecuteNonQuery();
        }
        // Clear all the paramters - we're done here:
        localcmd.Parameters.Clear();
    }
}