Skip to content
Snippets Groups Projects
Commit bdd91529 authored by dmitriy.gerasimov's avatar dmitriy.gerasimov
Browse files

Merge branch 'support-4390' into 'testing'

support-4390

See merge request !172
parents 72e27bbc 674d06fc
No related branches found
No related tags found
4 merge requests!251Master,!250Master,!187Release 2.5 version,!172support-4390
Pipeline #4736 passed with stage
in 10 seconds
...@@ -107,7 +107,6 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, ...@@ -107,7 +107,6 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events,
void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct dap_worker * a_worker) void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct dap_worker * a_worker)
{ {
a_es->last_ping_request = time(NULL); a_es->last_ping_request = time(NULL);
a_es->worker = a_worker;
dap_worker_add_events_socket(a_es,a_worker); dap_worker_add_events_socket(a_es,a_worker);
} }
...@@ -556,7 +555,8 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *sc, bool is_rea ...@@ -556,7 +555,8 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *sc, bool is_rea
int l_errno = errno; int l_errno = errno;
char l_errbuf[128]; char l_errbuf[128];
strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
log_it(L_ERROR,"Can't update write client socket state in the epoll_fd: \"%s\" (%d)", l_errbuf, l_errno); log_it(L_ERROR,"Can't update write client socket state in the epoll_fd %d: \"%s\" (%d)",
sc->worker->epoll_fd, l_errbuf, l_errno);
} }
} }
...@@ -614,13 +614,23 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool ...@@ -614,13 +614,23 @@ void dap_events_socket_remove_and_delete_unsafe( dap_events_socket_t *a_es, bool
*/ */
void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap_worker_t * a_worker) void dap_events_socket_remove_from_worker_unsafe( dap_events_socket_t *a_es, dap_worker_t * a_worker)
{ {
if ( epoll_ctl( a_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 ) if (!a_es->worker) {
log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd" ); // Socket already removed from worker
return;
}
if ( epoll_ctl( a_worker->epoll_fd, EPOLL_CTL_DEL, a_es->socket, &a_es->ev) == -1 ) {
int l_errno = errno;
char l_errbuf[128];
strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
log_it( L_ERROR,"Can't remove event socket's handler from the epoll_fd %d \"%s\" (%d)",
a_worker->epoll_fd, l_errbuf, l_errno);
}
else else
log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_worker->id ); log_it( L_DEBUG,"Removed epoll's event from dap_worker #%u", a_worker->id );
a_worker->event_sockets_count--; a_worker->event_sockets_count--;
if(a_worker->esockets) if(a_worker->esockets)
HASH_DELETE(hh_worker,a_worker->esockets, a_es); HASH_DELETE(hh_worker,a_worker->esockets, a_es);
a_es->worker = NULL;
} }
/** /**
......
...@@ -137,7 +137,8 @@ void *dap_worker_thread(void *arg) ...@@ -137,7 +137,8 @@ void *dap_worker_thread(void *arg)
l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
log_it(L_DEBUG, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err)); log_it(L_DEBUG, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err));
} }
default: log_it(L_WARNING, "Unimplemented EPOLLHUP for socket type %d", l_cur->type); break;
default: log_it(L_WARNING, "Unimplemented EPOLLHUP for socket type %d", l_cur->type);
} }
} }
...@@ -346,12 +347,12 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) ...@@ -346,12 +347,12 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg)
dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg; dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg;
dap_worker_t * w = a_es->worker; dap_worker_t * w = a_es->worker;
//log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new); //log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new);
l_es_new->worker = w;
if ( l_es_new->type == DESCRIPTOR_TYPE_SOCKET || l_es_new->type == DESCRIPTOR_TYPE_SOCKET_LISTENING ){ if ( l_es_new->type == DESCRIPTOR_TYPE_SOCKET || l_es_new->type == DESCRIPTOR_TYPE_SOCKET_LISTENING ){
int l_cpu = w->id; int l_cpu = w->id;
setsockopt(l_es_new->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu)); setsockopt(l_es_new->socket , SOL_SOCKET, SO_INCOMING_CPU, &l_cpu, sizeof(l_cpu));
} }
bool l_socket_present = (l_es_new->worker && l_es_new->is_initalized) ? true : false;
l_es_new->worker = w;
// We need to differ new and reassigned esockets. If its new - is_initialized is false // We need to differ new and reassigned esockets. If its new - is_initialized is false
if ( ! l_es_new->is_initalized ){ if ( ! l_es_new->is_initalized ){
if (l_es_new->callbacks.new_callback) if (l_es_new->callbacks.new_callback)
...@@ -369,6 +370,10 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) ...@@ -369,6 +370,10 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg)
if(l_es_new->flags & DAP_SOCK_READY_TO_WRITE ) if(l_es_new->flags & DAP_SOCK_READY_TO_WRITE )
l_es_new->ev.events |= EPOLLOUT; l_es_new->ev.events |= EPOLLOUT;
l_es_new->ev.data.ptr = l_es_new; l_es_new->ev.data.ptr = l_es_new;
if (l_socket_present) {
// Update only flags, socket already present in worker
return;
}
l_ret = epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, l_es_new->socket, &l_es_new->ev); l_ret = epoll_ctl(w->epoll_fd, EPOLL_CTL_ADD, l_es_new->socket, &l_es_new->ev);
#else #else
#error "Unimplemented new esocket on worker callback for current platform" #error "Unimplemented new esocket on worker callback for current platform"
...@@ -383,7 +388,7 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) ...@@ -383,7 +388,7 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg)
// Add in worker // Add in worker
l_es_new->me = l_es_new; l_es_new->me = l_es_new;
HASH_ADD(hh_worker, w->esockets, me, sizeof(void *), l_es_new ); HASH_ADD(hh_worker, w->esockets, me, sizeof(void *), l_es_new );
w->event_sockets_count++;
log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->id); log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->id);
if (l_es_new->callbacks.worker_assign_callback) if (l_es_new->callbacks.worker_assign_callback)
l_es_new->callbacks.worker_assign_callback(l_es_new, w); l_es_new->callbacks.worker_assign_callback(l_es_new, w);
...@@ -546,8 +551,7 @@ dap_worker_t *dap_worker_add_events_socket_auto( dap_events_socket_t *a_es) ...@@ -546,8 +551,7 @@ dap_worker_t *dap_worker_add_events_socket_auto( dap_events_socket_t *a_es)
// struct epoll_event ev = {0}; // struct epoll_event ev = {0};
dap_worker_t *l_worker = dap_events_worker_get_auto( ); dap_worker_t *l_worker = dap_events_worker_get_auto( );
a_es->worker = l_worker; a_es->events = l_worker->events;
a_es->events = a_es->worker->events;
dap_worker_add_events_socket( a_es, l_worker); dap_worker_add_events_socket( a_es, l_worker);
return l_worker; return l_worker;
} }
......
...@@ -145,7 +145,7 @@ struct dap_stream_ch_table_t *dap_stream_ch_valid(dap_stream_ch_t *a_ch) ...@@ -145,7 +145,7 @@ struct dap_stream_ch_table_t *dap_stream_ch_valid(dap_stream_ch_t *a_ch)
*/ */
void dap_stream_ch_delete(dap_stream_ch_t *a_ch) void dap_stream_ch_delete(dap_stream_ch_t *a_ch)
{ {
dap_stream_worker_t * l_stream_worker = DAP_STREAM_WORKER( a_ch->stream->esocket->worker ); dap_stream_worker_t * l_stream_worker = a_ch->stream_worker;
HASH_DELETE(hh_worker,l_stream_worker->channels, a_ch); HASH_DELETE(hh_worker,l_stream_worker->channels, a_ch);
...@@ -189,7 +189,7 @@ void dap_stream_ch_set_ready_to_read_unsafe(dap_stream_ch_t * a_ch,bool a_is_rea ...@@ -189,7 +189,7 @@ void dap_stream_ch_set_ready_to_read_unsafe(dap_stream_ch_t * a_ch,bool a_is_rea
if( a_ch->ready_to_read != a_is_ready){ if( a_ch->ready_to_read != a_is_ready){
//log_it(L_DEBUG,"Change channel '%c' to %s", (char) ch->proc->id, is_ready?"true":"false"); //log_it(L_DEBUG,"Change channel '%c' to %s", (char) ch->proc->id, is_ready?"true":"false");
a_ch->ready_to_read=a_is_ready; a_ch->ready_to_read=a_is_ready;
dap_events_socket_set_readable_unsafe( a_ch->stream->esocket,a_is_ready); dap_events_socket_set_readable_unsafe(a_ch->stream->esocket, a_is_ready);
} }
} }
...@@ -205,7 +205,7 @@ void dap_stream_ch_set_ready_to_write_unsafe(dap_stream_ch_t * ch,bool is_ready) ...@@ -205,7 +205,7 @@ void dap_stream_ch_set_ready_to_write_unsafe(dap_stream_ch_t * ch,bool is_ready)
ch->ready_to_write=is_ready; ch->ready_to_write=is_ready;
if(is_ready && ch->stream->conn_http) if(is_ready && ch->stream->conn_http)
ch->stream->conn_http->state_write=DAP_HTTP_CLIENT_STATE_DATA; ch->stream->conn_http->state_write=DAP_HTTP_CLIENT_STATE_DATA;
dap_events_socket_set_writable_unsafe(ch->stream->esocket,is_ready); dap_events_socket_set_writable_unsafe(ch->stream->esocket, is_ready);
} }
} }
......
...@@ -39,6 +39,10 @@ int dap_stream_worker_init() ...@@ -39,6 +39,10 @@ int dap_stream_worker_init()
uint32_t l_worker_count = dap_events_worker_get_count(); uint32_t l_worker_count = dap_events_worker_get_count();
for (uint32_t i = 0; i < l_worker_count; i++){ for (uint32_t i = 0; i < l_worker_count; i++){
dap_worker_t * l_worker = dap_events_worker_get(i); dap_worker_t * l_worker = dap_events_worker_get(i);
if (!l_worker) {
log_it(L_CRITICAL,"Can't init stream worker, woreker thread don't exist");
return -2;
}
if (l_worker->_inheritor){ if (l_worker->_inheritor){
log_it(L_CRITICAL,"Can't init stream worker, core worker has already inheritor"); log_it(L_CRITICAL,"Can't init stream worker, core worker has already inheritor");
return -1; return -1;
......
This diff is collapsed.
...@@ -55,6 +55,8 @@ typedef struct dap_stream_ch_chain { ...@@ -55,6 +55,8 @@ typedef struct dap_stream_ch_chain {
dap_chain_atom_iter_t * request_atom_iter; dap_chain_atom_iter_t * request_atom_iter;
dap_chain_atom_item_t * request_atoms_lasts; dap_chain_atom_item_t * request_atoms_lasts;
dap_chain_atom_item_t * request_atoms_processed; dap_chain_atom_item_t * request_atoms_processed;
uint8_t *pkt_data;
uint64_t pkt_data_size;
uint64_t stats_request_atoms_processed; uint64_t stats_request_atoms_processed;
uint64_t stats_request_gdb_processed; uint64_t stats_request_gdb_processed;
dap_stream_ch_chain_sync_request_t request; dap_stream_ch_chain_sync_request_t request;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment