SubscriptionTreeManager.cpp

Go to the documentation of this file.
00001 // Copyright CERN 2012 - Developed in collaboration with GSI
00002 
00003 #include <fesa-core/RDADeviceServer/SubscriptionTreeManager.h>
00004 
00005 #include <fesa-core/RDADeviceServer/Notification.h>
00006 #include <fesa-core/RDADeviceServer/SubscriptionParameter.h>
00007 #include <fesa-core/Server/Property.h>
00008 #include <fesa-core/DataStore/AbstractDevice.h>
00009 #include <fesa-core/Core/MessageTypes.h>
00010 #include <fesa-core/Core/NotificationIDManager.h>
00011 #include <fesa-core/Core/NotificationID.h>
00012 #include <fesa-core/Utilities/Lock.h>
00013 
00014 #include <cmw-rda/ValueChangeListener.h>
00015 #include <cmw-log/Logger.h>
00016 
00017 #include <boost/lexical_cast.hpp>
00018 
00019 #include <iostream>
00020 
00021 namespace
00022 {
00023 
00024 CMW::Log::Logger& logger = CMW::Log::LoggerFactory::getLogger("FESA.FWK.fesa-core.RDADeviceServer.SubscriptionTreeManager");
00025 
00029 static fesa::Mutex singleMutex;
00030 
00031 }
00032 
00033 namespace fesa
00034 {
00035 
00036 SubscriptionTreeManager::SubscriptionTreeManager(bool useFastUpdate) :
00037     initializationSucessfull_(false), lastErrorCode_(0), useFastUpdate_(useFastUpdate)
00038 {
00039 
00040 }
00041 
00042 SubscriptionTreeManager::~SubscriptionTreeManager()
00043 {
00044     //Shotdown all Notifications
00045     std::map<uint32_t, boost::shared_ptr<Notification> >::iterator iterN;
00046     for (iterN = notifications_.begin(); iterN != notifications_.end(); iterN++)
00047     {
00048         notifications_.erase(iterN);
00049     }
00050 
00051     // Remove all the subscriptions
00052     std::map<std::string, boost::shared_ptr<SubscriptionParameter> >::iterator iterS;
00053     for (iterS = subscriptions_.begin(); iterS != subscriptions_.end(); iterS++)
00054     {
00055         subscriptions_.erase(iterS);
00056     }
00057 }
00058 
00059 //Adding a new subscriber consist to 5 steps:
00060 // -1) Check if the notification are properly initialized.
00061 // -2) Create the parameterName computed form deviceName and propertyname
00062 // -3) Insert the new SubscriptionParameter in the subscriptionParameter collection
00063 // -4) Register the new SubscriptionCycle in the SubscriptionCycle collection
00064 // -5) Iterate over the Notification collection and if the new SubscriptionParameter is controlled by a Notification insert it
00065 
00066 void SubscriptionTreeManager::addSubscriber(const std::string& className, AbstractDevice& device, Property& property, const std::string& cycleSelector,
00067                                             const rdaData& filter, rdaValueChangeListener* subscriber)
00068 {
00069     // add the new subscriber to the tree: Inserting new element in the subscriptionTree requires to manage the concurrent access.
00070     // TODO for the time being the tree is fully locked during the whole insertion operation with the consequence of preventing the notification
00071     // execution. Investigate if this could be refine in order to reduce as much as possible the time between the lock acquisition and the release.
00072     // Add new subscriber ....lock the SubscriberParameter Collection  (unlock is done automatically)
00073 
00074     // TODO: order the subscribers using an additional level: className
00075     // ************ critical section: automatic unlock*************
00076     Lock l(treeMutex_);
00077 
00078     if (!initializationSucessfull_)
00079     {
00080         initNotifications();
00081     }
00082 
00083     std::string paramName = device.getName() + std::string("#") + property.getName();
00084     boost::shared_ptr<SubscriptionParameter> sp = addSubscriptionParam(paramName, device, property, cycleSelector, filter, subscriber);
00085     subscriptionCycle_.addCycle(cycleSelector);
00086 
00087     // addSubscriptionParam returns a SubscriptionParameter* only if a new SubscriptionParameter is created.
00088     // Otherwise, returns NULL
00089     NotificationItr itr = notifications_.begin();
00090     NotificationItr itrEnd = notifications_.end();
00091     for(; itr != itrEnd; ++itr)
00092     {
00093         if((*itr).second->contains(device.getName(), property.getName()))
00094         {
00095             if (sp.get() != NULL) // it's a new subscritionParameter
00096                 (*itr).second->addSubscriber(sp);
00097             (*itr).second->addCycle(cycleSelector);
00098         }
00099     }
00100 
00101     if (subscriber->isOnChange() && !property.isOnChange())
00102     {
00103         std::string message("Warning: Subscription OnChange is not supported by this property");
00104         subscriber->ioFailed(rdaIOError("FESA_RDADeviceServer", 0, message.c_str()));
00105     }
00106     // Print the SubscriptionTree while adding new subscriber
00107     if (logger.isLoggable(CMW::Log::Level::LL_DEBUG))
00108     {
00109         std::string s("add parameter: ");
00110         s.append(paramName);
00111         s.append(" listener: ");
00112         char buf[16];
00113         snprintf(buf, 16, "%p", subscriber);
00114         s.append(buf);
00115         printState(s);
00116     }
00117     //******** end of critical section **********
00118 }
00119 
00120 void SubscriptionTreeManager::removeSubscriber(AbstractDevice& device, Property& property, const std::string& cycleSelector,
00121                                                rdaValueChangeListener* subscriber)
00122 {
00123     // remove a subscriber from the tree: Inserting new element in the subscriptionTree requires to manage the concurrent access.
00124     // TODO for the time being the tree is fully locked during the whole insertion operation with the consequence of preventing the notification
00125     // execution. Investigate if this could be refine in order to reduce as much as possible the time between the lock acquisition and the release.
00126     // remove subscriber ....lock the SubscriberParameter Collection  (unlock is done automatically)
00127     Lock l(treeMutex_);
00128     std::string paramName = device.getName() + std::string("#") + property.getName();
00129     SubscriptionItr subParamItr = subscriptions_.find(paramName);
00130     if (subParamItr != subscriptions_.end())
00131     {
00132         // output parameter saying if the subscriptionParameter isEmpty (means no listeners anymore)
00133         // it has to be deleted from Notification
00134         bool isEmpty = false;
00135         (*subParamItr).second->removeSubscriber(cycleSelector, subscriber, isEmpty);
00136         // remove from NotificationID
00137         NotificationItr notifItr = notifications_.begin();
00138         NotificationItr notifItrEnd = notifications_.end();
00139         {
00140             for(; notifItr != notifItrEnd; ++notifItr)
00141             {
00142                 (*notifItr).second->removeSubscriber((*subParamItr).second, cycleSelector, isEmpty);
00143             }
00144         }
00145         subscriptionCycle_.removeCycle(cycleSelector);
00146         if (isEmpty)
00147         {
00148             boost::shared_ptr<SubscriptionParameter> sp = (*subParamItr).second;
00149             subscriptions_.erase(subParamItr);
00150         }
00151     }
00152     else
00153     {
00154         //TODO log error "found no such subscriptionParameter"
00155     }
00156     // Print the SubscriptionTree while removing subscriber
00157     if (logger.isLoggable(CMW::Log::Level::LL_DEBUG))
00158     {
00159         std::string s("remove parameter: ");
00160         s.append(paramName);
00161         s.append(" listener: ");
00162         char buf[16];
00163         snprintf(buf, 16, "%p", subscriber);
00164         s.append(buf);
00165         printState(s);
00166     }
00167 }
00168 
00169 void SubscriptionTreeManager::notify(uint32_t IDKey, MultiplexingContext& muxContext,
00170                                      RequestType& reqType)
00171 {
00172     // lock the SubscriberParameter Collection  (unlock is done automatically)
00173     Lock l(treeMutex_);
00174     // try to initialize the tree
00175     if (!initializationSucessfull_)
00176     {
00177         initNotifications();
00178     }
00179     // Locate the Notification Instance
00180     NotificationItr itr = notifications_.find(IDKey);
00181     if(itr != notifications_.end())
00182     {
00183 
00184         (*itr).second->notify(muxContext, reqType);
00185         if (useFastUpdate_) {
00186             const rdaData emptyData;
00187             rdaValueChangeListener::flush(emptyData);
00188         }
00189     }
00190     else
00191     {
00192         std::ostringstream msg;
00193         msg << "notify error, IDKey not found " << boost::lexical_cast<std::string>(IDKey);
00194         LOG_ERROR_IF(logger, msg.str());
00195     }
00196 }
00197 
00198 void SubscriptionTreeManager::notify(ManualNotificationMessage* msg, RequestType& reqType)
00199 {
00200     std::vector<std::string> devices;
00201     std::vector<std::string> properties;
00202 
00203     std::vector<std::string> tmp;
00204 
00205 
00206     // Check if any cycleSelector is matching the current context
00207     std::vector<std::string> validCS;
00208     // fill the validCS by calling matchContext
00209     subscriptionCycle_.matchContext(*(msg->muxContext_), validCS);
00210 
00211 
00212     if (validCS.size() != 0) // At least one registered cycleSelector is matching the current context
00213     {
00214         try {
00215             // critical section
00216             Lock l(treeMutex_);
00217             for (std::map<std::string,std::vector<std::string> >::iterator pADItr = msg->propertiesAndDevicesCol_.begin(); pADItr != msg->propertiesAndDevicesCol_.end(); ++pADItr)
00218             {
00219                 for (std::vector<std::string>::iterator devIter = pADItr->second.begin(); devIter != pADItr->second.end(); ++devIter)
00220                 {
00221                     std::string paramName = *devIter + std::string("#") + pADItr->first;
00222                     SubscriptionItr subParamItr = subscriptions_.find(paramName);
00223                     if (subParamItr != subscriptions_.end())
00224                     {
00225                         (*subParamItr).second->notify(*(msg->muxContext_), reqType, validCS);
00226                     }
00227                 }
00228             }
00229         }
00230         catch(FesaException& ex)
00231         {
00232 
00233         }
00234         if (useFastUpdate_) {
00235             const rdaData emptyData;
00236             rdaValueChangeListener::flush(emptyData);
00237         }
00238     }
00239 }
00240 
00241 void SubscriptionTreeManager::reportInternalError(const std::string& errorCategory, int32_t errorCode, const std::string&  message)
00242 {
00243     // critical section
00244     Lock l(treeMutex_);
00245     bool changed = (errorCode == lastErrorCode_) ? false : true;
00246     SubscriptionItr subItr = subscriptions_.begin();
00247     SubscriptionItr subItrEnd = subscriptions_.end();
00248     for(; subItr != subItrEnd; ++subItr)
00249     {
00250         (*subItr).second->reportInternalError(errorCategory, errorCode, message, changed);
00251     }
00252     lastErrorCode_ = errorCode;
00253 }
00254 
00255 boost::shared_ptr<SubscriptionParameter> SubscriptionTreeManager::addSubscriptionParam(const std::string& paramName, AbstractDevice& device,
00256                                                                                        Property& property, const std::string& cycleSelector,
00257                                                                                        const rdaData& filter, rdaValueChangeListener* subscriber)
00258 {
00259     boost::shared_ptr<SubscriptionParameter> sp((SubscriptionParameter*)NULL);
00260     SubscriptionItr itr = subscriptions_.find(paramName);
00261     if(itr != subscriptions_.end())
00262     {
00263         (*itr).second->addSubscriber(cycleSelector, filter, subscriber);
00264         // Returns NULL because it's not a new SubscriptionParameter
00265     }
00266     else
00267     {
00268         NotificationThread* pNotificationThread = &(notificationThreadFactory_.getThread(property.getClassName(), property.getNotificationThreadKey()));
00269 
00270         // Create a new SubscriptionParameter: is not necessary to lock the SubscriberParameter collection for the time being
00271         sp.reset(new SubscriptionParameter(paramName, device, property, useFastUpdate_, pNotificationThread));
00272         // Add subscriber
00273         sp->addSubscriber(cycleSelector, filter, subscriber);
00274         // Register : lock the SubscriptionParameter collection (unlock is done automatically)
00275         subscriptions_[paramName] = sp;
00276     }
00277     //Never executed
00278     return sp;
00279 }
00280 
00281 void SubscriptionTreeManager::initNotifications()
00282 {
00283     NotificationIDManager* idGenerator = NotificationIDManager::getInstance();
00284     const std::map<uint32_t, NotificationID*>& idsCol = idGenerator->getNotificationIDCol();
00285     // TODO : check addGlobalDevice
00286     //        addGlobalDevicesToNotificationIDCollection(idsCol);
00287 
00288     std::map<uint32_t, NotificationID*>::const_iterator notIter;
00289     for (notIter = idsCol.begin(); notIter != idsCol.end(); notIter++)
00290     {
00291         //generate a Notification for each ID
00292         boost::shared_ptr<Notification> notif(new Notification(*((*notIter).second)));
00293         notifications_.insert(std::pair<uint32_t, boost::shared_ptr<Notification> >((*notIter).second->getIDKey(), notif));
00294     }
00295     initializationSucessfull_ = true;
00296 
00297     // TODO: for the time being this feature is disabled because we need to register the SubcriptionParameter and the cycleSelector
00298     // To be checked later while validating the initialization phase.
00299     //put all the waiting subscribers in the collection
00300     //        std::vector<SubscriptionParameter*>::iterator iter;
00301     //        std::map<uint32_t, Notification*>::iterator iter2;
00302     //        for (iter = waitingSubscriberCol_.begin(); iter != waitingSubscriberCol_.end(); iter++)
00303     //        {
00304     //            //add the new subscriber into the Notifications
00305     //            const std::string& propName = (*iter)->getProperty().getName();
00306     //            const std::string& deviceName = (*iter)->getDevice().getName();
00307     //
00308     //            for (iter2 = notifications_.begin(); iter2 != notifications_.end(); iter2++)
00309     //            {
00310     //                if ((*iter2).second->contains(propName, deviceName))
00311     //                    (*iter2).second->addSubscriber(*(*iter));
00312     //            }
00313     //        }
00314     //
00315     //        //empty the list
00316     //        waitingSubscriberCol_.clear();
00317 }
00318 
00319 void SubscriptionTreeManager::printState(std::string& header)
00320 {
00321     std::stringstream str;
00322     str << header << "\nSubscriptionTreeManager Configuration:\n\tNotif Configuration:\n";
00323     NotificationItr itr = notifications_.begin();
00324     NotificationItr itrEnd = notifications_.end();
00325     for(; itr != itrEnd; ++itr)
00326     {
00327         (*itr).second->printDebugState(str);
00328     }
00329 
00330     str << "\tCycleSubscription Configuration:\n";
00331     subscriptionCycle_.printDebugState(str,2);
00332 
00333     str << "\tSubscriptionParameter Configuration:\n";
00334     SubscriptionItr subItr = subscriptions_.begin();
00335     SubscriptionItr subItrEnd = subscriptions_.end();
00336     for(; subItr != subItrEnd; ++subItr)
00337     {
00338         (*subItr).second->printDebugState(str);
00339     }
00340     LOG_DEBUG_IF(logger, str.str());
00341 }
00342 } // fesa

Generated on 18 Jan 2013 for Fesa by  doxygen 1.6.1