main.go
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"
)

// parallelMap applies fn to every element of items, running at most
// concurrency calls at a time, and returns the results in input order.
//
// The first error observed (returned by fn, or caused by ctx being
// cancelled) cancels the context handed to the remaining calls, stops any
// further items from being started, and is returned together with a nil
// slice. parallelMap returns only after every goroutine it started has
// finished, so fn never runs after the call has returned; fn is therefore
// expected to honour ctx and return promptly once it is cancelled.
//
// An error is returned without calling fn when concurrency is less than 1.
func parallelMap[T any, R any](
	ctx context.Context,
	items []T,
	concurrency int,
	fn func(context.Context, T) (R, error),
) ([]R, error) {
	if concurrency < 1 {
		return nil, fmt.Errorf("concurrency must be >= 1")
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	// fail keeps only the first error and cancels the shared context so that
	// in-flight and not-yet-started work winds down.
	fail := func(err error) {
		once.Do(func() {
			firstErr = err
			cancel()
		})
	}

	results := make([]R, len(items))
	sem := make(chan struct{}, concurrency)

	for i := range items {
		// Wait for a free slot, but stop waiting as soon as the context is
		// cancelled so a failure does not keep dispatching work.
		select {
		case sem <- struct{}{}:
		case <-ctx.Done():
		}
		// select chooses at random when both cases are ready, so re-check
		// cancellation explicitly. A slot taken here is simply abandoned:
		// nothing is dispatched after this point.
		if err := ctx.Err(); err != nil {
			fail(err)
			break
		}

		i := i // per-iteration copy for toolchains older than Go 1.22
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }()

			// The context may have been cancelled between dispatch and start.
			if err := ctx.Err(); err != nil {
				fail(err)
				return
			}

			v, err := fn(ctx, items[i])
			if err != nil {
				fail(err)
				return
			}
			// Each goroutine writes a distinct index, so no lock is needed.
			results[i] = v
		}()
	}

	// Never return while fn may still be running.
	wg.Wait()
	if firstErr != nil {
		return nil, firstErr
	}
	return results, nil
}

// main demonstrates parallelMap: it formats five integers using two
// concurrent workers, each call sleeping briefly to simulate work, and logs
// either the ordered results or the failure.
func main() {
	const workers = 2
	inputs := []int{1, 2, 3, 4, 5}

	// describe sleeps for a duration that grows with n, then renders n.
	describe := func(_ context.Context, n int) (string, error) {
		delay := time.Duration(100+n*20) * time.Millisecond
		time.Sleep(delay)
		return fmt.Sprintf("num=%d", n), nil
	}

	labels, err := parallelMap(context.Background(), inputs, workers, describe)
	if err != nil {
		log.Fatalf("parallel map failed: %v", err)
	}
	log.Printf("results: %v", labels)
}

How It Works

Parallel map helper that limits concurrency, preserves input order, and aborts on the first error.

Creates worker goroutines processing indexed tasks, writes results into a pre-sized slice by index, tracks errors via a channel, and cancels remaining work with context when a failure occurs.

Key Concepts

  • 1. Bounded worker count prevents overwhelming resources.
  • 2. Maintains deterministic output ordering despite parallel execution.
  • 3. First error cancels outstanding tasks to save time.

When to Use This Pattern

  • Transforming large slices where CPU-bound work can be parallelized.
  • Fetching multiple resources while preserving response ordering.
  • ETL steps that must short-circuit on corrupt data.

Best Practices

  • Size the worker pool relative to CPU and I/O characteristics.
  • Protect shared resources inside the map function if needed.
  • Drain result and error channels to avoid goroutine leaks on cancel.
Go Version: 1.18+
Difficulty: advanced
Production Ready: Yes
Lines of Code: 81