diff --git a/global-db/dap_global_db_ch.c b/global-db/dap_global_db_ch.c index 1a6b071ca997a58f7aaa08dfadf586727f32eb0e..6d72ec6008c6b8cab523770f805a798286a0b6f4 100644 --- a/global-db/dap_global_db_ch.c +++ b/global-db/dap_global_db_ch.c @@ -34,7 +34,7 @@ along with any DAP SDK based project. If not, see <http://www.gnu.org/licenses/ static void s_stream_ch_new(dap_stream_ch_t *a_ch, void *a_arg); static void s_stream_ch_delete(dap_stream_ch_t *a_ch, void *a_arg); -static void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg); +static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg); static void s_gossip_payload_callback(void *a_payload, size_t a_payload_size, dap_stream_node_addr_t a_sender_addr); /** @@ -302,12 +302,12 @@ static void s_gossip_payload_callback(void *a_payload, size_t a_payload_size, da dap_proc_thread_callback_add_pri(NULL, s_process_record, l_obj, DAP_GLOBAL_DB_TASK_PRIORITY); } -static void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) +static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) { dap_stream_ch_gdb_t *l_ch_gdb = DAP_STREAM_CH_GDB(a_ch); if (!l_ch_gdb || l_ch_gdb->_inheritor != a_ch) { log_it(L_ERROR, "Not valid Global DB channel, returning"); - return; + return false; } dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *)a_arg; switch (l_ch_pkt->hdr.type) { @@ -317,7 +317,7 @@ static void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) if (l_ch_pkt->hdr.data_size < sizeof(dap_global_db_start_pkt_t) || l_ch_pkt->hdr.data_size != dap_global_db_start_pkt_get_size(l_pkt)) { log_it(L_WARNING, "Invalid packet size %u", l_ch_pkt->hdr.data_size); - break; + return false; } debug_if(g_dap_global_db_debug_more, L_INFO, "IN: GLOBAL_DB_SYNC_START packet for group %s", l_pkt->group); @@ -333,15 +333,15 @@ static void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) case DAP_STREAM_CH_GLOBAL_DB_MSG_TYPE_HASHES: case DAP_STREAM_CH_GLOBAL_DB_MSG_TYPE_REQUEST: { - if (l_ch_pkt->hdr.type == DAP_STREAM_CH_GLOBAL_DB_MSG_TYPE_HASHES && - dap_proc_thread_get_avg_queue_size() > DAP_GLOBAL_DB_QUEUE_SIZE_MAX) - break; dap_global_db_hash_pkt_t *l_pkt = (dap_global_db_hash_pkt_t *)l_ch_pkt->data; if (l_ch_pkt->hdr.data_size < sizeof(dap_global_db_hash_pkt_t) || l_ch_pkt->hdr.data_size != dap_global_db_hash_pkt_get_size(l_pkt)) { log_it(L_WARNING, "Invalid packet size %u", l_ch_pkt->hdr.data_size); - break; + return false; } + if (l_ch_pkt->hdr.type == DAP_STREAM_CH_GLOBAL_DB_MSG_TYPE_HASHES && + dap_proc_thread_get_avg_queue_size() > DAP_GLOBAL_DB_QUEUE_SIZE_MAX) + break; debug_if(g_dap_global_db_debug_more, L_INFO, "IN: %s packet for group %s with hashes count %u", l_ch_pkt->hdr.type == DAP_STREAM_CH_GLOBAL_DB_MSG_TYPE_HASHES ? "GLOBAL_DB_HASHES" : "GLOBAL_DB_REQUEST", @@ -371,7 +371,7 @@ static void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) #endif if (!l_objs) { log_it(L_WARNING, "Wrong Global DB record packet rejected"); - break; + return false; } debug_if(g_dap_global_db_debug_more, L_INFO, "IN: GLOBAL_DB_RECORD_PACK packet for group %s with records count %zu", l_objs->group, l_objs_count); @@ -388,8 +388,9 @@ static void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) default: log_it(L_WARNING, "Unknown global DB packet type %hhu", l_ch_pkt->hdr.type); - break; + return false; } + return true; } /** diff --git a/net/stream/ch/dap_stream_ch_gossip.c b/net/stream/ch/dap_stream_ch_gossip.c index eed0d57472442278567240ab70635d37933e7b3c..d1593d73945dc57a49ddc89466fc0e52831c70e6 100644 --- a/net/stream/ch/dap_stream_ch_gossip.c +++ b/net/stream/ch/dap_stream_ch_gossip.c @@ -48,7 +48,7 @@ static struct gossip_msg_item { dap_timerfd_t *s_gossip_timer = NULL; static bool s_callback_hashtable_maintenance(void *a_arg); -static void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg); +static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg); static bool s_debug_more = false; /** * @brief dap_stream_ch_gdb_init @@ -156,7 +156,7 @@ void dap_gossip_msg_issue(dap_cluster_t *a_cluster, const char a_ch_id, const vo &g_node_addr, 1); } -static void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) +static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) { dap_stream_ch_pkt_t *l_ch_pkt = (dap_stream_ch_pkt_t *)a_arg; switch (l_ch_pkt->hdr.type) { @@ -167,7 +167,7 @@ static void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) if (l_ch_pkt->hdr.data_size != sizeof(dap_hash_t)) { log_it(L_WARNING, "Incorrect gossip message data size %u, expected %zu", l_ch_pkt->hdr.data_size, sizeof(dap_hash_t)); - break; + return false; } debug_if(s_debug_more, L_INFO, "IN: %s packet for hash %s", l_ch_pkt->hdr.type == DAP_STREAM_CH_GOSSIP_MSG_TYPE_HASH ? "GOSSIP_HASH" : "GOSSIP_REQUEST", @@ -193,25 +193,25 @@ static void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) if (l_ch_pkt->hdr.data_size < sizeof(dap_gossip_msg_t)) { log_it(L_WARNING, "Incorrect gossip message data size %u, must be at least %zu", l_ch_pkt->hdr.data_size, sizeof(dap_gossip_msg_t)); - break; + return false; } if (l_ch_pkt->hdr.data_size != dap_gossip_msg_get_size(l_msg)) { log_it(L_WARNING, "Incorrect gossip message data size %u, expected %zu", l_ch_pkt->hdr.data_size, dap_gossip_msg_get_size(l_msg)); - break; + return false; } if (l_msg->version != DAP_GOSSIP_CURRENT_VERSION) { log_it(L_ERROR, "Incorrect gossip protocol version %hhu, current version is %u", l_msg->version, DAP_GOSSIP_CURRENT_VERSION); - break; + return false; } if (l_msg->trace_len % sizeof(dap_stream_node_addr_t) != 0) { log_it(L_WARNING, "Unaligned gossip message tracepath size %u", l_msg->trace_len); - break; + return false; } if (!l_msg->payload_len) { log_it(L_WARNING, "Zero size of gossip message payload"); - break; + return false; } debug_if(s_debug_more, L_INFO, "IN: GOSSIP_DATA packet for hash %s", dap_hash_fast_to_str_static(&l_msg->payload_hash)); struct gossip_msg_item *l_item_new = NULL; @@ -275,7 +275,8 @@ static void s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg) default: log_it(L_WARNING, "Unknown gossip packet type %hhu", l_ch_pkt->hdr.type); - break; + return false; } + return true; } diff --git a/net/stream/ch/dap_stream_ch_proc.c b/net/stream/ch/dap_stream_ch_proc.c index 31ce8845d682be8a5db4dd5bd007c3498159f524..cd4e6fe73558ab6442c17d493af64a4c3fec4a76 100644 --- a/net/stream/ch/dap_stream_ch_proc.c +++ b/net/stream/ch/dap_stream_ch_proc.c @@ -70,7 +70,7 @@ void stream_ch_proc_deinit() void dap_stream_ch_proc_add(uint8_t id, dap_stream_ch_callback_t new_callback, dap_stream_ch_callback_t delete_callback, - dap_stream_ch_callback_t packet_in_callback, + dap_stream_ch_read_callback_t packet_in_callback, dap_stream_ch_write_callback_t packet_out_callback) { *(s_proc + id) = (dap_stream_ch_proc_t) { diff --git a/net/stream/ch/include/dap_stream_ch.h b/net/stream/ch/include/dap_stream_ch.h index 3fb18610669bc236c40cbcfc7d97d6d58fbfc697..671b38d9b03fe892f32aa00252e8945f0a1d1a04 100644 --- a/net/stream/ch/include/dap_stream_ch.h +++ b/net/stream/ch/include/dap_stream_ch.h @@ -35,6 +35,7 @@ typedef struct dap_stream_ch_proc dap_dap_stream_ch_proc_t; typedef struct dap_events_socket dap_events_socket_t; typedef void (*dap_stream_ch_callback_t)(dap_stream_ch_t *a_ch, void *a_arg); +typedef bool (*dap_stream_ch_read_callback_t)(dap_stream_ch_t *a_ch, void *a_arg); typedef bool (*dap_stream_ch_write_callback_t)(dap_stream_ch_t *a_ch, void *a_arg); typedef void (*dap_stream_ch_notify_callback_t)(dap_stream_ch_t *a_ch, uint8_t a_type, const void *a_data, size_t a_data_size, void *a_arg); diff --git a/net/stream/ch/include/dap_stream_ch_proc.h b/net/stream/ch/include/dap_stream_ch_proc.h index 85556d74ed5f24825187133b7a9c5de796c3fc2b..e22175ffcb06128503e544c0cb1d9f156efe17be 100644 --- a/net/stream/ch/include/dap_stream_ch_proc.h +++ b/net/stream/ch/include/dap_stream_ch_proc.h @@ -31,7 +31,7 @@ typedef struct dap_stream_ch_proc { dap_stream_ch_callback_t new_callback; dap_stream_ch_callback_t delete_callback; - dap_stream_ch_callback_t packet_in_callback; + dap_stream_ch_read_callback_t packet_in_callback; dap_stream_ch_write_callback_t packet_out_callback; void * internal; } dap_stream_ch_proc_t; @@ -42,7 +42,7 @@ void stream_ch_proc_deinit(); void dap_stream_ch_proc_add(uint8_t id, dap_stream_ch_callback_t new_callback, dap_stream_ch_callback_t delete_callback, - dap_stream_ch_callback_t packet_in_callback, + dap_stream_ch_read_callback_t packet_in_callback, dap_stream_ch_write_callback_t packet_out_callback ); dap_stream_ch_proc_t *dap_stream_ch_proc_find(uint8_t id); diff --git a/net/stream/stream/dap_stream.c b/net/stream/stream/dap_stream.c index 67579c41fffbfe950cf0f66a07133a79fef00f9e..d67185fb1a041b066f172ee49a08f8ad8b0ad6d8 100644 --- a/net/stream/stream/dap_stream.c +++ b/net/stream/stream/dap_stream.c @@ -839,11 +839,11 @@ static void s_stream_proc_pkt_in(dap_stream_t * a_stream, dap_stream_pkt_t *a_pk if(l_ch) { l_ch->stat.bytes_read += l_ch_pkt->hdr.data_size; if(l_ch->proc && l_ch->proc->packet_in_callback) { - l_ch->proc->packet_in_callback(l_ch, l_ch_pkt); + bool l_security_check_passed = l_ch->proc->packet_in_callback(l_ch, l_ch_pkt); debug_if(s_dump_packet_headers, L_INFO, "Income channel packet: id='%c' size=%u type=0x%02X seq_id=0x%016" DAP_UINT64_FORMAT_X" enc_type=0x%02X", (char)l_ch_pkt->hdr.id, l_ch_pkt->hdr.data_size, l_ch_pkt->hdr.type, l_ch_pkt->hdr.seq_id, l_ch_pkt->hdr.enc_type); - for (dap_list_t *it = l_ch->packet_in_notifiers; it; it = it->next) { + for (dap_list_t *it = l_ch->packet_in_notifiers; it && l_security_check_passed; it = it->next) { dap_stream_ch_notifier_t *l_notifier = it->data; assert(l_notifier); l_notifier->callback(l_ch, l_ch_pkt->hdr.type, l_ch_pkt->data, l_ch_pkt->hdr.data_size, l_notifier->arg);