Bounded Parallel Map with Generics and Ordered Results
Parallel map helper that limits concurrency, preserves input order, and aborts on the first error.
main.go
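package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"
)

// parallelMap applies fn to every item, running at most `concurrency` calls
// at a time. Results are returned in input order. The first error cancels
// the shared context and is returned to the caller.
func parallelMap[T any, R any](
	ctx context.Context,
	items []T,
	concurrency int,
	fn func(context.Context, T) (R, error),
) ([]R, error) {
	if concurrency < 1 {
		return nil, fmt.Errorf("concurrency must be >= 1")
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	type result struct {
		index int
		value R
		err   error
	}

	// sem is a counting semaphore that caps the number of in-flight goroutines.
	sem := make(chan struct{}, concurrency)
	// out is buffered to len(items) so that no goroutine blocks on send,
	// even when the collector below returns early.
	out := make(chan result, len(items))

	var wg sync.WaitGroup
	for i := range items {
		i := i            // per-iteration copy; needed before Go 1.22
		sem <- struct{}{} // blocks while `concurrency` goroutines are running
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }()

			// Skip the work if an earlier task failed or the caller cancelled.
			var zero R
			if ctx.Err() != nil {
				out <- result{index: i, value: zero, err: ctx.Err()}
				return
			}

			v, err := fn(ctx, items[i])
			// Send the real error before cancelling, so it is queued ahead
			// of the cancellation errors reported by skipped tasks.
			out <- result{index: i, value: v, err: err}
			if err != nil {
				cancel()
			}
		}()
	}

	// Close out once every goroutine has reported its result.
	go func() {
		wg.Wait()
		close(out)
	}()

	// Place each value at its original index to preserve input order.
	results := make([]R, len(items))
	for r := range out {
		if r.err != nil {
			return nil, r.err
		}
		results[r.index] = r.value
	}
	return results, nil
}

func main() {
	items := []int{1, 2, 3, 4, 5}
	res, err := parallelMap(context.Background(), items, 2, func(ctx context.Context, n int) (string, error) {
		// Simulate work whose duration varies per item.
		time.Sleep(time.Duration(100+n*20) * time.Millisecond)
		return fmt.Sprintf("num=%d", n), nil
	})
	if err != nil {
		log.Fatalf("parallel map failed: %v", err)
	}
	log.Printf("results: %v", res)
}

How It Works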
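The helper starts one goroutine per item and uses a buffered-channel semaphore to limit how many run at once. Each goroutine sends an indexed result on a channel buffered to len(items), and the caller copies each value into a pre-sized slice at its original index, which preserves input order. The first error cancels the shared context, so tasks that have not started yet are skipped, and that error is returned to the caller.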
Key Concepts
- Bounded worker count prevents overwhelming resources.
- Maintains deterministic output ordering despite parallel execution.
- First error cancels outstanding tasks to save time.
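The first point can be checked empirically. The sketch below is an illustration only: it runs short sleeping jobs behind the same kind of buffered-channel semaphore and records the highest number of jobs seen running together. The function name peakInFlight and the 5 ms sleep are assumptions made for the demonstration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// peakInFlight runs `tasks` short jobs behind a semaphore of size `limit`
// and reports the highest number of jobs observed running at once.
func peakInFlight(tasks, limit int) int64 {
	if limit < 1 {
		limit = 1 // a zero-capacity semaphore would block the first job forever
	}
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var inFlight, peak int64

	for i := 0; i < tasks; i++ {
		sem <- struct{}{} // acquire a slot before starting the goroutine
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot after the decrement below

			n := atomic.AddInt64(&inFlight, 1)
			// Raise peak to n unless another goroutine already recorded more.
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond) // stand-in for real work
			atomic.AddInt64(&inFlight, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println(peakInFlight(20, 3) <= 3) // true
}
```

Because a slot is acquired before each goroutine starts and released only after it has decremented the counter, the observed peak can never exceed the limit.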
When to Use This Pattern
- Transforming large slices where CPU-bound work can be parallelized.
- Fetching multiple resources while preserving response ordering.
- ETL steps that must short-circuit on corrupt data.
Best Practices
- Size the worker pool relative to CPU and I/O characteristics.
- Protect shared resources inside the map function if needed.
- Buffer the result channel to len(items), or keep draining it after an early return, so workers never block on send and leak after cancellation.
Go Version: 1.18+
Difficulty: advanced
Production Ready: Yes
Lines of Code: 81