diff --git a/dap-sdk/crypto/include/dap_hash.h b/dap-sdk/crypto/include/dap_hash.h index fc557c3e510949ce15811ffe84ca9ab393e6cf1e..8c6c1aff9b26859270b2d411a6cd0a484fddb552 100755 --- a/dap-sdk/crypto/include/dap_hash.h +++ b/dap-sdk/crypto/include/dap_hash.h @@ -46,7 +46,7 @@ typedef enum dap_hash_type { typedef union dap_chain_hash_fast{ uint8_t raw[DAP_CHAIN_HASH_FAST_SIZE]; -} dap_chain_hash_fast_t; +} DAP_ALIGN_PACKED dap_chain_hash_fast_t; typedef dap_chain_hash_fast_t dap_hash_fast_t; #ifdef __cplusplus diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 3f005a9d31632944a8438e03230e89b32ff5d42f..b8f5ab9997a637e62bc1c3df9852063ae3c9036e 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -1348,14 +1348,14 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *a_esocket, bool a_esocket->flags |= DAP_SOCK_READY_TO_WRITE; #ifdef DAP_OS_WINDOWS if (a_esocket->type == DESCRIPTOR_TYPE_QUEUE) - a_esocket->flags |= EPOLLONESHOT; + a_esocket->ev_base_flags |= EPOLLONESHOT; #endif } else { a_esocket->flags ^= DAP_SOCK_READY_TO_WRITE; #ifdef DAP_OS_WINDOWS if (a_esocket->type == DESCRIPTOR_TYPE_QUEUE) - a_esocket->flags ^= EPOLLONESHOT; + a_esocket->ev_base_flags ^= EPOLLONESHOT; #endif } if( a_esocket->worker ) diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index fce5c3f648450205105b7d8407b8f02a0e754053..ea5b4f9f23c421bb32c5d1e19624f2dd5eb2489e 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -114,6 +114,7 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_thread, void *a_arg); static bool s_debug_more=false; +static uint_fast16_t s_update_pack_size=100; // Number of hashes packed into the one packet /** * @brief dap_stream_ch_chain_init * @return @@ -124,6 +125,7 @@ int dap_stream_ch_chain_init() dap_stream_ch_proc_add(dap_stream_ch_chain_get_id(), s_stream_ch_new, s_stream_ch_delete, s_stream_ch_packet_in, s_stream_ch_packet_out); s_debug_more = dap_config_get_item_bool_default(g_config,"stream_ch_chain","debug_more",false); + s_update_pack_size = dap_config_get_item_int16_default(g_config,"stream_ch_chain","update_pack_size",100); return 0; } @@ -174,6 +176,32 @@ static void s_sync_request_delete(struct sync_request * a_sync_request) DAP_DELETE(a_sync_request); } +/** + * @brief s_stream_ch_delete_in_proc + * @param a_thread + * @param a_arg + * @return + */ +static bool s_stream_ch_delete_in_proc(dap_proc_thread_t * a_thread, void * a_arg) +{ + (void) a_thread; + dap_stream_ch_chain_t * l_ch_chain=(dap_stream_ch_chain_t*) a_arg; + dap_stream_ch_chain_hash_item_t * l_item, *l_tmp; + + // Clear remote atoms + HASH_ITER(hh, l_ch_chain->remote_atoms, l_item, l_tmp){ + HASH_DEL(l_ch_chain->remote_atoms, l_item); + DAP_DELETE(l_item); + } + // Clear remote gdbs + HASH_ITER(hh, l_ch_chain->remote_gdbs, l_item, l_tmp){ + HASH_DEL(l_ch_chain->remote_gdbs, l_item); + DAP_DELETE(l_item); + } + DAP_DELETE(l_ch_chain); + return true; +} + /** * @brief s_stream_ch_delete * @param ch @@ -182,9 +210,8 @@ static void s_sync_request_delete(struct sync_request * a_sync_request) static void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg) { (void) a_arg; - (void) a_ch; - //dap_stream_ch_chain_t * l_ch_chain=DAP_STREAM_CH_CHAIN(a_ch); - + dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input,s_stream_ch_delete_in_proc,a_ch->internal ); + a_ch->internal = NULL; // To prevent its cleaning in worker } /** @@ -216,8 +243,8 @@ static void s_sync_out_chains_first_worker_callback(dap_worker_t *a_worker, void log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN"); dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN, - l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, - l_ch_chain->request_hdr.cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); + l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, &l_node_addr, sizeof(dap_chain_node_addr_t)); DAP_DELETE(l_sync_request); } @@ -245,8 +272,8 @@ static void s_sync_out_chains_last_worker_callback(dap_worker_t *a_worker, void if (s_debug_more ) log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS"); dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, - l_sync_request->request_hdr.net_id, l_sync_request->request_hdr.chain_id, - l_sync_request->request_hdr.cell_id, &l_request, sizeof(l_request)); + l_sync_request->request_hdr.net_id.uint64, l_sync_request->request_hdr.chain_id.uint64, + l_sync_request->request_hdr.cell_id.uint64, &l_request, sizeof(l_request)); if (l_ch_chain->request_atom_iter) DAP_DEL_Z(l_ch_chain->request_atom_iter); @@ -313,8 +340,8 @@ static void s_sync_out_gdb_first_gdb_worker_callback(dap_worker_t *a_worker, voi if (s_debug_more ) log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB"); dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch , DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, - l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, - l_ch_chain->request_hdr.cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); + l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, &l_node_addr, sizeof(dap_chain_node_addr_t)); if(l_ch_chain->callback_notify_packet_out) l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, NULL, 0, l_ch_chain->callback_notify_arg); @@ -340,8 +367,8 @@ static void s_sync_out_gdb_synced_data_worker_callback(dap_worker_t *a_worker, v if (s_debug_more ) log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB"); dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, - l_ch_chain->request_hdr.cell_id, &l_request, sizeof(l_request)); + l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request)); l_ch_chain->state = CHAIN_STATE_IDLE; if(l_ch_chain->callback_notify_packet_out) l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, @@ -524,8 +551,8 @@ static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_worker, void *a_a UNUSED(a_worker); struct ch_chain_pkt_in * l_pkt_in = (struct ch_chain_pkt_in*) a_arg; - dap_stream_ch_chain_pkt_write_error_unsafe(l_pkt_in->ch, l_pkt_in->pkt->pkt_hdr.net_id, - l_pkt_in->pkt->pkt_hdr.chain_id, l_pkt_in->pkt->pkt_hdr.cell_id, + dap_stream_ch_chain_pkt_write_error_unsafe(l_pkt_in->ch, l_pkt_in->pkt->pkt_hdr.net_id.uint64, + l_pkt_in->pkt->pkt_hdr.chain_id.uint64, l_pkt_in->pkt->pkt_hdr.cell_id.uint64, "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED"); dap_stream_ch_set_ready_to_write_unsafe(l_pkt_in->ch, true); DAP_DELETE(l_pkt_in); @@ -678,42 +705,56 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if (!l_chain_pkt) { return; } - size_t l_chain_pkt_data_size = l_ch_pkt->hdr.size - sizeof(l_chain_pkt->hdr); - uint16_t l_acl_idx = dap_chain_net_acl_idx_by_id(l_chain_pkt->hdr.net_id); + if (l_ch_pkt->hdr.size< sizeof (l_chain_pkt->hdr) ){ + log_it(L_ERROR, "Corrupted packet: too small size %zd, smaller then header size %zd", l_ch_pkt->hdr.size, + sizeof(l_chain_pkt->hdr)); + + } + + size_t l_chain_pkt_data_size = l_ch_pkt->hdr.size-sizeof (l_chain_pkt->hdr) ; + uint16_t l_acl_idx = dap_chain_net_acl_idx_by_id(l_chain_pkt->hdr.net_id ); if (l_acl_idx == (uint16_t)-1) { - log_it(L_ERROR, "Invalid net id 0x%016x in packet", l_chain_pkt->hdr.net_id); if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR) { if(l_ch_chain->callback_notify_packet_in) { l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt, l_chain_pkt_data_size, l_ch_chain->callback_notify_arg); } } else { - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + + log_it(L_ERROR, "Invalid request from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x" chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", a_ch->stream->esocket->remote_addr_str? + a_ch->stream->esocket->remote_addr_str: "<unknown>", l_chain_pkt->hdr.ext_id, + l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, + l_chain_pkt->hdr.cell_id.uint64); + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_NET_INVALID_ID"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + // Who are you? I don't know you! go away! + a_ch->stream->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; } return; } if (a_ch->stream->session->acl && !a_ch->stream->session->acl[l_acl_idx]) { - log_it(L_WARNING, "Unauthorized request attempt to network %s", + log_it(L_WARNING, "Unauthorized request attempt from %s to network %s", a_ch->stream->esocket->remote_addr_str? + a_ch->stream->esocket->remote_addr_str: "<unknown>", dap_chain_net_by_id(l_chain_pkt->hdr.net_id)->pub.name); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_NET_NOT_AUTHORIZED"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); return; } switch (l_ch_pkt->hdr.type) { /// --- GDB update --- // Request for gdbs list update case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ:{ - l_ch_chain->state = CHAIN_STATE_UPDATE_GLOBAL_DB; - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); }break; // Response with metadata organized in TSD case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD:{ + }break; + // If requested - begin to send atom hashes + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_START:{ + l_ch_chain->state = CHAIN_STATE_UPDATE_GLOBAL_DB_REMOTE; + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); }break; // Response with gdb element hashes and sizes case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB:{ @@ -737,19 +778,19 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) }break; // End of response case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END:{ - l_ch_chain->state = CHAIN_STATE_UPDATE_CHAINS ; // Switch on update chains hashes + l_ch_chain->state = CHAIN_STATE_UPDATE_CHAINS_REMOTE ; // Switch on update chains hashes dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ , - l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, - l_ch_chain->request_hdr.cell_id, NULL, 0); + l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, NULL, 0); }break; /// --- Chains update --- // Request for atoms list update case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ:{ if(l_ch_chain->is_on_request){ - log_it(L_WARNING, "Can't process SYNC_CHAINS request because its already busy with syncronization"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + log_it(L_WARNING, "Can't process UPDATE_CHAINS_REQ request because its already busy with syncronization"); + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); break; @@ -763,38 +804,56 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64); dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if(l_chain) { - if(l_ch_chain->state != CHAIN_STATE_IDLE) { - l_ch_chain->is_on_request=false; - log_it(L_INFO, "Can't process SYNC_CHAINS request because not in idle state"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_STATE_NOT_IN_IDLE"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } else { - struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request); - l_sync_request->ch = a_ch; - l_sync_request->worker = a_ch->stream_worker->worker; - l_sync_request->remote_atoms = l_ch_chain->remote_atoms; - l_ch_chain->remote_atoms = NULL; + struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request); + l_sync_request->ch = a_ch; + l_sync_request->worker = a_ch->stream_worker->worker; + l_sync_request->remote_atoms = l_ch_chain->remote_atoms; + l_ch_chain->remote_atoms = NULL; - memcpy(&l_sync_request->request, &l_ch_chain->request, sizeof (l_ch_chain->request)); - memcpy(&l_sync_request->request_hdr, &l_ch_chain->request_hdr, sizeof (l_ch_chain->request_hdr)); - dap_proc_queue_add_callback_inter( a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_sync_request ); - } + memcpy(&l_sync_request->request, &l_ch_chain->request, sizeof (l_ch_chain->request)); + memcpy(&l_sync_request->request_hdr, &l_ch_chain->request_hdr, sizeof (l_ch_chain->request_hdr)); + dap_proc_queue_add_callback_inter( a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_sync_request ); } + }break; + // Response with metadata organized in TSD + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD :{ + }break; - l_ch_chain->state = CHAIN_STATE_UPDATE_CHAINS; - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + // If requested - begin to send atom hashes + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END: + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START:{ + if(s_debug_more) + log_it(L_INFO,"In: Requested update chains start"); + memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr , sizeof (l_ch_chain->request_hdr)); + if(l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END ) + l_ch_chain->request_updates_complete = true; + l_ch_chain->state = CHAIN_STATE_UPDATE_CHAINS_REMOTE; - }break; - // Response with metadata organized in TSD - case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD :{ + dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id); + if (l_ch_chain){ + l_ch_chain->request_atom_iter = l_chain->callback_atom_iter_create(l_chain); + l_chain->callback_atom_iter_get_first(l_ch_chain->request_atom_iter, NULL); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + }else{ + log_it(L_ERROR, "Invalid UPDATE_CHAINS_START request from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x" chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", a_ch->stream->esocket->remote_addr_str? + a_ch->stream->esocket->remote_addr_str: "<unknown>", l_chain_pkt->hdr.ext_id, + l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64, + l_chain_pkt->hdr.cell_id.uint64); + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + "ERROR_NET_INVALID_ID"); + // Who are you? I don't know you! go away! + a_ch->stream->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE; + } }break; + // Response with atom hashes and sizes case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS :{ + uint l_count_added=0; + uint l_count_total=0; for ( dap_stream_ch_chain_update_element_t * l_element =(dap_stream_ch_chain_update_element_t *) l_chain_pkt->data; (size_t) (((byte_t*)l_element) - l_chain_pkt->data ) < l_chain_pkt_data_size; l_element++){ @@ -805,21 +864,25 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) memcpy(&l_hash_item->hash, &l_element->hash, sizeof (l_element->hash)); l_hash_item->size = l_element->size; HASH_ADD(hh, l_ch_chain->remote_atoms, hash, sizeof (l_hash_item->hash), l_hash_item); + l_count_added++; + /* if (s_debug_more){ char l_hash_str[72]={ [0]='\0'}; dap_chain_hash_fast_to_str(&l_hash_item->hash,l_hash_str,sizeof (l_hash_str)); - log_it(L_INFO,"In: Updated remote atom hash list with %s ", l_hash_str); - } + log_it(L_DEBUG,"In: Updated remote atom hash list with %s ", l_hash_str); + }*/ } + l_count_total++; } + if (s_debug_more) + log_it(L_INFO,"In: Added %u from %u remote atom hash in list",l_count_added,l_count_total); + }break; // End of response - case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END:{ - l_ch_chain->state = CHAIN_STATE_UPDATE_CHAINS ; // Switch on update chains hashes - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ , - l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, - l_ch_chain->request_hdr.cell_id, NULL, 0); - }break; + //case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END:{ + // l_ch_chain->state = CHAIN_STATE_UPDATE_CHAINS_REMOTE; // Switch on update chains hashes to remote + // dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + //}break; // first packet of data with source node address case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN: { @@ -831,8 +894,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64); }else{ log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN", l_chain_pkt_data_size); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_CHAIN_PACKET_TYPE_FIRST_CHAIN_INCORRET_DATA_SIZE(%zd/%zd)",l_chain_pkt_data_size, sizeof(dap_chain_node_addr_t)); } } @@ -862,6 +925,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { + l_ch_chain->request_updates_complete = false; if (dap_log_level_get()<= L_INFO){ char *l_hash_from_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_from); char *l_hash_to_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_to); @@ -881,8 +945,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { if(l_ch_chain->is_on_request){ log_it(L_WARNING, "Can't process SYNC_CHAINS request because its already busy with syncronization"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); break; @@ -902,8 +966,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) log_it(L_INFO, "Can't process SYNC_CHAINS request between %s and %s because not in idle state", l_hash_from_str? l_hash_from_str:"(null)", l_hash_to_str?l_hash_to_str:"(null)"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_STATE_NOT_IN_IDLE"); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); } else { @@ -918,9 +982,10 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) DAP_DELETE(l_hash_from_str); DAP_DELETE(l_hash_to_str); }else{ - log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: Wrong chain packet size %zd when expected %zd", + l_chain_pkt_data_size, sizeof(l_ch_chain->request)); + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_CHAIN_PKT_DATA_SIZE" ); } } @@ -929,16 +994,16 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { if(l_ch_chain->state != CHAIN_STATE_IDLE) { log_it(L_WARNING, "Can't process SYNC_GLOBAL_DB request because not in idle state"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_STATE_NOT_IN_IDLE"); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); break; } else { // receive the latest global_db revision of the remote node -> go to send mode if(l_ch_chain->is_on_request){ log_it(L_WARNING, "Can't process SYNC_CHAINS request because its already busy with syncronization"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); break; @@ -961,8 +1026,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } }else{ log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_CHAIN_PKT_DATA_SIZE" ); } } @@ -996,8 +1061,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_in_chains_callback, l_sync_request); } else { log_it(L_WARNING, "Empty chain packet"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_CHAIN_PACKET_EMPTY"); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); } @@ -1014,8 +1079,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr) ); }else { log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB", l_chain_pkt_data_size); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_CHAIN_PACKET_TYPE_FIRST_GLOBAL_DB_INCORRET_DATA_SIZE"); } break; @@ -1039,8 +1104,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_gdb_in_pkt_proc_callback, l_sync_request); } else { log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size"); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_GLOBAL_DB_PACKET_EMPTY"); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); } @@ -1054,8 +1119,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); log_it(L_INFO, "In: SYNC_GLOBAL_DB_RVRS pkt: net 0x%016x chain 0x%016x cell 0x%016x, request gdb sync from %u", l_chain_pkt->hdr.net_id.uint64 , l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id, l_sync_gdb.id_start ); - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_sync_gdb, sizeof(l_sync_gdb)); } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: { @@ -1071,22 +1136,28 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_chain_pkt->hdr.net_id.uint64 , l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, l_hash_from_str[0] ? l_hash_from_str :"(null)"); } - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_request, sizeof(l_request)); + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, &l_request, sizeof(l_request)); } }else{ log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_CHAIN_PKT_DATA_SIZE" ); } } break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR: - break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR:{ + char * l_error_str = (char*)l_chain_pkt->data; + if(l_chain_pkt_data_size>1) + l_error_str[l_chain_pkt_data_size-1]='\0'; // To be sure that nobody sends us garbage + // without trailing zero + log_it(L_WARNING,"In: ERROR packet: '%s'",l_ch_chain->node_client, l_chain_pkt_data_size>1? + l_error_str:"<empty>"); + }break; default: { - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64, + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, "ERROR_UNKNOWN_CHAIN_PKT_TYPE"); } } @@ -1121,8 +1192,8 @@ static void s_process_gdb_iter(dap_stream_ch_t *a_ch) log_it(L_INFO, "Send one global_db record packet len=%d (rest=%d/%d items)", l_pkt_size, dap_db_log_list_get_count_rest(l_db_list), dap_db_log_list_get_count(l_db_list)); dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, - l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, - l_ch_chain->request_hdr.cell_id, l_pkt, l_pkt_size); + l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, l_pkt, l_pkt_size); dap_list_t *l_iter = dap_list_next(l_ch_chain->request_db_iter); if (l_iter) { l_ch_chain->request_db_iter = l_iter; @@ -1173,8 +1244,8 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->is_on_request = false; dap_stream_ch_chain_sync_request_t l_request = {}; dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, - l_ch_chain->request_hdr.cell_id, &l_request, sizeof(l_request)); + l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request)); dap_stream_ch_chain_go_idle(l_ch_chain); if (l_ch_chain->callback_notify_packet_out) l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, @@ -1183,6 +1254,49 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) } } break; + // Update list of atoms to remote + case CHAIN_STATE_UPDATE_CHAINS_REMOTE:{ + dap_stream_ch_chain_update_element_t * l_data= DAP_NEW_Z_SIZE(dap_stream_ch_chain_update_element_t,sizeof (dap_stream_ch_chain_update_element_t)*s_update_pack_size); + size_t l_data_size=0; + for(uint_fast16_t n=0; n<s_update_pack_size && (l_ch_chain->request_atom_iter && l_ch_chain->request_atom_iter->cur);n++){ + // If present smth - hash it + dap_hash_fast(l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size,&l_data[n].hash); + l_data[n].size=l_ch_chain->request_atom_iter->cur_size; + // Shift offset counter + l_data_size += sizeof (dap_stream_ch_chain_update_element_t); + // Then get next atom + l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL); + } + if (l_data_size){ + if(s_debug_more) + log_it(L_DEBUG,"Out: UPDATE_CHAINS size %zd sent ",l_data_size); + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS, + l_ch_chain->request_hdr.net_id.uint64, + l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, + l_data,l_data_size); + } + if(!l_data_size || !l_ch_chain->request_atom_iter){ // We over with all the hashes here + if(s_debug_more) + log_it(L_INFO,"Out: UPDATE_CHAINS_END sent "); + + if (l_ch_chain->request_updates_complete){ + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ , + l_ch_chain->request_hdr.net_id.uint64, + l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, + NULL,0); + }else + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END, + l_ch_chain->request_hdr.net_id.uint64, + l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, + NULL,0); + dap_stream_ch_set_ready_to_write_unsafe(a_ch,false); + l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS; + } + }break; + // Synchronize chains case CHAIN_STATE_SYNC_CHAINS: { @@ -1202,18 +1316,22 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_request_atom_hash_str); } }else{ + l_hash_item = DAP_NEW_Z(dap_stream_ch_chain_hash_item_t); if(s_debug_more){ - dap_chain_hash_fast_t l_atom_hash={0}; - dap_hash_fast(l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size,&l_atom_hash); - char *l_atom_hash_str= dap_chain_hash_fast_to_str_new(&l_atom_hash); + dap_hash_fast(l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size,&l_hash_item->hash); + char *l_atom_hash_str= dap_chain_hash_fast_to_str_new(&l_hash_item->hash); log_it(L_INFO, "Out CHAIN pkt: atom hash %s (size %zd) ", l_atom_hash_str, l_ch_chain->request_atom_iter->cur_size); DAP_DELETE(l_atom_hash_str); } - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_hdr.net_id, - l_ch_chain->request_hdr.chain_id, l_ch_chain->request_hdr.cell_id, + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_hdr.net_id.uint64, + l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size); l_ch_chain->stats_request_atoms_processed++; + + l_hash_item->size = l_ch_chain->request_atom_iter->cur_size; + // Because we sent this atom to remote - we record it to not to send it twice + HASH_ADD(hh, l_ch_chain->remote_atoms, hash, sizeof (l_hash_item->hash), l_hash_item); } // Then get next atom and populate new last l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL); @@ -1221,8 +1339,8 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_chain_sync_request_t l_request = {0}; // last message dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, - l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, - l_ch_chain->request_hdr.cell_id, &l_request, sizeof(l_request)); + l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request)); log_it( L_INFO,"Synced: %llu atoms processed", l_ch_chain->stats_request_atoms_processed); l_ch_chain->is_on_request = false; dap_stream_ch_chain_go_idle(l_ch_chain); diff --git a/modules/channel/chain/dap_stream_ch_chain_pkt.c b/modules/channel/chain/dap_stream_ch_chain_pkt.c index d547e587d1201f379a884b4a2b55b5cf59c0042c..c97d1f1e663bd9b2fc4835bcb88d613dbd4dd766 100644 --- a/modules/channel/chain/dap_stream_ch_chain_pkt.c +++ b/modules/channel/chain/dap_stream_ch_chain_pkt.c @@ -49,17 +49,17 @@ dap_stream_ch_chain_state_t dap_stream_ch_chain_pkt_type_to_dap_stream_ch_chain_ * @param data_size * @return */ -size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id, - dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, +size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type,uint64_t a_net_id, + uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size) { dap_stream_ch_chain_pkt_t * l_chain_pkt; size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr) + a_data_size; l_chain_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size ); l_chain_pkt->hdr.version = 1; - l_chain_pkt->hdr.net_id.uint64 = a_net_id.uint64; - l_chain_pkt->hdr.cell_id.uint64 = a_cell_id.uint64; - l_chain_pkt->hdr.chain_id.uint64 = a_chain_id.uint64; + l_chain_pkt->hdr.net_id.uint64 = a_net_id; + l_chain_pkt->hdr.cell_id.uint64 = a_cell_id; + l_chain_pkt->hdr.chain_id.uint64 = a_chain_id; if (a_data_size && a_data) memcpy( l_chain_pkt->data, a_data, a_data_size); @@ -70,17 +70,17 @@ size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_typ } -size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id, - dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, +size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_stream_ch_t *a_ch, uint8_t a_type,uint64_t a_net_id, + uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size) { dap_stream_ch_chain_pkt_t * l_chain_pkt; size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr) + a_data_size; l_chain_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size ); l_chain_pkt->hdr.version = 1; - l_chain_pkt->hdr.net_id.uint64 = a_net_id.uint64; - l_chain_pkt->hdr.cell_id.uint64 = a_cell_id.uint64; - l_chain_pkt->hdr.chain_id.uint64 = a_chain_id.uint64; + l_chain_pkt->hdr.net_id.uint64 = a_net_id; + l_chain_pkt->hdr.cell_id.uint64 = a_cell_id; + l_chain_pkt->hdr.chain_id.uint64 = a_chain_id; if (a_data_size && a_data) memcpy( l_chain_pkt->data, a_data, a_data_size); @@ -103,17 +103,18 @@ size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_strea * @param a_data_size * @return */ -size_t dap_stream_ch_chain_pkt_write_inter(dap_proc_thread_t * a_thread, dap_stream_worker_t *a_worker, dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id, - dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, +size_t dap_stream_ch_chain_pkt_write_inter(dap_proc_thread_t * a_thread, dap_stream_worker_t *a_worker, dap_stream_ch_t *a_ch, + uint8_t a_type,uint64_t a_net_id, + uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size) { dap_stream_ch_chain_pkt_t * l_chain_pkt; size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr) + a_data_size; l_chain_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size ); l_chain_pkt->hdr.version = 1; - l_chain_pkt->hdr.net_id.uint64 = a_net_id.uint64; - l_chain_pkt->hdr.cell_id.uint64 = a_cell_id.uint64; - l_chain_pkt->hdr.chain_id.uint64 = a_chain_id.uint64; + l_chain_pkt->hdr.net_id.uint64 = a_net_id; + l_chain_pkt->hdr.cell_id.uint64 = a_cell_id; + l_chain_pkt->hdr.chain_id.uint64 = a_chain_id; if (a_data_size && a_data) memcpy( l_chain_pkt->data, a_data, a_data_size); diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index e6b333c3bc152cf747ece0e524e0ab194f559e5f..805ab0ace2ff4ad401357bce0813dfd5f6842c77 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -79,6 +79,8 @@ typedef struct dap_stream_ch_chain { dap_stream_ch_chain_pkt_hdr_t request_hdr; dap_list_t *request_db_iter; + bool request_updates_complete; + atomic_bool is_on_request; // Protects request section dap_stream_ch_chain_callback_packet_t callback_notify_packet_out; diff --git a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h index 388400ae1a6fdcde67f45b16036838d013a0e01f..9c8e352409c16ff090bb168eff77ad9d19c5ebd9 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h +++ b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h @@ -59,13 +59,15 @@ #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ 0x05 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD 0x15 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS 0x25 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END 0x35 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START 0x25 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS 0x35 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END 0x45 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ 0x06 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD 0x16 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB 0x26 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END 0x36 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_START 0x26 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB 0x36 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END 0x46 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR 0xff @@ -77,11 +79,13 @@ typedef enum dap_stream_ch_chain_state{ CHAIN_STATE_IDLE=0, - CHAIN_STATE_UPDATE_GLOBAL_DB=1, // Update GDB hashtable - CHAIN_STATE_UPDATE_CHAINS=2, // Update chains hashtable - CHAIN_STATE_SYNC_CHAINS=3, - CHAIN_STATE_SYNC_GLOBAL_DB=4, - CHAIN_STATE_SYNC_ALL=5 + CHAIN_STATE_UPDATE_GLOBAL_DB_REMOTE, // Downloadn GDB hashtable from remote + CHAIN_STATE_UPDATE_GLOBAL_DB, // Update GDB hashtable to remote + CHAIN_STATE_SYNC_GLOBAL_DB, + CHAIN_STATE_UPDATE_CHAINS_REMOTE, // Update chains hashtable from remote + CHAIN_STATE_UPDATE_CHAINS, // Update chains hashtable to remote + CHAIN_STATE_SYNC_CHAINS, + CHAIN_STATE_SYNC_ALL } dap_stream_ch_chain_state_t; @@ -100,8 +104,13 @@ typedef struct dap_stream_ch_chain_sync_request{ typedef struct dap_stream_ch_chain_pkt_hdr{ - uint8_t version; - uint8_t padding[7]; + union{ + struct{ + uint8_t version; + uint8_t padding[7]; + } DAP_ALIGN_PACKED; + uint64_t ext_id; + }DAP_ALIGN_PACKED; dap_chain_net_id_t net_id; dap_chain_id_t chain_id; dap_chain_cell_id_t cell_id; @@ -127,16 +136,16 @@ static const char* c_dap_stream_ch_chain_pkt_type_str[]={ dap_stream_ch_chain_state_t dap_stream_ch_chain_pkt_type_to_dap_stream_ch_chain_state(char a_state); -size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type, dap_chain_net_id_t a_net_id, - dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, +size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type, uint64_t a_net_id, + uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size); -size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_stream_ch_t *a_ch, uint8_t a_type, dap_chain_net_id_t a_net_id, - dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, +size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_stream_ch_t *a_ch, uint8_t a_type, uint64_t a_net_id, + uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size); -size_t dap_stream_ch_chain_pkt_write_inter(dap_proc_thread_t * a_thread, dap_stream_worker_t *a_worker, dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id, - dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, +size_t dap_stream_ch_chain_pkt_write_inter(dap_proc_thread_t * a_thread, dap_stream_worker_t *a_worker, dap_stream_ch_t *a_ch, uint8_t a_type,uint64_t a_net_id, + uint64_t a_chain_id, uint64_t a_cell_id, const void * a_data, size_t a_data_size); /** @@ -148,8 +157,8 @@ size_t dap_stream_ch_chain_pkt_write_inter(dap_proc_thread_t * a_thread, dap_str * @param a_err_string_format * @return */ -inline static size_t dap_stream_ch_chain_pkt_write_error_unsafe(dap_stream_ch_t *a_ch, dap_chain_net_id_t a_net_id, - dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, const char * a_err_string_format,... ) +inline static size_t dap_stream_ch_chain_pkt_write_error_unsafe(dap_stream_ch_t *a_ch, uint64_t a_net_id, + uint64_t a_chain_id, uint64_t a_cell_id, const char * a_err_string_format,... ) { va_list l_va; char * l_str; @@ -178,8 +187,8 @@ inline static size_t dap_stream_ch_chain_pkt_write_error_unsafe(dap_stream_ch_t * @param a_err_string_format * @return */ -inline static size_t dap_stream_ch_chain_pkt_write_error_inter(dap_proc_thread_t * a_thread, dap_stream_worker_t * a_stream_worker, dap_stream_ch_t *a_ch, dap_chain_net_id_t a_net_id, - dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, const char * a_err_string_format,... ) +inline static size_t dap_stream_ch_chain_pkt_write_error_inter(dap_proc_thread_t * a_thread, dap_stream_worker_t * a_stream_worker, dap_stream_ch_t *a_ch, uint64_t a_net_id, + uint64_t a_chain_id, uint64_t a_cell_id, const char * a_err_string_format,... ) { va_list l_va; char * l_str; diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 63ed017fc025cc240c261f56242d22bdf0e9cc52..bc448ccc3bc7639f3231f34ed84f37b50b658fb4 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -332,8 +332,8 @@ void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const c if (!l_ch_chain) { continue; } - dap_stream_ch_chain_pkt_write_mt( dap_client_get_stream_worker(l_node_client->client), l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_net->pub.id, - l_chain_id, l_net->pub.cell_id, l_data_out, + dap_stream_ch_chain_pkt_write_mt( dap_client_get_stream_worker(l_node_client->client), l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_net->pub.id.uint64, + l_chain_id.uint64, l_net->pub.cell_id.uint64, l_data_out, sizeof(dap_store_obj_pkt_t) + l_data_out->data_size); } dap_list_free_full(l_list_out, free); @@ -384,7 +384,7 @@ static void s_chain_callback_notify(void * a_arg, dap_chain_t *a_chain, dap_chai continue; } dap_stream_ch_chain_pkt_write_mt( dap_client_get_stream_worker( l_node_client->client), l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, - l_net->pub.id, a_chain->id, a_id, a_atom, a_atom_size); + l_net->pub.id.uint64, a_chain->id.uint64, a_id.uint64, a_atom, a_atom_size); } } } @@ -416,7 +416,8 @@ static void s_fill_links_from_root_aliases(dap_chain_net_t * a_net) l_pvt_net->links_info = dap_list_append(l_pvt_net->links_info, l_link_node_info); pthread_rwlock_unlock(&l_pvt_net->rwlock); } else { - log_it(L_WARNING, "Not found link %s."NODE_ADDR_FP_STR" in the node list", a_net->pub.name, NODE_ADDR_FPS_ARGS(l_link_addr)); + log_it(L_WARNING, "Not found link %s."NODE_ADDR_FP_STR" in the node list", a_net->pub.name, + NODE_ADDR_FPS_ARGS(l_link_addr)); } } } @@ -436,7 +437,8 @@ static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_clie a_node_client->stream_worker = dap_client_get_stream_worker(a_node_client->client); if( !a_node_client->is_reconnecting || s_debug_more ) - log_it(L_NOTICE, "Established connection with %s."NODE_ADDR_FP_STR,l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address)); + log_it(L_NOTICE, "Established connection with %s."NODE_ADDR_FP_STR,l_net->pub.name, + NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr)); pthread_rwlock_wrlock(&l_net_pvt->rwlock); l_net_pvt->links = dap_list_append(l_net_pvt->links, a_node_client); l_net_pvt->links_connected_count++; @@ -461,8 +463,8 @@ static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_clie if(a_node_client->ch_chain_net) a_node_client->ch_chain_net_uuid = a_node_client->ch_chain_net->uuid; dap_stream_ch_chain_pkt_write_unsafe( a_node_client->ch_chain , - DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id, - l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); + DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id.uint64, + l_chain_id.uint64, l_net->pub.cell_id.uint64, &l_sync_gdb, sizeof(l_sync_gdb)); } a_node_client->is_reconnecting = false; @@ -488,7 +490,7 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t * a_node_c if(s_debug_more) log_it(L_NOTICE, "%s."NODE_ADDR_FP_STR" disconnected, reconnecting back...", l_net->pub.name, - NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address) ); + NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr) ); a_node_client->is_reconnecting = true; @@ -498,7 +500,7 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t * a_node_c }else{ log_it(L_CRITICAL,"Link "NODE_ADDR_FP_STR" disconnected, but wrong target state %s: could be only NET_STATE_ONLINE or NET_STATE_OFFLINE " - ,NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address) + ,NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr) , c_net_states[l_net_pvt->state_target] ); } if(l_net_pvt->links_connected_count) @@ -520,7 +522,7 @@ static void s_node_link_callback_stage(dap_chain_node_client_t * a_node_client,d { dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg; if( s_debug_more) - log_it(L_INFO,"%s."NODE_ADDR_FP_STR" stage %s",l_net->pub.name,NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address), + log_it(L_INFO,"%s."NODE_ADDR_FP_STR" stage %s",l_net->pub.name,NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr), dap_client_stage_str(a_stage)); } @@ -533,7 +535,8 @@ static void s_node_link_callback_stage(dap_chain_node_client_t * a_node_client,d static void s_node_link_callback_error(dap_chain_node_client_t * a_node_client, int a_error, void * a_arg) { dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg; - log_it(L_WARNING, "Can't establish link with %s."NODE_ADDR_FP_STR, l_net->pub.name, NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address)); + log_it(L_WARNING, "Can't establish link with %s."NODE_ADDR_FP_STR, l_net->pub.name, + NODE_ADDR_FP_ARGS_S(a_node_client->remote_node_addr)); } /** @@ -916,6 +919,7 @@ void dap_chain_net_delete( dap_chain_net_t * a_net ) */ int dap_chain_net_init() { + dap_chain_node_client_init(); dap_chain_node_cli_cmd_item_create ("net", s_cli_net, NULL, "Network commands", "net -net <chain net name> [-mode update|all] go < online | offline >\n" "\tFind and establish links and stay online. \n" diff --git a/modules/net/dap_chain_node_cli_cmd.c b/modules/net/dap_chain_node_cli_cmd.c index 242e0346a4583dd841ad2dac947625df0dd500c5..7efc4c3be31901d32082ba040ab6eabc3d6dd92f 100644 --- a/modules/net/dap_chain_node_cli_cmd.c +++ b/modules/net/dap_chain_node_cli_cmd.c @@ -1128,7 +1128,7 @@ int com_node(int a_argc, char ** a_argv, void *arg_func, char **a_str_reply) //memcpy(&l_s_ch_chain->request, &l_sync_request, sizeof(l_sync_request)); if(0 == dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, - l_net->pub.id, l_chain_id_null, l_chain_cell_id_null, &l_sync_request, + l_net->pub.id.uint64, 0, 0, &l_sync_request, sizeof(l_sync_request))) { dap_chain_node_cli_set_reply_text(a_str_reply, "Error: Can't send sync chains request"); // clean client struct @@ -1168,7 +1168,7 @@ int com_node(int a_argc, char ** a_argv, void *arg_func, char **a_str_reply) DAP_DELETE(l_hash); } if(0 == dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, - l_net->pub.id, l_chain->id, l_remote_node_info->hdr.cell_id, &l_sync_request, + l_net->pub.id.uint64, l_chain->id.uint64, l_remote_node_info->hdr.cell_id.uint64, &l_sync_request, sizeof(l_sync_request))) { dap_chain_node_cli_set_reply_text(a_str_reply, "Error: Can't send sync chains request"); // clean client struct diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index e78793c8219f1a676da5f059322f1c78f13cebeb..b1e4585e2453c02cf3c7ed0051ec8df52a11f8d0 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -83,12 +83,15 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size, void * a_arg); +bool s_stream_ch_chain_debug_more = false; /** * @brief dap_chain_node_client_init * @return */ int dap_chain_node_client_init(void) { + s_stream_ch_chain_debug_more = dap_config_get_item_bool_default(g_config,"stream_ch_chain","debug_more",false); + return 0; } @@ -148,7 +151,8 @@ static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg) if(l_node_client->callbacks.error) l_node_client->callbacks.error(l_node_client, EINVAL,l_node_client->callbacks_arg ); - log_it(L_DEBUG, "Wakeup all who waits"); + if (s_stream_ch_chain_debug_more) + log_it(L_DEBUG, "Wakeup all who waits"); pthread_mutex_lock(&l_node_client->wait_mutex); l_node_client->state = NODE_CLIENT_STATE_ERROR; @@ -189,7 +193,8 @@ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg) if(l_node_client->callbacks.connected) l_node_client->callbacks.connected(l_node_client, l_node_client->callbacks_arg); l_node_client->keep_connection = true; - log_it(L_DEBUG, "Wakeup all who waits"); + if(s_stream_ch_chain_debug_more) + log_it(L_DEBUG, "Wakeup all who waits"); l_node_client->state = NODE_CLIENT_STATE_ESTABLISHED; #ifndef _WIN32 pthread_cond_broadcast(&l_node_client->wait_cond); @@ -263,7 +268,7 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR: dap_snprintf(l_node_client->last_error, sizeof(l_node_client->last_error), "%s", (char*) a_pkt->data); - log_it(L_WARNING, "Received packet DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR with error \"%s\"", + log_it(L_WARNING, "In: Received packet DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR with error \"%s\"", l_node_client->last_error); l_node_client->state = NODE_CLIENT_STATE_ERROR; @@ -272,12 +277,21 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha #else SetEvent( l_node_client->wait_cond ); #endif - break; + break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END:{ + dap_chain_net_t * l_net = l_node_client->net; + assert(l_net); + if (s_stream_ch_chain_debug_more) + log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" UPDATE_CHAINS_END: %zd hashes on remote", + l_net->pub.name, + NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr ) + ); }break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB:{ dap_chain_net_t * l_net = l_node_client->net; + assert(l_net); + if(s_stream_ch_chain_debug_more) + log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" synced GDB", l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr )); // We over with GLOBAL_DB and switch on syncing chains l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES; @@ -290,16 +304,32 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha dap_chain_cell_id_t l_cell_id={0}; dap_chain_id_t l_chain_id={0}; if(! l_node_client->cur_chain){ - log_it(L_CRITICAL,"Can't sync %s because there is no chains in it",l_net->pub.name); - dap_stream_ch_chain_pkt_write_error_unsafe(a_ch_chain->ch,l_net->pub.id, l_chain_id,l_cell_id,"ERROR_CHAIN_NO_CHAINS"); + log_it(L_CRITICAL,"In: Can't sync chains for %s because there is no chains in it",l_net->pub.name); + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch_chain->ch,l_net->pub.id.uint64, + l_chain_id.uint64,l_cell_id.uint64,"ERROR_CHAIN_NO_CHAINS"); }else{ // If present - select the first one cell in chain l_chain_id=l_node_client->cur_chain->id; dap_chain_cell_t * l_cell = l_node_client->cur_chain->cells; if (l_cell){ l_cell_id=l_cell->id; } - dap_stream_ch_chain_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ,l_net->pub.id , - l_chain_id,l_cell_id,NULL,0); + uint64_t l_net_id = l_net->pub.id.uint64; + + dap_stream_ch_chain_pkt_t * l_chain_pkt; + size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr); + l_chain_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size ); + l_chain_pkt->hdr.version = 1; + l_chain_pkt->hdr.net_id.uint64 = l_net_id; + l_chain_pkt->hdr.cell_id.uint64 = l_cell_id.uint64; + l_chain_pkt->hdr.chain_id.uint64 = l_chain_id.uint64; + + dap_stream_ch_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START , + l_chain_pkt,l_chain_pkt_size); + DAP_DELETE(l_chain_pkt); + log_it(L_INFO, + "In: Send UPDATE_CHAINS_START: net_id=0x%016x chain_id=0x%016x cell_id=0x%016x ", + l_net_id,l_chain_id.uint64,l_cell_id.uint64 + ); } }break; @@ -314,45 +344,55 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha // Check if we over with it before if ( !l_node_client->cur_cell ){ - log_it(L_WARNING, "No current cell in sync state, anyway we over it"); + log_it(L_WARNING, "In: No current cell in sync state, anyway we over it"); }else l_node_client->cur_cell =(dap_chain_cell_t *) l_node_client->cur_cell->hh.next; dap_chain_net_t * l_net = l_node_client->net; + assert(l_net); dap_chain_node_addr_t * l_node_addr = dap_chain_net_get_cur_addr(l_net); // If over with cell, switch on next chain if ( l_node_client->cur_cell){ // Check if we over with it before if ( !l_node_client->cur_chain ){ - log_it(L_ERROR, "No chain but cell is present, over wit it"); + log_it(L_ERROR, "In: No chain but cell is present, over wit it"); }else{ dap_chain_id_t l_chain_id=l_node_client->cur_chain->id; dap_chain_cell_id_t l_cell_id = l_node_client->cur_cell->id; - dap_stream_ch_chain_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS,l_net->pub.id , - l_chain_id,l_cell_id,NULL,0); + l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES; + dap_stream_ch_chain_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START, + l_net->pub.id.uint64 , + l_chain_id.uint64,l_cell_id.uint64,NULL,0); } }else{ // Check if we over with it before if ( !l_node_client->cur_chain ){ - log_it(L_WARNING, "No current chain in sync state, anyway we over it"); + log_it(L_WARNING, "In: No current chain in sync state, anyway we over it"); }else{ l_node_client->cur_chain = (dap_chain_t *) l_node_client->cur_chain->next; l_node_client->cur_cell = l_node_client->cur_chain? l_node_client->cur_chain->cells : NULL; } + dap_chain_id_t l_chain_id={0}; + dap_chain_cell_id_t l_cell_id = {0}; - if (l_node_client->cur_chain && !l_node_client->cur_cell) - log_it(L_WARNING,"Link %s."NODE_ADDR_FP_STR" started to sync %s chain but found no cells in it",l_net->pub.name, - NODE_ADDR_FP_ARGS(l_node_addr), l_node_client->cur_chain->name ); + if (l_node_client->cur_cell) + l_cell_id = l_node_client->cur_cell->id; // Check if we have some more chains and cells in it to sync - if( l_node_client->cur_chain && l_node_client->cur_cell ){ - dap_chain_id_t l_chain_id=l_node_client->cur_chain->id; - dap_chain_cell_id_t l_cell_id = l_node_client->cur_cell->id; - dap_stream_ch_chain_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS,l_net->pub.id , - l_chain_id,l_cell_id,NULL,0); + if( l_node_client->cur_chain ){ + l_chain_id=l_node_client->cur_chain->id; + + l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS_UPDATES; + if (s_stream_ch_chain_debug_more) + log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" started to sync %s chain",l_net->pub.name, + NODE_ADDR_FP_ARGS(l_node_addr), l_node_client->cur_chain->name ); + + dap_stream_ch_chain_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START, + l_net->pub.id.uint64 , + l_chain_id.uint64,l_cell_id.uint64,NULL,0); }else{ // If no - over with sync process - log_it(L_INFO, "State node %s."NODE_ADDR_FP_STR" is SYNCED",l_net->pub.name, NODE_ADDR_FP_ARGS(l_node_addr) ); + log_it(L_INFO, "In: State node %s."NODE_ADDR_FP_STR" is SYNCED",l_net->pub.name, NODE_ADDR_FP_ARGS(l_node_addr) ); l_node_client->state = NODE_CLIENT_STATE_SYNCED; if (dap_chain_net_get_state(l_net) == NET_STATE_SYNC_CHAINS ) dap_chain_net_set_state(l_net, NET_STATE_ONLINE); @@ -365,7 +405,12 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha } } - } + } break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL:{ + dap_chain_net_t * l_net = l_node_client->net; + assert(l_net); + dap_chain_net_set_state(l_net, NET_STATE_ONLINE); + }break; default: break; } }