Паузный поток RxJS

У меня есть простой компонент с одной кнопкой, которая запускает и приостанавливает поток чисел, генерируемых таймером RxJS.

import { Component, OnInit } from '@angular/core';
import { BehaviorSubject, Observable, timer, merge } from 'rxjs';
import { filter, bufferToggle, windowToggle, mergeMap, mergeAll, share } from 'rxjs/operators';

@Component({
  selector: 'my-app',
  template: '<button (click)="toggle()">{{ (active$ | async) ? 'Pause' : 'Play' }}</button>',
  styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
  active$ = new BehaviorSubject<boolean>(true);

  ngOnInit(): void {
    const on$ = this.active$.pipe(filter(v => v));
    const off$ = this.active$.pipe(filter(v => !v));

    const stream$ = timer(500, 500).pipe(share());

    const out$ = merge(
      stream$.pipe(
        bufferToggle(off$, () => on$),
        mergeAll(),
      ),
      stream$.pipe(
        windowToggle(on$, () => off$),
        mergeAll(),
      ),
    );

    out$.subscribe(v => console.log(v));
  }

  toggle(): void {
    this.active$.next(!this.active$.value);
  }
}

Это работает отлично.

enter image description here

Мне нужно добавить еще одну функцию.

Мне нужно автоматически приостановить поток на основе значения в потоке, удовлетворяющем условию.

Например, приостановить поток, если последнее значение кратно 5.


У вас есть идеи, как это сделать?

Вот работающий пример на stackblitz https://stackblitz.com/edit/angular-6hjznn

Ответы

Ответ 1

Можно либо расширить свой текущий подход bufferToggle/windowToggle, либо использовать собственную реализацию буфера.

1. Расширение подхода bufferToggle/windowToggle

Вы можете добавить собственный буфер (массив) в очередь операторов после bufferToggle. Когда выходит bufferToggle, добавьте эти значения в ваш пользовательский буфер. Затем принимайте значения из пользовательского буфера, пока определенный элемент в буфере не будет соответствовать условию остановки. Измените эти значения и приостановите поток.

Оператор паузы (Демонстрация)

Оператор pausable выдаст значения, которые соответствуют условию остановки, а затем сразу же остановит поток. Он может быть адаптирован к вашим конкретным потребностям, например упрощенный с меньшими входными параметрами или share может быть включен в pausable.

export function pausable<T, O>(
  on$: Observable<any>, // when on$ emits 'pausable' will emit values from the buffer and all incoming values 
  off$: Observable<O>, // when off$ emits 'pausable' will stop emitting and buffer incoming values
  haltCondition: (value: T) => boolean, // if 'haltCondition' returns true for a value in the stream the stream will be paused
  pause: () => void, // pauses the stream by triggering the given on$ and off$ observables
  spread: boolean = true // if true values from the buffer will be emitted separately, if 'false' values from the buffer will be emitted in an array
) {
  return (source: Observable<T>) => defer(() => { // defer is used so that each subscription gets its own buffer
    let buffer: T[] = [];
    return merge(
      source.pipe(
        bufferToggle(off$, () => on$),
        tap(values => buffer = buffer.concat(values)), // append values to your custom buffer
        map(_ => buffer.findIndex(haltCondition)), // find the index of the first element that matches the halt condition
        tap(haltIndex => haltIndex >= 0 ? pause() : null), // pause the stream when a value matching the halt condition was found
        map(haltIndex => buffer.splice(0, haltIndex === -1 ? customBuffer.length : haltIndex + 1)), // get all values from your custom buffer until a haltCondition is met
        mergeMap(toEmit => spread ? from(toEmit) : toEmit.length > 0 ? of(toEmit) : EMPTY) // optional value spread (what your mergeAll did)
      ),
      source.pipe(
        windowToggle(on$, () => off$),
        mergeMap(x => x),
        tap(value => haltCondition(value) ? pause() : null), // pause the stream when an unbuffered value matches the halt condition
      ),
    );
  });
}

Usage

Usage
active$ = new BehaviorSubject<boolean>(true);
on$ = this.active$.pipe(filter(v => v));
off$ = this.active$.pipe(filter(v => !v));

interval(500).pipe(
  share(),
  pausable(on$, off$, v => this.active$.value && this.pauseOn(v), () => this.active$.next(false))
).subscribe(console.log);

pauseOn = (value: number) => value > 0 && value % 10 === 0

2. Полностью настраиваемый буфер

Учитывая, что оператор pausable выше использует много входных параметров, я реализовал второй оператор, используя полностью настраиваемый буфер и только одну наблюдаемую входную информацию, которая включает или выключает буфер, аналогично подходу Брэндона.

Оператор bufferIf (Demo)

bufferIf будет буферизовать входящие значения, когда данный condition испускает true и испускает все значения из буфера или пропускает новые значения, когда condition равен false.

export function bufferIf<T>(condition: Observable<boolean>) {
  return (source: Observable<T>) => defer(() => {
    const buffer: T[] = [];
    let paused = false;
    let sourceTerminated = false;
    return merge( // add a custon streamId to values from the source and the condition so that they can be differentiated later on
      source.pipe(map(v => [v, 0]), finalize(() => sourceTerminated = true)),
      condition.pipe(map(v => [v, 1]))
    ).pipe( // add values from the source to the buffer or set the paused variable
      tap(([value, streamId]) => streamId === 0 ? buffer.push(value as T) : paused = value as boolean), 
      switchMap(_ => new Observable<T>(s => {
        setTimeout(() => { // map to a stream of values taken from the buffer, setTimeout is used so that a subscriber to the condition outside of this function gets the values in the correct order (also see Brandons answer & comments)
          while (buffer.length > 0 && !paused) s.next(buffer.shift())
        }, 0)
      })), // complete the stream when the source terminated and the buffer is empty
      takeWhile(_ => !sourceTerminated || buffer.length > 0, true) 
    );
  })
} 

Usage

Usage
active$ = new BehaviorSubject<boolean>(true);

interval(500).pipe(
  bufferIf(this.active$.pipe(map(v => !v))),
  tap(value => this.pauseOn(value) ? this.active$.next(false) : null)
).subscribe(console.log);

pauseOn = (value: number) => value > 0 && value % 10 === 0

Ответ 2

Здесь пользовательский оператор паузы, который просто накапливает значения в буфере, когда сигнал паузы имеет значение true, и генерирует их один за другим, когда он false.

Объедините его с простым оператором tap чтобы переключить сигнал паузы субъекта поведения, когда значение достигает определенного условия, и у вас есть что-то, чтобы приостановить нажатие кнопки, а также сделать паузу, когда значение соответствует условию (кратное 12 в этом случае):

Вот оператор pause:

function pause<T>(pauseSignal: Observable<boolean>) {
  return (source: Observable<T>) => Observable.create(observer => {
    const buffer = [];
    let paused = false;
    let error;
    let isComplete = false;

    function notify() {
      while (!paused && buffer.length) {
        const value = buffer.shift();
        observer.next(value);
      }

      if (!buffer.length && error) {
        observer.error(error);
      }

      if (!buffer.length && isComplete) {
        observer.complete();
      }
    }

    const subscription = pauseSignal.subscribe(
      p => {
        paused = !p;
        setTimeout(notify, 0);
      },
      e => {
        error = e;
        setTimeout(notify, 0);
      },
      () => {});

    subscription.add(source.subscribe(
      v => {
        buffer.push(v);
        notify();
      },
      e => {
        error = e;
        notify();
      },
      () => {
        isComplete = true;
        notify();
      }
    ));

    return subscription;
  });
}

Вот использование этого:

const CONDITION = x => (x > 0) && ((x % 12) === 0); // is multiple
this.active$ = new BehaviorSubject<boolean>(true);
const stream$ = timer(500, 500);
const out$ = stream$.pipe(
  pause(this.active$),
  tap(value => {
    if (CONDITION(value)) {
      this.active$.next(false);
    }
  }));

this.d = out$.subscribe(v => console.log(v));

И рабочий пример: https://stackblitz.com/edit/angular-bvxnbf

Ответ 3

Вот простой способ сделать это. Используйте timer() как излучатель и увеличивайте счет отдельно. Это дает вам немного больше прямого контроля.

export class AppComponent implements OnInit {
  active = true;
  out$: Observable<number>;

  count = 0;

  ngOnInit(): void {

    const stream$ = timer(500, 500);

    this.out$ = stream$.pipe(
      filter(v => this.active),
      map(v => {
        this.count += 1;
        return this.count;
      }),
      tap(v => {
        if (this.count % 5 === 0) {
          this.active = false;
        }
      })
    )
  }

}

https://stackblitz.com/edit/angular-nzs7zh

Ответ 4

Я предполагаю, что желаемое поведение не связано с получением значений, которые таймер излучает сам по себе, и что вместо приостановки уведомлений для текущего потока (в вашем примере таймер продолжает работать, даже если мы не видим, чтобы значения были напечатано), это нормально, чтобы фактически перестать излучать, когда приостановлено.

Мое решение вдохновлено рецептом секундомера

Решение ниже использует две отдельные кнопки для воспроизведения и паузы, но вы можете настроить это по вкусу. Мы передаем кнопки (ViewChild) сервису в хуке ngAfterViewInit компонента, затем подписываемся на поток.

// pausable.component.ts
  ngAfterViewInit() {
    this.pausableService.initPausableStream(this.start.nativeElement, this.pause.nativeElement);

    this.pausableService.counter$
      .pipe(takeUntil(this.unsubscribe$)) // don't forget to unsubscribe :)
      .subscribe((state: State) => {
        console.log(state.value); // whatever you need
    });
  }
// pausable.service.ts
import { Injectable } from '@angular/core';

import { merge, fromEvent, Subject, interval, NEVER } from 'rxjs';
import { mapTo, startWith, scan, switchMap, tap, map } from 'rxjs/operators';

export interface State {
  active: boolean;
  value: number;
}

@Injectable({
  providedIn: 'root'
})
export class PausableService {

  public counter$;

  constructor() { }

  initPausableStream(start: HTMLElement, pause: HTMLElement) {

    // convenience functions to map an element click to a result
    const fromClick = (el: HTMLElement) => fromEvent(el, 'click');
    const clickMapTo = (el: HTMLElement, obj: {}) => fromClick(el).pipe(mapTo(obj));

    const pauseByCondition$ = new Subject();
    const pauseCondition = (state: State): boolean => state.value % 5 === 0 && state.value !== 0;

    // define the events that may trigger a change
    const events$ = merge(
      clickMapTo(start, { active: true }),
      clickMapTo(pause, { active: false }),
      pauseByCondition$.pipe(mapTo({ active: false }))
    );

    // switch the counter stream based on events
    this.counter$ = events$.pipe(
      startWith({ active: true, value: 0 }),
      scan((state: State, curr) => ({ ...state, ...curr }), {}),
      switchMap((state: State) => state.active
        ? interval(500).pipe(
          tap(_ => ++state.value),
          map(_ => state))
        : NEVER),
      tap((state: State) => {
        if (pauseCondition(state)) {
          pauseByCondition$.next(); // trigger pause
        }
      })
    );
  }

}

Ответ 5

Как можно проще с одним windowToggle и использовать рабочий пример active.next(false): https://stackblitz.com/edit/angular-pdw7kw

 defer(() => {
      let count = 0;
      return stream$.pipe(
        windowToggle(on$, () => off$),
        exhaustMap(obs => obs),
        mergeMap(_ => {
          if ((++count) % 5 === 0) {
            this.active$.next(false)
            return never()
          }
          return of(count)
        }),
      )
    }).subscribe(console.log)

Ответ 6

Ваш пример на самом деле удивительно близок к рабочему решению, нет необходимости в новых пользовательских операторах.

Смотрите раздел "Буферизация" здесь:

https://medium.com/@kddsky/pauseable-observables-in-rxjs-58ce2b8c7dfd

И рабочий пример здесь:

https://thinkrx.io/gist/cef1572743cbf3f46105ec2ba56228cd

Он использует тот же подход, который у вас уже есть, с bufferToggle и windowToggle, похоже, главное отличие в том, что вам нужно share ваша пауза/активная subject-