main.go
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
package main

import (
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"
)

const lf = byte(10)

type Broker struct {
	mu   sync.RWMutex
	next int
	subs map[int]chan string
}

func NewBroker() *Broker {
	return &Broker{subs: make(map[int]chan string)}
}

func (b *Broker) Subscribe(buffer int) (id int, ch <-chan string, unsubscribe func()) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.next++
	id = b.next
	c := make(chan string, buffer)
	b.subs[id] = c

	unsubscribe = func() {
		b.mu.Lock()
		if c2, ok := b.subs[id]; ok {
			delete(b.subs, id)
			close(c2)
		}
		b.mu.Unlock()
	}

	return id, c, unsubscribe
}

func (b *Broker) Publish(msg string) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, c := range b.subs {
		select {
		case c <- msg:
		default:
		}
	}
}

func sseHandler(b *Broker) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}

		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")

		_, ch, unsubscribe := b.Subscribe(16)
		defer unsubscribe()

		keepAlive := time.NewTicker(20 * time.Second)
		defer keepAlive.Stop()

		ctx := r.Context()
		for {
			select {
			case <-ctx.Done():
				return
			case <-keepAlive.C:
				fmt.Fprintf(w, ": ping%c%c", lf, lf)
				flusher.Flush()
			case msg, ok := <-ch:
				if !ok {
					return
				}
				fmt.Fprintf(w, "event: message%cdata: %s%c%c", lf, msg, lf, lf)
				flusher.Flush()
			}
		}
	}
}

func main() {
	b := NewBroker()
	go func() {
		ticker := time.NewTicker(2 * time.Second)
		defer ticker.Stop()
		i := 0
		for range ticker.C {
			i++
			b.Publish(fmt.Sprintf("tick %d", i))
		}
	}()

	mux := http.NewServeMux()
	mux.HandleFunc("/events", sseHandler(b))

	srv := &http.Server{Addr: ":8080", Handler: mux, ReadHeaderTimeout: 5 * time.Second}
	log.Printf("SSE server on http://localhost:8080/events")
	if err := srv.ListenAndServe(); err != nil {
		log.Fatalf("server failed: %v", err)
	}
}

How It Works

Server-Sent Events broadcaster that lets clients subscribe, keeps connections alive, and broadcasts messages over HTTP.

Maintains a registry of subscribers, writes SSE-formatted events over chunked responses, sends periodic comments as keep-alives, and removes clients when contexts cancel or writes fail.

Key Concepts

  • 1SSE format enables unidirectional streaming over standard HTTP.
  • 2Keep-alive comments prevent proxies from closing idle connections.
  • 3Channel-driven broadcast loop fans out events to all clients.

When to Use This Pattern

  • Live feed updates for dashboards or logs.
  • Event streaming to browsers without WebSockets.
  • Lightweight notifications as a long-polling replacement.

Best Practices

  • Flush writers after each event to reduce latency.
  • Cap subscriber queues to avoid memory growth.
  • Serve over HTTPS to prevent intermediary buffering issues.
Go Version1.18+
Difficultyintermediate
Production ReadyYes
Lines of Code112