我正在使用angular 2构建一个web应用程序,我想让多个组件监听同一个服务。这个服务返回一个可观察对象,它返回来自websocket的传入数据。我根据这个例子编写了代码。
当前的问题是:数据从主组件通过服务发送到服务器(使用websockets),然后返回数据。然而,只有home.component中的观察者会被调用(id: room)。创建和数据),而不是导航栏中的那个。
谁能告诉我为什么不是两个都叫?我还尝试添加消息$。订阅app.component,但无济于事。
现在,让我们开始代码。
返回一个可观察对象的消息服务。组件使用此服务发送和接收来自。
@Injectable()
export class MessageService {
private _messages: Rx.Subject<Message>;
messages$: Rx.Observable<Message>;
constructor(wsService: SocketService, private configuration: Configuration) {
console.log('messag eservice');
this._messages = <Rx.Subject<Message>>wsService
.connect()
.map((response: MessageEvent): Message => {
let data = JSON.parse(response.data);
return {
id: data.id,
data: data.data,
}
});
this.messages$ = this._messages.asObservable();
}
public send(message: Message): void {
this._messages.next(message);
}
}
一个套接字服务,它创建一个websocket连接,并将自己绑定到这个套接字的输入和输出。
import { Injectable } from '@angular/core';
import * as Rx from "rxjs/Rx";
import { Configuration } from '../app.constants';
@Injectable()
export class SocketService {
private subject: Rx.Subject<MessageEvent>;
constructor(private configuration: Configuration){};
public connect(wsNamespace = ''): Rx.Subject<MessageEvent> {
var url = this.configuration.wsUrl + wsNamespace;
if(!this.subject) {
this.subject = this.create(url);
}
return this.subject;
}
private create(url): Rx.Subject<MessageEvent> {
let ws = new WebSocket(url);
// bind ws events to observable (streams)
let observable = Rx.Observable.create((obs: Rx.Observer<MessageEvent>) => {
ws.onmessage = obs.next.bind(obs);
ws.onerror = obs.error.bind(obs);
ws.onclose = obs.complete.bind(obs);
return ws.close.bind(ws);
});
// on obs next (send something in the stream) send it using ws.
let observer = {
next: (data: Object) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(data));
}
},
};
return Rx.Subject.create(observer, observable);
}
}
包含以下提供商的应用组件:
providers: [MessageService, SocketService, Configuration, AuthService]
我在主app.component中实例化提供商,以确保消息和套接字服务不会实例化两次。
我的home.component是这样的(这是一个使用路由加载的页面):
import { Component, OnInit } from '@angular/core';
import { Subject } from 'rxjs';
import { Router } from '@angular/router';
import { MessageService } from '../../services/message.service';
@Component({
...
providers: []
})
export class HomeComponent implements OnInit {
constructor(private router: Router, private messageService: MessageService) {}
ngOnInit(): void {
this.messageService.send({
id: 'room.create',
data: {'name': 'Blaat'}
});
this.messageService.messages$.subscribe(msg => {
console.log(msg);
if(msg.id == 'room.created') {
// navigate naar games!
}
});
}
}
我的导航栏组件看起来像这样(指令):
import { Component, OnInit } from '@angular/core';
import { MessageService } from '../../services/message.service';
@Component({
moduleId: module.id,
selector: 'navbar',
templateUrl: 'navbar.component.html',
styleUrls: ['navbar.component.css']
})
export class Navbar implements OnInit {
constructor(private messageService: MessageService) { }
ngOnInit() {
this.messageService.messages$.subscribe(msg => {
console.log(msg);
if(msg.id == 'room.created') {
// navigate naar games!
}
});
}
}
看起来你的observable create函数被多次调用,很可能是两个组件=>两个订阅=>两个observable create函数调用。所以最新的可观察对象create fn覆盖了之前对websocket的onmessage, onerror和onclose的可观察回调。你应该多播底层的可观察对象来防止这种情况(共享操作符可以做到这一点)。
// bind ws events to observable (streams)
let observable = Rx.Observable.create((obs: Rx.Observer<MessageEvent>) => {
ws.onmessage = obs.next.bind(obs);
ws.onerror = obs.error.bind(obs);
ws.onclose = obs.complete.bind(obs);
return ws.close.bind(ws);
}).share();
关于如何正确执行此操作的更多有用资源https://github.com/ReactiveX/rxjs/blob/master/src/observable/dom/WebSocketSubject.tshttps://github.com/blesh/RxSocketSubject
如果有人想在angular上使用service发送加密请求,并获取加密后的响应,然后将其解密以在模板中使用下面是代码
mycomponent添加。Ts文件:::
import { Component, TemplateRef, OnInit, ViewChild } from '@angular/core';
import { myService } from 'src/app/my.service';
import { Base64 } from 'js-base64';
import { SelectItem } from 'primeng/api';
import { ActivatedRoute, Router } from '@angular/router';
getDataFromService() {
this._myService.getData("api_name/url")
.subscribe(data => {
let myResp = Base64.decode(this.myData = data);
this.myResponse = JSON.parse(myResp );
}
myservice。Ts文件:::
import { Injectable } from '@angular/core';
import { HttpClient, HttpHeaders, HttpParams } from '@angular/common/http';
import { Observable } from 'rxjs';
import { Iwatchlist } from './watchlist';
import { Base64 } from 'js-base64';
private _url: string = "http://192.168.10.10:1000";
getData(svcName): Observable<any> {
let baseUrl= this._url + "/"+svcName;
const httpOptions = {
headers: { 'Content-Type': 'application/json' },
"request": {
"FormFactor": "M",
"svcGroup": "portfolio",
"svcVersion": "1.0.0",
"svcName": svcName,
"requestType": "U",
"data": {
"gcid": "123",
"gscid": "abc123"
}
}
};
let encryptedData = Base64.encode(JSON.stringify(httpOptions));
let postRequest = this.http.post(baseUrl, encryptedData, {responseType: 'text'});
return postRequest;
}