Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/libdap-client
1 result
Show changes
Commits on Source (3)
......@@ -117,7 +117,7 @@ void dap_client_request_enc(dap_client_t * a_client, const char * a_path,const c
void dap_client_request(dap_client_t * a_client, const char * a_full_path, void * a_request, size_t a_request_size,
dap_client_callback_data_size_t a_response_proc, dap_client_callback_int_t a_response_error);
int dap_client_disconnect(dap_client_t * a_client);
//int dap_client_disconnect(dap_client_t * a_client);
const char * dap_client_get_stage_str(dap_client_t * a_client);
const char * dap_client_stage_str(dap_client_stage_t a_stage);
......@@ -136,7 +136,7 @@ void dap_client_set_active_channels (dap_client_t * a_client, const char * a_act
dap_client_stage_t dap_client_get_stage(dap_client_t * a_client);
dap_client_stage_status_t dap_client_get_stage_status(dap_client_t * a_client);
#define DAP_CLIENT(a) ((dap_client_t *) (a)->_inheritor )
#define DAP_CLIENT(a) (a ? (dap_client_t *) (a)->_inheritor : NULL)
#ifdef __cplusplus
}
......
/*
* Authors:
* Alexander Lysikov <alexander.lysikov@demlabs.net>
* DeM Labs Inc. https://demlabs.net
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef void (*dap_client_http_callback_error_t)(int, void *); // Callback for specific http client operations
typedef void (*dap_client_http_callback_data_t)(void *, size_t, void *); // Callback for specific http client operations
void* dap_client_http_request_custom(const char *a_uplink_addr, uint16_t a_uplink_port, const char *a_method,
const char *a_request_content_type, const char * a_path, void *a_request, size_t a_request_size, char *a_cookie,
dap_client_http_callback_data_t a_response_callback, dap_client_http_callback_error_t a_error_callback,
void *a_obj, char **a_custom, size_t a_custom_count);
void* dap_client_http_request(const char *a_uplink_addr, uint16_t a_uplink_port, const char * a_method,
const char* a_request_content_type, const char * a_path, void *a_request, size_t a_request_size,
char * a_cookie, dap_client_http_callback_data_t a_response_callback,
dap_client_http_callback_error_t a_error_callback, void *a_obj, void * a_custom);
#ifdef __cplusplus
}
#endif
......@@ -50,7 +50,8 @@ typedef struct dap_client_internal
char * session_key_id;
//void *curl;// curl connection descriptor
//long curl_sockfd;// curl socket descriptor
char * last_parsed_node;
char * uplink_addr;
......@@ -80,7 +81,7 @@ typedef struct dap_client_internal
dap_client_callback_int_t request_error_callback;
} dap_client_pvt_t;
#define DAP_CLIENT_PVT(a) ((dap_client_pvt_t*) a->_internal )
#define DAP_CLIENT_PVT(a) (a ? (dap_client_pvt_t*) a->_internal : NULL)
int dap_client_pvt_init();
void dap_client_pvt_deinit();
......@@ -99,5 +100,12 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char
void dap_client_pvt_new(dap_client_pvt_t * a_client_internal);
void dap_client_pvt_delete(dap_client_pvt_t * a_client_pvts);
int dap_client_pvt_ref(dap_client_pvt_t * a_client_internal);
int dap_client_pvt_unref(dap_client_pvt_t * a_client_internal);
//int dap_client_pvt_ref(dap_client_pvt_t * a_client_internal);
//int dap_client_pvt_unref(dap_client_pvt_t * a_client_internal);
// from dap_client_pvt_hh.c
int dap_client_pvt_hh_lock(void);
int dap_client_pvt_hh_unlock(void);
void* dap_client_pvt_hh_get(dap_client_pvt_t* a_client_pvt);
int dap_client_pvt_hh_add(dap_client_pvt_t* a_client_pvt);
int dap_client_pvt_hh_del(dap_client_pvt_t *a_client_pvt);
......@@ -190,7 +190,7 @@ void dap_client_delete(dap_client_t * a_client)
pthread_mutex_lock(&a_client->mutex);
dap_client_disconnect(a_client);
//dap_client_disconnect(a_client);
//dap_client_reset(a_client);
//dap_client_pvt_t *l_client_pvt = DAP_CLIENT_PVT(a_client);
......@@ -199,6 +199,7 @@ void dap_client_delete(dap_client_t * a_client)
//a_client->_internal = NULL;
dap_client_pvt_delete(DAP_CLIENT_PVT(a_client));
//a_client->_internal = NULL;
//pthread_mutex_t *l_mutex = &a_client->mutex;
//memset(a_client, 0, sizeof(dap_client_t));
......@@ -206,6 +207,7 @@ void dap_client_delete(dap_client_t * a_client)
pthread_mutex_unlock(&a_client->mutex);
// a_client will be deleted in dap_events_socket_delete() -> free( a_es->_inheritor );
//DAP_DELETE(a_client);
DAP_DELETE(a_client);
}
/**
......@@ -314,41 +316,6 @@ void dap_client_request(dap_client_t * a_client, const char * a_full_path, void
dap_client_pvt_request(l_client_internal, a_full_path, a_request, a_request_size, a_response_proc, a_response_error);
}
/**
* @brief dap_client_disconnect
* @param a_client
* @return
*/
int dap_client_disconnect( dap_client_t *a_client )
{
dap_client_pvt_t *l_client_internal = (a_client) ? DAP_CLIENT_PVT(a_client) : NULL;
if ( l_client_internal && l_client_internal->stream_socket ) {
// if ( l_client_internal->stream_es ) {
// dap_events_socket_remove_and_delete( l_client_internal->stream_es, true );
// l_client_internal->stream_es = NULL;
// }
// l_client_internal->stream_es->signal_close = true;
dap_events_kill_socket( l_client_internal->stream_es );
// if (l_client_internal->stream_socket ) {
// close (l_client_internal->stream_socket);
l_client_internal->stream_socket = 0;
// }
return 1;
}
//l_client_internal->stream_socket = 0;
l_client_internal->is_reconnect = false;
log_it(L_DEBUG, "dap_client_disconnect( ) done" );
return -1;
}
/**
* @brief dap_client_error_str
* @param a_client_error
......
/*
* Authors:
* Alexander Lysikov <alexander.lysikov@demlabs.net>
* DeM Labs Inc. https://demlabs.net
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef WIN32
// for Windows
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#else
// for Unix-like systems
#include <sys/types.h>
#include <sys/socket.h>
//#include <bits/socket_type.h>
#endif
#include <unistd.h>
#include "dap_common.h"
#include "dap_strfuncs.h"
#include "dap_string.h"
#include "dap_events_socket.h"
#include "dap_stream_ch_proc.h"
#include "dap_server.h"
#include "dap_client.h"
#include "dap_client_pvt.h"
#include "dap_client_http.h"
#define LOG_TAG "dap_client_http"
#define DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX 65536 //40960
typedef struct dap_http_client_internal {
dap_client_http_callback_data_t response_callback;
dap_client_http_callback_error_t error_callback;
void *obj; // dap_client_pvt_t *client_pvt;
uint8_t *request;
size_t request_size;
size_t request_sent_size;
int socket;
bool is_header_read;
size_t header_length;
size_t content_length;
uint8_t *response;
size_t response_size;
size_t response_size_max;
} dap_client_http_internal_t;
#define DAP_CLIENT_HTTP(a) (a ? (dap_client_http_internal_t *) (a)->_inheritor : NULL)
/**
* @brief s_http_new
* @param a_es
* @param arg
*/
static void s_http_new(dap_events_socket_t * a_es, void * arg)
{
log_it(L_DEBUG, "s_http_new ");
dap_client_http_internal_t * l_client_http_internal = DAP_CLIENT_HTTP(a_es);
if(!l_client_http_internal) {
log_it(L_ERROR, "s_http_new: l_client_http_internal is NULL!");
return;
}
l_client_http_internal->header_length = 0;
l_client_http_internal->content_length = 0;
l_client_http_internal->response_size = 0;
l_client_http_internal->response_size_max = DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX;
l_client_http_internal->response = (uint8_t*) DAP_NEW_SIZE(uint8_t, DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX);
}
/**
* @brief s_http_stream_write
* @param a_es
* @param arg
*/
static void s_http_write(dap_events_socket_t * a_es, void * arg)
{
log_it(L_DEBUG, "s_http_write ");
// dap_client_http_internal_t * l_client_http_internal = DAP_CLIENT_HTTP(a_es);
// if(!l_client_internal) {
// log_it(L_ERROR, "s_http_write: l_client_http_internal is NULL!");
// return;
// }
//bool ready_to_write = false;
//dap_events_socket_set_writable(a_es, ready_to_write);
}
/**
* @brief s_http_stream_read
* @param a_es
* @param arg
*/
static void s_http_read(dap_events_socket_t * a_es, void * arg)
{
log_it(L_DEBUG, "s_http_read ");
dap_client_http_internal_t * l_client_http_internal = DAP_CLIENT_HTTP(a_es);
if(!l_client_http_internal) {
log_it(L_ERROR, "s_http_read: l_client_http_internal is NULL!");
return;
}
// read data
l_client_http_internal->response_size += dap_events_socket_read(a_es,
l_client_http_internal->response + l_client_http_internal->response_size,
l_client_http_internal->response_size_max - l_client_http_internal->response_size);
// if buffer is overfull then read once more
if(l_client_http_internal->response_size >= DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX) {
log_it(L_ERROR, "s_http_read response_size(%d) overfull!!!", l_client_http_internal->response_size);
}
// search http header
if(!l_client_http_internal->is_header_read && l_client_http_internal->response_size > 4
&& !l_client_http_internal->content_length) {
for(size_t l_pos = 0; l_pos < l_client_http_internal->response_size - 4; l_pos++) {
uint8_t *l_str = l_client_http_internal->response + l_pos;
if(!dap_strncmp((const char*) l_str, "\r\n\r\n", 4)) {
l_client_http_internal->header_length = l_pos + 4;
l_client_http_internal->is_header_read = true;
//dap_events_socket_shrink_buf_in(a_es, l_client_internal->header_size);
break;
}
}
}
// process http header
if(l_client_http_internal->is_header_read) {
l_client_http_internal->response[l_client_http_internal->header_length - 1] = 0;
// search strings in header
char **l_strings = dap_strsplit((char*) l_client_http_internal->response, "\r\n", -1);
if(l_strings) {
int i = 0;
while(l_strings[i]) {
char *l_string = l_strings[i];
char **l_values = dap_strsplit(l_string, ":", 2);
if(l_values && l_values[0] && l_values[1])
if(!dap_strcmp("Content-Length", l_values[0])) {
l_client_http_internal->content_length = atoi(l_values[1]);
l_client_http_internal->is_header_read = false;
}
dap_strfreev(l_values);
if(l_client_http_internal->content_length)
break;
i++;
}
dap_strfreev(l_strings);
}
// restore last symbol
l_client_http_internal->response[l_client_http_internal->header_length - 1] = '\n';
}
// process data
if(l_client_http_internal->content_length) {
l_client_http_internal->is_header_read = false;
/* debug
if(l_client_internal->content_length != (l_client_internal->response_size - l_client_internal->header_length)) {
log_it(L_DEBUG, "s_http_read error!!! content_length(%d)!=response_size-header_size(%d)=%d",
l_client_internal->content_length, l_client_internal->header_length,
l_client_internal->response_size - l_client_internal->header_length);
}*/
// received not enough data
if(l_client_http_internal->content_length
> (l_client_http_internal->response_size - l_client_http_internal->header_length)) {
return;
}
// process data
if(l_client_http_internal->response_callback)
l_client_http_internal->response_callback(
l_client_http_internal->response + l_client_http_internal->header_length,
l_client_http_internal->content_length, //l_client_internal->response_size - l_client_internal->header_size,
l_client_http_internal->obj);
l_client_http_internal->response_size -= l_client_http_internal->header_length;
l_client_http_internal->response_size -= l_client_http_internal->content_length;
l_client_http_internal->header_length = 0;
l_client_http_internal->content_length = 0;
// if the data remains, then read once more
if(l_client_http_internal->response_size > 0) {
s_http_read(a_es, arg);
}
else {
// close connection
dap_events_socket_kill_socket(a_es);
//dap_events_socket_remove_and_delete(a_es, true); //dap_events_socket_delete(a_es, true);
}
}
}
/**
* @brief s_http_stream_error
* @param a_es
* @param arg
*/
static void s_http_error(dap_events_socket_t * a_es, void * arg)
{
log_it(L_DEBUG, "s_http_error ");
int l_err_code = -1;
dap_client_http_internal_t * l_client_http_internal = DAP_CLIENT_HTTP(a_es);
if(!l_client_http_internal) {
log_it(L_ERROR, "s_http_write: l_client_http_internal is NULL!");
return;
}
if(l_client_http_internal->error_callback)
l_client_http_internal->error_callback(l_err_code, l_client_http_internal->obj);
// close connection
dap_events_socket_kill_socket(a_es);
//dap_events_socket_remove_and_delete(a_es, true);
//dap_events_thread_wake_up( &a_es->events->proc_thread);
//dap_events_socket_delete(a_es, false);
//a_es->no_close = false;
}
/**
* @brief s_http_delete
* @param a_es
* @param arg
*/
static void s_http_delete(dap_events_socket_t *a_es, void *arg)
{
// call from dap_events_socket_delete(ev_socket, true);
log_it(L_DEBUG, "s_http_delete ");
dap_client_http_internal_t * l_client_http_internal = DAP_CLIENT_HTTP(a_es);
if(!l_client_http_internal) {
log_it(L_ERROR, "s_http_write: l_client_http_internal is NULL!");
return;
}
//close(l_client_http_internal->socket);
//l_client_http_internal->socket = 0;
DAP_DELETE(l_client_http_internal->response);
l_client_http_internal->response = NULL;
DAP_DELETE(l_client_http_internal);
a_es->_inheritor = NULL;
}
/**
* @brief dap_client_http_request_custom
* @param a_uplink_addr
* @param a_uplink_port
* @param a_method GET or POST
* @param a_request_content_type like "text/text"
* @param a_path
* @param a_request
* @param a_request_size
* @param a_cookie
* @param a_response_callback
* @param a_error_callback
* @param a_obj
* @param a_custom
* @param a_custom_count
*/
void* dap_client_http_request_custom(const char *a_uplink_addr, uint16_t a_uplink_port, const char *a_method,
const char *a_request_content_type, const char * a_path, void *a_request, size_t a_request_size, char *a_cookie,
dap_client_http_callback_data_t a_response_callback, dap_client_http_callback_error_t a_error_callback,
void *a_obj, char **a_custom, size_t a_custom_count)
{
log_it(L_DEBUG, "HTTP request on url '%s:%d'", a_uplink_addr, a_uplink_port);
static dap_events_socket_callbacks_t l_s_callbacks = {
.new_callback = s_http_new,
.read_callback = s_http_read,
.write_callback = s_http_write,
.error_callback = s_http_error,
.delete_callback = s_http_delete
};
// create socket
int l_socket = socket( PF_INET, SOCK_STREAM, 0);
// set socket param
int buffsize = DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX;
#ifdef _WIN32
int optsize = sizeof(int);
{
setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (char *)&buffsize, &optsize );
setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, &optsize );
}
#else
setsockopt(l_socket, SOL_SOCKET, SO_SNDBUF, (void*) &buffsize, sizeof(buffsize));
setsockopt(l_socket, SOL_SOCKET, SO_RCVBUF, (void*) &buffsize, sizeof(buffsize));
#endif
dap_events_socket_t *l_ev_socket = dap_events_socket_wrap_no_add(NULL, l_socket, &l_s_callbacks);
// create private struct
dap_client_http_internal_t *l_client_http_internal = DAP_NEW_Z(dap_client_http_internal_t);
l_ev_socket->_inheritor = l_client_http_internal;
l_client_http_internal->error_callback = a_error_callback;
l_client_http_internal->response_callback = a_response_callback;
//l_client_http_internal->socket = l_socket;
l_client_http_internal->obj = a_obj;
// add to dap_worker
dap_events_socket_create_after(l_ev_socket);
// connect
struct sockaddr_in l_remote_addr;
memset(&l_remote_addr, 0, sizeof(l_remote_addr));
l_remote_addr.sin_family = AF_INET;
l_remote_addr.sin_port = htons(a_uplink_port);
if(inet_pton(AF_INET, a_uplink_addr, &(l_remote_addr.sin_addr)) < 0) {
log_it(L_ERROR, "Wrong remote address '%s:%u'", a_uplink_addr, a_uplink_port);
//close(l_ev_socket->socket);
dap_events_socket_kill_socket(l_ev_socket);
return NULL;
}
else {
int l_err = 0;
if((l_err = connect(l_socket, (struct sockaddr *) &l_remote_addr, sizeof(struct sockaddr_in))) != -1) {
//s_set_sock_nonblock(a_client_pvt->stream_socket, false);
log_it(L_INFO, "Remote address connected (%s:%u) with sock_id %d", a_uplink_addr, a_uplink_port, socket);
}
else {
log_it(L_ERROR, "Remote address can't connected (%s:%u) with sock_id %d", a_uplink_addr, a_uplink_port);
//l_ev_socket->no_close = false;
dap_events_socket_kill_socket(l_ev_socket);
//shutdown(l_ev_socket->socket, SHUT_RDWR);
//dap_events_socket_remove_and_delete(l_ev_socket, true);
//l_ev_socket->socket = 0;
return NULL;
}
}
//dap_client_pvt_t * l_client_pvt = (dap_client_pvt_t*) a_obj;
//dap_events_new();
dap_string_t *l_request_headers = dap_string_new(NULL);
if(a_request && (dap_strcmp(a_method, "POST") == 0 || dap_strcmp(a_method, "POST_ENC") == 0)) {
char l_buf[1024];
log_it(L_DEBUG, "POST request with %u bytes of decoded data", a_request_size);
if(a_request_content_type) {
dap_snprintf(l_buf, sizeof(l_buf), "Content-Type: %s\r\n", a_request_content_type);
l_request_headers = dap_string_append(l_request_headers, l_buf);
}
if(a_custom) {
for(int i = 0; i < a_custom_count; i++) {
l_request_headers = dap_string_append(l_request_headers, (char*) a_custom[i]);
l_request_headers = dap_string_append(l_request_headers, "\r\n");
}
}
if(a_cookie) {
dap_snprintf(l_buf, sizeof(l_buf), "Cookie: %s\r\n", a_cookie);
l_request_headers = dap_string_append(l_request_headers, l_buf);
}
dap_snprintf(l_buf, sizeof(l_buf), "Content-Length: %lu\r\n", a_request_size);
l_request_headers = dap_string_append(l_request_headers, l_buf);
}
// adding string for GET request
char *l_get_str = NULL;
if(!dap_strcmp(a_method, "GET")) {
l_get_str = dap_strdup_printf("?%s", a_request);
}
// send header
dap_events_socket_write_f(l_ev_socket, "%s /%s%s HTTP/1.1\r\n"
"Host: %s\r\n"
"%s"
"\r\n",
a_method, a_path, l_get_str ? l_get_str : "", a_uplink_addr, l_request_headers->str);
// send data for POST request
if(!l_get_str)
dap_events_socket_write(l_ev_socket, a_request, a_request_size);
dap_events_socket_set_writable(l_ev_socket, true);
DAP_DELETE(l_get_str);
dap_string_free(l_request_headers, true);
return l_client_http_internal;
}
/**
* @brief dap_client_http_request
* @param a_uplink_addr
* @param a_uplink_port
* @param a_method GET or POST
* @param a_request_content_type like "text/text"
* @param a_path
* @param a_request
* @param a_request_size
* @param a_cookie
* @param a_response_callback
* @param a_error_callback
* @param a_obj
* @param a_custom
*/
void* dap_client_http_request(const char *a_uplink_addr, uint16_t a_uplink_port, const char * a_method,
const char* a_request_content_type, const char * a_path, void *a_request, size_t a_request_size,
char * a_cookie, dap_client_http_callback_data_t a_response_callback,
dap_client_http_callback_error_t a_error_callback, void *a_obj, void * a_custom)
{
char *a_custom_new[1];
size_t a_custom_count = 0;
// use no more then one custom item only
a_custom_new[0] = (char*) a_custom;
if(a_custom)
a_custom_count = 1;
return dap_client_http_request_custom(a_uplink_addr, a_uplink_port, a_method, a_request_content_type, a_path,
a_request, a_request_size, a_cookie, a_response_callback, a_error_callback, a_obj,
a_custom_new, a_custom_count);
}
......@@ -57,9 +57,11 @@
#include "dap_common.h"
#include "dap_strfuncs.h"
#include "dap_http_client_simple.h"
//#include "dap_http_client_simple.h"
#include "dap_client_http.h"
#include "dap_client.h"
#include "dap_client_pvt.h"
#include "dap_server.h"
#include "dap_stream.h"
#include "dap_stream_ch.h"
#include "dap_stream_ch_proc.h"
......@@ -92,7 +94,6 @@ void m_request_response(void * a_response, size_t a_response_size, void * a_obj)
void m_request_error(int, void *);
// stream callbacks
void m_es_stream_delete(dap_events_socket_t * a_es, void * arg);
void m_es_stream_read(dap_events_socket_t * a_es, void * arg);
void m_es_stream_write(dap_events_socket_t * a_es, void * arg);
......@@ -123,6 +124,8 @@ void dap_client_pvt_new(dap_client_pvt_t * a_client_internal)
a_client_internal->stage = STAGE_BEGIN; // start point of state machine
a_client_internal->stage_status = STAGE_STATUS_DONE;
a_client_internal->uplink_protocol_version = DAP_PROTOCOL_VERSION;
// add to list
dap_client_pvt_hh_add(a_client_internal);
}
typedef struct dap_client_pvt_ref_count {
......@@ -131,12 +134,16 @@ typedef struct dap_client_pvt_ref_count {
UT_hash_handle hh;
} dap_client_pvt_ref_count_t;
static dap_client_pvt_ref_count_t *s_client_pvt_ref = NULL;
static pthread_mutex_t s_mutex_ref = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t s_cond_ref = PTHREAD_COND_INITIALIZER;
//static dap_client_pvt_ref_count_t *s_client_pvt_ref = NULL;
//static pthread_mutex_t s_mutex_ref = PTHREAD_MUTEX_INITIALIZER;
//static pthread_cond_t s_cond_ref = PTHREAD_COND_INITIALIZER;
/*
int dap_client_pvt_ref(dap_client_pvt_t * a_client_internal)
{
if(a_client_internal==0x7fffd8003b00){
int dbg = 5325;
}
int l_ret = 0;
dap_client_pvt_ref_count_t *l_client_pvt_ref;
pthread_mutex_lock(&s_mutex_ref);
......@@ -159,6 +166,9 @@ int dap_client_pvt_ref(dap_client_pvt_t * a_client_internal)
int dap_client_pvt_unref(dap_client_pvt_t * a_client_internal)
{
if(a_client_internal==0x7fffd8003b00){
int dbg = 5325;
}
int l_ret = -1;
dap_client_pvt_ref_count_t *l_client_pvt_ref;
pthread_mutex_lock(&s_mutex_ref);
......@@ -183,10 +193,22 @@ int dap_client_pvt_unref(dap_client_pvt_t * a_client_internal)
return l_ret;
}
/**
* @brief dap_client_pvt_wait
* @param a_client_internal
*/
int dap_client_pvt_get_ref(dap_client_pvt_t * a_client_internal)
{
int l_ref_count = -1;
if(a_client_internal==0x7fffd8003b00){
int dbg = 5325;
}
dap_client_pvt_ref_count_t *l_client_pvt_ref;
pthread_mutex_lock(&s_mutex_ref);
HASH_FIND(hh, s_client_pvt_ref, &a_client_internal, sizeof(dap_client_pvt_t*), l_client_pvt_ref);
if(l_client_pvt_ref) {
l_ref_count = l_client_pvt_ref->ref_count;
}
pthread_mutex_unlock(&s_mutex_ref);
return l_ref_count;
}
int dap_client_pvt_wait_unref(dap_client_pvt_t * a_client_internal, int a_timeout_ms)
{
if(!a_client_internal)
......@@ -230,6 +252,56 @@ int dap_client_pvt_wait_unref(dap_client_pvt_t * a_client_internal, int a_timeou
while(l_client_pvt_ref);
return l_ret;
}
*/
/**
* @brief dap_client_disconnect
* @param a_client
* @return
*/
int dap_client_pvt_disconnect(dap_client_pvt_t *a_client_pvt)
{
//dap_client_pvt_t *a_client_pvt = (a_client) ? DAP_CLIENT_PVT(a_client) : NULL;
if(!a_client_pvt)
return -1;
// stop connection
//dap_http_client_simple_request_break(l_client_internal->curl_sockfd);
if(a_client_pvt && a_client_pvt->stream_socket) {
// if ( l_client_internal->stream_es ) {
// dap_events_socket_remove_and_delete( l_client_internal->stream_es, true );
// l_client_internal->stream_es = NULL;
// }
// l_client_internal->stream_es->signal_close = true;
// start stopping connection
if(!dap_events_socket_kill_socket(a_client_pvt->stream_es)) {
int l_counter = 0;
// wait for stop of connection (max 0.7 sec.)
while(a_client_pvt->stream_es && l_counter < 70) {
dap_usleep(DAP_USEC_PER_SEC / 100);
l_counter++;
}
if(l_counter >= 70) {
dap_events_socket_remove_and_delete(a_client_pvt->stream_es, true);
}
}
// if (l_client_internal->stream_socket ) {
// close (l_client_internal->stream_socket);
// l_client_internal->stream_socket = 0;
// }
return 1;
}
//l_client_internal->stream_socket = 0;
a_client_pvt->is_reconnect = false;
log_it(L_DEBUG, "dap_client_pvt_disconnect() done");
return -1;
}
/**
* @brief dap_client_pvt_delete
......@@ -239,6 +311,15 @@ static void dap_client_pvt_delete_in(dap_client_pvt_t * a_client_pvt)
{
if(!a_client_pvt)
return;
// delete from list
if(dap_client_pvt_hh_del(a_client_pvt)<0){
log_it(L_DEBUG, "dap_client_pvt 0x%x already deleted", a_client_pvt);
return;
}
dap_client_pvt_disconnect(a_client_pvt);
log_it(L_INFO, "dap_client_pvt_delete 0x%x", a_client_pvt);
if(a_client_pvt->session_key_id)
DAP_DELETE(a_client_pvt->session_key_id);
......@@ -255,20 +336,22 @@ static void dap_client_pvt_delete_in(dap_client_pvt_t * a_client_pvt)
if(a_client_pvt->stream_key)
dap_enc_key_delete(a_client_pvt->stream_key);
//a_client_pvt->client = NULL;
DAP_DELETE(a_client_pvt);
}
/*
static void* dap_client_pvt_delete_proc(void *a_arg)
{
dap_client_pvt_t * l_client_pvt = (dap_client_pvt_t*)a_arg;
// wait for release l_client_pvt
dap_client_pvt_wait_unref(l_client_pvt, 20000000);
//dap_client_pvt_wait_unref(l_client_pvt, 20000000);
//dap_client_reset(l_client_pvt->client);
dap_client_pvt_delete_in(l_client_pvt);
//DAP_DELETE(l_client_pvt->client);
pthread_exit(0);
}
}*/
/**
* @brief dap_client_pvt_delete
......@@ -277,7 +360,8 @@ static void* dap_client_pvt_delete_proc(void *a_arg)
void dap_client_pvt_delete(dap_client_pvt_t * a_client_pvt)
{
pthread_t l_thread = NULL;
pthread_create(&l_thread, NULL, dap_client_pvt_delete_proc, a_client_pvt);
//pthread_create(&l_thread, NULL, dap_client_pvt_delete_proc, a_client_pvt);
dap_client_pvt_delete_in(a_client_pvt);
}
/**
......@@ -307,7 +391,7 @@ static void s_set_sock_nonblock(int sockfd, bool is_nonblock)
*/
static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
{
bool l_is_unref = false;
//bool l_is_unref = false;
switch (a_client_pvt->stage_status) {
case STAGE_STATUS_IN_PROGRESS: {
......@@ -326,11 +410,11 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
log_it(L_DEBUG, "ENC request size %u", l_key_str_enc_size);
dap_client_pvt_request(a_client_pvt, DAP_UPLINK_PATH_ENC_INIT "/gd4y5yh78w42aaagh",
l_key_str, l_key_str_size_max, m_enc_init_response, m_enc_init_error);
l_key_str, l_key_str_enc_size, m_enc_init_response, m_enc_init_error);
DAP_DELETE(l_key_str);
}
break;
case STAGE_STREAM_CTL: {
case STAGE_STREAM_CTL: {
log_it(L_INFO, "Go to stage STREAM_CTL: prepare the request");
char *l_request = dap_strdup_printf("%d", DAP_CLIENT_PROTOCOL_VERSION);
......@@ -360,8 +444,9 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, &optsize );
}
#else
setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (const void *) 50000, sizeof(int));
setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (const void *) 50000, sizeof(int));
int buffsize = 65536;
setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (const void *) &buffsize, sizeof(int));
setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (const void *) &buffsize, sizeof(int));
#endif
// Wrap socket and setup callbacks
......@@ -376,7 +461,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
// add to dap_worker
dap_events_socket_create_after(a_client_pvt->stream_es);
a_client_pvt->stream_es->_inheritor = a_client_pvt->client;
a_client_pvt->stream_es->_inheritor = a_client_pvt;//->client;
a_client_pvt->stream = dap_stream_new_es(a_client_pvt->stream_es);
a_client_pvt->stream->is_client_to_uplink = true;
a_client_pvt->stream_session = dap_stream_session_pure_new(); // may be from in packet?
......@@ -392,14 +477,16 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
l_remote_addr.sin_port = htons(a_client_pvt->uplink_port);
if(inet_pton(AF_INET, a_client_pvt->uplink_addr, &(l_remote_addr.sin_addr)) < 0) {
log_it(L_ERROR, "Wrong remote address '%s:%u'", a_client_pvt->uplink_addr, a_client_pvt->uplink_port);
close(a_client_pvt->stream_socket);
a_client_pvt->stream_socket = 0;
//close(a_client_pvt->stream_socket);
dap_events_socket_kill_socket(a_client_pvt->stream_es);
//a_client_pvt->stream_socket = 0;
a_client_pvt->stage_status = STAGE_STATUS_ERROR;
}
else {
int l_err = 0;
if((l_err = connect(a_client_pvt->stream_socket, (struct sockaddr *) &l_remote_addr,
sizeof(struct sockaddr_in))) != -1) {
a_client_pvt->stream_es->flags &= ~DAP_SOCK_SIGNAL_CLOSE;
//s_set_sock_nonblock(a_client_pvt->stream_socket, false);
log_it(L_INFO, "Remote address connected (%s:%u) with sock_id %d", a_client_pvt->uplink_addr,
a_client_pvt->uplink_port, a_client_pvt->stream_socket);
......@@ -408,7 +495,8 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
else {
log_it(L_ERROR, "Remote address can't connected (%s:%u) with sock_id %d", a_client_pvt->uplink_addr,
a_client_pvt->uplink_port);
close(a_client_pvt->stream_socket);
dap_events_socket_kill_socket(a_client_pvt->stream_es);
//close(a_client_pvt->stream_socket);
a_client_pvt->stream_socket = 0;
a_client_pvt->stage_status = STAGE_STATUS_ERROR;
}
......@@ -480,7 +568,9 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
log_it(L_ERROR, "Error state, doing callback if present");
if(a_client_pvt->stage_status_error_callback) {
//dap_client_pvt_ref(a_client_pvt);
a_client_pvt->stage_status_error_callback(a_client_pvt->client, (void*)l_is_last_attempt);
//dap_client_pvt_unref(a_client_pvt);
// Expecting that its one-shot callback
//a_client_internal->stage_status_error_callback = NULL;
}
......@@ -488,7 +578,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
a_client_pvt->stage = STAGE_STREAM_ABORT;
a_client_pvt->stage_status = STAGE_STATUS_ABORTING;
// unref pvt
l_is_unref = true;
//l_is_unref = true;
}
else {
if(!l_is_last_attempt) {
......@@ -509,7 +599,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
log_it(L_INFO, "Too many connection attempts. Tries are over.");
//a_client_pvt->stage_status = STAGE_STATUS_DONE;
// unref pvt
l_is_unref = true;
//l_is_unref = true;
}
}
}
......@@ -517,6 +607,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
case STAGE_STATUS_DONE: {
log_it(L_INFO, "Stage status %s is done",
dap_client_stage_str(a_client_pvt->stage));
// go to next stage
if(a_client_pvt->stage_status_done_callback) {
a_client_pvt->stage_status_done_callback(a_client_pvt->client, NULL);
// Expecting that its one-shot callback
......@@ -526,6 +617,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
bool l_is_last_stage = (a_client_pvt->stage == a_client_pvt->stage_target);
if(l_is_last_stage) {
//l_is_unref = true;
log_it(L_NOTICE, "Stage %s is achieved",
dap_client_stage_str(a_client_pvt->stage));
if(a_client_pvt->stage_target_done_callback) {
......@@ -536,7 +628,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
} else{
log_it(L_ERROR, "!! dap_CLIENT_STAGE_STATUS_DONE but not l_is_last_stage (cur stage=%d, target=%d)!!",a_client_pvt->stage, a_client_pvt->stage_target);
}
l_is_unref = true;
//l_is_unref = true;
}
break;
default:
......@@ -546,10 +638,10 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
if(a_client_pvt->stage_status_callback)
a_client_pvt->stage_status_callback(a_client_pvt->client, NULL);
if(l_is_unref) {
//if(l_is_unref) {
// unref pvt
dap_client_pvt_unref(a_client_pvt);
}
//dap_client_pvt_unref(a_client_pvt);
//}
}
/**
......@@ -561,11 +653,12 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
void dap_client_pvt_stage_transaction_begin(dap_client_pvt_t * a_client_internal, dap_client_stage_t a_stage_next,
dap_client_callback_t a_done_callback)
{
// ref pvt client
//dap_client_pvt_ref(a_client_internal);
a_client_internal->stage_status_done_callback = a_done_callback;
a_client_internal->stage = a_stage_next;
a_client_internal->stage_status = STAGE_STATUS_IN_PROGRESS;
// ref pvt client
dap_client_pvt_ref(a_client_internal);
s_stage_status_after(a_client_internal);
}
......@@ -585,23 +678,24 @@ void dap_client_pvt_request(dap_client_pvt_t * a_client_internal, const char * a
a_client_internal->request_error_callback = a_response_error;
a_client_internal->is_encrypted = false;
size_t l_url_size_max = 0;
char *l_url = NULL;
if(a_path) {
l_url_size_max = dap_strlen(a_client_internal->uplink_addr) + strlen(a_path) + 15;
l_url = DAP_NEW_Z_SIZE(char, l_url_size_max);
snprintf(l_url, l_url_size_max, "http://%s:%u/%s", a_client_internal->uplink_addr,
a_client_internal->uplink_port, a_path);
} else {
l_url_size_max = strlen(a_client_internal->uplink_addr) + 15;
l_url = DAP_NEW_Z_SIZE(char, l_url_size_max);
snprintf(l_url, l_url_size_max, "http://%s:%u", a_client_internal->uplink_addr, a_client_internal->uplink_port);
}
dap_http_client_simple_request(l_url, a_request ? "POST" : "GET", "text/text", a_request, a_request_size,
NULL,
m_request_response, m_request_error, a_client_internal, NULL);
DAP_DELETE(l_url);
// size_t l_url_size_max = 0;
// char *l_url = NULL;
// if(a_path) {
// l_url_size_max = dap_strlen(a_client_internal->uplink_addr) + strlen(a_path) + 15;
// l_url = DAP_NEW_Z_SIZE(char, l_url_size_max);
//
// snprintf(l_url, l_url_size_max, "http://%s:%u/%s", a_client_internal->uplink_addr,
// a_client_internal->uplink_port, a_path);
// } else {
// l_url_size_max = strlen(a_client_internal->uplink_addr) + 15;
// l_url = DAP_NEW_Z_SIZE(char, l_url_size_max);
// snprintf(l_url, l_url_size_max, "http://%s:%u", a_client_internal->uplink_addr, a_client_internal->uplink_port);
// }
dap_client_http_request(a_client_internal->uplink_addr,a_client_internal->uplink_port, a_request ? "POST" : "GET", "text/text", a_path, a_request,
a_request_size, NULL, m_request_response, m_request_error, a_client_internal, NULL);
// a_client_internal->curl = dap_http_client_simple_request(l_url, a_request ? "POST" : "GET", "text/text", a_request,
// a_request_size, NULL, m_request_response, m_request_error, &a_client_internal->curl_sockfd, a_client_internal, NULL);
// DAP_DELETE(l_url);
}
/**
......@@ -628,9 +722,9 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char
size_t l_query_size = a_query ? strlen(a_query) : 0;
size_t l_url_size;
char l_url[1024] = { 0 };
snprintf(l_url, 1024, "http://%s:%u", a_client_internal->uplink_addr, a_client_internal->uplink_port);
l_url_size = strlen(l_url);
// char l_url[1024] = { 0 };
// snprintf(l_url, 1024, "http://%s:%u", a_client_internal->uplink_addr, a_client_internal->uplink_port);
// l_url_size = strlen(l_url);
size_t l_sub_url_enc_size_max = l_sub_url_size ? (5 * l_sub_url_size + 16) : 0;
char *l_sub_url_enc = l_sub_url_size ? DAP_NEW_Z_SIZE(char, l_sub_url_enc_size_max + 1) : NULL;
......@@ -639,8 +733,8 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char
char *l_query_enc =
(is_query_enc) ? (l_query_size ? DAP_NEW_Z_SIZE(char, l_query_enc_size_max + 1) : NULL) : (char*) a_query;
size_t l_url_full_size_max = 5 * l_sub_url_size + 5 * l_query_size + 16 + l_url_size + 2;
char * l_url_full = DAP_NEW_Z_SIZE(char, l_url_full_size_max + 1);
// size_t l_url_full_size_max = 5 * l_sub_url_size + 5 * l_query_size + 16 + l_url_size + 2;
// char * l_url_full = DAP_NEW_Z_SIZE(char, l_url_full_size_max + 1);
size_t l_request_enc_size_max = a_request_size ? a_request_size * 2 + 16 : 0;
char * l_request_enc = a_request_size ? DAP_NEW_Z_SIZE(char, l_request_enc_size_max + 1) : NULL;
......@@ -675,6 +769,7 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char
l_request_enc, l_request_enc_size_max,
DAP_ENC_DATA_TYPE_RAW);
/*
if(a_path) {
if(l_sub_url_size) {
if(l_query_size) {
......@@ -690,6 +785,20 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char
} else {
snprintf(l_url_full, l_url_full_size_max, "%s", l_url);
}
*/
char *l_path = NULL;
if(a_path) {
if(l_sub_url_size) {
if(l_query_size) {
l_path = dap_strdup_printf("%s/%s?%s", a_path, l_sub_url_enc, l_query_enc);
} else {
l_path = dap_strdup_printf("%s/%s", a_path, l_sub_url_enc);
}
} else {
l_path = dap_strdup(a_path);
}
}
size_t l_key_hdr_str_size_max = strlen(a_client_internal->session_key_id) + 10;
char *l_key_hdr_str = DAP_NEW_Z_SIZE(char, l_key_hdr_str_size_max);
......@@ -703,9 +812,12 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char
a_custom_new[1] = "SessionCloseAfterRequest: true";
a_custom_count++;
}
dap_http_client_simple_request_custom(l_url_full, a_request ? "POST" : "GET", "text/text",
l_request_enc, l_request_enc_size, NULL,
m_request_response, m_request_error, a_client_internal, a_custom_new, a_custom_count);
dap_client_http_request_custom(a_client_internal->uplink_addr, a_client_internal->uplink_port, a_request ? "POST" : "GET", "text/text",
l_path, l_request_enc, l_request_enc_size, NULL,
m_request_response, m_request_error, a_client_internal, a_custom_new, a_custom_count);
// dap_http_client_simple_request_custom(l_url_full, a_request ? "POST" : "GET", "text/text",
// l_request_enc, l_request_enc_size, NULL,
// m_request_response, a_client_internal->curl_sockfd ,m_request_error, a_client_internal, a_custom_new, a_custom_count);
DAP_DELETE(l_key_hdr_str);
if(l_sub_url_enc)
......@@ -714,8 +826,8 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char
if(is_query_enc && l_query_enc)
DAP_DELETE(l_query_enc);
if(l_url_full)
DAP_DELETE(l_url_full);
// if(l_url_full)
// DAP_DELETE(l_url_full);
if(l_request_enc)
DAP_DELETE(l_request_enc);
......@@ -729,13 +841,17 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char
void m_request_error(int a_err_code, void * a_obj)
{
dap_client_pvt_t * a_client_internal = (dap_client_pvt_t *) a_obj;
dap_client_pvt_hh_lock();
if(!dap_client_pvt_hh_get(a_client_internal)){
dap_client_pvt_hh_unlock();
return;
}
if(a_client_internal && a_client_internal->request_error_callback && a_client_internal->client)
{
if(a_client_internal && a_client_internal->request_error_callback && a_client_internal->client && a_client_internal->client->_internal)
a_client_internal->request_error_callback(a_client_internal->client, a_err_code);
}
// unref pvt client
//dap_client_pvt_unref(a_client_internal);
dap_client_pvt_hh_unlock();
}
/**
......@@ -749,7 +865,7 @@ void m_request_response(void * a_response, size_t a_response_size, void * a_obj)
dap_client_pvt_t * a_client_internal = (dap_client_pvt_t *) a_obj;
if(!a_client_internal || !a_client_internal->client)
return;
//int l_ref = dap_client_pvt_get_ref(a_client_internal);
if(a_client_internal->is_encrypted) {
size_t l_response_dec_size_max = a_response_size ? a_response_size * 2 + 16 : 0;
char * l_response_dec = a_response_size ? DAP_NEW_Z_SIZE(char, l_response_dec_size_max) : NULL;
......@@ -768,8 +884,10 @@ void m_request_response(void * a_response, size_t a_response_size, void * a_obj)
a_client_internal->request_response_callback(a_client_internal->client, a_response, a_response_size);
}
//int l_ref2 = dap_client_pvt_get_ref(a_client_internal);
// unref pvt client
dap_client_pvt_unref(DAP_CLIENT_PVT(a_client_internal->client));
//dap_client_pvt_unref(a_client_internal);
//dap_client_pvt_unref(DAP_CLIENT_PVT(a_client_internal->client));
}
/**
......@@ -1047,41 +1165,43 @@ void m_stage_stream_streaming(dap_client_t * a_client, void* arg)
*/
void m_es_stream_delete(dap_events_socket_t *a_es, void *arg)
{
log_it(L_INFO, "====================================================== stream delete/peer reconnect");
log_it(L_INFO, "================= stream delete/peer reconnect");
dap_client_t *l_client = DAP_CLIENT(a_es);
//dap_client_t *l_client = DAP_CLIENT(a_es);
dap_client_pvt_t * l_client_pvt = a_es->_inheritor;
if(l_client == NULL) {
log_it(L_ERROR, "dap_client is not initialized");
if(l_client_pvt == NULL) {
log_it(L_ERROR, "dap_client_pvt_t is not initialized");
return;
}
pthread_mutex_lock(&l_client->mutex);
//pthread_mutex_lock(&l_client->mutex);
dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client);
//dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client);
log_it(L_DEBUG, "client_pvt=0x%x", l_client_pvt);
if(l_client_pvt == NULL) {
log_it(L_ERROR, "dap_client_pvt is not initialized");
pthread_mutex_unlock(&l_client->mutex);
//pthread_mutex_unlock(&l_client->mutex);
return;
}
dap_stream_delete(l_client_pvt->stream);
l_client_pvt->stream = NULL;
if(l_client_pvt->client)
dap_client_reset(l_client_pvt->client);
// if(l_client_pvt->client && l_client_pvt->client == l_client)
// dap_client_reset(l_client_pvt->client);
// l_client_pvt->client= NULL;
l_client_pvt->stream_es = NULL;
// log_it(L_DEBUG, "dap_stream_session_close()");
// sleep(3);
// dap_stream_session_close(l_client_pvt->stream_session->id);
dap_stream_session_close(l_client_pvt->stream_session->id);
l_client_pvt->stream_session = NULL;
pthread_mutex_unlock(&l_client->mutex);
// signal to permit deleting of l_client_pvt
l_client_pvt->stream_es = NULL;
//pthread_mutex_unlock(&l_client->mutex);
/* disable reconnect from here
if(l_client_pvt->is_reconnect) {
log_it(L_DEBUG, "l_client_pvt->is_reconnect = true");
......@@ -1089,6 +1209,7 @@ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg)
}
else
log_it(L_DEBUG, "l_client_pvt->is_reconnect = false");
*/
}
/**
......@@ -1098,8 +1219,8 @@ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg)
*/
void m_es_stream_read(dap_events_socket_t * a_es, void * arg)
{
dap_client_t * l_client = DAP_CLIENT(a_es);
dap_client_pvt_t * l_client_pvt = (l_client) ? DAP_CLIENT_PVT(l_client) : NULL;
//dap_client_t * l_client = DAP_CLIENT(a_es);
dap_client_pvt_t * l_client_pvt = a_es->_inheritor;//(l_client) ? DAP_CLIENT_PVT(l_client) : NULL;
if(!l_client_pvt) {
log_it(L_ERROR, "m_es_stream_read: l_client_pvt is NULL!");
return;
......@@ -1145,8 +1266,9 @@ void m_es_stream_read(dap_events_socket_t * a_es, void * arg)
*/
void m_es_stream_write(dap_events_socket_t * a_es, void * arg)
{
dap_client_t * l_client = DAP_CLIENT(a_es);
dap_client_pvt_t * l_client_pvt = (l_client) ? DAP_CLIENT_PVT(l_client) : NULL;
//dap_client_t * l_client = DAP_CLIENT(a_es);
//dap_client_pvt_t * l_client_pvt = (l_client) ? DAP_CLIENT_PVT(l_client) : NULL;
dap_client_pvt_t * l_client_pvt = a_es->_inheritor;
if(!l_client_pvt) {
log_it(L_ERROR, "m_es_stream_write: l_client_pvt is NULL!");
return;
......@@ -1177,8 +1299,9 @@ void m_es_stream_write(dap_events_socket_t * a_es, void * arg)
void m_es_stream_error(dap_events_socket_t * a_es, void * arg)
{
dap_client_t * l_client = DAP_CLIENT(a_es);
dap_client_pvt_t * l_client_pvt = (l_client) ? DAP_CLIENT_PVT(l_client) : NULL;
//dap_client_t * l_client = DAP_CLIENT(a_es);
//dap_client_pvt_t * l_client_pvt = (l_client) ? DAP_CLIENT_PVT(l_client) : NULL;
dap_client_pvt_t * l_client_pvt = a_es->_inheritor;
if(!l_client_pvt) {
log_it(L_ERROR, "m_es_stream_error: l_client_pvt is NULL!");
return;
......
/*
* Authors:
* Alexander Lysikov <alexander.lysikov@demlabs.net>
* DeM Labs Inc. https://demlabs.net
* Copyright (c) 2020
*
This file is part of DAP (Deus Applications Prototypes) the open source project
DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
DAP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with any DAP based project. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <pthread.h>
#include <uthash.h>
#include "dap_common.h"
#include "dap_client_pvt.h"
typedef struct dap_client_pvt_hh {
dap_client_pvt_t *client_pvt;
UT_hash_handle hh;
} dap_client_pvt_hh_t;
// List of active connections
static dap_client_pvt_hh_t *s_client_pvt_list = NULL;
// for separate access to s_conn_list
static pthread_mutex_t s_client_pvt_list_mutex = PTHREAD_MUTEX_INITIALIZER;
/**
* dap_client_pvt_hh_lock
*/
int dap_client_pvt_hh_lock(void)
{
return pthread_mutex_lock(&s_client_pvt_list_mutex);
}
/**
* dap_client_pvt_hh_unlock
*/
int dap_client_pvt_hh_unlock(void)
{
return pthread_mutex_unlock(&s_client_pvt_list_mutex);
}
/**
* find active connection in the list
*
* return 0 OK, -1 error, -2 connection not found
*/
void* dap_client_pvt_hh_get(dap_client_pvt_t* a_client_pvt)
{
if(!a_client_pvt)
return NULL;
dap_client_pvt_hh_t *l_cur_item;
HASH_FIND_PTR(s_client_pvt_list, &a_client_pvt, l_cur_item);
return (void*) l_cur_item;
}
/**
* Add new active connection to the list
*
* return 0 OK, -1 error, -2 connection present
*/
int dap_client_pvt_hh_add(dap_client_pvt_t* a_client_pvt)
{
int l_ret = 0;
if(!a_client_pvt)
return -1;
pthread_mutex_lock(&s_client_pvt_list_mutex);
dap_client_pvt_hh_t *l_cur_item;
HASH_FIND_PTR(s_client_pvt_list, &a_client_pvt, l_cur_item);
if(l_cur_item == NULL) {
l_cur_item = DAP_NEW(dap_client_pvt_hh_t);
l_cur_item->client_pvt = a_client_pvt;
HASH_ADD_PTR(s_client_pvt_list, client_pvt, l_cur_item);
l_ret = 0;
}
// connection already present
else
l_ret = -2;
//connect_list = g_list_append(connect_list, client);
pthread_mutex_unlock(&s_client_pvt_list_mutex);
return l_ret;
}
/**
* Delete active connection from the list
*
* return 0 OK, -1 error, -2 connection not found
*/
int dap_client_pvt_hh_del(dap_client_pvt_t *a_client_pvt)
{
int ret = -1;
if(!a_client_pvt)
return -1;
pthread_mutex_lock(&s_client_pvt_list_mutex);
dap_client_pvt_hh_t *l_cur_item;
HASH_FIND_PTR(s_client_pvt_list, &a_client_pvt, l_cur_item);
if(l_cur_item != NULL) {
HASH_DEL(s_client_pvt_list, l_cur_item);
DAP_DELETE(l_cur_item);
ret = 0;
}
// connection not found in the hash
else {
ret = -2;
}
pthread_mutex_unlock(&s_client_pvt_list_mutex);
return ret;
}