Ответ 1
Предполагая, что вы создаете это с помощью TPL, вы можете установить ParallelOptions.MaxDegreesOfParallelism в соответствии с тем, что вы хотите.
Parallel.For для примера кода.
Я пишу программу С# для создания и загрузки полмиллиона файлов по FTP. Я хочу обрабатывать 4 файла параллельно, так как машина имеет 4 ядра, а генерация файлов занимает гораздо больше времени. Можно ли преобразовать следующий пример Powershell в С#? Или есть ли какие-либо улучшенные рамки, такие как структура Actor в С# (например, F # MailboxProcessor)?
$maxConcurrentJobs = 3;
# Read the input and queue it up
$jobInput = get-content .\input.txt
$queue = [System.Collections.Queue]::Synchronized( (New-Object System.Collections.Queue) )
foreach($item in $jobInput)
{
$queue.Enqueue($item)
}
# Function that pops input off the queue and starts a job with it
function RunJobFromQueue
{
if( $queue.Count -gt 0)
{
$j = Start-Job -ScriptBlock {param($x); Get-WinEvent -LogName $x} -ArgumentList $queue.Dequeue()
Register-ObjectEvent -InputObject $j -EventName StateChanged -Action { RunJobFromQueue; Unregister-Event $eventsubscriber.SourceIdentifier; Remove-Job $eventsubscriber.SourceIdentifier } | Out-Null
}
}
# Start up to the max number of concurrent jobs
# Each job will take care of running the rest
for( $i = 0; $i -lt $maxConcurrentJobs; $i++ )
{
RunJobFromQueue
}
Update:
Соединение с удаленным FTP-сервером может быть медленным, поэтому я хочу ограничить обработку загрузки FTP.
Предполагая, что вы создаете это с помощью TPL, вы можете установить ParallelOptions.MaxDegreesOfParallelism в соответствии с тем, что вы хотите.
Parallel.For для примера кода.
Задача Parallel Library - ваш друг здесь. См. эту ссылку, которая описывает, что доступно вам. В основном, платформа 4 поставляется с ней, которая оптимизирует эти по существу потоки, связанные с потоком потоков, на количество процессоров на текущей машине.
Возможно, что-то похожее:
ParallelOptions options = new ParallelOptions();
options.MaxDegreeOfParallelism = 4;
Затем в вашем цикле что-то вроде:
Parallel.Invoke(options,
() => new WebClient().Upload("http://www.linqpad.net", "lp.html"),
() => new WebClient().Upload("http://www.jaoo.dk", "jaoo.html"));
Если вы используете .Net 4.0, вы можете использовать Parallel library
Предположим, что вы повторяете полмиллиона файлов, вы можете "параллельно" итерации с помощью Parallel Foreach например или вы можете взгляните на PLinq Здесь сравнение между двумя
По сути, вы захотите создать действие или задачу для каждого загружаемого файла, поместить их в список и затем обработать этот список, ограничивая число, которое может обрабатываться параллельно.
Мой пост в блоге показывает, как это сделать как с задачами, так и с действиями, и предоставляет примерный проект, который вы можете скачать и запустить, чтобы увидеть оба В бою.
При использовании действий вы можете использовать встроенную функцию .Net Parallel.Invoke. Здесь мы ограничиваем его параллельным запуском не более 4 потоков.
var listOfActions = new List<Action>();
foreach (var file in files)
{
var localFile = file;
// Note that we create the Task here, but do not start it.
listOfTasks.Add(new Task(() => UploadFile(localFile)));
}
var options = new ParallelOptions {MaxDegreeOfParallelism = 4};
Parallel.Invoke(options, listOfActions.ToArray());
Этот параметр не поддерживает async, хотя я предполагаю, что вы являетесь функцией FileUpload, поэтому вы можете использовать пример задачи ниже.
С помощью Заданий нет встроенной функции. Однако вы можете использовать тот, который я предоставляю в своем блоге.
/// <summary>
/// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
{
await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
}
/// <summary>
/// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
/// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
{
// Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
var tasks = tasksToRun.ToList();
using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
{
var postTaskTasks = new List<Task>();
// Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));
// Start running each task.
foreach (var task in tasks)
{
// Increment the number of tasks currently running and wait if too many are running.
await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
task.Start();
}
// Wait for all of the provided tasks to complete.
// We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
await Task.WhenAll(postTaskTasks.ToArray());
}
}
И затем, создав свой список задач и вызывая функцию, чтобы они запускались, скажем, максимум 4 одновременных за один раз, вы могли бы сделать это:
var listOfTasks = new List<Task>();
foreach (var file in files)
{
var localFile = file;
// Note that we create the Task here, but do not start it.
listOfTasks.Add(new Task(async () => await UploadFile(localFile)));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 4);
Кроме того, поскольку этот метод поддерживает async, он не будет блокировать поток пользовательского интерфейса, например, используя Parallel.Invoke или Parallel.ForEach.
Я кодировал ниже технику, где я использую BlockingCollection как диспетчер подсчета потоков. Это довольно просто реализовать и справиться с этой задачей. Он просто принимает объекты задачи и добавляет целочисленное значение в список блокировки, увеличивая количество выполняемых потоков на 1. Когда поток заканчивается, он удаляет объект и освобождает блок для операции добавления для предстоящих задач.
public class BlockingTaskQueue
{
private BlockingCollection<int> threadManager { get; set; } = null;
public bool IsWorking
{
get
{
return threadManager.Count > 0 ? true : false;
}
}
public BlockingTaskQueue(int maxThread)
{
threadManager = new BlockingCollection<int>(maxThread);
}
public async Task AddTask(Task task)
{
Task.Run(() =>
{
Run(task);
});
}
private bool Run(Task task)
{
try
{
threadManager.Add(1);
task.Start();
task.Wait();
return true;
}
catch (Exception ex)
{
return false;
}
finally
{
threadManager.Take();
}
}
}