101 lines
1.9 KiB
Go
101 lines
1.9 KiB
Go
package tasks
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
)
|
|
|
|
// Simple in-memory task registry placeholder. Replace with durable store if needed.
|
|
// Task is the record the registry keeps for one unit of work.
type Task struct {
	// ID is the key under which the task is stored in the registry.
	ID string
	// Status is a free-form lifecycle label. The registry itself writes
	// "queued", "running", "completed" and "error".
	Status string
	// Result holds the value most recently passed to Registry.Update.
	Result interface{}
	// Err holds the error text recorded by Registry.Fail.
	Err string
}
|
|
|
|
type Registry struct {
|
|
mu sync.Mutex
|
|
tasks map[string]Task
|
|
workChan chan workItem
|
|
}
|
|
|
|
type workItem struct {
|
|
taskID string
|
|
fn WorkFunc
|
|
}
|
|
|
|
// WorkFunc is the signature of the function that does a task's actual work.
// It receives the context handed to the worker and returns either a result
// value or an error.
type WorkFunc func(ctx context.Context) (result interface{}, err error)
|
|
|
|
func NewRegistry() *Registry {
|
|
return &Registry{
|
|
tasks: make(map[string]Task),
|
|
workChan: make(chan workItem, 64),
|
|
}
|
|
}
|
|
|
|
func (r *Registry) Add(t Task) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
r.tasks[t.ID] = t
|
|
}
|
|
|
|
// EnqueueWork registers a task and queues it for asynchronous processing.
|
|
func (r *Registry) EnqueueWork(id string, fn WorkFunc) {
|
|
r.Add(Task{ID: id, Status: "queued"})
|
|
r.workChan <- workItem{taskID: id, fn: fn}
|
|
}
|
|
|
|
func (r *Registry) Update(id string, status string, result interface{}) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if t, ok := r.tasks[id]; ok {
|
|
t.Status = status
|
|
t.Result = result
|
|
r.tasks[id] = t
|
|
}
|
|
}
|
|
|
|
func (r *Registry) Fail(id string, err error) {
|
|
if err == nil {
|
|
err = errors.New("unknown error")
|
|
}
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if t, ok := r.tasks[id]; ok {
|
|
t.Status = "error"
|
|
t.Err = err.Error()
|
|
r.tasks[id] = t
|
|
}
|
|
}
|
|
|
|
func (r *Registry) Get(id string) (Task, bool) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
t, ok := r.tasks[id]
|
|
return t, ok
|
|
}
|
|
|
|
// StartWorker processes queued work sequentially. Caller should run this in a goroutine.
|
|
func (r *Registry) StartWorker(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case item := <-r.workChan:
|
|
r.runItem(ctx, item)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Registry) runItem(ctx context.Context, item workItem) {
|
|
r.Update(item.taskID, "running", nil)
|
|
res, err := item.fn(ctx)
|
|
if err != nil {
|
|
r.Fail(item.taskID, err)
|
|
return
|
|
}
|
|
r.Update(item.taskID, "completed", res)
|
|
}
|