Ответ 1
Я бы использовал Parallel.For
и установил MaxDegreeOfParallelism
соответственно.
Parallel.For(0, 1000, new ParallelOptions { MaxDegreeOfParallelism = 10 },
i =>
{
GetPage(pageList[i]);
});
Я разрабатываю консольное приложение.
Я хочу использовать Threadpool для загрузки веб-страниц. Вот какой-то поддельный код.
for (int loop=0; loop< 100; loop++)
{
ThreadPool.QueueUserWorkItem(new WaitCallback(GetPage), pageList[loop]);
}
snip
private static void GetPage(object o)
{
//get the page
}
Как мне предотвратить запуск моего кода более чем двумя (или десятью или другими) одновременными потоками?
Я пробовал
ThreadPool.SetMaxThreads(1, 0);
ThreadPool.SetMinThreads(1, 0);
Но они, похоже, не имеют никакого влияния.
Я бы использовал Parallel.For
и установил MaxDegreeOfParallelism
соответственно.
Parallel.For(0, 1000, new ParallelOptions { MaxDegreeOfParallelism = 10 },
i =>
{
GetPage(pageList[i]);
});
Лично я использовал SmartThreadPool и оставил ThreadPool самостоятельно. Однако это, вероятно, то, что вы хотите: Ограничивающие потоки потоков потока С#
Включенный код из ссылки (пожалуйста, дайте автору кредит, а не мне)
System.Threading.Semaphore S = new System.Threading.Semaphore(3, 3);
try
{
// wait your turn (decrement)
S.WaitOne();
// do your thing
}
finally
{
// release so others can go (increment)
S.Release();
}
Просто инвертируйте этот код из:
ThreadPool.SetMaxThreads(1, 0);
ThreadPool.SetMinThreads(1, 0);
To:
ThreadPool.SetMinThreads(1, 0);
ThreadPool.SetMaxThreads(1, 0);
Вы не можете установить MaxThread меньше, чем MinThread
Вы можете сделать это, используя метод ThreadPool.SetMaxThreads
.
Но есть некоторые проблемы с использованием ThreadPool для WebRequest. Прочитайте, например, this (ошибка в ThreadPool или HttpWebRequest?)
ThreadPool.SetMaxThreads(2,2);
Лично я использовал бы AsParallel от Linq
, для этого.
Посмотрите на параметры ThreadPool.SetMaxThreads. Первый параметр - количество рабочих потоков, а второй параметр - количество асинхронных потоков, о котором вы говорите.
Далее в документации, он говорит:
Вы не можете установить количество рабочих потоков или количество операций ввода-вывода завершение потоков на число, меньшее, чем количество процессоров в компьютере.
Похоже, вы пытаетесь использовать ThreadPool для того, для чего он не предназначен для использования. Если вы хотите ограничить количество загрузок, создайте класс, который управляет этим для вас, потому что ThreadPool не обязательно является полным решением вашей проблемы.
Я бы предложил класс, который запускает два потока в ThreadPool и ждет обратного вызова. Когда он получает обратный вызов для завершения одной из очередей потоков, новый.
Если вы затягиваете .Net 2.0, вы можете использовать следующую технику:
зная тот факт, что сыворотка вы ставите задачу в ThreadPool
, она создаст новый поток (конечно, если нет свободных), вы будете ждать, прежде чем делать это, пока не появится свободный поток. Для этой цели используется класс BlockingCounter
(описанный ниже), который, как только достигнут предел, будет ждать увеличения, пока кто-то (другой поток) не уменьшит его. Затем он ввел "закрытое" состояние, указывающее, что никаких новых приращений не будет сделано и ждет завершения.
Ниже приведен образец, который показывает максимум 4 задания с общим числом 10.
class Program
{
static int s_numCurrentThreads = 0;
static Random s_rnd = new Random();
static void Main(string[] args)
{
int maxParallelTasks = 4;
int totalTasks = 10;
using (BlockingCounter blockingCounter = new BlockingCounter(maxParallelTasks))
{
for (int i = 1; i <= totalTasks; i++)
{
Console.WriteLine("Submitting task {0}", i);
blockingCounter.WaitableIncrement();
if (!ThreadPool.QueueUserWorkItem((obj) =>
{
try
{
ThreadProc(obj);
}
catch (Exception ex)
{
Console.Error.WriteLine("Task {0} failed: {1}", obj, ex.Message);
}
finally
{
// Exceptions are possible here too,
// but proper error handling is not the goal of this sample
blockingCounter.WaitableDecrement();
}
}, i))
{
blockingCounter.WaitableDecrement();
Console.Error.WriteLine("Failed to submit task {0} for execution.", i);
}
}
Console.WriteLine("Waiting for copmletion...");
blockingCounter.CloseAndWait(30000);
}
Console.WriteLine("Work done!");
Console.ReadKey();
}
static void ThreadProc (object obj)
{
int taskNumber = (int) obj;
int numThreads = Interlocked.Increment(ref s_numCurrentThreads);
Console.WriteLine("Task {0} started. Total: {1}", taskNumber, numThreads);
int sleepTime = s_rnd.Next(0, 5);
Thread.Sleep(sleepTime * 1000);
Console.WriteLine("Task {0} finished.", taskNumber);
Interlocked.Decrement(ref s_numCurrentThreads);
}
Он использует класс BlockingCounter, основанный на Marc Gravell SizeQueue, размещенный здесь, но без счетчика вместо очереди. Когда вы завершаете очередь в новых потоках, вызовите метод Close(), а затем дождитесь окончания.
public class BlockingCounter : IDisposable
{
private int m_Count;
private object m_counterLock = new object();
private bool m_isClosed = false;
private volatile bool m_isDisposed = false;
private int m_MaxSize = 0;
private ManualResetEvent m_Finished = new ManualResetEvent(false);
public BlockingCounter(int maxSize = 0)
{
if (maxSize < 0)
throw new ArgumentOutOfRangeException("maxSize");
m_MaxSize = maxSize;
}
public void WaitableIncrement(int timeoutMs = Timeout.Infinite)
{
lock (m_counterLock)
{
while (m_MaxSize > 0 && m_Count >= m_MaxSize)
{
CheckClosedOrDisposed();
if (!Monitor.Wait(m_counterLock, timeoutMs))
throw new TimeoutException("Failed to wait for counter to decrement.");
}
CheckClosedOrDisposed();
m_Count++;
if (m_Count == 1)
{
Monitor.PulseAll(m_counterLock);
}
}
}
public void WaitableDecrement(int timeoutMs = Timeout.Infinite)
{
lock (m_counterLock)
{
try
{
while (m_Count == 0)
{
CheckClosedOrDisposed();
if (!Monitor.Wait(m_counterLock, timeoutMs))
throw new TimeoutException("Failed to wait for counter to increment.");
}
CheckDisposed();
m_Count--;
if (m_MaxSize == 0 || m_Count == m_MaxSize - 1)
Monitor.PulseAll(m_counterLock);
}
finally
{
if (m_isClosed && m_Count == 0)
m_Finished.Set();
}
}
}
void CheckClosedOrDisposed()
{
if (m_isClosed)
throw new Exception("The counter is closed");
CheckDisposed();
}
void CheckDisposed()
{
if (m_isDisposed)
throw new ObjectDisposedException("The counter has been disposed.");
}
public void Close()
{
lock (m_counterLock)
{
CheckDisposed();
m_isClosed = true;
Monitor.PulseAll(m_counterLock);
}
}
public bool WaitForFinish(int timeoutMs = Timeout.Infinite)
{
CheckDisposed();
lock (m_counterLock)
{
if (m_Count == 0)
return true;
}
return m_Finished.WaitOne(timeoutMs);
}
public void CloseAndWait (int timeoutMs = Timeout.Infinite)
{
Close();
WaitForFinish(timeoutMs);
}
public void Dispose()
{
if (!m_isDisposed)
{
m_isDisposed = true;
lock (m_counterLock)
{
// Wake up all waiting threads, so that they know the object
// is disposed and there nothing to wait anymore
Monitor.PulseAll(m_counterLock);
}
m_Finished.Close();
}
}
}
Результат будет таким:
Submitting task 1
Submitting task 2
Submitting task 3
Submitting task 4
Submitting task 5
Task 1 started. Total: 1
Task 1 finished.
Task 3 started. Total: 1
Submitting task 6
Task 2 started. Total: 2
Task 3 finished.
Task 6 started. Total: 4
Task 5 started. Total: 3
Task 4 started. Total: 4
Submitting task 7
Task 4 finished.
Submitting task 8
Task 7 started. Total: 4
Task 5 finished.
Submitting task 9
Task 7 finished.
Task 8 started. Total: 4
Task 9 started. Total: 4
Submitting task 10
Task 2 finished.
Waiting for copmletion...
Task 10 started. Total: 4
Task 10 finished.
Task 6 finished.
Task 8 finished.
Task 9 finished.
Work done!