From f23bbc3c061acb1fa3d7e97a4f2dbea8b8d74573 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Mon, 7 Sep 2020 20:46:50 +0300 Subject: [PATCH] [*] Fragmentation GDB packets debugged --- dap-sdk/net/core/dap_worker.c | 9 +- dap-sdk/net/server-udp/dap_udp_server.c | 9 +- .../net/server/http_server/dap_http_folder.c | 1 + .../net/server/http_server/dap_http_simple.c | 7 +- .../http_server/http_client/dap_http_client.c | 4 +- modules/channel/chain/dap_stream_ch_chain.c | 517 +++++++++--------- .../chain/include/dap_stream_ch_chain.h | 3 +- .../chain/include/dap_stream_ch_chain_pkt.h | 2 +- modules/net/dap_chain_net.c | 4 +- modules/service/vpn/dap_chain_net_srv_vpn.c | 2 + 10 files changed, 282 insertions(+), 276 deletions(-) diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 68d1b20191..b3611126c1 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -256,14 +256,15 @@ void *dap_worker_thread(void *arg) // Socket is ready to write if(((l_epoll_events[n].events & EPOLLOUT) || (l_cur->flags & DAP_SOCK_READY_TO_WRITE)) && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE)) { - //log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size); + + //log_it(L_DEBUG, "Main loop output: %u bytes to send", l_cur->buf_out_size); if(l_cur->callbacks.write_callback) l_cur->callbacks.write_callback(l_cur, NULL); // Call callback to process write event + if (l_cur->worker == NULL ){ // esocket was unassigned in callback, we don't need any ops with it now, // continue to poll another esockets continue; } - if(l_cur->flags & DAP_SOCK_READY_TO_WRITE) { static const uint32_t buf_out_zero_count_max = 2; @@ -307,15 +308,11 @@ void *dap_worker_thread(void *arg) if (l_errno != EAGAIN && l_errno != EWOULDBLOCK ){ // If we have non-blocking socket log_it(L_ERROR, "Some error occured in send(): %s", strerror(errno)); l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; - break; } }else{ //log_it(L_DEBUG, "Output: %u from %u bytes are sent ", l_bytes_sent,l_cur->buf_out_size); - //} - //log_it(L_DEBUG,"Output: sent %u bytes",total_sent); if (l_bytes_sent) { l_cur->buf_out_size -= l_bytes_sent; - //log_it(L_DEBUG,"Output: left %u bytes in buffer",l_cur->buf_out_size); if (l_cur->buf_out_size) { memmove(l_cur->buf_out, &l_cur->buf_out[l_bytes_sent], l_cur->buf_out_size); } else { diff --git a/dap-sdk/net/server-udp/dap_udp_server.c b/dap-sdk/net/server-udp/dap_udp_server.c index d02ac6a629..26a0fc39bb 100644 --- a/dap-sdk/net/server-udp/dap_udp_server.c +++ b/dap-sdk/net/server-udp/dap_udp_server.c @@ -182,14 +182,7 @@ static void write_cb( EPOLL_HANDLE efd, int revents, dap_server_t *sh ) dap_events_socket_t *client = udp_client->esocket; if( client != NULL && !check_close(client) && (client->flags & DAP_SOCK_READY_TO_WRITE) ) { - - if ( sh->client_callbacks.write_callback ) - sh->client_callbacks.write_callback( client, NULL ); - if ( client->buf_out_size > 0 ) { - - - struct sockaddr_in addr; addr.sin_family = AF_INET; dap_udp_client_get_address( client, (unsigned int *)&addr.sin_addr.s_addr, &addr.sin_port ); @@ -211,6 +204,8 @@ static void write_cb( EPOLL_HANDLE efd, int revents, dap_server_t *sh ) sb_payload_ready = false; } LL_DELETE( udp->waiting_clients, udp_client ); + if ( sh->client_callbacks.write_callback ) + sh->client_callbacks.write_callback( client, NULL ); } else if( client == NULL ) { LL_DELETE( udp->waiting_clients, udp_client ); diff --git a/dap-sdk/net/server/http_server/dap_http_folder.c b/dap-sdk/net/server/http_server/dap_http_folder.c index d16e8a0d4a..857f397752 100644 --- a/dap-sdk/net/server/http_server/dap_http_folder.c +++ b/dap-sdk/net/server/http_server/dap_http_folder.c @@ -297,6 +297,7 @@ void dap_http_folder_data_write(dap_http_client_t * cl_ht, void * arg) dap_http_file_t * cl_ht_file= DAP_HTTP_FILE(cl_ht); cl_ht->esocket->buf_out_size=fread(cl_ht->esocket->buf_out,1,sizeof(cl_ht->esocket->buf_out),cl_ht_file->fd); cl_ht_file->position+=cl_ht->esocket->buf_out_size; + dap_events_socket_set_writable_unsafe(cl_ht->esocket, true); if(feof(cl_ht_file->fd)!=0){ log_it(L_INFO, "All the file %s is sent out",cl_ht_file->local_path); diff --git a/dap-sdk/net/server/http_server/dap_http_simple.c b/dap-sdk/net/server/http_server/dap_http_simple.c index 3e43bacb72..9428a6f00d 100644 --- a/dap-sdk/net/server/http_server/dap_http_simple.c +++ b/dap-sdk/net/server/http_server/dap_http_simple.c @@ -228,14 +228,10 @@ inline static bool _is_supported_user_agents_list_setted() inline static void _set_only_write_http_client_state(dap_http_client_t* http_client) { // log_it(L_DEBUG,"_set_only_write_http_client_state"); -// Sleep(300); - - http_client->esocket->flags = DAP_SOCK_READY_TO_WRITE; // To not to touch epoll_fd we clean flags by ourself -// http_client->state_write=DAP_HTTP_CLIENT_STATE_NONE; http_client->state_write=DAP_HTTP_CLIENT_STATE_START; dap_events_socket_set_writable_unsafe(http_client->esocket,true); -// http_client->state_write=DAP_HTTP_CLIENT_STATE_START; + dap_events_socket_set_readable_unsafe(http_client->esocket, false); } static void _copy_reply_and_mime_to_response( dap_http_simple_t *cl_sh ) @@ -380,6 +376,7 @@ static void s_http_client_data_write( dap_http_client_t * a_http_client, void *a l_http_simple->reply_sent += dap_events_socket_write_unsafe( a_http_client->esocket, l_http_simple->reply_byte + l_http_simple->reply_sent, a_http_client->out_content_length - l_http_simple->reply_sent ); + dap_events_socket_set_writable_unsafe(a_http_client->esocket, true); if ( l_http_simple->reply_sent >= a_http_client->out_content_length ) { log_it(L_INFO, "All the reply (%u) is sent out", a_http_client->out_content_length ); diff --git a/dap-sdk/net/server/http_server/http_client/dap_http_client.c b/dap-sdk/net/server/http_server/http_client/dap_http_client.c index b5add239f7..ef122b7a5d 100644 --- a/dap-sdk/net/server/http_server/http_client/dap_http_client.c +++ b/dap-sdk/net/server/http_server/http_client/dap_http_client.c @@ -498,7 +498,7 @@ void dap_http_client_write( dap_events_socket_t * cl, void *arg ) log_it( L_INFO," HTTP response with %u status code", l_http_client->reply_status_code ); dap_events_socket_write_f_unsafe(cl, "HTTP/1.1 %u %s\r\n",l_http_client->reply_status_code, l_http_client->reply_reason_phrase[0] ? l_http_client->reply_reason_phrase : http_status_reason_phrase(l_http_client->reply_status_code) ); - + dap_events_socket_set_writable_unsafe(cl, true); dap_http_client_out_header_generate( l_http_client ); l_http_client->state_write = DAP_HTTP_CLIENT_STATE_HEADERS; } break; @@ -508,6 +508,7 @@ void dap_http_client_write( dap_events_socket_t * cl, void *arg ) if ( hdr == NULL ) { log_it(L_DEBUG, "Output: headers are over (reply status code %u)",l_http_client->reply_status_code); dap_events_socket_write_f_unsafe(cl, "\r\n"); + dap_events_socket_set_writable_unsafe(cl, true); if ( l_http_client->out_content_length || l_http_client->out_content_ready ) { l_http_client->state_write=DAP_HTTP_CLIENT_STATE_DATA; } else { @@ -520,6 +521,7 @@ void dap_http_client_write( dap_events_socket_t * cl, void *arg ) } else { //log_it(L_WARNING,"Output: header %s: %s",hdr->name,hdr->value); dap_events_socket_write_f_unsafe(cl, "%s: %s\r\n", hdr->name, hdr->value); + dap_events_socket_set_writable_unsafe(cl, true); dap_http_header_remove( &l_http_client->out_headers, hdr ); } } break; diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 2077b79769..fe8116937f 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -138,12 +138,12 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create(l_chain); l_ch_chain->request_atom_iter = l_iter; l_lasts = l_chain->callback_atom_iter_get_lasts(l_iter, &l_lasts_count, &l_lasts_sizes); + log_it(L_INFO, "Found %d atoms for synchronization", l_lasts_count); if (l_lasts && l_lasts_sizes) { for(long int i = l_lasts_count - 1; i >= 0; i--) { dap_chain_atom_item_t * l_item = NULL; dap_chain_hash_fast_t l_atom_hash; - dap_hash_fast(l_lasts[i], l_lasts_sizes[i], - &l_atom_hash); + dap_hash_fast(l_lasts[i], l_lasts_sizes[i], &l_atom_hash); pthread_mutex_lock(&l_ch_chain->mutex); HASH_FIND(hh, l_ch_chain->request_atoms_lasts, &l_atom_hash, sizeof(l_atom_hash), l_item); @@ -207,6 +207,7 @@ bool s_sync_gdb_callback(dap_proc_thread_t *a_thread, void *a_arg) //log_it(L_DEBUG, "Start getting items %u:%u", l_request->id_start + 1,l_db_log->items_number);//dap_list_length(l_list)); // Add it to outgoing list l_ch_chain->request_global_db_trs = l_db_log; + l_ch_chain->db_iter = NULL; l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB; } else { dap_stream_ch_chain_sync_request_t l_request = {}; @@ -214,13 +215,12 @@ bool s_sync_gdb_callback(dap_proc_thread_t *a_thread, void *a_arg) l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); + l_ch_chain->request_net_id, l_ch_chain->request_chain_id, + l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); l_ch_chain->state = CHAIN_STATE_IDLE; if(l_ch_chain->callback_notify_packet_out) - l_ch_chain->callback_notify_packet_out(l_ch_chain, - DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - NULL, 0, l_ch_chain->callback_notify_arg); + l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + NULL, 0, l_ch_chain->callback_notify_arg); } //log_it(L_INFO, "Prepared %u items for sync", l_db_log->items_number - l_request->id_start); // go to send data from list [in s_stream_ch_packet_out()] @@ -381,91 +381,75 @@ bool s_gdb_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) */ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) { - //static char *s_net_name = NULL; dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); - if(l_ch_chain) { - dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg; - dap_stream_ch_chain_pkt_t * l_chain_pkt = (dap_stream_ch_chain_pkt_t *) l_ch_pkt->data; - uint8_t l_acl_idx = dap_chain_net_acl_idx_by_id(l_chain_pkt->hdr.net_id); - bool l_error = false; - char l_err_str[64]; - if (l_acl_idx == (uint8_t)-1) { - log_it(L_ERROR, "Invalid net id in packet"); - strcpy(l_err_str, "ERROR_NET_INVALID_ID"); - l_error = true; - } - if (!l_error && a_ch->stream->session->acl && !a_ch->stream->session->acl[l_acl_idx]) { - log_it(L_WARNING, "Unauthorized request attempt to network %s", - dap_chain_net_by_id(l_chain_pkt->hdr.net_id)->pub.name); - strcpy(l_err_str, "ERROR_NET_NOT_AUTHORIZED"); - l_error = true; - } - if (l_error) { + if (!l_ch_chain) { + return; + } + dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg; + dap_stream_ch_chain_pkt_t * l_chain_pkt = (dap_stream_ch_chain_pkt_t *) l_ch_pkt->data; + if (!l_chain_pkt) { + return; + } + size_t l_chain_pkt_data_size = l_ch_pkt->hdr.size - sizeof(l_chain_pkt->hdr); + uint8_t l_acl_idx = dap_chain_net_acl_idx_by_id(l_chain_pkt->hdr.net_id); + if (l_acl_idx == (uint8_t)-1) { + log_it(L_ERROR, "Invalid net id in packet"); + if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR) { + if(l_ch_chain->callback_notify_packet_in) { + l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt, + l_chain_pkt_data_size, l_ch_chain->callback_notify_arg); + } + } else { dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, l_err_str); + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_NET_INVALID_ID"); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); } - size_t l_chain_pkt_data_size = l_ch_pkt->hdr.size - sizeof(l_chain_pkt->hdr); - if (!l_error && l_chain_pkt) { - switch (l_ch_pkt->hdr.type) { - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: { - log_it(L_INFO, "In: SYNCED_ALL pkt"); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: { - log_it(L_INFO, "In: SYNCED_GLOBAL_DB pkt"); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP: { - log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt"); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP: { - log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt"); - } - break; + return; + } + if (a_ch->stream->session->acl && !a_ch->stream->session->acl[l_acl_idx]) { + log_it(L_WARNING, "Unauthorized request attempt to network %s", + dap_chain_net_by_id(l_chain_pkt->hdr.net_id)->pub.name); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_NET_NOT_AUTHORIZED"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + return; + } + switch (l_ch_pkt->hdr.type) { + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: { + log_it(L_INFO, "In: SYNCED_ALL pkt"); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: { + log_it(L_INFO, "In: SYNCED_GLOBAL_DB pkt"); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP: { + log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt"); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP: { + log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt"); + } + break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { - log_it(L_INFO, "In: SYNCED_CHAINS pkt"); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: { - log_it(L_INFO, "In: SYNC_CHAINS pkt"); - dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); - if(l_chain) { - if(l_ch_chain->state != CHAIN_STATE_IDLE) { - log_it(L_INFO, "Can't process SYNC_CHAINS request because not in idle state"); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_STATE_NOT_IN_IDLE"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } else { - // fill ids - if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { - dap_stream_ch_chain_sync_request_t * l_request = - (dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data; - memcpy(&l_ch_chain->request, l_request, l_chain_pkt_data_size); - memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); - memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); - memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); - } - dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_chains_callback, a_ch); - } - } - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: { - log_it(L_INFO, "In: SYNC_GLOBAL_DB pkt"); - if(l_ch_chain->state != CHAIN_STATE_IDLE) { - log_it(L_INFO, "Can't process SYNC_GLOBAL_DB request because not in idle state"); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_STATE_NOT_IN_IDLE"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - break; - } - // receive the latest global_db revision of the remote node -> go to send mode + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { + log_it(L_INFO, "In: SYNCED_CHAINS pkt"); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: { + log_it(L_INFO, "In: SYNC_CHAINS pkt"); + dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); + if(l_chain) { + if(l_ch_chain->state != CHAIN_STATE_IDLE) { + log_it(L_INFO, "Can't process SYNC_CHAINS request because not in idle state"); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_STATE_NOT_IN_IDLE"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + } else { + // fill ids if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { dap_stream_ch_chain_sync_request_t * l_request = (dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data; @@ -473,114 +457,138 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); - dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_gdb_callback, a_ch); - } - else { - log_it(L_ERROR, "Get DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB session_id=%u bad request", - a_ch->stream->session->id); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_SYNC_GLOBAL_DB_REQUEST_BAD"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } } - break; - // first packet of data with source node address - case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN: { - log_it(L_INFO, "In: FIRST_CHAIN data_size=%d", l_chain_pkt_data_size); - if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)) - memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size); + dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); + dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_chains_callback, a_ch); } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN: { - //log_it(L_INFO, "In: CHAIN pkt"); - dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); - if(l_chain) { - // Expect atom element in - if(l_chain_pkt_data_size > 0) { - memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); - memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); - memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); - l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size); - memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); - l_ch_chain->pkt_data_size = l_chain_pkt_data_size; - dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_chain_pkt_callback, a_ch); - } else { - log_it(L_WARNING, "Empty chain packet"); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_CHAIN_PACKET_EMPTY"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } - } - } - break; - // first packet of data with source node address - case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB: { - log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d", l_chain_pkt_data_size); - if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)) - memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB: { - //log_it(L_INFO, "In: GLOBAL_DB data_size=%d", l_chain_pkt_data_size); - // get transaction and save it to global_db - if(l_chain_pkt_data_size > 0) { - memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); - memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); - memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); - l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size); - memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); - l_ch_chain->pkt_data_size = l_chain_pkt_data_size; - dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_gdb_pkt_callback, a_ch); - } else { - log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size"); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_GLOBAL_DB_PACKET_EMPTY"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS: { - dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; - memcpy(&l_sync_gdb, l_chain_pkt->data, l_chain_pkt_data_size); - dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id); - l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr(l_net) ? - dap_chain_net_get_cur_addr(l_net)->uint64 : - dap_db_get_cur_node_addr(l_net->pub.name); - // Get last timestamp in log - l_sync_gdb.id_start = (uint64_t) dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); - // no limit - l_sync_gdb.id_end = (uint64_t)0; - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: { - dap_stream_ch_chain_sync_request_t l_sync_chains = {}; - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_chains, sizeof(l_sync_chains)); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR: - break; - default: { + } + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: { + log_it(L_INFO, "In: SYNC_GLOBAL_DB pkt"); + if(l_ch_chain->state != CHAIN_STATE_IDLE) { + log_it(L_INFO, "Can't process SYNC_GLOBAL_DB request because not in idle state"); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_STATE_NOT_IN_IDLE"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + break; + } + // receive the latest global_db revision of the remote node -> go to send mode + if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { + dap_stream_ch_chain_sync_request_t * l_request = + (dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data; + memcpy(&l_ch_chain->request, l_request, l_chain_pkt_data_size); + memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); + memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); + memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); + dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); + dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_gdb_callback, a_ch); + } + else { + log_it(L_ERROR, "Get DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB session_id=%u bad request", + a_ch->stream->session->id); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_SYNC_GLOBAL_DB_REQUEST_BAD"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + } + } + break; + // first packet of data with source node address + case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN: { + log_it(L_INFO, "In: FIRST_CHAIN data_size=%d", l_chain_pkt_data_size); + if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)) + memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN: { + log_it(L_INFO, "In: CHAIN pkt data_size=%d", l_chain_pkt_data_size); + dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); + if(l_chain) { + // Expect atom element in + if(l_chain_pkt_data_size > 0) { + memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); + memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); + memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); + l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size); + memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); + l_ch_chain->pkt_data_size = l_chain_pkt_data_size; + dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); + dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_chain_pkt_callback, a_ch); + } else { + log_it(L_WARNING, "Empty chain packet"); dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_UNKNOWN_CHAIN_PKT_TYPE"); - } + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_CHAIN_PACKET_EMPTY"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); } - if(l_ch_chain->callback_notify_packet_in) - l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt, - l_chain_pkt_data_size, l_ch_chain->callback_notify_arg); } } + break; + // first packet of data with source node address + case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB: { + log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d", l_chain_pkt_data_size); + if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)) + memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB: { + log_it(L_INFO, "In: GLOBAL_DB data_size=%d", l_chain_pkt_data_size); + // get transaction and save it to global_db + if(l_chain_pkt_data_size > 0) { + memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); + memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); + memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); + l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size); + memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); + l_ch_chain->pkt_data_size = l_chain_pkt_data_size; + dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); + dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_gdb_pkt_callback, a_ch); + } else { + log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size"); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_GLOBAL_DB_PACKET_EMPTY"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + } + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS: { + dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; + memcpy(&l_sync_gdb, l_chain_pkt->data, l_chain_pkt_data_size); + dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id); + l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr(l_net) ? + dap_chain_net_get_cur_addr(l_net)->uint64 : + dap_db_get_cur_node_addr(l_net->pub.name); + // Get last timestamp in log + l_sync_gdb.id_start = (uint64_t) dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); + // no limit + l_sync_gdb.id_end = (uint64_t)0; + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: { + dap_stream_ch_chain_sync_request_t l_sync_chains = {}; + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_chains, sizeof(l_sync_chains)); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR: + break; + default: { + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_UNKNOWN_CHAIN_PKT_TYPE"); + } + } + if(l_ch_chain->callback_notify_packet_in) + l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt, + l_chain_pkt_data_size, l_ch_chain->callback_notify_arg); } + /** * @brief dap_stream_ch_chain_go_idle * @param a_ch_chain @@ -607,6 +615,29 @@ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain) pthread_mutex_unlock(&a_ch_chain->mutex); } +bool s_process_gdb_iter(dap_stream_ch_t *a_ch) +{ + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); + dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs; + dap_store_obj_pkt_t *l_pkt = (dap_store_obj_pkt_t *)l_ch_chain->db_iter->data; + uint32_t l_pkt_size = sizeof(dap_store_obj_pkt_t) + l_pkt->data_size; + log_it(L_INFO, "Send one global_db record packet len=%d (rest=%d/%d items)", l_pkt_size, + dap_db_log_list_get_count_rest(l_db_list), dap_db_log_list_get_count(l_db_list)); + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, + l_ch_chain->request_net_id, l_ch_chain->request_chain_id, + l_ch_chain->request_cell_id, l_pkt, l_pkt_size); + dap_list_t *l_iter = dap_list_next(l_ch_chain->db_iter); + if (l_iter) { + l_ch_chain->db_iter = l_iter; + } else { + l_ch_chain->stats_request_gdb_processed++; + l_ch_chain->db_iter = dap_list_first(l_ch_chain->db_iter); + dap_list_free_full(l_ch_chain->db_iter, free); + l_ch_chain->db_iter = NULL; + } + return true; +} + bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) { UNUSED(a_thread); @@ -617,81 +648,56 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) // log_it( L_DEBUG,"l_ch_chain %X", l_ch_chain ); bool l_packet_out = false; switch (l_ch_chain->state) { + case CHAIN_STATE_IDLE: { dap_stream_ch_chain_go_idle(l_ch_chain); } break; - case CHAIN_STATE_SYNC_ALL: - case CHAIN_STATE_SYNC_GLOBAL_DB: { - - // Get log diff - dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs; - dap_global_db_obj_t *l_obj = dap_db_log_list_get(l_db_list); - - bool l_is_stop = true; - while(l_obj) { - size_t l_item_size_out = 0; - dap_list_t *l_item = dap_db_log_pack(l_obj, &l_item_size_out); - // Item not found, maybe it has deleted? Then go to the next item - if(!l_item || !l_item_size_out) { - l_item_size_out = 0; - l_obj = dap_db_log_list_get(l_db_list); - } - else { - /*size_t l_items_total = dap_db_log_list_get_count(l_db_list); - size_t l_items_rest = dap_db_log_list_get_count_rest(l_db_list); - log_it(L_INFO, "Send one global_db record data=0x%x len=%d (rest=%d/%d items)", l_item, l_item_size_out, - l_items_rest, l_items_total);*/ - for (dap_list_t *l_iter = l_item; l_iter; l_iter = dap_list_next(l_iter)) { - dap_store_obj_pkt_t *l_pkt = (dap_store_obj_pkt_t *)l_iter->data; - uint32_t l_pkt_size = sizeof(dap_store_obj_pkt_t) + l_pkt->data_size; - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, l_pkt, l_pkt_size); + if (l_ch_chain->db_iter) { + l_packet_out = s_process_gdb_iter(l_ch); + } else { + dap_global_db_obj_t *l_obj; + do { // Get log diff + size_t l_item_size_out = 0; + l_obj = dap_db_log_list_get(l_ch_chain->request_global_db_trs); + l_ch_chain->db_iter = dap_db_log_pack(l_obj, &l_item_size_out); + if (l_ch_chain->db_iter && l_item_size_out) { + break; } + // Item not found, maybe it has deleted? Then go to the next item + } while (l_obj); + if (l_ch_chain->db_iter) { + l_packet_out = s_process_gdb_iter(l_ch); + } else { + //log_it(L_DEBUG, "l_obj == 0, STOP"); + // free log list + dap_db_log_list_delete(l_ch_chain->request_global_db_trs); + l_ch_chain->request_global_db_trs = NULL; + // last message + dap_stream_ch_chain_sync_request_t l_request = {}; + dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id); + l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; + l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); + l_request.id_end = 0; + + log_it( L_DEBUG,"Syncronized database: last id %llu, items syncronyzed %llu ", l_request.id_start, + l_ch_chain->stats_request_gdb_processed ); + + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + l_ch_chain->request_net_id, l_ch_chain->request_chain_id, + l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); l_packet_out = true; - l_ch_chain->stats_request_gdb_processed++; - dap_list_free_full(l_item, free); - // sent the record, another will be sent - l_is_stop = false; - break; - } - } - if(l_is_stop){ - //log_it(L_DEBUG, "l_obj == 0, STOP"); - // free log list - l_ch_chain->request_global_db_trs = NULL; - dap_db_log_list_delete(l_db_list); - // last message - dap_stream_ch_chain_sync_request_t l_request = {}; - dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id); - l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; - l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); - l_request.id_end = 0; - - log_it( L_DEBUG,"Syncronized database: last id %llu, items syncronyzed %llu ", l_request.id_start, - l_ch_chain->stats_request_gdb_processed ); - - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); - l_packet_out = true; - - if(l_ch_chain->callback_notify_packet_out) - l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - NULL, 0, l_ch_chain->callback_notify_arg); - - if(l_ch_chain->state != CHAIN_STATE_SYNC_ALL) + if(l_ch_chain->callback_notify_packet_out) + l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + NULL, 0, l_ch_chain->callback_notify_arg); dap_stream_ch_chain_go_idle(l_ch_chain); + } } + } break; - } - if(l_ch_chain->state != CHAIN_STATE_SYNC_ALL) - break; - - // Synchronize chains + // Synchronize chains case CHAIN_STATE_SYNC_CHAINS: { //log_it(L_DEBUG, "CHAIN_STATE_SYNC_CHAINS"); dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); @@ -730,6 +736,7 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) // Then flush it out to the remote size_t l_atom_size = l_atom_item->atom_size; + log_it(L_INFO, "Send one chain packet len=%d", l_atom_size); dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, l_atom_item->atom, l_atom_size); @@ -766,8 +773,8 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) } pthread_mutex_unlock(&l_ch_chain->mutex); } - } - break; + } break; + default: break; } if (l_packet_out) { dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); @@ -784,6 +791,10 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) { (void) a_arg; + if (a_ch->stream->esocket->buf_out_size > DAP_EVENTS_SOCKET_BUF / 2) { + return; + } + dap_stream_ch_set_ready_to_write_unsafe(a_ch, false); dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); if (l_ch_chain && l_ch_chain->state != CHAIN_STATE_IDLE) { dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index 2d1ef97511..9e576b7656 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -49,7 +49,8 @@ typedef struct dap_stream_ch_chain { pthread_mutex_t mutex; dap_stream_ch_t * ch; - dap_db_log_list_t *request_global_db_trs; // list of transactions + dap_db_log_list_t *request_global_db_trs; // list of global db records + dap_list_t *db_iter; dap_stream_ch_chain_state_t state; dap_chain_atom_iter_t * request_atom_iter; diff --git a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h index 5091c15534..a56e07c490 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h +++ b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h @@ -57,7 +57,7 @@ typedef enum dap_stream_ch_chain_state{ CHAIN_STATE_IDLE=0, CHAIN_STATE_SYNC_CHAINS, CHAIN_STATE_SYNC_GLOBAL_DB, - CHAIN_STATE_SYNC_ALL, + CHAIN_STATE_SYNC_ALL } dap_stream_ch_chain_state_t; typedef struct dap_stream_ch_chain_sync_request{ diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index bbca88442b..2e42961970 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -496,7 +496,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net) } // wait for finishing of request - int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms + int timeout_ms = 30000; // 5 min = 300 sec = 300 000 ms // TODO add progress info to console l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); switch (l_res) { @@ -556,7 +556,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net) dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_net->pub.id, l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request)); // wait for finishing of request - int timeout_ms = 120000; // 2 min = 120 sec = 120 000 ms + int timeout_ms = 20000; // 2 min = 120 sec = 120 000 ms // TODO add progress info to console if (dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id())) { l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index ae7385b022..4edf2abcb3 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -1223,6 +1223,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) assert(l_tun); // Unsafely send it dap_events_socket_write_unsafe( l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size); + dap_events_socket_set_writable_unsafe(l_tun->es, true); } break; // for servier only @@ -1231,6 +1232,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) assert(l_tun); // Unsafely send it size_t l_ret = dap_events_socket_write_unsafe( l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size); + dap_events_socket_set_writable_unsafe(l_tun->es, true); if( l_ret) s_update_limits (a_ch, l_srv_session, l_usage,l_ret ); } break; -- GitLab