Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • dap/dap-sdk
1 result
Show changes
Commits on Source (18)
Showing
with 149 additions and 178 deletions
......@@ -20,8 +20,13 @@ tests:amd64.debian:
image: demlabs/debian/amd64:qt5
before_script: /opt/buildtools/prepare_environment.sh amd64-linux
script:
- pwd
- mkdir build
- cd build && cmake .. -DCMAKE_BUILD_TYPE=Release -DBUILD_DAP_SDK_TESTS=ON -DBUILD_WITH_ECDSA=ON && make -j$(nproc) && ctest --verbose
- cd build && cmake .. -DCMAKE_BUILD_TYPE=Release -DBUILD_DAP_SDK_TESTS=ON -DBUILD_WITH_ECDSA=ON && make -j$(nproc) && ctest --verbose && make cppcheck
- cd /opt/buildtools/ && python3 -m cppcheck_codequality --input-file=${CI_PROJECT_DIR}/build/cppcheck_results.xml --output-file=${CI_PROJECT_DIR}/build/cppcheck.json
artifacts:
reports:
codequality: build/cppcheck.json
tests:arm32.debian:
extends: .tests
......
......@@ -257,3 +257,28 @@ if ( CELLFRAME_NO_OPTIMIZATION)
set(DAP_CRYPTO_XKCP_PLAINC ON)
endif ()
FIND_PROGRAM(CPPCHECK "cppcheck")
IF(CPPCHECK)
# Set export commands on
message("[!] CPPCHECK FOUND, ADDED TARGET CPPCHECK")
SET(CMAKE_EXPORT_COMPILE_COMMANDS ON)
ADD_CUSTOM_TARGET(
cppcheck
COMMAND
${CPPCHECK} --enable=all --project=${CMAKE_BINARY_DIR}/compile_commands.json --std=c++11 --verbose --quiet
--xml-version=2 --language=c++ --suppress=missingIncludeSystem
--output-file=${CMAKE_BINARY_DIR}/cppcheck_results.xml ${CHECK_CXX_SOURCE_FILES}
COMMENT "Generate cppcheck report for the project")
FIND_PROGRAM(CPPCHECK_HTML "cppcheck-htmlreport")
IF(CPPCHECK_HTML)
ADD_CUSTOM_TARGET(
cppcheck-html
COMMAND ${CPPCHECK_HTML} --title=${CMAKE_PROJECT_NAME} --file=${CMAKE_BINARY_DIR}/cppcheck_results.xml
--report-dir=${CMAKE_BINARY_DIR}/cppcheck_results --source-dir=${CMAKE_SOURCE_DIR}
COMMENT "Convert cppcheck report to HTML output")
ADD_DEPENDENCIES(cppcheck-html cppcheck)
ENDIF()
ENDIF()
......@@ -196,16 +196,16 @@ static void print_it_stderr(unsigned a_off, const char *a_fmt, va_list va)
{
vfprintf(s_print_param, a_fmt, va);
}
*/
#if ANDROID
#include <android/log.h>
static void print_it_alog (unsigned a_off, const char *a_fmt, va_list va)
{
__android_log_vprint(ANDROID_LOG_INFO, s_print_param, a_fmt, va);
__android_log_vprint(ANDROID_LOG_INFO, "CellframeNodeNative", a_fmt, va);
}
#endif*/
#endif
void dap_log_set_external_output(LOGGER_EXTERNAL_OUTPUT output, void *param)
{
......@@ -230,10 +230,9 @@ void dap_log_set_external_output(LOGGER_EXTERNAL_OUTPUT output, void *param)
s_print_callback = print_it_none;
break;
#ifdef ANDROID
/*case LOGGER_OUTPUT_ALOG:
case LOGGER_OUTPUT_ALOG:
s_print_callback = print_it_alog;
s_print_param = param;
break;*/
break;
#endif
default:
......
......@@ -127,8 +127,13 @@ DAP_STATIC_INLINE int dap_chain_hash_fast_to_str(const dap_hash_fast_t *a_hash,
return DAP_CHAIN_HASH_FAST_STR_SIZE;
}
const char *dap_chain_hash_fast_to_str_static(const dap_hash_fast_t *a_hash);
DAP_STATIC_INLINE dap_hash_str_t dap_chain_hash_fast_to_hash_str(const dap_hash_fast_t *a_hash) {
dap_hash_str_t l_ret = { };
dap_chain_hash_fast_to_str(a_hash, l_ret.s, DAP_CHAIN_HASH_FAST_STR_SIZE);
return l_ret;
}
#define dap_chain_hash_fast_to_str_static(hash) dap_chain_hash_fast_to_hash_str(hash).s
#define dap_hash_fast_to_str dap_chain_hash_fast_to_str
#define dap_hash_fast_to_str_static dap_chain_hash_fast_to_str_static
......
......@@ -604,7 +604,7 @@ char *dap_cert_dump(dap_cert_t *a_cert)
const char *dap_cert_get_folder(int a_n_folder_path)
{
char **l_p = utarray_eltptr(s_cert_folders, (u_int)a_n_folder_path);
return *l_p;
return l_p ? *l_p : ( log_it(L_ERROR, "No default cert path, check \"ca_folders\" in cellframe-node.cfg"), NULL );
}
......
......@@ -78,10 +78,3 @@ int dap_chain_hash_fast_from_str( const char *a_hash_str, dap_chain_hash_fast_t
{
return dap_chain_hash_fast_from_hex_str(a_hash_str, a_hash) && dap_chain_hash_fast_from_base58_str(a_hash_str, a_hash);
}
const char *dap_chain_hash_fast_to_str_static(const dap_hash_fast_t *a_hash)
{
_Thread_local static char s_hash_str[DAP_HASH_FAST_STR_SIZE];
return dap_chain_hash_fast_to_str(a_hash, s_hash_str, sizeof(s_hash_str)) == DAP_CHAIN_HASH_FAST_STR_SIZE
? s_hash_str : NULL;
}
......@@ -34,10 +34,10 @@
#include "dap_app_cli.h"
#include "dap_app_cli_net.h"
#include "dap_app_cli_shell.h"
#ifdef DAP_OS_ANDROID
#include "dap_json_rpc_params.h"
#include "dap_json_rpc_request.h"
#ifdef DAP_OS_ANDROID
#include <android/log.h>
#include <jni.h>
static dap_config_t *cli_config;
......@@ -154,6 +154,28 @@ static int shell_reader_loop()
return 0;
}
char *dap_cli_exec(int argc, char **argv) {
dap_app_cli_cmd_state_t cmd = {
.cmd_name = (char*)argv[0],
.cmd_param_count = argc - 2,
.cmd_param = argc - 2 > 0 ? (char**)(argv + 1) : NULL
};
char *l_cmd_str = dap_app_cli_form_command(&cmd);
dap_json_rpc_params_t *params = dap_json_rpc_params_create();
dap_json_rpc_params_add_data(params, l_cmd_str, TYPE_PARAM_STRING);
DAP_DELETE(l_cmd_str);
dap_json_rpc_request_t *a_request = dap_json_rpc_request_creation(cmd.cmd_name, params, 0);
char *req_str = dap_json_rpc_request_to_json_string(a_request),
*res = dap_cli_cmd_exec(req_str);
dap_json_rpc_request_free(a_request);
return res;
}
#ifdef DAP_OS_ANDROID
JNIEXPORT jstring JNICALL Java_com_CellframeWallet_Node_cellframeNodeCliMain(JNIEnv *javaEnv, jobject __unused jobj, jobjectArray argvStr)
{
......
......@@ -121,9 +121,7 @@ dap_app_cli_connect_param_t dap_app_cli_connect()
if (l_addr) {
l_addrs[0] = NULL;
dap_config_get_item_str_path_array_free(l_addrs, &l_array_count);
#ifdef DAP_OS_WINDOWS
printf("Unix socket-based server is not yet implemented, consider localhost usage\n"); // TODO
return ~0;
#if defined(DAP_OS_WINDOWS) || defined(DAP_OS_ANDROID)
#else
if ( -1 == (l_socket = socket(AF_UNIX, SOCK_STREAM, 0)) ) {
printf ("socket() error %d", errno);
......
......@@ -42,6 +42,8 @@ typedef struct dap_app_cli_cmd_state {
extern "C" {
#endif
int dap_app_cli_main(const char *a_app_name, int argc, const char **argv);
char *dap_cli_exec(int argc, char **argv);
#ifdef __cplusplus
}
#endif
......@@ -1337,16 +1337,16 @@ static void s_stream_es_callback_read(dap_events_socket_t * a_es, void * arg)
l_client_pvt->stage_status = STAGE_STATUS_DONE;
s_stage_status_after(l_client_pvt);
dap_stream_data_proc_read(l_client_pvt->stream);
dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size);
size_t l_bytes_read = dap_stream_data_proc_read(l_client_pvt->stream);
dap_events_socket_shrink_buf_in(a_es, l_bytes_read);
}
}
}
}
break;
case STAGE_STREAM_STREAMING: { // if streaming - process data with stream processor
dap_stream_data_proc_read(l_client_pvt->stream);
dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size);
size_t l_bytes_read = dap_stream_data_proc_read(l_client_pvt->stream);
dap_events_socket_shrink_buf_in(a_es, l_bytes_read);
}
break;
default: {
......
#include "dap_json_rpc_response.h"
#define LOG_TAG "dap_json_rpc_response"
#define INDENTATION_LEVEL " "
dap_json_rpc_response_t *dap_json_rpc_response_init()
{
......@@ -231,7 +232,7 @@ void json_print_object(json_object *obj, int indent_level) {
case json_type_object: {
json_object_object_foreach(obj, key, val) {
for (int i = 0; i <= indent_level; i++) {
printf(" "); // indentation level
printf(INDENTATION_LEVEL); // indentation level
}
printf("%s: ", key);
json_print_value(val, key, indent_level + 1, false);
......@@ -242,10 +243,13 @@ void json_print_object(json_object *obj, int indent_level) {
case json_type_array: {
int length = json_object_array_length(obj);
for (int i = 0; i < length; i++) {
for (int j = 0; j <= indent_level; j++) {
printf(INDENTATION_LEVEL); // indentation level
}
json_object *item = json_object_array_get_idx(obj, i);
json_print_value(item, NULL, indent_level + 1, length - 1 - i);
printf("\n");
}
printf("\n");
break;
}
default:
......@@ -349,24 +353,8 @@ int dap_json_rpc_response_printf_result(dap_json_rpc_response_t* response, char
switch(json_print_commands(cmd_name)) {
case 1: json_print_for_tx_history(response); break; return 0;
// case 2: json_print_for_mempool_list(response); break; return 0;
default: {
int json_type = 0;
json_object_is_type(response->result_json_object, json_type);
if (json_type == json_type_array) { /* print json array */
int result_count = json_object_array_length(response->result_json_object);
if (result_count <= 0) {
printf("response json array length is 0\n");
return -3;
}
for (int i = 0; i < result_count; i++) {
struct json_object * json_obj_result = json_object_array_get_idx(response->result_json_object, i);
json_print_object(json_obj_result, 0);
printf("\n");
json_object_put(json_obj_result);
}
} else { /* print json */
json_print_object(response->result_json_object, 0);
}
default: {/* print json */
json_print_object(response->result_json_object, 0);
}
break;
}
......
......@@ -31,4 +31,7 @@ DAP_PRINTF_ATTR(2, 3) int dap_notify_server_send_f_inter(uint32_t a_worker_id, c
int dap_notify_server_send_mt(const char * a_data);
DAP_PRINTF_ATTR(1, 2) int dap_notify_server_send_f_mt(const char *a_format, ...);
typedef bool (*dap_notify_data_user_callback_t)(const char *data);
void dap_notify_data_set_user_callback(dap_notify_data_user_callback_t callback);
void dap_notify_srv_set_callback_new(dap_events_socket_callback_t);
\ No newline at end of file
......@@ -48,9 +48,15 @@ pthread_rwlock_t s_notify_server_clients_mutex = PTHREAD_RWLOCK_INITIALIZER;
static void s_notify_server_callback_queue(dap_events_socket_t * a_es, void * a_arg);
static void s_notify_server_callback_new(dap_events_socket_t * a_es, void * a_arg);
static void s_notify_server_callback_delete(dap_events_socket_t * a_es, void * a_arg);
static dap_notify_data_user_callback_t s_notify_data_user_callback = NULL;
dap_events_socket_callback_t s_notify_server_callback_new_ex = NULL;
void dap_notify_data_set_user_callback(dap_notify_data_user_callback_t callback)
{
s_notify_data_user_callback = callback;
}
void dap_notify_srv_set_callback_new(dap_events_socket_callback_t a_cb) {
s_notify_server_callback_new_ex = a_cb;
}
......@@ -144,6 +150,7 @@ int dap_notify_server_send_f_inter(uint32_t a_worker_id, const char * a_format,.
*/
int dap_notify_server_send_mt(const char *a_data)
{
if (s_notify_data_user_callback) s_notify_data_user_callback(a_data);
if(!s_notify_server_queue) // If not initialized - nothing to notify
return 0;
return dap_events_socket_queue_ptr_send(s_notify_server_queue, dap_strdup(a_data));
......@@ -157,8 +164,9 @@ int dap_notify_server_send_mt(const char *a_data)
*/
int dap_notify_server_send_f_mt(const char *a_format, ...)
{
if(!s_notify_server_queue) // If not initialized - nothing to notify
if (!s_notify_data_user_callback && s_notify_server_queue)
return 0;
va_list ap, ap_copy;
va_start(ap, a_format);
va_copy(ap_copy, ap);
......@@ -178,6 +186,13 @@ int dap_notify_server_send_f_mt(const char *a_format, ...)
}
vsprintf(l_str, a_format, ap_copy);
va_end(ap_copy);
if (s_notify_data_user_callback) s_notify_data_user_callback(l_str);
if(!s_notify_server_queue) // If not initialized - nothing to notify
return 0;
int l_ret = dap_events_socket_queue_ptr_send(s_notify_server_queue, l_str);
DAP_DELETE(l_str);
return l_ret;
......
......@@ -183,13 +183,18 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg)
HASH_VALUE(l_payload_hash, sizeof(dap_hash_t), l_hash_value);
pthread_rwlock_wrlock(&s_gossip_lock);
HASH_FIND_BYHASHVALUE(hh, s_gossip_last_msgs, l_ch_pkt->data, sizeof(dap_hash_t), l_hash_value, l_msg_item);
if (l_msg_item && l_msg_item->with_payload && l_ch_pkt->hdr.type == DAP_STREAM_CH_GOSSIP_MSG_TYPE_REQUEST) {
debug_if(s_debug_more, L_INFO, "OUT: GOSSIP_DATA packet for hash %s", dap_hash_fast_to_str_static((dap_hash_fast_t *)&l_ch_pkt->data));
// Send data associated with this hash by request
dap_gossip_msg_t *l_msg = (dap_gossip_msg_t *)l_msg_item->message;
dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_GOSSIP_MSG_TYPE_DATA, l_msg, dap_gossip_msg_get_size(l_msg));
}
if (!l_msg_item && l_ch_pkt->hdr.type == DAP_STREAM_CH_GOSSIP_MSG_TYPE_HASH) {
if (l_msg_item) {
if (l_msg_item->timestamp < dap_nanotime_now() - DAP_GOSSIP_LIFETIME * 1000000000UL) {
debug_if(s_debug_more, L_INFO, "Packet for hash %s is derelict", dap_hash_fast_to_str_static(l_payload_hash));
HASH_DEL(s_gossip_last_msgs, l_msg_item);
DAP_DELETE(l_msg_item);
} else if (l_msg_item->with_payload && l_ch_pkt->hdr.type == DAP_STREAM_CH_GOSSIP_MSG_TYPE_REQUEST) {
debug_if(s_debug_more, L_INFO, "OUT: GOSSIP_DATA packet for hash %s", dap_hash_fast_to_str_static((dap_hash_fast_t *)&l_ch_pkt->data));
// Send data associated with this hash by request
dap_gossip_msg_t *l_msg = (dap_gossip_msg_t *)l_msg_item->message;
dap_stream_ch_pkt_write_unsafe(a_ch, DAP_STREAM_CH_GOSSIP_MSG_TYPE_DATA, l_msg, dap_gossip_msg_get_size(l_msg));
}
} else if (l_ch_pkt->hdr.type == DAP_STREAM_CH_GOSSIP_MSG_TYPE_HASH) {
struct gossip_msg_item *l_item_new = DAP_NEW_Z(struct gossip_msg_item);
if (!l_item_new) {
log_it(L_CRITICAL, "%s", c_error_memory_alloc);
......@@ -242,6 +247,12 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg)
pthread_rwlock_unlock(&s_gossip_lock);
break;
}
if (l_payload_item->timestamp < dap_nanotime_now() - DAP_GOSSIP_LIFETIME * 1000000000UL) {
HASH_DEL(s_gossip_last_msgs, l_payload_item);
DAP_DELETE(l_payload_item);
pthread_rwlock_unlock(&s_gossip_lock);
break;
}
dap_cluster_t *l_links_cluster = dap_cluster_find(l_msg->cluster_id);
if (l_links_cluster) {
dap_cluster_member_t *l_check = dap_cluster_member_find_unsafe(l_links_cluster, &a_ch->stream->node);
......@@ -283,7 +294,6 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg)
*(dap_stream_node_addr_t *)(l_msg_new->trace_n_payload + l_msg->trace_len) = g_node_addr;
memcpy(l_msg_new->trace_n_payload + l_msg_new->trace_len, l_msg->trace_n_payload + l_msg->trace_len, l_msg->payload_len);
HASH_ADD_BYHASHVALUE(hh, s_gossip_last_msgs, payload_hash, sizeof(dap_hash_t), l_hash_value, l_payload_item);
pthread_rwlock_unlock(&s_gossip_lock);
// Broadcast new message
debug_if(s_debug_more, L_INFO, "OUT: GOSSIP_HASH broadcast for hash %s",
dap_hash_fast_to_str_static(&l_msg_new->payload_hash));
......@@ -292,6 +302,7 @@ static bool s_stream_ch_packet_in(dap_stream_ch_t *a_ch, void *a_arg)
&l_msg_new->payload_hash, sizeof(dap_hash_t),
(dap_stream_node_addr_t *)l_msg_new->trace_n_payload,
l_msg_new->trace_len / sizeof(dap_stream_node_addr_t));
pthread_rwlock_unlock(&s_gossip_lock);
// Call back the payload func if any
struct gossip_callback *l_callback = s_get_callbacks_by_ch_id(l_msg->payload_ch_id);
if (!l_callback) {
......
......@@ -81,7 +81,7 @@ static dap_enc_key_type_t s_stream_get_preferred_encryption_type = DAP_ENC_KEY
static int s_add_stream_info(authorized_stream_t **a_hash_table, authorized_stream_t *a_item, dap_stream_t *a_stream);
static void s_stream_proc_pkt_in(dap_stream_t * a_stream, dap_stream_pkt_t *l_pkt, size_t l_pkt_size);
static void s_stream_proc_pkt_in(dap_stream_t * a_stream, dap_stream_pkt_t *l_pkt);
// Callbacks for HTTP client
static void s_http_client_headers_read(dap_http_client_t * a_http_client, void * a_arg); // Prepare stream when all headers are read
......@@ -429,7 +429,6 @@ void dap_stream_delete_unsafe(dap_stream_t *a_stream)
s_stream_delete_from_list(a_stream);
DAP_DEL_Z(a_stream->buf_fragments);
DAP_DEL_Z(a_stream->pkt_buf_in);
DAP_DELETE(a_stream);
log_it(L_NOTICE,"Stream connection is over");
}
......@@ -676,96 +675,38 @@ static void s_http_client_delete(dap_http_client_t * a_http_client, void *a_arg)
size_t dap_stream_data_proc_read (dap_stream_t *a_stream)
{
dap_return_val_if_fail(a_stream && a_stream->esocket && a_stream->esocket->buf_in, 0);
byte_t *l_buf_in = a_stream->esocket->buf_in;
size_t l_buf_in_size = a_stream->esocket->buf_in_size;
// Save the received data to stream memory
if (!a_stream->pkt_buf_in) {
a_stream->pkt_buf_in = DAP_DUP_SIZE(l_buf_in, l_buf_in_size);
a_stream->pkt_buf_in_data_size = l_buf_in_size;
} else {
debug_if(s_dump_packet_headers, L_DEBUG, "dap_stream_data_proc_read() Receive previously unprocessed data %zu bytes + new %zu bytes",
a_stream->pkt_buf_in_data_size, l_buf_in_size);
// The current data is added to rest of the previous package
a_stream->pkt_buf_in = DAP_REALLOC(a_stream->pkt_buf_in, a_stream->pkt_buf_in_data_size + l_buf_in_size);
memcpy((byte_t*)a_stream->pkt_buf_in + a_stream->pkt_buf_in_data_size, l_buf_in, l_buf_in_size);
// Increase the size of pkt_buf_in
a_stream->pkt_buf_in_data_size += l_buf_in_size;
}
// Switch to stream memory
l_buf_in = (byte_t*) a_stream->pkt_buf_in;
l_buf_in_size = a_stream->pkt_buf_in_data_size;
size_t l_buf_in_left = l_buf_in_size;
dap_stream_pkt_t *l_pkt = NULL;
if(l_buf_in_left >= sizeof(dap_stream_pkt_hdr_t)) {
// Now lets see how many packets we have in buffer now
while(l_buf_in_left > 0 && (l_pkt = dap_stream_pkt_detect(l_buf_in, l_buf_in_left))) { // Packet signature detected
if(l_pkt->hdr.size > DAP_STREAM_PKT_SIZE_MAX) {
log_it(L_ERROR, "dap_stream_data_proc_read() Too big packet size %u, drop %zu bytes", l_pkt->hdr.size, l_buf_in_left);
// Skip this packet
l_buf_in_left = 0;
break;
}
size_t l_pkt_offset = (((uint8_t*) l_pkt) - l_buf_in);
l_buf_in += l_pkt_offset;
l_buf_in_left -= l_pkt_offset;
size_t l_pkt_size = l_pkt->hdr.size + sizeof(dap_stream_pkt_hdr_t);
//log_it(L_DEBUG, "read packet offset=%zu size=%zu buf_in_left=%zu)",l_pkt_offset, l_pkt_size, l_buf_in_left);
// Got the whole package
if(l_buf_in_left >= l_pkt_size) {
// Process data
s_stream_proc_pkt_in(a_stream, (dap_stream_pkt_t*) l_pkt, l_pkt_size);
// Go to the next data
l_buf_in += l_pkt_size;
l_buf_in_left -= l_pkt_size;
} else {
debug_if(s_dump_packet_headers,L_DEBUG, "Input: Not all stream packet in input (pkt_size=%zu buf_in_left=%zu)", l_pkt_size, l_buf_in_left);
byte_t *l_pos = a_stream->esocket->buf_in, *l_end = l_pos + a_stream->esocket->buf_in_size;
size_t l_shift = 0, l_processed_size = 0;
while ( l_pos < l_end && (l_pos = memchr( l_pos, c_dap_stream_sig[0], (size_t)(l_end - l_pos))) ) {
if ( (size_t)(l_end - l_pos) < sizeof(dap_stream_pkt_hdr_t) )
break;
if ( !memcmp(l_pos, c_dap_stream_sig, sizeof(c_dap_stream_sig)) ) {
dap_stream_pkt_t *l_pkt = (dap_stream_pkt_t*)l_pos;
if (l_pkt->hdr.size > DAP_STREAM_PKT_SIZE_MAX) {
log_it(L_ERROR, "Invalid packet size %lu, dump it", l_pkt->hdr.size);
l_shift = sizeof(dap_stream_pkt_hdr_t);
} else if ( (l_shift = sizeof(dap_stream_pkt_hdr_t) + l_pkt->hdr.size) <= (size_t)(l_end - l_pos) ) {
debug_if(s_dump_packet_headers, L_DEBUG, "Processing full packet, size %lu", l_shift);
s_stream_proc_pkt_in(a_stream, l_pkt);
} else
break;
}
}
}
if(l_buf_in_left > 0) {
// Save the received data to stream memory for the next piece of data
if(!l_pkt) {
// pkt header not found, maybe l_buf_in_left is too small to detect pkt header, will do that next time
l_pkt = (dap_stream_pkt_t*) l_buf_in;
debug_if(s_dump_packet_headers, L_DEBUG, "dap_stream_data_proc_read() left unprocessed data %zu bytes, l_pkt=0", l_buf_in_left);
}
if(l_pkt) {
a_stream->pkt_buf_in_data_size = l_buf_in_left;
if(l_pkt != a_stream->pkt_buf_in){
memmove(a_stream->pkt_buf_in, l_pkt, a_stream->pkt_buf_in_data_size);
//log_it(L_DEBUG, "dap_stream_data_proc_read() l_pkt=%zu != a_stream->pkt_buf_in=%zu", l_pkt, a_stream->pkt_buf_in);
}
debug_if(s_dump_packet_headers,L_DEBUG, "dap_stream_data_proc_read() left unprocessed data %zu bytes", l_buf_in_left);
}
else {
log_it(L_ERROR, "dap_stream_data_proc_read() pkt header not found, drop %zu bytes", l_buf_in_left);
DAP_DEL_Z(a_stream->pkt_buf_in);
a_stream->pkt_buf_in_data_size = 0;
}
}
else {
DAP_DEL_Z(a_stream->pkt_buf_in);
a_stream->pkt_buf_in_data_size = 0;
l_pos += l_shift;
l_processed_size += l_shift;
} else
++l_pos;
}
return a_stream->esocket->buf_in_size; //a_stream->conn->buf_in_size;
debug_if( s_dump_packet_headers && l_processed_size, L_DEBUG, "Processed %lu / %lu bytes",
l_processed_size, (size_t)(l_end - a_stream->esocket->buf_in) );
return l_processed_size;
}
/**
* @brief stream_proc_pkt_in
* @param sid
*/
static void s_stream_proc_pkt_in(dap_stream_t * a_stream, dap_stream_pkt_t *a_pkt, size_t a_pkt_size)
static void s_stream_proc_pkt_in(dap_stream_t * a_stream, dap_stream_pkt_t *a_pkt)
{
size_t a_pkt_size = sizeof(dap_stream_pkt_hdr_t) + a_pkt->hdr.size;
bool l_is_clean_fragments = false;
a_stream->is_active = true;
......
......@@ -33,37 +33,6 @@
const uint8_t c_dap_stream_sig [STREAM_PKT_SIG_SIZE] = {0xa0,0x95,0x96,0xa9,0x9e,0x5c,0xfb,0xfa};
dap_stream_pkt_t * dap_stream_pkt_detect(void * a_data, size_t data_size)
{
uint8_t * sig_start=(uint8_t*) a_data;
dap_stream_pkt_t * hpkt = NULL;
size_t length_left = data_size;
while ( (sig_start = memchr(sig_start, c_dap_stream_sig[0], length_left)) ) {
length_left = data_size - (size_t)(sig_start - (uint8_t *)a_data);
if(length_left < sizeof(c_dap_stream_sig) )
break;
if ( !memcmp(sig_start, c_dap_stream_sig, sizeof(c_dap_stream_sig)) ) {
hpkt = (dap_stream_pkt_t *)sig_start;
if (length_left < sizeof(dap_stream_ch_pkt_hdr_t)) {
//log_it(L_ERROR, "Too small packet size %zu", length_left); // it's not an error, just random case
hpkt = NULL;
break;
}
if(hpkt->hdr.size > DAP_STREAM_PKT_SIZE_MAX ){
log_it(L_ERROR, "Too big packet size %u (%#x), type:%d(%#x)",
hpkt->hdr.size, hpkt->hdr.size, hpkt->hdr.type, hpkt->hdr.type);
hpkt = NULL;
}
break;
} else
sig_start++;
}
return hpkt;
}
/**
* @brief stream_pkt_read
* @param sid
......@@ -94,10 +63,8 @@ size_t dap_stream_pkt_write_unsafe(dap_stream_t *a_stream, uint8_t a_type, const
dap_stream_pkt_hdr_t *l_pkt_hdr = (dap_stream_pkt_hdr_t*)s_pkt_buf;
*l_pkt_hdr = (dap_stream_pkt_hdr_t) { .size = dap_enc_code( l_key, a_data, a_data_size, s_pkt_buf + sizeof(*l_pkt_hdr),
l_full_size - sizeof(*l_pkt_hdr), DAP_ENC_DATA_TYPE_RAW ),
.type = a_type,
.timestamp = dap_time_now(),
.src_addr = g_node_addr.uint64,
.dst_addr = a_stream->node.uint64 };
.timestamp = dap_time_now(), .type = a_type,
.src_addr = g_node_addr.uint64, .dst_addr = a_stream->node.uint64 };
memcpy(l_pkt_hdr->sig, c_dap_stream_sig, sizeof(l_pkt_hdr->sig));
return dap_events_socket_write_unsafe(a_stream->esocket, s_pkt_buf, l_full_size);
}
......
......@@ -60,11 +60,6 @@ typedef struct dap_stream {
char *service_key;
bool is_client_to_uplink;
struct dap_stream_pkt *in_pkt;
struct dap_stream_pkt *pkt_buf_in;
size_t pkt_buf_in_data_size;
size_t pkt_buf_in_size_expected;
uint8_t *buf_fragments, *pkt_cache;
size_t buf_fragments_size_total;// Full size of all fragments
size_t buf_fragments_size_filled;// Received size
......
......@@ -9,4 +9,6 @@ build_dependencies:
- 'xsltproc'
- 'curl'
- 'jq'
- 'cppcheck'
- 'python3-xmltodict'
\ No newline at end of file