Context-Cancelable Worker Pool with First-Error Cancellation
Worker pool that processes jobs with bounded concurrency and cancels remaining work on the first failure.
main.go
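package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"sync"
	"time"
)

// Job is a unit of work. It should return promptly once ctx is canceled.
type Job func(context.Context) error

// runWorkerPool runs jobs on workerCount goroutines and returns the first
// error reported by a job. After that error, no further jobs are dispatched.
func runWorkerPool(ctx context.Context, workerCount int, jobs []Job) error {
	if workerCount < 1 {
		return fmt.Errorf("workerCount must be >= 1, got %d", workerCount)
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	jobCh := make(chan Job)
	errCh := make(chan error, 1) // capacity 1: holds only the first error

	var wg sync.WaitGroup
	worker := func(id int) {
		defer wg.Done()
		for job := range jobCh {
			// Skip jobs that were handed over just as the pool was canceled.
			if ctx.Err() != nil {
				return
			}
			if err := job(ctx); err != nil {
				// Non-blocking send: later errors are dropped.
				select {
				case errCh <- err:
				default:
				}
				cancel()
				return
			}
		}
	}

	wg.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go worker(i + 1)
	}

	// Dispatcher: feeds jobs until they run out or the context is canceled,
	// then closes jobCh so idle workers leave their range loop.
	go func() {
		defer close(jobCh)
		for _, job := range jobs {
			select {
			case <-ctx.Done():
				return
			case jobCh <- job:
			}
		}
	}()

	wg.Wait()
	close(errCh)
	// Receiving from the closed, empty channel yields nil when no job failed.
	if err := <-errCh; err != nil {
		return err
	}
	// No job failed; report cancellation of the parent context, if any.
	return ctx.Err()
}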

func main() {
	// These demo jobs use time.Sleep, which does not observe ctx, so a job
	// that is already running finishes even after another job has failed.
	jobs := []Job{
		func(ctx context.Context) error {
			time.Sleep(150 * time.Millisecond)
			log.Printf("job 1 done")
			return nil
		},
		func(ctx context.Context) error {
			time.Sleep(200 * time.Millisecond)
			return errors.New("job 2 failed")
		},
		func(ctx context.Context) error {
			time.Sleep(500 * time.Millisecond)
			log.Printf("job 3 done")
			return nil
		},
	}

	err := runWorkerPool(context.Background(), 3, jobs)
	if err != nil {
		log.Printf("worker pool finished with error: %v", err)
		return
	}
	log.Printf("all jobs finished successfully")
}

How It Works
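runWorkerPool starts workerCount goroutines that share a cancelable context and receive jobs over an unbuffered channel. On the first error, the failing worker records the error and cancels the context. The dispatcher then stops sending jobs, the other workers exit once their in-flight jobs return, and the function returns that first error. If no job fails, it returns the context's error, which is nil unless the caller's context was canceled or timed out.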
Key Concepts
1. Context cancellation propagates the stop signal to every worker and to the dispatcher.
2. The WaitGroup and the closing of the job channel ensure workers exit cleanly.
3. A size-1 error channel keeps the first error so the pool fails fast instead of hanging.
When to Use This Pattern
- Parallel tasks where any failure invalidates the overall result.
- Batch processing with strict all-or-nothing semantics.
- Testing frameworks that should stop on first failure.
Best Practices
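- Make jobs idempotent, since a canceled job may have completed only part of its work.
- Have jobs check ctx so in-flight work stops promptly after cancellation.
- Ensure producers stop sending after cancel: a send with no remaining receivers blocks forever, and a send on a closed channel panics.
- Buffer the job channel to absorb short bursts; a buffer does not help when production is persistently faster than consumption.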
Go Version: 1.18+
Difficulty: intermediate
Production Ready: Yes
Lines of Code: 91