diff --git a/libdap-stream b/libdap-stream deleted file mode 160000 index a4dda122735ad7ff8523d1f53b5e22696fe93d32..0000000000000000000000000000000000000000 --- a/libdap-stream +++ /dev/null @@ -1 +0,0 @@ -Subproject commit a4dda122735ad7ff8523d1f53b5e22696fe93d32 diff --git a/libdap-stream/.gitignore b/libdap-stream/.gitignore new file mode 100755 index 0000000000000000000000000000000000000000..15cef3cea89f1889fb9e7788be912afa92240236 --- /dev/null +++ b/libdap-stream/.gitignore @@ -0,0 +1,54 @@ +# Prerequisites +*.d +build/ +# Object files +*.o +*.ko +*.obj +*.elf + +# Linker output +*.ilk +*.map +*.exp + +# Precompiled Headers +*.gch +*.pch + +# Libraries +*.lib +*.a +*.la +*.lo + +# Shared objects (inc. Windows DLLs) +*.dll +*.so +*.so.* +*.dylib + +# Executables +*.exe +*.out +*.app +*.i*86 +*.x86_64 +*.hex + +# Debug files +*.dSYM/ +*.su +*.idb +*.pdb + +# Kernel Module Compile Results +*.mod* +*.cmd +.tmp_versions/ +modules.order +Module.symvers +Mkfile.old +dkms.conf + +*.txt.user diff --git a/libdap-stream/.gitmodules b/libdap-stream/.gitmodules new file mode 100755 index 0000000000000000000000000000000000000000..35a9e29c25b267f7b4127426d9217677490b5d66 --- /dev/null +++ b/libdap-stream/.gitmodules @@ -0,0 +1,32 @@ +[submodule "libdap"] + path = libdap + url = https://gitlab.demlabs.net/cellframe/libdap + branch = master +[submodule "libdap-server-core"] + path = libdap-server-core + url = https://gitlab.demlabs.net/cellframe/libdap-server-core + branch = master +[submodule "libdap-server"] + path = libdap-server + url = https://gitlab.demlabs.net/cellframe/libdap-server + branch = master +[submodule "libdap-server-udp"] + path = libdap-server-udp + url = https://gitlab.demlabs.net/cellframe/libdap-server-udp + branch = master +[submodule "test/libdap-test"] + path = test/libdap-test + url = https://gitlab.demlabs.net/cellframe/libdap-test + branch = master +[submodule "test/libdap-stream-ch"] + path = libdap-stream-ch + url = https://gitlab.demlabs.net/cellframe/libdap-stream-ch + branch = master +[submodule "libdap-stream-ch"] + path = libdap-stream-ch + url = https://gitlab.demlabs.net/cellframe/libdap-stream-ch + branch = master +[submodule "libdap-crypto"] + path = libdap-crypto + url = https://gitlab.demlabs.net/cellframe/libdap-crypto + branch = master diff --git a/libdap-stream/.travis.yml b/libdap-stream/.travis.yml new file mode 100755 index 0000000000000000000000000000000000000000..ff468b80fc6b94d0f13473bfcd4a9966403539f2 --- /dev/null +++ b/libdap-stream/.travis.yml @@ -0,0 +1,25 @@ +language: c +compiler: gcc +dist: xenial +notifications: + email: false + +before_install: + - git submodule init + - git submodule update + +script: + - mkdir build + - cd build + - cmake -DBUILD_DAP_STREAM_TESTS=ON ../ + - make + - ctest --verbose + +addons: + apt: + sources: + - ubuntu-toolchain-r-test + packages: + - libjson-c-dev + - libev-dev + - libmagic-dev diff --git a/libdap-stream/CMakeLists.txt b/libdap-stream/CMakeLists.txt new file mode 100755 index 0000000000000000000000000000000000000000..3e477f95a24380f3ae2552163e8dff0b3986fe96 --- /dev/null +++ b/libdap-stream/CMakeLists.txt @@ -0,0 +1,35 @@ +project(libdap-stream C) +cmake_minimum_required(VERSION 3.0) + +if(NOT SUBMODULES_NO_BUILD) + if ( NOT ( TARGET dap_core ) ) + add_subdirectory(libdap) + endif() + if ( NOT ( TARGET dap_crypto ) ) + add_subdirectory(libdap-crypto) + endif() + + if ( NOT ( TARGET libdap-server ) ) + add_subdirectory(libdap-server) + endif() + + if ( NOT ( TARGET dap_server_core ) ) + add_subdirectory(libdap-server-core) + endif() + + if ( NOT ( TARGET dap_server_udp ) ) + add_subdirectory(libdap-server-udp) + endif() + + if ( NOT ( TARGET dap_stream_ch ) ) + add_subdirectory(libdap-stream-ch) + endif() +endif() + +if(BUILD_DAP_STREAM_TESTS) + enable_testing() + add_subdirectory(test) +endif() + +add_subdirectory(session) +add_subdirectory(stream) diff --git a/libdap-stream/LICENSE b/libdap-stream/LICENSE new file mode 100755 index 0000000000000000000000000000000000000000..65c5ca88a67c30becee01c5a8816d964b03862f9 --- /dev/null +++ b/libdap-stream/LICENSE @@ -0,0 +1,165 @@ + GNU LESSER GENERAL PUBLIC LICENSE + Version 3, 29 June 2007 + + Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/> + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + + This version of the GNU Lesser General Public License incorporates +the terms and conditions of version 3 of the GNU General Public +License, supplemented by the additional permissions listed below. + + 0. Additional Definitions. + + As used herein, "this License" refers to version 3 of the GNU Lesser +General Public License, and the "GNU GPL" refers to version 3 of the GNU +General Public License. + + "The Library" refers to a covered work governed by this License, +other than an Application or a Combined Work as defined below. + + An "Application" is any work that makes use of an interface provided +by the Library, but which is not otherwise based on the Library. +Defining a subclass of a class defined by the Library is deemed a mode +of using an interface provided by the Library. + + A "Combined Work" is a work produced by combining or linking an +Application with the Library. The particular version of the Library +with which the Combined Work was made is also called the "Linked +Version". + + The "Minimal Corresponding Source" for a Combined Work means the +Corresponding Source for the Combined Work, excluding any source code +for portions of the Combined Work that, considered in isolation, are +based on the Application, and not on the Linked Version. + + The "Corresponding Application Code" for a Combined Work means the +object code and/or source code for the Application, including any data +and utility programs needed for reproducing the Combined Work from the +Application, but excluding the System Libraries of the Combined Work. + + 1. Exception to Section 3 of the GNU GPL. + + You may convey a covered work under sections 3 and 4 of this License +without being bound by section 3 of the GNU GPL. + + 2. Conveying Modified Versions. + + If you modify a copy of the Library, and, in your modifications, a +facility refers to a function or data to be supplied by an Application +that uses the facility (other than as an argument passed when the +facility is invoked), then you may convey a copy of the modified +version: + + a) under this License, provided that you make a good faith effort to + ensure that, in the event an Application does not supply the + function or data, the facility still operates, and performs + whatever part of its purpose remains meaningful, or + + b) under the GNU GPL, with none of the additional permissions of + this License applicable to that copy. + + 3. Object Code Incorporating Material from Library Header Files. + + The object code form of an Application may incorporate material from +a header file that is part of the Library. You may convey such object +code under terms of your choice, provided that, if the incorporated +material is not limited to numerical parameters, data structure +layouts and accessors, or small macros, inline functions and templates +(ten or fewer lines in length), you do both of the following: + + a) Give prominent notice with each copy of the object code that the + Library is used in it and that the Library and its use are + covered by this License. + + b) Accompany the object code with a copy of the GNU GPL and this license + document. + + 4. Combined Works. + + You may convey a Combined Work under terms of your choice that, +taken together, effectively do not restrict modification of the +portions of the Library contained in the Combined Work and reverse +engineering for debugging such modifications, if you also do each of +the following: + + a) Give prominent notice with each copy of the Combined Work that + the Library is used in it and that the Library and its use are + covered by this License. + + b) Accompany the Combined Work with a copy of the GNU GPL and this license + document. + + c) For a Combined Work that displays copyright notices during + execution, include the copyright notice for the Library among + these notices, as well as a reference directing the user to the + copies of the GNU GPL and this license document. + + d) Do one of the following: + + 0) Convey the Minimal Corresponding Source under the terms of this + License, and the Corresponding Application Code in a form + suitable for, and under terms that permit, the user to + recombine or relink the Application with a modified version of + the Linked Version to produce a modified Combined Work, in the + manner specified by section 6 of the GNU GPL for conveying + Corresponding Source. + + 1) Use a suitable shared library mechanism for linking with the + Library. A suitable mechanism is one that (a) uses at run time + a copy of the Library already present on the user's computer + system, and (b) will operate properly with a modified version + of the Library that is interface-compatible with the Linked + Version. + + e) Provide Installation Information, but only if you would otherwise + be required to provide such information under section 6 of the + GNU GPL, and only to the extent that such information is + necessary to install and execute a modified version of the + Combined Work produced by recombining or relinking the + Application with a modified version of the Linked Version. (If + you use option 4d0, the Installation Information must accompany + the Minimal Corresponding Source and Corresponding Application + Code. If you use option 4d1, you must provide the Installation + Information in the manner specified by section 6 of the GNU GPL + for conveying Corresponding Source.) + + 5. Combined Libraries. + + You may place library facilities that are a work based on the +Library side by side in a single library together with other library +facilities that are not Applications and are not covered by this +License, and convey such a combined library under terms of your +choice, if you do both of the following: + + a) Accompany the combined library with a copy of the same work based + on the Library, uncombined with any other library facilities, + conveyed under the terms of this License. + + b) Give prominent notice with the combined library that part of it + is a work based on the Library, and explaining where to find the + accompanying uncombined form of the same work. + + 6. Revised Versions of the GNU Lesser General Public License. + + The Free Software Foundation may publish revised and/or new versions +of the GNU Lesser General Public License from time to time. Such new +versions will be similar in spirit to the present version, but may +differ in detail to address new problems or concerns. + + Each version is given a distinguishing version number. If the +Library as you received it specifies that a certain numbered version +of the GNU Lesser General Public License "or any later version" +applies to it, you have the option of following the terms and +conditions either of that published version or of any later version +published by the Free Software Foundation. If the Library as you +received it does not specify a version number of the GNU Lesser +General Public License, you may choose any version of the GNU Lesser +General Public License ever published by the Free Software Foundation. + + If the Library as you received it specifies that a proxy can decide +whether future versions of the GNU Lesser General Public License shall +apply, that proxy's public statement of acceptance of any version is +permanent authorization for you to choose that version for the +Library. diff --git a/libdap-stream/README.md b/libdap-stream/README.md new file mode 100755 index 0000000000000000000000000000000000000000..e0b4a683183628c22ea0cba8ec8a42579e2e01f3 --- /dev/null +++ b/libdap-stream/README.md @@ -0,0 +1,4 @@ +# libdap-stream +Deus Applications Prototypes: multichannel stream + +[](https://travis-ci.com/kelvinblockchain/libdap-stream) diff --git a/libdap-stream/libdap b/libdap-stream/libdap new file mode 160000 index 0000000000000000000000000000000000000000..78cc4a3aca1775288662ef7a9f49f7b747479e15 --- /dev/null +++ b/libdap-stream/libdap @@ -0,0 +1 @@ +Subproject commit 78cc4a3aca1775288662ef7a9f49f7b747479e15 diff --git a/libdap-stream/libdap-crypto b/libdap-stream/libdap-crypto new file mode 160000 index 0000000000000000000000000000000000000000..bcfcef1cd87cc6cfbc7e47887894c0fa3742a218 --- /dev/null +++ b/libdap-stream/libdap-crypto @@ -0,0 +1 @@ +Subproject commit bcfcef1cd87cc6cfbc7e47887894c0fa3742a218 diff --git a/libdap-stream/libdap-server b/libdap-stream/libdap-server new file mode 160000 index 0000000000000000000000000000000000000000..406931547486dbe7eadb5c4b4d92c288d761db66 --- /dev/null +++ b/libdap-stream/libdap-server @@ -0,0 +1 @@ +Subproject commit 406931547486dbe7eadb5c4b4d92c288d761db66 diff --git a/libdap-stream/libdap-server-core b/libdap-stream/libdap-server-core new file mode 160000 index 0000000000000000000000000000000000000000..f3aa992e316e0af2814f741eb5e06b93a9eca651 --- /dev/null +++ b/libdap-stream/libdap-server-core @@ -0,0 +1 @@ +Subproject commit f3aa992e316e0af2814f741eb5e06b93a9eca651 diff --git a/libdap-stream/libdap-server-udp b/libdap-stream/libdap-server-udp new file mode 160000 index 0000000000000000000000000000000000000000..bbf1b9db2c3af9a3a59f6debb90ddc8db55ae2eb --- /dev/null +++ b/libdap-stream/libdap-server-udp @@ -0,0 +1 @@ +Subproject commit bbf1b9db2c3af9a3a59f6debb90ddc8db55ae2eb diff --git a/libdap-stream/libdap-stream-ch b/libdap-stream/libdap-stream-ch new file mode 160000 index 0000000000000000000000000000000000000000..09e76911c2f76c6cc1c4651a99f0e545c1c1c840 --- /dev/null +++ b/libdap-stream/libdap-stream-ch @@ -0,0 +1 @@ +Subproject commit 09e76911c2f76c6cc1c4651a99f0e545c1c1c840 diff --git a/libdap-stream/session/CMakeLists.txt b/libdap-stream/session/CMakeLists.txt new file mode 100755 index 0000000000000000000000000000000000000000..4c702cfbd85a65283c50b5641961e8a486b4d193 --- /dev/null +++ b/libdap-stream/session/CMakeLists.txt @@ -0,0 +1,16 @@ +cmake_minimum_required(VERSION 2.8) +project (dap_session) + +set(SESSION_SRCS dap_stream_session.c) + +if(WIN32) + include_directories(../../3rdparty/wepoll/) + include_directories(../../3rdparty/uthash/src/) + #include_directories(../../3rdparty/curl/include/) +endif() + +add_library(${PROJECT_NAME} STATIC ${SESSION_SRCS}) + +target_link_libraries(dap_session dap_core dap_crypto) + +target_include_directories(dap_session INTERFACE .) diff --git a/libdap-stream/session/dap_stream_session.c b/libdap-stream/session/dap_stream_session.c new file mode 100644 index 0000000000000000000000000000000000000000..d2faca7c9e40c7560d7902e39ac508d05572b94d --- /dev/null +++ b/libdap-stream/session/dap_stream_session.c @@ -0,0 +1,167 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + + +#ifdef _WIN32 +#include <time.h> +#include <stdbool.h> +#include <stdint.h> +#include <stdlib.h> +#include <winsock2.h> +#include <windows.h> +#include <mswsock.h> +#include <ws2tcpip.h> +#include <io.h> +#include <pthread.h> +#endif + +#include "dap_common.h" +#include "dap_stream_session.h" + +#define LOG_TAG "dap_stream_session" + +dap_stream_session_t * sessions=NULL; + +int stream_session_close2(dap_stream_session_t * s); +static void * session_check(void * data); + +void dap_stream_session_init() +{ + log_it(L_INFO,"Init module"); + srand ( time(NULL) ); +} + +void dap_stream_session_deinit() +{ + dap_stream_session_t *current, *tmp; + log_it(L_INFO,"Destroy all the sessions"); + + HASH_ITER(hh, sessions, current, tmp) { + HASH_DEL(sessions,current); + stream_session_close2(current); + } +} + +void dap_stream_session_list() +{ + dap_stream_session_t *current, *tmp; + + log_it(L_INFO,"=== sessions list ======"); + + HASH_ITER( hh, sessions, current, tmp ) { + log_it(L_INFO,"ID %u session %X", current->id, current); + +// HASH_DEL(sessions,current); +// stream_session_close2(current); + } + + log_it(L_INFO,"=== sessions list ======"); +} + + +static void * session_check(void * data) +{ + return NULL; +} + + +dap_stream_session_t * dap_stream_session_pure_new() +{ + dap_stream_session_t * ret=NULL; + + unsigned int session_id=0,session_id_new=0; + do{ + session_id_new=session_id=rand()+rand()*0x100+rand()*0x10000+rand()*0x01000000; + HASH_FIND_INT(sessions,&session_id_new,ret); + }while(ret); + log_it(L_INFO,"Creating new session id %u",session_id); + ret=DAP_NEW_Z(dap_stream_session_t); + pthread_mutex_init(&ret->mutex, NULL); + ret->id=session_id; + ret->time_created=time(NULL); + ret->create_empty=true; + ret->enc_type = 0x01; // Default encryption type + log_it(L_DEBUG,"Timestamp %u",(unsigned int) ret->time_created); + HASH_ADD_INT(sessions,id,ret); + + return ret; +} + +dap_stream_session_t * dap_stream_session_new(unsigned int media_id, bool open_preview) +{ + dap_stream_session_t * ret=dap_stream_session_pure_new(); + ret->media_id=media_id; + ret->open_preview=open_preview; + ret->create_empty=false; + + return ret; +} + +dap_stream_session_t *dap_stream_session_id( unsigned int id ) +{ + dap_stream_session_t *ret; + HASH_FIND_INT( sessions, &id, ret ); + + return ret; +} + + +int dap_stream_session_close(unsigned int id) +{ + log_it(L_INFO,"Close session id %u", id); + +// dap_stream_session_list(); + + dap_stream_session_t *l_s = dap_stream_session_id( id ); + + if(!l_s) { + log_it(L_WARNING, "Session id %u not found", id); + return -1; + } + + return stream_session_close2(l_s); +} + +int stream_session_close2(dap_stream_session_t * a_session) +{ +// log_it(L_INFO,"Close session"); + HASH_DEL(sessions,a_session); + if (a_session->callback_delete) + a_session->callback_delete(a_session, NULL); + if (a_session->_inheritor ) + DAP_DELETE(a_session->_inheritor); + DAP_DELETE(a_session); + return 0; +} + +/** + * @brief dap_stream_session_open + * @param a_session + * @return + */ +int dap_stream_session_open(dap_stream_session_t * a_session) +{ + int ret; + pthread_mutex_lock(&a_session->mutex); + ret=a_session->opened; + if(a_session->opened==0) a_session->opened=1; + pthread_mutex_unlock(&a_session->mutex); + return ret; +} diff --git a/libdap-stream/session/dap_stream_session.h b/libdap-stream/session/dap_stream_session.h new file mode 100755 index 0000000000000000000000000000000000000000..9fdfb91c061fbd4c62356f17eb39cc83863ed7e7 --- /dev/null +++ b/libdap-stream/session/dap_stream_session.h @@ -0,0 +1,76 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://gitlab.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ +#pragma once + +#include <pthread.h> +#include <time.h> +#include <stdbool.h> +#include <stdint.h> + +#ifndef _WIN32 +#include <netinet/in.h> +#endif + +#include "uthash.h" +#include "dap_enc_key.h" + +typedef enum stream_session_type {STREAM_SESSION_TYPE_MEDIA=0,STREAM_SESSION_TYPE_VPN} stream_session_type_t; +typedef enum stream_session_connection_type {STEAM_SESSION_HTTP = 0, STREAM_SESSION_UDP, STREAM_SESSION_END_TYPE} stream_session_connection_type_t; + +typedef struct dap_stream_session dap_stream_session_t; +typedef void (*dap_stream_session_callback_t)( dap_stream_session_t *,void*); + +struct dap_stream_session { + bool create_empty; + unsigned int id; + unsigned int media_id; + + dap_enc_key_t * key; + + bool open_preview; + pthread_mutex_t mutex; + int opened; + time_t time_created; + + uint8_t enc_type; + + char *service_key;// auth string + char active_channels[16];// channels for open + + stream_session_connection_type_t conn_type; + stream_session_type_t type; + UT_hash_handle hh; + struct in_addr tun_client_addr; + + void * _inheritor; + + dap_stream_session_callback_t callback_delete; +}; +typedef struct dap_stream_session dap_stream_session_t; + +void dap_stream_session_init(); +void dap_stream_session_deinit(); + +dap_stream_session_t * dap_stream_session_pure_new(); +dap_stream_session_t * dap_stream_session_new(unsigned int media_id, bool open_preview); +dap_stream_session_t * dap_stream_session_id(unsigned int id); +int dap_stream_session_open(dap_stream_session_t * a_session); /*Lock for opening for single client , return 0 if ok*/ +int dap_stream_session_close(unsigned int id); + diff --git a/libdap-stream/stream/CMakeLists.txt b/libdap-stream/stream/CMakeLists.txt new file mode 100755 index 0000000000000000000000000000000000000000..7bb630e624b006c673cd95eff672d713a1c66001 --- /dev/null +++ b/libdap-stream/stream/CMakeLists.txt @@ -0,0 +1,20 @@ +cmake_minimum_required(VERSION 2.8) +project (dap_stream) + +set(STREAM_SRCS + dap_stream.c + dap_stream_ctl.c + dap_stream_pkt.c ) + +if(WIN32) + include_directories(../../3rdparty/wepoll/) + include_directories(../../3rdparty/uthash/src/) + #include_directories(../../3rdparty/curl/include/) +endif() + +add_library(${PROJECT_NAME} STATIC ${STREAM_SRCS}) + +target_link_libraries(dap_stream dap_core dap_server_core dap_udp_server dap_crypto + dap_http_server dap_enc_server dap_session dap_stream_ch) + +target_include_directories(dap_stream INTERFACE .) diff --git a/libdap-stream/stream/dap_stream.c b/libdap-stream/stream/dap_stream.c new file mode 100644 index 0000000000000000000000000000000000000000..cda062260efe7e20e6537726ed188ef58ea3df92 --- /dev/null +++ b/libdap-stream/stream/dap_stream.c @@ -0,0 +1,841 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdlib.h> +#include <stdio.h> +#include <time.h> +#include <stdlib.h> +#include <stddef.h> +#include <stdint.h> + +#ifdef _WIN32 +#include <winsock2.h> +#include <windows.h> +#include <mswsock.h> +#include <ws2tcpip.h> +#include <io.h> +#include <pthread.h> +#endif + +#include "dap_common.h" + +#include "dap_stream.h" +#include "dap_stream_pkt.h" +#include "dap_stream_ch.h" +#include "dap_stream_ch_proc.h" +#include "dap_stream_ch_pkt.h" +#include "dap_stream_session.h" +#include "dap_events_socket.h" + +#include "dap_client_remote.h" +#include "dap_http.h" +#include "dap_http_client.h" +#include "dap_http_header.h" +#include "dap_udp_server.h" + + +#define LOG_TAG "stream" +#define HEADER_WITH_SIZE_FIELD 12 //This count of bytes enough for allocate memory for stream packet + +void stream_proc_pkt_in(dap_stream_t * sid); + +// Callbacks for HTTP client +void stream_headers_read(dap_http_client_t * sh, void * arg); // Prepare stream when all headers are read + +void s_headers_write(dap_http_client_t * sh, void * arg); // Output headers +void s_data_write(dap_http_client_t * sh, void * arg); // Write the data +void stream_data_read(dap_http_client_t * sh, void * arg); // Read the data + +void s_data_read(dap_client_remote_t* sh, void * arg); +void stream_dap_data_write(dap_client_remote_t* sh, void * arg); +void stream_dap_delete(dap_client_remote_t* sh, void * arg); +void stream_dap_new(dap_client_remote_t* sh,void * arg); + +// Internal functions +dap_stream_t * stream_new(dap_http_client_t * a_sh); // Create new stream +void stream_delete(dap_http_client_t * sh, void * arg); + +//struct ev_loop *keepalive_loop; +pthread_t keepalive_thread; + +static dap_stream_t *s_stream_keepalive_list = NULL; +static pthread_mutex_t s_mutex_keepalive_list; + +static void start_keepalive( dap_stream_t *sid ); +static void keepalive_cb( void ); + +static bool bKeepaliveLoopQuitSignal = false; +static bool s_dump_packet_headers = false; + +bool dap_stream_get_dump_packet_headers(){ return s_dump_packet_headers; } + +static struct timespec keepalive_loop_sleep = { 0, STREAM_KEEPALIVE_TIMEOUT * 1000 * 1000 }; + +// Start keepalive stream +static void *stream_loop( void *arg ) +{ +// keepalive_loop = ev_loop_new(0); +// ev_loop(keepalive_loop, 0); + do { + + #ifndef _WIN32 + //nanosleep( &keepalive_loop_sleep, NULL ); + sleep( STREAM_KEEPALIVE_TIMEOUT ); + #else + Sleep( STREAM_KEEPALIVE_TIMEOUT * 1000 ); + #endif + + keepalive_cb( ); + + } while ( !bKeepaliveLoopQuitSignal ); + + return NULL; +} + +/** + * @brief stream_init Init stream module + * @return 0 if ok others if not + */ +int dap_stream_init( bool a_dump_packet_headers) +{ + if( dap_stream_ch_init() != 0 ){ + + log_it(L_CRITICAL, "Can't init channel types submodule"); + return -1; + } + s_dump_packet_headers = a_dump_packet_headers; + + bKeepaliveLoopQuitSignal = false; + + pthread_mutex_init( &s_mutex_keepalive_list, NULL ); + //pthread_create( &keepalive_thread, NULL, stream_loop, NULL ); + + log_it(L_NOTICE,"Init streaming module"); + + return 0; +} + +/** + * @brief stream_media_deinit Deinint Stream module + */ +void dap_stream_deinit() +{ + bKeepaliveLoopQuitSignal = true; + pthread_join( keepalive_thread, NULL ); + + pthread_mutex_destroy( &s_mutex_keepalive_list ); + + dap_stream_ch_deinit( ); +} + +/** + * @brief stream_add_proc_http Add URL processor callback for streaming + * @param sh HTTP server instance + * @param url URL + */ +void dap_stream_add_proc_http(struct dap_http * sh, const char * url) +{ + dap_http_add_proc(sh,url,NULL,NULL,stream_delete,stream_headers_read,s_headers_write,stream_data_read,s_data_write,NULL); +} + +/** + * @brief stream_add_proc_udp Add processor callback for streaming + * @param sh UDP server instance + */ +void dap_stream_add_proc_udp(dap_udp_server_t * sh) +{ + dap_server_t* server = sh->dap_server; + server->client_read_callback = s_data_read; + server->client_write_callback = stream_dap_data_write; + server->client_delete_callback = stream_dap_delete; + server->client_new_callback = stream_dap_new; +} + +/** + * @brief stream_states_update + * @param sid stream instance + */ +void stream_states_update(struct dap_stream *sid) +{ + if(sid->conn_http) + sid->conn_http->state_write=DAP_HTTP_CLIENT_STATE_START; + size_t i; + bool ready_to_write=false; + for(i=0;i<sid->channel_count; i++) + ready_to_write|=sid->channel[i]->ready_to_write; + if(sid->conn_udp) + dap_udp_client_ready_to_write(sid->conn_udp->client,ready_to_write); + else + dap_client_remote_ready_to_write(sid->conn,ready_to_write); + if(sid->conn_http) + sid->conn_http->out_content_ready=true; +} + +/** + * @brief stream_header_read Read headers callback for HTTP + * @param cl_ht HTTP client structure + * @param arg Not used + */ +void stream_headers_read(dap_http_client_t * cl_ht, void * arg) +{ + (void) arg; + + // char * raw=0; + // int raw_size; + unsigned int id=0; + + log_it(L_DEBUG,"Prepare data stream"); + if(cl_ht->in_query_string[0]){ + log_it(L_INFO,"Query string [%s]",cl_ht->in_query_string); +// if(sscanf(cl_ht->in_query_string,"fj913htmdgaq-d9hf=%u",&id)==1){ + if(sscanf(cl_ht->in_query_string,"session_id=%u",&id) == 1 || + sscanf(cl_ht->in_query_string,"fj913htmdgaq-d9hf=%u",&id) == 1) { + dap_stream_session_t * ss=NULL; + ss=dap_stream_session_id(id); + if(ss==NULL){ + log_it(L_ERROR,"No session id %u was found",id); + cl_ht->reply_status_code=404; + strcpy(cl_ht->reply_reason_phrase,"Not found"); + }else{ + log_it(L_INFO,"Session id %u was found with channels = %s",id,ss->active_channels); + if(dap_stream_session_open(ss)==0){ // Create new stream + dap_stream_t * sid = stream_new(cl_ht); + sid->session=ss; + dap_http_header_t *header = dap_http_header_find(cl_ht->in_headers, "Service-Key"); + if (header) + ss->service_key = strdup(header->value); + size_t count_channels = strlen(ss->active_channels); + for(size_t i = 0; i < count_channels; i++) { + dap_stream_ch_new(sid, ss->active_channels[i]); + //sid->channel[i]->ready_to_write = true; + } + + cl_ht->reply_status_code=200; + strcpy(cl_ht->reply_reason_phrase,"OK"); + cl_ht->state_read=DAP_HTTP_CLIENT_STATE_DATA; + dap_client_remote_ready_to_read(cl_ht->client,true); + + stream_states_update(sid); + }else{ + log_it(L_ERROR,"Can't open session id %u",id); + cl_ht->reply_status_code=404; + strcpy(cl_ht->reply_reason_phrase,"Not found"); + } + } + } + }else{ + log_it(L_ERROR,"No query string"); + } +} + +/** + * @brief stream_new_udp Create new stream instance for UDP client + * @param sh DAP client structure + */ +dap_stream_t * stream_new_udp(dap_client_remote_t * sh) +{ + dap_stream_t * ret=(dap_stream_t*) calloc(1,sizeof(dap_stream_t)); + + ret->conn = sh; + ret->conn_udp=sh->_inheritor; + ret->buf_defrag_size = 0; + + sh->_internal=ret; + + log_it(L_NOTICE,"New stream instance udp"); + return ret; +} + +/** + * @brief check_session CHeck session status, open if need + * @param id session id + * @param cl DAP client structure + */ +void check_session( unsigned int a_id, dap_client_remote_t *a_client_remote ) +{ + dap_stream_session_t *l_session = NULL; + + l_session = dap_stream_session_id( a_id ); + + if ( l_session == NULL ) { + log_it(L_ERROR,"No session id %u was found",a_id); + return; + } + + log_it( L_INFO, "Session id %u was found with media_id = %d", a_id,l_session->media_id ); + + if ( dap_stream_session_open(l_session) != 0 ) { // Create new stream + + log_it( L_ERROR, "Can't open session id %u", a_id ); + return; + } + + dap_stream_t *l_stream; + + if ( DAP_STREAM(a_client_remote) == NULL ) + l_stream = stream_new_udp( a_client_remote ); + else + l_stream = DAP_STREAM( a_client_remote ); + + l_stream->session = l_session; + + if ( l_session->create_empty ) + log_it( L_INFO, "Session created empty" ); + + log_it( L_INFO, "Opened stream session technical and data channels" ); + + size_t count_channels = strlen(l_session->active_channels); + for (size_t i =0; i<sizeof (l_session->active_channels); i++ ) + if ( l_session->active_channels[i]) + dap_stream_ch_new( l_stream, l_session->active_channels[i] ); + + stream_states_update( l_stream ); + + if ( DAP_STREAM(a_client_remote)->conn_udp ) + dap_udp_client_ready_to_read( a_client_remote, true ); + else + dap_client_remote_ready_to_read( a_client_remote, true ); + + start_keepalive( l_stream ); +} + +/** + * @brief stream_new Create new stream instance for HTTP client + * @return New stream_t instance + */ +dap_stream_t * stream_new(dap_http_client_t * a_sh) +{ + dap_stream_t * ret=(dap_stream_t*) calloc(1,sizeof(dap_stream_t)); + + ret->conn = a_sh->client; + ret->conn_http=a_sh; + ret->buf_defrag_size = 0; + ret->seq_id = 0; + ret->client_last_seq_id_packet = (size_t)-1; + + ret->conn->_internal=ret; + + log_it(L_NOTICE,"New stream instance"); + return ret; +} + +void dap_stream_delete( dap_stream_t *a_stream ) +{ +// log_it(L_DEBUG,"dap_stream_delete( )"); + if(a_stream == NULL) { + log_it(L_ERROR,"stream delete NULL instance"); + return; + } + stream_dap_delete(a_stream->conn, NULL); + free(a_stream); +} + +/** + * @brief dap_stream_new_es + * @param a_es + * @return + */ +dap_stream_t* dap_stream_new_es(dap_events_socket_t * a_es) +{ + dap_stream_t * ret= DAP_NEW_Z(dap_stream_t); + + ret->events_socket = a_es; + ret->buf_defrag_size=0; + ret->is_client_to_uplink = true; + + log_it(L_NOTICE,"New stream with events socket instance for %s",a_es->hostaddr); + return ret; +} + +/** + * @brief s_headers_write Prepare headers for output. Creates stream structure + * @param sh HTTP client instance + * @param arg Not used + */ +void s_headers_write(dap_http_client_t * sh, void *arg) +{ + (void) arg; + + if(sh->reply_status_code==200){ + dap_stream_t *sid=DAP_STREAM(sh->client); + + dap_http_out_header_add(sh,"Content-Type","application/octet-stream"); + dap_http_out_header_add(sh,"Connnection","keep-alive"); + dap_http_out_header_add(sh,"Cache-Control","no-cache"); + + if(sid->stream_size>0) + dap_http_out_header_add_f(sh,"Content-Length","%u", (unsigned int) sid->stream_size ); + + sh->state_read=DAP_HTTP_CLIENT_STATE_DATA; + dap_client_remote_ready_to_read(sh->client,true); + } +} + +/** + Function for keepalive loop +static void keepalive_cb (EV_P_ ev_timer *w, int revents) +{ + struct dap_stream *sid = w->data; + if(sid->keepalive_passed < STREAM_KEEPALIVE_PASSES) + { + dap_stream_send_keepalive(sid); + sid->keepalive_passed+=1; + } + else{ + log_it(L_INFO, "Client disconnected"); + ev_timer_stop (keepalive_loop, &sid->keepalive_watcher); + void * arg; + stream_dap_delete(sid->conn,arg); + } +} +**/ + +static void keepalive_cb( void ) +{ + dap_stream_t *l_stream, *tmp; + return; + pthread_mutex_lock( &s_mutex_keepalive_list ); + DL_FOREACH_SAFE( s_stream_keepalive_list, l_stream, tmp ) { + if ( l_stream->keepalive_passed < STREAM_KEEPALIVE_PASSES ) { + dap_stream_send_keepalive( l_stream ); + l_stream->keepalive_passed += 1; + } + else { + log_it( L_INFO, "Client disconnected" ); + DL_DELETE( s_stream_keepalive_list, l_stream ); + stream_dap_delete( l_stream->conn, NULL ); + } + } + + pthread_mutex_unlock( &s_mutex_keepalive_list ); +} + + + + +/** + * @brief start_keepalive Start keepalive signals exchange for stream + * @param sid Stream instance + */ +void start_keepalive( dap_stream_t *sid ) { + return; +// keepalive_loop = EV_DEFAULT; +// sid->keepalive_watcher.data = sid; +// ev_timer_init (&sid->keepalive_watcher, keepalive_cb, STREAM_KEEPALIVE_TIMEOUT, STREAM_KEEPALIVE_TIMEOUT); +// ev_timer_start (keepalive_loop, &sid->keepalive_watcher); + pthread_mutex_lock( &s_mutex_keepalive_list ); + DL_APPEND( s_stream_keepalive_list, sid ); + pthread_mutex_unlock( &s_mutex_keepalive_list ); +} + +/** + * @brief stream_data_write HTTP data write callback + * @param sh HTTP client instance + * @param arg Not used + */ +void s_data_write(dap_http_client_t * sh, void * arg) +{ + (void) arg; + + if(sh->reply_status_code==200){ + stream_dap_data_write(sh->client,arg); + }else{ + log_it(L_WARNING, "Wrong request, reply status code is %u",sh->reply_status_code); + } +} + +/** + * @brief s_data_read + * @param sh + * @param arg + */ +void s_data_read(dap_client_remote_t* a_client, void * arg) +{ + dap_stream_t * l_stream =DAP_STREAM(a_client); + int * ret = (int *) arg; + + if (s_dump_packet_headers ) { + log_it(L_DEBUG,"dap_stream_data_read: client->buf_in_size=%u" , + (a_client->flags & DAP_SOCK_READY_TO_WRITE)?"true":"false", a_client->buf_in_size ); + } + *ret = dap_stream_data_proc_read( l_stream); +} + +/** + * @brief dap_stream_data_proc_read + * @param a_stream + * @return + */ +size_t dap_stream_data_proc_read (dap_stream_t *a_stream) +{ + bool found_sig=false; + dap_stream_pkt_t * pkt=NULL; + + char *buf_in = (a_stream->conn) ? (char*)a_stream->conn->buf_in : (char*)a_stream->events_socket->buf_in; + size_t buf_in_size = (a_stream->conn) ? a_stream->conn->buf_in_size : a_stream->events_socket->buf_in_size; + uint8_t *proc_data = (uint8_t *)buf_in;//a_stream->conn->buf_in; + bool proc_data_defrag=false; // We are or not in defrag buffer + size_t read_bytes_to=0; + size_t bytes_left_to_read = buf_in_size;//a_stream->conn->buf_in_size; + // Process prebuffered packets or glue defragmented data with the current input + + pkt = a_stream->pkt_buf_in; + + if ( pkt ) { // Packet signature detected + if(a_stream->pkt_buf_in_data_size < sizeof(stream_pkt_hdr_t)) + { + //At first read header + dap_stream_pkt_t* check_pkt = dap_stream_pkt_detect( proc_data , sizeof(stream_pkt_hdr_t) - a_stream->pkt_buf_in_data_size); + if(check_pkt){ + // Got duplication of packet header several times + //log_it(L_DEBUG, "Drop incorrect header part"); + a_stream->pkt_buf_in = NULL; + a_stream->pkt_buf_in_data_size=0; + return 0; + } + if(sizeof(stream_pkt_hdr_t) - a_stream->pkt_buf_in_data_size > bytes_left_to_read) + read_bytes_to = bytes_left_to_read; + else + read_bytes_to = sizeof(stream_pkt_hdr_t) - a_stream->pkt_buf_in_data_size; + memcpy((uint8_t*)a_stream->pkt_buf_in+a_stream->pkt_buf_in_data_size,proc_data,read_bytes_to); + bytes_left_to_read-=read_bytes_to; + a_stream->pkt_buf_in_data_size += read_bytes_to; + proc_data += read_bytes_to; + read_bytes_to = 0; + } + + if ((pkt->hdr.size + sizeof(stream_pkt_hdr_t) -a_stream->pkt_buf_in_data_size) < bytes_left_to_read ) { // Looks the all packet is present in buffer + read_bytes_to=(a_stream->pkt_buf_in->hdr.size + sizeof(stream_pkt_hdr_t) -a_stream->pkt_buf_in_data_size); + }else{ + read_bytes_to=bytes_left_to_read; + } + memcpy((uint8_t*)a_stream->pkt_buf_in+a_stream->pkt_buf_in_data_size,proc_data,read_bytes_to); + a_stream->pkt_buf_in_data_size+=read_bytes_to; + bytes_left_to_read-=read_bytes_to; + //log_it(L_DEBUG, "Prefilled packet buffer on %u bytes", read_bytes_to); + read_bytes_to=0; + if(a_stream->pkt_buf_in_data_size>=(a_stream->pkt_buf_in->hdr.size + sizeof(stream_pkt_hdr_t)) ){ // If we have all the packet in packet buffer + if(a_stream->pkt_buf_in_data_size > a_stream->pkt_buf_in->hdr.size + sizeof(stream_pkt_hdr_t)){ // If we have little more data then we need for packet buffer + //log_it(L_WARNING,"Prefilled packet buffer has %u bytes more than we need, they're lost",a_stream->pkt_buf_in_data_size-a_stream->pkt_buf_in->hdr.size); + a_stream->pkt_buf_in_data_size = 0; + a_stream->pkt_buf_in = NULL; + } + else{ + stream_proc_pkt_in(a_stream); + } + } + proc_data = (uint8_t *)(buf_in + buf_in_size - bytes_left_to_read);//proc_data=(a_stream->conn->buf_in + a_stream->conn->buf_in_size - bytes_left_to_read); + + }else if( a_stream->buf_defrag_size>0){ // If smth is present in defrag buffer - we glue everything together in it + if( bytes_left_to_read > 0){ // If there is smth to process in input buffer + read_bytes_to=bytes_left_to_read; + if( (read_bytes_to + a_stream->buf_defrag_size) > sizeof(a_stream->buf_defrag) ){ + //log_it(L_WARNING,"Defrag buffer is overfilled, drop that" ); + if(read_bytes_to>sizeof(a_stream->buf_defrag)) + read_bytes_to=sizeof(a_stream->buf_defrag); + a_stream->buf_defrag_size=0; + } + //log_it(L_DEBUG,"Glue together defrag %u bytes and current %u bytes", a_stream->buf_defrag_size, read_bytes_to); + memcpy(a_stream->buf_defrag+a_stream->buf_defrag_size,proc_data,read_bytes_to ); + bytes_left_to_read=a_stream->buf_defrag_size+read_bytes_to; // Then we have to read em all + read_bytes_to=0; + }else{ + bytes_left_to_read=a_stream->buf_defrag_size; + //log_it(L_DEBUG,"Nothing to glue with defrag buffer, going to process just that (%u bytes)", bytes_left_to_read); + } + //log_it(L_WARNING,"Switch to defrag buffer"); + proc_data=a_stream->buf_defrag; + proc_data_defrag=true; + }//else + // log_it(DEBUG,"No prefill or defrag buffer, process directly buf_in"); + // Now lets see how many packets we have in buffer now + while ( (pkt = dap_stream_pkt_detect( proc_data , bytes_left_to_read)) ){ + + if(pkt->hdr.size > STREAM_PKT_SIZE_MAX ){ + //log_it(L_ERROR, "stream_pkt_detect() Too big packet size %u", + // pkt->hdr.size); + bytes_left_to_read=0; + break; + } + size_t pkt_offset=( ((uint8_t*)pkt)- proc_data ); + bytes_left_to_read -= pkt_offset ; + found_sig=true; + + dap_stream_pkt_t *temp_pkt = dap_stream_pkt_detect( (uint8_t*)pkt + 1 ,pkt->hdr.size+sizeof(stream_pkt_hdr_t) ); + + if(bytes_left_to_read <(pkt->hdr.size+sizeof(stream_pkt_hdr_t) )){ // Is all the packet in da buf? + read_bytes_to=bytes_left_to_read; + }else{ + read_bytes_to=pkt->hdr.size+sizeof(stream_pkt_hdr_t); + } + + //log_it(L_DEBUG, "Detected packet signature pkt->hdr.size=%u read_bytes_to=%u bytes_left_to_read=%u pkt_offset=%u" + // ,pkt->hdr.size, read_bytes_to, bytes_left_to_read,pkt_offset); + if(read_bytes_to > HEADER_WITH_SIZE_FIELD){ // If we have size field, we can allocate memory + a_stream->pkt_buf_in_size_expected =( pkt->hdr.size+sizeof(stream_pkt_hdr_t)); + size_t pkt_buf_in_size_expected=a_stream->pkt_buf_in_size_expected; + a_stream->pkt_buf_in=(dap_stream_pkt_t *) malloc(pkt_buf_in_size_expected); + if(read_bytes_to>(pkt->hdr.size+sizeof(stream_pkt_hdr_t) )){ + //log_it(L_WARNING,"For some strange reasons we have read_bytes_to=%u is bigger than expected pkt length(%u bytes). Dropped %u bytes", + // pkt->hdr.size+sizeof(stream_pkt_hdr_t),read_bytes_to- pkt->hdr.size+sizeof(stream_pkt_hdr_t)); + read_bytes_to=(pkt->hdr.size+sizeof(stream_pkt_hdr_t)); + } + if(read_bytes_to>bytes_left_to_read){ + //log_it(L_WARNING,"For some strange reasons we have read_bytes_to=%u is bigger that's left in input buffer (%u bytes). Dropped %u bytes", + // read_bytes_to,bytes_left_to_read); + read_bytes_to=bytes_left_to_read; + } + memcpy(a_stream->pkt_buf_in,pkt,read_bytes_to); + proc_data+=(read_bytes_to + pkt_offset); + bytes_left_to_read-=read_bytes_to; + a_stream->pkt_buf_in_data_size=(read_bytes_to); + if(a_stream->pkt_buf_in_data_size==(pkt->hdr.size + sizeof(stream_pkt_hdr_t))){ + // log_it(INFO,"All the packet is present in da buffer (hdr.size=%u read_bytes_to=%u buf_in_size=%u)" + // ,sid->pkt_buf_in->hdr.size,read_bytes_to,sid->conn->buf_in_size); + stream_proc_pkt_in(a_stream); + }else if(a_stream->pkt_buf_in_data_size>pkt->hdr.size + sizeof(stream_pkt_hdr_t)){ + //log_it(L_WARNING,"Input: packet buffer has %u bytes more than we need, they're lost",a_stream->pkt_buf_in_data_size-pkt->hdr.size); + }else{ + //log_it(L_DEBUG,"Input: Not all stream packet in input (hdr.size=%u read_bytes_to=%u)",a_stream->pkt_buf_in->hdr.size,read_bytes_to); + } + }else{ + break; + } + } + /*if(!found_sig){ + log_it(L_DEBUG,"Input: Not found signature in the incomming data ( client->buf_in_size = %u *ret = %u )", + sh->client->buf_in_size, *ret); + }*/ + if(bytes_left_to_read>0){ + if(proc_data_defrag){ + memmove(a_stream->buf_defrag, proc_data, bytes_left_to_read); + a_stream->buf_defrag_size=bytes_left_to_read; + //log_it(L_INFO,"Fragment of %u bytes shifted in the begining the defrag buffer",bytes_left_to_read); + }else{ + memcpy(a_stream->buf_defrag, proc_data, bytes_left_to_read); + a_stream->buf_defrag_size=bytes_left_to_read; + //log_it(L_INFO,"Fragment of %u bytes stored in defrag buffer",bytes_left_to_read); + } + }else if(proc_data_defrag){ + a_stream->buf_defrag_size=0; + } + + return buf_in_size;//a_stream->conn->buf_in_size; +} + + +/** + * @brief stream_dap_data_write Write callback for UDP client + * @param sh DAP client instance + * @param arg Not used + */ +void stream_dap_data_write(dap_client_remote_t* a_client , void * arg){ + size_t i; + (void) arg; + bool ready_to_write=false; + //log_it(L_DEBUG,"Process channels data output (%u channels)", DAP_STREAM(a_client )->channel_count ); + + for(i=0;i<DAP_STREAM(a_client )->channel_count; i++){ + dap_stream_ch_t * ch = DAP_STREAM(a_client )->channel[i]; + if(ch->ready_to_write){ + if(ch->proc->packet_out_callback) + ch->proc->packet_out_callback(ch,NULL); + ready_to_write|=ch->ready_to_write; + } + } + if (s_dump_packet_headers ) { + log_it(L_DEBUG,"dap_stream_data_write: ready_to_write=%s client->buf_out_size=%u" , + ready_to_write?"true":"false", a_client->buf_out_size ); + } + + /* if(STREAM(sh)->conn_udp) + dap_udp_client_ready_to_write(STREAM(sh)->conn,ready_to_write); + else + dap_client_ready_to_write(sh,ready_to_write);*/ + //log_it(L_ERROR,"No stream_data_write_callback is defined"); + + //log_it(L_DEBUG,"stream_dap_data_write ok"); +} + +/** + * @brief stream_dap_delete Delete callback for UDP client + * @param sh DAP client instance + * @param arg Not used + */ +void stream_dap_delete(dap_client_remote_t* sh, void * arg){ + if(!sh) + return; + dap_stream_t * sid = DAP_STREAM(sh); + if(sid == NULL) + return; + (void) arg; + + pthread_mutex_lock(&s_mutex_keepalive_list); + if(s_stream_keepalive_list){ + DL_DELETE(s_stream_keepalive_list, sid); + } + pthread_mutex_unlock(&s_mutex_keepalive_list); + + size_t i; + for(i=0;i<sid->channel_count; i++) + dap_stream_ch_delete(sid->channel[i]); + sid->channel_count = 0; + if(sid->session) + dap_stream_session_close(sid->session->id); + sid->session = NULL; + //free(sid); + log_it(L_NOTICE,"[core] Stream connection is finished"); +} + +/** + * @brief stream_dap_new New connection callback for UDP client + * @param sh DAP client instance + * @param arg Not used + */ +void stream_dap_new(dap_client_remote_t* sh, void * arg){ +// dap_stream_t *sid = stream_new_udp(sh); + stream_new_udp(sh); +} + + +static bool _detect_loose_packet(dap_stream_t * sid) +{ + dap_stream_ch_pkt_t * ch_pkt = (dap_stream_ch_pkt_t *) sid->buf_pkt_in; + + int count_loosed_packets = ch_pkt->hdr.seq_id - (sid->client_last_seq_id_packet + 1); + if(count_loosed_packets > 0) + { + log_it(L_WARNING, "Detected loosed %d packets. " + "Last read seq_id packet: %d Current: %d", count_loosed_packets, + sid->client_last_seq_id_packet, ch_pkt->hdr.seq_id); + } else if(count_loosed_packets < 0) { + if(sid->client_last_seq_id_packet != 0 && ch_pkt->hdr.seq_id != 0) { + log_it(L_WARNING, "Something wrong. count_loosed packets %d can't less than zero. " + "Last read seq_id packet: %d Current: %d", count_loosed_packets, + sid->client_last_seq_id_packet, ch_pkt->hdr.seq_id); + } // else client don't support seqid functionality + } +// log_it(L_DEBUG, "Packet seq id: %d", ch_pkt->hdr.seq_id); +// log_it(L_DEBUG, "Last seq id: %d", sid->last_seq_id_packet); + sid->client_last_seq_id_packet = ch_pkt->hdr.seq_id; + + return false; +} + + +/** + * @brief stream_proc_pkt_in + * @param sid + */ +void stream_proc_pkt_in(dap_stream_t * a_stream) +{ + if(a_stream->pkt_buf_in->hdr.type == STREAM_PKT_TYPE_DATA_PACKET) + { + dap_stream_ch_pkt_t * l_ch_pkt = (dap_stream_ch_pkt_t *) a_stream->buf_pkt_in; + + if(dap_stream_pkt_read(a_stream,a_stream->pkt_buf_in, l_ch_pkt, STREAM_BUF_SIZE_MAX)==0){ + log_it(L_WARNING, "Input: can't decode packet size=%d",a_stream->pkt_buf_in_data_size); + } + + _detect_loose_packet(a_stream); + + dap_stream_ch_t * ch = NULL; + size_t i; + for(i=0;i<a_stream->channel_count;i++) + if(a_stream->channel[i]->proc){ + if(a_stream->channel[i]->proc->id == l_ch_pkt->hdr.id ){ + ch=a_stream->channel[i]; + } + } + if(ch){ + ch->stat.bytes_read+=l_ch_pkt->hdr.size; + if(ch->proc) + if(ch->proc->packet_in_callback){ + if ( s_dump_packet_headers ){ + log_it(L_INFO,"Income channel packet: id='%c' size=%u type=0x%02Xu seq_id=0x%016X enc_type=0x%02X",(char) l_ch_pkt->hdr.id, + l_ch_pkt->hdr.size, l_ch_pkt->hdr.type, l_ch_pkt->hdr.seq_id , l_ch_pkt->hdr.enc_type); + } + ch->proc->packet_in_callback(ch,l_ch_pkt); + } + + } else if(l_ch_pkt->hdr.id == TECHICAL_CHANNEL_ID && l_ch_pkt->hdr.type == STREAM_CH_PKT_TYPE_KEEPALIVE){ + dap_stream_send_keepalive(a_stream); + } else{ + log_it(L_WARNING, "Input: unprocessed channel packet id '%c'",(char) l_ch_pkt->hdr.id ); + } + } else if(a_stream->pkt_buf_in->hdr.type == STREAM_PKT_TYPE_SERVICE_PACKET) { + stream_srv_pkt_t * srv_pkt = (stream_srv_pkt_t *)malloc(sizeof(stream_srv_pkt_t)); + memcpy(srv_pkt,a_stream->pkt_buf_in->data,sizeof(stream_srv_pkt_t)); + uint32_t session_id = srv_pkt->session_id; + check_session(session_id,a_stream->conn); + free(srv_pkt); + } else { + log_it(L_WARNING, "Unknown header type"); + } + + a_stream->keepalive_passed = 0; + +// ev_timer_again (keepalive_loop, &a_stream->keepalive_watcher); + start_keepalive( a_stream ); + + free(a_stream->pkt_buf_in); + a_stream->pkt_buf_in=NULL; + a_stream->pkt_buf_in_data_size=0; +} + +/** + * @brief stream_data_read HTTP data read callback. Read packet and passes that to the channel's callback + * @param sh HTTP client instance + * @param arg Processed number of bytes + */ +void stream_data_read(dap_http_client_t * sh, void * arg) +{ + s_data_read(sh->client,arg); +} + + + +/** + * @brief stream_delete Delete stream and free its resources + * @param sid Stream id + */ +void stream_delete(dap_http_client_t * sh, void * arg) +{ + stream_dap_delete(sh->client,arg); +} + +/** + * @brief dap_stream_set_ready_to_write + * @param a_stream + * @param a_is_ready + */ +void dap_stream_set_ready_to_write(dap_stream_t * a_stream,bool a_is_ready) +{ + if(a_is_ready && a_stream->conn_http) + a_stream->conn_http->state_write=DAP_HTTP_CLIENT_STATE_DATA; + if(a_stream->conn_udp) + dap_udp_client_ready_to_write(a_stream->conn,a_is_ready); + // for stream server + else if(a_stream->conn) + dap_client_remote_ready_to_write(a_stream->conn,a_is_ready); + // for stream client + else if(a_stream->events_socket) + dap_events_socket_set_writable(a_stream->events_socket, a_is_ready); +} diff --git a/libdap-stream/stream/dap_stream.h b/libdap-stream/stream/dap_stream.h new file mode 100644 index 0000000000000000000000000000000000000000..aa929ecb28d209784066b0aa2975cf18084953df --- /dev/null +++ b/libdap-stream/stream/dap_stream.h @@ -0,0 +1,117 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ +#pragma once + +#include <stdio.h> +#include <stdbool.h> +#include <stdint.h> +#include <stddef.h> +#include <pthread.h> +#include <stdbool.h> + +#include "dap_stream_session.h" +#include "dap_stream_ch.h" + +#include "dap_events_socket.h" +#include "dap_udp_server.h" +#include "dap_udp_client.h" + +#define CHUNK_SIZE_MAX (3 * 1024) + +typedef struct dap_client_remote dap_client_remote_t; +typedef struct dap_udp_server dap_udp_server_t; + + +typedef struct dap_http_client dap_http_client_t; +typedef struct dap_http dap_http_t; +typedef struct dap_stream dap_stream_t; +typedef struct dap_stream_pkt dap_stream_pkt_t; +typedef struct dap_events_socket dap_events_socket_t; +#define STREAM_BUF_SIZE_MAX 500000 +#define STREAM_KEEPALIVE_TIMEOUT 3 // How often send keeplive messages (seconds) +#define STREAM_KEEPALIVE_PASSES 3 // How many messagges without answers need for disconnect client and close session + +typedef void (*dap_stream_callback)( dap_stream_t *,void*); + +typedef struct dap_stream { + + int id; + dap_stream_session_t * session; + struct dap_client_remote * conn; // Connection + + struct dap_http_client * conn_http; // HTTP-specific + + struct dap_udp_client * conn_udp; // UDP-client + dap_events_socket_t * events_socket; + + char * service_key; + + bool is_live; + bool is_client_to_uplink ; + +// ev_timer keepalive_watcher; // Watcher for keepalive loop + uint8_t keepalive_passed; // Number of sended keepalive messages + + struct dap_stream_pkt * in_pkt; + struct dap_stream_pkt *pkt_buf_in; + size_t pkt_buf_in_data_size; + size_t pkt_buf_in_size_expected; + + uint8_t buf_defrag[STREAM_BUF_SIZE_MAX]; + uint64_t buf_defrag_size; + + uint8_t buf[STREAM_BUF_SIZE_MAX]; + uint8_t buf_pkt_in[STREAM_BUF_SIZE_MAX]; + + dap_stream_ch_t *channel[255]; // TODO reduce channels to 16 to economy memory + size_t channel_count; + + size_t frame_sent; // Frame counter + + size_t seq_id; + size_t stream_size; + size_t client_last_seq_id_packet; + + struct dap_stream *prev, *next; + +} dap_stream_t; + +#define DAP_STREAM(a) ((dap_stream_t *) (a)->_internal ) + +int dap_stream_init(bool a_dump_packet_headers); + +bool dap_stream_get_dump_packet_headers(); + +void dap_stream_deinit(); + +void dap_stream_add_proc_http(dap_http_t * sh, const char * url); + +void dap_stream_add_proc_udp(dap_udp_server_t * sh); + +dap_stream_t* dap_stream_new_es(dap_events_socket_t * a_es); +size_t dap_stream_data_proc_read(dap_stream_t * a_stream); +size_t dap_stream_data_proc_write(dap_stream_t * a_stream); +void dap_stream_delete(dap_stream_t * a_stream); +void dap_stream_proc_pkt_in(dap_stream_t * sid); + +void dap_stream_es_rw_states_update(struct dap_stream *a_stream); +void dap_stream_set_ready_to_write(dap_stream_t * a_stream,bool a_is_ready); + + diff --git a/libdap-stream/stream/dap_stream_ctl.c b/libdap-stream/stream/dap_stream_ctl.c new file mode 100644 index 0000000000000000000000000000000000000000..a3829e47a7dea75d140a61f817a4a89b51da49ad --- /dev/null +++ b/libdap-stream/stream/dap_stream_ctl.c @@ -0,0 +1,191 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <time.h> +#include <stdlib.h> +#include <stddef.h> +#include <stdint.h> + +#ifdef WIN32 +#include <winsock2.h> +#include <windows.h> +#include <mswsock.h> +#include <ws2tcpip.h> +#include <io.h> +#include <pthread.h> +#endif + +#include "dap_common.h" + +#include "dap_stream.h" + +#include "dap_enc_http.h" +#include "dap_enc_key.h" + +#include "dap_http.h" +#include "dap_http_client.h" +#include "dap_client_remote.h" +#include "dap_http_simple.h" + +#include "dap_stream_session.h" +#include "dap_stream_ctl.h" +#include "http_status_code.h" + +#define LOG_TAG "dap_stream_ctl" + +const char* connection_type_str[] = +{ + [STEAM_SESSION_HTTP] = "http", + [STREAM_SESSION_UDP] = "udp" +}; + +#define DAPMP_VERSION 13 +bool stream_check_proto_version(unsigned int ver); +void s_proc(struct dap_http_simple *cl_st, void * arg); + +static struct { + size_t size; + dap_enc_key_type_t type; +} s_socket_forward_key; + + +/** + * @brief stream_ctl_init Initialize stream control module + * @return Zero if ok others if not + */ +int dap_stream_ctl_init(dap_enc_key_type_t socket_forward_key_type, + size_t socket_forward_key_size) +{ + s_socket_forward_key.type = socket_forward_key_type; + s_socket_forward_key.size = socket_forward_key_size; + log_it(L_NOTICE,"Initialized stream control module"); + return 0; +} + +/** + * @brief stream_ctl_deinit Deinit stream control module + */ +void dap_stream_ctl_deinit() +{ + +} + +/** + * @brief stream_ctl_add_proc Add stream control url processor + * @param sh HTTP server instance + * @param url URL string + */ +void dap_stream_ctl_add_proc(struct dap_http * sh, const char * url) +{ + dap_http_simple_proc_add(sh,url,14096,s_proc); +} + + +/** + * @brief s_proc Process CTL request + * @param cl_st HTTP server instance + * @param arg Not used + */ +void s_proc(struct dap_http_simple *a_http_simple, void * a_arg) +{ + http_status_code_t * return_code = (http_status_code_t*)a_arg; + + // unsigned int proto_version; + dap_stream_session_t * ss=NULL; + // unsigned int action_cmd=0; + bool l_new_session = false; + + enc_http_delegate_t *l_dg = enc_http_request_decode(a_http_simple); + + if(l_dg){ + size_t l_channels_str_size = sizeof(ss->active_channels); + char l_channels_str[sizeof(ss->active_channels)]; + if(l_dg->url_path && strlen(l_dg->url_path) < 30 && + sscanf(l_dg->url_path, "stream_ctl,channels=%s", l_channels_str) == 1) { + l_new_session = true; + } + else if(strcmp(l_dg->url_path, "socket_forward" ) == 0) { + l_channels_str[0] = '\0'; + l_new_session = true; + } + /* }else if (strcmp(dg->url_path,"stream_ctl")==0) { + l_new_session = true; + }*/ + else{ + log_it(L_ERROR,"ctl command unknown: %s",l_dg->url_path); + enc_http_delegate_delete(l_dg); + *return_code = Http_Status_MethodNotAllowed; + return; + } + if(l_new_session){ + + ss = dap_stream_session_pure_new(); + strncpy(ss->active_channels, l_channels_str, l_channels_str_size); + char *key_str = calloc(1, KEX_KEY_STR_SIZE); + dap_random_string_fill(key_str, KEX_KEY_STR_SIZE); + ss->key = dap_enc_key_new_generate( s_socket_forward_key.type, key_str, KEX_KEY_STR_SIZE, + NULL, 0, s_socket_forward_key.size); + enc_http_reply_f(l_dg,"%u %s",ss->id,key_str); + *return_code = Http_Status_OK; + + log_it(L_INFO," New stream session %u initialized",ss->id); + + free(key_str); + }else{ + log_it(L_ERROR,"Wrong request: \"%s\"",l_dg->in_query); + *return_code = Http_Status_BadRequest; + } + + unsigned int conn_t = 0; + char *ct_str = strstr(l_dg->in_query, "connection_type"); + if (ct_str) + { + sscanf(ct_str, "connection_type=%u", &conn_t); + if (conn_t < 0 || conn_t >= STREAM_SESSION_END_TYPE) + { + log_it(L_WARNING,"Error connection type : %i",conn_t); + conn_t = STEAM_SESSION_HTTP; + } + + if (ss) + { + ss->conn_type = conn_t; + } + + } + + log_it(L_INFO,"setup connection_type: %s", connection_type_str[conn_t]); + + enc_http_reply_encode(a_http_simple,l_dg); + enc_http_delegate_delete(l_dg); + }else{ + log_it(L_ERROR,"No encryption layer was initialized well"); + *return_code = Http_Status_BadRequest; + } +} + + +bool stream_check_proto_version(unsigned int ver) +{ + return ver<=DAPMP_VERSION; +} diff --git a/libdap-stream/stream/dap_stream_ctl.h b/libdap-stream/stream/dap_stream_ctl.h new file mode 100644 index 0000000000000000000000000000000000000000..94a5d62ee2dcc1b644db7c8e657c44a46fade8df --- /dev/null +++ b/libdap-stream/stream/dap_stream_ctl.h @@ -0,0 +1,29 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#pragma once +typedef struct dap_http dap_http_t; + +#define KEX_KEY_STR_SIZE 128 + +int dap_stream_ctl_init(dap_enc_key_type_t socket_forward_key_type, + size_t socket_forward_key_size); +void dap_stream_ctl_deinit(); +void dap_stream_ctl_add_proc(struct dap_http * sh, const char * url); diff --git a/libdap-stream/stream/dap_stream_pkt.c b/libdap-stream/stream/dap_stream_pkt.c new file mode 100644 index 0000000000000000000000000000000000000000..d50740bb9889ce7bfa5c92cba1e93fc1c4eafbfe --- /dev/null +++ b/libdap-stream/stream/dap_stream_pkt.c @@ -0,0 +1,182 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <time.h> +#include <stdlib.h> +#include <stddef.h> +#include <stdint.h> + +#ifdef WIN32 +#include <winsock2.h> +#include <windows.h> +#include <mswsock.h> +#include <ws2tcpip.h> +#include <io.h> +#include <pthread.h> +#endif + +#include "dap_common.h" +//#include "config.h" + + +#include "dap_client_remote.h" +#include "dap_http_client.h" + +#include "dap_enc.h" +#include "dap_enc_key.h" + +#include "dap_stream.h" +#include "dap_stream_pkt.h" +#include "dap_stream_ch.h" +#include "dap_stream_ch_pkt.h" +#include "dap_stream_ch_proc.h" + +#include "dap_enc_iaes.h" + +#define LOG_TAG "stream_pkt" + + + +static const size_t s_dap_hdr_size=8+2+1+1+4; +const uint8_t c_dap_stream_sig[8]={0xa0,0x95,0x96,0xa9,0x9e,0x5c,0xfb,0xfa}; + +dap_stream_pkt_t * dap_stream_pkt_detect(void * a_data, size_t data_size) +{ + uint8_t * sig_start=(uint8_t*) a_data; + dap_stream_pkt_t * ret=NULL; + + size_t length_left=data_size; + + while( (sig_start=memchr(sig_start, c_dap_stream_sig[0],length_left)) != NULL ){ + length_left= data_size- (size_t) ( sig_start- (uint8_t *) a_data); + if(length_left < sizeof(c_dap_stream_sig) ) + break; + if(memcmp(sig_start,c_dap_stream_sig,sizeof(c_dap_stream_sig))==0){ + ret= (dap_stream_pkt_t*) sig_start; + if(ret->hdr.size > STREAM_PKT_SIZE_MAX ){ + //log_it(L_ERROR, "Too big packet size %u",ret->hdr.size); + ret=NULL; + } + break; + }else + sig_start+=1; + } + + return ret; +} + +/** + * @brief s_encode_dummy + * @param a_buf + * @param a_buf_size + * @param a_buf_out + * @return + */ +static size_t s_encode_dummy(const void * a_buf, size_t a_buf_size, void * a_buf_out){ + if(memcpy(a_buf_out,a_buf,a_buf_size) != NULL) + return a_buf_size; + else + return 0; +} + +/** + * @brief stream_pkt_read + * @param sid + * @param pkt + * @param buf_out + */ +size_t dap_stream_pkt_read( dap_stream_t * a_stream, dap_stream_pkt_t * a_pkt, void * a_buf_out, size_t a_buf_out_size) +{ + size_t ds = a_stream->session->key->dec_na(a_stream->session->key,a_pkt->data,a_pkt->hdr.size,a_buf_out, a_buf_out_size); +// log_it(L_DEBUG,"Stream decoded %lu bytes ( last bytes 0x%02x 0x%02x 0x%02x 0x%02x ) ", ds, +// *((uint8_t *)buf_out+ds-4),*((uint8_t *)buf_out+ds-3),*((uint8_t *)buf_out+ds-2),*((uint8_t *)buf_out+ds-1) +// ); +// size_t mv=35; +// log_it(L_DEBUG,"(Decoded bytes with mv %lu bytes 0x%02x 0x%02x 0x%02x 0x%02x ) ", mv, +// *((uint8_t *)buf_out+mv-4),*((uint8_t *)buf_out+mv-3),*((uint8_t *)buf_out+mv-2),*((uint8_t *)buf_out+mv-1) +// ); + return ds; +} + + + +/** + * @brief stream_ch_pkt_write + * @param ch + * @param data + * @param data_size + * @return + */ + +size_t dap_stream_pkt_write(dap_stream_t * a_stream, const void * a_data, size_t a_data_size) +{ + size_t ret=0; + stream_pkt_hdr_t pkt_hdr; + + if(a_data_size > STREAM_BUF_SIZE_MAX ){ + log_it(L_ERROR,"Too big data size %lu, bigger than encryption buffer size %lu",a_data_size,sizeof(a_stream->buf)); + a_data_size=sizeof(a_stream->buf); + } + + memset(&pkt_hdr,0,sizeof(pkt_hdr)); + memcpy(pkt_hdr.sig,c_dap_stream_sig,sizeof(pkt_hdr.sig)); + + pkt_hdr.size =(uint32_t) a_stream->session->key->enc_na(a_stream->session->key, a_data,a_data_size,a_stream->buf, STREAM_BUF_SIZE_MAX); +// printf("*[dap_stream_pkt_write] size=%d key=0x%x _inheritor_size=%d\n", pkt_hdr.size, sid->session->key, +// sid->session->key->_inheritor_size); + + if(a_stream->conn_udp){ + ret+=dap_udp_client_write(a_stream->conn,&pkt_hdr,sizeof(pkt_hdr)); + ret+=dap_udp_client_write(a_stream->conn,a_stream->buf,pkt_hdr.size); + dap_client_remote_ready_to_write(a_stream->conn, true); + } + else if(a_stream->conn){ + ret+=dap_client_remote_write(a_stream->conn,&pkt_hdr,sizeof(pkt_hdr)); + ret+=dap_client_remote_write(a_stream->conn,a_stream->buf,pkt_hdr.size); + dap_client_remote_ready_to_write(a_stream->conn, true); + } + else if(a_stream->events_socket) { + ret += dap_events_socket_write(a_stream->events_socket, &pkt_hdr, sizeof(pkt_hdr)); + ret += dap_events_socket_write(a_stream->events_socket, a_stream->buf, pkt_hdr.size); + dap_events_socket_set_writable(a_stream->events_socket, true); + } + + return ret; +} + + + +/** + * @brief dap_stream_send_keepalive + * @param a_stream + */ +void dap_stream_send_keepalive(dap_stream_t * a_stream) +{ + dap_stream_ch_pkt_hdr_t l_pkt={0}; + l_pkt.id = TECHICAL_CHANNEL_ID; + l_pkt.type=STREAM_CH_PKT_TYPE_KEEPALIVE; + + if( dap_stream_pkt_write( a_stream, &l_pkt, sizeof(l_pkt) ) ) + dap_stream_set_ready_to_write( a_stream, true ); +} + diff --git a/libdap-stream/stream/dap_stream_pkt.h b/libdap-stream/stream/dap_stream_pkt.h new file mode 100644 index 0000000000000000000000000000000000000000..ce6fd70636e35133c9f6076896a911fd913e4472 --- /dev/null +++ b/libdap-stream/stream/dap_stream_pkt.h @@ -0,0 +1,62 @@ +/* + Copyright (c) 2017-2018 (c) Project "DeM Labs Inc" https://github.com/demlabsinc + All rights reserved. + + This file is part of DAP (Deus Applications Prototypes) the open source project + + DAP (Deus Applicaions Prototypes) is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + DAP is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with any DAP based project. If not, see <http://www.gnu.org/licenses/>. +*/ + +#pragma once +#include <stdint.h> +#include <stddef.h> + +#define STREAM_PKT_SIZE_MAX 500000 +struct dap_stream; +typedef struct dap_stream dap_stream_t; +#define STREAM_PKT_TYPE_DATA_PACKET 0x00 +#define STREAM_PKT_TYPE_SERVICE_PACKET 0xff +//#define STREAM_PKT_TYPE_KEEPALIVE 0x11 + +typedef struct stream_pkt_hdr{ + uint8_t sig[8]; // Signature to find out beginning of the frame + uint32_t size; + uint64_t timestamp; + uint8_t type; // Packet type + uint64_t src_addr; // Source address ( vasya@domain.net ) + uint64_t dst_addr; // Destination address ( general#domain.net ) +} __attribute__((packed)) stream_pkt_hdr_t; + +typedef struct dap_stream_pkt{ + stream_pkt_hdr_t hdr; + uint8_t data[]; +} __attribute__((packed)) dap_stream_pkt_t; + +typedef struct stream_srv_pkt{ + uint32_t session_id; + uint8_t enc_type; + uint32_t coockie; +} __attribute__((packed)) stream_srv_pkt_t; + +extern const uint8_t c_dap_stream_sig[8]; + +dap_stream_pkt_t * dap_stream_pkt_detect(void * a_data, size_t data_size); + +size_t dap_stream_pkt_read(dap_stream_t * a_stream, dap_stream_pkt_t * a_pkt, void * a_buf_out, size_t a_buf_out_size); + +size_t dap_stream_pkt_write(dap_stream_t * a_stream, const void * data, size_t a_data_size); + +void dap_stream_send_keepalive( dap_stream_t * a_stream); + + diff --git a/libdap-stream/test/CMakeLists.txt b/libdap-stream/test/CMakeLists.txt new file mode 100755 index 0000000000000000000000000000000000000000..1da3b2de34a8d4e810161cf88f5744527690ccbb --- /dev/null +++ b/libdap-stream/test/CMakeLists.txt @@ -0,0 +1,18 @@ +if(TARGET libdap_stream_test) + return() # The project has already been built. +endif() +project(libdap_stream_test) + +add_subdirectory(libdap-test) + +file(GLOB SOURCES *.c) +file(GLOB HEADERS *.h) + +add_executable(${PROJECT_NAME} ${SOURCES} ${HEADERS}) + +target_link_libraries(${PROJECT_NAME} dap_test dap_core) + +add_test( + NAME libdap_stream_test + COMMAND libdap_stream_test +) diff --git a/libdap-stream/test/libdap-test b/libdap-stream/test/libdap-test new file mode 160000 index 0000000000000000000000000000000000000000..b76175acc517f085c319c8e66c62bd143f96bf94 --- /dev/null +++ b/libdap-stream/test/libdap-test @@ -0,0 +1 @@ +Subproject commit b76175acc517f085c319c8e66c62bd143f96bf94 diff --git a/libdap-stream/test/main.c b/libdap-stream/test/main.c new file mode 100755 index 0000000000000000000000000000000000000000..6527055741bdd36b50929c42c1a6bec70f6007cf --- /dev/null +++ b/libdap-stream/test/main.c @@ -0,0 +1,7 @@ +#include "dap_common.h" + +int main(void) { + // switch off debug info from library + dap_log_level_set(L_CRITICAL); + return 0; +}