Используйте несколько задач для извлечения всех записей из большой коллекции
Я работаю над приложением, которое вызывает внешнюю службу, и должен добавить все записи внешней коллекции в локальную коллекцию. В настоящее время проблема заключается в том, что внешняя коллекция может превышать 1000 записей, но возвращаемые результаты поиска могут включать до двадцати элементов.
Для скорости, которую я решил использовать, сборка Задач будет способом продвижения вперед, поэтому я придумал код ниже:
int totalCount = returnedCol.total_count;
while (totalCount > myDict.Count)
{
int numberOfTasks = // logic to calculate how many tasks to run
List<Task> taskList = new List<Task>();
for (int i = 1; i <= numberOfTasks; i++)
{
Interlocked.Add(ref pageNumber, pageSize);
Task<SearchResponse> testTask = Task.Run(() =>
{
return ExternalCall.GetData(pageNumber, pageSize);
});
Thread.Sleep(100);
taskList.Add(testTask);
testTask.ContinueWith(o =>
{
foreach (ExternalDataRecord dataiwant in testTask.Result.dataiwant)
{
if (!myDict.ContainsKey(dataiwant.id))
myDict.GetOrAdd(dataiwant.id, dataiwant);
}
});
}
Task.WaitAll(taskList.ToArray());
}
Однако это не дает всех результатов. Переменная pageNumber
увеличивается каждый раз, но кажется, что не все результаты задачи анализируются (поскольку одна и та же логика в одном потоке на меньшем наборе данных возвращает все ожидаемые результаты). Кроме того, я попытался объявить отдельные задачи в цепочке (а не в цикле), и все тестовые данные возвращены. Похоже, что чем выше значение, которое я передаю в Thread.Sleep()
, тем больше результаты добавляются в локальную коллекцию (но это не идеально, так как это означает, что процесс занимает больше времени!)
В настоящее время в образце из 600 записей я получаю около 150-200, добавленных в коллекцию myDict
. Мне что-то не хватает?
Ответы
Ответ 1
Вам не хватает того, что ContinueWith()
приводит к другой задаче, и вы не добавляете, что ваш taskList
.
Лучшим подходом было бы использовать async
/await
, доступный с .NET 4.5. Это обеспечивает менее тяжелый подход к решению.
Вы изменили бы алгоритм таким образом:
public async Task Process()
{
int totalCount = returnedCol.total_count;
while (totalCount > myDict.Count)
{
int numberOfTasks = // logic to calculate how many tasks to run
List<Task> taskList = new List<Task>();
for (int i = 1; i <= numberOfTasks; i++)
{
Interlocked.Add(ref pageNumber, pageSize);
taskList.Add(ProcessPage(pageNumber, pageSize));
}
await Task.WhenAll(taskList.ToArray());
}
}
private async Task ProcessPage(int pageNumber, int pageSize)
{
SearchResponse result = await Task.Run(() =>
ExternalCall.GetData(pageNumber, pageSize)).ConfigureAwait(false);
foreach (ExternalDataRecord dataiwant in result.dataiwant)
{
myDict.GetOrAdd(dataiwant.id, dataiwant);
}
}
Ключевое слово async
сообщает компилятору, что позже будет await
. await
по существу обрабатывает детали вокруг вашего вызова ContinueWith
. Если вы действительно хотите, чтобы ExternalCall
произошел в другой задаче, вы просто await
получили бы результаты от этого вызова.
Ответ 2
Я думаю, что если вы возьмете более функциональный и менее императивный подход к вашему коду, у вас будет меньше шансов столкнуться с трудными для понимания проблемами. Я думаю, что что-то вроде этого будет иметь тот же эффект, что и вы:
int totalCount = returnedCol.total_count;
var tasks = Enumerable.Range(1, totalCount / pageSize)
.Select(async page => {
await Task.Delay(page * 100);
return ExternalCall.GetData(page, pageSize));
})
.ToArray();
myDict = (await Task.WhenAll(tasks))
.ToDictionary(dataiwant => dataiwant.id);
В приведенном выше коде предполагается, что вы все еще хотите подождать 100 мс между запросами на дросселирование. Если у вас только что был Thread.Sleep()
, чтобы попробовать устранить проблемы, которые у вас были, вы могли бы еще больше упростить его:
int totalCount = returnedCol.total_count;
var tasks = Enumerable.Range(1, totalCount / pageSize)
.Select(async page => await Task.Run(() => ExternalCall.GetData(page, pageSize)))
.ToArray();
myDict = (await Task.WhenAll(tasks))
.ToDictionary(dataiwant => dataiwant.id);