// Status Set to Slave
if ( 0 != set_slave() ){
-// replication_receive_socket.close();
-
// free memory
releasesrf();
releaserpl();
}
replication_thread_ptr = thread_ptr( new boost::thread( boost::bind ( &replication::send_thread, this ) ) );
+ {
+ boost::mutex::scoped_lock lock( service_thread_mutex );
+
+ service_flag = RUNNING;
+ }
+ service_thread_ptr = thread_ptr( new boost::thread( boost::bind ( &replication::service_thread, this ) ) );
+
buf = boost::io::str( boost::format( "Initialized in %s mode." ) % replication_mode[(int)replication_state.service_status] );
Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 1, buf, __FILE__, __LINE__ );
replication_flag = EXIT;
replication_thread_condition.notify_all();
}
- if ( replication_thread_ptr )
- {
+ if ( replication_thread_ptr ){
replication_thread_ptr->join();
- replication_thread_ptr->detach();
}
// Socket finalaize
-// if ( REPLICATION_SLAVE == replication_state.service_status || REPLICATION_SLAVE_STOP == replication_state.service_status ){
-//std::cout << "cancel1\n";
-// replication_receive_socket.cancel();
-// }
- replication_receive_socket.close();
- replication_send_socket.close();
+ if ( replication_send_socket.is_open() ){
+ replication_send_socket.close();
+ }
+ if ( replication_receive_socket.is_open() ){
+ replication_receive_socket.close();
+ }
+
+ {
+ boost::mutex::scoped_lock lock( service_thread_mutex );
+
+ service_flag = EXIT;
+ service_thread_condition.notify_all();
+ }
+ if ( service_thread_ptr ){
+ service_thread_ptr->join();
+ }
+
// Release replication memory
releaserpl();
// Release component memory
std::string buf;
int ret;
- std::map<std::string, mutex_ptr>::iterator itr;
switch (replication_state.service_status){
case REPLICATION_SLAVE:
buf = boost::io::str( boost::format( "Switch to master NG. mode : %s" ) % replication_mode[(int)replication_state.service_status] );
Logger::putLogError( LOG_CAT_L7VSD_REPLICATION, 1, buf, __FILE__, __LINE__ );
}else{
- // Lock all compornent area
- for ( itr = replication_mutex.begin(); itr != replication_mutex.end(); itr++ ){
- itr->second->lock();
- }
-
- // Copy from component area to replication area.
+ // Copy from compornent area to replication area.
memcpy(replication_state.replication_memory, replication_state.component_memory, replication_state.total_block*DATA_SIZE);
- // Unlock all compornent area
- for ( itr = replication_mutex.begin(); itr != replication_mutex.end(); itr++ ){
- itr->second->unlock();
- }
-
if ( REPLICATION_SLAVE == replication_state.service_status ){
// Set mode.
replication_state.service_status = REPLICATION_MASTER;
boost::system::error_code err;
// close socket
-// replication_receive_socket.cancel();
- replication_receive_socket.close();
+ if ( replication_send_socket.is_open() ){
+ replication_send_socket.close();
+ }
+ if ( replication_receive_socket.is_open() ){
+ replication_receive_socket.cancel();
+ replication_receive_socket.close();
+ }
+
+ {
+ boost::mutex::scoped_lock lock( service_thread_mutex );
+
+ if ( service_flag == RUNNING ){
+ service_flag = WAIT_REQ;
+ service_io.stop();
+ }
+ }
+
+ while ( service_flag == WAIT_REQ ){
+ usleep( 1 );
+ }
+
+ {
+ boost::mutex::scoped_lock lock( service_thread_mutex );
+
+ if ( service_flag != EXIT ){
+ service_io.reset();
+ service_flag = RUNNING;
+
+ service_thread_condition.notify_all();
+ }
+ }
// make send socket
// replication_send_socket.connect( replication_endpoint, err );
boost::system::error_code err;
// close socket
- replication_send_socket.close();
- replication_receive_socket.close();
+ if ( replication_send_socket.is_open() ){
+ replication_send_socket.close();
+ }
+ if ( replication_receive_socket.is_open() ){
+ replication_receive_socket.cancel();
+ replication_receive_socket.close();
+ }
+
+ {
+ boost::mutex::scoped_lock lock( service_thread_mutex );
+
+ if ( service_flag != EXIT ){
+ service_flag = WAIT_REQ;
+ service_io.stop();
+ }
+ }
+ while ( service_flag == WAIT_REQ ){
+ usleep( 1 );
+ }
// make receive socket
-std::cout << "slave " << replication_endpoint.address() << ":" << replication_endpoint.port() << "\n";
+//std::cout << "slave " << replication_endpoint.address() << ":" << replication_endpoint.port() << "\n";
+
replication_receive_socket.open( replication_endpoint.protocol(), err );
if ( err ){
Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 1, err.message(), __FILE__, __LINE__ );
// return -1;
- }
- replication_receive_socket.bind( replication_endpoint, err );
- if ( err ){
- Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 1, err.message(), __FILE__, __LINE__ );
-// return -1;
+ } else {
+ replication_receive_socket.bind( replication_endpoint, err );
+ if ( err ){
+ Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 1, err.message(), __FILE__, __LINE__ );
+// return -1;
+ }
}
-std::cout << "slave " << replication_endpoint.address() << ":" << replication_endpoint.port() << "\n";
-// replication_receive_socket.async_receive( boost::asio::buffer( &replication_data, sizeof( struct replication_data_struct ) ),
-// boost::bind( &replication::handle_receive, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) );
+//std::cout << "slave " << replication_endpoint.address() << ":" << replication_endpoint.port() << "\n";
replication_receive_socket.async_receive_from( boost::asio::buffer( &replication_data, sizeof( struct replication_data_struct ) ),
replication_endpoint,
boost::bind( &replication::handle_receive, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) );
+ {
+ boost::mutex::scoped_lock lock( service_thread_mutex );
+
+ if ( service_flag != EXIT ){
+ service_io.reset();
+ service_flag = RUNNING;
+
+ service_thread_condition.notify_all();
+ }
+ }
Logger::putLogInfo( LOG_CAT_L7VSD_REPLICATION, 1, "Initialization of receive socket is success.", __FILE__, __LINE__ );
return 0;
Parameter param;
std::string buf;
- std::map<std::string, mutex_ptr>::iterator itr;
// Check by continuous initialize.
if ( REPLICATION_MASTER != replication_state.service_status && REPLICATION_MASTER_STOP != replication_state.service_status ){
// surface block number is change
if(replication_state.total_block == replication_state.last_send_block + 1 ){
- // Lock all compornent area
- for ( itr = replication_mutex.begin(); itr != replication_mutex.end(); itr++ ){
- itr->second->lock();
- }
-
// Synchronization is executed.
memcpy( replication_state.replication_memory, replication_state.component_memory, replication_state.total_block*DATA_SIZE );
- // Unlock all compornent area
- for ( itr = replication_mutex.begin(); itr != replication_mutex.end(); itr++ ){
- itr->second->unlock();
- }
-
// make new serial
replication_state.surface_block_no = (uint64_t)make_serial();
if ( 0 == replication_state.surface_block_no ){
// surface block number is change
if(replication_state.total_block == replication_state.last_send_block + 1 ){
- // Lock all compornent area
- for ( itr = replication_mutex.begin(); itr != replication_mutex.end(); itr++ ){
- itr->second->lock();
- }
-
// Synchronization is executed.
memcpy( replication_state.replication_memory, replication_state.component_memory, replication_state.total_block*DATA_SIZE );
- // Unlock all compornent area
- for ( itr = replication_mutex.begin(); itr != replication_mutex.end(); itr++ ){
- itr->second->unlock();
- }
-
// make new serial
replication_state.surface_block_no = (uint64_t)make_serial();
if ( 0 == replication_state.surface_block_no ){
//! Callback function
void replication::handle_receive( const boost::system::error_code& err, size_t size ){
- Logger logger( LOG_CAT_L7VSD_REPLICATION, 1, "replication::handle_receive", __FILE__, __LINE__ );
+// Logger logger( LOG_CAT_L7VSD_REPLICATION, 1, "replication::handle_receive", __FILE__, __LINE__ );
int recv_ret;
std::string buf;
-std::cout << "receive1\n";
if ( err ){
if ( boost::system::errc::operation_canceled != err.value() ){
Logger::putLogInfo( LOG_CAT_L7VSD_SYSTEM, 1, err.message(), __FILE__, __LINE__ );
return;
}
-std::cout << "slave " << replication_endpoint.address() << ":" << replication_endpoint.port() << "\n";
+//std::cout << "slave " << replication_endpoint.address() << ":" << replication_endpoint.port() << "\n";
// replication_receive_socket.async_receive( boost::asio::buffer( &replication_data, sizeof( struct replication_data_struct ) ),
// boost::bind( &replication::handle_receive, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred ) );
replication_receive_socket.async_receive_from( boost::asio::buffer( &replication_data, sizeof( struct replication_data_struct ) ),
size_t sum=0;
std::string buf;
- // set address hints
+ // set address
replication_endpoint = boost::asio::ip::udp::endpoint( boost::asio::ip::address::from_string( replication_info.ip_addr ), boost::lexical_cast<unsigned short>( replication_info.service_name ) );
// Interval check
send_memory = (char *)replication_state.replication_memory + DATA_SIZE*replication_data.block_num;
memcpy( replication_data.data, send_memory, DATA_SIZE );
- // make send socket
#if 0
// send to data
send_byte = replication_send_socket.send_to( boost::asio::buffer( &replication_data, sizeof( struct replication_data_struct ) ), replication_endpoint );
#else
boost::system::error_code err;
+ // make send socket
+//std::cout << "master " << replication_endpoint.address() << ":" << replication_endpoint.port() << "\n";
+ replication_endpoint = boost::asio::ip::udp::endpoint( boost::asio::ip::address::from_string( replication_info.ip_addr ), boost::lexical_cast<unsigned short>( replication_info.service_name ) );
replication_send_socket.connect( replication_endpoint, err );
if ( err ){
Logger::putLogError( LOG_CAT_L7VSD_SYSTEM, 1, err.message(), __FILE__, __LINE__ );
//! @retval -1 Error
int replication::recv_data(){
char *recv_memory;
+ std::map<std::string, mutex_ptr>::iterator itr;
// Check replication ID
if ( replication_data.id != REPLICATION_ID ){
break;
}
if ( i == replication_state.total_block-2 ){
+ // Lock all compornent area
+ for ( itr = replication_mutex.begin(); itr != replication_mutex.end(); itr++ ){
+ itr->second->lock();
+ }
+
// Synchronization is executed.
memcpy(replication_state.component_memory, replication_state.replication_memory, replication_state.total_block*DATA_SIZE );
+
+ // Unlock all compornent area
+ for ( itr = replication_mutex.begin(); itr != replication_mutex.end(); itr++ ){
+ itr->second->unlock();
+ }
}
}
}
}
+//! io_service thread
+void replication::service_thread(){
+ REPLICATION_THREAD_TAG flag;
+ {
+ boost::mutex::scoped_lock lock( service_thread_mutex );
+ flag = service_flag;
+ }
+ for ( ; ; ){
+ if ( flag == WAIT || flag == WAIT_REQ ){
+ boost::mutex::scoped_lock lock( service_thread_mutex );
+ service_flag = WAIT;
+ service_thread_condition.wait( lock );
+ } else if ( flag == EXIT ){
+ break;
+ } else {
+ service_io.poll();
+ }
+ {
+ boost::mutex::scoped_lock lock( service_thread_mutex );
+ flag = service_flag;
+ }
+ }
+}
+
} //namespace l7vs
{
public:
//! constractor
- replication_fake( boost::asio::io_service& inreceive_io ) : replication( inreceive_io ) {}
+ replication_fake() : replication() {}
//! destractor
~replication_fake(){}
}
int set_slave_wrapper(){
+
+ {
+ boost::mutex::scoped_lock lock( replication_thread_mutex );
+ if ( replication_flag != EXIT ){
+ replication_flag = WAIT;
+ }
+ }
+
return set_slave();
}
releasesrf();
return replication_state.surface_block_array_ptr;
}
+
+ void disable_send_thread(){
+ {
+ boost::mutex::scoped_lock lock( replication_thread_mutex );
+
+ replication_flag = EXIT;
+ replication_thread_condition.notify_all();
+ }
+ if ( replication_thread_ptr ){
+ replication_thread_ptr->join();
+ }
+ }
};
} //namespace l7vs
"REPLICATION_SLAVE_STOP"
};
-bool locked_end = false;
+volatile bool locked_end = false;
-bool receiver_end = false;
-bool io_end = false;
+volatile bool receiver_end = false;
boost::asio::io_service global_receive_io;
boost::asio::io_service global_send_io;
-void io_thread(){
- while( false == io_end ){
- global_receive_io.run();
- global_send_io.run();
- }
-}
-
-boost::thread global_thread_item( boost::bind ( &io_thread ) );
-
void receiver_thread(){
boost::asio::ip::udp::endpoint udp_endpoint( boost::asio::ip::address::from_string( "10.144.169.86" ), 40000 );
boost::asio::ip::udp::socket receiver_socket( global_receive_io, udp_endpoint );
goto END;
}
-//std::cout << "receiver_thread 5-1\n";
+//std::cout << "receiver_thread\n";
//std::cout << replication_data.id << "\n";
//std::cout << replication_data.serial << "\n";
//std::cout << replication_data.block_num << "\n";
receiver_socket.close();
receiver_end = true;
-};
+}
void sender_thread(){
- l7vs::replication repli2(global_send_io);
+ l7vs::replication repli2;
BOOST_CHECK_EQUAL( repli2.get_status(), l7vs::replication::REPLICATION_OUT );
BOOST_CHECK_EQUAL( repli2.initialize(), 0 );
}
void sender2_thread(){
-std::cout << "sender2 in\n";
boost::asio::ip::udp::endpoint udp_endpoint( boost::asio::ip::address::from_string( "10.144.169.86" ), 40000 );
// boost::asio::ip::udp::socket sender_socket( global_send_io, udp_endpoint );
boost::asio::ip::udp::socket sender_socket( global_send_io );
sender_socket.close();
return;
}
+ global_send_io.run();
usleep( 10000 );
replication_data.block_num = 1;
sender_socket.close();
return;
}
+ global_send_io.run();
usleep( 10000 );
replication_data.block_num = 2;
sender_socket.close();
return;
}
+ global_send_io.run();
sleep(1);
sender_socket.close();
//test case1.
void replication_initialize_test(){
// int loop;
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
// unit_test[1] コンストラクタのテスト
BOOST_MESSAGE( boost::format( "unit_test[%d]" ) % ++count );
- l7vs::replication repli1(io);
+ l7vs::replication repli1;
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
//test case2.
void replication_switch_to_master_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication repli1(io);
+ l7vs::replication repli1;
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
BOOST_CHECK_EQUAL( repli1.initialize(), 0 );
//test case3.
void replication_switch_to_slave_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication repli1(io);
+ l7vs::replication repli1;
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
BOOST_CHECK_EQUAL( repli1.initialize(), 0 );
//test case4.
void replication_pay_memory_test(){
- boost::asio::io_service io;
unsigned int size;
void* ptr;
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication repli1(io);
+ l7vs::replication repli1;
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
BOOST_CHECK_EQUAL( repli1.initialize(), 0 );
//test case5.
void replication_dump_memory_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication repli1(io);
+ l7vs::replication repli1;
get_int_table[1] = 1; // "cmponent_size_00"
get_int_table[2] = 1; // "cmponent_size_01"
//test case6.
void replication_start_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication repli1(io);
+ l7vs::replication repli1;
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
BOOST_CHECK_EQUAL( repli1.initialize(), 0 );
//test case7.
void replication_stop_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication repli1(io);
+ l7vs::replication repli1;
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
BOOST_CHECK_EQUAL( repli1.initialize(), 0 );
//test case8.
void replication_force_replicate_test(){
-// boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication repli1(global_send_io);
+ l7vs::replication_fake repli1;
get_int_table[1] = 1; // "cmponent_size_00"
get_int_table[2] = 1; // "cmponent_size_01"
BOOST_CHECK_EQUAL( repli1.initialize(), 0 );
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_SLAVE );
+ repli1.disable_send_thread();
unsigned int size;
void* ptr;
repli1.finalize();
- while( !receiver_end );
+ while( !receiver_end ){
+ global_receive_io.poll();
+ }
thread_item1.join();
//test case9.
void replication_reset_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication repli1(io);
+ l7vs::replication repli1;
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
BOOST_CHECK_EQUAL( repli1.initialize(), 0 );
//test case10.
void replication_get_status_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication repli1(io);
+ l7vs::replication repli1;
// unit_test[106] get_statusのテスト(未初期化)
BOOST_MESSAGE( boost::format( "unit_test[%d]" ) % ++count );
//test case11.
void replication_lock_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication_fake repli1(io);
+ l7vs::replication_fake repli1;
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
BOOST_CHECK_EQUAL( repli1.initialize(), 0 );
repli1.unlock( "virtualservice" );
thread_item1.join();
- thread_item1.detach();
BOOST_CHECK_EQUAL( locked_end, true );
BOOST_CHECK_EQUAL( repli1.locked( "virtualservice" ), true );
repli1.unlock( "chash" );
thread_item2.join();
- thread_item2.detach();
BOOST_CHECK_EQUAL( locked_end, true );
BOOST_CHECK_EQUAL( repli1.locked( "chash" ), true );
repli1.unlock( "sslid" );
thread_item3.join();
- thread_item3.detach();
BOOST_CHECK_EQUAL( locked_end, true );
BOOST_CHECK_EQUAL( repli1.locked( "sslid" ), true );
//test case12.
void replication_unlock_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication_fake repli1(io);
+ l7vs::replication_fake repli1;
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
BOOST_CHECK_EQUAL( repli1.initialize(), 0 );
//test case13.
void replication_refer_lock_mutex_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication_fake repli1(io);
+ l7vs::replication_fake repli1;
l7vs::replication::mutex_ptr mutex1, mutex2, mutex3, mutex4;
//test case14.
void replication_handle_send_test(){
-// boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication repli1(global_send_io);
+ l7vs::replication repli1;
get_int_table[1] = 1; // "cmponent_size_00"
get_int_table[2] = 1; // "cmponent_size_01"
BOOST_CHECK_EQUAL( repli1.initialize(), 0 );
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_SLAVE );
- receiver_end = false;
- boost::thread thread_item1( boost::bind ( &receiver_thread ) );
-
unsigned int size;
void* ptr;
BOOST_MESSAGE( boost::format( "unit_test[%d]" ) % ++count );
repli1.switch_to_master();
+ receiver_end = false;
+ boost::thread thread_item1( boost::bind ( &receiver_thread ) );
+
// unit_test[141] send_thread&handle_sendのテスト(MASTER時)
BOOST_MESSAGE( boost::format( "unit_test[%d]" ) % ++count );
repli1.start();
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_MASTER );
while( !receiver_end );
+ repli1.stop();
thread_item1.join();
BOOST_CHECK( 1 );
//test case15.
void replication_set_master_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication_fake repli1(io);
+ l7vs::replication_fake repli1;
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
BOOST_CHECK_EQUAL( repli1.initialize(), 0 );
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_SLAVE );
+ repli1.disable_send_thread();
// unit_test[142] set_masterのテスト(SLAVE時)
BOOST_MESSAGE( boost::format( "unit_test[%d]" ) % ++count );
//test case16.
void replication_set_slave_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication_fake repli1(io);
+ l7vs::replication_fake repli1;
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
BOOST_CHECK_EQUAL( repli1.initialize(), 0 );
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_SLAVE );
+ repli1.disable_send_thread();
repli1.switch_to_master();
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_MASTER );
//test case17.
void replication_check_parameter_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication_fake repli1(io);
+ l7vs::replication_fake repli1;
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
BOOST_CHECK_EQUAL( repli1.initialize(), 0 );
//test case18.
void replication_getrpl_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication_fake repli1(io);
+ l7vs::replication_fake repli1;
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
BOOST_CHECK_EQUAL( repli1.initialize(), 0 ); // param読み込みのため1回実行
BOOST_MESSAGE( boost::format( "unit_test[%d]" ) % ++count );
BOOST_CHECK( NULL != repli1.getrpl_wrapper() ); // 異常系はinitializeにて評価済み
+ repli1.releaserpl_wrapper();
repli1.finalize();
}
//test case19.
void replication_getcmp_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication_fake repli1(io);
+ l7vs::replication_fake repli1;
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
BOOST_CHECK_EQUAL( repli1.initialize(), 0 ); // param読み込みのため1回実行
BOOST_MESSAGE( boost::format( "unit_test[%d]" ) % ++count );
BOOST_CHECK( NULL != repli1.getcmp_wrapper() ); // 異常系はinitializeにて評価済み
+ repli1.releasecmp_wrapper();
repli1.finalize();
}
//test case20.
void replication_getsrf_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication_fake repli1(io);
+ l7vs::replication_fake repli1;
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
BOOST_CHECK_EQUAL( repli1.initialize(), 0 ); // param読み込みのため1回実行
BOOST_MESSAGE( boost::format( "unit_test[%d]" ) % ++count );
BOOST_CHECK( NULL != repli1.getsrf_wrapper() ); // 異常系はinitializeにて評価済み
+ repli1.releasesrf_wrapper();
repli1.finalize();
}
//test case21.
void replication_make_serial_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication_fake repli1(io);
+ l7vs::replication_fake repli1;
unsigned long long value1, value2;
// unit_test[150] make_serialのテスト(正常系)
//test case22.
void replication_releaserpl_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication_fake repli1(io);
+ l7vs::replication_fake repli1;
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
BOOST_CHECK_EQUAL( repli1.initialize(), 0 ); // param読み込みのため1回実行
//test case23.
void replication_releasecmp_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication_fake repli1(io);
+ l7vs::replication_fake repli1;
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
BOOST_CHECK_EQUAL( repli1.initialize(), 0 ); // param読み込みのため1回実行
BOOST_CHECK( NULL != repli1.getcmp_wrapper() );
BOOST_CHECK( NULL == repli1.releasecmp_wrapper() );
+
repli1.finalize();
}
//test case24.
void replication_releasesrf_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication_fake repli1(io);
+ l7vs::replication_fake repli1;
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
BOOST_CHECK_EQUAL( repli1.initialize(), 0 ); // param読み込みのため1回実行
//test case25.
void replication_finalize_test(){
- boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication repli1(io);
+ l7vs::replication repli1;
// unit_test[155] finalizeのテスト(未初期化)
BOOST_MESSAGE( boost::format( "unit_test[%d]" ) % ++count );
//test case26.
void replication_handle_receive_test(){
-// boost::asio::io_service io;
-
get_string_stubmode = 0;
get_int_stubmode = 0;
get_string_table[0] = "10.144.169.86"; // "ip_addr"
get_int_table[3] = 200; // "cmponent_size_02"
get_int_table[4] = 10; // "compulsorily_interval"
- l7vs::replication repli1(global_receive_io);
+ l7vs::replication repli1;
get_int_table[1] = 1; // "cmponent_size_00"
get_int_table[2] = 1; // "cmponent_size_01"
get_int_table[3] = 1; // "cmponent_size_02"
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_OUT );
+
BOOST_CHECK_EQUAL( repli1.initialize(), 0 );
BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_SLAVE );
// unit_test[161] handle_receiveのテスト(正常系)
BOOST_MESSAGE( boost::format( "unit_test[%d]" ) % ++count );
-// repli1.dump_memory();
-// BOOST_CHECK( 1 );
+ {
+ boost::thread thread_item( boost::bind ( &sender2_thread ) );
+
+ thread_item.join();
+ }
+
+ repli1.dump_memory();
+ BOOST_CHECK( 1 );
+
+ // unit_test[162] handle_receiveのテスト(MASTER → SLAVE時)
+ BOOST_MESSAGE( boost::format( "unit_test[%d]" ) % ++count );
+
+ unsigned int size;
+ void* ptr;
+
+ ptr = repli1.pay_memory( "virtualservice", size );
+ BOOST_CHECK( NULL != ptr );
+ BOOST_CHECK_EQUAL( repli1.lock( "virtualservice" ), 0 );
+ memset( ptr, '1', size * DATA_SIZE );
+ repli1.unlock( "virtualservice" );
+
+ ptr = repli1.pay_memory( "chash", size );
+ BOOST_CHECK( NULL != ptr );
+ BOOST_CHECK_EQUAL( repli1.lock( "chash" ), 0 );
+ memset( ptr, '2', size * DATA_SIZE );
+ repli1.unlock( "chash" );
+
+ ptr = repli1.pay_memory( "sslid", size );
+ BOOST_CHECK( NULL != ptr );
+ BOOST_CHECK_EQUAL( repli1.lock( "sslid" ), 0 );
+ memset( ptr, '3', size * DATA_SIZE );
+ repli1.unlock( "sslid" );
- boost::thread thread_item( boost::bind ( &sender2_thread ) );
-//receiver_end = false;
-//boost::thread thread_item3( boost::bind ( &receiver_thread ) );
+ repli1.switch_to_master();
+ BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_MASTER );
- thread_item.join();
-//thread_item3.join();
+ {
+ receiver_end = false;
+ boost::thread thread_item( boost::bind ( &receiver_thread ) );
+
+ while( !receiver_end ){
+ global_receive_io.poll();
+ }
+
+ thread_item.join();
+ }
+
+ repli1.switch_to_slave();
+ BOOST_CHECK_EQUAL( repli1.get_status(), l7vs::replication::REPLICATION_SLAVE );
+
+ {
+ boost::thread thread_item( boost::bind ( &sender2_thread ) );
+
+ thread_item.join();
+ }
repli1.dump_memory();
BOOST_CHECK( 1 );
/*
- // unit_test[162] handle_receiveのテスト(replication対replication)
+ // unit_test[163] handle_receiveのテスト(replication対replication)
BOOST_MESSAGE( boost::format( "unit_test[%d]" ) % ++count );
boost::thread thread_item2( boost::bind ( &sender_thread ) );
sleep(1);
thread_item2.join();
-std::cout << "sender2 end\n";
repli1.dump_memory();
BOOST_CHECK( 1 );
repli1.finalize();
}
-void replication_finish(){
- io_end = true;
- global_thread_item.join();
-}
-
test_suite* init_unit_test_suite( int argc, char* argv[] ){
l7vs::Logger logger;
l7vs::Parameter parameter;
test_suite* ts = BOOST_TEST_SUITE( "replication_test" );
// add test case to test suite
-///*
ts->add( BOOST_TEST_CASE( &replication_initialize_test ) );
ts->add( BOOST_TEST_CASE( &replication_switch_to_master_test ) );
ts->add( BOOST_TEST_CASE( &replication_switch_to_slave_test ) );
ts->add( BOOST_TEST_CASE( &replication_dump_memory_test ) );
ts->add( BOOST_TEST_CASE( &replication_start_test ) );
ts->add( BOOST_TEST_CASE( &replication_stop_test ) );
+
ts->add( BOOST_TEST_CASE( &replication_force_replicate_test ) );
ts->add( BOOST_TEST_CASE( &replication_reset_test ) );
ts->add( BOOST_TEST_CASE( &replication_releasecmp_test ) );
ts->add( BOOST_TEST_CASE( &replication_releasesrf_test ) );
ts->add( BOOST_TEST_CASE( &replication_finalize_test ) );
-//*/
ts->add( BOOST_TEST_CASE( &replication_handle_receive_test ) );
-
-
- ts->add( BOOST_TEST_CASE( &replication_finish ) );
-
framework::master_test_suite().add( ts );
return 0;