add_down_thread_vs_message_func.second = boost::bind(&tcp_session::down_thread_exit,this,_1);
virtual_service_message_down_thread_function_map.insert(add_down_thread_vs_message_func);
- // epoll impliment
- up_client_epollfd = epoll_create( EVENT_NUM );
- up_realserver_epollfd = epoll_create( EVENT_NUM );
- down_client_epollfd = epoll_create( EVENT_NUM );
- down_realserver_epollfd = epoll_create( EVENT_NUM );
}
//! destructor
tcp_session::~tcp_session(){
else
break;
}
- close( up_client_epollfd );
- close( up_realserver_epollfd );
- close( down_client_epollfd );
- close( down_realserver_epollfd );
}
//! initialize
session_result_message tcp_session::initialize(){
boost::array<char,MAX_BUFFER_SIZE>& data_buff = up_thread_data_client_side.get_data();
boost::system::error_code ec;
std::size_t recv_size;
- UP_THREAD_FUNC_TYPE_TAG func_tag;
-
- struct epoll_event event;
- event.events = EPOLLET | EPOLLIN | EPOLLHUP;
- if ( !ssl_flag ) {
- epoll_ctl( up_client_epollfd, EPOLL_CTL_ADD, client_socket.get_socket().native(), &event );
+ if (!ssl_flag) {
+ recv_size = client_socket.read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE), ec);
} else {
- epoll_ctl( up_client_epollfd, EPOLL_CTL_ADD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
+ recv_size = client_ssl_socket.read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE), ec);
}
-
- while( true ){
- {
- rd_scoped_lock scoped_lock(exit_flag_update_mutex);
- if(unlikely(exit_flag)){
- func_tag = UP_FUNC_EXIT;
- break;
- }
- }
-
- if (!ssl_flag) {
- recv_size = client_socket.read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE), ec);
- } else {
- recv_size = client_ssl_socket.read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE), ec);
- }
-
- if(!ec){
- if(recv_size > 0){
- //----Debug log----------------------------------------------------------------------
- if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
- boost::asio::ip::tcp::endpoint client_endpoint;
- if (!ssl_flag) {
- client_endpoint = client_socket.get_socket().lowest_layer().remote_endpoint(ec);
- } else {
- client_endpoint = client_ssl_socket.get_socket().lowest_layer().remote_endpoint(ec);
- }
- std::stringstream buf;
- buf << "Thread ID[";
- buf << boost::this_thread::get_id();
- buf << "] up_thread_client_receive";
- buf << " receive data size[";
- buf << recv_size;
- buf << "] from [";
- buf << client_endpoint;
- buf << "]";
- Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 36, buf.str(), __FILE__, __LINE__ );
- }
- //----Debug log----------------------------------------------------------------------
- up_thread_data_client_side.set_size(recv_size);
- parent_service.update_up_recv_size(recv_size);
- protocol_module_base::EVENT_TAG module_event = protocol_module->handle_client_recv(up_thread_id,data_buff,recv_size);
- std::map< protocol_module_base::EVENT_TAG ,UP_THREAD_FUNC_TYPE_TAG >::iterator func_type = up_thread_module_event_map.find(module_event);
- if(unlikely( func_type == up_thread_module_event_map.end() )){
- //Error unknown protocol_module_base::EVENT_TAG return
- std::stringstream buf;
- buf << "Thread ID[";
- buf << boost::this_thread::get_id();
- buf << "] protocol_module returnd illegal EVENT_TAG : ";
- buf << module_event;
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 20, buf.str(), __FILE__, __LINE__ );
- up_thread_exit(process_type);
- return;
+ UP_THREAD_FUNC_TYPE_TAG func_tag;
+ if(!ec){
+ if(recv_size > 0){
+ //----Debug log----------------------------------------------------------------------
+ if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
+ boost::asio::ip::tcp::endpoint client_endpoint;
+ if (!ssl_flag) {
+ client_endpoint = client_socket.get_socket().lowest_layer().remote_endpoint(ec);
+ } else {
+ client_endpoint = client_ssl_socket.get_socket().lowest_layer().remote_endpoint(ec);
}
- func_tag = func_type->second;
- break;
- }else{
- //func_tag = UP_FUNC_CLIENT_RECEIVE;
- //boost::this_thread::yield();
+ std::stringstream buf;
+ buf << "Thread ID[";
+ buf << boost::this_thread::get_id();
+ buf << "] up_thread_client_receive";
+ buf << " receive data size[";
+ buf << recv_size;
+ buf << "] from [";
+ buf << client_endpoint;
+ buf << "]";
+ Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 36, buf.str(), __FILE__, __LINE__ );
}
- }else{
- if(ec == boost::asio::error::try_again){
- //func_tag = UP_FUNC_CLIENT_RECEIVE;
- //boost::this_thread::yield();
- }else{
- func_tag = UP_FUNC_CLIENT_DISCONNECT;
- break;
+ //----Debug log----------------------------------------------------------------------
+ up_thread_data_client_side.set_size(recv_size);
+ parent_service.update_up_recv_size(recv_size);
+ protocol_module_base::EVENT_TAG module_event = protocol_module->handle_client_recv(up_thread_id,data_buff,recv_size);
+ std::map< protocol_module_base::EVENT_TAG ,UP_THREAD_FUNC_TYPE_TAG >::iterator func_type = up_thread_module_event_map.find(module_event);
+ if(unlikely( func_type == up_thread_module_event_map.end() )){
+ //Error unknown protocol_module_base::EVENT_TAG return
+ std::stringstream buf;
+ buf << "Thread ID[";
+ buf << boost::this_thread::get_id();
+ buf << "] protocol_module returnd illegal EVENT_TAG : ";
+ buf << module_event;
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 20, buf.str(), __FILE__, __LINE__ );
+ up_thread_exit(process_type);
+ return;
}
+ func_tag = func_type->second;
+ }else{
+ func_tag = UP_FUNC_CLIENT_RECEIVE;
+ boost::this_thread::yield();
}
-
- event.events = EPOLLET | EPOLLIN | EPOLLHUP;
- if ( !ssl_flag ) {
- epoll_ctl( up_client_epollfd, EPOLL_CTL_MOD, client_socket.get_socket().native(), &event );
- } else {
- epoll_ctl( up_client_epollfd, EPOLL_CTL_MOD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
+ }else{
+ if(ec == boost::asio::error::try_again){
+ func_tag = UP_FUNC_CLIENT_RECEIVE;
+ boost::this_thread::yield();
+ }else{
+ func_tag = UP_FUNC_CLIENT_DISCONNECT;
}
-
- epoll_wait( up_client_epollfd, up_client_events, EVENT_NUM, EPOLL_TIMEOUT );
}
-
- event.events = EPOLLET | EPOLLIN | EPOLLHUP;
- if ( !ssl_flag ) {
- epoll_ctl( up_client_epollfd, EPOLL_CTL_DEL, client_socket.get_socket().native(), &event );
- } else {
- epoll_ctl( up_client_epollfd, EPOLL_CTL_DEL, client_ssl_socket.get_socket().lowest_layer().native(), &event );
- }
-
up_thread_function_pair func = up_thread_function_array[func_tag];
if(unlikely( !func.second )){
//Error not find function map
boost::array<char,MAX_BUFFER_SIZE>& data_buff = up_thread_data_dest_side.get_data();
std::size_t data_size = up_thread_data_dest_side.get_size();
std::size_t send_data_size = up_thread_data_dest_side.get_send_size();
- std::size_t send_size;
+ std::size_t send_size = send_socket->second->write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
UP_THREAD_FUNC_TYPE_TAG func_tag;
-
- struct epoll_event event;
- event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
- epoll_ctl( up_realserver_epollfd, EPOLL_CTL_ADD, send_socket->second->get_socket().native(), &event );
-
- while( true ){
- {
- rd_scoped_lock scoped_lock(exit_flag_update_mutex);
- if(unlikely(exit_flag)){
- func_tag = UP_FUNC_EXIT;
- break;
- }
+ if( !ec ){
+ send_data_size += send_size;
+ up_thread_data_dest_side.set_send_size(send_data_size);
+ parent_service.update_up_send_size(send_size);
+ //----Debug log----------------------------------------------------------------------
+ if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
+ std::stringstream buf;
+ buf << "Thread ID[";
+ buf << boost::this_thread::get_id();
+ buf << "] up_thread_realserver_send";
+ buf << " send data size[";
+ buf << send_size;
+ buf << "] for [";
+ buf << server_endpoint;
+ buf << "]";
+ Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 37, buf.str(), __FILE__, __LINE__ );
}
-
- send_size = send_socket->second->write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
-
- if( !ec ){
- send_data_size += send_size;
- up_thread_data_dest_side.set_send_size(send_data_size);
- parent_service.update_up_send_size(send_size);
- //----Debug log----------------------------------------------------------------------
- if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
+ //----Debug log----------------------------------------------------------------------
+ if(data_size > send_data_size){
+ func_tag = UP_FUNC_REALSERVER_SEND;
+ }else{
+ protocol_module_base::EVENT_TAG module_event = protocol_module->handle_realserver_send(up_thread_id);
+ std::map< protocol_module_base::EVENT_TAG ,UP_THREAD_FUNC_TYPE_TAG >::iterator func_type = up_thread_module_event_map.find(module_event);
+ if( unlikely( func_type == up_thread_module_event_map.end() )){
+ //Error unknown protocol_module_base::EVENT_TAG return
std::stringstream buf;
buf << "Thread ID[";
buf << boost::this_thread::get_id();
- buf << "] up_thread_realserver_send";
- buf << " send data size[";
- buf << send_size;
- buf << "] for [";
- buf << server_endpoint;
- buf << "]";
- Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 37, buf.str(), __FILE__, __LINE__ );
- }
- //----Debug log----------------------------------------------------------------------
- if(data_size > send_data_size){
- func_tag = UP_FUNC_REALSERVER_SEND;
- }else{
- protocol_module_base::EVENT_TAG module_event = protocol_module->handle_realserver_send(up_thread_id);
- std::map< protocol_module_base::EVENT_TAG ,UP_THREAD_FUNC_TYPE_TAG >::iterator func_type = up_thread_module_event_map.find(module_event);
- if( unlikely( func_type == up_thread_module_event_map.end() )){
- //Error unknown protocol_module_base::EVENT_TAG return
- std::stringstream buf;
- buf << "Thread ID[";
- buf << boost::this_thread::get_id();
- buf << "] protocol_module returnd illegal EVENT_TAG : ";
- buf << module_event;
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 30, buf.str(), __FILE__, __LINE__ );
- up_thread_exit(process_type);
- return;
- }
- func_tag = func_type->second;
+ buf << "] protocol_module returnd illegal EVENT_TAG : ";
+ buf << module_event;
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 30, buf.str(), __FILE__, __LINE__ );
+ up_thread_exit(process_type);
+ return;
}
- break;
+ func_tag = func_type->second;
+ }
+ }else{
+ if(ec == boost::asio::error::try_again){
+ func_tag = UP_FUNC_REALSERVER_SEND;
}else{
- if(ec == boost::asio::error::try_again){
- //func_tag = UP_FUNC_REALSERVER_SEND;
- //boost::this_thread::yield();
- }else{
- func_tag = UP_FUNC_REALSERVER_DISCONNECT;
- break;
- }
+ func_tag = UP_FUNC_REALSERVER_DISCONNECT;
}
-
- event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
- epoll_ctl( up_realserver_epollfd, EPOLL_CTL_MOD, send_socket->second->get_socket().native(), &event );
-
- epoll_wait( up_realserver_epollfd, up_realserver_events, EVENT_NUM, EPOLL_TIMEOUT );
}
-
- event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
- epoll_ctl( up_realserver_epollfd, EPOLL_CTL_DEL, send_socket->second->get_socket().native(), &event );
-
up_thread_function_pair func = up_thread_function_array[func_tag];
if( unlikely( !func.second ) ){
//Error not find function map
boost::array<char,MAX_BUFFER_SIZE>& data_buff = up_thread_data_dest_side.get_data();
std::size_t data_size = up_thread_data_dest_side.get_size();
std::size_t send_data_size = up_thread_data_dest_side.get_send_size();
- std::size_t send_size;
+ std::size_t send_size = sorryserver_socket.second->write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
UP_THREAD_FUNC_TYPE_TAG func_tag;
-
- struct epoll_event event;
- event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
- epoll_ctl( up_realserver_epollfd, EPOLL_CTL_ADD, sorryserver_socket.second->get_socket().native(), &event );
-
- while( true ){
- {
- rd_scoped_lock scoped_lock(exit_flag_update_mutex);
- if(unlikely(exit_flag)){
- func_tag = UP_FUNC_EXIT;
- break;
- }
+ if(!ec){
+ send_data_size += send_size;
+ up_thread_data_dest_side.set_send_size(send_data_size);
+ //----Debug log----------------------------------------------------------------------
+ if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
+ std::stringstream buf;
+ buf << "Thread ID[";
+ buf << boost::this_thread::get_id();
+ buf << "] up_thread_sorryserver_send";
+ buf << " send data size[";
+ buf << send_size;
+ buf << "] for [";
+ buf << sorry_endpoint;
+ buf << "]";
+ Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 39, buf.str(), __FILE__, __LINE__ );
}
-
- send_size = sorryserver_socket.second->write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
-
- if(!ec){
- send_data_size += send_size;
- up_thread_data_dest_side.set_send_size(send_data_size);
- //----Debug log----------------------------------------------------------------------
- if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
+ //----Debug log----------------------------------------------------------------------
+ if(data_size > send_data_size){
+ func_tag = UP_FUNC_SORRYSERVER_SEND;
+ }else{
+ protocol_module_base::EVENT_TAG module_event = protocol_module->handle_sorryserver_send(up_thread_id);
+ std::map< protocol_module_base::EVENT_TAG ,UP_THREAD_FUNC_TYPE_TAG >::iterator func_type = up_thread_module_event_map.find(module_event);
+ if(unlikely( func_type == up_thread_module_event_map.end() )){
+ //Error unknown protocol_module_base::EVENT_TAG return
std::stringstream buf;
buf << "Thread ID[";
buf << boost::this_thread::get_id();
- buf << "] up_thread_sorryserver_send";
- buf << " send data size[";
- buf << send_size;
- buf << "] for [";
- buf << sorry_endpoint;
- buf << "]";
- Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 39, buf.str(), __FILE__, __LINE__ );
- }
- //----Debug log----------------------------------------------------------------------
- if(data_size > send_data_size){
- func_tag = UP_FUNC_SORRYSERVER_SEND;
- }else{
- protocol_module_base::EVENT_TAG module_event = protocol_module->handle_sorryserver_send(up_thread_id);
- std::map< protocol_module_base::EVENT_TAG ,UP_THREAD_FUNC_TYPE_TAG >::iterator func_type = up_thread_module_event_map.find(module_event);
- if(unlikely( func_type == up_thread_module_event_map.end() )){
- //Error unknown protocol_module_base::EVENT_TAG return
- std::stringstream buf;
- buf << "Thread ID[";
- buf << boost::this_thread::get_id();
- buf << "] protocol_module returnd illegal EVENT_TAG : ";
- buf << module_event;
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 49, buf.str(), __FILE__, __LINE__ );
- up_thread_exit(process_type);
- return;
- }
- func_tag = func_type->second;
+ buf << "] protocol_module returnd illegal EVENT_TAG : ";
+ buf << module_event;
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 49, buf.str(), __FILE__, __LINE__ );
+ up_thread_exit(process_type);
+ return;
}
- break;
+ func_tag = func_type->second;
+ }
+ }else{
+ if(ec == boost::asio::error::try_again){
+ func_tag = UP_FUNC_SORRYSERVER_SEND;
}else{
- if(ec == boost::asio::error::try_again){
- //func_tag = UP_FUNC_SORRYSERVER_SEND;
- //boost::this_thread::yield();
- }else{
- func_tag = UP_FUNC_SORRYSERVER_DISCONNECT;
- break;
- }
+ func_tag = UP_FUNC_SORRYSERVER_DISCONNECT;
}
-
- event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
- epoll_ctl( up_realserver_epollfd, EPOLL_CTL_MOD, sorryserver_socket.second->get_socket().native(), &event );
-
- epoll_wait( up_realserver_epollfd, up_realserver_events, EVENT_NUM, EPOLL_TIMEOUT );
}
-
- event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
- epoll_ctl( up_realserver_epollfd, EPOLL_CTL_DEL, sorryserver_socket.second->get_socket().native(), &event );
-
up_thread_function_pair func = up_thread_function_array[func_tag];
if(unlikely( !func.second )){
//Error not find function map
down_thread_data_dest_side.initialize();
boost::array<char,MAX_BUFFER_SIZE>& data_buff = down_thread_data_dest_side.get_data();
boost::system::error_code ec;
- size_t recv_size;
+ size_t recv_size = down_thread_current_receive_realserver_socket->second->read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE),ec);
+ boost::asio::ip::tcp::endpoint server_endpoint = down_thread_current_receive_realserver_socket->first;
+ down_thread_data_dest_side.set_endpoint(server_endpoint);
DOWN_THREAD_FUNC_TYPE_TAG func_tag;
-
- struct epoll_event event;
- event.events = EPOLLET | EPOLLIN | EPOLLHUP;
- epoll_ctl( down_realserver_epollfd, EPOLL_CTL_ADD, down_thread_current_receive_realserver_socket->second->get_socket().native(), &event );
-
- while( true ){
- {
- rd_scoped_lock scoped_lock(exit_flag_update_mutex);
- if(unlikely(exit_flag)){
- func_tag = DOWN_FUNC_EXIT;
- break;
+ if(!ec){
+ if(recv_size > 0){
+ //----Debug log----------------------------------------------------------------------
+ if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
+ std::stringstream buf;
+ buf << "Thread ID[";
+ buf << boost::this_thread::get_id();
+ buf << "] down_thread_realserver_receive";
+ buf << " receive data size[";
+ buf << recv_size;
+ buf << "] from [";
+ buf << server_endpoint;
+ buf << "]";
+ Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 43, buf.str(), __FILE__, __LINE__ );
}
- }
-
- recv_size = down_thread_current_receive_realserver_socket->second->read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE),ec);
-
- boost::asio::ip::tcp::endpoint server_endpoint = down_thread_current_receive_realserver_socket->first;
- down_thread_data_dest_side.set_endpoint(server_endpoint);
- if(!ec){
- if(recv_size > 0){
- //----Debug log----------------------------------------------------------------------
- if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
- std::stringstream buf;
- buf << "Thread ID[";
- buf << boost::this_thread::get_id();
- buf << "] down_thread_realserver_receive";
- buf << " receive data size[";
- buf << recv_size;
- buf << "] from [";
- buf << server_endpoint;
- buf << "]";
- Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 43, buf.str(), __FILE__, __LINE__ );
- }
- //----Debug log----------------------------------------------------------------------
- down_thread_data_dest_side.set_size(recv_size);
- parent_service.update_down_recv_size(recv_size);
- protocol_module_base::EVENT_TAG module_event = protocol_module->handle_realserver_recv(down_thread_id,server_endpoint,data_buff,recv_size);
- std::map< protocol_module_base::EVENT_TAG ,DOWN_THREAD_FUNC_TYPE_TAG >::iterator func_type = down_thread_module_event_map.find(module_event);
- if(unlikely( func_type == down_thread_module_event_map.end() )){
- //Error unknown protocol_module_base::EVENT_TAG return
- std::stringstream buf;
- buf << "Thread ID[";
- buf << boost::this_thread::get_id();
- buf << "] protocol_module returnd illegal EVENT_TAG : ";
- buf << module_event;
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 70, buf.str(), __FILE__, __LINE__ );
- down_thread_exit(process_type);
- return;
- }
- func_tag = func_type->second;
- break;
- }else{
- //func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
- //boost::this_thread::yield();
+ //----Debug log----------------------------------------------------------------------
+ down_thread_data_dest_side.set_size(recv_size);
+ parent_service.update_down_recv_size(recv_size);
+ protocol_module_base::EVENT_TAG module_event = protocol_module->handle_realserver_recv(down_thread_id,server_endpoint,data_buff,recv_size);
+ std::map< protocol_module_base::EVENT_TAG ,DOWN_THREAD_FUNC_TYPE_TAG >::iterator func_type = down_thread_module_event_map.find(module_event);
+ if(unlikely( func_type == down_thread_module_event_map.end() )){
+ //Error unknown protocol_module_base::EVENT_TAG return
+ std::stringstream buf;
+ buf << "Thread ID[";
+ buf << boost::this_thread::get_id();
+ buf << "] protocol_module returnd illegal EVENT_TAG : ";
+ buf << module_event;
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 70, buf.str(), __FILE__, __LINE__ );
+ down_thread_exit(process_type);
+ return;
}
+ func_tag = func_type->second;
}else{
- if(ec == boost::asio::error::try_again){
- //func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
- //boost::this_thread::yield();
- }else{
- func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
- break;
- }
+ func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
+ boost::this_thread::yield();
+ }
+ }else{
+ if(ec == boost::asio::error::try_again){
+ func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
+ boost::this_thread::yield();
+ }else{
+ func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
}
-
- event.events = EPOLLET | EPOLLIN | EPOLLHUP;
- epoll_ctl( down_realserver_epollfd, EPOLL_CTL_MOD, down_thread_current_receive_realserver_socket->second->get_socket().native(), &event );
-
- epoll_wait( down_realserver_epollfd, down_realserver_events, EVENT_NUM, EPOLL_TIMEOUT );
}
-
- event.events = EPOLLET | EPOLLIN | EPOLLHUP;
- epoll_ctl( down_realserver_epollfd, EPOLL_CTL_DEL, down_thread_current_receive_realserver_socket->second->get_socket().native(), &event );
-
down_thread_function_pair func = down_thread_function_array[func_tag];
if(unlikely( !func.second ) ){
//Error not find function map
//! down thread send for client and raise module event of handle_client_send
//! @param[in] process_type is prosecess type
void tcp_session::down_thread_client_send(const TCP_PROCESS_TYPE_TAG process_type){
+
boost::system::error_code ec;
boost::array<char,MAX_BUFFER_SIZE>& data_buff = down_thread_data_client_side.get_data();
std::size_t data_size = down_thread_data_client_side.get_size();
std::size_t send_data_size = down_thread_data_client_side.get_send_size();
std::size_t send_size;
- DOWN_THREAD_FUNC_TYPE_TAG func_tag;
-
- struct epoll_event event;
- event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
- if ( !ssl_flag ) {
- epoll_ctl( down_client_epollfd, EPOLL_CTL_ADD, client_socket.get_socket().native(), &event );
+ if (!ssl_flag) {
+ send_size = client_socket.write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
} else {
- epoll_ctl( down_client_epollfd, EPOLL_CTL_ADD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
+ send_size = client_ssl_socket.write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
}
-
- while( true ){
- {
- rd_scoped_lock scoped_lock(exit_flag_update_mutex);
- if(unlikely(exit_flag)){
- func_tag = DOWN_FUNC_EXIT;
- break;
+ DOWN_THREAD_FUNC_TYPE_TAG func_tag;
+ if(!ec){
+ send_data_size += send_size;
+ down_thread_data_client_side.set_send_size(send_data_size);
+ parent_service.update_down_send_size(send_size);
+ //----Debug log----------------------------------------------------------------------
+ if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
+ boost::asio::ip::tcp::endpoint client_endpoint;
+ if (!ssl_flag) {
+ client_endpoint = client_socket.get_socket().lowest_layer().remote_endpoint(ec);
+ } else {
+ client_endpoint = client_ssl_socket.get_socket().lowest_layer().remote_endpoint(ec);
}
+ std::stringstream buf;
+ buf << "Thread ID[";
+ buf << boost::this_thread::get_id();
+ buf << "] down_thread_client_send";
+ buf << " send data size[";
+ buf << send_size;
+ buf << "] for [";
+ buf << client_endpoint;
+ buf << "]";
+ Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 45, buf.str(), __FILE__, __LINE__ );
}
-
- if (!ssl_flag) {
- send_size = client_socket.write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
- } else {
- send_size = client_ssl_socket.write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
- }
-
- if(!ec){
- send_data_size += send_size;
- down_thread_data_client_side.set_send_size(send_data_size);
- parent_service.update_down_send_size(send_size);
- //----Debug log----------------------------------------------------------------------
- if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
- boost::asio::ip::tcp::endpoint client_endpoint;
- if (!ssl_flag) {
- client_endpoint = client_socket.get_socket().lowest_layer().remote_endpoint(ec);
- } else {
- client_endpoint = client_ssl_socket.get_socket().lowest_layer().remote_endpoint(ec);
- }
+ //----Debug log----------------------------------------------------------------------
+ if(data_size > send_data_size){
+ func_tag = DOWN_FUNC_CLIENT_SEND;
+ }else{
+ protocol_module_base::EVENT_TAG module_event = protocol_module->handle_client_send(down_thread_id);
+ std::map< protocol_module_base::EVENT_TAG ,DOWN_THREAD_FUNC_TYPE_TAG >::iterator func_type = down_thread_module_event_map.find(module_event);
+ if(unlikely( func_type == down_thread_module_event_map.end() ) ){
+ //Error unknown protocol_module_base::EVENT_TAG return
std::stringstream buf;
buf << "Thread ID[";
buf << boost::this_thread::get_id();
- buf << "] down_thread_client_send";
- buf << " send data size[";
- buf << send_size;
- buf << "] for [";
- buf << client_endpoint;
- buf << "]";
- Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 45, buf.str(), __FILE__, __LINE__ );
- }
- //----Debug log----------------------------------------------------------------------
- if(data_size > send_data_size){
- func_tag = DOWN_FUNC_CLIENT_SEND;
- //down_send_wait.reset();
- }else{
- protocol_module_base::EVENT_TAG module_event = protocol_module->handle_client_send(down_thread_id);
- std::map< protocol_module_base::EVENT_TAG ,DOWN_THREAD_FUNC_TYPE_TAG >::iterator func_type = down_thread_module_event_map.find(module_event);
- if(unlikely( func_type == down_thread_module_event_map.end() ) ){
- //Error unknown protocol_module_base::EVENT_TAG return
- std::stringstream buf;
- buf << "Thread ID[";
- buf << boost::this_thread::get_id();
- buf << "] protocol_module returnd illegal EVENT_TAG : ";
- buf << module_event;
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 82, buf.str(), __FILE__, __LINE__ );
- down_thread_exit(process_type);
- return;
- }
- func_tag = func_type->second;
- }
- break;
- }else{
- if(ec == boost::asio::error::try_again){
- //func_tag = DOWN_FUNC_CLIENT_SEND;
- //boost::this_thread::yield();
- }else{
- func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
- break;
+ buf << "] protocol_module returnd illegal EVENT_TAG : ";
+ buf << module_event;
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 82, buf.str(), __FILE__, __LINE__ );
+ down_thread_exit(process_type);
+ return;
}
+ func_tag = func_type->second;
}
-
- event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
- if ( !ssl_flag ) {
- epoll_ctl( down_client_epollfd, EPOLL_CTL_MOD, client_socket.get_socket().native(), &event );
- } else {
- epoll_ctl( down_client_epollfd, EPOLL_CTL_MOD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
+ }else{
+ if(ec == boost::asio::error::try_again){
+ func_tag = DOWN_FUNC_CLIENT_SEND;
+ }else{
+ func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
}
-
- epoll_wait( down_client_epollfd, up_client_events, EVENT_NUM, EPOLL_TIMEOUT );
}
-
- event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
- if ( !ssl_flag ) {
- epoll_ctl( down_client_epollfd, EPOLL_CTL_DEL, client_socket.get_socket().native(), &event );
- } else {
- epoll_ctl( down_client_epollfd, EPOLL_CTL_DEL, client_ssl_socket.get_socket().lowest_layer().native(), &event );
- }
-
down_thread_function_pair func = down_thread_function_array[func_tag];
if(unlikely( !func.second )){
//Error not find function map