Add ridiculously overwrought asynchronous 9P client.

This commit is contained in:
Kris Maglione 2009-05-21 14:38:38 -04:00
parent dd5f800f0e
commit 75abe3f7a5
8 changed files with 374 additions and 116 deletions

View File

@ -1,7 +1,7 @@
import os
import sys
from pyxp import Client
from pyxp.asyncclient import Client
if 'WMII_ADDRESS' in os.environ:
client = Client(os.environ['WMII_ADDRESS'])

View File

@ -16,7 +16,7 @@ class Ctl(object):
pass
def ctl(self, msg):
client.write(self.ctl_path, msg)
client.awrite(self.ctl_path, msg)
def __getitem__(self, key):
for line in self.ctl_lines():
@ -58,7 +58,7 @@ class Ctl(object):
lines = tuple(client.readlines(self.ctl_path))
if self.ctl_hasid:
lines = lines[1:]
return lines[:-1]
return lines
_id = None
@property
@ -111,7 +111,7 @@ class Dir(Ctl):
def __set__(self, dir, val):
if not self.writable:
raise NotImplementedError('File %s is not writable' % self.name)
return client.write('%s/%s' % (dir.path, self.name), val)
return client.awrite('%s/%s' % (dir.path, self.name), val)
@property
def ctl_path(self):
@ -341,13 +341,9 @@ class Button(object):
def create(self, colors=None, label=None):
with client.create(self.path, OWRITE) as f:
if colors or label:
f.write(self.getval(colors, label))
f.write(self.getval(colors, label))
def remove(self):
try:
client.remove(self.path)
except Exception:
pass
client.aremove(self.path)
def getval(self, colors=None, label=None):
if colors is None:
@ -358,7 +354,7 @@ class Button(object):
colors = property(
lambda self: tuple(map(Color, client.read(self.path).split(' ')[:3])),
lambda self, val: client.write(self.path, self.getval(colors=val)))
lambda self, val: client.awrite(self.path, self.getval(colors=val)))
label = property(
lambda self: client.read(self.path).split(' ', 3)[3],
@ -480,7 +476,7 @@ class Rules(collections.MutableMapping):
assert '/' not in k and '\n' not in v
lines.append('/%s/ -> %s' % (k, v))
lines.append('')
client.write(self.path, '\n'.join(lines))
client.awrite(self.path, '\n'.join(lines))
def iteritems(self):
for line in client.readlines(self.path):
@ -520,7 +516,7 @@ class Tags(object):
self.add(t.id)
for b in wmii.lbuttons:
if b.name not in self.tags:
b.remove()
b.aremove()
self.focus(Tag('sel').id)
self.mru = [self.sel.id]

View File

@ -0,0 +1,206 @@
from pyxp import client, fcall
from pyxp.client import *
def awithfile(*oargs, **okwargs):
def wrapper(fn):
def next(self, path, *args, **kwargs):
def next(file, exc, tb):
fn(self, (file, exc, tb), *args, **kwargs)
self.aopen(path, next, *oargs, **okwargs)
return next
return wrapper
def wrap_callback(fn, file):
file.called = 0
def callback(data, exc, tb):
file.called += 1
file.close()
if callable(fn):
fn(data, exc, tb)
return callback
class Client(client.Client):
ROOT_FID = 0
def awalk(self, path, async, fail=None):
ctxt = dict(path=path, fid=self.getfid(), ofid=ROOT_FID)
def next(resp=None, exc=None, tb=None):
if exc and ctxt['ofid'] != ROOT_FID:
self.aclunk(ctxt['fid'])
if not ctxt['path'] and resp or exc:
if exc and fail:
return self.respond(fail, None, exc, tb)
return self.respond(async, ctxt['fid'], exc, tb)
wname = ctxt['path'][:fcall.MAX_WELEM]
ofid = ctxt['ofid']
ctxt['path'] = ctxt['path'][fcall.MAX_WELEM:]
if resp:
ctxt['ofid'] = ctxt['fid']
self.dorpc(fcall.Twalk(fid=ofid,
newfid=ctxt['fid'],
wname=wname),
next)
next()
def _open(self, path, mode, open):
resp = None
with self.walk(path) as nfid:
fid = nfid
resp = self.dorpc(open(fid))
def cleanup():
self.aclunk(fid)
file = File(self, '/'.join(path), resp, fid, mode, cleanup)
self.files[fid] = file
return file
def _aopen(self, path, mode, open, callback):
resp = None
def next(fid, exc, tb):
def next(resp, exc, tb):
def cleanup():
self.clunk(fid)
file = File(self, '/'.join(path), resp, fid, mode, cleanup)
self.files[fid] = file
self.respond(callback, file)
self.dorpc(open(fid), next, callback)
self.awalk(path, next, callback)
def aopen(self, path, callback=True, mode=OREAD):
assert callable(callback)
path = self.splitpath(path)
def open(fid):
return fcall.Topen(fid=fid, mode=mode)
return self._aopen(path, mode, open, callback)
def acreate(self, path, callback=True, mode=OREAD, perm=0):
path = self.splitpath(path)
name = path.pop()
def open(fid):
return fcall.Tcreate(fid=fid, mode=mode, name=name, perm=perm)
if not callable(callback):
def callback(resp, exc, tb):
if resp:
resp.close()
return self._aopen(path, mode, open, async)
def aremove(self, path, callback=True):
path = self.splitpath(path)
def next(fid, exc, tb):
self.dorpc(fcall.Tremove(fid=fid), callback)
self.awalk(path, next, callback)
def astat(self, path, callback):
path = self.splitpath(path)
def next(fid, exc, tb):
def next(resp, exc, tb):
callback(resp.stat, exc, tb)
self.dorpc(fcall.Tstat(fid=fid), next, callback)
@awithfile()
def aread(self, (file, exc, tb), callback, *args, **kwargs):
if exc:
callback(file, exc, tb)
else:
file.aread(wrap_callback(callback, file), *args, **kwargs)
@awithfile(mode=OWRITE)
def awrite(self, (file, exc, tb), data, callback=True, *args, **kwargs):
if exc:
self.respond(callback, file, exc, tb)
else:
file.awrite(data, wrap_callback(callback, file), *args, **kwargs)
@awithfile()
def areadlines(self, (file, exc, tb), fn):
def callback(resp):
if resp is None:
file.close()
if fn(resp) is False:
file.close()
return False
if exc:
callback(None)
else:
file.sreadlines(callback)
class File(client.File):
@staticmethod
def respond(callback, data, exc=None, tb=None):
if callable(callback):
callback(data, exc, tb)
def stat(self, callback):
def next(resp, exc, tb):
callback(resp.stat, exc, tb)
resp = self.dorpc(fcall.Tstat(), next, callback)
def aread(self, callback, count=None, offset=None, buf=''):
ctxt = dict(res=[], count=self.iounit, offset=self.offset)
if count is not None:
ctxt['count'] = count
if offset is not None:
ctxt['offset'] = offset
def next(resp=None, exc=None, tb=None):
if resp and resp.data:
ctxt['res'].append(resp.data)
ctxt['offset'] += len(resp.data)
if ctxt['count'] == 0:
if offset is None:
self.offset = ctxt['offset']
return callback(''.join(ctxt['res']), exc, tb)
n = min(ctxt['count'], self.iounit)
ctxt['count'] -= n
self.dorpc(fcall.Tread(offset=ctxt['offset'], count=n),
next, callback)
next()
def areadlines(self, callback):
ctxt = dict(last=None)
def next(data, exc, tb):
res = True
if data:
lines = data.split('\n')
if ctxt['last']:
lines[0] = ctxt['last'] + lines[0]
for i in range(0, len(lines) - 1):
res = callback(lines[i])
if res is False:
break
ctxt['last'] = lines[-1]
if res is not False:
self.aread(next)
else:
if ctxt['last']:
callback(ctxt['last'])
callback(None)
self.aread(next)
def awrite(self, data, callback, offset=None):
ctxt = dict(offset=self.offset, off=0)
if offset is not None:
ctxt['offset'] = offset
def next(resp=None, exc=None, tb=None):
if resp:
ctxt['off'] += resp.count
ctxt['offset'] += resp.count
if ctxt['off'] < len(data):
n = min(len(data), self.iounit)
self.dorpc(fcall.Twrite(offset=ctxt['offset'],
data=data[ctxt['off']:ctxt['off']+n]),
next, callback)
else:
if offset is None:
self.offset = ctxt['offset']
self.respond(callback, ctxt['off'], exc, tb)
next()
def aremove(self, callback=True):
def next(resp, exc, tb):
self.close()
self.respond(resp and True, exc, tb)
self.dorpc(fcall.Tremove(), next)
# vim:se sts=4 sw=4 et:

View File

@ -1,5 +1,4 @@
# Copyright (C) 2007 Kris Maglione
# See PERMISSIONS
# Copyright (C) 2009 Kris Maglione
import operator
import os
@ -44,6 +43,12 @@ class RPCError(Exception):
class Client(object):
ROOT_FID = 0
@staticmethod
def respond(callback, data, exc=None, tb=None):
if callable(callback):
callback(data, exc, tb)
def __enter__(self):
return self
def __exit__(self, *args):
@ -89,31 +94,50 @@ class Client(object):
self.mux.fd.close()
self.mux = None
def dorpc(self, req):
resp = self.mux.rpc(req)
if isinstance(resp, fcall.Rerror):
raise RPCError, "RPC returned error (%s): %s" % (
req.__class__.__name__, resp.ename)
if req.type != resp.type ^ 1:
raise ProtocolException, "Missmatched RPC message types: %s => %s" % (
req.__class__.__name__, resp.__class__.__name__)
return resp
def dorpc(self, req, callback=None, error=None):
def doresp(resp):
if isinstance(resp, fcall.Rerror):
raise RPCError, "%s[%d] RPC returned error: %s" % (
req.__class__.__name__, resp.tag, resp.ename)
if req.type != resp.type ^ 1:
raise ProtocolException, "Missmatched RPC message types: %s => %s" % (
req.__class__.__name__, resp.__class__.__name__)
return resp
def next(mux, resp):
try:
res = doresp(resp)
except Exception, e:
if error:
self.respond(error, None, e, None)
else:
self.respond(callback, None, e, None)
else:
self.respond(callback, res)
if not callback:
return doresp(self.mux.rpc(req))
self.mux.rpc(req, next)
def splitpath(self, path):
return [v for v in path.split('/') if v != '']
def getfid(self):
with self.lock:
if len(self.fids):
if self.fids:
return self.fids.pop()
else:
self.lastfid += 1
return self.lastfid
self.lastfid += 1
return self.lastfid
def putfid(self, fid):
with self.lock:
self.files.pop(fid)
self.fids.append(fid)
def aclunk(self, fid, callback=None):
def next(resp, exc, tb):
if resp:
self.putfid(fid)
self.respond(callback, resp, exc, tb)
self.dorpc(fcall.Tclunk(fid=fid), next)
def clunk(self, fid):
try:
self.dorpc(fcall.Tclunk(fid=fid))
@ -133,9 +157,9 @@ class Client(object):
@apply
class Res:
def __enter__(self):
def __enter__(res):
return fid
def __exit__(self, exc_type, exc_value, traceback):
def __exit__(res, exc_type, exc_value, traceback):
if exc_type:
self.clunk(fid)
return Res
@ -148,20 +172,20 @@ class Client(object):
resp = self.dorpc(open(fid))
def cleanup():
self.clunk(fid)
file = File(self, resp, fid, mode, cleanup)
self.aclunk(fid)
file = File(self, '/'.join(path), resp, fid, mode, cleanup)
self.files[fid] = file
return file
def open(self, path, mode = OREAD):
def open(self, path, mode=OREAD):
path = self.splitpath(path)
def open(fid):
return fcall.Topen(fid=fid, mode=mode)
return self._open(path, mode, open)
def create(self, path, mode = OREAD, perm = 0):
def create(self, path, mode=OREAD, perm=0):
path = self.splitpath(path)
name = path.pop()
@ -209,22 +233,23 @@ class File(object):
def __exit__(self, *args):
self.close()
def __init__(self, client, fcall, fid, mode, cleanup):
def __init__(self, client, path, fcall, fid, mode, cleanup):
self.lock = RLock()
self.client = client
self.path = path
self.fid = fid
self.cleanup = cleanup
self.mode = mode
self.iounit = fcall.iounit
self.qid = fcall.qid
self.closed = False
self.offset = 0
self.fd = None
def dorpc(self, fcall):
def dorpc(self, fcall, async=None, error=None):
if hasattr(fcall, 'fid'):
fcall.fid = self.fid
return self.client.dorpc(fcall)
return self.client.dorpc(fcall, async, error)
def stat(self):
resp = self.dorpc(fcall.Tstat())
@ -262,6 +287,9 @@ class File(object):
if not data:
break
lines = data.split('\n')
if last:
lines[0] = last + lines[0]
last = None
for i in range(0, len(lines) - 1):
yield lines[i]
last = lines[-1]
@ -300,15 +328,13 @@ class File(object):
yield s
def close(self):
try:
if self.fd:
self.fd_close()
finally:
self.cleanup()
self.tg = None
self.fid = None
self.client = None
self.qid = None
assert not self.closed
self.closed = True
self.cleanup()
self.tg = None
self.fid = None
self.client = None
self.qid = None
def remove(self):
try:
@ -319,10 +345,4 @@ class File(object):
except Exception:
pass
def fd_close(self):
try:
self.fd.close()
finally:
self.fd = None
# vim:se sts=4 sw=4 et:

View File

@ -61,7 +61,7 @@ class Rattach(Fcall):
class Terror(Fcall):
def __init__(self):
raise Error("Illegal 9P tag 'Terror' encountered")
raise Exception("Illegal 9P tag 'Terror' encountered")
class Rerror(Fcall):
ename = String()

View File

@ -74,13 +74,9 @@ class Date(Int):
def marshall(self, val):
return self.encode(int(val.strftime('%s')))
# To do: use unicode strings, ensure UTF-8.
# Not a problem in Python 3K, but there the other
# data blobs are.
class String(Int):
class Data(Int):
def __init__(self, size=2):
super(String, self).__init__(size)
super(Data, self).__init__(size)
def unmarshall(self, data, offset):
n = self.decode(data, offset)
offset += self.size
@ -89,8 +85,18 @@ class String(Int):
def marshall(self, val):
return [self.encode(len(val)), val]
class Data(String):
pass
# Note: Py3K strings are Unicode by default. They can't store binary
# data.
class String(Data):
def unmarshall(self, data, offset):
off, val = super(String, self).unmarshall(data, offset)
return off, val.decode('UTF-8')
def marshall(self, val):
if isinstance(val, str):
str.decode('UTF-8')
else:
val = val.encode('UTF-8')
return super(String, self).marshall(val)
class Array(Int):
def __init__(self, size, spec):

View File

@ -14,6 +14,9 @@
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
import sys
import traceback
from pyxp import fields
from pyxp.dial import dial
from threading import *
@ -22,13 +25,14 @@ Condition = Condition().__class__
__all__ = 'Mux',
class Mux(object):
def __init__(self, con, process, mintag=0, maxtag=1<<16 - 1):
def __init__(self, con, process, flush=None, mintag=0, maxtag=1<<16 - 1):
self.queue = set()
self.lock = RLock()
self.rendez = Condition(self.lock)
self.outlock = RLock()
self.inlock = RLock()
self.process = process
self.flush = flush
self.wait = {}
self.free = set(range(mintag, maxtag))
self.mintag = mintag
@ -42,60 +46,78 @@ class Mux(object):
if self.fd is None:
raise Exception("No connection")
def rpc(self, dat):
r = self.newrpc(dat)
def mux(self, rpc):
try:
rpc.waiting = True
self.lock.acquire()
while self.muxer and self.muxer != r and r.data is None:
r.wait()
while self.muxer and self.muxer != rpc and rpc.data is None:
rpc.wait()
if r.data is None:
if self.muxer and self.muxer != r:
self.fail()
self.muxer = r
if rpc.data is None:
assert not self.muxer or self.muxer is rpc
self.muxer = rpc
self.lock.release()
try:
while r.data is None:
while rpc.data is None:
data = self.recv()
if data is None:
self.lock.acquire()
self.queue.remove(r)
self.queue.remove(rpc)
raise Exception("unexpected eof")
self.dispatch(data)
self.lock.acquire()
finally:
self.lock.acquire()
self.electmuxer()
self.puttag(r)
except Exception, e:
import sys
import traceback
traceback.print_exc(sys.stdout)
print e
if self.flush:
self.flush(self, rpc.data)
raise e
finally:
if self.lock._is_owned():
self.lock.release()
return r.data
if rpc.async:
if callable(rpc.async):
rpc.async(self, rpc.data)
else:
return rpc.data
def rpc(self, dat, async=None):
rpc = self.newrpc(dat, async)
if async:
with self.lock:
if self.muxer is None:
self.electmuxer()
else:
return self.mux(rpc)
def electmuxer(self):
async = None
for rpc in self.queue:
if self.muxer != rpc and rpc.async == False:
self.muxer = rpc
rpc.notify()
return
if self.muxer != rpc:
if rpc.async:
async = rpc
else:
self.muxer = rpc
rpc.notify()
return
self.muxer = None
if async:
self.muxer = async
Thread(target=self.mux, args=(async,)).start()
def dispatch(self, dat):
tag = dat.tag - self.mintag
r = None
tag = dat.tag
rpc = None
with self.lock:
r = self.wait.get(tag, None)
if r is None or r not in self.queue:
print "bad rpc tag: %u (no one waiting on it)" % dat.tag
rpc = self.wait.get(tag, None)
if rpc is None or rpc not in self.queue:
#print "bad rpc tag: %u (no one waiting on it)" % dat.tag
return
self.queue.remove(r)
r.data = dat
r.notify()
self.puttag(rpc)
self.queue.remove(rpc)
rpc.dispatch(dat)
def gettag(self, r):
tag = 0
@ -111,16 +133,13 @@ class Mux(object):
self.wait[tag] = r
r.tag = tag
r.data.tag = r.tag
r.data = None
r.orig.tag = r.tag
return r.tag
def puttag(self, r):
t = r.tag
if self.wait.get(t, None) != r:
self.fail()
del self.wait[t]
self.free.add(t)
def puttag(self, rpc):
if rpc.tag in self.wait:
del self.wait[rpc.tag]
self.free.add(rpc.tag)
self.rendez.notify()
def send(self, dat):
@ -128,17 +147,20 @@ class Mux(object):
n = self.fd.send(data)
return n == len(data)
def recv(self):
data = self.fd.recv(4)
if data:
len = fields.Int.decoders[4](data, 0)
data += self.fd.recv(len - 4)
return self.process(data)
try:
with self.inlock:
data = self.fd.recv(4)
if data:
len = fields.Int.decoders[4](data, 0)
data += self.fd.recv(len - 4)
return self.process(data)
except Exception, e:
traceback.print_exc(sys.stdout)
print repr(data)
return None
def fail():
raise Exception()
def newrpc(self, dat):
rpc = Rpc(self, dat)
def newrpc(self, dat, async=None):
rpc = Rpc(self, dat, async)
tag = None
with self.lock:
@ -153,11 +175,19 @@ class Mux(object):
self.puttag(rpc)
class Rpc(Condition):
def __init__(self, mux, data):
def __init__(self, mux, data, async=None):
super(Rpc, self).__init__(mux.lock)
self.mux = mux
self.orig = data
self.data = None
self.waiting = False
self.async = async
def dispatch(self, data=None):
self.data = data
self.waiting = True
self.async = False
if not self.async or self.waiting:
self.notify()
elif callable(self.async):
Thread(target=self.async, args=(self.mux, data)).start()
# vim:se sts=4 sw=4 et:

View File

@ -65,7 +65,7 @@ def unresponsive_client(client):
# End Configuration
client.write('/event', 'Start wmiirc')
client.awrite('/event', 'Start wmiirc')
tags = Tags()
bind_events({
@ -105,7 +105,7 @@ class Actions(events.Actions):
def exec_(self, args=''):
wmii['exec'] = args
def exit(self, args=''):
client.write('/event', 'Quit')
client.awrite('/event', 'Quit')
program_menu = Menu(histfile='%s/history.prog' % confpath[0], nhist=5000,
action=curry(call, 'wmiir', 'setsid',
@ -140,7 +140,7 @@ class Notice(Button):
self.label = ''
def write(self, notice):
client.write('/event', 'Notice %s' % notice.replace('\n', ' '))
client.awrite('/event', 'Notice %s' % notice.replace('\n', ' '))
def show(self, notice):
if self.timer: