diff --git a/dap-sdk/core/include/dap_config.h b/dap-sdk/core/include/dap_config.h index 5eaed02230a6c5e752b2ce2fd74cfc32141dab1d..806bf6eac74972a437c21d19bb5c3c77a577d5b1 100755 --- a/dap-sdk/core/include/dap_config.h +++ b/dap-sdk/core/include/dap_config.h @@ -47,6 +47,12 @@ const char * dap_config_path(); uint16_t dap_config_get_item_uint16(dap_config_t * a_config, const char * a_section_path, const char * a_item_name); uint16_t dap_config_get_item_uint16_default(dap_config_t * a_config, const char * a_section_path, const char * a_item_name, uint16_t a_default); +int16_t dap_config_get_item_int16(dap_config_t * a_config, const char * a_section_path, const char * a_item_name); +int16_t dap_config_get_item_int16_default(dap_config_t * a_config, const char * a_section_path, const char * a_item_name, int16_t a_default); + +uint32_t dap_config_get_item_uint32(dap_config_t * a_config, const char * a_section_path, const char * a_item_name); +uint32_t dap_config_get_item_uint32_default(dap_config_t * a_config, const char * a_section_path, const char * a_item_name, uint32_t a_default); + int32_t dap_config_get_item_int32(dap_config_t * a_config, const char * a_section_path, const char * a_item_name); int32_t dap_config_get_item_int32_default(dap_config_t * a_config, const char * a_section_path, const char * a_item_name, int32_t a_default); diff --git a/dap-sdk/core/src/dap_config.c b/dap-sdk/core/src/dap_config.c index 93d2cf7772fe5b57d9adef74478ebcf6b85b73bb..392c7bf05d3866ba1f23ed569f2ed7753a32052a 100755 --- a/dap-sdk/core/src/dap_config.c +++ b/dap-sdk/core/src/dap_config.c @@ -378,6 +378,18 @@ uint16_t dap_config_get_item_uint16(dap_config_t * a_config, const char * a_sect return (uint16_t) atoi(dap_config_get_item_str(a_config,a_section_path,a_item_name)); } +/** + * @brief dap_config_get_item_int16 + * @param a_config + * @param a_section_path + * @param a_item_name + * @return + */ +int16_t dap_config_get_item_int16(dap_config_t * a_config, const char * a_section_path, const char * a_item_name) +{ + return (int16_t) atoi(dap_config_get_item_str(a_config,a_section_path,a_item_name)); +} + /** * @brief dap_config_get_item_int32_default Getting a configuration item as a int32 @@ -393,6 +405,40 @@ int32_t dap_config_get_item_int32_default(dap_config_t * a_config, const char * return l_str_ret?atoi(l_str_ret):a_default; } +/** + * @brief dap_config_get_item_uint32_default + * @param a_config + * @param a_section_path + * @param a_item_name + * @param a_default + * @return + */ +uint32_t dap_config_get_item_uint32_default(dap_config_t * a_config, const char * a_section_path, const char * a_item_name, uint32_t a_default) +{ + const char * l_str_ret = dap_config_get_item_str(a_config,a_section_path,a_item_name); + uint32_t l_ret = 0; + if (l_str_ret && sscanf(l_str_ret, "%u", &l_ret) == 1) + return l_ret; + else + return a_default; +} + +/** + * @brief dap_config_get_item_uint32 + * @param a_config + * @param a_section_path + * @param a_item_name + * @return + */ +uint32_t dap_config_get_item_uint32(dap_config_t * a_config, const char * a_section_path, const char * a_item_name) +{ + const char * l_str_ret = dap_config_get_item_str(a_config,a_section_path,a_item_name); + uint32_t l_ret = 0; + if (l_str_ret) + sscanf(l_str_ret, "%u", &l_ret); + return l_ret; +} + /** * @brief dap_config_get_item_uint16_default * @param a_config @@ -407,6 +453,20 @@ uint16_t dap_config_get_item_uint16_default(dap_config_t * a_config, const char return l_str_ret? (uint16_t) atoi(l_str_ret):a_default; } +/** + * @brief dap_config_get_item_int16_default + * @param a_config + * @param a_section_path + * @param a_item_name + * @param a_default + * @return + */ +int16_t dap_config_get_item_int16_default(dap_config_t * a_config, const char * a_section_path, const char * a_item_name, int16_t a_default) +{ + const char * l_str_ret = dap_config_get_item_str(a_config,a_section_path,a_item_name); + return l_str_ret? (int16_t) atoi(l_str_ret):a_default; +} + /** * @brief dap_config_get_item_int64_default * @param a_config diff --git a/dap-sdk/net/client/dap_client.c b/dap-sdk/net/client/dap_client.c index caf4db1444099d4c8ac6d40c6720afeaa69752eb..09f32b4940e11c69472a9c3079b35be954bd6b97 100644 --- a/dap-sdk/net/client/dap_client.c +++ b/dap-sdk/net/client/dap_client.c @@ -30,7 +30,7 @@ // FSM realization: thats callback executes after the every stage is done // and have to select the next one stage -void m_stage_fsm_operator(dap_client_t *, void *); +void m_stage_fsm_operator_unsafe(dap_client_t *, void *); /** * @brief dap_client_init @@ -84,7 +84,7 @@ dap_client_t * dap_client_new(dap_events_t * a_events, dap_client_callback_t a_s DAP_CLIENT_PVT(l_client)->events = a_events; DAP_CLIENT_PVT(l_client)->stage_status_callback = a_stage_status_callback; DAP_CLIENT_PVT(l_client)->stage_status_error_callback = a_stage_status_error_callback; - + DAP_CLIENT_PVT(l_client)->worker = dap_events_worker_get_auto(); dap_client_pvt_new(DAP_CLIENT_PVT(l_client) ); @@ -180,13 +180,16 @@ void dap_client_set_auth_cert(dap_client_t * a_client, dap_cert_t *a_cert) /** - * @brief dap_client_reset - * @param a_client + * @brief s_client_reset_unsafe + * @param a_worker + * @param a_arg */ -void dap_client_reset(dap_client_t * a_client) +static void s_client_reset_unsafe(dap_worker_t * a_worker, void * a_arg) { - dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); - + (void) a_arg; + assert(a_arg); + dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT( ((dap_client_t*)a_arg)); + assert(l_client_internal); if(l_client_internal->session_key){ dap_enc_key_delete(l_client_internal->session_key); l_client_internal->session_key = NULL; @@ -205,6 +208,18 @@ void dap_client_reset(dap_client_t * a_client) l_client_internal->stage = STAGE_BEGIN; l_client_internal->stage_status = STAGE_STATUS_DONE ; l_client_internal->stage_target = STAGE_BEGIN ; + +} + +/** + * @brief dap_client_reset + * @param a_client + */ +void dap_client_reset(dap_client_t * a_client) +{ + dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); + assert(l_client_internal); + dap_worker_exec_callback_on(l_client_internal->worker, s_client_reset_unsafe, a_client); } @@ -224,31 +239,26 @@ void dap_client_delete(dap_client_t * a_client) DAP_DELETE(a_client); } -/** - * @brief dap_client_go_stage - * @param a_client - * @param a_stage_end - */ -void dap_client_go_stage(dap_client_t * a_client, dap_client_stage_t a_stage_target, dap_client_callback_t a_stage_end_callback) -{ - // ----- check parameters ----- - if(NULL == a_client) { - log_it(L_ERROR, "dap_client_go_stage, a_client == NULL"); - return; - } - if(NULL == a_stage_end_callback) { - log_it(L_ERROR, "dap_client_go_stage, a_stage_end_callback == NULL"); - return; - } - dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); +struct go_stage_arg{ + dap_client_pvt_t *client_pvt; + dap_client_stage_t stage_target; + dap_client_callback_t stage_end_callback; +}; - assert(l_client_internal); +static void s_go_stage_on_client_worker_unsafe(dap_worker_t * a_worker,void * a_arg) +{ + (void) a_worker; + assert(a_arg); + dap_client_stage_t a_stage_target = ((struct go_stage_arg*) a_arg)->stage_target; + dap_client_callback_t a_stage_end_callback= ((struct go_stage_arg*) a_arg)->stage_end_callback; + dap_client_pvt_t * l_client_pvt = ((struct go_stage_arg*) a_arg)->client_pvt; + DAP_DELETE(a_arg); - l_client_internal->stage_target = a_stage_target; - l_client_internal->stage_target_done_callback = a_stage_end_callback; + l_client_pvt->stage_target = a_stage_target; + l_client_pvt->stage_target_done_callback = a_stage_end_callback; - dap_client_stage_t l_cur_stage = l_client_internal->stage; - dap_client_stage_status_t l_cur_stage_status= l_client_internal->stage_status; + dap_client_stage_t l_cur_stage = l_client_pvt->stage; + dap_client_stage_status_t l_cur_stage_status= l_client_pvt->stage_status; if(a_stage_target != l_cur_stage ){ // Going to stages downstairs @@ -267,25 +277,50 @@ void dap_client_go_stage(dap_client_t * a_client, dap_client_stage_t a_stage_tar log_it(L_DEBUG, "Start transitions chain to %s" ,dap_client_stage_str(a_stage_target) ); int step = (a_stage_target > l_cur_stage)?1:-1; - dap_client_pvt_stage_transaction_begin(l_client_internal, + dap_client_pvt_stage_transaction_begin(l_client_pvt, l_cur_stage+step, - m_stage_fsm_operator + m_stage_fsm_operator_unsafe ); } } }else{ // Same stage log_it(L_ERROR,"We're already on stage %s",dap_client_stage_str(a_stage_target)); } +} +/** + * @brief dap_client_go_stage + * @param a_client + * @param a_stage_end + */ +void dap_client_go_stage(dap_client_t * a_client, dap_client_stage_t a_stage_target, dap_client_callback_t a_stage_end_callback) +{ + // ----- check parameters ----- + if(NULL == a_client) { + log_it(L_ERROR, "dap_client_go_stage, a_client == NULL"); + return; + } + if(NULL == a_stage_end_callback) { + log_it(L_ERROR, "dap_client_go_stage, a_stage_end_callback == NULL"); + return; + } + dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(a_client); + + assert(l_client_pvt); + struct go_stage_arg *l_stage_arg = DAP_NEW(struct go_stage_arg); + l_stage_arg->stage_end_callback = a_stage_end_callback; + l_stage_arg->stage_target = a_stage_target; + l_stage_arg->client_pvt = l_client_pvt; + dap_worker_exec_callback_on(l_client_pvt->worker, s_go_stage_on_client_worker_unsafe, l_stage_arg); } /** - * @brief m_stage_fsm_operator + * @brief m_stage_fsm_operator_unsafe * @param a_client * @param a_arg */ -void m_stage_fsm_operator(dap_client_t * a_client, void * a_arg) +void m_stage_fsm_operator_unsafe(dap_client_t * a_client, void * a_arg) { UNUSED(a_arg); dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); @@ -308,7 +343,7 @@ void m_stage_fsm_operator(dap_client_t * a_client, void * a_arg) ,dap_client_stage_str(l_client_internal->stage), dap_client_stage_str(l_stage_next) ,dap_client_stage_str(l_client_internal->stage_target)); dap_client_pvt_stage_transaction_begin(l_client_internal, - l_stage_next, m_stage_fsm_operator + l_stage_next, m_stage_fsm_operator_unsafe ); } @@ -325,14 +360,14 @@ void m_stage_fsm_operator(dap_client_t * a_client, void * a_arg) * @param a_response_proc * @param a_response_error */ -void dap_client_request_enc(dap_client_t * a_client, const char * a_path, const char * a_suburl,const char* a_query, void * a_request, size_t a_request_size, +void dap_client_request_enc_unsafe(dap_client_t * a_client, const char * a_path, const char * a_suburl,const char* a_query, void * a_request, size_t a_request_size, dap_client_callback_data_size_t a_response_proc, dap_client_callback_int_t a_response_error ) { dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); dap_client_pvt_request_enc(l_client_internal, a_path, a_suburl, a_query,a_request,a_request_size, a_response_proc,a_response_error); } -void dap_client_request(dap_client_t * a_client, const char * a_full_path, void * a_request, size_t a_request_size, +void dap_client_request_unsafe(dap_client_t * a_client, const char * a_full_path, void * a_request, size_t a_request_size, dap_client_callback_data_size_t a_response_proc, dap_client_callback_int_t a_response_error ) { dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index 74fde346b4ace0fdb1f6bf34db7a2e1b170c88e9..7177418ac9f095a06e1b91542112f474404b0410 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -76,54 +76,10 @@ typedef struct dap_http_client_internal { #define PVT(a) (a ? (dap_client_http_pvt_t *) (a)->_inheritor : NULL) static void s_http_connected(dap_events_socket_t * a_esocket); // Connected callback -static void s_http_new(dap_events_socket_t * a_es, void * arg); // New callback (assigned on worker first time) static void s_http_delete(dap_events_socket_t * a_es, void * arg); static void s_http_read(dap_events_socket_t * a_es, void * arg); -static void s_http_write(dap_events_socket_t * a_es, void * arg); static void s_http_error(dap_events_socket_t * a_es, int a_arg); -/** - * @brief s_http_new - * @param a_es - * @param arg - */ -static void s_http_new(dap_events_socket_t * a_es, void * arg) -{ - UNUSED(arg); - log_it(L_DEBUG, "HTTP client connected"); - dap_client_http_pvt_t * l_client_http_internal = PVT(a_es); - if(!l_client_http_internal) { - log_it(L_ERROR, "s_http_new: l_client_http_internal is NULL!"); - return; - } - l_client_http_internal->header_length = 0; - l_client_http_internal->content_length = 0; - l_client_http_internal->response_size = 0; - l_client_http_internal->response_size_max = DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX; - l_client_http_internal->response = (uint8_t*) DAP_NEW_Z_SIZE(uint8_t, DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX); -} - - - -/** - * @brief s_http_stream_write - * @param a_es - * @param arg - */ -static void s_http_write(dap_events_socket_t * a_es, void * arg) -{ - UNUSED(a_es); - UNUSED(arg); -// log_it(L_DEBUG, "s_http_write "); -// dap_client_http_internal_t * l_client_http_internal = DAP_CLIENT_HTTP(a_es); -// if(!l_client_internal) { -// log_it(L_ERROR, "s_http_write: l_client_http_internal is NULL!"); -// return; -// } - - //bool ready_to_write = false; - //dap_events_socket_set_writable(a_es, ready_to_write); -} /** * @brief s_http_stream_read @@ -282,6 +238,7 @@ static void s_http_delete(dap_events_socket_t *a_es, void *arg) /** * @brief dap_client_http_request_custom + * @param a_worker * @param a_uplink_addr * @param a_uplink_port * @param a_method GET or POST @@ -296,17 +253,16 @@ static void s_http_delete(dap_events_socket_t *a_es, void *arg) * @param a_custom * @param a_custom_count */ -void* dap_client_http_request_custom(const char *a_uplink_addr, uint16_t a_uplink_port, const char *a_method, +void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplink_addr, uint16_t a_uplink_port, const char *a_method, const char *a_request_content_type, const char * a_path, void *a_request, size_t a_request_size, char *a_cookie, dap_client_http_callback_data_t a_response_callback, dap_client_http_callback_error_t a_error_callback, void *a_obj, char **a_custom, size_t a_custom_count) { + //log_it(L_DEBUG, "HTTP request on url '%s:%d'", a_uplink_addr, a_uplink_port); static dap_events_socket_callbacks_t l_s_callbacks = { - .new_callback = s_http_new, .connected_callback = s_http_connected, .read_callback = s_http_read, - .write_callback = s_http_write, .error_callback = s_http_error, .delete_callback = s_http_delete }; @@ -317,7 +273,17 @@ void* dap_client_http_request_custom(const char *a_uplink_addr, uint16_t a_uplin log_it(L_ERROR, "Error %d with socket create", errno); return NULL; } - fcntl( l_socket, F_SETFL, O_NONBLOCK); // Make it non-block + // Get socket flags + int l_socket_flags = fcntl(l_socket, F_GETFL); + if (l_socket_flags == -1){ + log_it(L_ERROR, "Error %d can't get socket flags", errno); + return NULL; + } + // Make it non-block + if (fcntl( l_socket, F_SETFL,l_socket_flags| O_NONBLOCK) == -1){ + log_it(L_ERROR, "Error %d can't get socket flags", errno); + return NULL; + } // set socket param int buffsize = DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX; #ifdef _WIN32 @@ -347,32 +313,41 @@ void* dap_client_http_request_custom(const char *a_uplink_addr, uint16_t a_uplin l_client_http_internal->request_custom_headers = a_custom; l_client_http_internal->request_custom_headers_count = a_custom_count; + l_client_http_internal->response_size_max = DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX; + l_client_http_internal->response = (uint8_t*) DAP_NEW_Z_SIZE(uint8_t, DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX); + + // get struct in_addr from ip_str inet_pton(AF_INET, a_uplink_addr, &(l_ev_socket->remote_addr.sin_addr)); //Resolve addr if if(!l_ev_socket->remote_addr.sin_addr.s_addr) { if(dap_net_resolve_host(a_uplink_addr, AF_INET, (struct sockaddr*) &l_ev_socket->remote_addr.sin_addr) < 0) { log_it(L_ERROR, "Wrong remote address '%s:%u'", a_uplink_addr, a_uplink_port); - dap_events_socket_remove_and_delete_unsafe( l_ev_socket, true); + dap_events_socket_delete_unsafe( l_ev_socket, true); return NULL; } } - l_client_http_internal->worker = dap_worker_add_events_socket_auto(l_ev_socket); // connect l_ev_socket->remote_addr.sin_family = AF_INET; l_ev_socket->remote_addr.sin_port = htons(a_uplink_port); + l_ev_socket->flags |= DAP_SOCK_CONNECTING; int l_err = connect(l_socket, (struct sockaddr *) &l_ev_socket->remote_addr, sizeof(struct sockaddr_in)); if (l_err == 0){ log_it(L_DEBUG, "Connected momentaly with %s:%u!", a_uplink_addr, a_uplink_port); + l_client_http_internal->worker = a_worker?a_worker: dap_events_worker_get_auto(); + dap_worker_add_events_socket(l_ev_socket,l_client_http_internal->worker); return l_client_http_internal; - }else if( l_err == EINPROGRESS){ + }else if( errno == EINPROGRESS && l_err == -1){ log_it(L_DEBUG, "Connecting to %s:%u", a_uplink_addr, a_uplink_port); + l_client_http_internal->worker = a_worker?a_worker: dap_events_worker_get_auto(); + dap_worker_add_events_socket(l_ev_socket,l_client_http_internal->worker); return l_client_http_internal; }else{ char l_errbuf[128]; l_errbuf[0] = '\0'; strerror_r(l_err, l_errbuf, sizeof (l_errbuf)); log_it(L_ERROR, "Connecting error: \"%s\" (code %d)", l_errbuf, l_err); + dap_events_socket_delete_unsafe( l_ev_socket, true); return NULL; } } @@ -454,6 +429,7 @@ static void s_http_connected(dap_events_socket_t * a_esocket) /** * @brief dap_client_http_request + * @param a_worker * @param a_uplink_addr * @param a_uplink_port * @param a_method GET or POST @@ -467,7 +443,7 @@ static void s_http_connected(dap_events_socket_t * a_esocket) * @param a_obj * @param a_custom */ -void* dap_client_http_request(const char *a_uplink_addr, uint16_t a_uplink_port, const char * a_method, +void* dap_client_http_request(dap_worker_t * a_worker,const char *a_uplink_addr, uint16_t a_uplink_port, const char * a_method, const char* a_request_content_type, const char * a_path, void *a_request, size_t a_request_size, char * a_cookie, dap_client_http_callback_data_t a_response_callback, dap_client_http_callback_error_t a_error_callback, void *a_obj, void * a_custom) @@ -479,7 +455,7 @@ void* dap_client_http_request(const char *a_uplink_addr, uint16_t a_uplink_port, if(a_custom) a_custom_count = 1; - return dap_client_http_request_custom(a_uplink_addr, a_uplink_port, a_method, a_request_content_type, a_path, + return dap_client_http_request_custom(a_worker, a_uplink_addr, a_uplink_port, a_method, a_request_content_type, a_path, a_request, a_request_size, a_cookie, a_response_callback, a_error_callback, a_obj, a_custom_new, a_custom_count); } diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 67ea0a664ce25a198856156587105542f1fbd861..3e5b2d31237ddcec3102b90af20f3f0a99ebc6b1 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -103,6 +103,7 @@ static void s_stream_es_callback_delete(dap_events_socket_t * a_es, void * arg); static void s_stream_es_callback_read(dap_events_socket_t * a_es, void * arg); static void s_stream_es_callback_write(dap_events_socket_t * a_es, void * arg); static void s_stream_es_callback_error(dap_events_socket_t * a_es, int a_arg); +static void s_stream_es_worker_assign(dap_events_socket_t * a_es, dap_worker_t * a_worker); /** * @brief dap_client_internal_init @@ -164,6 +165,7 @@ int dap_client_pvt_disconnect_all_n_wait(dap_client_pvt_t *a_client_pvt) pthread_mutex_lock(&a_client_pvt->disconnected_mutex); dap_client_go_stage(a_client_pvt->client, STAGE_BEGIN, s_client_pvt_disconnected ); pthread_cond_wait(&a_client_pvt->disconnected_cond, &a_client_pvt->disconnected_mutex); + pthread_mutex_unlock(&a_client_pvt->disconnected_mutex); return 0; } @@ -228,6 +230,9 @@ static void s_stream_connected(dap_client_pvt_t * a_client_pvt) */ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) { + dap_worker_t * l_worker= a_client_pvt->worker; + assert(l_worker); + assert(l_worker->_inheritor); //bool l_is_unref = false; dap_client_stage_status_t l_stage_status = a_client_pvt->stage_status; dap_client_stage_t l_stage = a_client_pvt->stage; @@ -316,8 +321,8 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) } #else int buffsize = 65536*4; - setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (const void *) &buffsize, sizeof(int)); - setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (const void *) &buffsize, sizeof(int)); + setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, ( void *) &buffsize, sizeof(int)); + setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, ( void *) &buffsize, sizeof(int)); #endif // Wrap socket and setup callbacks @@ -332,13 +337,6 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) a_client_pvt->stream_socket, &l_s_callbacks); a_client_pvt->stream_es->flags &= DAP_SOCK_CONNECTING ; // To catch non-blocking error when connecting we should ar WRITE flag - dap_worker_t * l_worker = dap_events_worker_get_auto(); - assert(l_worker); - assert(l_worker->_inheritor); - a_client_pvt->stream_worker = DAP_STREAM_WORKER(l_worker); - // add to dap_worker - dap_events_socket_assign_on_worker_mt(a_client_pvt->stream_es, l_worker); - a_client_pvt->stream_es->_inheritor = a_client_pvt; a_client_pvt->stream = dap_stream_new_es_client(a_client_pvt->stream_es); assert(a_client_pvt->stream); @@ -347,6 +345,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) // new added, whether it is necessary? a_client_pvt->stream->session->key = a_client_pvt->stream_key; + a_client_pvt->stream_worker = DAP_STREAM_WORKER(l_worker); a_client_pvt->stream->stream_worker = a_client_pvt->stream_worker; @@ -357,7 +356,6 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) if(inet_pton(AF_INET, a_client_pvt->uplink_addr, &(a_client_pvt->stream_es->remote_addr.sin_addr)) < 0) { log_it(L_ERROR, "Wrong remote address '%s:%u'", a_client_pvt->uplink_addr, a_client_pvt->uplink_port); //close(a_client_pvt->stream_socket); - dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es); //a_client_pvt->stream_socket = 0; a_client_pvt->stage_status = STAGE_STATUS_ERROR; } @@ -369,8 +367,10 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) if((l_err = connect(a_client_pvt->stream_socket, (struct sockaddr *) &a_client_pvt->stream_es->remote_addr, sizeof(struct sockaddr_in))) ==0) { log_it(L_DEBUG, "Connected momentaly with %s:%u", a_client_pvt->uplink_addr, a_client_pvt->uplink_port); + // add to dap_worker + dap_worker_add_events_socket_unsafe( a_client_pvt->stream_es, l_worker); } - else if (l_err != EINPROGRESS){ + else if (l_err != EINPROGRESS && l_err != -1){ char l_errbuf[128]; l_errbuf[0]='\0'; if (l_err) @@ -379,9 +379,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) strncpy(l_errbuf,"Unknown Error",sizeof(l_errbuf)-1); log_it(L_ERROR, "Remote address can't connect (%s:%u) with sock_id %d: \"%s\" (code %d)", a_client_pvt->uplink_addr, a_client_pvt->uplink_port, l_errbuf, l_err); - a_client_pvt->stream_es->kill_signal = true; - a_client_pvt->stream_es->flags &= DAP_SOCK_SIGNAL_CLOSE; - //close(a_client_pvt->stream_socket); + close(a_client_pvt->stream_socket); a_client_pvt->stream_socket = 0; a_client_pvt->stage_status = STAGE_STATUS_ERROR; a_client_pvt->last_error = ERROR_STREAM_CONNECT ; @@ -389,6 +387,8 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) s_stage_status_after(a_client_pvt); }else{ log_it(L_INFO,"Connecting to remote %s:%s",a_client_pvt->uplink_addr, a_client_pvt->uplink_port); + // add to dap_worker + dap_worker_add_events_socket_unsafe(a_client_pvt->stream_es, l_worker); } } } @@ -414,7 +414,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) const char *l_add_str = ""; - dap_events_socket_write_f_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es, "GET /%s HTTP/1.1\r\n" + dap_events_socket_write_f_unsafe( a_client_pvt->stream_es, "GET /%s HTTP/1.1\r\n" "Host: %s:%d%s\r\n" "\r\n", l_full_path, a_client_pvt->uplink_addr, a_client_pvt->uplink_port, l_add_str); @@ -451,52 +451,50 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) case STAGE_STATUS_ERROR: { // limit the number of attempts int MAX_ATTEMPTS = 3; - a_client_pvt->connect_attempt++; - bool l_is_last_attempt = a_client_pvt->connect_attempt > MAX_ATTEMPTS ? true : false; + a_client_pvt->stage_errors++; + bool l_is_last_attempt = a_client_pvt->stage_errors > MAX_ATTEMPTS ? true : false; log_it(L_ERROR, "Error state, doing callback if present"); - if(a_client_pvt->stage_status_error_callback) { - //dap_client_pvt_ref(a_client_pvt); - if(a_client_pvt == a_client_pvt->client->_internal) - a_client_pvt->stage_status_error_callback(a_client_pvt->client, (void*) l_is_last_attempt); - else { - log_it(L_ERROR, "client_pvt->client=%x corrupted", a_client_pvt->client->_internal); - } - //dap_client_pvt_unref(a_client_pvt); - // Expecting that its one-shot callback - //a_client_internal->stage_status_error_callback = NULL; + if(a_client_pvt->stage_status_error_callback) + a_client_pvt->stage_status_error_callback(a_client_pvt->client, (void*) l_is_last_attempt); + + switch (a_client_pvt->stage) { + case STAGE_UNDEFINED: + log_it(L_ERROR,"!!Stage undefined!!! in dap_client_pvt::s_stage_status_after()"); + break; + case STAGE_BEGIN: + case STAGE_ENC_INIT: + case STAGE_STREAM_CTL: + case STAGE_STREAM_SESSION: + case STAGE_STREAM_CONNECTED: + case STAGE_STREAM_STREAMING: + case STAGE_STREAM_ABORT: + if(a_client_pvt->stage_target == STAGE_STREAM_ABORT) { + a_client_pvt->stage = STAGE_STREAM_ABORT; + a_client_pvt->stage_status = STAGE_STATUS_ABORTING; + // unref pvt + //l_is_unref = true; + } else if (a_client_pvt->last_error != ERROR_NETWORK_CONNECTION_TIMEOUT) { + if(!l_is_last_attempt) { + a_client_pvt->stage = STAGE_ENC_INIT; + // Trying the step again + a_client_pvt->stage_status = STAGE_STATUS_IN_PROGRESS; + log_it(L_INFO, "Connection attempt %d in 0.3 seconds", a_client_pvt->stage_errors); + + // small delay before next request + dap_timerfd_start_on_worker( l_worker, 300, + (dap_timerfd_callback_t) s_stage_status_after,a_client_pvt ); + } + else{ + log_it(L_INFO, "Too many connection attempts. Tries are over."); + a_client_pvt->stage_status = STAGE_STATUS_DONE; + } + } + break; } - if(a_client_pvt->stage_target == STAGE_STREAM_ABORT) { - a_client_pvt->stage = STAGE_STREAM_ABORT; - a_client_pvt->stage_status = STAGE_STATUS_ABORTING; - // unref pvt - //l_is_unref = true; - } else if (a_client_pvt->last_error != ERROR_NETWORK_CONNECTION_TIMEOUT) { - if(!l_is_last_attempt) { - // small delay before next request - log_it(L_INFO, "Connection attempt %d", a_client_pvt->connect_attempt); - #ifdef _WIN32 - Sleep(300);// 0.3 sec - #else - usleep(300000);// 0.3 sec - #endif - a_client_pvt->stage = STAGE_ENC_INIT; - // Trying the step again - a_client_pvt->stage_status = STAGE_STATUS_IN_PROGRESS; - - //dap_client_pvt_ref(a_client_pvt); - s_stage_status_after(a_client_pvt); - } - else{ - log_it(L_INFO, "Too many connection attempts. Tries are over."); - a_client_pvt->stage_status = STAGE_STATUS_DONE; - // unref pvt - //l_is_unref = true; - } - } } - break; + break; case STAGE_STATUS_DONE: { log_it(L_INFO, "Stage status %s is done", dap_client_stage_str(a_client_pvt->stage)); // go to next stage @@ -563,25 +561,10 @@ int dap_client_pvt_request(dap_client_pvt_t * a_client_internal, const char * a_ a_client_internal->request_error_callback = a_response_error; a_client_internal->is_encrypted = false; -// size_t l_url_size_max = 0; -// char *l_url = NULL; -// if(a_path) { -// l_url_size_max = dap_strlen(a_client_internal->uplink_addr) + strlen(a_path) + 15; -// l_url = DAP_NEW_Z_SIZE(char, l_url_size_max); -// -// snprintf(l_url, l_url_size_max, "http://%s:%u/%s", a_client_internal->uplink_addr, -// a_client_internal->uplink_port, a_path); -// } else { -// l_url_size_max = strlen(a_client_internal->uplink_addr) + 15; -// l_url = DAP_NEW_Z_SIZE(char, l_url_size_max); -// snprintf(l_url, l_url_size_max, "http://%s:%u", a_client_internal->uplink_addr, a_client_internal->uplink_port); -// } - void *l_ret = dap_client_http_request(a_client_internal->uplink_addr,a_client_internal->uplink_port, + void *l_ret = dap_client_http_request(a_client_internal->worker, a_client_internal->uplink_addr,a_client_internal->uplink_port, a_request ? "POST" : "GET", "text/text", a_path, a_request, a_request_size, NULL, s_request_response, s_request_error, a_client_internal, NULL); -// a_client_internal->curl = dap_http_client_simple_request(l_url, a_request ? "POST" : "GET", "text/text", a_request, -// a_request_size, NULL, m_request_response, m_request_error, &a_client_internal->curl_sockfd, a_client_internal, NULL); -// DAP_DELETE(l_url); + if(l_ret) return 0; return -1; @@ -702,7 +685,7 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char a_custom_new[1] = "SessionCloseAfterRequest: true"; a_custom_count++; } - dap_client_http_request_custom(a_client_internal->uplink_addr, a_client_internal->uplink_port, a_request ? "POST" : "GET", "text/text", + dap_client_http_request_custom(a_client_internal->worker, a_client_internal->uplink_addr, a_client_internal->uplink_port, a_request ? "POST" : "GET", "text/text", l_path, l_request_enc, l_request_enc_size, NULL, s_request_response, s_request_error, a_client_internal, a_custom_new, a_custom_count); // dap_http_client_simple_request_custom(l_url_full, a_request ? "POST" : "GET", "text/text", @@ -850,39 +833,30 @@ static void s_enc_init_response(dap_client_t * a_client, void * a_response, size l_client_pvt->session_key_id, strlen(l_client_pvt->session_key_id), 0); DAP_DELETE(l_bob_message); - pthread_mutex_lock(&l_client_pvt->disconnected_mutex); if(l_client_pvt->stage == STAGE_ENC_INIT) { // We are in proper stage l_client_pvt->stage_status = STAGE_STATUS_DONE; - pthread_mutex_unlock(&l_client_pvt->disconnected_mutex); s_stage_status_after(l_client_pvt); } else { log_it(L_WARNING, "ENC: initialized encryption but current stage is %s (%s)", dap_client_get_stage_str(a_client), dap_client_get_stage_status_str(a_client)); - pthread_mutex_unlock(&l_client_pvt->disconnected_mutex); } } else { log_it(L_ERROR, "ENC: Wrong response (size %u data '%s')", a_response_size, (char* ) a_response); - pthread_mutex_lock(&l_client_pvt->disconnected_mutex); l_client_pvt->last_error = ERROR_ENC_NO_KEY; l_client_pvt->stage_status = STAGE_STATUS_ERROR; - pthread_mutex_unlock(&l_client_pvt->disconnected_mutex); s_stage_status_after(l_client_pvt); } DAP_DELETE(l_session_id_b64); DAP_DELETE(l_bob_message_b64); } else if(a_response_size > 1) { log_it(L_ERROR, "ENC: Wrong response (size %u data '%s')", a_response_size, (char* ) a_response); - pthread_mutex_lock(&l_client_pvt->disconnected_mutex); l_client_pvt->last_error = ERROR_ENC_NO_KEY; l_client_pvt->stage_status = STAGE_STATUS_ERROR; - pthread_mutex_unlock(&l_client_pvt->disconnected_mutex); s_stage_status_after(l_client_pvt); } else { log_it(L_ERROR, "ENC: Wrong response (size %u)", a_response_size); - pthread_mutex_lock(&l_client_pvt->disconnected_mutex); l_client_pvt->last_error = ERROR_ENC_NO_KEY; l_client_pvt->stage_status = STAGE_STATUS_ERROR; - pthread_mutex_unlock(&l_client_pvt->disconnected_mutex); s_stage_status_after(l_client_pvt); } } @@ -901,14 +875,12 @@ static void s_enc_init_error(dap_client_t * a_client, int a_err_code) } //dap_client_internal_t * l_client_internal = dap_CLIENT_INTERNAL(a_client); log_it(L_ERROR, "ENC: Can't init ecnryption session, err code %d", a_err_code); - pthread_mutex_lock(&l_client_pvt->disconnected_mutex); if (a_err_code == ETIMEDOUT) { l_client_pvt->last_error = ERROR_NETWORK_CONNECTION_TIMEOUT; } else { l_client_pvt->last_error = ERROR_NETWORK_CONNECTION_REFUSE; } l_client_pvt->stage_status = STAGE_STATUS_ERROR; - pthread_mutex_unlock(&l_client_pvt->disconnected_mutex); s_stage_status_after(l_client_pvt); } diff --git a/dap-sdk/net/client/include/dap_client.h b/dap-sdk/net/client/include/dap_client.h index b9a9c7f25cfec6000fd3ddfd426c75be0b951e89..aa0da2b35f2f8a91d66ded9b9c2dcbec06c530fa 100644 --- a/dap-sdk/net/client/include/dap_client.h +++ b/dap-sdk/net/client/include/dap_client.h @@ -113,10 +113,10 @@ void dap_client_go_stage(dap_client_t * a_client, dap_client_stage_t a_stage_end void dap_client_reset(dap_client_t * a_client); -void dap_client_request_enc(dap_client_t * a_client, const char * a_path,const char * a_suburl,const char* a_query, void * a_request, size_t a_request_size, +void dap_client_request_enc_unsafe(dap_client_t * a_client, const char * a_path,const char * a_suburl,const char* a_query, void * a_request, size_t a_request_size, dap_client_callback_data_size_t a_response_proc, dap_client_callback_int_t a_response_error); -void dap_client_request(dap_client_t * a_client, const char * a_full_path, void * a_request, size_t a_request_size, +void dap_client_request_unsafe(dap_client_t * a_client, const char * a_full_path, void * a_request, size_t a_request_size, dap_client_callback_data_size_t a_response_proc, dap_client_callback_int_t a_response_error); //int dap_client_disconnect(dap_client_t * a_client); diff --git a/dap-sdk/net/client/include/dap_client_http.h b/dap-sdk/net/client/include/dap_client_http.h index ec17cdbae2c758b2f40896b1ab2b0ac2517ea314..f158cd04c4108a8a4e15691ab2f4d7e8f400bbd8 100644 --- a/dap-sdk/net/client/include/dap_client_http.h +++ b/dap-sdk/net/client/include/dap_client_http.h @@ -23,6 +23,7 @@ #include <stdint.h> #include <stddef.h> +#include "dap_worker.h" #ifdef __cplusplus extern "C" { #endif @@ -30,12 +31,12 @@ extern "C" { typedef void (*dap_client_http_callback_error_t)(int, void *); // Callback for specific http client operations typedef void (*dap_client_http_callback_data_t)(void *, size_t, void *); // Callback for specific http client operations -void* dap_client_http_request_custom(const char *a_uplink_addr, uint16_t a_uplink_port, const char *a_method, +void* dap_client_http_request_custom(dap_worker_t * a_worker, const char *a_uplink_addr, uint16_t a_uplink_port, const char *a_method, const char *a_request_content_type, const char * a_path, void *a_request, size_t a_request_size, char *a_cookie, dap_client_http_callback_data_t a_response_callback, dap_client_http_callback_error_t a_error_callback, void *a_obj, char **a_custom, size_t a_custom_count); -void* dap_client_http_request(const char *a_uplink_addr, uint16_t a_uplink_port, const char * a_method, +void* dap_client_http_request(dap_worker_t * a_worker,const char *a_uplink_addr, uint16_t a_uplink_port, const char * a_method, const char* a_request_content_type, const char * a_path, void *a_request, size_t a_request_size, char * a_cookie, dap_client_http_callback_data_t a_response_callback, dap_client_http_callback_error_t a_error_callback, void *a_obj, void * a_custom); diff --git a/dap-sdk/net/client/include/dap_client_pvt.h b/dap-sdk/net/client/include/dap_client_pvt.h index 8ff519b1dbd6a248e1dfb10ee26175a10c9a6f0d..b7c8db758d549208706894ef53bd9c722059d2b1 100644 --- a/dap-sdk/net/client/include/dap_client_pvt.h +++ b/dap-sdk/net/client/include/dap_client_pvt.h @@ -42,6 +42,7 @@ typedef struct dap_client_internal int stream_socket; dap_stream_t* stream; dap_stream_worker_t* stream_worker; + dap_worker_t * worker; dap_events_t * events; dap_enc_key_t * session_key_open; // Open assymetric keys exchange @@ -63,6 +64,8 @@ typedef struct dap_client_internal dap_client_stage_t stage_target; dap_client_callback_t stage_target_done_callback; + dap_client_callback_t stage_target_error_callback; + dap_client_stage_t stage; dap_client_stage_status_t stage_status; dap_client_error_t last_error; @@ -70,7 +73,7 @@ typedef struct dap_client_internal dap_client_callback_t stage_status_done_callback; dap_client_callback_t stage_status_error_callback; - int connect_attempt; + int stage_errors; bool is_encrypted; bool encrypted_headers; diff --git a/dap-sdk/net/core/dap_events.c b/dap-sdk/net/core/dap_events.c index 7541a3c7badebecbbf5aab249113f53f0693d716..6913a18d95dd7eb7e8501ae7a5ee992b6cf103f2 100644 --- a/dap-sdk/net/core/dap_events.c +++ b/dap-sdk/net/core/dap_events.c @@ -206,6 +206,8 @@ dap_events_t * dap_events_new( ) pthread_rwlock_init( &ret->sockets_rwlock, NULL ); if ( s_events_default == NULL) s_events_default = ret; + pthread_key_create( &ret->pth_key_worker, NULL); + return ret; } diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 1074b9f7096bf22b6caa2b3f59728f263a985c7c..04ecef1b1b43ab46692c8932cccb51a34ae24256 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -39,11 +39,11 @@ #define LOG_TAG "dap_worker" -static time_t s_connection_timeout = 20000; +static time_t s_connection_timeout = 60; static void s_socket_all_check_activity( void * a_arg); -static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg); +static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg); static void s_queue_delete_es_callback( dap_events_socket_t * a_es, void * a_arg); static void s_queue_es_reassign_callback( dap_events_socket_t * a_es, void * a_arg); static void s_queue_callback_callback( dap_events_socket_t * a_es, void * a_arg); @@ -96,7 +96,7 @@ void *dap_worker_thread(void *arg) #endif - l_worker->queue_es_new = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_new_es_callback); + l_worker->queue_es_new = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_add_es_callback); l_worker->queue_es_delete = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_delete_es_callback); l_worker->queue_es_io = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_es_io_callback); l_worker->queue_es_reassign = dap_events_socket_create_type_queue_ptr_unsafe( l_worker, s_queue_es_reassign_callback ); @@ -104,7 +104,7 @@ void *dap_worker_thread(void *arg) l_worker->event_exit = dap_events_socket_create_type_event_unsafe(l_worker, s_event_exit_callback ); l_worker->timer_check_activity = dap_timerfd_start_on_worker( l_worker,s_connection_timeout / 2,s_socket_all_check_activity,l_worker); - + pthread_setspecific(l_worker->events->pth_key_worker, l_worker); pthread_cond_broadcast(&l_worker->started_cond); bool s_loop_is_active = true; while(s_loop_is_active) { @@ -312,6 +312,9 @@ void *dap_worker_thread(void *arg) } } + //if (l_flag_write) + // log_it(L_DEBUG,"Alarmed write flag for remote %s", l_cur->remote_addr_str[0]?l_cur->remote_addr_str:"(null)"); + // If its outgoing connection if ( l_flag_write && ! l_cur->server && l_cur->flags& DAP_SOCK_CONNECTING && (l_cur->type == DESCRIPTOR_TYPE_SOCKET || l_cur->type == DESCRIPTOR_TYPE_SOCKET_UDP )){ @@ -334,6 +337,7 @@ void *dap_worker_thread(void *arg) l_cur->flags ^= DAP_SOCK_CONNECTING; if (l_cur->callbacks.connected_callback) l_cur->callbacks.connected_callback(l_cur); + dap_events_socket_worker_poll_update_unsafe(l_cur); } } @@ -472,13 +476,13 @@ void *dap_worker_thread(void *arg) * @param a_es * @param a_arg */ -static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) +static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg) { dap_events_socket_t * l_es_new =(dap_events_socket_t *) a_arg; dap_worker_t * w = a_es->worker; //log_it(L_DEBUG, "Received event socket %p to add on worker", l_es_new); if(dap_events_socket_check_unsafe( w, l_es_new)){ - //log_it(L_ERROR, "Already assigned %d (%p), you're doing smth wrong", l_es_new->socket, l_es_new); + log_it(L_WARNING, "Already assigned %d (%p), you're doing smth wrong", l_es_new->socket, l_es_new); // Socket already present in worker, it's OK return; } @@ -495,6 +499,7 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) } l_es_new->worker = w; + l_es_new->last_time_active = time(NULL); // We need to differ new and reassigned esockets. If its new - is_initialized is false if ( ! l_es_new->is_initalized ){ if (l_es_new->callbacks.new_callback) @@ -512,7 +517,7 @@ static void s_queue_new_es_callback( dap_events_socket_t * a_es, void * a_arg) l_es_new->me = l_es_new; HASH_ADD(hh_worker, w->esockets, me, sizeof(void *), l_es_new ); w->event_sockets_count++; - //log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->id); + log_it(L_DEBUG, "Added socket %d on worker %u", l_es_new->socket, w->id); if (l_es_new->callbacks.worker_assign_callback) l_es_new->callbacks.worker_assign_callback(l_es_new, w); @@ -644,8 +649,9 @@ static void s_socket_all_check_activity( void * a_arg) HASH_ITER(hh_worker, l_worker->esockets, l_es, tmp ) { if ( l_es->type == DESCRIPTOR_TYPE_SOCKET ){ - if ( !l_es->kill_signal && l_curtime >= (time_t)l_es->last_time_active + s_connection_timeout && !l_es->no_close ) { - log_it( L_INFO, "Socket %u timeout, closing...", l_es->socket ); + if ( !l_es->kill_signal && + ( l_curtime >= (l_es->last_time_active + s_connection_timeout) ) && !l_es->no_close ) { + log_it( L_INFO, "Socket %u timeout (diff %u ), closing...", l_es->socket, l_curtime - (time_t)l_es->last_time_active - s_connection_timeout ); if (l_es->callbacks.error_callback) { l_es->callbacks.error_callback(l_es, ETIMEDOUT); } @@ -700,9 +706,10 @@ int dap_worker_add_events_socket_unsafe( dap_events_socket_t * a_esocket, dap_wo a_worker->poll[a_worker->poll_count].events = a_esocket->poll_base_flags; if( a_esocket->flags & DAP_SOCK_READY_TO_READ ) a_worker->poll[a_worker->poll_count].events |= POLLIN; - if( a_esocket->flags & DAP_SOCK_READY_TO_WRITE || a_esocket->flags & DAP_SOCK_CONNECTING ) + if( (a_esocket->flags & DAP_SOCK_READY_TO_WRITE) || (a_esocket->flags & DAP_SOCK_CONNECTING) ) a_worker->poll[a_worker->poll_count].events |= POLLOUT; + a_worker->poll_esocket[a_worker->poll_count] = a_esocket; a_worker->poll_count++; return 0; diff --git a/dap-sdk/net/core/include/dap_events.h b/dap-sdk/net/core/include/dap_events.h index 0f119ee03103d4c958eb9f791bc68ca8bfcbe545..c89642403aa7fdcb1a5f724bdef16067a808c774 100644 --- a/dap-sdk/net/core/include/dap_events.h +++ b/dap-sdk/net/core/include/dap_events.h @@ -39,6 +39,7 @@ typedef struct dap_thread { typedef struct dap_events { dap_events_socket_t *sockets; // Hashmap of event sockets pthread_rwlock_t sockets_rwlock; + pthread_key_t pth_key_worker; void *_inheritor; // Pointer to the internal data, HTTP for example dap_thread_t proc_thread; } dap_events_t; diff --git a/dap-sdk/net/server/json_rpc/src/dap_json_rpc_request.c b/dap-sdk/net/server/json_rpc/src/dap_json_rpc_request.c index 1754423f852b97fe8d3f8ad8e5f19533dd43530c..1d0028ae42fe56f61c6fb518e3cf5d8d5ef02063 100644 --- a/dap-sdk/net/server/json_rpc/src/dap_json_rpc_request.c +++ b/dap-sdk/net/server/json_rpc/src/dap_json_rpc_request.c @@ -80,6 +80,6 @@ void dap_json_rpc_request_send(dap_json_rpc_request_t *a_request, dap_json_rpc_r a_request->id = l_id_response; char *l_str = dap_json_rpc_request_to_json(a_request); log_it(L_NOTICE, "Sending request in address: %s", a_uplink_addr); - dap_client_http_request(a_uplink_addr, a_uplink_port, "POST", "application/json", s_url_service, l_str, strlen(l_str), + dap_client_http_request(NULL,a_uplink_addr, a_uplink_port, "POST", "application/json", s_url_service, l_str, strlen(l_str), NULL, dap_json_rpc_response_accepted, func_error, NULL, NULL); } diff --git a/modules/net/dap_chain_node_client.c b/modules/net/dap_chain_node_client.c index 823c4f91a806468ac79039f4fed2c6abf2323fd0..2037b86213309d192353716d69802f4a0edbabbd 100644 --- a/modules/net/dap_chain_node_client.c +++ b/modules/net/dap_chain_node_client.c @@ -152,6 +152,7 @@ static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg) pthread_mutex_unlock(&l_node_client->wait_mutex); //dap_client_go_stage( a_client , STAGE_STREAM_STREAMING, s_stage_end_callback ); } + //printf("* tage_status_error_callback client=%x data=%x\n", a_client, a_arg); } @@ -458,11 +459,6 @@ dap_chain_node_client_t* dap_chain_client_connect(dap_chain_node_info_t *a_node_ //dap_client_pvt_ref(DAP_CLIENT_PVT(l_node_client->client)); // Handshake & connect dap_client_go_stage(l_node_client->client, a_stage_target, s_stage_connected_callback); - dap_client_pvt_t *l_client_internal = DAP_CLIENT_PVT(l_node_client->client); - if(!l_client_internal || l_client_internal->stage_status == STAGE_STATUS_ERROR){ - dap_chain_node_client_close(l_node_client); - return NULL; - } return l_node_client; } @@ -526,33 +522,25 @@ int dap_chain_node_client_send_ch_pkt(dap_chain_node_client_t *a_client, uint8_t int dap_chain_node_client_wait(dap_chain_node_client_t *a_client, int a_waited_state, int a_timeout_ms) { int ret = -1; - if(!a_client) + if(!a_client){ + log_it(L_ERROR, "Can't wait for status for (null) object"); return -3; + } pthread_mutex_lock(&a_client->wait_mutex); // have waited if(a_client->state == a_waited_state) { + log_it(L_INFO, "We're already in state %s",dap_chain_node_client_state_to_str(a_client->state)); pthread_mutex_unlock(&a_client->wait_mutex); return 0; } - if(a_client->state == NODE_CLIENT_STATE_DISCONNECTED) { - pthread_mutex_unlock(&a_client->wait_mutex); - return -2; - } #ifndef _WIN32 // prepare for signal waiting - struct timespec to; - clock_gettime( CLOCK_MONOTONIC, &to); - int64_t nsec_new = to.tv_nsec + a_timeout_ms * 1000000ll; - // if the new number of nanoseconds is more than a second - - if(nsec_new > (long) 1e9) { - to.tv_sec += nsec_new / (long) 1e9; - to.tv_nsec = nsec_new % (long) 1e9; - } - else - to.tv_nsec = (long) nsec_new; + struct timespec l_cond_timeout; + uint32_t l_timeout_s = dap_config_get_item_uint32_default(g_config,"chain_net","status_wait_timeout",10); + clock_gettime( CLOCK_MONOTONIC, &l_cond_timeout); + l_cond_timeout.tv_sec += l_timeout_s; #else pthread_mutex_unlock( &a_client->wait_mutex ); #endif @@ -561,20 +549,26 @@ int dap_chain_node_client_wait(dap_chain_node_client_t *a_client, int a_waited_s do { #ifndef _WIN32 - int wait = pthread_cond_timedwait(&a_client->wait_cond, &a_client->wait_mutex, &to); - if(wait == 0 && ( + int l_ret_wait = pthread_cond_timedwait(&a_client->wait_cond, &a_client->wait_mutex, &l_cond_timeout); + if(l_ret_wait == 0 && ( a_client->state == a_waited_state || (a_client->state == NODE_CLIENT_STATE_ERROR || a_client->state == NODE_CLIENT_STATE_DISCONNECTED)) ) { ret = a_client->state == a_waited_state ? 0 : -2; break; } - else if(wait == ETIMEDOUT) { // 110 260 + else if(l_ret_wait == ETIMEDOUT) { // 110 260 + log_it(L_NOTICE,"Wait for status is stopped by timeout"); ret = -1; break; + }else if (l_ret_wait != 0 ){ + char l_errbuf[128]; + l_errbuf[0] = '\0'; + strerror_r(l_ret_wait,l_errbuf,sizeof (l_errbuf)); + log_it(L_ERROR, "Pthread condition timed wait returned \"%s\"(code %d)", l_errbuf, l_ret_wait); } #else - int wait = WaitForSingleObject( a_client->wait_cond, (uint32_t)a_timeout_ms ); + int wait = WaitForSingleObject( a_client->wait_cond, (uint32_t)a_timeout_s*1000 ); pthread_mutex_lock( &a_client->wait_mutex ); if ( wait == WAIT_OBJECT_0 && ( diff --git a/modules/net/include/dap_chain_node_client.h b/modules/net/include/dap_chain_node_client.h index 788403989d55ea752d633b7b06356031006027cc..265bb37a72536c47c5d924b6f6842d6c241070a1 100644 --- a/modules/net/include/dap_chain_node_client.h +++ b/modules/net/include/dap_chain_node_client.h @@ -115,4 +115,22 @@ int dap_chain_node_client_set_callbacks(dap_client_t *a_client, uint8_t a_ch_id) int dap_chain_node_client_send_nodelist_req(dap_chain_node_client_t *a_client); - +static inline const char * dap_chain_node_client_state_to_str( dap_chain_node_client_state_t a_state) +{ + switch (a_state) { + case NODE_CLIENT_STATE_ERROR: return "ERROR"; + case NODE_CLIENT_STATE_DISCONNECTED: return "DISCONNECTED"; + case NODE_CLIENT_STATE_GET_NODE_ADDR: return "GET_NODE_ADDR"; + case NODE_CLIENT_STATE_NODE_ADDR_LEASED: return "NODE_ADDR_LEASED"; + case NODE_CLIENT_STATE_PING: return "PING"; + case NODE_CLIENT_STATE_PONG: return "PONG"; + case NODE_CLIENT_STATE_CONNECT: return "CONNECT"; + case NODE_CLIENT_STATE_CONNECTED: return "CONNECTED"; + case NODE_CLIENT_STATE_SYNC_GDB: return "SYNC_GDB"; + case NODE_CLIENT_STATE_SYNC_CHAINS: return "SYNC_CHAINS"; + case NODE_CLIENT_STATE_SYNCED: return "SYNCED"; + case NODE_CLIENT_STATE_CHECKED: return "CHECKED"; + default: return "(Undefined node client state)"; + } + +} diff --git a/modules/net/srv/dap_chain_net_srv_geoip.c b/modules/net/srv/dap_chain_net_srv_geoip.c index ff7aea1c4fe3aaa8ed24b9d9fe8cb08602728be6..e79d0d48f85a0ab6bd2580698e6d67ab9bb525fd 100644 --- a/modules/net/srv/dap_chain_net_srv_geoip.c +++ b/modules/net/srv/dap_chain_net_srv_geoip.c @@ -76,7 +76,7 @@ geoip_info_t *chain_net_geoip_get_ip_info_by_web(const char *a_ip_str) char * l_custom = l_out_len > 0 ? dap_strdup_printf("Authorization: Basic %s", l_out) : NULL; size_t l_custom_count = 1; // todo just need to finish up https request - dap_client_http_request_custom("geoip.maxmind.com", 443, "GET", "application/json", l_path, NULL, + dap_client_http_request_custom(NULL,"geoip.maxmind.com", 443, "GET", "application/json", l_path, NULL, 0, NULL, m_request_getip_response, m_request_getip_request_error, NULL, &l_custom, l_custom_count); return NULL; }