diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 53b1d6430b0bd67a7e6b096b76708a6b038ec5de..3f48f92454e73a854251d3c2ac884ccd4978776d 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -107,7 +107,6 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct dap_worker * a_worker) { a_es->last_ping_request = time(NULL); - a_es->worker = a_worker; dap_worker_add_events_socket(a_es,a_worker); } @@ -556,7 +555,8 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *sc, bool is_rea int l_errno = errno; char l_errbuf[128]; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it(L_ERROR,"Can't update write client socket state in the epoll_fd: \"%s\" (%d)", l_errbuf, l_errno); + log_it(L_ERROR,"Can't update write client socket state in the epoll_fd %d: \"%s\" (%d)", + sc->worker->epoll_fd, l_errbuf, l_errno); } } @@ -614,13 +614,23 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool */ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap_worker_t * a_worker) { - if ( epoll_ctl( a_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 ) - log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" ); + if (!a_es->worker) { + // Socket already removed from worker + return; + } + if ( epoll_ctl( a_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 ) { + int l_errno = errno; + char l_errbuf[128]; + strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); + log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd %d \"%s\" (%d)", + a_worker->epoll_fd, l_errbuf, l_errno); + } else log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_worker->id ); a_worker->event_sockets_count--; if(a_worker->esockets) HASH_DELETE(hh_worker,a_worker->esockets, a_es); + a_es->worker = NULL; } /** diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index b3ed4989a3799a1ab8e1751e8c8b2a9947ba7c31..6c61733b2f113c1f6958a5ed530767fe570b6fcc 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -137,7 +137,8 @@ void *dap_worker_thread(void *arg) l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; log_it(L_DEBUG, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err)); } - default: log_it(L_WARNING, "Unimplemented EPOLLHUP for socket type %d", l_cur->type); + break; + default: log_it(L_WARNING, "Unimplemented EPOLLHUP for socket type %d", l_cur->type); } } @@ -346,12 +347,12 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg; dap_worker_t * w = a_es->worker; //log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new); - l_es_new->worker = w; if ( l_es_new->type == DESCRIPTOR_TYPE_SOCKET || l_es_new->type == DESCRIPTOR_TYPE_SOCKET_LISTENING ){ int l_cpu = w->id; setsockopt(l_es_new->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu)); } - + bool l_socket_present = (l_es_new->worker && l_es_new->is_initalized) ? true : false; + l_es_new->worker = w; // We need to differ new and reassigned esockets. If its new - is_initialized is false if ( ! l_es_new->is_initalized ){ if (l_es_new->callbacks.new_callback) @@ -369,6 +370,10 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) if(l_es_new->flags & DAP_SOCK_READY_TO_WRITE ) l_es_new->ev.events |= EPOLLOUT; l_es_new->ev.data.ptr = l_es_new; + if (l_socket_present) { + // Update only flags, socket already present in worker + return; + } l_ret = epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, l_es_new->socket, &l_es_new->ev); #else #error "Unimplemented new esocket on worker callback for current platform" @@ -383,7 +388,7 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) // Add in worker l_es_new->me = l_es_new; HASH_ADD(hh_worker, w->esockets, me, sizeof(void *), l_es_new ); - + w->event_sockets_count++; log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->id); if (l_es_new->callbacks.worker_assign_callback) l_es_new->callbacks.worker_assign_callback(l_es_new, w); @@ -546,8 +551,7 @@ dap_worker_t *dap_worker_add_events_socket_auto( dap_events_socket_t *a_es) // struct epoll_event ev = {0}; dap_worker_t *l_worker = dap_events_worker_get_auto( ); - a_es->worker = l_worker; - a_es->events = a_es->worker->events; + a_es->events = l_worker->events; dap_worker_add_events_socket( a_es, l_worker); return l_worker; } diff --git a/dap-sdk/net/stream/ch/dap_stream_ch.c b/dap-sdk/net/stream/ch/dap_stream_ch.c index 70577caa6ae33615c5f56cb9e22e8b4bac45c8af..036dbac03900c7d1bed85fc7d97eb927886dee4f 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch.c @@ -145,7 +145,7 @@ struct dap_stream_ch_table_t *dap_stream_ch_valid(dap_stream_ch_t *a_ch) */ void dap_stream_ch_delete(dap_stream_ch_t *a_ch) { - dap_stream_worker_t * l_stream_worker = DAP_STREAM_WORKER( a_ch->stream->esocket->worker ); + dap_stream_worker_t * l_stream_worker = a_ch->stream_worker; HASH_DELETE(hh_worker,l_stream_worker->channels, a_ch); @@ -189,7 +189,7 @@ void dap_stream_ch_set_ready_to_read_unsafe(dap_stream_ch_t * a_ch,bool a_is_rea if( a_ch->ready_to_read != a_is_ready){ //log_it(L_DEBUG,"Change channel '%c' to %s", (char) ch->proc->id, is_ready?"true":"false"); a_ch->ready_to_read=a_is_ready; - dap_events_socket_set_readable_unsafe( a_ch->stream->esocket,a_is_ready); + dap_events_socket_set_readable_unsafe(a_ch->stream->esocket, a_is_ready); } } @@ -205,7 +205,7 @@ void dap_stream_ch_set_ready_to_write_unsafe(dap_stream_ch_t * ch,bool is_ready) ch->ready_to_write=is_ready; if(is_ready && ch->stream->conn_http) ch->stream->conn_http->state_write=DAP_HTTP_CLIENT_STATE_DATA; - dap_events_socket_set_writable_unsafe(ch->stream->esocket,is_ready); + dap_events_socket_set_writable_unsafe(ch->stream->esocket, is_ready); } } diff --git a/dap-sdk/net/stream/stream/dap_stream_worker.c b/dap-sdk/net/stream/stream/dap_stream_worker.c index 5479b717530a2d80a41950d4b60049211da68c06..fe374507f386098af0efc7fad49e6f5973ad5c6b 100644 --- a/dap-sdk/net/stream/stream/dap_stream_worker.c +++ b/dap-sdk/net/stream/stream/dap_stream_worker.c @@ -39,6 +39,10 @@ int dap_stream_worker_init() uint32_t l_worker_count = dap_events_worker_get_count(); for (uint32_t i = 0; i < l_worker_count; i++){ dap_worker_t * l_worker = dap_events_worker_get(i); + if (!l_worker) { + log_it(L_CRITICAL,"Can't init stream worker, woreker thread don't exist"); + return -2; + } if (l_worker->_inheritor){ log_it(L_CRITICAL,"Can't init stream worker, core worker has already inheritor"); return -1; diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index b24e58fdc6a6cc80ecca8792b046c487cd7bad03..4b1d47d0f05d095e955032ec49bf78ad88839590 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -46,6 +46,10 @@ #include "dap_hash.h" #include "utlist.h" +#include "dap_worker.h" +#include "dap_events.h" +#include "dap_proc_thread.h" + #include "dap_chain.h" #include "dap_chain_datum.h" #include "dap_chain_cs.h" @@ -55,6 +59,7 @@ #include "dap_chain_global_db_remote.h" #include "dap_stream.h" +#include "dap_stream_worker.h" #include "dap_stream_ch_pkt.h" #include "dap_stream_ch.h" #include "dap_stream_ch_proc.h" @@ -118,6 +123,251 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg) pthread_mutex_destroy(&DAP_STREAM_CH_CHAIN(a_ch)->mutex); } + +bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) +{ + UNUSED(a_thread); + dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); + + dap_chain_atom_ptr_t * l_lasts = NULL; + size_t *l_lasts_sizes = NULL; + size_t l_lasts_count = 0; + dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create(l_chain); + l_ch_chain->request_atom_iter = l_iter; + l_lasts = l_chain->callback_atom_iter_get_lasts(l_iter, &l_lasts_count, &l_lasts_sizes); + if(l_lasts&& l_lasts_sizes) { + for(size_t i = 0; i < l_lasts_count; i++) { + dap_chain_atom_item_t * l_item = NULL; + dap_chain_hash_fast_t l_atom_hash; + dap_hash_fast(l_lasts[i], l_lasts_sizes[i], + &l_atom_hash); + pthread_mutex_lock(&l_ch_chain->mutex); + HASH_FIND(hh, l_ch_chain->request_atoms_lasts, &l_atom_hash, sizeof(l_atom_hash), + l_item); + if(l_item == NULL) { // Not found, add new lasts + l_item = DAP_NEW_Z(dap_chain_atom_item_t); + l_item->atom = l_lasts[i]; + memcpy(&l_item->atom_hash, &l_atom_hash, sizeof(l_atom_hash)); + HASH_ADD(hh, l_ch_chain->request_atoms_lasts, atom_hash, sizeof(l_atom_hash), + l_item); + } + pthread_mutex_unlock(&l_ch_chain->mutex); + } + // first packet + l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS; + dap_chain_node_addr_t l_node_addr = { 0 }; + dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id); + l_node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN, + l_ch_chain->request_net_id, l_ch_chain->request_chain_id, + l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); + } + else { + // last packet + dap_stream_ch_chain_sync_request_t l_request; + memset(&l_request,0,sizeof (l_request)); + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, + l_ch_chain->request_net_id, l_ch_chain->request_chain_id, + l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); + l_ch_chain->state = CHAIN_STATE_IDLE; + } + DAP_DELETE(l_lasts); + DAP_DELETE(l_iter); + dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); + dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); + return true; +} + +bool s_sync_gdb_callback(dap_proc_thread_t *a_thread, void *a_arg) +{ + UNUSED(a_thread); + dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + + // Get log diff + l_ch_chain->request_last_ts = dap_db_log_get_last_id(); + //log_it(L_DEBUG, "Requested transactions %llu:%llu", l_request->id_start, (uint64_t ) l_ch_chain->request_last_ts); + uint64_t l_start_item = l_ch_chain->request.id_start; + // If the current global_db has been truncated, but the remote node has not known this + if(l_ch_chain->request.id_start > l_ch_chain->request_last_ts) { + l_start_item = 0; + } + dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id); + dap_list_t *l_add_groups = dap_chain_net_get_add_gdb_group(l_net, l_ch_chain->request.node_addr); + dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_start_item + 1, l_add_groups); + dap_chain_node_addr_t l_node_addr = { 0 }; + l_node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, + l_ch_chain->request_net_id, l_ch_chain->request_chain_id, + l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); + if(l_db_log) { + //log_it(L_DEBUG, "Start getting items %u:%u", l_request->id_start + 1,l_db_log->items_number);//dap_list_length(l_list)); + // Add it to outgoing list + l_ch_chain->request_global_db_trs = l_db_log; + l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB; + } else { + dap_stream_ch_chain_sync_request_t l_request = {}; + //log_it(L_DEBUG, "No items to sync from %u", l_request->id_start + 1); + l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; + l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + l_ch_chain->request_net_id, l_ch_chain->request_chain_id, + l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); + l_ch_chain->state = CHAIN_STATE_IDLE; + if(l_ch_chain->callback_notify_packet_out) + l_ch_chain->callback_notify_packet_out(l_ch_chain, + DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + NULL, 0, l_ch_chain->callback_notify_arg); + } + //log_it(L_INFO, "Prepared %u items for sync", l_db_log->items_number - l_request->id_start); + // go to send data from list [in s_stream_ch_packet_out()] + // no data to send -> send one empty message DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB_SYNCED + dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); + dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); + return true; +} + +bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) +{ + UNUSED(a_thread); + dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); + if (!l_chain) { + log_it(L_WARNING, "No chain found for DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN"); + return true; + } + + dap_chain_hash_fast_t l_atom_hash = {}; + dap_chain_atom_ptr_t l_atom_copy = l_ch_chain->pkt_data; + uint64_t l_atom_copy_size = l_ch_chain->pkt_data_size; + dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash); + dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain); + size_t l_atom_size =0; + if ( l_chain->callback_atom_find_by_hash(l_atom_iter, &l_atom_hash, &l_atom_size) != NULL ) { + dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size); + if(l_atom_add_res == ATOM_ACCEPT && dap_chain_has_file_store(l_chain)) { + // append to file + dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_ch_chain->request_cell_id); + // add one atom only + int l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size); + // rewrite all file + //l_res = dap_chain_cell_file_update(l_cell); + if(!l_cell || l_res < 0) { + log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_atom_hash, + l_cell ? l_cell->file_storage_path : "[null]"); + } + // delete cell and close file + dap_chain_cell_delete(l_cell); + } + if(l_atom_add_res == ATOM_PASS) + DAP_DELETE(l_atom_copy); + } else { + DAP_DELETE(l_atom_copy); + } + l_chain->callback_atom_iter_delete(l_atom_iter); + dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); + return true; +} + +bool s_gdb_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) +{ + UNUSED(a_thread); + dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); + + size_t l_data_obj_count = 0; + // deserialize data & Parse data from dap_db_log_pack() + dap_store_obj_t *l_store_obj = dap_db_log_unpack(l_ch_chain->pkt_data,l_ch_chain->pkt_data_size, &l_data_obj_count); + //log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count ); + + for(size_t i = 0; i < l_data_obj_count; i++) { + // timestamp for exist obj + time_t l_timestamp_cur = 0; + // obj to add + dap_store_obj_t* l_obj = l_store_obj + i; + // read item from base; + size_t l_count_read = 0; + dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group, + l_obj->key, &l_count_read); + // get timestamp for the exist entry + if(l_read_obj) + l_timestamp_cur = l_read_obj->timestamp; + // get timestamp for the deleted entry + else + { + l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key); + } + + //check whether to apply the received data into the database + bool l_apply = true; + if(l_obj->timestamp < l_timestamp_cur) + l_apply = false; + else if(l_obj->type == 'd') { + // already deleted + if(!l_read_obj) + l_apply = false; + } + else if(l_obj->type == 'a') { + bool l_is_the_same_present = false; + if(l_read_obj && + l_read_obj->value_len == l_obj->value_len && + !memcmp(l_read_obj->value, l_obj->value, l_obj->value_len)) + l_is_the_same_present = true; + // this data already present in global_db and not obsolete (out of date) + if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp)) + l_apply = false; + } + if(l_read_obj) + dap_store_obj_free(l_read_obj, l_count_read); + + if(!l_apply) { + // If request was from defined node_addr we update its state + if(l_ch_chain->request.node_addr.uint64) { + dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id); + } + continue; + } + + char l_ts_str[50]; + dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), l_store_obj[i].timestamp); + /*log_it(L_DEBUG, "Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\"" + " timestamp=\"%s\" value_len=%u ", + (char ) l_store_obj[i].type, l_store_obj[i].type, l_store_obj[i].group, + l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len);*/ + // apply received transaction + dap_chain_t *l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); + if(l_chain) { + if(l_chain->callback_datums_pool_proc_with_group){ + void * restrict l_store_obj_value = l_store_obj->value; + l_chain->callback_datums_pool_proc_with_group(l_chain, + (dap_chain_datum_t** restrict) l_store_obj_value, 1, + l_store_obj[i].group); + } + } + // save data to global_db + if(!dap_chain_global_db_obj_save(l_obj, 1)) { + dap_stream_ch_chain_pkt_write_error(l_ch, l_ch_chain->request_net_id, + l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, + "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED"); + dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); + } else { + // If request was from defined node_addr we update its state + if(l_ch_chain->request.node_addr.uint64) { + dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id); + } + //log_it(L_DEBUG, "Added new GLOBAL_DB history pack"); + } + } + if(l_store_obj) + dap_store_obj_free(l_store_obj, l_data_obj_count); + dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); + return true; +} + /** * @brief s_stream_ch_packet_in * @param a_ch @@ -158,18 +408,10 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: { log_it(L_INFO, "In: SYNCED_GLOBAL_DB pkt"); - /*if(s_net_name) { - DAP_DELETE(s_net_name); - s_net_name = NULL; //"kelvin-testnet" - }*/ } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP: { log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt"); - /*if(s_net_name) { - DAP_DELETE(s_net_name); - s_net_name = NULL; //"kelvin-testnet" - }*/ } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP: { @@ -182,7 +424,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: { - //log_it(L_INFO, "In: SYNC_CHAINS pkt"); + log_it(L_INFO, "In: SYNC_CHAINS pkt"); dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if(l_chain) { if(l_ch_chain->state != CHAIN_STATE_IDLE) { @@ -190,67 +432,20 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, "ERROR_STATE_NOT_IN_IDLE"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); } else { // fill ids if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) { dap_stream_ch_chain_sync_request_t * l_request = (dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data; memcpy(&l_ch_chain->request, l_request, l_chain_pkt_data_size); - memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, - sizeof(dap_chain_cell_id_t)); + memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); - } - - dap_chain_atom_ptr_t * l_lasts = NULL; - size_t *l_lasts_sizes = NULL; - size_t l_lasts_count = 0; - dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create(l_chain); - l_ch_chain->request_atom_iter = l_iter; - l_lasts = l_chain->callback_atom_iter_get_lasts(l_iter, &l_lasts_count, &l_lasts_sizes); - if(l_lasts&& l_lasts_sizes) { - for(size_t i = 0; i < l_lasts_count; i++) { - dap_chain_atom_item_t * l_item = NULL; - dap_chain_hash_fast_t l_atom_hash; - dap_hash_fast(l_lasts[i], l_lasts_sizes[i], - &l_atom_hash); - pthread_mutex_lock(&l_ch_chain->mutex); - HASH_FIND(hh, l_ch_chain->request_atoms_lasts, &l_atom_hash, sizeof(l_atom_hash), - l_item); - if(l_item == NULL) { // Not found, add new lasts - l_item = DAP_NEW_Z(dap_chain_atom_item_t); - l_item->atom = l_lasts[i]; - memcpy(&l_item->atom_hash, &l_atom_hash, sizeof(l_atom_hash)); - HASH_ADD(hh, l_ch_chain->request_atoms_lasts, atom_hash, sizeof(l_atom_hash), - l_item); - } - pthread_mutex_unlock(&l_ch_chain->mutex); - //else - // DAP_DELETE(l_lasts[i]); - } - // first packet - l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS; - dap_chain_node_addr_t l_node_addr = { 0 }; - dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id); - l_node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); - } - else { - // last packet - dap_stream_ch_chain_sync_request_t l_request; - memset(&l_request,0,sizeof (l_request)); - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); - l_ch_chain->state = CHAIN_STATE_IDLE; - } - - DAP_DELETE(l_lasts); - DAP_DELETE(l_iter); } - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); + dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_chains_callback, a_ch); + } } } break; @@ -272,64 +467,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); - - // Get log diff - l_ch_chain->request_last_ts = dap_db_log_get_last_id(); - //log_it(L_DEBUG, "Requested transactions %llu:%llu", l_request->id_start, - // (uint64_t ) l_ch_chain->request_last_ts); - //dap_list_t *l_list = dap_db_log_get_list(l_request->id_start + 1); - uint64_t l_start_item = l_request->id_start; - // If the current global_db has been truncated, but the remote node has not known this - if(l_request->id_start > l_ch_chain->request_last_ts) { - l_start_item = 0; - } - dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id); - dap_list_t *l_add_groups = dap_chain_net_get_add_gdb_group(l_net, l_request->node_addr); - dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_start_item + 1, l_add_groups); - if(l_db_log) { - //log_it(L_DEBUG, "Start getting items %u:%u", l_request->id_start + 1,l_db_log->items_number);//dap_list_length(l_list)); - // Add it to outgoing list - l_ch_chain->request_global_db_trs = l_db_log;//l_list; - //dap_list_t *l_last = dap_list_last(l_list); - //if(l_last) - // l_last->next = l_ch_chain->request_global_db_trs; - //l_ch_chain->request_global_db_trs = l_list; - l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB; - - dap_chain_node_addr_t l_node_addr = { 0 }; - dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id); - l_node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); - - } else { - dap_chain_node_addr_t l_node_addr = { 0 }; - dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id); - l_node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); - - dap_stream_ch_chain_sync_request_t l_request = {}; - //log_it(L_DEBUG, "No items to sync from %u", l_request->id_start + 1); - l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; - l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); -// dap_stream_ch_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB ,&l_request, -// sizeof (l_request)); - l_ch_chain->state = CHAIN_STATE_IDLE; - if(l_ch_chain->callback_notify_packet_out) - l_ch_chain->callback_notify_packet_out(l_ch_chain, - DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - NULL, 0, l_ch_chain->callback_notify_arg); - } - //log_it(L_INFO, "Prepared %u items for sync", l_db_log->items_number - l_request->id_start);//dap_list_length(l_ch_chain->request_global_db_trs)); - // go to send data from list [in s_stream_ch_packet_out()] - // no data to send -> send one empty message DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB_SYNCED - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); + dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_gdb_callback, a_ch); } else { log_it(L_ERROR, "Get DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB session_id=%u bad request", @@ -349,39 +488,19 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN: { - //log_it(L_INFO, "In: CHAIN pkt"); + log_it(L_INFO, "In: CHAIN pkt"); dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); if(l_chain) { // Expect atom element in if(l_chain_pkt_data_size > 0) { - dap_chain_hash_fast_t l_atom_hash = {}; - dap_hash_fast(l_chain_pkt->data, l_chain_pkt_data_size, &l_atom_hash); - dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain); - size_t l_atom_size =0; - if ( l_chain->callback_atom_find_by_hash(l_atom_iter, &l_atom_hash, &l_atom_size) != NULL ) { - dap_chain_atom_ptr_t l_atom_copy = DAP_CALLOC(1, l_chain_pkt_data_size); - memcpy(l_atom_copy, l_chain_pkt->data, l_chain_pkt_data_size); - dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy,l_chain_pkt_data_size); - if(l_atom_add_res == ATOM_ACCEPT && dap_chain_has_file_store(l_chain)) { - // append to file - dap_chain_cell_id_t l_cell_id; - l_cell_id.uint64 = l_chain_pkt->hdr.cell_id.uint64; - dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_cell_id); - // add one atom only - int l_res = dap_chain_cell_file_append(l_cell, l_chain_pkt->data, l_chain_pkt_data_size); - // rewrite all file - //l_res = dap_chain_cell_file_update(l_cell); - if(!l_cell || l_res < 0) { - log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_chain_pkt->data, - l_cell ? l_cell->file_storage_path : "[null]"); - } - // delete cell and close file - dap_chain_cell_delete(l_cell); - } - if(l_atom_add_res == ATOM_PASS) - DAP_DELETE(l_atom_copy); - } - l_chain->callback_atom_iter_delete(l_atom_iter); + memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); + memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); + memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); + l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size); + memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); + l_ch_chain->pkt_data_size = l_chain_pkt_data_size; + dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); + dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_chain_pkt_callback, a_ch); } else { log_it(L_WARNING, "Empty chain packet"); dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, @@ -397,106 +516,26 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d", l_chain_pkt_data_size); if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)) memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size); - //memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); - //memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); - //memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB: { //log_it(L_INFO, "In: GLOBAL_DB data_size=%d", l_chain_pkt_data_size); // get transaction and save it to global_db if(l_chain_pkt_data_size > 0) { - //session_data_t *l_data = session_data_find(a_ch->stream->session->id); - size_t l_data_obj_count = 0; - // deserialize data - dap_store_obj_t *l_store_obj = dap_db_log_unpack((uint8_t*) l_chain_pkt->data, - l_chain_pkt_data_size, &l_data_obj_count); // Parse data from dap_db_log_pack() -// log_it(L_INFO, "In: l_data_obj_count = %d", l_data_obj_count ); - - for(size_t i = 0; i < l_data_obj_count; i++) { - // timestamp for exist obj - time_t l_timestamp_cur = 0; - // obj to add - dap_store_obj_t* l_obj = l_store_obj + i; - // read item from base; - size_t l_count_read = 0; - dap_store_obj_t *l_read_obj = dap_chain_global_db_driver_read(l_obj->group, - l_obj->key, &l_count_read); - // get timestamp for the exist entry - if(l_read_obj) - l_timestamp_cur = l_read_obj->timestamp; - // get timestamp for the deleted entry - else - { - l_timestamp_cur = global_db_gr_del_get_timestamp(l_obj->group, l_obj->key); - } - - //check whether to apply the received data into the database - bool l_apply = true; - if(l_obj->timestamp < l_timestamp_cur) - l_apply = false; - else if(l_obj->type == 'd') { - // already deleted - if(!l_read_obj) - l_apply = false; - } - else if(l_obj->type == 'a') { - bool l_is_the_same_present = false; - if(l_read_obj && - l_read_obj->value_len == l_obj->value_len && - !memcmp(l_read_obj->value, l_obj->value, l_obj->value_len)) - l_is_the_same_present = true; - // this data already present in global_db and not obsolete (out of date) - if(l_read_obj && (l_is_the_same_present || l_read_obj->timestamp >= l_store_obj->timestamp)) - l_apply = false; - } - if(l_read_obj) - dap_store_obj_free(l_read_obj, l_count_read); - - if(!l_apply) { - // If request was from defined node_addr we update its state - if(l_ch_chain->request.node_addr.uint64) { - dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, l_obj->id); - } - continue; - } - - char l_ts_str[50]; - dap_time_to_str_rfc822(l_ts_str, sizeof(l_ts_str), l_store_obj[i].timestamp); - /*log_it(L_DEBUG, "Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\"" - " timestamp=\"%s\" value_len=%u ", - (char ) l_store_obj[i].type, l_store_obj[i].type, l_store_obj[i].group, - l_store_obj[i].key, l_ts_str, l_store_obj[i].value_len);*/ - // apply received transaction - dap_chain_t *l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id); - if(l_chain) { - if(l_chain->callback_datums_pool_proc_with_group){ - void * restrict l_store_obj_value = l_store_obj->value; - l_chain->callback_datums_pool_proc_with_group(l_chain, - (dap_chain_datum_t** restrict) l_store_obj_value, 1, - l_store_obj[i].group); - } - } - // save data to global_db - if(!dap_chain_global_db_obj_save(l_obj, 1)) { - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_GLOBAL_DB_INTERNAL_NOT_SAVED"); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } else { - // If request was from defined node_addr we update its state - if(l_ch_chain->request.node_addr.uint64) { - dap_db_log_set_last_id_remote(l_ch_chain->request.node_addr.uint64, - l_obj->id); - } - //log_it(L_DEBUG, "Added new GLOBAL_DB history pack"); - } - } - if(l_store_obj) - dap_store_obj_free(l_store_obj, l_data_obj_count); - + memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t)); + memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t)); + memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t)); + l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size); + memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size); + l_ch_chain->pkt_data_size = l_chain_pkt_data_size; + dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); + dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_gdb_pkt_callback, a_ch); } else { log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size"); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_GLOBAL_DB_PACKET_EMPTY"); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); } } break; @@ -557,24 +596,17 @@ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain) HASH_ITER( hh, a_ch_chain->request_atoms_processed, l_atom_item, l_atom_item_tmp ) HASH_DEL(a_ch_chain->request_atoms_processed, l_atom_item); pthread_mutex_unlock(&a_ch_chain->mutex); - dap_stream_ch_set_ready_to_write_unsafe(a_ch_chain->ch, false); - } -/** - * @brief s_stream_ch_packet_out - * @param ch - * @param arg - */ -void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) +bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) { - (void) a_arg; - - dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); + UNUSED(a_thread); + dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); //log_it( L_DEBUG,"s_stream_ch_packet_out state=%d", l_ch_chain ? l_ch_chain->state : -1); // log_it( L_DEBUG,"l_ch_chain %X", l_ch_chain ); - + bool l_packet_out = false; switch (l_ch_chain->state) { case CHAIN_STATE_IDLE: { dap_stream_ch_chain_go_idle(l_ch_chain); @@ -585,100 +617,63 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) case CHAIN_STATE_SYNC_GLOBAL_DB: { // Get log diff - //size_t l_data_size_out = 0; - - dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs; //dap_list_last( l_ch_chain->request_global_db_trs ); + dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs; dap_global_db_obj_t *l_obj = dap_db_log_list_get(l_db_list); - if (1) { - //dap_list_t *l_list = l_ch_chain->request_global_db_trs; //dap_list_last( l_ch_chain->request_global_db_trs ); - bool l_is_stop = true; //l_list ? false : true; - while(l_obj) { - - size_t l_items_total = dap_db_log_list_get_count(l_db_list); + bool l_is_stop = true; + while(l_obj) { + size_t l_item_size_out = 0; + uint8_t *l_item = dap_db_log_pack(l_obj, &l_item_size_out); + // Item not found, maybe it has deleted? Then go to the next item + if(!l_item || !l_item_size_out) { + //log_it(L_WARNING, "Log pack returned NULL??? data=0x%x (nothing to send) (rest=%d records)", l_obj, l_items_rest); + l_item_size_out = 0; + // go to next item + l_obj = dap_db_log_list_get(l_db_list); + } + else { + /*size_t l_items_total = dap_db_log_list_get_count(l_db_list); size_t l_items_rest = dap_db_log_list_get_count_rest(l_db_list); - - size_t l_item_size_out = 0; - uint8_t *l_item = dap_db_log_pack(l_obj, &l_item_size_out); - // Item not found, maybe it has deleted? Then go to the next item - if(!l_item || !l_item_size_out) { - //log_it(L_WARNING, "Log pack returned NULL??? data=0x%x (nothing to send) (rest=%d records)", l_obj, - // l_items_rest); - l_item_size_out = 0; - //dap_stream_ch_set_ready_to_write(a_ch, false); - - // go to next item - l_obj = dap_db_log_list_get(l_db_list); - //if(l_obj) - // continue; - // stop global_db sync - //else - // break; - } - else { - //log_it(L_INFO, "Send one global_db record data=0x%x len=%d (rest=%d/%d items)", l_item, l_item_size_out, - // l_items_rest, l_items_total); - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, l_item, l_item_size_out); - l_ch_chain->stats_request_gdb_processed++; - //dap_stream_ch_set_ready_to_write(a_ch, true); - //sleep(1); - - DAP_DELETE(l_item); - // sent the record, another will be sent - l_is_stop = false; - break; - } - // remove current item from list and go to next item - /*dap_chain_global_db_obj_delete((dap_global_db_obj_t *) l_list->data); - l_ch_chain->request_global_db_trs = dap_list_delete_link(l_ch_chain->request_global_db_trs, l_list); - // nothing was sent - if(!l_item_size_out) { - l_list = l_ch_chain->request_global_db_trs; - // go to next item - if(l_list) - continue; - // stop global_db sync - else - break; - }*/ + log_it(L_INFO, "Send one global_db record data=0x%x len=%d (rest=%d/%d items)", l_item, l_item_size_out, + l_items_rest, l_items_total);*/ + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, + l_ch_chain->request_net_id, l_ch_chain->request_chain_id, + l_ch_chain->request_cell_id, l_item, l_item_size_out); + l_packet_out = true; + l_ch_chain->stats_request_gdb_processed++; + DAP_DELETE(l_item); + // sent the record, another will be sent + l_is_stop = false; + break; } + } - if(l_is_stop){ - //log_it(L_DEBUG, "l_obj == 0, STOP"); - // If we don't need to send chains after -// if(l_ch_chain->state != CHAIN_STATE_SYNC_ALL){ -// dap_stream_ch_chain_go_idle(l_ch_chain); -// }else if(l_ch_chain->state == CHAIN_STATE_SYNC_GLOBAL_DB) - { - // free log list - l_ch_chain->request_global_db_trs = NULL; - dap_db_log_list_delete(l_db_list); - - // last message - - dap_stream_ch_chain_sync_request_t l_request = {}; - dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id); - l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; - l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); - l_request.id_end = 0; - - log_it( L_DEBUG,"Syncronized database: last id %llu, items syncronyzed %llu ", l_request.id_start, - l_ch_chain->stats_request_gdb_processed ); - - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); - - if(l_ch_chain->callback_notify_packet_out) - l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - NULL, 0, l_ch_chain->callback_notify_arg); - - if(l_ch_chain->state != CHAIN_STATE_SYNC_ALL) - dap_stream_ch_chain_go_idle(l_ch_chain); - } - } + if(l_is_stop){ + //log_it(L_DEBUG, "l_obj == 0, STOP"); + // free log list + l_ch_chain->request_global_db_trs = NULL; + dap_db_log_list_delete(l_db_list); + // last message + dap_stream_ch_chain_sync_request_t l_request = {}; + dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id); + l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; + l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); + l_request.id_end = 0; + + log_it( L_DEBUG,"Syncronized database: last id %llu, items syncronyzed %llu ", l_request.id_start, + l_ch_chain->stats_request_gdb_processed ); + + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + l_ch_chain->request_net_id, l_ch_chain->request_chain_id, + l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); + l_packet_out = true; + + if(l_ch_chain->callback_notify_packet_out) + l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + NULL, 0, l_ch_chain->callback_notify_arg); + + if(l_ch_chain->state != CHAIN_STATE_SYNC_ALL) + dap_stream_ch_chain_go_idle(l_ch_chain); } } @@ -689,17 +684,6 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) case CHAIN_STATE_SYNC_CHAINS: { //log_it(L_DEBUG, "CHAIN_STATE_SYNC_CHAINS"); dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); - /* - // alternative way to get l_chain - if(!l_ch_chain->request_atom_iter) { - log_it(L_ERROR, "CHAIN_STATE_SYNC_CHAINS not ready to send chains"); - l_ch_chain->state = CHAIN_STATE_IDLE; - break; - } - //dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create(l_chain); - dap_chain_t * l_chain = l_ch_chain->request_atom_iter->chain; - */ - dap_chain_atom_item_t * l_atom_item = NULL, *l_atom_item_tmp = NULL;//, *l_chains_lasts_new = NULL; if(l_ch_chain->request_atoms_lasts == NULL) { // All chains synced dap_stream_ch_chain_sync_request_t l_request; @@ -708,10 +692,10 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS : DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL; // last message - dap_stream_ch_chain_pkt_write_unsafe(a_ch, - l_send_pkt_type, - l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); + dap_stream_ch_chain_pkt_write_unsafe(l_ch, l_send_pkt_type, + l_ch_chain->request_net_id, l_ch_chain->request_chain_id, + l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); + l_packet_out = true; log_it( L_DEBUG,"Synced: %llu atoms processed", l_ch_chain->stats_request_atoms_processed); dap_stream_ch_chain_go_idle(l_ch_chain); @@ -735,22 +719,19 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) // Then flush it out to the remote size_t l_atom_size = l_atom_item->atom_size; - dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_net_id, - l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, - l_atom_item->atom, l_atom_size); + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_net_id, + l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, + l_atom_item->atom, l_atom_size); + l_packet_out = true; l_ch_chain->stats_request_atoms_processed++; // Then parse links and populate new lasts size_t l_lasts_size = 0; dap_chain_atom_ptr_t * l_links = NULL; size_t *l_links_sizes = NULL; - dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create_from(l_chain, l_atom_item->atom, l_atom_item->atom_size); - l_links = l_chain->callback_atom_iter_get_links(l_iter, &l_lasts_size,&l_links_sizes); + l_links = l_chain->callback_atom_iter_get_links(l_iter, &l_lasts_size, &l_links_sizes); DAP_DELETE(l_iter); - if( l_links&&l_links_sizes){ - //DAP_DELETE(l_atom_item->atom); - //l_links = l_chain->callback_atom_iter_get_links(l_atom_item->atom, &l_lasts_size); - + if (l_links && l_links_sizes) { for(size_t i = 0; i < l_lasts_size; i++) { // Find links dap_chain_atom_item_t * l_link_item = NULL; dap_chain_hash_fast_t l_link_hash; @@ -761,11 +742,8 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) l_link_item = DAP_NEW_Z(dap_chain_atom_item_t); l_link_item->atom = l_links[i];// do not use memory cause it will be deleted memcpy(&l_link_item->atom_hash, &l_link_hash, sizeof(l_link_hash)); - //HASH_ADD(hh, l_chains_lasts_new, atom_hash, sizeof(l_link_hash), l_link_item); HASH_ADD(hh, l_ch_chain->request_atoms_lasts, atom_hash, sizeof(l_link_hash), l_link_item); } - //else - // DAP_DELETE(l_links[i]); } DAP_DELETE(l_links); } @@ -778,11 +756,24 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) } pthread_mutex_unlock(&l_ch_chain->mutex); } - //assert(l_ch_chain->request_atoms_lasts == NULL); - //l_ch_chain->request_atoms_lasts = l_chains_lasts_new; } break; - } + if (l_packet_out) { + dap_stream_ch_set_ready_to_write_unsafe(l_ch, true); + } + dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); + return true; +} +/** + * @brief s_stream_ch_packet_out + * @param ch + * @param arg + */ +void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) +{ + (void) a_arg; + dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker); + dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_out_pkt_callback, a_ch); } diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index 989a7ea97a43467bdcaa330d3e39314ad095f820..2d1ef9751167717a0af1c3dfb3ed2445757c7038 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -55,6 +55,8 @@ typedef struct dap_stream_ch_chain { dap_chain_atom_iter_t * request_atom_iter; dap_chain_atom_item_t * request_atoms_lasts; dap_chain_atom_item_t * request_atoms_processed; + uint8_t *pkt_data; + uint64_t pkt_data_size; uint64_t stats_request_atoms_processed; uint64_t stats_request_gdb_processed; dap_stream_ch_chain_sync_request_t request;