diff --git a/core/include/dap_common.h b/core/include/dap_common.h index 0b9f115e2eb089a95a39616b3493f9274a04eb23..ac24502bbab7df75f0b34f2e2bee98ee19291ff6 100755 --- a/core/include/dap_common.h +++ b/core/include/dap_common.h @@ -302,11 +302,12 @@ DAP_STATIC_INLINE void _dap_page_aligned_free(void *ptr) { /* * 23: added support for encryption key type parameter and option to encrypt headers * 24: Update hashes protocol + * 25: Added node sign */ -#define DAP_PROTOCOL_VERSION 24 +#define DAP_PROTOCOL_VERSION 25 #define DAP_PROTOCOL_VERSION_DEFAULT 24 // used if version is not explicitly specified -#define DAP_CLIENT_PROTOCOL_VERSION 24 +#define DAP_CLIENT_PROTOCOL_VERSION 25 #if (__SIZEOF_LONG__ == 4) || defined (DAP_OS_DARWIN) #define DAP_UINT64_FORMAT_X "llX" @@ -772,3 +773,23 @@ int exec_silent(const char *a_cmd); #ifdef __cplusplus } #endif + +#if __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ +#define NODE_ADDR_FP_STR "%04hX::%04hX::%04hX::%04hX" +#define NODE_ADDR_FP_ARGS(a) a->words[2],a->words[3],a->words[0],a->words[1] +#define NODE_ADDR_FPS_ARGS(a) &a->words[2],&a->words[3],&a->words[0],&a->words[1] +#define NODE_ADDR_FP_ARGS_S(a) a.words[2],a.words[3],a.words[0],a.words[1] +#define NODE_ADDR_FPS_ARGS_S(a) &a.words[2],&a.words[3],&a.words[0],&a.words[1] +#else +#define NODE_ADDR_FP_STR "%04hX::%04hX::%04hX::%04hX" +#define NODE_ADDR_FP_ARGS(a) a->words[3],a->words[2],a->words[1],a->words[0] +#define NODE_ADDR_FPS_ARGS(a) &a->words[3],&a->words[2],&a->words[1],&a->words[0] +#define NODE_ADDR_FP_ARGS_S(a) a.words[3],a.words[2],a.words[1],a.words[0] +#define NODE_ADDR_FPS_ARGS_S(a) &a.words[3],&a.words[2],&a.words[1],&a.words[0] +#endif + +typedef union dap_stream_node_addr { + uint64_t uint64; + uint16_t words[sizeof(uint64_t)/2]; + uint8_t raw[sizeof(uint64_t)]; // Access to selected octects +} DAP_ALIGN_PACKED dap_stream_node_addr_t; \ No newline at end of file diff --git a/net/client/dap_client.c b/net/client/dap_client.c index bec4a18cfb36b8a5f068a50701e4d5a7b1201175..a60f3061bad0db995ee02464608b04a1ebaae8c5 100644 --- a/net/client/dap_client.c +++ b/net/client/dap_client.c @@ -610,10 +610,10 @@ dap_stream_ch_t * dap_client_get_stream_ch_unsafe(dap_client_t * a_client, uint8 * @param a_client * @return */ -const char * dap_client_get_stream_id(dap_client_t * a_client) +uint32_t dap_client_get_stream_id(dap_client_t * a_client) { if(!(a_client || !DAP_CLIENT_PVT(a_client))) - return NULL; + return 0; return DAP_CLIENT_PVT(a_client)->stream_id; } diff --git a/net/client/dap_client_pvt.c b/net/client/dap_client_pvt.c index f5c57059e62ab54fb0411dcb224c6a952436c6ac..526e82e12a78e70d916f69fb29e85011b5a8bbe6 100644 --- a/net/client/dap_client_pvt.c +++ b/net/client/dap_client_pvt.c @@ -68,6 +68,8 @@ static time_t s_client_timeout_active_after_connect_seconds = 15; static void s_stage_status_after(dap_client_pvt_t * a_client_internal); +static int s_add_cert_sign_to_data(const dap_cert_t *a_cert, uint8_t **a_data, size_t *a_size, const void* a_signing_data, size_t a_signing_size); +static int s_json_multy_obj_parse_str(const char *a_key, const char *a_val, int a_count, ...); // ENC stage callbacks static void s_enc_init_response(dap_client_t *a_client, void *a_data, size_t a_data_size); @@ -142,7 +144,7 @@ static void s_client_internal_clean(dap_client_pvt_t *a_client_pvt) dap_stream_delete_unsafe(a_client_pvt->stream); a_client_pvt->stream = NULL; a_client_pvt->stream_es = NULL; - a_client_pvt->stream_id[0] = 0; + a_client_pvt->stream_id = 0; a_client_pvt->stream_key = NULL; } @@ -324,6 +326,33 @@ static bool s_timer_reconnect_callback(void *a_arg) return false; } +/** + * @brief s_add_cert_sign_to_data + * @param a_cert - certificate to get sign + * @param a_size - input/output data size + * @param a_data - input/output data + * @param a_signing_data - signing data + * @param a_signing_size - signing data size + */ +int s_add_cert_sign_to_data(const dap_cert_t *a_cert, uint8_t **a_data, size_t *a_size, const void* a_signing_data, size_t a_signing_size) +{ + dap_return_val_if_pass(!a_cert || !a_size || !a_data, 0); + + dap_sign_t *l_sign = dap_sign_create(a_cert->enc_key, a_signing_data, a_signing_size, 0); + dap_return_val_if_pass(!l_sign, 0); + + size_t l_sign_size = dap_sign_get_size(l_sign); + *a_data = DAP_REALLOC(*a_data, (*a_size + l_sign_size) * sizeof(uint8_t)); + if (!*a_data) { + log_it(L_CRITICAL, "Memory allocation error"); + return 0; + } + memcpy(*a_data + *a_size, l_sign, l_sign_size); + *a_size += l_sign_size; + DAP_DELETE(l_sign); + return 1; +} + /** * @brief s_client_internal_stage_status_proc * @param a_client @@ -368,38 +397,34 @@ static void s_stage_status_after(dap_client_pvt_t *a_client_pvt) a_client_pvt->last_error = ERROR_OUT_OF_MEMORY; break; } - size_t l_key_size = a_client_pvt->session_key_open->pub_key_data_size; - dap_cert_t *l_cert = a_client_pvt->client->auth_cert; - dap_sign_t *l_sign = NULL; - size_t l_sign_size = 0; - if (l_cert) { - l_sign = dap_sign_create(l_cert->enc_key, a_client_pvt->session_key_open->pub_key_data, l_key_size, 0); - l_sign_size = dap_sign_get_size(l_sign); - } - uint8_t l_data[l_key_size + l_sign_size]; - memset(l_data, 0, sizeof(l_data)); - memcpy(l_data,a_client_pvt->session_key_open->pub_key_data, l_key_size); - if (l_sign) { - memcpy(l_data + l_key_size, l_sign, l_sign_size); - } - size_t l_data_str_size_max = DAP_ENC_BASE64_ENCODE_SIZE(l_key_size + l_sign_size); - char l_data_str[l_data_str_size_max + 1]; - memset(l_data_str, 0, sizeof(l_data_str)); + size_t l_data_size = a_client_pvt->session_key_open->pub_key_data_size; + uint8_t *l_data = DAP_NEW_Z_SIZE(uint8_t, l_data_size); + memcpy(l_data, a_client_pvt->session_key_open->pub_key_data, a_client_pvt->session_key_open->pub_key_data_size); + + dap_cert_t *l_node_cert = dap_cert_find_by_name("node-addr"); + size_t l_sign_count = s_add_cert_sign_to_data(a_client_pvt->client->auth_cert, &l_data, &l_data_size, a_client_pvt->session_key_open->pub_key_data, a_client_pvt->session_key_open->pub_key_data_size) + + s_add_cert_sign_to_data(l_node_cert, &l_data, &l_data_size, a_client_pvt->session_key_open->pub_key_data, a_client_pvt->session_key_open->pub_key_data_size); + + + size_t l_data_str_size_max = DAP_ENC_BASE64_ENCODE_SIZE(l_data_size); + char *l_data_str = DAP_NEW_Z_SIZE(char, l_data_str_size_max + 1); // DAP_ENC_DATA_TYPE_B64_URLSAFE not need because send it by POST request - size_t l_data_str_enc_size = dap_enc_base64_encode(l_data, l_key_size + l_sign_size, l_data_str, DAP_ENC_DATA_TYPE_B64); + size_t l_data_str_enc_size = dap_enc_base64_encode(l_data, l_data_size, l_data_str, DAP_ENC_DATA_TYPE_B64); debug_if(s_debug_more, L_DEBUG, "ENC request size %zu", l_data_str_enc_size); char l_enc_init_url[1024] = { '\0' }; snprintf(l_enc_init_url, sizeof(l_enc_init_url), DAP_UPLINK_PATH_ENC_INIT - "/gd4y5yh78w42aaagh" "?enc_type=%d,pkey_exchange_type=%d,pkey_exchange_size=%zd,block_key_size=%zd", - a_client_pvt->session_key_type, a_client_pvt->session_key_open_type, l_key_size, - a_client_pvt->session_key_block_size ); + "/gd4y5yh78w42aaagh" "?enc_type=%d,pkey_exchange_type=%d,pkey_exchange_size=%zd,block_key_size=%zd,protocol_version=%d,sign_count=%zu", + a_client_pvt->session_key_type, a_client_pvt->session_key_open_type, a_client_pvt->session_key_open->pub_key_data_size, + a_client_pvt->session_key_block_size, DAP_CLIENT_PROTOCOL_VERSION, l_sign_count); int l_res = dap_client_pvt_request(a_client_pvt, l_enc_init_url, l_data_str, l_data_str_enc_size, s_enc_init_response, s_enc_init_error); // bad request if (l_res < 0) a_client_pvt->stage_status = STAGE_STATUS_ERROR; + DAP_DEL_Z(l_data); + DAP_DEL_Z(l_data_str); } break; case STAGE_STREAM_CTL: { @@ -409,7 +434,7 @@ static void s_stage_status_after(dap_client_pvt_t *a_client_pvt) size_t l_request_size; l_request_size = snprintf(l_request, sizeof(l_request), "%d", DAP_CLIENT_PROTOCOL_VERSION); - + debug_if(s_debug_more, L_DEBUG, "STREAM_CTL request size %zu", l_request_size); char *l_suburl; @@ -420,7 +445,7 @@ static void s_stage_status_after(dap_client_pvt_t *a_client_pvt) if(l_least_common_dap_protocol < 23){ l_suburl = dap_strdup_printf("stream_ctl,channels=%s", a_client_pvt->client->active_channels); - }else{ + } else { l_suburl = dap_strdup_printf("channels=%s,enc_type=%d,enc_key_size=%zu,enc_headers=%d", a_client_pvt->client->active_channels, a_client_pvt->session_key_type, a_client_pvt->session_key_block_size, 0); @@ -519,6 +544,7 @@ static void s_stage_status_after(dap_client_pvt_t *a_client_pvt) return; } *l_stream_es_uuid_ptr = a_client_pvt->stream_es->uuid; + dap_stream_change_id(a_client_pvt->session_key, a_client_pvt->stream_id); // change id in hash tab dap_timerfd_start_on_worker(a_client_pvt->worker, (unsigned long)s_client_timeout_active_after_connect_seconds * 1000, s_stream_timer_timeout_check,l_stream_es_uuid_ptr); } @@ -535,7 +561,7 @@ static void s_stage_status_after(dap_client_pvt_t *a_client_pvt) dap_events_socket_delete_unsafe(a_client_pvt->stream_es, true); a_client_pvt->stage_status = STAGE_STATUS_ERROR; a_client_pvt->last_error = ERROR_STREAM_CONNECT; - }else{ + } else { log_it(L_INFO, "Connecting stream to remote %s:%u", a_client_pvt->client->uplink_addr, a_client_pvt->client->uplink_port); // add to dap_worker assert (a_client_pvt->stream_es); @@ -549,6 +575,7 @@ static void s_stage_status_after(dap_client_pvt_t *a_client_pvt) return; } *l_stream_es_uuid_ptr = a_client_pvt->stream_es->uuid; + dap_stream_change_id(a_client_pvt->session_key, a_client_pvt->stream_id); // change id in hash tab dap_timerfd_start_on_worker(a_client_pvt->worker, (unsigned long)s_client_timeout_active_after_connect_seconds * 1000, s_stream_timer_timeout_check,l_stream_es_uuid_ptr); } @@ -562,6 +589,7 @@ static void s_stage_status_after(dap_client_pvt_t *a_client_pvt) if(!a_client_pvt->stream){ a_client_pvt->stage_status = STAGE_STATUS_ERROR; a_client_pvt->last_error = ERROR_STREAM_ABORTED; + dap_stream_delete_prep_addr(a_client_pvt->stream_id, NULL); s_stage_status_after(a_client_pvt); return; } @@ -571,7 +599,7 @@ static void s_stage_status_after(dap_client_pvt_t *a_client_pvt) dap_stream_ch_new(a_client_pvt->stream, (uint8_t)a_client_pvt->client->active_channels[i]); char l_full_path[2048]; - snprintf(l_full_path, sizeof(l_full_path) - 1, "%s/globaldb?session_id=%s", DAP_UPLINK_PATH_STREAM, + snprintf(l_full_path, sizeof(l_full_path) - 1, "%s/globaldb?session_id=%u", DAP_UPLINK_PATH_STREAM, dap_client_get_stream_id(a_client_pvt->client)); dap_events_socket_write_f_unsafe( a_client_pvt->stream_es, "GET /%s HTTP/1.1\r\n" @@ -588,6 +616,7 @@ static void s_stage_status_after(dap_client_pvt_t *a_client_pvt) a_client_pvt->reconnect_attempts = 0; a_client_pvt->stage_status = STAGE_STATUS_DONE; + dap_stream_add_stream_info(a_client_pvt->stream, a_client_pvt->stream_id); s_stage_status_after(a_client_pvt); } break; @@ -877,6 +906,32 @@ static void s_request_response(void * a_response, size_t a_response_size, void * } } +/** + * @brief s_json_str_multy_obj_parse - check a_key and copy a_val to args. Args count should be even. + * @param a_key - compare key + * @param a_val - coping value + * @param a_count - args count + * @return count of success copies + */ +int s_json_multy_obj_parse_str(const char *a_key, const char *a_val, int a_count, ...) +{ + dap_return_val_if_pass(!a_key || !a_val || a_count % 2, 0); + int l_ret = 0; + va_list l_args; + va_start(l_args, a_count); + for (int i = 0; i < a_count / 2; ++i) { + const char *l_key = va_arg(l_args, const char *); + char **l_pointer = va_arg(l_args, char **); + if(!strcmp(a_key, l_key)) { + DAP_DEL_Z(*l_pointer); + *l_pointer = dap_strdup(a_val); + l_ret++; + } + } + va_end(l_args); + return l_ret; +} + /** * @brief s_enc_init_response * @param a_client @@ -907,7 +962,8 @@ static void s_enc_init_response(dap_client_t *a_client, void * a_data, size_t a_ char *l_session_id_b64 = NULL; char *l_bob_message_b64 = NULL; - int json_parse_count = 0; + char *l_node_sign_b64 = NULL; + int l_json_parse_count = 0; struct json_object *jobj = json_tokener_parse((const char *) a_data); if(jobj) { // parse encrypt_id & encrypt_msg @@ -915,24 +971,16 @@ static void s_enc_init_response(dap_client_t *a_client, void * a_data, size_t a_ { if(json_object_get_type(val) == json_type_string) { const char *l_str = json_object_get_string(val); - if(!strcmp(key, "encrypt_id")) { - DAP_DEL_Z(l_session_id_b64); - l_session_id_b64 = DAP_NEW_Z_SIZE(char, strlen(l_str) + 1); - strcpy(l_session_id_b64, l_str); - json_parse_count++; - } - if(!strcmp(key, "encrypt_msg")) { - DAP_DEL_Z(l_bob_message_b64); - l_bob_message_b64 = DAP_NEW_Z_SIZE(char, strlen(l_str) + 1); - strcpy(l_bob_message_b64, l_str); - json_parse_count++; - } + l_json_parse_count += s_json_multy_obj_parse_str( key, l_str, 6, + "encrypt_id", &l_session_id_b64, + "encrypt_msg", &l_bob_message_b64, + "node_sign", &l_node_sign_b64); } if(json_object_get_type(val) == json_type_int) { int val_int = json_object_get_int(val); if(!strcmp(key, "dap_protocol_version")) { l_client_pvt->remote_protocol_version = val_int; - json_parse_count++; + l_json_parse_count++; } } } @@ -941,7 +989,7 @@ static void s_enc_init_response(dap_client_t *a_client, void * a_data, size_t a_ if(!l_client_pvt->remote_protocol_version) l_client_pvt->remote_protocol_version = DAP_PROTOCOL_VERSION_DEFAULT; } - if (json_parse_count < 2 || json_parse_count > 3) { + if (l_json_parse_count < 2 || l_json_parse_count > 4) { l_client_pvt->last_error = ERROR_ENC_NO_KEY; log_it(L_ERROR, "ENC: Wrong response (size %zu data '%s')", a_data_size, (char* ) a_data); } @@ -977,6 +1025,7 @@ static void s_enc_init_response(dap_client_t *a_client, void * a_data, size_t a_ } } DAP_DEL_Z(l_bob_message_b64); + if (l_client_pvt->last_error == ERROR_NO_ERROR) { size_t l_rc = l_client_pvt->session_key_open->gen_alice_shared_key( l_client_pvt->session_key_open, l_client_pvt->session_key_open->priv_key_data, @@ -986,7 +1035,6 @@ static void s_enc_init_response(dap_client_t *a_client, void * a_data, size_t a_ l_client_pvt->last_error = ERROR_ENC_WRONG_KEY; } } - DAP_DEL_Z(l_bob_message); if (l_client_pvt->last_error == ERROR_NO_ERROR) { l_client_pvt->session_key = dap_enc_key_new_generate(l_client_pvt->session_key_type, l_client_pvt->session_key_open->priv_key_data, // shared key @@ -999,6 +1047,20 @@ static void s_enc_init_response(dap_client_t *a_client, void * a_data, size_t a_ log_it(L_WARNING, "ENC: initialized encryption but current stage is %s (%s)", dap_client_get_stage_str(a_client), dap_client_get_stage_status_str(a_client)); } + if (l_client_pvt->last_error == ERROR_NO_ERROR && l_node_sign_b64) { + dap_sign_t *l_sign = (dap_sign_t *)DAP_NEW_Z_SIZE(uint8_t, strlen(l_node_sign_b64) + 1); + if (!l_sign){ + log_it(L_CRITICAL, "Memory allocation error"); + } + size_t l_decode_len = dap_enc_base64_decode(l_node_sign_b64, strlen(l_node_sign_b64), l_sign, DAP_ENC_DATA_TYPE_B64); + if (!dap_sign_verify_all(l_sign, l_decode_len, l_bob_message, l_bob_message_size)) { + dap_stream_add_addr(dap_stream_get_addr_from_sign(l_sign), l_client_pvt->session_key); + } else { + log_it(L_WARNING, "ENC: Invalid node sign"); + } + } + DAP_DEL_Z(l_node_sign_b64); + DAP_DEL_Z(l_bob_message); } if (l_client_pvt->last_error == ERROR_NO_ERROR) l_client_pvt->stage_status = STAGE_STATUS_DONE; @@ -1061,7 +1123,6 @@ static void s_stream_ctl_response(dap_client_t * a_client, void * a_data, size_t s_stage_status_after(l_client_pvt); } else { int l_arg_count; - char l_stream_id[26] = { 0 }; char *l_stream_key = DAP_NEW_Z_SIZE(char, 4096 * 3); if (!l_stream_key) { log_it(L_CRITICAL, "Memory allocation error"); @@ -1071,9 +1132,10 @@ static void s_stream_ctl_response(dap_client_t * a_client, void * a_data, size_t uint32_t l_remote_protocol_version; dap_enc_key_type_t l_enc_type = l_client_pvt->session_key_type; int l_enc_headers = 0; + uint32_t l_stream_id_int = 0; - l_arg_count = sscanf(l_response_str, "%25s %4096s %u %d %d" - , l_stream_id, l_stream_key, &l_remote_protocol_version, &l_enc_type, &l_enc_headers); + l_arg_count = sscanf(l_response_str, "%u %4096s %u %d %d" + , &l_stream_id_int, l_stream_key, &l_remote_protocol_version, &l_enc_type, &l_enc_headers); if(l_arg_count < 2) { log_it(L_WARNING, "STREAM_CTL Need at least 2 arguments in reply (got %d)", l_arg_count); l_client_pvt->last_error = ERROR_STREAM_CTL_ERROR_RESPONSE_FORMAT; @@ -1088,17 +1150,16 @@ static void s_stream_ctl_response(dap_client_t * a_client, void * a_data, size_t log_it(L_WARNING, "No uplink protocol version, use default version %d" , l_client_pvt->uplink_protocol_version = DAP_PROTOCOL_VERSION_DEFAULT); - if(strlen(l_stream_id) < 13) { + if(l_stream_id_int) { //log_it(L_DEBUG, "Stream server id %s, stream key length(base64 encoded) %u" // ,l_stream_id,strlen(l_stream_key) ); - log_it(L_DEBUG, "Stream server id %s", l_stream_id); + log_it(L_DEBUG, "Stream server id %u", l_stream_id_int); // Delete old key if present if(l_client_pvt->stream_key) dap_enc_key_delete(l_client_pvt->stream_key); - strncpy(l_client_pvt->stream_id, (char *)l_stream_id, sizeof(l_client_pvt->stream_id) -1 ); - l_client_pvt->stream_id[sizeof(l_client_pvt->stream_id) - 1] = '\0'; + l_client_pvt->stream_id = l_stream_id_int; l_client_pvt->stream_key = dap_enc_key_new_generate(l_enc_type, l_stream_key, strlen(l_stream_key), NULL, 0, 32); @@ -1144,6 +1205,7 @@ static void s_stream_ctl_error(dap_client_t * a_client, UNUSED_ARG void *a_arg, l_client_pvt->last_error = ERROR_STREAM_CTL_ERROR; } l_client_pvt->stage_status = STAGE_STATUS_ERROR; + dap_stream_delete_prep_addr(0, l_client_pvt->session_key); s_stage_status_after(l_client_pvt); @@ -1299,6 +1361,8 @@ static void s_stream_es_callback_error(dap_events_socket_t * a_es, int a_error) } l_client_pvt->stage_status = STAGE_STATUS_ERROR; l_client_pvt->stream->esocket = NULL; // Prevent to delete twice + if (l_client_pvt->stream && l_client_pvt->stream->node.uint64) + dap_stream_delete_addr(l_client_pvt->stream->node, false); s_stage_status_after(l_client_pvt); a_es->_inheritor = NULL; // To prevent delete in reactor } diff --git a/net/client/include/dap_client.h b/net/client/include/dap_client.h index 99161d8883d9ec5d5f6cbb6a05b367768f39c5a0..1c8a7f8febba59dec3996b8dc80a5758a12a263d 100644 --- a/net/client/include/dap_client.h +++ b/net/client/include/dap_client.h @@ -160,7 +160,7 @@ const char * dap_client_get_auth_cookie(dap_client_t * a_client); dap_stream_t * dap_client_get_stream(dap_client_t * a_client); dap_stream_worker_t * dap_client_get_stream_worker(dap_client_t * a_client); dap_stream_ch_t * dap_client_get_stream_ch_unsafe(dap_client_t * a_client, uint8_t a_ch_id); -const char * dap_client_get_stream_id(dap_client_t * a_client); +uint32_t dap_client_get_stream_id(dap_client_t * a_client); void dap_client_set_active_channels_unsafe (dap_client_t * a_client, const char * a_active_channels); void dap_client_set_auth_cert_unsafe(dap_client_t * a_client, dap_cert_t *a_cert); void dap_client_set_auth_cert(dap_client_t *a_client, const char *a_chain_net_name); diff --git a/net/client/include/dap_client_pvt.h b/net/client/include/dap_client_pvt.h index 3079972d35b00b1eb85dc1ea689a8604ee824020..91e9b480f9c3ffd7a2b9168d305bd6fd1bf9f484 100644 --- a/net/client/include/dap_client_pvt.h +++ b/net/client/include/dap_client_pvt.h @@ -50,7 +50,7 @@ typedef struct dap_client_pvt { dap_enc_key_t *session_key_open; // Open assymetric keys exchange dap_enc_key_t *session_key; // Symmetric private key for session encryption dap_enc_key_t *stream_key; // Stream private key for stream encryption - char stream_id[25]; + uint32_t stream_id; char *session_key_id; uint32_t uplink_protocol_version; uint32_t remote_protocol_version; diff --git a/net/server/enc_server/dap_enc_http.c b/net/server/enc_server/dap_enc_http.c index 86ea8ae93d14e552b257b1984a59b1161d161204..8283a42e915466e6836ee7949f4c7c6a9ecfb032 100644 --- a/net/server/enc_server/dap_enc_http.c +++ b/net/server/enc_server/dap_enc_http.c @@ -50,10 +50,13 @@ #include "dap_enc_http_ban_list_client.h" #include "json.h" #include "dap_enc_http_ban_list_client.h" - +#include "dap_cert.h" +#include "dap_strfuncs.h" #define LOG_TAG "dap_enc_http" +dap_stream_node_addr_t dap_stream_get_addr_from_sign(dap_sign_t *a_sign); + static dap_enc_acl_callback_t s_acl_callback = NULL; int enc_http_init() @@ -67,18 +70,19 @@ void enc_http_deinit() } -static void _enc_http_write_reply(struct dap_http_simple *cl_st, - const char* encrypt_id, - const char* encrypt_msg) +static void _enc_http_write_reply(struct dap_http_simple *a_cl_st, + const char* a_encrypt_id, + const char* a_encrypt_msg, const char *a_node_sign) { - struct json_object *jobj = json_object_new_object(); - json_object_object_add(jobj, "encrypt_id", json_object_new_string(encrypt_id)); - json_object_object_add(jobj, "encrypt_msg", json_object_new_string(encrypt_msg)); - json_object_object_add(jobj, "dap_protocol_version", json_object_new_int(DAP_PROTOCOL_VERSION)); - const char* json_str = json_object_to_json_string(jobj); - dap_http_simple_reply(cl_st, (void*) json_str, - (size_t) strlen(json_str)); - json_object_put(jobj); + struct json_object *l_jobj = json_object_new_object(); + json_object_object_add(l_jobj, "encrypt_id", json_object_new_string(a_encrypt_id)); + json_object_object_add(l_jobj, "encrypt_msg", json_object_new_string(a_encrypt_msg)); + if (a_node_sign) + json_object_object_add(l_jobj, "node_sign", json_object_new_string(a_node_sign)); + json_object_object_add(l_jobj, "dap_protocol_version", json_object_new_int(DAP_PROTOCOL_VERSION)); + const char* l_json_str = json_object_to_json_string(l_jobj); + dap_http_simple_reply(a_cl_st, (void*) l_json_str, (size_t) strlen(l_json_str)); + json_object_put(l_jobj); } void dap_enc_http_json_response_format_enable(bool); @@ -106,33 +110,50 @@ void enc_http_proc(struct dap_http_simple *cl_st, void * arg) return; } - if(strcmp(cl_st->http_client->url_path,"gd4y5yh78w42aaagh") == 0 ) { + if(!strcmp(cl_st->http_client->url_path,"gd4y5yh78w42aaagh")) { dap_enc_key_type_t l_pkey_exchange_type =DAP_ENC_KEY_TYPE_MSRLN ; dap_enc_key_type_t l_enc_block_type = DAP_ENC_KEY_TYPE_IAES; - size_t l_pkey_exchange_size=MSRLN_PKA_BYTES; + size_t l_pkey_exchange_size = MSRLN_PKA_BYTES; size_t l_block_key_size=32; - sscanf(cl_st->http_client->in_query_string, "enc_type=%d,pkey_exchange_type=%d,pkey_exchange_size=%zu,block_key_size=%zu", - &l_enc_block_type,&l_pkey_exchange_type,&l_pkey_exchange_size,&l_block_key_size); + int l_protocol_version = 0; + size_t l_sign_count = 0; + char *encrypt_msg = NULL, *encrypt_id = NULL; + sscanf(cl_st->http_client->in_query_string, "enc_type=%d,pkey_exchange_type=%d,pkey_exchange_size=%zu,block_key_size=%zu,protocol_version=%d,sign_count=%zu", + &l_enc_block_type,&l_pkey_exchange_type,&l_pkey_exchange_size,&l_block_key_size, &l_protocol_version, &l_sign_count); log_it(L_DEBUG, "Stream encryption: %s\t public key exchange: %s",dap_enc_get_type_name(l_enc_block_type), dap_enc_get_type_name(l_pkey_exchange_type)); uint8_t alice_msg[cl_st->request_size]; size_t l_decode_len = dap_enc_base64_decode(cl_st->request, cl_st->request_size, alice_msg, DAP_ENC_DATA_TYPE_B64); - dap_chain_hash_fast_t l_sign_hash = { }; - if (l_decode_len > l_pkey_exchange_size + sizeof(dap_sign_hdr_t)) { - /* Message contains pubkey and serialized sign */ - dap_sign_t *l_sign = (dap_sign_t *)&alice_msg[l_pkey_exchange_size]; - size_t l_sign_size = l_decode_len - l_pkey_exchange_size; - int l_verify_ret = dap_sign_verify_all(l_sign, l_sign_size, alice_msg, l_pkey_exchange_size); + dap_chain_hash_fast_t l_sign_hash = {0}; + if (!l_protocol_version && !l_sign_count) { + if (l_decode_len > l_pkey_exchange_size + sizeof(dap_sign_hdr_t)) { + l_sign_count = 1; + } else if (l_decode_len != l_pkey_exchange_size) { + /* No sign inside */ + log_it(L_WARNING, "Wrong message size, without a valid sign must be = %zu", l_pkey_exchange_size); + *return_code = Http_Status_BadRequest; + return; + } + } + + /* Verify all signs */ + dap_sign_t *l_sign = NULL; + size_t l_bias = l_pkey_exchange_size; + size_t l_sign_validated_count = 0; + for(; l_sign_validated_count < l_sign_count && l_bias < l_decode_len; ++l_sign_validated_count) { + l_sign = (dap_sign_t *)&alice_msg[l_bias]; + int l_verify_ret = dap_sign_verify_all(l_sign, l_decode_len - l_bias, alice_msg, l_pkey_exchange_size); if (l_verify_ret) { log_it(L_ERROR, "Can't authorize, sign verification didn't pass (err %d)", l_verify_ret); *return_code = Http_Status_Unauthorized; return; } - } else if (l_decode_len != l_pkey_exchange_size) { - /* No sign inside */ - log_it(L_WARNING, "Wrong message size, without a valid sign must be = %zu", l_pkey_exchange_size); - *return_code = Http_Status_BadRequest; + l_bias += dap_sign_get_size(l_sign); + } + if (l_sign_validated_count != l_sign_count) { + log_it(L_ERROR, "Can't authorize all %zu signs", l_sign_count); + *return_code = Http_Status_Unauthorized; return; } @@ -147,31 +168,65 @@ void enc_http_proc(struct dap_http_simple *cl_st, void * arg) (void**) &l_pkey_exchange_key->pub_key_data); } - dap_enc_ks_key_t * l_enc_key_ks = dap_enc_ks_new(); + dap_enc_ks_key_t *l_enc_key_ks = dap_enc_ks_new(); + dap_return_if_pass(!l_enc_key_ks); if (s_acl_callback) { l_enc_key_ks->acl_list = s_acl_callback(&l_sign_hash); } else { log_it(L_DEBUG, "Callback for ACL is not set, pass anauthorized"); } - - char encrypt_msg[DAP_ENC_BASE64_ENCODE_SIZE(l_pkey_exchange_key->pub_key_data_size) + 1]; + + if ( + !(encrypt_msg = DAP_NEW_Z_SIZE(char, DAP_ENC_BASE64_ENCODE_SIZE(l_pkey_exchange_key->pub_key_data_size) + 1)) || + !(encrypt_id = DAP_NEW_Z_SIZE(char, DAP_ENC_BASE64_ENCODE_SIZE(DAP_ENC_KS_KEY_ID_SIZE) + 1)) + ) { + log_it(L_CRITICAL, "Memory allocation error"); + dap_enc_key_delete(l_pkey_exchange_key); + *return_code = Http_Status_InternalServerError; + return; + } size_t encrypt_msg_size = dap_enc_base64_encode(l_pkey_exchange_key->pub_key_data, l_pkey_exchange_key->pub_key_data_size, encrypt_msg, DAP_ENC_DATA_TYPE_B64); - encrypt_msg[encrypt_msg_size] = '\0'; l_enc_key_ks->key = dap_enc_key_new_generate(l_enc_block_type, l_pkey_exchange_key->priv_key_data, // shared key l_pkey_exchange_key->priv_key_data_size, l_enc_key_ks->id, DAP_ENC_KS_KEY_ID_SIZE, l_block_key_size); + dap_enc_ks_save_in_storage(l_enc_key_ks); - char encrypt_id[DAP_ENC_BASE64_ENCODE_SIZE(DAP_ENC_KS_KEY_ID_SIZE) + 1]; - size_t encrypt_id_size = dap_enc_base64_encode(l_enc_key_ks->id, sizeof (l_enc_key_ks->id), encrypt_id, DAP_ENC_DATA_TYPE_B64); - encrypt_id[encrypt_id_size] = '\0'; - _enc_http_write_reply(cl_st, encrypt_id, encrypt_msg); + // save verified node addr and generate own sign + char* l_node_sign_msg = NULL; + if (l_protocol_version && l_sign_count) { + dap_stream_add_addr(dap_stream_get_addr_from_sign(l_sign), l_enc_key_ks); + + dap_cert_t *l_node_cert = dap_cert_find_by_name("node-addr"); + dap_sign_t *l_node_sign = dap_sign_create(l_node_cert->enc_key,l_pkey_exchange_key->pub_key_data, l_pkey_exchange_key->pub_key_data_size, 0); + if (!l_node_sign) { + dap_enc_key_delete(l_pkey_exchange_key); + *return_code = Http_Status_InternalServerError; + return; + } + size_t l_node_sign_size = dap_sign_get_size(l_node_sign); + size_t l_node_sign_size_new = DAP_ENC_BASE64_ENCODE_SIZE(l_node_sign_size) + 1; + + l_node_sign_msg = DAP_NEW_Z_SIZE(char, l_node_sign_size_new); + if (!l_node_sign_msg) { + log_it(L_CRITICAL, "Memory allocation error"); + dap_enc_key_delete(l_pkey_exchange_key); + *return_code = Http_Status_InternalServerError; + DAP_DELETE(l_node_sign); + return; + } + l_node_sign_size = dap_enc_base64_encode(l_node_sign, l_node_sign_size, l_node_sign_msg, DAP_ENC_DATA_TYPE_B64); + DAP_DELETE(l_node_sign); + } + + _enc_http_write_reply(cl_st, encrypt_id, encrypt_msg, l_node_sign_msg); dap_enc_key_delete(l_pkey_exchange_key); + DAP_DEL_Z(l_node_sign_msg); *return_code = Http_Status_OK; diff --git a/net/stream/ch/dap_stream_ch.c b/net/stream/ch/dap_stream_ch.c index 7d5569c32002d9c13995d7fbe46db2310840f723..9209f8f6fce248d748c125581e458207835c7cf1 100644 --- a/net/stream/ch/dap_stream_ch.c +++ b/net/stream/ch/dap_stream_ch.c @@ -146,7 +146,7 @@ dap_stm_ch_rec_t *l_rec; /* Add new record into the hash table */ l_rc = pthread_rwlock_wrlock(&s_stm_ch_lock); assert(!l_rc); - HASH_ADD(hh, s_stm_chs, stm_ch, sizeof(dap_events_socket_t *), l_rec ); + HASH_ADD(hh, s_stm_chs, stm_ch, sizeof(dap_stream_ch_t *), l_rec ); #ifdef DAP_SYS_DEBUG s_memstat[MEMSTAT$K_STM_CH].alloc_nr += 1; diff --git a/net/stream/session/dap_stream_session.c b/net/stream/session/dap_stream_session.c index 81dcc9d6d7b368508c41cd4ec4140902c1cd0645..7796315b4405a2f530f69fbac62de3666c069c68 100644 --- a/net/stream/session/dap_stream_session.c +++ b/net/stream/session/dap_stream_session.c @@ -143,13 +143,13 @@ dap_stream_session_t * dap_stream_session_new(unsigned int media_id, bool open_p * @param id * @return */ -dap_stream_session_t *dap_stream_session_id_mt(uint32_t id ) +dap_stream_session_t *dap_stream_session_id_mt(uint32_t a_id) { - dap_stream_session_t *ret; + dap_stream_session_t *l_ret = NULL; dap_stream_session_lock(); - HASH_FIND(hh, s_sessions, &id, sizeof(uint32_t), ret); + HASH_FIND(hh, s_sessions, &a_id, sizeof(uint32_t), l_ret); dap_stream_session_unlock(); - return ret; + return l_ret; } /** diff --git a/net/stream/session/include/dap_stream_session.h b/net/stream/session/include/dap_stream_session.h index 25d1fb2a83a68672987c23809a92d23842ce84c3..7904603078fc8d46e54a1eb5dd04f5bb0f0f6e05 100644 --- a/net/stream/session/include/dap_stream_session.h +++ b/net/stream/session/include/dap_stream_session.h @@ -74,10 +74,10 @@ void dap_stream_session_deinit(); dap_list_t* dap_stream_session_get_list_sessions(void); void dap_stream_session_get_list_sessions_unlock(void); -dap_stream_session_t * dap_stream_session_pure_new(); -dap_stream_session_t * dap_stream_session_new(uint32_t media_id, bool open_preview); -dap_stream_session_t * dap_stream_session_id_mt(uint32_t id); -dap_stream_session_t *dap_stream_session_id_unsafe(uint32_t id ); +dap_stream_session_t *dap_stream_session_pure_new(); +dap_stream_session_t *dap_stream_session_new(uint32_t media_id, bool open_preview); +dap_stream_session_t *dap_stream_session_id_mt(uint32_t a_id); +dap_stream_session_t *dap_stream_session_id_unsafe(uint32_t id); void dap_stream_session_lock(); void dap_stream_session_unlock(); diff --git a/net/stream/stream/dap_stream.c b/net/stream/stream/dap_stream.c index 60cb527b7991358ff445368b31a87dc1d08508c5..dcf69842d014beac62645cfe0780fe6ae3355ad5 100644 --- a/net/stream/stream/dap_stream.c +++ b/net/stream/stream/dap_stream.c @@ -55,9 +55,27 @@ #include "dap_client_pvt.h" #include "dap_strfuncs.h" #include "uthash.h" +#include "dap_enc_ks.h" #define LOG_TAG "dap_stream" +typedef struct authorized_stream { + dap_stream_node_addr_t node; + union { + unsigned long num; + void *pointer; + } id; + dap_events_socket_uuid_t esocket_uuid; + dap_stream_worker_t *stream_worker; + UT_hash_handle hh; +} authorized_stream_t; + +static authorized_stream_t *s_authorized_streams = NULL; +static authorized_stream_t *s_authorized_streams_dublicate = NULL; +static authorized_stream_t *s_prep_authorized_streams = NULL; +static pthread_rwlock_t s_steams_lock = PTHREAD_RWLOCK_INITIALIZER; + +static int s_add_stream_info(authorized_stream_t **a_hash_table, authorized_stream_t *a_item, dap_stream_t *a_stream); static void s_stream_proc_pkt_in(dap_stream_t * a_stream, dap_stream_pkt_t *l_pkt, size_t l_pkt_size); @@ -312,13 +330,13 @@ dap_stream_t *s_stream_new(dap_http_client_t *a_http_client) atomic_fetch_add(&s_memstat[MEMSTAT$K_STM].alloc_nr, 1); #endif - l_ret->esocket = a_http_client->esocket; l_ret->esocket_uuid = a_http_client->esocket->uuid; l_ret->stream_worker = (dap_stream_worker_t *)a_http_client->esocket->context->worker->_inheritor; l_ret->conn_http = a_http_client; l_ret->seq_id = 0; l_ret->client_last_seq_id_packet = (size_t)-1; + l_ret->sign_group = UNSIGNED; // Start server keep-alive timer dap_events_socket_uuid_t *l_es_uuid = DAP_NEW_Z(dap_events_socket_uuid_t); if (!l_es_uuid) { @@ -343,7 +361,7 @@ dap_stream_t *s_stream_new(dap_http_client_t *a_http_client) * @param a_es * @return */ -dap_stream_t* dap_stream_new_es_client(dap_events_socket_t * a_esocket) +dap_stream_t *dap_stream_new_es_client(dap_events_socket_t *a_esocket) { dap_stream_t *l_ret = DAP_NEW_Z(dap_stream_t); if (!l_ret) { @@ -369,8 +387,8 @@ dap_stream_t* dap_stream_new_es_client(dap_events_socket_t * a_esocket) * @param a_stream */ void dap_stream_delete_unsafe(dap_stream_t *a_stream) -{ - if(a_stream == NULL) { +{ + if(!a_stream) { log_it(L_ERROR,"stream delete NULL instance"); return; } @@ -390,6 +408,7 @@ void dap_stream_delete_unsafe(dap_stream_t *a_stream) atomic_fetch_add(&s_memstat[MEMSTAT$K_STM].free_nr, 1); #endif + dap_stream_delete_addr(a_stream->node, true); DAP_DEL_Z(a_stream->buf_fragments); DAP_DEL_Z(a_stream->pkt_buf_in); DAP_DELETE(a_stream); @@ -424,50 +443,51 @@ void s_http_client_headers_read(dap_http_client_t * a_http_client, void * a_arg) // char * raw=0; // int raw_size; - unsigned int id=0; + unsigned int l_id=0; //log_it(L_DEBUG,"Prepare data stream"); if(a_http_client->in_query_string[0]){ log_it(L_INFO,"Query string [%s]",a_http_client->in_query_string); // if(sscanf(cl_ht->in_query_string,"fj913htmdgaq-d9hf=%u",&id)==1){ - if(sscanf(a_http_client->in_query_string,"session_id=%u",&id) == 1 || - sscanf(a_http_client->in_query_string,"fj913htmdgaq-d9hf=%u",&id) == 1) { - dap_stream_session_t * ss=NULL; - ss=dap_stream_session_id_mt(id); - if(ss==NULL){ - log_it(L_ERROR,"No session id %u was found",id); + if(sscanf(a_http_client->in_query_string,"session_id=%u",&l_id) == 1 || + sscanf(a_http_client->in_query_string,"fj913htmdgaq-d9hf=%u",&l_id) == 1) { + dap_stream_session_t *l_ss = dap_stream_session_id_mt(l_id); + if(!l_ss) { + log_it(L_ERROR,"No session id %u was found", l_id); a_http_client->reply_status_code=404; strcpy(a_http_client->reply_reason_phrase,"Not found"); - }else{ - log_it(L_INFO,"Session id %u was found with channels = %s",id,ss->active_channels); - if(dap_stream_session_open(ss)==0){ // Create new stream - dap_stream_t * sid = s_stream_new(a_http_client); - if (!sid) { + } else { + log_it(L_INFO,"Session id %u was found with channels = %s", l_id, l_ss->active_channels); + if(!dap_stream_session_open(l_ss)){ // Create new stream + dap_stream_t *l_sid = s_stream_new(a_http_client); + if (!l_sid) { log_it(L_CRITICAL, "Memory allocation error"); a_http_client->reply_status_code=404; return; } - sid->session=ss; + l_sid->session = l_ss; dap_http_header_t *header = dap_http_header_find(a_http_client->in_headers, "Service-Key"); if (header) - ss->service_key = strdup(header->value); - size_t count_channels = strlen(ss->active_channels); + l_ss->service_key = strdup(header->value); + size_t count_channels = strlen(l_ss->active_channels); for(size_t i = 0; i < count_channels; i++) { - dap_stream_ch_t * l_ch = dap_stream_ch_new(sid, ss->active_channels[i]); + dap_stream_ch_t * l_ch = dap_stream_ch_new(l_sid, l_ss->active_channels[i]); l_ch->ready_to_read = true; - //sid->channel[i]->ready_to_write = true; + //l_sid->channel[i]->ready_to_write = true; } a_http_client->reply_status_code=200; strcpy(a_http_client->reply_reason_phrase,"OK"); - stream_states_update(sid); + stream_states_update(l_sid); a_http_client->state_read=DAP_HTTP_CLIENT_STATE_DATA; a_http_client->state_write=DAP_HTTP_CLIENT_STATE_START; dap_events_socket_set_readable_unsafe(a_http_client->esocket,true); dap_events_socket_set_writable_unsafe(a_http_client->esocket,true); // Dirty hack, because previous function shouldn't // // set write flag off but it does! + + dap_stream_add_stream_info(l_sid, l_sid->session->id); }else{ - log_it(L_ERROR,"Can't open session id %u",id); + log_it(L_ERROR,"Can't open session id %u", l_id); a_http_client->reply_status_code=404; strcpy(a_http_client->reply_reason_phrase,"Not found"); } @@ -1001,3 +1021,204 @@ static bool s_callback_server_keepalive(void *a_arg) { return s_callback_keepalive(a_arg, true); } + +/** + * @brief dap_stream_add_addr Adding autorized stream to hash table + * @param a_node - autorrized node address + * @param a_id - pointer use as ID + * @param a_stream - using stream + * @param a_protocol_version - client protocol version + * @return 0 if ok others if not + */ +int dap_stream_add_addr(dap_stream_node_addr_t a_addr, void *a_id) +{ + authorized_stream_t *l_a_stream = DAP_NEW_Z(authorized_stream_t); + if(!l_a_stream) { + log_it(L_CRITICAL, "Memory allocation error"); + return -1; + } + l_a_stream->node.uint64 = a_addr.uint64; + l_a_stream->id.pointer = a_id; + + assert(!pthread_rwlock_wrlock(&s_steams_lock)); + HASH_ADD(hh, s_prep_authorized_streams, id, sizeof(l_a_stream->id), l_a_stream); + assert(!pthread_rwlock_unlock(&s_steams_lock)); + return 0; +} + +/** + * @brief dap_stream_change_id change session id in hash table + * @param a_old - old session value id + * @param a_new - new session value id + * @return 0 if ok others if not + */ +int dap_stream_change_id(void *a_old, uint64_t a_new) +{ + dap_return_val_if_pass(!a_old, -1); + authorized_stream_t *l_a_stream = NULL; + int l_ret = -1; + assert(!pthread_rwlock_wrlock(&s_steams_lock)); + HASH_FIND(hh, s_prep_authorized_streams, &a_old, sizeof(a_old), l_a_stream); + if (l_a_stream) { + HASH_DEL(s_prep_authorized_streams, l_a_stream); + l_a_stream->id.num = a_new; + HASH_ADD(hh, s_prep_authorized_streams, id, sizeof(l_a_stream->id), l_a_stream); + l_ret = 0; + } + assert(!pthread_rwlock_unlock(&s_steams_lock)); + return l_ret; +} + +/** + * @brief s_add_stream_info Adding autorized stream to target hash tables + * @param a_hash_table - using hashtable + * @param a_item - inserted item + * @param a_stream - using stream + * @return 0 if ok others if not + */ +int s_add_stream_info(authorized_stream_t **a_hash_table, authorized_stream_t *a_item, dap_stream_t *a_stream) +{ + dap_return_val_if_pass(!a_hash_table || !a_item || !a_stream, -1); + authorized_stream_t *l_a_stream = NULL; + HASH_FIND(hh, *a_hash_table, &a_item->node, sizeof(a_item->node), l_a_stream); + if (l_a_stream){ + log_it(L_WARNING,"Trying replace stream in hash table for node "NODE_ADDR_FP_STR"", NODE_ADDR_FP_ARGS_S(l_a_stream->node)); + return -1; + } + a_item->esocket_uuid = a_stream->esocket_uuid; + a_item->stream_worker = a_stream->stream_worker; + a_stream->node.uint64 = a_item->node.uint64; + a_stream->sign_group = BASE_NODE_SIGN; + HASH_ADD(hh, *a_hash_table, node, sizeof(a_item->node), a_item); + return 0; +} + +/** + * @brief dap_stream_add_stream_info Adding autorized stream to hash tables + * @param a_stream - using stream + * @param a_id - id to finding node in preparing table + * @return 0 if ok others if not + */ +int dap_stream_add_stream_info(dap_stream_t *a_stream, uint64_t a_id) +{ + dap_return_val_if_pass(!a_stream, -1); + authorized_stream_t *l_a_stream = NULL; + int l_ret = 0; + assert(!pthread_rwlock_wrlock(&s_steams_lock)); + HASH_FIND(hh, s_prep_authorized_streams, &a_id, sizeof(a_id), l_a_stream); + if (l_a_stream) { + HASH_DEL(s_prep_authorized_streams, l_a_stream); + if(s_add_stream_info(&s_authorized_streams, l_a_stream, a_stream) && + s_add_stream_info(&s_authorized_streams_dublicate, l_a_stream, a_stream)) { + DAP_DELETE(l_a_stream); // free memory if we have dublicates in both tables + l_ret = -1; + } + } + assert(!pthread_rwlock_unlock(&s_steams_lock)); + return l_ret; +} + +/** + * @brief dap_stream_delete_addr Delete autorized stream from hash table + * and memory free + * @param a_node - autorrized node address + * @param a_full - search and delete stream in s_authorized_streams_dublicate + * @return 0 if ok others if not + */ +int dap_stream_delete_addr(dap_stream_node_addr_t a_addr, bool a_full) +{ + authorized_stream_t *l_a_stream = NULL; + assert(!pthread_rwlock_wrlock(&s_steams_lock)); + HASH_FIND(hh, s_authorized_streams, &a_addr, sizeof(a_addr), l_a_stream); + if (l_a_stream) { + HASH_DEL(s_authorized_streams, l_a_stream); + DAP_DEL_Z(l_a_stream); + } + // if full - clean all, if not - transfer from dublicate to main table + HASH_FIND(hh, s_authorized_streams_dublicate, &a_addr, sizeof(a_addr), l_a_stream); + if (l_a_stream) { + HASH_DEL(s_authorized_streams_dublicate, l_a_stream); + if (a_full) { + DAP_DEL_Z(l_a_stream); + } else { + HASH_ADD(hh, s_authorized_streams, node, sizeof(l_a_stream->node), l_a_stream); + } + } + assert(!pthread_rwlock_unlock(&s_steams_lock)); + return 0; +} + +/** + * @brief dap_stream_delete_prep_addr Delete preparing autorized stream from hash table + * and memory free + * @param a_num_id - num value session ID + * @param a_pointer_id - pointer value session ID + * @return 0 if ok others if not + */ +int dap_stream_delete_prep_addr(uint64_t a_num_id, void *a_pointer_id) +{ + dap_return_val_if_pass(!a_num_id && !a_pointer_id, -1); + authorized_stream_t *l_a_stream = NULL; + assert(!pthread_rwlock_wrlock(&s_steams_lock)); + if(a_num_id) { + HASH_FIND(hh, s_prep_authorized_streams, &a_num_id, sizeof(a_num_id), l_a_stream); + if (l_a_stream) { + HASH_DEL(s_prep_authorized_streams, l_a_stream); + DAP_DEL_Z(l_a_stream); + } + } + if(a_pointer_id) { + HASH_FIND(hh, s_prep_authorized_streams, &a_pointer_id, sizeof(a_pointer_id), l_a_stream); + if (l_a_stream) { + HASH_DEL(s_prep_authorized_streams, l_a_stream); + DAP_DEL_Z(l_a_stream); + } + } + assert(!pthread_rwlock_unlock(&s_steams_lock)); + return 0; +} + +/** + * @brief dap_stream_find_by_addr find a_stream with current node + * @param a_addr - autorrized node address + * @param a_worker - pointer to worker + * @return esocket_uuid if ok 0 if not + */ +dap_events_socket_uuid_t dap_stream_find_by_addr(dap_stream_node_addr_t a_addr, dap_worker_t **a_worker) +{ + dap_return_val_if_pass(!a_worker, 0); + authorized_stream_t *l_a_stream = NULL; + assert(!pthread_rwlock_wrlock(&s_steams_lock)); + HASH_FIND(hh, s_authorized_streams, &a_addr, sizeof(a_addr), l_a_stream); + assert(!pthread_rwlock_unlock(&s_steams_lock)); + dap_return_val_if_pass(!l_a_stream, 0); // return if not finded + *a_worker = l_a_stream->stream_worker->worker; + return l_a_stream->esocket_uuid; +} + +/** + * @brief dap_stream_get_addr_from_sign create dap_stream_node_addr_t from dap_sign_t, need memory free + * @param a_hash - pointer to hash_fast_t + * @return pointer if ok NULL if not + */ +dap_stream_node_addr_t dap_stream_get_addr_from_sign(dap_sign_t *a_sign) { + + dap_stream_node_addr_t l_ret = {0}; + dap_return_val_if_pass(!a_sign, l_ret); + + dap_enc_key_t *l_key = dap_sign_to_enc_key(a_sign); + dap_chain_hash_fast_t l_hash = {0}; + size_t l_pub_key_data_size = 0; + uint8_t *l_pub_key_data = NULL; + l_pub_key_data = dap_enc_key_serialize_pub_key(l_key, &l_pub_key_data_size); + + dap_return_val_if_fail(l_pub_key_data_size > 0 && dap_hash_fast(l_pub_key_data, l_pub_key_data_size, &l_hash) == 1, l_ret); + + l_ret.words[3] = (uint16_t) *(uint16_t*) (l_hash.raw); + l_ret.words[2] = (uint16_t) *(uint16_t*) (l_hash.raw + 2); + l_ret.words[1] = (uint16_t) *(uint16_t*) (l_hash.raw + DAP_CHAIN_HASH_FAST_SIZE - 4); + l_ret.words[0] = (uint16_t) *(uint16_t*) (l_hash.raw + DAP_CHAIN_HASH_FAST_SIZE - 2); + + log_it(L_INFO, "Verified stream sign from node "NODE_ADDR_FP_STR"\n", NODE_ADDR_FP_ARGS_S(l_ret)); + return l_ret; +} \ No newline at end of file diff --git a/net/stream/stream/dap_stream_ctl.c b/net/stream/stream/dap_stream_ctl.c index 114222fb0b7655da7ae015e0299eb1ccc3dde01e..b2cdbf66524b06d9cd9050093c3a912c748e2d8b 100644 --- a/net/stream/stream/dap_stream_ctl.c +++ b/net/stream/stream/dap_stream_ctl.c @@ -186,6 +186,7 @@ void s_stream_ctl_proc(struct dap_http_simple *a_http_simple, void *a_arg) return; } l_stream_session->acl = l_ks_key->acl_list; + dap_stream_change_id(l_ks_key, l_stream_session->id); } if (l_is_legacy) enc_http_reply_f(l_dg, "%u %s", l_stream_session->id, l_key_str); diff --git a/net/stream/stream/include/dap_stream.h b/net/stream/stream/include/dap_stream.h index 1c9270b3e3de7f424fda31b99ee42a0ae2f38f1d..4e57d7e662a0485c24ff3998bd510999738f88c8 100644 --- a/net/stream/stream/include/dap_stream.h +++ b/net/stream/stream/include/dap_stream.h @@ -31,6 +31,7 @@ #include "dap_config.h" #include "dap_stream_session.h" #include "dap_timerfd.h" +#include "dap_sign.h" /* #define CHUNK_SIZE_MAX (3 * 1024) #define STREAM_BUF_SIZE_MAX DAP_STREAM_PKT_SIZE_MAX @@ -41,6 +42,11 @@ typedef struct dap_stream_ch dap_stream_ch_t; typedef struct dap_stream_worker dap_stream_worker_t; +typedef enum dap_stream_sign_group { + UNSIGNED = 0, + BASE_NODE_SIGN, +} dap_stream_sign_group_t; + typedef struct dap_stream { int id; dap_stream_session_t * session; @@ -78,6 +84,8 @@ typedef struct dap_stream { struct dap_stream *prev, *next; + dap_stream_sign_group_t sign_group; + dap_stream_node_addr_t node; } dap_stream_t; typedef void (*dap_stream_callback)(dap_stream_t *, void *); @@ -104,3 +112,12 @@ void dap_stream_es_rw_states_update(struct dap_stream *a_stream); void dap_stream_set_ready_to_write(dap_stream_t * a_stream,bool a_is_ready); dap_enc_key_type_t dap_stream_get_preferred_encryption_type(); + +// autorization stream block +int dap_stream_add_addr(dap_stream_node_addr_t a_addr, void *a_id); +int dap_stream_delete_addr(dap_stream_node_addr_t a_addr, bool a_full); +int dap_stream_delete_prep_addr(uint64_t a_num_id, void *a_pointer_id); +int dap_stream_add_stream_info(dap_stream_t *a_stream, uint64_t a_id); +int dap_stream_change_id(void *a_old, uint64_t a_new); +dap_events_socket_uuid_t dap_stream_find_by_addr(dap_stream_node_addr_t a_addr, dap_worker_t **a_worker); +dap_stream_node_addr_t dap_stream_get_addr_from_sign(dap_sign_t *a_sign); \ No newline at end of file