main.go
package main

import (
	"container/heap"
	"context"
	"fmt"
	"log"
	"sync"
	"time"
)

// Task is a unit of work to be executed by a Scheduler at a given time.
type Task struct {
	// ID identifies the task; Run includes it in the error it returns when
	// the task fails.
	ID string
	// At is the time the task becomes due. The heap orders tasks by this
	// field, earliest first.
	At time.Time
	// Do is the work to perform. Run calls it with the context that was
	// passed to Run; a non-nil error makes Run return.
	Do func(context.Context) error
}

// taskHeap is a min-heap of tasks keyed on Task.At. It implements
// heap.Interface, so when maintained through package heap the task with the
// earliest At sits at index 0.
type taskHeap []*Task

// Len reports how many tasks are queued.
func (h taskHeap) Len() int {
	return len(h)
}

// Less reports whether the task at index i is due strictly before the one at
// index j.
func (h taskHeap) Less(i, j int) bool {
	return h[i].At.Before(h[j].At)
}

// Swap exchanges the tasks at indices i and j.
func (h taskHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
}

// Push appends x, which must be a *Task, to the end of the slice. It exists
// for heap.Push, which restores the heap ordering afterwards.
func (h *taskHeap) Push(x any) {
	*h = append(*h, x.(*Task))
}

// Pop removes and returns the last element of the slice. It exists for
// heap.Pop, which first moves the minimum element to the end.
func (h *taskHeap) Pop() any {
	items := *h
	last := len(items) - 1
	task := items[last]
	// Clear the vacated slot so the backing array no longer references the task.
	items[last] = nil
	*h = items[:last]
	return task
}

// Peek returns the task at the root of the heap without removing it, or nil
// when the heap is empty.
func (h taskHeap) Peek() *Task {
	if len(h) > 0 {
		return h[0]
	}
	return nil
}

// Scheduler keeps pending tasks in a heap and executes them from its Run loop.
// NewScheduler creates one with the wake channel allocated.
//
// Fields:
//   - mu guards h; Schedule and Run both hold it around every heap access.
//   - h is the heap of pending tasks, ordered by Task.At.
//   - wake is signalled by Schedule after a push so that a Run loop blocked
//     waiting for the next task re-examines the heap.
type Scheduler struct {
	mu   sync.Mutex
	h    taskHeap
	wake chan struct{}
}

// NewScheduler returns an empty Scheduler that is ready for Schedule and Run.
func NewScheduler() *Scheduler {
	sched := new(Scheduler)
	// A single buffered slot lets Schedule leave a wake-up signal without
	// blocking, even when Run is not currently waiting on the channel.
	sched.wake = make(chan struct{}, 1)
	heap.Init(&sched.h)
	return sched
}

// Schedule queues t to run at t.At and nudges a waiting Run loop so that it
// re-evaluates which task is due next.
//
// A nil task, or a task whose Do is nil, is ignored. A nil entry in the heap
// would make Peek report the queue as empty and would be dereferenced by the
// next heap comparison; a nil Do would panic when Run invokes it.
//
// The heap is modified under s.mu, so Schedule may be called from several
// goroutines and while Run is executing.
func (s *Scheduler) Schedule(t *Task) {
	if t == nil || t.Do == nil {
		return
	}

	s.mu.Lock()
	heap.Push(&s.h, t)
	s.mu.Unlock()

	// Non-blocking send: if the channel cannot accept the signal right now
	// (for example because one is already buffered), dropping this one is
	// fine since Run re-reads the heap whenever it wakes.
	select {
	case s.wake <- struct{}{}:
	default:
	}
}

// Run executes queued tasks in order of their At time until ctx is done or a
// task fails.
//
// The loop waits on a timer until the earliest task is due and is interrupted
// by the wake channel whenever Schedule adds a task, so a newly queued task
// with an earlier deadline is picked up. Tasks run one at a time on the
// calling goroutine and receive ctx.
//
// Run returns ctx.Err() once ctx is done, or the first task error wrapped with
// the task's ID. It never returns nil.
func (s *Scheduler) Run(ctx context.Context) error {
	for {
		// Observe cancellation on every pass. The selects below are skipped
		// when the head task is already due, so without this check a backlog
		// of due tasks would keep executing after ctx is cancelled.
		if err := ctx.Err(); err != nil {
			return err
		}

		s.mu.Lock()
		next := s.h.Peek()
		s.mu.Unlock()

		// Nothing queued: block until a task arrives or ctx ends.
		if next == nil {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-s.wake:
				continue
			}
		}

		if wait := time.Until(next.At); wait > 0 {
			timer := time.NewTimer(wait)
			select {
			case <-ctx.Done():
				timer.Stop()
				return ctx.Err()
			case <-s.wake:
				// The heap changed; the head may now be a different task.
				timer.Stop()
				continue
			case <-timer.C:
			}
		}

		// The lock was released while waiting, so re-read the head and only
		// pop it if it is actually due.
		s.mu.Lock()
		next = s.h.Peek()
		if next == nil || next.At.After(time.Now()) {
			s.mu.Unlock()
			continue
		}
		heap.Pop(&s.h)
		s.mu.Unlock()

		// Run the task outside the lock so Schedule is not blocked meanwhile.
		if err := next.Do(ctx); err != nil {
			return fmt.Errorf("task %s failed: %w", next.ID, err)
		}
	}
}

// main demonstrates the scheduler: it queues two tasks out of order, runs the
// loop under a two-second timeout, and logs the outcome.
func main() {
	sched := NewScheduler()

	// task-1 is queued first but is due later than task-2, so the heap has to
	// reorder them.
	sched.Schedule(&Task{
		ID: "task-1",
		At: time.Now().Add(300 * time.Millisecond),
		Do: func(context.Context) error {
			log.Printf("ran task-1")
			return nil
		},
	})
	sched.Schedule(&Task{
		ID: "task-2",
		At: time.Now().Add(120 * time.Millisecond),
		Do: func(context.Context) error {
			log.Printf("ran task-2")
			return nil
		},
	})

	runCtx, stop := context.WithTimeout(context.Background(), 2*time.Second)
	defer stop()

	// Run only returns when the context ends or a task fails, so reaching the
	// deadline is the normal way for this demo to finish.
	err := sched.Run(runCtx)
	if failed := err != nil && err != context.DeadlineExceeded; failed {
		log.Fatalf("scheduler stopped: %v", err)
	}
	log.Printf("scheduler done")
}

How It Works

Min-heap scheduler that executes tasks at specific times without busy-waiting, supporting dynamic scheduling and cancellation via context.

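Uses container/heap to order tasks by run time. The Run loop sleeps on a timer until the soonest task is due, executes it, and stops when the context is canceled or a task returns an error. A mutex guards the heap, and a buffered wake channel lets Schedule interrupt the sleep when a new task is submitted. The code shown runs each task once; a recurring task must requeue itself by calling Schedule again.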

Key Concepts

  1. Heap ensures O(log n) scheduling operations.
  2. Timer-based loop sleeps until the next job is due.
  3. Context cancellation stops the run loop and its timer safely.

When to Use This Pattern

  • Delayed execution of jobs inside a single process.
  • Lightweight alternative to cron or queuing systems.
  • Scheduling retries or reminders without external services.

Best Practices

  • Guard heap operations with a mutex when accessed concurrently.
  • Coalesce tasks scheduled at the same timestamp to avoid drift.
  • Persist tasks if they must survive restarts.
Go Version: 1.18+
Difficulty: advanced
Production Ready: Yes
Lines of Code: 127