00001
00002
00003 #include <fesa-core/RDADeviceServer/SubscriptionFilter.h>
00004
00005 #include <fesa-core/RDADeviceServer/NotificationThread.h>
00006
00007 #include <fesa-core/Server/Property.h>
00008 #include <fesa-core/DataStore/AbstractDevice.h>
00009 #include <fesa-core/Synchronization/SynchronizationLabObjectFactory.h>
00010 #include <fesa-core/Synchronization/CycleDescriptor.h>
00011 #include <fesa-core/Utilities/Lock.h>
00012 #include <fesa-core/Utilities/Mutex.h>
00013
00014 #include <cmw-rda/ValueChangeListener.h>
00015 #include <cmw-rda/Data.h>
00016 #include <cmw-log/Logger.h>
00017
00018 #include <boost/lexical_cast.hpp>
00019
00020 #include <iostream>
00021
00022 namespace
00023 {
00024 CMW::Log::Logger& logger = CMW::Log::LoggerFactory::getLogger("FESA.FWK.fesa-core.RDADeviceServer.SubscriptionFilter");
00025 }
00026
00027
00028 namespace fesa
00029 {
00030
00031 AbstractSubscriptionFilter::AbstractSubscriptionFilter(AbstractDevice& device, Property& property, const rdaData& newFilter, bool useFastUpdate) :
00032 device_(device),
00033 property_(property),
00034 filter_(newFilter),
00035 useFastUpdate_(useFastUpdate)
00036 {
00037 }
00038
00039 AbstractSubscriptionFilter::~AbstractSubscriptionFilter()
00040 {
00041 }
00042
00043 void AbstractSubscriptionFilter::addSubscriber(const std::string& cycleSelector, rdaValueChangeListener* subscriber)
00044 {
00045
00046 boost::shared_ptr<FesaListener> l(new FesaListener(subscriber));
00047
00048 LstnrMapItr itr = listeners_.find(cycleSelector);
00049 if (itr != listeners_.end())
00050 {
00051
00052 (*itr).second->push_back(l);
00053 }
00054 else
00055 {
00056 boost::shared_ptr<ListenerVector> lv(new ListenerVector());
00057 lv->push_back(l);
00058 listeners_[cycleSelector] = lv;
00059 }
00060 }
00061
00062 bool AbstractSubscriptionFilter::removeSubscriber(const std::string& cycleSelector, rdaValueChangeListener* subscriber, bool& isEmpty, bool& toBeDeleted)
00063 {
00064 LstnrMapItr itr = listeners_.find(cycleSelector);
00065 if (itr != listeners_.end())
00066 {
00067 LstnrItr itrLstnr = (*itr).second->begin();
00068 LstnrItr itrEndLstnr = (*itr).second->end();
00069 for (; itrLstnr != itrEndLstnr; ++itrLstnr)
00070 {
00071 if (((*itrLstnr)->subscriber_) == subscriber)
00072 {
00073 (*itr).second->erase(itrLstnr);
00074 if ((*itr).second->size() == 0)
00075 {
00076 listeners_.erase(itr);
00077 }
00078
00079 isEmpty = listeners_.empty();
00080
00081 toBeDeleted = isEmpty;
00082 return true;
00083 }
00084 }
00085 }
00086 isEmpty = listeners_.empty();
00087 toBeDeleted = isEmpty;
00088
00089 return false;
00090 }
00091
00092 bool AbstractSubscriptionFilter::matchFilter(const rdaData& newfilter)
00093 {
00094 if (newfilter.size() == filter_.size())
00095 {
00096 rdaDataIterator newFilterIter(newfilter);
00097 rdaDataIterator oldFilterIter(filter_);
00098 rdaDataEntry* newFilterEntry = newFilterIter.next();
00099 rdaDataEntry* oldFilterEntry = oldFilterIter.next();
00100 if (newFilterEntry == NULL && oldFilterEntry == NULL)
00101 return true;
00102 while (*newFilterEntry == *oldFilterEntry)
00103 {
00104
00105 newFilterEntry = newFilterIter.next();
00106 oldFilterEntry = oldFilterIter.next();
00107 if (newFilterEntry == NULL && oldFilterEntry == NULL)
00108 return true;
00109 }
00110 }
00111 return false;
00112 }
00113
00114 void AbstractSubscriptionFilter::cycleFirstUpdate(const std::string& cycleSelector, const rdaData& newFilter, rdaValueChangeListener* l)
00115 {
00116 const rdaData emptydata;
00117 rdaData value;
00118 try
00119 {
00120 property_.get(device_, cycleSelector, newFilter, value, SUBSCRIBE);
00121 }
00122 catch (FesaException& ex)
00123 {
00124 try
00125 {
00126 l->ioFailed(rdaIOError("FESA", ex.getErrorCodeAsLong(), ex.getMessage().c_str()));
00127 }
00128 catch (rdaInternalError& ex)
00129 {
00130 }
00131 return;
00132 }
00133 try
00134 {
00135
00136 l->valueUpdated(emptydata, value, true);
00137 }
00138 catch (rdaInternalError& ex)
00139 {
00140 };
00141
00142 }
00143
00144 void AbstractSubscriptionFilter::allCyclesFirstUpdate(const rdaData& filter, rdaValueChangeListener* l)
00145 {
00146
00147 SynchronizationLabObjectFactory* syncFactory = SynchronizationLabObjectFactory::getInstance();
00148 CycleDescriptor* timingDescriptor = syncFactory->getCycleDescriptor(device_.timingDomain.get(),device_.mainMuxCriterion.get());
00149
00150 const std::vector<int32_t>& idCol = timingDescriptor->getCycleIdsCol();
00151
00152
00153 for (std::vector<int32_t>::const_iterator itr = idCol.begin(); itr != idCol.end(); itr++)
00154 {
00155
00156
00157
00158 std::string cycleName = timingDescriptor->getCycleSelectorName((*itr));
00159 cycleFirstUpdate(cycleName, filter, l);
00160 }
00161 }
00162
00163
00164 void AbstractSubscriptionFilter::reportInternalError(const std::string& category, int32_t errorCode, const std::string& message, bool changed)
00165 {
00166 char* c = const_cast<char*> (category.c_str());
00167 char* m = const_cast<char*> (message.c_str());
00168 rdaData emptydata;
00169
00170 LstnrMapItr itr = listeners_.begin();
00171 LstnrMapItr itrEnd = listeners_.end();
00172 for(; itr!=itrEnd; ++itr)
00173 {
00174 LstnrItr itrLstnr = (*itr).second->begin();
00175 LstnrItr itrEndLstnr = (*itr).second->end();
00176 for (; itrLstnr != itrEndLstnr; ++itrLstnr)
00177 {
00178 try {
00179 bool newError = changed;
00180 if (!(*itrLstnr)->reportErrorOnce_)
00181 {
00182 newError = true;
00183 (*itrLstnr)->reportErrorOnce_ = true;
00184 }
00185 (*itrLstnr)->subscriber_->ioFailed(c,errorCode,m,newError);
00186 }
00187 catch (rdaInternalError& ex)
00188 {
00189 }
00190 }
00191 }
00192 rdaValueChangeListener::flush(emptydata);
00193 }
00194
00195 void AbstractSubscriptionFilter::printDebugState(std::stringstream& str)
00196 {
00197 std::stringstream temp;
00198 temp << filter_;
00199 std::string s(temp.str());
00200 std::string::size_type pos;
00201 while ((pos=s.find('\n')) != std::string::npos)
00202 {
00203 s.replace(pos,1," ");
00204 }
00205 str << "\t\t\tFilter: " << s << "\n";
00206 LstnrMapItr itr = listeners_.begin();
00207 LstnrMapItr itrEnd = listeners_.end();
00208 for(; itr!=itrEnd; ++itr)
00209 {
00210 str << "\t\t\t\tCycleSelector: " << (*itr).first << "\n";
00211 LstnrItr itrLstnr = (*itr).second->begin();
00212 LstnrItr itrEndLstnr = (*itr).second->end();
00213 str << "\t\t\t\tListeners:\n";
00214 for (; itrLstnr != itrEndLstnr; ++itrLstnr)
00215 {
00216 str << "\t\t\t\t\t" << (*itrLstnr)->subscriber_ << "\n";
00217 }
00218 }
00219 }
00220
00221
00222
00223
00225 SubscriptionFilter::SubscriptionFilter(AbstractDevice& device, Property& property, const rdaData& newFilter, bool useFastUpdate, NotificationThread* pNotificationThread):
00226 AbstractSubscriptionFilter(device, property, newFilter, useFastUpdate), pNotificationThread_(pNotificationThread), cancelled_(false)
00227 {
00228 }
00229
00230 SubscriptionFilter::~SubscriptionFilter()
00231 {
00232
00233 }
00234
00235 void SubscriptionFilter::addSubscriber(const std::string& cycleSelector, rdaValueChangeListener* subscriber)
00236 {
00237
00238 boost::shared_ptr<FesaListener> l(new FesaListener(subscriber));
00239
00240
00241 Lock lockListener(pNotificationThread_->getMutex());
00242 LstnrMapItr itr = listeners_.find(cycleSelector);
00243 if (itr != listeners_.end())
00244 {
00245
00246 (*itr).second->push_back(l);
00247 }
00248 else
00249 {
00250 boost::shared_ptr<ListenerVector> lv(new ListenerVector());
00251 lv->push_back(l);
00252 listeners_[cycleSelector] = lv;
00253 }
00254 }
00255
00256
00257
00258 bool SubscriptionFilter::removeSubscriber(const std::string& cycleSelector, rdaValueChangeListener* subscriber, bool& isEmpty, bool& toBeDeleted)
00259 {
00260 LOG_DEBUG_IF(logger, "removeSubscriber");
00261
00262 Lock lockListener(pNotificationThread_->getMutex());
00263 LstnrMapItr itr = listeners_.find(cycleSelector);
00264 if (itr != listeners_.end())
00265 {
00266 LstnrItr itrLstnr = (*itr).second->begin();
00267 LstnrItr itrEndLstnr = (*itr).second->end();
00268 for (; itrLstnr != itrEndLstnr; ++itrLstnr)
00269 {
00270 if ((*itrLstnr)->subscriber_ == subscriber)
00271 {
00272 (*itr).second->erase(itrLstnr);
00273 if ((*itr).second->size() == 0)
00274 {
00275 listeners_.erase(itr);
00276 }
00277
00278 isEmpty = listeners_.empty();
00279 toBeDeleted = isEmpty;
00280 if (isEmpty && pendingNotification_)
00281 {
00282
00283
00284 bool isInUse = pNotificationThread_->removeNotificationEvent(this);
00285 pendingNotification_--;
00286 if (isInUse)
00287 {
00288 toBeDeleted = false;
00289 cancelled_ = true;
00290 }
00291 }
00292 return true;
00293 }
00294 }
00295 }
00296 isEmpty = listeners_.empty();
00297
00298 return false;
00299 }
00300
00301
00302
00303 void SubscriptionFilter::notify(MultiplexingContext& muxContext,
00304 RequestType& reqType, std::vector<std::string>& validCS)
00305 {
00306
00307 std::vector<std::string>::iterator itrValidCS = validCS.begin();
00308 std::vector<std::string>::iterator itrEndValidCS = validCS.end();
00309
00310
00311 Lock lockListener(pNotificationThread_->getMutex());
00312 for (; itrValidCS != itrEndValidCS; ++itrValidCS)
00313 {
00314
00315 LstnrMapItr itrLstnrMap = listeners_.find((*itrValidCS));
00316 if (itrLstnrMap != listeners_.end())
00317 {
00318 pNotificationThread_->addNotificationEvent(muxContext, reqType, validCS, this);
00319 ++pendingNotification_;
00320 }
00321 }
00322 }
00323
00324
00325
00326 void SubscriptionFilter::doUpdate(MultiplexingContext& muxContext, RequestType& reqType, std::vector<std::string>& validCS)
00327 {
00328
00329 std::vector<std::string>::iterator itrValidCS = validCS.begin();
00330 std::vector<std::string>::iterator itrEndValidCS = validCS.end();
00331 const rdaData emptydata;
00332 rdaData pushdata;
00333 const rdaData* dp = &pushdata;
00334 bool gotData = false;
00335 bool gotError = false;
00336 std::string currentError("");
00337 int32_t currentErrorCode = 0;
00338 bool needData = false;
00339 int64_t dataStamp;
00340
00341 Mutex& notificationThreadMutex = pNotificationThread_->getMutex();
00342
00343 {
00344 Lock lockListener(notificationThreadMutex);
00345 for (; itrValidCS != itrEndValidCS; ++itrValidCS)
00346 {
00347
00348 LstnrMapItr itrLstnrMap = listeners_.find((*itrValidCS));
00349 if (itrLstnrMap != listeners_.end())
00350 {
00351 needData = true;
00352 break;
00353 }
00354 }
00355 pendingNotification_--;
00356 }
00357
00358
00359
00360
00361 if(needData)
00362 {
00363 try
00364 {
00365 dataStamp = property_.getDataTimestamp(device_, muxContext, filter_);
00366 property_.get(device_, muxContext, filter_, pushdata, reqType);
00367 gotData = true;
00368 }
00369 catch (FesaException& ex)
00370 {
00371
00372 currentErrorCode = static_cast<int32_t>(ex.getErrorCodeAsLong());
00373 currentError = ex.getMessage();
00374 gotError = true;
00375 }
00376 }
00377
00378
00379 {
00380 Lock lockListener(notificationThreadMutex);
00381 if (cancelled_)
00382 {
00383
00384 delete this;
00385 return;
00386 }
00387 for (; itrValidCS != itrEndValidCS; ++itrValidCS)
00388 {
00389
00390 LstnrMapItr itrLstnrMap = listeners_.find((*itrValidCS));
00391 if (itrLstnrMap != listeners_.end())
00392 {
00393 LstnrItr itrLstnr = (*itrLstnrMap).second->begin();
00394 LstnrItr itrEndLstnr = (*itrLstnrMap).second->end();
00395 for (; itrLstnr != itrEndLstnr; ++itrLstnr)
00396 {
00397 if (gotData)
00398 {
00399
00400 bool isOnChange = (*itrLstnr)->subscriber_->isOnChange();
00401 if (gotData && (!isOnChange || (isOnChange && dataStamp) > (*itrLstnr)->updateStamp_))
00402 {
00403 try
00404 {
00405 if (useFastUpdate_)
00406 (*itrLstnr)->subscriber_->valueUpdated(*dp, true);
00407 else
00408 (*itrLstnr)->subscriber_->valueUpdated(emptydata, *dp, true);
00409 (*itrLstnr)->updateStamp_ = dataStamp;
00410 }
00411 catch (rdaInternalError& ie)
00412 {
00413 };
00414 }
00415 }
00416 if (gotError)
00417 {
00418 try
00419 {
00420 (*itrLstnr)->subscriber_->ioFailed(rdaIOError("FESA", currentErrorCode,
00421 currentError.c_str()));
00422 continue;
00423 }
00424 catch (rdaInternalError& ie)
00425 {
00426
00427 continue;
00428 }
00429 }
00430 }
00431 }
00432 }
00433 }
00434
00435 }
00436
00437 }