diff --git a/dap-sdk/net/stream/ch/dap_stream_ch.c b/dap-sdk/net/stream/ch/dap_stream_ch.c index 6402394bf3fb0db0276b505c77212692d9cac02b..4c4a8148c2b1d10ac82f8f13a3edc3cac60b6ede 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch.c @@ -45,6 +45,13 @@ #define LOG_TAG "dap_stream_ch" +static struct dap_stream_ch_table_t { + dap_stream_ch_t *ch; + UT_hash_handle hh; +} *s_ch_table = NULL; + +static pthread_mutex_t s_ch_table_lock; + /** * @brief stream_ch_init Init stream channel module * @return Zero if ok others if no @@ -59,6 +66,7 @@ int dap_stream_ch_init() log_it(L_CRITICAL,"Can't init stream channel packet submodule"); return -1; } + pthread_mutex_init(&s_ch_table_lock, NULL); log_it(L_NOTICE,"Module stream channel initialized"); return 0; } @@ -68,6 +76,7 @@ int dap_stream_ch_init() */ void dap_stream_ch_deinit() { + pthread_mutex_destroy(&s_ch_table_lock); } /** @@ -82,7 +91,8 @@ dap_stream_ch_t* dap_stream_ch_new(dap_stream_t* a_stream, uint8_t id) dap_stream_ch_t* ret = DAP_NEW_Z(dap_stream_ch_t); ret->stream = a_stream; ret->proc = proc; - ret->ready_to_read=true; + ret->ready_to_read = true; + pthread_mutex_init(&(ret->mutex),NULL); if(ret->proc->new_callback) ret->proc->new_callback(ret,NULL); @@ -92,6 +102,11 @@ dap_stream_ch_t* dap_stream_ch_new(dap_stream_t* a_stream, uint8_t id) a_stream->channel_count++; pthread_rwlock_unlock(&a_stream->rwlock); + struct dap_stream_ch_table_t *l_new_ch = DAP_NEW_Z(struct dap_stream_ch_table_t); + pthread_mutex_lock(&s_ch_table_lock); + HASH_ADD_PTR(s_ch_table, ch, l_new_ch); + pthread_mutex_unlock(&s_ch_table_lock); + return ret; }else{ log_it(L_WARNING, "Unknown stream processor with id %uc",id); @@ -99,24 +114,40 @@ dap_stream_ch_t* dap_stream_ch_new(dap_stream_t* a_stream, uint8_t id) } } +bool dap_stream_ch_valid(dap_stream_ch_t *a_ch) +{ + struct dap_stream_ch_table_t *l_ret; + pthread_mutex_lock(&s_ch_table_lock); + HASH_FIND_PTR(s_ch_table, a_ch, l_ret); + pthread_mutex_unlock(&s_ch_table_lock); + return l_ret; +} + /** * @brief stream_ch_delete Delete channel instance * @param ch Channel delete */ -void dap_stream_ch_delete(dap_stream_ch_t*ch) +void dap_stream_ch_delete(dap_stream_ch_t *a_ch) { - if(ch->proc) - if(ch->proc->delete_callback) - ch->proc->delete_callback(ch,NULL); + pthread_mutex_lock(&s_ch_table_lock); + struct dap_stream_ch_table_t *l_ret;; + HASH_FIND_PTR(s_ch_table, a_ch, l_ret); + HASH_DEL(s_ch_table, l_ret); + pthread_mutex_unlock(&s_ch_table_lock); - pthread_mutex_destroy(&(ch->mutex)); + pthread_mutex_lock(&a_ch->mutex); + if (a_ch->proc) + if (a_ch->proc->delete_callback) + a_ch->proc->delete_callback(a_ch, NULL); + pthread_mutex_unlock(&a_ch->mutex); + pthread_mutex_destroy(&a_ch->mutex); /* fixed raise, but probably may be memory leak! if(ch->internal){ free(ch->internal); } */ - //free(ch); + DAP_DELETE(a_ch); } /** @@ -126,6 +157,9 @@ void dap_stream_ch_delete(dap_stream_ch_t*ch) */ void dap_stream_ch_set_ready_to_read(dap_stream_ch_t * a_ch,bool a_is_ready) { + if (!dap_stream_ch_valid(a_ch)) { + return; + } pthread_mutex_lock(&a_ch->mutex); if( a_ch->ready_to_read != a_is_ready){ //log_it(L_DEBUG,"Change channel '%c' to %s", (char) ch->proc->id, is_ready?"true":"false"); @@ -149,6 +183,9 @@ void dap_stream_ch_set_ready_to_read(dap_stream_ch_t * a_ch,bool a_is_ready) */ void dap_stream_ch_set_ready_to_write(dap_stream_ch_t * ch,bool is_ready) { + if (!dap_stream_ch_valid(ch)) { + return; + } pthread_mutex_lock(&ch->mutex); if(ch->ready_to_write!=is_ready){ //log_it(L_DEBUG,"Change channel '%c' to %s", (char) ch->proc->id, is_ready?"true":"false"); @@ -166,3 +203,37 @@ void dap_stream_ch_set_ready_to_write(dap_stream_ch_t * ch,bool is_ready) } pthread_mutex_unlock(&ch->mutex); } + +/** + * @brief dap_stream_ch_get_ready_to_read + * @param a_ch + * @return + */ +bool dap_stream_ch_get_ready_to_read(dap_stream_ch_t * a_ch) +{ + if (!dap_stream_ch_valid(a_ch)) { + return false; + } + bool l_ret; + pthread_mutex_lock(&a_ch->mutex); + l_ret = a_ch->ready_to_read; + pthread_mutex_unlock(&a_ch->mutex); + return l_ret; +} + +/** + * @brief dap_stream_ch_get_ready_to_write + * @param a_ch + * @return + */ +bool dap_stream_ch_get_ready_to_write(dap_stream_ch_t * a_ch) +{ + if (!dap_stream_ch_valid(a_ch)) { + return false; + } + bool l_ret; + pthread_mutex_lock(&a_ch->mutex); + l_ret = a_ch->ready_to_write; + pthread_mutex_unlock(&a_ch->mutex); + return l_ret; +} diff --git a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c index cb575948c43b1c9604965ca8ec56f881f7cc5bbc..b1600592b17f4572c400430a2aba24b0fb123f24 100644 --- a/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c +++ b/dap-sdk/net/stream/ch/dap_stream_ch_pkt.c @@ -79,6 +79,9 @@ size_t dap_stream_ch_pkt_write(struct dap_stream_ch * a_ch, uint8_t a_type, con log_it(L_WARNING,"Zero data size to write out in channel"); return 0; } + if (!dap_stream_ch_valid(a_ch)) { + return 0; + } pthread_mutex_lock( &a_ch->mutex); //log_it(L_DEBUG,"Output: Has %u bytes of %c type for %c channel id",data_size, (char)type, (char) ch->proc->id ); diff --git a/dap-sdk/net/stream/ch/include/dap_stream_ch.h b/dap-sdk/net/stream/ch/include/dap_stream_ch.h index cde7633792a8928bb6398c0dc67def14e3bb1f00..a9c1f484fee1a16f231be3a3ea13a1d046f5a834 100644 --- a/dap-sdk/net/stream/ch/include/dap_stream_ch.h +++ b/dap-sdk/net/stream/ch/include/dap_stream_ch.h @@ -58,34 +58,11 @@ dap_stream_ch_t* dap_stream_ch_new( dap_stream_t * dap_stream,uint8_t id); void dap_stream_ch_set_ready_to_read(dap_stream_ch_t * ch,bool is_ready); void dap_stream_ch_set_ready_to_write(dap_stream_ch_t * ch,bool is_ready); -/** - * @brief dap_stream_ch_get_ready_to_read - * @param a_ch - * @return - */ -static inline bool dap_stream_ch_get_ready_to_read(dap_stream_ch_t * a_ch) -{ - bool l_ret; - pthread_mutex_lock(&a_ch->mutex); - l_ret = a_ch->ready_to_read; - pthread_mutex_unlock(&a_ch->mutex); - return l_ret; -} - -/** - * @brief dap_stream_ch_get_ready_to_write - * @param a_ch - * @return - */ -static inline bool dap_stream_ch_get_ready_to_write(dap_stream_ch_t * a_ch) -{ - bool l_ret; - pthread_mutex_lock(&a_ch->mutex); - l_ret = a_ch->ready_to_write; - pthread_mutex_unlock(&a_ch->mutex); - return l_ret; -} - -void dap_stream_ch_delete(dap_stream_ch_t*ch); +bool dap_stream_ch_get_ready_to_read(dap_stream_ch_t *a_ch); +bool dap_stream_ch_get_ready_to_write(dap_stream_ch_t *a_ch); + +void dap_stream_ch_delete(dap_stream_ch_t *a_ch); + +bool dap_stream_ch_valid(dap_stream_ch_t *a_ch); #endif diff --git a/modules/service/vpn/dap_chain_net_srv_vpn.c b/modules/service/vpn/dap_chain_net_srv_vpn.c index cc7194b05b6d08f09cf32cacaea2382716152d3d..10db028050013b28ac642d57ce77e109cad90cdd 100644 --- a/modules/service/vpn/dap_chain_net_srv_vpn.c +++ b/modules/service/vpn/dap_chain_net_srv_vpn.c @@ -510,7 +510,6 @@ static int s_callback_response_success(dap_chain_net_srv_t * a_srv, uint32_t a_u pthread_rwlock_init(&l_usage_client->rwlock,NULL); memcpy(l_usage_client->receipt, l_receipt, l_receipt_size); - pthread_rwlock_wrlock(&s_clients_rwlock); HASH_ADD(hh, s_clients,usage_id,sizeof(a_usage_id),l_usage_client); @@ -703,7 +702,6 @@ void srv_ch_vpn_delete(dap_stream_ch_t* ch, void* arg) l_is_unleased = true; pthread_rwlock_unlock(& s_raw_server_rwlock); } - pthread_rwlock_wrlock(&s_clients_rwlock); if(s_ch_vpn_addrs) { HASH_DEL(s_ch_vpn_addrs, l_ch_vpn); @@ -720,7 +718,6 @@ void srv_ch_vpn_delete(dap_stream_ch_t* ch, void* arg) HASH_FIND(hh,s_clients, &l_ch_vpn->usage_id,sizeof(l_ch_vpn->usage_id),l_usage_client ); if (l_usage_client){ pthread_rwlock_wrlock(&l_usage_client->rwlock); - l_usage_client->ch_vpn = NULL; // NULL the channel, nobody uses that indicates pthread_rwlock_unlock(&l_usage_client->rwlock); } @@ -1611,7 +1608,7 @@ void* srv_ch_sf_thread_raw(void *arg) s_update_limits(l_ch_vpn->ch,l_srv_session,l_usage, l_read_ret); } } - pthread_rwlock_unlock(&s_clients_rwlock); + pthread_rwlock_unlock(&s_clients_rwlock);\ } }/*else { log_it(L_CRITICAL,"select() has no tun handler in the returned set");