1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 Framework for writing automated integration tests.
24
25 This module provides a way of writing automated integration tests from
26 within Twisted's unit testing framework, trial. Test cases are
27 constructed as subclasses of the normal trial
28 L{twisted.trial.unittest.TestCase} class.
29
30 Integration tests look like normal test methods, except that they are
31 decorated with L{integration.test}, take an extra "plan" argument, and
32 do not return anything. For example::
33
34 from twisted.trial import unittest
35 from flumotion.twisted import integration
36
37 class IntegrationTestExample(unittest.TestCase):
38 @integration.test
39 def testEchoFunctionality(self, plan):
40 process = plan.spawn('echo', 'hello world')
41 plan.wait(process, 0)
42
43 This example will spawn a process, as if you typed "echo 'hello world'"
44 at the shell prompt. It then waits for the process to exit, expecting
45 the exit status to be 0.
46
47 The example illustrates two of the fundamental plan operators, spawn and
48 wait. "spawn" spawns a process. "wait" waits for a process to finish.
49 The other operators are "spawnPar", which spawns a number of processes
50 in parallel, "waitPar", which waits for a number of processes in
51 parallel, and "kill", which kills one or more processes via SIGTERM and
52 then waits for them to exit.
53
54 It is evident that this framework is most appropriate for testing the
55 integration of multiple processes, and is not suitable for in-process
56 tests. The plan that is built up is only executed after the test method
57 exits, via the L{integration.test} decorator; the writer of the
58 integration test does not have access to the plan's state.
59
60 Note that all process exits must be anticipated. If at any point the
61 integration tester receives SIGCHLD, the next operation must be a wait
62 for that process. If this is not the case, the test is interpreted as
63 having failed.
64
65 Also note that while the test is running, the stdout and stderr of each
66 spawned process are redirected into log files in a subdirectory of where
67 the test is located. For example, in the previous example, the following
68 files will be created::
69
70 $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stdout
71 $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stderr
72
73 In the case that multiple echo commands are run in the same plan, the
74 subsequent commands will be named as echo-1, echo-2, and the like. Upon
75 successful completion of the test case, the log directory will be
76 deleted.
77 """
78
79 import os
80 import signal
81
82 from twisted.python import failure
83 from twisted.internet import reactor, protocol, defer
84 from flumotion.common import log as flog
85
86 __version__ = "$Rev: 7162 $"
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109 reactor.wakeUp = lambda: reactor.waker and reactor.waker.wakeUp()
110
111
112 -def log(format, *args):
114
115
116 -def debug(format, *args):
118
119
120 -def info(format, *args):
122
123
126
127
128 -def error(format, *args):
130
131
133 if os.sep in executable:
134 if os.access(os.path.abspath(executable), os.X_OK):
135 return os.path.abspath(executable)
136 elif os.getenv('PATH'):
137 for path in os.getenv('PATH').split(os.pathsep):
138 if os.access(os.path.join(path, executable), os.X_OK):
139 return os.path.join(path, executable)
140 raise CommandNotFoundException(executable)
141
142
144
def __init__(self, process, expectedCode, actualCode):
    """
    Remember which process exited and how its exit code differed.

    @param process:      the process that exited
    @param expectedCode: the exit code that was expected
    @param actualCode:   the exit code that was actually received
    """
    Exception.__init__(self)
    # Stored under shorter attribute names than the parameter names.
    self.process, self.expected, self.actual = (
        process, expectedCode, actualCode)
150
152 return ('Expected exit code %r from %r, but got %r'
153 % (self.expected, self.process, self.actual))
154
155
157
161
163 return 'The process %r exited prematurely.' % self.process
164
165
167
171
173 return 'Command %r not found in the PATH.' % self.command
174
175
177
179 Exception.__init__(self)
180 self.processes = processes
181
183 return ('Processes still running at end of test: %r'
184 % (self.processes, ))
185
186
188
192
194 return ('Timed out waiting for %r to exit with status %r'
195 % (self.process, self.status))
196
197
199
201 self.exitDeferred = defer.Deferred()
202 self.timedOut = False
203
205 return self.exitDeferred
206
207 - def timeout(self, process, status):
211
213 info('process ended with status %r, exit code %r',
214 status, status.value.exitCode)
215 if self.timedOut:
216 warning('already timed out??')
217 print 'already timed out quoi?'
218 else:
219 info('process ended with status %r, exit code %r',
220 status, status.value.exitCode)
221 self.exitDeferred.callback(status.value.exitCode)
222
223
225 NOT_STARTED, STARTED, STOPPED = 'NOT-STARTED', 'STARTED', 'STOPPED'
226
def __init__(self, name, argv, testDir):
    """
    Describe a process that can be started later; nothing is spawned here.

    @param name:    label for this process, shown in its repr
    @param argv:    tuple of command-line arguments; the first element is
                    looked up with _which() and replaced by the result
    @param testDir: directory associated with this process
    """
    self.name = name
    executable = _which(argv[0])
    # argv must be a tuple: it is concatenated with a one-element tuple.
    self.argv = (executable, ) + argv[1:]
    self.testDir = testDir

    # Nothing is running yet, so there is no pid, protocol or timeout.
    self.pid = self.protocol = None
    self.state = self.NOT_STARTED
    self._timeoutDC = None

    log('created process object %r', self)
238
240 assert self.state == self.NOT_STARTED
241
242 self.protocol = ProcessProtocol()
243
244 stdout = open(os.path.join(self.testDir, self.name + '.stdout'), 'w')
245 stderr = open(os.path.join(self.testDir, self.name + '.stderr'), 'w')
246
247 childFDs = {1: stdout.fileno(), 2: stderr.fileno()}
248
249
250
251
252
253
254
255
256
257
258
259
260 info('spawning process %r, argv=%r', self, self.argv)
261 termHandler = signal.signal(signal.SIGTERM, signal.SIG_DFL)
262 env = dict(os.environ)
263 env['FLU_DEBUG'] = '5'
264 process = reactor.spawnProcess(self.protocol, self.argv[0],
265 env=env, args=self.argv,
266 childFDs=childFDs)
267 signal.signal(signal.SIGTERM, termHandler)
268
269 stdout.close()
270 stderr.close()
271
272
273
274
275 self.pid = process.pid
276 self.state = self.STARTED
277
278 def got_exit(res):
279 self.state = self.STOPPED
280 info('process %r has stopped', self)
281 return res
282 self.protocol.getDeferred().addCallback(got_exit)
283
def kill(self, sig=signal.SIGTERM):
    """
    Deliver a signal to the process.

    The process must currently be in the STARTED state. This method only
    sends the signal; it does not wait for the process to exit.

    @param sig: number of the signal to send; SIGTERM by default
    """
    running = self.state == self.STARTED
    assert running
    target = self.pid
    info('killing process %r, signal %d', self, sig)
    os.kill(target, sig)
288
289 - def wait(self, status, timeout=20):
300 d.addCallback(got_exit)
301 if self.state == self.STARTED:
302 self._timeoutDC = reactor.callLater(timeout,
303 self.protocol.timeout,
304 self,
305 status)
306
307 def cancel_timeout(res):
308 debug('cancelling timeout for %r', self)
309 if self._timeoutDC.active():
310 self._timeoutDC.cancel()
311 return res
312 d.addCallbacks(cancel_timeout, cancel_timeout)
313 return d
314
316 return '<Process %s in state %s>' % (self.name, self.state)
317
318
320
321
323 self.processes = []
324 self.timeout = 20
325
326 - def spawn(self, process):
331
337
338 - def kill(self, process):
342
def wait(self, process, exitCode):
    """
    Wait for a tracked process and stop tracking it once the wait succeeds.

    @param process:  a process that is currently in self.processes
    @param exitCode: exit status handed on to process.wait()

    @returns: the object returned by process.wait(), with an extra
              callback attached that removes the process from
              self.processes
    """
    assert process in self.processes

    def forget(_result):
        # Runs only on the callback (success) path of the wait.
        self.processes.remove(process)

    waiting = process.wait(exitCode, timeout=self.timeout)
    waiting.addCallback(forget)
    return waiting
351
367 p.protocol.processEnded = callbacker(d)
368 p.kill(sig=signal.SIGKILL)
369 d = defer.DeferredList(dlist)
370
371 def error(_):
372 if failure:
373 return failure
374 else:
375 raise e
376 d.addCallback(error)
377 return d
378 return failure
379
def run(self, ops, timeout=20):
    """
    Run a list of operations one after another on a deferred chain.

    @param ops:     iterable of indexable sequences; element 0 of each
                    is a callable and the remaining elements are its
                    positional arguments
    @param timeout: value stored in self.timeout before anything runs

    @returns: a deferred that, once every operation has run (or one has
              failed), continues with the result of
              self._checkProcesses()
    """
    self.timeout = timeout
    chain = defer.Deferred()

    def invoke(_previous, step):
        # The result of the previous operation is deliberately ignored.
        func, params = step[0], step[1:]
        return func(*params)

    for step in ops:
        chain.addCallback(invoke, step)

    def finished(_result):
        return self._checkProcesses(failure=None)

    def failed(reason):
        return self._checkProcesses(failure=reason)

    chain.addCallbacks(finished, failed)

    # The chain is not fired synchronously: it is scheduled through the
    # reactor with a zero delay, so the first operation only runs once
    # the reactor gets to it.
    reactor.callLater(0, chain.callback, None)
    return chain
399
400
402
def __init__(self, testCase, testName):
    """
    Create an empty plan for a single test method.

    @param testCase: the test case instance; only the name of its class
                     is kept
    @param testName: name of the test method this plan belongs to
    """
    case_class = testCase.__class__
    self.name, self.testCaseName = testName, case_class.__name__
    self.processes = {}
    # Kept after the name assignments in case _makeOutputDir() reads them.
    working_dir = os.getcwd()
    self.outputDir = self._makeOutputDir(working_dir)

    self.vm = PlanExecutor()
    self.ops = []
    self.timeout = 20
414
416
417 try:
418 os.mkdir(testDir)
419 except OSError:
420 pass
421 tail = '%s-%s' % (self.testCaseName, self.name)
422 outputDir = os.path.join(testDir, tail)
423 os.mkdir(outputDir)
424 return outputDir
425
427 for root, dirs, files in os.walk(self.outputDir, topdown=False):
428 for name in files:
429 os.remove(os.path.join(root, name))
430 for name in dirs:
431 os.rmdir(os.path.join(root, name))
432 os.rmdir(self.outputDir)
433 self.outputDir = None
434
445
448
451
452 - def spawn(self, command, *args):
456
458 processes = []
459 self._appendOp(self.vm.checkExits, ())
460 for argv in argvs:
461 assert isinstance(argv, tuple), \
462 'all arguments to spawnPar must be tuples'
463 for arg in argv:
464 assert isinstance(arg, str), \
465 'all subarguments to spawnPar must be strings'
466 processes.append(self._allocProcess(argv))
467 for process in processes:
468 self._appendOp(self.vm.spawn, process)
469 return tuple(processes)
470
471 - def wait(self, process, status):
473
474 - def waitPar(self, *processStatusPairs):
479
480 - def kill(self, process, status=None):
484
489
490
492 testName = proc.__name__
493
494 def wrappedtest(self):
495 plan = Plan(self, testName)
496 proc(self, plan)
497 return plan.execute()
498 try:
499 wrappedtest.__name__ = testName
500 except TypeError:
501
502 pass
503
504
505 wrappedtest.timeout = 666
506 return wrappedtest
507