"""Module implementing a SQLite based repository for cfvers"""
# Copyright 2003-2005 Iustin Pop
#
# This file is part of cfvers.
#
# cfvers is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# cfvers is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with cfvers; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# $Id: r_sqlite.py 218 2005-10-30 09:26:23Z iusty $
from pysqlite2 import dbapi2 as sqlite
#import sqlite
import os.path
import fnmatch
import cfvers.repository.sql
import cfvers
from cfvers.repository.sql import Param
from cfvers.main import ProgrammingException
class RSqlite(cfvers.repository.sql.RSql):
def __init__(self, create=False, cnxargs=None, createopts=None):
self._quote = self._quote_qmark
self.conn = sqlite.connect(cnxargs)
self.backend = "sqlite"
self.cnxargs = cnxargs
if create:
self._create(createopts=createopts)
cursor = self.conn.cursor()
self._check_schema(cursor)
cursor.close()
self.conn.create_function("basename", 1, os.path.basename)
self.conn.create_function("fnmatch", 2, fnmatch.filter)
return
def getAreas(self):
cursor = self.conn.cursor()
cursor.execute("select name, root, ctime, description from areas")
c2 = self.conn.cursor()
areas = []
for row in cursor.fetchall():
c2.execute("select count(*) from items where items.area = ?", (row[0],))
nitems = int(c2.fetchone()[0])
c2.execute("select max(revno) from revisions where area = ?", (row[0],))
revno = c2.fetchone()[0]
if revno is not None:
revno = int(revno)
areas.append(cfvers.Area(name=row[0], root=row[1], ctime=row[2],
numitems=nitems, revno=revno,
description=row[3]))
cursor.close()
c2.close()
return areas
def getArea(self, name):
cursor = self.conn.cursor()
self._exec(cursor, "select name, root, ctime, description from areas where name = ", Param(name))
row = cursor.fetchone()
if row is None:
return None
c2 = self.conn.cursor()
self._exec(c2, "select count(*) from items where items.area = ", Param(name))
nitems = int(c2.fetchone()[0])
self._exec(c2, "select max(revno) from revisions where area = ", Param(name))
revno = c2.fetchone()[0]
if revno is not None:
revno = int(revno)
area = cfvers.Area(name=row[0], root=row[1], ctime=row[2],
numitems=nitems, revno=revno,
description=row[3])
cursor.close()
c2.close()
return area
def TimestampFromMx(mxtimestamp):
return str(mxtimestamp)
TimestampFromMx = staticmethod(TimestampFromMx)
def getEntries(self, options):
mall = getattr(options, "allentries", False)
options.allentries = True
elist = list(super(type(self), self).getEntries(options))
if not mall:
em = {}
for e in elist:
# FIXME maybe key is only e.item since item unique over areas
key = (e.areaname, e.item)
em[key] = max(em.get(key, -1), e.revno)
nlist = []
for e in elist:
key = (e.areaname, e.item)
if e.revno == em[key]:
nlist.append(e)
elist = nlist
options.allentries = mall
return elist
def _encode_binary(value):
if not isinstance(value, str):
raise ProgrammingException("Invalid type passed to _encode_binary: %s" % type(value))
return buffer(value)
_encode_binary = staticmethod(_encode_binary)
def _decode_binary(value):
return str(value)
_decode_binary = staticmethod(_decode_binary)
def putRevision(self, r):
"""Adds a Revision to the database and fills its revno
It is important to note that only this function can return a
valid new revision number. Until stored in the repository, the
new revision number cannot be safely determined.
"""
cursor = self.conn.cursor()
# Lock the areas row in order that the next query performs correctly
self._exec(cursor, "select * from areas where name = ", Param(r.area))
self._exec(cursor, "insert into revisions (area, revno, logmsg, ctime, uid, gid, uname, gname, commiter, server) values (", Param(r.area), ", (select coalesce(max(revno), 0)+1 from revisions where area = ", Param(r.area), "), ", Param(r.logmsg), ",", Param(self.TimestampFromMx(r.ctime)), ",", Param(r.uid), ",", Param(r.gid), ",", Param(r.uname), ",", Param(r.gname), ",", Param(r.commiter), ",", Param(r.server), ")")
oid = cursor.lastrowid
self._exec(cursor, "select revno from revisions where oid = ", Param(oid))
row = cursor.fetchone()
r.revno = row[0]
cursor.close()
return r
def quotestr(value):
v = value.replace("'", "''")
v = v.replace("\\", "\\\\")
return "'%s'" % v
quotestr = staticmethod(quotestr)
syntax highlighted by Code2HTML, v. 0.9.1