Skip to content

Commit

Permalink
add poller_pipe demo
Browse files Browse the repository at this point in the history
  • Loading branch information
waruqi committed Jan 12, 2020
1 parent f2de23b commit 8415397
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 14 deletions.
14 changes: 7 additions & 7 deletions src/demo/coroutine/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,37 @@
static tb_int_t tb_demo_thread_server(tb_cpointer_t priv)
{
tb_pipe_file_ref_t pipe = (tb_pipe_file_ref_t) priv;
tb_byte_t buf[BUFSIZE];
tb_byte_t data[BUFSIZE];
tb_size_t count = 0;
while (1)
{
count++;
if (!tb_pipe_file_bread(pipe, buf, sizeof(buf))) break;
if (!tb_pipe_file_bread(pipe, data, sizeof(data))) break;
tb_usleep(50000);
if (!tb_pipe_file_bwrit(pipe, buf, sizeof(buf))) break;
if (!tb_pipe_file_bwrit(pipe, data, sizeof(data))) break;
tb_trace_i(" === %.4llu (%llu bytes)", count, count * BUFSIZE);
}
return 0;
}

static tb_void_t tb_demo_coroutine_writ(tb_cpointer_t priv)
{
tb_byte_t buf[BUFSIZE];
tb_byte_t data[BUFSIZE];
tb_pipe_file_ref_t pipe = (tb_pipe_file_ref_t) priv;
for (tb_size_t i = 0; i < COUNT; i++)
{
if (!tb_pipe_file_bwrit(pipe, buf, sizeof(buf))) break;
if (!tb_pipe_file_bwrit(pipe, data, sizeof(data))) break;
tb_trace_i("[-->] %.4llu", i);
}
}

static tb_void_t tb_demo_coroutine_read(tb_cpointer_t priv)
{
tb_byte_t buf[BUFSIZE];
tb_byte_t data[BUFSIZE];
tb_pipe_file_ref_t pipe = (tb_pipe_file_ref_t) priv;
for (tb_size_t count = 0;; count++)
{
if (!tb_pipe_file_bread(pipe, buf, sizeof(buf))) break;
if (!tb_pipe_file_bread(pipe, data, sizeof(data))) break;
tb_trace_i("[<--] %.4llu", count);
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/demo/coroutine/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,37 @@
static tb_int_t tb_demo_thread_server(tb_cpointer_t priv)
{
tb_socket_ref_t sock = (tb_socket_ref_t) priv;
tb_byte_t buf[BUFSIZE];
tb_byte_t data[BUFSIZE];
tb_size_t count = 0;
while (1)
{
count++;
if (!tb_socket_brecv(sock, buf, sizeof(buf))) break;
if (!tb_socket_brecv(sock, data, sizeof(data))) break;
tb_usleep(50000);
if (!tb_socket_bsend(sock, buf, sizeof(buf))) break;
if (!tb_socket_bsend(sock, data, sizeof(data))) break;
tb_trace_i(" === %.4llu (%llu bytes)", count, count * BUFSIZE);
}
return 0;
}

static tb_void_t tb_demo_coroutine_send(tb_cpointer_t priv)
{
tb_byte_t buf[BUFSIZE];
tb_byte_t data[BUFSIZE];
tb_socket_ref_t sock = (tb_socket_ref_t) priv;
for (tb_size_t i = 0; i < COUNT; i++)
{
if (!tb_socket_bsend(sock, buf, sizeof(buf))) break;
if (!tb_socket_bsend(sock, data, sizeof(data))) break;
tb_trace_i("[-->] %.4llu", i);
}
}

static tb_void_t tb_demo_coroutine_recv(tb_cpointer_t priv)
{
tb_byte_t buf[BUFSIZE];
tb_byte_t data[BUFSIZE];
tb_socket_ref_t sock = (tb_socket_ref_t) priv;
for (tb_size_t count = 0;; count++)
{
if (!tb_socket_brecv(sock, buf, sizeof(buf))) break;
if (!tb_socket_brecv(sock, data, sizeof(data))) break;
tb_trace_i("[<--] %.4llu", count);
}
}
Expand Down
1 change: 1 addition & 0 deletions src/demo/demo.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ static tb_demo_t g_demo[] =
, TB_DEMO_MAIN_ITEM(platform_thread)
, TB_DEMO_MAIN_ITEM(platform_thread_pool)
, TB_DEMO_MAIN_ITEM(platform_thread_local)
, TB_DEMO_MAIN_ITEM(platform_poller_pipe)
, TB_DEMO_MAIN_ITEM(platform_poller_client)
, TB_DEMO_MAIN_ITEM(platform_poller_server)
#ifdef TB_CONFIG_MODULE_HAVE_COROUTINE
Expand Down
1 change: 1 addition & 0 deletions src/demo/demo.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ TB_DEMO_MAIN_DECL(platform_environment);
TB_DEMO_MAIN_DECL(platform_thread);
TB_DEMO_MAIN_DECL(platform_thread_pool);
TB_DEMO_MAIN_DECL(platform_thread_local);
TB_DEMO_MAIN_DECL(platform_poller_pipe);
TB_DEMO_MAIN_DECL(platform_poller_client);
TB_DEMO_MAIN_DECL(platform_poller_server);
TB_DEMO_MAIN_DECL(platform_context);
Expand Down
162 changes: 162 additions & 0 deletions src/demo/platform/poller_pipe.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/* //////////////////////////////////////////////////////////////////////////////////////
* includes
*/
#include "../demo.h"

/* //////////////////////////////////////////////////////////////////////////////////////
* macros
*/

// port
#define TB_DEMO_PORT (9090)

/* //////////////////////////////////////////////////////////////////////////////////////
* types
*/

// the client type
typedef struct __tb_demo_client_t
{
// the pipe file
tb_pipe_file_ref_t pipe;

// the read size
tb_hize_t size;

// the read data
tb_byte_t data[8192];

// wait event
tb_bool_t wait;

}tb_demo_client_t, *tb_demo_client_ref_t;

/* //////////////////////////////////////////////////////////////////////////////////////
* implementation
*/
static tb_void_t tb_demo_session_exit(tb_demo_client_ref_t client, tb_poller_ref_t poller)
{
if (client)
{
// trace
tb_trace_d("[%p]: read %llu", client->pipe, client->size);

// remove pipe from poller
// tb_poller_remove(poller, client->pipe);

// exit pipe file
if (client->pipe) tb_pipe_file_exit(client->pipe);
client->pipe = tb_null;

// exit client
tb_free(client);
}
}
static tb_long_t tb_demo_session_read(tb_demo_client_ref_t client)
{
while (1)
{
tb_long_t real = tb_pipe_file_read(client->pipe, client->data, sizeof(client->data));
if (real > 0)
{
client->size += real;
client->wait = tb_false;
}
else if (!real && !client->wait)
{
client->wait = tb_true;
break;
}
else return -1;
}
return 0;
}
static tb_int_t tb_demo_session_writ(tb_cpointer_t priv)
{
tb_pipe_file_ref_t pipe = (tb_pipe_file_ref_t)priv;
tb_byte_t data[8192 * 16];
tb_size_t count = 100;
tb_usleep(50000);
while (count--)
{
tb_trace_d("[%p]: write %llu", pipe, sizeof(data));
if (!tb_pipe_file_bwrit(pipe, data, sizeof(data))) break;
}
return 0;
}
static tb_void_t tb_demo_session_start(tb_poller_ref_t poller, tb_pipe_file_ref_t pipe)
{
// trace
tb_trace_d("[%p]: recving ..", pipe);

// init client
tb_demo_client_ref_t client = tb_malloc0_type(tb_demo_client_t);
if (client)
{
client->pipe = pipe;
client->wait = tb_false;
if (!tb_demo_session_read(client))
{
tb_size_t events = TB_POLLER_EVENT_RECV;
if (tb_poller_support(poller, TB_POLLER_EVENT_CLEAR))
events |= TB_POLLER_EVENT_CLEAR;
//tb_poller_insert(poller, pipe, events, client);
}
else tb_demo_session_exit(client, poller);
}
}
static tb_void_t tb_demo_poller_open(tb_poller_ref_t poller)
{
tb_pipe_file_ref_t pair[2];
if (tb_pipe_file_init_pair(pair, 4096))
{
tb_demo_session_start(poller, pair[0]);
tb_thread_init(tb_null, tb_demo_session_writ, pair[1], 0);
}
}
static tb_void_t tb_demo_poller_event(tb_poller_ref_t poller, tb_socket_ref_t pipe, tb_size_t events, tb_cpointer_t priv)
{
switch (events)
{
case TB_POLLER_EVENT_RECV | TB_POLLER_EVENT_EOF:
case TB_POLLER_EVENT_RECV:
{
tb_demo_client_ref_t client = (tb_demo_client_ref_t)priv;
if (tb_demo_session_read(client) || (events & TB_POLLER_EVENT_EOF))
tb_demo_session_exit(client, poller);
}
break;
default:
break;
}
}

/* //////////////////////////////////////////////////////////////////////////////////////
* main
*/
tb_int_t tb_demo_platform_poller_pipe_main(tb_int_t argc, tb_char_t** argv)
{
// start file client
tb_poller_ref_t poller = tb_null;
do
{
// init poller
poller = tb_poller_init(tb_null);
tb_assert_and_check_break(poller);

// attach poller to the current thread
tb_poller_attach(poller);

// attempt to open clients
tb_demo_poller_open(poller);

// wait events
while (tb_poller_wait(poller, tb_demo_poller_event, -1) > 0) ;

} while (0);

// exit poller
if (poller) tb_poller_exit(poller);
poller = tb_null;
return 0;
}

0 comments on commit 8415397

Please sign in to comment.