00001
00002
00003 #include <fesa-core/Core/IpcMsgQueue.h>
00004
00005 #include <fesa-core/Core/MessageTypes.h>
00006 #include <fesa-core/Exception/FesaException.h>
00007
00008 #include <cmw-log/Logger.h>
00009
00010 #include <fstream>
00011 #include <sstream>
00012 #include <cstring>
00013 #include <cerrno>
00014 #include <sys/resource.h>
00015 #include <sys/stat.h>
00016
00017
00018 namespace
00019 {
00020
00021 CMW::Log::Logger& logger = CMW::Log::LoggerFactory::getLogger("FESA.FWK.fesa-core.Core.IpcMsgQueue");
00022
00023 }
00024
00025
00026 namespace fesa
00027 {
00028
00029 IpcMsgQueue::IpcMsgQueue(const std::string& name, uint32_t queuelength, bool blocking) :
00030 AbstractMsgQueue(name, IPC_MSG_QUEUE, queuelength)
00031 {
00032 checkSystemMaxValues(queuelength);
00033 mq_attr attr;
00034 std::memset((void *) &attr, 0, sizeof(attr));
00035 attr.mq_msgsize = MSG_DATA_SIZE_MAX;
00036 attr.mq_maxmsg = queuelength;
00037 attr.mq_flags = 0;
00038
00039 int32_t mask = umask(0);
00040 int32_t oflags = O_CREAT | O_RDWR;
00041 if (blocking == false)
00042 {
00043 oflags |= O_NONBLOCK;
00044 }
00045 if ((msgQueue_ = mq_open(name.c_str(), oflags, 0666, &attr)) == (mqd_t) - 1)
00046 {
00047 std::string errMsg(std::strerror(errno));
00048 if (errno == EINVAL)
00049 {
00050 errMsg.append("\nRun the process as root!");
00051 }
00052 throw FesaException(__FILE__, __LINE__, FesaErrorCreatingMsgQueue.c_str(), name.c_str(), errMsg.c_str());
00053 }
00054 umask(mask);
00055 }
00056
00057 IpcMsgQueue::~IpcMsgQueue()
00058 {
00059 mq_close( msgQueue_);
00060 mq_unlink(getQueueName().c_str());
00061 }
00062
00063 void IpcMsgQueue::postMsg(AbstractMessage* msg)
00064 {
00065 std::ostringstream stream(std::ios_base::out);
00066 msg->serialize(stream);
00067 if (mq_send(msgQueue_, stream.str().c_str(), MSG_DATA_SIZE_MAX, msg->prio_) == -1)
00068 {
00069 if (errno != EAGAIN)
00070 {
00071 throw FesaException(__FILE__, __LINE__, FesaErrorMsgQueueGeneric.c_str(), "Posting Msg on",
00072 getQueueName().c_str(), std::strerror(errno));
00073 }
00074 throw FesaException(__FILE__, __LINE__, FesaErrorQueueLengthExceeded.c_str(), "Posting Msg on",
00075 getQueueName().c_str());
00076 }
00077 if(logger.isLoggable(CMW::Log::Level::LL_TRACE))
00078 {
00079 std::ostringstream logMsg;
00080 logMsg << "Posted '" << stream.str() << "'";
00081 LOG_TRACE_IF(logger, logMsg.str());
00082 }
00083 }
00084
00085 AbstractMessage* IpcMsgQueue::consumeMsg()
00086 {
00087 ssize_t length;
00088 char data[MSG_DATA_SIZE_MAX];
00089
00090 length = mq_receive(msgQueue_, data, MSG_DATA_SIZE_MAX, 0);
00091 if (length == -1)
00092 {
00093 throw FesaException(__FILE__, __LINE__, FesaErrorMsgQueueGeneric.c_str(), "Consuming Msg on",
00094 getQueueName().c_str(), std::strerror(errno));
00095 }
00096 else
00097 {
00098 std::string message(data);
00099 std::istringstream stream(message);
00100 if(logger.isLoggable(CMW::Log::Level::LL_TRACE))
00101 {
00102 std::ostringstream logMsg;
00103 logMsg << "Received '" << stream.str() << "'";
00104 LOG_TRACE_IF(logger, logMsg.str());
00105 }
00106 uint32_t type;
00107
00108 stream >> type;
00109 switch (type)
00110 {
00111 case AutomaticNotificationMsg:
00112 {
00113 return new AutomaticNotificationMessage(stream);
00114 break;
00115 }
00116 case ManualNotificationMsg:
00117 {
00118 return new ManualNotificationMessage(stream);
00119 break;
00120 }
00121 case CommandMsg:
00122 {
00123 return new CommandMessage(stream);
00124 break;
00125 }
00126 case OnDemandMsg:
00127 {
00128 return new OnDemandMessage(stream);
00129 break;
00130 }
00131 default:
00132 {
00133 throw FesaException(__FILE__, __LINE__, FesaErrorUnknownMessageType.c_str(), getQueueName().c_str());
00134 }
00135 }
00136 }
00137
00138 return (AbstractMessage*) NULL;
00139 }
00140
00141 uint32_t IpcMsgQueue::getCurrentMsgCount()
00142 {
00143 mq_attr attrs;
00144
00145 if (mq_getattr(msgQueue_, &attrs) == -1)
00146 {
00147 throw FesaException(__FILE__, __LINE__, FesaErrorMsgQueueGeneric.c_str(), "Getting size on",
00148 getQueueName().c_str(), std::strerror(errno));
00149 }
00150
00151 return static_cast<uint32_t>(attrs.mq_curmsgs);
00152 }
00153
00154 void IpcMsgQueue::purge()
00155 {
00156 char msg[constants_.msg_size_max_];
00157 while (getCurrentMsgCount() > 0)
00158 {
00159 if (mq_receive(msgQueue_, msg, constants_.msg_size_max_, NULL) == -1)
00160 {
00161 throw FesaException(__FILE__, __LINE__, FesaErrorMsgQueueGeneric.c_str(), "Purging queue on",
00162 getQueueName().c_str(), std::strerror(errno));
00163 }
00164 }
00165 }
00166
00167 void IpcMsgQueue::checkSystemMaxValues(uint32_t queuelength)
00168 {
00169 bool changed = false;
00170
00171
00172
00173
00174 std::ifstream file_max_message_size;
00175 std::ifstream file_max_queue_length;
00176 uint32_t system_max_message_size;
00177 uint32_t system_max_queue_length;
00178
00179 file_max_message_size.open("/proc/sys/fs/mqueue/msgsize_max", std::ifstream::in);
00180 file_max_message_size >> system_max_message_size;
00181 file_max_message_size.close();
00182
00183 file_max_queue_length.open("/proc/sys/fs/mqueue/msg_max", std::ifstream::in);
00184 file_max_queue_length >> system_max_queue_length;
00185 file_max_queue_length.close();
00186
00187 if (system_max_message_size < constants_.msg_size_max_)
00188 {
00189 std::stringstream out;
00190 out << constants_.msg_size_max_;
00191 throw FesaException(__FILE__, __LINE__, FesaErrorSystemMessageSizeExceeded.c_str(), out.str().c_str(),
00192 out.str().c_str(), out.str().c_str());
00193 }
00194
00195 if (system_max_queue_length < queuelength)
00196 {
00197 std::stringstream out;
00198 out << queuelength;
00199 throw FesaException(__FILE__, __LINE__, FesaErrorSystemQueueLengtExceeded.c_str(), out.str().c_str(),
00200 out.str().c_str(), out.str().c_str());
00201 }
00202
00203
00204
00205
00206
00207
00208 rlimit currentLimit;
00209 getrlimit(RLIMIT_MSGQUEUE,¤tLimit);
00210
00211
00212 uint32_t neededLimit = queuelength * static_cast<uint32_t>(sizeof(struct msg_msg *)) + queuelength * MSG_DATA_SIZE_MAX;
00213
00214 if(currentLimit.rlim_max <= neededLimit)
00215 {
00216 currentLimit.rlim_max = neededLimit;
00217 changed = true;
00218 }
00219 if(currentLimit.rlim_cur <= neededLimit)
00220 {
00221 currentLimit.rlim_cur = neededLimit;
00222 changed = true;
00223 }
00224 if(changed)
00225 {
00226 if(setrlimit(RLIMIT_MSGQUEUE,¤tLimit) == -1)
00227 {
00228 throw FesaException(__FILE__, __LINE__, FesaErrorMsgQueueGeneric.c_str(), "Error while setting process",
00229 "posix-message-queue-memory hard/soft-limit(rlim_max/rlim_cur). Operation 'setrlimit' could not been performed. Try to login as root, or to give this process the CAP_SYS_RESOURCE capability.",std::strerror(errno));
00230 }
00231 }
00232 }
00233
00234 }