Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 81 additions & 28 deletions plugin/gthulhu/gthulhu.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gthulhu
import (
"context"
"log"
"sync"
"time"

"github.com/Gthulhu/plugin/models"
Expand Down Expand Up @@ -54,14 +55,14 @@ type GthulhuPlugin struct {
// Task pool state
taskPool []Task
taskPoolCount int
taskPoolHead int
taskPoolTail int
poolMu sync.Mutex

// Global vruntime
minVruntime uint64

// Strategy map for PID-based scheduling strategies
strategyMap map[int32]SchedulingStrategy
strategyMu sync.RWMutex

// JWT client for API authentication
jwtClient *JWTClient
Expand All @@ -76,8 +77,6 @@ func NewGthulhuPlugin(sliceNsDefault, sliceNsMin uint64) *GthulhuPlugin {
sliceNsMin: 500 * 1000, // 0.5ms (default)
taskPool: make([]Task, taskPoolSize),
taskPoolCount: 0,
taskPoolHead: 0,
taskPoolTail: 0,
minVruntime: 0,
strategyMap: make(map[int32]SchedulingStrategy),
}
Expand Down Expand Up @@ -132,21 +131,32 @@ func (g *GthulhuPlugin) GetPoolCount() uint64 {
// drainQueuedTask drains tasks from the scheduler queue into the task pool
func (g *GthulhuPlugin) drainQueuedTask(s reg.Sched) int {
var count int
for (g.taskPoolTail+1)%taskPoolSize != g.taskPoolHead {
// Hold the lock across capacity check and insertion to avoid TOCTOU race
for {
g.poolMu.Lock()
if g.taskPoolCount >= taskPoolSize-1 {
g.poolMu.Unlock()
break
}
var newQueuedTask models.QueuedTask
s.DequeueTask(&newQueuedTask)
if newQueuedTask.Pid == -1 {
g.poolMu.Unlock()
return count
}

t := Task{
QueuedTask: &newQueuedTask,
Deadline: g.updatedEnqueueTask(&newQueuedTask),
Timestamp: newQueuedTask.StartTs,
}
g.insertTaskToPool(t)
// Direct heap insert (no second lock acquisition)
g.taskPool[g.taskPoolCount] = t
g.heapSiftUp(g.taskPoolCount)
g.taskPoolCount++
g.poolMu.Unlock()
count++
}
return 0
return count
}

// updatedEnqueueTask updates the task's vtime based on scheduling strategy
Expand Down Expand Up @@ -181,39 +191,74 @@ func saturatingSub(a, b uint64) uint64 {

// getTaskFromPool retrieves a task from the pool
func (g *GthulhuPlugin) getTaskFromPool() *models.QueuedTask {
if g.taskPoolHead == g.taskPoolTail {
// Pop-min from binary heap stored in g.taskPool[0:g.taskPoolCount]
g.poolMu.Lock()
defer g.poolMu.Unlock()
if g.taskPoolCount == 0 {
return nil
}
t := &g.taskPool[g.taskPoolHead]
g.taskPoolHead = (g.taskPoolHead + 1) % taskPoolSize
// Take the root element
top := g.taskPool[0]
g.taskPoolCount--
return t.QueuedTask
if g.taskPoolCount > 0 {
// Move last element to root and sift down
g.taskPool[0] = g.taskPool[g.taskPoolCount]
g.heapSiftDown(0)
}
return top.QueuedTask
}

// insertTaskToPool inserts a task into the pool in sorted order
func (g *GthulhuPlugin) insertTaskToPool(newTask Task) bool {
// In-place binary min-heap using preallocated array
g.poolMu.Lock()
defer g.poolMu.Unlock()
if g.taskPoolCount >= taskPoolSize-1 {
return false
}
insertIdx := g.taskPoolTail
for i := 0; i < g.taskPoolCount; i++ {
idx := (g.taskPoolHead + i) % taskPoolSize
if lessQueuedTask(&newTask, &g.taskPool[idx]) {
insertIdx = idx
// Place at the end and sift up
g.taskPool[g.taskPoolCount] = newTask
g.heapSiftUp(g.taskPoolCount)
g.taskPoolCount++
return true
}

// heapLess compares elements at indices i and j in the heap according to lessQueuedTask
func (g *GthulhuPlugin) heapLess(i, j int) bool {
return lessQueuedTask(&g.taskPool[i], &g.taskPool[j])
}

// heapSiftUp moves the element at idx up to restore heap property
func (g *GthulhuPlugin) heapSiftUp(idx int) {
for idx > 0 {
parent := (idx - 1) / 2
if !g.heapLess(idx, parent) {
break
}
g.taskPool[idx], g.taskPool[parent] = g.taskPool[parent], g.taskPool[idx]
idx = parent
}
}

cur := g.taskPoolTail
for cur != insertIdx {
next := (cur - 1 + taskPoolSize) % taskPoolSize
g.taskPool[cur] = g.taskPool[next]
cur = next
// heapSiftDown moves the element at idx down to restore heap property
func (g *GthulhuPlugin) heapSiftDown(idx int) {
n := g.taskPoolCount
for {
left := 2*idx + 1
if left >= n {
break
}
smallest := left
right := left + 1
if right < n && g.heapLess(right, left) {
smallest = right
}
if !g.heapLess(smallest, idx) {
break
}
g.taskPool[idx], g.taskPool[smallest] = g.taskPool[smallest], g.taskPool[idx]
idx = smallest
}
g.taskPool[insertIdx] = newTask
g.taskPoolTail = (g.taskPoolTail + 1) % taskPoolSize
g.taskPoolCount++
return true
}

// lessQueuedTask compares two tasks for priority ordering
Expand All @@ -229,7 +274,10 @@ func lessQueuedTask(a, b *Task) bool {

// applySchedulingStrategy applies scheduling strategies to a task
func (g *GthulhuPlugin) applySchedulingStrategy(task *models.QueuedTask) bool {
if strategy, exists := g.strategyMap[task.Tgid]; exists {
g.strategyMu.RLock()
strategy, exists := g.strategyMap[task.Tgid]
g.strategyMu.RUnlock()
if exists {
// Apply strategy
if strategy.Priority {
// Priority tasks get minimum vtime
Expand All @@ -242,7 +290,10 @@ func (g *GthulhuPlugin) applySchedulingStrategy(task *models.QueuedTask) bool {

// getTaskExecutionTime returns the custom execution time for a task if defined
func (g *GthulhuPlugin) getTaskExecutionTime(pid int32) uint64 {
if strategy, exists := g.strategyMap[pid]; exists && strategy.ExecutionTime > 0 {
g.strategyMu.RLock()
strategy, exists := g.strategyMap[pid]
g.strategyMu.RUnlock()
if exists && strategy.ExecutionTime > 0 {
return strategy.ExecutionTime
}
return 0
Expand Down Expand Up @@ -306,5 +357,7 @@ func (g *GthulhuPlugin) UpdateStrategyMap(strategies []SchedulingStrategy) {
}

// Replace the old map with the new one
g.strategyMu.Lock()
g.strategyMap = newMap
g.strategyMu.Unlock()
}
47 changes: 47 additions & 0 deletions plugin/gthulhu/gthulhu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,3 +492,50 @@ func TestGthulhuPluginRuntimeSimulation(t *testing.T) {
}
})
}

// TestMinHeapOrderByDeadline verifies that the task pool (min-heap) always pops
// the task with the smallest Deadline first, using Timestamp and Pid as
// tie-breakers per lessQueuedTask.
func TestMinHeapOrderByDeadline(t *testing.T) {
g := NewGthulhuPlugin(0, 0)

// Build tasks with explicit deadlines (representing vtime) and timestamps
type input struct {
pid int32
deadline uint64
timestamp uint64
}
inputs := []input{
{pid: 101, deadline: 30, timestamp: 300},
{pid: 102, deadline: 10, timestamp: 200},
{pid: 103, deadline: 20, timestamp: 100},
{pid: 104, deadline: 10, timestamp: 150},
{pid: 105, deadline: 5, timestamp: 250},
}

for _, in := range inputs {
qt := &models.QueuedTask{Pid: in.pid, StartTs: in.timestamp}
task := Task{QueuedTask: qt, Deadline: in.deadline, Timestamp: in.timestamp}
ok := g.insertTaskToPool(task)
if !ok {
t.Fatalf("failed to insert task pid=%d", in.pid)
}
}

// Expected pop order by (deadline, timestamp, pid)
expected := []int32{105, 104, 102, 103, 101}
got := make([]int32, 0, len(expected))
for range expected {
qt := g.getTaskFromPool()
if qt == nil {
t.Fatalf("expected a task, got nil")
}
got = append(got, qt.Pid)
}

for i := range expected {
if got[i] != expected[i] {
t.Fatalf("pop order mismatch at %d: got %v, want %v", i, got, expected)
}
}
}
Loading