OnSubscriptionEventSource.cpp

Go to the documentation of this file.
00001 // Copyright CERN 2012 - Developed in collaboration with GSI
00002 
00003 #include <fesa-core/RealTime/OnSubscriptionEventSource.h>
00004 #include <fesa-core/RealTime/OnSubscriptionRTEventPayload.h>
00005 #include <fesa-core/Utilities/ParserElements/EventElement.h>
00006 #include <fesa-core/Synchronization/NoneContext.h>
00007 #include <fesa-core/Synchronization/TimingContext.h>
00008 #include <fesa-core/Synchronization/SynchronizationLabObjectFactory.h>
00009 #include <fesa-core/Core/AbstractEquipment.h>
00010 #include <fesa-core/Utilities/ParserElements/ParserElementDefs.h>
00011 #include <fesa-core/RealTime/RTEvent.h>
00012 #include <cmw-log/Logger.h>
00013 
00014 
00015 namespace
00016 {
00017 
00018 CMW::Log::Logger& logger = CMW::Log::LoggerFactory::getLogger("FESA.FWK.fesa-core.RealTime.OnSubscriptionEventSource");
00019 
00020 } // namespace
00021 
00022 
00023 namespace fesa
00024 {
00025 
00026 OnSubscriptionEventSource::OnSubscriptionEventSource() :
00027     AbstractEventSource(ON_SUBSCRIPTION_EVENT_SOURCE_NAME, OnSubscriptionSource)
00028 {
00029     itr1_ = onSubscriptionEventMapping_.begin();
00030     itr2_ = onSubscriptionEventMapping_.end();
00031     IsSubscriptionStarted_ = false;
00032     subscriptionThread_ = NULL;
00033 }
00034 
00035 OnSubscriptionEventSource::~OnSubscriptionEventSource()
00036 {
00037     if (subscriptionThread_ != NULL)
00038         deleteSubscriptionThread();
00039 }
00040 
00041 void OnSubscriptionEventSource::connect(boost::shared_ptr<fesa::EventElement>& eventElement) // For the time being onChange is always false and subscription to ALL
00042 {
00043     std::string eventName = eventElement->getConcreteName();
00044     std::string deviceName = eventElement->getSourceTypeSpecificData(ON_SUBSCRIPTION_TAG_DEVICE);
00045     std::string propertyName = eventElement->getSourceTypeSpecificData(ON_SUBSCRIPTION_TAG_PROPERTY);
00046     std::pair<std::string, std::string> tmp(deviceName, propertyName);
00047     onSubscriptionEventMapping_.insert(std::pair<std::pair<std::string, std::string>, std::string>(tmp, eventName));
00048     pendingSubscriptions_.insert(std::pair<std::string, std::string>(deviceName, propertyName));
00049 }
00050 
00051 RTEvent* OnSubscriptionEventSource::wait()
00052 {
00053     if (!IsSubscriptionStarted_) // if it the first time
00054     {
00055         if (!startSubscriptions()) // If some of the subscriptions failed
00056         {
00057             // create a thread that every 5 second retries the connection of the remaining subscriptions
00058             createSubscriptionThread();
00059         }
00060     }
00061 
00062     if (itr1_ == itr2_)
00063     {
00064         eventInfo_.data.removeAll();
00065         proxy_.waitNotification(eventInfo_.deviceName, eventInfo_.propertyName, eventInfo_.cycleSelectorName,
00066                                 eventInfo_.data);
00067         std::pair<OnSubscriptionEventMapping::iterator, OnSubscriptionEventMapping::iterator> itr =
00068             onSubscriptionEventMapping_.equal_range(std::pair<std::string, std::string>(
00069                                                         eventInfo_.deviceName, eventInfo_.propertyName));
00070         itr1_ = itr.first;
00071         itr2_ = itr.second;
00072     }
00073 
00074     if ((eventInfo_.data.contains("updateFlags"))) // Inmediate update should be ignored
00075     {
00076         std::string msg = " Inmediate update; event should be ignored";
00077         ++itr1_;
00078         return NULL;//TODO Returning NULL is not nice. Better would be to have the RTEvent as parameter
00079     }
00080 
00081     OnSubscriptionRTEventPayload* pRTPayload = new OnSubscriptionRTEventPayload(eventInfo_.deviceName,
00082                                                                                 eventInfo_.propertyName, eventInfo_.cycleSelectorName, eventInfo_.data);
00083 
00084     // check if the subscription is coming from a multiplexed property or not
00085 
00086     MultiplexingContext* context;
00087 
00088     if (pRTPayload->rdaData_.contains("cycleStamp")) //subscription done to a timing related action
00089     {
00090         int64_t cycleStamp = pRTPayload->rdaData_.extractLong("cycleStamp");
00091         try
00092         {
00093             std::string cycleName = pRTPayload->rdaData_.extractString("cycleName");
00094             context = SynchronizationLabObjectFactory::getInstance()->createTimingContext(cycleStamp, cycleName);
00095 
00096         }
00097         catch (FesaException& ex)
00098         {
00099             delete pRTPayload;
00100             throw;//will be caught & logged in the AbstractEventSource
00101         }
00102     }
00103     else // subscription done to a non-timing related action
00104     {
00105         context = new NoneContext();
00106     }
00107 
00108     RTEvent* pRTEvt = new RTEvent(itr1_->second, context, this, pRTPayload);
00109     ++itr1_;
00110     return pRTEvt;
00111 }
00112 
00113 bool OnSubscriptionEventSource::startSubscriptions()
00114 {
00115     IsSubscriptionStarted_ = true;
00116     OnSubscriptionEventMapping::iterator itr;
00117     for (itr = onSubscriptionEventMapping_.begin(); itr != onSubscriptionEventMapping_.end(); ++itr)
00118     {
00119         try
00120         {
00121             proxy_.subscribe(itr->first.first, itr->first.second);
00122             pendingSubscriptions_.erase(std::pair<std::string, std::string>(itr->first.first, itr->first.second));
00123         }
00124         catch (FesaException& ex)
00125         {
00126             LOG_INFO_IF(logger, ex.getMessage());
00127         }
00128     }
00129     if (pendingSubscriptions_.empty())
00130     {
00131         return true;
00132     }
00133     else
00134     {
00135         return false;
00136     }
00137 }
00138 
00139 void OnSubscriptionEventSource::createSubscriptionThread()
00140 {
00141     subscriptionThread_ = new SubscriptionThread(this);
00142     subscriptionThread_->setPriority(getPriority());
00143     subscriptionThread_->start(false, "SubscriptionThread");
00144 }
00145 
00146 void OnSubscriptionEventSource::deleteSubscriptionThread()
00147 {
00148     subscriptionThread_->stop();
00149     delete subscriptionThread_;
00150 }
00151 
00152 OnSubscriptionEventSource::SubscriptionThread::SubscriptionThread(OnSubscriptionEventSource* evtSrc)
00153 {
00154     evtSrc_ = evtSrc;
00155 }
00156 
00157 OnSubscriptionEventSource::SubscriptionThread::~SubscriptionThread()
00158 {
00159 
00160 }
00161 
00162 void OnSubscriptionEventSource::SubscriptionThread::run()
00163 {
00164     while (!evtSrc_->pendingSubscriptions_.empty())
00165     {
00166         for (PendingSubscriptions::iterator itr = evtSrc_->pendingSubscriptions_.begin(); itr
00167                  != evtSrc_->pendingSubscriptions_.end(); ++itr)
00168         {
00169             try
00170             {
00171                 evtSrc_->proxy_.subscribe(itr->first, itr->second);
00172                 evtSrc_->pendingSubscriptions_.erase(std::pair<std::string, std::string>(itr->first, itr->second));
00173             }
00174             catch (FesaException& ex)
00175             {
00176                 LOG_INFO_IF(logger, ex.getMessage());
00177             }
00178         }
00179         struct timespec req;
00180         req.tv_sec = 5;
00181         req.tv_nsec = 0;
00182         int32_t ret;
00183         do
00184         {
00185             ret = nanosleep(&req, &req);
00186         }
00187         while ((ret == -1) && (errno == EINTR));
00188     }
00189     evtSrc_->deleteSubscriptionThread();
00190 }
00191 
00192 } // fesa

Generated on 18 Jan 2013 for Fesa by  doxygen 1.6.1