Add back in test code, get queue working correctly

This commit is contained in:
teknomunk 2024-04-13 21:26:25 +00:00
parent 55d521bc50
commit 3f82320411
4 changed files with 191 additions and 67 deletions

View File

@ -6,6 +6,7 @@ local mod = vl_scheduler
dofile(modpath.."/queue.lua")
dofile(modpath.."/fifo.lua")
dofile(modpath.."/test.lua")
local run_queues = {}
for i = 1,4 do
@ -14,32 +15,61 @@ end
local time = 0
local priority_queue = mod.queue:new()
local functions = {}
local function_id_from_name = {}
local table_unpack = table.unpack
local minetest_get_us_time = minetest.get_us_time
local queue_add_task = mod.queue.add_task
local queue_get = mod.queue.get
local queue_tick = mod.queue.tick
local fifo_insert = mod.fifo.insert
local fifo_get = mod.fifo.get
function mod.add_task(time, name)
local fid = function_id_from_name[name]
local task = {
time = time,
fid = fid,
}
queue_add_task(priority_queue, task)
end
function mod.register_function(name, func)
local fid = #functions + 1
functions[fid] = {
func = func,
name = name,
fid = fid,
}
function_id_from_name = name
print("Registering "..name.." as #"..tostring(fid))
end
minetest.register_globalstep(function(dtime)
local start_time = minetest.get_us_time()
local start_time = minetest_get_us_time()
time = time + dtime
-- Add tasks to the run queues
local iter = priority_queue:tick()
local iter = queue_tick(priority_queue)
while iter do
local task = iter
iter = iter.next
local priority = task.priority or 3
run_queues[priority]:insert(task)
fifo_insert(run_queues[priority], task)
end
-- Run tasks until we run out of timeslice
local i = 1
while i < 4 and (minetest.get_us_time() - start_time) < 50000 do
local task = run_queues[i]:get()
while i < 4 and (minetest_get_us_time() - start_time) < 50000 do
local task = fifo_get(run_queues[i])
if task then
print("Running task "..dump(task))
local func = functions[task.fid]
local cancel = false
if func then
local err
cancel,err = pcall(func, task.dtime, table.unpack(task.args))
cancel,err = pcall(func, task.dtime, table_unpack(task.args))
if err then
minetest.log("error","Error while running task: err")
end
@ -48,11 +78,12 @@ minetest.register_globalstep(function(dtime)
-- Add periodic tasks back into the queue
if task.period and not cancel then
task.time = task.period
priority_queue:add_task(task)
queue_add_task(priority_queue, task)
end
else
i = i + 1
end
end
print("Total scheduler time: "..tostring(minetest_get_us_time() - start_time).." microseconds")
end)

View File

@ -1,4 +1,4 @@
name = mcl_scheduler
name = vl_scheduler
author = teknomunk
description = Event and Process Scheduler
depends = mcl_util

View File

@ -30,11 +30,18 @@ function Class()
end
local inner_queue = Class()
-- Imperative forward declarations
local inner_queue_construct
local inner_queue_get
local inner_queue_insert_task
local inner_queue_add_tasks
local inner_queue_init_slot_list
function inner_queue:construct(level)
self.level = level
self.items = {}
--self.items = {}
self.unsorted_count = 0
self.slots = 4
-- Precompute slot size
local slot_size = 20
@ -43,9 +50,14 @@ function inner_queue:construct(level)
end
self.slot_size = slot_size
end
inner_queue_construct = inner_queue.construct
function inner_queue:get()
local slots = self.slots
local slots = 4
local slot = 5 - slots
if not self.items then
self.items = inner_queue_init_slot_list(self)
end
local ret = self.items[slot]
self.items[slot] = nil
@ -53,7 +65,7 @@ function inner_queue:get()
slots = slots - 1
if slots == 0 then
if self.next_level then
local next_level_get = self.next_level:get()
local next_level_get = inner_queue_get(self.next_level)
if next_level_get then
self.items = next_level_get.items
else
@ -64,56 +76,91 @@ function inner_queue:get()
end
self.slots = slots
return ret or self:init_slot_list()
return ret
end
function inner_queue:add_tasks(tasks, time)
local task = tasks
inner_queue_get = inner_queue.get
function inner_queue:insert_task(task)
local slots = self.slots
local slot_size = self.slot_size
local level = self.level
local slots = self.slots
while task do
local t = task.time
t = t - time
local t = task.time
--task.log = tostring(t).."(1)<- "..(task.log or "")
--print("<"..tostring(self.level).."> t="..tostring(t)..",task.time="..tostring(task.time)..",time="..tostring(time))
if not (t >= 1 ) then
error("Invalid time: task="..dump(task))
end
if t > slot_size * slots then
-- Add to list for next level in the finger tree
task.time = t
local curr_task = task
task = task.next
curr_task.next = self.first_unsorted
local count = self.unsorted_count + 1
if count > 20 then
if not self.next_level then
self.next_level = inner_queue:new(self.level + 1)
end
self.next_level:add_tasks(curr_task, slot_size * slots)
self.first_unsorted = nil
self.unsorted_count = 0
else
self.first_unsorted = curr_task
self.unsorted_count = count
if t > slot_size * slots then
-- Add to list for next level in the finger tree
local count = self.unsorted_count + 1
if count > 20 then
if not self.next_level then
self.next_level = inner_queue:new(self.level + 1)
end
else
-- Task belongs in a slot on this level
local slot = math.floor(t / slot_size) + 1 + ( slots - 4 )
t = t % slot_size
task.time = t
local curr_task = task
task = task.next
local list = self.items[slot] or self:init_slot_list()
self.items[slot] = list
if level == 1 then
curr_task.next = list[t]
list[t] = curr_task
else
list:add_tasks(curr_task, (slot - 1) * slot_size)
end
inner_queue_add_tasks( self.next_level, self.first_unsorted, slot_size * slots)
self.first_unsorted = nil
self.unsorted_count = 0
count = 0
end
task.next = self.first_unsorted
self.first_unsorted = task
self.unsorted_count = count + 1
return
end
-- Task belongs in a slot on this level
--print("t="..tostring(t)..",slot_size="..tostring(slot_size)..",slots="..tostring(slots))
local slot = math.floor((t-1) / slot_size) + 1 -- + ( slots - 4 )
t = (t - 1) % slot_size + 1
--print("slot="..tostring(slot)..",t="..tostring(t))
task.time = t
--task.log = tostring(t).."(2)<- "..(task.log or "")
-- Lazily initialize items
if not self.items then
self.items = inner_queue_init_slot_list(self)
end
-- Get the sublist the item belongs in
local list = self.items[slot]
if level == 1 then
assert(task.time <= 20)
task.next = list[t]
list[t] = task
--print("list="..dump(list))
else
--print("list="..dump(list))
inner_queue_insert_task(list, task, 0)
end
end
inner_queue_insert_task = inner_queue.insert_task
function inner_queue:add_tasks(tasks, time)
--print("inner_queue<"..tostring(self.level)..">:add_tasks()")
local task = tasks
local slots = self.slots
local slot_size = self.slot_size
--print("This queue handles times 1-"..tostring(slot_size*slots))
while task do
local curr_task = task
task = task.next
curr_task.next = nil
curr_task.time = curr_task.time - time
inner_queue_insert_task(self, curr_task)
end
--print("self="..dump(self))
end
inner_queue_add_tasks = inner_queue.add_tasks
function inner_queue:init_slot_list()
local level = self.level
if level == 1 then
@ -121,10 +168,12 @@ function inner_queue:init_slot_list()
else
local r = {}
for i=1,4 do
r[i] = inner_queue:new(self.level - 1)
r[i] = inner_queue:new(level - 1)
end
return r
end
end
inner_queue_init_slot_list = inner_queue.init_slot_list
local queue = Class()
mod.queue = queue
@ -136,42 +185,52 @@ function queue:construct()
end
function queue:add_task(task)
-- Adjust time to align with the start of the current second
task.time = task.time + self.m_tick
local t = task.time
task.original_time = t
t = t + self.m_tick
--print("add_task({ time="..tostring(t).." })")
-- Handle task in current seccond
if task.time <= 20 then
task.next = self.items[task.time]
self.items[task.time] = task
if t <= 20 then
task.next = self.items[t]
self.items[t] = task
return
end
local count = self.unsorted_count + 1
local count = self.unsorted_count
if count > 20 then
-- Push to next level
self.unsorted_count = 0
else
-- Add to the list of tasks for later time slots
task.next = self.first_unsorted
self.first_unsorted = task
self.unsorted_count = count
inner_queue_add_tasks( self.next_level, self.first_unsorted, 0)
self.first_unsorted = nil
count = 0
end
-- Add to the list of tasks for later time slots
task.next = self.first_unsorted
task.time = task.time
self.first_unsorted = task
self.unsorted_count = count + 1
end
function queue:tick()
-- Get the tasks for this tick
local ret = self.items[self.m_tick]
self.items[self.m_tick] = nil
local ret = nil
if self.items then
ret = self.items[self.m_tick]
self.items[self.m_tick] = nil
end
self.m_tick = self.m_tick + 1
-- Handle second rollover
if self.m_tick == 21 then
-- Push items to next level
if self.first_unsorted then
self.next_level:add_tasks(self.first_unsorted, 20)
inner_queue_add_tasks(self.next_level, self.first_unsorted, 20)
self.first_unsorted = nil
self.unsorted_count = 0
end
self.items = self.next_level:get()
self.items = inner_queue_get(self.next_level)
self.m_tick = 1
end

View File

@ -0,0 +1,34 @@
local mod = vl_scheduler
function mod.test()
local t = mod.queue.new()
local pr = PseudoRandom(123456789)
local start_time = minetest.get_us_time()
for i=1,500 do
t:add_task({ time = pr:next(1,3600) })
local stop_time = minetest.get_us_time()
print("took "..tostring(stop_time - start_time).."us")
start_time = stop_time
end
--print(dump(t:tick()))
print(dump(t))
print("starting ticks")
local start_time = minetest.get_us_time()
for i=1,3600 do
local s = t:tick()
print("time="..tostring(i+1))
--print(dump(s))
local stop_time = minetest.get_us_time()
print("took "..tostring(stop_time - start_time).."us")
start_time = stop_time
end
end
--mod.test()
--error("test failed")