SubscriptionFilter.cpp

Go to the documentation of this file.
00001 // Copyright CERN 2012 - Developed in collaboration with GSI
00002 
00003 #include <fesa-core/RDADeviceServer/SubscriptionFilter.h>
00004 
00005 #include <fesa-core/RDADeviceServer/NotificationThread.h>
00006 // #include <fesa-core/RDADeviceServer/FesaDeviceServer.h> // TODO this dependency is just to retrieve the update policy .. to be reviewed
00007 #include <fesa-core/Server/Property.h>
00008 #include <fesa-core/DataStore/AbstractDevice.h>
00009 #include <fesa-core/Synchronization/SynchronizationLabObjectFactory.h>
00010 #include <fesa-core/Synchronization/CycleDescriptor.h>
00011 #include <fesa-core/Utilities/Lock.h>
00012 #include <fesa-core/Utilities/Mutex.h>
00013 
00014 #include <cmw-rda/ValueChangeListener.h>
00015 #include <cmw-rda/Data.h>
00016 #include <cmw-log/Logger.h>
00017 
00018 #include <boost/lexical_cast.hpp>
00019 
00020 #include <iostream>
00021 
00022 namespace
00023 {
00024 CMW::Log::Logger& logger = CMW::Log::LoggerFactory::getLogger("FESA.FWK.fesa-core.RDADeviceServer.SubscriptionFilter");
00025 }
00026 
00027 
00028 namespace fesa
00029 {
00030 
00031 AbstractSubscriptionFilter::AbstractSubscriptionFilter(AbstractDevice& device, Property& property, const rdaData& newFilter, bool useFastUpdate) :
00032     device_(device),
00033     property_(property),
00034     filter_(newFilter),
00035     useFastUpdate_(useFastUpdate)
00036 {
00037 }
00038 
00039 AbstractSubscriptionFilter::~AbstractSubscriptionFilter()
00040 {
00041 }
00042 
00043 void AbstractSubscriptionFilter::addSubscriber(const std::string& cycleSelector, rdaValueChangeListener* subscriber)
00044 {
00045     // Create a new Listener
00046     boost::shared_ptr<FesaListener> l(new FesaListener(subscriber));
00047 
00048     LstnrMapItr itr = listeners_.find(cycleSelector);
00049     if (itr != listeners_.end())
00050     {
00051         // we found a listenerVector for this cycleSelector. Add the new listener
00052         (*itr).second->push_back(l);
00053     }
00054     else
00055     {
00056         boost::shared_ptr<ListenerVector> lv(new ListenerVector());
00057         lv->push_back(l);
00058         listeners_[cycleSelector] = lv;
00059     }
00060 }
00061 
00062 bool AbstractSubscriptionFilter::removeSubscriber(const std::string& cycleSelector, rdaValueChangeListener* subscriber, bool& isEmpty, bool& toBeDeleted)
00063 {
00064     LstnrMapItr itr = listeners_.find(cycleSelector);
00065     if (itr != listeners_.end())
00066     {
00067         LstnrItr itrLstnr = (*itr).second->begin();
00068         LstnrItr itrEndLstnr = (*itr).second->end();
00069         for (; itrLstnr != itrEndLstnr; ++itrLstnr)
00070         {
00071             if (((*itrLstnr)->subscriber_) == subscriber) // check if the two references referred to the same rdaValueChangeListener instance
00072             {
00073                 (*itr).second->erase(itrLstnr);
00074                 if ((*itr).second->size() == 0)
00075                 {
00076                     listeners_.erase(itr);
00077                 }
00078                 // leave the loop
00079                 isEmpty = listeners_.empty();
00080                 // only useful for background subsriptionFilter
00081                 toBeDeleted = isEmpty;
00082                 return true;
00083             }
00084         }
00085     }
00086     isEmpty = listeners_.empty();
00087     toBeDeleted = isEmpty;
00088     // subscriber was not found.
00089     return false;
00090 }
00091 
00092 bool AbstractSubscriptionFilter::matchFilter(const rdaData& newfilter)
00093 {
00094     if (newfilter.size() == filter_.size()) //could match ..lets try
00095     {
00096         rdaDataIterator newFilterIter(newfilter);
00097         rdaDataIterator oldFilterIter(filter_);
00098         rdaDataEntry* newFilterEntry = newFilterIter.next();
00099         rdaDataEntry* oldFilterEntry = oldFilterIter.next();
00100         if (newFilterEntry == NULL && oldFilterEntry == NULL)
00101             return true; //we found an empty filter matching the new empty filter
00102         while (*newFilterEntry == *oldFilterEntry)//as long, as filter's entry do not differ
00103         {
00104             //compare next elements of the two filters
00105             newFilterEntry = newFilterIter.next();
00106             oldFilterEntry = oldFilterIter.next();
00107             if (newFilterEntry == NULL && oldFilterEntry == NULL)
00108                 return true;//we reached the end of the filter, without getting kicked out --> we have a match
00109         }//end while -- comparison of two filters
00110     }//end if filter size is equal
00111     return false;//we reach the end of the loop?  --> nothing found
00112 }
00113 
00114 void AbstractSubscriptionFilter::cycleFirstUpdate(const std::string& cycleSelector, const rdaData& newFilter, rdaValueChangeListener* l)
00115 {
00116     const rdaData emptydata;
00117     rdaData value;
00118     try
00119     {
00120         property_.get(device_, cycleSelector, newFilter, value, SUBSCRIBE);
00121     }
00122     catch (FesaException& ex)
00123     {
00124         try
00125         {
00126             l->ioFailed(rdaIOError("FESA", ex.getErrorCodeAsLong(), ex.getMessage().c_str()));
00127         }
00128         catch (rdaInternalError& ex)
00129         {
00130         }
00131         return;
00132     }
00133     try
00134     {
00135         //thats the "direct","slow" notification
00136         l->valueUpdated(emptydata, value, true);
00137     }
00138     catch (rdaInternalError& ex)
00139     {
00140     };
00141 
00142 }
00143 
00144 void AbstractSubscriptionFilter::allCyclesFirstUpdate(const rdaData& filter, rdaValueChangeListener* l)
00145 {
00146     // get the correspondent cycle descriptor
00147     SynchronizationLabObjectFactory* syncFactory = SynchronizationLabObjectFactory::getInstance();
00148     CycleDescriptor* timingDescriptor = syncFactory->getCycleDescriptor(device_.timingDomain.get(),device_.mainMuxCriterion.get());
00149 
00150     const std::vector<int32_t>& idCol = timingDescriptor->getCycleIdsCol();
00151 
00152     //for all cycles in this Descriptor
00153     for (std::vector<int32_t>::const_iterator itr = idCol.begin(); itr != idCol.end(); itr++)
00154     {
00155 
00156         // the cycleName is returned by copy, therefore it's safe even if in the meanwhile
00157         // the timingDescriptor selectorNames has been refreshed
00158         std::string cycleName = timingDescriptor->getCycleSelectorName((*itr));
00159         cycleFirstUpdate(cycleName, filter, l);
00160     }
00161 }
00162 
00163 
00164 void AbstractSubscriptionFilter::reportInternalError(const std::string& category, int32_t errorCode, const std::string& message, bool changed)
00165 {
00166     char* c = const_cast<char*> (category.c_str());
00167     char* m = const_cast<char*> (message.c_str());
00168     rdaData emptydata;
00169 
00170     LstnrMapItr itr = listeners_.begin();
00171     LstnrMapItr itrEnd = listeners_.end();
00172     for(; itr!=itrEnd; ++itr)
00173     {
00174         LstnrItr itrLstnr = (*itr).second->begin();
00175         LstnrItr itrEndLstnr = (*itr).second->end();
00176         for (; itrLstnr != itrEndLstnr; ++itrLstnr)
00177         {
00178             try {
00179                 bool newError = changed;
00180                 if (!(*itrLstnr)->reportErrorOnce_)
00181                 {
00182                     newError = true;
00183                     (*itrLstnr)->reportErrorOnce_ = true;
00184                 }
00185                 (*itrLstnr)->subscriber_->ioFailed(c,errorCode,m,newError);
00186             }
00187             catch (rdaInternalError& ex)
00188             {
00189             }
00190         }
00191     }
00192     rdaValueChangeListener::flush(emptydata);
00193 }
00194 
00195 void AbstractSubscriptionFilter::printDebugState(std::stringstream& str)
00196 {
00197     std::stringstream temp;
00198     temp << filter_;
00199     std::string s(temp.str());
00200     std::string::size_type pos;
00201     while ((pos=s.find('\n')) != std::string::npos)
00202     {
00203         s.replace(pos,1," ");
00204     }
00205     str << "\t\t\tFilter: " << s << "\n";
00206     LstnrMapItr itr = listeners_.begin();
00207     LstnrMapItr itrEnd = listeners_.end();
00208     for(; itr!=itrEnd; ++itr)
00209     {
00210         str << "\t\t\t\tCycleSelector: " << (*itr).first << "\n";
00211         LstnrItr itrLstnr = (*itr).second->begin();
00212         LstnrItr itrEndLstnr = (*itr).second->end();
00213         str << "\t\t\t\tListeners:\n";
00214         for (; itrLstnr != itrEndLstnr; ++itrLstnr)
00215         {
00216             str << "\t\t\t\t\t" << (*itrLstnr)->subscriber_ << "\n";
00217         }
00218     }
00219 }
00220 
00221 
00222 
00223 // Specialized class: SubscriptionFilter provide basically a specific implementation for the notify
00225 SubscriptionFilter::SubscriptionFilter(AbstractDevice& device, Property& property, const rdaData& newFilter, bool useFastUpdate, NotificationThread* pNotificationThread):
00226     AbstractSubscriptionFilter(device, property, newFilter, useFastUpdate), pNotificationThread_(pNotificationThread), cancelled_(false)
00227 {
00228 }
00229 
00230 SubscriptionFilter::~SubscriptionFilter()
00231 {
00232 
00233 }
00234 
00235 void SubscriptionFilter::addSubscriber(const std::string& cycleSelector, rdaValueChangeListener* subscriber)
00236 {
00237     // Create a new Listener
00238     boost::shared_ptr<FesaListener> l(new FesaListener(subscriber));
00239 
00240     // CriticalSection requiring to lock the Listeners using the mutex kept by the backgroundThread running notify()
00241     Lock lockListener(pNotificationThread_->getMutex());
00242     LstnrMapItr itr = listeners_.find(cycleSelector);
00243     if (itr != listeners_.end())
00244     {
00245         // we found a listenerVector for this cycleSelector. Add the new listener
00246         (*itr).second->push_back(l);
00247     }
00248     else
00249     {
00250         boost::shared_ptr<ListenerVector> lv(new ListenerVector());
00251         lv->push_back(l);
00252         listeners_[cycleSelector] = lv;
00253     }
00254 }
00255 
00256 
00257 
00258 bool SubscriptionFilter::removeSubscriber(const std::string& cycleSelector, rdaValueChangeListener* subscriber, bool& isEmpty, bool& toBeDeleted)
00259 {
00260     LOG_DEBUG_IF(logger, "removeSubscriber");
00261     // CriticalSection requiring to lock the Listeners using the mutex kept by the backgroundThread running notify()
00262     Lock lockListener(pNotificationThread_->getMutex());
00263     LstnrMapItr itr = listeners_.find(cycleSelector);
00264     if (itr != listeners_.end())
00265     {
00266         LstnrItr itrLstnr = (*itr).second->begin();
00267         LstnrItr itrEndLstnr = (*itr).second->end();
00268         for (; itrLstnr != itrEndLstnr; ++itrLstnr)
00269         {
00270             if ((*itrLstnr)->subscriber_ == subscriber) // check if the two references referred to the same rdaValueChangeListener instance
00271             {
00272                 (*itr).second->erase(itrLstnr);
00273                 if ((*itr).second->size() == 0)
00274                 {
00275                     listeners_.erase(itr);
00276                 }
00277                 // leave the loop
00278                 isEmpty = listeners_.empty();
00279                 toBeDeleted = isEmpty;
00280                 if (isEmpty && pendingNotification_)
00281                 {
00282                     // The subscriptionFilter is empty therefore it will be deleted, but because
00283                     // there is pending notification, we have to remove it from teh background thread queue
00284                     bool isInUse = pNotificationThread_->removeNotificationEvent(this);
00285                     pendingNotification_--;
00286                     if (isInUse)
00287                     {
00288                         toBeDeleted = false;
00289                         cancelled_ = true;
00290                     }
00291                 }
00292                 return true;
00293             }
00294         }
00295     }
00296     isEmpty = listeners_.empty();
00297     // subscriber was not found.
00298     return false;
00299 }
00300 
00301 // The main goal is to minimize the critical code to avoid as more as possible priority inversion, namely,
00302 // a background:notify should not prevent
00303 void SubscriptionFilter::notify(MultiplexingContext& muxContext,
00304                                 RequestType& reqType, std::vector<std::string>& validCS)
00305 {
00306     //Iterate trough the validCS
00307     std::vector<std::string>::iterator itrValidCS = validCS.begin();
00308     std::vector<std::string>::iterator itrEndValidCS = validCS.end();
00309 
00310     // CriticalSection requiring to lock the Listeners
00311     Lock lockListener(pNotificationThread_->getMutex());
00312     for (; itrValidCS != itrEndValidCS; ++itrValidCS)
00313     {
00314         // find an entry using the key
00315         LstnrMapItr itrLstnrMap = listeners_.find((*itrValidCS));
00316         if (itrLstnrMap != listeners_.end())
00317         {
00318             pNotificationThread_->addNotificationEvent(muxContext, reqType, validCS, this);
00319             ++pendingNotification_;
00320         }
00321     }
00322 }
00323 
00324 // The main goal is to minimize the critical code to avoid as more as possible priority inversion, namely,
00325 // a background:notify should not prevent
00326 void SubscriptionFilter::doUpdate(MultiplexingContext& muxContext, RequestType& reqType, std::vector<std::string>& validCS)
00327 {
00328     //Iterate trough the validCS
00329     std::vector<std::string>::iterator itrValidCS = validCS.begin();
00330     std::vector<std::string>::iterator itrEndValidCS = validCS.end();
00331     const rdaData emptydata;
00332     rdaData pushdata;
00333     const rdaData* dp = &pushdata;
00334     bool gotData = false;
00335     bool gotError = false;
00336     std::string currentError("");
00337     int32_t currentErrorCode = 0;
00338     bool needData = false;
00339     int64_t dataStamp;
00340 
00341     Mutex& notificationThreadMutex = pNotificationThread_->getMutex();
00342     // CriticalSection requiring to lock the Listeners
00343     {
00344         Lock lockListener(notificationThreadMutex);
00345         for (; itrValidCS != itrEndValidCS; ++itrValidCS)
00346         {
00347             // find an entry using the key
00348             LstnrMapItr itrLstnrMap = listeners_.find((*itrValidCS));
00349             if (itrLstnrMap != listeners_.end())
00350             {
00351                 needData = true;
00352                 break;
00353             }
00354         }
00355         pendingNotification_--;
00356     }
00357     // end of the critical section: concurrent thread may run and change SubscriptionTree
00358 
00359     // Background property is accessed out of the critical section
00360     // to minimize the notification inversion priority
00361     if(needData)
00362     {
00363         try
00364         {
00365             dataStamp = property_.getDataTimestamp(device_, muxContext, filter_);
00366             property_.get(device_, muxContext, filter_, pushdata, reqType);
00367             gotData = true;
00368         }
00369         catch (FesaException& ex)
00370         {
00371             // cache the errorCode and errorMessage for the following listeners
00372             currentErrorCode = static_cast<int32_t>(ex.getErrorCodeAsLong());
00373             currentError = ex.getMessage();
00374             gotError = true;
00375         }
00376     }
00377 
00378     // CriticalSection requiring to lock the Listeners
00379     {
00380         Lock lockListener(notificationThreadMutex);
00381         if (cancelled_)
00382         {
00383             // This SubscriptionFilter has been mark to be deleted in meanwhile, just return;
00384             delete this;
00385             return;
00386         }
00387         for (; itrValidCS != itrEndValidCS; ++itrValidCS)
00388         {
00389             // find an entry using the key
00390             LstnrMapItr itrLstnrMap = listeners_.find((*itrValidCS));
00391             if (itrLstnrMap != listeners_.end())
00392             {
00393                 LstnrItr itrLstnr = (*itrLstnrMap).second->begin();
00394                 LstnrItr itrEndLstnr = (*itrLstnrMap).second->end();
00395                 for (; itrLstnr != itrEndLstnr; ++itrLstnr)
00396                 {
00397                     if (gotData)
00398                     {
00399                         // FIXME there is probably a bug with the subscriber object when one unsubscribe see LOG_ERROR_IF(logger, boost::lexical_cast<std::string>((*itrLstnr)->subscriber_->isOnChange()));
00400                         bool isOnChange = (*itrLstnr)->subscriber_->isOnChange();
00401                         if (gotData && (!isOnChange || (isOnChange && dataStamp) > (*itrLstnr)->updateStamp_))
00402                         {
00403                             try
00404                             {
00405                                 if (useFastUpdate_)
00406                                     (*itrLstnr)->subscriber_->valueUpdated(*dp, true);
00407                                 else
00408                                     (*itrLstnr)->subscriber_->valueUpdated(emptydata, *dp, true);
00409                                 (*itrLstnr)->updateStamp_ = dataStamp;
00410                             }
00411                             catch (rdaInternalError& ie)
00412                             {
00413                             };
00414                         }
00415                     }
00416                     if (gotError)
00417                     {
00418                         try
00419                         {
00420                             (*itrLstnr)->subscriber_->ioFailed(rdaIOError("FESA", currentErrorCode,
00421                                                                           currentError.c_str()));
00422                             continue;
00423                         }
00424                         catch (rdaInternalError& ie)
00425                         {
00426                             // TODO : continue or break check if it's make sense to try to set all the subscriber
00427                             continue;
00428                         }
00429                     }
00430                 }
00431             }
00432         }
00433     }
00434     // end of the critical section: concurrent thread may run and change SubscriptionTree
00435 }
00436 
00437 } // fesa

Generated on 18 Jan 2013 for Fesa by  doxygen 1.6.1