IpcMsgQueue.cpp

Go to the documentation of this file.
00001 // Copyright CERN 2012 - Developed in collaboration with GSI
00002 
00003 #include <fesa-core/Core/IpcMsgQueue.h>
00004 
00005 #include <fesa-core/Core/MessageTypes.h>
00006 #include <fesa-core/Exception/FesaException.h>
00007 
00008 #include <cmw-log/Logger.h>
00009 
00010 #include <fstream> // To check the queue system limits
00011 #include <sstream>
00012 #include <cstring>
00013 #include <cerrno>
00014 #include <sys/resource.h> // get/set r_limit
00015 #include <sys/stat.h>
00016 
00017 
00018 namespace
00019 {
00020 
00021 CMW::Log::Logger& logger = CMW::Log::LoggerFactory::getLogger("FESA.FWK.fesa-core.Core.IpcMsgQueue");
00022 
00023 } // namespace
00024 
00025 
00026 namespace fesa
00027 {
00028 
00029 IpcMsgQueue::IpcMsgQueue(const std::string& name, uint32_t queuelength, bool blocking) :
00030     AbstractMsgQueue(name, IPC_MSG_QUEUE, queuelength)
00031 {
00032     checkSystemMaxValues(queuelength);//check if the limits, provided by the system are sufficiant
00033     mq_attr attr;
00034     std::memset((void *) &attr, 0, sizeof(attr));
00035     attr.mq_msgsize = MSG_DATA_SIZE_MAX; // size of each message
00036     attr.mq_maxmsg = queuelength; // max number of messages
00037     attr.mq_flags = 0; // leave zero for mq_open
00038 
00039     int32_t mask = umask(0); // set the user mask to 0 in order to create the queue with the wanted rights
00040     int32_t oflags = O_CREAT | O_RDWR;
00041     if (blocking == false)
00042     {
00043         oflags |=  O_NONBLOCK; // Add the non blocking flag to the Q config
00044     }
00045     if ((msgQueue_ = mq_open(name.c_str(), oflags, 0666, &attr)) == (mqd_t) - 1)
00046     {
00047         std::string errMsg(std::strerror(errno));
00048         if (errno == EINVAL)
00049         {
00050             errMsg.append("\nRun the process as root!");
00051         }
00052         throw FesaException(__FILE__, __LINE__, FesaErrorCreatingMsgQueue.c_str(), name.c_str(), errMsg.c_str());
00053     }
00054     umask(mask); // restore the user mask
00055 }
00056 
00057 IpcMsgQueue::~IpcMsgQueue()
00058 {
00059     mq_close( msgQueue_);
00060     mq_unlink(getQueueName().c_str());
00061 }
00062 
00063 void IpcMsgQueue::postMsg(AbstractMessage* msg)
00064 {
00065     std::ostringstream stream(std::ios_base::out);
00066     msg->serialize(stream);//since we are a Inter Process Communication queue, we cannot send pointers
00067     if (mq_send(msgQueue_, stream.str().c_str(), MSG_DATA_SIZE_MAX, msg->prio_) == -1)
00068     {
00069         if (errno != EAGAIN)
00070         {
00071             throw FesaException(__FILE__, __LINE__, FesaErrorMsgQueueGeneric.c_str(), "Posting Msg on",
00072                                 getQueueName().c_str(), std::strerror(errno));
00073         }
00074         throw FesaException(__FILE__, __LINE__, FesaErrorQueueLengthExceeded.c_str(), "Posting Msg on",
00075                             getQueueName().c_str());
00076     }
00077     if(logger.isLoggable(CMW::Log::Level::LL_TRACE))
00078     {
00079         std::ostringstream logMsg;
00080         logMsg << "Posted '" << stream.str() << "'";
00081         LOG_TRACE_IF(logger, logMsg.str());
00082     }
00083 }
00084 
00085 AbstractMessage* IpcMsgQueue::consumeMsg()
00086 {
00087     ssize_t length;
00088     char data[MSG_DATA_SIZE_MAX];
00089     // on blocking mode, mq_receive waits, until a message arrives
00090     length = mq_receive(msgQueue_, data, MSG_DATA_SIZE_MAX, 0);
00091     if (length == -1)
00092     {
00093         throw FesaException(__FILE__, __LINE__, FesaErrorMsgQueueGeneric.c_str(), "Consuming Msg on",
00094                             getQueueName().c_str(), std::strerror(errno));
00095     }
00096     else
00097     {
00098         std::string message(data);
00099         std::istringstream stream(message);
00100         if(logger.isLoggable(CMW::Log::Level::LL_TRACE))
00101         {
00102             std::ostringstream logMsg;
00103             logMsg << "Received '" << stream.str() << "'";
00104             LOG_TRACE_IF(logger, logMsg.str());
00105         }
00106         uint32_t type;
00107         // get the type of the message from the stream
00108         stream >> type;
00109         switch (type)
00110         {
00111             case AutomaticNotificationMsg:
00112             {
00113                 return new AutomaticNotificationMessage(stream);
00114                 break;
00115             }
00116             case ManualNotificationMsg:
00117             {
00118                 return new ManualNotificationMessage(stream);
00119                 break;
00120             }
00121             case CommandMsg:
00122             {
00123                 return new CommandMessage(stream);
00124                 break;
00125             }
00126             case OnDemandMsg:
00127             {
00128                 return new OnDemandMessage(stream);
00129                 break;
00130             }
00131             default:
00132             {
00133                 throw FesaException(__FILE__, __LINE__, FesaErrorUnknownMessageType.c_str(), getQueueName().c_str());
00134             }
00135         }
00136     }
00137     // Should never be executed but make gcc happy
00138     return (AbstractMessage*) NULL;
00139 }
00140 
00141 uint32_t IpcMsgQueue::getCurrentMsgCount()
00142 {
00143     mq_attr attrs;
00144 
00145     if (mq_getattr(msgQueue_, &attrs) == -1)
00146     {
00147         throw FesaException(__FILE__, __LINE__, FesaErrorMsgQueueGeneric.c_str(), "Getting size on",
00148                             getQueueName().c_str(), std::strerror(errno));
00149     }
00150 
00151     return static_cast<uint32_t>(attrs.mq_curmsgs);
00152 }
00153 
00154 void IpcMsgQueue::purge()
00155 {
00156     char msg[constants_.msg_size_max_]; // the max msg size
00157     while (getCurrentMsgCount() > 0)
00158     {
00159         if (mq_receive(msgQueue_, msg, constants_.msg_size_max_, NULL) == -1)
00160         {
00161             throw FesaException(__FILE__, __LINE__, FesaErrorMsgQueueGeneric.c_str(), "Purging queue on",
00162                                 getQueueName().c_str(), std::strerror(errno));
00163         }
00164     }
00165 }
00166 
00167 void IpcMsgQueue::checkSystemMaxValues(uint32_t queuelength)
00168 {
00169     bool changed = false;
00170     // ----------------------------------------
00171     // -- 1 -- First we check the system-limits
00172     // ----------------------------------------
00173 
00174     std::ifstream file_max_message_size;
00175     std::ifstream file_max_queue_length;
00176     uint32_t system_max_message_size;
00177     uint32_t system_max_queue_length;
00178 
00179     file_max_message_size.open("/proc/sys/fs/mqueue/msgsize_max", std::ifstream::in);
00180     file_max_message_size >> system_max_message_size;
00181     file_max_message_size.close();
00182 
00183     file_max_queue_length.open("/proc/sys/fs/mqueue/msg_max", std::ifstream::in);
00184     file_max_queue_length >> system_max_queue_length;
00185     file_max_queue_length.close();
00186 
00187     if (system_max_message_size < constants_.msg_size_max_)
00188     {
00189         std::stringstream out;//for conversions
00190         out << constants_.msg_size_max_;
00191         throw FesaException(__FILE__, __LINE__, FesaErrorSystemMessageSizeExceeded.c_str(), out.str().c_str(),
00192                             out.str().c_str(), out.str().c_str());
00193     }
00194 
00195     if (system_max_queue_length < queuelength)
00196     {
00197         std::stringstream out;//for conversions
00198         out << queuelength;
00199         throw FesaException(__FILE__, __LINE__, FesaErrorSystemQueueLengtExceeded.c_str(), out.str().c_str(),
00200                             out.str().c_str(), out.str().c_str());
00201     }
00202 
00203     // ----------------------------------------
00204     // -- 2 -- Now we check (and fix) the soft and hard-limit of our process
00205     // ----------------------------------------
00206 
00207     //get the current limits for this process
00208     rlimit currentLimit;
00209     getrlimit(RLIMIT_MSGQUEUE,&currentLimit);//check if limit is sufficient
00210 
00211     //calculate the needed limits
00212     uint32_t neededLimit = queuelength * static_cast<uint32_t>(sizeof(struct msg_msg *)) + queuelength * MSG_DATA_SIZE_MAX;
00213 
00214     if(currentLimit.rlim_max <= neededLimit)
00215     {
00216         currentLimit.rlim_max = neededLimit;
00217         changed = true;
00218     }
00219     if(currentLimit.rlim_cur <= neededLimit)
00220     {
00221         currentLimit.rlim_cur = neededLimit;
00222         changed = true;
00223     }
00224     if(changed)
00225     {
00226         if(setrlimit(RLIMIT_MSGQUEUE,&currentLimit) == -1)//check
00227         {
00228             throw FesaException(__FILE__, __LINE__, FesaErrorMsgQueueGeneric.c_str(), "Error while setting process",
00229                                 "posix-message-queue-memory hard/soft-limit(rlim_max/rlim_cur). Operation 'setrlimit' could not been performed. Try to login as root, or to give this process the CAP_SYS_RESOURCE capability.",std::strerror(errno));
00230         }
00231     }
00232 }
00233 
00234 } // fesa

Generated on 18 Jan 2013 for Fesa by  doxygen 1.6.1