In-Process Task Scheduler Using a Min-Heap
Min-heap scheduler that executes tasks at specific times without busy-waiting, supporting dynamic scheduling and cancellation via context.
main.go
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
package main
import (
"container/heap"
"context"
"fmt"
"log"
"sync"
"time"
)
type Task struct {
ID string
At time.Time
Do func(context.Context) error
}
type taskHeap []*Task
func (h taskHeap) Len() int { return len(h) }
func (h taskHeap) Less(i, j int) bool { return h[i].At.Before(h[j].At) }
func (h taskHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *taskHeap) Push(x any) { *h = append(*h, x.(*Task)) }
func (h *taskHeap) Pop() any {
old := *h
n := len(old)
t := old[n-1]
old[n-1] = nil
*h = old[:n-1]
return t
}
func (h taskHeap) Peek() *Task { if len(h) == 0 { return nil }; return h[0] }
type Scheduler struct {
mu sync.Mutex
h taskHeap
wake chan struct{}
}
func NewScheduler() *Scheduler {
s := &Scheduler{wake: make(chan struct{}, 1)}
heap.Init(&s.h)
return s
}
func (s *Scheduler) Schedule(t *Task) {
s.mu.Lock()
heap.Push(&s.h, t)
s.mu.Unlock()
select {
case s.wake <- struct{}{}:
default:
}
}
func (s *Scheduler) Run(ctx context.Context) error {
for {
s.mu.Lock()
next := s.h.Peek()
s.mu.Unlock()
if next == nil {
select {
case <-ctx.Done():
return ctx.Err()
case <-s.wake:
continue
}
}
wait := next.At.Sub(time.Now())
if wait > 0 {
timer := time.NewTimer(wait)
select {
case <-ctx.Done():
timer.Stop()
return ctx.Err()
case <-s.wake:
timer.Stop()
continue
case <-timer.C:
}
}
s.mu.Lock()
next = s.h.Peek()
if next != nil && !next.At.After(time.Now()) {
heap.Pop(&s.h)
} else {
s.mu.Unlock()
continue
}
s.mu.Unlock()
if err := next.Do(ctx); err != nil {
return fmt.Errorf("task %s failed: %w", next.ID, err)
}
}
}
func main() {
s := NewScheduler()
s.Schedule(&Task{
ID: "task-1",
At: time.Now().Add(300 * time.Millisecond),
Do: func(ctx context.Context) error {
log.Printf("ran task-1")
return nil
},
})
s.Schedule(&Task{
ID: "task-2",
At: time.Now().Add(120 * time.Millisecond),
Do: func(ctx context.Context) error {
log.Printf("ran task-2")
return nil
},
})
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := s.Run(ctx); err != nil && err != context.DeadlineExceeded {
log.Fatalf("scheduler stopped: %v", err)
}
log.Printf("scheduler done")
}How It Works
Min-heap scheduler that executes tasks at specific times without busy-waiting, supporting dynamic scheduling and cancellation via context.
Uses container/heap to order tasks by next run time; a goroutine waits until the soonest task is due, executes it, requeues if recurring, and stops when context is canceled; channels synchronize task submission.
Key Concepts
- 1Heap ensures O(log n) scheduling operations.
- 2Timer-based loop sleeps until the next job is due.
- 3Context cancellation shuts down workers and timers safely.
When to Use This Pattern
- Delayed execution of jobs inside a single process.
- Lightweight alternative to cron or queuing systems.
- Scheduling retries or reminders without external services.
Best Practices
- Guard heap operations with a mutex when accessed concurrently.
- Coalesce tasks scheduled at the same timestamp to avoid drift.
- Persist tasks if they must survive restarts.
Go Version1.18+
Difficultyadvanced
Production ReadyYes
Lines of Code127