Java 8 lambda api
Я работаю, чтобы перейти с Rx Java на Java 8 lambdas. Один из примеров, который я не могу найти, - это способ буферизации запросов. Например, в Rx Java я могу сказать следующее.
Observable.create(getIterator()).buffer(20, 1000, TimeUnit. MILLISECONDS).doOnNext(list -> doWrite(list));
Где мы буферизируем 20 элементов в список или тайм-аут в 1000 миллисекунд, который когда-либо случается раньше.
Наблюдаемые в RX - это стиль "push", который можно наблюдать, когда Streams использует java pull. Возможно ли это реализовать мою собственную операцию с картами в потоках или неумение испускать проблемы с этим, поскольку doOnNext
должен опросить предыдущий элемент?
Ответы
Ответ 1
Один из способов сделать это - использовать BlockingQueue и Guava. Используя Queues.drain
, вы можете создать Collection
, который вы могли бы затем вызвать stream()
, и выполнить свои преобразования. Здесь ссылка: Guava Queues.drain
И вот быстрый пример:
public void transform(BlockingQueue<Something> input)
{
List<Something> buffer = new ArrayList<>(20);
Queues.drain(input, buffer, 20, 1000, TimeUnit.MILLISECONDS);
doWrite(buffer);
}
Ответ 2
simple-react имеет похожие операторы, но не этот точный. Это довольно растяжимо, поэтому, возможно, вам придется писать свои собственные. С предостережением, что я не написал это в IDE или не тестировал его, примерно буфер по размеру с оператором тайм-аута для простой реакции выглядел бы примерно так.
import com.aol.simple.react.async.Queue;
import com.aol.simple.react.stream.traits.LazyFutureStream;
import com.aol.simple.react.async.Queue.ClosedQueueException;
import com.aol.simple.react.util.SimpleTimer;
import java.util.concurrent.TimeUnit;
static LazyFutureStream batchBySizeAndTime(LazyFutureStream stream,int size,long time, TimeUnit unit) {
Queue queue = stream.toQueue();
Function<Supplier<U>, Supplier<Collection<U>>> fn = s -> {
return () -> {
SimpleTimer timer = new SimpleTimer();
List<U> list = new ArrayList<>();
try {
do {
if(list.size()==size())
return list;
list.add(s.get());
} while (timer.getElapsedNanoseconds()<unit.toNanos(time));
} catch (ClosedQueueException e) {
throw new ClosedQueueException(list);
}
return list;
};
};
return stream.fromStream(queue.streamBatch(stream.getSubscription(), fn));
}