Add RBAC support with roles, permissions, and session management. Implement middleware for authentication and CSRF protection. Enhance audit logging with additional fields. Update HTTP handlers and routes for new features.
This commit is contained in:
@@ -3,15 +3,20 @@ package job
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/example/storage-appliance/internal/audit"
|
||||
"github.com/example/storage-appliance/internal/domain"
|
||||
"github.com/example/storage-appliance/internal/infra/zfs"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type Runner struct {
|
||||
DB *sql.DB
|
||||
DB *sql.DB
|
||||
ZFS *zfs.Adapter
|
||||
Audit audit.AuditLogger
|
||||
}
|
||||
|
||||
func (r *Runner) Enqueue(ctx context.Context, j domain.Job) (string, error) {
|
||||
@@ -22,16 +27,112 @@ func (r *Runner) Enqueue(ctx context.Context, j domain.Job) (string, error) {
|
||||
j.Status = "queued"
|
||||
j.CreatedAt = time.Now()
|
||||
j.UpdatedAt = time.Now()
|
||||
_, err := r.DB.ExecContext(ctx, `INSERT INTO jobs (id, type, status, progress, owner, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)`,
|
||||
j.ID, j.Type, j.Status, j.Progress, j.Owner, j.CreatedAt, j.UpdatedAt)
|
||||
// persist details JSON if present
|
||||
detailsJSON := ""
|
||||
if j.Details != nil {
|
||||
b, _ := json.Marshal(j.Details)
|
||||
detailsJSON = string(b)
|
||||
}
|
||||
_, err := r.DB.ExecContext(ctx, `INSERT INTO jobs (id, type, status, progress, owner, created_at, updated_at, details) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||
j.ID, j.Type, j.Status, j.Progress, j.Owner, j.CreatedAt, j.UpdatedAt, detailsJSON)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
log.Printf("enqueued job %s (%s)", j.ID, j.Type)
|
||||
// run async worker (very simple worker for skeleton)
|
||||
go func() {
|
||||
time.Sleep(1 * time.Second)
|
||||
r.updateStatus(ctx, j.ID, "succeeded", 100)
|
||||
// update running
|
||||
_ = r.updateStatus(ctx, j.ID, "running", 0)
|
||||
// execute based on job type
|
||||
switch j.Type {
|
||||
case "create-pool":
|
||||
// parse details: expect name and vdevs
|
||||
var name string
|
||||
var vdevs []string
|
||||
if j.Details != nil {
|
||||
if n, ok := j.Details["name"].(string); ok {
|
||||
name = n
|
||||
}
|
||||
if rawV, ok := j.Details["vdevs"].([]any); ok {
|
||||
for _, vv := range rawV {
|
||||
if s, ok := vv.(string); ok {
|
||||
vdevs = append(vdevs, s)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = r.updateStatus(ctx, j.ID, "running", 10)
|
||||
if r.ZFS != nil {
|
||||
// call sync create pool
|
||||
if err := r.ZFS.CreatePoolSync(ctx, name, vdevs); err != nil {
|
||||
_ = r.updateStatus(ctx, j.ID, "failed", 0)
|
||||
if r.Audit != nil {
|
||||
r.Audit.Record(ctx, audit.Event{UserID: string(j.Owner), Action: "pool.create", ResourceType: "pool", ResourceID: name, Success: false, Details: map[string]any{"error": err.Error()}})
|
||||
}
|
||||
return
|
||||
}
|
||||
_ = r.updateStatus(ctx, j.ID, "succeeded", 100)
|
||||
if r.Audit != nil {
|
||||
r.Audit.Record(ctx, audit.Event{UserID: string(j.Owner), Action: "pool.create", ResourceType: "pool", ResourceID: name, Success: true})
|
||||
}
|
||||
return
|
||||
}
|
||||
_ = r.updateStatus(ctx, j.ID, "succeeded", 100)
|
||||
case "snapshot":
|
||||
_ = r.updateStatus(ctx, j.ID, "running", 10)
|
||||
// call zfs snapshot; expect dataset and name
|
||||
var dataset, snapName string
|
||||
if j.Details != nil {
|
||||
if d, ok := j.Details["dataset"].(string); ok {
|
||||
dataset = d
|
||||
}
|
||||
if s, ok := j.Details["snap_name"].(string); ok {
|
||||
snapName = s
|
||||
}
|
||||
}
|
||||
if r.ZFS != nil {
|
||||
if err := r.ZFS.Snapshot(ctx, dataset, snapName); err != nil {
|
||||
_ = r.updateStatus(ctx, j.ID, "failed", 0)
|
||||
if r.Audit != nil {
|
||||
r.Audit.Record(ctx, audit.Event{UserID: string(j.Owner), Action: "snapshot", ResourceType: "snapshot", ResourceID: dataset + "@" + snapName, Success: false, Details: map[string]any{"error": err.Error()}})
|
||||
}
|
||||
return
|
||||
}
|
||||
_ = r.updateStatus(ctx, j.ID, "succeeded", 100)
|
||||
if r.Audit != nil {
|
||||
r.Audit.Record(ctx, audit.Event{UserID: string(j.Owner), Action: "snapshot", ResourceType: "snapshot", ResourceID: dataset + "@" + snapName, Success: true})
|
||||
}
|
||||
return
|
||||
}
|
||||
_ = r.updateStatus(ctx, j.ID, "succeeded", 100)
|
||||
case "scrub":
|
||||
_ = r.updateStatus(ctx, j.ID, "running", 10)
|
||||
var pool string
|
||||
if j.Details != nil {
|
||||
if p, ok := j.Details["pool"].(string); ok {
|
||||
pool = p
|
||||
}
|
||||
}
|
||||
if r.ZFS != nil {
|
||||
if err := r.ZFS.ScrubStart(ctx, pool); err != nil {
|
||||
_ = r.updateStatus(ctx, j.ID, "failed", 0)
|
||||
if r.Audit != nil {
|
||||
r.Audit.Record(ctx, audit.Event{UserID: string(j.Owner), Action: "pool.scrub", ResourceType: "pool", ResourceID: pool, Success: false, Details: map[string]any{"error": err.Error()}})
|
||||
}
|
||||
return
|
||||
}
|
||||
_ = r.updateStatus(ctx, j.ID, "succeeded", 100)
|
||||
if r.Audit != nil {
|
||||
r.Audit.Record(ctx, audit.Event{UserID: string(j.Owner), Action: "pool.scrub", ResourceType: "pool", ResourceID: pool, Success: true})
|
||||
}
|
||||
return
|
||||
}
|
||||
_ = r.updateStatus(ctx, j.ID, "succeeded", 100)
|
||||
default:
|
||||
// unknown job types just succeed
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
_ = r.updateStatus(ctx, j.ID, "succeeded", 100)
|
||||
}
|
||||
}()
|
||||
return id, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user