在 Go gRPC 中将流消息转发给所有已连接的客户端



我有一个 gRPC 服务器,它的 .proto 文件如下:

// Schema for the chat service: one bidirectional-streaming RPC that carries
// name/body text messages in both directions.
syntax = "proto3";
package chatserver;
// FromClient is the message type a client streams to the server.
message FromClient {
// name is stored by the server with the queued message and relayed as FromServer.name.
string name = 1;
// body is the message text; the server relays it as FromServer.body.
string body = 2;
}
// FromServer is the message type the server streams back to a client.
message FromServer {
// name is the FromClient.name of the message being relayed.
string name = 1;
// body is the FromClient.body of the message being relayed.
string body = 2; 
}
// Services exposes the chat RPC.
service Services {
// ChatService is a bidirectional stream: the client sends FromClient messages
// and receives FromServer messages on the same call.
rpc ChatService(stream FromClient) returns (stream FromServer){};
}

server.go 的代码如下所示:

package chatserver
import (
"fmt"
"log"
"math/rand"
"sync"
"time"
)
// messageUnit is one chat message held in the shared queue between being
// received from a client stream and being sent out on another one.
type messageUnit struct {
// ClientName is the Name field of the FromClient message that was received.
ClientName        string
// MessageBody is the Body field of the FromClient message that was received.
MessageBody       string
// MessageUniqueCode is a pseudo-random number drawn when the message is queued.
MessageUniqueCode int
// ClientUniqueCode is the code of the stream the message arrived on; the send
// loop compares it with its own code so a client is not sent its own message.
ClientUniqueCode  int
}
// messageHandle is a mutex-guarded queue of pending chat messages.
type messageHandle struct {
// MQue holds the pending messages in arrival order; index 0 is the oldest.
MQue []messageUnit
// mu is locked around the reads and writes of MQue in this file.
mu   sync.Mutex
}
// messageHandleObject is the single package-level queue that the receive and
// send loops of every stream append to and consume from. Its zero value (nil
// slice, unlocked mutex) is ready to use.
var messageHandleObject messageHandle
// ChatServer is the type that carries the ChatService streaming method.
// It has no fields; the message queue lives in a package-level variable.
type ChatServer struct {
}
// ChatService handles one bidirectional chat stream.
//
// It draws a pseudo-random numeric code for the connected client, starts one
// goroutine that reads the client's messages and one that writes queued
// messages back to it, and then blocks until either goroutine reports an
// error. That first error is returned, which ends the RPC.
func (s *ChatServer) ChatService(csi Services_ChatServiceServer) error {
	// rand.Intn gives no uniqueness guarantee: two clients can, rarely, draw
	// the same code, in which case they will not be sent each other's messages.
	clientUniqueCode := rand.Intn(1e6)

	// One buffer slot per goroutine. Only the first error is consumed below,
	// so the goroutine that fails second must still be able to complete its
	// send instead of blocking forever on a channel nobody reads any more.
	errch := make(chan error, 2)

	// Reader: client -> shared queue.
	go receiveFromStream(csi, clientUniqueCode, errch)
	// Writer: shared queue -> client.
	go sendToStream(csi, clientUniqueCode, errch)

	return <-errch
}
// receiveFromStream reads messages from a client stream and appends each one
// to the shared message queue.
//
// csi_ is the stream to read from. clientUniqueCode_ is stored with every
// queued message so the send loops can recognise who sent it. errch_ receives
// the error that made Recv fail; it is sent exactly once, after which the
// function returns.
func receiveFromStream(csi_ Services_ChatServiceServer, clientUniqueCode_ int, errch_ chan error) {
	for {
		mssg, err := csi_.Recv()
		if err != nil {
			log.Printf("Error in receiving message from client :: %v", err)
			errch_ <- err
			// A failed Recv means the stream is finished or broken; looping
			// again would only repeat the error and block on errch_ once the
			// handler has stopped listening.
			return
		}

		unit := messageUnit{
			ClientName:        mssg.Name,
			MessageBody:       mssg.Body,
			MessageUniqueCode: rand.Intn(1e8),
			ClientUniqueCode:  clientUniqueCode_,
		}

		// Hold the lock only for the append itself.
		messageHandleObject.mu.Lock()
		messageHandleObject.MQue = append(messageHandleObject.MQue, unit)
		messageHandleObject.mu.Unlock()

		log.Printf("%v", unit)
	}
}
// sendToStream polls the shared message queue and writes the oldest message to
// this client's stream when it was sent by a different client.
//
// csi_ is the stream to write to. clientUniqueCode_ identifies this client so
// its own messages are not sent back to it. errch_ receives the error of a
// failed Send. The function returns after a failed Send or as soon as the
// stream's context is done.
//
// Known limitation: all streams consume the same queue and a message is
// removed once it has been sent successfully, so each message reaches only one
// other client rather than all of them. A message whose sender is this client
// stays at the head of the queue until another stream's loop removes it.
func sendToStream(csi_ Services_ChatServiceServer, clientUniqueCode_ int, errch_ chan error) {
	done := csi_.Context().Done()

	for {
		// Drain loop: runs until the queue is found empty.
		for {
			if !sleepUnlessDone(done, 500*time.Millisecond) {
				return
			}

			// Snapshot the head under the lock; the Send itself runs unlocked.
			messageHandleObject.mu.Lock()
			if len(messageHandleObject.MQue) == 0 {
				messageHandleObject.mu.Unlock()
				break
			}
			head := messageHandleObject.MQue[0]
			messageHandleObject.mu.Unlock()

			// Never send a message back to the client it came from.
			if head.ClientUniqueCode == clientUniqueCode_ {
				continue
			}

			if err := csi_.Send(&FromServer{Name: head.ClientName, Body: head.MessageBody}); err != nil {
				// Report the failure, but do not wait on errch_ once the
				// stream is done: the handler may no longer be reading it.
				// The message stays queued because it was not delivered.
				select {
				case errch_ <- err:
				case <-done:
				}
				return
			}

			messageHandleObject.mu.Lock()
			// Another stream's loop may have changed the head while the lock
			// was released; remove it only if it is still the message that
			// was just sent, so an unsent message is never dropped.
			if q := messageHandleObject.MQue; len(q) > 0 && q[0] == head {
				if len(q) > 1 {
					messageHandleObject.MQue = q[1:]
					fmt.Println("message greater then 1")
				} else {
					// Fresh slice so the old backing array can be collected.
					messageHandleObject.MQue = []messageUnit{}
				}
			}
			messageHandleObject.mu.Unlock()
		}

		// Queue was empty: wait a little longer before polling again.
		if !sleepUnlessDone(done, 100*time.Millisecond) {
			return
		}
	}
}

// sleepUnlessDone waits for d and reports true, or reports false as soon as
// done is closed.
func sleepUnlessDone(done <-chan struct{}, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-timer.C:
		return true
	case <-done:
		return false
	}
}

现在,假设有三个客户端连接到服务器。如果其中一个客户端发送消息,该消息不会被转发给另外两个客户端,而只会发送给其中的一个。我能否以某种方式将消息广播给所有其他客户端?或者,能否根据某个客户端 ID 来指定由哪个客户端接收消息?

您必须维护一个包含所有已连接流的全局数据结构,并在发送消息时遍历它,把消息逐一发送到每个流。您还需要为其中的每个流生成一个唯一的标识符,以便加以区分。我看不出其他办法。

最新更新