5 #include "Command.hpp"
\r
6 #include "CommandHeader.hpp"
\r
7 #include "Session.hpp"
\r
9 #include "../Logger.hpp"
\r
10 #include <boost/make_shared.hpp>
\r
// Constructs a session on the given io_service.
// io_service_tcp_ and socket_tcp_ are both initialized from io_service_tcp.
// NOTE(review): this listing omits some original lines; the initializer list
// continues past the last line shown here, so further members may be
// initialized in lines that are not visible.
15 Session::Session(boost::asio::io_service& io_service_tcp) :
\r
16 io_service_tcp_(io_service_tcp),
\r
17 socket_tcp_(io_service_tcp),
\r
// Both byte-rate measurement windows start at the current time.
21 read_start_time_(time(nullptr)),
\r
22 write_start_time_(time(nullptr)),
\r
// Serialized/compressed byte counters start at zero.
25 serialized_byte_sum_(0),
\r
26 compressed_byte_sum_(0),
\r
// Initial value of the limit returned by write_average_limit().
27 write_average_limit_(999999),
\r
// Closes the session's TCP socket by calling close() on socket_tcp_.
// NOTE(review): if socket_tcp_ is a boost::asio tcp::socket (tcp_socket()
// returns it as tcp::socket&), the argument-less close() reports failure by
// throwing — confirm callers expect that.
38 void Session::Close()
\r
40 socket_tcp_.close();
\r
// Sends a command asynchronously.
// The command is serialized, the size of the serialized message is added to
// write_byte_sum_, the write-rate window is refreshed, the current write
// average is logged at debug level, and DoWriteTCP is posted to
// io_service_tcp_ with the message.
43 void Session::Send(const Command& command)
\r
45 auto msg = Serialize(command);
\r
46 write_byte_sum_ += msg.size();
\r
47 UpdateWriteByteAverage();
\r
// NOTE(review): GetWriteByteAverage() returns double but the format string
// uses %d — confirm Logger::Debug formats this as intended.
49 Logger::Debug(_T("%d byte/s"), GetWriteByteAverage());
\r
// shared_from_this() is bound as the session_holder argument, so the posted
// handler holds a shared pointer to this session until it has run.
51 io_service_tcp_.post(boost::bind(&Session::DoWriteTCP, this, msg, shared_from_this()));
\r
// Sends a command with a direct write call instead of posting to the
// io_service as Send() does.
// Byte accounting is the same as in Send(): the serialized size is added to
// write_byte_sum_ and the write-rate window is refreshed.
// NOTE(review): the line naming the write call and the opening of the try
// block are not shown in this listing; the arguments below look like a
// boost::asio::write call — confirm against the full file.
54 void Session::SyncSend(const Command& command)
\r
56 auto msg = Serialize(command);
\r
57 write_byte_sum_ += msg.size();
\r
58 UpdateWriteByteAverage();
\r
// Arguments: the TCP socket, a buffer over the whole message, and the
// transfer_all completion condition.
62 socket_tcp_, boost::asio::buffer(msg.data(), msg.size()),
\r
63 boost::asio::transfer_all());
\r
// A std::exception is caught here and its message printed to std::cout;
// nothing is rethrown in the visible lines.
64 } catch (std::exception& e) {
\r
65 std::cout << e.what() << std::endl;
\r
// Returns read_byte_sum_ divided by the time elapsed since read_start_time_
// (time(nullptr) counts seconds, so the result is bytes per second).
// The 1.0f factor makes the division floating-point.
// NOTE(review): if this is called within the same second as
// read_start_time_, the divisor is 0 — confirm callers tolerate the
// resulting non-finite value.
69 double Session::GetReadByteAverage() const
\r
71 return 1.0f * read_byte_sum_ / (time(nullptr) - read_start_time_);
\r
// Returns write_byte_sum_ divided by the time elapsed since
// write_start_time_ (time(nullptr) counts seconds, so the result is bytes
// per second). The 1.0f factor makes the division floating-point.
// NOTE(review): if this is called within the same second as
// write_start_time_, the divisor is 0 — confirm callers tolerate the
// resulting non-finite value.
74 double Session::GetWriteByteAverage() const
\r
76 return 1.0f * write_byte_sum_ / (time(nullptr) - write_start_time_);
\r
// Ages the read-rate window.
// Once at least BYTE_AVERAGE_REFRESH_SECONDS have elapsed since
// read_start_time_, both the byte sum and the window length are halved.
// The ratio used by GetReadByteAverage() stays about the same (up to integer
// rounding), while older traffic counts for less in later averages.
79 void Session::UpdateReadByteAverage()
\r
81 unsigned long elapsed_time = time(nullptr) - read_start_time_;
\r
82 if (elapsed_time >= BYTE_AVERAGE_REFRESH_SECONDS) {
\r
83 read_byte_sum_ /= 2;
\r
// Move the window start forward so that only half of the elapsed time remains.
84 read_start_time_ = time(nullptr) - elapsed_time / 2;
\r
// Ages the write-rate window.
// Once at least BYTE_AVERAGE_REFRESH_SECONDS have elapsed since
// write_start_time_, both the byte sum and the window length are halved.
// The ratio used by GetWriteByteAverage() stays about the same (up to integer
// rounding), while older traffic counts for less in later averages.
88 void Session::UpdateWriteByteAverage()
\r
90 unsigned long elapsed_time = time(nullptr) - write_start_time_;
\r
91 if (elapsed_time >= BYTE_AVERAGE_REFRESH_SECONDS) {
\r
92 write_byte_sum_ /= 2;
\r
// Move the window start forward so that only half of the elapsed time remains.
93 write_start_time_ = time(nullptr) - elapsed_time / 2;
\r
97 void Session::ResetReadByteAverage()
\r
// Resets the write-rate accounting by zeroing write_byte_sum_.
// NOTE(review): part of this body is not shown in this listing; whether
// write_start_time_ is also reset cannot be confirmed from here.
102 void Session::ResetWriteByteAverage()
\r
104 write_byte_sum_ = 0;
\r
// Turns on the encryption_ flag for this session.
// NOTE(review): presumably this flag gates the ENCRYPT_HEADER wrapping in
// Serialize(); the guarding condition is not shown in this listing.
107 void Session::EnableEncryption()
\r
109 encryption_ = true;
\r
112 Encrypter& Session::encrypter()
\r
// Returns a mutable reference to the session's TCP socket (socket_tcp_).
117 tcp::socket& Session::tcp_socket()
\r
119 return socket_tcp_;
\r
122 UserID Session::id() const
\r
127 void Session::set_id(UserID id)
\r
132 bool Session::online() const
\r
137 std::string Session::global_ip() const
\r
142 uint16_t Session::udp_port() const{
\r
// Stores a copy of the given address string in global_ip_.
146 void Session::set_global_ip(const std::string& global_ip)
\r
148 global_ip_ = global_ip;
\r
// Stores the given port number in udp_port_.
151 void Session::set_udp_port(uint16_t udp_port)
\r
153 udp_port_ = udp_port;
\r
// Returns the current value of serialized_byte_sum_ (initialized to 0 in the
// constructor).
// NOTE(review): no update to this counter is visible in this listing.
156 int Session::serialized_byte_sum() const
\r
158 return serialized_byte_sum_;
\r
// Returns the current value of compressed_byte_sum_ (initialized to 0 in the
// constructor).
// NOTE(review): no update to this counter is visible in this listing.
161 int Session::compressed_byte_sum() const
\r
163 return compressed_byte_sum_;
\r
// Two sessions compare equal when their id_ values are equal; no other state
// is compared.
// NOTE(review): this operator is not const-qualified, so it cannot be called
// on a const Session as the left-hand operand.
166 bool Session::operator==(const Session& s)
\r
168 return id_ == s.id_;
\r
// Negation of operator==: true when the two sessions' id_ values differ.
// NOTE(review): not const-qualified, same as operator==.
171 bool Session::operator!=(const Session& s)
\r
173 return !operator==(s);
\r
// Returns the configured write-average limit (999999 until changed through
// set_write_average_limit()).
176 int Session::write_average_limit() const
\r
178 return write_average_limit_;
\r
// Replaces the write-average limit with the given value; no range check is
// applied.
181 void Session::set_write_average_limit(int limit)
\r
183 write_average_limit_ = limit;
\r
// Converts a command into the string that is written to the socket.
// Visible steps: (1) a one-byte header followed by the command body,
// (2) for bodies of at least COMPRESS_MIN_LENGTH bytes, an LZ4 wrapper that is
// kept only when it is actually smaller, (3) an encryption wrapper, and
// (4) a final Utils::Encode pass over the result.
// NOTE(review): this listing omits some original lines (including the
// condition guarding the encryption step), so the notes below cover only
// what is visible.
186 std::string Session::Serialize(const Command& command)
\r
// The header must fit in one byte. assert() compiles away under NDEBUG, so
// in such builds an out-of-range header would be truncated by the cast
// below without any check.
188 assert(command.header() < 0xFF);
\r
189 auto header = static_cast<uint8_t>(command.header());
\r
190 std::string body = command.body();
\r
192 std::string msg = Utils::Serialize(header) + body;
\r
195 if (body.size() >= COMPRESS_MIN_LENGTH) {
\r
196 auto compressed = Utils::LZ4Compress(msg);
\r
// Use the compressed form only if it is still smaller after allowing one
// extra byte.
197 if (msg.size() > compressed.size() + sizeof(uint8_t)) {
\r
// The uncompressed size is sent as a uint16_t. Only this assert guards the
// range, so under NDEBUG a larger size would be truncated by the cast.
198 assert(msg.size() < 65535);
\r
199 msg = Utils::Serialize(static_cast<uint8_t>(header::LZ4_COMPRESS_HEADER),
\r
200 static_cast<uint16_t>(msg.size()))
\r
// Prefix an ENCRYPT_HEADER byte to the encrypted message.
// NOTE(review): presumably guarded by encryption_; the condition line is not
// shown in this listing.
207 msg = Utils::Serialize(static_cast<uint8_t>(header::ENCRYPT_HEADER))
\r
208 + encrypter_.Encrypt(msg);
\r
211 return Utils::Encode(msg);
\r
// Rebuilds a Command from a received string.
// The input is passed through Utils::Decode, then unwrapped in order: an
// ENCRYPT_HEADER layer (decrypt) and an LZ4_COMPRESS_HEADER layer
// (uncompress to the recorded original size). The header is re-read after
// each layer is removed. What follows the final header becomes the body.
// NOTE(review): the declaration of `header` is not shown in this listing.
214 Command Session::Deserialize(const std::string& msg)
\r
216 std::string decoded_msg = Utils::Decode(msg);
\r
219 Utils::Deserialize(decoded_msg, &header);
\r
222 if (header == header::ENCRYPT_HEADER) {
\r
// Drop the wrapper header, decrypt the rest, then read the inner header.
223 decoded_msg.erase(0, sizeof(header));
\r
224 decoded_msg = encrypter_.Decrypt(decoded_msg);
\r
225 Utils::Deserialize(decoded_msg, &header);
\r
229 if (header == header::LZ4_COMPRESS_HEADER) {
\r
230 uint16_t original_size;
\r
// Read the wrapper header together with the recorded uncompressed size,
// strip both fields, and uncompress the remainder.
231 Utils::Deserialize(decoded_msg, &header, &original_size);
\r
232 decoded_msg.erase(0, sizeof(header) + sizeof(original_size));
\r
233 decoded_msg = Utils::LZ4Uncompress(decoded_msg, original_size);
\r
234 Utils::Deserialize(decoded_msg, &header);
\r
// NOTE(review): substr throws std::out_of_range if decoded_msg is shorter
// than sizeof(header); no length check on the decoded/decrypted/uncompressed
// data is visible in this listing.
237 std::string body = decoded_msg.substr(sizeof(header));
\r
// The resulting Command is given a shared pointer to this session.
239 return Command(static_cast<header::CommandHeader>(header), body, shared_from_this());
\r
// Handler for async_read_until on the TCP socket.
// Visible steps: copy the buffered bytes into a string, split off everything
// up to the last delimiter, walk through the delimiter-separated messages
// while accounting for read bytes, then schedule the next read.
// NOTE(review): this listing omits several original lines (including any use
// of `error` and the per-message extraction), so only the visible steps are
// described.
242 void Session::ReceiveTCP(const boost::system::error_code& error)
\r
// Copy everything currently held in receive_buf_ into a std::string.
245 std::string buffer(boost::asio::buffer_cast<const char*>(receive_buf_.data()),receive_buf_.size());
\r
246 auto length = buffer.find_last_of(NETWORK_UTILS_DELIMITOR);
\r
248 if (length != std::string::npos) {
\r
// Consume through the last delimiter. Bytes after it stay in receive_buf_
// and are dropped from the local copy.
250 receive_buf_.consume(length+1);
\r
251 buffer.erase(length+1);
\r
253 while (!buffer.empty()) {
\r
// Scan forward until the next delimiter (loop body not shown in this listing).
256 while (!buffer.empty() && buffer[0]!=NETWORK_UTILS_DELIMITOR)
\r
263 read_byte_sum_ += msg.size();
\r
264 UpdateReadByteAverage();
\r
// Schedule the next read. shared_from_this() is bound as the handler's
// object, so the pending handler holds a shared pointer to this session.
269 boost::asio::async_read_until(socket_tcp_,
\r
270 receive_buf_, NETWORK_UTILS_DELIMITOR,
\r
272 &Session::ReceiveTCP, shared_from_this(),
\r
273 boost::asio::placeholders::error));
\r
// Enqueues a message and, if the queue was empty before the push, starts an
// asynchronous write of it. Send() posts this function to io_service_tcp_.
// msg is taken by value, so each call works on its own copy.
// session_holder is forwarded into the WriteTCP completion handler.
282 void Session::DoWriteTCP(const std::string msg, SessionPtr session_holder)
\r
// A non-empty queue is taken to mean an earlier write is still in progress.
284 bool write_in_progress = !send_queue_.empty();
\r
285 send_queue_.push(msg);
\r
// NOTE(review): the queue was just pushed to, so the second condition is
// always true here.
286 if (!write_in_progress && !send_queue_.empty())
\r
// The bytes are copied into a shared string that is also bound into the
// completion handler, so the buffer stays alive until WriteTCP is invoked.
289 auto s = boost::make_shared<std::string>(msg.data(), msg.size());
\r
291 boost::asio::async_write(socket_tcp_,
\r
292 boost::asio::buffer(s->data(), s->size()),
\r
293 boost::bind(&Session::WriteTCP, this,
\r
294 boost::asio::placeholders::error, s, session_holder));
\r
// Completion handler for the async_write calls started by DoWriteTCP and by
// this function itself.
// `holder` is the shared string that backed the finished write;
// `session_holder` is passed on to the next write or to FatalError.
// NOTE(review): this listing omits several original lines (including the
// check of `error` and, presumably, the pop of the finished message), so the
// exact branch structure cannot be confirmed from here.
298 void Session::WriteTCP(const boost::system::error_code& error,
\r
299 boost::shared_ptr<std::string> holder, SessionPtr session_holder)
\r
302 if (!send_queue_.empty()) {
\r
304 if (!send_queue_.empty())
\r
// Copy the message at the front of the queue into a fresh shared string and
// write it. The string is bound into the handler so the buffer outlives the
// operation.
307 boost::shared_ptr<std::string> s =
\r
308 boost::make_shared<std::string>(send_queue_.front().data(), send_queue_.front().size());
\r
310 boost::asio::async_write(socket_tcp_,
\r
311 boost::asio::buffer(s->data(), s->size()),
\r
312 boost::bind(&Session::WriteTCP, this,
\r
313 boost::asio::placeholders::error, s, session_holder));
\r
// NOTE(review): presumably the error branch; its condition is not shown in
// this listing.
317 FatalError(session_holder);
\r
// Handles one received message: if it is at least one byte long it is
// deserialized and passed to the on_receive_ callback.
// NOTE(review): some lines are not shown in this listing; the
// "Too short data" error looks like the else branch of the size check, and a
// null check on on_receive_ may exist in a hidden line — confirm both.
321 void Session::FetchTCP(const std::string& msg)
\r
323 if (msg.size() >= sizeof(uint8_t)) {
\r
325 (*on_receive_)(Deserialize(msg));
\r
328 Logger::Error(_T("Too short data"));
\r
// Reports a fatal connection error through the on_receive_ callback by
// passing it a FatalConnectionError, built either with this session's id_ or
// with no arguments.
// NOTE(review): the conditions that choose between the two calls, and any
// other statements in the body, are not shown in this listing.
332 void Session::FatalError(SessionPtr session_holder)
\r
338 (*on_receive_)(FatalConnectionError(id_));
\r
340 (*on_receive_)(FatalConnectionError());
\r
// Stores the callback pointer that FetchTCP and FatalError dereference and
// call with received commands and fatal-error notifications.
346 void Session::set_on_receive(CallbackFuncPtr func)
\r
348 on_receive_ = func;
\r