Blender V2.61 - r43446
|
00001 /* 00002 * Copyright 2011, Blender Foundation. 00003 * 00004 * This program is free software; you can redistribute it and/or 00005 * modify it under the terms of the GNU General Public License 00006 * as published by the Free Software Foundation; either version 2 00007 * of the License, or (at your option) any later version. 00008 * 00009 * This program is distributed in the hope that it will be useful, 00010 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00012 * GNU General Public License for more details. 00013 * 00014 * You should have received a copy of the GNU General Public License 00015 * along with this program; if not, write to the Free Software Foundation, 00016 * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 00017 */ 00018 00019 #ifndef __DEVICE_NETWORK_H__ 00020 #define __DEVICE_NETWORK_H__ 00021 00022 #ifdef WITH_NETWORK 00023 00024 #include <boost/archive/text_iarchive.hpp> 00025 #include <boost/archive/text_oarchive.hpp> 00026 #include <boost/array.hpp> 00027 #include <boost/asio.hpp> 00028 #include <boost/bind.hpp> 00029 #include <boost/serialization/vector.hpp> 00030 #include <boost/thread.hpp> 00031 00032 #include <iostream> 00033 00034 #include "util_foreach.h" 00035 #include "util_list.h" 00036 #include "util_string.h" 00037 00038 CCL_NAMESPACE_BEGIN 00039 00040 using std::cout; 00041 using std::cerr; 00042 using std::endl; 00043 using std::hex; 00044 using std::setw; 00045 using std::exception; 00046 00047 using boost::asio::ip::tcp; 00048 00049 static const int SERVER_PORT = 5120; 00050 static const int DISCOVER_PORT = 5121; 00051 static const string DISCOVER_REQUEST_MSG = "REQUEST_RENDER_SERVER_IP"; 00052 static const string DISCOVER_REPLY_MSG = "REPLY_RENDER_SERVER_IP"; 00053 00054 typedef struct RPCSend { 00055 RPCSend(tcp::socket& socket_, const string& name_ = "") 00056 : name(name_), socket(socket_), archive(archive_stream) 00057 { 00058 archive & name_; 00059 } 00060 00061 void write() 00062 { 00063 boost::system::error_code error; 00064 00065 /* get string from stream */ 00066 string archive_str = archive_stream.str(); 00067 00068 /* first send fixed size header with size of following data */ 00069 ostringstream header_stream; 00070 header_stream << setw(8) << hex << archive_str.size(); 00071 string header_str = header_stream.str(); 00072 00073 boost::asio::write(socket, 00074 boost::asio::buffer(header_str), 00075 boost::asio::transfer_all(), error); 00076 00077 if(error.value()) 00078 cout << "Network send error: " << error.message() << "\n"; 00079 00080 /* then send actual data */ 00081 boost::asio::write(socket, 00082 boost::asio::buffer(archive_str), 00083 boost::asio::transfer_all(), error); 00084 00085 if(error.value()) 00086 cout << "Network send error: " << error.message() << "\n"; 00087 } 00088 00089 void write_buffer(void *buffer, size_t size) 00090 { 00091 boost::system::error_code error; 00092 00093 boost::asio::write(socket, 00094 boost::asio::buffer(buffer, size), 00095 boost::asio::transfer_all(), error); 00096 00097 if(error.value()) 00098 cout << "Network send error: " << error.message() << "\n"; 00099 } 00100 00101 string name; 00102 tcp::socket& socket; 00103 ostringstream archive_stream; 00104 boost::archive::text_oarchive archive; 00105 } RPCSend; 00106 00107 typedef struct RPCReceive { 00108 RPCReceive(tcp::socket& socket_) 00109 : socket(socket_), archive_stream(NULL), archive(NULL) 00110 { 00111 /* read head with fixed size */ 00112 vector<char> header(8); 00113 size_t len = boost::asio::read(socket, boost::asio::buffer(header)); 00114 00115 /* verify if we got something */ 00116 if(len == header.size()) { 00117 /* decode header */ 00118 string header_str(&header[0], header.size()); 00119 istringstream header_stream(header_str); 00120 00121 size_t data_size; 00122 00123 if((header_stream >> hex >> data_size)) { 00124 vector<char> data(data_size); 00125 size_t len = boost::asio::read(socket, boost::asio::buffer(data)); 00126 00127 if(len == data_size) { 00128 archive_str = (data.size())? string(&data[0], data.size()): string(""); 00129 /*istringstream archive_stream(archive_str); 00130 boost::archive::text_iarchive archive(archive_stream);*/ 00131 archive_stream = new istringstream(archive_str); 00132 archive = new boost::archive::text_iarchive(*archive_stream); 00133 00134 *archive & name; 00135 } 00136 else 00137 cout << "Network receive error: data size doens't match header\n"; 00138 } 00139 else 00140 cout << "Network receive error: can't decode data size from header\n"; 00141 } 00142 else 00143 cout << "Network receive error: invalid header size\n"; 00144 } 00145 00146 ~RPCReceive() 00147 { 00148 delete archive; 00149 delete archive_stream; 00150 } 00151 00152 void read_buffer(void *buffer, size_t size) 00153 { 00154 size_t len = boost::asio::read(socket, boost::asio::buffer(buffer, size)); 00155 00156 if(len != size) 00157 cout << "Network receive error: buffer size doesn't match expected size\n"; 00158 } 00159 00160 string name; 00161 tcp::socket& socket; 00162 string archive_str; 00163 istringstream *archive_stream; 00164 boost::archive::text_iarchive *archive; 00165 } RPCReceive; 00166 00167 class ServerDiscovery { 00168 public: 00169 ServerDiscovery(bool discover = false) 00170 : listen_socket(io_service), collect_servers(false) 00171 { 00172 /* setup listen socket */ 00173 listen_endpoint.address(boost::asio::ip::address_v4::any()); 00174 listen_endpoint.port(DISCOVER_PORT); 00175 00176 listen_socket.open(listen_endpoint.protocol()); 00177 00178 boost::asio::socket_base::reuse_address option(true); 00179 listen_socket.set_option(option); 00180 00181 listen_socket.bind(listen_endpoint); 00182 00183 /* setup receive callback */ 00184 async_receive(); 00185 00186 /* start server discovery */ 00187 if(discover) { 00188 collect_servers = true; 00189 servers.clear(); 00190 00191 broadcast_message(DISCOVER_REQUEST_MSG); 00192 } 00193 00194 /* start thread */ 00195 work = new boost::asio::io_service::work(io_service); 00196 thread = new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service)); 00197 } 00198 00199 ~ServerDiscovery() 00200 { 00201 io_service.stop(); 00202 thread->join(); 00203 delete thread; 00204 delete work; 00205 } 00206 00207 list<string> get_server_list() 00208 { 00209 list<string> result; 00210 00211 mutex.lock(); 00212 result = servers; 00213 mutex.unlock(); 00214 00215 return result; 00216 } 00217 00218 private: 00219 void handle_receive_from(const boost::system::error_code& error, size_t size) 00220 { 00221 if(error) { 00222 cout << "Server discovery receive error: " << error.message() << "\n"; 00223 return; 00224 } 00225 00226 if(size > 0) { 00227 string msg = string(receive_buffer, size); 00228 00229 /* handle incoming message */ 00230 if(collect_servers) { 00231 if(msg == DISCOVER_REPLY_MSG) { 00232 string address = receive_endpoint.address().to_string(); 00233 00234 mutex.lock(); 00235 00236 /* add address if it's not already in the list */ 00237 bool found = false; 00238 00239 foreach(string& server, servers) 00240 if(server == address) 00241 found = true; 00242 00243 if(!found) 00244 servers.push_back(address); 00245 00246 mutex.unlock(); 00247 } 00248 } 00249 else { 00250 /* reply to request */ 00251 if(msg == DISCOVER_REQUEST_MSG) 00252 broadcast_message(DISCOVER_REPLY_MSG); 00253 } 00254 } 00255 00256 async_receive(); 00257 } 00258 00259 void async_receive() 00260 { 00261 listen_socket.async_receive_from( 00262 boost::asio::buffer(receive_buffer), receive_endpoint, 00263 boost::bind(&ServerDiscovery::handle_receive_from, this, 00264 boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 00265 } 00266 00267 void broadcast_message(const string& msg) 00268 { 00269 /* setup broadcast socket */ 00270 boost::asio::ip::udp::socket socket(io_service); 00271 00272 socket.open(boost::asio::ip::udp::v4()); 00273 00274 boost::asio::socket_base::broadcast option(true); 00275 socket.set_option(option); 00276 00277 boost::asio::ip::udp::endpoint broadcast_endpoint( 00278 boost::asio::ip::address::from_string("255.255.255.255"), DISCOVER_PORT); 00279 00280 /* broadcast message */ 00281 socket.send_to(boost::asio::buffer(msg), broadcast_endpoint); 00282 } 00283 00284 /* network service and socket */ 00285 boost::asio::io_service io_service; 00286 boost::asio::ip::udp::endpoint listen_endpoint; 00287 boost::asio::ip::udp::socket listen_socket; 00288 00289 /* threading */ 00290 boost::thread *thread; 00291 boost::asio::io_service::work *work; 00292 boost::mutex mutex; 00293 00294 /* buffer and endpoint for receiving messages */ 00295 char receive_buffer[256]; 00296 boost::asio::ip::udp::endpoint receive_endpoint; 00297 00298 /* collection of server addresses in list */ 00299 bool collect_servers; 00300 list<string> servers; 00301 }; 00302 00303 CCL_NAMESPACE_END 00304 00305 #endif 00306 00307 #endif /* __DEVICE_NETWORK_H__ */ 00308