diff --git a/dap-sdk/net/client/dap_client.c b/dap-sdk/net/client/dap_client.c index b55f4d0229142f63b2281d71b9c4af76f7b2f5c0..15f40ca49ce61311b2e07653da242ba876656d27 100644 --- a/dap-sdk/net/client/dap_client.c +++ b/dap-sdk/net/client/dap_client.c @@ -233,6 +233,7 @@ void dap_client_delete(dap_client_t * a_client) //memset(a_client, 0, sizeof(dap_client_t)); //pthread_mutex_unlock(l_mutex); pthread_mutex_unlock(&a_client->mutex); + pthread_mutex_destroy(&a_client->mutex); // a_client will be deleted in dap_events_socket_delete() -> free( a_es->_inheritor ); //DAP_DELETE(a_client); DAP_DEL_Z(a_client); diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 5a3ca7692c94c91b10a17f7e2d2401813e52ce60..61de1cc22451ca34e7509afa580121ebf7dbef39 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -283,7 +283,7 @@ int dap_client_pvt_disconnect(dap_client_pvt_t *a_client_pvt) // l_client_internal->stream_es->signal_close = true; // start stopping connection if(a_client_pvt->stream_es ) { - dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_es->worker, a_client_pvt->stream_es); + dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es); int l_counter = 0; // wait for stop of connection (max 0.7 sec.) while(a_client_pvt->stream_es && l_counter < 70) { @@ -501,8 +501,6 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) assert(l_worker); assert(l_worker->_inheritor); a_client_pvt->stream_worker = DAP_STREAM_WORKER(l_worker); - // add to dap_worker - dap_events_socket_assign_on_worker_mt(a_client_pvt->stream_es, l_worker); a_client_pvt->stream_es->_inheritor = a_client_pvt;//->client; a_client_pvt->stream = dap_stream_new_es_client(a_client_pvt->stream_es); @@ -523,9 +521,11 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) if(inet_pton(AF_INET, a_client_pvt->uplink_addr, &(l_remote_addr.sin_addr)) < 0) { log_it(L_ERROR, "Wrong remote address '%s:%u'", a_client_pvt->uplink_addr, a_client_pvt->uplink_port); //close(a_client_pvt->stream_socket); - dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es); + //dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es); //a_client_pvt->stream_socket = 0; a_client_pvt->stage_status = STAGE_STATUS_ERROR; + dap_events_socket_delete_unsafe(a_client_pvt->stream_es, true); + a_client_pvt->stream_es = NULL; } else { int l_err = 0; @@ -537,14 +537,18 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) log_it(L_INFO, "Remote address connected (%s:%u) with sock_id %d (assign on worker #%u)", a_client_pvt->uplink_addr, a_client_pvt->uplink_port, a_client_pvt->stream_socket, l_worker->id); a_client_pvt->stage_status = STAGE_STATUS_DONE; + // add to dap_worker + dap_events_socket_assign_on_worker_mt(a_client_pvt->stream_es, l_worker); } else { log_it(L_ERROR, "Remote address can't connected (%s:%u) with sock_id %d", a_client_pvt->uplink_addr, a_client_pvt->uplink_port); - dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es); + //dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es); //close(a_client_pvt->stream_socket); a_client_pvt->stream_socket = 0; a_client_pvt->stage_status = STAGE_STATUS_ERROR; + dap_events_socket_delete_unsafe(a_client_pvt->stream_es, true); + a_client_pvt->stream_es = NULL; } } s_stage_status_after(a_client_pvt); diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 0676e1b9fbe8bb2d1812108b32a0709c5383a2c5..d91f330711a7ff31159b9c521dc5514cfffdeb82 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -785,42 +785,54 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool return; //log_it( L_DEBUG, "es is going to be removed from the lists and free the memory (0x%016X)", a_es ); - dap_events_socket_remove_from_worker_unsafe(a_es, a_es->worker); + if (a_es->worker) + dap_events_socket_remove_from_worker_unsafe(a_es, a_es->worker); - if (a_es->events){ // It could be socket NOT from events - pthread_rwlock_wrlock( &a_es->events->sockets_rwlock ); - if(!dap_events_socket_find_unsafe(a_es->socket, a_es->events)){ - log_it( L_ERROR, "dap_events_socket 0x%x already deleted", a_es); - pthread_rwlock_unlock( &a_es->events->sockets_rwlock ); - return ; - } - - if(a_es->events->sockets) - HASH_DEL( a_es->events->sockets, a_es ); - pthread_rwlock_unlock( &a_es->events->sockets_rwlock ); - } //log_it( L_DEBUG, "dap_events_socket wrapped around %d socket is removed", a_es->socket ); if( a_es->callbacks.delete_callback ) a_es->callbacks.delete_callback( a_es, NULL ); // Init internal structure - if ( a_es->_inheritor && !preserve_inheritor ) - DAP_DELETE( a_es->_inheritor ); + dap_events_socket_delete_unsafe(a_es, preserve_inheritor); +} - if ( a_es->socket && a_es->socket != -1) { -#ifdef _WIN32 - closesocket( a_es->socket ); -#else - close( a_es->socket ); -#ifdef DAP_EVENTS_CAPS_QUEUE_PIPE2 - if( a_es->type == DESCRIPTOR_TYPE_QUEUE){ - close( a_es->fd2); +/** + * @brief dap_events_socket_delete_unsafe + * @param a_esocket + * @param a_preserve_inheritor + */ +void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_preserve_inheritor) +{ + if (a_esocket->events){ // It could be socket NOT from events + pthread_rwlock_wrlock( &a_esocket->events->sockets_rwlock ); + if(!dap_events_socket_find_unsafe(a_esocket->socket, a_esocket->events)){ + log_it( L_ERROR, "dap_events_socket 0x%x already deleted", a_esocket); + pthread_rwlock_unlock( &a_esocket->events->sockets_rwlock ); + return ; } -#endif -#endif + if(a_esocket->events->sockets) + HASH_DEL( a_esocket->events->sockets, a_esocket ); + pthread_rwlock_unlock( &a_esocket->events->sockets_rwlock ); + } + + if ( a_esocket->_inheritor && !a_preserve_inheritor ) + DAP_DELETE( a_esocket->_inheritor ); + + if ( a_esocket->socket && a_esocket->socket != -1) { + #ifdef _WIN32 + closesocket( a_esocket->socket ); + #else + close( a_esocket->socket ); + #ifdef DAP_EVENTS_CAPS_QUEUE_PIPE2 + if( a_esocket->type == DESCRIPTOR_TYPE_QUEUE){ + close( a_esocket->fd2); + } + #endif + + #endif } - DAP_DELETE( a_es ); + DAP_DELETE( a_esocket ); } /** diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index ceb4bc717ec6b9f350484dbdd3fdf0a49fb59fc5..fff81c6cb718a9ff9b15180798e2a50f1a3e3bcc 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -209,6 +209,8 @@ dap_events_socket_t * dap_events_socket_create_type_pipe_mt(dap_worker_t * a_w, int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg); int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value); +void dap_events_socket_delete_unsafe( dap_events_socket_t * a_esocket , bool a_preserve_inheritor); + dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, int a_sock, dap_events_socket_callbacks_t *a_callbacks ); dap_events_socket_t * dap_events_socket_wrap2( dap_server_t *a_server, struct dap_events *a_events, diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index 9904f8dac112dc590379281f94be244dfe0f089d..e57dc4e405009b108375f5d6491044d5f37750bd 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -338,7 +338,6 @@ dap_stream_t* dap_stream_new_es_client(dap_events_socket_t * a_esocket) ret->esocket = a_esocket; ret->buf_defrag_size=0; ret->is_client_to_uplink = true; - log_it(L_NOTICE,"New stream with events socket instance for %s",a_esocket->hostaddr); return ret; } diff --git a/dap-sdk/net/stream/stream/dap_stream_ctl.c b/dap-sdk/net/stream/stream/dap_stream_ctl.c index 52bf55be3d7ea1de459d6ebd43193bb756723631..515a05b2ba96697b63aed520e69614148370b928 100644 --- a/dap-sdk/net/stream/stream/dap_stream_ctl.c +++ b/dap-sdk/net/stream/stream/dap_stream_ctl.c @@ -129,17 +129,13 @@ void s_proc(struct dap_http_simple *a_http_simple, void * a_arg) char * l_subtok_name = strtok_r(l_tok, "=",&l_subtok_tmp); char * l_subtok_value = strtok_r(NULL, "=",&l_subtok_tmp); if (l_subtok_value){ - log_it(L_DEBUG, "tok = %s value =%s",l_subtok_name,l_subtok_value); if ( strcmp(l_subtok_name,"channels")==0 ){ strncpy(l_channels_str,l_subtok_value,sizeof (l_channels_str)-1); - log_it(L_DEBUG,"Param: channels=%s",l_channels_str); }else if(strcmp(l_subtok_name,"enc_type")==0){ l_enc_type = atoi(l_subtok_value); - log_it(L_DEBUG,"Param: enc_type=%s",dap_enc_get_type_name(l_enc_type)); l_is_legacy = false; }else if(strcmp(l_subtok_name,"enc_headers")==0){ l_enc_headers = atoi(l_subtok_value); - log_it(L_DEBUG,"Param: enc_headers=%d",l_enc_headers); } } l_tok = strtok_r(NULL, ",",&l_tok_tmp) ; @@ -149,9 +145,10 @@ void s_proc(struct dap_http_simple *a_http_simple, void * a_arg) log_it(L_INFO, "legacy encryption mode used (OAES)"); l_enc_type = DAP_ENC_KEY_TYPE_OAES; l_new_session = true; - } - if(l_new_session){ + }else + log_it(L_DEBUG,"Encryption type %s (enc headers %d)",dap_enc_get_type_name(l_enc_type), l_enc_headers); + if(l_new_session){ ss = dap_stream_session_pure_new(); strncpy(ss->active_channels, l_channels_str, l_channels_str_size); char *key_str = calloc(1, KEX_KEY_STR_SIZE+1);