diff --git a/CMakeLists.txt b/CMakeLists.txt index 8e9b4502acb5c37eb4ca726f26322496dba8949f..619301c989ea9bf3628b9e0803bc89bf3eba787b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 2.8) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.6-90") +set(CELLFRAME_SDK_NATIVE_VERSION "2.6-91") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index 520941545fbb9dc3a12a6bc0b9969093c88c86a2..2849e67a2570ca1a1f29a5f92d351776f9f541a4 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -375,6 +375,8 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin if (l_socket == -1) { log_it(L_ERROR, "Error %d with socket create", errno); #endif + if(a_error_callback) + a_error_callback(errno,a_obj); return NULL; } // Get socket flags @@ -386,11 +388,17 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin int l_socket_flags = fcntl(l_socket, F_GETFL); if (l_socket_flags == -1){ log_it(L_ERROR, "Error %d can't get socket flags", errno); + if(a_error_callback) + a_error_callback(errno,a_obj); + return NULL; } // Make it non-block if (fcntl( l_socket, F_SETFL,l_socket_flags| O_NONBLOCK) == -1){ log_it(L_ERROR, "Error %d can't get socket flags", errno); + if(a_error_callback) + a_error_callback(errno,a_obj); + return NULL; } #endif @@ -441,6 +449,10 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin s_client_http_delete( l_http_pvt); l_ev_socket->_inheritor = NULL; dap_events_socket_delete_unsafe( l_ev_socket, true); + + if(a_error_callback) + a_error_callback(errno,a_obj); + return NULL; } } @@ -489,6 +501,9 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin log_it(L_ERROR, "Connecting error: \"%s\" (code %d)", l_errbuf, l_err); s_client_http_delete( l_http_pvt); dap_events_socket_delete_unsafe( l_ev_socket, true); + if(a_error_callback) + a_error_callback(errno,a_obj); + return NULL; } #endif diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index e80282157b1e0c33ec3a9a9071d5f4c7c8a2b866..507de9cc2b964d6d70133cea2692b40c9a8faf0c 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -1001,12 +1001,12 @@ static void add_ptr_to_buf(dap_events_socket_t * a_es, void* a_arg) int dap_events_socket_queue_ptr_send_to_input(dap_events_socket_t * a_es_input, void * a_arg) { volatile void * l_arg = a_arg; - if (a_es_input->buf_out_size >= sizeof(void*)) { + /*if (a_es_input->buf_out_size >= sizeof(void*)) { if (memcmp(a_es_input->buf_out + a_es_input->buf_out_size - sizeof(void*), a_arg, sizeof(void*))) { - //log_it(L_INFO, "Ptr 0x%x already present in input, drop it", a_arg); + log_it(L_INFO, "Ptr 0x%x already present in input, drop it", a_arg); return 2; } - } + }*/ return dap_events_socket_write_unsafe(a_es_input, &l_arg, sizeof(l_arg)) == sizeof(l_arg) ? 0 : 1; } diff --git a/dap-sdk/net/core/dap_proc_queue.c b/dap-sdk/net/core/dap_proc_queue.c index 7f31abf79f0af370491fffc001ded4e6da2928d4..c15065b8f003f4951d4d29581a46b8ba5f9959ac 100644 --- a/dap-sdk/net/core/dap_proc_queue.c +++ b/dap-sdk/net/core/dap_proc_queue.c @@ -72,23 +72,28 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg) assert(l_msg); // We have callback to add in list if (l_msg->callback) { - dap_proc_queue_item_t * l_item = DAP_NEW_Z(dap_proc_queue_item_t); if (! l_item) return; + dap_proc_queue_item_t * l_item = DAP_NEW_Z(dap_proc_queue_item_t); + if (! l_item){ + log_it(L_CRITICAL,"Can't allocate memory for callback item, exiting"); + return; + } l_item->callback = l_msg->callback; l_item->callback_arg = l_msg->callback_arg; if ( l_queue->item_last) - l_queue->item_last = l_item; + l_queue->item_last->prev = l_item; l_item->next=l_queue->item_last ; - l_queue->item_last = l_item; - if( !l_queue->item_first) + if( l_queue->item_first == NULL){ + //log_it( L_DEBUG, "Added callback %p/%p in proc thread %u callback queue: first in list", l_msg->callback,l_msg->callback_arg, l_queue->proc_thread->cpu_id); l_queue->item_first = l_item; + }//else + // log_it( L_DEBUG, "Added callback %p/%p in proc thread %u callback queue: last in list", l_msg->callback,l_msg->callback_arg, l_queue->proc_thread->cpu_id); // Add on top so after call this callback will be executed first dap_events_socket_event_signal(l_queue->proc_thread->proc_event,1); - //log_it( L_DEBUG, "Sent signal to proc thread that we have callback %p/%p on board", l_msg->callback,l_msg->callback_arg); } if (l_msg->signal_kill){ // Say to kill this object and delete its inherior dap_proc_queue_t a_es->flags |= DAP_SOCK_SIGNAL_CLOSE; @@ -121,6 +126,8 @@ void dap_proc_queue_add_callback_inter( dap_events_socket_t * a_es_input, dap_pr dap_proc_queue_msg_t * l_msg = DAP_NEW_Z(dap_proc_queue_msg_t); if (!l_msg) return; l_msg->callback = a_callback; l_msg->callback_arg = a_callback_arg; + //log_it( L_DEBUG, "Sent inter callback %p/%p to queue input", l_msg->callback,l_msg->callback_arg); + dap_events_socket_queue_ptr_send_to_input( a_es_input , l_msg ); } diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 781151fc23bd3f490a1293f93342f67e75fb8d3c..c8e258ac3e2dd368ed987bad9f8d320d23f0b216 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -124,33 +124,40 @@ dap_proc_thread_t * dap_proc_thread_get_auto() static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_value) { (void) a_value; - //log_it(L_DEBUG, "Proc event callback"); +// log_it(L_DEBUG, "--> Proc event callback start"); dap_proc_thread_t * l_thread = (dap_proc_thread_t *) a_esocket->_inheritor; dap_proc_queue_item_t * l_item = l_thread->proc_queue->item_first; dap_proc_queue_item_t * l_item_old = NULL; bool l_is_anybody_for_repeat=false; while(l_item){ +// log_it(L_INFO, "Proc event callback: %p/%p", l_item->callback, l_item->callback_arg); bool l_is_finished = l_item->callback(l_thread, l_item->callback_arg); if (l_is_finished){ + if ( l_item->prev ){ + l_item->prev->next = l_item_old; + } if(l_item_old){ - if ( ! l_item->next ){ // We deleted tail + l_item_old->prev = l_item->prev; + + if ( ! l_item->prev ) { // We deleted tail l_thread->proc_queue->item_last = l_item_old; } - l_item_old->prev = l_item->next; + DAP_DELETE(l_item); l_item = l_item_old->prev; }else{ - l_thread->proc_queue->item_first = l_item->next; - if (l_thread->proc_queue->item_first) - l_thread->proc_queue->item_first->prev = NULL; // Prev if it was - now its NULL - else + l_thread->proc_queue->item_first = l_item->prev; + if ( l_item->prev){ + l_item->prev->next = NULL; // Prev if it was - now its NULL + }else l_thread->proc_queue->item_last = NULL; // NULL last item DAP_DELETE(l_item); l_item = l_thread->proc_queue->item_first; } - +// log_it(L_DEBUG, "Proc event finished"); }else{ +// log_it(L_DEBUG, "Proc event not finished"); l_item_old = l_item; l_item=l_item->prev; } @@ -158,6 +165,7 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_va } if(l_is_anybody_for_repeat) // Arm event if we have smth to proc again dap_events_socket_event_signal(a_esocket,1); +// log_it(L_DEBUG, "<-- Proc event callback end"); } /** @@ -518,8 +526,6 @@ static void * s_proc_thread_function(void * a_arg) if (l_cur->flags & DAP_SOCK_QUEUE_PTR){ #if defined(DAP_EVENTS_CAPS_QUEUE_PIPE2) l_bytes_sent = write(l_cur->socket, l_cur->buf_out, sizeof (void *) ); // We send pointer by pointer - #elif defined DAP_EVENTS_CAPS_QUEUE_POSIX - l_bytes_sent = mq_send(a_es->mqd, (const char *) l_cur->buf_out, sizeof (void *),0); #elif defined DAP_EVENTS_CAPS_MSMQ DWORD l_mp_id = 0; MQMSGPROPS l_mps; @@ -552,12 +558,21 @@ static void * s_proc_thread_function(void * a_arg) break; } #elif defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE) - l_bytes_sent = mq_send(l_cur->mqd, (const char *) l_cur->buf_out, sizeof (void *),0); - if (l_bytes_sent==0) + volatile char * l_ptr = (char *) l_cur->buf_out; + void *l_ptr_in; + memcpy(&l_ptr_in,l_ptr, sizeof (l_ptr_in) ); + + l_bytes_sent = mq_send(l_cur->mqd, l_ptr, sizeof (l_ptr),0); + if (l_bytes_sent==0){ +// log_it(L_DEBUG,"mq_send %p success", l_ptr_in); l_bytes_sent = sizeof (void *); - if (l_bytes_sent == -1 && errno == EINVAL) // To make compatible with other + }else if (l_bytes_sent == -1 && errno == EINVAL){ // To make compatible with other l_errno = EAGAIN; // non-blocking sockets - +// log_it(L_DEBUG,"mq_send %p EAGAIN", l_ptr_in); + }else{ + l_errno = errno; + log_it(L_WARNING,"mq_send %p errno: %d", l_ptr_in, l_errno); + } #else #error "Not implemented dap_events_socket_queue_ptr_send() for this platform" #endif diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index dcb1527bd513c32a9dea70f0c951a38ec37b02b0..8c0db0e7113ae17cceb728a060473721aef75f7d 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -411,6 +411,12 @@ static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) if ( l_chain->callback_atom_find_by_hash(l_atom_iter, &l_atom_hash, &l_atom_size) == NULL ) { dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size); if (l_atom_add_res == ATOM_ACCEPT && dap_chain_has_file_store(l_chain)) { + if (s_debug_chain_sync){ + char l_atom_hash_str[72]={[0]='\0'}; + dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); + log_it(L_INFO,"Accepted atom with hash %s for %s:%s", l_atom_hash_str, l_chain->net_name, l_chain->name); + } + // append to file dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_pkt_item->pkt_hdr.cell_id); int l_res; @@ -454,10 +460,21 @@ static bool s_sync_in_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) log_it(L_ERROR, "Can't get cell for cell_id 0x%x for save event to file", l_pkt_item->pkt_hdr.cell_id); } + }else{ + if (s_debug_chain_sync){ + char l_atom_hash_str[72]={[0]='\0'}; + dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); + log_it(L_WARNING,"Not accepted atom (code %d) with hash %s for %s:%s", l_atom_add_res, l_atom_hash_str, l_chain->net_name, l_chain->name); + } } if(l_atom_add_res == ATOM_PASS) DAP_DELETE(l_atom_copy); } else { + if (s_debug_chain_sync){ + char l_atom_hash_str[72]={[0]='\0'}; + dap_chain_hash_fast_to_str(&l_atom_hash,l_atom_hash_str,sizeof (l_atom_hash_str)-1 ); + log_it(L_WARNING,"Already has atom with hash %s ", l_atom_hash_str); + } dap_db_set_last_hash_remote(l_sync_request->request.node_addr.uint64, l_chain, &l_atom_hash); DAP_DELETE(l_atom_copy); } diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 936a36ad3c92cfd12aa660d89c84825907e8152d..258e8f8fb2fe22f43699cc1b4744a48c23b4a32c 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -540,8 +540,11 @@ static int s_net_states_proc(dap_chain_net_t *a_net) if (dap_list_length(l_pvt_net->links_info)) { l_pvt_net->state = NET_STATE_LINKS_CONNECTING; log_it(L_DEBUG, "Prepared %u links, start to establish them", dap_list_length(l_pvt_net->links_info)); + }else{ // If no links prepared go to offline + log_it(L_WARNING, "Not foun any links, return back to offline"); + l_pvt_net->state = NET_STATE_OFFLINE; + l_pvt_net->state_target = NET_STATE_OFFLINE; } - // If no links prepared go again to NET_STATE_LINKS_PREPARE } else { l_pvt_net->state = NET_STATE_OFFLINE; } @@ -1586,8 +1589,8 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) ; i++ ){ dap_chain_node_addr_t * l_seed_node_addr; l_seed_node_addr = dap_chain_node_alias_find(l_net, PVT(l_net)->seed_aliases[i] ); - if (l_seed_node_addr == NULL){ - log_it(L_NOTICE, "Not found alias %s in database, prefill it",PVT(l_net)->seed_aliases[i]); + //if (l_seed_node_addr == NULL){ + log_it(L_NOTICE, "Update alias %s in database, prefill it",PVT(l_net)->seed_aliases[i]); dap_chain_node_info_t * l_node_info = DAP_NEW_Z(dap_chain_node_info_t); l_seed_node_addr = DAP_NEW_Z(dap_chain_node_addr_t); dap_snprintf( l_node_info->hdr.alias,sizeof ( l_node_info->hdr.alias),"%s",PVT(l_net)->seed_aliases[i]); @@ -1648,10 +1651,12 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) }else log_it(L_WARNING,"No address for seed node, can't populate global_db with it"); DAP_DELETE( l_node_info); - }else{ + /*}else{ log_it(L_DEBUG,"Seed alias %s is present",PVT(l_net)->seed_aliases[i]); + dap_chain_node_info_t * l_node_info= dap_chain_node_info_read(l_net,l_seed_node_addr); + l_node DAP_DELETE( l_seed_node_addr); - } + }*/ } PVT(l_net)->bootstrap_nodes_count = 0; PVT(l_net)->bootstrap_nodes_addrs = DAP_NEW_SIZE(struct in_addr, l_bootstrap_nodes_len * sizeof(struct in_addr)); diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index 85540d1906fdab0e1804211eb0e7614040a5818d..7bfb278fb0d7bf92f1f67956653e753600ab4831 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -729,7 +729,7 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_verify(dap_chain_t * a_ HASH_FIND(hh, PVT(l_dag)->events ,l_hash ,sizeof (*l_hash), l_event_search); if ( l_event_search == NULL ){ char * l_hash_str = dap_chain_hash_fast_to_str_new(l_hash); - log_it(L_INFO, "Hash %s wasn't in hashtable of previously parsed", l_hash_str); + log_it(L_WARNING, "Hash %s wasn't in hashtable of previously parsed", l_hash_str); DAP_DELETE(l_hash_str); res = ATOM_MOVE_TO_THRESHOLD; break;