Skip to content
Snippets Groups Projects
Commit 1c809d21 authored by ruslan.laishev's avatar ruslan.laishev 💬
Browse files

Bugs #5434 - eliminate double allocation of memory

parent e2c23996
No related branches found
No related tags found
2 merge requests!512bugfix-5760,!507Bugs #5434 - eliminate double allocation of memory
Pipeline #12201 passed with stage
in 6 seconds
...@@ -106,8 +106,8 @@ static dap_events_t * s_events_default = NULL; ...@@ -106,8 +106,8 @@ static dap_events_t * s_events_default = NULL;
/** /**
* @brief dap_get_cpu_count * @brief dap_get_cpu_count
* *
* @return uint32_t * @return uint32_t
*/ */
uint32_t dap_get_cpu_count( ) uint32_t dap_get_cpu_count( )
{ {
...@@ -123,7 +123,7 @@ uint32_t dap_get_cpu_count( ) ...@@ -123,7 +123,7 @@ uint32_t dap_get_cpu_count( )
CPU_ZERO( &cs ); CPU_ZERO( &cs );
#endif #endif
#if defined (DAP_OS_ANDROID) #if defined (DAP_OS_ANDROID)
sched_getaffinity( 0, sizeof(cs), &cs ); sched_getaffinity( 0, sizeof(cs), &cs );
#elif defined (DAP_OS_DARWIN) #elif defined (DAP_OS_DARWIN)
int count=0; int count=0;
...@@ -151,8 +151,8 @@ uint32_t dap_get_cpu_count( ) ...@@ -151,8 +151,8 @@ uint32_t dap_get_cpu_count( )
/** /**
* @brief dap_cpu_assign_thread_on * @brief dap_cpu_assign_thread_on
* *
* @param a_cpu_id * @param a_cpu_id
*/ */
void dap_cpu_assign_thread_on(uint32_t a_cpu_id) void dap_cpu_assign_thread_on(uint32_t a_cpu_id)
{ {
...@@ -236,11 +236,11 @@ err: ...@@ -236,11 +236,11 @@ err:
*/ */
void dap_events_deinit( ) void dap_events_deinit( )
{ {
dap_proc_thread_deinit(); dap_proc_thread_deinit();
dap_events_socket_deinit(); dap_events_socket_deinit();
dap_worker_deinit(); dap_worker_deinit();
dap_events_wait(s_events_default); dap_events_wait(s_events_default);
if ( s_threads ) if ( s_threads )
DAP_DELETE( s_threads ); DAP_DELETE( s_threads );
...@@ -267,7 +267,7 @@ dap_events_t * dap_events_new( ) ...@@ -267,7 +267,7 @@ dap_events_t * dap_events_new( )
/** /**
* @brief dap_events_get_default * @brief dap_events_get_default
* simply return s_events_default * simply return s_events_default
* @return dap_events_t* * @return dap_events_t*
*/ */
dap_events_t* dap_events_get_default( ) dap_events_t* dap_events_get_default( )
{ {
...@@ -291,9 +291,9 @@ void dap_events_delete( dap_events_t *a_events ) ...@@ -291,9 +291,9 @@ void dap_events_delete( dap_events_t *a_events )
/** /**
* @brief dap_events_remove_and_delete_socket_unsafe * @brief dap_events_remove_and_delete_socket_unsafe
* calls dap_events_socket_remove_and_delete_unsafe * calls dap_events_socket_remove_and_delete_unsafe
* @param a_events * @param a_events
* @param a_socket * @param a_socket
* @param a_preserve_inheritor * @param a_preserve_inheritor
*/ */
void dap_events_remove_and_delete_socket_unsafe(dap_events_t *a_events, dap_events_socket_t *a_socket, bool a_preserve_inheritor) void dap_events_remove_and_delete_socket_unsafe(dap_events_t *a_events, dap_events_socket_t *a_socket, bool a_preserve_inheritor)
{ {
...@@ -349,10 +349,12 @@ int dap_events_start( dap_events_t *a_events ) ...@@ -349,10 +349,12 @@ int dap_events_start( dap_events_t *a_events )
struct timespec l_timeout; struct timespec l_timeout;
clock_gettime(CLOCK_REALTIME, &l_timeout); clock_gettime(CLOCK_REALTIME, &l_timeout);
l_timeout.tv_sec+=15; l_timeout.tv_sec+=15;
pthread_create( &s_threads[i].tid, NULL, dap_worker_thread, l_worker ); pthread_create( &s_threads[i].tid, NULL, dap_worker_thread, l_worker );
int l_ret; int l_ret;
l_ret=pthread_cond_timedwait(&l_worker->started_cond, &l_worker->started_mutex, &l_timeout); l_ret=pthread_cond_timedwait(&l_worker->started_cond, &l_worker->started_mutex, &l_timeout);
pthread_mutex_unlock(&l_worker->started_mutex); pthread_mutex_unlock(&l_worker->started_mutex);
if ( l_ret== ETIMEDOUT ){ if ( l_ret== ETIMEDOUT ){
log_it(L_CRITICAL, "Timeout 15 seconds is out: worker #%u thread don't respond", i); log_it(L_CRITICAL, "Timeout 15 seconds is out: worker #%u thread don't respond", i);
return -2; return -2;
...@@ -361,13 +363,17 @@ int dap_events_start( dap_events_t *a_events ) ...@@ -361,13 +363,17 @@ int dap_events_start( dap_events_t *a_events )
return -3; return -3;
} }
} }
#if 0 // @RRL: Bugfix-5434
// Link queues between // Link queues between
for( uint32_t i = 0; i < s_threads_count; i++) { for( uint32_t i = 0; i < s_threads_count; i++) {
dap_worker_t * l_worker = s_workers[i]; dap_worker_t * l_worker = s_workers[i];
l_worker->queue_es_new_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* s_threads_count); l_worker->queue_es_new_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* s_threads_count);
l_worker->queue_es_delete_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* s_threads_count); l_worker->queue_es_delete_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* s_threads_count);
l_worker->queue_es_reassign_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* s_threads_count); l_worker->queue_es_reassign_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* s_threads_count);
l_worker->queue_es_io_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* s_threads_count); l_worker->queue_es_io_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* s_threads_count);
for( uint32_t n = 0; n < s_threads_count; n++) { for( uint32_t n = 0; n < s_threads_count; n++) {
l_worker->queue_es_new_input[n] = dap_events_socket_queue_ptr_create_input(s_workers[n]->queue_es_new); l_worker->queue_es_new_input[n] = dap_events_socket_queue_ptr_create_input(s_workers[n]->queue_es_new);
l_worker->queue_es_delete_input[n] = dap_events_socket_queue_ptr_create_input(s_workers[n]->queue_es_delete); l_worker->queue_es_delete_input[n] = dap_events_socket_queue_ptr_create_input(s_workers[n]->queue_es_delete);
...@@ -375,6 +381,7 @@ int dap_events_start( dap_events_t *a_events ) ...@@ -375,6 +381,7 @@ int dap_events_start( dap_events_t *a_events )
l_worker->queue_es_io_input[n] = dap_events_socket_queue_ptr_create_input(s_workers[n]->queue_es_io); l_worker->queue_es_io_input[n] = dap_events_socket_queue_ptr_create_input(s_workers[n]->queue_es_io);
} }
} }
#endif
// Init callback processor // Init callback processor
if (dap_proc_thread_init(s_threads_count) != 0 ){ if (dap_proc_thread_init(s_threads_count) != 0 ){
...@@ -452,10 +459,7 @@ dap_worker_t *dap_events_worker_get_auto( ) ...@@ -452,10 +459,7 @@ dap_worker_t *dap_events_worker_get_auto( )
*/ */
dap_worker_t * dap_events_worker_get(uint8_t a_index) dap_worker_t * dap_events_worker_get(uint8_t a_index)
{ {
if (a_index < s_threads_count){ return (a_index < s_threads_count) ? s_workers[a_index] : NULL;
return s_workers[a_index];
}else
return NULL;
} }
/** /**
...@@ -463,8 +467,8 @@ dap_worker_t * dap_events_worker_get(uint8_t a_index) ...@@ -463,8 +467,8 @@ dap_worker_t * dap_events_worker_get(uint8_t a_index)
*/ */
void dap_events_worker_print_all( ) void dap_events_worker_print_all( )
{ {
uint32_t i;
for( i = 0; i < s_threads_count; i ++ ) { for( int i = 0; i < s_threads_count; i ++ ) {
log_it( L_INFO, "Worker: %d, count open connections: %d", log_it( L_INFO, "Worker: %d, count open connections: %d",
s_workers[i]->id, s_workers[i]->event_sockets_count ); s_workers[i]->id, s_workers[i]->event_sockets_count );
} }
......
...@@ -119,7 +119,7 @@ void *dap_worker_thread(void *arg) ...@@ -119,7 +119,7 @@ void *dap_worker_thread(void *arg)
struct sched_param l_shed_params; struct sched_param l_shed_params;
l_shed_params.sched_priority = 0; l_shed_params.sched_priority = 0;
#ifdef DAP_OS_WINDOWS #ifdef DAP_OS_WINDOWS
if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL)) if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL))
log_it(L_ERROR, "Couldn'r set thread priority, err: %lu", GetLastError()); log_it(L_ERROR, "Couldn'r set thread priority, err: %lu", GetLastError());
#else #else
pthread_setschedparam(pthread_self(),SCHED_FIFO ,&l_shed_params); pthread_setschedparam(pthread_self(),SCHED_FIFO ,&l_shed_params);
...@@ -148,6 +148,7 @@ void *dap_worker_thread(void *arg) ...@@ -148,6 +148,7 @@ void *dap_worker_thread(void *arg)
#else #else
#error "Unimplemented socket array for this platform" #error "Unimplemented socket array for this platform"
#endif #endif
l_worker->queue_es_new_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); l_worker->queue_es_new_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() );
l_worker->queue_es_delete_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); l_worker->queue_es_delete_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() );
l_worker->queue_es_io_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() ); l_worker->queue_es_io_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)* dap_events_worker_get_count() );
...@@ -159,9 +160,18 @@ void *dap_worker_thread(void *arg) ...@@ -159,9 +160,18 @@ void *dap_worker_thread(void *arg)
l_worker->queue_es_io = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_es_io_callback); l_worker->queue_es_io = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_es_io_callback);
l_worker->queue_es_reassign = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_es_reassign_callback ); l_worker->queue_es_reassign = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_es_reassign_callback );
for( int n = 0; n < dap_events_worker_get_count(); n++) {
l_worker->queue_es_new_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_new);
l_worker->queue_es_delete_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_delete);
l_worker->queue_es_io_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_io);
l_worker->queue_es_reassign_input[n] = dap_events_socket_queue_ptr_create_input(l_worker->queue_es_reassign);
}
l_worker->queue_callback = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_callback_callback); l_worker->queue_callback = dap_events_socket_create_type_queue_ptr_unsafe(l_worker, s_queue_callback_callback);
l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback); l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback);
l_worker->timer_check_activity = dap_timerfd_create(s_connection_timeout * 1000 / 2, l_worker->timer_check_activity = dap_timerfd_create(s_connection_timeout * 1000 / 2,
s_socket_all_check_activity, l_worker); s_socket_all_check_activity, l_worker);
dap_worker_add_events_socket_unsafe( l_worker->timer_check_activity->events_socket, l_worker); dap_worker_add_events_socket_unsafe( l_worker->timer_check_activity->events_socket, l_worker);
...@@ -170,8 +180,8 @@ void *dap_worker_thread(void *arg) ...@@ -170,8 +180,8 @@ void *dap_worker_thread(void *arg)
pthread_mutex_unlock(&l_worker->started_mutex); pthread_mutex_unlock(&l_worker->started_mutex);
bool s_loop_is_active = true; bool s_loop_is_active = true;
while(s_loop_is_active) { while(s_loop_is_active) {
int l_selected_sockets; int l_selected_sockets;
size_t l_sockets_max; size_t l_sockets_max;
#ifdef DAP_EVENTS_CAPS_EPOLL #ifdef DAP_EVENTS_CAPS_EPOLL
l_selected_sockets = epoll_wait(l_worker->epoll_fd, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1); l_selected_sockets = epoll_wait(l_worker->epoll_fd, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1);
l_sockets_max = l_selected_sockets; l_sockets_max = l_selected_sockets;
...@@ -657,9 +667,9 @@ void *dap_worker_thread(void *arg) ...@@ -657,9 +667,9 @@ void *dap_worker_thread(void *arg)
l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL); l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL);
#ifdef DAP_OS_WINDOWS #ifdef DAP_OS_WINDOWS
//dap_events_socket_set_writable_unsafe(l_cur,false); // enabling this will break windows server replies //dap_events_socket_set_writable_unsafe(l_cur,false); // enabling this will break windows server replies
l_errno = WSAGetLastError(); l_errno = WSAGetLastError();
#else #else
l_errno = errno; l_errno = errno;
#endif #endif
} }
break; break;
...@@ -729,7 +739,7 @@ void *dap_worker_thread(void *arg) ...@@ -729,7 +739,7 @@ void *dap_worker_thread(void *arg)
l_errno = errno; l_errno = errno;
if (l_bytes_sent == -1 && l_errno == EINVAL) // To make compatible with other if (l_bytes_sent == -1 && l_errno == EINVAL) // To make compatible with other
l_errno = EAGAIN; // non-blocking sockets l_errno = EAGAIN; // non-blocking sockets
#elif defined (DAP_EVENTS_CAPS_KQUEUE) #elif defined (DAP_EVENTS_CAPS_KQUEUE)
struct kevent* l_event=&l_cur->kqueue_event; struct kevent* l_event=&l_cur->kqueue_event;
dap_events_socket_w_data_t * l_es_w_data = DAP_NEW_Z(dap_events_socket_w_data_t); dap_events_socket_w_data_t * l_es_w_data = DAP_NEW_Z(dap_events_socket_w_data_t);
l_es_w_data->esocket = l_cur; l_es_w_data->esocket = l_cur;
...@@ -743,7 +753,7 @@ void *dap_worker_thread(void *arg) ...@@ -743,7 +753,7 @@ void *dap_worker_thread(void *arg)
log_it(L_WARNING,"queue ptr send error: kevent %p errno: %d", l_es_w_data, l_errno); log_it(L_WARNING,"queue ptr send error: kevent %p errno: %d", l_es_w_data, l_errno);
DAP_DELETE(l_es_w_data); DAP_DELETE(l_es_w_data);
} }
#else #else
#error "Not implemented dap_events_socket_queue_ptr_send() for this platform" #error "Not implemented dap_events_socket_queue_ptr_send() for this platform"
#endif #endif
...@@ -1212,8 +1222,8 @@ int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_wo ...@@ -1212,8 +1222,8 @@ int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_wo
} }
struct kevent l_event; struct kevent l_event;
u_short l_flags = a_esocket->kqueue_base_flags; u_short l_flags = a_esocket->kqueue_base_flags;
u_int l_fflags = a_esocket->kqueue_base_fflags; u_int l_fflags = a_esocket->kqueue_base_fflags;
short l_filter = a_esocket->kqueue_base_filter; short l_filter = a_esocket->kqueue_base_filter;
int l_kqueue_fd =a_worker->kqueue_fd; int l_kqueue_fd =a_worker->kqueue_fd;
if ( l_kqueue_fd == -1 ){ if ( l_kqueue_fd == -1 ){
log_it(L_ERROR, "Esocket is not assigned with anything ,exit"); log_it(L_ERROR, "Esocket is not assigned with anything ,exit");
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment