chardev: fix race with client connections in tcp_chr_wait_connected
When the 'reconnect' option is given for a client connection, the qmp_chardev_open_socket_client method will run an asynchronous connection attempt. The QIOChannel socket executes this is a single use background thread, so the connection will succeed immediately (assuming the server is listening). The chardev, however, won't get the result from this background thread until the main loop starts running and processes idle callbacks. Thus when tcp_chr_wait_connected is run s->ioc will be NULL, but the state will be TCP_CHARDEV_STATE_CONNECTING, and there may already be an established connection that will be associated with the chardev by the pending idle callback. tcp_chr_wait_connected doesn't check the state, only s->ioc, so attempts to establish another connection synchronously. If the server allows multiple connections this is unhelpful but not a fatal problem as the duplicate connection will get ignored by the tcp_chr_new_client method when it sees the state is already connected. If the server only supports a single connection, however, the tcp_chr_wait_connected method will hang forever because the server will not accept its synchronous connection attempt until the first connection is closed. To deal with this tcp_chr_wait_connected needs to synchronize with the completion of the background connection task. To do this it needs to create the QIOTask directly and use the qio_task_wait_thread method. This will cancel the pending idle callback and directly dispatch the task completion callback, allowing the connection to be associated with the chardev. If the background connection failed, it can still attempt a new synchronous connection. Signed-off-by: Daniel P. Berrangé <berrange@redhat.com> Message-Id: <20190211182442.8542-15-berrange@redhat.com> Signed-off-by: Marc-André Lureau <marcandre.lureau@redhat.com>
This commit is contained in:
parent
d1885e549d
commit
4b47373a0d
@ -80,6 +80,8 @@ typedef struct {
|
|||||||
GSource *reconnect_timer;
|
GSource *reconnect_timer;
|
||||||
int64_t reconnect_time;
|
int64_t reconnect_time;
|
||||||
bool connect_err_reported;
|
bool connect_err_reported;
|
||||||
|
|
||||||
|
QIOTask *connect_task;
|
||||||
} SocketChardev;
|
} SocketChardev;
|
||||||
|
|
||||||
#define SOCKET_CHARDEV(obj) \
|
#define SOCKET_CHARDEV(obj) \
|
||||||
@ -118,6 +120,7 @@ static void qemu_chr_socket_restart_timer(Chardev *chr)
|
|||||||
char *name;
|
char *name;
|
||||||
|
|
||||||
assert(s->state == TCP_CHARDEV_STATE_DISCONNECTED);
|
assert(s->state == TCP_CHARDEV_STATE_DISCONNECTED);
|
||||||
|
assert(!s->reconnect_timer);
|
||||||
name = g_strdup_printf("chardev-socket-reconnect-%s", chr->label);
|
name = g_strdup_printf("chardev-socket-reconnect-%s", chr->label);
|
||||||
s->reconnect_timer = qemu_chr_timeout_add_ms(chr,
|
s->reconnect_timer = qemu_chr_timeout_add_ms(chr,
|
||||||
s->reconnect_time * 1000,
|
s->reconnect_time * 1000,
|
||||||
@ -965,7 +968,56 @@ static int tcp_chr_wait_connected(Chardev *chr, Error **errp)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
while (!s->ioc) {
|
tcp_chr_reconn_timer_cancel(s);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We expect states to be as follows:
|
||||||
|
*
|
||||||
|
* - server
|
||||||
|
* - wait -> CONNECTED
|
||||||
|
* - nowait -> DISCONNECTED
|
||||||
|
* - client
|
||||||
|
* - reconnect == 0 -> CONNECTED
|
||||||
|
* - reconnect != 0 -> CONNECTING
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
if (s->state == TCP_CHARDEV_STATE_CONNECTING) {
|
||||||
|
if (!s->connect_task) {
|
||||||
|
error_setg(errp,
|
||||||
|
"Unexpected 'connecting' state without connect task "
|
||||||
|
"while waiting for connection completion");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
* tcp_chr_wait_connected should only ever be run from the
|
||||||
|
* main loop thread associated with chr->gcontext, otherwise
|
||||||
|
* qio_task_wait_thread has a dangerous race condition with
|
||||||
|
* free'ing of the s->connect_task object.
|
||||||
|
*
|
||||||
|
* Acquiring the main context doesn't 100% prove we're in
|
||||||
|
* the main loop thread, but it does at least guarantee
|
||||||
|
* that the main loop won't be executed by another thread
|
||||||
|
* avoiding the race condition with the task idle callback.
|
||||||
|
*/
|
||||||
|
g_main_context_acquire(chr->gcontext);
|
||||||
|
qio_task_wait_thread(s->connect_task);
|
||||||
|
g_main_context_release(chr->gcontext);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The completion callback (qemu_chr_socket_connected) for
|
||||||
|
* s->connect_task should have set this to NULL by the time
|
||||||
|
* qio_task_wait_thread has returned.
|
||||||
|
*/
|
||||||
|
assert(!s->connect_task);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* NB we are *not* guaranteed to have "s->state == ..CONNECTED"
|
||||||
|
* at this point as this first connect may be failed, so
|
||||||
|
* allow the next loop to run regardless.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
||||||
|
while (s->state != TCP_CHARDEV_STATE_CONNECTED) {
|
||||||
if (s->is_listen) {
|
if (s->is_listen) {
|
||||||
tcp_chr_accept_server_sync(chr);
|
tcp_chr_accept_server_sync(chr);
|
||||||
} else {
|
} else {
|
||||||
@ -1014,6 +1066,8 @@ static void qemu_chr_socket_connected(QIOTask *task, void *opaque)
|
|||||||
SocketChardev *s = SOCKET_CHARDEV(chr);
|
SocketChardev *s = SOCKET_CHARDEV(chr);
|
||||||
Error *err = NULL;
|
Error *err = NULL;
|
||||||
|
|
||||||
|
s->connect_task = NULL;
|
||||||
|
|
||||||
if (qio_task_propagate_error(task, &err)) {
|
if (qio_task_propagate_error(task, &err)) {
|
||||||
tcp_chr_change_state(s, TCP_CHARDEV_STATE_DISCONNECTED);
|
tcp_chr_change_state(s, TCP_CHARDEV_STATE_DISCONNECTED);
|
||||||
check_report_connect_error(chr, err);
|
check_report_connect_error(chr, err);
|
||||||
@ -1028,6 +1082,20 @@ cleanup:
|
|||||||
object_unref(OBJECT(sioc));
|
object_unref(OBJECT(sioc));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void tcp_chr_connect_client_task(QIOTask *task,
|
||||||
|
gpointer opaque)
|
||||||
|
{
|
||||||
|
QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
|
||||||
|
SocketAddress *addr = opaque;
|
||||||
|
Error *err = NULL;
|
||||||
|
|
||||||
|
qio_channel_socket_connect_sync(ioc, addr, &err);
|
||||||
|
|
||||||
|
qio_task_set_error(task, err);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void tcp_chr_connect_client_async(Chardev *chr)
|
static void tcp_chr_connect_client_async(Chardev *chr)
|
||||||
{
|
{
|
||||||
SocketChardev *s = SOCKET_CHARDEV(chr);
|
SocketChardev *s = SOCKET_CHARDEV(chr);
|
||||||
@ -1036,9 +1104,23 @@ static void tcp_chr_connect_client_async(Chardev *chr)
|
|||||||
tcp_chr_change_state(s, TCP_CHARDEV_STATE_CONNECTING);
|
tcp_chr_change_state(s, TCP_CHARDEV_STATE_CONNECTING);
|
||||||
sioc = qio_channel_socket_new();
|
sioc = qio_channel_socket_new();
|
||||||
tcp_chr_set_client_ioc_name(chr, sioc);
|
tcp_chr_set_client_ioc_name(chr, sioc);
|
||||||
qio_channel_socket_connect_async(sioc, s->addr,
|
/*
|
||||||
qemu_chr_socket_connected,
|
* Normally code would use the qio_channel_socket_connect_async
|
||||||
chr, NULL, chr->gcontext);
|
* method which uses a QIOTask + qio_task_set_error internally
|
||||||
|
* to avoid blocking. The tcp_chr_wait_connected method, however,
|
||||||
|
* needs a way to synchronize with completion of the background
|
||||||
|
* connect task which can't be done with the QIOChannelSocket
|
||||||
|
* async APIs. Thus we must use QIOTask directly to implement
|
||||||
|
* the non-blocking concept locally.
|
||||||
|
*/
|
||||||
|
s->connect_task = qio_task_new(OBJECT(sioc),
|
||||||
|
qemu_chr_socket_connected,
|
||||||
|
chr, NULL);
|
||||||
|
qio_task_run_in_thread(s->connect_task,
|
||||||
|
tcp_chr_connect_client_task,
|
||||||
|
s->addr,
|
||||||
|
NULL,
|
||||||
|
chr->gcontext);
|
||||||
}
|
}
|
||||||
|
|
||||||
static gboolean socket_reconnect_timeout(gpointer opaque)
|
static gboolean socket_reconnect_timeout(gpointer opaque)
|
||||||
|
Loading…
Reference in New Issue
Block a user