Package flumotion :: Package manager :: Module component
[hide private]

Source Code for Module flumotion.manager.component

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_component -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  manager-side objects for components 
 24   
 25  API Stability: semi-stable 
 26  """ 
 27   
 28  import time 
 29   
 30  from twisted.spread import pb 
 31  from twisted.internet import reactor, defer 
 32  from twisted.internet import error as terror 
 33  from twisted.python.failure import Failure 
 34  from zope.interface import implements 
 35   
 36  from flumotion.configure import configure 
 37  from flumotion.manager import base, config 
 38  from flumotion.common import errors, interfaces, keycards, log, planet 
 39  from flumotion.common import messages, common 
 40  from flumotion.common.i18n import N_, gettexter 
 41  from flumotion.common.planet import moods 
 42  from flumotion.twisted import flavors 
 43   
 44  __version__ = "$Rev: 7740 $" 
 45  T_ = gettexter() 
 46   
 47   
48 -class ComponentAvatar(base.ManagerAvatar):
49 """ 50 I am a Manager-side avatar for a component. 51 I live in the L{ComponentHeaven}. 52 53 Each component that logs in to the manager gets an avatar created for it 54 in the manager. 55 56 @cvar avatarId: the L{componentId<common.componentId>} 57 @type avatarId: str 58 @cvar jobState: job state of this avatar's component 59 @type jobState: L{flumotion.common.planet.ManagerJobState} 60 @cvar componentState: component state of this avatar's component 61 @type componentState: L{flumotion.common.planet.ManagerComponentState} 62 """ 63 64 logCategory = 'comp-avatar' 65
66 - def __init__(self, heaven, avatarId, remoteIdentity, mind, conf, 67 jobState, clocking):
68 # doc in base class 69 base.ManagerAvatar.__init__(self, heaven, avatarId, 70 remoteIdentity, mind) 71 72 self.jobState = jobState 73 self.makeComponentState(conf) 74 self.clocking = clocking 75 76 self._shutdown_requested = False 77 self._shutdownDeferred = None 78 79 self.vishnu.registerComponent(self) 80 # calllater to allow the component a chance to receive its 81 # avatar, so that it has set medium.remote 82 reactor.callLater(0, self.heaven.componentAttached, self)
83 84 ### python methods 85
86 - def __repr__(self):
87 mood = '(unknown)' 88 if self.componentState: 89 moodValue = self.componentState.get('mood') 90 if moodValue is not None: 91 mood = moods.get(moodValue).name 92 return '<%s %s (mood %s)>' % (self.__class__.__name__, 93 self.avatarId, mood)
94 95 ### ComponentAvatar methods 96
97 - def makeAvatarInitArgs(klass, heaven, avatarId, remoteIdentity, 98 mind):
99 100 def gotStates(result): 101 (_s1, conf), (_s2, jobState), (_s3, clocking) = result 102 assert _s1 and _s2 and _s3 # fireOnErrback=1 103 log.debug('component-avatar', 'got state information') 104 return (heaven, avatarId, remoteIdentity, mind, 105 conf, jobState, clocking)
106 log.debug('component-avatar', 'calling mind for state information') 107 d = defer.DeferredList([mind.callRemote('getConfig'), 108 mind.callRemote('getState'), 109 mind.callRemote('getMasterClockInfo')], 110 fireOnOneErrback=True) 111 d.addCallback(gotStates) 112 return d
113 makeAvatarInitArgs = classmethod(makeAvatarInitArgs) 114
115 - def onShutdown(self):
116 # doc in base class 117 self.info('component "%s" logged out', self.avatarId) 118 119 self.vishnu.unregisterComponent(self) 120 121 if self.clocking: 122 ip, port, base_time = self.clocking 123 self.vishnu.releasePortsOnWorker(self.getWorkerName(), 124 [port]) 125 126 self.componentState.clearJobState(self._shutdown_requested) 127 128 # FIXME: why? 129 self.componentState.set('moodPending', None) 130 131 self.componentState = None 132 self.jobState = None 133 134 self.heaven.componentDetached(self) 135 136 if self._shutdownDeferred: 137 reactor.callLater(0, self._shutdownDeferred.callback, True) 138 self._shutdownDeferred = None 139 140 base.ManagerAvatar.onShutdown(self)
141 142 # my methods 143
144 - def addMessage(self, level, mid, format, *args, **kwargs):
145 """ 146 Convenience message to construct a message and add it to the 147 component state. `format' should be marked as translatable in 148 the source with N_, and *args will be stored as format 149 arguments. Keyword arguments are passed on to the message 150 constructor. See L{flumotion.common.messages.Message} for the 151 meanings of the rest of the arguments. 152 153 For example:: 154 155 self.addMessage(messages.WARNING, 'foo-warning', 156 N_('The answer is %d'), 42, debug='not really') 157 """ 158 self.addMessageObject(messages.Message(level, 159 T_(format, *args), 160 mid=mid, **kwargs))
161
162 - def addMessageObject(self, message):
163 """ 164 Add a message to the planet state. 165 166 @type message: L{flumotion.common.messages.Message} 167 """ 168 self.componentState.append('messages', message)
169
170 - def upgradeConfig(self, state, conf):
171 # different from conf['version'], eh... 172 version = conf.get('config-version', 0) 173 while version < config.CURRENT_VERSION: 174 try: 175 config.UPGRADERS[version](conf) 176 version += 1 177 conf['config-version'] = version 178 except Exception, e: 179 self.addMessage(messages.WARNING, 180 'upgrade-%d' % version, 181 N_("Failed to upgrade config %r " 182 "from version %d. Please file " 183 "a bug."), conf, version, 184 debug=log.getExceptionMessage(e)) 185 return
186
187 - def makeComponentState(self, conf):
188 # the component just logged in with good credentials. we fetched 189 # its config and job state. now there are two possibilities: 190 # (1) we were waiting for such a component to start. There was 191 # a ManagerComponentState and an avatarId in the 192 # componentMappers waiting for us. 193 # (2) we don't know anything about this component, but it has a 194 # state and config. We deal with it, creating all the 195 # neccesary internal state. 196 197 def verifyExistingComponentState(conf, state): 198 # condition (1) 199 state.setJobState(self.jobState) 200 self.componentState = state 201 202 self.upgradeConfig(state, conf) 203 if state.get('config') != conf: 204 diff = config.dictDiff(state.get('config'), conf) 205 diffMsg = config.dictDiffMessageString(diff, 206 'internal conf', 207 'running conf') 208 self.addMessage(messages.WARNING, 'stale-config', 209 N_("Component logged in with stale " 210 "configuration. To fix this, stop " 211 "this component and then restart " 212 "the manager."), 213 debug=("Updating internal conf from " 214 "running conf:\n" + diffMsg)) 215 self.warning('updating internal component state for %r', 216 state) 217 self.debug('changes to conf: %s', 218 config.dictDiffMessageString(diff)) 219 state.set('config', conf)
220 221 def makeNewComponentState(conf): 222 # condition (2) 223 state = planet.ManagerComponentState() 224 state.setJobState(self.jobState) 225 self.componentState = state 226 227 self.upgradeConfig(state, conf) 228 229 flowName, compName = conf['parent'], conf['name'] 230 231 state.set('name', compName) 232 state.set('type', conf['type']) 233 state.set('workerRequested', self.jobState.get('workerName')) 234 state.set('config', conf) 235 self.vishnu.addComponentToFlow(state, flowName) 236 return state 237 238 mState = self.vishnu.getManagerComponentState(self.avatarId) 239 if mState: 240 verifyExistingComponentState(conf, mState) 241 else: 242 makeNewComponentState(conf) 243
244 - def provideMasterClock(self):
245 """ 246 Tell the component to provide a master clock. 247 248 @rtype: L{twisted.internet.defer.Deferred} 249 """ 250 251 def success(clocking): 252 self.clocking = clocking 253 self.heaven.masterClockAvailable(self)
254 255 def error(failure): 256 self.addMessage(messages.WARNING, 'provide-master-clock', 257 N_('Failed to provide the master clock'), 258 debug=log.getFailureMessage(failure)) 259 self.vishnu.releasePortsOnWorker(self.getWorkerName(), [port]) 260 261 if self.clocking: 262 self.heaven.masterClockAvailable(self) 263 else: 264 (port, ) = self.vishnu.reservePortsOnWorker( 265 self.getWorkerName(), 1) 266 self.debug('provideMasterClock on port %d', port) 267 268 d = self.mindCallRemote('provideMasterClock', port) 269 d.addCallbacks(success, error) 270
271 - def getFeedServerPort(self):
272 """ 273 Returns the port on which a feed server for this component is 274 listening on. 275 276 @rtype: int 277 """ 278 return self.vishnu.getWorkerFeedServerPort(self.getWorkerName())
279
280 - def getRemoteManagerIP(self):
281 """ 282 Get the IP address of the manager as seen by the component. 283 284 @rtype: str 285 """ 286 return self.jobState.get('manager-ip')
287
288 - def getWorkerName(self):
289 """ 290 Return the name of the worker. 291 292 @rtype: str 293 """ 294 return self.jobState.get('workerName')
295
296 - def getPid(self):
297 """ 298 Return the PID of the component. 299 300 @rtype: int 301 """ 302 return self.jobState.get('pid')
303
304 - def getName(self):
305 """ 306 Get the name of the component. 307 308 @rtype: str 309 """ 310 return self.componentState.get('name')
311
312 - def getParentName(self):
313 """ 314 Get the name of the component's parent. 315 316 @rtype: str 317 """ 318 return self.componentState.get('parent').get('name')
319
320 - def getType(self):
321 """ 322 Get the component type name of the component. 323 324 @rtype: str 325 """ 326 return self.componentState.get('type')
327
328 - def getEaters(self):
329 """ 330 Get the set of eaters that this component eats from. 331 332 @rtype: dict of eaterName -> [(feedId, eaterAlias)] 333 """ 334 return self.componentState.get('config').get('eater', {})
335
336 - def getFeeders(self):
337 """ 338 Get the list of feeders that this component provides. 339 340 @rtype: list of feederName 341 """ 342 return self.componentState.get('config').get('feed', [])
343
344 - def getFeedId(self, feedName):
345 """ 346 Get the feedId of a feed provided or consumed by this component. 347 348 @param feedName: The name of the feed (i.e., eater alias or 349 feeder name) 350 @rtype: L{flumotion.common.common.feedId} 351 """ 352 return common.feedId(self.getName(), feedName)
353
354 - def getFullFeedId(self, feedName):
355 """ 356 Get the full feedId of a feed provided or consumed by this 357 component. 358 359 @param feedName: The name of the feed (i.e., eater alias or 360 feeder name) 361 @rtype: L{flumotion.common.common.fullFeedId} 362 """ 363 return common.fullFeedId(self.getParentName(), 364 self.getName(), feedName)
365
366 - def getVirtualFeeds(self):
367 """ 368 Get the set of virtual feeds provided by this component. 369 370 @rtype: dict of fullFeedId -> (ComponentAvatar, feederName) 371 """ 372 conf = self.componentState.get('config') 373 ret = {} 374 for feedId, feederName in conf.get('virtual-feeds', {}).items(): 375 vComp, vFeed = common.parseFeedId(feedId) 376 ffid = common.fullFeedId(self.getParentName(), vComp, vFeed) 377 ret[ffid] = (self, feederName) 378 return ret
379
380 - def getWorker(self):
381 """ 382 Get the worker that this component should run on. 383 384 @rtype: str 385 """ 386 return self.componentState.get('workerRequested')
387
388 - def getClockMaster(self):
389 """ 390 Get this component's clock master, if any. 391 392 @rtype: avatarId or None 393 """ 394 return self.componentState.get('config')['clock-master']
395
396 - def stop(self):
397 """ 398 Tell the remote component to shut down. 399 """ 400 self._shutdownDeferred = defer.Deferred() 401 402 self.mindCallRemote('stop') 403 404 return self._shutdownDeferred
405
406 - def setClocking(self, host, port, base_time):
407 # setMood on error? 408 return self.mindCallRemote('setMasterClock', host, port, base_time)
409
410 - def eatFrom(self, eaterAlias, fullFeedId, host, port):
411 self.debug('connecting eater %s to feed %s', eaterAlias, fullFeedId) 412 return self.mindCallRemote('eatFrom', eaterAlias, fullFeedId, 413 host, port)
414
415 - def feedTo(self, feederName, fullFeedId, host, port):
416 self.debug('connecting feeder %s to feed %s', feederName, fullFeedId) 417 return self.mindCallRemote('feedTo', feederName, fullFeedId, 418 host, port)
419 420 # FIXME: maybe make a BouncerComponentAvatar subclass ? 421
422 - def authenticate(self, keycard):
423 """ 424 Authenticate the given keycard. 425 Gets proxied to L{flumotion.component.bouncers.bouncer.""" \ 426 """BouncerMedium.remote_authenticate} 427 The component should be a subclass of 428 L{flumotion.component.bouncers.bouncer.Bouncer} 429 430 @type keycard: L{flumotion.common.keycards.Keycard} 431 """ 432 return self.mindCallRemote('authenticate', keycard)
433
434 - def removeKeycardId(self, keycardId):
435 """ 436 Remove a keycard managed by this bouncer because the requester 437 has gone. 438 439 @type keycardId: str 440 """ 441 return self.mindCallRemote('removeKeycardId', keycardId)
442
443 - def expireKeycard(self, keycardId):
444 """ 445 Expire a keycard issued to this component because the bouncer decided 446 to. 447 448 @type keycardId: str 449 """ 450 return self.mindCallRemote('expireKeycard', keycardId)
451
452 - def expireKeycards(self, keycardIds):
453 """ 454 Expire keycards issued to this component because the bouncer 455 decided to. 456 457 @type keycardIds: sequence of str 458 """ 459 return self.mindCallRemote('expireKeycards', keycardIds)
460
461 - def keepAlive(self, issuerName, ttl):
462 """ 463 Resets the expiry timeout for keycards issued by issuerName. 464 465 @param issuerName: the issuer for which keycards should be kept 466 alive; that is to say, keycards with the 467 attribute 'issuerName' set to this value will 468 have their ttl values reset. 469 @type issuerName: str 470 @param ttl: the new expiry timeout 471 @type ttl: number 472 """ 473 return self.mindCallRemote('keepAlive', issuerName, ttl)
474 475 ### IPerspective methods, called by the worker's component 476
477 - def perspective_cleanShutdown(self):
478 """ 479 Called by a component to tell the manager that it's shutting down 480 cleanly (and thus should go to sleeping, rather than lost or sad) 481 """ 482 self.debug("shutdown is clean, shouldn't go to lost") 483 self._shutdown_requested = True
484
485 - def perspective_removeKeycardId(self, bouncerName, keycardId):
486 """ 487 Remove a keycard on the given bouncer on behalf of a 488 component's medium. 489 490 This is requested by a component that created the keycard. 491 492 @type bouncerName: str 493 @param keycardId: id of keycard to remove 494 @type keycardId: str 495 """ 496 avatarId = common.componentId('atmosphere', bouncerName) 497 if not self.heaven.hasAvatar(avatarId): 498 self.warning('No bouncer with id %s registered', avatarId) 499 raise errors.UnknownComponentError(avatarId) 500 501 return self.heaven.getAvatar(avatarId).removeKeycardId(keycardId)
502
503 - def perspective_expireKeycard(self, requesterId, keycardId):
504 """ 505 Expire a keycard (and thus the requester's connection) 506 issued to the given requester. 507 508 This is called by the bouncer component that authenticated the keycard. 509 510 @param requesterId: name (avatarId) of the component that originally 511 requested authentication for the given keycardId 512 @type requesterId: str 513 @param keycardId: id of keycard to expire 514 @type keycardId: str 515 """ 516 # FIXME: we should also be able to expire manager bouncer keycards 517 if not self.heaven.hasAvatar(requesterId): 518 self.warning('asked to expire keycard %s for requester %s, ' 519 'but no such component registered', 520 keycardId, requesterId) 521 raise errors.UnknownComponentError(requesterId) 522 523 return self.heaven.getAvatar(requesterId).expireKeycard(keycardId)
524
525 - def perspective_expireKeycards(self, requesterId, keycardIds):
526 """ 527 Expire multiple keycards (and thus the requester's connections) 528 issued to the given requester. 529 530 This is called by the bouncer component that authenticated 531 the keycards. 532 533 @param requesterId: name (avatarId) of the component that originally 534 requested authentication for the given keycardId 535 @type requesterId: str 536 @param keycardIds: sequence of id of keycards to expire 537 @type keycardIds: sequence of str 538 """ 539 if not self.heaven.hasAvatar(requesterId): 540 self.warning('asked to expire %d keycards for requester %s, ' 541 'but no such component registered', 542 len(keycardIds), requesterId) 543 raise errors.UnknownComponentError(requesterId) 544 545 return self.heaven.getAvatar(requesterId).expireKeycards(keycardIds)
546 547
548 -class dictlist(dict):
549
550 - def add(self, key, value):
551 if key not in self: 552 self[key] = [] 553 self[key].append(value)
554
555 - def remove(self, key, value):
556 self[key].remove(value) 557 if not self[key]: 558 del self[key]
559 560
561 -class FeedMap(object, log.Loggable):
562 logName = 'feed-map' 563
564 - def __init__(self):
565 #FIXME: Use twisted.python.util.OrderedDict instead 566 self.avatars = {} 567 self._ordered_avatars = [] 568 self._dirty = True 569 self._recalc()
570
571 - def componentAttached(self, avatar):
572 assert avatar.avatarId not in self.avatars 573 self.avatars[avatar.avatarId] = avatar 574 self._ordered_avatars.append(avatar) 575 self._dirty = True
576
577 - def componentDetached(self, avatar):
578 # returns the a list of other components that will need to be 579 # reconnected 580 del self.avatars[avatar.avatarId] 581 self._ordered_avatars.remove(avatar) 582 self._dirty = True 583 # NB, feedDeps is dirty. Scrub it of avatars that have logged 584 # out 585 return [(a, f) for a, f in self.feedDeps.pop(avatar, []) 586 if a.avatarId in self.avatars]
587
588 - def _getFeederAvatar(self, eater, feedId):
589 # FIXME: 'get' part is confusing - this methods _modifies_ structures! 590 flowName = eater.getParentName() 591 compName, feedName = common.parseFeedId(feedId) 592 ffid = common.fullFeedId(flowName, compName, feedName) 593 feeder = None 594 if ffid in self.feeds: 595 feeder, feedName = self.feeds[ffid][0] 596 self.feedDeps.add(feeder, (eater, ffid)) 597 if feeder.getFeedId(feedName) != feedId: 598 self.debug('chose %s for feed %s', 599 feeder.getFeedId(feedName), feedId) 600 return feeder, feedName
601
602 - def _recalc(self):
603 if not self._dirty: 604 return 605 self.feedersForEaters = ffe = {} 606 self.eatersForFeeders = eff = dictlist() 607 self.feeds = dictlist() 608 self.feedDeps = dictlist() 609 610 for comp in self._ordered_avatars: 611 for feederName in comp.getFeeders(): 612 self.feeds.add(comp.getFullFeedId(feederName), 613 (comp, feederName)) 614 for ffid, pair in comp.getVirtualFeeds().items(): 615 self.feeds.add(ffid, pair) 616 617 for eater in self.avatars.values(): 618 for pairs in eater.getEaters().values(): 619 for feedId, eName in pairs: 620 feeder, fName = self._getFeederAvatar(eater, feedId) 621 if feeder: 622 ffe[eater.getFullFeedId(eName)] = ( 623 eName, feeder, fName) 624 eff.add(feeder.getFullFeedId(fName), 625 (fName, eater, eName)) 626 else: 627 self.debug('eater %s waiting for feed %s to log in', 628 eater.getFeedId(eName), feedId) 629 self._dirty = False
630
631 - def getFeedersForEaters(self, avatar):
632 """Get the set of feeds that this component is eating from, 633 keyed by eater alias. 634 635 @return: a list of (eaterAlias, feederAvatar, feedName) tuples 636 @rtype: list of (str, ComponentAvatar, str) 637 """ 638 self._recalc() 639 ret = [] 640 for tups in avatar.getEaters().values(): 641 for feedId, alias in tups: 642 ffid = avatar.getFullFeedId(alias) 643 if ffid in self.feedersForEaters: 644 ret.append(self.feedersForEaters[ffid]) 645 return ret
646
647 - def getFeedersForEater(self, avatar, ffid):
648 """Get the set of feeds that this component is eating from 649 for the given feedId. 650 651 @param avatar: the eater component 652 @type avatar: L{ComponentAvatar} 653 @param ffid: full feed id for which to return feeders 654 @type ffid: str 655 @return: a list of (eaterAlias, feederAvatar, feedName) tuples 656 @rtype: list of (str, L{ComponentAvatar}, str) 657 """ 658 self._recalc() 659 ret = [] 660 for feeder, feedName in self.feeds.get(ffid, []): 661 rffid = feeder.getFullFeedId(feedName) 662 eff = self.eatersForFeeders.get(rffid, []) 663 for fName, eater, eaterName in eff: 664 if eater == avatar: 665 ret.append((eaterName, feeder, feedName)) 666 return ret
667
668 - def getEatersForFeeders(self, avatar):
669 """Get the set of eaters that this component feeds, keyed by 670 feeder name. 671 672 @return: a list of (feederName, eaterAvatar, eaterAlias) tuples 673 @rtype: list of (str, ComponentAvatar, str) 674 """ 675 self._recalc() 676 ret = [] 677 for feedName in avatar.getFeeders(): 678 ffid = avatar.getFullFeedId(feedName) 679 if ffid in self.eatersForFeeders: 680 ret.extend(self.eatersForFeeders[ffid]) 681 return ret
682 683
684 -class ComponentHeaven(base.ManagerHeaven):
685 """ 686 I handle all registered components and provide L{ComponentAvatar}s 687 for them. 688 """ 689 690 implements(interfaces.IHeaven) 691 avatarClass = ComponentAvatar 692 693 logCategory = 'comp-heaven' 694
695 - def __init__(self, vishnu):
696 # doc in base class 697 base.ManagerHeaven.__init__(self, vishnu) 698 self.feedMap = FeedMap()
699 700 ### our methods 701
702 - def feedServerAvailable(self, workerName):
703 self.debug('feed server %s logged in, we can connect to its port', 704 workerName) 705 # can be made more efficient 706 for avatar in self.avatars.values(): 707 if avatar.getWorkerName() == workerName: 708 self._setupClocking(avatar) 709 self._connectEatersAndFeeders(avatar)
710
711 - def masterClockAvailable(self, component):
712 self.debug('master clock for %r provided on %r', component.avatarId, 713 component.clocking) 714 component_flow = component.getParentName() 715 # can be made more efficient 716 for avatar in self.avatars.values(): 717 if avatar.avatarId != component.avatarId: 718 flow = avatar.getParentName() 719 if flow == component_flow: 720 self._setupClocking(avatar)
721
722 - def _setupClocking(self, avatar):
723 master = avatar.getClockMaster() 724 if master: 725 if master == avatar.avatarId: 726 self.debug('Need for %r to provide a clock master', 727 master) 728 avatar.provideMasterClock() 729 else: 730 self.debug('Need to synchronize with clock master %r', 731 master) 732 # if master in self.avatars would be natural, but it seems 733 # that for now due to the getClocking() calls etc we need to 734 # check against the componentMapper set. could (and probably 735 # should) be fixed in the future. 736 m = self.vishnu.getComponentMapper(master) 737 if m and m.avatar: 738 clocking = m.avatar.clocking 739 if clocking: 740 host, port, base_time = clocking 741 avatar.setClocking(host, port, base_time) 742 else: 743 self.warning('%r should provide a clock master ' 744 'but is not doing so', master) 745 # should we componentAvatar.provideMasterClock() ? 746 else: 747 self.debug('clock master not logged in yet, will ' 748 'set clocking later')
749
750 - def componentAttached(self, avatar):
751 # No need to wait for any of this, they are not interdependent 752 assert avatar.avatarId in self.avatars 753 self.feedMap.componentAttached(avatar) 754 self._setupClocking(avatar) 755 self._connectEatersAndFeeders(avatar)
756
757 - def componentDetached(self, avatar):
758 assert avatar.avatarId not in self.avatars 759 compsNeedingReconnect = self.feedMap.componentDetached(avatar) 760 if self.vishnu.running: 761 self.debug('will reconnect: %r', compsNeedingReconnect) 762 # FIXME: this will need revision when we have the 'feedTo' 763 # direction working 764 for comp, ffid in compsNeedingReconnect: 765 self._connectEaters(comp, ffid)
766
767 - def mapNetFeed(self, fromAvatar, toAvatar):
768 """ 769 @param fromAvatar: the avatar to connect from 770 @type fromAvatar: L{ComponentAvatar} 771 @param fromAvatar: the avatar to connect to 772 @type toAvatar: L{ComponentAvatar} 773 774 @returns: the host and port on which to make the connection to 775 toAvatar from fromAvatar 776 @rtype: tuple of (str, int or None) 777 """ 778 toHost = toAvatar.getClientAddress() 779 toPort = toAvatar.getFeedServerPort() # can be None 780 781 # FIXME: until network map is implemented, hack to assume that 782 # connections from what appears to us to be the same IP go 783 # through localhost instead. Allows connections between 784 # components on a worker behind a firewall, but not between 785 # components running on different workers, both behind a 786 # firewall 787 fromHost = fromAvatar.mind.broker.transport.getPeer().host 788 if fromHost == toHost: 789 toHost = '127.0.0.1' 790 791 self.debug('mapNetFeed from %r to %r: %s:%r', fromAvatar, toAvatar, 792 toHost, toPort) 793 return toHost, toPort
794
795 - def _connectFeederToEater(self, fromComp, fromFeed, 796 toComp, toFeed, method):
797 host, port = self.mapNetFeed(fromComp, toComp) 798 if port: 799 fullFeedId = toComp.getFullFeedId(toFeed) 800 proc = getattr(fromComp, method) 801 proc(fromFeed, fullFeedId, host, port) 802 else: 803 self.debug('postponing connection to %s: feed server ' 804 'unavailable', toComp.getFeedId(toFeed))
805
806 - def _connectEatersAndFeeders(self, avatar):
807 # FIXME: all connections are upstream for now 808 809 def always(otherComp): 810 return True
811 812 def never(otherComp): 813 return False
814 directions = [(self.feedMap.getFeedersForEaters, 815 always, 'eatFrom', 'feedTo'), 816 (self.feedMap.getEatersForFeeders, 817 never, 'feedTo', 'eatFrom')] 818 819 myComp = avatar 820 for getPeers, initiate, directMethod, reversedMethod in directions: 821 for myFeedName, otherComp, otherFeedName in getPeers(myComp): 822 if initiate(otherComp): 823 # we initiate the connection 824 self._connectFeederToEater(myComp, myFeedName, otherComp, 825 otherFeedName, directMethod) 826 else: 827 # make the other component initiate connection 828 self._connectFeederToEater(otherComp, otherFeedName, 829 myComp, myFeedName, 830 reversedMethod) 831
832 - def _connectEaters(self, avatar, ffid):
833 # FIXME: all connections are upstream for now 834 ffe = self.feedMap.getFeedersForEater(avatar, ffid) 835 for myFeedName, otherComp, otherFeedName in ffe: 836 self._connectFeederToEater(avatar, myFeedName, otherComp, 837 otherFeedName, 'eatFrom')
838