Package flumotion :: Package common :: Module medium
[hide private]

Source Code for Module flumotion.common.medium

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """base classes for PB client-side mediums. 
 23  """ 
 24   
 25  import time 
 26   
 27  from twisted.spread import pb 
 28  from twisted.internet import defer, reactor 
 29  from zope.interface import implements 
 30   
 31  from flumotion.common import log, interfaces, bundleclient, errors 
 32  from flumotion.common import messages 
 33  from flumotion.common.netutils import addressGetHost 
 34  from flumotion.configure import configure 
 35  from flumotion.twisted import pb as fpb 
 36   
 37  __version__ = "$Rev: 7371 $" 
 38   
 39   
40 -class BaseMedium(fpb.Referenceable):
41 """ 42 I am a base interface for PB clients interfacing with PB server-side 43 avatars. 44 Used by admin/worker/component to talk to manager's vishnu, 45 and by job to talk to worker's brain. 46 47 @ivar remote: a remote reference to the server-side object on 48 which perspective_(methodName) methods can be called 49 @type remote: L{twisted.spread.pb.RemoteReference} 50 @type bundleLoader: L{flumotion.common.bundleclient.BundleLoader} 51 """ 52 53 # subclasses will need to set this to the specific medium type 54 # tho... 55 implements(interfaces.IMedium) 56 logCategory = "basemedium" 57 remoteLogName = "baseavatar" 58 59 remote = None 60 bundleLoader = None 61
62 - def setRemoteReference(self, remoteReference):
63 """ 64 Set the given remoteReference as the reference to the server-side 65 avatar. 66 67 @param remoteReference: L{twisted.spread.pb.RemoteReference} 68 """ 69 self.debug('%r.setRemoteReference: %r' % (self, remoteReference)) 70 self.remote = remoteReference 71 72 def nullRemote(x): 73 self.debug('%r: disconnected from %r' % (self, self.remote)) 74 self.remote = None
75 self.remote.notifyOnDisconnect(nullRemote) 76 77 self.bundleLoader = bundleclient.BundleLoader(self.callRemote) 78 79 # figure out connection addresses if it's an internet address 80 tarzan = None 81 jane = None 82 try: 83 transport = remoteReference.broker.transport 84 tarzan = transport.getHost() 85 jane = transport.getPeer() 86 except Exception, e: 87 self.debug("could not get connection info, reason %r" % e) 88 if tarzan and jane: 89 self.debug("connection is from me on %s to remote on %s" % ( 90 addressGetHost(tarzan), 91 addressGetHost(jane)))
92
93 - def hasRemoteReference(self):
94 """ 95 Does the medium have a remote reference to a server-side avatar ? 96 """ 97 return self.remote != None
98
99 - def callRemoteLogging(self, level, stackDepth, name, *args, **kwargs):
100 """ 101 Call the given method with the given arguments remotely on the 102 server-side avatar. 103 104 Gets serialized to server-side perspective_ methods. 105 106 @param level: the level we should log at (log.DEBUG, log.INFO, etc) 107 @type level: int 108 @param stackDepth: the number of stack frames to go back to get 109 file and line information, negative or zero. 110 @type stackDepth: non-positive int 111 @param name: name of the remote method 112 @type name: str 113 """ 114 if level is not None: 115 debugClass = str(self.__class__).split(".")[-1].upper() 116 startArgs = [self.remoteLogName, debugClass, name] 117 format, debugArgs = log.getFormatArgs( 118 '%s --> %s: callRemote(%s, ', startArgs, 119 ')', (), args, kwargs) 120 logKwArgs = self.doLog(level, stackDepth - 1, 121 format, *debugArgs) 122 123 if not self.remote: 124 self.warning('Tried to callRemote(%s), but we are disconnected' 125 % name) 126 return defer.fail(errors.NotConnectedError()) 127 128 def callback(result): 129 format, debugArgs = log.getFormatArgs( 130 '%s <-- %s: callRemote(%s, ', startArgs, 131 '): %s', (log.ellipsize(result), ), args, kwargs) 132 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 133 return result
134 135 def errback(failure): 136 format, debugArgs = log.getFormatArgs( 137 '%s <-- %s: callRemote(%s, ', startArgs, 138 '): %r', (failure, ), args, kwargs) 139 self.doLog(level, -1, format, *debugArgs, **logKwArgs) 140 return failure 141 142 d = self.remote.callRemote(name, *args, **kwargs) 143 if level is not None: 144 d.addCallbacks(callback, errback) 145 return d 146
147 - def callRemote(self, name, *args, **kwargs):
148 """ 149 Call the given method with the given arguments remotely on the 150 server-side avatar. 151 152 Gets serialized to server-side perspective_ methods. 153 """ 154 return self.callRemoteLogging(log.DEBUG, -1, name, *args, 155 **kwargs)
156
157 - def getBundledFunction(self, module, function):
158 """ 159 Returns the given function in the given module, loading the 160 module from a bundle. 161 162 If we can't find the bundle for the given module, or if the 163 given module does not contain the requested function, we will 164 raise L{flumotion.common.errors.RemoteRunError} (perhaps a 165 poorly chosen error). If importing the module raises an 166 exception, that exception will be passed through unmodified. 167 168 @param module: module the function lives in 169 @type module: str 170 @param function: function to run 171 @type function: str 172 173 @returns: a callable, the given function in the given module. 174 """ 175 176 def gotModule(mod): 177 if hasattr(mod, function): 178 return getattr(mod, function) 179 else: 180 msg = 'No procedure named %s in module %s' % (function, 181 module) 182 self.warning('%s', msg) 183 raise errors.RemoteRunError(msg)
184 185 def gotModuleError(failure): 186 failure.trap(errors.NoBundleError) 187 msg = 'Failed to find bundle for module %s' % module 188 self.warning('%s', msg) 189 raise errors.RemoteRunError(msg) 190 191 d = self.bundleLoader.loadModule(module) 192 d.addCallbacks(gotModule, gotModuleError) 193 return d 194
195 - def runBundledFunction(self, module, function, *args, **kwargs):
196 """ 197 Runs the given function in the given module with the given 198 arguments. 199 200 This method calls getBundledFunction and then invokes the 201 function. Any error raised by getBundledFunction or by invoking 202 the function will be passed through unmodified. 203 204 Callers that expect to return their result over a PB connection 205 should catch nonserializable exceptions so as to prevent nasty 206 backtraces in the logs. 207 208 @param module: module the function lives in 209 @type module: str 210 @param function: function to run 211 @type function: str 212 213 @returns: the return value of the given function in the module. 214 """ 215 self.debug('runBundledFunction(%r, %r)', module, function) 216 217 def gotFunction(proc): 218 219 def invocationError(failure): 220 self.warning('Exception raised while calling ' 221 '%s.%s(*args=%r, **kwargs=%r): %s', 222 module, function, args, kwargs, 223 log.getFailureMessage(failure)) 224 return failure
225 226 self.debug('calling %s.%s(%r, %r)', module, function, args, 227 kwargs) 228 d = defer.maybeDeferred(proc, *args, **kwargs) 229 d.addErrback(invocationError) 230 return d 231 232 d = self.getBundledFunction(module, function) 233 d.addCallback(gotFunction) 234 return d 235 236
237 -class PingingMedium(BaseMedium):
238 _pingInterval = configure.heartbeatInterval 239 _pingCheckInterval = (configure.heartbeatInterval * 240 configure.pingTimeoutMultiplier) 241 _pingDC = None 242
243 - def startPinging(self, disconnect):
244 """ 245 @param disconnect: a method to call when we do not get ping replies 246 @type disconnect: callable 247 """ 248 self.debug('startPinging') 249 self._lastPingback = time.time() 250 if self._pingDC: 251 self.debug("Cannot start pinging, already pinging") 252 return 253 self._pingDisconnect = disconnect 254 self._ping() 255 self._pingCheck()
256
257 - def _ping(self):
258 259 def pingback(result): 260 self._lastPingback = time.time() 261 self.log('pinged, pingback at %r' % self._lastPingback)
262 263 def pingFailed(failure): 264 # ignoring the connection failures so they don't end up in 265 # the logs - we'll notice the lack of pingback eventually 266 failure.trap(pb.PBConnectionLost) 267 self.log('ping failed: %s' % log.getFailureMessage(failure))
268 269 if self.remote: 270 self.log('pinging') 271 d = self.callRemoteLogging(log.LOG, 0, 'ping') 272 d.addCallbacks(pingback, pingFailed) 273 else: 274 self.info('tried to ping, but disconnected yo') 275 276 self._pingDC = reactor.callLater(self._pingInterval, 277 self._ping) 278
279 - def _pingCheck(self):
280 self._pingCheckDC = None 281 if (self.remote and 282 (time.time() - self._lastPingback > self._pingCheckInterval)): 283 self.info('no pingback in %f seconds, closing connection', 284 self._pingCheckInterval) 285 self._pingDisconnect() 286 else: 287 self._pingCheckDC = reactor.callLater(self._pingCheckInterval, 288 self._pingCheck)
289
290 - def stopPinging(self):
291 if self._pingCheckDC: 292 self._pingCheckDC.cancel() 293 self._pingCheckDC = None 294 295 if self._pingDC: 296 self._pingDC.cancel() 297 self._pingDC = None
298
299 - def _disconnect(self):
300 if self.remote: 301 self.remote.broker.transport.loseConnection()
302
303 - def setRemoteReference(self, remote):
304 BaseMedium.setRemoteReference(self, remote) 305 306 def stopPingingCb(x): 307 self.debug('stop pinging') 308 self.stopPinging()
309 self.remote.notifyOnDisconnect(stopPingingCb) 310 311 self.startPinging(self._disconnect) 312
313 - def remote_writeFluDebugMarker(self, level, marker):
314 """ 315 Sets a marker that will be prefixed to the log strings. Setting this 316 marker to multiple elements at a time helps debugging. 317 @param marker: A string to prefix all the log strings. 318 @type marker: str 319 """ 320 self.writeMarker(marker, level)
321