Будет ли параллельный поток работать отлично с отличной работой?
Я читал о безгражданстве и наткнулся на это в документе:
Результаты потокового конвейера могут быть недетерминированными или неправильными, если поведенческие параметры для операций потока являются состоящими. Лямбда с состоянием (или другой объект, реализующий соответствующий функциональный интерфейс) - это тот, чей результат зависит от любого состояния, которое может измениться во время выполнения потокового конвейера.
Теперь, если у меня есть список строк (скажем, strList
), а затем я пытаюсь удалить дублирующиеся строки из него, используя параллельные потоки, следующим образом:
List<String> resultOne = strList.parallelStream().distinct().collect(Collectors.toList());
или если мы хотим, чтобы регистр не учитывался:
List<String> result2 = strList.parallelStream().map(String::toLowerCase)
.distinct().collect(Collectors.toList());
Может ли этот код иметь какие-либо проблемы, так как параллельные потоки будут разделять входные данные и отличать их в одном фрагменте, не обязательно означать отличные во всем входном сигнале?
РЕДАКТИРОВАТЬ (Краткое резюме ответов ниже)
distinct
является операция с состоянием, и в случае промежуточных операций с состоянием параллельные потоки могут потребовать многократных проходов или существенных издержек буферизации. Также distinct
могут быть реализованы более эффективно, если упорядочение элементов не имеет значения. Также согласно документу:
Для упорядоченных потоков выбор отдельных элементов является стабильным (для дублированных элементов элемент, появляющийся первым в порядке встречи, сохраняется.) Для неупорядоченных потоков гарантии стабильности не предоставляются.
Но в случае, когда упорядоченный поток, работающий в параллельном режиме, различные могут быть нестабильными - это означает, что он будет содержать произвольный элемент в случае дубликатов и не обязательно первый, как ожидалось, из distinct
от других.
По ссылке:
Внутренне, операция Different() сохраняет Set, который содержит элементы, которые были замечены ранее, но он скрыт внутри операции, и мы не можем получить его из кода приложения.
Так что в случае параллельных потоков он, вероятно, будет использовать весь поток или может использовать CHM (например, ConcurrentHashMap.newKeySet()
). А для заказанных, скорее всего, он будет использовать LinkedHashSet
или аналогичный конструкт.
Ответы
Ответ 1
Приблизительно указав соответствующие части doc
(Акцент, мой):
Промежуточные операции далее делятся на лица без гражданства и операции с состоянием. Операции без сохранения состояния, такие как фильтр и карта, не сохранять состояние от ранее увиденного элемента при обработке нового element - каждый элемент может обрабатываться независимо от операций на других элементах. Операции с состоянием, такие как отдельные и отсортированные, может включать состояние из ранее замеченных элементов при обработке новые элементы
Операции с состоянием, возможно, должны обработать весь ввод перед производя результат. Например, нельзя получить какие-либо результаты от сортировка потока до тех пор, пока не будут видны все элементы потока. Как результат, при параллельном вычислении, некоторые конвейеры, содержащие промежуточные операции могут потребовать многократных передач данных или могут нужно буферизовать важные данные. Трубопроводы, содержащие исключительно промежуточные операции без сохранения состояния могут быть обработаны за один проход, последовательный или параллельный, с минимальной буферизацией данных
Если вы читаете дальше (раздел о заказе):
Потоки могут иметь или не иметь определенный порядок встречи. Так или иначе поток имеет порядок встречи, зависит от источника и промежуточные операции. Некоторые источники потоков (такие как List или Массивы) изначально упорядочены, тогда как другие (такие как HashSet) не. Некоторые промежуточные операции, такие как sorted(), могут налагать встретить порядок в неупорядоченном потоке, а другие могут сделать упорядоченный поток неупорядоченным, например BaseStream.unordered(). Кроме того, некоторые операции терминала могут игнорировать порядок встречи, например Foreach().
...
Для параллельных потоков ослабление ограничения порядка может иногда включить более эффективное исполнение. Некоторые совокупные операции, такие как фильтрация дубликатов (четких()) или сгруппированных сокращений (Collectors.groupingBy()) может быть реализован более эффективно, если порядок элементов не имеет значения. Точно так же операции, которые внутренне привязанный к порядку встреч, такой как limit(), может потребовать буферизация для обеспечения правильного упорядочения, подрывая выгоду параллелизм. В тех случаях, когда поток имеет порядок встречи, но пользователь не особо заботится об этом порядке встречи, явно упорядочение потока с помощью unordered() может улучшить параллелизм производительность для некоторых операций с состоянием или терминалов. Тем не менее, большинство потоковые конвейеры, такие как пример "суммы весов блоков" выше, по-прежнему эффективно распараллеливать даже при ограничении порядка.
В заключение,
- Функция отлично будет работать с параллельными потоками, но, как вы, возможно, уже знаете, для продолжения необходимо использовать весь поток, что может занять много памяти.
- Если источником элементов является неупорядоченная коллекция (например, hashset) или поток
unordered()
, то distinct
не беспокоится о порядке вывода и, следовательно, будет эффективен
Решение состоит в том, чтобы добавить .unordered()
в конвейер потока, если вы не беспокоитесь о порядке и хотите повысить производительность.
List<String> result2 = strList.parallelStream()
.unordered()
.map(String::toLowerCase)
.distinct()
.collect(Collectors.toList());
Увы, в Java нет (доступного встроенного) одновременного хэш-набора (если они не стали умными с ConcurrentHashMap
), поэтому я могу только оставить вас с прискорбной возможностью того, что Different реализован блокирующим способом, используя обычный Java установлен. В этом случае я не вижу никакой пользы от проведения параллельных различий.
Изменение: я говорил слишком рано. Там может быть некоторая выгода с использованием параллельных потоков с различными. Похоже, что distinct
реализован с большей сообразительностью, чем я изначально думал. Смотрите @Eugene ответ.
Ответ 2
Проблем не будет (проблема как неправильный результат), но, как отмечает API,
Сохранение стабильности для отдельных() в параллельных трубопроводах относительно дорого
Но если производительность вызывает беспокойство, и если стабильность не является проблемой (т.е. Результат имеет другой порядок элементов в отношении обрабатываемой коллекции), то вы следуете за примечанием API
удаление ограничения порядка с помощью BaseStream.unordered() может привести к значительно более эффективному выполнению для отдельных() параллельных конвейеров,
Я думал, почему бы не сравнить производительность параллельных и последовательных потоков для distinct
public static void main(String[] args) {
List<String> strList = Arrays.asList("cat", "nat", "hat", "tat", "heart", "fat", "bat", "lad", "crab", "snob");
List<String> words = new Vector<>();
int wordCount = 1_000_000; // no. of words in the list words
int avgIter = 10; // iterations to run to find average running time
//populate a list randomly with the strings in 'strList'
for (int i = 0; i < wordCount; i++)
words.add(strList.get((int) Math.round(Math.random() * (strList.size() - 1))));
//find out average running times
long starttime, pod = 0, pud = 0, sod = 0;
for (int i = 0; i < avgIter; i++) {
starttime = System.currentTimeMillis();
List<String> parallelOrderedDistinct = words.parallelStream().distinct().collect(Collectors.toList());
pod += System.currentTimeMillis() - starttime;
starttime = System.currentTimeMillis();
List<String> parallelUnorderedDistinct =
words.parallelStream().unordered().distinct().collect(Collectors.toList());
pud += System.currentTimeMillis() - starttime;
starttime = System.currentTimeMillis();
List<String> sequentialOrderedDistinct = words.stream().distinct().collect(Collectors.toList());
sod += System.currentTimeMillis() - starttime;
}
System.out.println("Parallel ordered time in ms: " + pod / avgIter);
System.out.println("Parallel unordered time in ms: " + pud / avgIter);
System.out.println("Sequential implicitly ordered time in ms: " + sod / avgIter);
}
Вышеприведенное было скомпилировано open-jdk 8 и запущено на openjdk jre 8 (без конкретных jvm-аргументов) на i3 6-го поколения (4 логических ядра), и я получил эти результаты
Казалось, что после определенного нет. элементов, упорядоченная параллель была быстрее и иронически параллельная неупорядоченная была самой медленной. Причина этого (благодаря @Hulk) заключается в том, как она реализована (с использованием HashSet). Таким образом, общее правило заключается в том, что если вы несколько элементов и много дублирования на несколько величин больше, вы можете выиграть от parallel()
.
1)
Parallel ordered time in ms: 52
Parallel unordered time in ms: 81
Sequential implicitly ordered time in ms: 35
2)
Parallel ordered time in ms: 48
Parallel unordered time in ms: 83
Sequential implicitly ordered time in ms: 34
3)
Parallel ordered time in ms: 36
Parallel unordered time in ms: 70
Sequential implicitly ordered time in ms: 32
Неупорядоченная параллель была в два раза медленнее, чем обе.
Затем я поднял wordCount
до 5_000_000
и это были результаты
1)
Parallel ordered time in ms: 93
Parallel unordered time in ms: 363
Sequential implicitly ordered time in ms: 123
2)
Parallel ordered time in ms: 100
Parallel unordered time in ms: 363
Sequential implicitly ordered time in ms: 124
3)
Parallel ordered time in ms: 89
Parallel unordered time in ms: 365
Sequential implicitly ordered time in ms: 118
а затем до 10_000_000
1)
Parallel ordered time in ms: 148
Parallel unordered time in ms: 725
Sequential implicitly ordered time in ms: 218
2)
Parallel ordered time in ms: 150
Parallel unordered time in ms: 749
Sequential implicitly ordered time in ms: 224
3)
Parallel ordered time in ms: 143
Parallel unordered time in ms: 743
Sequential implicitly ordered time in ms: 222
Ответ 3
Вы, кажется, пропустите немало вещей из документации, которую вы предоставляете, и фактического примера.
Результаты трубопровода потока могут быть недетерминированы или неправильно, если поведенческие параметры для операций потока являются состоянием.
В вашем примере у вас нет определенных действий с состоянием, определенных вами. Stateful в документе означает, что тот, вы определяете, а не те, которые реализуются jdk
сам - как distinct
в вашем примере. Но в любом случае вы могли бы определить корректную работу с состоянием, даже такой пример может представлять Стюарт Маркс - работающий на Oracle/Java.
Таким образом, вы более чем хорошо в примерах, которые вы предоставляете, будь то параллельно или нет.
Дорогая часть distinct
(параллельно) исходит из того факта, что внутренне должна быть потокобезопасная структура данных, которая будет содержать отдельные элементы; в случае jdk это ConcurrentHashMap
используется в случае, если порядок не имеет значения, или сокращение с помощью LinkedHashSet
когда порядок имеет значение.
distinct
битва - довольно умная реализация, она выглядит, если ваш источник потока уже разный (в таком случае это нет-op) или выглядит, если ваши данные отсортированы, и в этом случае он будет делать более умный обход источника (поскольку он знает, что если вы видели один элемент, следующий - либо тот, который вы только что видели, либо другой), либо с помощью ConcurrentHashMap
внутри и т.д.
Ответ 4
Из javadocs, parallelStream()
Возвращает возможно параллельный поток с этой коллекцией в качестве источника. Для этого метода можно возвращать последовательный поток.
Спектакль:
- Давайте рассмотрим, что у нас есть многократный поток (к счастью), который передается различным ядрам ЦП.
ArrayList<T>
который имеет внутреннее представление данных, основанное на массиве. Или LinkedList<T>
которому требуется больше вычислений для разделения для обработки параллельно. ArrayList<T>
лучше в этом случае! -
stream.unordered().parallel().distinct()
имеет лучшую производительность, чем stream.parallel().distinct()
Сохранение стабильности для отдельных() параллельных трубопроводов относительно дорого (требуется, чтобы операция выполнялась как полный барьер с существенными накладными расходами).
Таким образом, в вашем случае это не должно быть проблемой (если ваш List<T>
не заботится о порядке). Ниже приводятся объяснения,
Допустим, у вас есть 4 элемента в ArrayList, {"a", "b", "a", "b"}
Теперь, если вы не используете parallelStream()
перед вызовом функции distinct()
, сохраняется только String в позициях 0 и 1. (Сохраняет порядок, Последовательный поток)
Else (если вы используете parallelStream().distinct()
), то элементы в 1 и 2 могут сохраняться как разные (это неустойчиво, но результат такой же {"a," b "}, или он может даже быть {" б", "а"}).
Нестабильная отдельная операция будет случайным образом удалять дубликаты.
В заключение,
при параллельных вычислениях некоторые конвейеры, содержащие промежуточные операции с промежуточным состоянием, могут потребовать нескольких проходов на данных или могут потребовать буферизации значительных данных
Ответ 5
distinct()
привязана к порядку. Таким образом, затраты времени и пространства для параллельного выполнения, когда операции привязаны к порядку, будут очень высокими. Очевидные последовательные реализации являются простыми, эффективными и требуют почти никаких накладных расходов, но параллельные реализации сложны и часто связаны с значительным ожиданием и буферизацией. Однако функциональность работает, как ожидается, с уменьшением отдачи. Таким образом, вы тратите больше ресурсов, чтобы выполнить работу и потратили намного больше времени, чем последовательный аналог.