diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 1404f7e32f9289dd13e6e782969a0e7199362909..8486ad41fd2828ab2438e5fe55ff7e49439c1005 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -490,6 +490,8 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) // new added, whether it is necessary? a_client_pvt->stream->session->key = a_client_pvt->stream_key; + a_client_pvt->stream_worker = (dap_stream_worker_t*) l_worker->_inheritor; + // connect struct sockaddr_in l_remote_addr; diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 3f48f92454e73a854251d3c2ac884ccd4978776d..d9f71bf55b7616a494df2ff96fec8dda392088b1 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -94,7 +94,7 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, ret->ev_base_flags = EPOLLERR | EPOLLRDHUP | EPOLLHUP; #endif - log_it( L_DEBUG,"Dap event socket wrapped around %d sock a_events = %X", a_sock, a_events ); + // log_it( L_DEBUG,"Dap event socket wrapped around %d sock a_events = %X", a_sock, a_events ); return ret; } @@ -157,8 +157,8 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c log_it( L_ERROR, "Error detected, can't create pipe(): '%s' (%d)", l_errbuf, l_errno); DAP_DELETE(l_es); return NULL; - }else - log_it(L_DEBUG, "Created one-way unnamed bytestream pipe %d->%d", l_pipe[0], l_pipe[1]); + }//else + // log_it(L_DEBUG, "Created one-way unnamed bytestream pipe %d->%d", l_pipe[0], l_pipe[1]); l_es->fd = l_pipe[0]; l_es->fd2 = l_pipe[1]; return l_es; @@ -223,8 +223,8 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc } DAP_DELETE(l_es); return NULL; - }else - log_it(L_DEBUG, "Created one-way unnamed packet pipe %d->%d", l_pipe[0], l_pipe[1]); + }//else + // log_it(L_DEBUG, "Created one-way unnamed packet pipe %d->%d", l_pipe[0], l_pipe[1]); l_es->fd = l_pipe[0]; l_es->fd2 = l_pipe[1]; #endif @@ -450,7 +450,7 @@ dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct da assert( a_callbacks ); assert( a_server ); - log_it( L_DEBUG,"Dap event socket wrapped around %d sock", a_sock ); + //log_it( L_DEBUG,"Dap event socket wrapped around %d sock", a_sock ); dap_events_socket_t * ret = DAP_NEW_Z( dap_events_socket_t ); ret->socket = a_sock; @@ -569,7 +569,7 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool if ( !a_es ) return; - log_it( L_DEBUG, "es is going to be removed from the lists and free the memory (0x%016X)", a_es ); + //log_it( L_DEBUG, "es is going to be removed from the lists and free the memory (0x%016X)", a_es ); dap_events_socket_remove_from_worker_unsafe(a_es, a_es->worker); if (a_es->events){ // It could be socket NOT from events @@ -584,7 +584,7 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool HASH_DEL( a_es->events->sockets, a_es ); pthread_rwlock_unlock( &a_es->events->sockets_rwlock ); } - log_it( L_DEBUG, "dap_events_socket wrapped around %d socket is removed", a_es->socket ); + //log_it( L_DEBUG, "dap_events_socket wrapped around %d socket is removed", a_es->socket ); if( a_es->callbacks.delete_callback ) a_es->callbacks.delete_callback( a_es, NULL ); // Init internal structure @@ -624,9 +624,8 @@ void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd %d \"%s\" (%d)", a_worker->epoll_fd, l_errbuf, l_errno); - } - else - log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_worker->id ); + } //else + // log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_worker->id ); a_worker->event_sockets_count--; if(a_worker->esockets) HASH_DELETE(hh_worker,a_worker->esockets, a_es); @@ -794,7 +793,7 @@ size_t dap_events_socket_write_unsafe(dap_events_socket_t *sc, const void * data */ size_t dap_events_socket_write_f_unsafe(dap_events_socket_t *sc, const char * format,...) { - log_it(L_DEBUG,"dap_events_socket_write_f %u sock", sc->socket ); + //log_it(L_DEBUG,"dap_events_socket_write_f %u sock", sc->socket ); size_t max_data_size = sizeof(sc->buf_out)-sc->buf_out_size; va_list ap; diff --git a/dap-sdk/net/core/dap_proc_queue.c b/dap-sdk/net/core/dap_proc_queue.c index 632d40ef81a773b5de1f7832ba6b0f0faefea4db..d41f6604ca75a55e570d21031dbe733bd8ec1a00 100644 --- a/dap-sdk/net/core/dap_proc_queue.c +++ b/dap-sdk/net/core/dap_proc_queue.c @@ -65,7 +65,7 @@ void dap_proc_queue_delete(dap_proc_queue_t * a_queue) */ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg) { - log_it(L_DEBUG, "New callback in list accepted"); + //log_it(L_DEBUG, "New callback in list accepted"); dap_proc_queue_t * l_queue = (dap_proc_queue_t*) a_es->_inheritor; dap_proc_queue_msg_t * l_msg = (dap_proc_queue_msg_t*) a_msg; @@ -78,7 +78,7 @@ static void s_queue_esocket_callback( dap_events_socket_t * a_es, void * a_msg) l_queue->items = l_item; // Add on top so after call this callback will be executed first dap_events_socket_queue_ptr_send(l_queue->proc_thread->proc_event,NULL); - log_it( L_DEBUG, "Sent signal to proc thread that we have callbacks on board"); + //log_it( L_DEBUG, "Sent signal to proc thread that we have callbacks on board"); } if (l_msg->signal_kill){ // Say to kill this object and delete its inherior dap_proc_queue_t a_es->kill_signal = true; diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 930f67b702a89114ca701d1b02eebfa2e92ac97c..27d03da39d702c58eb5943c844764630f580f8dd 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -116,7 +116,7 @@ dap_proc_thread_t * dap_proc_thread_get_auto() static void s_proc_event_callback(dap_events_socket_t * a_esocket, void * a_value) { (void) a_value; - log_it(L_DEBUG, "Proc event callback"); + //log_it(L_DEBUG, "Proc event callback"); dap_proc_thread_t * l_thread = (dap_proc_thread_t *) a_esocket->_inheritor; dap_proc_queue_item_t * l_item = l_thread->proc_queue->items; dap_proc_queue_item_t * l_item_old = NULL; @@ -247,8 +247,8 @@ static void * s_proc_thread_function(void * a_arg) #ifdef DAP_EVENTS_CAPS_EPOLL if ( epoll_ctl( l_thread->epoll_ctl, EPOLL_CTL_DEL, l_cur->fd, &l_cur->ev ) == -1 ) log_it( L_ERROR,"Can't remove event socket's handler from the epoll ctl" ); - else - log_it( L_DEBUG,"Removed epoll's event from proc thread #%u", l_thread->cpu_id ); + //else + // log_it( L_DEBUG,"Removed epoll's event from proc thread #%u", l_thread->cpu_id ); if (l_cur->callbacks.delete_callback) l_cur->callbacks.delete_callback(l_cur, l_thread); if(l_cur->_inheritor) diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 6c61733b2f113c1f6958a5ed530767fe570b6fcc..ff88a74640a5f3550458ea6106ff6cae4ea5ee16 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -135,7 +135,7 @@ void *dap_worker_thread(void *arg) //cur->no_close = false; if (l_sock_err) { l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; - log_it(L_DEBUG, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err)); + log_it(L_INFO, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err)); } break; default: log_it(L_WARNING, "Unimplemented EPOLLHUP for socket type %d", l_cur->type); @@ -228,9 +228,13 @@ void *dap_worker_thread(void *arg) if(l_bytes_read > 0) { l_cur->buf_in_size += l_bytes_read; //log_it(L_DEBUG, "Received %d bytes", l_bytes_read); - if(l_cur->callbacks.read_callback) + if(l_cur->callbacks.read_callback){ l_cur->callbacks.read_callback(l_cur, NULL); // Call callback to process read event. At the end of callback buf_in_size should be zero if everything was read well - else{ + if (l_cur->worker == NULL ){ // esocket was unassigned in callback, we don't need any ops with it now, + // continue to poll another esockets + continue; + } + }else{ log_it(L_WARNING, "We have incomming %u data but no read callback on socket %d, removing from read set", l_cur->socket); dap_events_socket_set_readable_unsafe(l_cur,false); } @@ -255,6 +259,10 @@ void *dap_worker_thread(void *arg) //log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size); if(l_cur->callbacks.write_callback) l_cur->callbacks.write_callback(l_cur, NULL); // Call callback to process write event + if (l_cur->worker == NULL ){ // esocket was unassigned in callback, we don't need any ops with it now, + // continue to poll another esockets + continue; + } if(l_cur->flags & DAP_SOCK_READY_TO_WRITE) { @@ -389,7 +397,7 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) l_es_new->me = l_es_new; HASH_ADD(hh_worker, w->esockets, me, sizeof(void *), l_es_new ); w->event_sockets_count++; - log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->id); + //log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->id); if (l_es_new->callbacks.worker_assign_callback) l_es_new->callbacks.worker_assign_callback(l_es_new, w); @@ -463,7 +471,7 @@ static void s_queue_es_io_callback( dap_events_socket_t * a_es, void * a_arg) dap_events_socket_t *l_msg_es = NULL; HASH_FIND(hh_worker, a_es->worker->esockets, &l_msg->esocket , sizeof (void*), l_msg_es ); if ( l_msg_es == NULL){ - log_it(L_DEBUG, "We got i/o message for client thats now not in list. Lost %u data", l_msg->data_size); + log_it(L_INFO, "We got i/o message for client thats now not in list. Lost %u data", l_msg->data_size); DAP_DELETE(l_msg); return; } diff --git a/dap-sdk/net/server/http_server/dap_http_simple.c b/dap-sdk/net/server/http_server/dap_http_simple.c index 9a11db0ba4a4b7dcaaf60700320088701dff2360..49e72c20a135fb97a87af3a1711ef4ab25045edb 100644 --- a/dap-sdk/net/server/http_server/dap_http_simple.c +++ b/dap-sdk/net/server/http_server/dap_http_simple.c @@ -358,7 +358,8 @@ static void s_http_client_headers_read( dap_http_client_t *a_http_client, void * } else { log_it( L_DEBUG, "No data section, execution proc callback" ); dap_events_socket_remove_from_worker_unsafe(a_http_client->esocket,a_http_client->esocket->worker); - dap_proc_queue_add_callback( a_http_client->esocket->worker->proc_queue, s_proc_queue_callback, l_http_simple); + dap_proc_queue_add_callback( l_http_simple->worker->proc_queue, s_proc_queue_callback, l_http_simple); + } } diff --git a/dap-sdk/net/stream/ch/dap_stream_ch.c b/dap-sdk/net/stream/ch/dap_stream_ch.c index 036dbac03900c7d1bed85fc7d97eb927886dee4f..c507880b314aeeb1e0c6abc4147ef0885f78d884 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch.c @@ -96,7 +96,7 @@ dap_stream_ch_t* dap_stream_ch_new(dap_stream_t* a_stream, uint8_t id) l_ch_new->ready_to_read = true; // Init on stream worker - dap_stream_worker_t * l_stream_worker = DAP_STREAM_WORKER( a_stream->esocket->worker ); + dap_stream_worker_t * l_stream_worker = a_stream->stream_worker; l_ch_new->stream_worker = l_stream_worker; HASH_ADD(hh_worker,l_stream_worker->channels, me,sizeof (void*),l_ch_new); diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index f47b347b670f638c66d790573583fcdc33e00dde..f14bff6ce9bf23e54d3a88593a20365cb27b3403 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -322,6 +322,7 @@ dap_stream_t * stream_new(dap_http_client_t * a_sh) pthread_rwlock_init( &ret->rwlock, NULL); ret->esocket = a_sh->esocket; + ret->stream_worker = (dap_stream_worker_t*) a_sh->esocket->worker->_inheritor; ret->conn_http=a_sh; ret->buf_defrag_size = 0; ret->seq_id = 0; @@ -333,6 +334,7 @@ dap_stream_t * stream_new(dap_http_client_t * a_sh) return ret; } + /** * @brief dap_stream_delete * @param a_stream @@ -391,7 +393,6 @@ dap_stream_t* dap_stream_new_es(dap_events_socket_t * a_es) ret->esocket = a_es; ret->buf_defrag_size=0; ret->is_client_to_uplink = true; - log_it(L_NOTICE,"New stream with events socket instance for %s",a_es->hostaddr); return ret; } diff --git a/dap-sdk/net/stream/stream/include/dap_stream.h b/dap-sdk/net/stream/stream/include/dap_stream.h index 9f953ff0980b803ee45cfca9d61970200aacd3f8..3130f61dfb0add3d018a2855353fd3c1a7644fd5 100644 --- a/dap-sdk/net/stream/stream/include/dap_stream.h +++ b/dap-sdk/net/stream/stream/include/dap_stream.h @@ -55,7 +55,7 @@ typedef struct dap_stream { pthread_rwlock_t rwlock; dap_stream_session_t * session; dap_events_socket_t * esocket; // Connection - + dap_stream_worker_t * stream_worker; struct dap_http_client * conn_http; // HTTP-specific char * service_key; diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 4b1d47d0f05d095e955032ec49bf78ad88839590..859485cc2bf781b7ac2bd69a143257a54410c457 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -129,6 +129,7 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) UNUSED(a_thread); dap_stream_ch_t *l_ch = (dap_stream_ch_t *)a_arg; dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch); + dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id); dap_chain_atom_ptr_t * l_lasts = NULL; @@ -243,31 +244,36 @@ bool s_chain_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) dap_chain_hash_fast_t l_atom_hash = {}; dap_chain_atom_ptr_t l_atom_copy = l_ch_chain->pkt_data; uint64_t l_atom_copy_size = l_ch_chain->pkt_data_size; - dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash); - dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain); - size_t l_atom_size =0; - if ( l_chain->callback_atom_find_by_hash(l_atom_iter, &l_atom_hash, &l_atom_size) != NULL ) { - dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size); - if(l_atom_add_res == ATOM_ACCEPT && dap_chain_has_file_store(l_chain)) { - // append to file - dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_ch_chain->request_cell_id); - // add one atom only - int l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size); - // rewrite all file - //l_res = dap_chain_cell_file_update(l_cell); - if(!l_cell || l_res < 0) { - log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_atom_hash, - l_cell ? l_cell->file_storage_path : "[null]"); + l_ch_chain->pkt_data = NULL; + l_ch_chain->pkt_data_size = 0; + if( l_atom_copy && l_atom_copy_size){ + dap_hash_fast(l_atom_copy, l_atom_copy_size, &l_atom_hash); + dap_chain_atom_iter_t *l_atom_iter = l_chain->callback_atom_iter_create(l_chain); + size_t l_atom_size =0; + if ( l_chain->callback_atom_find_by_hash(l_atom_iter, &l_atom_hash, &l_atom_size) != NULL ) { + dap_chain_atom_verify_res_t l_atom_add_res = l_chain->callback_atom_add(l_chain, l_atom_copy, l_atom_copy_size); + if(l_atom_add_res == ATOM_ACCEPT && dap_chain_has_file_store(l_chain)) { + // append to file + dap_chain_cell_t *l_cell = dap_chain_cell_create_fill(l_chain, l_ch_chain->request_cell_id); + // add one atom only + int l_res = dap_chain_cell_file_append(l_cell, l_atom_copy, l_atom_copy_size); + // rewrite all file + //l_res = dap_chain_cell_file_update(l_cell); + if(!l_cell || l_res < 0) { + log_it(L_ERROR, "Can't save event 0x%x to the file '%s'", l_atom_hash, + l_cell ? l_cell->file_storage_path : "[null]"); + } + // delete cell and close file + dap_chain_cell_delete(l_cell); } - // delete cell and close file - dap_chain_cell_delete(l_cell); - } - if(l_atom_add_res == ATOM_PASS) + if(l_atom_add_res == ATOM_PASS) + DAP_DELETE(l_atom_copy); + } else { DAP_DELETE(l_atom_copy); - } else { - DAP_DELETE(l_atom_copy); - } - l_chain->callback_atom_iter_delete(l_atom_iter); + } + l_chain->callback_atom_iter_delete(l_atom_iter); + }else + log_it(L_WARNING, "In proc thread got stream ch packet with pkt_size: %zd and pkt_data: %p", l_atom_copy_size, l_atom_copy); dap_events_socket_assign_on_worker_mt(l_ch->stream->esocket, l_ch->stream_worker->worker); return true; } @@ -749,8 +755,7 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) } HASH_DEL(l_ch_chain->request_atoms_lasts, l_atom_item); break; - } - else{ + } else { HASH_DEL(l_ch_chain->request_atoms_lasts, l_atom_item); } }