OSDN Git Service

#30993: gcc4.6以降でのコンパイルエラー対処
[ultramonkey-l7/ultramonkey-l7-v3.git] / l7vsd / src / session_thread_control.cpp
1 /*!
2  *    @file    session_thread_control.cpp
3  *    @brief    session used 2 threads. thread pooling unit is 2 thread control.
4  *
5  * L7VSD: Linux Virtual Server for Layer7 Load Balancing
6  * Copyright (C) 2009  NTT COMWARE Corporation.
7  *
8  * This program is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Lesser General Public
10  * License as published by the Free Software Foundation; either
11  * version 2.1 of the License, or (at your option) any later version.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Lesser General Public License for more details.
17  *
18  * You should have received a copy of the GNU Lesser General Public
19  * License along with this library; if not, write to the Free Software
20  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
21  * 02110-1301 USA
22  *
23  **********************************************************************/
24
25 #include <sched.h>
26 #include "session_thread_control.h"
27
28 #include "utility.h"
29
30 namespace l7vs
31 {
32
33 //
34 //!    @brief create up down thread
35 //
36 int    session_thread_control::start_thread()
37 {
38
39         upthread.reset(new boost::thread(&session_thread_control::upstream_run, this));        //! upstream thread create
40         downthread.reset(new boost::thread(&session_thread_control::downstream_run, this));    //! downstream thread create
41
42         //pthread_setschedparam
43         int    retval, sched_policy;
44         sched_param    scheduler_param;
45         retval    = pthread_getschedparam(upthread->native_handle(), &sched_policy, &scheduler_param);
46         if (retval != 0) return retval;
47
48         if (SCHED_FIFO == sched_algorithm) {
49                 scheduler_param.__sched_priority    = sched_priority;
50                 sched_policy    = SCHED_FIFO;
51         } else if (SCHED_RR == sched_algorithm) {
52                 scheduler_param.__sched_priority    = sched_priority;
53                 sched_policy    = SCHED_RR;
54         } else if (SCHED_BATCH == sched_algorithm) {
55                 sched_policy    = SCHED_BATCH;
56         }
57         if (0 <= sched_algorithm) {
58                 retval            = pthread_setschedparam(upthread->native_handle(), sched_algorithm, &scheduler_param);
59                 if (retval != 0 ) return retval;
60                 retval            = pthread_setschedparam(downthread->native_handle(), sched_algorithm, &scheduler_param);
61                 if (retval != 0 ) return retval;
62         }
63
64         return retval;
65 }
66
67 //
68 //!    @brief upstream thread bind function.
69 //
70 void    session_thread_control::upstream_run()
71 {
72         if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_VIRTUALSERVICE))) {
73                 boost::format fmt("in_function : void session_thread_control::upstream_run()");
74                 Logger::putLogDebug(LOG_CAT_L7VSD_VIRTUALSERVICE, 1, fmt.str(), __FILE__, __LINE__);
75         }
76
77         cpu_set_t       mask;
78         // when num>0, do CPU_SET times of "num_of_core_uses".
79         if (0 < num_of_core_uses) {
80                 CPU_ZERO(&mask);
81                 for (int i = 0; i < num_of_core_uses; ++i) {
82                         CPU_SET(i, &mask);
83                 }
84                 sched_setaffinity(0, sizeof(cpu_set_t), &mask);
85
86                 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_VIRTUALSERVICE))) {
87                         boost::format formatter("void virtualservice_tcp::upstream_run() "
88                                                 "sched_setaffinity after num_of_core_uses = %d");
89                         formatter % num_of_core_uses;
90                         Logger::putLogDebug(LOG_CAT_L7VSD_VIRTUALSERVICE, 2, formatter.str(), __FILE__, __LINE__);
91                 }
92
93         }
94 #ifdef    SCHED_SETAFFINITY
95         sched_setaffinity(0, sizeof(cpu_set_t), &vsnic_cpumask);
96 #endif
97
98         state_tag    state;
99         upthread_running_mutex.lock();
100         {
101                 // get first state from class upstream state.
102                 rw_scoped_lock    upstate_lock(upthread_state_mutex);
103                 state = upthread_state;    //thread local state is update.
104         }
105         for (;;) {  // thread loop
106                 if (state == WAIT) {    // after create or session end. this thread is pooling mode
107                         boost::mutex::scoped_lock    lock(upthread_condition_mutex);
108                         boost::xtime    wait;
109                         boost::xtime_get(&wait, boost::TIME_UTC);
110                         wait.sec += 1;
111                         upthread_running_mutex.unlock();
112                         upthread_condition.timed_wait(lock, wait);   // thread is condition wait( start at notify_all() )
113                         upthread_running_mutex.lock();
114                 } else if (state == EXIT) { // this state is virtualservice end. thread is finishing.
115                         break;
116                 } else { //state RUNNING
117                         session->up_thread_run();    //session upstream thread looping.
118                         stopupstream();
119                 }
120                 rw_scoped_lock    upstate_lock(upthread_state_mutex);
121                 state = upthread_state;    //thread local state is update.
122         }
123         upthread_running_mutex.unlock();
124         boost::mutex::scoped_lock up_lk(upthread_joining_mutex);
125         upthread_joining_condition.notify_all();
126         if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
127                 boost::format fmt("out_function : void session_thread_control::upstream_run()");
128                 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 3, fmt.str(), __FILE__, __LINE__);
129         }
130 }
131 //
132 //! @brief    downstream thread bind function,
133 //
134 void    session_thread_control::downstream_run()
135 {
136         if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_VIRTUALSERVICE))) {
137                 boost::format fmt("in_function : void session_thread_control::downstream_run()");
138                 Logger::putLogDebug(LOG_CAT_L7VSD_VIRTUALSERVICE, 4, fmt.str(), __FILE__, __LINE__);
139         }
140         cpu_set_t       mask;
141         // when num>0, do CPU_SET times of "num_of_core_uses".
142         if (0 < num_of_core_uses) {
143                 CPU_ZERO(&mask);
144                 for (int i = 0; i < num_of_core_uses; ++i) {
145                         CPU_SET(i, &mask);
146                 }
147                 sched_setaffinity(0, sizeof(cpu_set_t), &mask);
148
149                 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_VIRTUALSERVICE))) {
150                         boost::format formatter("void virtualservice_tcp::downstream_run() "
151                                                 "sched_setaffinity after num_of_core_uses = %d");
152                         formatter % num_of_core_uses;
153                         Logger::putLogDebug(LOG_CAT_L7VSD_VIRTUALSERVICE, 5, formatter.str(), __FILE__, __LINE__);
154                 }
155
156         }
157 #ifdef    SCHED_SETAFFINITY
158         sched_setaffinity(0, sizeof(cpu_set_t), &rsnic_cpumask);
159 #endif
160
161         state_tag    state;
162         downthread_running_mutex.lock();
163         {
164                 rw_scoped_lock    downstate_lock(downthread_state_mutex);
165                 state = downthread_state;    //thread local state is update.
166         }
167         for (;;) {  //thread loop
168                 if (state == WAIT) {    //after create or session end. this thread is pooling mode
169                         boost::mutex::scoped_lock    lock(downthread_condition_mutex);
170 //             downthread_condition.wait( lock ); // thread is condition wait( start at notify_all() )
171                         boost::xtime    wait;
172                         boost::xtime_get(&wait, boost::TIME_UTC);
173                         wait.sec += 1;
174                         downthread_running_mutex.unlock();
175                         downthread_condition.timed_wait(lock, wait);   // thread is condition wait( start at notify_all() )
176                         downthread_running_mutex.lock();
177                 } else if (state == EXIT) { // this state is virtualservice end. thread is finishing.
178                         break;
179                 } else { //state RUNNING
180                         session->down_thread_run();//session downstream thread looping.
181                         stopdownstream();
182                 }
183                 rw_scoped_lock    downstate_lock(downthread_state_mutex);
184                 state = downthread_state;    // thread local sate is update.
185         }
186         downthread_running_mutex.unlock();
187         boost::mutex::scoped_lock down_lk(downthread_joining_mutex);
188         downthread_joining_condition.notify_all();
189         if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
190                 boost::format fmt("out_function : void session_thread_control::downstream_run()");
191                 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 6, fmt.str(), __FILE__, __LINE__);
192         }
193 }
194 //
195 //! @brief    start upstream function.
196 //
197 void    session_thread_control::startupstream()
198 {
199         if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
200                 boost::format fmt("in_function : void session_thread_control::startupstream()");
201                 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 7, fmt.str(), __FILE__, __LINE__);
202         }
203         rw_scoped_lock    upstate_lock(upthread_state_mutex);
204         if (upthread_state != EXIT) upthread_state = RUNNING;         // upthread state update.[RUNNING] -> alive mode
205         upthread_condition.notify_all();                            // conditionwait upstreamthread is run.
206         if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
207                 boost::format fmt("out_function : void session_thread_control::startupstream() :status = %d");
208                 fmt % upthread_state;
209                 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 8, fmt.str(), __FILE__, __LINE__);
210         }
211 }
212 //
213 //! @brief    stop upstream function
214 //
215 void    session_thread_control::stopupstream()
216 {
217         if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
218                 boost::format fmt("in_function : void session_thread_control::stopupstream()");
219                 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 9, fmt.str(), __FILE__, __LINE__);
220         }
221         rw_scoped_lock    upstate_lock(upthread_state_mutex);
222         if (upthread_state != EXIT) upthread_state = WAIT;                 // upthread state is update [WAIT] -> pooling mode
223         if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
224                 boost::format fmt("out_function : void session_thread_control::stopupstream() : status = %d");
225                 fmt % upthread_state;
226                 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 10, fmt.str(), __FILE__, __LINE__);
227         }
228 }
229 //
230 //! @brief    start downstream function
231 //
232 void    session_thread_control::startdownstream()
233 {
234         if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
235                 boost::format fmt("in_function : void session_thread_control::startdownstream()");
236                 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 11, fmt.str(), __FILE__, __LINE__);
237         }
238         rw_scoped_lock    downstate_lock(downthread_state_mutex);
239         if (downthread_state != EXIT) downthread_state = RUNNING;         // downstream state is update [RUNNING] -> alive mode
240         downthread_condition.notify_all();                                // condition wait thread is run.
241         if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
242                 boost::format fmt("out_function : void session_thread_control::startdownstream() : status = %d");
243                 fmt % downthread_state;
244                 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 12, fmt.str(), __FILE__, __LINE__);
245         }
246 }
247 //
248 //! @brief    stop downstream function.
249 //
250 void    session_thread_control::stopdownstream()
251 {
252         if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
253                 boost::format fmt("in_function : void session_thread_control::stopdownstream()");
254                 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 13, fmt.str(), __FILE__, __LINE__);
255         }
256         rw_scoped_lock    downstate_lock(downthread_state_mutex);
257         if (downthread_state != EXIT) downthread_state = WAIT;             // downstream state is update [WAIT] -> pooling mode
258         if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
259                 boost::format fmt("out_function : void session_thread_control::stopdownstream() : status = %d");
260                 fmt % downthread_state;
261                 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 14, fmt.str(), __FILE__, __LINE__);
262         }
263 }
264 //
265 //!    @brief    upstream and downstream threads finished function
266 //
267 void    session_thread_control::join()
268 {
269         if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
270                 boost::format fmt("in_function : void session_thread_control::join() :");
271                 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 15, fmt.str(), __FILE__, __LINE__);
272         }
273
274         boost::mutex::scoped_lock    up_lk(upthread_joining_mutex);
275         boost::mutex::scoped_lock    down_lk(downthread_joining_mutex);
276
277         {
278                 rw_scoped_lock    upstate_lock(upthread_state_mutex);
279                 upthread_state = EXIT;    //upstream state update [EXIT] -> thread exit mode
280         }
281         {
282                 boost::mutex::scoped_lock upthread_running_wait(upthread_running_mutex);
283                 upthread_condition.notify_all();    // conditionwait thread is run
284         }
285
286         {
287                 rw_scoped_lock    downstate_lock(downthread_state_mutex);
288                 downthread_state = EXIT;    //downstream state update [EXIT] -> thread exit mode
289         }
290         {
291                 boost::mutex::scoped_lock downthread_running_wait(downthread_running_mutex);
292                 downthread_condition.notify_all(); //condition wait thread is run.
293         }
294
295         upthread_joining_condition.wait(up_lk);
296         downthread_joining_condition.wait(down_lk);
297
298         if (unlikely(LOG_LV_DEBUG == l7vs::Logger::getLogLevel(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE))) {
299                 boost::format fmt("out_function : void session_thread_control::stopdownstream() : up_status = %d / down_status = %d");
300                 fmt % upthread_state % downthread_state;
301                 l7vs::Logger::putLogDebug(l7vs::LOG_CAT_L7VSD_VIRTUALSERVICE, 16, fmt.str(), __FILE__, __LINE__);
302         }
303 }
304 //
305 //!    @brief    get upthread mutex object reference
306 //
307 boost::mutex    &session_thread_control::get_upthread_mutex()
308 {
309         return    upthread_running_mutex;
310 }
311 //
312 //!    @brief    get downthread mutex object reference
313 //
314 boost::mutex    &session_thread_control::get_downthread_mutex()
315 {
316         return    downthread_running_mutex;
317 }
318
319 }    //namespace l7vs