make fetchers scheduled

This commit is contained in:
Vincent Sanders 2014-06-26 19:04:14 +01:00
parent 4b2101ba6a
commit 1b7aa7ffe5
4 changed files with 170 additions and 90 deletions

View File

@ -37,9 +37,11 @@
#include <strings.h>
#include <time.h>
#include <libwapcaplet/libwapcaplet.h>
#include <curl/curl.h>
#include "utils/config.h"
#include "desktop/netsurf.h"
#include "desktop/gui_factory.h"
#include "utils/corestrings.h"
#include "utils/nsoption.h"
#include "utils/log.h"
@ -63,7 +65,11 @@
/** The maximum number of fetchers that can be added */
#define MAX_FETCHERS 8
bool fetch_active; /**< Fetches in progress, please call fetch_poll(). */
#ifdef DEBUG_FETCH_VERBOSE
#define FETCH_LOG(x) LOG(x)
#else
#define FETCH_LOG(x)
#endif
/**
* Information about a fetcher for a given scheme.
@ -129,7 +135,7 @@ static int get_fetcher_for_scheme(lwc_string *scheme)
scheme, &match) == lwc_error_ok) &&
(match == true)) {
return fetcherd;
}
}
}
return -1;
}
@ -140,10 +146,9 @@ static int get_fetcher_for_scheme(lwc_string *scheme)
static bool fetch_dispatch_job(struct fetch *fetch)
{
RING_REMOVE(queue_ring, fetch);
#ifdef DEBUG_FETCH_VERBOSE
LOG(("Attempting to start fetch %p, fetcher %p, url %s", fetch,
FETCH_LOG(("Attempting to start fetch %p, fetcher %p, url %s", fetch,
fetch->fetcher_handle, nsurl_access(fetch->url)));
#endif
if (!fetchers[fetch->fetcherd].ops.start(fetch->fetcher_handle)) {
RING_INSERT(queue_ring, fetch); /* Put it back on the end of the queue */
return false;
@ -191,56 +196,76 @@ static bool fetch_choose_and_dispatch(void)
return false;
}
/**
* Dispatch as many jobs as we have room to dispatch.
*/
static void fetch_dispatch_jobs(void)
static void dump_rings(void)
{
int all_active, all_queued;
#ifdef DEBUG_FETCH_VERBOSE
struct fetch *q;
struct fetch *f;
#endif
if (!queue_ring)
return; /* Nothing to do, the queue is empty */
RING_GETSIZE(struct fetch, queue_ring, all_queued);
RING_GETSIZE(struct fetch, fetch_ring, all_active);
#ifdef DEBUG_FETCH_VERBOSE
LOG(("queue_ring %i, fetch_ring %i", all_queued, all_active));
q = queue_ring;
if (q) {
do {
LOG(("queue_ring: %s", q->url));
LOG(("queue_ring: %s", nsurl_access(q->url)));
q = q->r_next;
} while (q != queue_ring);
}
f = fetch_ring;
if (f) {
do {
LOG(("fetch_ring: %s", f->url));
LOG(("fetch_ring: %s", nsurl_access(f->url)));
f = f->r_next;
} while (f != fetch_ring);
}
#endif
}
while ( all_queued && all_active < nsoption_int(max_fetchers) ) {
/*LOG(("%d queued, %d fetching", all_queued, all_active));*/
if (fetch_choose_and_dispatch()) {
/**
* Dispatch as many jobs as we have room to dispatch.
*
* @return true if there are active fetchers that require polling else false.
*/
static bool fetch_dispatch_jobs(void)
{
int all_active;
int all_queued;
RING_GETSIZE(struct fetch, queue_ring, all_queued);
RING_GETSIZE(struct fetch, fetch_ring, all_active);
FETCH_LOG(("queue_ring %i, fetch_ring %i", all_queued, all_active));
dump_rings();
while ((all_queued != 0) &&
(all_active < nsoption_int(max_fetchers)) &&
fetch_choose_and_dispatch()) {
all_queued--;
all_active++;
} else {
/* Either a dispatch failed or we ran out. Just stop */
break;
}
FETCH_LOG(("%d queued, %d fetching",
all_queued, all_active));
}
FETCH_LOG(("Fetch ring is now %d elements.", all_active));
FETCH_LOG(("Queue ring is now %d elements.", all_queued));
return (all_active > 0);
}
static void fetcher_poll(void *unused)
{
int fetcherd;
if (fetch_dispatch_jobs()) {
FETCH_LOG(("Polling fetchers"));
for (fetcherd = 0; fetcherd < MAX_FETCHERS; fetcherd++) {
if (fetchers[fetcherd].refcount > 0) {
/* fetcher present */
fetchers[fetcherd].ops.poll(fetchers[fetcherd].scheme);
}
}
/* schedule active fetchers to run again in 10ms */
guit->browser->schedule(10, fetcher_poll, NULL);
}
fetch_active = (all_active > 0);
#ifdef DEBUG_FETCH_VERBOSE
LOG(("Fetch ring is now %d elements.", all_active));
LOG(("Queue ring is now %d elements.", all_queued));
#endif
}
/******************************************************************************
@ -256,8 +281,6 @@ nserror fetcher_init(void)
fetch_resource_register();
fetch_about_register();
fetch_active = false;
return NSERROR_OK;
}
@ -303,6 +326,63 @@ fetcher_add(lwc_string *scheme, const struct fetcher_operation_table *ops)
return NSERROR_OK;
}
/* exported interface documented in content/fetch.h */
nserror fetcher_fdset(fd_set *read_fd_set,
fd_set *write_fd_set,
fd_set *except_fd_set,
int *maxfd_out)
{
CURLMcode code;
int maxfd;
int fetcherd; /* fetcher index */
if (!fetch_dispatch_jobs()) {
FETCH_LOG(("No jobs"));
*maxfd_out = -1;
return NSERROR_OK;
}
FETCH_LOG(("Polling fetchers"));
for (fetcherd = 0; fetcherd < MAX_FETCHERS; fetcherd++) {
if (fetchers[fetcherd].refcount > 0) {
/* fetcher present */
fetchers[fetcherd].ops.poll(fetchers[fetcherd].scheme);
}
}
FD_ZERO(read_fd_set);
FD_ZERO(write_fd_set);
FD_ZERO(except_fd_set);
code = curl_multi_fdset(fetch_curl_multi,
read_fd_set,
write_fd_set,
except_fd_set,
&maxfd);
assert(code == CURLM_OK);
if (maxfd >= 0) {
/* change the scheduled poll to happen is a 1000ms as
* we assume fetching an fdset means the fetchers will
* be run by the client waking up on data available on
* the fd and re-calling fetcher_fdset() if this does
* not happen the fetch polling will continue as
* usual.
*/
/** @note adjusting the schedule time is only done for
* curl currently. This is because as it is assumed to
* be the only fetcher that can possibly have fd to
* select on. All the other fetchers continue to need
* polling frequently.
*/
guit->browser->schedule(1000, fetcher_poll, NULL);
}
*maxfd_out = maxfd;
return NSERROR_OK;
}
/* exported interface documented in content/fetch.h */
struct fetch *
fetch_start(nsurl *url,
@ -336,9 +416,7 @@ fetch_start(nsurl *url,
return NULL;
}
#ifdef DEBUG_FETCH_VERBOSE
LOG(("fetch %p, url '%s'", fetch, nsurl_access(url)));
#endif
FETCH_LOG(("fetch %p, url '%s'", fetch, nsurl_access(url)));
/* construct a new fetch structure */
fetch->callback = callback;
@ -421,9 +499,15 @@ fetch_start(nsurl *url,
/* Rah, got it, so ref the fetcher. */
fetch_ref_fetcher(fetch->fetcherd);
/* Dump us in the queue and ask the queue to run. */
/* Dump new fetch in the queue. */
RING_INSERT(queue_ring, fetch);
fetch_dispatch_jobs();
/* Ask the queue to run. */
if (fetch_dispatch_jobs()) {
FETCH_LOG(("scheduling poll"));
/* schedule active fetchers to run again in 10ms */
guit->browser->schedule(10, fetcher_poll, NULL);
}
return fetch;
}
@ -432,47 +516,31 @@ fetch_start(nsurl *url,
void fetch_abort(struct fetch *f)
{
assert(f);
#ifdef DEBUG_FETCH_VERBOSE
LOG(("fetch %p, fetcher %p, url '%s'", f, f->fetcher_handle,
FETCH_LOG(("fetch %p, fetcher %p, url '%s'", f, f->fetcher_handle,
nsurl_access(f->url)));
#endif
fetchers[f->fetcherd].ops.abort(f->fetcher_handle);
}
/* exported interface documented in content/fetch.h */
void fetch_free(struct fetch *f)
{
#ifdef DEBUG_FETCH_VERBOSE
LOG(("Freeing fetch %p, fetcher %p", f, f->fetcher_handle));
#endif
FETCH_LOG(("Freeing fetch %p, fetcher %p", f, f->fetcher_handle));
fetchers[f->fetcherd].ops.free(f->fetcher_handle);
fetch_unref_fetcher(f->fetcherd);
nsurl_unref(f->url);
if (f->referer != NULL)
if (f->referer != NULL) {
nsurl_unref(f->referer);
if (f->host != NULL)
}
if (f->host != NULL) {
lwc_string_unref(f->host);
}
free(f);
}
/* exported interface documented in content/fetchers.h */
void fetcher_poll(void)
{
int fetcherd;
fetch_dispatch_jobs();
if (fetch_active) {
for (fetcherd = 0; fetcherd < MAX_FETCHERS; fetcherd++) {
if (fetchers[fetcherd].refcount > 0) {
/* fetcher present */
fetchers[fetcherd].ops.poll(fetchers[fetcherd].scheme);
}
}
}
}
/* exported interface documented in content/fetch.h */
bool fetch_can_fetch(const nsurl *url)
@ -589,7 +657,7 @@ void fetch_multipart_data_destroy(struct fetch_multipart_data *list)
free(list->name);
free(list->value);
if (list->file) {
LOG(("Freeing rawfile: %s", list->rawfile));
FETCH_LOG(("Freeing rawfile: %s", list->rawfile));
free(list->rawfile);
}
free(list);
@ -607,12 +675,8 @@ fetch_send_callback(const fetch_msg *msg, struct fetch *fetch)
/* exported interface documented in content/fetch.h */
void fetch_remove_from_queues(struct fetch *fetch)
{
int all_active;
#ifdef DEBUG_FETCH_VERBOSE
int all_queued;
LOG(("Fetch %p, fetcher %p can be freed", fetch, fetch->fetcher_handle));
#endif
FETCH_LOG(("Fetch %p, fetcher %p can be freed",
fetch, fetch->fetcher_handle));
/* Go ahead and free the fetch properly now */
if (fetch->fetch_is_active) {
@ -621,15 +685,15 @@ void fetch_remove_from_queues(struct fetch *fetch)
RING_REMOVE(queue_ring, fetch);
}
RING_GETSIZE(struct fetch, fetch_ring, all_active);
fetch_active = (all_active > 0);
#ifdef DEBUG_FETCH_VERBOSE
LOG(("Fetch ring is now %d elements.", all_active));
int all_active;
int all_queued;
RING_GETSIZE(struct fetch, fetch_ring, all_active);
RING_GETSIZE(struct fetch, queue_ring, all_queued);
LOG(("Fetch ring is now %d elements.", all_active));
LOG(("Queue ring is now %d elements.", all_queued));
#endif
}
@ -638,9 +702,8 @@ void fetch_remove_from_queues(struct fetch *fetch)
/* exported interface documented in content/fetch.h */
void fetch_set_http_code(struct fetch *fetch, long http_code)
{
#ifdef DEBUG_FETCH_VERBOSE
LOG(("Setting HTTP code to %ld", http_code));
#endif
FETCH_LOG(("Setting HTTP code to %ld", http_code));
fetch->http_code = http_code;
}

View File

@ -23,14 +23,13 @@
#ifndef _NETSURF_DESKTOP_FETCHERS_H_
#define _NETSURF_DESKTOP_FETCHERS_H_
#include <sys/select.h>
#include <libwapcaplet/libwapcaplet.h>
struct nsurl;
struct fetch_multipart_data;
struct fetch;
extern bool fetch_active;
/**
* Fetcher operations API
*
@ -86,6 +85,7 @@ struct fetcher_operation_table {
void (*finalise)(lwc_string *scheme);
};
/**
* Register a fetcher for a scheme
*
@ -95,6 +95,7 @@ struct fetcher_operation_table {
*/
nserror fetcher_add(lwc_string *scheme, const struct fetcher_operation_table *ops);
/**
* Initialise the fetchers.
*
@ -102,6 +103,7 @@ nserror fetcher_add(lwc_string *scheme, const struct fetcher_operation_table *op
*/
nserror fetcher_init(void);
/**
* Clean up for quit.
*
@ -109,11 +111,31 @@ nserror fetcher_init(void);
*/
void fetcher_quit(void);
/**
* Do some work on current fetches.
* Get the set of file descriptors the fetchers are currently using.
*
* Must be called regularly to make progress on fetches.
* This obtains the file descriptors the fetch system is using to
* obtain data. It will cause the fetchers to make progress, if
* possible, potentially completing fetches before requiring activity
* on file descriptors.
*
* If a set of descriptors is returned (maxfd is not -1) The caller is
* expected to wait on them (with select etc.) and continue to obtain
* the fdset with this call. This will switch the fetchers from polled
* mode to waiting for network activity which is much more efficient.
*
* \note If the caller does not subsequently obtain the fdset again
* the fetchers will fall back to the less efficient polled
* operation. The fallback to polled operation will only occour after
* a timeout which introduces additional delay.
*
* \param read_fd_set[out] The fd set for read.
* \param write_fd_set[out] The fd set for write.
* \param except_fd_set[out] The fd set for exceptions.
* \param maxfd[out] The highest fd number in the set or -1 if no fd available.
* \return NSERROR_OK on success or appropriate error code.
*/
void fetcher_poll(void);
nserror fetcher_fdset(fd_set *read_fd_set, fd_set *write_fd_set, fd_set *except_fd_set, int *maxfd);
#endif

View File

@ -500,9 +500,7 @@ bool fetch_curl_initiate_fetch(struct curl_fetch_info *fetch, CURL *handle)
/* add to the global curl multi handle */
codem = curl_multi_add_handle(fetch_curl_multi, fetch->curl_handle);
assert(codem == CURLM_OK || codem == CURLM_CALL_MULTI_PERFORM);
guit->browser->schedule(10, (void *)fetch_curl_poll, NULL);
return true;
}

View File

@ -257,8 +257,7 @@ nserror netsurf_init(const char *messages, const char *store_path)
int netsurf_main_loop(void)
{
while (!netsurf_quit) {
guit->browser->poll(fetch_active);
fetcher_poll();
guit->browser->poll(false);
}
return 0;
@ -310,5 +309,3 @@ void netsurf_exit(void)
LOG(("Exited successfully"));
}