2 * @file tcp_session.cpp
3 * @brief tcp session class
5 * L7VSD: Linux Virtual Server for Layer7 Load Balancing
6 * Copyright (C) 2009 NTT COMWARE Corporation.
8 * This program is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License as published by the Free Software Foundation; either
11 * version 2.1 of the License, or (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
23 **********************************************************************/
24 #include <boost/format.hpp>
26 #include "tcp_session.h"
27 #include "tcp_thread_message.h"
28 #include "virtualservice.h"
30 #include "logger_implement_access.h"
31 #include "parameter.h"
38 //! @param[in/out] vs is parent virtualservice object
39 //! @param[in/out] session_io is session use io service object
40 //! @param[in] set_option is session use set socket option info
41 //! @param[in] listen_endpoint is virtualservice listen endpoint
42 //! @param[in] ssl_mode is session use SSL flag
43 //! @param[in] set_ssl_context is session use SSL context object
44 //! @param[in] set_ssl_cache_flag is session use SSL session cache
45 //! @param[in] set_ssl_handshake_time_out is session use SSL handshake timeout
46 //! @param[in] set_access_logger is session use access logger
47 tcp_session::tcp_session(
48 virtualservice_tcp &vs,
49 boost::asio::io_service &session_io,
50 const tcp_socket_option_info set_option,
51 const endpoint listen_endpoint,
53 boost::asio::ssl::context &set_ssl_context,
54 const bool set_ssl_cache_flag,
55 const int set_ssl_handshake_time_out,
56 logger_implement_access *set_access_logger)
61 upthread_status(UPTHREAD_SLEEP),
62 downthread_status(DOWNTHREAD_SLEEP),
63 realserver_connect_status(false),
64 protocol_module(NULL),
65 client_socket(session_io, set_option),
66 upstream_buffer_size(-1),
67 downstream_buffer_size(-1),
68 virtualservice_endpoint(listen_endpoint),
69 access_log_flag(false),
70 access_logger(set_access_logger),
72 client_ssl_socket(session_io, set_ssl_context, set_option),
73 ssl_context(set_ssl_context),
74 ssl_cache_flag(set_ssl_cache_flag),
75 ssl_handshake_timer_flag(false),
76 ssl_handshake_time_out(set_ssl_handshake_time_out),
77 ssl_handshake_time_out_flag(false),
78 socket_opt_info(set_option)
81 tcp_socket_ptr sorry_socket(new tcp_socket(session_io, socket_opt_info));
82 sorryserver_socket.second = sorry_socket;
84 // set up_thread_module_event_map
85 std::pair<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG> add_up_thread_event;
86 add_up_thread_event.first = protocol_module_base::ACCEPT;
87 add_up_thread_event.second = UP_FUNC_CLIENT_ACCEPT;
88 up_thread_module_event_map.insert(add_up_thread_event);
89 add_up_thread_event.first = protocol_module_base::CLIENT_DISCONNECT;
90 add_up_thread_event.second = UP_FUNC_CLIENT_DISCONNECT;
91 up_thread_module_event_map.insert(add_up_thread_event);
92 add_up_thread_event.first = protocol_module_base::CLIENT_RECV;
93 add_up_thread_event.second = UP_FUNC_CLIENT_RECEIVE;
94 up_thread_module_event_map.insert(add_up_thread_event);
95 add_up_thread_event.first = protocol_module_base::CLIENT_RESPONSE_SEND;
96 add_up_thread_event.second = UP_FUNC_CLIENT_RESPOND_SEND;
97 up_thread_module_event_map.insert(add_up_thread_event);
98 add_up_thread_event.first = protocol_module_base::REALSERVER_SELECT;
99 add_up_thread_event.second = UP_FUNC_REALSERVER_GET_DEST_EVENT;
100 up_thread_module_event_map.insert(add_up_thread_event);
101 add_up_thread_event.first = protocol_module_base::REALSERVER_CONNECT;
102 add_up_thread_event.second = UP_FUNC_REALSERVER_CONNECT;
103 up_thread_module_event_map.insert(add_up_thread_event);
104 add_up_thread_event.first = protocol_module_base::REALSERVER_SEND;
105 add_up_thread_event.second = UP_FUNC_REALSERVER_SEND;
106 up_thread_module_event_map.insert(add_up_thread_event);
107 add_up_thread_event.first = protocol_module_base::REALSERVER_DISCONNECT;
108 add_up_thread_event.second = UP_FUNC_REALSERVER_ALL_DISCONNECT;
109 up_thread_module_event_map.insert(add_up_thread_event);
110 add_up_thread_event.first = protocol_module_base::SORRYSERVER_SELECT;
111 add_up_thread_event.second = UP_FUNC_SORRYSERVER_GET_DEST;
112 up_thread_module_event_map.insert(add_up_thread_event);
113 add_up_thread_event.first = protocol_module_base::SORRYSERVER_CONNECT;
114 add_up_thread_event.second = UP_FUNC_SORRYSERVER_CONNECT;
115 up_thread_module_event_map.insert(add_up_thread_event);
116 add_up_thread_event.first = protocol_module_base::SORRYSERVER_SEND;
117 add_up_thread_event.second = UP_FUNC_SORRYSERVER_SEND;
118 up_thread_module_event_map.insert(add_up_thread_event);
119 add_up_thread_event.first = protocol_module_base::SORRYSERVER_DISCONNECT;
120 add_up_thread_event.second = UP_FUNC_SORRYSERVER_MOD_DISCONNECT;
121 up_thread_module_event_map.insert(add_up_thread_event);
122 add_up_thread_event.first = protocol_module_base::FINALIZE;
123 add_up_thread_event.second = UP_FUNC_EXIT;
124 up_thread_module_event_map.insert(add_up_thread_event);
126 // set up_thread_function_array
129 up_thread_function_array[UP_FUNC_CLIENT_ACCEPT] =
130 std::make_pair(UP_FUNC_CLIENT_ACCEPT, boost::bind(&tcp_session::up_thread_client_accept, this, _1));
131 up_thread_function_array[UP_FUNC_CLIENT_ACCEPT_EVENT] =
132 std::make_pair(UP_FUNC_CLIENT_ACCEPT_EVENT, boost::bind(&tcp_session::up_thread_client_accept_event, this, _1));
133 up_thread_function_array[UP_FUNC_CLIENT_DISCONNECT] =
134 std::make_pair(UP_FUNC_CLIENT_DISCONNECT, boost::bind(&tcp_session::up_thread_client_disconnect, this, _1));
135 up_thread_function_array[UP_FUNC_CLIENT_DISCONNECT_EVENT] =
136 std::make_pair(UP_FUNC_CLIENT_DISCONNECT_EVENT, boost::bind(&tcp_session::up_thread_client_disconnect_event, this, _1));
137 up_thread_function_array[UP_FUNC_CLIENT_RECEIVE] =
138 std::make_pair(UP_FUNC_CLIENT_RECEIVE, boost::bind(&tcp_session::up_thread_client_receive, this, _1));
139 up_thread_function_array[UP_FUNC_CLIENT_RESPOND_SEND] =
140 std::make_pair(UP_FUNC_CLIENT_RESPOND_SEND, boost::bind(&tcp_session::up_thread_client_respond, this, _1));
141 up_thread_function_array[UP_FUNC_CLIENT_RESPOND_SEND_EVENT] =
142 std::make_pair(UP_FUNC_CLIENT_RESPOND_SEND_EVENT, boost::bind(&tcp_session::up_thread_client_respond_event, this, _1));
143 up_thread_function_array[UP_FUNC_REALSERVER_GET_DEST_EVENT] =
144 std::make_pair(UP_FUNC_REALSERVER_GET_DEST_EVENT, boost::bind(&tcp_session::up_thread_realserver_get_destination_event, this, _1));
145 up_thread_function_array[UP_FUNC_REALSERVER_CONNECT] =
146 std::make_pair(UP_FUNC_REALSERVER_CONNECT, boost::bind(&tcp_session::up_thread_realserver_connect, this, _1));
147 up_thread_function_array[UP_FUNC_REALSERVER_CONNECT_EVENT] =
148 std::make_pair(UP_FUNC_REALSERVER_CONNECT_EVENT, boost::bind(&tcp_session::up_thread_realserver_connect_event, this, _1));
149 up_thread_function_array[UP_FUNC_REALSERVER_CONNECT_FAIL_EVENT] =
150 std::make_pair(UP_FUNC_REALSERVER_CONNECT_FAIL_EVENT, boost::bind(&tcp_session::up_thread_realserver_connection_fail_event, this, _1));
151 up_thread_function_array[UP_FUNC_REALSERVER_SEND] =
152 std::make_pair(UP_FUNC_REALSERVER_SEND, boost::bind(&tcp_session::up_thread_realserver_send, this, _1));
153 up_thread_function_array[UP_FUNC_REALSERVER_DISCONNECT] =
154 std::make_pair(UP_FUNC_REALSERVER_DISCONNECT, boost::bind(&tcp_session::up_thread_realserver_disconnect, this, _1));
155 up_thread_function_array[UP_FUNC_REALSERVER_DISCONNECT_EVENT] =
156 std::make_pair(UP_FUNC_REALSERVER_DISCONNECT_EVENT, boost::bind(&tcp_session::up_thread_realserver_disconnect_event, this, _1));
157 up_thread_function_array[UP_FUNC_REALSERVER_ALL_DISCONNECT] =
158 std::make_pair(UP_FUNC_REALSERVER_ALL_DISCONNECT, boost::bind(&tcp_session::up_thread_all_realserver_disconnect, this, _1));
159 up_thread_function_array[UP_FUNC_SORRYSERVER_GET_DEST] =
160 std::make_pair(UP_FUNC_SORRYSERVER_GET_DEST, boost::bind(&tcp_session::up_thread_sorryserver_get_destination_event, this, _1));
161 up_thread_function_array[UP_FUNC_SORRYSERVER_CONNECT] =
162 std::make_pair(UP_FUNC_SORRYSERVER_CONNECT, boost::bind(&tcp_session::up_thread_sorryserver_connect, this, _1));
163 up_thread_function_array[UP_FUNC_SORRYSERVER_CONNECT_EVENT] =
164 std::make_pair(UP_FUNC_SORRYSERVER_CONNECT_EVENT, boost::bind(&tcp_session::up_thread_sorryserver_connect_event, this, _1));
165 up_thread_function_array[UP_FUNC_SORRYSERVER_CONNECT_FAIL_EVENT] =
166 std::make_pair(UP_FUNC_SORRYSERVER_CONNECT_FAIL_EVENT, boost::bind(&tcp_session::up_thread_sorryserver_connection_fail_event, this, _1));
167 up_thread_function_array[UP_FUNC_SORRYSERVER_SEND] =
168 std::make_pair(UP_FUNC_SORRYSERVER_SEND, boost::bind(&tcp_session::up_thread_sorryserver_send, this, _1));
169 up_thread_function_array[UP_FUNC_SORRYSERVER_DISCONNECT] =
170 std::make_pair(UP_FUNC_SORRYSERVER_DISCONNECT, boost::bind(&tcp_session::up_thread_sorryserver_disconnect, this, _1));
171 up_thread_function_array[UP_FUNC_SORRYSERVER_MOD_DISCONNECT] =
172 std::make_pair(UP_FUNC_SORRYSERVER_MOD_DISCONNECT, boost::bind(&tcp_session::up_thread_sorryserver_mod_disconnect, this, _1));
173 up_thread_function_array[UP_FUNC_SORRYSERVER_DISCONNECT_EVENT] =
174 std::make_pair(UP_FUNC_SORRYSERVER_DISCONNECT_EVENT, boost::bind(&tcp_session::up_thread_sorryserver_disconnect_event, this, _1));
175 up_thread_function_array[UP_FUNC_EXIT] =
176 std::make_pair(UP_FUNC_EXIT, boost::bind(&tcp_session::up_thread_exit, this, _1));
178 // set up_thread_message_down_thread_function_map
179 std::pair<DOWN_THREAD_FUNC_TYPE_TAG, tcp_session_func> add_up_thread_message_func;
180 add_up_thread_message_func.first = DOWN_FUNC_CLIENT_DISCONNECT_EVENT;
181 add_up_thread_message_func.second = boost::bind(&tcp_session::down_thread_client_disconnect_event, this, _1);
182 up_thread_message_down_thread_function_map.insert(add_up_thread_message_func);
183 add_up_thread_message_func.first = DOWN_FUNC_CLIENT_RESPOND_SEND_EVENT;
184 add_up_thread_message_func.second = boost::bind(&tcp_session::down_thread_client_respond_event, this, _1);
185 up_thread_message_down_thread_function_map.insert(add_up_thread_message_func);
186 add_up_thread_message_func.first = DOWN_FUNC_REALSERVER_DISCONNECT_EVENT;
187 add_up_thread_message_func.second = boost::bind(&tcp_session::down_thread_realserver_disconnect_event, this, _1);
188 up_thread_message_down_thread_function_map.insert(add_up_thread_message_func);
189 add_up_thread_message_func.first = DOWN_FUNC_SORRYSERVER_DISCONNECT_EVENT;
190 add_up_thread_message_func.second = boost::bind(&tcp_session::down_thread_sorryserver_disconnect_event, this, _1);
191 up_thread_message_down_thread_function_map.insert(add_up_thread_message_func);
192 add_up_thread_message_func.first = DOWN_FUNC_SORRY_ENABLE_EVENT;
193 add_up_thread_message_func.second = boost::bind(&tcp_session::down_thread_sorry_enable_event, this, _1);
194 up_thread_message_down_thread_function_map.insert(add_up_thread_message_func);
196 // set down_thread_module_event_map
197 std::pair<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG> add_down_thread_event;
198 add_down_thread_event.first = protocol_module_base::CLIENT_SEND;
199 add_down_thread_event.second = DOWN_FUNC_CLIENT_SEND;
200 down_thread_module_event_map.insert(add_down_thread_event);
201 add_down_thread_event.first = protocol_module_base::CLIENT_DISCONNECT;
202 add_down_thread_event.second = DOWN_FUNC_CLIENT_DISCONNECT;
203 down_thread_module_event_map.insert(add_down_thread_event);
204 add_down_thread_event.first = protocol_module_base::CLIENT_CONNECTION_CHECK;
205 add_down_thread_event.second = DOWN_FUNC_CLIENT_CONNECTION_CHK;
206 down_thread_module_event_map.insert(add_down_thread_event);
207 add_down_thread_event.first = protocol_module_base::REALSERVER_RECV;
208 add_down_thread_event.second = DOWN_FUNC_REALSERVER_RECEIVE;
209 down_thread_module_event_map.insert(add_down_thread_event);
210 add_down_thread_event.first = protocol_module_base::REALSERVER_DISCONNECT;
211 add_down_thread_event.second = DOWN_FUNC_REALSERVER_ALL_DISCONNECT;
212 down_thread_module_event_map.insert(add_down_thread_event);
213 add_down_thread_event.first = protocol_module_base::SORRYSERVER_RECV;
214 add_down_thread_event.second = DOWN_FUNC_SORRYSERVER_RECEIVE;
215 down_thread_module_event_map.insert(add_down_thread_event);
216 add_down_thread_event.first = protocol_module_base::SORRYSERVER_DISCONNECT;
217 add_down_thread_event.second = DOWN_FUNC_SORRYSERVER_MOD_DISCONNECT;
218 down_thread_module_event_map.insert(add_down_thread_event);
219 add_down_thread_event.first = protocol_module_base::FINALIZE;
220 add_down_thread_event.second = DOWN_FUNC_EXIT;
221 down_thread_module_event_map.insert(add_down_thread_event);
223 // set down_thread_function_array
224 down_thread_function_array[DOWN_FUNC_CLIENT_DISCONNECT] = std::make_pair(DOWN_FUNC_CLIENT_DISCONNECT, boost::bind(&tcp_session::down_thread_client_disconnect, this, _1));
225 down_thread_function_array[DOWN_FUNC_CLIENT_DISCONNECT_EVENT] = std::make_pair(DOWN_FUNC_CLIENT_DISCONNECT_EVENT, boost::bind(&tcp_session::down_thread_client_disconnect_event, this, _1));
226 down_thread_function_array[DOWN_FUNC_CLIENT_CONNECTION_CHK] = std::make_pair(DOWN_FUNC_CLIENT_CONNECTION_CHK, boost::bind(&tcp_session::down_thread_client_connection_chk_event, this, _1));
227 down_thread_function_array[DOWN_FUNC_CLIENT_SEND] = std::make_pair(DOWN_FUNC_CLIENT_SEND, boost::bind(&tcp_session::down_thread_client_send, this, _1));
228 down_thread_function_array[DOWN_FUNC_REALSERVER_RECEIVE] = std::make_pair(DOWN_FUNC_REALSERVER_RECEIVE, boost::bind(&tcp_session::down_thread_realserver_receive, this, _1));
229 down_thread_function_array[DOWN_FUNC_REALSERVER_DISCONNECT] = std::make_pair(DOWN_FUNC_REALSERVER_DISCONNECT, boost::bind(&tcp_session::down_thread_realserver_disconnect, this, _1));
230 down_thread_function_array[DOWN_FUNC_REALSERVER_DISCONNECT_EVENT] = std::make_pair(DOWN_FUNC_REALSERVER_DISCONNECT_EVENT, boost::bind(&tcp_session::down_thread_realserver_disconnect_event, this, _1));
231 down_thread_function_array[DOWN_FUNC_REALSERVER_ALL_DISCONNECT] = std::make_pair(DOWN_FUNC_REALSERVER_ALL_DISCONNECT, boost::bind(&tcp_session::down_thread_all_realserver_disconnect, this, _1));
232 down_thread_function_array[DOWN_FUNC_SORRYSERVER_RECEIVE] = std::make_pair(DOWN_FUNC_SORRYSERVER_RECEIVE, boost::bind(&tcp_session::down_thread_sorryserver_receive, this, _1));
233 down_thread_function_array[DOWN_FUNC_SORRYSERVER_DISCONNECT] = std::make_pair(DOWN_FUNC_SORRYSERVER_DISCONNECT, boost::bind(&tcp_session::down_thread_sorryserver_disconnect, this, _1));
234 down_thread_function_array[DOWN_FUNC_SORRYSERVER_MOD_DISCONNECT] = std::make_pair(DOWN_FUNC_SORRYSERVER_MOD_DISCONNECT, boost::bind(&tcp_session::down_thread_sorryserver_mod_disconnect, this, _1));
235 down_thread_function_array[DOWN_FUNC_SORRYSERVER_DISCONNECT_EVENT] = std::make_pair(DOWN_FUNC_SORRYSERVER_DISCONNECT_EVENT, boost::bind(&tcp_session::down_thread_sorryserver_disconnect_event, this, _1));
236 down_thread_function_array[DOWN_FUNC_EXIT] = std::make_pair(DOWN_FUNC_EXIT, boost::bind(&tcp_session::down_thread_exit, this, _1));
238 // set down_thread_message_up_thread_function_map
239 std::pair<UP_THREAD_FUNC_TYPE_TAG, tcp_session_func> add_down_thread_message_func;
240 add_down_thread_message_func.first = UP_FUNC_CLIENT_DISCONNECT_EVENT;
241 add_down_thread_message_func.second = boost::bind(&tcp_session::up_thread_client_disconnect_event, this, _1);
242 down_thread_message_up_thread_function_map.insert(add_down_thread_message_func);
243 add_down_thread_message_func.first = UP_FUNC_REALSERVER_DISCONNECT_EVENT;
244 add_down_thread_message_func.second = boost::bind(&tcp_session::up_thread_realserver_disconnect_event, this, _1);
245 down_thread_message_up_thread_function_map.insert(add_down_thread_message_func);
246 add_down_thread_message_func.first = UP_FUNC_SORRYSERVER_DISCONNECT_EVENT;
247 add_down_thread_message_func.second = boost::bind(&tcp_session::up_thread_sorryserver_disconnect_event, this, _1);
248 down_thread_message_up_thread_function_map.insert(add_down_thread_message_func);
250 // set virtual_service_message_up_thread_function_map
251 std::pair<TCP_VIRTUAL_SERVICE_MESSAGE_TAG, tcp_session_func> add_up_thread_vs_message_func;
252 add_up_thread_vs_message_func.first = SORRY_STATE_ENABLE;
253 add_up_thread_vs_message_func.second = boost::bind(&tcp_session::up_thread_sorry_enable_event, this, _1);
254 virtual_service_message_up_thread_function_map.insert(add_up_thread_vs_message_func);
255 add_up_thread_vs_message_func.first = SORRY_STATE_DISABLE;
256 add_up_thread_vs_message_func.second = boost::bind(&tcp_session::up_thread_sorry_disable_event, this, _1);
257 virtual_service_message_up_thread_function_map.insert(add_up_thread_vs_message_func);
258 add_up_thread_vs_message_func.first = SESSION_END;
259 add_up_thread_vs_message_func.second = boost::bind(&tcp_session::up_thread_exit, this, _1);
260 virtual_service_message_up_thread_function_map.insert(add_up_thread_vs_message_func);
262 // set virtual_service_message_down_thread_function_map
263 std::pair<TCP_VIRTUAL_SERVICE_MESSAGE_TAG, tcp_session_func> add_down_thread_vs_message_func;
264 add_down_thread_vs_message_func.first = SORRY_STATE_ENABLE;
265 add_down_thread_vs_message_func.second = boost::bind(&tcp_session::down_thread_sorry_enable_event, this, _1);
266 virtual_service_message_down_thread_function_map.insert(add_down_thread_vs_message_func);
267 add_down_thread_vs_message_func.first = SORRY_STATE_DISABLE;
268 add_down_thread_vs_message_func.second = boost::bind(&tcp_session::down_thread_sorry_disable_event, this, _1);
269 virtual_service_message_down_thread_function_map.insert(add_down_thread_vs_message_func);
270 add_down_thread_vs_message_func.first = SESSION_END;
271 add_down_thread_vs_message_func.second = boost::bind(&tcp_session::down_thread_exit, this, _1);
272 virtual_service_message_down_thread_function_map.insert(add_down_thread_vs_message_func);
275 up_client_epollfd = epoll_create(EVENT_NUM);
276 up_realserver_epollfd = epoll_create(EVENT_NUM);
277 up_sorryserver_epollfd = epoll_create(EVENT_NUM);
278 down_client_epollfd = epoll_create(EVENT_NUM);
279 down_realserver_epollfd = epoll_create(EVENT_NUM);
280 down_sorryserver_epollfd = epoll_create(EVENT_NUM);
281 up_client_epollfd_registered = false;
282 up_realserver_epollfd_registered = false;
283 up_sorryserver_epollfd_registered = false;
284 down_client_epollfd_registered = false;
285 down_realserver_epollfd_registered = false;
286 down_sorryserver_epollfd_registered = false;
287 is_epoll_edge_trigger = true;
292 tcp_session::~tcp_session()
294 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
295 boost::format formatter("Thread ID[%d] FUNC IN ~tcp_session");
296 formatter % boost::this_thread::get_id();
297 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
300 // up_thread_module_event_map
301 up_thread_module_event_map.clear();
302 // up_thread_message_down_thread_function_map
303 up_thread_message_down_thread_function_map.clear();
304 // down_thread_module_event_map
305 down_thread_module_event_map.clear();
306 // down_thread_message_down_thread_function_map
307 down_thread_message_up_thread_function_map.clear();
308 // virtual_service_message_up_thread_function_map
309 virtual_service_message_up_thread_function_map.clear();
310 // virtual_service_message_down_thread_function_map
311 virtual_service_message_down_thread_function_map.clear();
312 // up_thread_message_que
313 tcp_thread_message *msg;
315 msg = up_thread_message_que.pop();
322 // down_thread_message_que
324 msg = down_thread_message_que.pop();
331 close(up_client_epollfd);
332 close(up_realserver_epollfd);
333 close(up_sorryserver_epollfd);
334 close(down_client_epollfd);
335 close(down_realserver_epollfd);
336 close(down_sorryserver_epollfd);
340 session_result_message tcp_session::initialize()
342 session_result_message msg;
346 up_thread_id = boost::thread::id();
347 down_thread_id = boost::thread::id();
348 ssl_handshake_timer_flag = false;
349 ssl_handshake_time_out_flag = false;
350 upthread_status = UPTHREAD_SLEEP;
351 downthread_status = DOWNTHREAD_SLEEP;
352 realserver_connect_status = false;
353 protocol_module = NULL;
354 tcp_thread_message *tmp_msg;
356 tmp_msg = up_thread_message_que.pop();
364 tmp_msg = down_thread_message_que.pop();
373 l7vs::Parameter param;
374 l7vs::error_code vs_err;
377 int_val = param.get_int(PARAM_COMP_SESSION, PARAM_UP_BUFFER_SIZE, vs_err);
378 if (likely(!vs_err) && (int_val > 0)) {
379 upstream_buffer_size = int_val;
382 int_val = param.get_int(PARAM_COMP_SESSION, PARAM_DOWN_BUFFER_SIZE, vs_err);
383 if ((likely(!vs_err)) && (int_val > 0)) {
384 downstream_buffer_size = int_val;
387 int_val = param.get_int(PARAM_COMP_SESSION, PARAM_EPOLL_TRIGGER, vs_err);
388 if ((likely(!vs_err)) && (int_val >= 0)) {
390 is_epoll_edge_trigger = false;
392 is_epoll_edge_trigger = false;
396 int_val = param.get_int(PARAM_COMP_SESSION, PARAM_EPOLL_TIMEOUT, vs_err);
397 if ((likely(!vs_err)) && (int_val > 0)) {
398 epoll_timeout = int_val;
401 protocol_module = parent_service.get_protocol_module();
403 if (unlikely(protocol_module == NULL)) {
404 //Error protocol_module NULL
405 std::stringstream buf;
406 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
407 buf << "protocol_module is NULL!";
408 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 5, buf.str(), __FILE__, __LINE__);
410 msg.message = "Protocol module not found";
413 // Reset SSL structure to allow another connection.
415 if (client_ssl_socket.is_handshake_error()) {
417 client_ssl_socket.clear_socket();
419 if (ssl_cache_flag) {
420 if (unlikely(ssl_clear_keep_cache(client_ssl_socket.get_socket().impl()->ssl) == false)) {
421 //Error ssl_clear_keep_cache
422 std::stringstream buf;
424 buf << boost::this_thread::get_id();
425 buf << "] ssl_clear_keep_cache failed";
426 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 110, buf.str(), __FILE__, __LINE__);
428 msg.message = "ssl_clear_keep_cache failed";
430 //----Debug log----------------------------------------------------------------------
431 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
432 std::stringstream buf;
434 buf << boost::this_thread::get_id();
435 buf << "] ssl_clear_keep_cache ok";
436 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 81,
440 //----Debug log----------------------------------------------------------------------
443 if (unlikely(SSL_clear(client_ssl_socket.get_socket().impl()->ssl) == 0)) {
445 std::stringstream buf;
447 buf << boost::this_thread::get_id();
448 buf << "] SSL_clear failed";
449 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 111, buf.str(), __FILE__, __LINE__);
451 msg.message = "SSL_clear failed";
453 //----Debug log----------------------------------------------------------------------
454 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
455 std::stringstream buf;
457 buf << boost::this_thread::get_id();
458 buf << "] SSL_clear ok";
459 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 82,
463 //----Debug log----------------------------------------------------------------------
469 up_client_epollfd_registered = false;
470 up_realserver_epollfd_registered = false;
471 up_sorryserver_epollfd_registered = false;
472 down_client_epollfd_registered = false;
473 down_realserver_epollfd_registered = false;
474 down_sorryserver_epollfd_registered = false;
476 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
477 boost::format formatter("Thread ID[%d] FUNC OUT ~tcp_session");
478 formatter % boost::this_thread::get_id();
479 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
485 //! ssl clear keep cache
486 //! @param[in] ssl object
487 //! @return true is clear OK.
488 //! @return false is not clear
489 bool tcp_session::ssl_clear_keep_cache(SSL *clear_ssl)
491 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
492 boost::format formatter("Thread ID[%d] FUNC IN ssl_clear_keep_cache");
493 formatter % boost::this_thread::get_id();
494 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
497 if (clear_ssl->method == NULL) {
498 //----Debug log----------------------------------------------------------------------
499 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
500 std::stringstream buf;
502 buf << boost::this_thread::get_id();
503 buf << "] clear ssl method is NULL";
504 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 83,
508 //----Debug log----------------------------------------------------------------------
512 if (clear_ssl->new_session) {
513 //----Debug log----------------------------------------------------------------------
514 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
515 std::stringstream buf;
517 buf << boost::this_thread::get_id();
518 buf << "] clear ssl new_session is not NULL";
519 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 84,
523 //----Debug log----------------------------------------------------------------------
527 if (clear_ssl->session != NULL) {
528 //----Debug log----------------------------------------------------------------------
529 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
530 std::stringstream buf;
532 buf << boost::this_thread::get_id();
533 buf << "] SSL_SESSION_free call";
534 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 88,
538 //----Debug log----------------------------------------------------------------------
539 SSL_SESSION_free(clear_ssl->session);
543 clear_ssl->session = NULL;
545 clear_ssl->error = 0;
547 clear_ssl->shutdown = 0;
548 clear_ssl->version = clear_ssl->method->version;
549 clear_ssl->client_version = clear_ssl->version;
550 clear_ssl->rwstate = SSL_NOTHING;
551 clear_ssl->rstate = SSL_ST_READ_HEADER;
552 clear_ssl->state = SSL_ST_BEFORE | clear_ssl->server ? SSL_ST_ACCEPT : SSL_ST_CONNECT;
555 if (clear_ssl->init_buf != NULL) {
556 BUF_MEM_free(clear_ssl->init_buf);
557 clear_ssl->init_buf = NULL;
561 if (clear_ssl->enc_read_ctx != NULL) {
562 EVP_CIPHER_CTX_cleanup(clear_ssl->enc_read_ctx);
563 OPENSSL_free(clear_ssl->enc_read_ctx);
564 clear_ssl->enc_read_ctx = NULL;
567 // enc_write_ctx free
568 if (clear_ssl->enc_write_ctx != NULL) {
569 EVP_CIPHER_CTX_cleanup(clear_ssl->enc_write_ctx);
570 OPENSSL_free(clear_ssl->enc_write_ctx);
571 clear_ssl->enc_write_ctx = NULL;
575 if (clear_ssl->expand != NULL) {
576 COMP_CTX_free(clear_ssl->expand);
577 clear_ssl->expand = NULL;
581 if (clear_ssl->compress != NULL) {
582 COMP_CTX_free(clear_ssl->compress);
583 clear_ssl->compress = NULL;
586 clear_ssl->first_packet = 0;
588 if (!clear_ssl->in_handshake && (clear_ssl->method != clear_ssl->ctx->method)) {
589 //----Debug log----------------------------------------------------------------------
590 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
591 std::stringstream buf;
593 buf << boost::this_thread::get_id();
594 buf << "] In ssl_clear_keep_cache() method changed";
595 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 86,
599 //----Debug log----------------------------------------------------------------------
600 clear_ssl->method->ssl_free(clear_ssl);
601 clear_ssl->method = clear_ssl->ctx->method;
602 if (!clear_ssl->method->ssl_new(clear_ssl)) {
603 //----Debug log----------------------------------------------------------------------
604 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
605 std::stringstream buf;
607 buf << boost::this_thread::get_id();
608 buf << "] clear ssl method ssl_new error";
609 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 85,
613 //----Debug log----------------------------------------------------------------------
617 clear_ssl->method->ssl_clear(clear_ssl);
620 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
621 boost::format formatter("Thread ID[%d] FUNC OUT ssl_clear_keep_cache");
622 formatter % boost::this_thread::get_id();
623 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
629 //! get reference client side socket
630 //! @return reference client side socket
631 boost::asio::ip::tcp::socket &tcp_session::get_client_socket()
633 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
634 boost::format formatter("Thread ID[%d] FUNC IN/OUT get_client_socket");
635 formatter % boost::this_thread::get_id();
636 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
639 return client_socket.get_socket();
641 //! get reference client side ssl socket
642 //! @return reference client side ssl socket
643 ssl_socket &tcp_session::get_client_ssl_socket()
645 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
646 boost::format formatter("Thread ID[%d] FUNC IN/OUT get_client_ssl_socket");
647 formatter % boost::this_thread::get_id();
648 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
651 return client_ssl_socket.get_socket();
653 //! message from parent virtualservice
654 //! @param[in] message is tcp virtualservice message type
655 void tcp_session::set_virtual_service_message(const TCP_VIRTUAL_SERVICE_MESSAGE_TAG message)
657 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
658 boost::format formatter("Thread ID[%d] FUNC IN set_virtual_service_message");
659 formatter % boost::this_thread::get_id();
660 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
664 case SESSION_PAUSE_ON: {
666 boost::mutex::scoped_lock up_lock(upthread_status_mutex);
667 upthread_status = UPTHREAD_LOCK;
670 boost::mutex::scoped_lock down_lock(downthread_status_mutex);
671 downthread_status = DOWNTHREAD_LOCK;
674 //----Debug log----------------------------------------------------------------------
675 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
676 std::stringstream buf;
678 buf << boost::this_thread::get_id();
679 buf << "] FUNC OUT set_virtual_service_message message:[SESSION_PAUSE_ON]";
680 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 12, buf.str(), __FILE__, __LINE__);
682 //----Debug log----------------------------------------------------------------------
684 case SESSION_PAUSE_OFF: {
686 boost::mutex::scoped_lock lock(upthread_status_mutex);
687 if (upthread_status == UPTHREAD_LOCK)
688 upthread_status_cond.notify_one();
691 boost::mutex::scoped_lock lock(downthread_status_mutex);
692 if (downthread_status == DOWNTHREAD_LOCK)
693 downthread_status_cond.notify_one();
696 //----Debug log----------------------------------------------------------------------
697 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
698 std::stringstream buf;
700 buf << boost::this_thread::get_id();
701 buf << "] FUNC OUT set_virtual_service_message message:[SESSION_PAUSE_OFF]";
702 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 13, buf.str(), __FILE__, __LINE__);
704 //----Debug log----------------------------------------------------------------------
706 case ACCESS_LOG_ON: {
707 rw_scoped_lock scope_lock(access_log_flag_mutex);
708 access_log_flag = true;
710 //----Debug log----------------------------------------------------------------------
711 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
712 std::stringstream buf;
714 buf << boost::this_thread::get_id();
715 buf << "] FUNC OUT set_virtual_service_message message:[ACCESS_LOG_ON]";
716 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 49, buf.str(), __FILE__, __LINE__);
718 //----Debug log----------------------------------------------------------------------
720 case ACCESS_LOG_OFF: {
721 rw_scoped_lock scope_lock(access_log_flag_mutex);
722 access_log_flag = false;
724 //----Debug log----------------------------------------------------------------------
725 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
726 std::stringstream buf;
728 buf << boost::this_thread::get_id();
729 buf << "] FUNC OUT set_virtual_service_message message:[ACCESS_LOG_OFF]";
730 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 68, buf.str(), __FILE__, __LINE__);
732 //----Debug log----------------------------------------------------------------------
734 case SORRY_STATE_ENABLE:
735 //----Debug log----------------------------------------------------------------------
736 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
737 std::stringstream buf;
739 buf << boost::this_thread::get_id();
740 buf << "] set_virtual_service_message message:[SORRY_STATE_ENABLE]";
741 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 14, buf.str(), __FILE__, __LINE__);
743 //----Debug log----------------------------------------------------------------------
745 case SORRY_STATE_DISABLE:
746 //----Debug log----------------------------------------------------------------------
747 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
748 std::stringstream buf;
750 buf << boost::this_thread::get_id();
751 buf << "] set_virtual_service_message message:[SORRY_STATE_DISABLE]";
752 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 15, buf.str(), __FILE__, __LINE__);
754 //----Debug log----------------------------------------------------------------------
757 //----Debug log----------------------------------------------------------------------
758 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
759 std::stringstream buf;
761 buf << boost::this_thread::get_id();
762 buf << "] set_virtual_service_message message:[SESSION_END]";
763 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 16, buf.str(), __FILE__, __LINE__);
765 //----Debug log----------------------------------------------------------------------
769 tcp_thread_message *up_msg = new tcp_thread_message;
770 std::map< TCP_VIRTUAL_SERVICE_MESSAGE_TAG, tcp_session_func>::iterator up_func;
771 up_func = virtual_service_message_up_thread_function_map.find(message);
773 up_msg->message = up_func->second;
774 while (!up_thread_message_que.push(up_msg)) {}
776 tcp_thread_message *down_msg = new tcp_thread_message;
777 std::map< TCP_VIRTUAL_SERVICE_MESSAGE_TAG, tcp_session_func>::iterator down_func;
778 down_func = virtual_service_message_down_thread_function_map.find(message);
779 down_msg->message = down_func->second;
780 while (!down_thread_message_que.push(down_msg)) {}
782 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
783 boost::format formatter("Thread ID[%d] FUNC OUT set_virtual_service_message");
784 formatter % boost::this_thread::get_id();
785 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
788 //! up stream thread main function
789 void tcp_session::up_thread_run()
791 boost::system::error_code ec;
793 //----Debug log----------------------------------------------------------------------
794 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
795 boost::format formatter("Thread ID[%d] FUNC IN up_thread_run");
796 formatter % boost::this_thread::get_id();
797 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 17, formatter.str(), __FILE__, __LINE__);
799 //----Debug log----------------------------------------------------------------------
801 up_thread_id = boost::this_thread::get_id();
803 boost::mutex::scoped_lock lock1(upthread_status_mutex);
804 boost::mutex::scoped_lock lock2(realserver_connect_mutex);
805 upthread_status = UPTHREAD_ALIVE;
806 realserver_connect_status = false;
809 //----Debug log----------------------------------------------------------------------
810 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
811 std::stringstream buf;
812 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
813 buf << "up thread down thread alive wait start";
814 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 18, buf.str(), __FILE__, __LINE__);
816 //----Debug log----------------------------------------------------------------------
819 boost::mutex::scoped_lock lock(downthread_status_mutex);
820 if (downthread_status < DOWNTHREAD_ALIVE){
821 to_time(LOCKTIMEOUT, xt);
822 downthread_status_cond.timed_wait( lock , xt );
826 //----Debug log----------------------------------------------------------------------
827 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
828 std::stringstream buf;
829 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
830 buf << "up thread down thread alive wait end";
831 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 19, buf.str(), __FILE__, __LINE__);
833 //----Debug log----------------------------------------------------------------------
835 if (likely(!exit_flag)) {
836 bool bres = !ssl_flag ? client_socket.get_socket().lowest_layer().is_open()
837 : client_ssl_socket.get_socket().lowest_layer().is_open();
838 if (unlikely(!bres)) {
839 // cannot get client socket
840 std::stringstream buf;
841 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
842 buf << "cannot get client socket";
843 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 9, buf.str(), __FILE__, __LINE__);
845 rw_scoped_lock scoped_lock(exit_flag_update_mutex);
852 client_socket.accept();
854 client_ssl_socket.accept();
856 if (likely(!exit_flag)) {
857 client_endpoint = (!ssl_flag) ? client_socket.get_socket().lowest_layer().remote_endpoint(ec)
858 : client_ssl_socket.get_socket().lowest_layer().remote_endpoint(ec);
860 //cannot get client endpoint
861 std::stringstream buf;
862 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
863 buf << "cannot get client endpoint: ";
865 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 10, buf.str(), __FILE__, __LINE__);
867 rw_scoped_lock scoped_lock(exit_flag_update_mutex);
872 if (likely(!exit_flag)) {
873 bool bres = !ssl_flag ? client_socket.set_non_blocking_mode(ec)
874 : client_ssl_socket.set_non_blocking_mode(ec);
875 if (unlikely(!bres)) {
876 // cannot set socket to non-blocking mode
877 std::stringstream buf;
878 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
879 buf << "cannot set socket to non-blocking mode:";
881 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 11, buf.str(), __FILE__, __LINE__);
883 rw_scoped_lock scoped_lock(exit_flag_update_mutex);
888 if (likely(!exit_flag)) {
889 // set client socket options(receive buffer size)
890 if (upstream_buffer_size > 0) {
891 boost::asio::socket_base::receive_buffer_size opt(upstream_buffer_size);
894 client_socket.get_socket().lowest_layer().set_option(opt, ec);
896 client_ssl_socket.get_socket().lowest_layer().set_option(opt, ec);
899 // cannot set socket option
900 std::stringstream buf;
901 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
902 buf << "cannot set client socket receive buffer size: ";
904 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 12, buf.str(), __FILE__, __LINE__);
906 rw_scoped_lock scoped_lock(exit_flag_update_mutex);
912 if (likely(!exit_flag)) {
913 // set client socket options(send buffer size)
914 if (downstream_buffer_size > 0) {
915 boost::asio::socket_base::send_buffer_size opt(downstream_buffer_size);
918 client_socket.get_socket().lowest_layer().set_option(opt, ec);
920 client_ssl_socket.get_socket().lowest_layer().set_option(opt, ec);
923 // cannot set socket option
924 std::stringstream buf;
925 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
926 buf << "cannot set client socket send buffer size: ";
928 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 13, buf.str(), __FILE__, __LINE__);
930 rw_scoped_lock scoped_lock(exit_flag_update_mutex);
937 boost::asio::ip::udp::endpoint dummy_end;
938 protocol_module_base::EVENT_TAG module_event;
939 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type;
940 up_thread_function_pair func;
942 if (likely(!exit_flag)) {
943 module_event = protocol_module->handle_session_initialize(up_thread_id, down_thread_id, client_endpoint, dummy_end);
944 func_type = up_thread_module_event_map.find(module_event);
945 func = up_thread_function_array[func_type->second];
946 up_thread_next_call_function = func;
949 boost::mutex::scoped_lock lock(upthread_status_mutex);
950 upthread_status = UPTHREAD_ACTIVE;
952 upthread_status_cond.notify_one();
954 //----Debug log----------------------------------------------------------------------
955 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
956 std::stringstream buf;
957 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
958 buf << "up thread loop start";
959 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 20, buf.str(), __FILE__, __LINE__);
961 //----Debug log----------------------------------------------------------------------
964 boost::mutex::scoped_lock lock( upthread_status_mutex );
965 if ( unlikely( upthread_status == UPTHREAD_LOCK ) ) {
966 to_time(LOCKTIMEOUT, xt);
967 upthread_status_cond.timed_wait( lock , xt );
968 //upthread_status_cond.wait( lock );
969 upthread_status = UPTHREAD_ACTIVE;
972 tcp_thread_message *msg = up_thread_message_que.pop();
974 if (unlikely(UP_FUNC_EXIT == up_thread_next_call_function.first)) {
975 up_thread_next_call_function.second(LOCAL_PROC);
977 up_thread_message_data.set_endpoint(msg->endpoint_info);
978 msg->message(MESSAGE_PROC);
983 up_thread_next_call_function.second(LOCAL_PROC);
986 //----Debug log----------------------------------------------------------------------
987 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
988 std::stringstream buf;
989 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
990 buf << "up thread loop end";
991 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 21, buf.str(), __FILE__, __LINE__);
993 //----Debug log----------------------------------------------------------------------
995 up_thread_all_socket_close();
998 boost::mutex::scoped_lock lock(upthread_status_mutex);
999 upthread_status = UPTHREAD_ALIVE;
1002 //----Debug log----------------------------------------------------------------------
1003 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1004 std::stringstream buf;
1005 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
1006 buf << "up thread/down thread dead wait start";
1007 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 22, buf.str(), __FILE__, __LINE__);
1009 //----Debug log----------------------------------------------------------------------
1011 boost::mutex::scoped_lock lock(downthread_status_mutex);
1012 if (downthread_status != DOWNTHREAD_SLEEP) {
1013 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1014 boost::format formatter("Thread ID[%s] down thread finalize wait");
1015 formatter % boost::this_thread::get_id();
1016 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1018 to_time(LOCKTIMEOUT, xt);
1019 downthread_status_cond.timed_wait(lock, xt);
1023 //----Debug log----------------------------------------------------------------------
1024 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1025 std::stringstream buf;
1026 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
1027 buf << "up thread/down thread dead wait end";
1028 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 23, buf.str(), __FILE__, __LINE__);
1030 //----Debug log----------------------------------------------------------------------
1031 if (likely(protocol_module != NULL))
1032 protocol_module->handle_session_finalize(up_thread_id, down_thread_id);
1033 //----Debug log----------------------------------------------------------------------
1034 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1035 std::stringstream buf;
1036 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
1037 buf << "up thread called handle_session_finalize";
1038 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 24, buf.str(), __FILE__, __LINE__);
1040 //----Debug log----------------------------------------------------------------------
1042 boost::mutex::scoped_lock lock(upthread_status_mutex);
1043 upthread_status = UPTHREAD_SLEEP;
1046 rw_scoped_lock scoped_lock(exit_flag_update_mutex);
1047 if( !__sync_bool_compare_and_swap( &exit_flag, true, 2 ) ){
1048 parent_service.release_session(this);
1052 //----Debug log----------------------------------------------------------------------
1053 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1054 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_run");
1055 formatter % boost::this_thread::get_id();
1056 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 18, formatter.str(), __FILE__, __LINE__);
1060 //! down stream thread main function
1061 void tcp_session::down_thread_run()
1063 //----Debug log----------------------------------------------------------------------
1064 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1065 boost::format formatter("Thread ID[%d] FUNC IN down_thread_run");
1066 formatter % boost::this_thread::get_id();
1067 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 27, formatter.str(), __FILE__, __LINE__);
1069 //----Debug log----------------------------------------------------------------------
1070 down_thread_id = boost::this_thread::get_id();
1072 boost::mutex::scoped_lock lock(downthread_status_mutex);
1073 downthread_status = DOWNTHREAD_ALIVE;
1075 downthread_status_cond.notify_one();
1077 //----Debug log----------------------------------------------------------------------
1078 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1079 std::stringstream buf;
1080 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
1081 buf << "down_thread_run up thread active wait start";
1082 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 28, buf.str(), __FILE__, __LINE__);
1084 //----Debug log----------------------------------------------------------------------
1086 boost::mutex::scoped_lock lock(upthread_status_mutex);
1087 while (upthread_status < UPTHREAD_ACTIVE) {
1088 to_time(LOCKTIMEOUT, xt);
1089 upthread_status_cond.timed_wait(upthread_status_mutex, xt);
1093 //----Debug log----------------------------------------------------------------------
1094 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1095 std::stringstream buf;
1096 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
1097 buf << "down_thread_run up thread active wait end";
1098 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 29, buf.str(), __FILE__, __LINE__);
1100 //----Debug log----------------------------------------------------------------------
1102 boost::mutex::scoped_lock lock(downthread_status_mutex);
1103 downthread_status = DOWNTHREAD_ACTIVE;
1105 downthread_status_cond.notify_one();
1107 down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_REALSERVER_RECEIVE];
1109 //----Debug log----------------------------------------------------------------------
1110 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1111 std::stringstream buf;
1112 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
1113 buf << "down thread loop start";
1114 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 30, buf.str(), __FILE__, __LINE__);
1116 //----Debug log----------------------------------------------------------------------
1118 while (!exit_flag) {
1119 if ( unlikely ( downthread_status == DOWNTHREAD_LOCK ) ) {
1120 boost::mutex::scoped_lock lock(downthread_status_mutex);
1121 to_time(LOCKTIMEOUT, xt);
1122 downthread_status_cond.timed_wait( lock , xt );
1123 //downthread_status_cond.wait( lock );
1124 downthread_status = DOWNTHREAD_ACTIVE;
1126 while (unlikely(!down_thread_connect_socket_list.empty())) {
1127 socket_element push_rs_socket = down_thread_connect_socket_list.get_socket();
1128 down_thread_receive_realserver_socket_list.push_back(push_rs_socket);
1129 down_thread_current_receive_realserver_socket = down_thread_receive_realserver_socket_list.begin();
1130 boost::mutex::scoped_lock lock(upthread_status_mutex);
1131 if (upthread_status < UPTHREAD_ALIVE)
1135 tcp_thread_message *msg = down_thread_message_que.pop();
1136 if (unlikely(msg)) {
1137 if (unlikely(DOWN_FUNC_EXIT == down_thread_next_call_function.first)) {
1138 down_thread_next_call_function.second(LOCAL_PROC);
1140 down_thread_message_data.set_endpoint(msg->endpoint_info);
1141 msg->message(MESSAGE_PROC);
1146 down_thread_next_call_function.second(LOCAL_PROC);
1149 //----Debug log----------------------------------------------------------------------
1150 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1151 std::stringstream buf;
1152 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
1153 buf << "down thread loop end";
1154 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 31, buf.str(), __FILE__, __LINE__);
1156 //----Debug log----------------------------------------------------------------------
1157 down_thread_all_socket_close();
1159 boost::mutex::scoped_lock lock(downthread_status_mutex);
1160 downthread_status = DOWNTHREAD_ALIVE;
1162 downthread_status_cond.notify_one();
1163 //----Debug log----------------------------------------------------------------------
1164 //----Debug log----------------------------------------------------------------------
1166 boost::mutex::scoped_lock lock(downthread_status_mutex);
1167 downthread_status = DOWNTHREAD_SLEEP;
1169 downthread_status_cond.notify_one();
1170 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1171 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_run");
1172 formatter % boost::this_thread::get_id();
1173 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 32, formatter.str(), __FILE__, __LINE__);
1177 rw_scoped_lock scoped_lock(exit_flag_update_mutex);
1178 if( !__sync_bool_compare_and_swap( &exit_flag, true, 2 ) ){
1179 parent_service.release_session(this);
1185 //! endpoint data to string information
1186 //! @param[in] endpoint is target endpoint object
1187 std::string tcp_session::endpoint_to_string(
1188 const endpoint &target_endpoint)
1190 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1191 boost::format formatter("Thread ID[%d] FUNC IN endpoint_to_string");
1192 formatter % boost::this_thread::get_id();
1193 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1196 std::stringstream ret;
1197 if (target_endpoint.address().is_v6()) {
1198 ret << "[" << target_endpoint.address().to_string() << "]:" << target_endpoint.port();
1200 ret << target_endpoint.address().to_string() << ":" << target_endpoint.port();
1203 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1204 boost::format formatter("Thread ID[%d] FUNC OUT endpoint_to_string");
1205 formatter % boost::this_thread::get_id();
1206 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1212 //! up thread accept client side
1213 //! @param[in] process_type is process type
1214 void tcp_session::up_thread_client_accept(const TCP_PROCESS_TYPE_TAG process_type)
1216 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1217 boost::format formatter("Thread ID[%d] FUNC IN up_thread_client_accept");
1218 formatter % boost::this_thread::get_id();
1219 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1222 UP_THREAD_FUNC_TYPE_TAG func_tag;
1226 rd_scoped_lock scoped_lock(ssl_handshake_time_out_flag_mutex);
1227 if (ssl_handshake_time_out_flag) {
1228 // SSL handshake time out or timer error
1229 func_tag = UP_FUNC_CLIENT_DISCONNECT;
1230 std::stringstream buf;
1231 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
1232 buf << "handshake timer timeout " << ssl_handshake_time_out << "sec: ";
1233 buf << "handshaking not end";
1234 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 112, buf.str(), __FILE__, __LINE__);
1236 if (ssl_handshake_timer_flag == false) {
1237 // set handshake timer
1238 //register handshake timer event handler
1239 ssl_handshake_timer.reset(new boost::asio::deadline_timer(io));
1240 ssl_handshake_timer->expires_from_now(boost::posix_time::seconds(ssl_handshake_time_out));
1241 ssl_handshake_timer->async_wait(boost::bind(&tcp_session::handle_ssl_handshake_timer,
1243 boost::asio::placeholders::error));
1244 ssl_handshake_timer_flag = true;
1245 //----Debug log----------------------------------------------------------------------
1246 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1247 std::stringstream buf;
1248 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
1249 buf << "ssl session handshaking start: ";
1250 buf << "set handshake timer " << ssl_handshake_time_out << "sec";
1251 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 70, buf.str(), __FILE__, __LINE__);
1253 //----Debug log----------------------------------------------------------------------
1255 // try ssl handshake
1256 boost::system::error_code ec;
1257 client_ssl_socket.handshake(ec);
1261 ssl_handshake_timer->cancel();
1262 func_tag = UP_FUNC_CLIENT_ACCEPT_EVENT;
1265 if (ec == boost::asio::error::try_again) {
1266 func_tag = UP_FUNC_CLIENT_ACCEPT;
1267 boost::this_thread::yield();
1270 ssl_handshake_timer->cancel();
1271 func_tag = UP_FUNC_CLIENT_DISCONNECT;
1272 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 113, "ssl socket handshaking failed", __FILE__, __LINE__);
1273 //----Debug log----------------------------------------------------------------------
1274 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1275 std::stringstream buf;
1276 buf << "Thread ID[";
1277 buf << boost::this_thread::get_id();
1278 buf << "] tcp_ssl_socket::handshake [";
1279 buf << ec.message();
1281 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 71, buf.str(), __FILE__, __LINE__);
1283 //----Debug log----------------------------------------------------------------------
1288 func_tag = UP_FUNC_CLIENT_ACCEPT_EVENT;
1290 up_thread_next_call_function = up_thread_function_array[func_tag];
1292 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1293 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_client_accept: NEXT_FUNC[%s]");
1294 formatter % boost::this_thread::get_id() % func_tag_to_string(func_tag);
1295 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1299 //! up thread raise module event of handle_accept and do handshake
1300 //! @param[in] process_type is process type
1301 void tcp_session::up_thread_client_accept_event(const TCP_PROCESS_TYPE_TAG process_type)
1303 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1304 boost::format formatter("Thread ID[%d] FUNC IN up_thread_client_accept_event");
1305 formatter % boost::this_thread::get_id();
1306 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1309 protocol_module_base::EVENT_TAG module_event;
1310 module_event = protocol_module->handle_accept(up_thread_id);
1311 up_thread_data_client_side.set_endpoint(client_endpoint);
1312 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
1314 up_thread_function_pair func = up_thread_function_array[func_type->second];
1315 up_thread_next_call_function = func;
1317 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1318 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_client_accept_event: NEXT_FUNC[%s]");
1319 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
1320 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1324 //! handshake timer handler
1325 //! @param[in] error is timer operation result error code
1326 void tcp_session::handle_ssl_handshake_timer(const boost::system::error_code &error)
1328 //----Debug log--------------------------------------------------------
1329 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1330 boost::format formatter("Thread ID[%d] FUNC IN handle_ssl_handshake_timer");
1331 formatter % boost::this_thread::get_id();
1332 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 72, formatter.str(), __FILE__, __LINE__);
1334 //----Debug log--------------------------------------------------------
1337 rw_scoped_lock scoped_lock(ssl_handshake_time_out_flag_mutex);
1338 ssl_handshake_time_out_flag = true;
1340 if (error.value() == ECANCELED) {
1341 //----Debug log----------------------------------------
1342 if (unlikely(LOG_LV_DEBUG ==
1343 Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1344 std::stringstream buf;
1345 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
1346 buf << "handshake timer operation cancel: ";
1347 buf << "handshaking normal end";
1348 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION,
1349 87, buf.str(), __FILE__, __LINE__);
1351 //----Debug log----------------------------------------
1353 std::stringstream buf;
1354 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
1355 buf << "handshake timer operation failed: ";
1356 buf << error.message();
1357 Logger::putLogError(LOG_CAT_L7VSD_SESSION,
1358 116, buf.str(), __FILE__, __LINE__);
1362 //----Debug log--------------------------------------------------------
1363 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1364 boost::format formatter("Thread ID[%d] FUNC OUT handle_ssl_handshake_timer");
1365 formatter % boost::this_thread::get_id();
1366 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 73, formatter.str(), __FILE__, __LINE__);
1368 //----Debug log--------------------------------------------------------
1371 //! up thread receive client side and raise module event of handle_client_recv
1372 //! @param[in] process_type is process type
1373 void tcp_session::up_thread_client_receive(const TCP_PROCESS_TYPE_TAG process_type)
1375 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1376 boost::format formatter("Thread ID[%d] FUNC IN up_thread_client_receive");
1377 formatter % boost::this_thread::get_id();
1378 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1381 if (unlikely(0 < parent_service.get_wait_upstream())) {
1382 //----Debug log----------------------------------------------------------------------
1383 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1384 std::stringstream buf;
1385 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
1386 buf << "up_thread_client_receive qos wait active";
1387 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 35, buf.str(), __FILE__, __LINE__);
1389 //----Debug log----------------------------------------------------------------------
1390 return; // try again
1392 up_thread_data_client_side.initialize();
1393 boost::array<char, MAX_BUFFER_SIZE>& data_buff = up_thread_data_client_side.get_data();
1394 boost::system::error_code ec;
1395 std::size_t recv_size;
1396 UP_THREAD_FUNC_TYPE_TAG func_tag;
1398 struct epoll_event event;
1399 event.data.fd = !ssl_flag ? client_socket.get_socket().native()
1400 : client_ssl_socket.get_socket().lowest_layer().native();
1403 if (is_epoll_edge_trigger) {
1404 event.events = EPOLLIN | EPOLLHUP | EPOLLET;
1406 event.events = EPOLLIN | EPOLLHUP;
1408 bool add_flag = false;
1409 if (!up_client_epollfd_registered) {
1410 if (epoll_ctl(up_client_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
1411 std::stringstream buf;
1412 buf << "up_thread_client_receive: epoll_ctl EPOLL_CTL_ADD error: ";
1413 buf << strerror(errno);
1414 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
1415 boost::this_thread::yield();
1418 up_client_epollfd_registered = true;
1422 if (is_epoll_edge_trigger && (!add_flag)) {
1423 if (epoll_ctl(up_client_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
1424 std::stringstream buf;
1425 buf << "up_thread_client_receive: epoll_ctl EPOLL_CTL_MOD error: ";
1426 buf << strerror(errno);
1427 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
1428 up_thread_next_call_function = up_thread_function_array[UP_FUNC_CLIENT_DISCONNECT];
1432 int ret_fds = epoll_wait(up_client_epollfd, up_client_events, EVENT_NUM, epoll_timeout);
1434 //----Debug log----------------------------------------------------------------------
1435 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1436 boost::format formatter("up_thread_client_receive: epoll_wait timeout %d msec");
1437 formatter % epoll_timeout;
1438 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1440 //----Debug log----------------------------------------------------------------------
1441 up_thread_next_call_function = up_thread_function_array[UP_FUNC_CLIENT_RECEIVE];
1443 } else if (ret_fds < 0) {
1444 boost::format formatter("up_thread_client_receive: epoll_wait error: %s");
1445 formatter % strerror(errno);
1446 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1447 up_thread_next_call_function = up_thread_function_array[UP_FUNC_CLIENT_DISCONNECT];
1451 for (int i = 0; i < ret_fds; ++i) {
1452 if (up_client_events[i].data.fd == event.data.fd) {
1453 if (up_client_events[i].events & EPOLLIN) {
1456 if (up_client_events[i].events & EPOLLHUP) {
1457 up_thread_next_call_function = up_thread_function_array[UP_FUNC_CLIENT_DISCONNECT];
1458 //----Debug log----------------------------------------------------------------------
1459 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1460 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_client_receive EPOLLHUP: NEXT_FUNC[%s]");
1461 formatter % boost::this_thread::get_id() % func_tag_to_string(UP_FUNC_CLIENT_DISCONNECT);
1462 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1464 //----Debug log----------------------------------------------------------------------
1470 recv_size = !ssl_flag ? client_socket.read_some(
1471 boost::asio::buffer(data_buff, MAX_BUFFER_SIZE),
1473 : client_ssl_socket.read_some(
1474 boost::asio::buffer(data_buff, MAX_BUFFER_SIZE),
1478 if (recv_size > 0) {
1479 //----Debug log----------------------------------------------------------------------
1480 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1481 endpoint client_endpoint; // XXX redefined???
1482 client_endpoint = !ssl_flag ? client_socket.get_socket().lowest_layer().remote_endpoint(ec)
1483 : client_ssl_socket.get_socket().lowest_layer().remote_endpoint(ec);
1484 boost::format formatter("Thread ID[%d] up_thread_client_receive receive data size[%d] from [%d]");
1485 formatter % boost::this_thread::get_id() % recv_size % client_endpoint;
1486 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 36, formatter.str(), __FILE__, __LINE__);
1488 //----Debug log----------------------------------------------------------------------
1489 up_thread_data_client_side.set_size(recv_size);
1490 parent_service.update_up_recv_size(recv_size);
1491 protocol_module_base::EVENT_TAG module_event = protocol_module->handle_client_recv(up_thread_id, data_buff, recv_size);
1492 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
1493 if (unlikely(func_type == up_thread_module_event_map.end())) {
1494 //Error unknown protocol_module_base::EVENT_TAG return
1495 boost::format formatter("Thread ID[%d] protocol_module returned illegal EVENT_TAG: %d");
1496 formatter % boost::this_thread::get_id() % module_event;
1497 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 20, formatter.str(), __FILE__, __LINE__);
1498 up_thread_exit(process_type);
1501 func_tag = func_type->second;
1503 boost::format formatter("Thread ID[%d] client read error. recv_size: %d");
1504 formatter % boost::this_thread::get_id() % recv_size;
1505 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1506 func_tag = UP_FUNC_CLIENT_RECEIVE;
1509 if (ec == boost::asio::error::eof) {
1510 func_tag = UP_FUNC_CLIENT_DISCONNECT;
1511 } else if (ec == boost::asio::error::try_again) {
1512 func_tag = UP_FUNC_CLIENT_RECEIVE;
1514 boost::format formatter("Thread ID[%d] client read error: %s");
1515 formatter % boost::this_thread::get_id() % ec.message();
1516 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1517 func_tag = UP_FUNC_CLIENT_DISCONNECT;
1520 up_thread_next_call_function = up_thread_function_array[func_tag];
1522 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1523 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_client_receive: NEXT_FUNC[%s]");
1524 formatter % boost::this_thread::get_id() % func_tag_to_string(func_tag);
1525 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1528 //! up thread raise client respond send event message for up and down thread
1529 //! @param[in] process_type is process type
1530 void tcp_session::up_thread_client_respond(const TCP_PROCESS_TYPE_TAG process_type)
1532 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1533 boost::format formatter("Thread ID[%d] FUNC IN up_thread_client_respond");
1534 formatter % boost::this_thread::get_id();
1535 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1538 tcp_thread_message *up_msg = new tcp_thread_message;
1539 tcp_thread_message *down_msg = new tcp_thread_message;
1540 up_thread_function_pair up_func = up_thread_function_array[UP_FUNC_CLIENT_RESPOND_SEND_EVENT];
1542 up_msg->message = up_func.second;
1543 std::map<DOWN_THREAD_FUNC_TYPE_TAG, tcp_session_func>::iterator down_func
1544 = up_thread_message_down_thread_function_map.find(DOWN_FUNC_CLIENT_RESPOND_SEND_EVENT);
1545 down_msg->message = down_func->second;
1546 while (!up_thread_message_que.push(up_msg)) {}
1547 while (!down_thread_message_que.push(down_msg)) {}
1549 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1550 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_client_respond");
1551 formatter % boost::this_thread::get_id();
1552 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1555 //! up thread raise module event of handle_response_send_inform
1556 //! @param[in] process_type is process type
1557 void tcp_session::up_thread_client_respond_event(const TCP_PROCESS_TYPE_TAG process_type)
1559 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1560 boost::format formatter("Thread ID[%d] FUNC IN up_thread_client_respond_event");
1561 formatter % boost::this_thread::get_id();
1562 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1565 protocol_module_base::EVENT_TAG module_event;
1567 rw_scoped_lock scope_lock(module_function_response_send_inform_mutex);
1568 module_event = protocol_module->handle_response_send_inform(up_thread_id);
1570 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
1571 up_thread_function_pair func = up_thread_function_array[func_type->second];
1572 up_thread_next_call_function = func;
1574 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1575 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_client_respond_event: NEXT_FUNC[%s]");
1576 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
1577 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1580 //! up thread close client socket and raise client disconnect event message for up and down thread
1581 //! @param[in] process_type is process type
1582 void tcp_session::up_thread_client_disconnect(const TCP_PROCESS_TYPE_TAG process_type)
1584 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1585 boost::format formatter("Thread ID[%d] FUNC IN func up_thread_client_disconnect");
1586 formatter % boost::this_thread::get_id();
1587 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1590 boost::system::error_code ec;
1591 bool bres = !ssl_flag ? client_socket.close(ec)
1592 : client_ssl_socket.close(ec);
1595 tcp_thread_message *up_msg = new tcp_thread_message;
1596 tcp_thread_message *down_msg = new tcp_thread_message;
1597 up_thread_function_pair up_func = up_thread_function_array[UP_FUNC_CLIENT_DISCONNECT_EVENT];
1598 up_msg->message = up_func.second;
1599 down_thread_function_pair down_func = down_thread_function_array[DOWN_FUNC_CLIENT_DISCONNECT_EVENT];
1600 down_msg->message = down_func.second;
1602 while (!up_thread_message_que.push(up_msg)) {}
1603 while (!down_thread_message_que.push(down_msg)) {}
1605 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1606 boost::format formatter("Thread ID[%d] FUNC_OUT up_thread_client_disconnect up_func[%s] down_func[%s]");
1607 formatter % boost::this_thread::get_id()
1608 % func_tag_to_string(UP_FUNC_CLIENT_DISCONNECT_EVENT)
1609 % func_tag_to_string(DOWN_FUNC_CLIENT_DISCONNECT_EVENT);
1610 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1613 //! up thread raise module event of handle_client_disconnect
1614 //! @param[in] process_type is process type
1615 void tcp_session::up_thread_client_disconnect_event(const TCP_PROCESS_TYPE_TAG process_type)
1617 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1618 boost::format formatter("Thread ID[%d] FUNC IN up_thread_client_disconnect_event");
1619 formatter % boost::this_thread::get_id();
1620 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1622 protocol_module_base::EVENT_TAG module_event;
1624 rw_scoped_lock scope_lock(module_function_client_disconnect_mutex);
1625 module_event = protocol_module->handle_client_disconnect(up_thread_id);
1627 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
1628 up_thread_function_pair func = up_thread_function_array[func_type->second];
1629 up_thread_next_call_function = func;
1631 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1632 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_client_disconnect_event: NEXT_FUNC[%s]");
1633 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
1634 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1637 //! up thread send realserver and raise module event of handle_client_recv
1638 //! @param[in] process_type is process type
1639 void tcp_session::up_thread_realserver_send(const TCP_PROCESS_TYPE_TAG process_type)
1641 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1642 boost::format formatter("Thread ID[%d] FUNC IN up_thread_realserver_send");
1643 formatter % boost::this_thread::get_id();
1644 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1647 boost::system::error_code ec;
1648 endpoint server_endpoint = up_thread_data_dest_side.get_endpoint();
1649 std::map<endpoint, tcp_socket_ptr>::iterator send_socket = up_thread_send_realserver_socket_map.find(server_endpoint);
1650 boost::array<char, MAX_BUFFER_SIZE>& data_buff = up_thread_data_dest_side.get_data();
1651 std::size_t data_size = up_thread_data_dest_side.get_size();
1652 std::size_t send_data_size = up_thread_data_dest_side.get_send_size();
1653 std::size_t send_size;
1654 UP_THREAD_FUNC_TYPE_TAG func_tag;
1656 struct epoll_event event;
1657 event.data.fd = send_socket->second->get_socket().native();
1658 if (is_epoll_edge_trigger) {
1659 event.events = EPOLLOUT | EPOLLHUP | EPOLLET;
1661 event.events = EPOLLOUT | EPOLLHUP;
1663 bool add_flag = false;
1664 if (!up_realserver_epollfd_registered) {
1665 if (epoll_ctl(up_realserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
1666 std::stringstream buf;
1667 buf << "up_thread_realserver_send: epoll_ctl EPOLL_CTL_ADD error: ";
1668 buf << strerror(errno);
1669 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
1670 boost::this_thread::yield();
1673 up_realserver_epollfd_registered = true;
1678 rd_scoped_lock scoped_lock(exit_flag_update_mutex);
1679 if (unlikely(exit_flag)) {
1680 up_thread_next_call_function = up_thread_function_array[UP_FUNC_EXIT];
1685 if (is_epoll_edge_trigger && (!add_flag)) {
1686 if (epoll_ctl(up_realserver_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
1687 std::stringstream buf;
1688 buf << "up_thread_realserver_send: epoll_ctl EPOLL_CTL_MOD error: ";
1689 buf << strerror(errno);
1690 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
1691 boost::this_thread::yield();
1696 int ret_fds = epoll_wait(up_realserver_epollfd, up_realserver_events, EVENT_NUM, epoll_timeout);
1699 //----Debug log----------------------------------------------------------------------
1700 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1701 std::stringstream buf;
1702 buf << "up_thread_realserver_send: epoll_wait timeout " << epoll_timeout << " msec.";
1703 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
1705 //----Debug log----------------------------------------------------------------------
1707 std::stringstream buf;
1708 buf << "up_thread_realserver_send: epoll_wait error: " << strerror(errno);
1709 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
1711 //XXX no need to retry???
1712 up_thread_next_call_function = up_thread_function_array[UP_FUNC_REALSERVER_DISCONNECT];
1716 for (int i = 0; i < ret_fds; ++i) {
1717 if (up_realserver_events[i].data.fd == event.data.fd) {
1718 if (up_realserver_events[i].events & EPOLLOUT) {
1720 } else if (up_realserver_events[i].events & EPOLLHUP) {
1721 std::stringstream buf;
1722 buf << "up_thread_realserver_send: epoll hung up event";
1723 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
1724 up_thread_next_call_function = up_thread_function_array[UP_FUNC_REALSERVER_DISCONNECT];
1730 send_size = send_socket->second->write_some(
1731 boost::asio::buffer(
1732 data_buff.data() + send_data_size,
1733 data_size - send_data_size),
1736 send_data_size += send_size;
1737 up_thread_data_dest_side.set_send_size(send_data_size);
1738 parent_service.update_up_send_size(send_size);
1739 //----Debug log----------------------------------------------------------------------
1740 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1741 boost::format formatter("Thread ID[%d] up_thread_realserver_send send data size[%d] for [%d]");
1742 formatter % boost::this_thread::get_id() % send_size % server_endpoint;
1743 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 37, formatter.str(), __FILE__, __LINE__);
1745 //----Debug log----------------------------------------------------------------------
1746 if (data_size > send_data_size) {
1747 func_tag = UP_FUNC_REALSERVER_SEND;
1749 protocol_module_base::EVENT_TAG module_event = protocol_module->handle_realserver_send(up_thread_id);
1750 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
1751 func_tag = func_type->second;
1754 if (ec == boost::asio::error::try_again) {
1755 func_tag = UP_FUNC_REALSERVER_SEND;
1757 func_tag = UP_FUNC_REALSERVER_DISCONNECT;
1761 up_thread_next_call_function = up_thread_function_array[func_tag];
1763 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1764 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_realserver_send: NEXT_FUNC[%s]");
1765 formatter % boost::this_thread::get_id() % func_tag_to_string(func_tag);
1766 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1771 //! up thread raise module event of handle_realserver_select
1772 //! @param[in] process_type is process type
1773 void tcp_session::up_thread_realserver_get_destination_event(const TCP_PROCESS_TYPE_TAG process_type)
1775 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1776 boost::format formatter("Thread ID[%d] FUNC IN up_thread_realserver_get_destination_event");
1777 formatter % boost::this_thread::get_id();
1778 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1781 endpoint server_endpoint;
1783 protocol_module_base::EVENT_TAG module_event = protocol_module->handle_realserver_select(up_thread_id, server_endpoint);
1784 up_thread_data_dest_side.set_endpoint(server_endpoint);
1786 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
1787 up_thread_function_pair func = up_thread_function_array[func_type->second];
1788 up_thread_next_call_function = func;
1790 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1791 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_realserver_get_destination_event");
1792 formatter % boost::this_thread::get_id();
1793 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1796 //! up thread connect realserver
1797 //! @param[in] process_type is process type
1798 void tcp_session::up_thread_realserver_connect(const TCP_PROCESS_TYPE_TAG process_type)
1800 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1801 boost::format formatter("Thread ID[%d] FUNC IN up_thread_realserver_connect");
1802 formatter % boost::this_thread::get_id();
1803 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1806 endpoint server_endpoint = up_thread_data_dest_side.get_endpoint();
1807 std::map<endpoint, tcp_socket_ptr>::iterator get_socket = up_thread_send_realserver_socket_map.find(server_endpoint);
1808 std::map<endpoint, tcp_socket_ptr>::iterator map_end = up_thread_send_realserver_socket_map.end();
1809 UP_THREAD_FUNC_TYPE_TAG func_tag;
1810 if (get_socket != map_end) {
1811 func_tag = UP_FUNC_REALSERVER_CONNECT_EVENT;
1813 tcp_socket_ptr new_socket(new tcp_socket(io, socket_opt_info));
1814 boost::system::error_code ec;
1815 #ifdef IP_TRANSPARENT
1816 realserver_element::REALSERVER_FWDMODE_TAG fwdmode = realserver_element::FWD_NONE;
1817 std::vector<realserver_element> real_vec = parent_service.get_element().realserver_vector;
1818 for (std::vector<realserver_element>::iterator rs_itr = real_vec.begin();
1819 rs_itr != real_vec.end(); ++rs_itr) {
1820 if (rs_itr->tcp_endpoint == server_endpoint) {
1821 fwdmode = rs_itr->fwdmode;
1825 if (fwdmode == realserver_element::FWD_TPROXY && (
1826 ( server_endpoint.address().is_v4() && client_endpoint.address().is_v4() ) ||
1827 ( server_endpoint.address().is_v6() && client_endpoint.address().is_v6() ) ) ) {
1828 int ip_socket_level;
1829 if (client_endpoint.address().is_v4()) {
1830 ip_socket_level = SOL_IP;
1831 new_socket->get_socket().open(boost::asio::ip::tcp::v4(), ec);
1833 ip_socket_level = SOL_IPV6;
1834 new_socket->get_socket().open(boost::asio::ip::tcp::v6(), ec);
1837 boost::format formatter("Thread ID[%d] realserver socket open error: %s");
1838 formatter % boost::this_thread::get_id() % ec.message();
1839 Logger::putLogError(LOG_CAT_L7VSD_SESSION, /*XXX*/999, formatter.str(), __FILE__, __LINE__);
1842 // set IP_TRANSPARENT
1844 int err = ::setsockopt(new_socket->get_socket().native(),
1845 ip_socket_level, IP_TRANSPARENT, &on, sizeof(on));
1846 if (unlikely(err)) {
1847 ec = boost::system::error_code(errno, boost::asio::error::get_system_category());
1848 boost::format formatter("Thread ID[%d] realserver socket option(IP_TRANSPARENT) set failed: %s");
1849 formatter % boost::this_thread::get_id() % ec.message();
1850 Logger::putLogError(LOG_CAT_L7VSD_SESSION, /*XXX*/999, formatter.str(), __FILE__, __LINE__);
1853 // bind client address
1854 new_socket->get_socket().bind(client_endpoint, ec);
1856 if (ec == boost::asio::error::address_in_use) {
1857 boost::format formatter("Thread ID[%d] bind client address error(%s). retry by other port.");
1858 formatter % boost::this_thread::get_id() % ec.message();
1859 Logger::putLogInfo(LOG_CAT_L7VSD_SESSION, /*XXX*/999, formatter.str(), __FILE__, __LINE__);
1861 unsigned short client_port = client_endpoint.port();
1862 client_endpoint.port(0);
1863 new_socket->get_socket().bind(client_endpoint, ec);
1864 client_endpoint.port(client_port);
1867 boost::format formatter("Thread ID[%d] bind client address to realserver socket failed: %s");
1868 formatter % boost::this_thread::get_id() % ec.message();
1869 Logger::putLogError(LOG_CAT_L7VSD_SESSION, /*XXX*/999, formatter.str(), __FILE__, __LINE__);
1875 bool bres = new_socket->connect(server_endpoint, ec);
1878 rd_scoped_lock scoped_lock(access_log_flag_mutex);
1879 if (access_log_flag && access_logger != NULL) {
1880 endpoint server_side_endpoint =
1881 new_socket->get_socket().local_endpoint(ec);
1883 std::string reserv_msg("");
1884 access_logger->putLog(
1885 endpoint_to_string(virtualservice_endpoint),
1886 endpoint_to_string(client_endpoint),
1887 endpoint_to_string(server_side_endpoint),
1888 endpoint_to_string(server_endpoint),
1893 parent_service.connection_active(server_endpoint);
1894 if (unlikely(!new_socket->set_non_blocking_mode(ec))) {
1895 // socket set non-blocking mode error
1896 std::stringstream buf;
1897 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
1898 buf << "set non blocking socket error: " << ec.message();
1899 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 34, buf.str(), __FILE__, __LINE__);
1900 up_thread_exit(process_type);
1904 //set realserver_socket options(receive buffer size)
1905 if (downstream_buffer_size > 0) {
1906 boost::asio::socket_base::receive_buffer_size opt1(downstream_buffer_size);
1907 new_socket->get_socket().set_option(opt1, ec);
1909 // socket set non-blocking mode error
1910 std::stringstream buf;
1911 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
1912 buf << "realserver socket receive buffer size error: " << ec.message();
1913 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 35, buf.str(), __FILE__, __LINE__);
1914 up_thread_exit(process_type);
1918 //set realserver_socket options(send buffer size)
1919 if (upstream_buffer_size > 0) {
1920 boost::asio::socket_base::send_buffer_size opt2(upstream_buffer_size);
1921 new_socket->get_socket().set_option(opt2, ec);
1923 // socket set non-blocking mode error
1924 std::stringstream buf;
1925 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
1926 buf << "realserver socket send buffer size error: " << ec.message();
1927 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 36, buf.str(), __FILE__, __LINE__);
1928 up_thread_exit(process_type);
1933 socket_element push_element;
1934 push_element.first = server_endpoint;
1935 push_element.second = new_socket;
1936 up_thread_send_realserver_socket_map.insert(push_element);
1937 down_thread_connect_socket_list.push_back(push_element);
1938 func_tag = UP_FUNC_REALSERVER_CONNECT_EVENT;
1940 func_tag = UP_FUNC_REALSERVER_CONNECT_FAIL_EVENT;
1941 //connect socket error
1942 std::stringstream buf;
1943 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
1944 buf << "connect socket error: " << ec.message();
1945 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 37, buf.str(), __FILE__, __LINE__);
1948 up_thread_function_pair func = up_thread_function_array[func_tag];
1949 boost::mutex::scoped_lock lock(realserver_connect_mutex);
1950 realserver_connect_status = true;
1951 realserver_connect_cond.notify_one();
1952 up_thread_next_call_function = func;
1954 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1955 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_realserver_connect: NEXT_FUNC[%s]");
1956 formatter % boost::this_thread::get_id() % func_tag_to_string(func_tag);
1957 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1960 //! up thread raise module event of handle_realserver_connect
1961 //! @param[in] process_type is process type
1962 void tcp_session::up_thread_realserver_connect_event(const TCP_PROCESS_TYPE_TAG process_type)
1964 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1965 boost::format formatter("Thread ID[%d] FUNC IN up_thread_realserver_connect_event");
1966 formatter % boost::this_thread::get_id();
1967 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1970 endpoint server_endpoint = up_thread_data_dest_side.get_endpoint();
1971 up_thread_data_dest_side.initialize();
1972 boost::array<char, MAX_BUFFER_SIZE>& data_buff = up_thread_data_dest_side.get_data();
1973 size_t data_size = 0;
1974 protocol_module_base::EVENT_TAG module_event = protocol_module->handle_realserver_connect(up_thread_id, data_buff, data_size);
1975 up_thread_data_dest_side.set_endpoint(server_endpoint);
1976 up_thread_data_dest_side.set_size(data_size);
1977 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
1978 up_thread_next_call_function = up_thread_function_array[func_type->second];
1980 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1981 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_realserver_connect_event: NEXT_FUNC[%s]");
1982 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
1983 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1986 //! up thread raise module event of handle_realserver_connection_fail
1987 //! @param[in] process_type is process type
1988 void tcp_session::up_thread_realserver_connection_fail_event(const TCP_PROCESS_TYPE_TAG process_type)
1990 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
1991 boost::format formatter("Thread ID[%d] FUNC IN up_thread_realserver_connection_fail_event");
1992 formatter % boost::this_thread::get_id();
1993 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
1996 endpoint server_endpoint = up_thread_data_dest_side.get_endpoint();
1997 protocol_module_base::EVENT_TAG module_event = protocol_module->handle_realserver_connection_fail(up_thread_id, server_endpoint);
1998 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
1999 up_thread_next_call_function = up_thread_function_array[func_type->second];
2001 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2002 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_realserver_connection_fail_event: NEXT_FUNC[%s]");
2003 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
2004 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2007 //! up thread close realserver socket and raise realserver disconnect event message for up and down thread
2008 //! @param[in] process_type is process type
2009 void tcp_session::up_thread_realserver_disconnect(const TCP_PROCESS_TYPE_TAG process_type)
2011 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2012 boost::format formatter("Thread ID[%d] FUNC IN up_thread_realserver_disconnect");
2013 formatter % boost::this_thread::get_id();
2014 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2017 endpoint server_endpoint = up_thread_data_dest_side.get_endpoint();
2018 std::map<endpoint, tcp_socket_ptr>::iterator close_socket = up_thread_send_realserver_socket_map.find(server_endpoint);
2019 boost::system::error_code ec;
2020 bool bres = close_socket->second->close(ec);
2022 parent_service.connection_inactive(server_endpoint);
2023 tcp_thread_message *up_msg = new tcp_thread_message;
2024 tcp_thread_message *down_msg = new tcp_thread_message;
2025 up_thread_function_pair up_func = up_thread_function_array[UP_FUNC_REALSERVER_DISCONNECT_EVENT];
2026 up_msg->endpoint_info = server_endpoint;
2027 up_msg->message = up_func.second;
2028 std::map<DOWN_THREAD_FUNC_TYPE_TAG, tcp_session_func>::iterator down_func = up_thread_message_down_thread_function_map.find(DOWN_FUNC_REALSERVER_DISCONNECT_EVENT);
2029 down_msg->endpoint_info = server_endpoint;
2030 down_msg->message = down_func->second;
2031 while (!up_thread_message_que.push(up_msg)) {}
2032 while (!down_thread_message_que.push(down_msg)) {}
2035 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2036 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_realserver_disconnect");
2037 formatter % boost::this_thread::get_id();
2038 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2041 //! up thread raise module event of handle_client_disconnect
2042 //! @param[in] process_type is process type
2043 void tcp_session::up_thread_realserver_disconnect_event(const TCP_PROCESS_TYPE_TAG process_type)
2045 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2046 boost::format formatter("Thread ID[%d] FUNC IN up_thread_realserver_disconnect_event");
2047 formatter % boost::this_thread::get_id();
2048 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2051 protocol_module_base::EVENT_TAG module_event;
2052 endpoint server_endpoint = up_thread_message_data.get_endpoint();
2054 rw_scoped_lock scope_lock(module_function_realserver_disconnect_mutex);
2055 module_event = protocol_module->handle_realserver_disconnect(up_thread_id, server_endpoint);
2057 up_thread_send_realserver_socket_map.erase(server_endpoint);
2059 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
2060 up_thread_next_call_function = up_thread_function_array[func_type->second];
2062 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2063 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_realserver_disconnect_event: NEXT_FUNC[%s]");
2064 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
2065 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2069 //! up thread close all realserver socket and raise module event of handle_realserver_disconnect
2070 //! @param[in] process_type is process type
2071 void tcp_session::up_thread_all_realserver_disconnect(const TCP_PROCESS_TYPE_TAG process_type)
2073 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2074 boost::format formatter("Thread ID[%d] FUNC IN up_thread_all_realserver_disconnect");
2075 formatter % boost::this_thread::get_id();
2076 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2079 std::map<endpoint, tcp_socket_ptr>::iterator close_socket = up_thread_send_realserver_socket_map.begin();
2080 std::map<endpoint, tcp_socket_ptr>::iterator list_end = up_thread_send_realserver_socket_map.end();
2082 protocol_module_base::EVENT_TAG module_event;
2083 bool realserver_found = false;
2084 endpoint server_endpoint;
2085 while (close_socket != list_end) {
2086 realserver_found = true;
2087 boost::system::error_code ec;
2088 bool bres = close_socket->second->close(ec);
2089 server_endpoint = close_socket->first;
2091 parent_service.connection_inactive(server_endpoint);
2094 rw_scoped_lock scope_lock(module_function_realserver_disconnect_mutex);
2095 module_event = protocol_module->handle_realserver_disconnect(up_thread_id, server_endpoint);
2099 if (!realserver_found) {
2100 // realserver not found
2101 //----Debug log----------------------------------------------------------------------
2102 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2103 std::stringstream buf;
2104 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
2105 buf << "close realserver not fond";
2106 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 38, buf.str(), __FILE__, __LINE__);
2108 //----Debug log----------------------------------------------------------------------
2109 rw_scoped_lock scope_lock(module_function_realserver_disconnect_mutex);
2110 module_event = protocol_module->handle_realserver_disconnect(up_thread_id, endpoint());
2112 up_thread_send_realserver_socket_map.clear();
2113 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
2114 up_thread_next_call_function = up_thread_function_array[func_type->second];
2116 //allrealserver_disconnect.
2117 boost::mutex::scoped_lock lock(realserver_connect_mutex);
2118 realserver_connect_status = true;
2119 realserver_connect_cond.notify_one();
2121 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2122 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_all_realserver_disconnect: NEXT_FUNC[%s]");
2123 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
2124 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2127 //! up thread send sorryserver and raise module event of handle_sorryserver_send
2128 //! @param[in] process_type is process type
2129 void tcp_session::up_thread_sorryserver_send(const TCP_PROCESS_TYPE_TAG process_type)
2131 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2132 boost::format formatter("Thread ID[%d] FUNC IN up_thread_sorryserver_send");
2133 formatter % boost::this_thread::get_id();
2134 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2137 boost::system::error_code ec;
2138 endpoint sorry_endpoint = up_thread_data_dest_side.get_endpoint();
2139 boost::array<char, MAX_BUFFER_SIZE>& data_buff = up_thread_data_dest_side.get_data();
2140 std::size_t data_size = up_thread_data_dest_side.get_size();
2141 std::size_t send_data_size = up_thread_data_dest_side.get_send_size();
2142 std::size_t send_size;
2143 UP_THREAD_FUNC_TYPE_TAG func_tag;
2145 struct epoll_event event;
2146 event.data.fd = sorryserver_socket.second->get_socket().native();
2147 if (is_epoll_edge_trigger) {
2148 event.events = EPOLLET | EPOLLOUT | EPOLLHUP;
2150 event.events = EPOLLOUT | EPOLLHUP;
2152 bool add_flag = false;
2153 if (!up_sorryserver_epollfd_registered) {
2154 if (epoll_ctl(up_sorryserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
2155 std::stringstream buf;
2156 buf << "up_thread_sorryserver_send: epoll_ctl EPOLL_CTL_ADD error: ";
2157 buf << strerror(errno);
2158 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
2159 boost::this_thread::yield();
2162 up_sorryserver_epollfd_registered = true;
2168 rd_scoped_lock scoped_lock(exit_flag_update_mutex);
2169 if (unlikely(exit_flag)) {
2170 func_tag = UP_FUNC_EXIT;
2175 if (is_epoll_edge_trigger && (!add_flag)) {
2176 if (epoll_ctl(up_sorryserver_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
2177 std::stringstream buf;
2178 buf << "up_thread_sorryserver_send: epoll_ctl EPOLL_CTL_MOD error: ";
2179 buf << strerror(errno);
2180 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
2181 boost::this_thread::yield();
2186 int ret_fds = epoll_wait(up_sorryserver_epollfd, up_sorryserver_events, EVENT_NUM, epoll_timeout);
2189 //----Debug log----------------------------------------------------------------------
2190 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2191 std::stringstream buf;
2192 buf << "up_thread_sorryserver_send: epoll_wait timeout " << epoll_timeout << " msec.";
2193 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
2195 //----Debug log----------------------------------------------------------------------
2197 std::stringstream buf;
2198 buf << "up_thread_sorryserver_send: epoll_wait error: ";
2199 buf << strerror(errno);
2200 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
2202 up_thread_next_call_function = up_thread_function_array[UP_FUNC_SORRYSERVER_DISCONNECT];
2206 for (int i = 0; i < ret_fds; ++i) {
2207 if (up_sorryserver_events[i].data.fd == event.data.fd) {
2208 if (up_sorryserver_events[i].events & EPOLLOUT) {
2210 } else if (up_sorryserver_events[i].events & EPOLLHUP) {
2211 up_thread_next_call_function = up_thread_function_array[UP_FUNC_SORRYSERVER_DISCONNECT];
2217 send_size = sorryserver_socket.second->write_some(boost::asio::buffer(data_buff.data() + send_data_size, data_size - send_data_size), ec);
2220 send_data_size += send_size;
2221 up_thread_data_dest_side.set_send_size(send_data_size);
2222 //----Debug log----------------------------------------------------------------------
2223 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2224 std::stringstream buf;
2225 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
2226 buf << "up_thread_sorryserver_send send data size[" << send_size << "] for [" << sorry_endpoint << "]";
2227 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 39, buf.str(), __FILE__, __LINE__);
2229 //----Debug log----------------------------------------------------------------------
2230 if (data_size > send_data_size) {
2231 func_tag = UP_FUNC_SORRYSERVER_SEND;
2233 protocol_module_base::EVENT_TAG module_event = protocol_module->handle_sorryserver_send(up_thread_id);
2234 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
2235 if (unlikely(func_type == up_thread_module_event_map.end())) {
2236 //Error unknown protocol_module_base::EVENT_TAG return
2237 std::stringstream buf;
2238 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
2239 buf << "protocol_module returned illegal EVENT_TAG: " << module_event;
2240 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 49, buf.str(), __FILE__, __LINE__);
2241 up_thread_exit(process_type);
2244 func_tag = func_type->second;
2248 if (ec == boost::asio::error::try_again) {
2249 //func_tag = UP_FUNC_SORRYSERVER_SEND;
2250 //boost::this_thread::yield();
2252 func_tag = UP_FUNC_SORRYSERVER_DISCONNECT;
2258 up_thread_next_call_function = up_thread_function_array[func_tag];
2260 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2261 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_sorryserver_send: NEXT_FUNC[%s]");
2262 formatter % boost::this_thread::get_id() % func_tag_to_string(func_tag);
2263 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2266 //! up thread raise module event of handle_sorryserver_select
2267 //! @param[in] process_type is process type
2268 void tcp_session::up_thread_sorryserver_get_destination_event(const TCP_PROCESS_TYPE_TAG process_type)
2270 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2271 boost::format formatter("Thread ID[%d] FUNC IN up_thread_sorryserver_get_destination_event");
2272 formatter % boost::this_thread::get_id();
2273 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2276 endpoint server_endpoint;
2277 virtualservice_element &vs_element = parent_service.get_element();
2278 server_endpoint = vs_element.sorry_endpoint;
2279 protocol_module_base::EVENT_TAG module_event = protocol_module->handle_sorryserver_select(up_thread_id, server_endpoint);
2280 up_thread_data_dest_side.set_endpoint(server_endpoint);
2282 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
2283 up_thread_next_call_function = up_thread_function_array[func_type->second];
2285 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2286 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_sorryserver_get_destination_event: NEXT_FUNC[%s]");
2287 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
2288 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2291 //! up thread connect sorryserver
2292 //! @param[in] process_type is process type
2293 void tcp_session::up_thread_sorryserver_connect(const TCP_PROCESS_TYPE_TAG process_type)
2295 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2296 boost::format formatter("Thread ID[%d] FUNC IN up_thread_sorryserver_connect");
2297 formatter % boost::this_thread::get_id();
2298 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2301 endpoint sorry_endpoint = up_thread_data_dest_side.get_endpoint();
2302 UP_THREAD_FUNC_TYPE_TAG func_tag;
2303 boost::system::error_code ec;
2304 if (sorryserver_socket.second->get_socket().lowest_layer().is_open()) {
2305 func_tag = UP_FUNC_SORRYSERVER_CONNECT_EVENT;
2307 #ifdef IP_TRANSPARENT
2308 if (parent_service.get_element().sorry_fwdmode == virtualservice_element::FWD_TPROXY && (
2309 ( sorry_endpoint.address().is_v4() && client_endpoint.address().is_v4() ) ||
2310 ( sorry_endpoint.address().is_v6() && client_endpoint.address().is_v6() ) ) ) {
2311 int ip_socket_level;
2312 if (client_endpoint.address().is_v4()) {
2313 ip_socket_level = SOL_IP;
2314 sorryserver_socket.second->get_socket().open(boost::asio::ip::tcp::v4(), ec);
2316 ip_socket_level = SOL_IPV6;
2317 sorryserver_socket.second->get_socket().open(boost::asio::ip::tcp::v6(), ec);
2320 boost::format formatter("Thread ID[%d] sorryserver socket open error: %s");
2321 formatter % boost::this_thread::get_id() % ec.message();
2322 Logger::putLogError(LOG_CAT_L7VSD_SESSION, /*XXX*/999, formatter.str(), __FILE__, __LINE__);
2325 // set IP_TRANSPARENT
2327 int err = ::setsockopt(sorryserver_socket.second->get_socket().native(),
2328 ip_socket_level, IP_TRANSPARENT, &on, sizeof(on));
2329 if (unlikely(err)) {
2330 ec = boost::system::error_code(errno, boost::asio::error::get_system_category());
2331 boost::format formatter("Thread ID[%d] sorryserver socket option(IP_TRANSPARENT) set failed: %s");
2332 formatter % boost::this_thread::get_id() % ec.message();
2333 Logger::putLogError(LOG_CAT_L7VSD_SESSION, /*XXX*/999, formatter.str(), __FILE__, __LINE__);
2336 // bind client address
2337 sorryserver_socket.second->get_socket().bind(client_endpoint, ec);
2339 if (ec == boost::asio::error::address_in_use) {
2340 boost::format formatter("Thread ID[%d] bind client address error(%s). retry by other port.");
2341 formatter % boost::this_thread::get_id() % ec.message();
2342 Logger::putLogInfo(LOG_CAT_L7VSD_SESSION, /*XXX*/999, formatter.str(), __FILE__, __LINE__);
2344 unsigned short client_port = client_endpoint.port();
2345 client_endpoint.port(0);
2346 sorryserver_socket.second->get_socket().bind(client_endpoint, ec);
2347 client_endpoint.port(client_port);
2350 boost::format formatter("Thread ID[%d] bind client address to sorryserver socket failed: %s");
2351 formatter % boost::this_thread::get_id() % ec.message();
2352 Logger::putLogError(LOG_CAT_L7VSD_SESSION, /*XXX*/999, formatter.str(), __FILE__, __LINE__);
2358 bool bres = sorryserver_socket.second->connect(sorry_endpoint, ec);
2360 if (unlikely(!sorryserver_socket.second->set_non_blocking_mode(ec))) {
2361 // socket set non-blocking mode error
2362 std::stringstream buf;
2363 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
2364 buf << "set non blocking socket error: " << ec.message();
2365 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 53, buf.str(), __FILE__, __LINE__);
2366 up_thread_exit(process_type);
2368 sorryserver_socket.first = sorry_endpoint;
2369 func_tag = UP_FUNC_SORRYSERVER_CONNECT_EVENT;
2371 func_tag = UP_FUNC_SORRYSERVER_CONNECT_FAIL_EVENT;
2372 //connect socket error
2373 std::stringstream buf;
2374 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
2375 buf << "connect socket error: " << ec.message();
2376 Logger::putLogError(LOG_CAT_L7VSD_SESSION, 54, buf.str(), __FILE__, __LINE__);
2379 up_thread_next_call_function = up_thread_function_array[func_tag];
2381 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2382 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_sorryserver_connect: NEXT_FUNC[%s]");
2383 formatter % boost::this_thread::get_id() % func_tag_to_string(func_tag);
2384 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2387 //! up thread raise module event of handle_sorryserver_connect
2388 //! @param[in] process_type is process type
2389 void tcp_session::up_thread_sorryserver_connect_event(const TCP_PROCESS_TYPE_TAG process_type)
2391 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2392 boost::format formatter("Thread ID[%d] FUNC IN up_thread_sorryserver_connect_event");
2393 formatter % boost::this_thread::get_id();
2394 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2397 endpoint sorry_endpoint = up_thread_data_dest_side.get_endpoint();
2398 up_thread_data_dest_side.initialize();
2399 boost::array<char, MAX_BUFFER_SIZE>& data_buff = up_thread_data_dest_side.get_data();
2400 size_t data_size = 0;
2401 protocol_module_base::EVENT_TAG module_event = protocol_module->handle_sorryserver_connect(up_thread_id, data_buff, data_size);
2402 up_thread_data_dest_side.set_endpoint(sorry_endpoint);
2403 up_thread_data_dest_side.set_size(data_size);
2404 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
2405 up_thread_next_call_function = up_thread_function_array[func_type->second];
2407 // protocol module sorry mode change
2408 tcp_thread_message *down_msg = new tcp_thread_message;
2409 std::map<DOWN_THREAD_FUNC_TYPE_TAG, tcp_session_func>::iterator
2410 down_func = up_thread_message_down_thread_function_map.find(DOWN_FUNC_SORRY_ENABLE_EVENT);
2411 down_msg->message = down_func->second;
2412 while (!down_thread_message_que.push(down_msg)) {}
2414 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2415 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_sorryserver_connect_event: NEXT_FUNC[%s]");
2416 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
2417 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2420 //! up thread raise module event of handle_sorryserver_connection_fail
2421 //! @param[in] process_type is process type
2422 void tcp_session::up_thread_sorryserver_connection_fail_event(const TCP_PROCESS_TYPE_TAG process_type)
2424 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2425 boost::format formatter("Thread ID[%d] FUNC IN up_thread_sorryserver_connection_fail_event");
2426 formatter % boost::this_thread::get_id();
2427 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2430 endpoint server_endpoint = up_thread_data_dest_side.get_endpoint();
2431 protocol_module_base::EVENT_TAG module_event = protocol_module->handle_sorryserver_connection_fail(up_thread_id, server_endpoint);
2432 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
2433 up_thread_next_call_function = up_thread_function_array[func_type->second];
2435 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2436 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_sorryserver_connection_fail_event: NEXT_FUNC[%s]");
2437 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
2438 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2441 //! up thread close sorryserver socket and raise sorryserver disconnect event message for up and down thread
2442 //! @param[in] process_type is process type
2443 void tcp_session::up_thread_sorryserver_disconnect(const TCP_PROCESS_TYPE_TAG process_type)
2445 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2446 boost::format formatter("Thread ID[%d] FUNC IN up_thread_sorryserver_disconnect");
2447 formatter % boost::this_thread::get_id();
2448 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2451 boost::system::error_code ec;
2452 bool bres = sorryserver_socket.second->close(ec);
2454 tcp_thread_message *up_msg = new tcp_thread_message;
2455 tcp_thread_message *down_msg = new tcp_thread_message;
2456 up_thread_function_pair up_func = up_thread_function_array[UP_FUNC_SORRYSERVER_DISCONNECT_EVENT];
2457 up_msg->message = up_func.second;
2458 up_msg->endpoint_info = sorryserver_socket.first;
2459 std::map< DOWN_THREAD_FUNC_TYPE_TAG, tcp_session_func >::iterator down_func = up_thread_message_down_thread_function_map.find(DOWN_FUNC_SORRYSERVER_DISCONNECT_EVENT);
2460 down_msg->message = down_func->second;
2461 down_msg->endpoint_info = sorryserver_socket.first;
2462 while (!up_thread_message_que.push(up_msg)) {}
2463 while (!down_thread_message_que.push(down_msg)) {}
2466 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2467 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_sorryserver_disconnect");
2468 formatter % boost::this_thread::get_id();
2469 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2472 //! up thread close sorryserver socket and raise module sorryserver disconnect event
2473 //! @param[in] process_type is process type
2474 void tcp_session::up_thread_sorryserver_mod_disconnect(const TCP_PROCESS_TYPE_TAG process_type)
2476 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2477 boost::format formatter("Thread ID[%d] FUNC IN up_thread_sorryserver_mod_disconnect");
2478 formatter % boost::this_thread::get_id();
2479 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2482 protocol_module_base::EVENT_TAG module_event;
2484 boost::system::error_code ec;
2485 endpoint sorry_endpoint = sorryserver_socket.first;
2486 bool bres = sorryserver_socket.second->close(ec);
2488 sorryserver_socket.first = endpoint();
2491 rw_scoped_lock scope_lock(module_function_sorryserver_disconnect_mutex);
2492 module_event = protocol_module->handle_sorryserver_disconnect(up_thread_id, sorry_endpoint);
2494 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
2495 up_thread_next_call_function = up_thread_function_array[func_type->second];
2497 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2498 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_sorryserver_mod_disconnect: NEXT_FUNC[%s]");
2499 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
2500 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2504 //! up thread raise module event of handle_sorryserver_disconnect
2505 //! @param[in] process_type is process type
2506 void tcp_session::up_thread_sorryserver_disconnect_event(const TCP_PROCESS_TYPE_TAG process_type)
2508 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2509 boost::format formatter("Thread ID[%d] FUNC IN up_thread_sorryserver_disconnect_event");
2510 formatter % boost::this_thread::get_id();
2511 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2514 protocol_module_base::EVENT_TAG module_event;
2515 endpoint sorry_endpoint = up_thread_message_data.get_endpoint();
2517 rw_scoped_lock scope_lock(module_function_sorryserver_disconnect_mutex);
2518 module_event = protocol_module->handle_sorryserver_disconnect(up_thread_id, sorry_endpoint);
2520 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
2521 up_thread_next_call_function = up_thread_function_array[func_type->second];
2523 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2524 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_sorryserver_disconnect_event NEXT_FUNC[%s]");
2525 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
2526 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2529 //! up thread raise module event of handle_sorry_enable
2530 //! @param[in] process_type is process type
2531 void tcp_session::up_thread_sorry_enable_event(const TCP_PROCESS_TYPE_TAG process_type)
2533 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2534 boost::format formatter("Thread ID[%d] FUNC IN up_thread_sorry_enable_event");
2535 formatter % boost::this_thread::get_id();
2536 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2539 protocol_module_base::EVENT_TAG module_event;
2541 rw_scoped_lock scope_lock(module_function_sorry_enable_mutex);
2542 //----Debug log----------------------------------------------------------------------
2543 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2544 std::stringstream buf;
2545 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
2546 buf << "handle_sorry_enable call";
2547 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 40, buf.str(), __FILE__, __LINE__);
2549 //----Debug log----------------------------------------------------------------------
2550 module_event = protocol_module->handle_sorry_enable(up_thread_id);
2552 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
2553 up_thread_next_call_function = up_thread_function_array[func_type->second];
2555 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2556 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_sorry_enable_event: NEXT_FUNC[%s]");
2557 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
2558 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2561 //! up thread raise module event of handle_sorry_disable
2562 //! @param[in] process_type is process type
2563 void tcp_session::up_thread_sorry_disable_event(const TCP_PROCESS_TYPE_TAG process_type)
2565 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2566 boost::format formatter("Thread ID[%d] FUNC IN up_thread_sorry_disable_event");
2567 formatter % boost::this_thread::get_id();
2568 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2571 protocol_module_base::EVENT_TAG module_event;
2573 rw_scoped_lock scope_lock(module_function_sorry_disable_mutex);
2574 //----Debug log----------------------------------------------------------------------
2575 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2576 std::stringstream buf;
2577 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
2578 buf << "handle_sorry_disable call";
2579 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 41, buf.str(), __FILE__, __LINE__);
2581 //----Debug log----------------------------------------------------------------------
2582 module_event = protocol_module->handle_sorry_disable(up_thread_id);
2584 std::map<protocol_module_base::EVENT_TAG, UP_THREAD_FUNC_TYPE_TAG>::iterator func_type = up_thread_module_event_map.find(module_event);
2585 up_thread_next_call_function = up_thread_function_array[func_type->second];
2587 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2588 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_sorry_disable_event: NEXT_FUNC[%s]");
2589 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
2590 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2594 //! up thread exit main loop
2595 //! @param[in] process_type is process type
2596 void tcp_session::up_thread_exit(const TCP_PROCESS_TYPE_TAG process_type)
2598 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2599 boost::format formatter("Thread ID[%d] FUNC IN func up_thread_exit");
2600 formatter % boost::this_thread::get_id();
2601 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2603 rw_scoped_lock scoped_lock(exit_flag_update_mutex);
2604 boost::mutex::scoped_lock down_thread_cond_lock(upthread_status_mutex);
2605 boost::mutex::scoped_lock realserver_status_lock(realserver_connect_mutex);
2606 upthread_status_cond.notify_one();
2607 realserver_connect_cond.notify_one();
2608 realserver_connect_status = true;
2611 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2612 boost::format formatter("Thread ID[%d] FUNC OUT func up_thread_client_disconnect");
2613 formatter % boost::this_thread::get_id();
2614 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2617 //! up thread close all socket
2618 void tcp_session::up_thread_all_socket_close(void)
2620 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2621 boost::format formatter("Thread ID[%d] FUNC IN up_thread_all_socket_close");
2622 formatter % boost::this_thread::get_id();
2623 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2626 std::map<endpoint, tcp_socket_ptr>::iterator close_socket = up_thread_send_realserver_socket_map.begin();
2627 std::map<endpoint, tcp_socket_ptr>::iterator list_end = up_thread_send_realserver_socket_map.end();
2628 boost::system::error_code ec;
2629 while (close_socket != list_end) {
2630 bool bres = close_socket->second->close(ec);
2632 parent_service.connection_inactive(close_socket->first);
2635 up_thread_send_realserver_socket_map.clear();
2636 down_thread_connect_socket_list.clear();
2639 client_socket.close(ec);
2641 client_ssl_socket.close(ec);
2643 sorryserver_socket.second->close(ec);
2645 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2646 boost::format formatter("Thread ID[%d] FUNC OUT up_thread_all_socket_close");
2647 formatter % boost::this_thread::get_id();
2648 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2652 //! down thread receive from realserver and raise module event of handle_realserver_recv
2653 //! @param[in] process_type is process type
2654 void tcp_session::down_thread_realserver_receive(const TCP_PROCESS_TYPE_TAG process_type)
2656 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2657 boost::format formatter("Thread ID[%d] FUNC IN down_thread_realserver_receive");
2658 formatter % boost::this_thread::get_id();
2659 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2662 if (down_thread_receive_realserver_socket_list.empty()) {
2663 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2664 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_realserver_receive");
2665 formatter % boost::this_thread::get_id();
2666 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2668 boost::mutex::scoped_lock lock(realserver_connect_mutex);
2669 if (!realserver_connect_status) {
2670 to_time(LOCKTIMEOUT, xt);
2671 realserver_connect_cond.timed_wait(lock, xt);
2672 //realserver_connect_cond.wait(lock);
2674 down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_REALSERVER_RECEIVE];
2678 if (unlikely(0 < parent_service.get_wait_downstream())) {
2679 //----Debug log----------------------------------------------------------------------
2680 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2681 std::stringstream buf;
2682 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
2683 buf << "down_thread_realserver_receive qos wait active";
2684 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 42, buf.str(), __FILE__, __LINE__);
2686 //----Debug log----------------------------------------------------------------------
2689 down_thread_data_dest_side.initialize();
2690 boost::array<char, MAX_BUFFER_SIZE>& data_buff = down_thread_data_dest_side.get_data();
2691 boost::system::error_code ec;
2693 DOWN_THREAD_FUNC_TYPE_TAG func_tag;
2695 struct epoll_event event;
2696 event.data.fd = down_thread_current_receive_realserver_socket->second->get_socket().native();
2697 if (is_epoll_edge_trigger) {
2698 event.events = EPOLLIN | EPOLLHUP | EPOLLET;
2700 event.events = EPOLLIN | EPOLLHUP;
2702 bool add_flag = false;
2703 if (!down_realserver_epollfd_registered) {
2704 if (epoll_ctl(down_realserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
2705 std::stringstream buf;
2706 buf << "down_thread_realserver_receive: epoll_ctl EPOLL_CTL_ADD error: " << strerror(errno);
2707 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
2708 boost::this_thread::yield();
2711 down_realserver_epollfd_registered = true;
2715 if (is_epoll_edge_trigger && (!add_flag)) {
2716 if (epoll_ctl(down_realserver_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
2717 std::stringstream buf;
2718 buf << "down_thread_realserver_receive: epoll_ctl EPOLL_CTL_MOD error: " << strerror(errno);
2719 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
2720 boost::this_thread::yield();
2725 int ret_fds = epoll_wait(down_realserver_epollfd, down_realserver_events, EVENT_NUM, epoll_timeout);
2728 //----Debug log----------------------------------------------------------------------
2729 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2730 boost::format formatter("down_thread_realserver_receive: epoll_wait timeout %d msec.");
2731 formatter % epoll_timeout;
2732 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2734 //----Debug log----------------------------------------------------------------------
2735 down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_REALSERVER_RECEIVE];
2738 boost::format formatter("down_thread_realserver_receive: epoll_wait error: %d");
2739 formatter % strerror(errno);
2740 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2741 down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_REALSERVER_DISCONNECT];
2746 for (int i = 0; i < ret_fds; ++i) {
2747 if (down_realserver_events[i].data.fd == event.data.fd) {
2748 if (down_realserver_events[i].events & EPOLLIN) {
2750 } else if (down_realserver_events[i].events & EPOLLHUP) {
2751 down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_REALSERVER_DISCONNECT];
2752 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2753 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_realserver_receive: EPOLL_HUP");
2754 formatter % boost::this_thread::get_id();
2761 endpoint server_endpoint = down_thread_current_receive_realserver_socket->first;
2762 down_thread_data_dest_side.set_endpoint(server_endpoint);
2764 recv_size = down_thread_current_receive_realserver_socket->second->read_some(boost::asio::buffer(data_buff, MAX_BUFFER_SIZE), ec);
2766 if (recv_size > 0) {
2767 //----Debug log----------------------------------------------------------------------
2768 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2769 std::stringstream buf;
2770 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
2771 buf << "down_thread_realserver_receive receive data size[" << recv_size << "] from [" << server_endpoint << "]";
2772 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 43, buf.str(), __FILE__, __LINE__);
2774 //----Debug log----------------------------------------------------------------------
2775 down_thread_data_dest_side.set_size(recv_size);
2776 parent_service.update_down_recv_size(recv_size);
2778 protocol_module_base::EVENT_TAG module_event = protocol_module->handle_realserver_recv(down_thread_id, server_endpoint, data_buff, recv_size);
2780 std::map<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG>::iterator func_type = down_thread_module_event_map.find(module_event);
2781 func_tag = func_type->second;
2783 func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
2784 //boost::this_thread::yield();
2787 if (ec == boost::asio::error::try_again) {
2788 func_tag = DOWN_FUNC_REALSERVER_RECEIVE;
2789 //boost::this_thread::yield();
2791 func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
2795 down_thread_function_pair func = down_thread_function_array[func_tag];
2796 down_thread_current_receive_realserver_socket++;
2797 std::list<socket_element>::iterator list_end = down_thread_receive_realserver_socket_list.end();
2798 if (down_thread_current_receive_realserver_socket == list_end) {
2799 down_thread_current_receive_realserver_socket = down_thread_receive_realserver_socket_list.begin();
2801 down_thread_next_call_function = func;
2803 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2804 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_realserver_receive: NEXT_FUNC[%s]");
2805 formatter % boost::this_thread::get_id() % func_tag_to_string(func_tag);
2806 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2809 //! down thread close realserver socket and raise realserver disconnect event message for up and down thread
2810 //! @param[in] process_type is process type
2811 void tcp_session::down_thread_realserver_disconnect(const TCP_PROCESS_TYPE_TAG process_type)
2813 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2814 boost::format formatter("Thread ID[%d] FUNC IN down_thread_realserver_disconnect");
2815 formatter % boost::this_thread::get_id();
2816 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2819 endpoint server_endpoint = down_thread_data_dest_side.get_endpoint();
2820 std::list<socket_element>::iterator list_end = down_thread_receive_realserver_socket_list.end();
2821 std::list<socket_element>::iterator close_socket = down_thread_receive_realserver_socket_list.begin();
2822 while (list_end != close_socket) {
2823 endpoint close_endpoint = close_socket->first;
2824 if (close_endpoint == server_endpoint) {
2825 boost::system::error_code ec;
2826 bool bres = close_socket->second->close(ec);
2828 parent_service.connection_inactive(server_endpoint);
2829 tcp_thread_message *up_msg = new tcp_thread_message;
2830 tcp_thread_message *down_msg = new tcp_thread_message;
2831 down_thread_function_pair down_func = down_thread_function_array[DOWN_FUNC_REALSERVER_DISCONNECT_EVENT];
2832 down_msg->endpoint_info = server_endpoint;
2833 down_msg->message = down_func.second;
2834 std::map<UP_THREAD_FUNC_TYPE_TAG, tcp_session_func>::iterator up_func = down_thread_message_up_thread_function_map.find(UP_FUNC_REALSERVER_DISCONNECT_EVENT);
2835 up_msg->endpoint_info = server_endpoint;
2836 up_msg->message = up_func->second;
2837 while (!up_thread_message_que.push(up_msg)) {}
2838 while (!down_thread_message_que.push(down_msg)) {}
2845 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2846 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_realserver_disconnect");
2847 formatter % boost::this_thread::get_id();
2848 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2852 //! down thread raise module event of handle_realserver_disconnect
2853 //! @param[in] process_type is process type
2854 void tcp_session::down_thread_realserver_disconnect_event(const TCP_PROCESS_TYPE_TAG process_type)
2856 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2857 boost::format formatter("Thread ID[%d] FUNC IN down_thread_realserver_disconnect_event");
2858 formatter % boost::this_thread::get_id();
2859 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2862 protocol_module_base::EVENT_TAG module_event;
2863 endpoint server_endpoint = down_thread_message_data.get_endpoint();
2865 rw_scoped_lock scope_lock(module_function_realserver_disconnect_mutex);
2866 module_event = protocol_module->handle_realserver_disconnect(down_thread_id, server_endpoint);
2869 std::list<socket_element>::iterator list_end = down_thread_receive_realserver_socket_list.end();
2870 std::list<socket_element>::iterator erase_socket = down_thread_receive_realserver_socket_list.begin();
2871 while (likely(erase_socket != list_end)) {
2872 if (erase_socket->first == server_endpoint) {
2873 down_thread_receive_realserver_socket_list.erase(erase_socket);
2874 down_thread_current_receive_realserver_socket = down_thread_receive_realserver_socket_list.begin();
2880 std::map<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG>::iterator func_type = down_thread_module_event_map.find(module_event);
2881 down_thread_next_call_function = down_thread_function_array[func_type->second];
2883 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2884 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_realserver_disconnect_event: NEXT_FUNC[%s]");
2885 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
2886 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2889 //! down thread close realserver socket and raise realserver disconnect event message for up and down thread
2890 //! @param[in] process_type is process type
2891 void tcp_session::down_thread_all_realserver_disconnect(const TCP_PROCESS_TYPE_TAG process_type)
2893 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2894 boost::format formatter("Thread ID[%d] FUNC IN down_thread_all_realserver_disconnect");
2895 formatter % boost::this_thread::get_id();
2896 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2899 std::list<socket_element>::iterator close_socket = down_thread_receive_realserver_socket_list.begin();
2900 std::list<socket_element>::iterator list_end = down_thread_receive_realserver_socket_list.end();
2902 protocol_module_base::EVENT_TAG module_event;
2903 endpoint server_endpoint;
2904 bool realserver_fond = false;
2905 while (close_socket != list_end) {
2906 realserver_fond = true;
2907 boost::system::error_code ec;
2908 bool bres = close_socket->second->close(ec);
2909 server_endpoint = close_socket->first;
2911 parent_service.connection_inactive(server_endpoint);
2914 rw_scoped_lock scope_lock(module_function_realserver_disconnect_mutex);
2915 module_event = protocol_module->handle_realserver_disconnect(down_thread_id, server_endpoint);
2920 if (!realserver_fond) {
2921 //not fond realserver
2922 //----Debug log----------------------------------------------------------------------
2923 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2924 std::stringstream buf;
2925 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
2926 buf << "close realserver not fond";
2927 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 44, buf.str(), __FILE__, __LINE__);
2929 //----Debug log----------------------------------------------------------------------
2930 rw_scoped_lock scope_lock(module_function_realserver_disconnect_mutex);
2931 module_event = protocol_module->handle_realserver_disconnect(down_thread_id, endpoint());
2933 down_thread_receive_realserver_socket_list.clear();
2934 std::map<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG>::iterator func_type = down_thread_module_event_map.find(module_event);
2935 down_thread_next_call_function = down_thread_function_array[func_type->second];
2937 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2938 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_all_realserver_disconnect: NEXT_FUNC[%s]");
2939 formatter % boost::this_thread::get_id() % func_type->second;
2940 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2943 //! down thread raise module event of handle_client_connection_check
2944 //! @param[in] process_type is process type
2945 void tcp_session::down_thread_client_connection_chk_event(const TCP_PROCESS_TYPE_TAG process_type)
2947 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2948 boost::format formatter("Thread ID[%d] FUNC IN down_thread_client_connection_chk_event");
2949 formatter % boost::this_thread::get_id();
2950 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2953 down_thread_data_client_side.initialize();
2954 boost::array<char, MAX_BUFFER_SIZE>& data_buff = down_thread_data_client_side.get_data();
2955 std::size_t data_size;
2956 protocol_module_base::EVENT_TAG module_event = protocol_module->handle_client_connection_check(down_thread_id, data_buff, data_size);
2957 down_thread_data_client_side.set_size(data_size);
2958 std::map<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG>::iterator func_type = down_thread_module_event_map.find(module_event);
2959 down_thread_next_call_function = down_thread_function_array[func_type->second];
2961 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2962 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_client_connection_chk_event: NEXT_FUNC[%s]");
2963 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
2964 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2968 //! down thread raise module event of handle_response_send_inform
2969 //! @param[in] process_type is process type
2970 void tcp_session::down_thread_client_respond_event(const TCP_PROCESS_TYPE_TAG process_type)
2972 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2973 boost::format formatter("Thread ID[%d] FUNC IN down_thread_client_respond_event");
2974 formatter % boost::this_thread::get_id();
2975 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2978 protocol_module_base::EVENT_TAG module_event;
2980 rw_scoped_lock scope_lock(module_function_response_send_inform_mutex);
2981 module_event = protocol_module->handle_response_send_inform(down_thread_id);
2983 std::map<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG>::iterator func_type = down_thread_module_event_map.find(module_event);
2984 down_thread_next_call_function = down_thread_function_array[func_type->second];
2986 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2987 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_client_respond_event: NEXT_FUNC[%s]");
2988 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
2989 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
2992 //! down thread send for client and raise module event of handle_client_send
2993 //! @param[in] process_type is process type
2994 void tcp_session::down_thread_client_send(const TCP_PROCESS_TYPE_TAG process_type)
2996 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
2997 boost::format formatter("Thread ID[%d] FUNC IN down_thread_client_send");
2998 formatter % boost::this_thread::get_id();
2999 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3002 boost::system::error_code ec;
3003 boost::array<char, MAX_BUFFER_SIZE>& data_buff = down_thread_data_client_side.get_data();
3004 std::size_t data_size = down_thread_data_client_side.get_size();
3005 std::size_t send_data_size = down_thread_data_client_side.get_send_size();
3006 std::size_t send_size;
3007 DOWN_THREAD_FUNC_TYPE_TAG func_tag;
3009 struct epoll_event event;
3010 event.data.fd = !ssl_flag ? client_socket.get_socket().native()
3011 : client_ssl_socket.get_socket().lowest_layer().native();
3013 if (is_epoll_edge_trigger) {
3014 event.events = EPOLLOUT | EPOLLHUP | EPOLLET;
3016 event.events = EPOLLOUT | EPOLLHUP;
3018 bool add_flag = false;
3019 if (!down_client_epollfd_registered) {
3020 if (epoll_ctl(down_client_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
3021 std::stringstream buf;
3022 buf << "down_thread_client_send: epoll_ctl EPOLL_CTL_ADD error: ";
3023 buf << strerror(errno);
3024 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
3025 boost::this_thread::yield();
3028 down_client_epollfd_registered = true;
3034 rd_scoped_lock scoped_lock(exit_flag_update_mutex);
3035 if (unlikely(exit_flag)) {
3036 func_tag = DOWN_FUNC_EXIT;
3041 if (is_epoll_edge_trigger && (!add_flag)) {
3042 if (epoll_ctl(down_client_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
3043 std::stringstream buf;
3044 buf << "down_thread_client_send: epoll_ctl EPOLL_CTL_MOD error: ";
3045 buf << strerror(errno);
3046 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
3047 boost::this_thread::yield();
3052 int ret_fds = epoll_wait(down_client_epollfd, down_client_events, EVENT_NUM, epoll_timeout);
3055 std::stringstream buf;
3056 buf << "down_thread_client_send: epoll_wait timeout ";
3057 buf << epoll_timeout;
3059 Logger::putLogInfo(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
3061 std::stringstream buf;
3062 buf << "down_thread_client_send: epoll_wait error: ";
3063 buf << strerror(errno);
3064 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
3066 down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_CLIENT_DISCONNECT];
3070 for (int i = 0; i < ret_fds; ++i) {
3071 if (down_client_events[i].data.fd == event.data.fd) {
3072 if (down_client_events[i].events & EPOLLOUT) {
3074 } else if (down_client_events[i].events & EPOLLHUP) {
3075 down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_CLIENT_DISCONNECT];
3081 send_size = !ssl_flag ? client_socket.write_some(
3082 boost::asio::buffer(
3083 data_buff.data() + send_data_size,
3084 data_size - send_data_size),
3086 : client_ssl_socket.write_some(
3087 boost::asio::buffer(
3088 data_buff.data() + send_data_size,
3089 data_size - send_data_size),
3092 send_data_size += send_size;
3093 down_thread_data_client_side.set_send_size(send_data_size);
3094 parent_service.update_down_send_size(send_size);
3095 //----Debug log----------------------------------------------------------------------
3096 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3097 endpoint client_endpoint // XXX redefined???
3098 = !ssl_flag ? client_socket.get_socket().lowest_layer().remote_endpoint(ec)
3099 : client_ssl_socket.get_socket().lowest_layer().remote_endpoint(ec);
3100 boost::format formatter("Thread ID[%d] down_thread_client_send send data size[%d] for [%d]");
3101 formatter % boost::this_thread::get_id() % send_size % client_endpoint;
3102 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 45, formatter.str(), __FILE__, __LINE__);
3104 //----Debug log----------------------------------------------------------------------
3105 if (data_size > send_data_size) {
3106 func_tag = DOWN_FUNC_CLIENT_SEND;
3107 //down_send_wait.reset();
3109 protocol_module_base::EVENT_TAG module_event = protocol_module->handle_client_send(down_thread_id);
3110 std::map<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG>::iterator func_type = down_thread_module_event_map.find(module_event);
3111 func_tag = func_type->second;
3115 if (ec == boost::asio::error::try_again) {
3116 //func_tag = DOWN_FUNC_CLIENT_SEND;
3117 //boost::this_thread::yield();
3119 func_tag = DOWN_FUNC_CLIENT_DISCONNECT;
3125 down_thread_next_call_function = down_thread_function_array[func_tag];
3127 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3128 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_client_send: NEXT_FUNC[%s]");
3129 formatter % boost::this_thread::get_id() % func_tag_to_string(func_tag);
3130 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3133 //! down thread close client socket and raise client disconnect event message for up and down thread
3134 //! @param[in] process_type is process type
3135 void tcp_session::down_thread_client_disconnect(const TCP_PROCESS_TYPE_TAG process_type)
3137 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3138 boost::format formatter("Thread ID[%d] FUNC IN down_thread_client_disconnect");
3139 formatter % boost::this_thread::get_id();
3140 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3142 boost::system::error_code ec;
3143 bool bres = !ssl_flag ? client_socket.close(ec)
3144 : client_ssl_socket.close(ec);
3146 tcp_thread_message *up_msg = new tcp_thread_message;
3147 tcp_thread_message *down_msg = new tcp_thread_message;
3148 down_thread_function_pair down_func = down_thread_function_array[DOWN_FUNC_CLIENT_DISCONNECT_EVENT];
3149 down_msg->message = down_func.second;
3150 std::map< UP_THREAD_FUNC_TYPE_TAG, tcp_session_func >::iterator up_func = down_thread_message_up_thread_function_map.find(UP_FUNC_CLIENT_DISCONNECT_EVENT);
3151 up_msg->message = up_func->second;
3152 while (!down_thread_message_que.push(down_msg)) {}
3153 while (!up_thread_message_que.push(up_msg)) {}
3156 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3157 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_client_disconnect");
3158 formatter % boost::this_thread::get_id();
3159 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3163 //! down thread raise module event of handle_client_disconnect
3164 //! @param[in] process_type is process type
3165 void tcp_session::down_thread_client_disconnect_event(const TCP_PROCESS_TYPE_TAG process_type)
3167 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3168 boost::format formatter("Thread ID[%d] FUNC IN down_thread_disconnect_event");
3169 formatter % boost::this_thread::get_id();
3170 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3173 protocol_module_base::EVENT_TAG module_event;
3175 rw_scoped_lock scope_lock(module_function_client_disconnect_mutex);
3176 module_event = protocol_module->handle_client_disconnect(down_thread_id);
3178 std::map<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG>::iterator func_type =
3179 down_thread_module_event_map.find(module_event);
3180 down_thread_function_pair func = down_thread_function_array[func_type->second];
3181 down_thread_next_call_function = func;
3183 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3184 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_exit: NEXT_FUNC[%s]");
3185 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
3186 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3189 //! down thread receive from sorryserver and raise module event of handle_sorryserver_recv
3190 //! @param[in] process_type is process type
3191 void tcp_session::down_thread_sorryserver_receive(const TCP_PROCESS_TYPE_TAG process_type)
3193 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3194 boost::format formatter("Thread ID[%d] FUNC IN down_thread_sorryserver_receive");
3195 formatter % boost::this_thread::get_id();
3196 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3199 if (unlikely(!sorryserver_socket.second->get_socket().lowest_layer().is_open())) {
3200 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3201 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_sorryserver_receive");
3202 formatter % boost::this_thread::get_id();
3203 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3205 down_thread_next_call_function = down_thread_function_array[DOWN_FUNC_SORRYSERVER_RECEIVE];
3209 down_thread_data_dest_side.initialize();
3210 boost::array<char, MAX_BUFFER_SIZE>& data_buff = down_thread_data_dest_side.get_data();
3211 boost::system::error_code ec;
3213 DOWN_THREAD_FUNC_TYPE_TAG func_tag;
3214 endpoint sorry_endpoint;
3217 struct epoll_event event;
3218 event.data.fd = sorryserver_socket.second->get_socket().native();
3219 bool add_flag = false;
3220 if (!down_sorryserver_epollfd_registered) {
3221 if (is_epoll_edge_trigger) {
3222 event.events = EPOLLIN | EPOLLHUP | EPOLLET;
3224 event.events = EPOLLIN | EPOLLHUP;
3226 if (epoll_ctl(down_sorryserver_epollfd, EPOLL_CTL_ADD, event.data.fd, &event) < 0) {
3227 std::stringstream buf;
3228 buf << "down_thread_sorryserver_receive: epoll_ctl EPOLL_CTL_ADD error: ";
3229 buf << strerror(errno);
3230 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
3231 boost::this_thread::yield();
3234 down_sorryserver_epollfd_registered = true;
3239 rd_scoped_lock scoped_lock(exit_flag_update_mutex);
3240 if (unlikely(exit_flag)) {
3241 func_tag = DOWN_FUNC_EXIT;
3242 goto down_thread_sorryserver_receive_out;
3246 if (is_epoll_edge_trigger && (!add_flag)) {
3247 if (epoll_ctl(down_sorryserver_epollfd, EPOLL_CTL_MOD, event.data.fd, &event) < 0) {
3248 std::stringstream buf;
3249 buf << "down_thread_sorryserver_receive: epoll_ctl EPOLL_CTL_MOD error: ";
3250 buf << strerror(errno);
3251 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
3252 boost::this_thread::yield();
3257 ret_fds = epoll_wait(down_sorryserver_epollfd, down_sorryserver_events, EVENT_NUM, epoll_timeout);
3260 //----Debug log----------------------------------------------------------------------
3261 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3262 std::stringstream buf;
3263 buf << "down_thread_sorryserver_receive: epoll_wait timeout " << epoll_timeout << " msec.";
3264 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
3266 //----Debug log----------------------------------------------------------------------
3267 func_tag = DOWN_FUNC_SORRYSERVER_RECEIVE;
3269 std::stringstream buf;
3270 buf << "down_thread_sorryserver_receive: epoll_wait error: " << strerror(errno);
3271 Logger::putLogWarn(LOG_CAT_L7VSD_SESSION, 999, buf.str(), __FILE__, __LINE__);
3272 func_tag = DOWN_FUNC_SORRYSERVER_DISCONNECT;
3274 goto down_thread_sorryserver_receive_out;
3277 for (int i = 0; i < ret_fds; ++i) {
3278 if (down_sorryserver_events[i].data.fd == event.data.fd) {
3279 if (down_sorryserver_events[i].events & EPOLLIN) {
3281 } else if (down_sorryserver_events[i].events & EPOLLHUP) {
3282 func_tag = DOWN_FUNC_SORRYSERVER_DISCONNECT;
3283 goto down_thread_sorryserver_receive_out;
3288 sorry_endpoint = sorryserver_socket.first;
3289 down_thread_data_dest_side.set_endpoint(sorry_endpoint);
3291 recv_size = sorryserver_socket.second->read_some(boost::asio::buffer(data_buff, MAX_BUFFER_SIZE), ec);
3293 if (recv_size > 0) {
3294 //----Debug log----------------------------------------------------------------------
3295 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3296 std::stringstream buf;
3297 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
3298 buf << "down_thread_sorryserver_receive receive data size[" << recv_size << "] from [" << sorry_endpoint << "]";
3299 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 46, buf.str(), __FILE__, __LINE__);
3301 //----Debug log----------------------------------------------------------------------
3302 down_thread_data_dest_side.set_size(recv_size);
3303 protocol_module_base::EVENT_TAG module_event = protocol_module->handle_sorryserver_recv(down_thread_id, sorry_endpoint, data_buff, recv_size);
3304 std::map<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG>::iterator func_type = down_thread_module_event_map.find(module_event);
3305 func_tag = func_type->second;
3307 func_tag = DOWN_FUNC_SORRYSERVER_RECEIVE;
3310 if (ec == boost::asio::error::try_again) {
3311 func_tag = DOWN_FUNC_SORRYSERVER_RECEIVE;
3313 func_tag = DOWN_FUNC_SORRYSERVER_DISCONNECT;
3317 down_thread_sorryserver_receive_out:
3318 down_thread_next_call_function = down_thread_function_array[func_tag];
3320 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3321 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_sorryserver_receive: NEXT_FUNC[%s]");
3322 formatter % boost::this_thread::get_id() % func_tag_to_string(func_tag);
3323 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3326 //! down thread raise module event of handle_sorryserver_disconnect
3327 //! @param[in] process_type is process type
3328 void tcp_session::down_thread_sorryserver_disconnect(const TCP_PROCESS_TYPE_TAG process_type)
3330 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3331 boost::format formatter("Thread ID[%d] FUNC IN down_thread_sorryserver_disconnect");
3332 formatter % boost::this_thread::get_id();
3333 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3336 boost::system::error_code ec;
3337 bool bres = sorryserver_socket.second->close(ec);
3339 tcp_thread_message *up_msg = new tcp_thread_message;
3340 tcp_thread_message *down_msg = new tcp_thread_message;
3341 down_thread_function_pair down_func = down_thread_function_array[DOWN_FUNC_SORRYSERVER_DISCONNECT_EVENT];
3342 down_msg->message = down_func.second;
3343 down_msg->endpoint_info = sorryserver_socket.first;
3344 std::map< UP_THREAD_FUNC_TYPE_TAG, tcp_session_func >::iterator up_func = down_thread_message_up_thread_function_map.find(UP_FUNC_SORRYSERVER_DISCONNECT_EVENT);
3345 up_msg->message = up_func->second;
3346 up_msg->endpoint_info = sorryserver_socket.first;
3347 while (!up_thread_message_que.push(up_msg)) {}
3348 while (!down_thread_message_que.push(down_msg)) {}
3351 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3352 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_sorryserver_disconnect");
3353 formatter % boost::this_thread::get_id();
3354 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3357 //! down thread close sorryserver socket and raise module sorryserver disconnect event
3358 //! @param[in] process_type is process type
3359 void tcp_session::down_thread_sorryserver_mod_disconnect(const TCP_PROCESS_TYPE_TAG process_type)
3361 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3362 boost::format formatter("Thread ID[%d] FUNC IN down_thread_sorryserver_mod_disconnect");
3363 formatter % boost::this_thread::get_id();
3364 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3367 protocol_module_base::EVENT_TAG module_event;
3369 boost::system::error_code ec;
3370 endpoint sorry_endpoint = sorryserver_socket.first;
3371 bool bres = sorryserver_socket.second->close(ec);
3373 sorryserver_socket.first = endpoint();
3376 rw_scoped_lock scope_lock(module_function_sorryserver_disconnect_mutex);
3377 module_event = protocol_module->handle_sorryserver_disconnect(down_thread_id, sorry_endpoint);
3380 std::map<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG>::iterator func_type = down_thread_module_event_map.find(module_event);
3381 down_thread_function_pair func = down_thread_function_array[func_type->second];
3382 down_thread_next_call_function = func;
3384 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3385 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_sorryserver_mod_disconnect: NEXT_FUNC[%s]");
3386 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
3387 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3390 //! down thread raise module event of handle_sorryserver_disconnect
3391 //! @param[in] process_type is process type
3392 void tcp_session::down_thread_sorryserver_disconnect_event(const TCP_PROCESS_TYPE_TAG process_type)
3394 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3395 boost::format formatter("Thread ID[%d] FUNC IN down_thread_sorryserver_disconnect_event");
3396 formatter % boost::this_thread::get_id();
3397 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3400 protocol_module_base::EVENT_TAG module_event;
3401 endpoint sorry_endpoint = down_thread_message_data.get_endpoint();
3403 rw_scoped_lock scope_lock(module_function_sorryserver_disconnect_mutex);
3404 module_event = protocol_module->handle_sorryserver_disconnect(down_thread_id, sorry_endpoint);
3406 std::map<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG>::iterator func_type = down_thread_module_event_map.find(module_event);
3407 down_thread_function_pair func = down_thread_function_array[func_type->second];
3408 down_thread_next_call_function = func;
3410 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3411 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_sorryserver_disconnect_event: NEXT_FUNC[%s]");
3412 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
3413 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3416 //! down thread raise module event of handle_sorry_enable
3417 //! @param[in] process_type is process type
3418 void tcp_session::down_thread_sorry_enable_event(const TCP_PROCESS_TYPE_TAG process_type)
3420 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3421 boost::format formatter("Thread ID[%d] FUNC IN down_thread_sorry_enable_event");
3422 formatter % boost::this_thread::get_id();
3423 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3426 protocol_module_base::EVENT_TAG module_event;
3428 rw_scoped_lock scope_lock(module_function_sorry_enable_mutex);
3429 //----Debug log----------------------------------------------------------------------
3430 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3431 std::stringstream buf;
3432 buf << "Thread ID[";
3433 buf << boost::this_thread::get_id();
3434 buf << "] handle_sorry_enable call";
3435 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 47, buf.str(), __FILE__, __LINE__);
3437 //----Debug log----------------------------------------------------------------------
3438 module_event = protocol_module->handle_sorry_enable(down_thread_id);
3440 std::map<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG>::iterator func_type = down_thread_module_event_map.find(module_event);
3441 down_thread_function_pair func = down_thread_function_array[func_type->second];
3442 down_thread_next_call_function = func;
3444 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3445 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_sorry_enable_event: NEXT_FUNC[%s]");
3446 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
3447 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3450 //! down thread raise module event of handle_sorry_disable
3451 //! @param[in] process_type is process type
3452 void tcp_session::down_thread_sorry_disable_event(const TCP_PROCESS_TYPE_TAG process_type)
3454 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3455 boost::format formatter("Thread ID[%d] FUNC IN down_thread_sorry_disable_event");
3456 formatter % boost::this_thread::get_id();
3457 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3460 protocol_module_base::EVENT_TAG module_event;
3462 rw_scoped_lock scope_lock(module_function_sorry_disable_mutex);
3463 //----Debug log----------------------------------------------------------------------
3464 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3465 std::stringstream buf;
3466 buf << "Thread ID[" << boost::this_thread::get_id() << "] ";
3467 buf << "handle_sorry_disable call";
3468 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 48, buf.str(), __FILE__, __LINE__);
3470 //----Debug log----------------------------------------------------------------------
3471 module_event = protocol_module->handle_sorry_disable(down_thread_id);
3473 std::map<protocol_module_base::EVENT_TAG, DOWN_THREAD_FUNC_TYPE_TAG>::iterator func_type = down_thread_module_event_map.find(module_event);
3474 down_thread_function_pair func = down_thread_function_array[func_type->second];
3475 down_thread_next_call_function = func;
3477 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3478 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_sorry_disable_event: NEXT_FUNC[%s]");
3479 formatter % boost::this_thread::get_id() % func_tag_to_string(func_type->second);
3480 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3483 //! down thread exit main loop
3484 //! @param[in] process_type is process type
3485 void tcp_session::down_thread_exit(const TCP_PROCESS_TYPE_TAG process_type)
3487 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3488 boost::format formatter("Thread ID[%d] FUNC IN down_thread_exit");
3489 formatter % boost::this_thread::get_id();
3490 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3492 rw_scoped_lock scoped_lock(exit_flag_update_mutex);
3493 boost::mutex::scoped_lock status_scoped_lock(downthread_status_mutex);
3494 downthread_status_cond.notify_one();
3497 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3498 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_exit");
3499 formatter % boost::this_thread::get_id();
3500 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3503 //! down thread close all socket
3504 void tcp_session::down_thread_all_socket_close(void)
3506 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3507 boost::format formatter("Thread ID[%d] FUNC IN down_thread_all_socket_close");
3508 formatter % boost::this_thread::get_id();
3509 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3512 std::list<socket_element>::iterator close_socket = down_thread_receive_realserver_socket_list.begin();
3513 std::list<socket_element>::iterator list_end = down_thread_receive_realserver_socket_list.end();
3514 boost::system::error_code ec;
3515 while (close_socket != list_end) {
3516 bool bres = close_socket->second->close(ec);
3518 parent_service.connection_inactive(close_socket->first);
3521 down_thread_receive_realserver_socket_list.clear();
3523 client_socket.close(ec);
3525 client_ssl_socket.close(ec);
3526 sorryserver_socket.second->close(ec);
3528 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
3529 boost::format formatter("Thread ID[%d] FUNC OUT down_thread_all_socket_close");
3530 formatter % boost::this_thread::get_id();
3531 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
3535 //! milliseconds to boost::xtime converter
3536 void tcp_session::to_time(int in, boost::xtime &xt)
3538 boost::xtime_get(&xt, boost::TIME_UTC);
3539 xt.sec += (in / 1000);
3540 xt.nsec += (in % 1000) * 1000000;
3541 if (xt.nsec >= 1000000000) {
3543 xt.nsec -= 1000000000;