main.go
package main

import (
	"container/list"
	"fmt"
	"sync"
	"time"
)

// DataPoint is one sample of the time series: a numeric value together with
// the instant at which it was recorded.
type DataPoint struct {
	Timestamp time.Time // instant at which the sample was recorded
	Value     float64   // numeric value of the sample
}

// AggregationType names the aggregate function that is applied to the data
// points of each window.
type AggregationType string

// Aggregation functions understood by the aggregator.
const (
	AggregationSum   AggregationType = "sum"   // total of the values in the window
	AggregationAvg   AggregationType = "avg"   // arithmetic mean of the values in the window
	AggregationMin   AggregationType = "min"   // smallest value in the window
	AggregationMax   AggregationType = "max"   // largest value in the window
	AggregationCount AggregationType = "count" // number of data points in the window
)

// WindowAggregationResult is the outcome of aggregating one window of data
// points.
type WindowAggregationResult struct {
	WindowStart time.Time       // start of the window that was aggregated
	WindowEnd   time.Time       // end of the window that was aggregated
	Aggregation AggregationType // aggregate function that produced Value
	Value       float64         // aggregated value; stays 0 when the window held no points
	Count       int             // number of data points that fell inside the window
}

// SlidingWindowAggregator periodically aggregates the data points recorded
// within the most recent window and publishes each result on a channel.
type SlidingWindowAggregator struct {
	windowSize    time.Duration   // span of time covered by each window
	slideInterval time.Duration   // period between successive aggregations
	aggregation   AggregationType // aggregate function applied to each window

	mu         sync.RWMutex // guards dataPoints
	dataPoints *list.List   // stored DataPoint values; list.List itself is not synchronized, access it under mu

	resultChan chan WindowAggregationResult // carries aggregation results to consumers
	stopChan   chan struct{}                // closed to tell the aggregation loop to exit
}

// NewSlidingWindowAggregator creates an aggregator and starts its background
// aggregation loop, which keeps running until Stop is called.
//
// windowSize is the span of time each aggregation covers, and slideInterval is
// how often an aggregation is produced; a slideInterval larger than windowSize
// is clamped to windowSize. aggregation selects the aggregate function, and
// bufferSize is the capacity of the results channel.
//
// It panics if windowSize or slideInterval is not positive.
func NewSlidingWindowAggregator(
	windowSize time.Duration,
	slideInterval time.Duration,
	aggregation AggregationType,
	bufferSize int,
) *SlidingWindowAggregator {
	// time.NewTicker panics on a non-positive interval. Without these checks
	// that panic would fire inside the background goroutine, where the caller
	// has no way to recover from it, so reject bad durations up front.
	if windowSize <= 0 {
		panic(fmt.Sprintf("NewSlidingWindowAggregator: windowSize must be positive, got %v", windowSize))
	}
	if slideInterval <= 0 {
		panic(fmt.Sprintf("NewSlidingWindowAggregator: slideInterval must be positive, got %v", slideInterval))
	}

	// Sliding further than one window per step would skip data entirely.
	if slideInterval > windowSize {
		slideInterval = windowSize
	}

	agg := &SlidingWindowAggregator{
		windowSize:    windowSize,
		slideInterval: slideInterval,
		aggregation:   aggregation,
		dataPoints:    list.New(),
		resultChan:    make(chan WindowAggregationResult, bufferSize),
		stopChan:      make(chan struct{}),
	}

	// The loop owns the ticker and exits when stopChan is closed.
	go agg.run()

	return agg
}

// AddDataPoint records value with the current time as its timestamp and then
// discards stored points that are older than windowSize. It holds the write
// lock for the whole operation, so it may be called from multiple goroutines.
func (s *SlidingWindowAggregator) AddDataPoint(value float64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Read the clock once, inside the lock, so the stored timestamp and the
	// trim cutoff are derived from the same instant.
	now := time.Now()

	s.dataPoints.PushBack(DataPoint{
		Timestamp: now,
		Value:     value,
	})

	// Anything recorded more than one window ago can no longer contribute.
	s.trimOldDataPoints(now.Add(-s.windowSize))
}

// trimOldDataPoints pops points off the front of the list while their
// timestamp is before cutoff, and stops at the first point that is not. It
// does no locking of its own; AddDataPoint calls it with the write lock held.
func (s *SlidingWindowAggregator) trimOldDataPoints(cutoff time.Time) {
	for {
		oldest := s.dataPoints.Front()
		if oldest == nil {
			return // nothing left to examine
		}
		if !oldest.Value.(DataPoint).Timestamp.Before(cutoff) {
			return // this point is kept, and scanning ends here
		}
		s.dataPoints.Remove(oldest)
	}
}

// run is the background aggregation loop. Every slideInterval it aggregates
// the points recorded during the last windowSize and offers the result on
// resultChan without blocking; if the channel is full the result is dropped
// and a warning is printed. It returns when stopChan is closed.
//
// run is the only sender on resultChan, so it is also the one that closes it.
// Closing the channel from another goroutine could race with an in-flight send
// and panic with "send on closed channel".
func (s *SlidingWindowAggregator) run() {
	defer close(s.resultChan)

	ticker := time.NewTicker(s.slideInterval)
	defer ticker.Stop()

	for {
		select {
		case <-s.stopChan:
			return

		case <-ticker.C:
			// The window ends now and reaches back windowSize.
			windowEnd := time.Now()
			windowStart := windowEnd.Add(-s.windowSize)

			// Copy the relevant points out under the read lock so the
			// aggregation itself runs without holding it.
			s.mu.RLock()
			inWindow := s.getDataPointsInWindow(windowStart, windowEnd)
			s.mu.RUnlock()

			result := s.calculateAggregation(inWindow, windowStart, windowEnd)

			// Never block the loop on a slow consumer.
			select {
			case s.resultChan <- result:
			default:
				fmt.Printf("Warning: result channel full, dropping aggregation result\n")
			}
		}
	}
}

// getDataPointsInWindow returns copies, in list order, of the stored points
// whose timestamp is strictly after start and strictly before end. It does no
// locking of its own; run calls it with the read lock held.
func (s *SlidingWindowAggregator) getDataPointsInWindow(start, end time.Time) []DataPoint {
	// Every stored point may qualify, so size the slice once up front.
	points := make([]DataPoint, 0, s.dataPoints.Len())

	for e := s.dataPoints.Front(); e != nil; e = e.Next() {
		dp := e.Value.(DataPoint)
		if dp.Timestamp.After(start) && dp.Timestamp.Before(end) {
			points = append(points, dp)
		}
	}

	return points
}

// sumValues returns the sum of the Value fields of points, added in order.
func sumValues(points []DataPoint) float64 {
	total := 0.0
	for _, dp := range points {
		total += dp.Value
	}
	return total
}

// calculateAggregation applies the configured aggregation to dataPoints and
// returns a result labelled with the given window bounds.
//
// Count is always len(dataPoints). Value is left at 0 when dataPoints is empty
// or when the configured aggregation is not one of the known constants.
func (s *SlidingWindowAggregator) calculateAggregation(
	dataPoints []DataPoint,
	windowStart, windowEnd time.Time,
) WindowAggregationResult {
	result := WindowAggregationResult{
		WindowStart: windowStart,
		WindowEnd:   windowEnd,
		Aggregation: s.aggregation,
		Count:       len(dataPoints),
	}

	// The guard also keeps the dataPoints[0] accesses below in range.
	if len(dataPoints) == 0 {
		return result
	}

	switch s.aggregation {
	case AggregationSum:
		result.Value = sumValues(dataPoints)

	case AggregationAvg:
		result.Value = sumValues(dataPoints) / float64(len(dataPoints))

	case AggregationMin:
		// Named lowest rather than min to avoid shadowing the builtin.
		lowest := dataPoints[0].Value
		for _, dp := range dataPoints[1:] {
			if dp.Value < lowest {
				lowest = dp.Value
			}
		}
		result.Value = lowest

	case AggregationMax:
		// Named highest rather than max to avoid shadowing the builtin.
		highest := dataPoints[0].Value
		for _, dp := range dataPoints[1:] {
			if dp.Value > highest {
				highest = dp.Value
			}
		}
		result.Value = highest

	case AggregationCount:
		result.Value = float64(len(dataPoints))
	}

	return result
}

// GetResults returns the receive-only channel on which aggregation results are
// delivered. The channel is closed once the aggregation loop has exited, so a
// range over it terminates after Stop.
func (s *SlidingWindowAggregator) GetResults() <-chan WindowAggregationResult {
	return s.resultChan
}

// Stop signals the aggregation loop to exit. It is safe to call more than once
// and from multiple goroutines.
//
// Stop does not close the results channel itself. The loop closes it as it
// exits, which may happen shortly after Stop returns. Results already buffered
// in the channel remain readable.
func (s *SlidingWindowAggregator) Stop() {
	// Holding mu serializes concurrent Stop calls, so the check and the close
	// below happen atomically and stopChan is closed exactly once.
	s.mu.Lock()
	defer s.mu.Unlock()

	select {
	case <-s.stopChan:
		// Already stopped; closing again would panic.
	default:
		close(s.stopChan)
	}
}

// main demonstrates the aggregator: a producer goroutine feeds it a repeating
// pattern of values while the main goroutine prints the first ten aggregation
// results it receives.
func main() {
	// 10-second window, evaluated every 2 seconds, averaging the values, with
	// room for 100 buffered results.
	aggregator := NewSlidingWindowAggregator(10*time.Second, 2*time.Second, AggregationAvg, 100)
	defer aggregator.Stop()

	// Producer: 30 points, one every 500ms, cycling through 10, 11, ..., 19.
	go func() {
		for i := 0; i < 30; i++ {
			value := float64(10 + i%10)
			aggregator.AddDataPoint(value)
			fmt.Printf("Added data point: %.2f\n", value)
			time.Sleep(500 * time.Millisecond)
		}
	}()

	// Consumer: print at most ten results, stopping early if the channel closes.
	results := aggregator.GetResults()
	for received := 0; received < 10; received++ {
		result, ok := <-results
		if !ok {
			break
		}
		fmt.Printf("\nWindow: %s to %s\n", result.WindowStart.Format(time.RFC3339), result.WindowEnd.Format(time.RFC3339))
		fmt.Printf("%s: %.2f (Data points: %d)\n", result.Aggregation, result.Value, result.Count)
	}

	fmt.Println("\nAggregation completed!")
}

How It Works

This is a sliding-window aggregator that computes one configured aggregate (sum, count, minimum, maximum, or average) over the data points that fall inside a time-based window.

It stores each point with its arrival timestamp, prunes entries older than the window on every add, recomputes the aggregate over the retained points at each slide interval, and publishes each result on a channel for consumers to read.

Key Concepts

  • Windowing keeps memory bounded regardless of the total number of samples.
  • Supports multiple aggregate functions with one-pass pruning.
  • Uses time comparisons so the window slides naturally.

When to Use This Pattern

  • Rate calculations for metrics such as requests per second or error rates.
  • Streaming analytics dashboards requiring live stats.
  • Detecting anomalies within recent time windows.

Best Practices

  • Pick window size and slide interval appropriate to signal noise.
  • Use monotonic time where possible to avoid clock jumps.
  • Avoid recomputing heavy stats by maintaining running totals.
Go Version: 1.18
Difficulty: advanced
Production Ready: Yes
Lines of Code: 252