diff --git a/client_mempool.c b/client_mempool.c index 792740411fbb837d34a8a2088c96b05628b0214a..8e1b7f06e0e3005e3024a1272f4c4b2f286f7a63 100644 --- a/client_mempool.c +++ b/client_mempool.c @@ -28,6 +28,13 @@ static void a_response_proc(dap_client_t *a_client, void *str, size_t str_len) client_mempool_t *mempool = a_client->_inheritor; assert(mempool); if(mempool) { + if(str_len > 0) { + mempool->read_data_t.data = DAP_NEW_Z_SIZE(uint8_t, str_len + 1); + if(mempool->read_data_t.data) { + memcpy(mempool->read_data_t.data, str, str_len); + mempool->read_data_t.data_len = str_len; + } + } pthread_mutex_lock(&mempool->wait_mutex); mempool->state = CLIENT_MEMPOOL_SENDED; pthread_cond_signal(&mempool->wait_cond); @@ -102,6 +109,8 @@ client_mempool_t* client_mempool_connect(const char *addr) } /** + * wait for the complete of request + * * timeout_ms timeout in milliseconds * waited_state state which we will wait, sample CLIENT_MEMPOOL_CONNECTED or CLIENT_MEMPOOL_SENDED * return -1 false, 0 timeout, 1 end of connection or sending data @@ -112,23 +121,23 @@ int client_mempool_wait(client_mempool_t *mempool, int waited_state, int timeout if(!mempool) return -1; pthread_mutex_lock(&mempool->wait_mutex); - // have waited +// have waited if(mempool->state == waited_state) { pthread_mutex_unlock(&mempool->wait_mutex); return 1; } - // prepare for signal waiting +// prepare for signal waiting struct timespec to; clock_gettime(CLOCK_MONOTONIC, &to); int64_t nsec_new = to.tv_nsec + timeout_ms * 1000000ll; - // if the new number of nanoseconds is more than a second +// if the new number of nanoseconds is more than a second if(nsec_new > (long) 1e9) { to.tv_sec += nsec_new / (long) 1e9; to.tv_nsec = nsec_new % (long) 1e9; } else to.tv_nsec = (long) nsec_new; - // signal waiting +// signal waiting int wait = pthread_cond_timedwait(&mempool->wait_cond, &mempool->wait_mutex, &to); if(wait == 0) //0 ret = 1; @@ -138,41 +147,67 @@ int client_mempool_wait(client_mempool_t *mempool, int waited_state, int timeout return ret; } +/** + * get read data from server + */ +uint8_t* client_mempool_read(client_mempool_t *mempool, int *data_len) +{ + if(mempool && mempool->read_data_t.data_len > 0) { + + uint8_t*data = DAP_NEW_Z_SIZE(uint8_t, mempool->read_data_t.data_len + 1); + if(mempool->read_data_t.data) { + memcpy(data, mempool->read_data_t.data, mempool->read_data_t.data_len); + if(data_len) + *data_len = mempool->read_data_t.data_len; + return data; + } + } + return NULL; +} + void client_mempool_close(client_mempool_t *mempool) { - if(mempool) - { + if(mempool) { // TODO send last request for dehandshake with "SessionCloseAfterRequest=true" // ... dap_client_pvt_t *l_client_internal = DAP_CLIENT_PVT(mempool->a_client); DAP_DELETE(l_client_internal->uplink_addr); dap_client_delete(mempool->a_client); dap_events_delete(mempool->a_events); + DAP_DELETE(mempool->read_data_t.data); pthread_cond_destroy(&mempool->wait_cond); pthread_mutex_destroy(&mempool->wait_mutex); DAP_DELETE(mempool); } } -/** - * datum add in mempool - * - * return -1 not connected, 1 Send packet OK - */ -int client_mempool_send_datum(client_mempool_t *mempool, dap_datum_mempool_t *datum_mempool) +// set new state and delete previous read data +static void client_mempool_reset(client_mempool_t *mempool, int new_state) { - if(!mempool || !datum_mempool || mempool->state<CLIENT_MEMPOOL_CONNECTED) + if(!mempool) + return; + pthread_mutex_lock(&mempool->wait_mutex); + mempool->read_data_t.data_len = 0; + DAP_DELETE(mempool->read_data_t.data); + mempool->read_data_t.data = NULL; + mempool->state = new_state; + pthread_mutex_unlock(&mempool->wait_mutex); +} + +// send request to server +static int client_mempool_send_request(client_mempool_t *mempool, dap_datum_mempool_t *datum_mempool, uint8_t action) +{ + if(!mempool || !datum_mempool || mempool->state < CLIENT_MEMPOOL_CONNECTED) return -1; const char * a_path = "mempool"; const char *a_suburl = "mempool"; //"enc_init"; const char* a_query = ""; - uint8_t action = DAP_DATUM_MEMPOOL_ADD; size_t a_request_size = 0; uint8_t *a_request = dap_datum_mempool_serialize(datum_mempool, &a_request_size); uint8_t *a_request_out = DAP_NEW_Z_SIZE(uint8_t, a_request_size * 2 + 1); // a_request + 1 byte for type action *((uint8_t*) a_request_out) = action; bin2hex(a_request_out + 1, a_request, a_request_size); - mempool->state = CLIENT_MEMPOOL_SEND; + client_mempool_reset(mempool, CLIENT_MEMPOOL_SEND); dap_client_request_enc(mempool->a_client, a_path, a_suburl, a_query, a_request_out, a_request_size * 2 + 1, a_response_proc, a_response_error); DAP_DELETE(a_request); @@ -180,38 +215,32 @@ int client_mempool_send_datum(client_mempool_t *mempool, dap_datum_mempool_t *da return 1; } +/** + * datum add in mempool + * + * return -1 not connected or error, 1 send packet OK + */ +int client_mempool_send_datum(client_mempool_t *mempool, dap_datum_mempool_t *datum_mempool) +{ + return client_mempool_send_request(mempool, datum_mempool, DAP_DATUM_MEMPOOL_ADD); +} + /** * datum check in mempool * - * return -1 not connected or error, 1 present in mempool, 0 absent in mempool + * return -1 not connected or error, 1 send packet OK */ int client_mempool_check_datum(client_mempool_t *mempool, dap_datum_mempool_t *datum_mempool) { - if(!mempool || !datum_mempool || mempool->state<CLIENT_MEMPOOL_CONNECTED) - return -1; - - const char * a_path = "mempool"; - const char *a_suburl = "mempool"; //"enc_init"; - const char* a_query = ""; - uint8_t action = DAP_DATUM_MEMPOOL_CHECK; - size_t a_request_size = 0; - uint8_t *a_request = dap_datum_mempool_serialize(datum_mempool, &a_request_size); - uint8_t *a_request_out = DAP_NEW_Z_SIZE(uint8_t, a_request_size * 2 + 1); // a_request + 1 byte for type action - *((uint8_t*) a_request_out) = action; - bin2hex(a_request_out + 1, a_request, a_request_size); - dap_client_request_enc(mempool->a_client, a_path, a_suburl, a_query, a_request_out, a_request_size * 2 + 1, - a_response_proc, a_response_error); - DAP_DELETE(a_request); - DAP_DELETE(a_request_out); - return 1; + return client_mempool_send_request(mempool, datum_mempool, DAP_DATUM_MEMPOOL_CHECK); } /** * datum delete from mempool + * + * return -1 not connected or error, 1 send packet OK */ int client_mempool_del_datum(client_mempool_t *mempool, dap_datum_mempool_t *datum_mempool) { - if(!mempool || !datum_mempool || mempool->state<CLIENT_MEMPOOL_CONNECTED) - return -1; - return 0; + return client_mempool_send_request(mempool, datum_mempool, DAP_DATUM_MEMPOOL_DEL); } diff --git a/client_mempool.h b/client_mempool.h index c90d645e901759ebde716b8d1bec9af8165c5eba..936c9d2162cb0d245155e882132a843da2613cd8 100644 --- a/client_mempool.h +++ b/client_mempool.h @@ -22,6 +22,10 @@ typedef struct client_mempool_t { dap_client_t *a_client; pthread_cond_t wait_cond; pthread_mutex_t wait_mutex; + struct readed_data{ + uint8_t *data; + int data_len; + }read_data_t; } client_mempool_t; int client_mempool_init(void); @@ -36,6 +40,11 @@ void client_mempool_close(client_mempool_t *mempool); */ int client_mempool_wait(client_mempool_t *mempool, int waited_state, int timeout_ms); +/** + * get read data from server + */ +uint8_t* client_mempool_read(client_mempool_t *mempool, int *data_len); + /** * datum add in mempool */ diff --git a/dap_chain_mempool.c b/dap_chain_mempool.c index 68cd702980351850e7db3a44b18bb6de09780dcb..8e64f464bcd4b5bbd86ad8b0af989b9140c1fa41 100644 --- a/dap_chain_mempool.c +++ b/dap_chain_mempool.c @@ -59,7 +59,7 @@ dap_datum_mempool_t * dap_datum_mempool_deserialize(uint8_t *datum_mempool_str_i shift_size += sizeof(uint16_t); memcpy(&(datum_mempool->datum_count), datum_mempool_str + shift_size, sizeof(uint16_t)); shift_size += sizeof(uint16_t); - datum_mempool->data = DAP_NEW_Z_SIZE(dap_chain_datum_t*, datum_mempool->datum_count); + datum_mempool->data = DAP_NEW_Z_SIZE(dap_chain_datum_t*, datum_mempool->datum_count * sizeof(dap_chain_datum_t*)); for(int i = 0; i < datum_mempool->datum_count; i++) { size_t size_one = 0; memcpy(&size_one, datum_mempool_str + shift_size, sizeof(uint16_t)); @@ -81,6 +81,8 @@ void dap_datum_mempool_clean(dap_datum_mempool_t *datum) for(int i = 0; i < datum->datum_count; i++) { DAP_DELETE(datum->data[i]); } + DAP_DELETE(datum->data); + datum->data = NULL; } void dap_datum_mempool_free(dap_datum_mempool_t *datum) @@ -178,15 +180,15 @@ static void enc_http_reply_encode_new(struct dap_http_simple *a_http_simple, dap DAP_ENC_DATA_TYPE_RAW); /*/ decode test - size_t l_response_dec_size_max = a_http_simple->reply_size ? a_http_simple->reply_size * 2 + 16 : 0; - char * l_response_dec = a_http_simple->reply_size ? DAP_NEW_Z_SIZE(char, l_response_dec_size_max) : NULL; - size_t l_response_dec_size = 0; - if(a_http_simple->reply_size) - l_response_dec_size = dap_enc_decode(a_http_delegate->key, - a_http_simple->reply, a_http_simple->reply_size, - l_response_dec, l_response_dec_size_max, - DAP_ENC_DATA_TYPE_RAW); - l_response_dec_size_max = 0;*/ + size_t l_response_dec_size_max = a_http_simple->reply_size ? a_http_simple->reply_size * 2 + 16 : 0; + char * l_response_dec = a_http_simple->reply_size ? DAP_NEW_Z_SIZE(char, l_response_dec_size_max) : NULL; + size_t l_response_dec_size = 0; + if(a_http_simple->reply_size) + l_response_dec_size = dap_enc_decode(a_http_delegate->key, + a_http_simple->reply, a_http_simple->reply_size, + l_response_dec, l_response_dec_size_max, + DAP_ENC_DATA_TYPE_RAW); + l_response_dec_size_max = 0;*/ } } @@ -199,9 +201,10 @@ static void enc_http_reply_encode_new(struct dap_http_simple *a_http_simple, dap void chain_mempool_proc(struct dap_http_simple *cl_st, void * arg) { http_status_code_t * return_code = (http_status_code_t*) arg; - dap_enc_key_t *key_tmp = dap_enc_ks_find_http(cl_st->http); - dap_enc_key_serealize_t *key_ser = dap_enc_key_serealize(key_tmp); - dap_enc_key_t *key = dap_enc_key_deserealize(key_ser, sizeof(dap_enc_key_serealize_t)); + // save key while it alive, i.e. still exist + dap_enc_key_t *key = dap_enc_ks_find_http(cl_st->http); + //dap_enc_key_serealize_t *key_ser = dap_enc_key_serealize(key_tmp); + //dap_enc_key_t *key = dap_enc_key_deserealize(key_ser, sizeof(dap_enc_key_serealize_t)); enc_http_delegate_t *dg = enc_http_request_decode(cl_st); if(dg) { @@ -221,43 +224,58 @@ void chain_mempool_proc(struct dap_http_simple *cl_st, void * arg) char *a_value; switch (action) { - case DAP_DATUM_MEMPOOL_ADD: + case DAP_DATUM_MEMPOOL_ADD: // add datum in base a_value = DAP_NEW_Z_SIZE(char, request_size * 2); bin2hex((char*) a_value, (const unsigned char*) request_str, request_size); if(dap_chain_global_db_set(a_key, a_value)) { *return_code = Http_Status_OK; } - log_it(L_NOTICE, "Insert hash: key=%s result:%s", a_key, + log_it(L_INFO, "Insert hash: key=%s result:%s", a_key, (*return_code == Http_Status_OK) ? "OK" : "False!"); DAP_DELETE(a_value); break; - case DAP_DATUM_MEMPOOL_CHECK: + + case DAP_DATUM_MEMPOOL_CHECK: // check datum in base strcpy(cl_st->reply_mime, "text/text"); char *str = dap_chain_global_db_get(a_key); if(str) { - *return_code = Http_Status_OK; dg->response = strdup("1"); - ; //cl_st->reply = strdup("1"); DAP_DELETE(str); + log_it(L_INFO, "Check hash: key=%s result: Present", a_key); } else - dg->response = strdup("0"); //cl_st->reply = strdup("0"); - dg->response_size = strlen(dg->response); //cl_st->reply_size = strlen(cl_st->reply); - //enc_http_reply_encode(cl_st, dg); + { + dg->response = strdup("0"); + log_it(L_INFO, "Check hash: key=%s result: Absent", a_key); + } + dg->response_size = strlen(dg->response); + *return_code = Http_Status_OK; enc_http_reply_encode_new(cl_st, key, dg); - log_it(L_NOTICE, "Check hash: key=%s result:%s", a_key, - (*return_code == Http_Status_OK) ? "Present" : "Absent"); break; - case DAP_DATUM_MEMPOOL_DEL: + + case DAP_DATUM_MEMPOOL_DEL: // delete datum in base + strcpy(cl_st->reply_mime, "text/text"); + if(dap_chain_global_db_del(a_key)) { + dg->response = strdup("1"); + DAP_DELETE(str); + log_it(L_INFO, "Delete hash: key=%s result: Ok", a_key); + } + else + { + dg->response = strdup("0"); + log_it(L_INFO, "Delete hash: key=%s result: False!", a_key); + } *return_code = Http_Status_OK; - log_it(L_NOTICE, "Delete hash: key=%s result:%s", a_key, - (*return_code == Http_Status_OK) ? "OK" : "False!"); + enc_http_reply_encode_new(cl_st, key, dg); break; - default: - log_it(L_NOTICE, "Unknown request=%d! key=%s", action, a_key); + + default: // unsupported command + log_it(L_INFO, "Unknown request=%d! key=%s", action, a_key); DAP_DELETE(a_key); enc_http_delegate_delete(dg); + if(key) + dap_enc_key_delete(key); return; } DAP_DELETE(a_key); @@ -272,6 +290,9 @@ void chain_mempool_proc(struct dap_http_simple *cl_st, void * arg) else { *return_code = Http_Status_Unauthorized; } + // no needed + //if(key) + // dap_enc_key_delete(key); } /**