Skip to content

Commit

Permalink
lestarch: reworking posix tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
LeStarch committed Aug 31, 2021
1 parent 001b4ce commit 3079f39
Show file tree
Hide file tree
Showing 11 changed files with 209 additions and 147 deletions.
2 changes: 1 addition & 1 deletion Drv/Ip/SocketReadTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ void SocketReadTask::startSocketTask(const Fw::StringBase &name,
FW_ASSERT(not this->m_stop); // It is a coding error to stop the thread before it is started
m_reconnect = reconnect;
// Note: the first step is for the IP socket to open the the port
Os::Task::TaskStatus stat = m_task.start(name, 0, priority, stack, SocketReadTask::readTask, this, cpuAffinity);
Os::Task::TaskStatus stat = m_task.start(name, SocketReadTask::readTask, this, priority, stack, cpuAffinity, 0);
FW_ASSERT(Os::Task::TASK_OK == stat, static_cast<NATIVE_INT_TYPE>(stat));
}

Expand Down
10 changes: 5 additions & 5 deletions Drv/Ip/SocketReadTask.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ class SocketReadTask {
* to the Os::Task::start call. cpuAffinity defaults to -1.
*
* \param name: name of the task
* \param priority: priority of the started task. See: Os::Task::start.
* \param stack: stack size provided to the task. See: Os::Task::start.
* \param priority: priority of the started task. See: Os::Task::start. Default: -1, not prioritized
* \param stack: stack size provided to the task. See: Os::Task::start. Default: -1, posix threads default
* \param reconnect: automatically reconnect socket when closed. Default: true.
* \param cpuAffinity: cpu affinity provided to task. See: Os::Task::start.
* \param cpuAffinity: cpu affinity provided to task. See: Os::Task::start. Default: -1, don't care
*/
void startSocketTask(const Fw::StringBase &name,
const NATIVE_INT_TYPE priority,
const NATIVE_INT_TYPE stack,
const NATIVE_INT_TYPE priority = -1,
const NATIVE_INT_TYPE stack = -1,
const bool reconnect = true,
const NATIVE_INT_TYPE cpuAffinity = -1);

Expand Down
2 changes: 1 addition & 1 deletion Fw/Comp/ActiveComponentBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ namespace Fw {
#if FW_BAREMETAL_SCHEDULER == 1
Os::Task::TaskStatus status = this->m_task.start(taskName, identifier, priority, stackSize, this->s_baseBareTask, this, cpuAffinity);
#else
Os::Task::TaskStatus status = this->m_task.start(taskName, identifier, priority, stackSize, this->s_baseTask, this, cpuAffinity);
Os::Task::TaskStatus status = this->m_task.start(taskName, this->s_baseTask, this, priority, stackSize, cpuAffinity, identifier);
#endif
FW_ASSERT(status == Os::Task::TASK_OK,(NATIVE_INT_TYPE)status);
}
Expand Down
2 changes: 1 addition & 1 deletion Fw/Comp/ActiveComponentBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
namespace Fw {
class ActiveComponentBase : public QueuedComponentBase {
public:
void start(NATIVE_INT_TYPE identifier, NATIVE_INT_TYPE priority, NATIVE_INT_TYPE stackSize, NATIVE_INT_TYPE cpuAffinity = -1); //!< called by instantiator when task is to be started
void start(NATIVE_INT_TYPE identifier = 0, NATIVE_INT_TYPE priority = -1, NATIVE_INT_TYPE stackSize = -1, NATIVE_INT_TYPE cpuAffinity = -1); //!< called by instantiator when task is to be started
void exit(void); //!< exit task in active component
Os::Task::TaskStatus join(void **value_ptr); //!< provide return value of thread if value_ptr is not NULL

Expand Down
261 changes: 162 additions & 99 deletions Os/Posix/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,13 @@
#include <Fw/Types/Assert.hpp>


#ifdef TGT_OS_TYPE_VXWORKS
#include <vxWorks.h>
#include <taskLib.h> // need it for VX_FP_TASK
#else
#include <sys/types.h>
#include <unistd.h>
#endif
#include <sys/types.h>
#include <errno.h>

#include <pthread.h>
#include <errno.h>
#include <limits.h>
#include <string.h>
#include <time.h>
#include <stdio.h>
#include <Fw/Logger/Logger.hpp>

typedef void* (*pthread_func_ptr)(void*);
Expand All @@ -28,123 +22,192 @@ void* pthread_entry_wrapper(void* arg) {
}

namespace Os {
Task::Task() : m_handle(0), m_identifier(0), m_affinity(-1), m_started(false), m_suspendedOnPurpose(false), m_routineWrapper() {
}

Task::TaskStatus Task::start(const Fw::StringBase &name, NATIVE_INT_TYPE identifier, NATIVE_INT_TYPE priority, NATIVE_INT_TYPE stackSize, taskRoutine routine, void* arg, NATIVE_INT_TYPE cpuAffinity) {
FW_ASSERT(routine);

this->m_name = "TP_";
this->m_name += name;
#ifndef TGT_OS_TYPE_VXWORKS
char pid[40];
(void)snprintf(pid,sizeof(pid),".%d",getpid());
pid[sizeof(pid)-1] = 0;
this->m_name += pid;
#endif
this->m_identifier = identifier;
Task::TaskStatus tStat = TASK_OK;

pthread_attr_t att;
// clear att; can cause issues
memset(&att,0,sizeof(att));

I32 stat = pthread_attr_init(&att);
if (stat != 0) {
Fw::Logger::logMsg("pthread_attr_init: (%d)(%d): %s\n",stat,errno, reinterpret_cast<POINTER_CAST>(strerror(stat)));
return TASK_INVALID_PARAMS;
void validate_arguments(NATIVE_INT_TYPE& priority, NATIVE_INT_TYPE& stack, NATIVE_INT_TYPE& affinity, bool expect_perm) {
// Check priority attributes
if (!expect_perm and priority != -1) {
Fw::Logger::logMsg("[WARNING] task priority set and permissions unavailable. Discarding priority.\n");
priority = -1;
}
#ifdef TGT_OS_TYPE_VXWORKS
stat = pthread_attr_setstacksize(&att,stackSize);
if (stat != 0) {
return TASK_INVALID_STACK;
}
stat = pthread_attr_setschedpolicy(&att,SCHED_FIFO);
if (stat != 0) {
return TASK_INVALID_PARAMS;
}
stat = pthread_attr_setname(&att,(char*)this->m_name.toChar());
if (stat != 0) {
return TASK_INVALID_PARAMS;
if (priority != -1 and priority < 1) {
Fw::Logger::logMsg("[WARNING] low task priority of %d being clamped to 1\n", priority);
priority = 1;
}
stat = pthread_attr_setinheritsched(&att,PTHREAD_EXPLICIT_SCHED); // needed to set inheritance according to WR docs
if (stat != 0) {
return TASK_INVALID_PARAMS;
if (priority != -1 and priority > 99) {
Fw::Logger::logMsg("[WARNING] high task priority of %d being clamped to 99\n", priority);
priority = 99;
}
sched_param schedParam;
memset(&schedParam,0,sizeof(sched_param));
schedParam.sched_priority = priority;
stat = pthread_attr_setschedparam(&att,&schedParam);
if (stat != 0) {
return TASK_INVALID_PARAMS;
// Check the stack
if (stack != -1 and stack < PTHREAD_STACK_MIN) {
Fw::Logger::logMsg("[WARNING] stack size %d too small, setting to minimum of %d\n", stack, PTHREAD_STACK_MIN);
stack = PTHREAD_STACK_MIN;
}
stat = pthread_attr_setopt(&att,VX_FP_TASK);
if (stat != 0) {
return TASK_INVALID_PARAMS;
// Check CPU affinity
if (!expect_perm and affinity != -1) {
Fw::Logger::logMsg("[WARNING] cpu affinity set and permissions unavailable. Discarding affinity.\n");
affinity = -1;
}
#elif defined TGT_OS_TYPE_LINUX
#if !defined BUILD_CYGWIN // cygwin doesn't support this call
stat = pthread_attr_setschedpolicy(&att,SCHED_RR);
if (stat != 0) {
Fw::Logger::logMsg("pthread_attr_setschedpolicy: %s\n", reinterpret_cast<POINTER_CAST>(strerror(errno)));
return TASK_INVALID_PARAMS;
}

Task::TaskStatus set_stack_size(pthread_attr_t& att, NATIVE_INT_TYPE stack) {
// Set the stack size, if it has been supplied
if (stack != -1) {
I32 stat = pthread_attr_setstacksize(&att, stack);
if (stat != 0) {
Fw::Logger::logMsg("pthread_attr_setstacksize: %s\n", reinterpret_cast<POINTER_CAST>(strerror(stat)));
return Task::TASK_INVALID_STACK;
}
}
#endif
#elif defined TGT_OS_TYPE_RTEMS
stat = pthread_attr_setstacksize(&att,stackSize);
if (stat != 0) {
return TASK_INVALID_STACK;
return Task::TASK_OK;
}

Task::TaskStatus set_priority_params(pthread_attr_t& att, NATIVE_INT_TYPE priority) {
if (priority != -1) {
I32 stat = pthread_attr_setschedpolicy(&att, SCHED_FIFO);
if (stat != 0) {
Fw::Logger::logMsg("pthread_attr_setschedpolicy: %s\n", reinterpret_cast<POINTER_CAST>(strerror(stat)));
return Task::TASK_INVALID_PARAMS;
}

stat = pthread_attr_setinheritsched(&att, PTHREAD_EXPLICIT_SCHED);
if (stat != 0) {
Fw::Logger::logMsg("pthread_attr_setinheritsched: %s\n",
reinterpret_cast<POINTER_CAST>(strerror(stat)));
return Task::TASK_INVALID_PARAMS;
}

sched_param schedParam;
memset(&schedParam, 0, sizeof(sched_param));
schedParam.sched_priority = priority;
stat = pthread_attr_setschedparam(&att, &schedParam);
if (stat != 0) {
Fw::Logger::logMsg("pthread_attr_setschedparam: %s\n", reinterpret_cast<POINTER_CAST>(strerror(stat)));
return Task::TASK_INVALID_PARAMS;
}
}
stat = pthread_attr_setschedpolicy(&att,SCHED_FIFO);
if (stat != 0) {
return TASK_INVALID_PARAMS;
return Task::TASK_OK;
}

Task::TaskStatus set_cpu_affinity(pthread_attr_t& att, NATIVE_INT_TYPE cpuAffinity) {
if (cpuAffinity != -1) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpuAffinity, &cpuset);

I32 stat = pthread_attr_setaffinity_np(&att, sizeof(cpu_set_t), &cpuset);
if (stat != 0) {
Fw::Logger::logMsg("pthread_setaffinity_np: %i %s\n", cpuAffinity,
reinterpret_cast<POINTER_CAST>(strerror(stat)));
return Task::TASK_INVALID_PARAMS;
}
}
stat = pthread_attr_setinheritsched(&att,PTHREAD_EXPLICIT_SCHED); // needed to set inheritance according to WR docs
return Task::TASK_OK;
}

Task::TaskStatus create_pthread(NATIVE_INT_TYPE priority, NATIVE_INT_TYPE stackSize, NATIVE_INT_TYPE cpuAffinity, pthread_t*& tid, void* arg, bool expect_perm) {
Task::TaskStatus tStat = Task::TASK_OK;
validate_arguments(priority, stackSize, cpuAffinity, expect_perm);
pthread_attr_t att;
memset(&att,0, sizeof(att));


I32 stat = pthread_attr_init(&att);
if (stat != 0) {
return TASK_INVALID_PARAMS;
Fw::Logger::logMsg("pthread_attr_init: (%d)(%d): %s\n", stat, errno, reinterpret_cast<POINTER_CAST>(strerror(stat)));
return Task::TASK_INVALID_PARAMS;
}
sched_param schedParam;
memset(&schedParam,0,sizeof(sched_param));
schedParam.sched_priority = priority;
stat = pthread_attr_setschedparam(&att,&schedParam);
if (stat != 0) {
return TASK_INVALID_PARAMS;

// Handle setting stack size
tStat = set_stack_size(att, stackSize);
if (tStat != Task::TASK_OK) {
return tStat;
}
#elif defined TGT_OS_TYPE_DARWIN
#else
#error Unsupported OS!
#endif

// If a registry has been registered, register task
if (Task::s_taskRegistry) {
Task::s_taskRegistry->addTask(this);

// Handle non-zero priorities
tStat = set_priority_params(att, priority);
if (tStat != Task::TASK_OK) {
return tStat;
}

pthread_t* tid = new pthread_t;
this->m_routineWrapper.routine = routine;
this->m_routineWrapper.arg = arg;
// Set affinity before creating thread:
tStat = set_cpu_affinity(att, cpuAffinity);
if (tStat != Task::TASK_OK) {
return tStat;
}

stat = pthread_create(tid,&att,pthread_entry_wrapper,&this->m_routineWrapper);
tid = new pthread_t;
const char* message = NULL;

stat = pthread_create(tid, &att, pthread_entry_wrapper, arg);
switch (stat) {
// Success, do nothing
case 0:
this->m_handle = (POINTER_CAST)tid;
Task::s_numTasks++;
break;
case EINVAL:
delete tid;
Fw::Logger::logMsg("pthread_create: %s\n", reinterpret_cast<POINTER_CAST>(strerror(errno)));
tStat = TASK_INVALID_PARAMS;
message = "Invalid thread attributes specified";
tStat = Task::TASK_INVALID_PARAMS;
break;
case EPERM:
message = "Insufficient permissions to create thread. May not set thread priority without permission";
tStat = Task::TASK_ERROR_PERMISSION;
break;
case EAGAIN:
message = "Unable to allocate thread. Increase thread ulimit.";
tStat = Task::TASK_ERROR_RESOURCES;
break;
default:
delete tid;
tStat = TASK_UNKNOWN_ERROR;
message = "Unknown error";
tStat = Task::TASK_UNKNOWN_ERROR;
break;
}

(void)pthread_attr_destroy(&att);
if (stat != 0) {
delete tid;
tid = NULL;
Fw::Logger::logMsg("pthread_create: %s. %s\n", reinterpret_cast<POINTER_CAST>(message), reinterpret_cast<POINTER_CAST>(strerror(stat)));
return tStat;
}
return Task::TASK_OK;
}

Task::Task() : m_handle(0), m_identifier(0), m_affinity(-1), m_started(false), m_suspendedOnPurpose(false), m_routineWrapper() {
}

return tStat;
Task::TaskStatus Task::start(const Fw::StringBase &name, taskRoutine routine, void* arg, NATIVE_INT_TYPE priority, NATIVE_INT_TYPE stackSize, NATIVE_INT_TYPE cpuAffinity, NATIVE_INT_TYPE identifier) {
FW_ASSERT(routine);

this->m_name = "TP_";
this->m_name += name;
this->m_identifier = identifier;
// Setup functor wrapper parameters
this->m_routineWrapper.routine = routine;
this->m_routineWrapper.arg = arg;
pthread_t* tid;

// Try to create a permissioned thread
TaskStatus status = create_pthread(priority, stackSize, cpuAffinity, tid, &this->m_routineWrapper, true);
// Failure dur to permission automatically retried
if (status == TASK_ERROR_PERMISSION) {
Fw::Logger::logMsg("[WARNING] Insufficient permissions to create a prioritized tasks or specify CPU affinities. Attempting to fallback to tasks without priority.\n");
Fw::Logger::logMsg("[WARNING] Please use no-argument <component>.start() calls or ensure executing user has correct permissions for your operating system.\n");
Fw::Logger::logMsg("[WARNING] Note: this fallback to tasks without priority will be removed and will fail in future fprime releases.\n");
status = create_pthread(priority, stackSize, cpuAffinity, tid, &this->m_routineWrapper, false); // Fallback with no permission
}
// Check for non-zero error code
if (status != TASK_OK) {
return status;
}
FW_ASSERT(tid != NULL);

// Handle a successfully created task
this->m_handle = (POINTER_CAST)tid;
Task::s_numTasks++;
// If a registry has been registered, register task
if (Task::s_taskRegistry) {
Task::s_taskRegistry->addTask(this);
}
return status;
}

Task::TaskStatus Task::delay(NATIVE_UINT_TYPE milliseconds)
Expand Down
5 changes: 4 additions & 1 deletion Os/Task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ namespace Os {
TASK_UNKNOWN_ERROR, //!< unexpected error return value
TASK_INVALID_AFFINITY, //!< unable to set the task affinity
TASK_DELAY_ERROR, //!< error trying to delay the task
TASK_JOIN_ERROR //!< error trying to join the task
TASK_JOIN_ERROR, //!< error trying to join the task
TASK_ERROR_RESOURCES, //!< unable to allocate more tasks
TASK_ERROR_PERMISSION, //!< permissions error setting-up tasks
} TaskStatus ;

typedef void (*taskRoutine)(void* ptr); //!< prototype for task routine started in task context
Expand All @@ -34,6 +36,7 @@ namespace Os {
Task(); //!< constructor
virtual ~Task(); //!< destructor
// Priority is based on Posix priorities - 0 lowest, 255 highest
TaskStatus start(const Fw::StringBase &name, taskRoutine routine, void* arg, NATIVE_INT_TYPE priority = -1, NATIVE_INT_TYPE stackSize = -1, NATIVE_INT_TYPE cpuAffinity = -1, NATIVE_INT_TYPE identifier = -1); //!< start the task
TaskStatus start(const Fw::StringBase &name, NATIVE_INT_TYPE identifier, NATIVE_INT_TYPE priority, NATIVE_INT_TYPE stackSize, taskRoutine routine, void* arg, NATIVE_INT_TYPE cpuAffinity = -1); //!< start the task
I32 getIdentifier(void); //!< get the identifier for the task
static TaskId getOsIdentifier(void); //Gets the Os Task ID. Useful for passive components.
Expand Down
7 changes: 7 additions & 0 deletions Os/TaskCommon.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <Os/Task.hpp>
#include <Fw/Types/Assert.hpp>
#include <Fw/Logger/Logger.hpp>
#include <string.h>

namespace Os {
Expand Down Expand Up @@ -41,6 +42,12 @@ namespace Os {

}

Task::TaskStatus Task::start(const Fw::StringBase &name, NATIVE_INT_TYPE identifier, NATIVE_INT_TYPE priority, NATIVE_INT_TYPE stackSize, taskRoutine routine, void* arg, NATIVE_INT_TYPE cpuAffinity) {
Fw::Logger::logMsg("[WARNING] Os::Task.start(name, identifier, priority, stackSize, routine, arg, cpuAffinity) is deprecated.\n");
Fw::Logger::logMsg("[WARNING] Please migrate to the form: Os::Task.start(name, routine, arg, priority, stackSize, cpuAffinity, identifier).\n");
return this->start(name, routine, arg, priority, stackSize, cpuAffinity, identifier);
}

TaskRegistry::~TaskRegistry() {

}
Expand Down
Loading

0 comments on commit 3079f39

Please sign in to comment.