OSDN Git Service

High Speed Version!
author0809216 <0809216@1ed66053-1c2d-0410-8867-f7571e6e31d3>
Tue, 9 Nov 2010 08:41:14 +0000 (08:41 +0000)
committer0809216 <0809216@1ed66053-1c2d-0410-8867-f7571e6e31d3>
Tue, 9 Nov 2010 08:41:14 +0000 (08:41 +0000)
git-svn-id: http://10.144.169.20/repos/um/branches/l7vsd-3.x-ramiel-epoll-cond@10428 1ed66053-1c2d-0410-8867-f7571e6e31d3

l7vsd/src/tcp_session.cpp

index 7c662b1..f9b5c56 100644 (file)
@@ -284,7 +284,7 @@ tcp_session::tcp_session(
         down_client_epollfd_registered = false;
         down_realserver_epollfd_registered = false;
         down_sorryserver_epollfd_registered = false;
-        is_epoll_edge_trigger = true;
+        is_epoll_edge_trigger = false;
         epoll_timeout = 50;
 }
 
@@ -817,9 +817,9 @@ void tcp_session::up_thread_run()
 
         {
                 boost::mutex::scoped_lock lock(downthread_status_mutex);
-                if (downthread_status < DOWNTHREAD_ALIVE){
+                if (downthread_status < DOWNTHREAD_ALIVE) {
                         to_time(LOCKTIMEOUT, xt);
-                        downthread_status_cond.timed_wait( lock , xt );
+                        downthread_status_cond.timed_wait(lock, xt);
                }
         }
 
@@ -841,10 +841,7 @@ void tcp_session::up_thread_run()
                         buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
                         buf << "cannot get client socket";
                         Logger::putLogError(LOG_CAT_L7VSD_SESSION, 9, buf.str(), __FILE__, __LINE__);
-                        {
-                                rw_scoped_lock scoped_lock(exit_flag_update_mutex);
-                                __sync_bool_compare_and_swap(&exit_flag, 0, 1);
-                        }
+                        __sync_bool_compare_and_swap(&exit_flag, 0, 1);
                 }
         }
 
@@ -863,10 +860,7 @@ void tcp_session::up_thread_run()
                         buf << "cannot get client endpoint: ";
                         buf << ec.message();
                         Logger::putLogError(LOG_CAT_L7VSD_SESSION, 10, buf.str(), __FILE__, __LINE__);
-                        {
-                                rw_scoped_lock scoped_lock(exit_flag_update_mutex);
-                                __sync_bool_compare_and_swap(&exit_flag, 0, 1);
-                        }
+                        __sync_bool_compare_and_swap(&exit_flag, 0, 1);
                 }
         }
         if (likely(!exit_flag)) {
@@ -879,10 +873,7 @@ void tcp_session::up_thread_run()
                         buf << "cannot set socket to non-blocking mode:";
                         buf << ec.message();
                         Logger::putLogError(LOG_CAT_L7VSD_SESSION, 11, buf.str(), __FILE__, __LINE__);
-                        {
-                                rw_scoped_lock scoped_lock(exit_flag_update_mutex);
-                                __sync_bool_compare_and_swap(&exit_flag, 0, 1);
-                        }
+                        __sync_bool_compare_and_swap(&exit_flag, 0, 1);
                 }
         }
         if (likely(!exit_flag)) {
@@ -902,10 +893,7 @@ void tcp_session::up_thread_run()
                                 buf << "cannot set client socket receive buffer size: ";
                                 buf << ec.message();
                                 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 12, buf.str(), __FILE__, __LINE__);
-                                {
-                                        rw_scoped_lock scoped_lock(exit_flag_update_mutex);
-                                        __sync_bool_compare_and_swap(&exit_flag, 0, 1);
-                                }
+                                __sync_bool_compare_and_swap(&exit_flag, 0, 1);
                         }
                 }
         }
@@ -926,31 +914,25 @@ void tcp_session::up_thread_run()
                                 buf << "cannot set client socket send buffer size: ";
                                 buf << ec.message();
                                 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 13, buf.str(), __FILE__, __LINE__);
-                                {
-                                        rw_scoped_lock scoped_lock(exit_flag_update_mutex);
-                                        __sync_bool_compare_and_swap(&exit_flag, 0, 1);
-                                }
+                                __sync_bool_compare_and_swap(&exit_flag, 0, 1);
                         }
                 }
         }
-
-        boost::asio::ip::udp::endpoint dummy_end;
-        protocol_module_base::EVENT_TAG module_event;
-        std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type;
-        up_thread_function_pair func;
-
         if (likely(!exit_flag)) {
+                boost::asio::ip::udp::endpoint dummy_end;
+                protocol_module_base::EVENT_TAG module_event;
+                std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type;
+                up_thread_function_pair func;
                 module_event = protocol_module->handle_session_initialize(up_thread_id, down_thread_id, client_endpoint, dummy_end);
                 func_type = up_thread_module_event_map.find(module_event);
                 func = up_thread_function_array[func_type->second];
                 up_thread_next_call_function = func;
         }
-        {
+        if (likely(!exit_flag)) {
                 boost::mutex::scoped_lock lock(upthread_status_mutex);
                 upthread_status = UPTHREAD_ACTIVE;
+                upthread_status_cond.notify_one();
         }
-        upthread_status_cond.notify_one();
-
         //----Debug log----------------------------------------------------------------------
         if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
                 std::stringstream buf;
@@ -961,11 +943,10 @@ void tcp_session::up_thread_run()
         //----Debug log----------------------------------------------------------------------
         while (!exit_flag) {
                {
-                       boost::mutex::scoped_lock lock( upthread_status_mutex );
-                       if ( unlikely( upthread_status == UPTHREAD_LOCK ) ) {
+                       boost::mutex::scoped_lock lock(upthread_status_mutex);
+                       if ( unlikely(upthread_status == UPTHREAD_LOCK) ) {
                                to_time(LOCKTIMEOUT, xt);
-                               upthread_status_cond.timed_wait( lock , xt );
-                               //upthread_status_cond.wait( lock );
+                               upthread_status_cond.timed_wait(lock, xt);
                                upthread_status = UPTHREAD_ACTIVE;
                        }
                }
@@ -1009,7 +990,7 @@ void tcp_session::up_thread_run()
         //----Debug log----------------------------------------------------------------------
         {
                 boost::mutex::scoped_lock lock(downthread_status_mutex);
-                if (downthread_status != DOWNTHREAD_SLEEP) {
+                while (downthread_status != DOWNTHREAD_SLEEP) {
                         if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
                                 boost::format formatter("Thread ID[%s] down thread finalize wait");
                                 formatter % boost::this_thread::get_id();
@@ -1028,8 +1009,9 @@ void tcp_session::up_thread_run()
                 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 23, buf.str(), __FILE__, __LINE__);
         }
         //----Debug log----------------------------------------------------------------------
-        if (likely(protocol_module != NULL))
+        if (likely(protocol_module != NULL)) {
                 protocol_module->handle_session_finalize(up_thread_id, down_thread_id);
+        }
         //----Debug log----------------------------------------------------------------------
         if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
                 std::stringstream buf;
@@ -1042,11 +1024,8 @@ void tcp_session::up_thread_run()
                 boost::mutex::scoped_lock lock(upthread_status_mutex);
                 upthread_status = UPTHREAD_SLEEP;
         }
-        {
-                rw_scoped_lock scoped_lock(exit_flag_update_mutex);
-                if( !__sync_bool_compare_and_swap( &exit_flag, 1, 2 ) ){
-                        parent_service.release_session(this);
-                }
+        if ( !__sync_bool_compare_and_swap(&exit_flag, 1, 2) ) {
+                parent_service.release_session(this);
         }
 
         //----Debug log----------------------------------------------------------------------
@@ -1116,11 +1095,10 @@ void tcp_session::down_thread_run()
         //----Debug log----------------------------------------------------------------------
 
         while (!exit_flag) {
-                if ( unlikely ( downthread_status == DOWNTHREAD_LOCK ) ) {
+                if ( unlikely(downthread_status == DOWNTHREAD_LOCK) ) {
                         boost::mutex::scoped_lock lock(downthread_status_mutex);
                         to_time(LOCKTIMEOUT, xt);
-                        downthread_status_cond.timed_wait( lock , xt );
-                        //downthread_status_cond.wait( lock );
+                        downthread_status_cond.timed_wait(lock, xt);
                         downthread_status = DOWNTHREAD_ACTIVE;
                 }
                 while (unlikely(!down_thread_connect_socket_list.empty())) {
@@ -1172,14 +1150,9 @@ void tcp_session::down_thread_run()
                 formatter % boost::this_thread::get_id();
                 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 32, formatter.str(), __FILE__, __LINE__);
         }
-
-        {
-                rw_scoped_lock scoped_lock(exit_flag_update_mutex);
-                if( !__sync_bool_compare_and_swap( &exit_flag, 1, 2 ) ){
-                        parent_service.release_session(this);
-                }
+        if ( !__sync_bool_compare_and_swap(&exit_flag, 1, 2) ) {
+                parent_service.release_session(this);
         }
-
 }
 
 //! endpoint data to string information
@@ -1410,7 +1383,6 @@ void tcp_session::up_thread_client_receive(const TCP_PROCESS_TYPE_TAG process_ty
         } else {
                 event.events = EPOLLIN | EPOLLHUP;
         }
-        bool add_flag = false;
         if (!up_client_epollfd_registered) {
                 if (epoll_ctl(up_client_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
                         std::stringstream buf;
@@ -1421,18 +1393,6 @@ void tcp_session::up_thread_client_receive(const TCP_PROCESS_TYPE_TAG process_ty
                         return;
                 }
                 up_client_epollfd_registered = true;
-                add_flag = true;
-        }
-
-        if (is_epoll_edge_trigger && (!add_flag)) {
-                if (epoll_ctl(up_client_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
-                        std::stringstream buf;
-                        buf << "up_thread_client_receive: epoll_ctl EPOLL_CTL_MOD error: ";
-                        buf << strerror(errno);
-                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
-                        up_thread_next_call_function = up_thread_function_array[UP_FUNC_CLIENT_DISCONNECT];
-                        return;
-                }
         }
 
         int ret_fds = epoll_wait(up_client_epollfd, up_client_events, EVENT_NUM, epoll_timeout);
@@ -1663,7 +1623,6 @@ void tcp_session::up_thread_realserver_send(const TCP_PROCESS_TYPE_TAG process_t
         } else {
                 event.events = EPOLLOUT | EPOLLHUP;
         }
-        bool add_flag = false;
         if (!up_realserver_epollfd_registered) {
                 if (epoll_ctl(up_realserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
                         std::stringstream buf;
@@ -1674,26 +1633,6 @@ void tcp_session::up_thread_realserver_send(const TCP_PROCESS_TYPE_TAG process_t
                         return;
                 }
                 up_realserver_epollfd_registered = true;
-                add_flag = true;
-        }
-
-        {
-                rd_scoped_lock scoped_lock(exit_flag_update_mutex);
-                if (unlikely(exit_flag)) {
-                        up_thread_next_call_function = up_thread_function_array[UP_FUNC_EXIT];
-                        return;
-                }
-        }
-
-        if (is_epoll_edge_trigger && (!add_flag)) {
-                if (epoll_ctl(up_realserver_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
-                        std::stringstream buf;
-                        buf << "up_thread_realserver_send: epoll_ctl EPOLL_CTL_MOD error: ";
-                        buf << strerror(errno);
-                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
-                        boost::this_thread::yield();
-                        return;
-                }
         }
 
         int ret_fds = epoll_wait(up_realserver_epollfd, up_realserver_events, EVENT_NUM, epoll_timeout);
@@ -2152,7 +2091,6 @@ void tcp_session::up_thread_sorryserver_send(const TCP_PROCESS_TYPE_TAG process_
         } else {
                 event.events = EPOLLOUT | EPOLLHUP;
         }
-        bool add_flag = false;
         if (!up_sorryserver_epollfd_registered) {
                 if (epoll_ctl(up_sorryserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
                         std::stringstream buf;
@@ -2163,7 +2101,6 @@ void tcp_session::up_thread_sorryserver_send(const TCP_PROCESS_TYPE_TAG process_
                         return;
                 }
                 up_sorryserver_epollfd_registered = true;
-                add_flag = true;
         }
 
         while (true) {
@@ -2175,17 +2112,6 @@ void tcp_session::up_thread_sorryserver_send(const TCP_PROCESS_TYPE_TAG process_
                         }
                 }
 
-                if (is_epoll_edge_trigger && (!add_flag)) {
-                        if (epoll_ctl(up_sorryserver_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
-                                std::stringstream buf;
-                                buf << "up_thread_sorryserver_send: epoll_ctl EPOLL_CTL_MOD error: ";
-                                buf << strerror(errno);
-                                Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
-                                boost::this_thread::yield();
-                                return;
-                        }
-                }
-
                 int ret_fds = epoll_wait(up_sorryserver_epollfd, up_sorryserver_events, EVENT_NUM, epoll_timeout);
                 if (ret_fds <= 0) {
                         if (ret_fds == 0) {
@@ -2603,7 +2529,6 @@ void tcp_session::up_thread_exit(const TCP_PROCESS_TYPE_TAG process_type)
                 formatter % boost::this_thread::get_id();
                 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
         }
-        rw_scoped_lock scoped_lock(exit_flag_update_mutex);
         boost::mutex::scoped_lock down_thread_cond_lock(upthread_status_mutex);
         boost::mutex::scoped_lock realserver_status_lock(realserver_connect_mutex);
         upthread_status_cond.notify_one();
@@ -2702,7 +2627,6 @@ void tcp_session::down_thread_realserver_receive(const TCP_PROCESS_TYPE_TAG proc
         } else {
                 event.events = EPOLLIN | EPOLLHUP;
         }
-        bool add_flag = false;
         if (!down_realserver_epollfd_registered) {
                 if (epoll_ctl(down_realserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
                         std::stringstream buf;
@@ -2712,17 +2636,6 @@ void tcp_session::down_thread_realserver_receive(const TCP_PROCESS_TYPE_TAG proc
                         return;
                 }
                 down_realserver_epollfd_registered = true;
-                add_flag = true;
-        }
-
-        if (is_epoll_edge_trigger && (!add_flag)) {
-                if (epoll_ctl(down_realserver_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
-                        std::stringstream buf;
-                        buf << "down_thread_realserver_receive: epoll_ctl EPOLL_CTL_MOD error: " << strerror(errno);
-                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
-                        boost::this_thread::yield();
-                        return;
-                }
         }
 
         int ret_fds = epoll_wait(down_realserver_epollfd, down_realserver_events, EVENT_NUM, epoll_timeout);
@@ -3016,7 +2929,6 @@ void tcp_session::down_thread_client_send(const TCP_PROCESS_TYPE_TAG process_typ
         } else {
                 event.events = EPOLLOUT | EPOLLHUP;
         }
-        bool add_flag = false;
         if (!down_client_epollfd_registered) {
                 if (epoll_ctl(down_client_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
                         std::stringstream buf;
@@ -3027,7 +2939,6 @@ void tcp_session::down_thread_client_send(const TCP_PROCESS_TYPE_TAG process_typ
                         return;
                 }
                 down_client_epollfd_registered = true;
-                add_flag = true;
         }
 
         while (true) {
@@ -3039,17 +2950,6 @@ void tcp_session::down_thread_client_send(const TCP_PROCESS_TYPE_TAG process_typ
                         }
                 }
 
-                if (is_epoll_edge_trigger && (!add_flag)) {
-                        if (epoll_ctl(down_client_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
-                                std::stringstream buf;
-                                buf << "down_thread_client_send: epoll_ctl EPOLL_CTL_MOD error: ";
-                                buf << strerror(errno);
-                                Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
-                                boost::this_thread::yield();
-                                return;
-                        }
-                }
-
                 int ret_fds = epoll_wait(down_client_epollfd, down_client_events, EVENT_NUM, epoll_timeout);
                 if (ret_fds <= 0) {
                         if (ret_fds == 0) {
@@ -3216,7 +3116,6 @@ void tcp_session::down_thread_sorryserver_receive(const TCP_PROCESS_TYPE_TAG pro
 
         struct epoll_event event;
         event.data.fd = sorryserver_socket.second->get_socket().native();
-        bool add_flag = false;
         if (!down_sorryserver_epollfd_registered) {
                 if (is_epoll_edge_trigger) {
                         event.events = EPOLLIN | EPOLLHUP | EPOLLET;
@@ -3232,26 +3131,6 @@ void tcp_session::down_thread_sorryserver_receive(const TCP_PROCESS_TYPE_TAG pro
                         return;
                 }
                 down_sorryserver_epollfd_registered = true;
-                add_flag = true;
-        }
-
-        {
-                rd_scoped_lock scoped_lock(exit_flag_update_mutex);
-                if (unlikely(exit_flag)) {
-                        func_tag = DOWN_FUNC_EXIT;
-                        goto down_thread_sorryserver_receive_out;
-                }
-        }
-
-        if (is_epoll_edge_trigger && (!add_flag)) {
-                if (epoll_ctl(down_sorryserver_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
-                        std::stringstream buf;
-                        buf << "down_thread_sorryserver_receive: epoll_ctl EPOLL_CTL_MOD error: ";
-                        buf << strerror(errno);
-                        Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
-                        boost::this_thread::yield();
-                        return;
-                }
         }
 
         ret_fds = epoll_wait(down_sorryserver_epollfd, down_sorryserver_events, EVENT_NUM, epoll_timeout);
@@ -3489,7 +3368,6 @@ void tcp_session::down_thread_exit(const TCP_PROCESS_TYPE_TAG process_type)
                 formatter % boost::this_thread::get_id();
                 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
         }
-        rw_scoped_lock scoped_lock(exit_flag_update_mutex);
         boost::mutex::scoped_lock status_scoped_lock(downthread_status_mutex);
         downthread_status_cond.notify_one();
         __sync_bool_compare_and_swap(&exit_flag, 0, 1);