OSDN Git Service

ソースツリー再構成中(ほぼOK?)
[ultramonkey-l7/ultramonkey-l7-v3.git] / l7vsd / src / replication.cpp
1 /*
2  *      @file   replication.cpp
3  *      @brief  Replication class
4  *
5  * L7VSD: Linux Virtual Server for Layer7 Load Balancing\r
6  * Copyright (C) 2009  NTT COMWARE Corporation.\r
7  *\r
8  * This program is free software; you can redistribute it and/or\r
9  * modify it under the terms of the GNU Lesser General Public\r
10  * License as published by the Free Software Foundation; either\r
11  * version 2.1 of the License, or (at your option) any later version.\r
12  *\r
13  * This program is distributed in the hope that it will be useful,\r
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of\r
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU\r
16  * Lesser General Public License for more details.\r
17  *\r
18  * You should have received a copy of the GNU Lesser General Public\r
19  * License along with this library; if not, write to the Free Software\r
20  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA\r
21  * 02110-1301 USA\r
22  *\r
23  **********************************************************************/\r
24 #include        <boost/lexical_cast.hpp>
25 #include        <boost/format.hpp>
26 #include        "replication.h"
27 #include        "parameter.h"
28 #include        "logger.h"
29
30 namespace l7vs{
31
//! Human-readable names of the replication modes, used when building log messages.
//! The table is indexed with replication_state.service_status cast to int, so the
//! entry order must match the declaration order of that enumeration
//! (declared outside this file -- presumably in replication.h; keep both in sync).
//! Both the characters and the pointers are const: the table is read-only.
static const char* const replication_mode[] = {
    "REPLICATION_OUT",
    "REPLICATION_SINGLE",
    "REPLICATION_MASTER",
    "REPLICATION_SLAVE",
    "REPLICATION_MASTER_STOP",
    "REPLICATION_SLAVE_STOP"
};
41
42
43 //! Initialize Replication
44 //! @retval 0 Success
45 //! @retval -1 Error
46 int                     replication::initialize(){
47         Logger  logger( LOG_CAT_L7VSD_REPLICATION, 1, "replication::initialize", __FILE__, __LINE__ );
48
49         Parameter       param;
50         error_code      ip_addr_ret, service_name_ret, nic_ret, interval_ret;
51         std::string     buf;
52
53         // Check by continuous initialize.
54         if ( REPLICATION_OUT != replication_state.service_status ){
55                 // Initialization has already been done.
56                 buf = boost::io::str( boost::format( "Initialization is a failure, because initialization has already been done. mode : %s" ) 
57                                                                                         % replication_mode[( int )replication_state.service_status] );
58                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 1, buf, __FILE__, __LINE__ );
59                 return -1;
60         }
61
62         // Check the Parameter exists
63         replication_info.ip_addr = param.get_string( PARAM_COMP_REPLICATION, "ip_addr", ip_addr_ret );
64         replication_info.service_name = param.get_string( PARAM_COMP_REPLICATION, "service_name", service_name_ret );
65         replication_info.nic = param.get_string( PARAM_COMP_REPLICATION, "nic", nic_ret );
66         replication_info.interval = param.get_int( PARAM_COMP_REPLICATION, "interval", interval_ret );
67
68         // SG File not set
69         if ( ip_addr_ret && service_name_ret && nic_ret && interval_ret ){
70                 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 1, "Required item is not set in l7vs.", __FILE__, __LINE__ );
71                 // Status Set
72                 replication_state.service_status = REPLICATION_SINGLE;
73                 return 0;
74         }
75
76         // IP Address exists
77         if ( ip_addr_ret  ){
78                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 2, "IP Address is not set.", __FILE__, __LINE__ );
79                 // Status Set
80                 replication_state.service_status = REPLICATION_SINGLE;
81                 return -1;
82         }
83         // Port exists
84         if ( service_name_ret ){
85                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 3, "Port is not set.", __FILE__, __LINE__ );
86                 // Status Set
87                 replication_state.service_status = REPLICATION_SINGLE;
88                 return -1;
89         }
90         // NIC exists
91         if ( nic_ret ){
92                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 4, "NIC is not set.", __FILE__, __LINE__ );
93                 // Status Set
94                 replication_state.service_status = REPLICATION_SINGLE;
95                 return -1;
96         }
97         // Interval exists
98         if ( interval_ret ){
99                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 5, "Interval is not set.", __FILE__, __LINE__ );
100                 // Status Set
101                 replication_state.service_status = REPLICATION_SINGLE;
102                 return -1;
103         }
104
105         // Failed in the acquisition of IP
106         if ( replication_info.ip_addr == "" ){
107                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 6, "Could not get IP Address.", __FILE__, __LINE__ );
108                 // Status Set
109                 replication_state.service_status = REPLICATION_SINGLE;
110                 return -1;
111         }
112
113         // Failed in the acquisition of Port
114         if ( replication_info.service_name == "" ){
115                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 7, "Could not get Port.", __FILE__, __LINE__ );
116                 // Status Set
117                 replication_state.service_status = REPLICATION_SINGLE;
118                 return -1;
119         }
120
121         // Failed in the acquisition of NIC
122         if ( replication_info.nic == "" ){
123                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 8, "Could not get NIC.", __FILE__, __LINE__ );
124                 // Status Set
125                 replication_state.service_status = REPLICATION_SINGLE;
126                 return -1;
127         }
128
129         // Variable that sets ID
130         std::string key_id;
131         std::string key_size;
132         error_code      id_ret, size_ret;
133         mutex_ptr       component_mutex;
134
135         replication_mutex.clear();
136         // Conponent exists
137         // Get Component infomation
138         for ( int i=0; i<CMP_MAX; i++){
139                 key_id = boost::io::str( boost::format( "cmponent_id_%02d" ) % i );
140                 key_size = boost::io::str( boost::format( "cmponent_size_%02d" ) % i );
141
142                 // ID and the Size exist
143                 replication_info.component_info[i].id = param.get_string( PARAM_COMP_REPLICATION, key_id, id_ret ); 
144                 replication_info.component_info[i].block_size = param.get_int( PARAM_COMP_REPLICATION, key_size, size_ret );
145                 if ( id_ret || size_ret ){
146                         break;
147                 }
148                 if ( replication_info.component_info[i].id == "" ){
149                         buf = boost::io::str( boost::format( "Could not get %s." ) % key_id );
150                         Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 9, buf, __FILE__, __LINE__ );
151                         // Status Set
152                         replication_state.service_status = REPLICATION_SINGLE;
153                         return -1;
154                 }
155
156                 // Count block head number
157                 replication_info.component_info[i].block_head = ( 0 == i ? 0 :
158                                 ( replication_info.component_info[i-1].block_head + replication_info.component_info[i-1].block_size ) );
159                 replication_state.total_block += replication_info.component_info[i].block_size;
160
161                 // Number of Component 
162                 replication_info.component_num++;
163
164                 component_mutex = mutex_ptr( new boost::mutex );
165                 replication_mutex.insert( std::pair<std::string, mutex_ptr>( replication_info.component_info[i].id, component_mutex ) );
166         }
167
168         // Check the Parameters value
169         if ( 0 != check_parameter() ){
170                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 10, "Failed in parameter check.", __FILE__, __LINE__ );
171                 // Status Set
172                 replication_state.service_status = REPLICATION_SINGLE;
173                 return -1;
174         }
175
176         // The memory is 0 or less. 
177         if ( 0 >= replication_state.total_block){
178                 // Status Set
179                 replication_state.service_status = REPLICATION_SINGLE;
180                 return 0;
181         }
182
183         replication_state.last_send_block = replication_state.total_block-1;
184
185         // Get the Replication memory
186         replication_state.replication_memory = getrpl();
187         // Confirmation of Replication memory 
188         if ( NULL == replication_state.replication_memory ){
189                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 1, "Replication memory is NULL.", __FILE__, __LINE__ );
190                 // Status Set
191                 replication_state.service_status = REPLICATION_SINGLE;
192                 return -1;
193         }
194         // Get the Components memory
195         replication_state.component_memory = getcmp();
196         // Confirmation of Components memory
197         if ( NULL == replication_state.component_memory ){
198                 // free memory
199                 releaserpl();
200                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 2, "Components memory is NULL.", __FILE__, __LINE__ );
201                 // Status Set
202                 replication_state.service_status = REPLICATION_SINGLE;
203                 return -1;
204         }
205
206         // Memory for Switch Surface Number
207         replication_state.surface_block_array_ptr = getsrf();
208         if ( NULL == replication_state.surface_block_array_ptr ){
209                 // free memory
210                 releaserpl();
211                 releasecmp();
212                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 3, "Surface infomation memory is NULL.", __FILE__, __LINE__ );
213                 // Status Set
214                 replication_state.service_status = REPLICATION_SINGLE;
215                 return -1;
216         }
217
218         // Status Set to Slave
219         if ( 0 != set_slave() ){
220                 // free memory
221                 releasesrf();
222                 releaserpl();
223                 releasecmp();
224                 // Status Set
225                 replication_state.service_status = REPLICATION_SINGLE;
226                 return -1;
227         }
228
229         replication_state.service_status = REPLICATION_SLAVE;
230
231         {
232                 boost::mutex::scoped_lock       lock( replication_thread_mutex );
233
234                 replication_flag = WAIT;
235         }
236         replication_thread_ptr = thread_ptr( new boost::thread( boost::bind ( &replication::send_thread, this ) ) );
237
238         {
239                 boost::mutex::scoped_lock       lock( service_thread_mutex );
240
241                 service_flag = RUNNING;
242         }
243         service_thread_ptr = thread_ptr( new boost::thread( boost::bind ( &replication::service_thread, this ) ) );
244
245         buf = boost::io::str( boost::format( "Initialized in %s mode." ) % replication_mode[( int )replication_state.service_status] );
246         Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 2, buf, __FILE__, __LINE__ );
247
248         return 0;
249 }
250
251 //! Finalize Replication
252 void            replication::finalize(){
253         Logger  logger( LOG_CAT_L7VSD_REPLICATION, 2, "replication::finalize", __FILE__, __LINE__ );
254
255         {
256                 boost::mutex::scoped_lock       lock( replication_thread_mutex );
257
258                 replication_flag = EXIT;
259                 replication_thread_condition.notify_all();
260         }
261         if ( replication_thread_ptr ){
262                 replication_thread_ptr->join();
263         }
264
265         // Socket finalaize
266         if ( replication_send_socket.is_open() ){
267                 replication_send_socket.close();
268         }
269         if ( replication_receive_socket.is_open() ){
270                 replication_receive_socket.close();
271         }
272
273         {
274                 boost::mutex::scoped_lock       lock( service_thread_mutex );
275
276                 service_flag = EXIT;
277                 service_thread_condition.notify_all();
278         }
279         if ( service_thread_ptr ){
280                 service_thread_ptr->join();
281         }
282
283         // Release replication memory
284         releaserpl();
285         // Release component memory
286         releasecmp();
287         // Release surface block memory
288         releasesrf();
289
290         // reset of replication_state
291         replication_state.send_time = 0;
292         replication_state.last_send_block = 0;
293         replication_state.last_recv_block = 0;
294         replication_state.total_block = 0;
295         replication_state.surface_block_no = 1;
296
297         // reset of replication_info
298         replication_info.ip_addr = "";
299         replication_info.service_name = "";
300         replication_info.nic = "";
301         replication_info.interval = 0;
302         replication_info.component_num = 0;
303         for ( int loop = 0; loop < CMP_MAX; loop++ ){
304                 replication_info.component_info[loop].id = "";
305                 replication_info.component_info[loop].block_head = 0;
306                 replication_info.component_info[loop].block_size = 0;
307         }
308         replication_mutex.clear();
309
310         replication_endpoint = boost::asio::ip::udp::endpoint();
311         bind_endpoint = boost::asio::ip::udp::endpoint();
312
313         // status change
314         replication_state.service_status = REPLICATION_OUT;
315 }
316
317 //! Switch Slave to Master
318 void            replication::switch_to_master(){
319         Logger  logger( LOG_CAT_L7VSD_REPLICATION, 3, "replication::switch_to_master", __FILE__, __LINE__ );
320
321         std::string buf;
322         int ret;
323
324         switch ( replication_state.service_status ){
325                 case REPLICATION_SLAVE:
326                 case REPLICATION_SLAVE_STOP:
327                         // Set Mastre Mode
328                         ret = set_master();
329                         if ( 0 != ret ){
330                                 replication_state.service_status = REPLICATION_SINGLE;
331
332                                 releaserpl();
333                                 releasecmp();
334                                 releasesrf();
335
336                                 buf = boost::io::str( boost::format( "Switch to master NG. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
337                                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 11, buf, __FILE__, __LINE__ );
338                         } else {
339                                 // Copy from compornent area to replication area.
340                                 memcpy( replication_state.replication_memory, replication_state.component_memory, replication_state.total_block*DATA_SIZE );
341
342                                 if ( REPLICATION_SLAVE == replication_state.service_status ){
343                                         // Set mode.
344                                         replication_state.service_status = REPLICATION_MASTER;
345
346                                         {
347                                                 boost::mutex::scoped_lock       lock( replication_thread_mutex );
348                                                 if      ( replication_flag != EXIT ){
349                                                          replication_flag = RUNNING;
350                                                 }
351                                                 replication_thread_condition.notify_all();
352                                         }
353                                 } else {
354                                 // Set mode.
355                                         replication_state.service_status = REPLICATION_MASTER_STOP;
356                                 }
357
358                                 buf = boost::io::str( boost::format( "Switch to master OK. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
359                                 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 3, buf, __FILE__, __LINE__ );
360                         }
361                         break;
362                 case REPLICATION_SINGLE:
363                         buf = boost::io::str( boost::format( "Starting by %s, doesn't shift to MASTER." ) % replication_mode[( int )replication_state.service_status] ); 
364                         Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 4, buf, __FILE__, __LINE__ );
365                         break;
366                 default:
367                         buf = boost::io::str( boost::format( "Can not switch to master. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
368                         Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 1, buf, __FILE__, __LINE__ );
369                         break;
370         }
371 }
372
373 //! Set Master mode
374 int             replication::set_master()
375 {
376         std::string buf;
377         boost::system::error_code err;
378
379         // close socket
380         if ( replication_send_socket.is_open() ){
381                 replication_send_socket.close();
382         }
383         if ( replication_receive_socket.is_open() ){
384                 replication_receive_socket.cancel();
385                 replication_receive_socket.close();
386         }
387
388         {
389                 boost::mutex::scoped_lock       lock( service_thread_mutex );
390
391                 if      ( service_flag == RUNNING ){
392                         service_flag = WAIT_REQ;
393                         service_io.stop();
394                 }
395         }
396
397         while ( service_flag == WAIT_REQ ){
398                 usleep( 1 );
399         }
400
401         {
402                 boost::mutex::scoped_lock       lock( service_thread_mutex );
403
404                 if      ( service_flag != EXIT ){
405                         service_io.reset();
406                         service_flag = RUNNING;
407
408                         service_thread_condition.notify_all();
409                 }
410         }
411
412         Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 5, "Initialization of send socket is success.", __FILE__, __LINE__ );
413
414         return 0;
415
416 }
417
418 //! Switch Master to Slave
419 void            replication::switch_to_slave(){
420         Logger  logger( LOG_CAT_L7VSD_REPLICATION, 4, "replication::switch_to_slave", __FILE__, __LINE__ );
421
422         std::string buf;
423         int ret;
424
425         switch ( replication_state.service_status ){
426                 case REPLICATION_MASTER:
427                 case REPLICATION_MASTER_STOP:
428
429                         {
430                                 boost::mutex::scoped_lock       lock( replication_thread_mutex );
431                                 if      ( replication_flag != EXIT ){
432                                          replication_flag = WAIT;
433                                 }
434                         }
435
436                         // Set Mastre Mode
437                         ret = set_slave();
438                         if ( 0 != ret ){
439                                 replication_state.service_status = REPLICATION_SINGLE;
440
441                                 releaserpl();
442                                 releasecmp();
443                                 releasesrf();
444
445                                 buf = boost::io::str( boost::format( "Switch to slave NG. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
446                                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 12, buf, __FILE__, __LINE__ );
447                         } else {
448                                 // initialize to replication area.
449                                 memset( replication_state.replication_memory, '\0', replication_state.total_block * DATA_SIZE );
450
451                                 if ( REPLICATION_MASTER == replication_state.service_status ){
452                                         // Set mode.
453                                         replication_state.service_status = REPLICATION_SLAVE;
454                                 } else {
455                                         // Set mode.
456                                         replication_state.service_status = REPLICATION_SLAVE_STOP;
457                                 }
458
459                                 buf = boost::io::str( boost::format( "Switch to slave OK. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
460                                 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 6, buf, __FILE__, __LINE__ );
461                         }
462                         break;
463                 case REPLICATION_SINGLE:
464                         buf = boost::io::str( boost::format( "Starting by %s, doesn't shift to SLAVE." ) % replication_mode[( int )replication_state.service_status] );
465                         Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 7, buf, __FILE__, __LINE__ );
466                         break;
467                 default:
468                         buf = boost::io::str( boost::format( "Can not switch to slave. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
469                         Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 2, buf, __FILE__, __LINE__ );
470                         break;
471         }
472 }
473
474 //! Set Slave mode
475 int             replication::set_slave()
476 {
477         std::string buf;
478         boost::system::error_code err;
479
480         // close socket
481         if ( replication_send_socket.is_open() ){
482                 replication_send_socket.close();
483         }
484         if ( replication_receive_socket.is_open() ){
485                 replication_receive_socket.cancel();
486                 replication_receive_socket.close();
487         }
488
489         {
490                 boost::mutex::scoped_lock       lock( service_thread_mutex );
491
492                 if      ( service_flag != EXIT ){
493                         service_flag = WAIT_REQ;
494                         service_io.stop();
495                 }
496         }
497         while ( service_flag == WAIT_REQ ){
498                 usleep( 1 );
499         }
500
501         // make receive socket
502 //std::cout << "slave " << bind_endpoint.address() << ":" << bind_endpoint.port() << "\n";
503         replication_receive_socket.open( bind_endpoint.protocol(), err );
504         if ( err ){
505                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 1, err.message(), __FILE__, __LINE__ );
506                 return -1;
507         }
508         replication_receive_socket.bind( bind_endpoint, err );
509         if ( err ){
510                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 2, err.message(), __FILE__, __LINE__ );
511                 return -1;
512         }
513
514 //std::cout << "slave " << bind_endpoint.address() << ":" << bind_endpoint.port() << "\n";
515         replication_receive_socket.async_receive_from( boost::asio::buffer( &replication_data, sizeof( struct replication_data_struct ) ),
516                                                                                         bind_endpoint,
517                                                                                         boost::bind( &replication::handle_receive, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) );
518
519         {
520                 boost::mutex::scoped_lock       lock( service_thread_mutex );
521
522                 if      ( service_flag != EXIT ){
523                         service_io.reset();
524                         service_flag = RUNNING;
525
526                         service_thread_condition.notify_all();
527                 }
528         }
529         Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 8, "Initialization of receive socket is success.", __FILE__, __LINE__ );
530
531         return 0;
532 }
533
534 //! Pay replication memory 
535 //! @param[in] component_id is the one to identify the component.
536 //! @param[out] size of component use blocks
537 //! @return Replication memory address
538 //! @retval nonnull Replication memory address
539 //! @retval NULL Error
540 void*           replication::pay_memory( const std::string& inid, unsigned int& outsize ){
541         Logger  logger( LOG_CAT_L7VSD_REPLICATION, 5, "replication::pay_memory", __FILE__, __LINE__ );
542
543         void *ret = NULL;
544         std::string buf;
545
546         outsize = 0;
547         // Check replication mode.
548         if ( REPLICATION_OUT == replication_state.service_status || REPLICATION_SINGLE == replication_state.service_status){
549                 // Check mode that can use the replication.
550                 buf = boost::io::str( boost::format( "Improper mode for Replication. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
551                 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 9, buf, __FILE__, __LINE__ );
552                 return NULL;
553         }
554
555         if ( NULL == replication_state.component_memory ){
556                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 4, "You can't get memory. Component memory is NULL.", __FILE__, __LINE__ );
557                 return NULL;
558         }
559
560         // search Component ID
561         for ( int i = 0; i < replication_info.component_num; i++ ){
562                 // Demand ID is compared with Component ID
563                 if ( inid == replication_info.component_info[i].id ){
564                         // size check
565                         if ( 0 == replication_info.component_info[i].block_size ){
566                                 Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 3, "Component block size is 0.", __FILE__, __LINE__ );
567                                 return NULL;
568                         }
569
570                         // block_head check
571                         if ( replication_info.component_info[i].block_head < 0 || replication_info.component_info[i].block_head > replication_state.total_block ){
572                                 buf = boost::io::str( boost::format( "Too many component block. Max is %d." ) % replication_state.total_block );
573                                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 13, buf, __FILE__, __LINE__ );
574                                 return NULL;
575                         }
576                         // Pay memory address
577                         ret = ( char * )replication_state.component_memory + replication_info.component_info[i].block_head * DATA_SIZE;
578                         // Nnumber of blocks of ID was returned.
579                         outsize = replication_info.component_info[i].block_size;
580
581                         // LOG INFO
582                         char    str[256];
583                         sprintf( str, "Component Info ID : \"%s\". Block size : %d . Head Block No : %d/  Pay memory : %p ",
584                                                                                                 replication_info.component_info[i].id.c_str(),
585                                                                                                 replication_info.component_info[i].block_size,
586                                                                                                 replication_info.component_info[i].block_head,
587                                                                                                 ( char* )ret );
588                         Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 10, str, __FILE__, __LINE__ );
589                         return ret;
590                 }
591         }
592
593         buf = boost::io::str( boost::format( "Unknown component ID. Component ID : %s" ) % inid );
594         Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 14, buf, __FILE__, __LINE__ );
595
596         return NULL;
597 }
598
599 //! Replication Dump
600 void            replication::dump_memory(){
601         Logger  logger( LOG_CAT_L7VSD_REPLICATION, 6, "replication::dump_memory", __FILE__, __LINE__ );
602
603         int size;
604         unsigned char* p;
605         unsigned char* head;
606         int h = 0;
607         int i = 0;
608         std::string     buf;
609
610         // Check replication mode.
611         if ( REPLICATION_OUT == replication_state.service_status || REPLICATION_SINGLE == replication_state.service_status){
612                 buf = boost::io::str( boost::format( "Replication memory dump failure. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
613                 Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 4, buf, __FILE__, __LINE__ );
614                 return;
615         }
616
617         if ( NULL == replication_state.replication_memory ){
618                 Logger::putLogWarn( LOG_CAT_L7VSD_SYSTEM_MEMORY, 1, "Replication memory is NULL.", __FILE__, __LINE__ );
619                 return;
620         }
621
622         // Dump size
623         size = DATA_SIZE * replication_state.total_block;
624         if ( 0 == size ){
625                 Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 5, "Can not get Replication memory", __FILE__, __LINE__ );
626                 return;
627         }
628
629         // Memory Dump
630         p = ( unsigned char* )replication_state.replication_memory;
631
632         // Output mode
633         Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 11, "Replication Dump Start ----------------------------", __FILE__, __LINE__ );
634         buf = boost::io::str( boost::format( "Mode is [ %s ]." ) % replication_mode[( int )replication_state.service_status] );
635         Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 12, buf, __FILE__, __LINE__ );
636         buf = boost::io::str( boost::format( "Total Block is [ %u ]" ) % replication_state.total_block );
637         Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 13, buf, __FILE__, __LINE__ );
638
639         // Converts into the binary, and writes it to the file. 
640         for ( h = 0; h < size / DATA_SIZE; h++ ){
641                 buf = boost::io::str( boost::format( "Block Number [ %d ]" ) % h );
642                 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 14, buf, __FILE__, __LINE__ );
643
644                 for ( i = 0; i < DATA_SIZE / LOG_DATA_WIDTH; i++ ){
645                         head = p + h * DATA_SIZE + i * LOG_DATA_WIDTH;
646
647                         // have to cast char to int. because boost::format ignore char with appointed width.
648                         buf = boost::io::str( boost::format(    "%02hhX %02hhX %02hhX %02hhX  %02hhX %02hhX %02hhX %02hhX  "
649                                                                                                         "%02hhX %02hhX %02hhX %02hhX  %02hhX %02hhX %02hhX %02hhX" )
650                                                                                                         % ( int )*head % ( int )*(head+1) % ( int )*(head+2) % ( int )*(head+3)
651                                                                                                         % ( int )*(head+4) % ( int )*(head+5) % ( int )*(head+6)% ( int )*(head+7)
652                                                                                                         % ( int )*(head+8) % ( int )*(head+9) % ( int )*(head+10) % ( int )*(head+11)
653                                                                                                         % ( int )*(head+12) % ( int )*(head+13) % ( int )*(head+14) % ( int )*(head+15) );
654                         Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 15, buf, __FILE__, __LINE__ );
655                 }
656         }
657         Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 16, "Replication Dump End ------------------------------", __FILE__, __LINE__ );
658 }
659
660 //! Chenge Status isActive
661 void            replication::start(){
662         Logger  logger( LOG_CAT_L7VSD_REPLICATION, 7, "replication::start", __FILE__, __LINE__ );
663
664         std::string buf;
665
666         switch ( replication_state.service_status ){
667                 case REPLICATION_MASTER_STOP:
668                         replication_state.service_status = REPLICATION_MASTER;
669
670                         {
671                                 boost::mutex::scoped_lock       lock( replication_thread_mutex );
672                                 if      ( replication_flag != EXIT ){
673                                          replication_flag = RUNNING;
674                                 }
675                                 replication_thread_condition.notify_all();
676                         }
677
678                         buf = boost::io::str( boost::format( "Replication start. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
679                         Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 17, buf, __FILE__, __LINE__ );
680                         break;
681                 case REPLICATION_SLAVE_STOP:
682                         replication_state.service_status = REPLICATION_SLAVE;
683                         buf = boost::io::str( boost::format( "Replication start. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
684                         Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 18, buf, __FILE__, __LINE__ );
685                         break;
686                 case REPLICATION_MASTER:
687                         buf = boost::io::str( boost::format( "Could not MASTER start, because already start. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
688                         Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 6, buf, __FILE__, __LINE__ );
689                         break;
690                 case REPLICATION_SLAVE:
691                         buf = boost::io::str( boost::format( "Could not SALVE start, because already start. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
692                         Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 7, buf, __FILE__, __LINE__ );
693                         break;
694                 default:
695                         buf = boost::io::str( boost::format( "Could not start, because mode is %s." ) % replication_mode[( int )replication_state.service_status] );
696                         Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 8, buf, __FILE__, __LINE__ );
697                         break;
698         }
699 }
700
701 //! Chenge Status isStop
702 void            replication::stop(){
703         Logger  logger( LOG_CAT_L7VSD_REPLICATION, 8, "replication::stop", __FILE__, __LINE__ );
704
705         std::string buf;
706
707         switch ( replication_state.service_status ){
708                 case REPLICATION_MASTER:
709
710                         {
711                                 boost::mutex::scoped_lock       lock( replication_thread_mutex );
712                                 if      ( replication_flag != EXIT ){
713                                          replication_flag = WAIT;
714                                 }
715                         }
716
717                         replication_state.service_status = REPLICATION_MASTER_STOP;
718                         buf = boost::io::str( boost::format( "Replication stop. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
719                         Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 19, buf, __FILE__, __LINE__ );
720                         break;
721                 case REPLICATION_SLAVE:
722                         replication_state.service_status = REPLICATION_SLAVE_STOP;
723                         buf = boost::io::str( boost::format( "Replication stop. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
724                         Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 20, buf, __FILE__, __LINE__ );
725                         break;
726                 case REPLICATION_MASTER_STOP:
727                         buf = boost::io::str( boost::format( "Could not MASTER stop, because already stop. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
728                         Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 9, buf, __FILE__, __LINE__ );
729                         break;
730                 case REPLICATION_SLAVE_STOP:
731                         buf = boost::io::str( boost::format( "Could not SALVE stop, because already stop. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
732                         Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 10, buf, __FILE__, __LINE__ );
733                         break;
734                 default:
735                         buf = boost::io::str( boost::format( "Could not start, because mode is %s." ) % replication_mode[( int )replication_state.service_status] );
736                         Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 11, buf, __FILE__, __LINE__ );
737                         break;
738         }
739 }
740
741 //! Compulsion reproduction execution
742 void            replication::force_replicate(){
743         Logger  logger( LOG_CAT_L7VSD_REPLICATION, 9, "replication::force_replicate", __FILE__, __LINE__ );
744
745         int                                     send_ret = -1;
746         error_code                      interval_ret;
747         int                                     ms_time = 0;
748         struct timespec         time;
749         Parameter                       param;
750
751         std::string     buf;
752
753         // Check by continuous initialize.
754         if ( REPLICATION_MASTER != replication_state.service_status && REPLICATION_MASTER_STOP != replication_state.service_status ){
755                 // Initialization has already been done.
756                                         buf = boost::io::str( boost::format( "Could not compulsion replication. Mode is different. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
757                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 15, buf, __FILE__, __LINE__ );
758                 return;
759         } else if ( REPLICATION_MASTER_STOP == replication_state.service_status ){
760                                         buf = boost::io::str( boost::format( "Can not replication compulsorily, because mode is %s." ) % replication_mode[( int )replication_state.service_status] );
761                 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 21, buf, __FILE__, __LINE__ );
762                 return;
763         }
764
765         // Replication memory is NULL
766         if ( NULL == replication_state.replication_memory ){
767                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 5, "Replication memory is NULL.", __FILE__, __LINE__ );
768                 return;
769         }
770
771         // Component memory is NULL
772         if ( NULL == replication_state.component_memory ){
773                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 6, "Component memory is NULL.", __FILE__, __LINE__ );
774                 return;
775         }
776
777         // set send interval
778         time.tv_sec = 0;
779         time.tv_nsec = 0;
780         ms_time = param.get_int( PARAM_COMP_REPLICATION, "compulsorily_interval", interval_ret );
781
782         if ( interval_ret ){
783                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 16, "Could not get interval value for replication compulsorily.", __FILE__, __LINE__ );
784                 return;
785         }
786
787         if ( ms_time < MIN_COMPULSORILRY_INTERVAL || MAX_COMPULSORILRY_INTERVAL < ms_time ){
788                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 17, "Invalid compulsorily interval.", __FILE__, __LINE__ );
789                 return;
790         }
791
792         // Thread stop
793         {
794                 boost::mutex::scoped_lock       lock( replication_thread_mutex );
795                 if      ( replication_flag != EXIT ){
796                          replication_flag = WAIT;
797                 }
798         }
799
800         time.tv_nsec =  ( long )( ms_time * 1000000 );
801
802         // set last send block is c maximum block
803         replication_state.last_send_block = replication_state.total_block-1;
804
805         for ( unsigned int i = 0; i < replication_state.total_block; i++ ){
806                 // set compulsorily interval.
807                 nanosleep( &time, NULL );
808
809                 send_ret = send_data();
810
811                 if ( 0 != send_ret ){
812                         Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 3, "Send data is Failed.", __FILE__, __LINE__ );
813                         goto END;
814                 }
815
816                 // set last send block number
817                 if ( replication_state.last_send_block < replication_state.total_block-1 ){
818                         replication_state.last_send_block += 1;
819                 } else if ( replication_state.last_send_block == replication_state.total_block-1 ){
820                         replication_state.last_send_block = 0;
821                 } else {
822                         Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 18, "Last send block number is illegal.", __FILE__, __LINE__ );
823                         goto END;
824                 }
825                 buf = boost::io::str( boost::format( "Data sending succeeded. Send block number : %u Version : %llu" ) % replication_state.last_send_block % (unsigned long long)replication_state.surface_block_no );
826                 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 22, buf, __FILE__, __LINE__ );
827
828                 // surface block number is change
829                 if ( replication_state.total_block == replication_state.last_send_block + 1 ){
830                         // Synchronization is executed. 
831                         memcpy( replication_state.replication_memory, replication_state.component_memory, replication_state.total_block*DATA_SIZE );
832
833                         // make new serial
834                         replication_state.surface_block_no = (uint64_t)make_serial();
835                         if ( 0 == replication_state.surface_block_no ){
836                                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 19, "Could not get serial number.", __FILE__, __LINE__ );
837                                 goto END;
838                         }
839                 }
840         }
841
842         Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 23, "Replication compulsorily is success.", __FILE__, __LINE__ );
843
844 END:
845         // Thread rusume
846         {
847                 boost::mutex::scoped_lock       lock( replication_thread_mutex );
848                 if      ( replication_flag != EXIT ){
849                          replication_flag = RUNNING;
850                 }
851                 replication_thread_condition.notify_all();
852         }
853 }
854
855 //! Interval Re-setting
856 void            replication::reset(){
857         Logger  logger( LOG_CAT_L7VSD_REPLICATION, 10, "replication::reset", __FILE__, __LINE__ );
858
859         error_code                      ret;
860         unsigned short          value;
861         std::string                     buf;
862         Parameter                       param;
863
864         // Check Parameter exists
865         value = param.get_int( PARAM_COMP_REPLICATION, "interval", ret );
866         if ( ret ){
867                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 20, "Not change re-setting value.", __FILE__, __LINE__ );
868                 return;
869         }
870
871         // Check interval
872         if ( value < MIN_INTERVAL || MAX_INTERVAL < value ){
873                 buf = boost::io::str( boost::format( "Invalid Interval value. value : %d" ) % ( int )value );
874                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 21, buf, __FILE__, __LINE__ );
875                 return;
876         }
877         //set interval
878         replication_info.interval =  value;
879 }
880
881 //! Get Status
882 //! @return REPLICATION_MODE_TAG enumration
883 replication::REPLICATION_MODE_TAG       replication::get_status(){
884         Logger  logger( LOG_CAT_L7VSD_REPLICATION, 11, "replication::get_status", __FILE__, __LINE__ );
885
886         return replication_state.service_status;
887 }
888
889 //! Send function
890 int                     replication::handle_send(){
891         Logger  logger( LOG_CAT_L7VSD_REPLICATION, 12, "replication::handle_send", __FILE__, __LINE__ );
892
893         int send_ret = -1;
894         std::string buf;
895         std::map<std::string, mutex_ptr>::iterator      itr;
896
897         // Check by continuous initialize.
898         if ( REPLICATION_MASTER != replication_state.service_status && REPLICATION_MASTER_STOP != replication_state.service_status ){
899                 // Initialization has already been done.
900                 buf = boost::io::str( boost::format( "Can not send_callback. Mode is different.  mode : %s" ) % replication_mode[( int )replication_state.service_status] );
901                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 22, buf, __FILE__, __LINE__ );
902                 return -1;
903         } else if ( REPLICATION_MASTER_STOP == replication_state.service_status ){ 
904                 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 24, "Can not send Replication data, because mode is MASTER_STOP.", __FILE__, __LINE__ );
905                 return 0;
906         }
907
908         // Replication memory is NULL
909         if ( NULL == replication_state.replication_memory ){
910                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 7, "Replication memory is NULL.", __FILE__, __LINE__ );
911                 return -1;
912         }
913
914         // Component memory is NULL
915         if ( NULL == replication_state.component_memory ){
916                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 8, "Component memory is NULL.", __FILE__, __LINE__ );
917                 return -1;
918         }
919
920         send_ret = send_data();
921         if ( 0 != send_ret ){
922                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 4, "Send data is Failed.", __FILE__, __LINE__ );
923                 return -1;
924         }
925
926         // set last send block number
927         if ( replication_state.last_send_block < replication_state.total_block-1 )      {
928                 replication_state.last_send_block += 1;
929         } else if ( replication_state.last_send_block == replication_state.total_block-1 ){
930                 replication_state.last_send_block = 0;
931         } else {
932                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 23, "Last send block number is illegal.", __FILE__, __LINE__ );
933                 return -1;
934         }
935
936         // surface block number is change
937         if ( replication_state.total_block == replication_state.last_send_block + 1 ){
938                 // Synchronization is executed. 
939                 memcpy( replication_state.replication_memory, replication_state.component_memory, replication_state.total_block*DATA_SIZE );
940
941                 // make new serial
942                 replication_state.surface_block_no = (uint64_t)make_serial();
943                 if ( 0 == replication_state.surface_block_no ){
944                         Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 24, "Could not get serial number .", __FILE__, __LINE__ );
945                         return -1;
946                 }
947         }
948
949         return 0;
950 }
951
952 //! Callback function
953 void            replication::handle_receive( const boost::system::error_code& err, size_t size ){
954         int recv_ret;
955         std::string buf;
956
957         if ( err ){
958                 if ( boost::system::errc::operation_canceled != err.value() ){
959                         Logger::putLogInfo( LOG_CAT_L7VSD_SYSTEM, 1, err.message(), __FILE__, __LINE__ );
960                 }
961                 return;
962         }
963
964         // Check by continuous initialize.
965         if ( REPLICATION_SLAVE != replication_state.service_status && REPLICATION_SLAVE_STOP != replication_state.service_status ){
966                 // Initialization has already been done.
967                 buf = boost::io::str( boost::format( "Can not receive_callback. Mode is different.  mode : %s" ) % replication_mode[( int )replication_state.service_status] );
968                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 25, buf, __FILE__, __LINE__ );
969                 return;
970         } else if ( REPLICATION_SLAVE_STOP == replication_state.service_status ){
971                 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 25, "Can not receive Replication data, because mode is SLAVE_STOP.", __FILE__, __LINE__ );
972                 return;
973         }
974
975         // Replication memory is NULL
976         if ( NULL == replication_state.replication_memory ){
977                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 9, "Replication memory is NULL.", __FILE__, __LINE__ );
978                 return;
979         }
980
981         // Component memory is NULL
982         if ( NULL == replication_state.component_memory ){
983                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 10, "Component memory is NULL.", __FILE__, __LINE__ );
984                 return;
985         }
986
987         // Surface block array memory is NULL
988         if ( NULL == replication_state.surface_block_array_ptr){
989                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 11, "Surface block array pointer is NULL.", __FILE__, __LINE__ );
990                 return;
991         }
992
993         if ( size != sizeof ( struct replication_data_struct ) ){
994                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 5, "Failed in the reception processing of data because of illegal receive size.", __FILE__, __LINE__ );
995                 return;
996         }
997
998         recv_ret = recv_data();
999         if ( 0 != recv_ret ){
1000                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 6, "Failed in the reception processing of data because of illegal receive data.", __FILE__, __LINE__ );
1001                 return;
1002         }
1003
1004         // set surface block
1005         replication_state.surface_block_array_ptr[replication_data.block_num] = replication_data.serial;
1006
1007         // set last recv block number
1008         if ( replication_state.last_recv_block < replication_state.total_block-1 ){
1009                 replication_state.last_recv_block += 1;
1010         } else if ( replication_state.last_recv_block == replication_state.total_block-1 ){
1011                 replication_state.last_recv_block = 0;
1012         } else {
1013                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 26, "Last receive block number is illegal.", __FILE__, __LINE__ );
1014                 return;
1015         }
1016
1017 //std::cout << "slave " << bind_endpoint.address() << ":" << bind_endpoint.port() << "\n";
1018 //      replication_receive_socket.async_receive( boost::asio::buffer( &replication_data, sizeof( struct replication_data_struct ) ),
1019 //                                                                                      boost::bind( &replication::handle_receive, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) );
1020         replication_receive_socket.async_receive_from( boost::asio::buffer( &replication_data, sizeof( struct replication_data_struct ) ),
1021                                                                                         bind_endpoint,
1022                                                                                         boost::bind( &replication::handle_receive, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) );
1023 }
1024
1025 //! Lock replication memory
1026 //! @param[in] component_id is the one to identify the component.
1027 //! @retval 0 Success
1028 //! @retval -1 Error
1029 int                     replication::lock( const std::string& inid ){
1030         Logger  logger( LOG_CAT_L7VSD_REPLICATION, 13, "replication::lock", __FILE__, __LINE__ );
1031
1032         std::map<std::string, mutex_ptr>::iterator      itr;
1033         std::string buf;
1034
1035         itr = replication_mutex.find( inid );
1036         if ( itr == replication_mutex.end() ){
1037                 buf = boost::io::str( boost::format( "Could not find %s." ) % inid );
1038                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 27, buf, __FILE__, __LINE__ );
1039                 return -1;
1040         }
1041
1042         itr->second->lock();
1043
1044         return 0;
1045 }
1046
1047 //! Unlock replication memory
1048 //! @param[in] component_id is the one to identify the component.
1049 //! @retval 0 Success
1050 //! @retval -1 Error
1051 int                     replication::unlock( const std::string& inid ){
1052         Logger  logger( LOG_CAT_L7VSD_REPLICATION, 14, "replication::unlock", __FILE__, __LINE__ );
1053
1054         std::map<std::string, mutex_ptr>::iterator      itr;
1055         std::string buf;
1056
1057         itr = replication_mutex.find( inid );
1058         if ( itr == replication_mutex.end() ){
1059                 buf = boost::io::str( boost::format( "Could not find %s." ) % inid );
1060                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 28, buf, __FILE__, __LINE__ );
1061                 return -1;
1062         }
1063
1064         itr->second->try_lock();
1065         itr->second->unlock();
1066
1067         return 0;
1068 }
1069
1070 //! Refer replication memory mutex
1071 //! @param[in] component_id is the one to identify the component.
1072 //! @param[out] shared_ptr of mutex
1073 //! @retval 0 Success
1074 //! @retval -1 Error
1075 int                     replication::refer_lock_mutex( const std::string& inid, mutex_ptr& outmutex ){
1076         Logger  logger( LOG_CAT_L7VSD_REPLICATION, 15, "replication::refer_lock_mutex", __FILE__, __LINE__ );
1077
1078         std::map<std::string, mutex_ptr>::iterator      itr;
1079         std::string buf;
1080
1081         itr = replication_mutex.find( inid );
1082         if ( itr == replication_mutex.end() ){
1083                 buf = boost::io::str( boost::format( "Could not find %s." ) % inid );
1084                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 29, buf, __FILE__, __LINE__ );
1085                 return -1;
1086         }
1087
1088         outmutex = itr->second;
1089
1090         return 0;
1091 }
1092
1093 //! Parameter Check
1094 //! @retval 0 Success
1095 //! @retval -1 Error
1096 int                     replication::check_parameter(){
1097
1098         int ret = -1;
1099         size_t sum=0;
1100         std::string buf;
1101
1102 //std::cout << "check1 " << replication_info.ip_addr << ":" << replication_info.service_name << "\n";
1103         // Whether IP and the port are effective is confirmed.
1104         try{
1105 //              replication_endpoint = boost::asio::ip::udp::endpoint( boost::asio::ip::address::from_string( replication_info.ip_addr ), boost::lexical_cast<unsigned short>( replication_info.service_name ) );
1106
1107                 boost::asio::ip::udp::resolver                          udp_resolver( service_io );
1108                 boost::asio::ip::udp::resolver::query           udp_query( replication_info.ip_addr, replication_info.service_name );
1109                 boost::asio::ip::udp::resolver::iterator        itr = udp_resolver.resolve( udp_query );
1110                 replication_endpoint = *itr;
1111         }
1112         catch(...){
1113                 buf = boost::io::str( boost::format( "Failed to get IP or Service Name.(%s:%s)" ) % replication_info.ip_addr % replication_info.service_name );
1114                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_ENDPOINT, 1, buf, __FILE__, __LINE__ );
1115                 goto END;
1116         }
1117 //std::cout << "check2 " << replication_endpoint.address() << ":" << replication_endpoint.port() << "\n";
1118
1119         // get ip address from nic
1120         try{
1121                 struct sockaddr_in addr;
1122
1123                 //Networkdevice struct define
1124                 struct ifreq ifr;
1125                 memset( &ifr, 0, sizeof( struct ifreq ) );
1126
1127                 //create socket
1128                 int fd  = socket( AF_INET, SOCK_DGRAM, 0 );
1129                 if ( fd >= 0 ){
1130                         //get networkdevice struct for IPv4
1131                         strncpy( ifr.ifr_name, replication_info.nic.c_str(), IFNAMSIZ-1 );
1132                         ifr.ifr_addr.sa_family = AF_INET;
1133
1134                         if ( ioctl( fd, SIOCGIFADDR, &ifr ) >= 0 ){
1135                                 memcpy( &addr, &(ifr.ifr_addr), sizeof( struct sockaddr_in ) );
1136                         }
1137
1138                         close( fd );
1139                 }
1140
1141                 bind_endpoint = boost::asio::ip::udp::endpoint( boost::asio::ip::address::from_string( inet_ntoa( addr.sin_addr ) ), boost::lexical_cast<unsigned short>( replication_info.service_name ) );
1142         }
1143         catch(...){
1144                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_ENDPOINT, 2, "You can not get IP address from nic.", __FILE__, __LINE__ );
1145                 goto END;
1146         }
1147 //std::cout << "check3 " << bind_endpoint.address() << ":" << bind_endpoint.port() << "\n";
1148
1149         // Interval check
1150         if ( ( MIN_INTERVAL>replication_info.interval ) || ( MAX_INTERVAL<replication_info.interval ) ){
1151                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 30, "Invalid Interval value", __FILE__, __LINE__ );
1152                 goto END;
1153         }
1154         // Components ID check
1155         // Components Size check
1156         if ( 0 == replication_info.component_num ){
1157                 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 26, "Can not get component, because component is 0.", __FILE__, __LINE__ );
1158                 ret = 0;
1159                 goto END;
1160         }
1161         for ( int i=0; i < replication_info.component_num; i++ ){
1162                 sum += replication_info.component_info[i].block_size ;
1163                 for ( int j=i+1; j<replication_info.component_num; j++ ){
1164                         if ( replication_info.component_info[j].id == replication_info.component_info[i].id ){
1165                                 buf = boost::io::str( boost::format( "Component ID was repeated.(%s)" ) % replication_info.component_info[i].id );
1166                                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 31, buf, __FILE__, __LINE__ );
1167                                 goto END;
1168                         }
1169                 }
1170         }
1171         if ( sum > CMP_BLOCK_MAX ){
1172                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 32, "Total component size is too large.", __FILE__, __LINE__ );
1173                 goto END;
1174         }
1175         ret = 0;
1176 END:
1177
1178         Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 27, ( ( 0 == ret ) ? "Parameter Check OK." :  "Parameter Check NG." ), __FILE__, __LINE__ );
1179         return ret;
1180 }
1181
1182
1183
1184 //! Get Replication Memory
1185 //! @return memory Replication memory
1186 //! @retval memory memory get Success
1187 //! @retval NULL Error
1188 void*           replication::getrpl(){
1189         unsigned int total_block = replication_state.total_block;
1190         void*   memory = NULL;
1191
1192         // Get replication memory
1193         memory = malloc( total_block*DATA_SIZE );
1194
1195         // malloc Error
1196         if ( ( void* )NULL == memory ){
1197                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 12, "Replication memory is Malloc Error.", __FILE__, __LINE__ );
1198                 return NULL;
1199         }
1200         memset(memory,0,total_block*DATA_SIZE);
1201
1202         return memory;
1203 }
1204
1205 //! Get Component Memory
1206 //! @return memory Component memory
1207 //! @retval memory memory get Success
1208 //! @retval NULL Error
1209 void*           replication::getcmp(){
1210         unsigned int total_block = replication_state.total_block;
1211         void*   memory = NULL ;
1212
1213         // Get component memory
1214         memory = malloc( total_block*DATA_SIZE );
1215
1216         // malloc Error
1217         if ( ( void* )NULL == memory ){
1218                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 13, "Component memory is Malloc Error.", __FILE__, __LINE__ );
1219                 return NULL;
1220         }
1221         memset(memory,0,total_block*DATA_SIZE);
1222
1223         return memory;
1224 }
1225
1226 //! Get Surface Number Memory
1227 //! @return memory Component memory
1228 //! @retval memory memory get Success
1229 //! @retval NULL Error
1230 uint64_t*       replication::getsrf(){
1231         unsigned int total_block = replication_state.total_block;
1232         uint64_t*       memory = NULL;
1233
1234         // Get memory
1235         memory = ( uint64_t* )malloc( total_block*sizeof(uint64_t) );
1236
1237         // malloc Error
1238         if ( ( uint64_t* )NULL == memory ){
1239                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 14, "Surface info address is Malloc Error.", __FILE__, __LINE__ );
1240                 return NULL;
1241         }
1242         memset( memory, 0, total_block*sizeof(uint64_t) );
1243
1244         return memory;
1245 }
1246
1247 //! Make serial number
1248 //! @return Serial number
1249 //! @retval nonzero Serial number
1250 //! @retval 0 Error
1251 unsigned long long              replication::make_serial(){
1252         unsigned long long int serial_num;
1253         struct timespec current_time;
1254
1255         // get time by clock_gettime
1256         if ( clock_gettime(CLOCK_REALTIME, &current_time) == -1 ){
1257                 // failre.
1258                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 7, "You failed to get of time.", __FILE__, __LINE__ );
1259                 serial_num = 0;
1260         } else {
1261                 // make a serial succeeds.
1262                 serial_num = ( unsigned long long int )current_time.tv_sec * 1000000 + ( unsigned long long int ) current_time.tv_nsec / 1000;
1263         }
1264
1265         return serial_num;
1266 }
1267
1268 //! Send transfer data to standby server
1269 //! @param[in] data Points to input data from external program. This will be send to standby server.
1270 //! @retval 0 Success
1271 //! @retval -1 Error
1272 int                     replication::send_data(){
1273         char*   send_memory;
1274         size_t send_byte;
1275
1276         // make replication data struct
1277         //initialize
1278         memset( &replication_data, 0, sizeof( struct replication_data_struct ) );
1279         // Set replication id
1280         replication_data.id = REPLICATION_ID;
1281         // set block_num (replication_state.last_send_block + 1) and Range check of memory
1282         if ( replication_state.last_send_block < replication_state.total_block - 1 ){
1283                 replication_data.block_num = replication_state.last_send_block + 1;
1284         } else if ( replication_state.last_send_block == replication_state.total_block - 1 ){
1285                 replication_data.block_num = 0;
1286         } else {
1287                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 33, "Send block number is too large.", __FILE__, __LINE__ );
1288                 return -1;
1289         }
1290
1291         // set serial
1292         replication_data.serial = replication_state.surface_block_no;
1293         if ( 1 == replication_data.serial &&  0 == replication_data.block_num ){
1294                 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 28, "Serial number is 1, first send processing.", __FILE__, __LINE__ );
1295         }
1296
1297         // set data size (sizeof(replication_data))
1298         replication_data.size = sizeof( struct replication_data_struct );
1299
1300         if ( replication_data.size > SEND_DATA_SIZE ){
1301                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 34, "Send block data size is too large.", __FILE__, __LINE__ );
1302                 return -1;
1303         }
1304
1305         // set replication data (1 block)
1306         send_memory = ( char* )replication_state.replication_memory + DATA_SIZE*replication_data.block_num;
1307         memcpy( replication_data.data, send_memory, DATA_SIZE );
1308
1309 #if     0
1310         // send to data
1311         send_byte = replication_send_socket.send_to( boost::asio::buffer( &replication_data, sizeof( struct replication_data_struct ) ), replication_endpoint );
1312 #else
1313         boost::system::error_code err;
1314         std::string buf;
1315
1316         // Whether IP and the port are effective is confirmed.
1317         try{
1318 //              replication_endpoint = boost::asio::ip::udp::endpoint( boost::asio::ip::address::from_string( replication_info.ip_addr ), boost::lexical_cast<unsigned short>( replication_info.service_name ) );
1319
1320                 boost::asio::ip::udp::resolver                          udp_resolver( service_io );
1321                 boost::asio::ip::udp::resolver::query           udp_query( replication_info.ip_addr, replication_info.service_name );
1322                 boost::asio::ip::udp::resolver::iterator        itr = udp_resolver.resolve( udp_query );
1323                 replication_endpoint = *itr;
1324         }
1325         catch(...){
1326                 buf = boost::io::str( boost::format( "Failed to get IP or Service Name.(%s:%s)" ) % replication_info.ip_addr % replication_info.service_name );
1327                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_ENDPOINT, 3, buf, __FILE__, __LINE__ );
1328                 return -1;
1329         }
1330
1331 //std::cout << "master " << replication_endpoint.address() << ":" << replication_endpoint.port() << "\n";
1332         replication_send_socket.connect( replication_endpoint, err );
1333         if ( err ){
1334                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 8, err.message(), __FILE__, __LINE__ );
1335                 return -1;
1336         }
1337
1338         // send to data
1339         send_byte = replication_send_socket.send( boost::asio::buffer( &replication_data, sizeof( struct replication_data_struct ) ) );
1340         replication_send_socket.close();
1341 #endif
1342         if ( sizeof( struct replication_data_struct ) != send_byte ){
1343                 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 9, "Data send error.", __FILE__, __LINE__ );
1344                 return -1;
1345         }
1346
1347         return 0;
1348 }
1349
1350 //! Receive transfer data from active server
1351 //! @param[out]  recv_data Points to output data from external program.
1352 //! @retval 0 Success
1353 //! @retval -1 Error
1354 int                     replication::recv_data(){
1355         char    *recv_memory;
1356         std::map<std::string, mutex_ptr>::iterator      itr;
1357
1358         // Check replication ID
1359         if ( replication_data.id != REPLICATION_ID ){
1360                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 35, "Get invalid data.", __FILE__, __LINE__ );
1361                 return -1;
1362         }
1363
1364         // block number is over
1365         if ( replication_data.block_num > replication_state.total_block ){
1366                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 36, "Recv block number is too large.", __FILE__, __LINE__ );
1367                 return -1;
1368         }
1369
1370         // Comparison of serial numbers
1371         if ( replication_data.serial < replication_state.surface_block_array_ptr[replication_data.block_num] ){
1372                 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 37, "Recv replication data is too old.", __FILE__, __LINE__ );
1373                 return -1;
1374         }
1375
1376         // Substitution of version
1377         replication_state.surface_block_array_ptr[replication_data.block_num] = replication_data.serial;
1378
1379         // set recv data
1380         recv_memory = ( char* )replication_state.replication_memory + DATA_SIZE * replication_data.block_num;
1381
1382         // received data.
1383         memcpy( recv_memory, &replication_data.data, DATA_SIZE );
1384
1385         // set surface block
1386         replication_state.surface_block_array_ptr[replication_data.block_num] = replication_data.serial;
1387
1388         // Surface numbers are compared.
1389         for ( unsigned int i = 0; i < replication_state.total_block-1; i++ ){
1390                 if ( replication_state.surface_block_array_ptr[i] != replication_state.surface_block_array_ptr[i+1] ){
1391                         break;
1392                 }
1393                 if ( i == replication_state.total_block-2 ){
1394                         // Lock all compornent area
1395                         for ( itr = replication_mutex.begin(); itr != replication_mutex.end(); itr++ ){
1396                                 itr->second->lock();
1397                         }
1398
1399                         // Synchronization is executed.
1400                         memcpy(replication_state.component_memory, replication_state.replication_memory, replication_state.total_block*DATA_SIZE );
1401
1402                         // Unlock all compornent area
1403                         for ( itr = replication_mutex.begin(); itr != replication_mutex.end(); itr++ ){
1404                                 itr->second->unlock();
1405                         }
1406                 }
1407         }
1408
1409         return 0;
1410 }
1411
1412 //! Release Replication Memory
1413 void            replication::releaserpl(){
1414         if ( NULL != replication_state.replication_memory ){
1415                 free(replication_state.replication_memory);
1416         }
1417         replication_state.replication_memory = NULL;
1418 }
1419
1420 //! Release Components Memory
1421 void            replication::releasecmp(){
1422         if ( NULL != replication_state.component_memory){
1423                 free(replication_state.component_memory);
1424         }
1425         replication_state.component_memory = NULL;
1426 }
1427
1428 //! Release Surface Memory
1429 void            replication::releasesrf(){
1430         if ( NULL != replication_state.surface_block_array_ptr ){
1431                 free(replication_state.surface_block_array_ptr);
1432         }
1433         replication_state.surface_block_array_ptr=NULL;
1434 }
1435
1436 //! Replication thread
1437 void            replication::send_thread(){
1438         bool    mode = false;
1439         REPLICATION_THREAD_TAG  flag;
1440         {
1441                 boost::mutex::scoped_lock lock( replication_thread_mutex );
1442                 flag = replication_flag;
1443         }
1444         for ( ; ; ){
1445                 if ( flag == WAIT ){
1446                         boost::mutex::scoped_lock       lock( replication_thread_mutex );
1447                         replication_thread_condition.wait( lock );
1448                 } else if ( flag == EXIT ){
1449                         break;
1450                 } else {
1451                         if ( false == mode ){
1452                                 usleep( replication_info.interval );
1453                         } else {
1454                                 if ( -1 == handle_send() ){
1455                                 }
1456                         }
1457                         mode = !mode;
1458                 }
1459                 {
1460                         boost::mutex::scoped_lock       lock( replication_thread_mutex );
1461                         flag = replication_flag;
1462                 }
1463         }
1464 }
1465
1466 //! io_service thread
1467 void            replication::service_thread(){
1468         REPLICATION_THREAD_TAG  flag;
1469         {
1470                 boost::mutex::scoped_lock lock( service_thread_mutex );
1471                 flag = service_flag;
1472         }
1473         for ( ; ; ){
1474                 if ( flag == WAIT || flag == WAIT_REQ ){
1475                         boost::mutex::scoped_lock       lock( service_thread_mutex );
1476                         service_flag = WAIT;
1477                         service_thread_condition.wait( lock );
1478                 } else if ( flag == EXIT ){
1479                         break;
1480                 } else {
1481                         service_io.poll();
1482                 }
1483                 {
1484                         boost::mutex::scoped_lock       lock( service_thread_mutex );
1485                         flag = service_flag;
1486                 }
1487         }
1488 }
1489
1490 }       //namespace l7vs