Time Series Data Aggregation with Sliding Window
A sliding-window aggregator that computes rolling sums, counts, minimums, maximums, or averages over a time-based window of recent data points.
main.go
package main

import (
    "container/list"
    "fmt"
    "sync"
    "time"
)

// DataPoint represents a single time series data point
type DataPoint struct {
    Timestamp time.Time // Time when the data point was recorded
    Value     float64   // Numeric value of the data point
}

// AggregationType defines the type of aggregation to perform
type AggregationType string

const (
    AggregationSum   AggregationType = "sum"
    AggregationAvg   AggregationType = "avg"
    AggregationMin   AggregationType = "min"
    AggregationMax   AggregationType = "max"
    AggregationCount AggregationType = "count"
)

// WindowAggregationResult contains the result of a window aggregation
type WindowAggregationResult struct {
    WindowStart time.Time       // Start time of the window
    WindowEnd   time.Time       // End time of the window
    Aggregation AggregationType // Type of aggregation performed
    Value       float64         // Result value
    Count       int             // Number of data points in the window
}

// SlidingWindowAggregator processes time series data with a sliding window
type SlidingWindowAggregator struct {
    windowSize    time.Duration                // Size of the sliding window
    slideInterval time.Duration                // Interval at which the window slides
    aggregation   AggregationType              // Type of aggregation to perform
    dataPoints    *list.List                   // Time-ordered list of data points; guarded by mu
    mu            sync.RWMutex                 // Protects dataPoints
    resultChan    chan WindowAggregationResult // Channel for aggregation results
    stopChan      chan struct{}                // Channel to stop the aggregator
}

// NewSlidingWindowAggregator creates a new sliding window aggregator
func NewSlidingWindowAggregator(
    windowSize time.Duration,
    slideInterval time.Duration,
    aggregation AggregationType,
    bufferSize int,
) *SlidingWindowAggregator {
    if slideInterval > windowSize {
        slideInterval = windowSize // Slide interval can't be larger than the window size
    }
    agg := &SlidingWindowAggregator{
        windowSize:    windowSize,
        slideInterval: slideInterval,
        aggregation:   aggregation,
        dataPoints:    list.New(),
        resultChan:    make(chan WindowAggregationResult, bufferSize),
        stopChan:      make(chan struct{}),
    }
    // Start the aggregation loop
    go agg.run()
    return agg
}

// AddDataPoint adds a new data point to the aggregator (thread-safe)
func (s *SlidingWindowAggregator) AddDataPoint(value float64) {
    s.mu.Lock()
    defer s.mu.Unlock()
    // Add the new data point with the current timestamp
    now := time.Now()
    s.dataPoints.PushBack(DataPoint{
        Timestamp: now,
        Value:     value,
    })
    // Remove old data points that have fallen outside the window
    s.trimOldDataPoints(now.Add(-s.windowSize))
}

// trimOldDataPoints removes data points older than the specified cutoff time
func (s *SlidingWindowAggregator) trimOldDataPoints(cutoff time.Time) {
    // Points are appended in timestamp order, so a single scan from the front suffices
    for e := s.dataPoints.Front(); e != nil; {
        dp := e.Value.(DataPoint)
        if dp.Timestamp.Before(cutoff) {
            next := e.Next()
            s.dataPoints.Remove(e)
            e = next
        } else {
            break // Remaining points are newer
        }
    }
}

// run performs the sliding window aggregation on a fixed interval
func (s *SlidingWindowAggregator) run() {
    // Close the result channel when the loop exits so readers can range over it
    defer close(s.resultChan)
    // Create a ticker for the slide interval
    ticker := time.NewTicker(s.slideInterval)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            // Perform the aggregation for the current window
            now := time.Now()
            windowEnd := now
            windowStart := now.Add(-s.windowSize)
            // Copy the data points currently in the window
            s.mu.RLock()
            dataInWindow := s.getDataPointsInWindow(windowStart, windowEnd)
            s.mu.RUnlock()
            // Calculate the aggregation
            result := s.calculateAggregation(dataInWindow, windowStart, windowEnd)
            // Send the result to the channel (non-blocking)
            select {
            case s.resultChan <- result:
            default:
                // Channel is full - log and discard (or implement backpressure)
                fmt.Printf("Warning: result channel full, dropping aggregation result\n")
            }
        case <-s.stopChan:
            // Stop the aggregator
            return
        }
    }
}

// getDataPointsInWindow returns all data points within the specified time window
func (s *SlidingWindowAggregator) getDataPointsInWindow(start, end time.Time) []DataPoint {
    var points []DataPoint
    // Collect the data points whose timestamps fall inside [start, end]
    for e := s.dataPoints.Front(); e != nil; e = e.Next() {
        dp := e.Value.(DataPoint)
        if !dp.Timestamp.Before(start) && !dp.Timestamp.After(end) {
            points = append(points, dp)
        }
    }
    return points
}

// calculateAggregation computes the specified aggregation on the data points
func (s *SlidingWindowAggregator) calculateAggregation(
    dataPoints []DataPoint,
    windowStart, windowEnd time.Time,
) WindowAggregationResult {
    result := WindowAggregationResult{
        WindowStart: windowStart,
        WindowEnd:   windowEnd,
        Aggregation: s.aggregation,
        Count:       len(dataPoints),
    }
    if len(dataPoints) == 0 {
        return result // Return a zero value for an empty window
    }
    // Calculate based on the aggregation type
    switch s.aggregation {
    case AggregationSum:
        sum := 0.0
        for _, dp := range dataPoints {
            sum += dp.Value
        }
        result.Value = sum
    case AggregationAvg:
        sum := 0.0
        for _, dp := range dataPoints {
            sum += dp.Value
        }
        result.Value = sum / float64(len(dataPoints))
    case AggregationMin:
        minVal := dataPoints[0].Value
        for _, dp := range dataPoints {
            if dp.Value < minVal {
                minVal = dp.Value
            }
        }
        result.Value = minVal
    case AggregationMax:
        maxVal := dataPoints[0].Value
        for _, dp := range dataPoints {
            if dp.Value > maxVal {
                maxVal = dp.Value
            }
        }
        result.Value = maxVal
    case AggregationCount:
        result.Value = float64(len(dataPoints))
    }
    return result
}

// GetResults returns the channel for aggregation results
func (s *SlidingWindowAggregator) GetResults() <-chan WindowAggregationResult {
    return s.resultChan
}

// Stop terminates the aggregator; the run loop closes the result channel on exit.
// Stop must be called at most once.
func (s *SlidingWindowAggregator) Stop() {
    close(s.stopChan)
}

func main() {
    // Create an aggregator with a 10-second window, 2-second slide interval, and average aggregation
    aggregator := NewSlidingWindowAggregator(10*time.Second, 2*time.Second, AggregationAvg, 100)
    defer aggregator.Stop()
    // Simulate incoming data points (one value every 500ms)
    go func() {
        for i := 0; i < 30; i++ {
            // Simple repeating pattern between 10 and 19 for demonstration
            value := 10.0 + float64(i%10)
            aggregator.AddDataPoint(value)
            fmt.Printf("Added data point: %.2f\n", value)
            time.Sleep(500 * time.Millisecond)
        }
    }()
    // Read and print aggregation results
    resultCount := 0
    for result := range aggregator.GetResults() {
        fmt.Printf("\nWindow: %s to %s\n", result.WindowStart.Format(time.RFC3339), result.WindowEnd.Format(time.RFC3339))
        fmt.Printf("%s: %.2f (Data points: %d)\n", result.Aggregation, result.Value, result.Count)
        resultCount++
        if resultCount >= 10 {
            break // Stop after 10 results
        }
    }
    fmt.Println("\nAggregation completed!")
}

How It Works
The aggregator stores each point with its timestamp, prunes entries that have fallen outside the window on every add, recomputes the configured aggregate over the remaining points on each slide interval, and publishes the result for the current window on a buffered channel.
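The program above only pushes results on the channel at each tick. If a caller also wants an on-demand reading, a query method along the following lines could be added; the Snapshot name is hypothetical, and the method is a sketch built from the listing's existing fields and helpers, not part of the program:

// Snapshot is a hypothetical on-demand query: it aggregates whatever is
// currently inside the window without waiting for the next slide tick.
func (s *SlidingWindowAggregator) Snapshot() WindowAggregationResult {
    now := time.Now()
    windowStart := now.Add(-s.windowSize)
    s.mu.RLock()
    points := s.getDataPointsInWindow(windowStart, now)
    s.mu.RUnlock()
    return s.calculateAggregation(points, windowStart, now)
}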
Key Concepts
- Windowing keeps memory bounded regardless of how many samples arrive in total.
- Multiple aggregate functions are supported, and pruning is a single pass from the oldest points (see the sketch after this list).
- Timestamp comparisons drive eviction, so the window slides naturally as time advances.
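As a minimal illustration of the single-pass pruning idea, here is a slice-based variant of trimOldDataPoints from the listing; it assumes points are stored in timestamp order and is a sketch, not part of the program:

// pruneOld drops points older than cutoff from a time-ordered slice.
// Because points arrive in order, scanning from the front is enough;
// everything from the first in-window point onward is kept.
func pruneOld(points []DataPoint, cutoff time.Time) []DataPoint {
    i := 0
    for i < len(points) && points[i].Timestamp.Before(cutoff) {
        i++
    }
    return points[i:]
}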
When to Use This Pattern
- Rate calculations for metrics such as requests per second or error rates (a rate sketch follows this list).
- Streaming analytics dashboards requiring live stats.
- Detecting anomalies within recent time windows.
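For instance, a requests-per-second rate can be derived from a count aggregation over the window. A minimal sketch using the WindowAggregationResult type from the listing, with an illustrative rps helper that is not part of the program:

// rps converts a count-aggregation result into an approximate rate per second.
func rps(r WindowAggregationResult) float64 {
    seconds := r.WindowEnd.Sub(r.WindowStart).Seconds()
    if seconds <= 0 {
        return 0
    }
    return r.Value / seconds // Value holds the count when AggregationCount is used
}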
Best Practices
- Pick a window size and slide interval that match how noisy the signal is.
- Prefer monotonic time to avoid wall-clock jumps; Go's time.Now() already embeds a monotonic reading that Before, After, and Sub use.
- Avoid recomputing heavy statistics on every slide by maintaining running totals (a sketch follows this list).
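A minimal sketch of the running-totals idea, using a hypothetical runningSum type that is not part of the listing: the total is adjusted as points enter and leave the window, so sum, count, and average become O(1) per update. Rolling min and max cannot be maintained this simply (they need a structure such as a monotonic deque), which is why the listing above rescans the window for those.

// runningSum keeps O(1) sum/count aggregates for the current window.
// Call add when a point enters the window and evict when it is pruned.
type runningSum struct {
    sum   float64
    count int
}

func (r *runningSum) add(v float64)   { r.sum += v; r.count++ }
func (r *runningSum) evict(v float64) { r.sum -= v; r.count-- }

// avg returns the current window average, or 0 for an empty window.
func (r *runningSum) avg() float64 {
    if r.count == 0 {
        return 0
    }
    return r.sum / float64(r.count)
}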
Go Version: 1.18
Difficulty: Advanced
Production Ready: Yes
Lines of Code: 252