Ответ 1
Есть несколько способов, которые можно рассматривать как удобный способ исключения:
Обработайте ваш элемент, используя Flux/Mono.handle
Одним из способов, который может упростить обработку элемента, который может привести к ошибке или пустому потоку, является оператор handle
.
Следующий код показывает, как мы можем использовать его для решения нашей проблемы:
Mono.just(userId)
.map(repo::findById)
.handle((user, sink) -> {
if(!isValid(user)){
sink.error(new InvalidUserException());
} else if (isSendable(user))
sink.next(user);
}
else {
//just ignore element
}
})
как мы видим, оператору .handle
требуется передать BiConsumer<T, SynchronousSink<>
для обработки элемента. Здесь у нас есть два параметра в нашем BiConsumer. Первый - это элемент из вышестоящего потока, а второй - SynchronousSink
, который помогает нам синхронно поставлять элемент в нисходящий поток. Такая техника расширяет возможности предоставления различных результатов обработки наших элементов. Например, если элемент недействителен, мы можем передать ошибку тому же SycnchronousSync
, который отменит восходящий поток и выдаст сигнал onError
для нисходящего потока. В свою очередь, мы можем "фильтровать", используя тот же оператор handle
. Как только дескриптор BiConsumer
будет выполнен и элемент не будет предоставлен, Reactor будет рассматривать это как своего рода фильтрацию и запросит для нас дополнительный элемент. Наконец, в случае, если элемент действителен, мы можем просто вызвать SynchronousSink#next
и распространить наш элемент вниз по потоку или применить к нему некоторое отображение, поэтому мы будем использовать handle
в качестве оператора map
здесь. Более того, мы можем безопасно использовать этот оператор без влияния на производительность и обеспечить сложную проверку элемента, такую как проверка элемента или отправка ошибки в нисходящий поток.
Броски с использованием #concatMap
+ Mono.error
Один из вариантов создания исключения во время отображения - заменить map
на concatMap
. По своей сути, concatMap
делает почти то же самое, что и flatMap
. Единственное отличие состоит в том, что concatMap
допускает только один подпоток за раз. Такое поведение значительно упрощает внутреннюю реализацию и не влияет на производительность. Таким образом, мы можем использовать следующий код для более функционального исключения:
Mono.just(userId)
.map(repo::findById)
.concatMap(user-> {
if(!isValid(user)){
return Mono.error(new InvalidUserException());
}
return Mono.just(user);
})
В приведенном выше примере в случае недопустимого пользователя мы возвращаем исключение, используя Mono.error
. То же самое мы можем сделать для потока, используя Flux.error
:
Flux.just(userId1, userId2, userId3)
.map(repo::findById)
.concatMap(user-> {
if(!isValid(user)){
return Flux.error(new InvalidUserException());
}
return Mono.just(user);
})
Обратите внимание, что в обоих случаях мы возвращаем холодный поток, который имеет только один элемент. В Reactor есть несколько оптимизаций, которые повышают производительность в том случае, если возвращаемый поток является холодным скалярным потоком. Таким образом, рекомендуется использовать Flux/Mono concatMap
+ .just
, empty
, error
в результате, когда нам требуется более сложное отображение, которое может закончиться return null
или throw new ...
.
Внимание! Никогда не проверяйте входящий элемент на обнуляемость. Проект Reactor никогда не отправит вам значение
null
, поскольку это нарушает спецификацию Reactive Streams (см. Правило 2.13) Таким образом, в случае, еслиrepo.findById
возвращает ноль, Reactor выдаст исключение NullPointerException для вас.
Подождите, почему concatMap
лучше, чем flatMap
?
По своей сути, flatMap
предназначен для объединения элементов из нескольких подпотоков, которые выполняются одновременно. Это означает, что в flatMap должны быть асинхронные потоки, поэтому они могут обрабатывать данные в нескольких потоках или это могут быть несколько сетевых вызовов. Впоследствии такие ожидания сильно влияют на реализацию, поэтому flatMap
должен иметь возможность обрабатывать данные из нескольких потоков (Thread
) (означает использование параллельных структур данных), ставить элементы в очередь, если происходит слив из другого потока (означает дополнительные выделение памяти для Queue
для каждого подпотока) и не нарушать правила спецификации Reactive Streams (означает действительно сложную реализацию). Считая все эти факты и тот факт, что мы заменяем простую операцию map
(которая является синхронной) на более удобный способ генерирования исключения с использованием Flux/Mono.error
(который не изменяет синхронность выполнения), приводит к тому, что мы делаем нам не нужен такой сложный оператор, и мы можем использовать гораздо более простой concatMap
, который предназначен для асинхронной обработки одного потока за раз и имеет пару оптимизаций для обработки скалярного, холодного потока.
Выдает исключение, используя switchOnEmpty
Итак, еще один подход к созданию исключения, когда результат пуст, - это оператор switchOnEmpty
. Следующий код демонстрирует, как мы можем использовать этот подход:
Mono.just(userId)
.flatMap(repo::findById)
.switchIfEmpty(Mono.error(new UserNotFoundExeception()))
Как мы видим, в этом случае repo::findById
должен иметь Mono
из User
в качестве возвращаемого типа. Следовательно, если экземпляр User
не будет найден, поток результатов будет пустым. Таким образом, Reactor будет вызывать альтернативный Mono
, указанный как параметр switchIfEmpty
.
Оставь свое исключение как есть
Это может считаться менее читаемым кодом или плохой практикой, но вы можете бросить свое исключение как есть. Этот шаблон нарушает спецификацию Reactive Streams, но реактор поймает выброшенное для вас исключение и распространит его как сигнал onError
на ваш нисходящий поток
Takeaways
- Используйте оператор
.handle
для обеспечения сложной обработки элементов - Используйте
concatMap
+Mono.error
, когда нам нужно вызвать исключение во время отображения, но такой метод наиболее подходит для случаев асинхронной обработки элементов. - Используйте
flatMap
+Mono.error
, когда у нас уже естьflatMap
на месте Null
как тип возврата запрещен, поэтому вместоnull
в нисходящем потокеmap
вы получите неожиданноеonError
сNullPointerException
- Используйте
switchIfEmpty
во всех случаях, когда вам нужно отправить сигнал об ошибке, если результат вызова какой-либо конкретной функции завершился с пустым потоком