main.go
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
// chat.proto (reference)
//
// syntax = "proto3";
// package chat;
// option go_package = "example.com/project/chatpb";
//
// message ChatMessage {
//   string username = 1;
//   string content = 2;
//   string timestamp = 3;
// }
//
// service ChatService {
//   rpc ChatStream(stream ChatMessage) returns (stream ChatMessage);
// }
//
// Notes:
// - This snippet is runnable as a single file.
// - It avoids `protoc` by using `structpb.Struct` as the streaming message payload.
// - The first message a client sends is treated as registration (uses the `username` field).

package main

import (
	"bufio"
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/structpb"
)

// ---- Manual gRPC service definition (no generated stubs) ----

type ChatServiceServer interface {
	ChatStream(ChatService_ChatStreamServer) error
}

type ChatService_ChatStreamServer interface {
	Send(*structpb.Struct) error
	Recv() (*structpb.Struct, error)
	grpc.ServerStream
}

type chatServiceChatStreamServer struct {
	grpc.ServerStream
}

func (x *chatServiceChatStreamServer) Send(m *structpb.Struct) error {
	return x.ServerStream.SendMsg(m)
}

func (x *chatServiceChatStreamServer) Recv() (*structpb.Struct, error) {
	m := new(structpb.Struct)
	if err := x.ServerStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

func RegisterChatServiceServer(s grpc.ServiceRegistrar, srv ChatServiceServer) {
	s.RegisterService(&ChatService_ServiceDesc, srv)
}

var ChatService_ServiceDesc = grpc.ServiceDesc{
	ServiceName: "chat.ChatService",
	HandlerType: (*ChatServiceServer)(nil),
	Methods:     []grpc.MethodDesc{},
	Streams: []grpc.StreamDesc{
		{
			StreamName:    "ChatStream",
			Handler:       _ChatService_ChatStream_Handler,
			ServerStreams: true,
			ClientStreams: true,
		},
	},
	Metadata: "chat.proto",
}

func _ChatService_ChatStream_Handler(srv interface{}, stream grpc.ServerStream) error {
	return srv.(ChatServiceServer).ChatStream(&chatServiceChatStreamServer{ServerStream: stream})
}

type ChatServiceClient interface {
	ChatStream(ctx context.Context, opts ...grpc.CallOption) (ChatService_ChatStreamClient, error)
}

type ChatService_ChatStreamClient interface {
	Send(*structpb.Struct) error
	Recv() (*structpb.Struct, error)
	CloseSend() error
	grpc.ClientStream
}

type chatServiceClient struct {
	cc grpc.ClientConnInterface
}

func NewChatServiceClient(cc grpc.ClientConnInterface) ChatServiceClient {
	return &chatServiceClient{cc: cc}
}

type chatServiceChatStreamClient struct {
	grpc.ClientStream
}

func (x *chatServiceChatStreamClient) Send(m *structpb.Struct) error {
	return x.ClientStream.SendMsg(m)
}

func (x *chatServiceChatStreamClient) Recv() (*structpb.Struct, error) {
	m := new(structpb.Struct)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

func (c *chatServiceClient) ChatStream(ctx context.Context, opts ...grpc.CallOption) (ChatService_ChatStreamClient, error) {
	desc := &ChatService_ServiceDesc.Streams[0]
	stream, err := c.cc.NewStream(ctx, desc, "/chat.ChatService/ChatStream", opts...)
	if err != nil {
		return nil, err
	}
	return &chatServiceChatStreamClient{ClientStream: stream}, nil
}

// ---- Chat server implementation ----

type chatClient struct {
	username string
	stream   ChatService_ChatStreamServer
	sendMu   sync.Mutex
}

type chatServer struct {
	mu      sync.RWMutex
	clients map[string]*chatClient
}

func NewChatServer() *chatServer {
	return &chatServer{clients: make(map[string]*chatClient)}
}

func getStringField(m *structpb.Struct, key string) string {
	if m == nil {
		return ""
	}
	v, ok := m.Fields[key]
	if !ok {
		return ""
	}
	return v.GetStringValue()
}

func newMessage(username, content string, t time.Time) (*structpb.Struct, error) {
	return structpb.NewStruct(map[string]interface{}{
		"username":  username,
		"content":   content,
		"timestamp": t.UTC().Format(time.RFC3339Nano),
	})
}

func (s *chatServer) broadcast(msg *structpb.Struct, sender string) {
	s.mu.RLock()
	clients := make([]*chatClient, 0, len(s.clients))
	for username, c := range s.clients {
		if username == sender {
			continue
		}
		clients = append(clients, c)
	}
	s.mu.RUnlock()

	for _, c := range clients {
		c.sendMu.Lock()
		err := c.stream.Send(msg)
		c.sendMu.Unlock()
		if err != nil {
			log.Printf("send to %s failed: %v", c.username, err)
		}
	}
}

func (s *chatServer) ChatStream(stream ChatService_ChatStreamServer) error {
	// First message is treated as registration.
	regMsg, err := stream.Recv()
	if err != nil {
		return status.Errorf(codes.InvalidArgument, "failed to receive registration: %v", err)
	}
	username := getStringField(regMsg, "username")
	if username == "" {
		return status.Error(codes.InvalidArgument, "username cannot be empty")
	}

	// Register client.
	s.mu.Lock()
	if _, exists := s.clients[username]; exists {
		s.mu.Unlock()
		return status.Errorf(codes.AlreadyExists, "username %q is already taken", username)
	}
	client := &chatClient{username: username, stream: stream}
	s.clients[username] = client
	count := len(s.clients)
	s.mu.Unlock()

	log.Printf("client %s joined (clients=%d)", username, count)

	join, _ := newMessage("System", fmt.Sprintf("%s joined", username), time.Now())
	s.broadcast(join, username)

	defer func() {
		s.mu.Lock()
		delete(s.clients, username)
		count := len(s.clients)
		s.mu.Unlock()

		leave, _ := newMessage("System", fmt.Sprintf("%s left", username), time.Now())
		s.broadcast(leave, username)
		log.Printf("client %s left (clients=%d)", username, count)
	}()

	for {
		msg, err := stream.Recv()
		if err != nil {
			if code := status.Code(err); code == codes.Canceled {
				return nil
			}
			if err == io.EOF {
				return nil
			}
			return err
		}

		content := getStringField(msg, "content")
		if content == "" {
			continue
		}

		out, err := newMessage(username, content, time.Now())
		if err != nil {
			return status.Errorf(codes.Internal, "failed to build message: %v", err)
		}
		s.broadcast(out, username)
	}
}

func runServer(ctx context.Context, addr string) error {
	lis, err := net.Listen("tcp", addr)
	if err != nil {
		return fmt.Errorf("listen: %w", err)
	}
	defer lis.Close()

	grpcServer := grpc.NewServer()
	RegisterChatServiceServer(grpcServer, NewChatServer())

	errCh := make(chan error, 1)
	go func() { errCh <- grpcServer.Serve(lis) }()
	log.Printf("gRPC chat server listening on %s", addr)

	select {
	case <-ctx.Done():
		log.Println("Shutting down...")
		grpcServer.GracefulStop()
		return nil
	case err := <-errCh:
		return err
	}
}

func runClient(ctx context.Context, addr, username string) error {
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return fmt.Errorf("dial: %w", err)
	}
	defer conn.Close()

	client := NewChatServiceClient(conn)
	stream, err := client.ChatStream(ctx)
	if err != nil {
		return fmt.Errorf("create stream: %w", err)
	}

	reg, err := newMessage(username, "", time.Now())
	if err != nil {
		return fmt.Errorf("build registration: %w", err)
	}
	if err := stream.Send(reg); err != nil {
		return fmt.Errorf("register: %w", err)
	}

	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			msg, err := stream.Recv()
			if err != nil {
				if err == io.EOF {
					return
				}
				log.Printf("recv error: %v", err)
				return
			}
			fmt.Printf("[%s] %s: %s\n", getStringField(msg, "timestamp"), getStringField(msg, "username"), getStringField(msg, "content"))
		}
	}()

	scanner := bufio.NewScanner(os.Stdin)
	fmt.Println("Type messages and press Enter (type 'exit' to quit)")
	for {
		select {
		case <-ctx.Done():
			_ = stream.CloseSend()
			<-done
			return nil
		default:
		}

		if !scanner.Scan() {
			_ = stream.CloseSend()
			<-done
			return scanner.Err()
		}
		text := scanner.Text()
		if text == "exit" {
			_ = stream.CloseSend()
			<-done
			return nil
		}
		msg, err := newMessage(username, text, time.Now())
		if err != nil {
			return err
		}
		if err := stream.Send(msg); err != nil {
			return fmt.Errorf("send: %w", err)
		}
	}
}

func main() {
	var (
		mode     string
		addr     string
		username string
	)

	flag.StringVar(&mode, "mode", "server", "server or client")
	flag.StringVar(&addr, "addr", "127.0.0.1:50051", "listen/dial address")
	flag.StringVar(&username, "username", "", "username (client only)")
	flag.Parse()

	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	switch mode {
	case "server":
		if err := runServer(ctx, addr); err != nil {
			log.Fatal(err)
		}
	case "client":
		if username == "" {
			log.Fatal("-username is required in client mode")
		}
		if err := runClient(ctx, addr, username); err != nil {
			log.Fatal(err)
		}
	default:
		log.Fatalf("unknown -mode: %s (use server or client)", mode)
	}
}

How It Works

Bidirectional gRPC chat example where clients stream messages to the server and receive broadcast updates in real time.

Server keeps a registry of connected clients, listens to each stream for incoming messages, rebroadcasts to others, and cleans up on disconnect; the client opens a stream, sends messages, and reads responses concurrently.

Key Concepts

  • 1Streaming RPC demonstrates server-side fan-out without polling.
  • 2Client registry handles joins and leaves to avoid sending to dead streams.
  • 3Context-aware loops exit cleanly on cancellation or errors.

When to Use This Pattern

  • Chat or messaging backends over gRPC.
  • Live collaboration features like document presence.
  • Telemetry streams where both sides push updates.

Best Practices

  • Use channel buffering carefully so slow clients do not block broadcasters.
  • Propagate user identity or metadata in stream context for auditing.
  • Handle io.EOF and context errors separately for clean shutdown.
Go Version1.17
Difficultyadvanced
Production ReadyYes
Lines of Code384