Массив процесса параллельно с использованием GCD
У меня есть большой массив, который я хотел бы обработать, передав его в несколько асинхронных задач. В качестве доказательства концепции я написал следующий код:
class TestParallelArrayProcessing {
let array: [Int]
var summary: [Int]
init() {
array = Array<Int>(count: 500000, repeatedValue: 0)
for i in 0 ..< 500000 {
array[i] = Int(arc4random_uniform(10))
}
summary = Array<Int>(count: 10, repeatedValue: 0)
}
func calcSummary() {
let group = dispatch_group_create()
let queue = dispatch_get_global_queue(QOS_CLASS_USER_INITIATED, 0)
for i in 0 ..< 10 {
dispatch_group_async(group, queue, {
let base = i * 50000
for x in base ..< base + 50000 {
self.summary[i] += self.array[x]
}
})
}
dispatch_group_notify(group, queue, {
println(self.summary)
})
}
}
После init()
, array
будет инициализирован случайными целыми числами от 0 до 9.
Функция calcSummary
отправляет 10 задач, которые занимают непересекающиеся фрагменты из 50000 элементов из array
и добавляют их, используя их соответствующий интервал в summary
как ускоритель.
Эта программа выходит из строя в строке self.summary[i] += self.array[x]
. Ошибка:
EXC_BAD_INSTRUCTION (code = EXC_I386_INVOP).
Я могу видеть в отладчике, что ему удалось выполнить итерацию несколько раз перед сбоем и что переменные во время сбоя имеют значения в правильных пределах.
Я прочитал, что EXC_I386_INVOP
может произойти при попытке получить доступ к уже выпущенному объекту. Интересно, имеет ли это отношение к Swift, делая копию массива, если он изменен, и если да, то как этого избежать.
Ответы
Ответ 1
Это немного отличается от подхода в ответе @Eduardo, используя метод Array
type withUnsafeMutableBufferPointer<R>(body: (inout UnsafeMutableBufferPointer<T>) -> R) -> R
. Указана эта документация метода:
Вызов body(p)
, где p
- указатель на изменяемое непрерывное хранилище Array
. Если такое хранилище не существует, оно сначала создается.
Часто оптимизатор может устранить проверки границ и уникальности в алгоритме массива, но когда это не удается, вызов того же алгоритма в аргументе body
позволяет вам торговать безопасностью для скорости.
Этот второй абзац, похоже, является тем, что происходит здесь, поэтому использование этого метода может быть более "идиоматичным" в Swift, что бы это ни значило:
func calcSummary() {
let group = dispatch_group_create()
let queue = dispatch_get_global_queue(QOS_CLASS_USER_INITIATED, 0)
self.summary.withUnsafeMutableBufferPointer {
summaryMem -> Void in
for i in 0 ..< 10 {
dispatch_group_async(group, queue, {
let base = i * 50000
for x in base ..< base + 50000 {
summaryMem[i] += self.array[x]
}
})
}
}
dispatch_group_notify(group, queue, {
println(self.summary)
})
}
Ответ 2
Когда вы используете оператор +=
, LHS является параметром inout
- я думаю, что вы получаете условия гонки, когда, как вы упомянули в своем обновлении, Swift перемещается по массиву для оптимизации. Я смог заставить его работать, суммируя кусок в локальной переменной, а затем просто присваивая правильному индексу в summary
:
func calcSummary() {
let group = dispatch_group_create()
let queue = dispatch_get_global_queue(QOS_CLASS_USER_INITIATED, 0)
for i in 0 ..< 10 {
dispatch_group_async(group, queue, {
let base = i * 50000
var sum = 0
for x in base ..< base + 50000 {
sum += self.array[x]
}
self.summary[i] = sum
})
}
dispatch_group_notify(group, queue, {
println(self.summary)
})
}
Ответ 3
Я думаю, что Nate прав: есть условия гонки с переменной summary
. Чтобы исправить это, я использовал summary
память напрямую:
func calcSummary() {
let group = dispatch_group_create()
let queue = dispatch_get_global_queue(QOS_CLASS_USER_INITIATED, 0)
let summaryMem = UnsafeMutableBufferPointer<Int>(start: &summary, count: 10)
for i in 0 ..< 10 {
dispatch_group_async(group, queue, {
let base = i * 50000
for x in base ..< base + 50000 {
summaryMem[i] += self.array[x]
}
})
}
dispatch_group_notify(group, queue, {
println(self.summary)
})
}
Это работает (пока).
ИЗМЕНИТЬ
У Майка С есть очень хороший момент, в его комментарии ниже. Я также нашел этот пост в блоге, который проливает свет на проблему.
Ответ 4
Вы также можете использовать concurrentPerform(iterations: Int, execute work: (Int) -> Swift.Void)
(начиная с Swift 3).
Он имеет гораздо более простой синтаксис:
DispatchQueue.concurrentPerform(iterations: iterations) {i in
performOperation(i)
}
и будет ждать завершения всех потоков до возврата.