PersistencyManager.cpp
Go to the documentation of this file.00001
00002
00003 #include <fesa-core/Persistency/PersistencyManager.h>
00004
00005 #include <fesa-core/Persistency/PersistencyUnit.h>
00006 #include <fesa-core/Core/AbstractEquipment.h>
00007 #include <fesa-core/Core/ThreadPriorityConfiguration.h>
00008 #include <fesa-core/DataStore/GlobalDevice.h>
00009 #include <fesa-core/Exception/FesaException.h>
00010 #include <fesa-core/Utilities/Lock.h>
00011 #include <fesa-core/Utilities/ProcessConfiguration.h>
00012 #include <fesa-core/Utilities/XMLParser.h>
00013
00014 #include <cmw-log/Logger.h>
00015
00016 #include <iostream>
00017
00018
00019 namespace
00020 {
00021
00022 CMW::Log::Logger& logger = CMW::Log::LoggerFactory::getLogger("FESA.FWK.fesa-core.Persistency.PersistencyManager");
00023
00024 }
00025
00026
00027 namespace fesa
00028 {
00029
00030 PersistencyManager* PersistencyManager::theInstance_ = NULL;
00031
00032 void PersistencyManager::registerPersistencyUnit(PersistencyUnit * pPersistencyUnit)
00033 {
00034
00035 const std::string& name = pPersistencyUnit->getGlobalDevice().className.getAsString();
00036 persistencyUnitsCol_.insert(std::pair<const std::string, PersistencyUnit *>(name, pPersistencyUnit));
00037 }
00038
00039 PersistencyManager::PersistencyManager() :
00040 isStarted_(false), counterOfTriggers_(0)
00041 {
00042 persistencyMaxDelay_ = AbstractEquipment::getInstance()->getProcessConfiguration()->getIntValue(PropertyTag::MAX_PERSISTENCY_DELAY);
00043 }
00044
00045 PersistencyManager::~PersistencyManager()
00046 {
00047
00048 for (std::map<const std::string, PersistencyUnit*>::iterator p = persistencyUnitsCol_.begin(); p != persistencyUnitsCol_.end(); ++p)
00049 {
00050 delete p->second;
00051 }
00052 }
00053
00054 PersistencyManager* PersistencyManager::getInstance()
00055 {
00056 if (!theInstance_)
00057 {
00058 theInstance_ = new PersistencyManager();
00059 }
00060 return theInstance_;
00061 }
00062
00063 void PersistencyManager::start()
00064 {
00065 {
00066 AbstractEquipment* eqp = AbstractEquipment::getInstance();
00067 std::string instanceFile = eqp->getDeviceDataFileName();
00068 XMLParser xmlParser(instanceFile, false);
00069 ThreadPriorityConfigurationFromFile conf(xmlParser, eqp->getProcessConfiguration());
00070 int32_t prio = conf.getPrioPersistence();
00071 setPriority(prio);
00072 }
00073
00074 if (!isStarted_)
00075 {
00076 const std::string threadName("PersistencyManager");
00077 Thread::start(false, threadName);
00078 isStarted_ = true;
00079 }
00080 }
00081
00082 void PersistencyManager::trigger(const std::string& className)
00083 {
00084 std::map<const std::string, PersistencyUnit*>::iterator pos = persistencyUnitsCol_.find(className);
00085 if (pos == persistencyUnitsCol_.end())
00086 {
00087 return;
00088 }
00089 {
00090 Lock lock(accessMutex_);
00091 persistencySet_.insert(persistencyUnitsCol_[className]);
00092 counterOfTriggers_++;
00093 }
00094 triggerCondVar_.signal();
00095 }
00096
00097 void PersistencyManager::run()
00098 {
00099 bool persistencyTimeout = false;
00100 uint32_t persistencyDelay = 0;
00101 uint32_t avalancheTimeout = 30;
00102
00103 struct timespec delay, time_left_before_wakeup;
00104 delay.tv_sec = avalancheTimeout;
00105 delay.tv_nsec = 0;
00106
00107 while (isRunning_)
00108 {
00109 {
00110 Lock lock(accessMutex_);
00111 while (persistencySet_.size() == 0)
00112 {
00113 triggerCondVar_.wait(accessMutex_);
00114 }
00115
00116 }
00117 uint32_t lastCounterValue = 0;
00118 do
00119 {
00120 lastCounterValue = counterOfTriggers_;
00121
00122 nanosleep(&delay, &time_left_before_wakeup);
00123 persistencyDelay += avalancheTimeout;
00124
00125
00126
00127 if (persistencyDelay > persistencyMaxDelay_)
00128 {
00129 persistencyDelay = 0;
00130 persistencyTimeout = true;
00131 }
00132 }
00133 while ((lastCounterValue != counterOfTriggers_) && (persistencyTimeout == false));
00134 {
00135 Lock lock(accessMutex_);
00136 persistencyTimeout = false;
00137 counterOfTriggers_ = 0;
00138 while(persistencySet_.size() > 0)
00139 {
00140
00141 std::set<PersistencyUnit*>::iterator puItr = persistencySet_.begin();
00142 PersistencyUnit* pu = (*puItr);
00143 persistencySet_.erase(puItr);
00144 try
00145 {
00146 storeManager_.store(*pu);
00147 }
00148 catch (FesaException& e)
00149 {
00150 LOG_ERROR_IF(logger, e.getMessage());
00151 }
00152 }
00153 }
00154 }
00155 hasFinished_ = true;
00156 }
00157
00158 }