diff --git a/include/dap_client.h b/include/dap_client.h index 8b8206914716b6b9c5559c5c8df0ab71bf946997..29545c805ff733606b29a7392883ca9142d41a34 100755 --- a/include/dap_client.h +++ b/include/dap_client.h @@ -117,7 +117,7 @@ void dap_client_request_enc(dap_client_t * a_client, const char * a_path,const c void dap_client_request(dap_client_t * a_client, const char * a_full_path, void * a_request, size_t a_request_size, dap_client_callback_data_size_t a_response_proc, dap_client_callback_int_t a_response_error); -int dap_client_disconnect(dap_client_t * a_client); +//int dap_client_disconnect(dap_client_t * a_client); const char * dap_client_get_stage_str(dap_client_t * a_client); const char * dap_client_stage_str(dap_client_stage_t a_stage); @@ -136,7 +136,7 @@ void dap_client_set_active_channels (dap_client_t * a_client, const char * a_act dap_client_stage_t dap_client_get_stage(dap_client_t * a_client); dap_client_stage_status_t dap_client_get_stage_status(dap_client_t * a_client); -#define DAP_CLIENT(a) ((dap_client_t *) (a)->_inheritor ) +#define DAP_CLIENT(a) (a ? (dap_client_t *) (a)->_inheritor : NULL) #ifdef __cplusplus } diff --git a/include/dap_client_http.h b/include/dap_client_http.h new file mode 100755 index 0000000000000000000000000000000000000000..ebf0df092b03f074debaa5d35b05249998485cb8 --- /dev/null +++ b/include/dap_client_http.h @@ -0,0 +1,45 @@ +/* + * Authors: + * Alexander Lysikov <alexander.lysikov@demlabs.net> + * DeM Labs Inc. https://demlabs.net + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <stdint.h> + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void (*dap_client_http_callback_error_t)(int, void *); // Callback for specific http client operations +typedef void (*dap_client_http_callback_data_t)(void *, size_t, void *); // Callback for specific http client operations + +void* dap_client_http_request_custom(const char *a_uplink_addr, uint16_t a_uplink_port, const char *a_method, + const char *a_request_content_type, const char * a_path, void *a_request, size_t a_request_size, char *a_cookie, + dap_client_http_callback_data_t a_response_callback, dap_client_http_callback_error_t a_error_callback, + void *a_obj, char **a_custom, size_t a_custom_count); + +void* dap_client_http_request(const char *a_uplink_addr, uint16_t a_uplink_port, const char * a_method, + const char* a_request_content_type, const char * a_path, void *a_request, size_t a_request_size, + char * a_cookie, dap_client_http_callback_data_t a_response_callback, + dap_client_http_callback_error_t a_error_callback, void *a_obj, void * a_custom); + +#ifdef __cplusplus +} +#endif diff --git a/include/dap_client_pvt.h b/include/dap_client_pvt.h index 7de116e557a9e1e1a2ec7f215b9215004ab900ec..3c27387fbe003fdd14a9dd93c4927d33ae769d38 100755 --- a/include/dap_client_pvt.h +++ b/include/dap_client_pvt.h @@ -50,8 +50,8 @@ typedef struct dap_client_internal char * session_key_id; - void *curl;// curl connection descriptor - long curl_sockfd;// curl socket descriptor + //void *curl;// curl connection descriptor + //long curl_sockfd;// curl socket descriptor char * last_parsed_node; char * uplink_addr; @@ -100,5 +100,12 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char void dap_client_pvt_new(dap_client_pvt_t * a_client_internal); void dap_client_pvt_delete(dap_client_pvt_t * a_client_pvts); -int dap_client_pvt_ref(dap_client_pvt_t * a_client_internal); -int dap_client_pvt_unref(dap_client_pvt_t * a_client_internal); +//int dap_client_pvt_ref(dap_client_pvt_t * a_client_internal); +//int dap_client_pvt_unref(dap_client_pvt_t * a_client_internal); + +// from dap_client_pvt_hh.c +int dap_client_pvt_hh_lock(void); +int dap_client_pvt_hh_unlock(void); +void* dap_client_pvt_hh_get(dap_client_pvt_t* a_client_pvt); +int dap_client_pvt_hh_add(dap_client_pvt_t* a_client_pvt); +int dap_client_pvt_hh_del(dap_client_pvt_t *a_client_pvt); diff --git a/src/dap_client.c b/src/dap_client.c index c23d698910ac950e3e195a2b04d7bf8052a50a06..2b15fa18b62fb75095aea7fd94141425e6815544 100644 --- a/src/dap_client.c +++ b/src/dap_client.c @@ -190,7 +190,7 @@ void dap_client_delete(dap_client_t * a_client) pthread_mutex_lock(&a_client->mutex); - dap_client_disconnect(a_client); + //dap_client_disconnect(a_client); //dap_client_reset(a_client); //dap_client_pvt_t *l_client_pvt = DAP_CLIENT_PVT(a_client); @@ -199,7 +199,7 @@ void dap_client_delete(dap_client_t * a_client) //a_client->_internal = NULL; dap_client_pvt_delete(DAP_CLIENT_PVT(a_client)); - a_client->_internal = NULL; + //a_client->_internal = NULL; //pthread_mutex_t *l_mutex = &a_client->mutex; //memset(a_client, 0, sizeof(dap_client_t)); @@ -207,6 +207,7 @@ void dap_client_delete(dap_client_t * a_client) pthread_mutex_unlock(&a_client->mutex); // a_client will be deleted in dap_events_socket_delete() -> free( a_es->_inheritor ); //DAP_DELETE(a_client); + DAP_DELETE(a_client); } /** @@ -315,45 +316,6 @@ void dap_client_request(dap_client_t * a_client, const char * a_full_path, void dap_client_pvt_request(l_client_internal, a_full_path, a_request, a_request_size, a_response_proc, a_response_error); } -/** - * @brief dap_client_disconnect - * @param a_client - * @return - */ -int dap_client_disconnect( dap_client_t *a_client ) -{ - dap_client_pvt_t *l_client_internal = (a_client) ? DAP_CLIENT_PVT(a_client) : NULL; - if (!l_client_internal) - return -1; - // stop connection - dap_http_client_simple_request_break(l_client_internal->curl_sockfd); - - if ( l_client_internal && l_client_internal->stream_socket ) { - -// if ( l_client_internal->stream_es ) { -// dap_events_socket_remove_and_delete( l_client_internal->stream_es, true ); -// l_client_internal->stream_es = NULL; -// } - -// l_client_internal->stream_es->signal_close = true; - dap_events_kill_socket( l_client_internal->stream_es ); - -// if (l_client_internal->stream_socket ) { -// close (l_client_internal->stream_socket); - l_client_internal->stream_socket = 0; -// } - - return 1; - } - //l_client_internal->stream_socket = 0; - - l_client_internal->is_reconnect = false; - - log_it(L_DEBUG, "dap_client_disconnect( ) done" ); - - return -1; -} - /** * @brief dap_client_error_str * @param a_client_error diff --git a/src/dap_client_http.c b/src/dap_client_http.c new file mode 100644 index 0000000000000000000000000000000000000000..19eda356c2621b7582ab4180d668c219e17454cb --- /dev/null +++ b/src/dap_client_http.c @@ -0,0 +1,427 @@ +/* + * Authors: + * Alexander Lysikov <alexander.lysikov@demlabs.net> + * DeM Labs Inc. https://demlabs.net + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifdef WIN32 +// for Windows +#include <winsock2.h> +#include <windows.h> +#include <mswsock.h> +#include <ws2tcpip.h> +#include <io.h> +#else +// for Unix-like systems +#include <sys/types.h> +#include <sys/socket.h> +//#include <bits/socket_type.h> +#endif +#include <unistd.h> + +#include "dap_common.h" +#include "dap_strfuncs.h" +#include "dap_string.h" +#include "dap_events_socket.h" +#include "dap_stream_ch_proc.h" +#include "dap_server.h" +#include "dap_client.h" +#include "dap_client_pvt.h" +#include "dap_client_http.h" + +#define LOG_TAG "dap_client_http" + +#define DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX 65536 //40960 + +typedef struct dap_http_client_internal { + + dap_client_http_callback_data_t response_callback; + dap_client_http_callback_error_t error_callback; + + void *obj; // dap_client_pvt_t *client_pvt; + uint8_t *request; + size_t request_size; + size_t request_sent_size; + + int socket; + + bool is_header_read; + size_t header_length; + size_t content_length; + + uint8_t *response; + size_t response_size; + size_t response_size_max; + +} dap_client_http_internal_t; + +#define DAP_CLIENT_HTTP(a) (a ? (dap_client_http_internal_t *) (a)->_inheritor : NULL) + +/** + * @brief s_http_new + * @param a_es + * @param arg + */ +static void s_http_new(dap_events_socket_t * a_es, void * arg) +{ + log_it(L_DEBUG, "s_http_new "); + dap_client_http_internal_t * l_client_http_internal = DAP_CLIENT_HTTP(a_es); + if(!l_client_http_internal) { + log_it(L_ERROR, "s_http_new: l_client_http_internal is NULL!"); + return; + } + l_client_http_internal->header_length = 0; + l_client_http_internal->content_length = 0; + l_client_http_internal->response_size = 0; + l_client_http_internal->response_size_max = DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX; + l_client_http_internal->response = (uint8_t*) DAP_NEW_SIZE(uint8_t, DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX); +} + +/** + * @brief s_http_stream_write + * @param a_es + * @param arg + */ +static void s_http_write(dap_events_socket_t * a_es, void * arg) +{ + log_it(L_DEBUG, "s_http_write "); +// dap_client_http_internal_t * l_client_http_internal = DAP_CLIENT_HTTP(a_es); +// if(!l_client_internal) { +// log_it(L_ERROR, "s_http_write: l_client_http_internal is NULL!"); +// return; +// } + + //bool ready_to_write = false; + //dap_events_socket_set_writable(a_es, ready_to_write); +} + +/** + * @brief s_http_stream_read + * @param a_es + * @param arg + */ +static void s_http_read(dap_events_socket_t * a_es, void * arg) +{ + log_it(L_DEBUG, "s_http_read "); + dap_client_http_internal_t * l_client_http_internal = DAP_CLIENT_HTTP(a_es); + if(!l_client_http_internal) { + log_it(L_ERROR, "s_http_read: l_client_http_internal is NULL!"); + return; + } + // read data + l_client_http_internal->response_size += dap_events_socket_read(a_es, + l_client_http_internal->response + l_client_http_internal->response_size, + l_client_http_internal->response_size_max - l_client_http_internal->response_size); + + // if buffer is overfull then read once more + if(l_client_http_internal->response_size >= DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX) { + log_it(L_ERROR, "s_http_read response_size(%d) overfull!!!", l_client_http_internal->response_size); + } + + // search http header + if(!l_client_http_internal->is_header_read && l_client_http_internal->response_size > 4 + && !l_client_http_internal->content_length) { + for(size_t l_pos = 0; l_pos < l_client_http_internal->response_size - 4; l_pos++) { + uint8_t *l_str = l_client_http_internal->response + l_pos; + if(!dap_strncmp((const char*) l_str, "\r\n\r\n", 4)) { + l_client_http_internal->header_length = l_pos + 4; + l_client_http_internal->is_header_read = true; + //dap_events_socket_shrink_buf_in(a_es, l_client_internal->header_size); + break; + } + } + } + // process http header + if(l_client_http_internal->is_header_read) { + l_client_http_internal->response[l_client_http_internal->header_length - 1] = 0; + // search strings in header + char **l_strings = dap_strsplit((char*) l_client_http_internal->response, "\r\n", -1); + if(l_strings) { + int i = 0; + while(l_strings[i]) { + char *l_string = l_strings[i]; + char **l_values = dap_strsplit(l_string, ":", 2); + if(l_values && l_values[0] && l_values[1]) + if(!dap_strcmp("Content-Length", l_values[0])) { + l_client_http_internal->content_length = atoi(l_values[1]); + l_client_http_internal->is_header_read = false; + } + dap_strfreev(l_values); + if(l_client_http_internal->content_length) + break; + i++; + } + dap_strfreev(l_strings); + } + + // restore last symbol + l_client_http_internal->response[l_client_http_internal->header_length - 1] = '\n'; + } + + // process data + if(l_client_http_internal->content_length) { + l_client_http_internal->is_header_read = false; + /* debug + if(l_client_internal->content_length != (l_client_internal->response_size - l_client_internal->header_length)) { + log_it(L_DEBUG, "s_http_read error!!! content_length(%d)!=response_size-header_size(%d)=%d", + l_client_internal->content_length, l_client_internal->header_length, + l_client_internal->response_size - l_client_internal->header_length); + }*/ + + // received not enough data + if(l_client_http_internal->content_length + > (l_client_http_internal->response_size - l_client_http_internal->header_length)) { + return; + } + // process data + if(l_client_http_internal->response_callback) + l_client_http_internal->response_callback( + l_client_http_internal->response + l_client_http_internal->header_length, + l_client_http_internal->content_length, //l_client_internal->response_size - l_client_internal->header_size, + l_client_http_internal->obj); + l_client_http_internal->response_size -= l_client_http_internal->header_length; + l_client_http_internal->response_size -= l_client_http_internal->content_length; + l_client_http_internal->header_length = 0; + l_client_http_internal->content_length = 0; + // if the data remains, then read once more + if(l_client_http_internal->response_size > 0) { + s_http_read(a_es, arg); + } + else { + // close connection + dap_events_socket_kill_socket(a_es); + //dap_events_socket_remove_and_delete(a_es, true); //dap_events_socket_delete(a_es, true); + } + } +} + +/** + * @brief s_http_stream_error + * @param a_es + * @param arg + */ +static void s_http_error(dap_events_socket_t * a_es, void * arg) +{ + log_it(L_DEBUG, "s_http_error "); + int l_err_code = -1; + dap_client_http_internal_t * l_client_http_internal = DAP_CLIENT_HTTP(a_es); + if(!l_client_http_internal) { + log_it(L_ERROR, "s_http_write: l_client_http_internal is NULL!"); + return; + } + if(l_client_http_internal->error_callback) + l_client_http_internal->error_callback(l_err_code, l_client_http_internal->obj); + + // close connection + dap_events_socket_kill_socket(a_es); + //dap_events_socket_remove_and_delete(a_es, true); + //dap_events_thread_wake_up( &a_es->events->proc_thread); + //dap_events_socket_delete(a_es, false); + //a_es->no_close = false; +} + +/** + * @brief s_http_delete + * @param a_es + * @param arg + */ +static void s_http_delete(dap_events_socket_t *a_es, void *arg) +{ + // call from dap_events_socket_delete(ev_socket, true); + log_it(L_DEBUG, "s_http_delete "); + dap_client_http_internal_t * l_client_http_internal = DAP_CLIENT_HTTP(a_es); + if(!l_client_http_internal) { + log_it(L_ERROR, "s_http_write: l_client_http_internal is NULL!"); + return; + } + //close(l_client_http_internal->socket); + //l_client_http_internal->socket = 0; + + DAP_DELETE(l_client_http_internal->response); + l_client_http_internal->response = NULL; + + DAP_DELETE(l_client_http_internal); + a_es->_inheritor = NULL; +} + +/** + * @brief dap_client_http_request_custom + * @param a_uplink_addr + * @param a_uplink_port + * @param a_method GET or POST + * @param a_request_content_type like "text/text" + * @param a_path + * @param a_request + * @param a_request_size + * @param a_cookie + * @param a_response_callback + * @param a_error_callback + * @param a_obj + * @param a_custom + * @param a_custom_count + */ +void* dap_client_http_request_custom(const char *a_uplink_addr, uint16_t a_uplink_port, const char *a_method, + const char *a_request_content_type, const char * a_path, void *a_request, size_t a_request_size, char *a_cookie, + dap_client_http_callback_data_t a_response_callback, dap_client_http_callback_error_t a_error_callback, + void *a_obj, char **a_custom, size_t a_custom_count) +{ + log_it(L_DEBUG, "HTTP request on url '%s:%d'", a_uplink_addr, a_uplink_port); + static dap_events_socket_callbacks_t l_s_callbacks = { + .new_callback = s_http_new, + .read_callback = s_http_read, + .write_callback = s_http_write, + .error_callback = s_http_error, + .delete_callback = s_http_delete + }; + + // create socket + int l_socket = socket( PF_INET, SOCK_STREAM, 0); + // set socket param + int buffsize = DAP_CLIENT_HTTP_RESPONSE_SIZE_MAX; +#ifdef _WIN32 + int optsize = sizeof(int); + { + setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (char *)&buffsize, &optsize ); + setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, &optsize ); + } +#else + setsockopt(l_socket, SOL_SOCKET, SO_SNDBUF, (void*) &buffsize, sizeof(buffsize)); + setsockopt(l_socket, SOL_SOCKET, SO_RCVBUF, (void*) &buffsize, sizeof(buffsize)); +#endif + dap_events_socket_t *l_ev_socket = dap_events_socket_wrap_no_add(NULL, l_socket, &l_s_callbacks); + + // create private struct + dap_client_http_internal_t *l_client_http_internal = DAP_NEW_Z(dap_client_http_internal_t); + l_ev_socket->_inheritor = l_client_http_internal; + l_client_http_internal->error_callback = a_error_callback; + l_client_http_internal->response_callback = a_response_callback; + //l_client_http_internal->socket = l_socket; + l_client_http_internal->obj = a_obj; + + // add to dap_worker + dap_events_socket_create_after(l_ev_socket); + + // connect + struct sockaddr_in l_remote_addr; + memset(&l_remote_addr, 0, sizeof(l_remote_addr)); + l_remote_addr.sin_family = AF_INET; + l_remote_addr.sin_port = htons(a_uplink_port); + if(inet_pton(AF_INET, a_uplink_addr, &(l_remote_addr.sin_addr)) < 0) { + log_it(L_ERROR, "Wrong remote address '%s:%u'", a_uplink_addr, a_uplink_port); + //close(l_ev_socket->socket); + dap_events_socket_kill_socket(l_ev_socket); + return NULL; + } + else { + int l_err = 0; + if((l_err = connect(l_socket, (struct sockaddr *) &l_remote_addr, sizeof(struct sockaddr_in))) != -1) { + //s_set_sock_nonblock(a_client_pvt->stream_socket, false); + log_it(L_INFO, "Remote address connected (%s:%u) with sock_id %d", a_uplink_addr, a_uplink_port, socket); + } + else { + log_it(L_ERROR, "Remote address can't connected (%s:%u) with sock_id %d", a_uplink_addr, a_uplink_port); + //l_ev_socket->no_close = false; + dap_events_socket_kill_socket(l_ev_socket); + //shutdown(l_ev_socket->socket, SHUT_RDWR); + //dap_events_socket_remove_and_delete(l_ev_socket, true); + //l_ev_socket->socket = 0; + return NULL; + } + } + + //dap_client_pvt_t * l_client_pvt = (dap_client_pvt_t*) a_obj; + //dap_events_new(); + dap_string_t *l_request_headers = dap_string_new(NULL); + + if(a_request && (dap_strcmp(a_method, "POST") == 0 || dap_strcmp(a_method, "POST_ENC") == 0)) { + char l_buf[1024]; + log_it(L_DEBUG, "POST request with %u bytes of decoded data", a_request_size); + + if(a_request_content_type) { + dap_snprintf(l_buf, sizeof(l_buf), "Content-Type: %s\r\n", a_request_content_type); + l_request_headers = dap_string_append(l_request_headers, l_buf); + } + + if(a_custom) { + for(int i = 0; i < a_custom_count; i++) { + l_request_headers = dap_string_append(l_request_headers, (char*) a_custom[i]); + l_request_headers = dap_string_append(l_request_headers, "\r\n"); + } + } + + if(a_cookie) { + dap_snprintf(l_buf, sizeof(l_buf), "Cookie: %s\r\n", a_cookie); + l_request_headers = dap_string_append(l_request_headers, l_buf); + } + + dap_snprintf(l_buf, sizeof(l_buf), "Content-Length: %lu\r\n", a_request_size); + l_request_headers = dap_string_append(l_request_headers, l_buf); + } + + // adding string for GET request + char *l_get_str = NULL; + if(!dap_strcmp(a_method, "GET")) { + l_get_str = dap_strdup_printf("?%s", a_request); + } + // send header + dap_events_socket_write_f(l_ev_socket, "%s /%s%s HTTP/1.1\r\n" + "Host: %s\r\n" + "%s" + "\r\n", + a_method, a_path, l_get_str ? l_get_str : "", a_uplink_addr, l_request_headers->str); + // send data for POST request + if(!l_get_str) + dap_events_socket_write(l_ev_socket, a_request, a_request_size); + dap_events_socket_set_writable(l_ev_socket, true); + + DAP_DELETE(l_get_str); + dap_string_free(l_request_headers, true); + return l_client_http_internal; +} + +/** + * @brief dap_client_http_request + * @param a_uplink_addr + * @param a_uplink_port + * @param a_method GET or POST + * @param a_request_content_type like "text/text" + * @param a_path + * @param a_request + * @param a_request_size + * @param a_cookie + * @param a_response_callback + * @param a_error_callback + * @param a_obj + * @param a_custom + */ +void* dap_client_http_request(const char *a_uplink_addr, uint16_t a_uplink_port, const char * a_method, + const char* a_request_content_type, const char * a_path, void *a_request, size_t a_request_size, + char * a_cookie, dap_client_http_callback_data_t a_response_callback, + dap_client_http_callback_error_t a_error_callback, void *a_obj, void * a_custom) +{ + char *a_custom_new[1]; + size_t a_custom_count = 0; + // use no more then one custom item only + a_custom_new[0] = (char*) a_custom; + if(a_custom) + a_custom_count = 1; + + return dap_client_http_request_custom(a_uplink_addr, a_uplink_port, a_method, a_request_content_type, a_path, + a_request, a_request_size, a_cookie, a_response_callback, a_error_callback, a_obj, + a_custom_new, a_custom_count); +} diff --git a/src/dap_client_pvt.c b/src/dap_client_pvt.c index 8210530440965500004c0a30a280248d70f75564..9247f4e7ba57511edf04a717b765ed559a516d10 100644 --- a/src/dap_client_pvt.c +++ b/src/dap_client_pvt.c @@ -57,9 +57,11 @@ #include "dap_common.h" #include "dap_strfuncs.h" -#include "dap_http_client_simple.h" +//#include "dap_http_client_simple.h" +#include "dap_client_http.h" #include "dap_client.h" #include "dap_client_pvt.h" +#include "dap_server.h" #include "dap_stream.h" #include "dap_stream_ch.h" #include "dap_stream_ch_proc.h" @@ -92,7 +94,6 @@ void m_request_response(void * a_response, size_t a_response_size, void * a_obj) void m_request_error(int, void *); // stream callbacks - void m_es_stream_delete(dap_events_socket_t * a_es, void * arg); void m_es_stream_read(dap_events_socket_t * a_es, void * arg); void m_es_stream_write(dap_events_socket_t * a_es, void * arg); @@ -123,6 +124,8 @@ void dap_client_pvt_new(dap_client_pvt_t * a_client_internal) a_client_internal->stage = STAGE_BEGIN; // start point of state machine a_client_internal->stage_status = STAGE_STATUS_DONE; a_client_internal->uplink_protocol_version = DAP_PROTOCOL_VERSION; + // add to list + dap_client_pvt_hh_add(a_client_internal); } typedef struct dap_client_pvt_ref_count { @@ -131,10 +134,11 @@ typedef struct dap_client_pvt_ref_count { UT_hash_handle hh; } dap_client_pvt_ref_count_t; -static dap_client_pvt_ref_count_t *s_client_pvt_ref = NULL; -static pthread_mutex_t s_mutex_ref = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t s_cond_ref = PTHREAD_COND_INITIALIZER; +//static dap_client_pvt_ref_count_t *s_client_pvt_ref = NULL; +//static pthread_mutex_t s_mutex_ref = PTHREAD_MUTEX_INITIALIZER; +//static pthread_cond_t s_cond_ref = PTHREAD_COND_INITIALIZER; +/* int dap_client_pvt_ref(dap_client_pvt_t * a_client_internal) { if(a_client_internal==0x7fffd8003b00){ @@ -205,10 +209,6 @@ int dap_client_pvt_get_ref(dap_client_pvt_t * a_client_internal) return l_ref_count; } -/** - * @brief dap_client_pvt_wait - * @param a_client_internal - */ int dap_client_pvt_wait_unref(dap_client_pvt_t * a_client_internal, int a_timeout_ms) { if(!a_client_internal) @@ -252,6 +252,56 @@ int dap_client_pvt_wait_unref(dap_client_pvt_t * a_client_internal, int a_timeou while(l_client_pvt_ref); return l_ret; } +*/ + +/** + * @brief dap_client_disconnect + * @param a_client + * @return + */ +int dap_client_pvt_disconnect(dap_client_pvt_t *a_client_pvt) +{ + //dap_client_pvt_t *a_client_pvt = (a_client) ? DAP_CLIENT_PVT(a_client) : NULL; + if(!a_client_pvt) + return -1; + // stop connection + //dap_http_client_simple_request_break(l_client_internal->curl_sockfd); + + if(a_client_pvt && a_client_pvt->stream_socket) { + +// if ( l_client_internal->stream_es ) { +// dap_events_socket_remove_and_delete( l_client_internal->stream_es, true ); +// l_client_internal->stream_es = NULL; +// } + +// l_client_internal->stream_es->signal_close = true; + // start stopping connection + if(!dap_events_socket_kill_socket(a_client_pvt->stream_es)) { + int l_counter = 0; + // wait for stop of connection (max 0.7 sec.) + while(a_client_pvt->stream_es && l_counter < 70) { + dap_usleep(DAP_USEC_PER_SEC / 100); + l_counter++; + } + if(l_counter >= 70) { + dap_events_socket_remove_and_delete(a_client_pvt->stream_es, true); + } + } +// if (l_client_internal->stream_socket ) { +// close (l_client_internal->stream_socket); +// l_client_internal->stream_socket = 0; +// } + + return 1; + } + //l_client_internal->stream_socket = 0; + + a_client_pvt->is_reconnect = false; + + log_it(L_DEBUG, "dap_client_pvt_disconnect() done"); + + return -1; +} /** * @brief dap_client_pvt_delete @@ -261,6 +311,14 @@ static void dap_client_pvt_delete_in(dap_client_pvt_t * a_client_pvt) { if(!a_client_pvt) return; + // delete from list + if(dap_client_pvt_hh_del(a_client_pvt)<0){ + log_it(L_DEBUG, "dap_client_pvt 0x%x already deleted", a_client_pvt); + return; + } + + dap_client_pvt_disconnect(a_client_pvt); + log_it(L_INFO, "dap_client_pvt_delete 0x%x", a_client_pvt); if(a_client_pvt->session_key_id) @@ -278,21 +336,22 @@ static void dap_client_pvt_delete_in(dap_client_pvt_t * a_client_pvt) if(a_client_pvt->stream_key) dap_enc_key_delete(a_client_pvt->stream_key); - a_client_pvt->client = NULL; + //a_client_pvt->client = NULL; DAP_DELETE(a_client_pvt); } +/* static void* dap_client_pvt_delete_proc(void *a_arg) { dap_client_pvt_t * l_client_pvt = (dap_client_pvt_t*)a_arg; // wait for release l_client_pvt - dap_client_pvt_wait_unref(l_client_pvt, 20000000); + //dap_client_pvt_wait_unref(l_client_pvt, 20000000); //dap_client_reset(l_client_pvt->client); dap_client_pvt_delete_in(l_client_pvt); //DAP_DELETE(l_client_pvt->client); pthread_exit(0); -} +}*/ /** * @brief dap_client_pvt_delete @@ -301,7 +360,8 @@ static void* dap_client_pvt_delete_proc(void *a_arg) void dap_client_pvt_delete(dap_client_pvt_t * a_client_pvt) { pthread_t l_thread = NULL; - pthread_create(&l_thread, NULL, dap_client_pvt_delete_proc, a_client_pvt); + //pthread_create(&l_thread, NULL, dap_client_pvt_delete_proc, a_client_pvt); + dap_client_pvt_delete_in(a_client_pvt); } /** @@ -331,7 +391,7 @@ static void s_set_sock_nonblock(int sockfd, bool is_nonblock) */ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) { - bool l_is_unref = false; + //bool l_is_unref = false; switch (a_client_pvt->stage_status) { case STAGE_STATUS_IN_PROGRESS: { @@ -350,11 +410,11 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) log_it(L_DEBUG, "ENC request size %u", l_key_str_enc_size); dap_client_pvt_request(a_client_pvt, DAP_UPLINK_PATH_ENC_INIT "/gd4y5yh78w42aaagh", - l_key_str, l_key_str_size_max, m_enc_init_response, m_enc_init_error); + l_key_str, l_key_str_enc_size, m_enc_init_response, m_enc_init_error); DAP_DELETE(l_key_str); } break; - case STAGE_STREAM_CTL: { + case STAGE_STREAM_CTL: { log_it(L_INFO, "Go to stage STREAM_CTL: prepare the request"); char *l_request = dap_strdup_printf("%d", DAP_CLIENT_PROTOCOL_VERSION); @@ -384,8 +444,9 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (char *)&buffsize, &optsize ); } #else - setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (const void *) 50000, sizeof(int)); - setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (const void *) 50000, sizeof(int)); + int buffsize = 65536; + setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (const void *) &buffsize, sizeof(int)); + setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (const void *) &buffsize, sizeof(int)); #endif // Wrap socket and setup callbacks @@ -400,7 +461,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) // add to dap_worker dap_events_socket_create_after(a_client_pvt->stream_es); - a_client_pvt->stream_es->_inheritor = a_client_pvt->client; + a_client_pvt->stream_es->_inheritor = a_client_pvt;//->client; a_client_pvt->stream = dap_stream_new_es(a_client_pvt->stream_es); a_client_pvt->stream->is_client_to_uplink = true; a_client_pvt->stream_session = dap_stream_session_pure_new(); // may be from in packet? @@ -416,14 +477,16 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) l_remote_addr.sin_port = htons(a_client_pvt->uplink_port); if(inet_pton(AF_INET, a_client_pvt->uplink_addr, &(l_remote_addr.sin_addr)) < 0) { log_it(L_ERROR, "Wrong remote address '%s:%u'", a_client_pvt->uplink_addr, a_client_pvt->uplink_port); - close(a_client_pvt->stream_socket); - a_client_pvt->stream_socket = 0; + //close(a_client_pvt->stream_socket); + dap_events_socket_kill_socket(a_client_pvt->stream_es); + //a_client_pvt->stream_socket = 0; a_client_pvt->stage_status = STAGE_STATUS_ERROR; } else { int l_err = 0; if((l_err = connect(a_client_pvt->stream_socket, (struct sockaddr *) &l_remote_addr, sizeof(struct sockaddr_in))) != -1) { + a_client_pvt->stream_es->flags &= ~DAP_SOCK_SIGNAL_CLOSE; //s_set_sock_nonblock(a_client_pvt->stream_socket, false); log_it(L_INFO, "Remote address connected (%s:%u) with sock_id %d", a_client_pvt->uplink_addr, a_client_pvt->uplink_port, a_client_pvt->stream_socket); @@ -432,7 +495,8 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) else { log_it(L_ERROR, "Remote address can't connected (%s:%u) with sock_id %d", a_client_pvt->uplink_addr, a_client_pvt->uplink_port); - close(a_client_pvt->stream_socket); + dap_events_socket_kill_socket(a_client_pvt->stream_es); + //close(a_client_pvt->stream_socket); a_client_pvt->stream_socket = 0; a_client_pvt->stage_status = STAGE_STATUS_ERROR; } @@ -504,9 +568,9 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) log_it(L_ERROR, "Error state, doing callback if present"); if(a_client_pvt->stage_status_error_callback) { - dap_client_pvt_ref(a_client_pvt); + //dap_client_pvt_ref(a_client_pvt); a_client_pvt->stage_status_error_callback(a_client_pvt->client, (void*)l_is_last_attempt); - dap_client_pvt_unref(a_client_pvt); + //dap_client_pvt_unref(a_client_pvt); // Expecting that its one-shot callback //a_client_internal->stage_status_error_callback = NULL; } @@ -514,7 +578,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) a_client_pvt->stage = STAGE_STREAM_ABORT; a_client_pvt->stage_status = STAGE_STATUS_ABORTING; // unref pvt - l_is_unref = true; + //l_is_unref = true; } else { if(!l_is_last_attempt) { @@ -535,7 +599,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) log_it(L_INFO, "Too many connection attempts. Tries are over."); //a_client_pvt->stage_status = STAGE_STATUS_DONE; // unref pvt - l_is_unref = true; + //l_is_unref = true; } } } @@ -543,6 +607,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) case STAGE_STATUS_DONE: { log_it(L_INFO, "Stage status %s is done", dap_client_stage_str(a_client_pvt->stage)); + // go to next stage if(a_client_pvt->stage_status_done_callback) { a_client_pvt->stage_status_done_callback(a_client_pvt->client, NULL); // Expecting that its one-shot callback @@ -563,7 +628,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) } else{ log_it(L_ERROR, "!! dap_CLIENT_STAGE_STATUS_DONE but not l_is_last_stage (cur stage=%d, target=%d)!!",a_client_pvt->stage, a_client_pvt->stage_target); } - l_is_unref = true; + //l_is_unref = true; } break; default: @@ -573,10 +638,10 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) if(a_client_pvt->stage_status_callback) a_client_pvt->stage_status_callback(a_client_pvt->client, NULL); - if(l_is_unref) { + //if(l_is_unref) { // unref pvt - dap_client_pvt_unref(a_client_pvt); - } + //dap_client_pvt_unref(a_client_pvt); + //} } /** @@ -589,7 +654,7 @@ void dap_client_pvt_stage_transaction_begin(dap_client_pvt_t * a_client_internal dap_client_callback_t a_done_callback) { // ref pvt client - dap_client_pvt_ref(a_client_internal); + //dap_client_pvt_ref(a_client_internal); a_client_internal->stage_status_done_callback = a_done_callback; a_client_internal->stage = a_stage_next; @@ -613,22 +678,24 @@ void dap_client_pvt_request(dap_client_pvt_t * a_client_internal, const char * a a_client_internal->request_error_callback = a_response_error; a_client_internal->is_encrypted = false; - size_t l_url_size_max = 0; - char *l_url = NULL; - if(a_path) { - l_url_size_max = dap_strlen(a_client_internal->uplink_addr) + strlen(a_path) + 15; - l_url = DAP_NEW_Z_SIZE(char, l_url_size_max); - - snprintf(l_url, l_url_size_max, "http://%s:%u/%s", a_client_internal->uplink_addr, - a_client_internal->uplink_port, a_path); - } else { - l_url_size_max = strlen(a_client_internal->uplink_addr) + 15; - l_url = DAP_NEW_Z_SIZE(char, l_url_size_max); - snprintf(l_url, l_url_size_max, "http://%s:%u", a_client_internal->uplink_addr, a_client_internal->uplink_port); - } - a_client_internal->curl = dap_http_client_simple_request(l_url, a_request ? "POST" : "GET", "text/text", a_request, - a_request_size, NULL, m_request_response, m_request_error, &a_client_internal->curl_sockfd, a_client_internal, NULL); - DAP_DELETE(l_url); +// size_t l_url_size_max = 0; +// char *l_url = NULL; +// if(a_path) { +// l_url_size_max = dap_strlen(a_client_internal->uplink_addr) + strlen(a_path) + 15; +// l_url = DAP_NEW_Z_SIZE(char, l_url_size_max); +// +// snprintf(l_url, l_url_size_max, "http://%s:%u/%s", a_client_internal->uplink_addr, +// a_client_internal->uplink_port, a_path); +// } else { +// l_url_size_max = strlen(a_client_internal->uplink_addr) + 15; +// l_url = DAP_NEW_Z_SIZE(char, l_url_size_max); +// snprintf(l_url, l_url_size_max, "http://%s:%u", a_client_internal->uplink_addr, a_client_internal->uplink_port); +// } + dap_client_http_request(a_client_internal->uplink_addr,a_client_internal->uplink_port, a_request ? "POST" : "GET", "text/text", a_path, a_request, + a_request_size, NULL, m_request_response, m_request_error, a_client_internal, NULL); +// a_client_internal->curl = dap_http_client_simple_request(l_url, a_request ? "POST" : "GET", "text/text", a_request, +// a_request_size, NULL, m_request_response, m_request_error, &a_client_internal->curl_sockfd, a_client_internal, NULL); +// DAP_DELETE(l_url); } /** @@ -655,9 +722,9 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char size_t l_query_size = a_query ? strlen(a_query) : 0; size_t l_url_size; - char l_url[1024] = { 0 }; - snprintf(l_url, 1024, "http://%s:%u", a_client_internal->uplink_addr, a_client_internal->uplink_port); - l_url_size = strlen(l_url); +// char l_url[1024] = { 0 }; +// snprintf(l_url, 1024, "http://%s:%u", a_client_internal->uplink_addr, a_client_internal->uplink_port); +// l_url_size = strlen(l_url); size_t l_sub_url_enc_size_max = l_sub_url_size ? (5 * l_sub_url_size + 16) : 0; char *l_sub_url_enc = l_sub_url_size ? DAP_NEW_Z_SIZE(char, l_sub_url_enc_size_max + 1) : NULL; @@ -666,8 +733,8 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char char *l_query_enc = (is_query_enc) ? (l_query_size ? DAP_NEW_Z_SIZE(char, l_query_enc_size_max + 1) : NULL) : (char*) a_query; - size_t l_url_full_size_max = 5 * l_sub_url_size + 5 * l_query_size + 16 + l_url_size + 2; - char * l_url_full = DAP_NEW_Z_SIZE(char, l_url_full_size_max + 1); +// size_t l_url_full_size_max = 5 * l_sub_url_size + 5 * l_query_size + 16 + l_url_size + 2; +// char * l_url_full = DAP_NEW_Z_SIZE(char, l_url_full_size_max + 1); size_t l_request_enc_size_max = a_request_size ? a_request_size * 2 + 16 : 0; char * l_request_enc = a_request_size ? DAP_NEW_Z_SIZE(char, l_request_enc_size_max + 1) : NULL; @@ -702,6 +769,7 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char l_request_enc, l_request_enc_size_max, DAP_ENC_DATA_TYPE_RAW); +/* if(a_path) { if(l_sub_url_size) { if(l_query_size) { @@ -717,6 +785,20 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char } else { snprintf(l_url_full, l_url_full_size_max, "%s", l_url); } +*/ + char *l_path = NULL; + if(a_path) { + if(l_sub_url_size) { + if(l_query_size) { + l_path = dap_strdup_printf("%s/%s?%s", a_path, l_sub_url_enc, l_query_enc); + + } else { + l_path = dap_strdup_printf("%s/%s", a_path, l_sub_url_enc); + } + } else { + l_path = dap_strdup(a_path); + } + } size_t l_key_hdr_str_size_max = strlen(a_client_internal->session_key_id) + 10; char *l_key_hdr_str = DAP_NEW_Z_SIZE(char, l_key_hdr_str_size_max); @@ -730,9 +812,12 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char a_custom_new[1] = "SessionCloseAfterRequest: true"; a_custom_count++; } - dap_http_client_simple_request_custom(l_url_full, a_request ? "POST" : "GET", "text/text", - l_request_enc, l_request_enc_size, NULL, - m_request_response, a_client_internal->curl_sockfd ,m_request_error, a_client_internal, a_custom_new, a_custom_count); + dap_client_http_request_custom(a_client_internal->uplink_addr, a_client_internal->uplink_port, a_request ? "POST" : "GET", "text/text", + l_path, l_request_enc, l_request_enc_size, NULL, + m_request_response, m_request_error, a_client_internal, a_custom_new, a_custom_count); +// dap_http_client_simple_request_custom(l_url_full, a_request ? "POST" : "GET", "text/text", +// l_request_enc, l_request_enc_size, NULL, +// m_request_response, a_client_internal->curl_sockfd ,m_request_error, a_client_internal, a_custom_new, a_custom_count); DAP_DELETE(l_key_hdr_str); if(l_sub_url_enc) @@ -741,8 +826,8 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char if(is_query_enc && l_query_enc) DAP_DELETE(l_query_enc); - if(l_url_full) - DAP_DELETE(l_url_full); +// if(l_url_full) +// DAP_DELETE(l_url_full); if(l_request_enc) DAP_DELETE(l_request_enc); @@ -756,13 +841,17 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char void m_request_error(int a_err_code, void * a_obj) { dap_client_pvt_t * a_client_internal = (dap_client_pvt_t *) a_obj; + dap_client_pvt_hh_lock(); + if(!dap_client_pvt_hh_get(a_client_internal)){ + dap_client_pvt_hh_unlock(); + return; + } if(a_client_internal && a_client_internal->request_error_callback && a_client_internal->client) { if(a_client_internal && a_client_internal->request_error_callback && a_client_internal->client && a_client_internal->client->_internal) a_client_internal->request_error_callback(a_client_internal->client, a_err_code); } - // unref pvt client - dap_client_pvt_unref(a_client_internal); + dap_client_pvt_hh_unlock(); } /** @@ -776,7 +865,7 @@ void m_request_response(void * a_response, size_t a_response_size, void * a_obj) dap_client_pvt_t * a_client_internal = (dap_client_pvt_t *) a_obj; if(!a_client_internal || !a_client_internal->client) return; - int l_ref = dap_client_pvt_get_ref(a_client_internal); + //int l_ref = dap_client_pvt_get_ref(a_client_internal); if(a_client_internal->is_encrypted) { size_t l_response_dec_size_max = a_response_size ? a_response_size * 2 + 16 : 0; char * l_response_dec = a_response_size ? DAP_NEW_Z_SIZE(char, l_response_dec_size_max) : NULL; @@ -795,9 +884,9 @@ void m_request_response(void * a_response, size_t a_response_size, void * a_obj) a_client_internal->request_response_callback(a_client_internal->client, a_response, a_response_size); } - int l_ref2 = dap_client_pvt_get_ref(a_client_internal); + //int l_ref2 = dap_client_pvt_get_ref(a_client_internal); // unref pvt client - dap_client_pvt_unref(a_client_internal); + //dap_client_pvt_unref(a_client_internal); //dap_client_pvt_unref(DAP_CLIENT_PVT(a_client_internal->client)); } @@ -1078,19 +1167,20 @@ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg) { log_it(L_INFO, "================= stream delete/peer reconnect"); - dap_client_t *l_client = DAP_CLIENT(a_es); + //dap_client_t *l_client = DAP_CLIENT(a_es); + dap_client_pvt_t * l_client_pvt = a_es->_inheritor; - if(l_client == NULL) { - log_it(L_ERROR, "dap_client is not initialized"); + if(l_client_pvt == NULL) { + log_it(L_ERROR, "dap_client_pvt_t is not initialized"); return; } - pthread_mutex_lock(&l_client->mutex); + //pthread_mutex_lock(&l_client->mutex); - dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client); + //dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client); log_it(L_DEBUG, "client_pvt=0x%x", l_client_pvt); if(l_client_pvt == NULL) { log_it(L_ERROR, "dap_client_pvt is not initialized"); - pthread_mutex_unlock(&l_client->mutex); + //pthread_mutex_unlock(&l_client->mutex); return; } @@ -1099,19 +1189,19 @@ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg) // if(l_client_pvt->client && l_client_pvt->client == l_client) // dap_client_reset(l_client_pvt->client); - // l_client_pvt->client= NULL; - l_client_pvt->stream_es = NULL; - // log_it(L_DEBUG, "dap_stream_session_close()"); // sleep(3); -// dap_stream_session_close(l_client_pvt->stream_session->id); - + dap_stream_session_close(l_client_pvt->stream_session->id); l_client_pvt->stream_session = NULL; - pthread_mutex_unlock(&l_client->mutex); + // signal to permit deleting of l_client_pvt + l_client_pvt->stream_es = NULL; + //pthread_mutex_unlock(&l_client->mutex); + +/* disable reconnect from here if(l_client_pvt->is_reconnect) { log_it(L_DEBUG, "l_client_pvt->is_reconnect = true"); @@ -1119,6 +1209,7 @@ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg) } else log_it(L_DEBUG, "l_client_pvt->is_reconnect = false"); +*/ } /** @@ -1128,8 +1219,8 @@ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg) */ void m_es_stream_read(dap_events_socket_t * a_es, void * arg) { - dap_client_t * l_client = DAP_CLIENT(a_es); - dap_client_pvt_t * l_client_pvt = (l_client) ? DAP_CLIENT_PVT(l_client) : NULL; + //dap_client_t * l_client = DAP_CLIENT(a_es); + dap_client_pvt_t * l_client_pvt = a_es->_inheritor;//(l_client) ? DAP_CLIENT_PVT(l_client) : NULL; if(!l_client_pvt) { log_it(L_ERROR, "m_es_stream_read: l_client_pvt is NULL!"); return; @@ -1175,8 +1266,9 @@ void m_es_stream_read(dap_events_socket_t * a_es, void * arg) */ void m_es_stream_write(dap_events_socket_t * a_es, void * arg) { - dap_client_t * l_client = DAP_CLIENT(a_es); - dap_client_pvt_t * l_client_pvt = (l_client) ? DAP_CLIENT_PVT(l_client) : NULL; + //dap_client_t * l_client = DAP_CLIENT(a_es); + //dap_client_pvt_t * l_client_pvt = (l_client) ? DAP_CLIENT_PVT(l_client) : NULL; + dap_client_pvt_t * l_client_pvt = a_es->_inheritor; if(!l_client_pvt) { log_it(L_ERROR, "m_es_stream_write: l_client_pvt is NULL!"); return; @@ -1207,8 +1299,9 @@ void m_es_stream_write(dap_events_socket_t * a_es, void * arg) void m_es_stream_error(dap_events_socket_t * a_es, void * arg) { - dap_client_t * l_client = DAP_CLIENT(a_es); - dap_client_pvt_t * l_client_pvt = (l_client) ? DAP_CLIENT_PVT(l_client) : NULL; + //dap_client_t * l_client = DAP_CLIENT(a_es); + //dap_client_pvt_t * l_client_pvt = (l_client) ? DAP_CLIENT_PVT(l_client) : NULL; + dap_client_pvt_t * l_client_pvt = a_es->_inheritor; if(!l_client_pvt) { log_it(L_ERROR, "m_es_stream_error: l_client_pvt is NULL!"); return; diff --git a/src/dap_client_pvt_hh.c b/src/dap_client_pvt_hh.c new file mode 100644 index 0000000000000000000000000000000000000000..cd1d902a526d300d44be9b1105cf5f4d8f0a7fe7 --- /dev/null +++ b/src/dap_client_pvt_hh.c @@ -0,0 +1,121 @@ +/* + * Authors: + * Alexander Lysikov <alexander.lysikov@demlabs.net> + * DeM Labs Inc. https://demlabs.net + * Copyright (c) 2020 + * + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <stdio.h> +#include <pthread.h> +#include <uthash.h> + +#include "dap_common.h" +#include "dap_client_pvt.h" + +typedef struct dap_client_pvt_hh { + dap_client_pvt_t *client_pvt; + UT_hash_handle hh; +} dap_client_pvt_hh_t; + +// List of active connections +static dap_client_pvt_hh_t *s_client_pvt_list = NULL; +// for separate access to s_conn_list +static pthread_mutex_t s_client_pvt_list_mutex = PTHREAD_MUTEX_INITIALIZER; + +/** + * dap_client_pvt_hh_lock + */ +int dap_client_pvt_hh_lock(void) +{ + return pthread_mutex_lock(&s_client_pvt_list_mutex); +} + +/** + * dap_client_pvt_hh_unlock + */ +int dap_client_pvt_hh_unlock(void) +{ + return pthread_mutex_unlock(&s_client_pvt_list_mutex); +} + +/** + * find active connection in the list + * + * return 0 OK, -1 error, -2 connection not found + */ +void* dap_client_pvt_hh_get(dap_client_pvt_t* a_client_pvt) +{ + if(!a_client_pvt) + return NULL; + dap_client_pvt_hh_t *l_cur_item; + HASH_FIND_PTR(s_client_pvt_list, &a_client_pvt, l_cur_item); + return (void*) l_cur_item; +} + +/** + * Add new active connection to the list + * + * return 0 OK, -1 error, -2 connection present + */ +int dap_client_pvt_hh_add(dap_client_pvt_t* a_client_pvt) +{ + int l_ret = 0; + if(!a_client_pvt) + return -1; + pthread_mutex_lock(&s_client_pvt_list_mutex); + dap_client_pvt_hh_t *l_cur_item; + HASH_FIND_PTR(s_client_pvt_list, &a_client_pvt, l_cur_item); + if(l_cur_item == NULL) { + l_cur_item = DAP_NEW(dap_client_pvt_hh_t); + l_cur_item->client_pvt = a_client_pvt; + HASH_ADD_PTR(s_client_pvt_list, client_pvt, l_cur_item); + l_ret = 0; + } + // connection already present + else + l_ret = -2; + //connect_list = g_list_append(connect_list, client); + pthread_mutex_unlock(&s_client_pvt_list_mutex); + return l_ret; +} + +/** + * Delete active connection from the list + * + * return 0 OK, -1 error, -2 connection not found + */ +int dap_client_pvt_hh_del(dap_client_pvt_t *a_client_pvt) +{ + int ret = -1; + if(!a_client_pvt) + return -1; + pthread_mutex_lock(&s_client_pvt_list_mutex); + dap_client_pvt_hh_t *l_cur_item; + HASH_FIND_PTR(s_client_pvt_list, &a_client_pvt, l_cur_item); + if(l_cur_item != NULL) { + HASH_DEL(s_client_pvt_list, l_cur_item); + DAP_DELETE(l_cur_item); + ret = 0; + } + // connection not found in the hash + else { + ret = -2; + } + pthread_mutex_unlock(&s_client_pvt_list_mutex); + return ret; +}