main.go
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/robfig/cron/v3"
)

// Job represents a scheduled background task together with its runtime
// bookkeeping. A Job contains a mutex and must therefore be handled through a
// pointer and never copied after it has been registered.
type Job struct {
	// ID is the unique key of the job in the scheduler registry; RegisterJob
	// rejects empty and duplicate IDs.
	ID        string
	CronExpr  string       // Cron expression (e.g., "0 3 * * *" for daily 3 AM)
	Task      func() error // Task function to execute
	// Enabled is forced to true by RegisterJob and changed via ToggleJob; a
	// disabled job stays scheduled but its runs return without calling Task.
	Enabled   bool
	// LastRun is the wall-clock time at which the most recent run started.
	LastRun   time.Time
	// LastError is the outcome of the most recent run; nil means success.
	LastError error
	// RunCount is the number of runs executed, whether they succeeded or not.
	RunCount  int
	// mu guards the mutable fields above (Enabled, LastRun, LastError, RunCount).
	mu        sync.Mutex
}

// JobScheduler manages scheduled jobs. It pairs a cron engine, which decides
// when jobs fire, with a registry of the Job values that were registered.
// Instances are created with NewJobScheduler.
type JobScheduler struct {
	// cron is the underlying robfig/cron engine that triggers job wrappers.
	cron   *cron.Cron
	// jobs maps Job.ID to the registered job.
	jobs   map[string]*Job
	// mu guards the jobs map.
	mu     sync.RWMutex
	// ctx is cancelled (through cancel) by Shutdown; Start blocks on it.
	ctx    context.Context
	cancel context.CancelFunc
}

// NewJobScheduler builds a scheduler whose cron expressions are evaluated in
// the named IANA timezone (for example "UTC" or "Europe/Berlin").
//
// The cron parser is configured with an optional leading seconds field plus
// the minute, hour, day-of-month, month and day-of-week fields, and with
// descriptor support enabled.
//
// It returns an error wrapping the time.LoadLocation failure when the timezone
// name cannot be resolved.
func NewJobScheduler(timezone string) (*JobScheduler, error) {
	location, err := time.LoadLocation(timezone)
	if err != nil {
		return nil, fmt.Errorf("invalid timezone: %w", err)
	}

	// Assemble the parser separately so the accepted fields are easy to read.
	fields := cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor
	parser := cron.NewParser(fields)
	engine := cron.New(cron.WithLocation(location), cron.WithParser(parser))

	// The context only signals shutdown to Start; it carries no deadline.
	ctx, cancel := context.WithCancel(context.Background())

	scheduler := &JobScheduler{
		cron:   engine,
		jobs:   make(map[string]*Job),
		ctx:    ctx,
		cancel: cancel,
	}
	return scheduler, nil
}

// RegisterJob validates job, schedules it on the cron engine and stores it in
// the scheduler registry under job.ID.
//
// The job is force-enabled on registration. Each scheduled run:
//   - is skipped when the job has been disabled through ToggleJob;
//   - is serialized with other runs of the same job, so a run that is still in
//     progress delays the next one instead of overlapping with it;
//   - recovers a panic raised by job.Task and records it as the run's error,
//     so that one misbehaving task cannot take the whole process down;
//   - updates LastRun, LastError and RunCount under the job's mutex, which is
//     held only for those short updates and never while Task is executing.
//
// It returns an error when job is nil, when ID, CronExpr or Task is unset,
// when a job with the same ID is already registered, or when the cron
// expression cannot be parsed.
func (s *JobScheduler) RegisterJob(job *Job) error {
	// Argument validation does not touch scheduler state, so do it before
	// taking the registry lock.
	if job == nil {
		return fmt.Errorf("job cannot be nil")
	}
	if job.ID == "" {
		return fmt.Errorf("job.ID cannot be empty")
	}
	if job.CronExpr == "" {
		return fmt.Errorf("job.CronExpr cannot be empty")
	}
	if job.Task == nil {
		return fmt.Errorf("job.Task cannot be nil")
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	if _, exists := s.jobs[job.ID]; exists {
		return fmt.Errorf("job with ID %s already exists", job.ID)
	}

	job.mu.Lock()
	job.Enabled = true
	job.mu.Unlock()

	// runMu serializes executions of this job. It is deliberately separate
	// from job.mu so that ToggleJob is not blocked for the duration of a
	// long-running task.
	var runMu sync.Mutex

	wrapper := func() {
		runMu.Lock()
		defer runMu.Unlock()

		job.mu.Lock()
		if !job.Enabled {
			job.mu.Unlock()
			return
		}
		task := job.Task
		job.LastRun = time.Now()
		job.mu.Unlock()

		log.Printf("starting job=%s", job.ID)

		// Run the task with panic protection: the cron engine does not
		// recover panics on its own, so convert them into an ordinary error.
		runErr := func() (err error) {
			defer func() {
				if r := recover(); r != nil {
					err = fmt.Errorf("job %s panicked: %v", job.ID, r)
				}
			}()
			return task()
		}()

		job.mu.Lock()
		job.LastError = runErr
		job.RunCount++
		runs := job.RunCount
		job.mu.Unlock()

		if runErr != nil {
			log.Printf("job=%s failed: %v", job.ID, runErr)
			return
		}
		log.Printf("job=%s done (run_count=%d)", job.ID, runs)
	}

	entryID, err := s.cron.AddFunc(job.CronExpr, wrapper)
	if err != nil {
		return fmt.Errorf("failed to schedule job %s: %w", job.ID, err)
	}

	s.jobs[job.ID] = job
	log.Printf("registered job=%s cron=%q entry_id=%d", job.ID, job.CronExpr, entryID)
	return nil
}

// ToggleJob sets the Enabled flag of the job registered under jobID. The job
// remains scheduled either way; the flag is consulted when a run fires.
//
// It returns an error when no job with that ID has been registered.
func (s *JobScheduler) ToggleJob(jobID string, enabled bool) error {
	// Hold the registry lock only for the lookup, not for the update.
	s.mu.RLock()
	target, found := s.jobs[jobID]
	s.mu.RUnlock()

	if !found {
		return fmt.Errorf("job %s not found", jobID)
	}

	target.mu.Lock()
	target.Enabled = enabled
	target.mu.Unlock()

	return nil
}

// Start launches the cron engine and then blocks the calling goroutine until
// the scheduler's context is cancelled, which Shutdown does.
func (s *JobScheduler) Start() {
	log.Println("starting job scheduler...")

	stopped := s.ctx.Done()
	s.cron.Start()

	// Park here until Shutdown cancels the context.
	<-stopped
}

// Shutdown cancels the scheduler context (releasing Start), stops the cron
// engine, and waits for in-flight jobs to finish. It gives up waiting after
// timeout and returns either way; the outcome is reported in the log.
func (s *JobScheduler) Shutdown(timeout time.Duration) {
	log.Println("initiating scheduler shutdown...")
	s.cancel()

	deadline, release := context.WithTimeout(context.Background(), timeout)
	defer release()

	// Stop prevents further runs; the returned context is done once the jobs
	// that were already running have returned.
	drained := s.cron.Stop().Done()

	outcome := "scheduler shutdown completed"
	select {
	case <-drained:
	case <-deadline.Done():
		outcome = "scheduler shutdown timeout; exiting"
	}
	log.Println(outcome)
}

// main wires up a UTC scheduler with two demo jobs (a daily backup and an
// hourly log cleanup), runs it in the background, and shuts it down with a
// 30-second grace period when SIGINT or SIGTERM arrives.
func main() {
	sched, err := NewJobScheduler("UTC")
	if err != nil {
		log.Fatalf("create scheduler: %v", err)
	}

	// Simulated work for the two demo jobs.
	backupTask := func() error {
		log.Println("Performing database backup...")
		time.Sleep(2 * time.Second)
		return nil
	}
	cleanupTask := func() error {
		log.Println("Cleaning up old log files...")
		time.Sleep(1 * time.Second)
		return nil
	}

	// Every day at 03:00.
	backup := &Job{ID: "daily-db-backup", CronExpr: "0 3 * * *", Task: backupTask}
	// At minute 0 of every hour.
	cleanup := &Job{ID: "hourly-log-cleanup", CronExpr: "0 * * * *", Task: cleanupTask}

	if regErr := sched.RegisterJob(backup); regErr != nil {
		log.Fatalf("register backup job: %v", regErr)
	}
	if regErr := sched.RegisterJob(cleanup); regErr != nil {
		log.Fatalf("register cleanup job: %v", regErr)
	}

	// Start blocks until shutdown, so it gets its own goroutine.
	go sched.Start()

	// Buffered so a signal delivered before the receive is not dropped.
	interrupts := make(chan os.Signal, 1)
	signal.Notify(interrupts, os.Interrupt, syscall.SIGTERM)
	<-interrupts

	sched.Shutdown(30 * time.Second)
}

How It Works

In-process scheduler that registers cron-style jobs, tracks runs, and supports graceful shutdown.

Delegates cron-expression parsing and timing to the robfig/cron library, keeps registered jobs in a mutex-guarded map, records each job's last run time, last error, and run count, and unblocks Start through context cancellation when Shutdown is called.

Key Concepts

  • 1. Cron parser enables flexible schedules beyond fixed intervals.
  • 2. Thread-safe registry allows dynamic job registration and enabling or disabling of jobs.
  • 3. Per-job run state (last run time, last error, run count) captures errors for observability.

When to Use This Pattern

  • Lightweight scheduled tasks like cleanups or notifications.
  • Local cron replacement inside containerized applications.
  • Running periodic health checks or data syncs without external schedulers.

Best Practices

  • Guard job execution with panic recovery to keep the scheduler alive.
  • Give jobs context and timeouts to avoid overlapping long runs.
  • Persist job state if you need durability across restarts.
Go Version: 1.16
Difficulty: intermediate
Production Ready: Yes
Lines of Code: 184