Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (1)
  • Dmitriy A. Gerasimov's avatar
    [+] More checks · b174c15b
    Dmitriy A. Gerasimov authored
    [+] New debug_more param for srv_vpn
    [+] More warnings about data loose
    [+] Stats collection for sent, receieved and lost
    b174c15b
...@@ -148,7 +148,6 @@ void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old, ...@@ -148,7 +148,6 @@ void dap_events_socket_reassign_between_workers_mt(dap_worker_t * a_worker_old,
l_msg->esocket = a_es; l_msg->esocket = a_es;
l_msg->worker_new = a_worker_new; l_msg->worker_new = a_worker_new;
dap_events_socket_queue_ptr_send(a_worker_old->queue_es_reassign, l_msg); dap_events_socket_queue_ptr_send(a_worker_old->queue_es_reassign, l_msg);
} }
/** /**
...@@ -254,7 +253,7 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc ...@@ -254,7 +253,7 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc
int l_pipe[2]; int l_pipe[2];
int l_errno; int l_errno;
char l_errbuf[128]; char l_errbuf[128];
if( pipe2(l_pipe,O_DIRECT) < 0 ){ if( pipe2(l_pipe,O_DIRECT | O_NONBLOCK ) < 0 ){
l_errno = errno; l_errno = errno;
strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
switch (l_errno) { switch (l_errno) {
...@@ -483,8 +482,12 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg) ...@@ -483,8 +482,12 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg)
int l_errno = errno; int l_errno = errno;
if (ret == sizeof(a_arg) ) if (ret == sizeof(a_arg) )
return 0; return 0;
else else{
char l_errbuf[128];
strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
log_it(L_ERROR, "Can't send ptr to queue:\"%s\" code %d", l_errbuf, l_errno);
return l_errno; return l_errno;
}
#elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX) #elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX)
struct timespec l_timeout; struct timespec l_timeout;
clock_gettime(CLOCK_REALTIME, &l_timeout); clock_gettime(CLOCK_REALTIME, &l_timeout);
......
...@@ -267,8 +267,8 @@ void *dap_worker_thread(void *arg) ...@@ -267,8 +267,8 @@ void *dap_worker_thread(void *arg)
} }
if(l_cur->flags & DAP_SOCK_READY_TO_WRITE) { if(l_cur->flags & DAP_SOCK_READY_TO_WRITE) {
static const uint32_t buf_out_zero_count_max = 5; static const uint32_t buf_out_zero_count_max = 2;
l_cur->buf_out[l_cur->buf_out_size] = 0; //l_cur->buf_out[l_cur->buf_out_size] = 0;
if(!l_cur->buf_out_size) { if(!l_cur->buf_out_size) {
...@@ -276,8 +276,8 @@ void *dap_worker_thread(void *arg) ...@@ -276,8 +276,8 @@ void *dap_worker_thread(void *arg)
l_cur->buf_out_zero_count++; l_cur->buf_out_zero_count++;
if(l_cur->buf_out_zero_count > buf_out_zero_count_max) { // How many time buf_out on write event could be empty if(l_cur->buf_out_zero_count > buf_out_zero_count_max) { // How many time buf_out on write event could be empty
//log_it(L_WARNING, "Output: nothing to send %u times, remove socket from the write set", log_it(L_WARNING, "Output: nothing to send %u times, remove socket from the write set",
// buf_out_zero_count_max); buf_out_zero_count_max);
dap_events_socket_set_writable_unsafe(l_cur, false); dap_events_socket_set_writable_unsafe(l_cur, false);
} }
} }
......
...@@ -132,6 +132,26 @@ size_t dap_stream_ch_pkt_write_mt(dap_stream_worker_t * a_worker , dap_stream_ch ...@@ -132,6 +132,26 @@ size_t dap_stream_ch_pkt_write_mt(dap_stream_worker_t * a_worker , dap_stream_ch
return a_data_size; return a_data_size;
} }
/**
* @brief dap_stream_ch_check_unsafe
* @param a_worker
* @param a_ch
* @return
*/
bool dap_stream_ch_check_unsafe(dap_stream_worker_t * a_worker,dap_stream_ch_t * a_ch)
{
if (a_ch){
if ( a_worker->channels){
dap_stream_ch_t * l_ch = NULL;
HASH_FIND(hh_worker,a_worker->channels ,&a_ch, sizeof(a_ch), l_ch );
return l_ch == a_ch;
}else
return false;
}else
return false;
}
/** /**
* @brief stream_ch_pkt_write * @brief stream_ch_pkt_write
......
...@@ -54,5 +54,7 @@ void dap_stream_ch_pkt_deinit(); ...@@ -54,5 +54,7 @@ void dap_stream_ch_pkt_deinit();
size_t dap_stream_ch_pkt_write_f_unsafe(struct dap_stream_ch * a_ch, uint8_t a_type, const char * a_str,...); size_t dap_stream_ch_pkt_write_f_unsafe(struct dap_stream_ch * a_ch, uint8_t a_type, const char * a_str,...);
size_t dap_stream_ch_pkt_write_unsafe(struct dap_stream_ch * a_ch, uint8_t a_type, const void * a_data, size_t a_data_size); size_t dap_stream_ch_pkt_write_unsafe(struct dap_stream_ch * a_ch, uint8_t a_type, const void * a_data, size_t a_data_size);
bool dap_stream_ch_check_unsafe(dap_stream_worker_t * a_worker,dap_stream_ch_t * a_ch);
size_t dap_stream_ch_pkt_write_f_mt(dap_stream_worker_t * a_worker , dap_stream_ch_t *a_ch, uint8_t a_type, const char * a_str,...); size_t dap_stream_ch_pkt_write_f_mt(dap_stream_worker_t * a_worker , dap_stream_ch_t *a_ch, uint8_t a_type, const char * a_str,...);
size_t dap_stream_ch_pkt_write_mt(dap_stream_worker_t * a_worker , dap_stream_ch_t *a_ch, uint8_t a_type, const void * a_data, size_t a_data_size); size_t dap_stream_ch_pkt_write_mt(dap_stream_worker_t * a_worker , dap_stream_ch_t *a_ch, uint8_t a_type, const void * a_data, size_t a_data_size);
...@@ -148,7 +148,7 @@ size_t dap_stream_pkt_write_unsafe(dap_stream_t * a_stream, const void * a_data, ...@@ -148,7 +148,7 @@ size_t dap_stream_pkt_write_unsafe(dap_stream_t * a_stream, const void * a_data,
memset(&pkt_hdr,0,sizeof(pkt_hdr)); memset(&pkt_hdr,0,sizeof(pkt_hdr));
memcpy(pkt_hdr.sig,c_dap_stream_sig,sizeof(pkt_hdr.sig)); memcpy(pkt_hdr.sig,c_dap_stream_sig,sizeof(pkt_hdr.sig));
pkt_hdr.size =(uint32_t) a_stream->session->key->enc_na(a_stream->session->key, a_data,a_data_size,l_buf_selected, l_buf_size_required); pkt_hdr.size =(uint32_t) dap_enc_code( a_stream->session->key, a_data,a_data_size,l_buf_selected, l_buf_size_required, DAP_ENC_DATA_TYPE_RAW);
ret+=dap_events_socket_write_unsafe(a_stream->esocket,&pkt_hdr,sizeof(pkt_hdr)); ret+=dap_events_socket_write_unsafe(a_stream->esocket,&pkt_hdr,sizeof(pkt_hdr));
ret+=dap_events_socket_write_unsafe(a_stream->esocket,l_buf_selected,pkt_hdr.size); ret+=dap_events_socket_write_unsafe(a_stream->esocket,l_buf_selected,pkt_hdr.size);
......
...@@ -61,15 +61,30 @@ typedef struct dap_chain_net_srv_usage{ ...@@ -61,15 +61,30 @@ typedef struct dap_chain_net_srv_usage{
typedef void (*dap_response_success_callback_t) (dap_stream_ch_chain_net_srv_pkt_success_t*, void*); typedef void (*dap_response_success_callback_t) (dap_stream_ch_chain_net_srv_pkt_success_t*, void*);
typedef struct dap_net_stats{
uintmax_t bytes_sent;
uintmax_t bytes_recv;
uintmax_t bytes_sent_lost;
uintmax_t bytes_recv_lost;
uintmax_t packets_sent;
uintmax_t packets_recv;
uintmax_t packets_sent_lost;
intmax_t packets_recv_lost;
} dap_net_stats_t;
typedef struct dap_chain_net_srv_stream_session { typedef struct dap_chain_net_srv_stream_session {
dap_stream_session_t * parent; dap_stream_session_t * parent;
dap_chain_net_srv_usage_t * usages; dap_chain_net_srv_usage_t * usages;
dap_chain_net_srv_usage_t * usage_active; dap_chain_net_srv_usage_t * usage_active;
uint128_t limits_bytes; // Bytes left uintmax_t limits_bytes; // Bytes left
time_t limits_ts; // Timestamp until its activte time_t limits_ts; // Timestamp until its activte
dap_chain_net_srv_price_unit_uid_t limits_units_type; dap_chain_net_srv_price_unit_uid_t limits_units_type;
// Some common stats
volatile dap_net_stats_t stats;
time_t ts_activated; time_t ts_activated;
dap_sign_t* user_sign; // User's signature for auth if reconnect dap_sign_t* user_sign; // User's signature for auth if reconnect
......
...@@ -130,6 +130,7 @@ struct tun_socket_msg{ ...@@ -130,6 +130,7 @@ struct tun_socket_msg{
ch_sf_tun_socket_t ** s_tun_sockets = NULL; ch_sf_tun_socket_t ** s_tun_sockets = NULL;
dap_events_socket_t ** s_tun_sockets_queue_msg = NULL; dap_events_socket_t ** s_tun_sockets_queue_msg = NULL;
uint32_t s_tun_sockets_count = 0; uint32_t s_tun_sockets_count = 0;
bool s_debug_more = false;
static usage_client_t * s_clients; static usage_client_t * s_clients;
static dap_chain_net_srv_ch_vpn_t * s_ch_vpn_addrs ; static dap_chain_net_srv_ch_vpn_t * s_ch_vpn_addrs ;
...@@ -182,21 +183,34 @@ static int s_tun_deattach_queue(int fd); ...@@ -182,21 +183,34 @@ static int s_tun_deattach_queue(int fd);
static int s_tun_attach_queue(int fd); static int s_tun_attach_queue(int fd);
static void s_tun_client_send_data(dap_chain_net_srv_ch_vpn_info_t * a_ch_vpn_info, const void * a_data, size_t a_data_size); static bool s_tun_client_send_data(dap_chain_net_srv_ch_vpn_info_t * a_ch_vpn_info, const void * a_data, size_t a_data_size);
static size_t s_stream_session_esocket_send(dap_chain_net_srv_stream_session_t * l_srv_session, dap_events_socket_t * l_es, const void * a_data, size_t a_data_size );
static bool s_tun_client_send_data_unsafe(dap_chain_net_srv_ch_vpn_t * l_ch_vpn, ch_vpn_pkt_t * l_pkt_out);
static void s_tun_client_send_data_unsafe(dap_chain_net_srv_ch_vpn_t * l_ch_vpn, ch_vpn_pkt_t * l_pkt_out) static bool s_tun_client_send_data_unsafe(dap_chain_net_srv_ch_vpn_t * l_ch_vpn, ch_vpn_pkt_t * l_pkt_out)
{ {
dap_chain_net_srv_stream_session_t * l_srv_session = DAP_CHAIN_NET_SRV_STREAM_SESSION (l_ch_vpn->ch->stream->session ); dap_chain_net_srv_stream_session_t * l_srv_session = DAP_CHAIN_NET_SRV_STREAM_SESSION (l_ch_vpn->ch->stream->session );
dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find_unsafe(l_srv_session, l_ch_vpn->usage_id); dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find_unsafe(l_srv_session, l_ch_vpn->usage_id);
dap_stream_ch_pkt_write_unsafe(l_ch_vpn->ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, l_pkt_out, size_t l_data_to_send = (l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header));
l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header)); size_t l_data_sent = dap_stream_ch_pkt_write_unsafe(l_ch_vpn->ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, l_pkt_out, l_data_to_send);
s_update_limits(l_ch_vpn->ch,l_srv_session,l_usage, l_pkt_out->header.op_data.data_size ); s_update_limits(l_ch_vpn->ch,l_srv_session,l_usage, l_data_sent );
if ( l_data_sent != l_data_to_send){
log_it(L_WARNING, "Wasn't sent all the data in tunnel: probably buffer overflow");
l_srv_session->stats.bytes_recv_lost += l_data_to_send - l_data_sent;
l_srv_session->stats.packets_recv_lost++;
return false;
}else{
l_srv_session->stats.bytes_recv += l_data_sent;
l_srv_session->stats.packets_recv++;
return true;
}
} }
static void s_tun_client_send_data(dap_chain_net_srv_ch_vpn_info_t * l_ch_vpn_info, const void * a_data, size_t a_data_size) static bool s_tun_client_send_data(dap_chain_net_srv_ch_vpn_info_t * l_ch_vpn_info, const void * a_data, size_t a_data_size)
{ {
assert(a_data_size > sizeof (struct iphdr));
ch_vpn_pkt_t *l_pkt_out = DAP_NEW_Z_SIZE(ch_vpn_pkt_t, sizeof(l_pkt_out->header) + a_data_size); ch_vpn_pkt_t *l_pkt_out = DAP_NEW_Z_SIZE(ch_vpn_pkt_t, sizeof(l_pkt_out->header) + a_data_size);
l_pkt_out->header.op_code = VPN_PACKET_OP_CODE_VPN_RECV; l_pkt_out->header.op_code = VPN_PACKET_OP_CODE_VPN_RECV;
l_pkt_out->header.sock_id = s_raw_server->tun_fd; l_pkt_out->header.sock_id = s_raw_server->tun_fd;
...@@ -204,8 +218,21 @@ static void s_tun_client_send_data(dap_chain_net_srv_ch_vpn_info_t * l_ch_vpn_in ...@@ -204,8 +218,21 @@ static void s_tun_client_send_data(dap_chain_net_srv_ch_vpn_info_t * l_ch_vpn_in
l_pkt_out->header.op_data.data_size = a_data_size; l_pkt_out->header.op_data.data_size = a_data_size;
memcpy(l_pkt_out->data, a_data, a_data_size); memcpy(l_pkt_out->data, a_data, a_data_size);
struct in_addr l_in_daddr;
l_in_daddr.s_addr = ((struct iphdr* ) l_pkt_out->data)->daddr;
if(l_ch_vpn_info->is_on_this_worker){ if(l_ch_vpn_info->is_on_this_worker){
s_tun_client_send_data_unsafe(l_ch_vpn_info->ch_vpn,l_pkt_out); if( dap_events_socket_check_unsafe(l_ch_vpn_info->worker, l_ch_vpn_info->esocket ) ){
if(s_debug_more){
char l_str_daddr[INET_ADDRSTRLEN];
inet_ntop(AF_INET,&l_in_daddr,l_str_daddr,sizeof (l_in_daddr));
log_it(L_INFO, "Sent packet for desitnation %s in own context",l_str_daddr);
}
s_tun_client_send_data_unsafe(l_ch_vpn_info->ch_vpn,l_pkt_out);
}else{
log_it(L_WARNING, "Was no esocket %p on worker #%u, lost %zd data",l_ch_vpn_info->esocket, l_ch_vpn_info->worker->id,a_data_size );
}
DAP_DELETE(l_pkt_out); DAP_DELETE(l_pkt_out);
}else{ }else{
struct tun_socket_msg * l_msg= DAP_NEW_Z(struct tun_socket_msg); struct tun_socket_msg * l_msg= DAP_NEW_Z(struct tun_socket_msg);
...@@ -213,8 +240,20 @@ static void s_tun_client_send_data(dap_chain_net_srv_ch_vpn_info_t * l_ch_vpn_in ...@@ -213,8 +240,20 @@ static void s_tun_client_send_data(dap_chain_net_srv_ch_vpn_info_t * l_ch_vpn_in
l_msg->ch_vpn = l_ch_vpn_info->ch_vpn; l_msg->ch_vpn = l_ch_vpn_info->ch_vpn;
l_msg->esocket = l_ch_vpn_info->esocket; l_msg->esocket = l_ch_vpn_info->esocket;
l_msg->ch_vpn_send.pkt = l_pkt_out; l_msg->ch_vpn_send.pkt = l_pkt_out;
dap_events_socket_queue_ptr_send(l_ch_vpn_info->queue_msg, l_msg); if (dap_events_socket_queue_ptr_send(l_ch_vpn_info->queue_msg, l_msg) != 0 ){
log_it(L_WARNING, "Lost %zd data send in tunnel send operation in alien context: queue is overfilled?",a_data_size );
DAP_DELETE(l_msg);
DAP_DELETE(l_pkt_out);
return false;
}
if(s_debug_more){
char l_str_daddr[INET_ADDRSTRLEN];
inet_ntop(AF_INET,&l_in_daddr,l_str_daddr,sizeof (l_in_daddr));
log_it(L_INFO, "Sent packet for desitnation %s between contexts",l_str_daddr);
}
} }
return true;
} }
/** /**
...@@ -296,7 +335,8 @@ static void s_tun_recv_msg_callback(dap_events_socket_t * a_esocket_queue, void ...@@ -296,7 +335,8 @@ static void s_tun_recv_msg_callback(dap_events_socket_t * a_esocket_queue, void
}break; }break;
case TUN_SOCKET_MSG_CH_VPN_SEND:{ case TUN_SOCKET_MSG_CH_VPN_SEND:{
s_tun_client_send_data_unsafe(l_msg->ch_vpn,l_msg->ch_vpn_send.pkt); if(dap_events_socket_check_unsafe(a_esocket_queue->worker, l_msg->esocket ) )
s_tun_client_send_data_unsafe(l_msg->ch_vpn,l_msg->ch_vpn_send.pkt);
DAP_DELETE(l_msg->ch_vpn_send.pkt); DAP_DELETE(l_msg->ch_vpn_send.pkt);
}break; }break;
default:log_it(L_ERROR,"Wrong tun socket message type %d", l_msg->type); default:log_it(L_ERROR,"Wrong tun socket message type %d", l_msg->type);
...@@ -353,7 +393,7 @@ static void s_tun_send_msg_ip_unassigned(uint32_t a_worker_id, dap_chain_net_srv ...@@ -353,7 +393,7 @@ static void s_tun_send_msg_ip_unassigned(uint32_t a_worker_id, dap_chain_net_srv
l_msg->esocket = a_ch_vpn->ch->stream->esocket; l_msg->esocket = a_ch_vpn->ch->stream->esocket;
l_msg->is_reassigned_once = a_ch_vpn->ch->stream->esocket->was_reassigned; l_msg->is_reassigned_once = a_ch_vpn->ch->stream->esocket->was_reassigned;
if (dap_events_socket_queue_ptr_send(s_tun_sockets_queue_msg[a_worker_id], l_msg) != 0){ if ( dap_events_socket_queue_ptr_send(s_tun_sockets_queue_msg[a_worker_id], l_msg) != 0 ) {
log_it(L_WARNING, "Cant send new ip unassign message to the tun msg queue #%u", a_worker_id); log_it(L_WARNING, "Cant send new ip unassign message to the tun msg queue #%u", a_worker_id);
} }
} }
...@@ -627,6 +667,10 @@ int s_vpn_service_create(dap_config_t * g_config){ ...@@ -627,6 +667,10 @@ int s_vpn_service_create(dap_config_t * g_config){
uint16_t l_pricelist_count = 0; uint16_t l_pricelist_count = 0;
// Read if we need to dump all pkt operations
s_debug_more= dap_config_get_item_bool_default(g_config,"srv_vpn", "debug_more",false);
//! IMPORTANT ! This fetch is single-action and cannot be further reused, since it modifies the stored config data //! IMPORTANT ! This fetch is single-action and cannot be further reused, since it modifies the stored config data
//! it also must NOT be freed within this module ! //! it also must NOT be freed within this module !
char **l_pricelist = dap_config_get_array_str(g_config, "srv_vpn", "pricelist", &l_pricelist_count); // must not be freed! char **l_pricelist = dap_config_get_array_str(g_config, "srv_vpn", "pricelist", &l_pricelist_count); // must not be freed!
...@@ -718,11 +762,10 @@ int dap_chain_net_srv_vpn_init(dap_config_t * g_config) { ...@@ -718,11 +762,10 @@ int dap_chain_net_srv_vpn_init(dap_config_t * g_config) {
return -1; return -1;
} }
log_it(L_INFO,"TUN driver configured successfuly"); log_it(L_INFO,"TUN driver configured successfuly");
s_vpn_service_create(g_config);
dap_stream_ch_proc_add(DAP_STREAM_CH_ID_NET_SRV_VPN, s_ch_vpn_new, s_ch_vpn_delete, s_ch_packet_in, dap_stream_ch_proc_add(DAP_STREAM_CH_ID_NET_SRV_VPN, s_ch_vpn_new, s_ch_vpn_delete, s_ch_packet_in,
s_ch_packet_out); s_ch_packet_out);
s_vpn_service_create(g_config);
return 0; return 0;
} }
...@@ -1056,6 +1099,7 @@ static void send_pong_pkt(dap_stream_ch_t* a_ch) ...@@ -1056,6 +1099,7 @@ static void send_pong_pkt(dap_stream_ch_t* a_ch)
void s_ch_packet_in_vpn_address_request(dap_stream_ch_t* a_ch, dap_chain_net_srv_usage_t * a_usage){ void s_ch_packet_in_vpn_address_request(dap_stream_ch_t* a_ch, dap_chain_net_srv_usage_t * a_usage){
dap_chain_net_srv_ch_vpn_t *l_ch_vpn = CH_VPN(a_ch); dap_chain_net_srv_ch_vpn_t *l_ch_vpn = CH_VPN(a_ch);
dap_chain_net_srv_vpn_t * l_srv_vpn =(dap_chain_net_srv_vpn_t *) a_usage->service->_inhertor; dap_chain_net_srv_vpn_t * l_srv_vpn =(dap_chain_net_srv_vpn_t *) a_usage->service->_inhertor;
dap_chain_net_srv_stream_session_t * l_srv_session= DAP_CHAIN_NET_SRV_STREAM_SESSION(l_ch_vpn->ch->stream->session);
if ( l_ch_vpn->addr_ipv4.s_addr ){ if ( l_ch_vpn->addr_ipv4.s_addr ){
log_it(L_WARNING,"We already have ip address leased to us"); log_it(L_WARNING,"We already have ip address leased to us");
...@@ -1064,9 +1108,19 @@ void s_ch_packet_in_vpn_address_request(dap_stream_ch_t* a_ch, dap_chain_net_srv ...@@ -1064,9 +1108,19 @@ void s_ch_packet_in_vpn_address_request(dap_stream_ch_t* a_ch, dap_chain_net_srv
pkt_out->header.op_problem.code = VPN_PROBLEM_CODE_ALREADY_ASSIGNED_ADDR; pkt_out->header.op_problem.code = VPN_PROBLEM_CODE_ALREADY_ASSIGNED_ADDR;
pkt_out->header.sock_id = s_raw_server->tun_fd; pkt_out->header.sock_id = s_raw_server->tun_fd;
pkt_out->header.usage_id = a_usage->id; pkt_out->header.usage_id = a_usage->id;
dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out,
pkt_out->header.op_data.data_size + sizeof(pkt_out->header)); size_t l_data_to_write = pkt_out->header.op_data.data_size + sizeof(pkt_out->header);
dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); size_t l_data_wrote = dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out,
l_data_to_write);
if (l_data_wrote < l_data_to_write){
log_it(L_WARNING, "Buffer overfilled: can't send packet with VPN_PROBLEM_CODE_ALREADY_ASSIGNED_ADDR: sent only %zd from %zd",
l_data_wrote,l_data_to_write );
l_srv_session->stats.bytes_sent_lost += l_data_to_write - l_data_wrote;
l_srv_session->stats.packets_sent_lost++;
}else{
l_srv_session->stats.packets_sent++;
l_srv_session->stats.bytes_sent+= l_data_wrote;
}
return; return;
} }
dap_chain_net_srv_vpn_item_ipv4_t * l_item_ipv4 = l_srv_vpn->ipv4_unleased; dap_chain_net_srv_vpn_item_ipv4_t * l_item_ipv4 = l_srv_vpn->ipv4_unleased;
...@@ -1089,15 +1143,27 @@ void s_ch_packet_in_vpn_address_request(dap_stream_ch_t* a_ch, dap_chain_net_srv ...@@ -1089,15 +1143,27 @@ void s_ch_packet_in_vpn_address_request(dap_stream_ch_t* a_ch, dap_chain_net_srv
memcpy(l_pkt_out->data + sizeof(l_ch_vpn->addr_ipv4), &s_raw_server->ipv4_gw , memcpy(l_pkt_out->data + sizeof(l_ch_vpn->addr_ipv4), &s_raw_server->ipv4_gw ,
sizeof(s_raw_server->ipv4_gw)); sizeof(s_raw_server->ipv4_gw));
dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA , l_pkt_out, size_t l_data_to_write = l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header);
l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header)); size_t l_data_wrote = dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA , l_pkt_out,
log_it(L_NOTICE, "VPN client address %s leased", inet_ntoa(l_ch_vpn->addr_ipv4)); l_data_to_write);
log_it(L_INFO, "\tnet gateway %s", inet_ntoa(s_raw_server->ipv4_network_addr)); if (l_data_wrote < l_data_to_write){
log_it(L_INFO, "\tnet mask %s", inet_ntoa(s_raw_server->ipv4_network_mask)); log_it(L_WARNING, "Buffer overfilled: can't send packet with VPN_PACKET_OP_CODE_VPN_ADDR_REPLY: sent only %zd from %zd",
log_it(L_INFO, "\tgw %s", inet_ntoa(s_raw_server->ipv4_gw)); l_data_wrote,l_data_to_write );
log_it(L_INFO, "\tlast_addr %s", inet_ntoa(s_raw_server->ipv4_lease_last)); dap_chain_net_srv_stream_session_t * l_srv_session= DAP_CHAIN_NET_SRV_STREAM_SESSION(l_ch_vpn->ch->stream->session);
l_srv_vpn->ipv4_unleased = l_item_ipv4->next; assert(l_srv_session);
DAP_DELETE(l_item_ipv4); l_srv_session->stats.bytes_sent_lost += l_data_to_write - l_data_wrote;
l_srv_session->stats.packets_sent_lost++;
}else{
log_it(L_NOTICE, "VPN client address %s leased", inet_ntoa(l_ch_vpn->addr_ipv4));
log_it(L_INFO, "\tnet gateway %s", inet_ntoa(s_raw_server->ipv4_network_addr));
log_it(L_INFO, "\tnet mask %s", inet_ntoa(s_raw_server->ipv4_network_mask));
log_it(L_INFO, "\tgw %s", inet_ntoa(s_raw_server->ipv4_gw));
log_it(L_INFO, "\tlast_addr %s", inet_ntoa(s_raw_server->ipv4_lease_last));
l_srv_vpn->ipv4_unleased = l_item_ipv4->next;
DAP_DELETE(l_item_ipv4);
l_srv_session->stats.packets_sent++;
l_srv_session->stats.bytes_sent+= l_data_wrote;
}
}else{ }else{
struct in_addr n_addr = { 0 }, n_addr_max; struct in_addr n_addr = { 0 }, n_addr_max;
n_addr.s_addr = ntohl(s_raw_server->ipv4_lease_last.s_addr); n_addr.s_addr = ntohl(s_raw_server->ipv4_lease_last.s_addr);
...@@ -1141,11 +1207,21 @@ void s_ch_packet_in_vpn_address_request(dap_stream_ch_t* a_ch, dap_chain_net_srv ...@@ -1141,11 +1207,21 @@ void s_ch_packet_in_vpn_address_request(dap_stream_ch_t* a_ch, dap_chain_net_srv
memcpy(pkt_out->data + sizeof(l_ch_vpn->addr_ipv4), &s_raw_server->ipv4_gw, memcpy(pkt_out->data + sizeof(l_ch_vpn->addr_ipv4), &s_raw_server->ipv4_gw,
sizeof(s_raw_server->ipv4_gw)); sizeof(s_raw_server->ipv4_gw));
if(dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out, size_t l_data_to_write = pkt_out->header.op_data.data_size + sizeof(pkt_out->header);
pkt_out->header.op_data.data_size + sizeof(pkt_out->header))) { size_t l_data_wrote = dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out,
dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); l_data_to_write);
if (l_data_wrote < l_data_to_write){
log_it(L_WARNING, "Buffer overfilled: can't send packet with VPN_PACKET_OP_CODE_VPN_ADDR_REPLY: sent only %zd from %zd",
l_data_wrote,l_data_to_write );
dap_chain_net_srv_stream_session_t * l_srv_session= DAP_CHAIN_NET_SRV_STREAM_SESSION(l_ch_vpn->ch->stream->session);
assert(l_srv_session);
l_srv_session->stats.bytes_sent_lost += l_data_to_write - l_data_wrote;
l_srv_session->stats.packets_sent_lost++;
}else{
l_srv_session->stats.packets_sent++;
l_srv_session->stats.bytes_sent+= l_data_wrote;
s_tun_send_msg_ip_assigned_all(l_ch_vpn, l_ch_vpn->addr_ipv4);
} }
s_tun_send_msg_ip_assigned_all(l_ch_vpn, l_ch_vpn->addr_ipv4);
} else { // All the network is filled with clients, can't lease a new address } else { // All the network is filled with clients, can't lease a new address
log_it(L_WARNING, "All the network is filled with clients, can't lease a new address"); log_it(L_WARNING, "All the network is filled with clients, can't lease a new address");
ch_vpn_pkt_t *pkt_out = (ch_vpn_pkt_t*) calloc(1, sizeof(pkt_out->header)); ch_vpn_pkt_t *pkt_out = (ch_vpn_pkt_t*) calloc(1, sizeof(pkt_out->header));
...@@ -1153,9 +1229,20 @@ void s_ch_packet_in_vpn_address_request(dap_stream_ch_t* a_ch, dap_chain_net_srv ...@@ -1153,9 +1229,20 @@ void s_ch_packet_in_vpn_address_request(dap_stream_ch_t* a_ch, dap_chain_net_srv
pkt_out->header.op_code = VPN_PACKET_OP_CODE_PROBLEM; pkt_out->header.op_code = VPN_PACKET_OP_CODE_PROBLEM;
pkt_out->header.usage_id = a_usage->id; pkt_out->header.usage_id = a_usage->id;
pkt_out->header.op_problem.code = VPN_PROBLEM_CODE_NO_FREE_ADDR; pkt_out->header.op_problem.code = VPN_PROBLEM_CODE_NO_FREE_ADDR;
dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out, size_t l_data_to_write = pkt_out->header.op_data.data_size + sizeof(pkt_out->header);
size_t l_data_wrote = dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out,
pkt_out->header.op_data.data_size + sizeof(pkt_out->header)); pkt_out->header.op_data.data_size + sizeof(pkt_out->header));
dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); if (l_data_wrote < l_data_to_write){
log_it(L_WARNING, "Buffer overfilled: can't send packet with VPN_PACKET_OP_CODE_PROBLEM: sent only %zd from %zd",
l_data_wrote,l_data_to_write );
dap_chain_net_srv_stream_session_t * l_srv_session= DAP_CHAIN_NET_SRV_STREAM_SESSION(l_ch_vpn->ch->stream->session);
assert(l_srv_session);
l_srv_session->stats.bytes_sent_lost += l_data_to_write - l_data_wrote;
l_srv_session->stats.packets_sent_lost++;
}else{
l_srv_session->stats.packets_sent++;
l_srv_session->stats.bytes_sent+= l_data_wrote;
}
} }
} }
} }
...@@ -1192,16 +1279,21 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -1192,16 +1279,21 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
ch_vpn_pkt_t * l_vpn_pkt = (ch_vpn_pkt_t *) l_pkt->data; ch_vpn_pkt_t * l_vpn_pkt = (ch_vpn_pkt_t *) l_pkt->data;
size_t l_vpn_pkt_size = l_pkt->hdr.size - sizeof (l_vpn_pkt->header); size_t l_vpn_pkt_size = l_pkt->hdr.size - sizeof (l_vpn_pkt->header);
if (s_debug_more)
log_it(L_INFO, "Got srv_vpn packet with op_code=0x%02x", l_vpn_pkt->header.op_code);
//log_it(L_DEBUG, "Got SF packet with id %d op_code 0x%02x", remote_sock_id, sf_pkt->header.op_code);
if(l_vpn_pkt->header.op_code >= 0xb0) { // Raw packets if(l_vpn_pkt->header.op_code >= 0xb0) { // Raw packets
switch (l_vpn_pkt->header.op_code) { switch (l_vpn_pkt->header.op_code) {
case VPN_PACKET_OP_CODE_PING: case VPN_PACKET_OP_CODE_PING:
a_ch->stream->esocket->last_ping_request = time(NULL); a_ch->stream->esocket->last_ping_request = time(NULL);
l_srv_session->stats.bytes_recv += l_vpn_pkt_size;
l_srv_session->stats.packets_recv++;
send_pong_pkt(a_ch); send_pong_pkt(a_ch);
break; break;
case VPN_PACKET_OP_CODE_PONG: case VPN_PACKET_OP_CODE_PONG:
a_ch->stream->esocket->last_ping_request = time(NULL); a_ch->stream->esocket->last_ping_request = time(NULL);
l_srv_session->stats.bytes_recv += l_vpn_pkt_size;
l_srv_session->stats.packets_recv++;
break; break;
// for client // for client
case VPN_PACKET_OP_CODE_VPN_ADDR_REPLY: { // Assigned address for peer case VPN_PACKET_OP_CODE_VPN_ADDR_REPLY: { // Assigned address for peer
...@@ -1209,11 +1301,15 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -1209,11 +1301,15 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
log_it(L_ERROR, "Can't create tun"); log_it(L_ERROR, "Can't create tun");
}else }else
s_tun_send_msg_ip_assigned_all(CH_VPN(a_ch), CH_VPN(a_ch)->addr_ipv4); s_tun_send_msg_ip_assigned_all(CH_VPN(a_ch), CH_VPN(a_ch)->addr_ipv4);
l_srv_session->stats.bytes_recv += l_vpn_pkt_size;
l_srv_session->stats.packets_recv++;
} break; } break;
// for server // for server
case VPN_PACKET_OP_CODE_VPN_ADDR_REQUEST: { // Client request after L3 connection the new IP address case VPN_PACKET_OP_CODE_VPN_ADDR_REQUEST: { // Client request after L3 connection the new IP address
log_it(L_INFO, "Received address request "); log_it(L_INFO, "Received address request ");
s_ch_packet_in_vpn_address_request(a_ch, l_usage); s_ch_packet_in_vpn_address_request(a_ch, l_usage);
l_srv_session->stats.bytes_recv += l_vpn_pkt_size;
l_srv_session->stats.packets_recv++;
} break; } break;
// for client only // for client only
case VPN_PACKET_OP_CODE_VPN_RECV:{ case VPN_PACKET_OP_CODE_VPN_RECV:{
...@@ -1221,9 +1317,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -1221,9 +1317,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
// Find tun socket for current worker // Find tun socket for current worker
ch_sf_tun_socket_t * l_tun = s_tun_sockets[a_ch->stream_worker->worker->id]; ch_sf_tun_socket_t * l_tun = s_tun_sockets[a_ch->stream_worker->worker->id];
assert(l_tun); assert(l_tun);
// Unsafely send it s_stream_session_esocket_send(l_srv_session, l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size);
dap_events_socket_write_unsafe( l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size);
//dap_events_socket_set_writable_unsafe(l_tun->es, true);
} break; } break;
// for servier only // for servier only
...@@ -1231,7 +1325,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -1231,7 +1325,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
ch_sf_tun_socket_t * l_tun = s_tun_sockets[a_ch->stream_worker->worker->id]; ch_sf_tun_socket_t * l_tun = s_tun_sockets[a_ch->stream_worker->worker->id];
assert(l_tun); assert(l_tun);
// Unsafely send it // Unsafely send it
size_t l_ret = dap_events_socket_write_unsafe( l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size); size_t l_ret = s_stream_session_esocket_send(l_srv_session, l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size);
//dap_events_socket_set_writable_unsafe(l_tun->es, true); //dap_events_socket_set_writable_unsafe(l_tun->es, true);
if( l_ret) if( l_ret)
s_update_limits (a_ch, l_srv_session, l_usage,l_ret ); s_update_limits (a_ch, l_srv_session, l_usage,l_ret );
...@@ -1242,6 +1336,58 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -1242,6 +1336,58 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
} }
} }
/**
* @brief s_stream_session_esocket_send
* @param l_srv_session
* @param l_es
* @param a_data
* @param a_data_size
*/
static size_t s_stream_session_esocket_send(dap_chain_net_srv_stream_session_t * l_srv_session, dap_events_socket_t * l_es, const void * a_data, size_t a_data_size )
{
// Lets first try to send it directly with write() call
ssize_t l_direct_wrote;
size_t l_ret = 0;
if (l_es->type == DESCRIPTOR_TYPE_FILE )
l_direct_wrote = write(l_es->fd, a_data, a_data_size);
else
l_direct_wrote = send(l_es->fd, a_data, a_data_size, MSG_DONTWAIT | MSG_NOSIGNAL);
int l_errno = errno;
size_t l_data_left_to_send=0;
if (l_direct_wrote > 0){
l_ret += l_direct_wrote;
if((size_t) l_direct_wrote < a_data_size){ // If we sent not all - lets put tail in buffer
l_data_left_to_send = a_data_size-l_direct_wrote;
}else{
l_srv_session->stats.packets_sent++;
l_srv_session->stats.bytes_sent+= l_direct_wrote;
}
}else{
l_data_left_to_send = a_data_size;
l_direct_wrote=0;
if(l_errno != EAGAIN && l_errno != EWOULDBLOCK){
char l_errbuf[128];
strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
log_it(L_WARNING,"Error with data sent: \"%s\" code %d",l_errbuf, l_errno);
}
}
if(l_data_left_to_send){
if ( dap_events_socket_write_unsafe( l_es, a_data +l_direct_wrote,l_data_left_to_send
) < l_data_left_to_send ){
log_it(L_WARNING,"Loosing data, probably buffers are overfilling, lost %zd bytes", l_data_left_to_send);
l_srv_session->stats.bytes_sent_lost += l_data_left_to_send;
l_srv_session->stats.packets_sent_lost++;
}else{
l_ret += l_data_left_to_send;
l_srv_session->stats.packets_sent++;
l_srv_session->stats.bytes_sent+= l_direct_wrote;
}
}
return l_ret;
}
/** /**
* @brief stream_sf_packet_out Packet Out Ch callback * @brief stream_sf_packet_out Packet Out Ch callback
* @param ch * @param ch
...@@ -1277,6 +1423,10 @@ static void s_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) ...@@ -1277,6 +1423,10 @@ static void s_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
dap_stream_ch_pkt_write_unsafe( l_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 ); dap_stream_ch_pkt_write_unsafe( l_usage->client->ch , DAP_STREAM_CH_CHAIN_NET_SRV_PKT_TYPE_NOTIFY_STOPPED , NULL, 0 );
return; return;
} }
// Check for empty buffer out here to prevent warnings in worker
if ( ! a_ch->stream->esocket->buf_out_size )
dap_events_socket_set_writable_unsafe(a_ch->stream->esocket,false);
} }
void m_es_tun_delete(dap_events_socket_t * a_es, void * arg) void m_es_tun_delete(dap_events_socket_t * a_es, void * arg)
...@@ -1295,27 +1445,29 @@ void m_es_tun_read(dap_events_socket_t * a_es, void * arg) ...@@ -1295,27 +1445,29 @@ void m_es_tun_read(dap_events_socket_t * a_es, void * arg)
if(l_buf_in_size) { if(l_buf_in_size) {
struct iphdr *iph = (struct iphdr*) a_es->buf_in; struct iphdr *iph = (struct iphdr*) a_es->buf_in;
struct in_addr in_daddr; struct in_addr l_in_daddr;
in_daddr.s_addr = iph->daddr; l_in_daddr.s_addr = iph->daddr;
// //
dap_chain_net_srv_ch_vpn_info_t * l_vpn_info = NULL; dap_chain_net_srv_ch_vpn_info_t * l_vpn_info = NULL;
// Try to find in worker's clients, without locks // Try to find in worker's clients, without locks
if ( l_tun_socket->clients){ if ( l_tun_socket->clients){
HASH_FIND_INT( l_tun_socket->clients,&in_daddr.s_addr,l_vpn_info ); HASH_FIND_INT( l_tun_socket->clients,&l_in_daddr.s_addr,l_vpn_info );
} }
// We found in local table, sending data (if possible) // We found in local table, sending data (if possible)
if (l_vpn_info){ if (l_vpn_info){
if ( !l_vpn_info->is_on_this_worker && !l_vpn_info->is_reassigned_once ){ if ( !l_vpn_info->is_on_this_worker && !l_vpn_info->is_reassigned_once ){
log_it(L_NOTICE, "Reassigning from worker %u to %u", l_vpn_info->worker->id, a_es->worker->id); log_it(L_NOTICE, "Reassigning from worker %u to %u", l_vpn_info->worker->id, a_es->worker->id);
dap_events_socket_reassign_between_workers_mt( l_vpn_info->worker,l_vpn_info->esocket,a_es->worker);
l_vpn_info->is_reassigned_once = true; l_vpn_info->is_reassigned_once = true;
s_tun_send_msg_esocket_reasigned_all_mt(l_vpn_info->ch_vpn, l_vpn_info->esocket, l_vpn_info->addr_ipv4,a_es->worker->id); s_tun_send_msg_esocket_reasigned_all_mt(l_vpn_info->ch_vpn, l_vpn_info->esocket, l_vpn_info->addr_ipv4,a_es->worker->id);
dap_events_socket_reassign_between_workers_mt( l_vpn_info->worker,l_vpn_info->esocket,a_es->worker);
} }
s_tun_client_send_data(l_vpn_info, a_es->buf_in, l_buf_in_size); s_tun_client_send_data(l_vpn_info, a_es->buf_in, l_buf_in_size);
}//else{ }else if(s_debug_more){
// log_it(L_DEBUG, "Can't find route for desitnation %s",str_daddr); char l_str_daddr[INET_ADDRSTRLEN];
//} inet_ntop(AF_INET,&l_in_daddr,l_str_daddr,sizeof (l_in_daddr));
log_it(L_WARNING, "Can't find route for desitnation %s",l_str_daddr);
}
} }
a_es->buf_in_size=0; // NULL it out because read it all a_es->buf_in_size=0; // NULL it out because read it all
} }
......
...@@ -90,7 +90,7 @@ typedef struct ch_vpn_pkt { ...@@ -90,7 +90,7 @@ typedef struct ch_vpn_pkt {
} raw; // Raw access to OP bytes } raw; // Raw access to OP bytes
}; };
} DAP_ALIGN_PACKED header; } DAP_ALIGN_PACKED header;
uint8_t data[]; // Binary data nested by packet byte_t data[]; // Binary data nested by packet
}DAP_ALIGN_PACKED ch_vpn_pkt_t; }DAP_ALIGN_PACKED ch_vpn_pkt_t;
typedef struct ch_sf_tun_socket ch_sf_tun_socket_t; typedef struct ch_sf_tun_socket ch_sf_tun_socket_t;
......