diff --git a/dap-sdk/core/src/dap_config.c b/dap-sdk/core/src/dap_config.c index 392c7bf05d3866ba1f23ed569f2ed7753a32052a..7089b308fe6ee0044b597452478bac1069ab9419 100755 --- a/dap-sdk/core/src/dap_config.c +++ b/dap-sdk/core/src/dap_config.c @@ -505,7 +505,7 @@ uint64_t dap_config_get_item_uint64_default(dap_config_t * a_config, const char */ static dap_config_item_t * dap_config_get_item(dap_config_t * a_config, const char * a_section_path, const char * a_item_name) { - dap_config_item_t * l_item_section = DAP_CONFIG_INTERNAL(a_config)->item_root ; + dap_config_item_t * l_item_section = a_config? DAP_CONFIG_INTERNAL(a_config)->item_root: NULL ; while(l_item_section){ if (strcmp(l_item_section->name,a_section_path)==0){ dap_config_item_t * l_item = l_item_section->childs; @@ -552,10 +552,11 @@ char** dap_config_get_array_str(dap_config_t * a_config, const char * a_section_ if(array_length != NULL) *array_length = 0; return NULL; + }else{ + if (array_length != NULL) + *array_length = item->array_length; + return item->data_str_array; } - if (array_length != NULL) - *array_length = item->array_length; - return item->data_str_array; } @@ -569,7 +570,7 @@ char** dap_config_get_array_str(dap_config_t * a_config, const char * a_section_ */ const char * dap_config_get_item_str_default(dap_config_t * a_config, const char * a_section_path, const char * a_item_name, const char * a_value_default) { - dap_config_item_t * l_item_section = DAP_CONFIG_INTERNAL(a_config)->item_root ; + dap_config_item_t * l_item_section =a_config? DAP_CONFIG_INTERNAL(a_config)->item_root: NULL ; while(l_item_section){ if (strcmp(l_item_section->name,a_section_path)==0){ dap_config_item_t * l_item = l_item_section->childs; diff --git a/dap-sdk/crypto/include/dap_hash.h b/dap-sdk/crypto/include/dap_hash.h index 2cff59c6bde874a244cb13a5d20e83805b35af93..fc557c3e510949ce15811ffe84ca9ab393e6cf1e 100755 --- a/dap-sdk/crypto/include/dap_hash.h +++ b/dap-sdk/crypto/include/dap_hash.h @@ -37,6 +37,8 @@ #define DAP_HASH_FAST_SIZE 32 #define DAP_CHAIN_HASH_FAST_SIZE 32 +#define DAP_CHAIN_HASH_MAX_SIZE 63 + typedef enum dap_hash_type { DAP_HASH_TYPE_KECCAK = 0, DAP_HASH_TYPE_SLOW_0 = 1, diff --git a/dap-sdk/net/client/dap_client_http.c b/dap-sdk/net/client/dap_client_http.c index 66011d201f442906ee85c06b41add2bfc47c0f28..43784be52497d454c98d4c9f997abea1cc95ef0a 100644 --- a/dap-sdk/net/client/dap_client_http.c +++ b/dap-sdk/net/client/dap_client_http.c @@ -119,7 +119,7 @@ static bool s_timer_timeout_check(void * a_arg) dap_events_t * l_events = dap_events_get_default(); assert(l_events); - dap_worker_t * l_worker =(dap_worker_t*) pthread_getspecific(l_events->pth_key_worker);; // We're in own esocket context + dap_worker_t * l_worker = dap_events_get_current_worker(l_events); // We're in own esocket context assert(l_worker); if(dap_events_socket_check_unsafe(l_worker, l_es) ){ @@ -469,7 +469,7 @@ void* dap_client_http_request_custom(dap_worker_t * a_worker,const char *a_uplin l_ev_socket->remote_addr.sin_family = AF_INET; l_ev_socket->remote_addr.sin_port = htons(a_uplink_port); l_ev_socket->flags |= DAP_SOCK_CONNECTING; - l_ev_socket->type = DESCRIPTOR_TYPE_SOCKET; + l_ev_socket->type = DESCRIPTOR_TYPE_SOCKET_CLIENT; l_ev_socket->flags |= DAP_SOCK_READY_TO_WRITE; int l_err = connect(l_socket, (struct sockaddr *) &l_ev_socket->remote_addr, sizeof(struct sockaddr_in)); diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 9ceee08ebd4921c5c9a69a47c3fc4ff5c26ac11e..32a98e2596e34b9bdfee5fc7a8f3bf0a874222bb 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -58,6 +58,7 @@ #include "dap_strfuncs.h" #include "dap_cert.h" +#include "dap_timerfd.h" //#include "dap_http_client_simple.h" #include "dap_client_http.h" #include "dap_client.h" diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 567dac7d42dcc042b04783545183407435642f3e..3d3c993718217648138f464f5e4fa339814cc1ba 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -28,10 +28,16 @@ #include <string.h> #include <assert.h> #include <errno.h> + #ifndef _WIN32 #include <sys/epoll.h> +#include <sys/types.h> #include <sys/select.h> #include <unistd.h> +#include <sys/socket.h> +#include <arpa/inet.h> + + #else #include <winsock2.h> #include <windows.h> @@ -39,6 +45,8 @@ #include <io.h> #endif + + #if defined (DAP_EVENTS_CAPS_QUEUE_MQUEUE) #include <sys/time.h> #include <sys/resource.h> @@ -137,7 +145,8 @@ dap_events_socket_t *dap_events_socket_wrap_no_add( dap_events_t *a_events, ret->socket = a_sock; ret->events = a_events; - memcpy(&ret->callbacks, a_callbacks, sizeof(ret->callbacks) ); + if (a_callbacks) + memcpy(&ret->callbacks, a_callbacks, sizeof(ret->callbacks) ); ret->flags = DAP_SOCK_READY_TO_READ; ret->buf_in_size_max = DAP_EVENTS_SOCKET_BUF; @@ -297,6 +306,43 @@ dap_events_socket_t * dap_events_socket_create_type_pipe_mt(dap_worker_t * a_w, return l_es; } +/** + * @brief dap_events_socket_create + * @param a_type + * @param a_callbacks + * @return + */ +dap_events_socket_t * dap_events_socket_create(dap_events_desc_type_t a_type, dap_events_socket_callbacks_t* a_callbacks) +{ + dap_events_socket_t * l_es = NULL; + switch(a_type){ + case DESCRIPTOR_TYPE_SOCKET_CLIENT: + case DESCRIPTOR_TYPE_SOCKET_UDP :{ + #ifdef WIN32 + SOCKET l_sock; + #else + int l_sock; + #endif + l_sock = socket(AF_INET, (a_type==DESCRIPTOR_TYPE_SOCKET_CLIENT? SOCK_STREAM : SOCK_DGRAM) + | SOCK_NONBLOCK , 0); + if (l_sock == INVALID_SOCKET) { + log_it(L_ERROR, "Socket create error"); + break; + } + + dap_events_socket_t * l_es =dap_events_socket_wrap_no_add(dap_events_get_default(),l_sock,a_callbacks); + if(!l_es){ + log_it(L_CRITICAL,"Can't allocate memory for the new esocket"); + break; + } + l_es->type = DESCRIPTOR_TYPE_EVENT; + } break; + default: + log_it(L_CRITICAL,"Can't create socket type %d", a_type ); + } + return l_es; +} + /** * @brief dap_events_socket_create_type_pipe_unsafe * @param a_w diff --git a/dap-sdk/net/core/dap_proc_thread.c b/dap-sdk/net/core/dap_proc_thread.c index 59c52149d1a1a994754aeea4349628bd5c61cc92..2b24e3ddf07b0a37a3f95e9e8a966dcca3e3b989 100644 --- a/dap-sdk/net/core/dap_proc_thread.c +++ b/dap-sdk/net/core/dap_proc_thread.c @@ -22,6 +22,7 @@ */ #include <assert.h> +#include <errno.h> #include "dap_server.h" #if defined(DAP_EVENTS_CAPS_EPOLL) && !defined(DAP_OS_WINDOWS) diff --git a/dap-sdk/net/core/dap_server.c b/dap-sdk/net/core/dap_server.c index 208a50bcc8b69ae99f4bdadb840e52c0b23d50ed..f50e423ea743fa87dd08ad67665d5fa4fe8df6a7 100644 --- a/dap-sdk/net/core/dap_server.c +++ b/dap-sdk/net/core/dap_server.c @@ -322,7 +322,7 @@ static dap_events_socket_t * s_es_server_create(dap_events_t * a_events, int a_s //fcntl(a_sock, F_SETFL, O_NONBLOCK); ret = dap_events_socket_wrap_no_add(a_events, a_sock, a_callbacks); - ret->type = DESCRIPTOR_TYPE_SOCKET; + ret->type = DESCRIPTOR_TYPE_SOCKET_CLIENT; ret->server = a_server; ret->hostaddr = DAP_NEW_Z_SIZE(char, 256); ret->service = DAP_NEW_Z_SIZE(char, 54); diff --git a/dap-sdk/net/core/dap_timerfd.c b/dap-sdk/net/core/dap_timerfd.c index ae4eb4e2f4738cb89bcbdb334c88873d0c7d2c31..1bbc56e3c1b4bdfc9343be0923140c61fd06b6e7 100644 --- a/dap-sdk/net/core/dap_timerfd.c +++ b/dap-sdk/net/core/dap_timerfd.c @@ -84,7 +84,34 @@ void __stdcall TimerAPCb(void* arg, DWORD low, DWORD high) { // Timer high valu * @return */ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg) +{ + dap_timerfd_t* l_timerfd = dap_timerfd_create( a_timeout_ms, a_callback, a_callback_arg); + dap_worker_add_events_socket(l_timerfd->events_socket, a_worker); + return l_timerfd; +} +/** + * @brief dap_timerfd_start_on_proc_thread + * @param a_proc_thread + * @param a_timeout_ms + * @param a_callback + * @param a_callback_arg + * @return + */ +dap_timerfd_t* dap_timerfd_start_on_proc_thread(dap_proc_thread_t * a_proc_thread, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg) +{ + dap_timerfd_t* l_timerfd = dap_timerfd_create( a_timeout_ms, a_callback, a_callback_arg); + return l_timerfd; +} + +/** + * @brief dap_timerfd_create + * @param a_timeout_ms + * @param a_callback + * @param a_callback_arg + * @return + */ +dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg) { dap_timerfd_t *l_timerfd = DAP_NEW(dap_timerfd_t); #if defined DAP_OS_UNIX @@ -154,7 +181,7 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t memset(&l_s_callbacks,0,sizeof (l_s_callbacks)); l_s_callbacks.timer_callback = s_es_callback_timer; - dap_events_socket_t * l_events_socket = dap_events_socket_wrap_no_add(a_worker->events, l_tfd, &l_s_callbacks); + dap_events_socket_t * l_events_socket = dap_events_socket_wrap_no_add(dap_events_get_default(), l_tfd, &l_s_callbacks); l_events_socket->type = DESCRIPTOR_TYPE_TIMER; // pass l_timerfd to events_socket l_events_socket->_inheritor = l_timerfd; @@ -168,7 +195,6 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t #ifdef DAP_OS_WINDOWS l_timerfd->th = l_th; #endif - dap_worker_add_events_socket(l_events_socket, a_worker); return l_timerfd; } diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index dbe26e98b7cfcf801b841e7b12b6d8fe85806141..6c9162215032cd9a8fee49021694093886bfdd6d 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -41,6 +41,7 @@ #include "dap_worker.h" #include "dap_events.h" #include "dap_enc_base64.h" +#include "dap_proc_queue.h" #define LOG_TAG "dap_worker" @@ -210,7 +211,7 @@ void *dap_worker_thread(void *arg) if( l_flag_hup ) { switch (l_cur->type ){ case DESCRIPTOR_TYPE_SOCKET_UDP: - case DESCRIPTOR_TYPE_SOCKET: { + case DESCRIPTOR_TYPE_SOCKET_CLIENT: { int l_err = getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size); #ifndef DAP_OS_WINDOWS if (l_sock_err) { @@ -258,7 +259,7 @@ void *dap_worker_thread(void *arg) if(l_flag_error) { switch (l_cur->type ){ case DESCRIPTOR_TYPE_SOCKET_LISTENING: - case DESCRIPTOR_TYPE_SOCKET: + case DESCRIPTOR_TYPE_SOCKET_CLIENT: getsockopt(l_cur->socket, SOL_SOCKET, SO_ERROR, (void *)&l_sock_err, (socklen_t *)&l_sock_err_size); #ifdef DAP_OS_WINDOWS log_it(L_ERROR, "Winsock error: %d", l_sock_err); @@ -306,7 +307,7 @@ void *dap_worker_thread(void *arg) #endif l_errno = errno; break; - case DESCRIPTOR_TYPE_SOCKET: + case DESCRIPTOR_TYPE_SOCKET_CLIENT: l_must_read_smth = true; l_bytes_read = recv(l_cur->fd, (char *) (l_cur->buf_in + l_cur->buf_in_size), l_cur->buf_in_size_max - l_cur->buf_in_size, 0); @@ -397,7 +398,7 @@ void *dap_worker_thread(void *arg) if (l_must_read_smth){ // Socket/Descriptor read if(l_bytes_read > 0) { - if (l_cur->type == DESCRIPTOR_TYPE_SOCKET || l_cur->type == DESCRIPTOR_TYPE_SOCKET_UDP) { + if (l_cur->type == DESCRIPTOR_TYPE_SOCKET_CLIENT || l_cur->type == DESCRIPTOR_TYPE_SOCKET_UDP) { l_cur->last_time_active = l_cur_time; } l_cur->buf_in_size += l_bytes_read; @@ -439,7 +440,7 @@ void *dap_worker_thread(void *arg) if (l_flag_rdhup){ switch (l_cur->type ){ case DESCRIPTOR_TYPE_SOCKET_UDP: - case DESCRIPTOR_TYPE_SOCKET: + case DESCRIPTOR_TYPE_SOCKET_CLIENT: dap_events_socket_set_readable_unsafe(l_cur, false); dap_events_socket_set_writable_unsafe(l_cur, false); l_cur->buf_out_size = 0; @@ -454,7 +455,7 @@ void *dap_worker_thread(void *arg) // If its outgoing connection if ( l_flag_write && !l_cur->server && (l_cur->flags & DAP_SOCK_CONNECTING) && - (l_cur->type == DESCRIPTOR_TYPE_SOCKET || l_cur->type == DESCRIPTOR_TYPE_SOCKET_UDP )){ + (l_cur->type == DESCRIPTOR_TYPE_SOCKET_CLIENT || l_cur->type == DESCRIPTOR_TYPE_SOCKET_UDP )){ int l_error = 0; socklen_t l_error_len = sizeof(l_error); char l_error_buf[128]; @@ -512,7 +513,7 @@ void *dap_worker_thread(void *arg) int l_errno=0; switch (l_cur->type){ - case DESCRIPTOR_TYPE_SOCKET: { + case DESCRIPTOR_TYPE_SOCKET_CLIENT: { l_bytes_sent = send(l_cur->socket, (const char *)l_cur->buf_out, l_cur->buf_out_size, MSG_DONTWAIT | MSG_NOSIGNAL); #ifdef DAP_OS_WINDOWS @@ -697,7 +698,7 @@ static void s_queue_add_es_callback( dap_events_socket_t * a_es, void * a_arg) switch( l_es_new->type){ case DESCRIPTOR_TYPE_SOCKET_UDP: - case DESCRIPTOR_TYPE_SOCKET: + case DESCRIPTOR_TYPE_SOCKET_CLIENT: case DESCRIPTOR_TYPE_SOCKET_LISTENING:{ #ifdef DAP_OS_UNIX @@ -862,7 +863,7 @@ static bool s_socket_all_check_activity( void * a_arg) //log_it(L_DEBUG,"Check sockets activity on worker #%u at %s", l_worker->id, l_curtimebuf); pthread_rwlock_rdlock(&l_worker->esocket_rwlock); HASH_ITER(hh_worker, l_worker->esockets, l_es, tmp ) { - if ( l_es->type == DESCRIPTOR_TYPE_SOCKET || l_es->type == DESCRIPTOR_TYPE_SOCKET_UDP ){ + if ( l_es->type == DESCRIPTOR_TYPE_SOCKET_CLIENT || l_es->type == DESCRIPTOR_TYPE_SOCKET_UDP ){ if ( !(l_es->flags & DAP_SOCK_SIGNAL_CLOSE) && ( l_curtime >= (l_es->last_time_active + s_connection_timeout) ) && !l_es->no_close ) { log_it( L_INFO, "Socket %u timeout (diff %u ), closing...", l_es->socket, l_curtime - (time_t)l_es->last_time_active - s_connection_timeout ); diff --git a/dap-sdk/net/core/include/dap_events.h b/dap-sdk/net/core/include/dap_events.h index 9b4c5523acc2d32ebe80a134c405d55f8b3103ed..58c4a36705b2720adb28d59e65a1217e69464e61 100644 --- a/dap-sdk/net/core/include/dap_events.h +++ b/dap-sdk/net/core/include/dap_events.h @@ -69,6 +69,9 @@ dap_worker_t * dap_events_worker_get(uint8_t a_index); uint32_t dap_get_cpu_count(); void dap_cpu_assign_thread_on(uint32_t a_cpu_id); +static inline dap_worker_t * dap_events_get_current_worker(dap_events_t * a_events){ + (dap_worker_t*) pthread_getspecific(a_events->pth_key_worker); +} #ifdef __cplusplus } diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index ec59899e20b1a72de883999150e1a6a6d379ee7e..97c1fbfe5a28ce84ebb4f9506b59fc74f46c157f 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -28,6 +28,13 @@ #define DAP_EVENTS_SOCKET_MAX 8194 +#ifndef _WIN32 +typedef int SOCKET; +#define closesocket close +#define INVALID_SOCKET -1 // for win32 = (SOCKET)(~0) +#define SOCKET_ERROR -1 // for win32 = (-1) +#endif + // Caps for different platforms #if defined (DAP_OS_ANDROID) #define DAP_EVENTS_CAPS_POLL @@ -129,7 +136,7 @@ typedef struct dap_events_socket_callbacks { #define DAP_EVENTS_SOCKET_BUF 100000 typedef enum { - DESCRIPTOR_TYPE_SOCKET = 0, + DESCRIPTOR_TYPE_SOCKET_CLIENT = 0, DESCRIPTOR_TYPE_SOCKET_UDP, DESCRIPTOR_TYPE_SOCKET_LISTENING, DESCRIPTOR_TYPE_QUEUE, @@ -242,6 +249,8 @@ extern "C" { int dap_events_socket_init(); // Init clients module void dap_events_socket_deinit(); // Deinit clients module +dap_events_socket_t * dap_events_socket_create(dap_events_desc_type_t a_type, dap_events_socket_callbacks_t* a_callbacks); + dap_events_socket_t * dap_events_socket_create_type_queue_ptr_unsafe(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback); dap_events_socket_t * dap_events_socket_create_type_queue_ptr_mt(dap_worker_t * a_w, dap_events_socket_callback_queue_ptr_t a_callback); int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket); diff --git a/dap-sdk/net/core/include/dap_proc_thread.h b/dap-sdk/net/core/include/dap_proc_thread.h index 2e76df9e7b808bc7d0ca3694ea4e68007460d720..d731338197a8898160052ec2da57195879ad179e 100644 --- a/dap-sdk/net/core/include/dap_proc_thread.h +++ b/dap-sdk/net/core/include/dap_proc_thread.h @@ -68,4 +68,9 @@ int dap_proc_thread_esocket_write_inter(dap_proc_thread_t * a_thread,dap_worker_ const void * a_data, size_t a_data_size); int dap_proc_thread_esocket_write_f_inter(dap_proc_thread_t * a_thread,dap_worker_t * a_worker, dap_events_socket_t *a_esocket, const char * a_format,...); -void dap_proc_thread_worker_exec_callback(dap_proc_thread_t * a_thread, size_t a_worker_id, dap_worker_callback_t a_callback, void * a_arg); + +typedef void (*dap_proc_worker_callback_t)(dap_worker_t *,void *); + +void dap_proc_thread_worker_exec_callback(dap_proc_thread_t * a_thread, size_t a_worker_id, dap_proc_worker_callback_t a_callback, void * a_arg); + +dap_proc_thread_t * dap_proc_thread_assign_esocket_unsafe(dap_proc_thread_t * a_thread, dap_events_socket_t * a_esocket); diff --git a/dap-sdk/net/core/include/dap_timerfd.h b/dap-sdk/net/core/include/dap_timerfd.h index 671e8b42ce6b41abf92138e32e6b576a01f07d9e..c6b14e3a67c78d2037ae9a3e6611b0e8f75dcf5d 100644 --- a/dap-sdk/net/core/include/dap_timerfd.h +++ b/dap-sdk/net/core/include/dap_timerfd.h @@ -37,6 +37,7 @@ #include "dap_common.h" #include "dap_events_socket.h" +#include "dap_proc_thread.h" typedef bool (*dap_timerfd_callback_t)(void* ); // Callback for timer. If return true, it will be called after next timeout @@ -58,7 +59,9 @@ typedef struct dap_timerfd { } dap_timerfd_t; int dap_timerfd_init(); +dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg); dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *callback_arg); dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg); +dap_timerfd_t* dap_timerfd_start_on_proc_thread(dap_proc_thread_t * a_proc_thread, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg); void dap_timerfd_delete(dap_timerfd_t *l_timerfd); diff --git a/dap-sdk/net/core/include/dap_worker.h b/dap-sdk/net/core/include/dap_worker.h index 0182843e6d4697a91fa30a0b83b1111449119ffc..4447b5f84b49f7bacb28ae157f06324b53bf4771 100644 --- a/dap-sdk/net/core/include/dap_worker.h +++ b/dap-sdk/net/core/include/dap_worker.h @@ -27,9 +27,9 @@ #include "dap_common.h" #include "dap_events_socket.h" -#include "dap_timerfd.h" -#include "dap_proc_queue.h" +typedef struct dap_proc_queue dap_proc_queue_t; +typedef struct dap_timerfd dap_timerfd_t; typedef struct dap_worker { uint32_t id; diff --git a/dap-sdk/net/server/CMakeLists.txt b/dap-sdk/net/server/CMakeLists.txt index 4bde7fcb05405412d3fabc6cb88e73224826d203..be52ca81d5392fbf895fe6196b3118ed68b3d0e2 100644 --- a/dap-sdk/net/server/CMakeLists.txt +++ b/dap-sdk/net/server/CMakeLists.txt @@ -1,7 +1,6 @@ project(libdap-server C) cmake_minimum_required(VERSION 3.0) - add_subdirectory(http_server) add_subdirectory(enc_server) add_subdirectory(json_rpc) diff --git a/dap-sdk/net/server/http_server/CMakeLists.txt b/dap-sdk/net/server/http_server/CMakeLists.txt index b297cb547014523f276edd8647d4257d04d1fe3f..05eaf454d8bac09a32b60942944a764979df4406 100644 --- a/dap-sdk/net/server/http_server/CMakeLists.txt +++ b/dap-sdk/net/server/http_server/CMakeLists.txt @@ -44,5 +44,5 @@ if(WIN32) endif() if(UNIX) - target_link_libraries(dap_http_server curl dap_core dap_server_core magic json-c) + target_link_libraries(dap_http_server dap_core dap_server_core magic json-c) endif() diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index 39561ca0af59ab4824074ab1c0a8ce2ba41b44d2..5a43fb11ace7c032ba0dcc756e4b5c6d41975ff7 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -38,6 +38,7 @@ #endif #include "dap_common.h" +#include "dap_timerfd.h" #include "dap_stream.h" #include "dap_stream_pkt.h" diff --git a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c index 1d7e654250f6b60541b58ea41ef3696501846570..6a45d7919a6e3d59c131e83c02279a6f5cc8bc36 100644 --- a/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c +++ b/modules/channel/chain-net-srv/dap_stream_ch_chain_net_srv.c @@ -26,6 +26,7 @@ along with any CellFrame SDK based project. If not, see <http://www.gnu.org/lic #include "dap_common.h" #include "dap_hash.h" #include "rand/dap_rand.h" +#include "dap_timerfd.h" #include "dap_chain.h" #include "dap_chain_datum_tx.h" diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 43c110229ecc40c874ae24785c4ae20a202b01da..ac22939f2b67e998de19c5df40fd2fff79927b6f 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -696,6 +696,98 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) return; } switch (l_ch_pkt->hdr.type) { + /// --- GDB update --- + // Request for gdbs list update + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ:{ + l_ch_chain->state = CHAIN_STATE_UPDATE_GLOBAL_DB; + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + }break; + // Response with metadata organized in TSD + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD:{ + + }break; + // Response with gdb element hashes and sizes + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB:{ + for ( dap_stream_ch_chain_update_element_t * l_element =(dap_stream_ch_chain_update_element_t *) l_chain_pkt->data; + (size_t) (((byte_t*)l_element) - l_chain_pkt->data ) < l_chain_pkt_data_size; + l_element++){ + dap_stream_ch_chain_hash_item_t * l_hash_item = NULL; + HASH_FIND(hh,l_ch_chain->remote_gdbs, &l_element->hash, sizeof (l_element->hash), l_hash_item ); + if( ! l_hash_item ){ + l_hash_item = DAP_NEW(dap_stream_ch_chain_hash_item_t); + memcpy(&l_hash_item->hash, &l_element->hash, sizeof (l_element->hash)); + l_hash_item->size = l_element->size; + HASH_ADD(hh, l_ch_chain->remote_gdbs, hash, sizeof (l_hash_item->hash), l_hash_item); + if (s_debug_chain_sync){ + char l_hash_str[72]={ [0]='\0'}; + dap_chain_hash_fast_to_str(&l_hash_item->hash,l_hash_str,sizeof (l_hash_str)); + log_it(L_INFO,"In: Updated remote hash gdb list with %s ", l_hash_str); + } + } + } + }break; + // End of response + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END:{ + l_ch_chain->state = CHAIN_STATE_UPDATE_CHAINS ; // Switch on update chains hashes + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ , + l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, + l_ch_chain->request_hdr.cell_id, NULL, 0); + }break; + + /// --- Chains update --- + // Request for atoms list update + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ:{ + l_ch_chain->state = CHAIN_STATE_UPDATE_CHAINS; + dap_stream_ch_set_ready_to_write_unsafe(a_ch, true); + }break; + // Response with metadata organized in TSD + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD :{ + + }break; + // Response with atom hashes and sizes + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS :{ + for ( dap_stream_ch_chain_update_element_t * l_element =(dap_stream_ch_chain_update_element_t *) l_chain_pkt->data; + (size_t) (((byte_t*)l_element) - l_chain_pkt->data ) < l_chain_pkt_data_size; + l_element++){ + dap_stream_ch_chain_hash_item_t * l_hash_item = NULL; + HASH_FIND(hh,l_ch_chain->remote_atoms, &l_element->hash, sizeof (l_element->hash), l_hash_item ); + if( ! l_hash_item ){ + l_hash_item = DAP_NEW(dap_stream_ch_chain_hash_item_t); + memcpy(&l_hash_item->hash, &l_element->hash, sizeof (l_element->hash)); + l_hash_item->size = l_element->size; + HASH_ADD(hh, l_ch_chain->remote_atoms, hash, sizeof (l_hash_item->hash), l_hash_item); + if (s_debug_chain_sync){ + char l_hash_str[72]={ [0]='\0'}; + dap_chain_hash_fast_to_str(&l_hash_item->hash,l_hash_str,sizeof (l_hash_str)); + log_it(L_INFO,"In: Updated remote atom hash list with %s ", l_hash_str); + } + } + } + }break; + // End of response + case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END:{ + l_ch_chain->state = CHAIN_STATE_UPDATE_CHAINS ; // Switch on update chains hashes + dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ , + l_ch_chain->request_hdr.net_id, l_ch_chain->request_hdr.chain_id, + l_ch_chain->request_hdr.cell_id, NULL, 0); + }break; + + // first packet of data with source node address + case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN: { + if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)){ + log_it(L_INFO, "From "NODE_ADDR_FP_STR": FIRST_CHAIN data_size=%d net 0x%016x chain 0x%016x cell 0x%016x ", + NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr), + l_chain_pkt_data_size, l_ch_chain->request_hdr.net_id.uint64 , + l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64); + }else{ + log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN", l_chain_pkt_data_size); + dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, + l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, + "ERROR_CHAIN_PACKET_TYPE_FIRST_CHAIN_INCORRET_DATA_SIZE(%zd/%zd)",l_chain_pkt_data_size, sizeof(dap_chain_node_addr_t)); + } + } + break; + case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: { log_it(L_INFO, "In: SYNCED_ALL net 0x%016x chain 0x%016x cell 0x%016x", l_chain_pkt->hdr.net_id.uint64 , l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64); @@ -823,21 +915,6 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, "ERROR_CHAIN_PKT_DATA_SIZE" ); } - } - break; - // first packet of data with source node address - case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN: { - if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t)){ - log_it(L_INFO, "From "NODE_ADDR_FP_STR": FIRST_CHAIN data_size=%d net 0x%016x chain 0x%016x cell 0x%016x ", - NODE_ADDR_FP_ARGS_S(l_ch_chain->request.node_addr), - l_chain_pkt_data_size, l_ch_chain->request_hdr.net_id.uint64 , - l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64); - }else{ - log_it(L_WARNING,"Incorrect data size %zd in packet DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN", l_chain_pkt_data_size); - dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id, - l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, - "ERROR_CHAIN_PACKET_TYPE_FIRST_CHAIN_INCORRET_DATA_SIZE(%zd/%zd)",l_chain_pkt_data_size, sizeof(dap_chain_node_addr_t)); - } } break; case DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN: { diff --git a/modules/channel/chain/include/dap_stream_ch_chain.h b/modules/channel/chain/include/dap_stream_ch_chain.h index ab36655e890ba901ab2cb83e069b9791fef737b2..2cf9448dce513401c2c032e92ad355d33c30b02c 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain.h +++ b/modules/channel/chain/include/dap_stream_ch_chain.h @@ -52,6 +52,13 @@ typedef struct dap_chain_pkt_item { byte_t *pkt_data; } dap_chain_pkt_item_t; +typedef struct dap_stream_ch_chain_hash_item{ + dap_hash_fast_t hash; + uint32_t size; + UT_hash_handle hh; +} dap_stream_ch_chain_hash_item_t; + + typedef struct dap_stream_ch_chain { dap_stream_ch_t * ch; @@ -60,6 +67,10 @@ typedef struct dap_stream_ch_chain { uint64_t stats_request_atoms_processed; uint64_t stats_request_gdb_processed; + + dap_stream_ch_chain_hash_item_t * remote_atoms; // Remote atoms + dap_stream_ch_chain_hash_item_t * remote_gdbs; // Remote gdbs + // request section dap_chain_atom_iter_t *request_atom_iter; dap_db_log_list_t *request_global_db_trs; // list of global db records diff --git a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h index 6dd24f49105284732e54e2d75308b4f78d073936..8e36ebba3c1be85885d286e33e087225a6c8cf0f 100644 --- a/modules/channel/chain/include/dap_stream_ch_chain_pkt.h +++ b/modules/channel/chain/include/dap_stream_ch_chain_pkt.h @@ -57,23 +57,37 @@ #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ 0x05 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_META 0x15 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD 0x15 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS 0x25 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END 0x35 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ 0x65 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_META 0x66 -#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB 0x57 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_REQ 0x06 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD 0x16 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB 0x26 +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_END 0x36 #define DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR 0xff +// TSD sections +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_PROTO 0x0001 // Protocol version +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_COUNT 0x0002 // Items count +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_HASH_LAST 0x0003 // Hash of last(s) item +#define DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_TSD_HASH_FIRST 0x0004 // Hash of first(s) item + typedef enum dap_stream_ch_chain_state{ CHAIN_STATE_IDLE=0, - CHAIN_STATE_SYNC_CHAINS=1, - CHAIN_STATE_SYNC_GLOBAL_DB=2, - CHAIN_STATE_SYNC_ALL=3 + CHAIN_STATE_UPDATE_GLOBAL_DB=1, // Update GDB hashtable + CHAIN_STATE_UPDATE_CHAINS=2, // Update chains hashtable + CHAIN_STATE_SYNC_CHAINS=3, + CHAIN_STATE_SYNC_GLOBAL_DB=4, + CHAIN_STATE_SYNC_ALL=5 } dap_stream_ch_chain_state_t; +typedef struct dap_stream_ch_chain_update_element{ + dap_hash_fast_t hash; + uint32_t size; +} DAP_ALIGN_PACKED dap_stream_ch_chain_update_element_t; typedef struct dap_stream_ch_chain_sync_request{ dap_chain_node_addr_t node_addr; // Requesting node's address diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 9bfe93088f90ffa1db82449068569120c13b7c1e..e1523f1edb7e0299eb256f663b86473d7221dee3 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -82,7 +82,7 @@ #include "dap_stream_ch_chain_pkt.h" #include "dap_stream_ch.h" #include "dap_stream_ch_pkt.h" -#include "dap_dns_server.h" +#include "dap_chain_node_dns_client.h" #include "dap_module.h" @@ -121,7 +121,6 @@ typedef struct dap_chain_net_pvt{ HANDLE state_proc_cond; #endif - dap_events_socket_t * event_state_proc; pthread_mutex_t state_mutex_cond; dap_chain_node_role_t node_role; @@ -151,6 +150,13 @@ typedef struct dap_chain_net_pvt{ dap_chain_net_state_t state; dap_chain_net_state_t state_target; uint16_t acl_idx; + + // Main loop timer + dap_timerfd_t * main_timer; + + // General rwlock for structure + pthread_rwlock_t rwlock; + } dap_chain_net_pvt_t; typedef struct dap_chain_net_item{ @@ -188,11 +194,11 @@ static void s_node_link_callback_disconnected(dap_chain_node_client_t * a_node_c static void s_node_link_callback_stage(dap_chain_node_client_t * a_node_client,dap_client_stage_t a_stage, void * a_arg); static void s_node_link_callback_error(dap_chain_node_client_t * a_node_client, int a_error, void * a_arg); -static int s_net_states_proc(dap_chain_net_t * a_net); +static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg); -static void * s_net_check_thread ( void * a_net); +static bool s_net_check_timer_callback ( void * a_net); static void s_net_check_thread_start( dap_chain_net_t * a_net ); static void s_net_proc_kill( dap_chain_net_t * a_net ); int s_net_load(const char * a_net_name, uint16_t a_acl_idx); @@ -432,12 +438,14 @@ static void s_node_link_callback_error(dap_chain_node_client_t * a_node_client, * @brief s_net_states_proc * @param l_net */ -static int s_net_states_proc(dap_chain_net_t *a_net) +static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) { - dap_chain_net_pvt_t *l_pvt_net = PVT(a_net); - int ret = 0; + dap_chain_net_t *l_net = (dap_chain_net_t *) a_arg; + dap_chain_net_pvt_t *l_pvt_net = PVT(l_net); + pthread_rwlock_wrlock(&l_pvt_net->rwlock); switch (l_pvt_net->state) { + // State OFFLINE where we don't do anything case NET_STATE_OFFLINE: { // delete all links dap_list_t *l_tmp = l_pvt_net->links; @@ -448,8 +456,10 @@ static int s_net_states_proc(dap_chain_net_t *a_net) l_tmp = l_next; } l_pvt_net->links = NULL; - dap_list_free_full(l_pvt_net->links_info, free); - l_pvt_net->links_info = NULL; + if(l_pvt_net->links_info){ + dap_list_free_full(l_pvt_net->links_info, free); + l_pvt_net->links_info = NULL; + } if ( l_pvt_net->state_target != NET_STATE_OFFLINE ){ l_pvt_net->state = NET_STATE_LINKS_PREPARE; break; @@ -459,12 +469,13 @@ static int s_net_states_proc(dap_chain_net_t *a_net) l_pvt_net->last_sync = 0; } break; + // Prepare links case NET_STATE_LINKS_PREPARE: { - log_it(L_NOTICE,"%s.state: NET_STATE_LINKS_PREPARE", a_net->pub.name); - uint64_t l_own_addr = dap_chain_net_get_cur_addr_int(a_net); + log_it(L_NOTICE,"%s.state: NET_STATE_LINKS_PREPARE", l_net->pub.name); + uint64_t l_own_addr = dap_chain_net_get_cur_addr_int(l_net); if (l_pvt_net->node_info) { for (size_t i = 0; i < l_pvt_net->node_info->hdr.links_number; i++) { - dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(a_net, &l_pvt_net->node_info->links[i]); + dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(l_net, &l_pvt_net->node_info->links[i]); if (!l_link_node_info || l_link_node_info->hdr.address.uint64 == l_own_addr) { continue; // Do not link with self } @@ -483,7 +494,7 @@ static int s_net_states_proc(dap_chain_net_t *a_net) case NODE_ROLE_CELL_MASTER: { if (l_pvt_net->seed_aliases_count) { // Add other root nodes as synchronization links - s_fill_links_from_root_aliases(a_net); + s_fill_links_from_root_aliases(l_net); break; } } @@ -499,8 +510,8 @@ static int s_net_states_proc(dap_chain_net_t *a_net) uint16_t i, l_port; if (l_pvt_net->seed_aliases_count) { i = rand() % l_pvt_net->seed_aliases_count; - dap_chain_node_addr_t *l_remote_addr = dap_chain_node_alias_find(a_net, l_pvt_net->seed_aliases[i]); - dap_chain_node_info_t *l_remote_node_info = dap_chain_node_info_read(a_net, l_remote_addr); + dap_chain_node_addr_t *l_remote_addr = dap_chain_node_alias_find(l_net, l_pvt_net->seed_aliases[i]); + dap_chain_node_info_t *l_remote_node_info = dap_chain_node_info_read(l_net, l_remote_addr); l_addr.s_addr = l_remote_node_info ? l_remote_node_info->hdr.ext_addr_v4.s_addr : 0; DAP_DELETE(l_remote_node_info); l_port = DNS_LISTEN_PORT; @@ -521,8 +532,8 @@ static int s_net_states_proc(dap_chain_net_t *a_net) #else struct in_addr _in_addr = { { .S_addr = l_addr.S_un.S_addr } }; #endif - log_it(L_INFO, "dns get addrs %s : %d, net %s", inet_ntoa(_in_addr), l_port, a_net->pub.name); - int l_res = dap_dns_client_get_addr(l_addr, l_port, a_net->pub.name, l_link_node_info); + log_it(L_INFO, "dns get addrs %s : %d, net %s", inet_ntoa(_in_addr), l_port, l_net->pub.name); + int l_res = dap_dns_client_get_addr(l_addr, l_port, l_net->pub.name, l_link_node_info); if (!l_res && l_link_node_info->hdr.address.uint64 != l_own_addr) { l_pvt_net->links_info = dap_list_append(l_pvt_net->links_info, l_link_node_info); l_tries = 0; @@ -533,7 +544,7 @@ static int s_net_states_proc(dap_chain_net_t *a_net) } l_tries++; } - s_fill_links_from_root_aliases(a_net); + s_fill_links_from_root_aliases(l_net); } break; } if (l_pvt_net->state_target != NET_STATE_OFFLINE) { @@ -551,7 +562,7 @@ static int s_net_states_proc(dap_chain_net_t *a_net) } break; case NET_STATE_LINKS_CONNECTING: { - log_it(L_DEBUG, "%s.state: NET_STATE_LINKS_CONNECTING",a_net->pub.name); + log_it(L_DEBUG, "%s.state: NET_STATE_LINKS_CONNECTING",l_net->pub.name); for (dap_list_t *l_tmp = l_pvt_net->links_info; l_tmp; l_tmp = dap_list_next(l_tmp)) { dap_chain_node_info_t *l_link_info = (dap_chain_node_info_t *)l_tmp->data; dap_chain_node_client_t *l_node_client = dap_chain_node_client_create_n_connect(l_link_info,"CN",s_node_link_callback_connected, @@ -598,14 +609,14 @@ static int s_net_states_proc(dap_chain_net_t *a_net) // Get last timestamp in log if wasn't SYNC_FROM_ZERO flag if (! (l_pvt_net->flags & F_DAP_CHAIN_NET_SYNC_FROM_ZERO) ) l_sync_gdb.id_start = (uint64_t) dap_db_get_last_id_remote(l_node_client->remote_node_addr.uint64); - l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(a_net); + l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); log_it(L_DEBUG, "Prepared request to gdb sync from %llu to %llu", l_sync_gdb.id_start, l_sync_gdb.id_end?l_sync_gdb.id_end:-1 ); // find dap_chain_id_t - dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(a_net, "gdb"); + dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb"); dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t ) {0}; dap_chain_node_client_reset(l_node_client); - size_t l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, a_net->pub.id, - l_chain_id, a_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); + size_t l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_net->pub.id, + l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); if (l_res == 0) { log_it(L_WARNING, "Can't send GDB sync request"); continue; @@ -627,8 +638,8 @@ static int s_net_states_proc(dap_chain_net_t *a_net) } dap_chain_node_client_reset(l_node_client); - l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS, a_net->pub.id, - l_chain_id, a_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); + l_res = dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS, l_net->pub.id, + l_chain_id, l_net->pub.cell_id, &l_sync_gdb, sizeof(l_sync_gdb)); l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); switch (l_res) { case -1: @@ -670,7 +681,7 @@ static int s_net_states_proc(dap_chain_net_t *a_net) dap_stream_worker_t *l_worker = dap_client_get_stream_worker(l_node_client->client); dap_chain_t * l_chain = NULL; int l_res = 0; - DL_FOREACH (a_net->pub.chains, l_chain) { + DL_FOREACH (l_net->pub.chains, l_chain) { dap_chain_node_client_reset(l_node_client); dap_stream_ch_chain_sync_request_t l_request = {0}; @@ -685,13 +696,13 @@ static int s_net_states_proc(dap_chain_net_t *a_net) char l_hash_str[128]={[0]='\0'}; dap_chain_hash_fast_to_str(&l_request.hash_from,l_hash_str,sizeof (l_hash_str)-1); log_it(L_DEBUG,"Send sync chain request to"NODE_ADDR_FP_STR" for %s:%s from %s to infinity", - NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr) ,a_net->pub.name, l_chain->name, l_hash_str); + NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr) ,l_net->pub.name, l_chain->name, l_hash_str); } }else log_it(L_DEBUG,"Send sync chain request for all the chains for addr "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_node_client->remote_node_addr)); - dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, a_net->pub.id, - l_chain->id, a_net->pub.cell_id, &l_request, sizeof(l_request)); + dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_net->pub.id, + l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request)); // wait for finishing of request int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms // TODO add progress info to console @@ -711,9 +722,9 @@ static int s_net_states_proc(dap_chain_net_t *a_net) dap_chain_node_client_reset(l_node_client); - l_request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(a_net); - dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS, a_net->pub.id, - l_chain->id, a_net->pub.cell_id, &l_request, sizeof(l_request)); + l_request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); + dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS, l_net->pub.id, + l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request)); l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms); switch (l_res) { case -1: @@ -778,16 +789,17 @@ static int s_net_states_proc(dap_chain_net_t *a_net) break; default: log_it (L_DEBUG, "Unprocessed state"); } - return ret; + pthread_rwlock_unlock(&l_pvt_net->rwlock); + + return false; } /** - * @brief s_net_proc_thread - * @details Brings up and check the Dap Chain Network - * @param a_cfg Network1 configuration + * @brief s_net_check_timer_callback + * @details Brings up and checkup the Dap Chain Network * @return */ -static void *s_net_check_thread ( void *a_net ) +static bool s_net_check_timer_callback ( void *a_net ) { dap_chain_net_t *l_net = (dap_chain_net_t *)a_net; dap_chain_net_pvt_t *l_net_pvt = (dap_chain_net_pvt_t *)(void *)l_net->pvt; @@ -797,35 +809,31 @@ static void *s_net_check_thread ( void *a_net ) // set callback to update data //dap_chain_global_db_set_callback_for_update_base(s_net_proc_thread_callback_update_db); - while(1){ - if ( l_net_pvt->flags & F_DAP_CHAIN_NET_SHUTDOWN ) { - log_it(L_NOTICE,"Shutdown chain net context thread for network \"%s\" (flags 0x%08X)",l_net->pub.name , l_net_pvt->flags); - return NULL; - } + if ( l_net_pvt->flags & F_DAP_CHAIN_NET_SHUTDOWN ) { + log_it(L_NOTICE,"Shutdown chain net context thread for network \"%s\" (flags 0x%08X)",l_net->pub.name , l_net_pvt->flags); + return false; + } - //log_it(L_DEBUG, "Check net states"); - // check or start sync - s_net_states_proc( l_net ); + //log_it(L_DEBUG, "Check net states"); + // check or start sync + dap_proc_queue_add_callback_inter( l_net_pvt->main_timer->events_socket->worker->proc_queue_input,s_net_states_proc,l_net ); - if (l_net_pvt->flags & F_DAP_CHAIN_NET_GO_SYNC) { - // check or start sync - s_net_states_proc( l_net ); - continue; - } - struct timespec l_to; - - // checking whether new sync is needed - time_t l_sync_timeout = 180; // 1800 sec = 30 min - clock_gettime(CLOCK_MONOTONIC, &l_to); - // start sync every l_sync_timeout sec - if(l_to.tv_sec >= l_net_pvt->last_sync + l_sync_timeout) { - l_net_pvt->flags |= F_DAP_CHAIN_NET_GO_SYNC; - s_net_states_proc( l_net ); - } - //log_it(L_DEBUG, "Sleep on 10 seconds..."); - sleep(10); + if (l_net_pvt->flags & F_DAP_CHAIN_NET_GO_SYNC) { + // check or start sync + dap_proc_queue_add_callback_inter( l_net_pvt->main_timer->events_socket->worker->proc_queue_input,s_net_states_proc,l_net ); + return true; } - return NULL; + struct timespec l_to; + + // checking whether new sync is needed + time_t l_sync_timeout = dap_config_get_item_uint64_default(g_config,"chain_net","net_sync_timeout",180) ; // 180 sec = 3 min + clock_gettime(CLOCK_MONOTONIC, &l_to); + // start sync every l_sync_timeout sec + if(l_to.tv_sec >= l_net_pvt->last_sync + l_sync_timeout) { + l_net_pvt->flags |= F_DAP_CHAIN_NET_GO_SYNC; + dap_proc_queue_add_callback_inter( l_net_pvt->main_timer->events_socket->worker->proc_queue_input,s_net_states_proc,l_net ); + } + return true; } /** @@ -834,7 +842,8 @@ static void *s_net_check_thread ( void *a_net ) */ static void s_net_check_thread_start( dap_chain_net_t * a_net ) { - pthread_create(&s_net_check_pid,NULL, s_net_check_thread, a_net); + PVT(a_net)->main_timer = dap_timerfd_start(dap_config_get_item_uint64_default(g_config,"chain_net","net_check_timeout",10)*1000, + s_net_check_timer_callback, a_net); } diff --git a/modules/net/dap_chain_node_dns_client.c b/modules/net/dap_chain_node_dns_client.c new file mode 100644 index 0000000000000000000000000000000000000000..c73b1508688369905bfba6a1f2f517eecd2197e8 --- /dev/null +++ b/modules/net/dap_chain_node_dns_client.c @@ -0,0 +1,323 @@ +/* + * Authors: + * Roman Khlopkov <roman.khlopkov@demlabs.net> + * Dmitriy Gerasimov <dmitriy.gerasmiov@demlabs.net> + * DeM Labs Ltd https://demlabs.net + * DeM Labs Open source community https://gitlab.demlabs.net + * Copyright (c) 2021 + * All rights reserved. + + This file is part of DapChain SDK the open source project + + DapChain SDK is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DapChain SDK is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DapChain SDK based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "dap_events.h" +#include "dap_timerfd.h" +#include "dap_chain_node_dns_server.h" +#include "dap_chain_node_dns_client.h" + +#define LOG_TAG "dap_chain_node_dns_client" + +struct dns_client +{ + dap_events_socket_t * parent; + dap_chain_node_info_t *result; + struct in_addr addr; + uint16_t port; + char *name; + dap_dns_client_node_info_request_success_callback_t callback_success; + dap_dns_client_node_info_request_error_callback_t callback_error; + void * callbacks_arg; + + bool is_callbacks_called; +}; + +static void s_dns_client_esocket_read_callback(dap_events_socket_t * a_esocket, void * a_arg); +static void s_dns_client_esocket_error_callback(dap_events_socket_t * a_esocket, int a_error); +static bool s_dns_client_esocket_timeout_callback( void * a_arg); +static void s_dns_client_esocket_delete_callback(dap_events_socket_t * a_esocket, void * a_arg); + +/** + * @brief s_dns_client_esocket_new_callback + * @param a_esocket + * @param a_arg + */ +static void s_dns_client_esocket_new_callback(dap_events_socket_t * a_esocket, void * a_arg) +{ + struct dns_client * l_dns_client = (struct dns_client*) a_esocket->_inheritor; +} + +/** + * @brief s_dns_client_esocket_read_callback + * @param a_esocket + * @param a_arg + */ +static void s_dns_client_esocket_read_callback(dap_events_socket_t * a_esocket, void * a_arg) +{ + struct dns_client * l_dns_client = (struct dns_client*) a_esocket->_inheritor; + struct sockaddr_in l_clientaddr; + socklen_t l_clientlen = sizeof(l_clientaddr); + size_t l_recieved = recvfrom(l_sock, (char *)l_buf, l_buf_size, 0, (struct sockaddr *)&l_clientaddr, &l_clientlen); + size_t l_addr_point = DNS_HEADER_SIZE + strlen(a_name) + 2 + 2 * sizeof(uint16_t) + DNS_ANSWER_SIZE - sizeof(uint32_t); + if (l_recieved < l_addr_point + sizeof(uint32_t)) { + log_it(L_WARNING, "DNS answer incomplete"); + closesocket(l_sock); + return -5; + } + l_cur = l_buf + 3 * sizeof(uint16_t); + int l_answers_count = ntohs(*(uint16_t *)l_cur); + if (l_answers_count != 1) { + log_it(L_WARNING, "Incorrect DNS answer format"); + closesocket(l_sock); + return -6; + } + l_cur = l_buf + l_addr_point; + if (a_result) { + a_result->hdr.ext_addr_v4.s_addr = ntohl(*(uint32_t *)l_cur); + } + l_cur = l_buf + 5 * sizeof(uint16_t); + int l_additions_count = ntohs(*(uint16_t *)l_cur); + if (l_additions_count == 1) { + l_cur = l_buf + l_addr_point + DNS_ANSWER_SIZE; + if (a_result) { + a_result->hdr.ext_port = ntohs(*(uint16_t *)l_cur); + } + l_cur += sizeof(uint16_t); + if (a_result) { + a_result->hdr.address.uint64 = be64toh(*(uint64_t *)l_cur); + } + } + closesocket(l_sock); +} + +/** + * @brief s_dns_client_esocket_error_callback + * @param a_esocket + * @param a_error + */ +static void s_dns_client_esocket_error_callback(dap_events_socket_t * a_esocket, int a_error) +{ + struct dns_client * l_dns_client = (struct dns_client*) a_esocket->_inheritor; + log_it(L_ERROR,"DNS client esocket error %d", a_error); + l_dns_client->callback_error(a_esocket->worker, l_dns_client->result,l_dns_client->callbacks_arg,a_error); + l_dns_client->is_callbacks_called = true; +} + +/** + * @brief s_dns_client_esocket_timeout_callback + * @param a_worker + * @param a_arg + * @return + */ +static bool s_dns_client_esocket_timeout_callback(void * a_arg) +{ + dap_events_socket_t * l_es = (dap_events_socket_t*) a_arg; + assert(l_es); + dap_events_t * l_events = dap_events_get_default(); + assert(l_events); + + dap_worker_t * l_worker = dap_events_get_current_worker(l_events); // We're in own esocket context + assert(l_worker); + + struct dns_client * l_dns_client = (struct dns_client*) l_es->_inheritor; + + if(dap_events_socket_check_unsafe(l_worker, l_es) ){ // If we've not closed this esocket + log_it(L_WARNING,"DNS request timeout, bad network?"); + if(! l_dns_client->is_callbacks_called ){ + l_dns_client->callback_error(l_es->worker,l_dns_client->result,l_dns_client->callbacks_arg,ETIMEDOUT); + l_dns_client->is_callbacks_called = true; + } + + dap_events_socket_remove_and_delete_unsafe( l_es, false); + } + return false; +} + +/** + * @brief s_dns_client_esocket_delete_callback + * @param a_esocket + * @param a_arg + */ +static void s_dns_client_esocket_delete_callback(dap_events_socket_t * a_esocket, void * a_arg) +{ + struct dns_client * l_dns_client = (struct dns_client*) a_esocket->_inheritor; + if(! l_dns_client->is_callbacks_called ) + l_dns_client->callback_error(a_esocket->worker,l_dns_client->result,l_dns_client->callbacks_arg,EBUSY); + if(l_dns_client->name) + DAP_DELETE(l_dns_client->name); +} + +/** + * @brief dap_chain_node_info_dns_request + * @param a_addr + * @param a_port + * @param a_name + * @param a_result + * @param a_callback_success + * @param a_callback_error + * @param a_callbacks_arg + */ +void dap_chain_node_info_dns_request(struct in_addr a_addr, uint16_t a_port, char *a_name, dap_chain_node_info_t *a_result, + dap_dns_client_node_info_request_success_callback_t a_callback_success, + dap_dns_client_node_info_request_error_callback_t a_callback_error,void * a_callbacks_arg) +{ + struct dns_client * l_dns_client = DAP_NEW_Z(struct dns_client); + l_dns_client->name = dap_strdup(a_name); + l_dns_client->callback_error = a_callback_error; + l_dns_client->callback_success = a_callback_success; + l_dns_client->callbacks_arg = a_callbacks_arg; + l_dns_client->addr = a_addr; + + const size_t l_buf_size = 1024; + uint8_t l_buf[l_buf_size]; + dap_dns_buf_t l_dns_request = {}; + l_dns_request.data = (char *)l_buf; + dap_dns_buf_put_uint16(&l_dns_request, rand() % 0xFFFF); // ID + dap_dns_message_flags_t l_flags = {}; + dap_dns_buf_put_uint16(&l_dns_request, l_flags.val); + dap_dns_buf_put_uint16(&l_dns_request, 1); // we have only 1 question + dap_dns_buf_put_uint16(&l_dns_request, 0); + dap_dns_buf_put_uint16(&l_dns_request, 0); + dap_dns_buf_put_uint16(&l_dns_request, 0); + size_t l_ptr = 0; + uint8_t *l_cur = l_buf + l_dns_request.size; + for (size_t i = 0; i <= strlen(a_name); i++) + { + if (a_name[i] == '.' || a_name[i] == 0) + { + *l_cur++ = i - l_ptr; + for( ; l_ptr < i; l_ptr++) + { + *l_cur++ = a_name[l_ptr]; + } + l_ptr++; + } + } + *l_cur++='\0'; + l_dns_request.size = l_cur - l_buf; + dap_dns_buf_put_uint16(&l_dns_request, DNS_RECORD_TYPE_A); + dap_dns_buf_put_uint16(&l_dns_request, DNS_CLASS_TYPE_IN); + + dap_events_socket_callbacks_t l_esocket_callbacks={0}; + + l_esocket_callbacks.delete_callback = s_dns_client_esocket_delete_callback; // Delete client callback + l_esocket_callbacks.read_callback = s_dns_client_esocket_read_callback; // Read function + l_esocket_callbacks.error_callback = s_dns_client_esocket_error_callback; // Error processing function + + dap_events_socket_t * l_esocket = dap_events_socket_create(DESCRIPTOR_TYPE_SOCKET_UDP,&l_esocket_callbacks); + l_esocket->flags |= DAP_SOCK_READY_TO_WRITE; + l_esocket->remote_addr.sin_family = AF_INET; + l_esocket->remote_addr.sin_port = htons(a_port); + l_esocket->remote_addr.sin_addr = a_addr; + + dap_worker_t * l_worker = dap_events_worker_get_auto(); + dap_events_socket_write_unsafe(l_esocket,l_dns_request.data, l_dns_request.size ); + dap_events_socket_assign_on_worker_mt(l_esocket,l_worker); + dap_timerfd_start_on_worker(l_worker,10000,s_dns_client_esocket_timeout_callback,l_esocket); +} + + +/** + * @brief dap_dns_buf_init Initialize DNS parser buffer + * @param buf DNS buffer structure + * @param msg DNS message + * @return none + */ +void dap_dns_buf_init(dap_dns_buf_t *buf, char *msg) +{ + buf->data = msg; + buf->size = 0; +} + +/** + * @brief dap_dns_buf_get_uint16 Get uint16 from network order + * @param buf DNS buffer structure + * @return uint16 in host order + */ +uint16_t dap_dns_buf_get_uint16(dap_dns_buf_t *buf) +{ + char c; + c = buf->data[buf->size++]; + return c << 8 | buf->data[buf->size++]; +} + +/** + * @brief dap_dns_buf_put_uint16 Put uint16 to network order + * @param buf DNS buffer structure + * @param val uint16 in host order + * @return none + */ +void dap_dns_buf_put_uint16(dap_dns_buf_t *buf, uint16_t val) +{ + buf->data[buf->size++] = val >> 8; + buf->data[buf->size++] = val; +} + +/** + * @brief dap_dns_buf_put_uint32 Put uint32 to network order + * @param buf DNS buffer structure + * @param val uint32 in host order + * @return none + */ +void dap_dns_buf_put_uint32(dap_dns_buf_t *buf, uint32_t val) +{ + dap_dns_buf_put_uint16(buf, val >> 16); + dap_dns_buf_put_uint16(buf, val); +} + +/** + * @brief dap_dns_buf_put_uint64 Put uint64 to network order + * @param buf DNS buffer structure + * @param val uint64 in host order + * @return none + */ +void dap_dns_buf_put_uint64(dap_dns_buf_t *buf, uint64_t val) +{ + dap_dns_buf_put_uint32(buf, val >> 32); + dap_dns_buf_put_uint32(buf, val); +} + +/** + * @brief dap_dns_resolve_hostname + * @param str + * @return + */ +dap_chain_node_info_t *dap_dns_resolve_hostname(char *str) +{ + log_it(L_DEBUG, "DNS parser retrieve hostname %s", str); + dap_chain_net_t *l_net = dap_chain_net_by_name(str); + if (l_net == NULL) { + uint16_t l_nets_count; + dap_chain_net_t **l_nets = dap_chain_net_list(&l_nets_count); + if (!l_nets_count) { + log_it(L_WARNING, "No chain network present"); + return 0; + } + l_net = l_nets[rand() % l_nets_count]; + } + // get nodes list from global_db + dap_global_db_obj_t *l_objs = NULL; + size_t l_nodes_count = 0; + // read all node + l_objs = dap_chain_global_db_gr_load(l_net->pub.gdb_nodes, &l_nodes_count); + if (!l_nodes_count || !l_objs) + return 0; + size_t l_node_num = rand() % l_nodes_count; + dap_chain_node_info_t *l_node_info = DAP_NEW_Z(dap_chain_node_info_t); + memcpy(l_node_info, l_objs[l_node_num].value, sizeof(dap_chain_node_info_t)); + dap_chain_global_db_objs_delete(l_objs, l_nodes_count); + log_it(L_DEBUG, "DNS resolver find ip %s", inet_ntoa(l_node_info->hdr.ext_addr_v4)); + return l_node_info; +} diff --git a/modules/net/dap_dns_server.c b/modules/net/dap_chain_node_dns_server.c similarity index 58% rename from modules/net/dap_dns_server.c rename to modules/net/dap_chain_node_dns_server.c index 1121fd50eba2eef831c53090f7a992538d6c2000..ba41f42d229ecedf03da014db09146e15230497d 100644 --- a/modules/net/dap_dns_server.c +++ b/modules/net/dap_chain_node_dns_server.c @@ -23,7 +23,9 @@ */ #include <errno.h> -#include "dap_dns_server.h" +#include "dap_chain_node_dns_client.h" +#include "dap_chain_node_dns_server.h" +#include "dap_events.h" #include "dap_events_socket.h" #include "dap_common.h" #include "dap_chain_net.h" @@ -32,102 +34,12 @@ #include "dap_chain_global_db.h" #include "dap_chain_global_db_remote.h" -#define LOG_TAG "dap_dns_server" +#define LOG_TAG "dap_chain_node_dns_server" #define BUF_SIZE 1024 -#ifndef _WIN32 -#include <unistd.h> // for close -#include <sys/types.h> -#include <sys/socket.h> -#include <arpa/inet.h> -#define closesocket close -#define INVALID_SOCKET -1 -#endif - static dap_dns_server_t *s_dns_server; static char s_root_alias[] = "dnsroot"; -/** - * @brief dap_dns_buf_init Initialize DNS parser buffer - * @param buf DNS buffer structure - * @param msg DNS message - * @return none - */ -void dap_dns_buf_init(dap_dns_buf_t *buf, char *msg) { - buf->data = msg; - buf->ptr = 0; -} - -/** - * @brief dap_dns_buf_get_uint16 Get uint16 from network order - * @param buf DNS buffer structure - * @return uint16 in host order - */ -uint16_t dap_dns_buf_get_uint16(dap_dns_buf_t *buf) { - char c; - c = buf->data[buf->ptr++]; - return c << 8 | buf->data[buf->ptr++]; -} - -/** - * @brief dap_dns_buf_put_uint16 Put uint16 to network order - * @param buf DNS buffer structure - * @param val uint16 in host order - * @return none - */ -void dap_dns_buf_put_uint16(dap_dns_buf_t *buf, uint16_t val) { - buf->data[buf->ptr++] = val >> 8; - buf->data[buf->ptr++] = val; -} - -/** - * @brief dap_dns_buf_put_uint32 Put uint32 to network order - * @param buf DNS buffer structure - * @param val uint32 in host order - * @return none - */ -void dap_dns_buf_put_uint32(dap_dns_buf_t *buf, uint32_t val) { - dap_dns_buf_put_uint16(buf, val >> 16); - dap_dns_buf_put_uint16(buf, val); -} - -/** - * @brief dap_dns_buf_put_uint64 Put uint64 to network order - * @param buf DNS buffer structure - * @param val uint64 in host order - * @return none - */ -void dap_dns_buf_put_uint64(dap_dns_buf_t *buf, uint64_t val) { - dap_dns_buf_put_uint32(buf, val >> 32); - dap_dns_buf_put_uint32(buf, val); -} - -dap_chain_node_info_t *dap_dns_resolve_hostname(char *str) { - log_it(L_DEBUG, "DNS parser retrieve hostname %s", str); - dap_chain_net_t *l_net = dap_chain_net_by_name(str); - if (l_net == NULL) { - uint16_t l_nets_count; - dap_chain_net_t **l_nets = dap_chain_net_list(&l_nets_count); - if (!l_nets_count) { - log_it(L_WARNING, "No chain network present"); - return 0; - } - l_net = l_nets[rand() % l_nets_count]; - } - // get nodes list from global_db - dap_global_db_obj_t *l_objs = NULL; - size_t l_nodes_count = 0; - // read all node - l_objs = dap_chain_global_db_gr_load(l_net->pub.gdb_nodes, &l_nodes_count); - if (!l_nodes_count || !l_objs) - return 0; - size_t l_node_num = rand() % l_nodes_count; - dap_chain_node_info_t *l_node_info = DAP_NEW_Z(dap_chain_node_info_t); - memcpy(l_node_info, l_objs[l_node_num].value, sizeof(dap_chain_node_info_t)); - dap_chain_global_db_objs_delete(l_objs, l_nodes_count); - log_it(L_DEBUG, "DNS resolver find ip %s", inet_ntoa(l_node_info->hdr.ext_addr_v4)); - return l_node_info; -} /** * @brief dap_dns_zone_register Register DNS zone and set callback to handle it @@ -203,16 +115,16 @@ void dap_dns_client_read(dap_events_socket_t *a_es, void *a_arg) { dns_message->data = DAP_NEW_SIZE(char, a_es->buf_in_size + 1); dns_message->data[a_es->buf_in_size] = 0; dap_events_socket_pop_from_buf_in(a_es, dns_message->data, a_es->buf_in_size); - dns_message->ptr = 0; + dns_message->size = 0; // Parse incoming DNS message int block_len = DNS_HEADER_SIZE; dns_reply->data = DAP_NEW_SIZE(char, block_len); - dns_reply->ptr = 0; + dns_reply->size = 0; uint16_t val = dap_dns_buf_get_uint16(dns_message); // ID dap_dns_buf_put_uint16(dns_reply, val); val = dap_dns_buf_get_uint16(dns_message); // Flags - dns_reply->ptr += sizeof(uint16_t); // Put flags later + dns_reply->size += sizeof(uint16_t); // Put flags later dap_dns_message_flags_t msg_flags; msg_flags.val = val; dap_dns_message_flags_bits_t *flags = &msg_flags.flags; @@ -246,14 +158,14 @@ void dap_dns_client_read(dap_events_socket_t *a_es, void *a_arg) { int dot_count = 0; dap_string_t *dns_hostname = dap_string_new(""); for (int i = 0; i < qdcount; i++) { - block_len = strlen(&dns_message->data[dns_message->ptr]) + 1 + 2 * sizeof(uint16_t); - dns_reply->data = DAP_REALLOC(dns_reply->data, dns_reply->ptr + block_len); - memcpy(&dns_reply->data[dns_reply->ptr], &dns_message->data[dns_message->ptr], block_len); - dns_reply->ptr += block_len; + block_len = strlen(&dns_message->data[dns_message->size]) + 1 + 2 * sizeof(uint16_t); + dns_reply->data = DAP_REALLOC(dns_reply->data, dns_reply->size + block_len); + memcpy(&dns_reply->data[dns_reply->size], &dns_message->data[dns_message->size], block_len); + dns_reply->size += block_len; if (flags->rcode) break; - while (dns_message->ptr < dns_reply->ptr - 2 * sizeof(uint16_t)) { - uint8_t len = dns_message->data[dns_message->ptr++]; + while (dns_message->size < dns_reply->size - 2 * sizeof(uint16_t)) { + uint8_t len = dns_message->data[dns_message->size++]; if (len > DNS_MAX_DOMAIN_NAME_LEN) { flags->rcode = DNS_ERROR_NAME; break; @@ -268,8 +180,8 @@ void dap_dns_client_read(dap_events_socket_t *a_es, void *a_arg) { } dap_string_append(dns_hostname, "."); } - dap_string_append_len(dns_hostname, &dns_message->data[dns_message->ptr], len); - dns_message->ptr += len; + dap_string_append_len(dns_hostname, &dns_message->data[dns_message->size], len); + dns_message->size += len; dot_count++; if (dns_hostname->len >= DNS_MAX_HOSTNAME_LEN) { flags->rcode = DNS_ERROR_NAME; @@ -286,8 +198,8 @@ void dap_dns_client_read(dap_events_socket_t *a_es, void *a_arg) { flags->rcode = DNS_ERROR_NOT_SUPPORTED; break; } - if (dns_message->ptr != dns_reply->ptr) { - log_it(L_ERROR, "DNS parser pointer unequal, mptr = %u, rptr = %u", dns_message->ptr, dns_reply->ptr); + if (dns_message->size != dns_reply->size) { + log_it(L_ERROR, "DNS parser pointer unequal, mptr = %u, rptr = %u", dns_message->size, dns_reply->size); } } // Find ip addr @@ -301,7 +213,7 @@ void dap_dns_client_read(dap_events_socket_t *a_es, void *a_arg) { if (l_node_info) { // Compose DNS answer block_len = DNS_ANSWER_SIZE * 2 - sizeof(uint16_t) + sizeof(uint64_t); - dns_reply->data = DAP_REALLOC(dns_reply->data, dns_reply->ptr + block_len); + dns_reply->data = DAP_REALLOC(dns_reply->data, dns_reply->size + block_len); val = 0xc000 | DNS_HEADER_SIZE; // Link to host name dap_dns_buf_put_uint16(dns_reply, val); val = DNS_RECORD_TYPE_A; @@ -334,7 +246,7 @@ void dap_dns_client_read(dap_events_socket_t *a_es, void *a_arg) { dns_reply->data[2] = msg_flags.val >> 8; dns_reply->data[3] = msg_flags.val; // Send DNS reply - dap_events_socket_write_unsafe(a_es, dns_reply->data, dns_reply->ptr); + dap_events_socket_write_unsafe(a_es, dns_reply->data, dns_reply->size); dap_string_free(dns_hostname, true); cleanup: DAP_DELETE(dns_reply->data); @@ -371,115 +283,6 @@ void dap_dns_server_stop() { DAP_DELETE(s_dns_server); } -int dap_dns_client_get_addr(struct in_addr a_addr, uint16_t a_port, char *a_name, dap_chain_node_info_t *a_result) -{ - const size_t l_buf_size = 1024; - uint8_t l_buf[l_buf_size]; - dap_dns_buf_t l_dns_request = {}; - l_dns_request.data = (char *)l_buf; - dap_dns_buf_put_uint16(&l_dns_request, rand() % 0xFFFF); // ID - dap_dns_message_flags_t l_flags = {}; - dap_dns_buf_put_uint16(&l_dns_request, l_flags.val); - dap_dns_buf_put_uint16(&l_dns_request, 1); // we have only 1 question - dap_dns_buf_put_uint16(&l_dns_request, 0); - dap_dns_buf_put_uint16(&l_dns_request, 0); - dap_dns_buf_put_uint16(&l_dns_request, 0); - size_t l_ptr = 0; - uint8_t *l_cur = l_buf + l_dns_request.ptr; - for (size_t i = 0; i <= strlen(a_name); i++) - { - if (a_name[i] == '.' || a_name[i] == 0) - { - *l_cur++ = i - l_ptr; - for( ; l_ptr < i; l_ptr++) - { - *l_cur++ = a_name[l_ptr]; - } - l_ptr++; - } - } - *l_cur++='\0'; - l_dns_request.ptr = l_cur - l_buf; - dap_dns_buf_put_uint16(&l_dns_request, DNS_RECORD_TYPE_A); - dap_dns_buf_put_uint16(&l_dns_request, DNS_CLASS_TYPE_IN); -#ifdef WIN32 - SOCKET l_sock; -#else - int l_sock; -#endif - l_sock = socket(AF_INET, SOCK_DGRAM, 0); - if (l_sock == INVALID_SOCKET) { - log_it(L_ERROR, "Socket error"); - return -1; - } - struct sockaddr_in l_addr; - l_addr.sin_family = AF_INET; - l_addr.sin_port = htons(a_port); - l_addr.sin_addr = a_addr; - int l_portion = 0, l_len = l_dns_request.ptr; - for (int l_sent = 0; l_sent < l_len; l_sent += l_portion) { - l_portion = sendto(l_sock, (const char *)(l_buf + l_sent), l_len - l_sent, 0, (struct sockaddr *)&l_addr, sizeof(l_addr)); - if (l_portion < 0) { - log_it(L_ERROR, "send() function error"); - closesocket(l_sock); - return -2; - } - } - fd_set fd; - FD_ZERO(&fd); - FD_SET(l_sock, &fd); - struct timeval tv; - tv.tv_sec = 5; - tv.tv_usec = 0; -#ifdef WIN32 - int l_selected = select(1, &fd, NULL, NULL, &tv); -#else - int l_selected = select(l_sock + 1, &fd, NULL, NULL, &tv); -#endif - if (l_selected < 0) - { - log_it(L_WARNING, "select() error"); - closesocket(l_sock); - return -3; - } - if (l_selected == 0) - { - log_it(L_DEBUG, "select() timeout"); - closesocket(l_sock); - return -4; - } - struct sockaddr_in l_clientaddr; - socklen_t l_clientlen = sizeof(l_clientaddr); - size_t l_recieved = recvfrom(l_sock, (char *)l_buf, l_buf_size, 0, (struct sockaddr *)&l_clientaddr, &l_clientlen); - size_t l_addr_point = DNS_HEADER_SIZE + strlen(a_name) + 2 + 2 * sizeof(uint16_t) + DNS_ANSWER_SIZE - sizeof(uint32_t); - if (l_recieved < l_addr_point + sizeof(uint32_t)) { - log_it(L_WARNING, "DNS answer incomplete"); - closesocket(l_sock); - return -5; - } - l_cur = l_buf + 3 * sizeof(uint16_t); - int l_answers_count = ntohs(*(uint16_t *)l_cur); - if (l_answers_count != 1) { - log_it(L_WARNING, "Incorrect DNS answer format"); - closesocket(l_sock); - return -6; - } - l_cur = l_buf + l_addr_point; - if (a_result) { - a_result->hdr.ext_addr_v4.s_addr = ntohl(*(uint32_t *)l_cur); - } - l_cur = l_buf + 5 * sizeof(uint16_t); - int l_additions_count = ntohs(*(uint16_t *)l_cur); - if (l_additions_count == 1) { - l_cur = l_buf + l_addr_point + DNS_ANSWER_SIZE; - if (a_result) { - a_result->hdr.ext_port = ntohs(*(uint16_t *)l_cur); - } - l_cur += sizeof(uint16_t); - if (a_result) { - a_result->hdr.address.uint64 = be64toh(*(uint64_t *)l_cur); - } - } - closesocket(l_sock); - return 0; -} + + + diff --git a/modules/net/include/dap_chain_node.h b/modules/net/include/dap_chain_node.h index 0277aefb71f3b59b11f3e64c2ad20cb77556bdd0..4bfc7f23c1f150d29d4afb76aaa283bb2131f1f9 100644 --- a/modules/net/include/dap_chain_node.h +++ b/modules/net/include/dap_chain_node.h @@ -40,6 +40,9 @@ #include "dap_common.h" +#include "dap_worker.h" +#include "dap_events_socket.h" + #include "dap_chain_common.h" #include "dap_chain_global_db.h" #include "dap_chain.h" @@ -146,3 +149,4 @@ bool dap_chain_node_mempool_autoproc_init(); void dap_chain_node_mempool_autoproc_deinit(); void dap_chain_node_mempool_autoproc_notify(void *a_arg, const char a_op_code, const char *a_prefix, const char *a_group, const char *a_key, const void *a_value, const size_t a_value_len); + diff --git a/modules/net/include/dap_chain_node_cli.h b/modules/net/include/dap_chain_node_cli.h index f67aca3a3f0b0f945829720de690ea31e12d15a3..e98b1a1622786a4bb173469e0d8faf1354711cfe 100644 --- a/modules/net/include/dap_chain_node_cli.h +++ b/modules/net/include/dap_chain_node_cli.h @@ -29,14 +29,7 @@ #include "dap_config.h" #include "uthash.h" -#ifndef _WIN32 -#include <unistd.h> // for close -#define closesocket close -typedef int SOCKET; -#define SOCKET_ERROR -1 // for win32 = (-1) -#define INVALID_SOCKET -1 // for win32 = (SOCKET)(~0) -#endif - +#include "dap_events_socket.h" typedef int cmdfunc_t(int argc, char ** argv, void *arg_func, char **str_reply); diff --git a/modules/net/include/dap_chain_node_dns_client.h b/modules/net/include/dap_chain_node_dns_client.h new file mode 100644 index 0000000000000000000000000000000000000000..d80ff8f933c8f5dee3912e8dd29fd57bfbcaa1b7 --- /dev/null +++ b/modules/net/include/dap_chain_node_dns_client.h @@ -0,0 +1,51 @@ +/* + * Authors: + * Roman Khlopkov <roman.khlopkov@demlabs.net> + * Dmitriy Gerasimov <dmitriy.gerasmiov@demlabs.net> + * DeM Labs Ltd https://demlabs.net + * DeM Labs Open source community https://gitlab.demlabs.net + * Copyright (c) 2021 + * All rights reserved. + + This file is part of DapChain SDK the open source project + + DapChain SDK is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DapChain SDK is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with any DapChain SDK based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#pragma once + +#include <stdint.h> +#include "dap_worker.h" +#include "dap_chain_node.h" + +typedef struct _dap_dns_buf_t { + char *data; + uint32_t size; +} dap_dns_buf_t; + +// node info request callbacks +typedef void (*dap_dns_client_node_info_request_success_callback_t) (dap_worker_t * a_worker, dap_chain_node_info_t * , void *); +typedef void (*dap_dns_client_node_info_request_error_callback_t) (dap_worker_t * a_worker, dap_chain_node_info_t * , void *, int); + +void dap_chain_node_info_dns_request(struct in_addr a_addr, uint16_t a_port, char *a_name, dap_chain_node_info_t *a_result, + dap_dns_client_node_info_request_success_callback_t a_callback_success, + dap_dns_client_node_info_request_error_callback_t a_callback_error,void * a_callback_arg); + +dap_chain_node_info_t *dap_dns_resolve_hostname(char *str); + +void dap_dns_buf_init(dap_dns_buf_t *buf, char *msg); +void dap_dns_buf_put_uint64(dap_dns_buf_t *buf, uint64_t val); +void dap_dns_buf_put_uint32(dap_dns_buf_t *buf, uint32_t val); +void dap_dns_buf_put_uint16(dap_dns_buf_t *buf, uint16_t val); +uint16_t dap_dns_buf_get_uint16(dap_dns_buf_t *buf); diff --git a/modules/net/include/dap_dns_server.h b/modules/net/include/dap_chain_node_dns_server.h similarity index 88% rename from modules/net/include/dap_dns_server.h rename to modules/net/include/dap_chain_node_dns_server.h index 28c58842d17a6bfec03c702b61469da288b1094b..b58c09d47ad900c2486f0429ecf39fbf79fea102 100644 --- a/modules/net/include/dap_dns_server.h +++ b/modules/net/include/dap_chain_node_dns_server.h @@ -1,25 +1,26 @@ /* * Authors: * Roman Khlopkov <roman.khlopkov@demlabs.net> - * DeM Labs Inc. https://demlabs.net + * Dmitriy Gerasimov <dmitriy.gerasmiov@demlabs.net> + * DeM Labs Ltd https://demlabs.net * DeM Labs Open source community https://gitlab.demlabs.net - * Copyright (c) 2017-2020 + * Copyright (c) 2021 * All rights reserved. - This file is part of DAP (Deus Applications Prototypes) the open source project + This file is part of DapChain SDK the open source project - DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + DapChain SDK is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. - DAP is distributed in the hope that it will be useful, + DapChain SDK is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License - along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. + along with any DapChain SDK based project. If not, see <http://www.gnu.org/licenses/>. */ #pragma once @@ -28,8 +29,8 @@ #include <pthread.h> #endif #include "dap_server.h" -#include "uthash.h" #include "dap_chain_node.h" +#include "uthash.h" #define DNS_LISTEN_PORT 53 // UDP #define DNS_TIME_TO_LIVE 600 // Seconds @@ -96,17 +97,13 @@ typedef struct _dap_dns_message_flags_bits_t { int qr : 1; // 0 - query, 1 - response } dap_dns_message_flags_bits_t; -typedef dap_chain_node_info_t *(*dap_dns_zone_callback_t) (char *hostname); // Callback for DNS zone operations typedef union _dap_dns_message_flags_t { dap_dns_message_flags_bits_t flags; int val; } dap_dns_message_flags_t; -typedef struct _dap_dns_buf_t { - char *data; - uint32_t ptr; -} dap_dns_buf_t; +typedef dap_chain_node_info_t *(*dap_dns_zone_callback_t) (char *hostname); // Callback for DNS zone operations typedef struct _dap_dns_zone_hash_t { char *zone; @@ -119,8 +116,10 @@ typedef struct _dap_dns_server_t { dap_dns_zone_hash_t *hash_table; } dap_dns_server_t; + + void dap_dns_server_start(dap_events_t *a_ev, uint16_t a_port); void dap_dns_server_stop(); int dap_dns_zone_register(char *zone, dap_dns_zone_callback_t callback); int dap_dns_zone_unregister(char *zone); -int dap_dns_client_get_addr(struct in_addr a_addr, uint16_t a_port, char *a_name, dap_chain_node_info_t *a_result); +