source: nscp/scripts/python/test_eventlog.py @ 74e060a

0.4.00.4.10.4.2
Last change on this file since 74e060a was 74e060a, checked in by Michael Medin <michael@…>, 14 months ago
  • Added new command to EventLog? CheckEventlogCached? which checks result caught by the real-time process. CheckEventLogCACHE warn=gt:1 crit=gt:5 Requires a configured real-time checker to work.
  • Fixed issue in installer and "Make file writable" by everyone now uses Users SID.
  • Fixed issue in installer and "Default plugins" now correctly sets them to 1.
  • Removed dependency on tcpip from the service and the installer
  • Added a series of keywords to EventLog? check to facilitate better checking
  • Added a set of aliases to make EventLog? behave more like Wdinwos Eventlog viewer.
  • Added a lot of unit test cases to the Eventlog checker.
  • Fixed issue with default schedule beeing added as an item and not a template
  • Property mode set to 100644
File size: 8.7 KB
Line 
1from NSCP import Settings, Registry, Core, log, status, sleep
2from test_helper import BasicTest, TestResult, Callable, setup_singleton, install_testcases, init_testcases, shutdown_testcases
3import plugin_pb2
4from types import *
5import socket
6import unicodedata
7
8class EventLogTest(BasicTest):
9        instance = None
10        key = ''
11        reg = None
12        got_response = False
13        last_source = None
14        last_command = None
15        last_status = None
16        last_message = None
17        last_perfdata = None
18        got_simple_response = None
19        message_count = 0
20       
21        class SingletonHelper:
22                def __call__( self, *args, **kw ) :
23                        if EventLogTest.instance is None :
24                                object = EventLogTest()
25                                EventLogTest.instance = object
26                        return EventLogTest.instance
27
28        getInstance = SingletonHelper()
29
30        def desc(self):
31                return 'Testcase for eventlog'
32
33        def title(self):
34                return 'EventLog test'
35
36        def setup(self, plugin_id, prefix):
37                self.key = '_%stest_command'%prefix
38                self.reg = Registry.get(plugin_id)
39                self.reg.simple_subscription('pytest_evlog', EventLogTest.simple_inbox_handler)
40
41        def simple_inbox_handler(channel, source, command, code, message, perf):
42                instance = EventLogTest.getInstance()
43                return instance.simple_inbox_handler_wrapped(channel, source, command, code, message, perf)
44        simple_inbox_handler = Callable(simple_inbox_handler)
45
46        def simple_inbox_handler_wrapped(self, channel, source, command, status, message, perf):
47                message = unicodedata.normalize('NFKD', message).encode('ascii','ignore')
48                log('Got simple message %s on %s'%(command, channel))
49                self.got_simple_response = True
50                self.last_source = source
51                self.last_command = command
52                self.last_status = status
53                self.last_message = message
54                self.message_count = self.message_count + 1
55                self.last_perfdata = perf
56                return True
57
58        def teardown(self):
59                None
60
61        def test_create(self, source, id, level, severity, category, facility, arguments):
62                self.last_message = None
63                self.message_count = 0
64                args = ['--source', source,
65                                        '--id', id,
66                                        '--level', level,
67                                        '--severity', severity,
68                                        '--category', category,
69                                        '--facility', facility
70                                        ]
71                for f in arguments:
72                        args.append('--argument')
73                        args.append(f)
74                (ret, msg) = Core.get().simple_exec('any', 'insert-eventlog', args)
75                return ret == 0
76               
77               
78        def test_w_expected(self, filter, syntax, expected):
79                result = TestResult('Validating filter: %s'%filter)
80                (res, msg, perf) = Core.get().simple_query('CheckEventLog', ['file=Application', 'debug=true', 'warn=ne:%d'%expected, 'crit=ne:%d'%expected, 'filter=%s'%filter, 'syntax=%s'%syntax])
81                result.assert_equals(res, status.OK, "Validate status OK for %s"%filter)
82                (res, msg, perf) = Core.get().simple_query('CheckEventLog', ['file=Application', 'debug=true', 'warn=eq:%d'%expected, 'crit=ne:%d'%expected, 'filter=%s'%filter, 'syntax=%s'%syntax])
83                result.assert_equals(res, status.WARNING, "Validate status OK for %s"%filter)
84                (res, msg, perf) = Core.get().simple_query('CheckEventLog', ['file=Application', 'debug=true', 'warn=eq:%d'%expected, 'crit=eq:%d'%expected, 'filter=%s'%filter, 'syntax=%s'%syntax])
85                result.assert_equals(res, status.CRITICAL, "Validate status CRIT for %s"%filter)
86                return result
87
88        def test_syntax(self, filter, syntax, expected):
89                result = TestResult('Validating syntax: %s'%syntax)
90                (res, msg, perf) = Core.get().simple_query('CheckEventLog', ['file=Application', 'warn=ne:1', 'filter=%s'%filter, 'syntax=%s'%syntax, 'descriptions'])
91                result.assert_equals(msg, expected, "Validate message rendering syntax: %s"%msg)
92                return result
93               
94        def run_test(self):
95                result = TestResult('Checking CheckEventLog')
96                cache = TestResult('Checking CheckEventLog CACHE')
97               
98                (res, msg, perf) = Core.get().simple_query('CheckEventLogCACHE', ['warn=eq:1', 'crit=eq:2'])
99                cache.assert_equals(res, status.OK, "Validate cache is empty")
100                cache.assert_equals(msg, 'Eventlog check ok', "Validate cache is ok: %s"%msg)
101               
102               
103                a_list = ['a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a']
104                result.add_message(self.test_create('Application Error', 1000, 'error', 0, 0, 0, a_list), 'Testing to create a log message')
105                sleep(500)
106                result.assert_equals(self.last_message, 'error Application Error: ', 'Verify that message is sent through')
107                result.assert_equals(self.message_count, 1, 'Verify that onlyt one message is sent through')
108
109                result.add_message(self.test_create('Application Error', 1000, 'info', 2, 1, 5, a_list), 'Testing to create a log message')
110                sleep(500)
111                result.assert_equals(self.last_message, 'info Application Error: ', 'Verify that message is sent through')
112                result.assert_equals(self.message_count, 1, 'Verify that onlyt one message is sent through')
113
114                (res, msg, perf) = Core.get().simple_query('CheckEventLogCACHE', ['warn=eq:1', 'crit=eq:2'])
115                cache.assert_equals(res, status.CRITICAL, "Validate cache has items")
116                cache.assert_equals(msg, 'error Application Error: , info Application Error: , eventlog: 2 = critical', "Validate cache is ok: %s"%msg)
117                cache.assert_equals(perf, "'eventlog'=2;1;2", "Validate cache is ok: %s"%msg)
118                (res, msg, perf) = Core.get().simple_query('CheckEventLogCACHE', ['warn=eq:1', 'crit=eq:2'])
119                cache.assert_equals(res, status.OK, "Validate cache is empty (again)")
120                cache.assert_equals(msg, 'Eventlog check ok', "Validate cache is ok: %s"%msg)
121               
122                result.add(cache)
123               
124                r = TestResult('Checking filters')
125                r.add(self.test_w_expected('id = 1000 and generated gt 1m', '%generated%', 0))
126                r.add(self.test_w_expected('id = 1000 and generated gt -1m', '%generated%', 2))
127                r.add(self.test_w_expected('id = 1000 and generated gt -1m and id = 1000', '%generated%: %id%, %category%', 2))
128                r.add(self.test_w_expected('id = 1000 and generated gt -1m and category = 1', '%category%', 1))
129                r.add(self.test_w_expected('id = 1000 and generated gt -1m and category = 0', '%category%', 1))
130                r.add(self.test_w_expected("id = 1000 and generated gt -1m and level = 'error'", '%level%', 1))
131                r.add(self.test_w_expected("id = 1000 and generated gt -1m and level = 'info'", '%level%', 1))
132                result.add(r)
133               
134                r = TestResult('Checking syntax')
135                r.add(self.test_syntax('id = 1000 and generated gt -2m and category = 0', '%source% - %type% - %category%', 'Application Error - error - 0'))
136                r.add(self.test_syntax('id = 1000 and generated gt -2m and category = 1', '%source% - %type% - %category%', 'Application Error - info - 1'))
137                r.add(self.test_syntax('id = 1000 and generated gt -2m and category = 0', '%facility% - %qualifier% - %customer%', '0 - 0 - 0'))
138                r.add(self.test_syntax('id = 1000 and generated gt -2m and category = 1', '%facility% - %qualifier% - %customer%', '5 - 5 - 0'))
139                r.add(self.test_syntax('id = 1000 and generated gt -2m and category = 0', '%rawid% - %severity% - %log%', '1000 - success - Application'))
140                r.add(self.test_syntax('id = 1000 and generated gt -2m and category = 1', '%rawid% - %severity% - %log%', '2147812328 - warning - Application'))
141                r.add(self.test_syntax('id = 1000 and generated gt -2m and category = 0', '%id% - %strings%', '1000 - a, a, a, a, a, a, a, a, a, a, a, a, a, '))
142                r.add(self.test_syntax('id = 1000 and generated gt -2m and category = 1', '%id% - %strings%', '1000 - a, a, a, a, a, a, a, a, a, a, a, a, a, '))
143                result.add(r)
144
145                return result
146
147        def install(self, arguments):
148                conf = Settings.get()
149                conf.set_string('/modules', 'pytest_eventlog', 'CheckEventLog')
150                conf.set_string('/modules', 'pytest', 'PythonScript')
151
152                conf.set_string('/settings/pytest/scripts', 'test_eventlog', 'test_eventlog.py')
153               
154                conf.set_string('/settings/pytest_eventlog/real-time', 'enabled', 'true')
155                conf.set_string('/settings/pytest_eventlog/real-time', 'filter', 'id = 1000 and category = 0')
156                conf.set_string('/settings/pytest_eventlog/real-time/filters', 'test', 'id = 1000 and category = 1')
157                conf.set_string('/settings/pytest_eventlog/real-time', 'maximum age', '5s')
158                conf.set_string('/settings/pytest_eventlog/real-time', 'destination', 'pytest_evlog')
159                conf.set_string('/settings/pytest_eventlog/real-time', 'language', 'english')
160                conf.set_string('/settings/pytest_eventlog/real-time', 'debug', 'true')
161                conf.set_string('/settings/pytest_eventlog/real-time', 'enable active', 'true')
162               
163                conf.save()
164       
165        def uninstall(self):
166                None
167
168        def help(self):
169                None
170
171        def init(self, plugin_id):
172                None
173                #reg = Registry.get(plugin_id)
174                #reg.simple_function('test_eventlog', test, 'Run python EventLog unit test suite')
175
176        def shutdown(self):
177                None
178
179setup_singleton(EventLogTest)
180
181all_tests = [EventLogTest]
182
183def __main__():
184        install_testcases(all_tests)
185       
186def init(plugin_id, plugin_alias, script_alias):
187        init_testcases(plugin_id, plugin_alias, script_alias, all_tests)
188
189def shutdown():
190        shutdown_testcases()
Note: See TracBrowser for help on using the repository browser.