Skip to content
Snippets Groups Projects
Commit ca5ac6ff authored by dmitriy.gerasimov's avatar dmitriy.gerasimov
Browse files

Merge branch 'feature-2618' into 'master'

added the attempt connect counter and it used into STAGE_STATUS_ERROR

See merge request !5
parents b520a4eb fb55f0dd
No related branches found
No related tags found
1 merge request!5added the attempt connect counter and it used into STAGE_STATUS_ERROR
......@@ -71,6 +71,8 @@ typedef struct dap_client_internal
dap_client_callback_t stage_status_done_callback;
dap_client_callback_t stage_status_error_callback;
int connect_attempt;
bool is_encrypted;
bool is_reconnect;
bool is_close_session;// the last request in session, in the header will be added "SessionCloseAfterRequest: true"
......
......@@ -8,19 +8,19 @@
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <stdio.h>
......@@ -85,7 +85,7 @@ void m_enc_init_error(dap_client_t *, int);
// STREAM_CTL stage callbacks
void m_stream_ctl_response(dap_client_t *, void *, size_t);
void m_stream_ctl_error(dap_client_t *, int);
void m_stage_stream_streaming (dap_client_t * a_client, void* arg);
void m_stage_stream_streaming(dap_client_t * a_client, void* arg);
// STREAM stage callbacks
void m_stream_response(dap_client_t *, void *, size_t);
......@@ -140,16 +140,16 @@ void dap_client_pvt_delete(dap_client_pvt_t * a_client_pvt)
if(a_client_pvt->session_key_id)
DAP_DELETE(a_client_pvt->session_key_id);
if ( a_client_pvt->active_channels )
DAP_DELETE(a_client_pvt->active_channels );
if(a_client_pvt->active_channels)
DAP_DELETE(a_client_pvt->active_channels);
if (a_client_pvt->session_key)
if(a_client_pvt->session_key)
dap_enc_key_delete(a_client_pvt->session_key);
if (a_client_pvt->session_key_open)
if(a_client_pvt->session_key_open)
dap_enc_key_delete(a_client_pvt->session_key_open);
if (a_client_pvt->stream_key)
if(a_client_pvt->stream_key)
dap_enc_key_delete(a_client_pvt->stream_key);
DAP_DELETE(a_client_pvt);
......@@ -213,24 +213,24 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
char *l_suburl;
// connect to vpn server
const char l_active_vpn_channels[] = { VPN_CLIENT_ID, 0 };
if(!dap_strcmp(a_client_pvt->active_channels,l_active_vpn_channels))
if(!dap_strcmp(a_client_pvt->active_channels, l_active_vpn_channels))
l_suburl = dap_strdup_printf("socket_forward");
// connect for node sync
else
l_suburl = dap_strdup_printf("stream_ctl,channels=%s", a_client_pvt->active_channels );
l_suburl = dap_strdup_printf("stream_ctl,channels=%s", a_client_pvt->active_channels);
//
dap_client_pvt_request_enc(a_client_pvt,
DAP_UPLINK_PATH_STREAM_CTL,
l_suburl, "type=tcp,maxconn=4" , l_request, l_request_size,
l_suburl, "type=tcp,maxconn=4", l_request, l_request_size,
m_stream_ctl_response, m_stream_ctl_error);
DAP_DELETE(l_request);
DAP_DELETE( l_suburl);
DAP_DELETE(l_suburl);
}
break;
case STAGE_STREAM_SESSION: {
log_it(L_INFO, "Go to stage STREAM_SESSION: process the state ops");
a_client_pvt->stream_socket = socket( PF_INET, SOCK_STREAM, 0 );
a_client_pvt->stream_socket = socket( PF_INET, SOCK_STREAM, 0);
#ifdef _WIN32
{
int buffsize = 65536;
......@@ -298,9 +298,9 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
break;
case STAGE_STREAM_CONNECTED: {
log_it(L_INFO, "Go to stage STAGE_STREAM_CONNECTED");
size_t count_channels = strlen( a_client_pvt->active_channels ) ;
size_t count_channels = strlen(a_client_pvt->active_channels);
for(size_t i = 0; i < count_channels; i++) {
dap_stream_ch_new(a_client_pvt->stream,(uint8_t) a_client_pvt->active_channels[i]);
dap_stream_ch_new(a_client_pvt->stream, (uint8_t) a_client_pvt->active_channels[i]);
//sid->channel[i]->ready_to_write = true;
}
......@@ -355,24 +355,41 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
// case STAGE_STATUS_ABORTING: {
// log_it(L_ERROR, "Aborting state");
// }
break;
break;
case STAGE_STATUS_ERROR: {
// limit the number of attempts
int MAX_ATTEMPTS = 3;
a_client_pvt->connect_attempt++;
bool l_is_last_attempt = a_client_pvt->connect_attempt > MAX_ATTEMPTS ? true : false;
log_it(L_ERROR, "Error state, doing callback if present");
if(a_client_pvt->stage_status_error_callback) {
a_client_pvt->stage_status_error_callback(a_client_pvt->client, NULL);
a_client_pvt->stage_status_error_callback(a_client_pvt->client, (void*)l_is_last_attempt);
// Expecting that its one-shot callback
//a_client_internal->stage_status_error_callback = NULL;
}
if(a_client_pvt->stage_target == STAGE_STREAM_ABORT){
if(a_client_pvt->stage_target == STAGE_STREAM_ABORT) {
a_client_pvt->stage = STAGE_STREAM_ABORT;
a_client_pvt->stage_status = STAGE_STATUS_ABORTING;
}
else{
a_client_pvt->stage = STAGE_ENC_INIT;
// Trying the step again
a_client_pvt->stage_status = STAGE_STATUS_IN_PROGRESS;
s_stage_status_after(a_client_pvt);
else {
if(!l_is_last_attempt) {
// small delay before next request
log_it(L_INFO, "Connection attempt № %d", a_client_pvt->connect_attempt);
#ifdef _WIN32
Sleep(300);// 0.3 sec
#else
usleep(300000);// 0.3 sec
#endif
a_client_pvt->stage = STAGE_ENC_INIT;
// Trying the step again
a_client_pvt->stage_status = STAGE_STATUS_IN_PROGRESS;
s_stage_status_after(a_client_pvt);
}
else{
log_it(L_INFO, "Too many connection attempts. Tries are over.");
//a_client_pvt->stage_status = STAGE_STATUS_DONE;
}
}
}
break;
......@@ -553,7 +570,7 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char
size_t a_custom_count = 1;
a_custom_new[0] = l_key_hdr_str;
// close session
if(a_client_internal->is_close_session){
if(a_client_internal->is_close_session) {
a_custom_new[1] = "SessionCloseAfterRequest: true";
a_custom_count++;
}
......@@ -717,7 +734,7 @@ void m_enc_init_error(dap_client_t * a_client, int a_err_code)
return;
}
//dap_client_internal_t * l_client_internal = dap_CLIENT_INTERNAL(a_client);
log_it(L_ERROR,"ENC: Can't init ecnryption session, err code %d",a_err_code);
log_it(L_ERROR, "ENC: Can't init ecnryption session, err code %d", a_err_code);
l_client_pvt->last_error = ERROR_NETWORK_CONNECTION_REFUSE;
l_client_pvt->stage_status = STAGE_STATUS_ERROR;
......@@ -789,8 +806,8 @@ void m_stream_ctl_response(dap_client_t * a_client, void * a_data, size_t a_data
strncpy(l_client_internal->stream_id, l_stream_id, sizeof(l_client_internal->stream_id) - 1);
l_client_internal->stream_key =
dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_OAES, l_stream_key, strlen(l_stream_key), NULL, 0, 32);
dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_OAES, l_stream_key, strlen(l_stream_key), NULL, 0,
32);
if(l_client_internal->stage == STAGE_STREAM_CTL) { // We are on the right stage
l_client_internal->stage_status = STAGE_STATUS_DONE;
......@@ -888,30 +905,30 @@ void m_stage_stream_streaming(dap_client_t * a_client, void* arg)
* @param a_es
* @param arg
*/
void m_es_stream_delete( dap_events_socket_t *a_es, void *arg )
void m_es_stream_delete(dap_events_socket_t *a_es, void *arg)
{
log_it(L_INFO, "====================================================== stream delete/peer reconnect");
dap_client_t *l_client = DAP_CLIENT(a_es);
if ( l_client == NULL ) {
if(l_client == NULL) {
log_it(L_ERROR, "dap_client is not initialized");
return;
}
pthread_mutex_lock(&l_client->mutex);
dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client);
if( l_client_pvt == NULL ) {
if(l_client_pvt == NULL) {
log_it(L_ERROR, "dap_client_pvt is not initialized");
pthread_mutex_unlock(&l_client->mutex);
return;
}
dap_stream_delete( l_client_pvt->stream );
dap_stream_delete(l_client_pvt->stream);
l_client_pvt->stream = NULL;
if( l_client_pvt->client )
dap_client_reset (l_client_pvt->client );
if(l_client_pvt->client)
dap_client_reset(l_client_pvt->client);
// l_client_pvt->client= NULL;
......@@ -925,10 +942,10 @@ void m_es_stream_delete( dap_events_socket_t *a_es, void *arg )
pthread_mutex_unlock(&l_client->mutex);
if ( l_client_pvt->is_reconnect ) {
if(l_client_pvt->is_reconnect) {
log_it(L_DEBUG, "l_client_pvt->is_reconnect = true");
dap_client_go_stage(l_client_pvt->client, STAGE_STREAM_STREAMING, m_stage_stream_streaming);
dap_client_go_stage(l_client_pvt->client, STAGE_STREAM_STREAMING, m_stage_stream_streaming);
}
else
log_it(L_DEBUG, "l_client_pvt->is_reconnect = false");
......@@ -948,35 +965,36 @@ void m_es_stream_read(dap_events_socket_t * a_es, void * arg)
return;
}
switch (l_client_pvt->stage) {
case STAGE_STREAM_SESSION:
dap_client_go_stage(l_client_pvt->client, STAGE_STREAM_STREAMING, m_stage_stream_streaming);
break;
case STAGE_STREAM_CONNECTED: { // Collect HTTP headers before streaming
if(a_es->buf_in_size > 1) {
char * l_pos_endl;
l_pos_endl = (char*) memchr(a_es->buf_in, '\r', a_es->buf_in_size - 1);
if(l_pos_endl) {
if(*(l_pos_endl + 1) == '\n') {
dap_events_socket_shrink_buf_in(a_es, l_pos_endl - a_es->buf_in_str);
log_it(L_DEBUG, "Header passed, go to streaming (%lu bytes already are in input buffer",
a_es->buf_in_size);
l_client_pvt->stage = STAGE_STREAM_STREAMING;
l_client_pvt->stage_status = STAGE_STATUS_DONE;
s_stage_status_after(l_client_pvt);
dap_stream_data_proc_read(l_client_pvt->stream);
dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size);
}
case STAGE_STREAM_SESSION:
dap_client_go_stage(l_client_pvt->client, STAGE_STREAM_STREAMING, m_stage_stream_streaming);
break;
case STAGE_STREAM_CONNECTED: { // Collect HTTP headers before streaming
if(a_es->buf_in_size > 1) {
char * l_pos_endl;
l_pos_endl = (char*) memchr(a_es->buf_in, '\r', a_es->buf_in_size - 1);
if(l_pos_endl) {
if(*(l_pos_endl + 1) == '\n') {
dap_events_socket_shrink_buf_in(a_es, l_pos_endl - a_es->buf_in_str);
log_it(L_DEBUG, "Header passed, go to streaming (%lu bytes already are in input buffer",
a_es->buf_in_size);
l_client_pvt->stage = STAGE_STREAM_STREAMING;
l_client_pvt->stage_status = STAGE_STATUS_DONE;
s_stage_status_after(l_client_pvt);
dap_stream_data_proc_read(l_client_pvt->stream);
dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size);
}
}
}
break;
case STAGE_STREAM_STREAMING: { // if streaming - process data with stream processor
dap_stream_data_proc_read(l_client_pvt->stream);
dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size);
}
}
break;
case STAGE_STREAM_STREAMING: { // if streaming - process data with stream processor
dap_stream_data_proc_read(l_client_pvt->stream);
dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size);
}
break;
default: {}
default: {
}
}
}
......@@ -994,25 +1012,26 @@ void m_es_stream_write(dap_events_socket_t * a_es, void * arg)
return;
}
switch (l_client_pvt->stage) {
case STAGE_STREAM_STREAMING: {
size_t i;
bool ready_to_write = false;
// log_it(DEBUG,"Process channels data output (%u channels)",STREAM(sh)->channel_count);
for(i = 0; i < l_client_pvt->stream->channel_count; i++) {
dap_stream_ch_t * ch = l_client_pvt->stream->channel[i];
if(ch->ready_to_write) {
ch->proc->packet_out_callback(ch, NULL);
ready_to_write |= ch->ready_to_write;
}
case STAGE_STREAM_STREAMING: {
size_t i;
bool ready_to_write = false;
// log_it(DEBUG,"Process channels data output (%u channels)",STREAM(sh)->channel_count);
for(i = 0; i < l_client_pvt->stream->channel_count; i++) {
dap_stream_ch_t * ch = l_client_pvt->stream->channel[i];
if(ch->ready_to_write) {
ch->proc->packet_out_callback(ch, NULL);
ready_to_write |= ch->ready_to_write;
}
//log_it(L_DEBUG,"stream_data_out (ready_to_write=%s)", ready_to_write?"true":"false");
dap_events_socket_set_writable(l_client_pvt->stream_es, ready_to_write);
//log_it(ERROR,"No stream_data_write_callback is defined");
}
//log_it(L_DEBUG,"stream_data_out (ready_to_write=%s)", ready_to_write?"true":"false");
dap_events_socket_set_writable(l_client_pvt->stream_es, ready_to_write);
//log_it(ERROR,"No stream_data_write_callback is defined");
}
break;
default: {}
default: {
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment