// epoll impliment
up_client_epollfd = epoll_create( EVENT_NUM );
up_realserver_epollfd = epoll_create( EVENT_NUM );
+ up_sorryserver_epollfd = epoll_create( EVENT_NUM );
down_client_epollfd = epoll_create( EVENT_NUM );
down_realserver_epollfd = epoll_create( EVENT_NUM );
+ down_sorryserver_epollfd = epoll_create( EVENT_NUM );
+ up_client_epollfd_registered = false;
+ up_realserver_epollfd_registered = false;
+ up_sorryserver_epollfd_registered = false;
+ down_client_epollfd_registered = false;
+ down_realserver_epollfd_registered = false;
+ down_sorryserver_epollfd_registered = false;
+ is_epoll_edge_trigger = true;
+ epoll_timeout = 50;
+
}
//! destructor
tcp_session::~tcp_session(){
downstream_buffer_size = int_val;
}
+ int_val = param.get_int( PARAM_COMP_SESSION, PARAM_EPOLL_TRIGGER, vs_err );
+ if((likely( !vs_err )) && (0 < int_val)){
+ if (int_val == 1) {
+ is_epoll_edge_trigger = true;
+ } else if (int_val == 2) {
+ is_epoll_edge_trigger = false;
+ }
+ }
+
+ int_val = param.get_int( PARAM_COMP_SESSION, PARAM_EPOLL_TIMEOUT, vs_err );
+ if((likely( !vs_err )) && (0 < int_val)){
+ epoll_timeout = int_val;
+ }
+
protocol_module = parent_service.get_protocol_module();
if(unlikely( protocol_module == NULL )){
}
}
+ up_client_epollfd_registered = false;
+ up_realserver_epollfd_registered = false;
+ up_sorryserver_epollfd_registered = false;
+ down_client_epollfd_registered = false;
+ down_realserver_epollfd_registered = false;
+ down_sorryserver_epollfd_registered = false;
+
return msg;
}
UP_THREAD_FUNC_TYPE_TAG func_tag;
struct epoll_event event;
- event.events = EPOLLET | EPOLLIN | EPOLLHUP;
if ( !ssl_flag ) {
- epoll_ctl( up_client_epollfd, EPOLL_CTL_ADD, client_socket.get_socket().native(), &event );
+ event.data.fd = client_socket.get_socket().native();
} else {
- epoll_ctl( up_client_epollfd, EPOLL_CTL_ADD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
+ event.data.fd = client_ssl_socket.get_socket().lowest_layer().native();
+ }
+ if (is_epoll_edge_trigger) {
+ event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+ } else {
+ event.events = EPOLLIN | EPOLLHUP;
+ }
+ bool add_flag = false;
+ if (!up_client_epollfd_registered) {
+ if (epoll_ctl( up_client_epollfd, EPOLL_CTL_ADD, event.data.fd, &event ) < 0) {
+ std::stringstream buf;
+ buf << "up_thread_client_receive : epoll_ctl EPOLL_CTL_ADD error : ";
+ buf << strerror(errno);
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ boost::this_thread::yield();
+ return;
+ }
+ up_client_epollfd_registered = true;
+ add_flag = true;
}
while( true ){
}
}
+ if (is_epoll_edge_trigger && (!add_flag)) {
+ if (epoll_ctl( up_client_epollfd, EPOLL_CTL_MOD, event.data.fd, &event ) < 0) {
+ std::stringstream buf;
+ buf << "up_thread_client_receive : epoll_ctl EPOLL_CTL_MOD error : ";
+ buf << strerror(errno);
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ boost::this_thread::yield();
+ return;
+ }
+ }
+
+ int ret_fds = epoll_wait( up_client_epollfd, up_client_events, EVENT_NUM, epoll_timeout );
+ if (ret_fds <= 0) {
+ if (ret_fds == 0) {
+ std::stringstream buf;
+ buf << "up_thread_client_receive : epoll_wait timeout ";
+ buf << epoll_timeout;
+ buf << "mS";
+ Logger::putLogInfo( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ } else {
+ std::stringstream buf;
+ buf << "up_thread_client_receive : epoll_wait error : ";
+ buf << strerror(errno);
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ }
+ boost::this_thread::yield();
+ return;
+ }
+
+ bool event_check = false;
+ for (int i = 0; i < ret_fds; ++i) {
+ if (up_client_events[i].data.fd == event.data.fd) {
+ if (up_client_events[i].events & EPOLLIN) {
+ event_check = true;
+ break;
+ } else if (up_client_events[i].events & EPOLLHUP) {
+ std::stringstream buf;
+ buf << "up_thread_client_receive : epoll_wait EPOLLHUP";
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ down_thread_exit(process_type);
+ return;
+ }
+ }
+ }
+ if (!event_check) {
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, "up_thread_client_receive : epoll_wait unknown fd", __FILE__, __LINE__ );
+ boost::this_thread::yield();
+ return;
+ }
+
if (!ssl_flag) {
recv_size = client_socket.read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE), ec);
} else {
break;
}
}
-
- event.events = EPOLLET | EPOLLIN | EPOLLHUP;
- if ( !ssl_flag ) {
- epoll_ctl( up_client_epollfd, EPOLL_CTL_MOD, client_socket.get_socket().native(), &event );
- } else {
- epoll_ctl( up_client_epollfd, EPOLL_CTL_MOD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
- }
-
- epoll_wait( up_client_epollfd, up_client_events, EVENT_NUM, EPOLL_TIMEOUT );
- }
-
- event.events = EPOLLET | EPOLLIN | EPOLLHUP;
- if ( !ssl_flag ) {
- epoll_ctl( up_client_epollfd, EPOLL_CTL_DEL, client_socket.get_socket().native(), &event );
- } else {
- epoll_ctl( up_client_epollfd, EPOLL_CTL_DEL, client_ssl_socket.get_socket().lowest_layer().native(), &event );
}
up_thread_function_pair func = up_thread_function_array[func_tag];
UP_THREAD_FUNC_TYPE_TAG func_tag;
struct epoll_event event;
- event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
- epoll_ctl( up_realserver_epollfd, EPOLL_CTL_ADD, send_socket->second->get_socket().native(), &event );
+ event.data.fd = send_socket->second->get_socket().native();
+ if (is_epoll_edge_trigger) {
+ event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+ } else {
+ event.events = EPOLLOUT | EPOLLHUP;
+ }
+ bool add_flag = false;
+ if (!up_realserver_epollfd_registered) {
+ if (epoll_ctl( up_realserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event ) < 0) {
+ std::stringstream buf;
+ buf << "up_thread_realserver_send : epoll_ctl EPOLL_CTL_ADD error : ";
+ buf << strerror(errno);
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ boost::this_thread::yield();
+ return;
+ }
+ up_realserver_epollfd_registered = true;
+ add_flag = true;
+ }
while( true ){
{
}
}
+ if (is_epoll_edge_trigger && (!add_flag)) {
+ if (epoll_ctl( up_realserver_epollfd, EPOLL_CTL_MOD, event.data.fd, &event ) < 0) {
+ std::stringstream buf;
+ buf << "up_thread_realserver_send : epoll_ctl EPOLL_CTL_MOD error : ";
+ buf << strerror(errno);
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ boost::this_thread::yield();
+ return;
+ }
+ }
+
+ int ret_fds = epoll_wait( up_realserver_epollfd, up_realserver_events, EVENT_NUM, epoll_timeout );
+ if (ret_fds <= 0) {
+ if (ret_fds == 0) {
+ std::stringstream buf;
+ buf << "up_thread_realserver_send : epoll_wait timeout ";
+ buf << epoll_timeout;
+ buf << "mS";
+ Logger::putLogInfo( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ } else {
+ std::stringstream buf;
+ buf << "up_thread_realserver_send : epoll_wait error : ";
+ buf << strerror(errno);
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ }
+ boost::this_thread::yield();
+ return;
+ }
+
+ bool event_check = false;
+ for (int i = 0; i < ret_fds; ++i) {
+ if (up_realserver_events[i].data.fd == event.data.fd) {
+ if (up_realserver_events[i].events & EPOLLOUT) {
+ event_check = true;
+ break;
+ } else if (up_realserver_events[i].events & EPOLLHUP) {
+ std::stringstream buf;
+ buf << "up_thread_realserver_send : epoll_wait EPOLLHUP";
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ down_thread_exit(process_type);
+ return;
+ }
+ }
+ }
+ if (!event_check) {
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, "up_thread_realserver_send : epoll_wait unknown fd", __FILE__, __LINE__ );
+ boost::this_thread::yield();
+ return;
+ }
+
send_size = send_socket->second->write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
if( !ec ){
break;
}
}
-
- event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
- epoll_ctl( up_realserver_epollfd, EPOLL_CTL_MOD, send_socket->second->get_socket().native(), &event );
-
- epoll_wait( up_realserver_epollfd, up_realserver_events, EVENT_NUM, EPOLL_TIMEOUT );
}
- event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
- epoll_ctl( up_realserver_epollfd, EPOLL_CTL_DEL, send_socket->second->get_socket().native(), &event );
-
up_thread_function_pair func = up_thread_function_array[func_tag];
if( unlikely( !func.second ) ){
//Error not find function map
UP_THREAD_FUNC_TYPE_TAG func_tag;
struct epoll_event event;
- event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
- epoll_ctl( up_realserver_epollfd, EPOLL_CTL_ADD, sorryserver_socket.second->get_socket().native(), &event );
+ event.data.fd = sorryserver_socket.second->get_socket().native();
+ if (is_epoll_edge_trigger) {
+ event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+ } else {
+ event.events = EPOLLOUT | EPOLLHUP;
+ }
+ bool add_flag = false;
+ if (!up_sorryserver_epollfd_registered) {
+ if (epoll_ctl( up_sorryserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event ) < 0) {
+ std::stringstream buf;
+ buf << "up_thread_sorryserver_send : epoll_ctl EPOLL_CTL_ADD error : ";
+ buf << strerror(errno);
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ boost::this_thread::yield();
+ return;
+ }
+ up_sorryserver_epollfd_registered = true;
+ add_flag = true;
+ }
while( true ){
{
}
}
+ if (is_epoll_edge_trigger && (!add_flag)) {
+ if (epoll_ctl( up_sorryserver_epollfd, EPOLL_CTL_MOD, event.data.fd, &event ) < 0) {
+ std::stringstream buf;
+ buf << "up_thread_sorryserver_send : epoll_ctl EPOLL_CTL_MOD error : ";
+ buf << strerror(errno);
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ boost::this_thread::yield();
+ return;
+ }
+ }
+
+ int ret_fds = epoll_wait( up_sorryserver_epollfd, up_sorryserver_events, EVENT_NUM, epoll_timeout );
+ if (ret_fds <= 0) {
+ if (ret_fds == 0) {
+ std::stringstream buf;
+ buf << "up_thread_sorryserver_send : epoll_wait timeout ";
+ buf << epoll_timeout;
+ buf << "mS";
+ Logger::putLogInfo( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ } else {
+ std::stringstream buf;
+ buf << "up_thread_sorryserver_send : epoll_wait error : ";
+ buf << strerror(errno);
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ }
+ boost::this_thread::yield();
+ return;
+ }
+
+ bool event_check = false;
+ for (int i = 0; i < ret_fds; ++i) {
+ if (up_sorryserver_events[i].data.fd == event.data.fd) {
+ if (up_sorryserver_events[i].events & EPOLLOUT) {
+ event_check = true;
+ break;
+ } else if (up_sorryserver_events[i].events & EPOLLHUP) {
+ std::stringstream buf;
+ buf << "up_thread_sorryserver_send : epoll_wait EPOLLHUP";
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ down_thread_exit(process_type);
+ return;
+ }
+ }
+ }
+ if (!event_check) {
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, "up_thread_sorryserver_send : epoll_wait unknown fd", __FILE__, __LINE__ );
+ boost::this_thread::yield();
+ return;
+ }
+
send_size = sorryserver_socket.second->write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
if(!ec){
break;
}
}
-
- event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
- epoll_ctl( up_realserver_epollfd, EPOLL_CTL_MOD, sorryserver_socket.second->get_socket().native(), &event );
-
- epoll_wait( up_realserver_epollfd, up_realserver_events, EVENT_NUM, EPOLL_TIMEOUT );
}
- event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
- epoll_ctl( up_realserver_epollfd, EPOLL_CTL_DEL, sorryserver_socket.second->get_socket().native(), &event );
-
up_thread_function_pair func = up_thread_function_array[func_tag];
if(unlikely( !func.second )){
//Error not find function map
DOWN_THREAD_FUNC_TYPE_TAG func_tag;
struct epoll_event event;
- event.events = EPOLLET | EPOLLIN | EPOLLHUP;
- epoll_ctl( down_realserver_epollfd, EPOLL_CTL_ADD, down_thread_current_receive_realserver_socket->second->get_socket().native(), &event );
+ event.data.fd = down_thread_current_receive_realserver_socket->second->get_socket().native();
+ if (is_epoll_edge_trigger) {
+ event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+ } else {
+ event.events = EPOLLIN | EPOLLHUP;
+ }
+ bool add_flag = false;
+ if (!down_realserver_epollfd_registered) {
+ if (epoll_ctl( down_realserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event ) < 0) {
+ std::stringstream buf;
+ buf << "doun_thread_realserver_receive : epoll_ctl EPOLL_CTL_ADD error : ";
+ buf << strerror(errno);
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ boost::this_thread::yield();
+ return;
+ }
+ down_realserver_epollfd_registered = true;
+ add_flag = true;
+ }
while( true ){
{
}
}
+ if (is_epoll_edge_trigger && (!add_flag)) {
+ if (epoll_ctl( down_realserver_epollfd, EPOLL_CTL_MOD, event.data.fd, &event ) < 0) {
+ std::stringstream buf;
+ buf << "down_thread_realserver_receive : epoll_ctl EPOLL_CTL_MOD error : ";
+ buf << strerror(errno);
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ boost::this_thread::yield();
+ return;
+ }
+ }
+
+ int ret_fds = epoll_wait( down_realserver_epollfd, down_realserver_events, EVENT_NUM, epoll_timeout );
+ if (ret_fds <= 0) {
+ if (ret_fds == 0) {
+ std::stringstream buf;
+ buf << "down_thread_realserver_receive : epoll_wait timeout ";
+ buf << epoll_timeout;
+ buf << "mS";
+ Logger::putLogInfo( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ } else {
+ std::stringstream buf;
+ buf << "down_thread_realserver_receive : epoll_wait error : ";
+ buf << strerror(errno);
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ }
+ boost::this_thread::yield();
+ return;
+ }
+
+ bool event_check = false;
+ for (int i = 0; i < ret_fds; ++i) {
+ if (down_realserver_events[i].data.fd == event.data.fd) {
+ if (down_realserver_events[i].events & EPOLLIN) {
+ event_check = true;
+ break;
+ } else if (down_realserver_events[i].events & EPOLLHUP) {
+ std::stringstream buf;
+ buf << "down_thread_realserver_receive : epoll_wait EPOLLHUP";
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ down_thread_exit(process_type);
+ return;
+ }
+ }
+ }
+ if (!event_check) {
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, "down_thread_realserver_receive : epoll_wait unknown fd", __FILE__, __LINE__ );
+ boost::this_thread::yield();
+ return;
+ }
+
recv_size = down_thread_current_receive_realserver_socket->second->read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE),ec);
boost::asio::ip::tcp::endpoint server_endpoint = down_thread_current_receive_realserver_socket->first;
break;
}
}
-
- event.events = EPOLLET | EPOLLIN | EPOLLHUP;
- epoll_ctl( down_realserver_epollfd, EPOLL_CTL_MOD, down_thread_current_receive_realserver_socket->second->get_socket().native(), &event );
-
- epoll_wait( down_realserver_epollfd, down_realserver_events, EVENT_NUM, EPOLL_TIMEOUT );
}
- event.events = EPOLLET | EPOLLIN | EPOLLHUP;
- epoll_ctl( down_realserver_epollfd, EPOLL_CTL_DEL, down_thread_current_receive_realserver_socket->second->get_socket().native(), &event );
-
down_thread_function_pair func = down_thread_function_array[func_tag];
if(unlikely( !func.second ) ){
//Error not find function map
DOWN_THREAD_FUNC_TYPE_TAG func_tag;
struct epoll_event event;
- event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
if ( !ssl_flag ) {
- epoll_ctl( down_client_epollfd, EPOLL_CTL_ADD, client_socket.get_socket().native(), &event );
+ event.data.fd = client_socket.get_socket().native();
+ } else {
+ event.data.fd = client_ssl_socket.get_socket().lowest_layer().native();
+ }
+ if (is_epoll_edge_trigger) {
+ event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
} else {
- epoll_ctl( down_client_epollfd, EPOLL_CTL_ADD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
+ event.events = EPOLLOUT | EPOLLHUP;
+ }
+ bool add_flag = false;
+ if (!down_client_epollfd_registered) {
+ if (epoll_ctl( down_client_epollfd, EPOLL_CTL_ADD, event.data.fd, &event ) < 0) {
+ std::stringstream buf;
+ buf << "down_thread_client_send : epoll_ctl EPOLL_CTL_ADD error : ";
+ buf << strerror(errno);
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ boost::this_thread::yield();
+ return;
+ }
+ down_client_epollfd_registered = true;
+ add_flag = true;
}
while( true ){
}
}
+ if (is_epoll_edge_trigger && (!add_flag)) {
+ if (epoll_ctl( down_client_epollfd, EPOLL_CTL_MOD, event.data.fd, &event ) < 0) {
+ std::stringstream buf;
+ buf << "down_thread_client_send : epoll_ctl EPOLL_CTL_MOD error : ";
+ buf << strerror(errno);
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ boost::this_thread::yield();
+ return;
+ }
+ }
+
+ int ret_fds = epoll_wait( down_client_epollfd, down_client_events, EVENT_NUM, epoll_timeout );
+ if (ret_fds <= 0) {
+ if (ret_fds == 0) {
+ std::stringstream buf;
+ buf << "down_thread_client_send : epoll_wait timeout ";
+ buf << epoll_timeout;
+ buf << "mS";
+ Logger::putLogInfo( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ } else {
+ std::stringstream buf;
+ buf << "down_thread_client_send : epoll_wait error : ";
+ buf << strerror(errno);
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ }
+ boost::this_thread::yield();
+ return;
+ }
+
+ bool event_check = false;
+ for (int i = 0; i < ret_fds; ++i) {
+ if (down_client_events[i].data.fd == event.data.fd) {
+ if (down_client_events[i].events & EPOLLOUT) {
+ event_check = true;
+ break;
+ } else if (down_client_events[i].events & EPOLLHUP) {
+ std::stringstream buf;
+ buf << "down_thread_client_send : epoll_wait EPOLLHUP";
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ down_thread_exit(process_type);
+ return;
+ }
+ }
+ }
+ if (!event_check) {
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, "down_thread_client_send : epoll_wait unknown fd", __FILE__, __LINE__ );
+ boost::this_thread::yield();
+ return;
+ }
+
if (!ssl_flag) {
send_size = client_socket.write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
} else {
break;
}
}
-
- event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
- if ( !ssl_flag ) {
- epoll_ctl( down_client_epollfd, EPOLL_CTL_MOD, client_socket.get_socket().native(), &event );
- } else {
- epoll_ctl( down_client_epollfd, EPOLL_CTL_MOD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
- }
-
- epoll_wait( down_client_epollfd, up_client_events, EVENT_NUM, EPOLL_TIMEOUT );
- }
-
- event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
- if ( !ssl_flag ) {
- epoll_ctl( down_client_epollfd, EPOLL_CTL_DEL, client_socket.get_socket().native(), &event );
- } else {
- epoll_ctl( down_client_epollfd, EPOLL_CTL_DEL, client_ssl_socket.get_socket().lowest_layer().native(), &event );
}
down_thread_function_pair func = down_thread_function_array[func_tag];
down_thread_data_dest_side.initialize();
boost::array<char,MAX_BUFFER_SIZE>& data_buff = down_thread_data_dest_side.get_data();
boost::system::error_code ec;
- size_t recv_size = sorryserver_socket.second->read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE),ec);
- boost::asio::ip::tcp::endpoint sorry_endpoint = sorryserver_socket.first;
- down_thread_data_dest_side.set_endpoint(sorry_endpoint);
+ size_t recv_size;
DOWN_THREAD_FUNC_TYPE_TAG func_tag;
- if(!ec){
- if(recv_size > 0){
- //----Debug log----------------------------------------------------------------------
- if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
- std::stringstream buf;
- buf << "Thread ID[";
- buf << boost::this_thread::get_id();
- buf << "] down_thread_sorryserver_receive";
- buf << " receive data size[";
- buf << recv_size;
- buf << "] from [";
- buf << sorry_endpoint;
- buf << "]";
- Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 46, buf.str(), __FILE__, __LINE__ );
+
+ struct epoll_event event;
+ bool add_flag = false;
+ if (!down_sorryserver_epollfd_registered) {
+ event.data.fd = sorryserver_socket.second->get_socket().native();
+ if (is_epoll_edge_trigger) {
+ event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+ } else {
+ event.events = EPOLLIN | EPOLLHUP;
+ }
+ if (epoll_ctl( down_sorryserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event ) < 0) {
+ std::stringstream buf;
+ buf << "doun_thread_sorryserver_receive : epoll_ctl EPOLL_CTL_ADD error : ";
+ buf << strerror(errno);
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ boost::this_thread::yield();
+ return;
+ }
+ down_sorryserver_epollfd_registered = true;
+ add_flag = true;
+ }
+
+ while( true ){
+ {
+ rd_scoped_lock scoped_lock(exit_flag_update_mutex);
+ if(unlikely(exit_flag)){
+ func_tag = DOWN_FUNC_EXIT;
+ break;
}
- //----Debug log----------------------------------------------------------------------
- down_thread_data_dest_side.set_size(recv_size);
- protocol_module_base::EVENT_TAG module_event = protocol_module->handle_sorryserver_recv(down_thread_id,sorry_endpoint,data_buff,recv_size);
- std::map< protocol_module_base::EVENT_TAG ,DOWN_THREAD_FUNC_TYPE_TAG >::iterator func_type = down_thread_module_event_map.find(module_event);
- if(unlikely( func_type == down_thread_module_event_map.end() )){
- //Error unknown protocol_module_base::EVENT_TAG return
+ }
+
+ if (is_epoll_edge_trigger && (!add_flag)) {
+ if (epoll_ctl( down_sorryserver_epollfd, EPOLL_CTL_MOD, event.data.fd, &event ) < 0) {
std::stringstream buf;
- buf << "Thread ID[";
- buf << boost::this_thread::get_id();
- buf << "] protocol_module returnd illegal EVENT_TAG : ";
- buf << module_event;
- Logger::putLogError( LOG_CAT_L7VSD_SESSION, 88, buf.str(), __FILE__, __LINE__ );
- down_thread_exit(process_type);
+ buf << "down_thread_sorryserver_receive : epoll_ctl EPOLL_CTL_MOD error : ";
+ buf << strerror(errno);
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ boost::this_thread::yield();
return;
}
- func_tag = func_type->second;
- }else{
- func_tag = DOWN_FUNC_SORRYSERVER_RECEIVE;
+ }
+
+ int ret_fds = epoll_wait( down_sorryserver_epollfd, down_sorryserver_events, EVENT_NUM, epoll_timeout );
+ if (ret_fds <= 0) {
+ if (ret_fds == 0) {
+ std::stringstream buf;
+ buf << "down_thread_sorryserver_receive : epoll_wait timeout ";
+ buf << epoll_timeout;
+ buf << "mS";
+ Logger::putLogInfo( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ } else {
+ std::stringstream buf;
+ buf << "down_thread_sorryserver_receive : epoll_wait error : ";
+ buf << strerror(errno);
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ }
boost::this_thread::yield();
+ return;
}
- }else{
- if(ec == boost::asio::error::try_again){
- func_tag = DOWN_FUNC_SORRYSERVER_RECEIVE;
+
+ bool event_check = false;
+ for (int i = 0; i < ret_fds; ++i) {
+ if (down_sorryserver_events[i].data.fd == event.data.fd) {
+ if (down_sorryserver_events[i].events & EPOLLIN) {
+ event_check = true;
+ break;
+ } else if (down_sorryserver_events[i].events & EPOLLHUP) {
+ std::stringstream buf;
+ buf << "down_thread_sorryserver_receive : epoll_wait EPOLLHUP";
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__ );
+ down_thread_exit(process_type);
+ return;
+ }
+ }
+ }
+ if (!event_check) {
+ Logger::putLogWarn( LOG_CAT_L7VSD_SESSION, 999, "down_thread_sorryserver_receive : epoll_wait unknown fd", __FILE__, __LINE__ );
boost::this_thread::yield();
+ return;
+ }
+
+ recv_size = sorryserver_socket.second->read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE),ec);
+
+ boost::asio::ip::tcp::endpoint sorry_endpoint = sorryserver_socket.first;
+ down_thread_data_dest_side.set_endpoint(sorry_endpoint);
+ if(!ec){
+ if(recv_size > 0){
+ //----Debug log----------------------------------------------------------------------
+ if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
+ std::stringstream buf;
+ buf << "Thread ID[";
+ buf << boost::this_thread::get_id();
+ buf << "] down_thread_sorryserver_receive";
+ buf << " receive data size[";
+ buf << recv_size;
+ buf << "] from [";
+ buf << sorry_endpoint;
+ buf << "]";
+ Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 46, buf.str(), __FILE__, __LINE__ );
+ }
+ //----Debug log----------------------------------------------------------------------
+ down_thread_data_dest_side.set_size(recv_size);
+ protocol_module_base::EVENT_TAG module_event = protocol_module->handle_sorryserver_recv(down_thread_id,sorry_endpoint,data_buff,recv_size);
+ std::map< protocol_module_base::EVENT_TAG ,DOWN_THREAD_FUNC_TYPE_TAG >::iterator func_type = down_thread_module_event_map.find(module_event);
+ if(unlikely( func_type == down_thread_module_event_map.end() )){
+ //Error unknown protocol_module_base::EVENT_TAG return
+ std::stringstream buf;
+ buf << "Thread ID[";
+ buf << boost::this_thread::get_id();
+ buf << "] protocol_module returnd illegal EVENT_TAG : ";
+ buf << module_event;
+ Logger::putLogError( LOG_CAT_L7VSD_SESSION, 88, buf.str(), __FILE__, __LINE__ );
+ down_thread_exit(process_type);
+ return;
+ }
+ func_tag = func_type->second;
+ }else{
+ //func_tag = DOWN_FUNC_SORRYSERVER_RECEIVE;
+ //boost::this_thread::yield();
+ }
}else{
- func_tag = DOWN_FUNC_SORRYSERVER_DISCONNECT;
+ if(ec == boost::asio::error::try_again){
+ //func_tag = DOWN_FUNC_SORRYSERVER_RECEIVE;
+ //boost::this_thread::yield();
+ }else{
+ func_tag = DOWN_FUNC_SORRYSERVER_DISCONNECT;
+ }
}
}
+
down_thread_function_pair func = down_thread_function_array[func_tag];
if(unlikely( !func.second )){
//Error not find function map