Angular 2 Наблюдается с несколькими подписчиками
У меня есть угловая услуга 2, которая извлекает данные из API. Эта служба имеет 3 подписчика (определенные в Компонентах), каждый из которых делает что-то еще с данными (разные графики)
Я замечаю, что я делаю три запроса GET API, а то, что я хочу достичь, - это один запрос и что подписчики будут делиться данными, которые я просматриваю в HOT и COLD, наблюдаемыми и пробовали.share() на наблюдаемый, но я все еще делаю 3 индивидуальных звонка
Обновление, добавление кода
обслуживание
import { Injectable } from '@angular/core';
import { Http, Response } from '@angular/http';
import {Observable} from 'rxjs/Rx';
// Import RxJs required methods
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/catch';
import { StationCompliance } from './model/StationCompliance';
@Injectable()
export class StationComplianceService {
private url = '/api/read/stations';
constructor(private http : Http) {
console.log('Started Station compliance service');
}
getStationCompliance() : Observable<StationCompliance []> {
return this.http.get(this.url)
.map((res:Response) => res.json())
.catch((error:any) => Observable.throw(error.json().error || 'Server Error'));
}
}
Компонент 1
import { Component, OnInit } from '@angular/core';
import { CHART_DIRECTIVES } from 'angular2-highcharts';
import { StationComplianceService } from '../station-compliance.service';
@Component({
selector: 'app-up-down-graph',
templateUrl: './up-down-graph.component.html',
styleUrls: ['./up-down-graph.component.css']
})
export class UpDownGraphComponent implements OnInit {
graphData;
errorMessage: string;
options;
constructor(private stationService : StationComplianceService) { }
ngOnInit() {
this.getStationTypes();
}
getStationTypes(){
this.stationService.getStationCompliance()
.subscribe(
graphData => {
this.graphData = graphData;
this.options = {
chart : {type: 'pie',
plotShadow: true
},
plotOptions : {
showInLegend: true
},
title : {text: 'Up and Down devices'},
series: [{
data: this.processStationType(this.graphData)
}]
}
},
error => this.errorMessage = <any>error
);
}
Другие два компонента почти одинаковы, они просто показывают другой график
Ответы
Ответ 1
Я столкнулся с подобной проблемой и решил ее, используя предложение Арана, ссылаясь на сообщение в блоге Cory Rylan Angular 2 Observable Data Services. Ключом для меня было использование BehaviorSubject
. Вот фрагменты кода, который в конечном итоге работал для меня.
Служба данных: Служба данных создает внутренний объект BehaviorSubject
для однократного кэширования данных при инициализации службы. Потребители используют метод subscribeToDataService()
для доступа к данным.
import { Injectable } from '@angular/core';
import { Http, Response } from '@angular/http';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import { Observable } from 'rxjs/Observable';
import { Data } from './data';
import { properties } from '../../properties';
@Injectable()
export class DataService {
allData: Data[] = new Array<Data>();
allData$: BehaviorSubject<Data[]>;
constructor(private http: Http) {
this.initializeDataService();
}
initializeDataService() {
if (!this.allData$) {
this.allData$ = <BehaviorSubject<Data[]>> new BehaviorSubject(new Array<Data>());
this.http.get(properties.DATA_API)
.map(this.extractData)
.catch(this.handleError)
.subscribe(
allData => {
this.allData = allData;
this.allData$.next(allData);
},
error => console.log("Error subscribing to DataService: " + error)
);
}
}
subscribeToDataService(): Observable<Data[]> {
return this.allData$.asObservable();
}
// other methods have been omitted
}
Компонент: Компоненты могут подписаться на услугу передачи данных при инициализации.
export class TestComponent implements OnInit {
allData$: Observable<Data[]>;
constructor(private dataService: DataService) {
}
ngOnInit() {
this.allData$ = this.dataService.subscribeToDataService();
}
}
Шаблон компонента: Шаблон может затем перебирать наблюдаемое по мере необходимости, используя асинхронный канал.
*ngFor="let data of allData$ | async"
Подписчики обновляются каждый раз, когда вызывается метод next()
объекта BehaviorSubject
в службе данных.
Ответ 2
Проблема, которую вы имеете в своем коде, заключается в том, что вы возвращаете новое наблюдаемое при каждом вызове функции. Это связано с тем, что http.get
создает новый Observable каждый раз, когда он вызывается. Способ решения проблемы может заключаться в том, чтобы хранить наблюдаемые (через закрытие) услуги, которые гарантируют, что все участники подписываются на одно и то же наблюдаемое. Это не идеальный код, но у меня была аналогичная проблема, и в настоящее время это решило мою проблему.
import { Injectable } from '@angular/core';
import { Http, Response } from '@angular/http';
import {Observable} from 'rxjs/Rx';
// Import RxJs required methods
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/catch';
import { StationCompliance } from './model/StationCompliance';
@Injectable()
export class StationComplianceService {
private url = '/api/read/stations';
constructor(private http : Http) {
console.log('Started Station compliance service');
}
private stationComplianceObservable: Rx.Observable<StationCompliance[]>;
getStationCompliance() : Observable<StationCompliance []> {
if(this.stationComplianceObservable){
return this.stationComplianceObservable;
}
this.stationComplianceObservable = this.http.get(this.url)
.debounce(1000)
.share()
.map((res:Response) => res.json())
.finally(function () { this.stationComplianceObservable = null})
.catch((error:any) => Observable.throw(error.json().error || 'Server Error'));
return this.stationComplianceObservable;
}
}
Ответ 3
вы можете создать службу реактивных данных и определить локальную переменную Observable, которая обновляется внутри, а абоненты могут обновлять себя. эта статья объясняет это правильно службами данных
Ответ 4
Решение сохраняется после создания наблюдаемого и делает его доступным (по умолчанию это не так). Таким образом, ваш сервис будет выглядеть так:
@Injectable()
export class StationComplianceService {
private stationCompliance: StationCompliance;
private stream: Observable<StationCompliance []>;
private url = '/api/read/stations';
constructor(private http : Http) {
console.log('Started Station compliance service');
}
getStationCompliance() : Observable<StationCompliance []> {
/** is remote value is already fetched, just return it as Observable */
if (this.stationComliance) {
return Observable.of(this.stationComliance);
}
/** otherwise if stream already created, prevent another stream creation (exactly your question */
if (this.stream) {
return this.stream;
}
/** otherwise run remote data fetching */
this.stream = this.http.get(this.url)
.map((res:Response) => res.json())
.catch((error:any) => Observable.throw(error.json().error || 'Server Error'))
.share(); /** and make the stream shareable (by default it is not) */
return this.stream;
}
}
Ответ 5
shareReplay
должен сделать это сейчас - "(...) ценно в ситуациях, когда вы знаете, что у вас будут поздние подписчики на поток, которому нужен доступ к ранее выпущенным значениям".
Ответ 6
Я знаю, что эта ветка старая, но принятый ответ мне очень помог, и я хотел бы добавить некоторые возможные улучшения, используя debounce, switchmap и взломанную глобальную систему уведомлений (для этого следует использовать ngrx: https://ngrx. IO/)
Предпосылка концепции заключается в том, что служба уведомлений может использоваться для передачи измененных всем другим службам, сообщая им о необходимости извлечения своих данных:
export class NotifyerService {
constructor() { }
notifyer: Subject<any> = new Subject
notifyAll() {
console.log("ALL NOTIFIED")
this.notifyer.next("GO")
}
}
Тема используется, потому что вызов .next(val) для субъекта передает данные всем слушателям
В сервисе для конкретного компонента (в вашем случае "DataService") вы можете управлять сбором данных и операцией кэширования:
export class GetDataService {
// cache the incoming data
cache: Subject<any> = new Subject
constructor(private http: HttpClient,
private notifyerService: NotifyerService) {
// subscribe to any notification from central message broker
this.notifyerService.notifyer.pipe(
// filtering can be used to perform different actions based on different notifications
filter(val => val == "GO"),
// prevent notification spam by debouncing
debounceTime(300),
// SUBSCRIBE to the output of getData, cancelling previous subscription
switchMap(() => this.getData())
).subscribe(res => {
console.log("expensive server request")
// save data in cache, notify listeners
this.cache.next(res)
})
}
// expensive function which gets data
getData(): Observable<any> {
return this.http.get<any>(BASE_URL);
}
}
Ключевая концепция в приведенном выше коде заключается в том, что вы устанавливаете объект кэша и обновляете его всякий раз, когда появляется уведомление. В конструкторе мы хотим передать все будущие уведомления через ряд операторов:
- Фильтрация может использоваться, если вы хотите обновить кэш только в том случае, если служба уведомлений выводит определенные данные (в данном случае "GO"). Если вы начнете это делать, почти наверняка будет использоваться ngrx.
- debounceTime предотвращает спам на вашем сервере с большим количеством запросов (если, скажем, уведомления основаны на вводе пользователя)
- switchMap - очень важный оператор для кэширования и изменения состояния. switchMap запускает функцию внутри (в данном случае, функцию getData) и подписывается на ее вывод. Это означает, что на выходе switchMap будет подписка на данные сервера.
- Еще одна важная особенность switchMap - это отмена предыдущих подписок. Это означает, что если ваш запрос к серверу не был возвращен до получения другого уведомления, он отменит старый и снова пингует сервер.
Теперь, после всего этого, данные сервера будут помещены в кеш методом .next(res).
Теперь все, что нужно сделать последнему компоненту, это прослушать кеш для обновлений и обработать соответствующим образом:
export class ButtonclickerComponent implements OnInit {
value: any;
constructor(private getDataService: GetDataService,
private notifyerService: NotifyerService) { }
ngOnInit() {
// listen to cache for updates
this.getDataService.cache.pipe(
// can do something specific to this component if have multiple subscriptions off same cache
map(x => x)
// subsc
).subscribe(x => { console.log(x); this.value = x.value })
}
onClick() {
// notify all components of state changes
this.notifyerService.notifyAll()
}
}
Концепция в действии:
Угловое приложение по нажатию кнопки
Ответ сервера