Каков наилучший способ заставить несколько потоков работать, и ждать, пока все они завершатся?
Я пишу простое приложение (для моей жены не менее: -P), которое делает некоторые манипуляции с изображениями (изменение размера, временная привязка и т.д.) для потенциально большой партии изображений. Поэтому я пишу библиотеку, которая может делать это как синхронно, так и асинхронно. Я решил использовать Асинхронный шаблон на основе событий. При использовании этого шаблона вам нужно поднять событие, когда работа будет завершена. Здесь я испытываю проблемы, зная, когда это будет сделано. Итак, в моем методе DownsizeAsync (метод async для сокращения изображений) я делаю что-то вроде этого:
public void DownsizeAsync(string[] files, string destination)
{
foreach (var name in files)
{
string temp = name; //countering the closure issue
ThreadPool.QueueUserWorkItem(f =>
{
string newFileName = this.DownsizeImage(temp, destination);
this.OnImageResized(newFileName);
});
}
}
Теперь сложная часть - это знать, когда все они полны.
Вот что я рассмотрел: Использование ManualResetEvents, как здесь: http://msdn.microsoft.com/en-us/library/3dasc8as%28VS.80%29.aspx Но проблема, с которой я столкнулся, заключается в том, что вы можете только ждать для 64 или менее событий. У меня может быть много изображений.
Второй вариант: иметь счетчик, который учитывает сделанные изображения, и поднять событие, когда счетчик достигнет общего значения:
public void DownsizeAsync(string[] files, string destination)
{
foreach (var name in files)
{
string temp = name; //countering the closure issue
ThreadPool.QueueUserWorkItem(f =>
{
string newFileName = this.DownsizeImage(temp, destination);
this.OnImageResized(newFileName);
total++;
if (total == files.Length)
{
this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));
}
});
}
}
private volatile int total = 0;
Теперь это кажется "взломанным", и я не совсем уверен, что этот поток безопасен.
Итак, мой вопрос: какой лучший способ сделать это? Есть ли другой способ синхронизации всех потоков? Должен ли я использовать ThreadPool? Спасибо!!
ОБНОВЛЕНИЕ. Основываясь на отзывах в комментариях и нескольких ответах, я решил использовать этот подход:
Во-первых, я создал метод расширения, который перечисляет перечисление в "партии":
public static IEnumerable<IEnumerable<T>> GetBatches<T>(this IEnumerable<T> source, int batchCount)
{
for (IEnumerable<T> s = source; s.Any(); s = s.Skip(batchCount))
{
yield return s.Take(batchCount);
}
}
В принципе, если вы делаете что-то вроде этого:
foreach (IEnumerable<int> batch in Enumerable.Range(1, 95).GetBatches(10))
{
foreach (int i in batch)
{
Console.Write("{0} ", i);
}
Console.WriteLine();
}
Вы получаете этот вывод:
1 2 3 4 5 6 7 8 9 10
11 12 13 14 15 16 17 18 19 20
21 22 23 24 25 26 27 28 29 30
31 32 33 34 35 36 37 38 39 40
41 42 43 44 45 46 47 48 49 50
51 52 53 54 55 56 57 58 59 60
61 62 63 64 65 66 67 68 69 70
71 72 73 74 75 76 77 78 79 80
81 82 83 84 85 86 87 88 89 90
91 92 93 94 95
Идея заключалась в том, что (как указывал кто-то в комментариях) нет необходимости создавать отдельный поток для каждого изображения. Поэтому я буду загружать изображения в [machine.cores * 2] количество партий. Затем я воспользуюсь своим вторым подходом, который просто будет поддерживать счетчик, и когда счетчик достигнет общей суммы, которую я ожидаю, я узнаю, что я закончил.
Причина, по которой я сейчас убежден, что она действительно потокобезопасна, заключается в том, что я обозначил полную переменную как изменчивую, которая согласно MSDN:
Обычно используется изменчивый модификатор для поля, к которому осуществляется доступ несколько потоков без использования блокировки для сериализации доступа. Использование изменчивого модификатора обеспечивает что один поток извлекает больше всего обновленное значение, написанное другим нить
означает, что я должен быть в ясном (если нет, пожалуйста, дайте мне знать!)
Итак, вот код, который я собираюсь:
public void DownsizeAsync(string[] files, string destination)
{
int cores = Environment.ProcessorCount * 2;
int batchAmount = files.Length / cores;
foreach (var batch in files.GetBatches(batchAmount))
{
var temp = batch.ToList(); //counter closure issue
ThreadPool.QueueUserWorkItem(b =>
{
foreach (var item in temp)
{
string newFileName = this.DownsizeImage(item, destination);
this.OnImageResized(newFileName);
total++;
if (total == files.Length)
{
this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));
}
}
});
}
}
Я открыт для обратной связи, поскольку я никоим образом не являюсь экспертом по многопоточности, поэтому, если кто-либо видит какую-либо проблему с этим или имеет лучшую идею, пожалуйста, дайте мне знать. (Да, это только домашнее приложение, но у меня есть некоторые идеи о том, как я могу использовать полученные знания, чтобы улучшить наш сервис поиска/индекса, который мы используем на работе.) Пока я буду держать этот вопрос открытым до тех пор, пока я чувствую, что я использую правильный подход. Спасибо всем за вашу помощь.
Ответы
Ответ 1
Простейшим может быть создание новых потоков, а затем вызов Thread.Join
для каждого из них. Вы можете использовать семафор или что-то в этом роде, но, возможно, проще просто создать новые потоки.
В .NET 4.0 вы можете использовать Parallel Extensions, чтобы сделать это довольно легко с задачами.
В качестве другой альтернативы, которая будет использовать threadpool, вы можете создать делегат и называть BeginInvoke
на нем, чтобы вернуть IAsyncResult
- вы можете получить WaitHandle
для каждого результата через AsyncWaitHandle
и вызовите WaitHandle.WaitAll
.
EDIT: Как указано в комментариях, вы можете вызывать только WaitAll
с 64 ручками одновременно на некоторых реализациях. Альтернативы могут вызывать WaitOne
на каждом из них по очереди или вызывать WaitAll
с партиями. Это не имеет большого значения, если вы делаете это из потока, который не собирается блокировать threadpool. Также обратите внимание, что вы не можете вызывать WaitAll
из потока STA.
Ответ 2
Вы все еще хотите использовать ThreadPool, потому что он будет управлять количеством потоков, которые он запускает одновременно. Недавно я столкнулся с подобной проблемой и решил ее так:
var dispatcher = new ThreadPoolDispatcher();
dispatcher = new ChunkingDispatcher(dispatcher, 10);
foreach (var image in images)
{
dispatcher.Add(new ResizeJob(image));
}
dispatcher.WaitForJobsToFinish();
IDispatcher и IJob выглядят следующим образом:
public interface IJob
{
void Execute();
}
public class ThreadPoolDispatcher : IDispatcher
{
private IList<ManualResetEvent> resetEvents = new List<ManualResetEvent>();
public void Dispatch(IJob job)
{
var resetEvent = CreateAndTrackResetEvent();
var worker = new ThreadPoolWorker(job, resetEvent);
ThreadPool.QueueUserWorkItem(new WaitCallback(worker.ThreadPoolCallback));
}
private ManualResetEvent CreateAndTrackResetEvent()
{
var resetEvent = new ManualResetEvent(false);
resetEvents.Add(resetEvent);
return resetEvent;
}
public void WaitForJobsToFinish()
{
WaitHandle.WaitAll(resetEvents.ToArray() ?? new ManualResetEvent[] { });
resetEvents.Clear();
}
}
И затем использовал декоратор для использования ThreadPool:
public class ChunkingDispatcher : IDispatcher
{
private IDispatcher dispatcher;
private int numberOfJobsDispatched;
private int chunkSize;
public ChunkingDispatcher(IDispatcher dispatcher, int chunkSize)
{
this.dispatcher = dispatcher;
this.chunkSize = chunkSize;
}
public void Dispatch(IJob job)
{
dispatcher.Dispatch(job);
if (++numberOfJobsDispatched % chunkSize == 0)
WaitForJobsToFinish();
}
public void WaitForJobsToFinish()
{
dispatcher.WaitForJobsToFinish();
}
}
Абстракция IDispatcher работает очень хорошо для замены вашей технологии потоковой обработки. У меня есть другая реализация, которая является SingleThreadedDispatcher, и вы можете сделать версию ThreadStart, например, предложенную Джоном Скитом. Затем легко запустить каждый из них и посмотреть, какую производительность вы получите. SingleThreadedDispatcher хорош при отладке вашего кода или когда вы не хотите убивать процессор на вашем поле.
Изменить: Я забыл добавить код для ThreadPoolWorker:
public class ThreadPoolWorker
{
private IJob job;
private ManualResetEvent doneEvent;
public ThreadPoolWorker(IJob job, ManualResetEvent doneEvent)
{
this.job = job;
this.doneEvent = doneEvent;
}
public void ThreadPoolCallback(object state)
{
try
{
job.Execute();
}
finally
{
doneEvent.Set();
}
}
}
Ответ 3
Самое простое и эффективное решение - использовать счетчики и сделать их потокобезопасными. Это будет потреблять меньше памяти и может масштабироваться до большего количества потоков
Вот пример
int itemCount = 0;
for (int i = 0; i < 5000; i++)
{
Interlocked.Increment(ref itemCount);
ThreadPool.QueueUserWorkItem(x=>{
try
{
//code logic here.. sleep is just for demo
Thread.Sleep(100);
}
finally
{
Interlocked.Decrement(ref itemCount);
}
});
}
while (itemCount > 0)
{
Console.WriteLine("Waiting for " + itemCount + " threads...");
Thread.Sleep(100);
}
Console.WriteLine("All Done!");
Ответ 4
.Net 4.0 делает многопоточность еще проще (хотя вы все равно можете снимать себя с побочными эффектами).
Ответ 5
Я использовал SmartThreadPool с большим успехом, чтобы справиться с этой проблемой. Существует также Codeplex сайт о сборке.
SmartThreadPool может помочь с другими проблемами, так же как некоторые потоки не могут работать одновременно, в то время как другие могут.
Ответ 6
Я использую статический метод утилиты для проверки всех отдельных команд ожидания.
public static void WaitAll(WaitHandle[] handles)
{
if (handles == null)
throw new ArgumentNullException("handles",
"WaitHandle[] handles was null");
foreach (WaitHandle wh in handles) wh.WaitOne();
}
Затем в моем основном потоке я создаю список этих команд ожидания, и для каждого делегата, который я помещал в свою очередь ThreadPool, я добавляю дескриптор wait в список...
List<WaitHandle> waitHndls = new List<WaitHandle>();
foreach (iterator logic )
{
ManualResetEvent txEvnt = new ManualResetEvent(false);
ThreadPool.QueueUserWorkItem(
delegate
{
try { // Code to process each task... }
// Finally, set each wait handle when done
finally { lock (locker) txEvnt.Set(); }
});
waitHndls.Add(txEvnt); // Add wait handle to List
}
util.WaitAll(waitHndls.ToArray()); // Check all wait Handles in List
Ответ 7
Другой вариант - использовать трубку.
Вы публикуете всю работу, которая должна быть выполнена для канала, а затем читать данные из канала из каждого потока. Когда труба пуста, все готово, потоки заканчиваются сами, и все счастливы (конечно, убедитесь, что вы сначала произвели всю работу, а затем уничтожили ее).
Ответ 8
Я предлагаю помещать нетронутые изображения в очередь и, когда вы читаете из очереди, запускаете поток и вставляете его свойство System.Threading.Thread.ManagedThreadId
в словарь вместе с именем файла. Таким образом, ваш пользовательский интерфейс может отображать как ожидающие, так и активные файлы.
Когда каждый поток завершается, он вызывает процедуру обратного вызова, возвращая свой ManagedThreadId. Этот обратный вызов (переданный в качестве делегата в поток) удаляет идентификатор потока из словаря, запускает другой поток из очереди и обновляет пользовательский интерфейс.
Когда и очередь, и словарь пусты, вы закончили.
Немного сложнее, но таким образом вы получаете отзывчивый интерфейс, вы можете легко контролировать количество активных потоков, и вы можете видеть, что в полете. Собирайте статистику. Познакомьтесь с WPF и установите индикаторы выполнения для каждого файла. Она не может не быть впечатлена.