Changeset 2b2e9b8 in nscp


Ignore:
Timestamp:
09/18/11 21:56:36 (21 months ago)
Author:
Michael Medin <michael@…>
Branches:
master, 0.4.0, 0.4.1, 0.4.2
Children:
a629015
Parents:
76d1076
Message:

Another massive commit with a single comment, but essentially this is the first version with a proper distributed message queue this will be the foundation of some pretty cool stuff in the next few weeks/months...

  • Implemented first version of DistributedClient? and DistributedSServer which work so now we have a proper message based transport. Still a lot of rough edges such as cookie and authentication support is hard coded. We also need a proper two way distributed server as well as implement "all" payload types.
Files:
4 added
27 edited

Legend:

Unmodified
Added
Removed
  • CMakeLists.txt

    r7515d00 r2b2e9b8  
    216216if(OPENSSL_FOUND) 
    217217  MESSAGE(STATUS "Found OpenSSL: ${OPENSSL_INCLUDE_DIR}") 
    218   SET(NSCP_GLOBAL_DEFINES ${NSCP_GLOBAL_DEFINES} -DUSE_SSL) 
    219218else(OPENSSL_FOUND) 
    220219  MESSAGE(WARNING " OpenSSL NOT found (no ssl support, ${OPENSSL_INCLUDE_DIR})") 
  • changelog

    r7515d00 r2b2e9b8  
    66 * Fix RtlStringFromGUID problem on NT4 
    77 
     82011-09-18 MickeM 
     9 * Implementd first version of DistributedClient and DistributedSServer which work so now we have a propper messager based transport. 
     10   Still a lot of rough edges such as cookie and authentication support is hardcoded. 
     11   We also need a proper two way distributed server as well as implement "all" payload types. 
    812 
    9132011-09-11 MickeM 
  • include/client/command_line_parser.cpp

    r7515d00 r2b2e9b8  
    162162  std::string buffer, reply; 
    163163  nscapi::functions::create_simple_query_request(config.data->command, config.data->arguments, buffer); 
    164   int ret = config.handler->exec(config.data, buffer, reply); 
     164  int ret = config.handler->query(config.data, buffer, reply); 
    165165  nscapi::functions::parse_simple_query_response(reply, msg, perf); 
    166166  return ret; 
  • include/nscapi/nscapi_plugin_wrapper.hpp

    ra14aa07 r2b2e9b8  
    115115      } 
    116116      void register_command(std::wstring command, std::wstring description) { 
    117         get_core()->registerCommand(get_id(), _T("submit_nscp"), _T("Submit a query to a remote host via NSCP")); 
     117        get_core()->registerCommand(get_id(), command, description); 
    118118      } 
    119119 
  • include/nscp/client/socket.hpp

    r7515d00 r2b2e9b8  
    7878      while (left > 0) { 
    7979        nscp::packet chunk; 
    80         std::vector<char> buf(sizeof(nscp::data::signature_packet)); 
     80        std::vector<char> buf(nscp::length::get_signature_size()); 
    8181        if (!reader->read_and_wait(*socket_, get_socket(), boost::asio::buffer(buf))) { 
    8282          get_socket().close(); 
     
    8686        chunk.read_signature(buf); 
    8787        std::wcout << _T("<<<") << chunk.signature.to_wstring() << std::endl; 
    88         buf.resize(chunk.signature.payload_length); 
     88        buf.resize(nscp::length::get_payload_size(chunk.signature)); 
    8989 
    9090        if (!reader->read_and_wait(*socket_, get_socket(), boost::asio::buffer(buf))) { 
  • include/nscp/handler.cpp

    r7515d00 r2b2e9b8  
    11#include <nscp/handler.hpp> 
    2  
    32#include <nscapi/functions.hpp> 
    4  
    53 
    64std::list<nscp::packet> nscp::handler::process_all(const std::list<nscp::packet> &packets) { 
    75  std::list<nscp::packet> result; 
    86  BOOST_FOREACH(const nscp::packet &packet, packets) { 
    9     std::list<nscp::packet> tmp = process(packet); 
    10     result.insert(result.begin(), tmp.begin(), tmp.end()); 
     7    result.push_back(process(packet)); 
    118  } 
    129  return result; 
     
    1411 
    1512 
    16 std::list<nscp::packet> nscp::handler::process(const nscp::packet &packet) { 
    17   std::list<nscp::packet> result; 
     13nscp::packet nscp::handler::process(const nscp::packet &packet) { 
    1814  Plugin::Common::Header hdr; 
    19  
    2015  if (nscp::checks::is_query_request(packet)) { 
    2116    Plugin::QueryRequestMessage msg; 
     
    2621    try { 
    2722      NSCAPI::nagiosReturn returncode = handle_query_request(packet.payload, msg, reply); 
    28       if (returncode == NSCAPI::returnIgnored) { 
     23      if (returncode == NSCAPI::returnIgnored) 
    2924        nscapi::functions::create_simple_query_response_unknown(command, _T("Command was not found: ") + command, _T(""), reply); 
    30       } 
    3125    } catch (const nscp::nscp_exception &e) { 
    3226      nscapi::functions::create_simple_query_response_unknown(command, _T("Processing error: ") + command + _T(": ") + utf8::cvt<std::wstring>(e.what()), _T(""), reply); 
     
    3428      nscapi::functions::create_simple_query_response_unknown(command, _T("Unknown error processing: ") + command + _T(": ") + utf8::cvt<std::wstring>(e.what()), _T(""), reply); 
    3529    } 
     30    return nscp::factory::create_query_response(reply); 
    3631  } else if (nscp::checks::is_query_response(packet)) { 
    3732    // @todo handle submission here 
    38  
     33    return nscp::factory::create_error(_T("Submissions currently not supported: ") + packet.to_wstring()); 
    3934  } else { 
    4035    this->log_error(__FILE__, __LINE__, _T("Unknown packet: ") + packet.to_wstring()); 
    41     result.push_back(nscp::factory::create_error(_T("Unknown packet: ") + packet.to_wstring())); 
     36    return nscp::factory::create_error(_T("Unknown packet: ") + packet.to_wstring()); 
    4237  } 
    43   return result; 
    4438} 
  • include/nscp/handler.hpp

    r7515d00 r2b2e9b8  
    88 
    99  struct handler : public nscp::server::server_handler { 
    10     virtual std::list<nscp::packet> process(const nscp::packet &packet); 
     10    virtual nscp::packet process(const nscp::packet &packet); 
    1111    virtual std::list<nscp::packet> process_all(const std::list<nscp::packet> &packet); 
    1212 
  • include/nscp/packet.hpp

    r7515d00 r2b2e9b8  
    4949    static const short exec_response = 21; 
    5050 
     51    static const short message_envelope_request = 30; 
     52    static const short message_envelope_response = 31; 
     53 
    5154    static const int nscp_magic_number = 12345; 
    5255    static const short error = 100; 
     
    5457    static const short version_1 = 1; 
    5558 
    56     struct signature_packet { 
     59    struct signature_type; 
     60    struct tcp_signature_data { 
    5761      int16_t   version; 
    5862 
     
    6569      u_int32_t additional_packet_count; 
    6670      u_int32_t magic_number; 
    67  
    68       signature_packet() : magic_number(nscp_magic_number) {} 
    69       signature_packet(const signature_packet &other)  
     71      tcp_signature_data() : magic_number(nscp_magic_number) {} 
     72      tcp_signature_data(const tcp_signature_data &other) 
    7073        : version(other.version) 
    7174        , header_type(other.header_type) 
     
    7679        , additional_packet_count(other.additional_packet_count) 
    7780      {} 
    78       const signature_packet& operator=(const signature_packet &other) { 
     81      const tcp_signature_data& operator=(const tcp_signature_data &other) { 
    7982        version = other.version; 
    8083        header_type = other.header_type; 
     
    8689        return *this; 
    8790      } 
    88  
    89       bool validate() const { 
    90         return magic_number == nscp_magic_number; 
     91      const tcp_signature_data& operator=(const signature_type &other) { 
     92        version = other.version; 
     93        header_type = other.header_type; 
     94        header_length = other.header_length; 
     95        payload_type = other.payload_type; 
     96        payload_length = other.payload_length; 
     97        additional_packet_count = other.additional_packet_count; 
     98        magic_number = other.magic_number; 
     99        return *this; 
    91100      } 
    92101 
     
    99108          << _T(", payload: ") << payload_type 
    100109          << _T(", ") << payload_length 
    101           << _T(", count: ") << additional_packet_count ; 
     110          << _T(", count: ") << additional_packet_count; 
    102111        return ss.str(); 
    103112      } 
     
    110119          << ", payload: " << payload_type 
    111120          << ", " << payload_length 
    112           << ", count: " << additional_packet_count ; 
     121          << ", count: " << additional_packet_count; 
    113122        return ss.str(); 
    114123      } 
    115124    }; 
     125    struct signature_type : public tcp_signature_data { 
     126      std::string cookie; 
     127 
     128      signature_type() {} 
     129      signature_type(const signature_type &other) : tcp_signature_data(other), cookie(other.cookie)  {} 
     130      signature_type(const tcp_signature_data &other) : tcp_signature_data(other) {} 
     131      const signature_type& operator=(const signature_type &other) { 
     132        tcp_signature_data::operator =(other); 
     133        cookie = other.cookie; 
     134        return *this; 
     135      } 
     136      const signature_type& operator=(const tcp_signature_data &other) { 
     137        tcp_signature_data::operator =(other); 
     138        return *this; 
     139      } 
     140 
     141      bool validate() const { 
     142        return magic_number == nscp_magic_number; 
     143      } 
     144 
     145      std::wstring to_wstring() const { 
     146        std::wstringstream ss; 
     147        ss << _T("base: {") << tcp_signature_data::to_wstring() << _T("}"); 
     148        return ss.str(); 
     149      } 
     150      std::string to_string() const { 
     151        std::stringstream ss; 
     152        ss << "base: {" << tcp_signature_data::to_string() << "}, cookie: " << cookie ; 
     153        return ss.str(); 
     154      } 
     155    }; 
     156 
     157 
    116158  }; 
    117159  struct length { 
    118     static unsigned long get_signature_size() { 
    119       return sizeof(data::signature_packet); 
    120     } 
    121     static unsigned long get_header_size(const data::signature_packet &signature) { 
     160    static unsigned long long get_signature_size() { 
     161      return sizeof(data::tcp_signature_data); 
     162    } 
     163    static unsigned long long get_header_size(const data::tcp_signature_data &signature) { 
    122164      return signature.header_length*sizeof(char); 
    123165    } 
    124     static unsigned long get_payload_size(const data::signature_packet &signature) { 
     166    static unsigned long long get_payload_size(const data::tcp_signature_data &signature) { 
    125167      return signature.payload_length*sizeof(char); 
    126168    } 
     
    141183 
    142184  struct packet { 
    143     nscp::data::signature_packet signature; 
     185    nscp::data::signature_type signature; 
    144186    std::string header; 
    145187    std::string payload; 
    146188 
    147189    packet() {} 
    148     packet(const nscp::data::signature_packet &sig) : signature(sig) {} 
    149     packet(const nscp::data::signature_packet &sig, std::string header, std::string payload) : signature(sig), header(header), payload(payload) {} 
     190    packet(const nscp::data::tcp_signature_data &sig) : signature(sig) {} 
     191    packet(const nscp::data::signature_type &sig, std::string header, std::string payload) : signature(sig), header(header), payload(payload) {} 
    150192    packet(const packet & other) : signature(other.signature), header(other.header), payload(other.payload) {} 
    151193    const packet& operator=(const packet & other) { 
     
    165207      return ret; 
    166208    } 
     209    std::string write_signature() const { 
     210      std::string buffer; 
     211      write_signature(buffer); 
     212      return buffer; 
     213    } 
     214    std::string write_msg_signature() const { 
     215      std::string buffer; 
     216      write_msg_signature(buffer); 
     217      return buffer; 
     218    } 
     219    void write_msg_signature(std::string &buffer) const { 
     220      NSCPIPC::Signature sig; 
     221      sig.set_header_type(signature.header_type); 
     222      sig.set_payload_type(signature.payload_type); 
     223      sig.set_version(NSCPIPC::Common_Version_VERSION_1); 
     224      sig.set_cookie(signature.cookie); 
     225      sig.AppendToString(&buffer); 
     226    } 
    167227    void write_signature(std::string &buffer) const { 
    168       // @todo: Optimize this away once this is working 
    169       char * tmpbuffer = new char[length::get_signature_size()+1]; 
    170       nscp::data::signature_packet *tmp = reinterpret_cast<nscp::data::signature_packet*>(tmpbuffer); 
    171       *tmp = signature; 
    172       buffer.append(tmpbuffer, length::get_signature_size()); 
    173       delete [] tmpbuffer; 
     228      nscp::data::tcp_signature_data data = signature; 
     229      buffer.append(reinterpret_cast<char*>(&data), length::get_signature_size()); 
     230    } 
     231    std::string write_header() const { 
     232      std::string buffer; 
     233      write_header(buffer); 
     234      return buffer; 
    174235    } 
    175236    inline void write_header(std::string &buffer) const { 
     
    177238        buffer.insert(buffer.end(), header.begin(), header.end()); 
    178239    } 
     240    std::string write_payload() const { 
     241      std::string buffer; 
     242      write_payload(buffer); 
     243      return buffer; 
     244    } 
    179245    inline void write_payload(std::string &buffer) const { 
    180246      if (!payload.empty()) 
     
    182248    } 
    183249 
     250    bool is_signature_valid() { 
     251      return signature.magic_number == nscp::data::nscp_magic_number; 
     252    } 
     253 
    184254    ////////////////////////////////////////////////////////////////////////// 
    185255    // Read from vector (string?) 
     256 
     257    void read_msg_signature(std::string &string) { 
     258      NSCPIPC::Signature sig; 
     259      sig.ParseFromString(string); 
     260      signature.header_length = 0; 
     261      signature.payload_length = 0; 
     262      signature.cookie = sig.cookie(); 
     263      signature.header_type = sig.header_type(); 
     264      signature.payload_type = sig.payload_type(); 
     265      signature.magic_number = nscp::data::nscp_magic_number; 
     266      if (sig.version() == NSCPIPC::Common_Version_VERSION_1) 
     267        signature.version = nscp::data::version_1; 
     268      signature.additional_packet_count = 0; 
     269    } 
     270 
    186271    void read_signature(std::vector<char> &buf) { 
    187272      assert(buf.size() >= nscp::length::get_signature_size()); 
    188       // @todo: Optimize this away once this is working 
    189       nscp::data::signature_packet *tmp = reinterpret_cast<nscp::data::signature_packet*>(&(*buf.begin())); 
    190       signature = *tmp; 
    191       signature.payload_type = tmp->payload_type; 
    192       signature.payload_length = tmp->payload_length; 
     273      read_signature(reinterpret_cast<nscp::data::tcp_signature_data*>(&(*buf.begin()))); 
     274    } 
     275    void read_signature(std::string &string) { 
     276      assert(string.size() >= nscp::length::get_signature_size()); 
     277      read_signature(reinterpret_cast<nscp::data::tcp_signature_data*>(&(*string.begin()))); 
    193278    } 
    194279    void read_signature(std::string::iterator begin, std::string::iterator end) { 
    195280      assert(end-begin >= nscp::length::get_signature_size()); 
    196       // @todo: Optimize this away once this is working 
    197       nscp::data::signature_packet *tmp = reinterpret_cast<nscp::data::signature_packet*>(&(*begin)); 
    198       signature = *tmp; 
    199       signature.payload_type = tmp->payload_type; 
    200       signature.payload_length = tmp->payload_length; 
     281      read_signature(reinterpret_cast<nscp::data::tcp_signature_data*>(&(*begin))); 
     282    } 
     283    void read_signature(nscp::data::signature_type *sig) { 
     284      signature = *sig; 
     285    } 
     286    void read_signature(nscp::data::tcp_signature_data *sig) { 
     287      signature = *sig; 
    201288    } 
    202289    void nibble_signature(std::string &buf) { 
    203       assert(buf.size() >= nscp::length::get_signature_size()); 
    204       // @todo: Optimize this away once this is working 
    205       nscp::data::signature_packet *tmp = reinterpret_cast<nscp::data::signature_packet*>(&(*buf.begin())); 
    206       signature = *tmp; 
    207       signature.payload_type = tmp->payload_type; 
    208       signature.payload_length = tmp->payload_length; 
     290      read_signature(buf); 
    209291      buf.erase(buf.begin(), buf.begin()+nscp::length::get_signature_size()); 
     292    } 
     293    void read_header(std::string &string) { 
     294      read_header(string.begin(), string.end()); 
    210295    } 
    211296    void read_header(std::vector<char> &buf) { 
     
    219304      header = std::string(buf.begin(), buf.begin()+nscp::length::get_header_size(signature)); 
    220305      buf.erase(buf.begin(), buf.begin()+nscp::length::get_header_size(signature)); 
     306    } 
     307    void read_payload(std::string &string) { 
     308      read_payload(string.begin(), string.end()); 
    221309    } 
    222310    void read_payload(std::vector<char> &buf) { 
     
    255343 
    256344 
    257     static nscp::data::signature_packet create_simple_sig(int payload_type, std::string::size_type size) { 
    258       nscp::data::signature_packet signature; 
     345    static nscp::data::signature_type create_simple_sig(int payload_type, std::string::size_type size) { 
     346      nscp::data::signature_type signature; 
    259347      signature.header_length = 0; 
    260348      signature.header_type = 0; 
     
    268356 
    269357    } 
     358    static nscp::data::signature_type create_sig(int payload_type, std::string::size_type header_size, std::string::size_type payload_size, unsigned long additional_packets = 0) { 
     359      nscp::data::signature_type signature; 
     360      signature.header_length = header_size; 
     361      signature.header_type = 0; 
     362 
     363      signature.additional_packet_count = additional_packets; 
     364      signature.version = nscp::data::version_1; 
     365 
     366      signature.payload_length = payload_size; 
     367      signature.payload_type = payload_type; 
     368      return signature; 
     369 
     370    } 
    270371    static packet create_payload(unsigned long payload_type, std::string buffer, unsigned long additional_packets = 0) { 
    271       nscp::data::signature_packet signature; 
     372      nscp::data::signature_type signature; 
    272373      signature.header_length = 0; 
    273374      signature.header_type = 0; 
     
    286387    } 
    287388 
    288     static packet create_envelope_request(unsigned long additionl_packets) { 
    289       nscp::data::signature_packet signature; 
    290       signature.header_length = 0; 
    291       signature.header_type = 0; 
    292  
    293       signature.additional_packet_count = additionl_packets; 
    294       signature.version = nscp::data::version_1; 
    295  
     389    static packet create_message_envelope_request(unsigned long additional_packets) { 
     390      std::string buffer; 
     391      NSCPIPC::MessageRequestEnvelope request_envelope; 
     392      request_envelope.mutable_envelope()->set_version(NSCPIPC::Common_Version_VERSION_1); 
     393      request_envelope.mutable_envelope()->set_max_supported_version(NSCPIPC::Common_Version_VERSION_1); 
     394      // @todo: set authentication stuff here 
     395      request_envelope.SerializeToString(&buffer); 
     396      return packet(create_sig(nscp::data::message_envelope_request, 0, buffer.size(), additional_packets), "", buffer); 
     397    } 
     398    static packet create_message_envelope_response(std::string cookie, int sequence) { 
     399      std::string buffer; 
     400      NSCPIPC::MessageResponseEnvelope envelope; 
     401      envelope.mutable_envelope()->set_version(NSCPIPC::Common_Version_VERSION_1); 
     402      envelope.mutable_envelope()->set_max_supported_version(NSCPIPC::Common_Version_VERSION_1); 
     403      envelope.set_cookie(cookie); 
     404      envelope.set_sequence(sequence); 
     405      envelope.SerializeToString(&buffer); 
     406      return packet(create_sig(nscp::data::message_envelope_response, 0, buffer.size(), 0), "", buffer); 
     407    } 
     408 
     409 
     410    static packet create_envelope_request(unsigned long additional_packets) { 
    296411      std::string buffer; 
    297412      NSCPIPC::RequestEnvelope request_envelope; 
     
    299414      request_envelope.set_max_supported_version(NSCPIPC::Common_Version_VERSION_1); 
    300415      request_envelope.SerializeToString(&buffer); 
    301  
    302       signature.payload_length = buffer.size(); 
    303       signature.payload_type = nscp::data::envelope_request; 
    304  
    305       return packet(signature, "", buffer); 
    306     } 
    307  
    308     static packet create_envelope_response(unsigned long additionl_packets) { 
    309       nscp::data::signature_packet signature; 
    310       signature.header_length = 0; 
    311       signature.header_type = 0; 
    312  
    313       signature.additional_packet_count = additionl_packets; 
    314       signature.version = nscp::data::version_1; 
    315  
     416      return packet(create_sig(nscp::data::envelope_request, 0, buffer.size(), additional_packets), "", buffer); 
     417    } 
     418 
     419    static packet create_envelope_response(unsigned long additional_packets) { 
    316420      std::string buffer; 
    317421      NSCPIPC::RequestEnvelope request_envelope; 
     
    319423      request_envelope.set_max_supported_version(NSCPIPC::Common_Version_VERSION_1); 
    320424      request_envelope.SerializeToString(&buffer); 
    321  
    322       signature.payload_length = buffer.size(); 
    323       signature.payload_type = nscp::data::envelope_response; 
    324  
    325       return packet(signature, "", buffer); 
     425      return packet(create_sig(nscp::data::envelope_response, 0, buffer.size(), additional_packets), "", buffer); 
    326426    } 
    327427 
    328428    static packet create_error(std::wstring msg) { 
    329       nscp::data::signature_packet signature; 
    330       signature.header_length = 0; 
    331       signature.header_type = 0; 
    332  
    333       signature.additional_packet_count = 0; 
    334       signature.version = nscp::data::version_1; 
    335  
    336429      std::string buffer; 
    337430      NSCPIPC::ErrorMessage message; 
     
    340433      error->set_message(utf8::cvt<std::string>(msg)); 
    341434      message.SerializeToString(&buffer); 
    342  
    343       signature.payload_length = buffer.size(); 
    344       signature.payload_type = nscp::data::error; 
    345  
    346       return packet(signature, "", buffer); 
     435      return packet(create_sig(nscp::data::error, 0, buffer.size()), "", buffer); 
    347436    } 
    348437  }; 
     
    355444      return packet.signature.payload_type == nscp::data::envelope_response; 
    356445    } 
     446    static bool is_message_envelope_request(const nscp::packet &packet) { 
     447      return packet.signature.payload_type == nscp::data::message_envelope_request; 
     448    } 
     449    static bool is_message_envelope_response(const nscp::packet &packet) { 
     450      return packet.signature.payload_type == nscp::data::message_envelope_response; 
     451    } 
    357452    static bool is_query_request(const nscp::packet &packet) { 
    358453      return packet.signature.payload_type == nscp::data::command_request; 
  • include/nscp/server/connection.cpp

    r7515d00 r2b2e9b8  
    121121        envelope.ParseFromString(packet.payload); 
    122122      } else { 
    123         std::list<nscp::packet> result = handler_->process(packet); 
    124         outbound_queue_.insert(outbound_queue_.end(), result.begin(), result.end()); 
     123        outbound_queue_.push_back(handler_->process(packet)); 
    125124      } 
    126125      return boost::make_tuple(sig.additional_packet_count > 0, process_helper(&nscp::server::parser::digest_signature, &connection::process_signature)); 
  • include/nscp/server/connection.hpp

    r7515d00 r2b2e9b8  
    7373      boost::shared_ptr<nscp::server::server_handler> handler_; 
    7474 
    75       nscp::data::signature_packet sig; 
     75      nscp::data::tcp_signature_data sig; 
    7676 
    7777      /// Buffer for incoming data. 
  • include/nscp/server/handler.hpp

    r7515d00 r2b2e9b8  
    1414    public: 
    1515      server_handler() {} 
    16       virtual std::list<nscp::packet> process(const nscp::packet &packet) = 0; 
     16      virtual nscp::packet process(const nscp::packet &packet) = 0; 
    1717      virtual std::list<nscp::packet> process_all(const std::list<nscp::packet> &packet) = 0; 
    1818 
  • include/nscp/server/parser.hpp

    r7515d00 r2b2e9b8  
    3131 
    3232      template <typename InputIterator> 
    33       InputIterator digest_header(InputIterator begin, InputIterator end, const nscp::data::signature_packet &signature) { 
     33      InputIterator digest_header(InputIterator begin, InputIterator end, const nscp::data::tcp_signature_data &signature) { 
    3434        return digest_anything(begin, end, nscp::length::get_header_size(signature)); 
    3535      } 
    3636 
    37       boost::tuple<bool, char*> digest_payload(char* begin, char* end, const nscp::data::signature_packet &signature) { 
     37      boost::tuple<bool, char*> digest_payload(char* begin, char* end, const nscp::data::tcp_signature_data &signature) { 
    3838        return digest_anything(begin, end, nscp::length::get_payload_size(signature)); 
    3939      } 
    4040 
    41       nscp::data::signature_packet parse_signature() { 
    42         assert(buffer_.size() >= sizeof(nscp::data::signature_packet)); 
    43         nscp::data::signature_packet *tmp = reinterpret_cast<nscp::data::signature_packet*>(&(*buffer_.begin())); 
    44         nscp::data::signature_packet signature = *tmp; 
     41      nscp::data::tcp_signature_data parse_signature() { 
     42        assert(buffer_.size() >= nscp::length::get_signature_size()); 
     43        nscp::data::tcp_signature_data *tmp = reinterpret_cast<nscp::data::tcp_signature_data*>(&(*buffer_.begin())); 
     44        nscp::data::tcp_signature_data signature = *tmp; 
    4545        buffer_.clear(); 
    4646        return signature; 
    4747      } 
    48       void parse_header(const nscp::data::signature_packet &signature) { 
     48      void parse_header(const nscp::data::tcp_signature_data &signature) { 
    4949        unsigned long wanted = nscp::length::get_header_size(signature); 
    5050        if (wanted == 0) 
  • include/strEx.h

    r7515d00 r2b2e9b8  
    282282      } 
    283283    } 
     284    ss << chars; 
    284285    return ss.str(); 
    285286  } 
     
    304305        chars += buf[i]; 
    305306    } 
    306     return ss.str(); 
     307    ss << chars; 
     308    return ss.str(); 
     309  } 
     310  inline std::string format_buffer(std::string buf) { 
     311    return format_buffer(buf.c_str(), buf.size()); 
    307312  } 
    308313  inline std::string format_buffer(const std::vector<char> &buf) { 
     
    326331        chars += buf[i]; 
    327332    } 
     333    ss << chars; 
    328334    return ss.str(); 
    329335  } 
  • include/zmsg.hpp

    r7515d00 r2b2e9b8  
    8888 
    8989   void set_part(size_t part_nbr, std::string data) { 
    90        if (part_nbr < m_part_data.size() && part_nbr >= 0) { 
    91            m_part_data[part_nbr] = data; 
    92        } 
     90     if (part_nbr < m_part_data.size() && part_nbr >= 0) { 
     91       m_part_data[part_nbr] = data; 
     92     } 
     93   } 
     94   std::string get_part(size_t part_nbr) { 
     95     return m_part_data[part_nbr]; 
    9396   } 
    9497 
     
    109112         } 
    110113         char *data = reinterpret_cast<char*>(message.data()); 
    111          //std::cerr << "recv: \"" << (unsigned char*) message.data() << "\", size " << message.size() << std::endl; 
    112114         if (message.size() == 17 && data[0] == 0) { 
    113115            push_back(encode_uuid(msg_to_string(message))); 
     
    125127   } 
    126128 
    127    void send(zmq::socket_t & socket) { 
     129   bool send(zmq::socket_t & socket) { 
    128130       for (size_t part_nbr = 0; part_nbr < m_part_data.size(); part_nbr++) { 
    129131          zmq::message_t message; 
     
    133135             message.rebuild(17); 
    134136             memcpy(message.data(), uuidbin, 17); 
    135             delete uuidbin; 
     137      delete uuidbin; 
    136138          } else { 
    137139             message.rebuild(data.size()); 
     
    139141          } 
    140142          try { 
    141         //dump(); 
     143       //dump(); 
    142144             socket.send(message, part_nbr < m_part_data.size() - 1 ? ZMQ_SNDMORE : 0); 
    143145          } catch (zmq::error_t error) { 
    144              //assert(error.num()!=0); 
     146        assert(false); 
     147        return false; 
    145148          } 
    146149       } 
    147150       clear(); 
     151     return true; 
    148152   } 
    149153 
     
    187191 
    188192       assert(data[0] == 0); 
    189      std::string uuidstr = std::string(34, ' '); 
     193     std::string uuidstr = std::string(33, ' '); 
    190194       uuidstr[0] = '@'; 
    191195       int byte_nbr; 
    192196       for (byte_nbr = 0; byte_nbr < 16; byte_nbr++) { 
    193            uuidstr[byte_nbr * 2 + 1] = hex_char[data[byte_nbr + 1] >> 4]; 
    194            uuidstr[byte_nbr * 2 + 2] = hex_char[data[byte_nbr + 1] & 15]; 
     197           uuidstr[byte_nbr * 2 + 1] = hex_char[(unsigned char)data[byte_nbr + 1] >> 4]; 
     198           uuidstr[byte_nbr * 2 + 2] = hex_char[(unsigned char)data[byte_nbr + 1] & 15]; 
    195199       } 
    196200       uuidstr[33] = 0; 
     
    226230               = (hex_to_bin [uuidstr [byte_nbr * 2 + 1] & 127] << 4) 
    227231               + (hex_to_bin [uuidstr [byte_nbr * 2 + 2] & 127]); 
    228  
    229232       return (data); 
    230233   } 
     
    281284 
    282285          // Dump the message as text or binary 
    283           int is_text = 1; 
    284           for (unsigned int char_nbr = 0; char_nbr < data.size(); char_nbr++) 
    285               if (data [char_nbr] < 32 || data [char_nbr] > 127) 
    286                   is_text = 0; 
    287  
    288286          std::cerr << "[" << std::setw(3) << std::setfill('0') << (int) data.size() << "] "; 
    289287          for (unsigned int char_nbr = 0; char_nbr < data.size(); char_nbr++) { 
    290               if (is_text) { 
    291                   std::cerr << (char) data [char_nbr]; 
     288        if (data [char_nbr] < 32 || data [char_nbr] == 127) { 
     289          std::cerr << "0x" << std::hex << std::setw(2) << std::setfill('0') << (short int) data [char_nbr]; 
     290          //std::cerr.unsetf(std::hex); 
    292291              } else { 
    293                   std::cerr << std::hex << std::setw(2) << std::setfill('0') << (short int) data [char_nbr]; 
     292          std::cerr << (char) data [char_nbr]; 
    294293              } 
    295294          } 
     
    386385}; 
    387386 
     387struct zxmsg : public zmq::message_t { 
     388 
     389  zxmsg(std::string string) : zmq::message_t(string.size()) { 
     390    memcpy(data(), string.data(), string.size()); 
     391  } 
     392  static std::string read(zmq::message_t &msg) { 
     393    return std::string(reinterpret_cast<char*>(msg.data()), msg.size()); 
     394  } 
     395 
     396}; 
     397 
    388398#endif /* ZMSG_H_ */ 
  • libs/protobuf/CMakeLists.txt

    rb38e845 r2b2e9b8  
    1414) 
    1515 
    16 add_library(${TARGET} ${SRCS}) 
     16ADD_LIBRARY(${TARGET} ${SRCS}) 
    1717SET_TARGET_PROPERTIES(${TARGET} PROPERTIES FOLDER "libraries") 
    1818 
  • libs/protobuf/ipc.proto

    r7515d00 r2b2e9b8  
    11package NSCPIPC; 
     2 
     3option optimize_for = LITE_RUNTIME; 
    24 
    35message Common { 
     
    1921  required int32 header_type = 2; 
    2022  required int32 payload_type = 3; 
    21   required int32 additional_packet_count = 4; 
     23  required string cookie = 4; 
     24}; 
     25message MessageSignature { 
     26  required Signature signature = 1; 
     27  required string cookie = 2; 
     28  optional int64 sequence = 3; 
    2229}; 
    2330 
     
    5764}; 
    5865 
     66message MessageRequestEnvelope { 
     67  required RequestEnvelope envelope = 1; 
     68  optional string username = 2; 
     69  optional string password = 3; 
     70  optional int64 sequence = 4; 
     71}; 
     72 
     73message MessageResponseEnvelope { 
     74  required RequestEnvelope envelope = 1; 
     75  required string cookie = 2; 
     76  optional int64 sequence = 3; 
     77} 
     78 
     79 
     80 
    5981message ErrorMessage { 
    6082 
  • modules/DistributedServer/CMakeLists.txt

    r76d1076 r2b2e9b8  
    11cmake_minimum_required(VERSION 2.6) 
    22 
    3 SET(TARGET ZeroMQServer) 
     3SET(TARGET DistributedServer) 
    44   
    55PROJECT(${TARGET}) 
     
    2222  SET(SRCS ${SRCS} 
    2323    stdafx.h 
    24     "${TARGET}.h" 
    25     "${TARGET}.def" 
    26     "handler_impl.hpp" 
     24    ${TARGET}.h 
     25    ${TARGET}.def 
     26    handler_impl.hpp 
     27    queue_manager.hpp 
     28    worker_manager.hpp 
    2729    ${NSCP_INCLUDEDIR}/nscp/packet.hpp 
    2830    ${NSCP_INCLUDEDIR}/nscp/handler.hpp 
     
    3638ADD_DEFINITIONS(${NSCP_GLOBAL_DEFINES}) 
    3739ADD_LIBRARY(${TARGET} MODULE ${SRCS}) 
    38 INCLUDE_DIRECTORIES(${OPENSSL_INCLUDE_DIR}) 
    3940INCLUDE_DIRECTORIES(${ZEROMQ_INCLUDE_DIR}) 
    4041 
     
    4243  ${Boost_FILESYSTEM_LIBRARY} 
    4344  ${NSCP_DEF_PLUGIN_LIB} 
    44   ${OPENSSL_LIBRARIES} 
    4545  ${ZEROMQ_LIBRARY} 
    4646  ${EXTRA_LIB} 
  • modules/DistributedServer/DistributedServer.cpp

    r76d1076 r2b2e9b8  
    2121#include "stdafx.h" 
    2222 
    23 #include <zmq.h> 
    24 #include <zmq.hpp> 
    25  
    26 #include "ZeroMQServer.h" 
     23#include "DistributedServer.h" 
    2724#include <strEx.h> 
    28 #include <time.h> 
    29 #include <queue> 
    30 #include <list> 
    31 #include <nscp/packet.hpp> 
    32 #include <boost/thread.hpp> 
    3325 
    3426#include <config.h> 
    3527#include "handler_impl.hpp" 
     28#include "queue_manager.hpp" 
     29#include "worker_manager.hpp" 
    3630 
    3731#include <settings/client/settings_client.hpp> 
    3832 
    39  
    40  
    41 // 
    42 //  Asynchronous client-to-server (DEALER to ROUTER) 
    43 // 
    44 //  While this example runs in a single process, that is just to make 
    45 //  it easier to start and stop the example. Each task has its own 
    46 //  context and conceptually acts as a separate process. 
    47  
    48 #include <zmq.hpp> 
    49 #include <zmsg.hpp> 
    50  
    51  
    52 struct zxmsg : public zmq::message_t { 
    53  
    54   zxmsg(std::string string) : zmq::message_t(string.size()) { 
    55     memcpy(data(), string.data(), string.size()); 
    56   } 
    57   static std::string read(zmq::message_t &msg) { 
    58     return std::string(reinterpret_cast<char*>(msg.data()), msg.size()); 
    59   } 
    60  
    61 }; 
    62  
    63 void start_queue(zmq::context_t *context, boost::thread_group *threads, int my_thread_count); 
    64  
    65 void queue_thread(zmq::context_t *context) { 
    66  
    67     zmq::socket_t frontend(*context, ZMQ_XREP); 
    68   zmq::socket_t backend(*context, ZMQ_XREP); 
    69   zmq::socket_t control(*context, ZMQ_REP); 
    70     frontend.bind("tcp://*:5555"); 
    71   backend.bind("inproc://backend"); 
    72   control.bind("inproc://control"); 
    73  
    74   std::queue<std::string> worker_queue; 
    75  
    76   try { 
    77     while (1) { 
    78  
    79       zmq::pollitem_t items [] = { 
    80         { backend,  0, ZMQ_POLLIN, 0 }, 
    81         { control,  0, ZMQ_POLLIN, 0 }, 
    82         { frontend, 0, ZMQ_POLLIN, 0 } 
    83       }; 
    84       //  Poll frontend only if we have available workers 
    85       if (worker_queue.size()) 
    86         zmq::poll(items, 3, -1); 
    87       else 
    88         zmq::poll(items, 2, -1); 
    89  
    90       //  Handle worker activity on backend 
    91       if (items[0].revents & ZMQ_POLLIN) { 
    92         NSC_DEBUG_MSG_STD(_T("---got worker msg---")); 
    93         zmsg zm; 
    94         if (!zm.recv(backend)) { 
    95           NSC_DEBUG_MSG_STD(_T("---failed to read---")); 
    96           return; 
    97         } 
    98  
    99         //  Use worker address for LRU routing 
    100         //assert (worker_queue.size() < worker_count); 
    101  
    102         zmsg::opstring_t s = zm.unwrap(); 
    103         if (s) 
    104           worker_queue.push(*s); 
    105  
    106         //  Return reply to client if it's not a READY 
    107         zmsg::opstring_t a = zm.address(); 
    108         if (a && *a == "READY") 
    109           zm.clear(); 
    110         else 
    111           zm.send(frontend); 
    112       } 
    113       if (items[1].revents & ZMQ_POLLIN) { 
    114         NSC_DEBUG_MSG_STD(_T("---got control msg---")); 
    115         zmq::message_t query; 
    116         if (!control.recv(&query)) { 
    117           NSC_DEBUG_MSG_STD(_T("---failed to read---")); 
    118           return; 
    119         } 
    120         std::string command = zxmsg::read(query); 
    121         NSC_DEBUG_MSG_STD(_T("=>") + to_wstring(command)); 
    122         zxmsg resp("OK"); 
    123         control.send(resp); 
    124       } 
    125       if (items[2].revents & ZMQ_POLLIN) { 
    126         NSC_DEBUG_MSG_STD(_T("---got client msg---")); 
    127         //  Now get next client request, route to next worker 
    128  
    129         zmsg zm; 
    130         if (!zm.recv(frontend)) { 
    131           NSC_DEBUG_MSG_STD(_T("---failed to read---")); 
    132           return; 
    133         } 
    134  
    135         //  REQ socket in worker needs an envelope delimiter 
    136         zm.wrap(worker_queue.front(), ""); 
    137         zm.send(backend); 
    138  
    139         //  Dequeue and drop the next worker address 
    140         worker_queue.pop(); 
    141       } 
    142     } 
    143   } catch (zmq::error_t &e) { 
    144     NSC_LOG_ERROR(_T("Failed in process: ") + to_wstring(e.what())); 
    145   } 
    146 } 
    147 int worker_thread(zmq::context_t *context, std::string foo); 
    148  
    149 void start_queue(zmq::context_t *context, boost::thread_group &threads, int my_thread_count) { 
    150   boost::thread *t = new boost::thread(queue_thread, context); 
    151   threads.add_thread(t); 
    152  
    153   zmq::socket_t control(*context, ZMQ_REQ); 
    154   for (int i=0;i<10;i++) { 
    155     try { 
    156       control.connect("inproc://control"); 
    157       NSC_LOG_ERROR_STD(_T("connected to control...")); 
    158       zxmsg msg("start"); 
    159       if (!control.send(msg)) { 
    160         NSC_LOG_ERROR_STD(_T("Failed to send start")); 
    161         continue; 
    162       } 
    163       break; 
    164     } catch (zmq::error_t &e) { 
    165       NSC_LOG_ERROR_STD(_T("control: ") + utf8::cvt<std::wstring>(e.what())); 
    166     } 
    167     boost::this_thread::sleep(boost::posix_time::seconds(1)); 
    168   } 
    169   zmq::message_t resp; 
    170   control.recv(&resp); 
    171   NSC_DEBUG_MSG(_T("Got: ") + to_wstring(zxmsg::read(resp))); 
    172  
    173  
    174   for (std::size_t i = 0; i < my_thread_count; ++i) { 
    175     boost::thread *t = new boost::thread(worker_thread, context, "wt_" + to_string(i)); 
    176     threads.add_thread(t); 
    177   } 
    178 } 
    179  
    180 void stop_queue(zmq::context_t *context) { 
    181   zmq::socket_t control(*context, ZMQ_REQ); 
    182   for (int i=0;i<10;i++) { 
    183     try { 
    184       control.connect("inproc://control"); 
    185       NSC_LOG_ERROR_STD(_T("connected to control...")); 
    186       zxmsg msg("stop"); 
    187       if (!control.send(msg)) { 
    188         NSC_LOG_ERROR_STD(_T("Failed to send start")); 
    189         continue; 
    190       } 
    191       break; 
    192     } catch (zmq::error_t &e) { 
    193       NSC_LOG_ERROR_STD(_T("control: ") + utf8::cvt<std::wstring>(e.what())); 
    194     } 
    195     boost::this_thread::sleep(boost::posix_time::seconds(1)); 
    196   } 
    197 } 
    198  
    199 int worker_count = 0; 
    200 int worker_thread(zmq::context_t *context, std::string key) { 
    201   //boost::shared_ptr<nscp::handler> handler = new handler_impl(); 
    202   try { 
    203     //zmq::context_t context(1); 
    204     zmq::socket_t worker(*context, ZMQ_REQ); 
    205  
    206     //  Set random identity to make tracing easier 
    207     std::string identity = key; 
    208     worker.connect("inproc://backend"); 
    209  
    210     //  Tell queue we're ready for work 
    211     NSC_DEBUG_MSG_STD(_T("I: (") + to_wstring(identity) + _T(") worker ready")); 
    212     zxmsg msg("READY"); 
    213     if (!worker.send(msg)) { 
    214       NSC_DEBUG_MSG_STD(_T("FAIELD TO SEND!!!!!!")); 
    215     } 
    216  
    217     while (1) { 
    218       zmsg zm; 
    219       if (!zm.recv(worker)) { 
    220         NSC_DEBUG_MSG_STD(_T("Failed to read in (") + to_wstring(identity) + _T(")")); 
    221         return 0; 
    222       } 
    223       NSC_DEBUG_MSG_STD(_T("I: (") + to_wstring(identity) + _T(") from: ") + to_wstring(*zm.address())); 
    224       NSC_DEBUG_MSG_STD(_T("I: (") + to_wstring(identity) + _T(") sent: ") + to_wstring(*zm.body())); 
    225       //sleep (1);              //  Do some heavy work 
    226       zm.send(worker); 
    227     } 
    228   } catch (const zmq::error_t &e) { 
    229     NSC_LOG_ERROR_STD(utf8::cvt<std::wstring>(e.what())); 
    230   } 
    231   return 0; 
    232 } 
    233 ////////////////////////////////////////////////////////////////////////// 
    234  
    235 static zmq::socket_t * s_client_socket(zmq::context_t* context) { 
    236   NSC_DEBUG_MSG(_T("I: (re)connecting to server")); 
    237   zmq::socket_t *client = new zmq::socket_t(*context, ZMQ_REQ); 
    238   client->connect ("tcp://localhost:5555"); 
    239  
    240   //  Configure socket to not wait at close time 
    241   //int linger = 0; 
    242   //client->setsockopt(ZMQ_LINGER, &linger, sizeof (linger)); 
    243   return client; 
    244 } 
    245  
    246 int start_client (zmq::context_t *context, int request_retries, int timeout) { 
    247   zmq::socket_t * client = s_client_socket(context); 
    248  
    249   int sequence = 0; 
    250   int retries_left = request_retries; 
    251  
    252   while (retries_left) { 
    253     std::stringstream request; 
    254     request << ++sequence; 
    255     zxmsg msg(request.str()); 
    256     client->send(msg); 
    257     //sleep (1); 
    258  
    259     bool expect_reply = true; 
    260     while (expect_reply) { 
    261       //  Poll socket for a reply, with timeout 
    262       zmq::pollitem_t items[] = { { *client, 0, ZMQ_POLLIN, 0 } }; 
    263       zmq::poll (&items[0], 1, timeout * 1000); 
    264  
    265       //  If we got a reply, process it 
    266       if (items[0].revents & ZMQ_POLLIN) { 
    267         //  We got a reply from the server, must match sequence 
    268         zmq::message_t msg; 
    269         client->recv(&msg); 
    270         std::string reply(reinterpret_cast<char*>(msg.data()), msg.size()); 
    271         if (atoi (reply.c_str ()) == sequence) { 
    272           NSC_DEBUG_MSG(_T("I: server replied OK (") + to_wstring(reply) + _T(")")); 
    273           retries_left = request_retries; 
    274           expect_reply = false; 
    275         } else { 
    276           NSC_DEBUG_MSG(_T("E: malformed reply from server (") + to_wstring(reply) + _T(")")); 
    277         } 
    278       } 
    279       else 
    280         if (--retries_left == 0) { 
    281           NSC_DEBUG_MSG(_T("E: server seems to be offline, abandoning")); 
    282           expect_reply = false; 
    283           break; 
    284         } else { 
    285           NSC_DEBUG_MSG(_T("W: no response from server, retrying...")); 
    286           //  Old socket will be confused; close it and open a new one 
    287           delete client; 
    288           client = s_client_socket(context); 
    289           //  Send request again, on new socket 
    290           zxmsg msg(request.str()); 
    291           client->send(msg); 
    292         } 
    293     } 
    294   } 
    295   delete client; 
    296   return 0; 
    297 } 
    298  
    299  
    30033namespace sh = nscapi::settings_helper; 
    30134 
    302 NSCPListener::NSCPListener() : context(NULL) { 
     35DistributedServer::DistributedServer() : context(NULL) { 
    30336} 
    304 NSCPListener::~NSCPListener() { 
     37DistributedServer::~DistributedServer() { 
    30538  delete context; 
    30639} 
    30740 
    308 bool NSCPListener::loadModule() { 
     41bool DistributedServer::loadModule() { 
    30942  return false; 
    31043} 
    31144 
    312 bool NSCPListener::loadModuleEx(std::wstring alias, NSCAPI::moduleLoadMode mode) { 
     45bool DistributedServer::loadModuleEx(std::wstring alias, NSCAPI::moduleLoadMode mode) { 
     46  std::wstring host, suffix, server_mode; 
     47  unsigned int thread_count; 
    31348  try { 
    314     /* 
    31549    sh::settings_registry settings(get_settings_proxy()); 
    316     settings.set_alias(_T("nscp"), alias, _T("server")); 
     50    settings.set_alias(_T("distributed"), alias, _T("server")); 
    31751 
    31852    settings.alias().add_path_to_settings() 
    319       (_T("NSCP SERVER SECTION"), _T("Section for NSCP (NSCPListener.dll) (check_nscp) protocol options.")) 
     53      (_T("DISTRIBUTED NSCP SERVER SECTION"), _T("Section for Distributed NSCP (DistributedServer) (check_nscp) protocol options.")) 
    32054      ; 
    32155 
    32256    settings.alias().add_key_to_settings() 
    323       (_T("port"), sh::uint_key(&info_.port, 5668), 
    324       _T("PORT NUMBER"), _T("Port to use for NSCP.")) 
     57      (_T("host"), sh::wstring_key(&host, _T("tcp://*:5555")), 
     58      _T("HOST TO BIND/CONNECT TO"), _T("The host to bind/connect to")) 
    32559 
    326       ; 
     60      (_T("suffix"), sh::wstring_key(&suffix, _T("ncsp.dist")), 
     61      _T("SUFFIX FOR INTERNAL CHANNELS"), _T("Has to be uniq on each server")) 
    32762 
    328     settings.alias().add_parent(_T("/settings/default")).add_key_to_settings() 
     63      (_T("worker pool size"), sh::uint_key(&thread_count, 10), 
     64      _T("WORKER POOL SIZE"), _T("Number of threads to spawn for the worker pool")) 
    32965 
    330       (_T("thread pool"), sh::uint_key(&info_.thread_pool_size, 10), 
    331       _T("THREAD POOL"), _T("")) 
    332  
    333       (_T("bind to"), sh::string_key(&info_.address), 
    334       _T("BIND TO ADDRESS"), _T("Allows you to bind server to a specific local address. This has to be a dotted ip address not a host name. Leaving this blank will bind to all available IP addresses.")) 
    335  
    336       (_T("socket queue size"), sh::int_key(&info_.back_log, 0), 
    337       _T("LISTEN QUEUE"), _T("Number of sockets to queue before starting to refuse new incoming connections. This can be used to tweak the amount of simultaneous sockets that the server accepts.")) 
    338  
    339       (_T("allowed hosts"), sh::string_fun_key<std::wstring>(boost::bind(&socket_helpers::allowed_hosts_manager::set_source, &info_.allowed_hosts, _1), _T("127.0.0.1")), 
    340       _T("ALLOWED HOSTS"), _T("A comaseparated list of allowed hosts. You can use netmasks (/ syntax) or * to create ranges.")) 
    341  
    342       (_T("cache allowed hosts"), sh::bool_key(&info_.allowed_hosts.cached, true), 
    343       _T("CACHE ALLOWED HOSTS"), _T("If hostnames should be cached, improves speed and security somewhat but wont allow you to have dynamic IPs for your nagios server.")) 
    344  
    345       (_T("timeout"), sh::uint_key(&info_.timeout, 30), 
    346       _T("TIMEOUT"), _T("Timeout when reading packets on incoming sockets. If the data has not arrived within this time we will bail out.")) 
    347  
    348       (_T("use ssl"), sh::bool_key(&info_.use_ssl, true), 
    349       _T("ENABLE SSL ENCRYPTION"), _T("This option controls if SSL should be enabled.")) 
    350  
    351       (_T("certificate"), sh::wpath_key(&info_.certificate, _T("${certificate-path}/nrpe_dh_512.pem")), 
    352       _T("SSL CERTIFICATE"), _T("")) 
    353  
     66      (_T("mode"), sh::wstring_key(&server_mode, _T("master")), 
     67      _T("OPERATION MODE"), _T("Mode of operation can only be master now but will add more later on (such as slave)")) 
    35468      ; 
    35569 
    35670    settings.register_all(); 
    35771    settings.notify(); 
    358     */ 
    35972 
    360     context = new zmq::context_t(5); 
    361     start_queue(context, threads, 5); 
    362     //start_client(context, 5, 10); 
    363     //std::wcout << _T("---QUEUE STARTED---") << std::endl; 
    364     //start_client(10, 30); 
     73    context = new zmq::context_t(2); 
     74 
     75    zeromq_queue::connection_info queue_info(to_string(host), to_string(suffix)); 
     76    zeromq_queue::queue_manager queue; 
     77    queue.start(context, threads, queue_info); 
     78 
     79    zeromq_worker::connection_info worker_info(queue_info.get_backend(), to_string(suffix), thread_count); 
     80    zeromq_worker::worker_manager workers; 
     81    workers.start(context, threads, worker_info); 
    36582 
    36683  } catch (std::exception &e) { 
     
    37188    return false; 
    37289  } 
    373  
    374  
    37590  return true; 
    37691} 
    37792 
    378 void free_z_buffer(void *data, void *hint) { 
    379   delete [] reinterpret_cast<char*>(data); 
    380 } 
    381 void* create_z_buffer(std::string &buffer) { 
    382   char *tmp = new char[buffer.size()+1]; 
    383   memcpy(tmp, buffer.c_str(), buffer.size()); 
    384   return tmp; 
    385 } 
    38693 
    387  
    388  
    389 struct zeromq_server { 
    390   zmq::socket_t socket; 
    391   boost::shared_ptr<nscp::handler> handler; 
    392  
    393   zeromq_server(zmq::context_t context, boost::shared_ptr<nscp::handler> handler) : socket(context, ZMQ_REP), handler(handler) { 
    394   } 
    395  
    396   void start_server(std::string address = "tcp://*:5555") { 
    397     socket.bind(address.c_str()); 
    398  
    399     while (true) { 
    400       zmq::message_t request; 
    401       socket.recv(&request); 
    402  
    403       std::list<nscp::packet> read_list; 
    404       bool has_more = true; 
    405       while (has_more) { 
    406         nscp::packet packet; 
    407         std::string buffer(reinterpret_cast<char*>(request.data()), request.size()); 
    408         packet.read_all(buffer); 
    409         read_list.push_back(packet); 
    410         std::cout << "<<< " << packet.to_string() << std::endl; 
    411         has_more = packet.signature.additional_packet_count > 0; 
    412       } 
    413       std::list<nscp::packet> result = handler->process_all(read_list); 
    414       send(result); 
    415     } 
    416   } 
    417   void send(std::list<nscp::packet> packets) { 
    418     unsigned long count = packets.size(); 
    419     BOOST_FOREACH(nscp::packet &packet, packets) { 
    420       packet.signature.additional_packet_count = count--; 
    421       std::cout << ">>> " << packet.to_string() << std::endl; 
    422       std::string buffer = packet.write_string(); 
    423       zmq::message_t msg(create_z_buffer(buffer), buffer.size(), &free_z_buffer); 
    424       socket.send(msg); 
    425     } 
    426   } 
    427 }; 
    428  
    429 bool NSCPListener::unloadModule() { 
     94bool DistributedServer::unloadModule() { 
    43095  try { 
    43196    delete context; 
    43297    context = NULL; 
    433     //stop_queue(context); 
    43498    threads.join_all(); 
    43599  } catch (...) { 
     
    441105 
    442106 
    443 bool NSCPListener::hasCommandHandler() { 
     107bool DistributedServer::hasCommandHandler() { 
    444108  return false; 
    445109} 
    446 bool NSCPListener::hasMessageHandler() { 
     110bool DistributedServer::hasMessageHandler() { 
    447111  return false; 
    448112} 
    449113 
    450114NSC_WRAP_DLL(); 
    451 NSC_WRAPPERS_MAIN_DEF(NSCPListener); 
     115NSC_WRAPPERS_MAIN_DEF(DistributedServer); 
    452116NSC_WRAPPERS_IGNORE_MSG_DEF(); 
    453117NSC_WRAPPERS_IGNORE_CMD_DEF(); 
  • modules/DistributedServer/DistributedServer.def

    r76d1076 r2b2e9b8  
    1 LIBRARY NRPEListener 
     1LIBRARY DistributedServer 
    22 
    33EXPORTS 
  • modules/DistributedServer/DistributedServer.h

    r76d1076 r2b2e9b8  
    2222NSC_WRAPPERS_MAIN(); 
    2323 
    24 class NSCPListener : public nscapi::impl::simple_plugin { 
     24#include <zmq.hpp> 
     25 
     26class DistributedServer : public nscapi::impl::simple_plugin { 
    2527public: 
    2628  zmq::context_t  *context; 
    2729  boost::thread_group threads; 
    28   NSCPListener(); 
    29   virtual ~NSCPListener(); 
     30  DistributedServer(); 
     31  virtual ~DistributedServer(); 
    3032  // Module calls 
    3133  bool loadModule(); 
     
    3537 
    3638  static std::wstring getModuleName() { 
    37     return _T("ZeroMQ ServerNSCP server"); 
     39    return _T("Distributed server"); 
    3840  } 
    3941  static nscapi::plugin_wrapper::module_version getModuleVersion() { 
     
    4244  } 
    4345  static std::wstring getModuleDescription() { 
    44     return _T("A simple server that listens for incoming NSCP connection and handles them."); 
     46    return _T("A simple server that listens for incoming distributed requests."); 
    4547  } 
    4648 
  • modules/DistributedServer/handler_impl.cpp

    r76d1076 r2b2e9b8  
    77#include "handler_impl.hpp" 
    88 
    9 std::list<nscp::packet> handler_impl::process(nscp::packet &packet) { 
    10   std::list<nscp::packet> result; 
     9#include "stdafx.h" 
    1110 
     11#include <boost/asio.hpp> 
     12#include <protobuf/plugin.pb.h> 
     13#include <nscapi/functions.hpp> 
     14 
     15#include "handler_impl.hpp" 
     16 
     17NSCAPI::nagiosReturn handler_impl::handle_query_request(const std::string &request, Plugin::QueryRequestMessage &msg, std::string &reply) { 
    1218  Plugin::Common::Header hdr; 
     19  hdr.CopyFrom(msg.header()); 
    1320 
    14   if (nscp::checks::is_query_request(packet)) { 
    15     Plugin::QueryRequestMessage msg; 
    16     msg.ParseFromString(packet.payload); 
    17     hdr.CopyFrom(msg.header()); 
     21  Plugin::QueryResponseMessage response; 
     22  // @todo: swap data in the dhear (ie. sender /recipent) 
     23  response.mutable_header()->CopyFrom(hdr); 
    1824 
    19     // @todo: Make split optional 
    20     // @todo: Make this return ONE response not multiple responses 
     25  // @todo: Make split optional 
     26  for (int i=0;i<msg.payload_size();i++) { 
     27    const Plugin::QueryRequestMessage_Request &payload = msg.payload(i); 
     28    std::string outBuffer; 
     29    std::wstring command = utf8::cvt<std::wstring>(payload.command()); 
    2130 
    22     for (int i=0;i<msg.payload_size();i++) { 
    23       const Plugin::QueryRequestMessage_Request &payload = msg.payload(i); 
    24       std::string outBuffer; 
    25       std::wstring command = utf8::cvt<std::wstring>(payload.command()); 
    26  
    27       if (command.empty() || command == _T("_NSCP_CHECK")) { 
    28         nscapi::functions::create_simple_query_response(_T("_NSCP_CHECK"), NSCAPI::returnOK, _T("I (") + nscapi::plugin_singleton->get_core()->getApplicationVersionString() + _T(") seem to be doing fine..."), _T(""), outBuffer); 
    29       } else if (!allowArgs_ && payload.arguments_size() > 0) { 
    30         nscapi::functions::create_simple_query_response_unknown(command, _T("Arguments not allowed for command: ") + command, _T(""), outBuffer); 
    31       } else { 
    32         bool ok = true; 
    33         if (!allowNasty_) { 
    34           for (int j=0;j<payload.arguments_size();j++) { 
    35             if (payload.arguments(j).find_first_of(NASTY_METACHARS) != std::wstring::npos) { 
    36               ok = false; 
    37               break; 
    38             } 
     31    if (command.empty() || command == _T("_NSCP_CHECK")) { 
     32      nscapi::functions::create_simple_query_response(_T("_NSCP_CHECK"), NSCAPI::returnOK, _T("I (") + nscapi::plugin_singleton->get_core()->getApplicationVersionString() + _T(") seem to be doing fine..."), _T(""), outBuffer); 
     33    } else if (!allowArgs_ && payload.arguments_size() > 0) { 
     34      nscapi::functions::create_simple_query_response_unknown(command, _T("Arguments not allowed for command: ") + command, _T(""), reply); 
     35    } else { 
     36      bool ok = true; 
     37      if (!allowNasty_) { 
     38        for (int j=0;j<payload.arguments_size();j++) { 
     39          if (payload.arguments(j).find_first_of(NASTY_METACHARS) != std::wstring::npos) { 
     40            ok = false; 
     41            break; 
    3942          } 
    4043        } 
    41         if (ok) { 
    42           std::string tmpBuffer; 
    43           Plugin::QueryRequestMessage tmp; 
    44           tmp.mutable_header()->CopyFrom(hdr); 
    45           tmp.add_payload()->CopyFrom(payload); 
    46           tmp.SerializeToString(&tmpBuffer); 
    47           NSCAPI::nagiosReturn returncode = nscapi::plugin_singleton->get_core()->query(command, tmpBuffer, outBuffer); 
    48           if (returncode == NSCAPI::returnIgnored) { 
    49             nscapi::functions::create_simple_query_response_unknown(command, _T("Command was not found: ") + command, _T(""), outBuffer); 
    50           } 
    51         } else { 
    52           nscapi::functions::create_simple_query_response_unknown(command, _T("Nasty arguments not allowed for command: ") + command, _T(""), outBuffer); 
     44      } 
     45      if (ok) { 
     46        std::string tmpBuffer; 
     47        Plugin::QueryRequestMessage tmp; 
     48        tmp.mutable_header()->CopyFrom(hdr); 
     49        tmp.add_payload()->CopyFrom(payload); 
     50        tmp.SerializeToString(&tmpBuffer); 
     51        NSCAPI::nagiosReturn returncode = nscapi::plugin_singleton->get_core()->query(command, tmpBuffer, outBuffer); 
     52        if (returncode == NSCAPI::returnIgnored) { 
     53          nscapi::functions::create_simple_query_response_unknown(command, _T("Command was not found: ") + command, _T(""), outBuffer); 
    5354        } 
     55      } else { 
     56        nscapi::functions::create_simple_query_response_unknown(command, _T("Nasty arguments not allowed for command: ") + command, _T(""), outBuffer); 
    5457      } 
    55       result.push_back(nscp::factory::create_query_response(outBuffer)); 
    5658    } 
    57   } else if (nscp::checks::is_query_response(packet)) { 
    58  
    59     // @todo handle submission here 
    60  
    61   } else { 
    62     NSC_LOG_ERROR(_T("Unknown packet: ") + packet.to_wstring()); 
    63     result.push_back(create_error(_T("Unknown packet: ") + packet.to_wstring())); 
     59    Plugin::QueryResponseMessage tmpResponse; 
     60    tmpResponse.ParseFromString(outBuffer); 
     61    for (int i=0;i<tmpResponse.payload_size();i++) { 
     62      response.add_payload()->CopyFrom(tmpResponse.payload(i)); 
     63    } 
    6464  } 
    65   return result; 
     65  response.SerializeToString(&reply); 
     66  // @todo: fixme this should probably be an aggregate right? 
     67  return NSCAPI::isSuccess; 
    6668} 
    6769 
     
    6971 
    7072 
     73 
     74 
     75 
  • modules/DistributedServer/handler_impl.hpp

    r76d1076 r2b2e9b8  
    33#include <nscp/packet.hpp> 
    44#include <nscp/handler.hpp> 
    5 #include <boost/tuple/tuple.hpp> 
    65 
    76class handler_impl : public nscp::handler, private boost::noncopyable { 
    8   unsigned int payload_length_; 
    97  bool allowArgs_; 
    108  bool allowNasty_; 
    119  bool noPerfData_; 
    1210public: 
    13   handler_impl(unsigned int payload_length) : payload_length_(payload_length), noPerfData_(false), allowNasty_(false), allowArgs_(false) {} 
     11  handler_impl() : noPerfData_(false), allowNasty_(false), allowArgs_(false) {} 
    1412 
    15   unsigned int get_payload_length() { 
    16     return payload_length_; 
    17   } 
    18   void set_payload_length(unsigned int payload) { 
    19     payload_length_ = payload; 
    20   } 
    21  
    22   std::list<nscp::packet> process(nscp::packet &buffer); 
     13  NSCAPI::nagiosReturn handle_query_request(const std::string &request, Plugin::QueryRequestMessage &msg, std::string &reply); 
    2314 
    2415  nscp::packet create_error(std::wstring msg) { 
  • modules/DistributedServer/module.cmake

    r76d1076 r2b2e9b8  
    22  SET (BUILD_MODULE 1) 
    33ELSE(ZEROMQ_FOUND) 
    4   MESSAGE(STATUS "Disabling ZeroMQServer since zeromq was not found") 
     4  MESSAGE(STATUS "Disabling DistributedServer since zeromq was not found") 
    55ENDIF(ZEROMQ_FOUND) 
    66 
  • modules/NRPEClient/CMakeLists.txt

    r438998b r2b2e9b8  
    1515 
    1616ADD_DEFINITIONS(${NSCP_GLOBAL_DEFINES}) 
     17IF(OPENSSL_FOUND) 
     18  ADD_DEFINITIONS(-DUSE_SSL) 
     19ENDIF(OPENSSL_FOUND) 
    1720 
    1821IF(WIN32) 
  • modules/NRPEServer/CMakeLists.txt

    r438998b r2b2e9b8  
    2323 
    2424ADD_DEFINITIONS(${NSCP_GLOBAL_DEFINES}) 
     25IF(OPENSSL_FOUND) 
     26  ADD_DEFINITIONS(-DUSE_SSL) 
     27ENDIF(OPENSSL_FOUND) 
    2528 
    2629IF(WIN32) 
  • modules/NSCPClient/NSCPClient.cpp

    r7515d00 r2b2e9b8  
    139139} 
    140140 
    141 std::wstring NSCPClient::setup(client::configuration config, const std::wstring &command) { 
     141std::wstring NSCPClient::setup(client::configuration &config, const std::wstring &command) { 
    142142  clp_handler_impl *handler = new clp_handler_impl(this); 
    143143  add_local_options(config.local, handler->local_data); 
  • modules/NSCPClient/NSCPClient.h

    r7515d00 r2b2e9b8  
    108108  void add_command(std::wstring key, std::wstring args); 
    109109  void add_target(std::wstring key, std::wstring args); 
    110   std::wstring setup(client::configuration config, const std::wstring &command); 
     110  std::wstring setup(client::configuration &config, const std::wstring &command); 
    111111 
    112112}; 
Note: See TracChangeset for help on using the changeset viewer.