Angular 2 Наблюдается с несколькими подписчиками

У меня есть угловая услуга 2, которая извлекает данные из API. Эта служба имеет 3 подписчика (определенные в Компонентах), каждый из которых делает что-то еще с данными (разные графики)

Я замечаю, что я делаю три запроса GET API, а то, что я хочу достичь, - это один запрос и что подписчики будут делиться данными, которые я просматриваю в HOT и COLD, наблюдаемыми и пробовали.share() на наблюдаемый, но я все еще делаю 3 индивидуальных звонка

Обновление, добавление кода

обслуживание

import { Injectable } from '@angular/core';
import { Http, Response } from '@angular/http';

import {Observable} from 'rxjs/Rx';

// Import RxJs required methods
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/catch';

import { StationCompliance } from './model/StationCompliance';


@Injectable()
export class StationComplianceService {

  private url = '/api/read/stations';

  constructor(private http : Http) {
    console.log('Started Station compliance service');
   }

   getStationCompliance() : Observable<StationCompliance []> {
     return this.http.get(this.url)
      .map((res:Response) => res.json())
      .catch((error:any) => Observable.throw(error.json().error || 'Server Error'));
   }
}

Компонент 1

import { Component, OnInit } from '@angular/core';
import { CHART_DIRECTIVES } from 'angular2-highcharts';

import { StationComplianceService } from '../station-compliance.service';


@Component({
  selector: 'app-up-down-graph',
  templateUrl: './up-down-graph.component.html',
  styleUrls: ['./up-down-graph.component.css']
})
export class UpDownGraphComponent implements OnInit {

  graphData;

  errorMessage: string;

  options;

  constructor(private stationService : StationComplianceService) { }

  ngOnInit() {
    this.getStationTypes();
  }

  getStationTypes(){
    this.stationService.getStationCompliance()
      .subscribe(
        graphData => {
          this.graphData = graphData;
          this.options = {
            chart : {type: 'pie',
                    plotShadow: true
            },
            plotOptions : {
              showInLegend: true
            },
            title : {text: 'Up and Down devices'},
            series: [{
              data: this.processStationType(this.graphData)
            }]
          }
        },
        error => this.errorMessage = <any>error
      );
  }

Другие два компонента почти одинаковы, они просто показывают другой график

Ответы

Ответ 1

Я столкнулся с подобной проблемой и решил ее, используя предложение Арана, ссылаясь на сообщение в блоге Cory Rylan Angular 2 Observable Data Services. Ключом для меня было использование BehaviorSubject. Вот фрагменты кода, который в конечном итоге работал для меня.

Служба данных:

Служба данных создает внутренний объект BehaviorSubject для однократного кэширования данных при инициализации службы. Потребители используют метод subscribeToDataService() для доступа к данным.

    import { Injectable } from '@angular/core';
    import { Http, Response } from '@angular/http';

    import { BehaviorSubject } from 'rxjs/BehaviorSubject';
    import { Observable } from 'rxjs/Observable';

    import { Data } from './data';
    import { properties } from '../../properties';

    @Injectable()
    export class DataService {
      allData: Data[] = new Array<Data>();
      allData$: BehaviorSubject<Data[]>;

      constructor(private http: Http) {
        this.initializeDataService();
      }

      initializeDataService() {
        if (!this.allData$) {
          this.allData$ = <BehaviorSubject<Data[]>> new BehaviorSubject(new Array<Data>());

          this.http.get(properties.DATA_API)
            .map(this.extractData)
            .catch(this.handleError)
            .subscribe(
              allData => {
                this.allData = allData;
                this.allData$.next(allData);
              },
              error => console.log("Error subscribing to DataService: " + error)
            );
        }
      }

      subscribeToDataService(): Observable<Data[]> {
        return this.allData$.asObservable();
      }

      // other methods have been omitted

    }
Компонент:

Компоненты могут подписаться на услугу передачи данных при инициализации.

    export class TestComponent implements OnInit {
      allData$: Observable<Data[]>;

      constructor(private dataService: DataService) {
      }

      ngOnInit() {
        this.allData$ = this.dataService.subscribeToDataService();
      }

    }
Шаблон компонента:

Шаблон может затем перебирать наблюдаемое по мере необходимости, используя асинхронный канал.

    *ngFor="let data of allData$ | async" 

Подписчики обновляются каждый раз, когда вызывается метод next() объекта BehaviorSubject в службе данных.

Ответ 2

Проблема, которую вы имеете в своем коде, заключается в том, что вы возвращаете новое наблюдаемое при каждом вызове функции. Это связано с тем, что http.get создает новый Observable каждый раз, когда он вызывается. Способ решения проблемы может заключаться в том, чтобы хранить наблюдаемые (через закрытие) услуги, которые гарантируют, что все участники подписываются на одно и то же наблюдаемое. Это не идеальный код, но у меня была аналогичная проблема, и в настоящее время это решило мою проблему.

import { Injectable } from '@angular/core';
import { Http, Response } from '@angular/http';

import {Observable} from 'rxjs/Rx';

// Import RxJs required methods
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/catch';

import { StationCompliance } from './model/StationCompliance';


@Injectable()
export class StationComplianceService {

  private url = '/api/read/stations';

  constructor(private http : Http) {
    console.log('Started Station compliance service');
   }

   private stationComplianceObservable: Rx.Observable<StationCompliance[]>;


   getStationCompliance() : Observable<StationCompliance []> {

    if(this.stationComplianceObservable){
        return this.stationComplianceObservable;
    }        

      this.stationComplianceObservable = this.http.get(this.url)
      .debounce(1000)
      .share()
      .map((res:Response) => res.json())
      .finally(function () { this.stationComplianceObservable = null}) 
      .catch((error:any) => Observable.throw(error.json().error || 'Server Error'));

    return this.stationComplianceObservable;
   }
}

Ответ 3

вы можете создать службу реактивных данных и определить локальную переменную Observable, которая обновляется внутри, а абоненты могут обновлять себя. эта статья объясняет это правильно службами данных

Ответ 4

Решение сохраняется после создания наблюдаемого и делает его доступным (по умолчанию это не так). Таким образом, ваш сервис будет выглядеть так:

@Injectable()
export class StationComplianceService {

  private stationCompliance: StationCompliance;
  private stream: Observable<StationCompliance []>;
  private url = '/api/read/stations';

  constructor(private http : Http) {
    console.log('Started Station compliance service');
   }

   getStationCompliance() : Observable<StationCompliance []> {
     /** is remote value is already fetched, just return it as Observable */
     if (this.stationComliance) {
       return Observable.of(this.stationComliance);
     }
     /** otherwise if stream already created, prevent another stream creation (exactly your question */
     if (this.stream) {
       return this.stream;
     }
     /** otherwise run remote data fetching */
     this.stream = this.http.get(this.url)
      .map((res:Response) => res.json())
      .catch((error:any) => Observable.throw(error.json().error || 'Server Error'))
      .share(); /** and make the stream shareable (by default it is not) */
     return this.stream;
   }
}

Ответ 6

Я знаю, что эта ветка старая, но принятый ответ мне очень помог, и я хотел бы добавить некоторые возможные улучшения, используя debounce, switchmap и взломанную глобальную систему уведомлений (для этого следует использовать ngrx: https://ngrx. IO/)

Предпосылка концепции заключается в том, что служба уведомлений может использоваться для передачи измененных всем другим службам, сообщая им о необходимости извлечения своих данных:

export class NotifyerService {

  constructor() { }

  notifyer: Subject<any> = new Subject

  notifyAll() {
    console.log("ALL NOTIFIED")
    this.notifyer.next("GO")
  }

}

Тема используется, потому что вызов .next(val) для субъекта передает данные всем слушателям

В сервисе для конкретного компонента (в вашем случае "DataService") вы можете управлять сбором данных и операцией кэширования:

export class GetDataService {

  // cache the incoming data
  cache: Subject<any> = new Subject

  constructor(private http: HttpClient,
    private notifyerService: NotifyerService) {

    // subscribe to any notification from central message broker
    this.notifyerService.notifyer.pipe(

      // filtering can be used to perform different actions based on different notifications
      filter(val => val == "GO"),

      // prevent notification spam by debouncing
      debounceTime(300),

      // SUBSCRIBE to the output of getData, cancelling previous subscription
      switchMap(() => this.getData())

    ).subscribe(res => {

      console.log("expensive server request")

      // save data in cache, notify listeners
      this.cache.next(res)
    })

  }

  // expensive function which gets data
  getData(): Observable<any> {
    return this.http.get<any>(BASE_URL);
  }

} 

Ключевая концепция в приведенном выше коде заключается в том, что вы устанавливаете объект кэша и обновляете его всякий раз, когда появляется уведомление. В конструкторе мы хотим передать все будущие уведомления через ряд операторов:

  • Фильтрация может использоваться, если вы хотите обновить кэш только в том случае, если служба уведомлений выводит определенные данные (в данном случае "GO"). Если вы начнете это делать, почти наверняка будет использоваться ngrx.
  • debounceTime предотвращает спам на вашем сервере с большим количеством запросов (если, скажем, уведомления основаны на вводе пользователя)
  • switchMap - очень важный оператор для кэширования и изменения состояния. switchMap запускает функцию внутри (в данном случае, функцию getData) и подписывается на ее вывод. Это означает, что на выходе switchMap будет подписка на данные сервера.
  • Еще одна важная особенность switchMap - это отмена предыдущих подписок. Это означает, что если ваш запрос к серверу не был возвращен до получения другого уведомления, он отменит старый и снова пингует сервер.

Теперь, после всего этого, данные сервера будут помещены в кеш методом .next(res).

Теперь все, что нужно сделать последнему компоненту, это прослушать кеш для обновлений и обработать соответствующим образом:

export class ButtonclickerComponent implements OnInit {

  value: any;

  constructor(private getDataService: GetDataService,
    private notifyerService: NotifyerService) { }

  ngOnInit() {

    // listen to cache for updates
    this.getDataService.cache.pipe(

      // can do something specific to this component if have multiple subscriptions off same cache
      map(x => x)

      // subsc
    ).subscribe(x => { console.log(x); this.value = x.value })

  }

  onClick() {

    // notify all components of state changes
    this.notifyerService.notifyAll()
  }

}

Концепция в действии:

Угловое приложение по нажатию кнопки

Ответ сервера