Как безопасно использовать потоки Java безопасно без методов isFinite() и isOrdered()?
Возникает вопрос о том, должны ли java-методы возвращать Collections или Streams, в котором Брайан Гетц отвечает, что даже для конечных последовательностей Streams обычно предпочтительнее.
Но мне кажется, что в настоящее время многие операции над потоками, которые приходят из других мест, не могут быть безопасно выполнены, и защитные средства защиты кода невозможны, потому что потоки не показывают, являются ли они бесконечными или неупорядоченными.
Если параллель была проблемой для операций, которые я хочу выполнить в Stream(), я могу вызвать isParallel() для проверки или последовательно, чтобы убедиться, что вычисления выполняются параллельно (если я не забуду).
Но если упорядоченность или ограниченность (размерность) были важны для безопасности моей программы, я не могу написать гарантии.
Предполагая, что я использую библиотеку, реализующую этот вымышленный интерфейс:
public interface CoordinateServer {
public Stream<Integer> coordinates();
// example implementations:
// IntStream.range(0, 100).boxed() // finite, ordered, sequential
// final AtomicInteger atomic = new AtomicInteger();
// Stream.generate(() -> atomic2.incrementAndGet()) // infinite, unordered, sequential
// Stream.generate(() -> atomic2.incrementAndGet()).parallel() // infinite, unordered, parallel
}
Тогда какие операции я могу безопасно вызвать в этом потоке, чтобы написать правильный алгоритм?
Кажется, если я, возможно, захочу записать элементы в файл как побочный эффект, мне нужно беспокоиться о параллельности потока:
// if stream is parallel, which order will be written to file?
coordinates().peek(i -> {writeToFile(i)}).count();
// how should I remember to always add sequential() in such cases?
А также, если он параллелен, на основании чего Threadpool это параллельно?
Если я хочу отсортировать поток (или другие операции без короткого замыкания), мне нужно быть осторожным, так как он бесконечен:
coordinates().sorted().limit(1000).collect(toList()); // will this terminate?
coordinates().allMatch(x -> x > 0); // will this terminate?
Я могу наложить ограничение перед сортировкой, но какое магическое число это должно быть, если я ожидаю конечный поток неизвестного размера?
Наконец, возможно, я хочу вычислить параллельно, чтобы сэкономить время, а затем собрать результат:
// will result list maintain the same order as sequential?
coordinates().map(i -> complexLookup(i)).parallel().collect(toList());
Но если поток не упорядочен (в этой версии библиотеки), результат может стать искаженным из-за параллельной обработки. Но как я могу защититься от этого, кроме как не использовать параллель (что отрицательно сказывается на производительности)?
Коллекции явно являются конечными или бесконечными, имеют ли они порядок или нет, и они не несут с собой режим обработки или пулы потоков. Это похоже на ценные свойства для API.
Кроме того, потоки иногда могут быть закрыты, но чаще всего нет. Если я использую поток из метода (или из параметра метода), я должен вообще вызвать close?
Кроме того, потоки, возможно, уже были использованы, и было бы хорошо иметь возможность изящно обработать этот случай, поэтому было бы хорошо проверить, был ли поток уже использован;
Я хотел бы получить фрагмент кода, который можно использовать для проверки предположений о потоке перед его обработкой, например>
Stream<X> stream = fooLibrary.getStream();
Stream<X> safeStream = StreamPreconditions(
stream,
/*maxThreshold or elements before IllegalArgumentException*/
10_000,
/* fail with IllegalArgumentException if not ordered */
true
)
Ответы
Ответ 1
Посмотрев немного (некоторые эксперименты и здесь), насколько я вижу, нет никакого способа точно определить, конечен ли поток или нет.
Более того, иногда даже это не определяется, кроме как во время выполнения (например, в Java 11 - IntStream.generate(() → 1).takeWhile(x → externalCondition(x))
).
Что вы можете сделать, это:
-
Вы можете с уверенностью узнать, является ли оно конечным, несколькими способами (обратите внимание, что получение ложных значений на них не означает, что оно бесконечно, только то, что это может быть так):
-
stream.spliterator().getExactSizeIfKnown()
- если это имеет известный точный размер, это конечно, иначе это возвратит -1.
-
stream.spliterator().hasCharacteristics(Spliterator.SIZED)
- если это SIZED
, вернет true.
-
Вы можете защитить себя, предполагая худшее (зависит от вашего случая).
-
stream.sequential()/stream.parallel()
- явно установить предпочитаемый тип потребления. -
С потенциально бесконечным потоком, предположите ваш худший случай для каждого сценария.
- Например, предположим, что вы хотите прослушать поток твитов, пока не найдете один от Venkat - это потенциально бесконечная операция, но вы хотели бы подождать, пока такой твит не будет найден. Так что в этом случае просто перейдите к
stream.filter(tweet → isByVenkat(tweet)).findAny()
- он будет повторяться до тех пор, пока такой твит не появится (или навсегда). - Другой сценарий, и, возможно, более распространенный сценарий, заключается в желании сделать что-то на всех элементах или только попробовать определенное количество времени (аналогично тайм-ауту). Для этого я рекомендую всегда вызывать
stream.limit(x)
перед вызовом вашей операции (collect
или allMatch
или аналогичной), где x
- это количество попыток, которые вы готовы терпеть.
После всего этого я просто упомяну, что я думаю, что возвращение потока, как правило, не очень хорошая идея, и я постараюсь избежать этого, если не будет больших выгод.
Ответ 2
Я не очень понимаю, в чем ваш вопрос, кажется, это в основном указывает на возврат потока.
Во-первых, я попытаюсь добавить несколько плюсов к возвращению потока:
- Ленивый - вы не тратите ресурсы на копирование или обработку коллекции, если только вызывающая сторона на самом деле не требует их.
- Обновления - насколько я понимаю, если перед обработкой потока происходит изменение в базовой коллекции, оно будет отражено в потоке (это также определенно является недостатком).
- Поднятые вами вопросы хороши, но чаще всего вы знаете достаточно контекста, чтобы не допустить ошибок, на которые вы указали, насколько это возможно (конечные, параллельные обычно являются вещами, о которых вы можете рассуждать).
Сказав это и исходя из личного опыта, я предлагаю НЕ возвращать поток.
Помимо всех ваших очень важных пунктов, есть также:
- Завершение - поскольку поток может быть прерван, вы должны знать, что вы работаете с не прекращенным потоком.
- Тестирование - поскольку оно прекращается после потребления, вы не можете захватить его в
ArgumentCaptor
или аналогичном (как это обычно используется тестируемым вами методом), и вы можете использовать его только один раз в самом тесте.
Это мои 2 цента, могут они помочь :)