diff --git a/CMakeLists.txt b/CMakeLists.txt index 5c920801e9d055f394956fa711ad290bc4e86f77..463469d52080c7f11921e1b637bf9947728f8a09 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 2.8) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.6-12") +set(CELLFRAME_SDK_NATIVE_VERSION "2.6-13") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") diff --git a/dap-sdk/core/include/dap_math_ops.h b/dap-sdk/core/include/dap_math_ops.h index 0d398559b478af6193478b1ba15123bff8a66458..ba8d4af9a9c9a653d4ffb16393ec0a706ffaaf01 100755 --- a/dap-sdk/core/include/dap_math_ops.h +++ b/dap-sdk/core/include/dap_math_ops.h @@ -27,5 +27,4 @@ typedef union int128{int64_t i64[2];} int128_t; #endif - #endif diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 630e83a902d3811b41c9ad95751c61e790a70310..67ea0a664ce25a198856156587105542f1fbd861 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -161,7 +161,6 @@ int dap_client_pvt_disconnect_all_n_wait(dap_client_pvt_t *a_client_pvt) //dap_client_pvt_t *a_client_pvt = (a_client) ? DAP_CLIENT_PVT(a_client) : NULL; if(!a_client_pvt) return -1; - pthread_mutex_lock(&a_client_pvt->disconnected_mutex); dap_client_go_stage(a_client_pvt->client, STAGE_BEGIN, s_client_pvt_disconnected ); pthread_cond_wait(&a_client_pvt->disconnected_cond, &a_client_pvt->disconnected_mutex); @@ -272,7 +271,6 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) break; case STAGE_STREAM_CTL: { log_it(L_INFO, "Go to stage STREAM_CTL: prepare the request"); - char *l_request = dap_strdup_printf("%d", DAP_CLIENT_PROTOCOL_VERSION); size_t l_request_size = dap_strlen(l_request); log_it(L_DEBUG, "STREAM_CTL request size %u", strlen(l_request)); @@ -309,18 +307,18 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) a_client_pvt->stage_status = STAGE_STATUS_ERROR; break; } - #ifdef _WIN32 +#ifdef _WIN32 { int buffsize = 65536*4; int optsize = sizeof( int ); setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (char *)&buffsize, &optsize ); setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, &optsize ); } - #else +#else int buffsize = 65536*4; setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (const void *) &buffsize, sizeof(int)); setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (const void *) &buffsize, sizeof(int)); - #endif +#endif // Wrap socket and setup callbacks static dap_events_socket_callbacks_t l_s_callbacks = { @@ -393,8 +391,6 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) log_it(L_INFO,"Connecting to remote %s:%s",a_client_pvt->uplink_addr, a_client_pvt->uplink_port); } } - - } break; case STAGE_STREAM_CONNECTED: { @@ -1091,6 +1087,7 @@ static void s_stream_es_callback_delete(dap_events_socket_t *a_es, void *arg) log_it(L_INFO, "Stream delete callback"); dap_client_pvt_t * l_client_pvt =(dap_client_pvt_t*) a_es->_inheritor; + a_es->_inheritor = NULL; // To prevent delete in reactor if(l_client_pvt == NULL) { diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 8a6ffdd995fb2c87af00bac16533f6f05b129c74..1985ba99853ce53bf115bf284421beb8971baac2 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -732,7 +732,6 @@ void dap_events_socket_worker_poll_update_unsafe(dap_events_socket_t * a_esocket log_it(L_ERROR, "Wrong poll index when remove from worker (unsafe): %u when total count %u", a_esocket->poll_index, a_esocket->worker->poll_count); } - #else #error "Not defined dap_events_socket_set_writable_unsafe for your platform" #endif @@ -791,40 +790,51 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool if ( a_es->worker) dap_events_socket_remove_from_worker_unsafe(a_es, a_es->worker); - if (a_es->events){ // It could be socket NOT from events - pthread_rwlock_wrlock( &a_es->events->sockets_rwlock ); - if(!dap_events_socket_find_unsafe(a_es->socket, a_es->events)){ - log_it( L_ERROR, "dap_events_socket 0x%x already deleted", a_es); - pthread_rwlock_unlock( &a_es->events->sockets_rwlock ); - return ; - } - - if(a_es->events->sockets) - HASH_DEL( a_es->events->sockets, a_es ); - pthread_rwlock_unlock( &a_es->events->sockets_rwlock ); - } //log_it( L_DEBUG, "dap_events_socket wrapped around %d socket is removed", a_es->socket ); if( a_es->callbacks.delete_callback ) a_es->callbacks.delete_callback( a_es, NULL ); // Init internal structure - if ( a_es->_inheritor && !preserve_inheritor ) - DAP_DELETE( a_es->_inheritor ); + dap_events_socket_delete_unsafe(a_es, preserve_inheritor); +} - if ( a_es->socket && a_es->socket != -1) { -#ifdef _WIN32 - closesocket( a_es->socket ); -#else - close( a_es->socket ); -#ifdef DAP_EVENTS_CAPS_QUEUE_PIPE2 - if( a_es->type == DESCRIPTOR_TYPE_QUEUE){ - close( a_es->fd2); +/** + * @brief dap_events_socket_delete_unsafe + * @param a_esocket + * @param a_preserve_inheritor + */ +void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_preserve_inheritor) +{ + if (a_esocket->events){ // It could be socket NOT from events + pthread_rwlock_wrlock( &a_esocket->events->sockets_rwlock ); + if(!dap_events_socket_find_unsafe(a_esocket->socket, a_esocket->events)){ + log_it( L_ERROR, "dap_events_socket 0x%x already deleted", a_esocket); + pthread_rwlock_unlock( &a_esocket->events->sockets_rwlock ); + return ; } -#endif -#endif + if(a_esocket->events->sockets) + HASH_DEL( a_esocket->events->sockets, a_esocket ); + pthread_rwlock_unlock( &a_esocket->events->sockets_rwlock ); + } + + if ( a_esocket->_inheritor && !a_preserve_inheritor ) + DAP_DELETE( a_esocket->_inheritor ); + + if ( a_esocket->socket && a_esocket->socket != -1) { + #ifdef _WIN32 + closesocket( a_esocket->socket ); + #else + close( a_esocket->socket ); + #ifdef DAP_EVENTS_CAPS_QUEUE_PIPE2 + if( a_esocket->type == DESCRIPTOR_TYPE_QUEUE){ + close( a_esocket->fd2); + } + #endif + + #endif } - DAP_DELETE( a_es ); + DAP_DELETE( a_esocket ); } /** diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index d50e05a2ccad7dd2a087a977680806dff7080ece..1074b9f7096bf22b6caa2b3f59728f263a985c7c 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -395,10 +395,8 @@ void *dap_worker_thread(void *arg) log_it(L_ERROR, "Some error occured in send(): %s", strerror(errno)); l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; l_cur->buf_out_size = 0; - } }else{ - //log_it(L_DEBUG, "Output: %u from %u bytes are sent ", l_bytes_sent,l_cur->buf_out_size); if (l_bytes_sent) { if ( l_bytes_sent <= (ssize_t) l_cur->buf_out_size ){ @@ -479,8 +477,9 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg; dap_worker_t * w = a_es->worker; //log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new); - if(dap_events_socket_check_unsafe( w, a_es)){ - log_it(L_ERROR, "Already assigned %d (%p), you're doing smth wrong", a_es->socket, a_es); + if(dap_events_socket_check_unsafe( w, l_es_new)){ + //log_it(L_ERROR, "Already assigned %d (%p), you're doing smth wrong", l_es_new->socket, l_es_new); + // Socket already present in worker, it's OK return; } diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 86e5560cad76800e4aa0e017b2d443e63ffa04ab..feb65aa3660c939c9788b732408926580403834c 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -31,6 +31,7 @@ #include "dap_common.h" #define DAP_EVENTS_SOCKET_MAX 8194 + // Caps for different platforms #if defined(DAP_OS_LINUX) // #define DAP_EVENTS_CAPS_EPOLL @@ -218,6 +219,8 @@ dap_events_socket_t * dap_events_socket_create_type_pipe_mt(dap_worker_t * a_w, int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg); int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value); +void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_preserve_inheritor); + dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, int a_sock, dap_events_socket_callbacks_t *a_callbacks ); dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct dap_events *a_events, diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index 9904f8dac112dc590379281f94be244dfe0f089d..e57dc4e405009b108375f5d6491044d5f37750bd 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -338,7 +338,6 @@ dap_stream_t* dap_stream_new_es_client(dap_events_socket_t * a_esocket) ret->esocket = a_esocket; ret->buf_defrag_size=0; ret->is_client_to_uplink = true; - log_it(L_NOTICE,"New stream with events socket instance for %s",a_esocket->hostaddr); return ret; } diff --git a/dap-sdk/net/stream/stream/dap_stream_ctl.c b/dap-sdk/net/stream/stream/dap_stream_ctl.c index 52bf55be3d7ea1de459d6ebd43193bb756723631..54d92f87fe718056876c25d058891697443a2f80 100644 --- a/dap-sdk/net/stream/stream/dap_stream_ctl.c +++ b/dap-sdk/net/stream/stream/dap_stream_ctl.c @@ -149,9 +149,10 @@ void s_proc(struct dap_http_simple *a_http_simple, void * a_arg) log_it(L_INFO, "legacy encryption mode used (OAES)"); l_enc_type = DAP_ENC_KEY_TYPE_OAES; l_new_session = true; - } - if(l_new_session){ + }else + log_it(L_DEBUG,"Encryption type %s (enc headers %d)",dap_enc_get_type_name(l_enc_type), l_enc_headers); + if(l_new_session){ ss = dap_stream_session_pure_new(); strncpy(ss->active_channels, l_channels_str, l_channels_str_size); char *key_str = calloc(1, KEX_KEY_STR_SIZE+1); diff --git a/modules/chain/dap_chain_ledger.c b/modules/chain/dap_chain_ledger.c index 8ede47afb507de8b89ac8214f66a65ea66c042b7..f96ad62b397fa91d24ef8df1398a7132462ec3e8 100644 --- a/modules/chain/dap_chain_ledger.c +++ b/modules/chain/dap_chain_ledger.c @@ -334,7 +334,7 @@ int dap_chain_ledger_token_load(dap_ledger_t *a_ledger, dap_chain_datum_token_t * @param a_ledger */ static void s_treshold_emissions_proc(dap_ledger_t * a_ledger) -{ +{ bool l_success; do { l_success = false; @@ -343,13 +343,15 @@ static void s_treshold_emissions_proc(dap_ledger_t * a_ledger) int l_res = dap_chain_ledger_token_emission_add(a_ledger, l_emission_item->datum_token_emission, l_emission_item->datum_token_emission_size); if (!l_res) { + pthread_rwlock_wrlock(&PVT(a_ledger)->treshold_emissions_rwlock); HASH_DEL(PVT(a_ledger)->treshold_emissions, l_emission_item); + pthread_rwlock_unlock(&PVT(a_ledger)->treshold_emissions_rwlock); DAP_DELETE(l_emission_item->datum_token_emission); DAP_DELETE(l_emission_item); l_success = true; } } - } while (l_success); + } while (l_success); } /** @@ -357,22 +359,23 @@ static void s_treshold_emissions_proc(dap_ledger_t * a_ledger) * @param a_ledger */ static void s_treshold_txs_proc( dap_ledger_t *a_ledger) -{ +{ bool l_success; do { l_success = false; dap_chain_ledger_tx_item_t *l_tx_item, *l_tx_tmp; HASH_ITER(hh, PVT(a_ledger)->treshold_txs, l_tx_item, l_tx_tmp) { int l_res = dap_chain_ledger_tx_add(a_ledger, l_tx_item->tx); - if (!l_res) { + if (l_res == 1) { + pthread_rwlock_wrlock(&PVT(a_ledger)->treshold_txs_rwlock); HASH_DEL(PVT(a_ledger)->treshold_txs, l_tx_item); + pthread_rwlock_unlock(&PVT(a_ledger)->treshold_txs_rwlock); DAP_DELETE(l_tx_item->tx); DAP_DELETE(l_tx_item); l_success = true; } } } while (l_success); - } @@ -855,7 +858,7 @@ int dap_chain_ledger_tx_cache_check(dap_ledger_t *a_ledger, dap_chain_datum_tx_t && 4. tx1.dap_chain_datum_tx_out.addr.data.key == tx2.dap_chain_datum_tx_sig.pkey for unconditional output \\ - 5a. tx1.dap_chain_datum_tx_sig.pkey == tx1.dap_chain_datum_tx_sig.pkey for conditional owner + 5a. tx1.dap_chain_datum_tx_sig.pkey == tx2.dap_chain_datum_tx_sig.pkey for conditional owner \\ 5b. tx1.dap_chain_datum_tx_out.condition == verify_svc_type(tx2) for conditional output && @@ -1302,7 +1305,7 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx) pthread_rwlock_wrlock(&l_ledger_priv->treshold_txs_rwlock); HASH_ADD(hh, l_ledger_priv->treshold_txs, tx_hash_fast, sizeof(dap_chain_hash_fast_t), l_item_tmp); pthread_rwlock_unlock(&l_ledger_priv->treshold_txs_rwlock); - log_it (L_DEBUG, "dap_chain_ledger_tx_add() tx %s added to threshold", l_tx_hash_str); + log_it (L_DEBUG, "Tx %s added to threshold", l_tx_hash_str); // Add it to cache dap_chain_datum_tx_t *l_tx_cache = DAP_NEW_Z_SIZE(dap_chain_datum_tx_t, l_tx_size); memcpy(l_tx_cache, a_tx, l_tx_size); @@ -1536,6 +1539,7 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx) pthread_rwlock_wrlock(&l_ledger_priv->ledger_rwlock); HASH_ADD(hh, l_ledger_priv->ledger_items, tx_hash_fast, sizeof(dap_chain_hash_fast_t), l_item_tmp); // tx_hash_fast: name of key field pthread_rwlock_unlock(&l_ledger_priv->ledger_rwlock); + // Add it to cache uint8_t *l_tx_cache = DAP_NEW_Z_SIZE(uint8_t, l_tx_size + sizeof(l_item_tmp->cache_data)); memcpy(l_tx_cache, &l_item_tmp->cache_data, sizeof(l_item_tmp->cache_data)); @@ -1546,6 +1550,7 @@ int dap_chain_ledger_tx_add(dap_ledger_t *a_ledger, dap_chain_datum_tx_t *a_tx) DAP_DELETE(l_tx_cache); } DAP_DELETE(l_gdb_group); + s_treshold_txs_proc(a_ledger); ret = 1; } diff --git a/modules/chain/include/dap_chain_ledger.h b/modules/chain/include/dap_chain_ledger.h index c29478efed1f1803fd327f4f6df631e166aca381..252ea1547883f8b690df2645d8a7d79ad676bdfe 100644 --- a/modules/chain/include/dap_chain_ledger.h +++ b/modules/chain/include/dap_chain_ledger.h @@ -64,7 +64,6 @@ typedef bool (* dap_chain_ledger_verificator_callback_t)(dap_chain_tx_out_cond_t dap_ledger_t* dap_chain_ledger_create(uint16_t a_check_flags, char *a_net_name); - // Remove dap_ledger_t structure void dap_chain_ledger_handle_free(dap_ledger_t *a_ledger); diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index fb32aab1c4f8d1f543bd85f782f28525d5b15fe1..6ce9e8078b6add7a1d67772b5f0b2afffc700f75 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -103,7 +103,6 @@ void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg) { a_ch->internal = DAP_NEW_Z(dap_stream_ch_chain_t); dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); - pthread_mutex_init(&l_ch_chain->mutex, NULL); l_ch_chain->ch = a_ch; } @@ -120,7 +119,6 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg) dap_db_log_list_delete(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs); //dap_list_free_full(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs, (dap_callback_destroyed_t) free); DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs = NULL; } - pthread_mutex_destroy(&DAP_STREAM_CH_CHAIN(a_ch)->mutex); } @@ -222,11 +220,12 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) } dap_chain_hash_fast_t l_atom_hash = {}; - dap_chain_atom_ptr_t l_atom_copy = l_ch_chain->pkt_data; - uint64_t l_atom_copy_size = l_ch_chain->pkt_data_size; - l_ch_chain->pkt_data = NULL; - l_ch_chain->pkt_data_size = 0; - if( l_atom_copy && l_atom_copy_size){ + dap_list_t *l_pkt_copy_list = l_ch_chain->pkt_copy_list; + if (l_pkt_copy_list) { + l_ch_chain->pkt_copy_list = l_ch_chain->pkt_copy_list->next; + dap_chain_pkt_copy_t *l_pkt_copy = (dap_chain_pkt_copy_t *)l_pkt_copy_list->data; + dap_chain_atom_ptr_t l_atom_copy = (dap_chain_atom_ptr_t)l_pkt_copy->pkt_data; + uint64_t l_atom_copy_size = l_pkt_copy->pkt_data_size; dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash); dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain); size_t l_atom_size =0; @@ -280,8 +279,10 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) DAP_DELETE(l_atom_copy); } l_chain->callback_atom_iter_delete(l_atom_iter); + DAP_DELETE(l_pkt_copy); + DAP_DELETE(l_pkt_copy_list); }else - log_it(L_WARNING, "In proc thread got stream ch packet with pkt_size: %zd and pkt_data: %p", l_atom_copy_size, l_atom_copy); + log_it(L_WARNING, "In proc thread got CHAINS stream ch packet with zero data"); dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); return true; } @@ -291,93 +292,101 @@ bool s_gdb_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) UNUSED(a_thread); dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); - dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); - size_t l_data_obj_count = 0; - // deserialize data & Parse data from dap_db_log_pack() - dap_store_obj_t *l_store_obj = dap_db_log_unpack(l_ch_chain->pkt_data,l_ch_chain->pkt_data_size, &l_data_obj_count); - //log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count ); - - for(size_t i = 0; i < l_data_obj_count; i++) { - // timestamp for exist obj - time_t l_timestamp_cur = 0; - // obj to add - dap_store_obj_t* l_obj = l_store_obj + i; - // read item from base; - size_t l_count_read = 0; - dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group, - l_obj->key, &l_count_read); - // get timestamp for the exist entry - if(l_read_obj) - l_timestamp_cur = l_read_obj->timestamp; - // get timestamp for the deleted entry - else - { - l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key); - } + dap_list_t *l_pkt_copy_list = l_ch_chain->pkt_copy_list; + if (l_pkt_copy_list) { + l_ch_chain->pkt_copy_list = l_ch_chain->pkt_copy_list->next; + dap_chain_pkt_copy_t *l_pkt_copy = (dap_chain_pkt_copy_t *)l_pkt_copy_list->data; + size_t l_data_obj_count = 0; + // deserialize data & Parse data from dap_db_log_pack() + dap_store_obj_t *l_store_obj = dap_db_log_unpack(l_pkt_copy->pkt_data, l_pkt_copy->pkt_data_size, &l_data_obj_count); + //log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count ); + + for(size_t i = 0; i < l_data_obj_count; i++) { + // timestamp for exist obj + time_t l_timestamp_cur = 0; + // obj to add + dap_store_obj_t* l_obj = l_store_obj + i; + // read item from base; + size_t l_count_read = 0; + dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group, + l_obj->key, &l_count_read); + // get timestamp for the exist entry + if(l_read_obj) + l_timestamp_cur = l_read_obj->timestamp; + // get timestamp for the deleted entry + else + { + l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key); + } - //check whether to apply the received data into the database - bool l_apply = true; - if(l_obj->timestamp < l_timestamp_cur) - l_apply = false; - else if(l_obj->type == 'd') { - // already deleted - if(!l_read_obj) - l_apply = false; - } - else if(l_obj->type == 'a') { - bool l_is_the_same_present = false; - if(l_read_obj && - l_read_obj->value_len == l_obj->value_len && - !memcmp(l_read_obj->value, l_obj->value, l_obj->value_len)) - l_is_the_same_present = true; - // this data already present in global_db and not obsolete (out of date) - if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp)) + //check whether to apply the received data into the database + bool l_apply = true; + if(l_obj->timestamp < l_timestamp_cur) l_apply = false; - } - if(l_read_obj) - dap_store_obj_free(l_read_obj, l_count_read); + else if(l_obj->type == 'd') { + // already deleted + if(!l_read_obj) + l_apply = false; + } + else if(l_obj->type == 'a') { + bool l_is_the_same_present = false; + if(l_read_obj && + l_read_obj->value_len == l_obj->value_len && + !memcmp(l_read_obj->value, l_obj->value, l_obj->value_len)) + l_is_the_same_present = true; + // this data already present in global_db and not obsolete (out of date) + if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp)) + l_apply = false; + } + if(l_read_obj) + dap_store_obj_free(l_read_obj, l_count_read); - if(!l_apply) { - // If request was from defined node_addr we update its state - if(l_ch_chain->request.node_addr.uint64) { - dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id); + if(!l_apply) { + // If request was from defined node_addr we update its state + if(l_ch_chain->request.node_addr.uint64) { + dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id); + } + continue; } - continue; - } - char l_ts_str[50]; - dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), l_store_obj[i].timestamp); - /*log_it(L_DEBUG, "Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\"" - " timestamp=\"%s\" value_len=%u ", - (char ) l_store_obj[i].type, l_store_obj[i].type, l_store_obj[i].group, - l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len);*/ - // apply received transaction - dap_chain_t *l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); - if(l_chain) { - if(l_chain->callback_add_datums_with_group){ - void * restrict l_store_obj_value = l_store_obj->value; - l_chain->callback_add_datums_with_group(l_chain, - (dap_chain_datum_t** restrict) l_store_obj_value, 1, - l_store_obj[i].group); + char l_ts_str[50]; + dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), l_store_obj[i].timestamp); + /*log_it(L_DEBUG, "Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\"" + " timestamp=\"%s\" value_len=%u ", + (char ) l_store_obj[i].type, l_store_obj[i].type, l_store_obj[i].group, + l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len);*/ + // apply received transaction + dap_chain_t *l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); + if(l_chain) { + if(l_chain->callback_add_datums_with_group){ + void * restrict l_store_obj_value = l_store_obj->value; + l_chain->callback_add_datums_with_group(l_chain, + (dap_chain_datum_t** restrict) l_store_obj_value, 1, + l_store_obj[i].group); + } } - } - // save data to global_db - if(!dap_chain_global_db_obj_save(l_obj, 1)) { - dap_stream_ch_chain_pkt_write_error(l_ch, l_ch_chain->request_net_id, - l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, - "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED"); - dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); - } else { - // If request was from defined node_addr we update its state - if(l_ch_chain->request.node_addr.uint64) { - dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id); + // save data to global_db + if(!dap_chain_global_db_obj_save(l_obj, 1)) { + dap_stream_ch_chain_pkt_write_error(l_ch, l_ch_chain->request_net_id, + l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, + "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED"); + dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); + } else { + // If request was from defined node_addr we update its state + if(l_ch_chain->request.node_addr.uint64) { + dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id); + } + //log_it(L_DEBUG, "Added new GLOBAL_DB history pack"); } - //log_it(L_DEBUG, "Added new GLOBAL_DB history pack"); } + if(l_store_obj) + dap_store_obj_free(l_store_obj, l_data_obj_count); + DAP_DELETE(l_pkt_copy); + DAP_DELETE(l_pkt_copy_list); + } else { + log_it(L_WARNING, "In proc thread got GDB stream ch packet with zero data"); } - if(l_store_obj) - dap_store_obj_free(l_store_obj, l_data_obj_count); dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); return true; } @@ -519,9 +528,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); - l_ch_chain->pkt_data = DAP_NEW_SIZE(byte_t, l_chain_pkt_data_size); - memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); - l_ch_chain->pkt_data_size = l_chain_pkt_data_size; + dap_chain_pkt_copy_t *l_pkt_copy = DAP_NEW_Z(dap_chain_pkt_copy_t); + l_pkt_copy->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size); + memcpy(l_pkt_copy->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); + l_pkt_copy->pkt_data_size = l_chain_pkt_data_size; + l_ch_chain->pkt_copy_list = dap_list_append(l_ch_chain->pkt_copy_list, l_pkt_copy); dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_chain_pkt_callback, a_ch); } else { @@ -548,9 +559,11 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); - l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size); - memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); - l_ch_chain->pkt_data_size = l_chain_pkt_data_size; + dap_chain_pkt_copy_t *l_pkt_copy = DAP_NEW_Z(dap_chain_pkt_copy_t); + l_pkt_copy->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size); + memcpy(l_pkt_copy->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); + l_pkt_copy->pkt_data_size = l_chain_pkt_data_size; + l_ch_chain->pkt_copy_list = dap_list_append(l_ch_chain->pkt_copy_list, l_pkt_copy); dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); dap_proc_queue_add_callback(a_ch->stream_worker->worker, s_gdb_pkt_callback, a_ch); } else { diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index 931d2b9714da6e70195299854d5fffbf8bef4712..b73c509b3ee9c71e0ec805edb8a07d5182b2fc30 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -45,8 +45,12 @@ typedef struct dap_chain_atom_item{ UT_hash_handle hh; } dap_chain_atom_item_t; +typedef struct dap_chain_pkt_copy { + uint64_t pkt_data_size; + byte_t *pkt_data; +} dap_chain_pkt_copy_t; + typedef struct dap_stream_ch_chain { - pthread_mutex_t mutex; dap_stream_ch_t * ch; dap_db_log_list_t *request_global_db_trs; // list of global db records @@ -54,8 +58,7 @@ typedef struct dap_stream_ch_chain { dap_stream_ch_chain_state_t state; dap_chain_atom_iter_t * request_atom_iter; - byte_t *pkt_data; - uint64_t pkt_data_size; + dap_list_t *pkt_copy_list; uint64_t stats_request_atoms_processed; uint64_t stats_request_gdb_processed; dap_stream_ch_chain_sync_request_t request; diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 9370189c5073d448654ab172e959763d17da0398..0b4fe001d2ac5a508dacee09a03185d3d44d9520 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -704,14 +704,14 @@ static void *s_net_check_thread ( void *a_net ) //dap_chain_global_db_set_callback_for_update_base(s_net_proc_thread_callback_update_db); while(1){ - if ( !(p_net->flags & F_DAP_CHAIN_NET_SHUTDOWN) ) { + if (p_net->flags & F_DAP_CHAIN_NET_SHUTDOWN) { return NULL; } - // check or start sync - s_net_states_proc( l_net ); if (p_net->flags & F_DAP_CHAIN_NET_GO_SYNC) { + // check or start sync s_net_states_proc( l_net ); + continue; } struct timespec l_to; diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index 4041396e1503f092f573f3cdd1c8bd821902e32a..d27b45c92169b0dea28c34687d30e90fbec4d8f0 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -1323,13 +1323,16 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) // for client only case VPN_PACKET_OP_CODE_VPN_RECV:{ a_ch->stream->esocket->last_ping_request = time(NULL); // not ping, but better ;-) + //ch_sf_tun_client_send(CH_VPN(a_ch), l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size); + dap_events_socket_t *l_es = dap_chain_net_vpn_client_tun_get_esock(); // Find tun socket for current worker - ch_sf_tun_socket_t * l_tun = s_tun_sockets[a_ch->stream_worker->worker->id]; + ch_sf_tun_socket_t *l_tun = l_es ? l_es->_inheritor : NULL; + //ch_sf_tun_socket_t * l_tun = s_tun_sockets[a_ch->stream_worker->worker->id]; assert(l_tun); s_stream_session_esocket_send(l_srv_session, l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size); } break; - // for servier only + // for server only case VPN_PACKET_OP_CODE_VPN_SEND: { ch_sf_tun_socket_t * l_tun = s_tun_sockets[a_ch->stream_worker->worker->id]; assert(l_tun); diff --git a/modules/service/vpn/dap_chain_net_vpn_client_tun.c b/modules/service/vpn/dap_chain_net_vpn_client_tun.c index ca890bcfc029c4cf4b47e4cc8844dac5dec5096f..1d1dc7e4efbf7e612f8f33fc131e5bf9ab63fa95 100644 --- a/modules/service/vpn/dap_chain_net_vpn_client_tun.c +++ b/modules/service/vpn/dap_chain_net_vpn_client_tun.c @@ -253,6 +253,32 @@ static void m_client_tun_write(dap_events_socket_t * a_es, void * arg) // log_it(L_WARNING, __PRETTY_FUNCTION__); } +void m_client_tun_new(dap_events_socket_t * a_es, void * arg) +{ + (void) arg; + ch_sf_tun_socket_t * l_tun_socket = DAP_NEW_Z(ch_sf_tun_socket_t); + if ( l_tun_socket ){ + l_tun_socket->worker = a_es->worker; + l_tun_socket->worker_id = l_tun_socket->worker->id; + l_tun_socket->es = a_es; + //s_tun_sockets_queue_msg[a_es->worker->id] = dap_events_socket_create_type_queue_ptr_unsafe(a_es->worker, s_tun_recv_msg_callback ); + //s_tun_sockets[a_es->worker->id] = l_tun_socket; + + a_es->_inheritor = l_tun_socket; + //s_tun_attach_queue( a_es->fd ); + { + struct ifreq ifr; + memset(&ifr, 0, sizeof(ifr)); + ifr.ifr_flags = IFF_ATTACH_QUEUE; + ioctl(a_es->fd, TUNSETQUEUE, (void *)&ifr); + } + log_it(L_NOTICE,"New TUN event socket initialized for worker %u" , l_tun_socket->worker_id); + + }else{ + log_it(L_ERROR, "Can't allocate memory for tun socket"); + } +} + static void m_client_tun_read(dap_events_socket_t * a_es, void * arg) { const static int tun_MTU = 100000; /// TODO Replace with detection of MTU size @@ -304,6 +330,10 @@ static void m_client_tun_error(dap_events_socket_t * a_es, int a_arg) log_it(L_WARNING, " TUN client problems: code %d", a_arg); } +dap_events_socket_t* dap_chain_net_vpn_client_tun_get_esock(void) { + return s_tun_events_socket; +} + int dap_chain_net_vpn_client_tun_create(const char *a_ipv4_addr_str, const char *a_ipv4_gw_str) { // char dev[IFNAMSIZ] = { 0 }; @@ -412,6 +442,7 @@ int dap_chain_net_vpn_client_tun_create(const char *a_ipv4_addr_str, const char pthread_mutex_init(&s_clients_mutex, NULL); static dap_events_socket_callbacks_t l_s_callbacks = { + .new_callback = m_client_tun_new,//m_es_tun_new; .read_callback = m_client_tun_read,// for server .write_callback = m_client_tun_write,// for client .error_callback = m_client_tun_error, @@ -421,6 +452,7 @@ int dap_chain_net_vpn_client_tun_create(const char *a_ipv4_addr_str, const char s_tun_events_socket = dap_events_socket_wrap_no_add(dap_events_get_default(), s_fd_tun, &l_s_callbacks); s_tun_events_socket->type = DESCRIPTOR_TYPE_FILE; dap_worker_add_events_socket_auto(s_tun_events_socket); + //dap_events_socket_assign_on_worker_mt(l_es, a_worker); s_tun_events_socket->_inheritor = NULL; //return 0; diff --git a/modules/service/vpn/include/dap_chain_net_vpn_client_tun.h b/modules/service/vpn/include/dap_chain_net_vpn_client_tun.h index 054b7642ad81f23f99e7787d692349cc0c1d459f..1697d3b6c5f595d24961bfd94c8358991e4c3341 100644 --- a/modules/service/vpn/include/dap_chain_net_vpn_client_tun.h +++ b/modules/service/vpn/include/dap_chain_net_vpn_client_tun.h @@ -26,6 +26,7 @@ #include "dap_chain_net_srv_vpn.h" int dap_chain_net_vpn_client_tun_init(const char *a_ipv4_gw_str); +dap_events_socket_t* dap_chain_net_vpn_client_tun_get_esock(void); int dap_chain_net_vpn_client_tun_create(const char *a_ipv4_addr_str, const char *a_ipv4_gw_str); int dap_chain_net_vpn_client_tun_delete(void); int dap_chain_net_vpn_client_tun_status(void); diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index a8ec207191d02b30f45f6c2cfa872201b1df54e7..fb6f17b94b0e457e4eda14ecfdce6deceb252330 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -287,8 +287,9 @@ void dap_chain_cs_dag_delete(dap_chain_t * a_chain) DAP_DELETE(l_dag->_pvt); } -static int s_dap_chain_add_atom_to_ledger(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger, dap_chain_cs_dag_event_item_t * a_event_item){ +static int s_dap_chain_add_atom_to_ledger(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger, dap_chain_cs_dag_event_item_t * a_event_item) +{ dap_chain_datum_t *l_datum = (dap_chain_datum_t*) dap_chain_cs_dag_event_get_datum(a_event_item->event, a_event_item->event_size); switch (l_datum->header.type_id) { case DAP_CHAIN_DATUM_TOKEN_DECL: { @@ -306,7 +307,7 @@ static int s_dap_chain_add_atom_to_ledger(dap_chain_cs_dag_t * a_dag, dap_ledger // don't save bad transactions to base int l_ret = dap_chain_ledger_tx_load(a_ledger, l_tx); if( l_ret != 1 ) { - return l_ret; + return l_ret; } dap_chain_cs_dag_event_item_t * l_tx_event= DAP_NEW_Z(dap_chain_cs_dag_event_item_t); l_tx_event->ts_added = a_event_item->ts_added;