WebSocket Pub/Sub Server with Room Support
Room-based WebSocket broker that manages subscribers per room, broadcasts messages, and tracks presence with ping/pong heartbeats.
main.go
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
// Timing and size limits applied to every client connection.
const (
writeWait = 10 * time.Second // Write deadline set before each outbound frame in writePump
pongWait = 60 * time.Second // Read deadline; re-armed in readPump each time a pong arrives
pingPeriod = (pongWait * 9) / 10 // Ping interval; shorter than pongWait so a pong can arrive before the read deadline expires
maxMessageSize = 512 // Read limit handed to conn.SetReadLimit for inbound messages
)
// upgrader turns incoming HTTP requests into WebSocket connections, using
// 1024-byte read and write buffers.
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// SECURITY: every Origin is accepted, so any web page can open a socket to
// this server from a visitor's browser. Fine for a demo only; compare the
// request's Origin header against an allow-list before deploying.
return true
},
}
// Client is one WebSocket connection subscribed to a single Room.
type Client struct {
room *Room // Room this client registers with, broadcasts to, and unregisters from
conn *websocket.Conn // Underlying WebSocket connection; read by readPump, written by writePump
send chan []byte // Outbound queue: filled by Room.run, drained by writePump, closed by Room.run when the client is removed
username string // Name taken from the "username" query parameter in serveWs
}
// Room is a named group of clients. Its run loop receives from the register,
// unregister, and broadcast channels and fans messages out to the clients.
type Room struct {
name string // Room name; also the key under which the Hub stores this room
// Clients currently in the room (the map is used as a set).
clients map[*Client]bool
// Serialized messages waiting to be fanned out to every client.
broadcast chan []byte
// Clients asking to join the room.
register chan *Client
// Clients asking to leave the room.
unregister chan *Client
// mu is taken by run around its accesses to clients and users.
mu sync.RWMutex
users map[string]bool // Usernames present in the room. NOTE(review): keyed by name only, so two connections that share a username share one entry.
}
// Hub owns every Room, indexed by room name.
type Hub struct {
rooms map[string]*Room // Room name -> Room; entries are added by GetRoom
mu sync.RWMutex // Guards rooms; GetRoom takes the write lock for its whole lookup-or-create
}
// NewHub returns a Hub with an initialized, empty room table, ready for
// GetRoom to populate.
func NewHub() *Hub {
	h := new(Hub)
	h.rooms = map[string]*Room{}
	return h
}
// GetRoom returns the room stored under roomName. The first request for a
// name builds the room, stores it in the hub, and starts its run loop in a
// new goroutine; later requests return that same room.
//
// The hub lock is held for the whole lookup-or-create, so concurrent callers
// asking for the same name all receive one room.
func (h *Hub) GetRoom(roomName string) *Room {
	h.mu.Lock()
	defer h.mu.Unlock()

	existing, found := h.rooms[roomName]
	if found {
		return existing
	}

	// All three channels are unbuffered: a sender waits until the room's
	// run loop picks the value up.
	created := new(Room)
	created.name = roomName
	created.clients = map[*Client]bool{}
	created.users = map[string]bool{}
	created.broadcast = make(chan []byte)
	created.register = make(chan *Client)
	created.unregister = make(chan *Client)

	h.rooms[roomName] = created
	go created.run()
	return created
}
// run is the room's event loop. GetRoom starts it once per room and it never
// returns. Each iteration handles exactly one event:
//
//   - register: add the client, then announce the join to everyone in the
//     room, the new client included.
//   - unregister: remove the client, close its send channel, then announce
//     the departure to the clients that remain. A client that is no longer in
//     the room is ignored, so its send channel is never closed twice.
//   - broadcast: fan the message out to every client.
//
// Presence notices are written straight to the clients' send queues. They
// must not be sent on r.broadcast from here: that channel is unbuffered and
// this loop is its only receiver, so such a send would block the room
// forever.
//
// Every case takes the write lock, because delivering a message may remove
// clients whose queues are full.
func (r *Room) run() {
	for {
		select {
		case client := <-r.register:
			r.mu.Lock()
			r.clients[client] = true
			r.users[client.username] = true
			r.announceLocked(client.username, "joined")
			r.mu.Unlock()

		case client := <-r.unregister:
			r.mu.Lock()
			if _, ok := r.clients[client]; ok {
				delete(r.clients, client)
				delete(r.users, client.username)
				close(client.send)
				r.announceLocked(client.username, "left")
			}
			r.mu.Unlock()

		case message := <-r.broadcast:
			r.mu.Lock()
			r.deliverLocked(message)
			r.mu.Unlock()
		}
	}
}

// announceLocked builds a "presence" message saying that username joined or
// left (action is "joined" or "left") and delivers it to every client in the
// room. The caller must hold r.mu for writing.
func (r *Room) announceLocked(username, action string) {
	notice, err := json.Marshal(Message{
		Type:      "presence",
		Username:  username,
		Content:   fmt.Sprintf("%s %s the room", username, action),
		Room:      r.name,
		Timestamp: time.Now().Unix(),
	})
	if err != nil {
		log.Printf("room %q: encoding presence notice for %q: %v", r.name, username, err)
		return
	}
	r.deliverLocked(notice)
}

// deliverLocked queues message for every client without blocking. A client
// whose send queue is full is treated as too slow: its channel is closed
// (which makes its writePump shut the connection down) and it is removed from
// both the client set and the presence table. The caller must hold r.mu for
// writing.
func (r *Room) deliverLocked(message []byte) {
	for client := range r.clients {
		select {
		case client.send <- message:
		default:
			close(client.send)
			delete(r.clients, client)
			delete(r.users, client.username)
		}
	}
}
// Message is the JSON envelope exchanged over a room's sockets. For inbound
// messages readPump overwrites Username, Room, and Timestamp before
// rebroadcasting, so only Type and Content come from the sender.
type Message struct {
Type string `json:"type"` // "chat", "presence", "ping"
Username string `json:"username"` // Sender's username (set by the server)
Content string `json:"content"` // Message text
Room string `json:"room"` // Name of the room the message belongs to (set by the server)
Timestamp int64 `json:"timestamp"` // Unix time in seconds (set by the server)
}
// readPump forwards messages from this client's socket to its room until a
// read fails. On exit it unregisters the client from the room and then
// closes the connection.
//
// Each inbound frame is decoded as a Message, stamped with the server's view
// of the sender (username, room name, current Unix time), re-encoded, and
// handed to the room's broadcast channel. Frames that fail to decode or
// re-encode are logged and skipped.
func (c *Client) readPump() {
	// Deferred calls run last-in-first-out: leave the room first, then
	// close the socket.
	defer c.conn.Close()
	defer func() { c.room.unregister <- c }()

	// The read deadline is pushed out by pongWait now and again on every
	// pong, so a peer that stops answering pings makes ReadMessage fail.
	extendReadDeadline := func() {
		c.conn.SetReadDeadline(time.Now().Add(pongWait))
	}

	c.conn.SetReadLimit(maxMessageSize)
	extendReadDeadline()
	c.conn.SetPongHandler(func(string) error {
		extendReadDeadline()
		return nil
	})

	for {
		_, raw, readErr := c.conn.ReadMessage()
		if readErr != nil {
			// Going-away and abnormal closures are expected ways for a
			// peer to disappear; only other close errors are logged.
			unexpected := websocket.IsUnexpectedCloseError(readErr, websocket.CloseGoingAway, websocket.CloseAbnormalClosure)
			if unexpected {
				log.Printf("error: %v", readErr)
			}
			return
		}

		var incoming Message
		if parseErr := json.Unmarshal(raw, &incoming); parseErr != nil {
			log.Printf("failed to parse message: %v", parseErr)
			continue
		}

		// Never trust sender-supplied identity or time.
		incoming.Username = c.username
		incoming.Room = c.room.name
		incoming.Timestamp = time.Now().Unix()

		payload, encodeErr := json.Marshal(incoming)
		if encodeErr != nil {
			log.Printf("failed to marshal message: %v", encodeErr)
			continue
		}

		c.room.broadcast <- payload
	}
}
// writePump drains the client's send queue onto its socket and pings the peer
// every pingPeriod. It returns, stopping the ticker and closing the
// connection, when the queue is closed by the room or when a write fails.
// Every write is given a deadline of writeWait.
func (c *Client) writePump() {
	heartbeat := time.NewTicker(pingPeriod)
	// Deferred calls run last-in-first-out: stop the ticker, then close
	// the socket.
	defer c.conn.Close()
	defer heartbeat.Stop()

	armWriteDeadline := func() {
		c.conn.SetWriteDeadline(time.Now().Add(writeWait))
	}

	for {
		select {
		case <-heartbeat.C:
			armWriteDeadline()
			if c.conn.WriteMessage(websocket.PingMessage, nil) != nil {
				return
			}

		case outbound, open := <-c.send:
			armWriteDeadline()
			if !open {
				// The room closed the queue: say goodbye and stop.
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			frame, err := c.conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			// The Write result is not inspected; only NextWriter and
			// Close errors end the pump.
			frame.Write(outbound)
			if frame.Close() != nil {
				return
			}
		}
	}
}
// serveWs joins the caller to a room over a WebSocket. It requires the "room"
// and "username" query parameters and answers 400 Bad Request when either is
// missing. Otherwise it upgrades the connection, looks the room up in hub
// (creating it if needed), registers a new client with a 256-message send
// queue, and starts the client's write and read pumps.
//
// An upgrade failure is logged and ends the request without touching the hub.
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	roomName, username := query.Get("room"), query.Get("username")
	if roomName == "" || username == "" {
		http.Error(w, "room and username query parameters are required", http.StatusBadRequest)
		return
	}

	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}

	room := hub.GetRoom(roomName)
	client := &Client{
		username: username,
		room:     room,
		conn:     conn,
		send:     make(chan []byte, 256),
	}

	// Registration is a blocking hand-off to the room's loop; the pumps
	// start only after the room has accepted the client.
	room.register <- client

	go client.writePump()
	go client.readPump()
}
// serveHome logs the request URL and serves home.html from the working
// directory for GET requests to "/". Any other path gets 404 Not Found; any
// other method on "/" gets 405 Method Not Allowed. The path is checked before
// the method.
func serveHome(w http.ResponseWriter, r *http.Request) {
	log.Println(r.URL)

	switch {
	case r.URL.Path != "/":
		http.Error(w, "Not found", http.StatusNotFound)
	case r.Method != http.MethodGet:
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
	default:
		http.ServeFile(w, r, "home.html")
	}
}
// main creates the hub, registers the two HTTP routes on the default mux, and
// serves on :8080. It ends the process through log.Fatal when the listener
// fails.
func main() {
hub := NewHub()
// "/" serves the HTML client page; "/ws" upgrades to a WebSocket and joins a room.
http.HandleFunc("/", serveHome)
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
serveWs(hub, w, r)
})
// ListenAndServe blocks while the server runs; the nil handler selects the default mux.
log.Println("WebSocket Pub/Sub server starting on :8080")
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal("ListenAndServe: ", err)
}
}

How It Works
Room-based WebSocket broker that manages subscribers per room, broadcasts messages, and tracks presence with ping/pong heartbeats.
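The Hub maps room names to rooms and creates each room on first use. Every room runs its own goroutine, which registers joins and leaves via channels, receives incoming messages, and distributes them to all clients in that room. Per-client write pumps send messages with write deadlines, and the room drops any client whose outbound queue is full.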
Key Concepts
1. Room abstraction isolates broadcasts to interested subscribers.
2. Channels serialize join, leave, and message events to avoid race conditions.
3. Heartbeats detect dead connections and trigger cleanup.
When to Use This Pattern
- Chat rooms or collaborative groups.
- Realtime dashboards segmented by tenant or team.
- Live event streams scoped to topics.
Best Practices
- Cap message queue lengths to avoid memory blowups.
- Handle slow clients by dropping or disconnecting to protect the hub.
- Validate room IDs and authenticate before joining.
Go Version: 1.19
Difficulty: advanced
Production Ready: Yes
Lines of Code: 322