Как правильно прочитать Flux <DataBuffer> и преобразовать его в один входной поток
Я использую WebClient
и пользовательский BodyExtractor
класс для моего приложения spring -boot
WebClient webLCient = WebClient.create();
webClient.get()
.uri(url, params)
.accept(MediaType.APPLICATION.XML)
.exchange()
.flatMap(response -> {
return response.body(new BodyExtractor());
})
BodyExtractor.java
@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
Flux<DataBuffer> body = response.getBody();
body.map(dataBuffer -> {
try {
JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
Unmarshaller unmarshaller = jc.createUnmarshaller();
return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
} catch(Exception e){
return null;
}
}).next();
}
Выше код работает с небольшой полезной нагрузкой, но не с большой полезной нагрузкой, я думаю, потому что я только читаю одно значение потока с помощью next
, и я не уверен, как объединить и прочитать все dataBuffer
.
Я новичок в реакторе, поэтому я не знаю много трюков с флюсом/моно.
Ответы
Ответ 1
Мне удалось заставить его работать, используя Flux#collect
и SequenceInputStream
@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
Flux<DataBuffer> body = response.getBody();
return body.collect(InputStreamCollector::new, (t, dataBuffer)-> t.collectInputStream(dataBuffer.asInputStream))
.map(inputStream -> {
try {
JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
Unmarshaller unmarshaller = jc.createUnmarshaller();
return (T) unmarshaller.unmarshal(inputStream);
} catch(Exception e){
return null;
}
}).next();
}
InputStreamCollector.java
public class InputStreamCollector {
private InputStream is;
public void collectInputStream(InputStream is) {
if (this.is == null) this.is = is;
this.is = new SequenceInputStream(this.is, is);
}
public InputStream getInputStream() {
return this.is;
}
}
Ответ 2
Слегка измененная версия ответа Bk Santiago использует reduce()
вместо collect()
. Очень похоже, но не требует дополнительного класса:
Джава:
body.reduce(new InputStream() {
public int read() { return -1; }
}, (s: InputStream, d: DataBuffer) -> new SequenceInputStream(s, d.asInputStream())
).flatMap(inputStream -> /* do something with single InputStream */
Или Котлин
body.reduce(object : InputStream() {
override fun read() = -1
}) { s: InputStream, d -> SequenceInputStream(s, d.asInputStream()) }
.flatMap { inputStream -> /* do something with single InputStream */ }
Преимущество этого подхода перед использованием collect()
заключается в том, что вам просто не нужен другой класс для сбора информации.
Я создал новый пустой InputStream()
, но если этот синтаксис сбивает с толку, вы также можете заменить его на ByteArrayInputStream("".toByteArray())
вместо этого, чтобы вместо этого создать пустой ByteArrayInputStream
качестве исходного значения.
Ответ 3
Это действительно не так сложно, как подразумевают другие ответы.
Единственный способ потоковой передачи данных без буферизации всего в памяти - это использовать канал, как предложено @jin-kwon. Однако это можно сделать очень просто с помощью служебных классов Spring BodyExtractors и DataBufferUtils.
Пример:
private InputStream readAsInputStream(String url) throws IOException {
PipedOutputStream osPipe = new PipedOutputStream();
PipedInputSteam isPipe = new PipedInputStream(osPipe);
ClientResponse response = webClient.get().uri(url)
.accept(MediaType.APPLICATION.XML)
.exchange()
.block();
final int statusCode = response.rawStatusCode();
// check HTTP status code, can throw exception if needed
// ....
Flux<DataBuffer> body = response.body(BodyExtractors.toDataBuffers())
.doOnError(t -> {
log.error("Error reading body.", t);
// close pipe to force InputStream to error,
// otherwise the returned InputStream will hang forever if an error occurs
try(isPipe) {
//no-op
} catch (IOException ioe) {
log.error("Error closing streams", ioe);
}
})
.doFinally(s -> {
try(osPipe) {
//no-op
} catch (IOException ioe) {
log.error("Error closing streams", ioe);
}
});
DataBufferUtils.write(body, osPipe)
.subscribe(DataBufferUtils.releaseConsumer());
return isPipe;
}
Если вам не нужна проверка кода ответа или создание исключения для кода состояния сбоя, вы можете пропустить вызов block()
и промежуточную переменную ClientResponse
, используя
flatMap(r -> r.body(BodyExtractors.toDataBuffers()))
вместо этого.
Ответ 4
Восстановление InputStream
приводит к победе в использовании WebClient
, потому что ничего не будет выдано до завершения операции collect
. Для большого потока это может быть очень долгое время. Реактивная модель не касается отдельных байтов, а блоков байтов (например, Spring DataBuffer
). См. Мой ответ здесь для более элегантного решения: fooobar.com/questions/847185/...
Ответ 5
Вы можете использовать pipes.
static <R> Mono<R> pipeAndApply(
final Publisher<DataBuffer> source, final Executor executor,
final Function<? super ReadableByteChannel, ? extends R> function) {
return using(Pipe::open,
p -> {
executor.execute(() -> write(source, p.sink())
.doFinally(s -> {
try {
p.sink().close();
} catch (final IOException ioe) {
log.error("failed to close pipe.sink", ioe);
throw new RuntimeException(ioe);
}
})
.subscribe(releaseConsumer()));
return just(function.apply(p.source()));
},
p -> {
try {
p.source().close();
} catch (final IOException ioe) {
log.error("failed to close pipe.source", ioe);
throw new RuntimeException(ioe);
}
});
}
Или используя CompletableFuture
,
static <R> Mono<R> pipeAndApply(
final Publisher<DataBuffer> source,
final Function<? super ReadableByteChannel, ? extends R> function) {
return using(Pipe::open,
p -> fromFuture(supplyAsync(() -> function.apply(p.source())))
.doFirst(() -> write(source, p.sink())
.doFinally(s -> {
try {
p.sink().close();
} catch (final IOException ioe) {
log.error("failed to close pipe.sink", ioe);
throw new RuntimeException(ioe);
}
})
.subscribe(releaseConsumer())),
p -> {
try {
p.source().close();
} catch (final IOException ioe) {
log.error("failed to close pipe.source", ioe);
throw new RuntimeException(ioe);
}
});
}
Ответ 6
Вот еще один вариант из других ответов. И это все еще не благоприятно для памяти.
static Mono<InputStream> asStream(WebClient.ResponseSpec response) {
return response.bodyToFlux(DataBuffer.class)
.map(b -> b.asInputStream(true))
.reduce(SequenceInputStream::new);
}
static void doSome(WebClient.ResponseSpec response) {
asStream(response)
.doOnNext(stream -> {
// do some with stream
})
.block();
}