/* Last change: Dmitriy A. Gerasimov, "[*] Reworked and renamed some" (commit 5dcc0643) */
/*
* Authors:
* Alexander Lysikov <alexander.lysikov@demlabs.net>
* DeM Labs Inc. https://demlabs.net
* Kelvin Project https://gitlab.demlabs.net/cellframe
* Copyright (c) 2019
* All rights reserved.
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applications Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/select.h>
#include <sys/ioctl.h>
#include <sys/time.h>
#include <sys/epoll.h>
#include "dap_common.h"
#include "dap_config.h"
#include "dap_strfuncs.h"
#include "dap_client.h"
#include "dap_chain_node_client.h"
#include "dap_stream_ch_proc.h"
#include "dap_chain_net_srv_vpn.h"
#include "dap_chain_net_vpn_client.h"
#include "dap_stream_ch_pkt.h"
#include "dap_chain_net_vpn_client_tun.h"
/**
 * HTTP client state codes.
 * Within this file the type is referenced only from commented-out code.
 * Enumerators are numbered consecutively from 0 (NONE) to 3 (DATA).
 */
typedef enum dap_http_client_state {
    DAP_HTTP_CLIENT_STATE_NONE = 0, /* 0 */
    DAP_HTTP_CLIENT_STATE_START,    /* 1 */
    DAP_HTTP_CLIENT_STATE_HEADERS,  /* 2 */
    DAP_HTTP_CLIENT_STATE_DATA      /* 3 */
} dap_http_client_state_t;
// Module tag; presumably picked up by the log_it() macro (defined outside this file)
#define LOG_TAG "vpn_client"
// epoll descriptor the proxied sockets are added to / removed from via epoll_ctl().
// NOTE(review): it is not assigned anywhere in the visible part of this file, confirm where it is created
static EPOLL_HANDLE sf_socks_epoll_fd;
// Hash of proxied sockets keyed by sock id (uthash handle hh2)
static ch_vpn_socket_proxy_t *sf_socks = NULL;
// Hash of the same proxied sockets keyed by OS socket descriptor (uthash handle hh_sock)
static ch_vpn_socket_proxy_t *sf_socks_client = NULL;
// Locked around changes of sf_socks and sf_socks_client
static pthread_mutex_t sf_socks_mutex;
// VPN server address/port, filled by dap_chain_net_vpn_client_start()
static dap_chain_node_info_t *s_node_info = NULL;
// Node client returned by dap_chain_client_connect(); NULL until dap_chain_net_vpn_client_start() assigns it
static dap_chain_node_client_t *s_vpn_client = NULL;
/**
 * Get the VPN service stream channel of the current VPN client.
 *
 * @return result of dap_client_get_stream_ch() for channel id
 *         DAP_STREAM_CH_ID_NET_SRV_VPN, or NULL when there is no VPN client
 */
dap_stream_ch_t* dap_chain_net_vpn_client_get_stream_ch(void)
{
    return s_vpn_client
            ? dap_client_get_stream_ch(s_vpn_client->client, DAP_STREAM_CH_ID_NET_SRV_VPN)
            : NULL;
}
/// TODO: convert the callback below into a stage processor
/*
void s_stage_callback()
{
char* l_full_path = NULL;
const char * l_path = "stream";
const char *l_suburl = "globaldb";
int l_full_path_size = snprintf(l_full_path, 0, "%s/%s?session_id=%s", DAP_UPLINK_PATH_STREAM, l_suburl,
dap_client_get_stream_id(a_client_pvt->client));
l_full_path = DAP_NEW_Z_SIZE(char, l_full_path_size + 1);
snprintf(l_full_path, l_full_path_size + 1, "%s/%s?session_id=%s", DAP_UPLINK_PATH_STREAM, l_suburl,
dap_client_get_stream_id(a_client_pvt->client));
//dap_client_request(a_client_pvt->client, l_full_path, "12345", 0, m_stream_response, m_stream_error);
const char *l_add_str = "";
// if connect to vpn server
const char l_active_vpn_channels[] = { VPN_CLIENT_ID, 0 };
if(!dap_strcmp(a_client_pvt->active_channels, l_active_vpn_channels))
l_add_str = "\r\nService-Key: test";
{
char *l_message = dap_strdup_printf("GET /%s HTTP/1.1\r\nHost: %s:%d%s\r\n\r\n",
l_full_path, a_client_pvt->uplink_addr, a_client_pvt->uplink_port, l_add_str);
size_t l_message_size = dap_strlen(l_message);
int count = send(a_client_pvt->stream_socket, l_message, l_message_size, 0);
DAP_DELETE(l_message);
}
DAP_DELETE(l_full_path);
}*/
/**
 * Start VPN client
 *
 * Connects to the VPN server with dap_chain_client_connect(), waits until the
 * node client reaches NODE_CLIENT_STATE_CONNECTED, initializes the tun device
 * and sends a VPN_PACKET_OP_CODE_VPN_ADDR_REQUEST packet to the server.
 *
 * @param a_net       network (not used by this function)
 * @param a_ipv4_str  IPv4 address of the VPN server, mandatory
 * @param a_ipv6_str  IPv6 address of the VPN server, may be NULL
 * @param a_port      port of the VPN server
 * @return on the success path the result of dap_chain_net_vpn_client_tun_init()
 *         (described by the original comment as 0 Ok, 1 Already started);
 *         <0 Error: -1 missing or malformed IPv4 address, -2 can't connect,
 *         -3 no response from the server, -4 out of memory
 */
int dap_chain_net_vpn_client_start(dap_chain_net_t *a_net, const char *a_ipv4_str, const char *a_ipv6_str, int a_port)
{
    (void) a_net;
    if(!a_ipv4_str)
        return -1;

    if(!s_node_info) {
        s_node_info = DAP_NEW_Z(dap_chain_node_info_t);
        if(!s_node_info) {
            log_it(L_ERROR, "Can't allocate memory for the VPN server info");
            return -4;
        }
    }
    s_node_info->hdr.ext_port = a_port;

    // inet_pton() returns 1 only when the string was converted
    if(inet_pton(AF_INET, a_ipv4_str, &(s_node_info->hdr.ext_addr_v4)) != 1) {
        log_it(L_ERROR, "Wrong VPN server IPv4 address '%s'", a_ipv4_str);
        DAP_DELETE(s_node_info);
        s_node_info = NULL;
        return -1;
    }
    if(a_ipv6_str && inet_pton(AF_INET6, a_ipv6_str, &(s_node_info->hdr.ext_addr_v6)) != 1)
        log_it(L_WARNING, "Wrong VPN server IPv6 address '%s', ignored", a_ipv6_str);

    const char l_active_channels[] = { DAP_STREAM_CH_ID_NET_SRV_VPN, 0 };
    s_vpn_client = dap_chain_client_connect(s_node_info, STAGE_STREAM_STREAMING, l_active_channels);
    if(!s_vpn_client) {
        log_it(L_ERROR, "Can't connect to VPN server=%s:%d", a_ipv4_str, a_port);
        // no client was created, so there is nothing to close
        DAP_DELETE(s_node_info);
        s_node_info = NULL;
        return -2;
    }

    // wait connected
    // NOTE(review): 500000 ms is 500 s; the previous comment said "5 sec = 5000 ms" - confirm the intended value
    const int l_timeout_ms = 500000;
    if(dap_chain_node_client_wait(s_vpn_client, NODE_CLIENT_STATE_CONNECTED, l_timeout_ms)) {
        log_it(L_ERROR, "No response from VPN server=%s:%d", a_ipv4_str, a_port);
        // clean client struct and forget the pointer so that nobody uses it after the close
        dap_chain_node_client_close(s_vpn_client);
        s_vpn_client = NULL;
        DAP_DELETE(s_node_info);
        s_node_info = NULL;
        return -3;
    }

    int l_ret = dap_chain_net_vpn_client_tun_init(a_ipv4_str);

    // send first packet to server: request of the VPN address
    dap_stream_ch_t *l_ch = dap_chain_net_vpn_client_get_stream_ch();
    if(l_ch) {
        // header only, the request carries no payload (data_size stays 0 after calloc)
        ch_vpn_pkt_t *l_pkt_out = calloc(1, sizeof(l_pkt_out->header));
        if(l_pkt_out) {
            l_pkt_out->header.op_code = VPN_PACKET_OP_CODE_VPN_ADDR_REQUEST;
            dap_stream_ch_pkt_write(l_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, l_pkt_out,
                    l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header));
            dap_stream_ch_set_ready_to_write(l_ch, true);
            free(l_pkt_out);
        } else {
            log_it(L_ERROR, "Can't allocate memory for the VPN address request");
        }
    }
    return l_ret;
}
/**
 * Stop VPN client: close the connection with the VPN server, drop the
 * stored server info and delete the tun device.
 *
 * @return result of dap_chain_net_vpn_client_tun_delete()
 */
int dap_chain_net_vpn_client_stop(void)
{
    // delete connection with VPN server, but only when there is one
    if(s_vpn_client) {
        dap_chain_node_client_close(s_vpn_client);
        s_vpn_client = NULL;
    }
    DAP_DELETE(s_node_info);
    s_node_info = NULL;
    return dap_chain_net_vpn_client_tun_delete();
}
/**
 * Report the VPN client state.
 *
 * @return 1 (VPN client started) when dap_chain_net_vpn_client_tun_status()
 *         yields zero, otherwise 0
 */
int dap_chain_net_vpn_client_status(void)
{
    const int l_is_started = !dap_chain_net_vpn_client_tun_status();
    return l_is_started ? 1 : 0;
}
/**
 * Close the socket of a proxy record, destroy its mutex and free the record.
 * The caller must have unlocked sf->mutex and removed the record from the hash tables.
 *
 * @param sf proxy record to delete; NULL is ignored
 */
static void vpn_socket_delete(ch_vpn_socket_proxy_t * sf)
{
    // check before the first dereference
    if(!sf)
        return;
    close(sf->sock);
    pthread_mutex_destroy(&(sf->mutex));
    free(sf);
}
/**
 * Send a header-only VPN_PACKET_OP_CODE_PONG packet to the channel and
 * mark the channel as ready to write.
 *
 * @param a_ch stream channel to answer on
 */
static void send_pong_pkt(dap_stream_ch_t* a_ch)
{
    ch_vpn_pkt_t *l_pkt_out = calloc(1, sizeof(l_pkt_out->header));
    if(!l_pkt_out) {
        log_it(L_ERROR, "Can't allocate memory for the pong packet");
        return;
    }
    l_pkt_out->header.op_code = VPN_PACKET_OP_CODE_PONG;
    // data_size is 0 after calloc, so only the header is written
    dap_stream_ch_pkt_write(a_ch, 'd', l_pkt_out,
            l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header));
    dap_stream_ch_set_ready_to_write(a_ch, true);
    free(l_pkt_out);
}
/**
* @brief dap_chain_net_vpn_client_pkt_in
* @param a_ch
* @param a_arg
*/
void dap_chain_net_vpn_client_pkt_in(dap_stream_ch_t* a_ch, dap_stream_ch_pkt_t* a_pkt)
{
ch_vpn_pkt_t * l_sf_pkt = (ch_vpn_pkt_t *) a_pkt->data;
size_t l_sf_pkt_data_size = a_pkt->hdr.size - sizeof(l_sf_pkt->header);
if(!a_pkt->hdr.size) {
log_it(L_WARNING, "Bad input packet");
return;
}
int remote_sock_id = l_sf_pkt->header.sock_id;
if(l_sf_pkt->header.op_code == 0) { // Raw packets
log_it(L_WARNING, "Bad op_code=0");
return;
}
// log_it(L_DEBUG,"Got SF packet: remote_sock_id:%d op_code:0x%02x data_size:%lu"
// ,remote_sock_id, l_sf_pkt->header.op_code, l_sf_pkt_data_size );
if(l_sf_pkt->header.op_code >= 0xb0) { // Raw packets
switch (l_sf_pkt->header.op_code) {
case VPN_PACKET_OP_CODE_VPN_ADDR_REPLY: { // Assigned address for peer
if(ch_sf_tun_addr_leased(CH_VPN(a_ch), l_sf_pkt, l_sf_pkt_data_size) < 0) {
log_it(L_WARNING, "Can't create tun");
}
}
break;
case VPN_PACKET_OP_CODE_VPN_ADDR_REQUEST: // Client request after L3 connection the new IP address
log_it(L_WARNING, "Got VPN_PACKET_OP_CODE_VPN_ADDR_REQUEST packet with id %d, it's very strange' ",
remote_sock_id);
break;
case VPN_PACKET_OP_CODE_VPN_SEND:
log_it(L_WARNING, "Got VPN_PACKET_OP_CODE_VPN_SEND packet with id %d, it's very strange' ", remote_sock_id);
case VPN_PACKET_OP_CODE_VPN_RECV:
a_ch->stream->events_socket->last_ping_request = time(NULL); // not ping, but better ;-)
ch_sf_tun_send(CH_VPN(a_ch), l_sf_pkt->data, l_sf_pkt->header.op_data.data_size);
break;
case VPN_PACKET_OP_CODE_PING:
a_ch->stream->events_socket->last_ping_request = time(NULL);
send_pong_pkt(a_ch);
break;
case VPN_PACKET_OP_CODE_PONG:
a_ch->stream->events_socket->last_ping_request = time(NULL);
break;
default:
log_it(L_WARNING, "Can't process SF type 0x%02x", l_sf_pkt->header.op_code);
}
} else { // All except CONNECT
ch_vpn_socket_proxy_t * sf_sock = NULL;
if((l_sf_pkt->header.op_code != VPN_PACKET_OP_CODE_CONNECT) // tcp
&& (l_sf_pkt->header.op_code != VPN_PACKET_OP_CODE_CONNECTED)) { //udp
pthread_mutex_lock(&( CH_VPN(a_ch)->mutex));
log_it(L_DEBUG, "Looking in hash table with %d", remote_sock_id);
HASH_FIND_INT((CH_VPN(a_ch)->socks), &remote_sock_id, sf_sock);
pthread_mutex_unlock(&( CH_VPN(a_ch)->mutex));
if(sf_sock != NULL) {
pthread_mutex_lock(&sf_sock->mutex); // Unlock it in your case as soon as possible to reduce lock time
sf_sock->time_lastused = time(NULL);
switch (l_sf_pkt->header.op_code) {
case VPN_PACKET_OP_CODE_SEND: {
int ret;
if((ret = send(sf_sock->sock, l_sf_pkt->data, l_sf_pkt->header.op_data.data_size, 0)) < 0) {
log_it(L_INFO, "Disconnected from the remote host");
pthread_mutex_unlock(&sf_sock->mutex);
pthread_mutex_lock(&( CH_VPN(a_ch)->mutex));
HASH_DEL(CH_VPN(a_ch)->socks, sf_sock);
pthread_mutex_unlock(&( CH_VPN(a_ch)->mutex));
pthread_mutex_lock(&sf_socks_mutex);
HASH_DELETE(hh2, sf_socks, sf_sock);
HASH_DELETE(hh_sock, sf_socks_client, sf_sock);
struct epoll_event ev;
ev.data.fd = sf_sock->sock;
ev.events = EPOLLIN;
if(epoll_ctl(sf_socks_epoll_fd, EPOLL_CTL_DEL, sf_sock->sock, &ev) < 0) {
log_it(L_ERROR, "Can't remove sock_id %d from the epoll fd", remote_sock_id);
//stream_ch_pkt_write_f(ch,'i',"sock_id=%d op_code=0x%02x result=-2",sf_pkt->sock_id, sf_pkt->op_code);
} else {
log_it(L_NOTICE, "Removed sock_id %d from the the epoll fd", remote_sock_id);
//stream_ch_pkt_write_f(ch,'i',"sock_id=%d op_code=0x%02x result=0",sf_pkt->sock_id, sf_pkt->op_code);
}
pthread_mutex_unlock(&sf_socks_mutex);
vpn_socket_delete(sf_sock);
} else {
sf_sock->bytes_sent += ret;
pthread_mutex_unlock(&sf_sock->mutex);
}
log_it(L_INFO, "Send action from %d sock_id (sf_packet size %lu, ch packet size %lu, have sent %d)"
, sf_sock->id, l_sf_pkt->header.op_data.data_size, a_pkt->hdr.size, ret);
}
break;
case VPN_PACKET_OP_CODE_DISCONNECT: {
log_it(L_INFO, "Disconnect action from %d sock_id", sf_sock->id);
pthread_mutex_lock(&( CH_VPN(a_ch)->mutex));
HASH_DEL(CH_VPN(a_ch)->socks, sf_sock);
pthread_mutex_unlock(&( CH_VPN(a_ch)->mutex));
pthread_mutex_lock(&sf_socks_mutex);
HASH_DELETE(hh2, sf_socks, sf_sock);
HASH_DELETE(hh_sock, sf_socks_client, sf_sock);
struct epoll_event ev;
ev.data.fd = sf_sock->sock;
ev.events = EPOLLIN;
if(epoll_ctl(sf_socks_epoll_fd, EPOLL_CTL_DEL, sf_sock->sock, &ev) < 0) {
log_it(L_ERROR, "Can't remove sock_id %d to the epoll fd", remote_sock_id);
//stream_ch_pkt_write_f(ch,'i',"sock_id=%d op_code=%uc result=-2",sf_pkt->sock_id, sf_pkt->op_code);
} else {
log_it(L_NOTICE, "Removed sock_id %d from the epoll fd", remote_sock_id);
//stream_ch_pkt_write_f(ch,'i',"sock_id=%d op_code=%uc result=0",sf_pkt->sock_id, sf_pkt->op_code);
}
pthread_mutex_unlock(&sf_socks_mutex);
pthread_mutex_unlock(&sf_sock->mutex);
vpn_socket_delete(sf_sock);
}
break;
default: {
log_it(L_WARNING, "Unprocessed op code 0x%02x", l_sf_pkt->header.op_code);
pthread_mutex_unlock(&sf_sock->mutex);
}
}
} else
log_it(L_WARNING, "Packet input: packet with sock_id %d thats not present in current stream channel",
remote_sock_id);
} else {
HASH_FIND_INT(CH_VPN(a_ch)->socks, &remote_sock_id, sf_sock);
if(sf_sock) {
log_it(L_WARNING, "Socket id %d is already used, take another number for socket id", remote_sock_id);
} else { // Connect action
struct sockaddr_in remote_addr;
char addr_str[1024];
size_t addr_str_size =
(l_sf_pkt->header.op_connect.addr_size > (sizeof(addr_str) - 1)) ?
(sizeof(addr_str) - 1) :
l_sf_pkt->header.op_connect.addr_size;
memset(&remote_addr, 0, sizeof(remote_addr));
remote_addr.sin_family = AF_INET;
remote_addr.sin_port = htons(l_sf_pkt->header.op_connect.port);
memcpy(addr_str, l_sf_pkt->data, addr_str_size);
addr_str[addr_str_size] = 0;
log_it(L_DEBUG, "Connect action to %s:%u (addr_size %lu)", addr_str, l_sf_pkt->header.op_connect.port,
l_sf_pkt->header.op_connect.addr_size);
if(inet_pton(AF_INET, addr_str, &(remote_addr.sin_addr)) < 0) {
log_it(L_ERROR, "Wrong remote address '%s:%u'", addr_str, l_sf_pkt->header.op_connect.port);
ch_vpn_pkt_t *l_pkt_out = DAP_NEW_Z(ch_vpn_pkt_t);
l_pkt_out->header.op_code = VPN_PACKET_OP_CODE_PROBLEM;
dap_stream_ch_pkt_write(a_ch, 'd', l_pkt_out,
l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header));
dap_stream_ch_set_ready_to_write(a_ch, true);
free(l_pkt_out);
} else {
int s;
if((s = socket(AF_INET,
(l_sf_pkt->header.op_code == VPN_PACKET_OP_CODE_CONNECT) ?
SOCK_STREAM :
SOCK_DGRAM, 0)) >= 0) {
log_it(L_DEBUG, "Socket is created (%d)", s);
if(connect(s, (struct sockaddr *) &remote_addr, sizeof(remote_addr)) >= 0) {
fcntl(s, F_SETFL, O_NONBLOCK);
log_it(L_INFO, "Remote address connected (%s:%u) with sock_id %d", addr_str,
l_sf_pkt->header.op_connect.port, remote_sock_id);
ch_vpn_socket_proxy_t * sf_sock = NULL;
sf_sock = DAP_NEW_Z(ch_vpn_socket_proxy_t);
sf_sock->id = remote_sock_id;
sf_sock->sock = s;
sf_sock->ch = a_ch;
pthread_mutex_init(&sf_sock->mutex, NULL);
pthread_mutex_lock(&sf_socks_mutex);
pthread_mutex_lock(&( CH_VPN(a_ch)->mutex));
HASH_ADD_INT(CH_VPN(a_ch)->socks, id, sf_sock);
log_it(L_DEBUG, "Added %d sock_id with sock %d to the hash table", sf_sock->id,
sf_sock->sock);
HASH_ADD(hh2, sf_socks, id, sizeof(sf_sock->id), sf_sock);
log_it(L_DEBUG, "Added %d sock_id with sock %d to the hash table", sf_sock->id,
sf_sock->sock);
HASH_ADD(hh_sock, sf_socks_client, sock, sizeof(int), sf_sock);
// log_it(L_DEBUG,"Added %d sock_id with sock %d to the socks hash table",sf->id,sf->sock);
pthread_mutex_unlock(&( CH_VPN(a_ch)->mutex));
pthread_mutex_unlock(&sf_socks_mutex);
struct epoll_event ev;
ev.data.fd = s;
ev.events = EPOLLIN | EPOLLERR;
if(epoll_ctl(sf_socks_epoll_fd, EPOLL_CTL_ADD, s, &ev) == -1) {
log_it(L_ERROR, "Can't add sock_id %d to the epoll fd", remote_sock_id);
//stream_ch_pkt_write_f(ch,'i',"sock_id=%d op_code=%uc result=-2",sf_pkt->sock_id, sf_pkt->op_code);
} else {
log_it(L_NOTICE, "Added sock_id %d with sock %d to the epoll fd", remote_sock_id, s);
//stream_ch_pkt_write_f(ch,'i',"sock_id=%d op_code=%uc result=0",sf_pkt->sock_id, sf_pkt->op_code);
}
dap_stream_ch_set_ready_to_write(a_ch, true);
} else {
ch_vpn_pkt_t *l_pkt_out = (ch_vpn_pkt_t*) calloc(1, sizeof(l_pkt_out->header));
l_pkt_out->header.op_code = VPN_PACKET_OP_CODE_PROBLEM;
dap_stream_ch_pkt_write(a_ch, 'd', l_pkt_out,
l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header));
dap_stream_ch_set_ready_to_write(a_ch, true);
free(l_pkt_out);
log_it(L_INFO, "Can't connect to the remote server %s", addr_str);
dap_stream_ch_pkt_write_f(a_ch, 'i', "sock_id=%d op_code=%c result=-1",
l_sf_pkt->header.sock_id, l_sf_pkt->header.op_code);
dap_stream_ch_set_ready_to_write(a_ch, true);
}
} else {
log_it(L_ERROR, "Can't create the socket");
ch_vpn_pkt_t *l_pkt_out = (ch_vpn_pkt_t*) calloc(1, sizeof(l_pkt_out->header));
l_pkt_out->header.op_code = VPN_PACKET_OP_CODE_PROBLEM;
dap_stream_ch_pkt_write(a_ch, 'd', l_pkt_out,
l_pkt_out->header.op_data.data_size + sizeof(l_pkt_out->header));
dap_stream_ch_set_ready_to_write(a_ch, true);
free(l_pkt_out);
}
}
}
}
}
}
/**
 * @brief dap_chain_net_vpn_client_pkt_out
 *
 * Walk over the proxied sockets of the channel and write their queued packets
 * to the channel. The walk stops at the first packet the channel does not accept.
 * A socket flagged with signal_to_delete is unregistered and deleted once its
 * queue has been written out.
 *
 * @param a_ch stream channel to write to
 */
void dap_chain_net_vpn_client_pkt_out(dap_stream_ch_t* a_ch)
{
    ch_vpn_socket_proxy_t *l_sock = NULL, *l_next;
    HASH_ITER(hh, CH_VPN(a_ch)->socks , l_sock, l_next)
    {
        pthread_mutex_lock(&(l_sock->mutex));

        bool l_overflowed = false;
        for(int i = 0; i < l_sock->pkt_out_size; i++) {
            ch_vpn_pkt_t *l_pkt = l_sock->pkt_out[i];
            if(!l_pkt) // slot already sent earlier
                continue;
            if(!dap_stream_ch_pkt_write(a_ch, DAP_STREAM_CH_PKT_TYPE_NET_SRV_VPN_DATA, l_pkt,
                    l_pkt->header.op_data.data_size + sizeof(l_pkt->header))) {
                log_it(L_WARNING,
                        "Buffer is overflowed, breaking cycle to let the upper level cycle drop data to the output socket");
                l_overflowed = true;
                break;
            }
            // written: release the packet and clear its slot
            free(l_pkt);
            l_sock->pkt_out[i] = NULL;
        }

        if(l_overflowed) {
            // keep the rest of the queue (and pkt_out_size) and stop the walk
            pthread_mutex_unlock(&(l_sock->mutex));
            break;
        }

        l_sock->pkt_out_size = 0;
        if(!l_sock->signal_to_delete) {
            pthread_mutex_unlock(&(l_sock->mutex));
            continue;
        }

        log_it(L_NOTICE, "Socket id %d got signal to be deleted", l_sock->id);
        pthread_mutex_lock(&( CH_VPN(a_ch)->mutex));
        HASH_DEL(CH_VPN(a_ch)->socks, l_sock);
        pthread_mutex_unlock(&( CH_VPN(a_ch)->mutex));

        pthread_mutex_lock(&(sf_socks_mutex));
        HASH_DELETE(hh2, sf_socks, l_sock);
        HASH_DELETE(hh_sock, sf_socks_client, l_sock);
        pthread_mutex_unlock(&(sf_socks_mutex));

        pthread_mutex_unlock(&(l_sock->mutex));
        vpn_socket_delete(l_sock);
    }
}
/**
 * Initialize the VPN client module: create the mutex that is locked around
 * the file-level socket hash tables.
 *
 * @param g_config configuration (not used by this function)
 * @return 0 Ok, -1 when the mutex can't be initialized
 */
int dap_chain_net_vpn_client_init(dap_config_t * g_config)
{
    (void) g_config;
    // pthread_mutex_init() returns an error number instead of setting errno
    int l_err = pthread_mutex_init(&sf_socks_mutex, NULL);
    if(l_err) {
        log_it(L_ERROR, "Can't init the sockets mutex, error %d", l_err);
        return -1;
    }
    return 0;
}
/**
 * Deinitialize the VPN client module: destroy the mutex created by
 * dap_chain_net_vpn_client_init().
 */
void dap_chain_net_vpn_client_deinit(void)
{
    (void) pthread_mutex_destroy(&sf_socks_mutex);
}