Server-Sent Events (SSE) Broadcaster with Keep-Alives
Server-Sent Events broadcaster that lets clients subscribe, keeps connections alive, and broadcasts messages over HTTP.
main.go
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
package main
import (
	"fmt"
	"log"
	"net/http"
	"strings"
	"sync"
	"time"
)
// lf is the ASCII line feed byte (decimal 10), used to terminate lines in the
// SSE output.
const lf = byte(10)
// Broker is an in-memory registry of subscriber channels. Subscribe adds a
// channel to the registry and Publish offers a message to every registered
// channel.
type Broker struct {
// mu guards next and subs: Subscribe and the unsubscribe callbacks take the
// write lock, Publish takes the read lock.
mu sync.RWMutex
// next is the most recently issued subscriber id; Subscribe increments it
// before assigning, so the first id handed out is 1.
next int
// subs maps a subscriber id to the channel its messages are sent on.
subs map[int]chan string
}
// NewBroker returns a Broker with an initialized, empty subscriber registry.
func NewBroker() *Broker {
	registry := make(map[int]chan string)
	return &Broker{subs: registry}
}
// Subscribe registers a new subscriber whose channel can queue up to buffer
// messages.
//
// It returns the subscriber's id, the receive-only end of its channel, and an
// unsubscribe function. Calling unsubscribe removes the subscriber from the
// broker and closes the channel; calling it again is a no-op because the id
// is no longer registered.
func (b *Broker) Subscribe(buffer int) (id int, ch <-chan string, unsubscribe func()) {
	b.mu.Lock()
	defer b.mu.Unlock()

	// Ids are issued sequentially under the write lock.
	b.next++
	key := b.next

	stream := make(chan string, buffer)
	b.subs[key] = stream

	cancel := func() {
		b.mu.Lock()
		defer b.mu.Unlock()

		registered, found := b.subs[key]
		if !found {
			return
		}
		// Remove and close while holding the write lock so Publish, which
		// sends under the read lock, never sees a closed channel.
		delete(b.subs, key)
		close(registered)
	}

	return key, stream, cancel
}
// Publish offers msg to every current subscriber without blocking. A
// subscriber whose channel cannot accept the message right now simply misses
// it.
func (b *Broker) Publish(msg string) {
	b.mu.RLock()
	defer b.mu.RUnlock()

	for id := range b.subs {
		select {
		case b.subs[id] <- msg:
			// Queued for this subscriber.
		default:
			// Channel is full (or has no ready receiver): skip this
			// subscriber instead of stalling the publisher.
		}
	}
}
// sseLineBreaks rewrites every line terminator the SSE wire format recognizes
// (CRLF, lone CR, LF) to a single LF so a message can be split into lines.
var sseLineBreaks = strings.NewReplacer("\r\n", "\n", "\r", "\n")

// formatSSEMessage renders msg as one complete SSE "message" event.
//
// Each line of msg is emitted as its own "data:" field. Interpolating the raw
// message into a single field would let an embedded CR or LF end the event
// early or inject extra fields. A message without line breaks renders exactly
// as "event: message\ndata: <msg>\n\n".
func formatSSEMessage(msg string) string {
	var sb strings.Builder
	sb.WriteString("event: message")
	sb.WriteByte(lf)
	for _, line := range strings.Split(sseLineBreaks.Replace(msg), "\n") {
		sb.WriteString("data: ")
		sb.WriteString(line)
		sb.WriteByte(lf)
	}
	// A blank line terminates the event.
	sb.WriteByte(lf)
	return sb.String()
}

// sseHandler returns an HTTP handler that streams every message published on
// b to the client as Server-Sent Events.
//
// The handler responds with 500 if the ResponseWriter cannot flush. Otherwise
// it subscribes to b with a 16-message buffer, sends the response headers
// immediately, and then loops until one of the following happens:
//   - the request context is cancelled (client went away or server shut down),
//   - the subscription channel is closed,
//   - a write to the client fails.
//
// While idle it writes a ": ping" comment every 20 seconds as a keep-alive.
// The subscription is always released when the handler returns.
func sseHandler(b *Broker) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}

		hdr := w.Header()
		hdr.Set("Content-Type", "text/event-stream")
		hdr.Set("Cache-Control", "no-cache")
		hdr.Set("Connection", "keep-alive")

		_, ch, unsubscribe := b.Subscribe(16)
		defer unsubscribe()

		// Push the headers out now so the client sees the stream open right
		// away instead of waiting for the first event or keep-alive.
		flusher.Flush()

		keepAlive := time.NewTicker(20 * time.Second)
		defer keepAlive.Stop()

		ctx := r.Context()
		for {
			select {
			case <-ctx.Done():
				return
			case <-keepAlive.C:
				// A failed write means the connection is unusable; stop so
				// the deferred unsubscribe frees the slot.
				if _, err := fmt.Fprintf(w, ": ping%c%c", lf, lf); err != nil {
					return
				}
				flusher.Flush()
			case msg, ok := <-ch:
				if !ok {
					return
				}
				if _, err := fmt.Fprint(w, formatSSEMessage(msg)); err != nil {
					return
				}
				flusher.Flush()
			}
		}
	}
}
// main wires up the demo: it creates a Broker, starts a background publisher,
// registers the SSE handler at /events, and serves HTTP on :8080.
func main() {
b := NewBroker()
// Demo publisher: broadcasts "tick 1", "tick 2", ... every two seconds. The
// loop has no stop condition, so the goroutine runs until the process exits.
go func() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
i := 0
for range ticker.C {
i++
b.Publish(fmt.Sprintf("tick %d", i))
}
}()
mux := http.NewServeMux()
mux.HandleFunc("/events", sseHandler(b))
// Only header reads are bounded. No WriteTimeout is set.
// NOTE(review): presumably deliberate, since a write deadline would cut off
// long-lived event streams — confirm before adding one.
srv := &http.Server{Addr: ":8080", Handler: mux, ReadHeaderTimeout: 5 * time.Second}
log.Printf("SSE server on http://localhost:8080/events")
// Any error returned by ListenAndServe is logged and terminates the process.
if err := srv.ListenAndServe(); err != nil {
log.Fatalf("server failed: %v", err)
}
}How It Works
Server-Sent Events broadcaster that lets clients subscribe, keeps connections alive, and broadcasts messages over HTTP.
Maintains a registry of subscribers, writes SSE-formatted events over chunked responses, sends periodic comments as keep-alives, and removes clients when contexts cancel or writes fail.
Key Concepts
- 1. The SSE format enables unidirectional (server-to-client) streaming over standard HTTP.
- 2. Keep-alive comments prevent proxies from closing idle connections.
- 3. A channel-driven broadcast loop fans out events to all clients.
When to Use This Pattern
- Live feed updates for dashboards or logs.
- Event streaming to browsers without WebSockets.
- Lightweight notifications as a long-polling replacement.
Best Practices
- Flush writers after each event to reduce latency.
- Cap subscriber queues to avoid memory growth.
- Serve over HTTPS to prevent intermediary buffering issues.
Go Version: 1.18+
Difficulty: intermediate
Production Ready: Yes
Lines of Code: 112