OSDN Git Service

epoll fix by morisita-san
authortanuma <tanuma@1ed66053-1c2d-0410-8867-f7571e6e31d3>
Wed, 3 Mar 2010 03:40:48 +0000 (03:40 +0000)
committertanuma <tanuma@1ed66053-1c2d-0410-8867-f7571e6e31d3>
Wed, 3 Mar 2010 03:40:48 +0000 (03:40 +0000)
git-svn-id: http://10.144.169.20/repos/um/branches/l7vsd-3.x-ramiel@10008 1ed66053-1c2d-0410-8867-f7571e6e31d3

l7vsd/include/tcp_session.h
l7vsd/src/tcp_session.cpp

index f745e4d..14b7c0d 100644 (file)
@@ -30,6 +30,7 @@
 //#include <boost/asio/ssl.hpp>
 #include <boost/thread/thread.hpp>
 #include <boost/thread/mutex.hpp>
+#include <sys/epoll.h>
 
 #include "wrlock.h"
 #include "protocol_module_base.h"
@@ -314,7 +315,18 @@ namespace l7vs{
             //! socket option 
             tcp_socket_option_info socket_opt_info;
 
-           
+            // epoll using member
+            #define EVENT_NUM       2
+            #define EPOLL_TIMEOUT   50      //[microsecond]
+            struct epoll_event  up_client_events[EVENT_NUM];
+            struct epoll_event  up_realserver_events[EVENT_NUM];
+            struct epoll_event  down_client_events[EVENT_NUM];
+            struct epoll_event  down_realserver_events[EVENT_NUM];
+            int    up_client_epollfd;
+            int    up_realserver_epollfd;
+            int    down_client_epollfd;
+            int    down_realserver_epollfd;
+
             //! handshake timer handler
             //! @param[in]        error is error code object
             virtual void handle_ssl_handshake_timer(
index 6174896..5092aa5 100644 (file)
@@ -280,6 +280,11 @@ namespace l7vs{
         add_down_thread_vs_message_func.second = boost::bind(&tcp_session::down_thread_exit,this,_1);
         virtual_service_message_down_thread_function_map.insert(add_down_thread_vs_message_func);
 
+        // epoll impliment
+        up_client_epollfd = epoll_create( EVENT_NUM );
+        up_realserver_epollfd = epoll_create( EVENT_NUM );
+        down_client_epollfd = epoll_create( EVENT_NUM );
+        down_realserver_epollfd = epoll_create( EVENT_NUM );
     }
     //! destructor
     tcp_session::~tcp_session(){
@@ -312,6 +317,10 @@ namespace l7vs{
             else
                 break;
         }
+        close( up_client_epollfd );
+        close( up_realserver_epollfd );
+        close( down_client_epollfd );
+        close( down_realserver_epollfd );
     }
     //! initialize
     session_result_message tcp_session::initialize(){
@@ -1476,62 +1485,101 @@ namespace l7vs{
         boost::array<char,MAX_BUFFER_SIZE>& data_buff = up_thread_data_client_side.get_data();
         boost::system::error_code ec;
         std::size_t recv_size;
-        if (!ssl_flag) {
-            recv_size = client_socket.read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE), ec);
+        UP_THREAD_FUNC_TYPE_TAG func_tag;
+
+        struct epoll_event event;
+        event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+        if ( !ssl_flag ) {
+            epoll_ctl( up_client_epollfd, EPOLL_CTL_ADD, client_socket.get_socket().native(), &event );
         } else {
-            recv_size = client_ssl_socket.read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE), ec);
+            epoll_ctl( up_client_epollfd, EPOLL_CTL_ADD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
         }
-        UP_THREAD_FUNC_TYPE_TAG func_tag;
-        if(!ec){
-            if(recv_size > 0){
-                //----Debug log----------------------------------------------------------------------
-                if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
-                    boost::asio::ip::tcp::endpoint client_endpoint;
-                    if (!ssl_flag) {
-                        client_endpoint = client_socket.get_socket().lowest_layer().remote_endpoint(ec);
-                    } else {
-                        client_endpoint = client_ssl_socket.get_socket().lowest_layer().remote_endpoint(ec);
-                    }
-                    std::stringstream buf;
-                    buf << "Thread ID[";
-                    buf << boost::this_thread::get_id();
-                    buf << "] up_thread_client_receive";
-                    buf << " receive data size[";
-                    buf << recv_size;
-                    buf << "] from [";
-                    buf << client_endpoint;
-                    buf << "]";
-                    Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 36, buf.str(), __FILE__, __LINE__ );
+
+        while( true ){
+            {
+                rd_scoped_lock scoped_lock(exit_flag_update_mutex);
+                if(unlikely(exit_flag)){
+                    func_tag = UP_FUNC_EXIT;
+                    break;
                 }
-                //----Debug log----------------------------------------------------------------------
-                up_thread_data_client_side.set_size(recv_size);
-                parent_service.update_up_recv_size(recv_size);
-                protocol_module_base::EVENT_TAG module_event = protocol_module->handle_client_recv(up_thread_id,data_buff,recv_size);
-                std::map< protocol_module_base::EVENT_TAG ,UP_THREAD_FUNC_TYPE_TAG >::iterator func_type = up_thread_module_event_map.find(module_event);
-                if(unlikely( func_type == up_thread_module_event_map.end() )){
-                    //Error unknown protocol_module_base::EVENT_TAG return
-                    std::stringstream buf;
-                    buf << "Thread ID[";
-                    buf << boost::this_thread::get_id();
-                    buf << "] protocol_module returnd illegal EVENT_TAG : ";
-                    buf << module_event;
-                    Logger::putLogError( LOG_CAT_L7VSD_SESSION, 20, buf.str(), __FILE__, __LINE__ );
-                    up_thread_exit(process_type);
-                    return;
+            }
+
+            if (!ssl_flag) {
+                recv_size = client_socket.read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE), ec);
+            } else {
+                recv_size = client_ssl_socket.read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE), ec);
+            }
+
+            if(!ec){
+                if(recv_size > 0){
+                    //----Debug log----------------------------------------------------------------------
+                    if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
+                        boost::asio::ip::tcp::endpoint client_endpoint;
+                        if (!ssl_flag) {
+                            client_endpoint = client_socket.get_socket().lowest_layer().remote_endpoint(ec);
+                        } else {
+                            client_endpoint = client_ssl_socket.get_socket().lowest_layer().remote_endpoint(ec);
+                        }
+                        std::stringstream buf;
+                        buf << "Thread ID[";
+                        buf << boost::this_thread::get_id();
+                        buf << "] up_thread_client_receive";
+                        buf << " receive data size[";
+                        buf << recv_size;
+                        buf << "] from [";
+                        buf << client_endpoint;
+                        buf << "]";
+                        Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 36, buf.str(), __FILE__, __LINE__ );
+                    }
+                    //----Debug log----------------------------------------------------------------------
+                    up_thread_data_client_side.set_size(recv_size);
+                    parent_service.update_up_recv_size(recv_size);
+                    protocol_module_base::EVENT_TAG module_event = protocol_module->handle_client_recv(up_thread_id,data_buff,recv_size);
+                    std::map< protocol_module_base::EVENT_TAG ,UP_THREAD_FUNC_TYPE_TAG >::iterator func_type = up_thread_module_event_map.find(module_event);
+                    if(unlikely( func_type == up_thread_module_event_map.end() )){
+                        //Error unknown protocol_module_base::EVENT_TAG return
+                        std::stringstream buf;
+                        buf << "Thread ID[";
+                        buf << boost::this_thread::get_id();
+                        buf << "] protocol_module returnd illegal EVENT_TAG : ";
+                        buf << module_event;
+                        Logger::putLogError( LOG_CAT_L7VSD_SESSION, 20, buf.str(), __FILE__, __LINE__ );
+                        up_thread_exit(process_type);
+                        return;
+                    }
+                    func_tag = func_type->second;
+                    break;
+                }else{
+                    //func_tag = UP_FUNC_CLIENT_RECEIVE;
+                    //boost::this_thread::yield();
                 }
-                func_tag = func_type->second;
             }else{
-                func_tag = UP_FUNC_CLIENT_RECEIVE;
-                boost::this_thread::yield();
+                if(ec == boost::asio::error::try_again){
+                    //func_tag = UP_FUNC_CLIENT_RECEIVE;
+                    //boost::this_thread::yield();
+                }else{
+                    func_tag = UP_FUNC_CLIENT_DISCONNECT;
+                    break;
+                }
             }
-        }else{
-            if(ec == boost::asio::error::try_again){
-                func_tag = UP_FUNC_CLIENT_RECEIVE;
-                boost::this_thread::yield();
-            }else{
-                func_tag = UP_FUNC_CLIENT_DISCONNECT;
+
+            event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+            if ( !ssl_flag ) {
+                epoll_ctl( up_client_epollfd, EPOLL_CTL_MOD, client_socket.get_socket().native(), &event );
+            } else {
+                epoll_ctl( up_client_epollfd, EPOLL_CTL_MOD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
             }
+
+            epoll_wait( up_client_epollfd, up_client_events, EVENT_NUM, EPOLL_TIMEOUT );
         }
+
+        event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+        if ( !ssl_flag ) {
+            epoll_ctl( up_client_epollfd, EPOLL_CTL_DEL, client_socket.get_socket().native(), &event );
+        } else {
+            epoll_ctl( up_client_epollfd, EPOLL_CTL_DEL, client_ssl_socket.get_socket().lowest_layer().native(), &event );
+        }
+
         up_thread_function_pair    func    = up_thread_function_array[func_tag];
         if(unlikely( !func.second )){
             //Error not find function map
@@ -1696,51 +1744,80 @@ namespace l7vs{
         boost::array<char,MAX_BUFFER_SIZE>& data_buff = up_thread_data_dest_side.get_data();
         std::size_t data_size = up_thread_data_dest_side.get_size();
         std::size_t send_data_size = up_thread_data_dest_side.get_send_size();
-        std::size_t send_size = send_socket->second->write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
+        std::size_t send_size;
         UP_THREAD_FUNC_TYPE_TAG func_tag;
-        if( !ec ){
-            send_data_size += send_size;
-            up_thread_data_dest_side.set_send_size(send_data_size);
-            parent_service.update_up_send_size(send_size);
-            //----Debug log----------------------------------------------------------------------
-            if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
-                std::stringstream buf;
-                buf << "Thread ID[";
-                buf << boost::this_thread::get_id();
-                buf << "] up_thread_realserver_send";
-                buf << " send data size[";
-                buf << send_size;
-                buf << "] for [";
-                buf << server_endpoint;
-                buf << "]";
-                Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 37, buf.str(), __FILE__, __LINE__ );
+
+        struct epoll_event event;
+        event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+        epoll_ctl( up_realserver_epollfd, EPOLL_CTL_ADD, send_socket->second->get_socket().native(), &event );
+
+        while( true ){
+            {
+                rd_scoped_lock scoped_lock(exit_flag_update_mutex);
+                if(unlikely(exit_flag)){
+                    func_tag = UP_FUNC_EXIT;
+                    break;
+                }
             }
-            //----Debug log----------------------------------------------------------------------
-            if(data_size > send_data_size){
-                func_tag = UP_FUNC_REALSERVER_SEND;
-            }else{
-                protocol_module_base::EVENT_TAG module_event = protocol_module->handle_realserver_send(up_thread_id);
-                std::map< protocol_module_base::EVENT_TAG ,UP_THREAD_FUNC_TYPE_TAG >::iterator func_type = up_thread_module_event_map.find(module_event);
-                if( unlikely( func_type == up_thread_module_event_map.end() )){
-                    //Error unknown protocol_module_base::EVENT_TAG return
+
+            send_size = send_socket->second->write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
+
+            if( !ec ){
+                send_data_size += send_size;
+                up_thread_data_dest_side.set_send_size(send_data_size);
+                parent_service.update_up_send_size(send_size);
+                //----Debug log----------------------------------------------------------------------
+                if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
                     std::stringstream buf;
                     buf << "Thread ID[";
                     buf << boost::this_thread::get_id();
-                    buf << "] protocol_module returnd illegal EVENT_TAG : ";
-                    buf << module_event;    
-                    Logger::putLogError( LOG_CAT_L7VSD_SESSION, 30, buf.str(), __FILE__, __LINE__ );
-                    up_thread_exit(process_type);
-                    return;
+                    buf << "] up_thread_realserver_send";
+                    buf << " send data size[";
+                    buf << send_size;
+                    buf << "] for [";
+                    buf << server_endpoint;
+                    buf << "]";
+                    Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 37, buf.str(), __FILE__, __LINE__ );
                 }
-                func_tag = func_type->second;
-            }
-        }else{
-            if(ec == boost::asio::error::try_again){
-                func_tag = UP_FUNC_REALSERVER_SEND;
+                //----Debug log----------------------------------------------------------------------
+                if(data_size > send_data_size){
+                    func_tag = UP_FUNC_REALSERVER_SEND;
+                }else{
+                    protocol_module_base::EVENT_TAG module_event = protocol_module->handle_realserver_send(up_thread_id);
+                    std::map< protocol_module_base::EVENT_TAG ,UP_THREAD_FUNC_TYPE_TAG >::iterator func_type = up_thread_module_event_map.find(module_event);
+                    if( unlikely( func_type == up_thread_module_event_map.end() )){
+                        //Error unknown protocol_module_base::EVENT_TAG return
+                        std::stringstream buf;
+                        buf << "Thread ID[";
+                        buf << boost::this_thread::get_id();
+                        buf << "] protocol_module returnd illegal EVENT_TAG : ";
+                        buf << module_event;    
+                        Logger::putLogError( LOG_CAT_L7VSD_SESSION, 30, buf.str(), __FILE__, __LINE__ );
+                        up_thread_exit(process_type);
+                        return;
+                    }
+                    func_tag = func_type->second;
+                }
+                break;
             }else{
-                func_tag = UP_FUNC_REALSERVER_DISCONNECT;
+                if(ec == boost::asio::error::try_again){
+                    //func_tag = UP_FUNC_REALSERVER_SEND;
+                    //boost::this_thread::yield();
+                }else{
+                    func_tag = UP_FUNC_REALSERVER_DISCONNECT;
+                    break;
+                }
             }
+
+            event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+            epoll_ctl( up_realserver_epollfd, EPOLL_CTL_MOD, send_socket->second->get_socket().native(), &event );
+
+            epoll_wait( up_realserver_epollfd, up_realserver_events, EVENT_NUM, EPOLL_TIMEOUT );
         }
+
+        event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+        epoll_ctl( up_realserver_epollfd, EPOLL_CTL_DEL, send_socket->second->get_socket().native(), &event );
+
         up_thread_function_pair    func    = up_thread_function_array[func_tag];
         if( unlikely( !func.second ) ){
             //Error not find function map
@@ -2135,50 +2212,79 @@ namespace l7vs{
         boost::array<char,MAX_BUFFER_SIZE>& data_buff = up_thread_data_dest_side.get_data();
         std::size_t data_size = up_thread_data_dest_side.get_size();
         std::size_t send_data_size = up_thread_data_dest_side.get_send_size();
-        std::size_t send_size = sorryserver_socket.second->write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
+        std::size_t send_size;
         UP_THREAD_FUNC_TYPE_TAG func_tag;
-        if(!ec){
-            send_data_size += send_size;
-            up_thread_data_dest_side.set_send_size(send_data_size);
-            //----Debug log----------------------------------------------------------------------
-            if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
-                std::stringstream buf;
-                buf << "Thread ID[";
-                buf << boost::this_thread::get_id();
-                buf << "] up_thread_sorryserver_send";
-                buf << " send data size[";
-                buf << send_size;
-                buf << "] for [";
-                buf << sorry_endpoint;
-                buf << "]";
-                Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 39, buf.str(), __FILE__, __LINE__ );
+
+        struct epoll_event event;
+        event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+        epoll_ctl( up_realserver_epollfd, EPOLL_CTL_ADD, sorryserver_socket.second->get_socket().native(), &event );
+
+        while( true ){
+            {
+                rd_scoped_lock scoped_lock(exit_flag_update_mutex);
+                if(unlikely(exit_flag)){
+                    func_tag = UP_FUNC_EXIT;
+                    break;
+                }
             }
-            //----Debug log----------------------------------------------------------------------
-            if(data_size > send_data_size){
-                func_tag = UP_FUNC_SORRYSERVER_SEND;
-            }else{
-                protocol_module_base::EVENT_TAG module_event = protocol_module->handle_sorryserver_send(up_thread_id);
-                std::map< protocol_module_base::EVENT_TAG ,UP_THREAD_FUNC_TYPE_TAG >::iterator func_type = up_thread_module_event_map.find(module_event);
-                if(unlikely( func_type == up_thread_module_event_map.end() )){
-                    //Error unknown protocol_module_base::EVENT_TAG return
+
+            send_size = sorryserver_socket.second->write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
+
+            if(!ec){
+                send_data_size += send_size;
+                up_thread_data_dest_side.set_send_size(send_data_size);
+                //----Debug log----------------------------------------------------------------------
+                if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
                     std::stringstream buf;
                     buf << "Thread ID[";
                     buf << boost::this_thread::get_id();
-                    buf << "] protocol_module returnd illegal EVENT_TAG : ";
-                    buf << module_event;
-                    Logger::putLogError( LOG_CAT_L7VSD_SESSION, 49, buf.str(), __FILE__, __LINE__ );
-                    up_thread_exit(process_type);
-                    return;
+                    buf << "] up_thread_sorryserver_send";
+                    buf << " send data size[";
+                    buf << send_size;
+                    buf << "] for [";
+                    buf << sorry_endpoint;
+                    buf << "]";
+                    Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 39, buf.str(), __FILE__, __LINE__ );
                 }
-                func_tag = func_type->second;
-            }
-        }else{
-            if(ec == boost::asio::error::try_again){
-                func_tag = UP_FUNC_SORRYSERVER_SEND;
+                //----Debug log----------------------------------------------------------------------
+                if(data_size > send_data_size){
+                    func_tag = UP_FUNC_SORRYSERVER_SEND;
+                }else{
+                    protocol_module_base::EVENT_TAG module_event = protocol_module->handle_sorryserver_send(up_thread_id);
+                    std::map< protocol_module_base::EVENT_TAG ,UP_THREAD_FUNC_TYPE_TAG >::iterator func_type = up_thread_module_event_map.find(module_event);
+                    if(unlikely( func_type == up_thread_module_event_map.end() )){
+                        //Error unknown protocol_module_base::EVENT_TAG return
+                        std::stringstream buf;
+                        buf << "Thread ID[";
+                        buf << boost::this_thread::get_id();
+                        buf << "] protocol_module returnd illegal EVENT_TAG : ";
+                        buf << module_event;
+                        Logger::putLogError( LOG_CAT_L7VSD_SESSION, 49, buf.str(), __FILE__, __LINE__ );
+                        up_thread_exit(process_type);
+                        return;
+                    }
+                    func_tag = func_type->second;
+                }
+                break;
             }else{
-                func_tag = UP_FUNC_SORRYSERVER_DISCONNECT;
+                if(ec == boost::asio::error::try_again){
+                    //func_tag = UP_FUNC_SORRYSERVER_SEND;
+                    //boost::this_thread::yield();
+                }else{
+                    func_tag = UP_FUNC_SORRYSERVER_DISCONNECT;
+                    break;
+                }
             }
+
+            event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+            epoll_ctl( up_realserver_epollfd, EPOLL_CTL_MOD, sorryserver_socket.second->get_socket().native(), &event );
+
+            epoll_wait( up_realserver_epollfd, up_realserver_events, EVENT_NUM, EPOLL_TIMEOUT );
         }
+
+        event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+        epoll_ctl( up_realserver_epollfd, EPOLL_CTL_DEL, sorryserver_socket.second->get_socket().native(), &event );
+
         up_thread_function_pair    func    = up_thread_function_array[func_tag];
         if(unlikely( !func.second )){
             //Error not find function map
@@ -2591,54 +2697,82 @@ namespace l7vs{
         down_thread_data_dest_side.initialize();
         boost::array<char,MAX_BUFFER_SIZE>& data_buff = down_thread_data_dest_side.get_data();
         boost::system::error_code ec;
-        size_t recv_size = down_thread_current_receive_realserver_socket->second->read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE),ec);
-        boost::asio::ip::tcp::endpoint server_endpoint = down_thread_current_receive_realserver_socket->first;
-        down_thread_data_dest_side.set_endpoint(server_endpoint);
+        size_t recv_size;
         DOWN_THREAD_FUNC_TYPE_TAG func_tag;
-        if(!ec){
-            if(recv_size > 0){
-                //----Debug log----------------------------------------------------------------------
-                if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
-                    std::stringstream buf;
-                    buf << "Thread ID[";
-                    buf << boost::this_thread::get_id();
-                    buf << "] down_thread_realserver_receive";
-                    buf << " receive data size[";
-                    buf << recv_size;
-                    buf << "] from [";
-                    buf << server_endpoint;
-                    buf << "]";
-                    Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 43, buf.str(), __FILE__, __LINE__ );
-                }
-                //----Debug log----------------------------------------------------------------------
-                down_thread_data_dest_side.set_size(recv_size);
-                parent_service.update_down_recv_size(recv_size);
-                protocol_module_base::EVENT_TAG module_event = protocol_module->handle_realserver_recv(down_thread_id,server_endpoint,data_buff,recv_size);
-                std::map< protocol_module_base::EVENT_TAG ,DOWN_THREAD_FUNC_TYPE_TAG >::iterator func_type = down_thread_module_event_map.find(module_event);
-                if(unlikely( func_type == down_thread_module_event_map.end() )){
-                    //Error unknown protocol_module_base::EVENT_TAG return
-                    std::stringstream buf;
-                    buf << "Thread ID[";
-                    buf << boost::this_thread::get_id();
-                    buf << "] protocol_module returnd illegal EVENT_TAG : ";
-                    buf << module_event;    
-                    Logger::putLogError( LOG_CAT_L7VSD_SESSION, 70, buf.str(), __FILE__, __LINE__ );
-                    down_thread_exit(process_type);
-                    return;
+
+        struct epoll_event event;
+        event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+        epoll_ctl( down_realserver_epollfd, EPOLL_CTL_ADD, down_thread_current_receive_realserver_socket->second->get_socket().native(), &event );
+
+        while( true ){
+            {
+                rd_scoped_lock scoped_lock(exit_flag_update_mutex);
+                if(unlikely(exit_flag)){
+                    func_tag = DOWN_FUNC_EXIT;
+                    break;
                 }
-                func_tag = func_type->second;
-            }else{
-                func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
-                boost::this_thread::yield();
             }
-        }else{
-            if(ec == boost::asio::error::try_again){
-                func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
-                boost::this_thread::yield();
+
+            recv_size = down_thread_current_receive_realserver_socket->second->read_some(boost::asio::buffer(data_buff,MAX_BUFFER_SIZE),ec);
+
+            boost::asio::ip::tcp::endpoint server_endpoint = down_thread_current_receive_realserver_socket->first;
+            down_thread_data_dest_side.set_endpoint(server_endpoint);
+            if(!ec){
+                if(recv_size > 0){
+                    //----Debug log----------------------------------------------------------------------
+                    if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
+                        std::stringstream buf;
+                        buf << "Thread ID[";
+                        buf << boost::this_thread::get_id();
+                        buf << "] down_thread_realserver_receive";
+                        buf << " receive data size[";
+                        buf << recv_size;
+                        buf << "] from [";
+                        buf << server_endpoint;
+                        buf << "]";
+                        Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 43, buf.str(), __FILE__, __LINE__ );
+                   }
+                    //----Debug log----------------------------------------------------------------------
+                    down_thread_data_dest_side.set_size(recv_size);
+                    parent_service.update_down_recv_size(recv_size);
+                    protocol_module_base::EVENT_TAG module_event = protocol_module->handle_realserver_recv(down_thread_id,server_endpoint,data_buff,recv_size);
+                    std::map< protocol_module_base::EVENT_TAG ,DOWN_THREAD_FUNC_TYPE_TAG >::iterator func_type = down_thread_module_event_map.find(module_event);
+                    if(unlikely( func_type == down_thread_module_event_map.end() )){
+                        //Error unknown protocol_module_base::EVENT_TAG return
+                        std::stringstream buf;
+                        buf << "Thread ID[";
+                        buf << boost::this_thread::get_id();
+                        buf << "] protocol_module returnd illegal EVENT_TAG : ";
+                        buf << module_event;    
+                        Logger::putLogError( LOG_CAT_L7VSD_SESSION, 70, buf.str(), __FILE__, __LINE__ );
+                        down_thread_exit(process_type);
+                        return;
+                    }
+                    func_tag = func_type->second;
+                    break;
+                }else{
+                    //func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
+                    //boost::this_thread::yield();
+                }
             }else{
-                func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
+                if(ec == boost::asio::error::try_again){
+                    //func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
+                    //boost::this_thread::yield();
+                }else{
+                    func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
+                    break;
+                }
             }
+
+            event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+            epoll_ctl( down_realserver_epollfd, EPOLL_CTL_MOD, down_thread_current_receive_realserver_socket->second->get_socket().native(), &event );
+
+            epoll_wait( down_realserver_epollfd, down_realserver_events, EVENT_NUM, EPOLL_TIMEOUT );
         }
+
+        event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+        epoll_ctl( down_realserver_epollfd, EPOLL_CTL_DEL, down_thread_current_receive_realserver_socket->second->get_socket().native(), &event );
+
         down_thread_function_pair    func    = down_thread_function_array[func_tag];
         if(unlikely( !func.second ) ){
             //Error not find function map
@@ -2890,67 +3024,107 @@ namespace l7vs{
     //! down thread send for client and raise module event of handle_client_send
     //! @param[in]        process_type is prosecess type
     void tcp_session::down_thread_client_send(const TCP_PROCESS_TYPE_TAG process_type){
-        
         boost::system::error_code ec;
         boost::array<char,MAX_BUFFER_SIZE>& data_buff = down_thread_data_client_side.get_data();
         std::size_t data_size = down_thread_data_client_side.get_size();
         std::size_t send_data_size = down_thread_data_client_side.get_send_size();
         std::size_t send_size;
-        if (!ssl_flag) {
-            send_size = client_socket.write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
+        DOWN_THREAD_FUNC_TYPE_TAG func_tag;
+
+        struct epoll_event event;
+        event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+        if ( !ssl_flag ) {
+            epoll_ctl( down_client_epollfd, EPOLL_CTL_ADD, client_socket.get_socket().native(), &event );
         } else {
-            send_size = client_ssl_socket.write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
+            epoll_ctl( down_client_epollfd, EPOLL_CTL_ADD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
         }
-        DOWN_THREAD_FUNC_TYPE_TAG func_tag;
-        if(!ec){
-            send_data_size += send_size;
-            down_thread_data_client_side.set_send_size(send_data_size);
-            parent_service.update_down_send_size(send_size);
-            //----Debug log----------------------------------------------------------------------
-            if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
-                boost::asio::ip::tcp::endpoint client_endpoint;
-                if (!ssl_flag) {
-                    client_endpoint = client_socket.get_socket().lowest_layer().remote_endpoint(ec);
-                } else {
-                    client_endpoint = client_ssl_socket.get_socket().lowest_layer().remote_endpoint(ec);
+
+        while( true ){
+            {
+                rd_scoped_lock scoped_lock(exit_flag_update_mutex);
+                if(unlikely(exit_flag)){
+                    func_tag = DOWN_FUNC_EXIT;
+                    break;
                 }
-                std::stringstream buf;
-                buf << "Thread ID[";
-                buf << boost::this_thread::get_id();
-                buf << "] down_thread_client_send";
-                buf << " send data size[";
-                buf << send_size;
-                buf << "] for [";
-                buf << client_endpoint;
-                buf << "]";
-                Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 45, buf.str(), __FILE__, __LINE__ );
             }
-            //----Debug log----------------------------------------------------------------------
-            if(data_size > send_data_size){
-                func_tag = DOWN_FUNC_CLIENT_SEND;
-            }else{
-                protocol_module_base::EVENT_TAG module_event = protocol_module->handle_client_send(down_thread_id);
-                std::map< protocol_module_base::EVENT_TAG ,DOWN_THREAD_FUNC_TYPE_TAG >::iterator func_type = down_thread_module_event_map.find(module_event);
-                if(unlikely( func_type == down_thread_module_event_map.end() ) ){
-                    //Error unknown protocol_module_base::EVENT_TAG return
+
+            if (!ssl_flag) {
+                send_size = client_socket.write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
+            } else {
+                send_size = client_ssl_socket.write_some(boost::asio::buffer(data_buff.data()+send_data_size,data_size-send_data_size),ec);
+            }
+
+            if(!ec){
+                send_data_size += send_size;
+                down_thread_data_client_side.set_send_size(send_data_size);
+                parent_service.update_down_send_size(send_size);
+                //----Debug log----------------------------------------------------------------------
+                if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))){
+                    boost::asio::ip::tcp::endpoint client_endpoint;
+                    if (!ssl_flag) {
+                        client_endpoint = client_socket.get_socket().lowest_layer().remote_endpoint(ec);
+                    } else {
+                        client_endpoint = client_ssl_socket.get_socket().lowest_layer().remote_endpoint(ec);
+                    }
                     std::stringstream buf;
                     buf << "Thread ID[";
                     buf << boost::this_thread::get_id();
-                    buf << "] protocol_module returnd illegal EVENT_TAG : ";
-                    buf << module_event;
-                    Logger::putLogError( LOG_CAT_L7VSD_SESSION, 82, buf.str(), __FILE__, __LINE__ );
-                    down_thread_exit(process_type);
-                    return;
+                    buf << "] down_thread_client_send";
+                    buf << " send data size[";
+                    buf << send_size;
+                    buf << "] for [";
+                    buf << client_endpoint;
+                    buf << "]";
+                    Logger::putLogDebug( LOG_CAT_L7VSD_SESSION, 45, buf.str(), __FILE__, __LINE__ );
                 }
-                func_tag = func_type->second;
-            }
-        }else{
-            if(ec == boost::asio::error::try_again){
-                func_tag = DOWN_FUNC_CLIENT_SEND;
+                //----Debug log----------------------------------------------------------------------
+                if(data_size > send_data_size){
+                    func_tag = DOWN_FUNC_CLIENT_SEND;
+                    //down_send_wait.reset();
+                }else{
+                    protocol_module_base::EVENT_TAG module_event = protocol_module->handle_client_send(down_thread_id);
+                    std::map< protocol_module_base::EVENT_TAG ,DOWN_THREAD_FUNC_TYPE_TAG >::iterator func_type = down_thread_module_event_map.find(module_event);
+                    if(unlikely( func_type == down_thread_module_event_map.end() ) ){
+                        //Error unknown protocol_module_base::EVENT_TAG return
+                        std::stringstream buf;
+                        buf << "Thread ID[";
+                        buf << boost::this_thread::get_id();
+                        buf << "] protocol_module returnd illegal EVENT_TAG : ";
+                        buf << module_event;
+                        Logger::putLogError( LOG_CAT_L7VSD_SESSION, 82, buf.str(), __FILE__, __LINE__ );
+                        down_thread_exit(process_type);
+                        return;
+                    }
+                    func_tag = func_type->second;
+                }
+                break;
             }else{
-                func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
+                if(ec == boost::asio::error::try_again){
+                    //func_tag = DOWN_FUNC_CLIENT_SEND;
+                    //boost::this_thread::yield();
+                }else{
+                    func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
+                    break;
+                }
             }
+
+            event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+            if ( !ssl_flag ) {
+                epoll_ctl( down_client_epollfd, EPOLL_CTL_MOD, client_socket.get_socket().native(), &event );
+            } else {
+                epoll_ctl( down_client_epollfd, EPOLL_CTL_MOD, client_ssl_socket.get_socket().lowest_layer().native(), &event );
+            }
+
+            epoll_wait( down_client_epollfd, up_client_events, EVENT_NUM, EPOLL_TIMEOUT );
         }
+
+        event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
+        if ( !ssl_flag ) {
+            epoll_ctl( down_client_epollfd, EPOLL_CTL_DEL, client_socket.get_socket().native(), &event );
+        } else {
+            epoll_ctl( down_client_epollfd, EPOLL_CTL_DEL, client_ssl_socket.get_socket().lowest_layer().native(), &event );
+        }
+
         down_thread_function_pair    func    = down_thread_function_array[func_tag];
         if(unlikely( !func.second )){
             //Error not find function map