log_thread.cpp

00001 
00002 /***************************************************************************
00003  *  log_thread.cpp - BB Logger Thread
00004  *
00005  *  Created: Sun Nov 08 00:02:09 2009
00006  *  Copyright  2006-2009  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version.
00014  *
00015  *  This program is distributed in the hope that it will be useful,
00016  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  *  GNU Library General Public License for more details.
00019  *
00020  *  Read the full text in the LICENSE.GPL file in the doc directory.
00021  */
00022 
00023 #include "log_thread.h"
00024 #include "file.h"
00025 
00026 #include <blackboard/blackboard.h>
00027 #include <utils/logging/logger.h>
00028 #include <core/exceptions/system.h>
00029 #include <interfaces/SwitchInterface.h>
00030 
00031 #include <memory>
00032 #include <cstring>
00033 #include <cstdlib>
00034 #include <cstdio>
00035 #include <cerrno>
00036 #include <fcntl.h>
00037 #ifdef __FreeBSD__
00038 #  include <sys/endian.h>
00039 #else
00040 #  include <endian.h>
00041 #endif
00042 #include <arpa/inet.h>
00043 #include <sys/stat.h>
00044 #include <sys/mman.h>
00045 
00046 using namespace fawkes;
00047 
00048 /** @class BBLoggerThread "log_thread.h"
00049  * BlackBoard logger thread.
00050  * One instance of this thread handles logging of one specific interface.
00051  * The plugin will spawn as many threads as there are interfaces to log. This
00052  * allows for maximum concurrency of the writers and avoids a serialization
00053  * bottle neck.
00054  * The log thread can operate in buffering mode. If this mode is disabled, the
00055  * data is written to the file within the blackboard data changed event, and
00056  * thus the writing operation can slow down the overall system, but memory
00057  * requirements are low. This is useful if a lot of data is written or if the
00058  * storage device is slow. If the mode is enabled, during the event the BB data
00059  * will be copied into another memory segment and the thread will be woken up.
00060  * Once the thread is running it stores all of the BB data segments bufferd
00061  * up to then.
00062  * The interface listener listens for events for a particular interface and
00063  * then writes the changes to the file.
00064  * @author Tim Niemueller
00065  */
00066 
00067 /** Constructor.
00068  * @param iface_uid interface UID which to log
00069  * @param logdir directory to store config files, must exist
00070  * @param buffering enable log buffering?
00071  * @param flushing true to flush after each written chunk
00072  * @param scenario ID of the log scenario
00073  * @param start_time time to use as start time for the log
00074  */
00075 BBLoggerThread::BBLoggerThread(const char *iface_uid,
00076                                const char *logdir, bool buffering, bool flushing,
00077                                const char *scenario, fawkes::Time *start_time)
00078   : Thread("BBLoggerThread", Thread::OPMODE_WAITFORWAKEUP),
00079     BlackBoardInterfaceListener("BBLoggerThread(%s)", iface_uid)
00080 {
00081   set_coalesce_wakeups(true);
00082   set_name("BBLoggerThread(%s)", iface_uid);
00083 
00084   __buffering   = buffering;
00085   __flushing    = flushing;
00086   __uid         = strdup(iface_uid);
00087   __logdir      = strdup(logdir);
00088   __scenario    = strdup(scenario);
00089   __start       = new Time(start_time);
00090   __filename    = NULL;
00091   __queue_mutex = new Mutex();
00092   __data_size   = 0;
00093   __is_master   = false;
00094   __enabled     = true;
00095 
00096   __now = NULL;
00097 
00098   // Parse UID
00099   Interface::parse_uid(__uid, &__type, &__id);
00100 
00101   char date[21];
00102   Time now;
00103   struct tm *tmp = localtime(&(now.get_timeval()->tv_sec));
00104   strftime(date, 21, "%F-%H-%M-%S", tmp);
00105 
00106   if (asprintf(&__filename, "%s/%s-%s-%s-%s.log", LOGDIR, __scenario,
00107                __type, __id, date) == -1) {
00108     throw OutOfMemoryException("Cannot generate log name");
00109   }
00110 }
00111 
00112 
00113 /** Destructor. */
00114 BBLoggerThread::~BBLoggerThread()
00115 {
00116   free(__uid);
00117   free(__type);
00118   free(__id);
00119   free(__logdir);
00120   free(__scenario);
00121   free(__filename);
00122   delete __queue_mutex;
00123   delete __start;
00124 }
00125 
00126 
00127 void
00128 BBLoggerThread::init()
00129 {
00130   __queues[0].clear();
00131   __queues[1].clear();
00132   __act_queue = 0;
00133 
00134   __queue_mutex = new Mutex();
00135   __data_size   = 0;
00136 
00137   __now = NULL;
00138   __num_data_items = 0;
00139   __session_start  = 0;
00140 
00141   // use open because fopen does not provide O_CREAT | O_EXCL
00142   // open read/write because of usage of mmap
00143   mode_t m = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
00144   int fd   = open(__filename, O_RDWR | O_CREAT | O_EXCL, m);
00145   if ( ! fd ) {
00146     throw CouldNotOpenFileException(__filename, errno, "Failed to open log 1");
00147   } else {
00148     __f_data = fdopen(fd, "w+");
00149     if ( ! __f_data ) {
00150       throw CouldNotOpenFileException(__filename, errno, "Failed to open log 2");
00151     }
00152   }
00153 
00154   try {
00155     __iface = blackboard->open_for_reading(__type, __id);
00156     __data_size = __iface->datasize();
00157   } catch (Exception &e) {
00158     fclose(__f_data);
00159     throw;
00160   }
00161 
00162   try {
00163     write_header();
00164   } catch (FileWriteException &e) {
00165     blackboard->close(__iface);
00166     fclose(__f_data);
00167     throw;
00168   }
00169 
00170   __now = new Time(clock);
00171 
00172   if (__is_master) {
00173     try {
00174       __switch_if = blackboard->open_for_writing<SwitchInterface>("BBLogger");
00175       __switch_if->set_enabled(__enabled);
00176       __switch_if->write();
00177       bbil_add_message_interface(__switch_if);
00178     } catch (Exception &e) {
00179       fclose(__f_data);
00180       throw;
00181     }
00182   }
00183 
00184   bbil_add_data_interface(__iface);
00185   bbil_add_writer_interface(__iface);
00186 
00187   blackboard->register_listener(this, BlackBoard::BBIL_FLAG_DATA |
00188                                 BlackBoard::BBIL_FLAG_WRITER |
00189                                 BlackBoard::BBIL_FLAG_MESSAGES);
00190 
00191   logger->log_info(name(), "Logging %s to %s%s", __iface->uid(), __filename,
00192                    __is_master ? " as master" : "");
00193 }
00194 
00195 
00196 void
00197 BBLoggerThread::finalize()
00198 {
00199   blackboard->unregister_listener(this);
00200   if (__is_master) {
00201     blackboard->close(__switch_if);
00202   }
00203   update_header();
00204   fclose(__f_data);
00205   for (unsigned int q = 0; q < 2; ++q) {
00206     while (!__queues[q].empty()) {
00207       void *t = __queues[q].front();
00208       free(t);
00209       __queues[q].pop();
00210     }
00211   }
00212   delete __now;
00213   __now = NULL;
00214 }
00215 
00216 
00217 /** Get filename.
00218  * @return file name, valid after object instantiated, but before init() does not
00219  * mean that the file has been or can actually be opened
00220  */
00221 const char *
00222 BBLoggerThread::get_filename() const
00223 {
00224   return __filename;
00225 }
00226 
00227 
00228 /** Enable or disable logging.
00229  * @param enabled true to enable logging, false to disable
00230  */
00231 void
00232 BBLoggerThread::set_enabled(bool enabled)
00233 {
00234   if (enabled && !__enabled) {
00235     logger->log_info(name(), "Logging enabled",
00236                      (__num_data_items - __session_start));
00237     __session_start = __num_data_items;
00238   } else if (!enabled && __enabled) {
00239     logger->log_info(name(), "Logging disabled (wrote %u entries), flushing",
00240                      (__num_data_items - __session_start));
00241     update_header();
00242     fflush(__f_data);
00243   }
00244 
00245   __enabled = enabled;
00246 }
00247 
00248 
00249 /** Set threadlist and master status.
00250  * This copies the thread list and sets this thread as master thread.
00251  * If you intend to use this method you must do so before the thread is
00252  * initialized. You may only ever declare one thread as master.
00253  * @param thread_list list of threads to notify on enable/disable events
00254  */
00255 void
00256 BBLoggerThread::set_threadlist(fawkes::ThreadList &thread_list)
00257 {
00258   __is_master = true;
00259   __threads   = thread_list;
00260 }
00261 
00262 void
00263 BBLoggerThread::write_header()
00264 {
00265   bblog_file_header header;
00266   memset(&header, 0, sizeof(header));
00267   header.file_magic   = htonl(BBLOGGER_FILE_MAGIC);
00268   header.file_version = htonl(BBLOGGER_FILE_VERSION);
00269 #if __BYTE_ORDER == __BIG_ENDIAN
00270   header.endianess = BBLOG_BIG_ENDIAN;
00271 #else
00272   header.endianess = BBLOG_LITTLE_ENDIAN;
00273 #endif
00274   header.num_data_items = __num_data_items;
00275   strncpy(header.scenario, (const char *)__scenario, BBLOG_SCENARIO_SIZE);
00276   strncpy(header.interface_type, __iface->type(), BBLOG_INTERFACE_TYPE_SIZE);
00277   strncpy(header.interface_id, __iface->id(), BBLOG_INTERFACE_ID_SIZE);
00278   memcpy(header.interface_hash, __iface->hash(), BBLOG_INTERFACE_HASH_SIZE);
00279   header.data_size = __iface->datasize();
00280   long start_time_sec, start_time_usec;
00281   __start->get_timestamp(start_time_sec, start_time_usec);
00282   header.start_time_sec  = start_time_sec;
00283   header.start_time_usec = start_time_usec;
00284   if (fwrite(&header, sizeof(header), 1, __f_data) != 1) {
00285     throw FileWriteException(__filename, "Failed to write header");
00286   }
00287   fflush(__f_data);
00288 }
00289 
00290 /** Updates the num_data_items field in the header. */
00291 void
00292 BBLoggerThread::update_header()
00293 {
00294   // write updated num_data_items field
00295 #if _POSIX_MAPPED_FILES
00296   void *h = mmap(NULL, sizeof(bblog_file_header), PROT_WRITE, MAP_SHARED,
00297                  fileno(__f_data), 0);
00298   if (h == MAP_FAILED) {
00299     logger->log_warn(name(), "Failed to mmap log (%s), "
00300                      "not updating number of data items",
00301                      strerror(errno));
00302   } else {
00303     bblog_file_header *header = (bblog_file_header *)h;
00304     header->num_data_items = __num_data_items;
00305     munmap(h, sizeof(bblog_file_header));
00306   }
00307 #else
00308   logger->log_warn(name(), "Memory mapped files not available, "
00309                    "not updating number of data items on close");
00310 #endif
00311 }
00312 
00313 void
00314 BBLoggerThread::write_chunk(const void *chunk)
00315 {
00316   bblog_entry_header ehead;
00317   __now->stamp();
00318   Time d = *__now - *__start;
00319   long rel_time_sec, rel_time_usec;
00320   d.get_timestamp(rel_time_sec, rel_time_usec);
00321   ehead.rel_time_sec  = rel_time_sec;
00322   ehead.rel_time_usec = rel_time_usec;
00323   if ( (fwrite(&ehead, sizeof(ehead), 1, __f_data) == 1) &&
00324        (fwrite(chunk, __data_size, 1, __f_data) == 1) ) {
00325     if (__flushing)  fflush(__f_data);
00326     __num_data_items += 1;
00327   } else {
00328     logger->log_warn(name(), "Failed to write chunk");
00329   }
00330 }
00331 
00332 
00333 void
00334 BBLoggerThread::loop()
00335 {
00336   unsigned int write_queue = __act_queue;
00337   __queue_mutex->lock();
00338   __act_queue = 1 - __act_queue;
00339   __queue_mutex->unlock();
00340   LockQueue<void *> &queue = __queues[write_queue];
00341   //logger->log_debug(name(), "Writing %zu entries", queue.size());
00342   while (! queue.empty() ) {
00343     void *c = queue.front();
00344     write_chunk(c);
00345     free(c);
00346     queue.pop();
00347   }
00348 }
00349 
00350 bool
00351 BBLoggerThread::bb_interface_message_received(Interface *interface,
00352                                               Message *message) throw()
00353 {
00354   SwitchInterface::EnableSwitchMessage *enm;
00355   SwitchInterface::DisableSwitchMessage *dism;
00356 
00357   bool enabled = true;
00358   if ((enm = dynamic_cast<SwitchInterface::EnableSwitchMessage *>(message)) != NULL) {
00359     enabled = true;
00360   } else if ((dism = dynamic_cast<SwitchInterface::DisableSwitchMessage *>(message)) != NULL) {
00361     enabled = false;
00362   } else {
00363     logger->log_debug(name(), "Unhandled message type: %s via %s",
00364                       message->type(), interface->uid());
00365   }
00366 
00367   for (ThreadList::iterator i = __threads.begin(); i != __threads.end(); ++i) {
00368     BBLoggerThread *bblt = dynamic_cast<BBLoggerThread *>(*i);
00369     bblt->set_enabled(enabled);
00370   }
00371 
00372   __switch_if->set_enabled(__enabled);
00373   __switch_if->write();
00374 
00375   return false;
00376 }
00377 
00378 
00379 void
00380 BBLoggerThread::bb_interface_data_changed(Interface *interface) throw()
00381 {
00382   if (!__enabled)  return;
00383 
00384   try {
00385     __iface->read();
00386 
00387     if ( __buffering ) {
00388       void *c = malloc(__iface->datasize());
00389       memcpy(c, __iface->datachunk(), __iface->datasize());
00390       __queue_mutex->lock();
00391       __queues[__act_queue].push_locked(c);
00392       __queue_mutex->unlock();
00393       wakeup();
00394     } else {
00395       __queue_mutex->lock();
00396       write_chunk(__iface->datachunk());
00397       __queue_mutex->unlock();
00398     }
00399 
00400   } catch (Exception &e) {
00401     logger->log_error(name(), "Exception when data changed");
00402     logger->log_error(name(), e);
00403   }
00404 }
00405 
00406 void
00407 BBLoggerThread::bb_interface_writer_added(Interface *interface,
00408                                           unsigned int instance_serial) throw()
00409 {
00410   __session_start = __num_data_items;
00411 }
00412 
00413 void
00414 BBLoggerThread::bb_interface_writer_removed(Interface *interface,
00415                                             unsigned int instance_serial) throw()
00416 {
00417   logger->log_info(name(), "Writer removed (wrote %u entries), flushing",
00418                    (__num_data_items - __session_start));
00419   update_header();
00420   fflush(__f_data);
00421 }

Generated on Tue Feb 22 13:32:29 2011 for Fawkes API by  doxygen 1.4.7