# POPFILE LOADABLE MODULE
package POPFile::History;

use POPFile::Module;
@ISA = ("POPFile::Module");

#----------------------------------------------------------------------------
#
# This module handles POPFile's history.  It manages entries in the POPFile
# database and on disk that store messages previously classified by POPFile.
#
# Copyright (c) 2004-2006 John Graham-Cumming
#
#   This file is part of POPFile
#
#   POPFile is free software; you can redistribute it and/or modify it
#   under the terms of version 2 of the GNU General Public License as
#   published by the Free Software Foundation.
#
#   POPFile is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU General Public License for more details.
#
#   You should have received a copy of the GNU General Public License
#   along with POPFile; if not, write to the Free Software
#   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
#----------------------------------------------------------------------------

use strict;
use warnings;
use locale;

use Date::Parse;
use Digest::MD5 qw( md5_hex );

# The columns (in this order) returned for a history entry by both
# get_slot_fields and get_query_rows
my $fields_slot = 'history.id, hdr_from, hdr_to, hdr_cc, hdr_subject, hdr_date, hash, inserted, buckets.name, usedtobe, history.bucketid, magnets.val, size';

#----------------------------------------------------------------------------
# new
#
#   Class new() function
#----------------------------------------------------------------------------
sub new
{
    my $proto = shift;
    my $class = ref($proto) || $proto;
    my $self  = POPFile::Module->new();

    # Queue of history items that have been committed with commit_slot
    # but not yet written to the database.  Each entry is a reference
    # to a four element list: the session key, the slot id, the bucket
    # classified to and the magnet used (if any).  Note that this must
    # be an array reference: assigning an empty list would store undef.
    $self->{commit_list__} = [];

    # Query sessions started with start_query, keyed on the unique ID
    # handed back to the caller.  Each value is a sub-hash with keys
    # query (statement handle, or 0 before set_query), count (total
    # number of rows), cache (rows already fetched), fields (the
    # filter/search/sort combination last applied) and base (the SQL
    # template built by set_query)
    $self->{queries__} = {};

    # Set until the first call to service(), which upgrades any old
    # style history files
    $self->{firsttime__} = 1;

    # Will contain the database handle retrieved from
    # Classifier::Bayes (see db__)
    $self->{db__} = undef;

    $self->{classifier__} = 0;

    bless($self, $class);

    $self->name( 'history' );

    return $self;
}

#----------------------------------------------------------------------------
#
# initialize
#
# Called to initialize the history module
#
#----------------------------------------------------------------------------
sub initialize
{
    my ( $self ) = @_;

    # Keep the history for two days
    $self->config_( 'history_days', 2 );

    # If 1, Messages are saved to an archive when they are removed or expired
    # from the history cache
    $self->config_( 'archive', 0 );

    # The directory where messages will be archived to, in sub-directories for
    # each bucket
    $self->config_( 'archive_dir', 'archive' );

    # This is an advanced setting which will save archived files to a
    # randomly numbered sub-directory, if set to greater than zero, otherwise
    # messages will be saved in the bucket directory
    #
    # 0 <= directory name < archive_classes
    $self->config_( 'archive_classes', 0 );

    # Need TICKD message for history clean up, COMIT when a message
    # is committed to the history
    $self->mq_register_( 'TICKD', $self );
    $self->mq_register_( 'COMIT', $self );

    return 1;
}

#----------------------------------------------------------------------------
#
# stop
#
# Called to stop the history module
#
#----------------------------------------------------------------------------
sub stop
{
    my ( $self ) = @_;

    # Commit any remaining history items.  This is needed because it's
    # possible that we get called with a stop after things have been
    # added to the queue and before service() is called
    $self->commit_history__();
}

#----------------------------------------------------------------------------
#
# db__
#
# Since we don't know the order in which the start() methods of PLMs
# is called we cannot be sure that Classifier::Bayes will have started
# and connected to the database before us, hence we can't set our
# database handle at start time.  So instead we access the db handle
# through this method, which clones the classifier's handle on first use
#
#----------------------------------------------------------------------------
sub db__
{
    my ( $self ) = @_;

    if ( !defined( $self->{db__} ) ) {
        $self->{db__} = $self->{classifier__}->db()->clone;
    }

    return $self->{db__};
}

#----------------------------------------------------------------------------
#
# service
#
# Called periodically so that the module can do its work
#
#----------------------------------------------------------------------------
sub service
{
    my ( $self ) = @_;

    if ( $self->{firsttime__} ) {
        $self->upgrade_history_files__();
        $self->{firsttime__} = 0;
    }

    # Note when we go to multiuser POPFile we'll need to change this call
    # so that we are sure that the session IDs that it is using are still
    # valid.  The easiest way will be to call it in deliver() when we get
    # a COMIT message.
    $self->commit_history__();

    return 1;
}

#----------------------------------------------------------------------------
#
# deliver
#
# Called by the message queue to deliver a message
#
# There is no return value from this method
#
#----------------------------------------------------------------------------
sub deliver
{
    my ( $self, $type, @message ) = @_;

    # If a day has passed then clean up the history
    if ( $type eq 'TICKD' ) {
        $self->cleanup_history();
    }

    # Queue the (session, slot, bucket, magnet) list posted by
    # commit_slot; it is written to the database by commit_history__
    if ( $type eq 'COMIT' ) {
        push ( @{$self->{commit_list__}}, \@message );
    }
}

# ---------------------------------------------------------------------------
#
# forked
#
# This is called inside a child process that has just forked.  The
# cached database handle is discarded so that the child does not reuse
# the parent's handle; db__ will clone a fresh one on next use.
#
# ---------------------------------------------------------------------------
sub forked
{
    my ( $self ) = @_;

    $self->{db__} = undef;
}

#----------------------------------------------------------------------------
#
# ADDING TO THE HISTORY
#
# To add a message to the history the following sequence of calls
# is made:
#
# 1. Obtain a unique ID and filename for the new message by a call
#    to reserve_slot
#
# 2. Write the message into the filename returned
#
# 3. Call commit_slot with the bucket into which the message was
#    classified
#
# If an error occurs after #1 and the slot is unneeded then call
# release_slot
#
#----------------------------------------------------------------------------
#
# FINDING A HISTORY ENTRY
#
# 1. If you know the slot id then call get_slot_file to obtain
#    the full path where the file is stored
#
# 2. If you know the message hash then call get_slot_from_hash
#    to get the slot id
#
# 3.
#    If you know the message headers then use get_message_hash
#    to get the hash
#
#----------------------------------------------------------------------------

#----------------------------------------------------------------------------
#
# reserve_slot
#
# Reserves a place in the history for a message that is still being
# received.  Returns a two element list: the id of the new slot and the
# full path of the file into which the message should be written.  The
# caller must later call either release_slot (the slot will not be used)
# or commit_slot (the file has been written and the entry should become
# part of the history).
#
#----------------------------------------------------------------------------
sub reserve_slot
{
    my ( $self ) = @_;

    my $marker;

    # The new row is tagged with a random value in its 'committed' column
    # (always at least 2, since committed = 1 marks a finished entry) so
    # that its id can be looked up again after the insert.  A bare block
    # is used as a loop: redo draws a fresh value whenever the current
    # one is already in use.
    CHOOSE_MARKER: {
        $marker = int( rand( 1000000000 ) + 2 );
        $self->log_( 2, "reserve_slot selected random number $marker" );

        # TODO Replace the hardcoded user ID 1 with the looked up
        # user ID from the session key
        my $in_use = $self->db__()->selectrow_arrayref( "select id from history where committed = $marker limit 1;" );
        redo CHOOSE_MARKER if defined( $in_use );

        # Record the arrival time so that the history can be sorted on
        # when the message was received as well as on its Date: header
        my $arrived = time;
        $self->db__()->do( "insert into history ( userid, committed, inserted ) values ( 1, $marker, $arrived );" );
    }

    my $row  = $self->db__()->selectrow_arrayref( "select id from history where committed = $marker limit 1;" );
    my $slot = $row->[0];

    $self->log_( 2, "reserve_slot returning slot id $slot" );

    return ( $slot, $self->get_slot_file( $slot ) );
}

#----------------------------------------------------------------------------
#
# release_slot
#
# See description with reserve_slot; release_slot releases a history slot
# previously allocated with reserve_slot and discards it.
#
# id              Unique ID returned by reserve_slot
#
#----------------------------------------------------------------------------
sub release_slot
{
    my ( $self, $slot ) = @_;

    # Remove the entry from the database and delete the file
    # if present.  The slot id is bound as a placeholder rather than
    # interpolated into the SQL.
    $self->db__()->do( 'delete from history where history.id = ?;', undef, $slot );

    my $file = $self->get_slot_file( $slot );
    unlink $file;

    # It's possible that the directory holding the slot file is now
    # empty; if so remove it, and each parent directory that becomes
    # empty in turn (up to the three aa/bb/cc levels that get_slot_file
    # creates), so that things get cleaned up automatically.
    #
    # rmdir only succeeds on an empty directory, so it doubles as the
    # emptiness test.  This replaces a glob() based test: glob splits
    # its pattern on whitespace, so that test never removed anything
    # when the path contained a space.
    my $dir = $file;
    $dir =~ s/popfile[a-f0-9]{2}\.msg$//i;

    for ( 1 .. 3 ) {
        last if ( !rmdir( $dir ) );
        $dir =~ s![a-f0-9]{2}/$!!i;
    }
}

#----------------------------------------------------------------------------
#
# commit_slot
#
# See description with reserve_slot; commit_slot commits a history
# slot to the database and makes it part of the history.  Before this
# is called the full message should have been written to the file
# returned by reserve_slot.  Note that commit_slot queues the message
# (by posting a COMIT message) and it is not written to the database
# until commit_history__ next runs
#
# session         User session with Classifier::Bayes API
# slot            Unique ID returned by reserve_slot
# bucket          Bucket classified to
# magnet          Magnet if used
#
#----------------------------------------------------------------------------
sub commit_slot
{
    my ( $self, $session, $slot, $bucket, $magnet ) = @_;

    $self->mq_post_( 'COMIT', $session, $slot, $bucket, $magnet );
}

#----------------------------------------------------------------------------
#
# change_slot_classification
#
# Used to 'reclassify' a message by changing its classification in the
# database.
#
# slot            The slot to update
# class           The new classification
# session         A valid API session
# undo            If set to 1 then indicates an undo operation
#
#----------------------------------------------------------------------------
sub change_slot_classification
{
    my ( $self, $slot, $class, $session, $undo ) = @_;

    $self->log_( 0, "Change slot classification of $slot to $class" );

    my $bucketid = $self->{classifier__}->get_bucket_id( $session, $class );

    # Field 10 of the slot's fields is its current bucket id; it is
    # remembered in usedtobe so the change can be reverted.  An undo
    # has nothing to go back to, so it records 0 instead.
    my $oldbucketid = $undo ? 0 : ( $self->get_slot_fields( $slot ) )[10];

    $self->db__()->do( "update history set bucketid = $bucketid, usedtobe = $oldbucketid where id = $slot;" );

    $self->force_requery__();
}

#----------------------------------------------------------------------------
#
# revert_slot_classification
#
# Used to undo a 'reclassify' of a message: the bucket recorded in
# usedtobe (field 9 of the slot's fields) becomes the classification
# again and usedtobe is cleared.
#
# slot            The slot to update
#
#----------------------------------------------------------------------------
sub revert_slot_classification
{
    my ( $self, $slot ) = @_;

    my $previous = ( $self->get_slot_fields( $slot ) )[9];

    $self->db__()->do( "update history set bucketid = $previous, usedtobe = 0 where id = $slot;" );

    $self->force_requery__();
}

#---------------------------------------------------------------------------
#
# get_slot_fields
#
# Returns the fields associated with a specific slot.  We return the
# same collection of fields as get_query_rows.
#
# slot            The slot id
#
#---------------------------------------------------------------------------
sub get_slot_fields
{
    my ( $self, $slot ) = @_;

    # The slot id is bound as a placeholder since it may come from
    # outside this module
    return $self->db__()->selectrow_array( "select $fields_slot from history, buckets, magnets where history.id = ? and buckets.id = history.bucketid and magnets.id = magnetid;", undef, $slot );
}

#---------------------------------------------------------------------------
#
# is_valid_slot
#
# Returns true if the slot ID passed in is the id of an existing history
# row.  The id is bound as a placeholder, so arbitrary input is safe.
#
# slot            The slot id
#
#---------------------------------------------------------------------------
sub is_valid_slot
{
    my ( $self, $slot ) = @_;

    my @row = $self->db__()->selectrow_array( 'select id from history where history.id = ?;', undef, $slot );

    return ( ( @row ) && ( $row[0] == $slot ) );
}

#---------------------------------------------------------------------------
#
# commit_history__
#
# (private) Used internally to write to the database the messages that
# have been committed with a call to commit_slot
#
#----------------------------------------------------------------------------
sub commit_history__
{
    my ( $self ) = @_;

    return if ( !defined( $self->{commit_list__} ) || !@{$self->{commit_list__}} );

    foreach my $entry (@{$self->{commit_list__}}) {
        my ( $session, $slot, $bucket, $magnet ) = @{$entry};

        my $file = $self->get_slot_file( $slot );

        # Committing to the history requires the following steps
        #
        # 1. Parse the message to extract the headers
        # 2. Compute MD5 hash of Message-ID, Date, Subject and Received
        # 3. Update the related row with the headers and
        #    committed set to 1

        my %header;

        if ( open my $fh, '<', $file ) {
            my $last;
            while ( my $line = <$fh> ) {
                $line =~ s/[\r\n]//g;

                # A blank line ends the headers
                last if ( $line =~ /^$/ );

                if ( $line =~ /^([^ \t]+):[ \t]*(.*)$/ ) {
                    $last = lc $1;
                    push @{$header{$last}}, $2;
                } elsif ( defined $last ) {
                    # Continuation line: append to the latest header
                    ${$header{$last}}[$#{$header{$last}}] .= $line;
                }
            }
            close $fh;
        } else {
            $self->log_( 0, "Could not open history message file $file for reading." );
        }

        my $hash = $self->get_message_hash( ${$header{'message-id'}}[0],
                                            ${$header{'date'}}[0],
                                            ${$header{'subject'}}[0],
                                            ${$header{'received'}}[0] );

        $hash = $self->db__()->quote( $hash );

        # For sorting purposes the From, To and CC headers have special
        # cleaned up versions of themselves in the database.  The idea
        # is that case and certain characters should be ignored when
        # sorting these fields
        #
        # "John Graham-Cumming" <spam@jgc.org> maps to
        #     john graham-cumming spam@jgc.org

        my @sortable = ( 'from', 'to', 'cc' );
        my %sort_headers;

        foreach my $h (@sortable) {
            $sort_headers{$h} =
                $self->{classifier__}->{parser__}->decode_string( ${$header{$h}}[0] );
            $sort_headers{$h} = lc($sort_headers{$h} || '');
            $sort_headers{$h} =~ s/[\"<>]//g;
            $sort_headers{$h} =~ s/^[ \t]+//g;
            $sort_headers{$h} =~ s/\0//g;
            $sort_headers{$h} = $self->db__()->quote( $sort_headers{$h} );
        }

        # Make sure that the headers we are going to insert into
        # the database have been defined and are suitably quoted

        my @required = ( 'from', 'to', 'cc', 'subject' );

        foreach my $h (@required) {
            ${$header{$h}}[0] =
                $self->{classifier__}->{parser__}->decode_string( ${$header{$h}}[0] );

            if ( !defined ${$header{$h}}[0] || ${$header{$h}}[0] =~ /^\s*$/ ) {
                if ( $h ne 'cc' ) {
                    ${$header{$h}}[0] = "<$h header missing>";
                } else {
                    ${$header{$h}}[0] = '';
                }
            }

            ${$header{$h}}[0] =~ s/\0//g;
            ${$header{$h}}[0] = $self->db__()->quote( ${$header{$h}}[0] );
        }

        # If we do not have a date header then set the date to
        # 0 (start of the Unix epoch), otherwise parse the string
        # using Date::Parse to interpret it and turn it into the
        # Unix epoch.

        if ( !defined( ${$header{date}}[0] ) ) {
            ${$header{date}}[0] = 0;
        } else {
            ${$header{date}}[0] = str2time( ${$header{date}}[0] ) || 0;
        }

        # Figure out the ID of the bucket this message has been
        # classified into

        my $bucketid = $self->{classifier__}->get_bucket_id( $session, $bucket );

        # -s is false for a missing or empty file; without the fallback
        # the SQL below would contain "size = " and the update would fail
        my $msg_size = ( -s $file ) || 0;

        # Likewise an undef or empty magnet would produce "magnetid = ";
        # fall back to 0, the value upgrade_history_files__ uses when no
        # magnet applies
        $magnet = 0 if ( !defined( $magnet ) || ( $magnet eq '' ) );

        # If we can't get the bucket ID because the bucket doesn't exist
        # which could happen when we are upgrading the history which
        # has old bucket names in it then we will remove the entry from the
        # history and log the failure

        if ( defined( $bucketid ) ) {
            $self->db__()->do(
                "update history set hdr_from = ${$header{from}}[0], " .
                "hdr_to = ${$header{to}}[0], " .
                "hdr_date = ${$header{date}}[0], " .
                "hdr_cc = ${$header{cc}}[0], " .
                "hdr_subject = ${$header{subject}}[0], " .
                "sort_from = $sort_headers{from}, " .
                "sort_to = $sort_headers{to}, " .
                "sort_cc = $sort_headers{cc}, " .
                "committed = 1, bucketid = $bucketid, usedtobe = 0, " .
                "magnetid = $magnet, hash = $hash, size = $msg_size " .
                "where id = $slot;" );
        } else {
            $self->log_( 0, "Couldn't find bucket ID for bucket $bucket when committing $slot" );
            $self->release_slot( $slot );
        }
    }

    $self->{commit_list__} = [];
    $self->force_requery__();
}

# ---------------------------------------------------------------------------
#
# delete_slot
#
# Deletes an entry from the database and disk, optionally archiving it
# if the archive parameters have been set
#
# $slot              The slot ID
# $archive           1 if it's OK to archive this entry
#
# ---------------------------------------------------------------------------
sub delete_slot
{
    my ( $self, $slot, $archive ) = @_;

    my $file = $self->get_slot_file( $slot );
    $self->log_( 2, "delete_slot called for slot $slot, file $file" );

    if ( $archive && $self->config_( 'archive' ) ) {
        my $path = $self->get_user_path_( $self->config_( 'archive_dir' ), 0 );

        $self->make_directory__( $path );

        my ( $bucket ) = $self->db__()->selectrow_array( 'select buckets.name from history, buckets where history.bucketid = buckets.id and history.id = ?;', undef, $slot );

        # Nothing is archived for messages that were never placed in a
        # real bucket, or when the slot has no matching bucket row
        if ( defined( $bucket ) &&
             ( $bucket ne 'unclassified' ) &&
             ( $bucket ne 'unknown class' ) ) {
            $path .= '/' . $bucket;
            $self->make_directory__( $path );

            if ( $self->config_( 'archive_classes' ) > 0) {

                # Archive to a random sub-directory of the bucket archive

                my $subdirectory = int( rand( $self->config_( 'archive_classes' ) ) );
                $path .= '/' . $subdirectory;
                $self->make_directory__( $path );
            }

            # Previous comment about this potentially being unsafe
            # (may have placed messages in unusual places, or
            # overwritten files) no longer applies.  Files are now
            # placed in the user directory, in the archive_dir
            # subdirectory

            $self->copy_file__( $file, $path, "popfile$slot.msg" );
        }
    }

    # Now remove the entry from the database, and the file from disk,
    # and also invalidate the caches of any open queries since they
    # may have been affected

    $self->release_slot( $slot );
    $self->force_requery__();
}

#----------------------------------------------------------------------------
#
# start_deleting
#
# Called before doing a block of calls to delete_slot.  This will call
# back into the Classifier::Bayes to tweak the database performance to
# make this quick.
#
#----------------------------------------------------------------------------
sub start_deleting
{
    my ( $self ) = @_;

    $self->{classifier__}->tweak_sqlite( 1, 1, $self->db__() );
}

#----------------------------------------------------------------------------
#
# stop_deleting
#
# Called after doing a block of calls to delete_slot.  This will call
# back into the Classifier::Bayes to untweak the database performance.
#
#----------------------------------------------------------------------------
sub stop_deleting
{
    my ( $self ) = @_;

    my $db = $self->db__();
    $self->{classifier__}->tweak_sqlite( 1, 0, $db );
}

#----------------------------------------------------------------------------
#
# get_slot_file
#
# Maps a slot ID to the full path of the file that holds (or will hold)
# the message associated with the slot.  The intermediate directories
# are created as needed.
#
# The slot id is written as eight hex digits aabbccdd and the file lives
# at msgdir/aa/bb/cc/popfiledd.msg, so no directory holds more than 256
# entries.
#
#----------------------------------------------------------------------------
sub get_slot_file
{
    my ( $self, $slot ) = @_;

    my ( $top, $middle, $bottom, $leaf ) = sprintf( '%8.8x', $slot ) =~ /(..)/g;

    my $directory = $self->get_user_path_( $self->global_config_( 'msgdir' ) . "$top/", 0 );
    $self->make_directory__( $directory );

    foreach my $level ( $middle, $bottom ) {
        $directory .= "$level/";
        $self->make_directory__( $directory );
    }

    return $directory . "popfile$leaf.msg";
}

#----------------------------------------------------------------------------
#
# get_message_hash
#
# Used to compute an MD5 hash of the headers of a message
# so that the same message can later be identified by a
# call to get_slot_from_hash
#
# messageid       The message id header
# date            The date header
# subject         The subject header
# received        First Received header line
#
# Note that the values passed in are everything after the : in the
# header without the trailing \r or \n.
# If a header is missing then pass in the empty string (undef is
# treated in the same way)
#
#----------------------------------------------------------------------------
sub get_message_hash
{
    my ( $self, $messageid, $date, $subject, $received ) = @_;

    # Each value is wrapped in [] before hashing so that text moving
    # from one header to another still changes the digest
    my $material = join '', map { '[' . ( defined( $_ ) ? $_ : '' ) . ']' }
                               ( $messageid, $date, $subject, $received );

    return md5_hex( $material );
}

#----------------------------------------------------------------------------
#
# get_slot_from_hash
#
# Given a hash value (returned by get_message_hash), find a
# corresponding message in the database and return its slot
# id.  If no such message exists the empty string is returned.
#
# hash            The hash value
#
#----------------------------------------------------------------------------
sub get_slot_from_hash
{
    my ( $self, $hash ) = @_;

    my $quoted = $self->db__()->quote( $hash );
    my $row    = $self->db__()->selectrow_arrayref( "select id from history where hash = $quoted limit 1;" );

    return '' if ( !defined( $row ) );
    return $row->[0];
}

#----------------------------------------------------------------------------
#
# QUERYING THE HISTORY
#
# 1. Start a query session by calling start_query and obtain a unique
#    ID
#
# 2. Set the query parameter (i.e. sort, search and filter) with a call
#    to set_query
#
# 3. Obtain the number of history rows returned by calling get_query_size
#
# 4. Get segments of the history returned by calling get_query_rows with
#    the start row and number of rows needed
#
# 5. When finished with the query call stop_query
#
#----------------------------------------------------------------------------

#----------------------------------------------------------------------------
#
# start_query
#
# Used to start a query session, returns a unique ID for this
# query.  When the caller is done with the query they call
# stop_query.
#
#----------------------------------------------------------------------------
sub start_query
{
    my ( $self ) = @_;

    # Think of a large random number, make sure that it hasn't
    # been used and then return it

    while (1) {
        my $id = sprintf( '%8.8x', int(rand(4294967295)) );

        if ( !defined( $self->{queries__}{$id} ) ) {
            $self->{queries__}{$id}{query} = 0;
            $self->{queries__}{$id}{count} = 0;
            $self->{queries__}{$id}{cache} = [];
            return $id;
        }
    }
}

#----------------------------------------------------------------------------
#
# stop_query
#
# Used to clean up after a query session
#
# id              The ID returned by start_query
#
#----------------------------------------------------------------------------
sub stop_query
{
    my ( $self, $id ) = @_;

    # Release the statement handle if set_query created one.  finish is
    # harmless on a handle whose rows have all been fetched, so there is
    # no need to compare the cache size with the row count first.

    my $q = $self->{queries__}{$id}{query};

    if ( ref( $q ) ) {
        $q->finish;
    }

    delete $self->{queries__}{$id};
}

#----------------------------------------------------------------------------
#
# set_query
#
# Called to set up a query with sort, filter and search options
#
# id              The ID returned by start_query
# filter          Name of bucket to filter on ('__filter__magnet' selects
#                 messages classified by a magnet)
# search          From/Subject line to search for
# sort            The field to sort on (from, subject, to, cc, bucket, date,
#                 inserted, size) with an optional leading - for a
#                 descending sort.  Unknown fields are ignored and the
#                 default order (newest first) is used.
# not             If set to 1 negates the search and the filter
#
#----------------------------------------------------------------------------
sub set_query
{
    my ( $self, $id, $filter, $search, $sort, $not ) = @_;

    # Missing arguments mean "no filter/search/sort"
    $filter = '' if ( !defined( $filter ) );
    $search = '' if ( !defined( $search ) );
    $sort   = '' if ( !defined( $sort ) );
    $not    = '' if ( !defined( $not ) );

    # If this query has already been done and is in the cache
    # then do no work here

    if ( defined( $self->{queries__}{$id}{fields} ) &&
         ( $self->{queries__}{$id}{fields} eq "$filter:$search:$sort:$not" ) ) {
        return;
    }

    $self->{queries__}{$id}{fields} = "$filter:$search:$sort:$not";

    # We do two queries, the first to get the total number of rows that
    # would be returned and then we start the real query.  This is done
    # so that we know the size of the resulting data without having
    # to retrieve it all

    $self->{queries__}{$id}{base} = 'select XXX from history, buckets, magnets where history.userid = 1 and committed = 1';

    $self->{queries__}{$id}{base} .= ' and history.bucketid = buckets.id';
    $self->{queries__}{$id}{base} .= ' and magnets.id = magnetid';

    # If there's a search portion then add the appropriate clause
    # to find the from/subject header

    my $not_word  = $not?'not':'';
    my $not_equal = $not?'!=':'=';
    my $equal     = $not?'=':'!=';

    if ( $search ne '' ) {
        $search = $self->db__()->quote( '%' . $search . '%' );
        $self->{queries__}{$id}{base} .= " and $not_word ( hdr_from like $search or hdr_subject like $search )";
    }

    # If there's a filter option then we'll need to get the bucket
    # id for the filtered bucket and add the appropriate clause

    if ( $filter ne '' ) {
        if ( $filter eq '__filter__magnet' ) {
            $self->{queries__}{$id}{base} .= " and history.magnetid $equal 0";
        } else {
            my $session = $self->{classifier__}->get_session_key( 'admin', '' );
            my $bucketid = $self->{classifier__}->get_bucket_id( $session, $filter );
            $self->{classifier__}->release_session_key( $session );
            $self->{queries__}{$id}{base} .= " and history.bucketid $not_equal $bucketid";
        }
    }

    # Add the sort option.  $sort is interpolated directly into the SQL
    # (and presumably originates in the user interface), so only the
    # known sort names are accepted and mapped to their columns.

    my %sort_column = (
        bucket   => 'buckets.name',
        from     => 'sort_from',
        to       => 'sort_to',
        cc       => 'sort_cc',
        subject  => 'hdr_subject',
        date     => 'hdr_date',
        inserted => 'inserted',
        size     => 'size',
    );

    my $order = 'inserted desc';

    if ( $sort ne '' ) {

        # Test the substitution itself rather than $1: when the match
        # fails $1 keeps the value of an earlier successful match
        # (possibly one made by our caller)
        my $direction = ( $sort =~ s/^-// ) ? 'desc' : 'asc';

        if ( exists( $sort_column{$sort} ) ) {
            $order = "$sort_column{$sort} $direction";
        } else {
            $self->log_( 0, "Ignoring unknown history sort field '$sort'" );
        }
    }

    $self->{queries__}{$id}{base} .= " order by $order;";

    my $count_sql = $self->{queries__}{$id}{base};
    $self->log_( 2, "Base query is $count_sql" );
    $count_sql =~ s/XXX/COUNT(*)/;

    my $count_row = $self->db__()->selectrow_arrayref( $count_sql );
    $self->{queries__}{$id}{count} = defined( $count_row ) ? $count_row->[0] : 0;

    # Discard the statement from any previous set_query on this ID
    if ( ref( $self->{queries__}{$id}{query} ) ) {
        $self->{queries__}{$id}{query}->finish;
    }

    my $select = $self->{queries__}{$id}{base};
    $select =~ s/XXX/$fields_slot/;

    my $sth = $self->db__()->prepare( $select );
    if ( defined( $sth ) ) {
        $sth->execute;
    } else {
        $self->log_( 0, "Could not prepare history query $select" );
        $sth = 0;
    }

    $self->{queries__}{$id}{query} = $sth;
    $self->{queries__}{$id}{cache} = [];
}

#----------------------------------------------------------------------------
#
# delete_query
#
# Called to delete all the rows returned in a query
#
# id              The ID returned by start_query
#
#----------------------------------------------------------------------------
sub delete_query
{
    my ( $self, $id ) = @_;

    my $delete = $self->{queries__}{$id}{base};

    # Nothing to do if set_query has not been called for this ID
    return if ( !defined( $delete ) );

    $delete =~ s/XXX/history.id/;

    $self->start_deleting();

    # Gather every matching id before deleting anything, since
    # delete_slot modifies the table the select is reading
    my $ids = $self->db__()->selectcol_arrayref( $delete );

    foreach my $slot ( @{ $ids || [] } ) {
        $self->delete_slot( $slot, 1 );
    }

    $self->stop_deleting();
}

#----------------------------------------------------------------------------
#
# get_query_size
#
# Called to return the number of elements in the query.
# Should only be called after a call to set_query.
#
# id              The ID returned by start_query
#
#----------------------------------------------------------------------------
sub get_query_size
{
    my ( $self, $id ) = @_;

    return $self->{queries__}{$id}{count};
}

#----------------------------------------------------------------------------
#
# get_query_rows
#
# Returns $count rows, starting at row $start, from a query that has
# already been set up with a call to set_query.  The first row is row 1.
#
# id              The ID returned by start_query
# start           The first row to return (the first row is row 1)
# count           Number of rows to return
#
# Each row contains the fields:
#
#    id (0), from (1), to (2), cc (3), subject (4), date (5), hash (6),
#    inserted date (7), bucket name (8), reclassified id (9), bucket id (10),
#    magnet value (11), size (12)
#----------------------------------------------------------------------------
sub get_query_rows
{
    my ( $self, $id, $start, $count ) = @_;

    # Rows already fetched are kept in the query's cache.  Only the rows
    # still missing up to row $start + $count - 1 are fetched from the
    # database, and then the requested slice is returned from the cache.

    my $query = $self->{queries__}{$id};
    $query->{cache} = [] if ( !defined( $query->{cache} ) );

    my $size = scalar( @{$query->{cache}} );

    $self->log_( 2, "Request for rows $start ($count), current size $size" );

    my $needed = $start + $count - 1;

    if ( $size < $needed ) {
        my $rows = $needed - $size;
        $self->log_( 2, "Getting $rows rows from database" );

        # query is 0 until set_query has prepared a statement
        if ( ref( $query->{query} ) ) {
            my $fetched = $query->{query}->fetchall_arrayref( undef, $rows );
            push ( @{$query->{cache}}, @{ $fetched || [] } );
        }
    }

    my ( $from, $to ) = ( $start - 1, $needed - 1 );
    $self->log_( 2, "Returning $from..$to" );

    return @{$query->{cache}}[$from..$to];
}

# ---------------------------------------------------------------------------
#
# make_directory__
#
# Wrapper for mkdir that ensures that the path we are making doesn't end in
# / or \ (Done because you can't do mkdir 'foo/' on NextStep.)
#
# $path      The directory to make
#
# Returns 1 if the directory already exists, otherwise whatever mkdir
# returns
#
# ---------------------------------------------------------------------------
sub make_directory__
{
    my ( $self, $path ) = @_;

    $path =~ s/[\\\/]$//;

    return 1 if ( -d $path );
    return mkdir( $path );
}

# ---------------------------------------------------------------------------
#
# compare_mf__
#
# Sort comparator for old style history files named popfileD=M.msg:
# orders by D descending, then by M descending
#
# ---------------------------------------------------------------------------
sub compare_mf__
{
    $a =~ /popfile(\d+)=(\d+)\.msg/;
    my ( $ad, $am ) = ( $1, $2 );

    $b =~ /popfile(\d+)=(\d+)\.msg/;
    my ( $bd, $bm ) = ( $1, $2 );

    if ( $ad == $bd ) {
        return ( $bm <=> $am );
    } else {
        return ( $bd <=> $ad );
    }
}

# ---------------------------------------------------------------------------
#
# upgrade_history_files__
#
# Looks for old .MSG/.CLS history entries and sticks them in the database
#
# ---------------------------------------------------------------------------
sub upgrade_history_files__
{
    my ( $self ) = @_;

    # See if there are any .MSG files in the msgdir, and if there are
    # upgrade them by placing them in the database.
    #
    # bsd_glob is used rather than glob because glob splits its pattern
    # on whitespace, which breaks (and can match unrelated files) when
    # the user path contains a space.

    require File::Glob;

    my @msgs = sort compare_mf__ File::Glob::bsd_glob( $self->get_user_path_( $self->global_config_( 'msgdir' ) . 'popfile*.msg', 0 ) );

    if ( @msgs ) {
        my $session = $self->{classifier__}->get_session_key( 'admin', '' );

        print "\nFound old history files, moving them into database\n ";

        my $i = 0;
        $self->db__()->begin_work;
        foreach my $msg (@msgs) {
            if ( ( ++$i % 100 ) == 0 ) {
                print "[$i]";
                STDOUT->flush;
            }

            # NOTE.  We drop the information in $usedtobe, so that
            # reclassified messages will no longer appear reclassified
            # in upgraded history.  Also the $magnet is ignored so
            # upgraded history will have no magnet information.

            my ( $reclassified, $bucket, $usedtobe, $magnet ) = $self->history_read_class__( $msg );

            # history_read_class__ reports failure as 'unknown class' (with
            # a space); comparing only against 'unknown_class' meant such
            # messages were never skipped.  They cannot be placed in a
            # bucket, so, as commit_history__ does for messages whose
            # bucket cannot be found, they are removed rather than left
            # behind to be found again on every start.
            if ( ( $bucket eq 'unknown class' ) || ( $bucket eq 'unknown_class' ) ) {
                $self->log_( 0, "Discarding old history file $msg: no classification found" );
                unlink $msg;
                next;
            }

            my ( $slot, $file ) = $self->reserve_slot();
            rename $msg, $file;
            push ( @{$self->{commit_list__}}, [ $session, $slot, $bucket, 0 ] );
        }
        $self->db__()->commit;

        print "\nDone upgrading history\n";

        $self->commit_history__();
        $self->{classifier__}->release_session_key( $session );

        unlink $self->get_user_path_( $self->global_config_( 'msgdir' ) . 'history_cache', 0 );
    }
}

# ---------------------------------------------------------------------------
#
# history_read_class__ - load and delete the class file for a message.
#
# returns: ( reclassified, bucket, usedtobe, magnet )
# values:
#     reclassified:   boolean, true if message has been reclassified
#     bucket:         string, the bucket the message is in presently,
#                     unknown class if an error occurs
#     usedtobe:       string, the bucket the message used to be in
#                     (undef if not reclassified)
#     magnet:         string, the magnet
#
# If the class file cannot be opened ( undef, 'unknown class', undef,
# undef ) is returned.
#
# $filename     The name of the message to load the class for
#
# ---------------------------------------------------------------------------
sub history_read_class__
{
    my ( $self, $filename ) = @_;

    $filename =~ s/msg$/cls/;

    my $reclassified = 0;
    my $bucket = 'unknown class';
    my $usedtobe;
    my $magnet = '';

    if ( open my $class_fh, '<', $filename ) {
        $bucket = <$class_fh>;
        if ( defined( $bucket ) && ( $bucket =~ /([^ ]+) MAGNET ([^\r\n]+)/ ) ) {
            $bucket = $1;
            $magnet = $2;
        }

        # A reclassified entry has RECLASSIFIED on the first line,
        # followed by the current bucket and then the previous one
        $reclassified = 0;
        if ( defined( $bucket ) && ( $bucket =~ /RECLASSIFIED/ ) ) {
            $bucket       = <$class_fh>;
            $usedtobe     = <$class_fh>;
            $reclassified = 1;
            $usedtobe =~ s/[\r\n]//g if ( defined( $usedtobe ) );
        }
        close $class_fh;
        $bucket =~ s/[\r\n]//g if defined( $bucket );
        unlink $filename;
    } else {
        return ( undef, $bucket, undef, undef );
    }

    $bucket = 'unknown class' if ( !defined( $bucket ) );

    return ( $reclassified, $bucket, $usedtobe, $magnet );
}

#----------------------------------------------------------------------------
#
# cleanup_history
#
# Removes history entries (database rows and message files) that
#   are older than the number of days configured as history_days.
#
#----------------------------------------------------------------------------
sub cleanup_history
{
    my ( $self ) = @_;

    my $seconds_per_day = 24 * 60 * 60;
    my $old = time - $self->config_( 'history_days' ) * $seconds_per_day;

    # Collect all the expired ids first: delete_slot modifies the history
    # table, so rows are not deleted while the select is being read
    my $ids = $self->db__()->selectcol_arrayref( "select id from history where inserted < $old;" );

    foreach my $id ( @{ $ids || [] } ) {
        $self->delete_slot( $id, 1 );
    }
}

# ---------------------------------------------------------------------------
#
# copy_file__
#
# Utility to copy a file (byte for byte) into a directory.  This is best
# effort: if either file cannot be opened nothing is copied.
#
# $from              Where to copy from
# $to_dir            The directory it will be copied to
# $to_name           The name of the destination (without the directory)
#
# ---------------------------------------------------------------------------
sub copy_file__
{
    my ( $self, $from, $to_dir, $to_name ) = @_;

    if ( open my $in, '<', $from ) {
        if ( open my $out, '>', "$to_dir/$to_name" ) {
            binmode $in;
            binmode $out;

            # Copy in fixed size chunks; a message file need not consist
            # of reasonably sized lines
            my $buffer;
            while ( read( $in, $buffer, 65536 ) ) {
                print {$out} $buffer;
            }
            close $out;
        }
        close $in;
    }
}

# ---------------------------------------------------------------------------
#
# force_requery__
#
# Called when the database has changed to invalidate any queries that are
# open so that cached data is not returned and the database is requeried
#
# ---------------------------------------------------------------------------
sub force_requery__
{
    my ( $self ) = @_;

    # Clearing the remembered filter/search/sort makes the next
    # set_query for each open query rebuild it from the database
    foreach my $id (keys %{$self->{queries__}}) {
        $self->{queries__}{$id}{fields} = '';
    }
}

# SETTER

sub classifier
{
    my ( $self, $classifier ) = @_;

    $self->{classifier__} = $classifier;
}

1;