PersistencyManager.cpp

Go to the documentation of this file.
00001 // Copyright CERN 2012 - Developed in collaboration with GSI
00002 
00003 #include <fesa-core/Persistency/PersistencyManager.h>
00004 
00005 #include <fesa-core/Persistency/PersistencyUnit.h>
00006 #include <fesa-core/Core/AbstractEquipment.h>
00007 #include <fesa-core/Core/ThreadPriorityConfiguration.h>
00008 #include <fesa-core/DataStore/GlobalDevice.h>
00009 #include <fesa-core/Exception/FesaException.h>
00010 #include <fesa-core/Utilities/Lock.h>
00011 #include <fesa-core/Utilities/ProcessConfiguration.h>
00012 #include <fesa-core/Utilities/XMLParser.h>
00013 
00014 #include <cmw-log/Logger.h>
00015 
00016 #include <iostream>
00017 
00018 
00019 namespace
00020 {
00021 
00022 CMW::Log::Logger& logger = CMW::Log::LoggerFactory::getLogger("FESA.FWK.fesa-core.Persistency.PersistencyManager");
00023 
00024 }
00025 
00026 
00027 namespace fesa
00028 {
00029 
00030 PersistencyManager* PersistencyManager::theInstance_ = NULL;
00031 
00032 void PersistencyManager::registerPersistencyUnit(PersistencyUnit * pPersistencyUnit)
00033 {
00034 
00035     const std::string& name = pPersistencyUnit->getGlobalDevice().className.getAsString();
00036     persistencyUnitsCol_.insert(std::pair<const std::string, PersistencyUnit *>(name, pPersistencyUnit));
00037 }
00038 
00039 PersistencyManager::PersistencyManager() :
00040     isStarted_(false), counterOfTriggers_(0)
00041 {
00042     persistencyMaxDelay_ = AbstractEquipment::getInstance()->getProcessConfiguration()->getIntValue(PropertyTag::MAX_PERSISTENCY_DELAY);
00043 }
00044 
00045 PersistencyManager::~PersistencyManager()
00046 {
00047 
00048     for (std::map<const std::string, PersistencyUnit*>::iterator p = persistencyUnitsCol_.begin(); p != persistencyUnitsCol_.end(); ++p)
00049     {
00050         delete p->second;
00051     }
00052 }
00053 
00054 PersistencyManager* PersistencyManager::getInstance()
00055 {
00056     if (!theInstance_)
00057     {
00058         theInstance_ = new PersistencyManager();
00059     }
00060     return theInstance_;
00061 }
00062 
00063 void PersistencyManager::start()
00064 {
00065     {
00066         AbstractEquipment* eqp = AbstractEquipment::getInstance();
00067         std::string instanceFile = eqp->getDeviceDataFileName();
00068         XMLParser xmlParser(instanceFile, false);
00069         ThreadPriorityConfigurationFromFile conf(xmlParser, eqp->getProcessConfiguration());
00070         int32_t prio =  conf.getPrioPersistence();
00071         setPriority(prio);
00072     }
00073 
00074     if (!isStarted_)
00075     {
00076         const std::string threadName("PersistencyManager");
00077         Thread::start(false, threadName);
00078         isStarted_ = true;
00079     }
00080 }
00081 
00082 void PersistencyManager::trigger(const std::string& className)
00083 {
00084     std::map<const std::string, PersistencyUnit*>::iterator pos = persistencyUnitsCol_.find(className);
00085     if (pos == persistencyUnitsCol_.end())
00086     {
00087         return;
00088     }
00089     {
00090         Lock lock(accessMutex_);
00091         persistencySet_.insert(persistencyUnitsCol_[className]);
00092         counterOfTriggers_++;
00093     }
00094     triggerCondVar_.signal();
00095 }
00096 
00097 void PersistencyManager::run()
00098 {
00099     bool persistencyTimeout = false;
00100     uint32_t persistencyDelay = 0;
00101     uint32_t avalancheTimeout = 30;
00102 
00103     struct timespec delay, time_left_before_wakeup;
00104     delay.tv_sec = avalancheTimeout;
00105     delay.tv_nsec = 0;
00106 
00107     while (isRunning_)
00108     {
00109         {
00110             Lock lock(accessMutex_);
00111             while (persistencySet_.size() == 0)
00112             {
00113                 triggerCondVar_.wait(accessMutex_);
00114             }
00115             // timeout in order to protect against an avalanche of changes
00116         }
00117         uint32_t lastCounterValue = 0;
00118         do
00119         {
00120             lastCounterValue = counterOfTriggers_;
00121             // wait avalance timeout before triggering the persistency
00122             nanosleep(&delay, &time_left_before_wakeup);
00123             persistencyDelay += avalancheTimeout;
00124             /* In case of periodic storeFieldssettings, we could never exit from this avalanche filtering loop.
00125              * To prevent such a situation (no storing any more), we define a maximum delay (~5mn).
00126              */
00127             if (persistencyDelay > persistencyMaxDelay_)
00128             {
00129                 persistencyDelay = 0;
00130                 persistencyTimeout = true;
00131             }
00132         }
00133         while ((lastCounterValue != counterOfTriggers_) && (persistencyTimeout == false));
00134         {
00135             Lock lock(accessMutex_);
00136             persistencyTimeout = false;
00137             counterOfTriggers_ = 0;
00138             while(persistencySet_.size() > 0)
00139             {
00140                 // extract the first element
00141                 std::set<PersistencyUnit*>::iterator puItr = persistencySet_.begin();
00142                 PersistencyUnit* pu = (*puItr);
00143                 persistencySet_.erase(puItr);
00144                 try
00145                 {
00146                     storeManager_.store(*pu);
00147                 }
00148                 catch (FesaException& e)
00149                 {
00150                     LOG_ERROR_IF(logger, e.getMessage());
00151                 }
00152             }
00153         }
00154     }
00155     hasFinished_ = true;
00156 }
00157 
00158 } // fesa

Generated on 18 Jan 2013 for Fesa by  doxygen 1.6.1