Skip to content

Commit

Permalink
fix process poller
Browse files Browse the repository at this point in the history
  • Loading branch information
waruqi committed Jan 22, 2020
1 parent 01394b4 commit 010e069
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 29 deletions.
5 changes: 4 additions & 1 deletion src/demo/platform/poller_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ tb_int_t tb_demo_platform_poller_process_main(tb_int_t argc, tb_char_t** argv)
}

// wait events
while (tb_poller_wait(poller, tb_demo_poller_event, -1) > 0) ;
while (tb_poller_wait(poller, tb_demo_poller_event, -1) >= 0) ;

// end
tb_trace_i("finished");

} while (0);

Expand Down
12 changes: 7 additions & 5 deletions src/tbox/platform/mach/poller_kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,15 @@ static tb_long_t tb_poller_kqueue_wait(tb_poller_t* self, tb_poller_event_func_t
}

// wait events
tb_long_t events_count = 0;
do { events_count = kevent(poller->kqfd, tb_null, 0, poller->events, poller->events_count, timeout >= 0? &t : tb_null); } while (events_count == -1 && errno == EINTR);
tb_long_t events_count = kevent(poller->kqfd, tb_null, 0, poller->events, poller->events_count, timeout >= 0? &t : tb_null);

// timeout or interrupted?
if (!events_count || (events_count == -1 && errno == EINTR))
return 0;

// error?
tb_assert_and_check_return_val(events_count >= 0 && events_count <= poller->events_count, -1);

// timeout?
tb_check_return_val(events_count, 0);

// grow it if events is full
if (events_count == poller->events_count)
{
Expand Down
26 changes: 13 additions & 13 deletions src/tbox/platform/poller.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* includes
*/
#include "poller.h"
#include "time.h"
#include "impl/poller.h"
#include "impl/pollerdata.h"

Expand Down Expand Up @@ -251,25 +252,24 @@ tb_long_t tb_poller_wait(tb_poller_ref_t self, tb_poller_event_func_t func, tb_l

#ifdef TB_POLLER_ENABLE_PROCESS
// prepare to wait the processes
if (poller->process_poller && !tb_poller_process_wait_prepare(poller->process_poller))
return -1;
#endif

// wait the poller objects
tb_long_t wait = poller->wait(poller, func, timeout);
tb_assert_and_check_return_val(wait >= 0, -1);

#ifdef TB_POLLER_ENABLE_PROCESS
// poll all waited processes
if (poller->process_poller)
{
// prepare to wait processes
if (!tb_poller_process_wait_prepare(poller->process_poller))
return -1;

// wait the poller objects
tb_long_t wait = poller->wait(poller, func, timeout);
tb_check_return_val(wait >= 0, -1);

// poll all waited processes
tb_long_t proc_wait = tb_poller_process_wait_poll(poller->process_poller, func);
tb_assert_and_check_return_val(proc_wait >= 0, -1);

tb_check_return_val(proc_wait >= 0, -1);
wait += proc_wait;
return wait;
}
#endif
return wait;
return poller->wait(poller, func, timeout);
}
tb_void_t tb_poller_attach(tb_poller_ref_t self)
{
Expand Down
2 changes: 1 addition & 1 deletion src/tbox/platform/poller.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ tb_bool_t tb_poller_modify(tb_poller_ref_t poller, tb_poller_object_re
* @param func the events function
* @param timeout the timeout, infinity: -1
*
* @return > 0: the events number, 0: timeout, -1: failed
* @return > 0: the events number, 0: timeout or interrupted, -1: failed
*/
tb_long_t tb_poller_wait(tb_poller_ref_t poller, tb_poller_event_func_t func, tb_long_t timeout);

Expand Down
16 changes: 8 additions & 8 deletions src/tbox/platform/posix/poller_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,9 @@ static tb_int_t tb_poller_process_loop(tb_cpointer_t priv)
}
static tb_void_t tb_poller_process_signal_handler(tb_int_t signo)
{
// check
tb_poller_process_t* poller = g_process_poller;
tb_assert_and_check_return(poller);

// trace
tb_trace_d("process: signo: %d", signo);

// post semaphore to wait processes
if (poller->semaphore) tb_semaphore_post(poller->semaphore, 1);
if (g_process_poller && g_process_poller->semaphore)
tb_semaphore_post(g_process_poller->semaphore, 1);
}

/* //////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -382,8 +376,14 @@ static tb_bool_t tb_poller_process_remove(tb_poller_process_ref_t self, tb_proce
}
static tb_bool_t tb_poller_process_wait_prepare(tb_poller_process_ref_t self)
{
// check
tb_poller_process_t* poller = (tb_poller_process_t*)self;

// trace
tb_trace_d("process: prepare %lu", tb_hash_map_size(((tb_poller_process_t*)self)->processes_data));

// post semaphore to wait processes
if (poller->semaphore) tb_semaphore_post(poller->semaphore, 1);
return tb_true;
}
static tb_long_t tb_poller_process_wait_poll(tb_poller_process_ref_t self, tb_poller_event_func_t func)
Expand Down
2 changes: 1 addition & 1 deletion src/tbox/platform/posix/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
// the thread type
typedef struct __tb_thread_t
{
// the pthread
// the pthread, @note We must put it at the beginning, because posix/mach/thread_affinity will use it.
pthread_t pthread;

// is joined?
Expand Down

0 comments on commit 010e069

Please sign in to comment.