# -*- Mode: Python; tab-width: 4 -*-
#
#   Author: Sam Rushing <rushing@nightmare.com>
#   Copyright 1997 by Sam Rushing
#                        All Rights Reserved.
#
# This software is provided free for non-commercial use.  If you are
# interested in using this software in a commercial context, or in
# purchasing support, please contact the author.

RCS_ID = '$Id: default_extension.py,v 1.1.1.1 1999/01/08 06:58:44 rushing Exp $'

import os
import regex
import stat
import string
import time

import http_date
import http_server
import mime_type_table
import mstatus
# <producers> supplies the file_producer used as default_file_producer
# below; without this import the class body fails with a NameError.
import producers

# This is the 'default' handler.  It implements the base set of
# features expected of a simple file-delivering HTTP server.  File
# services are provided through a 'filesystem' object, the very same
# one used by the FTP server.
#
# You can replace or modify this handler if you want a non-standard
# HTTP server.  You can also derive your own handler classes from
# it.
#
# Support for handling POST requests is intended to live in the derived
# class <default_with_post_handler>; that class is currently commented
# out further down in this file.
#

class default_handler:

    # count total hits
    hit_counter = http_server.counter()
    # count file deliveries
    file_counter = http_server.counter()
    # count cache hits (requests answered with '304 Not Modified')
    cache_counter = http_server.counter()
    # older spelling of <cache_counter>, kept so that existing code
    # referring to <cache_count> keeps working.
    cache_count = cache_counter

    # request commands this handler is willing to serve
    valid_commands = ['get', 'head']

    IDENT = 'Default HTTP Request Handler'

    # Pathnames that are tried, in order, when a URI resolves to a
    # directory name
    directory_defaults = [
        'index.html',
        'default.html'
        ]

    # called with the opened file object to build the producer that is
    # pushed onto the request for a GET
    default_file_producer = producers.file_producer

    # <filesystem> must provide isdir(), isfile(), stat() and open();
    # those are the only operations this handler performs on it.
    def __init__ (self, filesystem):
        self.filesystem = filesystem

    # Short description including the current hit counter.  The counter
    # object is formatted with %s, so the text is whatever its string
    # conversion yields.
    def __repr__ (self):
        return '<%s (%s hits) at %x>' % (
            self.IDENT,
            self.hit_counter,
            id (self)
            )

    # always match, since this is a default
    def match (self, uri):
        return 1

    # --------------------------------------------------
    # This class does not handle POST data.
    # If any is sent to us, we complain.
    # --------------------------------------------------

    def collect_incoming_data (self, data):
        # parenthesised so the line is valid both as a print statement
        # and as a print() call; the output is the same either way.
        print ('unexpected incoming data: %s' % (repr (data)))

    def found_terminator (self):
        pass

    # Handle a file request, with caching.
    #
    # <request> must provide: command, uri (uri[0] is the path part),
    # header, error(), done(), push(), reply_code, and item assignment
    # for reply headers.  Replies are:
    #   400  command not in <valid_commands>
    #   404  path is neither a file nor a directory containing one of
    #        <directory_defaults>, or it cannot be stat'ed/opened
    #   304  If-Modified-Since is present and the file is not newer
    #        (and the optional '; length=' matches the file size)
    #   otherwise the reply headers are set and, for 'get', the file
    #   contents are pushed.
    def handle_request (self, request):

        if request.command not in self.valid_commands:
            request.error (400) # bad request
            return

        self.hit_counter.increment()

        path = request.uri[0]

        # strip off all leading slashes
        while path and path[0] == '/':
            path = path[1:]

        # unquote if necessary
        if '%' in path:
            path = unquote (path)

        # NOTE(review): the unquoted path is handed to the filesystem
        # object as-is; confirm that object rejects '..' components.

        if self.filesystem.isdir (path):
            # we could also generate a directory listing here,
            # may want to move this into another method for that
            # purpose
            found = 0
            for default in self.directory_defaults:
                candidate = path + '/' + default
                if self.filesystem.isfile (candidate):
                    path = candidate
                    found = 1
                    # first default that exists wins
                    break
            if not found:
                request.error (404) # Not Found
                return

        elif not self.filesystem.isfile (path):
            request.error (404) # Not Found
            return

        # a single stat() yields both size and mtime; the file may have
        # vanished since the isfile() test, in which case we answer 404.
        try:
            info = self.filesystem.stat (path)
        except (IOError, OSError):
            request.error (404)
            return

        file_length = info[stat.ST_SIZE]
        mtime = info[stat.ST_MTIME]

        ims = get_header (IF_MODIFIED_SINCE, request.header)

        length_match = 1
        ims_date = 0

        if ims:
            # group 4 of IF_MODIFIED_SINCE is the optional
            # '; length=nnnn' value.  This presumably relies on
            # get_header() leaving its last match on the pattern
            # object -- confirm against http_server.get_header.
            length = IF_MODIFIED_SINCE.group(4)
            if length:
                try:
                    if string.atoi (length) != file_length:
                        length_match = 0
                except ValueError:
                    # unparsable length: treat as 'no length given'
                    pass
            ims_date = http_date.parse_http_date (ims)

        if length_match and ims_date and mtime <= ims_date:
            request.reply_code = 304
            request.done()
            self.cache_counter.increment()
            return

        try:
            fd = self.filesystem.open (path, 'rb')
        except IOError:
            request.error (404)
            return

        request['Last-Modified'] = http_date.build_http_date (mtime)
        request['Content-Length'] = file_length
        # use the resolved path, so that a directory default such as
        # 'index.html' gets the content type of the file actually sent.
        self.set_content_type (request, path)

        if request.command == 'get':
            request.push (self.default_file_producer (fd))
        else:
            # nothing will read the file for a bodiless reply, so
            # release it here instead of leaking the handle.
            fd.close()

        self.file_counter.increment()
        request.done()

    # Set the Content-Type reply header from a file extension.
    #
    # <path> is optional; when omitted the extension is taken from
    # request.uri[0].  Extensions missing from
    # mime_type_table.content_type_map fall back to 'text/plain'.
    def set_content_type (self, request, path=None):
        if path is None:
            path = request.uri[0]
        ext = get_extension (path)
        try:
            content_type = mime_type_table.content_type_map[ext]
        except KeyError:
            content_type = 'text/plain'
        request['Content-Type'] = content_type

# still needs rewriting
#class default_with_post_handler (default_handler):
#
#    valid_commands = default_handler.valid_commands + ['post']
#
#    content_type = None
#
#    def handle_request (self, channel):
#
#        # set the terminator correctly, depending on the
#        # command and content-type.
#
#        if channel.command == 'post':
#            # join multi-line headers together.  we've delayed this
#            # operation until we really needed it, since it is relatively
#            # expensive, and unnecessary with most requests.
#            # [I'm thinking about just always doing this, and moving
#            # <join_headers> into http_server.py]
#            channel.header = join_headers (channel.header)
#
#            # check content-type
#            self.content_type = content_type = string.split (
#                string.lower (
#                    get_header (CONTENT_TYPE, channel.header)
#                    ),
#                '/'
#                )
#
#            if content_type == ['application', 'x-www-form-urlencoded']:
#                channel.set_terminator ('\r\n')
#            elif content_type == ['multipart', 'form-data']:
#                self.boundary = CONTENT_TYPE.group(4)
#                channel.set_terminator (self.boundary)
#            else:
#                # unknown/unexpected content type
#                channel.send_reply (
#                    400,
#                    channel.request,
#                    message="Unknown Content-Type: %s" % (string.join (content_type,'/'))
#                    )
#                return
#
#        default_handler.handle_request (self, channel)
#
#    # by default, post data is collected into a single buffer.  this
#    # approach is probably ok for application/x-www-form-urlencoded,
#    # but not such a good idea for multipart/form-data, since this is
#    # usually a file upload.
# (remainder of the commented-out <default_with_post_handler> sketch)
#
#    post_data = ''
#
#    def collect_post_data (self, data):
#        self.post_data = self.post_data + data
#        pass
#
#    post_variables = []
#
#    def found_post_terminator (self):
#        if self.content_type == ['application', 'x-www-form-urlencoded']:
#            self.post_variables = string.split (self.post_data, '&')
#        elif self.content_type == ['multipart', 'form-data']:
#            self.post_variables = self.post_variables + [self.post_data]
#        else:
#            pass
#        self.post_data = ''

# Header patterns, written in the syntax of the old <regex> module:
# \( \) delimit groups and \| is alternation.  In each pattern group 1
# is the header value.

ACCEPT = regex.compile ('Accept: \(.*\)', regex.casefold)

# HTTP/1.0 doesn't say anything about the "; length=nnnn" addition
# to this header.  I suppose its purpose is to avoid the overhead
# of parsing dates...
# Group 4 is the digits of the optional length.
IF_MODIFIED_SINCE = regex.compile (
    'If-Modified-Since: \([^;]+\)\(\(; length=\([0-9]+\)$\)\|$\)',
    regex.casefold
    )

USER_AGENT = regex.compile ('User-Agent: \(.*\)', regex.casefold)

# characters accepted inside a multipart boundary string
boundary_chars = "A-Za-z0-9'()+_,./:=?-"

# Group 4 is the optional boundary string.
CONTENT_TYPE = regex.compile (
    'Content-Type: \([^;]+\)\(\(; boundary=\([%s]+\)$\)\|$\)' % boundary_chars,
    regex.casefold
    )

# header lookup is borrowed from http_server
get_header = http_server.get_header

# Return the text after the last '.' of <path>, provided that dot comes
# after the last '/'; otherwise return ''.  The dot itself is not part
# of the result.
def get_extension (path):
    last_slash = string.rfind (path, '/')
    last_dot = string.rfind (path, '.')
    if last_dot <= last_slash:
        # no dot at all, or the only dots are in a directory component
        return ''
    return path[last_dot+1:]

# from <lib/urllib.py>
_quoteprog = regex.compile('%[0-9a-fA-F][0-9a-fA-F]')

# Replace every %XX escape (two hex digits) in <s> with the character
# it encodes and return the resulting string.
def unquote(s):
    pieces = []
    pos = 0
    end = len(s)
    while 0 <= pos < end:
        # search() gives the index of the next escape, negative if none
        hit = _quoteprog.search(s, pos)
        if hit < 0:
            # no more escapes: keep the tail verbatim
            pieces.append(s[pos:])
            break
        code = string.atoi(s[hit+1:hit+3], 16)
        pieces.append(s[pos:hit] + chr(code))
        # continue just past the three-character escape
        pos = hit + 3
    return string.join (pieces, '')