Каков наилучший способ заставить несколько потоков работать, и ждать, пока все они завершатся?

Я пишу простое приложение (для моей жены не менее: -P), которое делает некоторые манипуляции с изображениями (изменение размера, временная привязка и т.д.) для потенциально большой партии изображений. Поэтому я пишу библиотеку, которая может делать это как синхронно, так и асинхронно. Я решил использовать Асинхронный шаблон на основе событий. При использовании этого шаблона вам нужно поднять событие, когда работа будет завершена. Здесь я испытываю проблемы, зная, когда это будет сделано. Итак, в моем методе DownsizeAsync (метод async для сокращения изображений) я делаю что-то вроде этого:

    public void DownsizeAsync(string[] files, string destination)
    {
        foreach (var name in files)
        {
            string temp = name; //countering the closure issue
            ThreadPool.QueueUserWorkItem(f =>
            {
                string newFileName = this.DownsizeImage(temp, destination);
                this.OnImageResized(newFileName);
            });
        }
     }

Теперь сложная часть - это знать, когда все они полны.

Вот что я рассмотрел: Использование ManualResetEvents, как здесь: http://msdn.microsoft.com/en-us/library/3dasc8as%28VS.80%29.aspx Но проблема, с которой я столкнулся, заключается в том, что вы можете только ждать для 64 или менее событий. У меня может быть много изображений.

Второй вариант: иметь счетчик, который учитывает сделанные изображения, и поднять событие, когда счетчик достигнет общего значения:

public void DownsizeAsync(string[] files, string destination)
{
    foreach (var name in files)
    {
        string temp = name; //countering the closure issue
        ThreadPool.QueueUserWorkItem(f =>
        {
            string newFileName = this.DownsizeImage(temp, destination);
            this.OnImageResized(newFileName);
            total++;
            if (total == files.Length)
            {
                this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));
            }
        });
    }


}

private volatile int total = 0;

Теперь это кажется "взломанным", и я не совсем уверен, что этот поток безопасен.

Итак, мой вопрос: какой лучший способ сделать это? Есть ли другой способ синхронизации всех потоков? Должен ли я использовать ThreadPool? Спасибо!!

ОБНОВЛЕНИЕ. Основываясь на отзывах в комментариях и нескольких ответах, я решил использовать этот подход:

Во-первых, я создал метод расширения, который перечисляет перечисление в "партии":

    public static IEnumerable<IEnumerable<T>> GetBatches<T>(this IEnumerable<T> source, int batchCount)
    {
        for (IEnumerable<T> s = source; s.Any(); s = s.Skip(batchCount))
        {
            yield return s.Take(batchCount);
        }
    }

В принципе, если вы делаете что-то вроде этого:

        foreach (IEnumerable<int> batch in Enumerable.Range(1, 95).GetBatches(10))
        {
            foreach (int i in batch)
            {
                Console.Write("{0} ", i);
            }
            Console.WriteLine();
        }

Вы получаете этот вывод:

1 2 3 4 5 6 7 8 9 10
11 12 13 14 15 16 17 18 19 20
21 22 23 24 25 26 27 28 29 30
31 32 33 34 35 36 37 38 39 40
41 42 43 44 45 46 47 48 49 50
51 52 53 54 55 56 57 58 59 60
61 62 63 64 65 66 67 68 69 70
71 72 73 74 75 76 77 78 79 80
81 82 83 84 85 86 87 88 89 90
91 92 93 94 95

Идея заключалась в том, что (как указывал кто-то в комментариях) нет необходимости создавать отдельный поток для каждого изображения. Поэтому я буду загружать изображения в [machine.cores * 2] количество партий. Затем я воспользуюсь своим вторым подходом, который просто будет поддерживать счетчик, и когда счетчик достигнет общей суммы, которую я ожидаю, я узнаю, что я закончил.

Причина, по которой я сейчас убежден, что она действительно потокобезопасна, заключается в том, что я обозначил полную переменную как изменчивую, которая согласно MSDN:

Обычно используется изменчивый модификатор для поля, к которому осуществляется доступ несколько потоков без использования блокировки для сериализации доступа. Использование изменчивого модификатора обеспечивает что один поток извлекает больше всего обновленное значение, написанное другим нить

означает, что я должен быть в ясном (если нет, пожалуйста, дайте мне знать!)

Итак, вот код, который я собираюсь:

    public void DownsizeAsync(string[] files, string destination)
    {
        int cores = Environment.ProcessorCount * 2;
        int batchAmount = files.Length / cores;

        foreach (var batch in files.GetBatches(batchAmount))
        {
            var temp = batch.ToList(); //counter closure issue
            ThreadPool.QueueUserWorkItem(b =>
            {
                foreach (var item in temp)
                {
                    string newFileName = this.DownsizeImage(item, destination);
                    this.OnImageResized(newFileName);
                    total++;
                    if (total == files.Length)
                    {
                        this.OnDownsizeCompleted(new AsyncCompletedEventArgs(null, false, null));
                    }
                }
            });
        }
    }

Я открыт для обратной связи, поскольку я никоим образом не являюсь экспертом по многопоточности, поэтому, если кто-либо видит какую-либо проблему с этим или имеет лучшую идею, пожалуйста, дайте мне знать. (Да, это только домашнее приложение, но у меня есть некоторые идеи о том, как я могу использовать полученные знания, чтобы улучшить наш сервис поиска/индекса, который мы используем на работе.) Пока я буду держать этот вопрос открытым до тех пор, пока я чувствую, что я использую правильный подход. Спасибо всем за вашу помощь.

Ответы

Ответ 1

Простейшим может быть создание новых потоков, а затем вызов Thread.Join для каждого из них. Вы можете использовать семафор или что-то в этом роде, но, возможно, проще просто создать новые потоки.

В .NET 4.0 вы можете использовать Parallel Extensions, чтобы сделать это довольно легко с задачами.

В качестве другой альтернативы, которая будет использовать threadpool, вы можете создать делегат и называть BeginInvoke на нем, чтобы вернуть IAsyncResult - вы можете получить WaitHandle для каждого результата через AsyncWaitHandle и вызовите WaitHandle.WaitAll.

EDIT: Как указано в комментариях, вы можете вызывать только WaitAll с 64 ручками одновременно на некоторых реализациях. Альтернативы могут вызывать WaitOne на каждом из них по очереди или вызывать WaitAll с партиями. Это не имеет большого значения, если вы делаете это из потока, который не собирается блокировать threadpool. Также обратите внимание, что вы не можете вызывать WaitAll из потока STA.

Ответ 2

Вы все еще хотите использовать ThreadPool, потому что он будет управлять количеством потоков, которые он запускает одновременно. Недавно я столкнулся с подобной проблемой и решил ее так:

var dispatcher = new ThreadPoolDispatcher();
dispatcher = new ChunkingDispatcher(dispatcher, 10);

foreach (var image in images)
{
    dispatcher.Add(new ResizeJob(image));
}

dispatcher.WaitForJobsToFinish();

IDispatcher и IJob выглядят следующим образом:

public interface IJob
{
    void Execute();
}

public class ThreadPoolDispatcher : IDispatcher
{
    private IList<ManualResetEvent> resetEvents = new List<ManualResetEvent>();

    public void Dispatch(IJob job)
    {
        var resetEvent = CreateAndTrackResetEvent();
        var worker = new ThreadPoolWorker(job, resetEvent);
        ThreadPool.QueueUserWorkItem(new WaitCallback(worker.ThreadPoolCallback));
    }

    private ManualResetEvent CreateAndTrackResetEvent()
    {
        var resetEvent = new ManualResetEvent(false);
        resetEvents.Add(resetEvent);
        return resetEvent;
    }

    public void WaitForJobsToFinish()
    {
        WaitHandle.WaitAll(resetEvents.ToArray() ?? new ManualResetEvent[] { });
        resetEvents.Clear();
    }
}

И затем использовал декоратор для использования ThreadPool:

public class ChunkingDispatcher : IDispatcher
{
    private IDispatcher dispatcher;
    private int numberOfJobsDispatched;
    private int chunkSize;

    public ChunkingDispatcher(IDispatcher dispatcher, int chunkSize)
    {
        this.dispatcher = dispatcher;
        this.chunkSize = chunkSize;
    }

    public void Dispatch(IJob job)
    {
        dispatcher.Dispatch(job);

        if (++numberOfJobsDispatched % chunkSize == 0)
            WaitForJobsToFinish();
    }

    public void WaitForJobsToFinish()
    {
        dispatcher.WaitForJobsToFinish();
    }
}

Абстракция IDispatcher работает очень хорошо для замены вашей технологии потоковой обработки. У меня есть другая реализация, которая является SingleThreadedDispatcher, и вы можете сделать версию ThreadStart, например, предложенную Джоном Скитом. Затем легко запустить каждый из них и посмотреть, какую производительность вы получите. SingleThreadedDispatcher хорош при отладке вашего кода или когда вы не хотите убивать процессор на вашем поле.

Изменить: Я забыл добавить код для ThreadPoolWorker:

public class ThreadPoolWorker
{
    private IJob job;
    private ManualResetEvent doneEvent;

    public ThreadPoolWorker(IJob job, ManualResetEvent doneEvent)
    {
        this.job = job;
        this.doneEvent = doneEvent;
    }

    public void ThreadPoolCallback(object state)
    {
        try
        {
            job.Execute();
        }
        finally
        {
            doneEvent.Set();
        }
    }
}

Ответ 3

Самое простое и эффективное решение - использовать счетчики и сделать их потокобезопасными. Это будет потреблять меньше памяти и может масштабироваться до большего количества потоков

Вот пример

int itemCount = 0;
for (int i = 0; i < 5000; i++)
{
    Interlocked.Increment(ref itemCount);

    ThreadPool.QueueUserWorkItem(x=>{
        try
        {
            //code logic here.. sleep is just for demo
            Thread.Sleep(100);
        }
        finally
        {
            Interlocked.Decrement(ref itemCount);
        }
    });
}

while (itemCount > 0)
{
    Console.WriteLine("Waiting for " + itemCount + " threads...");
    Thread.Sleep(100);
}
Console.WriteLine("All Done!");

Ответ 4

.Net 4.0 делает многопоточность еще проще (хотя вы все равно можете снимать себя с побочными эффектами).

Ответ 5

Я использовал SmartThreadPool с большим успехом, чтобы справиться с этой проблемой. Существует также Codeplex сайт о сборке.

SmartThreadPool может помочь с другими проблемами, так же как некоторые потоки не могут работать одновременно, в то время как другие могут.

Ответ 6

Я использую статический метод утилиты для проверки всех отдельных команд ожидания.

    public static void WaitAll(WaitHandle[] handles)
    {
        if (handles == null)
            throw new ArgumentNullException("handles",
                "WaitHandle[] handles was null");
        foreach (WaitHandle wh in handles) wh.WaitOne();
    }

Затем в моем основном потоке я создаю список этих команд ожидания, и для каждого делегата, который я помещал в свою очередь ThreadPool, я добавляю дескриптор wait в список...

 List<WaitHandle> waitHndls = new List<WaitHandle>();
 foreach (iterator logic )
 {
      ManualResetEvent txEvnt = new ManualResetEvent(false);

      ThreadPool.QueueUserWorkItem(
           delegate
               {
                   try { // Code to process each task... }
                   // Finally, set each wait handle when done
                   finally { lock (locker) txEvnt.Set(); } 
               });
      waitHndls.Add(txEvnt);  // Add wait handle to List
 }
 util.WaitAll(waitHndls.ToArray());   // Check all wait Handles in List

Ответ 7

Другой вариант - использовать трубку.

Вы публикуете всю работу, которая должна быть выполнена для канала, а затем читать данные из канала из каждого потока. Когда труба пуста, все готово, потоки заканчиваются сами, и все счастливы (конечно, убедитесь, что вы сначала произвели всю работу, а затем уничтожили ее).

Ответ 8

Я предлагаю помещать нетронутые изображения в очередь и, когда вы читаете из очереди, запускаете поток и вставляете его свойство System.Threading.Thread.ManagedThreadId в словарь вместе с именем файла. Таким образом, ваш пользовательский интерфейс может отображать как ожидающие, так и активные файлы.

Когда каждый поток завершается, он вызывает процедуру обратного вызова, возвращая свой ManagedThreadId. Этот обратный вызов (переданный в качестве делегата в поток) удаляет идентификатор потока из словаря, запускает другой поток из очереди и обновляет пользовательский интерфейс.

Когда и очередь, и словарь пусты, вы закончили.

Немного сложнее, но таким образом вы получаете отзывчивый интерфейс, вы можете легко контролировать количество активных потоков, и вы можете видеть, что в полете. Собирайте статистику. Познакомьтесь с WPF и установите индикаторы выполнения для каждого файла. Она не может не быть впечатлена.