OSDN Git Service

サーバーの受信量の問題を再度修正
[mmo/main.git] / common / network / Session.cpp
1 //\r
2 // Session.cpp\r
3 //\r
4 \r
5 #include "Command.hpp"\r
6 #include "CommandHeader.hpp"\r
7 #include "Session.hpp"\r
8 #include "Utils.hpp"\r
9 #include "../Logger.hpp"\r
10 #include <boost/make_shared.hpp>\r
11 #include <string>\r
12 \r
13 namespace network {\r
14 \r
15     Session::Session(boost::asio::io_service& io_service_tcp) :\r
16       io_service_tcp_(io_service_tcp),\r
17       socket_tcp_(io_service_tcp),\r
18       encryption_(false),\r
19       online_(true),\r
20       login_(false),\r
21       read_start_time_(time(nullptr)),\r
22       write_start_time_(time(nullptr)),\r
23       read_byte_sum_(0),\r
24       write_byte_sum_(0),\r
25       serialized_byte_sum_(0),\r
26       compressed_byte_sum_(0),\r
27           write_average_limit_(999999),\r
28       id_(0)\r
29     {\r
30 \r
31     }\r
32 \r
33     Session::~Session()\r
34     {\r
35         Close();\r
36     }\r
37 \r
38     void Session::Close()\r
39     {\r
40         socket_tcp_.close();\r
41     }\r
42 \r
43     void Session::Send(const Command& command)\r
44     {\r
45         auto msg = Serialize(command);\r
46         write_byte_sum_ += msg.size();\r
47         UpdateWriteByteAverage();\r
48 \r
49                 Logger::Debug(_T("%d byte/s"), GetWriteByteAverage());\r
50 \r
51         io_service_tcp_.post(boost::bind(&Session::DoWriteTCP, this, msg, shared_from_this()));\r
52     }\r
53 \r
54     void Session::SyncSend(const Command& command)\r
55     {\r
56         auto msg = Serialize(command);\r
57         write_byte_sum_ += msg.size();\r
58         UpdateWriteByteAverage();\r
59 \r
60         try {\r
61             boost::asio::write(\r
62                     socket_tcp_, boost::asio::buffer(msg.data(), msg.size()),\r
63                 boost::asio::transfer_all());\r
64         } catch (std::exception& e) {\r
65             std::cout << e.what() << std::endl;\r
66         }\r
67     }\r
68 \r
69     double Session::GetReadByteAverage() const\r
70     {\r
71         return 1.0f * read_byte_sum_ / (time(nullptr) - read_start_time_);\r
72     }\r
73 \r
74     double Session::GetWriteByteAverage() const\r
75     {\r
76         return 1.0f * write_byte_sum_ / (time(nullptr) - write_start_time_);\r
77     }\r
78 \r
79     void Session::UpdateReadByteAverage()\r
80     {\r
81         unsigned long elapsed_time = time(nullptr) - read_start_time_;\r
82         if (elapsed_time >= BYTE_AVERAGE_REFRESH_SECONDS) {\r
83             read_byte_sum_ /= 2;\r
84             read_start_time_ = time(nullptr) - elapsed_time / 2;\r
85         }\r
86     }\r
87 \r
88     void Session::UpdateWriteByteAverage()\r
89     {\r
90         unsigned long elapsed_time = time(nullptr) - write_start_time_;\r
91         if (elapsed_time >= BYTE_AVERAGE_REFRESH_SECONDS) {\r
92             write_byte_sum_ /= 2;\r
93             write_start_time_ = time(nullptr) - elapsed_time / 2;\r
94         }\r
95     }\r
96 \r
97         void Session::ResetReadByteAverage()\r
98         {\r
99                 read_byte_sum_ = 0;\r
100         }\r
101 \r
102         void Session::ResetWriteByteAverage()\r
103         {\r
104                 write_byte_sum_ = 0;\r
105         }\r
106 \r
107     void Session::EnableEncryption()\r
108     {\r
109         encryption_ = true;\r
110     }\r
111 \r
112     Encrypter& Session::encrypter()\r
113     {\r
114         return encrypter_;\r
115     }\r
116 \r
117     tcp::socket& Session::tcp_socket()\r
118     {\r
119         return socket_tcp_;\r
120     }\r
121 \r
122     UserID Session::id() const\r
123     {\r
124         return id_;\r
125     }\r
126 \r
127     void Session::set_id(UserID id)\r
128     {\r
129         id_ = id;\r
130     }\r
131 \r
132     bool Session::online() const\r
133     {\r
134         return online_;\r
135     }\r
136 \r
137     std::string Session::global_ip() const\r
138     {\r
139         return global_ip_;\r
140     }\r
141 \r
142     uint16_t Session::udp_port() const{\r
143         return udp_port_;\r
144     }\r
145 \r
146     void Session::set_global_ip(const std::string& global_ip)\r
147     {\r
148         global_ip_ = global_ip;\r
149     }\r
150 \r
151     void Session::set_udp_port(uint16_t udp_port)\r
152     {\r
153         udp_port_ = udp_port;\r
154     }\r
155 \r
156     int Session::serialized_byte_sum() const\r
157     {\r
158         return serialized_byte_sum_;\r
159     }\r
160 \r
161     int Session::compressed_byte_sum() const\r
162     {\r
163         return compressed_byte_sum_;\r
164     }\r
165 \r
166     bool Session::operator==(const Session& s)\r
167     {\r
168         return id_ == s.id_;\r
169     }\r
170 \r
171     bool Session::operator!=(const Session& s)\r
172     {\r
173         return !operator==(s);\r
174     }\r
175 \r
176         int Session::write_average_limit() const\r
177         {\r
178                 return write_average_limit_;\r
179         }\r
180 \r
181         void Session::set_write_average_limit(int limit)\r
182         {\r
183                 write_average_limit_ = limit;\r
184         }\r
185 \r
186     std::string Session::Serialize(const Command& command)\r
187     {\r
188         assert(command.header() < 0xFF);\r
189         auto header = static_cast<uint8_t>(command.header());\r
190         std::string body = command.body();\r
191 \r
192         std::string msg = Utils::Serialize(header) + body;\r
193 \r
194         // 圧縮\r
195         if (body.size() >= COMPRESS_MIN_LENGTH) {\r
196             auto compressed = Utils::LZ4Compress(msg);\r
197             if (msg.size() > compressed.size() + sizeof(uint8_t)) {\r
198                 assert(msg.size() < 65535);\r
199                 msg = Utils::Serialize(static_cast<uint8_t>(header::LZ4_COMPRESS_HEADER),\r
200                     static_cast<uint16_t>(msg.size()))\r
201                     + compressed;\r
202             }\r
203         }\r
204 \r
205         // 暗号化\r
206         if (encryption_) {\r
207             msg = Utils::Serialize(static_cast<uint8_t>(header::ENCRYPT_HEADER))\r
208                 + encrypter_.Encrypt(msg);\r
209         }\r
210 \r
211         return Utils::Encode(msg);\r
212     }\r
213 \r
214     Command Session::Deserialize(const std::string& msg)\r
215     {\r
216         std::string decoded_msg = Utils::Decode(msg);\r
217 \r
218         uint8_t header;\r
219         Utils::Deserialize(decoded_msg, &header);\r
220 \r
221         // 復号\r
222         if (header == header::ENCRYPT_HEADER) {\r
223             decoded_msg.erase(0, sizeof(header));\r
224             decoded_msg = encrypter_.Decrypt(decoded_msg);\r
225             Utils::Deserialize(decoded_msg, &header);\r
226         }\r
227 \r
228         // 伸長\r
229         if (header == header::LZ4_COMPRESS_HEADER) {\r
230             uint16_t original_size;\r
231             Utils::Deserialize(decoded_msg, &header, &original_size);\r
232             decoded_msg.erase(0, sizeof(header) + sizeof(original_size));\r
233             decoded_msg = Utils::LZ4Uncompress(decoded_msg, original_size);\r
234             Utils::Deserialize(decoded_msg, &header);\r
235         }\r
236 \r
237         std::string body = decoded_msg.substr(sizeof(header));\r
238 \r
239         return Command(static_cast<header::CommandHeader>(header), body, shared_from_this());\r
240     }\r
241 \r
242     void Session::ReceiveTCP(const boost::system::error_code& error)\r
243     {\r
244         if (!error) {\r
245             std::string buffer(boost::asio::buffer_cast<const char*>(receive_buf_.data()),receive_buf_.size());\r
246             auto length = buffer.find_last_of(NETWORK_UTILS_DELIMITOR);\r
247 \r
248             if (length != std::string::npos) {\r
249 \r
250                 receive_buf_.consume(length+1);\r
251                 buffer.erase(length+1);\r
252 \r
253                 while (!buffer.empty()) {\r
254                     std::string msg;\r
255 \r
256                     while (!buffer.empty() && buffer[0]!=NETWORK_UTILS_DELIMITOR)\r
257                     {\r
258                         msg += buffer[0];\r
259                         buffer.erase(0,1);\r
260                     }\r
261                     buffer.erase(0,1);\r
262 \r
263                     read_byte_sum_ += msg.size();\r
264                     UpdateReadByteAverage();\r
265 \r
266                     FetchTCP(msg);\r
267                 }\r
268 \r
269                 boost::asio::async_read_until(socket_tcp_,\r
270                     receive_buf_, NETWORK_UTILS_DELIMITOR,\r
271                     boost::bind(\r
272                       &Session::ReceiveTCP, shared_from_this(),\r
273                       boost::asio::placeholders::error));\r
274 \r
275             }\r
276 \r
277         } else {\r
278             FatalError();\r
279         }\r
280     }\r
281 \r
282     void Session::DoWriteTCP(const std::string msg, SessionPtr session_holder)\r
283     {\r
284         bool write_in_progress = !send_queue_.empty();\r
285         send_queue_.push(msg);\r
286         if (!write_in_progress && !send_queue_.empty())\r
287         {\r
288            \r
289           auto s = boost::make_shared<std::string>(msg.data(), msg.size());\r
290 \r
291           boost::asio::async_write(socket_tcp_,\r
292               boost::asio::buffer(s->data(), s->size()),\r
293               boost::bind(&Session::WriteTCP, this,\r
294                 boost::asio::placeholders::error, s, session_holder));\r
295         }\r
296     }\r
297 \r
298     void Session::WriteTCP(const boost::system::error_code& error,\r
299                 boost::shared_ptr<std::string> holder, SessionPtr session_holder)\r
300     {\r
301         if (!error) {\r
302             if (!send_queue_.empty()) {\r
303                   send_queue_.pop();\r
304                   if (!send_queue_.empty())\r
305                   {\r
306 \r
307                     boost::shared_ptr<std::string> s = \r
308                         boost::make_shared<std::string>(send_queue_.front().data(), send_queue_.front().size());\r
309 \r
310                     boost::asio::async_write(socket_tcp_,\r
311                         boost::asio::buffer(s->data(), s->size()),\r
312                         boost::bind(&Session::WriteTCP, this,\r
313                           boost::asio::placeholders::error, s, session_holder));\r
314                   }\r
315             }\r
316         } else {\r
317             FatalError(session_holder);\r
318         }\r
319     }\r
320 \r
321     void Session::FetchTCP(const std::string& msg)\r
322     {\r
323         if (msg.size() >= sizeof(uint8_t)) {\r
324             if (on_receive_) {\r
325                 (*on_receive_)(Deserialize(msg));\r
326             }\r
327         } else {\r
328             Logger::Error(_T("Too short data"));\r
329         }\r
330     }\r
331 \r
332     void Session::FatalError(SessionPtr session_holder)\r
333     {\r
334         if (online_) {\r
335             online_ = false;\r
336             if (on_receive_) {\r
337                 if (id_ > 0) {\r
338                     (*on_receive_)(FatalConnectionError(id_));\r
339                 } else {\r
340                     (*on_receive_)(FatalConnectionError());\r
341                 }\r
342             }\r
343         }\r
344     }\r
345 \r
346     void Session::set_on_receive(CallbackFuncPtr func)\r
347     {\r
348         on_receive_ = func;\r
349     }\r
350 \r
351 \r
352 }\r