main.go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/go-redis/redis/v8"
)

// Task is a single unit of work. It is serialized to JSON and stored as a
// sorted-set member in Redis, so the whole struct — including retry state —
// round-trips through the queue on every requeue.
type Task struct {
	ID         string                 `json:"id"`   // unique identifier; auto-generated by Enqueue when empty
	Type       string                 `json:"type"` // selects the processor registered via RegisterProcessor
	Payload    map[string]interface{} `json:"payload,omitempty"` // arbitrary handler arguments
	Priority   int                    `json:"priority"`    // 0 = highest
	RetryCount int                    `json:"retry_count"` // attempts consumed so far; incremented on failure
	MaxRetries int                    `json:"max_retries"` // retries allowed before dead-lettering; defaulted to 3 by Enqueue
	CreatedAt  time.Time              `json:"created_at"`  // set by Enqueue when zero
}

// TaskQueue is a Redis-backed priority task queue supporting delayed
// delivery, retries with backoff, and a dead-letter list. All Redis keys
// are namespaced under prefix.
type TaskQueue struct {
	rdb    *redis.Client
	prefix string // key namespace, e.g. "myapp:tasks"

	// ctx/cancel bound the lifetime of all workers; wg lets Stop wait for
	// every worker goroutine to exit.
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup

	// mu guards processors, which maps a task type to its handler function.
	mu         sync.RWMutex
	processors map[string]func(context.Context, Task) error
}

// NewTaskQueue connects to Redis at redisAddr (with the given password and
// DB index) and returns a queue whose keys are namespaced under prefix.
// Workers are not started; call StartWorkers, and Stop when done.
func NewTaskQueue(redisAddr, password string, db int, prefix string) *TaskQueue {
	ctx, cancel := context.WithCancel(context.Background())
	q := &TaskQueue{
		rdb:        redis.NewClient(&redis.Options{Addr: redisAddr, Password: password, DB: db}),
		prefix:     prefix,
		ctx:        ctx,
		cancel:     cancel,
		processors: map[string]func(context.Context, Task) error{},
	}
	return q
}

// readyKey names the sorted set of tasks eligible for immediate processing.
func (q *TaskQueue) readyKey() string { return q.prefix + ":ready" }

// delayedKey names the sorted set of tasks scheduled for the future.
func (q *TaskQueue) delayedKey() string { return q.prefix + ":delayed" }

// deadKey names the list holding permanently failed tasks.
func (q *TaskQueue) deadKey() string { return q.prefix + ":dead" }

// RegisterProcessor associates taskType with handler fn. A later
// registration for the same type replaces the earlier one.
func (q *TaskQueue) RegisterProcessor(taskType string, fn func(context.Context, Task) error) {
	q.mu.Lock()
	q.processors[taskType] = fn
	q.mu.Unlock()
}

// Enqueue validates task, fills in defaults (random ID, MaxRetries=3,
// CreatedAt=now), and stores its JSON encoding in Redis. With delay > 0 the
// task goes into the delayed set scored by its due time (second resolution);
// otherwise it goes straight into the ready set scored by Priority
// (0 = highest, popped first by ZPopMin).
func (q *TaskQueue) Enqueue(task Task, delay time.Duration) error {
	if task.Type == "" {
		return errors.New("task.Type cannot be empty")
	}
	if task.ID == "" {
		task.ID = fmt.Sprintf("task_%d_%d", rand.Int63(), time.Now().UnixNano())
	}
	if task.MaxRetries == 0 {
		task.MaxRetries = 3
	}
	if task.CreatedAt.IsZero() {
		task.CreatedAt = time.Now()
	}

	b, err := json.Marshal(task)
	if err != nil {
		return fmt.Errorf("marshal task: %w", err)
	}
	member := string(b)

	if delay > 0 {
		// Due-time score; promoteDelayed moves it to ready once due.
		score := float64(time.Now().Add(delay).Unix())
		if err := q.rdb.ZAdd(q.ctx, q.delayedKey(), &redis.Z{Score: score, Member: member}).Err(); err != nil {
			return fmt.Errorf("enqueue delayed task %s: %w", task.ID, err)
		}
		return nil
	}
	if err := q.rdb.ZAdd(q.ctx, q.readyKey(), &redis.Z{Score: float64(task.Priority), Member: member}).Err(); err != nil {
		return fmt.Errorf("enqueue ready task %s: %w", task.ID, err)
	}
	return nil
}

// asString converts a Redis sorted-set member into string form. go-redis
// hands members back as string or []byte; any other type reports false.
func asString(v interface{}) (string, bool) {
	if s, ok := v.(string); ok {
		return s, true
	}
	if b, ok := v.([]byte); ok {
		return string(b), true
	}
	return "", false
}

// promoteDelayed moves up to limit due tasks (score <= now) from the delayed
// set into the ready set, restoring each task's original Priority as its
// ready score. Only the caller that actually removes a member promotes it:
// ZRem returning 0 means a concurrent worker won the race, and blindly
// re-adding would resurrect a task it may already be processing.
func (q *TaskQueue) promoteDelayed(limit int64) {
	due := fmt.Sprintf("%f", float64(time.Now().Unix()))
	members, err := q.rdb.ZRangeByScore(q.ctx, q.delayedKey(), &redis.ZRangeBy{Min: "0", Max: due, Offset: 0, Count: limit}).Result()
	if err != nil {
		log.Printf("promote delayed: %v", err)
		return
	}
	for _, member := range members {
		removed, err := q.rdb.ZRem(q.ctx, q.delayedKey(), member).Result()
		if err != nil {
			log.Printf("promote delayed: remove: %v", err)
			continue
		}
		if removed == 0 {
			// Another worker already claimed this member.
			continue
		}
		// Recover the task's priority so a delayed task keeps its ordering
		// instead of always jumping to the highest priority (score 0).
		score := 0.0
		var t Task
		if err := json.Unmarshal([]byte(member), &t); err == nil {
			score = float64(t.Priority)
		}
		if err := q.rdb.ZAdd(q.ctx, q.readyKey(), &redis.Z{Score: score, Member: member}).Err(); err != nil {
			log.Printf("promote delayed: add ready: %v", err)
		}
	}
}

// popReady atomically removes and returns the lowest-score (highest
// priority) task from the ready set. The boolean is false when the set is
// empty, the context has been cancelled, or the member cannot be decoded.
func (q *TaskQueue) popReady() (Task, bool) {
	popped, err := q.rdb.ZPopMin(q.ctx, q.readyKey(), 1).Result()
	switch {
	case errors.Is(err, context.Canceled):
		return Task{}, false
	case err != nil:
		log.Printf("pop ready: %v", err)
		return Task{}, false
	case len(popped) == 0:
		return Task{}, false
	}

	raw, ok := asString(popped[0].Member)
	if !ok {
		return Task{}, false
	}

	var t Task
	if err := json.Unmarshal([]byte(raw), &t); err != nil {
		log.Printf("unmarshal task: %v", err)
		return Task{}, false
	}
	return t, true
}

// handleFailure consumes one retry attempt: while attempts remain it
// re-enqueues task with linear backoff (1s, 2s, 3s, ...); once MaxRetries is
// exhausted it records the task, the final error, and a timestamp in the
// dead-letter list for later inspection.
func (q *TaskQueue) handleFailure(task Task, cause error) {
	task.RetryCount++
	if task.RetryCount <= task.MaxRetries {
		backoff := time.Duration(task.RetryCount) * time.Second
		log.Printf("task %s failed (%v); retry %d/%d in %v", task.ID, cause, task.RetryCount, task.MaxRetries, backoff)
		// A failed requeue would otherwise drop the task silently.
		if err := q.Enqueue(task, backoff); err != nil {
			log.Printf("requeue task %s: %v", task.ID, err)
		}
		return
	}

	log.Printf("task %s exceeded retries; sending to dead-letter", task.ID)
	record := map[string]interface{}{
		"task":  task,
		"error": cause.Error(),
		"time":  time.Now().Format(time.RFC3339Nano),
	}
	b, err := json.Marshal(record)
	if err != nil {
		// Never lose a dead task to a marshal failure: fall back to a
		// minimal record that cannot fail to encode.
		log.Printf("marshal dead-letter record for %s: %v", task.ID, err)
		b = []byte(fmt.Sprintf(`{"task_id":%q,"error":%q}`, task.ID, cause.Error()))
	}
	if err := q.rdb.LPush(q.ctx, q.deadKey(), string(b)).Err(); err != nil {
		log.Printf("dead-letter push for %s: %v", task.ID, err)
	}
}

// worker is one polling loop: it promotes due delayed tasks, pops the
// highest-priority ready task, and dispatches it to the processor registered
// for its type. It exits when the queue context is cancelled. The idle
// backoff is cancellation-aware so shutdown is not delayed by a sleep.
func (q *TaskQueue) worker(id int) {
	defer q.wg.Done()
	log.Printf("worker %d started", id)

	for {
		select {
		case <-q.ctx.Done():
			log.Printf("worker %d stopped", id)
			return
		default:
		}

		q.promoteDelayed(25)

		task, ok := q.popReady()
		if !ok {
			// Queue empty (or pop failed): back off briefly, but wake
			// immediately on shutdown instead of sleeping through it.
			select {
			case <-q.ctx.Done():
				log.Printf("worker %d stopped", id)
				return
			case <-time.After(250 * time.Millisecond):
			}
			continue
		}

		q.mu.RLock()
		fn := q.processors[task.Type]
		q.mu.RUnlock()
		if fn == nil {
			// Unknown type counts as a failure so the task eventually
			// lands in the dead-letter list rather than vanishing.
			q.handleFailure(task, fmt.Errorf("no processor for type=%s", task.Type))
			continue
		}

		if err := fn(q.ctx, task); err != nil {
			q.handleFailure(task, err)
			continue
		}
		log.Printf("task %s done", task.ID)
	}
}

// StartWorkers launches n concurrent worker goroutines with IDs 1..n.
func (q *TaskQueue) StartWorkers(n int) {
	for id := 1; id <= n; id++ {
		q.wg.Add(1)
		go q.worker(id)
	}
}

// Stop cancels the worker context, blocks until every worker goroutine has
// exited, then closes the Redis client. Intended to be called once at
// shutdown.
func (q *TaskQueue) Stop() {
	q.cancel()
	q.wg.Wait()
	// Best-effort close: we are shutting down, nothing to do on error.
	_ = q.rdb.Close()
}

// main wires up the queue against a local Redis, registers a demo email
// processor, enqueues two sample tasks (one delayed), and blocks until an
// interrupt or SIGTERM arrives; deferred Stop drains the workers.
func main() {
	queue := NewTaskQueue("localhost:6379", "", 0, "myapp:tasks")
	defer queue.Stop()

	queue.RegisterProcessor("email.send", func(ctx context.Context, task Task) error {
		recipient, _ := task.Payload["to"].(string)
		subject, _ := task.Payload["subject"].(string)
		log.Printf("sending email to=%s subject=%s", recipient, subject)
		return nil
	})

	queue.StartWorkers(3)

	welcome := Task{Type: "email.send", Payload: map[string]interface{}{"to": "[email protected]", "subject": "Welcome"}, Priority: 0}
	report := Task{Type: "email.send", Payload: map[string]interface{}{"to": "[email protected]", "subject": "Daily report"}, Priority: 1}
	_ = queue.Enqueue(welcome, 2*time.Second)
	_ = queue.Enqueue(report, 0)

	interrupts := make(chan os.Signal, 1)
	signal.Notify(interrupts, os.Interrupt, syscall.SIGTERM)
	<-interrupts
	log.Println("shutting down")
}

How It Works

Redis-backed task queue with enqueue, worker pools, retries, delays, and dead-letter handling for failed jobs.

Serializes tasks to JSON, stores them as members of Redis sorted sets (a priority-scored ready set and a due-time-scored delayed set), workers poll with ZPopMin, execute handlers, requeue with backoff on failure, and move irrecoverable tasks to a dead-letter list; graceful shutdown cancels workers via context.

Key Concepts

  • Polling pop (ZPopMin) retrieves the highest-priority ready task; workers back off briefly when the queue is empty.
  • Retry metadata controls attempt counts and backoff.
  • Dead-letter queue captures permanently failing jobs for inspection.

When to Use This Pattern

  • Background job processing for web applications such as emails or webhooks.
  • Offloading expensive work from HTTP request paths.
  • Coordinating distributed workers in microservices.

Best Practices

  • Keep task payloads small and versioned for compatibility.
  • Instrument success and failure counts to tune retry policies.
  • Ensure idempotent handlers so retries do not duplicate effects.
Go Version: 1.16
Difficulty: advanced
Production Ready: Yes
Lines of Code: 231