Files
storage-appliance/internal/job/runner.go

144 lines
4.6 KiB
Go

package job
import (
"context"
"database/sql"
"encoding/json"
"log"
"time"
"github.com/example/storage-appliance/internal/audit"
"github.com/example/storage-appliance/internal/domain"
"github.com/example/storage-appliance/internal/infra/zfs"
"github.com/google/uuid"
)
type Runner struct {
DB *sql.DB
ZFS *zfs.Adapter
Audit audit.AuditLogger
}
func (r *Runner) Enqueue(ctx context.Context, j domain.Job) (string, error) {
id := uuid.New().String()
if j.ID == "" {
j.ID = domain.UUID(id)
}
j.Status = "queued"
j.CreatedAt = time.Now()
j.UpdatedAt = time.Now()
// persist details JSON if present
detailsJSON := ""
if j.Details != nil {
b, _ := json.Marshal(j.Details)
detailsJSON = string(b)
}
_, err := r.DB.ExecContext(ctx, `INSERT INTO jobs (id, type, status, progress, owner, created_at, updated_at, details) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
j.ID, j.Type, j.Status, j.Progress, j.Owner, j.CreatedAt, j.UpdatedAt, detailsJSON)
if err != nil {
return "", err
}
log.Printf("enqueued job %s (%s)", j.ID, j.Type)
// run async worker (very simple worker for skeleton)
go func() {
// update running
_ = r.updateStatus(ctx, j.ID, "running", 0)
// execute based on job type
switch j.Type {
case "create-pool":
// parse details: expect name and vdevs
var name string
var vdevs []string
if j.Details != nil {
if n, ok := j.Details["name"].(string); ok {
name = n
}
if rawV, ok := j.Details["vdevs"].([]any); ok {
for _, vv := range rawV {
if s, ok := vv.(string); ok {
vdevs = append(vdevs, s)
}
}
}
}
_ = r.updateStatus(ctx, j.ID, "running", 10)
if r.ZFS != nil {
// call sync create pool
if err := r.ZFS.CreatePoolSync(ctx, name, vdevs); err != nil {
_ = r.updateStatus(ctx, j.ID, "failed", 0)
if r.Audit != nil {
r.Audit.Record(ctx, audit.Event{UserID: string(j.Owner), Action: "pool.create", ResourceType: "pool", ResourceID: name, Success: false, Details: map[string]any{"error": err.Error()}})
}
return
}
_ = r.updateStatus(ctx, j.ID, "succeeded", 100)
if r.Audit != nil {
r.Audit.Record(ctx, audit.Event{UserID: string(j.Owner), Action: "pool.create", ResourceType: "pool", ResourceID: name, Success: true})
}
return
}
_ = r.updateStatus(ctx, j.ID, "succeeded", 100)
case "snapshot":
_ = r.updateStatus(ctx, j.ID, "running", 10)
// call zfs snapshot; expect dataset and name
var dataset, snapName string
if j.Details != nil {
if d, ok := j.Details["dataset"].(string); ok {
dataset = d
}
if s, ok := j.Details["snap_name"].(string); ok {
snapName = s
}
}
if r.ZFS != nil {
if err := r.ZFS.Snapshot(ctx, dataset, snapName); err != nil {
_ = r.updateStatus(ctx, j.ID, "failed", 0)
if r.Audit != nil {
r.Audit.Record(ctx, audit.Event{UserID: string(j.Owner), Action: "snapshot", ResourceType: "snapshot", ResourceID: dataset + "@" + snapName, Success: false, Details: map[string]any{"error": err.Error()}})
}
return
}
_ = r.updateStatus(ctx, j.ID, "succeeded", 100)
if r.Audit != nil {
r.Audit.Record(ctx, audit.Event{UserID: string(j.Owner), Action: "snapshot", ResourceType: "snapshot", ResourceID: dataset + "@" + snapName, Success: true})
}
return
}
_ = r.updateStatus(ctx, j.ID, "succeeded", 100)
case "scrub":
_ = r.updateStatus(ctx, j.ID, "running", 10)
var pool string
if j.Details != nil {
if p, ok := j.Details["pool"].(string); ok {
pool = p
}
}
if r.ZFS != nil {
if err := r.ZFS.ScrubStart(ctx, pool); err != nil {
_ = r.updateStatus(ctx, j.ID, "failed", 0)
if r.Audit != nil {
r.Audit.Record(ctx, audit.Event{UserID: string(j.Owner), Action: "pool.scrub", ResourceType: "pool", ResourceID: pool, Success: false, Details: map[string]any{"error": err.Error()}})
}
return
}
_ = r.updateStatus(ctx, j.ID, "succeeded", 100)
if r.Audit != nil {
r.Audit.Record(ctx, audit.Event{UserID: string(j.Owner), Action: "pool.scrub", ResourceType: "pool", ResourceID: pool, Success: true})
}
return
}
_ = r.updateStatus(ctx, j.ID, "succeeded", 100)
default:
// unknown job types just succeed
time.Sleep(500 * time.Millisecond)
_ = r.updateStatus(ctx, j.ID, "succeeded", 100)
}
}()
return id, nil
}
func (r *Runner) updateStatus(ctx context.Context, id domain.UUID, status string, progress int) error {
_, err := r.DB.ExecContext(ctx, `UPDATE jobs SET status = ?, progress = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?`, status, progress, id)
return err
}