diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index 46ada366f2a46643c7cb77932d66840240246de9..3ea6b437aa2f5b61d716bce86835b8805de5b666 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -76,18 +76,6 @@ #define SF_MAX_EVENTS 256 -typedef struct usage_client { - pthread_rwlock_t rwlock; - dap_chain_net_srv_ch_vpn_t * ch_vpn; - dap_chain_net_srv_client_t *net_srv_client; - dap_chain_datum_tx_receipt_t * receipt; - size_t receipt_size; - uint32_t usage_id; - dap_chain_net_srv_t * srv; - UT_hash_handle hh; -} usage_client_t; - - typedef struct vpn_local_network { struct in_addr ipv4_lease_last; struct in_addr ipv4_network_mask; @@ -96,16 +84,14 @@ typedef struct vpn_local_network { int tun_ctl_fd; int tun_fd; struct ifreq ifr; + bool auto_cpu_reassignment; -#ifdef DAP_TUN_IN_WORKER - dap_events_socket_t * tun_events_socket; -#endif ch_vpn_pkt_t * pkt_out[400]; size_t pkt_out_size; size_t pkt_out_rindex; size_t pkt_out_windex; pthread_mutex_t pkt_out_mutex; - + pthread_rwlock_t rwlock; } vpn_local_network_t; static usage_client_t * s_clients; @@ -158,47 +144,36 @@ static void s_update_limits(dap_stream_ch_t * a_ch , dap_chain_net_srv_stream_session_t * a_srv_session, dap_chain_net_srv_usage_t * a_usage, size_t a_bytes); -#ifdef DAP_TUN_IN_WORKER +static void m_es_tun_new(dap_events_socket_t * a_es, void * arg); static void m_es_tun_delete(dap_events_socket_t * a_es, void * arg); static void m_es_tun_read(dap_events_socket_t * a_es, void * arg); static void m_es_tun_error(dap_events_socket_t * a_es, void * arg); -bool is_dap_tun_in_worker(void) -{ -#ifdef DAP_TUN_IN_WORKER - return true; -#else - return false; -#endif -} - -static void s_es_tun_new(dap_events_socket_t * a_es, void * arg); -static void s_es_tun_delete(dap_events_socket_t * a_es, void * arg); -static void s_es_tun_read(dap_events_socket_t * a_es, void * arg); -static void s_es_tun_error(dap_events_socket_t * a_es, void * arg); - pthread_rwlock_t s_tun_sockets_rwlock = PTHREAD_RWLOCK_INITIALIZER; ch_sf_tun_socket_t * s_tun_sockets = NULL; -//TODO: create .new_callback for event sockets -int s_tun_event_stream_create() +int s_tun_deattach_queue(int fd); +int s_tun_attach_queue(int fd); + +dap_events_socket_t * s_tun_event_stream_create(dap_worker_t * a_worker, int a_tun_fd) { - static dap_events_socket_callbacks_t l_s_callbacks = { - .read_callback = m_es_tun_read,// for server - .write_callback = NULL,// for client - .error_callback = m_es_tun_error, - .delete_callback = m_es_tun_delete - }; - - s_raw_server->tun_events_socket = dap_events_socket_wrap_no_add(NULL, - s_raw_server->tun_fd, &l_s_callbacks); - s_raw_server->tun_events_socket->type = DESCRIPTOR_TYPE_FILE; - dap_worker_add_events_socket_auto(s_raw_server->tun_events_socket); - s_raw_server->tun_events_socket->_inheritor = s_raw_server; - - return 0; + assert(a_worker); + static dap_events_socket_callbacks_t l_s_callbacks = {{ 0 }}; + l_s_callbacks.new_callback = m_es_tun_new; + l_s_callbacks.read_callback = m_es_tun_read; + l_s_callbacks.error_callback = m_es_tun_error; + l_s_callbacks.delete_callback = m_es_tun_delete; + + s_tun_deattach_queue(a_tun_fd); + + dap_events_socket_t * l_es = dap_events_socket_wrap_no_add(a_worker->events , + a_tun_fd, &l_s_callbacks); + l_es->type = DESCRIPTOR_TYPE_FILE; + dap_events_socket_assign_on_worker_mt(l_es, a_worker); + + return l_es; } -#endif + static int s_callback_client_success(dap_chain_net_srv_t * a_srv, uint32_t a_usage_id, dap_chain_net_srv_client_t * a_srv_client, const void * a_success, size_t a_success_size) @@ -335,9 +310,6 @@ int dap_chain_net_srv_client_vpn_init(dap_config_t * l_config) { return 0; } - -ch_sf_tun_server_t * m_tun_server = NULL; - int s_tun_attach_queue(int fd) { struct ifreq ifr; @@ -355,30 +327,6 @@ int s_tun_deattach_queue(int fd) } -/** - * @brief s_tun_es_create - * @param a_worker - * @return - */ -dap_events_socket_t * s_tun_es_create(dap_worker_t * a_worker, int a_tun_fd) -{ - assert(a_worker); - static dap_events_socket_callbacks_t l_s_callbacks = {{ 0 }}; - l_s_callbacks.new_callback = s_es_tun_new; - l_s_callbacks.read_callback = s_es_tun_read; - l_s_callbacks.error_callback = s_es_tun_error; - l_s_callbacks.delete_callback = s_es_tun_delete; - - s_tun_deattach_queue(a_tun_fd); - - dap_events_socket_t * l_es = dap_events_socket_wrap_no_add(a_worker->events , - a_tun_fd, &l_s_callbacks); - l_es->type = DESCRIPTOR_TYPE_FILE; - dap_events_socket_assign_on_worker_mt(l_es, a_worker); - - return l_es; -} - int s_vpn_tun_create(dap_config_t * g_config) { const char *c_addr = dap_config_get_item_str(g_config, "srv_vpn", "network_address"); @@ -390,13 +338,16 @@ int s_vpn_tun_create(dap_config_t * g_config) return -1; } - inet_aton(c_addr, &m_tun_server->int_network ); - inet_aton(c_mask, &m_tun_server->int_network_mask ); - m_tun_server->int_network_addr.s_addr= (m_tun_server->int_network.s_addr | 0x01000000); // grow up some shit here! - m_tun_server->client_addr_last.s_addr = m_tun_server->int_network_addr.s_addr; + inet_aton(c_addr, &s_raw_server->ipv4_host ); + inet_aton(c_mask, &s_raw_server->ipv4_network_mask ); + s_raw_server->ipv4_network_addr.s_addr= (s_raw_server->ipv4_host.s_addr | 0x01000000); // grow up some shit here! + s_raw_server->ipv4_lease_last.s_addr = s_raw_server->ipv4_network_addr.s_addr; + + s_raw_server->auto_cpu_reassignment = dap_config_get_item_bool_default(g_config, "srv_vpn", "auto_cpu_reassignment", false); + log_it(L_NOTICE,"auto cpu reassignment is set to '%s'", s_raw_server->auto_cpu_reassignment); - memset(&m_tun_server->ifr, 0, sizeof(m_tun_server->ifr)); - m_tun_server->ifr.ifr_flags = IFF_TUN | IFF_MULTI_QUEUE| IFF_NO_PI; + memset(&s_raw_server->ifr, 0, sizeof(s_raw_server->ifr)); + s_raw_server->ifr.ifr_flags = IFF_TUN | IFF_MULTI_QUEUE| IFF_NO_PI; uint32_t l_cpu_count = dap_get_cpu_count(); // maybe replace with getting s_threads_count directly log_it(L_NOTICE,"%s: trying to initialize multiqueue for %u workers", __PRETTY_FUNCTION__, l_cpu_count); @@ -412,21 +363,21 @@ int s_vpn_tun_create(dap_config_t * g_config) break; } log_it(L_DEBUG,"Opening /dev/net/tun:%u", i); - if( (err = ioctl(l_tun_fd, TUNSETIFF, (void *)& m_tun_server->ifr)) < 0 ) { + if( (err = ioctl(l_tun_fd, TUNSETIFF, (void *)& s_raw_server->ifr)) < 0 ) { log_it(L_CRITICAL, "ioctl(TUNSETIFF) error: '%s' ",strerror(errno)); close(l_tun_fd); break; } s_tun_deattach_queue(l_tun_fd); - s_tun_es_create(l_worker, l_tun_fd); + s_tun_event_stream_create(l_worker, l_tun_fd); } if (! err ){ char buf[256]; - log_it(L_NOTICE,"Bringed up %s virtual network interface (%s/%s)", m_tun_server->ifr.ifr_name,inet_ntoa(m_tun_server->int_network_addr),c_mask); - snprintf(buf,sizeof(buf),"ip link set %s up",m_tun_server->ifr.ifr_name); + log_it(L_NOTICE,"Bringed up %s virtual network interface (%s/%s)", s_raw_server->ifr.ifr_name,inet_ntoa(s_raw_server->ipv4_network_addr),c_mask); + snprintf(buf,sizeof(buf),"ip link set %s up",s_raw_server->ifr.ifr_name); system(buf); - snprintf(buf,sizeof(buf),"ip addr add %s/%s dev %s ",inet_ntoa(m_tun_server->int_network_addr),c_mask, m_tun_server->ifr.ifr_name ); + snprintf(buf,sizeof(buf),"ip addr add %s/%s dev %s ",inet_ntoa(s_raw_server->ipv4_network_addr),c_mask, s_raw_server->ifr.ifr_name ); system(buf); } @@ -439,153 +390,126 @@ int s_vpn_tun_create(dap_config_t * g_config) */ int s_vpn_tun_init() { - m_tun_server=DAP_NEW_Z(ch_sf_tun_server_t); - pthread_rwlock_init(&m_tun_server->rwlock, NULL); - - pthread_mutex_init(&m_tun_server->external_route_operations,NULL); - pthread_mutex_init(&m_tun_server->pkt_out_mutex,NULL); + s_raw_server=DAP_NEW_Z(vpn_local_network_t); + pthread_rwlock_init(&s_raw_server->rwlock, NULL); + pthread_mutex_init(&s_raw_server->pkt_out_mutex,NULL); + pthread_mutex_init(&s_sf_socks_mutex, NULL); + pthread_cond_init(&s_sf_socks_cond, NULL); return 0; } -int dap_chain_net_srv_vpn_init(dap_config_t * g_config) { - s_vpn_tun_init(); - - log_it(L_DEBUG,"Initializing TUN driver..."); - s_vpn_tun_create(g_config); - log_it(L_INFO,"TUN driver configured successfuly"); - - //TODO: initialize dap_chain_net_srv_vpn_t* here, - //take it from previous version of dap_chain_net_srv_vpn_init() (below) +int s_vpn_service_create(dap_config_t * g_config){ + dap_chain_net_srv_uid_t l_uid = { .uint64 = DAP_CHAIN_NET_SRV_VPN_ID }; + dap_chain_net_srv_t* l_srv = dap_chain_net_srv_add( l_uid, s_callback_requested, + s_callback_response_success, s_callback_response_error, + s_callback_receipt_next_success); + + dap_chain_net_srv_vpn_t* l_srv_vpn = DAP_NEW_Z( dap_chain_net_srv_vpn_t); + l_srv->_inhertor = l_srv_vpn; + l_srv_vpn->parent = l_srv; + + uint16_t l_pricelist_count = 0; + + //! IMPORTANT ! This fetch is single-action and cannot be further reused, since it modifies the stored config data + //! it also must NOT be freed within this module ! + char **l_pricelist = dap_config_get_array_str(g_config, "srv_vpn", "pricelist", &l_pricelist_count); // must not be freed! + for (uint16_t i = 0; i < l_pricelist_count; i++) { + dap_chain_net_srv_price_t *l_price = DAP_NEW_Z(dap_chain_net_srv_price_t); + short l_iter = 0; + char *l_ctx; + for (char *l_price_token = strtok_r(l_pricelist[i], ":", &l_ctx); l_price_token || l_iter == 6; l_price_token = strtok_r(NULL, ":", &l_ctx), ++l_iter) { + //log_it(L_DEBUG, "Tokenizer: %s", l_price_token); + switch (l_iter) { + case 0: + l_price->net_name = l_price_token; + if (!(l_price->net = dap_chain_net_by_name(l_price->net_name))) { + log_it(L_ERROR, "Error parsing pricelist: can't find network \"%s\"", l_price_token); + DAP_DELETE(l_price); + break; + } + continue; + case 1: + l_price->value_coins = atof(l_price_token); + if (!(l_price->value_datoshi = (uint64_t)dap_chain_coins_to_balance((long double)l_price->value_coins))) { + log_it(L_ERROR, "Error parsing pricelist: text on 2nd position \"%s\" is not floating number", l_price_token); + l_iter = 0; + DAP_DELETE(l_price); + break; + } + continue; + case 2: + dap_stpcpy(l_price->token, l_price_token); + continue; + case 3: + l_price->units = strtoul(l_price_token, NULL, 10); + if (!l_price->units) { + log_it(L_ERROR, "Error parsing pricelist: text on 4th position \"%s\" is not unsigned integer", l_price_token); + l_iter = 0; + DAP_DELETE(l_price); + break; + } + continue; + case 4: + if (!strcmp(l_price_token, "SEC")) + l_price->units_uid.enm = SERV_UNIT_SEC; + else if (!strcmp(l_price_token, "DAY")) + l_price->units_uid.enm = SERV_UNIT_DAY; + else if (!strcmp(l_price_token, "MB")) + l_price->units_uid.enm = SERV_UNIT_MB; + else { + log_it(L_ERROR, "Error parsing pricelist: wrong unit type \"%s\"", l_price_token); + l_iter = 0; + DAP_DELETE(l_price); + break; + } + continue; + case 5: + if (!(l_price->wallet = dap_chain_wallet_open(l_price_token, dap_config_get_item_str_default(g_config, "resources", "wallets_path", NULL)))) { + log_it(L_ERROR, "Error parsing pricelist: can't open wallet \"%s\"", l_price_token); + l_iter = 0; + DAP_DELETE(l_price); + break; + } + continue; + case 6: + log_it(L_INFO, "Price item correct, added to service"); + DL_APPEND(l_srv->pricelist, l_price); + break; + default: + break; + } + log_it(L_DEBUG, "Done with price item %d", i); + break; // double break exits tokenizer loop and steps to next price item + } + } return 0; + } + /** * @brief dap_stream_ch_vpn_init Init actions for VPN stream channel - * @param vpn_addr Zero if only client mode. Address if the node shares its local VPN - * @param vpn_mask Zero if only client mode. Mask if the node shares its local VPN + * @param g_config * @return 0 if everything is okay, lesser then zero if errors */ -/* int dap_chain_net_srv_vpn_init(dap_config_t * g_config) { - const char *c_addr = dap_config_get_item_str(g_config, "srv_vpn", "network_address"); - const char *c_mask = dap_config_get_item_str(g_config, "srv_vpn", "network_mask"); - if(c_addr && c_mask) { - s_srv_vpn_addr = strdup(c_addr); - s_srv_vpn_mask = strdup(c_mask); - - s_raw_server = DAP_NEW_Z(vpn_local_network_t); - pthread_mutex_init(&s_raw_server->pkt_out_mutex, NULL); - pthread_mutex_init(&s_sf_socks_mutex, NULL); - pthread_cond_init(&s_sf_socks_cond, NULL); - -#ifdef DAP_TUN_IN_WORKER - s_tun_create(); - - if(s_raw_server->tun_fd == -1){ - log_it(L_CRITICAL,"Error creating file descriptor for /dev/net/tun device"); - return -2; - } - - s_tun_event_stream_create(); -#else - pthread_create(&srv_sf_socks_raw_pid, NULL, srv_ch_sf_thread_raw, NULL); -#endif + s_vpn_tun_init(); - pthread_create(&srv_sf_socks_pid, NULL, srv_ch_sf_thread, NULL); - dap_stream_ch_proc_add(DAP_STREAM_CH_ID_NET_SRV_VPN, s_new, srv_ch_vpn_delete, s_ch_packet_in, - s_ch_packet_out); + log_it(L_DEBUG,"Initializing TUN driver..."); + if(s_vpn_tun_create(g_config) != 0){ + log_it(L_CRITICAL, "Error initializing TUN device driver!"); + return -1; + } + log_it(L_INFO,"TUN driver configured successfuly"); - dap_chain_net_srv_uid_t l_uid = { .uint64 = DAP_CHAIN_NET_SRV_VPN_ID }; - dap_chain_net_srv_t* l_srv = dap_chain_net_srv_add( l_uid, s_callback_requested, - s_callback_response_success, s_callback_response_error, - s_callback_receipt_next_success); - dap_chain_net_srv_vpn_t* l_srv_vpn = DAP_NEW_Z( dap_chain_net_srv_vpn_t); - l_srv->_inhertor = l_srv_vpn; - l_srv_vpn->parent = l_srv; - - uint16_t l_pricelist_count = 0; - - //! IMPORTANT ! This fetch is single-action and cannot be further reused, since it modifies the stored config data - //! it also must NOT be freed within this module ! - char **l_pricelist = dap_config_get_array_str(g_config, "srv_vpn", "pricelist", &l_pricelist_count); // must not be freed! - for (uint16_t i = 0; i < l_pricelist_count; i++) { - dap_chain_net_srv_price_t *l_price = DAP_NEW_Z(dap_chain_net_srv_price_t); - short l_iter = 0; - char *l_ctx; - for (char *l_price_token = strtok_r(l_pricelist[i], ":", &l_ctx); l_price_token || l_iter == 6; l_price_token = strtok_r(NULL, ":", &l_ctx), ++l_iter) { - //log_it(L_DEBUG, "Tokenizer: %s", l_price_token); - switch (l_iter) { - case 0: - l_price->net_name = l_price_token; - if (!(l_price->net = dap_chain_net_by_name(l_price->net_name))) { - log_it(L_ERROR, "Error parsing pricelist: can't find network \"%s\"", l_price_token); - DAP_DELETE(l_price); - break; - } - continue; - case 1: - l_price->value_coins = atof(l_price_token); - if (!(l_price->value_datoshi = (uint64_t)dap_chain_coins_to_balance((long double)l_price->value_coins))) { - log_it(L_ERROR, "Error parsing pricelist: text on 2nd position \"%s\" is not floating number", l_price_token); - l_iter = 0; - DAP_DELETE(l_price); - break; - } - continue; - case 2: - dap_stpcpy(l_price->token, l_price_token); - continue; - case 3: - l_price->units = strtoul(l_price_token, NULL, 10); - if (!l_price->units) { - log_it(L_ERROR, "Error parsing pricelist: text on 4th position \"%s\" is not unsigned integer", l_price_token); - l_iter = 0; - DAP_DELETE(l_price); - break; - } - continue; - case 4: - if (!strcmp(l_price_token, "SEC")) - l_price->units_uid.enm = SERV_UNIT_SEC; - else if (!strcmp(l_price_token, "DAY")) - l_price->units_uid.enm = SERV_UNIT_DAY; - else if (!strcmp(l_price_token, "MB")) - l_price->units_uid.enm = SERV_UNIT_MB; - else { - log_it(L_ERROR, "Error parsing pricelist: wrong unit type \"%s\"", l_price_token); - l_iter = 0; - DAP_DELETE(l_price); - break; - } - continue; - case 5: - if (!(l_price->wallet = dap_chain_wallet_open(l_price_token, dap_config_get_item_str_default(g_config, "resources", "wallets_path", NULL)))) { - log_it(L_ERROR, "Error parsing pricelist: can't open wallet \"%s\"", l_price_token); - l_iter = 0; - DAP_DELETE(l_price); - break; - } - continue; - case 6: - log_it(L_INFO, "Price item correct, added to service"); - DL_APPEND(l_srv->pricelist, l_price); - break; - default: - break; - } - log_it(L_DEBUG, "Done with price item %d", i); - break; // double break exits tokenizer loop and steps to next price item - } - } + pthread_create(&srv_sf_socks_pid, NULL, srv_ch_sf_thread, NULL); + dap_stream_ch_proc_add(DAP_STREAM_CH_ID_NET_SRV_VPN, s_new, srv_ch_vpn_delete, s_ch_packet_in, + s_ch_packet_out); - return 0; - //int retVal = dap_chain_net_srv_vpn_cmd_init(); - //return retVal; - } - return -1; + s_vpn_service_create(g_config); + return 0; } -*/ /** * @brief ch_sf_deinit @@ -648,6 +572,11 @@ static int s_callback_response_success(dap_chain_net_srv_t * a_srv, uint32_t a_u memcpy(l_usage_client->receipt, l_receipt, l_receipt_size); pthread_rwlock_wrlock(&s_clients_rwlock); HASH_ADD(hh, s_clients,usage_id,sizeof(a_usage_id),l_usage_client); + if ( l_srv_ch_vpn->tun_socket){ + //TODO: not so sure here. Should we make a copy, like in sap? + HASH_ADD(hh, l_srv_ch_vpn->tun_socket->clients, usage_id,sizeof(a_usage_id),l_usage_client); + }else + log_it(L_WARNING, "No tun socket for SF channel"); l_srv_session->usage_active = l_usage_active; l_srv_session->usage_active->is_active = true; @@ -753,8 +682,15 @@ static void s_tun_create(void) static void s_tun_destroy(void) { pthread_rwlock_wrlock(& s_raw_server_rwlock); - dap_events_socket_remove_and_delete_mt(s_raw_server->tun_events_socket->worker, s_raw_server->tun_events_socket); s_raw_server->tun_fd = -1; + + pthread_rwlock_wrlock(&s_tun_sockets_rwlock); + ch_sf_tun_socket_t * l_tun_socket = NULL, *tmp = NULL; + HASH_ITER(hh, s_tun_sockets, l_tun_socket, tmp) { + l_tun_socket->es->kill_signal=true; + } + pthread_rwlock_unlock(&s_tun_sockets_rwlock); + pthread_rwlock_unlock(& s_raw_server_rwlock); } @@ -1028,6 +964,113 @@ static void send_pong_pkt(dap_stream_ch_t* a_ch) free(pkt_out); } +void s_ch_packet_in_vpn_address_request(dap_stream_ch_t* a_ch, dap_chain_net_srv_usage_t * a_usage){ + dap_chain_net_srv_ch_vpn_t *l_ch_vpn = CH_VPN(a_ch); + dap_chain_net_srv_vpn_t * l_srv_vpn =(dap_chain_net_srv_vpn_t *) a_usage->service->_inhertor; + + if ( l_ch_vpn->addr_ipv4.s_addr ){ + log_it(L_WARNING,"We already have ip address leased to us"); + ch_vpn_pkt_t *pkt_out = (ch_vpn_pkt_t*) calloc(1, sizeof(pkt_out->header)); + pkt_out->header.op_code = VPN_PACKET_OP_CODE_PROBLEM; + pkt_out->header.op_problem.code = VPN_PROBLEM_CODE_ALREADY_ASSIGNED_ADDR; + pkt_out->header.sock_id = s_raw_server->tun_fd; + pkt_out->header.usage_id = a_usage->id; + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out, + pkt_out->header.op_data.data_size + sizeof(pkt_out->header)); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + return; + } + dap_chain_net_srv_vpn_item_ipv4_t * l_item_ipv4 = l_srv_vpn->ipv4_unleased; + if ( l_item_ipv4){ + log_it(L_WARNING,"We have unleased ip address"); + l_ch_vpn->addr_ipv4.s_addr = l_item_ipv4->addr.s_addr; + + pthread_rwlock_wrlock( &s_clients_rwlock ); + HASH_ADD(hh, s_ch_vpn_addrs, addr_ipv4, sizeof (l_ch_vpn->addr_ipv4), l_ch_vpn); + pthread_rwlock_unlock( &s_clients_rwlock ); + + ch_vpn_pkt_t *l_pkt_out = DAP_NEW_Z_SIZE(ch_vpn_pkt_t, + sizeof(l_pkt_out->header) + sizeof(l_ch_vpn->addr_ipv4) + sizeof(s_raw_server->ipv4_host)); + l_pkt_out->header.sock_id = s_raw_server->tun_fd; + l_pkt_out->header.op_code = VPN_PACKET_OP_CODE_VPN_ADDR_REPLY; + l_pkt_out->header.op_data.data_size = sizeof(l_ch_vpn->addr_ipv4) + sizeof(s_raw_server->ipv4_host); + l_pkt_out->header.usage_id = a_usage->id; + + memcpy(l_pkt_out->data, &l_ch_vpn->addr_ipv4, sizeof(l_ch_vpn->addr_ipv4)); + memcpy(l_pkt_out->data + sizeof(l_ch_vpn->addr_ipv4), &s_raw_server->ipv4_host, + sizeof(s_raw_server->ipv4_host)); + + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA , l_pkt_out, + l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header)); + log_it(L_NOTICE, "VPN client address %s leased", inet_ntoa(l_ch_vpn->addr_ipv4)); + log_it(L_INFO, "\tgateway %s", inet_ntoa(s_raw_server->ipv4_host)); + log_it(L_INFO, "\tmask %s", inet_ntoa(s_raw_server->ipv4_network_mask)); + log_it(L_INFO, "\taddr %s", inet_ntoa(s_raw_server->ipv4_network_addr)); + log_it(L_INFO, "\tlast_addr %s", inet_ntoa(s_raw_server->ipv4_lease_last)); + l_srv_vpn->ipv4_unleased = l_item_ipv4->next; + DAP_DELETE(l_item_ipv4); + }else{ + struct in_addr n_addr = { 0 }, n_addr_max; + n_addr.s_addr = ntohl(s_raw_server->ipv4_lease_last.s_addr); + n_addr.s_addr++; + n_addr_max.s_addr = (ntohl(s_raw_server->ipv4_network_addr.s_addr) + | ~ntohl(s_raw_server->ipv4_network_mask.s_addr)); + + // Just for log output we revert it back and forward + n_addr.s_addr = htonl(n_addr.s_addr); + n_addr_max.s_addr = htonl(n_addr_max.s_addr); + log_it(L_DEBUG, "Check if is address is lesser than"); + log_it(L_DEBUG," new_address = %s", inet_ntoa(n_addr)); + log_it(L_DEBUG," new_address_max = %s", inet_ntoa(n_addr_max)); + n_addr.s_addr = ntohl(n_addr.s_addr); + n_addr_max.s_addr = ntohl(n_addr_max.s_addr); + if(n_addr.s_addr <= n_addr_max.s_addr ) { + n_addr.s_addr = htonl(n_addr.s_addr); + n_addr_max.s_addr = htonl(n_addr_max.s_addr); + + s_raw_server->ipv4_lease_last.s_addr =n_addr.s_addr; + a_ch->stream->session->tun_client_addr.s_addr = n_addr.s_addr; + l_ch_vpn->addr_ipv4.s_addr = n_addr.s_addr; + + log_it(L_NOTICE, "VPN client address %s leased", inet_ntoa(n_addr)); + log_it(L_INFO, "\tgateway %s", inet_ntoa(s_raw_server->ipv4_host)); + log_it(L_INFO, "\tmask %s", inet_ntoa(s_raw_server->ipv4_network_mask)); + log_it(L_INFO, "\taddr %s", inet_ntoa(s_raw_server->ipv4_network_addr)); + log_it(L_INFO, "\tlast_addr %s", inet_ntoa(s_raw_server->ipv4_lease_last)); + pthread_rwlock_wrlock( &s_clients_rwlock ); + HASH_ADD(hh, s_ch_vpn_addrs, addr_ipv4, sizeof (l_ch_vpn->addr_ipv4), l_ch_vpn); + pthread_rwlock_unlock( &s_clients_rwlock ); + + ch_vpn_pkt_t *pkt_out = (ch_vpn_pkt_t*) calloc(1, + sizeof(pkt_out->header) + sizeof(l_ch_vpn->addr_ipv4) + sizeof(s_raw_server->ipv4_host)); + pkt_out->header.sock_id = s_raw_server->tun_fd; + pkt_out->header.op_code = VPN_PACKET_OP_CODE_VPN_ADDR_REPLY; + pkt_out->header.op_data.data_size = sizeof(l_ch_vpn->addr_ipv4) + sizeof(s_raw_server->ipv4_host); + pkt_out->header.usage_id = a_usage->id; + + memcpy(pkt_out->data, &l_ch_vpn->addr_ipv4, sizeof(l_ch_vpn->addr_ipv4)); + memcpy(pkt_out->data + sizeof(l_ch_vpn->addr_ipv4), &s_raw_server->ipv4_host, + sizeof(s_raw_server->ipv4_host)); + + if(dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out, + pkt_out->header.op_data.data_size + sizeof(pkt_out->header))) { + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + } + + } else { // All the network is filled with clients, can't lease a new address + log_it(L_WARNING, "All the network is filled with clients, can't lease a new address"); + ch_vpn_pkt_t *pkt_out = (ch_vpn_pkt_t*) calloc(1, sizeof(pkt_out->header)); + pkt_out->header.sock_id = s_raw_server->tun_fd; + pkt_out->header.op_code = VPN_PACKET_OP_CODE_PROBLEM; + pkt_out->header.usage_id = a_usage->id; + pkt_out->header.op_problem.code = VPN_PROBLEM_CODE_NO_FREE_ADDR; + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out, + pkt_out->header.op_data.data_size + sizeof(pkt_out->header)); + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + } + } +} + /** * @brief stream_sf_packet_in * @param ch @@ -1086,108 +1129,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) // for server case VPN_PACKET_OP_CODE_VPN_ADDR_REQUEST: { // Client request after L3 connection the new IP address log_it(L_INFO, "Received address request "); - if ( l_ch_vpn->addr_ipv4.s_addr ){ - log_it(L_WARNING,"We already have ip address leased to us"); - ch_vpn_pkt_t *pkt_out = (ch_vpn_pkt_t*) calloc(1, sizeof(pkt_out->header)); - pkt_out->header.op_code = VPN_PACKET_OP_CODE_PROBLEM; - pkt_out->header.op_problem.code = VPN_PROBLEM_CODE_ALREADY_ASSIGNED_ADDR; - pkt_out->header.sock_id = s_raw_server->tun_fd; - pkt_out->header.usage_id = l_usage->id; - dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out, - pkt_out->header.op_data.data_size + sizeof(pkt_out->header)); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - break; - } - dap_chain_net_srv_vpn_item_ipv4_t * l_item_ipv4 = l_srv_vpn->ipv4_unleased; - if ( l_item_ipv4){ - log_it(L_WARNING,"We have unleased ip address"); - l_ch_vpn->addr_ipv4.s_addr = l_item_ipv4->addr.s_addr; - - pthread_rwlock_wrlock( &s_clients_rwlock ); - HASH_ADD(hh, s_ch_vpn_addrs, addr_ipv4, sizeof (l_ch_vpn->addr_ipv4), l_ch_vpn); - pthread_rwlock_unlock( &s_clients_rwlock ); - - ch_vpn_pkt_t *l_pkt_out = DAP_NEW_Z_SIZE(ch_vpn_pkt_t, - sizeof(l_pkt_out->header) + sizeof(l_ch_vpn->addr_ipv4) + sizeof(s_raw_server->ipv4_host)); - l_pkt_out->header.sock_id = s_raw_server->tun_fd; - l_pkt_out->header.op_code = VPN_PACKET_OP_CODE_VPN_ADDR_REPLY; - l_pkt_out->header.op_data.data_size = sizeof(l_ch_vpn->addr_ipv4) + sizeof(s_raw_server->ipv4_host); - l_pkt_out->header.usage_id = l_usage->id; - - memcpy(l_pkt_out->data, &l_ch_vpn->addr_ipv4, sizeof(l_ch_vpn->addr_ipv4)); - memcpy(l_pkt_out->data + sizeof(l_ch_vpn->addr_ipv4), &s_raw_server->ipv4_host, - sizeof(s_raw_server->ipv4_host)); - dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA , l_pkt_out, - l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header)); - log_it(L_NOTICE, "VPN client address %s leased", inet_ntoa(l_ch_vpn->addr_ipv4)); - log_it(L_INFO, "\tgateway %s", inet_ntoa(s_raw_server->ipv4_host)); - log_it(L_INFO, "\tmask %s", inet_ntoa(s_raw_server->ipv4_network_mask)); - log_it(L_INFO, "\taddr %s", inet_ntoa(s_raw_server->ipv4_network_addr)); - log_it(L_INFO, "\tlast_addr %s", inet_ntoa(s_raw_server->ipv4_lease_last)); - //dap_stream_ch_set_ready_to_write(a_ch, true); - l_srv_vpn->ipv4_unleased = l_item_ipv4->next; - DAP_DELETE(l_item_ipv4); - }else{ - struct in_addr n_addr = { 0 }, n_addr_max; - n_addr.s_addr = ntohl(s_raw_server->ipv4_lease_last.s_addr); - n_addr.s_addr++; - n_addr_max.s_addr = (ntohl(s_raw_server->ipv4_network_addr.s_addr) - | ~ntohl(s_raw_server->ipv4_network_mask.s_addr)); - - // Just for log output we revert it back and forward - n_addr.s_addr = htonl(n_addr.s_addr); - n_addr_max.s_addr = htonl(n_addr_max.s_addr); - log_it(L_DEBUG, "Check if is address is lesser than"); - log_it(L_DEBUG," new_address = %s", inet_ntoa(n_addr)); - log_it(L_DEBUG," new_address_max = %s", inet_ntoa(n_addr_max)); - n_addr.s_addr = ntohl(n_addr.s_addr); - n_addr_max.s_addr = ntohl(n_addr_max.s_addr); - if(n_addr.s_addr <= n_addr_max.s_addr ) { - n_addr.s_addr = htonl(n_addr.s_addr); - n_addr_max.s_addr = htonl(n_addr_max.s_addr); - - s_raw_server->ipv4_lease_last.s_addr =n_addr.s_addr; - a_ch->stream->session->tun_client_addr.s_addr = n_addr.s_addr; - l_ch_vpn->addr_ipv4.s_addr = n_addr.s_addr; - - log_it(L_NOTICE, "VPN client address %s leased", inet_ntoa(n_addr)); - log_it(L_INFO, "\tgateway %s", inet_ntoa(s_raw_server->ipv4_host)); - log_it(L_INFO, "\tmask %s", inet_ntoa(s_raw_server->ipv4_network_mask)); - log_it(L_INFO, "\taddr %s", inet_ntoa(s_raw_server->ipv4_network_addr)); - log_it(L_INFO, "\tlast_addr %s", inet_ntoa(s_raw_server->ipv4_lease_last)); - pthread_rwlock_wrlock( &s_clients_rwlock ); - HASH_ADD(hh, s_ch_vpn_addrs, addr_ipv4, sizeof (l_ch_vpn->addr_ipv4), l_ch_vpn); - pthread_rwlock_unlock( &s_clients_rwlock ); - - ch_vpn_pkt_t *pkt_out = (ch_vpn_pkt_t*) calloc(1, - sizeof(pkt_out->header) + sizeof(l_ch_vpn->addr_ipv4) + sizeof(s_raw_server->ipv4_host)); - pkt_out->header.sock_id = s_raw_server->tun_fd; - pkt_out->header.op_code = VPN_PACKET_OP_CODE_VPN_ADDR_REPLY; - pkt_out->header.op_data.data_size = sizeof(l_ch_vpn->addr_ipv4) + sizeof(s_raw_server->ipv4_host); - pkt_out->header.usage_id = l_usage->id; - - memcpy(pkt_out->data, &l_ch_vpn->addr_ipv4, sizeof(l_ch_vpn->addr_ipv4)); - memcpy(pkt_out->data + sizeof(l_ch_vpn->addr_ipv4), &s_raw_server->ipv4_host, - sizeof(s_raw_server->ipv4_host)); - - if(dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out, - pkt_out->header.op_data.data_size + sizeof(pkt_out->header))) { - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } - - //ch_sf_raw_write(n_addr.s_addr,STREAM_SF_PACKET_OP_CODE_RAW_L3_ADDR_REPLY,&n_addr,sizeof(n_addr)); - } else { // All the network is filled with clients, can't lease a new address - log_it(L_WARNING, "All the network is filled with clients, can't lease a new address"); - ch_vpn_pkt_t *pkt_out = (ch_vpn_pkt_t*) calloc(1, sizeof(pkt_out->header)); - pkt_out->header.sock_id = s_raw_server->tun_fd; - pkt_out->header.op_code = VPN_PACKET_OP_CODE_PROBLEM; - pkt_out->header.usage_id = l_usage->id; - pkt_out->header.op_problem.code = VPN_PROBLEM_CODE_NO_FREE_ADDR; - dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out, - pkt_out->header.op_data.data_size + sizeof(pkt_out->header)); - dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); - } - } + s_ch_packet_in_vpn_address_request(a_ch, l_usage); } break; // for client only @@ -1639,136 +1581,18 @@ static void s_sig_handle (int sig) s_srv_ch_sf_thread_raw_is_exit = true; } -/** - * - * - **/ -void* srv_ch_sf_thread_raw(void *arg) -{ - UNUSED(arg); - s_tun_create(); - - if(s_raw_server->tun_fd <= 0) { - log_it(L_CRITICAL, "Tun/tap file descriptor is not initialized"); - return NULL; - } - /* if (fcntl(raw_server->tun_fd, F_SETFL, O_NONBLOCK) < 0){ ; - log_it(L_CRITICAL,"Can't switch tun/tap socket into the non-block mode"); - return NULL; - } - if (fcntl(raw_server->tun_fd, F_SETFD, FD_CLOEXEC) < 0){; - log_it(L_CRITICAL,"Can't switch tun/tap socket to not be passed across execs"); - return NULL; - } - */ - uint8_t *l_tmp_buf; - ssize_t tmp_buf_size; - static int tun_MTU = 100000; /// TODO Replace with detection of MTU size - - l_tmp_buf = (uint8_t *) alloca(tun_MTU); - if ( !l_tmp_buf){ // Can't allocate on stack - log_it(L_ERROR,"Can't allocate read buffer on stack, trying to get one from heap"); - l_tmp_buf = DAP_NEW_SIZE(uint8_t, tun_MTU); - } - tmp_buf_size = 0; - log_it(L_INFO, "Tun/tap thread starts with MTU = %d", tun_MTU); - - fd_set fds_read, fds_read_active; - sigset_t l_sig_mask_proc; - sigset_t l_sig_mask_orig; - sigemptyset (&l_sig_mask_proc); - sigaddset (&l_sig_mask_proc, SIGTERM); - - - FD_ZERO(&fds_read); - FD_SET(s_raw_server->tun_fd, &fds_read); - FD_SET(get_select_breaker(), &fds_read); - /// Main cycle - do { - fds_read_active = fds_read; - int ret = select(FD_SETSIZE, &fds_read_active, NULL, NULL, NULL); - // - if(ret > 0) { - if(FD_ISSET(get_select_breaker(), &fds_read_active)) { // Smth to send - ch_vpn_pkt_t* pkt = srv_ch_sf_raw_read(); - if(pkt) { - int write_ret = write(s_raw_server->tun_fd, pkt->data, pkt->header.op_data.data_size); - if(write_ret > 0) { - //log_it(L_DEBUG, "Wrote out %d bytes to the tun/tap interface", write_ret); - } else { - log_it(L_ERROR, "Tun/tap write %u bytes returned '%s' error, code (%d)", - pkt->header.op_data.data_size, strerror(errno), write_ret); - } - } - } - - if(FD_ISSET(s_raw_server->tun_fd, &fds_read_active)) { - int l_read_ret = read(s_raw_server->tun_fd, l_tmp_buf, tun_MTU); - if(l_read_ret < 0) { - log_it(L_CRITICAL, "Tun/tap read returned '%s' error, code (%d)", strerror(errno), l_read_ret); - break; - } else { - struct iphdr *iph = (struct iphdr*) l_tmp_buf; - struct in_addr in_daddr, in_saddr; - in_daddr.s_addr = iph->daddr; - in_saddr.s_addr = iph->saddr; - char str_daddr[42], str_saddr[42]; - dap_snprintf(str_saddr, sizeof(str_saddr), "%s",inet_ntoa(in_saddr) ); - dap_snprintf(str_daddr, sizeof(str_daddr), "%s",inet_ntoa(in_daddr) ); - - //log_it(L_DEBUG, "Read IP packet from tun/tap interface daddr=%s saddr=%s total_size = %d " - // , str_daddr, str_saddr, read_ret); - dap_chain_net_srv_ch_vpn_t * l_ch_vpn = NULL; - pthread_rwlock_rdlock(&s_clients_rwlock); - HASH_FIND(hh,s_ch_vpn_addrs, &in_daddr, sizeof (in_daddr), l_ch_vpn); -// HASH_ADD_INT(CH_SF(ch)->socks, id, sf_sock ); -// HASH_DEL(CH_SF(ch)->socks,sf_sock); - - /// - if(l_ch_vpn) { // Is present in hash table such destination address - - if (dap_stream_ch_get_ready_to_read(l_ch_vpn->ch ) ){ - dap_chain_net_srv_stream_session_t * l_srv_session = DAP_CHAIN_NET_SRV_STREAM_SESSION (l_ch_vpn->ch->stream->session ); - dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find(l_srv_session, l_ch_vpn->usage_id); - ch_vpn_pkt_t *l_pkt_out = DAP_NEW_Z_SIZE(ch_vpn_pkt_t, sizeof(l_pkt_out->header) + l_read_ret); - l_pkt_out->header.op_code = VPN_PACKET_OP_CODE_VPN_RECV; - l_pkt_out->header.sock_id = s_raw_server->tun_fd; - l_pkt_out->header.usage_id = l_ch_vpn->usage_id; - l_pkt_out->header.op_data.data_size = l_read_ret; - memcpy(l_pkt_out->data, l_tmp_buf, l_read_ret); - dap_stream_ch_pkt_write_unsafe(l_ch_vpn->ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, l_pkt_out, - l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header)); - s_update_limits(l_ch_vpn->ch,l_srv_session,l_usage, l_read_ret); - } - } - pthread_rwlock_unlock(&s_clients_rwlock);\ - } - }/*else { - log_it(L_CRITICAL,"select() has no tun handler in the returned set"); - break; - - }*/ - } else { - //log_it(L_WARNING, "Select returned %d: %s", ret, strerror(errno)); - //break; - } - } while(! s_srv_ch_sf_thread_raw_is_exit ); - log_it(L_NOTICE, "Raw sockets listen thread is stopped"); - s_tun_destroy(); - return NULL; -} - -#ifdef DAP_TUN_IN_WORKER void m_es_tun_delete(dap_events_socket_t * a_es, void * arg) { - log_it(L_WARNING, __PRETTY_FUNCTION__); - log_it(L_NOTICE, "Raw sockets listen thread is stopped"); - s_tun_destroy(); -} - -void m_es_tun_write(dap_events_socket_t * a_es, void * arg) -{ + if (! a_es->_inheritor) // There is moment between inheritor initialization and active live of event socket in worker. + return; + ch_sf_tun_socket_t * l_tun_socket = CH_SF_TUN_SOCKET( a_es ); + pthread_rwlock_wrlock(&s_tun_sockets_rwlock); + HASH_DEL(s_tun_sockets,l_tun_socket); + DAP_DELETE(l_tun_socket); + a_es->_inheritor = NULL; + pthread_rwlock_unlock(&s_tun_sockets_rwlock); + log_it(L_NOTICE,"Destroyed TUN event socket"); } void m_es_tun_read(dap_events_socket_t * a_es, void * arg) @@ -1776,13 +1600,14 @@ void m_es_tun_read(dap_events_socket_t * a_es, void * arg) const static int tun_MTU = 100000; /// TODO Replace with detection of MTU size uint8_t l_tmp_buf[tun_MTU]; - size_t l_read_ret; - + if (! a_es->_inheritor) // There is moment between inheritor initialization and active live of event socket in worker. + return; - l_read_ret = dap_events_socket_pop_from_buf_in(s_raw_server->tun_events_socket, l_tmp_buf, sizeof(l_tmp_buf)); + ch_sf_tun_socket_t * l_tun_socket = CH_SF_TUN_SOCKET(a_es); + size_t l_buf_in_size = a_es->buf_in_size; - if(l_read_ret > 0) { - struct iphdr *iph = (struct iphdr*) l_tmp_buf; + if(l_buf_in_size) { + struct iphdr *iph = (struct iphdr*) a_es->buf_in; struct in_addr in_daddr, in_saddr; in_daddr.s_addr = iph->daddr; in_saddr.s_addr = iph->saddr; @@ -1790,38 +1615,64 @@ void m_es_tun_read(dap_events_socket_t * a_es, void * arg) dap_snprintf(str_saddr, sizeof(str_saddr), "%s",inet_ntoa(in_saddr) ); dap_snprintf(str_daddr, sizeof(str_daddr), "%s",inet_ntoa(in_daddr) ); - dap_chain_net_srv_ch_vpn_t * l_ch_vpn = NULL; - pthread_rwlock_rdlock(&s_clients_rwlock); - HASH_FIND(hh,s_ch_vpn_addrs, &in_daddr, sizeof (in_daddr), l_ch_vpn); - - if(l_ch_vpn) { // Is present in hash table such destination address - dap_chain_net_srv_stream_session_t * l_srv_session = DAP_CHAIN_NET_SRV_STREAM_SESSION (l_ch_vpn->ch->stream->session ); - dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find(l_srv_session, l_ch_vpn->usage_id); - ch_vpn_pkt_t *l_pkt_out = DAP_NEW_Z_SIZE(ch_vpn_pkt_t, sizeof(l_pkt_out->header) + l_read_ret); - l_pkt_out->header.op_code = VPN_PACKET_OP_CODE_VPN_RECV; - l_pkt_out->header.sock_id = s_raw_server->tun_fd; - l_pkt_out->header.usage_id = l_ch_vpn->usage_id; - l_pkt_out->header.op_data.data_size = l_read_ret; - memcpy(l_pkt_out->data, l_tmp_buf, l_read_ret); - dap_stream_ch_pkt_write_mt(l_usage->client->ch->stream_worker , l_usage->client->ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, l_pkt_out, - l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header)); - s_update_limits(l_ch_vpn->ch,l_srv_session,l_usage, l_read_ret); + // + usage_client_t * l_tun_client = NULL; + bool l_tun_server_clients_locked = false; + + // Try to find in worker's clients, without locks + if ( l_tun_socket->clients) + HASH_FIND_INT( s_clients,&in_daddr.s_addr,l_tun_client ); + + if ( l_tun_client == NULL){ + // If not found - try to lock and find in global list + pthread_rwlock_rdlock(& s_raw_server->rwlock ); + l_tun_server_clients_locked = true; + HASH_FIND_INT( s_clients,&in_daddr.s_addr,l_tun_client ); + if ( l_tun_client ){ + if ( s_raw_server->auto_cpu_reassignment ){ + //TODO: first implement dap_events_socket_reassign_worker() + } + } } - pthread_rwlock_unlock(&s_clients_rwlock); - } + if(l_tun_client){ + dap_chain_net_srv_ch_vpn_t * l_ch_vpn = NULL; + pthread_rwlock_rdlock(&s_clients_rwlock); + HASH_FIND(hh,s_ch_vpn_addrs, &in_daddr, sizeof (in_daddr), l_ch_vpn); + + if(l_ch_vpn) { // Is present in hash table such destination address + dap_chain_net_srv_stream_session_t * l_srv_session = DAP_CHAIN_NET_SRV_STREAM_SESSION (l_ch_vpn->ch->stream->session ); + dap_chain_net_srv_usage_t * l_usage = dap_chain_net_srv_usage_find(l_srv_session, l_ch_vpn->usage_id); + ch_vpn_pkt_t *l_pkt_out = DAP_NEW_Z_SIZE(ch_vpn_pkt_t, sizeof(l_pkt_out->header) + l_buf_in_size); + l_pkt_out->header.op_code = VPN_PACKET_OP_CODE_VPN_RECV; + l_pkt_out->header.sock_id = s_raw_server->tun_fd; + l_pkt_out->header.usage_id = l_ch_vpn->usage_id; + l_pkt_out->header.op_data.data_size = l_buf_in_size; + memcpy(l_pkt_out->data, a_es->buf_in, l_buf_in_size); + dap_stream_ch_pkt_write_unsafe(l_usage->client->ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, l_pkt_out, + l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header)); + dap_stream_ch_set_ready_to_write_unsafe(l_ch_vpn->ch, true); + s_update_limits(l_ch_vpn->ch,l_srv_session,l_usage, l_buf_in_size); + } + pthread_rwlock_unlock(&s_clients_rwlock); + } + a_es->buf_in_size = 0; + + if (l_tun_server_clients_locked) + pthread_rwlock_unlock(&s_raw_server->rwlock); + } dap_events_socket_set_readable_unsafe(a_es, true); } void m_es_tun_error(dap_events_socket_t * a_es, void * arg) { - log_it(L_WARNING, __PRETTY_FUNCTION__); + if (! a_es->_inheritor) + return; + log_it(L_ERROR,"%s: error in socket %u (socket type %d)", __PRETTY_FUNCTION__, a_es->socket, a_es->type); } -#endif - -static void s_es_tun_new(dap_events_socket_t * a_es, void * arg) +void m_es_tun_new(dap_events_socket_t * a_es, void * arg) { (void) arg; ch_sf_tun_socket_t * l_tun_socket = DAP_NEW_Z(ch_sf_tun_socket_t); @@ -1839,29 +1690,3 @@ static void s_es_tun_new(dap_events_socket_t * a_es, void * arg) log_it(L_ERROR, "Can't allocate memory for tun socket"); } } - -static void s_es_tun_delete(dap_events_socket_t * a_es, void * arg) -{ - if (! a_es->_inheritor) // There is moment between inheritor initialization and active live of event socket in worker. - return; - - ch_sf_tun_socket_t * l_tun_socket = CH_SF_TUN_SOCKET( a_es ); - pthread_rwlock_wrlock(&s_tun_sockets_rwlock); - HASH_DEL(s_tun_sockets,l_tun_socket); - DAP_DELETE(l_tun_socket); - a_es->_inheritor = NULL; - pthread_rwlock_unlock(&s_tun_sockets_rwlock); - log_it(L_NOTICE,"Destroyed TUN event socket"); -} - -static void s_es_tun_error(dap_events_socket_t * a_es, void * arg) -{ - if (! a_es->_inheritor) - return; - log_it(L_ERROR,"%s: error in socket %u (socket type %d)", __PRETTY_FUNCTION__, a_es->socket, a_es->type); -} - -void s_es_tun_read(dap_events_socket_t * a_es, void * arg) -{ - //TODO: implement -} diff --git a/modules/service/vpn/dap_chain_net_vpn_client_tun.c b/modules/service/vpn/dap_chain_net_vpn_client_tun.c index 9408f4a24154c5acbeecc2968ca6c39e1b6fc3b6..dd3936e554c566866b0d46dd46ca26f566599948 100644 --- a/modules/service/vpn/dap_chain_net_vpn_client_tun.c +++ b/modules/service/vpn/dap_chain_net_vpn_client_tun.c @@ -72,7 +72,6 @@ static char *s_cur_ipv4_server = NULL; static const char *s_conn_name = "nodeVPNClient"; static char *s_last_used_connection_name = NULL, *s_last_used_connection_device = NULL; -static pthread_t s_thread_read_tun_id; static pthread_mutex_t s_clients_mutex; static dap_events_socket_t * s_tun_events_socket = NULL; @@ -231,135 +230,6 @@ static bool is_local_address(const char *a_address) } -/** - * Thread for read from /dev/net/tun - */ -static void* thread_read_tun(void *arg) -{ - //srv_ch_sf_tun_create(); - - if(s_fd_tun <= 0) { - log_it(L_CRITICAL, "Tun/tap file descriptor is not initialized"); - return NULL; - } - /* if (fcntl(raw_server->tun_fd, F_SETFL, O_NONBLOCK) < 0){ ; - log_it(L_CRITICAL,"Can't switch tun/tap socket into the non-block mode"); - return NULL; - } - if (fcntl(raw_server->tun_fd, F_SETFD, FD_CLOEXEC) < 0){; - log_it(L_CRITICAL,"Can't switch tun/tap socket to not be passed across execs"); - return NULL; - } - */ - uint8_t *tmp_buf; -// ssize_t tmp_buf_size; - static int tun_MTU = 100000; /// TODO Replace with detection of MTU size - - tmp_buf = (uint8_t *) calloc(1, tun_MTU); -// tmp_buf_size = 0; - log_it(L_INFO, "Tun/tap thread starts with MTU = %d", tun_MTU); - - fd_set fds_read, fds_read_active; - - FD_ZERO(&fds_read); - FD_SET(s_fd_tun, &fds_read); - FD_SET(get_select_breaker(), &fds_read); - /// Main cycle - do { - fds_read_active = fds_read; - int ret = select(FD_SETSIZE, &fds_read_active, NULL, NULL, NULL); - // - if(ret > 0) { - if(FD_ISSET(get_select_breaker(), &fds_read_active)) { // Smth to send - ch_vpn_pkt_t* pkt = NULL; //TODO srv_ch_sf_raw_read(); - if(pkt) { - int write_ret = write(s_fd_tun, pkt->data, pkt->header.op_data.data_size); - if(write_ret > 0) { - log_it(L_DEBUG, "Wrote out %d bytes to the tun/tap interface", write_ret); - } else { - log_it(L_ERROR, "Tun/tap write %u bytes returned '%s' error, code (%d)", - pkt->header.op_data.data_size, strerror(errno), write_ret); - } - } - } - // there is data in tun for sent to vpn server - if(FD_ISSET(s_fd_tun, &fds_read_active)) { - int read_ret = read(s_fd_tun, tmp_buf, tun_MTU); - if(read_ret < 0) { - log_it(L_CRITICAL, "Tun/tap read returned '%s' error, code (%d)", strerror(errno), read_ret); - break; - } else { - struct iphdr *iph = (struct iphdr*) tmp_buf; - struct in_addr in_daddr, in_saddr; - // destination address - in_daddr.s_addr = iph->daddr; - // source address - in_saddr.s_addr = iph->saddr; - char str_daddr[42], str_saddr[42]; - strncpy(str_saddr, inet_ntoa(in_saddr), sizeof(str_saddr)); - strncpy(str_daddr, inet_ntoa(in_daddr), sizeof(str_daddr)); - - if(iph->tot_len > (uint16_t) read_ret) { - log_it(L_INFO, "Tun/Tap interface returned only the fragment (tot_len =%u read_ret=%d) ", - iph->tot_len, read_ret); - } - if(iph->tot_len < (uint16_t) read_ret) { - log_it(L_WARNING, "Tun/Tap interface returned more then one packet (tot_len =%u read_ret=%d) ", - iph->tot_len, read_ret); - } - - log_it(L_DEBUG, "Read IP packet from tun/tap interface daddr=%s saddr=%s total_size = %d " - , str_daddr, str_saddr, read_ret); - - //dap_client_t *l_client = l_vpn_client ? l_vpn_client->client : NULL; - //DAP_CLIENT_PVT(l_client); - //dap_stream_ch_vpn_remote_single_t * raw_client = l_client->; - -// HASH_FIND_INT(raw_server->clients, &in_daddr.s_addr, raw_client); - // HASH_ADD_INT(CH_SF(ch)->socks, id, sf_sock ); - // HASH_DEL(CH_SF(ch)->socks,sf_sock); -// if(l_stream) { // Is present in hash table such destination address - dap_stream_ch_t *l_ch = dap_chain_net_vpn_client_get_stream_ch(); - dap_stream_worker_t * l_stream_worker = dap_chain_net_vpn_client_get_stream_worker(); - if(l_ch) { - // form packet to vpn-server - ch_vpn_pkt_t *pkt_out = (ch_vpn_pkt_t*) calloc(1, sizeof(pkt_out->header) + read_ret); - pkt_out->header.op_code = VPN_PACKET_OP_CODE_VPN_SEND; //VPN_PACKET_OP_CODE_VPN_RECV - pkt_out->header.sock_id = s_fd_tun; - pkt_out->header.op_data.data_size = read_ret; - memcpy(pkt_out->data, tmp_buf, read_ret); - - // sent packet to vpn server - dap_stream_ch_pkt_write_mt(l_stream_worker,l_ch , DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, pkt_out, - pkt_out->header.op_data.data_size + sizeof(pkt_out->header)); - - DAP_DELETE(pkt_out); - } - else { - log_it(L_DEBUG, "No remote client for income IP packet with addr %s", inet_ntoa(in_daddr)); - } - - } - }/*else { - log_it(L_CRITICAL,"select() has no tun handler in the returned set"); - break; - - }*/ - } else { - log_it(L_CRITICAL, "Select returned %d", ret); - break; - } - } while(1); - log_it(L_NOTICE, "Raw sockets listen thread is stopped"); - // close tun - if(s_fd_tun > 0) { - int l_fd_tun = s_fd_tun; - s_fd_tun = 0; - close(l_fd_tun); - } - - return NULL; -} int dap_chain_net_vpn_client_tun_init(const char *a_ipv4_server_str) { @@ -541,26 +411,19 @@ int dap_chain_net_vpn_client_tun_create(const char *a_ipv4_addr_str, const char pthread_mutex_init(&s_clients_mutex, NULL); - if(is_dap_tun_in_worker()) { - - static dap_events_socket_callbacks_t l_s_callbacks = { - .read_callback = m_client_tun_read,// for server - .write_callback = m_client_tun_write,// for client - .error_callback = m_client_tun_error, - .delete_callback = m_client_tun_delete - }; + static dap_events_socket_callbacks_t l_s_callbacks = { + .read_callback = m_client_tun_read,// for server + .write_callback = m_client_tun_write,// for client + .error_callback = m_client_tun_error, + .delete_callback = m_client_tun_delete + }; - s_tun_events_socket = dap_events_socket_wrap_no_add(NULL, s_fd_tun, &l_s_callbacks); - s_tun_events_socket->type = DESCRIPTOR_TYPE_FILE; - dap_worker_add_events_socket_auto(s_tun_events_socket); - s_tun_events_socket->_inheritor = NULL; - - return 0; - } - else { - pthread_create(&s_thread_read_tun_id, NULL, thread_read_tun, NULL); - } + s_tun_events_socket = dap_events_socket_wrap_no_add(NULL, s_fd_tun, &l_s_callbacks); + s_tun_events_socket->type = DESCRIPTOR_TYPE_FILE; + dap_worker_add_events_socket_auto(s_tun_events_socket); + s_tun_events_socket->_inheritor = NULL; + return 0; //m_tunDeviceName = dev; //m_tunSocket = fd; @@ -569,13 +432,10 @@ int dap_chain_net_vpn_client_tun_create(const char *a_ipv4_addr_str, const char int dap_chain_net_vpn_client_tun_delete(void) { - if(is_dap_tun_in_worker()) - { - pthread_mutex_lock(&s_clients_mutex); - dap_events_socket_remove_and_delete_mt(s_tun_events_socket->worker, s_tun_events_socket); - s_tun_events_socket = NULL; - pthread_mutex_unlock(&s_clients_mutex); - } + pthread_mutex_lock(&s_clients_mutex); + dap_events_socket_remove_and_delete_mt(s_tun_events_socket->worker, s_tun_events_socket); + s_tun_events_socket = NULL; + pthread_mutex_unlock(&s_clients_mutex); // restore previous routing if(!s_conn_name || !s_last_used_connection_name) diff --git a/modules/service/vpn/include/dap_chain_net_srv_vpn.h b/modules/service/vpn/include/dap_chain_net_srv_vpn.h index 52cc9c62a15f93511a870dce45dbe9680e386637..83ba1f449a874352688440a88c0585995b2080db 100644 --- a/modules/service/vpn/include/dap_chain_net_srv_vpn.h +++ b/modules/service/vpn/include/dap_chain_net_srv_vpn.h @@ -93,6 +93,33 @@ typedef struct ch_vpn_pkt { uint8_t data[]; // Binary data nested by packet }DAP_ALIGN_PACKED ch_vpn_pkt_t; +typedef struct ch_sf_tun_socket ch_sf_tun_socket_t; +typedef struct dap_chain_net_srv_ch_vpn dap_chain_net_srv_ch_vpn_t; + +typedef struct usage_client { + pthread_rwlock_t rwlock; + dap_chain_net_srv_ch_vpn_t * ch_vpn; + dap_chain_net_srv_client_t *net_srv_client; + dap_chain_datum_tx_receipt_t * receipt; + size_t receipt_size; + uint32_t usage_id; + dap_chain_net_srv_t * srv; + ch_sf_tun_socket_t * tun_socket; + UT_hash_handle hh; +} usage_client_t; + +typedef struct ch_sf_tun_socket { + uint8_t worker_id; + dap_worker_t * worker; + dap_events_socket_t * es; + + usage_client_t * clients; // Remote clients identified by destination address + + UT_hash_handle hh; +}ch_sf_tun_socket_t; +#define CH_SF_TUN_SOCKET(a) ((ch_sf_tun_socket_t*) a->_inheritor ) + + /** * @struct ch_vpn_socket_proxy * @brief Internal data storage for single socket proxy functions. Usualy helpfull for\ @@ -137,6 +164,7 @@ typedef struct dap_chain_net_srv_ch_vpn ch_vpn_socket_proxy_t * socks; int raw_l3_sock; bool is_allowed; + ch_sf_tun_socket_t * tun_socket; struct in_addr addr_ipv4; dap_stream_ch_t * ch; @@ -156,57 +184,8 @@ typedef struct dap_chain_net_srv_vpn dap_chain_net_srv_t * parent; } dap_chain_net_srv_vpn_t; - -typedef struct ch_sf_tun_socket ch_sf_tun_socket_t; - -typedef struct ch_sf_tun_client{ - in_addr_t addr; - dap_stream_ch_t * ch; - ch_sf_tun_socket_t * tun_socket; - - uint64_t bytes_sent; - uint64_t bytes_recieved; - - UT_hash_handle hh; -} ch_sf_tun_client_t; - -typedef struct ch_sf_tun_socket { - uint8_t worker_id; - dap_worker_t * worker; - dap_events_socket_t * es; - - ch_sf_tun_client_t * clients; // Remote clients identified by destination address - - UT_hash_handle hh; -}ch_sf_tun_socket_t; -#define CH_SF_TUN_SOCKET(a) ((ch_sf_tun_socket_t*) a->_inheritor ) - -struct ch_sf_tun_server{ - struct in_addr client_addr_last; - struct in_addr int_network_mask; - struct in_addr int_network_addr; - struct in_addr int_network; - int tun_ctl_fd; - struct ifreq ifr; - ch_sf_tun_client_t * clients; // Remote clients identified by destination address - - ch_vpn_pkt_t * pkt_out[400]; - size_t pkt_out_size; - size_t pkt_out_rindex; - size_t pkt_out_windex; - pthread_mutex_t pkt_out_mutex; - - pthread_mutex_t external_route_operations; - - pthread_rwlock_t rwlock; -}; -typedef struct ch_sf_tun_server ch_sf_tun_server_t; -extern ch_sf_tun_server_t * m_tun_server; - #define CH_VPN(a) ((dap_chain_net_srv_ch_vpn_t *) ((a)->internal) ) -bool is_dap_tun_in_worker(void); - int dap_chain_net_srv_client_vpn_init(dap_config_t * g_config); int dap_chain_net_srv_vpn_init(dap_config_t * g_config);