00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <blackboard/internal/notifier.h>
00025 #include <blackboard/blackboard.h>
00026 #include <blackboard/interface_listener.h>
00027 #include <blackboard/interface_observer.h>
00028
00029 #include <core/threading/mutex.h>
00030 #include <core/threading/mutex_locker.h>
00031 #include <core/threading/wait_condition.h>
00032 #include <core/utils/lock_hashset.h>
00033 #include <core/utils/lock_hashmap.h>
00034 #include <utils/logging/liblogger.h>
00035 #include <interface/interface.h>
00036
00037 #include <algorithm>
00038 #include <functional>
00039 #include <cstdlib>
00040 #include <cstring>
00041 #include <fnmatch.h>
00042
00043 namespace fawkes {
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055 BlackBoardNotifier::BlackBoardNotifier()
00056 {
00057 __bbil_writer_events = 0;
00058 __bbil_writer_mutex = new Mutex();
00059 __bbil_writer_waitcond = new WaitCondition(__bbil_writer_mutex);
00060
00061 __bbil_reader_events = 0;
00062 __bbil_reader_mutex = new Mutex();
00063 __bbil_reader_waitcond = new WaitCondition(__bbil_reader_mutex);
00064
00065 __bbil_data_events = 0;
00066 __bbil_data_mutex = new Mutex();
00067 __bbil_data_waitcond = new WaitCondition(__bbil_data_mutex);
00068
00069 __bbil_messages_events = 0;
00070 __bbil_messages_mutex = new Mutex();
00071 __bbil_messages_waitcond = new WaitCondition(__bbil_messages_mutex);
00072
00073 __bbio_events = 0;
00074 __bbio_mutex = new Mutex();
00075 __bbio_waitcond = new WaitCondition(__bbio_mutex);
00076 }
00077
00078
00079
00080 BlackBoardNotifier::~BlackBoardNotifier()
00081 {
00082 delete __bbil_writer_waitcond;
00083 delete __bbil_writer_mutex;
00084
00085 delete __bbil_reader_waitcond;
00086 delete __bbil_reader_mutex;
00087
00088 delete __bbil_data_waitcond;
00089 delete __bbil_data_mutex;
00090
00091 delete __bbil_messages_waitcond;
00092 delete __bbil_messages_mutex;
00093
00094 delete __bbio_waitcond;
00095 delete __bbio_mutex;
00096 }
00097
00098
00099
00100
00101
00102
00103
00104
00105 void
00106 BlackBoardNotifier::register_listener(BlackBoardInterfaceListener *listener,
00107 unsigned int flags)
00108 {
00109 if ( flags & BlackBoard::BBIL_FLAG_DATA ) {
00110 __bbil_data_mutex->lock();
00111 if (__bbil_data_events > 0) {
00112 LibLogger::log_warn("BlackBoardNotifier", "Registering interface listener %s "
00113 "for data events queued",
00114 listener->bbil_name());
00115 __bbil_data_queue.push_back(std::make_pair(true, listener));
00116 } else {
00117 add_listener(listener, listener->bbil_data_interfaces(), __bbil_data);
00118 }
00119 __bbil_data_mutex->unlock();
00120 }
00121 if ( flags & BlackBoard::BBIL_FLAG_MESSAGES ) {
00122 __bbil_messages_mutex->lock();
00123 BlackBoardInterfaceListener::InterfaceLockMapIterator i;
00124 BlackBoardInterfaceListener::InterfaceLockMap *im = listener->bbil_message_interfaces();
00125 MutexLocker lock(im->mutex());
00126 for (i = im->begin(); i != im->end(); ++i) {
00127 if ( ! i->second->is_writer() ||
00128 (__bbil_messages.find(i->first) != __bbil_messages.end()) ) {
00129 __bbil_messages_mutex->unlock();
00130 throw Exception("An interface listener has already been registered for %s",
00131 i->first.c_str());
00132 }
00133 }
00134 for (i = im->begin(); i != im->end(); ++i) {
00135 __bbil_messages[i->first] = listener;
00136 }
00137 __bbil_messages_mutex->unlock();
00138 }
00139 if ( flags & BlackBoard::BBIL_FLAG_READER ) {
00140 __bbil_reader_mutex->lock();
00141 if (__bbil_reader_events > 0) {
00142 LibLogger::log_warn("BlackBoardNotifier", "Registering interface listener %s "
00143 "for reader events queued",
00144 listener->bbil_name());
00145 __bbil_reader_queue.push_back(std::make_pair(true, listener));
00146 } else {
00147 add_listener(listener, listener->bbil_reader_interfaces(), __bbil_reader);
00148 }
00149 __bbil_reader_mutex->unlock();
00150 }
00151 if ( flags & BlackBoard::BBIL_FLAG_WRITER ) {
00152 __bbil_writer_mutex->lock();
00153 if (__bbil_writer_events > 0) {
00154 LibLogger::log_warn("BlackBoardNotifier", "Registering interface listener %s "
00155 "for writer events queued",
00156 listener->bbil_name());
00157 __bbil_writer_queue.push_back(std::make_pair(true, listener));
00158 } else {
00159 add_listener(listener, listener->bbil_writer_interfaces(), __bbil_writer);
00160 }
00161 __bbil_writer_mutex->unlock();
00162 }
00163 }
00164
00165
00166
00167
00168
00169
00170
00171 void
00172 BlackBoardNotifier::unregister_listener(BlackBoardInterfaceListener *listener)
00173 {
00174 remove_listener(listener, __bbil_writer_mutex, __bbil_writer_events,
00175 __bbil_writer_queue, __bbil_writer);
00176 remove_listener(listener, __bbil_reader_mutex, __bbil_reader_events,
00177 __bbil_reader_queue, __bbil_reader);
00178 remove_listener(listener, __bbil_data_mutex, __bbil_data_events,
00179 __bbil_data_queue, __bbil_data);
00180 remove_message_listener(listener);
00181 }
00182
00183
00184
00185
00186
00187
00188 void
00189 BlackBoardNotifier::add_listener(BlackBoardInterfaceListener *listener,
00190 BlackBoardInterfaceListener::InterfaceLockMap *im,
00191 BBilMap &ilmap)
00192 {
00193 BlackBoardInterfaceListener::InterfaceLockMapIterator i;
00194 im->lock();
00195 for (i = im->begin(); i != im->end(); ++i) {
00196 ilmap[i->first].push_back(listener);
00197 }
00198 im->unlock();
00199 }
00200
00201 void
00202 BlackBoardNotifier::remove_listener(BlackBoardInterfaceListener *listener,
00203 Mutex *mutex, unsigned int events,
00204 BBilQueue &queue, BBilMap &ilmap)
00205 {
00206 MutexLocker lock(mutex);
00207 if (events > 0) {
00208
00209
00210
00211 BBilQueue::iterator re;
00212 if ( (re = find(queue.begin(), queue.end(),
00213 std::make_pair(true, listener))) != queue.end()) {
00214
00215 queue.erase(re);
00216 }
00217 queue.push_back(std::make_pair(false, listener));
00218 } else {
00219 remove_listener(ilmap, listener);
00220 }
00221 }
00222
00223
00224
00225
00226
00227
00228 void
00229 BlackBoardNotifier::remove_listener(BBilMap &ilmap, BlackBoardInterfaceListener *listener)
00230 {
00231 BBilMapIterator i, tmp;
00232
00233 i = ilmap.begin();;
00234 while (i != ilmap.end()) {
00235 BBilListIterator j = i->second.begin();
00236 while (j != i->second.end()) {
00237 if ( *j == listener ) {
00238 j = i->second.erase(j);
00239 } else {
00240 ++j;
00241 }
00242 }
00243 if ( i->second.empty() ) {
00244 tmp = i;
00245 ++i;
00246 ilmap.erase(tmp);
00247 } else {
00248 ++i;
00249 }
00250 }
00251 }
00252
00253
00254 void
00255 BlackBoardNotifier::remove_message_listener_map(BlackBoardInterfaceListener *listener)
00256 {
00257 BBilMessageLockMapIterator i, tmp;
00258
00259 i = __bbil_messages.begin();;
00260 while (i != __bbil_messages.end()) {
00261 if ( i->second == listener ) {
00262
00263 tmp = i;
00264 ++i;
00265 __bbil_messages.erase(tmp);
00266 } else {
00267 ++i;
00268 }
00269 }
00270 }
00271
00272
00273 void
00274 BlackBoardNotifier::remove_message_listener(BlackBoardInterfaceListener *listener)
00275 {
00276 __bbil_messages_mutex->lock();
00277 if (__bbil_messages_events > 0) {
00278
00279
00280
00281 BBilQueue::iterator re;
00282 if ( (re = find(__bbil_messages_queue.begin(), __bbil_messages_queue.end(),
00283 std::make_pair(true, listener))) != __bbil_messages_queue.end()) {
00284
00285 __bbil_messages_queue.erase(re);
00286 }
00287 __bbil_messages_queue.push_back(std::make_pair(false, listener));
00288 } else {
00289 remove_message_listener_map(listener);
00290 }
00291 __bbil_messages_mutex->unlock();
00292 }
00293
00294
00295
00296
00297
00298
00299 void
00300 BlackBoardNotifier::register_observer(BlackBoardInterfaceObserver *observer,
00301 unsigned int flags)
00302 {
00303 __bbio_mutex->lock();
00304 if (__bbio_events > 0) {
00305 __bbio_queue.push_back(std::make_pair(flags, observer));
00306 } else {
00307 if ( flags & BlackBoard::BBIO_FLAG_CREATED ) {
00308 add_observer(observer, observer->bbio_get_observed_create(), __bbio_created);
00309 }
00310 if ( flags & BlackBoard::BBIO_FLAG_DESTROYED ) {
00311 add_observer(observer, observer->bbio_get_observed_destroy(), __bbio_destroyed);
00312 }
00313 }
00314 __bbio_mutex->unlock();
00315 }
00316
00317
00318 void
00319 BlackBoardNotifier::add_observer(BlackBoardInterfaceObserver *observer,
00320 BlackBoardInterfaceObserver::ObservedInterfaceLockMap *its,
00321 BBioMap &bbiomap)
00322 {
00323 BlackBoardInterfaceObserver::ObservedInterfaceLockMapIterator i;
00324 its->lock();
00325 for (i = its->begin(); i != its->end(); ++i) {
00326 bbiomap[i->first].push_back(make_pair(observer, i->second));
00327 }
00328 its->unlock();
00329 }
00330
00331
00332
00333
00334
00335
00336 void
00337 BlackBoardNotifier::remove_observer(BBioMap &iomap, BlackBoardInterfaceObserver *observer)
00338 {
00339 BBioMapIterator i, tmp;
00340
00341 i = iomap.begin();
00342 while (i != iomap.end()) {
00343 BBioListIterator j = i->second.begin();
00344 while (j != i->second.end()) {
00345 if ( j->first == observer ) {
00346 j = i->second.erase(j);
00347 } else {
00348 ++j;
00349 }
00350 }
00351 if ( i->second.empty() ) {
00352 tmp = i;
00353 ++i;
00354 iomap.erase(tmp);
00355 } else {
00356 ++i;
00357 }
00358 }
00359 }
00360
00361
00362
00363
00364
00365
00366 void
00367 BlackBoardNotifier::unregister_observer(BlackBoardInterfaceObserver *observer)
00368 {
00369 MutexLocker lock(__bbio_mutex);
00370 if ( __bbio_events > 0) {
00371 BBioQueueEntry e = std::make_pair((unsigned int)0, observer);
00372 BBioQueue::iterator re;
00373 while ( (re = find_if(__bbio_queue.begin(), __bbio_queue.end(),
00374 bind2nd(std::not_equal_to<BBioQueueEntry>(), e)))
00375 != __bbio_queue.end()) {
00376
00377 if (re->second == observer) {
00378 __bbio_queue.erase(re);
00379 }
00380 }
00381 __bbio_queue.push_back(std::make_pair(0, observer));
00382
00383 } else {
00384 remove_observer(__bbio_created, observer);
00385 remove_observer(__bbio_destroyed, observer);
00386 }
00387 }
00388
00389
00390
00391
00392
00393 void
00394 BlackBoardNotifier::notify_of_interface_created(const char *type, const char *id) throw()
00395 {
00396 __bbio_mutex->lock();
00397 __bbio_events += 1;
00398 __bbio_mutex->unlock();
00399
00400 BBioMapIterator lhmi;
00401 BBioListIterator i, l;
00402 if ( (lhmi = __bbio_created.find(type)) != __bbio_created.end() ) {
00403 BBioList &list = (*lhmi).second;
00404 for (i = list.begin(); i != list.end(); ++i) {
00405 BlackBoardInterfaceObserver *bbio = i->first;
00406 for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
00407 if (fnmatch(pi->c_str(), id, 0) == 0) {
00408 bbio->bb_interface_created(type, id);
00409 break;
00410 }
00411 }
00412 }
00413 }
00414
00415 __bbio_mutex->lock();
00416 __bbio_events -= 1;
00417 process_bbio_queue();
00418 __bbio_mutex->unlock();
00419 }
00420
00421
00422
00423
00424
00425
00426 void
00427 BlackBoardNotifier::notify_of_interface_destroyed(const char *type, const char *id) throw()
00428 {
00429 __bbio_mutex->lock();
00430 __bbio_events += 1;
00431 __bbio_mutex->unlock();
00432
00433 BBioMapIterator lhmi;
00434 BBioListIterator i, l;
00435 if ( (lhmi = __bbio_destroyed.find(type)) != __bbio_destroyed.end() ) {
00436 BBioList &list = (*lhmi).second;
00437 for (i = list.begin(); i != list.end(); ++i) {
00438 BlackBoardInterfaceObserver *bbio = i->first;
00439 for (std::list<std::string>::iterator pi = i->second.begin(); pi != i->second.end(); ++pi) {
00440 if (fnmatch(pi->c_str(), id, 0) == 0) {
00441 bbio->bb_interface_destroyed(type, id);
00442 break;
00443 }
00444 }
00445 }
00446 }
00447
00448 __bbio_mutex->lock();
00449 __bbio_events -= 1;
00450 process_bbio_queue();
00451 __bbio_mutex->unlock();
00452 }
00453
00454
00455 void
00456 BlackBoardNotifier::process_bbio_queue()
00457 {
00458 if ( ! __bbio_queue.empty() ) {
00459 if (__bbio_events > 0 ) {
00460 __bbio_waitcond->wait();
00461 } else {
00462 while (! __bbio_queue.empty()) {
00463 BBioQueueEntry &e = __bbio_queue.front();
00464 if (e.first & BlackBoard::BBIO_FLAG_CREATED) {
00465 add_observer(e.second, e.second->bbio_get_observed_create(), __bbio_created);
00466 } else if (e.first & BlackBoard::BBIO_FLAG_DESTROYED) {
00467 add_observer(e.second, e.second->bbio_get_observed_destroy(), __bbio_destroyed);
00468 } else {
00469 remove_observer(__bbio_created, e.second);
00470 remove_observer(__bbio_destroyed, e.second);
00471 }
00472 __bbio_queue.pop_front();
00473 }
00474 __bbio_waitcond->wake_all();
00475 }
00476 }
00477 }
00478
00479
00480
00481
00482
00483
00484
00485
00486 void
00487 BlackBoardNotifier::notify_of_writer_added(const Interface *interface,
00488 unsigned int event_instance_serial) throw()
00489 {
00490 __bbil_writer_mutex->lock();
00491 if ( (__bbil_writer_events > 0) && ! __bbil_writer_queue.empty() ) {
00492 __bbil_writer_waitcond->wait();
00493 }
00494 __bbil_writer_events += 1;
00495 __bbil_writer_mutex->unlock();
00496
00497 BBilMapIterator lhmi;
00498 BBilListIterator i, l;
00499 const char *uid = interface->uid();
00500 if ( (lhmi = __bbil_writer.find(uid)) != __bbil_writer.end() ) {
00501 BBilList &list = (*lhmi).second;
00502 for (i = list.begin(); i != list.end(); ++i) {
00503 BlackBoardInterfaceListener *bbil = (*i);
00504 Interface *bbil_iface = bbil->bbil_writer_interface(uid);
00505 if (bbil_iface != NULL ) {
00506 bbil->bb_interface_writer_added(bbil_iface, event_instance_serial);
00507 } else {
00508 LibLogger::log_warn("BlackBoardNotifier",
00509 "BBIL[%s] registered for writer events "
00510 "(open) for '%s' but has no such interface",
00511 bbil->bbil_name(), uid);
00512 }
00513 }
00514 }
00515
00516 __bbil_writer_mutex->lock();
00517 __bbil_writer_events -= 1;
00518 process_writer_queue();
00519 __bbil_writer_mutex->unlock();
00520 }
00521
00522
00523
00524
00525
00526
00527
00528 void
00529 BlackBoardNotifier::notify_of_writer_removed(const Interface *interface,
00530 unsigned int event_instance_serial) throw()
00531 {
00532 __bbil_writer_mutex->lock();
00533 if ( (__bbil_writer_events > 0) && ! __bbil_writer_queue.empty() ) {
00534 __bbil_writer_waitcond->wait();
00535 }
00536 __bbil_writer_events += 1;
00537 __bbil_writer_mutex->unlock();
00538
00539 BBilMapIterator lhmi;
00540 BBilListIterator i, l;
00541 const char *uid = interface->uid();
00542 if ( (lhmi = __bbil_writer.find(uid)) != __bbil_writer.end() ) {
00543 BBilList &list = (*lhmi).second;
00544 for (i = list.begin(); i != list.end(); ++i) {
00545 BlackBoardInterfaceListener *bbil = (*i);
00546 Interface *bbil_iface = bbil->bbil_writer_interface(uid);
00547 if (bbil_iface != NULL) {
00548 if (bbil_iface->serial() != event_instance_serial) {
00549 bbil->bb_interface_writer_removed(bbil_iface, event_instance_serial);
00550 }
00551 } else {
00552 LibLogger::log_warn("BlackBoardNotifier",
00553 "BBIL[%s] registered for writer events "
00554 "(close) for '%s' but has no such interface",
00555 bbil->bbil_name(), uid);
00556 }
00557 }
00558 }
00559
00560 __bbil_writer_mutex->lock();
00561 __bbil_writer_events -= 1;
00562 process_writer_queue();
00563 __bbil_writer_mutex->unlock();
00564 }
00565
00566 void
00567 BlackBoardNotifier::process_writer_queue()
00568 {
00569 if ( ! __bbil_writer_queue.empty() ) {
00570 if (__bbil_writer_events > 0 ) {
00571 __bbil_writer_waitcond->wait();
00572 } else {
00573 while (! __bbil_writer_queue.empty()) {
00574 BBilQueueEntry &e = __bbil_writer_queue.front();
00575 if (e.first) {
00576 add_listener(e.second, e.second->bbil_writer_interfaces(), __bbil_writer);
00577 } else {
00578 remove_listener(__bbil_writer, e.second);
00579 }
00580 __bbil_writer_queue.pop_front();
00581 }
00582 __bbil_writer_waitcond->wake_all();
00583 }
00584 }
00585 }
00586
00587
00588
00589
00590
00591
00592
00593 void
00594 BlackBoardNotifier::notify_of_reader_added(const Interface *interface,
00595 unsigned int event_instance_serial) throw()
00596 {
00597 __bbil_reader_mutex->lock();
00598 if ( (__bbil_reader_events > 0) && ! __bbil_reader_queue.empty() ) {
00599 __bbil_reader_waitcond->wait();
00600 }
00601 __bbil_reader_events += 1;
00602 __bbil_reader_mutex->unlock();
00603
00604 BBilMapIterator lhmi;
00605 BBilListIterator i, l;
00606 const char *uid = interface->uid();
00607 if ( (lhmi = __bbil_reader.find(uid)) != __bbil_reader.end() ) {
00608 BBilList &list = (*lhmi).second;
00609 for (i = list.begin(); i != list.end(); ++i) {
00610 BlackBoardInterfaceListener *bbil = (*i);
00611 Interface *bbil_iface = bbil->bbil_reader_interface(uid);
00612 if (bbil_iface != NULL ) {
00613 bbil->bb_interface_reader_added(bbil_iface, event_instance_serial);
00614 } else {
00615 LibLogger::log_warn("BlackBoardNotifier",
00616 "BBIL[%s] registered for reader events "
00617 "(open) for '%s' but has no such interface",
00618 bbil->bbil_name(), uid);
00619 }
00620 }
00621 }
00622
00623 __bbil_reader_mutex->lock();
00624 __bbil_reader_events -= 1;
00625 process_reader_queue();
00626 __bbil_reader_mutex->unlock();
00627 }
00628
00629
00630
00631
00632
00633
00634
00635 void
00636 BlackBoardNotifier::notify_of_reader_removed(const Interface *interface,
00637 unsigned int event_instance_serial) throw()
00638 {
00639 __bbil_reader_mutex->lock();
00640 if ( (__bbil_reader_events > 0) && ! __bbil_reader_queue.empty() ) {
00641 __bbil_reader_waitcond->wait();
00642 }
00643 __bbil_reader_events += 1;
00644 __bbil_reader_mutex->unlock();
00645
00646 BBilMapIterator lhmi;
00647 BBilListIterator i, l;
00648 const char *uid = interface->uid();
00649 if ( (lhmi = __bbil_reader.find(uid)) != __bbil_reader.end() ) {
00650 BBilList &list = (*lhmi).second;
00651 for (i = list.begin(); i != list.end(); ++i) {
00652 BlackBoardInterfaceListener *bbil = (*i);
00653 Interface *bbil_iface = bbil->bbil_reader_interface(uid);
00654 if (bbil_iface != NULL) {
00655 if (bbil_iface->serial() != event_instance_serial) {
00656 bbil->bb_interface_reader_removed(bbil_iface, event_instance_serial);
00657 }
00658 } else {
00659 LibLogger::log_warn("BlackBoardNotifier",
00660 "BBIL[%s] registered for reader events "
00661 "(close) for '%s' but has no such interface",
00662 bbil->bbil_name(), uid);
00663 }
00664 }
00665 }
00666
00667 __bbil_reader_mutex->lock();
00668 __bbil_reader_events -= 1;
00669 process_reader_queue();
00670 __bbil_reader_mutex->unlock();
00671 }
00672
00673
00674 void
00675 BlackBoardNotifier::process_reader_queue()
00676 {
00677 if ( ! __bbil_reader_queue.empty() ) {
00678 if (__bbil_reader_events > 0 ) {
00679 __bbil_reader_waitcond->wait();
00680 } else {
00681 while (! __bbil_reader_queue.empty()) {
00682 BBilQueueEntry &e = __bbil_reader_queue.front();
00683 if (e.first) {
00684 add_listener(e.second, e.second->bbil_reader_interfaces(), __bbil_reader);
00685 } else {
00686 remove_listener(__bbil_reader, e.second);
00687 }
00688 __bbil_reader_queue.pop_front();
00689 }
00690 __bbil_reader_waitcond->wake_all();
00691 }
00692 }
00693 }
00694
00695
00696
00697
00698
00699
00700
00701
00702
00703
00704
00705 void
00706 BlackBoardNotifier::notify_of_data_change(const Interface *interface)
00707 {
00708 __bbil_data_mutex->lock();
00709 if ( (__bbil_data_events > 0) && ! __bbil_data_queue.empty() ) {
00710 __bbil_data_waitcond->wait();
00711 }
00712 __bbil_data_events += 1;
00713 __bbil_data_mutex->unlock();
00714
00715 BBilMapIterator lhmi;
00716 BBilListIterator i, l;
00717 const char *uid = interface->uid();
00718 if ( (lhmi = __bbil_data.find(uid)) != __bbil_data.end() ) {
00719 BBilList &list = (*lhmi).second;
00720 for (i = list.begin(); i != list.end(); ++i) {
00721 BlackBoardInterfaceListener *bbil = (*i);
00722 Interface *bbil_iface = bbil->bbil_data_interface(uid);
00723 if (bbil_iface != NULL) {
00724 bbil->bb_interface_data_changed(bbil_iface);
00725 } else {
00726 LibLogger::log_warn("BlackBoardNotifier",
00727 "BBIL[%s] registered for data change events "
00728 "for '%s' but has no such interface",
00729 bbil->bbil_name(), uid);
00730 }
00731 }
00732 }
00733
00734 __bbil_data_mutex->lock();
00735 __bbil_data_events -= 1;
00736 if ( ! __bbil_data_queue.empty() ) {
00737 if (__bbil_data_events > 0 ) {
00738 __bbil_data_waitcond->wait();
00739 } else {
00740 while (! __bbil_data_queue.empty()) {
00741 BBilQueueEntry &e = __bbil_data_queue.front();
00742 if (e.first) {
00743 add_listener(e.second, e.second->bbil_data_interfaces(), __bbil_data);
00744 } else {
00745 remove_listener(__bbil_data, e.second);
00746 }
00747 __bbil_data_queue.pop_front();
00748 }
00749 __bbil_data_waitcond->wake_all();
00750 }
00751 }
00752 __bbil_data_mutex->unlock();
00753 }
00754
00755
00756
00757
00758
00759
00760
00761
00762
00763
00764
00765
00766
00767 bool
00768 BlackBoardNotifier::notify_of_message_received(const Interface *interface, Message *message)
00769 {
00770 __bbil_messages_mutex->lock();
00771 if ( (__bbil_messages_events > 0) && ! __bbil_messages_queue.empty() ) {
00772 __bbil_messages_waitcond->wait();
00773 }
00774 __bbil_messages_events += 1;
00775 __bbil_messages_mutex->unlock();
00776
00777 bool rv = false;
00778
00779 const char *uid = interface->uid();
00780 if ( __bbil_messages.find(uid) != __bbil_messages.end() ) {
00781 BlackBoardInterfaceListener *bbil = __bbil_messages[uid];
00782
00783 Interface *bbil_iface = bbil->bbil_message_interface(uid);
00784 if (bbil_iface != NULL ) {
00785 if ( bbil->bb_interface_message_received(bbil_iface, message) ) {
00786 rv = true;
00787 }
00788 } else {
00789 LibLogger::log_warn("BlackBoardNotifier", "BBIL[%s] registered "
00790 "for message received events for '%s' "
00791 "but has no such interface",
00792 bbil->bbil_name(), uid);
00793 }
00794 } else {
00795 rv = true;
00796 }
00797
00798 __bbil_messages_mutex->lock();
00799 __bbil_messages_events -= 1;
00800 if ( ! __bbil_messages_queue.empty() ) {
00801 if (__bbil_messages_events > 0 ) {
00802 __bbil_messages_waitcond->wait();
00803 } else {
00804 while (! __bbil_messages_queue.empty()) {
00805 BBilQueueEntry &e = __bbil_messages_queue.front();
00806
00807 remove_message_listener_map(e.second);
00808 __bbil_messages_queue.pop_front();
00809 }
00810 __bbil_messages_waitcond->wake_all();
00811 }
00812 }
00813 __bbil_messages_mutex->unlock();
00814
00815 return rv;
00816 }
00817
00818 }