From a29ddc9c7c893f53c37c57f16c55010d97366488 Mon Sep 17 00:00:00 2001 From: anta999 <antarcticstation@gmail.com> Date: Mon, 3 Jun 2019 19:06:52 +0400 Subject: [PATCH] comm --- dap_stream_ch_chain.c | 189 ++++++++++++++++++++++++++++++++---------- 1 file changed, 145 insertions(+), 44 deletions(-) mode change 100755 => 100644 dap_stream_ch_chain.c diff --git a/dap_stream_ch_chain.c b/dap_stream_ch_chain.c old mode 100755 new mode 100644 index c745697..0bdabb4 --- a/dap_stream_ch_chain.c +++ b/dap_stream_ch_chain.c @@ -97,6 +97,7 @@ void s_stream_ch_new(dap_stream_ch_t* a_ch , void* a_arg) void s_stream_ch_delete(dap_stream_ch_t* a_ch , void* a_arg) { (void) a_arg; + if (DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs ) dap_list_free_full(DAP_STREAM_CH_CHAIN(a_ch)->request_global_db_trs, (dap_callback_destroyed_t) free); pthread_mutex_destroy( &DAP_STREAM_CH_CHAIN(a_ch)->mutex); @@ -110,6 +111,7 @@ void s_stream_ch_delete(dap_stream_ch_t* a_ch , void* a_arg) void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) { dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); + if ( l_ch_chain){ dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg; dap_stream_ch_chain_pkt_t * l_chain_pkt =(dap_stream_ch_chain_pkt_t *) l_ch_pkt->data; @@ -257,9 +259,9 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) char l_ts_str[50]; dap_time_to_str_rfc822(l_ts_str,sizeof(l_ts_str),l_store_obj[i].timestamp); log_it(L_DEBUG,"Unpacked log history: type='%c' (0x%02hhX) group=\"%s\" key=\"%s\"" - " timestamp=\"%s\" value_len=%u ", + " section=\"%s\" timestamp=\"%s\" value_len=%u ", (char) l_store_obj[i].type , l_store_obj[i].type, l_store_obj[i].group, l_store_obj[i].key, - l_ts_str, + l_store_obj[i].section , l_ts_str, l_store_obj[i].value_len); //memcpy(l_store_obj_reversed+l_data_obj_count-i-1, l_store_obj+i,sizeof(*l_store_obj) ); @@ -299,6 +301,8 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) } } } + + /** * @brief s_stream_ch_packet_out * @param ch @@ -307,76 +311,167 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch , void* a_arg) void s_stream_ch_packet_out(dap_stream_ch_t* a_ch , void* a_arg) { (void) a_arg; - dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch); +// log_it( L_DEBUG,"s_stream_ch_packet_out"); + + dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN( a_ch ); + + // log_it( L_DEBUG,"l_ch_chain %X", l_ch_chain ); switch ( l_ch_chain->state ) { + case CHAIN_STATE_IDLE:{ + + log_it( L_DEBUG,"CHAIN_STATE_IDLE"); + // Cleanup after request - memset(&l_ch_chain->request,0,sizeof(l_ch_chain->request)); - memset(&l_ch_chain->request_net_id,0,sizeof(l_ch_chain->request_net_id)); - memset(&l_ch_chain->request_cell_id,0,sizeof(l_ch_chain->request_cell_id)); - memset(&l_ch_chain->request_chain_id,0,sizeof(l_ch_chain->request_chain_id)); - memset(&l_ch_chain->request_last_ts,0,sizeof(l_ch_chain->request_last_ts )); + memset( &l_ch_chain->request, 0, sizeof(l_ch_chain->request) ); + memset( &l_ch_chain->request_net_id, 0, sizeof(l_ch_chain->request_net_id) ); + memset( &l_ch_chain->request_cell_id, 0, sizeof(l_ch_chain->request_cell_id) ); + memset( &l_ch_chain->request_chain_id, 0, sizeof(l_ch_chain->request_chain_id) ); + memset( &l_ch_chain->request_last_ts, 0, sizeof(l_ch_chain->request_last_ts ) ); - dap_chain_atom_item_t * l_atom_item = NULL, * l_atom_item_tmp = NULL; - HASH_ITER(hh,l_ch_chain->request_atoms_lasts, l_atom_item, l_atom_item_tmp) - HASH_DEL(l_ch_chain->request_atoms_lasts, l_atom_item); + dap_chain_atom_item_t *l_atom_item = NULL, *l_atom_item_tmp = NULL; - HASH_ITER(hh,l_ch_chain->request_atoms_processed , l_atom_item, l_atom_item_tmp) - HASH_DEL(l_ch_chain->request_atoms_processed, l_atom_item); + HASH_ITER( hh,l_ch_chain->request_atoms_lasts, l_atom_item, l_atom_item_tmp) + HASH_DEL( l_ch_chain->request_atoms_lasts, l_atom_item ); + + HASH_ITER( hh, l_ch_chain->request_atoms_processed, l_atom_item, l_atom_item_tmp ) + HASH_DEL( l_ch_chain->request_atoms_processed, l_atom_item ); + + dap_stream_ch_set_ready_to_write( a_ch, false ); + } + break; - dap_stream_ch_set_ready_to_write(a_ch, false); - }break; case CHAIN_STATE_SYNC_ALL: + log_it( L_DEBUG,"CHAIN_STATE_SYNC_ALL"); + case CHAIN_STATE_SYNC_GLOBAL_DB:{ + + log_it( L_DEBUG,"CHAIN_STATE_SYNC_GLOBAL_DB"); +// sleep(1); + // Get log diff size_t l_data_size_out = 0; - dap_list_t *l_list = dap_list_last(l_ch_chain->request_global_db_trs); - //printf("*len=%d\n", len); - if(l_list) { - size_t len = dap_list_length(l_list); - size_t l_item_size_out = 0; - uint8_t * l_item = dap_db_log_pack((dap_global_db_obj_t *) l_list->data, &l_item_size_out); - if(!l_item) { - log_it(L_WARNING,"Log pack returned NULL???"); - // remove current item from list - dap_chain_global_db_obj_delete((dap_global_db_obj_t *) l_list->data); + +// log_it( L_DEBUG,"dap_list_last( ) l_ch_chain %x->request_global_db_trs %X", l_ch_chain, l_ch_chain->request_global_db_trs ); +// sleep(1); + + dap_list_t *l_list = dap_list_last( l_ch_chain->request_global_db_trs ); + +/** + log_it( L_NOTICE,"last of request_global_db_trs l_list = %X", l_list ); + + log_it( L_WARNING,"Listing request_global_db_trs: ========" ); + { + dap_list_t *l_item = l_ch_chain->request_global_db_trs; + if ( l_item ) { + log_it( L_NOTICE,"list request_global_db_trs %X", l_item ); + + while( l_item ) { + log_it( L_DEBUG,"list item %X data = %X", l_item, l_item->data ); + l_item = l_item->next; + } + } + else { + + log_it( L_NOTICE,"request_global_db_trs EMPTY %X", l_item ); + } + } +**/ + +// log_it( L_WARNING,"Listing request_global_db_trs: END ====" ); + +// dap_list_t *request_global_db_trs; // list of transactions + +// log_it( L_DEBUG,"dap_list_last( ) ok"); +// sleep(1); + + //log_it( L_DEBUG,"*len=%d", len); + + if ( l_list ) { + + log_it( L_DEBUG,"l_list = %X", l_list ); + + size_t len = dap_list_length( l_list ); + size_t l_item_size_out = 0; + +// log_it( L_DEBUG,"len = dap_list_length(l_list); = %u", len ); + + uint8_t *l_item = dap_db_log_pack( (dap_global_db_obj_t *) l_list->data, &l_item_size_out ); + + if ( !l_item ) { + + log_it( L_WARNING, "Log pack returned NULL???" ); + + dap_chain_global_db_obj_delete( (dap_global_db_obj_t *) l_list->data ); + l_list = l_list->prev; - if (l_list){ - dap_list_free(l_list->next); + + if ( l_list ) { + dap_list_free( l_list->next ); l_list->next = NULL; - }else + } + else l_ch_chain->request_global_db_trs = NULL; + } - dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, + + dap_stream_ch_chain_pkt_write( a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, l_item, l_item_size_out) ; - DAP_DELETE( l_item); + l_ch_chain->request_cell_id, l_item, l_item_size_out ); + + DAP_DELETE( l_item ); + // remove current item from list - if (l_list ){ - dap_chain_global_db_obj_delete((dap_global_db_obj_t *) l_list->data); - l_list = l_list->prev; - if (l_list){ - dap_list_free(l_list->next); - l_list->next = NULL; - }else + if ( l_list ) { + +// log_it( L_DEBUG,"dap_chain_global_db_obj_delete: list(%X)->data(%X)", l_list, l_list->data ); + + dap_chain_global_db_obj_delete( (dap_global_db_obj_t *)l_list->data ); + +// l_list = l_list->prev; +// if ( l_list ) { +// dap_list_free( l_list->next ); +// l_list->next = NULL; +// } +// else +// l_ch_chain->request_global_db_trs = NULL; + +// log_it( L_DEBUG,"dap_list_delete_link: l_ch_chain->request_global_db_trs(%X), l_list(%X)", l_ch_chain->request_global_db_trs, l_list ); + dap_list_delete_link( l_ch_chain->request_global_db_trs, l_list ); + + if ( l_list == l_ch_chain->request_global_db_trs ) { +// log_it( L_DEBUG,"l_list == l_ch_chain->request_global_db_trs: l_ch_chain->request_global_db_trs = NULL;" ); l_ch_chain->request_global_db_trs = NULL; + } +// log_it( L_DEBUG,"after all l_ch_chain->request_global_db_trs = %X, l_list = %X", l_ch_chain->request_global_db_trs, l_list ); + } - }else{ - if ( l_ch_chain->state == CHAIN_STATE_SYNC_GLOBAL_DB){ + } else { +// log_it( L_DEBUG,"l_list == 0, STOP"); + + if ( l_ch_chain->state == CHAIN_STATE_SYNC_GLOBAL_DB ) { // last message +// log_it( L_DEBUG,"dap_db_log_get_last_timestamp_remote"); + dap_stream_ch_chain_sync_request_t l_request = {{0}}; l_request.ts_start = dap_db_log_get_last_timestamp_remote(l_ch_chain->request.node_addr.uint64); - dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + + dap_stream_ch_chain_pkt_write( a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, l_ch_chain->request_net_id, l_ch_chain->request_chain_id, - l_ch_chain->request_cell_id, &l_request, sizeof(l_request)); + l_ch_chain->request_cell_id, &l_request, sizeof(l_request) ); + // dap_stream_ch_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB ,&l_request, // sizeof (l_request)); + l_ch_chain->state = CHAIN_STATE_IDLE; + +// log_it( L_DEBUG," callback_notify_packet_out" ); + if (l_ch_chain->callback_notify_packet_out ) - l_ch_chain->callback_notify_packet_out(l_ch_chain,DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, - NULL,0,l_ch_chain->callback_notify_arg ); + l_ch_chain->callback_notify_packet_out( l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, + NULL, 0, l_ch_chain->callback_notify_arg ); } } @@ -385,6 +480,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch , void* a_arg) // Syncronyze chains case CHAIN_STATE_SYNC_CHAINS:{ + log_it( L_DEBUG,"CHAIN_STATE_SYNC_CHAINS"); dap_chain_t * l_chain = l_ch_chain->request_atom_iter->chain; dap_chain_atom_item_t * l_atom_item = NULL, * l_atom_item_tmp = NULL, *l_chains_lasts_new = NULL; if ( l_ch_chain->request_atoms_lasts == NULL) { // All chains synced @@ -441,7 +537,10 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch , void* a_arg) }break; } + if ( l_ch_chain->state == CHAIN_STATE_SYNC_ALL) { + log_it( L_DEBUG,"<<CHAIN_STATE_SYNC_ALL>>"); + dap_stream_ch_chain_pkt_write(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB, l_ch_chain->request_net_id, l_ch_chain->request_chain_id, l_ch_chain->request_cell_id, NULL,0); @@ -452,4 +551,6 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch , void* a_arg) l_ch_chain->callback_notify_packet_out(l_ch_chain,DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, NULL,0,l_ch_chain->callback_notify_arg ); } + +// log_it( L_DEBUG,"s_stream_ch_packet_out ok"); } -- GitLab