From 34bfa1cdf00f041ba49ee9faeb8dc7adc1727047 Mon Sep 17 00:00:00 2001
From: "alexander.lysikov" <alexander.lysikov@demlabs.net>
Date: Tue, 15 Sep 2020 09:13:02 +0000
Subject: [PATCH] made a thread for each message to write to the pipe in case
 of EAGAIN (11)

---
 dap-sdk/net/client/dap_client_pvt.c  | 12 +++-
 dap-sdk/net/core/dap_events_socket.c | 91 +++++++++++++++++++++++++++-
 2 files changed, 100 insertions(+), 3 deletions(-)

diff --git a/dap-sdk/net/client/dap_client_pvt.c b/dap-sdk/net/client/dap_client_pvt.c
index 63db3542a1..9a97bc1e8f 100644
--- a/dap-sdk/net/client/dap_client_pvt.c
+++ b/dap-sdk/net/client/dap_client_pvt.c
@@ -612,7 +612,11 @@ static void s_stage_status_after(dap_client_pvt_t * a_client_pvt)
         log_it(L_ERROR, "Error state, doing callback if present");
         if(a_client_pvt->stage_status_error_callback) {
             //dap_client_pvt_ref(a_client_pvt);
-            a_client_pvt->stage_status_error_callback(a_client_pvt->client, (void*)l_is_last_attempt);
+            if(a_client_pvt == a_client_pvt->client->_internal)
+                a_client_pvt->stage_status_error_callback(a_client_pvt->client, (void*) l_is_last_attempt);
+            else {
+                log_it(L_ERROR, "client_pvt->client=%x corrupted", a_client_pvt->client->_internal);
+            }
             //dap_client_pvt_unref(a_client_pvt);
             // Expecting that its one-shot callback
             //a_client_internal->stage_status_error_callback = NULL;
@@ -1241,7 +1245,11 @@ void m_es_stream_delete(dap_events_socket_t *a_es, void *arg)
     log_it(L_DEBUG, "client_pvt=0x%x", l_client_pvt);
 
     if (l_client_pvt->stage_status_error_callback) {
-        l_client_pvt->stage_status_error_callback(l_client_pvt->client, (void *)true);
+        if(l_client_pvt == l_client_pvt->client->_internal)
+            l_client_pvt->stage_status_error_callback(l_client_pvt->client, (void *) true);
+        else {
+            log_it(L_ERROR, "client_pvt->client=%x corrupted", l_client_pvt->client->_internal);
+        }
     }
     dap_stream_delete(l_client_pvt->stream);
     l_client_pvt->stream = NULL;
diff --git a/dap-sdk/net/core/dap_events_socket.c b/dap-sdk/net/core/dap_events_socket.c
index 5da958b3f0..8f4f017504 100644
--- a/dap-sdk/net/core/dap_events_socket.c
+++ b/dap-sdk/net/core/dap_events_socket.c
@@ -30,6 +30,7 @@
 #include <errno.h>
 #ifndef _WIN32
 #include <sys/epoll.h>
+#include <sys/select.h>
 #include <unistd.h>
 #include <fcntl.h>
 #else
@@ -39,10 +40,11 @@
 #include <ws2tcpip.h>
 #include <io.h>
 #include "wepoll.h"
-#include <pthread.h>
 #endif
+#include <pthread.h>
 
 #include "dap_common.h"
+#include "dap_list.h"
 #include "dap_worker.h"
 #include "dap_events.h"
 
@@ -476,6 +478,90 @@ void dap_events_socket_event_proc_input_unsafe(dap_events_socket_t *a_esocket)
         log_it(L_ERROR, "Queue socket %d accepted data but callback is NULL ", a_esocket->socket);
 }
 
+
+typedef struct dap_events_socket_buf_item
+{
+    dap_events_socket_t * es;
+    void *arg;
+} dap_events_socket_buf_item_t;
+
+int dap_events_socket_queue_ptr_send(dap_events_socket_t * a_es, void* a_arg);
+
+/**
+ *  Waits on the socket
+ *  return 0: timeout, 1: may send data, -1 error
+ */
+static int wait_send_socket(int a_sockfd, long timeout_ms)
+{
+    struct timeval l_tv;
+    fd_set l_outfd, l_errfd;
+
+    l_tv.tv_sec = timeout_ms / 1000;
+    l_tv.tv_usec = (timeout_ms % 1000) * 1000;
+
+    FD_ZERO(&l_outfd);
+    FD_ZERO(&l_errfd);
+    FD_SET(a_sockfd, &l_errfd);
+    FD_SET(a_sockfd, &l_outfd);
+
+    while(1) {
+#ifdef DAP_OS_WINDOWS
+    int l_res = select(1, NULL, &l_outfd, &l_errfd, &l_tv);
+#else
+        int l_res = select(a_sockfd + 1, NULL, &l_outfd, &l_errfd, &l_tv);
+#endif
+        if(l_res == 0){
+            log_it(L_DEBUG, "socket %d timed out", a_sockfd);
+        }
+        if(l_res == -1) {
+            if(errno == EINTR)
+                continue;
+            log_it(L_DEBUG, "socket %d waiting errno=%d", errno);
+            return l_res;
+        }
+        break;
+    };
+
+    if(FD_ISSET(a_sockfd, &l_outfd))
+        return 1;
+
+    return -1;
+}
+
+/**
+ * @brief dap_events_socket_buf_thread
+ * @param arg
+ * @return
+ */
+void *dap_events_socket_buf_thread(void *arg)
+{
+    dap_events_socket_buf_item_t *l_item = (dap_events_socket_buf_item_t*) arg;
+    if(!l_item) {
+        pthread_exit(0);
+    }
+    int l_res = 0;
+    //int l_count = 0;
+    //while(l_res < 1 && l_count < 3) {
+    // wait max 5 min
+    l_res = wait_send_socket(l_item->es->fd2, 300000);
+    //    l_count++;
+    //}
+    // if timeout or
+    if(l_res >= 0)
+        dap_events_socket_queue_ptr_send(l_item->es, l_item->arg);
+    DAP_DELETE(l_item);
+    pthread_exit(0);
+}
+
+static void add_ptr_to_buf(dap_events_socket_t * a_es, void* a_arg)
+{
+    dap_events_socket_buf_item_t *l_item = DAP_NEW(dap_events_socket_buf_item_t);
+    l_item->es = a_es;
+    l_item->arg = a_arg;
+    pthread_t l_thread;
+    pthread_create(&l_thread, NULL, dap_events_socket_buf_thread, l_item);
+}
+
 /**
  * @brief dap_events_socket_send_event
  * @param a_es
@@ -491,6 +577,9 @@ int dap_events_socket_queue_ptr_send( dap_events_socket_t * a_es, void* a_arg)
     else{
         char l_errbuf[128];
         log_it(L_ERROR, "Can't send ptr to queue:\"%s\" code %d", strerror_r(l_errno, l_errbuf, sizeof (l_errbuf)), l_errno);
+        // Try again
+        if(l_errno == EAGAIN)
+            add_ptr_to_buf(a_es, a_arg);
         return l_errno;
     }
 #elif defined (DAP_EVENTS_CAPS_QUEUE_POSIX)
-- 
GitLab