diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index 2371d6cd58d2faba7729b5275fed15dbb1f72a30..baeef39724cd923730739a3ff4fdf08422206be1 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -341,6 +341,10 @@ void* dap_client_http_request_custom(const char *a_uplink_addr, uint16_t a_uplin // create socket int l_socket = socket( PF_INET, SOCK_STREAM, 0); + if (l_socket == -1) { + log_it(L_ERROR, "Error %d with socket create", errno); + return NULL; + } // set socket param int buffsize = DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX; #ifdef _WIN32 diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 1bb6f9708ef684d963df77157f6adcdb09724e0d..6f07440c1877d006d9aac0a89c5d30a985aaa8d1 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -452,6 +452,11 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) log_it(L_INFO, "Go to stage STREAM_SESSION: process the state ops"); a_client_pvt->stream_socket = socket( PF_INET, SOCK_STREAM, 0); + if (a_client_pvt->stream_socket == -1) { + log_it(L_ERROR, "Error %d with socket create", errno); + a_client_pvt->stage_status = STAGE_STATUS_ERROR; + break; + } #ifdef _WIN32 { int buffsize = 65536; diff --git a/dap-sdk/net/stream/ch/dap_stream_ch.c b/dap-sdk/net/stream/ch/dap_stream_ch.c index af9137dd4f713d6a5b9fcbd9484994d2979dff3e..a56a182ca3e32f97fa118e86d77494dde77cfbc1 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch.c @@ -133,10 +133,15 @@ bool dap_stream_ch_valid(dap_stream_ch_t *a_ch) void dap_stream_ch_delete(dap_stream_ch_t *a_ch) { pthread_mutex_lock(&s_ch_table_lock); - struct dap_stream_ch_table_t *l_ret; + struct dap_stream_ch_table_t *l_ret;; HASH_FIND_PTR(s_ch_table, &a_ch, l_ret); + if (!l_ret) { + pthread_mutex_unlock(&s_ch_table_lock); + return; + } HASH_DEL(s_ch_table, l_ret); pthread_mutex_unlock(&s_ch_table_lock); + DAP_DELETE(l_ret); pthread_mutex_lock(&a_ch->mutex); if (a_ch->proc) diff --git a/modules/chain/dap_chain.c b/modules/chain/dap_chain.c index ccbbf22ad141985b72338dd09d6b58eb8b407823..dd0910e8f6d156746a39d7f02766a4ed672533ac 100644 --- a/modules/chain/dap_chain.c +++ b/modules/chain/dap_chain.c @@ -255,9 +255,9 @@ dap_chain_t * dap_chain_load_from_cfg(dap_ledger_t* a_ledger, const char * a_cha // Recognize chains id if ( (l_chain_id_str = dap_config_get_item_str(l_cfg,"chain","id")) != NULL ){ - if ( sscanf(l_chain_id_str,"0x%016lX",& l_chain_id_u ) !=1 ){ - if ( sscanf(l_chain_id_str,"0x%016lx",&l_chain_id_u) !=1 ) { - if ( sscanf(l_chain_id_str,"%lu",&l_chain_id_u ) !=1 ){ + if ( sscanf(l_chain_id_str,"0x%016llX",& l_chain_id_u ) !=1 ){ + if ( sscanf(l_chain_id_str,"0x%016llx",&l_chain_id_u) !=1 ) { + if ( sscanf(l_chain_id_str,"%llu",&l_chain_id_u ) !=1 ){ log_it (L_ERROR,"Can't recognize '%s' string as chain net id, hex or dec",l_chain_id_str); dap_config_close(l_cfg); return NULL; diff --git a/modules/chain/dap_chain_cell.c b/modules/chain/dap_chain_cell.c index acbb9e7b7f244d309e5708584e75523cf642b78b..4d7f76f0aa01c735677f9d345b2e5ccd78bfb01a 100644 --- a/modules/chain/dap_chain_cell.c +++ b/modules/chain/dap_chain_cell.c @@ -213,7 +213,7 @@ int dap_chain_cell_file_append( dap_chain_cell_t * a_cell, const void* a_atom, s l_total_wrote_bytes += a_atom_size; // change in chain happened -> nodes synchronization required if(a_cell->chain && a_cell->chain->callback_notify) - a_cell->chain->callback_notify(a_cell->chain->callback_notify_arg, a_cell->chain, a_cell->id); + a_cell->chain->callback_notify(a_cell->chain->callback_notify_arg, a_cell->chain, a_cell->id, (void *)a_atom, a_atom_size); } else { log_it (L_ERROR, "Can't write data from cell 0x%016X to the file \"%s\"", a_cell->id.uint64, diff --git a/modules/chain/include/dap_chain.h b/modules/chain/include/dap_chain.h index 4ed1a66426ed17316e8bae94cdb58adc4f0bfef4..0888d5eaa2a8de225cb84c9666365042581c0872 100644 --- a/modules/chain/include/dap_chain.h +++ b/modules/chain/include/dap_chain.h @@ -83,7 +83,7 @@ typedef void (*dap_chain_callback_atom_iter_delete_t)(dap_chain_atom_iter_t * ) typedef size_t (*dap_chain_datum_callback_datum_pool_proc_add_t)(dap_chain_t * , dap_chain_datum_t **, size_t ); typedef size_t (*dap_chain_datum_callback_datum_pool_proc_add_with_group_t)(dap_chain_t * , dap_chain_datum_t **, size_t, const char *); -typedef void (*dap_chain_callback_notify_t)(void * a_arg, dap_chain_t *a_chain, dap_chain_cell_id_t a_id); //change in chain happened +typedef void (*dap_chain_callback_notify_t)(void * a_arg, dap_chain_t *a_chain, dap_chain_cell_id_t a_id, void* a_atom, size_t a_atom_size); //change in chain happened typedef enum dap_chain_type { diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 92bd61bb91ee01e28937779ac9abf2ddb295668f..430a5fd6649bd701deaf6819118ffc556455d97f 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -310,7 +310,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); - dap_stream_ch_chain_sync_request_t l_request = { { 0 } }; + dap_stream_ch_chain_sync_request_t l_request = {}; //log_it(L_DEBUG, "No items to sync from %u", l_request->id_start + 1); l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); @@ -353,27 +353,33 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if(l_chain) { // Expect atom element in if(l_chain_pkt_data_size > 0) { - dap_chain_atom_ptr_t l_atom_copy = DAP_CALLOC(1, l_chain_pkt_data_size); - memcpy(l_atom_copy, l_chain_pkt->data, l_chain_pkt_data_size); - dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy); - if(l_atom_add_res == ATOM_ACCEPT && dap_chain_has_file_store(l_chain)) { - // append to file - dap_chain_cell_id_t l_cell_id; - l_cell_id.uint64 = l_chain_pkt->hdr.cell_id.uint64; - dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_cell_id); - // add one atom only - int l_res = dap_chain_cell_file_append(l_cell, l_chain_pkt->data, l_chain_pkt_data_size); - // rewrite all file - //l_res = dap_chain_cell_file_update(l_cell); - if(!l_cell || l_res < 0) { - log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_chain_pkt->data, - l_cell ? l_cell->file_storage_path : "[null]"); + dap_chain_hash_fast_t l_atom_hash = {}; + dap_hash_fast(l_chain_pkt->data, l_chain_pkt_data_size, &l_atom_hash); + dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain); + if (!l_chain->callback_atom_find_by_hash(l_atom_iter, &l_atom_hash)) { + dap_chain_atom_ptr_t l_atom_copy = DAP_CALLOC(1, l_chain_pkt_data_size); + memcpy(l_atom_copy, l_chain_pkt->data, l_chain_pkt_data_size); + dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy); + if(l_atom_add_res == ATOM_ACCEPT && dap_chain_has_file_store(l_chain)) { + // append to file + dap_chain_cell_id_t l_cell_id; + l_cell_id.uint64 = l_chain_pkt->hdr.cell_id.uint64; + dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_cell_id); + // add one atom only + int l_res = dap_chain_cell_file_append(l_cell, l_chain_pkt->data, l_chain_pkt_data_size); + // rewrite all file + //l_res = dap_chain_cell_file_update(l_cell); + if(!l_cell || l_res < 0) { + log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_chain_pkt->data, + l_cell ? l_cell->file_storage_path : "[null]"); + } + // delete cell and close file + dap_chain_cell_delete(l_cell); } - // delete cell and close file - dap_chain_cell_delete(l_cell); + if(l_atom_add_res == ATOM_PASS) + DAP_DELETE(l_atom_copy); } - if(l_atom_add_res == ATOM_PASS) - DAP_DELETE(l_atom_copy); + l_chain->callback_atom_iter_delete(l_atom_iter); } else { log_it(L_WARNING, "Empty chain packet"); dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, @@ -398,19 +404,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) //log_it(L_INFO, "In: GLOBAL_DB data_size=%d", l_chain_pkt_data_size); // get transaction and save it to global_db if(l_chain_pkt_data_size > 0) { - //session_data_t *l_data = session_data_find(a_ch->stream->session->id); size_t l_data_obj_count = 0; - // deserialize data dap_store_obj_t *l_store_obj = dap_db_log_unpack((uint8_t*) l_chain_pkt->data, l_chain_pkt_data_size, &l_data_obj_count); // Parse data from dap_db_log_pack() - //dap_store_obj_t * l_store_obj_reversed = NULL; - //if ( dap_log_level_get()== L_DEBUG ) - //if ( l_data_obj_count && l_store_obj ) - // l_store_obj_reversed = DAP_NEW_Z_SIZE(dap_store_obj_t,l_data_obj_count+1); - - // log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count ); for(size_t i = 0; i < l_data_obj_count; i++) { @@ -466,10 +464,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) /*log_it(L_DEBUG, "Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\"" " timestamp=\"%s\" value_len=%u ", (char ) l_store_obj[i].type, l_store_obj[i].type, l_store_obj[i].group, - l_store_obj[i].key, - l_ts_str, - l_store_obj[i].value_len);*/ - + l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len);*/ // apply received transaction dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if(l_chain) { @@ -478,36 +473,6 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) (dap_chain_datum_t**) &(l_store_obj->value), 1, l_store_obj[i].group); } - /*else { - // read net_name - if(!s_net_name) - { - static dap_config_t *l_cfg = NULL; - if((l_cfg = dap_config_open("network/default")) == NULL) { - log_it(L_ERROR, "Can't open default network config"); - } else { - s_net_name = dap_strdup(dap_config_get_item_str(l_cfg, "general", "name")); - dap_config_close(l_cfg); - } - } - // add datum in ledger if necessary - { - dap_chain_net_t *l_net = dap_chain_net_by_name(s_net_name); - dap_chain_t * l_chain; - if(l_net) { - DL_FOREACH(l_net->pub.chains, l_chain) - { - const char *l_chain_name = l_chain->name; //l_chain_name = dap_strdup("gdb"); - dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, l_chain_name); - //const char *l_group_name = "chain-gdb.kelvin-testnet.chain-F00000000000000F";//dap_chain_gdb_get_group(l_chain); - if(l_chain->callback_datums_pool_proc_with_group) - l_chain->callback_datums_pool_proc_with_group(l_chain, - (dap_chain_datum_t**) &(l_store_obj->value), 1, - l_store_obj[i].group); - } - } - } - }*/ // save data to global_db if(!dap_chain_global_db_obj_save(l_obj, 1)) { dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, @@ -531,9 +496,28 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } } break; - default:{ + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS: { + dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; + memcpy(&l_sync_gdb, l_chain_pkt->data, l_chain_pkt_data_size); + dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id); + l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr(l_net) ? + dap_chain_net_get_cur_addr(l_net)->uint64 : + dap_db_get_cur_node_addr(l_net->pub.name); + // Get last timestamp in log + l_sync_gdb.id_start = (uint64_t) dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); + // no limit + l_sync_gdb.id_end = (uint64_t)0; + dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); + } + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: { + dap_stream_ch_chain_sync_request_t l_sync_chains = {}; + dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_chains, sizeof(l_sync_chains)); + } + default: { //log_it(L_INFO, "Get %s packet", c_dap_stream_ch_chain_pkt_type_str[l_ch_pkt->hdr.type]); - } + } } if(l_ch_chain->callback_notify_packet_in) l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt, @@ -668,7 +652,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) // last message - dap_stream_ch_chain_sync_request_t l_request = { { 0 } }; + dap_stream_ch_chain_sync_request_t l_request = {}; dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id); l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); diff --git a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h index c0746bfd0230d87bbc68af82bbd776d735322bdc..47903691f86ad4677b5cb46bf7eeb8c9fc68ed0f 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h +++ b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h @@ -44,6 +44,8 @@ #define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB 0x12 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_ALL 0x22 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS 0x04 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS 0x14 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS 0x03 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB 0x13 diff --git a/modules/consensus/dag-pos/dap_chain_cs_dag_pos.c b/modules/consensus/dag-pos/dap_chain_cs_dag_pos.c index 44be5c56f1fe14ee13efcf88f766a14fbe4d1a48..fc3337dcef13d408ac1a63ed4e7f89dc3a0e9c21 100644 --- a/modules/consensus/dag-pos/dap_chain_cs_dag_pos.c +++ b/modules/consensus/dag-pos/dap_chain_cs_dag_pos.c @@ -275,7 +275,7 @@ static int s_callback_event_verify(dap_chain_cs_dag_t * a_dag, dap_chain_cs_dag_ char *l_addr_str = dap_chain_addr_to_str(&l_addr); log_it(L_WARNING, "Verify of event is false, because bal is not enough for addr=%s", l_addr_str); DAP_DELETE(l_addr_str); - return 0; //-1; + return -1; } } diff --git a/modules/consensus/none/dap_chain_cs_none.c b/modules/consensus/none/dap_chain_cs_none.c index 59af3853fdb8f111e211d8e975e235949373d166..5086bc7adea914a2e9a423db8feeea8bec9a4572 100644 --- a/modules/consensus/none/dap_chain_cs_none.c +++ b/modules/consensus/none/dap_chain_cs_none.c @@ -134,13 +134,12 @@ int dap_chain_gdb_init(void) static void s_history_callback_notify(void * a_arg, const char a_op_code, const char * a_prefix, const char * a_group, const char * a_key, const void * a_value, const size_t a_value_size) { - (void) a_value; - (void) a_prefix; if (a_arg){ dap_chain_gdb_t * l_gdb = (dap_chain_gdb_t *) a_arg; dap_chain_net_t *l_net = dap_chain_net_by_id( l_gdb->chain->net_id); log_it(L_DEBUG,"%s.%s: op_code='%c' group=\"%s\" key=\"%s\" value_size=%u",l_net->pub.name, l_gdb->chain->name, a_op_code, a_group, a_key, a_value_size); + dap_chain_net_sync_gdb_broadcast((void *)l_net, a_op_code, a_prefix, a_group, a_key, a_value, a_value_size); } } diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 3df5696454e679022a615867a307813dfea5a1a6..89503c1e4da0af1745cb3a4a2bbfd07fe29ba501 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -65,12 +65,10 @@ #include "dap_enc_http.h" #include "dap_chain_common.h" #include "dap_chain_net.h" -#include "dap_chain_net_srv.h" #include "dap_chain_node_client.h" #include "dap_chain_node_cli.h" #include "dap_chain_node_cli_cmd.h" #include "dap_chain_ledger.h" -#include "dap_chain_net_srv_stake.h" #include "dap_chain_global_db.h" #include "dap_chain_global_db_remote.h" @@ -81,6 +79,7 @@ #include "dap_stream_ch_chain_pkt.h" #include "dap_stream_ch.h" #include "dap_stream_ch_pkt.h" +#include "dap_dns_server.h" #include "dap_module.h" @@ -120,18 +119,12 @@ typedef struct dap_chain_net_pvt{ time_t last_sync; dap_chain_node_addr_t * node_addr; - dap_chain_node_info_t * node_info; // Current node's info + dap_chain_node_info_t * node_info; // Current node's info - dap_chain_node_client_t * links; + dap_list_t *links; // Links list + dap_list_t *links_info; // Links info list - dap_chain_node_addr_t *links_addrs; - size_t links_count; - size_t links_addrs_count; - size_t links_success; - - size_t addr_request_attempts; bool load_mode; - uint8_t padding2[7]; char ** seed_aliases; uint16_t gdb_sync_groups_count; @@ -139,7 +132,6 @@ typedef struct dap_chain_net_pvt{ char **gdb_sync_groups; dap_chain_node_addr_t *gdb_sync_nodes_addrs; - uint8_t padding3[6]; uint16_t seed_aliases_count; dap_chain_net_state_t state; @@ -183,7 +175,7 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx); static void s_gbd_history_callback_notify (void * a_arg,const char a_op_code, const char * a_prefix, const char * a_group, const char * a_key, const void * a_value, const size_t a_value_len); -static void s_chain_callback_notify(void * a_arg, dap_chain_t *a_chain, dap_chain_cell_id_t a_id); +static void s_chain_callback_notify(void * a_arg, dap_chain_t *a_chain, dap_chain_cell_id_t a_id, void *a_atom, size_t a_atom_size); static int s_cli_net(int argc, char ** argv, void *arg_func, char **str_reply); @@ -191,6 +183,9 @@ static bool s_seed_mode = false; static uint8_t *dap_chain_net_set_acl(dap_chain_hash_fast_t *a_pkey_hash); +static dap_global_db_obj_callback_notify_t s_srv_callback_notify = NULL; + + char *dap_chain_net_get_gdb_group_acl(dap_chain_net_t *a_net) { if (a_net) { @@ -207,18 +202,6 @@ char *dap_chain_net_get_gdb_group_acl(dap_chain_net_t *a_net) return NULL; } -/** - * @brief s_net_set_go_sync - * @param a_net - * @return - */ -void s_net_set_go_sync(dap_chain_net_t * a_net) -{ - if(!a_net) - return; - dap_chain_net_state_go_to(a_net, NET_STATE_SYNC_REQUESTED); -} - /** * @brief s_net_state_to_str * @param l_state @@ -236,16 +219,10 @@ inline static const char * s_net_state_to_str(dap_chain_net_state_t l_state) */ int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_new_state) { - if (a_new_state == NET_STATE_SYNC_REQUESTED) { - if (PVT(a_net)->state_target != NET_STATE_OFFLINE) { - PVT(a_net)->state_target = NET_STATE_ONLINE; - } - } else { - if (PVT(a_net)->state_target == a_new_state){ - log_it(L_WARNING,"Already going to state %s",s_net_state_to_str(a_new_state)); - } - PVT(a_net)->state_target = a_new_state; + if (PVT(a_net)->state_target == a_new_state){ + log_it(L_WARNING,"Already going to state %s",s_net_state_to_str(a_new_state)); } + PVT(a_net)->state_target = a_new_state; pthread_mutex_lock( &PVT(a_net)->state_mutex_cond); // set flag for sync PVT(a_net)->flags |= F_DAP_CHAIN_NET_GO_SYNC; @@ -258,6 +235,50 @@ int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_n return 0; } + +void dap_chain_net_set_srv_callback_notify(dap_global_db_obj_callback_notify_t a_callback) +{ + s_srv_callback_notify = a_callback; +} + +void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const char *a_prefix, const char *a_group, + const char *a_key, const void *a_value, const size_t a_value_len) +{ + UNUSED(a_prefix); + UNUSED(a_value_len); + dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; + if (PVT(l_net)->state == NET_STATE_ONLINE) { + char *l_group; + if (a_op_code == 'd') { + l_group = dap_strdup_printf("%s.del", a_group); + } else { + l_group = (char *)a_group; + } + dap_store_obj_t *l_obj = (dap_store_obj_t *)dap_chain_global_db_obj_get(a_key, l_group); + if (a_op_code == 'd') { + DAP_DELETE(l_group); + } + if (!l_obj) { + log_it(L_DEBUG, "Notified GDB event does not exist"); + return; + } + l_obj->type = (uint8_t)a_op_code; + DAP_DELETE(l_obj->group); + l_obj->group = dap_strdup(a_group); + dap_store_obj_pkt_t *l_data_out = dap_store_packet_multiple(l_obj, l_obj->timestamp, 1); + dap_store_obj_free(l_obj, 1); + dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb"); + dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t) {}; + for (dap_list_t *l_tmp = PVT(l_net)->links; l_tmp; l_tmp = dap_list_next(l_tmp)) { + dap_chain_node_client_t *l_node_client = (dap_chain_node_client_t *)l_tmp->data; + dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id()); + dap_stream_ch_chain_pkt_write(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_net->pub.id, + l_chain_id, l_net->pub.cell_id, l_data_out, sizeof(dap_store_obj_pkt_t) + l_data_out->data_size); + } + DAP_DELETE(l_data_out); + } +} + /** * @brief s_gbd_history_callback_notify * @param a_arg @@ -269,53 +290,14 @@ int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_n * @param a_value_len */ static void s_gbd_history_callback_notify (void * a_arg, const char a_op_code, const char * a_prefix, const char * a_group, - const char * a_key, const void * a_value, - const size_t a_value_len) + const char * a_key, const void * a_value, const size_t a_value_len) { - (void) a_op_code; - UNUSED(a_prefix); - UNUSED(a_value_len); - if (a_arg) { - dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg; - s_net_set_go_sync(l_net); - /*if (!PVT (l_net)->load_mode ){ - if( pthread_mutex_trylock( &PVT (l_net)->state_mutex) == 0 ){ - if ( PVT(l_net)->state == NET_STATE_ONLINE || PVT(l_net)->state == NET_STATE_ONLINE ) - dap_chain_net_sync_all(l_net); - pthread_mutex_unlock( &PVT (l_net)->state_mutex); - } - }*/ - } - if (!a_value || !dap_config_get_item_bool_default(g_config, "srv", "order_signed_only", false)) { + if (!a_arg) { return; } - dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; - char *l_gdb_group_str = dap_chain_net_srv_order_get_gdb_group(l_net); - if (!strcmp(a_group, l_gdb_group_str)) { - dap_chain_net_srv_order_t *l_order = (dap_chain_net_srv_order_t *)a_value; - if (l_order->version == 1) { - dap_chain_global_db_gr_del((char *)a_key, a_group); - } else { - dap_sign_t *l_sign = (dap_sign_t *)&l_order->ext[l_order->ext_size]; - if (!dap_sign_verify(l_sign, l_order, sizeof(dap_chain_net_srv_order_t) + l_order->ext_size)) { - dap_chain_global_db_gr_del((char *)a_key, a_group); - DAP_DELETE(l_gdb_group_str); - return; - } - dap_chain_hash_fast_t l_pkey_hash; - if (!dap_sign_get_pkey_hash(l_sign, &l_pkey_hash)) { - dap_chain_global_db_gr_del((char *)a_key, a_group); - DAP_DELETE(l_gdb_group_str); - return; - } - dap_chain_addr_t l_addr = {}; - dap_chain_addr_fill(&l_addr, l_sign->header.type, &l_pkey_hash, l_net->pub.id); - uint64_t l_solvency = dap_chain_ledger_calc_balance(l_net->pub.ledger, &l_addr, l_order->price_ticker); - if (l_solvency < l_order->price && !dap_chain_net_srv_stake_key_delegated(&l_addr)) { - dap_chain_global_db_gr_del((char *)a_key, a_group); - } - } - DAP_DELETE(l_gdb_group_str); + dap_chain_net_sync_gdb_broadcast(a_arg, a_op_code, a_prefix, a_group, a_key, a_value, a_value_len); + if (s_srv_callback_notify) { + s_srv_callback_notify(a_arg, a_op_code, a_prefix, a_group, a_key, a_value, a_value_len); } } @@ -325,14 +307,24 @@ static void s_gbd_history_callback_notify (void * a_arg, const char a_op_code, c * @param a_chain * @param a_id */ -static void s_chain_callback_notify(void * a_arg, dap_chain_t *a_chain, dap_chain_cell_id_t a_id) +static void s_chain_callback_notify(void * a_arg, dap_chain_t *a_chain, dap_chain_cell_id_t a_id, void* a_atom, size_t a_atom_size) { - UNUSED(a_chain); - UNUSED(a_id); - if(!a_arg) + if (!a_arg) return; - dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg; - s_net_set_go_sync(l_net); + dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; + if (PVT(l_net)->state == NET_STATE_ONLINE) { + for (dap_list_t *l_tmp = PVT(l_net)->links; l_tmp; l_tmp = dap_list_next(l_tmp)) { + dap_chain_node_client_t *l_node_client = (dap_chain_node_client_t *)l_tmp->data; + uint8_t l_ch_id = dap_stream_ch_chain_get_id(); // Channel id for global_db and chains sync + dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch(l_node_client->client, l_ch_id); + if (!l_ch_chain) { + log_it(L_DEBUG,"Can't get stream_ch for id='%c' ", l_ch_id); + continue; + } + dap_stream_ch_chain_pkt_write(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_net->pub.id, + a_chain->id, a_id, a_atom, a_atom_size); + } + } } @@ -347,18 +339,19 @@ static int s_net_states_proc(dap_chain_net_t * l_net) int ret=0; - switch ( l_pvt_net->state ){ - case NET_STATE_OFFLINE:{ - // reset current link - l_pvt_net->links_count = 0; + switch (l_pvt_net->state) { + case NET_STATE_OFFLINE: { // delete all links - dap_chain_node_client_close(l_pvt_net->links); + dap_list_t *l_tmp = l_pvt_net->links; + while (l_tmp) { + dap_list_t *l_next =l_tmp->next; + dap_chain_node_client_close(l_tmp->data); + DAP_DELETE(l_tmp); + l_tmp = l_next; + } l_pvt_net->links = NULL; - l_pvt_net->links_addrs_count = 0; - if ( l_pvt_net->links_addrs ) - DAP_DELETE(l_pvt_net->links_addrs); - l_pvt_net->links_addrs = NULL; - + dap_list_free_full(l_pvt_net->links_info, free); + l_pvt_net->links_info = NULL; if ( l_pvt_net->state_target != NET_STATE_OFFLINE ){ l_pvt_net->state = NET_STATE_LINKS_PREPARE; break; @@ -367,407 +360,197 @@ static int s_net_states_proc(dap_chain_net_t * l_net) l_pvt_net->flags &= ~F_DAP_CHAIN_NET_GO_SYNC; l_pvt_net->last_sync = 0; } break; - case NET_STATE_LINKS_PREPARE:{ + + case NET_STATE_LINKS_PREPARE: { log_it(L_NOTICE,"%s.state: NET_STATE_LINKS_PREPARE",l_net->pub.name); + if (l_pvt_net->node_info) { + for (size_t i = 0; i < l_pvt_net->node_info->hdr.links_number; i++) { + dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(l_net, &l_pvt_net->node_info->links[i]); + if (l_link_node_info->hdr.address.uint64 == l_pvt_net->node_info->hdr.address.uint64) { + continue; // Do not link with self + } + l_pvt_net->links_info = dap_list_append(l_pvt_net->links_info, l_link_node_info); + } + } else { + log_it(L_WARNING,"No nodeinfo in global_db to prepare links for connecting, find nearest 3 links and fill global_db"); + } switch (l_pvt_net->node_role.enums) { case NODE_ROLE_ROOT: case NODE_ROLE_ROOT_MASTER: case NODE_ROLE_ARCHIVE: - case NODE_ROLE_CELL_MASTER:{ - // This roles load predefined links from global_db - if ( l_pvt_net->node_info ) { - if (l_pvt_net->links_addrs ) - DAP_DELETE(l_pvt_net->links_addrs); - l_pvt_net->links_addrs_count = l_pvt_net->node_info->hdr.links_number; - l_pvt_net->links_addrs = DAP_NEW_Z_SIZE( dap_chain_node_addr_t, - l_pvt_net->links_addrs_count * sizeof(dap_chain_node_addr_t)); - for (size_t i =0 ; i < l_pvt_net->node_info->hdr.links_number; i++ ){ - l_pvt_net->links_addrs[i].uint64 = l_pvt_net->node_info->links[i].uint64; - } - }else { - log_it(L_WARNING,"No nodeinfo in global_db to prepare links for connecting, find nearest 3 links and fill global_db"); - } - - // add other root nodes for connect - //if(!l_pvt_net->links_addrs_count) - { - // use no more then 4 root node - int l_use_root_nodes = min(4, l_pvt_net->seed_aliases_count); - if(!l_pvt_net->links_addrs_count) { - l_pvt_net->links_addrs = DAP_NEW_Z_SIZE(dap_chain_node_addr_t, - l_use_root_nodes * sizeof(dap_chain_node_addr_t)); - } - else{ - l_pvt_net->links_addrs = DAP_REALLOC(l_pvt_net->links_addrs, - (l_pvt_net->links_addrs_count+l_use_root_nodes) * sizeof(dap_chain_node_addr_t)); - memset(l_pvt_net->links_addrs + l_pvt_net->links_addrs_count, 0, - l_use_root_nodes * sizeof(dap_chain_node_addr_t)); - } - - for(uint16_t i = 0; i < l_use_root_nodes; i++) { - dap_chain_node_addr_t * l_node_addr = dap_chain_node_alias_find(l_net, l_pvt_net->seed_aliases[i]); - if(l_node_addr) { - l_pvt_net->links_addrs[l_pvt_net->links_addrs_count].uint64 = l_node_addr->uint64; - l_pvt_net->links_addrs_count++; - } - } - } - // shuffle the order of the nodes - for(size_t i = 0; i < l_pvt_net->links_addrs_count; i++) { - unsigned int l_new_node_pos = rand() % (l_pvt_net->links_addrs_count); - if(i == l_new_node_pos) - continue; - uint64_t l_tmp_uint64 = l_pvt_net->links_addrs[i].uint64; - l_pvt_net->links_addrs[i].uint64 = l_pvt_net->links_addrs[l_new_node_pos].uint64; - l_pvt_net->links_addrs[l_new_node_pos].uint64 = l_tmp_uint64; + case NODE_ROLE_CELL_MASTER: { + // Add other root nodes as synchronization links + while (dap_list_length(l_pvt_net->links_info) < s_max_links_count) { + int i = rand() % l_pvt_net->seed_aliases_count; + dap_chain_node_addr_t *l_link_addr = dap_chain_node_alias_find(l_net, l_pvt_net->seed_aliases[i]); + dap_chain_node_info_read(l_net, l_link_addr); } - - } break; case NODE_ROLE_FULL: case NODE_ROLE_MASTER: - case NODE_ROLE_LIGHT:{ - // If we haven't any assigned shard - connect to root-0 - if(1) { //if(l_net->pub.cell_id.uint64 == 0) { - - //dap_chain_net_pvt_t *pvt_debug = l_pvt_net; - // get current node address - dap_chain_node_addr_t l_address; - l_address.uint64 = dap_chain_net_get_cur_addr(l_net) ? - dap_chain_net_get_cur_addr(l_net)->uint64 : - dap_db_get_cur_node_addr(l_net->pub.name); - - // get current node info - dap_chain_node_info_t *l_cur_node_info = dap_chain_node_info_read(l_net, &l_address); - - if ( l_cur_node_info ) { - uint16_t l_links_addrs_count = l_cur_node_info->hdr.links_number + l_pvt_net->seed_aliases_count; - l_pvt_net->links_addrs = DAP_NEW_Z_SIZE(dap_chain_node_addr_t, - l_links_addrs_count * sizeof(dap_chain_node_addr_t)); - - // add linked nodes for connect - for(uint16_t i = 0; i < min(s_max_links_count, l_cur_node_info->hdr.links_number); i++) { - dap_chain_node_addr_t *l_addr = l_cur_node_info->links + i; - //dap_chain_node_addr_t link_addr = l_cur_node_info->links[i]; - dap_chain_node_info_t *l_remore_node_info = dap_chain_node_info_read(l_net, l_addr); - if(l_remore_node_info) { - // if only nodes from the same cell of cell=0 - if(!l_cur_node_info->hdr.cell_id.uint64 || - l_cur_node_info->hdr.cell_id.uint64 == l_remore_node_info->hdr.cell_id.uint64) { - l_pvt_net->links_addrs[l_pvt_net->links_addrs_count].uint64 = - l_remore_node_info->hdr.address.uint64; - l_pvt_net->links_addrs_count++; - } - DAP_DELETE(l_remore_node_info); - } + case NODE_ROLE_LIGHT: + default: { + // Get DNS request result from root nodes as synchronization links + while (dap_list_length(l_pvt_net->links_info) < s_max_links_count) { + int i = rand() % l_pvt_net->seed_aliases_count; + dap_chain_node_addr_t *l_remote_addr = dap_chain_node_alias_find(l_net, l_pvt_net->seed_aliases[i]); + dap_chain_node_info_t *l_remote_node_info = dap_chain_node_info_read(l_net, l_remote_addr); + dap_chain_node_info_t *l_link_node_info = DAP_NEW_Z(dap_chain_node_info_t); + int res = 0;//dap_dns_client_get_addr(l_remote_node_info->hdr.ext_addr_v4.s_addr, l_net->pub.name, l_link_node_info); + memcpy(l_link_node_info, l_remote_node_info, sizeof(dap_chain_node_info_t)); + DAP_DELETE(l_remote_node_info); + if (res) { + DAP_DELETE(l_link_node_info); + } else { + l_pvt_net->links_info = dap_list_append(l_pvt_net->links_info, l_link_node_info); } - } - // if no links then add root nodes for connect - if(!l_pvt_net->links_addrs_count){ - // use no more then s_max_links_count root node - int l_use_root_nodes = min(s_max_links_count, l_pvt_net->seed_aliases_count); - l_pvt_net->links_addrs = DAP_NEW_Z_SIZE(dap_chain_node_addr_t, - l_use_root_nodes * sizeof(dap_chain_node_addr_t)); - - for(uint16_t i = 0; i < l_use_root_nodes; i++) { - dap_chain_node_addr_t * l_node_addr = dap_chain_node_alias_find(l_net, l_pvt_net->seed_aliases[i]); - if(l_node_addr) { - l_pvt_net->links_addrs[l_pvt_net->links_addrs_count].uint64 = l_node_addr->uint64; - l_pvt_net->links_addrs_count++; - } + if (l_pvt_net->state_target == NET_STATE_OFFLINE) { + l_pvt_net->state = NET_STATE_OFFLINE; + break; } } - // shuffle the order of the nodes - for(size_t i = 0; i < l_pvt_net->links_addrs_count; i++) { - unsigned int l_new_node_pos = rand() % (l_pvt_net->links_addrs_count); - if(i==l_new_node_pos) - continue; - uint64_t l_tmp_uint64 = l_pvt_net->links_addrs[i].uint64; - l_pvt_net->links_addrs[i].uint64 = l_pvt_net->links_addrs[l_new_node_pos].uint64; - l_pvt_net->links_addrs[l_new_node_pos].uint64 = l_tmp_uint64; - } - DAP_DELETE(l_cur_node_info); - }else { - // TODO read cell's nodelist and populate array with it - } } break; } - if ( l_pvt_net->state_target > NET_STATE_LINKS_PREPARE ){ - if ( l_pvt_net->links_addrs_count>0 ) { // If links are present - l_pvt_net->state = NET_STATE_LINKS_CONNECTING; - log_it(L_DEBUG,"Prepared %u links, start to establish them", l_pvt_net->links_addrs_count ); - } else { - log_it(L_WARNING,"No links for connecting, return back to OFFLINE state"); - l_pvt_net->state = NET_STATE_OFFLINE; - // remove looping - l_pvt_net->state_target = NET_STATE_OFFLINE; - } - }else { - log_it(L_WARNING,"Target state is NET_STATE_LINKS_PREPARE? Realy?"); - l_pvt_net->state = NET_STATE_OFFLINE; - } - l_pvt_net->links_success = 0; - } - break; - - case NET_STATE_LINKS_CONNECTING:{ - size_t l_links_count = l_pvt_net->links_count; - if (l_pvt_net->links_success >= s_required_links_count) { - log_it(L_INFO, "Synchronization done"); - l_pvt_net->links_count = 0; - l_pvt_net->links_success = 0; - l_pvt_net->flags &= ~F_DAP_CHAIN_NET_GO_SYNC; - l_pvt_net->state = NET_STATE_ONLINE; - break; - } - if (l_links_count < l_pvt_net->links_addrs_count) { - l_pvt_net->links_count++; + if (l_pvt_net->state_target != NET_STATE_OFFLINE) { + l_pvt_net->state = NET_STATE_LINKS_CONNECTING; + log_it(L_DEBUG, "Prepared %u links, start to establish them", dap_list_length(l_pvt_net->links_info)); } else { - log_it(L_NOTICE, "Can't establish enough links, go to offline"); - l_pvt_net->state = NET_STATE_OFFLINE; - l_pvt_net->state_target = NET_STATE_OFFLINE; - break; + l_pvt_net->state = l_pvt_net->state_target = NET_STATE_OFFLINE; } + } break; + + case NET_STATE_LINKS_CONNECTING: { log_it(L_DEBUG, "%s.state: NET_STATE_LINKS_CONNECTING",l_net->pub.name); - log_it(L_DEBUG, "Establishing connection with " NODE_ADDR_FP_STR, - NODE_ADDR_FP_ARGS_S( l_pvt_net->links_addrs[l_links_count]) ); - dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(l_net, &l_pvt_net->links_addrs[l_links_count]); - if ( l_link_node_info ) { - dap_chain_node_client_t *l_node_client = dap_chain_node_client_connect(l_link_node_info); - if(!l_node_client) { - DAP_DELETE(l_link_node_info); - ret = -1; - break; + for (dap_list_t *l_tmp = l_pvt_net->links_info; l_tmp; l_tmp = dap_list_next(l_tmp)) { + dap_chain_node_info_t *l_link_info = (dap_chain_node_info_t *)l_tmp->data; + dap_chain_node_client_t *l_node_client = dap_chain_node_client_connect(l_link_info); + if (l_node_client) { + // wait connected + int timeout_ms = 5000; //5 sec = 5000 ms + int res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_CONNECTED, timeout_ms); + if (res == 0 ) { + log_it(L_DEBUG, "Established connection with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address)); + l_pvt_net->links = dap_list_append(l_pvt_net->links, l_node_client); + } else { + dap_chain_node_client_close(l_node_client); + l_node_client = NULL; + } + } + if (!l_node_client) { + log_it(L_DEBUG, "Can't establish link with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address)); } - // wait connected - int timeout_ms = 5000; //5 sec = 5000 ms - int res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_CONNECTED, timeout_ms); - if (res == 0 ){ - log_it(L_DEBUG, "Connected link %u", l_links_count); - l_pvt_net->links = l_node_client; - l_pvt_net->state = NET_STATE_LINKS_ESTABLISHED; - }else { - log_it(L_DEBUG, "Cant establish link %u", l_links_count); - dap_chain_node_client_close(l_node_client); - l_pvt_net->links = NULL; + if (dap_list_length(l_pvt_net->links) >= s_required_links_count) { + break; } } - } - break; + if (l_pvt_net->links) { // We have at least one working link + l_pvt_net->state = NET_STATE_SYNC_GDB; + } else { // Try to find another links + struct timespec l_sleep = {3, 0}; + nanosleep(&l_sleep, NULL); + l_pvt_net->state = NET_STATE_OFFLINE; + } + } break; - case NET_STATE_LINKS_ESTABLISHED:{ - log_it(L_DEBUG,"%s.state: NET_STATE_LINKS_ESTABLISHED",l_net->pub.name); - switch (l_pvt_net->state_target) { - case NET_STATE_ONLINE:{ // Online - switch ( l_pvt_net->node_role.enums ){ - case NODE_ROLE_ROOT_MASTER: - case NODE_ROLE_ROOT:{ - /*dap_chain_node_client_t * l_node_client = NULL, *l_node_client_tmp = NULL; - - // Send everybody your address when linked - HASH_ITER(hh,l_pvt_net->links,l_node_client,l_node_client_tmp){ - dap_stream_ch_chain_net_pkt_write(dap_client_get_stream_ch( - l_node_client->client, dap_stream_ch_chain_net_get_id()), - DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR, l_net->pub.id, - dap_chain_net_get_cur_addr(l_net), - sizeof (dap_chain_node_addr_t) ); - }*/ - l_pvt_net->state = NET_STATE_SYNC_GDB; - }break; - case NODE_ROLE_CELL_MASTER: - case NODE_ROLE_MASTER:{ - l_pvt_net->state = NET_STATE_SYNC_GDB;//NET_STATE_ADDR_REQUEST; - } break; - default:{ - // get addr for current node if it absent - if(!dap_chain_net_get_cur_addr_int(l_net)) - l_pvt_net->state = NET_STATE_ADDR_REQUEST; - else - PVT( l_net)->state = NET_STATE_SYNC_GDB; - } - } + case NET_STATE_SYNC_GDB:{ + for (dap_list_t *l_tmp = l_pvt_net->links; l_tmp; l_tmp = dap_list_next(l_tmp)) { + dap_chain_node_client_t *l_node_client = (dap_chain_node_client_t *)l_tmp->data; + dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; + // Get last timestamp in log + l_sync_gdb.id_start = (uint64_t) dap_db_log_get_last_id_remote(l_node_client->remote_node_addr.uint64); + // no limit + l_sync_gdb.id_end = (uint64_t)0; + + l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr(l_net) ? + dap_chain_net_get_cur_addr(l_net)->uint64 : + dap_db_get_cur_node_addr(l_net->pub.name); + log_it(L_DEBUG, "Prepared request to gdb sync from %llu to %llu", l_sync_gdb.id_start, l_sync_gdb.id_end); + // find dap_chain_id_t + dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb"); + dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t ) {}; + + size_t l_res = dap_stream_ch_chain_pkt_write(dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id()), + DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id, l_chain_id, + l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); + if (l_res == 0) { + log_it(L_WARNING, "Can't send GDB sync request"); + continue; } - break; - - case NET_STATE_SYNC_GDB: // we need only to sync gdb - l_pvt_net->state = NET_STATE_SYNC_GDB ; - if ( l_pvt_net->addr_request_attempts >=10 && l_pvt_net->state == NET_STATE_ADDR_REQUEST){ - l_pvt_net->addr_request_attempts = 0; - switch( l_pvt_net->state_target){ - case NET_STATE_ONLINE: - case NET_STATE_SYNC_GDB: - l_pvt_net->state = NET_STATE_SYNC_GDB; - break; - - case NET_STATE_SYNC_CHAINS: - l_pvt_net->state = NET_STATE_SYNC_CHAINS; - break; - default: { - l_pvt_net->state = NET_STATE_OFFLINE; - l_pvt_net->state_target = NET_STATE_OFFLINE; - } - } - } - break; - case NET_STATE_SYNC_CHAINS: - l_pvt_net->state = (l_pvt_net->node_info && l_pvt_net->node_info->hdr.address.uint64)? - NET_STATE_SYNC_CHAINS : NET_STATE_ADDR_REQUEST; + // wait for finishing of request + int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms + // TODO add progress info to console + l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); + switch (l_res) { + case -1: + log_it(L_WARNING,"Timeout with link sync"); break; - - case NET_STATE_ADDR_REQUEST : - l_pvt_net->state = NET_STATE_ADDR_REQUEST; + case 0: + log_it(L_INFO, "Node sync completed"); break; - default: break; - } - } break; - // get addr for remote node - case NET_STATE_ADDR_REQUEST: { - int l_is_addr_leased = 0; - dap_chain_node_client_t *l_node_client = l_pvt_net->links; - uint8_t l_ch_id = dap_stream_ch_chain_net_get_id(); // Channel id for chain net request - dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch(l_node_client->client, l_ch_id); - // set callback for l_ch_id - dap_chain_node_client_set_callbacks(l_node_client->client, l_ch_id); - // send request for new address - size_t res = dap_stream_ch_chain_net_pkt_write(l_ch_chain, - DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_LEASE_REQUEST, - //DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_REQUEST, - l_net->pub.id, NULL, 0); - if (res == 0) { - log_it(L_WARNING,"Can't send NODE_ADDR_REQUEST packet"); - dap_chain_node_client_close(l_node_client); - l_pvt_net->links = NULL; - l_pvt_net->state = NET_STATE_LINKS_CONNECTING; - break; // try with another link - } - // wait for finishing of request - int timeout_ms = 5000; // 5 sec = 5 000 ms - // TODO add progress info to console - l_pvt_net->addr_request_attempts++; - int l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_NODE_ADDR_LEASED, timeout_ms); - switch (l_res) { + default: + log_it(L_INFO, "Node sync error %d",l_res); + } + l_res = dap_stream_ch_chain_pkt_write(dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id()), + DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS, l_net->pub.id, l_chain_id, + l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); + l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); + switch (l_res) { case -1: - log_it(L_WARNING,"Timeout with addr leasing"); - // try again 3 times - if (l_pvt_net->addr_request_attempts < 3) { - break; - } - l_pvt_net->state = NET_STATE_LINKS_CONNECTING; - break; // try with another link + log_it(L_WARNING,"Timeout with link sync"); + break; case 0: - log_it(L_INFO, "Node address leased"); - l_is_addr_leased++; - l_pvt_net->addr_request_attempts = 0; + log_it(L_INFO, "Node sync completed"); break; default: - if (l_node_client->last_error[0]) { - log_it(L_INFO, "Node address request error %d: \"%s\"", l_res, l_node_client->last_error); - l_node_client->last_error[0] = '\0'; - } - log_it(L_INFO, "Node address request error %d", l_res); - l_pvt_net->state = NET_STATE_LINKS_CONNECTING; - break; - } - if (l_is_addr_leased > 0) { - l_pvt_net->state = NET_STATE_SYNC_GDB; - } - } - break; - case NET_STATE_SYNC_GDB:{ - // send request - dap_chain_node_client_t *l_node_client = l_pvt_net->links; - dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; - // Get last timestamp in log - l_sync_gdb.id_start = (uint64_t) dap_db_log_get_last_id_remote(l_node_client->remote_node_addr.uint64); - // no limit - l_sync_gdb.id_end = (uint64_t)0; - - l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr(l_net) ? - dap_chain_net_get_cur_addr(l_net)->uint64 : - dap_db_get_cur_node_addr(l_net->pub.name); - - dap_chain_id_t l_chain_id_null = {}; - dap_chain_cell_id_t l_chain_cell_id_null = {}; - l_chain_id_null.uint64 = l_net->pub.id.uint64; - l_chain_cell_id_null.uint64 = dap_chain_net_get_cur_cell(l_net) ? dap_chain_net_get_cur_cell(l_net)->uint64 : 0; - - log_it(L_DEBUG, "Prepared request to gdb sync from %llu to %llu", l_sync_gdb.id_start, l_sync_gdb.id_end); - // find dap_chain_id_t - dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb"); - dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t ) {}; - - size_t l_res = dap_stream_ch_chain_pkt_write(dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id()), - DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id, l_chain_id, - l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); - if (l_res == 0) { - log_it(L_WARNING, "Can't send GDB sync request"); - dap_chain_node_client_close(l_node_client); - l_pvt_net->links = NULL; - l_pvt_net->state = NET_STATE_LINKS_CONNECTING; - break; //try another link - } - - // wait for finishing of request - int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms - // TODO add progress info to console - l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); - switch (l_res) { - case -1: - log_it(L_WARNING,"Timeout with link sync"); - break; - case 0: - log_it(L_INFO, "Node sync completed"); - break; - default: - log_it(L_INFO, "Node sync error %d",l_res); - } - if (l_res) { // try another link - break; + log_it(L_INFO, "Node sync error %d",l_res); + } } - if(l_pvt_net->state_target >= NET_STATE_SYNC_CHAINS){ + if (l_pvt_net->state_target >= NET_STATE_SYNC_CHAINS){ l_pvt_net->state = NET_STATE_SYNC_CHAINS; - } else { - l_pvt_net->links_success++; - l_pvt_net->state = NET_STATE_LINKS_CONNECTING; + } else { // Synchronization done, go offline + log_it(L_INFO, "Synchronization done"); + l_pvt_net->state = l_pvt_net->state_target = NET_STATE_OFFLINE; } } break; case NET_STATE_SYNC_CHAINS: { - dap_chain_node_client_t *l_node_client = l_pvt_net->links; - uint8_t l_ch_id = dap_stream_ch_chain_get_id(); // Channel id for global_db and chains sync - dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch(l_node_client->client, l_ch_id); - if(!l_ch_chain) { - log_it(L_DEBUG,"Can't get stream_ch for id='%c' ", l_ch_id); - l_pvt_net->state = NET_STATE_LINKS_CONNECTING; - break; - } - dap_chain_t * l_chain = NULL; - int l_sync_errors = 0; - DL_FOREACH(l_net->pub.chains, l_chain ){ - //size_t l_lasts_size = 0; - //dap_chain_atom_ptr_t * l_lasts; - //dap_chain_atom_iter_t * l_atom_iter = l_chain->callback_atom_iter_create(l_chain); - //l_lasts = l_chain->callback_atom_iter_get_lasts(l_atom_iter, &l_lasts_size); - //if( l_lasts ) { + for (dap_list_t *l_tmp = l_pvt_net->links; l_tmp; l_tmp = dap_list_next(l_tmp)) { + dap_chain_node_client_t *l_node_client = (dap_chain_node_client_t *)l_tmp->data; + uint8_t l_ch_id = dap_stream_ch_chain_get_id(); // Channel id for global_db and chains sync + dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch(l_node_client->client, l_ch_id); + if (!l_ch_chain) { + log_it(L_DEBUG,"Can't get stream_ch for id='%c' ", l_ch_id); + continue; + } + dap_chain_t * l_chain = NULL; + DL_FOREACH (l_net->pub.chains, l_chain) { l_node_client->state = NODE_CLIENT_STATE_CONNECTED; dap_stream_ch_chain_sync_request_t l_request ; memset(&l_request, 0, sizeof (l_request)); - //dap_hash_fast(l_lasts[0], l_chain->callback_atom_get_size(l_lasts[0]), &l_request.hash_from); - dap_stream_ch_chain_pkt_write(l_ch_chain, - DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_net->pub.id, l_chain->id, - l_net->pub.cell_id, &l_request, sizeof(l_request)); - // - // dap_chain_node_client_send_ch_pkt(l_node_client,l_ch_id, - // DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, - // &l_request,sizeof (l_request) ); - + dap_stream_ch_chain_pkt_write(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_net->pub.id, + l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request)); // wait for finishing of request int timeout_ms = 120000; // 2 min = 120 sec = 120 000 ms // TODO add progress info to console int l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); switch (l_res) { + case -1: + log_it(L_WARNING,"Timeout with sync of chain '%s' ", l_chain->name); + break; + case 0: + // flush global_db + dap_chain_global_db_flush(); + log_it(L_INFO, "sync of chain '%s' completed ", l_chain->name); + break; + default: + log_it(L_ERROR, "sync of chain '%s' error %d", l_chain->name,l_res); + } + dap_stream_ch_chain_pkt_write(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS, l_net->pub.id, + l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request)); + l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); + switch (l_res) { case -1: log_it(L_WARNING,"Timeout with sync of chain '%s' ", l_chain->name); break; @@ -778,27 +561,22 @@ static int s_net_states_proc(dap_chain_net_t * l_net) // set time of last sync { struct timespec l_to; - clock_gettime( CLOCK_MONOTONIC, &l_to); + clock_gettime(CLOCK_MONOTONIC, &l_to); l_pvt_net->last_sync = l_to.tv_sec; } break; default: log_it(L_ERROR, "sync of chain '%s' error %d", l_chain->name,l_res); } - if (l_res) { - l_sync_errors++; - } - //DAP_DELETE( l_lasts ); - //} - //DAP_DELETE( l_atom_iter ); + } } - dap_chain_node_client_close(l_node_client); - l_pvt_net->links = NULL; - if (!l_sync_errors) { - l_pvt_net->links_success++; + log_it(L_INFO, "Synchronization done"); + if (l_pvt_net->state_target == NET_STATE_ONLINE) { + l_pvt_net->flags &= ~F_DAP_CHAIN_NET_GO_SYNC; + l_pvt_net->state = NET_STATE_ONLINE; + } else { // Synchronization done, go offline + l_pvt_net->state = l_pvt_net->state_target = NET_STATE_OFFLINE; } - l_pvt_net->state = NET_STATE_LINKS_CONNECTING; - break; } break; @@ -814,7 +592,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net) case NET_STATE_ONLINE: case NET_STATE_SYNC_GDB: case NET_STATE_SYNC_CHAINS: - l_pvt_net->state = NET_STATE_LINKS_CONNECTING; + l_pvt_net->state = NET_STATE_SYNC_GDB; break; default: break; } @@ -826,18 +604,6 @@ static int s_net_states_proc(dap_chain_net_t * l_net) return ret; } -// Global -/*static void s_net_proc_thread_callback_update_db(void) -{ - dap_chain_net_item_t *l_net_item, *l_net_item_tmp; - printf("update0\n"); - HASH_ITER(hh, s_net_items, l_net_item, l_net_item_tmp) - { - s_net_set_go_sync(l_net_item->chain_net); - } - printf("update1\n"); -}*/ - /** * @brief s_net_proc_thread * @details Brings up and check the Dap Chain Network @@ -1112,7 +878,8 @@ void s_set_reply_text_node_status(char **a_str_reply, dap_chain_net_t * a_net){ PVT(a_net)->state == NET_STATE_SYNC_GDB || PVT(a_net)->state == NET_STATE_SYNC_CHAINS) l_sync_current_link_text_block = dap_strdup_printf(", processing link %u from %u", - PVT(a_net)->links_count, PVT(a_net)->links_addrs_count); + dap_list_length(PVT(a_net)->links), + dap_list_length(PVT(a_net)->links_info)); dap_chain_node_cli_set_reply_text(a_str_reply, "Network \"%s\" has state %s (target state %s)%s%s", @@ -2091,6 +1858,9 @@ char * dap_chain_net_get_gdb_group_mempool_by_chain_type(dap_chain_net_t * l_net */ dap_chain_node_addr_t * dap_chain_net_get_cur_addr( dap_chain_net_t * l_net) { + if (!l_net) { + return NULL; + } return PVT(l_net)->node_info ? &PVT(l_net)->node_info->hdr.address : PVT(l_net)->node_addr; } diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 3c21deb1aeeac66b011c7baa30676b1fe13067ca..4647568525e0b222307464675ca6d353df626719 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -282,8 +282,8 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha #endif case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { dap_stream_ch_chain_sync_request_t * l_request = NULL; if(a_pkt_data_size == sizeof(*l_request)) l_request = (dap_stream_ch_chain_sync_request_t*) a_pkt->data; diff --git a/modules/net/dap_dns_server.c b/modules/net/dap_dns_server.c index 39862579c61decdcc930d0fb80135600aaaf1491..1f000b8a45d37d92528cba78e1f2f4ca0f9e62f3 100644 --- a/modules/net/dap_dns_server.c +++ b/modules/net/dap_dns_server.c @@ -22,6 +22,7 @@ along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. */ +#include <errno.h> #include "dap_dns_server.h" #include "dap_udp_server.h" #include "dap_udp_client.h" @@ -35,6 +36,12 @@ #define LOG_TAG "dap_dns_server" +#ifndef _WIN32 +#include <unistd.h> // for close +#define closesocket close +#define INVALID_SOCKET -1 +#endif + static dap_dns_server_t *s_dns_server; static char s_root_alias[] = "dnsroot"; @@ -82,7 +89,18 @@ void dap_dns_buf_put_uint32(dap_dns_buf_t *buf, uint32_t val) { dap_dns_buf_put_uint16(buf, val); } -uint32_t dap_dns_resolve_hostname(char *str) { +/** + * @brief dap_dns_buf_put_uint64 Put uint64 to network order + * @param buf DNS buffer structure + * @param val uint64 in host order + * @return none + */ +void dap_dns_buf_put_uint64(dap_dns_buf_t *buf, uint64_t val) { + dap_dns_buf_put_uint32(buf, val >> 32); + dap_dns_buf_put_uint32(buf, val); +} + +dap_chain_node_info_t *dap_dns_resolve_hostname(char *str) { log_it(L_DEBUG, "DNS parser retrieve hostname %s", str); dap_chain_net_t *l_net = dap_chain_net_by_name(str); if (l_net == NULL) { @@ -102,11 +120,11 @@ uint32_t dap_dns_resolve_hostname(char *str) { if (!l_nodes_count || !l_objs) return 0; size_t l_node_num = rand() % l_nodes_count; - dap_chain_node_info_t *l_node_info = (dap_chain_node_info_t *)l_objs[l_node_num].value; - struct in_addr addr = l_node_info->hdr.ext_addr_v4; + dap_chain_node_info_t *l_node_info = DAP_NEW_Z(dap_chain_node_info_t); + memcpy(l_node_info, l_objs[l_node_num].value, sizeof(dap_chain_node_info_t)); dap_chain_global_db_objs_delete(l_objs, l_nodes_count); - log_it(L_DEBUG, "DNS resolver find ip %s", inet_ntoa(addr)); - return addr.s_addr; + log_it(L_DEBUG, "DNS resolver find ip %s", inet_ntoa(l_node_info->hdr.ext_addr_v4)); + return l_node_info; } /** @@ -222,7 +240,7 @@ void dap_dns_client_read(dap_client_remote_t *client, void * arg) { if (val) { // No other sections should present goto cleanup; } - dap_dns_buf_put_uint16(dns_reply, val); + dap_dns_buf_put_uint16(dns_reply, 1); // 1 aditional section int dot_count = 0; dap_string_t *dns_hostname = dap_string_new(""); for (int i = 0; i < qdcount; i++) { @@ -271,16 +289,16 @@ void dap_dns_client_read(dap_client_remote_t *client, void * arg) { } } // Find ip addr - uint32_t ip_addr = 0; + dap_chain_node_info_t *l_node_info = NULL; if (flags->rcode == DNS_ERROR_NONE) { dap_dns_zone_callback_t callback = dap_dns_zone_find(dns_hostname->str); if (callback) { - ip_addr = callback(dns_hostname->str); + l_node_info = callback(dns_hostname->str); } } - if (ip_addr) { - // Compose DNS answer - block_len = DNS_ANSWER_SIZE; + if (l_node_info) { + // Compose DNS answer + block_len = DNS_ANSWER_SIZE * 2 - sizeof(uint16_t) + sizeof(uint64_t); dns_reply->data = DAP_REALLOC(dns_reply->data, dns_reply->ptr + block_len); val = 0xc000 | DNS_HEADER_SIZE; // Link to host name dap_dns_buf_put_uint16(dns_reply, val); @@ -289,10 +307,21 @@ void dap_dns_client_read(dap_client_remote_t *client, void * arg) { val = DNS_CLASS_TYPE_IN; dap_dns_buf_put_uint16(dns_reply, val); uint32_t ttl = DNS_TIME_TO_LIVE; + dap_dns_buf_put_uint32(dns_reply, ttl); + dap_dns_buf_put_uint16(dns_reply, 4); // RD len for ipv4 + dap_dns_buf_put_uint32(dns_reply, l_node_info->hdr.ext_addr_v4.s_addr); + val = 0xc000 | DNS_HEADER_SIZE; // Link to host name + dap_dns_buf_put_uint16(dns_reply, val); + val = DNS_RECORD_TYPE_TXT; + dap_dns_buf_put_uint16(dns_reply, val); + val = DNS_CLASS_TYPE_IN; + dap_dns_buf_put_uint16(dns_reply, val); dap_dns_buf_put_uint32(dns_reply, ttl); - val = 4; // RD len for ipv4 + val = sizeof(uint16_t) + sizeof(uint64_t); dap_dns_buf_put_uint16(dns_reply, val); - dap_dns_buf_put_uint32(dns_reply, ip_addr); + dap_dns_buf_put_uint16(dns_reply, l_node_info->hdr.ext_port); + dap_dns_buf_put_uint64(dns_reply, l_node_info->hdr.address.uint64); + DAP_DELETE(l_node_info); } else if (flags->rcode == DNS_ERROR_NONE) { flags->rcode = DNS_ERROR_NAME; } @@ -344,3 +373,116 @@ void dap_dns_server_stop() { dap_udp_server_delete(s_dns_server->instance); DAP_DELETE(s_dns_server); } + +int dap_dns_client_get_addr(uint32_t a_addr, char *a_name, dap_chain_node_info_t *a_result) +{ + const size_t l_buf_size = 1024; + uint8_t l_buf[l_buf_size]; + dap_dns_buf_t l_dns_request = {}; + l_dns_request.data = (char *)l_buf; + dap_dns_buf_put_uint16(&l_dns_request, rand() % 0xFFFF); // ID + dap_dns_message_flags_t l_flags = {}; + dap_dns_buf_put_uint16(&l_dns_request, l_flags.val); + dap_dns_buf_put_uint16(&l_dns_request, 1); // we have only 1 question + dap_dns_buf_put_uint16(&l_dns_request, 0); + dap_dns_buf_put_uint16(&l_dns_request, 0); + dap_dns_buf_put_uint16(&l_dns_request, 0); + size_t l_ptr = 0; + uint8_t *l_cur = l_buf + l_dns_request.ptr; + for (size_t i = 0; i <= strlen(a_name); i++) + { + if (a_name[i] == '.' || a_name[i] == 0) + { + *l_cur++ = i - l_ptr; + for( ; l_ptr < i; l_ptr++) + { + *l_cur++ = a_name[l_ptr]; + } + l_ptr++; + } + } + *l_cur++='\0'; + l_dns_request.ptr = l_cur - l_buf; + dap_dns_buf_put_uint16(&l_dns_request, DNS_RECORD_TYPE_A); + dap_dns_buf_put_uint16(&l_dns_request, DNS_CLASS_TYPE_IN); +#ifdef WIN32 + SOCKET l_sock; +#else + int l_sock; +#endif + l_sock = socket(AF_INET, SOCK_DGRAM, 0); + if (l_sock == INVALID_SOCKET) { + log_it(L_ERROR, "Socket error"); + return -1; + } + struct sockaddr_in l_addr; + l_addr.sin_family = AF_INET; + l_addr.sin_port = htons(DNS_LISTEN_PORT); + l_addr.sin_addr.s_addr = a_addr; + int l_portion = 0, l_len = l_dns_request.ptr; + for (int l_sent = 0; l_sent < l_len; l_sent += l_portion) { + l_portion = sendto(l_sock, (const char *)(l_buf + l_sent), l_len - l_sent, 0, (struct sockaddr *)&l_addr, sizeof(l_addr)); + if (l_portion < 0) { + log_it(L_ERROR, "send() function error"); + closesocket(l_sock); + return -2; + } + } + fd_set fd; + FD_ZERO(&fd); + FD_SET(l_sock, &fd); + struct timeval tv; + tv.tv_sec = 5; + tv.tv_usec = 0; +#ifdef WIN32 + int l_selected = select(1, &fd, NULL, NULL, &tv); +#else + int l_selected = select(l_sock + 1, &fd, NULL, NULL, &tv); +#endif + if (l_selected < 0) + { + log_it(L_WARNING, "select() error"); + closesocket(l_sock); + return -3; + } + if (l_selected == 0) + { + log_it(L_DEBUG, "select() timeout"); + closesocket(l_sock); + return -4; + } + struct sockaddr_in l_clientaddr; + socklen_t l_clientlen = sizeof(l_clientaddr); + size_t l_recieved = recvfrom(l_sock, (char *)l_buf, l_buf_size, 0, (struct sockaddr *)&l_clientaddr, &l_clientlen); + size_t l_addr_point = DNS_HEADER_SIZE + strlen(a_name) + 2 + 2 * sizeof(uint16_t) + DNS_ANSWER_SIZE - sizeof(uint32_t); + if (l_recieved < l_addr_point + sizeof(uint32_t)) { + log_it(L_WARNING, "DNS answer incomplete"); + closesocket(l_sock); + return -5; + } + l_cur = l_buf + 3 * sizeof(uint16_t); + int l_answers_count = ntohs(*(uint16_t *)l_cur); + if (l_answers_count != 1) { + log_it(L_WARNING, "Incorrect DNS answer format"); + closesocket(l_sock); + return -6; + } + l_cur = l_buf + l_addr_point; + if (a_result) { + a_result->hdr.ext_addr_v4.s_addr = ntohl(*(uint32_t *)l_cur); + } + l_cur = l_buf + 5 * sizeof(uint16_t); + int l_additions_count = ntohs(*(uint16_t *)l_cur); + if (l_additions_count == 1) { + l_cur = l_buf + l_addr_point + DNS_ANSWER_SIZE; + if (a_result) { + a_result->hdr.ext_port = ntohs(*(uint16_t *)l_cur); + } + l_cur += sizeof(uint16_t); + if (a_result) { + a_result->hdr.address.uint64 = be64toh(*(uint64_t *)l_cur); + } + } + closesocket(l_sock); + return 0; +} diff --git a/modules/net/include/dap_chain_net.h b/modules/net/include/dap_chain_net.h index 00fc47ae9ee4bc55b9f87a65ca2250e10e3a0bb4..15470f9fdc71a2f62996c63565269e0540bf5416 100644 --- a/modules/net/include/dap_chain_net.h +++ b/modules/net/include/dap_chain_net.h @@ -61,8 +61,7 @@ typedef enum dap_chain_net_state{ NET_STATE_ADDR_REQUEST, // Waiting for address assign NET_STATE_SYNC_GDB, NET_STATE_SYNC_CHAINS, - NET_STATE_ONLINE, - NET_STATE_SYNC_REQUESTED + NET_STATE_ONLINE } dap_chain_net_state_t; typedef struct dap_chain_net{ @@ -87,7 +86,6 @@ void dap_chain_net_deinit(void); void dap_chain_net_load_all(); -void s_net_set_go_sync(dap_chain_net_t * a_net); int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_new_state); inline static int dap_chain_net_start(dap_chain_net_t * a_net){ return dap_chain_net_state_go_to(a_net,NET_STATE_ONLINE); } @@ -155,3 +153,6 @@ dap_list_t * dap_chain_net_get_add_gdb_group(dap_chain_net_t * a_net, dap_chain_ int dap_chain_net_verify_datum_for_add(dap_chain_net_t *a_net, dap_chain_datum_t * a_datum ); void dap_chain_net_dump_datum(dap_string_t * a_str_out, dap_chain_datum_t * a_datum, const char *a_hash_out_type); +void dap_chain_net_set_srv_callback_notify(dap_global_db_obj_callback_notify_t a_callback); +void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const char *a_prefix, const char *a_group, + const char *a_key, const void *a_value, const size_t a_value_len); diff --git a/modules/net/include/dap_dns_server.h b/modules/net/include/dap_dns_server.h index ee107a16bf7d23bb16e620bafe55559bcf415a81..e36022f8bf2bf4d08d774075ba41a2f1b24540e4 100644 --- a/modules/net/include/dap_dns_server.h +++ b/modules/net/include/dap_dns_server.h @@ -29,6 +29,7 @@ #endif #include "dap_server.h" #include "uthash.h" +#include "dap_chain_node.h" #define DNS_LISTEN_PORT 53 // UDP #define DNS_TIME_TO_LIVE 600 // Seconds @@ -63,7 +64,7 @@ typedef enum _dap_dns_record_type_t { DNS_RECORD_TYPE_MG, // Mail group member (experimental) DNS_RECORD_TYPE_MR, // Mail rename domain name (experimental) DNS_RECORD_TYPE_NULL, // NULL resource record (experimental) - DNS_RECORD_TYPE_WKS, // Well known server description + DNS_RECORD_TYPE_WKS, // Well known services description DNS_RECORD_TYPE_PTR, // Domain name pointer DNS_RECORD_TYPE_HINFO, // Host information DNS_RECORD_TYPE_MINFO, // Mail box or list information @@ -95,7 +96,7 @@ typedef struct _dap_dns_message_flags_bits_t { int qr : 1; // 0 - query, 1 - response } dap_dns_message_flags_bits_t; -typedef uint32_t (*dap_dns_zone_callback_t) (char *hostname); // Callback for DNS zone operations +typedef dap_chain_node_info_t *(*dap_dns_zone_callback_t) (char *hostname); // Callback for DNS zone operations typedef union _dap_dns_message_flags_t { dap_dns_message_flags_bits_t flags; @@ -123,3 +124,4 @@ void dap_dns_server_start(); void dap_dns_server_stop(); int dap_dns_zone_register(char *zone, dap_dns_zone_callback_t callback); int dap_dns_zone_unregister(char *zone); +int dap_dns_client_get_addr(uint32_t a_addr, char *a_name, dap_chain_node_info_t *a_result); diff --git a/modules/net/srv/dap_chain_net_srv_order.c b/modules/net/srv/dap_chain_net_srv_order.c index e3df6bfad4021dbb823a9a984b305cd960c78f2c..948ec9a6d09a74172d4f099fd38fac44fb9d430a 100644 --- a/modules/net/srv/dap_chain_net_srv_order.c +++ b/modules/net/srv/dap_chain_net_srv_order.c @@ -31,6 +31,7 @@ #include "dap_enc_base58.h" #include "dap_chain_global_db.h" #include "dap_chain_net_srv_countries.h" +#include "dap_chain_net_srv_stake.h" //#include "dap_chain_net_srv_geoip.h" #define LOG_TAG "dap_chain_net_srv_order" @@ -58,13 +59,16 @@ char *s_server_continents[]={ "Antarctica" }; +static void s_srv_order_callback_notify(void *a_arg, const char a_op_code, const char *a_prefix, const char *a_group, + const char *a_key, const void *a_value, const size_t a_value_len); + /** * @brief dap_chain_net_srv_order_init * @return */ int dap_chain_net_srv_order_init(void) { - + dap_chain_net_set_srv_callback_notify(s_srv_order_callback_notify); //geoip_info_t *l_ipinfo = chain_net_geoip_get_ip_info("8.8.8.8"); return 0; } @@ -506,3 +510,42 @@ void dap_chain_net_srv_order_dump_to_string(dap_chain_net_srv_order_t *a_order,d DAP_DELETE(l_ext_out); } } + +static void s_srv_order_callback_notify(void *a_arg, const char a_op_code, const char *a_prefix, const char *a_group, + const char *a_key, const void *a_value, const size_t a_value_len) +{ + (void) a_op_code; + UNUSED(a_prefix); + UNUSED(a_value_len); + if (!a_arg || !a_value || !dap_config_get_item_bool_default(g_config, "srv", "order_signed_only", false)) { + return; + } + dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; + char *l_gdb_group_str = dap_chain_net_srv_order_get_gdb_group(l_net); + if (!strcmp(a_group, l_gdb_group_str)) { + dap_chain_net_srv_order_t *l_order = (dap_chain_net_srv_order_t *)a_value; + if (l_order->version == 1) { + dap_chain_global_db_gr_del((char *)a_key, a_group); + } else { + dap_sign_t *l_sign = (dap_sign_t *)&l_order->ext[l_order->ext_size]; + if (!dap_sign_verify(l_sign, l_order, sizeof(dap_chain_net_srv_order_t) + l_order->ext_size)) { + dap_chain_global_db_gr_del((char *)a_key, a_group); + DAP_DELETE(l_gdb_group_str); + return; + } + dap_chain_hash_fast_t l_pkey_hash; + if (!dap_sign_get_pkey_hash(l_sign, &l_pkey_hash)) { + dap_chain_global_db_gr_del((char *)a_key, a_group); + DAP_DELETE(l_gdb_group_str); + return; + } + dap_chain_addr_t l_addr = {}; + dap_chain_addr_fill(&l_addr, l_sign->header.type, &l_pkey_hash, l_net->pub.id); + uint64_t l_solvency = dap_chain_ledger_calc_balance(l_net->pub.ledger, &l_addr, l_order->price_ticker); + if (l_solvency < l_order->price && !dap_chain_net_srv_stake_key_delegated(&l_addr)) { + dap_chain_global_db_gr_del((char *)a_key, a_group); + } + } + DAP_DELETE(l_gdb_group_str); + } +} diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 078b09e1436cf2dee2f0fcfccedbc069ebf61ab5..a2250e0abd53ce2b8ac181c44d596ace2e723d9c 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -399,6 +399,7 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain dap_chain_hash_fast_t * l_hashes = DAP_NEW_Z_SIZE(dap_chain_hash_fast_t, sizeof(dap_chain_hash_fast_t) * l_hashes_size); size_t l_hashes_linked = 0; + dap_chain_cell_t *l_cell = NULL; for (size_t d = 0; d <a_datums_count ; d++){ dap_chain_datum_t * l_datum = a_datums[d]; @@ -471,9 +472,24 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain l_event = l_dag->callback_cs_event_create(l_dag,l_datum,l_hashes,l_hashes_linked); if ( l_event){ // Event is created - // add directly to file - if(l_dag->is_add_directy) { - if(!s_chain_callback_atom_add(a_chain, l_event)) { + if (l_dag->is_add_directy) { + if (s_chain_callback_atom_add(a_chain, l_event) == ATOM_ACCEPT) { + // add events to file + if (!l_cell) { + l_cell = dap_chain_cell_create(); + if (!l_cell) { + log_it(L_ERROR, "Insufficient memory"); + continue; + } + dap_chain_net_t *l_net = dap_chain_net_by_id(a_chain->net_id); + l_cell->chain = a_chain; + l_cell->id.uint64 = l_net ? l_net->pub.cell_id.uint64 : 0; + l_cell->file_storage_path = dap_strdup_printf("%0llx.dchaincell", l_cell->id.uint64); + } + if (dap_chain_cell_file_append(l_cell, l_event, a_chain->callback_atom_get_size(l_event)) < 0) { + log_it(L_ERROR, "Can't add new event to the file '%s'", l_cell->file_storage_path); + continue; + } l_datum_processed++; } else { @@ -522,22 +538,7 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain } } } - // add events to file - if(l_dag->is_add_directy && l_datum_processed>0) { - dap_chain_cell_t *l_cell = dap_chain_cell_create(); - int l_res = -1; - if(l_cell) { - dap_chain_net_t *l_net = dap_chain_net_by_id(a_chain->net_id); - l_cell->chain = a_chain; - l_cell->id.uint64 = l_net ? l_net->pub.cell_id.uint64 : 0; - l_cell->file_storage_path = dap_strdup_printf("%0llx.dchaincell", l_cell->id.uint64); - l_res = dap_chain_cell_file_update(l_cell); - } - if(!l_cell || l_res < 0) { - log_it(L_ERROR, "Can't add new %d events to the file '%s'", l_datum_processed, - l_cell ? l_cell->file_storage_path : ""); - l_datum_processed = 0; - } + if (l_cell) { dap_chain_cell_delete(l_cell); } dap_chain_global_db_objs_delete(l_events_round_new, l_events_round_new_size);