// Package tracker persists migration job records and per-file transfer
// status in a bbolt database.
package tracker

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/sirupsen/logrus"
	bolt "go.etcd.io/bbolt"
)

// Status values stored in MigrationJob.Status and FileEntry.Status.
const (
	StatusPending    = "pending"
	StatusInProgress = "in_progress"
	StatusCompleted  = "completed"
	StatusFailed     = "failed"
)

// ErrJobNotFound is returned by GetJob when no job is stored under the
// requested ID. Compare with errors.Is.
var ErrJobNotFound = errors.New("job not found")

// MigrationJob is the JSON-encoded record stored in the jobs bucket, keyed by ID.
//
// NOTE: encoding/json never treats a struct as empty, so the omitempty option
// on EndTime does not suppress a zero time; the tag is kept unchanged so the
// stored format stays the same.
type MigrationJob struct {
	ID             string    `json:"id"`
	SourceName     string    `json:"source_name"`
	DestName       string    `json:"dest_name"`
	Status         string    `json:"status"`
	StartTime      time.Time `json:"start_time"`
	EndTime        time.Time `json:"end_time,omitempty"`
	TotalFiles     int64     `json:"total_files"`
	CompletedFiles int64     `json:"completed_files"`
	ErrorMessage   string    `json:"error_message,omitempty"`
}

// FileEntry is the JSON-encoded record stored in the files bucket for one
// file of one job.
type FileEntry struct {
	Path         string    `json:"path"`
	Size         int64     `json:"size"`
	Status       string    `json:"status"`
	TransferTime time.Time `json:"transfer_time,omitempty"`
	ErrorMessage string    `json:"error_message,omitempty"`
}

// Tracker wraps the bbolt database that holds job and file records.
type Tracker struct {
	db *bolt.DB
}

// Bucket names used inside the database.
var (
	jobsBucket  = []byte("jobs")
	filesBucket = []byte("files")
)

// fileKey builds the files-bucket key for a file of a job: "<jobID>:<filePath>".
// With an empty filePath it yields the key prefix shared by all files of the job.
//
// NOTE(review): the separator is not escaped, so a jobID that itself contains
// ':' can share a prefix with another job's keys. Changing the key format
// would orphan existing records, so it is left as is.
func fileKey(jobID, filePath string) []byte {
	return []byte(jobID + ":" + filePath)
}

// NewTracker opens (with file mode 0600 and a one-second open timeout) the
// database at dbPath and makes sure the jobs and files buckets exist.
// The caller must call Close on the returned Tracker when done.
func NewTracker(dbPath string) (*Tracker, error) {
	db, err := bolt.Open(dbPath, 0600, &bolt.Options{Timeout: 1 * time.Second})
	if err != nil {
		return nil, fmt.Errorf("failed to open database: %w", err)
	}

	err = db.Update(func(tx *bolt.Tx) error {
		for _, name := range [][]byte{jobsBucket, filesBucket} {
			if _, err := tx.CreateBucketIfNotExists(name); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		// The bucket error is the one worth reporting; a close failure on
		// this path is deliberately ignored.
		_ = db.Close()
		return nil, fmt.Errorf("failed to create buckets: %w", err)
	}

	return &Tracker{db: db}, nil
}

// Close closes the underlying database.
func (t *Tracker) Close() error {
	return t.db.Close()
}

// CreateJob stores job in the jobs bucket under job.ID, overwriting any
// record already stored under that ID. It returns an error if job is nil,
// cannot be encoded, or cannot be written.
func (t *Tracker) CreateJob(job *MigrationJob) error {
	if job == nil {
		return errors.New("job is nil")
	}

	// Encode before opening the write transaction so it is held only for the Put.
	data, err := json.Marshal(job)
	if err != nil {
		return err
	}

	return t.db.Update(func(tx *bolt.Tx) error {
		return tx.Bucket(jobsBucket).Put([]byte(job.ID), data)
	})
}

// UpdateJob stores job under job.ID. It currently delegates to CreateJob and
// therefore does not check that the job already exists.
func (t *Tracker) UpdateJob(job *MigrationJob) error {
	return t.CreateJob(job)
}

// GetJob returns the job stored under jobID. It returns ErrJobNotFound when
// no such record exists; on any error the returned job is nil.
func (t *Tracker) GetJob(jobID string) (*MigrationJob, error) {
	var job MigrationJob

	err := t.db.View(func(tx *bolt.Tx) error {
		data := tx.Bucket(jobsBucket).Get([]byte(jobID))
		if data == nil {
			return ErrJobNotFound
		}
		if err := json.Unmarshal(data, &job); err != nil {
			return fmt.Errorf("decoding job %q: %w", jobID, err)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	return &job, nil
}

// MarkFileCompleted records filePath of job jobID as completed, with the
// given size and the current time as its transfer time. An existing entry for
// the same job and path is overwritten.
func (t *Tracker) MarkFileCompleted(jobID, filePath string, size int64) error {
	entry := FileEntry{
		Path:         filePath,
		Size:         size,
		Status:       StatusCompleted,
		TransferTime: time.Now(),
	}

	// Encode before opening the write transaction so it is held only for the Put.
	data, err := json.Marshal(&entry)
	if err != nil {
		return err
	}

	key := fileKey(jobID, filePath)
	return t.db.Update(func(tx *bolt.Tx) error {
		return tx.Bucket(filesBucket).Put(key, data)
	})
}

// IsFileCompleted reports whether filePath of job jobID has a stored entry
// whose status is StatusCompleted. A missing entry yields false. Because the
// signature has no error result, a database or decode failure is logged and
// also yields false.
func (t *Tracker) IsFileCompleted(jobID, filePath string) bool {
	var completed bool

	err := t.db.View(func(tx *bolt.Tx) error {
		data := tx.Bucket(filesBucket).Get(fileKey(jobID, filePath))
		if data == nil {
			return nil
		}

		var entry FileEntry
		if err := json.Unmarshal(data, &entry); err != nil {
			return err
		}
		completed = entry.Status == StatusCompleted
		return nil
	})
	if err != nil {
		logrus.Warnf("Failed to check file status for job %q, file %q: %v", jobID, filePath, err)
	}

	return completed
}

// GetCompletedFiles returns how many file entries of job jobID have status
// StatusCompleted. Entries that cannot be decoded are skipped. A database
// failure is logged and the count gathered so far is returned.
func (t *Tracker) GetCompletedFiles(jobID string) int64 {
	var count int64
	prefix := fileKey(jobID, "")

	err := t.db.View(func(tx *bolt.Tx) error {
		c := tx.Bucket(filesBucket).Cursor()

		// Keys are sorted, so every key of this job is contiguous starting at
		// the first key >= prefix. HasPrefix also accepts a key equal to the
		// prefix (empty file path) instead of ending the scan on it.
		for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() {
			var entry FileEntry
			if err := json.Unmarshal(v, &entry); err != nil {
				continue
			}
			if entry.Status == StatusCompleted {
				count++
			}
		}
		return nil
	})
	if err != nil {
		logrus.Warnf("Failed to count completed files for job %q: %v", jobID, err)
	}

	return count
}

// LogProgress logs how many files of job jobID are completed. When the job's
// TotalFiles is positive the total and a percentage are included. If the job
// cannot be loaded an error is logged instead.
func (t *Tracker) LogProgress(jobID string) {
	job, err := t.GetJob(jobID)
	if err != nil {
		logrus.Errorf("Failed to get job: %v", err)
		return
	}

	completed := t.GetCompletedFiles(jobID)
	if job.TotalFiles <= 0 {
		// No usable total, so a percentage cannot be computed.
		logrus.Infof("Progress: %d files completed", completed)
		return
	}

	percentage := float64(completed) / float64(job.TotalFiles) * 100
	logrus.Infof("Progress: %d/%d files (%.2f%%)", completed, job.TotalFiles, percentage)
}