Stream API и Queues: подпишитесь на StreamingQueue stream-style
Скажем, у нас есть Очередь
BlockingQueue<String> queue= new LinkedBlockingQueue<>();
и какой-то другой поток помещает в него значения, тогда мы читаем его как
while (true) {
String next = queue.take();
System.out.println("next message:" + next);
}
Как я могу перебирать эту очередь в стиле потока, сохраняя при этом подобную семантику выше кода.
Этот код проходит только текущее состояние очереди:
queue.stream().forEach(e -> System.out.println(e));
Ответы
Ответ 1
Я догадываюсь о том, чего вы ожидаете, но я думаю, что у меня есть хорошая догадка.
Поток очереди, как итерация по очереди, представляет текущее содержимое очереди. Когда итератор или поток достигает хвоста очереди, он не блокирует ожидающие добавления дополнительных элементов. Итератор или поток исчерпаны в этой точке, и вычисление завершается.
Если вам нужен поток, состоящий из всех текущих и будущих элементов очереди, вы можете сделать что-то вроде этого:
Stream.generate(() -> {
try {
return queue.take();
} catch (InterruptedException ie) {
return "Interrupted!";
}
})
.filter(s -> s.endsWith("x"))
.forEach(System.out::println);
(К сожалению, необходимость обрабатывать InterruptedException
делает это довольно грязным.)
Обратите внимание, что нет возможности закрыть очередь, и нет возможности для Stream.generate
прекратить генерировать элементы, поэтому это фактически бесконечный поток. Единственный способ его прекратить - это операция короткого замыкания, такая как findFirst
.
Ответ 2
Вы можете посмотреть реализацию async Queue. Если у вас есть Java 8, то cyclops-react, я разработчик в этом проекте предоставляет async.Queue, который позволит вам как заполнять, так и потребляют из очереди асинхронно (и чисто).
например.
Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
Или просто (до тех пор, пока это com.aol.simple.react.async.Queue)
Queue<String> queue = new Queue<>();
Затем в отдельном потоке:
new Thread(() -> {
while (true) {
queue.add("New message " + System.currentTimeMillis());
}
}).start();
Вернитесь в основной поток, ваш исходный код должен теперь работать как ожидалось (бесконечно перебирать сообщения, добавляемые в очередь и распечатывать их)
queue.stream().forEach(e -> System.out.println(e));
Очередь и, следовательно, поток могут быть закрыты на любом этапе через -
queue.close();
Ответ 3
Другой подход заключается в создании пользовательского Spliterator. В моем случае у меня есть блокирующая очередь, и я хочу создать поток, который продолжает извлекать элементы, пока блок не истечет. Разделитель выглядит примерно так:
public class QueueSpliterator<T> implements Spliterator<T> {
private final BlockingQueue<T> queue;
private final long timeoutMs;
public QueueSpliterator(final BlockingQueue<T> queue, final long timeoutMs) {
this.queue = queue;
this.timeoutMs = timeoutMs;
}
@Override
public int characteristics() {
return Spliterator.CONCURRENT | Spliterator.NONNULL | Spliterator.ORDERED;
}
@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
@Override
public boolean tryAdvance(final Consumer<? super T> action) {
try {
final T next = this.queue.poll(this.timeoutMs, TimeUnit.MILLISECONDS);
if (next == null) {
return false;
}
action.accept(next);
return true;
} catch (final InterruptedException e) {
throw new SupplierErrorException("interrupted", e);
}
}
@Override
public Spliterator<T> trySplit() {
return null;
}
}
Исключение, созданное для обработки InterruptedException, является расширением RuntimeException. Используя этот класс, можно построить поток через: StreamSupport.stream(новый QueueSpliterator (...))
и добавьте обычные операции потока.