-
alexander.lysikov authored8c23d41b
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>
#ifdef _WIN32
#include <winsock2.h>
#include <windows.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <io.h>
#include <time.h>
#endif
#include <curl/curl.h>
#include "dap_common.h"
#include "dap_http_client.h"
#include "dap_http_client_simple.h"
typedef struct dap_http_client_internal {
dap_http_client_simple_callback_data_t response_callback;
dap_http_client_simple_callback_error_t error_callback;
void *obj;
uint8_t *request;
size_t request_size;
size_t request_sent_size;
struct curl_slist *request_headers;
uint8_t *response;
size_t response_size;
size_t response_size_max;
} dap_http_client_internal_t;
typedef struct dap_http_client_active_conn{
void *curl_h;// CURL
void *client_obj;// dap_client_pvt_t
UT_hash_handle hh;
} dap_http_client_active_conn_t;
// List of active connections
static dap_http_client_active_conn_t *s_conn_list = NULL;
// for separate access to s_conn_list
static pthread_mutex_t s_conn_list_mutex = PTHREAD_MUTEX_INITIALIZER;
CURLM *m_curl_mh = NULL; // Multi-thread handle to stack lot of parallel requests
#ifndef _WIN32
pthread_t curl_pid = 0;
#else
pthread_t curl_pid = { NULL, 0 };
#endif
pthread_cond_t m_curl_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t m_curl_mutex = PTHREAD_MUTEX_INITIALIZER;
static void *dap_http_client_thread( void *arg );
size_t dap_http_client_curl_request_callback( char *a_ptr, size_t a_size, size_t a_nmemb, void *a_userdata );
size_t dap_http_client_curl_response_callback(char *a_ptr, size_t a_size, size_t a_nmemb, void *a_userdata );
void dap_http_client_internal_delete( dap_http_client_internal_t *a_client );
#define DAP_HTTP_CLIENT_RESPONSE_SIZE_MAX 40960
#define LOG_TAG "dap_http_client"
/**
* @brief dap_http_client_init
* @return
*/
int dap_http_client_simple_init( )
{
log_it( L_NOTICE,"dap_http_client_simple_init( )" );
curl_global_init( CURL_GLOBAL_ALL );
m_curl_mh = curl_multi_init( );
pthread_create( &curl_pid, NULL, dap_http_client_thread, NULL );
return 0;
}
/**
* @brief dap_http_client_deinit
*/
void dap_http_client_simple_deinit( )
{
curl_multi_cleanup( m_curl_mh );
}
/**
* dap_http_client_lock_active_conn
*/
int dap_http_client_lock_active_conn(void)
{
return pthread_mutex_lock(&s_conn_list_mutex);
}
/**
* dap_http_client_unlock_active_conn
*/
int dap_http_client_unlock_active_conn(void)
{
return pthread_mutex_unlock(&s_conn_list_mutex);
}
/**
* find active connection in the list by CURL
*
* return 0 OK, -1 error, -2 connection not found
*/
static dap_http_client_active_conn_t* dap_http_client_get_active_conn_by_curl(void *a_curl_h)
{
if(!a_curl_h)
return NULL;
dap_http_client_active_conn_t *l_cur_item;
HASH_FIND(hh, s_conn_list, a_curl_h, sizeof(CURL), l_cur_item);
return l_cur_item;
}
/**
* find active connection in the list by dap_http_client_internal_t
*
* return 0 OK, -1 error, -2 connection not found
*/
void* dap_http_client_get_active_conn_by_obj(void *a_client_obj)
{
if(!a_client_obj)
return NULL;
dap_http_client_active_conn_t * l_cur_item, *l_item_tmp;
HASH_ITER(hh, s_conn_list, l_cur_item, l_item_tmp)
{
if(l_cur_item->client_obj == a_client_obj)
break;
}
return (void*)l_cur_item;
}
/**
* Add new active connection to the list
*
* return 0 OK, -1 error, -2 connection present
*/
static int dap_http_client_add_active_conn(CURL *a_curl_h, dap_http_client_internal_t *a_client_obj)
{
int l_ret = 0;
if(!a_curl_h || !a_client_obj)
return -1;
pthread_mutex_lock(&s_conn_list_mutex);
dap_http_client_active_conn_t *l_cur_item = dap_http_client_get_active_conn_by_curl(a_curl_h);
if(l_cur_item == NULL) {
l_cur_item = DAP_NEW(dap_http_client_active_conn_t);
l_cur_item->curl_h = a_curl_h;
l_cur_item->client_obj = a_client_obj;
HASH_ADD(hh, s_conn_list, curl_h, sizeof(CURL), l_cur_item); // address: name of key field
l_ret = 0;
}
// connection already present
else
l_ret = -2;
//connect_list = g_list_append(connect_list, client);
pthread_mutex_unlock(&s_conn_list_mutex);
return l_ret;
}
/**
* Delete active connection from the list
*
* return 0 OK, -1 error, -2 connection not found
*/
int dap_http_client_del_active_conn(void *a_curl_h, void *a_client_obj)
{
int ret = -1;
if(!a_curl_h && !a_client_obj)
return -1;
dap_http_client_active_conn_t *l_cur_item;
pthread_mutex_lock(&s_conn_list_mutex);
do {
if(a_curl_h)
l_cur_item = dap_http_client_get_active_conn_by_curl(a_curl_h);
else if(a_client_obj)
l_cur_item = dap_http_client_get_active_conn_by_obj(a_client_obj);
if(l_cur_item != NULL) {
HASH_DEL(s_conn_list, l_cur_item);
DAP_DELETE(l_cur_item);
ret = 0;
}
// connection not found in the hash
else {
ret = -2;
break;
}
}
// maybe some active conn with the same a_client_obj, required del all of them
while(l_cur_item);
pthread_mutex_unlock(&s_conn_list_mutex);
return ret;
}
/**
* @brief dap_http_client_internal_delete
* @param a_client
*/
void dap_http_client_internal_delete( dap_http_client_internal_t * a_client_internal )
{
log_it( L_NOTICE,"dap_http_client_internal_delete" );
if( a_client_internal->request_headers )
curl_slist_free_all( a_client_internal->request_headers );
if ( a_client_internal->request )
free( a_client_internal->request );
if ( a_client_internal->response )
free( a_client_internal->response );
free( a_client_internal );
log_it( L_NOTICE,"dap_http_client_internal_delete ok" );
}
/**
* @brief dap_http_client_simple_request_break
* @param a_curl
*/
void dap_http_client_simple_request_break(long a_curl_sock)
{
//if(!a_curl)
if(a_curl_sock<0)
return;
#ifdef _WIN32
closesocket(a_curl_sock);
#else
close(a_curl_sock);
#endif
//curl_multi_remove_handle(m_curl_mh, a_curl);
//curl_easy_cleanup(a_curl);
}
/**
* @brief dap_http_client_simple_request
* @param a_url
* @param a_method
* @param a_request_content_type
* @param a_request
* @param a_request_size
* @param a_response_callback
* @param a_error_callback
* @param a_obj
*/
void* dap_http_client_simple_request_custom( const char *a_url, const char *a_method, const char *a_request_content_type,
void *a_request, size_t a_request_size, char *a_cookie,
dap_http_client_simple_callback_data_t a_response_callback,
dap_http_client_simple_callback_error_t a_error_callback,
long *curl_sockfd, void *a_obj, char **a_custom, size_t a_custom_count )
{
log_it( L_DEBUG, "Simple HTTP request with static predefined buffer (%lu bytes) on url '%s'",
DAP_HTTP_CLIENT_RESPONSE_SIZE_MAX, a_url );
CURL *l_curl_h = curl_easy_init( );
dap_http_client_internal_t *l_client_internal = DAP_NEW_Z( dap_http_client_internal_t );
l_client_internal->error_callback = a_error_callback;
l_client_internal->response_callback = a_response_callback;
l_client_internal->obj = a_obj;
l_client_internal->response_size_max = DAP_HTTP_CLIENT_RESPONSE_SIZE_MAX;
l_client_internal->response = (uint8_t*) calloc( 1 ,DAP_HTTP_CLIENT_RESPONSE_SIZE_MAX );
l_client_internal->request = malloc(a_request_size);
memcpy(l_client_internal->request, a_request, a_request_size);
l_client_internal->request_size = a_request_size;
if( ( a_request ) && ( (
(strcmp( a_method , "POST" ) == 0) ||
(strcmp( a_method , "POST_ENC" ) == 0)
) ) ){
char l_buf[1024];
log_it ( L_DEBUG , "POST request with %u bytes of decoded data" , a_request_size );
if( a_request_content_type )
l_client_internal->request_headers = curl_slist_append(l_client_internal->request_headers, a_request_content_type );
if ( a_custom ) {
for( int i = 0; i < a_custom_count; i++ ) {
l_client_internal->request_headers = curl_slist_append( l_client_internal->request_headers, (char*) a_custom[i] );
}
}
if ( a_cookie )
l_client_internal->request_headers = curl_slist_append( l_client_internal->request_headers,(char*) a_cookie );
dap_snprintf(l_buf,sizeof(l_buf),"Content-Length: %lu", a_request_size );
l_client_internal->request_headers = curl_slist_append(l_client_internal->request_headers, l_buf);
//curl_easy_setopt( l_curl_h , CURLOPT_READDATA , l_client_internal );
curl_easy_setopt( l_curl_h , CURLOPT_POST , 1 );
curl_easy_setopt( l_curl_h , CURLOPT_POSTFIELDSIZE, a_request_size );
}
if ( l_client_internal->request_headers )
curl_easy_setopt( l_curl_h, CURLOPT_HTTPHEADER, l_client_internal->request_headers );
curl_easy_setopt( l_curl_h , CURLOPT_PRIVATE, l_client_internal );
curl_easy_setopt( l_curl_h , CURLOPT_URL, a_url);
curl_easy_setopt( l_curl_h , CURLOPT_READDATA , l_client_internal );
curl_easy_setopt( l_curl_h , CURLOPT_READFUNCTION , dap_http_client_curl_request_callback );
curl_easy_setopt( l_curl_h , CURLOPT_WRITEDATA , l_client_internal );
curl_easy_setopt( l_curl_h , CURLOPT_WRITEFUNCTION , dap_http_client_curl_response_callback );
// add active connection to list
dap_http_client_add_active_conn(l_curl_h, a_obj);
//if(curl_sockfd)
//curl_easy_getinfo( l_curl_h , CURLINFO_LASTSOCKET, curl_sockfd);
curl_multi_add_handle( m_curl_mh, l_curl_h );
//curl_multi_perform(m_curl_mh, &m_curl_cond);
#ifndef _WIN32
pthread_cond_signal( &m_curl_cond);
send_select_break( );
#endif
return l_curl_h;
}
/**
* @brief dap_http_client_simple_request
* @param a_url
* @param a_method
* @param a_request_content_type
* @param a_request
* @param a_request_size
* @param a_response_callback
* @param a_error_callback
* @param a_obj
*/
void* dap_http_client_simple_request(const char * a_url, const char * a_method, const char* a_request_content_type, void *a_request, size_t a_request_size, char * a_cookie, dap_http_client_simple_callback_data_t a_response_callback,
dap_http_client_simple_callback_error_t a_error_callback, long *curl_sockfd, void *a_obj, void * a_custom)
{
char *a_custom_new[1];
size_t a_custom_count = 0;
a_custom_new[0] = (char*)a_custom;
if(a_custom)
a_custom_count = 1;
return dap_http_client_simple_request_custom(a_url, a_method, a_request_content_type, a_request, a_request_size,
a_cookie, a_response_callback, a_error_callback, curl_sockfd, a_obj, a_custom_new, a_custom_count);
}
/**
* @brief dap_http_client_curl_response_callback
* @param a_ptr
* @param a_size
* @param a_nmemb
* @param a_userdata
* @return
*/
size_t dap_http_client_curl_response_callback( char *a_ptr, size_t a_size, size_t a_nmemb, void *a_userdata )
{
dap_http_client_internal_t * l_client_internal = (dap_http_client_internal_t *) a_userdata;
log_it(L_DEBUG, "Recieved %lu bytes in HTTP resonse", a_size*a_nmemb);
if( l_client_internal->response_size < l_client_internal->response_size_max){
size_t l_size = a_size * a_nmemb;
if( l_size > ( l_client_internal->response_size_max - l_client_internal->response_size) )
l_size = l_client_internal->response_size_max - l_client_internal->response_size;
memcpy(l_client_internal->response + l_client_internal->response_size,a_ptr,l_size);
l_client_internal->response_size += l_size;
}else{
log_it(L_WARNING,"Too big reply, %lu bytes a lost",a_size*a_nmemb);
}
return a_size*a_nmemb;
}
/**
* @brief dap_http_client_curl_request_callback
* @param a_ptr
* @param a_size
* @param a_nmemb
* @param a_userdata
* @return
*/
size_t dap_http_client_curl_request_callback(char * a_ptr, size_t a_size, size_t a_nmemb, void * a_userdata)
{
dap_http_client_internal_t * l_client_internal = (dap_http_client_internal_t *) a_userdata;
size_t l_size = a_size * a_nmemb;
if( ( l_size + l_client_internal->request_sent_size) > l_client_internal->request_size )
l_size = l_client_internal->request_size - l_client_internal->request_sent_size;
if( l_size ) {
memcpy( a_ptr, l_client_internal->request + l_client_internal->request_sent_size, l_size );
l_client_internal->request_sent_size += l_size;
}
return l_size;
}
/**
* @brief dap_http_client_thread
* @param arg
*/
static void* dap_http_client_thread(void * arg)
{
(void) arg;
bool l_still_running = true;
// return NULL;
log_it(L_DEBUG, "dap_http_client_thread started");
do {
struct timeval timeout;
int rc = 0; /* select() return code */
CURLMcode mc; /* curl_multi_fdset() return code */
fd_set fdread;
fd_set fdwrite;
fd_set fdexcep;
int maxfd = -1;
long curl_timeo = -1;
FD_ZERO(&fdread);
FD_ZERO(&fdwrite);
FD_ZERO(&fdexcep);
/* set a suitable timeout to play around with */
timeout.tv_sec = 10;
timeout.tv_usec = 0;
// log_it(L_DEBUG, "curl_multi_timeout( )\n");
curl_multi_timeout( m_curl_mh, &curl_timeo );
// Sleep(1000);
// log_it(L_DEBUG, "curl_timeo = %u\n", curl_timeo);
if(curl_timeo >= 0) {
timeout.tv_sec = curl_timeo / 1000;
if(timeout.tv_sec > 1)
timeout.tv_sec = 1;
else
timeout.tv_usec = (curl_timeo % 1000) * 1000;
}
/* get file descriptors from the transfers */
mc = curl_multi_fdset( m_curl_mh, &fdread, &fdwrite, &fdexcep, &maxfd );
// log_it(L_DEBUG, "curl_multi_fdset() = %u maxfd = %u\n", mc, maxfd );
#ifndef _WIN32
FD_SET(get_select_breaker(),&fdread);
if(get_select_breaker() > maxfd)
maxfd = get_select_breaker();
#endif
if(mc != CURLM_OK) {
log_it(L_ERROR, "curl_multi_fdset() failed, code %d.\n", mc);
break;
}
/* On success the value of maxfd is guaranteed to be >= -1. We call
select(maxfd + 1, ...); specially in case of (maxfd == -1) there are
no fds ready yet so we call select(0, ...) --or Sleep() on Windows--
to sleep 100ms, which is the minimum suggested value in the
curl_multi_fdset() doc. */
rc = 0;
if ( maxfd == -1 ) {
// log_it(L_DEBUG, "Waiting for signal");
#ifndef _WIN32
pthread_cond_wait( &m_curl_cond, &m_curl_mutex );
#else
Sleep(100);
#endif
// Sleep(1000);
} else {
// Sleep(100);
// log_it(L_DEBUG, "Selecting stuff maxfd = %u select timeout %u", maxfd, timeout );
/* Note that on some platforms 'timeout' may be modified by select().
If you need access to the original value save a copy beforehand. */
rc = select( maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout );
}
// log_it(L_DEBUG, "Select error rc = %u", rc );
switch(rc) {
case -1: {
// log_it(L_DEBUG, "Select error rc = %u", rc );
/* select error */
} break;
case 0: /* timeout */
// log_it(L_DEBUG, "Select timeout rc = %u", rc );
default: { /* action */
// log_it(L_DEBUG, "ACTTION ? rc = %u", rc );
int l_curl_eh_count = 0;
curl_multi_perform( m_curl_mh , &l_curl_eh_count );
// Check if we have smth complete
struct CURLMsg *m;
do {
int msgq = 0;
m = curl_multi_info_read(m_curl_mh, &msgq);
if(m && (m->msg == CURLMSG_DONE)) {
// log_it(L_DEBUG, "CURLMSG_DONE" );
CURL *e = m->easy_handle;
char * l_private = NULL;
int l_err_code = 0;
curl_easy_getinfo( e, CURLINFO_PRIVATE, &l_private );
// log_it(L_DEBUG, "l_private = %p", l_private );
if( l_private ){
bool l_is_ok = false;
dap_http_client_internal_t *l_client_internal = (dap_http_client_internal_t *) l_private;
switch ( m->data.result){
case CURLE_OUT_OF_MEMORY: l_err_code = 1 ; log_it(L_CRITICAL, "Out of memory"); break;
case CURLE_COULDNT_CONNECT: l_err_code = 2 ; log_it(L_ERROR, "Couldn't connect to the destination server"); break;
case CURLE_COULDNT_RESOLVE_HOST: l_err_code = 3 ; log_it(L_ERROR, "Couldn't resolve destination address"); break;
case CURLE_OPERATION_TIMEDOUT: l_err_code = 4 ; log_it(L_ERROR, "HTTP request timeout"); break;
case CURLE_URL_MALFORMAT: l_err_code = 5 ; log_it(L_ERROR, "Wrong URL format in the outgoing request"); break;
case CURLE_FTP_WEIRD_SERVER_REPLY: l_err_code = 6 ; log_it(L_WARNING, "Weird server reply"); break;
case CURLE_OK:{
l_is_ok = true;
log_it(L_DEBUG, "Response size %u",l_client_internal->response_size);
}break;
default: l_err_code = 12345;
}
if( l_is_ok){
l_client_internal->response_callback(l_client_internal->response,
l_client_internal->response_size,
l_client_internal->obj );
}else {
log_it(L_WARNING, "HTTP request wasn't processed well with error code %d",m->data.result );
l_client_internal->error_callback(l_err_code , l_client_internal->obj );
}
dap_http_client_internal_delete(l_client_internal);
}
else {
log_it(L_CRITICAL, "Can't get private information from libcurl handle to perform the reply to SAP connection");
}
// del active connection from list
dap_http_client_del_active_conn(e, NULL);
curl_multi_remove_handle(m_curl_mh, e);
curl_easy_cleanup(e);
}
} while(m);
} break;
}
} while(l_still_running);
return NULL;
}