CSV Data Processing with Concurrency
High-throughput CSV pipeline that streams records, processes them with worker goroutines, and writes transformed rows concurrently.
main.go
package main

import (
	"encoding/csv"
	"errors"
	"fmt"
	"io"
	"log"
	"os"
	"strconv"
	"sync"
	"time"
)

// CSVRecord represents a single CSV record with processing metadata
type CSVRecord struct {
	RowIndex int      // Original row index (for error reporting)
	Fields   []string // CSV fields
	Error    error    // Processing error (if any)
}

// CSVProcessor handles concurrent reading, processing, and writing of CSV data
type CSVProcessor struct {
	inputPath    string
	outputPath   string
	workerCount  int
	batchSize    int
	progressChan chan int
	wg           sync.WaitGroup
}

// NewCSVProcessor creates a new processor instance
func NewCSVProcessor(inputPath, outputPath string, workerCount, batchSize int) *CSVProcessor {
	return &CSVProcessor{
		inputPath:    inputPath,
		outputPath:   outputPath,
		workerCount:  workerCount,
		batchSize:    batchSize,
		progressChan: make(chan int, 100),
	}
}

// processRecord processes a single CSV record (custom logic here)
func (p *CSVProcessor) processRecord(record *CSVRecord) {
	// The header row (RowIndex 0) passes through unmodified
	if record.RowIndex == 0 {
		return
	}
	// Example processing: validate numeric fields and compute a new value
	if len(record.Fields) < 3 {
		record.Error = fmt.Errorf("insufficient fields (expected 3+, got %d)", len(record.Fields))
		return
	}
	// Parse numeric fields
	value1, err := strconv.ParseFloat(record.Fields[1], 64)
	if err != nil {
		record.Error = fmt.Errorf("field 2 is not a number: %v", err)
		return
	}
	value2, err := strconv.ParseFloat(record.Fields[2], 64)
	if err != nil {
		record.Error = fmt.Errorf("field 3 is not a number: %v", err)
		return
	}
	// Compute new field: sum of values
	sum := value1 + value2
	record.Fields = append(record.Fields, strconv.FormatFloat(sum, 'f', 2, 64))
}

// worker reads records from the input channel, processes them, and sends them to the output channel
func (p *CSVProcessor) worker(id int, inputChan <-chan *CSVRecord, outputChan chan<- *CSVRecord) {
	defer p.wg.Done()
	log.Printf("Worker %d started", id)
	for record := range inputChan {
		p.processRecord(record)
		outputChan <- record
		p.progressChan <- 1 // Report progress
	}
	log.Printf("Worker %d finished", id)
}

// readCSV reads the input CSV and sends records to the worker channel.
// The caller is responsible for closing inputChan once this returns.
func (p *CSVProcessor) readCSV(inputChan chan<- *CSVRecord) error {
	file, err := os.Open(p.inputPath)
	if err != nil {
		return fmt.Errorf("failed to open input file: %v", err)
	}
	defer file.Close()
	reader := csv.NewReader(file)
	reader.FieldsPerRecord = -1 // Allow variable fields per record
	// Read header row (adjust if your CSV has no header)
	header, err := reader.Read()
	if err != nil {
		return fmt.Errorf("failed to read header: %v", err)
	}
	// Add new column for computed sum
	header = append(header, "sum")
	// Send the header through the pipeline; note that with multiple workers
	// the output row order (including the header) is not guaranteed
	inputChan <- &CSVRecord{RowIndex: 0, Fields: header}
	// Read data rows
	rowIndex := 1
	for {
		fields, err := reader.Read()
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			return fmt.Errorf("failed to read row %d: %v", rowIndex, err)
		}
		inputChan <- &CSVRecord{RowIndex: rowIndex, Fields: fields}
		rowIndex++
	}
	log.Printf("Read %d rows from input CSV", rowIndex-1)
	return nil
}

// writeCSV writes processed records to the output CSV
func (p *CSVProcessor) writeCSV(outputChan <-chan *CSVRecord) error {
	file, err := os.Create(p.outputPath)
	if err != nil {
		return fmt.Errorf("failed to create output file: %v", err)
	}
	defer file.Close()
	writer := csv.NewWriter(file)
	defer writer.Flush()
	errorCount := 0
	for record := range outputChan {
		if record.Error != nil {
			log.Printf("Row %d processing error: %v", record.RowIndex, record.Error)
			errorCount++
			continue
		}
		if err := writer.Write(record.Fields); err != nil {
			return fmt.Errorf("failed to write row %d: %v", record.RowIndex, err)
		}
	}
	log.Printf("Wrote output CSV with %d processing errors", errorCount)
	return nil
}

// Run executes the concurrent CSV processing pipeline
func (p *CSVProcessor) Run() error {
	startTime := time.Now()
	// Create channels (buffered to reduce blocking)
	inputChan := make(chan *CSVRecord, p.batchSize)
	outputChan := make(chan *CSVRecord, p.batchSize)
	// Start progress tracker goroutine
	go func() {
		processed := 0
		for range p.progressChan {
			processed++
			if processed%1000 == 0 {
				log.Printf("Processed %d records", processed)
			}
		}
	}()
	// Start workers
	for i := 0; i < p.workerCount; i++ {
		p.wg.Add(1)
		go p.worker(i+1, inputChan, outputChan)
	}
	// Start CSV reader (runs in a goroutine to avoid blocking)
	readErrChan := make(chan error, 1)
	go func() {
		err := p.readCSV(inputChan)
		close(inputChan) // Close even on read error so workers can drain and exit
		readErrChan <- err
	}()
	// Wait for workers to finish, then close the output and progress channels
	go func() {
		p.wg.Wait()
		close(outputChan)
		close(p.progressChan)
	}()
	// Write output CSV; this drains outputChan while the reader and workers run,
	// so the pipeline never stalls on full channel buffers
	if err := p.writeCSV(outputChan); err != nil {
		return err
	}
	// Check for read errors once the pipeline has drained
	if err := <-readErrChan; err != nil {
		return err
	}
	duration := time.Since(startTime)
	log.Printf("Processing completed in %v", duration)
	return nil
}

// Example Usage
func main() {
	if len(os.Args) < 3 {
		log.Fatal("Usage: go run main.go <input.csv> <output.csv>")
	}
	inputPath := os.Args[1]
	outputPath := os.Args[2]
	// Configure processor: 4 workers, batch size 100 (adjust based on your CPU)
	processor := NewCSVProcessor(inputPath, outputPath, 4, 100)
	if err := processor.Run(); err != nil {
		log.Fatalf("CSV processing failed: %v", err)
	}
	log.Println("CSV processing succeeded!")
}

How It Works
The pipeline streams rows from the input CSV into a buffered jobs channel, fans them out to a pool of worker goroutines that validate and transform each record, and collects results and per-row errors on an output channel that the writer drains to disk. A progress goroutine reports throughput, and wait groups ensure every channel is closed and every goroutine exits once the input is exhausted. Because several workers run in parallel, rows may be written in a different order than they were read.
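As a concrete illustration (the file and column names here are invented for the example), running go run main.go input.csv output.csv against a small input such as:

id,price,tax
1,10.00,2.50
2,abc,3.00
3,7.25,0.75

would be expected to produce an output.csv containing the header plus the valid rows with the computed sum column, for example:

id,price,tax,sum
1,10.00,2.50,12.50
3,7.25,0.75,8.00

The second data row is logged as a processing error ("field 2 is not a number") and skipped, and the position of rows, including the header, may vary between runs because of concurrency.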
Key Concepts
- Worker pool decouples CSV reading from processing throughput (see the minimal sketch after this list).
- Buffered channels and wait groups coordinate backpressure and completion.
- Error channel collects issues without stopping the pipeline immediately.
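To see the coordination pattern in isolation from the CSV details, here is a minimal, self-contained sketch of the same worker-pool shape, assuming nothing from main.go: a producer fills a buffered jobs channel, a fixed number of workers drain it, and a WaitGroup closes the results channel once every worker returns. The integer jobs and the squaring step are placeholders for real work.

package main

import (
	"fmt"
	"sync"
)

func main() {
	jobs := make(chan int, 8)    // buffered: producer can run ahead of workers
	results := make(chan int, 8) // buffered: workers can run ahead of the consumer
	var wg sync.WaitGroup

	// Fixed pool of workers draining the jobs channel
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				results <- j * j // stand-in for "process one record"
			}
		}()
	}

	// Producer: enqueue work, then close so each worker's range loop ends
	go func() {
		for i := 1; i <= 10; i++ {
			jobs <- i
		}
		close(jobs)
	}()

	// Close results only after every worker has exited
	go func() {
		wg.Wait()
		close(results)
	}()

	// Consumer: drains results until the channel is closed
	for r := range results {
		fmt.Println(r)
	}
}

Closing jobs ends the workers' range loops, and closing results only after wg.Wait() guarantees the consumer sees every result before its own loop ends; this is the same hand-off CSVProcessor.Run performs with inputChan and outputChan.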
When to Use This Pattern
- ETL tasks converting large CSV datasets.
- Data cleanup or enrichment jobs running on batch files.
- Migration scripts where performance matters but memory is limited.
Best Practices
- Tune worker counts based on CPU versus I/O bound workloads.
- Validate and sanitize each record before writing output.
- Propagate context and timeouts to stop gracefully on cancellations (a sketch follows this list).
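The processor above does not accept a context, so the last point is an extension. A rough sketch of what a cancellation-aware worker loop might look like is shown below; it assumes you thread a context.Context down from Run, reuse the CSVRecord type from main.go, and add "context" to the imports. The name workerCtx is illustrative only.

// Sketch only: a worker loop that stops when ctx is cancelled or times out
func workerCtx(ctx context.Context, wg *sync.WaitGroup, jobs <-chan *CSVRecord, out chan<- *CSVRecord) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return // cancelled: stop without draining remaining jobs
		case rec, ok := <-jobs:
			if !ok {
				return // jobs channel closed: all input has been consumed
			}
			// ... process rec here, as processRecord does ...
			select {
			case out <- rec:
			case <-ctx.Done():
				return // don't block on a full output channel after cancellation
			}
		}
	}
}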
Go Version: 1.14
Difficulty: intermediate
Production Ready: Yes
Lines of Code: 220