remote.cpp

00001 
00002 /***************************************************************************
00003  *  remote.h - Remote BlackBoard access via Fawkes network protocol
00004  *
00005  *  Created: Mon Mar 03 10:53:00 2008
00006  *  Copyright  2006-2008  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023 
00024 #include <blackboard/remote.h>
00025 #include <blackboard/exceptions.h>
00026 #include <blackboard/net/messages.h>
00027 #include <blackboard/net/ilist_content.h>
00028 #include <blackboard/net/interface_proxy.h>
00029 #include <blackboard/internal/notifier.h>
00030 #include <blackboard/internal/instance_factory.h>
00031 
00032 #include <interface/interface_info.h>
00033 
00034 #include <core/threading/mutex.h>
00035 #include <core/threading/mutex_locker.h>
00036 #include <core/threading/wait_condition.h>
00037 #include <netcomm/fawkes/client.h>
00038 
00039 #include <string>
00040 #include <cstring>
00041 #include <fnmatch.h>
00042 #include <arpa/inet.h>
00043 
00044 namespace fawkes {
00045 
00046 /** @class RemoteBlackBoard <blackboard/remote.h>
00047  * Remote BlackBoard.
00048  * This class implements the access to a remote BlackBoard using the Fawkes
00049  * network protocol.
00050  *
00051  * @author Tim Niemueller
00052  */
00053 
00054 /** Constructor.
00055  * @param client Fawkes network client to use.
00056  */
00057 RemoteBlackBoard::RemoteBlackBoard(FawkesNetworkClient *client)
00058 {
00059   __fnc = client;
00060   __fnc_owner = false;
00061 
00062   if ( ! __fnc->connected() ) {
00063     throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client");
00064   }
00065 
00066   __fnc->register_handler(this, FAWKES_CID_BLACKBOARD);
00067 
00068   __mutex = new Mutex();
00069   __notifier = new BlackBoardNotifier();
00070   __instance_factory = new BlackBoardInstanceFactory();
00071 
00072   __wait_mutex = new Mutex();
00073   __wait_cond  = new WaitCondition(__wait_mutex);
00074 
00075   __m = NULL;
00076 }
00077 
00078 
00079 /** Constructor.
00080  * This will internall create a fawkes network client that is used to communicate
00081  * with the remote BlackBoard.
00082  * @param hostname hostname to connect to
00083  * @param port port to connect to
00084  */
00085 RemoteBlackBoard::RemoteBlackBoard(const char *hostname, unsigned short int port)
00086 {
00087   __fnc = new FawkesNetworkClient(hostname, port);
00088   try {
00089     __fnc->connect();
00090   } catch (Exception &e) {
00091     delete __fnc;
00092     throw;
00093   }
00094 
00095   __fnc_owner = true;
00096 
00097   if ( ! __fnc->connected() ) {
00098     throw Exception("Cannot instantiate RemoteBlackBoard on unconnected client");
00099   }
00100 
00101   __fnc->register_handler(this, FAWKES_CID_BLACKBOARD);
00102 
00103   __mutex = new Mutex();
00104   __notifier = new BlackBoardNotifier();
00105   __instance_factory = new BlackBoardInstanceFactory();
00106 
00107   __wait_mutex = new Mutex();
00108   __wait_cond  = new WaitCondition(__wait_mutex);
00109 
00110   __m = NULL;
00111 }
00112 
00113 
00114 /** Destructor. */
00115 RemoteBlackBoard::~RemoteBlackBoard()
00116 {
00117   __fnc->deregister_handler(FAWKES_CID_BLACKBOARD);
00118   delete __mutex;
00119   delete __notifier;
00120   delete __instance_factory;
00121 
00122   for ( __pit = __proxies.begin(); __pit != __proxies.end(); ++__pit) {
00123     delete __pit->second;
00124   }
00125 
00126   if (__fnc_owner) {
00127     __fnc->disconnect();
00128     delete __fnc;
00129   }
00130 
00131   delete __wait_cond;
00132   delete __wait_mutex;
00133 }
00134 
00135 
00136 bool
00137 RemoteBlackBoard::is_alive() const throw()
00138 {
00139   return __fnc->connected();
00140 }
00141 
00142 
00143 void
00144 RemoteBlackBoard::reopen_interfaces()
00145 {
00146   __proxies.lock();
00147   __ipit = __invalid_proxies.begin();
00148   while ( __ipit != __invalid_proxies.end() ) {
00149     try {
00150       Interface *iface = (*__ipit)->interface();
00151       open_interface(iface->type(), iface->id(), iface->is_writer(), iface);
00152       iface->set_validity(true);
00153       __ipit = __invalid_proxies.erase(__ipit);
00154     } catch (Exception &e) {
00155           // we failed to re-establish validity for the given interface, bad luck
00156       ++__ipit;
00157     }
00158   }
00159   __proxies.unlock();
00160 }
00161 
00162 bool
00163 RemoteBlackBoard::try_aliveness_restore() throw()
00164 {
00165   bool rv = true;
00166   try {
00167     if ( ! __fnc->connected() ) {
00168       __fnc->connect();
00169 
00170       reopen_interfaces();
00171     }
00172   } catch (...) {
00173     rv = false;
00174   }
00175   return rv;
00176 }
00177 
00178 
00179 void
00180 RemoteBlackBoard::open_interface(const char *type, const char *identifier,
00181                                  bool writer, Interface *iface)
00182 {
00183   if ( ! __fnc->connected() ) {
00184     throw Exception("Cannot instantiate remote interface, connection is dead");
00185   }
00186 
00187   bb_iopen_msg_t *om = (bb_iopen_msg_t *)calloc(1, sizeof(bb_iopen_msg_t));
00188   strncpy(om->type, type, __INTERFACE_TYPE_SIZE);
00189   strncpy(om->id, identifier, __INTERFACE_ID_SIZE);
00190   memcpy(om->hash, iface->hash(), __INTERFACE_HASH_SIZE);
00191 
00192   FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
00193                                                         writer ? MSG_BB_OPEN_FOR_WRITING : MSG_BB_OPEN_FOR_READING,
00194                                                         om, sizeof(bb_iopen_msg_t));
00195 
00196   __wait_mutex->lock();
00197   __fnc->enqueue(omsg);
00198   while (! __m ||
00199          ((__m->msgid() != MSG_BB_OPEN_SUCCESS) &&
00200           (__m->msgid() != MSG_BB_OPEN_FAILURE))) {
00201     if ( __m ) {
00202       __m->unref();
00203       __m = NULL;
00204     }
00205     __wait_cond->wait();
00206   }
00207   __wait_mutex->unlock();
00208 
00209   if ( __m->msgid() == MSG_BB_OPEN_SUCCESS ) {
00210     // We got the interface, create internal storage and prepare instance for return
00211     BlackBoardInterfaceProxy *proxy = new BlackBoardInterfaceProxy(__fnc, __m, __notifier,
00212                                                                    iface, writer);
00213     __proxies[proxy->serial()] = proxy;
00214   } else if ( __m->msgid() == MSG_BB_OPEN_FAILURE ) {
00215     bb_iopenfail_msg_t *fm = __m->msg<bb_iopenfail_msg_t>();
00216     unsigned int error = ntohl(fm->errno);
00217     __m->unref();
00218     __m = NULL;
00219     if ( error == BB_ERR_WRITER_EXISTS ) {
00220       throw BlackBoardWriterActiveException(identifier, type);
00221     } else if ( error == BB_ERR_HASH_MISMATCH ) {
00222       throw Exception("Hash mismatch for interface %s:%s", type, identifier);
00223     } else if ( error == BB_ERR_UNKNOWN_TYPE ) {
00224       throw Exception("Type %s unknoen (%s:%s)", type, type, identifier);
00225     } else if ( error == BB_ERR_WRITER_EXISTS ) {
00226       throw BlackBoardWriterActiveException(identifier, type);
00227     } else {
00228       throw Exception("Could not open interface");
00229     }
00230   }
00231 
00232   __m->unref();
00233   __m = NULL;
00234 }
00235 
00236 Interface *
00237 RemoteBlackBoard::open_interface(const char *type, const char *identifier, bool writer)
00238 {
00239   if ( ! __fnc->connected() ) {
00240     throw Exception("Cannot instantiate remote interface, connection is dead");
00241   }
00242 
00243   Interface *iface = __instance_factory->new_interface_instance(type, identifier);
00244   try {
00245     open_interface(type, identifier, writer, iface);
00246   } catch (...) {
00247     __instance_factory->delete_interface_instance(iface);
00248     throw;
00249   }
00250 
00251   return iface;
00252 }
00253 
00254 
00255 Interface *
00256 RemoteBlackBoard::open_for_reading(const char *type, const char *identifier)
00257 {
00258   return open_interface(type, identifier, /* writer? */ false);
00259 }
00260 
00261 
00262 Interface *
00263 RemoteBlackBoard::open_for_writing(const char *type, const char *identifier)
00264 {
00265   return open_interface(type, identifier, /* writer? */ true);
00266 }
00267 
00268 
00269 std::list<Interface *>
00270 RemoteBlackBoard::open_multiple_for_reading(const char *type, const char *id_pattern)
00271 {
00272   std::list<Interface *> rv;
00273 
00274   InterfaceInfoList *infl = list_all();
00275   for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
00276     if ((strncmp(type, i->type(), __INTERFACE_TYPE_SIZE) != 0) ||
00277         (fnmatch(id_pattern, i->id(), 0) == FNM_NOMATCH) ) {
00278       // type or ID prefix does not match, go on
00279       continue;
00280     }
00281 
00282     try {
00283       Interface *iface = open_for_reading((*i).type(), (*i).id());
00284       rv.push_back(iface);
00285     } catch (Exception &e) {
00286       for (std::list<Interface *>::iterator j = rv.begin(); j != rv.end(); ++j) {
00287         close(*j);
00288       }
00289       throw;
00290     }
00291   }
00292 
00293   return rv;
00294 }
00295 
00296 
00297 /** Close interface.
00298  * @param interface interface to close
00299  */
00300 void
00301 RemoteBlackBoard::close(Interface *interface)
00302 {
00303   if ( interface == NULL )  return;
00304 
00305   unsigned int serial = interface->serial();
00306 
00307   if ( __proxies.find(serial) != __proxies.end() ) {
00308     delete __proxies[serial];
00309     __proxies.erase(serial);
00310   }
00311 
00312   if ( __fnc->connected() ) {
00313     // We cannot "officially" close it, if we are disconnected it cannot be used anyway
00314     bb_iserial_msg_t *sm = (bb_iserial_msg_t *)calloc(1, sizeof(bb_iserial_msg_t));
00315     sm->serial = htonl(interface->serial());
00316 
00317     FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
00318                                                           MSG_BB_CLOSE,
00319                                                           sm, sizeof(bb_iserial_msg_t));
00320     __fnc->enqueue(omsg);
00321   }
00322 
00323   __instance_factory->delete_interface_instance(interface);
00324 }
00325 
00326 
00327 void
00328 RemoteBlackBoard::register_listener(BlackBoardInterfaceListener *listener, unsigned int flags)
00329 {
00330   __notifier->register_listener(listener, flags);
00331 }
00332 
00333 
00334 void
00335 RemoteBlackBoard::unregister_listener(BlackBoardInterfaceListener *listener)
00336 {
00337   __notifier->unregister_listener(listener);
00338 }
00339 
00340 
00341 void
00342 RemoteBlackBoard::register_observer(BlackBoardInterfaceObserver *observer, unsigned int flags)
00343 {
00344   __notifier->register_observer(observer, flags);
00345 }
00346 
00347 
00348 void
00349 RemoteBlackBoard::unregister_observer(BlackBoardInterfaceObserver *observer)
00350 {
00351   __notifier->unregister_observer(observer);
00352 }
00353 
00354 
00355 InterfaceInfoList *
00356 RemoteBlackBoard::list_all()
00357 {
00358   MutexLocker lock(__mutex);
00359   InterfaceInfoList *infl = new InterfaceInfoList();
00360 
00361   FawkesNetworkMessage *omsg = new FawkesNetworkMessage(FAWKES_CID_BLACKBOARD,
00362                                                         MSG_BB_LIST_ALL);
00363   __wait_mutex->lock();
00364   __fnc->enqueue(omsg);
00365   while (! __m ||
00366          (__m->msgid() != MSG_BB_INTERFACE_LIST)) {
00367     if ( __m ) {
00368       __m->unref();
00369       __m = NULL;
00370     }
00371     __wait_cond->wait();
00372   }
00373   __wait_mutex->unlock();
00374 
00375   BlackBoardInterfaceListContent *bbilc = __m->msgc<BlackBoardInterfaceListContent>();
00376   while ( bbilc->has_next() ) {
00377     size_t iisize;
00378     bb_iinfo_msg_t *ii = bbilc->next(&iisize);
00379     infl->append(ii->type, ii->id, ii->hash,  ii->serial,
00380                  ii->has_writer, ii->num_readers);
00381   }
00382 
00383   __m->unref();
00384   __m = NULL;
00385 
00386   return infl;
00387 }
00388 
00389 
00390 /** We are no longer registered in Fawkes network client.
00391  * Ignored.
00392  * @param id the id of the calling client
00393  */
00394 void
00395 RemoteBlackBoard::deregistered(unsigned int id) throw()
00396 {
00397 }
00398 
00399 
00400 void
00401 RemoteBlackBoard::inbound_received(FawkesNetworkMessage *m,
00402                                    unsigned int id) throw()
00403 {
00404   if ( m->cid() == FAWKES_CID_BLACKBOARD ) {
00405     unsigned int msgid = m->msgid();
00406     try {
00407       if ( msgid == MSG_BB_DATA_CHANGED ) {
00408         unsigned int serial = ntohl(((unsigned int *)m->payload())[0]);
00409         if ( __proxies.find(serial) != __proxies.end() ) {
00410           __proxies[serial]->process_data_changed(m);
00411         }
00412       } else if (msgid == MSG_BB_INTERFACE_MESSAGE) {
00413         unsigned int serial = ntohl(((unsigned int *)m->payload())[0]);
00414         if ( __proxies.find(serial) != __proxies.end() ) {
00415           __proxies[serial]->process_interface_message(m);
00416         }
00417       } else if (msgid == MSG_BB_READER_ADDED) {
00418         bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00419         if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00420           __proxies[ntohl(esm->serial)]->reader_added(ntohl(esm->event_serial));
00421         }
00422       } else if (msgid == MSG_BB_READER_REMOVED) {
00423         bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00424         if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00425           __proxies[ntohl(esm->serial)]->reader_removed(ntohl(esm->event_serial));
00426         }
00427       } else if (msgid == MSG_BB_WRITER_ADDED) {
00428         bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00429         if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00430           __proxies[ntohl(esm->serial)]->writer_added(ntohl(esm->event_serial));
00431         }
00432       } else if (msgid == MSG_BB_WRITER_REMOVED) {
00433         bb_ieventserial_msg_t *esm = m->msg<bb_ieventserial_msg_t>();
00434         if ( __proxies.find(ntohl(esm->serial)) != __proxies.end() ) {
00435           __proxies[ntohl(esm->serial)]->writer_removed(ntohl(esm->event_serial));
00436         }
00437       } else {
00438         __wait_mutex->lock();
00439         __m = m;
00440         __m->ref();
00441         __wait_cond->wake_all();
00442         __wait_mutex->unlock();
00443       }
00444     } catch (Exception &e) {
00445       // Bam, you're dead. Ok, not now, we just ignore that this shit happened...
00446     }
00447   }
00448 }
00449 
00450 
00451 void
00452 RemoteBlackBoard::connection_died(unsigned int id) throw()
00453 {
00454   // mark all assigned interfaces as invalid
00455   __proxies.lock();
00456   for (__pit = __proxies.begin(); __pit != __proxies.end(); ++__pit) {
00457     __pit->second->interface()->set_validity(false);
00458     __invalid_proxies.push_back(__pit->second);
00459   }
00460   __proxies.clear();
00461   __proxies.unlock();
00462 }
00463 
00464 
00465 void
00466 RemoteBlackBoard::connection_established(unsigned int id) throw()
00467 {
00468 }
00469 
00470 } // end namespace fawkes

Generated on Tue Feb 22 13:32:25 2011 for Fawkes API by  doxygen 1.4.7