NotificationThread.cpp

Go to the documentation of this file.
00001 // Copyright CERN 2012 - Developed in collaboration with GSI
00002 
00003 #include <fesa-core/RDADeviceServer/NotificationThread.h>
00004 
00005 #include <fesa-core/RDADeviceServer/SubscriptionFilter.h>
00006 #include <fesa-core/Core/AbstractEquipment.h>
00007 #include <fesa-core/Core/ThreadPriorityConfiguration.h>
00008 #include <fesa-core/Diagnostic/Diagnostics.h>
00009 #include <fesa-core/Exception/FesaException.h>
00010 #include <fesa-core/Utilities/Lock.h>
00011 #include <fesa-core/Utilities/XMLParser.h>
00012 
00013 #include <cmw-log/Logger.h>
00014 
00015 
00016 namespace
00017 {
00018 
00019 const std::string diagTopic = fesa::DiagnosticUtils::rdaTrackingStr;
00020 
00021 } // namespace
00022 
00023 
00024 namespace fesa
00025 {
00026 
00027 NotificationThread::NotificationThread(const std::string& className, const std::string& notificationThreadKey) :
00028     inUseSubFilter_(NULL)
00029 {
00030     const boost::shared_ptr<Diagnostics>& diagnostics = AbstractEquipment::getInstance()->getDiagnostics();
00031     DiagnosticUtils::DiagnosticMessage diagMsg;
00032     diagMsg.side = DiagnosticUtils::framework;
00033     diagMsg.source = DiagnosticUtils::server;
00034     const std::string threadName("Notif_" + notificationThreadKey);
00035     {
00036         AbstractEquipment* eqp = AbstractEquipment::getInstance();
00037         std::string instanceFile = eqp->getDeviceDataFileName();
00038         XMLParser xmlParser(instanceFile, false);
00039         ThreadPriorityConfigurationFromFile conf(xmlParser, eqp->getProcessConfiguration());
00040         int32_t prio =  conf.getPrioNotificationthread(className, notificationThreadKey);
00041         setPriority(prio);
00042     }
00043     try
00044     {
00045         std::ostringstream traceStrStream;
00046         traceStrStream << "Starting notification thread with priority " << getPriority();
00047         diagMsg.msg = traceStrStream.str();
00048         diagnostics->log(diagTopic, diagMsg);
00049         start(false, threadName);
00050     }
00051     catch (FesaException& ex)
00052     {
00053         std::ostringstream traceStrStream;
00054         traceStrStream << "Failed to start notification thread: " << ex.getMessage();
00055         diagMsg.msg = traceStrStream.str();
00056         diagnostics->log(diagTopic, diagMsg);
00057     }
00058 }
00059 
00060 NotificationThread::~NotificationThread()
00061 {
00062 }
00063 
00064 void NotificationThread::addNotificationEvent(MultiplexingContext& muxContext, RequestType& reqType,
00065                                               std::vector<std::string>& validCS, SubscriptionFilter* subFilter)
00066 {
00067     // the Stl queue is NOT threadsave .. so we have to use mutexes
00068     boost::shared_ptr<NotificationThread::NotificationQueueElement> pEl(new NotificationThread::NotificationQueueElement(muxContext, reqType, validCS, subFilter));
00069     {
00070         Lock lock(mutexQueue_);
00071         queue_.push_back(pEl);
00072     }
00073     startConsumption_.signal();
00074 }
00075 
00076 bool NotificationThread::removeNotificationEvent(SubscriptionFilter* subFilter)
00077 {
00078     Lock lock(mutexQueue_); //get a lock on the queue
00079     std::deque<boost::shared_ptr<NotificationQueueElement> >::iterator iter = queue_.begin();
00080 
00081     //erase all entrys, with matching subscriber
00082     while (iter != queue_.end())
00083     {
00084         if ((*iter)->subFilter_ == subFilter)// compare the reference of both
00085         {
00086             boost::shared_ptr<NotificationQueueElement> pEl = (*iter);
00087             iter = queue_.erase(iter);//return value is a iterator-ref to the next element
00088         }
00089         else
00090         {
00091             iter++;
00092         }
00093     }
00094     bool isInUse = (inUseSubFilter_ == subFilter) ? true : false;
00095     return isInUse;
00096 }
00097 
00098 void NotificationThread::run()
00099 {
00100     while (isRunning_)
00101     {
00102         boost::shared_ptr<NotificationQueueElement> pEl;
00103         {
00104             Lock lock(mutexQueue_);
00105             inUseSubFilter_ = NULL;
00106             while (queue_.empty())
00107             {
00108                 startConsumption_.wait(mutexQueue_);
00109             }
00110 
00111             //dequeue element
00112             pEl = queue_.front();
00113             queue_.pop_front(); //delete this element from the queue
00114             // keep the current in use Subscription Filter in case a remove Subscriber is waiting.
00115             inUseSubFilter_ = pEl->subFilter_;
00116         }
00117         pEl->subFilter_->doUpdate(pEl->muxContext_, pEl->reqType_, pEl->validCS_);
00118     }
00119     hasFinished_ = true;
00120 }
00121 
00122 NotificationThread::NotificationQueueElement::NotificationQueueElement(MultiplexingContext& muxContext, RequestType& reqType,
00123                                                                        std::vector<std::string>& validCS, SubscriptionFilter* subFilter) :
00124     subFilter_(subFilter),
00125     muxContext_(muxContext),
00126     reqType_(reqType),
00127     validCS_(validCS)
00128 {
00129 }
00130 
00131 
00132 NotificationThread::NotificationQueueElement::~NotificationQueueElement()
00133 {
00134 }
00135 
00136 } // fesa

Generated on 18 Jan 2013 for Fesa by  doxygen 1.6.1