6 #include "CommandHeader.hpp"
9 #include "../Logger.hpp"
10 #include <boost/make_shared.hpp>
15 Session::Session(boost::asio::io_service& io_service_tcp) :
16 io_service_tcp_(io_service_tcp),
17 socket_tcp_(io_service_tcp),
21 read_start_time_(time(nullptr)),
22 write_start_time_(time(nullptr)),
25 serialized_byte_sum_(0),
26 compressed_byte_sum_(0),
42 void Session::Send(const Command& command)
44 auto msg = Serialize(command);
45 write_byte_sum_ += msg.size();
46 UpdateWriteByteAverage();
48 io_service_tcp_.post(boost::bind(&Session::DoWriteTCP, this, msg, shared_from_this()));
51 void Session::SyncSend(const Command& command)
53 auto msg = Serialize(command);
54 write_byte_sum_ += msg.size();
55 UpdateWriteByteAverage();
59 socket_tcp_, boost::asio::buffer(msg.data(), msg.size()),
60 boost::asio::transfer_all());
61 } catch (std::exception& e) {
62 std::cout << e.what() << std::endl;
66 double Session::GetReadByteAverage() const
68 return 1.0f * read_byte_sum_ / (time(nullptr) - read_start_time_);
71 double Session::GetWriteByteAverage() const
73 return 1.0f * write_byte_sum_ / (time(nullptr) - write_start_time_);
76 void Session::UpdateReadByteAverage()
78 unsigned long elapsed_time = time(nullptr) - read_start_time_;
79 if (elapsed_time >= BYTE_AVERAGE_REFRESH_SECONDS) {
81 read_start_time_ = time(nullptr) - elapsed_time / 2;
85 void Session::UpdateWriteByteAverage()
87 unsigned long elapsed_time = time(nullptr) - write_start_time_;
88 if (elapsed_time >= BYTE_AVERAGE_REFRESH_SECONDS) {
90 write_start_time_ = time(nullptr) - elapsed_time / 2;
94 void Session::EnableEncryption()
99 Encrypter& Session::encrypter()
104 tcp::socket& Session::tcp_socket()
109 UserID Session::id() const
114 void Session::set_id(UserID id)
119 bool Session::online() const
124 std::string Session::global_ip() const
129 uint16_t Session::udp_port() const{
133 void Session::set_global_ip(const std::string& global_ip)
135 global_ip_ = global_ip;
138 void Session::set_udp_port(uint16_t udp_port)
140 udp_port_ = udp_port;
143 int Session::serialized_byte_sum() const
145 return serialized_byte_sum_;
148 int Session::compressed_byte_sum() const
150 return compressed_byte_sum_;
153 bool Session::operator==(const Session& s)
158 bool Session::operator!=(const Session& s)
160 return !operator==(s);
163 std::string Session::Serialize(const Command& command)
165 assert(command.header() < 0xFF);
166 auto header = static_cast<uint8_t>(command.header());
\r
167 std::string body = command.body();
169 std::string msg = Utils::Serialize(header) + body;
172 if (body.size() >= COMPRESS_MIN_LENGTH) {
173 auto compressed = Utils::LZ4Compress(msg);
174 if (msg.size() > compressed.size() + sizeof(uint8_t)) {
\r
175 assert(msg.size() < 65535);
176 msg = Utils::Serialize(static_cast<uint8_t>(header::LZ4_COMPRESS_HEADER),
\r
177 static_cast<uint16_t>(msg.size()))
\r
184 msg = Utils::Serialize(static_cast<uint8_t>(header::ENCRYPT_HEADER))
\r
185 + encrypter_.Encrypt(msg);
188 return Utils::Encode(msg);
191 Command Session::Deserialize(const std::string& msg)
193 std::string decoded_msg = Utils::Decode(msg);
196 Utils::Deserialize(decoded_msg, &header);
199 if (header == header::ENCRYPT_HEADER) {
200 decoded_msg.erase(0, sizeof(header));
201 decoded_msg = encrypter_.Decrypt(decoded_msg);
202 Utils::Deserialize(decoded_msg, &header);
206 if (header == header::LZ4_COMPRESS_HEADER) {
207 uint16_t original_size;
\r
208 Utils::Deserialize(decoded_msg, &header, &original_size);
209 decoded_msg.erase(0, sizeof(header) + sizeof(original_size));
210 decoded_msg = Utils::LZ4Uncompress(decoded_msg, original_size);
211 Utils::Deserialize(decoded_msg, &header);
214 std::string body = decoded_msg.substr(sizeof(header));
216 return Command(static_cast<header::CommandHeader>(header), body, shared_from_this());
219 void Session::ReceiveTCP(const boost::system::error_code& error)
222 std::string buffer(boost::asio::buffer_cast<const char*>(receive_buf_.data()),receive_buf_.size());
223 auto length = buffer.find_last_of(NETWORK_UTILS_DELIMITOR);
225 if (length != std::string::npos) {
227 receive_buf_.consume(length+1);
228 buffer.erase(length+1);
230 while (!buffer.empty()) {
233 while (!buffer.empty() && buffer[0]!=NETWORK_UTILS_DELIMITOR)
240 read_byte_sum_ += msg.size();
241 UpdateReadByteAverage();
246 boost::asio::async_read_until(socket_tcp_,
247 receive_buf_, NETWORK_UTILS_DELIMITOR,
249 &Session::ReceiveTCP, shared_from_this(),
250 boost::asio::placeholders::error));
259 void Session::DoWriteTCP(const std::string msg, SessionPtr session_holder)
261 bool write_in_progress = !send_queue_.empty();
262 send_queue_.push(msg);
263 if (!write_in_progress && !send_queue_.empty())
266 boost::shared_ptr<std::string> s =
267 boost::make_shared<std::string>(msg.data(), msg.size());
269 boost::asio::async_write(socket_tcp_,
270 boost::asio::buffer(s->data(), s->size()),
271 boost::bind(&Session::WriteTCP, this,
272 boost::asio::placeholders::error, s, session_holder));
276 void Session::WriteTCP(const boost::system::error_code& error,
277 boost::shared_ptr<std::string> holder, SessionPtr session_holder)
280 if (!send_queue_.empty()) {
282 if (!send_queue_.empty())
285 boost::shared_ptr<std::string> s =
286 boost::make_shared<std::string>(send_queue_.front().data(), send_queue_.front().size());
288 boost::asio::async_write(socket_tcp_,
289 boost::asio::buffer(s->data(), s->size()),
290 boost::bind(&Session::WriteTCP, this,
291 boost::asio::placeholders::error, s, session_holder));
295 FatalError(session_holder);
299 void Session::FetchTCP(const std::string& msg)
301 if (msg.size() >= sizeof(uint8_t)) {
\r
303 (*on_receive_)(Deserialize(msg));
306 Logger::Error(_T("Too short data"));
310 void Session::FatalError(SessionPtr session_holder)
316 (*on_receive_)(FatalConnectionError(id_));
318 (*on_receive_)(FatalConnectionError());
324 void Session::set_on_receive(CallbackFuncPtr func)