gatserver: Suspend/resume GAtServer with GAtIO

Support g_at_server_suspend and g_at_server_resume operation by using
GAtIO to handle IO related function.
This commit is contained in:
Zhenhua Zhang 2010-06-12 13:50:40 +08:00 committed by Denis Kenzior
parent 02a54376c9
commit 047ea0cebb
2 changed files with 115 additions and 151 deletions

View File

@ -31,6 +31,7 @@
#include "ringbuffer.h"
#include "gatserver.h"
#include "gatio.h"
#define BUF_SIZE 4096
/* <cr><lf> + the max length of information text + <cr><lf> */
@ -100,16 +101,13 @@ struct at_command {
struct _GAtServer {
gint ref_count; /* Ref count */
struct v250_settings v250; /* V.250 command setting */
GIOChannel *channel; /* Server IO */
guint read_watch; /* GSource read id, 0 if none */
guint write_watch; /* GSource write id, 0 if none */
GAtIO *io; /* Server IO */
guint read_so_far; /* Number of bytes processed */
GAtDisconnectFunc user_disconnect; /* User disconnect func */
gpointer user_disconnect_data; /* User disconnect data */
GAtDebugFunc debugf; /* Debugging output function */
gpointer debug_data; /* Data to pass to debug func */
GHashTable *command_list; /* List of AT commands */
struct ring_buffer *read_buf; /* Current read buffer */
GQueue *write_queue; /* Write buffer queue */
guint max_read_attempts; /* Max reads per select */
enum ParserState parser_state;
@ -117,12 +115,13 @@ struct _GAtServer {
char *last_line; /* Last read line */
unsigned int cur_pos; /* Where we are on the line */
GAtServerResult last_result;
gboolean processing_cmdline;
gboolean suspended;
gboolean final_sent;
gboolean final_async;
gboolean in_read_handler;
};
static void g_at_server_wakeup_writer(GAtServer *server);
static void server_wakeup_writer(GAtServer *server);
static void server_parse_line(GAtServer *server);
static struct ring_buffer *allocate_next(GAtServer *server)
@ -162,7 +161,7 @@ static void send_common(GAtServer *server, const char *buf, unsigned int len)
write_buf = allocate_next(server);
}
g_at_server_wakeup_writer(server);
server_wakeup_writer(server);
}
static void send_result_common(GAtServer *server, const char *result)
@ -198,14 +197,14 @@ void g_at_server_send_final(GAtServer *server, GAtServerResult result)
server->final_sent = TRUE;
server->last_result = result;
if (result == G_AT_SERVER_RESULT_OK && server->processing_cmdline) {
if (result == G_AT_SERVER_RESULT_OK && server->suspended) {
if (server->final_async)
server_parse_line(server);
return;
}
server->processing_cmdline = FALSE;
g_at_server_resume(server);
if (server->v250.is_v1)
sprintf(buf, "%s", server_result_to_string(result));
@ -219,7 +218,7 @@ void g_at_server_send_ext_final(GAtServer *server, const char *result)
{
server->final_sent = TRUE;
server->last_result = G_AT_SERVER_RESULT_EXT_ERROR;
server->processing_cmdline = FALSE;
g_at_server_resume(server);
send_result_common(server, result);
}
@ -668,7 +667,7 @@ static void server_parse_line(GAtServer *server)
server->final_async = FALSE;
if (pos == 0)
server->processing_cmdline = TRUE;
g_at_server_suspend(server);
while (pos < len) {
unsigned int consumed;
@ -702,7 +701,7 @@ static void server_parse_line(GAtServer *server)
return;
}
server->processing_cmdline = FALSE;
g_at_server_resume(server);
g_at_server_send_final(server, G_AT_SERVER_RESULT_OK);
}
@ -780,11 +779,11 @@ out:
return res;
}
static char *extract_line(GAtServer *p)
static char *extract_line(GAtServer *p, struct ring_buffer *rbuf)
{
unsigned int wrap = ring_buffer_len_no_wrap(p->read_buf);
unsigned int wrap = ring_buffer_len_no_wrap(rbuf);
unsigned int pos = 0;
unsigned char *buf = ring_buffer_read_ptr(p->read_buf, pos);
unsigned char *buf = ring_buffer_read_ptr(rbuf, pos);
int strip_front = 0;
int line_length = 0;
gboolean in_string = FALSE;
@ -806,7 +805,7 @@ static char *extract_line(GAtServer *p)
pos += 1;
if (pos == wrap)
buf = ring_buffer_read_ptr(p->read_buf, pos);
buf = ring_buffer_read_ptr(rbuf, pos);
}
/* We will strip AT and S3 */
@ -814,17 +813,17 @@ static char *extract_line(GAtServer *p)
line = g_try_new(char, line_length + 1);
if (!line) {
ring_buffer_drain(p->read_buf, p->read_so_far);
ring_buffer_drain(rbuf, p->read_so_far);
return NULL;
}
/* Strip leading whitespace + AT */
ring_buffer_drain(p->read_buf, strip_front + 2);
ring_buffer_drain(rbuf, strip_front + 2);
pos = 0;
i = 0;
wrap = ring_buffer_len_no_wrap(p->read_buf);
buf = ring_buffer_read_ptr(p->read_buf, pos);
wrap = ring_buffer_len_no_wrap(rbuf);
buf = ring_buffer_read_ptr(rbuf, pos);
while (pos < (p->read_so_far - strip_front - 2)) {
if (*buf == '"')
@ -839,33 +838,39 @@ static char *extract_line(GAtServer *p)
pos += 1;
if (pos == wrap)
buf = ring_buffer_read_ptr(p->read_buf, pos);
buf = ring_buffer_read_ptr(rbuf, pos);
}
/* Strip S3 */
ring_buffer_drain(p->read_buf, p->read_so_far - strip_front - 2);
ring_buffer_drain(rbuf, p->read_so_far - strip_front - 2);
line[i] = '\0';
return line;
}
static void new_bytes(GAtServer *p)
static void new_bytes(struct ring_buffer *rbuf, gpointer user_data)
{
unsigned int len = ring_buffer_len(p->read_buf);
unsigned int wrap = ring_buffer_len_no_wrap(p->read_buf);
unsigned char *buf = ring_buffer_read_ptr(p->read_buf, p->read_so_far);
GAtServer *p = user_data;
unsigned int len = ring_buffer_len(rbuf);
unsigned int wrap = ring_buffer_len_no_wrap(rbuf);
unsigned char *buf = ring_buffer_read_ptr(rbuf, p->read_so_far);
enum ParserResult result;
while (p->channel && (p->read_so_far < len)) {
p->in_read_handler = TRUE;
while (p->io && (p->read_so_far < len)) {
gsize rbytes = MIN(len - p->read_so_far, wrap - p->read_so_far);
result = server_feed(p, (char *)buf, &rbytes);
if (p->v250.echo)
send_common(p, (char *)buf, rbytes);
buf += rbytes;
p->read_so_far += rbytes;
if (p->read_so_far == wrap) {
buf = ring_buffer_read_ptr(p->read_buf, p->read_so_far);
buf = ring_buffer_read_ptr(rbuf, p->read_so_far);
wrap = len;
}
@ -879,14 +884,14 @@ static void new_bytes(GAtServer *p)
* Empty commands must be OK by the DCE
*/
g_at_server_send_final(p, G_AT_SERVER_RESULT_OK);
ring_buffer_drain(p->read_buf, p->read_so_far);
ring_buffer_drain(rbuf, p->read_so_far);
break;
case PARSER_RESULT_COMMAND:
{
g_free(p->last_line);
p->last_line = extract_line(p);
p->last_line = extract_line(p, rbuf);
p->cur_pos = 0;
if (p->last_line)
@ -905,11 +910,11 @@ static void new_bytes(GAtServer *p)
else
g_at_server_send_final(p,
G_AT_SERVER_RESULT_OK);
ring_buffer_drain(p->read_buf, p->read_so_far);
ring_buffer_drain(rbuf, p->read_so_far);
break;
default:
ring_buffer_drain(p->read_buf, p->read_so_far);
ring_buffer_drain(rbuf, p->read_so_far);
break;
}
@ -918,72 +923,15 @@ static void new_bytes(GAtServer *p)
p->read_so_far = 0;
}
/* We're overflowing the buffer, shutdown the socket */
if (p->read_buf && ring_buffer_avail(p->read_buf) == 0)
g_source_remove(p->read_watch);
p->in_read_handler = FALSE;
if (p->destroyed)
g_free(p);
}
static gboolean received_data(GIOChannel *channel, GIOCondition cond,
gpointer data)
{
unsigned char *buf;
GAtServer *server = data;
GIOError err;
gsize rbytes;
gsize toread;
guint total_read = 0;
guint read_count = 0;
if (cond & G_IO_NVAL)
return FALSE;
do {
toread = ring_buffer_avail_no_wrap(server->read_buf);
if (toread == 0)
break;
rbytes = 0;
buf = ring_buffer_write_ptr(server->read_buf, 0);
err = g_io_channel_read(channel, (char *) buf, toread, &rbytes);
g_at_util_debug_chat(TRUE, (char *)buf, rbytes,
server->debugf, server->debug_data);
read_count++;
if (rbytes == 0)
break;
if (server->v250.echo)
send_common(server, (char *)buf, rbytes);
/* Ignore incoming bytes when processing a command line */
if (server->processing_cmdline)
continue;
total_read += rbytes;
ring_buffer_write_advance(server->read_buf, rbytes);
} while (err == G_IO_ERROR_NONE &&
read_count < server->max_read_attempts);
if (total_read > 0)
new_bytes(server);
if (cond & (G_IO_HUP | G_IO_ERR))
return FALSE;
if (read_count > 0 && rbytes == 0 && err != G_IO_ERROR_AGAIN)
return FALSE;
return TRUE;
}
static gboolean can_write_data(GIOChannel *channel, GIOCondition cond,
gpointer data)
static gboolean can_write_data(gpointer data)
{
GAtServer *server = data;
GIOError err;
gsize bytes_written;
gsize towrite;
struct ring_buffer *write_buf;
@ -992,9 +940,6 @@ static gboolean can_write_data(GIOChannel *channel, GIOCondition cond,
int limiter;
#endif
if (cond & (G_IO_NVAL | G_IO_HUP | G_IO_ERR))
return FALSE;
if (!server->write_queue)
return FALSE;
@ -1012,22 +957,17 @@ static gboolean can_write_data(GIOChannel *channel, GIOCondition cond,
limiter = 5;
#endif
err = g_io_channel_write(server->channel,
bytes_written = g_at_io_write(server->io,
(char *)buf,
#ifdef WRITE_SCHEDULER_DEBUG
limiter,
limiter
#else
towrite,
towrite
#endif
&bytes_written);
);
if (err != G_IO_ERROR_NONE) {
g_source_remove(server->read_watch);
if (bytes_written == 0)
return FALSE;
}
g_at_util_debug_chat(FALSE, (char *)buf, bytes_written, server->debugf,
server->debug_data);
ring_buffer_drain(write_buf, bytes_written);
@ -1059,10 +999,6 @@ static void write_queue_free(GQueue *write_queue)
static void g_at_server_cleanup(GAtServer *server)
{
/* Cleanup all received data */
ring_buffer_free(server->read_buf);
server->read_buf = NULL;
/* Cleanup pending data to write */
write_queue_free(server->write_queue);
@ -1071,15 +1007,15 @@ static void g_at_server_cleanup(GAtServer *server)
g_free(server->last_line);
server->channel = NULL;
g_at_io_unref(server->io);
server->io = NULL;
}
static void read_watcher_destroy_notify(gpointer user_data)
static void io_disconnect(gpointer user_data)
{
GAtServer *server = user_data;
g_at_server_cleanup(server);
server->read_watch = 0;
if (server->user_disconnect)
server->user_disconnect(server->user_disconnect_data);
@ -1088,23 +1024,9 @@ static void read_watcher_destroy_notify(gpointer user_data)
g_free(server);
}
static void write_watcher_destroy_notify(gpointer user_data)
static void server_wakeup_writer(GAtServer *server)
{
GAtServer *server = user_data;
server->write_watch = 0;
}
static void g_at_server_wakeup_writer(GAtServer *server)
{
if (server->write_watch != 0)
return;
server->write_watch = g_io_add_watch_full(server->channel,
G_PRIORITY_DEFAULT,
G_IO_OUT | G_IO_HUP | G_IO_ERR | G_IO_NVAL,
can_write_data, server,
write_watcher_destroy_notify);
g_at_io_set_write_handler(server->io, can_write_data, server);
}
static void v250_settings_create(struct v250_settings *v250)
@ -1156,13 +1078,15 @@ GAtServer *g_at_server_new(GIOChannel *io)
server->ref_count = 1;
v250_settings_create(&server->v250);
server->channel = io;
server->io = g_at_io_new(io);
if (!server->io)
goto error;
g_at_io_set_disconnect_function(server->io, io_disconnect, server);
server->command_list = g_hash_table_new_full(g_str_hash, g_str_equal,
g_free,
at_notify_node_destroy);
server->read_buf = ring_buffer_new(BUF_SIZE);
if (!server->read_buf)
goto error;
server->write_queue = g_queue_new();
if (!server->write_queue)
@ -1173,25 +1097,18 @@ GAtServer *g_at_server_new(GIOChannel *io)
server->max_read_attempts = 3;
if (!g_at_util_setup_io(server->channel, G_IO_FLAG_NONBLOCK))
goto error;
server->read_watch = g_io_add_watch_full(io, G_PRIORITY_DEFAULT,
G_IO_IN | G_IO_HUP | G_IO_ERR | G_IO_NVAL,
received_data, server,
read_watcher_destroy_notify);
g_at_io_set_read_handler(server->io, new_bytes, server);
basic_command_register(server);
return server;
error:
g_at_io_unref(server->io);
if (server->command_list)
g_hash_table_destroy(server->command_list);
if (server->read_buf)
ring_buffer_free(server->read_buf);
if (server->write_queue)
write_queue_free(server->write_queue);
@ -1201,6 +1118,22 @@ error:
return NULL;
}
GIOChannel *g_at_server_get_channel(GAtServer *server)
{
if (server == NULL || server->io == NULL)
return NULL;
return g_at_io_get_channel(server->io);
}
GAtIO *g_at_server_get_io(GAtServer *server)
{
if (server == NULL)
return NULL;
return server->io;
}
GAtServer *g_at_server_ref(GAtServer *server)
{
if (server == NULL)
@ -1211,6 +1144,33 @@ GAtServer *g_at_server_ref(GAtServer *server)
return server;
}
void g_at_server_suspend(GAtServer *server)
{
if (server == NULL)
return;
server->suspended = TRUE;
g_at_io_set_write_handler(server->io, NULL, NULL);
g_at_io_set_read_handler(server->io, NULL, NULL);
g_at_io_set_debug(server->io, NULL, NULL);
}
void g_at_server_resume(GAtServer *server)
{
if (server == NULL)
return;
server->suspended = FALSE;
g_at_io_set_debug(server->io, server->debugf, server->debug_data);
g_at_io_set_read_handler(server->io, new_bytes, server);
if (g_queue_get_length(server->write_queue) > 0)
server_wakeup_writer(server);
}
void g_at_server_unref(GAtServer *server)
{
gboolean is_zero;
@ -1223,6 +1183,11 @@ void g_at_server_unref(GAtServer *server)
if (is_zero == FALSE)
return;
if (server->io) {
g_at_server_suspend(server);
g_at_server_cleanup(server);
}
g_at_server_shutdown(server);
/* glib delays the destruction of the watcher until it exits, this
@ -1230,7 +1195,7 @@ void g_at_server_unref(GAtServer *server)
* destroyed already. We have to wait until the read_watcher
* destroy function gets called
*/
if (server->read_watch != 0)
if (server->in_read_handler)
server->destroyed = TRUE;
else
g_free(server);
@ -1245,12 +1210,6 @@ gboolean g_at_server_shutdown(GAtServer *server)
server->user_disconnect = NULL;
server->user_disconnect_data = NULL;
if (server->write_watch)
g_source_remove(server->write_watch);
if (server->read_watch)
g_source_remove(server->read_watch);
return TRUE;
}

View File

@ -28,6 +28,7 @@ extern "C" {
#include "gatresult.h"
#include "gatutil.h"
#include "gatio.h"
struct _GAtServer;
@ -68,8 +69,12 @@ typedef void (*GAtServerNotifyFunc)(GAtServerRequestType type,
GAtResult *result, gpointer user_data);
GAtServer *g_at_server_new(GIOChannel *io);
GIOChannel *g_at_server_get_channel(GAtServer *server);
GAtIO *g_at_server_get_io(GAtServer *server);
GAtServer *g_at_server_ref(GAtServer *server);
void g_at_server_suspend(GAtServer *server);
void g_at_server_resume(GAtServer *server);
void g_at_server_unref(GAtServer *server);
gboolean g_at_server_shutdown(GAtServer *server);