2 * @file replication.cpp
3 * @brief Replication class
5 * L7VSD: Linux Virtual Server for Layer7 Load Balancing
\r
6 * Copyright (C) 2009 NTT COMWARE Corporation.
\r
8 * This program is free software; you can redistribute it and/or
\r
9 * modify it under the terms of the GNU Lesser General Public
\r
10 * License as published by the Free Software Foundation; either
\r
11 * version 2.1 of the License, or (at your option) any later version.
\r
13 * This program is distributed in the hope that it will be useful,
\r
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
\r
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
\r
16 * Lesser General Public License for more details.
\r
18 * You should have received a copy of the GNU Lesser General Public
\r
19 * License along with this library; if not, write to the Free Software
\r
20 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
\r
23 **********************************************************************/
\r
24 #include <boost/lexical_cast.hpp>
25 #include <boost/format.hpp>
26 #include "replication.h"
27 #include "parameter.h"
32 //! Printable names of the replication modes (enum state strings).
//! Indexed with replication_state.service_status cast to int when log messages are built.
33 static const char* replication_mode[] = {
38 "REPLICATION_MASTER_STOP",
39 "REPLICATION_SLAVE_STOP"
43 //! Initialize Replication
//! Reads ip_addr, service_name, nic, interval and the per-component id/size entries from the
//! PARAM_COMP_REPLICATION section, obtains the replication, component and surface memory
//! areas, switches to slave mode and creates the send and service threads.
//! When a required setting is missing or one of the later steps fails, service_status is
//! set to REPLICATION_SINGLE.
46 int replication::initialize(){
47 Logger logger( LOG_CAT_L7VSD_REPLICATION, 1, "replication::initialize", __FILE__, __LINE__ );
50 error_code ip_addr_ret, service_name_ret, nic_ret, interval_ret;
53 // Refuse a second initialization: the state must still be REPLICATION_OUT.
54 if ( REPLICATION_OUT != replication_state.service_status ){
55 // Initialization has already been done.
56 buf = boost::io::str( boost::format( "Initialization is a failure, because initialization has already been done. mode : %s" )
57 % replication_mode[( int )replication_state.service_status] );
58 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 1, buf, __FILE__, __LINE__ );
62 // Read the required parameters; each *_ret records whether the read failed.
63 replication_info.ip_addr = param.get_string( PARAM_COMP_REPLICATION, "ip_addr", ip_addr_ret );
64 replication_info.service_name = param.get_string( PARAM_COMP_REPLICATION, "service_name", service_name_ret );
65 replication_info.nic = param.get_string( PARAM_COMP_REPLICATION, "nic", nic_ret );
66 replication_info.interval = param.get_int( PARAM_COMP_REPLICATION, "interval", interval_ret );
// None of the four required items could be read.
69 if ( ip_addr_ret && service_name_ret && nic_ret && interval_ret ){
70 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 1, "Required item is not set in l7vs.", __FILE__, __LINE__ );
72 replication_state.service_status = REPLICATION_SINGLE;
78 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 2, "IP Address is not set.", __FILE__, __LINE__ );
80 replication_state.service_status = REPLICATION_SINGLE;
// service_name holds the port setting, as the log message below shows.
84 if ( service_name_ret ){
85 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 3, "Port is not set.", __FILE__, __LINE__ );
87 replication_state.service_status = REPLICATION_SINGLE;
92 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 4, "NIC is not set.", __FILE__, __LINE__ );
94 replication_state.service_status = REPLICATION_SINGLE;
99 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 5, "Interval is not set.", __FILE__, __LINE__ );
101 replication_state.service_status = REPLICATION_SINGLE;
105 // Failed in the acquisition of IP
106 if ( replication_info.ip_addr == "" ){
107 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 6, "Could not get IP Address.", __FILE__, __LINE__ );
109 replication_state.service_status = REPLICATION_SINGLE;
113 // Failed in the acquisition of Port
114 if ( replication_info.service_name == "" ){
115 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 7, "Could not get Port.", __FILE__, __LINE__ );
117 replication_state.service_status = REPLICATION_SINGLE;
121 // Failed in the acquisition of NIC
122 if ( replication_info.nic == "" ){
123 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 8, "Could not get NIC.", __FILE__, __LINE__ );
125 replication_state.service_status = REPLICATION_SINGLE;
129 // Variable that sets ID
131 std::string key_size;
132 error_code id_ret, size_ret;
133 mutex_ptr component_mutex;
// Start from an empty per-component mutex map.
135 replication_mutex.clear();
137 // Get Component information
138 for ( int i=0; i<CMP_MAX; i++){
// Parameter keys are numbered with two digits (the "cmponent" spelling is the key name in use).
139 key_id = boost::io::str( boost::format( "cmponent_id_%02d" ) % i );
140 key_size = boost::io::str( boost::format( "cmponent_size_%02d" ) % i );
142 // ID and the Size exist
143 replication_info.component_info[i].id = param.get_string( PARAM_COMP_REPLICATION, key_id, id_ret );
144 replication_info.component_info[i].block_size = param.get_int( PARAM_COMP_REPLICATION, key_size, size_ret );
145 if ( id_ret || size_ret ){
148 if ( replication_info.component_info[i].id == "" ){
149 buf = boost::io::str( boost::format( "Could not get %s." ) % key_id );
150 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 9, buf, __FILE__, __LINE__ );
152 replication_state.service_status = REPLICATION_SINGLE;
156 // A component starts right after the previous component's blocks; the first one starts at block 0.
157 replication_info.component_info[i].block_head = ( 0 == i ? 0 :
158 ( replication_info.component_info[i-1].block_head + replication_info.component_info[i-1].block_size ) );
159 replication_state.total_block += replication_info.component_info[i].block_size;
161 // Number of Component
162 replication_info.component_num++;
// Register one mutex per component, keyed by the component ID.
164 component_mutex = mutex_ptr( new boost::mutex );
165 replication_mutex.insert( std::pair<std::string, mutex_ptr>( replication_info.component_info[i].id, component_mutex ) );
168 // Check the Parameters value
169 if ( 0 != check_parameter() ){
170 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 10, "Failed in parameter check.", __FILE__, __LINE__ );
172 replication_state.service_status = REPLICATION_SINGLE;
176 // The memory is 0 or less.
177 if ( 0 >= replication_state.total_block){
179 replication_state.service_status = REPLICATION_SINGLE;
// Set last_send_block to the highest block index.
183 replication_state.last_send_block = replication_state.total_block-1;
185 // Get the Replication memory
186 replication_state.replication_memory = getrpl();
187 // Confirmation of Replication memory
188 if ( NULL == replication_state.replication_memory ){
189 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 1, "Replication memory is NULL.", __FILE__, __LINE__ );
191 replication_state.service_status = REPLICATION_SINGLE;
194 // Get the Components memory
195 replication_state.component_memory = getcmp();
196 // Confirmation of Components memory
197 if ( NULL == replication_state.component_memory ){
200 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 2, "Components memory is NULL.", __FILE__, __LINE__ );
202 replication_state.service_status = REPLICATION_SINGLE;
206 // Memory for Switch Surface Number
207 replication_state.surface_block_array_ptr = getsrf();
208 if ( NULL == replication_state.surface_block_array_ptr ){
212 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 3, "Surface infomation memory is NULL.", __FILE__, __LINE__ );
214 replication_state.service_status = REPLICATION_SINGLE;
218 // Status Set to Slave
219 if ( 0 != set_slave() ){
225 replication_state.service_status = REPLICATION_SINGLE;
229 replication_state.service_status = REPLICATION_SLAVE;
// Create the sending thread with its flag set to WAIT.
232 boost::mutex::scoped_lock lock( replication_thread_mutex );
234 replication_flag = WAIT;
236 replication_thread_ptr = thread_ptr( new boost::thread( boost::bind ( &replication::send_thread, this ) ) );
// Create the service thread with its flag set to RUNNING.
239 boost::mutex::scoped_lock lock( service_thread_mutex );
241 service_flag = RUNNING;
243 service_thread_ptr = thread_ptr( new boost::thread( boost::bind ( &replication::service_thread, this ) ) );
245 buf = boost::io::str( boost::format( "Initialized in %s mode." ) % replication_mode[( int )replication_state.service_status] );
246 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 2, buf, __FILE__, __LINE__ );
251 //! Finalize Replication
//! Signals and joins the replication and service threads, closes the send and receive
//! sockets if they are open, clears replication_state, replication_info, the mutex map and
//! both endpoints, and finally sets service_status to REPLICATION_OUT.
252 void replication::finalize(){
253 Logger logger( LOG_CAT_L7VSD_REPLICATION, 2, "replication::finalize", __FILE__, __LINE__ );
// Ask the replication thread to exit and wake it up.
256 boost::mutex::scoped_lock lock( replication_thread_mutex );
258 replication_flag = EXIT;
259 replication_thread_condition.notify_all();
261 if ( replication_thread_ptr ){
262 replication_thread_ptr->join();
266 if ( replication_send_socket.is_open() ){
267 replication_send_socket.close();
269 if ( replication_receive_socket.is_open() ){
270 replication_receive_socket.close();
// Wake the service thread and wait for it to finish.
274 boost::mutex::scoped_lock lock( service_thread_mutex );
277 service_thread_condition.notify_all();
279 if ( service_thread_ptr ){
280 service_thread_ptr->join();
283 // Release replication memory
285 // Release component memory
287 // Release surface block memory
290 // reset of replication_state
291 replication_state.send_time = 0;
292 replication_state.last_send_block = 0;
293 replication_state.last_recv_block = 0;
294 replication_state.total_block = 0;
// The surface block number is reset to 1, not 0.
295 replication_state.surface_block_no = 1;
297 // reset of replication_info
298 replication_info.ip_addr = "";
299 replication_info.service_name = "";
300 replication_info.nic = "";
301 replication_info.interval = 0;
302 replication_info.component_num = 0;
303 for ( int loop = 0; loop < CMP_MAX; loop++ ){
304 replication_info.component_info[loop].id = "";
305 replication_info.component_info[loop].block_head = 0;
306 replication_info.component_info[loop].block_size = 0;
308 replication_mutex.clear();
// Replace both endpoints with default-constructed ones.
310 replication_endpoint = boost::asio::ip::udp::endpoint();
311 bind_endpoint = boost::asio::ip::udp::endpoint();
// REPLICATION_OUT is the state initialize() requires before it runs.
314 replication_state.service_status = REPLICATION_OUT;
317 //! Switch Slave to Master
//! In REPLICATION_SLAVE or REPLICATION_SLAVE_STOP the component area is copied into the
//! replication area and the mode becomes REPLICATION_MASTER (replication thread flag set to
//! RUNNING and notified) or REPLICATION_MASTER_STOP. The "Switch to master NG" error path
//! sets REPLICATION_SINGLE. REPLICATION_SINGLE and the remaining modes are reported
//! through the logger.
318 void replication::switch_to_master(){
319 Logger logger( LOG_CAT_L7VSD_REPLICATION, 3, "replication::switch_to_master", __FILE__, __LINE__ );
324 switch ( replication_state.service_status ){
325 case REPLICATION_SLAVE:
326 case REPLICATION_SLAVE_STOP:
330 replication_state.service_status = REPLICATION_SINGLE;
336 buf = boost::io::str( boost::format( "Switch to master NG. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
337 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 11, buf, __FILE__, __LINE__ );
339 // Copy from component area to replication area.
340 memcpy( replication_state.replication_memory, replication_state.component_memory, replication_state.total_block*DATA_SIZE );
// A running slave becomes a running master.
342 if ( REPLICATION_SLAVE == replication_state.service_status ){
344 replication_state.service_status = REPLICATION_MASTER;
// Let the replication thread run, unless it has been told to exit.
347 boost::mutex::scoped_lock lock( replication_thread_mutex );
348 if ( replication_flag != EXIT ){
349 replication_flag = RUNNING;
351 replication_thread_condition.notify_all();
355 replication_state.service_status = REPLICATION_MASTER_STOP;
358 buf = boost::io::str( boost::format( "Switch to master OK. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
359 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 3, buf, __FILE__, __LINE__ );
362 case REPLICATION_SINGLE:
363 buf = boost::io::str( boost::format( "Starting by %s, doesn't shift to MASTER." ) % replication_mode[( int )replication_state.service_status] );
364 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 4, buf, __FILE__, __LINE__ );
367 buf = boost::io::str( boost::format( "Can not switch to master. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
368 Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 1, buf, __FILE__, __LINE__ );
//! Socket and service-thread handling for master mode.
//! Closes the send and receive sockets if they are open, moves a RUNNING service_flag to
//! WAIT_REQ and loops until the flag leaves WAIT_REQ, then sets the flag back to RUNNING
//! (unless it is EXIT) and notifies service_thread_condition.
374 int replication::set_master()
377 boost::system::error_code err;
380 if ( replication_send_socket.is_open() ){
381 replication_send_socket.close();
383 if ( replication_receive_socket.is_open() ){
// Cancel outstanding asynchronous operations before closing.
384 replication_receive_socket.cancel();
385 replication_receive_socket.close();
389 boost::mutex::scoped_lock lock( service_thread_mutex );
391 if ( service_flag == RUNNING ){
392 service_flag = WAIT_REQ;
// Wait until the flag is changed away from WAIT_REQ
// (presumably by the service thread - confirm against service_thread()).
397 while ( service_flag == WAIT_REQ ){
402 boost::mutex::scoped_lock lock( service_thread_mutex );
404 if ( service_flag != EXIT ){
406 service_flag = RUNNING;
408 service_thread_condition.notify_all();
412 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 5, "Initialization of send socket is success.", __FILE__, __LINE__ );
418 //! Switch Master to Slave
//! In REPLICATION_MASTER or REPLICATION_MASTER_STOP the replication thread flag is set to
//! WAIT (unless it is EXIT), the replication area is zero-filled and the mode becomes
//! REPLICATION_SLAVE or REPLICATION_SLAVE_STOP. The "Switch to slave NG" error path sets
//! REPLICATION_SINGLE. REPLICATION_SINGLE and the remaining modes are reported through
//! the logger.
419 void replication::switch_to_slave(){
420 Logger logger( LOG_CAT_L7VSD_REPLICATION, 4, "replication::switch_to_slave", __FILE__, __LINE__ );
425 switch ( replication_state.service_status ){
426 case REPLICATION_MASTER:
427 case REPLICATION_MASTER_STOP:
// Put the replication thread into WAIT, unless it has been told to exit.
430 boost::mutex::scoped_lock lock( replication_thread_mutex );
431 if ( replication_flag != EXIT ){
432 replication_flag = WAIT;
439 replication_state.service_status = REPLICATION_SINGLE;
445 buf = boost::io::str( boost::format( "Switch to slave NG. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
446 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 12, buf, __FILE__, __LINE__ );
448 // Zero-fill the whole replication area (total_block * DATA_SIZE bytes).
449 memset( replication_state.replication_memory, '\0', replication_state.total_block * DATA_SIZE );
// A running master becomes a running slave.
451 if ( REPLICATION_MASTER == replication_state.service_status ){
453 replication_state.service_status = REPLICATION_SLAVE;
456 replication_state.service_status = REPLICATION_SLAVE_STOP;
459 buf = boost::io::str( boost::format( "Switch to slave OK. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
460 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 6, buf, __FILE__, __LINE__ );
463 case REPLICATION_SINGLE:
464 buf = boost::io::str( boost::format( "Starting by %s, doesn't shift to SLAVE." ) % replication_mode[( int )replication_state.service_status] );
465 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 7, buf, __FILE__, __LINE__ );
468 buf = boost::io::str( boost::format( "Can not switch to slave. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
469 Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 2, buf, __FILE__, __LINE__ );
//! Socket and service-thread handling for slave mode.
//! Closes the send and receive sockets if they are open, moves service_flag to WAIT_REQ
//! (unless it is EXIT) and loops until the flag leaves WAIT_REQ, opens and binds the receive
//! socket on bind_endpoint, queues an asynchronous receive handled by handle_receive, then
//! sets service_flag back to RUNNING (unless it is EXIT) and notifies service_thread_condition.
475 int replication::set_slave()
478 boost::system::error_code err;
481 if ( replication_send_socket.is_open() ){
482 replication_send_socket.close();
484 if ( replication_receive_socket.is_open() ){
// Cancel outstanding asynchronous operations before closing.
485 replication_receive_socket.cancel();
486 replication_receive_socket.close();
490 boost::mutex::scoped_lock lock( service_thread_mutex );
492 if ( service_flag != EXIT ){
493 service_flag = WAIT_REQ;
// Wait until the flag is changed away from WAIT_REQ
// (presumably by the service thread - confirm against service_thread()).
497 while ( service_flag == WAIT_REQ ){
501 // make receive socket
502 //std::cout << "slave " << bind_endpoint.address() << ":" << bind_endpoint.port() << "\n";
503 replication_receive_socket.open( bind_endpoint.protocol(), err );
505 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 1, err.message(), __FILE__, __LINE__ );
508 replication_receive_socket.bind( bind_endpoint, err );
510 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 2, err.message(), __FILE__, __LINE__ );
514 //std::cout << "slave " << bind_endpoint.address() << ":" << bind_endpoint.port() << "\n";
// Queue an asynchronous receive into replication_data; handle_receive is the completion handler.
515 replication_receive_socket.async_receive_from( boost::asio::buffer( &replication_data, sizeof( struct replication_data_struct ) ),
517 boost::bind( &replication::handle_receive, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) );
520 boost::mutex::scoped_lock lock( service_thread_mutex );
522 if ( service_flag != EXIT ){
524 service_flag = RUNNING;
526 service_thread_condition.notify_all();
529 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 8, "Initialization of receive socket is success.", __FILE__, __LINE__ );
534 //! Hand out the memory area that belongs to a component
535 //! @param[in] inid ID that identifies the component.
536 //! @param[out] outsize number of blocks assigned to the component (its block_size)
537 //! @return address computed as component_memory + block_head * DATA_SIZE for the matching component
538 //! @retval nonnull address inside the component memory area
539 //! @retval NULL Error
540 void* replication::pay_memory( const std::string& inid, unsigned int& outsize ){
541 Logger logger( LOG_CAT_L7VSD_REPLICATION, 5, "replication::pay_memory", __FILE__, __LINE__ );
547 // Check replication mode.
548 if ( REPLICATION_OUT == replication_state.service_status || REPLICATION_SINGLE == replication_state.service_status){
549 // Check mode that can use the replication.
550 buf = boost::io::str( boost::format( "Improper mode for Replication. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
551 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 9, buf, __FILE__, __LINE__ );
555 if ( NULL == replication_state.component_memory ){
556 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 4, "You can't get memory. Component memory is NULL.", __FILE__, __LINE__ );
560 // search Component ID
561 for ( int i = 0; i < replication_info.component_num; i++ ){
562 // Demand ID is compared with Component ID
563 if ( inid == replication_info.component_info[i].id ){
565 if ( 0 == replication_info.component_info[i].block_size ){
566 Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 3, "Component block size is 0.", __FILE__, __LINE__ );
// The head block must lie within the range 0..total_block.
571 if ( replication_info.component_info[i].block_head < 0 || replication_info.component_info[i].block_head > replication_state.total_block ){
572 buf = boost::io::str( boost::format( "Too many component block. Max is %d." ) % replication_state.total_block );
573 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 13, buf, __FILE__, __LINE__ );
576 // Pay memory address
577 ret = ( char * )replication_state.component_memory + replication_info.component_info[i].block_head * DATA_SIZE;
578 // Number of blocks of ID was returned.
579 outsize = replication_info.component_info[i].block_size;
583 sprintf( str, "Component Info ID : \"%s\". Block size : %d . Head Block No : %d/ Pay memory : %p ",
584 replication_info.component_info[i].id.c_str(),
585 replication_info.component_info[i].block_size,
586 replication_info.component_info[i].block_head,
588 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 10, str, __FILE__, __LINE__ );
593 buf = boost::io::str( boost::format( "Unknown component ID. Component ID : %s" ) % inid );
594 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 14, buf, __FILE__, __LINE__ );
//! Dump the replication memory to the log.
//! Logs the current mode and total block count, then every block of the replication area as
//! rows of hexadecimal bytes. In REPLICATION_OUT / REPLICATION_SINGLE, or when the
//! replication memory is NULL, a warning is logged instead.
600 void replication::dump_memory(){
601 Logger logger( LOG_CAT_L7VSD_REPLICATION, 6, "replication::dump_memory", __FILE__, __LINE__ );
610 // Check replication mode.
611 if ( REPLICATION_OUT == replication_state.service_status || REPLICATION_SINGLE == replication_state.service_status){
612 buf = boost::io::str( boost::format( "Replication memory dump failure. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
613 Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 4, buf, __FILE__, __LINE__ );
617 if ( NULL == replication_state.replication_memory ){
618 Logger::putLogWarn( LOG_CAT_L7VSD_SYSTEM_MEMORY, 1, "Replication memory is NULL.", __FILE__, __LINE__ );
// Size of the replication area in bytes.
623 size = DATA_SIZE * replication_state.total_block;
625 Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 5, "Can not get Replication memory", __FILE__, __LINE__ );
630 p = ( unsigned char* )replication_state.replication_memory;
633 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 11, "Replication Dump Start ----------------------------", __FILE__, __LINE__ );
634 buf = boost::io::str( boost::format( "Mode is [ %s ]." ) % replication_mode[( int )replication_state.service_status] );
635 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 12, buf, __FILE__, __LINE__ );
636 buf = boost::io::str( boost::format( "Total Block is [ %u ]" ) % replication_state.total_block );
637 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 13, buf, __FILE__, __LINE__ );
639 // Emit each block as hexadecimal text through the logger.
640 for ( h = 0; h < size / DATA_SIZE; h++ ){
641 buf = boost::io::str( boost::format( "Block Number [ %d ]" ) % h );
642 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 14, buf, __FILE__, __LINE__ );
// One log line per LOG_DATA_WIDTH bytes; the format below prints 16 bytes
// (assumes LOG_DATA_WIDTH is 16 - TODO confirm).
644 for ( i = 0; i < DATA_SIZE / LOG_DATA_WIDTH; i++ ){
645 head = p + h * DATA_SIZE + i * LOG_DATA_WIDTH;
647 // Cast each byte to int: boost::format ignores the requested width for char arguments.
648 buf = boost::io::str( boost::format( "%02hhX %02hhX %02hhX %02hhX %02hhX %02hhX %02hhX %02hhX "
649 "%02hhX %02hhX %02hhX %02hhX %02hhX %02hhX %02hhX %02hhX" )
650 % ( int )*head % ( int )*(head+1) % ( int )*(head+2) % ( int )*(head+3)
651 % ( int )*(head+4) % ( int )*(head+5) % ( int )*(head+6)% ( int )*(head+7)
652 % ( int )*(head+8) % ( int )*(head+9) % ( int )*(head+10) % ( int )*(head+11)
653 % ( int )*(head+12) % ( int )*(head+13) % ( int )*(head+14) % ( int )*(head+15) );
654 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 15, buf, __FILE__, __LINE__ );
657 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 16, "Replication Dump End ------------------------------", __FILE__, __LINE__ );
660 //! Change status to active
//! REPLICATION_MASTER_STOP becomes REPLICATION_MASTER (the replication thread flag is set to
//! RUNNING unless it is EXIT, and the thread is notified); REPLICATION_SLAVE_STOP becomes
//! REPLICATION_SLAVE. Modes that are already started, and all other modes, produce a warning.
661 void replication::start(){
662 Logger logger( LOG_CAT_L7VSD_REPLICATION, 7, "replication::start", __FILE__, __LINE__ );
666 switch ( replication_state.service_status ){
667 case REPLICATION_MASTER_STOP:
668 replication_state.service_status = REPLICATION_MASTER;
// Let the replication thread run, unless it has been told to exit.
671 boost::mutex::scoped_lock lock( replication_thread_mutex );
672 if ( replication_flag != EXIT ){
673 replication_flag = RUNNING;
675 replication_thread_condition.notify_all();
678 buf = boost::io::str( boost::format( "Replication start. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
679 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 17, buf, __FILE__, __LINE__ );
681 case REPLICATION_SLAVE_STOP:
682 replication_state.service_status = REPLICATION_SLAVE;
683 buf = boost::io::str( boost::format( "Replication start. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
684 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 18, buf, __FILE__, __LINE__ );
686 case REPLICATION_MASTER:
687 buf = boost::io::str( boost::format( "Could not MASTER start, because already start. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
688 Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 6, buf, __FILE__, __LINE__ );
690 case REPLICATION_SLAVE:
// NOTE(review): "SALVE" in the message below looks like a typo for "SLAVE"; left as is because it is runtime log text.
691 buf = boost::io::str( boost::format( "Could not SALVE start, because already start. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
692 Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 7, buf, __FILE__, __LINE__ );
695 buf = boost::io::str( boost::format( "Could not start, because mode is %s." ) % replication_mode[( int )replication_state.service_status] );
696 Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 8, buf, __FILE__, __LINE__ );
701 //! Change status to stopped
//! REPLICATION_MASTER becomes REPLICATION_MASTER_STOP (the replication thread flag is set to
//! WAIT unless it is EXIT); REPLICATION_SLAVE becomes REPLICATION_SLAVE_STOP. Modes that
//! are already stopped, and all other modes, produce a warning.
702 void replication::stop(){
703 Logger logger( LOG_CAT_L7VSD_REPLICATION, 8, "replication::stop", __FILE__, __LINE__ );
707 switch ( replication_state.service_status ){
708 case REPLICATION_MASTER:
// Put the replication thread into WAIT, unless it has been told to exit.
711 boost::mutex::scoped_lock lock( replication_thread_mutex );
712 if ( replication_flag != EXIT ){
713 replication_flag = WAIT;
717 replication_state.service_status = REPLICATION_MASTER_STOP;
718 buf = boost::io::str( boost::format( "Replication stop. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
719 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 19, buf, __FILE__, __LINE__ );
721 case REPLICATION_SLAVE:
722 replication_state.service_status = REPLICATION_SLAVE_STOP;
723 buf = boost::io::str( boost::format( "Replication stop. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
724 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 20, buf, __FILE__, __LINE__ );
726 case REPLICATION_MASTER_STOP:
727 buf = boost::io::str( boost::format( "Could not MASTER stop, because already stop. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
728 Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 9, buf, __FILE__, __LINE__ );
730 case REPLICATION_SLAVE_STOP:
// NOTE(review): "SALVE" in the message below looks like a typo for "SLAVE"; left as is because it is runtime log text.
731 buf = boost::io::str( boost::format( "Could not SALVE stop, because already stop. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
732 Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 10, buf, __FILE__, __LINE__ );
// This is the stop path, so the warning says "stop" (it used to repeat the text of start()).
735 buf = boost::io::str( boost::format( "Could not stop, because mode is %s." ) % replication_mode[( int )replication_state.service_status] );
736 Logger::putLogWarn( LOG_CAT_L7VSD_REPLICATION, 11, buf, __FILE__, __LINE__ );
741 //! Forced ("compulsory") replication of every block
//! Only runs in REPLICATION_MASTER. Sets the replication thread flag to WAIT, then for each of
//! the total_block blocks sleeps for the "compulsorily_interval" parameter, calls send_data()
//! and advances last_send_block. Afterwards the replication thread flag is set back to
//! RUNNING (unless it is EXIT) and the thread is notified.
742 void replication::force_replicate(){
743 Logger logger( LOG_CAT_L7VSD_REPLICATION, 9, "replication::force_replicate", __FILE__, __LINE__ );
746 error_code interval_ret;
748 struct timespec time;
753 // Forced replication is only possible in master mode.
754 if ( REPLICATION_MASTER != replication_state.service_status && REPLICATION_MASTER_STOP != replication_state.service_status ){
755 // Neither MASTER nor MASTER_STOP: report an error.
756 buf = boost::io::str( boost::format( "Could not compulsion replication. Mode is different. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
757 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 15, buf, __FILE__, __LINE__ );
759 } else if ( REPLICATION_MASTER_STOP == replication_state.service_status ){
760 buf = boost::io::str( boost::format( "Can not replication compulsorily, because mode is %s." ) % replication_mode[( int )replication_state.service_status] );
761 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 21, buf, __FILE__, __LINE__ );
765 // Replication memory is NULL
766 if ( NULL == replication_state.replication_memory ){
767 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 5, "Replication memory is NULL.", __FILE__, __LINE__ );
771 // Component memory is NULL
772 if ( NULL == replication_state.component_memory ){
773 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 6, "Component memory is NULL.", __FILE__, __LINE__ );
// Interval between two sends during the forced replication.
780 ms_time = param.get_int( PARAM_COMP_REPLICATION, "compulsorily_interval", interval_ret );
783 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 16, "Could not get interval value for replication compulsorily.", __FILE__, __LINE__ );
787 if ( ms_time < MIN_COMPULSORILRY_INTERVAL || MAX_COMPULSORILRY_INTERVAL < ms_time ){
788 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 17, "Invalid compulsorily interval.", __FILE__, __LINE__ );
// Put the replication thread into WAIT while this function sends, unless it has been told to exit.
794 boost::mutex::scoped_lock lock( replication_thread_mutex );
795 if ( replication_flag != EXIT ){
796 replication_flag = WAIT;
// Convert the millisecond interval to nanoseconds.
// NOTE(review): nanosleep requires tv_nsec below 1,000,000,000 - confirm MAX_COMPULSORILRY_INTERVAL is under 1000.
800 time.tv_nsec = ( long )( ms_time * 1000000 );
802 // Set last_send_block to the highest block index.
803 replication_state.last_send_block = replication_state.total_block-1;
805 for ( unsigned int i = 0; i < replication_state.total_block; i++ ){
806 // set compulsorily interval.
807 nanosleep( &time, NULL );
809 send_ret = send_data();
811 if ( 0 != send_ret ){
812 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 3, "Send data is Failed.", __FILE__, __LINE__ );
816 // Advance last_send_block, wrapping to 0 after the final block index.
817 if ( replication_state.last_send_block < replication_state.total_block-1 ){
818 replication_state.last_send_block += 1;
819 } else if ( replication_state.last_send_block == replication_state.total_block-1 ){
820 replication_state.last_send_block = 0;
822 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 18, "Last send block number is illegal.", __FILE__, __LINE__ );
825 buf = boost::io::str( boost::format( "Data sending succeeded. Send block number : %u Version : %llu" ) % replication_state.last_send_block % (unsigned long long)replication_state.surface_block_no );
826 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 22, buf, __FILE__, __LINE__ );
828 // When last_send_block has reached the final index, resynchronize and take a new serial.
829 if ( replication_state.total_block == replication_state.last_send_block + 1 ){
830 // Synchronization is executed: copy the component area into the replication area.
831 memcpy( replication_state.replication_memory, replication_state.component_memory, replication_state.total_block*DATA_SIZE );
// A serial of 0 is treated as a failure of make_serial().
834 replication_state.surface_block_no = (uint64_t)make_serial();
835 if ( 0 == replication_state.surface_block_no ){
836 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 19, "Could not get serial number.", __FILE__, __LINE__ );
842 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 23, "Replication compulsorily is success.", __FILE__, __LINE__ );
// Let the replication thread run again, unless it has been told to exit.
847 boost::mutex::scoped_lock lock( replication_thread_mutex );
848 if ( replication_flag != EXIT ){
849 replication_flag = RUNNING;
851 replication_thread_condition.notify_all();
855 //! Interval Re-setting
//! Re-reads the "interval" parameter from the PARAM_COMP_REPLICATION section and stores it in
//! replication_info.interval. Values outside MIN_INTERVAL..MAX_INTERVAL are reported as an error.
856 void replication::reset(){
857 Logger logger( LOG_CAT_L7VSD_REPLICATION, 10, "replication::reset", __FILE__, __LINE__ );
860 unsigned short value;
864 // Check Parameter exists
865 value = param.get_int( PARAM_COMP_REPLICATION, "interval", ret );
867 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 20, "Not change re-setting value.", __FILE__, __LINE__ );
// Range check against the allowed interval limits.
872 if ( value < MIN_INTERVAL || MAX_INTERVAL < value ){
873 buf = boost::io::str( boost::format( "Invalid Interval value. value : %d" ) % ( int )value );
874 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 21, buf, __FILE__, __LINE__ );
878 replication_info.interval = value;
//! Get the current replication mode
882 //! @return current replication_state.service_status (a REPLICATION_MODE_TAG enumeration value)
883 replication::REPLICATION_MODE_TAG replication::get_status(){
884 Logger logger( LOG_CAT_L7VSD_REPLICATION, 11, "replication::get_status", __FILE__, __LINE__ );
886 return replication_state.service_status;
//! Send step of the replication
//! In REPLICATION_MASTER, calls send_data() once and advances last_send_block, wrapping to 0
//! after the final block index. When last_send_block has reached the final index, the component
//! area is copied into the replication area and a new serial is taken from make_serial().
890 int replication::handle_send(){
891 Logger logger( LOG_CAT_L7VSD_REPLICATION, 12, "replication::handle_send", __FILE__, __LINE__ );
895 std::map<std::string, mutex_ptr>::iterator itr;
897 // Sending is only possible in master mode.
898 if ( REPLICATION_MASTER != replication_state.service_status && REPLICATION_MASTER_STOP != replication_state.service_status ){
899 // Neither MASTER nor MASTER_STOP: report an error.
900 buf = boost::io::str( boost::format( "Can not send_callback. Mode is different. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
901 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 22, buf, __FILE__, __LINE__ );
903 } else if ( REPLICATION_MASTER_STOP == replication_state.service_status ){
904 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 24, "Can not send Replication data, because mode is MASTER_STOP.", __FILE__, __LINE__ );
908 // Replication memory is NULL
909 if ( NULL == replication_state.replication_memory ){
910 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 7, "Replication memory is NULL.", __FILE__, __LINE__ );
914 // Component memory is NULL
915 if ( NULL == replication_state.component_memory ){
916 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 8, "Component memory is NULL.", __FILE__, __LINE__ );
920 send_ret = send_data();
921 if ( 0 != send_ret ){
922 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 4, "Send data is Failed.", __FILE__, __LINE__ );
926 // Advance last_send_block, wrapping to 0 after the final block index.
927 if ( replication_state.last_send_block < replication_state.total_block-1 ) {
928 replication_state.last_send_block += 1;
929 } else if ( replication_state.last_send_block == replication_state.total_block-1 ){
930 replication_state.last_send_block = 0;
932 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 23, "Last send block number is illegal.", __FILE__, __LINE__ );
936 // When last_send_block has reached the final index, resynchronize and take a new serial.
937 if ( replication_state.total_block == replication_state.last_send_block + 1 ){
938 // Synchronization is executed: copy the component area into the replication area.
939 memcpy( replication_state.replication_memory, replication_state.component_memory, replication_state.total_block*DATA_SIZE );
// A serial of 0 is treated as a failure of make_serial().
942 replication_state.surface_block_no = (uint64_t)make_serial();
943 if ( 0 == replication_state.surface_block_no ){
944 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 24, "Could not get serial number .", __FILE__, __LINE__ );
952 //! Callback function
//! Completion handler bound to async_receive_from() on replication_receive_socket.
//! Checks the service mode and the working buffers, hands the packet held in
//! replication_data to recv_data(), updates the receive bookkeeping and queues the next receive.
//! @param[in] err result of the asynchronous receive
//! @param[in] size number of bytes received
953 void replication::handle_receive( const boost::system::error_code& err, size_t size ){
// An error value other than operation_canceled is logged at info level.
958 if ( boost::system::errc::operation_canceled != err.value() ){
959 Logger::putLogInfo( LOG_CAT_L7VSD_SYSTEM, 1, err.message(), __FILE__, __LINE__ );
964 // Check the service mode: anything other than SLAVE / SLAVE_STOP is an error.
965 if ( REPLICATION_SLAVE != replication_state.service_status && REPLICATION_SLAVE_STOP != replication_state.service_status ){
966 // The error log names the current mode.
967 buf = boost::io::str( boost::format( "Can not receive_callback. Mode is different. mode : %s" ) % replication_mode[( int )replication_state.service_status] );
968 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 25, buf, __FILE__, __LINE__ );
// SLAVE_STOP is reported with an info log only: data is not accepted while stopped.
970 } else if ( REPLICATION_SLAVE_STOP == replication_state.service_status ){
971 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 25, "Can not receive Replication data, because mode is SLAVE_STOP.", __FILE__, __LINE__ );
975 // Replication memory is NULL
976 if ( NULL == replication_state.replication_memory ){
977 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 9, "Replication memory is NULL.", __FILE__, __LINE__ );
981 // Component memory is NULL
982 if ( NULL == replication_state.component_memory ){
983 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 10, "Component memory is NULL.", __FILE__, __LINE__ );
987 // Surface block array memory is NULL
988 if ( NULL == replication_state.surface_block_array_ptr){
989 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 11, "Surface block array pointer is NULL.", __FILE__, __LINE__ );
// The received datagram must be exactly one replication_data_struct.
993 if ( size != sizeof ( struct replication_data_struct ) ){
994 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 5, "Failed in the reception processing of data because of illegal receive size.", __FILE__, __LINE__ );
// Validate the packet and copy its block into replication memory.
998 recv_ret = recv_data();
999 if ( 0 != recv_ret ){
1000 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 6, "Failed in the reception processing of data because of illegal receive data.", __FILE__, __LINE__ );
1004 // set surface block
1005 replication_state.surface_block_array_ptr[replication_data.block_num] = replication_data.serial;
1007 // set last recv block number
// last_recv_block advances by one and wraps to 0 after total_block-1;
// any value outside 0 .. total_block-1 is logged as illegal.
1008 if ( replication_state.last_recv_block < replication_state.total_block-1 ){
1009 replication_state.last_recv_block += 1;
1010 } else if ( replication_state.last_recv_block == replication_state.total_block-1 ){
1011 replication_state.last_recv_block = 0;
1013 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 26, "Last receive block number is illegal.", __FILE__, __LINE__ );
1017 //std::cout << "slave " << bind_endpoint.address() << ":" << bind_endpoint.port() << "\n";
1018 // replication_receive_socket.async_receive( boost::asio::buffer( &replication_data, sizeof( struct replication_data_struct ) ),
1019 // boost::bind( &replication::handle_receive, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) );
// Queue the next asynchronous receive into replication_data with this function as the handler.
1020 replication_receive_socket.async_receive_from( boost::asio::buffer( &replication_data, sizeof( struct replication_data_struct ) ),
1022 boost::bind( &replication::handle_receive, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) );
1025 //! Lock replication memory
//! Acquires the mutex registered in replication_mutex under the given ID.
1026 //! @param[in] inid ID identifying the component; used as the key into replication_mutex.
1027 //! @retval 0 Success
1028 //! @retval -1 Error
1029 int replication::lock( const std::string& inid ){
1030 Logger logger( LOG_CAT_L7VSD_REPLICATION, 13, "replication::lock", __FILE__, __LINE__ );
1032 std::map<std::string, mutex_ptr>::iterator itr;
// Look up the mutex for this ID; an unknown ID is logged as an error.
1035 itr = replication_mutex.find( inid );
1036 if ( itr == replication_mutex.end() ){
1037 buf = boost::io::str( boost::format( "Could not find %s." ) % inid );
1038 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 27, buf, __FILE__, __LINE__ );
// Acquire the mutex that was found.
1042 itr->second->lock();
1047 //! Unlock replication memory
//! Releases the mutex registered in replication_mutex under the given ID.
1048 //! @param[in] inid ID identifying the component; used as the key into replication_mutex.
1049 //! @retval 0 Success
1050 //! @retval -1 Error
1051 int replication::unlock( const std::string& inid ){
1052 Logger logger( LOG_CAT_L7VSD_REPLICATION, 14, "replication::unlock", __FILE__, __LINE__ );
1054 std::map<std::string, mutex_ptr>::iterator itr;
// Look up the mutex for this ID; an unknown ID is logged as an error.
1057 itr = replication_mutex.find( inid );
1058 if ( itr == replication_mutex.end() ){
1059 buf = boost::io::str( boost::format( "Could not find %s." ) % inid );
1060 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 28, buf, __FILE__, __LINE__ );
// try_lock() is called first and its result is ignored, then the mutex is unlocked.
// NOTE(review): presumably this guarantees unlock() is never called on a mutex that
// is not locked; it also means a lock currently held by another caller is released
// here. Confirm that this is intended.
1064 itr->second->try_lock();
1065 itr->second->unlock();
1070 //! Refer replication memory mutex
//! Hands out the mutex pointer registered in replication_mutex under the given ID.
1071 //! @param[in] inid ID identifying the component; used as the key into replication_mutex.
1072 //! @param[out] outmutex receives the mutex pointer stored for that ID
1073 //! @retval 0 Success
1074 //! @retval -1 Error
1075 int replication::refer_lock_mutex( const std::string& inid, mutex_ptr& outmutex ){
1076 Logger logger( LOG_CAT_L7VSD_REPLICATION, 15, "replication::refer_lock_mutex", __FILE__, __LINE__ );
1078 std::map<std::string, mutex_ptr>::iterator itr;
// Look up the mutex for this ID; an unknown ID is logged as an error.
1081 itr = replication_mutex.find( inid );
1082 if ( itr == replication_mutex.end() ){
1083 buf = boost::io::str( boost::format( "Could not find %s." ) % inid );
1084 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 29, buf, __FILE__, __LINE__ );
// Copy the stored pointer to the caller; the map entry itself is left untouched.
1088 outmutex = itr->second;
//! Check the replication parameters held in replication_info.
//! Resolves ip_addr / service_name into replication_endpoint, builds bind_endpoint from
//! the IPv4 address of the configured NIC, and checks the interval range, duplicate
//! component IDs and the total component block size.
1094 //! @retval 0 Success
1095 //! @retval -1 Error
1096 int replication::check_parameter(){
1102 //std::cout << "check1 " << replication_info.ip_addr << ":" << replication_info.service_name << "\n";
1103 // Whether IP and the port are effective is confirmed.
1105 // replication_endpoint = boost::asio::ip::udp::endpoint( boost::asio::ip::address::from_string( replication_info.ip_addr ), boost::lexical_cast<unsigned short>( replication_info.service_name ) );
// Resolve the destination through the UDP resolver and keep the first result.
1107 boost::asio::ip::udp::resolver udp_resolver( service_io );
1108 boost::asio::ip::udp::resolver::query udp_query( replication_info.ip_addr, replication_info.service_name );
1109 boost::asio::ip::udp::resolver::iterator itr = udp_resolver.resolve( udp_query );
1110 replication_endpoint = *itr;
// Failure to obtain the endpoint is logged with the configured address and service name.
1113 buf = boost::io::str( boost::format( "Failed to get IP or Service Name.(%s:%s)" ) % replication_info.ip_addr % replication_info.service_name );
1114 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_ENDPOINT, 1, buf, __FILE__, __LINE__ );
1117 //std::cout << "check2 " << replication_endpoint.address() << ":" << replication_endpoint.port() << "\n";
1119 // get ip address from nic
1121 struct sockaddr_in addr;
1123 //Networkdevice struct define
1125 memset( &ifr, 0, sizeof( struct ifreq ) );
// A datagram socket is opened only to issue the interface ioctl below.
1128 int fd = socket( AF_INET, SOCK_DGRAM, 0 );
1130 //get networkdevice struct for IPv4
// ifr was zero-filled above, so copying at most IFNAMSIZ-1 bytes keeps the name terminated.
1131 strncpy( ifr.ifr_name, replication_info.nic.c_str(), IFNAMSIZ-1 );
1132 ifr.ifr_addr.sa_family = AF_INET;
// SIOCGIFADDR fills ifr.ifr_addr with the address of the named interface.
1134 if ( ioctl( fd, SIOCGIFADDR, &ifr ) >= 0 ){
1135 memcpy( &addr, &(ifr.ifr_addr), sizeof( struct sockaddr_in ) );
// The local bind endpoint is the NIC address combined with service_name parsed as a port number.
1141 bind_endpoint = boost::asio::ip::udp::endpoint( boost::asio::ip::address::from_string( inet_ntoa( addr.sin_addr ) ), boost::lexical_cast<unsigned short>( replication_info.service_name ) );
1144 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_ENDPOINT, 2, "You can not get IP address from nic.", __FILE__, __LINE__ );
1147 //std::cout << "check3 " << bind_endpoint.address() << ":" << bind_endpoint.port() << "\n";
// The interval must lie within MIN_INTERVAL .. MAX_INTERVAL inclusive.
1150 if ( ( MIN_INTERVAL>replication_info.interval ) || ( MAX_INTERVAL<replication_info.interval ) ){
1151 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 30, "Invalid Interval value", __FILE__, __LINE__ );
1154 // Components ID check
1155 // Components Size check
// Having no component at all is only reported at info level.
1156 if ( 0 == replication_info.component_num ){
1157 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 26, "Can not get component, because component is 0.", __FILE__, __LINE__ );
// Sum every component's block_size and compare each ID with all later ones to detect duplicates.
1161 for ( int i=0; i < replication_info.component_num; i++ ){
1162 sum += replication_info.component_info[i].block_size ;
1163 for ( int j=i+1; j<replication_info.component_num; j++ ){
1164 if ( replication_info.component_info[j].id == replication_info.component_info[i].id ){
1165 buf = boost::io::str( boost::format( "Component ID was repeated.(%s)" ) % replication_info.component_info[i].id );
1166 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 31, buf, __FILE__, __LINE__ );
// The summed block count may not exceed CMP_BLOCK_MAX.
1171 if ( sum > CMP_BLOCK_MAX ){
1172 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 32, "Total component size is too large.", __FILE__, __LINE__ );
// The overall verdict is logged: OK when ret is 0, NG otherwise.
1178 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 27, ( ( 0 == ret ) ? "Parameter Check OK." : "Parameter Check NG." ), __FILE__, __LINE__ );
1184 //! Get Replication Memory
//! Allocates total_block * DATA_SIZE bytes with malloc() and zero-fills them.
1185 //! @return memory Replication memory
1186 //! @retval memory memory get Success
1187 //! @retval NULL Error
1188 void* replication::getrpl(){
1189 unsigned int total_block = replication_state.total_block;
1190 void* memory = NULL;
1192 // Get replication memory
1193 memory = malloc( total_block*DATA_SIZE );
// An allocation failure is logged as an error.
1196 if ( ( void* )NULL == memory ){
1197 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 12, "Replication memory is Malloc Error.", __FILE__, __LINE__ );
// Clear the whole buffer.
1200 memset(memory,0,total_block*DATA_SIZE);
1205 //! Get Component Memory
//! Allocates total_block * DATA_SIZE bytes with malloc() and zero-fills them.
1206 //! @return memory Component memory
1207 //! @retval memory memory get Success
1208 //! @retval NULL Error
1209 void* replication::getcmp(){
1210 unsigned int total_block = replication_state.total_block;
1211 void* memory = NULL ;
1213 // Get component memory
1214 memory = malloc( total_block*DATA_SIZE );
// An allocation failure is logged as an error.
1217 if ( ( void* )NULL == memory ){
1218 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 13, "Component memory is Malloc Error.", __FILE__, __LINE__ );
// Clear the whole buffer.
1221 memset(memory,0,total_block*DATA_SIZE);
1226 //! Get Surface Number Memory
//! Allocates an array of total_block uint64_t entries with malloc() and zero-fills it.
1227 //! @return memory Surface block array memory
1228 //! @retval memory memory get Success
1229 //! @retval NULL Error
1230 uint64_t* replication::getsrf(){
1231 unsigned int total_block = replication_state.total_block;
1232 uint64_t* memory = NULL;
// One uint64_t per block.
1235 memory = ( uint64_t* )malloc( total_block*sizeof(uint64_t) );
// An allocation failure is logged as an error.
1238 if ( ( uint64_t* )NULL == memory ){
1239 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_MEMORY, 14, "Surface info address is Malloc Error.", __FILE__, __LINE__ );
// Clear the whole array.
1242 memset( memory, 0, total_block*sizeof(uint64_t) );
1247 //! Make serial number
//! The serial is the CLOCK_REALTIME time expressed in microseconds:
//! tv_sec * 1000000 + tv_nsec / 1000.
1248 //! @return Serial number
1249 //! @retval nonzero Serial number
1251 unsigned long long replication::make_serial(){
1252 unsigned long long int serial_num;
1253 struct timespec current_time;
1255 // get time by clock_gettime
// clock_gettime() takes a pointer to the timespec it fills in and returns -1 on failure.
1256 if ( clock_gettime(CLOCK_REALTIME, &current_time) == -1 ){
1258 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 7, "You failed to get of time.", __FILE__, __LINE__ );
1261 // make a serial succeeds.
// Both fields are widened before the arithmetic so seconds * 1000000 does not overflow.
1262 serial_num = ( unsigned long long int )current_time.tv_sec * 1000000 + ( unsigned long long int ) current_time.tv_nsec / 1000;
1268 //! Send transfer data to standby server
//! Builds one packet in the member replication_data for the block that follows
//! last_send_block and sends it to replication_endpoint.
1269 //! Takes no arguments; the block is read from replication_state.replication_memory.
1270 //! @retval 0 Success
1271 //! @retval -1 Error
1272 int replication::send_data(){
1276 // make replication data struct
1278 memset( &replication_data, 0, sizeof( struct replication_data_struct ) );
1279 // Set replication id
1280 replication_data.id = REPLICATION_ID;
1281 // set block_num (replication_state.last_send_block + 1) and Range check of memory
// The block number wraps to 0 after total_block-1; a last_send_block beyond that is logged as an error.
1282 if ( replication_state.last_send_block < replication_state.total_block - 1 ){
1283 replication_data.block_num = replication_state.last_send_block + 1;
1284 } else if ( replication_state.last_send_block == replication_state.total_block - 1 ){
1285 replication_data.block_num = 0;
1287 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 33, "Send block number is too large.", __FILE__, __LINE__ );
// The packet carries the current surface block number as its serial.
1292 replication_data.serial = replication_state.surface_block_no;
1293 if ( 1 == replication_data.serial && 0 == replication_data.block_num ){
1294 Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 28, "Serial number is 1, first send processing.", __FILE__, __LINE__ );
1297 // set data size (sizeof(replication_data))
1298 replication_data.size = sizeof( struct replication_data_struct );
// A packet larger than SEND_DATA_SIZE is logged as an error.
1300 if ( replication_data.size > SEND_DATA_SIZE ){
1301 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 34, "Send block data size is too large.", __FILE__, __LINE__ );
1305 // set replication data (1 block)
// Block k starts DATA_SIZE * k bytes into replication_memory.
1306 send_memory = ( char* )replication_state.replication_memory + DATA_SIZE*replication_data.block_num;
1307 memcpy( replication_data.data, send_memory, DATA_SIZE );
// NOTE(review): two send paths follow (a send_to() call, then resolve / connect /
// send() / close()); confirm which of them is actually compiled in.
1311 send_byte = replication_send_socket.send_to( boost::asio::buffer( &replication_data, sizeof( struct replication_data_struct ) ), replication_endpoint );
1313 boost::system::error_code err;
1316 // Whether IP and the port are effective is confirmed.
1318 // replication_endpoint = boost::asio::ip::udp::endpoint( boost::asio::ip::address::from_string( replication_info.ip_addr ), boost::lexical_cast<unsigned short>( replication_info.service_name ) );
// Resolve the destination through the UDP resolver and keep the first result.
1320 boost::asio::ip::udp::resolver udp_resolver( service_io );
1321 boost::asio::ip::udp::resolver::query udp_query( replication_info.ip_addr, replication_info.service_name );
1322 boost::asio::ip::udp::resolver::iterator itr = udp_resolver.resolve( udp_query );
1323 replication_endpoint = *itr;
1326 buf = boost::io::str( boost::format( "Failed to get IP or Service Name.(%s:%s)" ) % replication_info.ip_addr % replication_info.service_name );
1327 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM_ENDPOINT, 3, buf, __FILE__, __LINE__ );
1331 //std::cout << "master " << replication_endpoint.address() << ":" << replication_endpoint.port() << "\n";
// connect() reports failure through err (non-throwing overload); the message is logged.
1332 replication_send_socket.connect( replication_endpoint, err );
1334 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 8, err.message(), __FILE__, __LINE__ );
// Send the packet on the connected socket, then close the socket again.
1339 send_byte = replication_send_socket.send( boost::asio::buffer( &replication_data, sizeof( struct replication_data_struct ) ) );
1340 replication_send_socket.close();
// Anything other than a complete packet counts as a send error.
1342 if ( sizeof( struct replication_data_struct ) != send_byte ){
1343 Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 9, "Data send error.", __FILE__, __LINE__ );
1350 //! Receive transfer data from active server
//! Validates the packet held in the member replication_data, copies its data block into
//! replication memory and records its serial in the surface block array. The surface
//! entries are then compared; at the last comparison index replication memory is copied
//! into component memory while every component mutex is held.
1351 //! Takes no arguments; the received packet is read from the member replication_data.
1352 //! @retval 0 Success
1353 //! @retval -1 Error
1354 int replication::recv_data(){
1356 std::map<std::string, mutex_ptr>::iterator itr;
1358 // Check replication ID
1359 if ( replication_data.id != REPLICATION_ID ){
1360 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 35, "Get invalid data.", __FILE__, __LINE__ );
1364 // block number is out of range
// Valid block numbers are 0 .. total_block-1: the surface array holds total_block entries
// and replication memory holds total_block * DATA_SIZE bytes, so block_num == total_block
// must be rejected as well. block_num comes from the network packet; accepting it would
// let the accesses below reach one block past the end of both buffers.
1365 if ( replication_data.block_num >= replication_state.total_block ){
1366 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 36, "Recv block number is too large.", __FILE__, __LINE__ );
1370 // Comparison of serial numbers
// A packet whose serial is lower than the one already recorded for this block is stale.
1371 if ( replication_data.serial < replication_state.surface_block_array_ptr[replication_data.block_num] ){
1372 Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 37, "Recv replication data is too old.", __FILE__, __LINE__ );
1376 // Substitution of version
1377 replication_state.surface_block_array_ptr[replication_data.block_num] = replication_data.serial;
// Block k starts DATA_SIZE * k bytes into replication_memory.
1380 recv_memory = ( char* )replication_state.replication_memory + DATA_SIZE * replication_data.block_num;
1383 memcpy( recv_memory, &replication_data.data, DATA_SIZE );
1385 // set surface block
1386 replication_state.surface_block_array_ptr[replication_data.block_num] = replication_data.serial;
1388 // Surface numbers are compared.
// Adjacent surface entries are compared pairwise.
// NOTE(review): a mismatch presumably ends the scan early, so the copy below only
// runs when every block carries the same serial - confirm.
1389 for ( unsigned int i = 0; i < replication_state.total_block-1; i++ ){
1390 if ( replication_state.surface_block_array_ptr[i] != replication_state.surface_block_array_ptr[i+1] ){
1393 if ( i == replication_state.total_block-2 ){
1394 // Lock all component area
1395 for ( itr = replication_mutex.begin(); itr != replication_mutex.end(); itr++ ){
1396 itr->second->lock();
1399 // Synchronization is executed.
1400 memcpy(replication_state.component_memory, replication_state.replication_memory, replication_state.total_block*DATA_SIZE );
1402 // Unlock all component area
1403 for ( itr = replication_mutex.begin(); itr != replication_mutex.end(); itr++ ){
1404 itr->second->unlock();
1412 //! Release Replication Memory
//! Frees replication_state.replication_memory when it is set and resets the pointer to NULL.
1413 void replication::releaserpl(){
1414 if ( NULL != replication_state.replication_memory ){
1415 free(replication_state.replication_memory);
// Reset the pointer so the buffer cannot be used or freed again.
1417 replication_state.replication_memory = NULL;
1420 //! Release Components Memory
//! Frees replication_state.component_memory when it is set and resets the pointer to NULL.
1421 void replication::releasecmp(){
1422 if ( NULL != replication_state.component_memory){
1423 free(replication_state.component_memory);
// Reset the pointer so the buffer cannot be used or freed again.
1425 replication_state.component_memory = NULL;
1428 //! Release Surface Memory
//! Frees replication_state.surface_block_array_ptr when it is set and resets the pointer to NULL.
1429 void replication::releasesrf(){
1430 if ( NULL != replication_state.surface_block_array_ptr ){
1431 free(replication_state.surface_block_array_ptr);
// Reset the pointer so the array cannot be used or freed again.
1433 replication_state.surface_block_array_ptr=NULL;
1436 //! Replication thread
//! Driven by replication_flag: waits on replication_thread_condition while the flag is
//! WAIT, handles EXIT separately, and otherwise calls handle_send().
1437 void replication::send_thread(){
1439 REPLICATION_THREAD_TAG flag;
// Take a snapshot of the shared flag under replication_thread_mutex.
1441 boost::mutex::scoped_lock lock( replication_thread_mutex );
1442 flag = replication_flag;
// WAIT: block on the condition variable until notified.
1445 if ( flag == WAIT ){
1446 boost::mutex::scoped_lock lock( replication_thread_mutex );
1447 replication_thread_condition.wait( lock );
1448 } else if ( flag == EXIT ){
// When mode is false the thread sleeps for replication_info.interval (usleep takes microseconds).
// NOTE(review): the meaning of mode is not established here - confirm against its declaration.
1451 if ( false == mode ){
1452 usleep( replication_info.interval );
// handle_send() returning -1 is treated as a failure.
1454 if ( -1 == handle_send() ){
// Re-read the shared flag under the mutex.
1460 boost::mutex::scoped_lock lock( replication_thread_mutex );
1461 flag = replication_flag;
1466 //! io_service thread
//! Driven by service_flag: a WAIT or WAIT_REQ request parks the thread on
//! service_thread_condition, and EXIT is handled separately.
1467 void replication::service_thread(){
1468 REPLICATION_THREAD_TAG flag;
// Take a snapshot of the shared flag under service_thread_mutex.
1470 boost::mutex::scoped_lock lock( service_thread_mutex );
1471 flag = service_flag;
// WAIT_REQ is acknowledged by storing WAIT before blocking on the condition variable.
1474 if ( flag == WAIT || flag == WAIT_REQ ){
1475 boost::mutex::scoped_lock lock( service_thread_mutex );
1476 service_flag = WAIT;
1477 service_thread_condition.wait( lock );
1478 } else if ( flag == EXIT ){
// Re-read the shared flag under the mutex.
1484 boost::mutex::scoped_lock lock( service_thread_mutex );
1485 flag = service_flag;