Подсоединение websocket в Angular и rxjs?
У меня есть приложение на основе ngrx/store (v2.2.2) и rxjs (v5.1.0), которое слушает веб-сокет для входящих данных с помощью наблюдаемого. Когда я запускаю приложение, я получаю поступающие данные безупречно.
Однако через некоторое время (обновления появляются довольно редко) соединение, похоже, затеряется, и я не получаю больше входящих данных. Мой код:
Сервис
import { Injectable, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Injectable()
export class MemberService implements OnInit {
private websocket: any;
private destination: string = "wss://notessensei.mybluemix.net/ws/time";
constructor() { }
ngOnInit() { }
listenToTheSocket(): Observable<any> {
this.websocket = new WebSocket(this.destination);
this.websocket.onopen = () => {
console.log("WebService Connected to " + this.destination);
}
return Observable.create(observer => {
this.websocket.onmessage = (evt) => {
observer.next(evt);
};
})
.map(res => res.data)
.share();
}
}
Абонент
export class AppComponent implements OnInit {
constructor(/*private store: Store<fromRoot.State>,*/ private memberService: MemberService) {}
ngOnInit() {
this.memberService.listenToTheSocket().subscribe((result: any) => {
try {
console.log(result);
// const member: any = JSON.parse(result);
// this.store.dispatch(new MemberActions.AddMember(member));
} catch (ex) {
console.log(JSON.stringify(ex));
}
})
}
}
Что мне нужно сделать, чтобы повторно подключить веб-сокет, когда он истечет, поэтому наблюдаемый продолжает выдавать входящие значения?
Я посмотрел на некоторые Q & A здесь, здесь и здесь, и он, похоже, не рассматривал этот вопрос (каким-то образом я мог бы понять).
Примечание: websocket в wss://notessensei.mybluemix.net/ws/time
является живым и генерирует отметку времени один раз в минуту (в случае, если кто-то хочет проверить это).
Совет приветствуется!
Ответы
Ответ 1
На самом деле там теперь есть WebsocketSubject в rxjs!
import { webSocket } from 'rxjs/webSocket' // for RxJS 6, for v5 use Observable.webSocket
let subject = webSocket('ws://localhost:8081');
subject.subscribe(
(msg) => console.log('message received: ' + msg),
(err) => console.log(err),
() => console.log('complete')
);
subject.next(JSON.stringify({ op: 'hello' }));
Он выполняет переподключение при повторной подписке на разорванное соединение. Так, например, напишите это, чтобы восстановить соединение:
subject.retry().subscribe(...)
Смотрите документацию для получения дополнительной информации. К сожалению, в окне поиска не отображается метод, но вы найдете его здесь:
http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-webSocket
что # -navigation не работает в моем браузере, поэтому поищите "webSocket" на этой странице.
Источник: http://reactivex.io/rxjs/file/es6/observable/dom/WebSocketSubject.js.html#lineNumber15
Ответ 2
Это может быть не очень хороший ответ, но это слишком много для комментария.
Возможно, проблема связана с вашим сервисом:
listenToTheSocket(): Observable<any> {
this.websocket = new WebSocket(this.destination);
this.websocket.onopen = () => {
console.log("WebService Connected to " + this.destination);
}
return Observable.create(observer => {
this.websocket.onmessage = (evt) => {
observer.next(evt);
};
})
.map(res => res.data)
.share();
}
Считаете ли вы, что в своем компоненте вы несколько раз переходите к методу ngOnInit
?
Вы должны попытаться вставить console.log
в ngOnInit
, чтобы быть уверенным.
Потому что если вы сделаете это, в вашем сервисе вы переопределите this.websocket
новым.
Вместо этого вы должны попробовать что-то подобное:
@Injectable()
export class MemberService implements OnInit {
private websocket: any;
private websocketSubject$ = new BehaviorSubject<any>();
private websocket$ = this.websocketSubject$.asObservable();
private destination = 'wss://notessensei.mybluemix.net/ws/time';
constructor() { }
ngOnInit() { }
listenToTheSocket(): Observable<any> {
if (this.websocket) {
return this.websocket$;
}
this.websocket = new WebSocket(this.destination);
this.websocket.onopen = () => console.log('WebService Connected to ${this.destination}');
this.websocket.onmessage = (res) => this.websocketSubject$.next(res => res.data);
}
}
BehaviorSubject
отправит последнее значение, если оно получит событие, прежде чем вы подпишетесь на него. Кроме того, как предмет, нет необходимости использовать оператор share
.
Ответ 3
Для rxjs 6 - реализация websocket.
import { webSocket } from 'rxjs/websocket';
let subject = webSocket('ws://localhost:8081');