diff --git a/dap-sdk/net/client/dap_client.c b/dap-sdk/net/client/dap_client.c index 15f40ca49ce61311b2e07653da242ba876656d27..faab826f00fa4eaff6410be2e888cb82d4453d35 100644 --- a/dap-sdk/net/client/dap_client.c +++ b/dap-sdk/net/client/dap_client.c @@ -24,6 +24,7 @@ #include "dap_client.h" #include "dap_client_pvt.h" #include "dap_stream_ch_proc.h" +#include "dap_stream_worker.h" #define LOG_TAG "dap_client" @@ -215,28 +216,12 @@ void dap_client_delete(dap_client_t * a_client) { if(!a_client) return; - pthread_mutex_lock(&a_client->mutex); - //dap_client_disconnect(a_client); - //dap_client_reset(a_client); - - //dap_client_pvt_t *l_client_pvt = DAP_CLIENT_PVT(a_client); - // reset l_client_pvt (before removal) - //memset(l_client_pvt, 0, sizeof(dap_client_pvt_t)); - //a_client->_internal = NULL; - dap_client_pvt_delete(DAP_CLIENT_PVT(a_client)); - //a_client->_internal = NULL; - - //pthread_mutex_t *l_mutex = &a_client->mutex; - //memset(a_client, 0, sizeof(dap_client_t)); - //pthread_mutex_unlock(l_mutex); pthread_mutex_unlock(&a_client->mutex); pthread_mutex_destroy(&a_client->mutex); - // a_client will be deleted in dap_events_socket_delete() -> free( a_es->_inheritor ); - //DAP_DELETE(a_client); - DAP_DEL_Z(a_client); + DAP_DELETE(a_client); } /** @@ -257,26 +242,35 @@ void dap_client_go_stage(dap_client_t * a_client, dap_client_stage_t a_stage_tar } dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); + assert(l_client_internal); + + pthread_mutex_lock( &l_client_internal->stage_mutex); l_client_internal->stage_target = a_stage_target; l_client_internal->stage_target_done_callback = a_stage_end_callback; - if(a_stage_target != l_client_internal->stage ){ // Going to stages downstairs - switch(l_client_internal->stage_status ){ + + dap_client_stage_t l_cur_stage = l_client_internal->stage; + dap_client_stage_status_t l_cur_stage_status= l_client_internal->stage_status; + pthread_mutex_unlock( &l_client_internal->stage_mutex); + + + if(a_stage_target != l_cur_stage ){ // Going to stages downstairs + switch(l_cur_stage_status ){ case STAGE_STATUS_ABORTING: log_it(L_ERROR, "Already aborting the stage %s" - , dap_client_stage_str(l_client_internal->stage)); + , dap_client_stage_str(l_cur_stage)); break; case STAGE_STATUS_IN_PROGRESS:{ log_it(L_WARNING, "Status progress the stage %s" - , dap_client_stage_str(l_client_internal->stage)); + , dap_client_stage_str(l_cur_stage)); }break; case STAGE_STATUS_DONE: case STAGE_STATUS_ERROR: default: { log_it(L_DEBUG, "Start transitions chain to %s" - ,dap_client_stage_str(l_client_internal->stage_target) ); - int step = (a_stage_target > l_client_internal->stage)?1:-1; + ,dap_client_stage_str(a_stage_target) ); + int step = (a_stage_target > l_cur_stage)?1:-1; dap_client_pvt_stage_transaction_begin(l_client_internal, - l_client_internal->stage+step, + l_cur_stage+step, m_stage_fsm_operator ); } @@ -284,8 +278,10 @@ void dap_client_go_stage(dap_client_t * a_client, dap_client_stage_t a_stage_tar }else{ // Same stage log_it(L_ERROR,"We're already on stage %s",dap_client_stage_str(a_stage_target)); } + } + /** * @brief m_stage_fsm_operator * @param a_client diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 3eb3a14148778e6d94193e62e9f712b783018c73..f3627a7042a4691d100d0966f57ce16f75dcc45c 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -80,8 +80,6 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_internal); -const static dap_enc_key_type_t s_dap_client_pvt_preferred_encryption_type = DAP_ENC_KEY_TYPE_IAES; - // ENC stage callbacks void m_enc_init_response(dap_client_t *, void *, size_t); void m_enc_init_error(dap_client_t *, int); @@ -130,181 +128,42 @@ void dap_client_pvt_new(dap_client_pvt_t * a_client_internal) a_client_internal->stage_status = STAGE_STATUS_DONE; a_client_internal->uplink_protocol_version = DAP_PROTOCOL_VERSION; a_client_internal->events = dap_events_get_default(); + pthread_mutex_init( &a_client_internal->disconnected_mutex, NULL); + pthread_cond_init( &a_client_internal->disconnected_cond, NULL); + pthread_mutex_init( &a_client_internal->stage_mutex, NULL); // add to list dap_client_pvt_hh_add(a_client_internal); } -typedef struct dap_client_pvt_ref_count { - dap_client_pvt_t *client_internal; - uint32_t ref_count; - UT_hash_handle hh; -} dap_client_pvt_ref_count_t; - -//static dap_client_pvt_ref_count_t *s_client_pvt_ref = NULL; -//static pthread_mutex_t s_mutex_ref = PTHREAD_MUTEX_INITIALIZER; -//static pthread_cond_t s_cond_ref = PTHREAD_COND_INITIALIZER; - -/* -int dap_client_pvt_ref(dap_client_pvt_t * a_client_internal) -{ - if(a_client_internal==0x7fffd8003b00){ - int dbg = 5325; - } - int l_ret = 0; - dap_client_pvt_ref_count_t *l_client_pvt_ref; - pthread_mutex_lock(&s_mutex_ref); - HASH_FIND(hh, s_client_pvt_ref, &a_client_internal, sizeof(dap_client_pvt_t*), l_client_pvt_ref); - if(!l_client_pvt_ref) { - l_client_pvt_ref = DAP_NEW_Z(dap_client_pvt_ref_count_t); - l_client_pvt_ref->client_internal = a_client_internal; - l_client_pvt_ref->ref_count = 1; - HASH_ADD(hh, s_client_pvt_ref, client_internal, sizeof(dap_client_pvt_t*), l_client_pvt_ref); - } - else { - l_client_pvt_ref->ref_count++; - } - l_ret = l_client_pvt_ref->ref_count; - //printf("** ref %d %x\n\n", l_client_pvt_ref->ref_count, a_client_internal); - pthread_mutex_unlock(&s_mutex_ref); - - return l_ret; -} - -int dap_client_pvt_unref(dap_client_pvt_t * a_client_internal) -{ - if(a_client_internal==0x7fffd8003b00){ - int dbg = 5325; - } - int l_ret = -1; - dap_client_pvt_ref_count_t *l_client_pvt_ref; - pthread_mutex_lock(&s_mutex_ref); - HASH_FIND(hh, s_client_pvt_ref, &a_client_internal, sizeof(dap_client_pvt_t*), l_client_pvt_ref); - if(l_client_pvt_ref) { - if(l_client_pvt_ref->ref_count <= 1) { - HASH_DELETE(hh, s_client_pvt_ref, l_client_pvt_ref); - DAP_DELETE(l_client_pvt_ref); - pthread_cond_broadcast(&s_cond_ref); - l_ret = 0; - } - else { - l_client_pvt_ref->ref_count--; - l_ret = l_client_pvt_ref->ref_count; - } - } - else{ - l_ret = -1; - } - //printf("** unref %d %x\n\n", l_ret, a_client_internal); - pthread_mutex_unlock(&s_mutex_ref); - return l_ret; -} - -int dap_client_pvt_get_ref(dap_client_pvt_t * a_client_internal) -{ - int l_ref_count = -1; - if(a_client_internal==0x7fffd8003b00){ - int dbg = 5325; - } - dap_client_pvt_ref_count_t *l_client_pvt_ref; - pthread_mutex_lock(&s_mutex_ref); - HASH_FIND(hh, s_client_pvt_ref, &a_client_internal, sizeof(dap_client_pvt_t*), l_client_pvt_ref); - if(l_client_pvt_ref) { - l_ref_count = l_client_pvt_ref->ref_count; - } - pthread_mutex_unlock(&s_mutex_ref); - return l_ref_count; -} - -int dap_client_pvt_wait_unref(dap_client_pvt_t * a_client_internal, int a_timeout_ms) +/** + * @brief s_client_pvt_disconnected + * @param a_client + * @param a_arg + */ +static void s_client_pvt_disconnected(dap_client_t * a_client, void * a_arg ) { - if(!a_client_internal) - return -1; - int l_ret = 0; - dap_client_pvt_ref_count_t *l_client_pvt_ref; - do { - pthread_mutex_lock(&s_mutex_ref); - HASH_FIND(hh, s_client_pvt_ref, &a_client_internal, sizeof(dap_client_pvt_t*), l_client_pvt_ref); - // wait for release a_client_internal - if(l_client_pvt_ref) { - struct timeval now; - struct timespec l_to; - gettimeofday(&now, 0); - l_to.tv_sec = now.tv_sec; // sec - l_to.tv_nsec = now.tv_usec * 1000; // nsec - int64_t l_nsec_new = l_to.tv_nsec + a_timeout_ms * 1000000ll; - // if the new number of nanoseconds is more than a second - if(l_nsec_new > (long) 1e9) { - l_to.tv_sec += l_nsec_new / (long) 1e9; - l_to.tv_nsec = l_nsec_new % (long) 1e9; - } - else - l_to.tv_nsec = (long) l_nsec_new; - int l_res = pthread_cond_timedwait(&s_cond_ref, &s_mutex_ref, &l_to); - if(l_res == ETIMEDOUT) { - l_ret = -1; - } - else { - //a_timeout_ms = 0; - pthread_mutex_unlock(&s_mutex_ref); - continue; - } - } - else - l_ret = 0; - - //printf("** end wait %x\n\n", a_client_internal); - pthread_mutex_unlock(&s_mutex_ref); - } - while(l_client_pvt_ref); - return l_ret; + (void) a_arg; + pthread_mutex_lock(&DAP_CLIENT_PVT(a_client)->disconnected_mutex); + pthread_mutex_unlock(&DAP_CLIENT_PVT(a_client)->disconnected_mutex); + pthread_cond_broadcast(&DAP_CLIENT_PVT(a_client)->disconnected_cond); } -*/ /** * @brief dap_client_disconnect * @param a_client * @return */ -int dap_client_pvt_disconnect(dap_client_pvt_t *a_client_pvt) +int dap_client_pvt_disconnect_all(dap_client_pvt_t *a_client_pvt) { //dap_client_pvt_t *a_client_pvt = (a_client) ? DAP_CLIENT_PVT(a_client) : NULL; if(!a_client_pvt) return -1; - // stop connection - //dap_http_client_simple_request_break(l_client_internal->curl_sockfd); - - if(a_client_pvt && a_client_pvt->stream_socket) { - -// if ( l_client_internal->stream_es ) { -// dap_events_socket_remove_and_delete( l_client_internal->stream_es, true ); -// l_client_internal->stream_es = NULL; -// } - -// l_client_internal->stream_es->signal_close = true; - // start stopping connection - if(a_client_pvt->stream_es ) { - dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_es->worker, a_client_pvt->stream_es); - int l_counter = 0; - // wait for stop of connection (max 0.7 sec.) - while(a_client_pvt->stream_es && l_counter < 70) { - dap_usleep(DAP_USEC_PER_SEC / 100); - l_counter++; - } - } -// if (l_client_internal->stream_socket ) { -// close (l_client_internal->stream_socket); -// l_client_internal->stream_socket = 0; -// } - - return 1; - } - //l_client_internal->stream_socket = 0; - - a_client_pvt->is_reconnect = false; - log_it(L_DEBUG, "dap_client_pvt_disconnect() done"); + pthread_mutex_lock(&a_client_pvt->disconnected_mutex); + dap_client_go_stage(a_client_pvt->client, STAGE_BEGIN, s_client_pvt_disconnected ); + pthread_cond_wait(&a_client_pvt->disconnected_cond, &a_client_pvt->disconnected_mutex); - return -1; + return 0; } /** @@ -321,7 +180,7 @@ static void dap_client_pvt_delete_in(dap_client_pvt_t * a_client_pvt) return; } - dap_client_pvt_disconnect(a_client_pvt); + dap_client_pvt_disconnect_all(a_client_pvt); log_it(L_INFO, "dap_client_pvt_delete 0x%x", a_client_pvt); @@ -345,6 +204,9 @@ static void dap_client_pvt_delete_in(dap_client_pvt_t * a_client_pvt) dap_enc_key_delete(a_client_pvt->stream_key); a_client_pvt->stream_key = NULL; + pthread_mutex_destroy( &a_client_pvt->disconnected_mutex); + pthread_cond_destroy( &a_client_pvt->disconnected_cond); + pthread_mutex_destroy(&a_client_pvt->stage_mutex); //a_client_pvt->client = NULL; // DAP_DELETE(a_client_pvt); } @@ -372,27 +234,6 @@ void dap_client_pvt_delete(dap_client_pvt_t * a_client_pvt) dap_client_pvt_delete_in(a_client_pvt); } -/** - * Make socket non-blocking / blocking - * is_nonblock - (true) non-blocking / (false) blocking - */ -static void s_set_sock_nonblock(int sockfd, bool is_nonblock) -{ -// for Windows -#ifdef _WIN32 - unsigned long arg = is_nonblock; - ioctlsocket((SOCKET)sockfd, FIONBIO, &arg); -// for Unix-like OS -#else - int arg = fcntl(sockfd, F_GETFL, NULL); - if(is_nonblock) - arg |= O_NONBLOCK; - else - arg |= ~O_NONBLOCK; - fcntl(sockfd, F_SETFL, arg); -#endif -} - /** * @brief s_client_internal_stage_status_proc * @param a_client @@ -400,297 +241,293 @@ static void s_set_sock_nonblock(int sockfd, bool is_nonblock) static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) { //bool l_is_unref = false; - switch (a_client_pvt->stage_status) { - case STAGE_STATUS_IN_PROGRESS: { - switch (a_client_pvt->stage) { - case STAGE_ENC_INIT: { - log_it(L_INFO, "Go to stage ENC: prepare the request"); - a_client_pvt->session_key_open = dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_MSRLN, NULL, 0, NULL, 0, 0); - if (!a_client_pvt->session_key_open) { - log_it(L_ERROR, "Insufficient memory! May be a huge memory leak present"); - a_client_pvt->stage_status = STAGE_STATUS_ERROR; - break; - } - size_t l_key_size = a_client_pvt->session_key_open->pub_key_data_size; - dap_cert_t *l_cert = a_client_pvt->auth_cert; - dap_sign_t *l_sign = NULL; - size_t l_sign_size = 0; - if (l_cert) { - l_sign = dap_sign_create(l_cert->enc_key, a_client_pvt->session_key_open->pub_key_data, l_key_size, 0); - l_sign_size = dap_sign_get_size(l_sign); - } - uint8_t l_data[l_key_size + l_sign_size]; - memcpy(l_data,a_client_pvt->session_key_open->pub_key_data, l_key_size); - if (l_sign) { - memcpy(l_data + l_key_size, l_sign, l_sign_size); - } - size_t l_data_str_size_max = DAP_ENC_BASE64_ENCODE_SIZE(l_key_size + l_sign_size); - char l_data_str[l_data_str_size_max + 1]; - // DAP_ENC_DATA_TYPE_B64_URLSAFE not need because send it by POST request - size_t l_data_str_enc_size = dap_enc_base64_encode(l_data, l_key_size + l_sign_size, l_data_str, DAP_ENC_DATA_TYPE_B64); - log_it(L_DEBUG, "ENC request size %u", l_data_str_enc_size); - int l_res = dap_client_pvt_request(a_client_pvt, DAP_UPLINK_PATH_ENC_INIT "/gd4y5yh78w42aaagh", - l_data_str, l_data_str_enc_size, m_enc_init_response, m_enc_init_error); - // bad request - if(l_res<0){ - a_client_pvt->stage_status = STAGE_STATUS_ERROR; - } - } - break; - case STAGE_STREAM_CTL: { - log_it(L_INFO, "Go to stage STREAM_CTL: prepare the request"); - - char *l_request = dap_strdup_printf("%d", DAP_CLIENT_PROTOCOL_VERSION); - size_t l_request_size = dap_strlen(l_request); - log_it(L_DEBUG, "STREAM_CTL request size %u", strlen(l_request)); + case STAGE_STATUS_IN_PROGRESS: { + switch (a_client_pvt->stage) { + case STAGE_ENC_INIT: { + log_it(L_INFO, "Go to stage ENC: prepare the request"); + a_client_pvt->session_key_open = dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_MSRLN, NULL, 0, NULL, 0, 0); + if (!a_client_pvt->session_key_open) { + log_it(L_ERROR, "Insufficient memory! May be a huge memory leak present"); + a_client_pvt->stage_status = STAGE_STATUS_ERROR; + break; + } + size_t l_key_size = a_client_pvt->session_key_open->pub_key_data_size; + dap_cert_t *l_cert = a_client_pvt->auth_cert; + dap_sign_t *l_sign = NULL; + size_t l_sign_size = 0; + if (l_cert) { + l_sign = dap_sign_create(l_cert->enc_key, a_client_pvt->session_key_open->pub_key_data, l_key_size, 0); + l_sign_size = dap_sign_get_size(l_sign); + } + uint8_t l_data[l_key_size + l_sign_size]; + memcpy(l_data,a_client_pvt->session_key_open->pub_key_data, l_key_size); + if (l_sign) { + memcpy(l_data + l_key_size, l_sign, l_sign_size); + } + size_t l_data_str_size_max = DAP_ENC_BASE64_ENCODE_SIZE(l_key_size + l_sign_size); + char l_data_str[l_data_str_size_max + 1]; + // DAP_ENC_DATA_TYPE_B64_URLSAFE not need because send it by POST request + size_t l_data_str_enc_size = dap_enc_base64_encode(l_data, l_key_size + l_sign_size, l_data_str, DAP_ENC_DATA_TYPE_B64); + log_it(L_DEBUG, "ENC request size %u", l_data_str_enc_size); + int l_res = dap_client_pvt_request(a_client_pvt, DAP_UPLINK_PATH_ENC_INIT "/gd4y5yh78w42aaagh", + l_data_str, l_data_str_enc_size, m_enc_init_response, m_enc_init_error); + // bad request + if(l_res<0){ + a_client_pvt->stage_status = STAGE_STATUS_ERROR; + } + } + break; + case STAGE_STREAM_CTL: { + log_it(L_INFO, "Go to stage STREAM_CTL: prepare the request"); - char *l_suburl; + char *l_request = dap_strdup_printf("%d", DAP_CLIENT_PROTOCOL_VERSION); + size_t l_request_size = dap_strlen(l_request); + log_it(L_DEBUG, "STREAM_CTL request size %u", strlen(l_request)); - uint32_t l_least_common_dap_protocol = min(a_client_pvt->remote_protocol_version, - a_client_pvt->uplink_protocol_version); + char *l_suburl; - if(l_least_common_dap_protocol < 23){ - l_suburl = dap_strdup_printf("stream_ctl,channels=%s", - a_client_pvt->active_channels); - }else{ - l_suburl = dap_strdup_printf("stream_ctl,channels=%s,enc_type=%d,enc_headers=%d", - a_client_pvt->active_channels,dap_stream_get_preferred_encryption_type(),0); - } - // - dap_client_pvt_request_enc(a_client_pvt, - DAP_UPLINK_PATH_STREAM_CTL, - l_suburl, "type=tcp,maxconn=4", l_request, l_request_size, - m_stream_ctl_response, m_stream_ctl_error); - DAP_DELETE(l_request); - DAP_DELETE(l_suburl); - } - break; - case STAGE_STREAM_SESSION: { - log_it(L_INFO, "Go to stage STREAM_SESSION: process the state ops"); - - a_client_pvt->stream_socket = socket( PF_INET, SOCK_STREAM, 0); - if (a_client_pvt->stream_socket == -1) { - log_it(L_ERROR, "Error %d with socket create", errno); - a_client_pvt->stage_status = STAGE_STATUS_ERROR; - break; - } -#ifdef _WIN32 - { - int buffsize = 65536; - int optsize = sizeof( int ); - setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (char *)&buffsize, &optsize ); - setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, &optsize ); - } -#else - int buffsize = 65536; - setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (const void *) &buffsize, sizeof(int)); - setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (const void *) &buffsize, sizeof(int)); -#endif + uint32_t l_least_common_dap_protocol = min(a_client_pvt->remote_protocol_version, + a_client_pvt->uplink_protocol_version); - // Wrap socket and setup callbacks - static dap_events_socket_callbacks_t l_s_callbacks = { - .read_callback = m_es_stream_read, - .write_callback = m_es_stream_write, - .error_callback = m_es_stream_error, - .delete_callback = m_es_stream_delete - }; - a_client_pvt->stream_es = dap_events_socket_wrap_no_add(a_client_pvt->events, - a_client_pvt->stream_socket, &l_s_callbacks); - dap_worker_t * l_worker = dap_events_worker_get_auto(); - assert(l_worker); - assert(l_worker->_inheritor); - a_client_pvt->stream_worker = DAP_STREAM_WORKER(l_worker); - // add to dap_worker - dap_events_socket_assign_on_worker_mt(a_client_pvt->stream_es, l_worker); - - a_client_pvt->stream_es->_inheritor = a_client_pvt;//->client; - a_client_pvt->stream = dap_stream_new_es_client(a_client_pvt->stream_es); - assert(a_client_pvt->stream); - a_client_pvt->stream->is_client_to_uplink = true; - a_client_pvt->stream->session = dap_stream_session_pure_new(); // may be from in packet? - - // new added, whether it is necessary? - a_client_pvt->stream->session->key = a_client_pvt->stream_key; - a_client_pvt->stream->stream_worker = a_client_pvt->stream_worker; - - - // connect - struct sockaddr_in l_remote_addr; - memset(&l_remote_addr, 0, sizeof(l_remote_addr)); - l_remote_addr.sin_family = AF_INET; - l_remote_addr.sin_port = htons(a_client_pvt->uplink_port); - if(inet_pton(AF_INET, a_client_pvt->uplink_addr, &(l_remote_addr.sin_addr)) < 0) { - log_it(L_ERROR, "Wrong remote address '%s:%u'", a_client_pvt->uplink_addr, a_client_pvt->uplink_port); - //close(a_client_pvt->stream_socket); - dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es); - //a_client_pvt->stream_socket = 0; - a_client_pvt->stage_status = STAGE_STATUS_ERROR; - } - else { - int l_err = 0; - if((l_err = connect(a_client_pvt->stream_socket, (struct sockaddr *) &l_remote_addr, - sizeof(struct sockaddr_in))) != -1) { - - // a_client_pvt->stream_es->flags &= ~DAP_SOCK_SIGNAL_CLOSE;// ??? what it was? Why out of esocket context??? - //s_set_sock_nonblock(a_client_pvt->stream_socket, false); - log_it(L_INFO, "Remote address connected (%s:%u) with sock_id %d (assign on worker #%u)", a_client_pvt->uplink_addr, - a_client_pvt->uplink_port, a_client_pvt->stream_socket, l_worker->id); - a_client_pvt->stage_status = STAGE_STATUS_DONE; - } - else { - log_it(L_ERROR, "Remote address can't connected (%s:%u) with sock_id %d", a_client_pvt->uplink_addr, - a_client_pvt->uplink_port); - dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es); - //close(a_client_pvt->stream_socket); - a_client_pvt->stream_socket = 0; - a_client_pvt->stage_status = STAGE_STATUS_ERROR; + if(l_least_common_dap_protocol < 23){ + l_suburl = dap_strdup_printf("stream_ctl,channels=%s", + a_client_pvt->active_channels); + }else{ + l_suburl = dap_strdup_printf("stream_ctl,channels=%s,enc_type=%d,enc_headers=%d", + a_client_pvt->active_channels,dap_stream_get_preferred_encryption_type(),0); + } + // + dap_client_pvt_request_enc(a_client_pvt, + DAP_UPLINK_PATH_STREAM_CTL, + l_suburl, "type=tcp,maxconn=4", l_request, l_request_size, + m_stream_ctl_response, m_stream_ctl_error); + DAP_DELETE(l_request); + DAP_DELETE(l_suburl); } - } - s_stage_status_after(a_client_pvt); + break; + case STAGE_STREAM_SESSION: { + log_it(L_INFO, "Go to stage STREAM_SESSION: process the state ops"); + + a_client_pvt->stream_socket = socket( PF_INET, SOCK_STREAM, 0); + if (a_client_pvt->stream_socket == -1) { + log_it(L_ERROR, "Error %d with socket create", errno); + a_client_pvt->stage_status = STAGE_STATUS_ERROR; + break; + } + #ifdef _WIN32 + { + int buffsize = 65536; + int optsize = sizeof( int ); + setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (char *)&buffsize, &optsize ); + setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, &optsize ); + } + #else + int buffsize = 65536; + setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (const void *) &buffsize, sizeof(int)); + setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (const void *) &buffsize, sizeof(int)); + #endif + + // Wrap socket and setup callbacks + static dap_events_socket_callbacks_t l_s_callbacks = { + .read_callback = m_es_stream_read, + .write_callback = m_es_stream_write, + .error_callback = m_es_stream_error, + .delete_callback = m_es_stream_delete + }; + a_client_pvt->stream_es = dap_events_socket_wrap_no_add(a_client_pvt->events, + a_client_pvt->stream_socket, &l_s_callbacks); + dap_worker_t * l_worker = dap_events_worker_get_auto(); + assert(l_worker); + assert(l_worker->_inheritor); + a_client_pvt->stream_worker = DAP_STREAM_WORKER(l_worker); + // add to dap_worker + dap_events_socket_assign_on_worker_mt(a_client_pvt->stream_es, l_worker); + + a_client_pvt->stream_es->_inheritor = a_client_pvt;//->client; + a_client_pvt->stream = dap_stream_new_es_client(a_client_pvt->stream_es); + assert(a_client_pvt->stream); + a_client_pvt->stream->is_client_to_uplink = true; + a_client_pvt->stream->session = dap_stream_session_pure_new(); // may be from in packet? + + // new added, whether it is necessary? + a_client_pvt->stream->session->key = a_client_pvt->stream_key; + a_client_pvt->stream->stream_worker = a_client_pvt->stream_worker; + + + // connect + struct sockaddr_in l_remote_addr; + memset(&l_remote_addr, 0, sizeof(l_remote_addr)); + l_remote_addr.sin_family = AF_INET; + l_remote_addr.sin_port = htons(a_client_pvt->uplink_port); + if(inet_pton(AF_INET, a_client_pvt->uplink_addr, &(l_remote_addr.sin_addr)) < 0) { + log_it(L_ERROR, "Wrong remote address '%s:%u'", a_client_pvt->uplink_addr, a_client_pvt->uplink_port); + //close(a_client_pvt->stream_socket); + dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es); + //a_client_pvt->stream_socket = 0; + a_client_pvt->stage_status = STAGE_STATUS_ERROR; + } + else { + int l_err = 0; + if((l_err = connect(a_client_pvt->stream_socket, (struct sockaddr *) &l_remote_addr, + sizeof(struct sockaddr_in))) != -1) { + + // a_client_pvt->stream_es->flags &= ~DAP_SOCK_SIGNAL_CLOSE;// ??? what it was? Why out of esocket context??? + //s_set_sock_nonblock(a_client_pvt->stream_socket, false); + log_it(L_INFO, "Remote address connected (%s:%u) with sock_id %d (assign on worker #%u)", a_client_pvt->uplink_addr, + a_client_pvt->uplink_port, a_client_pvt->stream_socket, l_worker->id); + a_client_pvt->stage_status = STAGE_STATUS_DONE; + } + else { + log_it(L_ERROR, "Remote address can't connected (%s:%u) with sock_id %d", a_client_pvt->uplink_addr, + a_client_pvt->uplink_port); + dap_events_socket_remove_and_delete_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es); + //close(a_client_pvt->stream_socket); + a_client_pvt->stream_socket = 0; + a_client_pvt->stage_status = STAGE_STATUS_ERROR; + } + } + s_stage_status_after(a_client_pvt); - } - break; - case STAGE_STREAM_CONNECTED: { - log_it(L_INFO, "Go to stage STAGE_STREAM_CONNECTED"); - size_t count_channels = a_client_pvt->active_channels? strlen(a_client_pvt->active_channels) : 0; - for(size_t i = 0; i < count_channels; i++) { - dap_stream_ch_new(a_client_pvt->stream, (uint8_t) a_client_pvt->active_channels[i]); - //sid->channel[i]->ready_to_write = true; - } + } + break; + case STAGE_STREAM_CONNECTED: { + log_it(L_INFO, "Go to stage STAGE_STREAM_CONNECTED"); + size_t count_channels = a_client_pvt->active_channels? strlen(a_client_pvt->active_channels) : 0; + for(size_t i = 0; i < count_channels; i++) { + dap_stream_ch_new(a_client_pvt->stream, (uint8_t) a_client_pvt->active_channels[i]); + //sid->channel[i]->ready_to_write = true; + } - char* l_full_path = NULL; - const char * l_path = "stream"; - const char *l_suburl = "globaldb"; - int l_full_path_size = snprintf(l_full_path, 0, "%s/%s?session_id=%s", DAP_UPLINK_PATH_STREAM, l_suburl, - dap_client_get_stream_id(a_client_pvt->client)); - l_full_path = DAP_NEW_Z_SIZE(char, l_full_path_size + 1); - snprintf(l_full_path, l_full_path_size + 1, "%s/%s?session_id=%s", DAP_UPLINK_PATH_STREAM, l_suburl, - dap_client_get_stream_id(a_client_pvt->client)); + char* l_full_path = NULL; + const char * l_path = "stream"; + const char *l_suburl = "globaldb"; + int l_full_path_size = snprintf(l_full_path, 0, "%s/%s?session_id=%s", DAP_UPLINK_PATH_STREAM, l_suburl, + dap_client_get_stream_id(a_client_pvt->client)); + l_full_path = DAP_NEW_Z_SIZE(char, l_full_path_size + 1); + snprintf(l_full_path, l_full_path_size + 1, "%s/%s?session_id=%s", DAP_UPLINK_PATH_STREAM, l_suburl, + dap_client_get_stream_id(a_client_pvt->client)); - //dap_client_request(a_client_pvt->client, l_full_path, "12345", 0, m_stream_response, m_stream_error); + //dap_client_request(a_client_pvt->client, l_full_path, "12345", 0, m_stream_response, m_stream_error); - const char *l_add_str = ""; + const char *l_add_str = ""; - dap_events_socket_write_f_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es, "GET /%s HTTP/1.1\r\n" - "Host: %s:%d%s\r\n" - "\r\n", - l_full_path, a_client_pvt->uplink_addr, a_client_pvt->uplink_port, l_add_str); - DAP_DELETE(l_full_path); + dap_events_socket_write_f_mt(a_client_pvt->stream_worker->worker, a_client_pvt->stream_es, "GET /%s HTTP/1.1\r\n" + "Host: %s:%d%s\r\n" + "\r\n", + l_full_path, a_client_pvt->uplink_addr, a_client_pvt->uplink_port, l_add_str); + DAP_DELETE(l_full_path); - a_client_pvt->stage_status = STAGE_STATUS_DONE; - s_stage_status_after(a_client_pvt); - } - break; - case STAGE_STREAM_STREAMING: { - log_it(L_INFO, "Go to stage STAGE_STREAM_STREAMING"); + a_client_pvt->stage_status = STAGE_STATUS_DONE; + s_stage_status_after(a_client_pvt); + } + break; + case STAGE_STREAM_STREAMING: { + log_it(L_INFO, "Go to stage STAGE_STREAM_STREAMING"); - a_client_pvt->stage_status = STAGE_STATUS_DONE; - s_stage_status_after(a_client_pvt); + a_client_pvt->stage_status = STAGE_STATUS_DONE; + s_stage_status_after(a_client_pvt); - } - break; + } + break; - default: { - log_it(L_ERROR, "Undefined proccessing actions for stage status %s", - dap_client_stage_status_str(a_client_pvt->stage_status)); - a_client_pvt->stage_status = STAGE_STATUS_ERROR; - s_stage_status_after(a_client_pvt); // be carefull to not to loop! - } - } - } - break; -// case STAGE_STATUS_ABORTING: { -// log_it(L_ERROR, "Aborting state"); -// } - break; - case STAGE_STATUS_ERROR: { - // limit the number of attempts - int MAX_ATTEMPTS = 3; - a_client_pvt->connect_attempt++; - bool l_is_last_attempt = a_client_pvt->connect_attempt > MAX_ATTEMPTS ? true : false; - - log_it(L_ERROR, "Error state, doing callback if present"); - if(a_client_pvt->stage_status_error_callback) { - //dap_client_pvt_ref(a_client_pvt); - if(a_client_pvt == a_client_pvt->client->_internal) - a_client_pvt->stage_status_error_callback(a_client_pvt->client, (void*) l_is_last_attempt); - else { - log_it(L_ERROR, "client_pvt->client=%x corrupted", a_client_pvt->client->_internal); + default: { + log_it(L_ERROR, "Undefined proccessing actions for stage status %s", + dap_client_stage_status_str(a_client_pvt->stage_status)); + a_client_pvt->stage_status = STAGE_STATUS_ERROR; + s_stage_status_after(a_client_pvt); // be carefull to not to loop! + } } - //dap_client_pvt_unref(a_client_pvt); - // Expecting that its one-shot callback - //a_client_internal->stage_status_error_callback = NULL; } - if(a_client_pvt->stage_target == STAGE_STREAM_ABORT) { - a_client_pvt->stage = STAGE_STREAM_ABORT; - a_client_pvt->stage_status = STAGE_STATUS_ABORTING; - // unref pvt - //l_is_unref = true; - } else if (a_client_pvt->last_error != ERROR_NETWORK_CONNECTION_TIMEOUT) { - if(!l_is_last_attempt) { - // small delay before next request - log_it(L_INFO, "Connection attempt %d", a_client_pvt->connect_attempt); -#ifdef _WIN32 - Sleep(300);// 0.3 sec -#else - usleep(300000);// 0.3 sec -#endif - a_client_pvt->stage = STAGE_ENC_INIT; - // Trying the step again - a_client_pvt->stage_status = STAGE_STATUS_IN_PROGRESS; + break; + // case STAGE_STATUS_ABORTING: { + // log_it(L_ERROR, "Aborting state"); + // } + break; + case STAGE_STATUS_ERROR: { + // limit the number of attempts + int MAX_ATTEMPTS = 3; + a_client_pvt->connect_attempt++; + bool l_is_last_attempt = a_client_pvt->connect_attempt > MAX_ATTEMPTS ? true : false; + + log_it(L_ERROR, "Error state, doing callback if present"); + if(a_client_pvt->stage_status_error_callback) { //dap_client_pvt_ref(a_client_pvt); - s_stage_status_after(a_client_pvt); + if(a_client_pvt == a_client_pvt->client->_internal) + a_client_pvt->stage_status_error_callback(a_client_pvt->client, (void*) l_is_last_attempt); + else { + log_it(L_ERROR, "client_pvt->client=%x corrupted", a_client_pvt->client->_internal); + } + //dap_client_pvt_unref(a_client_pvt); + // Expecting that its one-shot callback + //a_client_internal->stage_status_error_callback = NULL; } - else{ - log_it(L_INFO, "Too many connection attempts. Tries are over."); - a_client_pvt->stage_status = STAGE_STATUS_DONE; + if(a_client_pvt->stage_target == STAGE_STREAM_ABORT) { + a_client_pvt->stage = STAGE_STREAM_ABORT; + a_client_pvt->stage_status = STAGE_STATUS_ABORTING; // unref pvt //l_is_unref = true; + } else if (a_client_pvt->last_error != ERROR_NETWORK_CONNECTION_TIMEOUT) { + if(!l_is_last_attempt) { + // small delay before next request + log_it(L_INFO, "Connection attempt %d", a_client_pvt->connect_attempt); + #ifdef _WIN32 + Sleep(300);// 0.3 sec + #else + usleep(300000);// 0.3 sec + #endif + a_client_pvt->stage = STAGE_ENC_INIT; + // Trying the step again + a_client_pvt->stage_status = STAGE_STATUS_IN_PROGRESS; + //dap_client_pvt_ref(a_client_pvt); + s_stage_status_after(a_client_pvt); + } + else{ + log_it(L_INFO, "Too many connection attempts. Tries are over."); + a_client_pvt->stage_status = STAGE_STATUS_DONE; + // unref pvt + //l_is_unref = true; + } } } - } - break; - case STAGE_STATUS_DONE: { - log_it(L_INFO, "Stage status %s is done", - dap_client_stage_str(a_client_pvt->stage)); - // go to next stage - if(a_client_pvt->stage_status_done_callback) { - a_client_pvt->stage_status_done_callback(a_client_pvt->client, NULL); - // Expecting that its one-shot callback - //a_client_internal->stage_status_done_callback = NULL; - } else - log_it(L_WARNING, "Stage done callback is not present"); - - bool l_is_last_stage = (a_client_pvt->stage == a_client_pvt->stage_target); - if(l_is_last_stage) { - //l_is_unref = true; - if(a_client_pvt->stage_target_done_callback) { - log_it(L_NOTICE, "Stage %s is achieved", - dap_client_stage_str(a_client_pvt->stage)); - a_client_pvt->stage_target_done_callback(a_client_pvt->client, NULL); + break; + case STAGE_STATUS_DONE: { + log_it(L_INFO, "Stage status %s is done", + dap_client_stage_str(a_client_pvt->stage)); + // go to next stage + if(a_client_pvt->stage_status_done_callback) { + a_client_pvt->stage_status_done_callback(a_client_pvt->client, NULL); // Expecting that its one-shot callback - a_client_pvt->stage_target_done_callback = NULL; + //a_client_internal->stage_status_done_callback = NULL; + } else + log_it(L_WARNING, "Stage done callback is not present"); + + bool l_is_last_stage = (a_client_pvt->stage == a_client_pvt->stage_target); + if(l_is_last_stage) { + //l_is_unref = true; + if(a_client_pvt->stage_target_done_callback) { + log_it(L_NOTICE, "Stage %s is achieved", + dap_client_stage_str(a_client_pvt->stage)); + a_client_pvt->stage_target_done_callback(a_client_pvt->client, NULL); + // Expecting that its one-shot callback + a_client_pvt->stage_target_done_callback = NULL; + } + } else if (a_client_pvt->stage != STAGE_STREAM_CTL) { // need wait for dap_client_pvt_request_enc response + log_it(L_ERROR, "!! dap_CLIENT_STAGE_STATUS_DONE but not l_is_last_stage (cur stage=%d, target=%d)!!",a_client_pvt->stage, a_client_pvt->stage_target); } - } else if (a_client_pvt->stage != STAGE_STREAM_CTL) { // need wait for dap_client_pvt_request_enc response - log_it(L_ERROR, "!! dap_CLIENT_STAGE_STATUS_DONE but not l_is_last_stage (cur stage=%d, target=%d)!!",a_client_pvt->stage, a_client_pvt->stage_target); + //l_is_unref = true; } - //l_is_unref = true; - } - break; - default: - log_it(L_ERROR, "Undefined proccessing actions for stage status %s", - dap_client_stage_status_str(a_client_pvt->stage_status)); + break; + default: + log_it(L_ERROR, "Undefined proccessing actions for stage status %s", + dap_client_stage_status_str(a_client_pvt->stage_status)); } if(a_client_pvt->stage_status_callback) a_client_pvt->stage_status_callback(a_client_pvt->client, NULL); - //if(l_is_unref) { - // unref pvt - //dap_client_pvt_unref(a_client_pvt); - //} + } /** @@ -702,13 +539,13 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) void dap_client_pvt_stage_transaction_begin(dap_client_pvt_t * a_client_internal, dap_client_stage_t a_stage_next, dap_client_callback_t a_done_callback) { - // ref pvt client - //dap_client_pvt_ref(a_client_internal); + pthread_mutex_lock( &a_client_internal->stage_mutex); a_client_internal->stage_status_done_callback = a_done_callback; a_client_internal->stage = a_stage_next; a_client_internal->stage_status = STAGE_STATUS_IN_PROGRESS; s_stage_status_after(a_client_internal); + pthread_mutex_unlock( &a_client_internal->stage_mutex); } /** @@ -1210,6 +1047,8 @@ void m_stream_error(dap_client_t * a_client, int a_error) log_it(L_ERROR, "m_stream_error: l_client_pvt is NULL!"); return; } + pthread_mutex_lock(&l_client_pvt->stage_mutex); + if (a_error == ETIMEDOUT) { l_client_pvt->last_error = ERROR_NETWORK_CONNECTION_TIMEOUT; } else { @@ -1218,6 +1057,8 @@ void m_stream_error(dap_client_t * a_client, int a_error) l_client_pvt->stage_status = STAGE_STATUS_ERROR; s_stage_status_after(l_client_pvt); + pthread_mutex_unlock(&l_client_pvt->stage_mutex); + } /** @@ -1249,6 +1090,7 @@ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg) log_it(L_DEBUG, "client_pvt=0x%x", l_client_pvt); + pthread_mutex_lock(&l_client_pvt->stage_mutex); if (l_client_pvt->stage_status_error_callback) { if(l_client_pvt == l_client_pvt->client->_internal) l_client_pvt->stage_status_error_callback(l_client_pvt->client, (void *) true); @@ -1259,7 +1101,7 @@ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg) dap_stream_delete(l_client_pvt->stream); l_client_pvt->stream = NULL; l_client_pvt->stream_es = NULL; - + pthread_mutex_unlock(&l_client_pvt->stage_mutex); /* disable reconnect from here if(l_client_pvt->is_reconnect) { log_it(L_DEBUG, "l_client_pvt->is_reconnect = true"); @@ -1285,36 +1127,39 @@ void m_es_stream_read(dap_events_socket_t * a_es, void * arg) return; } switch (l_client_pvt->stage) { - case STAGE_STREAM_SESSION: - dap_client_go_stage(l_client_pvt->client, STAGE_STREAM_STREAMING, m_stage_stream_streaming); - break; - case STAGE_STREAM_CONNECTED: { // Collect HTTP headers before streaming - if(a_es->buf_in_size > 1) { - char * l_pos_endl; - l_pos_endl = (char*) memchr(a_es->buf_in, '\r', a_es->buf_in_size - 1); - if(l_pos_endl) { - if(*(l_pos_endl + 1) == '\n') { - dap_events_socket_shrink_buf_in(a_es, l_pos_endl - a_es->buf_in_str); - log_it(L_DEBUG, "Header passed, go to streaming (%lu bytes already are in input buffer", - a_es->buf_in_size); - l_client_pvt->stage = STAGE_STREAM_STREAMING; - l_client_pvt->stage_status = STAGE_STATUS_DONE; - s_stage_status_after(l_client_pvt); - - dap_stream_data_proc_read(l_client_pvt->stream); - dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size); + case STAGE_STREAM_SESSION: + dap_client_go_stage(l_client_pvt->client, STAGE_STREAM_STREAMING, m_stage_stream_streaming); + break; + case STAGE_STREAM_CONNECTED: { // Collect HTTP headers before streaming + if(a_es->buf_in_size > 1) { + char * l_pos_endl; + l_pos_endl = (char*) memchr(a_es->buf_in, '\r', a_es->buf_in_size - 1); + if(l_pos_endl) { + if(*(l_pos_endl + 1) == '\n') { + dap_events_socket_shrink_buf_in(a_es, l_pos_endl - a_es->buf_in_str); + log_it(L_DEBUG, "Header passed, go to streaming (%lu bytes already are in input buffer", + a_es->buf_in_size); + pthread_mutex_lock(&l_client_pvt->stage_mutex); + + l_client_pvt->stage = STAGE_STREAM_STREAMING; + l_client_pvt->stage_status = STAGE_STATUS_DONE; + s_stage_status_after(l_client_pvt); + pthread_mutex_unlock(&l_client_pvt->stage_mutex); + + dap_stream_data_proc_read(l_client_pvt->stream); + dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size); + } } } } - } - break; - case STAGE_STREAM_STREAMING: { // if streaming - process data with stream processor - dap_stream_data_proc_read(l_client_pvt->stream); - dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size); - } - break; - default: { - } + break; + case STAGE_STREAM_STREAMING: { // if streaming - process data with stream processor + dap_stream_data_proc_read(l_client_pvt->stream); + dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size); + } + break; + default: { + } } } diff --git a/dap-sdk/net/client/include/dap_client_pvt.h b/dap-sdk/net/client/include/dap_client_pvt.h index c3c97473ebd5328b4a586cd05929e844383db086..3785c90df66ec9c05b1fb944a61929f0c3aa4f04 100644 --- a/dap-sdk/net/client/include/dap_client_pvt.h +++ b/dap-sdk/net/client/include/dap_client_pvt.h @@ -62,15 +62,13 @@ typedef struct dap_client_internal uint32_t uplink_protocol_version; uint32_t remote_protocol_version; + pthread_mutex_t stage_mutex; // Protect all the stage_ fields below dap_client_stage_t stage_target; dap_client_callback_t stage_target_done_callback; - dap_client_stage_t stage; dap_client_stage_status_t stage_status; dap_client_error_t last_error; - dap_client_callback_t stage_status_callback; - dap_client_callback_t stage_status_done_callback; dap_client_callback_t stage_status_error_callback; @@ -82,6 +80,10 @@ typedef struct dap_client_internal bool is_close_session;// the last request in session, in the header will be added "SessionCloseAfterRequest: true" dap_client_callback_data_size_t request_response_callback; dap_client_callback_int_t request_error_callback; + + // Conds + pthread_cond_t disconnected_cond; + pthread_mutex_t disconnected_mutex; } dap_client_pvt_t; #define DAP_CLIENT_PVT(a) (a ? (dap_client_pvt_t*) a->_internal : NULL) diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 0676e1b9fbe8bb2d1812108b32a0709c5383a2c5..2ba195f72bb6f1514becb3d94ccba50af33bc947 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -48,6 +48,7 @@ #include "dap_worker.h" #include "dap_events.h" +#include "dap_timerfd.h" #include "dap_events_socket.h" #define LOG_TAG "dap_events_socket" @@ -79,7 +80,6 @@ int dap_events_socket_init( ) */ void dap_events_socket_deinit( ) { - } diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 18e9ed2b3cf735e42145a0f9979ebc233559761a..44e6632d775abf5e89a374ef40887aa546987478 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -23,33 +23,13 @@ along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. */ -#include <stdlib.h> -#include <stdint.h> -#include <stddef.h> #include <stdio.h> - - #include <stdlib.h> #include <stddef.h> #include <stdint.h> #include <string.h> #include <errno.h> -#ifdef DAP_OS_UNIX -#include <sys/types.h> -#include <sys/socket.h> -#include <arpa/inet.h> -#include <netdb.h> -#endif - -#ifdef WIN32 -#include <winsock2.h> -#include <windows.h> -#include <mswsock.h> -#include <ws2tcpip.h> -#include <io.h> -#endif - #include <pthread.h> #include "uthash.h" @@ -64,6 +44,9 @@ #include "dap_hash.h" #include "dap_cert.h" #include "dap_cert_file.h" + +#include "dap_timerfd.h" + #include "dap_enc_http.h" #include "dap_chain_common.h" #include "dap_chain_net.h" @@ -107,6 +90,7 @@ static size_t s_max_links_count = 5;// by default 5 // number of required connections static size_t s_required_links_count = 3;// by default 3 +static dap_timerfd_t * s_timer_check = NULL; /** * @struct dap_chain_net_pvt @@ -173,8 +157,8 @@ static const char * c_net_states[]={ static dap_chain_net_t * s_net_new(const char * a_id, const char * a_name , const char * a_node_role); inline static const char * s_net_state_to_str(dap_chain_net_state_t l_state); static int s_net_states_proc(dap_chain_net_t * l_net); -static void * s_net_proc_thread ( void * a_net); -static void s_net_proc_thread_start( dap_chain_net_t * a_net ); +static void s_timer_check_callback ( void * a_net); +static void s_timer_check_start( dap_chain_net_t * a_net ); static void s_net_proc_kill( dap_chain_net_t * a_net ); int s_net_load(const char * a_net_name, uint16_t a_acl_idx); @@ -691,97 +675,48 @@ static int s_net_states_proc(dap_chain_net_t * l_net) * @param a_cfg Network1 configuration * @return */ -static void *s_net_proc_thread ( void *a_net ) +static void s_timer_check_callback ( void *a_net ) { dap_chain_net_t *l_net = (dap_chain_net_t *)a_net; dap_chain_net_pvt_t *p_net = (dap_chain_net_pvt_t *)(void *)l_net->pvt; - const uint64_t l_timeout_ms = 60000;// 60 sec - // set callback to update data //dap_chain_global_db_set_callback_for_update_base(s_net_proc_thread_callback_update_db); - while( !(p_net->flags & F_DAP_CHAIN_NET_SHUTDOWN) ) { + if ( !(p_net->flags & F_DAP_CHAIN_NET_SHUTDOWN) ) { + dap_events_socket_remove_and_delete_unsafe( s_timer_check->events_socket,false); + } - // check or start sync + // check or start sync + s_net_states_proc( l_net ); + if (p_net->flags & F_DAP_CHAIN_NET_GO_SYNC) { + s_net_states_proc( l_net ); + } + struct timespec l_to; + + // checking whether new sync is needed + time_t l_sync_timeout = 180; // 1800 sec = 30 min + clock_gettime(CLOCK_MONOTONIC, &l_to); + // start sync every l_sync_timeout sec + if(l_to.tv_sec >= p_net->last_sync + l_sync_timeout) { + p_net->flags |= F_DAP_CHAIN_NET_GO_SYNC; s_net_states_proc( l_net ); - if (p_net->flags & F_DAP_CHAIN_NET_GO_SYNC) { - continue; - } - struct timespec l_to; -#ifndef _WIN32 - int l_ret = 0; - // prepare for signal waiting - clock_gettime( CLOCK_MONOTONIC, &l_to ); - int64_t l_nsec_new = l_to.tv_nsec + l_timeout_ms * 1000000ll; - // if the new number of nanoseconds is more than a second - if(l_nsec_new > (long) 1e9) { - l_to.tv_sec += l_nsec_new / (long) 1e9; - l_to.tv_nsec = l_nsec_new % (long) 1e9; - } - else - l_to.tv_nsec = (long) l_nsec_new; - pthread_mutex_lock( &p_net->state_mutex_cond ); - // wait if flag not set then go to SYNC_GDB - while ((p_net->flags & F_DAP_CHAIN_NET_GO_SYNC) == 0 && l_ret == 0) { - // signal waiting - l_ret = pthread_cond_timedwait( &p_net->state_proc_cond, &p_net->state_mutex_cond, &l_to ); - } - pthread_mutex_unlock(&p_net->state_mutex_cond); -#else // WIN32 - - WaitForSingleObject( p_net->state_proc_cond, (uint32_t)l_timeout_ms ); - -#endif - // checking whether new sync is needed - time_t l_sync_timeout = 1800; // 1800 sec = 30 min - clock_gettime(CLOCK_MONOTONIC, &l_to); - // start sync every l_sync_timeout sec - if(l_to.tv_sec >= p_net->last_sync + l_sync_timeout) { - p_net->flags |= F_DAP_CHAIN_NET_GO_SYNC; - } } - - return NULL; } /** * @brief net_proc_start * @param a_cfg */ -static void s_net_proc_thread_start( dap_chain_net_t * a_net ) +static void s_timer_check_start( dap_chain_net_t * a_net ) { - if ( pthread_create(& PVT(a_net)->proc_tid ,NULL, s_net_proc_thread, a_net) == 0 ){ - log_it (L_NOTICE,"Network processing thread started"); + if ( ! s_timer_check ){ + dap_timerfd_delete(s_timer_check); + s_timer_check = NULL; } + s_timer_check=dap_timerfd_start(60000, s_timer_check_callback, a_net ); } -/** - * @brief s_net_proc_kill - * @param a_net - */ -static void s_net_proc_kill( dap_chain_net_t * a_net ) -{ - if ( !PVT(a_net)->proc_tid ) - return; - - log_it( L_NOTICE,"Sent KILL signal to the net process thread %d, waiting for shutdown...", PVT(a_net)->proc_tid ); - - PVT(a_net)->flags |= F_DAP_CHAIN_NET_SHUTDOWN; - -#ifndef _WIN32 - pthread_cond_signal( &PVT(a_net)->state_proc_cond ); -#else - SetEvent( PVT(a_net)->state_proc_cond ); -#endif - - pthread_join( PVT(a_net)->proc_tid , NULL ); - log_it( L_NOTICE,"Net process thread %d shutted down", PVT(a_net)->proc_tid ); - - PVT(a_net)->proc_tid = 0; - - return; -} dap_chain_node_role_t dap_chain_net_get_role(dap_chain_net_t * a_net) { @@ -1788,7 +1723,7 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) PVT(l_net)->flags |= F_DAP_CHAIN_NET_GO_SYNC; // Start the proc thread - s_net_proc_thread_start(l_net); + s_timer_check_start(l_net); log_it(L_NOTICE, "Сhain network \"%s\" initialized",l_net_item->name); dap_config_close(l_cfg); }