Files
atlas/internal/job/manager.go
othman.suseno ed96137bad
Some checks failed
CI / test-build (push) Failing after 1m0s
adding snapshot function
2025-12-14 23:17:26 +07:00

164 lines
3.1 KiB
Go

package job
import (
"fmt"
"sync"
"time"
"gitea.avt.data-center.id/othman.suseno/atlas/internal/models"
)
// Manager handles job lifecycle and tracking
type Manager struct {
mu sync.RWMutex
jobs map[string]*models.Job
nextID int64
}
// NewManager creates a new job manager
func NewManager() *Manager {
return &Manager{
jobs: make(map[string]*models.Job),
nextID: 1,
}
}
// Create creates a new job
func (m *Manager) Create(jobType string, metadata map[string]interface{}) *models.Job {
m.mu.Lock()
defer m.mu.Unlock()
id := fmt.Sprintf("job-%d", m.nextID)
m.nextID++
job := &models.Job{
ID: id,
Type: jobType,
Status: models.JobStatusPending,
Progress: 0,
Message: "Job created",
Metadata: metadata,
CreatedAt: time.Now(),
}
m.jobs[id] = job
return job
}
// Get returns a job by ID
func (m *Manager) Get(id string) (*models.Job, error) {
m.mu.RLock()
defer m.mu.RUnlock()
job, exists := m.jobs[id]
if !exists {
return nil, fmt.Errorf("job %s not found", id)
}
return job, nil
}
// List returns all jobs, optionally filtered by status
func (m *Manager) List(status models.JobStatus) []models.Job {
m.mu.RLock()
defer m.mu.RUnlock()
var jobs []models.Job
for _, job := range m.jobs {
if status == "" || job.Status == status {
jobs = append(jobs, *job)
}
}
return jobs
}
// UpdateStatus updates job status
func (m *Manager) UpdateStatus(id string, status models.JobStatus, message string) error {
m.mu.Lock()
defer m.mu.Unlock()
job, exists := m.jobs[id]
if !exists {
return fmt.Errorf("job %s not found", id)
}
job.Status = status
job.Message = message
now := time.Now()
switch status {
case models.JobStatusRunning:
if job.StartedAt == nil {
job.StartedAt = &now
}
case models.JobStatusCompleted, models.JobStatusFailed, models.JobStatusCancelled:
job.CompletedAt = &now
}
return nil
}
// UpdateProgress updates job progress (0-100)
func (m *Manager) UpdateProgress(id string, progress int, message string) error {
m.mu.Lock()
defer m.mu.Unlock()
job, exists := m.jobs[id]
if !exists {
return fmt.Errorf("job %s not found", id)
}
if progress < 0 {
progress = 0
}
if progress > 100 {
progress = 100
}
job.Progress = progress
if message != "" {
job.Message = message
}
return nil
}
// SetError sets job error and marks as failed
func (m *Manager) SetError(id string, err error) error {
m.mu.Lock()
defer m.mu.Unlock()
job, exists := m.jobs[id]
if !exists {
return fmt.Errorf("job %s not found", id)
}
job.Status = models.JobStatusFailed
job.Error = err.Error()
now := time.Now()
job.CompletedAt = &now
return nil
}
// Cancel cancels a job
func (m *Manager) Cancel(id string) error {
m.mu.Lock()
defer m.mu.Unlock()
job, exists := m.jobs[id]
if !exists {
return fmt.Errorf("job %s not found", id)
}
if job.Status == models.JobStatusCompleted || job.Status == models.JobStatusFailed {
return fmt.Errorf("cannot cancel job in status %s", job.Status)
}
job.Status = models.JobStatusCancelled
job.Message = "Job cancelled by user"
now := time.Now()
job.CompletedAt = &now
return nil
}