diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index cbf24ae71f2b8505cdc3a8323b9dac1fa25d2965..22b9fd3c40b3238c97af3c112b6f0a69ba18eafe 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -133,6 +133,7 @@ void dap_events_socket_assign_on_worker_mt(dap_events_socket_t * a_es, struct da void dap_events_socket_reassign_between_workers_unsafe(dap_events_socket_t * a_es, dap_worker_t * a_worker_new) { + log_it(L_DEBUG, "reassign between workers"); dap_events_socket_remove_from_worker_unsafe( a_es, a_es->worker ); a_es->was_reassigned = true; if (a_es->callbacks.worker_unassign_callback) @@ -885,6 +886,7 @@ size_t dap_events_socket_write_unsafe(dap_events_socket_t *sc, const void * data data_size = ((sc->buf_out_size+data_size)<(sizeof(sc->buf_out)))?data_size:(sizeof(sc->buf_out)-sc->buf_out_size ); memcpy(sc->buf_out+sc->buf_out_size,data,data_size); sc->buf_out_size+=data_size; + dap_events_socket_set_writable_unsafe(sc, true); return data_size; } @@ -908,6 +910,7 @@ size_t dap_events_socket_write_f_unsafe(dap_events_socket_t *sc, const char * fo }else{ log_it(L_ERROR,"Can't write out formatted data '%s'",format); } + dap_events_socket_set_writable_unsafe(sc, true); return (ret > 0) ? ret : 0; } diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index 68d1b20191ecb17d00da765e9a7c8389b934ad82..26a3c93abb023dd3e5e22720720fec948820b9c4 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -266,7 +266,7 @@ void *dap_worker_thread(void *arg) if(l_cur->flags & DAP_SOCK_READY_TO_WRITE) { - static const uint32_t buf_out_zero_count_max = 2; + static const uint32_t buf_out_zero_count_max = 5; l_cur->buf_out[l_cur->buf_out_size] = 0; if(!l_cur->buf_out_size) { diff --git a/dap-sdk/net/server/http_server/http_client/dap_http_client.c b/dap-sdk/net/server/http_server/http_client/dap_http_client.c index b5add239f73689fc9a34998cefcf9ff62d7abda7..ee6957b50d00231e500a3b00c6875e0ca4640b4d 100644 --- a/dap-sdk/net/server/http_server/http_client/dap_http_client.c +++ b/dap-sdk/net/server/http_server/http_client/dap_http_client.c @@ -449,7 +449,7 @@ void dap_http_client_read( dap_events_socket_t *a_esocket, void *arg ) // If no headers callback we go to the DATA processing if( l_http_client->in_content_length ) { - log_it( L_DEBUG, "headers -> DAP_HTTP_CLIENT_STATE_DATA" ); + //log_it( L_DEBUG, "headers -> DAP_HTTP_CLIENT_STATE_DATA" ); l_http_client->state_read = DAP_HTTP_CLIENT_STATE_DATA; } } @@ -457,6 +457,7 @@ void dap_http_client_read( dap_events_socket_t *a_esocket, void *arg ) } break; case DAP_HTTP_CLIENT_STATE_DATA:{ size_t read_bytes = 0; + //log_it(L_DEBUG, "dap_http_client_read: DAP_HTTP_CLIENT_STATE_DATA"); if ( l_http_client->proc->data_read_callback ) { l_http_client->proc->data_read_callback( l_http_client, &read_bytes ); dap_events_socket_shrink_buf_in( a_esocket, read_bytes ); @@ -506,7 +507,8 @@ void dap_http_client_write( dap_events_socket_t * cl, void *arg ) case DAP_HTTP_CLIENT_STATE_HEADERS: { dap_http_header_t *hdr = l_http_client->out_headers; if ( hdr == NULL ) { - log_it(L_DEBUG, "Output: headers are over (reply status code %u)",l_http_client->reply_status_code); + log_it(L_DEBUG, "Output: headers are over (reply status code %u content_lentgh %u)", + l_http_client->reply_status_code); dap_events_socket_write_f_unsafe(cl, "\r\n"); if ( l_http_client->out_content_length || l_http_client->out_content_ready ) { l_http_client->state_write=DAP_HTTP_CLIENT_STATE_DATA; @@ -523,17 +525,17 @@ void dap_http_client_write( dap_events_socket_t * cl, void *arg ) dap_http_header_remove( &l_http_client->out_headers, hdr ); } } break; - case DAP_HTTP_CLIENT_STATE_DATA: - { - if ( l_http_client->proc ){ - if ( l_http_client->proc->data_write_callback ){ - l_http_client->proc->data_write_callback( l_http_client, NULL ); + case DAP_HTTP_CLIENT_STATE_DATA: + { + if ( l_http_client->proc ){ + if ( l_http_client->proc->data_write_callback ){ + l_http_client->proc->data_write_callback( l_http_client, NULL ); + } + }else{ + log_it(L_WARNING, "No http proc, nothing to write"); + } } - }else{ - log_it(L_WARNING, "No http proc, nothing to write"); - } - } - break; + break; } } diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index f3c6de93ca1feec324532329f02dec94895ddb4b..d0ca63ba4fb8acaddd9397a99661162c7d1b741e 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -382,7 +382,8 @@ void s_http_client_headers_read(dap_http_client_t * a_http_client, void * a_arg) ss->service_key = strdup(header->value); size_t count_channels = strlen(ss->active_channels); for(size_t i = 0; i < count_channels; i++) { - dap_stream_ch_new(sid, ss->active_channels[i]); + dap_stream_ch_t * l_ch = dap_stream_ch_new(sid, ss->active_channels[i]); + l_ch->ready_to_read = true; //sid->channel[i]->ready_to_write = true; } @@ -391,7 +392,7 @@ void s_http_client_headers_read(dap_http_client_t * a_http_client, void * a_arg) stream_states_update(sid); a_http_client->state_read=DAP_HTTP_CLIENT_STATE_DATA; a_http_client->state_write=DAP_HTTP_CLIENT_STATE_START; - //dap_events_socket_set_readable_unsafe(a_http_client->esocket,true); + dap_events_socket_set_readable_unsafe(a_http_client->esocket,true); dap_events_socket_set_writable_unsafe(a_http_client->esocket,true); // Dirty hack, because previous function shouldn't // // set write flag off but it does! }else{ diff --git a/dap-sdk/net/stream/stream/dap_stream_pkt.c b/dap-sdk/net/stream/stream/dap_stream_pkt.c index 1f1357e25aa5f3648608b55fc058be88fee0d6c9..68e51097de8de9f3642f74ba4cc069ecfaa2b617 100644 --- a/dap-sdk/net/stream/stream/dap_stream_pkt.c +++ b/dap-sdk/net/stream/stream/dap_stream_pkt.c @@ -149,8 +149,6 @@ size_t dap_stream_pkt_write_unsafe(dap_stream_t * a_stream, const void * a_data, ret+=dap_events_socket_write_unsafe(a_stream->esocket,&pkt_hdr,sizeof(pkt_hdr)); ret+=dap_events_socket_write_unsafe(a_stream->esocket,a_stream->buf,pkt_hdr.size); - dap_events_socket_set_writable_unsafe(a_stream->esocket, true); - return ret; } @@ -193,7 +191,6 @@ void dap_stream_send_keepalive(dap_stream_t * a_stream) l_pkt.id = TECHICAL_CHANNEL_ID; l_pkt.type=STREAM_CH_PKT_TYPE_KEEPALIVE; - if( dap_stream_pkt_write_unsafe( a_stream, &l_pkt, sizeof(l_pkt) ) ) - dap_stream_set_ready_to_write( a_stream, true ); + dap_stream_pkt_write_unsafe( a_stream, &l_pkt, sizeof(l_pkt) ); } diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index ae7385b0226bbb7d624aa437d933989ea4c29ce7..9aa788be20a73a1becd07766ae8abb8a961a42f0 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -1145,7 +1145,7 @@ void s_ch_packet_in_vpn_address_request(dap_stream_ch_t* a_ch, dap_chain_net_srv pkt_out->header.op_data.data_size + sizeof(pkt_out->header))) { dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); } - + s_tun_send_msg_ip_assigned_all(l_ch_vpn, l_ch_vpn->addr_ipv4); } else { // All the network is filled with clients, can't lease a new address log_it(L_WARNING, "All the network is filled with clients, can't lease a new address"); ch_vpn_pkt_t *pkt_out = (ch_vpn_pkt_t*) calloc(1, sizeof(pkt_out->header)); @@ -1289,11 +1289,10 @@ void m_es_tun_read(dap_events_socket_t * a_es, void * arg) const static int tun_MTU = 100000; /// TODO Replace with detection of MTU size uint8_t l_tmp_buf[tun_MTU]; - if (! a_es->_inheritor) // There is moment between inheritor initialization and active live of event socket in worker. - return; - ch_sf_tun_socket_t * l_tun_socket = CH_SF_TUN_SOCKET(a_es); + assert(l_tun_socket); size_t l_buf_in_size = a_es->buf_in_size; + //log_it(L_DEBUG,"m_es_tun_read() received ip pacet %u size", l_buf_in_size); if(l_buf_in_size) { struct iphdr *iph = (struct iphdr*) a_es->buf_in; @@ -1312,14 +1311,18 @@ void m_es_tun_read(dap_events_socket_t * a_es, void * arg) } // We found in local table, sending data (if possible) if (l_vpn_info){ - if ( !l_vpn_info->is_on_this_worker && !l_vpn_info->is_reassigned_once ){ + /*if ( !l_vpn_info->is_on_this_worker && !l_vpn_info->is_reassigned_once ){ + log_it(L_NOTICE, "Reassigning from worker %u to %u", l_vpn_info->worker->id, a_es->worker->id); dap_events_socket_reassign_between_workers_mt( l_vpn_info->worker,l_vpn_info->esocket,a_es->worker); l_vpn_info->is_reassigned_once = true; s_tun_send_msg_esocket_reasigned_all_mt(l_vpn_info->ch_vpn, l_vpn_info->esocket, l_vpn_info->addr_ipv4,a_es->worker->id); - } + }*/ s_tun_client_send_data(l_vpn_info, a_es->buf_in, l_buf_in_size); - } + }//else{ + // log_it(L_DEBUG, "Can't find route for desitnation %s",str_daddr); + //} } + a_es->buf_in_size=0; // NULL it out because read it all }