diff --git a/CMakeLists.txt b/CMakeLists.txt index a97234a90b29f5410e369dbd8dcd605997c6d5f1..40330f339a6c485bfe21d38e1ed4e91c22957826 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 2.8) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.6-0") +set(CELLFRAME_SDK_NATIVE_VERSION "2.6-1") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") diff --git a/dap-sdk/core/include/dap_common.h b/dap-sdk/core/include/dap_common.h index ea2a41ab94901868dd8158c9735d2825258a9821..3b58cda9e5d68d7ae879194037bd8b88d4e60c32 100755 --- a/dap-sdk/core/include/dap_common.h +++ b/dap-sdk/core/include/dap_common.h @@ -160,7 +160,13 @@ DAP_STATIC_INLINE void _dap_aligned_free( void *ptr ) DAP_FREE( base_ptr ); } -#define DAP_PROTOCOL_VERSION 22 +/* + * 23: added support for encryption key type parameter and option to encrypt headers +*/ +#define DAP_PROTOCOL_VERSION 23 +#define DAP_PROTOCOL_VERSION_DEFAULT 22 // used if version is not explicitly specified + +#define DAP_CLIENT_PROTOCOL_VERSION 23 #if __SIZEOF_LONG__==8 #define DAP_UINT64_FORMAT_X "lX" diff --git a/dap-sdk/crypto/include/dap_enc_key.h b/dap-sdk/crypto/include/dap_enc_key.h index 7ef187742ec71e45813da67a55d286cddee647b0..42711803e73cfa8bc21d79d8a920723224cbeb79 100755 --- a/dap-sdk/crypto/include/dap_enc_key.h +++ b/dap-sdk/crypto/include/dap_enc_key.h @@ -43,7 +43,7 @@ typedef enum dap_enc_data_type{DAP_ENC_DATA_TYPE_RAW, typedef enum dap_enc_key_type{ - + DAP_ENC_KEY_TYPE_INVALID = -1, DAP_ENC_KEY_TYPE_IAES, // Symmetric AES DAP_ENC_KEY_TYPE_OAES,// from https://github.com/monero-project/monero/tree/master/src/crypto @@ -123,7 +123,8 @@ typedef enum dap_enc_key_type{ DAP_ENC_KEY_TYPE_SIG_RINGCT20,//ring signature for confidentional transaction - DAP_ENC_KEY_TYPE_NULL = 0 + DAP_ENC_KEY_TYPE_LAST = DAP_ENC_KEY_TYPE_SIG_RINGCT20, + DAP_ENC_KEY_TYPE_NULL = 0 // avoid using it: 0 is a DAP_ENC_KEY_TYPE_NULL and DAP_ENC_KEY_TYPE_IAES at the same time } dap_enc_key_type_t; @@ -244,6 +245,7 @@ int dap_enc_key_init(void); void dap_enc_key_deinit(void); const char *dap_enc_get_type_name(dap_enc_key_type_t a_key_type); +dap_enc_key_type_t dap_enc_key_type_find_by_name(const char * a_name); size_t dap_enc_key_get_enc_size(dap_enc_key_t * a_key, const size_t buf_in_size); size_t dap_enc_key_get_dec_size(dap_enc_key_t * a_key, const size_t buf_in_size); diff --git a/dap-sdk/crypto/src/dap_enc_key.c b/dap-sdk/crypto/src/dap_enc_key.c index 95b9c89200f6f94124a1657e9cc882b54fd6bebf..d105cb5c14ecb487c84e2b4753fb3352a5bf0b3c 100755 --- a/dap-sdk/crypto/src/dap_enc_key.c +++ b/dap-sdk/crypto/src/dap_enc_key.c @@ -639,18 +639,25 @@ dap_enc_key_serealize_t* dap_enc_key_serealize(dap_enc_key_t * key) */ dap_enc_key_t* dap_enc_key_dup(dap_enc_key_t * a_key) { - dap_enc_key_t * l_ret = DAP_NEW_S_SIZE(dap_enc_key_t,sizeof(*l_ret) ); - memcpy(l_ret,a_key,sizeof (*a_key)); - - l_ret->priv_key_data = DAP_NEW_Z_SIZE(byte_t, l_ret->priv_key_data_size); - memcpy(l_ret->priv_key_data, a_key->priv_key_data, a_key->priv_key_data_size); - l_ret->pub_key_data = DAP_NEW_Z_SIZE(byte_t, a_key->pub_key_data_size); - memcpy(l_ret->pub_key_data, a_key->pub_key_data, a_key->pub_key_data_size); + if (!a_key || a_key->type == DAP_ENC_KEY_TYPE_INVALID) { + return NULL; + } + dap_enc_key_t *l_ret = dap_enc_key_new(a_key->type); + if (l_ret->priv_key_data_size) { + l_ret->priv_key_data = DAP_NEW_Z_SIZE(byte_t, a_key->priv_key_data_size); + l_ret->priv_key_data_size = a_key->priv_key_data_size; + memcpy(l_ret->priv_key_data, a_key->priv_key_data, a_key->priv_key_data_size); + } + if (a_key->pub_key_data_size) { + l_ret->pub_key_data = DAP_NEW_Z_SIZE(byte_t, a_key->pub_key_data_size); + l_ret->pub_key_data_size = a_key->pub_key_data_size; + memcpy(l_ret->pub_key_data, a_key->pub_key_data, a_key->pub_key_data_size); + } if(a_key->_inheritor_size) { - l_ret->_inheritor = DAP_NEW_Z_SIZE(byte_t, a_key->_inheritor_size ); + l_ret->_inheritor = DAP_NEW_Z_SIZE(byte_t, a_key->_inheritor_size); + l_ret->_inheritor_size = a_key->_inheritor_size; memcpy(l_ret->_inheritor, a_key->_inheritor, a_key->_inheritor_size); } - return l_ret; } @@ -837,7 +844,18 @@ const char *dap_enc_get_type_name(dap_enc_key_type_t a_key_type) if(s_callbacks[a_key_type].name) { return s_callbacks[a_key_type].name; } - log_it(L_ERROR, "name not realize for current key type"); + log_it(L_WARNING, "name was not set for key type %d", a_key_type); return 0; } + +dap_enc_key_type_t dap_enc_key_type_find_by_name(const char * a_name){ + for(dap_enc_key_type_t i = 0; i <= DAP_ENC_KEY_TYPE_LAST; i++){ + const char * l_current_key_name = dap_enc_get_type_name(i); + if(l_current_key_name && !strcmp(a_name, l_current_key_name)) + return i; + } + log_it(L_WARNING, "no key type with name %s", a_name); + return DAP_ENC_KEY_TYPE_INVALID; +} + diff --git a/dap-sdk/net/client/dap_client.c b/dap-sdk/net/client/dap_client.c index d977b219ddadbed22628cee1e67eeeb78bdbe1bb..b164bab85779d8dda73f833c52a7384dd49b14cf 100644 --- a/dap-sdk/net/client/dap_client.c +++ b/dap-sdk/net/client/dap_client.c @@ -459,9 +459,8 @@ dap_stream_ch_t * dap_client_get_stream_ch(dap_client_t * a_client, uint8_t a_ch { dap_stream_ch_t * l_ch = NULL; dap_client_pvt_t * l_client_internal = a_client ? DAP_CLIENT_PVT(a_client) : NULL; - if(l_client_internal && l_client_internal->stream) + if(l_client_internal && l_client_internal->stream && l_client_internal->stream_es) for(int i = 0; i < l_client_internal->stream->channel_count; i++) { - dap_stream_ch_proc_t *l_ch_id = l_client_internal->stream->channel[i]->proc; if(l_client_internal->stream->channel[i]->proc->id == a_ch_id) { l_ch = l_client_internal->stream->channel[i]; break; diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c index 59a52c01f98d95256170fcc41b950340ddafb383..9a97bc1e8fba20e44a7fe936f492e43459c0f360 100644 --- a/dap-sdk/net/client/dap_client_pvt.c +++ b/dap-sdk/net/client/dap_client_pvt.c @@ -79,6 +79,8 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_internal); +const static dap_enc_key_type_t s_dap_client_pvt_preferred_encryption_type = DAP_ENC_KEY_TYPE_IAES; + // ENC stage callbacks void m_enc_init_response(dap_client_t *, void *, size_t); void m_enc_init_error(dap_client_t *, int); @@ -324,18 +326,23 @@ static void dap_client_pvt_delete_in(dap_client_pvt_t * a_client_pvt) if(a_client_pvt->session_key_id) DAP_DELETE(a_client_pvt->session_key_id); + a_client_pvt->session_key_id = NULL; if(a_client_pvt->active_channels) DAP_DELETE(a_client_pvt->active_channels); + a_client_pvt->active_channels = NULL; if(a_client_pvt->session_key) dap_enc_key_delete(a_client_pvt->session_key); + a_client_pvt->session_key = NULL; if(a_client_pvt->session_key_open) dap_enc_key_delete(a_client_pvt->session_key_open); + a_client_pvt->session_key_open = NULL; if(a_client_pvt->stream_key) dap_enc_key_delete(a_client_pvt->stream_key); + a_client_pvt->stream_key = NULL; //a_client_pvt->client = NULL; // DAP_DELETE(a_client_pvt); @@ -397,7 +404,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) case STAGE_STATUS_IN_PROGRESS: { switch (a_client_pvt->stage) { case STAGE_ENC_INIT: { - log_it(L_INFO, "Go to stage ENC: prepare the request"); + log_it(L_INFO, "Go to stage ENC: prepare the request"); a_client_pvt->session_key_open = dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_MSRLN, NULL, 0, NULL, 0, 0); if (!a_client_pvt->session_key_open) { log_it(L_ERROR, "Insufficient memory! May be a huge memory leak present"); @@ -438,7 +445,17 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) log_it(L_DEBUG, "STREAM_CTL request size %u", strlen(l_request)); char *l_suburl; - l_suburl = dap_strdup_printf("stream_ctl,channels=%s", a_client_pvt->active_channels); + + uint32_t l_least_common_dap_protocol = min(a_client_pvt->remote_protocol_version, + a_client_pvt->uplink_protocol_version); + + if(l_least_common_dap_protocol < 23){ + l_suburl = dap_strdup_printf("stream_ctl,channels=%s", + a_client_pvt->active_channels); + }else{ + l_suburl = dap_strdup_printf("stream_ctl,channels=%s,enc_type=%d,enc_headers=%d", + a_client_pvt->active_channels,dap_stream_get_preferred_encryption_type(),0); + } // dap_client_pvt_request_enc(a_client_pvt, DAP_UPLINK_PATH_STREAM_CTL, @@ -457,7 +474,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) a_client_pvt->stage_status = STAGE_STATUS_ERROR; break; } -#ifdef _WIN32 +#ifdef _WIN32 { int buffsize = 65536; int optsize = sizeof( int ); @@ -595,7 +612,11 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) log_it(L_ERROR, "Error state, doing callback if present"); if(a_client_pvt->stage_status_error_callback) { //dap_client_pvt_ref(a_client_pvt); - a_client_pvt->stage_status_error_callback(a_client_pvt->client, (void*)l_is_last_attempt); + if(a_client_pvt == a_client_pvt->client->_internal) + a_client_pvt->stage_status_error_callback(a_client_pvt->client, (void*) l_is_last_attempt); + else { + log_it(L_ERROR, "client_pvt->client=%x corrupted", a_client_pvt->client->_internal); + } //dap_client_pvt_unref(a_client_pvt); // Expecting that its one-shot callback //a_client_internal->stage_status_error_callback = NULL; @@ -954,13 +975,22 @@ void m_enc_init_response(dap_client_t * a_client, void * a_response, size_t a_re json_parse_count++; } } + if(json_object_get_type(val) == json_type_int) { + int val_int = (uint32_t)json_object_get_int(val); + if(!strcmp(key, "dap_protocol_version")) { + l_client_pvt->remote_protocol_version = val_int; + json_parse_count++; + } + } } // free jobj json_object_put(jobj); + if(!l_client_pvt->remote_protocol_version) + l_client_pvt->remote_protocol_version = DAP_PROTOCOL_VERSION_DEFAULT; } //char l_session_id_b64[DAP_ENC_BASE64_ENCODE_SIZE(DAP_ENC_KS_KEY_ID_SIZE) + 1] = { 0 }; //char *l_bob_message_b64 = DAP_NEW_Z_SIZE(char, a_response_size - sizeof(l_session_id_b64) + 1); - if(json_parse_count == 2) { //if (sscanf (a_response,"%s %s",l_session_id_b64, l_bob_message_b64) == 2 ){ + if(json_parse_count >= 2 && json_parse_count <=3) { //if (sscanf (a_response,"%s %s",l_session_id_b64, l_bob_message_b64) == 2 ){ l_client_pvt->session_key_id = DAP_NEW_Z_SIZE(char, strlen(l_session_id_b64) + 1); dap_enc_base64_decode(l_session_id_b64, strlen(l_session_id_b64), l_client_pvt->session_key_id, DAP_ENC_DATA_TYPE_B64); @@ -1059,14 +1089,14 @@ void m_stream_ctl_response(dap_client_t * a_client, void * a_data, size_t a_data s_stage_status_after(l_client_internal); } else { int l_arg_count; - char l_stream_id[25] = { 0 }; + char l_stream_id[26] = { 0 }; char *l_stream_key = DAP_NEW_Z_SIZE(char, 4096 * 3); - void * l_stream_key_raw = DAP_NEW_Z_SIZE(char, 4096); - size_t l_stream_key_raw_size = 0; uint32_t l_remote_protocol_version; + dap_enc_key_type_t l_enc_type = DAP_ENC_KEY_TYPE_OAES; + int l_enc_headers = 0; - l_arg_count = sscanf(l_response_str, "%25s %4096s %u" - , l_stream_id, l_stream_key, &l_remote_protocol_version); + l_arg_count = sscanf(l_response_str, "%25s %4096s %u %d %d" + , l_stream_id, l_stream_key, &l_remote_protocol_version, &l_enc_type, &l_enc_headers); if(l_arg_count < 2) { log_it(L_WARNING, "STREAM_CTL Need at least 2 arguments in reply (got %d)", l_arg_count); l_client_internal->last_error = ERROR_STREAM_CTL_ERROR_RESPONSE_FORMAT; @@ -1078,8 +1108,8 @@ void m_stream_ctl_response(dap_client_t * a_client, void * a_data, size_t a_data l_client_internal->uplink_protocol_version = l_remote_protocol_version; log_it(L_DEBUG, "Uplink protocol version %u", l_remote_protocol_version); } else - log_it(L_WARNING, "No uplink protocol version, use the default version %d" - , l_client_internal->uplink_protocol_version = DAP_PROTOCOL_VERSION); + log_it(L_WARNING, "No uplink protocol version, use legacy version %d" + , l_client_internal->uplink_protocol_version = 22); if(strlen(l_stream_id) < 13) { //log_it(L_DEBUG, "Stream server id %s, stream key length(base64 encoded) %u" @@ -1087,17 +1117,17 @@ void m_stream_ctl_response(dap_client_t * a_client, void * a_data, size_t a_data log_it(L_DEBUG, "Stream server id %s, stream key '%s'" , l_stream_id, l_stream_key); - //l_stream_key_raw_size = dap_enc_base64_decode(l_stream_key,strlen(l_stream_key), - // l_stream_key_raw,DAP_ENC_DATA_TYPE_B64); // Delete old key if present if(l_client_internal->stream_key) dap_enc_key_delete(l_client_internal->stream_key); strncpy(l_client_internal->stream_id, l_stream_id, sizeof(l_client_internal->stream_id) - 1); l_client_internal->stream_key = - dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_OAES, l_stream_key, strlen(l_stream_key), NULL, 0, + dap_enc_key_new_generate(l_enc_type, l_stream_key, strlen(l_stream_key), NULL, 0, 32); + l_client_internal->encrypted_headers = l_enc_headers; + if(l_client_internal->stage == STAGE_STREAM_CTL) { // We are on the right stage l_client_internal->stage_status = STAGE_STATUS_DONE; s_stage_status_after(l_client_internal); @@ -1115,7 +1145,6 @@ void m_stream_ctl_response(dap_client_t * a_client, void * a_data, size_t a_data } DAP_DELETE(l_stream_key); - DAP_DELETE(l_stream_key_raw); } } @@ -1216,7 +1245,11 @@ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg) log_it(L_DEBUG, "client_pvt=0x%x", l_client_pvt); if (l_client_pvt->stage_status_error_callback) { - l_client_pvt->stage_status_error_callback(l_client_pvt->client, (void *)true); + if(l_client_pvt == l_client_pvt->client->_internal) + l_client_pvt->stage_status_error_callback(l_client_pvt->client, (void *) true); + else { + log_it(L_ERROR, "client_pvt->client=%x corrupted", l_client_pvt->client->_internal); + } } dap_stream_delete(l_client_pvt->stream); l_client_pvt->stream = NULL; @@ -1329,4 +1362,3 @@ void m_es_stream_error(dap_events_socket_t * a_es, int a_arg) } log_it(L_INFO, "m_es_stream_error: code %d", a_arg); } - diff --git a/dap-sdk/net/client/include/dap_client.h b/dap-sdk/net/client/include/dap_client.h index ff2ec55fda9f9da9f5c6bb2226d49bf4b915af18..b18b329d9500dd5591081722a8b5f0f2d296dee0 100644 --- a/dap-sdk/net/client/include/dap_client.h +++ b/dap-sdk/net/client/include/dap_client.h @@ -66,8 +66,6 @@ typedef enum dap_client_error { ERROR_NETWORK_CONNECTION_TIMEOUT } dap_client_error_t; -#define DAP_CLIENT_PROTOCOL_VERSION 22 - /** * @brief The dap_client struct */ @@ -140,8 +138,6 @@ void dap_client_set_auth_cert(dap_client_t * a_client, dap_cert_t *a_cert); dap_client_stage_t dap_client_get_stage(dap_client_t * a_client); dap_client_stage_status_t dap_client_get_stage_status(dap_client_t * a_client); -#define DAP_CLIENT(a) (a ? (dap_client_t *) (a)->_inheritor : NULL) - #ifdef __cplusplus } #endif diff --git a/dap-sdk/net/client/include/dap_client_pvt.h b/dap-sdk/net/client/include/dap_client_pvt.h index 6966a2cf5f3bef2125ef8189d035df71dccbf556..c3c97473ebd5328b4a586cd05929e844383db086 100644 --- a/dap-sdk/net/client/include/dap_client_pvt.h +++ b/dap-sdk/net/client/include/dap_client_pvt.h @@ -60,7 +60,7 @@ typedef struct dap_client_internal char * active_channels; uint16_t uplink_port; uint32_t uplink_protocol_version; - + uint32_t remote_protocol_version; dap_client_stage_t stage_target; dap_client_callback_t stage_target_done_callback; @@ -77,6 +77,7 @@ typedef struct dap_client_internal int connect_attempt; bool is_encrypted; + bool encrypted_headers; bool is_reconnect; bool is_close_session;// the last request in session, in the header will be added "SessionCloseAfterRequest: true" dap_client_callback_data_size_t request_response_callback; diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c index 542be22e8779cae1cdafda25d0a92702b4b987ca..8f4f0175040fbdcce85eb79b713e99fa860fc82c 100644 --- a/dap-sdk/net/core/dap_events_socket.c +++ b/dap-sdk/net/core/dap_events_socket.c @@ -30,6 +30,7 @@ #include <errno.h> #ifndef _WIN32 #include <sys/epoll.h> +#include <sys/select.h> #include <unistd.h> #include <fcntl.h> #else @@ -39,10 +40,11 @@ #include <ws2tcpip.h> #include <io.h> #include "wepoll.h" -#include <pthread.h> #endif +#include <pthread.h> #include "dap_common.h" +#include "dap_list.h" #include "dap_worker.h" #include "dap_events.h" @@ -187,6 +189,7 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c int l_pipe[2]; int l_errno; char l_errbuf[128]; + l_errbuf[0]=0; if( pipe(l_pipe) < 0 ){ l_errno = errno; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); @@ -253,6 +256,7 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc int l_pipe[2]; int l_errno; char l_errbuf[128]; + l_errbuf[0]=0; if( pipe2(l_pipe,O_DIRECT | O_NONBLOCK ) < 0 ){ l_errno = errno; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); @@ -290,7 +294,8 @@ dap_events_socket_t * s_create_type_queue_ptr(dap_worker_t * a_w, dap_events_soc l_es->mqd = mq_open(l_mq_name,O_CREAT|O_RDWR,S_IRWXU, &l_mq_attr); if (l_es->mqd == -1 ){ int l_errno = errno; - char l_errbuf[128]={0}; + char l_errbuf[128]; + l_errbuf[0]=0; strerror_r(l_errno,l_errbuf,sizeof (l_errbuf) ); DAP_DELETE(l_es); l_es = NULL; @@ -357,7 +362,8 @@ int dap_events_socket_queue_proc_input_unsafe(dap_events_socket_t * a_esocket) ssize_t l_ret = mq_timedreceive(a_esocket->mqd,(char*) &l_queue_ptr, sizeof (l_queue_ptr),NULL,&s_timeout ); if (l_ret == -1){ int l_errno = errno; - char l_errbuf[128]={0}; + char l_errbuf[128]; + l_errbuf[0]=0; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); log_it(L_ERROR, "Error in esocket queue_ptr:\"%s\" code %d", l_errbuf, l_errno); return -1; @@ -398,6 +404,7 @@ dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_ if((l_es->fd = eventfd(0,0) ) < 0 ){ int l_errno = errno; char l_errbuf[128]; + l_errbuf[0]=0; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); switch (l_errno) { case EINVAL: log_it(L_CRITICAL, "An unsupported value was specified in flags: \"%s\" (%d)", l_errbuf, l_errno); break; @@ -459,6 +466,7 @@ void dap_events_socket_event_proc_input_unsafe(dap_events_socket_t *a_esocket) }else if ( (errno != EAGAIN) && (errno != EWOULDBLOCK) ){ // we use blocked socket for now but who knows... int l_errno = errno; char l_errbuf[128]; + l_errbuf[0]=0; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); log_it(L_WARNING, "Can't read packet from event fd: \"%s\"(%d)", l_errbuf, l_errno); }else @@ -470,6 +478,90 @@ void dap_events_socket_event_proc_input_unsafe(dap_events_socket_t *a_esocket) log_it(L_ERROR, "Queue socket %d accepted data but callback is NULL ", a_esocket->socket); } + +typedef struct dap_events_socket_buf_item +{ + dap_events_socket_t * es; + void *arg; +} dap_events_socket_buf_item_t; + +int dap_events_socket_queue_ptr_send(dap_events_socket_t * a_es, void* a_arg); + +/** + * Waits on the socket + * return 0: timeout, 1: may send data, -1 error + */ +static int wait_send_socket(int a_sockfd, long timeout_ms) +{ + struct timeval l_tv; + fd_set l_outfd, l_errfd; + + l_tv.tv_sec = timeout_ms / 1000; + l_tv.tv_usec = (timeout_ms % 1000) * 1000; + + FD_ZERO(&l_outfd); + FD_ZERO(&l_errfd); + FD_SET(a_sockfd, &l_errfd); + FD_SET(a_sockfd, &l_outfd); + + while(1) { +#ifdef DAP_OS_WINDOWS + int l_res = select(1, NULL, &l_outfd, &l_errfd, &l_tv); +#else + int l_res = select(a_sockfd + 1, NULL, &l_outfd, &l_errfd, &l_tv); +#endif + if(l_res == 0){ + log_it(L_DEBUG, "socket %d timed out", a_sockfd); + } + if(l_res == -1) { + if(errno == EINTR) + continue; + log_it(L_DEBUG, "socket %d waiting errno=%d", errno); + return l_res; + } + break; + }; + + if(FD_ISSET(a_sockfd, &l_outfd)) + return 1; + + return -1; +} + +/** + * @brief dap_events_socket_buf_thread + * @param arg + * @return + */ +void *dap_events_socket_buf_thread(void *arg) +{ + dap_events_socket_buf_item_t *l_item = (dap_events_socket_buf_item_t*) arg; + if(!l_item) { + pthread_exit(0); + } + int l_res = 0; + //int l_count = 0; + //while(l_res < 1 && l_count < 3) { + // wait max 5 min + l_res = wait_send_socket(l_item->es->fd2, 300000); + // l_count++; + //} + // if timeout or + if(l_res >= 0) + dap_events_socket_queue_ptr_send(l_item->es, l_item->arg); + DAP_DELETE(l_item); + pthread_exit(0); +} + +static void add_ptr_to_buf(dap_events_socket_t * a_es, void* a_arg) +{ + dap_events_socket_buf_item_t *l_item = DAP_NEW(dap_events_socket_buf_item_t); + l_item->es = a_es; + l_item->arg = a_arg; + pthread_t l_thread; + pthread_create(&l_thread, NULL, dap_events_socket_buf_thread, l_item); +} + /** * @brief dap_events_socket_send_event * @param a_es @@ -484,8 +576,10 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg) return 0; else{ char l_errbuf[128]; - strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); - log_it(L_ERROR, "Can't send ptr to queue:\"%s\" code %d", l_errbuf, l_errno); + log_it(L_ERROR, "Can't send ptr to queue:\"%s\" code %d", strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)), l_errno); + // Try again + if(l_errno == EAGAIN) + add_ptr_to_buf(a_es, a_arg); return l_errno; } #elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX) @@ -623,6 +717,7 @@ void dap_events_socket_set_readable_unsafe( dap_events_socket_t *sc, bool is_rea if ( epoll_ctl(sc->worker->epoll_fd, EPOLL_CTL_MOD, sc->socket, &sc->ev) == -1 ){ int l_errno = errno; char l_errbuf[128]; + l_errbuf[0]=0; strerror_r( l_errno, l_errbuf, sizeof (l_errbuf)); log_it( L_ERROR,"Can't update read client socket state in the epoll_fd: \"%s\" (%d)", l_errbuf, l_errno ); } @@ -659,6 +754,7 @@ void dap_events_socket_set_writable_unsafe( dap_events_socket_t *sc, bool a_is_r if ( epoll_ctl(sc->worker->epoll_fd, EPOLL_CTL_MOD, sc->socket, &sc->ev) ){ int l_errno = errno; char l_errbuf[128]; + l_errbuf[0]=0; strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)); log_it(L_ERROR,"Can't update write client socket state in the epoll_fd %d: \"%s\" (%d)", sc->worker->epoll_fd, l_errbuf, l_errno); diff --git a/dap-sdk/net/core/dap_worker.c b/dap-sdk/net/core/dap_worker.c index ea14d3a1feb2c0e2df2dcf48797a764972f2d5b4..6a2081799ab911a6521789ccc5478ea2be0922ec 100644 --- a/dap-sdk/net/core/dap_worker.c +++ b/dap-sdk/net/core/dap_worker.c @@ -134,6 +134,9 @@ void *dap_worker_thread(void *arg) //if(!(events[n].events & EPOLLIN)) //cur->no_close = false; if (l_sock_err) { + dap_events_socket_set_readable_unsafe(l_cur, false); + dap_events_socket_set_writable_unsafe(l_cur, false); + l_cur->buf_out_size = 0; l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; log_it(L_INFO, "Socket shutdown (EPOLLHUP): %s", strerror(l_sock_err)); } @@ -150,6 +153,9 @@ void *dap_worker_thread(void *arg) log_it(L_ERROR, "Socket error: %s", strerror(l_sock_err)); default: ; } + dap_events_socket_set_readable_unsafe(l_cur, false); + dap_events_socket_set_writable_unsafe(l_cur, false); + l_cur->buf_out_size = 0; l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; l_cur->callbacks.error_callback(l_cur, 0); // Call callback to process error event } @@ -157,7 +163,10 @@ void *dap_worker_thread(void *arg) if (l_epoll_events[n].events & EPOLLRDHUP) { log_it(L_INFO, "Client socket disconnected"); dap_events_socket_set_readable_unsafe(l_cur, false); + dap_events_socket_set_writable_unsafe(l_cur, false); + l_cur->buf_out_size = 0; l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + } if(l_epoll_events[n].events & EPOLLIN) { @@ -244,6 +253,7 @@ void *dap_worker_thread(void *arg) log_it(L_ERROR, "Some error occured in recv() function: %s", strerror(errno)); dap_events_socket_set_readable_unsafe(l_cur, false); l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + l_cur->buf_out_size = 0; } } else if (!(l_epoll_events[n].events & EPOLLRDHUP) || !(l_epoll_events[n].events & EPOLLERR)) { @@ -308,6 +318,8 @@ void *dap_worker_thread(void *arg) if (l_errno != EAGAIN && l_errno != EWOULDBLOCK ){ // If we have non-blocking socket log_it(L_ERROR, "Some error occured in send(): %s", strerror(errno)); l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE; + l_cur->buf_out_size = 0; + } }else{ @@ -349,7 +361,7 @@ void *dap_worker_thread(void *arg) } } // while - + log_it(L_NOTICE,"Exiting thread #%u", l_worker->id); return NULL; } diff --git a/dap-sdk/net/core/include/dap_events_socket.h b/dap-sdk/net/core/include/dap_events_socket.h index 81bf6a0098557b55c6d8db40b8fda629ea7aadb4..68020f1a339071b7269d987bb897aee2b45f6d7f 100644 --- a/dap-sdk/net/core/include/dap_events_socket.h +++ b/dap-sdk/net/core/include/dap_events_socket.h @@ -95,6 +95,7 @@ typedef struct dap_events_socket_callbacks { dap_events_socket_callback_queue_t queue_callback; // Queue callback for listening socket dap_events_socket_callback_queue_ptr_t queue_ptr_callback; // queue_ptr callback for listening socket }; + dap_events_socket_callback_t new_callback; // Create new client callback dap_events_socket_callback_t delete_callback; // Delete client callback dap_events_socket_callback_t read_callback; // Read function diff --git a/dap-sdk/net/server/enc_server/dap_enc_http.c b/dap-sdk/net/server/enc_server/dap_enc_http.c index eabc22b6f6c1fa57f3781f051c181a719824cc5f..d8db585694eb6436166931bfcef48028aef3273f 100644 --- a/dap-sdk/net/server/enc_server/dap_enc_http.c +++ b/dap-sdk/net/server/enc_server/dap_enc_http.c @@ -75,6 +75,7 @@ static void _enc_http_write_reply(struct dap_http_simple *cl_st, struct json_object *jobj = json_object_new_object(); json_object_object_add(jobj, "encrypt_id", json_object_new_string(encrypt_id)); json_object_object_add(jobj, "encrypt_msg", json_object_new_string(encrypt_msg)); + json_object_object_add(jobj, "dap_protocol_version", json_object_new_int(DAP_PROTOCOL_VERSION)); const char* json_str = json_object_to_json_string(jobj); dap_http_simple_reply(cl_st, (void*) json_str, (size_t) strlen(json_str)); diff --git a/dap-sdk/net/stream/stream/dap_stream.c b/dap-sdk/net/stream/stream/dap_stream.c index 35b1dc91dc4cc1c10d8a467f1eef7a603cb5719a..e9598cdf910e84952313b2e19feb0793448a701b 100644 --- a/dap-sdk/net/stream/stream/dap_stream.c +++ b/dap-sdk/net/stream/stream/dap_stream.c @@ -90,11 +90,28 @@ bool dap_stream_get_dump_packet_headers(){ return s_dump_packet_headers; } static struct timespec keepalive_loop_sleep = { 0, STREAM_KEEPALIVE_TIMEOUT * 1000 * 1000 }; static bool s_detect_loose_packet(dap_stream_t * a_stream); +dap_enc_key_type_t s_stream_get_preferred_encryption_type = DAP_ENC_KEY_TYPE_IAES; + +void s_dap_stream_load_preferred_encryption_type(dap_config_t * a_config){ + const char * l_preferred_encryption_name = dap_config_get_item_str(a_config, "stream", "preferred_encryption"); + if(l_preferred_encryption_name){ + dap_enc_key_type_t l_found_key_type = dap_enc_key_type_find_by_name(l_preferred_encryption_name); + if(l_found_key_type != DAP_ENC_KEY_TYPE_INVALID) + s_stream_get_preferred_encryption_type = l_found_key_type; + } + + log_it(L_NOTICE,"ecryption type is set to %s", dap_enc_get_type_name(s_stream_get_preferred_encryption_type)); +} + +dap_enc_key_type_t dap_stream_get_preferred_encryption_type(){ + return s_stream_get_preferred_encryption_type; +} + /** * @brief stream_init Init stream module * @return 0 if ok others if not */ -int dap_stream_init( bool a_dump_packet_headers) +int dap_stream_init(dap_config_t * a_config) { if( dap_stream_ch_init() != 0 ){ log_it(L_CRITICAL, "Can't init channel types submodule"); @@ -105,7 +122,8 @@ int dap_stream_init( bool a_dump_packet_headers) return -2; } - s_dump_packet_headers = a_dump_packet_headers; + s_dap_stream_load_preferred_encryption_type(a_config); + s_dump_packet_headers = dap_config_get_item_bool_default(g_config,"general","debug_dump_stream_headers",false); s_keep_alive_loop_quit_signal = false; pthread_mutex_init( &s_mutex_keepalive_list, NULL ); //pthread_create( &keepalive_thread, NULL, stream_loop, NULL ); diff --git a/dap-sdk/net/stream/stream/dap_stream_ctl.c b/dap-sdk/net/stream/stream/dap_stream_ctl.c index ef5d72345ae8dbb122417fbf8f92a739da045fd1..dc82ed592ccacebcd472a7bf682e888faec5b6a1 100644 --- a/dap-sdk/net/stream/stream/dap_stream_ctl.c +++ b/dap-sdk/net/stream/stream/dap_stream_ctl.c @@ -69,17 +69,15 @@ static struct { dap_enc_key_type_t type; } s_socket_forward_key; - /** * @brief stream_ctl_init Initialize stream control module * @return Zero if ok others if not */ -int dap_stream_ctl_init(dap_enc_key_type_t socket_forward_key_type, - size_t socket_forward_key_size) +int dap_stream_ctl_init() { - s_socket_forward_key.type = socket_forward_key_type; - s_socket_forward_key.size = socket_forward_key_size; - log_it(L_NOTICE,"Initialized stream control module"); + s_socket_forward_key.size = 32; // Why do we set it, not autodeceting? + s_socket_forward_key.type = dap_stream_get_preferred_encryption_type(); + return 0; } @@ -121,17 +119,22 @@ void s_proc(struct dap_http_simple *a_http_simple, void * a_arg) if(l_dg){ size_t l_channels_str_size = sizeof(ss->active_channels); char l_channels_str[sizeof(ss->active_channels)]; - if(l_dg->url_path && strlen(l_dg->url_path) < 30 && - sscanf(l_dg->url_path, "stream_ctl,channels=%s", l_channels_str) == 1) { + dap_enc_key_type_t l_enc_type = s_socket_forward_key.type; + int l_enc_headers = 0; + bool l_is_legacy=false; + int l_url_sscanf_res = sscanf(l_dg->url_path, "stream_ctl,channels=%16s,enc_type=%d,enc_headers=%d", l_channels_str, &l_enc_type, &l_enc_headers); + if(l_url_sscanf_res > 0){ + if(l_url_sscanf_res < 3){ + log_it(L_INFO, "legacy encryption mode used (OAES)"); + l_enc_type = DAP_ENC_KEY_TYPE_OAES; + l_is_legacy = true; + } l_new_session = true; } else if(strcmp(l_dg->url_path, "socket_forward" ) == 0) { l_channels_str[0] = '\0'; l_new_session = true; } - /* }else if (strcmp(dg->url_path,"stream_ctl")==0) { - l_new_session = true; - }*/ else{ log_it(L_ERROR,"ctl command unknown: %s",l_dg->url_path); enc_http_delegate_delete(l_dg); @@ -144,7 +147,7 @@ void s_proc(struct dap_http_simple *a_http_simple, void * a_arg) strncpy(ss->active_channels, l_channels_str, l_channels_str_size); char *key_str = calloc(1, KEX_KEY_STR_SIZE+1); dap_random_string_fill(key_str, KEX_KEY_STR_SIZE); - ss->key = dap_enc_key_new_generate( s_socket_forward_key.type, key_str, KEX_KEY_STR_SIZE, + ss->key = dap_enc_key_new_generate( l_enc_type, key_str, KEX_KEY_STR_SIZE, NULL, 0, s_socket_forward_key.size); dap_http_header_t *l_hdr_key_id = dap_http_header_find(a_http_simple->http_client->in_headers, "KeyID"); if (l_hdr_key_id) { @@ -156,7 +159,10 @@ void s_proc(struct dap_http_simple *a_http_simple, void * a_arg) } ss->acl = l_ks_key->acl_list; } - enc_http_reply_f(l_dg,"%u %s",ss->id,key_str); + if (l_is_legacy) + enc_http_reply_f(l_dg,"%u %s",ss->id, key_str); + else + enc_http_reply_f(l_dg,"%u %s %u %d %d",ss->id, key_str, DAP_PROTOCOL_VERSION, l_enc_type, l_enc_headers); *return_code = Http_Status_OK; log_it(L_INFO," New stream session %u initialized",ss->id); diff --git a/dap-sdk/net/stream/stream/include/dap_stream.h b/dap-sdk/net/stream/stream/include/dap_stream.h index 888be346b5ae1b3b43b0d3aea5bc9004bfc999e1..0c0325bc3064b3903506d466f48e435025e78162 100644 --- a/dap-sdk/net/stream/stream/include/dap_stream.h +++ b/dap-sdk/net/stream/stream/include/dap_stream.h @@ -26,6 +26,8 @@ #include <pthread.h> #include <stdbool.h> #include <pthread.h> + +#include "dap_config.h" #include "dap_stream_session.h" #include "dap_stream_ch.h" @@ -91,7 +93,7 @@ typedef struct dap_stream { #define DAP_STREAM(a) ((dap_stream_t *) (a)->_inheritor ) -int dap_stream_init(bool a_dump_packet_headers); +int dap_stream_init(dap_config_t * g_config); bool dap_stream_get_dump_packet_headers(); @@ -110,4 +112,6 @@ void dap_stream_proc_pkt_in(dap_stream_t * sid); void dap_stream_es_rw_states_update(struct dap_stream *a_stream); void dap_stream_set_ready_to_write(dap_stream_t * a_stream,bool a_is_ready); +dap_enc_key_type_t dap_stream_get_preferred_encryption_type(); + diff --git a/dap-sdk/net/stream/stream/include/dap_stream_ctl.h b/dap-sdk/net/stream/stream/include/dap_stream_ctl.h index 56fe6e995554b46dbe186ffb0906712194979452..372ea969075bfa899710d2b142a846f173f06594 100644 --- a/dap-sdk/net/stream/stream/include/dap_stream_ctl.h +++ b/dap-sdk/net/stream/stream/include/dap_stream_ctl.h @@ -21,11 +21,11 @@ #pragma once #include "dap_enc.h" +#include "dap_config.h" typedef struct dap_http dap_http_t; #define KEX_KEY_STR_SIZE 128 -int dap_stream_ctl_init(dap_enc_key_type_t socket_forward_key_type, - size_t socket_forward_key_size); +int dap_stream_ctl_init(); void dap_stream_ctl_deinit(); void dap_stream_ctl_add_proc(struct dap_http * sh, const char * url); diff --git a/modules/channel/chain-net/dap_stream_ch_chain_net.c b/modules/channel/chain-net/dap_stream_ch_chain_net.c index 3bc1d829ddea5eb4a4a7e0883de688ec00334cc3..f06679a9a91c0de75cf2280673c780f240fefde6 100644 --- a/modules/channel/chain-net/dap_stream_ch_chain_net.c +++ b/modules/channel/chain-net/dap_stream_ch_chain_net.c @@ -170,9 +170,11 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg) (void) a_arg; //printf("* del session=%d\n", a_ch->stream->session->id); dap_stream_ch_chain_net_t * l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(a_ch); - pthread_mutex_lock(&l_ch_chain_net->mutex); - session_data_del(a_ch->stream->session->id); - pthread_mutex_unlock(&l_ch_chain_net->mutex); + if(l_ch_chain_net) { + pthread_mutex_lock(&l_ch_chain_net->mutex); + session_data_del(a_ch->stream->session->id); + pthread_mutex_unlock(&l_ch_chain_net->mutex); + } } /** diff --git a/modules/channel/chain/dap_stream_ch_chain.c b/modules/channel/chain/dap_stream_ch_chain.c index 607a183509803aa709b230b7353c527da02fae72..1d6eb6a2d6f28c2ec9e75f885f077a0e4285278b 100644 --- a/modules/channel/chain/dap_stream_ch_chain.c +++ b/modules/channel/chain/dap_stream_ch_chain.c @@ -161,7 +161,7 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg) l_ch_chain->state = CHAIN_STATE_SYNC_CHAINS; dap_chain_node_addr_t l_node_addr = { 0 }; dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id); - l_node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; + l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN, l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); @@ -200,7 +200,7 @@ bool s_sync_gdb_callback(dap_proc_thread_t *a_thread, void *a_arg) dap_list_t *l_add_groups = dap_chain_net_get_add_gdb_group(l_net, l_ch_chain->request.node_addr); dap_db_log_list_t *l_db_log = dap_db_log_list_start(l_start_item + 1, l_add_groups); dap_chain_node_addr_t l_node_addr = { 0 }; - l_node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; + l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB, l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, &l_node_addr, sizeof(dap_chain_node_addr_t)); @@ -213,7 +213,7 @@ bool s_sync_gdb_callback(dap_proc_thread_t *a_thread, void *a_arg) } else { dap_stream_ch_chain_sync_request_t l_request = {}; //log_it(L_DEBUG, "No items to sync from %u", l_request->id_start + 1); - l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; + l_request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, l_ch_chain->request_net_id, l_ch_chain->request_chain_id, @@ -559,9 +559,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_chain_sync_request_t l_sync_gdb = {}; memcpy(&l_sync_gdb, l_chain_pkt->data, l_chain_pkt_data_size); dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id); - l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr(l_net) ? - dap_chain_net_get_cur_addr(l_net)->uint64 : - dap_db_get_cur_node_addr(l_net->pub.name); + l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); // Get last timestamp in log l_sync_gdb.id_start = (uint64_t) dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); // no limit @@ -678,7 +676,7 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg) // last message dap_stream_ch_chain_sync_request_t l_request = {}; dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id); - l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0; + l_request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64); l_request.id_end = 0; diff --git a/modules/consensus/dag-pos/dap_chain_cs_dag_pos.c b/modules/consensus/dag-pos/dap_chain_cs_dag_pos.c index 597ea51c63afaf5db0f92c5567c1fc847866389c..cb37b8d615f7b32d1c94b6e4ca17a8f634898354 100644 --- a/modules/consensus/dag-pos/dap_chain_cs_dag_pos.c +++ b/modules/consensus/dag-pos/dap_chain_cs_dag_pos.c @@ -231,9 +231,9 @@ static int s_callback_event_verify(dap_chain_cs_dag_t * a_dag, dap_chain_cs_dag_ dap_chain_addr_t l_addr = { 0 }; for ( size_t l_sig_pos=0; l_sig_pos < a_dag_event->header.signs_count; l_sig_pos++ ){ - dap_sign_t * l_sign = dap_chain_cs_dag_event_get_sign(a_dag_event, 0,a_dag_event_size); + dap_sign_t * l_sign = dap_chain_cs_dag_event_get_sign(a_dag_event, a_dag_event_size,l_sig_pos); if ( l_sign == NULL){ - log_it(L_WARNING, "Event is NOT signed with anything"); + log_it(L_WARNING, "Event is NOT signed with anything: sig pos %zd, event size %zd", a_dag_event_size); return -4; } diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index fb3731e98df40e5610f20bdd12398dc30b09f9d5..586c36a59f959d3d6797e5a86edfa1384832c7de 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -344,10 +344,8 @@ static void s_chain_callback_notify(void * a_arg, dap_chain_t *a_chain, dap_chai */ static int s_net_states_proc(dap_chain_net_t * l_net) { - dap_chain_net_pvt_t *l_pvt_net = PVT(l_net); - - int ret=0; + int ret = 0; switch (l_pvt_net->state) { case NET_STATE_OFFLINE: { @@ -373,64 +371,89 @@ static int s_net_states_proc(dap_chain_net_t * l_net) case NET_STATE_LINKS_PREPARE: { log_it(L_NOTICE,"%s.state: NET_STATE_LINKS_PREPARE",l_net->pub.name); + uint64_t l_own_addr = dap_chain_net_get_cur_addr_int(l_net); if (l_pvt_net->node_info) { for (size_t i = 0; i < l_pvt_net->node_info->hdr.links_number; i++) { dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(l_net, &l_pvt_net->node_info->links[i]); - if (!l_link_node_info || l_link_node_info->hdr.address.uint64 == l_pvt_net->node_info->hdr.address.uint64) { + if (!l_link_node_info || l_link_node_info->hdr.address.uint64 == l_own_addr) { continue; // Do not link with self } l_pvt_net->links_info = dap_list_append(l_pvt_net->links_info, l_link_node_info); + if (dap_list_length(l_pvt_net->links_info) >= s_max_links_count) { + break; + } } } else { log_it(L_WARNING,"No nodeinfo in global_db to prepare links for connecting, find nearest 3 links and fill global_db"); } - if (!l_pvt_net->seed_aliases_count) { - log_it(L_ERROR, "No root servers present in configuration file. Can't establish DNS requests"); - l_pvt_net->state_target = l_pvt_net->state = NET_STATE_OFFLINE; - break; - } - switch (l_pvt_net->node_role.enums) { - case NODE_ROLE_ROOT: - case NODE_ROLE_ROOT_MASTER: - case NODE_ROLE_ARCHIVE: - case NODE_ROLE_CELL_MASTER: { - // Add other root nodes as synchronization links - for (int i = 0; i < MIN(s_max_links_count, l_pvt_net->seed_aliases_count); i++) { - dap_chain_node_addr_t *l_link_addr = dap_chain_node_alias_find(l_net, l_pvt_net->seed_aliases[i]); - dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(l_net, l_link_addr); - l_pvt_net->links_info = dap_list_append(l_pvt_net->links_info, l_link_node_info); + if (dap_list_length(l_pvt_net->links_info) < s_max_links_count) { + if (l_pvt_net->seed_aliases_count) { + switch (l_pvt_net->node_role.enums) { + case NODE_ROLE_ROOT: + case NODE_ROLE_ROOT_MASTER: + case NODE_ROLE_ARCHIVE: + case NODE_ROLE_CELL_MASTER: { + // Add other root nodes as synchronization links + for (int i = 0; i < MIN(s_max_links_count, l_pvt_net->seed_aliases_count); i++) { + dap_chain_node_addr_t *l_link_addr = dap_chain_node_alias_find(l_net, l_pvt_net->seed_aliases[i]); + if (l_link_addr->uint64 == l_own_addr) { + continue; // Do not link with self + } + dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(l_net, l_link_addr); + if(l_link_node_info) { + l_pvt_net->links_info = dap_list_append(l_pvt_net->links_info, l_link_node_info); + } + else{ + log_it(L_WARNING, "Not found link "NODE_ADDR_FP_STR" in the node list", NODE_ADDR_FPS_ARGS(l_link_addr)); + } + if (dap_list_length(l_pvt_net->links_info) >= s_max_links_count) { + break; + } + } + } break; + case NODE_ROLE_FULL: + case NODE_ROLE_MASTER: + case NODE_ROLE_LIGHT: + default: { + // Get DNS request result from root nodes as synchronization links + while (dap_list_length(l_pvt_net->links_info) < s_max_links_count) { + int i = rand() % l_pvt_net->seed_aliases_count; + dap_chain_node_addr_t *l_remote_addr = dap_chain_node_alias_find(l_net, l_pvt_net->seed_aliases[i]); + dap_chain_node_info_t *l_remote_node_info = dap_chain_node_info_read(l_net, l_remote_addr); + if(l_remote_node_info) { + dap_chain_node_info_t *l_link_node_info = DAP_NEW_Z(dap_chain_node_info_t); + int res = 0; //dap_dns_client_get_addr(l_remote_node_info->hdr.ext_addr_v4.s_addr, l_net->pub.name, l_link_node_info); + memcpy(l_link_node_info, l_remote_node_info, sizeof(dap_chain_node_info_t)); + if (l_link_node_info->hdr.address.uint64 != l_own_addr) { + l_pvt_net->links_info = dap_list_append(l_pvt_net->links_info, l_link_node_info); + } + DAP_DELETE(l_remote_node_info); + } + else{ + log_it(L_WARNING, "Not found link "NODE_ADDR_FP_STR" in the node list", NODE_ADDR_FPS_ARGS(l_remote_addr)); + } + if (l_pvt_net->state_target == NET_STATE_OFFLINE) { + l_pvt_net->state = NET_STATE_OFFLINE; + break; + } + } + } break; } - } break; - case NODE_ROLE_FULL: - case NODE_ROLE_MASTER: - case NODE_ROLE_LIGHT: - default: { - // Get DNS request result from root nodes as synchronization links - while (dap_list_length(l_pvt_net->links_info) < s_max_links_count) { - int i = rand() % l_pvt_net->seed_aliases_count; - dap_chain_node_addr_t *l_remote_addr = dap_chain_node_alias_find(l_net, l_pvt_net->seed_aliases[i]); - dap_chain_node_info_t *l_remote_node_info = dap_chain_node_info_read(l_net, l_remote_addr); - dap_chain_node_info_t *l_link_node_info = DAP_NEW_Z(dap_chain_node_info_t); - int res = 0;//dap_dns_client_get_addr(l_remote_node_info->hdr.ext_addr_v4.s_addr, l_net->pub.name, l_link_node_info); - memcpy(l_link_node_info, l_remote_node_info, sizeof(dap_chain_node_info_t)); - DAP_DELETE(l_remote_node_info); - if (res) { - DAP_DELETE(l_link_node_info); - } else { - l_pvt_net->links_info = dap_list_append(l_pvt_net->links_info, l_link_node_info); - } - if (l_pvt_net->state_target == NET_STATE_OFFLINE) { - l_pvt_net->state = NET_STATE_OFFLINE; - break; - } + } else { + log_it(L_ERROR, "No root servers present in configuration file. Can't establish DNS requests"); + if (!dap_list_length(l_pvt_net->links_info)) { // No links can be prepared, go offline + l_pvt_net->state_target = NET_STATE_OFFLINE; } - } break; + } } if (l_pvt_net->state_target != NET_STATE_OFFLINE) { - l_pvt_net->state = NET_STATE_LINKS_CONNECTING; - log_it(L_DEBUG, "Prepared %u links, start to establish them", dap_list_length(l_pvt_net->links_info)); + if (dap_list_length(l_pvt_net->links_info)) { + l_pvt_net->state = NET_STATE_LINKS_CONNECTING; + log_it(L_DEBUG, "Prepared %u links, start to establish them", dap_list_length(l_pvt_net->links_info)); + } + // If no links prepared go again to NET_STATE_LINKS_PREPARE } else { - l_pvt_net->state = l_pvt_net->state_target = NET_STATE_OFFLINE; + l_pvt_net->state = NET_STATE_OFFLINE; } } break; @@ -447,13 +470,11 @@ static int s_net_states_proc(dap_chain_net_t * l_net) log_it(L_DEBUG, "Established connection with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address)); l_pvt_net->links = dap_list_append(l_pvt_net->links, l_node_client); } else { + log_it(L_DEBUG, "Can't establish link with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address)); dap_chain_node_client_close(l_node_client); l_node_client = NULL; } - } - if (!l_node_client) { - log_it(L_DEBUG, "Can't establish link with "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_link_info->hdr.address)); - } + } if (dap_list_length(l_pvt_net->links) >= s_required_links_count) { break; } @@ -484,9 +505,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net) // no limit l_sync_gdb.id_end = (uint64_t)0; - l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr(l_net) ? - dap_chain_net_get_cur_addr(l_net)->uint64 : - dap_db_get_cur_node_addr(l_net->pub.name); + l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net); log_it(L_DEBUG, "Prepared request to gdb sync from %llu to %llu", l_sync_gdb.id_start, l_sync_gdb.id_end); // find dap_chain_id_t dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb"); diff --git a/modules/type/dag/dap_chain_cs_dag.c b/modules/type/dag/dap_chain_cs_dag.c index c0de3cc2d7d37f9955134fec723f3fe6d9adebf8..6385aacf84141e8111d2e39249a4a947de32bbd8 100644 --- a/modules/type/dag/dap_chain_cs_dag.c +++ b/modules/type/dag/dap_chain_cs_dag.c @@ -482,7 +482,6 @@ static size_t s_chain_callback_datums_pool_proc(dap_chain_t * a_chain, dap_chain if(l_dag->callback_cs_event_create) l_event = l_dag->callback_cs_event_create(l_dag,l_datum,l_hashes,l_hashes_linked,&l_event_size); if ( l_event&&l_event_size){ // Event is created - if (l_dag->is_add_directy) { if (s_chain_callback_atom_add(a_chain, l_event, l_event_size) == ATOM_ACCEPT) { // add events to file @@ -593,24 +592,29 @@ static dap_chain_atom_verify_res_t s_chain_callback_atom_verify(dap_chain_t * a_ } // genesis or seed mode if (l_event->header.hash_count == 0){ - if(s_seed_mode && !PVT(l_dag)->events) - return ATOM_ACCEPT; - - if (l_dag->is_static_genesis_event ){ - dap_chain_hash_fast_t l_event_hash; - dap_chain_cs_dag_event_calc_hash(l_event,a_atom_size, &l_event_hash); - if ( memcmp( &l_event_hash, &l_dag->static_genesis_event_hash, sizeof(l_event_hash) ) != 0 ){ - char * l_event_hash_str = dap_chain_hash_fast_to_str_new(&l_event_hash); - char * l_genesis_event_hash_str = dap_chain_hash_fast_to_str_new(&l_dag->static_genesis_event_hash); - - log_it(L_WARNING, "Wrong genesis block %s (staticly predefined %s)",l_event_hash_str, l_genesis_event_hash_str); - DAP_DELETE(l_event_hash_str); - DAP_DELETE(l_genesis_event_hash_str); - return ATOM_REJECT; - }else{ - return ATOM_ACCEPT; + if(s_seed_mode && !PVT(l_dag)->events){ + log_it(L_NOTICE,"Accepting genesis event"); + return ATOM_ACCEPT; + }else if(s_seed_mode){ + log_it(L_WARNING,"Cant accept genesis event: already present data in DAG, ->events is not NULL"); + return ATOM_REJECT; + } + + if (l_dag->is_static_genesis_event ){ + dap_chain_hash_fast_t l_event_hash; + dap_chain_cs_dag_event_calc_hash(l_event,a_atom_size, &l_event_hash); + if ( memcmp( &l_event_hash, &l_dag->static_genesis_event_hash, sizeof(l_event_hash) ) != 0 ){ + char * l_event_hash_str = dap_chain_hash_fast_to_str_new(&l_event_hash); + char * l_genesis_event_hash_str = dap_chain_hash_fast_to_str_new(&l_dag->static_genesis_event_hash); + + log_it(L_WARNING, "Wrong genesis block %s (staticly predefined %s)",l_event_hash_str, l_genesis_event_hash_str); + DAP_DELETE(l_event_hash_str); + DAP_DELETE(l_genesis_event_hash_str); + return ATOM_REJECT; + }else{ + return ATOM_ACCEPT; + } } - } } //chain coherence @@ -787,6 +791,7 @@ static dap_chain_atom_iter_t* s_chain_callback_atom_iter_create_from(dap_chain_t dap_chain_atom_iter_t * l_atom_iter = DAP_NEW_Z(dap_chain_atom_iter_t); l_atom_iter->chain = a_chain; l_atom_iter->cur = a_atom; + l_atom_iter->cur_size = a_atom_size; if ( a_atom ){ dap_chain_hash_fast_t l_atom_hash; @@ -976,8 +981,8 @@ static dap_chain_atom_ptr_t s_chain_callback_atom_iter_get_next( dap_chain_atom_ l_event_item = (dap_chain_cs_dag_event_item_t*) a_atom_iter->cur_item; // if l_event_item=NULL then items are over a_atom_iter->cur = l_event_item ? l_event_item->event : NULL; + a_atom_iter->cur_size = a_atom_iter->cur ? l_event_item->event_size : 0; } - a_atom_iter->cur_size = a_atom_iter->cur?a_atom_iter->cur_size: 0; if(a_atom_size) *a_atom_size = a_atom_iter->cur_size; diff --git a/modules/type/dag/dap_chain_cs_dag_event.c b/modules/type/dag/dap_chain_cs_dag_event.c index a9d1d755b57b6591a1c395fe59d05aeebc0c3e34..3b39ba72c0c4cff5530c11a0b2f8e1c3e1325bb3 100644 --- a/modules/type/dag/dap_chain_cs_dag_event.c +++ b/modules/type/dag/dap_chain_cs_dag_event.c @@ -77,9 +77,10 @@ dap_chain_cs_dag_event_t * dap_chain_cs_dag_event_new(dap_chain_id_t a_chain_id, size_t l_sign_size = dap_sign_get_size(l_sign); l_event_new = (dap_chain_cs_dag_event_t* )DAP_REALLOC(l_event_new,l_event_size+l_sign_size ); memcpy(l_event_new->hashes_n_datum_n_signs + l_hashes_size + l_datum_size, l_sign, l_sign_size); - *a_event_size += l_sign_size; l_event_size += l_sign_size; + *a_event_size = l_event_size; l_event_new->header.signs_count++; + log_it(L_INFO,"Created event size %zd, signed with sign size %zd", l_event_size, l_sign_size); DAP_DELETE(l_sign); }else { log_it(L_ERROR,"Can't sign dag event!");