00001
00002
00003 #include <fesa-core/Utilities/Thread.h>
00004
00005 #include <fesa-core/Core/AbstractEquipment.h>
00006 #include <fesa-core/Core/ThreadPriorityConfiguration.h>
00007 #include <fesa-core/Exception/FesaException.h>
00008 #include <fesa-core/Utilities/Lock.h>
00009 #include <fesa-core/Utilities/Mutex.h>
00010 #include <fesa-core/Utilities/ProcessConfiguration.h>
00011 #include <fesa-core/Utilities/XMLParser.h>
00012
00013 #include <cmw-log/Logger.h>
00014
00015 #include <cstring>
00016 #include <sys/resource.h>
00017 #include <sys/syscall.h>
00018 #include <sys/types.h>
00019
00020
00021 namespace
00022 {
00023
// Logger used for all trace and warning output emitted from this file.
00024 CMW::Log::Logger& logger = CMW::Log::LoggerFactory::getLogger("FESA.FWK.fesa-core.Utilities.Thread");
00025
// Last process priority seen by getProcessPriority() or applied by setProcessPriority().
00026 int32_t process_priority = 0;
// Policy handed to pthread_attr_setschedpolicy() in Thread::start(); the Thread
// constructor switches it to NICE_SCHED_POLICY when runWithoutRTPrios is set.
00027 int32_t scheduling_policy = fesa::RT_SCHED_POLICY;
// True when the process configuration defines PropertyTag::NO_RT_SCHEDULING
// (assigned in Thread::AdjustProcessPriority()).
00028 bool runWithoutRTPrios = false;
// Thread id -> thread name registry; every access in this file takes threadIdNameMapMutex first.
00029 std::map<pthread_t, std::string> threadIdNameMap;
00030 fesa::Mutex threadIdNameMapMutex;
00031
00032
00033 int
00034 getProcessPriority()
00035 {
00036
00037
00038 pid_t pid = getpid();
00039 sched_param params;
00040 if (sched_getparam(pid, ¶ms) == -1)
00041 {
00042 std::string errMsg(std::strerror(errno));
00043 std::ostringstream errorStrStream;
00044 errorStrStream << "Could not get Process Parameter: " << errMsg;
00045 throw fesa::FesaException(__FILE__, __LINE__, FesaErrorReadProcessParameter.c_str(), errMsg.c_str());
00046 }
00047 process_priority = params.sched_priority;
00048 return params.sched_priority;
00049 }
00050
00051
00052 void
00053 setProcessPriority(int32_t prio, int32_t policy)
00054 {
00055
00056
00057 pid_t pid = getpid();
00058 sched_param params;
00059 sched_getparam(pid, ¶ms);
00060 params.sched_priority = prio;
00061 int32_t err = sched_setscheduler(pid, policy, ¶ms);
00062 if (err != 0)
00063 {
00064 std::string errMsg(std::strerror(err));
00065 throw fesa::FesaException(__FILE__, __LINE__, FesaErrorWriteProcessParameter.c_str(), errMsg.c_str());
00066 }
00067 std::ostringstream stream;
00068 stream << "Process priority set to priority: " << prio;
00069 stream << ", Scheduling policy set to: ";
00070 if(policy == fesa::RT_SCHED_POLICY)
00071 {
00072 stream << "SCHED_RR" << std::endl;
00073 }
00074 else
00075 {
00076 stream << "SCHED_OTHER" << std::endl;
00077 }
00078 LOG_TRACE_IF(logger, stream.str());
00079 process_priority = prio;
00080 }
00081
00082 }
00083
00084
00085 namespace fesa
00086 {
00087
00088 void*
00089 Thread::startThread(void* arg)
00090 {
00091 Thread* thread = reinterpret_cast<Thread*>(arg);
00092 if(runWithoutRTPrios)
00093 {
00094
00095
00096
00097 if (setpriority(PRIO_PROCESS, static_cast<pid_t>(syscall(SYS_gettid)), thread->priority_) == -1)
00098 {
00099 const std::string errMsg(std::strerror(errno));
00100 throw FesaException(__FILE__, __LINE__, FesaErrorSettingThreadAttributes.c_str(),errMsg.c_str());
00101 }
00102 }
00103 registerThreadIdName(thread->tid_, thread->name_);
00104 thread->run();
00105
00106 return 0;
00107 }
00108
00109
00110 Thread::Thread() :
00111 isRunning_(false),
00112 hasFinished_(false)
00113 {
00114 priority_ = -100;
00115
00116 if(runWithoutRTPrios)
00117 scheduling_policy = NICE_SCHED_POLICY;
00118 if (pthread_attr_init(&attr_) != 0)
00119 throw FesaException(__FILE__, __LINE__, FesaErrorInitializingThreadAttributes.c_str());
00120 }
00121
00122
00123 Thread::~Thread()
00124 {
00125 }
00126
00127
00128 void
00129 Thread::start(bool blockingMode, const std::string& threadName)
00130 {
00131 if (priority_ == -100)
00132 {
00133 throw FesaException(__FILE__, __LINE__, FesaErrorUndefinedPriority.c_str());
00134 }
00135 name_ = threadName;
00136 const pid_t pid = getpid();
00137 sched_param params;
00138 sched_getparam(pid, ¶ms);
00139 std::ostringstream traceStrStream;
00140 traceStrStream << "Starting new thread with priority " << priority_;
00141 if (runWithoutRTPrios == true)
00142 {
00143 traceStrStream << ". Attention! Thread is started without RT scheduling" << std::endl;
00144 }
00145 LOG_TRACE_IF(logger, traceStrStream.str());
00146 if (runWithoutRTPrios == true)
00147 {
00148 params.sched_priority = 0;
00149 }
00150 else
00151 {
00152 params.sched_priority = priority_;
00153 }
00154 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
00155 int32_t err = pthread_attr_setschedpolicy(&attr_, scheduling_policy);
00156 if (err != 0)
00157 {
00158 const std::string errMsg(std::strerror(err));
00159 throw FesaException(__FILE__, __LINE__, FesaErrorSettingSchedulingPolicy.c_str(), errMsg.c_str());
00160 }
00161 err = pthread_attr_setinheritsched(&attr_, PTHREAD_EXPLICIT_SCHED);
00162 if (err != 0)
00163 {
00164 const std::string errMsg(std::strerror(err));
00165 throw FesaException(__FILE__, __LINE__, FesaErrorSettingInheritScheduling.c_str(), errMsg.c_str());
00166 }
00167 err = pthread_attr_setschedparam(&attr_, ¶ms);
00168 if (err != 0)
00169 {
00170 const std::string errMsg(std::strerror(err));
00171 throw FesaException(__FILE__, __LINE__, FesaErrorSettingThreadAttributes.c_str(), errMsg.c_str());
00172 }
00173 if(blockingMode)
00174 {
00175 pthread_attr_setdetachstate(&attr_, PTHREAD_CREATE_JOINABLE);
00176 }
00177 else
00178 {
00179 pthread_attr_setdetachstate(&attr_, PTHREAD_CREATE_DETACHED);
00180 }
00181
00182 isRunning_ = true;
00183 err = pthread_create(&tid_, &attr_, Thread::startThread, reinterpret_cast<void*>(this));
00184 if (err != 0)
00185 {
00186 const std::string errMsg(std::strerror(err));
00187 throw FesaException(__FILE__, __LINE__, FesaErrorCreatingThread.c_str(),errMsg.c_str());
00188 }
00189 isRunning_ = true;
00190 hasFinished_ = false;
00191 err = pthread_attr_destroy(&attr_);
00192 if (err != 0)
00193 {
00194 const std::string errMsg(std::strerror(err));
00195 throw FesaException(__FILE__, __LINE__, FesaErrorDestroyingThreadAttributes.c_str(), errMsg.c_str());
00196 }
00197
00198 if (blockingMode)
00199 {
00200 pthread_join (tid_, NULL);
00201 }
00202 }
00203
00204
00205 void
00206 Thread::stop()
00207 {
00208 isRunning_ = false;
00209 int32_t attempts = MAX_ATTEMPTS_STOP_THREAD;
00210
00211
00212 while (!hasFinished_ && attempts > 0)
00213 {
00214 attempts--;
00215 sleep(1);
00216 }
00217 if (attempts == 0)
00218 pthread_cancel(tid_);
00219 }
00220
00221
00222 int32_t
00223 Thread::join(void** status)
00224 {
00225 return pthread_join(tid_, status);
00226 }
00227
00228
00229 void
00230 Thread::setPriority(const int32_t priority)
00231 {
00232 if(isRunning_)
00233 throw FesaException(__FILE__, __LINE__, FesaErrorSettingPrioDuringRuntime.c_str());
00234
00235 priority_ = priority;
00236 }
00237
00238
00239
00240 void
00241 Thread::AdjustProcessPriority()
00242 {
00243 AbstractEquipment* eqp = AbstractEquipment::getInstance();
00244 runWithoutRTPrios = eqp->getProcessConfiguration()->isDefined(PropertyTag::NO_RT_SCHEDULING);
00245 if(runWithoutRTPrios)
00246 {
00247 std::ostringstream errorStrStream;
00248 errorStrStream << std::endl;
00249 errorStrStream << "------------------------------------------ WARNING !!!! ------------------------------------------" << std::endl;
00250 errorStrStream << "You are running this FESA-binary without any rt-support, using nice-priorities " << std::endl;
00251 errorStrStream << "The binary will continue to run in verbose mode, but without any guarantee." << std::endl;
00252 errorStrStream << "To be able to start the class with proper RT-priorities, please contact your system administrator." << std::endl;
00253 errorStrStream << "--------------------------------------------------------------------------------------------------" << std::endl;
00254 LOG_WARNING_IF(logger, errorStrStream.str());
00255 }
00256
00257 std::string instanceFile =eqp->getDeviceDataFileName();
00258 XMLParser xmlParser(instanceFile, false);
00259 ThreadPriorityConfigurationFromFile conf(xmlParser, eqp->getProcessConfiguration());
00260 int32_t desired_process_prio = -100;
00261 int32_t current_process_prio = getProcessPriority();
00262
00263 if(!conf.getMaximumPrio(desired_process_prio))
00264 {
00265
00266 desired_process_prio = current_process_prio;
00267
00268 }
00269
00270
00271 if(runWithoutRTPrios)
00272 {
00273
00274 setProcessPriority(0,NICE_SCHED_POLICY);
00275 }
00276 else
00277 {
00278 if (current_process_prio < desired_process_prio)
00279 {
00280 setProcessPriority(desired_process_prio,RT_SCHED_POLICY);
00281 }
00282 }
00283 }
00284
00285
00286
00287 std::string
00288 Thread::getThreadName(pthread_t threadId)
00289 {
00290 Lock lock(threadIdNameMapMutex);
00291 std::map<pthread_t, std::string>::const_iterator iter = threadIdNameMap.find(threadId);
00292 if (iter == threadIdNameMap.end())
00293 {
00294 std::ostringstream errorStrStream;
00295 errorStrStream << "Thread with id " << static_cast<uint32_t>(threadId) << " is not registered";
00296 throw FesaException(__FILE__, __LINE__, errorStrStream.str());
00297 }
00298 return iter->second;
00299 }
00300
00301
00302
00303 void
00304 Thread::registerThreadIdName(pthread_t threadId, const std::string& threadName)
00305 {
00306 Lock lock(threadIdNameMapMutex);
00307 threadIdNameMap.insert(std::pair<pthread_t, std::string>(threadId, threadName));
00308 }
00309
00310 }