diff --git a/dap_stream_ch_chain_net.c b/dap_stream_ch_chain_net.c index 348ee15bb6311ab3098f124d32027cedfbfc7dff..fbdc0ab768aa997151fd34b620497fed8e720918 100755 --- a/dap_stream_ch_chain_net.c +++ b/dap_stream_ch_chain_net.c @@ -45,93 +45,19 @@ static void s_stream_ch_delete(dap_stream_ch_t* ch, void* arg); static void s_stream_ch_packet_in(dap_stream_ch_t* ch, void* arg); static void s_stream_ch_packet_out(dap_stream_ch_t* ch, void* arg); -typedef enum dap_chain_net_session_state{ - CHAIN_NET_SESSION_STATE_IDLE=0, - CHAIN_NET_SESSION_STATE_REQUESTED_ADDR, -} dap_chain_net_session_state_t; - typedef struct dap_chain_net_session_data { - unsigned int id; - //int sock; - int message_id; - uint16_t type; // data type - time_t timestamp_start; - time_t timestamp_cur; - dap_list_t *list_tr; // list of transactions - dap_chain_node_addr_t node_remote; - dap_chain_node_addr_t node_cur; - dap_chain_net_session_state_t state; + unsigned int session_id; + + dap_chain_node_addr_t addr_remote; UT_hash_handle hh; } dap_chain_net_session_data_t; -typedef struct message_data { - time_t timestamp_start; - uint64_t addr_from; // node addr - uint64_t addr_to; // node addr -} DAP_ALIGN_PACKED message_data_t; - // list of active sessions static dap_chain_net_session_data_t *s_chain_net_data = NULL; // for separate access to session_data_t static pthread_mutex_t s_hash_mutex = PTHREAD_MUTEX_INITIALIZER; -// create packet to send -uint8_t* dap_stream_ch_chain_net_make_packet(uint64_t a_node_addr_from, uint64_t a_node_addr_to, - time_t a_timestamp_start, uint8_t *a_sdata, size_t a_sdata_len, size_t *a_data_len_out) -{ - //message_data_t *l_data = DAP_NEW_Z(message_data_t); - message_data_t *l_data = DAP_NEW_Z_SIZE(message_data_t, sizeof(message_data_t) + a_sdata_len); - l_data->timestamp_start = a_timestamp_start; - l_data->addr_from = a_node_addr_from; - l_data->addr_to = a_node_addr_to; - // copy add data - memcpy(l_data + 1, a_sdata, a_sdata_len); - *a_data_len_out = sizeof(message_data_t) + a_sdata_len; - return (uint8_t*) l_data; -} - -// parse received packet -static const message_data_t *dap_stream_ch_chain_net_parse_packet(uint8_t* a_data, size_t a_data_len, - const uint8_t **a_add_data_out, size_t *a_add_data_len_out) -{ - if(a_data_len < sizeof(message_data_t)) - return NULL; - message_data_t *l_data = (message_data_t*) a_data; - if(a_add_data_out) - *a_add_data_out = (uint8_t*) (l_data + 1); - if(a_add_data_len_out) - *a_add_data_len_out = a_data_len - sizeof(message_data_t); - return l_data; -} - -static void session_data_update(unsigned int a_id, int a_messsage_id, dap_list_t *a_list, message_data_t *a_data, - time_t a_timestamp_cur) -{ - dap_chain_net_session_data_t *l_sdata; - pthread_mutex_lock(&s_hash_mutex); - HASH_FIND_INT(s_chain_net_data, &a_id, l_sdata); - if(l_sdata == NULL) { - l_sdata = DAP_NEW_Z(dap_chain_net_session_data_t); - l_sdata->id = a_id; - HASH_ADD_INT(s_chain_net_data, id, l_sdata); - } - if(a_messsage_id != -1) - l_sdata->message_id = a_messsage_id; - - l_sdata->list_tr = a_list; - - if(a_data) { - l_sdata->timestamp_start = a_data->timestamp_start; - l_sdata->node_remote.uint64 = a_data->addr_from; - l_sdata->node_cur.uint64 = a_data->addr_to; - } - if(a_timestamp_cur != (time_t) -1) { - l_sdata->timestamp_cur = a_timestamp_cur; - } - - pthread_mutex_unlock(&s_hash_mutex); -} static dap_chain_net_session_data_t* session_data_find(unsigned int a_id) { @@ -205,6 +131,17 @@ void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg) dap_stream_ch_chain_net_t * l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(a_ch); l_ch_chain_net->ch = a_ch; pthread_mutex_init(&l_ch_chain_net->mutex, NULL); + + // Create chain net session ever it created + dap_chain_net_session_data_t *l_sdata; + pthread_mutex_lock(&s_hash_mutex); + HASH_FIND_INT(s_chain_net_data, &a_ch->stream->session->id, l_sdata); + if(l_sdata == NULL) { + l_sdata = DAP_NEW_Z(dap_chain_net_session_data_t); + l_sdata->session_id = a_ch->stream->session->id; + HASH_ADD_INT(s_chain_net_data, session_id, l_sdata); + } + pthread_mutex_unlock(&s_hash_mutex); } /** @@ -230,207 +167,124 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch, void* a_arg) void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg) { dap_stream_ch_chain_net_t * l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(a_ch); + dap_chain_net_session_data_t *l_session_data = session_data_find(a_ch->stream->session->id); + if (l_session_data == NULL) { + log_it(L_ERROR, "Can't find chain net session for stream session %d", a_ch->stream->session->id); + dap_stream_ch_set_ready_to_write(a_ch, false); + } + if(l_ch_chain_net) { pthread_mutex_lock(&l_ch_chain_net->mutex); dap_stream_ch_pkt_t *l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg; - dap_stream_ch_chain_net_pkt_t *l_chain_pkt = (dap_stream_ch_chain_net_pkt_t *) l_ch_pkt->data; - if(l_chain_pkt) { - - size_t l_ch_pkt_data_size = l_ch_pkt->hdr.size - sizeof(dap_stream_ch_chain_net_pkt_hdr_t); - //printf("*packet TYPE=%d data_size=%d\n", l_chain_pkt->hdr.type, l_ch_pkt_data_size); - //(data_size > 0) ? (char*) (l_chain_pkt->data) : "-"); - - switch (l_chain_pkt->hdr.type) { - case STREAM_CH_CHAIN_NET_PKT_TYPE_DBG: { - dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_PING, NULL, 0); - dap_stream_ch_set_ready_to_write(a_ch, true); - } - break; - // received ping request - > send pong request - case STREAM_CH_CHAIN_NET_PKT_TYPE_PING: { - log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_PING"); - size_t l_res = dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_PONG, NULL, 0); - dap_stream_ch_set_ready_to_write(a_ch, true); - } - break; - // receive pong request -> send nothing - case STREAM_CH_CHAIN_NET_PKT_TYPE_PONG: { - log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_PONG"); - dap_stream_ch_set_ready_to_write(a_ch, false); - } - break; - case STREAM_CH_CHAIN_NET_PKT_TYPE_GET_NODE_ADDR: { - log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GET_NODE_ADDR"); - dap_stream_ch_set_ready_to_write(a_ch, false); - } - // get node address - case STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_REQUEST: { - log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_REQUEST"); - dap_chain_net_session_data_t *l_session_data = session_data_find(a_ch->stream->session->id); - if (l_session_data ) { - l_session_data->state = CHAIN_NET_SESSION_STATE_REQUESTED_ADDR; + dap_stream_ch_chain_net_pkt_t *l_ch_chain_net_pkt = (dap_stream_ch_chain_net_pkt_t *) l_ch_pkt->data; + size_t l_ch_chain_net_pkt_data_size = (size_t) l_ch_pkt->hdr.size - sizeof (l_ch_chain_net_pkt->hdr); + if(l_ch_chain_net_pkt) { + size_t l_ch_chain_net_pkt_data_size = l_ch_pkt->hdr.size - sizeof(dap_stream_ch_chain_net_pkt_hdr_t); + switch (l_ch_pkt->hdr.type) { + case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_DBG: { + dap_stream_ch_chain_net_pkt_write(a_ch, DAP_SREAM_CH_CHAIN_NET_PKT_TYPE_PING, + l_ch_chain_net_pkt->hdr.net_id, NULL, 0); + dap_stream_ch_set_ready_to_write(a_ch, true); + } + break; + // received ping request - > send pong request + case DAP_SREAM_CH_CHAIN_NET_PKT_TYPE_PING: { + log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_PING"); + dap_stream_ch_chain_net_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_PONG, + l_ch_chain_net_pkt->hdr.net_id,NULL, 0); dap_stream_ch_set_ready_to_write(a_ch, true); - }else { - log_it(L_ERROR, "Can't find session_id=%u to produce addr in reply", - a_ch->stream->session->id); + } + break; + // receive pong request -> send nothing + case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_PONG: { + log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_PONG"); dap_stream_ch_set_ready_to_write(a_ch, false); } - } break; - // set new node address - case STREAM_CH_CHAIN_NET_PKT_TYPE_SET_NODE_ADDR: { - log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_SET_NODE_ADDR"); - { - uint64_t l_addr = 0; - // set cur node addr - if(l_ch_pkt_data_size == sizeof(uint64_t)) { - memcpy(&l_addr, l_chain_pkt->data, sizeof(uint64_t)); - dap_db_set_cur_node_addr(l_addr); + case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR: { + log_it(L_INFO, "Get CH_CHAIN_NET_PKT_TYPE_NODE_ADDR"); + if ( l_ch_chain_net_pkt_data_size == sizeof (dap_chain_node_addr_t) ) { + dap_chain_node_addr_t * l_addr = (dap_chain_node_addr_t *) l_ch_chain_net_pkt->data; + memcpy( &l_session_data->addr_remote,l_addr,sizeof (*l_addr) ); + log_it(L_NOTICE,"Accepted remote node addr 0x%016llX",l_addr->uint64); + }else { + log_it(L_WARNING,"Wrong data secion size %u",l_ch_chain_net_pkt_data_size, + sizeof (dap_chain_node_addr_t)); } - } - session_data_update(a_ch->stream->session->id, STREAM_CH_CHAIN_NET_PKT_TYPE_SET_NODE_ADDR, - NULL, NULL, -1); - dap_stream_ch_set_ready_to_write(a_ch, true); - } - break; - case STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB: { - log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB data_size=%d", l_ch_pkt_data_size); - // get transaction and save it to global_db - if(l_ch_pkt_data_size > 0) { - - // parse received packet - size_t l_data_size = 0; - const uint8_t *l_data = NULL; - const message_data_t *l_mdata = dap_stream_ch_chain_net_parse_packet(l_chain_pkt->data, - l_ch_pkt_data_size, - &l_data, &l_data_size); - - /*dap_chain_node_addr_t l_node_cur; - dap_chain_node_addr_t l_node_remote; - uint8_t *l_recv_data = l_chain_pkt->data; //DAP_NEW_SIZE(uint8_t, l_item_size_out + 2 * sizeof(dap_chain_node_addr_t)); - memcpy(&l_node_remote, l_recv_data, sizeof(dap_chain_node_addr_t)); - memcpy(&l_node_cur, l_recv_data + sizeof(dap_chain_node_addr_t), sizeof(dap_chain_node_addr_t)); - uint8_t *l_mdata = l_recv_data + 2 * sizeof(dap_chain_node_addr_t); - l_ch_pkt_data_size -= 2 * sizeof(dap_chain_node_addr_t);*/ - - if(l_data && l_data_size > 0) { - //session_data_t *l_data = session_data_find(a_ch->stream->session->id); - size_t l_data_obj_count = 0; - - // deserialize data - void *l_data_obj = dap_db_log_unpack((uint8_t*) l_data, l_data_size, &l_data_obj_count); // Parse data from dap_db_log_pack() - // save data to global_db - if(!dap_chain_global_db_obj_save(l_data_obj, l_data_obj_count)) { - log_it(L_ERROR, "Don't saved to global_db objs=0x%x count=%d", l_data_obj, - l_data_obj_count); - } - else { - // Get remote timestamp - time_t l_timestamp_remote = dap_db_log_unpack_get_timestamp((uint8_t*) l_data, l_data_size); - // set new timestamp (saved data) for remote node - dap_db_log_set_last_timestamp_remote(l_mdata->addr_from, l_timestamp_remote); - //printf("***ts=%llu\n", l_timestamp_remote); + dap_stream_ch_set_ready_to_write(a_ch, false); + }break; + case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_LEASE: { + log_it(L_INFO, "Get CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_LEASE"); + if ( l_ch_chain_net_pkt_data_size == sizeof (dap_chain_node_addr_t) ) { + dap_chain_node_addr_t * l_addr = (dap_chain_node_addr_t *) l_ch_chain_net_pkt->data; + log_it(L_NOTICE,"Leased new node addr 0x%016llX",l_addr->uint64); + dap_chain_net_t * l_net = dap_chain_net_by_id( l_ch_chain_net_pkt->hdr.net_id ); + if ( l_net == NULL){ + char l_err_str[]="ERROR_NET_INVALID_ID"; + dap_stream_ch_chain_net_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR , + l_ch_chain_net_pkt->hdr.net_id, l_err_str,sizeof (l_err_str)); + dap_stream_ch_set_ready_to_write(a_ch, true); + log_it(L_WARNING,"Invalid net id in packet"); + } else { + + uint64_t l_cur_node_addr = dap_db_get_cur_node_addr(); + if ( l_cur_node_addr == 0 ){ + if (dap_db_set_cur_node_addr( l_addr->uint64 )) + log_it(L_NOTICE,"Set up cur node address 0x%016llX",l_addr->uint64); + else + log_it(L_ERROR,"Can't set up cur node address 0x%016llX",l_addr->uint64); + }else + log_it(L_ERROR,"Already have node address 0x%016llX",l_cur_node_addr); } + memcpy( &l_session_data->addr_remote,l_addr,sizeof (*l_addr) ); + }else { + log_it(L_WARNING,"Wrong data secion size %u",l_ch_chain_net_pkt_data_size, + sizeof (dap_chain_node_addr_t)); } dap_stream_ch_set_ready_to_write(a_ch, false); - - } - - /*// go to data transfer mode - else if(!data_size) { - - dap_stream_ch_set_ready_to_write(a_ch, false); - // Get log diff - //dap_list_t *l_list = dap_db_log_get_list(a_data->timestamp_start); - //session_data_update(a_ch->stream->session->id, l_list, a_data, 0); - - }*/ - } - break; - // receive the latest global_db revision of the remote node -> go to send mode - case STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC: { - - if(l_ch_pkt_data_size == sizeof(message_data_t)) { - - // parse received packet - const message_data_t *l_data = dap_stream_ch_chain_net_parse_packet(l_chain_pkt->data, - l_ch_pkt_data_size, - NULL, NULL); - if(l_data) { - //message_data_t *l_data = (message_data_t*) l_chain_pkt->data; - - //time_t l_timestamp_remote_get = l_data->timestamp_start; - //time_t l_timestamp_remote_cur = dap_db_log_get_last_timestamp_remote(l_data->addr_from); - - // last timestamp for remote node - time_t l_timestamp_remote_saved = l_data->timestamp_start; //min(l_timestamp_remote_get, l_timestamp_remote_cur); - - // Get log diff - dap_list_t *l_list = dap_db_log_get_list(l_timestamp_remote_saved); - session_data_update(a_ch->stream->session->id, - STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC, - l_list, (message_data_t*) l_data, -1); - - log_it(L_INFO, - "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC session_id=%u from 0x%llx to 0x%llx count=%d", - a_ch->stream->session->id, l_data->addr_from, l_data->addr_to, dap_list_length(l_list)); - } - // go to send data from list [in s_stream_ch_packet_out()] - // no data to send -> send one empty message STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC + }break; + // get current node address + case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_REQUEST: { + log_it(L_INFO, "Get CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_REQUEST"); + // get cur node addr + uint64_t l_addr = dap_db_get_cur_node_addr(); + size_t l_send_data_len = sizeof(uint64_t); + // send cur node addr + dap_stream_ch_chain_net_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_REQUEST , + l_ch_chain_net_pkt->hdr.net_id, &l_addr, l_send_data_len); dap_stream_ch_set_ready_to_write(a_ch, true); + } break; + case DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_LEASE_REQUEST: { + log_it(L_INFO, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_REQUEST"); + // gen node addr + dap_chain_net_t * l_net = dap_chain_net_by_id( l_ch_chain_net_pkt->hdr.net_id ); + if ( l_net == NULL){ + char l_err_str[]="ERROR_NET_INVALID_ID"; + dap_stream_ch_chain_net_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR , l_net->pub.id, + l_err_str,sizeof (l_err_str)); + dap_stream_ch_set_ready_to_write(a_ch, true); + } else { + dap_chain_node_addr_t *l_addr_new = dap_chain_node_gen_addr( &l_net->pub.cell_id ); + dap_stream_ch_chain_net_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_LEASE , + l_ch_chain_net_pkt->hdr.net_id, l_addr_new, sizeof (*l_addr_new)); + dap_stream_ch_set_ready_to_write(a_ch, true); + memcpy( &l_session_data->addr_remote,l_addr_new,sizeof (*l_addr_new) ); + DAP_DELETE(l_addr_new); + } } - else { - log_it(L_ERROR, "Get STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC session_id=%u bad request", - a_ch->stream->session->id); - dap_stream_ch_set_ready_to_write(a_ch, false); - } - } break; + } - if(l_ch_chain_net->notify_callback) { - if(l_chain_pkt->hdr.type == STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC) { - dap_chain_net_session_data_t *l_data = session_data_find(a_ch->stream->session->id); - // end of session - if(!l_data->list_tr) - l_ch_chain_net->notify_callback(NULL, l_ch_pkt_data_size, l_ch_chain_net->notify_callback_arg); - else - l_ch_chain_net->notify_callback(l_chain_pkt, l_ch_pkt_data_size, - l_ch_chain_net->notify_callback_arg); - } - else - l_ch_chain_net->notify_callback(l_chain_pkt, l_ch_pkt_data_size, - l_ch_chain_net->notify_callback_arg); - } + if(l_ch_chain_net->notify_callback) + l_ch_chain_net->notify_callback(l_ch_chain_net,l_ch_pkt->hdr.type, l_ch_chain_net_pkt, + l_ch_chain_net_pkt_data_size, l_ch_chain_net->notify_callback_arg); + } pthread_mutex_unlock(&l_ch_chain_net->mutex); } } -/** - * @brief s_session_state_proc - * @param a_ch - */ -void s_session_state_proc(dap_stream_ch_t * a_ch ) -{ - dap_stream_ch_chain_net_t * l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(a_ch); - - dap_chain_net_session_data_t *l_session_data = session_data_find(a_ch->stream->session->id); - //printf("*packet out session_id=%u\n", a_ch->stream->session->id); - if(!l_session_data) { - log_it(L_WARNING, "Can't find session_data"); - dap_stream_ch_set_ready_to_write(a_ch, false); - return; - } - switch (l_session_data->state){ - case CHAIN_NET_SESSION_STATE_REQUESTED_ADDR:{ -// dap_chain_node_addr_t * l_addr = dap_chain_node_gen_addr( l_ch_chain_net-> ) - }break; - case CHAIN_NET_SESSION_STATE_IDLE:{ - log_it(L_NOTICE,"Session idle state, nothing to do"); - }break; - } -} /** * @brief s_stream_ch_packet_out @@ -439,89 +293,5 @@ void s_session_state_proc(dap_stream_ch_t * a_ch ) */ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg) { - dap_stream_ch_chain_net_t * l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(a_ch); - pthread_mutex_lock(&l_ch_chain_net->mutex); - - dap_chain_net_session_data_t *l_data = session_data_find(a_ch->stream->session->id); - //printf("*packet out session_id=%u\n", a_ch->stream->session->id); - if(!l_data) { - log_it(L_WARNING, "if packet_out() l_data=NULL"); - dap_stream_ch_set_ready_to_write(a_ch, false); - return; - } - - if(l_data->message_id == STREAM_CH_CHAIN_NET_PKT_TYPE_SET_NODE_ADDR) { - // get cur node addr - uint64_t l_addr = dap_db_get_cur_node_addr(); - size_t l_send_data_len = sizeof(uint64_t); - // send cur node addr - dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_GET_NODE_ADDR, &l_addr, l_send_data_len); - - pthread_mutex_unlock(&l_ch_chain_net->mutex); - dap_stream_ch_set_ready_to_write(a_ch, false); - return; - } - - dap_chain_node_addr_t node_cur; - - // Get log diff - size_t l_data_size_out = 0; - dap_list_t *l_list = l_data->list_tr; - int len = dap_list_length(l_list); - //printf("*len=%d\n", len); - if(l_list) { - size_t l_item_size_out = 0; - uint8_t *l_item = NULL; - while(l_list && !l_item) { - l_item = dap_db_log_pack((dap_global_db_obj_t *) l_list->data, &l_item_size_out); - if(!l_item) { - // remove current item from list - dap_chain_global_db_obj_delete((dap_global_db_obj_t *) l_list->data); - l_list = dap_list_delete_link(l_list, l_list); - } - } - - size_t l_send_data_len = 0; - uint8_t *l_send_data = dap_stream_ch_chain_net_make_packet(l_data->node_cur.uint64, l_data->node_remote.uint64, - 0, l_item, l_item_size_out, &l_send_data_len); - - dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB, l_send_data, l_send_data_len); - DAP_DELETE(l_send_data); - - // remove current item from list - dap_chain_global_db_obj_delete((dap_global_db_obj_t *) l_list->data); - - l_list = dap_list_delete_link(l_list, l_list); - - session_data_update(a_ch->stream->session->id, -1, l_list, NULL, -1); - } - // last message - if(!l_list) { - // send request - size_t l_data_size_out = 0; - // Get current last timestamp in log - //time_t l_timestamp_remote_saved = dap_db_log_get_last_timestamp(); - - // get remote last timestamp (saved data) for remote node - time_t l_timestamp_remote_saved = dap_db_log_get_last_timestamp_remote(l_data->node_remote.uint64); - - size_t l_data_send_len = 0; - uint8_t *l_data_send = dap_stream_ch_chain_net_make_packet(l_data->node_cur.uint64, l_data->node_remote.uint64, - l_timestamp_remote_saved, NULL, 0, &l_data_send_len); - - dap_stream_ch_chain_net_pkt_write(a_ch, STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC, l_data_send, - l_data_send_len); - DAP_DELETE(l_data_send); - - l_data = NULL; - } - int l_res = 0; - - // end of session - if(!l_list) - dap_stream_ch_set_ready_to_write(a_ch, false); - else - dap_stream_ch_set_ready_to_write(a_ch, true); - - pthread_mutex_unlock(&l_ch_chain_net->mutex); + dap_stream_ch_set_ready_to_write(a_ch, false); } diff --git a/dap_stream_ch_chain_net.h b/dap_stream_ch_chain_net.h index 62967c0025eaab8e580a77bf2df203053db50ee6..16bcfed44d388fd91a4e1fce25eabbd0ad3e147c 100755 --- a/dap_stream_ch_chain_net.h +++ b/dap_stream_ch_chain_net.h @@ -29,11 +29,14 @@ #include "dap_stream_ch.h" #include "dap_stream_ch_chain_net_pkt.h" -typedef void (*dap_stream_ch_chain_net_callback_t)(dap_stream_ch_chain_net_pkt_t *a_pkt, size_t a_pkt_data_size, void *arg); +typedef struct dap_stream_ch_chain_net dap_stream_ch_chain_net_t; + +typedef void (*dap_stream_ch_chain_net_callback_packet_t)( + dap_stream_ch_chain_net_t *, uint8_t, dap_stream_ch_chain_net_pkt_t *, size_t , void *); typedef struct dap_stream_ch_chain_net { pthread_mutex_t mutex; - dap_stream_ch_chain_net_callback_t notify_callback; + dap_stream_ch_chain_net_callback_packet_t notify_callback; dap_stream_ch_t *ch; void *notify_callback_arg; } dap_stream_ch_chain_net_t; @@ -41,7 +44,5 @@ typedef struct dap_stream_ch_chain_net { #define DAP_STREAM_CH_CHAIN_NET(a) ((dap_stream_ch_chain_net_t *) ((a)->internal) ) uint8_t dap_stream_ch_chain_net_get_id(); -uint8_t* dap_stream_ch_chain_net_make_packet(uint64_t a_node_addr_from, uint64_t a_node_addr_to, - time_t a_timestamp_start, uint8_t *a_sdata, size_t a_sdata_len, size_t *a_data_len_out); int dap_stream_ch_chain_net_init(); void dap_stream_ch_chain_net_deinit(); diff --git a/dap_stream_ch_chain_net_pkt.c b/dap_stream_ch_chain_net_pkt.c index 5423ead8e219701c639edf19de78a3dad3639267..0fea9b522bdc27c4ece016896cfca6b58dc624ac 100755 --- a/dap_stream_ch_chain_net_pkt.c +++ b/dap_stream_ch_chain_net_pkt.c @@ -19,34 +19,35 @@ * @param data_size * @return */ -size_t dap_stream_ch_chain_net_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, - const void * a_data, uint32_t a_data_size) +size_t dap_stream_ch_chain_net_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id, + const void * a_data, size_t a_data_size) { - dap_stream_ch_chain_net_pkt_t l_hdr; - memset(&l_hdr, 0, sizeof(l_hdr)); - l_hdr.hdr.type = a_type; - size_t l_buf_size = sizeof(l_hdr) + a_data_size; - char *l_buf = DAP_NEW_SIZE(char, l_buf_size); - memcpy(l_buf, &l_hdr, sizeof(l_hdr)); - memcpy(l_buf + sizeof(l_hdr), a_data, a_data_size); - size_t l_ret = dap_stream_ch_pkt_write(a_ch, a_type , l_buf, l_buf_size); - DAP_DELETE(l_buf); + dap_stream_ch_chain_net_pkt_t * l_net_pkt; + size_t l_net_pkt_size = sizeof (l_net_pkt->hdr) + a_data_size; + l_net_pkt = DAP_NEW_Z_SIZE(dap_stream_ch_chain_net_pkt_t, l_net_pkt_size ); + l_net_pkt->hdr.version = 1; + l_net_pkt->hdr.net_id.uint64 = a_net_id.uint64; + memcpy( l_net_pkt->data, a_data, a_data_size); + size_t l_ret = dap_stream_ch_pkt_write(a_ch, a_type , l_net_pkt, l_net_pkt_size); + DAP_DELETE(l_net_pkt); return l_ret; } /** * @brief dap_stream_ch_chain_net_pkt_write_f - * @param sid - * @param str + * @param a_ch + * @param a_type + * @param a_net_id + * @param a_str * @return */ -size_t dap_stream_ch_chain_net_pkt_write_f(dap_stream_ch_t *a_ch, uint8_t a_type, const char *a_str, ...) +size_t dap_stream_ch_chain_net_pkt_write_f(dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id, const char *a_str, ...) { char l_buf[4096]; va_list ap; va_start(ap, a_str); vsnprintf(l_buf, sizeof(l_buf), a_str, ap); va_end(ap); - size_t ret = dap_stream_ch_chain_net_pkt_write(a_ch, a_type, l_buf, strlen(l_buf)); + size_t ret = dap_stream_ch_chain_net_pkt_write(a_ch, a_type, a_net_id, l_buf, strlen(l_buf)); return ret; } diff --git a/dap_stream_ch_chain_net_pkt.h b/dap_stream_ch_chain_net_pkt.h index c089d470151804883808bde0d6ca891baa3689c6..0d52cf091d168b0862b01ba06aaf7418a86e76a6 100755 --- a/dap_stream_ch_chain_net_pkt.h +++ b/dap_stream_ch_chain_net_pkt.h @@ -28,35 +28,31 @@ #include "dap_chain_common.h" #include "dap_chain_net.h" #include "dap_chain_node.h" +#include "dap_stream_ch.h" +#define DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_REQUEST 0x01 +#define DAP_SREAM_CH_CHAIN_NET_PKT_TYPE_PING 0x02 +#define DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_PONG 0x03 +#define DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_REQUEST 0x14 +#define DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR 0x11 -#define STREAM_CH_CHAIN_NET_PKT_TYPE_REQUEST 0x00 -#define STREAM_CH_CHAIN_NET_PKT_TYPE_PING 0x01 -#define STREAM_CH_CHAIN_NET_PKT_TYPE_PONG 0x02 -#define STREAM_CH_CHAIN_NET_PKT_TYPE_BLOCK 0x11 -#define STREAM_CH_CHAIN_NET_PKT_TYPE_DATUM 0x12 -#define STREAM_CH_CHAIN_NET_PKT_TYPE_GLOVAL_DB 0x13 -#define STREAM_CH_CHAIN_NET_PKT_TYPE_GLOBAL_DB_REQUEST_SYNC 0x14 -#define STREAM_CH_CHAIN_NET_PKT_TYPE_GET_NODE_ADDR 0x15 -#define STREAM_CH_CHAIN_NET_PKT_TYPE_SET_NODE_ADDR 0x16 -#define STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_REQUEST 0x17 -#define STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_RESPONSE 0x18 -#define STREAM_CH_CHAIN_NET_PKT_TYPE_DBG 0x99 +#define DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_LEASE_REQUEST 0x17 +#define DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_NODE_ADDR_LEASE 0x18 +#define DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_DBG 0x99 + +#define DAP_STREAM_CH_CHAIN_NET_PKT_TYPE_ERROR 0xff typedef struct stream_ch_chain_net_pkt_hdr{ + uint8_t version; + uint8_t padding[3]; dap_chain_net_id_t net_id; - uint16_t type; // Chain data type - uint8_t padding1[2]; // Some padding - union{ - uint64_t raw; - }; -} __attribute__((packed)) dap_stream_ch_chain_net_pkt_hdr_t; +} DAP_ALIGN_PACKED dap_stream_ch_chain_net_pkt_hdr_t; typedef struct dap_stream_ch_chain_net_pkt{ dap_stream_ch_chain_net_pkt_hdr_t hdr; uint8_t data[]; -} __attribute__((packed)) dap_stream_ch_chain_net_pkt_t; +} DAP_ALIGN_PACKED dap_stream_ch_chain_net_pkt_t; -size_t dap_stream_ch_chain_net_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, const void * a_data, uint32_t a_data_size); -size_t dap_stream_ch_chain_net_pkt_write_f(dap_stream_ch_t *a_ch, uint8_t a_type, const char *a_str, ...); +size_t dap_stream_ch_chain_net_pkt_write(dap_stream_ch_t *a_ch, uint8_t a_type, dap_chain_net_id_t a_net_id, const void * a_data, size_t a_data_size); +size_t dap_stream_ch_chain_net_pkt_write_f(dap_stream_ch_t *a_ch, uint8_t a_type, dap_chain_net_id_t a_net_id, const char *a_str, ...);