From 9b6660caffef6b45da9fdc1e6476056cd5ae1549 Mon Sep 17 00:00:00 2001
From: Constantin Papizh <p.const@bk.ru>
Date: Wed, 11 Nov 2020 17:41:37 +0300
Subject: [PATCH] Event and timer types implemented

---
 core/include/dap_common.h      |  4 ++
 core/src/dap_common.c          |  2 -
 net/core/dap_events_socket.c   | 52 ++++++++++++++++++---
 net/core/dap_proc_thread.c     | 16 +++++--
 net/core/dap_timerfd.c         | 85 +++++++++++++++++++++++++++-------
 net/core/include/dap_timerfd.h |  8 ++++
 6 files changed, 137 insertions(+), 30 deletions(-)

diff --git a/core/include/dap_common.h b/core/include/dap_common.h
index 430c7af50..5ed8b225a 100755
--- a/core/include/dap_common.h
+++ b/core/include/dap_common.h
@@ -33,6 +33,10 @@
 #include <stdlib.h>
 #include <stdio.h>
 #include <time.h>
+#ifdef DAP_OS_WINDOWS
+#include <fcntl.h>
+#define pipe(pfds) _pipe(pfds, 4096, _O_BINARY)
+#endif
 #ifdef __MACH__
 #include <dispatch/dispatch.h>
 #endif
diff --git a/core/src/dap_common.c b/core/src/dap_common.c
index f5565971c..1be7ab9cf 100755
--- a/core/src/dap_common.c
+++ b/core/src/dap_common.c
@@ -55,8 +55,6 @@
 
   #define popen _popen
   #define pclose _pclose
-  #define pipe(pfds) _pipe(pfds, 4096, 0x8000)
-
 #endif
 
 #include "dap_common.h"
diff --git a/net/core/dap_events_socket.c b/net/core/dap_events_socket.c
index dd8872a0c..a702c48ef 100644
--- a/net/core/dap_events_socket.c
+++ b/net/core/dap_events_socket.c
@@ -28,11 +28,10 @@
 #include <string.h>
 #include <assert.h>
 #include <errno.h>
-#ifndef _WIN32
+#ifndef DAP_OS_WINDOWS
 #include <sys/epoll.h>
 #include <sys/select.h>
 #include <unistd.h>
-#include <fcntl.h>
 #else
 #include <winsock2.h>
 #include <windows.h>
@@ -42,7 +41,7 @@
 #include "wepoll.h"
 #endif
 #include <pthread.h>
-
+#include <fcntl.h>
 #include "dap_common.h"
 #include "dap_list.h"
 #include "dap_worker.h"
@@ -214,16 +213,22 @@ dap_events_socket_t * s_create_type_pipe(dap_worker_t * a_w, dap_events_socket_c
     l_errbuf[0]=0;
     if( pipe(l_pipe) < 0 ){
         l_errno = errno;
+#if defined DAP_OS_UNIX
         strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
         log_it( L_ERROR, "Error detected, can't create pipe(): '%s' (%d)", l_errbuf, l_errno);
+#elif defined DAP_OS_WINDOWS
+        log_it( L_ERROR, "Can't create pipe, errno: %d", l_errno);
+#endif
         DAP_DELETE(l_es);
         return NULL;
     }//else
      //   log_it(L_DEBUG, "Created one-way unnamed bytestream pipe %d->%d", l_pipe[0], l_pipe[1]);
     l_es->fd = l_pipe[0];
     l_es->fd2 = l_pipe[1];
+#if defined DAP_OS_UNIX
     fcntl( l_pipe[0], F_SETFL, O_NONBLOCK);
     fcntl( l_pipe[1], F_SETFL, O_NONBLOCK);
+#endif
 
 #else
 #error "No defined s_create_type_pipe() for your platform"
@@ -440,8 +445,21 @@ dap_events_socket_t * s_create_type_event(dap_worker_t * a_w, dap_events_socket_
         }
         DAP_DELETE(l_es);
         return NULL;
-    }//else
-     //   log_it(L_DEBUG, "Created eventfd descriptor %d", l_es->fd );
+    } else {
+        l_es->fd2 = l_es->fd;
+        log_it(L_DEBUG, "Created eventfd descriptor %d", l_es->fd );
+    }
+#elif defined DAP_OS_WINDOWS
+    int l_pipe[2];
+    if (pipe(l_pipe) < 0) {
+        log_it(L_ERROR, "Can't create pipe for event type, error: %d", errno);
+        DAP_DELETE(l_es);
+        return NULL;
+    }
+    l_es->fd2   = l_pipe[0];
+    l_es->fd    = l_pipe[1];
+    log_it(L_DEBUG, "Created pipe for event type, %d -> %d", l_es->fd2, l_es->fd);
+
 #endif
     return l_es;
 }
@@ -495,11 +513,24 @@ void dap_events_socket_event_proc_input_unsafe(dap_events_socket_t *a_esocket)
             log_it(L_WARNING, "Can't read packet from event fd: \"%s\"(%d)", l_errbuf, l_errno);
         }else
             return; // do nothing
+#elif defined DAP_OS_WINDOWS
+        uint64_t l_value;
+        int l_ret;
+        switch (l_ret = read(a_esocket->fd, &l_value, 8)) {
+        case -1:
+            log_it(L_CRITICAL, "Can't read from event socket pipe, error: %d", errno);
+            break;
+        case 0:
+            return;
+        default:
+            a_esocket->callbacks.event_callback(a_esocket, l_value);
+            break;
+        }
 #else
-#error "No Queue fetch mechanism implemented on your platform"
+#error "No event fetch mechanism implemented on your platform"
 #endif
     }else
-        log_it(L_ERROR, "Queue socket %d accepted data but callback is NULL ", a_esocket->socket);
+        log_it(L_ERROR, "Event socket %d accepted data but callback is NULL ", a_esocket->socket);
 }
 
 
@@ -642,6 +673,13 @@ int dap_events_socket_event_signal( dap_events_socket_t * a_es, uint64_t a_value
         return l_errno;
     else
         return 1;
+#elif defined DAP_OS_WINDOWS
+    byte_t l_bytes[sizeof(void*)] = { 0 };
+    if(write(a_es->fd2, l_bytes, sizeof(l_bytes)) == -1) {
+        return errno;
+    } else {
+        return 0;
+    }
 #else
 #error "Not implemented dap_events_socket_event_signal() for this platform"
 #endif
diff --git a/net/core/dap_proc_thread.c b/net/core/dap_proc_thread.c
index 732f3ee2d..5c6eee2d9 100644
--- a/net/core/dap_proc_thread.c
+++ b/net/core/dap_proc_thread.c
@@ -216,7 +216,7 @@ static void * s_proc_thread_function(void * a_arg)
 #ifdef DAP_EVENTS_CAPS_EPOLL
         //log_it(L_DEBUG, "Epoll_wait call");
         int l_selected_sockets = epoll_wait(l_thread->epoll_ctl, l_epoll_events, DAP_EVENTS_SOCKET_MAX, -1);
-        size_t l_sockets_max = l_selected_sockets;
+        size_t l_sockets_max = (size_t)l_selected_sockets;
 #elif defined (DAP_EVENTS_CAPS_POLL)
         int l_selected_sockets = poll(l_poll,l_poll_count,-1);
         size_t l_sockets_max = l_poll_count;
@@ -227,10 +227,14 @@ static void * s_proc_thread_function(void * a_arg)
         if(l_selected_sockets == -1) {
             if( errno == EINTR)
                 continue;
+#if defined DAP_OS_UNIX
             int l_errno = errno;
             char l_errbuf[128];
             strerror_r(l_errno, l_errbuf, sizeof (l_errbuf));
             log_it(L_ERROR, "Proc thread #%d got errno:\"%s\" (%d)", l_thread->cpu_id , l_errbuf, l_errno);
+#elif DAP_OS_WINDOWS
+            log_it(L_ERROR, "Error occured on thread #%d, errno: %d", l_thread->cpu_id , errno);
+#endif
             break;
         }
         for(size_t n = 0; n < l_sockets_max; n++) {
@@ -269,12 +273,16 @@ static void * s_proc_thread_function(void * a_arg)
             time_t l_cur_time = time( NULL);
             l_cur->last_time_active = l_cur_time;
             if (l_flag_error){
+#if defined DAP_OS_UNIX
                 int l_errno = errno;
                 char l_errbuf[128];
                 strerror_r(l_errno, l_errbuf,sizeof (l_errbuf));
                 log_it(L_ERROR,"Some error on proc thread #%u with %d socket: %s(%d)",l_thread->cpu_id, l_cur->socket, l_errbuf, l_errno);
+#elif defined DAP_OS_WINDOWS
+                log_it(L_ERROR,"Some error occured on thread #%u with socket %d, errno: %d",l_thread->cpu_id, l_cur->socket, errno);
+#endif
                 if(l_cur->callbacks.error_callback)
-                    l_cur->callbacks.error_callback(l_cur,errno);
+                    l_cur->callbacks.error_callback(l_cur, errno);
             }
             if (l_flag_read ){
                 switch (l_cur->type) {
@@ -285,7 +293,9 @@ static void * s_proc_thread_function(void * a_arg)
                             dap_events_socket_event_proc_input_unsafe (l_cur);
                     break;
 
-                    default:{ log_it(L_ERROR, "Unprocessed descriptor type accepted in proc thread loop"); }
+                    default:
+                        log_it(L_ERROR, "Unprocessed descriptor type accepted in proc thread loop");
+                        break;
                 }
             }
             if(l_cur->flags & DAP_ESOCK_SIGNAL_CLOSE){
diff --git a/net/core/dap_timerfd.c b/net/core/dap_timerfd.c
index 51be755a2..acbc18caf 100644
--- a/net/core/dap_timerfd.c
+++ b/net/core/dap_timerfd.c
@@ -20,17 +20,17 @@
     You should have received a copy of the GNU General Public License
     along with any DAP SDK based project.  If not, see <http://www.gnu.org/licenses/>.
 */
-#ifdef DAP_OS_UNIX
+
 #include <stdint.h>
 #include <stdbool.h>
 #include <errno.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
-#include <sys/time.h>
-#include <sys/timerfd.h>
 #include <inttypes.h>
-
+#ifdef DAP_OS_WINDOWS
+#include <winsock2.h>
+#endif
 #include "dap_common.h"
 #include "dap_events.h"
 #include "dap_worker.h"
@@ -51,6 +51,19 @@ int dap_timerfd_init()
     return 0;
 }
 
+#ifdef DAP_OS_WINDOWS
+void __stdcall TimerAPCb(void* arg, DWORD low, DWORD high) {  // Timer high value.
+    UNREFERENCED_PARAMETER(low)
+    UNREFERENCED_PARAMETER(high)
+    dap_timerfd_t *l_timerfd = (dap_timerfd_t *)arg;
+    byte_t l_bytes[sizeof(void*)] = { 0 };
+    if (write(l_timerfd->pipe_in, l_bytes, sizeof(l_bytes)) == -1) {
+        log_it(L_CRITICAL, "Error occured on writing into pipe from APC, errno: %d", errno);
+    }
+}
+#endif
+
+
 /**
  * @brief dap_timerfd_start
  * @param a_timeout_ms
@@ -73,10 +86,13 @@ dap_timerfd_t* dap_timerfd_start(uint64_t a_timeout_ms, dap_timerfd_callback_t a
 dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_timeout_ms, dap_timerfd_callback_t a_callback, void *a_callback_arg, bool a_repeated)
 
 {
+    dap_timerfd_t *l_timerfd = DAP_NEW(dap_timerfd_t);
+#if defined DAP_OS_UNIX
     struct itimerspec l_ts;
     int l_tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
     if(l_tfd == -1) {
         log_it(L_WARNING, "dap_timerfd_start() failed: timerfd_create() errno=%d\n", errno);
+        DAP_DELETE(l_timerfd);
         return NULL;
     }
     // repeat never
@@ -88,11 +104,34 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t
     if(timerfd_settime(l_tfd, 0, &l_ts, NULL) < 0) {
         log_it(L_WARNING, "dap_timerfd_start() failed: timerfd_settime() errno=%d\n", errno);
         close(l_tfd);
+        DAP_DELETE(l_timerfd);
         return NULL;
     }
 
-    // create dap_timerfd_t structure
-    dap_timerfd_t *l_timerfd = DAP_NEW(dap_timerfd_t);
+#elif defined DAP_OS_WINDOWS
+    HANDLE l_th = CreateWaitableTimer(NULL, true, NULL);
+    if (!l_th) {
+        log_it(L_CRITICAL, "Waitable timer not created, error %d", GetLastError());
+        DAP_DELETE(l_timerfd);
+        return NULL;
+    }
+    int l_pipe[2];
+    if (pipe(l_pipe) < 0) {
+        log_it(L_ERROR, "Can't create pipe, error: %d", errno);
+        DAP_DELETE(l_timerfd);
+        return NULL;
+    }
+    unsigned long l_mode = 0;
+    int l_tfd = l_pipe[0];
+    LARGE_INTEGER l_due_time;
+    l_due_time.QuadPart = (long long)a_timeout_ms * _MSEC;
+    if (!SetWaitableTimer(l_th, &l_due_time, 0, TimerAPCb, l_timerfd, false)) {
+        log_it(L_CRITICAL, "Waitable timer not set, error %d", GetLastError());
+        CloseHandle(l_th);
+        DAP_DELETE(l_timerfd);
+        return NULL;
+    }
+#endif
 
     // create events_socket for timer file descriptor
     dap_events_socket_callbacks_t l_s_callbacks;
@@ -105,14 +144,17 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t
     l_events_socket->_inheritor = l_timerfd;
 
     // fill out dap_timerfd_t structure
-    l_timerfd->timeout_ms = a_timeout_ms;
-    l_timerfd->tfd = l_tfd;
-    l_timerfd->events_socket = l_events_socket;
-    l_timerfd->callback = a_callback;
-    l_timerfd->callback_arg = a_callback_arg;
-    l_timerfd->repeated = a_repeated;
+    l_timerfd->timeout_ms       = a_timeout_ms;
+    l_timerfd->tfd              = l_tfd;
+    l_timerfd->events_socket    = l_events_socket;
+    l_timerfd->callback         = a_callback;
+    l_timerfd->callback_arg     = a_callback_arg;
+    l_timerfd->repeated         = a_repeated;
+#ifdef DAP_OS_WINDOWS
+    l_timerfd->th               = l_th;
+    l_timerfd->pipe_in          = l_pipe[1];
+#endif
     dap_worker_add_events_socket(l_events_socket, a_worker);
-
     return l_timerfd;
 }
 
@@ -122,13 +164,12 @@ dap_timerfd_t* dap_timerfd_start_on_worker(dap_worker_t * a_worker, uint64_t a_t
  */
 static void s_es_callback_timer(struct dap_events_socket *a_event_sock)
 {
-    uint64_t l_ptiu64;
     dap_timerfd_t *l_timerfd = a_event_sock->_inheritor;
     // run user's callback
     if(l_timerfd->callback)
         l_timerfd->callback(l_timerfd->callback_arg);
     if (l_timerfd->repeated) {
-        //printf("\nread() returned %d, %d\n", l_ptiu64, l_read_ret);
+#if defined DAP_OS_UNIX
         struct itimerspec l_ts;
         // repeat never
         l_ts.it_interval.tv_sec = 0;
@@ -139,8 +180,19 @@ static void s_es_callback_timer(struct dap_events_socket *a_event_sock)
         if(timerfd_settime(l_timerfd->tfd, 0, &l_ts, NULL) < 0) {
             log_it(L_WARNING, "callback_timerfd_read() failed: timerfd_settime() errno=%d\n", errno);
         }
+#elif defined DAP_OS_WINDOWS
+        LARGE_INTEGER l_due_time;
+        l_due_time.QuadPart = (long long)l_timerfd->timeout_ms * _MSEC;
+        if (!SetWaitableTimer(l_timerfd->th, &l_due_time, 0, TimerAPCb, l_timerfd, false)) {
+            log_it(L_CRITICAL, "Waitable timer not reset, error %d", GetLastError());
+            CloseHandle(l_timerfd->th);
+        }
+#endif
         dap_events_socket_set_readable_unsafe(a_event_sock, true);
     } else {
+#if defined DAP_OS_WINDOWS
+        CloseHandle(l_timerfd->th);
+#endif
         dap_events_socket_remove_and_delete_unsafe(l_timerfd->events_socket, false);
     }
 }
@@ -154,6 +206,3 @@ void dap_timerfd_delete(dap_timerfd_t *l_timerfd)
 {
     dap_events_socket_remove_and_delete_mt(l_timerfd->events_socket->worker, l_timerfd->events_socket);
 }
-#else
-#error "No dap_timerfd realization for your platform"
-#endif
diff --git a/net/core/include/dap_timerfd.h b/net/core/include/dap_timerfd.h
index 0f600fe55..097f8cf68 100644
--- a/net/core/include/dap_timerfd.h
+++ b/net/core/include/dap_timerfd.h
@@ -27,8 +27,12 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
+#if defined DAP_OS_UNIX
 #include <sys/time.h>
 #include <sys/timerfd.h>
+#elif defined DAP_OS_WINDOWS
+#define _MSEC -10000
+#endif
 #include <inttypes.h>
 
 #include "dap_common.h"
@@ -43,6 +47,10 @@ typedef struct dap_timerfd {
     dap_timerfd_callback_t callback;
     void *callback_arg;
     bool repeated;
+#ifdef DAP_OS_WINDOWS
+    HANDLE th;
+    int pipe_in;
+#endif
 } dap_timerfd_t;
 
 int dap_timerfd_init();
-- 
GitLab