source: nscp/scripts/python/lib/test_helper.py @ 8013c0c

0.4.00.4.10.4.2
Last change on this file since 8013c0c was 8013c0c, checked in by Michael Medin <michael@…>, 16 months ago
  • Fixed so NSCAClient parses address correctly
  • settings exception is now derived from exception meaning it will show up more with details (instead of unknown)
  • Added API for handling log level (replaces older debug flag)
  • Added options for settings debug level
  • Changed to --settings is a global argument (meaning you can use it in any mode)
  • Added arguments parsing to test: so you can use global arguments such as --log and --settings.
  • Removed memory leak in settings parsing interface
  • Property mode set to 100644
File size: 9.1 KB
Line 
1from NSCP import Settings, Registry, Core, log, log_debug, log_error, status
2import os
3import inspect
4
5test_manager = None
6
7def install_testcases(tests):
8        test_manager = create_test_manager()
9        test_manager.add(tests)
10        test_manager.install()
11
12def init_testcases(plugin_id, plugin_alias, script_alias, tests):
13        test_manager = create_test_manager(plugin_id, plugin_alias, script_alias)
14        test_manager.add(tests)
15        test_manager.init()
16
17def shutdown_testcases():
18        if get_test_manager():
19                get_test_manager().shutdown()
20        destroy_test_manager()
21
22def get_test_manager():
23        global test_manager
24        return test_manager
25
26def destroy_test_manager():
27        global test_manager
28        if test_manager:
29                test_manager.destroy()
30        test_manager = None
31
32def create_test_manager(plugin_id = 0, plugin_alias = '', script_alias = ''):
33        global test_manager
34        if not test_manager:
35                test_manager = TestManager(plugin_id, plugin_alias, script_alias)
36               
37                reg = Registry.get(plugin_id)
38               
39                reg.simple_cmdline('help', display_help)
40                reg.simple_cmdline('install_python_test', install_tests)
41                reg.simple_cmdline('run_python_test', run_tests)
42
43                reg.simple_function('py_unittest', run_tests, 'Run python unit test suite')
44       
45        return test_manager
46       
47def add_test_suite(suites):
48        mgr = get_test_manager()
49        if isinstance(suites, (list)):
50                for s in suites:
51                        mgr.add(s)
52        else:
53                mgr.add(suites)
54       
55def install_tests(arguments = []):
56        get_test_manager().install(arguments)
57        return (status.OK, 'installed?')
58
59def run_tests(arguments = []):
60        result = get_test_manager().run(arguments)
61        return result.return_nagios()
62
63def display_help(arguments = []):
64        return (status.OK, 'TODO')
65
66class Callable:
67        def __init__(self, anycallable):
68                self.__call__ = anycallable
69
70class SingletonHelper:
71        klass = None
72        def __init__(self, klass):
73                self.klass = klass
74        def __call__(self, *args, **kw):
75                if not self.klass._instance:
76                        self.klass._instance = self.klass()
77                return self.klass._instance
78
79def setup_singleton(klass, src = None):
80        klass.getInstance = SingletonHelper(klass)
81        if not src:
82                cf = inspect.currentframe()
83                if cf:
84                        bf = cf.f_back
85                        if bf:
86                                src = bf.f_code.co_filename
87        klass.__source__ = src
88
89class BasicTest(object):
90
91        _instance = None
92        getInstance = None
93        __source__ = ''
94
95        def desc(self):
96                return 'TODO: Describe: %s'%self.title()
97
98        def title(self):
99                return self._instance.__class__.__name__
100
101        def setup(self, plugin_id, prefix):
102                None
103
104        def teardown(self):
105                None
106
107        def run_test(self):
108                result = TestResult('run_test')
109                result.add_message(False, 'TODO add implementation')
110                return result
111
112        def install(self, arguments):
113                conf = Settings.get()
114                conf.set_string('/modules', 'pytest', 'PythonScript')
115                fn = os.path.basename(self.__source__)
116                (sn, ext) = os.path.splitext(fn)
117                conf.register_key('/settings/pytest/scripts', sn, 'string', 'UNIT TEST SCRIPT: %s'%self.title(), 'A script for running unittests for: %s'%self.desc(), fn)
118                conf.set_string('/settings/pytest/scripts', sn, fn)
119               
120                conf.save()
121       
122        def uninstall(self):
123                None
124
125        def help(self):
126                None
127
128        def init(self, plugin_id):
129                None
130
131        def shutdown(self):
132                None
133
134        def require_boot(self):
135                return False
136               
137class TestResultEntry:
138        status = False
139        desc = 'Unassigned result'
140        error = None
141        def __init__(self, status, desc, error):
142                self.status = status
143                self.desc = desc
144                self.error = error
145
146        def log(self, prefix = '', indent = 0):
147                if self.status:
148                        log_debug('%s%s%s'%(prefix, ''.rjust(indent, ' '), self))
149                else:
150                        log_error('%s%s%s'%(prefix, ''.rjust(indent, ' '), self))
151       
152        def is_ok(self):
153                return self.status
154
155        def count(self):
156                if self.status:
157                        return (1, 1)
158                return (1, 0)
159
160        def contains(self, other):
161                if self == other:
162                        return True
163                return False
164               
165        def __str__(self):
166                if self.status:
167                        return 'OK: %s'%self.desc
168                else:
169                        return 'ERROR: %s (%s)'%(self.desc, self.error)
170               
171class TestResultCollection(TestResultEntry):
172
173        status = True
174        title = None
175        children = []
176        def __init__(self, title, list = None):
177                self.title = title
178                self.children = []
179                if list:
180                        self.extend(list)
181
182        def log(self, prefix = '', indent = 0):
183                start = '%s%s'%(prefix, ''.rjust(indent, ' '))
184                if self.status:
185                        log_debug('%s%s'%(start, self))
186                else:
187                        log_error('%s%s'%(start, self))
188                for c in self.children:
189                        c.log(prefix, indent+1)
190       
191        def is_ok(self):
192                return self.status
193
194        def count(self):
195                total_count = 0
196                ok_count = 0
197                #if self.status:
198                #       ok_count = 1
199
200                for c in self.children:
201                        (total, ok) = c.count()
202                        total_count = total_count + total
203                        ok_count = ok_count + ok
204
205                return (total_count, ok_count)
206
207        def contains(self, other):
208                for c in self.children:
209                        if c.contains(other):
210                                return True
211                return False
212
213        def __str__(self):
214                if self.status:
215                        return 'OK: %s'%self.title
216                else:
217                        (total, ok) = self.count()
218                        return 'ERROR: %s (%d/%d)'%(self.title, ok, total)
219
220        def extend(self, lst):
221                if isinstance(lst, list):
222                        if self.status:
223                                for c in lst:
224                                        if not c.is_ok():
225                                                self.status = False
226                                               
227                        for c in lst:
228                                if c.contains(self):
229                                        log_error('Attempting to add a list with me in it')
230                                        return
231                        self.children.extend(lst)
232                else:
233                        self.append(lst)
234
235        def append(self, entry):
236                if not entry:
237                        log_error('Attempting to add invalid entry (None)')
238                elif entry == self:
239                        log_error('Attempting to add self to self')
240                else:
241                        if self.status and not entry.is_ok():
242                                self.status = False
243                        self.children.append(entry)
244       
245class TestResult(TestResultCollection):
246
247        def __init__(self, title = 'DUMMY TITLE'):
248                TestResultCollection.__init__(self, title)
249
250        def add_message(self, status, message, error = None):
251                e = TestResultEntry(status, message, error)
252                e.log()
253                self.append(e)
254               
255        def assert_equals(self, s1, s2, msg):
256                self.add_message(s1 == s2, msg, '"%s" != "%s"'%(s1, s2))
257               
258        def assert_contains(self, s1, s2, msg):
259                if s1 == s2:
260                        self.add_message(s1 in s2 or s2 in s1, msg, '"%s" (contains) "%s"'%(s1, s2))
261                elif s1 == None or s2 == None:
262                        self.add_message(False, msg, '"%s" (contains) "%s"'%(s1, s2))
263                else:
264                        self.add_message(s1 in s2 or s2 in s1, msg, '"%s" (contains) "%s"'%(s1, s2))
265
266        def add_entry(self, e):
267                self.append(e)
268
269        def add(self, result):
270                self.extend(result)
271
272        def return_nagios(self):
273                (total, ok) = self.count()
274                log_debug(' | Test result log (only summary will be returned to query)')
275                self.log(' | ')
276                if total == ok:
277                        return (status.OK, "OK: %d test(s) successfull"%(total))
278                else:
279                        return (status.CRITICAL, "ERROR: %d/%d test(s) failed"%(total-ok, total))
280
281class TestManager:
282       
283        suites = []
284        prefix = ''
285        plugin_id = None
286        plugin_alias = None
287        script_alias = None
288       
289        def __init__(self, plugin_id = 0, plugin_alias = '', script_alias = ''):
290                if script_alias:
291                        self.prefix = '%s_'%script_alias
292                self.plugin_id = plugin_id
293                self.plugin_alias = plugin_alias
294                self.script_alias = script_alias
295                self.suites = []
296       
297        def add(self, suite):
298                if isinstance(suite, list):
299                        for s in suite:
300                                self.add(s)
301                else:
302                        if not suite in self.suites:
303                                self.suites.append(suite)
304
305        def run_suite(self, suite):
306                result = TestResult('Running suite: %s'%suite.title())
307                for c in list:
308                        result.add(run_test(plugin_id, prefix, c))
309                return result
310               
311        def run(self, arguments = []):
312                result = TestResult('Test result for %d suites'%len(self.suites))
313                for suite in self.suites:
314                        instance = suite.getInstance()
315                        instance.setup(self.plugin_id, self.prefix)
316                        suite_result = TestResult('Running suite: %s'%instance.title())
317                        suite_result.append(instance.run_test())
318                        result.append(suite_result)
319                        result.add_message(suite_result.is_ok(), 'Result from suite: %s'%instance.title())
320                        instance.teardown()
321                return result
322
323        def init(self):
324                for suite in self.suites:
325                        instance = suite.getInstance()
326                        instance.init(self.plugin_id)
327                       
328        def destroy(self):
329                self.suites = []
330                self.prefix = ''
331                self.plugin_id = None
332                self.plugin_alias = None
333                self.script_alias = None
334                       
335        def install(self, arguments = []):
336                boot = False
337                for suite in self.suites:
338                        instance = suite.getInstance()
339                        instance.install(arguments)
340                        if instance.require_boot():
341                                boot = True
342
343                #core = Core.get()
344                #core.reload('service')
345                #(code, msg, perf) = core.simple_query('py_unittest', [])
346               
347                log('-+---==(TEST INSTALLER)==---------------------------------------------------+-')
348                log(' | Setup nessecary configuration for running test                           |')
349                log(' | This includes: Loading the PythonScript module at startup                |')
350                log(' | To use this please run nsclient++ in "test mode" like so:                |')
351                if boot:
352                        log(' | nscp client --boot --query py_unittest                                   |')
353                else:
354                        log(' | nscp client --query py_unittest                                          |')
355                log('-+--------------------------------------------------------==(DAS ENDE!)==---+-')
356                       
357        def shutdown(self):
358                for suite in self.suites:
359                        instance = suite.getInstance()
360                        instance.shutdown()
361                       
362               
Note: See TracBrowser for help on using the repository browser.