diff --git a/CMakeLists.txt b/CMakeLists.txt index 8226ff439a5ae1b6431d162d157de6ffdd48b955..67bf9ca3bcafcdf56420cfc08733a682c1b5af6b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 2.8) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.8-9") +set(CELLFRAME_SDK_NATIVE_VERSION "2.8-10") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 423800273f7e805e53486d35d8c18abeb959e2a7..ee68c3c437a4e64e85753c10273ff94e196d178f 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -174,12 +174,12 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_va } /** - * @brief s_update_poll_flags + * @brief dap_proc_thread_esocket_update_poll_flags * @param a_thread * @param a_esocket * @return */ -static int s_update_poll_flags(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket) +int dap_proc_thread_esocket_update_poll_flags(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket) { #ifdef DAP_EVENTS_CAPS_EPOLL u_int events = a_esocket->ev_base_flags; @@ -599,14 +599,14 @@ static void * s_proc_thread_function(void * a_arg) memmove(l_cur->buf_out, l_cur->buf_out+l_bytes_sent, l_cur->buf_out_size ); }else{ l_cur->flags ^= DAP_SOCK_READY_TO_WRITE; - s_update_poll_flags(l_thread, l_cur); + dap_proc_thread_esocket_update_poll_flags(l_thread, l_cur); } } }else{ log_it(L_DEBUG,"(!) Write event receieved but nothing in buffer, switching off this flag"); l_cur->flags ^= DAP_SOCK_READY_TO_WRITE; - s_update_poll_flags(l_thread, l_cur); + dap_proc_thread_esocket_update_poll_flags(l_thread, l_cur); } @@ -681,7 +681,7 @@ bool dap_proc_thread_assign_on_worker_inter(dap_proc_thread_t * a_thread, dap_wo //log_it(L_DEBUG,"Remove esocket %p from proc thread and send it to worker #%u",a_esocket, a_worker->id); dap_events_socket_assign_on_worker_inter(l_es_assign_input, a_esocket); l_es_assign_input->flags |= DAP_SOCK_READY_TO_WRITE; - s_update_poll_flags(a_thread, l_es_assign_input); + dap_proc_thread_esocket_update_poll_flags(a_thread, l_es_assign_input); return true; } @@ -700,7 +700,7 @@ int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_ dap_events_socket_t * l_es_io_input = a_thread->queue_io_input[a_worker->id]; dap_events_socket_write_inter(l_es_io_input,a_esocket, a_data, a_data_size); l_es_io_input->flags |= DAP_SOCK_READY_TO_WRITE; - s_update_poll_flags(a_thread, l_es_io_input); + dap_proc_thread_esocket_update_poll_flags(a_thread, l_es_io_input); return 0; } @@ -734,7 +734,7 @@ int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worke dap_events_socket_write_inter(l_es_io_input,a_esocket, l_data, l_data_size); l_es_io_input->flags |= DAP_SOCK_READY_TO_WRITE; - s_update_poll_flags(a_thread, l_es_io_input); + dap_proc_thread_esocket_update_poll_flags(a_thread, l_es_io_input); return 0; } @@ -753,6 +753,6 @@ void dap_proc_thread_worker_exec_callback(dap_proc_thread_t * a_thread, size_t a dap_events_socket_queue_ptr_send_to_input(a_thread->queue_callback_input[a_worker_id],l_msg ); a_thread->queue_callback_input[a_worker_id]->flags |= DAP_SOCK_READY_TO_WRITE; - s_update_poll_flags(a_thread, a_thread->queue_callback_input[a_worker_id]); + dap_proc_thread_esocket_update_poll_flags(a_thread, a_thread->queue_callback_input[a_worker_id]); } diff --git a/dap-sdk/net/core/include/dap_proc_thread.h b/dap-sdk/net/core/include/dap_proc_thread.h index d731338197a8898160052ec2da57195879ad179e..8f307aff592748193aad5e24a0c057ebd163302a 100644 --- a/dap-sdk/net/core/include/dap_proc_thread.h +++ b/dap-sdk/net/core/include/dap_proc_thread.h @@ -55,6 +55,7 @@ typedef struct dap_proc_thread{ #else #error "No poll for proc thread for your platform" #endif + void * _inheritor; } dap_proc_thread_t; int dap_proc_thread_init(uint32_t a_threads_count); @@ -69,6 +70,8 @@ int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_ int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_esocket, const char * a_format,...); +int dap_proc_thread_esocket_update_poll_flags(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket); + typedef void (*dap_proc_worker_callback_t)(dap_worker_t *,void *); void dap_proc_thread_worker_exec_callback(dap_proc_thread_t * a_thread, size_t a_worker_id, dap_proc_worker_callback_t a_callback, void * a_arg); diff --git a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c index c71aafc118e4af9e4685c86c3de1da06d8545d75..57a737341688ca92abac34544fd71e0009cb943b 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c @@ -106,6 +106,47 @@ size_t dap_stream_ch_pkt_write_f_mt(dap_stream_worker_t * a_worker , dap_stream_ } +/** + * @brief dap_stream_ch_pkt_write_f_inter + * @param a_queue + * @param a_ch + * @param a_type + * @param a_format + * @return + */ +size_t dap_stream_ch_pkt_write_f_inter(dap_events_socket_t * a_queue , dap_stream_ch_t *a_ch, uint8_t a_type, const char * a_format,...) +{ + va_list ap; + va_start(ap,a_format); + int l_data_size = dap_vsnprintf(NULL,0,a_format,ap); + if (l_data_size <0 ){ + log_it(L_ERROR,"Can't write out formatted data '%s' with values",a_format); + return 0; + } + l_data_size++; // To calc trailing zero + dap_stream_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_stream_worker_msg_io_t); + l_msg->ch = a_ch; + l_msg->ch_pkt_type = a_type; + l_msg->data = DAP_NEW_SIZE(void,l_data_size); + l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; + l_data_size = dap_vsnprintf(l_msg->data,0,a_format,ap); + if (l_data_size <0 ){ + log_it(L_ERROR,"Can't write out formatted data '%s' with values",a_format); + DAP_DELETE(l_msg); + return 0; + } + l_data_size++; + l_msg->data_size = l_data_size; + int l_ret= dap_events_socket_queue_ptr_send_to_input(a_queue , l_msg ); + if (l_ret!=0){ + log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_ret); + DAP_DELETE(l_msg); + return 0; + } + return l_data_size; + +} + /** * @brief dap_stream_ch_pkt_write_mt * @param a_ch @@ -132,6 +173,34 @@ size_t dap_stream_ch_pkt_write_mt(dap_stream_worker_t * a_worker , dap_stream_ch return a_data_size; } + +/** + * @brief dap_stream_ch_pkt_write_inter + * @param a_queue + * @param a_ch + * @param a_type + * @param a_data + * @param a_data_size + * @return + */ +size_t dap_stream_ch_pkt_write_inter(dap_events_socket_t * a_queue , dap_stream_ch_t *a_ch, uint8_t a_type, const void * a_data, size_t a_data_size) +{ + dap_stream_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_stream_worker_msg_io_t); + l_msg->ch = a_ch; + l_msg->ch_pkt_type = a_type; + l_msg->data = DAP_NEW_SIZE(void,a_data_size); + l_msg->flags_set = DAP_SOCK_READY_TO_WRITE; + l_msg->data_size = a_data_size; + memcpy( l_msg->data, a_data, a_data_size); + int l_ret= dap_events_socket_queue_ptr_send_to_input(a_queue , l_msg ); + if (l_ret!=0){ + log_it(L_ERROR, "Wasn't send pointer to queue: code %d", l_ret); + DAP_DELETE(l_msg); + return 0; + } + return a_data_size; +} + /** * @brief dap_stream_ch_check_unsafe * @param a_worker diff --git a/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h b/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h index 43c8949c091a09d23d26a930ad762b7d46a20284..b561c723c4441700703c6b6d4dc83a1d45e4f33f 100644 --- a/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h +++ b/dap-sdk/net/stream/ch/include/dap_stream_ch_pkt.h @@ -58,3 +58,6 @@ bool dap_stream_ch_check_unsafe(dap_stream_worker_t * a_worker,dap_stream_ch_t * size_t dap_stream_ch_pkt_write_f_mt(dap_stream_worker_t * a_worker , dap_stream_ch_t *a_ch, uint8_t a_type, const char * a_str,...); size_t dap_stream_ch_pkt_write_mt(dap_stream_worker_t * a_worker , dap_stream_ch_t *a_ch, uint8_t a_type, const void * a_data, size_t a_data_size); + +size_t dap_stream_ch_pkt_write_f_inter(dap_events_socket_t * a_queue , dap_stream_ch_t *a_ch, uint8_t a_type, const char * a_str,...); +size_t dap_stream_ch_pkt_write_inter(dap_events_socket_t * a_queue , dap_stream_ch_t *a_ch, uint8_t a_type, const void * a_data, size_t a_data_size); diff --git a/dap-sdk/net/stream/stream/dap_stream_worker.c b/dap-sdk/net/stream/stream/dap_stream_worker.c index eda8d5a2ed6d4ef516160da7971f12c0e53edd50..fda2567375aa39ef4afd14538f61e3145ea58c5c 100644 --- a/dap-sdk/net/stream/stream/dap_stream_worker.c +++ b/dap-sdk/net/stream/stream/dap_stream_worker.c @@ -28,6 +28,13 @@ #define LOG_TAG "dap_stream_worker" +struct proc_thread_stream{ + dap_proc_thread_t * proc_thread; + dap_events_socket_t ** queue_ch_io_input; // Inputs for ch assign queues + dap_stream_ch_t * channels; // Client channels assigned on worker. Unsafe list, operate only in worker's context + pthread_rwlock_t channels_rwlock; +}; + static void s_ch_io_callback(dap_events_socket_t * a_es, void * a_msg); /** @@ -40,18 +47,44 @@ int dap_stream_worker_init() for (uint32_t i = 0; i < l_worker_count; i++){ dap_worker_t * l_worker = dap_events_worker_get(i); if (!l_worker) { - log_it(L_CRITICAL,"Can't init stream worker, woreker thread don't exist"); + log_it(L_CRITICAL,"Can't init stream worker,- worker thread don't exist"); return -2; } if (l_worker->_inheritor){ - log_it(L_CRITICAL,"Can't init stream worker, core worker has already inheritor"); + log_it(L_CRITICAL,"Can't init stream worker,- core worker has already inheritor"); return -1; } dap_stream_worker_t *l_stream_worker = DAP_NEW_Z(dap_stream_worker_t); + if(!l_stream_worker) + return -5; l_worker->_inheritor = l_stream_worker; l_stream_worker->worker = l_worker; pthread_rwlock_init( &l_stream_worker->channels_rwlock, NULL); + l_stream_worker->queue_ch_io = dap_events_socket_create_type_queue_ptr_mt( l_worker, s_ch_io_callback); + if(! l_stream_worker->queue_ch_io) + return -6; + } + for (uint32_t i = 0; i < l_worker_count; i++){ + dap_proc_thread_t * l_proc_thread = dap_proc_thread_get(i); + if (!l_proc_thread) { + log_it(L_CRITICAL,"Can't init stream proc thread,- proc thread don't exist"); + return -3; + } + if (l_proc_thread->_inheritor){ + log_it(L_CRITICAL,"Can't init stream worker, core worker has already inheritor"); + return -4; + } + struct proc_thread_stream * l_thread_stream = DAP_NEW_Z(struct proc_thread_stream); + if (!l_thread_stream) + return -7; + l_proc_thread->_inheritor = l_thread_stream; + l_thread_stream->queue_ch_io_input = DAP_NEW_Z_SIZE(dap_events_socket_t *, sizeof (dap_events_socket_t*)*l_worker_count); + for (uint32_t j = 0; j < l_worker_count; j++){ + dap_worker_t * l_worker = dap_events_worker_get(j); + dap_stream_worker_t *l_stream_worker = (dap_stream_worker_t*) l_worker->_inheritor; + l_thread_stream->queue_ch_io_input[i] = dap_events_socket_queue_ptr_create_input(l_stream_worker->queue_ch_io); + } } return 0; } @@ -89,3 +122,59 @@ static void s_ch_io_callback(dap_events_socket_t * a_es, void * a_msg) dap_stream_ch_pkt_write_unsafe(l_msg_ch, l_msg->ch_pkt_type, l_msg->data,l_msg->data_size); DAP_DELETE(l_msg); } + +/** + * @brief dap_proc_thread_stream_ch_write_inter + * @param a_thread + * @param a_worker + * @param a_ch + * @param a_type + * @param a_data + * @param a_data_size + * @return + */ +size_t dap_proc_thread_stream_ch_write_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_stream_ch_t *a_ch, uint8_t a_type, + const void * a_data, size_t a_data_size) +{ + struct proc_thread_stream * l_thread_stream = (struct proc_thread_stream *) a_thread->_inheritor; + dap_events_socket_t* l_es_input = l_thread_stream->queue_ch_io_input[a_worker->id]; + size_t l_ret = dap_stream_ch_pkt_write_inter(l_es_input,a_ch,a_type,a_data,a_data_size); + l_es_input->flags |= DAP_SOCK_READY_TO_WRITE; + dap_proc_thread_esocket_update_poll_flags(a_thread,l_es_input); + return l_ret; +} + +/** + * @brief dap_proc_thread_stream_ch_write_f_inter + * @param a_thread + * @param a_worker + * @param a_ch + * @param a_type + * @param a_format + * @return + */ +size_t dap_proc_thread_stream_ch_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_stream_ch_t *a_ch,uint8_t a_type, + const char * a_format,...) +{ + struct proc_thread_stream * l_thread_stream = (struct proc_thread_stream *) a_thread->_inheritor; + va_list ap, ap_copy; + va_start(ap,a_format); + va_copy(ap_copy, ap); + int l_data_size = dap_vsnprintf(NULL,0,a_format,ap); + va_end(ap); + if (l_data_size <0 ){ + log_it(L_ERROR,"Can't write out formatted data '%s' with values",a_format); + va_end(ap_copy); + return 0; + } + + dap_events_socket_t * l_es_io_input = l_thread_stream->queue_ch_io_input[a_worker->id]; + char * l_data = DAP_NEW_SIZE(char,l_data_size+1); if (!l_data) return -1; + l_data_size = dap_vsprintf(l_data,a_format,ap_copy); + va_end(ap_copy); + + size_t l_ret = dap_stream_ch_pkt_write_inter(l_es_io_input,a_ch,a_type, l_data, l_data_size); + l_es_io_input->flags |= DAP_SOCK_READY_TO_WRITE; + dap_proc_thread_esocket_update_poll_flags(a_thread, l_es_io_input); + return l_ret; +} diff --git a/dap-sdk/net/stream/stream/include/dap_stream_worker.h b/dap-sdk/net/stream/stream/include/dap_stream_worker.h index 2e84a650559d8546b1a675a47fa42c2161009da1..5a33b8eb9a5ebc0adac72f496c9f32615059517e 100644 --- a/dap-sdk/net/stream/stream/include/dap_stream_worker.h +++ b/dap-sdk/net/stream/stream/include/dap_stream_worker.h @@ -22,6 +22,7 @@ */ #pragma once #include "dap_worker.h" +#include "dap_proc_thread.h" #include "dap_stream_ch.h" typedef struct dap_stream_worker { @@ -43,3 +44,8 @@ typedef struct dap_stream_worker_msg_io { } dap_stream_worker_msg_io_t; int dap_stream_worker_init(); + +size_t dap_proc_thread_stream_ch_write_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_stream_ch_t *a_ch, + uint8_t a_type,const void * a_data, size_t a_data_size); +size_t dap_proc_thread_stream_ch_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_stream_ch_t *a_ch, + uint8_t a_type,const char * a_format,...); diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index ac22939f2b67e998de19c5df40fd2fff79927b6f..375f7b3f88080074df0b78abf39f31dd93515033 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -515,7 +515,7 @@ static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_worker, void *a_a UNUSED(a_worker); struct ch_chain_pkt_in * l_pkt_in = (struct ch_chain_pkt_in*) a_arg; - dap_stream_ch_chain_pkt_write_error(l_pkt_in->ch, l_pkt_in->pkt->pkt_hdr.net_id, + dap_stream_ch_chain_pkt_write_error_unsafe(l_pkt_in->ch, l_pkt_in->pkt->pkt_hdr.net_id, l_pkt_in->pkt->pkt_hdr.chain_id, l_pkt_in->pkt->pkt_hdr.cell_id, "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED"); dap_stream_ch_set_ready_to_write_unsafe(l_pkt_in->ch, true); @@ -679,7 +679,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_chain_pkt_data_size, l_ch_chain->callback_notify_arg); } } else { - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, "ERROR_NET_INVALID_ID"); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); @@ -689,7 +689,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if (a_ch->stream->session->acl && !a_ch->stream->session->acl[l_acl_idx]) { log_it(L_WARNING, "Unauthorized request attempt to network %s", dap_chain_net_by_id(l_chain_pkt->hdr.net_id)->pub.name); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, "ERROR_NET_NOT_AUTHORIZED"); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); @@ -781,7 +781,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64); }else{ log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN", l_chain_pkt_data_size); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, "ERROR_CHAIN_PACKET_TYPE_FIRST_CHAIN_INCORRET_DATA_SIZE(%zd/%zd)",l_chain_pkt_data_size, sizeof(dap_chain_node_addr_t)); } @@ -831,7 +831,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { if(l_ch_chain->is_on_request){ log_it(L_WARNING, "Can't process SYNC_CHAINS request because its already busy with syncronization"); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); @@ -852,7 +852,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) log_it(L_INFO, "Can't process SYNC_CHAINS request between %s and %s because not in idle state", l_hash_from_str? l_hash_from_str:"(null)", l_hash_to_str?l_hash_to_str:"(null)"); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, "ERROR_STATE_NOT_IN_IDLE"); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); @@ -869,7 +869,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) DAP_DELETE(l_hash_to_str); }else{ log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, "ERROR_CHAIN_PKT_DATA_SIZE" ); } @@ -879,7 +879,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { if(l_ch_chain->state != CHAIN_STATE_IDLE) { log_it(L_WARNING, "Can't process SYNC_GLOBAL_DB request because not in idle state"); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, "ERROR_STATE_NOT_IN_IDLE"); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); @@ -887,7 +887,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } else { // receive the latest global_db revision of the remote node -> go to send mode if(l_ch_chain->is_on_request){ log_it(L_WARNING, "Can't process SYNC_CHAINS request because its already busy with syncronization"); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, "ERROR_SYNC_REQUEST_ALREADY_IN_PROCESS"); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); @@ -911,7 +911,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } }else{ log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, "ERROR_CHAIN_PKT_DATA_SIZE" ); } @@ -946,7 +946,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_sync_in_chains_callback, l_sync_request); } else { log_it(L_WARNING, "Empty chain packet"); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, "ERROR_CHAIN_PACKET_EMPTY"); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); @@ -964,7 +964,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64, NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr) ); }else { log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB", l_chain_pkt_data_size); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, "ERROR_CHAIN_PACKET_TYPE_FIRST_GLOBAL_DB_INCORRET_DATA_SIZE"); } @@ -989,7 +989,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_proc_queue_add_callback_inter(a_ch->stream_worker->worker->proc_queue_input, s_gdb_in_pkt_proc_callback, l_sync_request); } else { log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size"); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, "ERROR_GLOBAL_DB_PACKET_EMPTY"); dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); @@ -1026,7 +1026,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } }else{ log_it(L_WARNING, "DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: Wrong chain packet size %zd when expected %zd", l_chain_pkt_data_size, sizeof(l_ch_chain->request)); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, "ERROR_CHAIN_PKT_DATA_SIZE" ); } @@ -1035,7 +1035,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) case DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR: break; default: { - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, "ERROR_UNKNOWN_CHAIN_PKT_TYPE"); } diff --git a/modules/channel/chain/dap_stream_ch_chain_pkt.c b/modules/channel/chain/dap_stream_ch_chain_pkt.c index 19cc566016f4a6da7dced3ae83de9970c270cd00..d547e587d1201f379a884b4a2b55b5cf59c0042c 100644 --- a/modules/channel/chain/dap_stream_ch_chain_pkt.c +++ b/modules/channel/chain/dap_stream_ch_chain_pkt.c @@ -16,6 +16,7 @@ #endif #include "dap_stream_ch.h" +#include "dap_stream_worker.h" #include "dap_stream_ch_pkt.h" #include "dap_stream_ch_chain_pkt.h" #include "dap_chain.h" @@ -68,6 +69,7 @@ size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_typ return l_ret; } + size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id, dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, const void * a_data, size_t a_data_size) @@ -87,3 +89,36 @@ size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_strea DAP_DELETE(l_chain_pkt); return l_ret; } + +/** + * @brief dap_stream_ch_chain_pkt_write_inter + * @param a_thread + * @param a_worker + * @param a_ch + * @param a_type + * @param a_net_id + * @param a_chain_id + * @param a_cell_id + * @param a_data + * @param a_data_size + * @return + */ +size_t dap_stream_ch_chain_pkt_write_inter(dap_proc_thread_t * a_thread, dap_stream_worker_t *a_worker, dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id, + dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, + const void * a_data, size_t a_data_size) +{ + dap_stream_ch_chain_pkt_t * l_chain_pkt; + size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr) + a_data_size; + l_chain_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_pkt_t, l_chain_pkt_size ); + l_chain_pkt->hdr.version = 1; + l_chain_pkt->hdr.net_id.uint64 = a_net_id.uint64; + l_chain_pkt->hdr.cell_id.uint64 = a_cell_id.uint64; + l_chain_pkt->hdr.chain_id.uint64 = a_chain_id.uint64; + + if (a_data_size && a_data) + memcpy( l_chain_pkt->data, a_data, a_data_size); + + size_t l_ret = dap_proc_thread_stream_ch_write_inter(a_thread, a_worker->worker, a_ch, a_type , l_chain_pkt, l_chain_pkt_size); + DAP_DELETE(l_chain_pkt); + return l_ret; +} diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index 2cf9448dce513401c2c032e92ad355d33c30b02c..e6b333c3bc152cf747ece0e524e0ab194f559e5f 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -29,6 +29,7 @@ #include "dap_chain_common.h" #include "dap_chain.h" #include "dap_chain_global_db_hist.h" +#include "dap_chain_node_client.h" #include "dap_list.h" #include "dap_stream_ch_chain_pkt.h" #include "uthash.h" @@ -62,8 +63,8 @@ typedef struct dap_stream_ch_chain_hash_item{ typedef struct dap_stream_ch_chain { dap_stream_ch_t * ch; - dap_stream_ch_chain_state_t state; + dap_chain_node_client_t * node_client; // Node client associated with stream uint64_t stats_request_atoms_processed; uint64_t stats_request_gdb_processed; diff --git a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h index 8e36ebba3c1be85885d286e33e087225a6c8cf0f..388400ae1a6fdcde67f45b16036838d013a0e01f 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h +++ b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h @@ -29,6 +29,7 @@ #include <stdarg.h> #include "dap_common.h" +#include "dap_proc_thread.h" #include "dap_chain_common.h" #include "dap_chain_datum.h" #include "dap_chain_cs.h" @@ -134,7 +135,20 @@ size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_strea dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, const void * a_data, size_t a_data_size); -inline static size_t dap_stream_ch_chain_pkt_write_error(dap_stream_ch_t *a_ch, dap_chain_net_id_t a_net_id, +size_t dap_stream_ch_chain_pkt_write_inter(dap_proc_thread_t * a_thread, dap_stream_worker_t *a_worker, dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id, + dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, + const void * a_data, size_t a_data_size); + +/** + * @brief dap_stream_ch_chain_pkt_write_error_unsafe + * @param a_ch + * @param a_net_id + * @param a_chain_id + * @param a_cell_id + * @param a_err_string_format + * @return + */ +inline static size_t dap_stream_ch_chain_pkt_write_error_unsafe(dap_stream_ch_t *a_ch, dap_chain_net_id_t a_net_id, dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, const char * a_err_string_format,... ) { va_list l_va; @@ -152,3 +166,33 @@ inline static size_t dap_stream_ch_chain_pkt_write_error(dap_stream_ch_t *a_ch, return 0; } } + +/** + * @brief dap_stream_ch_chain_pkt_write_error_inter + * @param a_thread + * @param a_stream_worker + * @param a_ch + * @param a_net_id + * @param a_chain_id + * @param a_cell_id + * @param a_err_string_format + * @return + */ +inline static size_t dap_stream_ch_chain_pkt_write_error_inter(dap_proc_thread_t * a_thread, dap_stream_worker_t * a_stream_worker, dap_stream_ch_t *a_ch, dap_chain_net_id_t a_net_id, + dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id, const char * a_err_string_format,... ) +{ + va_list l_va; + char * l_str; + va_start(l_va, a_err_string_format); + int l_size = vsnprintf(NULL,0,a_err_string_format,l_va); + if(l_size >0){ + l_size++; + l_str = DAP_NEW_S_SIZE(char, l_size); + vsnprintf(l_str,l_size,a_err_string_format,l_va); + va_end(l_va); + return dap_stream_ch_chain_pkt_write_inter(a_thread, a_stream_worker, a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR, a_net_id, a_chain_id, a_cell_id, l_str,l_size ); + }else{ + va_end(l_va); + return 0; + } +} diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 05dccc9db8336079bc5c520c60695774ddb0272c..e288b5606a55fceae220a717f4984980ae6d97f5 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -142,7 +142,7 @@ typedef struct dap_chain_net_pvt{ // Established links dap_list_t *links; // Links list - size_t links_count; + size_t links_connected_count; // Prepared links dap_list_t *links_info; // Links info list @@ -211,15 +211,9 @@ static void s_node_link_callback_stage(dap_chain_node_client_t * a_node_client,d static void s_node_link_callback_error(dap_chain_node_client_t * a_node_client, int a_error, void * a_arg); static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg); -static bool s_node_link_states_proc(dap_proc_thread_t *a_thread, void *a_arg); static void s_net_state_link_prepare_success(dap_worker_t * a_worker,dap_chain_node_info_t * a_node_info, void * a_arg); static void s_net_state_link_prepare_error(dap_worker_t * a_worker,dap_chain_node_info_t * a_node_info, void * a_arg, int a_errno); - - - -static bool s_net_check_timer_callback ( void * a_net); -static void s_net_check_thread_start( dap_chain_net_t * a_net ); static void s_net_proc_kill( dap_chain_net_t * a_net ); int s_net_load(const char * a_net_name, uint16_t a_acl_idx); @@ -435,30 +429,29 @@ static void s_node_link_callback_connected(dap_chain_node_client_t * a_node_clie log_it(L_NOTICE, "Established connection with %s."NODE_ADDR_FP_STR,l_net->pub.name, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address)); pthread_rwlock_wrlock(&l_net_pvt->rwlock); l_net_pvt->links = dap_list_append(l_net_pvt->links, a_node_client); - l_net_pvt->links_count++; + l_net_pvt->links_connected_count++; + + // If we're fist time here - initiate the GDB sync + if (! a_node_client->is_reconnecting){ + dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; + // Get last timestamp in log if wasn't SYNC_FROM_ZERO flag + if (! (l_net_pvt->flags & F_DAP_CHAIN_NET_SYNC_FROM_ZERO) ) + l_sync_gdb.id_start = (uint64_t) dap_db_get_last_id_remote(a_node_client->remote_node_addr.uint64); + l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); + log_it(L_DEBUG, "Prepared request to gdb sync from %llu to %llu", l_sync_gdb.id_start, l_sync_gdb.id_end?l_sync_gdb.id_end:-1 ); + // find dap_chain_id_t + dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb"); + dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t ) {0}; + dap_stream_ch_chain_pkt_write_unsafe( a_node_client->ch_chain, + DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id, + l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); + } a_node_client->is_reconnecting = false; if(l_net_pvt->state == NET_STATE_LINKS_CONNECTING ){ l_net_pvt->state = NET_STATE_LINKS_ESTABLISHED; - - a_node_client->stream_worker = dap_client_get_stream_worker(a_node_client->client); - a_node_client->ch_chain = dap_client_get_stream_ch_unsafe(a_node_client->client, dap_stream_ch_chain_get_id()); - if (a_node_client->ch_chain){ - a_node_client->ch_chain_uuid = a_node_client->ch_chain->uuid; - a_node_client->ch_chain_net = dap_client_get_stream_ch_unsafe(a_node_client->client, dap_stream_ch_chain_net_get_id()); - if (a_node_client->ch_chain_net){ - a_node_client->ch_chain_net_uuid = a_node_client->ch_chain_net->uuid; - }else{ - log_it(L_WARNING,"No channel 'chain net' in stream connection"); - } - dap_proc_queue_add_callback_inter(dap_client_get_stream_worker(a_node_client->client)->worker->proc_queue_input, - s_node_link_states_proc, a_node_client); - }else{ - log_it(L_CRITICAL,"No channel 'chain' in stream connection, disconnecting link"); - l_net_pvt->state = NET_STATE_LINKS_CONNECTING; - dap_client_go_stage(a_node_client->client,STAGE_BEGIN,NULL); - } + dap_proc_queue_add_callback_inter(a_node_client->stream_worker->worker->proc_queue_input,s_net_states_proc,l_net ); } pthread_rwlock_unlock(&l_net_pvt->rwlock); @@ -482,10 +475,10 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t * a_node_c a_node_client->is_reconnecting = true; dap_chain_node_client_create_n_connect(l_net, a_node_client->info,"CN", - s_node_link_callback_connected, - s_node_link_callback_disconnected, - s_node_link_callback_stage, - s_node_link_callback_error,NULL); + s_node_link_callback_connected, + s_node_link_callback_disconnected, + s_node_link_callback_stage, + s_node_link_callback_error,NULL); }else if (l_net_pvt->state_target == NET_STATE_OFFLINE){ log_it(L_INFO, "%s."NODE_ADDR_FP_STR" disconnected",l_net->pub.name,NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address)); @@ -494,8 +487,8 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t * a_node_c ,NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address) , c_net_states[l_net_pvt->state_target] ); } - if(l_net_pvt->links_count) - l_net_pvt->links_count--; + if(l_net_pvt->links_connected_count) + l_net_pvt->links_connected_count--; else log_it(L_CRITICAL,"Links count is zero in disconnected callback, looks smbd decreased it twice or forget to increase on connect/reconnect"); pthread_rwlock_unlock(&l_net_pvt->rwlock); @@ -525,6 +518,30 @@ static void s_node_link_callback_error(dap_chain_node_client_t * a_node_client, { dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg; log_it(L_WARNING, "Can't establish link with %s."NODE_ADDR_FP_STR, l_net->pub.name, NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address)); +} + +/** + * @brief s_node_link_callback_delete + * @param a_node_client + * @param a_arg + */ +static void s_node_link_callback_delete(dap_chain_node_client_t * a_node_client, void * a_arg) +{ + log_it(L_DEBUG,"Remove node client from list"); + dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg; + dap_chain_net_pvt_t * l_net_pvt = PVT(l_net); + pthread_rwlock_wrlock(&l_net_pvt->rwlock); + for ( dap_list_t * it = l_net_pvt->links; it; it=it->next ){ + dap_chain_node_client_t * l_client =(dap_chain_node_client_t *) it->data; + // Cut out current iterator if it equals with deleting handler + if (l_client == a_node_client){ + if (it->prev) + it->prev->next = it->next; + if (it->next) + it->next->prev = it->prev; + } + } + pthread_rwlock_unlock(&l_net_pvt->rwlock); dap_chain_node_client_close(a_node_client); } @@ -582,205 +599,6 @@ static void s_net_state_link_prepare_error(dap_worker_t * a_worker,dap_chain_nod } } -/** - * @brief s_node_link_states_proc - * @param a_thread - * @param a_arg - * @return - */ -static bool s_node_link_states_proc(dap_proc_thread_t *a_thread, void *a_arg) -{ - bool l_repeate_after_exit = false; - dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t*) a_arg; - assert(l_node_client); - - dap_chain_net_pvt_t * l_net_pvt = PVT(l_node_client->net); - switch (l_node_client->state) { - case NODE_CLIENT_STATE_ESTABLISHED: - if(l_node_client->sync_chains){ - l_node_client->state = NODE_CLIENT_STATE_SYNC_CHAINS; - l_repeate_after_exit = true; - } - if(l_node_client->sync_gdb){ - l_node_client->state = NODE_CLIENT_STATE_SYNC_GDB; - l_repeate_after_exit = true; - } - break; - case NODE_CLIENT_STATE_SYNC_GDB:{ - dap_stream_worker_t *l_worker = dap_client_get_stream_worker(l_node_client->client); - dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, dap_stream_ch_chain_get_id()); - if ( !l_ch_chain) { // Channel or stream or client itself closed - l_tmp = dap_list_next(l_tmp); - dap_chain_node_client_close(l_node_client); - l_net_pvt->links = dap_list_remove(l_net_pvt->links, l_node_client); - continue; - } - - dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; - // Get last timestamp in log if wasn't SYNC_FROM_ZERO flag - if (! (l_net_pvt->flags & F_DAP_CHAIN_NET_SYNC_FROM_ZERO) ) - l_sync_gdb.id_start = (uint64_t) dap_db_get_last_id_remote(l_node_client->remote_node_addr.uint64); - l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); - log_it(L_DEBUG, "Prepared request to gdb sync from %llu to %llu", l_sync_gdb.id_start, l_sync_gdb.id_end?l_sync_gdb.id_end:-1 ); - // find dap_chain_id_t - dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb"); - dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t ) {0}; - dap_chain_node_client_reset(l_node_client); - size_t l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id, - l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); - if (l_res == 0) { - log_it(L_WARNING, "Can't send GDB sync request"); - continue; - } - - // wait for finishing of request - int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms - // TODO add progress info to console - l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); - switch (l_res) { - case -1: - log_it(L_WARNING, "Timeout with link sync gdb"); - break; - case 0: - log_it(L_INFO, "Node sync gdb completed"); - break; - default: - log_it(L_INFO, "Node sync gdb error %d",l_res); - } - - dap_chain_node_client_reset(l_node_client); - l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS, l_net->pub.id, - l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); - l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); - switch (l_res) { - case -1: - log_it(L_WARNING, "Timeout with reverse link gdb sync"); - break; - case 0: - log_it(L_INFO, "Node reverse gdb sync completed"); - break; - default: - log_it(L_INFO, "Node reverse gdb sync error %d",l_res); - } - - // ----- - if (!l_net_pvt->links) { - l_net_pvt->state = NET_STATE_LINKS_PREPARE; - } else if (l_net_pvt->state_target >= NET_STATE_SYNC_CHAINS) { - l_net_pvt->state = NET_STATE_SYNC_CHAINS; - } else { // Synchronization done, go offline - log_it(L_INFO, "Synchronization done, go offline"); - l_net_pvt->flags ^= F_DAP_CHAIN_NET_GO_SYNC; - l_net_pvt->flags ^= F_DAP_CHAIN_NET_SYNC_FROM_ZERO; - l_net_pvt->state = l_net_pvt->state_target = NET_STATE_OFFLINE; - } - } break; - case NODE_CLIENT_STATE_SYNC_CHAINS:{ - dap_chain_node_client_t *l_node_client = (dap_chain_node_client_t *)l_tmp->data; - dap_stream_ch_t *l_ch_chain = dap_client_get_stream_ch_unsafe(l_node_client->client, dap_stream_ch_chain_get_id()); - if (!l_ch_chain) { // Channel or stream or client itself closed - l_tmp = dap_list_next(l_tmp); - dap_chain_node_client_close(l_node_client); - l_net_pvt->links = dap_list_remove(l_net_pvt->links, l_node_client); - continue; - } - dap_stream_worker_t *l_worker = dap_client_get_stream_worker(l_node_client->client); - dap_chain_t * l_chain = NULL; - int l_res = 0; - DL_FOREACH (l_net->pub.chains, l_chain) { - dap_chain_node_client_reset(l_node_client); - dap_stream_ch_chain_sync_request_t l_request = {0}; - - // TODO: Uncomment next block when finish with partial updates - /* - if (! (l_pvt_net->flags & F_DAP_CHAIN_NET_SYNC_FROM_ZERO) ) - dap_chain_get_atom_last_hash(l_chain,&l_request.hash_from); - */ - - if ( !dap_hash_fast_is_blank(&l_request.hash_from) ){ - if(dap_log_level_get() <= L_DEBUG){ - char l_hash_str[128]={[0]='\0'}; - dap_chain_hash_fast_to_str(&l_request.hash_from,l_hash_str,sizeof (l_hash_str)-1); - log_it(L_DEBUG,"Send sync chain request to"NODE_ADDR_FP_STR" for %s:%s from %s to infinity", - NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr) ,l_net->pub.name, l_chain->name, l_hash_str); - } - }else - log_it(L_DEBUG,"Send sync chain request for all the chains for addr "NODE_ADDR_FP_STR, - NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr)); - dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_net->pub.id, - l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request)); - // wait for finishing of request - int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms - // TODO add progress info to console - l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); - switch (l_res) { - case -1: - //log_it(L_WARNING, "Timeout with sync of chain '%s' ", l_chain->name); - break; - case 0: - l_need_flush = true; - log_it(L_INFO, "Sync of chain '%s' completed ", l_chain->name); - break; - default: - log_it(L_ERROR, "Sync of chain '%s' error %d", l_chain->name,l_res); - } - - - dap_chain_node_client_reset(l_node_client); - - l_request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); - dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS, l_net->pub.id, - l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request)); - l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); - switch (l_res) { - case -1: - //log_it(L_WARNING, "Timeout with reverse sync of chain '%s' ", l_chain->name); - break; - case 0: - l_need_flush = true; - log_it(L_INFO, "Reverse sync of chain '%s' completed ", l_chain->name); - // set time of last sync - { - struct timespec l_to; - clock_gettime(CLOCK_MONOTONIC, &l_to); - l_net_pvt->last_sync = l_to.tv_sec; - } - break; - default: - log_it(L_ERROR, "Reverse sync of chain '%s' error %d", l_chain->name,l_res); - } - - } - - ///------------------- - if (l_need_flush) { - // flush global_db - dap_chain_global_db_flush(); - } - if (!l_net_pvt->links ) { - log_it( L_INFO,"Return back to state LINKS_PREPARE "); - l_net_pvt->state = NET_STATE_LINKS_PREPARE; - } else { - if (l_net_pvt->state_target == NET_STATE_ONLINE) { - l_net_pvt->flags ^= F_DAP_CHAIN_NET_GO_SYNC; - l_net_pvt->flags ^= F_DAP_CHAIN_NET_SYNC_FROM_ZERO; - l_net_pvt->state = NET_STATE_ONLINE; - log_it(L_INFO, "Synchronization done, status online"); - } else { // Synchronization done, go offline - l_net_pvt->state = l_net_pvt->state_target = NET_STATE_OFFLINE; - log_it(L_INFO, "Synchronization done, go offline"); - } - } - }break; - case NODE_CLIENT_STATE_SYNCED: - break; - default:{ - log_it(L_WARNING,"Non-processing node client state %d", l_node_client->state); - } - } - return l_repeate_after_exit; -} - /** * @brief s_net_states_proc * @param l_net @@ -927,29 +745,16 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) log_it(L_DEBUG, "%s.state: NET_STATE_LINKS_CONNECTING",l_net->pub.name); for (dap_list_t *l_tmp = l_net_pvt->links_info; l_tmp; l_tmp = dap_list_next(l_tmp)) { dap_chain_node_info_t *l_link_info = (dap_chain_node_info_t *)l_tmp->data; - dap_chain_node_client_t *l_node_client = dap_chain_node_client_create_n_connect(l_net, l_link_info,"CN",s_node_link_callback_connected, - s_node_link_callback_disconnected,s_node_link_callback_stage, - s_node_link_callback_error,NULL); + (void) dap_chain_node_client_create_n_connect(l_net, l_link_info,"CN",s_node_link_callback_connected, + s_node_link_callback_disconnected,s_node_link_callback_stage, + s_node_link_callback_error,NULL); } } break; - + case NET_STATE_LINKS_ESTABLISHED:{ + // TODO call some callbacks? + }break; case NET_STATE_ONLINE: { - if (l_net_pvt->flags & F_DAP_CHAIN_NET_GO_SYNC) - { - switch ( l_net_pvt->state_target) { - // disconnect - case NET_STATE_OFFLINE: - l_net_pvt->state = NET_STATE_OFFLINE; - log_it(L_NOTICE, "Going to disconnet"); - break; - case NET_STATE_ONLINE: - case NET_STATE_SYNC_GDB: - case NET_STATE_SYNC_CHAINS: - l_net_pvt->state = NET_STATE_SYNC_GDB; - break; - default: break; - } - } + // TODO call some callbacks? } break; default: log_it (L_DEBUG, "Unprocessed state"); @@ -959,17 +764,6 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) return l_repeat_after_exit; } -/** - * @brief net_proc_start - * @param a_cfg - */ -static void s_net_check_thread_start( dap_chain_net_t * a_net ) -{ - PVT(a_net)->main_timer = dap_timerfd_start(dap_config_get_item_uint64_default(g_config,"chain_net","net_check_timeout",10)*1000, - s_net_check_timer_callback, a_net); -} - - dap_chain_node_role_t dap_chain_net_get_role(dap_chain_net_t * a_net) { return PVT(a_net)->node_role; diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index c2d34c109afc2f97731f6db176cde09dccde5028..a8a04ab8476c371a53556fe9a8dde9371748674c 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -407,7 +407,7 @@ static void s_ch_chain_callback_notify_packet_R(dap_stream_ch_chain_net_srv_t* a dap_chain_node_client_t* dap_chain_client_connect(dap_chain_node_info_t *a_node_info, const char *a_active_channels) { - return dap_chain_node_client_create_n_connect(a_node_info,a_active_channels,NULL,NULL,NULL,NULL,NULL); + return dap_chain_node_client_create_n_connect(NULL,a_node_info,a_active_channels,NULL,NULL,NULL,NULL,NULL); } /**