diff --git a/dap-sdk/net/client/dap_client.c b/dap-sdk/net/client/dap_client.c index 89d098685ebe254624151aa3682b8fed7db2578a..cda4e104fd42c8f8bd72f5766f2dbd06e4c8df45 100644 --- a/dap-sdk/net/client/dap_client.c +++ b/dap-sdk/net/client/dap_client.c @@ -187,6 +187,46 @@ void dap_client_set_auth_cert_unsafe(dap_client_t * a_client, dap_cert_t *a_cert DAP_CLIENT_PVT(a_client)->auth_cert = a_cert; } +/** + * @brief dap_client_set_auth_cert + * @param a_client + * @param a_chain_net_name + * @param a_option + */ +void dap_client_set_auth_cert(dap_client_t *a_client, const char *a_chain_net_name) +{ + const char *l_auth_hash_str = NULL; + + if(a_client == NULL || a_chain_net_name == NULL){ + log_it(L_ERROR,"Chain-net is NULL for dap_client_set_auth_cert"); + return; + } + + char *l_path = dap_strdup_printf("network/%s", a_chain_net_name); + if (!l_path) { + log_it(L_ERROR, "Can't allocate memory: file: %s line: %d", __FILE__, __LINE__); + return; + } + + dap_config_t *l_cfg = dap_config_open(l_path); + free(l_path); + if (!l_cfg) { + log_it(L_ERROR, "Can't allocate memory: file: %s line: %d", __FILE__, __LINE__); + return; + } + + dap_cert_t *l_cert = dap_cert_find_by_name(dap_config_get_item_str(l_cfg, "general", "auth_cert")); + if (!l_cert) { + dap_config_close(l_cfg); + log_it(L_ERROR,"l_cert is NULL by dap_cert_find_by_name"); + return; + } + dap_client_set_auth_cert_unsafe(a_client, l_cert); + + //dap_cert_delete(l_cert); + dap_config_close(l_cfg); +} + /** * @brief s_client_delete * @param a_client @@ -232,7 +272,7 @@ static void s_go_stage_on_client_worker_unsafe(dap_worker_t * a_worker,void * a_ dap_client_stage_t l_stage_target = ((struct go_stage_arg*) a_arg)->stage_target; dap_client_callback_t l_stage_end_callback= ((struct go_stage_arg*) a_arg)->stage_end_callback; dap_client_pvt_t * l_client_pvt = ((struct go_stage_arg*) a_arg)->client_pvt; - dap_client_t * l_client = ((struct go_stage_arg*) a_arg)->client_pvt->client; + dap_client_t *l_client = l_client_pvt->client; bool l_flag_delete_after = ((struct go_stage_arg *) a_arg)->flag_delete_after ;// Delete after stage achievement DAP_DELETE(a_arg); @@ -321,7 +361,10 @@ void dap_client_go_stage(dap_client_t * a_client, dap_client_stage_t a_stage_tar } dap_client_pvt_t * l_client_pvt = dap_client_pvt_find(a_client->pvt_uuid); - assert(l_client_pvt); + if (NULL == l_client_pvt) { + log_it(L_ERROR, "dap_client_go_stage, client_pvt == NULL"); + return; + } struct go_stage_arg *l_stage_arg = DAP_NEW_Z(struct go_stage_arg); if (! l_stage_arg) return; l_stage_arg->stage_end_callback = a_stage_end_callback; diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 4dc04661e783517de391d5560a68120cfebae637..d6f20b3581edec47188047ab164ce90110b92108 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -159,50 +159,6 @@ void dap_client_pvt_new(dap_client_pvt_t * a_client_pvt) dap_client_pvt_hh_add_unsafe(a_client_pvt); } - - -/** - * @brief dap_client_set_auth_cert - * @param a_client - * @param a_chain_net_name - * @param a_option - */ -void dap_client_set_auth_cert(dap_client_t *a_client, const char *a_chain_net_name) -{ - const char *l_auth_hash_str = NULL; - - if(a_client == NULL || a_chain_net_name == NULL){ - log_it(L_ERROR,"Chain-net is NULL for dap_client_set_auth_cert"); - return; - } - - char *l_path = dap_strdup_printf("network/%s", a_chain_net_name); - if (!l_path) { - log_it(L_ERROR, "Can't allocate memory: file: %s line: %d", __FILE__, __LINE__); - return; - } - - dap_config_t *l_cfg = dap_config_open(l_path); - free(l_path); - if (!l_cfg) { - log_it(L_ERROR, "Can't allocate memory: file: %s line: %d", __FILE__, __LINE__); - return; - } - - dap_cert_t *l_cert = dap_cert_find_by_name(dap_config_get_item_str(l_cfg, "general", "auth_cert")); - if (!l_cert) { - dap_config_close(l_cfg); - log_it(L_ERROR,"l_cert is NULL by dap_cert_find_by_name"); - return; - } - dap_client_set_auth_cert_unsafe(a_client, l_cert); - - //dap_cert_delete(l_cert); - dap_config_close(l_cfg); -} - - - /** * @brief dap_client_pvt_delete_unsafe * @param a_client_pvt @@ -980,7 +936,7 @@ static void s_request_response(void * a_response, size_t a_response_size, void * static void s_enc_init_response(dap_client_t * a_client, void * a_response, size_t a_response_size) { dap_client_pvt_t * l_client_pvt = dap_client_pvt_find(a_client->pvt_uuid); - if (!l_client_pvt) return; + if (!l_client_pvt || l_client_pvt->is_to_delete) return; if (!l_client_pvt->session_key_open){ log_it(L_ERROR, "m_enc_init_response: session is NULL!"); diff --git a/dap-sdk/net/client/include/dap_client.h b/dap-sdk/net/client/include/dap_client.h index 8457e838b7cfd55a0c0ca6eabdce765d5d20e815..0173b8cc673cb8db251c47f18b0bf3f78b308cb0 100644 --- a/dap-sdk/net/client/include/dap_client.h +++ b/dap-sdk/net/client/include/dap_client.h @@ -139,6 +139,7 @@ dap_stream_ch_t * dap_client_get_stream_ch_unsafe(dap_client_t * a_client, uint8 const char * dap_client_get_stream_id(dap_client_t * a_client); void dap_client_set_active_channels_unsafe (dap_client_t * a_client, const char * a_active_channels); void dap_client_set_auth_cert_unsafe(dap_client_t * a_client, dap_cert_t *a_cert); +void dap_client_set_auth_cert(dap_client_t *a_client, const char *a_chain_net_name); dap_client_stage_t dap_client_get_stage(dap_client_t * a_client); dap_client_stage_status_t dap_client_get_stage_status(dap_client_t * a_client); diff --git a/dap-sdk/net/core/dap_timerfd.c b/dap-sdk/net/core/dap_timerfd.c index 044dca1e62f523d2a7a3aa9fef59f8fd45be9e4d..cdda7e26de2c1581684415b72b617c03bae17466 100644 --- a/dap-sdk/net/core/dap_timerfd.c +++ b/dap-sdk/net/core/dap_timerfd.c @@ -108,6 +108,7 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t dap_timerfd_t* l_timerfd = dap_timerfd_create( a_timeout_ms, a_callback, a_callback_arg); if(l_timerfd){ dap_worker_add_events_socket(l_timerfd->events_socket, a_worker); + l_timerfd->worker = a_worker; return l_timerfd; }else{ log_it(L_CRITICAL,"Can't create timer"); @@ -254,52 +255,72 @@ dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t return l_timerfd; } -/** - * @brief s_es_callback_timer - * @param a_event_sock - */ -static void s_es_callback_timer(struct dap_events_socket *a_event_sock) +static void s_timerfd_reset(dap_timerfd_t *a_timerfd, dap_events_socket_t *a_event_sock) { - dap_timerfd_t *l_timerfd = a_event_sock->_inheritor; - // run user's callback - if(l_timerfd->callback && l_timerfd->callback(l_timerfd->callback_arg)) { - //printf("\nread() returned %d, %d\n", l_ptiu64, l_read_ret); #if defined DAP_OS_LINUX - struct itimerspec l_ts; - // repeat never - l_ts.it_interval.tv_sec = 0; - l_ts.it_interval.tv_nsec = 0; - // timeout for timer - l_ts.it_value.tv_sec = l_timerfd->timeout_ms / 1000; - l_ts.it_value.tv_nsec = (l_timerfd->timeout_ms % 1000) * 1000000; - if(timerfd_settime(l_timerfd->tfd, 0, &l_ts, NULL) < 0) { - log_it(L_WARNING, "callback_timerfd_read() failed: timerfd_settime() errno=%d\n", errno); - } + struct itimerspec l_ts; + // repeat never + l_ts.it_interval.tv_sec = 0; + l_ts.it_interval.tv_nsec = 0; + // timeout for timer + l_ts.it_value.tv_sec = a_timerfd->timeout_ms / 1000; + l_ts.it_value.tv_nsec = (a_timerfd->timeout_ms % 1000) * 1000000; + if(timerfd_settime(a_timerfd->tfd, 0, &l_ts, NULL) < 0) { + log_it(L_WARNING, "Reset timerfd failed: timerfd_settime() errno=%d\n", errno); + } #elif defined (DAP_OS_BSD) - dap_worker_add_events_socket_unsafe(a_event_sock,a_event_sock->worker); - //struct kevent * l_event = &a_event_sock->kqueue_event; - //EV_SET(l_event, 0, a_event_sock->kqueue_base_filter, a_event_sock->kqueue_base_flags,a_event_sock->kqueue_base_fflags,a_event_sock->kqueue_data,a_event_sock); - //kevent(a_event_sock->worker->kqueue_fd,l_event,1,NULL,0,NULL); + dap_worker_add_events_socket_unsafe(a_event_sock,a_event_sock->worker); +//struct kevent * l_event = &a_event_sock->kqueue_event; +//EV_SET(l_event, 0, a_event_sock->kqueue_base_filter, a_event_sock->kqueue_base_flags,a_event_sock->kqueue_base_fflags,a_event_sock->kqueue_data,a_event_sock); +//kevent(a_event_sock->worker->kqueue_fd,l_event,1,NULL,0,NULL); #elif defined (DAP_OS_WINDOWS) - /*LARGE_INTEGER l_due_time; - l_due_time.QuadPart = (long long)l_timerfd->timeout_ms * _MSEC; - if (!SetWaitableTimer(l_timerfd->th, &l_due_time, 0, TimerAPCb, l_timerfd, false)) { - log_it(L_CRITICAL, "Waitable timer not reset, error %d", GetLastError()); - CloseHandle(l_timerfd->th); - }*/ // Wtf is this entire thing for?... + /*LARGE_INTEGER l_due_time; + l_due_time.QuadPart = (long long)a_timerfd->timeout_ms * _MSEC; + if (!SetWaitableTimer(a_timerfd->th, &l_due_time, 0, TimerAPCb, a_timerfd, false)) { + log_it(L_CRITICAL, "Waitable timer not reset, error %d", GetLastError()); + CloseHandle(a_timerfd->th); + }*/ // Wtf is this entire thing for?... #else -#error "No timer callback realization for your platform" +#error "No timer reset realization for your platform" #endif #ifndef DAP_OS_BSD - dap_events_socket_set_readable_unsafe(a_event_sock, true); + dap_events_socket_set_readable_unsafe(a_event_sock, true); #endif +} +/** + * @brief s_es_callback_timer + * @param a_event_sock + */ +static void s_es_callback_timer(struct dap_events_socket *a_event_sock) +{ + dap_timerfd_t *l_timerfd = a_event_sock->_inheritor; + // run user's callback + if(l_timerfd->callback && l_timerfd->callback(l_timerfd->callback_arg)) { + s_timerfd_reset(l_timerfd, a_event_sock); } else { l_timerfd->events_socket->flags |= DAP_SOCK_SIGNAL_CLOSE; } } +/** + * @brief dap_timerfd_reset + * @param a_tfd + */ +void dap_timerfd_reset(dap_timerfd_t *a_timerfd) +{ + if (!a_timerfd) + return; + dap_events_socket_t *l_sock = NULL; + if (a_timerfd->worker) + l_sock = dap_worker_esocket_find_uuid(a_timerfd->worker, a_timerfd->esocket_uuid); + else if (a_timerfd->proc_thread) + l_sock = a_timerfd->events_socket; + if (l_sock) + s_timerfd_reset(a_timerfd, l_sock); +} + /** * @brief dap_timerfd_stop * @param a_tfd diff --git a/dap-sdk/net/core/include/dap_timerfd.h b/dap-sdk/net/core/include/dap_timerfd.h index 6247112a517a89215e8225426f457c7970c69ba1..9815ab819c19999eb663a9a36951fd12a45fb3ca 100644 --- a/dap-sdk/net/core/include/dap_timerfd.h +++ b/dap-sdk/net/core/include/dap_timerfd.h @@ -53,6 +53,8 @@ typedef struct dap_timerfd { #elif defined(DAP_OS_LINUX) int tfd; //timer file descriptor #endif + dap_worker_t *worker; + dap_proc_thread_t *proc_thread; dap_events_socket_t *events_socket; dap_events_socket_uuid_t esocket_uuid; dap_timerfd_callback_t callback; @@ -69,4 +71,4 @@ dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg); dap_timerfd_t* dap_timerfd_start_on_proc_thread(dap_proc_thread_t * a_proc_thread, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg); void dap_timerfd_delete(dap_timerfd_t *l_timerfd); - +void dap_timerfd_reset(dap_timerfd_t *a_timerfd); diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index 689c203b0a738d81c94a06ae416ab47204e14d22..a36776d86067c6d1fe429f41fab013a70d2ce65b 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -646,9 +646,9 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) read_bytes_to=0; if(a_stream->pkt_buf_in_data_size>=(a_stream->pkt_buf_in->hdr.size + sizeof(dap_stream_pkt_hdr_t)) ){ // If we have all the packet in packet buffer if(a_stream->pkt_buf_in_data_size > a_stream->pkt_buf_in->hdr.size + sizeof(dap_stream_pkt_hdr_t)){ // If we have little more data then we need for packet buffer - //log_it(L_WARNING,"Prefilled packet buffer has %u bytes more than we need, they're lost",a_stream->pkt_buf_in_data_size-a_stream->pkt_buf_in->hdr.size); + log_it(L_WARNING,"Prefilled packet buffer has %zu bytes more than we need, it's lost",a_stream->pkt_buf_in_data_size-a_stream->pkt_buf_in->hdr.size); + DAP_DEL_Z(a_stream->pkt_buf_in); a_stream->pkt_buf_in_data_size = 0; - a_stream->pkt_buf_in = NULL; } else{ s_stream_proc_pkt_in(a_stream); @@ -692,24 +692,24 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) found_sig=true; //dap_stream_pkt_t *temp_pkt = dap_stream_pkt_detect( (uint8_t*)pkt + 1 ,pkt->hdr.size+sizeof(stream_pkt_hdr_t) ); + size_t l_pkt_size = pkt->hdr.size + sizeof(dap_stream_pkt_hdr_t); if(bytes_left_to_read >= sizeof (dap_stream_pkt_t)){ - if(bytes_left_to_read <(pkt->hdr.size+sizeof(dap_stream_pkt_t) )){ // Is all the packet in da buf? + if (bytes_left_to_read < l_pkt_size) { // Is all the packet in da buf? read_bytes_to=bytes_left_to_read; }else{ - read_bytes_to=pkt->hdr.size+sizeof(dap_stream_pkt_t); + read_bytes_to = l_pkt_size; } } //log_it(L_DEBUG, "Detected packet signature pkt->hdr.size=%u read_bytes_to=%u bytes_left_to_read=%u pkt_offset=%u" // ,pkt->hdr.size, read_bytes_to, bytes_left_to_read,pkt_offset); if(read_bytes_to > HEADER_WITH_SIZE_FIELD){ // If we have size field, we can allocate memory - a_stream->pkt_buf_in_size_expected =( pkt->hdr.size+sizeof(dap_stream_pkt_hdr_t)); - size_t pkt_buf_in_size_expected=a_stream->pkt_buf_in_size_expected; - a_stream->pkt_buf_in=(dap_stream_pkt_t *) malloc(pkt_buf_in_size_expected); - if(read_bytes_to>(pkt->hdr.size+sizeof(dap_stream_pkt_hdr_t) )){ + a_stream->pkt_buf_in_size_expected = l_pkt_size; + a_stream->pkt_buf_in = DAP_NEW_SIZE(struct dap_stream_pkt, l_pkt_size); + if (read_bytes_to > l_pkt_size) { //log_it(L_WARNING,"For some strange reasons we have read_bytes_to=%u is bigger than expected pkt length(%u bytes). Dropped %u bytes", // pkt->hdr.size+sizeof(stream_pkt_hdr_t),read_bytes_to- pkt->hdr.size+sizeof(stream_pkt_hdr_t)); - read_bytes_to=(pkt->hdr.size+sizeof(dap_stream_pkt_hdr_t)); + read_bytes_to = l_pkt_size; } if(read_bytes_to>bytes_left_to_read){ //log_it(L_WARNING,"For some strange reasons we have read_bytes_to=%u is bigger that's left in input buffer (%u bytes). Dropped %u bytes", @@ -720,11 +720,11 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream) proc_data+=(read_bytes_to + pkt_offset); bytes_left_to_read-=read_bytes_to; a_stream->pkt_buf_in_data_size=(read_bytes_to); - if(a_stream->pkt_buf_in_data_size==(pkt->hdr.size + sizeof(dap_stream_pkt_hdr_t))){ + if(a_stream->pkt_buf_in_data_size==l_pkt_size){ // log_it(INFO,"All the packet is present in da buffer (hdr.size=%u read_bytes_to=%u buf_in_size=%u)" // ,sid->pkt_buf_in->hdr.size,read_bytes_to,sid->conn->buf_in_size); s_stream_proc_pkt_in(a_stream); - }else if(a_stream->pkt_buf_in_data_size>pkt->hdr.size + sizeof(dap_stream_pkt_hdr_t)){ + }else if(a_stream->pkt_buf_in_data_size>l_pkt_size){ //log_it(L_WARNING,"Input: packet buffer has %u bytes more than we need, they're lost",a_stream->pkt_buf_in_data_size-pkt->hdr.size); }else{ //log_it(L_DEBUG,"Input: Not all stream packet in input (hdr.size=%u read_bytes_to=%u)",a_stream->pkt_buf_in->hdr.size,read_bytes_to); @@ -820,13 +820,8 @@ static void s_stream_proc_pkt_in(dap_stream_t * a_stream) memcpy(l_ret_pkt.sig, c_dap_stream_sig, sizeof(l_ret_pkt.sig)); dap_events_socket_write_unsafe(a_stream->esocket, &l_ret_pkt, sizeof(l_ret_pkt)); // Reset client keepalive timer - if (a_stream->keepalive_timer && a_stream->keepalive_timer->events_socket->worker) { - void *l_arg = a_stream->keepalive_timer->callback_arg; - dap_timerfd_delete(a_stream->keepalive_timer); - a_stream->keepalive_timer = dap_timerfd_start_on_worker(a_stream->stream_worker->worker, - STREAM_KEEPALIVE_TIMEOUT * 1000, - (dap_timerfd_callback_t)s_callback_keepalive, - l_arg); + if (a_stream->keepalive_timer) { + dap_timerfd_reset(a_stream->keepalive_timer); } } break; case STREAM_PKT_TYPE_ALIVE: diff --git a/modules/chain/dap_chain_cell.c b/modules/chain/dap_chain_cell.c index 5d8756db08e9c0b3192169a873a21f218cf8613f..ad8c8ebbc92da8b9c209cc4509da400fa95b6f67 100644 --- a/modules/chain/dap_chain_cell.c +++ b/modules/chain/dap_chain_cell.c @@ -213,7 +213,7 @@ int dap_chain_cell_load(dap_chain_t * a_chain, const char * a_cell_file_path) unsigned long l_read = fread(l_element, 1, l_el_size, l_f); if(l_read == l_el_size) { dap_chain_atom_verify_res_t l_res = a_chain->callback_atom_add(a_chain, l_element, l_el_size); // !!! blocking GDB call !!! - if (l_res == ATOM_PASS && l_res == ATOM_REJECT) { + if (l_res == ATOM_PASS || l_res == ATOM_REJECT) { DAP_DELETE(l_element); } ++q; diff --git a/modules/chain/dap_chain_ledger.c b/modules/chain/dap_chain_ledger.c index fe9a950693dbd5072953f6b20c41eadbcee98a41..cebd13bf91905204568363f090ba0e9135c1365e 100644 --- a/modules/chain/dap_chain_ledger.c +++ b/modules/chain/dap_chain_ledger.c @@ -1282,7 +1282,7 @@ int dap_chain_ledger_token_emission_add_check(dap_ledger_t *a_ledger, byte_t *a_ */ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, byte_t *a_token_emission, size_t a_token_emission_size) { - int ret = 0; + int l_ret = 0; dap_ledger_private_t *l_ledger_priv = PVT(a_ledger); const char * c_token_ticker = ((dap_chain_datum_token_emission_t *)a_token_emission)->hdr.ticker; @@ -1321,6 +1321,7 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, byte_t *a_token_ } else { HASH_ADD(hh, l_ledger_priv->treshold_emissions, datum_token_emission_hash, sizeof(l_token_emission_hash), l_token_emission_item); + l_ret = DAP_CHAIN_CS_VERIFY_CODE_TX_NO_TOKEN; } pthread_rwlock_unlock( l_token_item ? &l_token_item->token_emissions_rwlock : &l_ledger_priv->treshold_emissions_rwlock); @@ -1354,7 +1355,7 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, byte_t *a_token_ if(s_debug_more) log_it(L_WARNING,"Treshold for emissions is overfulled (%zu max), dropping down new data, added nothing", s_treshold_emissions_max); - ret = -2; + l_ret = -2; } } else { if (l_token_item) { @@ -1367,10 +1368,10 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, byte_t *a_token_ ((dap_chain_datum_token_emission_t *)a_token_emission)->hdr.value, c_token_ticker, l_hash_str); } } - ret = -1; + l_ret = -1; } DAP_DELETE(l_hash_str); - return ret; + return l_ret; } int dap_chain_ledger_token_emission_load(dap_ledger_t *a_ledger, byte_t *a_token_emission, size_t a_token_emission_size) diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 6437457e18f80ce1069331d1e0b939dc76d3c686..d1ba56f8df0149a01aa26a57d3396e6b4b8acd71 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -127,7 +127,7 @@ int dap_stream_ch_chain_init() s_stream_ch_packet_out); s_debug_more = dap_config_get_item_bool_default(g_config,"stream_ch_chain","debug_more",false); s_update_pack_size = dap_config_get_item_int16_default(g_config,"stream_ch_chain","update_pack_size",100); - s_list_ban_groups = dap_config_get_array_str(g_config, "general", "ban_list_sync_groups", &s_size_ban_groups); + s_list_ban_groups = dap_config_get_array_str(g_config, "stream_ch_chain", "ban_list_sync_groups", &s_size_ban_groups); return 0; } @@ -150,7 +150,7 @@ void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg) UNUSED(a_arg); a_ch->internal = DAP_NEW_Z(dap_stream_ch_chain_t); dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); - l_ch_chain->ch = a_ch; + l_ch_chain->_inheritor = a_ch; } /** @@ -339,7 +339,7 @@ static void s_sync_out_gdb_first_worker_callback(dap_worker_t *a_worker, void *a l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); if (s_debug_more ) log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB"); - dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch , DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, + dap_stream_ch_chain_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, &l_node_addr, sizeof(dap_chain_node_addr_t)); if(l_ch_chain->callback_notify_packet_out) @@ -371,7 +371,7 @@ static void s_sync_out_gdb_last_worker_callback(dap_worker_t *a_worker, void *a_ if (s_debug_more ) log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB"); - dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + dap_stream_ch_chain_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); l_ch_chain->state = CHAIN_STATE_IDLE; @@ -724,7 +724,8 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) } } // save data to global_db - if(!dap_chain_global_db_obj_save(dap_store_obj_copy(l_obj, 1), 1)) { + dap_store_obj_t *l_obj_copy = dap_store_obj_copy(l_obj, 1); + if(!dap_chain_global_db_obj_save(l_obj_copy, 1)) { struct sync_request *l_sync_req_err = DAP_DUP(l_sync_request); dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_gdb_in_pkt_error_worker_callback, l_sync_req_err); @@ -732,6 +733,8 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) if (s_debug_more) log_it(L_DEBUG, "Added new GLOBAL_DB synchronization record"); } + DAP_DELETE(l_obj_copy->group); + DAP_DELETE(l_obj_copy); } if(l_store_obj) { dap_store_obj_free(l_store_obj, l_data_obj_count); @@ -772,6 +775,53 @@ static void s_stream_ch_write_error_unsafe(dap_stream_ch_t *a_ch, uint64_t a_net dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, a_net_id, a_chain_id, a_cell_id, a_err_string); } +static bool s_chain_timer_callback(void *a_arg) +{ + dap_worker_t *l_worker = dap_events_get_current_worker(dap_events_get_default()); + dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(l_worker), *(dap_stream_ch_uuid_t *)a_arg); + if (!l_ch) { + DAP_DELETE(a_arg); + return false; + } + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + if (!l_ch_chain->was_active) { + if (l_ch_chain->state != CHAIN_STATE_IDLE) { + dap_stream_ch_chain_go_idle(l_ch_chain); + } + DAP_DELETE(a_arg); + l_ch_chain->activity_timer = NULL; + return false; + } + l_ch_chain->was_active = false; + // Sending dumb packet with nothing to inform remote thats we're just skiping atoms of GDB's, nothing freezed + if (l_ch_chain->state == CHAIN_STATE_SYNC_CHAINS) + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD, + l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); + if (l_ch_chain->state == CHAIN_STATE_SYNC_GLOBAL_DB || l_ch_chain->state == CHAIN_STATE_UPDATE_GLOBAL_DB) { + if (s_debug_more) + log_it(L_INFO, "Send one global_db TSD packet (rest=%zu/%zu items)", + dap_db_log_list_get_count_rest(l_ch_chain->request_db_log), + dap_db_log_list_get_count(l_ch_chain->request_db_log)); + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD, + l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); + } + return true; +} + +static void s_chain_timer_reset(dap_stream_ch_chain_t *a_ch_chain) +{ + if (a_ch_chain->state == CHAIN_STATE_IDLE) + return; + if (!a_ch_chain->activity_timer) { + dap_stream_ch_uuid_t *l_uuid = DAP_DUP(&DAP_STREAM_CH(a_ch_chain)->uuid); + a_ch_chain->activity_timer = dap_timerfd_start_on_worker(DAP_STREAM_CH(a_ch_chain)->stream_worker->worker, + 3000, s_chain_timer_callback, (void *)l_uuid); + } else + a_ch_chain->was_active = true; +} + /** * @brief s_stream_ch_packet_in * @param a_ch @@ -795,6 +845,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) sizeof(l_chain_pkt->hdr)); } + s_chain_timer_reset(l_ch_chain); size_t l_chain_pkt_data_size = l_ch_pkt->hdr.size-sizeof (l_chain_pkt->hdr) ; uint16_t l_acl_idx = dap_chain_net_acl_idx_by_id(l_chain_pkt->hdr.net_id ); @@ -1320,7 +1371,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_error_str[l_chain_pkt_data_size-1]='\0'; // To be sure that nobody sends us garbage // without trailing zero log_it(L_WARNING,"In from remote addr %s chain id 0x%016"DAP_UINT64_FORMAT_x" got error on his side: '%s'", - l_ch_chain->ch->stream->esocket->remote_addr_str ? l_ch_chain->ch->stream->esocket->remote_addr_str: "<no addr>", + DAP_STREAM_CH(l_ch_chain)->stream->esocket->remote_addr_str ? DAP_STREAM_CH(l_ch_chain)->stream->esocket->remote_addr_str: "<no addr>", l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt_data_size > 1 ? l_error_str:"<empty>"); } break; @@ -1388,26 +1439,32 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) return; dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); + s_chain_timer_reset(l_ch_chain); + switch (l_ch_chain->state) { // Update list of global DB records to remote case CHAIN_STATE_UPDATE_GLOBAL_DB: { dap_stream_ch_chain_update_element_t l_data[s_update_pack_size]; uint_fast16_t i; + dap_db_log_list_obj_t *l_obj = NULL; for (i = 0; i < s_update_pack_size; i++) { - dap_db_log_list_obj_t *l_obj = dap_db_log_list_get(l_ch_chain->request_db_log); - if (!l_obj) + l_obj = dap_db_log_list_get(l_ch_chain->request_db_log); + if (!l_obj || DAP_POINTER_TO_INT(l_obj) == 1) break; memcpy(&l_data[i].hash, &l_obj->hash, sizeof(dap_chain_hash_fast_t)); l_data[i].size = l_obj->pkt->data_size; } if (i) { - dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB, + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB, l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, l_data, i * sizeof(dap_stream_ch_chain_update_element_t)); l_ch_chain->stats_request_gdb_processed += i; if (s_debug_more) log_it(L_INFO, "Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB"); + } else if (l_obj) { + // We need to return into the write callback + a_ch->stream->esocket->buf_out_zero_count = 0; } else { l_ch_chain->request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(dap_chain_net_by_id( l_ch_chain->request_hdr.net_id)); @@ -1430,8 +1487,10 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) size_t l_pkt_size = 0; for (uint_fast16_t l_skip_count = 0; l_skip_count < s_skip_in_reactor_count; ) { l_obj = dap_db_log_list_get(l_ch_chain->request_db_log); - if (!l_obj) + if (!l_obj || DAP_POINTER_TO_INT(l_obj) == 1) { + l_skip_count = s_skip_in_reactor_count; break; + } dap_stream_ch_chain_hash_item_t *l_hash_item = NULL; unsigned l_hash_item_hashv = 0; HASH_VALUE(&l_obj->hash, sizeof(dap_chain_hash_fast_t), l_hash_item_hashv); @@ -1460,7 +1519,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) } if (l_pkt_size) { // If request was from defined node_addr we update its state - if( s_debug_more) + if (s_debug_more) log_it(L_INFO, "Send one global_db packet len=%zu (rest=%zu/%zu items)", l_pkt_size, dap_db_log_list_get_count_rest(l_ch_chain->request_db_log), dap_db_log_list_get_count(l_ch_chain->request_db_log)); @@ -1469,10 +1528,8 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->request_hdr.cell_id.uint64, l_pkt, l_pkt_size); DAP_DELETE(l_pkt); } else if (l_obj) { - // Sending dumb packet with nothing to inform remote thats we're just skiping GDBs, nothing freezed - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD, - l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, - l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); + // We need to return into the write callback + a_ch->stream->esocket->buf_out_zero_count = 0; } else { log_it( L_INFO,"Syncronized database: items syncronyzed %"DAP_UINT64_FORMAT_U" from %zu", l_ch_chain->stats_request_gdb_processed, dap_db_log_list_get_count(l_ch_chain->request_db_log)); @@ -1583,10 +1640,8 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) 0, l_ch_chain->callback_notify_arg); } if (! l_was_sent_smth ){ - // Sending dumb packet with nothing to inform remote thats we're just skiping atoms, nothing freezed - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD, - l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, - l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); + // We need to return into the write callback + a_ch->stream->esocket->buf_out_zero_count = 0; } } break; default: break; diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index 5fd46f5c682158bb1b8c4b9323297161383170e6..fa63d53c2657cbb7ecd3e26ffdf8d2a49f0fbdea 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -58,7 +58,7 @@ typedef struct dap_stream_ch_chain_hash_item{ typedef struct dap_stream_ch_chain { - dap_stream_ch_t * ch; + void *_inheritor; dap_stream_ch_chain_state_t state; dap_chain_node_client_t * node_client; // Node client associated with stream @@ -75,12 +75,16 @@ typedef struct dap_stream_ch_chain { dap_stream_ch_chain_pkt_hdr_t request_hdr; dap_list_t *request_db_iter; + bool was_active; + dap_timerfd_t *activity_timer; + dap_stream_ch_chain_callback_packet_t callback_notify_packet_out; dap_stream_ch_chain_callback_packet_t callback_notify_packet_in; void *callback_notify_arg; } dap_stream_ch_chain_t; #define DAP_STREAM_CH_CHAIN(a) ((dap_stream_ch_chain_t *) ((a)->internal) ) +#define DAP_STREAM_CH(a) ((dap_stream_ch_t *)((a)->_inheritor)) #define DAP_CHAIN_PKT_EXPECT_SIZE 7168 int dap_stream_ch_chain_init(void); diff --git a/modules/global-db/dap_chain_global_db_driver.c b/modules/global-db/dap_chain_global_db_driver.c index fe0a48daa53f88a66b4894d8a166d536c084504d..3e3176a1af810cb4a354edebbc02bcaf1829504e 100644 --- a/modules/global-db/dap_chain_global_db_driver.c +++ b/modules/global-db/dap_chain_global_db_driver.c @@ -198,8 +198,7 @@ dap_store_obj_t* dap_store_obj_copy(dap_store_obj_t *a_store_obj, size_t a_store memcpy(l_store_obj_dst, l_store_obj_src, sizeof(dap_store_obj_t)); l_store_obj_dst->group = dap_strdup(l_store_obj_src->group); l_store_obj_dst->key = dap_strdup(l_store_obj_src->key); - l_store_obj_dst->value = DAP_NEW_SIZE(uint8_t, l_store_obj_dst->value_len); - memcpy(l_store_obj_dst->value, l_store_obj_src->value, l_store_obj_dst->value_len); + l_store_obj_dst->value = DAP_DUP_SIZE(l_store_obj_src->value, l_store_obj_src->value_len); } return l_store_obj; } diff --git a/modules/global-db/dap_chain_global_db_driver_cdb.c b/modules/global-db/dap_chain_global_db_driver_cdb.c index 699f07942d106f464d4b3f4459bf7328c1acf8ec..d3a6d25208d30065105063b942048d083f87570a 100644 --- a/modules/global-db/dap_chain_global_db_driver_cdb.c +++ b/modules/global-db/dap_chain_global_db_driver_cdb.c @@ -83,12 +83,12 @@ static void cdb_serialize_val_to_dap_store_obj(pdap_store_obj_t a_obj, const cha a_obj->key = dap_strdup(key); a_obj->id = dap_hex_to_uint(val, sizeof(uint64_t)); offset += sizeof(uint64_t); - a_obj->value_len = dap_hex_to_uint(val + offset, sizeof(unsigned long)); - offset += sizeof(unsigned long); + a_obj->value_len = dap_hex_to_uint(val + offset, sizeof(uint64_t)); + offset += sizeof(uint64_t); a_obj->value = DAP_NEW_SIZE(uint8_t, a_obj->value_len); memcpy(a_obj->value, val + offset, a_obj->value_len); offset += a_obj->value_len; - a_obj->timestamp = (time_t)dap_hex_to_uint(val + offset, sizeof(time_t)); + a_obj->timestamp = dap_hex_to_uint(val + offset, sizeof(uint64_t)); } /** A callback function designed for finding a last item */ @@ -159,11 +159,11 @@ bool dap_cdb_get_count_iter_callback(void *arg, const char *key, int ksize, cons return true; } -/** +/** * @brief Initiates a CDB with main hash table size: 1000000, * record cache: 128Mb, index page cache: 1024Mb. - * @param a_group a group name - * @param a_flags should be combination of CDB_CREAT / CDB_TRUNC / CDB_PAGEWARMUP + * @param a_group a group name + * @param a_flags should be combination of CDB_CREAT / CDB_TRUNC / CDB_PAGEWARMUP CDB_PAGEWARMUP * @return A pointer to CDB, if success. NULL, if error. */ @@ -255,7 +255,7 @@ int dap_db_driver_cdb_init(const char *a_cdb_path, dap_db_driver_callbacks_t *a_ if (!S_ISDIR(buf.st_mode) || !res) { continue; } -#elif defined (DAP_OS_BSD) +#elif defined (DAP_OS_BSD) struct stat buf; int res = stat(d->d_name, &buf); if (!S_ISDIR(buf.st_mode) || !res) { @@ -290,7 +290,7 @@ int dap_db_driver_cdb_init(const char *a_cdb_path, dap_db_driver_callbacks_t *a_ * @brief Gets CDB by a_group. * @param a_group a group name * @return if CDB is found, a pointer to CDB, otherwise NULL. - */ + */ pcdb_instance dap_cdb_get_db_by_group(const char *a_group) { pcdb_instance l_cdb_i = NULL; pthread_rwlock_rdlock(&cdb_rwlock); @@ -355,7 +355,7 @@ int dap_db_driver_cdb_flush(void) { * @brief Read last store item from CDB. * @param a_group a group name * @return If successful, a pointer to item, otherwise NULL. - */ + */ dap_store_obj_t *dap_db_driver_cdb_read_last_store_obj(const char* a_group) { if (!a_group) { return NULL; @@ -382,7 +382,7 @@ dap_store_obj_t *dap_db_driver_cdb_read_last_store_obj(const char* a_group) { * @param a_group the group name * @param a_key the key * @return true or false - */ + */ bool dap_db_driver_cdb_is_obj(const char *a_group, const char *a_key) { bool l_ret = false; @@ -408,7 +408,7 @@ bool dap_db_driver_cdb_is_obj(const char *a_group, const char *a_key) * @param a_key the key or NULL * @param a_count_out IN. Count of read items. OUT Count of items was read * @return If successful, pointer to items; otherwise NULL. - */ + */ dap_store_obj_t *dap_db_driver_cdb_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out) { if (!a_group) { return NULL; @@ -468,7 +468,7 @@ dap_store_obj_t *dap_db_driver_cdb_read_store_obj(const char *a_group, const cha * @param a_count_out[in] a count of items * @param a_count[out] a count of items were got * @return If successful, pointer to items, otherwise NULL. - */ + */ dap_store_obj_t* dap_db_driver_cdb_read_cond_store_obj(const char *a_group, uint64_t a_id, size_t *a_count_out) { if (!a_group) { return NULL; @@ -520,7 +520,7 @@ dap_store_obj_t* dap_db_driver_cdb_read_cond_store_obj(const char *a_group, uint * @param a_group the group name * @param a_id id * @return If successful, count of store items; otherwise 0. - */ + */ size_t dap_db_driver_cdb_read_count_store(const char *a_group, uint64_t a_id) { if (!a_group) { @@ -595,19 +595,18 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) { cdb_record l_rec; l_rec.key = a_store_obj->key; //dap_strdup(a_store_obj->key); int offset = 0; - char *l_val = DAP_NEW_Z_SIZE(char, sizeof(uint64_t) + sizeof(unsigned long) + a_store_obj->value_len + sizeof(time_t)); + char *l_val = DAP_NEW_Z_SIZE(char, sizeof(uint64_t) + sizeof(uint64_t) + a_store_obj->value_len + sizeof(uint64_t)); dap_uint_to_hex(l_val, ++l_cdb_i->id, sizeof(uint64_t)); offset += sizeof(uint64_t); - dap_uint_to_hex(l_val + offset, a_store_obj->value_len, sizeof(unsigned long)); - offset += sizeof(unsigned long); + dap_uint_to_hex(l_val + offset, a_store_obj->value_len, sizeof(uint64_t)); + offset += sizeof(uint64_t); if(a_store_obj->value && a_store_obj->value_len){ memcpy(l_val + offset, a_store_obj->value, a_store_obj->value_len); DAP_DELETE(a_store_obj->value); } offset += a_store_obj->value_len; - unsigned long l_time = (unsigned long)a_store_obj->timestamp; - dap_uint_to_hex(l_val + offset, l_time, sizeof(time_t)); - offset += sizeof(time_t); + dap_uint_to_hex(l_val + offset, a_store_obj->timestamp, sizeof(uint64_t)); + offset += sizeof(uint64_t); l_rec.val = l_val; if (cdb_set2(l_cdb_i->cdb, l_rec.key, (int)strlen(l_rec.key), l_rec.val, offset, CDB_INSERTCACHE | CDB_OVERWRITE, 0) != CDB_SUCCESS) { log_it(L_ERROR, "Couldn't add record with key [%s] to CDB: \"%s\"", l_rec.key, cdb_errmsg(cdb_errno(l_cdb_i->cdb))); diff --git a/modules/global-db/dap_chain_global_db_hist.c b/modules/global-db/dap_chain_global_db_hist.c index 8fffb0664b539825a57e648ea51d7993aec8bedd..25691ff0efb30528441dc5b740b5ddb292bd3e97 100644 --- a/modules/global-db/dap_chain_global_db_hist.c +++ b/modules/global-db/dap_chain_global_db_hist.c @@ -276,33 +276,34 @@ dap_db_log_list_t* dap_db_log_list_start(dap_chain_node_addr_t a_addr, int a_fla } dap_list_free(l_groups_masks); - static int l_try_read_ban_list = 0; + + static uint16_t s_size_ban_list = 0; + static char **s_ban_list = NULL; + static bool l_try_read_ban_list = false; if (!l_try_read_ban_list) { - s_ban_list = dap_config_get_array_str(g_config, "general", "ban_list_sync_groups", &s_size_ban_list); - l_try_read_ban_list = 1; + s_ban_list = dap_config_get_array_str(g_config, "stream_ch_chain", "ban_list_sync_groups", &s_size_ban_list); + l_try_read_ban_list = true; } /* delete groups from ban list */ if (s_size_ban_list > 0) { for (dap_list_t *l_groups = l_dap_db_log_list->groups; l_groups; ) { - int found = 0; + bool l_found = false; for (int i = 0; i < s_size_ban_list; i++) { - if (dap_fnmatch(s_ban_list[i], l_groups->data, FNM_NOESCAPE)) { + if (!dap_fnmatch(s_ban_list[i], l_groups->data, FNM_NOESCAPE)) { dap_list_t *l_tmp = l_groups->next; - dap_list_delete_link(l_dap_db_log_list->groups, l_groups); + l_dap_db_log_list->groups = dap_list_delete_link(l_dap_db_log_list->groups, l_groups); l_groups = l_tmp; - found = 1; + l_found = true; break; } } - if (found) continue; + if (l_found) continue; l_groups = dap_list_next(l_groups); } } - - for (dap_list_t *l_groups = l_dap_db_log_list->groups; l_groups; l_groups = dap_list_next(l_groups)) { dap_db_log_list_group_t *l_replace = DAP_NEW_Z(dap_db_log_list_group_t); l_replace->name = (char *)l_groups->data; @@ -367,33 +368,19 @@ size_t dap_db_log_list_get_count_rest(dap_db_log_list_t *a_db_log_list) */ dap_db_log_list_obj_t *dap_db_log_list_get(dap_db_log_list_t *a_db_log_list) { - if(!a_db_log_list) + if (!a_db_log_list) return NULL; - dap_list_t *l_list; - bool l_is_process; - int l_count = 0; - while(1) { - pthread_mutex_lock(&a_db_log_list->list_mutex); - l_is_process = a_db_log_list->is_process; - // check next item - l_list = a_db_log_list->list_read; - if (l_list){ - a_db_log_list->list_read = dap_list_next(a_db_log_list->list_read); - a_db_log_list->items_rest--; - } - pthread_mutex_unlock(&a_db_log_list->list_mutex); - // wait reading next item, no more 1 sec (50 ms * 100 times) - if(!l_list && l_is_process) { - dap_usleep(DAP_USEC_PER_SEC / 200); - l_count++; - if(l_count > 100) - break; - } - else - break; + pthread_mutex_lock(&a_db_log_list->list_mutex); + int l_is_process = a_db_log_list->is_process; + // check next item + dap_list_t *l_list = a_db_log_list->list_read; + if (l_list){ + a_db_log_list->list_read = dap_list_next(a_db_log_list->list_read); + a_db_log_list->items_rest--; } + pthread_mutex_unlock(&a_db_log_list->list_mutex); //log_it(L_DEBUG, "get item n=%d", a_db_log_list->items_number - a_db_log_list->items_rest); - return l_list ? (dap_db_log_list_obj_t *)l_list->data : NULL; + return l_list ? (dap_db_log_list_obj_t *)l_list->data : DAP_INT_TO_POINTER(l_is_process); } /** diff --git a/modules/global-db/dap_chain_global_db_remote.c b/modules/global-db/dap_chain_global_db_remote.c index e1c5d27c1309c8cfacb2a4938f3cb622d5865d42..53bec9f4efe378aa2e0f7f0d0544b9178cdcf544 100644 --- a/modules/global-db/dap_chain_global_db_remote.c +++ b/modules/global-db/dap_chain_global_db_remote.c @@ -255,20 +255,20 @@ dap_store_obj_pkt_t *dap_store_packet_single(pdap_store_obj_t a_store_obj) uint32_t l_type = a_store_obj->type; memcpy(l_pkt->data, &l_type, sizeof(uint32_t)); uint64_t l_offset = sizeof(uint32_t); - uint16_t group_size = (uint16_t) dap_strlen(a_store_obj->group); - memcpy(l_pkt->data + l_offset, &group_size, sizeof(uint16_t)); + uint16_t l_group_size = (uint16_t) dap_strlen(a_store_obj->group); + memcpy(l_pkt->data + l_offset, &l_group_size, sizeof(uint16_t)); l_offset += sizeof(uint16_t); - memcpy(l_pkt->data + l_offset, a_store_obj->group, group_size); - l_offset += group_size; + memcpy(l_pkt->data + l_offset, a_store_obj->group, l_group_size); + l_offset += l_group_size; memcpy(l_pkt->data + l_offset, &a_store_obj->id, sizeof(uint64_t)); l_offset += sizeof(uint64_t); - memcpy(l_pkt->data + l_offset, &a_store_obj->timestamp, sizeof(time_t)); - l_offset += sizeof(time_t); - uint16_t key_size = (uint16_t) dap_strlen(a_store_obj->key); - memcpy(l_pkt->data + l_offset, &key_size, sizeof(uint16_t)); + memcpy(l_pkt->data + l_offset, &a_store_obj->timestamp, sizeof(uint64_t)); + l_offset += sizeof(uint64_t); + uint16_t l_key_size = (uint16_t) dap_strlen(a_store_obj->key); + memcpy(l_pkt->data + l_offset, &l_key_size, sizeof(uint16_t)); l_offset += sizeof(uint16_t); - memcpy(l_pkt->data + l_offset, a_store_obj->key, key_size); - l_offset += key_size; + memcpy(l_pkt->data + l_offset, a_store_obj->key, l_key_size); + l_offset += l_key_size; memcpy(l_pkt->data + l_offset, &a_store_obj->value_len, sizeof(uint64_t)); l_offset += sizeof(uint64_t); memcpy(l_pkt->data + l_offset, a_store_obj->value, a_store_obj->value_len); @@ -314,9 +314,9 @@ dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *pkt, siz memcpy(&obj->id, pkt->data + offset, sizeof(uint64_t)); offset += sizeof(uint64_t); - if (offset+sizeof (time_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'timestamp' field"); break;} // Check for buffer boundries - memcpy(&obj->timestamp, pkt->data + offset, sizeof(time_t)); - offset += sizeof(time_t); + if (offset+sizeof (uint64_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'timestamp' field"); break;} // Check for buffer boundries + memcpy(&obj->timestamp, pkt->data + offset, sizeof(uint64_t)); + offset += sizeof(uint64_t); if (offset+sizeof (uint16_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'key_length' field"); break;} // Check for buffer boundries memcpy(&str_length, pkt->data + offset, sizeof(uint16_t)); @@ -337,7 +337,7 @@ dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *pkt, siz memcpy(obj->value, pkt->data + offset, obj->value_len); offset += obj->value_len; } - //assert(pkt->data_size == offset); + assert(pkt->data_size == offset); if(store_obj_count) *store_obj_count = count; return store_obj; diff --git a/modules/global-db/include/dap_chain_global_db_driver.h b/modules/global-db/include/dap_chain_global_db_driver.h index ecf75715ecb4423ec15d2d0cc05776b0a7b97c70..af3174367a78d9829d843eb7398a8cbff2055337 100644 --- a/modules/global-db/include/dap_chain_global_db_driver.h +++ b/modules/global-db/include/dap_chain_global_db_driver.h @@ -31,13 +31,13 @@ typedef struct dap_store_obj { uint64_t id; - time_t timestamp; + uint64_t timestamp; uint8_t type; char *group; char *key; const char *c_key; uint8_t *value; - size_t value_len; + uint64_t value_len; }DAP_ALIGN_PACKED dap_store_obj_t, *pdap_store_obj_t; typedef struct dap_store_obj_pkt { diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 4fc0eed88ad34ece90a3ce75ed84c04115fcd8ec..c9c0c18513d591ec3d67ec46039f375511b57b49 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -224,7 +224,8 @@ static const dap_chain_node_client_callbacks_t s_node_link_callbacks={ .connected=s_node_link_callback_connected, .disconnected=s_node_link_callback_disconnected, .stage=s_node_link_callback_stage, - .error=s_node_link_callback_error + .error=s_node_link_callback_error, + .delete=s_node_link_callback_delete }; @@ -629,29 +630,34 @@ static void s_node_link_callback_error(dap_chain_node_client_t * a_node_client, */ static void s_node_link_callback_delete(dap_chain_node_client_t * a_node_client, void * a_arg) { - log_it(L_DEBUG,"Remove node client from list"); dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg; dap_chain_net_pvt_t * l_net_pvt = PVT(l_net); + if (!a_node_client->keep_connection) { + dap_notify_server_send_f_mt("{" + "class:\"NetLinkDelete\"," + "net_id:0x%016" DAP_UINT64_FORMAT_X "," + "cell_id:0x%016"DAP_UINT64_FORMAT_X"," + "address:\""NODE_ADDR_FP_STR"\"" + "}\n", a_node_client->net->pub.id.uint64, a_node_client->info->hdr.cell_id.uint64, + NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address)); + return; + } pthread_rwlock_wrlock(&l_net_pvt->rwlock); for ( dap_list_t * it = l_net_pvt->links; it; it=it->next ){ - dap_chain_node_client_t * l_client =(dap_chain_node_client_t *) it->data; - // Cut out current iterator if it equals with deleting handler - if (l_client == a_node_client){ - if (it->prev) - it->prev->next = it->next; - if (it->next) - it->next->prev = it->prev; + if (it->data == a_node_client) { + log_it(L_DEBUG,"Replace node client with new one"); + it->data = dap_chain_net_client_create_n_connect(l_net, a_node_client->info); } } pthread_rwlock_unlock(&l_net_pvt->rwlock); dap_notify_server_send_f_mt("{" - "class:\"NetLinkDelete\"," + "class:\"NetLinkRestart\"," "net_id:0x%016" DAP_UINT64_FORMAT_X "," "cell_id:0x%016"DAP_UINT64_FORMAT_X"," "address:\""NODE_ADDR_FP_STR"\"" - "}\n", a_node_client->net->pub.id.uint64, a_node_client->info->hdr.cell_id.uint64, + "}\n", a_node_client->net->pub.id.uint64, a_node_client->info->hdr.cell_id.uint64, NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address)); - dap_chain_node_client_close(a_node_client); + // Then a_alient wiil be destroyed in a right way } /** @@ -834,6 +840,7 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) dap_list_t *l_tmp = l_net_pvt->links; while (l_tmp) { dap_list_t *l_next =l_tmp->next; + ((dap_chain_node_client_t *)l_tmp->data)->keep_connection = false; dap_chain_node_client_close(l_tmp->data); DAP_DELETE(l_tmp); l_tmp = l_next; @@ -849,6 +856,7 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) break; } // disable SYNC_GDB + l_net_pvt->active_link = NULL; l_net_pvt->flags &= ~F_DAP_CHAIN_NET_GO_SYNC; l_net_pvt->last_sync = 0; } break; @@ -1022,7 +1030,8 @@ bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t if (l_links->data == l_net_pvt->active_link) { dap_chain_node_client_t *l_client = (dap_chain_node_client_t *)l_links->data; if (l_client->state >= NODE_CLIENT_STATE_ESTABLISHED && - l_client->state < NODE_CLIENT_STATE_SYNCED) { + l_client->state < NODE_CLIENT_STATE_SYNCED && + a_client != l_client) { l_found = true; break; } @@ -1036,11 +1045,14 @@ bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t return !l_found; } -void dap_chain_net_sync_unlock(dap_chain_net_t *a_net) +void dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client) { + if (!a_net) + return; dap_chain_net_pvt_t *l_net_pvt = PVT(a_net); pthread_rwlock_rdlock(&l_net_pvt->rwlock); - l_net_pvt->active_link = NULL; + if (!a_client || l_net_pvt->active_link == a_client) + l_net_pvt->active_link = NULL; pthread_rwlock_unlock(&l_net_pvt->rwlock); } /** diff --git a/modules/net/dap_chain_node_cli_cmd.c b/modules/net/dap_chain_node_cli_cmd.c index 07f90acf57d35ff8d772f188606f5caa40b64cb9..232ee6c87f33b8230ffca7cd8bcea91e02aa172c 100644 --- a/modules/net/dap_chain_node_cli_cmd.c +++ b/modules/net/dap_chain_node_cli_cmd.c @@ -4701,7 +4701,7 @@ SIGNER_COUNT static char *s_strdup_by_index (const char *a_file, const int a_index); static dap_tsd_t *s_alloc_metadata (const char *a_file, const int a_meta); -static uint8_t *s_concat_hash_and_mimetypes (dap_chain_hash_fast_t *a_chain, dap_list_t *a_meta_list, int a_index_meta, size_t *a_fullsize); +static uint8_t *s_concat_hash_and_mimetypes (dap_chain_hash_fast_t *a_chain, dap_list_t *a_meta_list, size_t *a_fullsize); /* * dap_sign_file - sign a file with flags. @@ -4758,8 +4758,6 @@ static int s_sign_file(const char *a_filename, dap_sign_signer_file_t a_flags, c l_shift <<= 1; } - int l_ret = 0; - dap_cert_t *l_cert = dap_cert_find_by_name(a_cert_name); if (!l_cert) { DAP_FREE(l_buffer); @@ -4772,7 +4770,7 @@ static int s_sign_file(const char *a_filename, dap_sign_signer_file_t a_flags, c } size_t l_full_size_for_sign; - uint8_t *l_data = s_concat_hash_and_mimetypes (a_hash, l_std_list, l_index_meta, &l_full_size_for_sign); + uint8_t *l_data = s_concat_hash_and_mimetypes(a_hash, l_std_list, &l_full_size_for_sign); if (!l_data) { DAP_FREE(l_buffer); return 0; @@ -4788,18 +4786,15 @@ static int s_sign_file(const char *a_filename, dap_sign_signer_file_t a_flags, c return 1; } -static byte_t *s_concat_meta (dap_list_t *a_meta, int a_index_meta, size_t *a_fullsize) +static byte_t *s_concat_meta (dap_list_t *a_meta, size_t *a_fullsize) { if (a_fullsize) *a_fullsize = 0; - int l_len = 0; - int l_n; int l_part = 256; int l_power = 1; byte_t *l_buf = DAP_CALLOC(l_part * l_power++, 1); - int l_total = l_part; - int l_counter = 0; + size_t l_counter = 0; int l_part_power = l_part; int l_index = 0; @@ -4807,13 +4802,13 @@ static byte_t *s_concat_meta (dap_list_t *a_meta, int a_index_meta, size_t *a_fu if (!l_iter->data) continue; dap_tsd_t * l_tsd = (dap_tsd_t *) l_iter->data; l_index = l_counter; - l_counter += strlen(l_tsd->data); + l_counter += strlen((char *)l_tsd->data); if (l_counter >= l_part_power) { l_part_power = l_part * l_power++; l_buf = (byte_t *) DAP_REALLOC(l_buf, l_part_power); } - memcpy (&l_buf[l_index], l_tsd->data, strlen(l_tsd->data)); + memcpy (&l_buf[l_index], l_tsd->data, strlen((char *)l_tsd->data)); } if (a_fullsize) @@ -4822,10 +4817,10 @@ static byte_t *s_concat_meta (dap_list_t *a_meta, int a_index_meta, size_t *a_fu return l_buf; } -static uint8_t *s_concat_hash_and_mimetypes (dap_chain_hash_fast_t *a_chain_hash, dap_list_t *a_meta_list, int a_index_meta, size_t *a_fullsize) +static uint8_t *s_concat_hash_and_mimetypes (dap_chain_hash_fast_t *a_chain_hash, dap_list_t *a_meta_list, size_t *a_fullsize) { if (!a_fullsize) return NULL; - byte_t *l_buf = s_concat_meta (a_meta_list, a_index_meta, a_fullsize); + byte_t *l_buf = s_concat_meta (a_meta_list, a_fullsize); if (!l_buf) return (uint8_t *) l_buf; size_t l_len_meta_buf = *a_fullsize; @@ -4858,7 +4853,7 @@ static dap_tsd_t *s_alloc_metadata (const char *a_file, const int a_meta) case SIGNER_FILENAME_SHORT: { char *l_filename_short = NULL; - if (l_filename_short = strrchr(a_file, '.')) { + if ((l_filename_short = strrchr(a_file, '.')) != 0) { int l_index_of_latest_point = l_filename_short - a_file; l_filename_short = s_strdup_by_index (a_file, l_index_of_latest_point); if (!l_filename_short) return NULL; @@ -4883,7 +4878,7 @@ static dap_tsd_t *s_alloc_metadata (const char *a_file, const int a_meta) stat (a_file, &l_st); char *l_ctime = ctime(&l_st.st_ctime); char *l = NULL; - if (l = strchr(l_ctime, '\n')) *l = 0; + if ((l = strchr(l_ctime, '\n')) != 0) *l = 0; return dap_tsd_create_string(SIGNER_DATE, l_ctime); } break; diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 819f9ac4e984af9cd1ae9e98eff8901e333a6fdc..f615b7a82da79a7f254608ca06b35a5a2e971d76 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -53,7 +53,7 @@ #include "dap_timerfd.h" #include "dap_hash.h" #include "dap_uuid.h" -//#include "dap_http_client_simple.h" +#include "dap_client.h" #include "dap_client_pvt.h" #include "dap_chain_global_db_remote.h" #include "dap_chain_global_db_hist.h" @@ -164,17 +164,18 @@ static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg) pthread_mutex_unlock(&l_node_client->wait_mutex); l_node_client->esocket_uuid = 0; + dap_chain_net_sync_unlock(l_node_client->net, l_node_client); if (l_node_client->callbacks.disconnected) { l_node_client->callbacks.disconnected(l_node_client, l_node_client->callbacks_arg); } if (l_node_client->keep_connection) { dap_events_socket_uuid_t *l_uuid = DAP_DUP(&l_node_client->uuid); - dap_timerfd_start_on_worker(l_node_client->stream_worker - ? l_node_client->stream_worker->worker - : dap_events_worker_get_auto(), - s_timer_update_states * 1000, - s_timer_update_states_callback, - l_uuid); + l_node_client->sync_timer = dap_timerfd_start_on_worker(l_node_client->stream_worker + ? l_node_client->stream_worker->worker + : dap_events_worker_get_auto(), + s_timer_update_states * 1000, + s_timer_update_states_callback, + l_uuid); } return; } @@ -224,7 +225,7 @@ static bool s_timer_update_states_callback(void *a_arg) dap_client_t * l_client = dap_client_from_esocket(l_es); if (l_client ) { dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t*) l_client->_inheritor; - if (l_node_client && l_node_client->ch_chain) { + if (l_node_client && l_node_client->ch_chain && l_node_client->stream_worker && l_node_client->ch_chain_uuid) { dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(l_node_client->stream_worker, l_node_client->ch_chain_uuid); if (l_ch) { dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); @@ -249,9 +250,12 @@ static bool s_timer_update_states_callback(void *a_arg) // if we not returned yet l_me->state = NODE_CLIENT_STATE_DISCONNECTED; if (l_me->keep_connection) { - log_it(L_INFO, "Reconnecting node client with peer "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_me->remote_node_addr)); - l_me->state = NODE_CLIENT_STATE_CONNECTING ; - dap_client_go_stage(l_me->client, STAGE_STREAM_STREAMING, s_stage_connected_callback); + if (dap_client_pvt_find(l_me->client->pvt_uuid)) { + log_it(L_INFO, "Reconnecting node client with peer "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_me->remote_node_addr)); + l_me->state = NODE_CLIENT_STATE_CONNECTING ; + dap_client_go_stage(l_me->client, STAGE_STREAM_STREAMING, s_stage_connected_callback); + } else + dap_chain_node_client_close(l_me); } DAP_DELETE(l_uuid); return false; @@ -294,7 +298,10 @@ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg) dap_events_socket_uuid_t *l_uuid = DAP_DUP(&l_node_client->uuid); dap_worker_exec_callback_on(l_stream->esocket->worker, s_node_client_connected_synchro_start_callback, l_uuid); dap_events_socket_uuid_t *l_uuid_timer = DAP_DUP(&l_node_client->uuid); - dap_timerfd_start_on_worker(l_stream->esocket->worker, s_timer_update_states * 1000, s_timer_update_states_callback, l_uuid_timer); + l_node_client->sync_timer = dap_timerfd_start_on_worker(l_stream->esocket->worker, + s_timer_update_states * 1000, + s_timer_update_states_callback, + l_uuid_timer); } } #ifndef _WIN32 @@ -363,6 +370,8 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, void * a_arg) { + UNUSED(a_ch_chain); + UNUSED(a_pkt_data_size); dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg; switch (a_pkt_type) { @@ -451,16 +460,18 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" started to sync %s chain",l_net->pub.name, NODE_ADDR_FP_ARGS(l_node_addr), l_node_client->cur_chain->name ); } - dap_stream_ch_chain_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ, + dap_stream_ch_chain_pkt_write_unsafe(l_node_client->ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ, l_net->pub.id.uint64 , l_chain_id.uint64,l_cell_id.uint64,NULL,0); }else{ // If no - over with sync process dap_chain_node_addr_t * l_node_addr = dap_chain_net_get_cur_addr(l_net); log_it(L_INFO, "In: State node %s."NODE_ADDR_FP_STR" is SYNCED",l_net->pub.name, NODE_ADDR_FP_ARGS(l_node_addr) ); - dap_chain_net_sync_unlock(l_net); + dap_chain_net_sync_unlock(l_net, l_node_client); l_node_client->state = NODE_CLIENT_STATE_SYNCED; - if (dap_chain_net_get_target_state(l_net) == NET_STATE_ONLINE) + if (dap_chain_net_get_target_state(l_net) == NET_STATE_ONLINE) { + dap_timerfd_reset(l_node_client->sync_timer); dap_chain_net_set_state(l_net, NET_STATE_ONLINE); + } else dap_chain_net_state_go_to(l_net, NET_STATE_OFFLINE); #ifndef _WIN32 @@ -741,19 +752,23 @@ void dap_chain_node_client_close(dap_chain_node_client_t *a_client) if (l_client_found) { HASH_DEL(s_clients,l_client_found); DAP_DELETE(l_client_found); + if (a_client->callbacks.delete) + a_client->callbacks.delete(a_client, a_client->net); char l_node_addr_str[INET_ADDRSTRLEN] = {}; inet_ntop(AF_INET, &a_client->info->hdr.ext_addr_v4, l_node_addr_str, INET_ADDRSTRLEN); log_it(L_INFO, "Closing node client to uplink %s:%d", l_node_addr_str, a_client->info->hdr.ext_port); - dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(a_client->stream_worker, a_client->ch_chain_uuid); - if (l_ch) { - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); - l_ch_chain->callback_notify_packet_in = NULL; - l_ch_chain->callback_notify_packet_out = NULL; - } - l_ch = dap_stream_ch_find_by_uuid_unsafe(a_client->stream_worker, a_client->ch_chain_net_uuid); - if (l_ch) { - dap_stream_ch_chain_net_t *l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(l_ch); - l_ch_chain_net->notify_callback = NULL; + if (a_client->stream_worker) { + dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(a_client->stream_worker, a_client->ch_chain_uuid); + if (l_ch) { + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + l_ch_chain->callback_notify_packet_in = NULL; + l_ch_chain->callback_notify_packet_out = NULL; + } + l_ch = dap_stream_ch_find_by_uuid_unsafe(a_client->stream_worker, a_client->ch_chain_net_uuid); + if (l_ch) { + dap_stream_ch_chain_net_t *l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(l_ch); + l_ch_chain_net->notify_callback = NULL; + } } // clean client dap_client_pvt_t *l_client_pvt = dap_client_pvt_find(a_client->client->pvt_uuid); diff --git a/modules/net/include/dap_chain_net.h b/modules/net/include/dap_chain_net.h index 142bd80e10711d4bb0f154846e5187a7067a83c5..87bdd275eb69e65185d025aaa91ff0d01dbfdfe6 100644 --- a/modules/net/include/dap_chain_net.h +++ b/modules/net/include/dap_chain_net.h @@ -119,7 +119,7 @@ void dap_chain_net_set_flag_sync_from_zero(dap_chain_net_t * a_net, bool a_flag_ bool dap_chain_net_get_flag_sync_from_zero( dap_chain_net_t * a_net); bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client); -void dap_chain_net_sync_unlock(dap_chain_net_t *a_net); +void dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client); dap_chain_net_t * dap_chain_net_by_name( const char * a_name); dap_chain_net_t * dap_chain_net_by_id( dap_chain_net_id_t a_id); diff --git a/modules/net/include/dap_chain_node_client.h b/modules/net/include/dap_chain_node_client.h index 21902259e9901466c29bc498cdd74e7e562cdbeb..6f11f89e507474173237fcdaebc42bb6b34719ce 100644 --- a/modules/net/include/dap_chain_node_client.h +++ b/modules/net/include/dap_chain_node_client.h @@ -113,8 +113,8 @@ typedef struct dap_chain_node_client { struct in6_addr remote_ipv6; bool keep_connection; - bool is_connected; + dap_timerfd_t *sync_timer; // callbacks dap_chain_node_client_callbacks_t callbacks; void * callbacks_arg; diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index d784fb1160de5122ac7159a804b2cc6a6a945491..aebe5a09cdb7ce08f5bfc787b09ffe18af0f2934 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -435,6 +435,7 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_add(dap_chain_t * a_cha break; case DAP_CHAIN_CS_VERIFY_CODE_TX_NO_PREVIOUS: case DAP_CHAIN_CS_VERIFY_CODE_TX_NO_EMISSION: + case DAP_CHAIN_CS_VERIFY_CODE_TX_NO_TOKEN: pthread_rwlock_wrlock(l_events_rwlock); HASH_ADD(hh, PVT(l_dag)->events_treshold, hash, sizeof(l_event_item->hash), l_event_item); pthread_rwlock_unlock(l_events_rwlock); @@ -921,10 +922,11 @@ dap_chain_cs_dag_event_item_t* dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t char * l_event_hash_str = dap_chain_hash_fast_to_str_new(&l_event_item->hash); log_it(L_DEBUG, "Processing event (threshold): %s...", l_event_hash_str); DAP_DELETE(l_event_hash_str); - } int l_add_res = s_dap_chain_add_atom_to_events_table(a_dag, a_ledger, l_event_item); - HASH_ADD(hh, PVT(a_dag)->events,hash,sizeof (l_event_item->hash), l_event_item); - s_dag_events_lasts_process_new_last_event(a_dag, l_event_item); - if(! l_add_res){ + } + int l_add_res = s_dap_chain_add_atom_to_events_table(a_dag, a_ledger, l_event_item); + if (!l_add_res) { + HASH_ADD(hh, PVT(a_dag)->events,hash,sizeof (l_event_item->hash), l_event_item); + s_dag_events_lasts_process_new_last_event(a_dag, l_event_item); if(s_debug_more) log_it(L_INFO, "... moved from treshold to main chains"); res = true; @@ -932,7 +934,7 @@ dap_chain_cs_dag_event_item_t* dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t }else{ if(s_debug_more) log_it(L_WARNING, "... error adding"); - //todo: delete event + DAP_DELETE(l_event_item); } //res = true; }else if(ret == DAP_THRESHOLD_CONFLICTING)