1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import os
23 import string
24 import time
25
26 from twisted.web import server, http
27 from twisted.web.resource import Resource
28 from twisted.internet import defer, reactor, error
29 from twisted.cred import credentials
30 from zope.interface import implements
31
32 from flumotion.common import log, messages, errors, netutils, interfaces
33 from flumotion.common.i18n import N_, gettexter
34 from flumotion.component import component
35 from flumotion.component.base import http as httpbase
36 from flumotion.component.component import moods
37 from flumotion.component.misc.httpserver import httpfile, localprovider
38 from flumotion.component.misc.httpserver import serverstats
39 from flumotion.component.misc.porter import porterclient
40 from flumotion.twisted import fdserver
41
42 __version__ = "$Rev: 8058 $"
43 T_ = gettexter()
44
45 UPTIME_UPDATE_INTERVAL = 5
46
47
49
51 server.Request.__init__(self, channel, queued)
52 now = time.time()
53 self.lastTimeWritten = now
54
55
56
57 self.fd = self.transport.fileno()
58
59 self._component = channel.factory.component
60 self._transfer = None
61 self._provider = None
62 self._startTime = now
63 self._completionTime = None
64 self._rangeFirstByte = None
65 self._rangeLastByte = None
66 self._resourceSize = None
67 self._bytesWritten = 0L
68
69
70 self.stats = serverstats.RequestStatistics(self._component.stats)
71
72 self._component.requestStarted(self)
73
75 self._rangeFirstByte = first
76 self._rangeLastByte = last
77 self._resourceSize = size
78
86
95
101
103 if self._completionTime is None:
104 self._completionTime = time.time()
105
106 self.stats.onCompleted(self._resourceSize)
107 duration = self._completionTime - self._startTime
108 self._component.requestFinished(self, self.stats.bytesSent,
109 duration, fd)
110
def getLogFields(self):
    """
    Collect the per-request values handed to the request loggers.

    The returned dict holds the client IP, the request method, URI and
    GET parameters, the client protocol, the response code, the bytes
    sent, the referer and user-agent headers, the connection duration,
    the resource size and the response range.  When a provider is attached
    to the request, its own log fields are used as the base and the
    request fields take precedence over any key they share.

    @rtype: dict
    """
    headers = self.getAllHeaders()
    # A request that has not completed yet is measured up to "now".
    endTime = self._completionTime or time.time()
    fields = {
        'ip': self.getClientIP(),
        'method': self.method,
        'uri': self.uri,
        'get-parameters': self.args,
        'clientproto': self.clientproto,
        'response': self.code,
        'bytes-sent': self._bytesWritten,
        'referer': headers.get('referer', None),
        'user-agent': headers.get('user-agent', None),
        'time-connected': endTime - self._startTime,
        'resource-size': self._resourceSize,
        'range-first': self._rangeFirstByte,
        'range-last': self._rangeLastByte,
    }
    if not self._provider:
        return fields

    # Provider fields form the base; request fields override on clashes.
    merged = self._provider.getLogFields()
    merged.update(fields)
    return merged
133
134
135 -class Site(server.Site):
142
143
145 """
146 I wrap a statistics ui state entry, to allow updates.
147 """
148
150 self._state = state
151 self._key = key
152
def update(self, name, value):
    """
    Store a statistic in the wrapped ui state dict entry.

    The state is only written when the value differs from the one already
    stored under C{name}; an unchanged value results in no write.

    @param name:  name of the statistic inside the wrapped dict key
    @param value: new value of the statistic
    """
    current = self._state.get(self._key).get(name, None)
    if value != current:
        self._state.setitem(self._key, name, value)
156
157
159
165
167 """
168 @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None.
169 """
170 return self.callRemote('authenticate', bouncerName, keycard)
171
def keepAlive(self, bouncerName, issuerName, ttl):
    """
    Forward a keep-alive request through C{callRemote}.

    @param bouncerName: passed on unchanged as the first remote argument
    @param issuerName:  passed on unchanged as the second remote argument
    @param ttl:         passed on unchanged as the third remote argument
    @rtype: L{twisted.internet.defer.Deferred}
    """
    return self.callRemote('keepAlive', bouncerName, issuerName, ttl)
177
179 """
180 @rtype: L{twisted.internet.defer.Deferred}
181 """
182 return self.callRemote('removeKeycardId', bouncerName, keycardId)
183
186
189
192
195
198
201
202
204 implements(interfaces.IStreamingComponent)
205
206 componentMediumClass = HTTPFileMedium
207
208 REQUEST_TIMEOUT = 30
209
210
212 self.mountPoint = None
213 self.type = None
214 self.port = None
215 self.hostname = None
216 self.stats = None
217 self._rateControlPlug = None
218 self._fileProviderPlug = None
219 self._metadataProviderPlug = None
220 self._loggers = []
221 self._requestModifiers = []
222 self._logfilter = None
223 self.httpauth = None
224 self._startTime = time.time()
225 self._uptimeCallId = None
226
227 self._description = 'On-Demand Flumotion Stream'
228
229 self._singleFile = False
230 self._connected_clients = {}
231 self._total_bytes_written = 0
232
233 self._pbclient = None
234
235 self._twistedPort = None
236 self._timeoutRequestsCallLater = None
237
238 self._pendingDisconnects = {}
239 self._rootResource = None
240
241
242
243 self._mimeToResource = {
244 'video/x-flv': httpfile.FLVFile,
245 'video/mp4': httpfile.MP4File,
246 }
247
248 self.uiState.addKey('stream-url', None)
249 self.uiState.addKey('server-uptime', 0)
250 self.uiState.addKey('file-provider', None)
251 self.uiState.addDictKey('request-statistics')
252 self.uiState.addDictKey('provider-statistics')
253
255 props = self.config['properties']
256 self.fixRenamedProperties(props, [
257 ('issuer', 'issuer-class'),
258 ('porter_socket_path', 'porter-socket-path'),
259 ('porter_username', 'porter-username'),
260 ('porter_password', 'porter-password'),
261 ('mount_point', 'mount-point')])
262
263 if props.get('type', 'master') == 'slave':
264 for k in 'socket-path', 'username', 'password':
265 if not 'porter-' + k in props:
266 msg = 'slave mode, missing required property porter-%s' % k
267 return defer.fail(errors.ConfigError(msg))
268
269 path = props.get('path', None)
270 if path is None:
271 return
272 if os.path.isfile(path):
273 self._singleFile = True
274 elif os.path.isdir(path):
275 self._singleFile = False
276 else:
277 msg = "the file or directory specified in 'path': %s does " \
278 "not exist or is neither a file nor directory" % path
279 return defer.fail(errors.ConfigError(msg))
280
def have_properties(self, props):
    """
    Configure the component from its properties and plugs.

    Sets the description, mount point, hostname, type and port, reads the
    porter details in slave mode, collects the request logger and request
    modifier plugs, sets up HTTP authentication and the optional IP log
    filter, selects the rate control, file provider and metadata provider
    plugs, and finally publishes the stream URL in the ui state.

    @param props: the component properties
    @type  props: dict
    """
    description = props.get('description', None)
    if description:
        self._description = description

    # The stored mount point always carries a leading slash.
    mountPoint = props.get('mount-point', '/')
    if not mountPoint.startswith('/'):
        mountPoint = '/' + mountPoint
    self.mountPoint = mountPoint

    self.hostname = props.get('hostname', None)
    if not self.hostname:
        self.hostname = netutils.guess_public_hostname()

    self.type = props.get('type', 'master')
    self.port = props.get('port', 8801)
    if self.type == 'slave':
        self._porterPath = props['porter-socket-path']
        self._porterUsername = props['porter-username']
        self._porterPassword = props['porter-password']

    socket = 'flumotion.component.plugs.request.RequestLoggerPlug'
    self._loggers = self.plugs.get(socket, [])
    socket = \
        'flumotion.component.plugs.requestmodifier.RequestModifierPlug'
    self._requestModifiers = self.plugs.get(socket, [])

    self.httpauth = httpbase.HTTPAuthentication(self)
    if 'avatarId' in self.config:
        self.httpauth.setRequesterId(self.config['avatarId'])
    if 'bouncer' in props:
        self.httpauth.setBouncerName(props['bouncer'])
    if 'issuer-class' in props:
        self.httpauth.setIssuerClass(props['issuer-class'])
    if 'ip-filter' in props:
        # LogFilter is looked up on the Flumotion http module (imported
        # as httpbase); the bare name `http` in this file is
        # twisted.web.http, which provides no LogFilter.
        logFilter = httpbase.LogFilter()
        for f in props['ip-filter']:
            logFilter.addIPFilter(f)
        self._logfilter = logFilter

    # For each of the following sockets the last listed plug is used.
    socket = \
        'flumotion.component.misc.httpserver.ratecontrol.RateControllerPlug'
    plugs = self.plugs.get(socket, [])
    if plugs:
        self._rateControlPlug = plugs[-1]

    socket = \
        'flumotion.component.misc.httpserver.fileprovider.FileProviderPlug'
    plugs = self.plugs.get(socket, [])
    if plugs:
        self._fileProviderPlug = plugs[-1]
    else:
        # No file provider plug: fall back to the local provider plug,
        # built from the 'path' property.
        plugProps = {"properties": {"path": props.get('path', None)}}
        self._fileProviderPlug = localprovider.FileProviderLocalPlug(
            plugProps)

    socket = ('flumotion.component.misc.httpserver'
              '.metadataprovider.MetadataProviderPlug')
    plugs = self.plugs.get(socket, [])
    if plugs:
        self._metadataProviderPlug = plugs[-1]

    self.uiState.set('stream-url', self.getUrl())
349
351 self.have_properties(self.config['properties'])
352
353 root = self._rootResource
354 if root is None:
355 root = self._getDefaultRootResource()
356
357 if root is None:
358 raise errors.WrongStateError(
359 "a resource or path property must be set")
360
361 site = Site(root, self)
362 self._timeoutRequestsCallLater = reactor.callLater(
363 self.REQUEST_TIMEOUT, self._timeoutRequests)
364
365
366 self.stats = serverstats.ServerStatistics()
367 updater = StatisticsUpdater(self.uiState, "request-statistics")
368 self.stats.startUpdates(updater)
369 updater = StatisticsUpdater(self.uiState, "provider-statistics")
370 self._fileProviderPlug.startStatsUpdates(updater)
371 self._updateUptime()
372
373 d = defer.Deferred()
374 if self.type == 'slave':
375
376 if self._singleFile:
377 self._pbclient = porterclient.HTTPPorterClientFactory(
378 site, [self.mountPoint], d)
379 else:
380 self._pbclient = porterclient.HTTPPorterClientFactory(
381 site, [], d,
382 prefixes=[self.mountPoint])
383 creds = credentials.UsernamePassword(self._porterUsername,
384 self._porterPassword)
385 self._pbclient.startLogin(creds, self._pbclient.medium)
386 self.debug("Starting porter login!")
387
388 reactor.connectWith(fdserver.FDConnector, self._porterPath,
389 self._pbclient, 10, checkPID=False)
390 else:
391
392 try:
393 self.debug('Going to listen on port %d' % self.port)
394 iface = ""
395
396
397 self._twistedPort = reactor.listenTCP(self.port,
398 site, interface=iface)
399 self.port = self._twistedPort.getHost().port
400 self.debug('Listening on port %d' % self.port)
401 except error.CannotListenError:
402 t = 'Port %d is not available.' % self.port
403 self.warning(t)
404 m = messages.Error(T_(N_(
405 "Network error: TCP port %d is not available."),
406 self.port))
407 self.addMessage(m)
408 self.setMood(moods.sad)
409 return defer.fail(errors.ComponentSetupHandledError(t))
410
411 d.callback(None)
412
413
414 def setComponentHappy(result):
415 self.httpauth.scheduleKeepAlive()
416 self.setMood(moods.happy)
417 return result
418 d.addCallback(setComponentHappy)
419 return d
420
422 if self.stats:
423 self.stats.stopUpdates()
424 if self._fileProviderPlug:
425 self._fileProviderPlug.stopStatsUpdates()
426 if self.httpauth:
427 self.httpauth.stopKeepAlive()
428 if self._timeoutRequestsCallLater:
429 self._timeoutRequestsCallLater.cancel()
430 self._timeoutRequestsCallLater = None
431 if self._uptimeCallId:
432 self._uptimeCallId.cancel()
433 self._uptimeCallId = None
434 if self._twistedPort:
435 self._twistedPort.stopListening()
436
437 l = [self.remove_all_clients()]
438 if self.type == 'slave' and self._pbclient:
439 if self._singleFile:
440 l.append(self._pbclient.deregisterPath(self.mountPoint))
441 else:
442 l.append(self._pbclient.deregisterPrefix(self.mountPoint))
443 return defer.DeferredList(l)
444
446 """
447 Provide a new set of porter login information, for when we're in slave
448 mode and the porter changes.
449 If we're currently connected, this won't disconnect - it'll just change
450 the information so that next time we try and connect we'll use the
451 new ones
452 @param path: new path
453 @param username: new username
454 @param password: new password
455 """
456 if self.type != 'slave':
457 raise errors.WrongStateError(
458 "Can't specify porter details in master mode")
459
460 self._porterUsername = username
461 self._porterPassword = password
462
463 creds = credentials.UsernamePassword(self._porterUsername,
464 self._porterPassword)
465 self._pbclient.startLogin(creds, self.medium)
466
467 self._updatePath(path)
468
470
def _updatePath(self, path):
    """
    Switch to a new porter socket path and connect through it.

    Nothing happens when the path equals the one currently stored.

    @param path: new porter socket path
    """
    if path == self._porterPath:
        return
    self._porterPath = path

    client = self._pbclient
    # Stop the client factory's current attempts and reset its delay
    # before connecting it to the new path.
    client.stopTrying()
    client.resetDelay()
    reactor.connectWith(fdserver.FDConnector, self._porterPath,
                        client, 10, checkPID=False)
481
503
def _getDefaultRootResource(self):
    """
    Build the root resource from the file provider plug's root path.

    @returns: the root resource, placed under the mount point when that
              is not '/'; None when the file provider has no root path
    """
    rootNode = self._fileProviderPlug.getRootPath()
    if rootNode is None:
        return None

    self.debug('Starting with mount point "%s"' % self.mountPoint)
    factory = httpfile.MimedFileFactory(
        self.httpauth,
        mimeToResource=self._mimeToResource,
        rateController=self._rateControlPlug,
        requestModifiers=self._requestModifiers,
        metadataProvider=self._metadataProviderPlug)

    resource = factory.create(rootNode)
    if self.mountPoint == '/':
        return resource
    return self._createRootResourceForPath(self.mountPoint, resource)
521
def _createRootResourceForPath(self, path, fileResource):
    """
    Build a resource tree that exposes C{fileResource} at C{path}.

    Every path segment but the last becomes an empty intermediate
    L{Resource}; C{fileResource} is attached under the last segment.
    One trailing slash in C{path} is ignored, and the first character of
    C{path} (expected to be the leading slash) is dropped.

    @param path:         path to expose the resource at, e.g. the mount
                         point
    @param fileResource: resource to attach at the end of the path
    @returns: the root of the newly built tree
    """
    if path.endswith('/'):
        path = path[:-1]

    # str.split replaces the deprecated string.split module function.
    segments = path[1:].split('/')
    leaf = segments.pop()

    root = Resource()
    parent = root
    for segment in segments:
        node = Resource()
        self.debug("Putting Resource at %s", segment)
        parent.putChild(segment, node)
        parent = node
    self.debug("Putting resource %r at %r", fileResource, leaf)
    parent.putChild(leaf, fileResource)
    return root
537
539 """
540 Remove a client when requested.
541
542 Used by keycard expiry.
543 """
544 if fd in self._connected_clients:
545 request = self._connected_clients[fd]
546 self.debug("Removing client for fd %d", fd)
547 request.unregisterProducer()
548 request.channel.transport.loseConnection()
549 else:
550 self.debug("No client with fd %d found", fd)
551
def remove_all_clients(self):
    """
    Disconnect every connected client.

    For each client a deferred is stored in C{self._pendingDisconnects}
    under the client's fd before its producer is unregistered and its
    connection is dropped.

    @returns: a L{defer.DeferredList} over the per-client deferreds
    """
    pending = []
    # Walk a snapshot: finishing a request deletes its entry from
    # self._connected_clients, which must not happen under a live
    # iteration of that dict.
    for fd, request in list(self._connected_clients.items()):
        d = defer.Deferred()
        self._pendingDisconnects[fd] = d
        pending.append(d)

        request.unregisterProducer()
        request.channel.transport.loseConnection()

    self.debug("Waiting for %d clients to finish", len(pending))
    return defer.DeferredList(pending)
565
567
def requestStarted(self, request):
    """
    Register a request that just started, keyed by its transport's fd.

    @param request: the request that started
    """
    fd = request.transport.fileno()
    self._connected_clients[fd] = request
    self.debug("[fd %5d] (ts %f) request %r started",
               fd, time.time(), request)
572
574
575
576 self.debug('[fd %5d] (ts %f) finishing request %r',
577 request.transport.fileno(), time.time(), request)
578
579 self.httpauth.cleanupAuth(fd)
580 ip = request.getClientIP()
581 if not self._logfilter or not self._logfilter.isInRange(ip):
582 fields = request.getLogFields()
583 fields.update({'time': time.gmtime(),
584 'username': '-'})
585 l = []
586 for logger in self._loggers:
587 l.append(defer.maybeDeferred(
588 logger.event, 'http_session_completed', fields))
589 d = defer.DeferredList(l)
590 else:
591 d = defer.succeed(None)
592
593 del self._connected_clients[fd]
594
595 self._total_bytes_written += bytesWritten
596
597 def firePendingDisconnect(_):
598 self.debug("Logging completed")
599 if fd in self._pendingDisconnects:
600 pending = self._pendingDisconnects.pop(fd)
601 self.debug("Firing pending disconnect deferred")
602 pending.callback(None)
603
604
605 self.debug('[fd %5d] (ts %f) finished request %r',
606 fd, time.time(), request)
607
608 d.addCallback(firePendingDisconnect)
609
611 return self._description
612
def getUrl(self):
    """
    Build the HTTP URL of this component from its hostname, port and
    mount point.

    @rtype: str
    """
    return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)
615
617 socket = 'flumotion.component.plugs.streamdata.StreamDataProviderPlug'
618 if self.plugs[socket]:
619 plug = self.plugs[socket][-1]
620 return plug.getStreamData()
621 else:
622 return {'protocol': 'HTTP',
623 'description': self._description,
624 'url': self.getUrl()}
625
def getClients(self):
    """
    Return the number of connected clients.

    @rtype: int
    """
    return len(self._connected_clients)
631
def getBytesSent(self):
    """
    Return the number of bytes transferred so far.

    This is a running total, not a rate: the bytes accounted for finished
    requests plus what the transfers of the currently connected clients
    have written.

    @returns: the total number of bytes transferred
    """
    inFlight = sum(request._transfer.bytesWritten
                   for request in self._connected_clients.values()
                   if request._transfer)
    return self._total_bytes_written + inFlight
641
643 """
644 Return a tuple (deltaadded, deltaremoved, bytes_transferred,
645 current_clients, current_load) of our current bandwidth and
646 user values. The deltas and current_load are NOT currently
647 implemented here, we set them as zero.
648 """
649 return (0, 0, self.getBytesSent(), self.getClients(), 0)
650
652 """
653 Close the logfile, then reopen using the previous logfilename
654 """
655 for logger in self._loggers:
656 self.debug('rotating logger %r' % logger)
657 logger.rotate()
658
660 """Attaches a root resource to this component. The root resource is the
661 one which will be used when accessing the mount point.
662 This is normally called from a plug's start() method.
663 @param resource: root resource
664 @type resource: L{twisted.web.resource.Resource}
665 """
666 rootResource = self._createRootResourceForPath(
667 self.getMountPoint(), resource)
668
669 self._rootResource = rootResource
670
def getMountPoint(self):
    """
    Get the mount point of this component.

    The value is read straight from the configured properties, so it is
    C{None} when no 'mount-point' property is set.

    @returns: the configured mount point, or None
    """
    properties = self.config['properties']
    return properties.get('mount-point')
677
683