Asynchronous Task Queue with Redis Backend
Redis-backed task queue with enqueue, worker pools, retries, delays, and dead-letter handling for failed jobs.
main.go
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/go-redis/redis/v8"
)
// Task is the unit of work stored in Redis. It is serialized to JSON and
// used verbatim as the sorted-set member, so two tasks with identical JSON
// are the same member.
type Task struct {
ID string `json:"id"`                               // unique id; auto-generated by Enqueue when empty
Type string `json:"type"`                           // dispatch key matched against registered processors; required
Payload map[string]interface{} `json:"payload,omitempty"` // free-form handler arguments
Priority int `json:"priority"` // 0 = highest
RetryCount int `json:"retry_count"`                 // attempts so far; incremented on each failure
MaxRetries int `json:"max_retries"`                 // defaults to 3 when left zero (zero cannot mean "no retries")
CreatedAt time.Time `json:"created_at"`             // set by Enqueue when zero
}
// TaskQueue is a Redis-backed priority queue with delayed delivery,
// retries, and a dead-letter list. All keys are namespaced under prefix.
type TaskQueue struct {
rdb *redis.Client     // shared Redis connection; closed by Stop
prefix string         // key namespace, e.g. "myapp:tasks"
// NOTE(review): storing a context in a struct is discouraged by the Go
// context docs; here it doubles as the queue's lifetime signal for workers.
ctx context.Context
cancel context.CancelFunc // cancels ctx; invoked by Stop
wg sync.WaitGroup         // tracks running worker goroutines
mu sync.RWMutex           // guards processors
processors map[string]func(context.Context, Task) error // task type -> handler
}
// NewTaskQueue dials Redis at redisAddr (with the given password and DB)
// and returns a queue whose keys live under prefix. The queue owns its
// context; Stop cancels it and closes the client.
func NewTaskQueue(redisAddr, password string, db int, prefix string) *TaskQueue {
client := redis.NewClient(&redis.Options{
Addr:     redisAddr,
Password: password,
DB:       db,
})
ctx, cancel := context.WithCancel(context.Background())
return &TaskQueue{
rdb:        client,
prefix:     prefix,
ctx:        ctx,
cancel:     cancel,
processors: map[string]func(context.Context, Task) error{},
}
}
// readyKey is the sorted set of runnable tasks, scored by priority.
func (q *TaskQueue) readyKey() string { return q.prefix + ":ready" }

// delayedKey is the sorted set of scheduled tasks, scored by due time (Unix).
func (q *TaskQueue) delayedKey() string { return q.prefix + ":delayed" }

// deadKey is the list of permanently failed task records.
func (q *TaskQueue) deadKey() string { return q.prefix + ":dead" }
// RegisterProcessor binds fn as the handler for taskType. Registering the
// same type again replaces the previous handler. Safe for concurrent use.
func (q *TaskQueue) RegisterProcessor(taskType string, fn func(context.Context, Task) error) {
q.mu.Lock()
q.processors[taskType] = fn
q.mu.Unlock()
}
// Enqueue stores task in Redis. With delay <= 0 it goes straight into the
// ready set, scored by Priority (lower pops first); otherwise it goes into
// the delayed set, scored by its absolute due time, and is promoted later.
// Missing ID, MaxRetries, and CreatedAt are filled with defaults.
func (q *TaskQueue) Enqueue(task Task, delay time.Duration) error {
if task.Type == "" {
return fmt.Errorf("task.Type cannot be empty")
}
// Fill in defaults for fields the caller left zero-valued.
if task.ID == "" {
task.ID = fmt.Sprintf("task_%d_%d", rand.Int63(), time.Now().UnixNano())
}
if task.MaxRetries == 0 {
task.MaxRetries = 3
}
if task.CreatedAt.IsZero() {
task.CreatedAt = time.Now()
}
encoded, err := json.Marshal(task)
if err != nil {
return fmt.Errorf("marshal task: %w", err)
}
payload := string(encoded)
if delay <= 0 {
// Immediate: score by priority so lower numbers are popped first.
entry := &redis.Z{Score: float64(task.Priority), Member: payload}
return q.rdb.ZAdd(q.ctx, q.readyKey(), entry).Err()
}
// Delayed: score by the Unix time at which the task becomes due.
due := &redis.Z{Score: float64(time.Now().Add(delay).Unix()), Member: payload}
return q.rdb.ZAdd(q.ctx, q.delayedKey(), due).Err()
}
// asString extracts the string form of a Redis sorted-set member.
// It accepts string and []byte; any other dynamic type reports false.
func asString(v interface{}) (string, bool) {
if s, ok := v.(string); ok {
return s, true
}
if b, ok := v.([]byte); ok {
return string(b), true
}
return "", false
}
// promoteDelayed moves up to limit tasks whose due time has passed from the
// delayed set into the ready set, restoring each task's own Priority as the
// ready-set score (the previous code hard-coded 0, silently making every
// delayed task top priority).
//
// NOTE(review): ZRem+ZAdd is not a transaction; a task can briefly be in
// neither set. A Lua script would make promotion atomic — acceptable here
// because workers tolerate a transiently empty ready set.
func (q *TaskQueue) promoteDelayed(limit int64) {
now := float64(time.Now().Unix())
res, err := q.rdb.ZRangeByScoreWithScores(q.ctx, q.delayedKey(), &redis.ZRangeBy{Min: "0", Max: fmt.Sprintf("%f", now), Offset: 0, Count: limit}).Result()
if err != nil {
log.Printf("promote delayed: %v", err)
return
}
for _, z := range res {
member, ok := asString(z.Member)
if !ok {
continue
}
// Only the caller that actually removes the member may requeue it;
// checking the removed count prevents duplicate promotion when
// several workers race through this loop concurrently.
removed, err := q.rdb.ZRem(q.ctx, q.delayedKey(), member).Result()
if err != nil || removed == 0 {
continue
}
// Recover the task's priority for the ready-set score; fall back to
// 0 (highest) if the stored JSON is unreadable.
score := 0.0
var task Task
if err := json.Unmarshal([]byte(member), &task); err == nil {
score = float64(task.Priority)
}
if err := q.rdb.ZAdd(q.ctx, q.readyKey(), &redis.Z{Score: score, Member: member}).Err(); err != nil {
log.Printf("promote delayed: readd: %v", err)
}
}
}
// popReady atomically removes and decodes the lowest-scored (highest
// priority) ready task. The boolean is false when the set is empty, the
// context is cancelled, or the member cannot be decoded; errors other than
// cancellation are logged.
func (q *TaskQueue) popReady() (Task, bool) {
entries, err := q.rdb.ZPopMin(q.ctx, q.readyKey(), 1).Result()
switch {
case errors.Is(err, context.Canceled):
return Task{}, false
case err != nil:
log.Printf("pop ready: %v", err)
return Task{}, false
case len(entries) == 0:
return Task{}, false
}
raw, ok := asString(entries[0].Member)
if !ok {
return Task{}, false
}
var task Task
if err := json.Unmarshal([]byte(raw), &task); err != nil {
log.Printf("unmarshal task: %v", err)
return Task{}, false
}
return task, true
}
// handleFailure requeues task with a linear backoff (attempt n waits n
// seconds), or — once MaxRetries is exhausted — pushes a JSON record of the
// task, the error, and the time onto the dead-letter list for inspection.
// Redis/marshal errors here are logged instead of silently discarded: the
// previous code dropped them with `_`, which could lose a task without any
// trace.
func (q *TaskQueue) handleFailure(task Task, cause error) {
task.RetryCount++
if task.RetryCount <= task.MaxRetries {
backoff := time.Duration(task.RetryCount) * time.Second
log.Printf("task %s failed (%v); retry %d/%d in %v", task.ID, cause, task.RetryCount, task.MaxRetries, backoff)
if err := q.Enqueue(task, backoff); err != nil {
// A failed requeue means the task is gone; surface it loudly.
log.Printf("task %s: requeue failed: %v", task.ID, err)
}
return
}
log.Printf("task %s exceeded retries; sending to dead-letter", task.ID)
record := map[string]interface{}{
"task": task,
"error": cause.Error(),
"time": time.Now().Format(time.RFC3339Nano),
}
b, err := json.Marshal(record)
if err != nil {
log.Printf("task %s: marshal dead-letter record: %v", task.ID, err)
return
}
if err := q.rdb.LPush(q.ctx, q.deadKey(), string(b)).Err(); err != nil {
log.Printf("task %s: dead-letter push: %v", task.ID, err)
}
}
// worker is the per-goroutine processing loop: promote due delayed tasks,
// pop one ready task, look up its processor, run it, and route failures
// through retry/dead-letter handling. It exits when the queue context is
// cancelled.
func (q *TaskQueue) worker(id int) {
defer q.wg.Done()
log.Printf("worker %d started", id)
for {
select {
case <-q.ctx.Done():
log.Printf("worker %d stopped", id)
return
default:
}
q.promoteDelayed(25)
task, ok := q.popReady()
if !ok {
// Idle: back off briefly, but wake immediately on shutdown.
// The original time.Sleep ignored cancellation and could delay
// Stop by up to 250ms per worker.
select {
case <-q.ctx.Done():
log.Printf("worker %d stopped", id)
return
case <-time.After(250 * time.Millisecond):
}
continue
}
q.mu.RLock()
fn := q.processors[task.Type]
q.mu.RUnlock()
if fn == nil {
q.handleFailure(task, fmt.Errorf("no processor for type=%s", task.Type))
continue
}
if err := fn(q.ctx, task); err != nil {
q.handleFailure(task, err)
continue
}
log.Printf("task %s done", task.ID)
}
}
// StartWorkers launches n worker goroutines with ids 1..n, each registered
// with the queue's WaitGroup so Stop can wait for them to exit.
func (q *TaskQueue) StartWorkers(n int) {
for id := 1; id <= n; id++ {
q.wg.Add(1)
go q.worker(id)
}
}
// Stop shuts the queue down: it cancels the shared context (signalling every
// worker to exit), waits for all workers to finish, then closes the Redis
// client. The order matters — closing the client before workers exit would
// make their in-flight commands fail.
func (q *TaskQueue) Stop() {
q.cancel()
q.wg.Wait()
// Best-effort close; shutdown is already complete at this point.
_ = q.rdb.Close()
}
// main wires up a demo queue: one email processor, three workers, two sample
// tasks (one delayed, one immediate), then blocks until SIGINT/SIGTERM and
// shuts down gracefully via the deferred Stop.
func main() {
queue := NewTaskQueue("localhost:6379", "", 0, "myapp:tasks")
defer queue.Stop()

queue.RegisterProcessor("email.send", func(ctx context.Context, task Task) error {
to, _ := task.Payload["to"].(string)
subject, _ := task.Payload["subject"].(string)
log.Printf("sending email to=%s subject=%s", to, subject)
return nil
})

queue.StartWorkers(3)

welcome := Task{
Type:     "email.send",
Payload:  map[string]interface{}{"to": "[email protected]", "subject": "Welcome"},
Priority: 0,
}
report := Task{
Type:     "email.send",
Payload:  map[string]interface{}{"to": "[email protected]", "subject": "Daily report"},
Priority: 1,
}
_ = queue.Enqueue(welcome, 2*time.Second) // delayed by two seconds
_ = queue.Enqueue(report, 0)              // runs as soon as a worker is free

// Block until an interrupt or termination signal arrives.
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
<-stop
log.Println("shutting down")
}
How It Works
Redis-backed task queue with enqueue, worker pools, retries, delays, and dead-letter handling for failed jobs.
Serializes tasks to JSON, stores them in Redis sorted sets (a priority-scored ready set and a timestamp-scored delayed set), workers poll with ZPopMin, execute handlers, requeue with backoff on failure, and move irrecoverable tasks to a dead-letter list; graceful shutdown cancels workers via context.
Key Concepts
- Sorted sets order tasks by priority (ready) and due timestamp (delayed); ZPopMin atomically claims the next task.
- Retry metadata controls attempt counts and backoff.
- Dead-letter queue captures permanently failing jobs for inspection.
When to Use This Pattern
- Background job processing for web applications such as emails or webhooks.
- Offloading expensive work from HTTP request paths.
- Coordinating distributed workers in microservices.
Best Practices
- Keep task payloads small and versioned for compatibility.
- Instrument success and failure counts to tune retry policies.
- Ensure idempotent handlers so retries do not duplicate effects.
Go Version: 1.16
Difficulty: advanced
Production Ready: Yes
Lines of Code: 231