From 8d0373871e7feace6581f2bded129f6d0ff8e5b7 Mon Sep 17 00:00:00 2001 From: Aleksandr Lysikov <lysikov@inbox.ru> Date: Fri, 18 Jan 2019 23:26:38 +0500 Subject: [PATCH] add request to mempool base --- client_mempool.c | 61 ++++++++++--------- client_mempool.h | 9 ++- dap_chain_mempool.c | 143 ++++++++++++++++++++++++++++++++++++++++++-- dap_chain_mempool.h | 26 +++++++- 4 files changed, 199 insertions(+), 40 deletions(-) diff --git a/client_mempool.c b/client_mempool.c index 9ddfd3e..5885e29 100644 --- a/client_mempool.c +++ b/client_mempool.c @@ -10,57 +10,53 @@ #include "dap_http_client_simple.h" #include "client_mempool.h" -// connection states -enum { - ERROR = -1, INIT, CONNECT, CONNECTED, SENDED, END -}; - -int dap_http_client_simple_wait(); - +// callback for dap_client_new() in client_mempool_connect() static void stage_status_callback(dap_client_t *a_client, void *a_arg) { printf("* stage_status_callback client=%x data=%x\n", a_client, a_arg); } - +// callback for dap_client_new() in client_mempool_connect() static void stage_status_error_callback(dap_client_t *a_client, void *a_arg) { printf("* tage_status_error_callback client=%x data=%x\n", a_client, a_arg); } -void a_response_proc(dap_client_t *a_client, void *str, size_t str_len) +// callback for dap_client_request_enc() in client_mempool_send_datum() +static void a_response_proc(dap_client_t *a_client, void *str, size_t str_len) { printf("a* _response_proc a_client=%x str=%x str_len=%d\n", a_client, str, str_len); client_mempool_t *mempool = a_client->_inheritor; assert(mempool); if(mempool) { pthread_mutex_lock(&mempool->wait_mutex); - mempool->state = SENDED; + mempool->state = CLIENT_MEMPOOL_SENDED; pthread_cond_signal(&mempool->wait_cond); pthread_mutex_unlock(&mempool->wait_mutex); } } -void a_response_error(dap_client_t *a_client, int val) +// callback for dap_client_request_enc() in client_mempool_send_datum() +static void a_response_error(dap_client_t *a_client, int val) { printf("* a_response_error a_client=%x val=%d\n", a_client, val); client_mempool_t *mempool = a_client->_inheritor; assert(mempool); if(mempool) { pthread_mutex_lock(&mempool->wait_mutex); - mempool->state = ERROR; + mempool->state = CLIENT_MEMPOOL_ERROR; pthread_cond_signal(&mempool->wait_cond); pthread_mutex_unlock(&mempool->wait_mutex); } } -// callback for the end of handshake +// callback for the end of handshake in dap_client_go_stage() / client_mempool_connect() static void a_stage_end_callback(dap_client_t *a_client, void *a_arg) { client_mempool_t *mempool = a_client->_inheritor; assert(mempool); if(mempool) { pthread_mutex_lock(&mempool->wait_mutex); - mempool->state = CONNECTED; + mempool->state = CLIENT_MEMPOOL_CONNECTED; pthread_cond_signal(&mempool->wait_cond); pthread_mutex_unlock(&mempool->wait_mutex); } @@ -84,7 +80,7 @@ client_mempool_t* client_mempool_connect(const char *addr) if(!addr || strlen(addr) < 1) return NULL; client_mempool_t *mempool = DAP_NEW_Z(client_mempool_t); - mempool->state = INIT; + mempool->state = CLIENT_MEMPOOL_INIT; pthread_condattr_t attr; pthread_condattr_init(&attr); pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); @@ -99,7 +95,7 @@ client_mempool_t* client_mempool_connect(const char *addr) l_client_internal->uplink_port = 8079; // TODO read from kelvin-node.cfg [server][listen_port_tcp] dap_client_stage_t a_stage_target = STAGE_ENC_INIT; - mempool->state = CONNECT; + mempool->state = CLIENT_MEMPOOL_CONNECT; // Handshake dap_client_go_stage(mempool->a_client, a_stage_target, a_stage_end_callback); return mempool; @@ -107,14 +103,21 @@ client_mempool_t* client_mempool_connect(const char *addr) /** * timeout_ms timeout in milliseconds + * waited_state state which we will wait, sample CLIENT_MEMPOOL_CONNECTED or CLIENT_MEMPOOL_SENDED * return -1 false, 0 timeout, 1 end of connection or sending data */ -int client_mempool_wait(client_mempool_t *mempool, int timeout_ms) +int client_mempool_wait(client_mempool_t *mempool, int waited_state, int timeout_ms) { int ret = -1; if(!mempool) return -1; pthread_mutex_lock(&mempool->wait_mutex); + // have waited + if(mempool->state == waited_state) { + pthread_mutex_unlock(&mempool->wait_mutex); + return 1; + } + // prepare for signal waiting struct timespec to; clock_gettime(CLOCK_MONOTONIC, &to); int64_t nsec_new = to.tv_nsec + timeout_ms * 1000000ll; @@ -125,6 +128,7 @@ int client_mempool_wait(client_mempool_t *mempool, int timeout_ms) } else to.tv_nsec = (long) nsec_new; + // signal waiting int wait = pthread_cond_timedwait(&mempool->wait_cond, &mempool->wait_mutex, &to); if(wait == 0) //0 ret = 1; @@ -148,24 +152,19 @@ void client_mempool_close(client_mempool_t *mempool) } } -int client_mempool_send_datum(client_mempool_t *mempool, dap_datum_mempool_t *datum) +int client_mempool_send_datum(client_mempool_t *mempool, dap_datum_mempool_t *datum_mempool) { - /* - void *a_request = "123"; - size_t a_request_size = 3*/ const char * a_path = "mempool"; const char *a_suburl = "mempool"; //"enc_init"; const char* a_query = ""; - size_t a_request_size = 0, shift_size = 0; - for(int i = 0; i < datum->datum_count; i++) { - a_request_size += _dap_chain_datum_data_size(datum->data[i]); - } - uint8_t *a_request = DAP_NEW_SIZE(uint8_t, a_request_size); - for(int i = 0; i < datum->datum_count; i++) { - memcpy(a_request + shift_size, datum->data[i], _dap_chain_datum_data_size(datum->data[i])); - } - dap_client_request_enc(mempool->a_client, a_path, a_suburl, a_query, a_request, a_request_size, + size_t a_request_size = 0; + uint8_t *a_request = dap_datum_mempool_serialize(datum_mempool, &a_request_size); + uint8_t *a_request_out = DAP_NEW_Z_SIZE(uint8_t, a_request_size * 2); + bin2hex(a_request_out, a_request, a_request_size); + //uint8_t *a_request = "1234567\089012345643634346i34itkreghrth"; + //size_t a_request_size = 20; + dap_client_request_enc(mempool->a_client, a_path, a_suburl, a_query, a_request_out, a_request_size*2, a_response_proc, a_response_error); - DAP_DELETE(a_request); + //DAP_DELETE(a_request); return 1; } diff --git a/client_mempool.h b/client_mempool.h index 862e8c6..972515e 100644 --- a/client_mempool.h +++ b/client_mempool.h @@ -4,6 +4,11 @@ #include "dap_client.h" #include "dap_chain_mempool.h" +// connection states +enum { + CLIENT_MEMPOOL_ERROR = -1, CLIENT_MEMPOOL_INIT, CLIENT_MEMPOOL_CONNECT, CLIENT_MEMPOOL_CONNECTED, CLIENT_MEMPOOL_SENDED, CLIENT_MEMPOOL_END +}; + // state for a client connection with mempool typedef struct client_mempool_t { int state; @@ -24,7 +29,7 @@ void client_mempool_close(client_mempool_t *mempool); * timeout_ms timeout in milliseconds * return -1 false, 0 timeout, 1 end of connection or sending data */ -int client_mempool_wait(client_mempool_t *mempool, int timeout_ms); +int client_mempool_wait(client_mempool_t *mempool, int waited_state, int timeout_ms); -int client_mempool_send_datum(client_mempool_t *mempool, dap_datum_mempool_t *datum); +int client_mempool_send_datum(client_mempool_t *mempool, dap_datum_mempool_t *datum_mempool); diff --git a/dap_chain_mempool.c b/dap_chain_mempool.c index eb256ee..9b2adff 100644 --- a/dap_chain_mempool.c +++ b/dap_chain_mempool.c @@ -1,22 +1,78 @@ +#include <stddef.h> #include <stdio.h> +#include <string.h> +#include <stdio.h> +#include <assert.h> +//#include <dap_http_simple.h> +//#include <http_status_code.h> #include "dap_common.h" +#include "dap_hash.h" #include "dap_http_client.h" #include "dap_http_simple.h" //#include "dap_enc_http.h" #include "dap_enc_http.h" //#include "dap_http.h" #include "http_status_code.h" +#include "dap_chain_common.h" #include "dap_chain_global_db.h" #include "dap_chain_mempool.h" #define FILE_MEMPOOL_DB "1.db" // TODO get from settings +uint8_t* dap_datum_mempool_serialize(dap_datum_mempool_t *datum_mempool, size_t *size) +{ + size_t a_request_size = 2 * sizeof(uint16_t), shift_size = 0; + for(int i = 0; i < datum_mempool->datum_count; i++) { + a_request_size += _dap_chain_datum_data_size(datum_mempool->data[i]) + sizeof(uint16_t); + } + uint8_t *a_request = DAP_NEW_SIZE(uint8_t, a_request_size); + memcpy(a_request + shift_size, &(datum_mempool->version), sizeof(uint16_t)); + shift_size += sizeof(uint16_t); + memcpy(a_request + shift_size, &(datum_mempool->datum_count), sizeof(uint16_t)); + shift_size += sizeof(uint16_t); + for(int i = 0; i < datum_mempool->datum_count; i++) { + size_t size_one = _dap_chain_datum_data_size(datum_mempool->data[i]); + memcpy(a_request + shift_size, &size_one, sizeof(uint16_t)); + shift_size += sizeof(uint16_t); + memcpy(a_request + shift_size, datum_mempool->data[i], size_one); + shift_size += size_one; + } + assert(shift_size == a_request_size); + if(size) + *size = a_request_size; + return a_request; +} + +dap_datum_mempool_t * dap_datum_mempool_deserialize(uint8_t *datum_mempool_str_in, size_t datum_mempool_size) +{ + size_t shift_size = 0; + uint8_t *datum_mempool_str = DAP_NEW_Z_SIZE(uint8_t, datum_mempool_size / 2 + 1); + datum_mempool_size = hex2bin(datum_mempool_str, datum_mempool_str_in, datum_mempool_size) / 2; + dap_datum_mempool_t *datum_mempool = DAP_NEW_Z(dap_datum_mempool_t); + memcpy(&(datum_mempool->version), datum_mempool_str + shift_size, sizeof(uint16_t)); + shift_size += sizeof(uint16_t); + memcpy(&(datum_mempool->datum_count), datum_mempool_str + shift_size, sizeof(uint16_t)); + shift_size += sizeof(uint16_t); + datum_mempool->data = DAP_NEW_Z_SIZE(dap_chain_datum_t*, datum_mempool->datum_count); + for(int i = 0; i < datum_mempool->datum_count; i++) { + size_t size_one = 0; + memcpy(&size_one, datum_mempool_str + shift_size, sizeof(uint16_t)); + shift_size += sizeof(uint16_t); + datum_mempool->data[i] = (dap_chain_datum_t*) DAP_NEW_Z_SIZE(uint8_t, size_one); + memcpy(datum_mempool->data[i], datum_mempool_str + shift_size, size_one); + shift_size += size_one; + datum_mempool->data[i]; + } + assert(shift_size == datum_mempool_size); + DAP_DELETE(datum_mempool_str); + return datum_mempool; +} void dap_datum_mempool_clean(dap_datum_mempool_t *datum) { if(!datum) return; - for(int i = 0; i < datum->datum_count; i++){ + for(int i = 0; i < datum->datum_count; i++) { DAP_DELETE(datum->data[i]); } } @@ -27,6 +83,70 @@ void dap_datum_mempool_free(dap_datum_mempool_t *datum) DAP_DELETE(datum); } +/** + * + */ +char* calc_datum_hash(const char *datum_str, size_t datum_size) +{ + dap_chain_hash_t a_hash; + dap_hash((char*) datum_str, datum_size, a_hash.raw, sizeof(a_hash.raw), DAP_HASH_TYPE_SLOW_0); + size_t a_str_max = sizeof(a_hash.raw) * 2; + char *a_str = DAP_NEW_Z_SIZE(char, a_str_max); + size_t hash_len = dap_chain_hash_to_str(&a_hash, a_str, a_str_max); + if(hash_len) { + DAP_DELETE(a_str); + return NULL; + } + return a_str; +} + +/** + * Convert binary data to binhex encoded data. + * + * out output buffer, must be twice the number of bytes to encode. + * len is the size of the data in the in[] buffer to encode. + * return the number of bytes encoded, or -1 on error. + */ +int bin2hex(char *out, const unsigned char *in, int len) +{ + int ct = len; + static char hex[] = "0123456789ABCDEF"; + if(!in || !out || len < 0) + return -1; + // hexadecimal lookup table + while(ct-- > 0) + { + *out++ = hex[*in >> 4]; + *out++ = hex[*in++ & 0x0F]; + } + return len; +} + +/** + * Convert binhex encoded data to binary data + * + * len is the size of the data in the in[] buffer to decode, and must be even. + * out outputbuffer must be at least half of "len" in size. + * The buffers in[] and out[] can be the same to allow in-place decoding. + * return the number of bytes encoded, or -1 on error. + */ +int hex2bin(char *out, const unsigned char *in, int len) +{ + // '0'-'9' = 0x30-0x39 + // 'a'-'f' = 0x61-0x66 + // 'A'-'F' = 0x41-0x46 + int ct = len; + if(!in || !out || len < 0 || len & 1) + return -1; + while(ct > 0) + { + char ch1 = ((*in >= 'a') ? (*in++ - 'a' + 10) : ((*in >= 'A') ? (*in++ - 'A' + 10) : (*in++ - '0'))) << 4; + char ch2 = ((*in >= 'a') ? (*in++ - 'a' + 10) : ((*in >= 'A') ? (*in++ - 'A' + 10) : (*in++ - '0'))); // ((*in >= 'A') ? (*in++ - 'A' + 10) : (*in++ - '0')); + *out++ = ch1 + ch2; + ct -= 2; + } + return len; +} /** * @brief @@ -43,11 +163,22 @@ void chain_mempool_proc(struct dap_http_simple *cl_st, void * arg) int request_size = dg->request_size; printf("!!***!!! chain_mempool_proc arg=%d url=%s str=%s len=%d\n", arg, url, request_str, request_size); if(request_str && request_size > 0) { - dap_datum_mempool_t *mempool = (dap_datum_mempool_t*)request_str; - const char *a_key = "";//TODO hash(mempool) - const char *a_value = "";// TODO mempool; - if(dap_chain_global_db_set(a_key, a_value)) - *return_code = Http_Status_OK; + dap_datum_mempool_t *datum_mempool = dap_datum_mempool_deserialize(request_str, (size_t) request_size); + if(datum_mempool) + { + dap_datum_mempool_free(datum_mempool); + char *a_key = calc_datum_hash(request_str, (size_t) request_size); + char *a_value = DAP_NEW_Z_SIZE(char, request_size * 2); + bin2hex((char*) a_value, (const unsigned char*) request_str, request_size); + if(dap_chain_global_db_set(a_key, a_value)) { + *return_code = Http_Status_OK; + DAP_DELETE(a_key); + DAP_DELETE(a_value); + return; + } + DAP_DELETE(a_key); + DAP_DELETE(a_value); + } else *return_code = Http_Status_InternalServerError; } diff --git a/dap_chain_mempool.h b/dap_chain_mempool.h index 2fbc199..2a9896f 100644 --- a/dap_chain_mempool.h +++ b/dap_chain_mempool.h @@ -14,14 +14,38 @@ }DAP_ALIGN_PACKED dap_datum_mempool_t; */ +#define DAP_DATUM_MEMPOOL_VERSION "01" + // datum mempool structure typedef struct dap_datum_mempool { - int16_t version; // structure version + uint16_t version; // structure version uint16_t datum_count;// datums count dap_chain_datum_t **data;// mass of datums }DAP_ALIGN_PACKED dap_datum_mempool_t; +uint8_t* dap_datum_mempool_serialize(dap_datum_mempool_t *datum_mempool, size_t *size); +dap_datum_mempool_t * dap_datum_mempool_deserialize(uint8_t *datum_mempool_str, size_t size); + void dap_datum_mempool_clean(dap_datum_mempool_t *datum); void dap_datum_mempool_free(dap_datum_mempool_t *datum); void dap_chain_mempool_add_proc(struct dap_http * sh, const char * url); + +/** + * Convert binary data to binhex encoded data. + * + * out output buffer, must be twice the number of bytes to encode. + * len is the size of the data in the in[] buffer to encode. + * return the number of bytes encoded, or -1 on error. + */ +int bin2hex(char *out, const unsigned char *in, int len); + +/** + * Convert binhex encoded data to binary data + * + * len is the size of the data in the in[] buffer to decode, and must be even. + * out outputbuffer must be at least half of "len" in size. + * The buffers in[] and out[] can be the same to allow in-place decoding. + * return the number of bytes encoded, or -1 on error. + */ +int hex2bin(char *out, const unsigned char *in, int len); -- GitLab