
Source Code for Module flumotion.component.feedcomponent010

# -*- Mode: Python; test-case-name: flumotion.test.test_feedcomponent010 -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
# All rights reserved.

# This file may be distributed and/or modified under the terms of
# the GNU General Public License version 2 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.GPL" in the source distribution for more information.

# Licensees having purchased or holding a valid Flumotion Advanced
# Streaming Server license may use this file in accordance with the
# Flumotion Advanced Streaming Server Commercial License Agreement.
# See "LICENSE.Flumotion" in the source distribution for more information.

# Headers in this file shall remain intact.

import gst
import gobject

import os
import time

from twisted.internet import reactor, defer

from flumotion.common import common, errors, pygobject, messages, log
from flumotion.common import gstreamer
from flumotion.common.i18n import N_, gettexter
from flumotion.common.planet import moods
from flumotion.component import component as basecomponent
from flumotion.component import feed, padmonitor
from flumotion.component.feeder import Feeder
from flumotion.component.eater import Eater

__version__ = "$Rev: 7901 $"
T_ = gettexter()


class FeedComponent(basecomponent.BaseComponent):
    """
    I am a base class for all Flumotion feed components.
    """

    # how often to update the UIState feeder statistics
    FEEDER_STATS_UPDATE_FREQUENCY = 12.5

    logCategory = 'feedcomponent'

    ### BaseComponent interface implementations

    def init(self):
        # add keys for eaters and feeders uiState
        self.feeders = {}  # feeder feedName -> Feeder
        self.eaters = {}  # eater eaterAlias -> Eater
        self.uiState.addListKey('feeders')
        self.uiState.addListKey('eaters')

        self.pipeline = None
        self.pipeline_signals = []
        self.bus_signal_id = None
        self.effects = {}
        self._feeder_probe_cl = None

        self._pad_monitors = padmonitor.PadMonitorSet(
            lambda: self.setMood(moods.happy),
            lambda: self.setMood(moods.hungry))

        self._clock_slaved = False
        self.clock_provider = None
        self._master_clock_info = None  # (ip, port, basetime) if we're the
                                        # clock master

        self._change_monitor = gstreamer.StateChangeMonitor()

        # multifdsink's get-stats signal had critical bugs before this version
        self._get_stats_supported = (gstreamer.get_plugin_version('tcp')
                                     >= (0, 10, 11, 0))

    def do_setup(self):
        """
        Sets up the component.

        Invokes the L{create_pipeline} and L{set_pipeline} vmethods,
        which subclasses can provide.
        """
        config = self.config
        eater_config = config.get('eater', {})
        feeder_config = config.get('feed', [])
        source_config = config.get('source', [])

        self.debug("FeedComponent.do_setup(): eater_config %r", eater_config)
        self.debug("FeedComponent.do_setup(): feeder_config %r", feeder_config)
        self.debug("FeedComponent.do_setup(): source_config %r", source_config)
        # for upgrade of code without restarting managers
        # this will only be for components whose eater name in the registry
        # is default, so no need to import the registry and find the eater
        # name
        if eater_config == {} and source_config != []:
            eater_config = {'default': [(x, 'default') for x in source_config]}

        for eaterName in eater_config:
            for feedId, eaterAlias in eater_config[eaterName]:
                self.eaters[eaterAlias] = Eater(eaterAlias, eaterName)
                self.uiState.append('eaters', self.eaters[eaterAlias].uiState)

        for feederName in feeder_config:
            self.feeders[feederName] = Feeder(feederName)
            self.uiState.append('feeders',
                                self.feeders[feederName].uiState)

        clockMaster = config.get('clock-master', None)
        if clockMaster:
            self._clock_slaved = clockMaster != config['avatarId']
        else:
            self._clock_slaved = False

        pipeline = self.create_pipeline()
        self.connect_feeders(pipeline)
        self.set_pipeline(pipeline)

        self.debug("FeedComponent.do_setup(): setup finished")

        self.try_start_pipeline()

        # no race here: messages are marshalled asynchronously via the bus
        d = self._change_monitor.add(gst.STATE_CHANGE_PAUSED_TO_PLAYING)
        d.addCallback(lambda x: self.do_pipeline_playing())
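
    # For reference, a minimal sketch (not from the original source) of the
    # config shapes consumed by do_setup() above; the component and feed
    # names are hypothetical:
    #
    #     config = {
    #         'eater': {'default': [('producer:video', 'default')]},
    #         'feed': ['default'],
    #         'clock-master': None,
    #     }
    #
    # With an empty 'eater' dict and a legacy non-empty 'source' list such
    # as ['producer:video'], the upgrade path above synthesizes the same
    # 'default' eater entry.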

    def setup_completed(self):
        # Just log; we override the superclass to not turn happy here.
        # Instead, we turn happy once the pipeline gets to PLAYING.
        self.debug("Setup completed")

    ### FeedComponent interface for subclasses

    def create_pipeline(self):
        """
        Subclasses have to implement this method.

        @rtype: L{gst.Pipeline}
        """
        raise NotImplementedError(
            "subclass must implement create_pipeline")
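
    # A minimal subclass sketch (not from the original source) showing the
    # expected shape of create_pipeline(); the pipeline description and
    # element names are hypothetical:
    #
    #     class ExampleProducer(FeedComponent):
    #
    #         def create_pipeline(self):
    #             # parse_launch turns a textual description into a
    #             # gst.Pipeline, satisfying the @rtype above
    #             return gst.parse_launch(
    #                 'videotestsrc is-live=true ! '
    #                 'gdppay ! multifdsink name=feeder:default')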

    def set_pipeline(self, pipeline):
        """
        Subclasses can override me.
        They should chain up first.
        """
        if self.pipeline:
            self.cleanup()
        self.pipeline = pipeline
        self._setup_pipeline()

    def attachPadMonitorToFeeder(self, feederName):
        elementName = self.feeders[feederName].payName
        element = self.pipeline.get_by_name(elementName)
        if not element:
            raise errors.ComponentError("No such feeder %s" % feederName)

        pad = element.get_pad('src')
        self._pad_monitors.attach(pad, elementName)

    ### FeedComponent methods

    def addEffect(self, effect):
        self.effects[effect.name] = effect
        effect.setComponent(self)

    def connect_feeders(self, pipeline):
        # Connect to the client-fd-removed signals on each feeder, so we
        # can clean up properly on removal.

        def client_fd_removed(sink, fd, feeder):
            # Called (as a signal callback) when the FD is no longer in
            # use by multifdsink.
            # This will call the registered callable on the fd.
            # Called from GStreamer threads.
            self.debug("cleaning up fd %d", fd)
            feeder.clientDisconnected(fd)

        for feeder in self.feeders.values():
            element = pipeline.get_by_name(feeder.elementName)
            if element:
                element.connect('client-fd-removed', client_fd_removed,
                                feeder)
                self.debug("Connected to client-fd-removed on %r", feeder)
            else:
                self.warning("No feeder %s in pipeline", feeder.elementName)

    def get_pipeline(self):
        return self.pipeline

    def do_pipeline_playing(self):
        """
        Invoked when the pipeline has changed its state to PLAYING.
        The default implementation sets the component's mood to HAPPY.
        """
        self.setMood(moods.happy)

    def make_message_for_gstreamer_error(self, gerror, debug):
        """Make a Flumotion error message to show to the user.

        This method may be overridden by components that have special
        knowledge about potential errors. If the component does not know
        about the error, it can chain up to this implementation, which
        will make a generic message.

        @param gerror: The GError from the error message posted on the
                       GStreamer message bus.
        @type  gerror: L{gst.GError}
        @param debug: A string with debugging information.
        @type  debug: str

        @returns: A L{flumotion.common.messages.Message} to show to the
                  user.
        """
        # generate a unique id
        mid = "%s-%s-%d" % (self.name, gerror.domain, gerror.code)
        m = messages.Error(T_(N_(
            "Internal GStreamer error.")),
            debug="%s\n%s: %d\n%s" % (
                gerror.message, gerror.domain, gerror.code, debug),
            mid=mid, priority=40)
        return m
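
    # An override sketch (not from the original source): a component with
    # special knowledge of one error class handles it and chains up for
    # everything else; the domain string shown is hypothetical.
    #
    #     class ExampleComponent(FeedComponent):
    #
    #         def make_message_for_gstreamer_error(self, gerror, debug):
    #             if gerror.domain == 'gst-resource-error-quark':
    #                 return messages.Error(T_(N_(
    #                     "Could not read from the source.")),
    #                     debug=debug, mid='%s-resource' % self.name)
    #             return FeedComponent.make_message_for_gstreamer_error(
    #                 self, gerror, debug)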

    def bus_message_received_cb(self, bus, message):

        def state_changed():
            if src == self.pipeline:
                old, new, pending = message.parse_state_changed()
                self._change_monitor.state_changed(old, new)

        def error():
            gerror, debug = message.parse_error()
            self.warning('element %s error %s %s',
                         src.get_path_string(), gerror, debug)
            self.setMood(moods.sad)

            # this method can fail if the component has a mistake
            try:
                m = self.make_message_for_gstreamer_error(gerror, debug)
            except Exception, e:
                msg = log.getExceptionMessage(e)
                m = messages.Error(T_(N_(
                    "Programming error in component.")),
                    debug="Bug in %r.make_message_for_gstreamer_error: %s" % (
                        self.__class__, msg))

            self.state.append('messages', m)
            self._change_monitor.have_error(self.pipeline.get_state(),
                                            message)

        def eos():
            name = src.get_name()
            if name in self._pad_monitors:
                self.info('End of stream in element %s', name)
                self._pad_monitors[name].setInactive()
            else:
                self.info("We got an eos from %s", name)

        def default():
            self.log('message received: %r', message)

        handlers = {gst.MESSAGE_STATE_CHANGED: state_changed,
                    gst.MESSAGE_ERROR: error,
                    gst.MESSAGE_EOS: eos}
        t = message.type
        src = message.src
        handlers.get(t, default)()
        return True

    def install_eater_continuity_watch(self, eaterWatchElements):
        """Watch a set of elements for discontinuity messages.

        @param eaterWatchElements: the set of elements to watch for
                                   discontinuities
        @type  eaterWatchElements: dict of elementName -> Eater
        """

        def on_element_message(bus, message):
            src = message.src
            name = src.get_name()
            if name in eaterWatchElements:
                eater = eaterWatchElements[name]
                s = message.structure

                def timestampDiscont():
                    prevTs = s["prev-timestamp"]
                    prevDuration = s["prev-duration"]
                    curTs = s["cur-timestamp"]
                    discont = curTs - (prevTs + prevDuration)
                    dSeconds = discont / float(gst.SECOND)
                    self.debug("we have a discont on eater %s of %f s "
                               "between %s and %s", eater.eaterAlias,
                               dSeconds, gst.TIME_ARGS(prevTs + prevDuration),
                               gst.TIME_ARGS(curTs))
                    eater.timestampDiscont(dSeconds,
                                           float(curTs) / float(gst.SECOND))

                def offsetDiscont():
                    prevOffsetEnd = s["prev-offset-end"]
                    curOffset = s["cur-offset"]
                    discont = curOffset - prevOffsetEnd
                    self.debug("we have a discont on eater %s of %d "
                               "units between %d and %d",
                               eater.eaterAlias, discont, prevOffsetEnd,
                               curOffset)
                    eater.offsetDiscont(discont, curOffset)

                handlers = {'imperfect-timestamp': timestampDiscont,
                            'imperfect-offset': offsetDiscont}
                if s.get_name() in handlers:
                    handlers[s.get_name()]()

        # we know that there is a signal watch already installed
        bus = self.pipeline.get_bus()
        # never gets cleaned up; does that matter?
        bus.connect("message::element", on_element_message)
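
    # Worked example (illustrative numbers, not from the original source):
    # with prev-timestamp 1.000s and prev-duration 0.040s, the next buffer
    # is expected at 1.040s; if cur-timestamp is 1.080s, timestampDiscont()
    # above reports a gap of 1.080 - 1.040 = 0.040s for the eater.
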
    def install_eater_event_probes(self, eater):

        def fdsrc_event(pad, event):
            # An event probe used to consume unwanted EOS events on eaters.
            # Called from GStreamer threads.
            if event.type == gst.EVENT_EOS:
                self.info('End of stream for eater %s, disconnect will be '
                          'triggered', eater.eaterAlias)
                # We swallow it because otherwise our component acts on the
                # EOS and we can't recover from that later. Instead, fdsrc
                # will be taken out and given a new fd on the next
                # eatFromFD call.
                return False
            return True

        def depay_event(pad, event):
            # An event probe used to consume unwanted duplicate
            # newsegment events.
            # Called from GStreamer threads.
            if event.type == gst.EVENT_NEWSEGMENT:
                # We do this because we know gdppay/gdpdepay screw up on
                # 2nd newsegments (unclear what the original reason for
                # this was, perhaps #349204).
                # Other elements might also have problems with repeated
                # newsegments coming in, so we just drop them all.
                # Flumotion operates in a single segment space, so dropping
                # newsegments should be fine.
                if getattr(eater, '_gotFirstNewSegment', False):
                    self.info("Subsequent new segment event received on "
                              "depay on eater %s", eater.eaterAlias)
                    # swallow (gulp)
                    return False
                else:
                    eater._gotFirstNewSegment = True
            return True

        self.debug('adding event probe for eater %s', eater.eaterAlias)
        fdsrc = self.get_element(eater.elementName)
        fdsrc.get_pad("src").add_event_probe(fdsrc_event)
        depay = self.get_element(eater.depayName)
        depay.get_pad("src").add_event_probe(depay_event)

    def _setup_pipeline(self):
        self.debug('setup_pipeline()')
        assert self.bus_signal_id is None

        self.pipeline.set_name('pipeline-' + self.getName())
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        self.bus_signal_id = bus.connect('message',
                                         self.bus_message_received_cb)
        sig_id = self.pipeline.connect('deep-notify',
                                       gstreamer.verbose_deep_notify_cb, self)
        self.pipeline_signals.append(sig_id)

        # set to READY so that multifdsinks can always receive fds, even
        # if the pipeline has a delayed start due to clock slaving
        self.pipeline.set_state(gst.STATE_READY)

        # start checking feeders, if we have a sufficiently recent
        # multifdsink
        if self._get_stats_supported:
            self._feeder_probe_cl = reactor.callLater(
                self.FEEDER_STATS_UPDATE_FREQUENCY,
                self._feeder_probe_calllater)
        else:
            self.warning("Feeder statistics unavailable, your "
                         "gst-plugins-base is too old")
            m = messages.Warning(T_(N_(
                "Your gst-plugins-base is too old, so "
                "feeder statistics will be unavailable.")),
                mid='multifdsink')
            m.add(T_(N_(
                "Please upgrade '%s' to version %s."), 'gst-plugins-base',
                '0.10.11'))
            self.addMessage(m)

        for eater in self.eaters.values():
            self.install_eater_event_probes(eater)
            pad = self.get_element(eater.elementName).get_pad('src')
            self._pad_monitors.attach(pad, eater.elementName,
                                      padmonitor.EaterPadMonitor,
                                      self.reconnectEater,
                                      eater.eaterAlias)
            eater.setPadMonitor(self._pad_monitors[eater.elementName])

    def stop_pipeline(self):
        if not self.pipeline:
            return

        if self.clock_provider:
            self.clock_provider.set_property('active', False)
            self.clock_provider = None
        retval = self.pipeline.set_state(gst.STATE_NULL)
        if retval != gst.STATE_CHANGE_SUCCESS:
            self.warning('Setting pipeline to NULL failed')

    def cleanup(self):
        self.debug("cleaning up")

        assert self.pipeline is not None

        self.stop_pipeline()
        # disconnect signals
        map(self.pipeline.disconnect, self.pipeline_signals)
        self.pipeline_signals = []
        if self.bus_signal_id:
            self.pipeline.get_bus().disconnect(self.bus_signal_id)
            self.pipeline.get_bus().remove_signal_watch()
            self.bus_signal_id = None
        self.pipeline = None

        if self._feeder_probe_cl:
            self._feeder_probe_cl.cancel()
            self._feeder_probe_cl = None

        # clean up checkEater callLaters
        for eater in self.eaters.values():
            self._pad_monitors.remove(eater.elementName)
            eater.setPadMonitor(None)

    def do_stop(self):
        self.debug('Stopping')
        if self.pipeline:
            self.cleanup()
        self.debug('Stopped')
        return defer.succeed(None)

    def set_master_clock(self, ip, port, base_time):
        self.debug("Master clock set to %s:%d with base_time %s", ip, port,
                   gst.TIME_ARGS(base_time))

        assert self._clock_slaved
        if self._master_clock_info == (ip, port, base_time):
            self.debug("Same master clock info, returning directly")
            return defer.succeed(None)
        elif self._master_clock_info:
            self.stop_pipeline()

        self._master_clock_info = ip, port, base_time

        clock = gst.NetClientClock(None, ip, port, base_time)
        # disable the pipeline's management of base_time -- we're going
        # to set it ourselves
        self.pipeline.set_new_stream_time(gst.CLOCK_TIME_NONE)
        self.pipeline.set_base_time(base_time)
        self.pipeline.use_clock(clock)

        self.try_start_pipeline()

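    # Note (illustrative, not from the original source): gst.NetClientClock
    # slaves the local clock to the master's network time service, and
    # setting base_time by hand keeps running time aligned across all
    # components slaved to the same master.
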
    def get_master_clock(self):
        """
        Return the connection details for the network clock provided by
        this component, if any.
        """
        if self.clock_provider:
            ip, port, base_time = self._master_clock_info
            return ip, port, base_time
        else:
            return None

    def provide_master_clock(self, port):
        """
        Tell the component to provide a master clock on the given port.

        @returns: a deferred firing an (ip, port, base_time) triple.
        """

        def pipelinePaused(r):
            clock = self.pipeline.get_clock()
            # make sure the pipeline sticks with this clock
            self.pipeline.use_clock(clock)

            self.clock_provider = gst.NetTimeProvider(clock, None, port)
            realport = self.clock_provider.get_property('port')

            base_time = self.pipeline.get_base_time()

            self.debug('provided master clock from %r, base time %s',
                       clock, gst.TIME_ARGS(base_time))

            if self.medium:
                # FIXME: This isn't always correct. We need a more
                # flexible API, and a proper network map, to do this.
                # Even then, it's not always going to be possible.
                ip = self.medium.getIP()
            else:
                ip = "127.0.0.1"

            self._master_clock_info = (ip, realport, base_time)
            return self.get_master_clock()

        assert self.pipeline
        assert not self._clock_slaved
        (ret, state, pending) = self.pipeline.get_state(0)
        if state != gst.STATE_PAUSED and state != gst.STATE_PLAYING:
            self.debug("pipeline still spinning up: %r", state)
            d = self._change_monitor.add(gst.STATE_CHANGE_READY_TO_PAUSED)
            d.addCallback(pipelinePaused)
            return d
        elif self.clock_provider:
            self.debug("returning existing master clock info")
            return defer.succeed(self.get_master_clock())
        else:
            return defer.maybeDeferred(pipelinePaused, None)
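
    # An illustrative sketch (not from the original source) of how a caller
    # is expected to chain a clock master and its slaves together; 'master',
    # 'slaves' and port 7600 are hypothetical:
    #
    #     d = master.provide_master_clock(7600)
    #
    #     def slaveAll(info):
    #         ip, port, base_time = info
    #         for slave in slaves:
    #             slave.set_master_clock(ip, port, base_time)
    #     d.addCallback(slaveAll)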

    ### BaseComponent interface implementation

    def try_start_pipeline(self, force=False):
        """
        Tell the component to start.
        Whatever is using the component is responsible for making sure all
        eaters have received their file descriptor to eat from.
        """
        (ret, state, pending) = self.pipeline.get_state(0)
        if state == gst.STATE_PLAYING:
            self.log('already PLAYING')
            if not force:
                return
            self.debug('pipeline PLAYING, but starting anyway as requested')

        if self._clock_slaved and not self._master_clock_info:
            self.debug("Missing master clock info, deferring set to PLAYING")
            return

        for eater in self.eaters.values():
            if not eater.fd:
                self.debug('eater %s not yet connected, deferring set to '
                           'PLAYING', eater.eaterAlias)
                return

        self.debug("Setting pipeline %r to GST_STATE_PLAYING", self.pipeline)
        self.pipeline.set_state(gst.STATE_PLAYING)

    def _feeder_probe_calllater(self):
        for feedId, feeder in self.feeders.items():
            feederElement = self.get_element(feeder.elementName)
            for client in feeder.getClients():
                # a currently disconnected client will have fd None
                if client.fd is not None:
                    array = feederElement.emit('get-stats', client.fd)
                    if len(array) == 0:
                        # There is an unavoidable race here: we can't know
                        # whether the fd has been removed from multifdsink.
                        # However, if we call get-stats on an fd that
                        # multifdsink doesn't know about, we just get a
                        # 0-length array. We ensure that we don't reuse
                        # the fd too soon, so this can't result in calling
                        # this on a valid but WRONG fd.
                        self.debug('Feeder element for feed %s does not know '
                                   'client fd %d' % (feedId, client.fd))
                    else:
                        client.setStats(array)
        self._feeder_probe_cl = reactor.callLater(
            self.FEEDER_STATS_UPDATE_FREQUENCY,
            self._feeder_probe_calllater)

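    # Note (illustrative, not from the original source): 'get-stats' is an
    # action signal on multifdsink, so the emit() above is a synchronous
    # call that returns the stats array for the given fd rather than
    # broadcasting anything to other handlers.
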
    def unblock_eater(self, eaterAlias):
        """
        After this function returns, the stream lock for this eater must
        have been released. If your component needs to do something here,
        override this method.
        """
        pass

    def get_element(self, element_name):
        """Get an element out of the pipeline.

        If it is possible that the component has not yet been set up,
        the caller needs to check if self.pipeline is actually set.
        """
        assert self.pipeline
        self.log('Looking up element %r in pipeline %r',
                 element_name, self.pipeline)
        element = self.pipeline.get_by_name(element_name)
        if not element:
            self.warning("No element named %r in pipeline", element_name)
        return element

    def get_element_property(self, element_name, property):
        'Gets a property of an element in the GStreamer pipeline.'
        self.debug("%s: getting property %s of element %s" % (
            self.getName(), property, element_name))
        element = self.get_element(element_name)
        if not element:
            msg = "Element '%s' does not exist" % element_name
            self.warning(msg)
            raise errors.PropertyError(msg)

        self.debug('getting property %s on element %s' % (
            property, element_name))
        try:
            value = element.get_property(property)
        except (ValueError, TypeError):
            msg = "Property '%s' on element '%s' does not exist" % (
                property, element_name)
            self.warning(msg)
            raise errors.PropertyError(msg)

        # param enums and enums need to be returned by integer value
        if isinstance(value, gobject.GEnum):
            value = int(value)

        return value
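
    # Illustrative usage (not from the original source; the element and
    # property names are hypothetical):
    #
    #     queued = comp.get_element_property('feeder:default',
    #                                        'buffers-queued')
    #
    # Enum-typed properties come back as plain ints, so remote callers
    # never need GObject enum types to unmarshal the value.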

    def set_element_property(self, element_name, property, value):
        'Sets a property on an element in the GStreamer pipeline.'
        self.debug("%s: setting property %s of element %s to %s" % (
            self.getName(), property, element_name, value))
        element = self.get_element(element_name)
        if not element:
            msg = "Element '%s' does not exist" % element_name
            self.warning(msg)
            raise errors.PropertyError(msg)

        self.debug('setting property %s on element %r to %s' %
                   (property, element_name, value))
        pygobject.gobject_set_property(element, property, value)

    ### methods to connect component eaters and feeders

    def reconnectEater(self, eaterAlias):
        if not self.medium:
            self.debug("Can't reconnect eater %s, running "
                       "without a medium", eaterAlias)
            return

        self.eaters[eaterAlias].disconnected()
        self.medium.connectEater(eaterAlias)

    def feedToFD(self, feedName, fd, cleanup, eaterId=None):
        """
        @param feedName: name of the feed to feed to the given fd
        @type  feedName: str
        @param fd: the file descriptor to feed to
        @type  fd: int
        @param cleanup: the function to call when the fd is no longer
                        feeding
        @type  cleanup: callable
        """
        self.debug('FeedToFD(%s, %d)', feedName, fd)

        # We must have a pipeline in READY or above to do this. Do a
        # non-blocking (zero timeout) get_state.
        if (not self.pipeline or
            self.pipeline.get_state(0)[1] == gst.STATE_NULL):
            self.warning('told to feed %s to fd %d, but pipeline not '
                         'running yet', feedName, fd)
            cleanup(fd)
            # can happen if we are restarting but the other component is
            # happy; assume the other side will reconnect later
            return

        if feedName not in self.feeders:
            msg = "Cannot find feeder named '%s'" % feedName
            mid = "feedToFD-%s" % feedName
            m = messages.Warning(T_(N_("Internal Flumotion error.")),
                                 debug=msg, mid=mid, priority=40)
            self.state.append('messages', m)
            self.warning(msg)
            cleanup(fd)
            return False

        feeder = self.feeders[feedName]
        element = self.get_element(feeder.elementName)
        assert element
        clientId = eaterId or ('client-%d' % fd)
        element.emit('add', fd)
        feeder.clientConnected(clientId, fd, cleanup)
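
    # An illustrative sketch (not from the original source) wiring a feeder
    # into an eater over a local socket pair; 'producer' and 'consumer' are
    # hypothetical FeedComponent instances:
    #
    #     import socket
    #     ours, theirs = socket.socketpair()
    #     # hand over duplicated fds so each component owns its copy
    #     producer.feedToFD('default', os.dup(ours.fileno()), os.close)
    #     consumer.eatFromFD('default', 'producer:default',
    #                        os.dup(theirs.fileno()))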

    def eatFromFD(self, eaterAlias, feedId, fd):
        """
        Tell the component to eat the given feedId from the given fd.
        The component takes over ownership of the fd, closing it when
        no longer eating.

        @param eaterAlias: the alias of the eater
        @type  eaterAlias: str
        @param feedId: feed id (componentName:feedName) to eat from through
                       the given fd
        @type  feedId: str
        @param fd: the file descriptor to eat from
        @type  fd: int
        """
        self.debug('EatFromFD(%s, %s, %d)', eaterAlias, feedId, fd)

        if not self.pipeline:
            self.warning('told to eat %s from fd %d, but pipeline not '
                         'running yet', feedId, fd)
            # can happen if we are restarting but the other component is
            # happy; assume the other side will reconnect later
            os.close(fd)
            return

        if eaterAlias not in self.eaters:
            self.warning('Unknown eater alias: %s', eaterAlias)
            os.close(fd)
            return

        eater = self.eaters[eaterAlias]
        element = self.get_element(eater.elementName)
        if not element:
            self.warning('Eater element %s not found', eater.elementName)
            os.close(fd)
            return

        # fdsrc only switches to the new fd in READY or below
        (result, current, pending) = element.get_state(0L)
        pipeline_playing = current not in [gst.STATE_NULL, gst.STATE_READY]
        if pipeline_playing:
            self.debug('eater %s in state %r, kidnapping it',
                       eaterAlias, current)

            # We unlink fdsrc from its peer and take it out of the pipeline
            # so we can set it to READY without having it send EOS, then
            # switch fd and put it back in.
            # To do this safely, we first block fdsrc:src, then let the
            # component do any necessary unlocking (needed for multi-input
            # elements).
            srcpad = element.get_pad('src')

            def _block_cb(pad, blocked):
                pass
            srcpad.set_blocked_async(True, _block_cb)
            self.unblock_eater(eaterAlias)

            # now we can switch the fd
            sinkpad = srcpad.get_peer()
            srcpad.unlink(sinkpad)
            parent = element.get_parent()
            parent.remove(element)
            self.log("setting to ready")
            element.set_state(gst.STATE_READY)
            self.log("setting to ready complete")
            old = element.get_property('fd')
            self.log("Closing old fd %d", old)
            os.close(old)
            element.set_property('fd', fd)
            parent.add(element)
            srcpad.link(sinkpad)
            element.set_state(gst.STATE_PLAYING)
            # we're done; unblock the pad
            srcpad.set_blocked_async(False, _block_cb)
        else:
            element.set_property('fd', fd)

        # update our eater uiState, saying that we are eating from a
        # possibly new feedId
        eater.connected(fd, feedId)

        if not pipeline_playing:
            self.try_start_pipeline()