OnSubscriptionEventSource.cpp
Go to the documentation of this file.00001
00002
00003 #include <fesa-core/RealTime/OnSubscriptionEventSource.h>
00004 #include <fesa-core/RealTime/OnSubscriptionRTEventPayload.h>
00005 #include <fesa-core/Utilities/ParserElements/EventElement.h>
00006 #include <fesa-core/Synchronization/NoneContext.h>
00007 #include <fesa-core/Synchronization/TimingContext.h>
00008 #include <fesa-core/Synchronization/SynchronizationLabObjectFactory.h>
00009 #include <fesa-core/Core/AbstractEquipment.h>
00010 #include <fesa-core/Utilities/ParserElements/ParserElementDefs.h>
00011 #include <fesa-core/RealTime/RTEvent.h>
00012 #include <cmw-log/Logger.h>
00013
00014
00015 namespace
00016 {
00017
00018 CMW::Log::Logger& logger = CMW::Log::LoggerFactory::getLogger("FESA.FWK.fesa-core.RealTime.OnSubscriptionEventSource");
00019
00020 }
00021
00022
00023 namespace fesa
00024 {
00025
00026 OnSubscriptionEventSource::OnSubscriptionEventSource() :
00027 AbstractEventSource(ON_SUBSCRIPTION_EVENT_SOURCE_NAME, OnSubscriptionSource)
00028 {
00029 itr1_ = onSubscriptionEventMapping_.begin();
00030 itr2_ = onSubscriptionEventMapping_.end();
00031 IsSubscriptionStarted_ = false;
00032 subscriptionThread_ = NULL;
00033 }
00034
00035 OnSubscriptionEventSource::~OnSubscriptionEventSource()
00036 {
00037 if (subscriptionThread_ != NULL)
00038 deleteSubscriptionThread();
00039 }
00040
00041 void OnSubscriptionEventSource::connect(boost::shared_ptr<fesa::EventElement>& eventElement)
00042 {
00043 std::string eventName = eventElement->getConcreteName();
00044 std::string deviceName = eventElement->getSourceTypeSpecificData(ON_SUBSCRIPTION_TAG_DEVICE);
00045 std::string propertyName = eventElement->getSourceTypeSpecificData(ON_SUBSCRIPTION_TAG_PROPERTY);
00046 std::pair<std::string, std::string> tmp(deviceName, propertyName);
00047 onSubscriptionEventMapping_.insert(std::pair<std::pair<std::string, std::string>, std::string>(tmp, eventName));
00048 pendingSubscriptions_.insert(std::pair<std::string, std::string>(deviceName, propertyName));
00049 }
00050
00051 RTEvent* OnSubscriptionEventSource::wait()
00052 {
00053 if (!IsSubscriptionStarted_)
00054 {
00055 if (!startSubscriptions())
00056 {
00057
00058 createSubscriptionThread();
00059 }
00060 }
00061
00062 if (itr1_ == itr2_)
00063 {
00064 eventInfo_.data.removeAll();
00065 proxy_.waitNotification(eventInfo_.deviceName, eventInfo_.propertyName, eventInfo_.cycleSelectorName,
00066 eventInfo_.data);
00067 std::pair<OnSubscriptionEventMapping::iterator, OnSubscriptionEventMapping::iterator> itr =
00068 onSubscriptionEventMapping_.equal_range(std::pair<std::string, std::string>(
00069 eventInfo_.deviceName, eventInfo_.propertyName));
00070 itr1_ = itr.first;
00071 itr2_ = itr.second;
00072 }
00073
00074 if ((eventInfo_.data.contains("updateFlags")))
00075 {
00076 std::string msg = " Inmediate update; event should be ignored";
00077 ++itr1_;
00078 return NULL;
00079 }
00080
00081 OnSubscriptionRTEventPayload* pRTPayload = new OnSubscriptionRTEventPayload(eventInfo_.deviceName,
00082 eventInfo_.propertyName, eventInfo_.cycleSelectorName, eventInfo_.data);
00083
00084
00085
00086 MultiplexingContext* context;
00087
00088 if (pRTPayload->rdaData_.contains("cycleStamp"))
00089 {
00090 int64_t cycleStamp = pRTPayload->rdaData_.extractLong("cycleStamp");
00091 try
00092 {
00093 std::string cycleName = pRTPayload->rdaData_.extractString("cycleName");
00094 context = SynchronizationLabObjectFactory::getInstance()->createTimingContext(cycleStamp, cycleName);
00095
00096 }
00097 catch (FesaException& ex)
00098 {
00099 delete pRTPayload;
00100 throw;
00101 }
00102 }
00103 else
00104 {
00105 context = new NoneContext();
00106 }
00107
00108 RTEvent* pRTEvt = new RTEvent(itr1_->second, context, this, pRTPayload);
00109 ++itr1_;
00110 return pRTEvt;
00111 }
00112
00113 bool OnSubscriptionEventSource::startSubscriptions()
00114 {
00115 IsSubscriptionStarted_ = true;
00116 OnSubscriptionEventMapping::iterator itr;
00117 for (itr = onSubscriptionEventMapping_.begin(); itr != onSubscriptionEventMapping_.end(); ++itr)
00118 {
00119 try
00120 {
00121 proxy_.subscribe(itr->first.first, itr->first.second);
00122 pendingSubscriptions_.erase(std::pair<std::string, std::string>(itr->first.first, itr->first.second));
00123 }
00124 catch (FesaException& ex)
00125 {
00126 LOG_INFO_IF(logger, ex.getMessage());
00127 }
00128 }
00129 if (pendingSubscriptions_.empty())
00130 {
00131 return true;
00132 }
00133 else
00134 {
00135 return false;
00136 }
00137 }
00138
00139 void OnSubscriptionEventSource::createSubscriptionThread()
00140 {
00141 subscriptionThread_ = new SubscriptionThread(this);
00142 subscriptionThread_->setPriority(getPriority());
00143 subscriptionThread_->start(false, "SubscriptionThread");
00144 }
00145
00146 void OnSubscriptionEventSource::deleteSubscriptionThread()
00147 {
00148 subscriptionThread_->stop();
00149 delete subscriptionThread_;
00150 }
00151
00152 OnSubscriptionEventSource::SubscriptionThread::SubscriptionThread(OnSubscriptionEventSource* evtSrc)
00153 {
00154 evtSrc_ = evtSrc;
00155 }
00156
00157 OnSubscriptionEventSource::SubscriptionThread::~SubscriptionThread()
00158 {
00159
00160 }
00161
00162 void OnSubscriptionEventSource::SubscriptionThread::run()
00163 {
00164 while (!evtSrc_->pendingSubscriptions_.empty())
00165 {
00166 for (PendingSubscriptions::iterator itr = evtSrc_->pendingSubscriptions_.begin(); itr
00167 != evtSrc_->pendingSubscriptions_.end(); ++itr)
00168 {
00169 try
00170 {
00171 evtSrc_->proxy_.subscribe(itr->first, itr->second);
00172 evtSrc_->pendingSubscriptions_.erase(std::pair<std::string, std::string>(itr->first, itr->second));
00173 }
00174 catch (FesaException& ex)
00175 {
00176 LOG_INFO_IF(logger, ex.getMessage());
00177 }
00178 }
00179 struct timespec req;
00180 req.tv_sec = 5;
00181 req.tv_nsec = 0;
00182 int32_t ret;
00183 do
00184 {
00185 ret = nanosleep(&req, &req);
00186 }
00187 while ((ret == -1) && (errno == EINTR));
00188 }
00189 evtSrc_->deleteSubscriptionThread();
00190 }
00191
00192 }