diff --git a/dap_stream_ch_chain_net.c b/dap_stream_ch_chain_net.c index cf44a89926b26da2bd4c0f7ee4a741bd146cd56f..348ee15bb6311ab3098f124d32027cedfbfc7dff 100755 --- a/dap_stream_ch_chain_net.c +++ b/dap_stream_ch_chain_net.c @@ -45,7 +45,12 @@ static void s_stream_ch_delete(dap_stream_ch_t* ch, void* arg); static void s_stream_ch_packet_in(dap_stream_ch_t* ch, void* arg); static void s_stream_ch_packet_out(dap_stream_ch_t* ch, void* arg); -typedef struct session_data { +typedef enum dap_chain_net_session_state{ + CHAIN_NET_SESSION_STATE_IDLE=0, + CHAIN_NET_SESSION_STATE_REQUESTED_ADDR, +} dap_chain_net_session_state_t; + +typedef struct dap_chain_net_session_data { unsigned int id; //int sock; int message_id; @@ -55,9 +60,10 @@ typedef struct session_data { dap_list_t *list_tr; // list of transactions dap_chain_node_addr_t node_remote; dap_chain_node_addr_t node_cur; + dap_chain_net_session_state_t state; UT_hash_handle hh; -} session_data_t; +} dap_chain_net_session_data_t; typedef struct message_data { time_t timestamp_start; @@ -66,7 +72,7 @@ typedef struct message_data { } DAP_ALIGN_PACKED message_data_t; // list of active sessions -static session_data_t *s_chain_net_data = NULL; +static dap_chain_net_session_data_t *s_chain_net_data = NULL; // for separate access to session_data_t static pthread_mutex_t s_hash_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -102,11 +108,11 @@ static const message_data_t *dap_stream_ch_chain_net_parse_packet(uint8_t* a_dat static void session_data_update(unsigned int a_id, int a_messsage_id, dap_list_t *a_list, message_data_t *a_data, time_t a_timestamp_cur) { - session_data_t *l_sdata; + dap_chain_net_session_data_t *l_sdata; pthread_mutex_lock(&s_hash_mutex); HASH_FIND_INT(s_chain_net_data, &a_id, l_sdata); if(l_sdata == NULL) { - l_sdata = DAP_NEW_Z(session_data_t); + l_sdata = DAP_NEW_Z(dap_chain_net_session_data_t); l_sdata->id = a_id; HASH_ADD_INT(s_chain_net_data, id, l_sdata); } @@ -127,9 +133,9 @@ static void session_data_update(unsigned int a_id, int a_messsage_id, dap_list_t pthread_mutex_unlock(&s_hash_mutex); } -static session_data_t* session_data_find(unsigned int a_id) +static dap_chain_net_session_data_t* session_data_find(unsigned int a_id) { - session_data_t *l_sdata; + dap_chain_net_session_data_t *l_sdata; pthread_mutex_lock(&s_hash_mutex); HASH_FIND_INT(s_chain_net_data, &a_id, l_sdata); pthread_mutex_unlock(&s_hash_mutex); @@ -138,7 +144,7 @@ static session_data_t* session_data_find(unsigned int a_id) static void session_data_del(unsigned int a_id) { - session_data_t *l_sdata; + dap_chain_net_session_data_t *l_sdata; pthread_mutex_lock(&s_hash_mutex); HASH_FIND_INT(s_chain_net_data, &a_id, l_sdata); if(l_sdata) { @@ -150,7 +156,7 @@ static void session_data_del(unsigned int a_id) static void session_data_del_all() { - session_data_t *l_sdata, *l_sdata_tmp; + dap_chain_net_session_data_t *l_sdata, *l_sdata_tmp; pthread_mutex_lock(&s_hash_mutex); HASH_ITER(hh, s_chain_net_data , l_sdata, l_sdata_tmp) { @@ -253,10 +259,22 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_set_ready_to_write(a_ch, false); } break; - // get node address case STREAM_CH_CHAIN_NET_PKT_TYPE_GET_NODE_ADDR: { log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GET_NODE_ADDR"); dap_stream_ch_set_ready_to_write(a_ch, false); + } + // get node address + case STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_REQUEST: { + log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_REQUEST"); + dap_chain_net_session_data_t *l_session_data = session_data_find(a_ch->stream->session->id); + if (l_session_data ) { + l_session_data->state = CHAIN_NET_SESSION_STATE_REQUESTED_ADDR; + dap_stream_ch_set_ready_to_write(a_ch, true); + }else { + log_it(L_ERROR, "Can't find session_id=%u to produce addr in reply", + a_ch->stream->session->id); + dap_stream_ch_set_ready_to_write(a_ch, false); + } } break; // set new node address @@ -371,7 +389,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } if(l_ch_chain_net->notify_callback) { if(l_chain_pkt->hdr.type == STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC) { - session_data_t *l_data = session_data_find(a_ch->stream->session->id); + dap_chain_net_session_data_t *l_data = session_data_find(a_ch->stream->session->id); // end of session if(!l_data->list_tr) l_ch_chain_net->notify_callback(NULL, l_ch_pkt_data_size, l_ch_chain_net->notify_callback_arg); @@ -388,6 +406,32 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) } } +/** + * @brief s_session_state_proc + * @param a_ch + */ +void s_session_state_proc(dap_stream_ch_t * a_ch ) +{ + dap_stream_ch_chain_net_t * l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(a_ch); + + dap_chain_net_session_data_t *l_session_data = session_data_find(a_ch->stream->session->id); + //printf("*packet out session_id=%u\n", a_ch->stream->session->id); + if(!l_session_data) { + log_it(L_WARNING, "Can't find session_data"); + dap_stream_ch_set_ready_to_write(a_ch, false); + return; + } + + switch (l_session_data->state){ + case CHAIN_NET_SESSION_STATE_REQUESTED_ADDR:{ +// dap_chain_node_addr_t * l_addr = dap_chain_node_gen_addr( l_ch_chain_net-> ) + }break; + case CHAIN_NET_SESSION_STATE_IDLE:{ + log_it(L_NOTICE,"Session idle state, nothing to do"); + }break; + } +} + /** * @brief s_stream_ch_packet_out * @param ch @@ -398,7 +442,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_chain_net_t * l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(a_ch); pthread_mutex_lock(&l_ch_chain_net->mutex); - session_data_t *l_data = session_data_find(a_ch->stream->session->id); + dap_chain_net_session_data_t *l_data = session_data_find(a_ch->stream->session->id); //printf("*packet out session_id=%u\n", a_ch->stream->session->id); if(!l_data) { log_it(L_WARNING, "if packet_out() l_data=NULL"); @@ -426,7 +470,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) int len = dap_list_length(l_list); //printf("*len=%d\n", len); if(l_list) { - size_t l_item_size_out = 0; + size_t l_item_size_out = 0; uint8_t *l_item = NULL; while(l_list && !l_item) { l_item = dap_db_log_pack((dap_global_db_obj_t *) l_list->data, &l_item_size_out); diff --git a/dap_stream_ch_chain_net.h b/dap_stream_ch_chain_net.h index 9279034d2882864ca1da925a0a81667f6e5c17b8..62967c0025eaab8e580a77bf2df203053db50ee6 100755 --- a/dap_stream_ch_chain_net.h +++ b/dap_stream_ch_chain_net.h @@ -26,6 +26,7 @@ #include <pthread.h> #include <stdint.h> +#include "dap_stream_ch.h" #include "dap_stream_ch_chain_net_pkt.h" typedef void (*dap_stream_ch_chain_net_callback_t)(dap_stream_ch_chain_net_pkt_t *a_pkt, size_t a_pkt_data_size, void *arg); diff --git a/dap_stream_ch_chain_net_pkt.c b/dap_stream_ch_chain_net_pkt.c index f4c7d01a4a6fb2c7f43c2a4cfe26c1db1968e353..5423ead8e219701c639edf19de78a3dad3639267 100755 --- a/dap_stream_ch_chain_net_pkt.c +++ b/dap_stream_ch_chain_net_pkt.c @@ -29,7 +29,7 @@ size_t dap_stream_ch_chain_net_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, char *l_buf = DAP_NEW_SIZE(char, l_buf_size); memcpy(l_buf, &l_hdr, sizeof(l_hdr)); memcpy(l_buf + sizeof(l_hdr), a_data, a_data_size); - int l_ret = dap_stream_ch_pkt_write(a_ch, STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_REQUEST, l_buf, l_buf_size); + size_t l_ret = dap_stream_ch_pkt_write(a_ch, a_type , l_buf, l_buf_size); DAP_DELETE(l_buf); return l_ret; } diff --git a/dap_stream_ch_chain_net_pkt.h b/dap_stream_ch_chain_net_pkt.h index b05ebd45e6f532bbf1a48b024ce4601293aaeec7..c089d470151804883808bde0d6ca891baa3689c6 100755 --- a/dap_stream_ch_chain_net_pkt.h +++ b/dap_stream_ch_chain_net_pkt.h @@ -39,6 +39,8 @@ #define STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC 0x14 #define STREAM_CH_CHAIN_NET_PKT_TYPE_GET_NODE_ADDR 0x15 #define STREAM_CH_CHAIN_NET_PKT_TYPE_SET_NODE_ADDR 0x16 +#define STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_REQUEST 0x17 +#define STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_RESPONSE 0x18 #define STREAM_CH_CHAIN_NET_PKT_TYPE_DBG 0x99 typedef struct stream_ch_chain_net_pkt_hdr{