initial commit
This commit is contained in:
181
internal/tracker/tracker.go
Normal file
181
internal/tracker/tracker.go
Normal file
@@ -0,0 +1,181 @@
|
||||
package tracker
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
const (
|
||||
StatusPending = "pending"
|
||||
StatusInProgress = "in_progress"
|
||||
StatusCompleted = "completed"
|
||||
StatusFailed = "failed"
|
||||
)
|
||||
|
||||
type MigrationJob struct {
|
||||
ID string `json:"id"`
|
||||
SourceName string `json:"source_name"`
|
||||
DestName string `json:"dest_name"`
|
||||
Status string `json:"status"`
|
||||
StartTime time.Time `json:"start_time"`
|
||||
EndTime time.Time `json:"end_time,omitempty"`
|
||||
TotalFiles int64 `json:"total_files"`
|
||||
CompletedFiles int64 `json:"completed_files"`
|
||||
ErrorMessage string `json:"error_message,omitempty"`
|
||||
}
|
||||
|
||||
type FileEntry struct {
|
||||
Path string `json:"path"`
|
||||
Size int64 `json:"size"`
|
||||
Status string `json:"status"`
|
||||
TransferTime time.Time `json:"transfer_time,omitempty"`
|
||||
ErrorMessage string `json:"error_message,omitempty"`
|
||||
}
|
||||
|
||||
type Tracker struct {
|
||||
db *bolt.DB
|
||||
}
|
||||
|
||||
var (
|
||||
jobsBucket = []byte("jobs")
|
||||
filesBucket = []byte("files")
|
||||
)
|
||||
|
||||
// NewTracker creates a new tracker instance
|
||||
func NewTracker(dbPath string) (*Tracker, error) {
|
||||
db, err := bolt.Open(dbPath, 0600, &bolt.Options{Timeout: 1 * time.Second})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open database: %w", err)
|
||||
}
|
||||
|
||||
// Create buckets if they don't exist
|
||||
err = db.Update(func(tx *bolt.Tx) error {
|
||||
if _, err := tx.CreateBucketIfNotExists(jobsBucket); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := tx.CreateBucketIfNotExists(filesBucket); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("failed to create buckets: %w", err)
|
||||
}
|
||||
|
||||
return &Tracker{db: db}, nil
|
||||
}
|
||||
|
||||
// Close closes the database connection
|
||||
func (t *Tracker) Close() error {
|
||||
return t.db.Close()
|
||||
}
|
||||
|
||||
// CreateJob creates a new migration job
|
||||
func (t *Tracker) CreateJob(job *MigrationJob) error {
|
||||
return t.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(jobsBucket)
|
||||
data, err := json.Marshal(job)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return b.Put([]byte(job.ID), data)
|
||||
})
|
||||
}
|
||||
|
||||
// UpdateJob updates an existing migration job
|
||||
func (t *Tracker) UpdateJob(job *MigrationJob) error {
|
||||
return t.CreateJob(job) // Same as create for now
|
||||
}
|
||||
|
||||
// GetJob retrieves a migration job by ID
|
||||
func (t *Tracker) GetJob(jobID string) (*MigrationJob, error) {
|
||||
var job MigrationJob
|
||||
err := t.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(jobsBucket)
|
||||
data := b.Get([]byte(jobID))
|
||||
if data == nil {
|
||||
return fmt.Errorf("job not found")
|
||||
}
|
||||
return json.Unmarshal(data, &job)
|
||||
})
|
||||
return &job, err
|
||||
}
|
||||
|
||||
// MarkFileCompleted marks a file as completed
|
||||
func (t *Tracker) MarkFileCompleted(jobID, filePath string, size int64) error {
|
||||
file := &FileEntry{
|
||||
Path: filePath,
|
||||
Size: size,
|
||||
Status: StatusCompleted,
|
||||
TransferTime: time.Now(),
|
||||
}
|
||||
|
||||
return t.db.Update(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(filesBucket)
|
||||
key := fmt.Sprintf("%s:%s", jobID, filePath)
|
||||
data, err := json.Marshal(file)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return b.Put([]byte(key), data)
|
||||
})
|
||||
}
|
||||
|
||||
// IsFileCompleted checks if a file has been completed
|
||||
func (t *Tracker) IsFileCompleted(jobID, filePath string) bool {
|
||||
var completed bool
|
||||
t.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(filesBucket)
|
||||
key := fmt.Sprintf("%s:%s", jobID, filePath)
|
||||
data := b.Get([]byte(key))
|
||||
if data != nil {
|
||||
var file FileEntry
|
||||
if err := json.Unmarshal(data, &file); err == nil {
|
||||
completed = file.Status == StatusCompleted
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return completed
|
||||
}
|
||||
|
||||
// GetCompletedFiles returns the count of completed files for a job
|
||||
func (t *Tracker) GetCompletedFiles(jobID string) int64 {
|
||||
var count int64
|
||||
t.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(filesBucket)
|
||||
c := b.Cursor()
|
||||
prefix := []byte(jobID + ":")
|
||||
|
||||
for k, v := c.Seek(prefix); k != nil && len(k) > len(prefix) && string(k[:len(prefix)]) == string(prefix); k, v = c.Next() {
|
||||
var file FileEntry
|
||||
if err := json.Unmarshal(v, &file); err == nil && file.Status == StatusCompleted {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return count
|
||||
}
|
||||
|
||||
// LogProgress logs the current progress
|
||||
func (t *Tracker) LogProgress(jobID string) {
|
||||
job, err := t.GetJob(jobID)
|
||||
if err != nil {
|
||||
logrus.Errorf("Failed to get job: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
completed := t.GetCompletedFiles(jobID)
|
||||
if job.TotalFiles > 0 {
|
||||
percentage := float64(completed) / float64(job.TotalFiles) * 100
|
||||
logrus.Infof("Progress: %d/%d files (%.2f%%)", completed, job.TotalFiles, percentage)
|
||||
} else {
|
||||
logrus.Infof("Progress: %d files completed", completed)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user