Как выполнить большое количество SQL-запросов асинхронно и в потоках
Проблема: У меня огромное количество SQL-запросов (около 10–20 тыс.), И я хочу выполнить их асинхронно в 50 (или более) потоках.
Я написал сценарий powershell для этой работы, но он очень медленный (на выполнение всех ушло около 20 часов). Желаемый результат - максимум 3-4 часа.
Вопрос: Как я могу оптимизировать этот скрипт powershell? Должен ли я пересмотреть и использовать другую технологию, такую как python
или c#
?
Я думаю, что это проблема PowerShell, потому что, когда я проверяю с помощью whoisactive
, запросы выполняются быстро. Создание, завершение и выгрузка заданий занимает много времени, поскольку для каждого потока создаются отдельные экземпляры PS.
Мой код:
$NumberOfParallerThreads = 50;
$Arr_AllQueries = @('Exec [mystoredproc] @param1=1, @param2=2',
'Exec [mystoredproc] @param1=11, @param2=22',
'Exec [mystoredproc] @param1=111, @param2=222')
#Creating the batches
$counter = [pscustomobject] @{ Value = 0 };
$Batches_AllQueries = $Arr_AllQueries | Group-Object -Property {
[math]::Floor($counter.Value++ / $NumberOfParallerThreads)
};
forEach ($item in $Batches_AllQueries) {
$tmpBatch = $item.Group;
$tmpBatch | % {
$ScriptBlock = {
# accept the loop variable across the job-context barrier
param($query)
# Execute a command
Try
{
Write-Host "[processing '$query']"
$objConnection = New-Object System.Data.SqlClient.SqlConnection;
$objConnection.ConnectionString = 'Data Source=...';
$ObjCmd = New-Object System.Data.SqlClient.SqlCommand;
$ObjCmd.CommandText = $query;
$ObjCmd.Connection = $objConnection;
$ObjCmd.CommandTimeout = 0;
$objAdapter = New-Object System.Data.SqlClient.SqlDataAdapter;
$objAdapter.SelectCommand = $ObjCmd;
$objDataTable = New-Object System.Data.DataTable;
$objAdapter.Fill($objDataTable) | Out-Null;
$objConnection.Close();
$objConnection = $null;
}
Catch
{
$ErrorMessage = $_.Exception.Message
$FailedItem = $_.Exception.ItemName
Write-Host "[Error processing: $($query)]" -BackgroundColor Red;
Write-Host $ErrorMessage
}
}
# pass the loop variable across the job-context barrier
Start-Job $ScriptBlock -ArgumentList $_ | Out-Null
}
# Wait for all to complete
While (Get-Job -State "Running") { Start-Sleep 2 }
# Display output from all jobs
Get-Job | Receive-Job | Out-Null
# Cleanup
Remove-Job *
}
UPDATE:
Ресурсы: Сервер БД находится на удаленном компьютере с:
- 24 ГБ ОЗУ,
- 8 ядер,
- 500 ГБ,
- SQL Server 2016
Мы хотим использовать максимальную мощность процессора.
Основное ограничение: Единственное ограничение - не использовать SQL Server для выполнения запросов. Запросы должны поступать из внешних источников, таких как: Powershell, С#, Python и т.д.
Ответы
Ответ 1
RunspacePool - способ пойти сюда, попробуйте это:
$AllQueries = @( ... )
$MaxThreads = 5
# Each thread keeps its own connection but shares the query queue
$ScriptBlock = {
Param($WorkQueue)
$objConnection = New-Object System.Data.SqlClient.SqlConnection
$objConnection.ConnectionString = 'Data Source=...'
$objCmd = New-Object System.Data.SqlClient.SqlCommand
$objCmd.Connection = $objConnection
$objCmd.CommandTimeout = 0
$query = ""
while ($WorkQueue.TryDequeue([ref]$query)) {
$objCmd.CommandText = $query
$objAdapter = New-Object System.Data.SqlClient.SqlDataAdapter $objCmd
$objDataTable = New-Object System.Data.DataTable
$objAdapter.Fill($objDataTable) | Out-Null
}
$objConnection.Close()
}
# create a pool
$pool = [RunspaceFactory]::CreateRunspacePool(1, $MaxThreads)
$pool.ApartmentState = 'STA'
$pool.Open()
# convert the query array into a concurrent queue
$workQueue = New-Object System.Collections.Concurrent.ConcurrentQueue[object]
$AllQueries | % { $workQueue.Enqueue($_) }
$threads = @()
# Create each powershell thread and add them to the pool
1..$MaxThreads | % {
$ps = [powershell]::Create()
$ps.RunspacePool = $pool
$ps.AddScript($ScriptBlock) | Out-Null
$ps.AddParameter('WorkQueue', $workQueue) | Out-Null
$threads += [pscustomobject]@{
Ps = $ps
Handle = $null
}
}
# Start all the threads
$threads | % { $_.Handle = $_.Ps.BeginInvoke() }
# Wait for all the threads to complete - errors will still set the IsCompleted flag
while ($threads | ? { !$_.Handle.IsCompleted }) {
Start-Sleep -Seconds 1
}
# Get any results and display an errors
$threads | % {
$_.Ps.EndInvoke($_.Handle) | Write-Output
if ($_.Ps.HadErrors) {
$_.Ps.Streams.Error.ReadAll() | Write-Error
}
}
В отличие от заданий powershell, RunspacePools может совместно использовать ресурсы. Таким образом, существует одна параллельная очередь всех запросов, и каждый поток сохраняет свое собственное соединение с базой данных.
Однако, как уже говорили другие - если вы не подвергаете стресс-тестирование своей базы данных, вам, вероятно, лучше реорганизовать запросы в массовые вставки.
Ответ 2
Вам необходимо реорганизовать свой сценарий, чтобы соединение с базой данных оставалось открытым в каждом рабочем потоке, используя его для всех запросов, выполняемых этим потоком. Прямо сейчас вы открываете новое соединение с базой данных для каждого запроса, что увеличивает объем накладных расходов. Устранение этих накладных расходов должно ускорить процесс до или за пределами вашей цели.
Ответ 3
Попробуйте использовать SqlCmd.
Вы можете запустить несколько процессов с помощью Process.Start() и использовать sqlcmd для выполнения запросов в параллельных процессах.
Конечно, если вы обязаны делать это в темах, этот ответ больше не будет решением.
Ответ 4
- Сгруппируйте свои запросы на основе таблицы и операций с этой таблицей. Используя это, вы можете определить, сколько асинхронных SQL-запросов вы можете выполнить для разных таблиц.
- Убедитесь, что размер каждой таблицы, против которой вы собираетесь работать. Потому что, если таблица содержит миллионы строк, и выполнение вами операции соединения с какой-либо другой таблицей увеличит время или, если это операция CUD, она также может заблокировать вашу таблицу.
- А также выберите количество потоков на основе ядер вашего процессора, а не на основе предположений. Поскольку ядро ЦП будет запускать по одному процессу за раз, поэтому лучше создать количество ядер * 2 потока являются эффективными.
Поэтому сначала изучите ваш набор данных, а затем выполните 2 вышеупомянутых пункта, чтобы вы могли легко определить, какие все запросы выполняются параллельно и эффективно.
Надеюсь, что это даст некоторые идеи. Лучше использовать для этого любой скрипт на Python, чтобы можно было легко запускать более одного процесса, а также отслеживать их активность.
Ответ 5
К сожалению, у меня нет времени, чтобы ответить на этот вопрос полностью, но это должно помочь:
Во-первых, вы не собираетесь использовать весь процессор для вставки такого количества записей, почти обещано. Но!
Поскольку, похоже, вы используете строковые команды SQL:
- Разделите вставки на группы, скажем, ~ 100 - ~ 1000 и вручную создайте массовые вставки:
Примерно так, как POC:
$query = "INSERT INTO [dbo].[Attributes] ([Name],[PetName]) VALUES "
for ($alot = 0; $alot -le 10; $alot++){
for ($i = 65; $i -le 85; $i++) {
$query += "('" + [char]$i + "', '" + [char]$i + "')";
if ($i -ne 85 -or $alot -ne 10) {$query += ",";}
}
}
После создания пакета передайте его в SQL для вставки, эффективно используя существующий код.
Бульдная вставка будет выглядеть примерно так:
INSERT INTO [dbo].[Attributes] ([Name],[PetName]) VALUES ('A', 'A'),('B', 'B'),('C', 'C'),('D', 'D'),('E', 'E'),('F', 'F'),('G', 'G'),('H', 'H'),('I', 'I'),('J', 'J'),('K', 'K'),('L', 'L'),('M', 'M'),('N', 'N'),('O', 'O'),('P', 'P'),('Q', 'Q'),('R', 'R'),('S', 'S')
Одно это должно ускорить ваши вкладыши на тонну!
- Не используйте 50 потоков, как упоминалось ранее, если у вас нет 25+ логических ядер. Большую часть времени вставки SQL вы будете тратить на ожидание в сети, а на жесткие диски, а не на ЦП. Имея такое количество потоков в очереди, вы будете тратить большую часть своего процессорного времени на ожидание более медленных частей стека.
Только эти две вещи, которые я себе представляю, могут сократить ваши вставки за считанные минуты (однажды 80k+ я использовал в основном этот подход примерно за 90 секунд).
Последняя часть может быть рефакторингом так, чтобы каждое ядро получало свое собственное соединение Sql, и затем вы оставляете его открытым, пока не будете готовы избавиться от всех потоков.
Ответ 6
Я мало что знаю о powershell, но я все время выполняю SQL в С# на работе.
С# новые ключевые слова async/await позволяют чрезвычайно легко делать то, о чем вы говорите.
С# также создаст для вас пул потоков с оптимальным количеством потоков для вашей машины.
async Task<DataTable> ExecuteQueryAsync(query)
{
return await Task.Run(() => ExecuteQuerySync(query));
}
async Task ExecuteAllQueriesAsync()
{
IList<Task<DataTable>> queryTasks = new List<Task<DataTable>>();
foreach query
{
queryTasks.Add(ExecuteQueryAsync(query));
}
foreach task in queryTasks
{
await task;
}
}
Приведенный выше код добавит все запросы в рабочую очередь пула потоков.
Затем дождитесь их всех до завершения. В результате достигается максимальный уровень параллелизма для вашего SQL.
Надеюсь это поможет!