diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index df65303c5f108ca90d1ac08c650d45b43554f299..957a59499f0b3dbbe1e19b906005c27862be1e21 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -20,8 +20,13 @@ tests:amd64.debian: image: demlabs/debian/amd64:qt5 before_script: /opt/buildtools/prepare_environment.sh amd64-linux script: + - pwd - mkdir build - - cd build && cmake .. -DCMAKE_BUILD_TYPE=Release -DBUILD_DAP_SDK_TESTS=ON -DBUILD_WITH_ECDSA=ON && make -j$(nproc) && ctest --verbose + - cd build && cmake .. -DCMAKE_BUILD_TYPE=Release -DBUILD_DAP_SDK_TESTS=ON -DBUILD_WITH_ECDSA=ON && make -j$(nproc) && ctest --verbose && make cppcheck + - cd /opt/buildtools/ && python3 -m cppcheck_codequality --input-file=${CI_PROJECT_DIR}/build/cppcheck_results.xml --output-file=${CI_PROJECT_DIR}/build/cppcheck.json + artifacts: + reports: + codequality: build/cppcheck.json tests:arm32.debian: extends: .tests diff --git a/cmake/OS_Detection.cmake b/cmake/OS_Detection.cmake index 0aa3ef6d678c308b8e32c5602ece64c8fe32624e..7129600b3b89e63fc2f49f18b947dbe0aa08b573 100644 --- a/cmake/OS_Detection.cmake +++ b/cmake/OS_Detection.cmake @@ -257,3 +257,28 @@ if ( CELLFRAME_NO_OPTIMIZATION) set(DAP_CRYPTO_XKCP_PLAINC ON) endif () +FIND_PROGRAM(CPPCHECK "cppcheck") +IF(CPPCHECK) + # Set export commands on + message("[!] CPPCHECK FOUND, ADDED TARGET CPPCHECK") + + SET(CMAKE_EXPORT_COMPILE_COMMANDS ON) + + ADD_CUSTOM_TARGET( + cppcheck + COMMAND + ${CPPCHECK} --enable=all --project=${CMAKE_BINARY_DIR}/compile_commands.json --std=c++11 --verbose --quiet + --xml-version=2 --language=c++ --suppress=missingIncludeSystem + --output-file=${CMAKE_BINARY_DIR}/cppcheck_results.xml ${CHECK_CXX_SOURCE_FILES} + COMMENT "Generate cppcheck report for the project") + + FIND_PROGRAM(CPPCHECK_HTML "cppcheck-htmlreport") + IF(CPPCHECK_HTML) + ADD_CUSTOM_TARGET( + cppcheck-html + COMMAND ${CPPCHECK_HTML} --title=${CMAKE_PROJECT_NAME} --file=${CMAKE_BINARY_DIR}/cppcheck_results.xml + --report-dir=${CMAKE_BINARY_DIR}/cppcheck_results --source-dir=${CMAKE_SOURCE_DIR} + COMMENT "Convert cppcheck report to HTML output") + ADD_DEPENDENCIES(cppcheck-html cppcheck) + ENDIF() +ENDIF() diff --git a/core/src/dap_common.c b/core/src/dap_common.c index 9ae95ccfad2245e06acbff5797090ff2d8e9ff92..3e2563e0e8d4f67f8f4d12f3f3cf58dd97d8aeaa 100755 --- a/core/src/dap_common.c +++ b/core/src/dap_common.c @@ -196,16 +196,16 @@ static void print_it_stderr(unsigned a_off, const char *a_fmt, va_list va) { vfprintf(s_print_param, a_fmt, va); } - +*/ #if ANDROID #include <android/log.h> static void print_it_alog (unsigned a_off, const char *a_fmt, va_list va) { - __android_log_vprint(ANDROID_LOG_INFO, s_print_param, a_fmt, va); + __android_log_vprint(ANDROID_LOG_INFO, "CellframeNodeNative", a_fmt, va); } -#endif*/ +#endif void dap_log_set_external_output(LOGGER_EXTERNAL_OUTPUT output, void *param) { @@ -230,10 +230,9 @@ void dap_log_set_external_output(LOGGER_EXTERNAL_OUTPUT output, void *param) s_print_callback = print_it_none; break; #ifdef ANDROID - /*case LOGGER_OUTPUT_ALOG: + case LOGGER_OUTPUT_ALOG: s_print_callback = print_it_alog; - s_print_param = param; - break;*/ + break; #endif default: diff --git a/crypto/include/dap_hash.h b/crypto/include/dap_hash.h index 7a194c8bdc8cce0b369c4266f595985e84fa239f..bb93022650bbe67e7fc8b03c288a8bd079c8062a 100755 --- a/crypto/include/dap_hash.h +++ b/crypto/include/dap_hash.h @@ -127,8 +127,13 @@ DAP_STATIC_INLINE int dap_chain_hash_fast_to_str(const dap_hash_fast_t *a_hash, return DAP_CHAIN_HASH_FAST_STR_SIZE; } -const char *dap_chain_hash_fast_to_str_static(const dap_hash_fast_t *a_hash); +DAP_STATIC_INLINE dap_hash_str_t dap_chain_hash_fast_to_hash_str(const dap_hash_fast_t *a_hash) { + dap_hash_str_t l_ret = { }; + dap_chain_hash_fast_to_str(a_hash, l_ret.s, DAP_CHAIN_HASH_FAST_STR_SIZE); + return l_ret; +} +#define dap_chain_hash_fast_to_str_static(hash) dap_chain_hash_fast_to_hash_str(hash).s #define dap_hash_fast_to_str dap_chain_hash_fast_to_str #define dap_hash_fast_to_str_static dap_chain_hash_fast_to_str_static @@ -164,13 +169,14 @@ DAP_STATIC_INLINE char *dap_hash_fast_str_new( const void *a_data, size_t a_data return NULL; } -#define dap_get_data_hash_str_static(data,data_size,strname) \ -do { \ - strname = DAP_NEW_STACK_SIZE(char, DAP_CHAIN_HASH_FAST_STR_SIZE); \ - dap_hash_fast_t dummy_hash; \ - dap_hash_fast(data,data_size,&dummy_hash); \ - dap_chain_hash_fast_to_str(&dummy_hash,strname,DAP_CHAIN_HASH_FAST_STR_SIZE); \ -} while (0) +DAP_STATIC_INLINE dap_hash_str_t dap_get_data_hash_str(const void *a_data, size_t a_data_size) +{ + dap_hash_str_t l_ret = { }; + dap_hash_fast_t dummy_hash; + dap_hash_fast(a_data, a_data_size, &dummy_hash); + dap_chain_hash_fast_to_str(&dummy_hash, l_ret.s, DAP_CHAIN_HASH_FAST_STR_SIZE); + return l_ret; +} #ifdef __cplusplus } diff --git a/crypto/src/dap_cert.c b/crypto/src/dap_cert.c index ac96ce52d293f7bebc1b7e0d42dadb380c101130..64331a984df40cc65ed0aac5e179571782ec994f 100755 --- a/crypto/src/dap_cert.c +++ b/crypto/src/dap_cert.c @@ -236,11 +236,8 @@ dap_cert_t * dap_cert_generate_mem_with_seed(const char * a_cert_name, dap_enc_k if (l_enc_key) { dap_cert_t * l_cert = dap_cert_new(a_cert_name); l_cert->enc_key = l_enc_key; - if (a_seed && a_seed_size) { - char *l_hash_str; - dap_get_data_hash_str_static(a_seed, a_seed_size, l_hash_str); - log_it(L_DEBUG, "Certificate generated with seed hash %s", l_hash_str); - } + if (a_seed && a_seed_size) + log_it(L_DEBUG, "Certificate generated with seed hash %s", dap_get_data_hash_str(a_seed, a_seed_size).s); return l_cert; } else { log_it(L_ERROR,"Can't generate key in memory!"); @@ -607,11 +604,7 @@ char *dap_cert_dump(dap_cert_t *a_cert) const char *dap_cert_get_folder(int a_n_folder_path) { char **l_p = utarray_eltptr(s_cert_folders, (u_int)a_n_folder_path); - if (!l_p) { - log_it(L_ERROR, "No default cert path check 'ca_folders' in cellframe-node.cfg"); - return NULL; - } else - return *l_p; + return l_p ? *l_p : ( log_it(L_ERROR, "No default cert path, check \"ca_folders\" in cellframe-node.cfg"), NULL ); } diff --git a/crypto/src/dap_hash.c b/crypto/src/dap_hash.c index a54345830af99c639aae615b5a943114ddcdf21e..94eda5d58ae95633d363549d50c2badd1c2eda97 100755 --- a/crypto/src/dap_hash.c +++ b/crypto/src/dap_hash.c @@ -78,10 +78,3 @@ int dap_chain_hash_fast_from_str( const char *a_hash_str, dap_chain_hash_fast_t { return dap_chain_hash_fast_from_hex_str(a_hash_str, a_hash) && dap_chain_hash_fast_from_base58_str(a_hash_str, a_hash); } - -const char *dap_chain_hash_fast_to_str_static(const dap_hash_fast_t *a_hash) -{ - _Thread_local static char s_hash_str[DAP_HASH_FAST_STR_SIZE]; - return dap_chain_hash_fast_to_str(a_hash, s_hash_str, sizeof(s_hash_str)) == DAP_CHAIN_HASH_FAST_STR_SIZE - ? s_hash_str : NULL; -} diff --git a/io/dap_proc_thread.c b/io/dap_proc_thread.c index ba9148cb54b3d859c36f52b6d489fb92258c84a0..e8fab8317f409e2740f0f2200ae6a38584260ef1 100644 --- a/io/dap_proc_thread.c +++ b/io/dap_proc_thread.c @@ -220,25 +220,24 @@ struct timer_arg { dap_proc_thread_t *thread; dap_thread_timer_callback_t callback; void *callback_arg; + bool oneshot; dap_queue_msg_priority_t priority; }; static bool s_thread_timer_callback(void *a_arg) { struct timer_arg *l_arg = a_arg; - l_arg->callback(l_arg->callback_arg); - return false; + return l_arg->callback(l_arg->callback_arg), false; } static bool s_timer_callback(void *a_arg) { struct timer_arg *l_arg = a_arg; - dap_proc_thread_callback_add_pri(l_arg->thread, s_thread_timer_callback, l_arg, l_arg->priority); - // Repeat after exit - return true; + // Repeat after exit, if not oneshot + return dap_proc_thread_callback_add_pri(l_arg->thread, s_thread_timer_callback, l_arg, l_arg->priority), !l_arg->oneshot; } -int dap_proc_thread_timer_add_pri(dap_proc_thread_t *a_thread, dap_thread_timer_callback_t a_callback, void *a_callback_arg, uint64_t a_timeout_ms, dap_queue_msg_priority_t a_priority) +int dap_proc_thread_timer_add_pri(dap_proc_thread_t *a_thread, dap_thread_timer_callback_t a_callback, void *a_callback_arg, uint64_t a_timeout_ms, bool a_oneshot, dap_queue_msg_priority_t a_priority) { dap_return_val_if_fail(a_callback && a_timeout_ms, -1); dap_proc_thread_t *l_thread = a_thread ? a_thread : dap_proc_thread_get_auto(); @@ -248,7 +247,9 @@ int dap_proc_thread_timer_add_pri(dap_proc_thread_t *a_thread, dap_thread_timer_ return -2; } struct timer_arg *l_timer_arg = DAP_NEW_Z(struct timer_arg); - *l_timer_arg = (struct timer_arg){ .thread = l_thread, .callback = a_callback, .callback_arg = a_callback_arg, .priority = a_priority }; + *l_timer_arg = (struct timer_arg){ .thread = l_thread, .callback = a_callback, + .callback_arg = a_callback_arg, + .oneshot = a_oneshot, .priority = a_priority }; dap_timerfd_start_on_worker(l_worker, a_timeout_ms, s_timer_callback, l_timer_arg); return 0; } diff --git a/io/include/dap_proc_thread.h b/io/include/dap_proc_thread.h index e7d357bbb9a9483ade713aaedd68906f16999954..0ba6823228090592d8a4733fe4dfa0eb7f6faebe 100644 --- a/io/include/dap_proc_thread.h +++ b/io/include/dap_proc_thread.h @@ -72,9 +72,9 @@ DAP_STATIC_INLINE int dap_proc_thread_callback_add(dap_proc_thread_t *a_thread, { return dap_proc_thread_callback_add_pri(a_thread, a_callback, a_callback_arg, DAP_QUEUE_MSG_PRIORITY_NORMAL); } -int dap_proc_thread_timer_add_pri(dap_proc_thread_t *a_thread, dap_thread_timer_callback_t a_callback, void *a_callback_arg, uint64_t a_timeout_ms, dap_queue_msg_priority_t a_priority); +int dap_proc_thread_timer_add_pri(dap_proc_thread_t *a_thread, dap_thread_timer_callback_t a_callback, void *a_callback_arg, uint64_t a_timeout_ms, bool a_oneshot, dap_queue_msg_priority_t a_priority); DAP_STATIC_INLINE int dap_proc_thread_timer_add(dap_proc_thread_t *a_thread, dap_thread_timer_callback_t a_callback, void *a_callback_arg, uint64_t a_timeout_ms) { - return dap_proc_thread_timer_add_pri(a_thread, a_callback, a_callback_arg, a_timeout_ms, DAP_QUEUE_MSG_PRIORITY_NORMAL); + return dap_proc_thread_timer_add_pri(a_thread, a_callback, a_callback_arg, a_timeout_ms, false, DAP_QUEUE_MSG_PRIORITY_NORMAL); } size_t dap_proc_thread_get_avg_queue_size(); diff --git a/net/app-cli/dap_app_cli.c b/net/app-cli/dap_app_cli.c index 34866b20ac0210f3a541531f701c40a10a5531c2..c7619ed096f8b3244231dbb0e6d565dbcd1d8f33 100644 --- a/net/app-cli/dap_app_cli.c +++ b/net/app-cli/dap_app_cli.c @@ -34,10 +34,10 @@ #include "dap_app_cli.h" #include "dap_app_cli_net.h" #include "dap_app_cli_shell.h" - -#ifdef DAP_OS_ANDROID #include "dap_json_rpc_params.h" #include "dap_json_rpc_request.h" + +#ifdef DAP_OS_ANDROID #include <android/log.h> #include <jni.h> static dap_config_t *cli_config; @@ -154,6 +154,28 @@ static int shell_reader_loop() return 0; } + +char *dap_cli_exec(int argc, char **argv) { + + dap_app_cli_cmd_state_t cmd = { + .cmd_name = (char*)argv[0], + .cmd_param_count = argc - 2, + .cmd_param = argc - 2 > 0 ? (char**)(argv + 1) : NULL + }; + + char *l_cmd_str = dap_app_cli_form_command(&cmd); + dap_json_rpc_params_t *params = dap_json_rpc_params_create(); + dap_json_rpc_params_add_data(params, l_cmd_str, TYPE_PARAM_STRING); + DAP_DELETE(l_cmd_str); + dap_json_rpc_request_t *a_request = dap_json_rpc_request_creation(cmd.cmd_name, params, 0); + char *req_str = dap_json_rpc_request_to_json_string(a_request), + *res = dap_cli_cmd_exec(req_str); + dap_json_rpc_request_free(a_request); + return res; + + +} + #ifdef DAP_OS_ANDROID JNIEXPORT jstring JNICALL Java_com_CellframeWallet_Node_cellframeNodeCliMain(JNIEnv *javaEnv, jobject __unused jobj, jobjectArray argvStr) { diff --git a/net/app-cli/dap_app_cli_net.c b/net/app-cli/dap_app_cli_net.c index 5a043205e20c407a3c895479db1ae255972e391a..a23593c6cc7ef51a61da78e5983555c63d9da26e 100644 --- a/net/app-cli/dap_app_cli_net.c +++ b/net/app-cli/dap_app_cli_net.c @@ -121,9 +121,7 @@ dap_app_cli_connect_param_t dap_app_cli_connect() if (l_addr) { l_addrs[0] = NULL; dap_config_get_item_str_path_array_free(l_addrs, &l_array_count); -#ifdef DAP_OS_WINDOWS - printf("Unix socket-based server is not yet implemented, consider localhost usage\n"); // TODO - return ~0; +#if defined(DAP_OS_WINDOWS) || defined(DAP_OS_ANDROID) #else if ( -1 == (l_socket = socket(AF_UNIX, SOCK_STREAM, 0)) ) { printf ("socket() error %d", errno); diff --git a/net/app-cli/include/dap_app_cli.h b/net/app-cli/include/dap_app_cli.h index d6749b8690e8f293cb7836a31d60929e366ea7bc..722c53454d8f6eb98854b75a9eaa4cd6da622410 100644 --- a/net/app-cli/include/dap_app_cli.h +++ b/net/app-cli/include/dap_app_cli.h @@ -42,6 +42,8 @@ typedef struct dap_app_cli_cmd_state { extern "C" { #endif int dap_app_cli_main(const char *a_app_name, int argc, const char **argv); +char *dap_cli_exec(int argc, char **argv); + #ifdef __cplusplus } #endif diff --git a/net/client/dap_client_pvt.c b/net/client/dap_client_pvt.c index 12df95cb8f6ecfa954aaa7dc1ea46b732cba556a..864b22daa88d686ce243ae3cd9cc171a2dbdf649 100644 --- a/net/client/dap_client_pvt.c +++ b/net/client/dap_client_pvt.c @@ -144,7 +144,9 @@ static void s_client_internal_clean(dap_client_pvt_t *a_client_pvt) } if (a_client_pvt->stream_es) { dap_stream_delete_unsafe(a_client_pvt->stream); - a_client_pvt->stream = a_client_pvt->stream_es = a_client_pvt->stream_key = NULL; + a_client_pvt->stream = NULL; + a_client_pvt->stream_es = NULL; + a_client_pvt->stream_key = NULL; a_client_pvt->stream_id = 0; } @@ -1335,16 +1337,16 @@ static void s_stream_es_callback_read(dap_events_socket_t * a_es, void * arg) l_client_pvt->stage_status = STAGE_STATUS_DONE; s_stage_status_after(l_client_pvt); - dap_stream_data_proc_read(l_client_pvt->stream); - dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size); + size_t l_bytes_read = dap_stream_data_proc_read(l_client_pvt->stream); + dap_events_socket_shrink_buf_in(a_es, l_bytes_read); } } } } break; case STAGE_STREAM_STREAMING: { // if streaming - process data with stream processor - dap_stream_data_proc_read(l_client_pvt->stream); - dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size); + size_t l_bytes_read = dap_stream_data_proc_read(l_client_pvt->stream); + dap_events_socket_shrink_buf_in(a_es, l_bytes_read); } break; default: { diff --git a/net/link_manager/dap_link_manager.c b/net/link_manager/dap_link_manager.c index 38fafff08c803f183a88740d5fdcbbaab8319917..04467ee8bf1121ea77c94a1153b06db8f9575e43 100644 --- a/net/link_manager/dap_link_manager.c +++ b/net/link_manager/dap_link_manager.c @@ -50,8 +50,8 @@ typedef struct dap_managed_net { static bool s_debug_more = false; static const char *s_init_error = "Link manager not inited"; -static uint32_t s_timer_update_states = 2000; -static uint32_t s_max_attempts_num = 3; +static uint32_t s_timer_update_states = 5000; +static uint32_t s_max_attempts_num = 1; static uint32_t s_reconnect_delay = 20; // sec static dap_link_manager_t *s_link_manager = NULL; static dap_proc_thread_t *s_query_thread = NULL; @@ -167,15 +167,24 @@ DAP_STATIC_INLINE void s_debug_accounting_link_in_net(bool a_uplink, dap_stream_ DAP_STATIC_INLINE void s_link_manager_print_links_info(dap_link_manager_t *a_link_manager) { dap_link_t *l_link = NULL, *l_tmp = NULL; - printf("| Uplink |\tNode addr\t|Active Clusters|Static clusters|\n" + dap_string_t *l_report = dap_string_new("\n| Uplink |\tNode addr\t|Active Clusters|Static clusters|\tNet IDs\t\n" "-----------------------------------------------------------------\n"); - HASH_ITER(hh, a_link_manager->links, l_link, l_tmp) - printf("| %5s |"NODE_ADDR_FP_STR"|\t%"DAP_UINT64_FORMAT_U - "\t|\t%"DAP_UINT64_FORMAT_U"\t|\n", + HASH_ITER(hh, a_link_manager->links, l_link, l_tmp) { + dap_string_append_printf(l_report, "| %5s |"NODE_ADDR_FP_STR"|\t%"DAP_UINT64_FORMAT_U + "\t|\t%"DAP_UINT64_FORMAT_U"\t| ", l_link->is_uplink ? "True" : "False", NODE_ADDR_FP_ARGS_S(l_link->addr), dap_list_length(l_link->active_clusters), dap_list_length(l_link->static_clusters)); + dap_list_t *it, *tmp; + DL_FOREACH_SAFE(l_link->uplink.associated_nets, it, tmp) { + dap_managed_net_t *l_net = it->data; + dap_string_append_printf(l_report, " %"DAP_UINT64_FORMAT_x, l_net->id); + } + dap_string_append_printf(l_report, "%s", "\n"); + } + log_it(L_DEBUG, "%s", l_report->str); + dap_string_free(l_report, true); } // General functional @@ -400,7 +409,6 @@ void dap_link_manager_set_net_condition(uint64_t a_net_id, bool a_new_condition) } if (a_new_condition) return; - l_net->uplinks = 0; pthread_rwlock_wrlock(&s_link_manager->links_lock); dap_link_t *l_link_it, *l_link_tmp; HASH_ITER(hh, s_link_manager->links, l_link_it, l_link_tmp) { @@ -514,7 +522,6 @@ void s_link_drop(dap_link_t *a_link, bool a_disconnected) if (l_is_permanent_link) continue; DL_DELETE(a_link->uplink.associated_nets, it); - l_net->uplinks--; } } if (!a_link->active_clusters && !a_link->uplink.associated_nets && !a_link->static_clusters) { @@ -713,9 +720,11 @@ void s_links_request(dap_link_manager_t *a_link_manager) dap_list_t *l_item = NULL; DL_FOREACH(a_link_manager->nets, l_item) { dap_managed_net_t *l_net = (dap_managed_net_t *)l_item->data; - if (l_net->active && a_link_manager->callbacks.link_request && - l_net->uplinks < l_net->min_links_num) - a_link_manager->callbacks.link_request(l_net->id); + if (l_net->active ) { + l_net->uplinks = dap_link_manager_links_count(l_net->id); + if (a_link_manager->callbacks.link_request && l_net->uplinks < l_net->min_links_num) + a_link_manager->callbacks.link_request(l_net->id); + } } } @@ -760,7 +769,9 @@ static dap_link_t *s_link_manager_link_create(dap_stream_node_addr_t *a_node_add l_link->addr.uint64 = a_node_addr->uint64; l_link->link_manager = s_link_manager; HASH_ADD(hh, s_link_manager->links, addr, sizeof(*a_node_addr), l_link); - } + } + if (s_debug_more) + s_link_manager_print_links_info(s_link_manager); if (a_with_client) { if (!l_link->uplink.client) l_link->uplink.client = dap_client_new(s_client_error_callback, NULL); @@ -778,11 +789,8 @@ static dap_link_t *s_link_manager_link_create(dap_stream_node_addr_t *a_node_add return NULL; } l_link->uplink.associated_nets = dap_list_append(l_link->uplink.associated_nets, l_net); - l_net->uplinks++; } } - if (s_debug_more) - s_link_manager_print_links_info(s_link_manager); return l_link; } @@ -1120,7 +1128,6 @@ static bool s_link_accounting_callback(void *a_arg) } } l_link->uplink.associated_nets = dap_list_remove(l_link->uplink.associated_nets, l_net); - l_net->uplinks--; if (l_link->uplink.client && !l_link->uplink.associated_nets && !l_link->static_clusters) s_link_delete(&l_link, false, false); } @@ -1234,7 +1241,7 @@ void dap_link_manager_remove_static_links_cluster(dap_cluster_member_t *a_member * @param a_downlinks_count output count of finded downlinks * @return pointer to dap_stream_node_addr_t array, first uplinks, second downlinks, or NULL */ -dap_stream_node_addr_t *dap_link_manager_get_net_links_addrs(uint64_t a_net_id, size_t *a_uplinks_count, size_t *a_downlinks_count, bool a_uplinks_only) +dap_stream_node_addr_t *dap_link_manager_get_net_links_addrs(uint64_t a_net_id, size_t *a_uplinks_count, size_t *a_downlinks_count, bool a_established_only) { // sanity check dap_managed_net_t *l_net = s_find_net_by_id(a_net_id); @@ -1255,13 +1262,13 @@ dap_stream_node_addr_t *dap_link_manager_get_net_links_addrs(uint64_t a_net_id, for (size_t i = 0; i < l_cur_count; ++i) { dap_link_t *l_link = NULL; HASH_FIND(hh, s_link_manager->links, l_links_addrs + i, sizeof(l_links_addrs[i]), l_link); - if (!l_link || (l_link->is_uplink && l_link->uplink.state != LINK_STATE_ESTABLISHED)) { + if (!l_link || (l_link->is_uplink && a_established_only && l_link->uplink.state != LINK_STATE_ESTABLISHED)) { continue; } else if (l_link->is_uplink) { // first uplinks, second downlinks l_ret[l_uplinks_count + l_downlinks_count].uint64 = l_ret[l_uplinks_count].uint64; l_ret[l_uplinks_count].uint64 = l_link->addr.uint64; ++l_uplinks_count; - } else if (!a_uplinks_only) { + } else { l_ret[l_uplinks_count + l_downlinks_count].uint64 = l_link->addr.uint64; ++l_downlinks_count; } diff --git a/net/link_manager/include/dap_link_manager.h b/net/link_manager/include/dap_link_manager.h index 83dbafb82474b93aa415527991ce39341a44f915..6e44860c5b58f19d46cebed981ffca078667e4b8 100644 --- a/net/link_manager/include/dap_link_manager.h +++ b/net/link_manager/include/dap_link_manager.h @@ -111,6 +111,6 @@ size_t dap_link_manager_needed_links_count(uint64_t a_net_id); void dap_link_manager_set_condition(bool a_new_condition); bool dap_link_manager_get_condition(); char *dap_link_manager_get_links_info(); -dap_stream_node_addr_t *dap_link_manager_get_net_links_addrs(uint64_t a_net_id, size_t *a_uplinks_count, size_t *a_downlinks_count, bool a_uplinks_only); +dap_stream_node_addr_t *dap_link_manager_get_net_links_addrs(uint64_t a_net_id, size_t *a_uplinks_count, size_t *a_downlinks_count, bool a_established_only); dap_stream_node_addr_t *dap_link_manager_get_ignored_addrs(size_t *a_ignored_count, uint64_t a_net_id); void dap_link_manager_stream_replace(dap_stream_node_addr_t *a_addr, bool a_new_is_uplink); diff --git a/net/server/http_server/http_client/dap_http_client.c b/net/server/http_server/http_client/dap_http_client.c index ef2d9cf492216645af45df97da1c61a79ac98050..36a5ceab5cf0a1a26d9f33fb866f91a99044dbea 100644 --- a/net/server/http_server/http_client/dap_http_client.c +++ b/net/server/http_server/http_client/dap_http_client.c @@ -313,8 +313,8 @@ void dap_http_client_read( dap_events_socket_t *a_esocket, void *a_arg ) byte_t *l_peol; char *l_cp; - int l_len, l_ret; - size_t read_bytes = 0; + int l_ret; + size_t l_len = 0; dap_http_client_t *l_http_client = DAP_HTTP_CLIENT( a_esocket ); dap_http_url_proc_t *url_proc = NULL; @@ -498,7 +498,7 @@ void dap_http_client_read( dap_events_socket_t *a_esocket, void *a_arg ) break; } } - dap_events_socket_shrink_buf_in( a_esocket, l_len); /* Shrink input buffer over whole HTTP header */ + dap_events_socket_shrink_buf_in(a_esocket, l_len); /* Shrink input buffer over whole HTTP header */ } break; case DAP_HTTP_CLIENT_STATE_DATA:{ @@ -507,8 +507,8 @@ void dap_http_client_read( dap_events_socket_t *a_esocket, void *a_arg ) pthread_rwlock_rdlock(&l_http_client->proc->cache_rwlock); if ( l_http_client->proc->cache == NULL && l_http_client->proc->data_read_callback ) { pthread_rwlock_unlock(&l_http_client->proc->cache_rwlock); - l_http_client->proc->data_read_callback( l_http_client, &read_bytes ); - dap_events_socket_shrink_buf_in( a_esocket, read_bytes ); + l_http_client->proc->data_read_callback( l_http_client, &l_len ); + dap_events_socket_shrink_buf_in( a_esocket, l_len ); } else { pthread_rwlock_unlock(&l_http_client->proc->cache_rwlock); a_esocket->buf_in_size = 0; @@ -519,14 +519,12 @@ void dap_http_client_read( dap_events_socket_t *a_esocket, void *a_arg ) a_esocket->buf_in_size = 0; } break; } // switch - if (l_iter_count++ > 1000) { + if (++l_iter_count > 1000) { log_it(L_ERROR, "Indefinite loop in DAP HTTP client read"); s_report_error_and_restart( a_esocket, l_http_client, Http_Status_LoopDetected ); break; } - } while (a_esocket->buf_in_size); -// log_it( L_DEBUG, "dap_http_client_read...exit" ); -// Sleep(100); + } while (a_esocket->buf_in_size && l_len); } /** diff --git a/net/server/json_rpc/rpc_core/src/dap_json_rpc_response.c b/net/server/json_rpc/rpc_core/src/dap_json_rpc_response.c index 7fe56b2f120c336be63a65ceaad8f4921e09e636..c12d6fbab93ff84fa95beac3630ff67da8b2c994 100644 --- a/net/server/json_rpc/rpc_core/src/dap_json_rpc_response.c +++ b/net/server/json_rpc/rpc_core/src/dap_json_rpc_response.c @@ -1,6 +1,7 @@ #include "dap_json_rpc_response.h" #define LOG_TAG "dap_json_rpc_response" +#define INDENTATION_LEVEL " " dap_json_rpc_response_t *dap_json_rpc_response_init() { @@ -182,7 +183,7 @@ void json_print_object(json_object *obj, int indent_level) { case json_type_object: { json_object_object_foreach(obj, key, val) { for (int i = 0; i <= indent_level; i++) { - printf(" "); // indentation level + printf(INDENTATION_LEVEL); // indentation level } printf("%s: ", key); json_print_value(val, key, indent_level + 1, false); @@ -193,10 +194,13 @@ void json_print_object(json_object *obj, int indent_level) { case json_type_array: { int length = json_object_array_length(obj); for (int i = 0; i < length; i++) { + for (int j = 0; j <= indent_level; j++) { + printf(INDENTATION_LEVEL); // indentation level + } json_object *item = json_object_array_get_idx(obj, i); json_print_value(item, NULL, indent_level + 1, length - 1 - i); + printf("\n"); } - printf("\n"); break; } default: diff --git a/net/server/notify_server/include/dap_notify_srv.h b/net/server/notify_server/include/dap_notify_srv.h index 43904e1882300fbd6d44fb8320425c11187b49b5..7041fac8d1d63ff57820990f5150203bae328498 100644 --- a/net/server/notify_server/include/dap_notify_srv.h +++ b/net/server/notify_server/include/dap_notify_srv.h @@ -31,4 +31,7 @@ DAP_PRINTF_ATTR(2, 3) int dap_notify_server_send_f_inter(uint32_t a_worker_id, c int dap_notify_server_send_mt(const char * a_data); DAP_PRINTF_ATTR(1, 2) int dap_notify_server_send_f_mt(const char *a_format, ...); +typedef bool (*dap_notify_data_user_callback_t)(const char *data); +void dap_notify_data_set_user_callback(dap_notify_data_user_callback_t callback); + void dap_notify_srv_set_callback_new(dap_events_socket_callback_t); \ No newline at end of file diff --git a/net/server/notify_server/src/dap_notify_srv.c b/net/server/notify_server/src/dap_notify_srv.c index 6f305a9ae671d9579e0fa750d5c36ee05410a45b..b894b2216091e941607f339312eece1a67544c7b 100644 --- a/net/server/notify_server/src/dap_notify_srv.c +++ b/net/server/notify_server/src/dap_notify_srv.c @@ -48,9 +48,15 @@ pthread_rwlock_t s_notify_server_clients_mutex = PTHREAD_RWLOCK_INITIALIZER; static void s_notify_server_callback_queue(dap_events_socket_t * a_es, void * a_arg); static void s_notify_server_callback_new(dap_events_socket_t * a_es, void * a_arg); static void s_notify_server_callback_delete(dap_events_socket_t * a_es, void * a_arg); - +static dap_notify_data_user_callback_t s_notify_data_user_callback = NULL; dap_events_socket_callback_t s_notify_server_callback_new_ex = NULL; +void dap_notify_data_set_user_callback(dap_notify_data_user_callback_t callback) +{ + s_notify_data_user_callback = callback; +} + + void dap_notify_srv_set_callback_new(dap_events_socket_callback_t a_cb) { s_notify_server_callback_new_ex = a_cb; } @@ -144,6 +150,7 @@ int dap_notify_server_send_f_inter(uint32_t a_worker_id, const char * a_format,. */ int dap_notify_server_send_mt(const char *a_data) { + if (s_notify_data_user_callback) s_notify_data_user_callback(a_data); if(!s_notify_server_queue) // If not initialized - nothing to notify return 0; return dap_events_socket_queue_ptr_send(s_notify_server_queue, dap_strdup(a_data)); @@ -157,8 +164,9 @@ int dap_notify_server_send_mt(const char *a_data) */ int dap_notify_server_send_f_mt(const char *a_format, ...) { - if(!s_notify_server_queue) // If not initialized - nothing to notify + if (!s_notify_data_user_callback && s_notify_server_queue) return 0; + va_list ap, ap_copy; va_start(ap, a_format); va_copy(ap_copy, ap); @@ -178,6 +186,13 @@ int dap_notify_server_send_f_mt(const char *a_format, ...) } vsprintf(l_str, a_format, ap_copy); va_end(ap_copy); + + if (s_notify_data_user_callback) s_notify_data_user_callback(l_str); + + + if(!s_notify_server_queue) // If not initialized - nothing to notify + return 0; + int l_ret = dap_events_socket_queue_ptr_send(s_notify_server_queue, l_str); DAP_DELETE(l_str); return l_ret; diff --git a/net/stream/ch/dap_stream_ch_gossip.c b/net/stream/ch/dap_stream_ch_gossip.c index 82d8ddc149257d5a0333abd984b373d0cd19732d..592776acc45cd1ea9bff9fb804b7dd879db8dd0f 100644 --- a/net/stream/ch/dap_stream_ch_gossip.c +++ b/net/stream/ch/dap_stream_ch_gossip.c @@ -183,13 +183,18 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) HASH_VALUE(l_payload_hash, sizeof(dap_hash_t), l_hash_value); pthread_rwlock_wrlock(&s_gossip_lock); HASH_FIND_BYHASHVALUE(hh, s_gossip_last_msgs, l_ch_pkt->data, sizeof(dap_hash_t), l_hash_value, l_msg_item); - if (l_msg_item && l_msg_item->with_payload && l_ch_pkt->hdr.type == DAP_STREAM_CH_GOSSIP_MSG_TYPE_REQUEST) { - debug_if(s_debug_more, L_INFO, "OUT: GOSSIP_DATA packet for hash %s", dap_hash_fast_to_str_static((dap_hash_fast_t *)&l_ch_pkt->data)); - // Send data associated with this hash by request - dap_gossip_msg_t *l_msg = (dap_gossip_msg_t *)l_msg_item->message; - dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_GOSSIP_MSG_TYPE_DATA, l_msg, dap_gossip_msg_get_size(l_msg)); - } - if (!l_msg_item && l_ch_pkt->hdr.type == DAP_STREAM_CH_GOSSIP_MSG_TYPE_HASH) { + if (l_msg_item) { + if (l_msg_item->timestamp < dap_nanotime_now() - DAP_GOSSIP_LIFETIME * 1000000000UL) { + debug_if(s_debug_more, L_INFO, "Packet for hash %s is derelict", dap_hash_fast_to_str_static(l_payload_hash)); + HASH_DEL(s_gossip_last_msgs, l_msg_item); + DAP_DELETE(l_msg_item); + } else if (l_msg_item->with_payload && l_ch_pkt->hdr.type == DAP_STREAM_CH_GOSSIP_MSG_TYPE_REQUEST) { + debug_if(s_debug_more, L_INFO, "OUT: GOSSIP_DATA packet for hash %s", dap_hash_fast_to_str_static((dap_hash_fast_t *)&l_ch_pkt->data)); + // Send data associated with this hash by request + dap_gossip_msg_t *l_msg = (dap_gossip_msg_t *)l_msg_item->message; + dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_GOSSIP_MSG_TYPE_DATA, l_msg, dap_gossip_msg_get_size(l_msg)); + } + } else if (l_ch_pkt->hdr.type == DAP_STREAM_CH_GOSSIP_MSG_TYPE_HASH) { struct gossip_msg_item *l_item_new = DAP_NEW_Z(struct gossip_msg_item); if (!l_item_new) { log_it(L_CRITICAL, "%s", c_error_memory_alloc); @@ -234,7 +239,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) debug_if(s_debug_more, L_INFO, "IN: GOSSIP_DATA packet for hash %s", dap_hash_fast_to_str_static(&l_msg->payload_hash)); unsigned l_hash_value = 0; HASH_VALUE(&l_msg->payload_hash, sizeof(dap_hash_t), l_hash_value); - struct gossip_msg_item *l_payload_item = NULL; + struct gossip_msg_item *l_payload_item = NULL, *l_payload_item_new; pthread_rwlock_wrlock(&s_gossip_lock); HASH_FIND_BYHASHVALUE(hh, s_gossip_last_msgs, &l_msg->payload_hash, sizeof(dap_hash_t), l_hash_value, l_payload_item); if (!l_payload_item || l_payload_item->with_payload) { @@ -242,6 +247,12 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) pthread_rwlock_unlock(&s_gossip_lock); break; } + if (l_payload_item->timestamp < dap_nanotime_now() - DAP_GOSSIP_LIFETIME * 1000000000UL) { + HASH_DEL(s_gossip_last_msgs, l_payload_item); + DAP_DELETE(l_payload_item); + pthread_rwlock_unlock(&s_gossip_lock); + break; + } dap_cluster_t *l_links_cluster = dap_cluster_find(l_msg->cluster_id); if (l_links_cluster) { dap_cluster_member_t *l_check = dap_cluster_member_find_unsafe(l_links_cluster, &a_ch->stream->node); @@ -267,14 +278,16 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) pthread_rwlock_unlock(&s_gossip_lock); break; } - HASH_DEL(s_gossip_last_msgs, l_payload_item); size_t l_payload_item_size = dap_gossip_msg_get_size(l_msg) + sizeof(g_node_addr) + sizeof(struct gossip_msg_item); - l_payload_item = DAP_REALLOC(l_payload_item, l_payload_item_size); - if (!l_payload_item) { + HASH_DEL(s_gossip_last_msgs, l_payload_item); + l_payload_item_new = DAP_REALLOC(l_payload_item, l_payload_item_size); + if (!l_payload_item_new) { log_it(L_CRITICAL, "%s", c_error_memory_alloc); pthread_rwlock_unlock(&s_gossip_lock); break; } + l_payload_item = l_payload_item_new; + HASH_ADD_BYHASHVALUE(hh, s_gossip_last_msgs, payload_hash, sizeof(dap_hash_t), l_hash_value, l_payload_item); l_payload_item->with_payload = true; // Copy message and append g_node_addr to pathtrace dap_gossip_msg_t *l_msg_new = (dap_gossip_msg_t *)l_payload_item->message; @@ -282,8 +295,6 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) l_msg_new->trace_len = l_msg->trace_len + sizeof(g_node_addr); *(dap_stream_node_addr_t *)(l_msg_new->trace_n_payload + l_msg->trace_len) = g_node_addr; memcpy(l_msg_new->trace_n_payload + l_msg_new->trace_len, l_msg->trace_n_payload + l_msg->trace_len, l_msg->payload_len); - HASH_ADD_BYHASHVALUE(hh, s_gossip_last_msgs, payload_hash, sizeof(dap_hash_t), l_hash_value, l_payload_item); - pthread_rwlock_unlock(&s_gossip_lock); // Broadcast new message debug_if(s_debug_more, L_INFO, "OUT: GOSSIP_HASH broadcast for hash %s", dap_hash_fast_to_str_static(&l_msg_new->payload_hash)); @@ -292,6 +303,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) &l_msg_new->payload_hash, sizeof(dap_hash_t), (dap_stream_node_addr_t *)l_msg_new->trace_n_payload, l_msg_new->trace_len / sizeof(dap_stream_node_addr_t)); + pthread_rwlock_unlock(&s_gossip_lock); // Call back the payload func if any struct gossip_callback *l_callback = s_get_callbacks_by_ch_id(l_msg->payload_ch_id); if (!l_callback) { diff --git a/net/stream/stream/dap_stream.c b/net/stream/stream/dap_stream.c index c7eb83b8981630207786d7c16174c9dd073fd545..6493ed7d2d451537d826b0484d4f412b4e49a92a 100644 --- a/net/stream/stream/dap_stream.c +++ b/net/stream/stream/dap_stream.c @@ -81,7 +81,7 @@ static dap_enc_key_type_t s_stream_get_preferred_encryption_type = DAP_ENC_KEY static int s_add_stream_info(authorized_stream_t **a_hash_table, authorized_stream_t *a_item, dap_stream_t *a_stream); -static void s_stream_proc_pkt_in(dap_stream_t * a_stream, dap_stream_pkt_t *l_pkt, size_t l_pkt_size); +static void s_stream_proc_pkt_in(dap_stream_t * a_stream, dap_stream_pkt_t *l_pkt); // Callbacks for HTTP client static void s_http_client_headers_read(dap_http_client_t * a_http_client, void * a_arg); // Prepare stream when all headers are read @@ -429,7 +429,6 @@ void dap_stream_delete_unsafe(dap_stream_t *a_stream) s_stream_delete_from_list(a_stream); DAP_DEL_Z(a_stream->buf_fragments); - DAP_DEL_Z(a_stream->pkt_buf_in); DAP_DELETE(a_stream); log_it(L_NOTICE,"Stream connection is over"); } @@ -676,96 +675,38 @@ static void s_http_client_delete(dap_http_client_t * a_http_client, void *a_arg) size_t dap_stream_data_proc_read (dap_stream_t *a_stream) { dap_return_val_if_fail(a_stream && a_stream->esocket && a_stream->esocket->buf_in, 0); - - byte_t *l_buf_in = a_stream->esocket->buf_in; - size_t l_buf_in_size = a_stream->esocket->buf_in_size; - - // Save the received data to stream memory - if (!a_stream->pkt_buf_in) { - a_stream->pkt_buf_in = DAP_DUP_SIZE(l_buf_in, l_buf_in_size); - a_stream->pkt_buf_in_data_size = l_buf_in_size; - } else { - debug_if(s_dump_packet_headers, L_DEBUG, "dap_stream_data_proc_read() Receive previously unprocessed data %zu bytes + new %zu bytes", - a_stream->pkt_buf_in_data_size, l_buf_in_size); - // The current data is added to rest of the previous package - a_stream->pkt_buf_in = DAP_REALLOC(a_stream->pkt_buf_in, a_stream->pkt_buf_in_data_size + l_buf_in_size); - memcpy((byte_t*)a_stream->pkt_buf_in + a_stream->pkt_buf_in_data_size, l_buf_in, l_buf_in_size); - // Increase the size of pkt_buf_in - a_stream->pkt_buf_in_data_size += l_buf_in_size; - } - // Switch to stream memory - l_buf_in = (byte_t*) a_stream->pkt_buf_in; - l_buf_in_size = a_stream->pkt_buf_in_data_size; - size_t l_buf_in_left = l_buf_in_size; - - dap_stream_pkt_t *l_pkt = NULL; - if(l_buf_in_left >= sizeof(dap_stream_pkt_hdr_t)) { - // Now lets see how many packets we have in buffer now - while(l_buf_in_left > 0 && (l_pkt = dap_stream_pkt_detect(l_buf_in, l_buf_in_left))) { // Packet signature detected - if(l_pkt->hdr.size > DAP_STREAM_PKT_SIZE_MAX) { - log_it(L_ERROR, "dap_stream_data_proc_read() Too big packet size %u, drop %zu bytes", l_pkt->hdr.size, l_buf_in_left); - // Skip this packet - l_buf_in_left = 0; - break; - } - - size_t l_pkt_offset = (((uint8_t*) l_pkt) - l_buf_in); - l_buf_in += l_pkt_offset; - l_buf_in_left -= l_pkt_offset; - - size_t l_pkt_size = l_pkt->hdr.size + sizeof(dap_stream_pkt_hdr_t); - - //log_it(L_DEBUG, "read packet offset=%zu size=%zu buf_in_left=%zu)",l_pkt_offset, l_pkt_size, l_buf_in_left); - - // Got the whole package - if(l_buf_in_left >= l_pkt_size) { - // Process data - s_stream_proc_pkt_in(a_stream, (dap_stream_pkt_t*) l_pkt, l_pkt_size); - // Go to the next data - l_buf_in += l_pkt_size; - l_buf_in_left -= l_pkt_size; - } else { - debug_if(s_dump_packet_headers,L_DEBUG, "Input: Not all stream packet in input (pkt_size=%zu buf_in_left=%zu)", l_pkt_size, l_buf_in_left); + byte_t *l_pos = a_stream->esocket->buf_in, *l_end = l_pos + a_stream->esocket->buf_in_size; + size_t l_shift = 0, l_processed_size = 0; + while ( l_pos < l_end && (l_pos = memchr( l_pos, c_dap_stream_sig[0], (size_t)(l_end - l_pos))) ) { + if ( (size_t)(l_end - l_pos) < sizeof(dap_stream_pkt_hdr_t) ) + break; + if ( !memcmp(l_pos, c_dap_stream_sig, sizeof(c_dap_stream_sig)) ) { + dap_stream_pkt_t *l_pkt = (dap_stream_pkt_t*)l_pos; + if (l_pkt->hdr.size > DAP_STREAM_PKT_SIZE_MAX) { + log_it(L_ERROR, "Invalid packet size %lu, dump it", l_pkt->hdr.size); + l_shift = sizeof(dap_stream_pkt_hdr_t); + } else if ( (l_shift = sizeof(dap_stream_pkt_hdr_t) + l_pkt->hdr.size) <= (size_t)(l_end - l_pos) ) { + debug_if(s_dump_packet_headers, L_DEBUG, "Processing full packet, size %lu", l_shift); + s_stream_proc_pkt_in(a_stream, l_pkt); + } else break; - } - } - } - - if(l_buf_in_left > 0) { - // Save the received data to stream memory for the next piece of data - if(!l_pkt) { - // pkt header not found, maybe l_buf_in_left is too small to detect pkt header, will do that next time - l_pkt = (dap_stream_pkt_t*) l_buf_in; - debug_if(s_dump_packet_headers, L_DEBUG, "dap_stream_data_proc_read() left unprocessed data %zu bytes, l_pkt=0", l_buf_in_left); - } - if(l_pkt) { - a_stream->pkt_buf_in_data_size = l_buf_in_left; - if(l_pkt != a_stream->pkt_buf_in){ - memmove(a_stream->pkt_buf_in, l_pkt, a_stream->pkt_buf_in_data_size); - //log_it(L_DEBUG, "dap_stream_data_proc_read() l_pkt=%zu != a_stream->pkt_buf_in=%zu", l_pkt, a_stream->pkt_buf_in); - } - - debug_if(s_dump_packet_headers,L_DEBUG, "dap_stream_data_proc_read() left unprocessed data %zu bytes", l_buf_in_left); - } - else { - log_it(L_ERROR, "dap_stream_data_proc_read() pkt header not found, drop %zu bytes", l_buf_in_left); - DAP_DEL_Z(a_stream->pkt_buf_in); - a_stream->pkt_buf_in_data_size = 0; - } - } - else { - DAP_DEL_Z(a_stream->pkt_buf_in); - a_stream->pkt_buf_in_data_size = 0; + l_pos += l_shift; + l_processed_size += l_shift; + } else + ++l_pos; } - return a_stream->esocket->buf_in_size; //a_stream->conn->buf_in_size; + debug_if( s_dump_packet_headers && l_processed_size, L_DEBUG, "Processed %lu / %lu bytes", + l_processed_size, (size_t)(l_end - a_stream->esocket->buf_in) ); + return l_processed_size; } /** * @brief stream_proc_pkt_in * @param sid */ -static void s_stream_proc_pkt_in(dap_stream_t * a_stream, dap_stream_pkt_t *a_pkt, size_t a_pkt_size) +static void s_stream_proc_pkt_in(dap_stream_t * a_stream, dap_stream_pkt_t *a_pkt) { + size_t a_pkt_size = sizeof(dap_stream_pkt_hdr_t) + a_pkt->hdr.size; bool l_is_clean_fragments = false; a_stream->is_active = true; diff --git a/net/stream/stream/dap_stream_pkt.c b/net/stream/stream/dap_stream_pkt.c index fc3f08d9958203aebb46ec4e7d372c3649c68584..9d482f34dd732d46bf0bf5d8723595b32b077bdf 100644 --- a/net/stream/stream/dap_stream_pkt.c +++ b/net/stream/stream/dap_stream_pkt.c @@ -33,37 +33,6 @@ const uint8_t c_dap_stream_sig [STREAM_PKT_SIG_SIZE] = {0xa0,0x95,0x96,0xa9,0x9e,0x5c,0xfb,0xfa}; -dap_stream_pkt_t * dap_stream_pkt_detect(void * a_data, size_t data_size) -{ - uint8_t * sig_start=(uint8_t*) a_data; - dap_stream_pkt_t * hpkt = NULL; - size_t length_left = data_size; - - while ( (sig_start = memchr(sig_start, c_dap_stream_sig[0], length_left)) ) { - length_left = data_size - (size_t)(sig_start - (uint8_t *)a_data); - if(length_left < sizeof(c_dap_stream_sig) ) - break; - - if ( !memcmp(sig_start, c_dap_stream_sig, sizeof(c_dap_stream_sig)) ) { - hpkt = (dap_stream_pkt_t *)sig_start; - if (length_left < sizeof(dap_stream_ch_pkt_hdr_t)) { - //log_it(L_ERROR, "Too small packet size %zu", length_left); // it's not an error, just random case - hpkt = NULL; - break; - } - if(hpkt->hdr.size > DAP_STREAM_PKT_SIZE_MAX ){ - log_it(L_ERROR, "Too big packet size %u (%#x), type:%d(%#x)", - hpkt->hdr.size, hpkt->hdr.size, hpkt->hdr.type, hpkt->hdr.type); - hpkt = NULL; - } - break; - } else - sig_start++; - } - - return hpkt; -} - /** * @brief stream_pkt_read * @param sid @@ -94,10 +63,8 @@ size_t dap_stream_pkt_write_unsafe(dap_stream_t *a_stream, uint8_t a_type, const dap_stream_pkt_hdr_t *l_pkt_hdr = (dap_stream_pkt_hdr_t*)s_pkt_buf; *l_pkt_hdr = (dap_stream_pkt_hdr_t) { .size = dap_enc_code( l_key, a_data, a_data_size, s_pkt_buf + sizeof(*l_pkt_hdr), l_full_size - sizeof(*l_pkt_hdr), DAP_ENC_DATA_TYPE_RAW ), - .type = a_type, - .timestamp = dap_time_now(), - .src_addr = g_node_addr.uint64, - .dst_addr = a_stream->node.uint64 }; + .timestamp = dap_time_now(), .type = a_type, + .src_addr = g_node_addr.uint64, .dst_addr = a_stream->node.uint64 }; memcpy(l_pkt_hdr->sig, c_dap_stream_sig, sizeof(l_pkt_hdr->sig)); return dap_events_socket_write_unsafe(a_stream->esocket, s_pkt_buf, l_full_size); } diff --git a/net/stream/stream/include/dap_stream.h b/net/stream/stream/include/dap_stream.h index 888639faeaac0536f40abb6c916da7ae517341d7..416e0e5f39bfdd8ba09516e038080501a1bb7cf9 100644 --- a/net/stream/stream/include/dap_stream.h +++ b/net/stream/stream/include/dap_stream.h @@ -60,11 +60,6 @@ typedef struct dap_stream { char *service_key; bool is_client_to_uplink; - struct dap_stream_pkt *in_pkt; - struct dap_stream_pkt *pkt_buf_in; - size_t pkt_buf_in_data_size; - size_t pkt_buf_in_size_expected; - uint8_t *buf_fragments, *pkt_cache; size_t buf_fragments_size_total;// Full size of all fragments size_t buf_fragments_size_filled;// Received size diff --git a/project.yaml b/project.yaml index d3e939073d8c2b9dbcaf0a3a210973b095fcaa17..0c94ea5f870fa77828d0e01897de7052af9301f3 100644 --- a/project.yaml +++ b/project.yaml @@ -9,4 +9,6 @@ build_dependencies: - 'xsltproc' - 'curl' - 'jq' + - 'cppcheck' + - 'python3-xmltodict' \ No newline at end of file