Java 8 Stream - параллельное выполнение - разный результат - почему?
Скажем, у меня есть List<Integer> ints = new ArrayList<>();
и я хочу добавить к нему значения и сравнить результаты параллельного выполнения с использованием forEach()
и Collectors.toList()
.
Сначала я добавляю в этот список некоторые значения из последовательного IntStream и forEach:
IntStream.range(0,10).boxed().forEach(ints::add);
И я получаю правильный результат:
ints ==> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Теперь я .clear()
список и сделать то же самое параллельно:
IntStream.range(0,10).parallel().boxed().forEach(ints::add);
Теперь из-за многопоточности я получаю неверный результат:
ints ==> [6, 5, 8, 9, 7, 2, 4, 3, 1, 0]
Теперь я переключаюсь на сбор одного и того же потока целых чисел:
IntStream.range(0,10).parallel().boxed().collect(Collectors.toList());
И я получаю правильный результат:
ints ==> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Вопрос: Почему два параллельных исполнения дают разные результаты и почему Collector
производит правильный результат?
Если forEach
производит случайный результат, Collector
тоже должен. Я не указывал какую-либо сортировку, и я думаю, что внутренне он добавляет в список, как я сделал вручную, используя forEach
. Поскольку он делает это параллельно, он add
метод, чтобы получить значения в неуказанном порядке. Тестирование сделано в JShell.
EDIT: Нет дубликатов здесь. Я понимаю связанный вопрос. Почему Коллектор дает правильный результат? Если бы он произвел другой случайный результат, я бы не спросил.
Ответы
Ответ 1
Операция collect
будет приводить к неупорядоченному выходу, если Collector
вы передали, имел разные характеристики. То есть, если CONCURRENT
флаги CONCURRENT
и UNORDERED
(см. Collector.characteristics()
).
Под капотом Collectors.toList()
строит Collector
примерно эквивалентный этому:
Collector.of(
// Supplier of accumulators
ArrayList::new,
// Accumulation operation
List::add,
// Combine accumulators
(left, right) -> {
left.addAll(right);
return left;
}
)
Некоторое количество журналов показывает длины, которые будет выполнять операция collect
чтобы поддерживать безопасность потока и порядок потока:
Collector.of(
() -> {
System.out.printf("%s supplying\n", Thread.currentThread().getName());
return new ArrayList<>();
},
(l, o) -> {
System.out.printf("%s accumulating %s to %s\n", Thread.currentThread().getName(), o, l);
l.add(o);
},
(l1, l2) -> {
System.out.printf("%s combining %s & %s\n", Thread.currentThread().getName(), l1, l2);
l1.addAll(l2);
return l1;
}
)
журналы:
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-0 accumulating 2 to []
ForkJoinPool-1-worker-1 accumulating 6 to []
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-0 accumulating 4 to []
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 5 to []
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-0 accumulating 3 to []
ForkJoinPool-1-worker-0 combining [3] & [4]
ForkJoinPool-1-worker-0 combining [2] & [3, 4]
ForkJoinPool-1-worker-1 combining [5] & [6]
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-0 accumulating 1 to []
ForkJoinPool-1-worker-1 accumulating 8 to []
ForkJoinPool-1-worker-0 supplying
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 9 to []
ForkJoinPool-1-worker-1 combining [8] & [9]
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 7 to []
ForkJoinPool-1-worker-1 combining [7] & [8, 9]
ForkJoinPool-1-worker-1 combining [5, 6] & [7, 8, 9]
ForkJoinPool-1-worker-0 accumulating 0 to []
ForkJoinPool-1-worker-0 combining [0] & [1]
ForkJoinPool-1-worker-0 combining [0, 1] & [2, 3, 4]
ForkJoinPool-1-worker-0 combining [0, 1, 2, 3, 4] & [5, 6, 7, 8, 9]
Вы можете видеть, что каждое чтение из потока записывается в новый накопитель и что они тщательно объединены для поддержания порядка.
Если мы установим CONCURRENT
флаги CONCURRENT
и UNORDERED
метод collect может принимать быстрые клавиши; выделяется только один аккумулятор, и упорядоченная комбинация не нужна.
С помощью:
Collector.of(
() -> {
System.out.printf("%s supplying\n", Thread.currentThread().getName());
return Collections.synchronizedList(new ArrayList<>());
},
(l, o) -> {
System.out.printf("%s accumulating %s to %s\n", Thread.currentThread().getName(), o, l);
l.add(o);
},
(l1, l2) -> {
System.out.printf("%s combining %s & %s\n", Thread.currentThread().getName(), l1, l2);
l1.addAll(l2);
return l1;
},
Characteristics.CONCURRENT,
Characteristics.UNORDERED
)
Журналы:
ForkJoinPool-1-worker-1 supplying
ForkJoinPool-1-worker-1 accumulating 6 to []
ForkJoinPool-1-worker-0 accumulating 2 to [6]
ForkJoinPool-1-worker-1 accumulating 5 to [6, 2]
ForkJoinPool-1-worker-0 accumulating 4 to [6, 2, 5]
ForkJoinPool-1-worker-0 accumulating 3 to [6, 2, 5, 4]
ForkJoinPool-1-worker-0 accumulating 1 to [6, 2, 5, 4, 3]
ForkJoinPool-1-worker-0 accumulating 0 to [6, 2, 5, 4, 3, 1]
ForkJoinPool-1-worker-1 accumulating 8 to [6, 2, 5, 4, 3, 1, 0]
ForkJoinPool-1-worker-0 accumulating 7 to [6, 2, 5, 4, 3, 1, 0, 8]
ForkJoinPool-1-worker-1 accumulating 9 to [6, 2, 5, 4, 3, 1, 0, 8, 7]
Ответ 2
Во-первых, я бы посоветовал пройти через Почему общая изменчивость плохой? ,
Во-вторых, есть пример, предоставленный авторами в разделе "Побочные эффекты", который в значительной степени делает что-то похожее на то, что вы делаете:
В качестве примера того, как преобразовать конвейер потока, который ненадлежащим образом использует побочные эффекты для одного, который этого не делает, следующий код ищет поток строк для тех, которые соответствуют данному регулярному выражению, и помещает совпадения в список.
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
.forEach(s -> results.add(s)); // Unnecessary use of side-effects!
Если выполняется параллельно, безопасность без потока ArrayList приведет к неправильным результатам, и добавление необходимой синхронизации вызовет конфликт, подрывая преимущество параллелизма. Кроме того, использование побочных эффектов здесь совершенно не нужно; forEach() можно просто заменить на операцию сокращения, которая является более безопасной, более эффективной и более поддающейся распараллеливанию:
List<String>results =
stream.filter(s -> pattern.matcher(s).matches())
.collect(Collectors.toList()); // No side-effects!
Итак, вы все еще можете спросить: "Почему коллекционер производит правильный результат?".
Просто потому, что у авторов уже есть что-то для решения параллелизма.
Ответ 3
Во-первых, forEach
документируется как:
Поведение этой операции явно недетерминировано
Таким образом, в будущей версии jdk даже ваш непараллельный код может создавать "неправильные" результаты, то есть результаты вне порядка. В рамках текущей реализации только параллельная версия будет давать такие результаты; но опять же это не является гарантией, forEach
волен делать все, что хочет, чтобы внутри, в отличие от forEachOrdered
, например.
Сохранение порядка или нет - это не свойство последовательного или параллельного, оно зависит исключительно от операции, нарушающей этот порядок или нет; что он (например, явно вызывает unordered
).
Collectors.toList
с другой стороны, является терминальной операцией, которая сохраняет порядок. Как правило, если терминальная операция не является явной в нем документацией о порядке, она будет ее хранить. Так, например, см. Stream::generate
:
Возвращает бесконечный последовательный неупорядоченный поток.
При этом, как правило, существует два порядка, порядок обработки промежуточных операций и терминальные операции. Первые не определены, вы можете изменить свой пример и проверить:
IntStream.range(0,10)
.parallel()
.peek(System.out::println) // out of order printing
.boxed()
.collect(Collectors.toList());
в то время как порядок операций с терминалом сохраняется.
И последний момент состоит в том, что это:
....parallel().forEach(ints::add)
вам просто повезло даже увидеть все элементы в первую очередь. Вы добавляете из разных потоков несколько элементов в не-поточную безопасную коллекцию (ArrayList
); вы могли бы легко пропустить элементы или иметь нули в своих ints
. Держу пари, что это происходит несколько раз, докажу это.
Даже если вы перейдете к предложению Collections.synchronizedList(yourList)
, порядок, в котором они будут отображаться, по-прежнему не определен, по причинам, указанным выше о forEach