RTScheduler.cpp
Go to the documentation of this file.00001
00002
00003 #include <fesa-core/RealTime/RTScheduler.h>
00004
00005 #include <fesa-core/RealTime/AbstractRTAction.h>
00006 #include <fesa-core/RealTime/AbstractRTEquipment.h>
00007 #include <fesa-core/RealTime/RTEvent.h>
00008 #include <fesa-core/Core/AbstractEquipment.h>
00009 #include <fesa-core/Core/ThreadPriorityConfiguration.h>
00010 #include <fesa-core/Diagnostic/Diagnostics.h>
00011 #include <fesa-core/Diagnostic/FesaStream.h>
00012 #include <fesa-core/Utilities/Lock.h>
00013 #include <fesa-core/Utilities/XMLParser.h>
00014
00015 #include <cmw-log/Logger.h>
00016
00017
00018 namespace
00019 {
00020
00021 CMW::Log::Logger& logger = CMW::Log::LoggerFactory::getLogger("FESA.FWK.fesa-core.RealTime.RTScheduler");
00022 const std::string diagTopic = fesa::DiagnosticUtils::eventTrackingStr;
00023
00024 }
00025
00026
00027 namespace fesa
00028 {
00029
00030 RTScheduler::RTScheduler(const std::string& name, int32_t eventQueueSize) :
00031 name_(name),
00032 queueSize_(eventQueueSize),
00033 lostEventCount_(0)
00034 {
00035 setDetachState( PTHREAD_CREATE_DETACHED);
00036 {
00037 AbstractEquipment* eqp = AbstractEquipment::getInstance();
00038 std::string instanceFile = eqp->getDeviceDataFileName();
00039 XMLParser xmlParser(instanceFile, false);
00040 ThreadPriorityConfigurationFromFile conf(xmlParser, eqp->getProcessConfiguration());
00041 int32_t prio = conf.getPrioRTScheduler(name_);
00042 setPriority(prio);
00043 }
00044
00045 postEventCount_.last_ = 0;
00046 postEventCount_.current_ = 0;
00047 postEventCount_.max_ = 0;
00048 scheduledEventCount_.last_ = 0;
00049 scheduledEventCount_.current_ = 0;
00050 scheduledEventCount_.max_ = 0;
00051 accumulatedEventCount_.last_ = 0;
00052 accumulatedEventCount_.current_ = 0;
00053 accumulatedEventCount_.max_ = 0;
00054
00055 std::ostringstream message;
00056 message << " Creation of RTScheduler: " << name;
00057 LOG_TRACE_IF(logger, message.str());
00058 }
00059
00060 RTScheduler::~RTScheduler()
00061 {
00062 std::map<const std::string, std::vector<AbstractRTAction*> >::iterator schedullingUnit;
00063 std::vector<AbstractRTAction*>::iterator actions;
00064
00065 for (schedullingUnit = schedulingMap_.begin(); schedullingUnit != schedulingMap_.end(); schedullingUnit++)
00066 for (actions = schedullingUnit->second.begin(); actions != schedullingUnit->second.end(); actions++)
00067 if (*actions != NULL)
00068 {
00069 delete *actions;
00070 *actions = NULL;
00071 }
00072 }
00073
00074 void RTScheduler::post(boost::shared_ptr<RTEvent>& pEv)
00075 {
00076 (postEventCount_.current_)++;
00077
00078 accumulatedEventCount_.current_ = static_cast<uint32_t>(eventQueue_.size());
00079 if (accumulatedEventCount_.current_ > accumulatedEventCount_.max_)
00080 {
00081 accumulatedEventCount_.max_ = accumulatedEventCount_.current_;
00082 }
00083 Lock lock(mutexQueue_);
00084 if ((queueSize_ < 0) ||
00085 (eventQueue_.size() < (uint32_t) queueSize_))
00086 {
00087
00088 eventQueue_.push_back(pEv);
00089
00090
00091
00092 conditionalVariableQueue_.signal();
00093 if (logger.isLoggable(CMW::Log::Level::LL_TRACE))
00094 {
00095 std::ostringstream message;
00096 message << "Signaling " << name_;
00097 LOG_TRACE_IF(logger, message.str());
00098 }
00099 }
00100 else
00101 {
00102 std::ostringstream errorStrStream;
00103 errorStrStream << "Could not post event: " << pEv->getName() << "; RTScheduler queue is completely full." << std::endl;
00104 LOG_ERROR_IF(logger, errorStrStream.str());
00105 FesaException exception(__FILE__, __LINE__, errorStrStream.str());
00106 AbstractRTEquipment::getInstance()->handleSchedulerError(this,exception);
00107 }
00108 }
00109
00110 void RTScheduler::run()
00111 {
00112 if (logger.isLoggable(CMW::Log::Level::LL_DEBUG))
00113 {
00114 std::ostringstream message;
00115 message << "Running scheduler " << name_;
00116 LOG_DEBUG_IF(logger, message.str());
00117 }
00118 while (isRunning_)
00119 {
00120
00121 boost::shared_ptr<RTEvent> pShEvt;
00122 {
00123 Lock lock(mutexQueue_);
00124 if (eventQueue_.size() == 0)
00125 {
00126 conditionalVariableQueue_.wait(mutexQueue_);
00127 if (logger.isLoggable(CMW::Log::Level::LL_TRACE))
00128 {
00129 std::ostringstream message;
00130 message << "Post received for " << name_;
00131 LOG_TRACE_IF(logger, message.str());
00132 }
00133 }
00134
00135 pShEvt = eventQueue_.front();
00136
00137 eventQueue_.pop_front();
00138 }
00139
00140 RTEvent* pEvent = pShEvt.get();
00141 if (isRunning_ && pEvent != NULL)
00142 {
00143
00144 schedule(pEvent);
00145 }
00146 else
00147 {
00148 continue;
00149 }
00150 }
00151 hasFinished_ = true;
00152 }
00153
00154 void RTScheduler::logDiagnostics(RTEvent*& pEvent)
00155 {
00156 const boost::shared_ptr<Diagnostics>& diagnostics = AbstractEquipment::getInstance()->getDiagnostics();
00157 DiagnosticUtils::DiagnosticMessage diagMsg;
00158 diagMsg.side = DiagnosticUtils::framework;
00159 diagMsg.source = DiagnosticUtils::rt;
00160 std::ostringstream traceStrStream;
00161 traceStrStream << "RTEvent '" << pEvent->getName() << "'";
00162 diagMsg.msg = traceStrStream.str();
00163 diagnostics->log(diagTopic, diagMsg);
00164 }
00165
00166 void RTScheduler::schedule(RTEvent* pEvent)
00167 {
00168 logDiagnostics(pEvent);
00169 std::map<const std::string, std::vector<AbstractRTAction*> >::iterator eventIterator = schedulingMap_.find(
00170 pEvent->getName());
00171 if (eventIterator != schedulingMap_.end())
00172 {
00173 (scheduledEventCount_.current_)++;
00174 std::vector<AbstractRTAction*>::iterator actionsIterator;
00175 for (actionsIterator = eventIterator->second.begin(); actionsIterator != eventIterator->second.end();
00176 actionsIterator++)
00177 {
00178 try
00179 {
00180 if (logger.isLoggable(CMW::Log::Level::LL_TRACE))
00181 {
00182 std::ostringstream message;
00183 message << "Executing action " << (*actionsIterator)->getName();
00184 LOG_TRACE_IF(logger, message.str());
00185 }
00186 (*actionsIterator)->executeAction(pEvent);
00187 }
00188 catch (FesaException& ex)
00189 {
00190 std::ostringstream errorStrStream;
00191 errorStrStream << "RTScheduler::schedule() caught FesaException during " << name_
00192 << " RTAction execution: " << ex.getMessage();
00193 LOG_ERROR_IF(logger, errorStrStream.str());
00194 }
00195 catch (std::exception& ex)
00196 {
00197 std::ostringstream errorStrStream;
00198 errorStrStream << "RTScheduler::schedule() caught std::exception during " << name_
00199 << " RTAction execution: " << ex.what();
00200 LOG_ERROR_IF(logger, errorStrStream.str());
00201 }
00202
00203 catch (...)
00204 {
00205 std::ostringstream errorStrStream;
00206 errorStrStream << "RTScheduler::schedule() caught unknown exception during " << name_
00207 << " RTAction execution";
00208 LOG_ERROR_IF(logger, errorStrStream.str());
00209 }
00210 }
00211 }
00212 else
00213 {
00214 std::ostringstream errorStrStream;
00215 errorStrStream << "RTScheduler::schedule() " << name_ << " RTAction execution: "
00216 << "scheduler not found for event: " << pEvent->getName();
00217 LOG_ERROR_IF(logger, errorStrStream.str());
00218 }
00219 }
00220
00221 void RTScheduler::printConfig(FesaStream* configStream)
00222 {
00223 std::map<std::string, std::vector<AbstractRTAction*> >::iterator itr = schedulingMap_.begin();
00224 std::map<std::string, std::vector<AbstractRTAction*> >::iterator end = schedulingMap_.end();
00225 *configStream << "\t\t\t<concurrent-layer name=\"" << name_ << "\" priority=\"" << getPriority() << "\">"
00226 << std::endl;
00227 for (; itr != end; ++itr)
00228 {
00229 *configStream << "\t\t\t\t<event name=\"" << itr->first << "\">" << std::endl;
00230 std::vector<AbstractRTAction*>::iterator itrAct = itr->second.begin();
00231 std::vector<AbstractRTAction*>::iterator endAct = itr->second.end();
00232 for (; itrAct != endAct; ++itrAct)
00233 {
00234 (*itrAct)->printConfig(configStream);
00235 }
00236 *configStream << "\t\t\t\t</event>" << std::endl;
00237 }
00238
00239 *configStream << "\t\t\t</concurrent-layer>" << std::endl;
00240 }
00241
00242 void RTScheduler::printState(FesaStream* fesaStream, double elapsedTime)
00243 {
00244 float freq_postEventCount;
00245 float freq_scheduledEventCount;
00246 *fesaStream << "\nSCHEDULING --------------------------------------------------------------" << std::endl;
00247 freq_postEventCount = (float) (((postEventCount_.current_ - postEventCount_.last_) / elapsedTime));
00248 freq_scheduledEventCount = (float) (((scheduledEventCount_.current_ - scheduledEventCount_.last_) / elapsedTime));
00249 *fesaStream << "\n concurrent-layer thread-name = \"" << name_ << "\"" << std::endl;
00250 *fesaStream << " thread-id:\t" << getID() << std::endl;
00251
00252 *fesaStream << " priority:\t\t" << getPriority() << std::endl;
00253 *fesaStream << " event-queue-size:\t" << queueSize_ << std::endl;
00254 *fesaStream << " post-event-count (last/current: freq):\t\t" << postEventCount_.last_ << "/"
00255 << postEventCount_.current_ << ": " << freq_postEventCount << " Hz" << std::endl;
00256 *fesaStream << " scheduled-event-count (last/current: freq):\t" << scheduledEventCount_.last_ << "/"
00257 << scheduledEventCount_.current_ << ": " << freq_scheduledEventCount << " Hz" << std::endl;
00258 *fesaStream << " current-accumulated-event-count:\t" << accumulatedEventCount_.current_ << std::endl;
00259 *fesaStream << " max-accumulated-event-count:\t" << accumulatedEventCount_.max_ << std::endl;
00260 *fesaStream << " lost-event-count:\t\t" << lostEventCount_ << std::endl;
00261 postEventCount_.last_ = postEventCount_.current_;
00262 scheduledEventCount_.last_ = scheduledEventCount_.current_;
00263
00264 std::map<std::string, std::vector<AbstractRTAction*> >::iterator itrEvt = schedulingMap_.begin();
00265 std::map<std::string, std::vector<AbstractRTAction*> >::iterator endEvt = schedulingMap_.end();
00266 for (; itrEvt != endEvt; ++itrEvt)
00267 {
00268 *fesaStream << "\n event name = \"" << itrEvt->first << "\"" << std::endl;
00269 std::vector<AbstractRTAction*>::iterator itrAct = itrEvt->second.begin();
00270 std::vector<AbstractRTAction*>::iterator endAct = itrEvt->second.end();
00271 for (; itrAct != endAct; ++itrAct)
00272 (*itrAct)->printState(fesaStream, elapsedTime);
00273
00274 }
00275
00276 }
00277
00278 void RTScheduler::cancel()
00279 {
00280 isRunning_ = false;
00281 int32_t attempts = MAX_ATTEMPTS_STOP_THREAD;
00282
00283
00284 while (!hasFinished_ && attempts > 0)
00285 {
00286 attempts--;
00287 conditionalVariableQueue_.signal();
00288 sleep(1);
00289
00290 }
00291 if (attempts == 0)
00292 {
00293 stop();
00294 std::ostringstream errorStrStream;
00295 errorStrStream << "During shut down procedure RTScheduler: " << name_ << " was forced to exit ";
00296 LOG_ERROR_IF(logger, errorStrStream.str());
00297 }
00298 }
00299
00300 void RTScheduler::addRTAction(const std::string& eventName, AbstractRTAction* rtAction)
00301 {
00302 std::map<const std::string, std::vector<AbstractRTAction*> >::iterator rtActionList = schedulingMap_.find(
00303 eventName);
00304
00305 if (rtActionList != schedulingMap_.end())
00306 {
00307 rtActionList->second.push_back(rtAction);
00308 }
00309 else
00310 {
00311 std::vector<AbstractRTAction*> actions;
00312 actions.push_back(rtAction);
00313 schedulingMap_.insert(std::make_pair(eventName, actions));
00314 }
00315 }
00316
00317 }