Комбинатор потока Java 8 никогда не назывался
Я пишу пользовательский сборщик java 8, который должен вычислять среднее значение POJO, которое имеет метод getValue()
. Здесь код:
public static Collector<BoltAggregationData, BigDecimal[], BigDecimal> avgCollector = new Collector<BoltAggregationData, BigDecimal[], BigDecimal>() {
@Override
public Supplier<BigDecimal[]> supplier() {
return () -> {
BigDecimal[] start = new BigDecimal[2];
start[0] = BigDecimal.ZERO;
start[1] = BigDecimal.ZERO;
return start;
};
}
@Override
public BiConsumer<BigDecimal[], BoltAggregationData> accumulator() {
return (a,b) -> {
a[0] = a[0].add(b.getValue());
a[1] = a[1].add(BigDecimal.ONE);
};
}
@Override
public BinaryOperator<BigDecimal[]> combiner() {
return (a,b) -> {
a[0] = a[0].add(b[0]);
a[1] = a[1].add(b[1]);
return a;
};
}
@Override
public Function<BigDecimal[], BigDecimal> finisher() {
return (a) -> {
return a[0].divide(a[1], 6 , RoundingMode.HALF_UP);
};
}
private final Set<Characteristics> CHARACTERISTICS = new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));
@Override
public Set<Characteristics> characteristics() {
return CHARACTERISTICS;
}
};
Все работает хорошо в непараллельном случае. Однако, когда я использую parallelStream()
, он иногда не работает. Например, учитывая значения от 1 до 10, он вычисляет (53/9 вместо 55/10). При отладке отладчик никогда не ударяет точку останова в функции combiner(). Есть ли какой-то флаг, который мне нужно установить?
Ответы
Ответ 1
Похоже, проблема заключается в характеристике CONCURRENT
, которая делает что-то еще, чем вы могли бы подумать:
Указывает, что этот коллектор является параллельным, что означает, что контейнер результата может поддерживать функцию аккумулятора, являющуюся вызываемый одновременно с одним и тем же контейнером результата из нескольких потоки.
Вместо вызова объединителя аккумулятор вызывается одновременно, используя тот же BigDecimal[] a
для всех потоков. Доступ к a
не является атомарным, поэтому он идет не так:
Thread1 -> retrieves value of a[0]: 3
Thread2 -> retrieves value of a[0]: 3
Thread1 -> adds own value: 3 + 3 = 6
Thread2 -> adds own value: 3 + 4 = 7
Thread1 -> writes 6 to a[0]
Thread2 -> writes 7 to a[0]
Создание значения a[0]
7, когда оно должно быть 10. То же самое может произойти с a[1]
, поэтому результаты могут быть непоследовательными.
Если вы удалите атрибут CONCURRENT
, вместо этого будет использоваться комбайнер.
Ответ 2
Ну, это именно то, что вы запрашиваете при указании Characteristics.CONCURRENT
:
Указывает, что этот коллектор является параллельным, что означает, что контейнер результатов может поддерживать функцию аккумулятора, вызываемую одновременно с одним и тем же контейнером результатов из нескольких потоков.
Если это не так, как с вашим Collector
, вы не должны указывать этот флаг.
В качестве побочного примечания, new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));
является довольно неэффективным для задания характеристик. Вы можете просто использовать EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED)
. Когда вы удаляете неправильный параллельный признак, вы можете использовать либо EnumSet.of(Characteristics.UNORDERED)
, либо Collections.singleton(Characteristics.UNORDERED)
, но HashSet
определенно перебор.