/* $Id: child-fetch.c,v 1.64 2007/09/25 21:46:19 nicm Exp $ */

/*
 * Copyright (c) 2006 Nicholas Marriott <nicm@users.sourceforge.net>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
 * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
 * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/wait.h>

#include <fcntl.h>
#include <fnmatch.h>
#include <limits.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include "fdm.h"
#include "deliver.h"
#include "fetch.h"
#include "match.h"

int	fetch_account(struct account *, struct io *, int, double);
int	fetch_match(struct account *, struct msg *, struct msgbuf *);
int	fetch_deliver(struct account *, struct msg *, struct msgbuf *);
int	fetch_poll(struct account *, struct iolist *, struct io *, int);
int	fetch_purge(struct account *);;
void	fetch_free(void);
void	fetch_free1(struct mail_ctx *);

int	fetch_enqueue(struct account *, struct io *, struct mail *);
int	fetch_dequeue(struct account *, struct mail_ctx *);

#ifdef DEBUG
double			 fetch_time_polling = 0.0;
double			 fetch_time_blocked = 0.0;
#endif

struct mail_queue 	 fetch_matchq;
struct mail_queue 	 fetch_deliverq;

u_int		  	 fetch_dropped;
u_int		  	 fetch_kept;

u_int			 fetch_queued;   /* number of mails queued */
u_int			 fetch_blocked;  /* blocked for parent */

int
open_cache(struct account *a, struct cache *cache)
{
	int	n;

	if (cache->db != NULL)
		return (0);

	if ((cache->db = db_open(cache->path)) == NULL) {
		log_warn("%s: %s", a->name, cache->path);
		return (-1);
	}

	n = db_size(cache->db);
	log_debug3("%s: opened cache %s: %d keys", a->name, cache->path, n);

	if (cache->expire == 0)
		return (0);
	if (db_expire(cache->db, cache->expire) != 0) {
		log_warnx("%s: %s: expiry failed", a->name, cache->path);
		return (-1);
	}

	n -= db_size(cache->db);
	if (n < 0)
		n = 0;
	log_debug3("%s: cache %s: expired %d keys", a->name, cache->path, n);

	return (0);
}

int
child_fetch(struct child *child, struct io *pio)
{
	struct child_fetch_data	*data = child->data;
	enum fdmop 		 op = data->op;
	struct account 		*a = data->account;
	struct msg	 	 msg;
	int			 error, flags;
	double			 tim;

#ifdef DEBUG
	xmalloc_clear();
	COUNTFDS(a->name);
#endif

	log_debug2("%s: fetch started, pid %ld", a->name, (long) getpid());

#ifndef NO_SETPROCTITLE
	setproctitle("child: %s", a->name);
#endif

	fill_info(NULL);
	log_debug2("%s: user is: %s, home is: %s", a->name, conf.info.user,
	    conf.info.home);
	tim = get_time();

	/* Process fetch or poll. */
	log_debug2("%s: started processing", a->name);
	flags = 0;
	if (op == FDMOP_POLL)
		flags |= FETCH_POLL;
	error = fetch_account(a, pio, flags, tim);
	log_debug2("%s: finished processing. exiting", a->name);

	memset(&msg, 0, sizeof msg);
	msg.type = MSG_EXIT;
	log_debug3("%s: sending exit message to parent", a->name);
	if (privsep_send(pio, &msg, NULL) != 0)
		fatalx("privsep_send error");
	log_debug3("%s: waiting for exit message from parent", a->name);
	if (privsep_recv(pio, &msg, NULL) != 0)
		fatalx("privsep_recv error");
	if (msg.type != MSG_EXIT)
		fatalx("unexpected message");

#ifdef DEBUG
	COUNTFDS(a->name);
	xmalloc_report(getpid(), a->name);
#endif

	return (error);
}

int
fetch_poll(struct account *a, struct iolist *iol, struct io *pio, int timeout)
{
	struct io	*rio;
	char		*cause;
	double		 tim;

	log_debug3(
	    "%s: polling: %u, timeout=%d", a->name, ARRAY_LENGTH(iol), timeout);
	tim = get_time();
	switch (io_polln(
	    ARRAY_DATA(iol), ARRAY_LENGTH(iol), &rio, timeout, &cause)) {
	case 0:
		if (rio == pio)
			fatalx("parent socket closed");
		log_warnx("%s: connection closed", a->name);
		return (-1);
	case -1:
		if (errno == EAGAIN)
			break;
		if (rio == pio)
			fatalx("parent socket error");
		log_warnx("%s: %s", a->name, cause);
		xfree(cause);
		return (-1);
	}
	tim = get_time() - tim;

#ifdef DEBUG
	fetch_time_polling += tim;
	if (fetch_blocked == fetch_queued && fetch_queued != 0)
		fetch_time_blocked += tim;
#endif

	return (0);
}

int
fetch_match(struct account *a, struct msg *msg, struct msgbuf *msgbuf)
{
	struct mail_ctx	*mctx, *this;

	if (TAILQ_EMPTY(&fetch_matchq))
		return (0);

	mctx = TAILQ_FIRST(&fetch_matchq);
	while (mctx != NULL) {
		this = mctx;
		mctx = TAILQ_NEXT(this, entry);

		log_debug3("%s: "
		    "trying (match) message %u", a->name, this->mail->idx);
		switch (mail_match(this, msg, msgbuf)) {
		case MAIL_ERROR:
			return (-1);
		case MAIL_DELIVER:
			TAILQ_REMOVE(&fetch_matchq, this, entry);
			TAILQ_INSERT_TAIL(&fetch_deliverq, this, entry);
			break;
		case MAIL_DONE:
			if (fetch_dequeue(a, this) != 0)
				return (-1);
			break;
		case MAIL_BLOCKED:
			fetch_blocked++;
			break;
		}
	}

	return (0);
}

int
fetch_deliver(struct account *a, struct msg *msg, struct msgbuf *msgbuf)
{
	struct mail_ctx	*mctx, *this;

	if (TAILQ_EMPTY(&fetch_deliverq))
		return (0);

	mctx = TAILQ_FIRST(&fetch_deliverq);
	while (mctx != NULL) {
		this = mctx;
		mctx = TAILQ_NEXT(this, entry);

		log_debug3("%s:"
		    " trying (deliver) message %u", a->name, this->mail->idx);
		switch (mail_deliver(this, msg, msgbuf)) {
		case MAIL_ERROR:
			return (-1);
		case MAIL_MATCH:
			TAILQ_REMOVE(&fetch_deliverq, this, entry);
			TAILQ_INSERT_TAIL(&fetch_matchq, this, entry);
			break;
		case MAIL_BLOCKED:
			fetch_blocked++;
			break;
		}
	}

	return (0);
}

void
fetch_free1(struct mail_ctx *mctx)
{
	struct deliver_ctx	*dctx;

	while (!TAILQ_EMPTY(&mctx->dqueue)) {
		dctx = TAILQ_FIRST(&mctx->dqueue);
		TAILQ_REMOVE(&mctx->dqueue, dctx, entry);
		xfree(dctx);
	}

	ARRAY_FREE(&mctx->stack);
	mail_destroy(mctx->mail);
	xfree(mctx->mail);
	xfree(mctx);
}

void
fetch_free(void)
{
	struct mail_ctx	*mctx;

	while (!TAILQ_EMPTY(&fetch_matchq)) {
		mctx = TAILQ_FIRST(&fetch_matchq);
		TAILQ_REMOVE(&fetch_matchq, mctx, entry);
		fetch_free1(mctx);
	}

	while (!TAILQ_EMPTY(&fetch_deliverq)) {
		mctx = TAILQ_FIRST(&fetch_deliverq);
		TAILQ_REMOVE(&fetch_deliverq, mctx, entry);
		fetch_free1(mctx);
	}
}

int
fetch_purge(struct account *a)
{
	static u_int	last_total = 0, last_dropped = 0;
	u_int		n;

	if (conf.purge_after == 0)
		return (0);

	n = fetch_dropped + fetch_kept;
	if (n == last_total || n % conf.purge_after != 0)
		return (0);
	last_total = n;

	if (last_dropped == fetch_dropped) {
		log_debug("%s: not purging, no mails dropped", a->name);
		return (0);
	}
	last_dropped = fetch_dropped;

	log_debug("%s: purging after %u mails", a->name, n);
	return (1);
}

int
fetch_account(struct account *a, struct io *pio, int nflags, double tim)
{
	struct msg	 msg, *msgp;
	struct msgbuf	 msgbuf;
	struct fetch_ctx fctx;
	struct cache	*cache;
	struct iolist 	 iol;
	u_int		 n;
	int		 aborted, complete, holding, timeout;

	log_debug2("%s: fetching", a->name);

	TAILQ_INIT(&fetch_matchq);
 	TAILQ_INIT(&fetch_deliverq);
	fetch_queued = fetch_dropped = fetch_kept = 0;

	if (nflags & FETCH_POLL && a->fetch->total == NULL) {
		log_info("%s: polling not supported", a->name);
		return (0);
	}

	fctx.llen = IO_LINESIZE;
	fctx.lbuf = xmalloc(fctx.llen);
	fctx.flags = nflags;

	fctx.mail = xcalloc(1, sizeof *fctx.mail);
	fctx.state = a->fetch->first;

	ARRAY_INIT(&iol);

	aborted = complete = holding = 0;
	for (;;) {
		fetch_blocked = 0;

		/* Check for new privsep messages. */
		msgp = NULL;
		if (privsep_check(pio)) {
			if (privsep_recv(pio, &msg, &msgbuf) != 0)
				fatalx("privsep_recv error");
			log_debug3("%s: got message type %d, id %u", a->name,
			    msg.type, msg.id);
			msgp = &msg;
		}

		/* Match and deliver mail. */
		if (fetch_match(a, msgp, &msgbuf) != 0)
			goto abort;
		if (fetch_deliver(a, msgp, &msgbuf) != 0)
			goto abort;

		/* Check for purge and set flag if necessary. */
		if (fetch_purge(a))
			fctx.flags |= FETCH_PURGE;

		/* Update the holding flag. */
		if (fetch_queued <= (u_int) conf.queue_low)
			holding = 0;
		if (fetch_queued >= (u_int) conf.queue_high)
			holding = 1;

		/* If not holding and not finished, call the fetch handler. */
		if (!holding && !complete) {
			/*
			 * Set the empty flag if queues are empty. Purging
			 * shouldn't happen if this is clear.
			 */
			fctx.flags &= ~FETCH_EMPTY;
			if (fetch_queued == 0)
				fctx.flags |= FETCH_EMPTY;

			/* Call the fetch function. */
			switch (fctx.state(a, &fctx)) {
			case FETCH_ERROR:
				/* Fetch error. */
				goto abort;
			case FETCH_EXIT:
				/* Fetch completed. */
				complete = 1;
				break;
			case FETCH_AGAIN:
				/* Fetch again - no blocking. */
				continue;
			case FETCH_BLOCK:
				/* Fetch again - allow blocking. */
				break;
			case FETCH_MAIL:
				/* Mail ready. */
				if (fetch_enqueue(a, pio, fctx.mail) != 0)
					goto abort;
				fctx.mail = xcalloc(1, sizeof *fctx.mail);
				break;
			}
		}

		/* If fetch finished and no more mails queued, exit. */
		if (complete && fetch_queued == 0)
			goto finished;

		/* Prepare for poll. */
		ARRAY_CLEAR(&iol);
		ARRAY_ADD(&iol, pio);
		if (a->fetch->fill != NULL)
			a->fetch->fill(a, &iol);

		/*
		 * Work out timeout. If the queues are empty, we can block,
		 * unless this fetch type doesn't have any sockets to poll -
		 * then we would block forever. Otherwise, if the queues are
		 * non-empty, we can block unless there are mails that aren't
		 * blocked (these mails can continue to be processed).
		 */
		timeout = conf.timeout;
		if (fetch_queued == 0 && ARRAY_LENGTH(&iol) == 1)
			timeout = 0;
		else if (fetch_queued != 0 && fetch_blocked != fetch_queued)
			timeout = 0;

		/* Poll for fetch data or privsep messages. */
		log_debug3("%s: queued %u; blocked %u; flags 0x%02x", a->name,
		    fetch_queued, fetch_blocked, fctx.flags);
		if (fetch_poll(a, &iol, pio, timeout) != 0)
			goto abort;
	}

abort:
	a->fetch->abort(a);

	if (nflags & FETCH_POLL)
		log_warnx("%s: polling error. aborted", a->name);
	else
		log_warnx("%s: fetching error. aborted", a->name);

	aborted = 1;

finished:
	if (fctx.mail != NULL) {
		mail_destroy(fctx.mail);
		xfree(fctx.mail);
	}

	xfree(fctx.lbuf);
	fetch_free();
	ARRAY_FREE(&iol);

	/* Close caches. */
	TAILQ_FOREACH(cache, &conf.caches, entry) {
		if (cache->db != NULL)
			db_close(cache->db);
	}

	/* Print results. */
	if (nflags & FETCH_POLL)
		log_info("%s: %u messages found", a->name, a->fetch->total(a));
	else {
		tim = get_time() - tim;
		n = fetch_dropped + fetch_kept;
		if (n > 0) {
			log_info("%s: %u messages processed (%u kept) in %.3f "
			    "seconds (average %.3f)", a->name, n, fetch_kept,
			    tim, tim / n);
		} else {
			log_info("%s: 0 messages processed in %.3f seconds",
			    a->name, tim);
		}
	}

#ifdef DEBUG
	log_debug("%s: polled for %.3f seconds (%.3f blocked)", a->name,
	    fetch_time_polling, fetch_time_blocked);
#endif

	return (aborted);
}

/*
 * Check mail for various problems, add headers and fill tags, then create an
 * and enqueue it onto the fetch queue.
 */
int
fetch_enqueue(struct account *a, struct io *pio, struct mail *m)
{
	struct mail_ctx		*mctx;
	char			*hdr, rtime[128], *rhost, total[16];
	u_int		 	 n, b;
	size_t		 	 size;
	int		 	 error;
 	struct tm		*tm;
	time_t		 	 t;

	/*
	 * Check for oversize mails. This must be first since there is no
	 * guarantee anything other than size is valid if oversize.
	 */
	if (m->size > conf.max_size) {
		log_warnx("%s: message too big: %zu bytes", a->name, m->size);
		if (!conf.del_big)
			return (-1);

		/* Delete the mail. */
		m->decision = DECISION_DROP;
		if (a->fetch->commit != NULL &&
		    a->fetch->commit(a, m) == FETCH_ERROR)
			return (-1);

		mail_destroy(m);
		xfree(m);
		return (0);
	}

 	/*
	 * Find the mail body (needed by trim_from). This is probably slower
	 * than doing it during fetching but it guarantees consistency.
	 */
	m->body = find_body(m);

 	/* Trim "From" line, if any. */
	trim_from(m);

	/* Check for empty mails. */
	if (m->size == 0) {
		log_warnx("%s: empty message", a->name);
		return (-1);
	}

	/* Fill in standard mail attributes. */
	m->decision = DECISION_DROP;
	m->idx = ++a->idx;
	m->tim = get_time();

	/* Add account name tag. */
 	add_tag(&m->tags, "account", "%s", a->name);

	/* Add mail time tags. */
	if (mailtime(m, &t) != 0) {
		log_debug2("%s: bad date header, using current time", a->name);
		t = time(NULL);
	}
	if ((tm = localtime(&t)) != NULL) {
		add_tag(&m->tags, "mail_hour", "%.2d", tm->tm_hour);
		add_tag(&m->tags, "mail_minute", "%.2d", tm->tm_min);
		add_tag(&m->tags, "mail_second", "%.2d", tm->tm_sec);
		add_tag(&m->tags, "mail_day", "%.2d", tm->tm_mday);
		add_tag(&m->tags, "mail_month", "%.2d", tm->tm_mon);
		add_tag(&m->tags, "mail_year", "%.4d", 1900 + tm->tm_year);
		add_tag(&m->tags, "mail_year2", "%.2d", tm->tm_year % 100);
		add_tag(&m->tags, "mail_dayofweek", "%d", tm->tm_wday);
		add_tag(&m->tags, "mail_dayofyear", "%.2d", tm->tm_yday);
		add_tag(&m->tags,
		    "mail_quarter", "%d", (tm->tm_mon - 1) / 3 + 1);
	}
	if (rfc822time(t, rtime, sizeof rtime) != NULL)
		add_tag(&m->tags, "mail_rfc822date", "%s", rtime);

	/* Fill in lines tags. */
	count_lines(m, &n, &b);
	log_debug2("%s: found %u lines, %u in body", a->name, n, b);
	add_tag(&m->tags, "lines", "%u", n);
	add_tag(&m->tags, "body_lines", "%u", b);
	if (n - b != 0)
		b++;	/* don't include the separator */
	add_tag(&m->tags, "header_lines", "%u", n - b);

	/* Insert message-id tag. */
	hdr = find_header(m, "message-id", &size, 1);
	if (hdr == NULL || size == 0 || size > INT_MAX)
		log_debug2("%s: message-id not found", a->name);
	else {
		log_debug2("%s: message-id is: %.*s", a->name, (int) size, hdr);
		add_tag(&m->tags, "message_id", "%.*s", (int) size, hdr);
	}

	/*
	 * Insert received header.
	 *
	 * No header line must exceed 998 bytes. Limiting the user-supplied
	 * stuff to 900 bytes gives plenty of space for the other stuff, and if
	 * it gets truncated, who cares?
	 */
	if (!conf.no_received) {
		error = 1;
		if (rfc822time(time(NULL), rtime, sizeof rtime) != NULL) {
			rhost = conf.info.fqdn;
			if (rhost == NULL)
				rhost = conf.info.host;

			error = insert_header(m, "received", "Received: by "
			    "%.450s (%s " BUILD ", account \"%.450s\");\n\t%s",
			    rhost, __progname, a->name, rtime);
		}
		if (error != 0)
			log_debug3("%s: couldn't add received header", a->name);
	}

	/* Fill wrapped line list. */
	n = fill_wrapped(m);
	log_debug2("%s: found %u wrapped lines", a->name, n);

	/* Create the mctx. */
	mctx = xcalloc(1, sizeof *mctx);
	mctx->account = a;
	mctx->io = pio;
	mctx->mail = m;
	mctx->msgid = 0;
	mctx->done = 0;

	mctx->matched = 0;

	mctx->rule = TAILQ_FIRST(&conf.rules);
	TAILQ_INIT(&mctx->dqueue);
	ARRAY_INIT(&mctx->stack);

	/* And enqueue it. */
	TAILQ_INSERT_TAIL(&fetch_matchq, mctx, entry);
	fetch_queued++;

	*total = '\0';
	if (a->fetch->total != NULL && a->fetch->total(a) != 0)
		xsnprintf(total, sizeof total, " of %u", a->fetch->total(a));
	log_debug("%s: got message %u%s: size %zu, body %zu", a->name, m->idx,
	    total, m->size, m->body);
	return (0);
}

/* Resolve final decision and dequeue mail. */
int
fetch_dequeue(struct account *a, struct mail_ctx *mctx)
{
	struct mail	*m = mctx->mail;

	if (conf.keep_all || a->keep)
		m->decision = DECISION_KEEP;

	switch (m->decision) {
	case DECISION_DROP:
		fetch_dropped++;
		log_debug("%s: deleting message %u", a->name, m->idx);
		break;
	case DECISION_KEEP:
		fetch_kept++;
		log_debug("%s: keeping message %u", a->name, m->idx);
		break;
	default:
		fatalx("invalid decision");
	}

	if (a->fetch->commit != NULL && a->fetch->commit(a, m) == FETCH_ERROR)
		return (-1);

	TAILQ_REMOVE(&fetch_matchq, mctx, entry);
	fetch_queued--;

	fetch_free1(mctx);

	return (0);
}


syntax highlighted by Code2HTML, v. 0.9.1