gRPC Bidirectional Stream Chat Service
Bidirectional gRPC chat example where clients stream messages to the server and receive broadcast updates in real time.
main.go
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
// chat.proto (reference)
//
// syntax = "proto3";
// package chat;
// option go_package = "example.com/project/chatpb";
//
// message ChatMessage {
// string username = 1;
// string content = 2;
// string timestamp = 3;
// }
//
// service ChatService {
// rpc ChatStream(stream ChatMessage) returns (stream ChatMessage);
// }
//
// Notes:
// - This snippet is runnable as a single file.
// - It avoids `protoc` by using `structpb.Struct` as the streaming message payload.
// - The first message a client sends is treated as registration (uses the `username` field).
package main
import (
"bufio"
"context"
"flag"
"fmt"
"io"
"log"
"net"
"os"
"os/signal"
"sync"
"syscall"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/structpb"
)
// ---- Manual gRPC service definition (no generated stubs) ----
type ChatServiceServer interface {
ChatStream(ChatService_ChatStreamServer) error
}
type ChatService_ChatStreamServer interface {
Send(*structpb.Struct) error
Recv() (*structpb.Struct, error)
grpc.ServerStream
}
type chatServiceChatStreamServer struct {
grpc.ServerStream
}
func (x *chatServiceChatStreamServer) Send(m *structpb.Struct) error {
return x.ServerStream.SendMsg(m)
}
func (x *chatServiceChatStreamServer) Recv() (*structpb.Struct, error) {
m := new(structpb.Struct)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func RegisterChatServiceServer(s grpc.ServiceRegistrar, srv ChatServiceServer) {
s.RegisterService(&ChatService_ServiceDesc, srv)
}
var ChatService_ServiceDesc = grpc.ServiceDesc{
ServiceName: "chat.ChatService",
HandlerType: (*ChatServiceServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "ChatStream",
Handler: _ChatService_ChatStream_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "chat.proto",
}
func _ChatService_ChatStream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(ChatServiceServer).ChatStream(&chatServiceChatStreamServer{ServerStream: stream})
}
type ChatServiceClient interface {
ChatStream(ctx context.Context, opts ...grpc.CallOption) (ChatService_ChatStreamClient, error)
}
type ChatService_ChatStreamClient interface {
Send(*structpb.Struct) error
Recv() (*structpb.Struct, error)
CloseSend() error
grpc.ClientStream
}
type chatServiceClient struct {
cc grpc.ClientConnInterface
}
func NewChatServiceClient(cc grpc.ClientConnInterface) ChatServiceClient {
return &chatServiceClient{cc: cc}
}
type chatServiceChatStreamClient struct {
grpc.ClientStream
}
func (x *chatServiceChatStreamClient) Send(m *structpb.Struct) error {
return x.ClientStream.SendMsg(m)
}
func (x *chatServiceChatStreamClient) Recv() (*structpb.Struct, error) {
m := new(structpb.Struct)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *chatServiceClient) ChatStream(ctx context.Context, opts ...grpc.CallOption) (ChatService_ChatStreamClient, error) {
desc := &ChatService_ServiceDesc.Streams[0]
stream, err := c.cc.NewStream(ctx, desc, "/chat.ChatService/ChatStream", opts...)
if err != nil {
return nil, err
}
return &chatServiceChatStreamClient{ClientStream: stream}, nil
}
// ---- Chat server implementation ----
type chatClient struct {
username string
stream ChatService_ChatStreamServer
sendMu sync.Mutex
}
type chatServer struct {
mu sync.RWMutex
clients map[string]*chatClient
}
func NewChatServer() *chatServer {
return &chatServer{clients: make(map[string]*chatClient)}
}
func getStringField(m *structpb.Struct, key string) string {
if m == nil {
return ""
}
v, ok := m.Fields[key]
if !ok {
return ""
}
return v.GetStringValue()
}
func newMessage(username, content string, t time.Time) (*structpb.Struct, error) {
return structpb.NewStruct(map[string]interface{}{
"username": username,
"content": content,
"timestamp": t.UTC().Format(time.RFC3339Nano),
})
}
func (s *chatServer) broadcast(msg *structpb.Struct, sender string) {
s.mu.RLock()
clients := make([]*chatClient, 0, len(s.clients))
for username, c := range s.clients {
if username == sender {
continue
}
clients = append(clients, c)
}
s.mu.RUnlock()
for _, c := range clients {
c.sendMu.Lock()
err := c.stream.Send(msg)
c.sendMu.Unlock()
if err != nil {
log.Printf("send to %s failed: %v", c.username, err)
}
}
}
func (s *chatServer) ChatStream(stream ChatService_ChatStreamServer) error {
// First message is treated as registration.
regMsg, err := stream.Recv()
if err != nil {
return status.Errorf(codes.InvalidArgument, "failed to receive registration: %v", err)
}
username := getStringField(regMsg, "username")
if username == "" {
return status.Error(codes.InvalidArgument, "username cannot be empty")
}
// Register client.
s.mu.Lock()
if _, exists := s.clients[username]; exists {
s.mu.Unlock()
return status.Errorf(codes.AlreadyExists, "username %q is already taken", username)
}
client := &chatClient{username: username, stream: stream}
s.clients[username] = client
count := len(s.clients)
s.mu.Unlock()
log.Printf("client %s joined (clients=%d)", username, count)
join, _ := newMessage("System", fmt.Sprintf("%s joined", username), time.Now())
s.broadcast(join, username)
defer func() {
s.mu.Lock()
delete(s.clients, username)
count := len(s.clients)
s.mu.Unlock()
leave, _ := newMessage("System", fmt.Sprintf("%s left", username), time.Now())
s.broadcast(leave, username)
log.Printf("client %s left (clients=%d)", username, count)
}()
for {
msg, err := stream.Recv()
if err != nil {
if code := status.Code(err); code == codes.Canceled {
return nil
}
if err == io.EOF {
return nil
}
return err
}
content := getStringField(msg, "content")
if content == "" {
continue
}
out, err := newMessage(username, content, time.Now())
if err != nil {
return status.Errorf(codes.Internal, "failed to build message: %v", err)
}
s.broadcast(out, username)
}
}
func runServer(ctx context.Context, addr string) error {
lis, err := net.Listen("tcp", addr)
if err != nil {
return fmt.Errorf("listen: %w", err)
}
defer lis.Close()
grpcServer := grpc.NewServer()
RegisterChatServiceServer(grpcServer, NewChatServer())
errCh := make(chan error, 1)
go func() { errCh <- grpcServer.Serve(lis) }()
log.Printf("gRPC chat server listening on %s", addr)
select {
case <-ctx.Done():
log.Println("Shutting down...")
grpcServer.GracefulStop()
return nil
case err := <-errCh:
return err
}
}
func runClient(ctx context.Context, addr, username string) error {
conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return fmt.Errorf("dial: %w", err)
}
defer conn.Close()
client := NewChatServiceClient(conn)
stream, err := client.ChatStream(ctx)
if err != nil {
return fmt.Errorf("create stream: %w", err)
}
reg, err := newMessage(username, "", time.Now())
if err != nil {
return fmt.Errorf("build registration: %w", err)
}
if err := stream.Send(reg); err != nil {
return fmt.Errorf("register: %w", err)
}
done := make(chan struct{})
go func() {
defer close(done)
for {
msg, err := stream.Recv()
if err != nil {
if err == io.EOF {
return
}
log.Printf("recv error: %v", err)
return
}
fmt.Printf("[%s] %s: %s\n", getStringField(msg, "timestamp"), getStringField(msg, "username"), getStringField(msg, "content"))
}
}()
scanner := bufio.NewScanner(os.Stdin)
fmt.Println("Type messages and press Enter (type 'exit' to quit)")
for {
select {
case <-ctx.Done():
_ = stream.CloseSend()
<-done
return nil
default:
}
if !scanner.Scan() {
_ = stream.CloseSend()
<-done
return scanner.Err()
}
text := scanner.Text()
if text == "exit" {
_ = stream.CloseSend()
<-done
return nil
}
msg, err := newMessage(username, text, time.Now())
if err != nil {
return err
}
if err := stream.Send(msg); err != nil {
return fmt.Errorf("send: %w", err)
}
}
}
func main() {
var (
mode string
addr string
username string
)
flag.StringVar(&mode, "mode", "server", "server or client")
flag.StringVar(&addr, "addr", "127.0.0.1:50051", "listen/dial address")
flag.StringVar(&username, "username", "", "username (client only)")
flag.Parse()
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
switch mode {
case "server":
if err := runServer(ctx, addr); err != nil {
log.Fatal(err)
}
case "client":
if username == "" {
log.Fatal("-username is required in client mode")
}
if err := runClient(ctx, addr, username); err != nil {
log.Fatal(err)
}
default:
log.Fatalf("unknown -mode: %s (use server or client)", mode)
}
}
How It Works
Bidirectional gRPC chat example where clients stream messages to the server and receive broadcast updates in real time.
Server keeps a registry of connected clients, listens to each stream for incoming messages, rebroadcasts to others, and cleans up on disconnect; the client opens a stream, sends messages, and reads responses concurrently.
Key Concepts
- 1Streaming RPC demonstrates server-side fan-out without polling.
- 2Client registry handles joins and leaves to avoid sending to dead streams.
- 3Context-aware loops exit cleanly on cancellation or errors.
When to Use This Pattern
- Chat or messaging backends over gRPC.
- Live collaboration features like document presence.
- Telemetry streams where both sides push updates.
Best Practices
- Use channel buffering carefully so slow clients do not block broadcasters.
- Propagate user identity or metadata in stream context for auditing.
- Handle io.EOF and context errors separately for clean shutdown.
Go Version1.17
Difficultyadvanced
Production ReadyYes
Lines of Code384