Паузный поток RxJS
У меня есть простой компонент с одной кнопкой, которая запускает и приостанавливает поток чисел, генерируемых таймером RxJS.
import { Component, OnInit } from '@angular/core';
import { BehaviorSubject, Observable, timer, merge } from 'rxjs';
import { filter, bufferToggle, windowToggle, mergeMap, mergeAll, share } from 'rxjs/operators';
@Component({
selector: 'my-app',
template: '<button (click)="toggle()">{{ (active$ | async) ? 'Pause' : 'Play' }}</button>',
styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
active$ = new BehaviorSubject<boolean>(true);
ngOnInit(): void {
const on$ = this.active$.pipe(filter(v => v));
const off$ = this.active$.pipe(filter(v => !v));
const stream$ = timer(500, 500).pipe(share());
const out$ = merge(
stream$.pipe(
bufferToggle(off$, () => on$),
mergeAll(),
),
stream$.pipe(
windowToggle(on$, () => off$),
mergeAll(),
),
);
out$.subscribe(v => console.log(v));
}
toggle(): void {
this.active$.next(!this.active$.value);
}
}
Это работает отлично.
Мне нужно добавить еще одну функцию.
Мне нужно автоматически приостановить поток на основе значения в потоке, удовлетворяющем условию.
Например, приостановить поток, если последнее значение кратно 5.
У вас есть идеи, как это сделать?
Вот работающий пример на stackblitz https://stackblitz.com/edit/angular-6hjznn
Ответы
Ответ 1
Можно либо расширить свой текущий подход bufferToggle/windowToggle, либо использовать собственную реализацию буфера.
1. Расширение подхода bufferToggle/windowToggle
Вы можете добавить собственный буфер (массив) в очередь операторов после bufferToggle
. Когда выходит bufferToggle
, добавьте эти значения в ваш пользовательский буфер. Затем принимайте значения из пользовательского буфера, пока определенный элемент в буфере не будет соответствовать условию остановки. Измените эти значения и приостановите поток.
Оператор pausable
выдаст значения, которые соответствуют условию остановки, а затем сразу же остановит поток. Он может быть адаптирован к вашим конкретным потребностям, например упрощенный с меньшими входными параметрами или share
может быть включен в pausable
.
export function pausable<T, O>(
on$: Observable<any>, // when on$ emits 'pausable' will emit values from the buffer and all incoming values
off$: Observable<O>, // when off$ emits 'pausable' will stop emitting and buffer incoming values
haltCondition: (value: T) => boolean, // if 'haltCondition' returns true for a value in the stream the stream will be paused
pause: () => void, // pauses the stream by triggering the given on$ and off$ observables
spread: boolean = true // if true values from the buffer will be emitted separately, if 'false' values from the buffer will be emitted in an array
) {
return (source: Observable<T>) => defer(() => { // defer is used so that each subscription gets its own buffer
let buffer: T[] = [];
return merge(
source.pipe(
bufferToggle(off$, () => on$),
tap(values => buffer = buffer.concat(values)), // append values to your custom buffer
map(_ => buffer.findIndex(haltCondition)), // find the index of the first element that matches the halt condition
tap(haltIndex => haltIndex >= 0 ? pause() : null), // pause the stream when a value matching the halt condition was found
map(haltIndex => buffer.splice(0, haltIndex === -1 ? customBuffer.length : haltIndex + 1)), // get all values from your custom buffer until a haltCondition is met
mergeMap(toEmit => spread ? from(toEmit) : toEmit.length > 0 ? of(toEmit) : EMPTY) // optional value spread (what your mergeAll did)
),
source.pipe(
windowToggle(on$, () => off$),
mergeMap(x => x),
tap(value => haltCondition(value) ? pause() : null), // pause the stream when an unbuffered value matches the halt condition
),
);
});
}
Usage
Usage
active$ = new BehaviorSubject<boolean>(true);
on$ = this.active$.pipe(filter(v => v));
off$ = this.active$.pipe(filter(v => !v));
interval(500).pipe(
share(),
pausable(on$, off$, v => this.active$.value && this.pauseOn(v), () => this.active$.next(false))
).subscribe(console.log);
pauseOn = (value: number) => value > 0 && value % 10 === 0
2. Полностью настраиваемый буфер
Учитывая, что оператор pausable
выше использует много входных параметров, я реализовал второй оператор, используя полностью настраиваемый буфер и только одну наблюдаемую входную информацию, которая включает или выключает буфер, аналогично подходу Брэндона.
Оператор bufferIf (Demo)
bufferIf
будет буферизовать входящие значения, когда данный condition
испускает true
и испускает все значения из буфера или пропускает новые значения, когда condition
равен false
.
export function bufferIf<T>(condition: Observable<boolean>) {
return (source: Observable<T>) => defer(() => {
const buffer: T[] = [];
let paused = false;
let sourceTerminated = false;
return merge( // add a custon streamId to values from the source and the condition so that they can be differentiated later on
source.pipe(map(v => [v, 0]), finalize(() => sourceTerminated = true)),
condition.pipe(map(v => [v, 1]))
).pipe( // add values from the source to the buffer or set the paused variable
tap(([value, streamId]) => streamId === 0 ? buffer.push(value as T) : paused = value as boolean),
switchMap(_ => new Observable<T>(s => {
setTimeout(() => { // map to a stream of values taken from the buffer, setTimeout is used so that a subscriber to the condition outside of this function gets the values in the correct order (also see Brandons answer & comments)
while (buffer.length > 0 && !paused) s.next(buffer.shift())
}, 0)
})), // complete the stream when the source terminated and the buffer is empty
takeWhile(_ => !sourceTerminated || buffer.length > 0, true)
);
})
}
Usage
Usage
active$ = new BehaviorSubject<boolean>(true);
interval(500).pipe(
bufferIf(this.active$.pipe(map(v => !v))),
tap(value => this.pauseOn(value) ? this.active$.next(false) : null)
).subscribe(console.log);
pauseOn = (value: number) => value > 0 && value % 10 === 0
Ответ 2
Здесь пользовательский оператор паузы, который просто накапливает значения в буфере, когда сигнал паузы имеет значение true
, и генерирует их один за другим, когда он false
.
Объедините его с простым оператором tap
чтобы переключить сигнал паузы субъекта поведения, когда значение достигает определенного условия, и у вас есть что-то, чтобы приостановить нажатие кнопки, а также сделать паузу, когда значение соответствует условию (кратное 12 в этом случае):
Вот оператор pause
:
function pause<T>(pauseSignal: Observable<boolean>) {
return (source: Observable<T>) => Observable.create(observer => {
const buffer = [];
let paused = false;
let error;
let isComplete = false;
function notify() {
while (!paused && buffer.length) {
const value = buffer.shift();
observer.next(value);
}
if (!buffer.length && error) {
observer.error(error);
}
if (!buffer.length && isComplete) {
observer.complete();
}
}
const subscription = pauseSignal.subscribe(
p => {
paused = !p;
setTimeout(notify, 0);
},
e => {
error = e;
setTimeout(notify, 0);
},
() => {});
subscription.add(source.subscribe(
v => {
buffer.push(v);
notify();
},
e => {
error = e;
notify();
},
() => {
isComplete = true;
notify();
}
));
return subscription;
});
}
Вот использование этого:
const CONDITION = x => (x > 0) && ((x % 12) === 0); // is multiple
this.active$ = new BehaviorSubject<boolean>(true);
const stream$ = timer(500, 500);
const out$ = stream$.pipe(
pause(this.active$),
tap(value => {
if (CONDITION(value)) {
this.active$.next(false);
}
}));
this.d = out$.subscribe(v => console.log(v));
И рабочий пример: https://stackblitz.com/edit/angular-bvxnbf
Ответ 3
Вот простой способ сделать это. Используйте timer()
как излучатель и увеличивайте счет отдельно. Это дает вам немного больше прямого контроля.
export class AppComponent implements OnInit {
active = true;
out$: Observable<number>;
count = 0;
ngOnInit(): void {
const stream$ = timer(500, 500);
this.out$ = stream$.pipe(
filter(v => this.active),
map(v => {
this.count += 1;
return this.count;
}),
tap(v => {
if (this.count % 5 === 0) {
this.active = false;
}
})
)
}
}
https://stackblitz.com/edit/angular-nzs7zh
Ответ 4
Я предполагаю, что желаемое поведение не связано с получением значений, которые таймер излучает сам по себе, и что вместо приостановки уведомлений для текущего потока (в вашем примере таймер продолжает работать, даже если мы не видим, чтобы значения были напечатано), это нормально, чтобы фактически перестать излучать, когда приостановлено.
Мое решение вдохновлено рецептом секундомера
Решение ниже использует две отдельные кнопки для воспроизведения и паузы, но вы можете настроить это по вкусу. Мы передаем кнопки (ViewChild) сервису в хуке ngAfterViewInit компонента, затем подписываемся на поток.
// pausable.component.ts
ngAfterViewInit() {
this.pausableService.initPausableStream(this.start.nativeElement, this.pause.nativeElement);
this.pausableService.counter$
.pipe(takeUntil(this.unsubscribe$)) // don't forget to unsubscribe :)
.subscribe((state: State) => {
console.log(state.value); // whatever you need
});
}
// pausable.service.ts
import { Injectable } from '@angular/core';
import { merge, fromEvent, Subject, interval, NEVER } from 'rxjs';
import { mapTo, startWith, scan, switchMap, tap, map } from 'rxjs/operators';
export interface State {
active: boolean;
value: number;
}
@Injectable({
providedIn: 'root'
})
export class PausableService {
public counter$;
constructor() { }
initPausableStream(start: HTMLElement, pause: HTMLElement) {
// convenience functions to map an element click to a result
const fromClick = (el: HTMLElement) => fromEvent(el, 'click');
const clickMapTo = (el: HTMLElement, obj: {}) => fromClick(el).pipe(mapTo(obj));
const pauseByCondition$ = new Subject();
const pauseCondition = (state: State): boolean => state.value % 5 === 0 && state.value !== 0;
// define the events that may trigger a change
const events$ = merge(
clickMapTo(start, { active: true }),
clickMapTo(pause, { active: false }),
pauseByCondition$.pipe(mapTo({ active: false }))
);
// switch the counter stream based on events
this.counter$ = events$.pipe(
startWith({ active: true, value: 0 }),
scan((state: State, curr) => ({ ...state, ...curr }), {}),
switchMap((state: State) => state.active
? interval(500).pipe(
tap(_ => ++state.value),
map(_ => state))
: NEVER),
tap((state: State) => {
if (pauseCondition(state)) {
pauseByCondition$.next(); // trigger pause
}
})
);
}
}
Ответ 5
Как можно проще с одним windowToggle
и использовать рабочий пример active.next(false): https://stackblitz.com/edit/angular-pdw7kw
defer(() => {
let count = 0;
return stream$.pipe(
windowToggle(on$, () => off$),
exhaustMap(obs => obs),
mergeMap(_ => {
if ((++count) % 5 === 0) {
this.active$.next(false)
return never()
}
return of(count)
}),
)
}).subscribe(console.log)
Ответ 6
Ваш пример на самом деле удивительно близок к рабочему решению, нет необходимости в новых пользовательских операторах.
Смотрите раздел "Буферизация" здесь:
https://medium.com/@kddsky/pauseable-observables-in-rxjs-58ce2b8c7dfd
И рабочий пример здесь:
https://thinkrx.io/gist/cef1572743cbf3f46105ec2ba56228cd
Он использует тот же подход, который у вас уже есть, с bufferToggle
и windowToggle
, похоже, главное отличие в том, что вам нужно share
ваша пауза/активная subject-