diff --git a/plugin/gthulhu/gthulhu.go b/plugin/gthulhu/gthulhu.go index 20ed043..78eaee9 100644 --- a/plugin/gthulhu/gthulhu.go +++ b/plugin/gthulhu/gthulhu.go @@ -3,6 +3,7 @@ package gthulhu import ( "context" "log" + "sync" "time" "github.com/Gthulhu/plugin/models" @@ -54,14 +55,14 @@ type GthulhuPlugin struct { // Task pool state taskPool []Task taskPoolCount int - taskPoolHead int - taskPoolTail int + poolMu sync.Mutex // Global vruntime minVruntime uint64 // Strategy map for PID-based scheduling strategies strategyMap map[int32]SchedulingStrategy + strategyMu sync.RWMutex // JWT client for API authentication jwtClient *JWTClient @@ -76,8 +77,6 @@ func NewGthulhuPlugin(sliceNsDefault, sliceNsMin uint64) *GthulhuPlugin { sliceNsMin: 500 * 1000, // 0.5ms (default) taskPool: make([]Task, taskPoolSize), taskPoolCount: 0, - taskPoolHead: 0, - taskPoolTail: 0, minVruntime: 0, strategyMap: make(map[int32]SchedulingStrategy), } @@ -132,21 +131,32 @@ func (g *GthulhuPlugin) GetPoolCount() uint64 { // drainQueuedTask drains tasks from the scheduler queue into the task pool func (g *GthulhuPlugin) drainQueuedTask(s reg.Sched) int { var count int - for (g.taskPoolTail+1)%taskPoolSize != g.taskPoolHead { + // Hold the lock across capacity check and insertion to avoid TOCTOU race + for { + g.poolMu.Lock() + if g.taskPoolCount >= taskPoolSize-1 { + g.poolMu.Unlock() + break + } var newQueuedTask models.QueuedTask s.DequeueTask(&newQueuedTask) if newQueuedTask.Pid == -1 { + g.poolMu.Unlock() return count } - t := Task{ QueuedTask: &newQueuedTask, Deadline: g.updatedEnqueueTask(&newQueuedTask), + Timestamp: newQueuedTask.StartTs, } - g.insertTaskToPool(t) + // Direct heap insert (no second lock acquisition) + g.taskPool[g.taskPoolCount] = t + g.heapSiftUp(g.taskPoolCount) + g.taskPoolCount++ + g.poolMu.Unlock() count++ } - return 0 + return count } // updatedEnqueueTask updates the task's vtime based on scheduling strategy @@ -181,39 +191,74 @@ func saturatingSub(a, b uint64) uint64 { // getTaskFromPool retrieves a task from the pool func (g *GthulhuPlugin) getTaskFromPool() *models.QueuedTask { - if g.taskPoolHead == g.taskPoolTail { + // Pop-min from binary heap stored in g.taskPool[0:g.taskPoolCount] + g.poolMu.Lock() + defer g.poolMu.Unlock() + if g.taskPoolCount == 0 { return nil } - t := &g.taskPool[g.taskPoolHead] - g.taskPoolHead = (g.taskPoolHead + 1) % taskPoolSize + // Take the root element + top := g.taskPool[0] g.taskPoolCount-- - return t.QueuedTask + if g.taskPoolCount > 0 { + // Move last element to root and sift down + g.taskPool[0] = g.taskPool[g.taskPoolCount] + g.heapSiftDown(0) + } + return top.QueuedTask } // insertTaskToPool inserts a task into the pool in sorted order func (g *GthulhuPlugin) insertTaskToPool(newTask Task) bool { + // In-place binary min-heap using preallocated array + g.poolMu.Lock() + defer g.poolMu.Unlock() if g.taskPoolCount >= taskPoolSize-1 { return false } - insertIdx := g.taskPoolTail - for i := 0; i < g.taskPoolCount; i++ { - idx := (g.taskPoolHead + i) % taskPoolSize - if lessQueuedTask(&newTask, &g.taskPool[idx]) { - insertIdx = idx + // Place at the end and sift up + g.taskPool[g.taskPoolCount] = newTask + g.heapSiftUp(g.taskPoolCount) + g.taskPoolCount++ + return true +} + +// heapLess compares elements at indices i and j in the heap according to lessQueuedTask +func (g *GthulhuPlugin) heapLess(i, j int) bool { + return lessQueuedTask(&g.taskPool[i], &g.taskPool[j]) +} + +// heapSiftUp moves the element at idx up to restore heap property +func (g *GthulhuPlugin) heapSiftUp(idx int) { + for idx > 0 { + parent := (idx - 1) / 2 + if !g.heapLess(idx, parent) { break } + g.taskPool[idx], g.taskPool[parent] = g.taskPool[parent], g.taskPool[idx] + idx = parent } +} - cur := g.taskPoolTail - for cur != insertIdx { - next := (cur - 1 + taskPoolSize) % taskPoolSize - g.taskPool[cur] = g.taskPool[next] - cur = next +// heapSiftDown moves the element at idx down to restore heap property +func (g *GthulhuPlugin) heapSiftDown(idx int) { + n := g.taskPoolCount + for { + left := 2*idx + 1 + if left >= n { + break + } + smallest := left + right := left + 1 + if right < n && g.heapLess(right, left) { + smallest = right + } + if !g.heapLess(smallest, idx) { + break + } + g.taskPool[idx], g.taskPool[smallest] = g.taskPool[smallest], g.taskPool[idx] + idx = smallest } - g.taskPool[insertIdx] = newTask - g.taskPoolTail = (g.taskPoolTail + 1) % taskPoolSize - g.taskPoolCount++ - return true } // lessQueuedTask compares two tasks for priority ordering @@ -229,7 +274,10 @@ func lessQueuedTask(a, b *Task) bool { // applySchedulingStrategy applies scheduling strategies to a task func (g *GthulhuPlugin) applySchedulingStrategy(task *models.QueuedTask) bool { - if strategy, exists := g.strategyMap[task.Tgid]; exists { + g.strategyMu.RLock() + strategy, exists := g.strategyMap[task.Tgid] + g.strategyMu.RUnlock() + if exists { // Apply strategy if strategy.Priority { // Priority tasks get minimum vtime @@ -242,7 +290,10 @@ func (g *GthulhuPlugin) applySchedulingStrategy(task *models.QueuedTask) bool { // getTaskExecutionTime returns the custom execution time for a task if defined func (g *GthulhuPlugin) getTaskExecutionTime(pid int32) uint64 { - if strategy, exists := g.strategyMap[pid]; exists && strategy.ExecutionTime > 0 { + g.strategyMu.RLock() + strategy, exists := g.strategyMap[pid] + g.strategyMu.RUnlock() + if exists && strategy.ExecutionTime > 0 { return strategy.ExecutionTime } return 0 @@ -306,5 +357,7 @@ func (g *GthulhuPlugin) UpdateStrategyMap(strategies []SchedulingStrategy) { } // Replace the old map with the new one + g.strategyMu.Lock() g.strategyMap = newMap + g.strategyMu.Unlock() } diff --git a/plugin/gthulhu/gthulhu_test.go b/plugin/gthulhu/gthulhu_test.go index 47af86c..671a863 100644 --- a/plugin/gthulhu/gthulhu_test.go +++ b/plugin/gthulhu/gthulhu_test.go @@ -492,3 +492,50 @@ func TestGthulhuPluginRuntimeSimulation(t *testing.T) { } }) } + +// TestMinHeapOrderByDeadline verifies that the task pool (min-heap) always pops +// the task with the smallest Deadline first, using Timestamp and Pid as +// tie-breakers per lessQueuedTask. +func TestMinHeapOrderByDeadline(t *testing.T) { + g := NewGthulhuPlugin(0, 0) + + // Build tasks with explicit deadlines (representing vtime) and timestamps + type input struct { + pid int32 + deadline uint64 + timestamp uint64 + } + inputs := []input{ + {pid: 101, deadline: 30, timestamp: 300}, + {pid: 102, deadline: 10, timestamp: 200}, + {pid: 103, deadline: 20, timestamp: 100}, + {pid: 104, deadline: 10, timestamp: 150}, + {pid: 105, deadline: 5, timestamp: 250}, + } + + for _, in := range inputs { + qt := &models.QueuedTask{Pid: in.pid, StartTs: in.timestamp} + task := Task{QueuedTask: qt, Deadline: in.deadline, Timestamp: in.timestamp} + ok := g.insertTaskToPool(task) + if !ok { + t.Fatalf("failed to insert task pid=%d", in.pid) + } + } + + // Expected pop order by (deadline, timestamp, pid) + expected := []int32{105, 104, 102, 103, 101} + got := make([]int32, 0, len(expected)) + for range expected { + qt := g.getTaskFromPool() + if qt == nil { + t.Fatalf("expected a task, got nil") + } + got = append(got, qt.Pid) + } + + for i := range expected { + if got[i] != expected[i] { + t.Fatalf("pop order mismatch at %d: got %v, want %v", i, got, expected) + } + } +}