1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 worker-side objects to handle worker clients
24 """
25
26 import signal
27
28 from twisted.internet import reactor, error
29 from twisted.spread import flavors
30 from zope.interface import implements
31
32 from flumotion.common import errors, interfaces, debug
33 from flumotion.common import medium
34 from flumotion.common.vfs import listDirectory, registerVFSJelly
35 from flumotion.twisted.pb import ReconnectingFPBClientFactory
36
37 __version__ = "$Rev: 7162 $"
38 JOB_SHUTDOWN_TIMEOUT = 5
39
40
42 """
43 I am a client factory for the worker to log in to the manager.
44 """
45 logCategory = 'worker'
46 perspectiveInterface = interfaces.IWorkerMedium
47
61
72
73
74
76
77
78
def remoteDisconnected(remoteReference):
    # Disconnect notification for the manager reference; the
    # remoteReference argument itself is not used.
    # A loss while the reactor is flagged as killed is only logged;
    # any other loss is reported as a warning.
    if not reactor.killed:
        self.warning('Lost connection to manager, '
                     'will attempt to reconnect')
        return
    self.log('Connection to manager lost due to shutdown')
85
def loginCallback(reference):
    # Success path of the login deferred: report the login, hand the
    # remote reference to the medium, then ask the reference to call
    # remoteDisconnected when the connection goes away.
    self.info("Logged in to manager")
    self.debug("remote reference %r" % reference)

    workerMedium = self.medium
    workerMedium.setRemoteReference(reference)
    reference.notifyOnDisconnect(remoteDisconnected)
92
def alreadyConnectedErrback(failure):
    # Only AlreadyConnectedError is handled here; trap() passes every
    # other failure on down the errback chain.
    failure.trap(errors.AlreadyConnectedError)
    value = failure.value
    self.warning('A worker with the name "%s" is already connected.' %
                 value)
97
def accessDeniedErrback(failure):
    # Only NotAuthenticatedError is handled here; trap() passes every
    # other failure on down the errback chain.  Nothing is returned, so
    # a handled failure is consumed after the warning.
    failure.trap(errors.NotAuthenticatedError)
    self.warning('Access denied.')
101
def connectionRefusedErrback(failure):
    # Only ConnectionRefusedError is handled here; trap() passes every
    # other failure on down the errback chain.
    failure.trap(error.ConnectionRefusedError)
    host, port = self._managerHost, self._managerPort
    self.warning('Connection to %s:%d refused.' % (host, port))
106
def NoSuchMethodErrback(failure):
    # Only NoSuchMethod is handled here; trap() passes every other
    # failure on down the errback chain.
    failure.trap(flavors.NoSuchMethod)

    # Normalise the failure value to text before searching it.  Calling
    # .find() directly on failure.value only works when the value is
    # already a string; with an exception instance it would raise
    # AttributeError inside this errback.
    # NOTE(review): presumably a failure copied over PB carries a string
    # value here -- confirm against the Twisted version in use.
    if 'remote_getKeycardClasses' in str(failure.value):
        self.warning(
            "Manager %s:%d is older than version 0.3.0. "
            "Please upgrade." % (self._managerHost, self._managerPort))
        # Handled: returning None consumes the failure.
        return

    # Any other missing method is not ours to explain; pass it on.
    return failure
117
def loginFailedErrback(failure):
    # Catch-all: there is no trap() call, so any failure reaching this
    # errback is logged.  Nothing is returned, so the failure is
    # consumed here.
    reason = str(failure)
    self.warning('Login failed, reason: %s' % reason)
120
# Success handler first, then the errbacks in their original order.
# loginFailedErrback traps nothing, so it must stay last to act as the
# catch-all for failures the specific errbacks passed on.
d.addCallback(loginCallback)
for errback in (accessDeniedErrback,
                connectionRefusedErrback,
                alreadyConnectedErrback,
                NoSuchMethodErrback,
                loginFailedErrback):
    d.addErrback(errback)
127
128
130 """
131 I am a medium interfacing with the manager-side WorkerAvatar.
132
133 @ivar brain: the worker brain
134 @type brain: L{worker.WorkerBrain}
135 @ivar factory: the worker client factory
136 @type factory: L{WorkerClientFactory}
137 """
138
139 logCategory = 'workermedium'
140
141 implements(interfaces.IWorkerMedium)
142
144 """
145 @type brain: L{worker.WorkerBrain}
146 """
147 self.brain = brain
148 self.factory = None
149 registerVFSJelly()
150
165
170
171
172
174 """
175 Gets the set of TCP ports that this worker is configured to use.
176
177 @rtype: 2-tuple: (list of int, bool)
178 @return: list of ports, and a boolean indicating whether ports are
179 allocated randomly
180 """
181 return self.brain.getPorts()
182
184 """
185 Return the TCP port the Feed Server is listening on.
186
187 @rtype: int, or NoneType
188 @return: TCP port number, or None if there is no feed server
189 """
190 return self.brain.getFeedServerPort()
191
def remote_create(self, avatarId, type, moduleName, methodName,
                  nice, conf):
    """
    Create a component of the given type at the given nice level.

    The request is handed unchanged to the worker brain's create()
    and its result is returned as-is.  According to the original
    documentation of this method, a new job process is spawned to run
    the component in.

    @param avatarId:   avatar identification string
    @type  avatarId:   str
    @param type:       type of the component to create
    @type  type:       str
    @param moduleName: name of the module to create the component from
    @type  moduleName: str
    @param methodName: the factory method to use to create the component
    @type  methodName: str
    @param nice:       nice level
    @type  nice:       int
    @param conf:       component config
    @type  conf:       dict

    @returns: a deferred fired when the process has started and created
              the component
    """
    brain = self.brain
    return brain.create(avatarId, type, moduleName, methodName, nice, conf)
216
218 """
219 Checks if one or more GStreamer elements are present and can be
220 instantiated.
221
222 @param elementNames: names of the GStreamer elements
223 @type elementNames: list of str
224
225 @rtype: list of str
226 @returns: a list of instantiatable element names
227 """
228 return self.brain.runCheck('flumotion.worker.checks.check',
229 'checkElements', elementNames)
230
232 """
233 Checks if the given module can be imported.
234
235 @param moduleName: name of the module to check
236 @type moduleName: str
237
238 @returns: None or Failure
239 """
240 return self.brain.runCheck(
241 'flumotion.worker.checks.check', 'checkImport',
242 moduleName)
243
245 """
246 Runs the given function in the given module with the given arguments.
247
248 @param module: module the function lives in
249 @type module: str
250 @param function: function to run
251 @type function: str
252
253 @returns: the return value of the given function in the module.
254 """
255 return self.brain.runCheck(module, function, *args, **kwargs)
256 remote_runFunction = remote_runCheck
257
259 """
260 I return a list of the componentAvatarIds I have. I am called by the
261 manager soon after I attach to it. This is needed on reconnects
262 so that the manager knows what components it needs to start on me.
263
264 @returns: a list of componentAvatarIds
265 """
266 return self.brain.getComponents()
267
269 """Kill one of the worker's jobs.
270
271 This method is intended for exceptional purposes only; a normal
272 component shutdown is performed by the manager via calling
273 remote_stop() on the component avatar.
274
275 Raises L{flumotion.common.errors.UnknownComponentError} if the
276 job is unknown.
277
278 @param avatarId: the avatar Id of the component, e.g.
279 '/default/audio-encoder'
280 @type avatarId: string
281 @param signum: Signal to send, optional. Defaults to SIGKILL.
282 @type signum: int
283 """
284 self.brain.killJob(avatarId, signum)
285
288
290 """List the directory named by directoryName.
291 @returns: the directory
292 @rtype: deferred that will fire an object implementing L{IDirectory}
293 """
294 return listDirectory(directoryName)
295