00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <blackboard/net/interface_proxy.h>
00025 #include <blackboard/internal/instance_factory.h>
00026 #include <blackboard/net/messages.h>
00027 #include <blackboard/internal/interface_mem_header.h>
00028 #include <blackboard/internal/notifier.h>
00029
00030 #include <core/threading/refc_rwlock.h>
00031 #include <utils/logging/liblogger.h>
00032 #include <netcomm/fawkes/client.h>
00033 #include <netcomm/fawkes/message.h>
00034
00035 #include <cstdlib>
00036 #include <cstring>
00037 #include <arpa/inet.h>
00038
00039 namespace fawkes {
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056 BlackBoardInterfaceProxy::BlackBoardInterfaceProxy(FawkesNetworkClient *client,
00057 FawkesNetworkMessage *msg,
00058 BlackBoardNotifier *notifier,
00059 Interface *interface, bool writer)
00060 {
00061 __fnc = client;
00062 if ( msg->msgid() != MSG_BB_OPEN_SUCCESS ) {
00063 throw Exception("Expected open success message");
00064 }
00065
00066 void *payload = msg->payload();
00067 bb_iopensucc_msg_t *osm = (bb_iopensucc_msg_t *)payload;
00068
00069 __notifier = notifier;
00070 __interface = interface;
00071 __instance_serial = ntohl(osm->serial);
00072 __has_writer = (osm->has_writer == 1);
00073 __num_readers = ntohl(osm->num_readers);
00074 __data_size = ntohl(osm->data_size);
00075 __clid = msg->clid();
00076 __next_msg_id = 1;
00077
00078 if ( interface->datasize() != __data_size ) {
00079
00080 throw Exception("Network message does not carry chunk of expected size");
00081 }
00082
00083 __rwlock = new RefCountRWLock();
00084 __mem_chunk = malloc(sizeof(interface_header_t) + __data_size);
00085 __data_chunk = (char *)__mem_chunk + sizeof(interface_header_t);
00086 memset(__mem_chunk, 0, sizeof(interface_header_t) + __data_size);
00087 memcpy(__data_chunk, (char *)payload + sizeof(bb_iopensucc_msg_t), __data_size);
00088
00089 interface_header_t *ih = (interface_header_t *)__mem_chunk;
00090
00091 strncpy(ih->type, interface->type(), __INTERFACE_TYPE_SIZE);
00092 strncpy(ih->id, interface->id(), __INTERFACE_ID_SIZE);
00093 memcpy(ih->hash, interface->hash(), __INTERFACE_HASH_SIZE);
00094 ih->flag_writer_active = (__has_writer ? 1 : 0);
00095 ih->num_readers = __num_readers;
00096 ih->refcount = 1;
00097
00098 interface->set_instance_serial(__instance_serial);
00099 interface->set_memory(0, __mem_chunk, __data_chunk);
00100 interface->set_mediators(this, this);
00101 interface->set_readwrite(writer, __rwlock);
00102 }
00103
00104
00105 BlackBoardInterfaceProxy::~BlackBoardInterfaceProxy()
00106 {
00107 free(__mem_chunk);
00108 }
00109
00110
00111
00112
00113
00114 void
00115 BlackBoardInterfaceProxy::process_data_changed(FawkesNetworkMessage *msg)
00116 {
00117 if ( msg->msgid() != MSG_BB_DATA_CHANGED ) {
00118 LibLogger::log_error("BlackBoardInterfaceProxy", "Expected data changed BB message, but "
00119 "received message of type %u, ignoring.", msg->msgid());
00120 return;
00121 }
00122
00123 void *payload = msg->payload();
00124 bb_idata_msg_t *dm = (bb_idata_msg_t *)payload;
00125 if ( ntohl(dm->serial) != __instance_serial ) {
00126 LibLogger::log_error("BlackBoardInterfaceProxy", "Serial mismatch, expected %u, "
00127 "but got %u, ignoring.", __instance_serial, ntohl(dm->serial));
00128 return;
00129 }
00130
00131 if ( ntohl(dm->data_size) != __data_size ) {
00132 LibLogger::log_error("BlackBoardInterfaceProxy", "Data size mismatch, expected %zu, "
00133 "but got %zu, ignoring.", __data_size, ntohl(dm->data_size));
00134 return;
00135 }
00136
00137 memcpy(__data_chunk, (char *)payload + sizeof(bb_idata_msg_t), __data_size);
00138
00139 __notifier->notify_of_data_change(__interface);
00140 }
00141
00142
00143
00144
00145
00146 void
00147 BlackBoardInterfaceProxy::process_interface_message(FawkesNetworkMessage *msg)
00148 {
00149 if ( msg->msgid() != MSG_BB_INTERFACE_MESSAGE ) {
00150 LibLogger::log_error("BlackBoardInterfaceProxy", "Expected interface BB message, but "
00151 "received message of type %u, ignoring.", msg->msgid());
00152 return;
00153 }
00154
00155 void *payload = msg->payload();
00156 bb_imessage_msg_t *mm = (bb_imessage_msg_t *)payload;
00157 if ( ntohl(mm->serial) != __instance_serial ) {
00158 LibLogger::log_error("BlackBoardInterfaceProxy", "Serial mismatch (msg), expected %u, "
00159 "but got %u, ignoring.", __instance_serial, ntohl(mm->serial));
00160 return;
00161 }
00162
00163 if ( ! __interface->is_writer() ) {
00164 LibLogger::log_error("BlackBoardInterfaceProxy", "Received interface message, but this"
00165 "is a reading instance (%s), ignoring.", __interface->uid());
00166 return;
00167 }
00168
00169 try {
00170 Message *im = __interface->create_message(mm->msg_type);
00171 im->set_id(ntohl(mm->msgid));
00172 im->set_hops(ntohl(mm->hops) + 1);
00173
00174 if (im->hops() > 1) {
00175 LibLogger::log_warn("BlackBoardInterfaceProxy", "Message IDs are not stable across more than one hop, "
00176 "message of type %s for interface %s has %u hops",
00177 im->type(), __interface->uid(), im->hops());
00178 }
00179
00180 if ( ntohl(mm->data_size) != im->datasize() ) {
00181 LibLogger::log_error("BlackBoardInterfaceProxy", "Message data size mismatch, expected "
00182 "%zu, but got %zu, ignoring.", im->datasize(), ntohl(mm->data_size));
00183 delete im;
00184 return;
00185 }
00186
00187 im->set_from_chunk((char *)payload + sizeof(bb_imessage_msg_t));
00188
00189 if ( __notifier->notify_of_message_received(__interface, im) ) {
00190 __interface->msgq_append(im);
00191 }
00192 } catch (Exception &e) {
00193 e.append("Failed to enqueue interface message for %s, ignoring", __interface->uid());
00194 LibLogger::log_error("BlackBoardInterfaceProxy", e);
00195 }
00196 }
00197
00198
00199
00200
00201
00202 void
00203 BlackBoardInterfaceProxy::reader_added(unsigned int event_serial)
00204 {
00205 ++__num_readers;
00206 __notifier->notify_of_reader_added(__interface, event_serial);
00207 }
00208
00209
00210
00211
00212 void
00213 BlackBoardInterfaceProxy::reader_removed(unsigned int event_serial)
00214 {
00215 if ( __num_readers > 0 ) {
00216 --__num_readers;
00217 }
00218 __notifier->notify_of_reader_removed(__interface, event_serial);
00219 }
00220
00221
00222
00223
00224 void
00225 BlackBoardInterfaceProxy::writer_added(unsigned int event_serial)
00226 {
00227 __has_writer = true;
00228 __notifier->notify_of_writer_added(__interface, event_serial);
00229 }
00230
00231
00232
00233
00234 void
00235 BlackBoardInterfaceProxy::writer_removed(unsigned int event_serial)
00236 {
00237 __has_writer = false;
00238 __notifier->notify_of_writer_removed(__interface, event_serial);
00239 }
00240
00241
00242
00243
00244
00245 unsigned int
00246 BlackBoardInterfaceProxy::serial() const
00247 {
00248 return __instance_serial;
00249 }
00250
00251
00252
00253
00254
00255 unsigned int
00256 BlackBoardInterfaceProxy::clid() const
00257 {
00258 return __instance_serial;
00259 }
00260
00261
00262
00263
00264 Interface *
00265 BlackBoardInterfaceProxy::interface() const
00266 {
00267 return __interface;
00268 }
00269
00270
00271
00272 bool
00273 BlackBoardInterfaceProxy::exists_writer(const Interface *interface) const
00274 {
00275 return __has_writer;
00276 }
00277
00278 unsigned int
00279 BlackBoardInterfaceProxy::num_readers(const Interface *interface) const
00280 {
00281 return __num_readers;
00282 }
00283
00284 void
00285 BlackBoardInterfaceProxy::notify_of_data_change(const Interface *interface)
00286 {
00287
00288 size_t payload_size = sizeof(bb_idata_msg_t) + interface->datasize();
00289 void *payload = malloc(payload_size);
00290 bb_idata_msg_t *dm = (bb_idata_msg_t *)payload;
00291 dm->serial = htonl(interface->serial());
00292 dm->data_size = htonl(interface->datasize());
00293 memcpy((char *)payload + sizeof(bb_idata_msg_t), interface->datachunk(),
00294 interface->datasize());
00295
00296 FawkesNetworkMessage *omsg = new FawkesNetworkMessage(__clid, FAWKES_CID_BLACKBOARD,
00297 MSG_BB_DATA_CHANGED,
00298 payload, payload_size);
00299 __fnc->enqueue(omsg);
00300 }
00301
00302
00303
00304 void
00305 BlackBoardInterfaceProxy::transmit(Message *message)
00306 {
00307
00308 size_t payload_size = sizeof(bb_imessage_msg_t) + message->datasize();
00309 void *payload = calloc(1, payload_size);
00310 bb_imessage_msg_t *dm = (bb_imessage_msg_t *)payload;
00311 dm->serial = htonl(__interface->serial());
00312 unsigned int msgid = next_msg_id();
00313 dm->msgid = htonl(msgid);
00314 dm->hops = htonl(message->hops());
00315 message->set_id(msgid);
00316 strncpy(dm->msg_type, message->type(), __INTERFACE_MESSAGE_TYPE_SIZE);
00317 dm->data_size = htonl(message->datasize());
00318 memcpy((char *)payload + sizeof(bb_imessage_msg_t), message->datachunk(),
00319 message->datasize());
00320
00321 FawkesNetworkMessage *omsg = new FawkesNetworkMessage(__clid, FAWKES_CID_BLACKBOARD,
00322 MSG_BB_INTERFACE_MESSAGE,
00323 payload, payload_size);
00324 __fnc->enqueue(omsg);
00325 }
00326
00327 }