Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (6)
Showing
with 376 additions and 372 deletions
......@@ -2,7 +2,7 @@ project(cellframe-sdk C)
cmake_minimum_required(VERSION 2.8)
set(CMAKE_C_STANDARD 11)
set(CELLFRAME_SDK_NATIVE_VERSION "2.5-9")
set(CELLFRAME_SDK_NATIVE_VERSION "2.5-10")
add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"")
set(DAPSDK_MODULES "")
......
......@@ -256,14 +256,15 @@ void *dap_worker_thread(void *arg)
// Socket is ready to write
if(((l_epoll_events[n].events & EPOLLOUT) || (l_cur->flags & DAP_SOCK_READY_TO_WRITE))
&& !(l_cur->flags & DAP_SOCK_SIGNAL_CLOSE)) {
//log_it(DEBUG, "Main loop output: %u bytes to send",sa_cur->buf_out_size);
//log_it(L_DEBUG, "Main loop output: %u bytes to send", l_cur->buf_out_size);
if(l_cur->callbacks.write_callback)
l_cur->callbacks.write_callback(l_cur, NULL); // Call callback to process write event
if (l_cur->worker == NULL ){ // esocket was unassigned in callback, we don't need any ops with it now,
// continue to poll another esockets
continue;
}
if(l_cur->flags & DAP_SOCK_READY_TO_WRITE) {
static const uint32_t buf_out_zero_count_max = 5;
......@@ -307,15 +308,11 @@ void *dap_worker_thread(void *arg)
if (l_errno != EAGAIN && l_errno != EWOULDBLOCK ){ // If we have non-blocking socket
log_it(L_ERROR, "Some error occured in send(): %s", strerror(errno));
l_cur->flags |= DAP_SOCK_SIGNAL_CLOSE;
break;
}
}else{
//log_it(L_DEBUG, "Output: %u from %u bytes are sent ", l_bytes_sent,l_cur->buf_out_size);
//}
//log_it(L_DEBUG,"Output: sent %u bytes",total_sent);
if (l_bytes_sent) {
l_cur->buf_out_size -= l_bytes_sent;
//log_it(L_DEBUG,"Output: left %u bytes in buffer",l_cur->buf_out_size);
if (l_cur->buf_out_size) {
memmove(l_cur->buf_out, &l_cur->buf_out[l_bytes_sent], l_cur->buf_out_size);
} else {
......@@ -325,7 +322,9 @@ void *dap_worker_thread(void *arg)
}
}
}
if (l_cur->buf_out_size) {
dap_events_socket_set_writable_unsafe(l_cur,true);
}
if((l_cur->flags & DAP_SOCK_SIGNAL_CLOSE) && !l_cur->no_close) {
// protect against double deletion
l_cur->kill_signal = true;
......
......@@ -182,14 +182,7 @@ static void write_cb( EPOLL_HANDLE efd, int revents, dap_server_t *sh )
dap_events_socket_t *client = udp_client->esocket;
if( client != NULL && !check_close(client) && (client->flags & DAP_SOCK_READY_TO_WRITE) ) {
if ( sh->client_callbacks.write_callback )
sh->client_callbacks.write_callback( client, NULL );
if ( client->buf_out_size > 0 ) {
struct sockaddr_in addr;
addr.sin_family = AF_INET;
dap_udp_client_get_address( client, (unsigned int *)&addr.sin_addr.s_addr, &addr.sin_port );
......@@ -211,6 +204,8 @@ static void write_cb( EPOLL_HANDLE efd, int revents, dap_server_t *sh )
sb_payload_ready = false;
}
LL_DELETE( udp->waiting_clients, udp_client );
if ( sh->client_callbacks.write_callback )
sh->client_callbacks.write_callback( client, NULL );
}
else if( client == NULL ) {
LL_DELETE( udp->waiting_clients, udp_client );
......
......@@ -297,6 +297,7 @@ void dap_http_folder_data_write(dap_http_client_t * cl_ht, void * arg)
dap_http_file_t * cl_ht_file= DAP_HTTP_FILE(cl_ht);
cl_ht->esocket->buf_out_size=fread(cl_ht->esocket->buf_out,1,sizeof(cl_ht->esocket->buf_out),cl_ht_file->fd);
cl_ht_file->position+=cl_ht->esocket->buf_out_size;
dap_events_socket_set_writable_unsafe(cl_ht->esocket, true);
if(feof(cl_ht_file->fd)!=0){
log_it(L_INFO, "All the file %s is sent out",cl_ht_file->local_path);
......
......@@ -228,14 +228,10 @@ inline static bool _is_supported_user_agents_list_setted()
inline static void _set_only_write_http_client_state(dap_http_client_t* http_client)
{
// log_it(L_DEBUG,"_set_only_write_http_client_state");
// Sleep(300);
http_client->esocket->flags = DAP_SOCK_READY_TO_WRITE; // To not to touch epoll_fd we clean flags by ourself
// http_client->state_write=DAP_HTTP_CLIENT_STATE_NONE;
http_client->state_write=DAP_HTTP_CLIENT_STATE_START;
dap_events_socket_set_writable_unsafe(http_client->esocket,true);
// http_client->state_write=DAP_HTTP_CLIENT_STATE_START;
dap_events_socket_set_readable_unsafe(http_client->esocket, false);
}
static void _copy_reply_and_mime_to_response( dap_http_simple_t *cl_sh )
......@@ -380,6 +376,7 @@ static void s_http_client_data_write( dap_http_client_t * a_http_client, void *a
l_http_simple->reply_sent += dap_events_socket_write_unsafe( a_http_client->esocket,
l_http_simple->reply_byte + l_http_simple->reply_sent,
a_http_client->out_content_length - l_http_simple->reply_sent );
dap_events_socket_set_writable_unsafe(a_http_client->esocket, true);
if ( l_http_simple->reply_sent >= a_http_client->out_content_length ) {
log_it(L_INFO, "All the reply (%u) is sent out", a_http_client->out_content_length );
......
......@@ -499,7 +499,7 @@ void dap_http_client_write( dap_events_socket_t * cl, void *arg )
log_it( L_INFO," HTTP response with %u status code", l_http_client->reply_status_code );
dap_events_socket_write_f_unsafe(cl, "HTTP/1.1 %u %s\r\n",l_http_client->reply_status_code, l_http_client->reply_reason_phrase[0] ?
l_http_client->reply_reason_phrase : http_status_reason_phrase(l_http_client->reply_status_code) );
dap_events_socket_set_writable_unsafe(cl, true);
dap_http_client_out_header_generate( l_http_client );
l_http_client->state_write = DAP_HTTP_CLIENT_STATE_HEADERS;
} break;
......@@ -510,6 +510,7 @@ void dap_http_client_write( dap_events_socket_t * cl, void *arg )
log_it(L_DEBUG, "Output: headers are over (reply status code %u content_lentgh %u)",
l_http_client->reply_status_code);
dap_events_socket_write_f_unsafe(cl, "\r\n");
dap_events_socket_set_writable_unsafe(cl, true);
if ( l_http_client->out_content_length || l_http_client->out_content_ready ) {
l_http_client->state_write=DAP_HTTP_CLIENT_STATE_DATA;
} else {
......@@ -522,6 +523,7 @@ void dap_http_client_write( dap_events_socket_t * cl, void *arg )
} else {
//log_it(L_WARNING,"Output: header %s: %s",hdr->name,hdr->value);
dap_events_socket_write_f_unsafe(cl, "%s: %s\r\n", hdr->name, hdr->value);
dap_events_socket_set_writable_unsafe(cl, true);
dap_http_header_remove( &l_http_client->out_headers, hdr );
}
} break;
......
......@@ -472,25 +472,25 @@ static void s_esocket_data_read(dap_events_socket_t* a_client, void * a_arg)
* @param sh DAP client instance
* @param arg Not used
*/
static void s_esocket_write(dap_events_socket_t* a_client , void * a_arg){
static void s_esocket_write(dap_events_socket_t* a_esocket , void * a_arg){
(void) a_arg;
size_t i;
bool ready_to_write=false;
dap_http_client_t *l_http_client = DAP_HTTP_CLIENT(a_client);
bool l_ready_to_write=false;
dap_http_client_t *l_http_client = DAP_HTTP_CLIENT(a_esocket);
//log_it(L_DEBUG,"Process channels data output (%u channels)", DAP_STREAM(l_http_client)->channel_count );
for(i=0;i<DAP_STREAM(l_http_client)->channel_count; i++){
dap_stream_ch_t * ch = DAP_STREAM(l_http_client)->channel[i];
if(ch->ready_to_write){
if(ch->proc->packet_out_callback)
ch->proc->packet_out_callback(ch,NULL);
ready_to_write|=ch->ready_to_write;
l_ready_to_write|=ch->ready_to_write;
}
}
if (s_dump_packet_headers ) {
log_it(L_DEBUG,"dap_stream_data_write: ready_to_write=%s client->buf_out_size=%u" ,
ready_to_write?"true":"false", a_client->buf_out_size );
l_ready_to_write?"true":"false", a_esocket->buf_out_size );
}
dap_events_socket_set_writable_unsafe(a_esocket, l_ready_to_write);
//log_it(L_DEBUG,"stream_dap_data_write ok");
}
......
......@@ -138,12 +138,12 @@ bool s_sync_chains_callback(dap_proc_thread_t *a_thread, void *a_arg)
dap_chain_atom_iter_t* l_iter = l_chain->callback_atom_iter_create(l_chain);
l_ch_chain->request_atom_iter = l_iter;
l_lasts = l_chain->callback_atom_iter_get_lasts(l_iter, &l_lasts_count, &l_lasts_sizes);
if(l_lasts&& l_lasts_sizes) {
for(size_t i = l_lasts_count - 1; i >= 0; i--) {
log_it(L_INFO, "Found %d atoms for synchronization", l_lasts_count);
if (l_lasts && l_lasts_sizes) {
for(long int i = l_lasts_count - 1; i >= 0; i--) {
dap_chain_atom_item_t * l_item = NULL;
dap_chain_hash_fast_t l_atom_hash;
dap_hash_fast(l_lasts[i], l_lasts_sizes[i],
&l_atom_hash);
dap_hash_fast(l_lasts[i], l_lasts_sizes[i], &l_atom_hash);
pthread_mutex_lock(&l_ch_chain->mutex);
HASH_FIND(hh, l_ch_chain->request_atoms_lasts, &l_atom_hash, sizeof(l_atom_hash),
l_item);
......@@ -207,6 +207,7 @@ bool s_sync_gdb_callback(dap_proc_thread_t *a_thread, void *a_arg)
//log_it(L_DEBUG, "Start getting items %u:%u", l_request->id_start + 1,l_db_log->items_number);//dap_list_length(l_list));
// Add it to outgoing list
l_ch_chain->request_global_db_trs = l_db_log;
l_ch_chain->db_iter = NULL;
l_ch_chain->state = CHAIN_STATE_SYNC_GLOBAL_DB;
} else {
dap_stream_ch_chain_sync_request_t l_request = {};
......@@ -214,13 +215,12 @@ bool s_sync_gdb_callback(dap_proc_thread_t *a_thread, void *a_arg)
l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0;
l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64);
dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
l_ch_chain->request_cell_id, &l_request, sizeof(l_request));
l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
l_ch_chain->request_cell_id, &l_request, sizeof(l_request));
l_ch_chain->state = CHAIN_STATE_IDLE;
if(l_ch_chain->callback_notify_packet_out)
l_ch_chain->callback_notify_packet_out(l_ch_chain,
DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
NULL, 0, l_ch_chain->callback_notify_arg);
l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
NULL, 0, l_ch_chain->callback_notify_arg);
}
//log_it(L_INFO, "Prepared %u items for sync", l_db_log->items_number - l_request->id_start);
// go to send data from list [in s_stream_ch_packet_out()]
......@@ -381,91 +381,75 @@ bool s_gdb_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
*/
void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
{
//static char *s_net_name = NULL;
dap_stream_ch_chain_t * l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch);
if(l_ch_chain) {
dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg;
dap_stream_ch_chain_pkt_t * l_chain_pkt = (dap_stream_ch_chain_pkt_t *) l_ch_pkt->data;
uint8_t l_acl_idx = dap_chain_net_acl_idx_by_id(l_chain_pkt->hdr.net_id);
bool l_error = false;
char l_err_str[64];
if (l_acl_idx == (uint8_t)-1) {
log_it(L_ERROR, "Invalid net id in packet");
strcpy(l_err_str, "ERROR_NET_INVALID_ID");
l_error = true;
}
if (!l_error && a_ch->stream->session->acl && !a_ch->stream->session->acl[l_acl_idx]) {
log_it(L_WARNING, "Unauthorized request attempt to network %s",
dap_chain_net_by_id(l_chain_pkt->hdr.net_id)->pub.name);
strcpy(l_err_str, "ERROR_NET_NOT_AUTHORIZED");
l_error = true;
}
if (l_error) {
if (!l_ch_chain) {
return;
}
dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_arg;
dap_stream_ch_chain_pkt_t * l_chain_pkt = (dap_stream_ch_chain_pkt_t *) l_ch_pkt->data;
if (!l_chain_pkt) {
return;
}
size_t l_chain_pkt_data_size = l_ch_pkt->hdr.size - sizeof(l_chain_pkt->hdr);
uint8_t l_acl_idx = dap_chain_net_acl_idx_by_id(l_chain_pkt->hdr.net_id);
if (l_acl_idx == (uint8_t)-1) {
log_it(L_ERROR, "Invalid net id in packet");
if (l_ch_pkt->hdr.type == DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR) {
if(l_ch_chain->callback_notify_packet_in) {
l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt,
l_chain_pkt_data_size, l_ch_chain->callback_notify_arg);
}
} else {
dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, l_err_str);
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
"ERROR_NET_INVALID_ID");
dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
}
size_t l_chain_pkt_data_size = l_ch_pkt->hdr.size - sizeof(l_chain_pkt->hdr);
if (!l_error && l_chain_pkt) {
switch (l_ch_pkt->hdr.type) {
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: {
log_it(L_INFO, "In: SYNCED_ALL pkt");
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: {
log_it(L_INFO, "In: SYNCED_GLOBAL_DB pkt");
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP: {
log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt");
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP: {
log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt");
}
break;
return;
}
if (a_ch->stream->session->acl && !a_ch->stream->session->acl[l_acl_idx]) {
log_it(L_WARNING, "Unauthorized request attempt to network %s",
dap_chain_net_by_id(l_chain_pkt->hdr.net_id)->pub.name);
dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
"ERROR_NET_NOT_AUTHORIZED");
dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
return;
}
switch (l_ch_pkt->hdr.type) {
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_ALL: {
log_it(L_INFO, "In: SYNCED_ALL pkt");
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB: {
log_it(L_INFO, "In: SYNCED_GLOBAL_DB pkt");
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB_GROUP: {
log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt");
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB_GROUP: {
log_it(L_INFO, "In: SYNCED_GLOBAL_DB_GROUP pkt");
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: {
log_it(L_INFO, "In: SYNCED_CHAINS pkt");
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: {
log_it(L_INFO, "In: SYNC_CHAINS pkt");
dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
if(l_chain) {
if(l_ch_chain->state != CHAIN_STATE_IDLE) {
log_it(L_INFO, "Can't process SYNC_CHAINS request because not in idle state");
dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
"ERROR_STATE_NOT_IN_IDLE");
dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
} else {
// fill ids
if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) {
dap_stream_ch_chain_sync_request_t * l_request =
(dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data;
memcpy(&l_ch_chain->request, l_request, l_chain_pkt_data_size);
memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
}
dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_chains_callback, a_ch);
}
}
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: {
log_it(L_INFO, "In: SYNC_GLOBAL_DB pkt");
if(l_ch_chain->state != CHAIN_STATE_IDLE) {
log_it(L_INFO, "Can't process SYNC_GLOBAL_DB request because not in idle state");
dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
"ERROR_STATE_NOT_IN_IDLE");
dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
break;
}
// receive the latest global_db revision of the remote node -> go to send mode
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS: {
log_it(L_INFO, "In: SYNCED_CHAINS pkt");
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS: {
log_it(L_INFO, "In: SYNC_CHAINS pkt");
dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
if(l_chain) {
if(l_ch_chain->state != CHAIN_STATE_IDLE) {
log_it(L_INFO, "Can't process SYNC_CHAINS request because not in idle state");
dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
"ERROR_STATE_NOT_IN_IDLE");
dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
} else {
// fill ids
if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) {
dap_stream_ch_chain_sync_request_t * l_request =
(dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data;
......@@ -473,110 +457,138 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_gdb_callback, a_ch);
}
else {
log_it(L_ERROR, "Get DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB session_id=%u bad request",
a_ch->stream->session->id);
dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
"ERROR_SYNC_GLOBAL_DB_REQUEST_BAD");
dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
}
}
break;
// first packet of data with source node address
case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN: {
log_it(L_INFO, "In: FIRST_CHAIN data_size=%d", l_chain_pkt_data_size);
if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t))
memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size);
dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_chains_callback, a_ch);
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN: {
//log_it(L_INFO, "In: CHAIN pkt");
dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
if(l_chain) {
// Expect atom element in
if(l_chain_pkt_data_size > 0) {
memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size);
memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
l_ch_chain->pkt_data_size = l_chain_pkt_data_size;
dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_chain_pkt_callback, a_ch);
} else {
log_it(L_WARNING, "Empty chain packet");
dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
"ERROR_CHAIN_PACKET_EMPTY");
dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
}
}
}
break;
// first packet of data with source node address
case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB: {
log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d", l_chain_pkt_data_size);
if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t))
memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size);
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB: {
//log_it(L_INFO, "In: GLOBAL_DB data_size=%d", l_chain_pkt_data_size);
// get transaction and save it to global_db
if(l_chain_pkt_data_size > 0) {
memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size);
memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
l_ch_chain->pkt_data_size = l_chain_pkt_data_size;
dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_gdb_pkt_callback, a_ch);
} else {
log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size");
dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
"ERROR_GLOBAL_DB_PACKET_EMPTY");
dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
}
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS: {
dap_stream_ch_chain_sync_request_t l_sync_gdb = {};
memcpy(&l_sync_gdb, l_chain_pkt->data, l_chain_pkt_data_size);
dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id);
l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr(l_net) ?
dap_chain_net_get_cur_addr(l_net)->uint64 :
dap_db_get_cur_node_addr(l_net->pub.name);
// Get last timestamp in log
l_sync_gdb.id_start = (uint64_t) dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64);
// no limit
l_sync_gdb.id_end = (uint64_t)0;
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_gdb, sizeof(l_sync_gdb));
}
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: {
dap_stream_ch_chain_sync_request_t l_sync_chains = {};
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_chains, sizeof(l_sync_chains));
}
default: {
}
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB: {
log_it(L_INFO, "In: SYNC_GLOBAL_DB pkt");
if(l_ch_chain->state != CHAIN_STATE_IDLE) {
log_it(L_INFO, "Can't process SYNC_GLOBAL_DB request because not in idle state");
dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
"ERROR_STATE_NOT_IN_IDLE");
dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
break;
}
// receive the latest global_db revision of the remote node -> go to send mode
if(l_chain_pkt_data_size == sizeof(dap_stream_ch_chain_sync_request_t)) {
dap_stream_ch_chain_sync_request_t * l_request =
(dap_stream_ch_chain_sync_request_t *) l_chain_pkt->data;
memcpy(&l_ch_chain->request, l_request, l_chain_pkt_data_size);
memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_sync_gdb_callback, a_ch);
}
else {
log_it(L_ERROR, "Get DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB session_id=%u bad request",
a_ch->stream->session->id);
dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
"ERROR_SYNC_GLOBAL_DB_REQUEST_BAD");
dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
}
}
break;
// first packet of data with source node address
case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_CHAIN: {
log_it(L_INFO, "In: FIRST_CHAIN data_size=%d", l_chain_pkt_data_size);
if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t))
memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size);
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN: {
log_it(L_INFO, "In: CHAIN pkt data_size=%d", l_chain_pkt_data_size);
dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
if(l_chain) {
// Expect atom element in
if(l_chain_pkt_data_size > 0) {
memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size);
memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
l_ch_chain->pkt_data_size = l_chain_pkt_data_size;
dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_chain_pkt_callback, a_ch);
} else {
log_it(L_WARNING, "Empty chain packet");
dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
"ERROR_UNKNOWN_CHAIN_PKT_TYPE");
}
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
"ERROR_CHAIN_PACKET_EMPTY");
dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
}
if(l_ch_chain->callback_notify_packet_in)
l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt,
l_chain_pkt_data_size, l_ch_chain->callback_notify_arg);
}
}
break;
// first packet of data with source node address
case DAP_STREAM_CH_CHAIN_PKT_TYPE_FIRST_GLOBAL_DB: {
log_it(L_INFO, "In: FIRST_GLOBAL_DB data_size=%d", l_chain_pkt_data_size);
if(l_chain_pkt_data_size == sizeof(dap_chain_node_addr_t))
memcpy(&l_ch_chain->request.node_addr, l_chain_pkt->data, l_chain_pkt_data_size);
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB: {
log_it(L_INFO, "In: GLOBAL_DB data_size=%d", l_chain_pkt_data_size);
// get transaction and save it to global_db
if(l_chain_pkt_data_size > 0) {
memcpy(&l_ch_chain->request_net_id, &l_chain_pkt->hdr.net_id, sizeof(dap_chain_net_id_t));
memcpy(&l_ch_chain->request_chain_id, &l_chain_pkt->hdr.chain_id, sizeof(dap_chain_id_t));
memcpy(&l_ch_chain->request_cell_id, &l_chain_pkt->hdr.cell_id, sizeof(dap_chain_cell_id_t));
l_ch_chain->pkt_data = DAP_CALLOC(1, l_chain_pkt_data_size);
memcpy(l_ch_chain->pkt_data, l_chain_pkt->data, l_chain_pkt_data_size);
l_ch_chain->pkt_data_size = l_chain_pkt_data_size;
dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
dap_proc_queue_add_callback(a_ch->stream_worker->worker->proc_queue, s_gdb_pkt_callback, a_ch);
} else {
log_it(L_WARNING, "Packet with GLOBAL_DB atom has zero body size");
dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
"ERROR_GLOBAL_DB_PACKET_EMPTY");
dap_stream_ch_set_ready_to_write_unsafe(a_ch, true);
}
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB_RVRS: {
dap_stream_ch_chain_sync_request_t l_sync_gdb = {};
memcpy(&l_sync_gdb, l_chain_pkt->data, l_chain_pkt_data_size);
dap_chain_net_t *l_net = dap_chain_net_by_id(l_chain_pkt->hdr.net_id);
l_sync_gdb.node_addr.uint64 = dap_chain_net_get_cur_addr(l_net) ?
dap_chain_net_get_cur_addr(l_net)->uint64 :
dap_db_get_cur_node_addr(l_net->pub.name);
// Get last timestamp in log
l_sync_gdb.id_start = (uint64_t) dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64);
// no limit
l_sync_gdb.id_end = (uint64_t)0;
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_GLOBAL_DB, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_gdb, sizeof(l_sync_gdb));
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS_RVRS: {
dap_stream_ch_chain_sync_request_t l_sync_chains = {};
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id, &l_sync_chains, sizeof(l_sync_chains));
}
break;
case DAP_STREAM_CH_CHAIN_PKT_TYPE_ERROR:
break;
default: {
dap_stream_ch_chain_pkt_write_error(a_ch, l_chain_pkt->hdr.net_id,
l_chain_pkt->hdr.chain_id, l_chain_pkt->hdr.cell_id,
"ERROR_UNKNOWN_CHAIN_PKT_TYPE");
}
}
if(l_ch_chain->callback_notify_packet_in)
l_ch_chain->callback_notify_packet_in(l_ch_chain, l_ch_pkt->hdr.type, l_chain_pkt,
l_chain_pkt_data_size, l_ch_chain->callback_notify_arg);
}
/**
* @brief dap_stream_ch_chain_go_idle
* @param a_ch_chain
......@@ -603,6 +615,29 @@ void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain)
pthread_mutex_unlock(&a_ch_chain->mutex);
}
bool s_process_gdb_iter(dap_stream_ch_t *a_ch)
{
dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch);
dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs;
dap_store_obj_pkt_t *l_pkt = (dap_store_obj_pkt_t *)l_ch_chain->db_iter->data;
uint32_t l_pkt_size = sizeof(dap_store_obj_pkt_t) + l_pkt->data_size;
log_it(L_INFO, "Send one global_db record packet len=%d (rest=%d/%d items)", l_pkt_size,
dap_db_log_list_get_count_rest(l_db_list), dap_db_log_list_get_count(l_db_list));
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB,
l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
l_ch_chain->request_cell_id, l_pkt, l_pkt_size);
dap_list_t *l_iter = dap_list_next(l_ch_chain->db_iter);
if (l_iter) {
l_ch_chain->db_iter = l_iter;
} else {
l_ch_chain->stats_request_gdb_processed++;
l_ch_chain->db_iter = dap_list_first(l_ch_chain->db_iter);
dap_list_free_full(l_ch_chain->db_iter, free);
l_ch_chain->db_iter = NULL;
}
return true;
}
bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
{
UNUSED(a_thread);
......@@ -613,79 +648,56 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
// log_it( L_DEBUG,"l_ch_chain %X", l_ch_chain );
bool l_packet_out = false;
switch (l_ch_chain->state) {
case CHAIN_STATE_IDLE: {
dap_stream_ch_chain_go_idle(l_ch_chain);
} break;
case CHAIN_STATE_SYNC_ALL:
case CHAIN_STATE_SYNC_GLOBAL_DB: {
// Get log diff
dap_db_log_list_t *l_db_list = l_ch_chain->request_global_db_trs;
dap_global_db_obj_t *l_obj = dap_db_log_list_get(l_db_list);
bool l_is_stop = true;
while(l_obj) {
size_t l_item_size_out = 0;
uint8_t *l_item = dap_db_log_pack(l_obj, &l_item_size_out);
// Item not found, maybe it has deleted? Then go to the next item
if(!l_item || !l_item_size_out) {
//log_it(L_WARNING, "Log pack returned NULL??? data=0x%x (nothing to send) (rest=%d records)", l_obj, l_items_rest);
l_item_size_out = 0;
// go to next item
l_obj = dap_db_log_list_get(l_db_list);
}
else {
/*size_t l_items_total = dap_db_log_list_get_count(l_db_list);
size_t l_items_rest = dap_db_log_list_get_count_rest(l_db_list);
log_it(L_INFO, "Send one global_db record data=0x%x len=%d (rest=%d/%d items)", l_item, l_item_size_out,
l_items_rest, l_items_total);*/
dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB,
l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
l_ch_chain->request_cell_id, l_item, l_item_size_out);
if (l_ch_chain->db_iter) {
l_packet_out = s_process_gdb_iter(l_ch);
} else {
dap_global_db_obj_t *l_obj;
do { // Get log diff
size_t l_item_size_out = 0;
l_obj = dap_db_log_list_get(l_ch_chain->request_global_db_trs);
l_ch_chain->db_iter = dap_db_log_pack(l_obj, &l_item_size_out);
if (l_ch_chain->db_iter && l_item_size_out) {
break;
}
// Item not found, maybe it has deleted? Then go to the next item
} while (l_obj);
if (l_ch_chain->db_iter) {
l_packet_out = s_process_gdb_iter(l_ch);
} else {
//log_it(L_DEBUG, "l_obj == 0, STOP");
// free log list
dap_db_log_list_delete(l_ch_chain->request_global_db_trs);
l_ch_chain->request_global_db_trs = NULL;
// last message
dap_stream_ch_chain_sync_request_t l_request = {};
dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id);
l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0;
l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64);
l_request.id_end = 0;
log_it( L_DEBUG,"Syncronized database: last id %llu, items syncronyzed %llu ", l_request.id_start,
l_ch_chain->stats_request_gdb_processed );
dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
l_ch_chain->request_cell_id, &l_request, sizeof(l_request));
l_packet_out = true;
l_ch_chain->stats_request_gdb_processed++;
DAP_DELETE(l_item);
// sent the record, another will be sent
l_is_stop = false;
break;
}
}
if(l_is_stop){
//log_it(L_DEBUG, "l_obj == 0, STOP");
// free log list
l_ch_chain->request_global_db_trs = NULL;
dap_db_log_list_delete(l_db_list);
// last message
dap_stream_ch_chain_sync_request_t l_request = {};
dap_chain_net_t *l_net = dap_chain_net_by_id(l_ch_chain->request_net_id);
l_request.node_addr.uint64 = l_net ? dap_db_get_cur_node_addr(l_net->pub.name) : 0;
l_request.id_start = dap_db_log_get_last_id_remote(l_ch_chain->request.node_addr.uint64);
l_request.id_end = 0;
log_it( L_DEBUG,"Syncronized database: last id %llu, items syncronyzed %llu ", l_request.id_start,
l_ch_chain->stats_request_gdb_processed );
dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
l_ch_chain->request_net_id, l_ch_chain->request_chain_id,
l_ch_chain->request_cell_id, &l_request, sizeof(l_request));
l_packet_out = true;
if(l_ch_chain->callback_notify_packet_out)
l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
NULL, 0, l_ch_chain->callback_notify_arg);
if(l_ch_chain->state != CHAIN_STATE_SYNC_ALL)
if(l_ch_chain->callback_notify_packet_out)
l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_GLOBAL_DB,
NULL, 0, l_ch_chain->callback_notify_arg);
dap_stream_ch_chain_go_idle(l_ch_chain);
}
}
} break;
}
if(l_ch_chain->state != CHAIN_STATE_SYNC_ALL)
break;
// Synchronize chains
// Synchronize chains
case CHAIN_STATE_SYNC_CHAINS: {
//log_it(L_DEBUG, "CHAIN_STATE_SYNC_CHAINS");
dap_chain_t * l_chain = dap_chain_find_by_id(l_ch_chain->request_net_id, l_ch_chain->request_chain_id);
......@@ -724,6 +736,7 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
// Then flush it out to the remote
size_t l_atom_size = l_atom_item->atom_size;
log_it(L_INFO, "Send one chain packet len=%d", l_atom_size);
dap_stream_ch_chain_pkt_write_unsafe(l_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_net_id,
l_ch_chain->request_chain_id, l_ch_chain->request_cell_id,
l_atom_item->atom, l_atom_size);
......@@ -760,8 +773,8 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
}
pthread_mutex_unlock(&l_ch_chain->mutex);
}
}
break;
} break;
default: break;
}
if (l_packet_out) {
dap_stream_ch_set_ready_to_write_unsafe(l_ch, true);
......@@ -778,6 +791,10 @@ bool s_out_pkt_callback(dap_proc_thread_t *a_thread, void *a_arg)
void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
{
(void) a_arg;
if (a_ch->stream->esocket->buf_out_size > DAP_EVENTS_SOCKET_BUF / 2) {
return;
}
dap_stream_ch_set_ready_to_write_unsafe(a_ch, false);
dap_stream_ch_chain_t *l_ch_chain = DAP_STREAM_CH_CHAIN(a_ch);
if (l_ch_chain && l_ch_chain->state != CHAIN_STATE_IDLE) {
dap_events_socket_remove_from_worker_unsafe(a_ch->stream->esocket, a_ch->stream_worker->worker);
......
......@@ -49,8 +49,8 @@ dap_stream_ch_chain_state_t dap_stream_ch_chain_pkt_type_to_dap_stream_ch_chain_
* @return
*/
size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id,
dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id,
const void * a_data, size_t a_data_size)
dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id,
const void * a_data, size_t a_data_size)
{
dap_stream_ch_chain_pkt_t * l_chain_pkt;
size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr) + a_data_size;
......@@ -69,8 +69,8 @@ size_t dap_stream_ch_chain_pkt_write_unsafe(dap_stream_ch_t *a_ch, uint8_t a_typ
}
size_t dap_stream_ch_chain_pkt_write_mt(dap_stream_worker_t *a_worker, dap_stream_ch_t *a_ch, uint8_t a_type,dap_chain_net_id_t a_net_id,
dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id,
const void * a_data, size_t a_data_size)
dap_chain_id_t a_chain_id, dap_chain_cell_id_t a_cell_id,
const void * a_data, size_t a_data_size)
{
dap_stream_ch_chain_pkt_t * l_chain_pkt;
size_t l_chain_pkt_size = sizeof (l_chain_pkt->hdr) + a_data_size;
......
......@@ -49,7 +49,8 @@ typedef struct dap_stream_ch_chain {
pthread_mutex_t mutex;
dap_stream_ch_t * ch;
dap_db_log_list_t *request_global_db_trs; // list of transactions
dap_db_log_list_t *request_global_db_trs; // list of global db records
dap_list_t *db_iter;
dap_stream_ch_chain_state_t state;
dap_chain_atom_iter_t * request_atom_iter;
......
......@@ -57,7 +57,7 @@ typedef enum dap_stream_ch_chain_state{
CHAIN_STATE_IDLE=0,
CHAIN_STATE_SYNC_CHAINS,
CHAIN_STATE_SYNC_GLOBAL_DB,
CHAIN_STATE_SYNC_ALL,
CHAIN_STATE_SYNC_ALL
} dap_stream_ch_chain_state_t;
typedef struct dap_stream_ch_chain_sync_request{
......
......@@ -196,53 +196,64 @@ static size_t dap_db_get_size_pdap_store_obj_t(pdap_store_obj_t store_obj)
* @param a_size_out[out] size of output structure
* @return NULL in case of an error
*/
dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp,
dap_list_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj, time_t a_timestamp,
size_t a_store_obj_count)
{
if(!a_store_obj || a_store_obj_count < 1)
if (!a_store_obj || a_store_obj_count < 1)
return NULL;
size_t l_data_size_out = sizeof(uint32_t); // size of output data
// calculate output structure size
for(size_t l_q = 0; l_q < a_store_obj_count; ++l_q)
dap_list_t *l_ret = NULL;
dap_store_obj_pkt_t *l_pkt;
uint32_t l_obj_count = 0, l_data_size_out = 0;
for (size_t l_q = 0; l_q < a_store_obj_count; ++l_q) {
l_data_size_out += dap_db_get_size_pdap_store_obj_t(&a_store_obj[l_q]);
dap_store_obj_pkt_t *l_pkt = DAP_NEW_Z_SIZE(dap_store_obj_pkt_t, sizeof(dap_store_obj_pkt_t) + l_data_size_out);
l_pkt->data_size = l_data_size_out;
l_pkt->timestamp = a_timestamp;
uint64_t l_offset = 0;
uint32_t l_count = (uint32_t) a_store_obj_count;
memcpy(l_pkt->data + l_offset, &l_count, sizeof(uint32_t));
l_offset += sizeof(uint32_t);
for(size_t l_q = 0; l_q < a_store_obj_count; ++l_q) {
dap_store_obj_t obj = a_store_obj[l_q];
//uint16_t section_size = (uint16_t) dap_strlen(obj.section);
uint16_t group_size = (uint16_t) dap_strlen(obj.group);
uint16_t key_size = (uint16_t) dap_strlen(obj.key);
memcpy(l_pkt->data + l_offset, &obj.type, sizeof(int));
l_offset += sizeof(int);
//memcpy(l_pkt->data + l_offset, &section_size, sizeof(uint16_t));
//l_offset += sizeof(uint16_t);
//memcpy(l_pkt->data + l_offset, obj.section, section_size);
//l_offset += section_size;
memcpy(l_pkt->data + l_offset, &group_size, sizeof(uint16_t));
l_offset += sizeof(uint16_t);
memcpy(l_pkt->data + l_offset, obj.group, group_size);
l_offset += group_size;
memcpy(l_pkt->data + l_offset, &obj.id, sizeof(uint64_t));
l_offset += sizeof(uint64_t);
memcpy(l_pkt->data + l_offset, &obj.timestamp, sizeof(time_t));
l_offset += sizeof(time_t);
memcpy(l_pkt->data + l_offset, &key_size, sizeof(uint16_t));
l_offset += sizeof(uint16_t);
memcpy(l_pkt->data + l_offset, obj.key, key_size);
l_offset += key_size;
memcpy(l_pkt->data + l_offset, &obj.value_len, sizeof(size_t));
l_offset += sizeof(size_t);
memcpy(l_pkt->data + l_offset, obj.value, obj.value_len);
l_offset += obj.value_len;
if (l_data_size_out > DAP_CHAIN_PKT_EXPECT_SIZE || (l_q == a_store_obj_count - 1 && l_data_size_out)) {
l_pkt = DAP_NEW_Z_SIZE(dap_store_obj_pkt_t, sizeof(dap_store_obj_pkt_t) + l_data_size_out);
l_pkt->data_size = l_data_size_out;
l_pkt->timestamp = a_timestamp;
l_pkt->obj_count = l_q + 1 - l_obj_count;
l_ret = dap_list_append(l_ret, l_pkt);
l_data_size_out = 0;
l_obj_count = l_q + 1;
}
}
assert(l_data_size_out == l_offset);
return l_pkt;
l_obj_count = 0;
for (dap_list_t *l_iter = l_ret; l_iter; l_iter = dap_list_next(l_iter)) {
l_pkt = (dap_store_obj_pkt_t *)l_iter->data;
uint64_t l_offset = 0;
for(size_t l_q = 0; l_q < l_pkt->obj_count; ++l_q) {
dap_store_obj_t obj = a_store_obj[l_obj_count + l_q];
//uint16_t section_size = (uint16_t) dap_strlen(obj.section);
uint16_t group_size = (uint16_t) dap_strlen(obj.group);
uint16_t key_size = (uint16_t) dap_strlen(obj.key);
memcpy(l_pkt->data + l_offset, &obj.type, sizeof(int));
l_offset += sizeof(int);
//memcpy(l_pkt->data + l_offset, &section_size, sizeof(uint16_t));
//l_offset += sizeof(uint16_t);
//memcpy(l_pkt->data + l_offset, obj.section, section_size);
//l_offset += section_size;
memcpy(l_pkt->data + l_offset, &group_size, sizeof(uint16_t));
l_offset += sizeof(uint16_t);
memcpy(l_pkt->data + l_offset, obj.group, group_size);
l_offset += group_size;
memcpy(l_pkt->data + l_offset, &obj.id, sizeof(uint64_t));
l_offset += sizeof(uint64_t);
memcpy(l_pkt->data + l_offset, &obj.timestamp, sizeof(time_t));
l_offset += sizeof(time_t);
memcpy(l_pkt->data + l_offset, &key_size, sizeof(uint16_t));
l_offset += sizeof(uint16_t);
memcpy(l_pkt->data + l_offset, obj.key, key_size);
l_offset += key_size;
memcpy(l_pkt->data + l_offset, &obj.value_len, sizeof(size_t));
l_offset += sizeof(size_t);
memcpy(l_pkt->data + l_offset, obj.value, obj.value_len);
l_offset += obj.value_len;
}
l_obj_count += l_pkt->obj_count;
assert(l_pkt->data_size == l_offset);
}
return l_ret;
}
/**
* deserialization
......@@ -255,9 +266,7 @@ dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *pkt, siz
if(!pkt || pkt->data_size < 1)
return NULL;
uint64_t offset = 0;
uint32_t count;
memcpy(&count, pkt->data, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t count = pkt->obj_count;
dap_store_obj_t *store_obj = DAP_NEW_Z_SIZE(dap_store_obj_t, count * sizeof(struct dap_store_obj));
for(size_t q = 0; q < count; ++q) {
dap_store_obj_t *obj = store_obj + q;
......
......@@ -77,7 +77,7 @@ static char* dap_db_new_history_timestamp()
*
* return dap_store_obj_pkt_t*
*/
uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out)
dap_list_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out)
{
if(!a_obj)
return NULL;
......@@ -122,15 +122,18 @@ uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out)
i++;
}
// serialize data
dap_store_obj_pkt_t *l_data_out = dap_store_packet_multiple(l_store_obj, l_timestamp, l_count);
dap_list_t *l_data_out = dap_store_packet_multiple(l_store_obj, l_timestamp, l_count);
dap_store_obj_free(l_store_obj, l_count);
dap_strfreev(l_keys);
if(l_data_out && a_data_size_out) {
*a_data_size_out = sizeof(dap_store_obj_pkt_t) + l_data_out->data_size;
*a_data_size_out = 0;
for (dap_list_t *l_iter = l_data_out; l_iter; l_iter = dap_list_next(l_iter)) {
*a_data_size_out += sizeof(dap_store_obj_pkt_t) + ((dap_store_obj_pkt_t *)l_data_out)->data_size;
}
}
return (uint8_t*) l_data_out;
return l_data_out;
}
......@@ -1326,7 +1329,6 @@ dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id, dap_list_t *a_add_gr
dap_db_log_list_t *l_dap_db_log_list = DAP_NEW_Z(dap_db_log_list_t);
size_t l_add_groups_num = 0;// number of group
//size_t l_add_groups_items_num = 0;// number items in all groups
dap_list_t *l_add_groups_mask = a_add_groups_mask;
// calc l_add_groups_num
while(l_add_groups_mask) {
......@@ -1337,7 +1339,6 @@ dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id, dap_list_t *a_add_gr
l_add_groups_mask = dap_list_next(l_add_groups_mask);
}
//size_t l_add_groups_num = dap_list_length(a_add_groups_mask);
size_t l_data_size_out_main = dap_chain_global_db_driver_count(GROUP_LOCAL_HISTORY, first_id);
size_t *l_data_size_out_add_items = DAP_NEW_Z_SIZE(size_t, sizeof(size_t) * l_add_groups_num);
uint64_t *l_group_last_id = DAP_NEW_Z_SIZE(uint64_t, sizeof(uint64_t) * l_add_groups_num);
......@@ -1366,9 +1367,6 @@ dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id, dap_list_t *a_add_gr
}
if(!(l_data_size_out_main + l_data_size_out_add_items_count))
return NULL;
// debug
// if(l_data_size_out>11)
// l_data_size_out = 11;
l_dap_db_log_list->item_start = first_id;
l_dap_db_log_list->item_last = first_id + l_data_size_out_main;
l_dap_db_log_list->items_number_main = l_data_size_out_main;
......@@ -1381,31 +1379,9 @@ dap_db_log_list_t* dap_db_log_list_start(uint64_t first_id, dap_list_t *a_add_gr
l_dap_db_log_list->group_names = l_group_names;
l_dap_db_log_list->group_cur = -1;
l_dap_db_log_list->add_groups = a_add_groups_mask;
// there are too few items, read items right now
if(0) {//l_data_size_out <= 10) {
dap_list_t *l_list = NULL;
// read first items
size_t l_objs_count = 0;
dap_store_obj_t *l_objs = dap_chain_global_db_cond_load(GROUP_LOCAL_HISTORY, first_id, &l_objs_count);
for(size_t i = 0; i < l_objs_count; i++) {
dap_store_obj_t *l_obj_cur = l_objs + i;
dap_global_db_obj_t *l_item = DAP_NEW(dap_global_db_obj_t);
l_item->id = l_obj_cur->id;
l_item->key = dap_strdup(l_obj_cur->key);
l_item->value = (uint8_t*) dap_strdup((char*) l_obj_cur->value);
l_list = dap_list_append(l_list, l_item);
}
l_dap_db_log_list->list_write = l_list;
l_dap_db_log_list->list_read = l_list;
//log_it(L_DEBUG, "loaded items n=%d", l_data_size_out);
dap_store_obj_free(l_objs, l_objs_count);
}
// start thread for items loading
else {
l_dap_db_log_list->is_process = true;
pthread_mutex_init(&l_dap_db_log_list->list_mutex, NULL);
pthread_create(&l_dap_db_log_list->thread, NULL, s_list_thread_proc, l_dap_db_log_list);
}
l_dap_db_log_list->is_process = true;
pthread_mutex_init(&l_dap_db_log_list->list_mutex, NULL);
pthread_create(&l_dap_db_log_list->thread, NULL, s_list_thread_proc, l_dap_db_log_list);
return l_dap_db_log_list;
}
......
......@@ -119,7 +119,7 @@ char* dap_chain_global_db_hash(const uint8_t *data, size_t data_size);
char* dap_chain_global_db_hash_fast(const uint8_t *data, size_t data_size);
// Get data according the history log
uint8_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out);
dap_list_t* dap_db_log_pack(dap_global_db_obj_t *a_obj, size_t *a_data_size_out);
// Get data according the history log
//char* dap_db_history_tx(dap_chain_hash_fast_t * a_tx_hash, const char *a_group_mempool);
......
......@@ -29,6 +29,8 @@
#include "dap_common.h"
#include "dap_list.h"
#define DAP_CHAIN_PKT_EXPECT_SIZE 7168
typedef struct dap_store_obj {
uint64_t id;
time_t timestamp;
......@@ -42,8 +44,9 @@ typedef struct dap_store_obj {
}DAP_ALIGN_PACKED dap_store_obj_t, *pdap_store_obj_t;
typedef struct dap_store_obj_pkt {
time_t timestamp;
size_t data_size;
uint64_t timestamp;
uint64_t data_size;
uint32_t obj_count;
uint8_t data[];
}__attribute__((packed)) dap_store_obj_pkt_t;
......@@ -90,7 +93,7 @@ bool dap_chain_global_db_driver_is(const char *a_group, const char *a_key);
size_t dap_chain_global_db_driver_count(const char *a_group, uint64_t id);
dap_list_t* dap_chain_global_db_driver_get_groups_by_mask(const char *a_group_mask);
dap_store_obj_pkt_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj,
dap_list_t *dap_store_packet_multiple(pdap_store_obj_t a_store_obj,
time_t a_timestamp, size_t a_store_obj_count);
dap_store_obj_t *dap_store_unpacket_multiple(const dap_store_obj_pkt_t *a_pkt,
size_t *a_store_obj_count);
......@@ -268,7 +268,9 @@ void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const c
l_obj->type = (uint8_t)a_op_code;
DAP_DELETE(l_obj->group);
l_obj->group = dap_strdup(a_group);
dap_store_obj_pkt_t *l_data_out = dap_store_packet_multiple(l_obj, l_obj->timestamp, 1);
dap_list_t *l_list_out = dap_store_packet_multiple(l_obj, l_obj->timestamp, 1);
// Expect only one element in list
dap_store_obj_pkt_t *l_data_out = (dap_store_obj_pkt_t *)l_list_out->data;
dap_store_obj_free(l_obj, 1);
dap_chain_t *l_chain = dap_chain_net_get_chain_by_name(l_net, "gdb");
dap_chain_id_t l_chain_id = l_chain ? l_chain->id : (dap_chain_id_t) {};
......@@ -278,7 +280,7 @@ void dap_chain_net_sync_gdb_broadcast(void *a_arg, const char a_op_code, const c
dap_stream_ch_chain_pkt_write_unsafe(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_GLOBAL_DB, l_net->pub.id,
l_chain_id, l_net->pub.cell_id, l_data_out, sizeof(dap_store_obj_pkt_t) + l_data_out->data_size);
}
DAP_DELETE(l_data_out);
dap_list_free_full(l_list_out, free);
}
}
......@@ -389,10 +391,10 @@ static int s_net_states_proc(dap_chain_net_t * l_net)
case NODE_ROLE_ARCHIVE:
case NODE_ROLE_CELL_MASTER: {
// Add other root nodes as synchronization links
while (dap_list_length(l_pvt_net->links_info) < s_max_links_count) {
int i = rand() % l_pvt_net->seed_aliases_count;
for (int i = 0; i < MIN(s_max_links_count, l_pvt_net->seed_aliases_count); i++) {
dap_chain_node_addr_t *l_link_addr = dap_chain_node_alias_find(l_net, l_pvt_net->seed_aliases[i]);
dap_chain_node_info_read(l_net, l_link_addr);
dap_chain_node_info_t *l_link_node_info = dap_chain_node_info_read(l_net, l_link_addr);
l_pvt_net->links_info = dap_list_append(l_pvt_net->links_info, l_link_node_info);
}
} break;
case NODE_ROLE_FULL:
......@@ -494,7 +496,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net)
}
// wait for finishing of request
int timeout_ms = 300000; // 5 min = 300 sec = 300 000 ms
int timeout_ms = 30000; // 5 min = 300 sec = 300 000 ms
// TODO add progress info to console
l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
switch (l_res) {
......@@ -554,7 +556,7 @@ static int s_net_states_proc(dap_chain_net_t * l_net)
dap_stream_ch_chain_pkt_write_mt(l_worker, l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNC_CHAINS, l_net->pub.id,
l_chain->id, l_net->pub.cell_id, &l_request, sizeof(l_request));
// wait for finishing of request
int timeout_ms = 120000; // 2 min = 120 sec = 120 000 ms
int timeout_ms = 20000; // 2 min = 120 sec = 120 000 ms
// TODO add progress info to console
if (dap_client_get_stream_ch(l_node_client->client, dap_stream_ch_chain_get_id())) {
l_res = dap_chain_node_client_wait(l_node_client, NODE_CLIENT_STATE_SYNCED, timeout_ms);
......
......@@ -1223,6 +1223,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
assert(l_tun);
// Unsafely send it
dap_events_socket_write_unsafe( l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size);
dap_events_socket_set_writable_unsafe(l_tun->es, true);
} break;
// for servier only
......@@ -1231,6 +1232,7 @@ void s_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
assert(l_tun);
// Unsafely send it
size_t l_ret = dap_events_socket_write_unsafe( l_tun->es, l_vpn_pkt->data, l_vpn_pkt->header.op_data.data_size);
dap_events_socket_set_writable_unsafe(l_tun->es, true);
if( l_ret)
s_update_limits (a_ch, l_srv_session, l_usage,l_ret );
} break;
......