source: nscp/include/zmsg.hpp @ f7a074d

0.4.00.4.10.4.2
Last change on this file since f7a074d was 2b2e9b8, checked in by Michael Medin <michael@…>, 21 months ago

Another massive commit with a single comment, but essentially this is the first version with a proper distributed message queue this will be the foundation of some pretty cool stuff in the next few weeks/months...

  • Implemented first version of DistributedClient? and DistributedSServer which work so now we have a proper message based transport. Still a lot of rough edges such as cookie and authentication support is hard coded. We also need a proper two way distributed server as well as implement "all" payload types.
  • Property mode set to 100644
File size: 12.3 KB
Line 
1/*  =========================================================================
2    zmsg.hpp
3
4    Multipart message class for example applications.
5
6    Follows the ZFL class conventions and is further developed as the ZFL
7    zfl_msg class.  See http://zfl.zeromq.org for more details.
8
9    -------------------------------------------------------------------------
10    Copyright (c) 1991-2010 iMatix Corporation <www.imatix.com>
11    Copyright other contributors as noted in the AUTHORS file.
12
13    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
14
15    This is free software; you can redistribute it and/or modify it under the
16    terms of the GNU Lesser General Public License as published by the Free
17    Software Foundation; either version 3 of the License, or (at your option)
18    any later version.
19
20    This software is distributed in the hope that it will be useful, but
21    WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABIL-
22    ITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
23    Public License for more details.
24
25    You should have received a copy of the GNU Lesser General Public License
26    along with this program. If not, see <http://www.gnu.org/licenses/>.
27    =========================================================================
28
29    Andreas Hoelzlwimmer <andreas.hoelzlwimmer@fh-hagenberg.at>
30*/
31
32#ifndef __ZMSG_H_INCLUDED__
33#define __ZMSG_H_INCLUDED__
34
35//#include "zhelpers.hpp"
36
37#include <vector>
38#include <string>
39#include <boost/optional.hpp>
40#include <boost/type.hpp>
41
42class zmsg {
43public:
44    //typedef std::basic_string<unsigned char> ustring;
45
46        typedef boost::optional<std::string> opstring_t;
47
48    zmsg() {
49    }
50
51   //  --------------------------------------------------------------------------
52   //  Constructor, sets initial body
53        zmsg(std::string body) {
54       body_set(body);
55   }
56
57   //  -------------------------------------------------------------------------
58   //  Constructor, sets initial body and sends message to socket
59   zmsg(std::string body, zmq::socket_t &socket) {
60       body_set(body);
61       send(socket);
62   }
63
64   //  --------------------------------------------------------------------------
65   //  Constructor, calls first receive automatically
66   zmsg(zmq::socket_t &socket) {
67           if (!recv(socket)) {
68                   std::wcout << _T("-------------------- DAMN -------------------") << std::endl;
69           }
70   }
71
72   //  --------------------------------------------------------------------------
73   //  Copy Constructor, equivalent to zmsg_dup
74   zmsg(zmsg &msg) {
75       m_part_data.resize(msg.m_part_data.size());
76       std::copy(msg.m_part_data.begin(), msg.m_part_data.end(), m_part_data.begin());
77   }
78
79   virtual ~zmsg() {
80      clear();
81   }
82
83   //  --------------------------------------------------------------------------
84   //  Erases all messages
85   void clear() {
86       m_part_data.clear();
87   }
88
89   void set_part(size_t part_nbr, std::string data) {
90           if (part_nbr < m_part_data.size() && part_nbr >= 0) {
91                   m_part_data[part_nbr] = data;
92           }
93   }
94   std::string get_part(size_t part_nbr) {
95           return m_part_data[part_nbr];
96   }
97
98   inline std::string msg_to_string(zmq::message_t &message) {
99           return std::string(reinterpret_cast<char*>(message.data()), message.size());
100   }
101   bool recv(zmq::socket_t & socket) {
102      clear();
103      while(1) {
104         zmq::message_t message(0);
105         try {
106            if (!socket.recv(&message, 0)) {
107               return false;
108            }
109         } catch (zmq::error_t error) {
110            //std::cout << "E: " << error.what() << std::endl;
111            return false;
112         }
113         char *data = reinterpret_cast<char*>(message.data());
114         if (message.size() == 17 && data[0] == 0) {
115            push_back(encode_uuid(msg_to_string(message)));
116         } else {
117            push_back(msg_to_string(message));
118         }
119                 boost::int64_t more;
120         size_t more_size = sizeof(more);
121         socket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
122         if (!more) {
123            break;
124         }
125      }
126      return true;
127   }
128
129   bool send(zmq::socket_t & socket) {
130       for (size_t part_nbr = 0; part_nbr < m_part_data.size(); part_nbr++) {
131          zmq::message_t message;
132                  std::string data = m_part_data[part_nbr];
133          if (data.size() == 33 && data [0] == '@') {
134             unsigned char * uuidbin = decode_uuid ((char *) data.c_str());
135             message.rebuild(17);
136             memcpy(message.data(), uuidbin, 17);
137                         delete uuidbin;
138          } else {
139             message.rebuild(data.size());
140             memcpy(message.data(), data.c_str(), data.size());
141          }
142          try {
143                         //dump();
144             socket.send(message, part_nbr < m_part_data.size() - 1 ? ZMQ_SNDMORE : 0);
145          } catch (zmq::error_t error) {
146                          assert(false);
147                          return false;
148          }
149       }
150       clear();
151           return true;
152   }
153
154   size_t parts() {
155      return m_part_data.size();
156   }
157
158   void body_set(const std::string body) {
159      if (m_part_data.size() > 0) {
160         m_part_data.erase(m_part_data.end()-1);
161      }
162      push_back(body);
163   }
164
165
166   boost::optional<std::string> body ()
167   {
168       if (m_part_data.size())
169           return boost::optional<std::string>(m_part_data[m_part_data.size() - 1]);
170       else
171           return boost::optional<std::string>();
172   }
173
174   // zmsg_push
175   void push_front(std::string part) {
176      m_part_data.insert(m_part_data.begin(), part);
177   }
178
179   // zmsg_append
180   void push_back(std::string part) {
181      m_part_data.push_back(part);
182   }
183
184   //  --------------------------------------------------------------------------
185   //  Formats 17-byte UUID as 33-char string starting with '@'
186   //  Lets us print UUIDs as C strings and use them as addresses
187   //
188   static std::string encode_uuid(std::string data) {
189       static char
190           hex_char [] = "0123456789ABCDEF";
191
192       assert(data[0] == 0);
193           std::string uuidstr = std::string(33, ' ');
194       uuidstr[0] = '@';
195       int byte_nbr;
196       for (byte_nbr = 0; byte_nbr < 16; byte_nbr++) {
197           uuidstr[byte_nbr * 2 + 1] = hex_char[(unsigned char)data[byte_nbr + 1] >> 4];
198           uuidstr[byte_nbr * 2 + 2] = hex_char[(unsigned char)data[byte_nbr + 1] & 15];
199       }
200       uuidstr[33] = 0;
201       return uuidstr;
202   }
203
204
205   // --------------------------------------------------------------------------
206   // Formats 17-byte UUID as 33-char string starting with '@'
207   // Lets us print UUIDs as C strings and use them as addresses
208   //
209   static unsigned char *
210   decode_uuid (char *uuidstr)
211   {
212       static char
213           hex_to_bin [128] = {
214              -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, /* */
215              -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, /* */
216              -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, /* */
217               0, 1, 2, 3, 4, 5, 6, 7, 8, 9,-1,-1,-1,-1,-1,-1, /* 0..9 */
218              -1,10,11,12,13,14,15,-1,-1,-1,-1,-1,-1,-1,-1,-1, /* A..F */
219              -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, /* */
220              -1,10,11,12,13,14,15,-1,-1,-1,-1,-1,-1,-1,-1,-1, /* a..f */
221              -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1 }; /* */
222
223       assert (strlen (uuidstr) == 33);
224       assert (uuidstr [0] == '@');
225       unsigned char *data = new unsigned char[17];
226       int byte_nbr;
227       data [0] = 0;
228       for (byte_nbr = 0; byte_nbr < 16; byte_nbr++)
229           data [byte_nbr + 1]
230               = (hex_to_bin [uuidstr [byte_nbr * 2 + 1] & 127] << 4)
231               + (hex_to_bin [uuidstr [byte_nbr * 2 + 2] & 127]);
232       return (data);
233   }
234
235   // zmsg_pop
236   boost::optional<std::string> pop_front() {
237      if (m_part_data.size() == 0) {
238         return boost::optional<std::string>();
239      }
240          std::string part = m_part_data.front();
241      m_part_data.erase(m_part_data.begin());
242      return boost::optional<std::string>(part);
243   }
244
245   void append(std::string part) {
246       push_back(part);
247   }
248
249   boost::optional<std::string> address() {
250      if (m_part_data.size()>0) {
251         return boost::optional<std::string>(m_part_data[0]);
252      } else {
253                  return boost::optional<std::string>();
254      }
255   }
256
257   void wrap(std::string address, boost::optional<std::string> delim) {
258      if (delim)
259         push_front(*delim);
260      push_front(address);
261   }
262   void wrap(std::string address, std::string delim) {
263           wrap(address, opstring_t(delim));
264   }
265   void wrap(std::string address) {
266           wrap(address, opstring_t());
267   }
268
269   boost::optional<std::string> unwrap() {
270      if (m_part_data.size() == 0) {
271         return opstring_t();
272      }
273          opstring_t addr = pop_front();
274      if (address()) {
275         pop_front();
276      }
277      return opstring_t(addr);
278   }
279
280   void dump() {
281      std::cerr << "--------------------------------------" << std::endl;
282      for (unsigned int part_nbr = 0; part_nbr < m_part_data.size(); part_nbr++) {
283                  std::string data = m_part_data [part_nbr];
284
285          // Dump the message as text or binary
286          std::cerr << "[" << std::setw(3) << std::setfill('0') << (int) data.size() << "] ";
287          for (unsigned int char_nbr = 0; char_nbr < data.size(); char_nbr++) {
288                          if (data [char_nbr] < 32 || data [char_nbr] == 127) {
289                                  std::cerr << "0x" << std::hex << std::setw(2) << std::setfill('0') << (short int) data [char_nbr];
290                                  //std::cerr.unsetf(std::hex);
291              } else {
292                                  std::cerr << (char) data [char_nbr];
293              }
294          }
295          std::cerr << std::endl;
296      }
297          std::cerr << "--------------------------------------" << std::endl;
298   }
299
300   static int
301   test(int verbose)
302   {
303      zmq::context_t context(5);
304      zmq::socket_t output(context, ZMQ_XREQ);
305      try {
306         output.bind("ipc://zmsg_selftest.ipc");
307      } catch (zmq::error_t error) {
308         //assert(error.num()!=0);
309      }
310      zmq::socket_t input(context, ZMQ_XREP);
311      try {
312         input.connect("ipc://zmsg_selftest.ipc");
313      } catch (zmq::error_t error) {
314         //assert(error.num()!=0);
315      }
316
317      zmsg zm;
318      zm.body_set((char *)"Hello");
319      assert (*zm.body() == "Hello");
320          std::cout << "body is correct" << std::endl;
321
322      zm.send(output);
323      assert(zm.parts() == 0);
324          std::cout << "sent message" << std::endl;
325
326      zm.recv(input);
327      assert (zm.parts() == 2);
328          std::cout << "received message" << std::endl;
329      if(verbose) {
330         zm.dump();
331      }
332
333      assert (zm.body() && *zm.body() == "Hello");
334
335      zm.clear();
336      zm.body_set("Hello");
337      zm.wrap("address1", "");
338      zm.wrap("address2");
339      assert (zm.parts() == 4);
340      zm.send(output);
341
342      zm.recv(input);
343      if (verbose) {
344         zm.dump();
345      }
346      assert (zm.parts() == 5);
347      assert (zm.address()->size() == 33);
348      zm.unwrap();
349      assert (zm.address() && *zm.address() == "address2");
350      //zm.body_fmt ("%c%s", 'W', "orld");
351      zm.send(output);
352
353      zm.recv (input);
354      zm.unwrap ();
355      assert (zm.parts () == 4);
356      assert (zm.body() && *zm.body() == "World");
357      opstring_t part = zm.unwrap ();
358      assert (part && *part == "address2");
359
360      // Pull off address 1, check that empty part was dropped
361      part = zm.unwrap ();
362      assert (part && *part == "address1");
363      assert (zm.parts () == 1);
364
365      // Check that message body was correctly modified
366      part = zm.pop_front();
367      assert (part && *part == "World");
368      assert (zm.parts () == 0);
369
370      // Check append method
371      zm.append ("Hello");
372      zm.append ("World!");
373      assert (zm.parts() == 2);
374      assert (zm.body() && *zm.body() == "World!");
375
376      zm.clear();
377      assert (zm.parts() == 0);
378
379      std::cout << "OK" << std::endl;
380      return 0;
381   }
382
383private:
384        std::vector<std::string> m_part_data;
385};
386
387struct zxmsg : public zmq::message_t {
388
389        zxmsg(std::string string) : zmq::message_t(string.size()) {
390                memcpy(data(), string.data(), string.size());
391        }
392        static std::string read(zmq::message_t &msg) {
393                return std::string(reinterpret_cast<char*>(msg.data()), msg.size());
394        }
395
396};
397
398#endif /* ZMSG_H_ */
Note: See TracBrowser for help on using the repository browser.