/* multipipe -- pipe output to multiple programs
 * Copyright (C) 2000 Bruce Guenter <bruceg@em.ca>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
#include <dirent.h>
#include <fcntl.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include "select.h"
#include "direntry.h"
#include "bool.h"
#include "svcfns.h"

void err(const char* msg)
{
  write(2, "multipipe: Error: ", 18);
  write(2, msg, strlen(msg));
  write(2, "\n", 1);
}

void err2(const char* msg1, const char* msg2)
{
  write(2, "multipipe: Error: ", 18);
  write(2, msg1, strlen(msg1));
  write(2, msg2, strlen(msg2));
  write(2, "\n", 1);
}

void set_ndelay(int fd)
{
  int flags = fcntl(fd, F_GETFL, 0);
  fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

#define BUFSIZE 8192
static char buffer[BUFSIZE];
static unsigned buf_start = 0;
static unsigned buf_end = 0;
static bool buf_eof = false;
#define buf_wrapped (buf_end < buf_start)
#define buf_left (buf_start-buf_end + (buf_wrapped ? -1 : BUFSIZE-1))
#define buf_len (buf_end-buf_start + (buf_wrapped ? BUFSIZE : 0))

struct reader 
{
  ino_t inode;
  const char* name;
  pid_t pid;
  int fd;
  unsigned buf_pos;
  bool marked;
  struct reader* next;
};

static struct reader* readers = 0;

void reset_buf_start(void)
{
  struct reader* reader;
  bool wrapped = (buf_end < buf_start);

  buf_start = buf_end;
  for(reader = readers; reader; reader = reader->next) {
    unsigned bp = reader->buf_pos;
    if(bp < buf_start && (!wrapped || bp >= buf_end))
      buf_start = bp;
  }
  if(buf_start == buf_end) {
    buf_end = buf_start = 0;
    for(reader = readers; reader; reader = reader->next)
      reader->buf_pos = 0;
  }
}

void read_input(void)
{
  unsigned readable = buf_wrapped ? buf_left : BUFSIZE-buf_end;
  ssize_t rd;
  if(readable >= buf_left)
    readable = buf_left;
  rd = read(FD_STDIN, buffer+buf_end, readable);
  if(rd <= 0)
    buf_eof = true;
  else
    buf_end = (buf_end + rd) % BUFSIZE;
}

void write_output(struct reader* reader)
{
  unsigned writable = buf_wrapped ? BUFSIZE-buf_end : buf_len;
  ssize_t wr = write(reader->fd, buffer+reader->buf_pos, writable);
  if(wr > 0) {
    reader->buf_pos = (reader->buf_pos + wr) % BUFSIZE;
    reset_buf_start();
  }
}

void add_reader(const char* name, ino_t inode)
{
  struct reader* r = malloc(sizeof(struct reader));
  r->name = strdup(name);
  r->inode = inode;
  r->pid = 0;
  r->fd = -1;
  r->buf_pos = buf_end;
  r->next = readers;
  readers = r;
}

bool del_reader(pid_t pid) 
{
  struct reader* curr = readers;
  struct reader* prev = 0;
  while(curr) {
    struct reader* next = curr->next;
    if(curr->pid == pid) {
      if(prev)
	prev->next = next;
      else
	readers = next;
      free((char*)curr->name);
      free(curr);
      return true;
    }
    prev = curr;
    curr = next;
  }
  return false;
}

void start_reader(struct reader* reader)
{
  int fd[2];
  if(pipe(fd)) {
    err2("Could not create pipe to reader ", reader->name);
    return;
  }
  reader->pid = start_supervise(reader->name, fd[0], FD_STDOUT);
  close(fd[0]);
  reader->fd = fd[1];
  set_ndelay(reader->fd);
}

void stop_reader(struct reader* reader)
{
  stop_supervise(reader->name, reader->pid);
}

void stop_readers(void)
{
  struct reader* reader;
  for(reader = readers; reader; reader = reader->next)
    stop_reader(reader);
}

void reap_children(void)
{
  pid_t pid;
  int status;
  while((pid = waitpid(0, &status, WNOHANG)) > 0) {
    if(!del_reader(pid))
      err("Caught exit of unknown process");
  }
}

void scan_dirs(void)
{
  direntry* entry;
  DIR* dir = opendir(".");
  struct reader* reader;
  struct reader* prev;
  
  if(!dir) {
    err("Unable to read directory");
    return;
  }

  /* Clear all the marked flags */
  for(reader = readers; reader; reader = reader->next)
    reader->marked = false;

  /* For each directory entry, mark the corresponding reader.
   * If a matching reader is not found, make one. */
  while((entry = readdir(dir)) != 0) {
    struct stat statbuf;
    if(entry->d_name[0] == '.' || !strcmp(entry->d_name, "supervise"))
      continue;
    if(stat(entry->d_name, &statbuf))
      continue;
    if(!S_ISDIR(statbuf.st_mode))
      continue;
    for(reader = readers; reader; reader = reader->next)
      if(reader->inode == statbuf.st_ino) {
	reader->marked = true;
	break;
      }
    if(!reader) {
      add_reader(entry->d_name, statbuf.st_ino);
      start_reader(readers);
      readers->marked = true;
    }
  }
  closedir(dir);

  reap_children();
  
  /* Clean up any reader that was removed from the directory */
  prev = 0;
  reader = readers;
  while(reader) {
    struct reader* next = reader->next;
    
    if(!reader->marked) {
      /* Don't stop it, since the directory is no longer there */
      /* stop_reader(reader); */
      if(prev)
	prev->next = next;
      else
	readers = next;
      close(reader->fd);
      free((char*)reader->name);
      free(reader);
    }
    prev = reader;
    reader = next;
  }
}

#define EVENT_INTR 0
#define EVENT_ALRM 1
static int selfpipe[2];

void read_event(void)
{
  char buf[1];
  if(read(selfpipe[0], buf, 1) != 1)
    return;
  switch(buf[0]) {
  case EVENT_INTR:
    buf_eof = true;
    break;
  case EVENT_ALRM:
    scan_dirs();
    alarm(5);
    break;
  default:
    err("Unknown event sent to self?!?");
  }
}

void write_event(int event)
{
  char buf[1];
  buf[0] = event;
  if(write(selfpipe[1], buf, 1) != 1)
    err("Could not send event to self");
}

void handle_signal(int sig)
{
  int event;
  switch(sig) {
  case SIGALRM: event = EVENT_ALRM; break;
  default:      event = EVENT_INTR; break;
  }
  write_event(event);
}

void main_loop(void) 
{
  for(;;) {
    struct reader* reader;
    fd_set readfds;
    fd_set writefds;
    int fdmax = selfpipe[0];
    FD_ZERO(&readfds);
    FD_ZERO(&writefds);
    FD_SET(selfpipe[0], &readfds);
    if(buf_eof) {
      if(buf_start == buf_end)
	return;
    }
    else if(buf_left)
      FD_SET(FD_STDIN, &readfds);
    for(reader = readers; reader; reader = reader->next) {
      if(reader->buf_pos != buf_end) {
	int fd = reader->fd;
	FD_SET(fd, &writefds);
	if(fd > fdmax)
	  fdmax = fd;
      }
    }
    if(select(fdmax+1, &readfds, &writefds, 0, 0) == -1)
      continue;
    /* If an event arrived, skip all other I/O */
    if(FD_ISSET(selfpipe[0], &readfds)) {
      read_event();
      continue;
    }
    if(FD_ISSET(FD_STDIN, &readfds))
      read_input();
    for(reader = readers; reader; reader = reader->next)
      if(FD_ISSET(reader->fd, &writefds))
	write_output(reader);
  }
}

int main(int argc, char** argv)
{
  if(argc > 1 && chdir(argv[1]) != 0) {
    err2("Couldn't chdir to ", argv[1]);
    return 1;
  }
  if(pipe(selfpipe)) {
    err("Couldn't create self pipe");
    return 1;
  }
  scan_dirs();
  signal(SIGALRM, handle_signal);
  signal(SIGINT, handle_signal);
  signal(SIGTERM, handle_signal);
  signal(SIGQUIT, handle_signal);
  alarm(5);
  set_ndelay(FD_STDIN);
  main_loop();
  stop_readers();
  return 0;
}


syntax highlighted by Code2HTML, v. 0.9.1