1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import gst
23 import gobject
24
25 import os
26 import time
27
28 from twisted.internet import reactor, defer
29
30 from flumotion.common import common, errors, pygobject, messages, log
31 from flumotion.common import gstreamer
32 from flumotion.common.i18n import N_, gettexter
33 from flumotion.common.planet import moods
34 from flumotion.component import component as basecomponent
35 from flumotion.component import feed, padmonitor
36 from flumotion.component.feeder import Feeder
37 from flumotion.component.eater import Eater
38
39 __version__ = "$Rev: 7901 $"
40 T_ = gettexter()
41
42
44 """
45 I am a base class for all Flumotion feed components.
46 """
47
48
49 FEEDER_STATS_UPDATE_FREQUENCY = 12.5
50
51 logCategory = 'feedcomponent'
52
53
54
56
57 self.feeders = {}
58 self.eaters = {}
59 self.uiState.addListKey('feeders')
60 self.uiState.addListKey('eaters')
61
62 self.pipeline = None
63 self.pipeline_signals = []
64 self.bus_signal_id = None
65 self.effects = {}
66 self._feeder_probe_cl = None
67
68 self._pad_monitors = padmonitor.PadMonitorSet(
69 lambda: self.setMood(moods.happy),
70 lambda: self.setMood(moods.hungry))
71
72 self._clock_slaved = False
73 self.clock_provider = None
74 self._master_clock_info = None
75
76
77 self._change_monitor = gstreamer.StateChangeMonitor()
78
79
80 self._get_stats_supported = (gstreamer.get_plugin_version('tcp')
81 >= (0, 10, 11, 0))
82
84 """
85 Sets up component.
86
87 Invokes the L{create_pipeline} and L{set_pipeline} vmethods,
88 which subclasses can provide.
89 """
90 config = self.config
91 eater_config = config.get('eater', {})
92 feeder_config = config.get('feed', [])
93 source_config = config.get('source', [])
94
95 self.debug("FeedComponent.do_setup(): eater_config %r", eater_config)
96 self.debug("FeedComponent.do_setup(): feeder_config %r", feeder_config)
97 self.debug("FeedComponent.do_setup(): source_config %r", source_config)
98
99
100
101 if eater_config == {} and source_config != []:
102 eater_config = {'default': [(x, 'default') for x in source_config]}
103
104 for eaterName in eater_config:
105 for feedId, eaterAlias in eater_config[eaterName]:
106 self.eaters[eaterAlias] = Eater(eaterAlias, eaterName)
107 self.uiState.append('eaters', self.eaters[eaterAlias].uiState)
108
109 for feederName in feeder_config:
110 self.feeders[feederName] = Feeder(feederName)
111 self.uiState.append('feeders',
112 self.feeders[feederName].uiState)
113
114 clockMaster = config.get('clock-master', None)
115 if clockMaster:
116 self._clock_slaved = clockMaster != config['avatarId']
117 else:
118 self._clock_slaved = False
119
120 pipeline = self.create_pipeline()
121 self.connect_feeders(pipeline)
122 self.set_pipeline(pipeline)
123
124 self.debug("FeedComponent.do_setup(): setup finished")
125
126 self.try_start_pipeline()
127
128
129 d = self._change_monitor.add(gst.STATE_CHANGE_PAUSED_TO_PLAYING)
130 d.addCallback(lambda x: self.do_pipeline_playing())
131
133
134
135 self.debug("Setup completed")
136
137
138
140 """
141 Subclasses have to implement this method.
142
143 @rtype: L{gst.Pipeline}
144 """
145 raise NotImplementedError(
146 "subclass must implement create_pipeline")
147
157
159 elementName = self.feeders[feederName].payName
160 element = self.pipeline.get_by_name(elementName)
161 if not element:
162 raise errors.ComponentError("No such feeder %s" % feederName)
163
164 pad = element.get_pad('src')
165 self._pad_monitors.attach(pad, elementName)
166
167
168
172
174
175
176
177 def client_fd_removed(sink, fd, feeder):
178
179
180
181
182 self.debug("cleaning up fd %d", fd)
183 feeder.clientDisconnected(fd)
184
185 for feeder in self.feeders.values():
186 element = pipeline.get_by_name(feeder.elementName)
187 if element:
188 element.connect('client-fd-removed', client_fd_removed,
189 feeder)
190 self.debug("Connected to client-fd-removed on %r", feeder)
191 else:
192 self.warning("No feeder %s in pipeline", feeder.elementName)
193
196
198 """
199 Invoked when the pipeline has changed the state to playing.
200 The default implementation sets the component's mood to HAPPY.
201 """
202 self.setMood(moods.happy)
203
205 """Make a flumotion error message to show to the user.
206
207 This method may be overridden by components that have special
208 knowledge about potential errors. If the component does not know
209 about the error, it can chain up to this implementation, which
210 will make a generic message.
211
212 @param gerror: The GError from the error message posted on the
213 GStreamer message bus.
214 @type gerror: L{gst.GError}
215 @param debug: A string with debugging information.
216 @type debug: str
217
218 @returns: A L{flumotion.common.messages.Message} to show to the
219 user.
220 """
221
222 mid = "%s-%s-%d" % (self.name, gerror.domain, gerror.code)
223 m = messages.Error(T_(N_(
224 "Internal GStreamer error.")),
225 debug="%s\n%s: %d\n%s" % (
226 gerror.message, gerror.domain, gerror.code, debug),
227 mid=mid, priority=40)
228 return m
229
236
237 def error():
238 gerror, debug = message.parse_error()
239 self.warning('element %s error %s %s',
240 src.get_path_string(), gerror, debug)
241 self.setMood(moods.sad)
242
243
244 try:
245 m = self.make_message_for_gstreamer_error(gerror, debug)
246 except Exception, e:
247 msg = log.getExceptionMessage(e)
248 m = messages.Error(T_(N_(
249 "Programming error in component.")),
250 debug="Bug in %r.make_message_for_gstreamer_error: %s" % (
251 self.__class__, msg))
252
253 self.state.append('messages', m)
254 self._change_monitor.have_error(self.pipeline.get_state(),
255 message)
256
257 def eos():
258 name = src.get_name()
259 if name in self._pad_monitors:
260 self.info('End of stream in element %s', name)
261 self._pad_monitors[name].setInactive()
262 else:
263 self.info("We got an eos from %s", name)
264
265 def default():
266 self.log('message received: %r', message)
267
268 handlers = {gst.MESSAGE_STATE_CHANGED: state_changed,
269 gst.MESSAGE_ERROR: error,
270 gst.MESSAGE_EOS: eos}
271 t = message.type
272 src = message.src
273 handlers.get(t, default)()
274 return True
275
277 """Watch a set of elements for discontinuity messages.
278
279 @param eaterWatchElements: the set of elements to watch for
280 discontinuities.
281 @type eaterWatchElements: Dict of elementName => Eater.
282 """
283
284 def on_element_message(bus, message):
285 src = message.src
286 name = src.get_name()
287 if name in eaterWatchElements:
288 eater = eaterWatchElements[name]
289 s = message.structure
290
291 def timestampDiscont():
292 prevTs = s["prev-timestamp"]
293 prevDuration = s["prev-duration"]
294 curTs = s["cur-timestamp"]
295 discont = curTs - (prevTs + prevDuration)
296 dSeconds = discont / float(gst.SECOND)
297 self.debug("we have a discont on eater %s of %f s "
298 "between %s and %s ", eater.eaterAlias,
299 dSeconds, gst.TIME_ARGS(prevTs + prevDuration),
300 gst.TIME_ARGS(curTs))
301 eater.timestampDiscont(dSeconds,
302 float(curTs) / float(gst.SECOND))
303
304 def offsetDiscont():
305 prevOffsetEnd = s["prev-offset-end"]
306 curOffset = s["cur-offset"]
307 discont = curOffset - prevOffsetEnd
308 self.debug("we have a discont on eater %s of %d "
309 "units between %d and %d ",
310 eater.eaterAlias, discont, prevOffsetEnd,
311 curOffset)
312 eater.offsetDiscont(discont, curOffset)
313
314 handlers = {'imperfect-timestamp': timestampDiscont,
315 'imperfect-offset': offsetDiscont}
316 if s.get_name() in handlers:
317 handlers[s.get_name()]()
318
319
320 bus = self.pipeline.get_bus()
321
322 bus.connect("message::element", on_element_message)
323
325
326 def fdsrc_event(pad, event):
327
328
329 if event.type == gst.EVENT_EOS:
330 self.info('End of stream for eater %s, disconnect will be '
331 'triggered', eater.eaterAlias)
332
333
334
335 return False
336 return True
337
338 def depay_event(pad, event):
339
340
341
342 if event.type == gst.EVENT_NEWSEGMENT:
343
344
345
346
347
348
349
350 if getattr(eater, '_gotFirstNewSegment', False):
351 self.info("Subsequent new segment event received on "
352 "depay on eater %s", eater.eaterAlias)
353
354 return False
355 else:
356 eater._gotFirstNewSegment = True
357 return True
358
359 self.debug('adding event probe for eater %s', eater.eaterAlias)
360 fdsrc = self.get_element(eater.elementName)
361 fdsrc.get_pad("src").add_event_probe(fdsrc_event)
362 depay = self.get_element(eater.depayName)
363 depay.get_pad("src").add_event_probe(depay_event)
364
366 self.debug('setup_pipeline()')
367 assert self.bus_signal_id == None
368
369 self.pipeline.set_name('pipeline-' + self.getName())
370 bus = self.pipeline.get_bus()
371 bus.add_signal_watch()
372 self.bus_signal_id = bus.connect('message',
373 self.bus_message_received_cb)
374 sig_id = self.pipeline.connect('deep-notify',
375 gstreamer.verbose_deep_notify_cb, self)
376 self.pipeline_signals.append(sig_id)
377
378
379
380 self.pipeline.set_state(gst.STATE_READY)
381
382
383 if self._get_stats_supported:
384 self._feeder_probe_cl = reactor.callLater(
385 self.FEEDER_STATS_UPDATE_FREQUENCY,
386 self._feeder_probe_calllater)
387 else:
388 self.warning("Feeder statistics unavailable, your "
389 "gst-plugins-base is too old")
390 m = messages.Warning(T_(N_(
391 "Your gst-plugins-base is too old, so "
392 "feeder statistics will be unavailable.")),
393 mid='multifdsink')
394 m.add(T_(N_(
395 "Please upgrade '%s' to version %s."), 'gst-plugins-base',
396 '0.10.11'))
397 self.addMessage(m)
398
399 for eater in self.eaters.values():
400 self.install_eater_event_probes(eater)
401 pad = self.get_element(eater.elementName).get_pad('src')
402 self._pad_monitors.attach(pad, eater.elementName,
403 padmonitor.EaterPadMonitor,
404 self.reconnectEater,
405 eater.eaterAlias)
406 eater.setPadMonitor(self._pad_monitors[eater.elementName])
407
409 if not self.pipeline:
410 return
411
412 if self.clock_provider:
413 self.clock_provider.set_property('active', False)
414 self.clock_provider = None
415 retval = self.pipeline.set_state(gst.STATE_NULL)
416 if retval != gst.STATE_CHANGE_SUCCESS:
417 self.warning('Setting pipeline to NULL failed')
418
420 self.debug("cleaning up")
421
422 assert self.pipeline != None
423
424 self.stop_pipeline()
425
426 map(self.pipeline.disconnect, self.pipeline_signals)
427 self.pipeline_signals = []
428 if self.bus_signal_id:
429 self.pipeline.get_bus().disconnect(self.bus_signal_id)
430 self.pipeline.get_bus().remove_signal_watch()
431 self.bus_signal_id = None
432 self.pipeline = None
433
434 if self._feeder_probe_cl:
435 self._feeder_probe_cl.cancel()
436 self._feeder_probe_cl = None
437
438
439 for eater in self.eaters.values():
440 self._pad_monitors.remove(eater.elementName)
441 eater.setPadMonitor(None)
442
449
451 self.debug("Master clock set to %s:%d with base_time %s", ip, port,
452 gst.TIME_ARGS(base_time))
453
454 assert self._clock_slaved
455 if self._master_clock_info == (ip, port, base_time):
456 self.debug("Same master clock info, returning directly")
457 return defer.succeed(None)
458 elif self._master_clock_info:
459 self.stop_pipeline()
460
461 self._master_clock_info = ip, port, base_time
462
463 clock = gst.NetClientClock(None, ip, port, base_time)
464
465
466 self.pipeline.set_new_stream_time(gst.CLOCK_TIME_NONE)
467 self.pipeline.set_base_time(base_time)
468 self.pipeline.use_clock(clock)
469
470 self.try_start_pipeline()
471
473 """
474 Return the connection details for the network clock provided by
475 this component, if any.
476 """
477 if self.clock_provider:
478 ip, port, base_time = self._master_clock_info
479 return ip, port, base_time
480 else:
481 return None
482
484 """
485 Tell the component to provide a master clock on the given port.
486
487 @returns: a deferred firing a (ip, port, base_time) triple.
488 """
489
490 def pipelinePaused(r):
491 clock = self.pipeline.get_clock()
492
493 self.pipeline.use_clock(clock)
494
495 self.clock_provider = gst.NetTimeProvider(clock, None, port)
496 realport = self.clock_provider.get_property('port')
497
498 base_time = self.pipeline.get_base_time()
499
500 self.debug('provided master clock from %r, base time %s',
501 clock, gst.TIME_ARGS(base_time))
502
503 if self.medium:
504
505
506
507 ip = self.medium.getIP()
508 else:
509 ip = "127.0.0.1"
510
511 self._master_clock_info = (ip, realport, base_time)
512 return self.get_master_clock()
513
514 assert self.pipeline
515 assert not self._clock_slaved
516 (ret, state, pending) = self.pipeline.get_state(0)
517 if state != gst.STATE_PAUSED and state != gst.STATE_PLAYING:
518 self.debug("pipeline still spinning up: %r", state)
519 d = self._change_monitor.add(gst.STATE_CHANGE_READY_TO_PAUSED)
520 d.addCallback(pipelinePaused)
521 return d
522 elif self.clock_provider:
523 self.debug("returning existing master clock info")
524 return defer.succeed(self.get_master_clock())
525 else:
526 return defer.maybeDeferred(pipelinePaused, None)
527
528
529
531 """
532 Tell the component to start.
533 Whatever is using the component is responsible for making sure all
534 eaters have received their file descriptor to eat from.
535 """
536 (ret, state, pending) = self.pipeline.get_state(0)
537 if state == gst.STATE_PLAYING:
538 self.log('already PLAYING')
539 if not force:
540 return
541 self.debug('pipeline PLAYING, but starting anyway as requested')
542
543 if self._clock_slaved and not self._master_clock_info:
544 self.debug("Missing master clock info, deferring set to PLAYING")
545 return
546
547 for eater in self.eaters.values():
548 if not eater.fd:
549 self.debug('eater %s not yet connected, deferring set to '
550 'PLAYING', eater.eaterAlias)
551 return
552
553 self.debug("Setting pipeline %r to GST_STATE_PLAYING", self.pipeline)
554 self.pipeline.set_state(gst.STATE_PLAYING)
555
578
580 """
581 After this function returns, the stream lock for this eater must have
582 been released. If your component needs to do something here, override
583 this method.
584 """
585 pass
586
588 """Get an element out of the pipeline.
589
590 If it is possible that the component has not yet been set up,
591 the caller needs to check if self.pipeline is actually set.
592 """
593 assert self.pipeline
594 self.log('Looking up element %r in pipeline %r',
595 element_name, self.pipeline)
596 element = self.pipeline.get_by_name(element_name)
597 if not element:
598 self.warning("No element named %r in pipeline", element_name)
599 return element
600
602 'Gets a property of an element in the GStreamer pipeline.'
603 self.debug("%s: getting property %s of element %s" % (
604 self.getName(), property, element_name))
605 element = self.get_element(element_name)
606 if not element:
607 msg = "Element '%s' does not exist" % element_name
608 self.warning(msg)
609 raise errors.PropertyError(msg)
610
611 self.debug('getting property %s on element %s' % (
612 property, element_name))
613 try:
614 value = element.get_property(property)
615 except (ValueError, TypeError):
616 msg = "Property '%s' on element '%s' does not exist" % (
617 property, element_name)
618 self.warning(msg)
619 raise errors.PropertyError(msg)
620
621
622 if isinstance(value, gobject.GEnum):
623 value = int(value)
624
625 return value
626
628 'Sets a property on an element in the GStreamer pipeline.'
629 self.debug("%s: setting property %s of element %s to %s" % (
630 self.getName(), property, element_name, value))
631 element = self.get_element(element_name)
632 if not element:
633 msg = "Element '%s' does not exist" % element_name
634 self.warning(msg)
635 raise errors.PropertyError(msg)
636
637 self.debug('setting property %s on element %r to %s' %
638 (property, element_name, value))
639 pygobject.gobject_set_property(element, property, value)
640
641
642
644 if not self.medium:
645 self.debug("Can't reconnect eater %s, running "
646 "without a medium", eaterAlias)
647 return
648
649 self.eaters[eaterAlias].disconnected()
650 self.medium.connectEater(eaterAlias)
651
652 - def feedToFD(self, feedName, fd, cleanup, eaterId=None):
653 """
654 @param feedName: name of the feed to feed to the given fd.
655 @type feedName: str
656 @param fd: the file descriptor to feed to
657 @type fd: int
658 @param cleanup: the function to call when the FD is no longer feeding
659 @type cleanup: callable
660 """
661 self.debug('FeedToFD(%s, %d)', feedName, fd)
662
663
664
665 if (not self.pipeline or
666 self.pipeline.get_state(0)[1] == gst.STATE_NULL):
667 self.warning('told to feed %s to fd %d, but pipeline not '
668 'running yet', feedName, fd)
669 cleanup(fd)
670
671
672 return
673
674 if feedName not in self.feeders:
675 msg = "Cannot find feeder named '%s'" % feedName
676 mid = "feedToFD-%s" % feedName
677 m = messages.Warning(T_(N_("Internal Flumotion error.")),
678 debug=msg, mid=mid, priority=40)
679 self.state.append('messages', m)
680 self.warning(msg)
681 cleanup(fd)
682 return False
683
684 feeder = self.feeders[feedName]
685 element = self.get_element(feeder.elementName)
686 assert element
687 clientId = eaterId or ('client-%d' % fd)
688 element.emit('add', fd)
689 feeder.clientConnected(clientId, fd, cleanup)
690
691 - def eatFromFD(self, eaterAlias, feedId, fd):
692 """
693 Tell the component to eat the given feedId from the given fd.
694 The component takes over the ownership of the fd, closing it when
695 no longer eating.
696
697 @param eaterAlias: the alias of the eater
698 @type eaterAlias: str
699 @param feedId: feed id (componentName:feedName) to eat from through
700 the given fd
701 @type feedId: str
702 @param fd: the file descriptor to eat from
703 @type fd: int
704 """
705 self.debug('EatFromFD(%s, %s, %d)', eaterAlias, feedId, fd)
706
707 if not self.pipeline:
708 self.warning('told to eat %s from fd %d, but pipeline not '
709 'running yet', feedId, fd)
710
711
712 os.close(fd)
713 return
714
715 if eaterAlias not in self.eaters:
716 self.warning('Unknown eater alias: %s', eaterAlias)
717 os.close(fd)
718 return
719
720 eater = self.eaters[eaterAlias]
721 element = self.get_element(eater.elementName)
722 if not element:
723 self.warning('Eater element %s not found', eater.elementName)
724 os.close(fd)
725 return
726
727
728 (result, current, pending) = element.get_state(0L)
729 pipeline_playing = current not in [gst.STATE_NULL, gst.STATE_READY]
730 if pipeline_playing:
731 self.debug('eater %s in state %r, kidnapping it',
732 eaterAlias, current)
733
734
735
736
737
738
739
740 srcpad = element.get_pad('src')
741
742 def _block_cb(pad, blocked):
743 pass
744 srcpad.set_blocked_async(True, _block_cb)
745 self.unblock_eater(eaterAlias)
746
747
748 sinkpad = srcpad.get_peer()
749 srcpad.unlink(sinkpad)
750 parent = element.get_parent()
751 parent.remove(element)
752 self.log("setting to ready")
753 element.set_state(gst.STATE_READY)
754 self.log("setting to ready complete!!!")
755 old = element.get_property('fd')
756 self.log("Closing old fd %d", old)
757 os.close(old)
758 element.set_property('fd', fd)
759 parent.add(element)
760 srcpad.link(sinkpad)
761 element.set_state(gst.STATE_PLAYING)
762
763 srcpad.set_blocked_async(False, _block_cb)
764 else:
765 element.set_property('fd', fd)
766
767
768
769 eater.connected(fd, feedId)
770
771 if not pipeline_playing:
772 self.try_start_pipeline()
773