"""basic cfvers classes
This module implements the basic cfvers objects, in a
repository-independent way.
"""
# Copyright 2003-2005 Iustin Pop
#
# This file is part of cfvers.
#
# cfvers is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# cfvers is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with cfvers; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# $Id: main.py 218 2005-10-30 09:26:23Z iusty $
import os, struct, stat, os.path, re, commands, sys
import base64
import types
import random
import difflib
import time
import bz2
import sha
import errno
import string
import pwd
import grp
from cStringIO import StringIO
from mx import DateTime
import popen2
import datetime
import cfvers
__all__ = [
"Area", "Item", "Entry", "Revision", "forcejoin", "rexists",
"CfversException", "ConfigException", "RepositoryException",
"CommException", "Result", "ParsingException",
"OperationError"
]
def forcejoin(a, *p):
"""Join two or more pathname components, considering them all relative"""
path = a
for b in p:
if path[-1:] == '/' and b[:1] == '/':
path = path + b[1:]
elif path[-1:] != '/' and b[:1] != '/':
path = path + '/' + b
else:
path = path + b
return path
def rexists(filename):
"""Checks if a file exists, even if is a broken symlink"""
try:
os.lstat(filename)
except OSError, e:
if e.errno == errno.ENOENT:
return False
return True
class Area(object):
"""Class implementing an area object.
An area is a independent group of versioned items in a
repository. You will usually use more than one area for different
servers in a networked repository, and/or for multiple chroot
jails on one server.
"""
__slots__ = ["name", "revno", "_ctime", "root",
"description", "numitems", "revno"]
def __init__(self, name=None,
description="", root="/", ctime = None,
numitems=0, revno=0):
"""Constructor for the Area class"""
self.name = name
self.ctime = ctime or DateTime.utc()
self.description = description
self.root = root
self.numitems = int(numitems)
self.revno = revno
return
def _set_ctime(self, val):
if isinstance(val, types.StringTypes):
val = DateTime.ISO.ParseDateTime(val)
elif isinstance(val, datetime.datetime):
val = DateTime.ISO.ParseDateTime(val.isoformat())
self._ctime = val
def _get_ctime(self):
return self._ctime
ctime = property(_get_ctime, _set_ctime, None,
"The creation time of this area")
class Item(object):
__slots__ = ["id", "area", "name", "ctime", "dirname", "flags", "command"]
STORE_METADATA = 1 << 0
STORE_CHECKSUM = 1 << 1
STORE_CONTENTS = 1 << 2
STORE_VIRTUAL = 1 << 3
STORE_FILE = (STORE_METADATA | STORE_CHECKSUM | STORE_CONTENTS)
def __init__(self, id=-1, area=None, name=None,
ctime=None, dirname=None, flags=None,
command=None):
if name is None or not name.startswith("/"):
raise ValueError, "Invalid name '%s'" % name
self.id = id
self.area = area
self.name = name
if flags is None:
self.flags = Item.STORE_METADATA | Item.STORE_CHECKSUM | Item.STORE_CONTENTS
else:
self.flags = flags
if self.flags & Item.STORE_VIRTUAL and command is None:
raise ValueError("Command not specified but flags denotes a virtual")
self.command = command
if dirname is None:
self.dirname = os.path.dirname(name)
else:
self.dirname = dirname
if ctime is None:
self.ctime = DateTime.utc()
elif isinstance(ctime, datetime.datetime):
ctime = ctime.isoformat()
self.ctime = DateTime.ISO.ParseDateTime(ctime)
elif not isinstance(ctime, DateTime.DateTimeType):
self.ctime = DateTime.ISO.ParseDateTime(ctime)
else:
self.ctime = ctime
return
def __repr__(self):
return "<Item: id #%d, name %s>" % (self.id, self.name)
class Entry(object):
__slots__ = ["item", "revno",
"filename", "filetype",
"filecontents",
"mode",
"mtime", "atime", "ctime",
"inode", "device", "nlink",
"uid", "gid", "uname", "gname",
"sha1sum", "size",
"blocks", "rdev", "blksize",
"areaname", "status",
"exitcode",
]
# Do not test atime, it's irrelevant
# Otherwise, almost like __slots__
DIFFABLE_ATTRS = ('filename', 'filetype', 'mode',
'mtime', 'ctime',
'inode', 'device', 'nlink',
'uid', 'gid', 'uname', 'gname',
'blocks', 'rdev', 'blksize',
'size', 'sha1sum', 'filecontents',
'exitcode')
STATUS_ADDED = "A"
STATUS_MODIFIED = "M"
STATUS_DELETED = "D"
STATUS_FROZEN = "F"
STATUS_MAP = {
STATUS_ADDED: "registered",
STATUS_MODIFIED: "modified",
STATUS_DELETED: "deleted",
STATUS_FROZEN: "frozen",
}
S_IFVIRT = stat.S_IFIFO | stat.S_IFLNK
modemap = {
stat.S_IFDIR: ('directory', 'd', 'd'),
stat.S_IFREG: ('regular file', 'f', '-'),
stat.S_IFLNK: ('symbolic link', 'l', 'l'),
stat.S_IFBLK: ('block device', 'b', 'b'),
stat.S_IFCHR: ('character device', 'c', 'c'),
stat.S_IFIFO: ('pipe', 'p', 'p'),
stat.S_IFSOCK:('socket', 's', 's'),
S_IFVIRT: ('virtual', 'v', 'v'),
None: ('null entry', '?', '?'),
}
def newDeleted(item, revno):
e = Entry()
e.item = item.id
e.revno = revno
e.filename = item.name
e.areaname = item.area
e.status = Entry.STATUS_DELETED
return e
newDeleted = staticmethod(newDeleted)
def newBorn(item, revno):
e = Entry()
e.item = item.id
e.revno = revno
e.filename = item.name
e.areaname = item.area
e.status = Entry.STATUS_ADDED
return e
newBorn = staticmethod(newBorn)
def __init__(self, item=None, revno=None, area=None):
for i in Entry.__slots__:
setattr(self, i, None)
if item is None:
# the init will be done manually
# ugly....
return
self.filename = item.name
self.item = item.id
self.revno = revno
self.areaname = area.name
if item.flags & Item.STORE_FILE:
self._init_fromfile(area, item)
elif item.flags & Item.STORE_VIRTUAL:
self._init_fromvirtual(area, item)
else:
raise ValueError("Invalid item.flags = %s" % item.flags)
def _init_fromvirtual(self, area, item):
p4 = popen2.Popen4(item.command)
p4.tochild.close()
output = p4.fromchild.read()
status = p4.wait()
self.filecontents = output
self.size = len(self.filecontents)
self.exitcode = status
self.status = Entry.STATUS_MODIFIED
self.filetype = Entry.S_IFVIRT
self.mode = self.filetype | stat.S_IRUSR
self.sha1sum = sha.new(self.filecontents).hexdigest()
return
def _init_fromfile(self, area, item):
source = forcejoin(area.root, item.name)
deleted = False
try:
st = os.lstat(source)
except OSError, e:
if e.errno == errno.ENOENT:
deleted = True
else:
raise
if deleted:
self.status = Entry.STATUS_DELETED
return
self.status = Entry.STATUS_MODIFIED
if item.flags & Item.STORE_METADATA == 0:
# Skip the stat call and metadata reading
# This implies that the contents also can't be stored
return
# Read mandatory stat attributes
self.filetype = stat.S_IFMT(st.st_mode)
# Read filecontents, if needed
if item.flags & Item.STORE_CHECKSUM or item.flags & Item.STORE_CONTENTS:
if self.filetype == stat.S_IFREG:
self.filecontents = file(source, "r").read()
elif self.filetype == stat.S_IFDIR:
self.filecontents = "/".join(os.listdir(str(source)))
if isinstance(self.filecontents, unicode):
self.filecontents = self.filecontents.encode('utf-8')
elif self.filetype == stat.S_IFLNK:
self.filecontents = os.readlink(source)
else:
self.filecontents = None
if item.flags & Item.STORE_CHECKSUM and self.filecontents is not None:
psha = sha.new(self.filecontents)
self.sha1sum = psha.hexdigest()
if (item.flags & Item.STORE_CONTENTS == 0) and self.filetype == stat.S_IFREG:
self.filecontents = None
self.inode = st.st_ino
self.device = st.st_dev
self.nlink = st.st_nlink
self.mode = st.st_mode
self.mtime = st.st_mtime
self.atime = st.st_atime
self.ctime = st.st_ctime
self.uid = st.st_uid
try:
self.uname = pwd.getpwuid(self.uid).pw_name
except KeyError:
self.uname = None
self.gid = st.st_gid
try:
self.gname = grp.getgrgid(self.gid).gr_name
except KeyError:
self.gname = None
self.size = st.st_size
# Try to read optional stat components
self.blocks = getattr(st, 'st_blocks', None)
self.blksize = getattr(st, 'st_blksize', None)
if self.filetype == stat.S_IFBLK or self.filetype == stat.S_IFCHR:
self.rdev = getattr(st, 'st_rdev', None)
else:
self.rdev = None
return
def _diffdata(older, newer, ofname=None, nfname=None, oftime=None, nftime=None):
if older is None:
a = []
else:
a = older.splitlines(True)
if newer is None:
b = []
else:
b = newer.splitlines(True)
differ = difflib.unified_diff(
a, b,
ofname, nfname,
oftime, nftime,
)
data = "".join(differ)
return data
_diffdata = staticmethod(_diffdata)
def to_filesys(self, destdir=None, use_dirs=1):
"""Writes the revision entry to the filesystem.
This is one of the most important functions in the whole
software. It tries to restore a given version to the
filesystem, with almost all the attributes intact (ctime can't
be restored, as far as I know).
"""
if self.filecontents is None:
return Result(Result.RETR_NODATA, item=self.item, entry=self)
retval = Result.RETR_ALLOK
retexc = None
self._check_sum()
if destdir is None:
target = self.filename
else:
if use_dirs:
target = forcejoin(destdir, self.filename)
else:
target = os.path.join(destdir, os.path.basename(self.filename))
if self.filetype not in (stat.S_IFREG, stat.S_IFLNK, stat.S_IFCHR,
stat.S_IFBLK, stat.S_IFIFO, stat.S_IFDIR,
Entry.S_IFVIRT):
return Result(Result.RETR_NA, item=self.item, entry=self)
if (self.filetype == stat.S_IFBLK or self.filetype == stat.S_IFCHR) \
and self.rdev is None:
return Result(Result.RETR_RDEV, item=self.item, entry=self)
do_create = True
do_payload = True
do_attrs = True
do_rename = True
if self.filetype == Entry.S_IFVIRT:
do_attrs = False
# check for parent existence
targetbase = os.path.dirname(target)
if not rexists(targetbase):
try:
os.makedirs(targetbase)
except OSError, e:
return Result(Result.RETR_PATHERR, item=self.item, entry=self, exc=e)
if self.filetype == stat.S_IFDIR and rexists(target):
if not os.path.islink(target) and not os.path.isdir(target):
return Result(Result.RETR_INVTRANS, item=self.item, entry=self)
else: # The directory already exists; we must restore ownership and attrs
do_create = False
do_payload = False
do_rename = False
newname = target
else:
if not os.path.islink(target) and os.path.isdir(target):
return Result(Result.RETR_INVTRANS, item=self.item, entry=self)
newname = "%s.%07d" % (target, random.randint(0, 999999))
retries = 0
while rexists(newname) and retries < 1000:
newname = "%s.%07d" % (target, random.randint(0, 999999))
retries += 1
if rexists(newname):
return Result(Result.RETR_TEMPFILE, item=self.item, entry=self)
oldumask = os.umask(0777)
# try...finally for umask restoration
try:
must_remove_new = False
# The ideea is the operation is done in five steps:
# 1. creation of item; can fail; fatal
# 2. if item is file, write contents; can fail; fatal
# 3. change ownership; can fail; non-fatal
# 4. change permissions and timestamps; shouldn't fail
# except for symlinks; fatal
# 5. rename to target; can fail; fatal
# Step 1
if do_create:
try:
if self.filetype == stat.S_IFIFO:
os.mkfifo(newname, 0)
elif self.filetype == stat.S_IFREG or \
self.filetype == Entry.S_IFVIRT:
fd = os.open(newname, os.O_WRONLY|os.O_CREAT|os.O_EXCL|os.O_NOCTTY)
elif self.filetype == stat.S_IFLNK:
os.symlink(self.filecontents, newname)
elif self.filetype == stat.S_IFCHR or self.filetype == stat.S_IFBLK:
os.mknod(newname, self.mode, self.rdev)
elif self.filetype == stat.S_IFDIR:
os.mkdir(newname, 0)
else:
raise TypeError, "Programming error: shouldn't have to handle file type %s!" % self.modemap[self.filetype][0]
except EnvironmentError, e:
return Result(Result.RETR_IOERROR, item=self.item, entry=self, exc=e, descr="while creating file")
else:
must_remove_new = True
# From now on, we must cleanup on exit (done via ...finally)
# Step 2
if do_payload:
try:
if self.filetype == stat.S_IFREG or \
self.filetype == Entry.S_IFVIRT:
os.write(fd, self.filecontents)
os.close(fd)
except EnvironmentError, e:
return Result(Result.RETR_IOERROR, item=self.item, entry=self, exc=e, descr="while writing file")
# Step 3
if do_attrs:
# try to get real uid/gid from saved entries, if available
if self.uname is None:
newuid = self.uid
else:
try:
newuid = pwd.getpwnam(self.uname).pw_uid
except KeyError:
newuid = self.uid
if self.gname is None:
newgid = self.gid
else:
try:
newgid = grp.getgrnam(self.gname).gr_gid
except KeyError:
newgid = self.gid
try:
os.lchown(newname, self.uid, self.gid)
except OSError, e:
retval = Result.RETR_PARTOK
retexc = e
#print >>sys.stderr, "Warning: error '%s' while modifying temporary file ownership." % e
# WARNING: don't chmod for symlinks, it acts on the target!!!
if not self.filetype == stat.S_IFLNK:
# Step 4
try:
os.chmod(newname, self.mode)
os.utime(newname, (self.atime, self.mtime))
except EnvironmentError, e:
return Result(Result.RETR_IOERROR, item=self.item, entry=self, exc=e, descr="while modifying attributes")
elif self.filetype == Entry.S_IFVIRT: # alternate do_attrs for virtuals
os.chmod(newname, stat.S_IRUSR)
# Step 5
if do_rename:
try:
os.rename(newname, target)
except OSError, e:
return Result(Result.RETR_IOERROR, item=self.item, entry=self, exc=e, descr="while renaming")
else:
must_remove_new = False # We managed to finish!
finally:
os.umask(oldumask)
if must_remove_new:
try:
os.unlink(newname)
except OSError, e:
print >>sys.stderr, "Error: while cleaning-up: %s" % e
return Result(retval, item=self.item, entry=self, exc=retexc)
def diff(self, older, options=None):
obuff = []
if not isinstance(older, Entry):
raise TypeError("Invalid diff!")
if self.compare(older, options.checks):
return []
if options.checks is None:
options.checks = self.DIFFABLE_ATTRS
# Either the filetype is the same, and we do content diff,
# or the filetype has changed, and we list only metadata change
if self.filetype != older.filetype:
if 'filetype' in options.checks:
obuff.append(('File type',
older.modemap[older.filetype][0],
self.modemap[self.filetype][0]))
# file type is the same
elif "filecontents" in options.checks:
if self.filetype == stat.S_IFREG:
if self.filecontents != older.filecontents:
if self.printablepayload() and older.printablepayload():
if self.revno is None:
newrev = 'current'
else:
newrev = "rev %s" % self.revno
orev = "rev %s" % older.revno
data = self._diffdata(
older.filecontents, self.filecontents,
older.filename, self.filename,
"%s (%s)" % (time.ctime(older.mtime), orev),
"%s (%s)" % (time.ctime(self.mtime), newrev)
)
if data[-1] == '\n':
data = data[:-1]
obuff.append(('File contents', data))
else:
obuff.append(('File contents', 'binary files differ'))
if self.filetype == Entry.S_IFVIRT:
if self.filecontents != older.filecontents:
if self.printablepayload() and older.printablepayload():
if self.revno is None:
newrev = 'current'
else:
newrev = "rev %s" % self.revno
orev = "rev %s" % older.revno
data = self._diffdata(
older.filecontents, self.filecontents,
older.filename, self.filename,
"(%s)" % (orev,),
"(%s)" % (newrev,)
)
if data[-1] == '\n':
data = data[:-1]
obuff.append(('Command output', data))
else:
obuff.append(('Command output', 'binary output differ'))
elif self.filetype == stat.S_IFDIR:
if self.filecontents != older.filecontents:
olist = older.filecontents.split("/")
nlist = self.filecontents.split("/")
dlist = filter(lambda x: x not in nlist, olist)
alist = filter(lambda x: x not in olist, nlist)
obuff.append(('Directory contents', dlist, alist))
elif self.filetype == stat.S_IFLNK:
if self.filecontents != older.filecontents:
obuff.append(('Symlink target', older.filecontents, self.filecontents))
else:
pass
for i in self.DIFFABLE_ATTRS:
if i not in options.checks or i in ('filetype', 'filecontents'):
continue
oval = getattr(older, i)
nval = getattr(self, i)
if oval != nval:
if hasattr(self, '%s2str' % i):
nval = getattr(self, '%s2str' % i)()
oval = getattr(older, '%s2str' % i)()
obuff.append(('Attribute %s' % i, oval, nval))
return obuff
def __eq__(self, other):
if not isinstance(other, Entry):
return NotImplemented
return self.compare(other)
def compare(self, other, attrset=None):
if not isinstance(other, Entry):
return false
attrlist = list(self.DIFFABLE_ATTRS)
if self.filecontents is None or \
other.filecontents is None:
attrlist.remove('filecontents')
if attrset is not None:
attrlist = [a for a in attrlist if a in attrset]
for i in attrlist:
if getattr(self, i) != getattr(other, i):
return False
return True
def printablepayload(self):
if self.filecontents is None:
# Assume no data is printable as ""
return True
for i in self.filecontents:
if i not in string.printable:
return False
return True
def isdir(self):
return self.filetype == stat.S_IFDIR
def isreg(self):
return self.filetype == stat.S_IFREG
def islnk(self):
return self.filetype == stat.S_IFLNK
def isblk(self):
return self.filetype == stat.S_ISBLK
def ischr(self):
return self.filetype == stat.S_ISCHR
def ififo(self):
return self.filetype == stat.S_IFIFO
def ifsock(self):
return self.filetype == stat.S_IFSOCK
def mode2str(self):
def mapbit(mode, bit, y):
if mode & bit:
return y
else:
return '-'
tchar = self.modemap[self.filetype][2]
tchar += mapbit(self.mode, stat.S_IRUSR, 'r')
tchar += mapbit(self.mode, stat.S_IWUSR, 'w')
if self.mode & stat.S_ISUID:
if self.mode & stat.S_IXUSR:
tchar += 's'
else:
tchar += 'S'
else:
tchar += mapbit(self.mode, stat.S_IXUSR, 'x')
tchar += mapbit(self.mode, stat.S_IRGRP, 'r')
tchar += mapbit(self.mode, stat.S_IWGRP, 'w')
if self.mode & stat.S_ISGID:
if self.mode & stat.S_IXGRP:
tchar += 's'
else:
tchar += 'S'
else:
tchar += mapbit(self.mode, stat.S_IXGRP, 'x')
tchar += mapbit(self.mode, stat.S_IROTH, 'r')
tchar += mapbit(self.mode, stat.S_IWOTH, 'w')
if self.mode & stat.S_ISVTX:
if self.mode & stat.S_IXOTH:
tchar += 't'
else:
tchar += 'T'
else:
tchar += mapbit(self.mode, stat.S_IXOTH, 'x')
return tchar
def mtime2str(self):
return time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime(self.mtime))
def ctime2str(self):
return time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime(self.ctime))
def atime2str(self):
return time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime(self.atime))
def exitcode2str(self):
code = self.exitcode
if os.WIFSTOPPED(code):
return "has been stopped (signal %s)" % os.WSTOPSIG(code)
elif os.WIFSIGNALED(code):
return "exited as a result of a signal %s" % os.WTERMSIG(code)
elif os.WIFEXITED(code):
return "finished with exit code %s" % os.WEXITSTATUS(code)
return "unknown status %s" % code
def _check_sum(self):
psha = sha.new(self.filecontents)
psum = psha.hexdigest()
if psum != self.sha1sum:
raise ValueError, "Invalid checksum in file contents (%s != %s)!" % (psum, self.sha1sum)
return
def stat(self):
obuf=StringIO()
obuf.write(" File: `%s'\n" % self.filename)
if self.status != Entry.STATUS_MODIFIED:
obuf.write("Status: %s\n" % Entry.STATUS_MAP[self.status])
return obuf.getvalue()
obuf.write(" Size: %-15d Blocks: " % self.size)
if self.blocks is None:
obuf.write("%-10s " % "N/A")
else:
obuf.write("%-10d " % self.blocks)
obuf.write("IO Block: ")
if self.blksize is None:
obuf.write("%-6s " % "N/A")
else:
obuf.write("%-6d " % self.blksize)
obuf.write(self.modemap[self.filetype][0])
obuf.write("\n")
if self.device is not None and self.inode is not None \
and self.nlink is not None:
obuf.write("Device: %3xh/%3dd Inode: %-11d Links: %d\n" %
(self.device, self.device, self.inode, self.nlink))
if self.mode is not None and \
self.uid is not None and self.gid is not None:
obuf.write("Access: (%04o/%s) Uid: (%5d/%8s) Gid: (%5d/%8s)\n" %
(stat.S_IMODE(self.mode), self.mode2str(),
self.uid, self.uname or "UNKNOWN",
self.gid, self.gname or "UNKNOWN")
)
for i in (('Access', self.atime),
('Modify', self.mtime),
('Change', self.ctime)):
if i[1] is None:
continue
ovar = DateTime.localtime(i[1])
obuf.write("%s: %s %s\n" % (i[0], ovar.strftime("%Y-%m-%d %T.000000000"), ovar.tz))
if self.exitcode is not None:
obuf.write("Command status: %s" % self.exitcode2str())
return obuf.getvalue()
def __repr__(self):
return "<Entry for %s>" % (self.filename)
class Revision(object):
__slots__ = ["area", "revno", "logmsg", "uid", "gid",
"uname", "gname",
"commiter", "_ctime", "itemids", "server"]
def __init__(self, area=None, logmsg=None, commiter=None,
server=None):
if area is None and logmsg is None:
# manual initialization
return
self.area = area.name
if server is None:
self.server = os.uname()[1]
else:
self.server = server
self.logmsg = logmsg
self._ctime = DateTime.utc()
self.uid = os.getuid()
try:
self.uname = pwd.getpwuid(self.uid).pw_name
except KeyError:
self.uname = None
self.gid = os.getgid()
try:
self.gname = grp.getgrgid(self.uid).gr_name
except KeyError:
self.gname = None
if commiter is None:
try:
self.commiter = os.getlogin()
except OSError:
self.commiter = '<unknown>'
else:
self.commiter = commiter
self.itemids = []
def _set_ctime(self, val):
if isinstance(val, types.StringTypes):
val = DateTime.ISO.ParseDateTime(val)
elif isinstance(val, datetime.datetime):
val = DateTime.ISO.ParseDateTime(val.isoformat())
self._ctime = val
def _get_ctime(self):
return self._ctime
ctime = property(_get_ctime, _set_ctime, None,
"The creation time of this revision")
def collapse_revnos(revs):
"""Transform a list of revnos is a condensed one
Example:
[1,2,3,4,6,8,10,11,13] =>
[(1,4), (6,6), (8,8), (10,11), (13,13)]
"""
revs.sort()
revs = [(x,x) for x in revs]
while True:
nrev = revs[0:1]
for lo, hi in revs[1:]:
if lo == nrev[-1][1] + 1:
nrev[-1] = (nrev[-1][0], hi)
else:
nrev.append((lo, hi))
if len(revs) == len(nrev):
break
revs = nrev
return revs
collapse_revnos = staticmethod(collapse_revnos)
def format_crevnos(revs):
"""Format a collapsed list of revnos
Example:
[(1,4), (6,6), (8,8), (10,11), (13,13)] =>
1-4, 6, 8, 10-11, 13
"""
rl = []
for lo, hi in revs:
if lo == hi:
rl.append(str(lo))
else:
rl.append("%s-%s" % (lo, hi))
return ", ".join(rl)
format_crevnos = staticmethod(format_crevnos)
class CfversException(Exception):
pass
class ConfigException(CfversException):
pass
class RepositoryException(CfversException):
pass
class CommException(CfversException):
pass
class ProgrammingException(CfversException):
pass
class ParsingException(CfversException):
pass
class OperationError(CfversException):
pass
class Result(object):
# Codes for retrieve operation
RETR_ALLOK = 0
RETR_PARTOK = 1
RETR_NTRACK = 2
RETR_IOERROR = 3
RETR_NA = 4
RETR_RDEV = 5
RETR_PATHERR = 6
RETR_INVTRANS = 7
RETR_TEMPFILE = 8
RETR_NOREVS = 9
RETR_NOXREV = 10
RETR_NODATA = 11
#Return codes for store operation
STORED_OK = 1001
STORED_DELETED = 1002
STORED_TOSKIP = 1003
STORED_NOTCHANGED = 1004
STORED_IOERROR = 1005
STORED_NOTREG = 1006
#Return codes for add operation
ADDED_OK = 2001
ADDED_EXISTING = 2002
ADDED_INVALIDNAME = 2003
ADDED_EACCES = 2004
_codeinfo = {
RETR_ALLOK: "Retrieved (fully)",
RETR_PARTOK: "Retrieved (partially)",
RETR_NTRACK: "Skipped (not versioned)",
RETR_IOERROR: "Skipped (error)",
RETR_NA: "Skipped (non-retrievable)",
RETR_RDEV: "Skipped (rdev info missing)",
RETR_PATHERR: "Skipped (path error)",
RETR_INVTRANS: "Skipped (invalid transition)",
RETR_TEMPFILE: "Skipped (can't create tempfile)",
RETR_NOREVS: "Skipped (no revisions)",
RETR_NOXREV: "Skipped (no such revision)",
RETR_NODATA: "Skipped (only metadata versioned)",
STORED_OK: "Stored",
STORED_NOTCHANGED: "Skipped (not changed)",
STORED_IOERROR: "Skipped (error)",
STORED_NOTREG: "Skipped (not registered)",
STORED_DELETED: "Marked deleted",
STORED_TOSKIP: "Skipped (marked to be ignored)",
ADDED_OK: "Registered",
ADDED_EXISTING: "Skipped (already registered)",
ADDED_INVALIDNAME: "Skipped (invalid name)",
ADDED_EACCES: "Skipped (permission denied)",
}
def __init__(self, code, item=None, entry=None,
exc=None, fname=None, descr=None):
self.code = code
self.item = item
self.entry = entry
self.exc = exc
self.descr = descr
self.critical = code not in (self.RETR_ALLOK,
self.RETR_NA,
self.STORED_OK,
self.STORED_NOTCHANGED,
self.STORED_TOSKIP,
self.ADDED_OK,
self.ADDED_EACCES,
)
# Deduct error information (if applicable)
if exc is None:
errinfo = ""
elif hasattr(exc, "strerror") and hasattr(exc, "errno"):
errinfo = "%s (%s)" % (exc.strerror, errno.errorcode[exc.errno])
else:
errinfo = str(exc)
self.errinfo = errinfo
# Deduct file name information
if fname is not None:
self.fname = fname
elif hasattr(exc, "filename") and exc.filename:
self.fname = exc.filename
elif hasattr(entry, "filename") and entry.filename:
self.fname = entry.filename
elif hasattr(item, "name") and item.name:
self.fname = item.name
else:
self.fname = '<unknown file name>'
# Actual message
self.msg = self._codeinfo.get(code, 'Unknown result')
return
def __str__(self):
if len(self.errinfo) > 0:
b = " "
else:
b = ""
return "%s: %s%s'%s'" % (self.msg, self.errinfo, b, self.fname)
syntax highlighted by Code2HTML, v. 0.9.1