diff --git a/CMakeLists.txt b/CMakeLists.txt index 7436a26467e4b6d6c15b549095ebb956e186c71e..aac8c5f04426ca81f1fac40998d1037ab873d5dd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ project(cellframe-sdk C) cmake_minimum_required(VERSION 3.0) set(CMAKE_C_STANDARD 11) -set(CELLFRAME_SDK_NATIVE_VERSION "2.9-48") +set(CELLFRAME_SDK_NATIVE_VERSION "2.9-49") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") set(DAPSDK_MODULES "") message("Cellframe modules: ${CELLFRAME_MODULES}") diff --git a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c index 9acf7b183e5fb8b068029f6c10eca6dbbcb1e5dd..76a98fe49aa4075d1c360ab49987484931d63b4a 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c @@ -75,6 +75,8 @@ void dap_stream_ch_pkt_deinit() */ size_t dap_stream_ch_pkt_write_f_mt(dap_stream_worker_t * a_worker , dap_stream_ch_uuid_t a_ch_uuid, uint8_t a_type, const char * a_format,...) { + if (!a_worker) + return 0; va_list ap; va_start(ap,a_format); int l_data_size = dap_vsnprintf(NULL,0,a_format,ap); @@ -161,6 +163,8 @@ size_t dap_stream_ch_pkt_write_f_inter(dap_events_socket_t * a_queue , dap_stre */ size_t dap_stream_ch_pkt_write_mt(dap_stream_worker_t * a_worker , dap_stream_ch_uuid_t a_ch_uuid, uint8_t a_type, const void * a_data, size_t a_data_size) { + if (!a_worker) + return 0; dap_stream_worker_msg_io_t * l_msg = DAP_NEW_Z(dap_stream_worker_msg_io_t); l_msg->ch_uuid = a_ch_uuid; l_msg->ch_pkt_type = a_type; diff --git a/modules/global-db/dap_chain_global_db_driver_pgsql.c b/modules/global-db/dap_chain_global_db_driver_pgsql.c index 7bb28cead13b550f53f3d79c02d7d8915b7b3490..e5297dbfa8b747af6d6b007e7186e8bfe7cc60f7 100644 --- a/modules/global-db/dap_chain_global_db_driver_pgsql.c +++ b/modules/global-db/dap_chain_global_db_driver_pgsql.c @@ -53,7 +53,7 @@ static pthread_rwlock_t s_db_rwlock = PTHREAD_RWLOCK_INITIALIZER; static PGconn *s_pgsql_get_connection(void) { - if (pthread_rwlock_rdlock(&s_db_rwlock) == EDEADLK) { + if (pthread_rwlock_wrlock(&s_db_rwlock) == EDEADLK) { return s_trans_conn; } PGconn *l_ret = NULL; @@ -70,7 +70,7 @@ static PGconn *s_pgsql_get_connection(void) static void s_pgsql_free_connection(PGconn *a_conn) { - if (pthread_rwlock_rdlock(&s_db_rwlock) == EDEADLK) { + if (pthread_rwlock_wrlock(&s_db_rwlock) == EDEADLK) { return; } for (int i = 0; i < DAP_PGSQL_POOL_COUNT; i++) { @@ -192,7 +192,7 @@ int dap_db_driver_pgsql_init(const char *a_filename_dir, dap_db_driver_callbacks int dap_db_driver_pgsql_deinit(void) { pthread_rwlock_wrlock(&s_db_rwlock); - for (int j = 0; j <= DAP_PGSQL_POOL_COUNT; j++) + for (int j = 0; j < DAP_PGSQL_POOL_COUNT; j++) PQfinish(s_conn_pool[j].conn); pthread_rwlock_unlock(&s_db_rwlock); pthread_rwlock_destroy(&s_db_rwlock); @@ -207,7 +207,7 @@ int dap_db_driver_pgsql_start_transaction(void) s_trans_conn = s_pgsql_get_connection(); if (!s_trans_conn) return -1; - pthread_rwlock_rdlock(&s_db_rwlock); + pthread_rwlock_wrlock(&s_db_rwlock); PGresult *l_res = PQexec(s_trans_conn, "BEGIN"); if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { log_it(L_ERROR, "Begin transaction failed with message: \"%s\"", PQresultErrorMessage(l_res)); @@ -292,7 +292,7 @@ int dap_db_driver_pgsql_apply_store_obj(dap_store_obj_t *a_store_obj) DAP_DELETE(a_store_obj->value); DAP_DELETE(a_store_obj->key); if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { - if (a_store_obj->type == 'a' && s_pgsql_create_group_table(a_store_obj->group, l_conn) == 0) { + if (s_pgsql_create_group_table(a_store_obj->group, l_conn) == 0) { PQclear(l_res); l_res = PQexecParams(l_conn, l_query_str, 2, NULL, l_param_vals, l_param_lens, l_param_formats, 0); } @@ -313,9 +313,10 @@ int dap_db_driver_pgsql_apply_store_obj(dap_store_obj_t *a_store_obj) // execute delete request l_res = PQexec(l_conn, l_query_str); if (PQresultStatus(l_res) != PGRES_COMMAND_OK) { - if (strcmp(PQresultErrorField(l_res, PG_DIAG_SQLSTATE), PGSQL_INVALID_TABLE)) + if (strcmp(PQresultErrorField(l_res, PG_DIAG_SQLSTATE), PGSQL_INVALID_TABLE)) { log_it(L_ERROR, "Delete object failed with message: \"%s\"", PQresultErrorMessage(l_res)); - l_ret = -4; + l_ret = -4; + } } } else { @@ -375,12 +376,12 @@ dap_store_obj_t *dap_db_driver_pgsql_read_store_obj(const char *a_group, const c } PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1); - s_pgsql_free_connection(l_conn); DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { if (strcmp(PQresultErrorField(l_res, PG_DIAG_SQLSTATE), PGSQL_INVALID_TABLE)) log_it(L_ERROR, "Read objects failed with message: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); + s_pgsql_free_connection(l_conn); return NULL; } @@ -393,6 +394,7 @@ dap_store_obj_t *dap_db_driver_pgsql_read_store_obj(const char *a_group, const c s_pgsql_fill_object(a_group, l_obj_cur, l_res, i); } PQclear(l_res); + s_pgsql_free_connection(l_conn); if (a_count_out) *a_count_out = l_count; return l_obj; @@ -414,12 +416,12 @@ dap_store_obj_t *dap_db_driver_pgsql_read_last_store_obj(const char *a_group) } char *l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" ORDER BY obj_id DESC LIMIT 1", a_group); PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1); - s_pgsql_free_connection(l_conn); DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { if (strcmp(PQresultErrorField(l_res, PG_DIAG_SQLSTATE), PGSQL_INVALID_TABLE)) log_it(L_ERROR, "Read last object failed with message: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); + s_pgsql_free_connection(l_conn); return NULL; } dap_store_obj_t *l_obj = NULL; @@ -428,6 +430,7 @@ dap_store_obj_t *dap_db_driver_pgsql_read_last_store_obj(const char *a_group) s_pgsql_fill_object(a_group, l_obj, l_res, 0); } PQclear(l_res); + s_pgsql_free_connection(l_conn); return l_obj; } @@ -456,12 +459,12 @@ dap_store_obj_t *dap_db_driver_pgsql_read_cond_store_obj(const char *a_group, ui l_query_str = dap_strdup_printf("SELECT * FROM \"%s\" WHERE obj_id >= '%"DAP_UINT64_FORMAT_U"' " "ORDER BY obj_id ASC", a_group, a_id); PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1); - s_pgsql_free_connection(l_conn); DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { if (strcmp(PQresultErrorField(l_res, PG_DIAG_SQLSTATE), PGSQL_INVALID_TABLE)) log_it(L_ERROR, "Conditional read objects failed with message: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); + s_pgsql_free_connection(l_conn); return NULL; } @@ -474,6 +477,7 @@ dap_store_obj_t *dap_db_driver_pgsql_read_cond_store_obj(const char *a_group, ui s_pgsql_fill_object(a_group, l_obj_cur, l_res, i); } PQclear(l_res); + s_pgsql_free_connection(l_conn); if (a_count_out) *a_count_out = l_count; return l_obj; @@ -492,10 +496,10 @@ dap_list_t *dap_db_driver_pgsql_get_groups_by_mask(const char *a_group_mask) const char *l_query_str = "SELECT tablename FROM pg_catalog.pg_tables WHERE " "schemaname != 'information_schema' AND schemaname != 'pg_catalog'"; PGresult *l_res = PQexec(l_conn, l_query_str); - s_pgsql_free_connection(l_conn); if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { log_it(L_ERROR, "Read tables failed with message: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); + s_pgsql_free_connection(l_conn); return NULL; } @@ -506,6 +510,7 @@ dap_list_t *dap_db_driver_pgsql_get_groups_by_mask(const char *a_group_mask) l_ret_list = dap_list_prepend(l_ret_list, dap_strdup(l_table_name)); } PQclear(l_res); + s_pgsql_free_connection(l_conn); return l_ret_list; } @@ -521,16 +526,17 @@ size_t dap_db_driver_pgsql_read_count_store(const char *a_group, uint64_t a_id) char *l_query_str = dap_strdup_printf("SELECT count(*) FROM \"%s\" WHERE obj_id >= '%"DAP_UINT64_FORMAT_U"'", a_group, a_id); PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1); - s_pgsql_free_connection(l_conn); DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { if (strcmp(PQresultErrorField(l_res, PG_DIAG_SQLSTATE), PGSQL_INVALID_TABLE)) log_it(L_ERROR, "Count objects failed with message: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); + s_pgsql_free_connection(l_conn); return 0; } size_t l_ret = be64toh(*(uint64_t *)PQgetvalue(l_res, 0, 0)); PQclear(l_res); + s_pgsql_free_connection(l_conn); return l_ret; } @@ -545,16 +551,17 @@ bool dap_db_driver_pgsql_is_obj(const char *a_group, const char *a_key) } char *l_query_str = dap_strdup_printf("SELECT EXISTS(SELECT * FROM \"%s\" WHERE obj_key = '%s')", a_group, a_key); PGresult *l_res = PQexecParams(l_conn, l_query_str, 0, NULL, NULL, NULL, NULL, 1); - s_pgsql_free_connection(l_conn); DAP_DELETE(l_query_str); if (PQresultStatus(l_res) != PGRES_TUPLES_OK) { if (strcmp(PQresultErrorField(l_res, PG_DIAG_SQLSTATE), PGSQL_INVALID_TABLE)) log_it(L_ERROR, "Existance check of object failed with message: \"%s\"", PQresultErrorMessage(l_res)); PQclear(l_res); + s_pgsql_free_connection(l_conn); return 0; } int l_ret = *PQgetvalue(l_res, 0, 0); PQclear(l_res); + s_pgsql_free_connection(l_conn); return l_ret; } diff --git a/modules/net/dap_chain_net.c b/modules/net/dap_chain_net.c index 493be4d7d1518c8b309ddf9c8bebe56e852ec2f9..cebea33c10f6bbe4a8bbc21e5a0490e60a78f951 100644 --- a/modules/net/dap_chain_net.c +++ b/modules/net/dap_chain_net.c @@ -164,8 +164,11 @@ typedef struct dap_chain_net_pvt{ uint16_t gdb_sync_groups_count; uint16_t gdb_sync_nodes_addrs_count; + uint16_t gdb_sync_nodes_links_count; char **gdb_sync_groups; dap_chain_node_addr_t *gdb_sync_nodes_addrs; + uint32_t *gdb_sync_nodes_links_ips; + uint16_t *gdb_sync_nodes_links_ports; uint16_t seed_aliases_count; @@ -344,7 +347,10 @@ void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const c pthread_rwlock_rdlock(&PVT(l_net)->rwlock); for (dap_list_t *l_tmp = PVT(l_net)->links; l_tmp; l_tmp = dap_list_next(l_tmp)) { dap_chain_node_client_t *l_node_client = (dap_chain_node_client_t *)l_tmp->data; - dap_stream_ch_chain_pkt_write_mt( dap_client_get_stream_worker(l_node_client->client), l_node_client->ch_chain_uuid, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_net->pub.id.uint64, + dap_stream_worker_t *l_stream_worker = dap_client_get_stream_worker(l_node_client->client); + if (l_stream_worker) + continue; + dap_stream_ch_chain_pkt_write_mt(l_stream_worker, l_node_client->ch_chain_uuid, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_net->pub.id.uint64, l_chain_id.uint64, l_net->pub.cell_id.uint64, l_data_out, sizeof(dap_store_obj_pkt_t) + l_data_out->data_size); } @@ -785,6 +791,15 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg) case NET_STATE_LINKS_PREPARE: { log_it(L_NOTICE,"%s.state: NET_STATE_LINKS_PREPARE", l_net->pub.name); s_net_states_notify(l_net); + for (int i = 0; i < l_net_pvt->gdb_sync_nodes_links_count; i++) { + if (i >= l_net_pvt->gdb_sync_nodes_addrs_count) + break; + dap_chain_node_info_t *l_link_node_info = DAP_NEW_Z(dap_chain_node_info_t); + l_link_node_info->hdr.address.uint64 = l_net_pvt->gdb_sync_nodes_addrs[i].uint64; + l_link_node_info->hdr.ext_addr_v4.s_addr = l_net_pvt->gdb_sync_nodes_links_ips[i]; + l_link_node_info->hdr.ext_port = l_net_pvt->gdb_sync_nodes_links_ports[i]; + l_net_pvt->links_info = dap_list_append(l_net_pvt->links_info, l_link_node_info); + } uint64_t l_own_addr = dap_chain_net_get_cur_addr_int(l_net); if (l_net_pvt->node_info) { for (size_t i = 0; i < l_net_pvt->node_info->hdr.links_number; i++) { @@ -1605,12 +1620,40 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) char **l_gdb_sync_nodes_addrs = dap_config_get_array_str(l_cfg, "general", "gdb_sync_nodes_addrs", &l_net_pvt->gdb_sync_nodes_addrs_count); if(l_gdb_sync_nodes_addrs && l_net_pvt->gdb_sync_nodes_addrs_count > 0) { - l_net_pvt->gdb_sync_nodes_addrs = (dap_chain_node_addr_t*) DAP_NEW_Z_SIZE(char**, + l_net_pvt->gdb_sync_nodes_addrs = DAP_NEW_Z_SIZE(dap_chain_node_addr_t, sizeof(dap_chain_node_addr_t)*l_net_pvt->gdb_sync_nodes_addrs_count); for(uint16_t i = 0; i < l_net_pvt->gdb_sync_nodes_addrs_count; i++) { dap_chain_node_addr_from_str(l_net_pvt->gdb_sync_nodes_addrs + i, l_gdb_sync_nodes_addrs[i]); } } + // links for special sync + uint16_t l_gdb_links_count = 0; + PVT(l_net)->gdb_sync_nodes_links_count = 0; + char **l_gdb_sync_nodes_links = dap_config_get_array_str(l_cfg, "general", "gdb_sync_nodes_links", &l_gdb_links_count); + if (l_gdb_sync_nodes_links && l_gdb_links_count > 0) { + l_net_pvt->gdb_sync_nodes_links_ips = DAP_NEW_Z_SIZE(uint32_t, l_gdb_links_count * sizeof(uint32_t)); + l_net_pvt->gdb_sync_nodes_links_ports = DAP_NEW_SIZE(uint16_t, l_gdb_links_count * sizeof(uint16_t)); + for(uint16_t i = 0; i < l_gdb_links_count; i++) { + char *l_gdb_link_port_str = strchr(l_gdb_sync_nodes_links[i], ':'); + if (!l_gdb_link_port_str) { + continue; + } + uint16_t l_gdb_link_port = atoi(l_gdb_link_port_str + 1); + if (!l_gdb_link_port) { + continue; + } + int l_gdb_link_len = l_gdb_link_port_str - l_gdb_sync_nodes_links[i]; + char *l_gdb_link_ip_str[l_gdb_link_len + 1]; + memcpy(l_gdb_link_ip_str, l_gdb_sync_nodes_links[i], l_gdb_link_len); + l_gdb_link_ip_str[l_gdb_link_len] = '\0'; + struct in_addr l_in_addr; + if (inet_pton(AF_INET, (const char *)l_gdb_link_ip_str, &l_in_addr) > 0) { + PVT(l_net)->gdb_sync_nodes_links_ips[PVT(l_net)->gdb_sync_nodes_links_count] = l_in_addr.s_addr; + PVT(l_net)->gdb_sync_nodes_links_ports[PVT(l_net)->gdb_sync_nodes_links_count] = l_gdb_link_port; + PVT(l_net)->gdb_sync_nodes_links_count++; + } + } + } // groups for special sync uint16_t l_gdb_sync_groups_count; char **l_gdb_sync_groups = dap_config_get_array_str(l_cfg, "general", "gdb_sync_groups", &l_gdb_sync_groups_count); @@ -1621,7 +1664,6 @@ int s_net_load(const char * a_net_name, uint16_t a_acl_idx) } } - // Add network to the list dap_chain_net_item_t * l_net_item = DAP_NEW_Z( dap_chain_net_item_t); dap_chain_net_item_t * l_net_item2 = DAP_NEW_Z( dap_chain_net_item_t);