main.go
package main

import (
	"encoding/csv"
	"errors"
	"fmt"
	"io"
	"log"
	"os"
	"strconv"
	"sync"
	"time"
)

// CSVRecord represents a single CSV record with processing metadata
type CSVRecord struct {
	RowIndex int      // Original row index (for error reporting)
	Fields   []string // CSV fields
	Error    error    // Processing error (if any)
}

// CSVProcessor handles concurrent reading, processing, and writing of CSV data
type CSVProcessor struct {
	inputPath    string
	outputPath   string
	workerCount  int
	batchSize    int
	header       []string // Output header row (written once by writeCSV)
	progressChan chan int
	wg           sync.WaitGroup
}

// NewCSVProcessor creates a new processor instance
func NewCSVProcessor(inputPath, outputPath string, workerCount, batchSize int) *CSVProcessor {
	return &CSVProcessor{
		inputPath:    inputPath,
		outputPath:   outputPath,
		workerCount:  workerCount,
		batchSize:    batchSize,
		progressChan: make(chan int, 100),
	}
}

// processRecord processes a single CSV record (custom logic here)
func (p *CSVProcessor) processRecord(record *CSVRecord) {
	// Example processing: validate numeric fields and compute a new value
	if len(record.Fields) < 3 {
		record.Error = fmt.Errorf("insufficient fields (expected 3+, got %d)", len(record.Fields))
		return
	}

	// Parse numeric fields
	value1, err := strconv.ParseFloat(record.Fields[1], 64)
	if err != nil {
		record.Error = fmt.Errorf("field 2 is not a number: %v", err)
		return
	}

	value2, err := strconv.ParseFloat(record.Fields[2], 64)
	if err != nil {
		record.Error = fmt.Errorf("field 3 is not a number: %v", err)
		return
	}

	// Compute new field: sum of values
	sum := value1 + value2
	record.Fields = append(record.Fields, strconv.FormatFloat(sum, 'f', 2, 64))
}

// worker reads records from input channel, processes them, and sends to output channel
func (p *CSVProcessor) worker(id int, inputChan <-chan *CSVRecord, outputChan chan<- *CSVRecord) {
	defer p.wg.Done()
	log.Printf("Worker %d started", id)

	for record := range inputChan {
		p.processRecord(record)
		outputChan <- record
		p.progressChan <- 1 // Report progress
	}

	log.Printf("Worker %d finished", id)
}

// readCSV reads input CSV and sends records to worker channel
func (p *CSVProcessor) readCSV(inputChan chan<- *CSVRecord) error {
	// Close inputChan on every return path so the workers always terminate,
	// even when the file cannot be opened or a read fails mid-way.
	defer close(inputChan)

	file, err := os.Open(p.inputPath)
	if err != nil {
		return fmt.Errorf("failed to open input file: %v", err)
	}
	defer file.Close()

	reader := csv.NewReader(file)
	reader.FieldsPerRecord = -1 // Allow variable fields per record

	// Read the header row (adjust if your CSV has no header)
	header, err := reader.Read()
	if err != nil {
		return fmt.Errorf("failed to read header: %v", err)
	}
	// Add the new column for the computed sum and stash the header on the
	// processor; writeCSV writes it first, so workers never treat it as data.
	p.header = append(header, "sum")

	// Read data rows
	rowIndex := 1
	for {
		fields, err := reader.Read()
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			return fmt.Errorf("failed to read row %d: %v", rowIndex, err)
		}

		inputChan <- &CSVRecord{RowIndex: rowIndex, Fields: fields}
		rowIndex++
	}

	log.Printf("Read %d rows from input CSV", rowIndex-1)
	return nil
}

// writeCSV writes processed records to output CSV
func (p *CSVProcessor) writeCSV(outputChan <-chan *CSVRecord) error {
	file, err := os.Create(p.outputPath)
	if err != nil {
		return fmt.Errorf("failed to create output file: %v", err)
	}
	defer file.Close()

	writer := csv.NewWriter(file)
	defer writer.Flush()

	// Write the header row first. Data rows then arrive in completion order,
	// which may differ from input order when workerCount > 1.
	if p.header != nil {
		if err := writer.Write(p.header); err != nil {
			return fmt.Errorf("failed to write header: %v", err)
		}
	}

	errorCount := 0
	for record := range outputChan {
		if record.Error != nil {
			log.Printf("Row %d processing error: %v", record.RowIndex, record.Error)
			errorCount++
			continue
		}

		if err := writer.Write(record.Fields); err != nil {
			return fmt.Errorf("failed to write row %d: %v", record.RowIndex, err)
		}
	}

	log.Printf("Wrote output CSV with %d processing errors", errorCount)
	return nil
}

// Run executes the concurrent CSV processing pipeline
func (p *CSVProcessor) Run() error {
	startTime := time.Now()

	// Create channels (buffered to reduce blocking)
	inputChan := make(chan *CSVRecord, p.batchSize)
	outputChan := make(chan *CSVRecord, p.batchSize)

	// Start progress tracker goroutine
	go func() {
		processed := 0
		for range p.progressChan {
			processed++
			if processed%1000 == 0 {
				log.Printf("Processed %d records", processed)
			}
		}
	}()

	// Start workers
	for i := 0; i < p.workerCount; i++ {
		p.wg.Add(1)
		go p.worker(i+1, inputChan, outputChan)
	}

	// Start CSV reader (runs in goroutine to avoid blocking)
	readErrChan := make(chan error, 1)
	go func() {
		readErrChan <- p.readCSV(inputChan)
	}()

	// Wait for workers to finish, then close output channel
	go func() {
		p.wg.Wait()
		close(outputChan)
		close(p.progressChan)
	}()

	// Drain and write the output on this goroutine. Running the writer here,
	// while the reader and workers are still active, keeps the channels from
	// filling up and stalling the pipeline.
	if err := p.writeCSV(outputChan); err != nil {
		return err
	}

	// The writer only returns after outputChan is closed, which happens once
	// the reader has closed inputChan and every worker has exited, so any
	// read error is available now.
	if err := <-readErrChan; err != nil {
		return err
	}

	duration := time.Since(startTime)
	log.Printf("Processing completed in %v", duration)
	return nil
}

// Example Usage
func main() {
	if len(os.Args) < 3 {
		log.Fatal("Usage: go run main.go <input.csv> <output.csv>")
	}

	inputPath := os.Args[1]
	outputPath := os.Args[2]

	// Configure processor: 4 workers, batch size 100 (adjust based on your CPU)
	processor := NewCSVProcessor(inputPath, outputPath, 4, 100)

	if err := processor.Run(); err != nil {
		log.Fatalf("CSV processing failed: %v", err)
	}

	log.Println("CSV processing succeeded!")
}

How It Works

High-throughput CSV pipeline that streams records, processes them with worker goroutines, and writes transformed rows concurrently.

The reader streams rows into a buffered input channel, a pool of worker goroutines parses and transforms each record, and the writer drains the output channel into the result file. Per-row errors are attached to the record and logged instead of aborting the run, a progress goroutine reports throughput, and a wait group ensures every worker has finished before the output channel closes.
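For a concrete picture of the transformation, suppose a hypothetical input file orders.csv whose rows carry an ID followed by two numeric fields (the file name and values below are made up for illustration):

$ cat orders.csv
id,price,tax
A-1,10.00,2.50
A-2,5.25,1.05

$ go run main.go orders.csv orders_out.csv

$ cat orders_out.csv
id,price,tax,sum
A-1,10.00,2.50,12.50
A-2,5.25,1.05,6.30

Each data row gains a "sum" column holding field 2 plus field 3, formatted to two decimal places; with more than one worker the data rows may come back in a different order than they were read.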

Key Concepts

  • Worker pool decouples CSV reading from processing throughput.
  • Buffered channels and wait groups coordinate backpressure and completion (a stripped-down sketch of this coordination follows the list).
  • Per-record errors travel on the record itself, so a bad row is logged and skipped rather than stopping the whole pipeline.
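The sketch below isolates that coordination pattern with made-up names and a trivial doubling job: the channel buffers bound how much work is in flight, the producer closes the jobs channel to signal the end of input, and the WaitGroup lets a separate goroutine close the results channel only after every worker has returned.

package main

import (
	"fmt"
	"sync"
)

func main() {
	jobs := make(chan int, 4)    // buffer bounds in-flight work (backpressure)
	results := make(chan int, 4) // buffered so workers rarely block the consumer

	var wg sync.WaitGroup
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs { // exits once jobs is closed and drained
				results <- j * 2
			}
		}()
	}

	// Producer: send work, then close jobs to tell workers there is no more.
	go func() {
		for i := 1; i <= 10; i++ {
			jobs <- i
		}
		close(jobs)
	}()

	// Close results only after every worker has finished sending.
	go func() {
		wg.Wait()
		close(results)
	}()

	for r := range results { // consumer ends when results is closed
		fmt.Println(r)
	}
}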

When to Use This Pattern

  • ETL tasks converting large CSV datasets.
  • Data cleanup or enrichment jobs running on batch files.
  • Migration scripts where performance matters but memory is limited.

Best Practices

  • Tune worker counts based on CPU versus I/O bound workloads.
  • Validate and sanitize each record before writing output.
  • Propagate context and timeouts to stop gracefully on cancellations; a cancellable worker variant is sketched below.
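As a sketch only, assuming the channel shapes above and adding a context parameter that the original code does not have (workerCtx is a hypothetical name, and "context" would need to be imported), a cancellable worker loop could look like this:

// workerCtx is a hypothetical variant of worker that stops early when ctx
// is cancelled or its deadline passes. Assumes: import "context".
func (p *CSVProcessor) workerCtx(ctx context.Context, id int, inputChan <-chan *CSVRecord, outputChan chan<- *CSVRecord) {
	defer p.wg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Printf("Worker %d stopping early: %v", id, ctx.Err())
			return
		case record, ok := <-inputChan:
			if !ok {
				return // inputChan closed: normal shutdown
			}
			p.processRecord(record)
			outputChan <- record
			p.progressChan <- 1
		}
	}
}

// Caller side, bounding the whole run:
//   ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
//   defer cancel()
//   go p.workerCtx(ctx, i+1, inputChan, outputChan)

For a complete shutdown the reader and writer would need to watch the same context as well; otherwise a cancelled worker pool can leave them blocked on full or never-closed channels.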
Go Version: 1.14
Difficulty: intermediate
Production Ready: Yes
Lines of Code: 220