# Threaded script (CGI-style) handler for the Medusa HTTP server.
#
# Matching requests are packaged as (module.main, (env, stdin, stdout))
# thunks and pushed onto a bounded queue serviced by a pool of worker
# threads, so long-running scripts do not stall Medusa's async loop.
#
# NOTE(review): Python 2 code -- relies on the removed `regex` and
# `StringIO` modules, `string.*` functions and `dict.has_key`.

import regex
import string
import StringIO
import sys

# Medusa support modules (project-local).
import counter
import default_handler
import producers

import os, sys, time

def log(*args):
    # Write a timestamped line to stderr: "[MM/DD HH:MM:SS] arg1 arg2 ".
    sys.stderr.write("[" + time.strftime("%m/%d %T", time.localtime(time.time())) + "] ")
    for arg in args:
        sys.stderr.write(str(arg) + " ")
    sys.stderr.write("\n")

from default_handler import split_path, unquote, get_header

import thread
import threading

# From threading.py:_test()
class BoundedQueue (threading._Verbose):
    """Monitor-style bounded FIFO: one lock, two condition variables
    (rc = "item available", wc = "space available")."""

    def __init__(self, limit):
        # limit: maximum queue length before put() blocks.
        threading._Verbose.__init__ (self, limit)
        self.mon = threading.RLock()
        self.rc = threading.Condition(self.mon)
        self.wc = threading.Condition(self.mon)
        self.limit = limit
        self.queue = []

    def put(self, item):
        """Append <item>, blocking while the queue is full."""
        self.mon.acquire()
        while len(self.queue) >= self.limit:
            #self._note("put(%s): queue full", item)
            self.wc.wait()
        self.queue.append(item)
        #self._note("put(%s): appended, length now %d", item, len(self.queue))
        self.rc.notify()
        self.mon.release()

    def get(self):
        """Pop and return the oldest item, blocking while the queue is empty."""
        self.mon.acquire()
        while not self.queue:
            #self._note("get(): queue empty")
            self.rc.wait()
        item = self.queue[0]
        del self.queue[0]
        #self._note("get(): got %s, %d left", item, len(self.queue))
        self.wc.notify()
        self.mon.release()
        return item

def make_database_proxy():
    """Open a MySQL connection for a worker thread.

    Currently unused: script_handler.__init__ passes None instead.
    """
    import MySQL
    db = MySQL.connect()
    #db.selectdb ('test')
    return db

# <db_thread> manages access to a database proxy.
# It pulls thunks out of the thread_wait_queue for execution.

class db_thread (threading.Thread):

    def __init__ (self, db, queue):
        # database proxy
        self.db = db
        # thread wait queue
        self.queue = queue
        threading.Thread.__init__ (self)
        # daemon thread: do not keep the process alive at interpreter exit
        self.setDaemon (1)

    def run (self):
        # Worker loop: run queued (function, (env, stdin, stdout)) thunks
        # forever.  NOTE(review): an exception raised by function() kills
        # this worker permanently -- nothing protects the loop.
        while 1:
            function, (env, sin, sout) = self.queue.get()
            ## function (self.db, env, sin, sout)
            function (env, sin, sout)
            # call close() on the output file.
            sout.close()

# Map from HTTP request-header name to its CGI environment variable.
header2env = {
    'Content-Length'  : 'CONTENT_LENGTH',
    'Content-Type'    : 'CONTENT_TYPE',
    'Referer'         : 'HTTP_REFERER',
    'User-Agent'      : 'HTTP_USER_AGENT',
    'Accept'          : 'HTTP_ACCEPT',
    'Accept-Charset'  : 'HTTP_ACCEPT_CHARSET',
    'Accept-Language' : 'HTTP_ACCEPT_LANGUAGE',
    'Host'            : 'HTTP_HOST',
    'Connection'      : 'CONNECTION_TYPE',
    # 'Pragma'        : None,
    'Authorization'   : 'HTTP_AUTHORIZATION',
    'Cookie'          : 'HTTP_COOKIE',
    }

# convert keys to lower case for case-insensitive matching
# (safe in Python 2: items() returns a list snapshot, so the dict may be
# mutated during iteration)
for (key,value) in header2env.items():
    del header2env[key]
    key=string.lower(key)
    header2env[key]=value

# Handler for Medusa's HTTP server.
# Identifies script URI's and queues them for execution.

class script_handler:

    def __init__ (self, document_root="", queue_size=100, worker_threads=5):
        # uri -> module, filled in by add_module()
        self.modules = {}
        self.document_root = document_root
        # if uri_base is unspecified, assume it
        # starts with the published module name
        self.hits = counter.counter()
        self.exceptions = counter.counter()
        self.thread_wait_queue = BoundedQueue (queue_size)
        # spin up the worker pool; each worker would normally get its own
        # database proxy, but that is disabled at the moment
        for i in range(worker_threads):
            t = db_thread (
                #make_database_proxy(),
                None,
                self.thread_wait_queue
                )
            t.start()

    def add_module (self, module, *names):
        """Publish <module> under one or more URI names (given without
        the leading slash); defaults to the module's own name."""
        if not names:
            # BUGFIX: was "/%s" -- combined with the '/'+name below that
            # registered the key '//<name>', which match() (looking up
            # single-slash URIs like '/foo') could never hit, making
            # default-registered modules unreachable.
            names = ["%s" % module.__name__]
        for name in names:
            self.modules['/'+name] = module

    def match (self, request):
        # Return 1 (and stash the module on the request) when the first
        # path component of the URI names a registered module.
        uri = request.uri
        # truncate at the second '/' and at any '?'
        i = string.find(uri, "/", 1)
        if i != -1:
            uri = uri[:i]
        i = string.find(uri, "?", 1)
        if i != -1:
            uri = uri[:i]
        if self.modules.has_key (uri):
            request.module = self.modules[uri]
            return 1
        else:
            return 0

    def handle_request (self, request):
        """Build a CGI-style environment for <request>, then either collect
        the body first (POST/PUT) or queue the script immediately."""
        [path, params, query, fragment] = split_path (request.uri)

        while path and path[0] == '/':
            path = path[1:]

        if '%' in path:
            path = unquote (path)

        env = {}

        if 1:   # always taken; left over from debugging
            env['REQUEST_URI'] = "/" + path
            env['REQUEST_METHOD'] = string.upper(request.command)
            env['SERVER_PORT'] = str(request.channel.server.port)
            env['SERVER_NAME'] = request.channel.server.server_name
            env['SERVER_SOFTWARE'] = request['Server']
            env['DOCUMENT_ROOT'] = self.document_root
            parts = string.split(path, "/")
            env['SCRIPT_NAME'] = "/" + parts[0]
            # are script_name and path_info ok?
            if query and query[0] == "?":
                query = query[1:]
            env['QUERY_STRING'] = query
            try:
                path_info = "/" + string.join(parts[1:], "/")
            except:
                path_info = ''
            env['PATH_INFO'] = path_info
            env['GATEWAY_INTERFACE'] = 'CGI/1.1'  # what should this really be?
            env['REMOTE_ADDR'] = request.channel.addr[0]
            env['REMOTE_HOST'] = request.channel.addr[0]  # what should this be?

        # translate request headers to their CGI equivalents; headers not
        # in the table become HTTP_<NAME>
        for header in request.header:
            [key, value] = string.split(header, ": ", 1)
            key = string.lower(key)
            if header2env.has_key(key):
                if header2env[key]:
                    env[header2env[key]] = value
            else:
                key = 'HTTP_' + string.upper(string.join(string.split(key, "-"), "_"))
                env[key] = value

        ## remove empty environment variables
        for key in env.keys():
            if env[key] == "" or env[key] == None:
                del env[key]

        # strip any ":port" suffix from HTTP_HOST
        try:
            httphost = env['HTTP_HOST']
            parts = string.split(httphost, ":")
            env['HTTP_HOST'] = parts[0]
        except KeyError:
            pass

        if request.command in ('post', 'put'):
            # gather the request body first; the collector calls
            # continue_request() once Content-Length bytes have arrived
            request.collector = input_collector (self, request, env)
            request.channel.set_terminator (None)
        else:
            sin = StringIO.StringIO ('')
            self.continue_request(sin, request, env)

    def continue_request (self, stdin, request, env):
        """Detach the channel from the async loop and hand the script
        thunk to the worker-thread pool."""
        request.channel.del_channel()
        request.channel.socket.setblocking (0)
        # This was placed here by match() above.
        module = request.module
        ## reload(module)
        if 1:   # always taken; left over from debugging
            stdout = trans_producer_file (request)
        else:
            stdout = producer_file (request)  # not a file_producer!
        self.thread_wait_queue.put ((module.main, (env, stdin, stdout)))

# Matches a "Name: value" header line; the old `regex` module's match()
# returns -1 when there is no match.
HEADER_LINE = regex.compile ('\([A-Za-z0-9-]+\): \(.*\)')

def strip_eol (l):
    """Return <l> with any trailing CR/LF characters removed."""
    while l and l[-1] in '\r\n':
        l = l[:-1]
    return l

# A File-Like object that buffers and eagerly feeds data to a
# non-blocking socket. When closed, it will hand the socket back to
# Medusa if there is still pending data.
class producer_file:
    """File-like stdout for a CGI script running in a worker thread.

    write() buffers output until the script's header block has been seen,
    builds a real HTTP response header from it, then pushes data straight
    to the (non-blocking) client socket.  close() hands any unsent
    remainder back to Medusa's async loop.
    """

    def __init__ (self, request):
        self.count = 0          # bytes successfully sent on the socket
        self.buffer = ''        # outgoing data not yet sent
        self.socket = request.channel.socket
        self.request = request
        self.got_header = 0     # have we emitted the HTTP header yet?
        # default content type unless the script supplies one
        self.request['Content-Type'] = 'text/html'

    def write (self, data):
        if self.got_header:
            self._write (data)
        else:
            # CGI scripts may optionally provide extra headers.
            #
            # If they do not, then the output is assumed to be
            # text/html, with an HTTP reply code of '200 OK'.
            #
            # If they do, we need to scan those headers for one in
            # particular: the 'Status:' header, which will tell us
            # to use a different HTTP reply code [like '302 Moved']
            #
            self.buffer = self.buffer + data
            lines = string.split (self.buffer, '\n')
            # look for something un-header-like
            for i in range(len(lines)):
                if i == (len(lines)-1):
                    if lines[i] == '':
                        break
                elif HEADER_LINE.match (lines[i]) == -1:
                    # this is not a header line.
                    self.got_header = 1
                    self.buffer = self.build_header (lines[:i])
                    # rejoin the rest of the data
                    self._write (string.join (lines[i:], '\n'))
                    break

    def build_header (self, lines):
        """Turn the script's header lines into a full HTTP/1.0 response
        header, honoring a 'Status:' line if the script provided one."""
        status = '200 OK'
        for line in lines:
            if line[:8] == 'Status: ':
                status = line[8:]
        lines.insert (0, 'HTTP/1.0 %s' % status)
        lines.append ('Server: ' + self.request['Server'])
        lines.append ('Date: ' + self.request['Date'])
        lines.append ("Content-Type: %s" % (self.request['Content-Type']))
        return string.join (lines, '\r\n') + '\r\n\r\n'

    def _write (self, data):
        # append <data> to our outgoing buffer.
        self.buffer = self.buffer + data
        # fill the system's TCP buffers by writing as much as it will let us.
        while 1:
            n = min (8192, len (self.buffer))
            d = self.buffer[:n]
            sent = self.socket.send (d)
            # count outgoing bytes
            self.count = self.count + sent
            if not sent:
                break
            else:
                # BUGFIX: discard only what was actually sent (was
                # buffer[n:], which silently dropped the unsent tail of a
                # chunk whenever a non-blocking send() accepted fewer
                # than n bytes).
                self.buffer = self.buffer[sent:]

    def writelines(self, list):
        self.write (string.join (list, ''))

    # class-wide transfer statistics
    t_sent = 0      # total bytes sent directly from worker threads
    m_sent = 0      # total leftover bytes handed back to Medusa at close()

    def flush(self):
        pass

    def close (self):
        # module.main() has finished.
        # queue remaining output back to medusa.
        producer_file.t_sent = producer_file.t_sent + self.count
        self.request.log(self.count)
        if self.buffer:
            producer_file.m_sent = producer_file.m_sent + len(self.buffer)
            # give the channel back to medusa
            #self.socket.setblocking (0)
            self.request.channel.add_channel()
            self.request.channel.push_with_producer (
                producers.globbing_producer (
                    producers.simple_producer (
                        self.buffer
                        )
                    )
                )
            self.request.channel.shutdown(2)
            self.request.channel.current_request = None
            self.request.channel.close_when_done()
        else:
            # no remaining data, close
            self.socket.shutdown(1)
            self.socket.close()
            self.request.channel.close()

import trans

class trans_producer_file(producer_file):
    """producer_file variant that runs each outgoing chunk through
    trans.translate() before sending it."""

    def __init__ (self, request, lang="english"):
        self.SetLanguage(lang)
        producer_file.__init__(self, request)

    def SetLanguage(self, lang):
        self.lang = lang

    def _write (self, data):
        # append <data> to our outgoing buffer.
        self.buffer = self.buffer + data
        # fill the system's TCP buffers by writing as much as it will let us.
        while 1:
            n = min (65536, len (self.buffer))
            d = self.buffer[:n]
            d = trans.translate(self.lang, d)
            sent = self.socket.send (d)
            # count outgoing bytes
            self.count = self.count + sent
            if not sent:
                break
            else:
                # NOTE(review): 'sent' counts *translated* bytes while the
                # buffer holds untranslated text, so if translate() changes
                # the length or send() is partial, this accounting can lose
                # data.  Dropping the whole untranslated chunk preserves
                # the original behavior here -- a real fix needs a buffer
                # of already-translated text.
                self.buffer = self.buffer[n:]

# Matches the Content-Length header, case-insensitively; group 1 is the
# decimal byte count.
CONTENT_LENGTH = regex.compile ('Content-Length: \([0-9]+\)', regex.casefold)

class input_collector:
    "gathers input for put and post requests"

    def __init__ (self, handler, request, env):
        self.handler = handler
        self.env = env
        self.request = request
        self.data = StringIO.StringIO()
        # make sure there's a content-length header
        self.cl = get_header (CONTENT_LENGTH, request.header)
        if not self.cl:
            # 411: Length Required
            request.error(411)
            return
        else:
            self.cl = string.atoi(self.cl)

    def collect_incoming_data (self, data):
        # Accumulate body data; once Content-Length bytes have arrived,
        # rewind the buffer and resume normal request processing.
        self.data.write(data)
        if self.data.tell() >= self.cl:
            self.data.seek(0)
            h = self.handler
            r = self.request
            # set the terminator back to the default
            self.request.channel.set_terminator ('\r\n\r\n')
            # drop our references before handing off
            del self.handler
            del self.request
            h.continue_request(self.data, r, self.env)