forked from VoxeLibre/VoxeLibre
Add back in test code, get queue working correctly
This commit is contained in:
parent
939d2c9ef0
commit
09f034de16
|
@ -6,6 +6,7 @@ local mod = vl_scheduler
|
|||
|
||||
dofile(modpath.."/queue.lua")
|
||||
dofile(modpath.."/fifo.lua")
|
||||
dofile(modpath.."/test.lua")
|
||||
|
||||
local run_queues = {}
|
||||
for i = 1,4 do
|
||||
|
@ -14,32 +15,61 @@ end
|
|||
local time = 0
|
||||
local priority_queue = mod.queue:new()
|
||||
local functions = {}
|
||||
local function_id_from_name = {}
|
||||
|
||||
local table_unpack = table.unpack
|
||||
local minetest_get_us_time = minetest.get_us_time
|
||||
local queue_add_task = mod.queue.add_task
|
||||
local queue_get = mod.queue.get
|
||||
local queue_tick = mod.queue.tick
|
||||
local fifo_insert = mod.fifo.insert
|
||||
local fifo_get = mod.fifo.get
|
||||
|
||||
function mod.add_task(time, name)
|
||||
local fid = function_id_from_name[name]
|
||||
local task = {
|
||||
time = time,
|
||||
fid = fid,
|
||||
}
|
||||
queue_add_task(priority_queue, task)
|
||||
end
|
||||
|
||||
function mod.register_function(name, func)
|
||||
local fid = #functions + 1
|
||||
functions[fid] = {
|
||||
func = func,
|
||||
name = name,
|
||||
fid = fid,
|
||||
}
|
||||
function_id_from_name = name
|
||||
print("Registering "..name.." as #"..tostring(fid))
|
||||
end
|
||||
|
||||
minetest.register_globalstep(function(dtime)
|
||||
local start_time = minetest.get_us_time()
|
||||
local start_time = minetest_get_us_time()
|
||||
time = time + dtime
|
||||
|
||||
-- Add tasks to the run queues
|
||||
local iter = priority_queue:tick()
|
||||
local iter = queue_tick(priority_queue)
|
||||
while iter do
|
||||
local task = iter
|
||||
iter = iter.next
|
||||
|
||||
local priority = task.priority or 3
|
||||
run_queues[priority]:insert(task)
|
||||
fifo_insert(run_queues[priority], task)
|
||||
end
|
||||
|
||||
-- Run tasks until we run out of timeslice
|
||||
local i = 1
|
||||
while i < 4 and (minetest.get_us_time() - start_time) < 50000 do
|
||||
local task = run_queues[i]:get()
|
||||
while i < 4 and (minetest_get_us_time() - start_time) < 50000 do
|
||||
local task = fifo_get(run_queues[i])
|
||||
if task then
|
||||
print("Running task "..dump(task))
|
||||
local func = functions[task.fid]
|
||||
local cancel = false
|
||||
if func then
|
||||
local err
|
||||
cancel,err = pcall(func, task.dtime, table.unpack(task.args))
|
||||
cancel,err = pcall(func, task.dtime, table_unpack(task.args))
|
||||
if err then
|
||||
minetest.log("error","Error while running task: err")
|
||||
end
|
||||
|
@ -48,11 +78,12 @@ minetest.register_globalstep(function(dtime)
|
|||
-- Add periodic tasks back into the queue
|
||||
if task.period and not cancel then
|
||||
task.time = task.period
|
||||
priority_queue:add_task(task)
|
||||
queue_add_task(priority_queue, task)
|
||||
end
|
||||
else
|
||||
i = i + 1
|
||||
end
|
||||
end
|
||||
print("Total scheduler time: "..tostring(minetest_get_us_time() - start_time).." microseconds")
|
||||
end)
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
name = mcl_scheduler
|
||||
name = vl_scheduler
|
||||
author = teknomunk
|
||||
description = Event and Process Scheduler
|
||||
depends = mcl_util
|
||||
|
|
|
@ -30,11 +30,18 @@ function Class()
|
|||
end
|
||||
|
||||
local inner_queue = Class()
|
||||
|
||||
-- Imperative forward declarations
|
||||
local inner_queue_construct
|
||||
local inner_queue_get
|
||||
local inner_queue_insert_task
|
||||
local inner_queue_add_tasks
|
||||
local inner_queue_init_slot_list
|
||||
|
||||
function inner_queue:construct(level)
|
||||
self.level = level
|
||||
self.items = {}
|
||||
--self.items = {}
|
||||
self.unsorted_count = 0
|
||||
self.slots = 4
|
||||
|
||||
-- Precompute slot size
|
||||
local slot_size = 20
|
||||
|
@ -43,9 +50,14 @@ function inner_queue:construct(level)
|
|||
end
|
||||
self.slot_size = slot_size
|
||||
end
|
||||
inner_queue_construct = inner_queue.construct
|
||||
|
||||
function inner_queue:get()
|
||||
local slots = self.slots
|
||||
local slots = 4
|
||||
local slot = 5 - slots
|
||||
if not self.items then
|
||||
self.items = inner_queue_init_slot_list(self)
|
||||
end
|
||||
local ret = self.items[slot]
|
||||
self.items[slot] = nil
|
||||
|
||||
|
@ -53,7 +65,7 @@ function inner_queue:get()
|
|||
slots = slots - 1
|
||||
if slots == 0 then
|
||||
if self.next_level then
|
||||
local next_level_get = self.next_level:get()
|
||||
local next_level_get = inner_queue_get(self.next_level)
|
||||
if next_level_get then
|
||||
self.items = next_level_get.items
|
||||
else
|
||||
|
@ -64,56 +76,91 @@ function inner_queue:get()
|
|||
end
|
||||
self.slots = slots
|
||||
|
||||
return ret or self:init_slot_list()
|
||||
return ret
|
||||
end
|
||||
function inner_queue:add_tasks(tasks, time)
|
||||
local task = tasks
|
||||
inner_queue_get = inner_queue.get
|
||||
|
||||
function inner_queue:insert_task(task)
|
||||
local slots = self.slots
|
||||
local slot_size = self.slot_size
|
||||
local level = self.level
|
||||
local slots = self.slots
|
||||
|
||||
while task do
|
||||
local t = task.time
|
||||
t = t - time
|
||||
local t = task.time
|
||||
--task.log = tostring(t).."(1)<- "..(task.log or "")
|
||||
--print("<"..tostring(self.level).."> t="..tostring(t)..",task.time="..tostring(task.time)..",time="..tostring(time))
|
||||
if not (t >= 1 ) then
|
||||
error("Invalid time: task="..dump(task))
|
||||
end
|
||||
|
||||
if t > slot_size * slots then
|
||||
-- Add to list for next level in the finger tree
|
||||
task.time = t
|
||||
local curr_task = task
|
||||
task = task.next
|
||||
curr_task.next = self.first_unsorted
|
||||
local count = self.unsorted_count + 1
|
||||
if count > 20 then
|
||||
if not self.next_level then
|
||||
self.next_level = inner_queue:new(self.level + 1)
|
||||
end
|
||||
self.next_level:add_tasks(curr_task, slot_size * slots)
|
||||
self.first_unsorted = nil
|
||||
self.unsorted_count = 0
|
||||
else
|
||||
self.first_unsorted = curr_task
|
||||
self.unsorted_count = count
|
||||
if t > slot_size * slots then
|
||||
-- Add to list for next level in the finger tree
|
||||
local count = self.unsorted_count + 1
|
||||
if count > 20 then
|
||||
if not self.next_level then
|
||||
self.next_level = inner_queue:new(self.level + 1)
|
||||
end
|
||||
else
|
||||
-- Task belongs in a slot on this level
|
||||
local slot = math.floor(t / slot_size) + 1 + ( slots - 4 )
|
||||
t = t % slot_size
|
||||
task.time = t
|
||||
local curr_task = task
|
||||
task = task.next
|
||||
|
||||
local list = self.items[slot] or self:init_slot_list()
|
||||
self.items[slot] = list
|
||||
|
||||
if level == 1 then
|
||||
curr_task.next = list[t]
|
||||
list[t] = curr_task
|
||||
else
|
||||
list:add_tasks(curr_task, (slot - 1) * slot_size)
|
||||
end
|
||||
inner_queue_add_tasks( self.next_level, self.first_unsorted, slot_size * slots)
|
||||
self.first_unsorted = nil
|
||||
self.unsorted_count = 0
|
||||
count = 0
|
||||
end
|
||||
|
||||
task.next = self.first_unsorted
|
||||
self.first_unsorted = task
|
||||
self.unsorted_count = count + 1
|
||||
return
|
||||
end
|
||||
|
||||
-- Task belongs in a slot on this level
|
||||
--print("t="..tostring(t)..",slot_size="..tostring(slot_size)..",slots="..tostring(slots))
|
||||
local slot = math.floor((t-1) / slot_size) + 1 -- + ( slots - 4 )
|
||||
t = (t - 1) % slot_size + 1
|
||||
--print("slot="..tostring(slot)..",t="..tostring(t))
|
||||
task.time = t
|
||||
--task.log = tostring(t).."(2)<- "..(task.log or "")
|
||||
|
||||
-- Lazily initialize items
|
||||
if not self.items then
|
||||
self.items = inner_queue_init_slot_list(self)
|
||||
end
|
||||
|
||||
-- Get the sublist the item belongs in
|
||||
local list = self.items[slot]
|
||||
|
||||
if level == 1 then
|
||||
assert(task.time <= 20)
|
||||
task.next = list[t]
|
||||
list[t] = task
|
||||
|
||||
--print("list="..dump(list))
|
||||
else
|
||||
--print("list="..dump(list))
|
||||
inner_queue_insert_task(list, task, 0)
|
||||
end
|
||||
end
|
||||
inner_queue_insert_task = inner_queue.insert_task
|
||||
|
||||
function inner_queue:add_tasks(tasks, time)
|
||||
--print("inner_queue<"..tostring(self.level)..">:add_tasks()")
|
||||
local task = tasks
|
||||
local slots = self.slots
|
||||
local slot_size = self.slot_size
|
||||
|
||||
--print("This queue handles times 1-"..tostring(slot_size*slots))
|
||||
while task do
|
||||
local curr_task = task
|
||||
task = task.next
|
||||
curr_task.next = nil
|
||||
curr_task.time = curr_task.time - time
|
||||
|
||||
inner_queue_insert_task(self, curr_task)
|
||||
end
|
||||
|
||||
--print("self="..dump(self))
|
||||
end
|
||||
inner_queue_add_tasks = inner_queue.add_tasks
|
||||
|
||||
function inner_queue:init_slot_list()
|
||||
local level = self.level
|
||||
if level == 1 then
|
||||
|
@ -121,10 +168,12 @@ function inner_queue:init_slot_list()
|
|||
else
|
||||
local r = {}
|
||||
for i=1,4 do
|
||||
r[i] = inner_queue:new(self.level - 1)
|
||||
r[i] = inner_queue:new(level - 1)
|
||||
end
|
||||
return r
|
||||
end
|
||||
end
|
||||
inner_queue_init_slot_list = inner_queue.init_slot_list
|
||||
|
||||
local queue = Class()
|
||||
mod.queue = queue
|
||||
|
@ -136,42 +185,52 @@ function queue:construct()
|
|||
end
|
||||
function queue:add_task(task)
|
||||
-- Adjust time to align with the start of the current second
|
||||
task.time = task.time + self.m_tick
|
||||
local t = task.time
|
||||
task.original_time = t
|
||||
t = t + self.m_tick
|
||||
--print("add_task({ time="..tostring(t).." })")
|
||||
|
||||
-- Handle task in current seccond
|
||||
if task.time <= 20 then
|
||||
task.next = self.items[task.time]
|
||||
self.items[task.time] = task
|
||||
if t <= 20 then
|
||||
task.next = self.items[t]
|
||||
self.items[t] = task
|
||||
return
|
||||
end
|
||||
|
||||
local count = self.unsorted_count + 1
|
||||
local count = self.unsorted_count
|
||||
if count > 20 then
|
||||
-- Push to next level
|
||||
self.unsorted_count = 0
|
||||
else
|
||||
-- Add to the list of tasks for later time slots
|
||||
task.next = self.first_unsorted
|
||||
self.first_unsorted = task
|
||||
self.unsorted_count = count
|
||||
inner_queue_add_tasks( self.next_level, self.first_unsorted, 0)
|
||||
self.first_unsorted = nil
|
||||
count = 0
|
||||
end
|
||||
|
||||
-- Add to the list of tasks for later time slots
|
||||
task.next = self.first_unsorted
|
||||
task.time = task.time
|
||||
self.first_unsorted = task
|
||||
self.unsorted_count = count + 1
|
||||
end
|
||||
function queue:tick()
|
||||
-- Get the tasks for this tick
|
||||
local ret = self.items[self.m_tick]
|
||||
self.items[self.m_tick] = nil
|
||||
local ret = nil
|
||||
if self.items then
|
||||
ret = self.items[self.m_tick]
|
||||
self.items[self.m_tick] = nil
|
||||
end
|
||||
self.m_tick = self.m_tick + 1
|
||||
|
||||
-- Handle second rollover
|
||||
if self.m_tick == 21 then
|
||||
-- Push items to next level
|
||||
if self.first_unsorted then
|
||||
self.next_level:add_tasks(self.first_unsorted, 20)
|
||||
inner_queue_add_tasks(self.next_level, self.first_unsorted, 20)
|
||||
self.first_unsorted = nil
|
||||
self.unsorted_count = 0
|
||||
end
|
||||
|
||||
self.items = self.next_level:get()
|
||||
self.items = inner_queue_get(self.next_level)
|
||||
self.m_tick = 1
|
||||
end
|
||||
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
local mod = vl_scheduler
|
||||
function mod.test()
|
||||
local t = mod.queue.new()
|
||||
|
||||
local pr = PseudoRandom(123456789)
|
||||
|
||||
local start_time = minetest.get_us_time()
|
||||
for i=1,500 do
|
||||
t:add_task({ time = pr:next(1,3600) })
|
||||
|
||||
local stop_time = minetest.get_us_time()
|
||||
print("took "..tostring(stop_time - start_time).."us")
|
||||
start_time = stop_time
|
||||
end
|
||||
|
||||
--print(dump(t:tick()))
|
||||
print(dump(t))
|
||||
|
||||
print("starting ticks")
|
||||
|
||||
local start_time = minetest.get_us_time()
|
||||
for i=1,3600 do
|
||||
local s = t:tick()
|
||||
print("time="..tostring(i+1))
|
||||
--print(dump(s))
|
||||
|
||||
local stop_time = minetest.get_us_time()
|
||||
print("took "..tostring(stop_time - start_time).."us")
|
||||
start_time = stop_time
|
||||
end
|
||||
end
|
||||
|
||||
--mod.test()
|
||||
--error("test failed")
|
Loading…
Reference in New Issue