SubscriptionParameter.cpp

Go to the documentation of this file.
00001 // Copyright CERN 2012 - Developed in collaboration with GSI
00002 
00003 #include <fesa-core/RDADeviceServer/SubscriptionParameter.h>
00004 
00005 #include <fesa-core/RDADeviceServer/SubscriptionFilter.h>
00006 #include <fesa-core/Server/Property.h>
00007 #include <fesa-core/DataStore/AbstractDevice.h>
00008 #include <fesa-core/Diagnostic/Diagnostics.h>
00009 
00010 #include <cmw-log/Logger.h>
00011 
00012 
00013 namespace
00014 {
00015 
00016 const std::string diagTopic = fesa::DiagnosticUtils::rdaTrackingStr;
00017 
00018 } // namespace
00019 
00020 
00021 namespace fesa
00022 {
00023 
00024 SubscriptionParameter::SubscriptionParameter(const std::string& paramName, AbstractDevice& device,
00025                                              Property& property, bool useFastUpdate,
00026                                              NotificationThread* pNotificationThread) :
00027     name_(paramName),
00028     device_(device),
00029     property_(property),
00030     useFastUpdate_(useFastUpdate),
00031     pNotificationThread_(pNotificationThread)
00032 {
00033     const boost::shared_ptr<Diagnostics>& diagnostics = AbstractEquipment::getInstance()->getDiagnostics();
00034     DiagnosticUtils::DiagnosticMessage diagMsg;
00035     diagMsg.side = DiagnosticUtils::framework;
00036     diagMsg.source = DiagnosticUtils::server;
00037     std::ostringstream traceStrStream;
00038     traceStrStream << "Creating subscription parameter with name '" << name_ << "'";
00039     diagMsg.msg = traceStrStream.str();
00040     diagnostics->log(diagTopic, diagMsg);
00041 }
00042 
00043 SubscriptionParameter::~SubscriptionParameter()
00044 {
00045     const boost::shared_ptr<Diagnostics>& diagnostics = AbstractEquipment::getInstance()->getDiagnostics();
00046     DiagnosticUtils::DiagnosticMessage diagMsg;
00047     diagMsg.side = DiagnosticUtils::framework;
00048     diagMsg.source = DiagnosticUtils::server;
00049     std::ostringstream traceStrStream;
00050     traceStrStream << "Deleting subscription parameter with name '" << name_ << "'";
00051     diagMsg.msg = traceStrStream.str();
00052     diagnostics->log(diagTopic, diagMsg);
00053     SubscriptionFilterItr itr = filters_.begin();
00054     SubscriptionFilterItr itrEnd = filters_.end();
00055     filters_.erase(itr, itrEnd);
00056 }
00057 
00058 void SubscriptionParameter::addSubscriber(const std::string& cycleSelector, const rdaData& newFilter, rdaValueChangeListener* subscriber)
00059 {
00060     // Adding a new subscriber, which correspond to a monitorOn request, consist of two phases:
00061     // 1) Register the new client
00062     // 2) Provide the client with a firstUpdate.
00063 
00064     //1) Register the new Subscriber
00065     //find matching filter
00066     SubscriptionFilterItr itr;
00067     for (itr = filters_.begin(); itr != filters_.end(); itr++) // iterate through all the registered SubscriptionFilter
00068     {
00069         if ((*itr)->matchFilter(newFilter)) // request the filter to proceed to a comparison with the newFilter
00070         {
00071             //register
00072             (*itr)->addSubscriber(cycleSelector, subscriber);
00073             break;
00074         }
00075     }
00076     if (itr == filters_.end())
00077     {
00078         // No registered SubscriptionFilter is matching the new one.
00079         // Create the new filter
00080         boost::shared_ptr<AbstractSubscriptionFilter> sf;
00081         sf.reset((AbstractSubscriptionFilter*)new SubscriptionFilter(device_, property_, newFilter, useFastUpdate_, pNotificationThread_));
00082 
00083         // add subscriber
00084         sf->addSubscriber(cycleSelector, subscriber);
00085         // insert the new filter in the collection
00086         filters_.push_back(sf);
00087         //Make itr pointing to the last element freshly added
00088         itr = filters_.end();
00089         --itr;
00090     }
00091 
00092     //2) Provide the firstUpdate: if the property is a Setting and no cycleSelector is specified and device/property is multiplexed
00093     // firstUpdate send back one by one the current value of all cycles.
00094     // itr points to the right subscription Filter
00095     if (cycleSelector.empty() && property_.getType()==SETTING && device_.isMultiplexed() && property_.isMultiplexed())
00096     {//we have to return a value for EACH cycle !
00097         (*itr)->allCyclesFirstUpdate(newFilter, subscriber);
00098     }
00099     else //standard first update
00100     {
00101         (*itr)->cycleFirstUpdate(cycleSelector, newFilter, subscriber);
00102     }
00103 }
00104 
00105 void SubscriptionParameter::removeSubscriber(const std::string& cycleSelector, rdaValueChangeListener* subscriber, bool& isEmpty)
00106 {
00107     SubscriptionFilterItr itr;
00108     for (itr = filters_.begin(); itr != filters_.end(); ++itr) // iterate through all the registered SubscriptionFilter
00109     {
00110         bool isFilterEmpty = false; // tells if the SubscriptionFilter is empty
00111         bool toBeDeleted = true;    // tells if the SubscriptionFilter can be deleted in case it is in use by the NotificationThread
00112         if ((*itr)->removeSubscriber(cycleSelector, subscriber, isFilterEmpty, toBeDeleted))
00113         {
00114             if (isFilterEmpty)
00115             {
00116                 filters_.erase(itr);
00117             }
00118             break;
00119         }
00120     }
00121     // set isEmpty
00122     isEmpty = filters_.empty();
00123 }
00124 
00125 void SubscriptionParameter::notify(MultiplexingContext& muxContext, RequestType& reqType,
00126                                    std::vector<std::string>& validCS)
00127 {
00128     SubscriptionFilterItr itr = filters_.begin();
00129     SubscriptionFilterItr itrEnd = filters_.end();
00130     for (; itr != itrEnd; ++itr)
00131     {
00132         (*itr)->notify(muxContext, reqType, validCS);
00133     }
00134 }
00135 
00136 void SubscriptionParameter::reportInternalError(const std::string& errorCategory, int32_t errorCode, const std::string&  message, bool changed)
00137 {
00138     SubscriptionFilterItr itr;
00139     for (itr = filters_.begin(); itr != filters_.end(); ++itr) // iterate through all the registered SubscriptionFilter
00140     {
00141         (*itr)->reportInternalError(errorCategory, errorCode, message, changed);
00142     }
00143 }
00144 
00145 void SubscriptionParameter::printDebugState(std::stringstream& str)
00146 {
00147     str << "\t\tParam: " << name_ << "\n";
00148     SubscriptionFilterItr itr = filters_.begin();
00149     SubscriptionFilterItr itrEnd = filters_.end();
00150     for (; itr != itrEnd; ++itr)
00151     {
00152         (*itr)->printDebugState(str);
00153     }
00154 }
00155 
00156 } // fesa

Generated on 18 Jan 2013 for Fesa by  doxygen 1.6.1