In-Memory Pub/Sub with Unsubscribe and Backpressure
Lightweight in-process pub/sub topic with subscriber management, non-blocking publishes, and explicit unsubscribe support.
main.go
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
package main
import (
"fmt"
"log"
"sync"
"time"
)
type Topic struct {
mu sync.RWMutex
next int
subs map[int]chan string
}
func NewTopic() *Topic {
return &Topic{subs: make(map[int]chan string)}
}
func (t *Topic) Subscribe(buffer int) (ch <-chan string, unsubscribe func()) {
t.mu.Lock()
defer t.mu.Unlock()
t.next++
id := t.next
c := make(chan string, buffer)
t.subs[id] = c
unsubscribe = func() {
t.mu.Lock()
if c2, ok := t.subs[id]; ok {
delete(t.subs, id)
close(c2)
}
t.mu.Unlock()
}
return c, unsubscribe
}
func (t *Topic) Publish(msg string) {
t.mu.RLock()
defer t.mu.RUnlock()
for _, c := range t.subs {
select {
case c <- msg:
default:
}
}
}
func main() {
topic := NewTopic()
ch, unsub := topic.Subscribe(2)
defer unsub()
go func() {
for msg := range ch {
log.Printf("got: %s", msg)
time.Sleep(300 * time.Millisecond)
}
}()
for i := 1; i <= 10; i++ {
topic.Publish(fmt.Sprintf("msg-%d", i))
time.Sleep(100 * time.Millisecond)
}
}How It Works
Lightweight in-process pub/sub topic with subscriber management, non-blocking publishes, and explicit unsubscribe support.
Topic keeps a map of subscribers to channels, publish iterates and sends with select to drop messages for slow subscribers, unsubscribe closes channels and removes entries, and a stop function cleans everything up.
Key Concepts
- 1Non-blocking sends prevent one slow consumer from blocking others.
- 2Unsubscribe support avoids leaks when consumers leave.
- 3Backpressure strategy of dropping when busy is explicit.
When to Use This Pattern
- Decoupling components inside a single service.
- Testing event-driven flows without external brokers.
- UI or CLI event buses for progress updates.
Best Practices
- Document drop behavior so consumers know they may miss events.
- Buffer subscriber channels based on expected throughput.
- Close topics cleanly to avoid goroutine leaks.
Go Version1.18+
Difficultyintermediate
Production ReadyYes
Lines of Code67