MsgQueueFactory.cpp
Go to the documentation of this file.00001
00002
00003 #include <fesa-core/Core/MsgQueueFactory.h>
00004
00005 #include <fesa-core/Core/AbstractEquipment.h>
00006 #include <fesa-core/Core/LocalMsgQueue.h>
00007 #include <fesa-core/Core/IpcMsgQueue.h>
00008 #include <fesa-core/Core/AbstractMsgQueue.h>
00009 #include <fesa-core/Utilities/Lock.h>
00010 #include <fesa-core/Utilities/Mutex.h>
00011 #include <fesa-core/Exception/FesaException.h>
00012
00013
// Unnamed namespace: the entities declared here are local to this translation unit.
00014 namespace
00015 {
00016
// Mutex locked by MsgQueueFactory::getOrCreateMsgQueue() for the duration of its
// lookup / creation / insertion in the queue collections.
00020 static fesa::Mutex queueFactoryMutex_;
00021
00022 }
00023
00024 namespace fesa
00025 {
00026
// Registries of the queues handed out by getOrCreateMsgQueue(), keyed by the queue
// name passed to it. The stored pointers are deleted by ~MsgQueueFactory().
// Queues requested with blocking == true when the process type is not `unsplit`.
00027 std::map<std::string, AbstractMsgQueue*> MsgQueueFactory::blockingIPCQueueCol_;
// Queues requested with blocking == false when the process type is not `unsplit`.
00028 std::map<std::string, AbstractMsgQueue*> MsgQueueFactory::nonBlockingIPCQueueCol_;
// Queues requested when the process type is `unsplit` (the blocking flag is not considered).
00029 std::map<std::string, AbstractMsgQueue*> MsgQueueFactory::localQueueCol_;
00030
00031 AbstractMsgQueue* MsgQueueFactory::getOrCreateMsgQueue(const std::string& name, uint32_t queuelength, bool blocking)
00032 {
00033 std::map<std::string, AbstractMsgQueue*>* queueCol = 0;
00034 if (AbstractEquipment::getInstance()->getProcessType() == unsplit)
00035 {
00036 queueCol = &localQueueCol_;
00037 }
00038 else
00039 {
00040 if (blocking)
00041 {
00042 queueCol = &blockingIPCQueueCol_;
00043 }
00044 else
00045 {
00046 queueCol = &nonBlockingIPCQueueCol_;
00047 }
00048 }
00049
00050 Lock lock(queueFactoryMutex_);
00051 std::map<std::string, AbstractMsgQueue*>::const_iterator iter = queueCol->find(name);
00052 AbstractMsgQueue* msgQueuePtr;
00053
00054 if (iter == queueCol->end())
00055 {
00056 if (AbstractEquipment::getInstance()->getProcessType() == unsplit)
00057 {
00058 msgQueuePtr = (AbstractMsgQueue*) new LocalMsgQueue("/" + name, queuelength);
00059 }
00060 else
00061 {
00062
00063 msgQueuePtr = (AbstractMsgQueue*) new IpcMsgQueue("/" + name, queuelength, blocking);
00064 }
00065 std::pair<std::map<std::string, AbstractMsgQueue*>::iterator, bool> res = queueCol->insert(std::pair<
00066 std::string, AbstractMsgQueue*>(name, msgQueuePtr));
00067 }
00068 else
00069 {
00070 msgQueuePtr = (*iter).second;
00071
00072 if (msgQueuePtr->getQueueLength() != queuelength)
00073 {
00074 throw FesaException(__FILE__, __LINE__, FesaErrorQueuenameDuplication.c_str(), name.c_str(),
00075 "queue length");
00076 }
00077 }
00078 return msgQueuePtr;
00079 }
00080
00081 MsgQueueFactory::~MsgQueueFactory()
00082 {
00083 for (std::map<std::string, AbstractMsgQueue*>::iterator iter = blockingIPCQueueCol_.begin(); iter != blockingIPCQueueCol_.end(); iter++)
00084 {
00085 delete (*iter).second;
00086 }
00087 blockingIPCQueueCol_.clear();
00088
00089 for (std::map<std::string, AbstractMsgQueue*>::iterator iter = nonBlockingIPCQueueCol_.begin(); iter != nonBlockingIPCQueueCol_.end(); iter++)
00090 {
00091 delete (*iter).second;
00092 }
00093 nonBlockingIPCQueueCol_.clear();
00094
00095 for (std::map<std::string, AbstractMsgQueue*>::iterator iter = localQueueCol_.begin(); iter != localQueueCol_.end(); iter++)
00096 {
00097 delete (*iter).second;
00098 }
00099 localQueueCol_.clear();
00100 }
00101
00102 }