Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (15)
......@@ -3,7 +3,6 @@ INCLUDEPATH += $$PWD
HEADERS += $$PWD/common_salsa.h \
$$PWD/crypto_core_salsa2012.h \
$$PWD/crypto_stream_salsa2012.h \
$$PWD/utils.h
SOURCES += $$PWD/core_salsa_ref.c \
$$PWD/stream_salsa2012.c \
......
......@@ -85,6 +85,7 @@ static struct epoll_event *threads_epoll_events = NULL;
static dap_server_t *_current_run_server = NULL;
static void read_write_cb( dap_client_remote_t *dap_cur, int32_t revents );
void *thread_loop( void *arg );
dap_server_thread_t dap_server_threads[ DAP_MAX_THREADS ];
......@@ -170,6 +171,26 @@ int32_t dap_server_init( uint32_t count_threads )
log_it( L_NOTICE, "Initialized socket server module" );
dap_client_remote_init( );
pthread_t thread_listener[ DAP_MAX_THREADS ];
for( uint32_t i = 0; i < _count_threads; ++i ) {
EPOLL_HANDLE efd = epoll_create1( 0 );
if ( (intptr_t)efd == -1 ) {
log_it( L_ERROR, "Can't create epoll instance" );
goto err;
}
dap_server_threads[ i ].epoll_fd = efd;
dap_server_threads[ i ].thread_num = i;
}
for( uint32_t i = 0; i < _count_threads; ++i ) {
pthread_create( &thread_listener[i], NULL, thread_loop, &dap_server_threads[i] );
}
return 0;
err:;
......@@ -180,7 +201,6 @@ err:;
void dap_server_loop_stop( void ){
bQuitSignal = true;
dap_server_deinit();
}
/*
......@@ -208,6 +228,14 @@ void dap_server_deinit( void )
pthread_mutex_destroy( &dap_server_threads[i].mutex_on_hash );
pthread_mutex_destroy( &dap_server_threads[i].mutex_dlist_add_remove );
if ( (intptr_t)dap_server_threads[ i ].epoll_fd != -1 ) {
#ifndef _WIN32
close( dap_server_threads[ i ].epoll_fd );
#else
epoll_close( dap_server_threads[ i ].epoll_fd );
#endif
}
}
}
moduleInit = false;
......@@ -738,106 +766,73 @@ void *thread_loop( void *arg )
*/
int32_t dap_server_loop( dap_server_t *d_server )
{
static uint32_t pickthread = 0; // just for test
pthread_t thread_listener[ DAP_MAX_THREADS ];
for( uint32_t i = 0; i < _count_threads; ++i ) {
EPOLL_HANDLE efd = epoll_create1( 0 );
// log_it( L_ERROR, "EPOLL_HANDLE efd %u for thread %u created", efd, i );
if ( (intptr_t)efd == -1 ) {
log_it( L_ERROR, "Server wakeup no events / error" );
goto error;
}
dap_server_threads[ i ].epoll_fd = efd;
dap_server_threads[ i ].thread_num = i;
}
int errCode = 0;
for( uint32_t i = 0; i < _count_threads; ++i ) {
pthread_create( &thread_listener[i], NULL, thread_loop, &dap_server_threads[i] );
if(d_server == NULL){
log_it(L_ERROR, "Server is NULL");
return -1;
}
_current_run_server = d_server;
EPOLL_HANDLE efd = epoll_create1( 0 );
if ( (intptr_t)efd == -1 )
goto error;
if ( (intptr_t)efd == -1 ) {
return -10;
}
struct epoll_event pev;
struct epoll_event events[ 16 ];
if(d_server){
memset(&pev, 0, sizeof(pev));
pev.events = EPOLLIN | EPOLLERR;
pev.data.fd = d_server->socket_listener;
memset(&pev, 0, sizeof(pev));
pev.events = EPOLLIN | EPOLLERR;
pev.data.fd = d_server->socket_listener;
if( epoll_ctl( efd, EPOLL_CTL_ADD, d_server->socket_listener, &pev) != 0 ) {
log_it( L_ERROR, "epoll_ctl failed 004" );
goto error;
}
if( epoll_ctl( efd, EPOLL_CTL_ADD, d_server->socket_listener, &pev) != 0 ) {
log_it( L_ERROR, "epoll_ctl failed 004" );
return -20;
}
while( !bQuitSignal ) {
if(d_server){
int32_t n = epoll_wait( efd, &events[0], 16, -1 );
if ( bQuitSignal )
break;
if ( n <= 0 ) {
if ( errno == EINTR )
continue;
log_it( L_ERROR, "Server wakeup no events / error" );
break;
}
while( !bQuitSignal && errCode == 0 ) {
int32_t n = epoll_wait( efd, &events[0], 16, 1000 );
for( int32_t i = 0; i < n; ++ i ) {
if ( bQuitSignal )
break;
if ( events[i].events & EPOLLIN ) {
if ( n < 0 ) {
if ( errno == EINTR )
continue;
log_it( L_ERROR, "Server wakeup on error: %i", errno );
errCode = -30;
}
int client_fd = accept( events[i].data.fd, 0, 0 );
for( int32_t i = 0; i < n && errCode == 0; ++i ) {
if ( client_fd < 0 ) {
log_it( L_ERROR, "accept_cb: error accept socket");
continue;
}
if ( events[i].events & EPOLLIN ) {
int client_fd = accept( events[i].data.fd, 0, 0 );
set_nonblock_socket( client_fd );
dap_server_add_socket( client_fd, -1 );
}
else if( events[i].events & EPOLLERR ) {
log_it( L_ERROR, "Server socket error event" );
goto exit;
if ( client_fd < 0 ) {
log_it( L_ERROR, "accept_cb: error accept socket");
continue;
}
} // for
}else{
static const int c_dap_server_client_mode_tick_rate = 200;
dap_usleep(DAP_USEC_PER_SEC / c_dap_server_client_mode_tick_rate);
}
set_nonblock_socket( client_fd );
dap_server_add_socket( client_fd, -1 );
}
else if( events[i].events & EPOLLERR ) {
log_it( L_ERROR, "Server socket error event" );
errCode = -40;
}
} // for
} // while
exit:;
#ifndef _WIN32
close( efd );
#else
epoll_close( efd );
#endif
error:;
bQuitSignal = true;
for( uint32_t i = 0; i < _count_threads; ++i ) {
if ( (intptr_t)dap_server_threads[ i ].epoll_fd != -1 ) {
#ifndef _WIN32
close( dap_server_threads[ i ].epoll_fd );
#else
epoll_close( dap_server_threads[ i ].epoll_fd );
#endif
}
if (efd != -1) {
#ifndef _WIN32
close( efd );
#else
epoll_close( efd );
#endif
}
return 0;
return errCode;
}
......@@ -94,7 +94,6 @@ void dap_chain_deinit(void)
pthread_rwlock_wrlock(&s_chain_items_rwlock);
HASH_ITER(hh, s_chain_items, l_item, l_tmp) {
dap_chain_delete(s_chain_items->chain);
DAP_DELETE(l_item);
}
pthread_rwlock_unlock(&s_chain_items_rwlock);
}
......
......@@ -252,7 +252,7 @@ int dap_chain_ledger_token_add(dap_ledger_t * a_ledger, dap_chain_datum_token_t
switch(a_token->type){
case DAP_CHAIN_DATUM_TOKEN_TYPE_SIMPLE:
l_token_item->total_supply = a_token->header_private.total_supply;
log_it( L_NOTICE, "Private token %s added (total_supply = %.1llf signs_valid=%hu signs_total=%hu type=DAP_CHAIN_DATUM_TOKEN_PRIVATE )",
log_it( L_NOTICE, "Private token %s added (total_supply = %.1llf total_signs_valid=%hu signs_total=%hu type=DAP_CHAIN_DATUM_TOKEN_PRIVATE )",
a_token->ticker, dap_chain_balance_to_coins(a_token->header_private.total_supply),
a_token->header_private.signs_valid, a_token->header_private.signs_total);
break;
......@@ -333,7 +333,7 @@ int dap_chain_ledger_token_emission_add_check(dap_ledger_t *a_ledger, const dap_
HASH_FIND(hh,l_token_item ? l_token_item->token_emissions : l_ledger_priv->treshold_emissions,
&l_token_emission_hash, sizeof(l_token_emission_hash), l_token_emission_item);
if(l_token_emission_item ) {
log_it(L_ERROR, "Can't add token emission datum of %llu %s ( 0x%s ): already present in cache",
log_it(L_ERROR, "Can't add token emission datum of %llu %s ( %s ): already present in cache",
a_token_emission->hdr.value, c_token_ticker, l_hash_str);
ret = -1;
}else if ( (! l_token_item) && ( HASH_COUNT( l_ledger_priv->treshold_emissions) >= s_treshold_emissions_max )) {
......@@ -412,7 +412,7 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger,
ret = -2;
}
} else {
log_it(L_ERROR, "Can't add token emission datum of %llu %s ( 0x%s )",
log_it(L_ERROR, "Can't add token emission datum of %llu %s ( %s )",
a_token_emission->hdr.value, c_token_ticker, l_hash_str);
ret = -1;
}
......
......@@ -831,7 +831,7 @@ int dap_chain_node_cli_init(dap_config_t * g_config)
"\t -flags_set <value>:\t Set list of flags from <value> to token declaration\n"
"\t -flags_unset <value>:\t Unset list of flags from <value> from token declaration\n"
"\t -total_supply <value>:\t Set total supply - emission's maximum - to the <value>\n"
"\t -signs_valid <value>:\t Set valid signatures count's minimum\n"
"\t -total_signs_valid <value>:\t Set valid signatures count's minimum\n"
"\t -signs_add <value>:\t Add signature's pkey fingerprint to the list of owners\n"
"\t -signs_remove <value>:\t Remove signature's pkey fingerprint from the owners\n"
"\nDatum type allowed/blocked updates:\n"
......@@ -895,7 +895,7 @@ int dap_chain_node_cli_init(dap_config_t * g_config)
"General:\n"
"\t -flags <value>:\t List of flags from <value> to token declaration\n"
"\t -total_supply <value>:\t Set total supply - emission's maximum - to the <value>\n"
"\t -signs_valid <value>:\t Set valid signatures count's minimum\n"
"\t -total_signs_valid <value>:\t Set valid signatures count's minimum\n"
"\t -signs <value>:\t Signature's fingerprint list\n"
"\nDatum type allowed/blocked:\n"
"\t -datum_type_allowed <value>:\t Set allowed datum type(s)\n"
......
......@@ -244,6 +244,41 @@ void dap_chain_cs_dag_delete(dap_chain_t * a_chain)
DAP_DELETE(l_dag->_pvt);
}
int dap_chain_add_to_ledger(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger, dap_chain_cs_dag_event_item_t * a_event_item){
dap_chain_datum_t *l_datum = (dap_chain_datum_t*) dap_chain_cs_dag_event_get_datum(a_event_item->event);
switch (l_datum->header.type_id) {
case DAP_CHAIN_DATUM_TOKEN_DECL: {
dap_chain_datum_token_t *l_token = (dap_chain_datum_token_t*) l_datum->data;
dap_chain_ledger_token_add(a_ledger, l_token, l_datum->header.data_size);
}
break;
case DAP_CHAIN_DATUM_TOKEN_EMISSION: {
dap_chain_datum_token_emission_t *l_token_emission = (dap_chain_datum_token_emission_t*) l_datum->data;
dap_chain_ledger_token_emission_add(a_ledger, l_token_emission, l_datum->header.data_size);
}
break;
case DAP_CHAIN_DATUM_TX: {
dap_chain_datum_tx_t *l_tx = (dap_chain_datum_tx_t*) l_datum->data;
dap_chain_cs_dag_event_item_t * l_tx_event= DAP_NEW_Z(dap_chain_cs_dag_event_item_t);
l_tx_event->ts_added = a_event_item->ts_added;
l_tx_event->event = a_event_item->event;
memcpy(&l_tx_event->hash, &a_event_item->hash, sizeof (l_tx_event->hash) );
HASH_ADD(hh,PVT(a_dag)->tx_events,hash,sizeof (l_tx_event->hash),l_tx_event);
// don't save bad transactions to base
if(dap_chain_ledger_tx_add(a_ledger, l_tx) != 1) {
return -1;
}
}
break;
default:
return -1;
}
return 0;
}
/**
* @brief s_chain_callback_atom_add Accept new event in dag
* @param a_chain DAG object
......@@ -252,20 +287,18 @@ void dap_chain_cs_dag_delete(dap_chain_t * a_chain)
*/
static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t a_atom)
{
bool l_add_to_threshold = false;
int ret = s_chain_callback_atom_verify (a_chain, a_atom);
if ( ret < 0 ){
log_it(L_WARNING,"Wrong event, can't accept, verification returned %d",ret);
return -1;
}else if( ret > 0){
l_add_to_threshold = true;
}
dap_chain_cs_dag_t * l_dag = DAP_CHAIN_CS_DAG(a_chain);
dap_chain_cs_dag_event_t * l_event = (dap_chain_cs_dag_event_t *) a_atom;
// verification was already in s_chain_callback_atom_verify()
int ret_cs = l_dag->callback_cs_verify(l_dag,l_event);
if ( ret_cs != 0 ){
log_it(L_WARNING,"Consensus can't accept the event, verification returned %d",ret_cs);
return -2;
}
dap_chain_cs_dag_event_item_t * l_event_item = DAP_NEW_Z(dap_chain_cs_dag_event_item_t);
l_event_item->event = l_event;
l_event_item->ts_added = time(NULL);
......@@ -273,7 +306,7 @@ static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t
// Put in main table or in the treshhold if not all the rest linked event are present
dap_chain_cs_dag_event_item_t * l_event_search = NULL;
dap_chain_cs_dag_event_item_t * l_events =( (ret==0 && ret_cs == 0)? PVT(l_dag)->events : PVT(l_dag)->events_treshold );
dap_chain_cs_dag_event_item_t * l_events =( l_add_to_threshold )? PVT(l_dag)->events_treshold : PVT(l_dag)->events ;
pthread_rwlock_t * l_events_rwlock = &PVT(l_dag)->events_rwlock ;
pthread_rwlock_wrlock( l_events_rwlock );
HASH_FIND(hh, l_events,&l_event_item->hash,sizeof (l_event_search->hash), l_event_search);
......@@ -287,10 +320,11 @@ static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t
}
HASH_ADD(hh, l_events,hash,sizeof (l_event_item->hash), l_event_item);
// save l_events to dag_pvt
if(ret==0 && ret_cs == 0)
PVT(l_dag)->events = l_events;
else
if(l_add_to_threshold)
PVT(l_dag)->events_treshold = l_events;
else
PVT(l_dag)->events = l_events;
//HASH_ADD(hh, PVT(l_dag)->events_treshold, hash, sizeof(l_event_item->hash), l_event_item);
pthread_rwlock_unlock( l_events_rwlock );
if ( l_events == PVT(l_dag)->events){
......@@ -320,47 +354,23 @@ static int s_chain_callback_atom_add(dap_chain_t * a_chain, dap_chain_atom_ptr_t
pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock);
}
// add datum from event to ledger
dap_chain_datum_t *l_datum = (dap_chain_datum_t*) dap_chain_cs_dag_event_get_datum(l_event);
switch (l_datum->header.type_id) {
case DAP_CHAIN_DATUM_TOKEN_DECL: {
dap_chain_datum_token_t *l_token = (dap_chain_datum_token_t*) l_datum->data;
dap_chain_ledger_token_add(a_chain->ledger, l_token, l_datum->header.data_size);
}
break;
case DAP_CHAIN_DATUM_TOKEN_EMISSION: {
dap_chain_datum_token_emission_t *l_token_emission = (dap_chain_datum_token_emission_t*) l_datum->data;
dap_chain_ledger_token_emission_add(a_chain->ledger, l_token_emission, l_datum->header.data_size);
}
break;
case DAP_CHAIN_DATUM_TX: {
dap_chain_datum_tx_t *l_tx = (dap_chain_datum_tx_t*) l_datum->data;
dap_chain_cs_dag_event_item_t * l_tx_event= DAP_NEW_Z(dap_chain_cs_dag_event_item_t);
l_tx_event->ts_added = l_event_item->ts_added;
l_tx_event->event = l_event;
memcpy(&l_tx_event->hash, &l_event_item->hash, sizeof (l_tx_event->hash) );
pthread_rwlock_wrlock(&PVT(l_dag)->events_rwlock);
HASH_ADD(hh,PVT(l_dag)->tx_events,hash,sizeof (l_tx_event->hash),l_tx_event);
pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock);
//if ( !l_gdb_priv->is_load_mode ) // If its not load module but mempool proc
// l_tx->header.ts_created = time(NULL);
//if(dap_chain_datum_tx_get_size(l_tx) == l_datum->header.data_size){
if(!l_add_to_threshold){
int ret_cs = l_dag->callback_cs_verify(l_dag,l_event);
if ( ret_cs != 0 ){
log_it(L_WARNING,"Consensus can't accept the event, verification returned %d",ret_cs);
return -2;
}
pthread_rwlock_wrlock(&PVT(l_dag)->events_rwlock);
int res_ledger = dap_chain_add_to_ledger(l_dag, a_chain->ledger, l_event_item);
pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock);
// don't save bad transactions to base
if(dap_chain_ledger_tx_add(a_chain->ledger, l_tx) != 1) {
return -1;
}
//}else
// return -2;
}
break;
default:
return -1;
if(res_ledger < 0)
return res_ledger;
}
// Now check the treshold if some events now are ready to move to the main table
pthread_rwlock_wrlock(&PVT(l_dag)->events_rwlock);
while(dap_chain_cs_dag_proc_treshold(l_dag));
while(dap_chain_cs_dag_proc_treshold(l_dag, a_chain->ledger));
pthread_rwlock_unlock(&PVT(l_dag)->events_rwlock);
return 0;
......@@ -697,7 +707,7 @@ int dap_chain_cs_dag_event_verify_hashes_with_treshold(dap_chain_cs_dag_t * a_da
* @param a_dag
* @returns true if some atoms were moved from threshold to events
*/
bool dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag)
bool dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger)
{
bool res = false;
// TODO Process finish treshold. For now - easiest from possible
......@@ -710,6 +720,7 @@ bool dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag)
if(ret == DAP_THRESHOLD_OK){
HASH_ADD(hh, PVT(a_dag)->events, hash,sizeof (l_event_item->hash), l_event_item);
int l_ledger_ret = dap_chain_add_to_ledger(a_dag, a_ledger, l_event_item);
res = true;
}else if(ret == DAP_THRESHOLD_CONFLICTING)
HASH_ADD(hh, PVT(a_dag)->events_treshold_conflicted, hash,sizeof (l_event_item->hash), l_event_item);
......
......@@ -65,7 +65,7 @@ void dap_chain_cs_dag_deinit(void);
int dap_chain_cs_dag_new(dap_chain_t * a_chain, dap_config_t * a_chain_cfg);
void dap_chain_cs_dag_delete(dap_chain_t * a_chain);
bool dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag);
bool dap_chain_cs_dag_proc_treshold(dap_chain_cs_dag_t * a_dag, dap_ledger_t * a_ledger);
void dap_chain_cs_dag_proc_event_round_new(dap_chain_cs_dag_t *a_dag);
dap_chain_cs_dag_event_t* dap_chain_cs_dag_find_event_by_hash(dap_chain_cs_dag_t * a_dag,
......