From 45d225f3694af334bae3d5b74b871caa4811d183 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Fri, 4 Sep 2020 17:22:20 +0300 Subject: [PATCH 1/4] [+] Packet split for GDB sync --- modules/channel/chain/dap_stream_ch_chain.c | 16 +-- .../channel/chain/dap_stream_ch_chain_pkt.c | 8 +- .../global-db/dap_chain_global_db_driver.c | 97 ++++++++++--------- modules/global-db/dap_chain_global_db_hist.c | 44 ++------- .../global-db/include/dap_chain_global_db.h | 2 +- .../include/dap_chain_global_db_driver.h | 9 +- modules/net/dap_chain_net.c | 6 +- 7 files changed, 87 insertions(+), 95 deletions(-) diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 8f94aa9be..aea46e12c 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -628,12 +628,10 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) bool l_is_stop = true; while(l_obj) { size_t l_item_size_out = 0; - uint8_t *l_item = dap_db_log_pack(l_obj, &l_item_size_out); + dap_list_t *l_item = dap_db_log_pack(l_obj, &l_item_size_out); // Item not found, maybe it has deleted? Then go to the next item if(!l_item || !l_item_size_out) { - //log_it(L_WARNING, "Log pack returned NULL??? data=0x%x (nothing to send) (rest=%d records)", l_obj, l_items_rest); l_item_size_out = 0; - // go to next item l_obj = dap_db_log_list_get(l_db_list); } else { @@ -641,12 +639,16 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) size_t l_items_rest = dap_db_log_list_get_count_rest(l_db_list); log_it(L_INFO, "Send one global_db record data=0x%x len=%d (rest=%d/%d items)", l_item, l_item_size_out, l_items_rest, l_items_total);*/ - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, l_item, l_item_size_out); + for (dap_list_t *l_iter = l_item; l_iter; l_iter = dap_list_next(l_iter)) { + dap_store_obj_pkt_t *l_pkt = (dap_store_obj_pkt_t *)l_iter->data; + uint32_t l_pkt_size = sizeof(dap_store_obj_pkt_t) + l_pkt->data_size; + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, + l_ch_chain->request_net_id, l_ch_chain->request_chain_id, + l_ch_chain->request_cell_id, l_pkt, l_pkt_size); + } l_packet_out = true; l_ch_chain->stats_request_gdb_processed++; - DAP_DELETE(l_item); + dap_list_free_full(l_item, free); // sent the record, another will be sent l_is_stop = false; break; diff --git a/modules/channel/chain/dap_stream_ch_chain_pkt.c b/modules/channel/chain/dap_stream_ch_chain_pkt.c index aff9bb677..19cc56601 100644 --- a/modules/channel/chain/dap_stream_ch_chain_pkt.c +++ b/modules/channel/chain/dap_stream_ch_chain_pkt.c @@ -49,8 +49,8 @@ dap_stream_ch_chain_state_t dap_stream_ch_chain_pkt_type_to_dap_stream_ch_chain_ * @return */ size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id, - dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, - const void * a_data, size_t a_data_size) + dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, + const void * a_data, size_t a_data_size) { dap_stream_ch_chain_pkt_t * l_chain_pkt; size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr) + a_data_size; @@ -69,8 +69,8 @@ size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_typ } size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id, - dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, - const void * a_data, size_t a_data_size) + dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, + const void * a_data, size_t a_data_size) { dap_stream_ch_chain_pkt_t * l_chain_pkt; size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr) + a_data_size; diff --git a/modules/global-db/dap_chain_global_db_driver.c b/modules/global-db/dap_chain_global_db_driver.c index b1e805831..fd07a1bda 100644 --- a/modules/global-db/dap_chain_global_db_driver.c +++ b/modules/global-db/dap_chain_global_db_driver.c @@ -196,53 +196,64 @@ static size_t dap_db_get_size_pdap_store_obj_t(pdap_store_obj_t store_obj) * @param a_size_out[out] size of output structure * @return NULL in case of an error */ -dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp, +dap_list_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp, size_t a_store_obj_count) { - if(!a_store_obj || a_store_obj_count < 1) + if (!a_store_obj || a_store_obj_count < 1) return NULL; - size_t l_data_size_out = sizeof(uint32_t); // size of output data + // calculate output structure size - for(size_t l_q = 0; l_q < a_store_obj_count; ++l_q) + dap_list_t *l_ret = NULL; + dap_store_obj_pkt_t *l_pkt; + uint32_t l_obj_count = 0, l_data_size_out = 0; + for (size_t l_q = 0; l_q < a_store_obj_count; ++l_q) { l_data_size_out += dap_db_get_size_pdap_store_obj_t(&a_store_obj[l_q]); - - dap_store_obj_pkt_t *l_pkt = DAP_NEW_Z_SIZE(dap_store_obj_pkt_t, sizeof(dap_store_obj_pkt_t) + l_data_size_out); - l_pkt->data_size = l_data_size_out; - l_pkt->timestamp = a_timestamp; - uint64_t l_offset = 0; - uint32_t l_count = (uint32_t) a_store_obj_count; - memcpy(l_pkt->data + l_offset, &l_count, sizeof(uint32_t)); - l_offset += sizeof(uint32_t); - for(size_t l_q = 0; l_q < a_store_obj_count; ++l_q) { - dap_store_obj_t obj = a_store_obj[l_q]; - //uint16_t section_size = (uint16_t) dap_strlen(obj.section); - uint16_t group_size = (uint16_t) dap_strlen(obj.group); - uint16_t key_size = (uint16_t) dap_strlen(obj.key); - memcpy(l_pkt->data + l_offset, &obj.type, sizeof(int)); - l_offset += sizeof(int); - //memcpy(l_pkt->data + l_offset, §ion_size, sizeof(uint16_t)); - //l_offset += sizeof(uint16_t); - //memcpy(l_pkt->data + l_offset, obj.section, section_size); - //l_offset += section_size; - memcpy(l_pkt->data + l_offset, &group_size, sizeof(uint16_t)); - l_offset += sizeof(uint16_t); - memcpy(l_pkt->data + l_offset, obj.group, group_size); - l_offset += group_size; - memcpy(l_pkt->data + l_offset, &obj.id, sizeof(uint64_t)); - l_offset += sizeof(uint64_t); - memcpy(l_pkt->data + l_offset, &obj.timestamp, sizeof(time_t)); - l_offset += sizeof(time_t); - memcpy(l_pkt->data + l_offset, &key_size, sizeof(uint16_t)); - l_offset += sizeof(uint16_t); - memcpy(l_pkt->data + l_offset, obj.key, key_size); - l_offset += key_size; - memcpy(l_pkt->data + l_offset, &obj.value_len, sizeof(size_t)); - l_offset += sizeof(size_t); - memcpy(l_pkt->data + l_offset, obj.value, obj.value_len); - l_offset += obj.value_len; + if (l_data_size_out > DAP_CHAIN_PKT_EXPECT_SIZE || (l_q == a_store_obj_count - 1 && l_data_size_out)) { + l_pkt = DAP_NEW_Z_SIZE(dap_store_obj_pkt_t, sizeof(dap_store_obj_pkt_t) + l_data_size_out); + l_pkt->data_size = l_data_size_out; + l_pkt->timestamp = a_timestamp; + l_pkt->obj_count = l_q + 1 - l_obj_count; + l_ret = dap_list_append(l_ret, l_pkt); + l_data_size_out = 0; + l_obj_count = l_q + 1; + } } - assert(l_data_size_out == l_offset); - return l_pkt; + l_obj_count = 0; + for (dap_list_t *l_iter = l_ret; l_iter; l_iter = dap_list_next(l_iter)) { + l_pkt = (dap_store_obj_pkt_t *)l_iter->data; + uint64_t l_offset = 0; + for(size_t l_q = 0; l_q < l_pkt->obj_count; ++l_q) { + dap_store_obj_t obj = a_store_obj[l_obj_count + l_q]; + //uint16_t section_size = (uint16_t) dap_strlen(obj.section); + uint16_t group_size = (uint16_t) dap_strlen(obj.group); + uint16_t key_size = (uint16_t) dap_strlen(obj.key); + memcpy(l_pkt->data + l_offset, &obj.type, sizeof(int)); + l_offset += sizeof(int); + //memcpy(l_pkt->data + l_offset, §ion_size, sizeof(uint16_t)); + //l_offset += sizeof(uint16_t); + //memcpy(l_pkt->data + l_offset, obj.section, section_size); + //l_offset += section_size; + memcpy(l_pkt->data + l_offset, &group_size, sizeof(uint16_t)); + l_offset += sizeof(uint16_t); + memcpy(l_pkt->data + l_offset, obj.group, group_size); + l_offset += group_size; + memcpy(l_pkt->data + l_offset, &obj.id, sizeof(uint64_t)); + l_offset += sizeof(uint64_t); + memcpy(l_pkt->data + l_offset, &obj.timestamp, sizeof(time_t)); + l_offset += sizeof(time_t); + memcpy(l_pkt->data + l_offset, &key_size, sizeof(uint16_t)); + l_offset += sizeof(uint16_t); + memcpy(l_pkt->data + l_offset, obj.key, key_size); + l_offset += key_size; + memcpy(l_pkt->data + l_offset, &obj.value_len, sizeof(size_t)); + l_offset += sizeof(size_t); + memcpy(l_pkt->data + l_offset, obj.value, obj.value_len); + l_offset += obj.value_len; + } + l_obj_count += l_pkt->obj_count; + assert(l_pkt->data_size == l_offset); + } + return l_ret; } /** * deserialization @@ -255,9 +266,7 @@ dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *pkt, siz if(!pkt || pkt->data_size < 1) return NULL; uint64_t offset = 0; - uint32_t count; - memcpy(&count, pkt->data, sizeof(uint32_t)); - offset += sizeof(uint32_t); + uint32_t count = pkt->obj_count; dap_store_obj_t *store_obj = DAP_NEW_Z_SIZE(dap_store_obj_t, count * sizeof(struct dap_store_obj)); for(size_t q = 0; q < count; ++q) { dap_store_obj_t *obj = store_obj + q; diff --git a/modules/global-db/dap_chain_global_db_hist.c b/modules/global-db/dap_chain_global_db_hist.c index c2683b156..4ac96d147 100644 --- a/modules/global-db/dap_chain_global_db_hist.c +++ b/modules/global-db/dap_chain_global_db_hist.c @@ -77,7 +77,7 @@ static char* dap_db_new_history_timestamp() * * return dap_store_obj_pkt_t* */ -uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out) +dap_list_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out) { if(!a_obj) return NULL; @@ -122,15 +122,18 @@ uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out) i++; } // serialize data - dap_store_obj_pkt_t *l_data_out = dap_store_packet_multiple(l_store_obj, l_timestamp, l_count); + dap_list_t *l_data_out = dap_store_packet_multiple(l_store_obj, l_timestamp, l_count); dap_store_obj_free(l_store_obj, l_count); dap_strfreev(l_keys); if(l_data_out && a_data_size_out) { - *a_data_size_out = sizeof(dap_store_obj_pkt_t) + l_data_out->data_size; + *a_data_size_out = 0; + for (dap_list_t *l_iter = l_data_out; l_iter; l_iter = dap_list_next(l_iter)) { + *a_data_size_out += sizeof(dap_store_obj_pkt_t) + ((dap_store_obj_pkt_t *)l_data_out)->data_size; + } } - return (uint8_t*) l_data_out; + return l_data_out; } @@ -1326,7 +1329,6 @@ dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id, dap_list_t *a_add_gr dap_db_log_list_t *l_dap_db_log_list = DAP_NEW_Z(dap_db_log_list_t); size_t l_add_groups_num = 0;// number of group - //size_t l_add_groups_items_num = 0;// number items in all groups dap_list_t *l_add_groups_mask = a_add_groups_mask; // calc l_add_groups_num while(l_add_groups_mask) { @@ -1337,7 +1339,6 @@ dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id, dap_list_t *a_add_gr l_add_groups_mask = dap_list_next(l_add_groups_mask); } - //size_t l_add_groups_num = dap_list_length(a_add_groups_mask); size_t l_data_size_out_main = dap_chain_global_db_driver_count(GROUP_LOCAL_HISTORY, first_id); size_t *l_data_size_out_add_items = DAP_NEW_Z_SIZE(size_t, sizeof(size_t) * l_add_groups_num); uint64_t *l_group_last_id = DAP_NEW_Z_SIZE(uint64_t, sizeof(uint64_t) * l_add_groups_num); @@ -1366,9 +1367,6 @@ dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id, dap_list_t *a_add_gr } if(!(l_data_size_out_main + l_data_size_out_add_items_count)) return NULL; - // debug -// if(l_data_size_out>11) -// l_data_size_out = 11; l_dap_db_log_list->item_start = first_id; l_dap_db_log_list->item_last = first_id + l_data_size_out_main; l_dap_db_log_list->items_number_main = l_data_size_out_main; @@ -1381,31 +1379,9 @@ dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id, dap_list_t *a_add_gr l_dap_db_log_list->group_names = l_group_names; l_dap_db_log_list->group_cur = -1; l_dap_db_log_list->add_groups = a_add_groups_mask; - // there are too few items, read items right now - if(0) {//l_data_size_out <= 10) { - dap_list_t *l_list = NULL; - // read first items - size_t l_objs_count = 0; - dap_store_obj_t *l_objs = dap_chain_global_db_cond_load(GROUP_LOCAL_HISTORY, first_id, &l_objs_count); - for(size_t i = 0; i < l_objs_count; i++) { - dap_store_obj_t *l_obj_cur = l_objs + i; - dap_global_db_obj_t *l_item = DAP_NEW(dap_global_db_obj_t); - l_item->id = l_obj_cur->id; - l_item->key = dap_strdup(l_obj_cur->key); - l_item->value = (uint8_t*) dap_strdup((char*) l_obj_cur->value); - l_list = dap_list_append(l_list, l_item); - } - l_dap_db_log_list->list_write = l_list; - l_dap_db_log_list->list_read = l_list; - //log_it(L_DEBUG, "loaded items n=%d", l_data_size_out); - dap_store_obj_free(l_objs, l_objs_count); - } - // start thread for items loading - else { - l_dap_db_log_list->is_process = true; - pthread_mutex_init(&l_dap_db_log_list->list_mutex, NULL); - pthread_create(&l_dap_db_log_list->thread, NULL, s_list_thread_proc, l_dap_db_log_list); - } + l_dap_db_log_list->is_process = true; + pthread_mutex_init(&l_dap_db_log_list->list_mutex, NULL); + pthread_create(&l_dap_db_log_list->thread, NULL, s_list_thread_proc, l_dap_db_log_list); return l_dap_db_log_list; } diff --git a/modules/global-db/include/dap_chain_global_db.h b/modules/global-db/include/dap_chain_global_db.h index 4ac3c5a9a..76ae06271 100644 --- a/modules/global-db/include/dap_chain_global_db.h +++ b/modules/global-db/include/dap_chain_global_db.h @@ -119,7 +119,7 @@ char* dap_chain_global_db_hash(const uint8_t *data, size_t data_size); char* dap_chain_global_db_hash_fast(const uint8_t *data, size_t data_size); // Get data according the history log -uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out); +dap_list_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out); // Get data according the history log //char* dap_db_history_tx(dap_chain_hash_fast_t * a_tx_hash, const char *a_group_mempool); diff --git a/modules/global-db/include/dap_chain_global_db_driver.h b/modules/global-db/include/dap_chain_global_db_driver.h index c8d7ef9c5..4bccdc9e0 100644 --- a/modules/global-db/include/dap_chain_global_db_driver.h +++ b/modules/global-db/include/dap_chain_global_db_driver.h @@ -29,6 +29,8 @@ #include "dap_common.h" #include "dap_list.h" +#define DAP_CHAIN_PKT_EXPECT_SIZE 7168 + typedef struct dap_store_obj { uint64_t id; time_t timestamp; @@ -42,8 +44,9 @@ typedef struct dap_store_obj { }DAP_ALIGN_PACKED dap_store_obj_t, *pdap_store_obj_t; typedef struct dap_store_obj_pkt { - time_t timestamp; - size_t data_size; + uint64_t timestamp; + uint64_t data_size; + uint32_t obj_count; uint8_t data[]; }__attribute__((packed)) dap_store_obj_pkt_t; @@ -90,7 +93,7 @@ bool dap_chain_global_db_driver_is(const char *a_group, const char *a_key); size_t dap_chain_global_db_driver_count(const char *a_group, uint64_t id); dap_list_t* dap_chain_global_db_driver_get_groups_by_mask(const char *a_group_mask); -dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, +dap_list_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp, size_t a_store_obj_count); dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *a_pkt, size_t *a_store_obj_count); diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 8d2e991ab..3dba36268 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -268,7 +268,9 @@ void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const c l_obj->type = (uint8_t)a_op_code; DAP_DELETE(l_obj->group); l_obj->group = dap_strdup(a_group); - dap_store_obj_pkt_t *l_data_out = dap_store_packet_multiple(l_obj, l_obj->timestamp, 1); + dap_list_t *l_list_out = dap_store_packet_multiple(l_obj, l_obj->timestamp, 1); + // Expect only one element in list + dap_store_obj_pkt_t *l_data_out = (dap_store_obj_pkt_t *)l_list_out->data; dap_store_obj_free(l_obj, 1); dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb"); dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t) {}; @@ -278,7 +280,7 @@ void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const c dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_net->pub.id, l_chain_id, l_net->pub.cell_id, l_data_out, sizeof(dap_store_obj_pkt_t) + l_data_out->data_size); } - DAP_DELETE(l_data_out); + dap_list_free_full(l_list_out, free); } } -- GitLab From 7a48933efa5c1bcd9930c8cd19a974552919c15f Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Fri, 4 Sep 2020 19:45:29 +0300 Subject: [PATCH 2/4] [*] Infinite cycle in root role for sync fixed --- modules/net/dap_chain_net.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 3dba36268..bbca88442 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -391,10 +391,10 @@ static int s_net_states_proc(dap_chain_net_t * l_net) case NODE_ROLE_ARCHIVE: case NODE_ROLE_CELL_MASTER: { // Add other root nodes as synchronization links - while (dap_list_length(l_pvt_net->links_info) < s_max_links_count) { - int i = rand() % l_pvt_net->seed_aliases_count; + for (int i = 0; i < MIN(s_max_links_count, l_pvt_net->seed_aliases_count); i++) { dap_chain_node_addr_t *l_link_addr = dap_chain_node_alias_find(l_net, l_pvt_net->seed_aliases[i]); - dap_chain_node_info_read(l_net, l_link_addr); + dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(l_net, l_link_addr); + l_pvt_net->links_info = dap_list_append(l_pvt_net->links_info, l_link_node_info); } } break; case NODE_ROLE_FULL: -- GitLab From b1f5aa151c96e6a6e7ace59ac63e28feb319a976 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Sat, 5 Sep 2020 13:46:18 +0300 Subject: [PATCH 3/4] [*] Small bugfix with sync --- modules/channel/chain/dap_stream_ch_chain.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index aea46e12c..2077b7976 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -138,8 +138,8 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create(l_chain); l_ch_chain->request_atom_iter = l_iter; l_lasts = l_chain->callback_atom_iter_get_lasts(l_iter, &l_lasts_count, &l_lasts_sizes); - if(l_lasts&& l_lasts_sizes) { - for(size_t i = l_lasts_count - 1; i >= 0; i--) { + if (l_lasts && l_lasts_sizes) { + for(long int i = l_lasts_count - 1; i >= 0; i--) { dap_chain_atom_item_t * l_item = NULL; dap_chain_hash_fast_t l_atom_hash; dap_hash_fast(l_lasts[i], l_lasts_sizes[i], @@ -559,11 +559,15 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); } + break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: { dap_stream_ch_chain_sync_request_t l_sync_chains = {}; dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_chains, sizeof(l_sync_chains)); } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR: + break; default: { dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, -- GitLab From f23bbc3c061acb1fa3d7e97a4f2dbea8b8d74573 Mon Sep 17 00:00:00 2001 From: Roman Khlopkov <roman.khlopkov@demlabs.net> Date: Mon, 7 Sep 2020 20:46:50 +0300 Subject: [PATCH 4/4] [*] Fragmentation GDB packets debugged --- dap-sdk/net/core/dap_worker.c | 9 +- dap-sdk/net/server-udp/dap_udp_server.c | 9 +- .../net/server/http_server/dap_http_folder.c | 1 + .../net/server/http_server/dap_http_simple.c | 7 +- .../http_server/http_client/dap_http_client.c | 4 +- modules/channel/chain/dap_stream_ch_chain.c | 517 +++++++++--------- .../chain/include/dap_stream_ch_chain.h | 3 +- .../chain/include/dap_stream_ch_chain_pkt.h | 2 +- modules/net/dap_chain_net.c | 4 +- modules/service/vpn/dap_chain_net_srv_vpn.c | 2 + 10 files changed, 282 insertions(+), 276 deletions(-) diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 68d1b2019..b3611126c 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -256,14 +256,15 @@ void *dap_worker_thread(void *arg) // Socket is ready to write if(((l_epoll_events[n].events & EPOLLOUT) || (l_cur->flags & DAP_SOCK_READY_TO_WRITE)) && !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE)) { - //log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size); + + //log_it(L_DEBUG, "Main loop output: %u bytes to send", l_cur->buf_out_size); if(l_cur->callbacks.write_callback) l_cur->callbacks.write_callback(l_cur, NULL); // Call callback to process write event + if (l_cur->worker == NULL ){ // esocket was unassigned in callback, we don't need any ops with it now, // continue to poll another esockets continue; } - if(l_cur->flags & DAP_SOCK_READY_TO_WRITE) { static const uint32_t buf_out_zero_count_max = 2; @@ -307,15 +308,11 @@ void *dap_worker_thread(void *arg) if (l_errno != EAGAIN && l_errno != EWOULDBLOCK ){ // If we have non-blocking socket log_it(L_ERROR, "Some error occured in send(): %s", strerror(errno)); l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; - break; } }else{ //log_it(L_DEBUG, "Output: %u from %u bytes are sent ", l_bytes_sent,l_cur->buf_out_size); - //} - //log_it(L_DEBUG,"Output: sent %u bytes",total_sent); if (l_bytes_sent) { l_cur->buf_out_size -= l_bytes_sent; - //log_it(L_DEBUG,"Output: left %u bytes in buffer",l_cur->buf_out_size); if (l_cur->buf_out_size) { memmove(l_cur->buf_out, &l_cur->buf_out[l_bytes_sent], l_cur->buf_out_size); } else { diff --git a/dap-sdk/net/server-udp/dap_udp_server.c b/dap-sdk/net/server-udp/dap_udp_server.c index d02ac6a62..26a0fc39b 100644 --- a/dap-sdk/net/server-udp/dap_udp_server.c +++ b/dap-sdk/net/server-udp/dap_udp_server.c @@ -182,14 +182,7 @@ static void write_cb( EPOLL_HANDLE efd, int revents, dap_server_t *sh ) dap_events_socket_t *client = udp_client->esocket; if( client != NULL && !check_close(client) && (client->flags & DAP_SOCK_READY_TO_WRITE) ) { - - if ( sh->client_callbacks.write_callback ) - sh->client_callbacks.write_callback( client, NULL ); - if ( client->buf_out_size > 0 ) { - - - struct sockaddr_in addr; addr.sin_family = AF_INET; dap_udp_client_get_address( client, (unsigned int *)&addr.sin_addr.s_addr, &addr.sin_port ); @@ -211,6 +204,8 @@ static void write_cb( EPOLL_HANDLE efd, int revents, dap_server_t *sh ) sb_payload_ready = false; } LL_DELETE( udp->waiting_clients, udp_client ); + if ( sh->client_callbacks.write_callback ) + sh->client_callbacks.write_callback( client, NULL ); } else if( client == NULL ) { LL_DELETE( udp->waiting_clients, udp_client ); diff --git a/dap-sdk/net/server/http_server/dap_http_folder.c b/dap-sdk/net/server/http_server/dap_http_folder.c index d16e8a0d4..857f39775 100644 --- a/dap-sdk/net/server/http_server/dap_http_folder.c +++ b/dap-sdk/net/server/http_server/dap_http_folder.c @@ -297,6 +297,7 @@ void dap_http_folder_data_write(dap_http_client_t * cl_ht, void * arg) dap_http_file_t * cl_ht_file= DAP_HTTP_FILE(cl_ht); cl_ht->esocket->buf_out_size=fread(cl_ht->esocket->buf_out,1,sizeof(cl_ht->esocket->buf_out),cl_ht_file->fd); cl_ht_file->position+=cl_ht->esocket->buf_out_size; + dap_events_socket_set_writable_unsafe(cl_ht->esocket, true); if(feof(cl_ht_file->fd)!=0){ log_it(L_INFO, "All the file %s is sent out",cl_ht_file->local_path); diff --git a/dap-sdk/net/server/http_server/dap_http_simple.c b/dap-sdk/net/server/http_server/dap_http_simple.c index 3e43bacb7..9428a6f00 100644 --- a/dap-sdk/net/server/http_server/dap_http_simple.c +++ b/dap-sdk/net/server/http_server/dap_http_simple.c @@ -228,14 +228,10 @@ inline static bool _is_supported_user_agents_list_setted() inline static void _set_only_write_http_client_state(dap_http_client_t* http_client) { // log_it(L_DEBUG,"_set_only_write_http_client_state"); -// Sleep(300); - - http_client->esocket->flags = DAP_SOCK_READY_TO_WRITE; // To not to touch epoll_fd we clean flags by ourself -// http_client->state_write=DAP_HTTP_CLIENT_STATE_NONE; http_client->state_write=DAP_HTTP_CLIENT_STATE_START; dap_events_socket_set_writable_unsafe(http_client->esocket,true); -// http_client->state_write=DAP_HTTP_CLIENT_STATE_START; + dap_events_socket_set_readable_unsafe(http_client->esocket, false); } static void _copy_reply_and_mime_to_response( dap_http_simple_t *cl_sh ) @@ -380,6 +376,7 @@ static void s_http_client_data_write( dap_http_client_t * a_http_client, void *a l_http_simple->reply_sent += dap_events_socket_write_unsafe( a_http_client->esocket, l_http_simple->reply_byte + l_http_simple->reply_sent, a_http_client->out_content_length - l_http_simple->reply_sent ); + dap_events_socket_set_writable_unsafe(a_http_client->esocket, true); if ( l_http_simple->reply_sent >= a_http_client->out_content_length ) { log_it(L_INFO, "All the reply (%u) is sent out", a_http_client->out_content_length ); diff --git a/dap-sdk/net/server/http_server/http_client/dap_http_client.c b/dap-sdk/net/server/http_server/http_client/dap_http_client.c index b5add239f..ef122b7a5 100644 --- a/dap-sdk/net/server/http_server/http_client/dap_http_client.c +++ b/dap-sdk/net/server/http_server/http_client/dap_http_client.c @@ -498,7 +498,7 @@ void dap_http_client_write( dap_events_socket_t * cl, void *arg ) log_it( L_INFO," HTTP response with %u status code", l_http_client->reply_status_code ); dap_events_socket_write_f_unsafe(cl, "HTTP/1.1 %u %s\r\n",l_http_client->reply_status_code, l_http_client->reply_reason_phrase[0] ? l_http_client->reply_reason_phrase : http_status_reason_phrase(l_http_client->reply_status_code) ); - + dap_events_socket_set_writable_unsafe(cl, true); dap_http_client_out_header_generate( l_http_client ); l_http_client->state_write = DAP_HTTP_CLIENT_STATE_HEADERS; } break; @@ -508,6 +508,7 @@ void dap_http_client_write( dap_events_socket_t * cl, void *arg ) if ( hdr == NULL ) { log_it(L_DEBUG, "Output: headers are over (reply status code %u)",l_http_client->reply_status_code); dap_events_socket_write_f_unsafe(cl, "\r\n"); + dap_events_socket_set_writable_unsafe(cl, true); if ( l_http_client->out_content_length || l_http_client->out_content_ready ) { l_http_client->state_write=DAP_HTTP_CLIENT_STATE_DATA; } else { @@ -520,6 +521,7 @@ void dap_http_client_write( dap_events_socket_t * cl, void *arg ) } else { //log_it(L_WARNING,"Output: header %s: %s",hdr->name,hdr->value); dap_events_socket_write_f_unsafe(cl, "%s: %s\r\n", hdr->name, hdr->value); + dap_events_socket_set_writable_unsafe(cl, true); dap_http_header_remove( &l_http_client->out_headers, hdr ); } } break; diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 2077b7976..fe8116937 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -138,12 +138,12 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create(l_chain); l_ch_chain->request_atom_iter = l_iter; l_lasts = l_chain->callback_atom_iter_get_lasts(l_iter, &l_lasts_count, &l_lasts_sizes); + log_it(L_INFO, "Found %d atoms for synchronization", l_lasts_count); if (l_lasts && l_lasts_sizes) { for(long int i = l_lasts_count - 1; i >= 0; i--) { dap_chain_atom_item_t * l_item = NULL; dap_chain_hash_fast_t l_atom_hash; - dap_hash_fast(l_lasts[i], l_lasts_sizes[i], - &l_atom_hash); + dap_hash_fast(l_lasts[i], l_lasts_sizes[i], &l_atom_hash); pthread_mutex_lock(&l_ch_chain->mutex); HASH_FIND(hh, l_ch_chain->request_atoms_lasts, &l_atom_hash, sizeof(l_atom_hash), l_item); @@ -207,6 +207,7 @@ bool s_sync_gdb_callback(dap_proc_thread_t *a_thread, void *a_arg) //log_it(L_DEBUG, "Start getting items %u:%u", l_request->id_start + 1,l_db_log->items_number);//dap_list_length(l_list)); // Add it to outgoing list l_ch_chain->request_global_db_trs = l_db_log; + l_ch_chain->db_iter = NULL; l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB; } else { dap_stream_ch_chain_sync_request_t l_request = {}; @@ -214,13 +215,12 @@ bool s_sync_gdb_callback(dap_proc_thread_t *a_thread, void *a_arg) l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); + l_ch_chain->request_net_id, l_ch_chain->request_chain_id, + l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); l_ch_chain->state = CHAIN_STATE_IDLE; if(l_ch_chain->callback_notify_packet_out) - l_ch_chain->callback_notify_packet_out(l_ch_chain, - DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - NULL, 0, l_ch_chain->callback_notify_arg); + l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + NULL, 0, l_ch_chain->callback_notify_arg); } //log_it(L_INFO, "Prepared %u items for sync", l_db_log->items_number - l_request->id_start); // go to send data from list [in s_stream_ch_packet_out()] @@ -381,91 +381,75 @@ bool s_gdb_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) */ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) { - //static char *s_net_name = NULL; dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); - if(l_ch_chain) { - dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg; - dap_stream_ch_chain_pkt_t * l_chain_pkt = (dap_stream_ch_chain_pkt_t *) l_ch_pkt->data; - uint8_t l_acl_idx = dap_chain_net_acl_idx_by_id(l_chain_pkt->hdr.net_id); - bool l_error = false; - char l_err_str[64]; - if (l_acl_idx == (uint8_t)-1) { - log_it(L_ERROR, "Invalid net id in packet"); - strcpy(l_err_str, "ERROR_NET_INVALID_ID"); - l_error = true; - } - if (!l_error && a_ch->stream->session->acl && !a_ch->stream->session->acl[l_acl_idx]) { - log_it(L_WARNING, "Unauthorized request attempt to network %s", - dap_chain_net_by_id(l_chain_pkt->hdr.net_id)->pub.name); - strcpy(l_err_str, "ERROR_NET_NOT_AUTHORIZED"); - l_error = true; - } - if (l_error) { + if (!l_ch_chain) { + return; + } + dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg; + dap_stream_ch_chain_pkt_t * l_chain_pkt = (dap_stream_ch_chain_pkt_t *) l_ch_pkt->data; + if (!l_chain_pkt) { + return; + } + size_t l_chain_pkt_data_size = l_ch_pkt->hdr.size - sizeof(l_chain_pkt->hdr); + uint8_t l_acl_idx = dap_chain_net_acl_idx_by_id(l_chain_pkt->hdr.net_id); + if (l_acl_idx == (uint8_t)-1) { + log_it(L_ERROR, "Invalid net id in packet"); + if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR) { + if(l_ch_chain->callback_notify_packet_in) { + l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt, + l_chain_pkt_data_size, l_ch_chain->callback_notify_arg); + } + } else { dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, l_err_str); + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_NET_INVALID_ID"); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); } - size_t l_chain_pkt_data_size = l_ch_pkt->hdr.size - sizeof(l_chain_pkt->hdr); - if (!l_error && l_chain_pkt) { - switch (l_ch_pkt->hdr.type) { - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: { - log_it(L_INFO, "In: SYNCED_ALL pkt"); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: { - log_it(L_INFO, "In: SYNCED_GLOBAL_DB pkt"); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP: { - log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt"); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP: { - log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt"); - } - break; + return; + } + if (a_ch->stream->session->acl && !a_ch->stream->session->acl[l_acl_idx]) { + log_it(L_WARNING, "Unauthorized request attempt to network %s", + dap_chain_net_by_id(l_chain_pkt->hdr.net_id)->pub.name); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_NET_NOT_AUTHORIZED"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + return; + } + switch (l_ch_pkt->hdr.type) { + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: { + log_it(L_INFO, "In: SYNCED_ALL pkt"); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: { + log_it(L_INFO, "In: SYNCED_GLOBAL_DB pkt"); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP: { + log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt"); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP: { + log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt"); + } + break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { - log_it(L_INFO, "In: SYNCED_CHAINS pkt"); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: { - log_it(L_INFO, "In: SYNC_CHAINS pkt"); - dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); - if(l_chain) { - if(l_ch_chain->state != CHAIN_STATE_IDLE) { - log_it(L_INFO, "Can't process SYNC_CHAINS request because not in idle state"); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_STATE_NOT_IN_IDLE"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } else { - // fill ids - if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { - dap_stream_ch_chain_sync_request_t * l_request = - (dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data; - memcpy(&l_ch_chain->request, l_request, l_chain_pkt_data_size); - memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); - memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); - memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); - } - dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_chains_callback, a_ch); - } - } - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: { - log_it(L_INFO, "In: SYNC_GLOBAL_DB pkt"); - if(l_ch_chain->state != CHAIN_STATE_IDLE) { - log_it(L_INFO, "Can't process SYNC_GLOBAL_DB request because not in idle state"); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_STATE_NOT_IN_IDLE"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - break; - } - // receive the latest global_db revision of the remote node -> go to send mode + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { + log_it(L_INFO, "In: SYNCED_CHAINS pkt"); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: { + log_it(L_INFO, "In: SYNC_CHAINS pkt"); + dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); + if(l_chain) { + if(l_ch_chain->state != CHAIN_STATE_IDLE) { + log_it(L_INFO, "Can't process SYNC_CHAINS request because not in idle state"); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_STATE_NOT_IN_IDLE"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + } else { + // fill ids if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { dap_stream_ch_chain_sync_request_t * l_request = (dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data; @@ -473,114 +457,138 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); - dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_gdb_callback, a_ch); - } - else { - log_it(L_ERROR, "Get DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB session_id=%u bad request", - a_ch->stream->session->id); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_SYNC_GLOBAL_DB_REQUEST_BAD"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } } - break; - // first packet of data with source node address - case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN: { - log_it(L_INFO, "In: FIRST_CHAIN data_size=%d", l_chain_pkt_data_size); - if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)) - memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size); + dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); + dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_chains_callback, a_ch); } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN: { - //log_it(L_INFO, "In: CHAIN pkt"); - dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); - if(l_chain) { - // Expect atom element in - if(l_chain_pkt_data_size > 0) { - memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); - memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); - memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); - l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size); - memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); - l_ch_chain->pkt_data_size = l_chain_pkt_data_size; - dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_chain_pkt_callback, a_ch); - } else { - log_it(L_WARNING, "Empty chain packet"); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_CHAIN_PACKET_EMPTY"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } - } - } - break; - // first packet of data with source node address - case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB: { - log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d", l_chain_pkt_data_size); - if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)) - memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB: { - //log_it(L_INFO, "In: GLOBAL_DB data_size=%d", l_chain_pkt_data_size); - // get transaction and save it to global_db - if(l_chain_pkt_data_size > 0) { - memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); - memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); - memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); - l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size); - memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); - l_ch_chain->pkt_data_size = l_chain_pkt_data_size; - dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_gdb_pkt_callback, a_ch); - } else { - log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size"); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_GLOBAL_DB_PACKET_EMPTY"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS: { - dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; - memcpy(&l_sync_gdb, l_chain_pkt->data, l_chain_pkt_data_size); - dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id); - l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr(l_net) ? - dap_chain_net_get_cur_addr(l_net)->uint64 : - dap_db_get_cur_node_addr(l_net->pub.name); - // Get last timestamp in log - l_sync_gdb.id_start = (uint64_t) dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); - // no limit - l_sync_gdb.id_end = (uint64_t)0; - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: { - dap_stream_ch_chain_sync_request_t l_sync_chains = {}; - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_chains, sizeof(l_sync_chains)); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR: - break; - default: { + } + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: { + log_it(L_INFO, "In: SYNC_GLOBAL_DB pkt"); + if(l_ch_chain->state != CHAIN_STATE_IDLE) { + log_it(L_INFO, "Can't process SYNC_GLOBAL_DB request because not in idle state"); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_STATE_NOT_IN_IDLE"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + break; + } + // receive the latest global_db revision of the remote node -> go to send mode + if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { + dap_stream_ch_chain_sync_request_t * l_request = + (dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data; + memcpy(&l_ch_chain->request, l_request, l_chain_pkt_data_size); + memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); + memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); + memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); + dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); + dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_gdb_callback, a_ch); + } + else { + log_it(L_ERROR, "Get DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB session_id=%u bad request", + a_ch->stream->session->id); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_SYNC_GLOBAL_DB_REQUEST_BAD"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + } + } + break; + // first packet of data with source node address + case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN: { + log_it(L_INFO, "In: FIRST_CHAIN data_size=%d", l_chain_pkt_data_size); + if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)) + memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN: { + log_it(L_INFO, "In: CHAIN pkt data_size=%d", l_chain_pkt_data_size); + dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); + if(l_chain) { + // Expect atom element in + if(l_chain_pkt_data_size > 0) { + memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); + memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); + memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); + l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size); + memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); + l_ch_chain->pkt_data_size = l_chain_pkt_data_size; + dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); + dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_chain_pkt_callback, a_ch); + } else { + log_it(L_WARNING, "Empty chain packet"); dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_UNKNOWN_CHAIN_PKT_TYPE"); - } + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_CHAIN_PACKET_EMPTY"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); } - if(l_ch_chain->callback_notify_packet_in) - l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt, - l_chain_pkt_data_size, l_ch_chain->callback_notify_arg); } } + break; + // first packet of data with source node address + case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB: { + log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d", l_chain_pkt_data_size); + if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)) + memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB: { + log_it(L_INFO, "In: GLOBAL_DB data_size=%d", l_chain_pkt_data_size); + // get transaction and save it to global_db + if(l_chain_pkt_data_size > 0) { + memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); + memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); + memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); + l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size); + memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); + l_ch_chain->pkt_data_size = l_chain_pkt_data_size; + dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); + dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_gdb_pkt_callback, a_ch); + } else { + log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size"); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_GLOBAL_DB_PACKET_EMPTY"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + } + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS: { + dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; + memcpy(&l_sync_gdb, l_chain_pkt->data, l_chain_pkt_data_size); + dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id); + l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr(l_net) ? + dap_chain_net_get_cur_addr(l_net)->uint64 : + dap_db_get_cur_node_addr(l_net->pub.name); + // Get last timestamp in log + l_sync_gdb.id_start = (uint64_t) dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); + // no limit + l_sync_gdb.id_end = (uint64_t)0; + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: { + dap_stream_ch_chain_sync_request_t l_sync_chains = {}; + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_chains, sizeof(l_sync_chains)); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR: + break; + default: { + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_UNKNOWN_CHAIN_PKT_TYPE"); + } + } + if(l_ch_chain->callback_notify_packet_in) + l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt, + l_chain_pkt_data_size, l_ch_chain->callback_notify_arg); } + /** * @brief dap_stream_ch_chain_go_idle * @param a_ch_chain @@ -607,6 +615,29 @@ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain) pthread_mutex_unlock(&a_ch_chain->mutex); } +bool s_process_gdb_iter(dap_stream_ch_t *a_ch) +{ + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); + dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs; + dap_store_obj_pkt_t *l_pkt = (dap_store_obj_pkt_t *)l_ch_chain->db_iter->data; + uint32_t l_pkt_size = sizeof(dap_store_obj_pkt_t) + l_pkt->data_size; + log_it(L_INFO, "Send one global_db record packet len=%d (rest=%d/%d items)", l_pkt_size, + dap_db_log_list_get_count_rest(l_db_list), dap_db_log_list_get_count(l_db_list)); + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, + l_ch_chain->request_net_id, l_ch_chain->request_chain_id, + l_ch_chain->request_cell_id, l_pkt, l_pkt_size); + dap_list_t *l_iter = dap_list_next(l_ch_chain->db_iter); + if (l_iter) { + l_ch_chain->db_iter = l_iter; + } else { + l_ch_chain->stats_request_gdb_processed++; + l_ch_chain->db_iter = dap_list_first(l_ch_chain->db_iter); + dap_list_free_full(l_ch_chain->db_iter, free); + l_ch_chain->db_iter = NULL; + } + return true; +} + bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) { UNUSED(a_thread); @@ -617,81 +648,56 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) // log_it( L_DEBUG,"l_ch_chain %X", l_ch_chain ); bool l_packet_out = false; switch (l_ch_chain->state) { + case CHAIN_STATE_IDLE: { dap_stream_ch_chain_go_idle(l_ch_chain); } break; - case CHAIN_STATE_SYNC_ALL: - case CHAIN_STATE_SYNC_GLOBAL_DB: { - - // Get log diff - dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs; - dap_global_db_obj_t *l_obj = dap_db_log_list_get(l_db_list); - - bool l_is_stop = true; - while(l_obj) { - size_t l_item_size_out = 0; - dap_list_t *l_item = dap_db_log_pack(l_obj, &l_item_size_out); - // Item not found, maybe it has deleted? Then go to the next item - if(!l_item || !l_item_size_out) { - l_item_size_out = 0; - l_obj = dap_db_log_list_get(l_db_list); - } - else { - /*size_t l_items_total = dap_db_log_list_get_count(l_db_list); - size_t l_items_rest = dap_db_log_list_get_count_rest(l_db_list); - log_it(L_INFO, "Send one global_db record data=0x%x len=%d (rest=%d/%d items)", l_item, l_item_size_out, - l_items_rest, l_items_total);*/ - for (dap_list_t *l_iter = l_item; l_iter; l_iter = dap_list_next(l_iter)) { - dap_store_obj_pkt_t *l_pkt = (dap_store_obj_pkt_t *)l_iter->data; - uint32_t l_pkt_size = sizeof(dap_store_obj_pkt_t) + l_pkt->data_size; - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, l_pkt, l_pkt_size); + if (l_ch_chain->db_iter) { + l_packet_out = s_process_gdb_iter(l_ch); + } else { + dap_global_db_obj_t *l_obj; + do { // Get log diff + size_t l_item_size_out = 0; + l_obj = dap_db_log_list_get(l_ch_chain->request_global_db_trs); + l_ch_chain->db_iter = dap_db_log_pack(l_obj, &l_item_size_out); + if (l_ch_chain->db_iter && l_item_size_out) { + break; } + // Item not found, maybe it has deleted? Then go to the next item + } while (l_obj); + if (l_ch_chain->db_iter) { + l_packet_out = s_process_gdb_iter(l_ch); + } else { + //log_it(L_DEBUG, "l_obj == 0, STOP"); + // free log list + dap_db_log_list_delete(l_ch_chain->request_global_db_trs); + l_ch_chain->request_global_db_trs = NULL; + // last message + dap_stream_ch_chain_sync_request_t l_request = {}; + dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id); + l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; + l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); + l_request.id_end = 0; + + log_it( L_DEBUG,"Syncronized database: last id %llu, items syncronyzed %llu ", l_request.id_start, + l_ch_chain->stats_request_gdb_processed ); + + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + l_ch_chain->request_net_id, l_ch_chain->request_chain_id, + l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); l_packet_out = true; - l_ch_chain->stats_request_gdb_processed++; - dap_list_free_full(l_item, free); - // sent the record, another will be sent - l_is_stop = false; - break; - } - } - if(l_is_stop){ - //log_it(L_DEBUG, "l_obj == 0, STOP"); - // free log list - l_ch_chain->request_global_db_trs = NULL; - dap_db_log_list_delete(l_db_list); - // last message - dap_stream_ch_chain_sync_request_t l_request = {}; - dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id); - l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; - l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); - l_request.id_end = 0; - - log_it( L_DEBUG,"Syncronized database: last id %llu, items syncronyzed %llu ", l_request.id_start, - l_ch_chain->stats_request_gdb_processed ); - - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); - l_packet_out = true; - - if(l_ch_chain->callback_notify_packet_out) - l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - NULL, 0, l_ch_chain->callback_notify_arg); - - if(l_ch_chain->state != CHAIN_STATE_SYNC_ALL) + if(l_ch_chain->callback_notify_packet_out) + l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + NULL, 0, l_ch_chain->callback_notify_arg); dap_stream_ch_chain_go_idle(l_ch_chain); + } } + } break; - } - if(l_ch_chain->state != CHAIN_STATE_SYNC_ALL) - break; - - // Synchronize chains + // Synchronize chains case CHAIN_STATE_SYNC_CHAINS: { //log_it(L_DEBUG, "CHAIN_STATE_SYNC_CHAINS"); dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); @@ -730,6 +736,7 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) // Then flush it out to the remote size_t l_atom_size = l_atom_item->atom_size; + log_it(L_INFO, "Send one chain packet len=%d", l_atom_size); dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, l_atom_item->atom, l_atom_size); @@ -766,8 +773,8 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) } pthread_mutex_unlock(&l_ch_chain->mutex); } - } - break; + } break; + default: break; } if (l_packet_out) { dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); @@ -784,6 +791,10 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) { (void) a_arg; + if (a_ch->stream->esocket->buf_out_size > DAP_EVENTS_SOCKET_BUF / 2) { + return; + } + dap_stream_ch_set_ready_to_write_unsafe(a_ch, false); dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); if (l_ch_chain && l_ch_chain->state != CHAIN_STATE_IDLE) { dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index 2d1ef9751..9e576b765 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -49,7 +49,8 @@ typedef struct dap_stream_ch_chain { pthread_mutex_t mutex; dap_stream_ch_t * ch; - dap_db_log_list_t *request_global_db_trs; // list of transactions + dap_db_log_list_t *request_global_db_trs; // list of global db records + dap_list_t *db_iter; dap_stream_ch_chain_state_t state; dap_chain_atom_iter_t * request_atom_iter; diff --git a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h index 5091c1553..a56e07c49 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h +++ b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h @@ -57,7 +57,7 @@ typedef enum dap_stream_ch_chain_state{ CHAIN_STATE_IDLE=0, CHAIN_STATE_SYNC_CHAINS, CHAIN_STATE_SYNC_GLOBAL_DB, - CHAIN_STATE_SYNC_ALL, + CHAIN_STATE_SYNC_ALL } dap_stream_ch_chain_state_t; typedef struct dap_stream_ch_chain_sync_request{ diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index bbca88442..2e4296197 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -496,7 +496,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net) } // wait for finishing of request - int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms + int timeout_ms = 30000; // 5 min = 300 sec = 300 000 ms // TODO add progress info to console l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); switch (l_res) { @@ -556,7 +556,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net) dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_net->pub.id, l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request)); // wait for finishing of request - int timeout_ms = 120000; // 2 min = 120 sec = 120 000 ms + int timeout_ms = 20000; // 2 min = 120 sec = 120 000 ms // TODO add progress info to console if (dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id())) { l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index ae7385b02..4edf2abcb 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -1223,6 +1223,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) assert(l_tun); // Unsafely send it dap_events_socket_write_unsafe( l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size); + dap_events_socket_set_writable_unsafe(l_tun->es, true); } break; // for servier only @@ -1231,6 +1232,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) assert(l_tun); // Unsafely send it size_t l_ret = dap_events_socket_write_unsafe( l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size); + dap_events_socket_set_writable_unsafe(l_tun->es, true); if( l_ret) s_update_limits (a_ch, l_srv_session, l_usage,l_ret ); } break; -- GitLab