NotificationThread.cpp
Go to the documentation of this file.
00002
00003 #include <fesa-core/RDADeviceServer/NotificationThread.h>
00004
00005 #include <fesa-core/RDADeviceServer/SubscriptionFilter.h>
00006 #include <fesa-core/Core/AbstractEquipment.h>
00007 #include <fesa-core/Core/ThreadPriorityConfiguration.h>
00008 #include <fesa-core/Diagnostic/Diagnostics.h>
00009 #include <fesa-core/Exception/FesaException.h>
00010 #include <fesa-core/Utilities/Lock.h>
00011 #include <fesa-core/Utilities/XMLParser.h>
00012
00013 #include <cmw-log/Logger.h>
00014
00015
00016 namespace
00017 {
00018
00019 const std::string diagTopic = fesa::DiagnosticUtils::rdaTrackingStr;
00020
00021 }
00022
00023
00024 namespace fesa
00025 {
00026
00027 NotificationThread::NotificationThread(const std::string& className, const std::string& notificationThreadKey) :
00028 inUseSubFilter_(NULL)
00029 {
00030 const boost::shared_ptr<Diagnostics>& diagnostics = AbstractEquipment::getInstance()->getDiagnostics();
00031 DiagnosticUtils::DiagnosticMessage diagMsg;
00032 diagMsg.side = DiagnosticUtils::framework;
00033 diagMsg.source = DiagnosticUtils::server;
00034 const std::string threadName("Notif_" + notificationThreadKey);
00035 {
00036 AbstractEquipment* eqp = AbstractEquipment::getInstance();
00037 std::string instanceFile = eqp->getDeviceDataFileName();
00038 XMLParser xmlParser(instanceFile, false);
00039 ThreadPriorityConfigurationFromFile conf(xmlParser, eqp->getProcessConfiguration());
00040 int32_t prio = conf.getPrioNotificationthread(className, notificationThreadKey);
00041 setPriority(prio);
00042 }
00043 try
00044 {
00045 std::ostringstream traceStrStream;
00046 traceStrStream << "Starting notification thread with priority " << getPriority();
00047 diagMsg.msg = traceStrStream.str();
00048 diagnostics->log(diagTopic, diagMsg);
00049 start(false, threadName);
00050 }
00051 catch (FesaException& ex)
00052 {
00053 std::ostringstream traceStrStream;
00054 traceStrStream << "Failed to start notification thread: " << ex.getMessage();
00055 diagMsg.msg = traceStrStream.str();
00056 diagnostics->log(diagTopic, diagMsg);
00057 }
00058 }
00059
00060 NotificationThread::~NotificationThread()
00061 {
00062 }
00063
00064 void NotificationThread::addNotificationEvent(MultiplexingContext& muxContext, RequestType& reqType,
00065 std::vector<std::string>& validCS, SubscriptionFilter* subFilter)
00066 {
00067
00068 boost::shared_ptr<NotificationThread::NotificationQueueElement> pEl(new NotificationThread::NotificationQueueElement(muxContext, reqType, validCS, subFilter));
00069 {
00070 Lock lock(mutexQueue_);
00071 queue_.push_back(pEl);
00072 }
00073 startConsumption_.signal();
00074 }
00075
00076 bool NotificationThread::removeNotificationEvent(SubscriptionFilter* subFilter)
00077 {
00078 Lock lock(mutexQueue_);
00079 std::deque<boost::shared_ptr<NotificationQueueElement> >::iterator iter = queue_.begin();
00080
00081
00082 while (iter != queue_.end())
00083 {
00084 if ((*iter)->subFilter_ == subFilter)
00085 {
00086 boost::shared_ptr<NotificationQueueElement> pEl = (*iter);
00087 iter = queue_.erase(iter);
00088 }
00089 else
00090 {
00091 iter++;
00092 }
00093 }
00094 bool isInUse = (inUseSubFilter_ == subFilter) ? true : false;
00095 return isInUse;
00096 }
00097
00098 void NotificationThread::run()
00099 {
00100 while (isRunning_)
00101 {
00102 boost::shared_ptr<NotificationQueueElement> pEl;
00103 {
00104 Lock lock(mutexQueue_);
00105 inUseSubFilter_ = NULL;
00106 while (queue_.empty())
00107 {
00108 startConsumption_.wait(mutexQueue_);
00109 }
00110
00111
00112 pEl = queue_.front();
00113 queue_.pop_front();
00114
00115 inUseSubFilter_ = pEl->subFilter_;
00116 }
00117 pEl->subFilter_->doUpdate(pEl->muxContext_, pEl->reqType_, pEl->validCS_);
00118 }
00119 hasFinished_ = true;
00120 }
00121
00122 NotificationThread::NotificationQueueElement::NotificationQueueElement(MultiplexingContext& muxContext, RequestType& reqType,
00123 std::vector<std::string>& validCS, SubscriptionFilter* subFilter) :
00124 subFilter_(subFilter),
00125 muxContext_(muxContext),
00126 reqType_(reqType),
00127 validCS_(validCS)
00128 {
00129 }
00130
00131
00132 NotificationThread::NotificationQueueElement::~NotificationQueueElement()
00133 {
00134 }
00135
00136 }