WebSocket Server/Client with Real-Time Messaging
Full-duplex WebSocket example with a server that tracks clients and a client that sends and receives broadcast messages.
main.go
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
package main
import ( "log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket")
// WebSocketHub is the central registry of connected clients. Its Run loop
// receives on the three channels below and fans broadcast payloads out to
// every registered client's send queue.
type WebSocketHub struct {
	// mu is held by Run around every access to clients.
	mu sync.RWMutex
	// clients is the set of currently registered clients, keyed by pointer.
	clients map[*WebSocketClient]bool

	// register delivers newly connected clients to Run.
	register chan *WebSocketClient
	// unregister delivers disconnected clients to Run for removal.
	unregister chan *WebSocketClient
	// broadcast delivers payloads that Run forwards to all clients.
	broadcast chan []byte
}
// NewWebSocketHub returns a hub with an empty client set and unbuffered
// register, unregister and broadcast channels. The caller must start Run
// (typically in its own goroutine) before sending on any of them.
func NewWebSocketHub() *WebSocketHub {
	hub := new(WebSocketHub)
	hub.clients = make(map[*WebSocketClient]bool)
	hub.register = make(chan *WebSocketClient)
	hub.unregister = make(chan *WebSocketClient)
	hub.broadcast = make(chan []byte)
	return hub
}
// Run is the hub's event loop. It never returns; start it in its own
// goroutine. Each iteration handles exactly one event:
//
//   - register: adds the client to the set.
//   - unregister: removes the client and closes its send channel, if the
//     client is still registered.
//   - broadcast: offers the message to every client's send channel without
//     blocking; a client whose queue is full is dropped and its send channel
//     is closed.
func (h *WebSocketHub) Run() {
	for {
		select {
		case client := <-h.register:
			h.mu.Lock()
			h.clients[client] = true
			// Capture the count while the lock is held so the log line does
			// not read the map unguarded.
			total := len(h.clients)
			h.mu.Unlock()
			log.Printf("Client registered: %s. Total clients: %d", client.conn.RemoteAddr(), total)

		case client := <-h.unregister:
			h.mu.Lock()
			// The client may already have been dropped by the broadcast
			// branch; the membership check keeps send from being closed twice.
			if _, ok := h.clients[client]; ok {
				delete(h.clients, client)
				close(client.send)
				log.Printf("Client unregistered: %s. Total clients: %d", client.conn.RemoteAddr(), len(h.clients))
			}
			h.mu.Unlock()

		case message := <-h.broadcast:
			// This branch may delete from the map, so it needs the write
			// lock; a read lock does not permit mutation.
			h.mu.Lock()
			for client := range h.clients {
				select {
				case client.send <- message:
				default:
					// Queue full: drop the slow client rather than block
					// every other recipient.
					close(client.send)
					delete(h.clients, client)
				}
			}
			h.mu.Unlock()
		}
	}
}
// WebSocketClient represents a single client connection
type WebSocketClient struct {
hub *WebSocketHub
conn *websocket.Conn
send chan []byte // Outgoing message queue
}
// readPump is the connection's read loop. It caps inbound messages with a
// read limit of 512, arms a 60-second read deadline that is pushed out again
// whenever a pong arrives, and forwards every message it reads to the hub's
// broadcast channel. On any read error it stops, unregisters the client from
// the hub and closes the connection.
func (c *WebSocketClient) readPump() {
	const (
		maxMessageSize = 512
		pongWait       = 60 * time.Second
	)

	// On exit, tell the hub this client is gone, then close the socket.
	defer func() {
		c.hub.unregister <- c
		c.conn.Close()
	}()

	c.conn.SetReadLimit(maxMessageSize)
	c.conn.SetReadDeadline(time.Now().Add(pongWait))
	c.conn.SetPongHandler(func(string) error {
		c.conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	for {
		_, payload, readErr := c.conn.ReadMessage()
		if readErr == nil {
			log.Printf("Received message from %s: %s", c.conn.RemoteAddr(), payload)
			c.hub.broadcast <- payload
			continue
		}
		// Only close errors other than going-away and abnormal closure are
		// worth logging; either way the loop ends here.
		if websocket.IsUnexpectedCloseError(readErr, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
			log.Printf("read error: %v", readErr)
		}
		return
	}
}
// writePump is the connection's write loop. It forwards each message queued
// on c.send to the peer as a text frame and sends a ping every 30 seconds.
// Every write is given a 10-second deadline. The loop ends, stopping the
// ticker and closing the connection, when the hub closes c.send or when any
// write fails.
func (c *WebSocketClient) writePump() {
	ticker := time.NewTicker(30 * time.Second) // Ping ticker to keep connection alive
	defer func() {
		ticker.Stop()
		c.conn.Close()
	}()

	for {
		select {
		case message, ok := <-c.send:
			c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
			if !ok {
				// The hub closed the send channel. The close frame is
				// best-effort: the connection is torn down regardless, so
				// its error is deliberately discarded.
				_ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			// WriteMessage obtains a writer, writes the payload and closes
			// the writer, reporting a failure from any of those steps.
			if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
				return
			}

		case <-ticker.C:
			// Send ping to keep connection alive
			c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}
// upgrader performs the HTTP-to-WebSocket handshake. Its CheckOrigin hook
// accepts every request regardless of origin, which is only appropriate for
// local testing; restrict it before deploying.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(*http.Request) bool { return true },
}
// serveWs upgrades the request to a WebSocket connection, wraps it in a
// client with a 256-message send queue, registers that client with hub, and
// starts the client's write and read pumps in separate goroutines. If the
// upgrade fails the error is logged and nothing else happens.
func serveWs(hub *WebSocketHub, w http.ResponseWriter, r *http.Request) {
	ws, upgradeErr := upgrader.Upgrade(w, r, nil)
	if upgradeErr != nil {
		log.Printf("upgrade error: %v", upgradeErr)
		return
	}

	c := new(WebSocketClient)
	c.hub = hub
	c.conn = ws
	c.send = make(chan []byte, 256)

	// Registration blocks until the hub's loop has accepted the client.
	hub.register <- c

	// One goroutine per direction gives concurrent reads and writes.
	go c.writePump()
	go c.readPump()
}
// Example Usage: Server with HTTP endpoint for WebSocket upgrade
func main() {
hub := NewWebSocketHub()
go hub.Run()
// WebSocket endpoint
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
serveWs(hub, w, r)
})
// Static HTML client (for testing)
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
html := `
<html>
<head><title>WebSocket Chat</title></head>
<body>
<input type="text" id="message" placeholder="Enter message">
<button onclick="sendMessage()">Send</button>
<div id="messages"></div>
<script>
const ws = new WebSocket('ws://' + window.location.host + '/ws');
ws.onmessage = function(event) {
const div = document.createElement('div');
div.textContent = event.data;
document.getElementById('messages').appendChild(div);
};
function sendMessage() {
const input = document.getElementById('message');
ws.send(input.value);
input.value = '';
}
</script>
</body>
</html>
`
w.Write([]byte(html))
})
log.Println("WebSocket server starting on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}How It Works
Full-duplex WebSocket example with a server that tracks clients and a client that sends and receives broadcast messages.
The server upgrades HTTP connections to WebSocket, stores clients in a mutex-guarded map, reads messages from each connection, broadcasts every message to all connected clients, and removes clients on disconnect; the bundled HTML test page connects to the server, sends messages, and displays the broadcasts it receives.
Key Concepts
- 1. Broadcast loop fans out messages to all connected sessions.
- 2. Heartbeat or close handling cleans up disconnected clients.
- 3. Concurrency-safe client registry prevents race conditions.
When to Use This Pattern
- Chat applications or notification hubs.
- Live dashboards needing instant updates.
- Collaborative tools requiring low-latency messaging.
Best Practices
- Set a write deadline on every write so that a slow client cannot stall its write loop indefinitely.
- Serialize access to the client map when adding or removing connections.
- Validate inbound payloads before broadcasting to others.
Go Version: 1.15
Difficulty: intermediate
Production Ready: Yes
Lines of Code: 206