From 84a740dd4b8b2d0eea42cb889c755284d40f3f15 Mon Sep 17 00:00:00 2001 From: Aleksandr Lysikov <45232000+sunny-gh@users.noreply.github.com> Date: Mon, 25 Mar 2019 13:49:01 +0500 Subject: [PATCH] Feature 2198 (#4) Fix stream interaction --- .gitignore | 3 + .gitmodules | 6 + CMakeLists.txt | 5 +- dap_client.c | 42 +++ dap_client.h | 10 +- dap_client_pvt.c | 786 +++++++++++++++++++++++++++------------------ libdap | 2 +- libdap-server | 2 +- libdap-server-core | 2 +- libdap-server-udp | 1 + libdap-stream | 2 +- libdap-stream-ch | 2 +- test/libdap-test | 2 +- 13 files changed, 548 insertions(+), 317 deletions(-) create mode 160000 libdap-server-udp diff --git a/.gitignore b/.gitignore index c6127b3..682ea68 100644 --- a/.gitignore +++ b/.gitignore @@ -50,3 +50,6 @@ modules.order Module.symvers Mkfile.old dkms.conf +/build/ +/.cproject +/.project diff --git a/.gitmodules b/.gitmodules index 511a70b..8c451bb 100644 --- a/.gitmodules +++ b/.gitmodules @@ -16,3 +16,9 @@ [submodule "test/libdap-test"] path = test/libdap-test url = https://github.com/kelvinblockchain/libdap-test +[submodule "libdap-stream-ch-chain"] + path = libdap-stream-ch-chain + url = https://github.com/kelvinblockchain//libdap-stream-ch-chain +[submodule "libdap-server-udp"] + path = libdap-server-udp + url = https://github.com/kelvinblockchain/libdap-server-udp diff --git a/CMakeLists.txt b/CMakeLists.txt index 83e712a..7acd042 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,11 +2,12 @@ cmake_minimum_required(VERSION 3.0) project (dap_client) set(CMAKE_C_STANDARD 11) - if(NOT (${SUBMODULES_NO_BUILD} MATCHES ON)) + set(SUBMODULES_NO_BUILD ON) add_subdirectory(libdap) add_subdirectory(libdap-server-core) add_subdirectory(libdap-server) + add_subdirectory(libdap-server-udp) add_subdirectory(libdap-stream) add_subdirectory(libdap-stream-ch) @@ -20,7 +21,7 @@ file(GLOB DAP_CLIENT_HEADERS *.h) add_library(${PROJECT_NAME} STATIC ${DAP_CLIENT_SRCS} ${DAP_CLIENT_HEADERS}) -target_link_libraries(${PROJECT_NAME} dap_crypto dap_core dap_http_server dap_session dap_stream curl) +target_link_libraries(${PROJECT_NAME} dap_crypto dap_core dap_http_server dap_session dap_stream dap_stream_ch curl) target_include_directories(${PROJECT_NAME} INTERFACE .) diff --git a/dap_client.c b/dap_client.c index a86f135..79494b1 100644 --- a/dap_client.c +++ b/dap_client.c @@ -8,6 +8,7 @@ #include "dap_http_client.h" #include "dap_client.h" #include "dap_client_pvt.h" +#include "dap_stream_ch_proc.h" @@ -216,6 +217,11 @@ void m_stage_fsm_operator(dap_client_t * a_client, void * a_arg) { (void *) a_arg; dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); + if(!l_client_internal){ + log_it(L_ERROR, "FSM Op: l_client_internal is NULL!"); + return; + } + if (l_client_internal->stage_target == l_client_internal->stage){ log_it(L_WARNING, "FSM Op: current stage %s is same as target one, nothing to do", dap_client_stage_str( l_client_internal->stage ) ); @@ -254,6 +260,12 @@ void dap_client_request_enc(dap_client_t * a_client, const char * a_path, const dap_client_pvt_request_enc(l_client_internal, a_path, a_suburl, a_query,a_request,a_request_size, a_response_proc,a_response_error); } +void dap_client_request(dap_client_t * a_client, const char * a_full_path, void * a_request, size_t a_request_size, + dap_client_callback_data_size_t a_response_proc, dap_client_callback_int_t a_response_error ) +{ + dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); + dap_client_pvt_request(l_client_internal, a_full_path, a_request, a_request_size, a_response_proc, a_response_error); +} /** @@ -342,6 +354,7 @@ const char * dap_client_stage_str(dap_client_stage_t a_stage) case STAGE_ENC_INIT: return "ENC"; case STAGE_STREAM_CTL: return "STREAM_CTL"; case STAGE_STREAM_SESSION: return "STREAM_SESSION"; + case STAGE_STREAM_CONNECTED: return "STREAM_CONNECTED"; case STAGE_STREAM_STREAMING: return "STREAM"; default: return "UNDEFINED"; } @@ -366,6 +379,32 @@ dap_enc_key_t * dap_client_get_key_stream(dap_client_t * a_client){ } +/** + * @brief dap_client_get_stream + * @param a_client + * @return + */ +dap_stream_t * dap_client_get_stream(dap_client_t * a_client) +{ + dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); + return (l_client_internal) ? l_client_internal->stream : NULL; +} + +dap_stream_ch_t * dap_client_get_stream_ch(dap_client_t * a_client, uint8_t a_ch_id) +{ + dap_stream_ch_t * l_ch = NULL; + dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); + if(l_client_internal && l_client_internal->stream) + for(int i = 0; i < l_client_internal->stream->channel_count; i++) { + dap_stream_ch_proc_t *l_ch_id = l_client_internal->stream->channel[i]->proc; + if(l_client_internal->stream->channel[i]->proc->id == a_ch_id) { + l_ch = l_client_internal->stream->channel[i]; + break; + } + } + return l_ch; +} + /** * @brief dap_client_get_stream_id * @param a_client @@ -373,5 +412,8 @@ dap_enc_key_t * dap_client_get_key_stream(dap_client_t * a_client){ */ const char * dap_client_get_stream_id(dap_client_t * a_client) { + dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); + if(!l_client_internal) + return NULL; return DAP_CLIENT_PVT(a_client)->stream_id; } diff --git a/dap_client.h b/dap_client.h index 2d44d6b..9d79be1 100644 --- a/dap_client.h +++ b/dap_client.h @@ -26,6 +26,9 @@ #include <stdint.h> #include "dap_enc_key.h" #include "dap_events.h" +#include "dap_stream.h" +#include "dap_stream_ch.h" + /** * @brief The dap_client_stage enum. Top level of client's state machine **/ @@ -77,7 +80,7 @@ typedef void (*dap_client_callback_data_size_t) (dap_client_t *, void *, size_t) #define DAP_UPLINK_PATH_ENC_INIT "enc_init" //"1901248124123459" #define DAP_UPLINK_PATH_DB "01094787531354" #define DAP_UPLINK_PATH_STREAM_CTL "stream_ctl" //"091348758013553" -#define DAP_UPLINK_PATH_STREAM "874751843144" +#define DAP_UPLINK_PATH_STREAM "stream" //"874751843144" #define DAP_UPLINK_PATH_LICENSE "license" #define DAP_UPLINK_PATH_SERVER_LIST "slist" @@ -108,6 +111,9 @@ void dap_client_reset(dap_client_t * a_client); void dap_client_request_enc(dap_client_t * a_client, const char * a_path,const char * a_suburl,const char* a_query, void * a_request, size_t a_request_size, dap_client_callback_data_size_t a_response_proc, dap_client_callback_int_t a_response_error); +void dap_client_request(dap_client_t * a_client, const char * a_full_path, void * a_request, size_t a_request_size, + dap_client_callback_data_size_t a_response_proc, dap_client_callback_int_t a_response_error); + const char * dap_client_get_stage_str(dap_client_t * a_client); const char * dap_client_stage_str(dap_client_stage_t a_stage); @@ -118,6 +124,8 @@ const char * dap_client_error_str(dap_client_error_t a_client_error); const char * dap_client_get_error_str(dap_client_t * a_client); const char * dap_client_get_auth_cookie(dap_client_t * a_client); +dap_stream_t * dap_client_get_stream(dap_client_t * a_client); +dap_stream_ch_t * dap_client_get_stream_ch(dap_client_t * a_client, uint8_t a_ch_id); const char * dap_client_get_stream_id(dap_client_t * a_client); dap_client_stage_t dap_client_get_stage(dap_client_t * a_client); diff --git a/dap_client_pvt.c b/dap_client_pvt.c index d9fe04e..1d46d30 100644 --- a/dap_client_pvt.c +++ b/dap_client_pvt.c @@ -26,8 +26,11 @@ #include <stdio.h> #include <string.h> +#include <stdbool.h> #include <assert.h> #include <json-c/json.h> +#include <unistd.h> // for close +#include <fcntl.h> #include "dap_enc_key.h" #include "dap_enc_base64.h" @@ -41,6 +44,8 @@ #include "dap_stream_ch.h" #include "dap_stream_ch_proc.h" #include "dap_stream_ch_pkt.h" +#include "dap_stream_pkt.h" +#include "dap_http_client.h" #define LOG_TAG "dap_client_pvt" @@ -60,20 +65,20 @@ void m_stream_ctl_response(dap_client_t *, void *, size_t); void m_stream_ctl_error(dap_client_t *, int); void m_stage_stream_streaming (dap_client_t * a_client, void* arg); +// STREAM stage callbacks +void m_stream_response(dap_client_t *, void *, size_t); +void m_stream_error(dap_client_t *, int); - -void m_request_response(void * a_response,size_t a_response_size,void * a_obj); -void m_request_error(int,void *); +void m_request_response(void * a_response, size_t a_response_size, void * a_obj); +void m_request_error(int, void *); // stream callbacks void m_es_stream_delete(dap_events_socket_t * a_es, void * arg); void m_es_stream_read(dap_events_socket_t * a_es, void * arg); void m_es_stream_write(dap_events_socket_t * a_es, void * arg); -void m_es_stream_error(dap_events_socket_t * a_es, void * arg) -{ - // TODO function is used, need to implement it!!! -} +void m_es_stream_error(dap_events_socket_t * a_es, void * arg); + /** * @brief dap_client_internal_init * @return @@ -101,8 +106,6 @@ void dap_client_pvt_new(dap_client_pvt_t * a_client_internal) a_client_internal->uplink_protocol_version = DAP_PROTOCOL_VERSION; } - - /** * @brief dap_client_pvt_delete * @param a_client_pvt @@ -120,6 +123,26 @@ void dap_client_pvt_delete(dap_client_pvt_t * a_client_pvt) } +/** + * Make socket non-blocking / blocking + * is_nonblock - (true) non-blocking / (false) blocking + */ +static void s_set_sock_nonblock(int sockfd, bool is_nonblock) +{ +// for Windows +#ifdef _WIN32 + unsigned long arg = is_nonblock; + ioctlsocket((SOCKET)sockfd, FIONBIO, &arg); +// for Unix-like OS +#else + int arg = fcntl(sockfd, F_GETFL, NULL); + if(is_nonblock) + arg |= O_NONBLOCK; + else + arg |= ~O_NONBLOCK; + fcntl(sockfd, F_SETFL, arg); +#endif +} /** * @brief s_client_internal_stage_status_proc @@ -127,123 +150,196 @@ void dap_client_pvt_delete(dap_client_pvt_t * a_client_pvt) */ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) { - switch(a_client_pvt->stage_status){ - case STAGE_STATUS_IN_PROGRESS:{ - switch( a_client_pvt->stage){ - case STAGE_ENC_INIT:{ - log_it(L_INFO,"Go to stage ENC: prepare the request"); - - a_client_pvt->session_key_open = dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_MSRLN, - NULL,0, - NULL,0, - 0); - - - size_t l_key_str_size_max = DAP_ENC_BASE64_ENCODE_SIZE(a_client_pvt->session_key_open->pub_key_data_size); - char *l_key_str= DAP_NEW_Z_SIZE(char,l_key_str_size_max+1); - // DAP_ENC_DATA_TYPE_B64_URLSAFE not need because send it by POST request - size_t l_key_str_enc_size = dap_enc_base64_encode(a_client_pvt->session_key_open->pub_key_data, - a_client_pvt->session_key_open->pub_key_data_size, - l_key_str,DAP_ENC_DATA_TYPE_B64); - - log_it(L_DEBUG,"ENC request size %u",l_key_str_enc_size); - dap_client_pvt_request( a_client_pvt, DAP_UPLINK_PATH_ENC_INIT "/gd4y5yh78w42aaagh", - l_key_str,l_key_str_size_max, m_enc_init_response, m_enc_init_error ); - DAP_DELETE(l_key_str); - }break; - case STAGE_STREAM_CTL:{ - log_it(L_INFO,"Go to stage STREAM_CTL: prepare the request"); - - size_t l_request_size = 10; - char *l_request = DAP_NEW_Z_SIZE (char,l_request_size) ; - snprintf(l_request, l_request_size,"%d", DAP_CLIENT_PROTOCOL_VERSION); - log_it(L_DEBUG,"STREAM_CTL request size %u",strlen(l_request)); - - dap_client_pvt_request_enc(a_client_pvt, - DAP_UPLINK_PATH_STREAM_CTL, - "stream_ctl","type=tcp,maxconn=4",l_request,l_request_size, - m_stream_ctl_response, m_stream_ctl_error); - DAP_DELETE(l_request); - }break; - case STAGE_STREAM_SESSION:{ - log_it(L_INFO,"Go to stage STREAM_SESSION: process the state ops"); + switch (a_client_pvt->stage_status) { + case STAGE_STATUS_IN_PROGRESS: { + switch (a_client_pvt->stage) { + case STAGE_ENC_INIT: { + log_it(L_INFO, "Go to stage ENC: prepare the request"); + + a_client_pvt->session_key_open = dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_MSRLN, NULL, 0, NULL, 0, 0); + + size_t l_key_str_size_max = DAP_ENC_BASE64_ENCODE_SIZE(a_client_pvt->session_key_open->pub_key_data_size); + char *l_key_str = DAP_NEW_Z_SIZE(char, l_key_str_size_max + 1); + // DAP_ENC_DATA_TYPE_B64_URLSAFE not need because send it by POST request + size_t l_key_str_enc_size = dap_enc_base64_encode(a_client_pvt->session_key_open->pub_key_data, + a_client_pvt->session_key_open->pub_key_data_size, + l_key_str, DAP_ENC_DATA_TYPE_B64); + + log_it(L_DEBUG, "ENC request size %u", l_key_str_enc_size); + dap_client_pvt_request(a_client_pvt, DAP_UPLINK_PATH_ENC_INIT "/gd4y5yh78w42aaagh", + l_key_str, l_key_str_size_max, m_enc_init_response, m_enc_init_error); + DAP_DELETE(l_key_str); + } + break; + case STAGE_STREAM_CTL: { + log_it(L_INFO, "Go to stage STREAM_CTL: prepare the request"); + + size_t l_request_size = 10; + char *l_request = DAP_NEW_Z_SIZE(char, l_request_size); + snprintf(l_request, l_request_size, "%d", DAP_CLIENT_PROTOCOL_VERSION); + log_it(L_DEBUG, "STREAM_CTL request size %u", strlen(l_request)); + + dap_client_pvt_request_enc(a_client_pvt, + DAP_UPLINK_PATH_STREAM_CTL, + "stream_ctl,channels=N", "type=tcp,maxconn=4", l_request, l_request_size, + m_stream_ctl_response, m_stream_ctl_error); + DAP_DELETE(l_request); + } + break; + case STAGE_STREAM_SESSION: { + log_it(L_INFO, "Go to stage STREAM_SESSION: process the state ops"); + + a_client_pvt->stream_socket = socket(PF_INET, SOCK_STREAM, 0); + setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_SNDBUF, (const void*) 50000, sizeof(int)); + setsockopt(a_client_pvt->stream_socket, SOL_SOCKET, SO_RCVBUF, (const void *) 50000, sizeof(int)); + // Wrap socket and setup callbacks + static dap_events_socket_callbacks_t l_s_callbacks = { + .read_callback = m_es_stream_read, + .write_callback = m_es_stream_write, + .error_callback = m_es_stream_error, + .delete_callback = m_es_stream_delete + }; + a_client_pvt->stream_es = dap_events_socket_wrap_no_add(a_client_pvt->events, + a_client_pvt->stream_socket, &l_s_callbacks); + // add to dap_worker + dap_events_socket_create_after(a_client_pvt->stream_es); + + a_client_pvt->stream_es->_inheritor = a_client_pvt->client; + a_client_pvt->stream = dap_stream_new_es(a_client_pvt->stream_es); + a_client_pvt->stream->is_client_to_uplink = true; + a_client_pvt->stream_session = dap_stream_session_pure_new(); // may be from in packet? + + // new added, whether it is necessary? + a_client_pvt->stream->session = a_client_pvt->stream_session; + a_client_pvt->stream->session->key = a_client_pvt->stream_key; + + // connect + struct sockaddr_in l_remote_addr; + memset(&l_remote_addr, 0, sizeof(l_remote_addr)); + l_remote_addr.sin_family = AF_INET; + l_remote_addr.sin_port = htons(a_client_pvt->uplink_port); + if(inet_pton(AF_INET, a_client_pvt->uplink_addr, &(l_remote_addr.sin_addr)) < 0) { + log_it(L_ERROR, "Wrong remote address '%s:%u'", a_client_pvt->uplink_addr, a_client_pvt->uplink_port); + close(a_client_pvt->stream_socket); + a_client_pvt->stream_socket = 0; + a_client_pvt->stage_status = STAGE_STATUS_ERROR; + } + else { + int l_err = 0; + if((l_err = connect(a_client_pvt->stream_socket, (struct sockaddr *) &l_remote_addr, + sizeof(struct sockaddr_in))) != -1) { + //s_set_sock_nonblock(a_client_pvt->stream_socket, false); + log_it(L_INFO, "Remote address connected (%s:%u) with sock_id %d", a_client_pvt->uplink_addr, + a_client_pvt->uplink_port, a_client_pvt->stream_socket); a_client_pvt->stage_status = STAGE_STATUS_DONE; - s_stage_status_after(a_client_pvt); - } break; - case STAGE_STREAM_STREAMING:{ - log_it(L_INFO,"Go to stage STREAM: prepare the socket"); - a_client_pvt->stream_socket = socket(PF_INET,SOCK_STREAM,0); - setsockopt(a_client_pvt->stream_socket,SOL_SOCKET,SO_SNDBUF,(const void*) 50000,sizeof(int) ); - setsockopt(a_client_pvt->stream_socket,SOL_SOCKET,SO_RCVBUF,(const void *) 50000,sizeof(int) ); - // Wrap socket and setup callbacks - static dap_events_socket_callbacks_t l_s_callbacks={ - .read_callback = m_es_stream_read , - .write_callback = m_es_stream_write , - .error_callback = m_es_stream_error , - .delete_callback = m_es_stream_delete - }; - a_client_pvt->stream_es = dap_events_socket_wrap_no_add(a_client_pvt->events, - a_client_pvt->stream_socket, &l_s_callbacks); - - a_client_pvt->stream_es->_inheritor = a_client_pvt->client; - a_client_pvt->stream = dap_stream_new_es(a_client_pvt->stream_es); - a_client_pvt->stream->is_client_to_uplink = true; - a_client_pvt->stream_session = dap_stream_session_pure_new(); - - - } break; - - default:{ - log_it(L_ERROR,"Undefined proccessing actions for stage status %s", - dap_client_stage_status_str(a_client_pvt->stage_status)); + } + else { + log_it(L_ERROR, "Remote address can't connected (%s:%u) with sock_id %d", a_client_pvt->uplink_addr, + a_client_pvt->uplink_port); + close(a_client_pvt->stream_socket); + a_client_pvt->stream_socket = 0; a_client_pvt->stage_status = STAGE_STATUS_ERROR; - s_stage_status_after(a_client_pvt); // be carefull to not to loop! } } - } break; - case STAGE_STATUS_ERROR:{ - log_it(L_ERROR, "Error state, doing callback if present"); - if( a_client_pvt->stage_status_error_callback ){ - a_client_pvt->stage_status_error_callback(a_client_pvt->client,NULL); - // Expecting that its one-shot callback - //a_client_internal->stage_status_error_callback = NULL; + s_stage_status_after(a_client_pvt); + } + break; + case STAGE_STREAM_CONNECTED: { + log_it(L_INFO, "Go to stage STAGE_STREAM_CONNECTED"); + dap_stream_ch_new(a_client_pvt->stream, 'N'); + + char* l_full_path = NULL; + const char * l_path = "stream"; + const char *l_suburl = "globaldb"; + int l_full_path_size = snprintf(l_full_path, 0, "%s/%s?session_id=%s", DAP_UPLINK_PATH_STREAM, l_suburl, + dap_client_get_stream_id(a_client_pvt->client)); + l_full_path = DAP_NEW_Z_SIZE(char, l_full_path_size + 1); + snprintf(l_full_path, l_full_path_size + 1, "%s/%s?session_id=%s", DAP_UPLINK_PATH_STREAM, l_suburl, + dap_client_get_stream_id(a_client_pvt->client)); + + //dap_client_request(a_client_pvt->client, l_full_path, "12345", 0, m_stream_response, m_stream_error); + { + size_t l_message_size = snprintf(NULL, 0, + "GET /%s HTTP/1.1\r\nHost: %s:%d\r\n\r\n", + l_full_path, a_client_pvt->uplink_addr, a_client_pvt->uplink_port); + char *l_message = DAP_NEW_Z_SIZE(char, l_message_size + 1); + l_message_size = snprintf(l_message, l_message_size + 1, + "GET /%s HTTP/1.1\r\nHost: %s:%d\r\n\r\n", + l_full_path, a_client_pvt->uplink_addr, a_client_pvt->uplink_port); + int count = send(a_client_pvt->stream_socket, l_message, l_message_size, 0); + DAP_DELETE(l_message); } - a_client_pvt->stage = STAGE_ENC_INIT; - // Trying the step again - a_client_pvt->stage_status = STAGE_STATUS_IN_PROGRESS; + DAP_DELETE(l_full_path); + + a_client_pvt->stage_status = STAGE_STATUS_DONE; + s_stage_status_after(a_client_pvt); + } + break; + case STAGE_STREAM_STREAMING: { + log_it(L_INFO, "Go to stage STAGE_STREAM_STREAMING"); + + a_client_pvt->stage_status = STAGE_STATUS_DONE; s_stage_status_after(a_client_pvt); - } break; - case STAGE_STATUS_DONE :{ - log_it(L_INFO, "Stage status %s is done", - dap_client_stage_str(a_client_pvt->stage) ); - bool l_is_last_stage=( a_client_pvt->stage == a_client_pvt->stage_target ); - if( a_client_pvt->stage_status_done_callback ){ - a_client_pvt->stage_status_done_callback(a_client_pvt->client,NULL); + + } + break; + + default: { + log_it(L_ERROR, "Undefined proccessing actions for stage status %s", + dap_client_stage_status_str(a_client_pvt->stage_status)); + a_client_pvt->stage_status = STAGE_STATUS_ERROR; + s_stage_status_after(a_client_pvt); // be carefull to not to loop! + } + } + } + break; + case STAGE_STATUS_ERROR: { + log_it(L_ERROR, "Error state, doing callback if present"); + if(a_client_pvt->stage_status_error_callback) { + a_client_pvt->stage_status_error_callback(a_client_pvt->client, NULL); + // Expecting that its one-shot callback + //a_client_internal->stage_status_error_callback = NULL; + + } + a_client_pvt->stage = STAGE_ENC_INIT; + // Trying the step again + a_client_pvt->stage_status = STAGE_STATUS_IN_PROGRESS; + s_stage_status_after(a_client_pvt); + } + break; + case STAGE_STATUS_DONE: { + log_it(L_INFO, "Stage status %s is done", + dap_client_stage_str(a_client_pvt->stage)); + bool l_is_last_stage = (a_client_pvt->stage == a_client_pvt->stage_target); + if(a_client_pvt->stage_status_done_callback) { + a_client_pvt->stage_status_done_callback(a_client_pvt->client, NULL); + // Expecting that its one-shot callback + //a_client_internal->stage_status_done_callback = NULL; + } else + log_it(L_WARNING, "Stage done callback is not present"); + + if(l_is_last_stage) { + log_it(L_NOTICE, "Stage %s is achieved", + dap_client_stage_str(a_client_pvt->stage)); + if(a_client_pvt->stage_target_done_callback) { + a_client_pvt->stage_target_done_callback(a_client_pvt->client, NULL); // Expecting that its one-shot callback - //a_client_internal->stage_status_done_callback = NULL; - }else - log_it(L_WARNING,"Stage done callback is not present"); - - if (l_is_last_stage ){ - log_it(L_NOTICE, "Stage %s is achieved", - dap_client_stage_str(a_client_pvt->stage)); - if( a_client_pvt->stage_target_done_callback ){ - a_client_pvt->stage_target_done_callback(a_client_pvt->client,NULL); - // Expecting that its one-shot callback - a_client_pvt->stage_target_done_callback = NULL; - } - } else log_it(L_ERROR,"!! dap_CLIENT_STAGE_STATUS_DONE but not l_is_last_stage !!"); - }break; - default: log_it(L_ERROR,"Undefined proccessing actions for stage status %s", - dap_client_stage_status_str(a_client_pvt->stage_status)); + a_client_pvt->stage_target_done_callback = NULL; + } + } else + log_it(L_ERROR, "!! dap_CLIENT_STAGE_STATUS_DONE but not l_is_last_stage !!"); + } + break; + default: + log_it(L_ERROR, "Undefined proccessing actions for stage status %s", + dap_client_stage_status_str(a_client_pvt->stage_status)); } - if( a_client_pvt->stage_status_callback ) - a_client_pvt->stage_status_callback( a_client_pvt->client,NULL ); + if(a_client_pvt->stage_status_callback) + a_client_pvt->stage_status_callback(a_client_pvt->client, NULL); } - /** * @brief dap_client_internal_stage_transaction_begin * @param a_client_internal @@ -251,7 +347,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) * @param a_done_callback */ void dap_client_pvt_stage_transaction_begin(dap_client_pvt_t * a_client_internal, dap_client_stage_t a_stage_next, - dap_client_callback_t a_done_callback) + dap_client_callback_t a_done_callback) { a_client_internal->stage_status_done_callback = a_done_callback; a_client_internal->stage = a_stage_next; @@ -268,7 +364,8 @@ void dap_client_pvt_stage_transaction_begin(dap_client_pvt_t * a_client_internal * @param a_response_proc */ void dap_client_pvt_request(dap_client_pvt_t * a_client_internal, const char * a_path, void * a_request, - size_t a_request_size, dap_client_callback_data_size_t a_response_proc, dap_client_callback_int_t a_response_error) + size_t a_request_size, dap_client_callback_data_size_t a_response_proc, + dap_client_callback_int_t a_response_error) { a_client_internal->request_response_callback = a_response_proc; a_client_internal->request_error_callback = a_response_error; @@ -276,19 +373,20 @@ void dap_client_pvt_request(dap_client_pvt_t * a_client_internal, const char * a size_t l_url_size_max = 0; char *l_url = NULL; - if(a_path){ - l_url_size_max= strlen(a_client_internal->uplink_addr)+strlen(a_path)+15; - l_url = DAP_NEW_Z_SIZE(char,l_url_size_max); - - snprintf(l_url,l_url_size_max,"http://%s:%u/%s",a_client_internal->uplink_addr, a_client_internal->uplink_port, a_path ); - }else{ - l_url_size_max= strlen(a_client_internal->uplink_addr)+15; - l_url = DAP_NEW_Z_SIZE(char,l_url_size_max); - snprintf(l_url,l_url_size_max,"http://%s:%u",a_client_internal->uplink_addr, a_client_internal->uplink_port ); + if(a_path) { + l_url_size_max = strlen(a_client_internal->uplink_addr) + strlen(a_path) + 15; + l_url = DAP_NEW_Z_SIZE(char, l_url_size_max); + + snprintf(l_url, l_url_size_max, "http://%s:%u/%s", a_client_internal->uplink_addr, + a_client_internal->uplink_port, a_path); + } else { + l_url_size_max = strlen(a_client_internal->uplink_addr) + 15; + l_url = DAP_NEW_Z_SIZE(char, l_url_size_max); + snprintf(l_url, l_url_size_max, "http://%s:%u", a_client_internal->uplink_addr, a_client_internal->uplink_port); } - dap_http_client_simple_request(l_url, a_request?"POST":"GET","text/text", a_request, a_request_size, - NULL, - m_request_response, m_request_error, a_client_internal,NULL); + dap_http_client_simple_request(l_url, a_request ? "POST" : "GET", "text/text", a_request, a_request_size, + NULL, + m_request_response, m_request_error, a_client_internal, NULL); DAP_DELETE(l_url); } @@ -304,31 +402,34 @@ void dap_client_pvt_request(dap_client_pvt_t * a_client_internal, const char * a * @param a_response_error */ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char * a_path, - const char * a_sub_url, const char * a_query - , void * a_request, size_t a_request_size - ,dap_client_callback_data_size_t a_response_proc - , dap_client_callback_int_t a_response_error) + const char * a_sub_url, const char * a_query + , void * a_request, size_t a_request_size + , dap_client_callback_data_size_t a_response_proc + , dap_client_callback_int_t a_response_error) { - log_it(L_DEBUG,"Encrypted request: sub_url '%s' query '%s'",a_sub_url?a_sub_url:"NULL", a_query?a_query:"NULL" ); - size_t l_sub_url_size = a_sub_url?strlen(a_sub_url): 0; - size_t l_query_size = a_query?strlen(a_query):0; + bool is_query_enc = false; // it true, then encode a_query string + log_it(L_DEBUG, "Encrypted request: sub_url '%s' query '%s'", a_sub_url ? a_sub_url : "NULL", + a_query ? a_query : "NULL"); + size_t l_sub_url_size = a_sub_url ? strlen(a_sub_url) : 0; + size_t l_query_size = a_query ? strlen(a_query) : 0; size_t l_url_size; - char l_url[1024]={0}; - snprintf(l_url,1024,"http://%s:%u",a_client_internal->uplink_addr, a_client_internal->uplink_port ); + char l_url[1024] = { 0 }; + snprintf(l_url, 1024, "http://%s:%u", a_client_internal->uplink_addr, a_client_internal->uplink_port); l_url_size = strlen(l_url); - size_t l_sub_url_enc_size_max = l_sub_url_size ? (5*l_sub_url_size+16 ): 0; - char *l_sub_url_enc = l_sub_url_size ? DAP_NEW_Z_SIZE(char, l_sub_url_enc_size_max+1 ): NULL; + size_t l_sub_url_enc_size_max = l_sub_url_size ? (5 * l_sub_url_size + 16) : 0; + char *l_sub_url_enc = l_sub_url_size ? DAP_NEW_Z_SIZE(char, l_sub_url_enc_size_max + 1) : NULL; - size_t l_query_enc_size_max = l_query_size*5+16; - char *l_query_enc = l_query_size ? DAP_NEW_Z_SIZE(char,l_query_enc_size_max+1 ):NULL; + size_t l_query_enc_size_max = (is_query_enc) ? (l_query_size * 5 + 16) : l_query_size; + char *l_query_enc = + (is_query_enc) ? (l_query_size ? DAP_NEW_Z_SIZE(char, l_query_enc_size_max + 1) : NULL) : (char*) a_query; - size_t l_url_full_size_max = 5*l_sub_url_size + 5*l_query_size + 16 + l_url_size+2; - char * l_url_full = DAP_NEW_Z_SIZE(char, l_url_full_size_max+1); + size_t l_url_full_size_max = 5 * l_sub_url_size + 5 * l_query_size + 16 + l_url_size + 2; + char * l_url_full = DAP_NEW_Z_SIZE(char, l_url_full_size_max + 1); - size_t l_request_enc_size_max = a_request_size ?a_request_size*2+16 : 0; - char * l_request_enc = a_request_size? DAP_NEW_Z_SIZE(char,l_request_enc_size_max+1 ) : NULL; + size_t l_request_enc_size_max = a_request_size ? a_request_size * 2 + 16 : 0; + char * l_request_enc = a_request_size ? DAP_NEW_Z_SIZE(char, l_request_enc_size_max + 1) : NULL; size_t l_request_enc_size = 0; a_client_internal->request_response_callback = a_response_proc; @@ -337,50 +438,48 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char size_t i; dap_enc_data_type_t l_enc_type; - if( a_client_internal->uplink_protocol_version >= 21 ) + if(a_client_internal->uplink_protocol_version >= 21) l_enc_type = DAP_ENC_DATA_TYPE_B64_URLSAFE; else l_enc_type = DAP_ENC_DATA_TYPE_B64; - if ( l_sub_url_size ) + if(l_sub_url_size) dap_enc_code(a_client_internal->session_key, - a_sub_url,l_sub_url_size, - l_sub_url_enc, l_sub_url_enc_size_max, - l_enc_type); + a_sub_url, l_sub_url_size, + l_sub_url_enc, l_sub_url_enc_size_max, + l_enc_type); - if ( l_query_size ) + if(is_query_enc && l_query_size) dap_enc_code(a_client_internal->session_key, - a_query,l_query_size, - l_query_enc,l_query_enc_size_max, - l_enc_type); - + a_query, l_query_size, + l_query_enc, l_query_enc_size_max, + l_enc_type); - if ( a_request_size ) + if(a_request_size) l_request_enc_size = dap_enc_code(a_client_internal->session_key, - a_request, a_request_size, - l_request_enc, l_request_enc_size_max, - DAP_ENC_DATA_TYPE_RAW ); - - if (a_path){ - if( l_sub_url_size ){ - if( l_query_size ){ - snprintf(l_url_full,l_url_full_size_max-1,"%s/%s/%s?%s" - ,l_url,a_path, l_sub_url_enc, l_query_enc ); - - }else{ - snprintf(l_url_full,l_url_full_size_max,"%s/%s/%s",l_url,a_path, l_sub_url_enc); + a_request, a_request_size, + l_request_enc, l_request_enc_size_max, + DAP_ENC_DATA_TYPE_RAW); + + if(a_path) { + if(l_sub_url_size) { + if(l_query_size) { + snprintf(l_url_full, l_url_full_size_max - 1, "%s/%s/%s?%s" + , l_url, a_path, l_sub_url_enc, l_query_enc); + + } else { + snprintf(l_url_full, l_url_full_size_max, "%s/%s/%s", l_url, a_path, l_sub_url_enc); } - }else{ - snprintf(l_url_full,l_url_full_size_max,"%s/%s",l_url,a_path); + } else { + snprintf(l_url_full, l_url_full_size_max, "%s/%s", l_url, a_path); } - }else{ - snprintf(l_url_full,l_url_full_size_max,"%s",l_url); + } else { + snprintf(l_url_full, l_url_full_size_max, "%s", l_url); } - size_t l_key_hdr_str_size_max = strlen(a_client_internal->session_key_id)+10; - char *l_key_hdr_str = DAP_NEW_Z_SIZE(char,l_key_hdr_str_size_max); - snprintf(l_key_hdr_str,l_key_hdr_str_size_max,"KeyID: %s",a_client_internal->session_key_id ); - + size_t l_key_hdr_str_size_max = strlen(a_client_internal->session_key_id) + 10; + char *l_key_hdr_str = DAP_NEW_Z_SIZE(char, l_key_hdr_str_size_max); + snprintf(l_key_hdr_str, l_key_hdr_str_size_max, "KeyID: %s", a_client_internal->session_key_id); char *a_custom_new[2]; size_t a_custom_count = 1; @@ -395,16 +494,16 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char m_request_response, m_request_error, a_client_internal, a_custom_new, a_custom_count); DAP_DELETE(l_key_hdr_str); - if( l_sub_url_enc ) + if(l_sub_url_enc) DAP_DELETE(l_sub_url_enc); - if( l_query_enc ) + if(is_query_enc && l_query_enc) DAP_DELETE(l_query_enc); - if( l_url_full ) + if(l_url_full) DAP_DELETE(l_url_full); - if( l_request_enc ) + if(l_request_enc) DAP_DELETE(l_request_enc); } @@ -416,7 +515,7 @@ void dap_client_pvt_request_enc(dap_client_pvt_t * a_client_internal, const char void m_request_error(int a_err_code, void * a_obj) { dap_client_pvt_t * a_client_internal = (dap_client_pvt_t *) a_obj; - a_client_internal->request_error_callback(a_client_internal->client, a_err_code ); + a_client_internal->request_error_callback(a_client_internal->client, a_err_code); } /** @@ -425,38 +524,41 @@ void m_request_error(int a_err_code, void * a_obj) * @param a_response_size * @param a_obj */ -void m_request_response(void * a_response,size_t a_response_size,void * a_obj) +void m_request_response(void * a_response, size_t a_response_size, void * a_obj) { dap_client_pvt_t * a_client_internal = (dap_client_pvt_t *) a_obj; - if( a_client_internal->is_encrypted){ - size_t l_response_dec_size_max = a_response_size ?a_response_size*2+16 : 0; - char * l_response_dec = a_response_size? DAP_NEW_Z_SIZE(char, l_response_dec_size_max ) : NULL; + if(a_client_internal->is_encrypted) { + size_t l_response_dec_size_max = a_response_size ? a_response_size * 2 + 16 : 0; + char * l_response_dec = a_response_size ? DAP_NEW_Z_SIZE(char, l_response_dec_size_max) : NULL; size_t l_response_dec_size = 0; - if ( a_response_size) - l_response_dec_size = dap_enc_decode( a_client_internal->session_key, - a_response, a_response_size, - l_response_dec, l_response_dec_size_max, - DAP_ENC_DATA_TYPE_RAW ); - - a_client_internal->request_response_callback(a_client_internal->client, l_response_dec, l_response_dec_size ); - - if( l_response_dec ) - DAP_DELETE ( l_response_dec ); - }else{ - a_client_internal->request_response_callback(a_client_internal->client, a_response, a_response_size ); + if(a_response_size) + l_response_dec_size = dap_enc_decode(a_client_internal->session_key, + a_response, a_response_size, + l_response_dec, l_response_dec_size_max, + DAP_ENC_DATA_TYPE_RAW); + + a_client_internal->request_response_callback(a_client_internal->client, l_response_dec, l_response_dec_size); + + if(l_response_dec) + DAP_DELETE(l_response_dec); + } else { + a_client_internal->request_response_callback(a_client_internal->client, a_response, a_response_size); } } - /** * @brief m_enc_init_response * @param a_client * @param a_response * @param a_response_size */ -void m_enc_init_response(dap_client_t * a_client, void * a_response,size_t a_response_size) +void m_enc_init_response(dap_client_t * a_client, void * a_response, size_t a_response_size) { dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(a_client); + if(!l_client_pvt) { + log_it(L_ERROR, "m_enc_init_response: l_client_pvt is NULL!"); + return; + } if(a_response_size > 10) { // && a_response_size < 50){ char *l_session_id_b64 = NULL; @@ -487,46 +589,46 @@ void m_enc_init_response(dap_client_t * a_client, void * a_response,size_t a_res //char l_session_id_b64[DAP_ENC_BASE64_ENCODE_SIZE(DAP_ENC_KS_KEY_ID_SIZE) + 1] = { 0 }; //char *l_bob_message_b64 = DAP_NEW_Z_SIZE(char, a_response_size - sizeof(l_session_id_b64) + 1); if(json_parse_count == 2) { //if (sscanf (a_response,"%s %s",l_session_id_b64, l_bob_message_b64) == 2 ){ - l_client_pvt->session_key_id = DAP_NEW_Z_SIZE(char,strlen(l_session_id_b64)+1); - dap_enc_base64_decode(l_session_id_b64,strlen(l_session_id_b64), - l_client_pvt->session_key_id, DAP_ENC_DATA_TYPE_B64); - log_it(L_DEBUG,"ENC: session Key ID %s",l_client_pvt->session_key_id); - - char *l_bob_message = DAP_NEW_Z_SIZE(char,strlen(l_bob_message_b64)+1); - size_t l_bob_message_size = dap_enc_base64_decode(l_bob_message_b64,strlen(l_bob_message_b64), - l_bob_message, DAP_ENC_DATA_TYPE_B64); + l_client_pvt->session_key_id = DAP_NEW_Z_SIZE(char, strlen(l_session_id_b64) + 1); + dap_enc_base64_decode(l_session_id_b64, strlen(l_session_id_b64), + l_client_pvt->session_key_id, DAP_ENC_DATA_TYPE_B64); + log_it(L_DEBUG, "ENC: session Key ID %s", l_client_pvt->session_key_id); + + char *l_bob_message = DAP_NEW_Z_SIZE(char, strlen(l_bob_message_b64) + 1); + size_t l_bob_message_size = dap_enc_base64_decode(l_bob_message_b64, strlen(l_bob_message_b64), + l_bob_message, DAP_ENC_DATA_TYPE_B64); l_client_pvt->session_key_open->gen_alice_shared_key( - l_client_pvt->session_key_open, l_client_pvt->session_key_open->priv_key_data, - l_bob_message_size, l_bob_message); + l_client_pvt->session_key_open, l_client_pvt->session_key_open->priv_key_data, + l_bob_message_size, l_bob_message); l_client_pvt->session_key = dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_IAES, - l_client_pvt->session_key_open->priv_key_data, // shared key - l_client_pvt->session_key_open->priv_key_data_size, - l_client_pvt->session_key_id,strlen(l_client_pvt->session_key_id), 0); + l_client_pvt->session_key_open->priv_key_data, // shared key + l_client_pvt->session_key_open->priv_key_data_size, + l_client_pvt->session_key_id, strlen(l_client_pvt->session_key_id), 0); DAP_DELETE(l_bob_message); - if( l_client_pvt->stage == STAGE_ENC_INIT ){ // We are in proper stage + if(l_client_pvt->stage == STAGE_ENC_INIT) { // We are in proper stage l_client_pvt->stage_status = STAGE_STATUS_DONE; s_stage_status_after(l_client_pvt); - }else{ - log_it(L_WARNING,"ENC: initialized encryption but current stage is %s (%s)", - dap_client_get_stage_str(a_client),dap_client_get_stage_status_str(a_client)); + } else { + log_it(L_WARNING, "ENC: initialized encryption but current stage is %s (%s)", + dap_client_get_stage_str(a_client), dap_client_get_stage_status_str(a_client)); } } else { - log_it(L_ERROR, "ENC: Wrong response (size %u data '%s')",a_response_size,(char*) a_response); + log_it(L_ERROR, "ENC: Wrong response (size %u data '%s')", a_response_size, (char* ) a_response); l_client_pvt->last_error = ERROR_ENC_NO_KEY; l_client_pvt->stage_status = STAGE_STATUS_ERROR; s_stage_status_after(l_client_pvt); } DAP_DELETE(l_session_id_b64); DAP_DELETE(l_bob_message_b64); - }else if( a_response_size>1){ - log_it(L_ERROR, "ENC: Wrong response (size %u data '%s')",a_response_size,(char*) a_response); + } else if(a_response_size > 1) { + log_it(L_ERROR, "ENC: Wrong response (size %u data '%s')", a_response_size, (char* ) a_response); l_client_pvt->last_error = ERROR_ENC_NO_KEY; l_client_pvt->stage_status = STAGE_STATUS_ERROR; s_stage_status_after(l_client_pvt); - }else{ - log_it(L_ERROR, "ENC: Wrong response (size %u)",a_response_size); + } else { + log_it(L_ERROR, "ENC: Wrong response (size %u)", a_response_size); l_client_pvt->last_error = ERROR_ENC_NO_KEY; l_client_pvt->stage_status = STAGE_STATUS_ERROR; s_stage_status_after(l_client_pvt); @@ -541,17 +643,18 @@ void m_enc_init_response(dap_client_t * a_client, void * a_response,size_t a_res void m_enc_init_error(dap_client_t * a_client, int a_err_code) { dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(a_client); + if(!l_client_pvt) { + log_it(L_ERROR, "m_enc_init_error: l_client_pvt is NULL!"); + return; + } //dap_client_internal_t * l_client_internal = dap_CLIENT_INTERNAL(a_client); log_it(L_ERROR,"ENC: Can't init ecnryption session, err code %d",a_err_code); - l_client_pvt->last_error = ERROR_NETWORK_CONNECTION_REFUSE ; + l_client_pvt->last_error = ERROR_NETWORK_CONNECTION_REFUSE; l_client_pvt->stage_status = STAGE_STATUS_ERROR; s_stage_status_after(l_client_pvt); } - - - /** * @brief m_stream_ctl_response * @param a_client @@ -561,73 +664,76 @@ void m_enc_init_error(dap_client_t * a_client, int a_err_code) void m_stream_ctl_response(dap_client_t * a_client, void * a_data, size_t a_data_size) { dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); - + if(!l_client_internal) { + log_it(L_ERROR, "m_stream_ctl_response: l_client_internal is NULL!"); + return; + } log_it(L_DEBUG, "STREAM_CTL response %u bytes length recieved", a_data_size); - char * l_response_str = DAP_NEW_Z_SIZE(char, a_data_size+1); - memcpy(l_response_str, a_data,a_data_size); + char * l_response_str = DAP_NEW_Z_SIZE(char, a_data_size + 1); + memcpy(l_response_str, a_data, a_data_size); - if( a_data_size<4 ){ + if(a_data_size < 4) { log_it(L_ERROR, "STREAM_CTL Wrong reply: '%s'", l_response_str); l_client_internal->last_error = ERROR_STREAM_CTL_ERROR_RESPONSE_FORMAT; l_client_internal->stage_status = STAGE_STATUS_ERROR; s_stage_status_after(l_client_internal); - }else if ( strcmp(l_response_str, "ERROR") == 0 ){ + } else if(strcmp(l_response_str, "ERROR") == 0) { log_it(L_WARNING, "STREAM_CTL Got ERROR from the remote site,expecting thats ERROR_AUTH"); l_client_internal->last_error = ERROR_STREAM_CTL_ERROR_AUTH; l_client_internal->stage_status = STAGE_STATUS_ERROR; s_stage_status_after(l_client_internal); - }else { + } else { int l_arg_count; - char l_stream_id[25]={0}; - char *l_stream_key = DAP_NEW_Z_SIZE(char,4096*3); - void * l_stream_key_raw = DAP_NEW_Z_SIZE(char,4096); + char l_stream_id[25] = { 0 }; + char *l_stream_key = DAP_NEW_Z_SIZE(char, 4096 * 3); + void * l_stream_key_raw = DAP_NEW_Z_SIZE(char, 4096); size_t l_stream_key_raw_size = 0; uint32_t l_remote_protocol_version; - l_arg_count = sscanf(l_response_str,"%25s %4096s %u" - ,l_stream_id,l_stream_key,&l_remote_protocol_version ); - if (l_arg_count <2 ){ - log_it(L_WARNING, "STREAM_CTL Need at least 2 arguments in reply (got %d)",l_arg_count); + l_arg_count = sscanf(l_response_str, "%25s %4096s %u" + , l_stream_id, l_stream_key, &l_remote_protocol_version); + if(l_arg_count < 2) { + log_it(L_WARNING, "STREAM_CTL Need at least 2 arguments in reply (got %d)", l_arg_count); l_client_internal->last_error = ERROR_STREAM_CTL_ERROR_RESPONSE_FORMAT; l_client_internal->stage_status = STAGE_STATUS_ERROR; s_stage_status_after(l_client_internal); - }else{ + } else { - if( l_arg_count >2){ + if(l_arg_count > 2) { l_client_internal->uplink_protocol_version = l_remote_protocol_version; - log_it(L_DEBUG,"Uplink protocol version %u",l_remote_protocol_version); - }else - log_it(L_WARNING,"No uplink protocol version, use the default version %d" - ,l_client_internal->uplink_protocol_version = DAP_PROTOCOL_VERSION); + log_it(L_DEBUG, "Uplink protocol version %u", l_remote_protocol_version); + } else + log_it(L_WARNING, "No uplink protocol version, use the default version %d" + , l_client_internal->uplink_protocol_version = DAP_PROTOCOL_VERSION); - if(strlen(l_stream_id)<13){ + if(strlen(l_stream_id) < 13) { //log_it(L_DEBUG, "Stream server id %s, stream key length(base64 encoded) %u" // ,l_stream_id,strlen(l_stream_key) ); log_it(L_DEBUG, "Stream server id %s, stream key '%s'" - ,l_stream_id,l_stream_key ); + , l_stream_id, l_stream_key); //l_stream_key_raw_size = dap_enc_base64_decode(l_stream_key,strlen(l_stream_key), - // l_stream_key_raw,DAP_ENC_DATA_TYPE_B64); + // l_stream_key_raw,DAP_ENC_DATA_TYPE_B64); // Delete old key if present if(l_client_internal->stream_key) dap_enc_key_delete(l_client_internal->stream_key); - strncpy(l_client_internal->stream_id,l_stream_id,sizeof(l_client_internal->stream_id)-1); + strncpy(l_client_internal->stream_id, l_stream_id, sizeof(l_client_internal->stream_id) - 1); l_client_internal->stream_key = - dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_IAES, l_stream_key, strlen(l_stream_key), NULL, 0, 0); + dap_enc_key_new_generate(DAP_ENC_KEY_TYPE_OAES, l_stream_key, strlen(l_stream_key), NULL, 0, 32); //streamSocket->connectToHost(SapSession::me()->address(),SapSession::me()->port().toUShort(),QIODevice::ReadWrite); - if( l_client_internal->stage == STAGE_STREAM_CTL ){ // We are on the right stage + if(l_client_internal->stage == STAGE_STREAM_CTL) { // We are on the right stage l_client_internal->stage_status = STAGE_STATUS_DONE; s_stage_status_after(l_client_internal); - }else{ - log_it(L_WARNING,"Expected to be stage STREAM_CTL but current stage is %s (%s)", - dap_client_get_stage_str(a_client),dap_client_get_stage_status_str(a_client)); + } else { + log_it(L_WARNING, "Expected to be stage STREAM_CTL but current stage is %s (%s)", + dap_client_get_stage_str(a_client), dap_client_get_stage_status_str(a_client)); } - }else{ - log_it(L_WARNING,"Wrong stream id response"); + } else { + log_it(L_WARNING, "Wrong stream id response"); l_client_internal->last_error = ERROR_STREAM_CTL_ERROR_RESPONSE_FORMAT; l_client_internal->stage_status = STAGE_STATUS_ERROR; s_stage_status_after(l_client_internal); @@ -646,15 +752,57 @@ void m_stream_ctl_response(dap_client_t * a_client, void * a_data, size_t a_data */ void m_stream_ctl_error(dap_client_t * a_client, int a_error) { - log_it(L_WARNING, "STREAM_CTL error %d",a_error); + log_it(L_WARNING, "STREAM_CTL error %d", a_error); + + dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(a_client); + if(!l_client_pvt) { + log_it(L_ERROR, "m_stream_ctl_error: l_client_pvt is NULL!"); + return; + } + l_client_pvt->last_error = ERROR_STREAM_CTL_ERROR; + l_client_pvt->stage_status = STAGE_STATUS_ERROR; - dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(a_client); + s_stage_status_after(l_client_pvt); - l_client_pvt->last_error = ERROR_STREAM_CTL_ERROR; - l_client_pvt->stage_status = STAGE_STATUS_ERROR; +} - s_stage_status_after(l_client_pvt); +// STREAM stage callbacks +void m_stream_response(dap_client_t * a_client, void * a_data, size_t a_data_size) +{ + dap_client_pvt_t * l_client_internal = DAP_CLIENT_PVT(a_client); + if(!l_client_internal) { + log_it(L_ERROR, "m_stream_ctl_response: l_client_internal is NULL!"); + return; + } + log_it(L_DEBUG, "STREAM response %u bytes length recieved", a_data_size); +// char * l_response_str = DAP_NEW_Z_SIZE(char, a_data_size + 1); +// memcpy(l_response_str, a_data, a_data_size); + if(l_client_internal->stage == STAGE_STREAM_CONNECTED) { // We are on the right stage + l_client_internal->stage_status = STAGE_STATUS_DONE; + s_stage_status_after(l_client_internal); + } + else { + log_it(L_WARNING, "Expected to be stage STREAM_CONNECTED but current stage is %s (%s)", + dap_client_get_stage_str(a_client), dap_client_get_stage_status_str(a_client)); + l_client_internal->stage_status = STAGE_STATUS_ERROR; + } + s_stage_status_after(l_client_internal); +} + +void m_stream_error(dap_client_t * a_client, int a_error) +{ + log_it(L_WARNING, "STREAM error %d", a_error); + + dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(a_client); + if(!l_client_pvt) { + log_it(L_ERROR, "m_stream_error: l_client_pvt is NULL!"); + return; + } + l_client_pvt->last_error = ERROR_STREAM_RESPONSE_WRONG; + l_client_pvt->stage_status = STAGE_STATUS_ERROR; + + s_stage_status_after(l_client_pvt); } /** @@ -664,7 +812,7 @@ void m_stream_ctl_error(dap_client_t * a_client, int a_error) */ void m_stage_stream_streaming(dap_client_t * a_client, void* arg) { - log_it(L_INFO, "Stream is opened"); + log_it(L_INFO, "Stream is opened"); } /** @@ -676,13 +824,13 @@ void m_es_stream_delete(dap_events_socket_t * a_es, void * arg) { log_it(L_INFO, "====================================================== stream delete/peer reconnect"); dap_client_t* l_client = DAP_CLIENT(a_es); - if(l_client == NULL ){ + if(l_client == NULL) { log_it(L_ERROR, "dap_client is not initialized"); return; } dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client); - if(l_client_pvt == NULL ){ + if(l_client_pvt == NULL) { log_it(L_ERROR, "dap_client_pvt is not initialized"); return; } @@ -696,44 +844,50 @@ void m_es_stream_delete(dap_events_socket_t * a_es, void * arg) l_client_pvt->stream_es = NULL; dap_stream_session_close(l_client_pvt->stream_session->id); l_client_pvt->stream_session = NULL; - dap_client_go_stage(l_client_pvt->client ,STAGE_STREAM_STREAMING ,m_stage_stream_streaming ); + dap_client_go_stage(l_client_pvt->client, STAGE_STREAM_STREAMING, m_stage_stream_streaming); } /** * @brief m_es_stream_read * @param a_es * @param arg - */\ + */ void m_es_stream_read(dap_events_socket_t * a_es, void * arg) { dap_client_t * l_client = DAP_CLIENT(a_es); - dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client); - switch( l_client_pvt->stage ){ - case STAGE_STREAM_SESSION: - dap_client_go_stage(l_client_pvt->client ,STAGE_STREAM_STREAMING ,m_stage_stream_streaming ); + dap_client_pvt_t * l_client_pvt = (l_client) ? DAP_CLIENT_PVT(l_client) : NULL; + if(!l_client_pvt) { + log_it(L_ERROR, "m_es_stream_read: l_client_pvt is NULL!"); + return; + } + switch (l_client_pvt->stage) { + case STAGE_STREAM_SESSION: + dap_client_go_stage(l_client_pvt->client, STAGE_STREAM_STREAMING, m_stage_stream_streaming); break; - case STAGE_STREAM_CONNECTED:{ // Collect HTTP headers before streaming - if(a_es->buf_in_size>1){ - char * l_pos_endl; - l_pos_endl = (char*) memchr( a_es->buf_in,'\r',a_es->buf_in_size-1); - if ( l_pos_endl ){ - if ( *(l_pos_endl+1) == '\n' ) { - dap_events_socket_shrink_buf_in(a_es,l_pos_endl - a_es->buf_in_str ); - log_it(L_DEBUG,"Header passed, go to streaming (%lu bytes already are in input buffer", a_es->buf_in_size); - l_client_pvt->stage =STAGE_STREAM_STREAMING; - l_client_pvt->stage_status = STAGE_STATUS_DONE; - s_stage_status_after(l_client_pvt); - - dap_stream_data_proc_read(l_client_pvt->stream); - dap_events_socket_shrink_buf_in(a_es,a_es->buf_in_size ); - } - } - } - }break; - case STAGE_STREAM_STREAMING:{ // if streaming - process data with stream processor - dap_stream_data_proc_read(l_client_pvt->stream); - dap_events_socket_shrink_buf_in(a_es,a_es->buf_in_size ); + case STAGE_STREAM_CONNECTED: { // Collect HTTP headers before streaming + if(a_es->buf_in_size > 1) { + char * l_pos_endl; + l_pos_endl = (char*) memchr(a_es->buf_in, '\r', a_es->buf_in_size - 1); + if(l_pos_endl) { + if(*(l_pos_endl + 1) == '\n') { + dap_events_socket_shrink_buf_in(a_es, l_pos_endl - a_es->buf_in_str); + log_it(L_DEBUG, "Header passed, go to streaming (%lu bytes already are in input buffer", + a_es->buf_in_size); + l_client_pvt->stage = STAGE_STREAM_STREAMING; + l_client_pvt->stage_status = STAGE_STATUS_DONE; + s_stage_status_after(l_client_pvt); + + dap_stream_data_proc_read(l_client_pvt->stream); + dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size); + } + } } + } + break; + case STAGE_STREAM_STREAMING: { // if streaming - process data with stream processor + dap_stream_data_proc_read(l_client_pvt->stream); + dap_events_socket_shrink_buf_in(a_es, a_es->buf_in_size); + } break; } @@ -747,25 +901,41 @@ void m_es_stream_read(dap_events_socket_t * a_es, void * arg) void m_es_stream_write(dap_events_socket_t * a_es, void * arg) { dap_client_t * l_client = DAP_CLIENT(a_es); - dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client); - - switch( l_client_pvt->stage ){ - case STAGE_STREAM_STREAMING:{ - size_t i; - bool ready_to_write=false; - // log_it(DEBUG,"Process channels data output (%u channels)",STREAM(sh)->channel_count); - - for(i=0;i< l_client_pvt->stream->channel_count; i++){ - dap_stream_ch_t * ch = l_client_pvt->stream->channel[i]; - if(ch->ready_to_write){ - ch->proc->packet_out_callback(ch,NULL); - ready_to_write|=ch->ready_to_write; - } + dap_client_pvt_t * l_client_pvt = (l_client) ? DAP_CLIENT_PVT(l_client) : NULL; + if(!l_client_pvt) { + log_it(L_ERROR, "m_es_stream_write: l_client_pvt is NULL!"); + return; + } + switch (l_client_pvt->stage) { + case STAGE_STREAM_STREAMING: { + size_t i; + bool ready_to_write = false; + // log_it(DEBUG,"Process channels data output (%u channels)",STREAM(sh)->channel_count); + + for(i = 0; i < l_client_pvt->stream->channel_count; i++) { + dap_stream_ch_t * ch = l_client_pvt->stream->channel[i]; + if(ch->ready_to_write) { + ch->proc->packet_out_callback(ch, NULL); + ready_to_write |= ch->ready_to_write; } - //log_it(L_DEBUG,"stream_data_out (ready_to_write=%s)", ready_to_write?"true":"false"); + } + //log_it(L_DEBUG,"stream_data_out (ready_to_write=%s)", ready_to_write?"true":"false"); - dap_events_socket_set_writable(l_client_pvt->stream_es,ready_to_write); - //log_it(ERROR,"No stream_data_write_callback is defined"); - }break; + dap_events_socket_set_writable(l_client_pvt->stream_es, ready_to_write); + //log_it(ERROR,"No stream_data_write_callback is defined"); + } + break; } } + +void m_es_stream_error(dap_events_socket_t * a_es, void * arg) +{ + dap_client_t * l_client = DAP_CLIENT(a_es); + dap_client_pvt_t * l_client_pvt = (l_client) ? DAP_CLIENT_PVT(l_client) : NULL; + if(!l_client_pvt) { + log_it(L_ERROR, "m_es_stream_error: l_client_pvt is NULL!"); + return; + } + log_it(L_INFO, "m_es_stream_error"); +} + diff --git a/libdap b/libdap index 600ed91..999a4bc 160000 --- a/libdap +++ b/libdap @@ -1 +1 @@ -Subproject commit 600ed913252c001580ad0549d27e563c2f68be10 +Subproject commit 999a4bc46231c2398a8d010dee06628965b3c478 diff --git a/libdap-server b/libdap-server index f5f7c68..1b3a871 160000 --- a/libdap-server +++ b/libdap-server @@ -1 +1 @@ -Subproject commit f5f7c68a364d5230696c23db05730fb0aacd8089 +Subproject commit 1b3a871d6ec93801f14081de51157bbb703593f1 diff --git a/libdap-server-core b/libdap-server-core index edf554c..7c6065b 160000 --- a/libdap-server-core +++ b/libdap-server-core @@ -1 +1 @@ -Subproject commit edf554c77e05146aee6a90a6a15fab28ba38b626 +Subproject commit 7c6065bc699760e6e66ee4e80861a3562b2366c1 diff --git a/libdap-server-udp b/libdap-server-udp new file mode 160000 index 0000000..19a6376 --- /dev/null +++ b/libdap-server-udp @@ -0,0 +1 @@ +Subproject commit 19a6376646f497d97bfe1ea3fade1a907c32f76a diff --git a/libdap-stream b/libdap-stream index 9b68cf5..8fa982b 160000 --- a/libdap-stream +++ b/libdap-stream @@ -1 +1 @@ -Subproject commit 9b68cf5b9375e19e2db11a21f7ed0fd9b2b71c7f +Subproject commit 8fa982b5333a4ea85429fe772987f9d84e5fb96e diff --git a/libdap-stream-ch b/libdap-stream-ch index f5ba302..dfbdaa1 160000 --- a/libdap-stream-ch +++ b/libdap-stream-ch @@ -1 +1 @@ -Subproject commit f5ba302012357d175ff532d11125699bcc8bcc96 +Subproject commit dfbdaa1df0498069e60ffaf42de963375bffa816 diff --git a/test/libdap-test b/test/libdap-test index d225778..b76175a 160000 --- a/test/libdap-test +++ b/test/libdap-test @@ -1 +1 @@ -Subproject commit d2257789e0c796a5a3b637e14dcbaf8a8c7880cc +Subproject commit b76175acc517f085c319c8e66c62bd143f96bf94 -- GitLab