Thread.cpp

Go to the documentation of this file.
00001 // Copyright CERN 2012 - Developed in collaboration with GSI
00002 
00003 #include <fesa-core/Utilities/Thread.h>
00004 
00005 #include <fesa-core/Core/AbstractEquipment.h>
00006 #include <fesa-core/Core/ThreadPriorityConfiguration.h>
00007 #include <fesa-core/Exception/FesaException.h>
00008 #include <fesa-core/Utilities/Lock.h>
00009 #include <fesa-core/Utilities/Mutex.h>
00010 #include <fesa-core/Utilities/ProcessConfiguration.h>
00011 #include <fesa-core/Utilities/XMLParser.h>
00012 
00013 #include <cmw-log/Logger.h>
00014 
00015 #include <cstring>
00016 #include <sys/resource.h>
00017 #include <sys/syscall.h>
00018 #include <sys/types.h>
00019 
00020 
00021 namespace
00022 {
00023 
// Logger used by every function in this translation unit.
CMW::Log::Logger& logger = CMW::Log::LoggerFactory::getLogger("FESA.FWK.fesa-core.Utilities.Thread");

// Last process priority read by getProcessPriority() or written by setProcessPriority().
int32_t process_priority = 0;
// Policy handed to pthread_attr_setschedpolicy() in Thread::start(). Starts as the RT policy and is
// switched to NICE_SCHED_POLICY by the Thread constructor once runWithoutRTPrios is set.
int32_t scheduling_policy = fesa::RT_SCHED_POLICY;
// Set by Thread::AdjustProcessPriority() from the NO_RT_SCHEDULING process-configuration tag.
bool runWithoutRTPrios = false;
// Maps a thread id to the name given in Thread::start(); filled by Thread::registerThreadIdName().
std::map<pthread_t, std::string> threadIdNameMap;
// Guards every access to threadIdNameMap.
fesa::Mutex threadIdNameMapMutex;
00031 
00032 
00033 int
00034 getProcessPriority()
00035 {
00036     // To get the priority of a process, you have to get the priority from the mainthread of this process.
00037     // So this method only is called from the MainThread.
00038     pid_t pid = getpid();
00039     sched_param params;
00040     if (sched_getparam(pid, &params) == -1)
00041     {
00042         std::string errMsg(std::strerror(errno));
00043         std::ostringstream errorStrStream;
00044         errorStrStream << "Could not get Process Parameter: " << errMsg;
00045         throw fesa::FesaException(__FILE__, __LINE__, FesaErrorReadProcessParameter.c_str(), errMsg.c_str());
00046     }
00047     process_priority = params.sched_priority;
00048     return params.sched_priority;
00049 }
00050 
00051 
00052 void
00053 setProcessPriority(int32_t prio, int32_t policy)
00054 {
00055     //To adjust the priority of a process, you have to adjust the priority from the mainthread of this process.
00056     //So this Method only is called from the MainThread.
00057     pid_t pid = getpid();
00058     sched_param params;
00059     sched_getparam(pid, &params);
00060     params.sched_priority = prio;
00061     int32_t err = sched_setscheduler(pid, policy, &params);
00062     if (err != 0)
00063     {
00064         std::string errMsg(std::strerror(err));
00065         throw fesa::FesaException(__FILE__, __LINE__, FesaErrorWriteProcessParameter.c_str(), errMsg.c_str());
00066     }
00067     std::ostringstream stream;
00068     stream << "Process priority set to priority: " << prio;
00069     stream << ", Scheduling policy set to: ";
00070     if(policy == fesa::RT_SCHED_POLICY)
00071     {
00072         stream << "SCHED_RR" << std::endl;
00073     }
00074     else
00075     {
00076         stream << "SCHED_OTHER" << std::endl;
00077     }
00078     LOG_TRACE_IF(logger, stream.str());
00079     process_priority = prio;
00080 }
00081 
00082 } // namespace
00083 
00084 
00085 namespace fesa
00086 {
00087 
00088 void*
00089 Thread::startThread(void* arg)
00090 {
00091     Thread* thread = reinterpret_cast<Thread*>(arg);
00092     if(runWithoutRTPrios)
00093     { // Nice level needs to be set at run-time
00094         // That's how a nice-level can be set.
00095         // We can't supply thread->tid_ as the second parameter. 'setpriority' wants the thread id in the form that 'gettid()' provides,
00096         // which is different from the one returned from 'pthread_create()'
00097         if (setpriority(PRIO_PROCESS, static_cast<pid_t>(syscall(SYS_gettid)), thread->priority_) == -1)
00098         {
00099             const std::string errMsg(std::strerror(errno));
00100             throw FesaException(__FILE__, __LINE__, FesaErrorSettingThreadAttributes.c_str(),errMsg.c_str());
00101         }
00102     }
00103     registerThreadIdName(thread->tid_, thread->name_);
00104     thread->run();
00105 
00106     return 0;
00107 }
00108 
00109 
00110 Thread::Thread() :
00111     isRunning_(false),
00112     hasFinished_(false)
00113 {
00114     priority_ = -100;//has to be adjusted by the derived class.
00115 
00116     if(runWithoutRTPrios)
00117         scheduling_policy = NICE_SCHED_POLICY;
00118     if (pthread_attr_init(&attr_) != 0)
00119         throw FesaException(__FILE__, __LINE__, FesaErrorInitializingThreadAttributes.c_str());
00120 }
00121 
00122 
00123 Thread::~Thread()
00124 {
00125 }
00126 
00127 
00128 void
00129 Thread::start(bool blockingMode, const std::string& threadName)
00130 {
00131     if (priority_ == -100) //priority has to be adjusted before startup
00132     {
00133         throw FesaException(__FILE__, __LINE__, FesaErrorUndefinedPriority.c_str());
00134     }
00135     name_ = threadName;
00136     const pid_t pid = getpid();
00137     sched_param params;
00138     sched_getparam(pid, &params);
00139     std::ostringstream traceStrStream;
00140     traceStrStream << "Starting new thread with priority " << priority_;
00141     if (runWithoutRTPrios == true)
00142     {
00143         traceStrStream << ". Attention! Thread is started without RT scheduling" << std::endl;
00144     }
00145     LOG_TRACE_IF(logger, traceStrStream.str());
00146     if (runWithoutRTPrios == true) // Use nice levels instead of priorities
00147     {
00148         params.sched_priority = 0;
00149     }
00150     else
00151     {
00152         params.sched_priority = priority_;
00153     }
00154     pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
00155     int32_t err = pthread_attr_setschedpolicy(&attr_, scheduling_policy);
00156     if (err != 0)
00157     {
00158         const std::string errMsg(std::strerror(err));
00159         throw FesaException(__FILE__, __LINE__, FesaErrorSettingSchedulingPolicy.c_str(), errMsg.c_str());
00160     }
00161     err = pthread_attr_setinheritsched(&attr_, PTHREAD_EXPLICIT_SCHED);
00162     if (err != 0)
00163     {
00164         const std::string errMsg(std::strerror(err));
00165         throw FesaException(__FILE__, __LINE__, FesaErrorSettingInheritScheduling.c_str(), errMsg.c_str());
00166     }
00167     err = pthread_attr_setschedparam(&attr_, &params);
00168     if (err != 0)
00169     {
00170         const std::string errMsg(std::strerror(err));
00171         throw FesaException(__FILE__, __LINE__, FesaErrorSettingThreadAttributes.c_str(), errMsg.c_str());
00172     }
00173     if(blockingMode)
00174     {
00175         pthread_attr_setdetachstate(&attr_, PTHREAD_CREATE_JOINABLE);
00176     }
00177     else
00178     {
00179         pthread_attr_setdetachstate(&attr_, PTHREAD_CREATE_DETACHED);
00180     }
00181     //there is some kind of race with isRunning_
00182     isRunning_ = true;
00183     err = pthread_create(&tid_, &attr_, Thread::startThread, reinterpret_cast<void*>(this));
00184     if (err != 0)
00185     {
00186         const std::string errMsg(std::strerror(err));
00187         throw FesaException(__FILE__, __LINE__, FesaErrorCreatingThread.c_str(),errMsg.c_str());
00188     }
00189     isRunning_ = true;
00190     hasFinished_ = false;
00191     err = pthread_attr_destroy(&attr_);
00192     if (err != 0)
00193     {
00194         const std::string errMsg(std::strerror(err));
00195         throw FesaException(__FILE__, __LINE__, FesaErrorDestroyingThreadAttributes.c_str(), errMsg.c_str());
00196     }
00197     // If it is a blocking call the calling thread waits for the return of the new thread
00198     if (blockingMode)
00199     {
00200         pthread_join (tid_, NULL);
00201     }
00202 }
00203 
00204 
00205 void
00206 Thread::stop()
00207 {
00208     isRunning_ = false;
00209     int32_t attempts = MAX_ATTEMPTS_STOP_THREAD;
00210 
00211     //wait for the thread to stop
00212     while (!hasFinished_ && attempts > 0)
00213     {
00214         attempts--;
00215         sleep(1);
00216     }
00217     if (attempts == 0)
00218         pthread_cancel(tid_);
00219 }
00220 
00221 
00222 int32_t
00223 Thread::join(void** status)
00224 {
00225     return pthread_join(tid_, status);
00226 }
00227 
00228 
00229 void
00230 Thread::setPriority(const int32_t priority)
00231 {
00232     if(isRunning_)
00233         throw FesaException(__FILE__, __LINE__, FesaErrorSettingPrioDuringRuntime.c_str());
00234 
00235     priority_  = priority;
00236 }
00237 
00238 
00239 /*static*/
00240 void
00241 Thread::AdjustProcessPriority()
00242 {
00243     AbstractEquipment* eqp = AbstractEquipment::getInstance();
00244     runWithoutRTPrios = eqp->getProcessConfiguration()->isDefined(PropertyTag::NO_RT_SCHEDULING);
00245     if(runWithoutRTPrios)
00246     {
00247         std::ostringstream errorStrStream;
00248         errorStrStream << std::endl;
00249         errorStrStream << "------------------------------------------ WARNING !!!! ------------------------------------------"  << std::endl;
00250         errorStrStream << "You are running this FESA-binary without any rt-support, using nice-priorities "                     << std::endl;
00251         errorStrStream << "The binary will continue to run in verbose mode, but without any guarantee."                         << std::endl;
00252         errorStrStream << "To be able to start the class with proper RT-priorities, please contact your system administrator."  << std::endl;
00253         errorStrStream << "--------------------------------------------------------------------------------------------------"  << std::endl;
00254         LOG_WARNING_IF(logger, errorStrStream.str());
00255     }
00256 
00257     std::string instanceFile =eqp->getDeviceDataFileName();
00258     XMLParser xmlParser(instanceFile, false);
00259     ThreadPriorityConfigurationFromFile conf(xmlParser, eqp->getProcessConfiguration());
00260     int32_t desired_process_prio = -100;//just something very small
00261     int32_t current_process_prio = getProcessPriority();
00262 
00263     if(!conf.getMaximumPrio(desired_process_prio))
00264     {
00265         // If there is no prio defined in the instantiation-file, we just keep our current process prio
00266         desired_process_prio = current_process_prio;
00267         //TODO: What should be the prio of the main.-thread?
00268     }
00269 
00270     // Try to adjust the process priority, if necessary
00271     if(runWithoutRTPrios)
00272     {//nice priorities have the range [-20 20] maximum is -20
00273         //The scheduler-prio always has to be 0, when using nice-priorities
00274         setProcessPriority(0,NICE_SCHED_POLICY);
00275     }
00276     else
00277     {//rt-priorities have the range [0 99] maximum is 99
00278         if (current_process_prio < desired_process_prio)
00279         {
00280             setProcessPriority(desired_process_prio,RT_SCHED_POLICY);
00281         }
00282     }
00283 }
00284 
00285 
00286 /*static*/
00287 std::string
00288 Thread::getThreadName(pthread_t threadId)
00289 {
00290     Lock lock(threadIdNameMapMutex);
00291     std::map<pthread_t, std::string>::const_iterator iter = threadIdNameMap.find(threadId);
00292     if (iter == threadIdNameMap.end())
00293     {
00294         std::ostringstream errorStrStream;
00295         errorStrStream << "Thread with id " << static_cast<uint32_t>(threadId) << " is not registered";
00296         throw FesaException(__FILE__, __LINE__, errorStrStream.str());
00297     }
00298     return iter->second;
00299 }
00300 
00301 
00302 /*static*/
00303 void
00304 Thread::registerThreadIdName(pthread_t threadId, const std::string& threadName)
00305 {
00306     Lock lock(threadIdNameMapMutex);
00307     threadIdNameMap.insert(std::pair<pthread_t, std::string>(threadId, threadName));
00308 }
00309 
00310 } // fesa

Generated on 18 Jan 2013 for Fesa by  doxygen 1.6.1