fuse_client.cpp

00001 
00002 /***************************************************************************
00003  *  fuse_client.cpp - FUSE network transport client
00004  *
00005  *  Created: Thu Mar 29 00:47:24 2007
00006  *  Copyright  2005-2007  Tim Niemueller [www.niemueller.de]
00007  *
00008  ****************************************************************************/
00009 
00010 /*  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version. A runtime exception applies to
00014  *  this software (see LICENSE.GPL_WRE file mentioned below for details).
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU Library General Public License for more details.
00020  *
00021  *  Read the full text in the LICENSE.GPL_WRE file in the doc directory.
00022  */
00023 
00024 #include <fvutils/net/fuse_client.h>
00025 
00026 #include <fvutils/net/fuse_transceiver.h>
00027 #include <fvutils/net/fuse_message_queue.h>
00028 #include <fvutils/net/fuse_message.h>
00029 #include <fvutils/net/fuse_client_handler.h>
00030 
00031 #include <core/threading/mutex.h>
00032 #include <core/threading/wait_condition.h>
00033 #include <core/exceptions/software.h>
00034 #include <netcomm/socket/stream.h>
00035 #include <netcomm/utils/exceptions.h>
00036 
00037 #include <cstring>
00038 #include <netinet/in.h>
00039 #include <cstdlib>
00040 #include <unistd.h>
00041 
00042 using namespace fawkes;
00043 
00044 namespace firevision {
00045 #if 0 /* just to make Emacs auto-indent happy */
00046 }
00047 #endif
00048 
00049 /** @class FuseClient <fvutils/net/fuse_client.h>
00050  * FUSE client.
00051  * FUSE is the FireVision protocol to retrieve information, images and lookup
00052  * tables from vision processes and to send control commands to these systems.
00053  * The client is used in the retrieving or controlling process.
00054  * @ingroup FUSE
00055  * @ingroup FireVision
00056  * @author Tim Niemueller
00057  */
00058 
00059 /** Constructor.
00060  * @param hostname host to connect to
00061  * @param port port to connect to
00062  * @param handler client handler to handle incoming data
00063  */
00064 FuseClient::FuseClient(const char *hostname, unsigned short int port,
00065                        FuseClientHandler *handler)
00066   : Thread("FuseClient")
00067 {
00068   __hostname = strdup(hostname);
00069   __port = port;
00070   __handler = handler;
00071 
00072   __wait_timeout = 10;
00073 
00074   __inbound_msgq = new FuseNetworkMessageQueue();
00075   __outbound_msgq = new FuseNetworkMessageQueue();
00076 
00077   __mutex = new Mutex();
00078   __recv_mutex    = new Mutex();
00079   __recv_waitcond = new WaitCondition(__recv_mutex);
00080   __socket = new StreamSocket();
00081   __greeting_mutex    = new Mutex();
00082   __greeting_waitcond = new WaitCondition(__greeting_mutex);
00083 
00084   __alive = true;
00085   __greeting_received = false;
00086 }
00087 
00088 
00089 /** Destructor. */
00090 FuseClient::~FuseClient()
00091 {
00092   free(__hostname);
00093 
00094   while ( ! __inbound_msgq->empty() ) {
00095     FuseNetworkMessage *m = __inbound_msgq->front();
00096     m->unref();
00097     __inbound_msgq->pop();
00098   }
00099   delete __inbound_msgq;
00100 
00101   while ( ! __outbound_msgq->empty() ) {
00102     FuseNetworkMessage *m = __outbound_msgq->front();
00103     m->unref();
00104     __outbound_msgq->pop();
00105   }
00106   delete __outbound_msgq;
00107 
00108   delete __mutex;
00109   delete __recv_mutex;
00110   delete __recv_waitcond;
00111   delete __socket;
00112   delete __greeting_mutex;
00113   delete __greeting_waitcond;
00114 }
00115 
00116 
00117 /** Connect. */
00118 void
00119 FuseClient::connect()
00120 {
00121   __socket->connect(__hostname, __port);
00122 
00123   FUSE_greeting_message_t *greetmsg = (FUSE_greeting_message_t *)malloc(sizeof(FUSE_greeting_message_t));
00124   greetmsg->version = htonl(FUSE_CURRENT_VERSION);
00125   __outbound_msgq->push(new FuseNetworkMessage(FUSE_MT_GREETING,
00126                                                greetmsg, sizeof(FUSE_greeting_message_t)));
00127 }
00128 
00129 
00130 /** Disconnect. */
00131 void
00132 FuseClient::disconnect()
00133 {
00134   __mutex->lock();
00135   delete __socket;
00136   __socket = new StreamSocket();
00137   __alive = false;
00138   __mutex->unlock();
00139 }
00140 
00141 
00142 /** Send queued messages. */
00143 void
00144 FuseClient::send()
00145 {
00146   try {
00147     FuseNetworkTransceiver::send(__socket, __outbound_msgq);
00148   } catch (ConnectionDiedException &e) {
00149     e.print_trace();
00150     __socket->close();
00151     __alive = false;
00152     __handler->fuse_connection_died();
00153     __recv_waitcond->wake_all();
00154   }
00155 }
00156 
00157 
00158 /** Receive messages. */
00159 void
00160 FuseClient::recv()
00161 {
00162   __recv_mutex->lock();
00163   try {
00164     while ( __socket->available() ) {
00165       FuseNetworkTransceiver::recv(__socket, __inbound_msgq);
00166     }
00167   } catch (ConnectionDiedException &e) {
00168     e.print_trace();
00169     __socket->close();
00170     __alive = false;
00171     __handler->fuse_connection_died();
00172     __recv_waitcond->wake_all();
00173   }
00174   __recv_mutex->unlock();
00175 }
00176 
00177 
00178 /** Enqueue message.
00179  * This method takes ownership of the passed message. You must explicitly
00180  * reference it before enqueing if you want to use it afterwards.
00181  * @param m message to enqueue
00182  */
00183 void
00184 FuseClient::enqueue(FuseNetworkMessage *m)
00185 {
00186   __outbound_msgq->push_locked(m);
00187 }
00188 
00189 
00190 /** Enqueue message.
00191  * @param type type of message
00192  * @param payload payload of message
00193  * @param payload_size size of payload
00194  */
00195 void
00196 FuseClient::enqueue(FUSE_message_type_t type, void *payload, size_t payload_size)
00197 {
00198   FuseNetworkMessage *m = new FuseNetworkMessage(type, payload, payload_size);
00199   __outbound_msgq->push_locked(m);  
00200 }
00201 
00202 
00203 /** Enqueue message without payload.
00204  * @param type type of message
00205  */
00206 void
00207 FuseClient::enqueue(FUSE_message_type_t type)
00208 {
00209   FuseNetworkMessage *m = new FuseNetworkMessage(type);
00210   __outbound_msgq->push_locked(m);  
00211 }
00212 
00213 
00214 /** Enqueue message and wait for reply.
00215  * The wait happens atomically, use this to avoid race conditions. This method
00216  * takes ownership of the passed message. You must explicitly reference it
00217  * before enqueing if you want to use it afterwards.
00218  * @param m message to enqueue
00219  */
00220 void
00221 FuseClient::enqueue_and_wait(FuseNetworkMessage *m)
00222 {
00223   __recv_mutex->lock();
00224   __outbound_msgq->push_locked(m);
00225   __recv_waitcond->wait();
00226   __recv_mutex->unlock();
00227 }
00228 
00229 
00230 /** Enqueue message and wait for reply.
00231  * The wait happens atomically, use this to avoid race conditions.
00232  * @param type type of message
00233  * @param payload payload of message
00234  * @param payload_size size of payload
00235  */
00236 void
00237 FuseClient::enqueue_and_wait(FUSE_message_type_t type, void *payload, size_t payload_size)
00238 {
00239   FuseNetworkMessage *m = new FuseNetworkMessage(type, payload, payload_size);
00240   __recv_mutex->lock();
00241   __outbound_msgq->push_locked(m);  
00242   __recv_waitcond->wait();
00243   __recv_mutex->unlock();
00244 }
00245 
00246 
00247 /** Enqueue message without payload and wait for reply.
00248  * The wait happens atomically, use this to avoid race conditions.
00249  * @param type type of message
00250  */
00251 void
00252 FuseClient::enqueue_and_wait(FUSE_message_type_t type)
00253 {
00254   FuseNetworkMessage *m = new FuseNetworkMessage(type);
00255   __recv_mutex->lock();
00256   __outbound_msgq->push_locked(m);  
00257   __recv_waitcond->wait();
00258   __recv_mutex->unlock();
00259 }
00260 
00261 
00262 
00263 /** Sleep for some time.
00264  * Wait until inbound messages have been receive, the connection dies or the
00265  * timeout has been reached, whatever comes first. So you sleep at most timeout ms,
00266  * but short under some circumstances (incoming data or lost connection).
00267  */
00268 void
00269 FuseClient::sleep()
00270 {
00271   try {
00272     __socket->poll(__wait_timeout /* ms timeout */, Socket::POLL_IN);
00273   } catch (Exception &e) {
00274   }
00275 }
00276 
00277 
00278 /** Thread loop.
00279  * Sends enqueued messages and reads incoming messages off the network.
00280  */
00281 void
00282 FuseClient::loop()
00283 {
00284   __mutex->lock();
00285 
00286   if ( ! __alive ) {
00287     __mutex->unlock();
00288     usleep(10000);
00289     return;
00290   }
00291 
00292   bool wake = false;
00293 
00294   send();
00295   sleep();
00296   recv();
00297 
00298   //process_inbound();
00299 
00300   __inbound_msgq->lock();
00301   while ( ! __inbound_msgq->empty() ) {
00302     FuseNetworkMessage *m = __inbound_msgq->front();
00303 
00304     if ( m->type() == FUSE_MT_GREETING ) {
00305       FUSE_greeting_message_t *gm = m->msg<FUSE_greeting_message_t>();
00306       if ( ntohl(gm->version) != FUSE_CURRENT_VERSION ) {
00307         __handler->fuse_invalid_server_version(FUSE_CURRENT_VERSION, ntohl(gm->version));
00308         __alive = false;
00309       } else {
00310         __greeting_mutex->lock();
00311         __greeting_received = true;
00312         __greeting_waitcond->wake_all();
00313         __greeting_mutex->unlock();
00314         __handler->fuse_connection_established();
00315       }
00316     } else {
00317       __handler->fuse_inbound_received(m);
00318       wake = true;
00319     }
00320 
00321     m->unref();
00322     __inbound_msgq->pop();
00323   }
00324   __inbound_msgq->unlock();
00325 
00326   if ( wake ) {
00327     __recv_waitcond->wake_all();
00328   }
00329   __mutex->unlock();
00330 }
00331 
00332 
00333 /** Wait for messages.
00334  * This will wait for messages to arrive. The calling
00335  * thread is blocked until messages are available.
00336  */
00337 void
00338 FuseClient::wait()
00339 {
00340   __recv_mutex->lock();
00341   __recv_waitcond->wait();
00342   __recv_mutex->unlock();
00343 }
00344 
00345 
00346 /** Wait for greeting message.
00347  * This method will wait for the greeting message to arrive. Make sure that you called
00348  * connect() before waiting or call it concurrently in another thread. The calling thread
00349  * will be blocked until the message has been received. If the message has already been
00350  * received this method will return immediately. Thus it is safe to call this at any time
00351  * without risking a race condition.
00352  */
00353 void
00354 FuseClient::wait_greeting()
00355 {
00356   __greeting_mutex->lock();
00357   while (! __greeting_received) {
00358     __greeting_waitcond->wait();
00359   }
00360   __greeting_mutex->unlock();
00361 }
00362 
00363 } // end namespace firevision

Generated on Tue Feb 22 13:32:16 2011 for Fawkes API by  doxygen 1.4.7