2 * @file session_thread_control.cpp
3 * @brief Each session uses two threads (upstream and downstream); this class controls that pair, which is the unit of thread pooling.
5 * L7VSD: Linux Virtual Server for Layer7 Load Balancing
6 * Copyright (C) 2009 NTT COMWARE Corporation.
8 * This program is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License as published by the Free Software Foundation; either
11 * version 2.1 of the License, or (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
23 **********************************************************************/
26 #include "session_thread_control.h"
34 //! @brief create up down thread
//! Creates the two worker threads of this object (upthread running
//! upstream_run(), downthread running downstream_run(), both on `this`) and
//! then applies the scheduling settings held in the members sched_algorithm
//! and sched_priority to both threads.
//! @return the non-zero error code of the first failing
//!         pthread_getschedparam() / pthread_setschedparam() call.
//!         NOTE(review): the success return statement is not visible in this
//!         excerpt -- presumably 0; confirm.
//! NOTE(review): on an early error return the threads created below are
//!         already running and no cleanup is visible here -- confirm callers
//!         handle that.
36 int session_thread_control::start_thread()
39 upthread.reset(new boost::thread(&session_thread_control::upstream_run, this)); //! upstream thread create
40 downthread.reset(new boost::thread(&session_thread_control::downstream_run, this)); //! downstream thread create
42 //pthread_setschedparam
// The current policy/parameters are read from the upstream thread only and
// serve as the starting point for both threads.
43 int retval, sched_policy;
44 sched_param scheduler_param;
45 retval = pthread_getschedparam(upthread->native_handle(), &sched_policy, &scheduler_param);
46 if (retval != 0) return retval;
// Only SCHED_FIFO and SCHED_RR overwrite the priority with sched_priority;
// SCHED_BATCH keeps the priority value that was read above.
// NOTE(review): sched_policy is assigned in each branch but the calls below
// pass sched_algorithm instead; the two values are equal in the three handled
// cases, so sched_policy is effectively unused.
// NOTE(review): __sched_priority looks like a libc-internal spelling of the
// sched_param priority field -- verify against the target libc.
48 if (SCHED_FIFO == sched_algorithm) {
49 scheduler_param.__sched_priority = sched_priority;
50 sched_policy = SCHED_FIFO;
51 } else if (SCHED_RR == sched_algorithm) {
52 scheduler_param.__sched_priority = sched_priority;
53 sched_policy = SCHED_RR;
54 } else if (SCHED_BATCH == sched_algorithm) {
55 sched_policy = SCHED_BATCH;
// A negative sched_algorithm skips the scheduler change. Any non-negative
// value, including ones not handled above, is passed to
// pthread_setschedparam() for both threads.
57 if (0 <= sched_algorithm) {
58 retval = pthread_setschedparam(upthread->native_handle(), sched_algorithm, &scheduler_param);
59 if (retval != 0 ) return retval;
60 retval = pthread_setschedparam(downthread->native_handle(), sched_algorithm, &scheduler_param);
61 if (retval != 0 ) return retval;
68 //! @brief upstream thread bind function.
//! Body of the upstream worker thread; start_thread() binds it to upthread.
//! After optionally setting its CPU affinity, the thread loops on a local
//! copy of upthread_state:
//!  - WAIT    : performs a timed wait on upthread_condition with
//!              upthread_running_mutex released for the duration of the wait;
//!  - EXIT    : end of the thread (branch body not visible in this excerpt);
//!  - RUNNING : calls session->up_thread_run().
//! The local state is refreshed from upthread_state under upthread_state_mutex
//! on every pass. After the loop the thread releases upthread_running_mutex
//! and notifies upthread_joining_condition, which join() waits on.
70 void session_thread_control::upstream_run()
// Debug trace on entry.
72 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_VIRTUALSERVICE))) {
73 boost::format fmt("in_function : void session_thread_control::upstream_run()");
74 Logger::putLogDebug(LOG_CAT_L7VSD_VIRTUALSERVICE, 1, fmt.str(), __FILE__, __LINE__);
78 // when num>0, do CPU_SET times of "num_of_core_uses".
// sched_setaffinity() is called with pid 0, i.e. for the calling thread.
// NOTE(review): the declaration and initialisation of `mask` and the loop
// body are not visible here -- confirm the set is cleared (CPU_ZERO) before
// bits are added.
// NOTE(review): the return value of sched_setaffinity() is not checked.
79 if (0 < num_of_core_uses) {
81 for (int i = 0; i < num_of_core_uses; ++i) {
84 sched_setaffinity(0, sizeof(cpu_set_t), &mask);
// NOTE(review): the message below names virtualservice_tcp::upstream_run()
// although this function is session_thread_control::upstream_run().
86 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_VIRTUALSERVICE))) {
87 boost::format formatter("void virtualservice_tcp::upstream_run() "
88 "sched_setaffinity after num_of_core_uses = %d");
89 formatter % num_of_core_uses;
90 Logger::putLogDebug(LOG_CAT_L7VSD_VIRTUALSERVICE, 2, formatter.str(), __FILE__, __LINE__);
// When built with SCHED_SETAFFINITY the affinity is set again from
// vsnic_cpumask; as the later call it replaces, if it succeeds, the mask set
// above.
94 #ifdef SCHED_SETAFFINITY
95 sched_setaffinity(0, sizeof(cpu_set_t), &vsnic_cpumask);
// From here on this thread holds upthread_running_mutex except while it is
// parked in the WAIT branch below. join() acquires the same mutex before
// notifying, and get_upthread_mutex() hands it out to callers.
99 upthread_running_mutex.lock();
101 // get first state from class upstream state.
102 rw_scoped_lock upstate_lock(upthread_state_mutex);
103 state = upthread_state; //thread local state is update.
// Main loop: act on the local copy of the state, then refresh it.
105 for (;;) { // thread loop
106 if (state == WAIT) { // after create or session end. this thread is pooling mode
107 boost::mutex::scoped_lock lock(upthread_condition_mutex);
// The current time is the base of the wait deadline.
// NOTE(review): the declaration of `wait` and any offset added to it are not
// visible here -- confirm the timeout length.
109 boost::xtime_get(&wait, boost::TIME_UTC);
// The running mutex is released only around the wait and re-acquired
// immediately afterwards.
111 upthread_running_mutex.unlock();
112 upthread_condition.timed_wait(lock, wait); // thread is condition wait( start at notify_all() )
113 upthread_running_mutex.lock();
114 } else if (state == EXIT) { // this state is virtualservice end. thread is finishing.
// NOTE(review): the body of this branch is not visible; presumably it leaves
// the loop -- confirm.
116 } else { //state RUNNING
117 session->up_thread_run(); //session upstream thread looping.
// Refresh the local state from the shared one under its lock.
120 rw_scoped_lock upstate_lock(upthread_state_mutex);
121 state = upthread_state; //thread local state is update.
// Shutdown handshake with join(): release the running mutex, then notify the
// joining condition under upthread_joining_mutex. join() takes that mutex
// before it sets EXIT and keeps it until it waits, so this lock is obtained
// only once join() is waiting.
123 upthread_running_mutex.unlock();
124 boost::mutex::scoped_lock up_lk(upthread_joining_mutex);
125 upthread_joining_condition.notify_all();
// Debug trace on exit.
126 if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
127 boost::format fmt("out_function : void session_thread_control::upstream_run()");
128 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 3, fmt.str(), __FILE__, __LINE__);
132 //! @brief downstream thread bind function,
//! Body of the downstream worker thread; start_thread() binds it to
//! downthread. After optionally setting its CPU affinity, the thread loops on
//! a local copy of downthread_state:
//!  - WAIT    : performs a timed wait on downthread_condition with
//!              downthread_running_mutex released for the duration of the wait;
//!  - EXIT    : end of the thread (branch body not visible in this excerpt);
//!  - RUNNING : calls session->down_thread_run().
//! The local state is refreshed from downthread_state under
//! downthread_state_mutex on every pass. After the loop the thread releases
//! downthread_running_mutex and notifies downthread_joining_condition, which
//! join() waits on.
134 void session_thread_control::downstream_run()
// Debug trace on entry.
136 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_VIRTUALSERVICE))) {
137 boost::format fmt("in_function : void session_thread_control::downstream_run()");
138 Logger::putLogDebug(LOG_CAT_L7VSD_VIRTUALSERVICE, 4, fmt.str(), __FILE__, __LINE__);
141 // when num>0, do CPU_SET times of "num_of_core_uses".
// sched_setaffinity() is called with pid 0, i.e. for the calling thread.
// NOTE(review): the declaration and initialisation of `mask` and the loop
// body are not visible here -- confirm the set is cleared (CPU_ZERO) before
// bits are added.
// NOTE(review): the return value of sched_setaffinity() is not checked.
142 if (0 < num_of_core_uses) {
144 for (int i = 0; i < num_of_core_uses; ++i) {
147 sched_setaffinity(0, sizeof(cpu_set_t), &mask);
// NOTE(review): the message below names virtualservice_tcp::downstream_run()
// although this function is session_thread_control::downstream_run().
149 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_VIRTUALSERVICE))) {
150 boost::format formatter("void virtualservice_tcp::downstream_run() "
151 "sched_setaffinity after num_of_core_uses = %d");
152 formatter % num_of_core_uses;
153 Logger::putLogDebug(LOG_CAT_L7VSD_VIRTUALSERVICE, 5, formatter.str(), __FILE__, __LINE__);
// When built with SCHED_SETAFFINITY the affinity is set again from
// rsnic_cpumask; as the later call it replaces, if it succeeds, the mask set
// above.
157 #ifdef SCHED_SETAFFINITY
158 sched_setaffinity(0, sizeof(cpu_set_t), &rsnic_cpumask);
// From here on this thread holds downthread_running_mutex except while it is
// parked in the WAIT branch below. join() acquires the same mutex before
// notifying, and get_downthread_mutex() hands it out to callers.
162 downthread_running_mutex.lock();
// Take the initial state from the shared downstream state under its lock.
164 rw_scoped_lock downstate_lock(downthread_state_mutex);
165 state = downthread_state; //thread local state is update.
// Main loop: act on the local copy of the state, then refresh it.
167 for (;;) { //thread loop
168 if (state == WAIT) { //after create or session end. this thread is pooling mode
169 boost::mutex::scoped_lock lock(downthread_condition_mutex);
170 // downthread_condition.wait( lock ); // thread is condition wait( start at notify_all() )
// The untimed wait above is disabled; a timed wait is used instead.
// The current time is the base of the wait deadline.
// NOTE(review): the declaration of `wait` and any offset added to it are not
// visible here -- confirm the timeout length.
172 boost::xtime_get(&wait, boost::TIME_UTC);
// The running mutex is released only around the wait and re-acquired
// immediately afterwards.
174 downthread_running_mutex.unlock();
175 downthread_condition.timed_wait(lock, wait); // thread is condition wait( start at notify_all() )
176 downthread_running_mutex.lock();
177 } else if (state == EXIT) { // this state is virtualservice end. thread is finishing.
// NOTE(review): the body of this branch is not visible; presumably it leaves
// the loop -- confirm.
179 } else { //state RUNNING
180 session->down_thread_run();//session downstream thread looping.
// Refresh the local state from the shared one under its lock.
183 rw_scoped_lock downstate_lock(downthread_state_mutex);
184 state = downthread_state; // thread local state is update.
// Shutdown handshake with join(): release the running mutex, then notify the
// joining condition under downthread_joining_mutex. join() takes that mutex
// before it sets EXIT and keeps it until it waits, so this lock is obtained
// only once join() is waiting.
186 downthread_running_mutex.unlock();
187 boost::mutex::scoped_lock down_lk(downthread_joining_mutex);
188 downthread_joining_condition.notify_all();
// Debug trace on exit.
189 if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
190 boost::format fmt("out_function : void session_thread_control::downstream_run()");
191 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 6, fmt.str(), __FILE__, __LINE__);
195 //! @brief start upstream function.
//! Sets upthread_state to RUNNING, unless it is already EXIT, and notifies
//! upthread_condition so that a worker parked in upstream_run() wakes up.
197 void session_thread_control::startupstream()
// Debug trace on entry.
199 if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
200 boost::format fmt("in_function : void session_thread_control::startupstream()");
201 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 7, fmt.str(), __FILE__, __LINE__);
// The state change is made under upthread_state_mutex; EXIT is never
// overwritten.
203 rw_scoped_lock upstate_lock(upthread_state_mutex);
204 if (upthread_state != EXIT) upthread_state = RUNNING; // upthread state update.[RUNNING] -> alive mode
// If the worker is not waiting at this moment the notification has no effect;
// the worker uses a timed wait and re-reads upthread_state after every pass of
// its loop, so it still picks up the new state.
205 upthread_condition.notify_all(); // conditionwait upstreamthread is run.
// Debug trace on exit, including the resulting state value.
206 if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
207 boost::format fmt("out_function : void session_thread_control::startupstream() :status = %d");
208 fmt % upthread_state;
209 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 8, fmt.str(), __FILE__, __LINE__);
213 //! @brief stop upstream function
//! Sets upthread_state to WAIT, unless it is already EXIT. No condition is
//! notified here; the worker in upstream_run() sees the new state when it next
//! re-reads upthread_state at the end of a loop pass.
215 void session_thread_control::stopupstream()
// Debug trace on entry.
217 if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
218 boost::format fmt("in_function : void session_thread_control::stopupstream()");
219 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 9, fmt.str(), __FILE__, __LINE__);
// The state change is made under upthread_state_mutex; EXIT is never
// overwritten.
221 rw_scoped_lock upstate_lock(upthread_state_mutex);
222 if (upthread_state != EXIT) upthread_state = WAIT; // upthread state is update [WAIT] -> pooling mode
// Debug trace on exit, including the resulting state value.
223 if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
224 boost::format fmt("out_function : void session_thread_control::stopupstream() : status = %d");
225 fmt % upthread_state;
226 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 10, fmt.str(), __FILE__, __LINE__);
230 //! @brief start downstream function
//! Sets downthread_state to RUNNING, unless it is already EXIT, and notifies
//! downthread_condition so that a worker parked in downstream_run() wakes up.
232 void session_thread_control::startdownstream()
// Debug trace on entry.
234 if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
235 boost::format fmt("in_function : void session_thread_control::startdownstream()");
236 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 11, fmt.str(), __FILE__, __LINE__);
// The state change is made under downthread_state_mutex; EXIT is never
// overwritten.
238 rw_scoped_lock downstate_lock(downthread_state_mutex);
239 if (downthread_state != EXIT) downthread_state = RUNNING; // downstream state is update [RUNNING] -> alive mode
// If the worker is not waiting at this moment the notification has no effect;
// the worker uses a timed wait and re-reads downthread_state after every pass
// of its loop, so it still picks up the new state.
240 downthread_condition.notify_all(); // condition wait thread is run.
// Debug trace on exit, including the resulting state value.
241 if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
242 boost::format fmt("out_function : void session_thread_control::startdownstream() : status = %d");
243 fmt % downthread_state;
244 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 12, fmt.str(), __FILE__, __LINE__);
248 //! @brief stop downstream function.
//! Sets downthread_state to WAIT, unless it is already EXIT. No condition is
//! notified here; the worker in downstream_run() sees the new state when it
//! next re-reads downthread_state at the end of a loop pass.
250 void session_thread_control::stopdownstream()
// Debug trace on entry.
252 if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
253 boost::format fmt("in_function : void session_thread_control::stopdownstream()");
254 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 13, fmt.str(), __FILE__, __LINE__);
// The state change is made under downthread_state_mutex; EXIT is never
// overwritten.
256 rw_scoped_lock downstate_lock(downthread_state_mutex);
257 if (downthread_state != EXIT) downthread_state = WAIT; // downstream state is update [WAIT] -> pooling mode
// Debug trace on exit, including the resulting state value.
258 if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
259 boost::format fmt("out_function : void session_thread_control::stopdownstream() : status = %d");
260 fmt % downthread_state;
261 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 14, fmt.str(), __FILE__, __LINE__);
265 //! @brief upstream and downstream threads finished function
//! Asks both worker threads to terminate and blocks until each has notified
//! its joining condition. Sequence:
//!  1. lock both joining mutexes, so that neither worker can issue its final
//!     notification before this function waits for it;
//!  2. set upthread_state / downthread_state to EXIT under their state locks;
//!  3. acquire each running mutex (the worker holds it inside its loop except
//!     while parked in its timed wait) and notify the worker's condition;
//!  4. wait on upthread_joining_condition, then on
//!     downthread_joining_condition.
//! NOTE(review): both waits are issued without a predicate, so a spurious
//! wake-up would let this function return early -- confirm this is acceptable.
//! NOTE(review): no boost::thread::join() call is visible here; completion of
//! the threads is inferred from the notifications only.
267 void session_thread_control::join()
// Debug trace on entry.
269 if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
270 boost::format fmt("in_function : void session_thread_control::join() :");
271 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 15, fmt.str(), __FILE__, __LINE__);
// Taken first and kept until the waits below: the workers must lock these
// mutexes before they notify the joining conditions.
274 boost::mutex::scoped_lock up_lk(upthread_joining_mutex);
275 boost::mutex::scoped_lock down_lk(downthread_joining_mutex);
// Upstream: publish EXIT, then wake the worker.
278 rw_scoped_lock upstate_lock(upthread_state_mutex);
279 upthread_state = EXIT; //upstream state update [EXIT] -> thread exit mode
// Blocks while the upstream worker holds upthread_running_mutex (for example
// during session->up_thread_run()).
282 boost::mutex::scoped_lock upthread_running_wait(upthread_running_mutex);
283 upthread_condition.notify_all(); // conditionwait thread is run
// Downstream: publish EXIT, then wake the worker.
287 rw_scoped_lock downstate_lock(downthread_state_mutex);
288 downthread_state = EXIT; //downstream state update [EXIT] -> thread exit mode
// Blocks while the downstream worker holds downthread_running_mutex (for
// example during session->down_thread_run()).
291 boost::mutex::scoped_lock downthread_running_wait(downthread_running_mutex);
292 downthread_condition.notify_all(); //condition wait thread is run.
// Each wait releases its joining mutex, which lets the corresponding worker
// lock it and send the final notification.
295 upthread_joining_condition.wait(up_lk);
296 downthread_joining_condition.wait(down_lk);
// Debug trace on exit.
// NOTE(review): the message below names stopdownstream() although this
// function is join() -- looks like a copy-paste slip in the log text.
298 if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
299 boost::format fmt("out_function : void session_thread_control::stopdownstream() : up_status = %d / down_status = %d");
300 fmt % upthread_state % downthread_state;
301 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 16, fmt.str(), __FILE__, __LINE__);
305 //! @brief get upthread mutex object reference
//! @return reference to upthread_running_mutex, the mutex the upstream worker
//!         holds inside upstream_run() except while it is parked in its timed
//!         wait.
307 boost::mutex &session_thread_control::get_upthread_mutex()
309 return upthread_running_mutex;
312 //! @brief get downthread mutex object reference
//! @return reference to downthread_running_mutex, the mutex the downstream
//!         worker holds inside downstream_run() except while it is parked in
//!         its timed wait.
314 boost::mutex &session_thread_control::get_downthread_mutex()
316 return downthread_running_mutex;