Ответ 1
Ты почти там. Оберните код, который вы отправили в функции с этой подписью:
IEnumerable<IDataRecord> MyQuery()
а затем замените код // Do something with Reader
следующим образом:
yield return reader;
Теперь у вас есть что-то, что работает в одном потоке. К сожалению, когда вы читаете результаты запроса, он возвращает ссылку на один и тот же объект каждый раз, и объект просто мутирует себя для каждой итерации. Это означает, что если вы попытаетесь запустить его параллельно, вы получите некоторые действительно нечетные результаты, поскольку параллельные чтения мутируют объект, используемый в разных потоках. Вам нужен код, чтобы взять копию записи для отправки в ваш параллельный цикл.
На этом этапе, тем не менее, мне нравится пропустить дополнительную копию записи и перейти прямо к строго типизированному классу. Более того, мне нравится использовать общий метод для этого:
IEnumerable<T> GetData<T>(Func<IDataRecord, T> factory, string sql, Action<SqlParameterCollection> addParameters)
{
using (var cn = new SqlConnection("My connection string"))
using (var cmd = new SqlCommand(sql, cn))
{
addParameters(cmd.Parameters);
cn.Open();
using (var rdr = cmd.ExecuteReader())
{
while (rdr.Read())
{
yield return factory(rdr);
}
}
}
}
Предполагая, что ваши методы factory создают копию, как ожидалось, этот код должен быть безопасным для использования в цикле Parallel.ForEach. Вызов метода будет выглядеть примерно так (предполагая класс Employee со статическим методом factory с именем "Создать" ):
var UnderPaid = GetData<Employee>(Employee.Create,
"SELECT * FROM Employee WHERE AnnualSalary <= @MinSalary",
p => {
p.Add("@MinSalary", SqlDbType.Int).Value = 50000;
});
Parallel.ForEach(UnderPaid, e => e.GiveRaise());
Важное обновление:
Я не настолько уверен в этом коде, как когда-то был. Отдельный поток может все еще мутировать читателя, пока другой поток находится в процессе его копирования. Я мог бы установить блокировку, но я также обеспокоен тем, что другой поток может вызвать обновление читателя после того, как оригинал сам вызвал Read(), но прежде чем он начнет делать копию. Поэтому критический раздел здесь состоит из всего цикла while... и в этот момент вы снова возвращаетесь к однопоточному. Я ожидаю, что есть способ изменить этот код для работы, как ожидалось, для многопоточных сценариев, но для этого потребуется больше изучения.