RTController.cpp

Go to the documentation of this file.
00001 // Copyright CERN 2012 - Developed in collaboration with GSI
00002 
00003 #include <fesa-core/RealTime/RTController.h>
00004 #include <fesa-core/RealTime/AbstractRTActionFactory.h>
00005 #include <fesa-core/RealTime/AbstractEventSourceFactory.h>
00006 #include <fesa-core/DataStore/AbstractDeviceFactory.h>
00007 #include <fesa-core/Core/AbstractEquipment.h>
00008 #include <fesa-core/Core/NotificationIDManager.h>
00009 #include <fesa-core/Core/AbstractDeviceClass.h>
00010 #include <fesa-core/Sorting/HomogeneousDevCol.h>
00011 #include <fesa-core/Exception/FesaExceptionDef.h>
00012 #include <fesa-core/RealTime/AbstractEventSourceFactory.h>
00013 #include <fesa-core/RealTime/EventsMappingLoader.h>
00014 #include <fesa-core/RealTime/AbstractRTEquipment.h>
00015 #include <fesa-core/RealTime/RTScheduler.h>
00016 #include <fesa-core/Diagnostic/FesaStream.h>
00017 #include <fesa-core/Utilities/ParserElements/EventElement.h>
00018 #include <fesa-core/Sorting/SortingInterpreter.h>
00019 #include <fesa-core/RealTime/AbstractRTAction.h>
00020 
00021 #include <iostream>
00022 #include <sys/stat.h>
00023 #include <vector>
00024 #include <sstream>
00025 
00026 #include <cmw-log/Logger.h>
00027 
00028 namespace
00029 {
00030 CMW::Log::Logger& logger = CMW::Log::LoggerFactory::getLogger("FESA.FWK.fesa-core.RealTime");
00031 }
00032 
00033 namespace fesa
00034 {
00035 
00036     RTController::RTController()
00037     {
00038     }
00039 
00040     // remove the resources
00041     // CRFESA-764 to be review in the shutdown procedure
00042     RTController::~RTController()
00043     {
00044         LOG_TRACE_IF(logger, "Removing controller");
00045         std::map<const std::string, RTScheduler*>::iterator schedulers;
00046         for (schedulers = schedulersCol_.begin(); schedulers != schedulersCol_.end(); schedulers++)
00047         {
00048             if (schedulers->second != NULL)
00049             {
00050                 schedulers->second->stop();
00051             }
00052         }
00053         AbstractEventSourceFactory::stopEventSources();
00054         for (schedulers = schedulersCol_.begin(); schedulers != schedulersCol_.end(); schedulers++)
00055         {
00056             if (schedulers->second != NULL)
00057             {
00058                 delete schedulers->second;
00059                 schedulers->second = NULL;
00060 
00061             }
00062         }
00063         AbstractEventSourceFactory::deleteEventSources();
00064         LOG_TRACE_IF(logger, "Controller removed");
00065     }
00066 
00067     void RTController::initialize()
00068     {
00069         // Get scheduler (layer) information from the RTEquipment definition
00070         AbstractRTEquipment::RTLayerInfoCol layerInfoCol;
00071         AbstractRTEquipment::getInstance()->fillRTSchedulerInfo(layerInfoCol);
00072         // Create RT infrastructure needed by each RTDeviceClass involved in the same DeployUnit
00073         const std::vector<AbstractDeviceClass*>& pRTDeviceClassCol =
00074                         AbstractRTEquipment::getInstance()->getDeviceClassCol();
00075         for (std::vector<AbstractDeviceClass*>::const_iterator it = pRTDeviceClassCol.begin(); it
00076                         != pRTDeviceClassCol.end(); ++it)
00077         {
00078             AbstractRTDeviceClass* pRTDeviceClass = static_cast<AbstractRTDeviceClass*> (*it);
00079             struct stat stFileInfo;
00080             const std::string instanceFile = AbstractEquipment::getInstance()->getDeviceDataFileName();
00081             int32_t exists = stat(instanceFile.c_str(), &stFileInfo); // returns 0 if the file exists
00082             bool isOptional = AbstractEquipment::getInstance()->isClassOptional(pRTDeviceClass->getName());
00083             if (exists != 0 && !isOptional) // file does not exist and not optional => exception
00084             {
00085                 LOG_ERROR_IF(logger, "Instantiation file does not exist and not optional");
00086                 throw FesaFileException(__FILE__, __LINE__, FesaErrorOpenXMLFile.c_str(), instanceFile.c_str());
00087 
00088             }
00089             else if (exists == 0)
00090             {
00091                 std::ostringstream msg;
00092                 msg << "Configuring the RTController with the info coming from RTDeviceClass " << pRTDeviceClass->getName();
00093                 LOG_DEBUG_IF(logger, msg.str());
00094                 createSchedulers(layerInfoCol, pRTDeviceClass, instanceFile);
00095             }
00096 
00097         }
00098         //check that all scheduling units have been used. Delete the layers
00099         for (unsigned i = 0; i < layerInfoCol.size(); ++i)
00100         {
00101             assertSchedulingUnitsConfig(layerInfoCol[i]->schedulingUnitRefCol_);
00102             delete (layerInfoCol[i]);
00103         }
00104         //all NotificationID's have been created now by the RT actions ... lets save them
00105         NotificationIDManager::getInstance()->save();
00106         if (schedulersCol_.empty())//No schedulers? --> we dont need a RealTime side at all!
00107         {
00108             LOG_ERROR_IF(logger, "No scheduler found => RT part unnecessary.");
00109             throw FesaException(__FILE__, __LINE__, FesaErrorNothingToDoForRTSide.c_str());
00110         }
00111     }
00112 
00113 void RTController::assertSchedulingUnitsConfig(const std::vector<RTSchedulingUnitRef> schedulingUnits)
00114 {
00115     for (std::vector<RTSchedulingUnitRef>::const_iterator it = schedulingUnits.begin(); it != schedulingUnits.end();
00116                     ++it)
00117     {
00118         const RTSchedulingUnitRef& unit = *it;
00119         if (unit.found_)
00120         {
00121             continue;
00122         }
00123         bool schedFound = false;
00124         // check if the scheduling unit not found was not found because the device collection was empty
00125         for (std::vector<std::string>::iterator itr = schedulingUnitWithEmptyDevCol_.begin();
00126                         itr != schedulingUnitWithEmptyDevCol_.end(); ++itr)
00127         {
00128             if (unit.name_ == *itr)
00129             {
00130                 schedFound = true;
00131                 break;
00132             }
00133         }
00134         if (schedFound == false)
00135         {
00136             std::ostringstream msg;
00137             msg << "Scheduling unit " << unit.name_ << " for class " << unit.className_ << " not found.";
00138             LOG_ERROR_IF(logger, msg.str());
00139             throw FesaException(__FILE__, __LINE__, FesaErrorSchedulingUnitNotFound.c_str(), unit.name_.c_str(),
00140                             unit.className_.c_str());
00141         }
00142     }
00143 }
00144 
00145     void RTController::createSchedulers(AbstractRTEquipment::RTLayerInfoCol& layerInfoCol,
00146                     AbstractRTDeviceClass* pRTDeviceClass, const std::string instanceFile)
00147     {
00148         // fill the collections
00149         std::vector<RTSchedulingUnitInfo*> schedulingUnitsInfoCol;
00150         AbstractRTDeviceClass::RTActionInfoCol actionInfoCol;
00151         pRTDeviceClass->fillRTSchedulingUnitsInfo(schedulingUnitsInfoCol);
00152         pRTDeviceClass->addDiagnosticSchedulingUnitInfo(schedulingUnitsInfoCol);
00153         pRTDeviceClass->fillRTActionInfo(actionInfoCol);
00154         pRTDeviceClass->addDiagnosticRTActionInfo(actionInfoCol);
00155 
00156         XMLParser xmlParser(instanceFile, false);
00157         boost::shared_ptr<ClassElement> pClassElement(new ClassElement(pRTDeviceClass)); // FIXME: Do we need a pointer here because of its implementation? See shared_from_this in loadLogicalEventElements
00158         EventsMappingLoader eventsManager(pClassElement,&xmlParser);
00159 
00160         // for each scheduling-unit  
00161         for (std::vector<RTSchedulingUnitInfo*>::iterator schedulingUnit = schedulingUnitsInfoCol.begin(); schedulingUnit
00162                         != schedulingUnitsInfoCol.end(); ++schedulingUnit)
00163         {
00164             if (logger.isLoggable(CMW::Log::Level::LL_TRACE))
00165             {
00166                 std::ostringstream msg;
00167                 msg << "Creating RT action config for scheduling unit " << (*schedulingUnit)->name_;
00168                 LOG_TRACE_IF(logger, msg.str());
00169             }
00170             // create RT action configuration
00171             createRTActionConfig(layerInfoCol, pRTDeviceClass, *schedulingUnit, actionInfoCol, eventsManager);
00172             delete (*schedulingUnit); // this one is processed and was allocated by the pRTDeviceClass->fillRTSchedulingUnitsInfo => delete it
00173         }
00174         // delete resources
00175         for (AbstractRTDeviceClass::RTActionInfoCol::iterator actions = actionInfoCol.begin(); actions
00176                         != actionInfoCol.end(); actions++)
00177         {
00178             delete (actions->second);
00179         }
00180         actionInfoCol.clear();
00181     } // end createSchedulers
00182 
00183 void RTController::createRTActionConfig(AbstractRTEquipment::RTLayerInfoCol& layerInfoCol, AbstractRTDeviceClass* pRTDeviceClass,
00184                 RTSchedulingUnitInfo* schedulingUnit, AbstractRTDeviceClass::RTActionInfoCol& actionInfoCol, EventsMappingLoader& eventsManager)
00185 {
00186     AbstractRTDeviceClass::RTActionInfoCol::iterator itActionInfo = actionInfoCol.find(schedulingUnit->actionName_);
00187     if (itActionInfo == actionInfoCol.end())
00188     {
00189         throw FesaException(__FILE__, __LINE__, FesaErrorRTActionNameNotFound.c_str(),
00190                         schedulingUnit->actionName_.c_str());
00191     }
00192     RTActionConfig actionConfig;
00193     actionConfig.actionName_ = schedulingUnit->actionName_;
00194     actionConfig.automaticallyNotifiedPropertiesCol_ = itActionInfo->second->automaticallyNotifiedPropertiesCol_;
00195     actionConfig.manuallyNotifiedPropertiesCol_ = itActionInfo->second->manuallyNotifiedPropertiesCol_;
00196     actionConfig.isGlobalPropertyToBeNotified_ = itActionInfo->second->isGlobalPropertyToBeNotified_;
00197     actionConfig.selectionCriterion_ = schedulingUnit->selectionCriterion_;
00198     actionConfig.className_ = schedulingUnit->className_;
00199     actionConfig.globalDevice_ = pRTDeviceClass->getDeviceFactory()->getGlobalDevice();
00200     // Looking the concrete-name for this logical event name and scheduling
00201     std::vector<EventElementPointer> eventConfigurationCollection = eventsManager.getEventElementCol(
00202                     schedulingUnit->eventName_);
00203     for (std::vector<EventElementPointer>::iterator itEvt = eventConfigurationCollection.begin();
00204                     itEvt != eventConfigurationCollection.end(); ++itEvt)
00205     {
00206         if (logger.isLoggable(CMW::Log::Level::LL_TRACE))
00207         {
00208             std::ostringstream msg;
00209             msg << "Processing event " << (*itEvt)->getConcreteName();
00210             LOG_TRACE_IF(logger, msg.str());
00211         }
00212         actionConfig.eventSourceName_ = (*itEvt)->getEventSourceName();
00213         const std::set<AbstractDevice*>& devCol = (*itEvt)->getDeviceCollection();
00214         actionConfig.eventName_ = (*itEvt)->getConcreteName(); // Concrete event name
00215         // split devices depending on selection-criterion
00216         std::vector<AbstractDevice *> unsortedDevicelCol;
00217         std::copy(devCol.begin(), devCol.end(), std::back_inserter(unsortedDevicelCol)); // TODO Why's that if it's just for the SortingInterp, can we change it?
00218         SortingInterpreter sortingInterpreter;
00219         std::set<HomogeneousDevCol *> collectionOfDevCol = sortingInterpreter.interpret(
00220                         actionConfig.selectionCriterion_, actionConfig.className_, &(unsortedDevicelCol));
00221         // register scheduling-unit with empty device collections
00222         if (collectionOfDevCol.empty())
00223         {
00224             schedulingUnitWithEmptyDevCol_.push_back(schedulingUnit->name_);
00225             std::ostringstream msg;
00226             msg << "Scheduling unit " << schedulingUnit->name_ << " doesn't have any device => ignore it.";
00227             LOG_TRACE_IF(logger, msg.str());
00228         }
00229         else
00230         {
00231             for (std::set<HomogeneousDevCol*>::iterator itDevCol = collectionOfDevCol.begin();
00232                             itDevCol != collectionOfDevCol.end(); ++itDevCol)
00233             {
00234                 if (logger.isLoggable(CMW::Log::Level::LL_TRACE))
00235                 {
00236                     std::ostringstream msg;
00237                     msg << "Processing device collection of " << (*itDevCol)->getSize() << " devices";
00238                     LOG_TRACE_IF(logger, msg.str());
00239                 }
00240                 std::set<AbstractDevice*>* deviceCollection = (*itDevCol)->getDevCol();
00241                 RTScheduler* scheduler = createAction(layerInfoCol, pRTDeviceClass, schedulingUnit, actionConfig,
00242                                 deviceCollection);
00243                 if (scheduler != NULL)
00244                 { // create a new event-source (if needed)
00245                     createEventSource(pRTDeviceClass->getName(), *itEvt, scheduler,
00246                                     pRTDeviceClass->getEventSourceFactory());
00247                 }
00248             }
00249         }
00250     }
00251 }
00252 
00253 RTScheduler* RTController::createAction(AbstractRTEquipment::RTLayerInfoCol& layerInfoCol, AbstractRTDeviceClass* pRTDeviceClass,
00254                 RTSchedulingUnitInfo* schedulingUnit, RTActionConfig& actionConfig, std::set<AbstractDevice*>* deviceCollection)
00255 {
00256     actionConfig.deviceCol_.clear();
00257     std::copy(deviceCollection->begin(), deviceCollection->end(), std::back_inserter(actionConfig.deviceCol_)); // TODO Why's that if it's just for the SortingInterp, can we change it?
00258     RTSchedulingUnitRef schedulingUnitRef;
00259     RTLayerInfo* layerInfo;
00260     uint32_t k;
00261     bool schedulingFound = false;
00262     for (k = 0; k < layerInfoCol.size(); ++k)
00263     {
00264         for (uint32_t j = 0; j < layerInfoCol[k]->schedulingUnitRefCol_.size(); ++j)
00265         {
00266             if (((((layerInfoCol[k]->schedulingUnitRefCol_[j].name_) == schedulingUnit->name_))
00267                             && ((schedulingUnit)->name_ == DIAGNOSTIC_LAYER))
00268                             || (((layerInfoCol[k]->schedulingUnitRefCol_[j].name_) == schedulingUnit->name_)
00269                                             && (layerInfoCol[k]->schedulingUnitRefCol_[j].className_
00270                                                             == schedulingUnit->className_)))
00271             {
00272                 layerInfo = layerInfoCol[k];
00273                 schedulingUnitRef = layerInfoCol[k]->schedulingUnitRefCol_[j];
00274                 layerInfoCol[k]->schedulingUnitRefCol_[j].found_ = true;
00275                 schedulingFound = true;
00276                 break;
00277             }
00278         }
00279         if (schedulingFound == true)
00280         {
00281             break;
00282         }
00283     }
00284     if (k == layerInfoCol.size())
00285     {
00286         std::ostringstream msg;
00287         msg << "scheduling unit" << schedulingUnit->className_ << ":" << schedulingUnit->name_.c_str()
00288                         << " is not used in the current deploy unit";
00289         LOG_WARNING_IF(logger, msg.str());
00290         return (RTScheduler*)NULL;
00291     }
00292     // compute scheduler name
00293     std::stringstream layerName;
00294     layerName << layerInfo->layerName_;
00295     // relying on selection criterion if required
00296     if (schedulingUnitRef.perDeviceCollection_)
00297     {
00298         layerName << actionConfig.eventName_;
00299     }
00300     RTScheduler* currentScheduler = findScheduler(layerName.str(), layerInfo->queueSize_);
00301     // create the RT action
00302     AbstractRTActionFactory* pRTActionFactory = pRTDeviceClass->getRTActionFactory();
00303     if (pRTActionFactory == NULL)
00304     {
00305         throw FesaException(__FILE__, __LINE__, FesaErrorRTFactoryNotFound.c_str(), schedulingUnit->className_.c_str());
00306     }
00307     AbstractRTAction* action = pRTActionFactory->createRTAction(actionConfig.actionName_, actionConfig);
00308     if (action == NULL)
00309     {
00310         throw FesaException(__FILE__, __LINE__, FesaErrorRTActionNameNotFound.c_str(),
00311                         actionConfig.actionName_.c_str());
00312     }
00313     // at this point currentScheduler contains the scheduler for this action
00314     currentScheduler->addRTAction(actionConfig.eventName_, action);
00315     if (logger.isLoggable(CMW::Log::Level::LL_TRACE))
00316     {
00317         std::ostringstream msg;
00318         msg << "Action " << action->getName() << " created and added to scheduler " << currentScheduler->getName();
00319         LOG_TRACE_IF(logger, msg.str());
00320     }
00321     return currentScheduler;
00322 }
00323 
00324 RTScheduler* RTController::findScheduler(const std::string& schedulerName, uint32_t queueSize)
00325 {
00326     RTScheduler* scheduler;
00327     std::map<const std::string, RTScheduler*>::iterator schedIter = schedulersCol_.find(schedulerName);
00328     if (schedIter != schedulersCol_.end())
00329     {
00330         scheduler = (*schedIter).second;
00331     }
00332     else // create a new scheduler
00333     {
00334         // Create a log with the appropriate className in oder to follow the
00335         // custom topics which are class dependant
00336         scheduler = new RTScheduler(schedulerName, queueSize);
00337         schedulersCol_.insert(std::make_pair(schedulerName, scheduler));
00338     }
00339     return scheduler;
00340 }
00341 
00342     void RTController::start()
00343     {
00344 
00345         //start the schedulers
00346         std::map<std::string, RTScheduler*>::iterator rtTmpScheduler;
00347 
00348         for (rtTmpScheduler = schedulersCol_.begin(); rtTmpScheduler != schedulersCol_.end(); rtTmpScheduler++)
00349         {
00350             // run all the schedulers but the diagnostic one
00351             const std::string name = rtTmpScheduler->second->getName();
00352             if (name != "diagnosticLayer")
00353             {
00354                 rtTmpScheduler->second->start(false, name);
00355             }
00356         }
00357 
00358         // start the all event sources (Fwk and Custom ones)
00359         AbstractEventSourceFactory::startEventSources();
00360 
00361     rtTmpScheduler = schedulersCol_.find("diagnosticLayer");
00362         // run diagnostic layer in a separate thread without blocking call
00363         if (rtTmpScheduler != schedulersCol_.end())
00364         {
00365         const std::string name = rtTmpScheduler->second->getName();
00366         rtTmpScheduler->second->start(false, name);
00367         }
00368         else
00369         {
00370             //No Diagnostic Layer? --> Something is wrong on the RT side, throw an exception!
00371             throw FesaException(__FILE__, __LINE__, FesaErrorNoDiagnosticLayerFound.c_str());
00372         }
00373     }// end run
00374 
00375 
00376     void RTController::createEventSource(const std::string& className, boost::shared_ptr<fesa::EventElement>& eventElement,
00377                     RTScheduler* currentScheduler, AbstractEventSourceFactory* eventSourceFactory)
00378     {
00379         AbstractEventSource* source;
00380         if(eventSourceFactory->eventSourceExists(className,eventElement->getEventSourceName()))
00381         {
00382                 source = eventSourceFactory->getEventSource(className,eventElement->getEventSourceName());
00383         }
00384         else
00385         {
00386                 source = eventSourceFactory->createEventSource(className,eventElement->getEventSourceName(),eventElement->getEventSourceType());
00387         }
00388 
00389         source->addRTScheduler(eventElement, currentScheduler);
00390     }
00391 
00392     void RTController::printConfigAll(FesaStream* fesaStream)
00393     {
00394 
00395         AbstractEventSourceFactory::printConfigAll(fesaStream);
00396 
00397         *fesaStream << "\t\t<scheduling>" << std::endl;
00398         std::map<const std::string, RTScheduler*>::iterator itr = schedulersCol_.begin();
00399         std::map<const std::string, RTScheduler*>::iterator end = schedulersCol_.end();
00400         for (; itr != end; ++itr)
00401         {
00402             itr->second->printConfig(fesaStream);
00403         }
00404         *fesaStream << "\t\t</scheduling>" << std::endl;
00405     }
00406 
00407     void RTController::printStateAll(FesaStream* fesaStream, double elapsedTime)
00408     {
00409 
00410         AbstractEventSourceFactory::printStateAll(fesaStream, elapsedTime);
00411 
00412         *fesaStream << "\t\t<scheduling>" << std::endl;
00413         std::map<const std::string, RTScheduler*>::iterator itr = schedulersCol_.begin();
00414         std::map<const std::string, RTScheduler*>::iterator end = schedulersCol_.end();
00415         for (; itr != end; ++itr)
00416         {
00417             itr->second->printState(fesaStream, elapsedTime);
00418         }
00419         *fesaStream << "\t\t</scheduling>" << std::endl;
00420 
00421     }
00422 
00423 }// end class

Generated on 18 Jan 2013 for Fesa by  doxygen 1.6.1