Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (4)
Showing
with 336 additions and 251 deletions
......@@ -187,6 +187,46 @@ void dap_client_set_auth_cert_unsafe(dap_client_t * a_client, dap_cert_t *a_cert
DAP_CLIENT_PVT(a_client)->auth_cert = a_cert;
}
/**
* @brief dap_client_set_auth_cert
* @param a_client
* @param a_chain_net_name
* @param a_option
*/
void dap_client_set_auth_cert(dap_client_t *a_client, const char *a_chain_net_name)
{
const char *l_auth_hash_str = NULL;
if(a_client == NULL || a_chain_net_name == NULL){
log_it(L_ERROR,"Chain-net is NULL for dap_client_set_auth_cert");
return;
}
char *l_path = dap_strdup_printf("network/%s", a_chain_net_name);
if (!l_path) {
log_it(L_ERROR, "Can't allocate memory: file: %s line: %d", __FILE__, __LINE__);
return;
}
dap_config_t *l_cfg = dap_config_open(l_path);
free(l_path);
if (!l_cfg) {
log_it(L_ERROR, "Can't allocate memory: file: %s line: %d", __FILE__, __LINE__);
return;
}
dap_cert_t *l_cert = dap_cert_find_by_name(dap_config_get_item_str(l_cfg, "general", "auth_cert"));
if (!l_cert) {
dap_config_close(l_cfg);
log_it(L_ERROR,"l_cert is NULL by dap_cert_find_by_name");
return;
}
dap_client_set_auth_cert_unsafe(a_client, l_cert);
//dap_cert_delete(l_cert);
dap_config_close(l_cfg);
}
/**
* @brief s_client_delete
* @param a_client
......@@ -232,7 +272,7 @@ static void s_go_stage_on_client_worker_unsafe(dap_worker_t * a_worker,void * a_
dap_client_stage_t l_stage_target = ((struct go_stage_arg*) a_arg)->stage_target;
dap_client_callback_t l_stage_end_callback= ((struct go_stage_arg*) a_arg)->stage_end_callback;
dap_client_pvt_t * l_client_pvt = ((struct go_stage_arg*) a_arg)->client_pvt;
dap_client_t * l_client = ((struct go_stage_arg*) a_arg)->client_pvt->client;
dap_client_t *l_client = l_client_pvt->client;
bool l_flag_delete_after = ((struct go_stage_arg *) a_arg)->flag_delete_after ;// Delete after stage achievement
DAP_DELETE(a_arg);
......@@ -321,7 +361,10 @@ void dap_client_go_stage(dap_client_t * a_client, dap_client_stage_t a_stage_tar
}
dap_client_pvt_t * l_client_pvt = dap_client_pvt_find(a_client->pvt_uuid);
assert(l_client_pvt);
if (NULL == l_client_pvt) {
log_it(L_ERROR, "dap_client_go_stage, client_pvt == NULL");
return;
}
struct go_stage_arg *l_stage_arg = DAP_NEW_Z(struct go_stage_arg); if (! l_stage_arg) return;
l_stage_arg->stage_end_callback = a_stage_end_callback;
......
......@@ -159,50 +159,6 @@ void dap_client_pvt_new(dap_client_pvt_t * a_client_pvt)
dap_client_pvt_hh_add_unsafe(a_client_pvt);
}
/**
* @brief dap_client_set_auth_cert
* @param a_client
* @param a_chain_net_name
* @param a_option
*/
void dap_client_set_auth_cert(dap_client_t *a_client, const char *a_chain_net_name)
{
const char *l_auth_hash_str = NULL;
if(a_client == NULL || a_chain_net_name == NULL){
log_it(L_ERROR,"Chain-net is NULL for dap_client_set_auth_cert");
return;
}
char *l_path = dap_strdup_printf("network/%s", a_chain_net_name);
if (!l_path) {
log_it(L_ERROR, "Can't allocate memory: file: %s line: %d", __FILE__, __LINE__);
return;
}
dap_config_t *l_cfg = dap_config_open(l_path);
free(l_path);
if (!l_cfg) {
log_it(L_ERROR, "Can't allocate memory: file: %s line: %d", __FILE__, __LINE__);
return;
}
dap_cert_t *l_cert = dap_cert_find_by_name(dap_config_get_item_str(l_cfg, "general", "auth_cert"));
if (!l_cert) {
dap_config_close(l_cfg);
log_it(L_ERROR,"l_cert is NULL by dap_cert_find_by_name");
return;
}
dap_client_set_auth_cert_unsafe(a_client, l_cert);
//dap_cert_delete(l_cert);
dap_config_close(l_cfg);
}
/**
* @brief dap_client_pvt_delete_unsafe
* @param a_client_pvt
......@@ -980,7 +936,7 @@ static void s_request_response(void * a_response, size_t a_response_size, void *
static void s_enc_init_response(dap_client_t * a_client, void * a_response, size_t a_response_size)
{
dap_client_pvt_t * l_client_pvt = dap_client_pvt_find(a_client->pvt_uuid);
if (!l_client_pvt) return;
if (!l_client_pvt || l_client_pvt->is_to_delete) return;
if (!l_client_pvt->session_key_open){
log_it(L_ERROR, "m_enc_init_response: session is NULL!");
......
......@@ -139,6 +139,7 @@ dap_stream_ch_t * dap_client_get_stream_ch_unsafe(dap_client_t * a_client, uint8
const char * dap_client_get_stream_id(dap_client_t * a_client);
void dap_client_set_active_channels_unsafe (dap_client_t * a_client, const char * a_active_channels);
void dap_client_set_auth_cert_unsafe(dap_client_t * a_client, dap_cert_t *a_cert);
void dap_client_set_auth_cert(dap_client_t *a_client, const char *a_chain_net_name);
dap_client_stage_t dap_client_get_stage(dap_client_t * a_client);
dap_client_stage_status_t dap_client_get_stage_status(dap_client_t * a_client);
......
......@@ -108,6 +108,7 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t
dap_timerfd_t* l_timerfd = dap_timerfd_create( a_timeout_ms, a_callback, a_callback_arg);
if(l_timerfd){
dap_worker_add_events_socket(l_timerfd->events_socket, a_worker);
l_timerfd->worker = a_worker;
return l_timerfd;
}else{
log_it(L_CRITICAL,"Can't create timer");
......@@ -254,52 +255,72 @@ dap_timerfd_t* dap_timerfd_create(uint64_t a_timeout_ms, dap_timerfd_callback_t
return l_timerfd;
}
/**
* @brief s_es_callback_timer
* @param a_event_sock
*/
static void s_es_callback_timer(struct dap_events_socket *a_event_sock)
static void s_timerfd_reset(dap_timerfd_t *a_timerfd, dap_events_socket_t *a_event_sock)
{
dap_timerfd_t *l_timerfd = a_event_sock->_inheritor;
// run user's callback
if(l_timerfd->callback && l_timerfd->callback(l_timerfd->callback_arg)) {
//printf("\nread() returned %d, %d\n", l_ptiu64, l_read_ret);
#if defined DAP_OS_LINUX
struct itimerspec l_ts;
// repeat never
l_ts.it_interval.tv_sec = 0;
l_ts.it_interval.tv_nsec = 0;
// timeout for timer
l_ts.it_value.tv_sec = l_timerfd->timeout_ms / 1000;
l_ts.it_value.tv_nsec = (l_timerfd->timeout_ms % 1000) * 1000000;
if(timerfd_settime(l_timerfd->tfd, 0, &l_ts, NULL) < 0) {
log_it(L_WARNING, "callback_timerfd_read() failed: timerfd_settime() errno=%d\n", errno);
}
struct itimerspec l_ts;
// repeat never
l_ts.it_interval.tv_sec = 0;
l_ts.it_interval.tv_nsec = 0;
// timeout for timer
l_ts.it_value.tv_sec = a_timerfd->timeout_ms / 1000;
l_ts.it_value.tv_nsec = (a_timerfd->timeout_ms % 1000) * 1000000;
if(timerfd_settime(a_timerfd->tfd, 0, &l_ts, NULL) < 0) {
log_it(L_WARNING, "Reset timerfd failed: timerfd_settime() errno=%d\n", errno);
}
#elif defined (DAP_OS_BSD)
dap_worker_add_events_socket_unsafe(a_event_sock,a_event_sock->worker);
//struct kevent * l_event = &a_event_sock->kqueue_event;
//EV_SET(l_event, 0, a_event_sock->kqueue_base_filter, a_event_sock->kqueue_base_flags,a_event_sock->kqueue_base_fflags,a_event_sock->kqueue_data,a_event_sock);
//kevent(a_event_sock->worker->kqueue_fd,l_event,1,NULL,0,NULL);
dap_worker_add_events_socket_unsafe(a_event_sock,a_event_sock->worker);
//struct kevent * l_event = &a_event_sock->kqueue_event;
//EV_SET(l_event, 0, a_event_sock->kqueue_base_filter, a_event_sock->kqueue_base_flags,a_event_sock->kqueue_base_fflags,a_event_sock->kqueue_data,a_event_sock);
//kevent(a_event_sock->worker->kqueue_fd,l_event,1,NULL,0,NULL);
#elif defined (DAP_OS_WINDOWS)
/*LARGE_INTEGER l_due_time;
l_due_time.QuadPart = (long long)l_timerfd->timeout_ms * _MSEC;
if (!SetWaitableTimer(l_timerfd->th, &l_due_time, 0, TimerAPCb, l_timerfd, false)) {
log_it(L_CRITICAL, "Waitable timer not reset, error %d", GetLastError());
CloseHandle(l_timerfd->th);
}*/ // Wtf is this entire thing for?...
/*LARGE_INTEGER l_due_time;
l_due_time.QuadPart = (long long)a_timerfd->timeout_ms * _MSEC;
if (!SetWaitableTimer(a_timerfd->th, &l_due_time, 0, TimerAPCb, a_timerfd, false)) {
log_it(L_CRITICAL, "Waitable timer not reset, error %d", GetLastError());
CloseHandle(a_timerfd->th);
}*/ // Wtf is this entire thing for?...
#else
#error "No timer callback realization for your platform"
#error "No timer reset realization for your platform"
#endif
#ifndef DAP_OS_BSD
dap_events_socket_set_readable_unsafe(a_event_sock, true);
dap_events_socket_set_readable_unsafe(a_event_sock, true);
#endif
}
/**
* @brief s_es_callback_timer
* @param a_event_sock
*/
static void s_es_callback_timer(struct dap_events_socket *a_event_sock)
{
dap_timerfd_t *l_timerfd = a_event_sock->_inheritor;
// run user's callback
if(l_timerfd->callback && l_timerfd->callback(l_timerfd->callback_arg)) {
s_timerfd_reset(l_timerfd, a_event_sock);
} else {
l_timerfd->events_socket->flags |= DAP_SOCK_SIGNAL_CLOSE;
}
}
/**
* @brief dap_timerfd_reset
* @param a_tfd
*/
void dap_timerfd_reset(dap_timerfd_t *a_timerfd)
{
if (!a_timerfd)
return;
dap_events_socket_t *l_sock = NULL;
if (a_timerfd->worker)
l_sock = dap_worker_esocket_find_uuid(a_timerfd->worker, a_timerfd->esocket_uuid);
else if (a_timerfd->proc_thread)
l_sock = a_timerfd->events_socket;
if (l_sock)
s_timerfd_reset(a_timerfd, l_sock);
}
/**
* @brief dap_timerfd_stop
* @param a_tfd
......
......@@ -53,6 +53,8 @@ typedef struct dap_timerfd {
#elif defined(DAP_OS_LINUX)
int tfd; //timer file descriptor
#endif
dap_worker_t *worker;
dap_proc_thread_t *proc_thread;
dap_events_socket_t *events_socket;
dap_events_socket_uuid_t esocket_uuid;
dap_timerfd_callback_t callback;
......@@ -69,4 +71,4 @@ dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a
dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg);
dap_timerfd_t* dap_timerfd_start_on_proc_thread(dap_proc_thread_t * a_proc_thread, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg);
void dap_timerfd_delete(dap_timerfd_t *l_timerfd);
void dap_timerfd_reset(dap_timerfd_t *a_timerfd);
......@@ -646,9 +646,9 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream)
read_bytes_to=0;
if(a_stream->pkt_buf_in_data_size>=(a_stream->pkt_buf_in->hdr.size + sizeof(dap_stream_pkt_hdr_t)) ){ // If we have all the packet in packet buffer
if(a_stream->pkt_buf_in_data_size > a_stream->pkt_buf_in->hdr.size + sizeof(dap_stream_pkt_hdr_t)){ // If we have little more data then we need for packet buffer
//log_it(L_WARNING,"Prefilled packet buffer has %u bytes more than we need, they're lost",a_stream->pkt_buf_in_data_size-a_stream->pkt_buf_in->hdr.size);
log_it(L_WARNING,"Prefilled packet buffer has %zu bytes more than we need, it's lost",a_stream->pkt_buf_in_data_size-a_stream->pkt_buf_in->hdr.size);
DAP_DEL_Z(a_stream->pkt_buf_in);
a_stream->pkt_buf_in_data_size = 0;
a_stream->pkt_buf_in = NULL;
}
else{
s_stream_proc_pkt_in(a_stream);
......@@ -692,24 +692,24 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream)
found_sig=true;
//dap_stream_pkt_t *temp_pkt = dap_stream_pkt_detect( (uint8_t*)pkt + 1 ,pkt->hdr.size+sizeof(stream_pkt_hdr_t) );
size_t l_pkt_size = pkt->hdr.size + sizeof(dap_stream_pkt_hdr_t);
if(bytes_left_to_read >= sizeof (dap_stream_pkt_t)){
if(bytes_left_to_read <(pkt->hdr.size+sizeof(dap_stream_pkt_t) )){ // Is all the packet in da buf?
if (bytes_left_to_read < l_pkt_size) { // Is all the packet in da buf?
read_bytes_to=bytes_left_to_read;
}else{
read_bytes_to=pkt->hdr.size+sizeof(dap_stream_pkt_t);
read_bytes_to = l_pkt_size;
}
}
//log_it(L_DEBUG, "Detected packet signature pkt->hdr.size=%u read_bytes_to=%u bytes_left_to_read=%u pkt_offset=%u"
// ,pkt->hdr.size, read_bytes_to, bytes_left_to_read,pkt_offset);
if(read_bytes_to > HEADER_WITH_SIZE_FIELD){ // If we have size field, we can allocate memory
a_stream->pkt_buf_in_size_expected =( pkt->hdr.size+sizeof(dap_stream_pkt_hdr_t));
size_t pkt_buf_in_size_expected=a_stream->pkt_buf_in_size_expected;
a_stream->pkt_buf_in=(dap_stream_pkt_t *) malloc(pkt_buf_in_size_expected);
if(read_bytes_to>(pkt->hdr.size+sizeof(dap_stream_pkt_hdr_t) )){
a_stream->pkt_buf_in_size_expected = l_pkt_size;
a_stream->pkt_buf_in = DAP_NEW_SIZE(struct dap_stream_pkt, l_pkt_size);
if (read_bytes_to > l_pkt_size) {
//log_it(L_WARNING,"For some strange reasons we have read_bytes_to=%u is bigger than expected pkt length(%u bytes). Dropped %u bytes",
// pkt->hdr.size+sizeof(stream_pkt_hdr_t),read_bytes_to- pkt->hdr.size+sizeof(stream_pkt_hdr_t));
read_bytes_to=(pkt->hdr.size+sizeof(dap_stream_pkt_hdr_t));
read_bytes_to = l_pkt_size;
}
if(read_bytes_to>bytes_left_to_read){
//log_it(L_WARNING,"For some strange reasons we have read_bytes_to=%u is bigger that's left in input buffer (%u bytes). Dropped %u bytes",
......@@ -720,11 +720,11 @@ size_t dap_stream_data_proc_read (dap_stream_t *a_stream)
proc_data+=(read_bytes_to + pkt_offset);
bytes_left_to_read-=read_bytes_to;
a_stream->pkt_buf_in_data_size=(read_bytes_to);
if(a_stream->pkt_buf_in_data_size==(pkt->hdr.size + sizeof(dap_stream_pkt_hdr_t))){
if(a_stream->pkt_buf_in_data_size==l_pkt_size){
// log_it(INFO,"All the packet is present in da buffer (hdr.size=%u read_bytes_to=%u buf_in_size=%u)"
// ,sid->pkt_buf_in->hdr.size,read_bytes_to,sid->conn->buf_in_size);
s_stream_proc_pkt_in(a_stream);
}else if(a_stream->pkt_buf_in_data_size>pkt->hdr.size + sizeof(dap_stream_pkt_hdr_t)){
}else if(a_stream->pkt_buf_in_data_size>l_pkt_size){
//log_it(L_WARNING,"Input: packet buffer has %u bytes more than we need, they're lost",a_stream->pkt_buf_in_data_size-pkt->hdr.size);
}else{
//log_it(L_DEBUG,"Input: Not all stream packet in input (hdr.size=%u read_bytes_to=%u)",a_stream->pkt_buf_in->hdr.size,read_bytes_to);
......@@ -820,13 +820,8 @@ static void s_stream_proc_pkt_in(dap_stream_t * a_stream)
memcpy(l_ret_pkt.sig, c_dap_stream_sig, sizeof(l_ret_pkt.sig));
dap_events_socket_write_unsafe(a_stream->esocket, &l_ret_pkt, sizeof(l_ret_pkt));
// Reset client keepalive timer
if (a_stream->keepalive_timer && a_stream->keepalive_timer->events_socket->worker) {
void *l_arg = a_stream->keepalive_timer->callback_arg;
dap_timerfd_delete(a_stream->keepalive_timer);
a_stream->keepalive_timer = dap_timerfd_start_on_worker(a_stream->stream_worker->worker,
STREAM_KEEPALIVE_TIMEOUT * 1000,
(dap_timerfd_callback_t)s_callback_keepalive,
l_arg);
if (a_stream->keepalive_timer) {
dap_timerfd_reset(a_stream->keepalive_timer);
}
} break;
case STREAM_PKT_TYPE_ALIVE:
......
......@@ -213,7 +213,7 @@ int dap_chain_cell_load(dap_chain_t * a_chain, const char * a_cell_file_path)
unsigned long l_read = fread(l_element, 1, l_el_size, l_f);
if(l_read == l_el_size) {
dap_chain_atom_verify_res_t l_res = a_chain->callback_atom_add(a_chain, l_element, l_el_size); // !!! blocking GDB call !!!
if (l_res == ATOM_PASS && l_res == ATOM_REJECT) {
if (l_res == ATOM_PASS || l_res == ATOM_REJECT) {
DAP_DELETE(l_element);
}
++q;
......
......@@ -1282,7 +1282,7 @@ int dap_chain_ledger_token_emission_add_check(dap_ledger_t *a_ledger, byte_t *a_
*/
int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, byte_t *a_token_emission, size_t a_token_emission_size)
{
int ret = 0;
int l_ret = 0;
dap_ledger_private_t *l_ledger_priv = PVT(a_ledger);
const char * c_token_ticker = ((dap_chain_datum_token_emission_t *)a_token_emission)->hdr.ticker;
......@@ -1321,6 +1321,7 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, byte_t *a_token_
} else {
HASH_ADD(hh, l_ledger_priv->treshold_emissions, datum_token_emission_hash,
sizeof(l_token_emission_hash), l_token_emission_item);
l_ret = DAP_CHAIN_CS_VERIFY_CODE_TX_NO_TOKEN;
}
pthread_rwlock_unlock( l_token_item ? &l_token_item->token_emissions_rwlock
: &l_ledger_priv->treshold_emissions_rwlock);
......@@ -1354,7 +1355,7 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, byte_t *a_token_
if(s_debug_more)
log_it(L_WARNING,"Treshold for emissions is overfulled (%zu max), dropping down new data, added nothing",
s_treshold_emissions_max);
ret = -2;
l_ret = -2;
}
} else {
if (l_token_item) {
......@@ -1367,10 +1368,10 @@ int dap_chain_ledger_token_emission_add(dap_ledger_t *a_ledger, byte_t *a_token_
((dap_chain_datum_token_emission_t *)a_token_emission)->hdr.value, c_token_ticker, l_hash_str);
}
}
ret = -1;
l_ret = -1;
}
DAP_DELETE(l_hash_str);
return ret;
return l_ret;
}
int dap_chain_ledger_token_emission_load(dap_ledger_t *a_ledger, byte_t *a_token_emission, size_t a_token_emission_size)
......
......@@ -127,7 +127,7 @@ int dap_stream_ch_chain_init()
s_stream_ch_packet_out);
s_debug_more = dap_config_get_item_bool_default(g_config,"stream_ch_chain","debug_more",false);
s_update_pack_size = dap_config_get_item_int16_default(g_config,"stream_ch_chain","update_pack_size",100);
s_list_ban_groups = dap_config_get_array_str(g_config, "general", "ban_list_sync_groups", &s_size_ban_groups);
s_list_ban_groups = dap_config_get_array_str(g_config, "stream_ch_chain", "ban_list_sync_groups", &s_size_ban_groups);
return 0;
}
......@@ -150,7 +150,7 @@ void s_stream_ch_new(dap_stream_ch_t* a_ch, void* a_arg)
UNUSED(a_arg);
a_ch->internal = DAP_NEW_Z(dap_stream_ch_chain_t);
dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch);
l_ch_chain->ch = a_ch;
l_ch_chain->_inheritor = a_ch;
}
/**
......@@ -339,7 +339,7 @@ static void s_sync_out_gdb_first_worker_callback(dap_worker_t *a_worker, void *a
l_node_addr.uint64 = dap_chain_net_get_cur_addr_int(l_net);
if (s_debug_more )
log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB");
dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch , DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB,
dap_stream_ch_chain_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB,
l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64, &l_node_addr, sizeof(dap_chain_node_addr_t));
if(l_ch_chain->callback_notify_packet_out)
......@@ -371,7 +371,7 @@ static void s_sync_out_gdb_last_worker_callback(dap_worker_t *a_worker, void *a_
if (s_debug_more )
log_it(L_INFO,"Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB");
dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
dap_stream_ch_chain_pkt_write_unsafe(DAP_STREAM_CH(l_ch_chain), DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB,
l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64, NULL, 0);
l_ch_chain->state = CHAIN_STATE_IDLE;
......@@ -724,7 +724,8 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg)
}
}
// save data to global_db
if(!dap_chain_global_db_obj_save(dap_store_obj_copy(l_obj, 1), 1)) {
dap_store_obj_t *l_obj_copy = dap_store_obj_copy(l_obj, 1);
if(!dap_chain_global_db_obj_save(l_obj_copy, 1)) {
struct sync_request *l_sync_req_err = DAP_DUP(l_sync_request);
dap_proc_thread_worker_exec_callback(a_thread, l_sync_request->worker->id,
s_gdb_in_pkt_error_worker_callback, l_sync_req_err);
......@@ -732,6 +733,8 @@ static bool s_gdb_in_pkt_proc_callback(dap_proc_thread_t *a_thread, void *a_arg)
if (s_debug_more)
log_it(L_DEBUG, "Added new GLOBAL_DB synchronization record");
}
DAP_DELETE(l_obj_copy->group);
DAP_DELETE(l_obj_copy);
}
if(l_store_obj) {
dap_store_obj_free(l_store_obj, l_data_obj_count);
......@@ -772,6 +775,53 @@ static void s_stream_ch_write_error_unsafe(dap_stream_ch_t *a_ch, uint64_t a_net
dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, a_net_id, a_chain_id, a_cell_id, a_err_string);
}
static bool s_chain_timer_callback(void *a_arg)
{
dap_worker_t *l_worker = dap_events_get_current_worker(dap_events_get_default());
dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(DAP_STREAM_WORKER(l_worker), *(dap_stream_ch_uuid_t *)a_arg);
if (!l_ch) {
DAP_DELETE(a_arg);
return false;
}
dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch);
if (!l_ch_chain->was_active) {
if (l_ch_chain->state != CHAIN_STATE_IDLE) {
dap_stream_ch_chain_go_idle(l_ch_chain);
}
DAP_DELETE(a_arg);
l_ch_chain->activity_timer = NULL;
return false;
}
l_ch_chain->was_active = false;
// Sending dumb packet with nothing to inform remote thats we're just skiping atoms of GDB's, nothing freezed
if (l_ch_chain->state == CHAIN_STATE_SYNC_CHAINS)
dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD,
l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64, NULL, 0);
if (l_ch_chain->state == CHAIN_STATE_SYNC_GLOBAL_DB || l_ch_chain->state == CHAIN_STATE_UPDATE_GLOBAL_DB) {
if (s_debug_more)
log_it(L_INFO, "Send one global_db TSD packet (rest=%zu/%zu items)",
dap_db_log_list_get_count_rest(l_ch_chain->request_db_log),
dap_db_log_list_get_count(l_ch_chain->request_db_log));
dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD,
l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64, NULL, 0);
}
return true;
}
static void s_chain_timer_reset(dap_stream_ch_chain_t *a_ch_chain)
{
if (a_ch_chain->state == CHAIN_STATE_IDLE)
return;
if (!a_ch_chain->activity_timer) {
dap_stream_ch_uuid_t *l_uuid = DAP_DUP(&DAP_STREAM_CH(a_ch_chain)->uuid);
a_ch_chain->activity_timer = dap_timerfd_start_on_worker(DAP_STREAM_CH(a_ch_chain)->stream_worker->worker,
3000, s_chain_timer_callback, (void *)l_uuid);
} else
a_ch_chain->was_active = true;
}
/**
* @brief s_stream_ch_packet_in
* @param a_ch
......@@ -795,6 +845,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
sizeof(l_chain_pkt->hdr));
}
s_chain_timer_reset(l_ch_chain);
size_t l_chain_pkt_data_size = l_ch_pkt->hdr.size-sizeof (l_chain_pkt->hdr) ;
uint16_t l_acl_idx = dap_chain_net_acl_idx_by_id(l_chain_pkt->hdr.net_id );
......@@ -1320,7 +1371,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
l_error_str[l_chain_pkt_data_size-1]='\0'; // To be sure that nobody sends us garbage
// without trailing zero
log_it(L_WARNING,"In from remote addr %s chain id 0x%016"DAP_UINT64_FORMAT_x" got error on his side: '%s'",
l_ch_chain->ch->stream->esocket->remote_addr_str ? l_ch_chain->ch->stream->esocket->remote_addr_str: "<no addr>",
DAP_STREAM_CH(l_ch_chain)->stream->esocket->remote_addr_str ? DAP_STREAM_CH(l_ch_chain)->stream->esocket->remote_addr_str: "<no addr>",
l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt_data_size > 1 ? l_error_str:"<empty>");
} break;
......@@ -1388,26 +1439,32 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
return;
dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch);
s_chain_timer_reset(l_ch_chain);
switch (l_ch_chain->state) {
// Update list of global DB records to remote
case CHAIN_STATE_UPDATE_GLOBAL_DB: {
dap_stream_ch_chain_update_element_t l_data[s_update_pack_size];
uint_fast16_t i;
dap_db_log_list_obj_t *l_obj = NULL;
for (i = 0; i < s_update_pack_size; i++) {
dap_db_log_list_obj_t *l_obj = dap_db_log_list_get(l_ch_chain->request_db_log);
if (!l_obj)
l_obj = dap_db_log_list_get(l_ch_chain->request_db_log);
if (!l_obj || DAP_POINTER_TO_INT(l_obj) == 1)
break;
memcpy(&l_data[i].hash, &l_obj->hash, sizeof(dap_chain_hash_fast_t));
l_data[i].size = l_obj->pkt->data_size;
}
if (i) {
dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain->ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB,
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB,
l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64,
l_data, i * sizeof(dap_stream_ch_chain_update_element_t));
l_ch_chain->stats_request_gdb_processed += i;
if (s_debug_more)
log_it(L_INFO, "Out: DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB");
} else if (l_obj) {
// We need to return into the write callback
a_ch->stream->esocket->buf_out_zero_count = 0;
} else {
l_ch_chain->request.node_addr.uint64 = dap_chain_net_get_cur_addr_int(dap_chain_net_by_id(
l_ch_chain->request_hdr.net_id));
......@@ -1430,8 +1487,10 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
size_t l_pkt_size = 0;
for (uint_fast16_t l_skip_count = 0; l_skip_count < s_skip_in_reactor_count; ) {
l_obj = dap_db_log_list_get(l_ch_chain->request_db_log);
if (!l_obj)
if (!l_obj || DAP_POINTER_TO_INT(l_obj) == 1) {
l_skip_count = s_skip_in_reactor_count;
break;
}
dap_stream_ch_chain_hash_item_t *l_hash_item = NULL;
unsigned l_hash_item_hashv = 0;
HASH_VALUE(&l_obj->hash, sizeof(dap_chain_hash_fast_t), l_hash_item_hashv);
......@@ -1460,7 +1519,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
}
if (l_pkt_size) {
// If request was from defined node_addr we update its state
if( s_debug_more)
if (s_debug_more)
log_it(L_INFO, "Send one global_db packet len=%zu (rest=%zu/%zu items)", l_pkt_size,
dap_db_log_list_get_count_rest(l_ch_chain->request_db_log),
dap_db_log_list_get_count(l_ch_chain->request_db_log));
......@@ -1469,10 +1528,8 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
l_ch_chain->request_hdr.cell_id.uint64, l_pkt, l_pkt_size);
DAP_DELETE(l_pkt);
} else if (l_obj) {
// Sending dumb packet with nothing to inform remote thats we're just skiping GDBs, nothing freezed
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_GLOBAL_DB_TSD,
l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64, NULL, 0);
// We need to return into the write callback
a_ch->stream->esocket->buf_out_zero_count = 0;
} else {
log_it( L_INFO,"Syncronized database: items syncronyzed %"DAP_UINT64_FORMAT_U" from %zu",
l_ch_chain->stats_request_gdb_processed, dap_db_log_list_get_count(l_ch_chain->request_db_log));
......@@ -1583,10 +1640,8 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
0, l_ch_chain->callback_notify_arg);
}
if (! l_was_sent_smth ){
// Sending dumb packet with nothing to inform remote thats we're just skiping atoms, nothing freezed
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD,
l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64, NULL, 0);
// We need to return into the write callback
a_ch->stream->esocket->buf_out_zero_count = 0;
}
} break;
default: break;
......
......@@ -58,7 +58,7 @@ typedef struct dap_stream_ch_chain_hash_item{
typedef struct dap_stream_ch_chain {
dap_stream_ch_t * ch;
void *_inheritor;
dap_stream_ch_chain_state_t state;
dap_chain_node_client_t * node_client; // Node client associated with stream
......@@ -75,12 +75,16 @@ typedef struct dap_stream_ch_chain {
dap_stream_ch_chain_pkt_hdr_t request_hdr;
dap_list_t *request_db_iter;
bool was_active;
dap_timerfd_t *activity_timer;
dap_stream_ch_chain_callback_packet_t callback_notify_packet_out;
dap_stream_ch_chain_callback_packet_t callback_notify_packet_in;
void *callback_notify_arg;
} dap_stream_ch_chain_t;
#define DAP_STREAM_CH_CHAIN(a) ((dap_stream_ch_chain_t *) ((a)->internal) )
#define DAP_STREAM_CH(a) ((dap_stream_ch_t *)((a)->_inheritor))
#define DAP_CHAIN_PKT_EXPECT_SIZE 7168
int dap_stream_ch_chain_init(void);
......
......@@ -198,8 +198,7 @@ dap_store_obj_t* dap_store_obj_copy(dap_store_obj_t *a_store_obj, size_t a_store
memcpy(l_store_obj_dst, l_store_obj_src, sizeof(dap_store_obj_t));
l_store_obj_dst->group = dap_strdup(l_store_obj_src->group);
l_store_obj_dst->key = dap_strdup(l_store_obj_src->key);
l_store_obj_dst->value = DAP_NEW_SIZE(uint8_t, l_store_obj_dst->value_len);
memcpy(l_store_obj_dst->value, l_store_obj_src->value, l_store_obj_dst->value_len);
l_store_obj_dst->value = DAP_DUP_SIZE(l_store_obj_src->value, l_store_obj_src->value_len);
}
return l_store_obj;
}
......
......@@ -83,12 +83,12 @@ static void cdb_serialize_val_to_dap_store_obj(pdap_store_obj_t a_obj, const cha
a_obj->key = dap_strdup(key);
a_obj->id = dap_hex_to_uint(val, sizeof(uint64_t));
offset += sizeof(uint64_t);
a_obj->value_len = dap_hex_to_uint(val + offset, sizeof(unsigned long));
offset += sizeof(unsigned long);
a_obj->value_len = dap_hex_to_uint(val + offset, sizeof(uint64_t));
offset += sizeof(uint64_t);
a_obj->value = DAP_NEW_SIZE(uint8_t, a_obj->value_len);
memcpy(a_obj->value, val + offset, a_obj->value_len);
offset += a_obj->value_len;
a_obj->timestamp = (time_t)dap_hex_to_uint(val + offset, sizeof(time_t));
a_obj->timestamp = dap_hex_to_uint(val + offset, sizeof(uint64_t));
}
/** A callback function designed for finding a last item */
......@@ -159,11 +159,11 @@ bool dap_cdb_get_count_iter_callback(void *arg, const char *key, int ksize, cons
return true;
}
/**
/**
* @brief Initiates a CDB with main hash table size: 1000000,
* record cache: 128Mb, index page cache: 1024Mb.
* @param a_group a group name
* @param a_flags should be combination of CDB_CREAT / CDB_TRUNC / CDB_PAGEWARMUP
* @param a_group a group name
* @param a_flags should be combination of CDB_CREAT / CDB_TRUNC / CDB_PAGEWARMUP
CDB_PAGEWARMUP
* @return A pointer to CDB, if success. NULL, if error.
*/
......@@ -255,7 +255,7 @@ int dap_db_driver_cdb_init(const char *a_cdb_path, dap_db_driver_callbacks_t *a_
if (!S_ISDIR(buf.st_mode) || !res) {
continue;
}
#elif defined (DAP_OS_BSD)
#elif defined (DAP_OS_BSD)
struct stat buf;
int res = stat(d->d_name, &buf);
if (!S_ISDIR(buf.st_mode) || !res) {
......@@ -290,7 +290,7 @@ int dap_db_driver_cdb_init(const char *a_cdb_path, dap_db_driver_callbacks_t *a_
* @brief Gets CDB by a_group.
* @param a_group a group name
* @return if CDB is found, a pointer to CDB, otherwise NULL.
*/
*/
pcdb_instance dap_cdb_get_db_by_group(const char *a_group) {
pcdb_instance l_cdb_i = NULL;
pthread_rwlock_rdlock(&cdb_rwlock);
......@@ -355,7 +355,7 @@ int dap_db_driver_cdb_flush(void) {
* @brief Read last store item from CDB.
* @param a_group a group name
* @return If successful, a pointer to item, otherwise NULL.
*/
*/
dap_store_obj_t *dap_db_driver_cdb_read_last_store_obj(const char* a_group) {
if (!a_group) {
return NULL;
......@@ -382,7 +382,7 @@ dap_store_obj_t *dap_db_driver_cdb_read_last_store_obj(const char* a_group) {
* @param a_group the group name
* @param a_key the key
* @return true or false
*/
*/
bool dap_db_driver_cdb_is_obj(const char *a_group, const char *a_key)
{
bool l_ret = false;
......@@ -408,7 +408,7 @@ bool dap_db_driver_cdb_is_obj(const char *a_group, const char *a_key)
* @param a_key the key or NULL
* @param a_count_out IN. Count of read items. OUT Count of items was read
* @return If successful, pointer to items; otherwise NULL.
*/
*/
dap_store_obj_t *dap_db_driver_cdb_read_store_obj(const char *a_group, const char *a_key, size_t *a_count_out) {
if (!a_group) {
return NULL;
......@@ -468,7 +468,7 @@ dap_store_obj_t *dap_db_driver_cdb_read_store_obj(const char *a_group, const cha
* @param a_count_out[in] a count of items
* @param a_count[out] a count of items were got
* @return If successful, pointer to items, otherwise NULL.
*/
*/
dap_store_obj_t* dap_db_driver_cdb_read_cond_store_obj(const char *a_group, uint64_t a_id, size_t *a_count_out) {
if (!a_group) {
return NULL;
......@@ -520,7 +520,7 @@ dap_store_obj_t* dap_db_driver_cdb_read_cond_store_obj(const char *a_group, uint
* @param a_group the group name
* @param a_id id
* @return If successful, count of store items; otherwise 0.
*/
*/
size_t dap_db_driver_cdb_read_count_store(const char *a_group, uint64_t a_id)
{
if (!a_group) {
......@@ -595,19 +595,18 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) {
cdb_record l_rec;
l_rec.key = a_store_obj->key; //dap_strdup(a_store_obj->key);
int offset = 0;
char *l_val = DAP_NEW_Z_SIZE(char, sizeof(uint64_t) + sizeof(unsigned long) + a_store_obj->value_len + sizeof(time_t));
char *l_val = DAP_NEW_Z_SIZE(char, sizeof(uint64_t) + sizeof(uint64_t) + a_store_obj->value_len + sizeof(uint64_t));
dap_uint_to_hex(l_val, ++l_cdb_i->id, sizeof(uint64_t));
offset += sizeof(uint64_t);
dap_uint_to_hex(l_val + offset, a_store_obj->value_len, sizeof(unsigned long));
offset += sizeof(unsigned long);
dap_uint_to_hex(l_val + offset, a_store_obj->value_len, sizeof(uint64_t));
offset += sizeof(uint64_t);
if(a_store_obj->value && a_store_obj->value_len){
memcpy(l_val + offset, a_store_obj->value, a_store_obj->value_len);
DAP_DELETE(a_store_obj->value);
}
offset += a_store_obj->value_len;
unsigned long l_time = (unsigned long)a_store_obj->timestamp;
dap_uint_to_hex(l_val + offset, l_time, sizeof(time_t));
offset += sizeof(time_t);
dap_uint_to_hex(l_val + offset, a_store_obj->timestamp, sizeof(uint64_t));
offset += sizeof(uint64_t);
l_rec.val = l_val;
if (cdb_set2(l_cdb_i->cdb, l_rec.key, (int)strlen(l_rec.key), l_rec.val, offset, CDB_INSERTCACHE | CDB_OVERWRITE, 0) != CDB_SUCCESS) {
log_it(L_ERROR, "Couldn't add record with key [%s] to CDB: \"%s\"", l_rec.key, cdb_errmsg(cdb_errno(l_cdb_i->cdb)));
......
......@@ -276,33 +276,34 @@ dap_db_log_list_t* dap_db_log_list_start(dap_chain_node_addr_t a_addr, int a_fla
}
dap_list_free(l_groups_masks);
static int l_try_read_ban_list = 0;
static uint16_t s_size_ban_list = 0;
static char **s_ban_list = NULL;
static bool l_try_read_ban_list = false;
if (!l_try_read_ban_list) {
s_ban_list = dap_config_get_array_str(g_config, "general", "ban_list_sync_groups", &s_size_ban_list);
l_try_read_ban_list = 1;
s_ban_list = dap_config_get_array_str(g_config, "stream_ch_chain", "ban_list_sync_groups", &s_size_ban_list);
l_try_read_ban_list = true;
}
/* delete groups from ban list */
if (s_size_ban_list > 0) {
for (dap_list_t *l_groups = l_dap_db_log_list->groups; l_groups; ) {
int found = 0;
bool l_found = false;
for (int i = 0; i < s_size_ban_list; i++) {
if (dap_fnmatch(s_ban_list[i], l_groups->data, FNM_NOESCAPE)) {
if (!dap_fnmatch(s_ban_list[i], l_groups->data, FNM_NOESCAPE)) {
dap_list_t *l_tmp = l_groups->next;
dap_list_delete_link(l_dap_db_log_list->groups, l_groups);
l_dap_db_log_list->groups = dap_list_delete_link(l_dap_db_log_list->groups, l_groups);
l_groups = l_tmp;
found = 1;
l_found = true;
break;
}
}
if (found) continue;
if (l_found) continue;
l_groups = dap_list_next(l_groups);
}
}
for (dap_list_t *l_groups = l_dap_db_log_list->groups; l_groups; l_groups = dap_list_next(l_groups)) {
dap_db_log_list_group_t *l_replace = DAP_NEW_Z(dap_db_log_list_group_t);
l_replace->name = (char *)l_groups->data;
......@@ -367,33 +368,19 @@ size_t dap_db_log_list_get_count_rest(dap_db_log_list_t *a_db_log_list)
*/
dap_db_log_list_obj_t *dap_db_log_list_get(dap_db_log_list_t *a_db_log_list)
{
if(!a_db_log_list)
if (!a_db_log_list)
return NULL;
dap_list_t *l_list;
bool l_is_process;
int l_count = 0;
while(1) {
pthread_mutex_lock(&a_db_log_list->list_mutex);
l_is_process = a_db_log_list->is_process;
// check next item
l_list = a_db_log_list->list_read;
if (l_list){
a_db_log_list->list_read = dap_list_next(a_db_log_list->list_read);
a_db_log_list->items_rest--;
}
pthread_mutex_unlock(&a_db_log_list->list_mutex);
// wait reading next item, no more 1 sec (50 ms * 100 times)
if(!l_list && l_is_process) {
dap_usleep(DAP_USEC_PER_SEC / 200);
l_count++;
if(l_count > 100)
break;
}
else
break;
pthread_mutex_lock(&a_db_log_list->list_mutex);
int l_is_process = a_db_log_list->is_process;
// check next item
dap_list_t *l_list = a_db_log_list->list_read;
if (l_list){
a_db_log_list->list_read = dap_list_next(a_db_log_list->list_read);
a_db_log_list->items_rest--;
}
pthread_mutex_unlock(&a_db_log_list->list_mutex);
//log_it(L_DEBUG, "get item n=%d", a_db_log_list->items_number - a_db_log_list->items_rest);
return l_list ? (dap_db_log_list_obj_t *)l_list->data : NULL;
return l_list ? (dap_db_log_list_obj_t *)l_list->data : DAP_INT_TO_POINTER(l_is_process);
}
/**
......
......@@ -255,20 +255,20 @@ dap_store_obj_pkt_t *dap_store_packet_single(pdap_store_obj_t a_store_obj)
uint32_t l_type = a_store_obj->type;
memcpy(l_pkt->data, &l_type, sizeof(uint32_t));
uint64_t l_offset = sizeof(uint32_t);
uint16_t group_size = (uint16_t) dap_strlen(a_store_obj->group);
memcpy(l_pkt->data + l_offset, &group_size, sizeof(uint16_t));
uint16_t l_group_size = (uint16_t) dap_strlen(a_store_obj->group);
memcpy(l_pkt->data + l_offset, &l_group_size, sizeof(uint16_t));
l_offset += sizeof(uint16_t);
memcpy(l_pkt->data + l_offset, a_store_obj->group, group_size);
l_offset += group_size;
memcpy(l_pkt->data + l_offset, a_store_obj->group, l_group_size);
l_offset += l_group_size;
memcpy(l_pkt->data + l_offset, &a_store_obj->id, sizeof(uint64_t));
l_offset += sizeof(uint64_t);
memcpy(l_pkt->data + l_offset, &a_store_obj->timestamp, sizeof(time_t));
l_offset += sizeof(time_t);
uint16_t key_size = (uint16_t) dap_strlen(a_store_obj->key);
memcpy(l_pkt->data + l_offset, &key_size, sizeof(uint16_t));
memcpy(l_pkt->data + l_offset, &a_store_obj->timestamp, sizeof(uint64_t));
l_offset += sizeof(uint64_t);
uint16_t l_key_size = (uint16_t) dap_strlen(a_store_obj->key);
memcpy(l_pkt->data + l_offset, &l_key_size, sizeof(uint16_t));
l_offset += sizeof(uint16_t);
memcpy(l_pkt->data + l_offset, a_store_obj->key, key_size);
l_offset += key_size;
memcpy(l_pkt->data + l_offset, a_store_obj->key, l_key_size);
l_offset += l_key_size;
memcpy(l_pkt->data + l_offset, &a_store_obj->value_len, sizeof(uint64_t));
l_offset += sizeof(uint64_t);
memcpy(l_pkt->data + l_offset, a_store_obj->value, a_store_obj->value_len);
......@@ -314,9 +314,9 @@ dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *pkt, siz
memcpy(&obj->id, pkt->data + offset, sizeof(uint64_t));
offset += sizeof(uint64_t);
if (offset+sizeof (time_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'timestamp' field"); break;} // Check for buffer boundries
memcpy(&obj->timestamp, pkt->data + offset, sizeof(time_t));
offset += sizeof(time_t);
if (offset+sizeof (uint64_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'timestamp' field"); break;} // Check for buffer boundries
memcpy(&obj->timestamp, pkt->data + offset, sizeof(uint64_t));
offset += sizeof(uint64_t);
if (offset+sizeof (uint16_t)> pkt->data_size) {log_it(L_ERROR, "Broken GDB element: can't read 'key_length' field"); break;} // Check for buffer boundries
memcpy(&str_length, pkt->data + offset, sizeof(uint16_t));
......@@ -337,7 +337,7 @@ dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *pkt, siz
memcpy(obj->value, pkt->data + offset, obj->value_len);
offset += obj->value_len;
}
//assert(pkt->data_size == offset);
assert(pkt->data_size == offset);
if(store_obj_count)
*store_obj_count = count;
return store_obj;
......
......@@ -31,13 +31,13 @@
typedef struct dap_store_obj {
uint64_t id;
time_t timestamp;
uint64_t timestamp;
uint8_t type;
char *group;
char *key;
const char *c_key;
uint8_t *value;
size_t value_len;
uint64_t value_len;
}DAP_ALIGN_PACKED dap_store_obj_t, *pdap_store_obj_t;
typedef struct dap_store_obj_pkt {
......
......@@ -224,7 +224,8 @@ static const dap_chain_node_client_callbacks_t s_node_link_callbacks={
.connected=s_node_link_callback_connected,
.disconnected=s_node_link_callback_disconnected,
.stage=s_node_link_callback_stage,
.error=s_node_link_callback_error
.error=s_node_link_callback_error,
.delete=s_node_link_callback_delete
};
......@@ -629,29 +630,34 @@ static void s_node_link_callback_error(dap_chain_node_client_t * a_node_client,
*/
static void s_node_link_callback_delete(dap_chain_node_client_t * a_node_client, void * a_arg)
{
log_it(L_DEBUG,"Remove node client from list");
dap_chain_net_t * l_net = (dap_chain_net_t *) a_arg;
dap_chain_net_pvt_t * l_net_pvt = PVT(l_net);
if (!a_node_client->keep_connection) {
dap_notify_server_send_f_mt("{"
"class:\"NetLinkDelete\","
"net_id:0x%016" DAP_UINT64_FORMAT_X ","
"cell_id:0x%016"DAP_UINT64_FORMAT_X","
"address:\""NODE_ADDR_FP_STR"\""
"}\n", a_node_client->net->pub.id.uint64, a_node_client->info->hdr.cell_id.uint64,
NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address));
return;
}
pthread_rwlock_wrlock(&l_net_pvt->rwlock);
for ( dap_list_t * it = l_net_pvt->links; it; it=it->next ){
dap_chain_node_client_t * l_client =(dap_chain_node_client_t *) it->data;
// Cut out current iterator if it equals with deleting handler
if (l_client == a_node_client){
if (it->prev)
it->prev->next = it->next;
if (it->next)
it->next->prev = it->prev;
if (it->data == a_node_client) {
log_it(L_DEBUG,"Replace node client with new one");
it->data = dap_chain_net_client_create_n_connect(l_net, a_node_client->info);
}
}
pthread_rwlock_unlock(&l_net_pvt->rwlock);
dap_notify_server_send_f_mt("{"
"class:\"NetLinkDelete\","
"class:\"NetLinkRestart\","
"net_id:0x%016" DAP_UINT64_FORMAT_X ","
"cell_id:0x%016"DAP_UINT64_FORMAT_X","
"address:\""NODE_ADDR_FP_STR"\""
"}\n", a_node_client->net->pub.id.uint64, a_node_client->info->hdr.cell_id.uint64,
"}\n", a_node_client->net->pub.id.uint64, a_node_client->info->hdr.cell_id.uint64,
NODE_ADDR_FP_ARGS_S(a_node_client->info->hdr.address));
dap_chain_node_client_close(a_node_client);
// Then a_alient wiil be destroyed in a right way
}
/**
......@@ -834,6 +840,7 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg)
dap_list_t *l_tmp = l_net_pvt->links;
while (l_tmp) {
dap_list_t *l_next =l_tmp->next;
((dap_chain_node_client_t *)l_tmp->data)->keep_connection = false;
dap_chain_node_client_close(l_tmp->data);
DAP_DELETE(l_tmp);
l_tmp = l_next;
......@@ -849,6 +856,7 @@ static bool s_net_states_proc(dap_proc_thread_t *a_thread, void *a_arg)
break;
}
// disable SYNC_GDB
l_net_pvt->active_link = NULL;
l_net_pvt->flags &= ~F_DAP_CHAIN_NET_GO_SYNC;
l_net_pvt->last_sync = 0;
} break;
......@@ -1022,7 +1030,8 @@ bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t
if (l_links->data == l_net_pvt->active_link) {
dap_chain_node_client_t *l_client = (dap_chain_node_client_t *)l_links->data;
if (l_client->state >= NODE_CLIENT_STATE_ESTABLISHED &&
l_client->state < NODE_CLIENT_STATE_SYNCED) {
l_client->state < NODE_CLIENT_STATE_SYNCED &&
a_client != l_client) {
l_found = true;
break;
}
......@@ -1036,11 +1045,14 @@ bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t
return !l_found;
}
void dap_chain_net_sync_unlock(dap_chain_net_t *a_net)
void dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client)
{
if (!a_net)
return;
dap_chain_net_pvt_t *l_net_pvt = PVT(a_net);
pthread_rwlock_rdlock(&l_net_pvt->rwlock);
l_net_pvt->active_link = NULL;
if (!a_client || l_net_pvt->active_link == a_client)
l_net_pvt->active_link = NULL;
pthread_rwlock_unlock(&l_net_pvt->rwlock);
}
/**
......@@ -3066,7 +3078,7 @@ static bool s_net_check_acl(dap_chain_net_t *a_net, dap_chain_hash_fast_t *a_pke
uint8_t *l_pkey_ser = dap_enc_key_serealize_pub_key(l_cert->enc_key, &l_pkey_size);
dap_chain_hash_fast_t l_cert_hash;
dap_hash_fast(l_pkey_ser, l_pkey_size, &l_cert_hash);
if (!memcmp(l_pkey_ser, a_pkey_hash, sizeof(dap_chain_hash_fast_t))) {
if (!memcmp(&l_cert_hash, a_pkey_hash, sizeof(dap_chain_hash_fast_t))) {
l_authorized = true;
DAP_DELETE(l_pkey_ser);
break;
......
......@@ -4701,7 +4701,7 @@ SIGNER_COUNT
static char *s_strdup_by_index (const char *a_file, const int a_index);
static dap_tsd_t *s_alloc_metadata (const char *a_file, const int a_meta);
static uint8_t *s_concat_hash_and_mimetypes (dap_chain_hash_fast_t *a_chain, dap_list_t *a_meta_list, int a_index_meta, size_t *a_fullsize);
static uint8_t *s_concat_hash_and_mimetypes (dap_chain_hash_fast_t *a_chain, dap_list_t *a_meta_list, size_t *a_fullsize);
/*
* dap_sign_file - sign a file with flags.
......@@ -4758,8 +4758,6 @@ static int s_sign_file(const char *a_filename, dap_sign_signer_file_t a_flags, c
l_shift <<= 1;
}
int l_ret = 0;
dap_cert_t *l_cert = dap_cert_find_by_name(a_cert_name);
if (!l_cert) {
DAP_FREE(l_buffer);
......@@ -4772,7 +4770,7 @@ static int s_sign_file(const char *a_filename, dap_sign_signer_file_t a_flags, c
}
size_t l_full_size_for_sign;
uint8_t *l_data = s_concat_hash_and_mimetypes (a_hash, l_std_list, l_index_meta, &l_full_size_for_sign);
uint8_t *l_data = s_concat_hash_and_mimetypes(a_hash, l_std_list, &l_full_size_for_sign);
if (!l_data) {
DAP_FREE(l_buffer);
return 0;
......@@ -4788,18 +4786,15 @@ static int s_sign_file(const char *a_filename, dap_sign_signer_file_t a_flags, c
return 1;
}
static byte_t *s_concat_meta (dap_list_t *a_meta, int a_index_meta, size_t *a_fullsize)
static byte_t *s_concat_meta (dap_list_t *a_meta, size_t *a_fullsize)
{
if (a_fullsize)
*a_fullsize = 0;
int l_len = 0;
int l_n;
int l_part = 256;
int l_power = 1;
byte_t *l_buf = DAP_CALLOC(l_part * l_power++, 1);
int l_total = l_part;
int l_counter = 0;
size_t l_counter = 0;
int l_part_power = l_part;
int l_index = 0;
......@@ -4807,13 +4802,13 @@ static byte_t *s_concat_meta (dap_list_t *a_meta, int a_index_meta, size_t *a_fu
if (!l_iter->data) continue;
dap_tsd_t * l_tsd = (dap_tsd_t *) l_iter->data;
l_index = l_counter;
l_counter += strlen(l_tsd->data);
l_counter += strlen((char *)l_tsd->data);
if (l_counter >= l_part_power) {
l_part_power = l_part * l_power++;
l_buf = (byte_t *) DAP_REALLOC(l_buf, l_part_power);
}
memcpy (&l_buf[l_index], l_tsd->data, strlen(l_tsd->data));
memcpy (&l_buf[l_index], l_tsd->data, strlen((char *)l_tsd->data));
}
if (a_fullsize)
......@@ -4822,10 +4817,10 @@ static byte_t *s_concat_meta (dap_list_t *a_meta, int a_index_meta, size_t *a_fu
return l_buf;
}
static uint8_t *s_concat_hash_and_mimetypes (dap_chain_hash_fast_t *a_chain_hash, dap_list_t *a_meta_list, int a_index_meta, size_t *a_fullsize)
static uint8_t *s_concat_hash_and_mimetypes (dap_chain_hash_fast_t *a_chain_hash, dap_list_t *a_meta_list, size_t *a_fullsize)
{
if (!a_fullsize) return NULL;
byte_t *l_buf = s_concat_meta (a_meta_list, a_index_meta, a_fullsize);
byte_t *l_buf = s_concat_meta (a_meta_list, a_fullsize);
if (!l_buf) return (uint8_t *) l_buf;
size_t l_len_meta_buf = *a_fullsize;
......@@ -4858,7 +4853,7 @@ static dap_tsd_t *s_alloc_metadata (const char *a_file, const int a_meta)
case SIGNER_FILENAME_SHORT:
{
char *l_filename_short = NULL;
if (l_filename_short = strrchr(a_file, '.')) {
if ((l_filename_short = strrchr(a_file, '.')) != 0) {
int l_index_of_latest_point = l_filename_short - a_file;
l_filename_short = s_strdup_by_index (a_file, l_index_of_latest_point);
if (!l_filename_short) return NULL;
......@@ -4883,7 +4878,7 @@ static dap_tsd_t *s_alloc_metadata (const char *a_file, const int a_meta)
stat (a_file, &l_st);
char *l_ctime = ctime(&l_st.st_ctime);
char *l = NULL;
if (l = strchr(l_ctime, '\n')) *l = 0;
if ((l = strchr(l_ctime, '\n')) != 0) *l = 0;
return dap_tsd_create_string(SIGNER_DATE, l_ctime);
}
break;
......
......@@ -53,7 +53,7 @@
#include "dap_timerfd.h"
#include "dap_hash.h"
#include "dap_uuid.h"
//#include "dap_http_client_simple.h"
#include "dap_client.h"
#include "dap_client_pvt.h"
#include "dap_chain_global_db_remote.h"
#include "dap_chain_global_db_hist.h"
......@@ -164,17 +164,18 @@ static void s_stage_status_error_callback(dap_client_t *a_client, void *a_arg)
pthread_mutex_unlock(&l_node_client->wait_mutex);
l_node_client->esocket_uuid = 0;
dap_chain_net_sync_unlock(l_node_client->net, l_node_client);
if (l_node_client->callbacks.disconnected) {
l_node_client->callbacks.disconnected(l_node_client, l_node_client->callbacks_arg);
}
if (l_node_client->keep_connection) {
dap_events_socket_uuid_t *l_uuid = DAP_DUP(&l_node_client->uuid);
dap_timerfd_start_on_worker(l_node_client->stream_worker
? l_node_client->stream_worker->worker
: dap_events_worker_get_auto(),
s_timer_update_states * 1000,
s_timer_update_states_callback,
l_uuid);
l_node_client->sync_timer = dap_timerfd_start_on_worker(l_node_client->stream_worker
? l_node_client->stream_worker->worker
: dap_events_worker_get_auto(),
s_timer_update_states * 1000,
s_timer_update_states_callback,
l_uuid);
}
return;
}
......@@ -224,7 +225,7 @@ static bool s_timer_update_states_callback(void *a_arg)
dap_client_t * l_client = dap_client_from_esocket(l_es);
if (l_client ) {
dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t*) l_client->_inheritor;
if (l_node_client && l_node_client->ch_chain) {
if (l_node_client && l_node_client->ch_chain && l_node_client->stream_worker && l_node_client->ch_chain_uuid) {
dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(l_node_client->stream_worker, l_node_client->ch_chain_uuid);
if (l_ch) {
dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch);
......@@ -249,9 +250,12 @@ static bool s_timer_update_states_callback(void *a_arg)
// if we not returned yet
l_me->state = NODE_CLIENT_STATE_DISCONNECTED;
if (l_me->keep_connection) {
log_it(L_INFO, "Reconnecting node client with peer "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_me->remote_node_addr));
l_me->state = NODE_CLIENT_STATE_CONNECTING ;
dap_client_go_stage(l_me->client, STAGE_STREAM_STREAMING, s_stage_connected_callback);
if (dap_client_pvt_find(l_me->client->pvt_uuid)) {
log_it(L_INFO, "Reconnecting node client with peer "NODE_ADDR_FP_STR, NODE_ADDR_FP_ARGS_S(l_me->remote_node_addr));
l_me->state = NODE_CLIENT_STATE_CONNECTING ;
dap_client_go_stage(l_me->client, STAGE_STREAM_STREAMING, s_stage_connected_callback);
} else
dap_chain_node_client_close(l_me);
}
DAP_DELETE(l_uuid);
return false;
......@@ -294,7 +298,10 @@ static void s_stage_connected_callback(dap_client_t *a_client, void *a_arg)
dap_events_socket_uuid_t *l_uuid = DAP_DUP(&l_node_client->uuid);
dap_worker_exec_callback_on(l_stream->esocket->worker, s_node_client_connected_synchro_start_callback, l_uuid);
dap_events_socket_uuid_t *l_uuid_timer = DAP_DUP(&l_node_client->uuid);
dap_timerfd_start_on_worker(l_stream->esocket->worker, s_timer_update_states * 1000, s_timer_update_states_callback, l_uuid_timer);
l_node_client->sync_timer = dap_timerfd_start_on_worker(l_stream->esocket->worker,
s_timer_update_states * 1000,
s_timer_update_states_callback,
l_uuid_timer);
}
}
#ifndef _WIN32
......@@ -363,6 +370,8 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha
dap_stream_ch_chain_pkt_t *a_pkt, size_t a_pkt_data_size,
void * a_arg)
{
UNUSED(a_ch_chain);
UNUSED(a_pkt_data_size);
dap_chain_node_client_t * l_node_client = (dap_chain_node_client_t *) a_arg;
switch (a_pkt_type) {
......@@ -451,16 +460,18 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha
log_it(L_INFO,"In: Link %s."NODE_ADDR_FP_STR" started to sync %s chain",l_net->pub.name,
NODE_ADDR_FP_ARGS(l_node_addr), l_node_client->cur_chain->name );
}
dap_stream_ch_chain_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ,
dap_stream_ch_chain_pkt_write_unsafe(l_node_client->ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_REQ,
l_net->pub.id.uint64 ,
l_chain_id.uint64,l_cell_id.uint64,NULL,0);
}else{ // If no - over with sync process
dap_chain_node_addr_t * l_node_addr = dap_chain_net_get_cur_addr(l_net);
log_it(L_INFO, "In: State node %s."NODE_ADDR_FP_STR" is SYNCED",l_net->pub.name, NODE_ADDR_FP_ARGS(l_node_addr) );
dap_chain_net_sync_unlock(l_net);
dap_chain_net_sync_unlock(l_net, l_node_client);
l_node_client->state = NODE_CLIENT_STATE_SYNCED;
if (dap_chain_net_get_target_state(l_net) == NET_STATE_ONLINE)
if (dap_chain_net_get_target_state(l_net) == NET_STATE_ONLINE) {
dap_timerfd_reset(l_node_client->sync_timer);
dap_chain_net_set_state(l_net, NET_STATE_ONLINE);
}
else
dap_chain_net_state_go_to(l_net, NET_STATE_OFFLINE);
#ifndef _WIN32
......@@ -741,19 +752,23 @@ void dap_chain_node_client_close(dap_chain_node_client_t *a_client)
if (l_client_found) {
HASH_DEL(s_clients,l_client_found);
DAP_DELETE(l_client_found);
if (a_client->callbacks.delete)
a_client->callbacks.delete(a_client, a_client->net);
char l_node_addr_str[INET_ADDRSTRLEN] = {};
inet_ntop(AF_INET, &a_client->info->hdr.ext_addr_v4, l_node_addr_str, INET_ADDRSTRLEN);
log_it(L_INFO, "Closing node client to uplink %s:%d", l_node_addr_str, a_client->info->hdr.ext_port);
dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(a_client->stream_worker, a_client->ch_chain_uuid);
if (l_ch) {
dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch);
l_ch_chain->callback_notify_packet_in = NULL;
l_ch_chain->callback_notify_packet_out = NULL;
}
l_ch = dap_stream_ch_find_by_uuid_unsafe(a_client->stream_worker, a_client->ch_chain_net_uuid);
if (l_ch) {
dap_stream_ch_chain_net_t *l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(l_ch);
l_ch_chain_net->notify_callback = NULL;
if (a_client->stream_worker) {
dap_stream_ch_t *l_ch = dap_stream_ch_find_by_uuid_unsafe(a_client->stream_worker, a_client->ch_chain_uuid);
if (l_ch) {
dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(l_ch);
l_ch_chain->callback_notify_packet_in = NULL;
l_ch_chain->callback_notify_packet_out = NULL;
}
l_ch = dap_stream_ch_find_by_uuid_unsafe(a_client->stream_worker, a_client->ch_chain_net_uuid);
if (l_ch) {
dap_stream_ch_chain_net_t *l_ch_chain_net = DAP_STREAM_CH_CHAIN_NET(l_ch);
l_ch_chain_net->notify_callback = NULL;
}
}
// clean client
dap_client_pvt_t *l_client_pvt = dap_client_pvt_find(a_client->client->pvt_uuid);
......
......@@ -119,7 +119,7 @@ void dap_chain_net_set_flag_sync_from_zero(dap_chain_net_t * a_net, bool a_flag_
bool dap_chain_net_get_flag_sync_from_zero( dap_chain_net_t * a_net);
bool dap_chain_net_sync_trylock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client);
void dap_chain_net_sync_unlock(dap_chain_net_t *a_net);
void dap_chain_net_sync_unlock(dap_chain_net_t *a_net, dap_chain_node_client_t *a_client);
dap_chain_net_t * dap_chain_net_by_name( const char * a_name);
dap_chain_net_t * dap_chain_net_by_id( dap_chain_net_id_t a_id);
......
......@@ -113,8 +113,8 @@ typedef struct dap_chain_node_client {
struct in6_addr remote_ipv6;
bool keep_connection;
bool is_connected;
dap_timerfd_t *sync_timer;
// callbacks
dap_chain_node_client_callbacks_t callbacks;
void * callbacks_arg;
......