source: nscp/scripts/python/test_nsca.py @ 58f0e80

0.4.00.4.10.4.2
Last change on this file since 58f0e80 was 58f0e80, checked in by Michael Medin <michael@…>, 19 months ago

2011-10-21 MickeM

  • Some more threading issues in Python (works perfectly(?) now)
  • Refactored the tests a bit making test suites runable from a central hub
  • Fixed soo everything builds and works on Linux

2011-10-19 MickeM

  • Fixed some threadding issues in PythonScript
  • Still working on refactoring the unittest helpers a bit

2011-10-16 MickeM

  • Fixed issue with loading zero-byte UTF-8 config files
  • Fixed some issues with settings subsystem (debug log as well as createing instances for --generate)
  • Added ability to set event log to real-time
  • Added support for multiple eventlogs in real-time
  • Property mode set to 100644
File size: 7.3 KB
Line 
1from NSCP import Settings, Registry, Core, log, status, log_error, sleep
2from test_helper import Callable, TestResult, get_test_manager, create_test_manager
3import plugin_pb2
4from types import *
5import socket
6import uuid
7import unicodedata
8
9core = Core.get()
10
11prefix = 'py_'
12plugin_id = 0
13
14def get_help(arguments):
15        return (status.OK, 'help: Get help')
16
17
18def isOpen(ip, port):
19        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
20        try:
21                s.connect((ip, int(port)))
22                s.shutdown(2)
23                return True
24        except:
25                return False
26
27class NSCAMessage:
28        uuid = None
29        source = None
30        command = None
31        status = None
32        message = None
33        perfdata = None
34        got_simple_response = False
35
36        def __init__(self, command):
37                try:
38                        self.uuid = command.decode('ascii')
39                except UnicodeDecodeError:
40                        self.uuid = command
41                #self.uuid = unicodedata.normalize('NFKD', command).encode('ascii','ignore')
42                self.command = command
43        def __str__(self):
44                return 'Message: %s (%s, %s, %s)'%(self.uuid, self.source, self.command, self.status)
45
46class NSCAServerTest:
47        instance = None
48        key = ''
49        reg = None
50        responses = {}
51       
52        class SingletonHelper:
53                def __call__( self, *args, **kw ) :
54                        if NSCAServerTest.instance is None :
55                                object = NSCAServerTest()
56                                NSCAServerTest.instance = object
57                        return NSCAServerTest.instance
58
59        getInstance = SingletonHelper()
60
61        def desc(self):
62                return 'Testcase for NSCA protocol'
63
64        def title(self):
65                return 'NSCA Server test'
66
67        def setup(self, plugin_id, prefix):
68                self.key = '_%stest_command'%prefix
69                self.reg = Registry.get(plugin_id)
70                self.reg.simple_subscription('nsca_test_inbox', NSCAServerTest.simple_inbox_handler)
71                self.reg.subscription('nsca_test_inbox', NSCAServerTest.inbox_handler)
72
73        def simple_inbox_handler(channel, source, command, code, message, perf):
74                instance = NSCAServerTest.getInstance()
75                return instance.simple_inbox_handler_wrapped(channel, source, command, code, message, perf)
76        simple_inbox_handler = Callable(simple_inbox_handler)
77
78        def inbox_handler(channel, request):
79                instance = NSCAServerTest.getInstance()
80                return instance.inbox_handler_wrapped(channel, request)
81        inbox_handler = Callable(inbox_handler)
82       
83        def simple_inbox_handler_wrapped(self, channel, source, command, status, message, perf):
84                log('Got simple message %s on %s'%(command, channel))
85                msg = NSCAMessage(command)
86                if msg.uuid in self.responses:
87                        msg = self.responses[msg.uuid]
88                msg.source = source
89                msg.status = status
90                msg.message = message
91                msg.perfdata = perf
92                msg.got_simple_response = True
93                self.responses[msg.uuid] = msg
94                return True
95
96        def inbox_handler_wrapped(self, channel, request):
97                log_error('DISCARDED message on %s'%(channel))
98               
99                message = plugin_pb2.SubmitRequestMessage()
100                message.ParseFromString(request)
101                command = message.payload[0].command
102                log('Got simple message %s on %s'%(command, channel))
103               
104                msg = NSCAMessage(command)
105                if msg.uuid in self.responses:
106                        msg = self.responses[msg.uuid]
107                msg.got_response = True
108                self.responses[msg.uuid] = msg
109                return (False, '')
110               
111        def teardown(self):
112                None
113               
114        def submit_payload(self, encryption, source, status, msg, perf):
115                message = plugin_pb2.SubmitRequestMessage()
116               
117                message.header.version = plugin_pb2.Common.VERSION_1
118                message.header.recipient_id = "test_rp"
119                message.channel = 'nsca_test_outbox'
120                host = message.header.hosts.add()
121                host.address = "127.0.0.1:15667"
122                host.id = "test_rp"
123                enc = host.metadata.add()
124                enc.key = "encryption"
125                enc.value = encryption
126                enc = host.metadata.add()
127                enc.key = "password"
128                enc.value = 'pwd-%s'%encryption
129
130                uid = str(uuid.uuid4())
131                payload = message.payload.add()
132                payload.result = status
133                payload.command = uid
134                payload.message = '%s - %s'%(uid, msg)
135                payload.source = source
136                (result_code, err) = core.submit('nsca_test_outbox', message.SerializeToString())
137                result = TestResult()
138               
139                found = False
140                for i in range(0,10):
141                        if uid in self.responses:
142                                rmsg = self.responses[uid]
143                                result.add_message(rmsg.got_response, 'Testing to recieve message using %s'%encryption)
144                                result.add_message(rmsg.got_simple_response, 'Testing to recieve simple message using %s'%encryption)
145                                result.add_message(len(err) == 0, 'Testing to send message using %s'%encryption, err)
146                                #result.assert_equals(rmsg.last_source, source, 'Verify that source is sent through')
147                                result.assert_equals(rmsg.command, uid, 'Verify that command is sent through')
148                                result.assert_contains(rmsg.message, msg, 'Verify that message is sent through')
149                                result.assert_equals(rmsg.perfdata, perf, 'Verify that performance data is sent through')
150                                del self.responses[uid]
151                                found = True
152                                break
153                        else:
154                                log('Waiting for %s (%s)'%(uid, self.responses.keys()))
155                                sleep(1)
156                if not found:
157                        result.add_message(False, 'Failed to send message with uuid: %s using %s'%(uid, encryption), err)
158                return result
159
160        def test_one_full(self, encryption, state, key):
161                return self.submit_payload(encryption, '%ssrc%s'%(key, key), state, '%smsg%s'%(key, key), '')
162
163        def test_one(self, crypto):
164                conf = Settings.get()
165                conf.set_string('/settings/NSCA/test_nsca_server', 'encryption', '%s'%crypto)
166                conf.set_string('/settings/NSCA/test_nsca_server', 'password', 'pwd-%s'%crypto)
167                core.reload('test_nsca_server')
168                result = TestResult()
169                result.add(self.test_one_full(crypto, status.UNKNOWN, 'unknown'))
170                result.add(self.test_one_full(crypto, status.OK, 'ok'))
171                result.add(self.test_one_full(crypto, status.WARNING, 'warn'))
172                result.add(self.test_one_full(crypto, status.CRITICAL, 'crit'))
173                return result
174
175        def run_test(self):
176                result = TestResult()
177                result.add_message(isOpen('localhost', 15667), 'Checking that port is open')
178                # Currently broken: "xor"
179                cryptos = ["des", "3des", "cast128", "xtea", "blowfish", "twofish", "rc2", "aes", "serpent", "gost", "none", "3way"]
180                for c in cryptos:
181                        result.add_message(True, 'Testing crypto: %s'%c)
182                        result.add(self.test_one(c))
183               
184                return result
185               
186        def install(self, arguments):
187                conf = Settings.get()
188                conf.set_string('/modules', 'test_nsca_server', 'NSCAServer')
189                conf.set_string('/modules', 'test_nsca_client', 'NSCAClient')
190                conf.set_string('/modules', 'pytest', 'PythonScript')
191
192                conf.set_string('/settings/pytest/scripts', 'test_nsca', 'test_nsca.py')
193               
194                conf.set_string('/settings/NSCA/test_nsca_server', 'port', '15667')
195                conf.set_string('/settings/NSCA/test_nsca_server', 'inbox', 'nsca_test_inbox')
196                conf.set_string('/settings/NSCA/test_nsca_server', 'encryption', '1')
197
198                conf.set_string('/settings/NSCA/test_nsca_client/targets', 'nsca_test_local', 'nsca://127.0.0.1:15667')
199                conf.set_string('/settings/NSCA/test_nsca_client', 'channel', 'nsca_test_outbox')
200               
201                conf.save()
202
203        def uninstall(self):
204                None
205
206        def help(self):
207                None
208
209        def init(self, plugin_id):
210                None
211
212        def shutdown(self):
213                None
214
215all_tests = [NSCAServerTest]
216
217def __main__():
218        test_manager = create_test_manager()
219        test_manager.add(all_tests)
220        test_manager.install()
221       
222def init(plugin_id, plugin_alias, script_alias):
223        test_manager = create_test_manager(plugin_id, plugin_alias, script_alias)
224        test_manager.add(all_tests)
225
226        test_manager.init()
227
228def shutdown():
229        test_manager = get_test_manager()
230        test_manager.shutdown()
Note: See TracBrowser for help on using the repository browser.