diff --git a/CMakeLists.txt b/CMakeLists.txt index 6340f5f60b37b140807a9a31054fdefb3630bfbc..f70ece0a053594f5cd18321f8811f4d1f76fc974 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 2.8) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.6-86") +set(CELLFRAME_SDK_NATIVE_VERSION "2.6-87") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") diff --git a/dap-sdk/crypto/src/XKCP/lib/low/KeccakP-1600/OptimizedAsmX86-64/KeccakP-1600-x86-64-gas.s b/dap-sdk/crypto/src/XKCP/lib/low/KeccakP-1600/OptimizedAsmX86-64/KeccakP-1600-x86-64-gas.s index 65aff45cb064fb1773bc06dea717e50cdd70c88d..4c2041924e47fd93a6b43dc8e1d209e6e3fb51ba 100755 --- a/dap-sdk/crypto/src/XKCP/lib/low/KeccakP-1600/OptimizedAsmX86-64/KeccakP-1600-x86-64-gas.s +++ b/dap-sdk/crypto/src/XKCP/lib/low/KeccakP-1600/OptimizedAsmX86-64/KeccakP-1600-x86-64-gas.s @@ -19,8 +19,7 @@ # WARNING: State must be 256 bit (32 bytes) aligned, better is 64-byte aligned (cache line) - .text - +.text # conditional assembly settings .equ UseSIMD, 0 .equ InlinePerm, 1 diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index 30517ecbac521871adf012e364e2eba2bbc422ce..520941545fbb9dc3a12a6bc0b9969093c88c86a2 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -84,7 +84,7 @@ static void s_http_connected(dap_events_socket_t * a_esocket); // Connected call static void s_client_http_delete(dap_client_http_pvt_t * a_http_pvt); static void s_http_read(dap_events_socket_t * a_es, void * arg); static void s_http_error(dap_events_socket_t * a_es, int a_arg); -static void s_timer_timeout_check(void * a_arg); +static bool s_timer_timeout_check(void * a_arg); uint64_t s_client_timeout_ms=10000; @@ -106,8 +106,13 @@ void dap_client_http_set_connect_timeout_ms(uint64_t a_timeout_ms) s_client_timeout_ms = a_timeout_ms; } - -static void s_timer_timeout_check(void * a_arg) +/** + * @brief s_timer_timeout_check + * @details Returns 'false' to prevent looping the checks + * @param a_arg + * @return + */ +static bool s_timer_timeout_check(void * a_arg) { dap_events_socket_t * l_es = (dap_events_socket_t*) a_arg; assert(l_es); @@ -124,6 +129,7 @@ static void s_timer_timeout_check(void * a_arg) l_http_pvt->is_closed_by_timeout = true; l_es->flags |= DAP_SOCK_SIGNAL_CLOSE; } + return false; } /** diff --git a/dap-sdk/net/core/dap_proc_queue.c b/dap-sdk/net/core/dap_proc_queue.c index e04a8cfdd122d309b8a8b133ed42ef11b1485bc6..7f31abf79f0af370491fffc001ded4e6da2928d4 100644 --- a/dap-sdk/net/core/dap_proc_queue.c +++ b/dap-sdk/net/core/dap_proc_queue.c @@ -123,3 +123,5 @@ void dap_proc_queue_add_callback_inter( dap_events_socket_t * a_es_input, dap_pr l_msg->callback_arg = a_callback_arg; dap_events_socket_queue_ptr_send_to_input( a_es_input , l_msg ); } + + diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index a2b2806cec162dad48c704e4ef549ec95f73253e..781151fc23bd3f490a1293f93342f67e75fb8d3c 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -196,10 +196,52 @@ static int s_update_poll_flags(dap_proc_thread_t * a_thread, dap_events_socket_t a_thread->poll[a_esocket->poll_index].revents |= POLLIN; if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE) a_thread->poll[a_esocket->poll_index].revents |= POLLOUT; +#else +#error "Not defined dap_proc_thread.c::s_update_poll_flags() on your platform" #endif return 0; } +/** + * @brief dap_proc_thread_create_queue_ptr + * @details Call this function as others only from safe situation, or, thats better, from a_thread's context + * @param a_thread + * @param a_callback + * @return + */ +dap_events_socket_t * dap_proc_thread_create_queue_ptr(dap_proc_thread_t * a_thread, dap_events_socket_callback_queue_ptr_t a_callback) +{ + dap_events_socket_t * l_es = dap_events_socket_create_type_queue_ptr_unsafe(NULL,a_callback); + if(l_es == NULL) + return NULL; + l_es->proc_thread = a_thread; +#ifdef DAP_EVENTS_CAPS_EPOLL + l_es->ev.events = l_es->ev_base_flags ; + l_es->ev.data.ptr = l_es; + if( epoll_ctl(a_thread->epoll_ctl, EPOLL_CTL_ADD, l_es->socket, &l_es->ev) != 0 ){ +#ifdef DAP_OS_WINDOWS + errno = WSAGetLastError(); +#endif + log_it(L_CRITICAL, "Can't add queue input on epoll ctl, err: %d", errno); + return NULL; + } +#elif defined(DAP_EVENTS_CAPS_POLL) + l_es->poll_index = a_thread->poll_count; + a_thread->poll[a_thread->poll_count].fd = l_es->fd; + a_thread->poll[a_thread->poll_count].events = l_es->poll_base_flags; + a_thread->esockets[a_thread->poll_count] = l_es; + a_thread->poll_count++; +#else +#error "Not defined dap_proc_thread_create_queue_ptr() on your platform" +#endif + return l_es; +} + +/** + * @brief s_proc_thread_function + * @param a_arg + * @return + */ static void * s_proc_thread_function(void * a_arg) { @@ -230,6 +272,7 @@ static void * s_proc_thread_function(void * a_arg) assert(l_workers_count); l_thread->queue_assign_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)*l_workers_count ); l_thread->queue_io_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)*l_workers_count ); + l_thread->queue_callback_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)*l_workers_count ); assert(l_thread->queue_assign_input); assert(l_thread->queue_io_input); @@ -237,6 +280,7 @@ static void * s_proc_thread_function(void * a_arg) dap_worker_t * l_worker =dap_events_worker_get(n); l_thread->queue_assign_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_new ); l_thread->queue_io_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_io ); + l_thread->queue_callback_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_callback ); } #ifdef DAP_EVENTS_CAPS_EPOLL struct epoll_event l_epoll_events[ DAP_EVENTS_SOCKET_MAX]= { { 0 } }; @@ -267,6 +311,7 @@ static void * s_proc_thread_function(void * a_arg) } for (size_t n = 0; n< dap_events_worker_get_count(); n++){ + // Queue asssign l_thread->queue_assign_input[n]->ev.events = l_thread->queue_assign_input[n]->ev_base_flags ; l_thread->queue_assign_input[n]->ev.data.ptr = l_thread->queue_assign_input[n]; if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_assign_input[n]->socket, &l_thread->queue_assign_input[n]->ev) != 0 ){ @@ -277,11 +322,23 @@ static void * s_proc_thread_function(void * a_arg) return NULL; } + // Queue IO l_thread->queue_io_input[n]->ev.events = l_thread->queue_io_input[n]->ev_base_flags ; l_thread->queue_io_input[n]->ev.data.ptr = l_thread->queue_io_input[n]; if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_io_input[n]->fd , &l_thread->queue_io_input[n]->ev) != 0 ){ #ifdef DAP_OS_WINDOWS errno = WSAGetLastError(); +#endif + log_it(L_CRITICAL, "Can't add proc io input on epoll ctl, err: %d", errno); + return NULL; + } + + // Queue callback + l_thread->queue_callback_input[n]->ev.events = l_thread->queue_callback_input[n]->ev_base_flags ; + l_thread->queue_callback_input[n]->ev.data.ptr = l_thread->queue_callback_input[n]; + if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->queue_callback_input[n]->fd , &l_thread->queue_callback_input[n]->ev) != 0 ){ +#ifdef DAP_OS_WINDOWS + errno = WSAGetLastError(); #endif log_it(L_CRITICAL, "Can't add proc io input on epoll ctl, err: %d", errno); return NULL; @@ -295,32 +352,43 @@ static void * s_proc_thread_function(void * a_arg) l_thread->esockets = DAP_NEW_Z_SIZE(dap_events_socket_t*,l_thread->poll_count_max *sizeof (*l_thread->esockets)); // Add proc queue - l_thread->poll[0].fd = l_thread->proc_queue->esocket->fd; - l_thread->poll[0].events = l_thread->proc_queue->esocket->poll_base_flags; - l_thread->esockets[0] = l_thread->proc_queue->esocket; + l_thread->poll[l_thread->poll_count].fd = l_thread->proc_queue->esocket->fd; + l_thread->poll[l_thread->poll_count].events = l_thread->proc_queue->esocket->poll_base_flags; + l_thread->esockets[l_thread->poll_count] = l_thread->proc_queue->esocket; l_thread->poll_count++; // Add proc event - l_thread->poll[1].fd = l_thread->proc_event->fd; - l_thread->poll[1].events = l_thread->proc_event->poll_base_flags; - l_thread->esockets[1] = l_thread->proc_event; + l_thread->poll[l_thread->poll_count].fd = l_thread->proc_event->fd; + l_thread->poll[l_thread->poll_count].events = l_thread->proc_event->poll_base_flags; + l_thread->esockets[l_thread->poll_count] = l_thread->proc_event; l_thread->poll_count++; for (size_t n = 0; n< dap_events_worker_get_count(); n++){ dap_events_socket_t * l_queue_assign_input = l_thread->queue_assign_input[n]; dap_events_socket_t * l_queue_io_input = l_thread->queue_io_input[n]; + dap_events_socket_t * l_queue_callback_input = l_thread->queue_callback_input[n]; if (l_queue_assign_input&&l_queue_io_input){ + + // Queue assign input l_queue_assign_input->poll_index = l_thread->poll_count; l_thread->poll[l_thread->poll_count].fd = l_queue_assign_input->fd; l_thread->poll[l_thread->poll_count].events = l_queue_assign_input->poll_base_flags; l_thread->esockets[l_thread->poll_count] = l_queue_assign_input; l_thread->poll_count++; + // Queue io input l_queue_io_input->poll_index = l_thread->poll_count; l_thread->poll[l_thread->poll_count].fd = l_queue_io_input->fd; l_thread->poll[l_thread->poll_count].events = l_queue_io_input->poll_base_flags; l_thread->esockets[l_thread->poll_count] = l_queue_io_input; l_thread->poll_count++; + + // Queue callback input + l_queue_callback_input->poll_index = l_thread->poll_count; + l_thread->poll[l_thread->poll_count].fd = l_queue_callback_input->fd; + l_thread->poll[l_thread->poll_count].events = l_queue_callback_input->poll_base_flags; + l_thread->esockets[l_thread->poll_count] = l_queue_callback_input; + l_thread->poll_count++; } } @@ -650,3 +718,21 @@ int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worke return 0; } +/** + * @brief dap_proc_thread_worker_exec_callback + * @param a_thread + * @param a_worker_id + * @param a_callback + * @param a_arg + */ +void dap_proc_thread_worker_exec_callback(dap_proc_thread_t * a_thread, size_t a_worker_id, dap_worker_callback_t a_callback, void * a_arg) +{ + dap_worker_msg_callback_t * l_msg = DAP_NEW_Z(dap_worker_msg_callback_t); + l_msg->callback = a_callback; + l_msg->arg = a_arg; + dap_events_socket_queue_ptr_send_to_input(a_thread->queue_callback_input[a_worker_id],l_msg ); + + a_thread->queue_callback_input[a_worker_id]->flags |= DAP_SOCK_READY_TO_WRITE; + s_update_poll_flags(a_thread, a_thread->queue_callback_input[a_worker_id]); + +} diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 487a9f49886be91205e449ec43afe345d3151ad1..2f157b2dd26b94d1e90a9aaee6c5dc4f3de972a1 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -781,7 +781,7 @@ static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg) HASH_FIND(hh_worker, a_es->worker->esockets, &l_msg->esocket , sizeof (void*), l_msg_es ); pthread_rwlock_unlock(&a_es->worker->esocket_rwlock); if ( l_msg_es == NULL){ - log_it(L_INFO, "We got i/o message for client thats now not in list. Lost %u data", l_msg->data_size); + log_it(L_INFO, "We got i/o message for esocket %p thats now not in list. Lost %u data", l_msg->esocket, l_msg->data_size); DAP_DELETE(l_msg); return; } diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 06203e8a02c57381f1d013e0e5a5475c553fd2c8..ac1d1d2fe6e88fe542f8ce8cc6c9b63d15413b45 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -91,6 +91,7 @@ typedef struct dap_events dap_events_t; typedef struct dap_events_socket dap_events_socket_t; typedef struct dap_worker dap_worker_t; +typedef struct dap_proc_thread dap_proc_thread_t ; typedef struct dap_server dap_server_t; typedef void (*dap_events_socket_callback_t) (dap_events_socket_t *,void * ); // Callback for specific client operations @@ -208,6 +209,7 @@ typedef struct dap_events_socket { // Links to related objects dap_events_t *events; dap_worker_t *worker; + dap_proc_thread_t * proc_thread; // If assigned on dap_proc_thread_t object dap_server_t *server; // If this socket assigned with server // Platform specific things diff --git a/dap-sdk/net/core/include/dap_proc_thread.h b/dap-sdk/net/core/include/dap_proc_thread.h index 9be3c4729240cef24228c4e1e9b18f9b45325b67..2e76df9e7b808bc7d0ca3694ea4e68007460d720 100644 --- a/dap-sdk/net/core/include/dap_proc_thread.h +++ b/dap-sdk/net/core/include/dap_proc_thread.h @@ -26,6 +26,7 @@ #include <pthread.h> #include "dap_common.h" #include "dap_proc_queue.h" +#include "dap_worker.h" typedef struct dap_proc_thread{ uint32_t cpu_id; @@ -35,6 +36,7 @@ typedef struct dap_proc_thread{ dap_events_socket_t ** queue_assign_input; // Inputs for assign queues dap_events_socket_t ** queue_io_input; // Inputs for assign queues + dap_events_socket_t ** queue_callback_input; // Inputs for worker context callback queues atomic_uint proc_queue_size; pthread_cond_t started_cond; @@ -58,8 +60,12 @@ typedef struct dap_proc_thread{ int dap_proc_thread_init(uint32_t a_threads_count); dap_proc_thread_t * dap_proc_thread_get(uint32_t a_thread_number); dap_proc_thread_t * dap_proc_thread_get_auto(); +dap_events_socket_t * dap_proc_thread_create_queue_ptr(dap_proc_thread_t * a_thread, dap_events_socket_callback_queue_ptr_t a_callback); + bool dap_proc_thread_assign_on_worker_inter(dap_proc_thread_t * a_thread, dap_worker_t * a_worker, dap_events_socket_t *a_esocket ); + int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_esocket, const void * a_data, size_t a_data_size); int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_esocket, const char * a_format,...); +void dap_proc_thread_worker_exec_callback(dap_proc_thread_t * a_thread, size_t a_worker_id, dap_worker_callback_t a_callback, void * a_arg); diff --git a/dap-sdk/net/server/json_rpc/src/dap_json_rpc_response.c b/dap-sdk/net/server/json_rpc/src/dap_json_rpc_response.c index ee3e4b3e16766825ed06ea5188967fb722a6e34f..6c2782a6c1c92342f101db941cf848a57a7191fd 100644 --- a/dap-sdk/net/server/json_rpc/src/dap_json_rpc_response.c +++ b/dap-sdk/net/server/json_rpc/src/dap_json_rpc_response.c @@ -32,18 +32,19 @@ void dap_json_rpc_response_send(dap_json_rpc_response_t *a_response, dap_http_si char *str_response = NULL; if (a_response->error == NULL){ switch (a_response->type_result) { - case TYPE_RESPONSE_STRING: - l_JSON->obj_result = json_object_new_string(a_response->result_string); - break; - case TYPE_RESPONSE_DOUBLE: - l_JSON->obj_result = json_object_new_double(a_response->result_double); - break; - case TYPE_RESPONSE_BOOLEAN: - l_JSON->obj_result = json_object_new_boolean(a_response->result_boolean); - break; - case TYPE_RESPONSE_INTEGER: - l_JSON->obj_result = json_object_new_int64(a_response->result_int); - break; + case TYPE_RESPONSE_STRING: + l_JSON->obj_result = json_object_new_string(a_response->result_string); + break; + case TYPE_RESPONSE_DOUBLE: + l_JSON->obj_result = json_object_new_double(a_response->result_double); + break; + case TYPE_RESPONSE_BOOLEAN: + l_JSON->obj_result = json_object_new_boolean(a_response->result_boolean); + break; + case TYPE_RESPONSE_INTEGER: + l_JSON->obj_result = json_object_new_int64(a_response->result_int); + break; + default:{} } }else{ l_JSON->struct_error = dap_json_rpc_error_JSON_add_data(a_response->error->code_error, a_response->error->msg); diff --git a/modules/chain/dap_chain.c b/modules/chain/dap_chain.c index e27d542ff1dde825bd107bf55838c3d7284214aa..db1ddc82f5be49b8c085a6ec321474d470552d28 100644 --- a/modules/chain/dap_chain.c +++ b/modules/chain/dap_chain.c @@ -106,7 +106,7 @@ void dap_chain_deinit(void) dap_chain_t* dap_chain_enum(void** a_item) { // if a_item == 0x1 then first item - dap_chain_item_t *l_item_start = (*a_item == 0x1) ? s_chain_items : (dap_chain_item_t*) *a_item; + dap_chain_item_t *l_item_start = ( *a_item == (void*) 0x1) ? s_chain_items : (dap_chain_item_t*) *a_item; dap_chain_item_t *l_item = NULL; dap_chain_item_t *l_item_tmp = NULL; pthread_rwlock_rdlock(&s_chain_items_rwlock); @@ -142,6 +142,7 @@ dap_chain_t * dap_chain_create(dap_ledger_t* a_ledger, const char * a_chain_net_ l_ret->name = strdup (a_chain_name); l_ret->net_name = strdup (a_chain_net_name); l_ret->ledger = a_ledger; + pthread_rwlock_init(&l_ret->atoms_rwlock,NULL); dap_chain_item_t * l_ret_item = DAP_NEW_Z(dap_chain_item_t); l_ret_item->chain = l_ret; @@ -189,6 +190,7 @@ void dap_chain_delete(dap_chain_t * a_chain) DAP_DELETE(a_chain->datum_types); a_chain->autoproc_datum_types_count = 0; DAP_DELETE(a_chain->autoproc_datum_types); + pthread_rwlock_destroy(&a_chain->atoms_rwlock); pthread_rwlock_unlock(&s_chain_items_rwlock); } @@ -423,12 +425,14 @@ bool dap_chain_has_file_store(dap_chain_t * a_chain) int dap_chain_save_all (dap_chain_t * l_chain) { int ret = -1; + pthread_rwlock_rdlock(&l_chain->atoms_rwlock); dap_chain_cell_t * l_item, *l_item_tmp = NULL; HASH_ITER(hh,l_chain->cells,l_item,l_item_tmp){ dap_chain_cell_file_update(l_item); if (ret <0 ) ret++; } + pthread_rwlock_unlock(&l_chain->atoms_rwlock); return ret; } @@ -442,6 +446,8 @@ int dap_chain_load_all (dap_chain_t * l_chain) int l_ret = -2; if(!l_chain) return l_ret; + pthread_rwlock_wrlock(&l_chain->atoms_rwlock); + // create directory if not exist if(!dap_dir_test(DAP_CHAIN_PVT (l_chain)->file_storage_dir)) { dap_mkdir_with_parents(DAP_CHAIN_PVT (l_chain)->file_storage_dir); @@ -466,6 +472,8 @@ int dap_chain_load_all (dap_chain_t * l_chain) } closedir(l_dir); } + pthread_rwlock_unlock(&l_chain->atoms_rwlock); + return l_ret; } @@ -528,6 +536,8 @@ void dap_chain_add_callback_notify(dap_chain_t * a_chain, dap_chain_callback_not bool dap_chain_get_atom_last_hash(dap_chain_t * a_chain, dap_hash_fast_t * a_atom_hash) { bool l_ret = false; + pthread_rwlock_rdlock(&a_chain->atoms_rwlock); + dap_chain_atom_iter_t *l_atom_iter = a_chain->callback_atom_iter_create(a_chain); dap_chain_atom_ptr_t * l_lasts_atom; size_t l_lasts_atom_count=0; @@ -547,5 +557,6 @@ bool dap_chain_get_atom_last_hash(dap_chain_t * a_chain, dap_hash_fast_t * a_ato l_ret = true; } a_chain->callback_atom_iter_delete(l_atom_iter); + pthread_rwlock_unlock(&a_chain->atoms_rwlock); return l_ret; } diff --git a/modules/chain/include/dap_chain.h b/modules/chain/include/dap_chain.h index 63b32a066797caf11e8403c144c2519acd674e1f..d8e660427bbc74553e3b9bd0b4fad500c2c36650 100644 --- a/modules/chain/include/dap_chain.h +++ b/modules/chain/include/dap_chain.h @@ -115,6 +115,9 @@ typedef struct dap_chain{ struct dap_chain * next; struct dap_chain * prev; + // read/write atoms rwlock + pthread_rwlock_t atoms_rwlock; + dap_chain_callback_new_cfg_t callback_created; dap_chain_callback_t callback_delete; diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index faf63876603e24bb59819f7c3ffa915ef1d98ad4..73ab073b84ca6369fc07013b37411f3f4cf3e491 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -68,11 +68,48 @@ #include "dap_chain_net.h" #define LOG_TAG "dap_stream_ch_chain" + +struct sync_request +{ + dap_worker_t * worker; + dap_stream_ch_t * ch; + dap_stream_ch_chain_sync_request_t request; + dap_stream_ch_chain_pkt_hdr_t request_hdr; + dap_list_t *pkt_list; + uint64_t stats_request_elemets_processed; + union{ + struct{ + dap_db_log_list_t *db_log; // db log + dap_list_t *db_iter; + } gdb; + struct{ + dap_chain_atom_iter_t *request_atom_iter; + } chain; + }; +}; + +struct ch_chain_pkt_in +{ + dap_stream_ch_t * ch; + dap_chain_pkt_item_t * pkt; +}; + static void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg); static void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg); static void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg); static void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg); +static bool s_sync_out_chains_proc_callback(dap_proc_thread_t *a_thread, void *a_arg); + +static bool s_sync_out_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a_arg); +static void s_sync_out_gdb_first_gdb_worker_callback(dap_worker_t *a_worker, void *a_arg); +static void s_sync_out_gdb_synced_data_worker_callback(dap_worker_t *a_worker, void *a_arg); + +static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg); + +static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg); +static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_thread, void *a_arg); + static bool s_debug_chain_sync=false; /** * @brief dap_stream_ch_chain_init @@ -109,22 +146,13 @@ void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg) } /** - * @brief s_stream_ch_delete - * @param ch - * @param arg + * @brief s_stream_ch_chain_delete + * @param a_ch_chain */ -void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg) +static void s_sync_request_delete(struct sync_request * a_sync_request) { - (void) a_arg; - - if(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs) { - dap_db_log_list_delete(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs); - DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs = NULL; - } - - if (DAP_STREAM_CH_CHAIN(a_ch)->in_gdb_list) { - dap_list_t *l_tmp_item = DAP_STREAM_CH_CHAIN(a_ch)->in_gdb_list; - while(l_tmp_item) { + if (a_sync_request->pkt_list) { + dap_list_t *l_tmp_item = a_sync_request->pkt_list; while(l_tmp_item) { dap_chain_pkt_item_t *l_pkt_copy = (dap_chain_pkt_item_t *)l_tmp_item->data; DAP_DELETE(l_pkt_copy->pkt_data); DAP_DELETE(l_pkt_copy); @@ -132,110 +160,214 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg) l_tmp_item = dap_list_next(l_tmp_item); DAP_DELETE(l_trash_item); } - DAP_STREAM_CH_CHAIN(a_ch)->in_gdb_list = NULL; } - if (DAP_STREAM_CH_CHAIN(a_ch)->db_iter) { - DAP_STREAM_CH_CHAIN(a_ch)->db_iter = dap_list_first( DAP_STREAM_CH_CHAIN(a_ch)->db_iter); - dap_list_free_full( DAP_STREAM_CH_CHAIN(a_ch)->db_iter, free); - DAP_STREAM_CH_CHAIN(a_ch)->db_iter = NULL; + + if (a_sync_request->gdb.db_iter) { + a_sync_request->gdb.db_iter = dap_list_first( a_sync_request->gdb.db_iter); + dap_list_free_full( a_sync_request->gdb.db_iter, free); + a_sync_request->gdb.db_iter = NULL; } + DAP_DELETE(a_sync_request); +} + +/** + * @brief s_stream_ch_delete + * @param ch + * @param arg + */ +static void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg) +{ + (void) a_arg; + (void) a_ch; + //dap_stream_ch_chain_t * l_ch_chain=DAP_STREAM_CH_CHAIN(a_ch); + } +/** + * @brief s_sync_out_chains_worker_callback + * @param a_worker + * @param a_arg + */ +static void s_sync_out_chains_first_worker_callback(dap_worker_t *a_worker, void *a_arg) +{ + UNUSED(a_worker); + struct sync_request * l_sync_request = (struct sync_request *) a_arg; + dap_stream_ch_t *l_ch = l_sync_request->ch; + if( ! dap_stream_ch_check_unsafe( DAP_STREAM_WORKER(a_worker), l_ch) ){ + log_it(L_INFO,"Client disconnected before we sent the reply"); + s_sync_request_delete(l_sync_request); + return; + } + + dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS; + dap_chain_node_addr_t l_node_addr = {}; + dap_chain_net_t *l_net = dap_chain_net_by_id(l_sync_request->request_hdr.net_id); + l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); + + if (s_debug_chain_sync ) + log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN"); + + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN, + l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, + l_ch_chain->request_hdr.cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); + DAP_DELETE(l_sync_request); + +} + +/** + * @brief s_sync_out_chains_last_worker_callback + * @param a_worker + * @param a_arg + */ +static void s_sync_out_chains_last_worker_callback(dap_worker_t *a_worker, void *a_arg) +{ + UNUSED(a_worker); + struct sync_request * l_sync_request = (struct sync_request *) a_arg; + dap_stream_ch_t *l_ch = l_sync_request->ch; + if( !dap_stream_ch_check_unsafe( DAP_STREAM_WORKER(a_worker), l_ch) ){ + log_it(L_INFO,"Client disconnected before we sent the reply"); + s_sync_request_delete(l_sync_request); + return; + } + + dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + // last packet + dap_stream_ch_chain_sync_request_t l_request = {0}; + if (s_debug_chain_sync ) + log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS"); + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, + l_sync_request->request_hdr.net_id, l_sync_request->request_hdr.chain_id, + l_sync_request->request_hdr.cell_id, &l_request, sizeof(l_request)); + if (l_ch_chain->request_atom_iter) + DAP_DEL_Z(l_ch_chain->request_atom_iter); + + l_ch_chain->state = CHAIN_STATE_IDLE; + if (l_ch_chain->callback_notify_packet_out) + l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, + NULL, 0, l_ch_chain->callback_notify_arg); + DAP_DELETE(l_sync_request); +} /** * @brief s_sync_chains_callback * @param a_thread * @param a_arg * @return */ -bool s_sync_out_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) +static bool s_sync_out_chains_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) { - UNUSED(a_thread); - dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + struct sync_request * l_sync_request = (struct sync_request *) a_arg; + + dap_chain_t * l_chain = dap_chain_find_by_id(l_sync_request->request_hdr.net_id, l_sync_request->request_hdr.chain_id); + assert(l_chain); + + pthread_rwlock_rdlock(&l_chain->atoms_rwlock); - dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id); - l_ch_chain->request_atom_iter = l_chain->callback_atom_iter_create(l_chain); + l_sync_request->chain.request_atom_iter = l_chain->callback_atom_iter_create(l_chain); size_t l_first_size = 0; - dap_chain_atom_ptr_t *l_first = l_chain->callback_atom_iter_get_first(l_ch_chain->request_atom_iter, &l_first_size); + dap_chain_atom_ptr_t *l_first = l_chain->callback_atom_iter_get_first(l_sync_request->chain.request_atom_iter, &l_first_size); if (l_first && l_first_size) { // first packet - if (!dap_hash_fast_is_blank(&l_ch_chain->request.hash_from)) { - l_first = l_chain->callback_atom_find_by_hash(l_ch_chain->request_atom_iter, - &l_ch_chain->request.hash_from, &l_first_size); + if (!dap_hash_fast_is_blank(&l_sync_request->request.hash_from)) { + l_first = l_chain->callback_atom_find_by_hash(l_sync_request->chain.request_atom_iter, + &l_sync_request->request.hash_from, &l_first_size); } - l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS; - dap_chain_node_addr_t l_node_addr = {}; - dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_hdr.net_id); - l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); - - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN, - l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, - l_ch_chain->request_hdr.cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); - } - else { - // last packet - dap_stream_ch_chain_sync_request_t l_request = {}; - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, - l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, - l_ch_chain->request_hdr.cell_id, &l_request, sizeof(l_request)); - DAP_DEL_Z(l_ch_chain->request_atom_iter); - l_ch_chain->state = CHAIN_STATE_IDLE; - if (l_ch_chain->callback_notify_packet_out) - l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, - NULL, 0, l_ch_chain->callback_notify_arg); + pthread_rwlock_unlock(&l_chain->atoms_rwlock); + dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id,s_sync_out_chains_first_worker_callback, l_sync_request ); + } else { + pthread_rwlock_unlock(&l_chain->atoms_rwlock); + dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id,s_sync_out_chains_last_worker_callback, l_sync_request ); } - dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); - dap_proc_thread_assign_on_worker_inter(a_thread, l_ch->stream_worker->worker, l_ch->stream->esocket ); return true; } +/** + * @brief s_sync_out_gdb_first_gdb_worker_callback + * @param a_worker + * @param a_arg + */ +static void s_sync_out_gdb_first_gdb_worker_callback(dap_worker_t *a_worker, void *a_arg) +{ + struct sync_request *l_sync_request = (struct sync_request *) a_arg; + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN( l_sync_request->ch ); + dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_hdr.net_id); + + // Add it to outgoing list + l_ch_chain->request_global_db_trs = l_sync_request->gdb.db_log; + l_ch_chain->request_db_iter = NULL; + l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB; + dap_chain_node_addr_t l_node_addr = { 0 }; + l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); + if (s_debug_chain_sync ) + log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB"); + dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch , DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, + l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, + l_ch_chain->request_hdr.cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); + if(l_ch_chain->callback_notify_packet_out) + l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, + NULL, 0, l_ch_chain->callback_notify_arg); + + if( a_worker){ // We send NULL to prevent delete + DAP_DELETE(l_sync_request); + l_ch_chain->is_on_request = false; + } +} + +/** + * @brief s_sync_out_gdb_synced_data_worker_callback + * @param a_worker + * @param a_arg + */ +static void s_sync_out_gdb_synced_data_worker_callback(dap_worker_t *a_worker, void *a_arg) +{ + UNUSED(a_worker); + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN( ((struct sync_request *) a_arg)->ch ); + s_sync_out_gdb_first_gdb_worker_callback(NULL,a_arg); // NULL to say callback not to delete request + + dap_stream_ch_chain_sync_request_t l_request = {0}; + if (s_debug_chain_sync ) + log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB"); + dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, + l_ch_chain->request_hdr.cell_id, &l_request, sizeof(l_request)); + l_ch_chain->state = CHAIN_STATE_IDLE; + if(l_ch_chain->callback_notify_packet_out) + l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + NULL, 0, l_ch_chain->callback_notify_arg); + l_ch_chain->is_on_request = false; + DAP_DELETE(a_arg); + +} + /** * @brief s_sync_out_gdb_callback * @param a_thread * @param a_arg * @return */ -bool s_sync_out_gdb_callback(dap_proc_thread_t *a_thread, void *a_arg) +static bool s_sync_out_gdb_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) { - dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + struct sync_request *l_sync_request = (struct sync_request *) a_arg; // Get log diff uint64_t l_local_last_id = dap_db_log_get_last_id(); if (s_debug_chain_sync) - log_it(L_DEBUG, "Requested transactions %llu:%llu", l_ch_chain->request.id_start, l_local_last_id); - uint64_t l_start_item = l_ch_chain->request.id_start; + log_it(L_DEBUG, "Sync out gdb proc, requested transactions %llu:%llu", l_sync_request->request.id_start, l_local_last_id); + uint64_t l_start_item = l_sync_request->request.id_start; // If the current global_db has been truncated, but the remote node has not known this - if(l_ch_chain->request.id_start > l_local_last_id) { + if(l_sync_request->request.id_start > l_local_last_id) { l_start_item = 0; } - dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_hdr.net_id); - dap_list_t *l_add_groups = dap_chain_net_get_add_gdb_group(l_net, l_ch_chain->request.node_addr); + dap_chain_net_t *l_net = dap_chain_net_by_id(l_sync_request->request_hdr.net_id); + dap_list_t *l_add_groups = dap_chain_net_get_add_gdb_group(l_net, l_sync_request->request.node_addr); dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_start_item + 1, l_add_groups); - dap_chain_node_addr_t l_node_addr = { 0 }; - l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, - l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, - l_ch_chain->request_hdr.cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); + if(l_db_log) { - // Add it to outgoing list - l_ch_chain->request_global_db_trs = l_db_log; - l_ch_chain->db_iter = NULL; - l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB; + l_sync_request->gdb.db_log = l_db_log; + dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_out_gdb_first_gdb_worker_callback,l_sync_request ); } else { - dap_stream_ch_chain_sync_request_t l_request = {}; - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, - l_ch_chain->request_hdr.cell_id, &l_request, sizeof(l_request)); - l_ch_chain->state = CHAIN_STATE_IDLE; - if(l_ch_chain->callback_notify_packet_out) - l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - NULL, 0, l_ch_chain->callback_notify_arg); + dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id, s_sync_out_gdb_synced_data_worker_callback,l_sync_request ); } - // go to send data from list [in s_stream_ch_packet_out()] - // no data to send -> send one empty message DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB - dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); - dap_proc_thread_assign_on_worker_inter(a_thread, l_ch->stream_worker->worker, l_ch->stream->esocket ); return true; } @@ -245,123 +377,156 @@ bool s_sync_out_gdb_callback(dap_proc_thread_t *a_thread, void *a_arg) * @param a_arg * @return */ -bool s_chain_in_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) +static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) { UNUSED(a_thread); - dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + struct sync_request *l_sync_request = (struct sync_request *) a_arg; dap_chain_hash_fast_t l_atom_hash = {}; - dap_list_t *l_pkt_copy_list = l_ch_chain->in_chains_list; - if (l_pkt_copy_list) { - l_ch_chain->in_chains_list = l_ch_chain->in_chains_list->next; - dap_chain_pkt_item_t *l_pkt_item = (dap_chain_pkt_item_t *)l_pkt_copy_list->data; - if (l_pkt_item){ - dap_chain_t *l_chain = dap_chain_find_by_id(l_pkt_item->pkt_hdr.net_id, l_pkt_item->pkt_hdr.chain_id); - if (!l_chain) { - if (s_debug_chain_sync) - log_it(L_WARNING, "No chain found for DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN"); - return true; - } - dap_chain_atom_ptr_t l_atom_copy = (dap_chain_atom_ptr_t)l_pkt_item->pkt_data; - uint64_t l_atom_copy_size = l_pkt_item->pkt_data_size; - if ( l_atom_copy_size && l_pkt_item && l_atom_copy ){ - dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash); - dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain); - size_t l_atom_size =0; - if ( l_chain->callback_atom_find_by_hash(l_atom_iter, &l_atom_hash, &l_atom_size) == NULL ) { - dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size); - if (l_atom_add_res == ATOM_ACCEPT && dap_chain_has_file_store(l_chain)) { - // append to file - dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_pkt_item->pkt_hdr.cell_id); - int l_res; - if (l_cell) { - // add one atom only - l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size); - // rewrite all file - //l_res = dap_chain_cell_file_update(l_cell); - if(l_res < 0) { - log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_atom_hash, - l_cell ? l_cell->file_storage_path : "[null]"); - } else { - dap_db_set_last_hash_remote(l_ch_chain->request.node_addr.uint64, l_chain, &l_atom_hash); - } - // add all atoms from treshold - if (l_chain->callback_atom_add_from_treshold){ - dap_chain_atom_ptr_t l_atom_treshold; - do{ - size_t l_atom_treshold_size; - // add into ledger - if (s_debug_chain_sync) - log_it(L_DEBUG, "Try to add atom from treshold"); - l_atom_treshold = l_chain->callback_atom_add_from_treshold(l_chain, &l_atom_treshold_size); - // add into file - if(l_atom_treshold) { - l_res = dap_chain_cell_file_append(l_cell, l_atom_treshold, l_atom_treshold_size); - log_it(L_INFO, "Added atom from treshold"); - if(l_res < 0) { - log_it(L_ERROR, "Can't save event 0x%x from treshold to the file '%s'", - l_atom_treshold, l_cell ? l_cell->file_storage_path : "[null]"); - } + + if (l_sync_request->pkt_list) { + dap_chain_pkt_item_t *l_pkt_item = NULL; + dap_list_t *l_pkt_iter =l_sync_request->pkt_list; + l_pkt_item = (dap_chain_pkt_item_t *)l_pkt_iter->data; + l_sync_request->pkt_list = l_sync_request->pkt_list->next; + if (l_sync_request->pkt_list ){ + l_sync_request->pkt_list->prev = NULL; + } + if (l_pkt_item){ + dap_chain_t *l_chain = dap_chain_find_by_id(l_pkt_item->pkt_hdr.net_id, l_pkt_item->pkt_hdr.chain_id); + if (!l_chain) { + if (s_debug_chain_sync) + log_it(L_WARNING, "No chain found for DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN"); + return true; + } + dap_chain_atom_ptr_t l_atom_copy = (dap_chain_atom_ptr_t)l_pkt_item->pkt_data; + uint64_t l_atom_copy_size = l_pkt_item->pkt_data_size; + if ( l_atom_copy_size && l_pkt_item && l_atom_copy ){ + pthread_rwlock_wrlock(&l_chain->atoms_rwlock); + dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash); + dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain); + size_t l_atom_size =0; + if ( l_chain->callback_atom_find_by_hash(l_atom_iter, &l_atom_hash, &l_atom_size) == NULL ) { + dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size); + if (l_atom_add_res == ATOM_ACCEPT && dap_chain_has_file_store(l_chain)) { + // append to file + dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_pkt_item->pkt_hdr.cell_id); + int l_res; + if (l_cell) { + // add one atom only + l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size); + // rewrite all file + //l_res = dap_chain_cell_file_update(l_cell); + if(l_res < 0) { + log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_atom_hash, + l_cell ? l_cell->file_storage_path : "[null]"); + } else { + dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); + } + // add all atoms from treshold + if (l_chain->callback_atom_add_from_treshold){ + dap_chain_atom_ptr_t l_atom_treshold; + do{ + size_t l_atom_treshold_size; + // add into ledger + if (s_debug_chain_sync) + log_it(L_DEBUG, "Try to add atom from treshold"); + l_atom_treshold = l_chain->callback_atom_add_from_treshold(l_chain, &l_atom_treshold_size); + // add into file + if(l_atom_treshold) { + l_res = dap_chain_cell_file_append(l_cell, l_atom_treshold, l_atom_treshold_size); + log_it(L_INFO, "Added atom from treshold"); + if(l_res < 0) { + log_it(L_ERROR, "Can't save event 0x%x from treshold to the file '%s'", + l_atom_treshold, l_cell ? l_cell->file_storage_path : "[null]"); } } - while(l_atom_treshold); } - - // delete cell and close file - dap_chain_cell_delete(l_cell); + while(l_atom_treshold); } - else{ - log_it(L_ERROR, "Can't get cell for cell_id 0x%x for save event to file", l_pkt_item->pkt_hdr.cell_id); - } + // delete cell and close file + dap_chain_cell_delete(l_cell); + } + else{ + log_it(L_ERROR, "Can't get cell for cell_id 0x%x for save event to file", l_pkt_item->pkt_hdr.cell_id); + } - if(l_atom_add_res == ATOM_PASS) - DAP_DELETE(l_atom_copy); - } else { - dap_db_set_last_hash_remote(l_ch_chain->request.node_addr.uint64, l_chain, &l_atom_hash); - DAP_DELETE(l_atom_copy); } - l_chain->callback_atom_iter_delete(l_atom_iter); - }else{ - if (!l_pkt_item) - log_it(L_WARNING, "chain packet item is NULL"); - if (!l_pkt_copy_list) - log_it(L_WARNING, "chain packet item list is NULL"); - if (l_atom_copy_size) - log_it(L_WARNING, "chain packet item data size is zero"); + if(l_atom_add_res == ATOM_PASS) + DAP_DELETE(l_atom_copy); + } else { + dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); + DAP_DELETE(l_atom_copy); } + l_chain->callback_atom_iter_delete(l_atom_iter); + pthread_rwlock_unlock(&l_chain->atoms_rwlock); }else{ - log_it(L_WARNING, "pkt copy is NULL"); + if (!l_pkt_item) + log_it(L_WARNING, "chain packet item is NULL"); + if (l_atom_copy_size) + log_it(L_WARNING, "chain packet item data size is zero"); } - if (l_pkt_item) - DAP_DELETE(l_pkt_item); + }else{ + log_it(L_WARNING, "pkt copy is NULL"); + } + if (l_pkt_item) + DAP_DELETE(l_pkt_item); - DAP_DELETE(l_pkt_copy_list); + if (l_pkt_iter) + DAP_DELETE(l_pkt_iter); }else log_it(L_WARNING, "In proc thread got CHAINS stream ch packet with zero data"); - dap_proc_thread_assign_on_worker_inter(a_thread, l_ch->stream_worker->worker, l_ch->stream->esocket ); - return true; + + if(l_sync_request->pkt_list){ + return false; + }else{ + DAP_DELETE(l_sync_request); + return true; + } } -bool s_gdb_in_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) +/** + * @brief s_gdb_in_pkt_error_worker_callback + * @param a_thread + * @param a_arg + */ +static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_worker, void *a_arg) { - dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + UNUSED(a_worker); + struct ch_chain_pkt_in * l_pkt_in = (struct ch_chain_pkt_in*) a_arg; + + dap_stream_ch_chain_pkt_write_error(l_pkt_in->ch, l_pkt_in->pkt->pkt_hdr.net_id, + l_pkt_in->pkt->pkt_hdr.chain_id, l_pkt_in->pkt->pkt_hdr.cell_id, + "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED"); + dap_stream_ch_set_ready_to_write_unsafe(l_pkt_in->ch, true); + DAP_DELETE(l_pkt_in); +} - dap_list_t *l_pkt_copy_list = l_ch_chain->in_gdb_list; - if (l_pkt_copy_list) { - l_ch_chain->in_gdb_list = l_ch_chain->in_gdb_list->next; - if (l_ch_chain->in_gdb_list ) - l_ch_chain->in_gdb_list->prev = NULL; - dap_chain_pkt_item_t *l_pkt_copy = (dap_chain_pkt_item_t *)l_pkt_copy_list->data; +/** + * @brief s_gdb_in_pkt_callback + * @param a_thread + * @param a_arg + * @return + */ +static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg) +{ + struct sync_request *l_sync_request = (struct sync_request *) a_arg; + + dap_list_t *l_pkt_iter = l_sync_request->pkt_list; + if (l_pkt_iter) { + l_sync_request->pkt_list =l_sync_request->pkt_list->next; + if (l_sync_request->pkt_list ) + l_sync_request->pkt_list->prev = NULL; + + dap_chain_pkt_item_t *l_pkt_item = (dap_chain_pkt_item_t *)l_pkt_iter->data; size_t l_data_obj_count = 0; // deserialize data & Parse data from dap_db_log_pack() - dap_store_obj_t *l_store_obj = dap_db_log_unpack(l_pkt_copy->pkt_data, l_pkt_copy->pkt_data_size, &l_data_obj_count); + dap_store_obj_t *l_store_obj = dap_db_log_unpack(l_pkt_item->pkt_data, l_pkt_item->pkt_data_size, &l_data_obj_count); if (s_debug_chain_sync){ if (l_data_obj_count) log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count ); - else if (l_pkt_copy->pkt_data){ + else if (l_pkt_item->pkt_data){ log_it(L_WARNING, "In: No data objs after unpack", l_data_obj_count ); }else log_it(L_WARNING, "In: packet in list with NULL data"); @@ -418,14 +583,14 @@ bool s_gdb_in_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) if(!l_apply) { // If request was from defined node_addr we update its state - if(l_ch_chain->request.node_addr.uint64) { - dap_db_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id); + if(l_sync_request->request.node_addr.uint64) { + dap_db_set_last_id_remote(l_sync_request->request.node_addr.uint64, l_obj->id); } continue; } // apply received transaction - dap_chain_t *l_chain = dap_chain_find_by_id(l_pkt_copy->pkt_hdr.net_id, l_pkt_copy->pkt_hdr.chain_id); + dap_chain_t *l_chain = dap_chain_find_by_id(l_pkt_item->pkt_hdr.net_id, l_pkt_item->pkt_hdr.chain_id); if(l_chain) { if(l_chain->callback_add_datums_with_group){ void * restrict l_store_obj_value = l_store_obj->value; @@ -436,14 +601,14 @@ bool s_gdb_in_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) } // save data to global_db if(!dap_chain_global_db_obj_save(l_obj, 1)) { - dap_stream_ch_chain_pkt_write_error(l_ch, l_pkt_copy->pkt_hdr.net_id, - l_pkt_copy->pkt_hdr.chain_id, l_pkt_copy->pkt_hdr.cell_id, - "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED"); - dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); + struct ch_chain_pkt_in * l_pkt_in = DAP_NEW_Z(struct ch_chain_pkt_in); + l_pkt_in->ch = l_sync_request->ch; + l_pkt_in->pkt = l_pkt_item; + dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id,s_gdb_in_pkt_error_worker_callback, l_sync_request); } else { // If request was from defined node_addr we update its state - if(l_ch_chain->request.node_addr.uint64) { - dap_db_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id); + if(l_sync_request->request.node_addr.uint64) { + dap_db_set_last_id_remote(l_sync_request->request.node_addr.uint64, l_obj->id); } if (s_debug_chain_sync) log_it(L_DEBUG, "Added new GLOBAL_DB history pack"); @@ -451,15 +616,21 @@ bool s_gdb_in_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) } if(l_store_obj) dap_store_obj_free(l_store_obj, l_data_obj_count); - if (l_pkt_copy) - DAP_DELETE(l_pkt_copy); - if (l_pkt_copy_list) - DAP_DELETE(l_pkt_copy_list); + if (l_pkt_item) + DAP_DELETE(l_pkt_item); + if (l_pkt_iter) + DAP_DELETE(l_pkt_iter); + if(l_sync_request->pkt_list) + return false; + else{ + DAP_DELETE(l_sync_request); + return true; + } + } else { log_it(L_WARNING, "In proc thread got GDB stream ch packet with zero data"); + return true; } - dap_proc_thread_assign_on_worker_inter(a_thread, l_ch->stream_worker->worker, l_ch->stream->esocket ); - return true; } /** @@ -505,238 +676,272 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) return; } switch (l_ch_pkt->hdr.type) { - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: { - log_it(L_INFO, "In: SYNCED_ALL net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: { - log_it(L_INFO, "In: SYNCED_GLOBAL_DB: net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP: { - if (s_debug_chain_sync) - log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: { + log_it(L_INFO, "In: SYNCED_ALL net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP: { - if (s_debug_chain_sync) - log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); - } - break; + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: { + log_it(L_INFO, "In: SYNCED_GLOBAL_DB: net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP: { + if (s_debug_chain_sync) + log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP: { + if (s_debug_chain_sync) + log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); + } + break; + + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { + if (dap_log_level_get()<= L_INFO){ + char *l_hash_from_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_from); + char *l_hash_to_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_to); + log_it(L_INFO, "In: SYNCED_CHAINS: between %s and %s",l_hash_from_str?l_hash_from_str:"(null)", + l_hash_to_str? l_hash_to_str: "(null)"); + if(l_hash_from_str) + DAP_DELETE(l_hash_from_str); + if(l_hash_to_str) + DAP_DELETE(l_hash_to_str); + } + //l_ch_chain->state = CHAIN_STATE_IDLE; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: { - if (dap_log_level_get()<= L_INFO){ - char *l_hash_from_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_from); - char *l_hash_to_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_to); - log_it(L_INFO, "In: SYNCED_CHAINS: between %s and %s",l_hash_from_str?l_hash_from_str:"(null)", - l_hash_to_str? l_hash_to_str: "(null)"); - if(l_hash_from_str) + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: { + // fill ids + if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { + if(l_ch_chain->is_on_request){ + log_it(L_WARNING, "Can't process SYNC_CHAINS request because its already busy with syncronization"); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + break; + } + l_ch_chain->is_on_request=true; + memcpy(&l_ch_chain->request, l_chain_pkt->data, l_chain_pkt_data_size); + memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); + char *l_hash_from_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_from); + char *l_hash_to_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_to); + log_it(L_INFO, "In: SYNC_CHAINS pkt: net 0x%016x chain 0x%016x cell 0x%016x between %s and %s", l_ch_chain->request_hdr.net_id.uint64 , + l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, + l_hash_from_str? l_hash_from_str: "(null)", l_hash_to_str?l_hash_to_str:"(null)"); + dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); + if(l_chain) { + if(l_ch_chain->state != CHAIN_STATE_IDLE) { + l_ch_chain->is_on_request=false; + log_it(L_INFO, "Can't process SYNC_CHAINS request between %s and %s because not in idle state", + l_hash_from_str? l_hash_from_str:"(null)", + l_hash_to_str?l_hash_to_str:"(null)"); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_STATE_NOT_IN_IDLE"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + } else { + struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request); + l_sync_request->ch = a_ch; + l_sync_request->worker = a_ch->stream_worker->worker; + memcpy(&l_sync_request->request, &l_ch_chain->request, sizeof (l_ch_chain->request)); + memcpy(&l_sync_request->request_hdr, &l_ch_chain->request_hdr, sizeof (l_ch_chain->request_hdr)); + dap_proc_queue_add_callback_inter( a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_proc_callback, l_sync_request ); + } + } DAP_DELETE(l_hash_from_str); - if(l_hash_to_str) DAP_DELETE(l_hash_to_str); + }else{ + log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_CHAIN_PKT_DATA_SIZE" ); + } } - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: { - // fill ids - if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { - memcpy(&l_ch_chain->request, l_chain_pkt->data, l_chain_pkt_data_size); - memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); - char *l_hash_from_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_from); - char *l_hash_to_str = dap_chain_hash_fast_to_str_new(&l_ch_chain->request.hash_to); - log_it(L_INFO, "In: SYNC_CHAINS pkt: net 0x%016x chain 0x%016x cell 0x%016x between %s and %s", l_ch_chain->request_hdr.net_id.uint64 , - l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64, - l_hash_from_str? l_hash_from_str: "(null)", l_hash_to_str?l_hash_to_str:"(null)"); - dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); - if(l_chain) { + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: { + if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { if(l_ch_chain->state != CHAIN_STATE_IDLE) { - log_it(L_INFO, "Can't process SYNC_CHAINS request between %s and %s because not in idle state", - l_hash_from_str? l_hash_from_str:"(null)", - l_hash_to_str?l_hash_to_str:"(null)"); + log_it(L_WARNING, "Can't process SYNC_GLOBAL_DB request because not in idle state"); dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, "ERROR_STATE_NOT_IN_IDLE"); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } else { - dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback_inter( a_ch->stream_worker->worker->proc_queue_input, s_sync_out_chains_callback, a_ch); + break; + } else { // receive the latest global_db revision of the remote node -> go to send mode + if(l_ch_chain->is_on_request){ + log_it(L_WARNING, "Can't process SYNC_CHAINS request because its already busy with syncronization"); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + break; + } + l_ch_chain->is_on_request = true; + dap_stream_ch_chain_sync_request_t * l_request = + (dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data; + memcpy(&l_ch_chain->request, l_request, l_chain_pkt_data_size); + memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); + log_it(L_INFO, "In: SYNC_GLOBAL_DB pkt: net 0x%016x chain 0x%016x cell 0x%016x, range between %u and %u", + l_ch_chain->request_hdr.net_id.uint64 , l_ch_chain->request_hdr.chain_id.uint64, + l_ch_chain->request_hdr.cell_id.uint64, l_ch_chain->request.id_start, l_ch_chain->request.id_end ); + + struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request); + l_sync_request->ch = a_ch; + l_sync_request->worker = a_ch->stream_worker->worker; + memcpy(&l_sync_request->request, &l_ch_chain->request, sizeof (l_ch_chain->request)); + memcpy(&l_sync_request->request_hdr, &l_ch_chain->request_hdr, sizeof (l_ch_chain->request_hdr)); + dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_proc_callback, l_sync_request); } + }else{ + log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_CHAIN_PKT_DATA_SIZE" ); } - DAP_DELETE(l_hash_from_str); - DAP_DELETE(l_hash_to_str); - }else{ - log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_CHAIN_PKT_DATA_SIZE" ); } - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: { - if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { - dap_stream_ch_chain_sync_request_t * l_request = - (dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data; - memcpy(&l_ch_chain->request, l_request, l_chain_pkt_data_size); - memcpy(&l_ch_chain->request_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); - log_it(L_INFO, "In: SYNC_GLOBAL_DB pkt: net 0x%016x chain 0x%016x cell 0x%016x, range between %u and %u", - l_ch_chain->request_hdr.net_id.uint64 , l_ch_chain->request_hdr.chain_id.uint64, - l_ch_chain->request_hdr.cell_id.uint64, l_ch_chain->request.id_start, l_ch_chain->request.id_end ); - - if(l_ch_chain->state != CHAIN_STATE_IDLE) { - log_it(L_WARNING, "Can't process SYNC_GLOBAL_DB request because not in idle state"); + break; + // first packet of data with source node address + case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN: { + if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)){ + log_it(L_INFO, "From "NODE_ADDR_FP_STR": FIRST_CHAIN data_size=%d net 0x%016x chain 0x%016x cell 0x%016x ", + NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr), + l_chain_pkt_data_size, l_ch_chain->request_hdr.net_id.uint64 , + l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64); + }else{ + log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN", l_chain_pkt_data_size); dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_STATE_NOT_IN_IDLE"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - break; - } else { // receive the latest global_db revision of the remote node -> go to send mode - log_it(L_INFO, "Got DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB request", - a_ch->stream->session->id); - dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_out_gdb_callback, a_ch); + "ERROR_CHAIN_PACKET_TYPE_FIRST_CHAIN_INCORRET_DATA_SIZE(%zd/%zd)",l_chain_pkt_data_size, sizeof(dap_chain_node_addr_t)); } - }else{ - log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_CHAIN_PKT_DATA_SIZE" ); - } - } - break; - // first packet of data with source node address - case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN: { - if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)){ - log_it(L_INFO, "From "NODE_ADDR_FP_STR": FIRST_CHAIN data_size=%d net 0x%016x chain 0x%016x cell 0x%016x ", - NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr), - l_chain_pkt_data_size, l_ch_chain->request_hdr.net_id.uint64 , - l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64); - }else{ - log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN", l_chain_pkt_data_size); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_CHAIN_PACKET_TYPE_FIRST_CHAIN_INCORRET_DATA_SIZE(%zd/%zd)",l_chain_pkt_data_size, sizeof(dap_chain_node_addr_t)); } - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN: { - if(l_chain_pkt_data_size) { - dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); - if(l_chain) { - // Expect atom element in - if(l_chain_pkt_data_size > 0) { - dap_chain_pkt_item_t *l_pkt_item = DAP_NEW_Z(dap_chain_pkt_item_t); - memcpy(&l_pkt_item->pkt_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); - l_pkt_item->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size); - memcpy(l_pkt_item->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); - l_pkt_item->pkt_data_size = l_chain_pkt_data_size; - l_ch_chain->in_chains_list = dap_list_append(l_ch_chain->in_chains_list, l_pkt_item); - if (s_debug_chain_sync){ - dap_chain_hash_fast_t l_atom_hash={0}; - dap_hash_fast(l_chain_pkt->data, l_chain_pkt_data_size ,&l_atom_hash); - char *l_atom_hash_str= dap_chain_hash_fast_to_str_new(&l_atom_hash); - log_it(L_INFO, "In: CHAIN pkt: atom hash %s (size %zd)", l_atom_hash_str, l_chain_pkt_data_size); - DAP_DELETE(l_atom_hash_str); - } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN: { + if(l_chain_pkt_data_size) { + dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); + if(l_chain) { + // Expect atom element in + if(l_chain_pkt_data_size > 0) { + dap_chain_pkt_item_t *l_pkt_item = DAP_NEW_Z(dap_chain_pkt_item_t); + memcpy(&l_pkt_item->pkt_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); + l_pkt_item->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size); + memcpy(l_pkt_item->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); + l_pkt_item->pkt_data_size = l_chain_pkt_data_size; - dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_chain_in_pkt_callback, a_ch); - } else { - log_it(L_WARNING, "Empty chain packet"); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_CHAIN_PACKET_EMPTY"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request); + l_sync_request->ch = a_ch; + l_sync_request->worker = a_ch->stream_worker->worker; + memcpy(&l_sync_request->request, &l_ch_chain->request, sizeof (l_ch_chain->request)); + memcpy(&l_sync_request->request_hdr, &l_ch_chain->request_hdr, sizeof (l_ch_chain->request_hdr)); + + l_sync_request->pkt_list = dap_list_append(l_sync_request->pkt_list, l_pkt_item); + if (s_debug_chain_sync){ + dap_chain_hash_fast_t l_atom_hash={0}; + dap_hash_fast(l_chain_pkt->data, l_chain_pkt_data_size ,&l_atom_hash); + char *l_atom_hash_str= dap_chain_hash_fast_to_str_new(&l_atom_hash); + log_it(L_INFO, "In: CHAIN pkt: atom hash %s (size %zd)", l_atom_hash_str, l_chain_pkt_data_size); + DAP_DELETE(l_atom_hash_str); + } + dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_in_chains_callback, l_sync_request); + } else { + log_it(L_WARNING, "Empty chain packet"); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_CHAIN_PACKET_EMPTY"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + } } } } - } + break; + // first packet of data with source node address + case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB: + if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)){ + memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size); + log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d net 0x%016x chain 0x%016x cell 0x%016x from address "NODE_ADDR_FP_STR, + l_chain_pkt_data_size, l_chain_pkt->hdr.net_id.uint64 , + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr) ); + }else { + log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB", l_chain_pkt_data_size); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_CHAIN_PACKET_TYPE_FIRST_GLOBAL_DB_INCORRET_DATA_SIZE"); + } break; - // first packet of data with source node address - case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB: - if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)){ - memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size); - log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d net 0x%016x chain 0x%016x cell 0x%016x from address "NODE_ADDR_FP_STR, - l_chain_pkt_data_size, l_chain_pkt->hdr.net_id.uint64 , - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr) ); - }else { - log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB", l_chain_pkt_data_size); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_CHAIN_PACKET_TYPE_FIRST_GLOBAL_DB_INCORRET_DATA_SIZE"); + case DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB: { + if(s_debug_chain_sync) + log_it(L_INFO, "In: GLOBAL_DB data_size=%d ", l_chain_pkt_data_size); + // get transaction and save it to global_db + if(l_chain_pkt_data_size > 0) { + dap_chain_pkt_item_t *l_pkt_item = DAP_NEW_Z(dap_chain_pkt_item_t); + memcpy(&l_pkt_item->pkt_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); + l_pkt_item->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size); + memcpy(l_pkt_item->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); + l_pkt_item->pkt_data_size = l_chain_pkt_data_size; + + struct sync_request *l_sync_request = DAP_NEW_Z(struct sync_request); + l_sync_request->ch = a_ch; + l_sync_request->worker = a_ch->stream_worker->worker; + memcpy(&l_sync_request->request, &l_ch_chain->request, sizeof (l_ch_chain->request)); + memcpy(&l_sync_request->request_hdr, &l_ch_chain->request_hdr, sizeof (l_ch_chain->request_hdr)); + l_sync_request->pkt_list = dap_list_append(l_sync_request->pkt_list, l_pkt_item); + dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_gdb_in_pkt_proc_callback, l_sync_request); + } else { + log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size"); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_GLOBAL_DB_PACKET_EMPTY"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + } } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB: { - if(s_debug_chain_sync) - log_it(L_INFO, "In: GLOBAL_DB data_size=%d ", l_chain_pkt_data_size); - // get transaction and save it to global_db - if(l_chain_pkt_data_size > 0) { - dap_chain_pkt_item_t *l_pkt_copy = DAP_NEW_Z(dap_chain_pkt_item_t); - memcpy(&l_pkt_copy->pkt_hdr, &l_chain_pkt->hdr, sizeof(l_chain_pkt->hdr)); - l_pkt_copy->pkt_data = DAP_NEW_Z_SIZE(byte_t, l_chain_pkt_data_size); - memcpy(l_pkt_copy->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); - l_pkt_copy->pkt_data_size = l_chain_pkt_data_size; - l_ch_chain->in_gdb_list = dap_list_append(l_ch_chain->in_gdb_list, l_pkt_copy); - dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); -//#ifdef DAP_OS_WINDOWS -// if (a_ch->stream_worker->worker->proc_queue_input->buf_out_size == 0) -//#endif - dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_gdb_in_pkt_callback, a_ch); - } else { - log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size"); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_GLOBAL_DB_PACKET_EMPTY"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS: { + dap_stream_ch_chain_sync_request_t l_sync_gdb = {0}; + memcpy(&l_sync_gdb, l_chain_pkt->data, l_chain_pkt_data_size); + l_sync_gdb.id_start = dap_db_get_last_id_remote(l_sync_gdb.node_addr.uint64); + dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id); + l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); + log_it(L_INFO, "In: SYNC_GLOBAL_DB_RVRS pkt: net 0x%016x chain 0x%016x cell 0x%016x, request gdb sync from %u", l_chain_pkt->hdr.net_id.uint64 , + l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id, l_sync_gdb.id_start ); + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); } - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS: { - dap_stream_ch_chain_sync_request_t l_sync_gdb = {0}; - memcpy(&l_sync_gdb, l_chain_pkt->data, l_chain_pkt_data_size); - //l_sync_gdb.id_start = dap_db_get_last_id_remote(l_sync_gdb.node_addr.uint64); - dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id); - l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); - log_it(L_INFO, "In: SYNC_GLOBAL_DB_RVRS pkt: net 0x%016x chain 0x%016x cell 0x%016x, request gdb sync from %u", l_chain_pkt->hdr.net_id.uint64 , - l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id, l_sync_gdb.id_start ); - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: { - if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { - dap_stream_ch_chain_sync_request_t l_request={0}; - dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); - if( l_chain){ - //dap_chain_get_atom_last_hash(l_chain,& l_request.hash_from); - if( dap_log_level_get()<= L_INFO){ - char l_hash_from_str[70]={[0]='\0'}; - dap_chain_hash_fast_to_str(&l_request.hash_from,l_hash_from_str,sizeof (l_hash_from_str)-1); - log_it(L_INFO, "In: SYNC_CHAINS_RVRS pkt: net 0x%016x chain 0x%016x cell 0x%016x request chains sync from %s", - l_chain_pkt->hdr.net_id.uint64 , l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, - l_hash_from_str[0] ? l_hash_from_str :"(null)"); + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: { + if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { + dap_stream_ch_chain_sync_request_t l_request={0}; + dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); + if( l_chain){ + dap_chain_get_atom_last_hash(l_chain,& l_request.hash_from); // Move away from i/o reactor to callback processor + if( dap_log_level_get()<= L_INFO){ + char l_hash_from_str[70]={[0]='\0'}; + dap_chain_hash_fast_to_str(&l_request.hash_from,l_hash_from_str,sizeof (l_hash_from_str)-1); + log_it(L_INFO, "In: SYNC_CHAINS_RVRS pkt: net 0x%016x chain 0x%016x cell 0x%016x request chains sync from %s", + l_chain_pkt->hdr.net_id.uint64 , l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, + l_hash_from_str[0] ? l_hash_from_str :"(null)"); + } + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_request, sizeof(l_request)); } - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_request, sizeof(l_request)); + }else{ + log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_CHAIN_PKT_DATA_SIZE" ); } - }else{ - log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_CHAIN_PKT_DATA_SIZE" ); - } - } - break; - case DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR: - break; - default: { - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_UNKNOWN_CHAIN_PKT_TYPE"); } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR: + break; + default: { + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_UNKNOWN_CHAIN_PKT_TYPE"); + } } if(l_ch_chain->callback_notify_packet_in) l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt, @@ -763,7 +968,7 @@ static void s_process_gdb_iter(dap_stream_ch_t *a_ch) { dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs; - dap_store_obj_pkt_t *l_pkt = (dap_store_obj_pkt_t *)l_ch_chain->db_iter->data; + dap_store_obj_pkt_t *l_pkt = (dap_store_obj_pkt_t *)l_ch_chain->request_db_iter->data; uint32_t l_pkt_size = sizeof(dap_store_obj_pkt_t) + l_pkt->data_size; if( s_debug_chain_sync) log_it(L_INFO, "Send one global_db record packet len=%d (rest=%d/%d items)", l_pkt_size, @@ -771,48 +976,49 @@ static void s_process_gdb_iter(dap_stream_ch_t *a_ch) dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, l_ch_chain->request_hdr.cell_id, l_pkt, l_pkt_size); - dap_list_t *l_iter = dap_list_next(l_ch_chain->db_iter); + dap_list_t *l_iter = dap_list_next(l_ch_chain->request_db_iter); if (l_iter) { - l_ch_chain->db_iter = l_iter; + l_ch_chain->request_db_iter = l_iter; } else { l_ch_chain->stats_request_gdb_processed++; - l_ch_chain->db_iter = dap_list_first(l_ch_chain->db_iter); - dap_list_free_full(l_ch_chain->db_iter, free); - l_ch_chain->db_iter = NULL; + l_ch_chain->request_db_iter = dap_list_first(l_ch_chain->request_db_iter); + dap_list_free_full(l_ch_chain->request_db_iter, free); + l_ch_chain->request_db_iter = NULL; } } -static bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) +/** + * @brief s_stream_ch_packet_out + * @param ch + * @param arg + */ +void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) { - UNUSED(a_thread); - dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + UNUSED(a_arg); - if(s_debug_chain_sync) - log_it( L_DEBUG,"s_stream_ch_packet_out state=%d", l_ch_chain ? l_ch_chain->state : -1); - switch (l_ch_chain->state) { + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); - case CHAIN_STATE_IDLE: { - dap_stream_ch_chain_go_idle(l_ch_chain); - } break; + if(s_debug_chain_sync) + log_it( L_DEBUG,"Out: ch=ch_chain state=%d esocket->buf_out_size=%zd", l_ch_chain ? l_ch_chain->state : -1, a_ch->stream->esocket->buf_out_size); + switch (l_ch_chain->state) { // Synchronize GDB case CHAIN_STATE_SYNC_GLOBAL_DB: { - if (l_ch_chain->db_iter) { - s_process_gdb_iter(l_ch); + if (l_ch_chain->request_db_iter) { + s_process_gdb_iter(a_ch); } else { dap_global_db_obj_t *l_obj; do { // Get log diff size_t l_item_size_out = 0; l_obj = dap_db_log_list_get(l_ch_chain->request_global_db_trs); - l_ch_chain->db_iter = dap_db_log_pack(l_obj, &l_item_size_out); - if (l_ch_chain->db_iter && l_item_size_out) { + l_ch_chain->request_db_iter = dap_db_log_pack(l_obj, &l_item_size_out); + if (l_ch_chain->request_db_iter && l_item_size_out) { break; } // Item not found, maybe it has deleted? Then go to the next item } while (l_obj); - if (l_ch_chain->db_iter) { - s_process_gdb_iter(l_ch); + if (l_ch_chain->request_db_iter) { + s_process_gdb_iter(a_ch); } else { // free log list dap_db_log_list_delete(l_ch_chain->request_global_db_trs); @@ -821,7 +1027,7 @@ static bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) l_ch_chain->stats_request_gdb_processed ); // last message dap_stream_ch_chain_sync_request_t l_request = {}; - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, l_ch_chain->request_hdr.cell_id, &l_request, sizeof(l_request)); dap_stream_ch_chain_go_idle(l_ch_chain); @@ -843,7 +1049,7 @@ static bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) log_it(L_INFO, "Out CHAIN pkt: atom hash %s (size %zd) ", l_atom_hash_str, l_ch_chain->request_atom_iter->cur_size); DAP_DELETE(l_atom_hash_str); } - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_hdr.net_id, + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, l_ch_chain->request_hdr.cell_id, l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size); l_ch_chain->stats_request_atoms_processed++; @@ -852,7 +1058,7 @@ static bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) } else { // All chains synced dap_stream_ch_chain_sync_request_t l_request = {}; // last message - dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, l_ch_chain->request_hdr.cell_id, &l_request, sizeof(l_request)); log_it( L_INFO,"Synced: %llu atoms processed", l_ch_chain->stats_request_atoms_processed); @@ -864,33 +1070,4 @@ static bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) } break; default: break; } - if (l_ch->stream->esocket->buf_out_size || l_ch_chain->state == CHAIN_STATE_IDLE) { - dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); - dap_proc_thread_assign_on_worker_inter(a_thread, l_ch->stream_worker->worker, l_ch->stream->esocket ); - return true; - } - return false; -} - -/** - * @brief s_stream_ch_packet_out - * @param ch - * @param arg - */ -void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) -{ - (void) a_arg; - - /// That was for what?! - /// - /// if (a_ch->stream->esocket->buf_out_size > ( a_ch->stream->esocket->buf_out_size_max / 4 )) { - /// return; - /// } - /// - dap_stream_ch_set_ready_to_write_unsafe(a_ch, false); - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); - if (l_ch_chain && l_ch_chain->state != CHAIN_STATE_IDLE) { - dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); - dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_out_pkt_callback, a_ch); - } } diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index 15702b4c46608998eb17e9deec5d886a60805cac..be92f2912dfb64160401ff999578f346ba105828 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -55,17 +55,19 @@ typedef struct dap_chain_pkt_item { typedef struct dap_stream_ch_chain { dap_stream_ch_t * ch; - dap_db_log_list_t *request_global_db_trs; // list of global db records - dap_list_t *db_iter; - dap_stream_ch_chain_state_t state; - dap_chain_atom_iter_t *request_atom_iter; - dap_list_t *in_gdb_list; - dap_list_t *in_chains_list; + dap_stream_ch_chain_state_t state; uint64_t stats_request_atoms_processed; uint64_t stats_request_gdb_processed; + + // request section + dap_chain_atom_iter_t *request_atom_iter; + dap_db_log_list_t *request_global_db_trs; // list of global db records dap_stream_ch_chain_sync_request_t request; dap_stream_ch_chain_pkt_hdr_t request_hdr; + dap_list_t *request_db_iter; + + atomic_bool is_on_request; // Protects request section dap_stream_ch_chain_callback_packet_t callback_notify_packet_out; dap_stream_ch_chain_callback_packet_t callback_notify_packet_in; diff --git a/modules/global-db/dap_chain_global_db_driver.c b/modules/global-db/dap_chain_global_db_driver.c index fd07a1bda401d4a7667c7b4377629862718659c5..095e99dfcc4256ec58ed62ee64df84f96d188b23 100644 --- a/modules/global-db/dap_chain_global_db_driver.c +++ b/modules/global-db/dap_chain_global_db_driver.c @@ -270,7 +270,9 @@ dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *pkt, siz dap_store_obj_t *store_obj = DAP_NEW_Z_SIZE(dap_store_obj_t, count * sizeof(struct dap_store_obj)); for(size_t q = 0; q < count; ++q) { dap_store_obj_t *obj = store_obj + q; - uint16_t str_size; + uint16_t str_length; + + if (offset+sizeof (int)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'type' field"); break;} // Check for buffer boundries memcpy(&obj->type, pkt->data + offset, sizeof(int)); offset += sizeof(int); @@ -280,32 +282,42 @@ dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *pkt, siz //memcpy(obj->section, pkt->data + offset, str_size); //offset += str_size; - memcpy(&str_size, pkt->data + offset, sizeof(uint16_t)); + if (offset+sizeof (uint16_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'group_length' field"); break;} // Check for buffer boundries + memcpy(&str_length, pkt->data + offset, sizeof(uint16_t)); offset += sizeof(uint16_t); - obj->group = DAP_NEW_Z_SIZE(char, str_size + 1); - memcpy(obj->group, pkt->data + offset, str_size); - offset += str_size; + if (offset+str_length> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'group' field"); break;} // Check for buffer boundries + obj->group = DAP_NEW_Z_SIZE(char, str_length + 1); + memcpy(obj->group, pkt->data + offset, str_length); + offset += str_length; + + if (offset+sizeof (uint64_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'id' field"); break;} // Check for buffer boundries memcpy(&obj->id, pkt->data + offset, sizeof(uint64_t)); offset += sizeof(uint64_t); + if (offset+sizeof (time_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'timestamp' field"); break;} // Check for buffer boundries memcpy(&obj->timestamp, pkt->data + offset, sizeof(time_t)); offset += sizeof(time_t); - memcpy(&str_size, pkt->data + offset, sizeof(uint16_t)); + if (offset+sizeof (uint16_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'key_length' field"); break;} // Check for buffer boundries + memcpy(&str_length, pkt->data + offset, sizeof(uint16_t)); offset += sizeof(uint16_t); - obj->key = DAP_NEW_Z_SIZE(char, str_size + 1); - memcpy(obj->key, pkt->data + offset, str_size); - offset += str_size; - memcpy(&obj->value_len, pkt->data + offset, sizeof(size_t)); - offset += sizeof(size_t); + if (offset+ str_length > pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'key' field"); break;} // Check for buffer boundries + obj->key = DAP_NEW_Z_SIZE(char, str_length + 1); + memcpy(obj->key, pkt->data + offset, str_length); + offset += str_length; + + if (offset+sizeof (uint32_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'value_length' field"); break;} // Check for buffer boundries + memcpy(&obj->value_len, pkt->data + offset, sizeof(uint32_t)); + offset += sizeof(uint32_t); + if (offset+obj->value_len> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'value' field"); break;} // Check for buffer boundries obj->value = DAP_NEW_Z_SIZE(uint8_t, obj->value_len + 1); memcpy(obj->value, pkt->data + offset, obj->value_len); offset += obj->value_len; } - assert(pkt->data_size == offset); + //assert(pkt->data_size == offset); if(store_obj_count) *store_obj_count = count; return store_obj; diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 17aa252863f33490b1db5ba37230e98218d22da6..a636a5c5ada7a336463e2e67a51904d2c4241fc4 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -524,7 +524,7 @@ static int s_net_states_proc(dap_chain_net_t *a_net) for (dap_list_t *l_tmp = l_pvt_net->links; l_tmp; ) { dap_chain_node_client_t *l_node_client = (dap_chain_node_client_t *)l_tmp->data; dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, dap_stream_ch_chain_get_id()); - if (!l_ch_chain) { // Channel or stream or client itself closed + if ( !l_ch_chain) { // Channel or stream or client itself closed l_tmp = dap_list_next(l_tmp); dap_chain_node_client_close(l_node_client); l_pvt_net->links = dap_list_remove(l_pvt_net->links, l_node_client); @@ -1623,7 +1623,7 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) else{ l_node_addr = DAP_NEW_Z(dap_chain_node_addr_t); bool parse_succesfully = false; - if ( sscanf(l_node_addr_str, "0x%016x" DAP_UINT64_FORMAT_x ,&l_node_addr->uint64 ) == 1 ){ + if ( sscanf(l_node_addr_str, "0x%016" DAP_UINT64_FORMAT_x ,&l_node_addr->uint64 ) == 1 ){ log_it(L_DEBUG, "Parse node address with format 0x016llx"); parse_succesfully = true; }