diff --git a/backend/README.md b/backend/README.md index f5e4a0df..8c58173d 100644 --- a/backend/README.md +++ b/backend/README.md @@ -23,6 +23,11 @@ # Else if using npm FRONTEND_ORIGIN_DEV="http://localhost:5173" CONTAINER_ORIGIN="http://localhost:8080/" + + # Job Queue Configuration (Optional) + CLEANUP_CRON_SCHEDULE="0 0 * * *" + CLEANUP_RETENTION_DAYS="7" + QUEUE_DB_PATH="/app/data/queue.db" ``` Common pitfall: use the value @@ -57,6 +62,28 @@ If you are running the backend via Docker, the exposed ports are determined by the compose configuration. To use a different port in a Docker environment, you must manually update the docker-compose.yml file to adjust the container’s port mapping. Also, if you change `CCSYNC_PORT`, remember to update `CONTAINER_ORIGIN` accordingly. +## Persistent Job Queue + +The backend includes a persistent job queue that records each job's name and state in an on-disk database, so the job log survives server restarts, and that automatically cleans up old job logs. + +### Features + +- **Persistence**: Job records are stored in a bbolt database and survive backend restarts +- **Automatic Cleanup**: Old completed and failed job logs are automatically cleaned up +- **Configurable**: Cleanup schedule and retention period can be customized + +### Configuration + +The job queue system uses the following environment variables: + +- `CLEANUP_CRON_SCHEDULE`: Cron schedule for cleanup job (default: "0 0 * * *" - daily at midnight) +- `CLEANUP_RETENTION_DAYS`: Number of days to keep job logs (default: 7) +- `QUEUE_DB_PATH`: Path to the queue database file (default: "/app/data/queue.db") + +### Database Location + +By default, the queue database is stored at `/app/data/queue.db` inside the container; this path is mounted to `./backend/data/queue.db` on the host system via a Docker volume. 
+
 - Run the application:
 
   ```bash
diff --git a/backend/controllers/controllers_test.go b/backend/controllers/controllers_test.go
index 619016f2..1cfd7dc2 100644
--- a/backend/controllers/controllers_test.go
+++ b/backend/controllers/controllers_test.go
@@ -17,6 +17,7 @@ import (
 )
 
 func setup() *App {
+	os.Setenv("GO_ENV", "test")
 	godotenv.Load("../.env")
 
 	clientID := os.Getenv("CLIENT_ID")
diff --git a/backend/controllers/job_queue.go b/backend/controllers/job_queue.go
index 30b2fd64..caf4abcd 100644
--- a/backend/controllers/job_queue.go
+++ b/backend/controllers/job_queue.go
@@ -1,7 +1,13 @@
 package controllers
 
 import (
+	"os"
 	"sync"
+	"time"
+
+	"ccsync_backend/utils"
+
+	"github.com/google/uuid"
 )
 
 type Job struct {
@@ -10,20 +16,72 @@ type Job struct {
 }
 
+// JobQueue hands jobs to the processJobs goroutine over a buffered channel
+// and, when a persistent queue is available, records each job's state in it.
 type JobQueue struct {
-	jobChannel chan Job
-	wg         sync.WaitGroup
+	jobChannel      chan Job
+	wg              sync.WaitGroup
+	persistentQueue utils.PersistentJobQueue // nil when persistence is disabled or unavailable
 }
 
+// NewJobQueue creates the queue and starts its worker goroutine. Unless
+// GO_ENV is "test" it opens the persistent store at QUEUE_DB_PATH (default
+// "/app/data/queue.db") and reconciles records left by a previous run; if the
+// store cannot be opened the error is logged and the queue runs without it.
 func NewJobQueue() *JobQueue {
+	dbPath := os.Getenv("QUEUE_DB_PATH")
+	if dbPath == "" {
+		dbPath = "/app/data/queue.db"
+	}
+
+	var persistentQueue utils.PersistentJobQueue
+	if os.Getenv("GO_ENV") != "test" {
+		pq, err := utils.NewBoltJobQueue(dbPath)
+		if err != nil {
+			utils.Logger.Errorf("Failed to initialize persistent queue: %v", err)
+		} else {
+			// Assign only on success so the interface stays a true nil on failure.
+			persistentQueue = pq
+		}
+	}
+
 	queue := &JobQueue{
-		jobChannel: make(chan Job, 100),
+		jobChannel:      make(chan Job, 100),
+		persistentQueue: persistentQueue,
+	}
+
+	if persistentQueue != nil {
+		queue.restorePendingJobs()
 	}
+
 	go queue.processJobs()
 	return queue
 }
 
+// AddJob records the job as pending in the persistent queue (when enabled)
+// and sends it on the job channel, blocking while the channel is full.
+// A persistence error is logged and does not stop the job from being queued.
 func (q *JobQueue) AddJob(job Job) {
 	q.wg.Add(1)
+
+	if q.persistentQueue != nil {
+		now := time.Now()
+		persistentJob := &utils.PersistentJob{
+			ID:        uuid.New().String(),
+			Name:      job.Name,
+			State:     utils.JobStatePending,
+			CreatedAt: now,
+			UpdatedAt: now,
+		}
+
+		if err := q.persistentQueue.AddJob(persistentJob); err != nil {
+			utils.Logger.Errorf("Failed to persist job: %v", err)
+		} else {
+			// Without this wrapper nothing ever moves the record out of
+			// "pending", so it would be picked up again on every restart.
+			job.Execute = q.trackExecution(persistentJob.ID, job.Execute)
+		}
+	}
+
 	q.jobChannel <- job
 
 	// notify job queued
@@ -56,3 +114,63 @@ func (q *JobQueue) processJobs() {
 		q.wg.Done()
 	}
 }
+
+// trackExecution wraps execute so that the persisted record for id moves
+// from pending through inprogress to completed or failed as the job runs.
+// State-update failures are logged and never replace the job's own result.
+// NOTE(review): assumes processJobs (outside this change) runs a job by
+// calling job.Execute — confirm.
+func (q *JobQueue) trackExecution(id string, execute func() error) func() error {
+	return func() error {
+		if err := q.persistentQueue.UpdateJobState(id, utils.JobStateInProgress, ""); err != nil {
+			utils.Logger.Errorf("Failed to mark job %s as in progress: %v", id, err)
+		}
+
+		execErr := execute()
+
+		state, errorMsg := utils.JobStateCompleted, ""
+		if execErr != nil {
+			state, errorMsg = utils.JobStateFailed, execErr.Error()
+		}
+		if err := q.persistentQueue.UpdateJobState(id, state, errorMsg); err != nil {
+			utils.Logger.Errorf("Failed to mark job %s as %s: %v", id, state, err)
+		}
+		return execErr
+	}
+}
+
+// restorePendingJobs reconciles records that were still pending when the
+// previous process stopped. AddJob persists only a job's name, not its
+// Execute function, so the work cannot be resumed. Each record is therefore
+// moved to the failed state rather than re-queued as a no-op, which would
+// report success for work that never ran and store a fresh pending copy of
+// the job on every restart.
+func (q *JobQueue) restorePendingJobs() {
+	if q.persistentQueue == nil {
+		return
+	}
+
+	pendingJobs, err := q.persistentQueue.GetPendingJobs()
+	if err != nil {
+		utils.Logger.Errorf("Failed to restore pending jobs: %v", err)
+		return
+	}
+
+	utils.Logger.Infof("Found %d pending jobs from a previous run", len(pendingJobs))
+	for _, persistentJob := range pendingJobs {
+		// The store only allows pending -> inprogress -> failed.
+		if err := q.persistentQueue.UpdateJobState(persistentJob.ID, utils.JobStateInProgress, ""); err != nil {
+			utils.Logger.Errorf("Failed to reconcile job %s: %v", persistentJob.ID, err)
+			continue
+		}
+		if err := q.persistentQueue.UpdateJobState(persistentJob.ID, utils.JobStateFailed, "job was still pending when the server restarted"); err != nil {
+			utils.Logger.Errorf("Failed to reconcile job %s: %v", persistentJob.ID, err)
+		}
+	}
+}
+
+// GetPersistentQueue returns the backing persistent queue, or nil when
+// persistence is disabled or could not be initialized.
+func (q *JobQueue) GetPersistentQueue() utils.PersistentJobQueue {
+	return q.persistentQueue
+}
diff --git a/backend/go.mod b/backend/go.mod
index b7c46d67..7394bbe9 100644
--- a/backend/go.mod
+++ b/backend/go.mod
@@ -1,6 +1,6 @@
 module ccsync_backend
 
-go 1.19
+go 1.23
 
 require (
 	github.com/charmbracelet/log v0.4.2
@@ -33,8 +33,10 @@ require (
 	github.com/muesli/termenv v0.16.0 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/rivo/uniseg v0.4.7 // indirect
+	github.com/robfig/cron/v3 v3.0.1
 	github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe // indirect
 	github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
+	go.etcd.io/bbolt v1.4.3
 	golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
 	golang.org/x/net v0.17.0 // indirect
 	golang.org/x/sys v0.30.0 // indirect
diff --git a/backend/go.sum b/backend/go.sum
index 4c5bb360..f6d5de08 100644
--- a/backend/go.sum
+++ b/backend/go.sum
@@ -70,6 +70,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
 github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
 github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
 github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
+github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
+github.com/robfig/cron/v3 v3.0.1/go.mod
h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
@@ -83,6 +85,8 @@ github.com/swaggo/swag v1.16.3 h1:PnCYjPCah8FK4I26l2F/KQ4yz3sILcVUN3cTlBFA9Pg=
 github.com/swaggo/swag v1.16.3/go.mod h1:DImHIuOFXKpMFAQjcC7FG4m3Dg4+QuUgUzJmKjI/gRk=
 github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no=
 github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM=
+go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo=
+go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E=
 golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI=
 golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo=
 golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY=
diff --git a/backend/main.go b/backend/main.go
index b98b2436..8793857a 100644
--- a/backend/main.go
+++ b/backend/main.go
@@ -54,6 +54,12 @@ func main() {
 	}
 
 	controllers.GlobalJobQueue = controllers.NewJobQueue()
+	if controllers.GlobalJobQueue != nil && controllers.GlobalJobQueue.GetPersistentQueue() != nil {
+		maintenanceWorker := utils.NewMaintenanceWorker(controllers.GlobalJobQueue.GetPersistentQueue())
+		if err := maintenanceWorker.Start(); err != nil {
+			utils.Logger.Errorf("Failed to start maintenance worker: %v", err)
+		}
+	}
 	// OAuth2 client credentials
 	clientID := os.Getenv("CLIENT_ID")
 	clientSecret := os.Getenv("CLIENT_SEC")
diff --git a/backend/utils/maintenance_worker.go b/backend/utils/maintenance_worker.go
new file mode 100644
index 00000000..c431a0e5
--- /dev/null
+++ b/backend/utils/maintenance_worker.go
@@ -0,0 +1,82 @@
+package utils
+
+import (
+	"os"
+	"strconv"
+
+	"github.com/robfig/cron/v3"
+)
+
+// Defaults applied when the corresponding environment variable is unset or invalid.
+const (
+	defaultCleanupSchedule = "0 0 * * *"
+	defaultRetentionDays   = 7
+)
+
+// MaintenanceWorker runs CleanupOldJobs on a PersistentJobQueue according to
+// a cron schedule.
+type MaintenanceWorker struct {
+	cron  *cron.Cron
+	queue PersistentJobQueue
+}
+
+// NewMaintenanceWorker returns a worker for queue. Nothing is scheduled until
+// Start is called.
+func NewMaintenanceWorker(queue PersistentJobQueue) *MaintenanceWorker {
+	return &MaintenanceWorker{
+		cron:  cron.New(),
+		queue: queue,
+	}
+}
+
+// retentionDaysFromEnv reads CLEANUP_RETENTION_DAYS. A value that is not an
+// integer, or is negative, is reported and replaced by the default: a
+// negative retention would put the cutoff in the future and wipe every
+// finished job on the next run.
+func retentionDaysFromEnv() int {
+	raw := os.Getenv("CLEANUP_RETENTION_DAYS")
+	if raw == "" {
+		return defaultRetentionDays
+	}
+
+	days, err := strconv.Atoi(raw)
+	if err != nil || days < 0 {
+		Logger.Errorf("Invalid CLEANUP_RETENTION_DAYS %q, using default of %d days", raw, defaultRetentionDays)
+		return defaultRetentionDays
+	}
+	return days
+}
+
+// Start registers the cleanup task and starts the scheduler.
+//
+// The schedule comes from CLEANUP_CRON_SCHEDULE (default "0 0 * * *") and the
+// retention from CLEANUP_RETENTION_DAYS (default 7). It returns an error when
+// the schedule cannot be parsed, in which case nothing is started.
+func (mw *MaintenanceWorker) Start() error {
+	schedule := os.Getenv("CLEANUP_CRON_SCHEDULE")
+	if schedule == "" {
+		schedule = defaultCleanupSchedule
+	}
+	retentionDays := retentionDaysFromEnv()
+
+	_, err := mw.cron.AddFunc(schedule, func() {
+		Logger.Infof("Starting job cleanup, retention: %d days", retentionDays)
+		if err := mw.queue.CleanupOldJobs(retentionDays); err != nil {
+			Logger.Errorf("Failed to cleanup old jobs: %v", err)
+			return
+		}
+		Logger.Infof("Job cleanup completed successfully")
+	})
+	if err != nil {
+		return err
+	}
+
+	mw.cron.Start()
+	Logger.Infof("Maintenance worker started with schedule: %s", schedule)
+	return nil
+}
+
+// Stop stops the scheduler; a cleanup that is already running is not interrupted.
+func (mw *MaintenanceWorker) Stop() {
+	mw.cron.Stop()
+}
diff --git a/backend/utils/persistent_queue.go b/backend/utils/persistent_queue.go
new file mode 100644
index 00000000..c62ec77d
--- /dev/null
+++ b/backend/utils/persistent_queue.go
@@ -0,0 +1,255 @@
+package utils
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+	"path/filepath"
+	"time"
+
+	"go.etcd.io/bbolt"
+)
+
+// JobState is the lifecycle stage of a persisted job. Each state has its own
+// bucket in the database.
+type JobState string
+
+// Job states. UpdateJobState allows pending -> inprogress -> completed or failed.
+const (
+	JobStatePending    JobState = "pending"
+	JobStateInProgress JobState = "inprogress"
+	JobStateCompleted  JobState = "completed"
+	JobStateFailed     JobState = "failed"
+)
+
+// PersistentJob is the stored record of a job, serialized as JSON and keyed by ID.
+type PersistentJob struct {
+	ID        string    `json:"id"`
+	Name      string    `json:"name"`
+	State     JobState  `json:"state"`
+	CreatedAt time.Time `json:"created_at"`
+	UpdatedAt time.Time `json:"updated_at"` // refreshed on every state change; cleanup compares against it
+	Error     string    `json:"error,omitempty"`
+	Data      []byte    `json:"data,omitempty"`
+}
+
+// ToJSON serializes the job for storage.
+func (j *PersistentJob) ToJSON() ([]byte, error) {
+	return json.Marshal(j)
+}
+
+// JobFromJSON decodes a stored job. On a decoding error the returned job is
+// nil, so callers never see a half-populated record.
+func JobFromJSON(data []byte) (*PersistentJob, error) {
+	var job PersistentJob
+	if err := json.Unmarshal(data, &job); err != nil {
+		return nil, fmt.Errorf("failed to decode job: %w", err)
+	}
+	return &job, nil
+}
+
+// PersistentJobQueue stores job records so that they outlive the process.
+type PersistentJobQueue interface {
+	// AddJob stores a new job as pending.
+	AddJob(job *PersistentJob) error
+	// GetPendingJobs returns every job that is still pending.
+	GetPendingJobs() ([]*PersistentJob, error)
+	// UpdateJobState moves a job to state, recording errorMsg when non-empty.
+	UpdateJobState(id string, state JobState, errorMsg string) error
+	// CleanupOldJobs deletes completed and failed jobs that were last updated
+	// more than retentionDays days ago.
+	CleanupOldJobs(retentionDays int) error
+	// Close releases the underlying storage.
+	Close() error
+}
+
+// BoltJobQueue is a PersistentJobQueue backed by a bbolt database file.
+type BoltJobQueue struct {
+	db *bbolt.DB
+}
+
+// Compile-time check that BoltJobQueue satisfies the interface.
+var _ PersistentJobQueue = (*BoltJobQueue)(nil)
+
+// One bucket per job state; a job is stored in exactly one of them.
+var (
+	pendingBucket    = []byte("pending")
+	inprogressBucket = []byte("inprogress")
+	completedBucket  = []byte("completed")
+	failedBucket     = []byte("failed")
+)
+
+// NewBoltJobQueue opens the database at dbPath, creating the parent directory,
+// the file and the state buckets as needed. Opening gives up after one second
+// if another process holds the file lock.
+func NewBoltJobQueue(dbPath string) (*BoltJobQueue, error) {
+	if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil {
+		return nil, fmt.Errorf("failed to create directory: %w", err)
+	}
+
+	db, err := bbolt.Open(dbPath, 0600, &bbolt.Options{Timeout: 1 * time.Second})
+	if err != nil {
+		return nil, fmt.Errorf("failed to open database: %w", err)
+	}
+
+	queue := &BoltJobQueue{db: db}
+	if err := queue.initBuckets(); err != nil {
+		db.Close()
+		return nil, err
+	}
+
+	return queue, nil
+}
+
+// initBuckets creates every state bucket that does not exist yet.
+func (q *BoltJobQueue) initBuckets() error {
+	return q.db.Update(func(tx *bbolt.Tx) error {
+		for _, bucket := range [][]byte{pendingBucket, inprogressBucket, completedBucket, failedBucket} {
+			if _, err := tx.CreateBucketIfNotExists(bucket); err != nil {
+				return fmt.Errorf("failed to create bucket %s: %w", bucket, err)
+			}
+		}
+		return nil
+	})
+}
+
+// AddJob stores job in the pending bucket under job.ID.
+func (q *BoltJobQueue) AddJob(job *PersistentJob) error {
+	if job == nil {
+		return fmt.Errorf("job is nil")
+	}
+
+	// Serialize before opening the transaction so the write lock is held
+	// only for the Put itself.
+	data, err := job.ToJSON()
+	if err != nil {
+		return fmt.Errorf("failed to serialize job: %w", err)
+	}
+
+	return q.db.Update(func(tx *bbolt.Tx) error {
+		return tx.Bucket(pendingBucket).Put([]byte(job.ID), data)
+	})
+}
+
+// GetPendingJobs returns all jobs in the pending bucket. It returns a nil
+// slice and an error if any stored record cannot be decoded.
+func (q *BoltJobQueue) GetPendingJobs() ([]*PersistentJob, error) {
+	var jobs []*PersistentJob
+
+	err := q.db.View(func(tx *bbolt.Tx) error {
+		return tx.Bucket(pendingBucket).ForEach(func(_, v []byte) error {
+			job, err := JobFromJSON(v)
+			if err != nil {
+				return err
+			}
+			jobs = append(jobs, job)
+			return nil
+		})
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return jobs, nil
+}
+
+// UpdateJobState moves job id into the bucket for state, refreshing UpdatedAt
+// and, when errorMsg is non-empty, Error. The only transitions are
+// pending -> inprogress, inprogress -> completed and inprogress -> failed;
+// any other target state, or a job missing from the expected source bucket,
+// is an error. The move is performed in a single transaction.
+func (q *BoltJobQueue) UpdateJobState(id string, state JobState, errorMsg string) error {
+	return q.db.Update(func(tx *bbolt.Tx) error {
+		var fromBucket, toBucket *bbolt.Bucket
+
+		switch state {
+		case JobStateInProgress:
+			fromBucket = tx.Bucket(pendingBucket)
+			toBucket = tx.Bucket(inprogressBucket)
+		case JobStateCompleted:
+			fromBucket = tx.Bucket(inprogressBucket)
+			toBucket = tx.Bucket(completedBucket)
+		case JobStateFailed:
+			fromBucket = tx.Bucket(inprogressBucket)
+			toBucket = tx.Bucket(failedBucket)
+		default:
+			return fmt.Errorf("invalid state transition to %s", state)
+		}
+
+		key := []byte(id)
+		jobData := fromBucket.Get(key)
+		if jobData == nil {
+			return fmt.Errorf("job %s not found", id)
+		}
+
+		job, err := JobFromJSON(jobData)
+		if err != nil {
+			return err
+		}
+
+		job.State = state
+		job.UpdatedAt = time.Now()
+		if errorMsg != "" {
+			job.Error = errorMsg
+		}
+
+		updatedData, err := job.ToJSON()
+		if err != nil {
+			return err
+		}
+
+		if err := toBucket.Put(key, updatedData); err != nil {
+			return err
+		}
+
+		return fromBucket.Delete(key)
+	})
+}
+
+// CleanupOldJobs deletes completed and failed jobs whose UpdatedAt is more
+// than retentionDays days in the past. Records that cannot be decoded are
+// logged and left in place. A negative retentionDays is rejected: it would
+// put the cutoff in the future and delete every finished job.
+func (q *BoltJobQueue) CleanupOldJobs(retentionDays int) error {
+	if retentionDays < 0 {
+		return fmt.Errorf("retention days must not be negative, got %d", retentionDays)
+	}
+	cutoffTime := time.Now().AddDate(0, 0, -retentionDays)
+
+	return q.db.Update(func(tx *bbolt.Tx) error {
+		for _, name := range [][]byte{completedBucket, failedBucket} {
+			bucket := tx.Bucket(name)
+
+			// Collect keys first; the bucket must not be modified while
+			// ForEach is iterating over it.
+			var keysToDelete [][]byte
+			err := bucket.ForEach(func(k, v []byte) error {
+				job, err := JobFromJSON(v)
+				if err != nil {
+					Logger.Errorf("Skipping undecodable job %q in bucket %s: %v", k, name, err)
+					return nil
+				}
+				if job.UpdatedAt.Before(cutoffTime) {
+					keysToDelete = append(keysToDelete, append([]byte(nil), k...))
+				}
+				return nil
+			})
+			if err != nil {
+				return err
+			}
+
+			for _, key := range keysToDelete {
+				if err := bucket.Delete(key); err != nil {
+					return fmt.Errorf("failed to delete job %s: %w", key, err)
+				}
+			}
+		}
+
+		return nil
+	})
+}
+
+// Close closes the underlying database file.
+func (q *BoltJobQueue) Close() error {
+	return q.db.Close()
+}
diff --git a/backend/utils/persistent_queue_test.go b/backend/utils/persistent_queue_test.go
new file mode 100644
index 00000000..01895a8f
--- /dev/null
+++ b/backend/utils/persistent_queue_test.go
@@ -0,0 +1,170 @@
+package utils
+
+import (
+	"path/filepath"
+	"testing"
+	"time"
+
+	"go.etcd.io/bbolt"
+)
+
+// newTestQueue opens a queue in a per-test temporary directory and closes it
+// when the test ends.
+func newTestQueue(t *testing.T) *BoltJobQueue {
+	t.Helper()
+
+	queue, err := NewBoltJobQueue(filepath.Join(t.TempDir(), "test_queue.db"))
+	if err != nil {
+		t.Fatalf("Failed to create queue: %v", err)
+	}
+	t.Cleanup(func() { queue.Close() })
+	return queue
+}
+
+// putJob writes job straight into bucket, bypassing UpdateJobState so the
+// test controls UpdatedAt.
+func putJob(t *testing.T, queue *BoltJobQueue, bucket []byte, job *PersistentJob) {
+	t.Helper()
+
+	data, err := job.ToJSON()
+	if err != nil {
+		t.Fatalf("Failed to serialize job %s: %v", job.ID, err)
+	}
+	err = queue.db.Update(func(tx *bbolt.Tx) error {
+		return tx.Bucket(bucket).Put([]byte(job.ID), data)
+	})
+	if err != nil {
+		t.Fatalf("Failed to store job %s: %v", job.ID, err)
+	}
+}
+
+// hasJob reports whether bucket contains a job with the given id.
+func hasJob(t *testing.T, queue *BoltJobQueue, bucket []byte, id string) bool {
+	t.Helper()
+
+	var found bool
+	err := queue.db.View(func(tx *bbolt.Tx) error {
+		found = tx.Bucket(bucket).Get([]byte(id)) != nil
+		return nil
+	})
+	if err != nil {
+		t.Fatalf("Failed to look up job %s: %v", id, err)
+	}
+	return found
+}
+
+func TestBoltJobQueue(t *testing.T) {
+	queue := newTestQueue(t)
+
+	job := &PersistentJob{
+		ID:        "test-job-1",
+		Name:      "Test Job",
+		State:     JobStatePending,
+		CreatedAt: time.Now(),
+		UpdatedAt: time.Now(),
+	}
+
+	if err := queue.AddJob(job); err != nil {
+		t.Fatalf("Failed to add job: %v", err)
+	}
+
+	jobs, err := queue.GetPendingJobs()
+	if err != nil {
+		t.Fatalf("Failed to get pending jobs: %v", err)
+	}
+	if len(jobs) != 1 {
+		t.Fatalf("Expected 1 job, got %d", len(jobs))
+	}
+	if jobs[0].ID != "test-job-1" {
+		t.Fatalf("Expected job ID 'test-job-1', got '%s'", jobs[0].ID)
+	}
+
+	if err := queue.UpdateJobState("test-job-1", JobStateInProgress, ""); err != nil {
+		t.Fatalf("Failed to update job state to in-progress: %v", err)
+	}
+	if err := queue.UpdateJobState("test-job-1", JobStateCompleted, ""); err != nil {
+		t.Fatalf("Failed to update job state to completed: %v", err)
+	}
+
+	jobs, err = queue.GetPendingJobs()
+	if err != nil {
+		t.Fatalf("Failed to get pending jobs after update: %v", err)
+	}
+	if len(jobs) != 0 {
+		t.Fatalf("Expected 0 pending jobs after completion, got %d", len(jobs))
+	}
+	if !hasJob(t, queue, completedBucket, "test-job-1") {
+		t.Fatalf("Expected job to be in the completed bucket")
+	}
+
+	// A job that already left the pending bucket cannot be started again.
+	if err := queue.UpdateJobState("test-job-1", JobStateInProgress, ""); err == nil {
+		t.Fatalf("Expected an error when restarting a completed job")
+	}
+}
+
+func TestJobFromJSONInvalid(t *testing.T) {
+	job, err := JobFromJSON([]byte("not json"))
+	if err == nil {
+		t.Fatalf("Expected an error for invalid JSON")
+	}
+	if job != nil {
+		t.Fatalf("Expected a nil job on error, got %+v", job)
+	}
+}
+
+func TestJobCleanup(t *testing.T) {
+	queue := newTestQueue(t)
+
+	// UpdateJobState stamps UpdatedAt with the current time, so a job driven
+	// through the normal transitions is never old enough to be cleaned up.
+	// Old records are therefore written directly into their final buckets.
+	old := time.Now().AddDate(0, 0, -10)
+	putJob(t, queue, completedBucket, &PersistentJob{
+		ID: "old-completed", Name: "Old Job", State: JobStateCompleted, CreatedAt: old, UpdatedAt: old,
+	})
+	putJob(t, queue, failedBucket, &PersistentJob{
+		ID: "old-failed", Name: "Old Failed Job", State: JobStateFailed, CreatedAt: old, UpdatedAt: old,
+	})
+	putJob(t, queue, pendingBucket, &PersistentJob{
+		ID: "old-pending", Name: "Old Pending Job", State: JobStatePending, CreatedAt: old, UpdatedAt: old,
+	})
+
+	newJob := &PersistentJob{
+		ID:        "new-job",
+		Name:      "New Job",
+		State:     JobStatePending,
+		CreatedAt: time.Now(),
+		UpdatedAt: time.Now(),
+	}
+	if err := queue.AddJob(newJob); err != nil {
+		t.Fatalf("Failed to add new job: %v", err)
+	}
+	if err := queue.UpdateJobState("new-job", JobStateInProgress, ""); err != nil {
+		t.Fatalf("Failed to start new job: %v", err)
+	}
+	if err := queue.UpdateJobState("new-job", JobStateCompleted, ""); err != nil {
+		t.Fatalf("Failed to complete new job: %v", err)
+	}
+
+	if err := queue.CleanupOldJobs(7); err != nil {
+		t.Fatalf("Failed to cleanup old jobs: %v", err)
+	}
+
+	if hasJob(t, queue, completedBucket, "old-completed") {
+		t.Errorf("Expected old completed job to be removed")
+	}
+	if hasJob(t, queue, failedBucket, "old-failed") {
+		t.Errorf("Expected old failed job to be removed")
+	}
+	if !hasJob(t, queue, pendingBucket, "old-pending") {
+		t.Errorf("Expected pending job to be left alone by cleanup")
+	}
+	if !hasJob(t, queue, completedBucket, "new-job") {
+		t.Errorf("Expected recent completed job to be kept")
+	}
+
+	if err := queue.CleanupOldJobs(-1); err == nil {
+		t.Errorf("Expected an error for a negative retention period")
+	}
+}