NotificationConsumer.cpp
Go to the documentation of this file.00001
00002
00003 #include <fesa-core/Server/NotificationConsumer.h>
00004
00005 #include <fesa-core/Server/AbstractServerEquipment.h>
00006 #include <fesa-core/Core/AbstractEquipment.h>
00007 #include <fesa-core/Core/AbstractMsgQueue.h>
00008 #include <fesa-core/Core/MsgQueueFactory.h>
00009 #include <fesa-core/Core/MessageTypes.h>
00010 #include <fesa-core/Core/ThreadPriorityConfiguration.h>
00011 #include <fesa-core/DataStore/EquipmentData.h>
00012 #include <fesa-core/Diagnostic/Diagnostics.h>
00013 #include <fesa-core/Persistency/PersistencyManager.h>
00014 #include <fesa-core/RDADeviceServer/SubscriptionTreeManager.h>
00015 #include <fesa-core/Synchronization/MultiplexingContext.h>
00016 #include <fesa-core/Utilities/ProcessConfiguration.h>
00017 #include <fesa-core/Utilities/XMLParser.h>
00018
00019 #include <cmw-log/Logger.h>
00020
00021
00022 namespace
00023 {
00024
00025 CMW::Log::Logger& logger = CMW::Log::LoggerFactory::getLogger("FESA.FWK.fesa-core.Server.NotificationConsumer");
00026 const std::string diagTopic = fesa::DiagnosticUtils::startUpTrackingStr;
00027
00028 }
00029
00030
00031 namespace fesa
00032 {
00033
00034
00035 NotificationConsumer* NotificationConsumer::theInstance_ = NULL;
00036
00037 NotificationConsumer::NotificationConsumer(const std::string& queueName, SubscriptionTreeManager& subscriptionTreeManager) : subscriptionTreeManager_(subscriptionTreeManager)
00038 {
00039 const boost::shared_ptr<Diagnostics>& diagnostics = AbstractEquipment::getInstance()->getDiagnostics();
00040 DiagnosticUtils::DiagnosticMessage diagMsg;
00041 diagMsg.side = DiagnosticUtils::framework;
00042 diagMsg.source = DiagnosticUtils::server;
00043 std::ostringstream traceStrStream;
00044 traceStrStream << "Notification consumer is connecting to message queue '" << queueName << "'";
00045 diagMsg.msg = traceStrStream.str();
00046 diagnostics->log(diagTopic, diagMsg);
00047 int32_t size = AbstractEquipment::getInstance()->getProcessConfiguration()->getIntValue(PropertyTag::MSG_NUM_MAX);
00048 msgQueuePtr_ = MsgQueueFactory::getOrCreateMsgQueue(queueName, size, true);
00049 setDetachState( PTHREAD_CREATE_DETACHED);
00050 {
00051 AbstractEquipment* eqp = AbstractEquipment::getInstance();
00052 std::string instanceFile = eqp->getDeviceDataFileName();
00053 XMLParser xmlParser(instanceFile, false);
00054 ThreadPriorityConfigurationFromFile conf(xmlParser, eqp->getProcessConfiguration());
00055 int32_t prio = conf.getPrioNotificationConsumer();
00056 setPriority(prio);
00057 }
00058 }
00059
00060 NotificationConsumer::~NotificationConsumer()
00061 {
00062 }
00063
00064 NotificationConsumer* NotificationConsumer::getInstance( SubscriptionTreeManager& subscriptionTreeManager)
00065 {
00066 if (theInstance_ == NULL)
00067 {
00068 AbstractEquipment* eqp = AbstractEquipment::getInstance();
00069 theInstance_ = new NotificationConsumer(eqp->getEquipmentName(), subscriptionTreeManager);
00070 }
00071 return theInstance_;
00072 }
00073
00074 void NotificationConsumer::releaseInstance()
00075 {
00076 if (theInstance_ != NULL)
00077 {
00078 delete theInstance_;
00079 theInstance_ = NULL;
00080 }
00081 }
00082
00083 void NotificationConsumer::run()
00084 {
00085 const boost::shared_ptr<Diagnostics>& diagnostics = AbstractEquipment::getInstance()->getDiagnostics();
00086 DiagnosticUtils::DiagnosticMessage diagMsg;
00087 diagMsg.side = DiagnosticUtils::framework;
00088 diagMsg.source = DiagnosticUtils::server;
00089 AbstractServerEquipment* eqp = AbstractServerEquipment::getInstance();
00090 RequestType reqType = GET;
00091 diagMsg.msg = "Notification consumer is purging the queue before consumption";
00092 diagnostics->log(diagTopic, diagMsg);
00093 msgQueuePtr_->purge();
00094 AbstractMessage* abstractMsg = NULL;
00095 while (isRunning_)
00096 {
00097 try
00098 {
00099
00100 abstractMsg = NULL;
00101
00102 abstractMsg = msgQueuePtr_->consumeMsg();
00103
00104 switch (abstractMsg->msgType_)
00105 {
00106 case AutomaticNotificationMsg:
00107 {
00108 AutomaticNotificationMessage* concreteMsg =
00109 static_cast<AutomaticNotificationMessage*> (abstractMsg);
00110
00111 std::vector<AbstractDeviceClass*> deviceCol = eqp->getDeviceClassCol();
00112
00113 EquipmentData* equipmentData = EquipmentData::getInstance();
00114 if (*(equipmentData->notificationFailure_) == true)
00115 {
00116 *(equipmentData->notificationFailure_) = false;
00117 }
00118
00119 try
00120 {
00121
00122 subscriptionTreeManager_.notify(concreteMsg->notificationID_, *concreteMsg->muxContext_, reqType);
00123 delete concreteMsg->muxContext_;
00124 }
00125 catch (...)
00126 {
00127 delete concreteMsg->muxContext_;
00128 throw;
00129 }
00130 break;
00131 }
00132 case ManualNotificationMsg:
00133 {
00134 ManualNotificationMessage* concreteMsg = static_cast<ManualNotificationMessage*> (abstractMsg);
00135
00136 EquipmentData* equipmentData = EquipmentData::getInstance();
00137 if (*(equipmentData->notificationFailure_) == true)
00138 {
00139 *(equipmentData->notificationFailure_) = false;
00140 }
00141
00142 try
00143 {
00144 subscriptionTreeManager_.notify(concreteMsg, reqType);
00145 delete concreteMsg->muxContext_;
00146 }
00147 catch (...)
00148 {
00149 delete concreteMsg->muxContext_;
00150 throw;
00151 }
00152
00153 break;
00154 }
00155 case CommandMsg:
00156 {
00157 CommandMessage *concreteMsg = static_cast<CommandMessage*> (abstractMsg);
00158
00159 if (concreteMsg->command_ == "PersistencyTrigger")
00160 {
00161 PersistencyManager::getInstance()->trigger(concreteMsg->parameter1_);
00162 }
00163 else
00164 {
00165
00166 throw FesaException(__FILE__, __LINE__, FesaErrorUnknownMessageType.c_str(),
00167 msgQueuePtr_->getQueueName().c_str());
00168 }
00169 break;
00170 }
00171 default:
00172 throw FesaException(__FILE__, __LINE__, FesaErrorUnknownMessageType.c_str(),
00173 msgQueuePtr_->getQueueName().c_str());
00174 }
00175
00176 delete abstractMsg;
00177
00178
00179
00180 }
00181 catch (FesaException ex)
00182 {
00183 if (abstractMsg != NULL)
00184 delete abstractMsg;
00185 std::ostringstream errorStrStream;
00186 errorStrStream << "Notification consuming failure: " << ex.getMessage();
00187 LOG_ERROR_IF(logger, errorStrStream.str());
00188 }
00189
00190 }
00191 isRunning_ = false;
00192 }
00193
00194 }