Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (6)
...@@ -2,7 +2,7 @@ project(cellframe-sdk C) ...@@ -2,7 +2,7 @@ project(cellframe-sdk C)
cmake_minimum_required(VERSION 3.0) cmake_minimum_required(VERSION 3.0)
set(CMAKE_C_STANDARD 11) set(CMAKE_C_STANDARD 11)
set(CELLFRAME_SDK_NATIVE_VERSION "2.9-5") set(CELLFRAME_SDK_NATIVE_VERSION "2.9-6")
add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"") add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"")
set(DAPSDK_MODULES "") set(DAPSDK_MODULES "")
message("Cellframe modules: ${CELLFRAME_MODULES}") message("Cellframe modules: ${CELLFRAME_MODULES}")
......
...@@ -1305,6 +1305,8 @@ static void s_stream_es_callback_write(dap_events_socket_t * a_es, void * arg) ...@@ -1305,6 +1305,8 @@ static void s_stream_es_callback_write(dap_events_socket_t * a_es, void * arg)
// Response received after client_pvt was deleted // Response received after client_pvt was deleted
return; return;
} }
if (l_client_pvt->stage_status == STAGE_STATUS_ERROR)
return;
switch (l_client_pvt->stage) { switch (l_client_pvt->stage) {
case STAGE_STREAM_STREAMING: { case STAGE_STREAM_STREAMING: {
size_t i; size_t i;
......
...@@ -217,7 +217,6 @@ int dap_events_init( uint32_t a_threads_count, size_t a_conn_timeout ) ...@@ -217,7 +217,6 @@ int dap_events_init( uint32_t a_threads_count, size_t a_conn_timeout )
err: err:
log_it(L_ERROR,"Deinit events subsystem"); log_it(L_ERROR,"Deinit events subsystem");
dap_events_deinit(); dap_events_deinit();
dap_worker_deinit();
return -1; return -1;
} }
...@@ -226,8 +225,11 @@ err: ...@@ -226,8 +225,11 @@ err:
*/ */
void dap_events_deinit( ) void dap_events_deinit( )
{ {
dap_proc_thread_deinit();
dap_events_socket_deinit(); dap_events_socket_deinit();
dap_worker_deinit(); dap_worker_deinit();
dap_events_wait(s_events_default);
if ( s_threads ) if ( s_threads )
DAP_DELETE( s_threads ); DAP_DELETE( s_threads );
...@@ -379,7 +381,7 @@ int dap_events_wait( dap_events_t *a_events ) ...@@ -379,7 +381,7 @@ int dap_events_wait( dap_events_t *a_events )
void dap_events_stop_all( ) void dap_events_stop_all( )
{ {
for( uint32_t i = 0; i < s_threads_count; i ++ ) { for( uint32_t i = 0; i < s_threads_count; i ++ ) {
dap_events_socket_event_signal( s_workers[i]->event_exit, 0); dap_events_socket_event_signal( s_workers[i]->event_exit, 1);
} }
// TODO implement signal to stop the workers // TODO implement signal to stop the workers
} }
......
...@@ -62,6 +62,7 @@ static size_t s_threads_count = 0; ...@@ -62,6 +62,7 @@ static size_t s_threads_count = 0;
static bool s_debug_reactor = false; static bool s_debug_reactor = false;
static dap_proc_thread_t * s_threads = NULL; static dap_proc_thread_t * s_threads = NULL;
static void * s_proc_thread_function(void * a_arg); static void * s_proc_thread_function(void * a_arg);
static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags);
/** /**
* @brief dap_proc_thread_init * @brief dap_proc_thread_init
...@@ -97,14 +98,19 @@ int dap_proc_thread_init(uint32_t a_threads_count){ ...@@ -97,14 +98,19 @@ int dap_proc_thread_init(uint32_t a_threads_count){
*/ */
void dap_proc_thread_deinit() void dap_proc_thread_deinit()
{ {
// Signal to cancel working threads and wait for finish for (uint32_t i = 0; i < s_threads_count; i++){
// TODO: Android realization dap_events_socket_event_signal(s_threads[i].event_exit, 1);
#ifndef DAP_OS_ANDROID
for (size_t i = 0; i < s_threads_count; i++ ){
pthread_cancel(s_threads[i].thread_id);
pthread_join(s_threads[i].thread_id, NULL); pthread_join(s_threads[i].thread_id, NULL);
} }
#endif
// Signal to cancel working threads and wait for finish
// TODO: Android realization
//#ifndef DAP_OS_ANDROID
// for (size_t i = 0; i < s_threads_count; i++ ){
// pthread_cancel(s_threads[i].thread_id);
// pthread_join(s_threads[i].thread_id, NULL);
// }
//#endif
} }
...@@ -145,13 +151,15 @@ dap_proc_thread_t * dap_proc_thread_get_auto() ...@@ -145,13 +151,15 @@ dap_proc_thread_t * dap_proc_thread_get_auto()
static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_value) static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_value)
{ {
(void) a_value; (void) a_value;
// log_it(L_DEBUG, "--> Proc event callback start"); if(s_debug_reactor)
log_it(L_DEBUG, "--> Proc event callback start");
dap_proc_thread_t * l_thread = (dap_proc_thread_t *) a_esocket->_inheritor; dap_proc_thread_t * l_thread = (dap_proc_thread_t *) a_esocket->_inheritor;
dap_proc_queue_item_t * l_item = l_thread->proc_queue->item_first; dap_proc_queue_item_t * l_item = l_thread->proc_queue->item_first;
dap_proc_queue_item_t * l_item_old = NULL; dap_proc_queue_item_t * l_item_old = NULL;
bool l_is_anybody_for_repeat=false; bool l_is_anybody_for_repeat=false;
while(l_item){ while(l_item){
// log_it(L_INFO, "Proc event callback: %p/%p", l_item->callback, l_item->callback_arg); if(s_debug_reactor)
log_it(L_INFO, "Proc event callback: %p/%p", l_item->callback, l_item->callback_arg);
bool l_is_finished = l_item->callback(l_thread, l_item->callback_arg); bool l_is_finished = l_item->callback(l_thread, l_item->callback_arg);
if (l_is_finished){ if (l_is_finished){
if ( l_item->prev ){ if ( l_item->prev ){
...@@ -176,9 +184,11 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_va ...@@ -176,9 +184,11 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_va
DAP_DELETE(l_item); DAP_DELETE(l_item);
l_item = l_thread->proc_queue->item_first; l_item = l_thread->proc_queue->item_first;
} }
// log_it(L_DEBUG, "Proc event finished"); if(s_debug_reactor)
log_it(L_DEBUG, "Proc event finished");
}else{ }else{
// log_it(L_DEBUG, "Proc event not finished"); if(s_debug_reactor)
log_it(L_DEBUG, "Proc event not finished");
l_item_old = l_item; l_item_old = l_item;
l_item=l_item->prev; l_item=l_item->prev;
} }
...@@ -186,7 +196,8 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_va ...@@ -186,7 +196,8 @@ static void s_proc_event_callback(dap_events_socket_t * a_esocket, uint64_t a_va
} }
if(l_is_anybody_for_repeat) // Arm event if we have smth to proc again if(l_is_anybody_for_repeat) // Arm event if we have smth to proc again
dap_events_socket_event_signal(a_esocket,1); dap_events_socket_event_signal(a_esocket,1);
// log_it(L_DEBUG, "<-- Proc event callback end"); if(s_debug_reactor)
log_it(L_DEBUG, "<-- Proc event callback end");
} }
...@@ -393,7 +404,11 @@ static void * s_proc_thread_function(void * a_arg) ...@@ -393,7 +404,11 @@ static void * s_proc_thread_function(void * a_arg)
dap_events_socket_assign_on_worker_mt(l_worker_related->proc_queue_input,l_worker_related); dap_events_socket_assign_on_worker_mt(l_worker_related->proc_queue_input,l_worker_related);
l_thread->proc_event = dap_events_socket_create_type_event_unsafe(NULL, s_proc_event_callback); l_thread->proc_event = dap_events_socket_create_type_event_unsafe(NULL, s_proc_event_callback);
l_thread->event_exit = dap_events_socket_create_type_event_unsafe(NULL, s_event_exit_callback);
l_thread->proc_event->_inheritor = l_thread; // we pass thread through it l_thread->proc_event->_inheritor = l_thread; // we pass thread through it
l_thread->event_exit->_inheritor = l_thread;
size_t l_workers_count= dap_events_worker_get_count(); size_t l_workers_count= dap_events_worker_get_count();
assert(l_workers_count); assert(l_workers_count);
l_thread->queue_assign_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)*l_workers_count ); l_thread->queue_assign_input = DAP_NEW_Z_SIZE(dap_events_socket_t*, sizeof (dap_events_socket_t*)*l_workers_count );
...@@ -435,6 +450,18 @@ static void * s_proc_thread_function(void * a_arg) ...@@ -435,6 +450,18 @@ static void * s_proc_thread_function(void * a_arg)
log_it(L_CRITICAL, "Can't add proc event on epoll ctl, err: %d", errno); log_it(L_CRITICAL, "Can't add proc event on epoll ctl, err: %d", errno);
return NULL; return NULL;
} }
// Add exit event
l_thread->event_exit->ev.events = l_thread->event_exit->ev_base_flags;
l_thread->event_exit->ev.data.ptr = l_thread->event_exit;
if( epoll_ctl(l_thread->epoll_ctl, EPOLL_CTL_ADD, l_thread->event_exit->socket , &l_thread->event_exit->ev) != 0 ){
#ifdef DAP_OS_WINDOWS
errno = WSAGetLastError();
#endif
log_it(L_CRITICAL, "Can't add exit event on epoll ctl, err: %d", errno);
return NULL;
}
for (size_t n = 0; n< dap_events_worker_get_count(); n++){ for (size_t n = 0; n< dap_events_worker_get_count(); n++){
// Queue asssign // Queue asssign
...@@ -485,6 +512,13 @@ static void * s_proc_thread_function(void * a_arg) ...@@ -485,6 +512,13 @@ static void * s_proc_thread_function(void * a_arg)
l_thread->poll[l_thread->poll_count].events = l_thread->proc_event->poll_base_flags; l_thread->poll[l_thread->poll_count].events = l_thread->proc_event->poll_base_flags;
l_thread->esockets[l_thread->poll_count] = l_thread->proc_event; l_thread->esockets[l_thread->poll_count] = l_thread->proc_event;
l_thread->poll_count++; l_thread->poll_count++;
// Add exit event
l_thread->poll[l_thread->poll_count].fd = l_thread->event_exit->fd;
l_thread->poll[l_thread->poll_count].events = l_thread->event_exit->poll_base_flags;
l_thread->esockets[l_thread->poll_count] = l_thread->event_exit;
l_thread->poll_count++;
for (size_t n = 0; n< dap_events_worker_get_count(); n++){ for (size_t n = 0; n< dap_events_worker_get_count(); n++){
dap_events_socket_t * l_queue_assign_input = l_thread->queue_assign_input[n]; dap_events_socket_t * l_queue_assign_input = l_thread->queue_assign_input[n];
...@@ -522,6 +556,7 @@ static void * s_proc_thread_function(void * a_arg) ...@@ -522,6 +556,7 @@ static void * s_proc_thread_function(void * a_arg)
dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->proc_queue->esocket); dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->proc_queue->esocket);
dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->proc_event); dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->proc_event);
dap_proc_thread_assign_esocket_unsafe(l_thread,l_thread->event_exit);
for (size_t n = 0; n< dap_events_worker_get_count(); n++){ for (size_t n = 0; n< dap_events_worker_get_count(); n++){
// Queue asssign // Queue asssign
...@@ -542,8 +577,11 @@ static void * s_proc_thread_function(void * a_arg) ...@@ -542,8 +577,11 @@ static void * s_proc_thread_function(void * a_arg)
pthread_mutex_lock(&l_thread->started_mutex); pthread_mutex_lock(&l_thread->started_mutex);
pthread_mutex_unlock(&l_thread->started_mutex); pthread_mutex_unlock(&l_thread->started_mutex);
pthread_cond_broadcast(&l_thread->started_cond); pthread_cond_broadcast(&l_thread->started_cond);
l_thread->signal_exit = false;
// Main loop // Main loop
while (! l_thread->signal_kill){ while (!l_thread->signal_kill && !l_thread->signal_exit){
int l_selected_sockets; int l_selected_sockets;
size_t l_sockets_max; size_t l_sockets_max;
...@@ -874,7 +912,8 @@ static void * s_proc_thread_function(void * a_arg) ...@@ -874,7 +912,8 @@ static void * s_proc_thread_function(void * a_arg)
} }
#endif #endif
} }
log_it(L_NOTICE, "Stop processing thread #%u", l_thread->cpu_id); log_it(L_ATT, "Stop processing thread #%u", l_thread->cpu_id);
fflush(stdout);
// cleanip inputs // cleanip inputs
for (size_t n=0; n<dap_events_worker_get_count(); n++){ for (size_t n=0; n<dap_events_worker_get_count(); n++){
...@@ -986,3 +1025,13 @@ void dap_proc_thread_worker_exec_callback(dap_proc_thread_t * a_thread, size_t a ...@@ -986,3 +1025,13 @@ void dap_proc_thread_worker_exec_callback(dap_proc_thread_t * a_thread, size_t a
dap_proc_thread_esocket_update_poll_flags(a_thread, a_thread->queue_callback_input[a_worker_id]); dap_proc_thread_esocket_update_poll_flags(a_thread, a_thread->queue_callback_input[a_worker_id]);
#endif #endif
} }
static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags)
{
(void) a_flags;
dap_proc_thread_t * l_thread = (dap_proc_thread_t *) a_es->_inheritor;
l_thread->signal_exit = true;
if(s_debug_reactor)
log_it(L_DEBUG, "Proc_thread :%u signaled to exit", l_thread->cpu_id);
}
...@@ -998,7 +998,8 @@ static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags) ...@@ -998,7 +998,8 @@ static void s_event_exit_callback( dap_events_socket_t * a_es, uint64_t a_flags)
{ {
(void) a_flags; (void) a_flags;
a_es->worker->signal_exit = true; a_es->worker->signal_exit = true;
log_it(L_NOTICE, "Worker :%u signaled to exit", a_es->worker->id); if(s_debug_reactor)
log_it(L_DEBUG, "Worker :%u signaled to exit", a_es->worker->id);
} }
/** /**
......
...@@ -43,6 +43,10 @@ typedef struct dap_proc_thread{ ...@@ -43,6 +43,10 @@ typedef struct dap_proc_thread{
pthread_mutex_t started_mutex; pthread_mutex_t started_mutex;
bool signal_kill; bool signal_kill;
bool signal_exit;
dap_events_socket_t * event_exit;
#ifdef DAP_EVENTS_CAPS_EPOLL #ifdef DAP_EVENTS_CAPS_EPOLL
EPOLL_HANDLE epoll_ctl; EPOLL_HANDLE epoll_ctl;
......
...@@ -92,6 +92,7 @@ void _dap_chain_datum_tx_out_data(dap_chain_datum_tx_t *a_datum, ...@@ -92,6 +92,7 @@ void _dap_chain_datum_tx_out_data(dap_chain_datum_tx_t *a_datum,
char *l_tx_hash_user_str; char *l_tx_hash_user_str;
char l_tx_hash_str[70]; char l_tx_hash_str[70];
dap_chain_hash_fast_to_str(&l_tx_hash, l_tx_hash_str, 70); dap_chain_hash_fast_to_str(&l_tx_hash, l_tx_hash_str, 70);
time_t l_ts_create = (time_t)a_datum->header.ts_created;
if(!dap_strcmp(a_hash_out_type, "hex")) if(!dap_strcmp(a_hash_out_type, "hex"))
l_tx_hash_user_str = dap_strdup(l_tx_hash_str); l_tx_hash_user_str = dap_strdup(l_tx_hash_str);
else else
...@@ -100,8 +101,9 @@ void _dap_chain_datum_tx_out_data(dap_chain_datum_tx_t *a_datum, ...@@ -100,8 +101,9 @@ void _dap_chain_datum_tx_out_data(dap_chain_datum_tx_t *a_datum,
if(a_ledger == NULL){ if(a_ledger == NULL){
dap_string_append_printf(a_str_out, "transaction: %s hash: %s\n Items:\n", l_list_tx_any ? "(emit)" : "", l_tx_hash_user_str); dap_string_append_printf(a_str_out, "transaction: %s hash: %s\n Items:\n", l_list_tx_any ? "(emit)" : "", l_tx_hash_user_str);
} else { } else {
dap_string_append_printf(a_str_out, "transaction: %s hash: %s\n Token ticker: %s\n Items:\n", char buf[50];
l_list_tx_any ? "(emit)" : "", l_tx_hash_user_str, dap_string_append_printf(a_str_out, "transaction: %s hash: %s\n TS Created: %s Token ticker: %s\n Items:\n",
l_list_tx_any ? "(emit)" : "", l_tx_hash_user_str, dap_ctime_r(&l_ts_create, buf),
dap_chain_ledger_tx_get_token_ticker_by_hash(a_ledger, &l_tx_hash)); dap_chain_ledger_tx_get_token_ticker_by_hash(a_ledger, &l_tx_hash));
} }
DAP_DELETE(l_tx_hash_user_str); DAP_DELETE(l_tx_hash_user_str);
......
...@@ -147,6 +147,7 @@ static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg) ...@@ -147,6 +147,7 @@ static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg)
SetEvent( l_node_client->wait_cond ); SetEvent( l_node_client->wait_cond );
#endif #endif
pthread_mutex_unlock(&l_node_client->wait_mutex); pthread_mutex_unlock(&l_node_client->wait_mutex);
l_node_client->own_esh.esocket = 0;
dap_timerfd_start_on_worker(dap_events_worker_get_auto(),s_timer_update_states*1000,s_timer_update_states_callback, l_node_client); dap_timerfd_start_on_worker(dap_events_worker_get_auto(),s_timer_update_states*1000,s_timer_update_states_callback, l_node_client);
return; return;
} }
...@@ -189,8 +190,8 @@ static bool s_timer_update_states_callback(void * a_arg ) ...@@ -189,8 +190,8 @@ static bool s_timer_update_states_callback(void * a_arg )
dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default()); dap_worker_t * l_worker = dap_events_get_current_worker(dap_events_get_default());
assert(l_worker); assert(l_worker);
assert(l_me); assert(l_me);
dap_events_socket_t * l_es = l_me->own_esh->esocket; dap_events_socket_t * l_es = l_me->own_esh.esocket;
uint128_t l_es_uuid = l_me->own_esh->uuid; uint128_t l_es_uuid = l_me->own_esh.uuid;
// check if esocket still in worker // check if esocket still in worker
if(dap_events_socket_check_unsafe(l_worker,l_es)){ if(dap_events_socket_check_unsafe(l_worker,l_es)){
// Check if its exactly ours! // Check if its exactly ours!
...@@ -230,7 +231,6 @@ static bool s_timer_update_states_callback(void * a_arg ) ...@@ -230,7 +231,6 @@ static bool s_timer_update_states_callback(void * a_arg )
log_it(L_INFO, "Reconnecting node client with peer "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_me->remote_node_addr)); log_it(L_INFO, "Reconnecting node client with peer "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_me->remote_node_addr));
dap_chain_node_client_connect_internal(l_me, "CN"); // isn't always CN here? dap_chain_node_client_connect_internal(l_me, "CN"); // isn't always CN here?
} }
DAP_DELETE(l_me->own_esh);
return false; return false;
} }
...@@ -265,10 +265,8 @@ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg) ...@@ -265,10 +265,8 @@ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg)
dap_stream_t * l_stream = dap_client_get_stream(a_client); dap_stream_t * l_stream = dap_client_get_stream(a_client);
if (l_stream) { if (l_stream) {
dap_events_socket_handler_t * l_es_handler = DAP_NEW_Z(dap_events_socket_handler_t); l_node_client->own_esh.esocket = l_stream->esocket;
l_es_handler->esocket = l_stream->esocket; l_node_client->own_esh.uuid = l_stream->esocket->uuid;
l_es_handler->uuid = l_stream->esocket->uuid;
l_node_client->own_esh = l_es_handler;
dap_timerfd_start_on_worker(l_stream->esocket->worker,s_timer_update_states*1000,s_timer_update_states_callback, l_node_client); dap_timerfd_start_on_worker(l_stream->esocket->worker,s_timer_update_states*1000,s_timer_update_states_callback, l_node_client);
} }
#ifndef _WIN32 #ifndef _WIN32
......
...@@ -94,7 +94,7 @@ typedef struct dap_chain_node_client { ...@@ -94,7 +94,7 @@ typedef struct dap_chain_node_client {
// Timer // Timer
dap_events_socket_t * timer_update_states; dap_events_socket_t * timer_update_states;
dap_events_socket_handler_t *own_esh; dap_events_socket_handler_t own_esh;
#ifndef _WIN32 #ifndef _WIN32
......