Как безопасно использовать потоки Java безопасно без методов isFinite() и isOrdered()?

Возникает вопрос о том, должны ли java-методы возвращать Collections или Streams, в котором Брайан Гетц отвечает, что даже для конечных последовательностей Streams обычно предпочтительнее.

Но мне кажется, что в настоящее время многие операции над потоками, которые приходят из других мест, не могут быть безопасно выполнены, и защитные средства защиты кода невозможны, потому что потоки не показывают, являются ли они бесконечными или неупорядоченными.

Если параллель была проблемой для операций, которые я хочу выполнить в Stream(), я могу вызвать isParallel() для проверки или последовательно, чтобы убедиться, что вычисления выполняются параллельно (если я не забуду).

Но если упорядоченность или ограниченность (размерность) были важны для безопасности моей программы, я не могу написать гарантии.

Предполагая, что я использую библиотеку, реализующую этот вымышленный интерфейс:

public interface CoordinateServer {
    public Stream<Integer> coordinates();
    // example implementations:
    // IntStream.range(0, 100).boxed()   // finite, ordered, sequential
    // final AtomicInteger atomic = new AtomicInteger();
    // Stream.generate(() -> atomic2.incrementAndGet()) // infinite, unordered, sequential
    // Stream.generate(() -> atomic2.incrementAndGet()).parallel() // infinite, unordered, parallel
}

Тогда какие операции я могу безопасно вызвать в этом потоке, чтобы написать правильный алгоритм?

Кажется, если я, возможно, захочу записать элементы в файл как побочный эффект, мне нужно беспокоиться о параллельности потока:

// if stream is parallel, which order will be written to file?
coordinates().peek(i -> {writeToFile(i)}).count();
// how should I remember to always add sequential() in  such cases?

А также, если он параллелен, на основании чего Threadpool это параллельно?

Если я хочу отсортировать поток (или другие операции без короткого замыкания), мне нужно быть осторожным, так как он бесконечен:

coordinates().sorted().limit(1000).collect(toList()); // will this terminate?
coordinates().allMatch(x -> x > 0); // will this terminate?

Я могу наложить ограничение перед сортировкой, но какое магическое число это должно быть, если я ожидаю конечный поток неизвестного размера?

Наконец, возможно, я хочу вычислить параллельно, чтобы сэкономить время, а затем собрать результат:

// will result list maintain the same order as sequential?
coordinates().map(i -> complexLookup(i)).parallel().collect(toList());

Но если поток не упорядочен (в этой версии библиотеки), результат может стать искаженным из-за параллельной обработки. Но как я могу защититься от этого, кроме как не использовать параллель (что отрицательно сказывается на производительности)?

Коллекции явно являются конечными или бесконечными, имеют ли они порядок или нет, и они не несут с собой режим обработки или пулы потоков. Это похоже на ценные свойства для API.

Кроме того, потоки иногда могут быть закрыты, но чаще всего нет. Если я использую поток из метода (или из параметра метода), я должен вообще вызвать close?

Кроме того, потоки, возможно, уже были использованы, и было бы хорошо иметь возможность изящно обработать этот случай, поэтому было бы хорошо проверить, был ли поток уже использован;

Я хотел бы получить фрагмент кода, который можно использовать для проверки предположений о потоке перед его обработкой, например>

Stream<X> stream = fooLibrary.getStream();
Stream<X> safeStream = StreamPreconditions(
    stream, 
    /*maxThreshold or elements before IllegalArgumentException*/
    10_000,
    /* fail with IllegalArgumentException if not ordered */
    true
    )

Ответы

Ответ 1

Посмотрев немного (некоторые эксперименты и здесь), насколько я вижу, нет никакого способа точно определить, конечен ли поток или нет.

Более того, иногда даже это не определяется, кроме как во время выполнения (например, в Java 11 - IntStream.generate(() → 1).takeWhile(x → externalCondition(x))).

Что вы можете сделать, это:

  1. Вы можете с уверенностью узнать, является ли оно конечным, несколькими способами (обратите внимание, что получение ложных значений на них не означает, что оно бесконечно, только то, что это может быть так):

    1. stream.spliterator().getExactSizeIfKnown() - если это имеет известный точный размер, это конечно, иначе это возвратит -1.

    2. stream.spliterator().hasCharacteristics(Spliterator.SIZED) - если это SIZED, вернет true.

  2. Вы можете защитить себя, предполагая худшее (зависит от вашего случая).

    1. stream.sequential()/stream.parallel() - явно установить предпочитаемый тип потребления.
    2. С потенциально бесконечным потоком, предположите ваш худший случай для каждого сценария.

      1. Например, предположим, что вы хотите прослушать поток твитов, пока не найдете один от Venkat - это потенциально бесконечная операция, но вы хотели бы подождать, пока такой твит не будет найден. Так что в этом случае просто перейдите к stream.filter(tweet → isByVenkat(tweet)).findAny() - он будет повторяться до тех пор, пока такой твит не появится (или навсегда).
      2. Другой сценарий, и, возможно, более распространенный сценарий, заключается в желании сделать что-то на всех элементах или только попробовать определенное количество времени (аналогично тайм-ауту). Для этого я рекомендую всегда вызывать stream.limit(x) перед вызовом вашей операции (collect или allMatch или аналогичной), где x - это количество попыток, которые вы готовы терпеть.

После всего этого я просто упомяну, что я думаю, что возвращение потока, как правило, не очень хорошая идея, и я постараюсь избежать этого, если не будет больших выгод.

Ответ 2

Я не очень понимаю, в чем ваш вопрос, кажется, это в основном указывает на возврат потока.

Во-первых, я попытаюсь добавить несколько плюсов к возвращению потока:

  1. Ленивый - вы не тратите ресурсы на копирование или обработку коллекции, если только вызывающая сторона на самом деле не требует их.
  2. Обновления - насколько я понимаю, если перед обработкой потока происходит изменение в базовой коллекции, оно будет отражено в потоке (это также определенно является недостатком).
  3. Поднятые вами вопросы хороши, но чаще всего вы знаете достаточно контекста, чтобы не допустить ошибок, на которые вы указали, насколько это возможно (конечные, параллельные обычно являются вещами, о которых вы можете рассуждать).

Сказав это и исходя из личного опыта, я предлагаю НЕ возвращать поток.

Помимо всех ваших очень важных пунктов, есть также:

  1. Завершение - поскольку поток может быть прерван, вы должны знать, что вы работаете с не прекращенным потоком.
  2. Тестирование - поскольку оно прекращается после потребления, вы не можете захватить его в ArgumentCaptor или аналогичном (как это обычно используется тестируемым вами методом), и вы можете использовать его только один раз в самом тесте.

Это мои 2 цента, могут они помочь :)