Split Rx Наблюдается в несколько потоков и обрабатывается индивидуально
Вот фотография того, что я пытаюсь выполнить.
- а-б-с-а - БББ - а
разбивается на
- a ----- a ------- a → поток
---- b ------ bbb --- → b поток
------ c ---------- → c поток
Тогда, уметь
a.subscribe()
b.subscribe()
c.subscribe()
До сих пор все, что я нашел, разделило поток с помощью groupBy(), но затем свернуло все обратно в один поток и обработало их все в одной и той же функции. То, что я хочу сделать, это обрабатывать каждый производный поток по-другому.
То, как я делаю это сейчас, - это набор фильтров. Есть ли лучший способ сделать это?
Ответы
Ответ 1
Вам не нужно сворачивать Observables
с groupBy
. Вы можете подписаться на них.
Что-то вроде этого:
String[] inputs= {"a", "b", "c", "a", "b", "b", "b", "a"};
Action1<String> a = s -> System.out.print("-a-");
Action1<String> b = s -> System.out.print("-b-");
Action1<String> c = s -> System.out.print("-c-");
Observable
.from(inputs)
.groupBy(s -> s)
.subscribe((g) -> {
if ("a".equals(g.getKey())) {
g.subscribe(a);
}
if ("b".equals(g.getKey())) {
g.subscribe(b);
}
if ("c".equals(g.getKey())) {
g.subscribe(c);
}
});
Если утверждения выглядят кажущимися уродливыми, но по крайней мере вы можете обрабатывать каждый поток отдельно. Возможно, есть способ избежать их.
Ответ 2
Просто как пирог, просто используйте filter
Пример в scala
import rx.lang.scala.Observable
val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a")
val hotO: Observable[String] = o.share
val aSource: Observable[String] = hotO.filter(x ⇒ x == "a")
val bSource: Observable[String] = hotO.filter(x ⇒ x == "b")
val cSource: Observable[String] = hotO.filter(x ⇒ x == "c")
aSource.subscribe(o ⇒ println("A: " + o), println, () ⇒ println("A Completed"))
bSource.subscribe(o ⇒ println("B: " + o), println, () ⇒ println("B Completed"))
cSource.subscribe(o ⇒ println("C: " + o), println, () ⇒ println("C Completed"))
Вам просто нужно убедиться, что источник, наблюдаемый, горячий. Самый простой способ - share
it.