Blender V2.61 - r43446

device_network.h

Go to the documentation of this file.
00001 /*
00002  * Copyright 2011, Blender Foundation.
00003  *
00004  * This program is free software; you can redistribute it and/or
00005  * modify it under the terms of the GNU General Public License
00006  * as published by the Free Software Foundation; either version 2
00007  * of the License, or (at your option) any later version.
00008  *
00009  * This program is distributed in the hope that it will be useful,
00010  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012  * GNU General Public License for more details.
00013  *
00014  * You should have received a copy of the GNU General Public License
00015  * along with this program; if not, write to the Free Software Foundation,
00016  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
00017  */
00018 
00019 #ifndef __DEVICE_NETWORK_H__
00020 #define __DEVICE_NETWORK_H__
00021 
00022 #ifdef WITH_NETWORK
00023 
00024 #include <boost/archive/text_iarchive.hpp>
00025 #include <boost/archive/text_oarchive.hpp>
00026 #include <boost/array.hpp>
00027 #include <boost/asio.hpp>
00028 #include <boost/bind.hpp>
00029 #include <boost/serialization/vector.hpp>
00030 #include <boost/thread.hpp>
00031 
00032 #include <iostream>
00033 
00034 #include "util_foreach.h"
00035 #include "util_list.h"
00036 #include "util_string.h"
00037 
00038 CCL_NAMESPACE_BEGIN
00039 
00040 using std::cout;
00041 using std::cerr;
00042 using std::endl;
00043 using std::hex;
00044 using std::setw;
00045 using std::exception;
00046 
00047 using boost::asio::ip::tcp;
00048 
00049 static const int SERVER_PORT = 5120;
00050 static const int DISCOVER_PORT = 5121;
00051 static const string DISCOVER_REQUEST_MSG = "REQUEST_RENDER_SERVER_IP";
00052 static const string DISCOVER_REPLY_MSG = "REPLY_RENDER_SERVER_IP";
00053 
00054 typedef struct RPCSend {
00055     RPCSend(tcp::socket& socket_, const string& name_ = "")
00056     : name(name_), socket(socket_), archive(archive_stream)
00057     {
00058         archive & name_;
00059     }
00060 
00061     void write()
00062     {
00063         boost::system::error_code error;
00064 
00065         /* get string from stream */
00066         string archive_str = archive_stream.str();
00067 
00068         /* first send fixed size header with size of following data */
00069         ostringstream header_stream;
00070         header_stream << setw(8) << hex << archive_str.size();
00071         string header_str = header_stream.str();
00072 
00073         boost::asio::write(socket,
00074             boost::asio::buffer(header_str),
00075             boost::asio::transfer_all(), error);
00076 
00077         if(error.value())
00078             cout << "Network send error: " << error.message() << "\n";
00079 
00080         /* then send actual data */
00081         boost::asio::write(socket,
00082             boost::asio::buffer(archive_str),
00083             boost::asio::transfer_all(), error);
00084         
00085         if(error.value())
00086             cout << "Network send error: " << error.message() << "\n";
00087     }
00088 
00089     void write_buffer(void *buffer, size_t size)
00090     {
00091         boost::system::error_code error;
00092 
00093         boost::asio::write(socket,
00094             boost::asio::buffer(buffer, size),
00095             boost::asio::transfer_all(), error);
00096         
00097         if(error.value())
00098             cout << "Network send error: " << error.message() << "\n";
00099     }
00100 
00101     string name;
00102     tcp::socket& socket;
00103     ostringstream archive_stream;
00104     boost::archive::text_oarchive archive;
00105 } RPCSend;
00106 
00107 typedef struct RPCReceive {
00108     RPCReceive(tcp::socket& socket_)
00109     : socket(socket_), archive_stream(NULL), archive(NULL)
00110     {
00111         /* read head with fixed size */
00112         vector<char> header(8);
00113         size_t len = boost::asio::read(socket, boost::asio::buffer(header));
00114 
00115         /* verify if we got something */
00116         if(len == header.size()) {
00117             /* decode header */
00118             string header_str(&header[0], header.size());
00119             istringstream header_stream(header_str);
00120 
00121             size_t data_size;
00122 
00123             if((header_stream >> hex >> data_size)) {
00124                 vector<char> data(data_size);
00125                 size_t len = boost::asio::read(socket, boost::asio::buffer(data));
00126 
00127                 if(len == data_size) {
00128                     archive_str = (data.size())? string(&data[0], data.size()): string("");
00129                     /*istringstream archive_stream(archive_str);
00130                     boost::archive::text_iarchive archive(archive_stream);*/
00131                     archive_stream = new istringstream(archive_str);
00132                     archive = new boost::archive::text_iarchive(*archive_stream);
00133 
00134                     *archive & name;
00135                 }
00136                 else
00137                     cout << "Network receive error: data size doens't match header\n";
00138             }
00139             else
00140                 cout << "Network receive error: can't decode data size from header\n";
00141         }
00142         else
00143             cout << "Network receive error: invalid header size\n";
00144     }
00145 
00146     ~RPCReceive()
00147     {
00148         delete archive;
00149         delete archive_stream;
00150     }
00151 
00152     void read_buffer(void *buffer, size_t size)
00153     {
00154         size_t len = boost::asio::read(socket, boost::asio::buffer(buffer, size));
00155 
00156         if(len != size)
00157             cout << "Network receive error: buffer size doesn't match expected size\n";
00158     }
00159 
00160     string name;
00161     tcp::socket& socket;
00162     string archive_str;
00163     istringstream *archive_stream;
00164     boost::archive::text_iarchive *archive;
00165 } RPCReceive;
00166 
00167 class ServerDiscovery {
00168 public:
00169     ServerDiscovery(bool discover = false)
00170     : listen_socket(io_service), collect_servers(false)
00171     {
00172         /* setup listen socket */
00173         listen_endpoint.address(boost::asio::ip::address_v4::any());
00174         listen_endpoint.port(DISCOVER_PORT);
00175 
00176         listen_socket.open(listen_endpoint.protocol());
00177 
00178         boost::asio::socket_base::reuse_address option(true);
00179         listen_socket.set_option(option);
00180 
00181         listen_socket.bind(listen_endpoint);
00182 
00183         /* setup receive callback */
00184         async_receive();
00185 
00186         /* start server discovery */
00187         if(discover) {
00188             collect_servers = true;
00189             servers.clear();
00190 
00191             broadcast_message(DISCOVER_REQUEST_MSG);
00192         }
00193 
00194         /* start thread */
00195         work = new boost::asio::io_service::work(io_service);
00196         thread = new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service));
00197     }
00198 
00199     ~ServerDiscovery()
00200     {
00201         io_service.stop();
00202         thread->join();
00203         delete thread;
00204         delete work;
00205     }
00206 
00207     list<string> get_server_list()
00208     {
00209         list<string> result;
00210 
00211         mutex.lock();
00212         result = servers;
00213         mutex.unlock();
00214 
00215         return result;
00216     }
00217 
00218 private:
00219     void handle_receive_from(const boost::system::error_code& error, size_t size)
00220     {
00221         if(error) {
00222             cout << "Server discovery receive error: " << error.message() << "\n";
00223             return;
00224         }
00225 
00226         if(size > 0) {
00227             string msg = string(receive_buffer, size);
00228 
00229             /* handle incoming message */
00230             if(collect_servers) {
00231                 if(msg == DISCOVER_REPLY_MSG) {
00232                     string address = receive_endpoint.address().to_string();
00233 
00234                     mutex.lock();
00235 
00236                     /* add address if it's not already in the list */
00237                     bool found = false;
00238 
00239                     foreach(string& server, servers)
00240                         if(server == address)
00241                             found = true;
00242 
00243                     if(!found)
00244                         servers.push_back(address);
00245 
00246                     mutex.unlock();
00247                 }
00248             }
00249             else {
00250                 /* reply to request */
00251                 if(msg == DISCOVER_REQUEST_MSG)
00252                     broadcast_message(DISCOVER_REPLY_MSG);
00253             }
00254         }
00255 
00256         async_receive();
00257     }
00258 
00259     void async_receive()
00260     {
00261         listen_socket.async_receive_from(
00262             boost::asio::buffer(receive_buffer), receive_endpoint,
00263             boost::bind(&ServerDiscovery::handle_receive_from, this,
00264             boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
00265     }
00266 
00267     void broadcast_message(const string& msg)
00268     {
00269         /* setup broadcast socket */
00270         boost::asio::ip::udp::socket socket(io_service);
00271 
00272         socket.open(boost::asio::ip::udp::v4());
00273 
00274         boost::asio::socket_base::broadcast option(true);
00275         socket.set_option(option);
00276 
00277         boost::asio::ip::udp::endpoint broadcast_endpoint(
00278             boost::asio::ip::address::from_string("255.255.255.255"), DISCOVER_PORT);
00279 
00280         /* broadcast message */
00281         socket.send_to(boost::asio::buffer(msg), broadcast_endpoint);
00282     }
00283 
00284     /* network service and socket */
00285     boost::asio::io_service io_service;
00286     boost::asio::ip::udp::endpoint listen_endpoint;
00287     boost::asio::ip::udp::socket listen_socket;
00288 
00289     /* threading */
00290     boost::thread *thread;
00291     boost::asio::io_service::work *work;
00292     boost::mutex mutex;
00293 
00294     /* buffer and endpoint for receiving messages */
00295     char receive_buffer[256];
00296     boost::asio::ip::udp::endpoint receive_endpoint;
00297 
00298     /* collection of server addresses in list */
00299     bool collect_servers;
00300     list<string> servers;
00301 };
00302 
00303 CCL_NAMESPACE_END
00304 
00305 #endif
00306 
00307 #endif /* __DEVICE_NETWORK_H__ */
00308