Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (4)
Showing
with 294 additions and 75 deletions
......@@ -2,7 +2,7 @@ project(cellframe-sdk C)
cmake_minimum_required(VERSION 2.8)
set(CMAKE_C_STANDARD 11)
set(CELLFRAME_SDK_NATIVE_VERSION "2.8-10")
set(CELLFRAME_SDK_NATIVE_VERSION "2.8-11")
add_definitions ("-DCELLFRAME_SDK_VERSION=\"${CELLFRAME_SDK_NATIVE_VERSION}\"")
set(DAPSDK_MODULES "")
......
......@@ -398,6 +398,23 @@ DAP_STATIC_INLINE void DAP_AtomicUnlock( dap_spinlock_t *lock )
__sync_lock_release( lock );
}
DAP_INLINE void dap_uint_to_hex(char *arr, uint64_t val, short size) {
short i = 0;
for (i = 0; i < size; ++i) {
arr[i] = (char)(((uint64_t) val >> (8 * (size - 1 - i))) & 0xFFu);
}
}
DAP_INLINE uint64_t dap_hex_to_uint(const char *arr, short size) {
uint64_t val = 0;
short i = 0;
for (i = 0; i < size; ++i){
uint8_t byte = (uint8_t) *arr++;
val = (val << 8) | (byte & 0xFFu);
}
return val;
}
extern char *g_sys_dir_path;
//int dap_common_init( const char * a_log_file );
......
......@@ -46,7 +46,7 @@ typedef void * dap_chain_atom_ptr_t;
typedef struct dap_chain_atom_iter{
dap_chain_t * chain;
dap_chain_atom_ptr_t cur;
dap_chain_hash_fast_t cur_hash;
dap_chain_hash_fast_t *cur_hash;
size_t cur_size;
void * cur_item;
void * _inheritor;
......
......@@ -326,5 +326,4 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
*/
void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
{
dap_stream_ch_set_ready_to_write_unsafe(a_ch, false);
}
......@@ -115,6 +115,7 @@ static void s_gdb_in_pkt_error_worker_callback(dap_worker_t *a_thread, void *a_a
static bool s_debug_more=false;
static uint_fast16_t s_update_pack_size=100; // Number of hashes packed into the one packet
static uint_fast16_t s_skip_in_reactor_count=50; // Number of hashes packed to skip in one reactor loop callback out packet
/**
* @brief dap_stream_ch_chain_init
* @return
......@@ -854,11 +855,29 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS :{
uint l_count_added=0;
uint l_count_total=0;
dap_chain_t * l_chain = dap_chain_find_by_id(l_chain_pkt->hdr.net_id, l_chain_pkt->hdr.chain_id);
if (! l_chain){
log_it(L_ERROR, "Invalid UPDATE_CHAINS packet from %s with ext_id %016"DAP_UINT64_FORMAT_x" net id 0x%016"DAP_UINT64_FORMAT_x" chain id 0x%016"DAP_UINT64_FORMAT_x" cell_id 0x%016"DAP_UINT64_FORMAT_x" in packet", a_ch->stream->esocket->remote_addr_str?
a_ch->stream->esocket->remote_addr_str: "<unknown>", l_chain_pkt->hdr.ext_id,
l_chain_pkt->hdr.net_id.uint64, l_chain_pkt->hdr.chain_id.uint64,
l_chain_pkt->hdr.cell_id.uint64);
dap_stream_ch_chain_pkt_write_error_unsafe(a_ch, l_chain_pkt->hdr.net_id.uint64,
l_chain_pkt->hdr.chain_id.uint64, l_chain_pkt->hdr.cell_id.uint64,
"ERROR_NET_INVALID_ID");
// Who are you? I don't know you! go away!
a_ch->stream->esocket->flags |= DAP_SOCK_SIGNAL_CLOSE;
break;
}
dap_chain_atom_iter_t * l_iter = l_chain->callback_atom_iter_create(l_chain);
for ( dap_stream_ch_chain_update_element_t * l_element =(dap_stream_ch_chain_update_element_t *) l_chain_pkt->data;
(size_t) (((byte_t*)l_element) - l_chain_pkt->data ) < l_chain_pkt_data_size;
l_element++){
dap_stream_ch_chain_hash_item_t * l_hash_item = NULL;
HASH_FIND(hh,l_ch_chain->remote_atoms, &l_element->hash, sizeof (l_element->hash), l_hash_item );
HASH_FIND(hh,l_ch_chain->remote_atoms , &l_element->hash, sizeof (l_element->hash), l_hash_item );
if( ! l_hash_item ){
l_hash_item = DAP_NEW(dap_stream_ch_chain_hash_item_t);
memcpy(&l_hash_item->hash, &l_element->hash, sizeof (l_element->hash));
......@@ -876,7 +895,7 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
}
if (s_debug_more)
log_it(L_INFO,"In: Added %u from %u remote atom hash in list",l_count_added,l_count_total);
l_chain->callback_atom_iter_delete(l_iter);
}break;
// End of response
//case DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_END:{
......@@ -1173,12 +1192,20 @@ void s_stream_ch_packet_in(dap_stream_ch_t* a_ch, void* a_arg)
*/
void dap_stream_ch_chain_go_idle ( dap_stream_ch_chain_t * a_ch_chain)
{
a_ch_chain->is_on_request = false;
a_ch_chain->state = CHAIN_STATE_IDLE;
//log_it(L_DEBUG, "CHAIN_STATE_IDLE");
if(s_debug_more)
log_it(L_INFO, "Go in CHAIN_STATE_IDLE");
// Cleanup after request
memset(&a_ch_chain->request, 0, sizeof(a_ch_chain->request));
memset(&a_ch_chain->request_hdr, 0, sizeof(a_ch_chain->request_hdr));
if(a_ch_chain->request_atom_iter)
if(a_ch_chain->request_atom_iter->chain)
if(a_ch_chain->request_atom_iter->chain->callback_atom_iter_delete){
a_ch_chain->request_atom_iter->chain->callback_atom_iter_delete(a_ch_chain->request_atom_iter);
return;
}
DAP_DEL_Z(a_ch_chain->request_atom_iter);
}
......@@ -1259,9 +1286,7 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
dap_stream_ch_chain_update_element_t * l_data= DAP_NEW_Z_SIZE(dap_stream_ch_chain_update_element_t,sizeof (dap_stream_ch_chain_update_element_t)*s_update_pack_size);
size_t l_data_size=0;
for(uint_fast16_t n=0; n<s_update_pack_size && (l_ch_chain->request_atom_iter && l_ch_chain->request_atom_iter->cur);n++){
// If present smth - hash it
dap_hash_fast(l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size,&l_data[n].hash);
l_data[n].size=l_ch_chain->request_atom_iter->cur_size;
memcpy(&l_data[n].hash, l_ch_chain->request_atom_iter->cur_hash, sizeof (l_data[n].hash));
// Shift offset counter
l_data_size += sizeof (dap_stream_ch_chain_update_element_t);
// Then get next atom
......@@ -1299,19 +1324,17 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
// Synchronize chains
case CHAIN_STATE_SYNC_CHAINS: {
if (l_ch_chain->request_atom_iter && l_ch_chain->request_atom_iter->cur) { // Process one chain from l_ch_chain->request_atom_iter
bool l_was_sent_smth=false;
// Process one chain from l_ch_chain->request_atom_iter
// Pack loop to skip quicker
for(uint_fast16_t k=0; k<s_skip_in_reactor_count && l_ch_chain->request_atom_iter && l_ch_chain->request_atom_iter->cur; k++){
// Check if present and skip if present
dap_stream_ch_chain_hash_item_t * l_hash_item = NULL;
dap_chain_hash_fast_t l_request_atom_hash;
dap_hash_fast(l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size,&l_request_atom_hash);
HASH_FIND(hh,l_ch_chain->remote_atoms, &l_request_atom_hash , sizeof (l_hash_item->hash), l_hash_item );
HASH_FIND(hh,l_ch_chain->remote_atoms, l_ch_chain->request_atom_iter->cur_hash , sizeof (l_hash_item->hash), l_hash_item );
if( l_hash_item ){ // If found - skip it
if(s_debug_more){
char l_request_atom_hash_str[81]={[0]='\0'};
dap_chain_hash_fast_to_str(&l_request_atom_hash,l_request_atom_hash_str,sizeof (l_request_atom_hash_str));
dap_chain_hash_fast_to_str(l_ch_chain->request_atom_iter->cur_hash,l_request_atom_hash_str,sizeof (l_request_atom_hash_str));
log_it(L_DEBUG, "Out CHAIN: skip atom hash %s because its already present in remote atom hash table",
l_request_atom_hash_str);
}
......@@ -1327,6 +1350,8 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_CHAIN, l_ch_chain->request_hdr.net_id.uint64,
l_ch_chain->request_hdr.chain_id.uint64, l_ch_chain->request_hdr.cell_id.uint64,
l_ch_chain->request_atom_iter->cur, l_ch_chain->request_atom_iter->cur_size);
l_was_sent_smth = true;
break; // If sent smth - break out from pack loop
l_ch_chain->stats_request_atoms_processed++;
l_hash_item->size = l_ch_chain->request_atom_iter->cur_size;
......@@ -1335,19 +1360,27 @@ void s_stream_ch_packet_out(dap_stream_ch_t* a_ch, void* a_arg)
}
// Then get next atom and populate new last
l_ch_chain->request_atom_iter->chain->callback_atom_iter_get_next(l_ch_chain->request_atom_iter, NULL);
} else { // All chains synced
}
if(!l_ch_chain->request_atom_iter ||
( l_ch_chain->request_atom_iter &&(! l_ch_chain->request_atom_iter->cur) ) ) { // All chains synced
dap_stream_ch_chain_sync_request_t l_request = {0};
// last message
l_was_sent_smth = true;
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS,
l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64, &l_request, sizeof(l_request));
log_it( L_INFO,"Synced: %llu atoms processed", l_ch_chain->stats_request_atoms_processed);
l_ch_chain->is_on_request = false;
dap_stream_ch_chain_go_idle(l_ch_chain);
if (l_ch_chain->callback_notify_packet_out)
l_ch_chain->callback_notify_packet_out(l_ch_chain, DAP_STREAM_CH_CHAIN_PKT_TYPE_SYNCED_CHAINS, NULL,
0, l_ch_chain->callback_notify_arg);
}
if (! l_was_sent_smth ){
// Sending dumb packet with nothing to inform remote thats we're just skiping atoms, nothing freezed
dap_stream_ch_chain_pkt_write_unsafe(a_ch, DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_TSD,
l_ch_chain->request_hdr.net_id.uint64, l_ch_chain->request_hdr.chain_id.uint64,
l_ch_chain->request_hdr.cell_id.uint64, NULL, 0);
}
} break;
default: break;
}
......
......@@ -25,15 +25,14 @@
#include <stdio.h>
#include <stdint.h>
#include <pthread.h>
#include <errno.h>
#include <time.h>
#include <assert.h>
//#include <string.h>
#include "uthash.h"
#include "dap_chain_common.h"
#include "dap_strfuncs.h"
//#include "dap_chain_global_db_pvt.h"
#include "dap_chain_common.h"
#include "dap_chain_global_db_hist.h"
#include "dap_chain_global_db.h"
......@@ -88,7 +87,10 @@ typedef struct history_extra_group_item
static history_group_item_t * s_history_group_items = NULL;
static char *s_storage_path = NULL;
static history_extra_group_item_t * s_history_extra_group_items = NULL;
#ifdef DAP_OS_UNIX
static int cmd_gdb_import(int argc, char ** argv, void *arg_func, char ** a_str_reply);
static int s_command_gdb_export(int argc, char ** argv, void *arg_func, char ** a_str_reply);
#endif
char * extract_group_prefix(const char * a_group);
/**
......@@ -115,6 +117,7 @@ char * extract_group_prefix(const char* a_group)
return l_group_prefix;
}
/*
* Get history group by group name
*/
......@@ -249,6 +252,8 @@ int dap_chain_global_db_init(dap_config_t * g_config)
unlock();
if( res != 0 )
log_it(L_CRITICAL, "Hadn't initialized db driver \"%s\" on path \"%s\"", l_driver_name, s_storage_path );
return res;
}
......@@ -492,7 +497,7 @@ bool dap_chain_global_db_gr_set(char *a_key, void *a_value, size_t a_value_len,
//memcpy(store_data.value, a_value, a_value_len);
store_data.value_len = (a_value_len == (size_t) -1) ? dap_strlen((const char*) a_value) : a_value_len;
store_data.group = dap_strdup(a_group);
store_data.group = (char*)a_group;//dap_strdup(a_group);
store_data.timestamp = time(NULL);
lock();
int l_res = dap_chain_global_db_driver_add(&store_data, 1);
......@@ -556,13 +561,12 @@ bool dap_chain_global_db_gr_del(char *a_key,const char *a_group)
{
if(!a_key)
return NULL;
pdap_store_obj_t store_data = DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(struct dap_store_obj));
store_data->key = a_key;
// store_data->c_key = a_key;
store_data->group = dap_strdup(a_group);
//store_data->c_group = a_group;
dap_store_obj_t store_data;// = DAP_NEW_Z_SIZE(dap_store_obj_t, sizeof(struct dap_store_obj));
memset(&store_data, 0, sizeof(dap_store_obj_t));
store_data.key = a_key;
store_data.group = (char*)a_group;
lock();
int l_res = dap_chain_global_db_driver_delete(store_data, 1);
int l_res = dap_chain_global_db_driver_delete(&store_data, 1);
unlock();
// do not add to history if l_res=1 (already deleted)
if(!l_res) {
......@@ -576,7 +580,7 @@ bool dap_chain_global_db_gr_del(char *a_key,const char *a_group)
if(l_history_group_item) {
if(l_history_group_item->auto_track) {
lock();
dap_db_history_add('d', store_data, 1, l_history_group_item->group_name_for_history);
dap_db_history_add('d', &store_data, 1, l_history_group_item->group_name_for_history);
unlock();
}
if(l_history_group_item->callback_notify)
......@@ -590,7 +594,7 @@ bool dap_chain_global_db_gr_del(char *a_key,const char *a_group)
if(l_history_extra_group_item) {
lock();
dap_db_history_add('d', store_data, 1, l_history_extra_group_item->group_name_for_history);
dap_db_history_add('d', &store_data, 1, l_history_extra_group_item->group_name_for_history);
unlock();
if(l_history_extra_group_item->callback_notify)
l_history_extra_group_item->callback_notify(l_history_extra_group_item->callback_arg, 'd',
......@@ -770,12 +774,11 @@ bool dap_chain_global_db_gr_save(dap_global_db_obj_t* a_objs, size_t a_objs_coun
{
dap_store_obj_t *l_store_data = DAP_NEW_Z_SIZE(dap_store_obj_t, a_objs_count * sizeof(struct dap_store_obj));
time_t l_timestamp = time(NULL);
char *l_group = dap_strdup(a_group);
for(size_t q = 0; q < a_objs_count; ++q) {
dap_store_obj_t *store_data_cur = l_store_data + q;
dap_global_db_obj_t *a_obj_cur = a_objs + q;
store_data_cur->key = a_obj_cur->key;
store_data_cur->group = l_group;
store_data_cur->group = (char*)a_group;
store_data_cur->value = a_obj_cur->value;
store_data_cur->value_len = a_obj_cur->value_len;
store_data_cur->timestamp = l_timestamp;
......@@ -816,11 +819,9 @@ bool dap_chain_global_db_gr_save(dap_global_db_obj_t* a_objs, size_t a_objs_coun
}
DAP_DELETE(l_store_data); //dap_store_obj_free(store_data, a_objs_count);
if(!l_res) {
DAP_DELETE(l_group);
return true;
}
}
DAP_DELETE(l_group);
return false;
}
......
......@@ -57,28 +57,6 @@ static pcdb_instance s_cdb = NULL;
static pthread_mutex_t cdb_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_rwlock_t cdb_rwlock = PTHREAD_RWLOCK_INITIALIZER;
static inline void dap_cdb_uint_to_hex(char *arr, uint64_t val, short size) {
short i = 0;
for (i = 0; i < size; ++i) {
arr[i] = (char)(((uint64_t) val >> (8 * (size - 1 - i))) & 0xFFu);
}
}
static inline uint64_t dap_cdb_hex_to_uint(const char *arr, short size) {
uint64_t val = 0;
short i = 0;
for (i = 0; i < size; ++i){
uint8_t byte = (uint8_t) *arr++;
/*if (byte >= 'a' && byte <='f'){
byte = byte - 'a' + 10;
} else if (byte >= 'A' && byte <='F') {
byte = byte - 'A' + 10;
}*/
val = (val << 8) | (byte & 0xFFu);
}
return val;
}
static void cdb_serialize_val_to_dap_store_obj(pdap_store_obj_t a_obj, const char *key, const char *val) {
if (!key || !val) {
a_obj = NULL;
......@@ -86,14 +64,14 @@ static void cdb_serialize_val_to_dap_store_obj(pdap_store_obj_t a_obj, const cha
}
int offset = 0;
a_obj->key = dap_strdup(key);
a_obj->id = dap_cdb_hex_to_uint(val, sizeof(uint64_t));
a_obj->id = dap_hex_to_uint(val, sizeof(uint64_t));
offset += sizeof(uint64_t);
a_obj->value_len = dap_cdb_hex_to_uint(val + offset, sizeof(unsigned long));
a_obj->value_len = dap_hex_to_uint(val + offset, sizeof(unsigned long));
offset += sizeof(unsigned long);
a_obj->value = DAP_NEW_SIZE(uint8_t, a_obj->value_len);
memcpy(a_obj->value, val + offset, a_obj->value_len);
offset += a_obj->value_len;
a_obj->timestamp = (time_t)dap_cdb_hex_to_uint(val + offset, sizeof(time_t));
a_obj->timestamp = (time_t)dap_hex_to_uint(val + offset, sizeof(time_t));
}
bool dap_cdb_get_last_obj_iter_callback(void *arg, const char *key, int ksize, const char *val, int vsize, uint32_t expire, uint64_t oid) {
......@@ -132,7 +110,7 @@ bool dap_cdb_get_cond_obj_iter_callback(void *arg, const char *key, int ksize, c
UNUSED(expire);
UNUSED(oid);
if (dap_cdb_hex_to_uint(val, sizeof(uint64_t)) < ((pobj_arg)arg)->id) {
if (dap_hex_to_uint(val, sizeof(uint64_t)) < ((pobj_arg)arg)->id) {
return true;
}
pdap_store_obj_t l_obj = (pdap_store_obj_t)((pobj_arg)arg)->o;
......@@ -216,6 +194,16 @@ int dap_db_driver_cdb_init(const char *a_cdb_path, dap_db_driver_callbacks_t *a_
return -1;
}
for (d = readdir(dir); d; d = readdir(dir)) {
#ifdef _DIRENT_HAVE_D_TYPE
if (d->d_type != DT_DIR)
continue;
#else
struct _stat buf;
int res = _stat(d->d_name, &buf);
if (!S_ISDIR(buf.st_mode) || !res) {
continue;
}
#endif
if (!dap_strcmp(d->d_name, ".") || !dap_strcmp(d->d_name, "..")) {
continue;
}
......@@ -499,16 +487,16 @@ int dap_db_driver_cdb_apply_store_obj(pdap_store_obj_t a_store_obj) {
l_rec.key = dap_strdup(a_store_obj->key);
int offset = 0;
char *l_val = DAP_NEW_Z_SIZE(char, sizeof(uint64_t) + sizeof(unsigned long) + a_store_obj->value_len + sizeof(time_t));
dap_cdb_uint_to_hex(l_val, ++l_cdb_i->id, sizeof(uint64_t));
dap_uint_to_hex(l_val, ++l_cdb_i->id, sizeof(uint64_t));
offset += sizeof(uint64_t);
dap_cdb_uint_to_hex(l_val + offset, a_store_obj->value_len, sizeof(unsigned long));
dap_uint_to_hex(l_val + offset, a_store_obj->value_len, sizeof(unsigned long));
offset += sizeof(unsigned long);
if(a_store_obj->value && a_store_obj->value_len){
memcpy(l_val + offset, a_store_obj->value, a_store_obj->value_len);
}
offset += a_store_obj->value_len;
unsigned long l_time = (unsigned long)a_store_obj->timestamp;
dap_cdb_uint_to_hex(l_val + offset, l_time, sizeof(time_t));
dap_uint_to_hex(l_val + offset, l_time, sizeof(time_t));
offset += sizeof(time_t);
l_rec.val = l_val;
if (cdb_set2(l_cdb_i->cdb, l_rec.key, (int)strlen(l_rec.key), l_rec.val, offset, CDB_INSERTCACHE | CDB_OVERWRITE, 0) != CDB_SUCCESS) {
......
......@@ -1105,14 +1105,13 @@ bool dap_db_history_add(char a_type, pdap_store_obj_t a_store_obj, size_t a_dap_
// key - timestamp
// value - keys of added/deleted data
l_store_data.key = dap_db_new_history_timestamp();
l_store_data.value = (uint8_t*) strdup(l_str);
l_store_data.value = (uint8_t*)l_str;
l_store_data.value_len = l_str_len + 1;
l_store_data.group = dap_strdup(a_group);//GROUP_LOCAL_HISTORY;
l_store_data.group = (char*)a_group;//GROUP_LOCAL_HISTORY;
l_store_data.timestamp = time(NULL);
int l_res = dap_chain_global_db_driver_add(&l_store_data, 1);
if(l_rec.keys_count > 1)
DAP_DELETE(l_rec.keys);
DAP_DELETE(l_store_data.value);
DAP_DELETE(l_str);
if(!l_res)
return true;
......
......@@ -109,7 +109,6 @@ dap_global_db_obj_t* dap_chain_global_db_load(size_t *a_data_size_out);
bool dap_chain_global_db_obj_save(void* a_store_data, size_t a_objs_count);
bool dap_chain_global_db_gr_save(dap_global_db_obj_t* a_objs, size_t a_objs_count, const char *a_group);
bool dap_chain_global_db_save(dap_global_db_obj_t* a_objs, size_t a_objs_count);
/**
* Calc hash for data
*
......
......@@ -44,8 +44,7 @@ endif()
if(UNIX)
target_link_libraries(${PROJECT_NAME} dap_core dap_crypto dap_client dap_stream_ch_chain dap_stream_ch_chain_net dap_stream_ch_chain_net_srv dap_chain
dap_chain_wallet dap_chain_net_srv dap_chain_mempool dap_chain_global_db dap_chain_net_srv_stake dap_chain_cs_none
resolv
)
resolv )
endif()
target_include_directories(${PROJECT_NAME} INTERFACE . )
......
......@@ -1026,10 +1026,19 @@ int dap_chain_node_cli_init(dap_config_t * g_config)
dap_chain_node_cli_cmd_item_create("stats", com_stats, NULL, "Print statistics",
"stats cpu");
// Exit
dap_chain_node_cli_cmd_item_create ("exit", com_exit, NULL, "Stop application and exit",
"exit\n" );
#ifdef DAP_OS_UNIX
// Export GDB to JSON
dap_chain_node_cli_cmd_item_create("gdb_export", cmd_gdb_export, NULL, "GDB export to file", "GDB export to file");
//Import GDB from JSON
dap_chain_node_cli_cmd_item_create("gdb_import", cmd_gdb_import, NULL, "GDB import from file", "GDB import from file");
#endif
// create thread for waiting of clients
pthread_t l_thread_id;
......
......@@ -81,6 +81,13 @@
#endif
#include "dap_chain_cell.h"
#include "dap_enc_base64.h"
#include <json-c/json.h>
#ifdef DAP_OS_UNIX
#include <dirent.h>
#endif
#include "dap_chain_common.h"
#include "dap_chain_datum.h"
#include "dap_chain_datum_token.h"
......@@ -94,9 +101,11 @@
#include "dap_stream_ch_chain.h"
#include "dap_stream_ch_chain_pkt.h"
#include "dap_stream_ch_chain_net_pkt.h"
#include "dap_enc_base64.h"
#define LOG_TAG "chain_node_cli_cmd"
/**
* Find in base addr by alias
*
......@@ -4021,3 +4030,163 @@ int com_print_log(int argc, char ** argv, void *arg_func, char **str_reply)
return 0;
}
#ifdef DAP_OS_UNIX
/**
* @brief cmd_gdb_export
* @param argc
* @param argv
* @param arg_func
* @param a_str_reply
* @return
*/
int cmd_gdb_export(int argc, char ** argv, void *arg_func, char ** a_str_reply)
{
int arg_index = 1;
const char *l_filename = NULL;
dap_chain_node_cli_find_option_val(argv, arg_index, argc, "filename", &l_filename);
if (!l_filename) {
dap_chain_node_cli_set_reply_text(a_str_reply, "gdb_export requires parameter 'filename'");
return -1;
}
const char *l_db_path = dap_config_get_item_str(g_config, "resources", "dap_global_db_path");
struct dirent *d;
DIR *dir = opendir(l_db_path);
if (!dir) {
log_it(L_ERROR, "Couldn't open db directory");
dap_chain_node_cli_set_reply_text(a_str_reply, "Couldn't open db directory");
return -1;
}
char l_path[strlen(l_db_path) + strlen(l_filename) + 12];
memset(l_path, '\0', sizeof(l_path));
dap_snprintf(l_path, sizeof(l_path), "%s/../%s.json", l_db_path, l_filename);
/*FILE *l_json_file = fopen(l_path, "a");
if (!l_json_file) {
log_it(L_ERROR, "Can't open file %s", l_path);
dap_chain_node_cli_set_reply_text(a_str_reply, "Can't open specified file");
return -1;
}*/
struct json_object *l_json = json_object_new_array();
for (d = readdir(dir); d; d = readdir(dir)) {
if (!dap_strcmp(d->d_name, ".") || !dap_strcmp(d->d_name, "..")) {
continue;
}
size_t l_data_size = 0;
pdap_store_obj_t l_data = dap_chain_global_db_obj_gr_get(NULL, &l_data_size, d->d_name);
log_it(L_INFO, "Exporting group %s, number of records: %d", d->d_name, l_data_size);
if (!l_data_size) {
continue;
}
struct json_object *l_json_group = json_object_new_array();
struct json_object *l_json_group_inner = json_object_new_object();
json_object_object_add(l_json_group_inner, "group", json_object_new_string(d->d_name));
for (size_t i = 0; i < l_data_size; ++i) {
size_t l_out_size = DAP_ENC_BASE64_ENCODE_SIZE((int64_t)l_data[i].value_len) + 1;
char *l_value_enc_str = DAP_NEW_Z_SIZE(char, l_out_size);
size_t l_enc_size = dap_enc_base64_encode(l_data[i].value, l_data[i].value_len, l_value_enc_str, DAP_ENC_DATA_TYPE_B64);
struct json_object *jobj = json_object_new_object();
json_object_object_add(jobj, "id", json_object_new_int64((int64_t)l_data[i].id));
json_object_object_add(jobj, "key", json_object_new_string(l_data[i].key));
json_object_object_add(jobj, "value", json_object_new_string(l_value_enc_str));
json_object_object_add(jobj, "value_len", json_object_new_int64((int64_t)l_data[i].value_len));
json_object_object_add(jobj, "timestamp", json_object_new_int64((int64_t)l_data[i].timestamp));
json_object_array_add(l_json_group, jobj);
DAP_FREE(l_value_enc_str);
}
json_object_object_add(l_json_group_inner, "records", l_json_group);
json_object_array_add(l_json, l_json_group_inner);
dap_store_obj_free(l_data, l_data_size);
}
if (json_object_to_file(l_path, l_json) == -1) {
#if JSON_C_MINOR_VERSION<15
log_it(L_CRITICAL, "Couldn't export JSON to file, error code %d", errno );
dap_chain_node_cli_set_reply_text (a_str_reply, "Couldn't export JSON to file, error code %d", errno );
#else
log_it(L_CRITICAL, "Couldn't export JSON to file, err '%s'", json_util_get_last_err());
dap_chain_node_cli_set_reply_text(a_str_reply, json_util_get_last_err());
#endif
json_object_put(l_json);
return -1;
}
json_object_put(l_json);
return 0;
}
/**
* @brief cmd_gdb_import
* @param argc
* @param argv
* @param arg_func
* @param a_str_reply
* @return
*/
int cmd_gdb_import(int argc, char ** argv, void *arg_func, char ** a_str_reply)
{
int arg_index = 1;
const char *l_filename = NULL;
dap_chain_node_cli_find_option_val(argv, arg_index, argc, "filename", &l_filename);
if (!l_filename) {
dap_chain_node_cli_set_reply_text(a_str_reply, "gdb_import requires parameter 'filename'");
return -1;
}
const char *l_db_path = dap_config_get_item_str(g_config, "resources", "dap_global_db_path");
char l_path[strlen(l_db_path) + strlen(l_filename) + 12];
memset(l_path, '\0', sizeof(l_path));
dap_snprintf(l_path, sizeof(l_path), "%s/../%s.json", l_db_path, l_filename);
struct json_object *l_json = json_object_from_file(l_path);
if (!l_json) {
#if JSON_C_MINOR_VERSION<15
log_it(L_CRITICAL, "Import error occured: code %d", errno);
dap_chain_node_cli_set_reply_text(a_str_reply, "Import error occured: code %d",errno);
#else
log_it(L_CRITICAL, "Import error occured: %s", json_util_get_last_err());
dap_chain_node_cli_set_reply_text(a_str_reply, json_util_get_last_err());
#endif
return -1;
}
for (size_t i = 0, l_groups_count = json_object_array_length(l_json); i < l_groups_count; ++i) {
struct json_object *l_group_obj = json_object_array_get_idx(l_json, i);
if (!l_group_obj) {
continue;
}
struct json_object *l_json_group_name = json_object_object_get(l_group_obj, "group");
const char *l_group_name = json_object_get_string(l_json_group_name);
// proc group name
log_it(L_INFO, "Group %d: %s", i, l_group_name);
struct json_object *l_json_records = json_object_object_get(l_group_obj, "records");
size_t l_records_count = json_object_array_length(l_json_records);
pdap_store_obj_t l_group_store = DAP_NEW_Z_SIZE(dap_store_obj_t, l_records_count * sizeof(dap_store_obj_t));
for (size_t j = 0; j < l_records_count; ++j) {
struct json_object *l_record, *l_id, *l_key, *l_value, *l_value_len, *l_ts;
l_record = json_object_array_get_idx(l_json_records, j);
l_id = json_object_object_get(l_record, "id");
l_key = json_object_object_get(l_record, "key");
l_value = json_object_object_get(l_record, "value");
l_value_len = json_object_object_get(l_record, "value_len");
l_ts = json_object_object_get(l_record, "timestamp");
//
l_group_store[j].id = (uint64_t)json_object_get_int64(l_id);
l_group_store[j].key = dap_strdup(json_object_get_string(l_key));
l_group_store[j].group = dap_strdup(l_group_name);
l_group_store[j].timestamp = json_object_get_int64(l_ts);
l_group_store[j].value_len = (uint64_t)json_object_get_int64(l_value_len);
l_group_store[j].type = 'a';
const char *l_value_str = json_object_get_string(l_value);
char *l_val = DAP_NEW_Z_SIZE(char, l_group_store[j].value_len);
size_t l_dec_size = dap_enc_base64_decode(l_value_str, strlen(l_value_str), l_val, DAP_ENC_DATA_TYPE_B64);
l_group_store[j].value = (uint8_t*)l_val;
}
if (dap_chain_global_db_driver_appy(l_group_store, l_records_count)) {
log_it(L_CRITICAL, "An error occured on importing group %s...", l_group_name);
}
dap_store_obj_free(l_group_store, l_records_count);
}
json_object_put(l_json);
return 0;
}
#endif
......@@ -322,7 +322,6 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha
l_chain_pkt->hdr.net_id.uint64 = l_net_id;
l_chain_pkt->hdr.cell_id.uint64 = l_cell_id.uint64;
l_chain_pkt->hdr.chain_id.uint64 = l_chain_id.uint64;
dap_stream_ch_pkt_write_unsafe(a_ch_chain->ch,DAP_STREAM_CH_CHAIN_PKT_TYPE_UPDATE_CHAINS_START ,
l_chain_pkt,l_chain_pkt_size);
DAP_DELETE(l_chain_pkt);
......@@ -343,8 +342,9 @@ static void s_ch_chain_callback_notify_packet_in(dap_stream_ch_chain_t* a_ch_cha
}
// Check if we over with it before
if ( !l_node_client->cur_cell ){
log_it(L_WARNING, "In: No current cell in sync state, anyway we over it");
if ( ! l_node_client->cur_cell ){
if(s_stream_ch_chain_debug_more)
log_it(L_INFO, "In: No current cell in sync state, anyway we over it");
}else
l_node_client->cur_cell =(dap_chain_cell_t *) l_node_client->cur_cell->hh.next;
......
......@@ -133,11 +133,12 @@ int com_print_log(int argc, char ** argv, void *arg_func, char **str_reply);
int com_stats(int argc, char ** argv, void *arg_func, char **str_reply);
int com_exit(int argc, char ** argv, void *arg_func, char **str_reply);
int cmd_gdb_import(int argc, char ** argv, void *arg_func, char ** a_str_reply);
int cmd_gdb_export(int argc, char ** argv, void *arg_func, char ** a_str_reply);
int com_mempool_delete(int argc, char ** argv, void *arg_func, char ** a_str_reply);
int com_mempool_list(int argc, char ** argv, void *arg_func, char ** a_str_reply);
int com_mempool_proc(int argc, char ** argv, void *arg_func, char ** a_str_reply);
/**
* Place public CA into the mempool
*/
......
......@@ -924,6 +924,7 @@ static dap_chain_atom_iter_t* s_chain_callback_atom_iter_create_from(dap_chain_t
dap_chain_cs_dag_event_item_t * l_atom_item;
HASH_FIND(hh, PVT(DAP_CHAIN_CS_DAG(a_chain))->events, &l_atom_hash, sizeof(l_atom_hash),l_atom_item );
l_atom_iter->cur_item = l_atom_item;
l_atom_iter->cur_hash = &l_atom_item->hash;
}
return l_atom_iter;
......@@ -982,9 +983,11 @@ static dap_chain_atom_ptr_t s_chain_callback_atom_iter_get_first(dap_chain_atom_
if ( a_atom_iter->cur_item ){
a_atom_iter->cur = ((dap_chain_cs_dag_event_item_t*) a_atom_iter->cur_item)->event;
a_atom_iter->cur_size = ((dap_chain_cs_dag_event_item_t*) a_atom_iter->cur_item)->event_size;
a_atom_iter->cur_hash = &((dap_chain_cs_dag_event_item_t*) a_atom_iter->cur_item)->hash;
}else{
a_atom_iter->cur = NULL;
a_atom_iter->cur_size = 0;
a_atom_iter->cur_hash = NULL;
}
if (a_ret_size)
......@@ -1089,6 +1092,7 @@ static dap_chain_atom_ptr_t s_chain_callback_atom_iter_find_by_hash(dap_chain_at
a_atom_iter->cur_item = l_event_item;
a_atom_iter->cur = l_event_item->event;
a_atom_iter->cur_size= l_event_item->event_size;
a_atom_iter->cur_hash = &l_event_item->hash;
if(a_atom_size)
*a_atom_size = l_event_item->event_size;
return l_event_item->event;
......@@ -1125,6 +1129,7 @@ static dap_chain_atom_ptr_t s_chain_callback_atom_iter_get_next( dap_chain_atom_
// if l_event_item=NULL then items are over
a_atom_iter->cur = l_event_item ? l_event_item->event : NULL;
a_atom_iter->cur_size = a_atom_iter->cur ? l_event_item->event_size : 0;
a_atom_iter->cur_hash = l_event_item ? &l_event_item->hash : NULL;
}
if(a_atom_size)
*a_atom_size = a_atom_iter->cur_size;
......