MsgQueueFactory.cpp

Go to the documentation of this file.
00001 // Copyright CERN 2012 - Developed in collaboration with GSI
00002 
00003 #include <fesa-core/Core/MsgQueueFactory.h>
00004 
00005 #include <fesa-core/Core/AbstractEquipment.h>
00006 #include <fesa-core/Core/LocalMsgQueue.h>
00007 #include <fesa-core/Core/IpcMsgQueue.h>
00008 #include <fesa-core/Core/AbstractMsgQueue.h>
00009 #include <fesa-core/Utilities/Lock.h>
00010 #include <fesa-core/Utilities/Mutex.h>
00011 #include <fesa-core/Exception/FesaException.h>
00012 
00013 
00014 namespace
00015 {
00016 
00020 static fesa::Mutex queueFactoryMutex_;
00021 
00022 } // namespace
00023 
00024 namespace fesa
00025 {
00026 
00027 std::map<std::string, AbstractMsgQueue*> MsgQueueFactory::blockingIPCQueueCol_;
00028 std::map<std::string, AbstractMsgQueue*> MsgQueueFactory::nonBlockingIPCQueueCol_;
00029 std::map<std::string, AbstractMsgQueue*> MsgQueueFactory::localQueueCol_;
00030 
00031 AbstractMsgQueue* MsgQueueFactory::getOrCreateMsgQueue(const std::string& name, uint32_t queuelength, bool blocking)
00032 {
00033     std::map<std::string, AbstractMsgQueue*>* queueCol = 0;
00034     if (AbstractEquipment::getInstance()->getProcessType() == unsplit)
00035     {
00036         queueCol = &localQueueCol_;
00037     }
00038     else
00039     {
00040         if (blocking)
00041         {
00042             queueCol =  &blockingIPCQueueCol_;
00043         }
00044         else
00045         {
00046             queueCol =  &nonBlockingIPCQueueCol_;
00047         }
00048     }
00049 
00050     Lock lock(queueFactoryMutex_);
00051     std::map<std::string, AbstractMsgQueue*>::const_iterator iter = queueCol->find(name);
00052     AbstractMsgQueue* msgQueuePtr;
00053     //Queue has not be found, so it is created and inserted into the map
00054     if (iter == queueCol->end())
00055     {
00056         if (AbstractEquipment::getInstance()->getProcessType() == unsplit)
00057         {
00058             msgQueuePtr = (AbstractMsgQueue*) new LocalMsgQueue("/" + name, queuelength);
00059         }
00060         else
00061         {
00062             // IPC message queue expects "/" as first character of the name
00063             msgQueuePtr = (AbstractMsgQueue*) new IpcMsgQueue("/" + name, queuelength, blocking);
00064         }
00065         std::pair<std::map<std::string, AbstractMsgQueue*>::iterator, bool> res = queueCol->insert(std::pair<
00066                                                                                                    std::string, AbstractMsgQueue*>(name, msgQueuePtr));
00067     }
00068     else
00069     {
00070         msgQueuePtr = (*iter).second;
00071 
00072         if (msgQueuePtr->getQueueLength() != queuelength)
00073         {
00074             throw FesaException(__FILE__, __LINE__, FesaErrorQueuenameDuplication.c_str(), name.c_str(),
00075                                 "queue length");
00076         }
00077     }
00078     return msgQueuePtr;
00079 }
00080 
00081 MsgQueueFactory::~MsgQueueFactory()
00082 {
00083     for (std::map<std::string, AbstractMsgQueue*>::iterator iter = blockingIPCQueueCol_.begin(); iter != blockingIPCQueueCol_.end(); iter++)
00084     {
00085         delete (*iter).second;
00086     }
00087     blockingIPCQueueCol_.clear();
00088 
00089     for (std::map<std::string, AbstractMsgQueue*>::iterator iter = nonBlockingIPCQueueCol_.begin(); iter != nonBlockingIPCQueueCol_.end(); iter++)
00090     {
00091         delete (*iter).second;
00092     }
00093     nonBlockingIPCQueueCol_.clear();
00094 
00095     for (std::map<std::string, AbstractMsgQueue*>::iterator iter = localQueueCol_.begin(); iter != localQueueCol_.end(); iter++)
00096     {
00097         delete (*iter).second;
00098     }
00099     localQueueCol_.clear();
00100 }
00101 
00102 } // fesa

Generated on 18 Jan 2013 for Fesa by  doxygen 1.6.1