diff --git a/include/dap_client_pvt.h b/include/dap_client_pvt.h index 5a75234f1affac684556ecc46911f32b75cb00e1..7de116e557a9e1e1a2ec7f215b9215004ab900ec 100755 --- a/include/dap_client_pvt.h +++ b/include/dap_client_pvt.h @@ -50,7 +50,8 @@ typedef struct dap_client_internal char * session_key_id; - + void *curl;// curl connection descriptor + long curl_sockfd;// curl socket descriptor char * last_parsed_node; char * uplink_addr; @@ -80,7 +81,7 @@ typedef struct dap_client_internal dap_client_callback_int_t request_error_callback; } dap_client_pvt_t; -#define DAP_CLIENT_PVT(a) ((dap_client_pvt_t*) a->_internal ) +#define DAP_CLIENT_PVT(a) (a ? (dap_client_pvt_t*) a->_internal : NULL) int dap_client_pvt_init(); void dap_client_pvt_deinit(); diff --git a/src/dap_client.c b/src/dap_client.c index da44cfe8885f2828d33051734a692997b859189a..c23d698910ac950e3e195a2b04d7bf8052a50a06 100644 --- a/src/dap_client.c +++ b/src/dap_client.c @@ -199,6 +199,7 @@ void dap_client_delete(dap_client_t * a_client) //a_client->_internal = NULL; dap_client_pvt_delete(DAP_CLIENT_PVT(a_client)); + a_client->_internal = NULL; //pthread_mutex_t *l_mutex = &a_client->mutex; //memset(a_client, 0, sizeof(dap_client_t)); @@ -322,6 +323,10 @@ void dap_client_request(dap_client_t * a_client, const char * a_full_path, void int dap_client_disconnect( dap_client_t *a_client ) { dap_client_pvt_t *l_client_internal = (a_client) ? DAP_CLIENT_PVT(a_client) : NULL; + if (!l_client_internal) + return -1; + // stop connection + dap_http_client_simple_request_break(l_client_internal->curl_sockfd); if ( l_client_internal && l_client_internal->stream_socket ) { diff --git a/src/dap_client_pvt.c b/src/dap_client_pvt.c index 809fabe27f14ca18b796da7e5684b712e29a5f38..8210530440965500004c0a30a280248d70f75564 100644 --- a/src/dap_client_pvt.c +++ b/src/dap_client_pvt.c @@ -137,6 +137,9 @@ static pthread_cond_t s_cond_ref = PTHREAD_COND_INITIALIZER; int dap_client_pvt_ref(dap_client_pvt_t * a_client_internal) { + if(a_client_internal==0x7fffd8003b00){ + int dbg = 5325; + } int l_ret = 0; dap_client_pvt_ref_count_t *l_client_pvt_ref; pthread_mutex_lock(&s_mutex_ref); @@ -159,6 +162,9 @@ int dap_client_pvt_ref(dap_client_pvt_t * a_client_internal) int dap_client_pvt_unref(dap_client_pvt_t * a_client_internal) { + if(a_client_internal==0x7fffd8003b00){ + int dbg = 5325; + } int l_ret = -1; dap_client_pvt_ref_count_t *l_client_pvt_ref; pthread_mutex_lock(&s_mutex_ref); @@ -183,6 +189,22 @@ int dap_client_pvt_unref(dap_client_pvt_t * a_client_internal) return l_ret; } +int dap_client_pvt_get_ref(dap_client_pvt_t * a_client_internal) +{ + int l_ref_count = -1; + if(a_client_internal==0x7fffd8003b00){ + int dbg = 5325; + } + dap_client_pvt_ref_count_t *l_client_pvt_ref; + pthread_mutex_lock(&s_mutex_ref); + HASH_FIND(hh, s_client_pvt_ref, &a_client_internal, sizeof(dap_client_pvt_t*), l_client_pvt_ref); + if(l_client_pvt_ref) { + l_ref_count = l_client_pvt_ref->ref_count; + } + pthread_mutex_unlock(&s_mutex_ref); + return l_ref_count; +} + /** * @brief dap_client_pvt_wait * @param a_client_internal @@ -239,6 +261,7 @@ static void dap_client_pvt_delete_in(dap_client_pvt_t * a_client_pvt) { if(!a_client_pvt) return; + log_it(L_INFO, "dap_client_pvt_delete 0x%x", a_client_pvt); if(a_client_pvt->session_key_id) DAP_DELETE(a_client_pvt->session_key_id); @@ -255,6 +278,7 @@ static void dap_client_pvt_delete_in(dap_client_pvt_t * a_client_pvt) if(a_client_pvt->stream_key) dap_enc_key_delete(a_client_pvt->stream_key); + a_client_pvt->client = NULL; DAP_DELETE(a_client_pvt); } @@ -480,7 +504,9 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) log_it(L_ERROR, "Error state, doing callback if present"); if(a_client_pvt->stage_status_error_callback) { + dap_client_pvt_ref(a_client_pvt); a_client_pvt->stage_status_error_callback(a_client_pvt->client, (void*)l_is_last_attempt); + dap_client_pvt_unref(a_client_pvt); // Expecting that its one-shot callback //a_client_internal->stage_status_error_callback = NULL; } @@ -526,6 +552,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) bool l_is_last_stage = (a_client_pvt->stage == a_client_pvt->stage_target); if(l_is_last_stage) { + //l_is_unref = true; log_it(L_NOTICE, "Stage %s is achieved", dap_client_stage_str(a_client_pvt->stage)); if(a_client_pvt->stage_target_done_callback) { @@ -561,11 +588,12 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) void dap_client_pvt_stage_transaction_begin(dap_client_pvt_t * a_client_internal, dap_client_stage_t a_stage_next, dap_client_callback_t a_done_callback) { + // ref pvt client + dap_client_pvt_ref(a_client_internal); + a_client_internal->stage_status_done_callback = a_done_callback; a_client_internal->stage = a_stage_next; a_client_internal->stage_status = STAGE_STATUS_IN_PROGRESS; - // ref pvt client - dap_client_pvt_ref(a_client_internal); s_stage_status_after(a_client_internal); } @@ -598,9 +626,8 @@ void dap_client_pvt_request(dap_client_pvt_t * a_client_internal, const char * a l_url = DAP_NEW_Z_SIZE(char, l_url_size_max); snprintf(l_url, l_url_size_max, "http://%s:%u", a_client_internal->uplink_addr, a_client_internal->uplink_port); } - dap_http_client_simple_request(l_url, a_request ? "POST" : "GET", "text/text", a_request, a_request_size, - NULL, - m_request_response, m_request_error, a_client_internal, NULL); + a_client_internal->curl = dap_http_client_simple_request(l_url, a_request ? "POST" : "GET", "text/text", a_request, + a_request_size, NULL, m_request_response, m_request_error, &a_client_internal->curl_sockfd, a_client_internal, NULL); DAP_DELETE(l_url); } @@ -705,7 +732,7 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char } dap_http_client_simple_request_custom(l_url_full, a_request ? "POST" : "GET", "text/text", l_request_enc, l_request_enc_size, NULL, - m_request_response, m_request_error, a_client_internal, a_custom_new, a_custom_count); + m_request_response, a_client_internal->curl_sockfd ,m_request_error, a_client_internal, a_custom_new, a_custom_count); DAP_DELETE(l_key_hdr_str); if(l_sub_url_enc) @@ -735,7 +762,7 @@ void m_request_error(int a_err_code, void * a_obj) a_client_internal->request_error_callback(a_client_internal->client, a_err_code); } // unref pvt client - //dap_client_pvt_unref(a_client_internal); + dap_client_pvt_unref(a_client_internal); } /** @@ -749,7 +776,7 @@ void m_request_response(void * a_response, size_t a_response_size, void * a_obj) dap_client_pvt_t * a_client_internal = (dap_client_pvt_t *) a_obj; if(!a_client_internal || !a_client_internal->client) return; - + int l_ref = dap_client_pvt_get_ref(a_client_internal); if(a_client_internal->is_encrypted) { size_t l_response_dec_size_max = a_response_size ? a_response_size * 2 + 16 : 0; char * l_response_dec = a_response_size ? DAP_NEW_Z_SIZE(char, l_response_dec_size_max) : NULL; @@ -768,8 +795,10 @@ void m_request_response(void * a_response, size_t a_response_size, void * a_obj) a_client_internal->request_response_callback(a_client_internal->client, a_response, a_response_size); } + int l_ref2 = dap_client_pvt_get_ref(a_client_internal); // unref pvt client - dap_client_pvt_unref(DAP_CLIENT_PVT(a_client_internal->client)); + dap_client_pvt_unref(a_client_internal); + //dap_client_pvt_unref(DAP_CLIENT_PVT(a_client_internal->client)); } /** @@ -1047,7 +1076,7 @@ void m_stage_stream_streaming(dap_client_t * a_client, void* arg) */ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg) { - log_it(L_INFO, "====================================================== stream delete/peer reconnect"); + log_it(L_INFO, "================= stream delete/peer reconnect"); dap_client_t *l_client = DAP_CLIENT(a_es); @@ -1058,6 +1087,7 @@ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg) pthread_mutex_lock(&l_client->mutex); dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client); + log_it(L_DEBUG, "client_pvt=0x%x", l_client_pvt); if(l_client_pvt == NULL) { log_it(L_ERROR, "dap_client_pvt is not initialized"); pthread_mutex_unlock(&l_client->mutex); @@ -1067,8 +1097,8 @@ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg) dap_stream_delete(l_client_pvt->stream); l_client_pvt->stream = NULL; - if(l_client_pvt->client) - dap_client_reset(l_client_pvt->client); +// if(l_client_pvt->client && l_client_pvt->client == l_client) +// dap_client_reset(l_client_pvt->client); // l_client_pvt->client= NULL;