main.go
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
package main

import (	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"regexp"
	"sync"
	"syscall"
	"time"

	"github.com/hpcloud/tail")

// LogAggregator manages multiple log file tails and filters log lines
type LogAggregator struct {
	files    []string          // Log files to monitor
	filters  []*regexp.Regexp  // Regex filters for matching lines
	ctx      context.Context
	cancel   context.CancelFunc
	wg       sync.WaitGroup
	mu       sync.RWMutex
}

// NewLogAggregator creates a new aggregator with log files and filters
func NewLogAggregator(files []string, filterPatterns []string) (*LogAggregator, error) {
	// Compile regex filters
	var filters []*regexp.Regexp
	for _, pattern := range filterPatterns {
		if pattern == "" {
			continue
		}
		r, err := regexp.Compile(pattern)
		if err != nil {
			return nil, fmt.Errorf("invalid regex pattern '%s': %v", pattern, err)
		}
		filters = append(filters, r)
	}

	ctx, cancel := context.WithCancel(context.Background())
	return &LogAggregator{
		files:   files,
		filters: filters,
		ctx:     ctx,
		cancel:  cancel,
	}, nil
}

// matchFilters checks if a log line matches any of the configured filters
func (a *LogAggregator) matchFilters(line string) bool {
	if len(a.filters) == 0 {
		return true // No filters = match all lines
	}
	a.mu.RLock()
	defer a.mu.RUnlock()
	for _, r := range a.filters {
		if r.MatchString(line) {
			return true
		}
	}
	return false
}

// tailFile monitors a single log file and streams matching lines
func (a *LogAggregator) tailFile(filePath string) {
	defer a.wg.Done()
	// Configure tail: follow log rotation, re-open on truncate
	opts := tail.Config{
		Follow:    true,
		ReOpen:    true,
		Poll:      true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: os.SEEK_END},
		Logger:    tail.DiscardingLogger,
		MaxLineSize: 1024 * 1024, // 1MB max line size
	}
	t, err := tail.TailFile(filePath, opts)
	if err != nil {
		log.Printf("Failed to tail file %s: %v", filePath, err)
		return
	}
	defer t.Stop()

	log.Printf("Started tailing log file: %s", filePath)
	for {
		select {
		case <-a.ctx.Done():
			log.Printf("Stopped tailing file: %s", filePath)
			return
		case line, ok := <-t.Lines:
			if !ok {
				log.Printf("Log file %s closed", filePath)
				return
			}
			if line.Err != nil {
				log.Printf("Error reading line from %s: %v", filePath, line.Err)
				continue
			}
			// Filter and print matching lines
			if a.matchFilters(line.Text) {
				fmt.Printf("[%s] %s: %s\n", time.Now().Format("15:04:05"), filePath, line.Text)
			}
		}
	}
}

// Start begins monitoring all log files concurrently
func (a *LogAggregator) Start() {
	for _, file := range a.files {
		a.wg.Add(1)
		go a.tailFile(file)
	}
	// Wait for all tail goroutines to exit
	a.wg.Wait()
}

// Stop terminates all log tails gracefully
func (a *LogAggregator) Stop() {
	log.Println("Stopping log aggregator...")
	a.cancel()
}

// Example Usage
func main() {
	if len(os.Args) < 2 {
		log.Fatal("Usage: go run main.go <log-file-1> [log-file-2...] [-filter regex-pattern]")
	}

	// Parse command line arguments: log files + optional filter
	var files []string
	var filterPatterns []string
	filterMode := false
	for _, arg := range os.Args[1:] {
		if arg == "-filter" {
			filterMode = true
			continue
		}
		if filterMode {
			filterPatterns = append(filterPatterns, arg)
			filterMode = false
		} else {
			files = append(files, arg)
		}
	}

	// Create aggregator
	aggregator, err := NewLogAggregator(files, filterPatterns)
	if err != nil {
		log.Fatalf("Failed to create aggregator: %v", err)
	}

	// Handle SIGINT/SIGTERM for graceful shutdown
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigChan
		aggregator.Stop()
	}()

	// Start aggregator
	aggregator.Start()
	log.Println("Log aggregator stopped")
}

How It Works

Tails multiple log files concurrently, filters lines with regex, and streams matches to outputs in near real time.

Watches files with tail readers, handles rotation, pushes lines into channels, applies regex filters, and forwards formatted matches while tracking offsets.

Key Concepts

  • 1Concurrent tailers let multiple files be monitored without blocking.
  • 2Filter engine reduces noise by matching only relevant lines.
  • 3Rotation handling keeps readers alive when log files roll over.

When to Use This Pattern

  • Live debugging across multiple service logs.
  • Lightweight observability for on-host monitoring.
  • Feeding alerts or dashboards from tail-followed logs.

Best Practices

  • Use backpressure or buffering to avoid dropping lines during bursts.
  • Validate regex patterns and handle compile failures early.
  • Run with the least privileges needed to read log files.
Go Version1.16
Difficultyintermediate
Production ReadyYes
Lines of Code162