diff --git a/CMakeLists.txt b/CMakeLists.txt index ca18e96640e7adb54f87c491289fc6c36c013c1b..a0f0ec4a1625e3ba38c9918329fc0ae9b0d3c4d3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 3.0) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.9-57") +set(CELLFRAME_SDK_NATIVE_VERSION "2.9-58") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") message("Cellframe modules: ${CELLFRAME_MODULES}") diff --git a/dap-sdk/core/include/dap_common.h b/dap-sdk/core/include/dap_common.h index 4aa1d26856ea27760004bae02345db3293371be6..71d13509fbc6ff194edf3712799a0681a6d05252 100755 --- a/dap-sdk/core/include/dap_common.h +++ b/dap-sdk/core/include/dap_common.h @@ -440,12 +440,18 @@ void dap_log_set_max_item(unsigned int a_max); char *dap_log_get_item(time_t a_start_time, int a_limit); #if defined __GNUC__ || defined __clang__ +#ifdef __MINGW_PRINTF_FORMAT +#define DAP_PRINTF_ATTR(format_index, args_index) \ + __attribute__ ((format (gnu_printf, format_index, args_index))) +#else #define DAP_PRINTF_ATTR(format_index, args_index) \ __attribute__ ((format (printf, format_index, args_index))) +#endif #else /* __GNUC__ */ #define DAP_PRINTF_ATTR(format_index, args_index) #endif /* __GNUC__ */ + DAP_PRINTF_ATTR(3, 4) void _log_it( const char * log_tag, enum dap_log_level, const char * format, ... ); #define log_it( _log_level, ...) _log_it( LOG_TAG, _log_level, ##__VA_ARGS__) diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 0f3381ae552b456188b28954a164e8c642421fe7..0bdbe70925a34375c51875a6ca33271bf89050c8 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -1791,11 +1791,10 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool //log_it( L_DEBUG, "es is going to be removed from the lists and free the memory (0x%016X)", a_es ); dap_events_socket_remove_from_worker_unsafe(a_es, a_es->worker); - //log_it( L_DEBUG, "dap_events_socket wrapped around %d socket is removed", a_es->socket ); - if( a_es->callbacks.delete_callback ) a_es->callbacks.delete_callback( a_es, NULL ); // Init internal structure + //log_it( L_DEBUG, "dap_events_socket wrapped around %d socket is removed", a_es->socket ); dap_events_socket_delete_unsafe(a_es, preserve_inheritor); } diff --git a/dap-sdk/net/core/dap_server.c b/dap-sdk/net/core/dap_server.c index bdbccb4a76f3768e801786fdf9fde5d8bce1986e..792af686326c764b136b3fd0e88a86ab907bbb5c 100644 --- a/dap-sdk/net/core/dap_server.c +++ b/dap-sdk/net/core/dap_server.c @@ -292,7 +292,6 @@ static int s_server_run(dap_server_t * a_server, dap_events_socket_callbacks_t * l_callbacks.error_callback = a_callbacks->error_callback; } -// if we have poll exclusive #ifdef DAP_EVENTS_CAPS_EPOLL for(size_t l_worker_id = 0; l_worker_id < dap_events_worker_get_count() ; l_worker_id++){ dap_worker_t *l_w = dap_events_worker_get(l_worker_id); @@ -305,6 +304,7 @@ static int s_server_run(dap_server_t * a_server, dap_events_socket_callbacks_t * // Prepare for multi thread listening l_es->ev_base_flags = EPOLLIN; #ifdef EPOLLEXCLUSIVE + // if we have poll exclusive l_es->ev_base_flags |= EPOLLET | EPOLLEXCLUSIVE; #endif l_es->_inheritor = a_server; diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 8c66698e7fd6dbd325db284f3cc09e68da10da99..d864c0879295e43d214d35166b6390b814563c87 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -280,8 +280,8 @@ void *dap_worker_thread(void *arg) #else #error "Unimplemented fetch esocket after poll" #endif - if(!l_cur) { - log_it(L_ERROR, "dap_events_socket NULL"); + if(!l_cur || (l_cur->worker && l_cur->worker != l_worker)) { + log_it(L_WARNING, "dap_events_socket was destroyed earlier"); continue; } if(s_debug_reactor) { @@ -356,7 +356,8 @@ void *dap_worker_thread(void *arg) dap_events_socket_set_readable_unsafe(l_cur, false); dap_events_socket_set_writable_unsafe(l_cur, false); l_cur->buf_out_size = 0; - l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + if (!l_cur->no_close) + l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; if(l_cur->callbacks.error_callback) l_cur->callbacks.error_callback(l_cur, l_sock_err); // Call callback to process error event } @@ -527,7 +528,8 @@ void *dap_worker_thread(void *arg) log_it(L_ERROR, "Some error occured in recv() function: %s", strerror(errno)); #endif dap_events_socket_set_readable_unsafe(l_cur, false); - l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + if (!l_cur->no_close) + l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; l_cur->buf_out_size = 0; } #ifndef DAP_NET_CLIENT_NO_SSL @@ -536,7 +538,8 @@ void *dap_worker_thread(void *arg) wolfSSL_ERR_error_string(l_errno, l_err_str); log_it(L_ERROR, "Some error occured in SSL read(): %s (code %d)", l_err_str, l_errno); dap_events_socket_set_readable_unsafe(l_cur, false); - l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + if (!l_cur->no_close) + l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; l_cur->buf_out_size = 0; } #endif @@ -654,7 +657,7 @@ void *dap_worker_thread(void *arg) l_bytes_sent = send(l_cur->socket, (const char *)l_cur->buf_out, l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL); #ifdef DAP_OS_WINDOWS - dap_events_socket_set_writable_unsafe(l_cur,false); + //dap_events_socket_set_writable_unsafe(l_cur,false); // enabling this will break windows server replies l_errno = WSAGetLastError(); #else l_errno = errno; @@ -752,7 +755,7 @@ void *dap_worker_thread(void *arg) break; case DESCRIPTOR_TYPE_PIPE: case DESCRIPTOR_TYPE_FILE: - l_bytes_sent = write(l_cur->socket, (char *) (l_cur->buf_out), l_cur->buf_out_size ); + l_bytes_sent = write(l_cur->fd, (char *) (l_cur->buf_out), l_cur->buf_out_size ); l_errno = errno; break; default: @@ -769,7 +772,8 @@ void *dap_worker_thread(void *arg) { // If we have non-blocking socket log_it(L_ERROR, "Some error occured in send(): %s (code %d)", strerror(l_errno), l_errno); #endif - l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + if (!l_cur->no_close) + l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; l_cur->buf_out_size = 0; } #ifndef DAP_NET_CLIENT_NO_SSL @@ -777,7 +781,8 @@ void *dap_worker_thread(void *arg) char l_err_str[80]; wolfSSL_ERR_error_string(l_errno, l_err_str); log_it(L_ERROR, "Some error occured in SSL write(): %s (code %d)", l_err_str, l_errno); - l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + if (!l_cur->no_close) + l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; l_cur->buf_out_size = 0; } #endif @@ -801,7 +806,7 @@ void *dap_worker_thread(void *arg) dap_events_socket_set_writable_unsafe(l_cur,true); } - if ((l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !l_cur->no_close) + if (l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) { if (l_cur->buf_out_size == 0) { if(s_debug_reactor) diff --git a/dap-sdk/net/server/http_server/http_client/dap_http_client.c b/dap-sdk/net/server/http_server/http_client/dap_http_client.c index 2a5597c09d8f2628d2c7ffc6b231126f8e64a822..b9c0f0ff39bc9f12a4b09d8c5bda6d3cc0009bfe 100644 --- a/dap-sdk/net/server/http_server/http_client/dap_http_client.c +++ b/dap-sdk/net/server/http_server/http_client/dap_http_client.c @@ -645,7 +645,7 @@ void dap_http_client_out_header_generate(dap_http_client_t *a_http_client) } if ( a_http_client->out_content_type[0] ) { dap_http_header_add(&a_http_client->out_headers,"Content-Type",a_http_client->out_content_type); - log_it(L_DEBUG,"output: Content-Type = '%s'",a_http_client->out_content_type); + log_it(L_DEBUG,"Output: Content-Type = '%s'",a_http_client->out_content_type); } if ( a_http_client->out_content_length ) { dap_snprintf(buf,sizeof(buf),"%zu",a_http_client->out_content_length); diff --git a/dap-sdk/net/stream/stream/dap_stream_pkt.c b/dap-sdk/net/stream/stream/dap_stream_pkt.c index d0b5efa8aafe23f248c1bdf10bdee81ef9cedffb..38d14061d3e072b87f5908a7f0596b20f4b059f1 100644 --- a/dap-sdk/net/stream/stream/dap_stream_pkt.c +++ b/dap-sdk/net/stream/stream/dap_stream_pkt.c @@ -76,7 +76,7 @@ dap_stream_pkt_t * dap_stream_pkt_detect(void * a_data, size_t data_size) if(memcmp(sig_start,c_dap_stream_sig,sizeof(c_dap_stream_sig))==0){ ret = (dap_stream_pkt_t *)sig_start; if (length_left < sizeof(dap_stream_ch_pkt_hdr_t)) { - log_it(L_ERROR, "Too small packet size %zu", length_left); + //log_it(L_ERROR, "Too small packet size %zu", length_left); // it's not an error, just random case ret = NULL; break; } diff --git a/modules/global-db/dap_chain_global_db.c b/modules/global-db/dap_chain_global_db.c index 6490b4b796ce545cad1e02b15662e106bd5bdef6..ffd4da5e97f676c9e707edb4dfc3a13691e144ba 100644 --- a/modules/global-db/dap_chain_global_db.c +++ b/modules/global-db/dap_chain_global_db.c @@ -332,13 +332,9 @@ uint8_t * dap_chain_global_db_get(const char *a_key, size_t *a_data_out) */ static bool global_db_gr_del_add(char *a_key,const char *a_group, time_t a_timestamp) { - dap_store_obj_t store_data; - memset(&store_data, 0, sizeof(dap_store_obj_t)); + dap_store_obj_t store_data = {}; store_data.type = 'a'; store_data.key = a_key; - // no data - store_data.value = NULL; - store_data.value_len = 0; // group = parent group + '.del' store_data.group = dap_strdup_printf("%s.del", a_group); store_data.timestamp = a_timestamp; @@ -354,6 +350,7 @@ static bool global_db_gr_del_add(char *a_key,const char *a_group, time_t a_times } + /** * @brief Deletes info about the deleted object from the database * @param a_key an object key string, looked like "0x8FAFBD00B..." @@ -627,7 +624,7 @@ bool dap_chain_global_db_gr_del(char *a_key,const char *a_group) unlock(); if(l_res >= 0) { // add to Del group - global_db_gr_del_add(dap_strdup(a_key), store_data.group, store_data.timestamp); + global_db_gr_del_add(dap_strdup(a_key), store_data.group, time(NULL)); } // do not add to history if l_res=1 (already deleted) if (!l_res) { diff --git a/modules/global-db/dap_chain_global_db_driver.c b/modules/global-db/dap_chain_global_db_driver.c index 80c1278beec52163c67a24ca39082f99f44ee7f2..d1cc0e45ebf466e78052f4ba4cecdfd1e81703f7 100644 --- a/modules/global-db/dap_chain_global_db_driver.c +++ b/modules/global-db/dap_chain_global_db_driver.c @@ -450,17 +450,19 @@ int dap_chain_global_db_driver_appy(pdap_store_obj_t a_store_obj, size_t a_store for(size_t i = 0; i < a_store_count; i++) { dap_store_obj_t *l_store_obj_cur = a_store_obj + i; assert(l_store_obj_cur); + char *l_cur_key = dap_strdup(l_store_obj_cur->key); int l_ret_tmp = s_drv_callback.apply_store_obj(l_store_obj_cur); if(l_ret_tmp == 1) { - log_it(L_INFO, "item is missing (may be already deleted) %s/%s\n", l_store_obj_cur->group, l_store_obj_cur->key); + log_it(L_INFO, "Item is missing (may be already deleted) %s/%s\n", l_store_obj_cur->group, l_cur_key); l_ret = 1; - break; } if(l_ret_tmp < 0) { - log_it(L_ERROR, "Can't write item %s/%s (code %d)\n", l_store_obj_cur->group, l_store_obj_cur->key, l_ret_tmp); + log_it(L_ERROR, "Can't write item %s/%s (code %d)\n", l_store_obj_cur->group, l_cur_key, l_ret_tmp); l_ret -= 1; - break; } + DAP_DELETE(l_cur_key); + if (l_ret) + break; } if(a_store_count > 1 && s_drv_callback.transaction_end) @@ -518,7 +520,7 @@ size_t dap_chain_global_db_driver_count(const char *a_group, uint64_t id) * @param a_group_mask the group mask string * @return If successful, returns the list of group names, otherwise NULL. */ -dap_list_t* dap_chain_global_db_driver_get_groups_by_mask(const char *a_group_mask) +dap_list_t *dap_chain_global_db_driver_get_groups_by_mask(const char *a_group_mask) { dap_list_t *l_list = NULL; if(s_drv_callback.get_groups_by_mask) diff --git a/modules/global-db/dap_chain_global_db_driver_cdb.c b/modules/global-db/dap_chain_global_db_driver_cdb.c index 5fc9ac4ebaada90577c6c7b68cccf6b7562e1207..61b55dfa5ad4fc705c19bcece744e6eef0417123 100644 --- a/modules/global-db/dap_chain_global_db_driver_cdb.c +++ b/modules/global-db/dap_chain_global_db_driver_cdb.c @@ -587,6 +587,13 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) { if(!a_store_obj->key) { return -2; } + /*if (dap_fnmatch("*.del", a_store_obj->group, 0)) { + char *l_del_group = dap_strdup_printf("%s.del", a_store_obj->group); + pcdb_instance l_cdb_d = dap_cdb_get_db_by_group(l_del_group); + if (l_cdb_d) { + l_cdb_i->id = max(l_cdb_d->id, l_cdb_i->id); + } + }*/ cdb_record l_rec; l_rec.key = a_store_obj->key; //dap_strdup(a_store_obj->key); int offset = 0; diff --git a/modules/global-db/dap_chain_global_db_driver_sqlite.c b/modules/global-db/dap_chain_global_db_driver_sqlite.c index 57ab6c1758cf566a439b746d93a3fb0ea00cd46a..a853e58f93e1ab269ccc2486062191fa747ed1d4 100644 --- a/modules/global-db/dap_chain_global_db_driver_sqlite.c +++ b/modules/global-db/dap_chain_global_db_driver_sqlite.c @@ -191,14 +191,14 @@ sqlite3* dap_db_driver_sqlite_open(const char *a_filename_utf8, int a_flags, cha { sqlite3 *l_db = NULL; - int l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_NOMUTEX, NULL); + int l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_FULLMUTEX, NULL); // if unable to open the database file if(l_rc == SQLITE_CANTOPEN) { log_it(L_WARNING,"No database on path %s, creating one from scratch", a_filename_utf8); if(l_db) sqlite3_close(l_db); // try to create database - l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_NOMUTEX| SQLITE_OPEN_CREATE, NULL); + l_rc = sqlite3_open_v2(a_filename_utf8, &l_db, a_flags | SQLITE_OPEN_FULLMUTEX| SQLITE_OPEN_CREATE, NULL); } if(l_rc != SQLITE_OK) { @@ -608,16 +608,11 @@ int dap_db_driver_sqlite_apply_store_obj(dap_store_obj_t *a_store_obj) if(a_store_obj->type == 'a') { if(!a_store_obj->key) return -1; - //dap_chain_hash_fast_t l_hash; - //dap_hash_fast(a_store_obj->value, a_store_obj->value_len, &l_hash); - - char *l_blob_hash = "";//dap_db_driver_get_string_from_blob((uint8_t*) &l_hash, sizeof(dap_chain_hash_fast_t)); char *l_blob_value = dap_db_driver_get_string_from_blob(a_store_obj->value, (int)a_store_obj->value_len); DAP_DEL_Z(a_store_obj->value); //add one record - l_query = sqlite3_mprintf("insert into '%s' values(NULL, '%s', x'%s', '%lld', x'%s')", - l_table_name, a_store_obj->key, l_blob_hash, a_store_obj->timestamp, l_blob_value); - //dap_db_driver_sqlite_free(l_blob_hash); + l_query = sqlite3_mprintf("insert into '%s' values(NULL, '%s', x'', '%lld', x'%s')", + l_table_name, a_store_obj->key, a_store_obj->timestamp, l_blob_value); dap_db_driver_sqlite_free(l_blob_value); } else if (a_store_obj->type == 'd') { diff --git a/modules/global-db/dap_chain_global_db_hist.c b/modules/global-db/dap_chain_global_db_hist.c index eeb9b2b7a694014cf250746aab013acbca0d4a19..83f10d0d8495e9d8c149ac88815e5c5d2e01864c 100644 --- a/modules/global-db/dap_chain_global_db_hist.c +++ b/modules/global-db/dap_chain_global_db_hist.c @@ -12,7 +12,7 @@ #include "dap_chain_global_db_hist.h" #include "uthash.h" -//#define GDB_SYNC_ALWAYS_FROM_ZERO +#define GDB_SYNC_ALWAYS_FROM_ZERO // for dap_db_history() typedef struct dap_tx_data{ diff --git a/modules/modules_dynamic/cdb/dap_modules_dynamic_cdb.c b/modules/modules_dynamic/cdb/dap_modules_dynamic_cdb.c index 4b234c34f7e77afbf603d3fa7902f401acaae6e0..58b109c07afba78cee108de120f4e342b93a7e45 100644 --- a/modules/modules_dynamic/cdb/dap_modules_dynamic_cdb.c +++ b/modules/modules_dynamic/cdb/dap_modules_dynamic_cdb.c @@ -33,6 +33,7 @@ static const char * s_default_path_modules = "var/modules"; static void *s_cdb_handle = NULL; +static bool s_cdb_was_init = false; void dap_modules_dynamic_close_cdb() { @@ -40,10 +41,13 @@ void dap_modules_dynamic_close_cdb() dlclose(s_cdb_handle); s_cdb_handle = NULL; } + s_cdb_was_init = false; } void *dap_modules_dynamic_get_cdb_func(const char *a_func_name) { + if (!s_cdb_was_init) + return NULL; char l_lib_path[MAX_PATH] = {'\0'}; void *l_ref_func = NULL; // find func from dynamic library @@ -80,5 +84,6 @@ int dap_modules_dynamic_load_cdb(dap_http_t * a_server) log_it(L_ERROR,"dap_modules_dynamic: dap_chain_net_srv_vpn_cdb_init returns %d", l_init_res); return -3; } + s_cdb_was_init = true; return 0; } diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 86efb4d7cf60e990f50c8bd898d59b2b15c5e3fa..964614e0c2f1d309396534aa390e5764d7588677 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -268,17 +268,17 @@ int dap_chain_net_init() dap_chain_node_cli_cmd_item_create ("net", s_cli_net, "Network commands", "net list [chains -n <chain net name>]" "\tList all networks or list all chains in selected network" - "net -net <chain net name> [-mode update|all] go < online | offline >\n" + "net -net <chain net name> [-mode {update | all}] go {online | offline | sync}\n" "\tFind and establish links and stay online. \n" "\tMode \"update\" is by default when only new chains and gdb are updated. Mode \"all\" updates everything from zero\n" "net -net <chain net name> get status\n" "\tLook at current status\n" "net -net <chain net name> stats tx [-from <From time>] [-to <To time>] [-prev_sec <Seconds>] \n" "\tTransactions statistics. Time format is <Year>-<Month>-<Day>_<Hours>:<Minutes>:<Seconds> or just <Seconds> \n" - "net -net <chain net name> [-mode update|all] sync < all | gdb | chains >\n" + "net -net <chain net name> [-mode {update | all}] sync {all | gdb | chains}\n" "\tSyncronyze gdb, chains or everything\n" "\tMode \"update\" is by default when only new chains and gdb are updated. Mode \"all\" updates everything from zero\n" - "net -net <chain net name> link < list | add | del | info | establish >\n" + "net -net <chain net name> link {list | add | del | info | establish}\n" "\tList, add, del, dump or establish links\n" "net -net <chain net name> ca add {-cert <cert name> | -hash <cert hash>}\n" "\tAdd certificate to list of authority cetificates in GDB group\n" @@ -353,7 +353,7 @@ int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_n pthread_mutex_lock( &PVT(a_net)->state_mutex_cond); // Preventing call of state_go_to before wait cond will be armed // set flag for sync PVT(a_net)->flags |= F_DAP_CHAIN_NET_GO_SYNC; - //PVT(a_net)->flags |= F_DAP_CHAIN_NET_SYNC_FROM_ZERO; + //PVT(a_net)->flags |= F_DAP_CHAIN_NET_SYNC_FROM_ZERO; // TODO set this flag according to -mode argument from command line #ifndef _WIN32 pthread_cond_signal( &PVT(a_net)->state_proc_cond ); #else @@ -364,6 +364,11 @@ int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_n return 0; } +dap_chain_net_state_t dap_chain_net_get_target_state(dap_chain_net_t *a_net) +{ + return PVT(a_net)->state_target; +} + /** * @brief set s_srv_callback_notify * @@ -824,7 +829,6 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) assert(l_net_pvt); if (l_net_pvt->state_target == NET_STATE_OFFLINE) { l_net_pvt->state = NET_STATE_OFFLINE; - return true; } pthread_rwlock_wrlock(&l_net_pvt->rwlock); @@ -1236,7 +1240,7 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) l_net = dap_chain_net_by_name(l_net_str); if (l_net){ - dap_string_append(l_string_ret,"Chains:\n "); + dap_string_append(l_string_ret,"Chains:\n"); dap_chain_t * l_chain = l_net->pub.chains; while (l_chain) { dap_string_append_printf(l_string_ret, "\t%s:\n", l_chain->name ); @@ -1245,7 +1249,7 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) }else{ dap_chain_net_item_t * l_net_item, *l_net_item_tmp; int l_net_i = 0; - dap_string_append(l_string_ret,"Networks:\n "); + dap_string_append(l_string_ret,"Networks:\n"); HASH_ITER(hh, s_net_items, l_net_item, l_net_item_tmp){ l_net = l_net_item->chain_net; dap_string_append_printf(l_string_ret, "\t%s:\n", l_net_item->name); @@ -1379,7 +1383,7 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) else if(strcmp(l_go_str, "sync") == 0) { dap_chain_node_cli_set_reply_text(a_str_reply, "Network \"%s\" resynchronizing", l_net->pub.name); - dap_chain_net_state_go_to(l_net, NET_STATE_SYNC_GDB); + dap_chain_net_state_go_to(l_net, NET_STATE_SYNC_CHAINS); } } else if ( l_get_str){ @@ -1458,6 +1462,7 @@ static int s_cli_net(int argc, char **argv, char **a_str_reply) dap_chain_node_cli_set_reply_text(a_str_reply, "SYNC_CHAINS state requested to state machine. Current state: %s\n", c_net_states[ PVT(l_net)->state] ); + // TODO set PVT flag to exclude GDB sync dap_chain_net_sync_chains(l_net); } else { diff --git a/modules/net/dap_chain_node.c b/modules/net/dap_chain_node.c index c4f73594b6b4d1e1a59d7f398fb2c398b29eab27..e6ec10bb3fc89e52d43cdd62c25f0680633f2c80 100644 --- a/modules/net/dap_chain_node.c +++ b/modules/net/dap_chain_node.c @@ -225,7 +225,7 @@ dap_chain_node_info_t* dap_chain_node_info_read( dap_chain_net_t * a_net,dap_cha return node_info; }*/ -bool dap_chain_node_mempool_process(dap_chain_t *a_chain, dap_chain_node_role_t a_role, dap_chain_datum_t *a_datum) +int dap_chain_node_mempool_process(dap_chain_t *a_chain, dap_chain_node_role_t a_role, dap_chain_datum_t *a_datum) { bool l_need_process = false; for (uint16_t j = 0; j < a_chain->autoproc_datum_types_count; j++) { @@ -235,35 +235,31 @@ bool dap_chain_node_mempool_process(dap_chain_t *a_chain, dap_chain_node_role_t } } if (!l_need_process) - return false; + return -1; if (a_datum->header.type_id == DAP_CHAIN_DATUM_TX) { dap_chain_datum_tx_t *l_tx = (dap_chain_datum_tx_t *)a_datum->data; dap_chain_tx_in_t *l_tx_in = (dap_chain_tx_in_t *)dap_chain_datum_tx_item_get(l_tx, NULL, TX_ITEM_TYPE_IN, NULL); // Is not it a base transaction? if (l_tx_in && !dap_hash_fast_is_blank(&l_tx_in->header.tx_prev_hash)) { if (a_role.enums == NODE_ROLE_ROOT) { - return false; + return -1; } } } - if (a_chain->callback_add_datums(a_chain, &a_datum, 1) != 1) { - return false; - } - return true; + return (int)a_chain->callback_add_datums(a_chain, &a_datum, 1); } -static bool s_mempool_auto = false; /** - * @brief + * @brief * get automatic mempool processing, when network config contains mempool_auto_types for specific datums - * @return true - * @return false + * @return true + * @return false */ bool dap_chain_node_mempool_autoproc_init() { uint16_t l_net_count; - bool l_mempool_auto_default = false; + bool l_mempool_auto_default = false, l_ret = false; dap_chain_net_t **l_net_list = dap_chain_net_list(&l_net_count); for (uint16_t i = 0; i < l_net_count; i++) { dap_chain_node_role_t l_role = dap_chain_net_get_role(l_net_list[i]); @@ -276,10 +272,12 @@ bool dap_chain_node_mempool_autoproc_init() l_mempool_auto_default = true; break; default: + l_mempool_auto_default = false; break; } - s_mempool_auto = dap_config_get_item_bool_default(g_config, "mempool", "auto_proc", l_mempool_auto_default); - if (s_mempool_auto) { + l_net_list[i]->pub.mempool_autoproc = dap_config_get_item_bool_default(g_config, "mempool", "auto_proc", l_mempool_auto_default); + if (l_net_list[i]->pub.mempool_autoproc) { + l_ret = true; dap_chain_t *l_chain; DL_FOREACH(l_net_list[i]->pub.chains, l_chain) { if (!l_chain) { @@ -291,12 +289,13 @@ bool dap_chain_node_mempool_autoproc_init() dap_global_db_obj_t *l_objs = dap_chain_global_db_gr_load(l_gdb_group_mempool, &l_objs_size); if (l_objs_size) { for (size_t i = 0; i < l_objs_size; i++) { - // Delete processed objects - dap_chain_global_db_gr_del(dap_strdup(l_objs[i].key), l_gdb_group_mempool); if (!l_objs[i].value_len) continue; dap_chain_datum_t *l_datum = (dap_chain_datum_t *)l_objs[i].value; - dap_chain_node_mempool_process(l_chain, l_role, l_datum); + if (dap_chain_node_mempool_process(l_chain, l_role, l_datum) >= 0) { + // Delete processed objects + dap_chain_global_db_gr_del(dap_strdup(l_objs[i].key), l_gdb_group_mempool); + } } dap_chain_global_db_objs_delete(l_objs, l_objs_size); } @@ -305,7 +304,7 @@ bool dap_chain_node_mempool_autoproc_init() } } DAP_DELETE(l_net_list); - return s_mempool_auto; + return l_ret; } /** @@ -313,17 +312,18 @@ bool dap_chain_node_mempool_autoproc_init() */ void dap_chain_node_mempool_autoproc_deinit() { - s_mempool_auto = false; } void dap_chain_node_mempool_autoproc_notify(void *a_arg, const char a_op_code, const char *a_group, const char *a_key, const void *a_value, const size_t a_value_len) { UNUSED(a_value_len); - if (!a_arg || !a_value || !s_mempool_auto || a_op_code != 'a') { + if (!a_arg || !a_value || a_op_code != 'a') { return; } dap_chain_net_t *l_net = (dap_chain_net_t *)a_arg; + if (!l_net->pub.mempool_autoproc) + return; dap_chain_t *l_chain; DL_FOREACH(l_net->pub.chains, l_chain) { if (!l_chain) { @@ -333,7 +333,7 @@ void dap_chain_node_mempool_autoproc_notify(void *a_arg, const char a_op_code, c if (!strcmp(a_group, l_gdb_group_str)) { dap_chain_datum_t *l_datum = (dap_chain_datum_t *)a_value; dap_chain_node_role_t l_role = dap_chain_net_get_role(l_net); - if (dap_chain_node_mempool_process(l_chain, l_role, l_datum)) { + if (dap_chain_node_mempool_process(l_chain, l_role, l_datum) >= 0) { dap_chain_global_db_gr_del(dap_strdup(a_key), l_gdb_group_str); } } diff --git a/modules/net/dap_chain_node_cli_cmd.c b/modules/net/dap_chain_node_cli_cmd.c index 6d4c6e5e2cfe877c841ab63a6f5caabc17565581..26d7be50f46ca48c1bf649d5e98c34c2a8baa237 100644 --- a/modules/net/dap_chain_node_cli_cmd.c +++ b/modules/net/dap_chain_node_cli_cmd.c @@ -4016,12 +4016,12 @@ int com_tx_history(int a_argc, char ** a_argv, char **a_str_reply) char *l_str_ret = NULL; if(l_tx_hash_str) { - l_str_ret = dap_strdup_printf("history for tx hash %s:\n%s", l_tx_hash_str, + l_str_ret = dap_strdup_printf("History for tx hash %s:\n%s", l_tx_hash_str, l_str_out ? l_str_out : " empty"); } else if(l_addr) { char *l_addr_str = dap_chain_addr_to_str(l_addr); - l_str_ret = dap_strdup_printf("history for addr %s:\n%s", l_addr_str, + l_str_ret = dap_strdup_printf("History for addr %s:\n%s", l_addr_str, l_str_out ? l_str_out : " empty"); DAP_DELETE(l_addr_str); } @@ -4171,54 +4171,35 @@ int cmd_gdb_export(int argc, char ** argv, char ** a_str_reply) return -1; } const char *l_db_path = dap_config_get_item_str(g_config, "resources", "dap_global_db_path"); - - // NB! [TEMPFIX] Temporarily backward-compatible until migration to new databases locations (after updates) - const char *l_db_driver = dap_config_get_item_str(g_config, "resources", "dap_global_db_driver"); - char l_db_concat[80]; - dap_sprintf(l_db_concat, "%s/gdb-%s", l_db_path, l_db_driver); - - struct dirent *d; - DIR *dir = opendir(l_db_concat); + DIR *dir = opendir(l_db_path); if (!dir) { - // External "if" to check out old or new path. - log_it(L_WARNING, "Probably db directory is in old path. Checking out."); - dir = opendir(l_db_path); - if (!dir) { - log_it(L_ERROR, "Can't open db directory"); - dap_chain_node_cli_set_reply_text(a_str_reply, "Can't open db directory"); - return -1; - } + log_it(L_ERROR, "Can't open db directory"); + dap_chain_node_cli_set_reply_text(a_str_reply, "Can't open db directory"); + return -1; } char l_path[strlen(l_db_path) + strlen(l_filename) + 12]; memset(l_path, '\0', sizeof(l_path)); dap_snprintf(l_path, sizeof(l_path), "%s/%s.json", l_db_path, l_filename); - /*FILE *l_json_file = fopen(l_path, "a"); - if (!l_json_file) { - log_it(L_ERROR, "Can't open file %s", l_path); - dap_chain_node_cli_set_reply_text(a_str_reply, "Can't open specified file"); - return -1; - }*/ + struct json_object *l_json = json_object_new_array(); - for (d = readdir(dir); d; d = readdir(dir)) { - if (!dap_strcmp(d->d_name, ".") || !dap_strcmp(d->d_name, "..")) { - continue; - } + dap_list_t *l_groups_list = dap_chain_global_db_driver_get_groups_by_mask("*"); + for (dap_list_t *l_list = l_groups_list; l_list; l_list = dap_list_next(l_list)) { size_t l_data_size = 0; - pdap_store_obj_t l_data = dap_chain_global_db_obj_gr_get(NULL, &l_data_size, d->d_name); - log_it(L_INFO, "Exporting group %s, number of records: %zu", d->d_name, l_data_size); + char *l_group_name = (char *)l_list->data; + pdap_store_obj_t l_data = dap_chain_global_db_obj_gr_get(NULL, &l_data_size, l_group_name); + log_it(L_INFO, "Exporting group %s, number of records: %zu", l_group_name, l_data_size); if (!l_data_size) { continue; } struct json_object *l_json_group = json_object_new_array(); struct json_object *l_json_group_inner = json_object_new_object(); - json_object_object_add(l_json_group_inner, "group", json_object_new_string(d->d_name)); + json_object_object_add(l_json_group_inner, "group", json_object_new_string(l_group_name)); for (size_t i = 0; i < l_data_size; ++i) { size_t l_out_size = DAP_ENC_BASE64_ENCODE_SIZE((int64_t)l_data[i].value_len) + 1; char *l_value_enc_str = DAP_NEW_Z_SIZE(char, l_out_size); - //size_t l_enc_size = dap_enc_base64_encode(l_data[i].value, l_data[i].value_len, l_value_enc_str, DAP_ENC_DATA_TYPE_B64); - + dap_enc_base64_encode(l_data[i].value, l_data[i].value_len, l_value_enc_str, DAP_ENC_DATA_TYPE_B64); struct json_object *jobj = json_object_new_object(); json_object_object_add(jobj, "id", json_object_new_int64((int64_t)l_data[i].id)); json_object_object_add(jobj, "key", json_object_new_string(l_data[i].key)); @@ -4233,6 +4214,7 @@ int cmd_gdb_export(int argc, char ** argv, char ** a_str_reply) json_object_array_add(l_json, l_json_group_inner); dap_store_obj_free(l_data, l_data_size); } + dap_list_free_full(l_groups_list, free); if (json_object_to_file(l_path, l_json) == -1) { #if JSON_C_MINOR_VERSION<15 log_it(L_CRITICAL, "Couldn't export JSON to file, error code %d", errno ); @@ -4307,9 +4289,9 @@ int cmd_gdb_import(int argc, char ** argv, char ** a_str_reply) l_group_store[j].timestamp = json_object_get_int64(l_ts); l_group_store[j].value_len = (uint64_t)json_object_get_int64(l_value_len); l_group_store[j].type = 'a'; - //const char *l_value_str = json_object_get_string(l_value); + const char *l_value_str = json_object_get_string(l_value); char *l_val = DAP_NEW_Z_SIZE(char, l_group_store[j].value_len); - //size_t l_dec_size = dap_enc_base64_decode(l_value_str, strlen(l_value_str), l_val, DAP_ENC_DATA_TYPE_B64); + dap_enc_base64_decode(l_value_str, strlen(l_value_str), l_val, DAP_ENC_DATA_TYPE_B64); l_group_store[j].value = (uint8_t*)l_val; } if (dap_chain_global_db_driver_appy(l_group_store, l_records_count)) { diff --git a/modules/net/dap_chain_node_cli_cmd_tx.c b/modules/net/dap_chain_node_cli_cmd_tx.c index c1e0c4884a5b670d36833206e1e0f580e551c217..b2e69b18ba1a486b287f9fa4006f80fb4882e373 100644 --- a/modules/net/dap_chain_node_cli_cmd_tx.c +++ b/modules/net/dap_chain_node_cli_cmd_tx.c @@ -46,14 +46,12 @@ // for dap_db_history_filter() typedef struct dap_tx_data { dap_chain_hash_fast_t tx_hash; - char tx_hash_str[70]; char token_ticker[DAP_CHAIN_TICKER_SIZE_MAX]; - //size_t obj_num; - size_t pos_num; dap_chain_datum_t *datum; - dap_chain_addr_t addr; - bool is_use_all_cur_out;// find cur addr in prev OUT items UT_hash_handle hh; + //useless + char tx_hash_str[70]; + dap_chain_addr_t addr; } dap_tx_data_t; typedef struct dap_chain_tx_hash_processed_ht{ @@ -611,372 +609,134 @@ char* dap_db_history_addr(dap_chain_addr_t * a_addr, dap_chain_t * a_chain, cons if (!l_atom) { return NULL; } - - while(l_atom && l_atom_size) { - size_t l_datums_count =0; + while (l_atom && l_atom_size) { + size_t l_datums_count = 0; dap_chain_datum_t **l_datums = a_chain->callback_atom_get_datums ? a_chain->callback_atom_get_datums(l_atom, l_atom_size, &l_datums_count) : NULL; - if (! l_datums){ - log_it(L_WARNING,"Not defined callback_atom_get_datums for chain \"%s\"", a_chain->name); + if (!l_datums) { + log_it(L_WARNING, "Not defined callback_atom_get_datums for chain \"%s\"", a_chain->name); break; } - for (size_t d=0; d< l_datums_count; d++){ - dap_chain_datum_t *l_datum = l_datums && l_datums_count ? l_datums[d] :NULL; - if(!l_datum || l_datum->header.type_id != DAP_CHAIN_DATUM_TX) { + for (size_t d = 0; d < l_datums_count; d++) { + dap_chain_datum_t *l_datum = l_datums && l_datums_count ? l_datums[d] : NULL; + if (!l_datum || l_datum->header.type_id != DAP_CHAIN_DATUM_TX) { // go to next transaction - l_atom = a_chain->callback_atom_iter_get_next(l_atom_iter, &l_atom_size); continue; } + dap_tx_data_t *l_tx_data = DAP_NEW_Z(dap_tx_data_t); // transaction - dap_chain_datum_tx_t *l_tx = (dap_chain_datum_tx_t*) l_datum->data; - dap_list_t *l_records_out = NULL; + dap_chain_datum_tx_t *l_tx = (dap_chain_datum_tx_t *)l_datum->data; + + // find Token items - present in base transaction + dap_list_t *l_list_tx_token = dap_chain_datum_tx_items_get(l_tx, TX_ITEM_TYPE_TOKEN, NULL); + // save token name + if (l_list_tx_token) { + strcpy(l_tx_data->token_ticker, + ((dap_chain_tx_token_t *)l_list_tx_token->data)->header.ticker); + dap_list_free(l_list_tx_token); + } + + dap_list_t *l_list_in_items = dap_chain_datum_tx_items_get(l_tx, TX_ITEM_TYPE_IN, NULL); + if (!l_list_in_items) { // a bad tx + DAP_DELETE(l_tx_data); + continue; + } + // all in items should be from the same address + dap_chain_tx_in_t *l_tx_in = (dap_chain_tx_in_t *)l_list_in_items->data; + dap_chain_hash_fast_t *l_tx_prev_hash = &l_tx_in->header.tx_prev_hash; + dap_chain_addr_t *l_src_addr = NULL; + char *l_src_addr_str = NULL; + if (!dap_hash_fast_is_blank(l_tx_prev_hash)) { + dap_tx_data_t *l_tx_data_prev = NULL; + HASH_FIND(hh, l_tx_data_hash, l_tx_prev_hash, sizeof(dap_chain_hash_fast_t), l_tx_data_prev); + if (!l_tx_data_prev) { // prev tx not found - no info for history + DAP_DELETE(l_tx_data); + continue; + } + strncpy(l_tx_data->token_ticker, l_tx_data_prev->token_ticker, DAP_CHAIN_TICKER_SIZE_MAX); + dap_chain_datum_tx_t *l_tx_prev = (dap_chain_datum_tx_t *)l_tx_data_prev->datum->data; + dap_list_t *l_list_prev_out_items = dap_chain_datum_tx_items_get(l_tx_prev, TX_ITEM_TYPE_OUT, NULL); + dap_list_t *l_list_out_prev_item = dap_list_nth(l_list_prev_out_items, l_tx_in->header.tx_out_prev_idx); + l_src_addr = &((dap_chain_tx_out_t *)l_list_out_prev_item->data)->addr; + l_src_addr_str = dap_chain_addr_to_str(l_src_addr); + dap_list_free(l_list_prev_out_items); + } + dap_list_free(l_list_in_items); + + dap_hash_fast(l_tx, dap_chain_datum_tx_get_size(l_tx), &l_tx_data->tx_hash); // transaction time char *l_time_str = NULL; { - if(l_tx->header.ts_created > 0) { - time_t rawtime = (time_t) l_tx->header.ts_created; - struct tm * timeinfo; + if (l_tx->header.ts_created > 0) { + time_t rawtime = (time_t)l_tx->header.ts_created; + struct tm *timeinfo; timeinfo = localtime(&rawtime); if(timeinfo) l_time_str = dap_strdup(asctime(timeinfo)); - } - else + } else l_time_str = dap_strdup(" "); } - - // find Token items - present in emit transaction - dap_list_t *l_list_tx_token = dap_chain_datum_tx_items_get(l_tx, TX_ITEM_TYPE_TOKEN, NULL); - - // list of dap_tx_data_t*; info about OUT item in current transaction - dap_list_t *l_list_out_info = NULL; - + char *l_tx_hash_str; + if (!dap_strcmp(a_hash_out_type, "hex")) + l_tx_hash_str = dap_hash_fast_to_str_new(&l_tx_data->tx_hash); + else + l_tx_hash_str = dap_enc_base58_encode_to_str(&l_tx_data->tx_hash_str, sizeof(dap_chain_hash_fast_t)); // find OUT items + bool l_header_printed = false; dap_list_t *l_list_out_items = dap_chain_datum_tx_items_get(l_tx, TX_ITEM_TYPE_OUT, NULL); - dap_list_t *l_list_out_items_tmp = l_list_out_items; - while(l_list_out_items_tmp) { - const dap_chain_tx_out_t *l_tx_out = (const dap_chain_tx_out_t*) l_list_out_items_tmp->data; - // save OUT item l_tx_out - { - // save tx hash - // info about OUT item in current transaction - dap_tx_data_t *l_tx_data = DAP_NEW_Z(dap_tx_data_t); - dap_chain_hash_fast_t l_tx_hash; - dap_hash_fast(l_tx, dap_chain_datum_tx_get_size(l_tx), &l_tx_hash); - memcpy(&l_tx_data->tx_hash, &l_tx_hash, sizeof(dap_chain_hash_fast_t)); - memcpy(&l_tx_data->addr, &l_tx_out->addr, sizeof(dap_chain_addr_t)); - dap_chain_hash_fast_to_str(&l_tx_data->tx_hash, l_tx_data->tx_hash_str, sizeof(l_tx_data->tx_hash_str)); - l_tx_data->datum = DAP_NEW_SIZE(dap_chain_datum_t, l_atom_size); - memcpy(l_tx_data->datum, l_datum, l_atom_size); - // save token name - if(l_tx_data && l_list_tx_token) { - dap_chain_tx_token_t *tk = l_list_tx_token->data; - memcpy(l_tx_data->token_ticker, tk->header.ticker, sizeof(l_tx_data->token_ticker)); - } - HASH_ADD(hh, l_tx_data_hash, tx_hash, sizeof(dap_chain_hash_fast_t), l_tx_data); - - // save OUT items to list - l_records_out = dap_list_append(l_records_out, (void*) l_tx_out); - // save info about OUT items to list - l_list_out_info = dap_list_append(l_list_out_info, (void*) l_tx_data); - } - l_list_out_items_tmp = dap_list_next(l_list_out_items_tmp); - } - - // find IN items - dap_list_t *l_list_in_items = dap_chain_datum_tx_items_get(l_tx, TX_ITEM_TYPE_IN, NULL); - dap_list_t *l_list_in_items_tmp = l_list_in_items; - // find cur addr in prev OUT items - //bool l_is_use_all_cur_out = false; - { - while(l_list_in_items_tmp) { - const dap_chain_tx_in_t *l_tx_in = (const dap_chain_tx_in_t*) l_list_in_items_tmp->data; - dap_chain_hash_fast_t tx_prev_hash = l_tx_in->header.tx_prev_hash; - - //find prev OUT item - dap_tx_data_t *l_tx_data_prev = NULL; - HASH_FIND(hh, l_tx_data_hash, &tx_prev_hash, sizeof(dap_chain_hash_fast_t), l_tx_data_prev); - if(l_tx_data_prev != NULL) { - // fill token in all l_tx_data from prev transaction - - dap_list_t *l_list_out_info_tmp = l_list_out_info; - while(l_list_out_info_tmp) { - dap_tx_data_t *l_tx_data = (dap_tx_data_t*) l_list_out_info_tmp->data; - if(l_tx_data) { - // get token from prev tx - memcpy(l_tx_data->token_ticker, l_tx_data_prev->token_ticker, - sizeof(l_tx_data->token_ticker)); - dap_chain_datum_t *l_datum_prev = get_prev_tx(l_tx_data_prev); - dap_chain_datum_tx_t *l_tx_prev = - l_datum_prev ? (dap_chain_datum_tx_t*) l_datum_prev->data : NULL; - - // find OUT items in prev datum - dap_list_t *l_list_out_prev_items = dap_chain_datum_tx_items_get(l_tx_prev, - TX_ITEM_TYPE_OUT, NULL); - // find OUT item for IN item; - dap_list_t *l_list_out_prev_item = dap_list_nth(l_list_out_prev_items, - l_tx_in->header.tx_out_prev_idx); - dap_chain_tx_out_t *l_tx_prev_out = - l_list_out_prev_item ? - (dap_chain_tx_out_t*) l_list_out_prev_item->data : - NULL; - if(l_tx_prev_out && !memcmp(&l_tx_prev_out->addr, a_addr, sizeof(dap_chain_addr_t))) - l_tx_data->is_use_all_cur_out = true; - - } - l_list_out_info_tmp = dap_list_next(l_list_out_info_tmp); - } + for (dap_list_t *l_list_out = l_list_out_items; l_list_out; l_list_out = dap_list_next(l_list_out)) { + dap_chain_tx_out_t *l_tx_out = (dap_chain_tx_out_t *)l_list_out->data; + if (l_src_addr && !memcmp(&l_tx_out->addr, l_src_addr, sizeof(dap_chain_addr_t))) + continue; // send to self + if (l_src_addr && !memcmp(l_src_addr, a_addr, sizeof(dap_chain_addr_t))) { + if (!l_header_printed) { + dap_string_append_printf(l_str_out, "TX hash %s\n\t%s", l_tx_hash_str, l_time_str); + l_header_printed = true; } - l_list_in_items_tmp = dap_list_next(l_list_in_items_tmp); + char *l_dst_addr_str = dap_chain_addr_to_str(&l_tx_out->addr); + dap_string_append_printf(l_str_out, "\tsend %"DAP_UINT64_FORMAT_U" %s to %s\n", + l_tx_out->header.value, + l_tx_data->token_ticker, + l_dst_addr_str); + DAP_DELETE(l_dst_addr_str); } - // find prev OUT items for IN items - dap_list_t *l_list_in_items2_tmp = l_list_in_items; // go to begin of list - while(l_list_in_items2_tmp) { - const dap_chain_tx_in_t *l_tx_in = (const dap_chain_tx_in_t*) l_list_in_items2_tmp->data; - dap_chain_hash_fast_t tx_prev_hash = l_tx_in->header.tx_prev_hash; - // if first transaction - empty prev OUT item - if(dap_hash_fast_is_blank(&tx_prev_hash)) { - - dap_tx_data_t *l_tx_data = NULL; - dap_list_t *l_list_out_info_tmp = l_list_out_info; - while(l_list_out_info_tmp) { - l_tx_data = (dap_tx_data_t*) l_list_out_info_tmp->data; - if(l_tx_data->token_ticker[0]) - break; - l_list_out_info_tmp = dap_list_next(l_list_out_info_tmp); - } - - // add emit info to ret string - if(l_tx_data && !memcmp(&l_tx_data->addr, a_addr, sizeof(dap_chain_addr_t))) { - dap_list_t *l_records_tmp = l_records_out; - while(l_records_tmp) { - char *tx_hash_str; - if(!dap_strcmp(a_hash_out_type,"hex")) - tx_hash_str = dap_strdup( l_tx_data->tx_hash_str); - else - tx_hash_str = dap_enc_base58_from_hex_str_to_str( l_tx_data->tx_hash_str); - const dap_chain_tx_out_t *l_tx_out = (const dap_chain_tx_out_t*) l_records_tmp->data; - - if(!dap_strcmp(a_hash_out_type,"hex")){ - dap_string_append_printf(l_str_out, "tx hash %s \n emit %"DAP_UINT64_FORMAT_U" %s\n", - tx_hash_str,//l_tx_data->tx_hash_str, - l_tx_out->header.value, - l_tx_data->token_ticker); - } - else { - dap_string_append_printf(l_str_out, "tx hash %s \n emit %"DAP_UINT64_FORMAT_U" %s\n", - l_tx_data->tx_hash_str, - l_tx_out->header.value, - l_tx_data->token_ticker); - } - DAP_DELETE(tx_hash_str); - l_records_tmp = dap_list_next(l_records_tmp); - } - } - //dap_list_free(l_records_out); + if (!memcmp(&l_tx_out->addr, a_addr, sizeof(dap_chain_addr_t))) { + if (!l_header_printed) { + dap_string_append_printf(l_str_out, "TX hash %s\n\t%s", l_tx_hash_str, l_time_str); + l_header_printed = true; } - // in other transactions except first one - else { - //find prev OUT item - dap_tx_data_t *l_tx_data_prev = NULL; - HASH_FIND(hh, l_tx_data_hash, &tx_prev_hash, sizeof(dap_chain_hash_fast_t), l_tx_data_prev); - if(l_tx_data_prev != NULL) { - char *l_src_str = NULL; - bool l_src_str_is_cur = false; - - dap_tx_data_t *l_tx_data = NULL; - dap_list_t *l_list_out_info_tmp = l_list_out_info; - while(l_list_out_info_tmp) { - l_tx_data = (dap_tx_data_t*) l_list_out_info_tmp->data; - if(l_tx_data->token_ticker[0]) - break; - l_list_out_info_tmp = dap_list_next(l_list_out_info_tmp); - } - if(l_tx_data) { - // get token from prev tx - memcpy(l_tx_data->token_ticker, l_tx_data_prev->token_ticker, - sizeof(l_tx_data->token_ticker)); - - dap_chain_datum_t *l_datum_prev = get_prev_tx(l_tx_data_prev); - dap_chain_datum_tx_t *l_tx_prev = - l_datum_prev ? (dap_chain_datum_tx_t*) l_datum_prev->data : NULL; - - // find OUT items in prev datum - dap_list_t *l_list_out_prev_items = dap_chain_datum_tx_items_get(l_tx_prev, - TX_ITEM_TYPE_OUT, NULL); - // find OUT item for IN item; - dap_list_t *l_list_out_prev_item = dap_list_nth(l_list_out_prev_items, - l_tx_in->header.tx_out_prev_idx); - dap_chain_tx_out_t *l_tx_prev_out = - l_list_out_prev_item ? - (dap_chain_tx_out_t*) l_list_out_prev_item->data : - NULL; - // if use src addr - bool l_is_use_src_addr = false; - // find source addrs - dap_string_t *l_src_addr = dap_string_new(NULL); - { - // find IN items in prev datum - for get destination addr - dap_list_t *l_list_in_prev_items = dap_chain_datum_tx_items_get(l_tx_prev, - TX_ITEM_TYPE_IN, NULL); - dap_list_t *l_list_tmp = l_list_in_prev_items; - while(l_list_tmp) { - dap_chain_tx_in_t *l_tx_prev_in = l_list_tmp->data; - dap_chain_hash_fast_t l_tx_prev_prev_hash = - l_tx_prev_in->header.tx_prev_hash; - //find prev OUT item - dap_tx_data_t *l_tx_data_prev_prev = NULL; - HASH_FIND(hh, l_tx_data_hash, &l_tx_prev_prev_hash, - sizeof(dap_chain_hash_fast_t), l_tx_data_prev_prev); - if(l_tx_data_prev_prev) { - // if use src addr - if(l_tx_data_prev_prev && - !memcmp(&l_tx_data_prev_prev->addr, a_addr, - sizeof(dap_chain_addr_t))) - l_is_use_src_addr = true; - char *l_str = dap_chain_addr_to_str(&l_tx_data_prev_prev->addr); - if(l_src_addr->len > 0) - dap_string_append_printf(l_src_addr, "\n %s", l_str); - else - dap_string_append_printf(l_src_addr, "%s", l_str); // first record - DAP_DELETE(l_str); - } - l_list_tmp = dap_list_next(l_list_tmp); - } - } - - l_src_str_is_cur = l_is_use_src_addr; - if(l_src_addr->len <= 1) { - l_src_str = - (l_tx_data) ? dap_chain_addr_to_str(&l_tx_data->addr) : - NULL; - if(l_tx_prev_out && !memcmp(&l_tx_prev_out->addr, a_addr, sizeof(dap_chain_addr_t))) - l_src_str_is_cur = true; - dap_string_free(l_src_addr, true); - } - else - l_src_str = dap_string_free(l_src_addr, false); - - if(l_tx_prev_out) { - char *l_dst_to_str = dap_chain_addr_to_str(&l_tx_prev_out->addr); - // if use dst addr - bool l_is_use_dst_addr = false; - if(!memcmp(&l_tx_prev_out->addr, a_addr, sizeof(dap_chain_addr_t))) - l_is_use_dst_addr = true; - char *tx_hash_str; - if(!dap_strcmp(a_hash_out_type, "hex")) - tx_hash_str = dap_strdup(l_tx_data->tx_hash_str); - else - tx_hash_str = dap_enc_base58_from_hex_str_to_str(l_tx_data->tx_hash_str); - if(l_is_use_src_addr && !l_is_use_dst_addr) { - dap_string_append_printf(l_str_out, - "tx hash %s \n %s in send %"DAP_UINT64_FORMAT_U" %s from %s\n to %s\n", - tx_hash_str,//l_tx_data->tx_hash_str, - l_time_str ? l_time_str : "", - l_tx_prev_out->header.value, - l_tx_data->token_ticker, - l_src_str ? l_src_str : "", - l_dst_to_str); - } else if(l_is_use_dst_addr && !l_is_use_src_addr) { - if(!l_src_str_is_cur) - dap_string_append_printf(l_str_out, - "tx hash %s \n %s in recv %"DAP_UINT64_FORMAT_U" %s from %s\n", - tx_hash_str,//l_tx_data->tx_hash_str, - l_time_str ? l_time_str : "", - l_tx_prev_out->header.value, - l_tx_data->token_ticker, - l_src_str ? l_src_str : ""); - } - DAP_DELETE(tx_hash_str); - DAP_DELETE(l_dst_to_str); - } - dap_list_free(l_list_out_prev_items); - } - - // OUT items - dap_list_t *l_records_tmp = l_records_out; - while(l_records_tmp) { - - const dap_chain_tx_out_t *l_tx_out = (const dap_chain_tx_out_t*) l_records_tmp->data; - - if(( l_tx_data && l_tx_data->is_use_all_cur_out ) - || !memcmp(&l_tx_out->addr, a_addr, sizeof(dap_chain_addr_t))) { - - char *l_addr_str = (l_tx_out) ? dap_chain_addr_to_str(&l_tx_out->addr) : NULL; - - char *tx_hash_str; - if(!dap_strcmp(a_hash_out_type, "hex")) - tx_hash_str = dap_strdup(l_tx_data->tx_hash_str); - else - tx_hash_str = dap_enc_base58_from_hex_str_to_str(l_tx_data->tx_hash_str); - if(l_tx_out && a_addr && memcmp(&l_tx_out->addr, a_addr, sizeof(dap_chain_addr_t))==0) { - if(!l_src_str_is_cur) - dap_string_append_printf(l_str_out, - "tx hash %s \n %s recv %"DAP_UINT64_FORMAT_U" %s from %s\n", - tx_hash_str,//l_tx_data->tx_hash_str, - l_time_str ? l_time_str : "", - l_tx_out->header.value, - l_tx_data_prev->token_ticker, - l_src_str ? l_src_str : "?"); - // break search prev OUT items for IN items - l_list_in_items2_tmp = NULL; - } - else { - dap_string_append_printf(l_str_out, - "tx hash %s \n %s send %"DAP_UINT64_FORMAT_U" %s to %s\n", - tx_hash_str,//l_tx_data->tx_hash_str, - l_time_str ? l_time_str : "", - l_tx_out->header.value, - l_tx_data_prev->token_ticker, - l_addr_str ? l_addr_str : ""); - l_list_in_items2_tmp = NULL; - } - DAP_DELETE(tx_hash_str); - DAP_DELETE(l_addr_str); - } - - l_records_tmp = dap_list_next(l_records_tmp); - } - //dap_list_free(l_records_out); - DAP_DELETE(l_src_str); - - } - } - l_list_in_items2_tmp = dap_list_next(l_list_in_items2_tmp); + dap_string_append_printf(l_str_out, "\trecv %"DAP_UINT64_FORMAT_U" %s from %s\n", + l_tx_out->header.value, + l_tx_data->token_ticker, + l_src_addr ? l_src_addr_str : "emission"); } - // l_list_in_items_tmp = dap_list_next(l_list_in_items_tmp); - // } } - - if(l_list_tx_token) - dap_list_free(l_list_tx_token); - if(l_list_out_items) - dap_list_free(l_list_out_items); - if(l_list_in_items) - dap_list_free(l_list_in_items); - dap_list_free(l_records_out); - dap_list_free(l_list_out_info); + dap_list_free(l_list_out_items); + DAP_DELETE(l_src_addr_str); + DAP_DELETE(l_tx_hash_str); DAP_DELETE(l_time_str); - // go to next transaction - l_atom = a_chain->callback_atom_iter_get_next(l_atom_iter, &l_atom_size); + size_t l_datum_size = dap_chain_datum_tx_get_size(l_tx) + sizeof(dap_chain_datum_t); + l_tx_data->datum = DAP_NEW_SIZE(dap_chain_datum_t, l_datum_size); + memcpy(l_tx_data->datum, l_datum, l_datum_size); // for GDB chains with data replace with each itreation + HASH_ADD(hh, l_tx_data_hash, tx_hash, sizeof(dap_chain_hash_fast_t), l_tx_data); } DAP_DELETE(l_datums); + // go to next atom (event or block) + l_atom = a_chain->callback_atom_iter_get_next(l_atom_iter, &l_atom_size); } // delete hashes dap_tx_data_t *l_iter_current, *l_item_tmp; - HASH_ITER(hh, l_tx_data_hash , l_iter_current, l_item_tmp) - { - // delete datum - DAP_DELETE(l_iter_current->datum); + HASH_ITER(hh, l_tx_data_hash , l_iter_current, l_item_tmp) { + HASH_DEL(l_tx_data_hash, l_iter_current); // delete struct DAP_DELETE(l_iter_current); - HASH_DEL(l_tx_data_hash, l_iter_current); } // if no history if(!l_str_out->len) - dap_string_append(l_str_out, " empty"); + dap_string_append(l_str_out, "\tempty"); char *l_ret_str = l_str_out ? dap_string_free(l_str_out, false) : NULL; return l_ret_str; } diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 344b0e06dc6d4031eb6f0ea06aa81e87beefa0fe..690f5b38cd0746b94dc5760655b2fd8160c3c1d5 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -404,11 +404,12 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha dap_chain_id_t l_chain_id = {}; dap_chain_cell_id_t l_cell_id = {}; if (a_pkt_type == DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB) { - if(s_stream_ch_chain_debug_more) - log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" synced GDB. Going to update chains", l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr )); - // TODO check if target net state == NET_STATE_SYNC_GDB to not synchronize chains, if it - l_node_client->cur_chain = l_net->pub.chains; - l_node_client->cur_cell = l_node_client->cur_chain ? l_node_client->cur_chain->cells : NULL; + if (dap_chain_net_get_target_state(l_net) != NET_STATE_SYNC_GDB) { + if(s_stream_ch_chain_debug_more) + log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" synced GDB. Going to update chains", l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr )); + l_node_client->cur_chain = l_net->pub.chains; + l_node_client->cur_cell = l_node_client->cur_chain ? l_node_client->cur_chain->cells : NULL; + } } else { // Check if we over with it before if ( ! l_node_client->cur_cell ){ @@ -451,7 +452,10 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha dap_chain_node_addr_t * l_node_addr = dap_chain_net_get_cur_addr(l_net); log_it(L_INFO, "In: State node %s."NODE_ADDR_FP_STR" is SYNCED",l_net->pub.name, NODE_ADDR_FP_ARGS(l_node_addr) ); l_node_client->state = NODE_CLIENT_STATE_SYNCED; - dap_chain_net_set_state(l_net, NET_STATE_ONLINE); + if (dap_chain_net_get_target_state(l_net) == NET_STATE_ONLINE) + dap_chain_net_set_state(l_net, NET_STATE_ONLINE); + else + dap_chain_net_state_go_to(l_net, NET_STATE_OFFLINE); #ifndef _WIN32 pthread_cond_broadcast(&l_node_client->wait_cond); #else diff --git a/modules/net/dap_chain_node_dns_server.c b/modules/net/dap_chain_node_dns_server.c index 66090c88cf0d36d692757926c689fa92d0c1f0da..3f933f7d7b99c509775186d41163d0f5d083b1fb 100644 --- a/modules/net/dap_chain_node_dns_server.c +++ b/modules/net/dap_chain_node_dns_server.c @@ -310,8 +310,8 @@ dap_chain_node_info_t *dap_dns_resolve_hostname(char *str) if (!l_nodes_count || !l_objs) return NULL; dap_chain_node_info_t *l_node_candidate; - for (int i = 0; i < 5; i++) { - // 5 tryes for non empty address & port + for (int i = 0; i < 50; i++) { + // 50 tryes for non empty address & port size_t l_node_num = rand() % l_nodes_count; l_node_candidate = (dap_chain_node_info_t *)l_objs[l_node_num].value; if (l_node_candidate->hdr.ext_addr_v4.s_addr && l_node_candidate->hdr.ext_port) diff --git a/modules/net/include/dap_chain_net.h b/modules/net/include/dap_chain_net.h index bb7fe912de58ebdae8c405072b70b057d41b271f..d955cef77b7891bc08f1d0a35c0e5a9714c54a34 100644 --- a/modules/net/include/dap_chain_net.h +++ b/modules/net/include/dap_chain_net.h @@ -75,6 +75,7 @@ typedef struct dap_chain_net{ // checks bool token_emission_signs_verify; + bool mempool_autoproc; dap_chain_t * chains; // double-linked list of chains dap_chain_t * default_chain; @@ -90,14 +91,14 @@ void dap_chain_net_deinit(void); void dap_chain_net_load_all(); int dap_chain_net_state_go_to(dap_chain_net_t * a_net, dap_chain_net_state_t a_new_state); +dap_chain_net_state_t dap_chain_net_get_target_state(dap_chain_net_t *a_net); inline static int dap_chain_net_start(dap_chain_net_t * a_net){ return dap_chain_net_state_go_to(a_net,NET_STATE_ONLINE); } inline static int dap_chain_net_stop(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_OFFLINE); } inline static int dap_chain_net_links_establish(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_LINKS_ESTABLISHED); } -inline static int dap_chain_net_sync_chains(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_SYNC_CHAINS); } inline static int dap_chain_net_sync_gdb(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_SYNC_GDB); } -inline static int dap_chain_net_sync_all(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_SYNC_CHAINS); }//NET_STATE_ONLINE - +inline static int dap_chain_net_sync_chains(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_SYNC_CHAINS); } +inline static int dap_chain_net_sync_all(dap_chain_net_t * a_net) { return dap_chain_net_state_go_to(a_net,NET_STATE_SYNC_CHAINS); } void dap_chain_net_set_state ( dap_chain_net_t * l_net, dap_chain_net_state_t a_state); dap_chain_net_state_t dap_chain_net_get_state ( dap_chain_net_t * l_net); diff --git a/modules/net/include/dap_chain_node.h b/modules/net/include/dap_chain_node.h index 216d6a4261c130a77397d44fc5eecd8cc75856af..6b6a810667a10eec933d95262cdc46dcc79d23a4 100644 --- a/modules/net/include/dap_chain_node.h +++ b/modules/net/include/dap_chain_node.h @@ -144,7 +144,7 @@ inline static char* dap_chain_node_addr_to_hash_str(dap_chain_node_addr_t *addre return a_key; } -bool dap_chain_node_mempool_process(dap_chain_t *a_chain, dap_chain_node_role_t a_role, dap_chain_datum_t *a_datum); +int dap_chain_node_mempool_process(dap_chain_t *a_chain, dap_chain_node_role_t a_role, dap_chain_datum_t *a_datum); bool dap_chain_node_mempool_autoproc_init(); void dap_chain_node_mempool_autoproc_deinit(); void dap_chain_node_mempool_autoproc_notify(void *a_arg, const char a_op_code, const char *a_group, diff --git a/modules/net/srv/dap_chain_net_srv.c b/modules/net/srv/dap_chain_net_srv.c index eb0be7492f210845ea4964a463cefa956d7ed227..4e8d8cc338ec19b1ca26029e07115538d5457f3c 100644 --- a/modules/net/srv/dap_chain_net_srv.c +++ b/modules/net/srv/dap_chain_net_srv.c @@ -535,11 +535,16 @@ static int s_cli_net_srv( int argc, char **argv, char **a_str_reply) } #ifdef DAP_MODULES_DYNAMIC else if( dap_strcmp( l_order_str, "recheck" ) == 0 ){ - //int dap_chain_net_srv_vpn_cdb_server_list_check_orders(dap_chain_net_t *a_net); int (*dap_chain_net_srv_vpn_cdb_server_list_check_orders)(dap_chain_net_t *a_net); dap_chain_net_srv_vpn_cdb_server_list_check_orders = dap_modules_dynamic_get_cdb_func("dap_chain_net_srv_vpn_cdb_server_list_check_orders"); int l_init_res = dap_chain_net_srv_vpn_cdb_server_list_check_orders ? dap_chain_net_srv_vpn_cdb_server_list_check_orders(l_net) : -5; - ret = (l_init_res > 0) ? 0 : -10; + if (l_init_res >= 0) { + dap_string_append_printf(l_string_ret, "Orders recheck started\n"); + ret = 0; + } else { + dap_string_append_printf(l_string_ret, "Orders recheck not started, code %d\n", l_init_res); + ret = -10; + } }else if( dap_strcmp( l_order_str, "static" ) == 0 ){ // find the subcommand directly after the 'order' command diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index 5c280e7e697c21e01d0aff944d401f2d0228166e..d963628e1eeeebbb93fe21b9a897c7cf0a858e9f 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -58,7 +58,7 @@ #include <time.h> #include <errno.h> #include <signal.h> - +#include <stddef.h> #include "uthash.h" #include "utlist.h" @@ -192,6 +192,7 @@ static void s_es_tun_new(dap_events_socket_t * a_es, void * arg); static void s_es_tun_delete(dap_events_socket_t * a_es, void * arg); static void s_es_tun_read(dap_events_socket_t * a_es, void * arg); static void s_es_tun_error(dap_events_socket_t * a_es,int arg); +static void s_es_tun_write(dap_events_socket_t *a_es, void *arg); static void s_tun_recv_msg_callback(dap_events_socket_t * a_esocket_queue, void * a_msg ); static void s_tun_send_msg_ip_assigned(uint32_t a_worker_id, dap_chain_net_srv_ch_vpn_t * a_ch_vpn, struct in_addr a_addr); @@ -204,9 +205,11 @@ static int s_tun_attach_queue(int fd); static bool s_tun_client_send_data(dap_chain_net_srv_ch_vpn_info_t * a_ch_vpn_info, const void * a_data, size_t a_data_size); -static size_t s_stream_session_esocket_send(dap_chain_net_srv_stream_session_t * l_srv_session, dap_events_socket_t * l_es, const void * a_data, size_t a_data_size ); static bool s_tun_client_send_data_unsafe(dap_chain_net_srv_ch_vpn_t * l_ch_vpn, ch_vpn_pkt_t * l_pkt_out); +static void s_tun_fifo_write(dap_chain_net_srv_vpn_tun_socket_t *a_tun, ch_vpn_pkt_t *a_pkt); +static ch_vpn_pkt_t *s_tun_fifo_read(dap_chain_net_srv_vpn_tun_socket_t *a_tun); + static bool s_tun_client_send_data_unsafe(dap_chain_net_srv_ch_vpn_t * l_ch_vpn, ch_vpn_pkt_t * l_pkt_out) { @@ -519,10 +522,12 @@ static dap_events_socket_t * s_tun_event_stream_create(dap_worker_t * a_worker, l_s_callbacks.read_callback = s_es_tun_read; l_s_callbacks.error_callback = s_es_tun_error; l_s_callbacks.delete_callback = s_es_tun_delete; + l_s_callbacks.write_callback = s_es_tun_write; dap_events_socket_t * l_es = dap_events_socket_wrap_no_add(a_worker->events , a_tun_fd, &l_s_callbacks); l_es->type = DESCRIPTOR_TYPE_FILE; + l_es->no_close = true; dap_events_socket_assign_on_worker_mt(l_es, a_worker); return l_es; @@ -1454,23 +1459,46 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) // for client only case VPN_PACKET_OP_CODE_VPN_RECV:{ a_ch->stream->esocket->last_ping_request = time(NULL); // not ping, but better ;-) - //ch_sf_tun_client_send(CH_VPN(a_ch), l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size); dap_events_socket_t *l_es = dap_chain_net_vpn_client_tun_get_esock(); // Find tun socket for current worker dap_chain_net_srv_vpn_tun_socket_t *l_tun = l_es ? l_es->_inheritor : NULL; //ch_sf_tun_socket_t * l_tun = s_tun_sockets[a_ch->stream_worker->worker->id]; assert(l_tun); - s_stream_session_esocket_send(l_srv_session, l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size); + size_t l_ret = dap_events_socket_write_unsafe(l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size); + if (l_ret == l_vpn_pkt->header.op_data.data_size) { + l_srv_session->stats.packets_sent++; + l_srv_session->stats.bytes_sent += l_ret; + } else if (l_ret > 0) { + log_it (L_WARNING, "Lost %zd bytes, buffer overflow", l_vpn_pkt->header.op_data.data_size - l_ret); + l_srv_session->stats.bytes_sent_lost += (l_vpn_pkt->header.op_data.data_size - l_ret); + l_srv_session->stats.packets_sent_lost++; + } } break; // for server only case VPN_PACKET_OP_CODE_VPN_SEND: { dap_chain_net_srv_vpn_tun_socket_t * l_tun = s_tun_sockets[a_ch->stream_worker->worker->id]; assert(l_tun); - // Unsafely send it - size_t l_ret = s_stream_session_esocket_send(l_srv_session, l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size); - if( l_ret) - s_update_limits (a_ch, l_srv_session, l_usage,l_ret ); + size_t l_size_to_send = l_vpn_pkt->header.op_data.data_size; + ssize_t l_ret = write(l_tun->es->fd, l_vpn_pkt->data, l_size_to_send); + if (l_ret > 0) { + s_update_limits(a_ch, l_srv_session, l_usage, l_ret); + if (l_ret == l_size_to_send) { + l_srv_session->stats.packets_sent++; + l_srv_session->stats.bytes_sent += l_ret; + } else { + log_it (L_WARNING, "Lost %zd bytes", l_size_to_send - l_ret); + l_srv_session->stats.bytes_sent_lost += (l_size_to_send - l_ret); + l_srv_session->stats.packets_sent_lost++; + } + } else if (errno == EAGAIN || errno == EWOULDBLOCK) { + s_tun_fifo_write(l_tun, l_vpn_pkt); + dap_events_socket_set_writable_unsafe(l_tun->es, true); + } else { + char l_errbuf[128]; + strerror_r(errno, l_errbuf, sizeof (l_errbuf)); + log_it(L_WARNING,"Error with data sent: \"%s\" code %d", l_errbuf, errno); + } } break; default: log_it(L_WARNING, "Can't process SF type 0x%02x", l_vpn_pkt->header.op_code); @@ -1478,59 +1506,6 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } } -/** - * @brief s_stream_session_esocket_send - * @param l_srv_session - * @param l_es - * @param a_data - * @param a_data_size - */ -static size_t s_stream_session_esocket_send(dap_chain_net_srv_stream_session_t * l_srv_session, dap_events_socket_t * l_es, const void * a_data, size_t a_data_size ) -{ - // Lets first try to send it directly with write() call - ssize_t l_direct_wrote; - size_t l_ret = 0; - if (l_es->type == DESCRIPTOR_TYPE_FILE ) - l_direct_wrote = write(l_es->fd, a_data, a_data_size); - else - l_direct_wrote = send(l_es->fd, a_data, a_data_size, MSG_DONTWAIT | MSG_NOSIGNAL); - int l_errno = errno; - - size_t l_data_left_to_send=0; - if (l_direct_wrote > 0){ - l_ret += l_direct_wrote; - if((size_t) l_direct_wrote < a_data_size){ // If we sent not all - lets put tail in buffer - l_data_left_to_send = a_data_size-l_direct_wrote; - }else{ - l_srv_session->stats.packets_sent++; - l_srv_session->stats.bytes_sent+= l_direct_wrote; - } - }else{ - l_data_left_to_send = a_data_size; - l_direct_wrote=0; - if(l_errno != EAGAIN && l_errno != EWOULDBLOCK){ - char l_errbuf[128]; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it(L_WARNING,"Error with data sent: \"%s\" code %d",l_errbuf, l_errno); - } - } - - if(l_data_left_to_send){ - //if ( dap_events_socket_write_unsafe( l_es, a_data +l_direct_wrote,l_data_left_to_send - // ) < l_data_left_to_send ){ - //log_it(L_WARNING,"Loosing data, probably buffers are overfilling, lost %zd bytes", l_data_left_to_send); - log_it(L_WARNING,"Loosing data, lost %zd bytes", l_data_left_to_send); - l_srv_session->stats.bytes_sent_lost += l_data_left_to_send; - l_srv_session->stats.packets_sent_lost++; - /*}else{ - l_ret += l_data_left_to_send; - l_srv_session->stats.packets_sent++; - l_srv_session->stats.bytes_sent+= l_direct_wrote; - }*/ - } - return l_ret; -} - /** * @brief stream_sf_packet_out Packet Out Ch callback * @param ch @@ -1572,6 +1547,25 @@ static void s_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) } +static void s_tun_fifo_write(dap_chain_net_srv_vpn_tun_socket_t *a_tun, ch_vpn_pkt_t *a_pkt) +{ + if (!a_tun || !a_pkt) + return; + a_tun->fifo = dap_list_append(a_tun->fifo, DAP_DUP_SIZE(a_pkt, + a_pkt->header.op_data.data_size + sizeof(a_pkt->header))); +} + +static ch_vpn_pkt_t *s_tun_fifo_read(dap_chain_net_srv_vpn_tun_socket_t *a_tun) +{ + if (!a_tun || !a_tun->fifo) + return NULL; + ch_vpn_pkt_t *l_ret = (ch_vpn_pkt_t *)a_tun->fifo->data; + dap_list_t *l_to_delete = a_tun->fifo; + a_tun->fifo = a_tun->fifo->next; + DAP_DELETE(l_to_delete); + return l_ret; +} + /** * @brief m_es_tun_delete * @param a_es @@ -1580,9 +1574,34 @@ static void s_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) static void s_es_tun_delete(dap_events_socket_t * a_es, void * arg) { (void) arg; - s_tun_sockets[a_es->worker->id] = NULL; - dap_events_socket_remove_and_delete_unsafe(s_tun_sockets_queue_msg[a_es->worker->id],false); - log_it(L_NOTICE,"Destroyed TUN event socket"); + if (a_es->worker) { + s_tun_sockets[a_es->worker->id] = NULL; + dap_events_socket_remove_and_delete_unsafe(s_tun_sockets_queue_msg[a_es->worker->id],false); + log_it(L_NOTICE,"Destroyed TUN event socket"); + } +} + +/** + * @brief s_es_tun_write + * @param a_es + * @param arg + */ +static void s_es_tun_write(dap_events_socket_t *a_es, void *arg) +{ + (void) arg; + dap_chain_net_srv_vpn_tun_socket_t *l_tun = CH_SF_TUN_SOCKET(a_es); + assert(l_tun); + ch_vpn_pkt_t *l_vpn_pkt = (ch_vpn_pkt_t *)l_tun->fifo->data; + if (!l_vpn_pkt) + return; + a_es->buf_out_zero_count = 0; + size_t l_size_to_send = l_vpn_pkt->header.op_data.data_size; + ssize_t l_ret = write(l_tun->es->fd, l_vpn_pkt->data, l_size_to_send); + if (l_ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + return; + } + s_tun_fifo_read(l_tun); + DAP_DELETE(l_vpn_pkt); } /** diff --git a/modules/service/vpn/include/dap_chain_net_srv_vpn.h b/modules/service/vpn/include/dap_chain_net_srv_vpn.h index 810330bb7b254f9dde8bb34a31a4b9db14fd1382..10e3edf36807e1b846f78b171efa06b8fff79520 100644 --- a/modules/service/vpn/include/dap_chain_net_srv_vpn.h +++ b/modules/service/vpn/include/dap_chain_net_srv_vpn.h @@ -112,6 +112,7 @@ typedef struct dap_chain_net_srv_vpn_tun_socket { dap_events_socket_t * es; dap_chain_net_srv_ch_vpn_info_t * clients; // Remote clients identified by destination address dap_events_socket_t ** queue_tun_msg_input; + dap_list_t *fifo; //UT_hash_handle hh; }dap_chain_net_srv_vpn_tun_socket_t;