source: nscp/scripts/python/test_nrpe.py @ 7354aa1

0.4.00.4.10.4.2
Last change on this file since 7354aa1 was a87ce04, checked in by Michael Medin <michael@…>, 17 months ago
  • Fixed some issues in the NRPE decoder
  • Added support for forwarding queries (ie. xxx_forward) mainly usefull for python(?) scripts where we can now handconstruct messages for delivery
  • Added NRPE unit tests (52 of them)
  • Fixed process enumeration API for 64/32 bit
  • Property mode set to 100644
File size: 8.0 KB
Line 
1from NSCP import Settings, Registry, Core, log, status, log_error, sleep
2from test_helper import BasicTest, TestResult, Callable, setup_singleton, install_testcases, init_testcases, shutdown_testcases
3import plugin_pb2
4from types import *
5import socket
6import uuid
7import unicodedata
8
9import threading
10sync = threading.RLock()
11
12core = Core.get()
13
14def isOpen(ip, port):
15        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
16        try:
17                s.connect((ip, int(port)))
18                s.shutdown(2)
19                return True
20        except:
21                return False
22
23class NRPEMessage:
24        uuid = None
25        source = None
26        command = None
27        status = None
28        message = None
29        perfdata = None
30        got_simple_response = False
31        got_response = False
32
33        def __init__(self, command):
34                if type(command) == 'unicode':
35                        try:
36                                self.uuid = command.decode('ascii', 'replace')
37                        except UnicodeDecodeError:
38                                self.uuid = command
39                else:
40                        self.uuid = command
41                #self.uuid = unicodedata.normalize('NFKD', command).encode('ascii','ignore')
42                self.command = command
43        def __str__(self):
44                return 'Message: %s (%s, %s, %s)'%(self.uuid, self.source, self.command, self.status)
45
46class NRPEServerTest(BasicTest):
47        instance = None
48        key = ''
49        reg = None
50        _responses = {}
51        _requests = {}
52       
53        def has_response(self, id):
54                with sync:
55                        return id in self._responses
56       
57        def get_response(self, id):
58                with sync:
59                        if id in self._responses:
60                                return self._responses[id]
61                        msg = NRPEMessage(id)
62                        self._responses[id] = msg
63                        return msg
64
65        def set_response(self, msg):
66                with sync:
67                        self._responses[msg.uuid] = msg
68
69        def del_response(self, id):
70                with sync:
71                        del self._responses[id]
72                       
73        def get_request(self, id):
74                with sync:
75                        if id in self._requests:
76                                return self._requests[id]
77                        msg = NRPEMessage(id)
78                        self._requests[id] = msg
79                        return msg
80
81        def set_request(self, msg):
82                with sync:
83                        self._requests[msg.uuid] = msg
84
85        def del_request(self, id):
86                with sync:
87                        del self._requests[id]
88                       
89       
90        def desc(self):
91                return 'Testcase for NRPE protocol'
92
93        def title(self):
94                return 'NRPE Client/Server test'
95
96        def setup(self, plugin_id, prefix):
97                self.key = '_%stest_command'%prefix
98                self.reg = Registry.get(plugin_id)
99                self.reg.simple_function('check_py_nrpe_test_s', NRPEServerTest.simple_handler, 'TODO')
100                self.reg.function('check_py_nrpe_test', NRPEServerTest.handler, 'TODO')
101
102        def simple_handler(arguments):
103                instance = NRPEServerTest.getInstance()
104                return instance.simple_handler_wrapped(arguments)
105        simple_handler = Callable(simple_handler)
106
107        def handler(channel, request):
108                instance = NRPEServerTest.getInstance()
109                return instance.handler_wrapped(channel, request)
110        handler = Callable(handler)
111       
112        def simple_handler_wrapped(self, arguments):
113                log('Got simple message %s'%arguments)
114                msg = self.get_response(arguments[0])
115                msg.got_simple_response = True
116                self.set_response(msg)
117                rmsg = self.get_request(arguments[0])
118                return (rmsg.status, rmsg.message, rmsg.perfdata)
119
120        def handler_wrapped(self, channel, request):
121                log_error('DISCARDING message on %s'%(channel))
122               
123                message = plugin_pb2.SubmitRequestMessage()
124                message.ParseFromString(request)
125                command = message.payload[0].command
126                log('Got message %s on %s'%(command, channel))
127               
128                msg = self.get_response(command)
129                msg.got_response = True
130                self.set_response(msg)
131                return None
132               
133        def teardown(self):
134                None
135               
136        def submit_payload(self, alias, ssl, length, source, status, msg, perf):
137                message = plugin_pb2.QueryRequestMessage()
138               
139                message.header.version = plugin_pb2.Common.VERSION_1
140                message.header.recipient_id = "test_rp"
141                host = message.header.hosts.add()
142                host.address = "127.0.0.1:15666"
143                host.id = "test_rp"
144                enc = host.metadata.add()
145                enc.key = "use ssl"
146                enc.value = '%s'%ssl
147                enc = host.metadata.add()
148                enc.key = "payload length"
149                enc.value = '%d'%length
150
151                uid = str(uuid.uuid4())
152                payload = message.payload.add()
153                payload.command = 'check_py_nrpe_test_s'
154                payload.arguments.append(uid)
155                rmsg = self.get_request(uid)
156                rmsg.status = status
157                rmsg.message = msg
158                rmsg.perfdata = perf
159                self.set_request(rmsg)
160                (result_code, response) = core.query('nrpe_forward', message.SerializeToString())
161                response_message = plugin_pb2.QueryResponseMessage()
162                response_message.ParseFromString(response)
163                result = TestResult()
164               
165                found = False
166                for i in range(0,10):
167                        if self.has_response(uid):
168                                rmsg = self.get_response(uid)
169                                #result.add_message(rmsg.got_response, 'Testing to recieve message using %s'%alias)
170                                result.add_message(rmsg.got_simple_response, 'Testing to recieve simple message using %s'%alias)
171                                result.add_message(len(response_message.payload) == 1, 'Verify that we only get one payload response for %s'%alias, '%s != 1'%len(response_message.payload))
172                                result.assert_equals(response_message.payload[0].result, status, 'Verify that status is sent through %s'%alias)
173                                result.assert_equals(response_message.payload[0].message, msg, 'Verify that message is sent through %s'%alias)
174                                #result.assert_equals(rmsg.perfdata, perf, 'Verify that performance data is sent through')
175                                self.del_response(uid)
176                                found = True
177                                break
178                        else:
179                                log('Waiting for %s'%uid)
180                                sleep(1)
181                if found:
182                        return result
183                return None
184
185        def test_one(self, ssl=True, length=1024, state = status.UNKNOWN, tag = 'TODO'):
186                r1 = self.submit_payload('%s/%s/%s'%(ssl, length, tag), ssl, length, '%ssrc%s'%(tag, tag), state, '%smsg%s'%(tag, tag), '')
187                if r1:
188                        return r1
189                result = TestResult()
190                result.add_message(True, '*FAILED* to send message with %s (retrying)'%encryption)
191                r2 = self.submit_payload('%s/%s'%(ssl, length), ssl, length, '%ssrc%s'%(tag, tag), state, '%smsg%s'%(tag, tag), '')
192                if r2:
193                        result.add(r2)
194                        return result
195                result.add_message(False, 'Failed to send message with: %s%%s'%(ssl, length))
196                return result
197
198        def do_one_test(self, ssl=True, length=1024):
199                conf = Settings.get()
200                conf.set_int('/settings/NRPE/test_nrpe_server', 'payload length', length)
201                conf.set_bool('/settings/NRPE/test_nrpe_server', 'use ssl', ssl)
202                conf.set_bool('/settings/NRPE/test_nrpe_server', 'allow arguments', True)
203                # TODO: conf.set_string('/settings/NRPE/test_nrpe_server', 'certificate', ssl)
204                core.reload('test_nrpe_server')
205                result = TestResult()
206                result.add_message(isOpen('127.0.0.1', 15666), 'Checking that port is open (server is up)')
207                result.add(self.test_one(ssl, length, state = status.UNKNOWN, tag = 'unknown'))
208                result.add(self.test_one(ssl, length, state = status.OK, tag = 'ok'))
209                result.add(self.test_one(ssl, length, state = status.WARNING, tag = 'warn'))
210                result.add(self.test_one(ssl, length, state = status.CRITICAL, tag = 'crit'))
211                return result
212
213        def run_test(self):
214                result = TestResult()
215                result.add(self.do_one_test(ssl=True))
216                result.add(self.do_one_test(ssl=False))
217                result.add(self.do_one_test(ssl=True, length=4096))
218                return result
219               
220        def install(self, arguments):
221                conf = Settings.get()
222                conf.set_string('/modules', 'test_nrpe_server', 'NRPEServer')
223                conf.set_string('/modules', 'test_nrpe_client', 'NRPEClient')
224                conf.set_string('/modules', 'pytest', 'PythonScript')
225
226                conf.set_string('/settings/pytest/scripts', 'test_nrpe', 'test_nrpe.py')
227               
228                conf.set_string('/settings/NRPE/test_nrpe_server', 'port', '15666')
229                conf.set_string('/settings/NRPE/test_nrpe_server', 'inbox', 'nrpe_test_inbox')
230                conf.set_string('/settings/NRPE/test_nrpe_server', 'encryption', '1')
231
232                conf.set_string('/settings/NRPE/test_nrpe_client/targets', 'nrpe_test_local', 'nrpe://127.0.0.1:15666')
233                conf.set_string('/settings/NRPE/test_nrpe_client', 'channel', 'nrpe_test_outbox')
234               
235                conf.save()
236
237        def uninstall(self):
238                None
239
240        def help(self):
241                None
242
243        def init(self, plugin_id):
244                None
245
246        def shutdown(self):
247                None
248
249setup_singleton(NRPEServerTest)
250
251all_tests = [NRPEServerTest]
252
253def __main__():
254        install_testcases(all_tests)
255       
256def init(plugin_id, plugin_alias, script_alias):
257        init_testcases(plugin_id, plugin_alias, script_alias, all_tests)
258
259def shutdown():
260        shutdown_testcases()
Note: See TracBrowser for help on using the repository browser.