00001
00002
00003 #include <fesa-core/RDADeviceServer/SubscriptionTreeManager.h>
00004
00005 #include <fesa-core/RDADeviceServer/Notification.h>
00006 #include <fesa-core/RDADeviceServer/SubscriptionParameter.h>
00007 #include <fesa-core/Server/Property.h>
00008 #include <fesa-core/DataStore/AbstractDevice.h>
00009 #include <fesa-core/Core/MessageTypes.h>
00010 #include <fesa-core/Core/NotificationIDManager.h>
00011 #include <fesa-core/Core/NotificationID.h>
00012 #include <fesa-core/Utilities/Lock.h>
00013
00014 #include <cmw-rda/ValueChangeListener.h>
00015 #include <cmw-log/Logger.h>
00016
00017 #include <boost/lexical_cast.hpp>
00018
00019 #include <iostream>
00020
00021 namespace
00022 {
00023
00024 CMW::Log::Logger& logger = CMW::Log::LoggerFactory::getLogger("FESA.FWK.fesa-core.RDADeviceServer.SubscriptionTreeManager");
00025
00029 static fesa::Mutex singleMutex;
00030
00031 }
00032
00033 namespace fesa
00034 {
00035
00036 SubscriptionTreeManager::SubscriptionTreeManager(bool useFastUpdate) :
00037 initializationSucessfull_(false), lastErrorCode_(0), useFastUpdate_(useFastUpdate)
00038 {
00039
00040 }
00041
00042 SubscriptionTreeManager::~SubscriptionTreeManager()
00043 {
00044
00045 std::map<uint32_t, boost::shared_ptr<Notification> >::iterator iterN;
00046 for (iterN = notifications_.begin(); iterN != notifications_.end(); iterN++)
00047 {
00048 notifications_.erase(iterN);
00049 }
00050
00051
00052 std::map<std::string, boost::shared_ptr<SubscriptionParameter> >::iterator iterS;
00053 for (iterS = subscriptions_.begin(); iterS != subscriptions_.end(); iterS++)
00054 {
00055 subscriptions_.erase(iterS);
00056 }
00057 }
00058
00059
00060
00061
00062
00063
00064
00065
00066 void SubscriptionTreeManager::addSubscriber(const std::string& className, AbstractDevice& device, Property& property, const std::string& cycleSelector,
00067 const rdaData& filter, rdaValueChangeListener* subscriber)
00068 {
00069
00070
00071
00072
00073
00074
00075
00076 Lock l(treeMutex_);
00077
00078 if (!initializationSucessfull_)
00079 {
00080 initNotifications();
00081 }
00082
00083 std::string paramName = device.getName() + std::string("#") + property.getName();
00084 boost::shared_ptr<SubscriptionParameter> sp = addSubscriptionParam(paramName, device, property, cycleSelector, filter, subscriber);
00085 subscriptionCycle_.addCycle(cycleSelector);
00086
00087
00088
00089 NotificationItr itr = notifications_.begin();
00090 NotificationItr itrEnd = notifications_.end();
00091 for(; itr != itrEnd; ++itr)
00092 {
00093 if((*itr).second->contains(device.getName(), property.getName()))
00094 {
00095 if (sp.get() != NULL)
00096 (*itr).second->addSubscriber(sp);
00097 (*itr).second->addCycle(cycleSelector);
00098 }
00099 }
00100
00101 if (subscriber->isOnChange() && !property.isOnChange())
00102 {
00103 std::string message("Warning: Subscription OnChange is not supported by this property");
00104 subscriber->ioFailed(rdaIOError("FESA_RDADeviceServer", 0, message.c_str()));
00105 }
00106
00107 if (logger.isLoggable(CMW::Log::Level::LL_DEBUG))
00108 {
00109 std::string s("add parameter: ");
00110 s.append(paramName);
00111 s.append(" listener: ");
00112 char buf[16];
00113 snprintf(buf, 16, "%p", subscriber);
00114 s.append(buf);
00115 printState(s);
00116 }
00117
00118 }
00119
00120 void SubscriptionTreeManager::removeSubscriber(AbstractDevice& device, Property& property, const std::string& cycleSelector,
00121 rdaValueChangeListener* subscriber)
00122 {
00123
00124
00125
00126
00127 Lock l(treeMutex_);
00128 std::string paramName = device.getName() + std::string("#") + property.getName();
00129 SubscriptionItr subParamItr = subscriptions_.find(paramName);
00130 if (subParamItr != subscriptions_.end())
00131 {
00132
00133
00134 bool isEmpty = false;
00135 (*subParamItr).second->removeSubscriber(cycleSelector, subscriber, isEmpty);
00136
00137 NotificationItr notifItr = notifications_.begin();
00138 NotificationItr notifItrEnd = notifications_.end();
00139 {
00140 for(; notifItr != notifItrEnd; ++notifItr)
00141 {
00142 (*notifItr).second->removeSubscriber((*subParamItr).second, cycleSelector, isEmpty);
00143 }
00144 }
00145 subscriptionCycle_.removeCycle(cycleSelector);
00146 if (isEmpty)
00147 {
00148 boost::shared_ptr<SubscriptionParameter> sp = (*subParamItr).second;
00149 subscriptions_.erase(subParamItr);
00150 }
00151 }
00152 else
00153 {
00154
00155 }
00156
00157 if (logger.isLoggable(CMW::Log::Level::LL_DEBUG))
00158 {
00159 std::string s("remove parameter: ");
00160 s.append(paramName);
00161 s.append(" listener: ");
00162 char buf[16];
00163 snprintf(buf, 16, "%p", subscriber);
00164 s.append(buf);
00165 printState(s);
00166 }
00167 }
00168
00169 void SubscriptionTreeManager::notify(uint32_t IDKey, MultiplexingContext& muxContext,
00170 RequestType& reqType)
00171 {
00172
00173 Lock l(treeMutex_);
00174
00175 if (!initializationSucessfull_)
00176 {
00177 initNotifications();
00178 }
00179
00180 NotificationItr itr = notifications_.find(IDKey);
00181 if(itr != notifications_.end())
00182 {
00183
00184 (*itr).second->notify(muxContext, reqType);
00185 if (useFastUpdate_) {
00186 const rdaData emptyData;
00187 rdaValueChangeListener::flush(emptyData);
00188 }
00189 }
00190 else
00191 {
00192 std::ostringstream msg;
00193 msg << "notify error, IDKey not found " << boost::lexical_cast<std::string>(IDKey);
00194 LOG_ERROR_IF(logger, msg.str());
00195 }
00196 }
00197
00198 void SubscriptionTreeManager::notify(ManualNotificationMessage* msg, RequestType& reqType)
00199 {
00200 std::vector<std::string> devices;
00201 std::vector<std::string> properties;
00202
00203 std::vector<std::string> tmp;
00204
00205
00206
00207 std::vector<std::string> validCS;
00208
00209 subscriptionCycle_.matchContext(*(msg->muxContext_), validCS);
00210
00211
00212 if (validCS.size() != 0)
00213 {
00214 try {
00215
00216 Lock l(treeMutex_);
00217 for (std::map<std::string,std::vector<std::string> >::iterator pADItr = msg->propertiesAndDevicesCol_.begin(); pADItr != msg->propertiesAndDevicesCol_.end(); ++pADItr)
00218 {
00219 for (std::vector<std::string>::iterator devIter = pADItr->second.begin(); devIter != pADItr->second.end(); ++devIter)
00220 {
00221 std::string paramName = *devIter + std::string("#") + pADItr->first;
00222 SubscriptionItr subParamItr = subscriptions_.find(paramName);
00223 if (subParamItr != subscriptions_.end())
00224 {
00225 (*subParamItr).second->notify(*(msg->muxContext_), reqType, validCS);
00226 }
00227 }
00228 }
00229 }
00230 catch(FesaException& ex)
00231 {
00232
00233 }
00234 if (useFastUpdate_) {
00235 const rdaData emptyData;
00236 rdaValueChangeListener::flush(emptyData);
00237 }
00238 }
00239 }
00240
00241 void SubscriptionTreeManager::reportInternalError(const std::string& errorCategory, int32_t errorCode, const std::string& message)
00242 {
00243
00244 Lock l(treeMutex_);
00245 bool changed = (errorCode == lastErrorCode_) ? false : true;
00246 SubscriptionItr subItr = subscriptions_.begin();
00247 SubscriptionItr subItrEnd = subscriptions_.end();
00248 for(; subItr != subItrEnd; ++subItr)
00249 {
00250 (*subItr).second->reportInternalError(errorCategory, errorCode, message, changed);
00251 }
00252 lastErrorCode_ = errorCode;
00253 }
00254
00255 boost::shared_ptr<SubscriptionParameter> SubscriptionTreeManager::addSubscriptionParam(const std::string& paramName, AbstractDevice& device,
00256 Property& property, const std::string& cycleSelector,
00257 const rdaData& filter, rdaValueChangeListener* subscriber)
00258 {
00259 boost::shared_ptr<SubscriptionParameter> sp((SubscriptionParameter*)NULL);
00260 SubscriptionItr itr = subscriptions_.find(paramName);
00261 if(itr != subscriptions_.end())
00262 {
00263 (*itr).second->addSubscriber(cycleSelector, filter, subscriber);
00264
00265 }
00266 else
00267 {
00268 NotificationThread* pNotificationThread = &(notificationThreadFactory_.getThread(property.getClassName(), property.getNotificationThreadKey()));
00269
00270
00271 sp.reset(new SubscriptionParameter(paramName, device, property, useFastUpdate_, pNotificationThread));
00272
00273 sp->addSubscriber(cycleSelector, filter, subscriber);
00274
00275 subscriptions_[paramName] = sp;
00276 }
00277
00278 return sp;
00279 }
00280
00281 void SubscriptionTreeManager::initNotifications()
00282 {
00283 NotificationIDManager* idGenerator = NotificationIDManager::getInstance();
00284 const std::map<uint32_t, NotificationID*>& idsCol = idGenerator->getNotificationIDCol();
00285
00286
00287
00288 std::map<uint32_t, NotificationID*>::const_iterator notIter;
00289 for (notIter = idsCol.begin(); notIter != idsCol.end(); notIter++)
00290 {
00291
00292 boost::shared_ptr<Notification> notif(new Notification(*((*notIter).second)));
00293 notifications_.insert(std::pair<uint32_t, boost::shared_ptr<Notification> >((*notIter).second->getIDKey(), notif));
00294 }
00295 initializationSucessfull_ = true;
00296
00297
00298
00299
00300
00301
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312
00313
00314
00315
00316
00317 }
00318
00319 void SubscriptionTreeManager::printState(std::string& header)
00320 {
00321 std::stringstream str;
00322 str << header << "\nSubscriptionTreeManager Configuration:\n\tNotif Configuration:\n";
00323 NotificationItr itr = notifications_.begin();
00324 NotificationItr itrEnd = notifications_.end();
00325 for(; itr != itrEnd; ++itr)
00326 {
00327 (*itr).second->printDebugState(str);
00328 }
00329
00330 str << "\tCycleSubscription Configuration:\n";
00331 subscriptionCycle_.printDebugState(str,2);
00332
00333 str << "\tSubscriptionParameter Configuration:\n";
00334 SubscriptionItr subItr = subscriptions_.begin();
00335 SubscriptionItr subItrEnd = subscriptions_.end();
00336 for(; subItr != subItrEnd; ++subItr)
00337 {
00338 (*subItr).second->printDebugState(str);
00339 }
00340 LOG_DEBUG_IF(logger, str.str());
00341 }
00342 }