Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • cellframe/cellframe-sdk
  • MIKA83/cellframe-sdk
2 results
Show changes
Commits on Source (3)
......@@ -45,6 +45,13 @@
#define LOG_TAG "dap_stream_ch"
static struct dap_stream_ch_table_t {
dap_stream_ch_t *ch;
UT_hash_handle hh;
} *s_ch_table = NULL;
static pthread_mutex_t s_ch_table_lock;
/**
* @brief stream_ch_init Init stream channel module
* @return Zero if ok others if no
......@@ -59,6 +66,7 @@ int dap_stream_ch_init()
log_it(L_CRITICAL,"Can't init stream channel packet submodule");
return -1;
}
pthread_mutex_init(&s_ch_table_lock, NULL);
log_it(L_NOTICE,"Module stream channel initialized");
return 0;
}
......@@ -68,6 +76,7 @@ int dap_stream_ch_init()
*/
void dap_stream_ch_deinit()
{
pthread_mutex_destroy(&s_ch_table_lock);
}
/**
......@@ -82,7 +91,8 @@ dap_stream_ch_t* dap_stream_ch_new(dap_stream_t* a_stream, uint8_t id)
dap_stream_ch_t* ret = DAP_NEW_Z(dap_stream_ch_t);
ret->stream = a_stream;
ret->proc = proc;
ret->ready_to_read=true;
ret->ready_to_read = true;
pthread_mutex_init(&(ret->mutex),NULL);
if(ret->proc->new_callback)
ret->proc->new_callback(ret,NULL);
......@@ -92,6 +102,11 @@ dap_stream_ch_t* dap_stream_ch_new(dap_stream_t* a_stream, uint8_t id)
a_stream->channel_count++;
pthread_rwlock_unlock(&a_stream->rwlock);
struct dap_stream_ch_table_t *l_new_ch = DAP_NEW_Z(struct dap_stream_ch_table_t);
pthread_mutex_lock(&s_ch_table_lock);
HASH_ADD_PTR(s_ch_table, ch, l_new_ch);
pthread_mutex_unlock(&s_ch_table_lock);
return ret;
}else{
log_it(L_WARNING, "Unknown stream processor with id %uc",id);
......@@ -99,24 +114,40 @@ dap_stream_ch_t* dap_stream_ch_new(dap_stream_t* a_stream, uint8_t id)
}
}
bool dap_stream_ch_valid(dap_stream_ch_t *a_ch)
{
struct dap_stream_ch_table_t *l_ret;
pthread_mutex_lock(&s_ch_table_lock);
HASH_FIND_PTR(s_ch_table, a_ch, l_ret);
pthread_mutex_unlock(&s_ch_table_lock);
return l_ret;
}
/**
* @brief stream_ch_delete Delete channel instance
* @param ch Channel delete
*/
void dap_stream_ch_delete(dap_stream_ch_t*ch)
void dap_stream_ch_delete(dap_stream_ch_t *a_ch)
{
if(ch->proc)
if(ch->proc->delete_callback)
ch->proc->delete_callback(ch,NULL);
pthread_mutex_lock(&s_ch_table_lock);
struct dap_stream_ch_table_t *l_ret;;
HASH_FIND_PTR(s_ch_table, a_ch, l_ret);
HASH_DEL(s_ch_table, l_ret);
pthread_mutex_unlock(&s_ch_table_lock);
pthread_mutex_destroy(&(ch->mutex));
pthread_mutex_lock(&a_ch->mutex);
if (a_ch->proc)
if (a_ch->proc->delete_callback)
a_ch->proc->delete_callback(a_ch, NULL);
pthread_mutex_unlock(&a_ch->mutex);
pthread_mutex_destroy(&a_ch->mutex);
/* fixed raise, but probably may be memory leak!
if(ch->internal){
free(ch->internal);
}
*/
//free(ch);
DAP_DELETE(a_ch);
}
/**
......@@ -126,6 +157,9 @@ void dap_stream_ch_delete(dap_stream_ch_t*ch)
*/
void dap_stream_ch_set_ready_to_read(dap_stream_ch_t * a_ch,bool a_is_ready)
{
if (!dap_stream_ch_valid(a_ch)) {
return;
}
pthread_mutex_lock(&a_ch->mutex);
if( a_ch->ready_to_read != a_is_ready){
//log_it(L_DEBUG,"Change channel '%c' to %s", (char) ch->proc->id, is_ready?"true":"false");
......@@ -149,6 +183,9 @@ void dap_stream_ch_set_ready_to_read(dap_stream_ch_t * a_ch,bool a_is_ready)
*/
void dap_stream_ch_set_ready_to_write(dap_stream_ch_t * ch,bool is_ready)
{
if (!dap_stream_ch_valid(ch)) {
return;
}
pthread_mutex_lock(&ch->mutex);
if(ch->ready_to_write!=is_ready){
//log_it(L_DEBUG,"Change channel '%c' to %s", (char) ch->proc->id, is_ready?"true":"false");
......@@ -166,3 +203,37 @@ void dap_stream_ch_set_ready_to_write(dap_stream_ch_t * ch,bool is_ready)
}
pthread_mutex_unlock(&ch->mutex);
}
/**
* @brief dap_stream_ch_get_ready_to_read
* @param a_ch
* @return
*/
bool dap_stream_ch_get_ready_to_read(dap_stream_ch_t * a_ch)
{
if (!dap_stream_ch_valid(a_ch)) {
return false;
}
bool l_ret;
pthread_mutex_lock(&a_ch->mutex);
l_ret = a_ch->ready_to_read;
pthread_mutex_unlock(&a_ch->mutex);
return l_ret;
}
/**
* @brief dap_stream_ch_get_ready_to_write
* @param a_ch
* @return
*/
bool dap_stream_ch_get_ready_to_write(dap_stream_ch_t * a_ch)
{
if (!dap_stream_ch_valid(a_ch)) {
return false;
}
bool l_ret;
pthread_mutex_lock(&a_ch->mutex);
l_ret = a_ch->ready_to_write;
pthread_mutex_unlock(&a_ch->mutex);
return l_ret;
}
......@@ -79,6 +79,9 @@ size_t dap_stream_ch_pkt_write(struct dap_stream_ch * a_ch, uint8_t a_type, con
log_it(L_WARNING,"Zero data size to write out in channel");
return 0;
}
if (!dap_stream_ch_valid(a_ch)) {
return 0;
}
pthread_mutex_lock( &a_ch->mutex);
//log_it(L_DEBUG,"Output: Has %u bytes of %c type for %c channel id",data_size, (char)type, (char) ch->proc->id );
......
......@@ -58,34 +58,11 @@ dap_stream_ch_t* dap_stream_ch_new( dap_stream_t * dap_stream,uint8_t id);
void dap_stream_ch_set_ready_to_read(dap_stream_ch_t * ch,bool is_ready);
void dap_stream_ch_set_ready_to_write(dap_stream_ch_t * ch,bool is_ready);
/**
* @brief dap_stream_ch_get_ready_to_read
* @param a_ch
* @return
*/
static inline bool dap_stream_ch_get_ready_to_read(dap_stream_ch_t * a_ch)
{
bool l_ret;
pthread_mutex_lock(&a_ch->mutex);
l_ret = a_ch->ready_to_read;
pthread_mutex_unlock(&a_ch->mutex);
return l_ret;
}
/**
* @brief dap_stream_ch_get_ready_to_write
* @param a_ch
* @return
*/
static inline bool dap_stream_ch_get_ready_to_write(dap_stream_ch_t * a_ch)
{
bool l_ret;
pthread_mutex_lock(&a_ch->mutex);
l_ret = a_ch->ready_to_write;
pthread_mutex_unlock(&a_ch->mutex);
return l_ret;
}
void dap_stream_ch_delete(dap_stream_ch_t*ch);
bool dap_stream_ch_get_ready_to_read(dap_stream_ch_t *a_ch);
bool dap_stream_ch_get_ready_to_write(dap_stream_ch_t *a_ch);
void dap_stream_ch_delete(dap_stream_ch_t *a_ch);
bool dap_stream_ch_valid(dap_stream_ch_t *a_ch);
#endif
......@@ -510,7 +510,6 @@ static int s_callback_response_success(dap_chain_net_srv_t * a_srv, uint32_t a_u
pthread_rwlock_init(&l_usage_client->rwlock,NULL);
memcpy(l_usage_client->receipt, l_receipt, l_receipt_size);
pthread_rwlock_wrlock(&s_clients_rwlock);
HASH_ADD(hh, s_clients,usage_id,sizeof(a_usage_id),l_usage_client);
......@@ -703,7 +702,6 @@ void srv_ch_vpn_delete(dap_stream_ch_t* ch, void* arg)
l_is_unleased = true;
pthread_rwlock_unlock(& s_raw_server_rwlock);
}
pthread_rwlock_wrlock(&s_clients_rwlock);
if(s_ch_vpn_addrs) {
HASH_DEL(s_ch_vpn_addrs, l_ch_vpn);
......@@ -720,7 +718,6 @@ void srv_ch_vpn_delete(dap_stream_ch_t* ch, void* arg)
HASH_FIND(hh,s_clients, &l_ch_vpn->usage_id,sizeof(l_ch_vpn->usage_id),l_usage_client );
if (l_usage_client){
pthread_rwlock_wrlock(&l_usage_client->rwlock);
l_usage_client->ch_vpn = NULL; // NULL the channel, nobody uses that indicates
pthread_rwlock_unlock(&l_usage_client->rwlock);
}
......@@ -1611,7 +1608,7 @@ void* srv_ch_sf_thread_raw(void *arg)
s_update_limits(l_ch_vpn->ch,l_srv_session,l_usage, l_read_ret);
}
}
pthread_rwlock_unlock(&s_clients_rwlock);
pthread_rwlock_unlock(&s_clients_rwlock);\
}
}/*else {
log_it(L_CRITICAL,"select() has no tun handler in the returned set");
......