diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 6f07440c1877d006d9aac0a89c5d30a985aaa8d1..56e006a5118cf9c7cfd572aac1fcc2affff89f3b 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -277,7 +277,7 @@ int dap_client_pvt_disconnect(dap_client_pvt_t *a_client_pvt) // l_client_internal->stream_es->signal_close = true; // start stopping connection - if(!dap_events_socket_kill_socket(a_client_pvt->stream_es)) { + if(a_client_pvt->stream_es && !dap_events_socket_kill_socket(a_client_pvt->stream_es)) { int l_counter = 0; // wait for stop of connection (max 0.7 sec.) while(a_client_pvt->stream_es && l_counter < 70) { @@ -485,10 +485,9 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) a_client_pvt->stream_es->_inheritor = a_client_pvt;//->client; a_client_pvt->stream = dap_stream_new_es(a_client_pvt->stream_es); a_client_pvt->stream->is_client_to_uplink = true; - a_client_pvt->stream_session = dap_stream_session_pure_new(); // may be from in packet? + a_client_pvt->stream->session = dap_stream_session_pure_new(); // may be from in packet? // new added, whether it is necessary? - a_client_pvt->stream->session = a_client_pvt->stream_session; a_client_pvt->stream->session->key = a_client_pvt->stream_key; // connect @@ -1201,39 +1200,21 @@ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg) { log_it(L_INFO, "================= stream delete/peer reconnect"); - //dap_client_t *l_client = DAP_CLIENT(a_es); dap_client_pvt_t * l_client_pvt = a_es->_inheritor; if(l_client_pvt == NULL) { log_it(L_ERROR, "dap_client_pvt_t is not initialized"); return; } - //pthread_mutex_lock(&l_client->mutex); - //dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client); log_it(L_DEBUG, "client_pvt=0x%x", l_client_pvt); - if(l_client_pvt == NULL) { - log_it(L_ERROR, "dap_client_pvt is not initialized"); - //pthread_mutex_unlock(&l_client->mutex); - return; - } + if (l_client_pvt->stage_status_error_callback) { + l_client_pvt->stage_status_error_callback(l_client_pvt->client, (void *)true); + } dap_stream_delete(l_client_pvt->stream); l_client_pvt->stream = NULL; - -// if(l_client_pvt->client && l_client_pvt->client == l_client) -// dap_client_reset(l_client_pvt->client); -// l_client_pvt->client= NULL; - -// log_it(L_DEBUG, "dap_stream_session_close()"); -// sleep(3); - dap_stream_session_close(l_client_pvt->stream_session->id); - l_client_pvt->stream_session = NULL; - - // signal to permit deleting of l_client_pvt l_client_pvt->stream_es = NULL; - //pthread_mutex_unlock(&l_client->mutex); - /* disable reconnect from here if(l_client_pvt->is_reconnect) { diff --git a/dap-sdk/net/client/include/dap_client_pvt.h b/dap-sdk/net/client/include/dap_client_pvt.h index 04aa7439242657964d7bd4bf8f5627aa36af1241..5bb82ed0c274ed7b93bea1ce0caefbbce40e47e5 100644 --- a/dap-sdk/net/client/include/dap_client_pvt.h +++ b/dap-sdk/net/client/include/dap_client_pvt.h @@ -41,7 +41,6 @@ typedef struct dap_client_internal dap_events_socket_t * stream_es; int stream_socket; - dap_stream_session_t * stream_session; dap_stream_t* stream; dap_events_t * events; diff --git a/dap-sdk/net/core/dap_timerfd.c b/dap-sdk/net/core/dap_timerfd.c index dec3016a0dd01ae237383a6e8b0728d912cdf44f..89bf5f116b86707e539d67918356742a984d88c4 100644 --- a/dap-sdk/net/core/dap_timerfd.c +++ b/dap-sdk/net/core/dap_timerfd.c @@ -21,7 +21,7 @@ You should have received a copy of the GNU General Public License along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. */ - +#ifndef WIN32 #include <stdint.h> #include <stdbool.h> #include <errno.h> @@ -149,4 +149,4 @@ int dap_timerfd_delete(dap_timerfd_t *l_timerfd) DAP_DELETE(l_timerfd); return 0; } - +#endif diff --git a/dap-sdk/net/stream/ch/dap_stream_ch.c b/dap-sdk/net/stream/ch/dap_stream_ch.c index a56a182ca3e32f97fa118e86d77494dde77cfbc1..2d589d439fe0fe1b0bc3dda5d079b1dab7fd70c1 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch.c @@ -115,13 +115,16 @@ dap_stream_ch_t* dap_stream_ch_new(dap_stream_t* a_stream, uint8_t id) } } -bool dap_stream_ch_valid(dap_stream_ch_t *a_ch) +struct dap_stream_ch_table_t *dap_stream_ch_valid(dap_stream_ch_t *a_ch) { struct dap_stream_ch_table_t *l_ret; if(!a_ch) return false; pthread_mutex_lock(&s_ch_table_lock); HASH_FIND_PTR(s_ch_table, &a_ch, l_ret); + if (l_ret) { + pthread_mutex_lock(&a_ch->mutex); + } pthread_mutex_unlock(&s_ch_table_lock); return l_ret; } @@ -140,16 +143,20 @@ void dap_stream_ch_delete(dap_stream_ch_t *a_ch) return; } HASH_DEL(s_ch_table, l_ret); + pthread_mutex_lock(&a_ch->mutex); pthread_mutex_unlock(&s_ch_table_lock); DAP_DELETE(l_ret); - pthread_mutex_lock(&a_ch->mutex); if (a_ch->proc) if (a_ch->proc->delete_callback) a_ch->proc->delete_callback(a_ch, NULL); pthread_mutex_unlock(&a_ch->mutex); pthread_mutex_destroy(&a_ch->mutex); + //pthread_rwlock_wrlock(&a_ch->stream->rwlock); + a_ch->stream->channel[a_ch->stream->channel_count--] = NULL; + //pthread_rwlock_unlock(&a_ch->stream->rwlock); + /* fixed raise, but probably may be memory leak! if(ch->internal){ free(ch->internal); @@ -168,7 +175,6 @@ void dap_stream_ch_set_ready_to_read(dap_stream_ch_t * a_ch,bool a_is_ready) if (!dap_stream_ch_valid(a_ch)) { return; } - pthread_mutex_lock(&a_ch->mutex); if( a_ch->ready_to_read != a_is_ready){ //log_it(L_DEBUG,"Change channel '%c' to %s", (char) ch->proc->id, is_ready?"true":"false"); a_ch->ready_to_read=a_is_ready; @@ -194,7 +200,6 @@ void dap_stream_ch_set_ready_to_write(dap_stream_ch_t * ch,bool is_ready) if (!dap_stream_ch_valid(ch)) { return; } - pthread_mutex_lock(&ch->mutex); if(ch->ready_to_write!=is_ready){ //log_it(L_DEBUG,"Change channel '%c' to %s", (char) ch->proc->id, is_ready?"true":"false"); ch->ready_to_write=is_ready; @@ -223,7 +228,6 @@ bool dap_stream_ch_get_ready_to_read(dap_stream_ch_t * a_ch) return false; } bool l_ret; - pthread_mutex_lock(&a_ch->mutex); l_ret = a_ch->ready_to_read; pthread_mutex_unlock(&a_ch->mutex); return l_ret; @@ -240,7 +244,6 @@ bool dap_stream_ch_get_ready_to_write(dap_stream_ch_t * a_ch) return false; } bool l_ret; - pthread_mutex_lock(&a_ch->mutex); l_ret = a_ch->ready_to_write; pthread_mutex_unlock(&a_ch->mutex); return l_ret; diff --git a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c index b1600592b17f4572c400430a2aba24b0fb123f24..db22f60fae5d208182e50fefebc60077ada7869d 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c @@ -82,7 +82,6 @@ size_t dap_stream_ch_pkt_write(struct dap_stream_ch * a_ch, uint8_t a_type, con if (!dap_stream_ch_valid(a_ch)) { return 0; } - pthread_mutex_lock( &a_ch->mutex); //log_it(L_DEBUG,"Output: Has %u bytes of %c type for %c channel id",data_size, (char)type, (char) ch->proc->id ); diff --git a/dap-sdk/net/stream/ch/include/dap_stream_ch.h b/dap-sdk/net/stream/ch/include/dap_stream_ch.h index a9c1f484fee1a16f231be3a3ea13a1d046f5a834..ff3368e064f2d65e0a8163696c78a687f7bf2eb9 100644 --- a/dap-sdk/net/stream/ch/include/dap_stream_ch.h +++ b/dap-sdk/net/stream/ch/include/dap_stream_ch.h @@ -63,6 +63,6 @@ bool dap_stream_ch_get_ready_to_write(dap_stream_ch_t *a_ch); void dap_stream_ch_delete(dap_stream_ch_t *a_ch); -bool dap_stream_ch_valid(dap_stream_ch_t *a_ch); +struct dap_stream_ch_table_t *dap_stream_ch_valid(dap_stream_ch_t *a_ch); #endif diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index a5eef987c52b5aaea2d9c24d5d48f27cd0806383..ee3b1fd7d5abeee6d461ae2c2d310fd1735fe5fe 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -338,19 +338,50 @@ dap_stream_t * stream_new(dap_http_client_t * a_sh) return ret; } -void dap_stream_delete( dap_stream_t *a_stream ) +void dap_stream_delete(dap_stream_t *a_stream) { -// log_it(L_DEBUG,"dap_stream_delete( )"); if(a_stream == NULL) { log_it(L_ERROR,"stream delete NULL instance"); return; } - pthread_rwlock_destroy(&a_stream->rwlock); - stream_dap_delete(a_stream->conn, NULL); + pthread_mutex_lock(&s_mutex_keepalive_list); + if(s_stream_keepalive_list){ + DL_DELETE(s_stream_keepalive_list, a_stream); + } + a_stream->conn_udp = NULL; + a_stream->conn = NULL; + a_stream->events_socket = NULL; + pthread_mutex_unlock(&s_mutex_keepalive_list); + while (a_stream->channel_count) { + dap_stream_ch_delete(a_stream->channel[a_stream->channel_count - 1]); + } + + pthread_rwlock_wrlock(&a_stream->rwlock); + if(a_stream->session) + dap_stream_session_close(a_stream->session->id); + a_stream->session = NULL; + pthread_rwlock_unlock(&a_stream->rwlock); + pthread_rwlock_destroy(&a_stream->rwlock); DAP_DELETE(a_stream); + log_it(L_NOTICE,"Stream connection is over"); +} + +/** + * @brief stream_dap_delete Delete callback for UDP client + * @param sh DAP client instance + * @param arg Not used + */ +void stream_dap_delete(dap_client_remote_t* sh, void * arg) +{ + UNUSED(arg); + if (!sh) + return; + dap_stream_t *l_stream = DAP_STREAM(sh); + dap_stream_delete(l_stream); } + /** * @brief dap_stream_new_es * @param a_es @@ -678,42 +709,6 @@ void stream_dap_data_write(dap_client_remote_t* a_client , void * arg){ //log_it(L_DEBUG,"stream_dap_data_write ok"); } -/** - * @brief stream_dap_delete Delete callback for UDP client - * @param sh DAP client instance - * @param arg Not used - */ -void stream_dap_delete(dap_client_remote_t* sh, void * arg){ - if(!sh) - return; - dap_stream_t * l_stream = DAP_STREAM(sh); - if(l_stream == NULL) - return; - (void) arg; - - pthread_mutex_lock(&s_mutex_keepalive_list); - if(s_stream_keepalive_list){ - DL_DELETE(s_stream_keepalive_list, l_stream); - } - pthread_mutex_unlock(&s_mutex_keepalive_list); - - /* Until channel is closed, it may still need l_stream->rwlock, so we can't lock it here yet. - In case of races on stream closing think about making this place more robust; - or forbid locking l_stream->rwlock from inside of channels. */ - for( ;l_stream->channel_count; l_stream->channel_count--){ - dap_stream_ch_delete(l_stream->channel[l_stream->channel_count - 1]); - l_stream->channel[l_stream->channel_count - 1] = NULL; - } - - pthread_rwlock_wrlock(&l_stream->rwlock); - if(l_stream->session) - dap_stream_session_close(l_stream->session->id); - l_stream->session = NULL; - pthread_rwlock_unlock(&l_stream->rwlock); - //free(sid); - log_it(L_NOTICE,"Stream connection is over"); -} - /** * @brief stream_dap_new New connection callback for UDP client * @param sh DAP client instance diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 430a5fd6649bd701deaf6819118ffc556455d97f..140bf61c3c3c2d6ec73f4998d7eccefa2fedbc6a 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -516,8 +516,10 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_chains, sizeof(l_sync_chains)); } default: { - //log_it(L_INFO, "Get %s packet", c_dap_stream_ch_chain_pkt_type_str[l_ch_pkt->hdr.type]); - } + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_UNKNOWN_CHAIN_PKT_TYPE"); + } } if(l_ch_chain->callback_notify_packet_in) l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt, diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 89503c1e4da0af1745cb3a4a2bbfd07fe29ba501..089091aad853318aba2599f6a6f5777d10b9f4f1 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -366,7 +366,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net) if (l_pvt_net->node_info) { for (size_t i = 0; i < l_pvt_net->node_info->hdr.links_number; i++) { dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(l_net, &l_pvt_net->node_info->links[i]); - if (l_link_node_info->hdr.address.uint64 == l_pvt_net->node_info->hdr.address.uint64) { + if (!l_link_node_info || l_link_node_info->hdr.address.uint64 == l_pvt_net->node_info->hdr.address.uint64) { continue; // Do not link with self } l_pvt_net->links_info = dap_list_append(l_pvt_net->links_info, l_link_node_info); @@ -374,6 +374,11 @@ static int s_net_states_proc(dap_chain_net_t * l_net) } else { log_it(L_WARNING,"No nodeinfo in global_db to prepare links for connecting, find nearest 3 links and fill global_db"); } + if (!l_pvt_net->seed_aliases_count) { + log_it(L_ERROR, "No root servers present in configuration file. Can't establish DNS requests"); + l_pvt_net->state_target = l_pvt_net->state = NET_STATE_OFFLINE; + break; + } switch (l_pvt_net->node_role.enums) { case NODE_ROLE_ROOT: case NODE_ROLE_ROOT_MASTER: @@ -453,8 +458,15 @@ static int s_net_states_proc(dap_chain_net_t * l_net) } break; case NET_STATE_SYNC_GDB:{ - for (dap_list_t *l_tmp = l_pvt_net->links; l_tmp; l_tmp = dap_list_next(l_tmp)) { + for (dap_list_t *l_tmp = l_pvt_net->links; l_tmp; ) { dap_chain_node_client_t *l_node_client = (dap_chain_node_client_t *)l_tmp->data; + dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id()); + if (!l_ch_chain) { // Channel or stream or client itself closed + l_tmp = dap_list_next(l_tmp); + dap_chain_node_client_close(l_node_client); + l_pvt_net->links = dap_list_remove(l_pvt_net->links, l_node_client); + continue; + } dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; // Get last timestamp in log l_sync_gdb.id_start = (uint64_t) dap_db_log_get_last_id_remote(l_node_client->remote_node_addr.uint64); @@ -505,8 +517,11 @@ static int s_net_states_proc(dap_chain_net_t * l_net) default: log_it(L_INFO, "Node sync error %d",l_res); } + l_tmp = dap_list_next(l_tmp); } - if (l_pvt_net->state_target >= NET_STATE_SYNC_CHAINS){ + if (!l_pvt_net->links) { + l_pvt_net->state = NET_STATE_LINKS_PREPARE; + } else if (l_pvt_net->state_target >= NET_STATE_SYNC_CHAINS) { l_pvt_net->state = NET_STATE_SYNC_CHAINS; } else { // Synchronization done, go offline log_it(L_INFO, "Synchronization done"); @@ -516,12 +531,14 @@ static int s_net_states_proc(dap_chain_net_t * l_net) break; case NET_STATE_SYNC_CHAINS: { + bool l_need_flush = false; for (dap_list_t *l_tmp = l_pvt_net->links; l_tmp; l_tmp = dap_list_next(l_tmp)) { dap_chain_node_client_t *l_node_client = (dap_chain_node_client_t *)l_tmp->data; - uint8_t l_ch_id = dap_stream_ch_chain_get_id(); // Channel id for global_db and chains sync - dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch(l_node_client->client, l_ch_id); - if (!l_ch_chain) { - log_it(L_DEBUG,"Can't get stream_ch for id='%c' ", l_ch_id); + dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id()); + if (!l_ch_chain) { // Channel or stream or client itself closed + l_tmp = dap_list_next(l_tmp); + dap_chain_node_client_close(l_node_client); + l_pvt_net->links = dap_list_remove(l_pvt_net->links, l_node_client); continue; } dap_chain_t * l_chain = NULL; @@ -540,24 +557,22 @@ static int s_net_states_proc(dap_chain_net_t * l_net) log_it(L_WARNING,"Timeout with sync of chain '%s' ", l_chain->name); break; case 0: - // flush global_db - dap_chain_global_db_flush(); - log_it(L_INFO, "sync of chain '%s' completed ", l_chain->name); + l_need_flush = true; + log_it(L_INFO, "Sync of chain '%s' completed ", l_chain->name); break; default: - log_it(L_ERROR, "sync of chain '%s' error %d", l_chain->name,l_res); + log_it(L_ERROR, "Sync of chain '%s' error %d", l_chain->name,l_res); } dap_stream_ch_chain_pkt_write(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS, l_net->pub.id, l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request)); l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); switch (l_res) { case -1: - log_it(L_WARNING,"Timeout with sync of chain '%s' ", l_chain->name); + log_it(L_WARNING,"Timeout with reverse sync of chain '%s' ", l_chain->name); break; case 0: - // flush global_db - dap_chain_global_db_flush(); - log_it(L_INFO, "sync of chain '%s' completed ", l_chain->name); + l_need_flush = true; + log_it(L_INFO, "Reverse sync of chain '%s' completed ", l_chain->name); // set time of last sync { struct timespec l_to; @@ -566,16 +581,25 @@ static int s_net_states_proc(dap_chain_net_t * l_net) } break; default: - log_it(L_ERROR, "sync of chain '%s' error %d", l_chain->name,l_res); + log_it(L_ERROR, "Reverse sync of chain '%s' error %d", l_chain->name,l_res); } } + l_tmp = dap_list_next(l_tmp); } - log_it(L_INFO, "Synchronization done"); - if (l_pvt_net->state_target == NET_STATE_ONLINE) { - l_pvt_net->flags &= ~F_DAP_CHAIN_NET_GO_SYNC; - l_pvt_net->state = NET_STATE_ONLINE; - } else { // Synchronization done, go offline - l_pvt_net->state = l_pvt_net->state_target = NET_STATE_OFFLINE; + if (l_need_flush) { + // flush global_db + dap_chain_global_db_flush(); + } + if (!l_pvt_net->links) { + l_pvt_net->state = NET_STATE_LINKS_PREPARE; + } else { + log_it(L_INFO, "Synchronization done"); + if (l_pvt_net->state_target == NET_STATE_ONLINE) { + l_pvt_net->flags &= ~F_DAP_CHAIN_NET_GO_SYNC; + l_pvt_net->state = NET_STATE_ONLINE; + } else { // Synchronization done, go offline + l_pvt_net->state = l_pvt_net->state_target = NET_STATE_OFFLINE; + } } } break; @@ -653,7 +677,7 @@ static void *s_net_proc_thread ( void *a_net ) #endif // checking whether new sync is needed - time_t l_sync_timeout = 300; // 300 sec = 5 min + time_t l_sync_timeout = 1800; // 1800 sec = 30 min clock_gettime(CLOCK_MONOTONIC, &l_to); // start sync every l_sync_timeout sec if(l_to.tv_sec >= p_net->last_sync + l_sync_timeout) { diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 4647568525e0b222307464675ca6d353df626719..598041a8475773de9ddbb42254ca7bfdd6619c5a 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -120,12 +120,12 @@ static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg) if(l_is_last_attempt){ pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_DISCONNECTED; - pthread_mutex_unlock(&l_node_client->wait_mutex); #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else SetEvent( l_node_client->wait_cond ); #endif + pthread_mutex_unlock(&l_node_client->wait_mutex); } if(l_node_client && l_node_client->keep_connection && @@ -137,13 +137,13 @@ static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg) log_it(L_DEBUG, "Wakeup all who waits"); pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_ERROR; - pthread_mutex_unlock(&l_node_client->wait_mutex); #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else SetEvent( l_node_client->wait_cond ); #endif + pthread_mutex_unlock(&l_node_client->wait_mutex); //dap_client_go_stage( a_client , STAGE_STREAM_STREAMING, s_stage_end_callback ); } //printf("* tage_status_error_callback client=%x data=%x\n", a_client, a_arg); @@ -161,9 +161,6 @@ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg) if(l_node_client) { log_it(L_NOTICE, "Stream connection with node " NODE_ADDR_FP_STR " established", NODE_ADDR_FP_ARGS_S( l_node_client->remote_node_addr)); - pthread_mutex_lock(&l_node_client->wait_mutex); - l_node_client->state = NODE_CLIENT_STATE_CONNECTED; - pthread_mutex_unlock(&l_node_client->wait_mutex); // set callbacks for C and N channels; for R and S it is not needed dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); if(l_client_internal && l_client_internal->active_channels) { @@ -174,31 +171,18 @@ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg) } } } - /* - // find current channel code - dap_stream_ch_t * l_ch = NULL; - l_ch = dap_client_get_stream_ch(a_client, l_client_internal->active_channels[0]); - //dap_stream_ch_t * l_ch = dap_client_get_stream_ch(a_client, dap_stream_ch_chain_get_id()); - if(l_ch) { - dap_chain_node_client_set_callbacks(dap_client_t *a_client, uint8_t a_ch_id) - dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); - l_ch_chain->callback_notify_packet_out = s_ch_chain_callback_notify_packet_out; - l_ch_chain->callback_notify_packet_in = s_ch_chain_callback_notify_packet_in; - l_ch_chain->callback_notify_arg = l_node_client; - } else { - log_it(L_WARNING, "No ch_chain channel, can't init notify callback for pkt type CH_CHAIN"); - }*/ if(l_node_client->callback_connected) l_node_client->callback_connected(l_node_client, a_arg); l_node_client->keep_connection = true; log_it(L_DEBUG, "Wakeup all who waits"); - + pthread_mutex_lock(&l_node_client->wait_mutex); + l_node_client->state = NODE_CLIENT_STATE_CONNECTED; #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else SetEvent( l_node_client->wait_cond ); #endif - + pthread_mutex_unlock(&l_node_client->wait_mutex); } } @@ -223,12 +207,12 @@ static void s_ch_chain_callback_notify_packet_in2(dap_stream_ch_chain_net_t* a_c } pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_NODE_ADDR_LEASED; - pthread_mutex_unlock(&l_node_client->wait_mutex); #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else - SetEvent( l_node_client->wait_cond ); - #endif + SetEvent( l_node_client->wait_cond ); +#endif + pthread_mutex_unlock(&l_node_client->wait_mutex); break; } // get remote node address @@ -240,13 +224,13 @@ static void s_ch_chain_callback_notify_packet_in2(dap_stream_ch_chain_net_t* a_c } pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_GET_NODE_ADDR; - pthread_mutex_unlock(&l_node_client->wait_mutex); #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else SetEvent( l_node_client->wait_cond ); #endif - break; + pthread_mutex_unlock(&l_node_client->wait_mutex); + break; } } } @@ -267,20 +251,20 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg; switch (a_pkt_type) { case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR: - pthread_mutex_lock(&l_node_client->wait_mutex); - l_node_client->state = NODE_CLIENT_STATE_ERROR; dap_snprintf(l_node_client->last_error, sizeof(l_node_client->last_error), "%s", (char*) a_pkt->data); log_it(L_WARNING, "Received packet DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR with error \"%s\"", l_node_client->last_error); - pthread_mutex_unlock(&l_node_client->wait_mutex); + pthread_mutex_lock(&l_node_client->wait_mutex); + l_node_client->state = NODE_CLIENT_STATE_ERROR; #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else - SetEvent( l_node_client->wait_cond ); + SetEvent( l_node_client->wait_cond ); #endif - + pthread_mutex_unlock(&l_node_client->wait_mutex); + break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { @@ -387,25 +371,24 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha log_it(L_INFO, "Remote node has lastes timestamp for us type=%d", a_pkt_type); pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_SYNCED; - pthread_mutex_unlock(&l_node_client->wait_mutex); #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else SetEvent( l_node_client->wait_cond ); #endif - + pthread_mutex_unlock(&l_node_client->wait_mutex); } } } else { log_it(L_INFO, "Sync notify without request to sync back, stay in SYNCED state"); pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_SYNCED; - pthread_mutex_unlock(&l_node_client->wait_mutex); #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else - SetEvent( l_node_client->wait_cond ); + SetEvent( l_node_client->wait_cond ); #endif + pthread_mutex_unlock(&l_node_client->wait_mutex); } } @@ -438,12 +421,12 @@ static void s_ch_chain_callback_notify_packet_out(dap_stream_ch_chain_t* a_ch_ch case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_SYNCED; - pthread_mutex_unlock(&l_node_client->wait_mutex); #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else - SetEvent( l_node_client->wait_cond ); + SetEvent( l_node_client->wait_cond ); #endif + pthread_mutex_unlock(&l_node_client->wait_mutex); } break; default: { @@ -520,12 +503,12 @@ static void s_ch_chain_callback_notify_packet_R(dap_stream_ch_chain_net_srv_t* a //... pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_CHECKED; - pthread_mutex_unlock(&l_node_client->wait_mutex); #ifndef _WIN32 pthread_cond_signal(&l_node_client->wait_cond); #else SetEvent( l_node_client->wait_cond ); #endif + pthread_mutex_unlock(&l_node_client->wait_mutex); break; } }