logreplay_thread.cpp

00001 
00002 /***************************************************************************
00003  *  logreplay_thread.cpp - BB Log Replay Thread
00004  *
00005  *  Created: Wed Feb 17 01:53:00 2010
00006  *  Copyright  2010  Tim Niemueller [www.niemueller.de]
00007  *             2010  Masrur Doostdar <doostdar@kbsg.rwth-aachen.de>
00008  *
00009  ****************************************************************************/
00010 
00011 /*  This program is free software; you can redistribute it and/or modify
00012  *  it under the terms of the GNU General Public License as published by
00013  *  the Free Software Foundation; either version 2 of the License, or
00014  *  (at your option) any later version.
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL file in the doc directory.
00022  */
00023 
00024 #include "logreplay_thread.h"
00025 #include "file.h"
00026 
00027 #include <blackboard/blackboard.h>
00028 #include <utils/logging/logger.h>
00029 #include <core/threading/wait_condition.h>
00030 #include <core/exceptions/system.h>
00031 #include <utils/misc/autofree.h>
00032 
00033 #include <blackboard/internal/instance_factory.h>
00034 
00035 
00036 #include <memory>
00037 #include <cstring>
00038 #include <cstdlib>
00039 #include <cstdio>
00040 #include <cerrno>
00041 #include <fcntl.h>
00042 #ifdef __FreeBSD__
00043 #  include <sys/endian.h>
00044 #else
00045 #  include <endian.h>
00046 #endif
00047 #include <arpa/inet.h>
00048 #include <sys/mman.h>
00049 
00050 using namespace fawkes;
00051 
00052 /** @class BBLogReplayThread "logreplay_thread.h"
00053  * BlackBoard log Replay thread.
00054  * Writes the data of the logfile into a blackboard interface, considering the
00055  * time-step differences between the data.
00056  * @author Masrur Doostdar
00057  * @author Tim Niemueller
00058  */
00059 
00060 /** Constructor.
00061  * @param logfile_name filename of the log to be replayed
00062  * @param logdir directory containing the logfile
00063  * @param scenario ID of the log scenario
00064  * @param grace_period time in seconds that desired offset and loop offset may
00065  * diverge to still write the new data
00066  * @param loop_replay specifies if the replay should be looped
00067  * @param non_blocking do not block the main loop if not enough time has elapsed
00068  * to replay new data but just wait for the next cycle. This is ignored in
00069  * continuous thread mode as it could cause busy waiting.
00070  * @param thread_name initial thread name
00071  * @param th_opmode thread operation mode
00072  */
00073 BBLogReplayThread::BBLogReplayThread(const char *logfile_name,
00074                                      const char *logdir,
00075                                      const char *scenario,
00076                                      float grace_period,
00077                                      bool loop_replay,
00078                                      bool non_blocking,
00079                                      const char *thread_name,
00080                                      fawkes::Thread::OpMode th_opmode)
00081   : Thread(thread_name, th_opmode)
00082 {
00083   set_name("BBLogReplayThread(%s)", logfile_name);
00084   set_prepfin_conc_loop(true);
00085 
00086   __logfile_name= strdup(logfile_name);
00087   __logdir      = strdup(logdir);
00088   __scenario    = strdup(scenario); // dont need this!?
00089   __filename    = NULL;
00090   __cfg_grace_period = grace_period;
00091   __cfg_loop_replay  = loop_replay;
00092   if (th_opmode == OPMODE_WAITFORWAKEUP) {
00093     __cfg_non_blocking = non_blocking;
00094   } else {
00095     // would cause busy waiting
00096     __cfg_non_blocking = false;
00097   }
00098 }
00099 
00100 
00101 /** Destructor. */
00102 BBLogReplayThread::~BBLogReplayThread()
00103 {
00104   free(__logfile_name);
00105   free(__logdir);
00106   free(__scenario);
00107 }
00108 
00109 
00110 
00111 
00112 void
00113 BBLogReplayThread::init()
00114 {
00115   __logfile = NULL;
00116   __interface = NULL;
00117   __filename = NULL;
00118 
00119   if (asprintf(&__filename, "%s/%s", __logdir, __logfile_name) == -1) {
00120     throw OutOfMemoryException("Cannot re-generate logfile-path");
00121   }
00122 
00123   try {
00124     __logfile = new BBLogFile(__filename, true);
00125   } catch (Exception &e) {
00126     finalize();
00127     throw;
00128   }
00129 
00130   if (! __logfile->has_next()) {
00131     finalize();
00132     throw Exception("Log file %s does not have any entries", __filename);
00133   }
00134 
00135   __interface = blackboard->open_for_writing(__logfile->interface_type(),
00136                                              __logfile->interface_id());
00137 
00138   try {
00139     __logfile->set_interface(__interface);
00140   } catch (Exception &e) {
00141     finalize();
00142     throw;
00143   }
00144 
00145   logger->log_info(name(), "Replaying from %s:", __filename);
00146 }
00147 
00148 
00149 void
00150 BBLogReplayThread::finalize()
00151 {
00152   delete __logfile;
00153   if (__filename)  free(__filename);
00154   blackboard->close(__interface);
00155 }
00156 
00157 
00158 void
00159 BBLogReplayThread::once()
00160 {
00161   // Write first immediately, skip first offset
00162   __logfile->read_next();
00163   __interface->write();
00164   __last_offset = __logfile->entry_offset();
00165   if (__logfile->has_next()) {
00166     __logfile->read_next();
00167     __offsetdiff  = __logfile->entry_offset() - __last_offset;
00168     __last_offset = __logfile->entry_offset();
00169   }
00170   __last_loop.stamp();
00171 }
00172 
00173 void
00174 BBLogReplayThread::loop()
00175 {
00176   if (__logfile->has_next()) {
00177 
00178     // check if there is time left to wait
00179     __now.stamp();
00180     __loopdiff = __now - __last_loop;
00181     if ((__offsetdiff.in_sec() - __loopdiff.in_sec()) > __cfg_grace_period) {
00182       if (__cfg_non_blocking) {
00183         // need to keep waiting before posting, but in non-blocking mode
00184         // just wait for next loop
00185         return;
00186       } else {
00187         __waittime = __offsetdiff - __loopdiff;
00188         __waittime.wait();
00189       }
00190     }
00191 
00192     __interface->write();
00193     __logfile->read_next();
00194 
00195     __last_loop.stamp();
00196     __offsetdiff  = __logfile->entry_offset() - __last_offset;
00197     __last_offset = __logfile->entry_offset();
00198 
00199   } else {
00200     if(__cfg_loop_replay){
00201       logger->log_info(name(), "replay finished, looping");
00202       __logfile->rewind();
00203     } else {
00204       if (opmode() == OPMODE_CONTINUOUS) {
00205         // block
00206         logger->log_info(name(), "replay finished, sleeping");
00207         WaitCondition waitcond;
00208         waitcond.wait();
00209       } // else wait will just run once per loop
00210     }
00211   }
00212 }

Generated on Tue Feb 22 13:31:28 2011 for Fawkes API by  doxygen 1.4.7