NotificationConsumer.cpp

Go to the documentation of this file.
00001 // Copyright CERN 2012 - Developed in collaboration with GSI
00002 
00003 #include <fesa-core/Server/NotificationConsumer.h>
00004 
00005 #include <fesa-core/Server/AbstractServerEquipment.h>
00006 #include <fesa-core/Core/AbstractEquipment.h>
00007 #include <fesa-core/Core/AbstractMsgQueue.h>
00008 #include <fesa-core/Core/MsgQueueFactory.h>
00009 #include <fesa-core/Core/MessageTypes.h>
00010 #include <fesa-core/Core/ThreadPriorityConfiguration.h>
00011 #include <fesa-core/DataStore/EquipmentData.h>
00012 #include <fesa-core/Diagnostic/Diagnostics.h>
00013 #include <fesa-core/Persistency/PersistencyManager.h>
00014 #include <fesa-core/RDADeviceServer/SubscriptionTreeManager.h>
00015 #include <fesa-core/Synchronization/MultiplexingContext.h>
00016 #include <fesa-core/Utilities/ProcessConfiguration.h>
00017 #include <fesa-core/Utilities/XMLParser.h>
00018 
00019 #include <cmw-log/Logger.h>
00020 
00021 
00022 namespace
00023 {
00024 
00025 CMW::Log::Logger& logger = CMW::Log::LoggerFactory::getLogger("FESA.FWK.fesa-core.Server.NotificationConsumer");
00026 const std::string diagTopic = fesa::DiagnosticUtils::startUpTrackingStr;
00027 
00028 } // namespace
00029 
00030 
00031 namespace fesa
00032 {
00033 
00034 //init static members
00035 NotificationConsumer* NotificationConsumer::theInstance_ = NULL;
00036 
00037 NotificationConsumer::NotificationConsumer(const std::string& queueName,  SubscriptionTreeManager& subscriptionTreeManager) :  subscriptionTreeManager_(subscriptionTreeManager)
00038 {
00039     const boost::shared_ptr<Diagnostics>& diagnostics = AbstractEquipment::getInstance()->getDiagnostics();
00040     DiagnosticUtils::DiagnosticMessage diagMsg;
00041     diagMsg.side = DiagnosticUtils::framework;
00042     diagMsg.source = DiagnosticUtils::server;
00043     std::ostringstream traceStrStream;
00044     traceStrStream << "Notification consumer is connecting to message queue '" << queueName << "'";
00045     diagMsg.msg = traceStrStream.str();
00046     diagnostics->log(diagTopic, diagMsg);
00047     int32_t size = AbstractEquipment::getInstance()->getProcessConfiguration()->getIntValue(PropertyTag::MSG_NUM_MAX);
00048     msgQueuePtr_ = MsgQueueFactory::getOrCreateMsgQueue(queueName, size, true);
00049     setDetachState( PTHREAD_CREATE_DETACHED);
00050     {
00051         AbstractEquipment* eqp = AbstractEquipment::getInstance();
00052         std::string instanceFile = eqp->getDeviceDataFileName();
00053         XMLParser xmlParser(instanceFile, false);
00054         ThreadPriorityConfigurationFromFile conf(xmlParser, eqp->getProcessConfiguration());
00055         int32_t prio =  conf.getPrioNotificationConsumer();
00056         setPriority(prio);
00057     }
00058 }
00059 
00060 NotificationConsumer::~NotificationConsumer()
00061 {
00062 }
00063 
00064 NotificationConsumer* NotificationConsumer::getInstance( SubscriptionTreeManager& subscriptionTreeManager)
00065 {
00066     if (theInstance_ == NULL)
00067     {
00068         AbstractEquipment* eqp = AbstractEquipment::getInstance();
00069         theInstance_ = new NotificationConsumer(eqp->getEquipmentName(),  subscriptionTreeManager);
00070     }
00071     return theInstance_;
00072 }
00073 
00074 void NotificationConsumer::releaseInstance()
00075 {
00076     if (theInstance_ != NULL)
00077     {
00078         delete theInstance_;
00079         theInstance_ = NULL;
00080     }
00081 }
00082 
00083 void NotificationConsumer::run()
00084 {
00085     const boost::shared_ptr<Diagnostics>& diagnostics = AbstractEquipment::getInstance()->getDiagnostics();
00086     DiagnosticUtils::DiagnosticMessage diagMsg;
00087     diagMsg.side = DiagnosticUtils::framework;
00088     diagMsg.source = DiagnosticUtils::server;
00089     AbstractServerEquipment* eqp = AbstractServerEquipment::getInstance();
00090     RequestType reqType = GET;
00091     diagMsg.msg = "Notification consumer is purging the queue before consumption";
00092     diagnostics->log(diagTopic, diagMsg);
00093     msgQueuePtr_->purge();
00094     AbstractMessage* abstractMsg = NULL;
00095     while (isRunning_)
00096     {
00097         try
00098         {
00099             // reset the pointer just in case an exception is thrown by the messageQueue::consumeMsg method
00100             abstractMsg = NULL;
00101             //this thread will get stuck here, until a message arrives
00102             abstractMsg = msgQueuePtr_->consumeMsg();
00103 
00104             switch (abstractMsg->msgType_)
00105             {
00106                 case AutomaticNotificationMsg:
00107                 {
00108                     AutomaticNotificationMessage* concreteMsg =
00109                         static_cast<AutomaticNotificationMessage*> (abstractMsg);
00110 
00111                     std::vector<AbstractDeviceClass*> deviceCol = eqp->getDeviceClassCol();
00112 
00113                     EquipmentData* equipmentData = EquipmentData::getInstance();
00114                     if (*(equipmentData->notificationFailure_) == true)
00115                     {
00116                         *(equipmentData->notificationFailure_) = false;
00117                     }
00118 
00119                     try
00120                     {
00121 
00122                         subscriptionTreeManager_.notify(concreteMsg->notificationID_, *concreteMsg->muxContext_, reqType);
00123                         delete concreteMsg->muxContext_;
00124                     }
00125                     catch (...)
00126                     {
00127                         delete concreteMsg->muxContext_;
00128                         throw;
00129                     }
00130                     break;
00131                 }
00132                 case ManualNotificationMsg:
00133                 {
00134                     ManualNotificationMessage* concreteMsg = static_cast<ManualNotificationMessage*> (abstractMsg);
00135 
00136                     EquipmentData* equipmentData = EquipmentData::getInstance();
00137                     if (*(equipmentData->notificationFailure_) == true)
00138                     {
00139                         *(equipmentData->notificationFailure_) = false;
00140                     }
00141 
00142                     try
00143                     {
00144                         subscriptionTreeManager_.notify(concreteMsg, reqType);
00145                         delete concreteMsg->muxContext_;
00146                     }
00147                     catch (...)
00148                     {
00149                         delete concreteMsg->muxContext_;
00150                         throw;
00151                     }
00152 
00153                     break;
00154                 }
00155                 case CommandMsg:
00156                 {
00157                     CommandMessage *concreteMsg = static_cast<CommandMessage*> (abstractMsg);
00158 
00159                     if (concreteMsg->command_ == "PersistencyTrigger")
00160                     {
00161                         PersistencyManager::getInstance()->trigger(concreteMsg->parameter1_);
00162                     }
00163                     else
00164                     {
00165                         //unknown command
00166                         throw FesaException(__FILE__, __LINE__, FesaErrorUnknownMessageType.c_str(),
00167                                             msgQueuePtr_->getQueueName().c_str());
00168                     }
00169                     break;
00170                 }
00171                 default:
00172                     throw FesaException(__FILE__, __LINE__, FesaErrorUnknownMessageType.c_str(),
00173                                         msgQueuePtr_->getQueueName().c_str());
00174             }
00175 
00176             delete abstractMsg;
00177 
00178             //MONITORING: Notification-JAM detection TODO : figure out out if needed
00179 
00180         }
00181         catch (FesaException ex)
00182         {
00183             if (abstractMsg != NULL)
00184                 delete abstractMsg;
00185             std::ostringstream errorStrStream;
00186             errorStrStream << "Notification consuming failure: " << ex.getMessage();
00187             LOG_ERROR_IF(logger, errorStrStream.str());
00188         }
00189 
00190     }
00191     isRunning_ = false;
00192 }
00193 
00194 } // fesa

Generated on 18 Jan 2013 for Fesa by  doxygen 1.6.1