imap: Testing for new messages is now working.

* There is now a CheckSubscribedFoldersCommand that is issued on the main
  connection that triggers everything.
* The new CheckMailboxesCommand divides the new mail check into several
  portions, and requeues itself until the next step, ie. the
  FetchHeadersCommand.
* The headers of the new mails are downloaded, but nothing is done with
  them yet.
* The actual check for the new mails doesn't scale that well yet, not sure
  how to properly do this without having to rely on the mail indices. Might
  be sensible to solve this via some simple heuristic.
This commit is contained in:
Axel Dörfler 2013-04-15 00:53:34 +02:00
parent 82f27de238
commit 229c777323
6 changed files with 317 additions and 61 deletions

View File

@ -8,11 +8,17 @@
#include <Autolock.h>
#include <AutoDeleter.h>
#include "IMAPFolder.h"
#include "IMAPMailbox.h"
#include "IMAPProtocol.h"
static const uint32 kMaxFetchEntries = 500;
static const uint32 kMaxDirectDownloadSize = 4096;
class WorkerPrivate {
public:
WorkerPrivate(IMAPConnectionWorker& worker)
@ -26,6 +32,38 @@ public:
return fWorker.fProtocol;
}
status_t AddFolders(BObjectList<IMAPFolder>& folders)
{
IMAPConnectionWorker::MailboxMap::iterator iterator
= fWorker.fMailboxes.begin();
for (; iterator != fWorker.fMailboxes.end(); iterator++) {
IMAPFolder* folder = iterator->first;
if (!folders.AddItem(folder))
return B_NO_MEMORY;
}
return B_OK;
}
status_t SelectMailbox(IMAPFolder& folder)
{
return fWorker._SelectMailbox(folder, NULL);
}
status_t SelectMailbox(IMAPFolder& folder, uint32& nextUID)
{
return fWorker._SelectMailbox(folder, &nextUID);
}
IMAPMailbox* MailboxFor(IMAPFolder& folder)
{
return fWorker._MailboxFor(folder);
}
status_t EnqueueCommand(WorkerCommand* command)
{
return fWorker._EnqueueCommand(command);
}
void Quit()
{
fWorker.fStopped = true;
@ -42,6 +80,7 @@ public:
virtual ~WorkerCommand() {}
virtual status_t Process(IMAPConnectionWorker& worker) = 0;
virtual bool IsDone() const { return true; }
};
@ -59,12 +98,29 @@ public:
};
class CheckMailboxCommand : public WorkerCommand {
class CheckSubscribedFoldersCommand : public WorkerCommand {
public:
CheckMailboxCommand(IMAPFolder& folder, IMAPMailbox& mailbox)
virtual status_t Process(IMAPConnectionWorker& worker)
{
IMAP::Protocol& protocol = WorkerPrivate(worker).Protocol();
// The main worker checks the subscribed folders, and creates
// other workers as needed
return worker.Owner().CheckSubscribedFolders(protocol,
worker.UsesIdle());
}
};
class FetchHeadersCommand : public WorkerCommand {
public:
FetchHeadersCommand(IMAPFolder& folder, IMAPMailbox& mailbox,
uint32 from, uint32 to)
:
fFolder(folder),
fMailbox(mailbox)
fMailbox(mailbox),
fFrom(from),
fTo(to)
{
}
@ -72,12 +128,19 @@ public:
{
IMAP::Protocol& protocol = WorkerPrivate(worker).Protocol();
IMAP::SelectCommand select(fFolder.MailboxName().String());
status_t status = protocol.ProcessCommand(select);
if (status == B_OK) {
fFolder.SetUIDValidity(select.UIDValidity());
// TODO: trigger download of mails until UIDNext()
}
// TODO: check nextUID if we get one
status_t status = WorkerPrivate(worker).SelectMailbox(fFolder);
if (status != B_OK)
return status;
// TODO: create messages!
// TODO: trigger download of mails for all messages below the
// body fetch limit
// TODO: we would really like to have the message flags at this point
IMAP::FetchCommand fetch(fFrom, fTo, IMAP::kFetchHeader);
status = protocol.ProcessCommand(fetch);
if (status != B_OK)
return status;
return B_OK;
}
@ -85,6 +148,131 @@ public:
private:
IMAPFolder& fFolder;
IMAPMailbox& fMailbox;
uint32 fFrom;
uint32 fTo;
};
class CheckMailboxesCommand : public WorkerCommand {
public:
CheckMailboxesCommand()
:
fFolders(5, false),
fState(INIT),
fFolder(NULL),
fMailbox(NULL)
{
}
virtual status_t Process(IMAPConnectionWorker& worker)
{
IMAP::Protocol& protocol = WorkerPrivate(worker).Protocol();
if (fState == INIT) {
// Collect folders
status_t status = WorkerPrivate(worker).AddFolders(fFolders);
if (status != B_OK || fFolders.IsEmpty())
return status;
fState = SELECT;
}
if (fState == SELECT) {
// Get next mailbox from list, and select it
fFolder = fFolders.RemoveItemAt(fFolders.CountItems() - 1);
if (fFolder == NULL) {
for (int32 i = 0; i < fFetchCommands.CountItems(); i++) {
WorkerPrivate(worker).EnqueueCommand(
fFetchCommands.ItemAt(i));
}
fState = DONE;
return B_OK;
}
fMailbox = WorkerPrivate(worker).MailboxFor(*fFolder);
status_t status = WorkerPrivate(worker).SelectMailbox(*fFolder,
fNextUID);
if (status != B_OK)
return status;
fState = FETCH_ENTRIES;
// fFirstUID = fLastUID = fFolder->LastUID();
fFirstUID = fLastUID = 0;
fMailboxEntries = 0;
}
if (fState == FETCH_ENTRIES) {
status_t status = WorkerPrivate(worker).SelectMailbox(*fFolder);
if (status != B_OK)
return status;
// TODO: this does not scale that well. Over time, the holes in the
// UIDs might become really large
uint32 from = fLastUID;
uint32 to = fNextUID;
if (to - from > kMaxFetchEntries)
to = from + kMaxFetchEntries;
printf("IMAP: get entries from %lu to %lu\n", from, to);
// TODO: we don't really need the flags at this point at all
IMAP::MessageEntryList entries;
IMAP::FetchMessageEntriesCommand fetch(entries, from, to);
status = protocol.ProcessCommand(fetch);
if (status != B_OK)
return status;
// Determine how much we need to download
for (size_t i = 0; i < entries.size(); i++) {
printf("%10lu %8lu bytes, flags: %#lx\n", entries[i].uid,
entries[i].size, entries[i].flags);
fTotalBytes += entries[i].size;
}
fTotalEntries += entries.size();
fMailboxEntries += entries.size();
fLastUID = to;
if (to == fNextUID) {
if (fMailboxEntries > 0) {
// Add pending command to fetch the message headers
WorkerCommand* command = new FetchHeadersCommand(*fFolder,
*fMailbox, fFirstUID, fLastUID);
if (!fFetchCommands.AddItem(command))
delete command;
}
fState = SELECT;
}
}
return B_OK;
}
virtual bool IsDone() const
{
return fState == DONE;
}
private:
enum State {
INIT,
SELECT,
FETCH_ENTRIES,
DONE
};
BObjectList<IMAPFolder> fFolders;
State fState;
IMAPFolder* fFolder;
IMAPMailbox* fMailbox;
uint32 fFirstUID;
uint32 fLastUID;
uint32 fNextUID;
uint32 fMailboxEntries;
uint64 fTotalEntries;
uint64 fTotalBytes;
WorkerCommandList fFetchCommands;
};
@ -108,6 +296,7 @@ IMAPConnectionWorker::IMAPConnectionWorker(IMAPProtocol& owner,
IMAPConnectionWorker::~IMAPConnectionWorker()
{
puts("worker quit");
delete_sem(fPendingCommandsSemaphore);
_Disconnect();
}
@ -179,32 +368,24 @@ IMAPConnectionWorker::Run()
void
IMAPConnectionWorker::Quit()
{
printf("IMAP: worker %p: enqueue quit\n", this);
_EnqueueCommand(new QuitCommand());
}
status_t
IMAPConnectionWorker::EnqueueCheckSubscribedFolders()
{
printf("IMAP: worker %p: enqueue check subscribed folders\n", this);
return _EnqueueCommand(new CheckSubscribedFoldersCommand());
}
status_t
IMAPConnectionWorker::EnqueueCheckMailboxes()
{
BAutolock locker(fLocker);
MailboxMap::iterator iterator = fMailboxes.begin();
for (; iterator != fMailboxes.end(); iterator++) {
IMAPFolder* folder = iterator->first;
printf("%p: check: %s\n", this, folder->MailboxName().String());
IMAPMailbox* mailbox = iterator->second;
if (mailbox == NULL) {
mailbox = new IMAPMailbox(fProtocol, folder->MailboxName());
folder->SetListener(mailbox);
}
status_t status = _EnqueueCommand(
new CheckMailboxCommand(*folder, *mailbox));
if (status != B_OK)
return status;
}
return B_OK;
printf("IMAP: worker %p: enqueue check mailboxes\n", this);
return _EnqueueCommand(new CheckMailboxesCommand());
}
@ -215,6 +396,9 @@ IMAPConnectionWorker::EnqueueRetrieveMail(entry_ref& ref)
}
// #pragma mark - Handler listener
void
IMAPConnectionWorker::MessageExistsReceived(uint32 index)
{
@ -222,20 +406,20 @@ IMAPConnectionWorker::MessageExistsReceived(uint32 index)
}
void
IMAPConnectionWorker::MessageExpungeReceived(uint32 index)
{
printf("Message expunge: %ld\n", index);
}
// #pragma mark - private
status_t
IMAPConnectionWorker::_Worker()
{
while (!fStopped) {
if (fMain) {
// The main worker checks the subscribed folders, and creates
// other workers as needed
status_t status = _Connect();
if (status == B_OK)
status = fOwner.CheckSubscribedFolders(fProtocol, fIdle);
if (status != B_OK)
return status;
}
BAutolock locker(fLocker);
if (fPendingCommands.IsEmpty()) {
@ -250,6 +434,8 @@ IMAPConnectionWorker::_Worker()
if (command == NULL)
continue;
ObjectDeleter<WorkerCommand> deleter(command);
status_t status = _Connect();
if (status != B_OK)
return status;
@ -257,8 +443,14 @@ IMAPConnectionWorker::_Worker()
status = command->Process(*this);
if (status != B_OK)
return status;
if (!command->IsDone()) {
deleter.Detach();
_EnqueueCommand(command);
}
}
fOwner.WorkerQuit(this);
return B_OK;
}
@ -289,6 +481,43 @@ IMAPConnectionWorker::_WaitForCommands()
}
status_t
IMAPConnectionWorker::_SelectMailbox(IMAPFolder& folder, uint32* _nextUID)
{
if (fSelectedBox == &folder && _nextUID == NULL)
return B_OK;
IMAP::SelectCommand select(folder.MailboxName().String());
status_t status = fProtocol.ProcessCommand(select);
if (status == B_OK) {
folder.SetUIDValidity(select.UIDValidity());
if (_nextUID != NULL)
*_nextUID = select.NextUID();
fSelectedBox = &folder;
}
return status;
}
IMAPMailbox*
IMAPConnectionWorker::_MailboxFor(IMAPFolder& folder)
{
MailboxMap::iterator found = fMailboxes.find(&folder);
if (found == fMailboxes.end())
return NULL;
IMAPMailbox* mailbox = found->second;
if (mailbox == NULL) {
mailbox = new IMAPMailbox(fProtocol, folder.MailboxName());
folder.SetListener(mailbox);
found->second = mailbox;
}
return mailbox;
}
status_t
IMAPConnectionWorker::_Connect()
{

View File

@ -24,7 +24,8 @@ class WorkerPrivate;
typedef BObjectList<WorkerCommand> WorkerCommandList;
class IMAPConnectionWorker : public IMAP::ExistsListener {
class IMAPConnectionWorker : public IMAP::ExistsListener,
IMAP::ExpungeListener {
public:
IMAPConnectionWorker(IMAPProtocol& owner,
const Settings& settings,
@ -36,23 +37,34 @@ public:
void AddMailbox(IMAPFolder* folder);
void RemoveAllMailboxes();
IMAPProtocol& Owner() const { return fOwner; }
bool IsMain() const { return fMain; }
bool UsesIdle() const { return fIdle; }
status_t Run();
void Quit();
status_t EnqueueCheckSubscribedFolders();
status_t EnqueueCheckMailboxes();
status_t EnqueueRetrieveMail(entry_ref& ref);
// Handler listener
virtual void MessageExistsReceived(uint32 index);
virtual void MessageExpungeReceived(uint32 index);
private:
status_t _Worker();
status_t _EnqueueCommand(WorkerCommand* command);
void _WaitForCommands();
status_t _SelectMailbox(IMAPFolder& folder,
uint32* _nextUID);
IMAPMailbox* _MailboxFor(IMAPFolder& folder);
IMAPFolder* _Selected() const { return fSelectedBox; }
status_t _Connect();
void _Disconnect();
static status_t _Worker(void* self);
private:
@ -66,8 +78,10 @@ private:
WorkerCommandList fPendingCommands;
IMAP::ExistsHandler fExistsHandler;
IMAP::ExpungeHandler fExpungeHandler;
IMAPFolder* fIdleBox;
IMAPFolder* fSelectedBox;
MailboxMap fMailboxes;
BLocker fLocker;

View File

@ -119,6 +119,13 @@ IMAPProtocol::CheckSubscribedFolders(IMAP::Protocol& protocol, bool idle)
}
void
IMAPProtocol::WorkerQuit(IMAPConnectionWorker* worker)
{
fWorkers.RemoveItem(worker);
}
status_t
IMAPProtocol::SyncMessages()
{
@ -133,6 +140,7 @@ IMAPProtocol::SyncMessages()
return B_NO_MEMORY;
}
worker->EnqueueCheckSubscribedFolders();
return worker->Run();
}

View File

@ -32,6 +32,7 @@ public:
status_t CheckSubscribedFolders(
IMAP::Protocol& protocol, bool idle);
void WorkerQuit(IMAPConnectionWorker* worker);
virtual status_t SyncMessages();
virtual status_t FetchBody(const entry_ref& ref);

View File

@ -330,8 +330,7 @@ FetchMessageEntriesCommand::HandleUntagged(Response& response)
// #pragma mark -
FetchCommand::FetchCommand(uint32 from, uint32 to,
FetchMode mode)
FetchCommand::FetchCommand(uint32 from, uint32 to, FetchMode mode)
:
fFrom(from),
fTo(to),
@ -517,7 +516,7 @@ ExistsHandler::SetListener(ExistsListener* listener)
bool
ExistsHandler::HandleUntagged(Response& response)
{
if (!response.EqualsAt(1, "EXISTS") || response.IsNumberAt(0))
if (!response.EqualsAt(1, "EXISTS") || !response.IsNumberAt(0))
return false;
int32 index = response.NumberAt(0);
@ -552,31 +551,24 @@ ExpungeHandler::ExpungeHandler()
}
void
ExpungeHandler::SetListener(ExpungeListener* listener)
{
fListener = listener;
}
bool
ExpungeHandler::HandleUntagged(Response& response)
{
if (!response.EqualsAt(1, "EXPUNGE") || response.IsNumberAt(0))
if (!response.EqualsAt(1, "EXPUNGE") || !response.IsNumberAt(0))
return false;
// int32 expunge = response.NumberAt(0);
#if 0
Listener().ExpungeReceived(expunge);
int32 index = response.NumberAt(0);
// remove from storage
IMAPStorage& storage = fIMAPMailbox.GetStorage();
storage.DeleteMessage(fIMAPMailbox.MessageNumberToUID(expunge));
if (fListener != NULL)
fListener->MessageExpungeReceived(index);
// remove from min message list
MinMessageList& messageList = const_cast<MinMessageList&>(
fIMAPMailbox.GetMessageList());
messageList.erase(messageList.begin() + expunge - 1);
TRACE("EXPUNGE %i\n", (int)expunge);
// the watching loop restarts again, we need to watch again to because
// some IDLE implementation stop sending notifications
fIMAPMailbox.SendRawCommand("DONE");
#endif
return true;
}

View File

@ -237,11 +237,23 @@ public:
};
class ExpungeListener {
public:
virtual void MessageExpungeReceived(uint32 index) = 0;
};
class ExpungeHandler : public Handler {
public:
ExpungeHandler();
bool HandleUntagged(Response& response);
void SetListener(ExpungeListener* listener);
ExpungeListener* Listener() const { return fListener; }
virtual bool HandleUntagged(Response& response);
private:
ExpungeListener* fListener;
};