SubscriptionParameter.cpp
Go to the documentation of this file.00001
00002
00003 #include <fesa-core/RDADeviceServer/SubscriptionParameter.h>
00004
00005 #include <fesa-core/RDADeviceServer/SubscriptionFilter.h>
00006 #include <fesa-core/Server/Property.h>
00007 #include <fesa-core/DataStore/AbstractDevice.h>
00008 #include <fesa-core/Diagnostic/Diagnostics.h>
00009
00010 #include <cmw-log/Logger.h>
00011
00012
00013 namespace
00014 {
00015
00016 const std::string diagTopic = fesa::DiagnosticUtils::rdaTrackingStr;
00017
00018 }
00019
00020
00021 namespace fesa
00022 {
00023
00024 SubscriptionParameter::SubscriptionParameter(const std::string& paramName, AbstractDevice& device,
00025 Property& property, bool useFastUpdate,
00026 NotificationThread* pNotificationThread) :
00027 name_(paramName),
00028 device_(device),
00029 property_(property),
00030 useFastUpdate_(useFastUpdate),
00031 pNotificationThread_(pNotificationThread)
00032 {
00033 const boost::shared_ptr<Diagnostics>& diagnostics = AbstractEquipment::getInstance()->getDiagnostics();
00034 DiagnosticUtils::DiagnosticMessage diagMsg;
00035 diagMsg.side = DiagnosticUtils::framework;
00036 diagMsg.source = DiagnosticUtils::server;
00037 std::ostringstream traceStrStream;
00038 traceStrStream << "Creating subscription parameter with name '" << name_ << "'";
00039 diagMsg.msg = traceStrStream.str();
00040 diagnostics->log(diagTopic, diagMsg);
00041 }
00042
00043 SubscriptionParameter::~SubscriptionParameter()
00044 {
00045 const boost::shared_ptr<Diagnostics>& diagnostics = AbstractEquipment::getInstance()->getDiagnostics();
00046 DiagnosticUtils::DiagnosticMessage diagMsg;
00047 diagMsg.side = DiagnosticUtils::framework;
00048 diagMsg.source = DiagnosticUtils::server;
00049 std::ostringstream traceStrStream;
00050 traceStrStream << "Deleting subscription parameter with name '" << name_ << "'";
00051 diagMsg.msg = traceStrStream.str();
00052 diagnostics->log(diagTopic, diagMsg);
00053 SubscriptionFilterItr itr = filters_.begin();
00054 SubscriptionFilterItr itrEnd = filters_.end();
00055 filters_.erase(itr, itrEnd);
00056 }
00057
00058 void SubscriptionParameter::addSubscriber(const std::string& cycleSelector, const rdaData& newFilter, rdaValueChangeListener* subscriber)
00059 {
00060
00061
00062
00063
00064
00065
00066 SubscriptionFilterItr itr;
00067 for (itr = filters_.begin(); itr != filters_.end(); itr++)
00068 {
00069 if ((*itr)->matchFilter(newFilter))
00070 {
00071
00072 (*itr)->addSubscriber(cycleSelector, subscriber);
00073 break;
00074 }
00075 }
00076 if (itr == filters_.end())
00077 {
00078
00079
00080 boost::shared_ptr<AbstractSubscriptionFilter> sf;
00081 sf.reset((AbstractSubscriptionFilter*)new SubscriptionFilter(device_, property_, newFilter, useFastUpdate_, pNotificationThread_));
00082
00083
00084 sf->addSubscriber(cycleSelector, subscriber);
00085
00086 filters_.push_back(sf);
00087
00088 itr = filters_.end();
00089 --itr;
00090 }
00091
00092
00093
00094
00095 if (cycleSelector.empty() && property_.getType()==SETTING && device_.isMultiplexed() && property_.isMultiplexed())
00096 {
00097 (*itr)->allCyclesFirstUpdate(newFilter, subscriber);
00098 }
00099 else
00100 {
00101 (*itr)->cycleFirstUpdate(cycleSelector, newFilter, subscriber);
00102 }
00103 }
00104
00105 void SubscriptionParameter::removeSubscriber(const std::string& cycleSelector, rdaValueChangeListener* subscriber, bool& isEmpty)
00106 {
00107 SubscriptionFilterItr itr;
00108 for (itr = filters_.begin(); itr != filters_.end(); ++itr)
00109 {
00110 bool isFilterEmpty = false;
00111 bool toBeDeleted = true;
00112 if ((*itr)->removeSubscriber(cycleSelector, subscriber, isFilterEmpty, toBeDeleted))
00113 {
00114 if (isFilterEmpty)
00115 {
00116 filters_.erase(itr);
00117 }
00118 break;
00119 }
00120 }
00121
00122 isEmpty = filters_.empty();
00123 }
00124
00125 void SubscriptionParameter::notify(MultiplexingContext& muxContext, RequestType& reqType,
00126 std::vector<std::string>& validCS)
00127 {
00128 SubscriptionFilterItr itr = filters_.begin();
00129 SubscriptionFilterItr itrEnd = filters_.end();
00130 for (; itr != itrEnd; ++itr)
00131 {
00132 (*itr)->notify(muxContext, reqType, validCS);
00133 }
00134 }
00135
00136 void SubscriptionParameter::reportInternalError(const std::string& errorCategory, int32_t errorCode, const std::string& message, bool changed)
00137 {
00138 SubscriptionFilterItr itr;
00139 for (itr = filters_.begin(); itr != filters_.end(); ++itr)
00140 {
00141 (*itr)->reportInternalError(errorCategory, errorCode, message, changed);
00142 }
00143 }
00144
00145 void SubscriptionParameter::printDebugState(std::stringstream& str)
00146 {
00147 str << "\t\tParam: " << name_ << "\n";
00148 SubscriptionFilterItr itr = filters_.begin();
00149 SubscriptionFilterItr itrEnd = filters_.end();
00150 for (; itr != itrEnd; ++itr)
00151 {
00152 (*itr)->printDebugState(str);
00153 }
00154 }
00155
00156 }