Select Git revision
TaskScheduler.cpp
-
Niccolò Betto authoredNiccolò Betto authored
TaskScheduler.cpp 8.46 KiB
/* Copyright (c) 2015-2016 Skyward Experimental Rocketry
* Authors: Alain Carlucci, Federico Terraneo, Matteo Piazzolla
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "TaskScheduler.h"
#include <diagnostic/SkywardStack.h>
#include <algorithm>
using namespace std;
using namespace miosix;
namespace Boardcore
{
TaskScheduler::TaskScheduler(miosix::Priority priority)
: ActiveObject(STACK_MIN_FOR_SKYWARD, priority)
{
// Create dynamically the tasks vector because of too much space
tasks = new std::array<Task, TASKS_SIZE>();
// Initialize the vector elements
for (size_t i = 1; i < TASKS_SIZE; i++)
{
function_t function;
(*tasks)[i] = makeTask(function, 0, i, false, Policy::SKIP);
}
}
TaskScheduler::~TaskScheduler() { delete tasks; }
size_t TaskScheduler::addTask(function_t function, uint32_t period,
Policy policy, int64_t startTick)
{
Lock<FastMutex> lock(mutex);
size_t id = 1;
// Find a suitable id for the new task
for (; id < TASKS_SIZE; id++)
{
if ((*tasks)[id].valid == false)
{
break;
}
}
// Check if in the corresponding id there's already a task
if ((*tasks)[id].valid)
{
// Unlock the mutex for expensive operation
Unlock<FastMutex> unlock(mutex);
LOG_ERR(logger, "Full task scheduler, id = {:zu}", id);
return 0;
}
// Register the task into the map
(*tasks)[id] = makeTask(function, period, id, true, policy);
if (policy == Policy::ONE_SHOT)
{
startTick += period;
}
// Add the task first event in the agenda, performs in-place construction
agenda.emplace(id, startTick);
condvar.broadcast(); // Signals the run thread
return id;
}
bool TaskScheduler::removeTask(size_t id)
{
Lock<FastMutex> lock(mutex);
// Check if the task is actually present
if ((*tasks)[id].valid == false)
{
// Unlock the mutex for expensive operation
Unlock<FastMutex> unlock(mutex);
LOG_ERR(logger, "Attempting to remove a task not registered");
return false;
}
// Set the validity of the task to false
(*tasks)[id].valid = false;
return true;
}
bool TaskScheduler::start()
{
// This check is necessary to prevent task normalization if the scheduler is
// already stopped
if (running)
{
return false;
}
// Normalize the tasks start time if they precede the current tick
normalizeTasks();
return ActiveObject::start();
}
void TaskScheduler::stop()
{
stopFlag = true; // Signal the run function to stop
condvar.broadcast(); // Wake the run function even if there are no tasks
ActiveObject::stop();
}
vector<TaskStatsResult> TaskScheduler::getTaskStats()
{
Lock<FastMutex> lock(mutex);
vector<TaskStatsResult> result;
for (auto const& task : (*tasks))
{
if (task.valid)
{
result.push_back(fromTaskIdPairToStatsResult(task));
}
}
return result;
}
void TaskScheduler::normalizeTasks()
{
int64_t currentTick = getTick();
std::priority_queue<Event> newAgenda;
while (agenda.size() > 0)
{
Event event = agenda.top();
agenda.pop();
if (event.nextTick < currentTick)
{
Task& task = (*tasks)[event.taskId];
event.nextTick +=
((currentTick - event.nextTick) / task.period + 1) *
task.period;
}
newAgenda.push(event);
}
agenda = newAgenda;
}
void TaskScheduler::run()
{
Lock<FastMutex> lock(mutex);
while (true)
{
while (agenda.size() == 0 && !shouldStop())
condvar.wait(mutex);
// Exit if the ActiveObject has been stopped
if (shouldStop())
{
return;
}
int64_t startTick = getTick();
Event nextEvent = agenda.top();
Task& nextTask = (*tasks)[nextEvent.taskId];
// If the task has the SKIP policy and its execution was missed, we need
// to move it forward to match the period
if (nextEvent.nextTick < startTick && nextTask.policy == Policy::SKIP)
{
agenda.pop();
enqueue(nextEvent, startTick);
}
else if (nextEvent.nextTick <= startTick)
{
agenda.pop();
// Execute the task function
if (nextTask.valid)
{
{
Unlock<FastMutex> unlock(lock);
try
{
nextTask.function();
}
catch (...)
{
// Update the failed statistic
nextTask.failedEvents++;
}
}
// Enqueue only on a valid task
updateStats(nextEvent, startTick, getTick());
enqueue(nextEvent, startTick);
}
}
else
{
Unlock<FastMutex> unlock(lock);
Thread::sleepUntil(nextEvent.nextTick);
}
}
}
void TaskScheduler::updateStats(const Event& event, int64_t startTick,
int64_t endTick)
{
constexpr float tickToMs = 1000.f / TICK_FREQ;
Task& task = (*tasks)[event.taskId];
// Activation stats
float activationError = startTick - event.nextTick;
task.activationStats.add(activationError * tickToMs);
// Period stats
int64_t lastCall = task.lastCall;
if (lastCall >= 0)
task.periodStats.add((startTick - lastCall) * tickToMs);
// Update the last call tick to the current start tick for the next
// iteration
task.lastCall = startTick;
// Workload stats
task.workloadStats.add(endTick - startTick);
}
void TaskScheduler::enqueue(Event event, int64_t startTick)
{
constexpr float msToTick = TICK_FREQ / 1000.f;
Task& task = (*tasks)[event.taskId];
switch (task.policy)
{
case Policy::ONE_SHOT:
// If the task is one shot we won't push it to the agenda and we'll
// remove it from the tasks map.
task.valid = false;
return;
case Policy::SKIP:
// Updated the missed events count
task.missedEvents += (startTick - event.nextTick) / task.period;
// Compute the number of periods between the tick the event should
// have been run and the tick it actually run. Than adds 1 and
// multiply the period to get the next execution tick still aligned
// to the original one.
// E.g. If a task has to run once every 2 ticks and start at tick 0
// but for whatever reason the first execution is at tick 3, then
// the next execution will be at tick 4.
event.nextTick +=
((startTick - event.nextTick) / task.period + 1) * task.period;
break;
case Policy::RECOVER:
event.nextTick += task.period * msToTick;
break;
}
// Re-enqueue the event in the agenda and signals the run thread
agenda.push(event);
condvar.broadcast();
}
TaskScheduler::Task TaskScheduler::makeTask(function_t function,
uint32_t period, size_t id,
bool validity, Policy policy)
{
return Task{function, period, id, validity, policy, -1, {}, {}, {}, 0, 0};
}
} // namespace Boardcore