main.go
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket"
)

// Constants for WebSocket configuration
const (
	writeWait      = 10 * time.Second    // Time allowed to write a message to the peer
	pongWait       = 60 * time.Second    // Time allowed to read the next pong message from the peer
	pingPeriod     = (pongWait * 9) / 10 // Send pings to peer with this period
	maxMessageSize = 512                 // Maximum message size allowed from peer
)

// upgrader converts HTTP connections to WebSocket connections
var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	CheckOrigin: func(r *http.Request) bool {
		// Allow all origins (for demonstration - restrict in production)
		return true
	},
}

// Client represents a single WebSocket connection
type Client struct {
	room    *Room       // Room the client is subscribed to
	conn    *websocket.Conn // The WebSocket connection
	send    chan []byte     // Buffered channel of outbound messages
	username string         // Client's username
}

// Room represents a chat room with multiple clients
type Room struct {
	name string // Unique room name

	// Registered clients
	clients map[*Client]bool

	// Inbound messages from clients
	broadcast chan []byte

	// Register requests from clients
	register chan *Client

	// Unregister requests from clients
	unregister chan *Client

	// Presence tracking
	mu       sync.RWMutex
	users    map[string]bool // Track online users in the room
}

// Hub manages all chat rooms
type Hub struct {
	rooms map[string]*Room // Map of room name to Room
	mu    sync.RWMutex     // For thread-safe room access
}

// NewHub creates a new Hub instance
func NewHub() *Hub {
	return &Hub{
		rooms: make(map[string]*Room),
	}
}

// GetRoom retrieves or creates a room by name
func (h *Hub) GetRoom(roomName string) *Room {
	h.mu.Lock()
	defer h.mu.Unlock()

	if room, ok := h.rooms[roomName]; ok {
		return room
	}

	// Create new room if it doesn't exist
	room := &Room{
		name:       roomName,
		broadcast:  make(chan []byte),
		register:   make(chan *Client),
		unregister: make(chan *Client),
		clients:    make(map[*Client]bool),
		users:      make(map[string]bool),
	}
	h.rooms[roomName] = room

	// Start room handler
	go room.run()

	return room
}

// run handles room operations (register, unregister, broadcast)
func (r *Room) run() {
	for {
		select {
		case client := <-r.register:
			// Register new client
			r.mu.Lock()
			r.clients[client] = true
			r.users[client.username] = true
			r.mu.Unlock()

			// Broadcast join notification
			joinMsg := Message{
				Type:    "presence",
				Username: client.username,
				Content:  fmt.Sprintf("%s joined the room", client.username),
				Room:     r.name,
				Timestamp: time.Now().Unix(),
			}
			data, _ := json.Marshal(joinMsg)
			r.broadcast <- data

		case client := <-r.unregister:
			// Unregister client
			r.mu.Lock()
			if _, ok := r.clients[client]; ok {
				delete(r.clients, client)
				delete(r.users, client.username)
				close(client.send)

				// Broadcast leave notification
				leaveMsg := Message{
					Type:      "presence",
					Username:  client.username,
					Content:   fmt.Sprintf("%s left the room", client.username),
					Room:      r.name,
					Timestamp: time.Now().Unix(),
				}
				data, _ := json.Marshal(leaveMsg)
				r.broadcast <- data
			}
			r.mu.Unlock()

		case message := <-r.broadcast:
			// Broadcast message to all clients
			r.mu.RLock()
			for client := range r.clients {
				select {
				case client.send <- message:
				default:
					close(client.send)
					delete(r.clients, client)
				}
			}
			r.mu.RUnlock()
		}
	}
}

// Message represents a WebSocket message
type Message struct {
	Type      string `json:"type"`      // "chat", "presence", "ping"
	Username  string `json:"username"`  // Sender username
	Content   string `json:"content"`   // Message content
	Room      string `json:"room"`      // Target room
	Timestamp int64  `json:"timestamp"` // Unix timestamp
}

// readPump pumps messages from the WebSocket connection to the room
func (c *Client) readPump() {
	defer func() {
		c.room.unregister <- c
		c.conn.Close()
	}()

	// Configure connection settings
	c.conn.SetReadLimit(maxMessageSize)
	c.conn.SetReadDeadline(time.Now().Add(pongWait))
	c.conn.SetPongHandler(func(string) error {
		c.conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	// Read messages from client
	for {
		_, message, err := c.conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("error: %v", err)
			}
			break
		}

		// Parse message
		var msg Message
		if err := json.Unmarshal(message, &msg); err != nil {
			log.Printf("failed to parse message: %v", err)
			continue
		}

		// Set metadata
		msg.Username = c.username
		msg.Room = c.room.name
		msg.Timestamp = time.Now().Unix()

		// Re-serialize with complete metadata
		data, err := json.Marshal(msg)
		if err != nil {
			log.Printf("failed to marshal message: %v", err)
			continue
		}

		// Send to broadcast channel
		c.room.broadcast <- data
	}
}

// writePump pumps messages from the room to the WebSocket connection
func (c *Client) writePump() {
	// Create ticker for ping messages
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		c.conn.Close()
	}()

	for {
		select {
		case message, ok := <-c.send:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				// Room closed the channel
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			// Write message to WebSocket
			w, err := c.conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			w.Write(message)

			// Close writer
			if err := w.Close(); err != nil {
				return
			}

		case <-ticker.C:
			// Send ping message
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}

// serveWs handles WebSocket requests from clients
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
	// Extract room name and username from query parameters
	roomName := r.URL.Query().Get("room")
	username := r.URL.Query().Get("username")
	if roomName == "" || username == "" {
		http.Error(w, "room and username query parameters are required", http.StatusBadRequest)
		return
	}

	// Upgrade HTTP connection to WebSocket
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}

	// Get or create room
	room := hub.GetRoom(roomName)

	// Create new client
	client := &Client{
		room:    room,
		conn:    conn,
		send:    make(chan []byte, 256),
		username: username,
	}

	// Register client with room
	room.register <- client

	// Start read/write pumps (goroutines)
	go client.writePump()
	go client.readPump()
}

// serveHome serves the HTML client page
func serveHome(w http.ResponseWriter, r *http.Request) {
	log.Println(r.URL)
	if r.URL.Path != "/" {
		http.Error(w, "Not found", http.StatusNotFound)
		return
	}
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	http.ServeFile(w, r, "home.html")
}

func main() {
	hub := NewHub()

	// HTTP routes
	http.HandleFunc("/", serveHome)
	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		serveWs(hub, w, r)
	})

	// Start HTTP server
	log.Println("WebSocket Pub/Sub server starting on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}

How It Works

Room-based WebSocket broker that manages subscribers per room, broadcasts messages, and tracks presence with ping or pong heartbeats.

Hub maintains maps of rooms to clients, registers joins and leaves via channels, listens for incoming messages, and distributes them to all clients in the same room; write pumps send messages with timeouts and remove slow clients.

Key Concepts

  • 1Room abstraction isolates broadcasts to interested subscribers.
  • 2Channels serialize join, leave, and message events to avoid race conditions.
  • 3Heartbeats detect dead connections and trigger cleanup.

When to Use This Pattern

  • Chat rooms or collaborative groups.
  • Realtime dashboards segmented by tenant or team.
  • Live event streams scoped to topics.

Best Practices

  • Cap message queue lengths to avoid memory blowups.
  • Handle slow clients by dropping or disconnecting to protect the hub.
  • Validate room IDs and authenticate before joining.
Go Version1.19
Difficultyadvanced
Production ReadyYes
Lines of Code322