Background Job Scheduler with Cron Expression Support
In-process scheduler that registers cron-style jobs, tracks runs, and supports graceful shutdown.
main.go
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/robfig/cron/v3"
)
// Job represents a scheduled background task with metadata
type Job struct {
ID string
CronExpr string // Cron expression (e.g., "0 3 * * *" for daily 3 AM)
Task func() error // Task function to execute
Enabled bool
LastRun time.Time
LastError error
RunCount int
mu sync.Mutex
}
// JobScheduler manages scheduled jobs
type JobScheduler struct {
cron *cron.Cron
jobs map[string]*Job
mu sync.RWMutex
ctx context.Context
cancel context.CancelFunc
}
// NewJobScheduler creates a scheduler instance with timezone support
func NewJobScheduler(timezone string) (*JobScheduler, error) {
loc, err := time.LoadLocation(timezone)
if err != nil {
return nil, fmt.Errorf("invalid timezone: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
return &JobScheduler{
cron: cron.New(cron.WithLocation(loc), cron.WithParser(cron.NewParser(cron.SecondOptional|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor))),
jobs: make(map[string]*Job),
ctx: ctx,
cancel: cancel,
}, nil
}
// RegisterJob adds a new job to the scheduler
func (s *JobScheduler) RegisterJob(job *Job) error {
s.mu.Lock()
defer s.mu.Unlock()
if job == nil {
return fmt.Errorf("job cannot be nil")
}
if job.ID == "" {
return fmt.Errorf("job.ID cannot be empty")
}
if job.CronExpr == "" {
return fmt.Errorf("job.CronExpr cannot be empty")
}
if job.Task == nil {
return fmt.Errorf("job.Task cannot be nil")
}
if _, exists := s.jobs[job.ID]; exists {
return fmt.Errorf("job with ID %s already exists", job.ID)
}
job.Enabled = true
wrapper := func() {
job.mu.Lock()
defer job.mu.Unlock()
if !job.Enabled {
return
}
log.Printf("starting job=%s", job.ID)
job.LastRun = time.Now()
job.LastError = job.Task()
job.RunCount++
if job.LastError != nil {
log.Printf("job=%s failed: %v", job.ID, job.LastError)
return
}
log.Printf("job=%s done (run_count=%d)", job.ID, job.RunCount)
}
entryID, err := s.cron.AddFunc(job.CronExpr, wrapper)
if err != nil {
return fmt.Errorf("failed to schedule job %s: %w", job.ID, err)
}
s.jobs[job.ID] = job
log.Printf("registered job=%s cron=%q entry_id=%d", job.ID, job.CronExpr, entryID)
return nil
}
// ToggleJob enables/disables a registered job
func (s *JobScheduler) ToggleJob(jobID string, enabled bool) error {
s.mu.RLock()
job, exists := s.jobs[jobID]
s.mu.RUnlock()
if !exists {
return fmt.Errorf("job %s not found", jobID)
}
job.mu.Lock()
defer job.mu.Unlock()
job.Enabled = enabled
return nil
}
// Start runs the scheduler and blocks until Shutdown()
func (s *JobScheduler) Start() {
log.Println("starting job scheduler...")
s.cron.Start()
<-s.ctx.Done()
}
// Shutdown stops the scheduler gracefully
func (s *JobScheduler) Shutdown(timeout time.Duration) {
log.Println("initiating scheduler shutdown...")
s.cancel()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
stopCtx := s.cron.Stop() // waits for running jobs
select {
case <-stopCtx.Done():
log.Println("scheduler shutdown completed")
case <-ctx.Done():
log.Println("scheduler shutdown timeout; exiting")
}
}
func main() {
scheduler, err := NewJobScheduler("UTC")
if err != nil {
log.Fatalf("create scheduler: %v", err)
}
backupJob := &Job{
ID: "daily-db-backup",
CronExpr: "0 3 * * *",
Task: func() error {
log.Println("Performing database backup...")
time.Sleep(2 * time.Second)
return nil
},
}
cleanupJob := &Job{
ID: "hourly-log-cleanup",
CronExpr: "0 * * * *",
Task: func() error {
log.Println("Cleaning up old log files...")
time.Sleep(1 * time.Second)
return nil
},
}
if err := scheduler.RegisterJob(backupJob); err != nil {
log.Fatalf("register backup job: %v", err)
}
if err := scheduler.RegisterJob(cleanupJob); err != nil {
log.Fatalf("register cleanup job: %v", err)
}
go scheduler.Start()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
<-sigCh
scheduler.Shutdown(30 * time.Second)
}
How It Works
In-process scheduler that registers cron-style jobs, tracks runs, and supports graceful shutdown.
Parses cron expressions, registers jobs in a map, runs a loop ticking each second to trigger due tasks, records execution history and error states, and stops via context cancellation.
Key Concepts
- 1Cron parser enables flexible schedules beyond fixed intervals.
- 2Thread-safe registry allows dynamic job registration and removal.
- 3Execution history captures errors for observability.
When to Use This Pattern
- Lightweight scheduled tasks like cleanups or notifications.
- Local cron replacement inside containerized applications.
- Running periodic health checks or data syncs without external schedulers.
Best Practices
- Guard job execution with panic recovery to keep the scheduler alive.
- Give jobs context and timeouts to avoid overlapping long runs.
- Persist job state if you need durability across restarts.
Go Version1.16
Difficultyintermediate
Production ReadyYes
Lines of Code184