diff --git a/include/dap_client_pvt.h b/include/dap_client_pvt.h index 38a812107dfe0afee6bbb3bec15d1dbc70fc2b33..e8e6b2551cf39715464290735ebbd850d4712d97 100755 --- a/include/dap_client_pvt.h +++ b/include/dap_client_pvt.h @@ -71,6 +71,8 @@ typedef struct dap_client_internal dap_client_callback_t stage_status_done_callback; dap_client_callback_t stage_status_error_callback; + int connect_attempt; + bool is_encrypted; bool is_reconnect; bool is_close_session;// the last request in session, in the header will be added "SessionCloseAfterRequest: true" diff --git a/src/dap_client_pvt.c b/src/dap_client_pvt.c index 7d89723053bfe95c2fc166f62018eaa8e3054f3e..09a0a3c29984d37c531a68346f73ac8cd3fa7bcb 100644 --- a/src/dap_client_pvt.c +++ b/src/dap_client_pvt.c @@ -8,19 +8,19 @@ This file is part of DAP (Deus Applications Prototypes) the open source project - DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - DAP is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. -*/ + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. + */ #include <stdlib.h> #include <stdio.h> @@ -85,7 +85,7 @@ void m_enc_init_error(dap_client_t *, int); // STREAM_CTL stage callbacks void m_stream_ctl_response(dap_client_t *, void *, size_t); void m_stream_ctl_error(dap_client_t *, int); -void m_stage_stream_streaming (dap_client_t * a_client, void* arg); +void m_stage_stream_streaming(dap_client_t * a_client, void* arg); // STREAM stage callbacks void m_stream_response(dap_client_t *, void *, size_t); @@ -140,16 +140,16 @@ void dap_client_pvt_delete(dap_client_pvt_t * a_client_pvt) if(a_client_pvt->session_key_id) DAP_DELETE(a_client_pvt->session_key_id); - if ( a_client_pvt->active_channels ) - DAP_DELETE(a_client_pvt->active_channels ); + if(a_client_pvt->active_channels) + DAP_DELETE(a_client_pvt->active_channels); - if (a_client_pvt->session_key) + if(a_client_pvt->session_key) dap_enc_key_delete(a_client_pvt->session_key); - if (a_client_pvt->session_key_open) + if(a_client_pvt->session_key_open) dap_enc_key_delete(a_client_pvt->session_key_open); - if (a_client_pvt->stream_key) + if(a_client_pvt->stream_key) dap_enc_key_delete(a_client_pvt->stream_key); DAP_DELETE(a_client_pvt); @@ -213,24 +213,24 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) char *l_suburl; // connect to vpn server const char l_active_vpn_channels[] = { VPN_CLIENT_ID, 0 }; - if(!dap_strcmp(a_client_pvt->active_channels,l_active_vpn_channels)) + if(!dap_strcmp(a_client_pvt->active_channels, l_active_vpn_channels)) l_suburl = dap_strdup_printf("socket_forward"); // connect for node sync else - l_suburl = dap_strdup_printf("stream_ctl,channels=%s", a_client_pvt->active_channels ); + l_suburl = dap_strdup_printf("stream_ctl,channels=%s", a_client_pvt->active_channels); // dap_client_pvt_request_enc(a_client_pvt, DAP_UPLINK_PATH_STREAM_CTL, - l_suburl, "type=tcp,maxconn=4" , l_request, l_request_size, + l_suburl, "type=tcp,maxconn=4", l_request, l_request_size, m_stream_ctl_response, m_stream_ctl_error); DAP_DELETE(l_request); - DAP_DELETE( l_suburl); + DAP_DELETE(l_suburl); } break; case STAGE_STREAM_SESSION: { log_it(L_INFO, "Go to stage STREAM_SESSION: process the state ops"); - a_client_pvt->stream_socket = socket( PF_INET, SOCK_STREAM, 0 ); + a_client_pvt->stream_socket = socket( PF_INET, SOCK_STREAM, 0); #ifdef _WIN32 { int buffsize = 65536; @@ -298,9 +298,9 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) break; case STAGE_STREAM_CONNECTED: { log_it(L_INFO, "Go to stage STAGE_STREAM_CONNECTED"); - size_t count_channels = strlen( a_client_pvt->active_channels ) ; + size_t count_channels = strlen(a_client_pvt->active_channels); for(size_t i = 0; i < count_channels; i++) { - dap_stream_ch_new(a_client_pvt->stream,(uint8_t) a_client_pvt->active_channels[i]); + dap_stream_ch_new(a_client_pvt->stream, (uint8_t) a_client_pvt->active_channels[i]); //sid->channel[i]->ready_to_write = true; } @@ -355,24 +355,41 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) // case STAGE_STATUS_ABORTING: { // log_it(L_ERROR, "Aborting state"); // } - break; + break; case STAGE_STATUS_ERROR: { + // limit the number of attempts + int MAX_ATTEMPTS = 3; + a_client_pvt->connect_attempt++; + bool l_is_last_attempt = a_client_pvt->connect_attempt > MAX_ATTEMPTS ? true : false; + log_it(L_ERROR, "Error state, doing callback if present"); if(a_client_pvt->stage_status_error_callback) { - a_client_pvt->stage_status_error_callback(a_client_pvt->client, NULL); + a_client_pvt->stage_status_error_callback(a_client_pvt->client, (void*)l_is_last_attempt); // Expecting that its one-shot callback //a_client_internal->stage_status_error_callback = NULL; - } - if(a_client_pvt->stage_target == STAGE_STREAM_ABORT){ + if(a_client_pvt->stage_target == STAGE_STREAM_ABORT) { a_client_pvt->stage = STAGE_STREAM_ABORT; a_client_pvt->stage_status = STAGE_STATUS_ABORTING; } - else{ - a_client_pvt->stage = STAGE_ENC_INIT; - // Trying the step again - a_client_pvt->stage_status = STAGE_STATUS_IN_PROGRESS; - s_stage_status_after(a_client_pvt); + else { + if(!l_is_last_attempt) { + // small delay before next request + log_it(L_INFO, "Connection attempt â„– %d", a_client_pvt->connect_attempt); +#ifdef _WIN32 + Sleep(300);// 0.3 sec +#else + usleep(300000);// 0.3 sec +#endif + a_client_pvt->stage = STAGE_ENC_INIT; + // Trying the step again + a_client_pvt->stage_status = STAGE_STATUS_IN_PROGRESS; + s_stage_status_after(a_client_pvt); + } + else{ + log_it(L_INFO, "Too many connection attempts. Tries are over."); + //a_client_pvt->stage_status = STAGE_STATUS_DONE; + } } } break; @@ -553,7 +570,7 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char size_t a_custom_count = 1; a_custom_new[0] = l_key_hdr_str; // close session - if(a_client_internal->is_close_session){ + if(a_client_internal->is_close_session) { a_custom_new[1] = "SessionCloseAfterRequest: true"; a_custom_count++; } @@ -717,7 +734,7 @@ void m_enc_init_error(dap_client_t * a_client, int a_err_code) return; } //dap_client_internal_t * l_client_internal = dap_CLIENT_INTERNAL(a_client); - log_it(L_ERROR,"ENC: Can't init ecnryption session, err code %d",a_err_code); + log_it(L_ERROR, "ENC: Can't init ecnryption session, err code %d", a_err_code); l_client_pvt->last_error = ERROR_NETWORK_CONNECTION_REFUSE; l_client_pvt->stage_status = STAGE_STATUS_ERROR; @@ -789,8 +806,8 @@ void m_stream_ctl_response(dap_client_t * a_client, void * a_data, size_t a_data strncpy(l_client_internal->stream_id, l_stream_id, sizeof(l_client_internal->stream_id) - 1); l_client_internal->stream_key = - dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_OAES, l_stream_key, strlen(l_stream_key), NULL, 0, 32); - + dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_OAES, l_stream_key, strlen(l_stream_key), NULL, 0, + 32); if(l_client_internal->stage == STAGE_STREAM_CTL) { // We are on the right stage l_client_internal->stage_status = STAGE_STATUS_DONE; @@ -888,30 +905,30 @@ void m_stage_stream_streaming(dap_client_t * a_client, void* arg) * @param a_es * @param arg */ -void m_es_stream_delete( dap_events_socket_t *a_es, void *arg ) +void m_es_stream_delete(dap_events_socket_t *a_es, void *arg) { log_it(L_INFO, "====================================================== stream delete/peer reconnect"); dap_client_t *l_client = DAP_CLIENT(a_es); - if ( l_client == NULL ) { + if(l_client == NULL) { log_it(L_ERROR, "dap_client is not initialized"); return; } pthread_mutex_lock(&l_client->mutex); dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client); - if( l_client_pvt == NULL ) { + if(l_client_pvt == NULL) { log_it(L_ERROR, "dap_client_pvt is not initialized"); pthread_mutex_unlock(&l_client->mutex); return; } - dap_stream_delete( l_client_pvt->stream ); + dap_stream_delete(l_client_pvt->stream); l_client_pvt->stream = NULL; - if( l_client_pvt->client ) - dap_client_reset (l_client_pvt->client ); + if(l_client_pvt->client) + dap_client_reset(l_client_pvt->client); // l_client_pvt->client= NULL; @@ -925,10 +942,10 @@ void m_es_stream_delete( dap_events_socket_t *a_es, void *arg ) pthread_mutex_unlock(&l_client->mutex); - if ( l_client_pvt->is_reconnect ) { + if(l_client_pvt->is_reconnect) { log_it(L_DEBUG, "l_client_pvt->is_reconnect = true"); - dap_client_go_stage(l_client_pvt->client, STAGE_STREAM_STREAMING, m_stage_stream_streaming); + dap_client_go_stage(l_client_pvt->client, STAGE_STREAM_STREAMING, m_stage_stream_streaming); } else log_it(L_DEBUG, "l_client_pvt->is_reconnect = false"); @@ -948,35 +965,36 @@ void m_es_stream_read(dap_events_socket_t * a_es, void * arg) return; } switch (l_client_pvt->stage) { - case STAGE_STREAM_SESSION: - dap_client_go_stage(l_client_pvt->client, STAGE_STREAM_STREAMING, m_stage_stream_streaming); - break; - case STAGE_STREAM_CONNECTED: { // Collect HTTP headers before streaming - if(a_es->buf_in_size > 1) { - char * l_pos_endl; - l_pos_endl = (char*) memchr(a_es->buf_in, '\r', a_es->buf_in_size - 1); - if(l_pos_endl) { - if(*(l_pos_endl + 1) == '\n') { - dap_events_socket_shrink_buf_in(a_es, l_pos_endl - a_es->buf_in_str); - log_it(L_DEBUG, "Header passed, go to streaming (%lu bytes already are in input buffer", - a_es->buf_in_size); - l_client_pvt->stage = STAGE_STREAM_STREAMING; - l_client_pvt->stage_status = STAGE_STATUS_DONE; - s_stage_status_after(l_client_pvt); - - dap_stream_data_proc_read(l_client_pvt->stream); - dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size); - } + case STAGE_STREAM_SESSION: + dap_client_go_stage(l_client_pvt->client, STAGE_STREAM_STREAMING, m_stage_stream_streaming); + break; + case STAGE_STREAM_CONNECTED: { // Collect HTTP headers before streaming + if(a_es->buf_in_size > 1) { + char * l_pos_endl; + l_pos_endl = (char*) memchr(a_es->buf_in, '\r', a_es->buf_in_size - 1); + if(l_pos_endl) { + if(*(l_pos_endl + 1) == '\n') { + dap_events_socket_shrink_buf_in(a_es, l_pos_endl - a_es->buf_in_str); + log_it(L_DEBUG, "Header passed, go to streaming (%lu bytes already are in input buffer", + a_es->buf_in_size); + l_client_pvt->stage = STAGE_STREAM_STREAMING; + l_client_pvt->stage_status = STAGE_STATUS_DONE; + s_stage_status_after(l_client_pvt); + + dap_stream_data_proc_read(l_client_pvt->stream); + dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size); } } } - break; - case STAGE_STREAM_STREAMING: { // if streaming - process data with stream processor - dap_stream_data_proc_read(l_client_pvt->stream); - dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size); - } + } + break; + case STAGE_STREAM_STREAMING: { // if streaming - process data with stream processor + dap_stream_data_proc_read(l_client_pvt->stream); + dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size); + } break; - default: {} + default: { + } } } @@ -994,25 +1012,26 @@ void m_es_stream_write(dap_events_socket_t * a_es, void * arg) return; } switch (l_client_pvt->stage) { - case STAGE_STREAM_STREAMING: { - size_t i; - bool ready_to_write = false; - // log_it(DEBUG,"Process channels data output (%u channels)",STREAM(sh)->channel_count); - - for(i = 0; i < l_client_pvt->stream->channel_count; i++) { - dap_stream_ch_t * ch = l_client_pvt->stream->channel[i]; - if(ch->ready_to_write) { - ch->proc->packet_out_callback(ch, NULL); - ready_to_write |= ch->ready_to_write; - } + case STAGE_STREAM_STREAMING: { + size_t i; + bool ready_to_write = false; + // log_it(DEBUG,"Process channels data output (%u channels)",STREAM(sh)->channel_count); + + for(i = 0; i < l_client_pvt->stream->channel_count; i++) { + dap_stream_ch_t * ch = l_client_pvt->stream->channel[i]; + if(ch->ready_to_write) { + ch->proc->packet_out_callback(ch, NULL); + ready_to_write |= ch->ready_to_write; } - //log_it(L_DEBUG,"stream_data_out (ready_to_write=%s)", ready_to_write?"true":"false"); - - dap_events_socket_set_writable(l_client_pvt->stream_es, ready_to_write); - //log_it(ERROR,"No stream_data_write_callback is defined"); } + //log_it(L_DEBUG,"stream_data_out (ready_to_write=%s)", ready_to_write?"true":"false"); + + dap_events_socket_set_writable(l_client_pvt->stream_es, ready_to_write); + //log_it(ERROR,"No stream_data_write_callback is defined"); + } break; - default: {} + default: { + } } }