main.go
package main

import (
	"fmt"
	"log"
	"sync"
	"time"
)

// Topic is a small in-process publish/subscribe hub for string messages.
// Every subscriber owns one buffered channel. Publish never blocks: when a
// subscriber's buffer is full, that subscriber misses the message.
//
// The zero value is ready to use. Because it embeds a sync.RWMutex, a Topic
// must not be copied after first use; pass it around as *Topic.
type Topic struct {
	mu   sync.RWMutex        // guards next and subs
	next int                 // most recently issued subscriber id
	subs map[int]chan string // live subscriber channels keyed by id
}

// NewTopic returns an empty Topic with its subscriber map already allocated.
func NewTopic() *Topic {
	return &Topic{subs: make(map[int]chan string)}
}

// Subscribe registers a new subscriber and returns its receive-only channel
// together with a function that removes the subscription.
//
// buffer is the channel capacity; negative values are treated as 0. With a
// capacity of 0 a message is delivered only if the subscriber is already
// blocked in a receive when Publish runs.
//
// The returned unsubscribe function removes the subscriber and closes its
// channel. It is safe to call more than once; calls after the first do
// nothing.
func (t *Topic) Subscribe(buffer int) (ch <-chan string, unsubscribe func()) {
	// make panics on a negative capacity; fall back to an unbuffered channel.
	if buffer < 0 {
		buffer = 0
	}
	c := make(chan string, buffer)

	t.mu.Lock()
	defer t.mu.Unlock()

	// Allocate lazily so a zero-value Topic (not built via NewTopic) works.
	if t.subs == nil {
		t.subs = make(map[int]chan string)
	}
	t.next++
	id := t.next
	t.subs[id] = c

	unsubscribe = func() {
		t.mu.Lock()
		defer t.mu.Unlock()
		// The map lookup doubles as the "already unsubscribed" check, so the
		// channel is closed exactly once.
		if c2, ok := t.subs[id]; ok {
			delete(t.subs, id)
			close(c2)
		}
	}
	return c, unsubscribe
}

// Publish offers msg to every current subscriber without blocking. A
// subscriber whose channel cannot accept the message right now is skipped, so
// delivery is best-effort.
func (t *Topic) Publish(msg string) {
	// The read lock lets publishers run concurrently while excluding
	// unsubscribe, which closes channels under the write lock; a send can
	// therefore never hit a closed channel.
	t.mu.RLock()
	defer t.mu.RUnlock()
	for _, c := range t.subs {
		select {
		case c <- msg:
		default:
			// Subscriber is not ready: drop the message for this subscriber.
		}
	}
}

// main demonstrates the topic with one deliberately slow subscriber: messages
// are published every 100ms while the consumer needs 300ms per message and has
// a buffer of 2, so some messages are expected to be dropped.
func main() {
	topic := NewTopic()
	ch, unsub := topic.Subscribe(2)

	// done is closed when the consumer goroutine has drained ch and returned,
	// so main can wait for it instead of exiting underneath it.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for msg := range ch {
			log.Printf("got: %s", msg)
			time.Sleep(300 * time.Millisecond)
		}
	}()

	for i := 1; i <= 10; i++ {
		topic.Publish(fmt.Sprintf("msg-%d", i))
		time.Sleep(100 * time.Millisecond)
	}

	// Closing the subscription ends the consumer's range loop once the
	// messages still buffered have been logged; wait for that before exiting.
	unsub()
	<-done
}

How It Works

Lightweight in-process pub/sub topic with subscriber management, non-blocking publishes, and explicit unsubscribe support.

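Topic keeps a map from subscriber IDs to channels. Publish iterates over that map and sends with a non-blocking select, so a message is dropped for any subscriber that is too slow to accept it. Unsubscribe removes the subscriber's entry and closes its channel. Cleanup is done through the unsubscribe function returned by Subscribe; this listing does not define a separate stop function for the whole topic.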

Key Concepts

  • Non-blocking sends prevent one slow consumer from blocking others.
  • Unsubscribe support avoids leaks when consumers leave.
  • The backpressure strategy of dropping messages when a subscriber is busy is explicit.

When to Use This Pattern

  • Decoupling components inside a single service.
  • Testing event-driven flows without external brokers.
  • UI or CLI event buses for progress updates.

Best Practices

  • Document drop behavior so consumers know they may miss events.
  • Buffer subscriber channels based on expected throughput.
  • Close topics cleanly to avoid goroutine leaks.
Go Version: 1.18+
Difficulty: intermediate
Production Ready: Yes
Lines of Code: 67