work on main loop changes

This commit is contained in:
Jay Sorg 2015-07-05 23:14:46 -07:00
parent f8432d0bb7
commit b56aa9832e
7 changed files with 142 additions and 29 deletions

View File

@ -40,13 +40,17 @@ struct stream
char *end; char *end;
char *data; char *data;
int size; int size;
int pad0;
/* offsets of various headers */ /* offsets of various headers */
char *iso_hdr; char *iso_hdr;
char *mcs_hdr; char *mcs_hdr;
char *sec_hdr; char *sec_hdr;
char *rdp_hdr; char *rdp_hdr;
char *channel_hdr; char *channel_hdr;
/* other */
char *next_packet; char *next_packet;
struct stream *next;
int *source;
}; };
/******************************************************************************/ /******************************************************************************/

View File

@ -24,6 +24,8 @@
#include "parse.h" #include "parse.h"
#include "ssl_calls.h" #include "ssl_calls.h"
#define MAX_SBYTES 0
/*****************************************************************************/ /*****************************************************************************/
int APP_CC int APP_CC
trans_tls_recv(struct trans *self, void *ptr, int len) trans_tls_recv(struct trans *self, void *ptr, int len)
@ -171,10 +173,26 @@ int APP_CC
trans_get_wait_objs_rw(struct trans *self, tbus *robjs, int *rcount, trans_get_wait_objs_rw(struct trans *self, tbus *robjs, int *rcount,
tbus *wobjs, int *wcount) tbus *wobjs, int *wcount)
{ {
if (self == 0)
{
return 1;
}
if (self->status != TRANS_STATUS_UP)
{
return 1;
}
if ((self->si != 0) && (self->si->source[self->my_source] > MAX_SBYTES))
{
}
else
{
if (trans_get_wait_objs(self, robjs, rcount) != 0) if (trans_get_wait_objs(self, robjs, rcount) != 0)
{ {
return 1; return 1;
} }
}
if (self->wait_s != 0) if (self->wait_s != 0)
{ {
@ -187,7 +205,7 @@ trans_get_wait_objs_rw(struct trans *self, tbus *robjs, int *rcount,
/*****************************************************************************/ /*****************************************************************************/
int APP_CC int APP_CC
send_waiting(struct trans *self, int block) trans_send_waiting(struct trans *self, int block)
{ {
struct stream *temp_s; struct stream *temp_s;
int bytes; int bytes;
@ -209,9 +227,13 @@ send_waiting(struct trans *self, int block)
if (sent > 0) if (sent > 0)
{ {
temp_s->p += sent; temp_s->p += sent;
if (temp_s->source != 0)
{
temp_s->source[0] -= sent;
}
if (temp_s->p >= temp_s->end) if (temp_s->p >= temp_s->end)
{ {
self->wait_s = (struct stream *) (temp_s->next_packet); self->wait_s = temp_s->next;
free_stream(temp_s); free_stream(temp_s);
} }
} }
@ -247,6 +269,7 @@ trans_check_wait_objs(struct trans *self)
int to_read = 0; int to_read = 0;
int read_so_far = 0; int read_so_far = 0;
int rv = 0; int rv = 0;
int cur_source;
if (self == 0) if (self == 0)
{ {
@ -310,8 +333,17 @@ trans_check_wait_objs(struct trans *self)
} }
else /* connected server or client (2 or 3) */ else /* connected server or client (2 or 3) */
{ {
if (self->trans_can_recv(self, self->sck, 0)) if (self->si != 0 && self->si->source[self->my_source] > MAX_SBYTES)
{ {
}
else if (g_tcp_can_recv(self->sck, 0))
{
cur_source = 0;
if (self->si != 0)
{
cur_source = self->si->cur_source;
self->si->cur_source = self->my_source;
}
read_so_far = (int) (self->in_s->end - self->in_s->data); read_so_far = (int) (self->in_s->end - self->in_s->data);
to_read = self->header_size - read_so_far; to_read = self->header_size - read_so_far;
@ -329,6 +361,10 @@ trans_check_wait_objs(struct trans *self)
{ {
/* error */ /* error */
self->status = TRANS_STATUS_DOWN; self->status = TRANS_STATUS_DOWN;
if (self->si != 0)
{
self->si->cur_source = cur_source;
}
return 1; return 1;
} }
} }
@ -336,6 +372,10 @@ trans_check_wait_objs(struct trans *self)
{ {
/* error */ /* error */
self->status = TRANS_STATUS_DOWN; self->status = TRANS_STATUS_DOWN;
if (self->si != 0)
{
self->si->cur_source = cur_source;
}
return 1; return 1;
} }
else else
@ -357,8 +397,12 @@ trans_check_wait_objs(struct trans *self)
} }
} }
} }
if (self->si != 0)
{
self->si->cur_source = cur_source;
} }
if (send_waiting(self, 0) != 0) }
if (trans_send_waiting(self, 0) != 0)
{ {
/* error */ /* error */
self->status = TRANS_STATUS_DOWN; self->status = TRANS_STATUS_DOWN;
@ -453,7 +497,7 @@ trans_force_write_s(struct trans *self, struct stream *out_s)
size = (int) (out_s->end - out_s->data); size = (int) (out_s->end - out_s->data);
total = 0; total = 0;
if (send_waiting(self, 1) != 0) if (trans_send_waiting(self, 1) != 0)
{ {
self->status = TRANS_STATUS_DOWN; self->status = TRANS_STATUS_DOWN;
return 1; return 1;
@ -512,23 +556,68 @@ trans_force_write(struct trans *self)
/*****************************************************************************/ /*****************************************************************************/
int APP_CC int APP_CC
trans_write_copy(struct trans *self) trans_write_copy_s(struct trans *self, struct stream *out_s)
{ {
int size; int size;
struct stream *out_s; int sent;
struct stream *wait_s; struct stream *wait_s;
struct stream *temp_s; struct stream *temp_s;
char *out_data;
if (self->status != TRANS_STATUS_UP) if (self->status != TRANS_STATUS_UP)
{ {
return 1; return 1;
} }
/* try to send any left over */
out_s = self->out_s; if (trans_send_waiting(self, 0) != 0)
{
/* error */
self->status = TRANS_STATUS_DOWN;
return 1;
}
out_data = out_s->data;
sent = 0;
size = (int) (out_s->end - out_s->data); size = (int) (out_s->end - out_s->data);
if (self->wait_s == 0)
{
/* if no left over, try to send this new data */
if (g_tcp_can_send(self->sck, 0))
{
sent = self->trans_send(self, out_s->data, size);
if (sent > 0)
{
out_data += sent;
size -= sent;
}
else if (sent == 0)
{
return 1;
}
else
{
if (!g_tcp_last_error_would_block(self->sck))
{
return 1;
}
}
}
}
if (size < 1)
{
return 0;
}
/* did not send right away, have to copy */
make_stream(wait_s); make_stream(wait_s);
init_stream(wait_s, size); init_stream(wait_s, size);
out_uint8a(wait_s, out_s->data, size); if (self->si != 0)
{
if (self->si->cur_source != 0)
{
self->si->source[self->si->cur_source] += size;
wait_s->source = self->si->source + self->si->cur_source;
}
}
out_uint8a(wait_s, out_data, size);
s_mark_end(wait_s); s_mark_end(wait_s);
wait_s->p = wait_s->data; wait_s->p = wait_s->data;
if (self->wait_s == 0) if (self->wait_s == 0)
@ -538,24 +627,22 @@ trans_write_copy(struct trans *self)
else else
{ {
temp_s = self->wait_s; temp_s = self->wait_s;
while (temp_s->next_packet != 0) while (temp_s->next != 0)
{ {
temp_s = (struct stream *) (temp_s->next_packet); temp_s = temp_s->next;
} }
temp_s->next_packet = (char *) wait_s; temp_s->next = wait_s;
} }
/* try to send */
if (send_waiting(self, 0) != 0)
{
/* error */
self->status = TRANS_STATUS_DOWN;
return 1;
}
return 0; return 0;
} }
/*****************************************************************************/
int APP_CC
trans_write_copy(struct trans* self)
{
return trans_write_copy_s(self, self->out_s);
}
/*****************************************************************************/ /*****************************************************************************/
int APP_CC int APP_CC
trans_connect(struct trans *self, const char *server, const char *port, trans_connect(struct trans *self, const char *server, const char *port,

View File

@ -45,6 +45,20 @@ typedef int (APP_CC *trans_recv_proc) (struct trans *self, void *ptr, int len);
typedef int (APP_CC *trans_send_proc) (struct trans *self, const void *data, int len); typedef int (APP_CC *trans_send_proc) (struct trans *self, const void *data, int len);
typedef int (APP_CC *trans_can_recv_proc) (struct trans *self, int sck, int millis); typedef int (APP_CC *trans_can_recv_proc) (struct trans *self, int sck, int millis);
/* optional source info */
#define XRDP_SOURCE_NONE 0
#define XRDP_SOURCE_CLIENT 1
#define XRDP_SOURCE_SESMAN 2
#define XRDP_SOURCE_CHANSRV 3
#define XRDP_SOURCE_MOD 4
struct source_info
{
int cur_source;
int source[7];
};
struct trans struct trans
{ {
tbus sck; /* socket handle */ tbus sck; /* socket handle */
@ -68,6 +82,8 @@ struct trans
trans_recv_proc trans_recv; trans_recv_proc trans_recv;
trans_send_proc trans_send; trans_send_proc trans_send;
trans_can_recv_proc trans_can_recv; trans_can_recv_proc trans_can_recv;
struct source_info *si;
int my_source;
}; };
struct trans* APP_CC struct trans* APP_CC
@ -93,6 +109,8 @@ trans_force_write(struct trans* self);
int APP_CC int APP_CC
trans_write_copy(struct trans* self); trans_write_copy(struct trans* self);
int APP_CC int APP_CC
trans_write_copy_s(struct trans* self, struct stream* out_s);
int APP_CC
trans_connect(struct trans* self, const char* server, const char* port, trans_connect(struct trans* self, const char* server, const char* port,
int timeout); int timeout);
int APP_CC int APP_CC

View File

@ -72,6 +72,8 @@ struct xrdp_session
int up_and_running; int up_and_running;
int (*is_term)(void); int (*is_term)(void);
int in_process_data; /* inc / dec libxrdp_process_data calls */ int in_process_data; /* inc / dec libxrdp_process_data calls */
struct source_info si;
}; };
struct xrdp_session * DEFAULT_CC struct xrdp_session * DEFAULT_CC

View File

@ -137,7 +137,7 @@ xrdp_fastpath_session_callback(struct xrdp_fastpath *self, int msg,
int APP_CC int APP_CC
xrdp_fastpath_send(struct xrdp_fastpath *self, struct stream *s) xrdp_fastpath_send(struct xrdp_fastpath *self, struct stream *s)
{ {
if (trans_force_write_s(self->trans, s) != 0) if (trans_write_copy_s(self->trans, s) != 0)
{ {
return 1; return 1;
} }

View File

@ -286,7 +286,7 @@ xrdp_iso_send_cc(struct xrdp_iso *self)
len_ptr[1] = len; len_ptr[1] = len;
len_indicator_ptr[0] = len_indicator; len_indicator_ptr[0] = len_indicator;
if (trans_force_write_s(self->trans, s) != 0) if (trans_write_copy_s(self->trans, s) != 0)
{ {
free_stream(s); free_stream(s);
return 1; return 1;
@ -409,7 +409,7 @@ xrdp_iso_send(struct xrdp_iso *self, struct stream *s)
out_uint8(s, ISO_PDU_DT); out_uint8(s, ISO_PDU_DT);
out_uint8(s, 0x80); out_uint8(s, 0x80);
if (trans_force_write_s(self->trans, s) != 0) if (trans_write_copy_s(self->trans, s) != 0)
{ {
return 1; return 1;
} }

View File

@ -195,6 +195,8 @@ xrdp_process_main_loop(struct xrdp_process *self)
self->server_trans->callback_data = self; self->server_trans->callback_data = self;
init_stream(self->server_trans->in_s, 8192 * 4); init_stream(self->server_trans->in_s, 8192 * 4);
self->session = libxrdp_init((tbus)self, self->server_trans); self->session = libxrdp_init((tbus)self, self->server_trans);
self->server_trans->si = &(self->session->si);
self->server_trans->my_source = XRDP_SOURCE_CLIENT;
/* this callback function is in xrdp_wm.c */ /* this callback function is in xrdp_wm.c */
self->session->callback = callback; self->session->callback = callback;
/* this function is just above */ /* this function is just above */
@ -217,8 +219,8 @@ xrdp_process_main_loop(struct xrdp_process *self)
robjs[robjs_count++] = self->self_term_event; robjs[robjs_count++] = self->self_term_event;
xrdp_wm_get_wait_objs(self->wm, robjs, &robjs_count, xrdp_wm_get_wait_objs(self->wm, robjs, &robjs_count,
wobjs, &wobjs_count, &timeout); wobjs, &wobjs_count, &timeout);
trans_get_wait_objs(self->server_trans, robjs, &robjs_count); trans_get_wait_objs_rw(self->server_trans, robjs, &robjs_count,
wobjs, &wobjs_count);
/* wait */ /* wait */
if (g_obj_wait(robjs, robjs_count, wobjs, wobjs_count, timeout) != 0) if (g_obj_wait(robjs, robjs_count, wobjs, wobjs_count, timeout) != 0)
{ {