AbstractEventSource.cpp
Go to the documentation of this file.00001
00002
00003 #include <fesa-core/RealTime/AbstractEventSource.h>
00004
00005 #include <fesa-core/RealTime/AbstractRTEquipment.h>
00006 #include <fesa-core/RealTime/RTEvent.h>
00007 #include <fesa-core/RealTime/RTScheduler.h>
00008 #include <fesa-core/Diagnostic/FesaStream.h>
00009 #include <fesa-core/Core/AbstractEquipment.h>
00010 #include <fesa-core/Core/ThreadPriorityConfiguration.h>
00011 #include <fesa-core/Utilities/XMLParser.h>
00012 #include <fesa-core/Utilities/Lock.h>
00013 #include <fesa-core/Utilities/ParserElements/EventElement.h>
00014
00015 #include <cmw-log/Logger.h>
00016
00017
00018 namespace
00019 {
00020
00021 CMW::Log::Logger& logger = CMW::Log::LoggerFactory::getLogger("FESA.FWK.fesa-core.RealTime.AbstractEventSource");
00022
00023 }
00024
00025
00026 namespace fesa
00027 {
00028
00029 AbstractEventSource::AbstractEventSource(const std::string eventSourceName, EventSourceType type) :
00030 sourceIsEnabled_(true),
00031 eventSourceName_(eventSourceName),
00032 type_(type)
00033 {
00034 std::ostringstream message;
00035 message << "createEventSource, name: " << eventSourceName_;
00036 LOG_TRACE_IF(logger, message.str());
00037 Thread::registerThreadIdName(getID(), eventSourceName);
00038
00039
00040 firedEventCount_.last_ = 0;
00041 firedEventCount_.current_ = 0;
00042 firedEventCount_.max_ = 0;
00043 {
00044 AbstractEquipment* eqp = AbstractEquipment::getInstance();
00045 std::string instanceFile = eqp->getDeviceDataFileName();
00046 XMLParser xmlParser(instanceFile, false);
00047 ThreadPriorityConfigurationFromFile conf(xmlParser,eqp->getProcessConfiguration());
00048 int32_t prio = conf.getPrioEventSource(eventSourceName_);
00049 setPriority(prio);
00050 }
00051 }
00052
00053 AbstractEventSource::~AbstractEventSource()
00054 {
00055 std::ostringstream message;
00056 message << "Removing event source: " << eventSourceName_;
00057 LOG_TRACE_IF(logger, message.str());
00058 }
00059
00060 void AbstractEventSource::run()
00061 {
00062 isRunning_ = true;
00063 AbstractRTEquipment* rtEquipment = AbstractRTEquipment::getInstance();
00064 RTEvent* pEvt;
00065 while (isRunning_)
00066 {
00067 try
00068 {
00069 waitTillSourceEnabled();
00070 pEvt = wait();
00071 if (pEvt != NULL)
00072 {
00073 boost::shared_ptr<RTEvent> sh_pEvt(pEvt);
00074 postEventToSchedulers(sh_pEvt);
00075 (firedEventCount_.current_)++;
00076 }
00077 }
00078 catch (FesaException& ex)
00079 {
00080 std::ostringstream errorStrStream;
00081 errorStrStream << ex.getMessage();
00082 LOG_ERROR_IF(logger, errorStrStream.str());
00083 rtEquipment->handleEventSourceError(this,ex);
00084 continue;
00085 }
00086 catch (...)
00087 {
00088 LOG_ERROR_IF(logger, "Unknown exception caught");
00089 FesaException ex (__FILE__, __LINE__, "Unknown exception caught");
00090 rtEquipment->handleEventSourceError(this, ex);
00091 continue;
00092 }
00093 }
00094
00095 std::ostringstream message;
00096 message << "Terminating " << eventSourceName_;
00097 LOG_TRACE_IF(logger, message.str());
00098 hasFinished_ = true;
00099
00100 }
00101
00102 void AbstractEventSource::postEventToSchedulers(boost::shared_ptr<RTEvent>& event)
00103 {
00104 ConcreteEventMap::const_iterator eventIter = eventMap_.find(event->getName());
00105 if (eventIter == eventMap_.end())
00106 {
00107 std::ostringstream message;
00108 message << "No scheduler available to scheduler the concrete-event:" << event->getName() << " in event-source: " << getName() ;
00109 LOG_TRACE_IF(logger, message.str());
00110 return;
00111 }
00112
00113 std::vector<SubscribedScheduler>::const_iterator sched_iter;
00114 for (sched_iter = eventIter->second.begin(); sched_iter != eventIter->second.end(); ++sched_iter)
00115 {
00116 if(sched_iter->isEnabled_)
00117 {
00118 if(logger.isLoggable(CMW::Log::Level::LL_TRACE))
00119 {
00120 std::ostringstream message;
00121 message << "Posting logical-event: " << eventIter->first << " to scheduler: " << sched_iter->scheduler_->getName();
00122 LOG_TRACE_IF(logger, message.str());
00123 }
00124 sched_iter->scheduler_->post(event);
00125 }
00126 else
00127 {
00128 std::ostringstream message;
00129 message << "The logical event:" << eventIter->first << " is disabled and will not be fired.";
00130 LOG_TRACE_IF(logger, message.str());
00131 }
00132 }
00133 }
00134
00135 void AbstractEventSource::addRTScheduler(boost::shared_ptr<fesa::EventElement>& eventElement, RTScheduler* scheduler)
00136 {
00137 SubscribedScheduler schedulingInfo;
00138 schedulingInfo.scheduler_ = scheduler;
00139 schedulingInfo.logicalEventName_ = eventElement->getLogicalName();
00140 schedulingInfo.isEnabled_ = true;
00141
00142 ConcreteEventMap::iterator eventIter = eventMap_.find(eventElement->getConcreteName());
00143 if (eventIter == eventMap_.end())
00144 {
00145 std::vector< SubscribedScheduler > schedulers;
00146 schedulers.push_back(schedulingInfo);
00147 eventMap_.insert(make_pair(eventElement->getConcreteName(),schedulers));
00148
00149 connect(eventElement);
00150 }
00151 else
00152 {
00153 std::vector< SubscribedScheduler >::iterator schedIter;
00154 for (schedIter= eventIter->second.begin(); schedIter != eventIter->second.end(); ++schedIter)
00155 {
00156 if (schedIter->scheduler_ == scheduler && schedIter->logicalEventName_ == eventElement->getLogicalName() )
00157 {
00158 std::ostringstream message;
00159 message << "RTScheduler: " << scheduler->getName() << " already registered for event: " << eventElement->getLogicalName();
00160 LOG_TRACE_IF(logger, message.str());
00161 return;
00162 }
00163 }
00164 eventIter->second.push_back(schedulingInfo);
00165 }
00166 std::ostringstream message;
00167 message << "Added scheduler: " << scheduler->getName() << " for logical-event: " << eventElement->getLogicalName();
00168 LOG_TRACE_IF(logger, message.str());
00169 }
00170
00171 void AbstractEventSource::waitTillSourceEnabled()
00172 {
00173 while(!sourceIsEnabled_)
00174 {
00175 Lock lock(mutex_waitIfDisabled_);
00176 waitIfDisabled_.wait(mutex_waitIfDisabled_);
00177 }
00178 }
00179
00180 void AbstractEventSource::printConfig(FesaStream* configStream) const
00181 {
00182 *configStream << "\t\t\t<event-source name=\"" << eventSourceName_ << "\">" << std::endl;
00183 ConcreteEventMap::const_iterator itr;
00184 for ( itr = eventMap_.begin(); itr != eventMap_.end(); ++itr)
00185 {
00186 *configStream << "\t\t\t\t<event concreteName=\"" << itr->first << "\"/>" << std::endl;
00187 }
00188 *configStream << "\t\t\t</event-source>" << std::endl;
00189 }
00190
00191 void AbstractEventSource::printState(FesaStream* fesaStream, double elapsedTime)
00192 {
00193 float freq_firedEventCount = (float) ((firedEventCount_.current_ - firedEventCount_.last_) / elapsedTime);
00194 *fesaStream << "\n event-source thread-name = \"" << eventSourceName_ << "\"" << std::endl;
00195 *fesaStream << " thread-id:\t" << getID() << std::endl;
00196 *fesaStream << " priority:\t\t" << getEvtSrcPriority() << std::endl;
00197 *fesaStream << " fired-event-count (last/current: freq):\t\t" << firedEventCount_.last_ << "/" << firedEventCount_.current_ << ": " << freq_firedEventCount << " Hz" << std::endl;
00198 firedEventCount_.last_ = firedEventCount_.current_;
00199
00200 }
00201 const std::string& AbstractEventSource::getName() const
00202 {
00203 return eventSourceName_;
00204 }
00205
00206 EventSourceType AbstractEventSource::getType() const
00207 {
00208 return type_;
00209 }
00210
00211 uint32_t AbstractEventSource::getEvtSrcPriority() const
00212 {
00213 return (getPriority());
00214 }
00215
00216 uint32_t AbstractEventSource::getEvtSrcID() const
00217 {
00218 return ((uint32_t) getID());
00219 }
00220
00221 void AbstractEventSource::enable()
00222 {
00223 sourceIsEnabled_ = true;
00224 {
00225 Lock lock(mutex_waitIfDisabled_);
00226 waitIfDisabled_.signal();
00227 }
00228 std::ostringstream message;
00229 message << "Events source: " << eventSourceName_ << " got enabled.";
00230 LOG_TRACE_IF(logger, message.str());
00231 }
00232
00233 void AbstractEventSource::disable()
00234 {
00235 sourceIsEnabled_ = false;
00236 std::ostringstream message;
00237 message << "Events source: " << eventSourceName_ << " got disabled.";
00238 LOG_TRACE_IF(logger, message.str());
00239 }
00240
00241 void AbstractEventSource::enableEvent(const std::string& logicalEventName)
00242 {
00243 setLogicalEventEnableState(true, logicalEventName);
00244 std::ostringstream message;
00245 message << "Event: " << logicalEventName << " of events-source: " << eventSourceName_ << " got enabled.";
00246 LOG_TRACE_IF(logger, message.str());
00247 }
00248
00249 void AbstractEventSource::disableEvent(const std::string& logicalEventName)
00250 {
00251 setLogicalEventEnableState(false, logicalEventName);
00252 std::ostringstream message;
00253 message << "Event: " << logicalEventName << " of events-source: " << eventSourceName_ << " got disabled.";
00254 LOG_TRACE_IF(logger, message.str());
00255 }
00256
00257 bool AbstractEventSource::isEventEnabled(const std::string& logicalEventName) const
00258 {
00259 ConcreteEventMap::const_iterator eventIter;
00260 std::vector< SubscribedScheduler >::const_iterator schedulerIter;
00261 for(eventIter = eventMap_.begin();eventIter!=eventMap_.end();++eventIter)
00262 {
00263 for(schedulerIter = eventIter->second.begin();schedulerIter!=eventIter->second.end();++schedulerIter)
00264 {
00265 if(schedulerIter->logicalEventName_ == logicalEventName)
00266 {
00267
00268
00269 return schedulerIter->isEnabled_;
00270 }
00271 }
00272 }
00273
00274 throw FesaException(__FILE__, __LINE__, FesaErrorEventNotFound.c_str(), logicalEventName.c_str());
00275 }
00276
00277 void AbstractEventSource::setLogicalEventEnableState(bool newState,const std::string& logicalEventName)
00278 {
00279 bool found = false;
00280 ConcreteEventMap::iterator eventIter;
00281 std::vector< SubscribedScheduler >::iterator schedulerIter;
00282 for(eventIter = eventMap_.begin();eventIter!=eventMap_.end();++eventIter)
00283 {
00284 for(schedulerIter = eventIter->second.begin();schedulerIter!=eventIter->second.end();++schedulerIter)
00285 {
00286 if(schedulerIter->logicalEventName_ == logicalEventName)
00287 {
00288 schedulerIter->isEnabled_ = newState;
00289 found = true;
00290 }
00291 }
00292 }
00293 if( !found )
00294 {
00295 throw FesaException(__FILE__, __LINE__, FesaErrorEventNotFound.c_str(), logicalEventName.c_str());
00296 }
00297 }
00298
00299 }