diff --git a/dap_chain_net.c b/dap_chain_net.c index 3b28dceff0b5b39afb03d967069ca52721ebf7a8..8eb3c7a1c9fb55058dac93f235a3aef85c0dd992 100644 --- a/dap_chain_net.c +++ b/dap_chain_net.c @@ -601,20 +601,25 @@ lb_proc_state: * @param a_cfg Network1 configuration * @return */ -static void * s_net_proc_thread ( void * a_net) +static void *s_net_proc_thread ( void *a_net ) { - dap_chain_net_t * l_net = (dap_chain_net_t *) a_net; - bool is_looping = true ; - while( is_looping ) { - s_net_states_proc(l_net); - int l_timeout_ms = 20000;// 20 sec -#ifndef _WIN32 - pthread_mutex_lock( &PVT(l_net)->state_mutex ); + dap_chain_net_t *l_net = (dap_chain_net_t *)a_net; + dap_chain_net_pvt_t *p_net = (dap_chain_net_pvt_t *)(void *)l_net->pvt; + + const uint64_t l_timeout_ms = 20000;// 20 sec + + while( !(p_net->flags & F_DAP_CHAIN_NET_SHUTDOWN) ) { + + s_net_states_proc( l_net ); + #ifndef _WIN32 + pthread_mutex_lock( &p_net->state_mutex ); // prepare for signal waiting + struct timespec l_to; - clock_gettime(CLOCK_MONOTONIC, &l_to); + clock_gettime( CLOCK_MONOTONIC, &l_to ); int64_t l_nsec_new = l_to.tv_nsec + l_timeout_ms * 1000000ll; + // if the new number of nanoseconds is more than a second if(l_nsec_new > (long) 1e9) { l_to.tv_sec += l_nsec_new / (long) 1e9; @@ -622,42 +627,20 @@ static void * s_net_proc_thread ( void * a_net) } else l_to.tv_nsec = (long) l_nsec_new; + // signal waiting - int l_ret = pthread_cond_timedwait(&PVT(l_net)->state_proc_cond, &PVT(l_net)->state_mutex, &l_to); + pthread_cond_timedwait( &p_net->state_proc_cond, &p_net->state_mutex, &l_to ); + //pthread_cond_wait(&PVT(l_net)->state_proc_cond,&PVT(l_net)->state_mutex); - pthread_mutex_unlock( &PVT(l_net)->state_mutex ); + pthread_mutex_unlock( &p_net->state_mutex ); #else // WIN32 WaitForSingleObject( p_net->state_proc_cond, (uint32_t)l_timeout_ms ); - #endif - // only check connection - if(l_ret==ETIMEDOUT){ - // send ping -/* dap_stream_ch_chain_net_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_PING, - l_ch_chain_net_pkt->hdr.net_id, NULL, 0); - - if(0 == dap_stream_ch_chain_pkt_write(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, - l_net->pub.id, l_chain_id_null, l_chain_cell_id_null, &l_sync_request, - sizeof(l_sync_request))) { - dap_chain_node_cli_set_reply_text(a_str_reply, "Error: Cant send sync chains request"); - // clean client struct - dap_chain_node_client_close(l_node_client); - DAP_DELETE(l_remote_node_info); - return -1; - } - dap_stream_ch_set_ready_to_write(l_ch_chain, true); - // wait pong - timeout_ms = 120000; // 20 min = 1200 sec = 1 200 000 ms - res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_PONG, timeout_ms); - if(res) { - PVT(l_net)->state = NET_STATE_OFFLINE; - }*/ - - } - log_it( L_DEBUG, "Waked up net proHASH_COUNT( c thread, cond_wait=%d", l_ret); - + #endif + log_it( L_DEBUG, "Waked up s_net_proc_thread( )" ); } + return NULL; } diff --git a/dap_chain_node_cli.c b/dap_chain_node_cli.c index 4338a7f5ac619231b97f9bab6d0e95fb9a1084ae..393ccc3feccef1c21e6ede071648cb4f7dbd9885 100644 --- a/dap_chain_node_cli.c +++ b/dap_chain_node_cli.c @@ -882,7 +882,7 @@ int dap_chain_node_cli_init(dap_config_t * g_config) #else - Sleep( 3000 ); +// Sleep( 3000 ); if( pthread_create(&threadId, NULL, thread_pipe_func, (void*) (intptr_t) sockfd) != 0 ) { closesocket( sockfd ); diff --git a/dap_chain_node_cli_cmd.c b/dap_chain_node_cli_cmd.c index a7fadbfc08e7a1006527c8f9f8f9043b261bdb3d..8acba884d972812849006577260f880f7d35e07b 100644 --- a/dap_chain_node_cli_cmd.c +++ b/dap_chain_node_cli_cmd.c @@ -440,30 +440,95 @@ static int link_add_or_del_with_reply(dap_chain_net_t * a_net, dap_chain_node_in * str_reply[out] for reply * return 0 Ok, -1 error */ -static int node_info_dump_with_reply(dap_chain_net_t * a_net, dap_chain_node_addr_t * a_addr, const char *a_alias, char **a_str_reply) +static int node_info_dump_with_reply(dap_chain_net_t * a_net, dap_chain_node_addr_t * a_addr, bool a_is_full, const char *a_alias, char **a_str_reply) { int l_ret = 0; - dap_string_t *l_string_reply = dap_string_new("Node dump:\n"); + dap_string_t *l_string_reply = dap_string_new("Node dump:"); - if( (a_addr && a_addr->uint64 ) || a_alias) { + if((a_addr && a_addr->uint64) || a_alias) { dap_chain_node_addr_t *l_addr; - if (a_addr && a_addr->uint64){ + if(a_addr && a_addr->uint64) { l_addr = DAP_NEW(dap_chain_node_addr_t); l_addr->uint64 = a_addr->uint64; - } else if (a_alias) { - l_addr = dap_chain_node_alias_find(a_net,a_alias); + } else if(a_alias) { + l_addr = dap_chain_node_alias_find(a_net, a_alias); } - if ( l_addr ) { + if(!l_addr) { + dap_chain_node_cli_set_reply_text(a_str_reply, "addr not found"); + dap_string_free(l_string_reply, true); + return -1; + } + // read node + dap_chain_node_info_t *node_info_read = node_info_read_and_reply(a_net, l_addr, a_str_reply); + + // get aliases in form of string + dap_string_t *aliases_string = dap_string_new(NULL); + dap_list_t *list_aliases = get_aliases_by_name(a_net, l_addr); + if(list_aliases) + { + dap_list_t *list = list_aliases; + while(list) + { + const char *alias = (const char *) list->data; + dap_string_append_printf(aliases_string, "\nalias %s", alias); + list = dap_list_next(list); + } + dap_list_free_full(list_aliases, (dap_callback_destroyed_t) free); } + else + dap_string_append(aliases_string, "\nno aliases"); + + const int hostlen = 128; + char *host4 = (char*) alloca(hostlen); + char *host6 = (char*) alloca(hostlen); + struct sockaddr_in sa4 = { .sin_family = AF_INET, .sin_addr = node_info_read->hdr.ext_addr_v4 }; + const char* str_ip4 = inet_ntop(AF_INET, &(((struct sockaddr_in *) &sa4)->sin_addr), host4, hostlen); + + struct sockaddr_in6 sa6 = { .sin6_family = AF_INET6, .sin6_addr = node_info_read->hdr.ext_addr_v6 }; + const char* str_ip6 = inet_ntop(AF_INET6, &(((struct sockaddr_in6 *) &sa6)->sin6_addr), host6, hostlen); + + // get links in form of string + dap_string_t *links_string = dap_string_new(NULL); + for(unsigned int i = 0; i < node_info_read->hdr.links_number; i++) { + dap_chain_node_addr_t link_addr = node_info_read->links[i]; + dap_string_append_printf(links_string, "\nlink%02d address : " NODE_ADDR_FP_STR, i, + NODE_ADDR_FP_ARGS_S(link_addr)); + } + + dap_string_append_printf(l_string_reply, "\n"); + // set short reply with node param + if(!a_is_full) + dap_string_append_printf(l_string_reply, + "node address "NODE_ADDR_FP_STR"\tcell 0x%016llx\tipv4 %s\tnumber of links %u", + NODE_ADDR_FP_ARGS_S(node_info_read->hdr.address), + node_info_read->hdr.cell_id.uint64, str_ip4, + node_info_read->hdr.links_number); + else + // set full reply with node param + dap_string_append_printf(l_string_reply, + "node address " NODE_ADDR_FP_STR "\ncell 0x%016llx%s\nipv4 %s\nipv6 %s\nlinks %u%s", + NODE_ADDR_FP_ARGS_S(node_info_read->hdr.address), + node_info_read->hdr.cell_id.uint64, + str_ip4, str_ip6, aliases_string->str, + node_info_read->hdr.links_number, links_string->str); + dap_string_free(aliases_string, true); + dap_string_free(links_string, true); + + DAP_DELETE(l_addr); + DAP_DELETE(node_info_read); + + }else { // Dump list dap_global_db_obj_t *l_objs = NULL; size_t l_nodes_count = 0; dap_chain_node_info_t *l_node_info; + dap_string_append(l_string_reply, "\n"); // read all node l_objs = dap_chain_global_db_gr_load( a_net->pub.gdb_nodes, &l_nodes_count); if(!l_nodes_count || !l_objs) { dap_string_append_printf(l_string_reply, "No records\n"); + dap_string_free(l_string_reply, true); l_ret = -1; }else { dap_string_append_printf(l_string_reply,"Got %u records:\n",l_nodes_count); @@ -519,7 +584,7 @@ static int node_info_dump_with_reply(dap_chain_net_t * a_net, dap_chain_node_add if(i) dap_string_append_printf(l_string_reply, "\n"); // set short reply with node param - if(l_objs) + if(!a_is_full) dap_string_append_printf(l_string_reply, "node address "NODE_ADDR_FP_STR"\tcell 0x%016llx\tipv4 %s\tnumber of links %u", NODE_ADDR_FP_ARGS_S(node_info_read->hdr.address), @@ -598,6 +663,7 @@ int com_global_db(int a_argc, char ** a_argv, char **a_str_reply) const char *l_addr_str = NULL, *alias_str = NULL, *l_cell_str = NULL, *l_link_str = NULL; const char *a_ipv4_str = NULL, *a_ipv6_str = NULL; // find addr, alias + dap_chain_node_cli_find_option_val(a_argv, arg_index, a_argc, "-addr", &l_addr_str); dap_chain_node_cli_find_option_val(a_argv, arg_index, a_argc, "-alias", &alias_str); dap_chain_node_cli_find_option_val(a_argv, arg_index, a_argc, "-cell", &l_cell_str); @@ -612,7 +678,7 @@ int com_global_db(int a_argc, char ** a_argv, char **a_str_reply) l_node_info = DAP_NEW_Z_SIZE(dap_chain_node_info_t, l_node_info_size); if(l_addr_str) { - dap_digit_from_string(l_addr_str, l_node_info->hdr.address.raw, sizeof(l_node_info->hdr.address.raw)); + dap_digit_from_string2(l_addr_str, l_node_info->hdr.address.raw, sizeof(l_node_info->hdr.address.raw)); } if(l_cell_str) { dap_digit_from_string(l_cell_str, l_node_info->hdr.cell_id.raw, sizeof(l_node_info->hdr.cell_id.raw)); //DAP_CHAIN_CELL_ID_SIZE); @@ -688,10 +754,12 @@ int com_node(int a_argc, char ** a_argv, char **a_str_reply) dap_chain_node_addr_t l_node_addr={0}; const char *l_addr_str = NULL, *alias_str = NULL; const char * l_net_str = NULL; + // find addr, alias dap_chain_node_cli_find_option_val(a_argv, arg_index, a_argc, "-addr", &l_addr_str); dap_chain_node_cli_find_option_val(a_argv, arg_index, a_argc, "-alias", &alias_str); dap_chain_node_cli_find_option_val(a_argv, arg_index, a_argc, "-net", &l_net_str); + bool l_is_full = dap_chain_node_cli_find_option_val(a_argv, arg_index, a_argc, "-full", NULL); if (l_addr_str) if ( dap_chain_node_addr_from_str(&l_node_addr,l_addr_str) != 0 ) @@ -714,7 +782,7 @@ int com_node(int a_argc, char ** a_argv, char **a_str_reply) case CMD_DUMP:{ // handler of command 'global_db node dump' - return node_info_dump_with_reply(l_net, &l_node_addr , alias_str, a_str_reply); + return node_info_dump_with_reply(l_net, &l_node_addr , l_is_full, alias_str, a_str_reply); } // add alias case CMD_ALIAS: diff --git a/dap_chain_node_client.c b/dap_chain_node_client.c index ec4641f16d9bbd24838f7269fa6d89f7aed24745..81a6df8260ec6f3b487b56b92d9b99fb7a14a042 100644 --- a/dap_chain_node_client.c +++ b/dap_chain_node_client.c @@ -62,6 +62,7 @@ #define SYSTEM_CONFIGS_DIR SYSTEM_PREFIX"/etc" static int listen_port_tcp = 8079; + static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg); static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t*, uint8_t a_pkt_type, dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, @@ -131,8 +132,12 @@ static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg) pthread_mutex_unlock(&l_node_client->wait_mutex); log_it(L_DEBUG,"Wakeup all who waits"); l_node_client->state = NODE_CLIENT_STATE_ERROR; - pthread_cond_signal(&l_node_client->wait_cond); +#ifndef _WIN32 + pthread_cond_signal(&l_node_client->wait_cond); +#else + SetEvent( l_node_client->wait_cond ); +#endif //dap_client_go_stage( a_client , STAGE_STREAM_STREAMING, s_stage_end_callback ); } //printf("* tage_status_error_callback client=%x data=%x\n", a_client, a_arg); @@ -168,7 +173,13 @@ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg) l_node_client->callback_connected(l_node_client,a_arg); l_node_client->keep_connection = true; log_it(L_DEBUG,"Wakeup all who waits"); + +#ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); +#else + SetEvent( l_node_client->wait_cond ); +#endif + } } @@ -193,13 +204,22 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha log_it(L_WARNING,"Received packet DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR with error \"%s\"", l_node_client->last_error); pthread_mutex_unlock(&l_node_client->wait_mutex); + +#ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); +#else + SetEvent( l_node_client->wait_cond ); +#endif case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_LEASE: pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_NODE_ADDR_LEASED; pthread_mutex_unlock(&l_node_client->wait_mutex); +#ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); +#else + SetEvent( l_node_client->wait_cond ); +#endif break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: @@ -238,14 +258,23 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_SYNCED; pthread_mutex_unlock(&l_node_client->wait_mutex); +#ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); +#else + SetEvent( l_node_client->wait_cond ); +#endif + } }else { log_it(L_INFO, "Sync notify without request to sync back, stay in SYNCED state"); pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_SYNCED; pthread_mutex_unlock(&l_node_client->wait_mutex); +#ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); +#else + SetEvent( l_node_client->wait_cond ); +#endif } } @@ -276,7 +305,11 @@ static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t* a_ch_ch pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_SYNCED; pthread_mutex_unlock(&l_node_client->wait_mutex); +#ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); +#else + SetEvent( l_node_client->wait_cond ); +#endif }break; default:{} } @@ -295,10 +328,16 @@ dap_chain_node_client_t* dap_chain_node_client_connect(dap_chain_node_info_t *a_ } dap_chain_node_client_t *l_node_client = DAP_NEW_Z(dap_chain_node_client_t); l_node_client->state = NODE_CLIENT_STATE_DISCONNECTED; + +#ifndef _WIN32 pthread_condattr_t attr; pthread_condattr_init(&attr); pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); pthread_cond_init(&l_node_client->wait_cond, &attr); +#else + l_node_client->wait_cond = CreateEventA( NULL, FALSE, FALSE, NULL ); +#endif + pthread_mutex_init(&l_node_client->wait_mutex, NULL); l_node_client->events = NULL; //dap_events_new(); l_node_client->client = dap_client_new(l_node_client->events, stage_status_callback, s_stage_status_error_callback); @@ -348,7 +387,11 @@ void dap_chain_node_client_close(dap_chain_node_client_t *a_client) //dap_client_delete(a_client->client); //a_client->client = NULL; +#ifndef _WIN32 pthread_cond_destroy(&a_client->wait_cond); +#else + CloseHandle( a_client->wait_cond ); +#endif pthread_mutex_destroy(&a_client->wait_mutex); DAP_DELETE(a_client); } @@ -384,32 +427,41 @@ int dap_chain_node_client_send_ch_pkt(dap_chain_node_client_t *a_client, uint8_t * waited_state state which we will wait, sample NODE_CLIENT_STATE_CONNECT or NODE_CLIENT_STATE_SENDED * return -2 false, -1 timeout, 0 end of connection or sending data */ -int dap_chain_node_client_wait(dap_chain_node_client_t *a_client, int a_waited_state, int a_timeout_ms) +int dap_chain_node_client_wait( dap_chain_node_client_t *a_client, int a_waited_state, int a_timeout_ms ) { int ret = -1; - if(!a_client) + if( !a_client ) return -3; - pthread_mutex_lock(&a_client->wait_mutex); + + pthread_mutex_lock( &a_client->wait_mutex ); // have waited - if(a_client->state == a_waited_state) { - pthread_mutex_unlock(&a_client->wait_mutex); + if ( a_client->state == a_waited_state ) { + pthread_mutex_unlock( &a_client->wait_mutex ); return 0; } + +#ifndef _WIN32 // prepare for signal waiting struct timespec to; - clock_gettime(CLOCK_MONOTONIC, &to); + clock_gettime( CLOCK_MONOTONIC, &to ); int64_t nsec_new = to.tv_nsec + a_timeout_ms * 1000000ll; // if the new number of nanoseconds is more than a second - if(nsec_new > (long) 1e9) { + + if ( nsec_new > (long) 1e9 ) { to.tv_sec += nsec_new / (long) 1e9; to.tv_nsec = nsec_new % (long) 1e9; } else to.tv_nsec = (long) nsec_new; +#else + pthread_mutex_unlock( &a_client->wait_mutex ); +#endif + // signal waiting do { - int wait = pthread_cond_timedwait(&a_client->wait_cond, &a_client->wait_mutex, &to); +#ifndef _WIN32 + int wait = pthread_cond_timedwait( &a_client->wait_cond, &a_client->wait_mutex, &to); if(wait == 0 && ( a_client->state == a_waited_state || a_client->state == NODE_CLIENT_STATE_ERROR ) @@ -421,8 +473,25 @@ int dap_chain_node_client_wait(dap_chain_node_client_t *a_client, int a_waited_s ret = -1; break; } - } - while(1); - pthread_mutex_unlock(&a_client->wait_mutex); +#else + int wait = WaitForSingleObject( a_client->wait_cond, (uint32_t)a_timeout_ms ); + pthread_mutex_lock( &a_client->wait_mutex ); + + if ( wait == WAIT_OBJECT_0 && ( + a_client->state == a_waited_state || + a_client->state == NODE_CLIENT_STATE_ERROR ) + ) { + ret = a_client->state == a_waited_state ? 0 : -2; + break; + } + else if ( wait == WAIT_TIMEOUT || wait == WAIT_FAILED ) { + ret = -1; + break; + } +#endif + + } while( 1 ); + + pthread_mutex_unlock( &a_client->wait_mutex ); return ret; } diff --git a/dap_chain_node_client.h b/dap_chain_node_client.h index 200519dbcd8541931fa82185872d5ed344ffd17d..1ec69226d4b4a398af461ee61bf2c4abb310e27a 100644 --- a/dap_chain_node_client.h +++ b/dap_chain_node_client.h @@ -58,7 +58,11 @@ typedef struct dap_chain_node_client { char last_error[128]; dap_chain_node_client_callback_t callback_connected; +#ifndef _WIN32 pthread_cond_t wait_cond; +#else + HANDLE wait_cond; +#endif pthread_mutex_t wait_mutex; // For hash indexing