diff --git a/CMakeLists.txt b/CMakeLists.txt index 137487f6b1413c6d24bd50fb1e4c690db8ddc064..bc2c58273ff6b0e91afaf36772c9a14d0f50c41d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -26,7 +26,7 @@ include_directories("${INCLUDE_DIRECTORIES} ${dap_server_udp_INCLUDE_DIRS}") include_directories("${INCLUDE_DIRECTORIES} ${dap_stream_INCLUDE_DIRS}") include_directories("${INCLUDE_DIRECTORIES} ${dap_session_INCLUDE_DIRS}") -target_link_libraries(${PROJECT_NAME} dap_core dap_server_core dap_server dap_server_udp dap_stream dap_session) +target_link_libraries(${PROJECT_NAME} dap_core dap_core_server dap_udp_server dap_stream dap_session) target_include_directories(dap_client INTERFACE .) diff --git a/dap_client.c b/dap_client.c index 2e15c0b7eeb4e67eef120b404663b492081c5f6f..e112f5c5ac9faaa8375407e0b0eed67421898374 100644 --- a/dap_client.c +++ b/dap_client.c @@ -105,7 +105,7 @@ void dap_client_reset(dap_client_t * a_client) dap_enc_key_delete(l_client_internal->stream_key ); l_client_internal->stream_key = NULL; } - l_client_internal->es_stream = NULL; + l_client_internal->stream_es = NULL; l_client_internal->stage = STAGE_BEGIN; l_client_internal->stage_status = STAGE_STATUS_DONE ; @@ -304,7 +304,8 @@ const char * dap_client_stage_str(dap_client_stage_t a_stage) case STAGE_BEGIN: return "BEGIN"; case STAGE_ENC_INIT: return "ENC"; case STAGE_STREAM_CTL: return "STREAM_CTL"; - case STAGE_STREAM: return "STREAM"; + case STAGE_STREAM_SESSION: return "STREAM_SESSION"; + case STAGE_STREAM_STREAMING: return "STREAM"; default: return "UNDEFINED"; } } diff --git a/dap_client.h b/dap_client.h index 38f03eba17fb9d316c1f05c349a9ff5b0bad0995..5d248f4443767e28d4e94e59d1b562479ee4ee5c 100644 --- a/dap_client.h +++ b/dap_client.h @@ -33,7 +33,9 @@ typedef enum dap_client_stage { STAGE_BEGIN=0, STAGE_ENC_INIT=1, STAGE_STREAM_CTL=2, - STAGE_STREAM=3, + STAGE_STREAM_SESSION=3, + STAGE_STREAM_CONNECTED=4, + STAGE_STREAM_STREAMING=5 } dap_client_stage_t; typedef enum dap_client_stage_status { @@ -115,6 +117,8 @@ const char * dap_client_get_stream_id(dap_client_t * a_client); dap_client_stage_t dap_client_get_stage(dap_client_t * a_client); dap_client_stage_status_t dap_client_get_stage_status(dap_client_t * a_client); +#define DAP_CLIENT(a) ((dap_client_t *) (a)->_inheritor ) + #ifdef __cplusplus } #endif diff --git a/dap_client_pool.c b/dap_client_pool.c index e2c0265b56e6e5094879d4afb08b1e3891b38975..e386c4206532118eecfcb486a11e50b193ae8534 100644 --- a/dap_client_pool.c +++ b/dap_client_pool.c @@ -24,7 +24,7 @@ struct dap_client_list{ void s_stage_status_callback(dap_client_t * a_client, void* a_arg); void s_stage_status_error_callback(dap_client_t * a_client, void* a_arg); -dap_events_t * s_events = NULL: +dap_events_t * s_events = NULL; /** * @brief dap_client_pool_init * @param a_events diff --git a/dap_client_pvt.c b/dap_client_pvt.c index f2956443f5653011419aae13330879d0682ef06e..3382f045199246b372ceb05793c5bbfa8910d051 100644 --- a/dap_client_pvt.c +++ b/dap_client_pvt.c @@ -21,6 +21,8 @@ You should have received a copy of the GNU General Public License along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. */ +#include <sys/types.h> /* See NOTES */ +#include <sys/socket.h> #include <stdio.h> #include <string.h> @@ -31,9 +33,13 @@ #include "dap_enc.h" #include "dap_common.h" -#include "dap_http_client.h" +#include "dap_http_client_simple.h" #include "dap_client.h" #include "dap_client_pvt.h" +#include "dap_stream.h" +#include "dap_stream_ch.h" +#include "dap_stream_ch_proc.h" +#include "dap_stream_ch_pkt.h" #define LOG_TAG "dap_client_pvt" @@ -51,11 +57,20 @@ void m_enc_init_error(dap_client_t *, int); // STREAM_CTL stage callbacks void m_stream_ctl_response(dap_client_t *, void *, size_t); void m_stream_ctl_error(dap_client_t *, int); +void m_stage_stream_streaming (dap_client_t * a_client, void* arg); + void m_request_response(void * a_response,size_t a_response_size,void * a_obj); void m_request_error(int,void *); +// stream callbacks + +void m_es_stream_delete(dap_events_socket_t * a_es, void * arg); +void m_es_stream_read(dap_events_socket_t * a_es, void * arg); +void m_es_stream_write(dap_events_socket_t * a_es, void * arg); +void m_es_stream_error(dap_events_socket_t * a_es, void * arg); + /** * @brief dap_client_internal_init * @return @@ -145,11 +160,16 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) m_stream_ctl_response, m_stream_ctl_error); DAP_DELETE(l_request); }break; - case STAGE_STREAM:{ + case STAGE_STREAM_SESSION:{ + log_it(L_INFO,"Go to stage STREAM_SESSION: process the state ops"); + a_client_pvt->stage_status = STAGE_STATUS_DONE; + s_stage_status_after(a_client_pvt); + } break; + case STAGE_STREAM_STREAMING:{ log_it(L_INFO,"Go to stage STREAM: prepare the socket"); a_client_pvt->stream_socket = socket(PF_INET,SOCK_STREAM,0); - setsockopt(a_client_pvt->stream_socket,SOL_SOCKET,SO_SNDBUF,(const void*) 50 000,sizeof(int) ); - setsockopt(a_client_pvt->stream_socket,SOL_SOCKET,SO_RCVBUF,(const void *) 50 000,sizeof(int) ); + setsockopt(a_client_pvt->stream_socket,SOL_SOCKET,SO_SNDBUF,(const void*) 50000,sizeof(int) ); + setsockopt(a_client_pvt->stream_socket,SOL_SOCKET,SO_RCVBUF,(const void *) 50000,sizeof(int) ); // Wrap socket and setup callbacks static dap_events_socket_callbacks_t l_s_callbacks={ .read_callback = m_es_stream_read , @@ -160,7 +180,7 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt) a_client_pvt->stream_es = dap_events_socket_wrap_no_add(a_client_pvt->events, a_client_pvt->stream_socket, &l_s_callbacks); - a_client_pvt->stream_es->_inheritor = a_client; + a_client_pvt->stream_es->_inheritor = a_client_pvt->client; a_client_pvt->stream = dap_stream_new_es(a_client_pvt->stream_es); a_client_pvt->stream->is_client_to_uplink = true; a_client_pvt->stream_session = dap_stream_session_pure_new(); @@ -596,3 +616,116 @@ void m_stream_ctl_error(dap_client_t * a_client, int a_error) s_stage_status_after(l_client_pvt); } + +/** + * @brief m_stage_stream_opened + * @param a_client + * @param arg + */ +void m_stage_stream_streaming(dap_client_t * a_client, void* arg) +{ + log_it(L_INFO, "Stream is opened"); +} + +/** + * @brief m_es_stream_delete + * @param a_es + * @param arg + */ +void m_es_stream_delete(dap_events_socket_t * a_es, void * arg) +{ + log_it(L_INFO, "====================================================== stream delete/peer reconnect"); + dap_client_t* l_client = DAP_CLIENT(a_es); + if(l_client == NULL ){ + log_it(L_ERROR, "dap_client is not initialized"); + return; + } + + dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client); + if(l_client_pvt == NULL ){ + log_it(L_ERROR, "dap_client_pvt is not initialized"); + return; + } + + dap_stream_delete(l_client_pvt->stream); + l_client_pvt->stream = NULL; + if(l_client_pvt->client) + dap_client_reset(l_client_pvt->client); + +// l_client_pvt->client= NULL; + l_client_pvt->stream_es = NULL; + dap_stream_session_close(l_client_pvt->stream_session->id); + l_client_pvt->stream_session = NULL; + dap_client_go_stage(l_client_pvt->client ,STAGE_STREAM_STREAMING ,m_stage_stream_streaming ); +} + +/** + * @brief m_es_stream_read + * @param a_es + * @param arg + */\ +void m_es_stream_read(dap_events_socket_t * a_es, void * arg) +{ + dap_client_t * l_client = DAP_CLIENT(a_es); + dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client); + switch( l_client_pvt->stage ){ + case STAGE_STREAM_SESSION: + dap_client_go_stage(l_client_pvt->client ,STAGE_STREAM_STREAMING ,m_stage_stream_streaming ); + break; + case STAGE_STREAM_CONNECTED:{ // Collect HTTP headers before streaming + if(a_es->buf_in_size>1){ + char * l_pos_endl; + l_pos_endl = (char*) memchr( a_es->buf_in,'\r',a_es->buf_in_size-1); + if ( l_pos_endl ){ + if ( *(l_pos_endl+1) == '\n' ) { + dap_events_socket_shrink_buf_in(a_es,l_pos_endl - a_es->buf_in_str ); + log_it(L_DEBUG,"Header passed, go to streaming (%lu bytes already are in input buffer", a_es->buf_in_size); + l_client_pvt->stage =STAGE_STREAM_STREAMING; + l_client_pvt->stage_status = STAGE_STATUS_DONE; + s_stage_status_after(l_client_pvt); + + dap_stream_data_proc_read(l_client_pvt->stream); + dap_events_socket_shrink_buf_in(a_es,a_es->buf_in_size ); + } + } + } + }break; + case STAGE_STREAM_STREAMING:{ // if streaming - process data with stream processor + dap_stream_data_proc_read(l_client_pvt->stream); + dap_events_socket_shrink_buf_in(a_es,a_es->buf_in_size ); + } + break; + + } +} + +/** + * @brief m_es_stream_write + * @param a_es + * @param arg + */ +void m_es_stream_write(dap_events_socket_t * a_es, void * arg) +{ + dap_client_t * l_client = DAP_CLIENT(a_es); + dap_client_pvt_t * l_client_pvt = DAP_CLIENT_PVT(l_client); + + switch( l_client_pvt->stage ){ + case STAGE_STREAM_STREAMING:{ + size_t i; + bool ready_to_write=false; + // log_it(DEBUG,"Process channels data output (%u channels)",STREAM(sh)->channel_count); + + for(i=0;i< l_client_pvt->stream->channel_count; i++){ + dap_stream_ch_t * ch = l_client_pvt->stream->channel[i]; + if(ch->ready_to_write){ + ch->proc->packet_out_callback(ch,NULL); + ready_to_write|=ch->ready_to_write; + } + } + //log_it(L_DEBUG,"stream_data_out (ready_to_write=%s)", ready_to_write?"true":"false"); + + dap_events_socket_set_writable(l_client_pvt->stream_es,ready_to_write); + //log_it(ERROR,"No stream_data_write_callback is defined"); + }break; + } +} diff --git a/dap_client_pvt.h b/dap_client_pvt.h index 7d4c901014e8ae386fe593cd13612ff5e8f05c79..a25c1d5b4d6286ef6c031295d296386d05fc04e7 100644 --- a/dap_client_pvt.h +++ b/dap_client_pvt.h @@ -26,7 +26,7 @@ #include <stdbool.h> #include <stdint.h> #include "dap_client.h" -#include "stream.h" +#include "dap_stream.h" #include "dap_events_socket.h" typedef struct dap_enc_key dap_enc_key_t; typedef struct dap_http_client dap_http_client_t; @@ -37,8 +37,10 @@ typedef struct dap_client_internal dap_http_client_t * http_client; - dap_events_socket_t * es_stream; + dap_events_socket_t * stream_es; int stream_socket; + dap_stream_session_t * stream_session; + dap_stream_t* stream; dap_events_t * events; dap_enc_key_t * session_key_open; // Open assymetric keys exchange @@ -46,6 +48,7 @@ typedef struct dap_client_internal dap_enc_key_t * stream_key; // Stream private key for stream encryption char stream_id[25]; + char * session_key_id; diff --git a/dap_events_socket.c b/dap_events_socket.c index fbefc8297ab9a961e0ae3b41bec959678e60c3d1..da97454dbe3468ab62dcb19aebb81e4d163ba7ec 100755 --- a/dap_events_socket.c +++ b/dap_events_socket.c @@ -31,11 +31,9 @@ #include <unistd.h> #include <string.h> #include <assert.h> -#include "common.h" +#include "dap_common.h" #include "dap_events.h" -#ifdef dap_SERVER -#include "dap_server.h" -#endif + #include "dap_events_socket.h" #define LOG_TAG "dap_events_socket" @@ -67,13 +65,13 @@ void dap_events_socket_deinit() * @param a_callbacks * @return */ -dap_events_socket_t * dap_events_socket_wrap_no_add(struct dap_events * a_events, +dap_events_socket_t * dap_events_socket_wrap_no_add( dap_events_t * a_events, int a_sock, dap_events_socket_callbacks_t * a_callbacks) { assert(a_events); assert(a_callbacks); - log_it(L_DEBUG,"Sap event socket wrapped around %d sock", a_sock); - dap_events_socket_t * ret = dap_NEW_Z(dap_events_socket_t); + log_it(L_DEBUG,"Dap event socket wrapped around %d sock", a_sock); + dap_events_socket_t * ret = DAP_NEW_Z(dap_events_socket_t); ret->socket = a_sock; ret->events = a_events; ret->callbacks = a_callbacks; @@ -113,13 +111,11 @@ dap_events_socket_t * dap_events_socket_wrap2(dap_server_t * a_server, struct da assert(a_callbacks); assert(a_server); log_it(L_DEBUG,"Sap event socket wrapped around %d sock", a_sock); - dap_events_socket_t * ret = dap_NEW_Z(dap_events_socket_t); + dap_events_socket_t * ret = DAP_NEW_Z(dap_events_socket_t); ret->socket = a_sock; ret->events = a_events; ret->callbacks = a_callbacks; -#ifdef dap_SERVER - ret->server = a_server; -#endif + ret->_ready_to_read=true; ret->is_pingable = true; diff --git a/dap_events_socket.h b/dap_events_socket.h index 32222425433fb9dade630a4a2cc431f4028cb5bf..09e8cb3f323ec5239460a6053f5dada417a903c4 100755 --- a/dap_events_socket.h +++ b/dap_events_socket.h @@ -41,7 +41,7 @@ typedef struct dap_events_socket_callbacks{ } dap_events_socket_callbacks_t; -#define dap_EVENTS_SOCKET_BUF 10000000 +#define DAP_EVENTS_SOCKET_BUF 100000 typedef struct dap_events_socket{ int socket; @@ -52,12 +52,12 @@ typedef struct dap_events_socket{ uint32_t buf_out_zero_count; union{ - uint8_t buf_in[dap_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data - char buf_in_str[dap_EVENTS_SOCKET_BUF+1]; + uint8_t buf_in[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for input data + char buf_in_str[DAP_EVENTS_SOCKET_BUF+1]; }; size_t buf_in_size; // size of data that is in the input buffer - uint8_t buf_out[dap_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data + uint8_t buf_out[DAP_EVENTS_SOCKET_BUF+1]; // Internal buffer for output data char hostaddr[1024]; // Address char service[128]; @@ -65,9 +65,7 @@ typedef struct dap_events_socket{ size_t buf_out_size; // size of data that is in the output buffer struct dap_events * events; -#ifdef dap_SERVER - struct dap_server * server; -#endif + struct dap_worker* dap_worker; dap_events_socket_callbacks_t *callbacks; @@ -89,10 +87,7 @@ void dap_events_socket_create_after(dap_events_socket_t * a_es); dap_events_socket_t * dap_events_socket_wrap_no_add(struct dap_events * a_events, int s, dap_events_socket_callbacks_t * a_callbacks); // Create new client and add it to the list -#ifdef dap_SERVER -dap_events_socket_t * dap_events_socket_wrap2(dap_server_t *a_server,struct dap_events * a_events, - int s, dap_events_socket_callbacks_t * a_callbacks); // Create new client and add it to the list -#endif + dap_events_socket_t * dap_events_socket_find(int sock, struct dap_events * sh); // Find client by socket