RTScheduler.cpp

Go to the documentation of this file.
00001 // Copyright CERN 2012 - Developed in collaboration with GSI
00002 
00003 #include <fesa-core/RealTime/RTScheduler.h>
00004 
00005 #include <fesa-core/RealTime/AbstractRTAction.h>
00006 #include <fesa-core/RealTime/AbstractRTEquipment.h>
00007 #include <fesa-core/RealTime/RTEvent.h>
00008 #include <fesa-core/Core/AbstractEquipment.h>
00009 #include <fesa-core/Core/ThreadPriorityConfiguration.h>
00010 #include <fesa-core/Diagnostic/Diagnostics.h>
00011 #include <fesa-core/Diagnostic/FesaStream.h>
00012 #include <fesa-core/Utilities/Lock.h>
00013 #include <fesa-core/Utilities/XMLParser.h>
00014 
00015 #include <cmw-log/Logger.h>
00016 
00017 
00018 namespace
00019 {
00020 
00021 CMW::Log::Logger& logger = CMW::Log::LoggerFactory::getLogger("FESA.FWK.fesa-core.RealTime.RTScheduler");
00022 const std::string diagTopic = fesa::DiagnosticUtils::eventTrackingStr;
00023 
00024 } // namespace
00025 
00026 
00027 namespace fesa
00028 {
00029 
00030 RTScheduler::RTScheduler(const std::string& name, int32_t eventQueueSize) :
00031     name_(name),
00032     queueSize_(eventQueueSize),
00033     lostEventCount_(0)
00034 {
00035     setDetachState( PTHREAD_CREATE_DETACHED);
00036     {
00037         AbstractEquipment* eqp = AbstractEquipment::getInstance();
00038         std::string instanceFile = eqp->getDeviceDataFileName();
00039         XMLParser xmlParser(instanceFile, false);
00040         ThreadPriorityConfigurationFromFile conf(xmlParser, eqp->getProcessConfiguration());
00041         int32_t prio =  conf.getPrioRTScheduler(name_);
00042         setPriority(prio);
00043     }
00044 
00045     postEventCount_.last_ = 0;
00046     postEventCount_.current_ = 0;
00047     postEventCount_.max_ = 0;
00048     scheduledEventCount_.last_ = 0;
00049     scheduledEventCount_.current_ = 0;
00050     scheduledEventCount_.max_ = 0;
00051     accumulatedEventCount_.last_ = 0;
00052     accumulatedEventCount_.current_ = 0;
00053     accumulatedEventCount_.max_ = 0;
00054 
00055     std::ostringstream message;
00056     message << " Creation of RTScheduler: " << name;
00057     LOG_TRACE_IF(logger, message.str());
00058 }
00059 
00060 RTScheduler::~RTScheduler()
00061 {
00062     std::map<const std::string, std::vector<AbstractRTAction*> >::iterator schedullingUnit;
00063     std::vector<AbstractRTAction*>::iterator actions;
00064 
00065     for (schedullingUnit = schedulingMap_.begin(); schedullingUnit != schedulingMap_.end(); schedullingUnit++)
00066         for (actions = schedullingUnit->second.begin(); actions != schedullingUnit->second.end(); actions++)
00067             if (*actions != NULL)
00068             {
00069                 delete *actions;
00070                 *actions = NULL;
00071             }
00072 }
00073 
00074 void RTScheduler::post(boost::shared_ptr<RTEvent>& pEv)
00075 {
00076     (postEventCount_.current_)++;
00077     //Save maximum number of event accumulation
00078     accumulatedEventCount_.current_ = static_cast<uint32_t>(eventQueue_.size());
00079     if (accumulatedEventCount_.current_ > accumulatedEventCount_.max_)
00080     {
00081         accumulatedEventCount_.max_ = accumulatedEventCount_.current_;
00082     }
00083     Lock lock(mutexQueue_);
00084     if ((queueSize_ < 0) || // no limit defined
00085         (eventQueue_.size() < (uint32_t) queueSize_))
00086     { // check against the limit
00087         // No event jam, push-back the event
00088         eventQueue_.push_back(pEv);
00089         // Lower alarm Field in case it was raised before
00090         //pGlobalDevice->rtEventLost.lower();
00091         // Wake up the the scheduler and returns
00092         conditionalVariableQueue_.signal();
00093         if (logger.isLoggable(CMW::Log::Level::LL_TRACE))
00094         {
00095             std::ostringstream message;
00096             message << "Signaling " << name_;
00097             LOG_TRACE_IF(logger, message.str());
00098         }
00099     }
00100     else
00101     {
00102         std::ostringstream errorStrStream;
00103         errorStrStream << "Could not post event: " << pEv->getName() << "; RTScheduler queue is completely full." << std::endl;
00104         LOG_ERROR_IF(logger, errorStrStream.str());
00105         FesaException exception(__FILE__, __LINE__, errorStrStream.str());
00106         AbstractRTEquipment::getInstance()->handleSchedulerError(this,exception);
00107     }
00108 }
00109 
00110 void RTScheduler::run()
00111 {
00112     if (logger.isLoggable(CMW::Log::Level::LL_DEBUG))
00113     {
00114         std::ostringstream message;
00115         message << "Running scheduler " << name_;
00116         LOG_DEBUG_IF(logger, message.str());
00117     }
00118     while (isRunning_)
00119     {
00120         // a blocking-call returning one-by-one the events hold in the Scheduler's event-queue
00121         boost::shared_ptr<RTEvent> pShEvt;
00122         {
00123             Lock lock(mutexQueue_);
00124             if (eventQueue_.size() == 0)
00125             {
00126                 conditionalVariableQueue_.wait(mutexQueue_);
00127                 if (logger.isLoggable(CMW::Log::Level::LL_TRACE))
00128                 {
00129                     std::ostringstream message;
00130                     message << "Post received for " << name_;
00131                     LOG_TRACE_IF(logger, message.str());
00132                 }
00133             }
00134             // dequeue events
00135             pShEvt = eventQueue_.front();
00136             // delete the entry in the queue (shared_ptr reference-counter decremented by one)
00137             eventQueue_.pop_front();
00138         }
00139         // retrieve from the shared_pointer the raw object.
00140         RTEvent* pEvent = pShEvt.get();
00141         if (isRunning_ && pEvent != NULL)
00142         {
00143             // Schedule the Event
00144             schedule(pEvent);
00145         }
00146         else
00147         {
00148             continue;
00149         }
00150     }
00151     hasFinished_ = true;
00152 } // end run
00153 
00154 void RTScheduler::logDiagnostics(RTEvent*& pEvent)
00155 {
00156     const boost::shared_ptr<Diagnostics>& diagnostics = AbstractEquipment::getInstance()->getDiagnostics();
00157     DiagnosticUtils::DiagnosticMessage diagMsg;
00158     diagMsg.side = DiagnosticUtils::framework;
00159     diagMsg.source = DiagnosticUtils::rt;
00160     std::ostringstream traceStrStream;
00161     traceStrStream << "RTEvent '" << pEvent->getName() << "'";
00162     diagMsg.msg = traceStrStream.str();
00163     diagnostics->log(diagTopic, diagMsg);
00164 }
00165 
00166 void RTScheduler::schedule(RTEvent* pEvent)
00167 {
00168     logDiagnostics(pEvent);
00169     std::map<const std::string, std::vector<AbstractRTAction*> >::iterator eventIterator = schedulingMap_.find(
00170                     pEvent->getName());
00171     if (eventIterator != schedulingMap_.end())
00172     {
00173         (scheduledEventCount_.current_)++;
00174         std::vector<AbstractRTAction*>::iterator actionsIterator;
00175         for (actionsIterator = eventIterator->second.begin(); actionsIterator != eventIterator->second.end();
00176                         actionsIterator++)
00177         {
00178             try
00179             {
00180                 if (logger.isLoggable(CMW::Log::Level::LL_TRACE))
00181                 {
00182                     std::ostringstream message;
00183                     message << "Executing action " << (*actionsIterator)->getName();
00184                     LOG_TRACE_IF(logger, message.str());
00185                 }
00186                 (*actionsIterator)->executeAction(pEvent);
00187             }
00188             catch (FesaException& ex)
00189             {
00190                 std::ostringstream errorStrStream;
00191                 errorStrStream << "RTScheduler::schedule() caught FesaException during " << name_
00192                                 << " RTAction execution: " << ex.getMessage();
00193                 LOG_ERROR_IF(logger, errorStrStream.str());
00194             }
00195             catch (std::exception& ex)
00196             {
00197                 std::ostringstream errorStrStream;
00198                 errorStrStream << "RTScheduler::schedule() caught std::exception during " << name_
00199                                 << " RTAction execution: " << ex.what();
00200                 LOG_ERROR_IF(logger, errorStrStream.str());
00201             }
00202 
00203             catch (...)
00204             {
00205                 std::ostringstream errorStrStream;
00206                 errorStrStream << "RTScheduler::schedule() caught unknown exception during " << name_
00207                                 << " RTAction execution";
00208                 LOG_ERROR_IF(logger, errorStrStream.str());
00209             }
00210         }
00211     }
00212     else
00213     {
00214         std::ostringstream errorStrStream;
00215         errorStrStream << "RTScheduler::schedule()  " << name_ << " RTAction execution: "
00216                         << "scheduler not found for event: " << pEvent->getName();
00217         LOG_ERROR_IF(logger, errorStrStream.str());
00218     }
00219 }
00220 
00221 void RTScheduler::printConfig(FesaStream* configStream)
00222 {
00223     std::map<std::string, std::vector<AbstractRTAction*> >::iterator itr = schedulingMap_.begin();
00224     std::map<std::string, std::vector<AbstractRTAction*> >::iterator end = schedulingMap_.end();
00225     *configStream << "\t\t\t<concurrent-layer name=\"" << name_ << "\" priority=\"" << getPriority() << "\">"
00226                     << std::endl;
00227     for (; itr != end; ++itr)
00228     {
00229         *configStream << "\t\t\t\t<event name=\"" << itr->first << "\">" << std::endl;
00230         std::vector<AbstractRTAction*>::iterator itrAct = itr->second.begin();
00231         std::vector<AbstractRTAction*>::iterator endAct = itr->second.end();
00232         for (; itrAct != endAct; ++itrAct)
00233         {
00234             (*itrAct)->printConfig(configStream);
00235         }
00236         *configStream << "\t\t\t\t</event>" << std::endl;
00237     }
00238 
00239     *configStream << "\t\t\t</concurrent-layer>" << std::endl;
00240 }
00241 
00242 void RTScheduler::printState(FesaStream* fesaStream, double elapsedTime)
00243 {
00244     float freq_postEventCount;
00245     float freq_scheduledEventCount;
00246     *fesaStream << "\nSCHEDULING --------------------------------------------------------------" << std::endl;
00247     freq_postEventCount = (float) (((postEventCount_.current_ - postEventCount_.last_) / elapsedTime));
00248     freq_scheduledEventCount = (float) (((scheduledEventCount_.current_ - scheduledEventCount_.last_) / elapsedTime));
00249     *fesaStream << "\n    concurrent-layer thread-name = \"" << name_ << "\"" << std::endl;
00250     *fesaStream << "        thread-id:\t" << getID() << std::endl;
00251     //*fesaStream << "        policy:\t\t" << policyStr << endl;
00252     *fesaStream << "        priority:\t\t" << getPriority() << std::endl;
00253     *fesaStream << "        event-queue-size:\t" << queueSize_ << std::endl;
00254     *fesaStream << "        post-event-count (last/current: freq):\t\t" << postEventCount_.last_ << "/"
00255                 << postEventCount_.current_ << ": " << freq_postEventCount << " Hz" << std::endl;
00256     *fesaStream << "        scheduled-event-count (last/current: freq):\t" << scheduledEventCount_.last_ << "/"
00257                 << scheduledEventCount_.current_ << ": " << freq_scheduledEventCount << " Hz" << std::endl;
00258     *fesaStream << "        current-accumulated-event-count:\t" << accumulatedEventCount_.current_ << std::endl;
00259     *fesaStream << "        max-accumulated-event-count:\t" << accumulatedEventCount_.max_ << std::endl;
00260     *fesaStream << "        lost-event-count:\t\t" << lostEventCount_ << std::endl;
00261     postEventCount_.last_ = postEventCount_.current_;
00262     scheduledEventCount_.last_ = scheduledEventCount_.current_;
00263 
00264     std::map<std::string, std::vector<AbstractRTAction*> >::iterator itrEvt = schedulingMap_.begin();
00265     std::map<std::string, std::vector<AbstractRTAction*> >::iterator endEvt = schedulingMap_.end();
00266     for (; itrEvt != endEvt; ++itrEvt)
00267     {
00268         *fesaStream << "\n        event name = \"" << itrEvt->first << "\"" << std::endl;
00269         std::vector<AbstractRTAction*>::iterator itrAct = itrEvt->second.begin();
00270         std::vector<AbstractRTAction*>::iterator endAct = itrEvt->second.end();
00271         for (; itrAct != endAct; ++itrAct)
00272             (*itrAct)->printState(fesaStream, elapsedTime);
00273 
00274     }
00275 
00276 }
00277 
00278 void RTScheduler::cancel()
00279 {
00280     isRunning_ = false;
00281     int32_t attempts = MAX_ATTEMPTS_STOP_THREAD;
00282 
00283     //wait for the thread to stop
00284     while (!hasFinished_ && attempts > 0)
00285     {
00286         attempts--;
00287         conditionalVariableQueue_.signal();
00288         sleep(1);
00289 
00290     }
00291     if (attempts == 0)
00292     {
00293         stop();
00294         std::ostringstream errorStrStream;
00295         errorStrStream << "During shut down procedure RTScheduler: " << name_ << " was forced to exit ";
00296         LOG_ERROR_IF(logger, errorStrStream.str());
00297     }
00298 }
00299 
00300 void RTScheduler::addRTAction(const std::string& eventName, AbstractRTAction* rtAction)
00301 {
00302     std::map<const std::string, std::vector<AbstractRTAction*> >::iterator rtActionList = schedulingMap_.find(
00303         eventName);
00304 
00305     if (rtActionList != schedulingMap_.end())
00306     {
00307         rtActionList->second.push_back(rtAction);
00308     }
00309     else
00310     {
00311         std::vector<AbstractRTAction*> actions;
00312         actions.push_back(rtAction);
00313         schedulingMap_.insert(std::make_pair(eventName, actions));
00314     }
00315 }
00316 
00317 } // fesa

Generated on 18 Jan 2013 for Fesa by  doxygen 1.6.1