main.go

package main

import (	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket")

// WebSocketHub manages active connections and broadcasts messages to all clients
type WebSocketHub struct {
	clients    map[*WebSocketClient]bool // Connected clients
	broadcast  chan []byte              // Broadcast message queue
	register   chan *WebSocketClient    // Register new clients
	unregister chan *WebSocketClient    // Unregister disconnected clients
	mu         sync.RWMutex
}

// NewWebSocketHub creates a new hub instance
func NewWebSocketHub() *WebSocketHub {
	return &WebSocketHub{
		broadcast:  make(chan []byte),
		register:   make(chan *WebSocketClient),
		unregister: make(chan *WebSocketClient),
		clients:    make(map[*WebSocketClient]bool),
	}
}

// Run starts the hub's message loop
func (h *WebSocketHub) Run() {
	for {
		select {
		case client := <-h.register:
			h.mu.Lock()
			h.clients[client] = true
			h.mu.Unlock()
			log.Printf("Client registered: %s. Total clients: %d", client.conn.RemoteAddr(), len(h.clients))

		case client := <-h.unregister:
			h.mu.Lock()
			if _, ok := h.clients[client]; ok {
				delete(h.clients, client)
				close(client.send)
				log.Printf("Client unregistered: %s. Total clients: %d", client.conn.RemoteAddr(), len(h.clients))
			}
			h.mu.Unlock()

		case message := <-h.broadcast:
			h.mu.RLock()
			for client := range h.clients {
				select {
				case client.send <- message:
				default:
					close(client.send)
					delete(h.clients, client)
				}
			}
			h.mu.RUnlock()
		}
	}
}

// WebSocketClient represents a single client connection
type WebSocketClient struct {
	hub  *WebSocketHub
	conn *websocket.Conn
	send chan []byte // Outgoing message queue
}

// readPump reads messages from the client and forwards to the hub for broadcast
func (c *WebSocketClient) readPump() {
	defer func() {
		c.hub.unregister <- c
		c.conn.Close()
	}()

	// Configure read limit and close handling
	c.conn.SetReadLimit(512)
	c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
	c.conn.SetPongHandler(func(string) error {
		c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
		return nil
	})

	for {
		_, message, err := c.conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("read error: %v", err)
			}
			break
		}
		log.Printf("Received message from %s: %s", c.conn.RemoteAddr(), message)
		c.hub.broadcast <- message
	}
}

// writePump sends messages from the hub to the client
func (c *WebSocketClient) writePump() {
	ticker := time.NewTicker(30 * time.Second) // Ping ticker to keep connection alive
	defer func() {
		ticker.Stop()
		c.conn.Close()
	}()

	for {
		select {
		case message, ok := <-c.send:
			c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
			if !ok {
				// Hub closed the send channel
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			// Write message to client
			w, err := c.conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			w.Write(message)

			// Flush and close writer
			if err := w.Close(); err != nil {
				return
			}

		case <-ticker.C:
			// Send ping to keep connection alive
			c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}

// upgrader converts HTTP connections to WebSocket connections
var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		// Allow all origins (restrict in production!)
		return true
	},
}

// serveWs handles WebSocket upgrade requests
func serveWs(hub *WebSocketHub, w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Printf("upgrade error: %v", err)
		return
	}

	client := &WebSocketClient{
		hub:  hub,
		conn: conn,
		send: make(chan []byte, 256),
	}
	client.hub.register <- client

	// Run read/write pumps in goroutines (concurrent bidirectional communication)
	go client.writePump()
	go client.readPump()
}

// Example Usage: Server with HTTP endpoint for WebSocket upgrade
func main() {
	hub := NewWebSocketHub()
	go hub.Run()

	// WebSocket endpoint
	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		serveWs(hub, w, r)
	})

	// Static HTML client (for testing)
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		html := `
			<html>
			<head><title>WebSocket Chat</title></head>
			<body>
				<input type="text" id="message" placeholder="Enter message">
				<button onclick="sendMessage()">Send</button>
				<div id="messages"></div>
				<script>
					const ws = new WebSocket('ws://' + window.location.host + '/ws');
					ws.onmessage = function(event) {
						const div = document.createElement('div');
						div.textContent = event.data;
						document.getElementById('messages').appendChild(div);
					};
					function sendMessage() {
						const input = document.getElementById('message');
						ws.send(input.value);
						input.value = '';
					}
				</script>
			</body>
			</html>
		`
		w.Write([]byte(html))
	})

	log.Println("WebSocket server starting on :8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}

How It Works

Full-duplex WebSocket example with a server that tracks clients and a client that sends and receives broadcast messages.

Upgrades HTTP connections, stores clients in a concurrency-safe map, reads messages per connection, broadcasts to all peers, and removes clients on disconnect; the example client dials the server, sends messages, and prints responses.

Key Concepts

  • 1Broadcast loop fans out messages to all connected sessions.
  • 2Heartbeat or close handling cleans up disconnected clients.
  • 3Concurrency-safe client registry prevents race conditions.

When to Use This Pattern

  • Chat applications or notification hubs.
  • Live dashboards needing instant updates.
  • Collaborative tools requiring low-latency messaging.

Best Practices

  • Check write deadlines to avoid slow client backpressure.
  • Serialize access to the client map when adding or removing connections.
  • Validate inbound payloads before broadcasting to others.
Go Version1.15
Difficultyintermediate
Production ReadyYes
Lines of Code206