From 1853f811d3dad1cc5c66c26ffc45fdc30f907071 Mon Sep 17 00:00:00 2001 From: "Dmitriy A. Gerasimov" <dmitriy.gerasimov@demlabs.net> Date: Thu, 12 Nov 2020 21:23:54 +0700 Subject: [PATCH] [!] Fixed behaviour of dap_http_simple: now esocket stays in reactor, changes only write flag --- dap-sdk/net/core/dap_proc_thread.c | 52 +++++++++++++++++++ dap-sdk/net/core/include/dap_proc_thread.h | 4 ++ .../net/server/http_server/dap_http_simple.c | 27 ++++------ modules/channel/chain/dap_stream_ch_chain.c | 8 ++- 4 files changed, 74 insertions(+), 17 deletions(-) diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 0f3b5bfe0d..e80d1d552b 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -495,4 +495,56 @@ bool dap_proc_thread_assign_on_worker_inter(dap_proc_thread_t * a_thread, dap_wo return true; } +/** + * @brief dap_proc_thread_esocket_write_inter + * @param a_thread + * @param a_worker + * @param a_esocket + * @param a_data + * @param a_data_size + * @return + */ +int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_esocket, + const void * a_data, size_t a_data_size) +{ + dap_events_socket_t * l_es_io_input = a_thread->queue_io_input[a_worker->id]; + dap_events_socket_write_inter(l_es_io_input,a_esocket, a_data, a_data_size); + l_es_io_input->flags |= DAP_SOCK_READY_TO_WRITE; + s_update_poll_flags(a_thread, l_es_io_input); + return 0; +} + + +/** + * @brief dap_proc_thread_esocket_write_f_inter + * @param a_thread + * @param a_worker + * @param a_esocket + * @param a_format + * @return + */ +int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_esocket, + const char * a_format,...) +{ + va_list ap, ap_copy; + va_start(ap,a_format); + va_copy(ap_copy, ap); + int l_data_size = dap_vsnprintf(NULL,0,a_format,ap); + va_end(ap); + if (l_data_size <0 ){ + log_it(L_ERROR,"Can't write out formatted data '%s' with values",a_format); + va_end(ap_copy); + return 0; + } + + dap_events_socket_t * l_es_io_input = a_thread->queue_io_input[a_worker->id]; + char * l_data = DAP_NEW_SIZE(char,l_data_size+1); + l_data_size = dap_vsprintf(l_data,a_format,ap_copy); + va_end(ap_copy); + + dap_events_socket_write_inter(l_es_io_input,a_esocket, l_data, l_data_size); + l_es_io_input->flags |= DAP_SOCK_READY_TO_WRITE; + s_update_poll_flags(a_thread, l_es_io_input); + return 0; +} diff --git a/dap-sdk/net/core/include/dap_proc_thread.h b/dap-sdk/net/core/include/dap_proc_thread.h index b269e0d1e0..02374e8e25 100644 --- a/dap-sdk/net/core/include/dap_proc_thread.h +++ b/dap-sdk/net/core/include/dap_proc_thread.h @@ -61,3 +61,7 @@ int dap_proc_thread_init(uint32_t a_threads_count); dap_proc_thread_t * dap_proc_thread_get(uint32_t a_thread_number); dap_proc_thread_t * dap_proc_thread_get_auto(); bool dap_proc_thread_assign_on_worker_inter(dap_proc_thread_t * a_thread, dap_worker_t * a_worker, dap_events_socket_t *a_esocket ); +int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_esocket, + const void * a_data, size_t a_data_size); +int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_esocket, + const char * a_format,...); diff --git a/dap-sdk/net/server/http_server/dap_http_simple.c b/dap-sdk/net/server/http_server/dap_http_simple.c index 60abea7f92..b1c0266cf4 100644 --- a/dap-sdk/net/server/http_server/dap_http_simple.c +++ b/dap-sdk/net/server/http_server/dap_http_simple.c @@ -225,28 +225,26 @@ inline static bool _is_supported_user_agents_list_setted() return cnt; } -inline static void _set_only_write_http_client_state(dap_http_client_t* http_client) +inline static void _set_only_write_http_client_state(dap_http_simple_t * a_simple) { // log_it(L_DEBUG,"_set_only_write_http_client_state"); - - http_client->state_write=DAP_HTTP_CLIENT_STATE_START; - dap_events_socket_set_writable_unsafe(http_client->esocket,true); - dap_events_socket_set_readable_unsafe(http_client->esocket, false); + a_simple->http_client->state_write=DAP_HTTP_CLIENT_STATE_START; + dap_events_socket_set_writable_mt(a_simple->worker, a_simple->http_client->esocket,true); } -static void _copy_reply_and_mime_to_response( dap_http_simple_t *cl_sh ) +static void _copy_reply_and_mime_to_response( dap_http_simple_t *a_simple ) { // log_it(L_DEBUG,"_copy_reply_and_mime_to_response"); // Sleep(300); - if( !cl_sh->reply_size ) { + if( !a_simple->reply_size ) { log_it( L_WARNING, " cl_sh->reply_size equal 0" ); return; } - cl_sh->http_client->out_content_length = cl_sh->reply_size; - strcpy( cl_sh->http_client->out_content_type, cl_sh->reply_mime ); + a_simple->http_client->out_content_length = a_simple->reply_size; + strcpy( a_simple->http_client->out_content_type, a_simple->reply_mime ); return; } @@ -291,8 +289,7 @@ bool s_proc_queue_callback(dap_proc_thread_t * a_thread, void * a_arg ) if(header == NULL && is_unknown_user_agents_pass == false) { const char error_msg[] = "Not found User-Agent HTTP header"; _write_response_bad_request(l_http_simple, error_msg); - _set_only_write_http_client_state( l_http_simple->http_client); - dap_proc_thread_assign_on_worker_inter(a_thread, l_http_simple->worker, l_http_simple->esocket ); + _set_only_write_http_client_state( l_http_simple); return true; } @@ -300,8 +297,7 @@ bool s_proc_queue_callback(dap_proc_thread_t * a_thread, void * a_arg ) log_it(L_DEBUG, "Not supported user agent in request: %s", header->value); const char* error_msg = "User-Agent version not supported. Update your software"; _write_response_bad_request(l_http_simple, error_msg); - _set_only_write_http_client_state( l_http_simple->http_client); - dap_proc_thread_assign_on_worker_inter(a_thread, l_http_simple->worker, l_http_simple->esocket ); + _set_only_write_http_client_state( l_http_simple); return true; } } @@ -317,8 +313,7 @@ bool s_proc_queue_callback(dap_proc_thread_t * a_thread, void * a_arg ) l_http_simple->http_client->reply_status_code = Http_Status_InternalServerError; } - _set_only_write_http_client_state( l_http_simple->http_client); - dap_proc_thread_assign_on_worker_inter(a_thread, l_http_simple->worker, l_http_simple->esocket ); + _set_only_write_http_client_state( l_http_simple); return true; } @@ -353,7 +348,7 @@ static void s_http_client_headers_read( dap_http_client_t *a_http_client, void * log_it(L_ERROR, "Not defined content-length %u in request", a_http_client->in_content_length); } else { log_it( L_DEBUG, "No data section, execution proc callback" ); - dap_events_socket_remove_from_worker_unsafe(a_http_client->esocket,a_http_client->esocket->worker); + dap_events_socket_set_writable_unsafe(a_http_client->esocket,false); dap_proc_queue_add_callback_inter( l_http_simple->worker->proc_queue_input, s_proc_queue_callback, l_http_simple); } diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index ffaf123cc1..9d064a5a6e 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -139,7 +139,12 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg) } } - +/** + * @brief s_sync_chains_callback + * @param a_thread + * @param a_arg + * @return + */ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) { UNUSED(a_thread); @@ -160,6 +165,7 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) dap_chain_node_addr_t l_node_addr = {}; dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_hdr.net_id); l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); + dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN, l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, l_ch_chain->request_hdr.cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); -- GitLab