/* $Id: increment.pc,v 2.21 2008-08-22 17:49:12 zacheiss Exp $
 *
 * Deal with incremental updates
 *
 * Copyright (C) 1989-1998 by the Massachusetts Institute of Technology
 * For copying and distribution information, please see the file
 * <mit-copyright.h>.
 */

#include <mit-copyright.h>
#include "mr_server.h"
#include "query.h"
#include "qrtn.h"

#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

EXEC SQL INCLUDE sqlca;

RCSID("$Header: /afs/athena.mit.edu/astaff/project/moiradev/repository/moira/server/increment.pc,v 2.21 2008-08-22 17:49:12 zacheiss Exp $");

extern char *whoami;
extern char *table_name[];
extern int num_tables;

/* Return value of the most recent vfork() in next_incremental(): the pid
 * of the helper that was started, or -1 if the fork failed. */
int inc_pid = 0;
/* Nonzero while a helper is considered to be in flight.  Set by
 * next_incremental() after a successful fork; it is not cleared anywhere
 * in this file -- presumably the SIGCHLD handling elsewhere resets it
 * (TODO confirm). */
int inc_running = 0;
/* Value of `now' when the current helper was started; compared against
 * INC_TIMEOUT in next_incremental(). */
time_t inc_started;

#define MAXARGC 16

EXEC SQL WHENEVER SQLERROR DO dbmserr();

EXEC SQL BEGIN DECLARE SECTION;
/* structures to save before args */
static char *before[MAXARGC];
static int beforec;
static enum tables beforetable;

/* structures to save after args */
static char *after[MAXARGC];
static int afterc;
EXEC SQL END DECLARE SECTION;

/* structures to save entire sets of incremental changes */
/* Changes recorded by incremental_after() for the current transaction;
 * drained by incremental_update() or discarded by incremental_flush(). */
struct save_queue *incremental_sq = NULL;
/* Per-service updates waiting to be handed to a helper program by
 * next_incremental(). */
struct save_queue *incremental_exec = NULL;
/* One queued incremental change. */
struct iupdate {
  char *table;		/* entry of table_name[]; not owned by this struct */
  int beforec;		/* number of strings in before[] */
  char **before;	/* copy_argv() copy of the row state before the change */
  int afterc;		/* number of strings in after[] */
  char **after;		/* copy_argv() copy of the row state after the change */
  char *service;	/* service name taken from the inc_cache entry; only
			 * set once the update is moved to incremental_exec */
};

void next_incremental(void);
char **copy_argv(char **argv, int argc);
void free_argv(char **argv, int argc);
int table_num(char *table);

/* Set up the incremental-update machinery: create the two save queues and
 * allocate the MAXARGC scratch buffers (MAX_FIELD_WIDTH bytes each) behind
 * before[] and after[].
 *
 * The function is safe to call more than once: anything that already
 * exists is left alone, so a repeated call (e.g. the fallback call from
 * next_incremental()) neither replaces the queues nor leaks the field
 * buffers.
 */
void incremental_init(void)
{
  int i;

  if (!incremental_sq)
    incremental_sq = sq_create();
  if (!incremental_exec)
    incremental_exec = sq_create();

  for (i = 0; i < MAXARGC; i++)
    {
      /* before[] and after[] are file-static, hence NULL until the first
       * call; only allocate the slots that are still empty. */
      if (!before[i])
        before[i] = xmalloc(MAX_FIELD_WIDTH);
      if (!after[i])
        after[i] = xmalloc(MAX_FIELD_WIDTH);
    }
}


/* Record the state of a table row before it is changed.
 *
 * table  - which table is about to be modified; remembered in beforetable
 *          so that a later incremental_clear_after() can report it.
 * qual   - SQL qualification (text after WHERE) selecting the row; only
 *          used by the cases that build a SELECT in stmt_buf.
 * argv   - query arguments; only used by the cases that copy values
 *          straight from the caller (MCMAP, MCNTMAP, SVC, QUOTA, IMEMBERS).
 *
 * The values are written into the file-static before[] buffers and their
 * count into beforec.  Tables without a case leave before[]/beforec
 * untouched.
 *
 * dosql(), id_to_name() and name_to_id() are defined elsewhere; dosql() is
 * expected to run the statement in stmt_buf and store the selected columns
 * into consecutive buffers of the array it is given (TODO confirm against
 * its definition).  Every `name' buffer handed to id_to_name() is released
 * with free() before the case ends.
 */
void incremental_before(enum tables table, char *qual, char **argv)
{
  EXEC SQL BEGIN DECLARE SECTION;
  int id;
  EXEC SQL END DECLARE SECTION;

  char *name, *name2;

  beforetable = table;

  switch (table)
    {
    case USERS_TABLE:
      sprintf(stmt_buf, "SELECT u.login, u.unix_uid, u.shell, "
              "u.winconsoleshell, u.last, u.first, u.middle, u.status, "
              "u.clearid, u.type, u.users_id, u.winhomedir, u.winprofiledir, "
              "u.potype FROM users u WHERE %s", qual);
      dosql(before);
      beforec = 14;
      break;
    case MACHINE_TABLE:
      sprintf(stmt_buf, "SELECT m.name, m.vendor, m.mach_id FROM machine m "
              "WHERE %s", qual);
      dosql(before);
      beforec = 3;
      break;
    case CLUSTERS_TABLE:
      sprintf(stmt_buf, "SELECT c.name, c.description, c.location, "
              "c.clu_id FROM clusters c WHERE %s", qual);
      dosql(before);
      beforec = 4;
      break;
    case CONTAINERS_TABLE:
      sprintf(stmt_buf, "SELECT c.name, c.description, c.location, c.contact, "
              "c.acl_type, c.acl_id, c.cnt_id, c.list_id FROM containers c "
              "WHERE %s", qual);
      dosql(before);
      beforec = 8;
      name = xmalloc(0);
      /* replace the numeric acl_id with the name of the ACL entity */
      id = atoi(before[5]);
      if (!strncmp(before[4], "USER", 4))
        {
          id_to_name(id, USERS_TABLE, &name);
          strcpy(before[5], name);
        }
      else if (!strncmp(before[4], "LIST", 4))
        {
          id_to_name(id, LIST_TABLE, &name);
          strcpy(before[5], name);
        }
      else if (!strncmp(before[4], "KERBEROS", 8))
        {
          id_to_name(id, STRINGS_TABLE, &name);
          strcpy(before[5], name);
        }
      /* replace the numeric list_id with the list name */
      id = atoi(before[7]);
      id_to_name(id, LIST_TABLE, &name);
      strcpy(before[7], name);
      /* release the lookup buffer, as the FILESYS and IMEMBERS cases do */
      free(name);
      break;
    case MCMAP_TABLE:
      strcpy(before[0], argv[0]);
      strcpy(before[1], argv[1]);
      beforec = 2;
      break;
    case MCNTMAP_TABLE:
      strcpy(before[0], argv[0]);
      strcpy(before[1], argv[1]);
      name_to_id(before[0], MACHINE_TABLE, &id);
      sprintf(before[2], "%d", id);
      name_to_id(before[1], CONTAINERS_TABLE, &id);
      sprintf(before[3], "%d", id);
      name = xmalloc(0);
      /* fetch the container's list_id, then turn it into the list name */
      EXEC SQL SELECT list_id INTO :before[4] FROM containers
        WHERE cnt_id = :id;
      id = atoi(before[4]);
      id_to_name(id, LIST_TABLE, &name);
      strcpy(before[4], name);
      /* release the lookup buffer, as the FILESYS and IMEMBERS cases do */
      free(name);
      beforec = 5;
      break;
    case SVC_TABLE:
      strcpy(before[0], argv[0]);
      strcpy(before[1], argv[1]);
      strcpy(before[2], argv[2]);
      beforec = 3;
      break;
    case FILESYS_TABLE:
      sprintf(stmt_buf, "SELECT fs.label, fs.type, fs.mach_id, fs.name, "
              "fs.mount, fs.rwaccess, fs.comments, fs.owner, fs.owners, "
              "fs.createflg, fs.lockertype, fs.filsys_id FROM filesys fs "
              "WHERE %s", qual);
      dosql(before);
      name = xmalloc(0);
      /* mach_id, owner and owners are ids; report them by name */
      id = atoi(before[2]);
      id_to_name(id, MACHINE_TABLE, &name);
      strcpy(before[2], name);
      id = atoi(before[7]);
      id_to_name(id, USERS_TABLE, &name);
      strcpy(before[7], name);
      id = atoi(before[8]);
      id_to_name(id, LIST_TABLE, &name);
      strcpy(before[8], name);
      free(name);
      beforec = 12;
      break;
    case QUOTA_TABLE:
      strcpy(before[0], "?");
      strcpy(before[1], argv[1]);
      strcpy(before[2], "?");
      sprintf(stmt_buf, "SELECT q.quota, fs.name FROM quota q, filesys fs "
              "WHERE %s AND fs.filsys_id = q.filsys_id", qual);
      /* quota and filesystem name land in before[3] and before[4] */
      dosql(&(before[3]));
      strcpy(before[2], argv[1]);
      beforec = 5;
      break;
    case LIST_TABLE:
      sprintf(stmt_buf, "SELECT l.name, l.active, l.publicflg, l.hidden, "
              "l.maillist, l.grouplist, l.gid, l.acl_type, l.acl_id, "
              "l.description, l.list_id FROM list l WHERE %s", qual);
      dosql(before);
      beforec = 11;
      break;
    case IMEMBERS_TABLE:
      /* NOTE(review): argv[0] and argv[2] are cast to int, so callers
       * apparently pass the list id and member id in the pointer slots
       * rather than as strings -- confirm against the callers. */
      id = (int) argv[0];
      sprintf(stmt_buf, "SELECT active, publicflg, hidden, maillist, "
              "grouplist, gid FROM list WHERE list_id = %d", id);
      dosql(&(before[3]));
      name = xmalloc(0);
      id_to_name(id, LIST_TABLE, &name);
      name2 = xmalloc(0);
      strcpy(before[0], name);
      strcpy(before[1], argv[1]);
      id = (int) argv[2];
      beforec = 10;
      /* resolve the member id according to the member type in before[1] */
      if (!strcmp(before[1], "USER"))
        {
          id_to_name(id, USERS_TABLE, &name2);
          EXEC SQL SELECT status, users_id INTO :before[9], :before[11]
            FROM users WHERE users_id = :id;
          EXEC SQL SELECT list_id INTO :before[10] FROM list
            WHERE name = :name;
          beforec = 12;
        }
      else if (!strcmp(before[1], "LIST"))
        {
          id_to_name(id, LIST_TABLE, &name2);
          EXEC SQL SELECT list_id INTO :before[9] FROM list
            WHERE name = :name;
          sprintf(before[10], "%d", id);
          beforec = 11;
        }
      else if (!strcmp(before[1], "STRING") || !strcmp(before[1], "KERBEROS"))
        {
          id_to_name(id, STRINGS_TABLE, &name2);
          EXEC SQL SELECT list_id INTO :before[9] FROM list
            WHERE name = :name;
        }
      else if (!strcmp(before[1], "MACHINE"))
        {
          id_to_name(id, MACHINE_TABLE, &name2);
          EXEC SQL SELECT list_id INTO :before[9] FROM list
            WHERE name = :name;
          sprintf(before[10], "%d", id);
          beforec = 11;
        }
      /* NOTE(review): for an unrecognised member type name2 is still the
       * zero-length allocation made above when it is copied here. */
      strcpy(before[2], name2);
      free(name);
      free(name2);
      break;
    default:
        /*
        com_err(whoami, 0, "requested incremental on unexpected table `%s'",
                table_name[table]);
        */
      break;
    }
}


/* Mark the "before" image as empty: the next record queued by
 * incremental_after() will carry beforec == 0 (i.e. no before values). */
void incremental_clear_before(void)
{
  beforec = 0;
}


/* Add an element to the incremental queue for the changed row.
 *
 * table  - which table was modified.  NO_TABLE means "there is no after
 *          image": afterc becomes 0 and the table recorded by the last
 *          incremental_before() call (beforetable) is reported instead.
 * qual   - SQL qualification (text after WHERE) selecting the row; only
 *          used by the cases that build a SELECT in stmt_buf.
 * argv   - query arguments; only used by the cases that copy values
 *          straight from the caller (MCMAP, MCNTMAP, SVC, QUOTA, IMEMBERS).
 *
 * The row's new values are collected in the file-static after[] buffers
 * (count in afterc).  A struct iupdate holding copies of the current
 * before[] and after[] values is then appended to incremental_sq; this
 * happens for every table value, including those with no case here, in
 * which case afterc keeps whatever value it had.
 *
 * dosql(), id_to_name() and name_to_id() are defined elsewhere; dosql() is
 * expected to run the statement in stmt_buf and store the selected columns
 * into consecutive buffers of the array it is given (TODO confirm against
 * its definition).  Every `name' buffer handed to id_to_name() is released
 * with free() before the case ends.
 */
void incremental_after(enum tables table, char *qual, char **argv)
{
  char *name, *name2;
  EXEC SQL BEGIN DECLARE SECTION;
  int id;
  EXEC SQL END DECLARE SECTION;
  struct iupdate *iu;

  switch (table)
    {
    case USERS_TABLE:
      sprintf(stmt_buf, "SELECT u.login, u.unix_uid, u.shell, "
              "u.winconsoleshell, u.last, u.first, u.middle, u.status, "
              "u.clearid, u.type, u.users_id, u.winhomedir, u.winprofiledir, "
              "u.potype FROM users u WHERE %s", qual);
      dosql(after);
      afterc = 14;
      break;
    case MACHINE_TABLE:
      sprintf(stmt_buf, "SELECT m.name, m.vendor, m.mach_id FROM machine m "
              "WHERE %s", qual);
      dosql(after);
      afterc = 3;
      break;
    case CLUSTERS_TABLE:
      sprintf(stmt_buf, "SELECT c.name, c.description, c.location, "
              "c.clu_id FROM clusters c WHERE %s", qual);
      dosql(after);
      afterc = 4;
      break;
    case CONTAINERS_TABLE:
      sprintf(stmt_buf, "SELECT c.name, c.description, c.location, c.contact, "
              "c.acl_type, c.acl_id, c.cnt_id, c.list_id FROM containers c "
              "WHERE %s", qual);
      dosql(after);
      afterc = 8;
      name = xmalloc(0);
      /* replace the numeric acl_id with the name of the ACL entity */
      id = atoi(after[5]);
      if (!strncmp(after[4], "USER", 4))
        {
          id_to_name(id, USERS_TABLE, &name);
          strcpy(after[5], name);
        }
      else if (!strncmp(after[4], "LIST", 4))
        {
          id_to_name(id, LIST_TABLE, &name);
          strcpy(after[5], name);
        }
      else if (!strncmp(after[4], "KERBEROS", 8))
        {
          id_to_name(id, STRINGS_TABLE, &name);
          strcpy(after[5], name);
        }
      /* replace the numeric list_id with the list name */
      id = atoi(after[7]);
      id_to_name(id, LIST_TABLE, &name);
      strcpy(after[7], name);
      /* release the lookup buffer, as the FILESYS and IMEMBERS cases do */
      free(name);
      break;
    case MCMAP_TABLE:
      strcpy(after[0], argv[0]);
      strcpy(after[1], argv[1]);
      afterc = 2;
      break;
    case MCNTMAP_TABLE:
      strcpy(after[0], argv[0]);
      strcpy(after[1], argv[1]);
      name_to_id(after[0], MACHINE_TABLE, &id);
      sprintf(after[2], "%d", id);
      name_to_id(after[1], CONTAINERS_TABLE, &id);
      sprintf(after[3], "%d", id);
      name = xmalloc(0);
      /* fetch the container's list_id, then turn it into the list name */
      EXEC SQL SELECT list_id INTO :after[4] FROM containers
        WHERE cnt_id = :id;
      id = atoi(after[4]);
      id_to_name(id, LIST_TABLE, &name);
      strcpy(after[4], name);
      /* release the lookup buffer, as the FILESYS and IMEMBERS cases do */
      free(name);
      afterc = 5;
      break;
    case SVC_TABLE:
      strcpy(after[0], argv[0]);
      strcpy(after[1], argv[1]);
      strcpy(after[2], argv[2]);
      afterc = 3;
      break;
    case FILESYS_TABLE:
      sprintf(stmt_buf, "SELECT fs.label, fs.type, fs.mach_id, fs.name, "
              "fs.mount, fs.rwaccess, fs.comments, fs.owner, fs.owners, "
              "fs.createflg, fs.lockertype, fs.filsys_id FROM filesys fs "
              "WHERE %s", qual);
      dosql(after);
      name = xmalloc(0);
      /* mach_id, owner and owners are ids; report them by name */
      id = atoi(after[2]);
      id_to_name(id, MACHINE_TABLE, &name);
      strcpy(after[2], name);
      id = atoi(after[7]);
      id_to_name(id, USERS_TABLE, &name);
      strcpy(after[7], name);
      id = atoi(after[8]);
      id_to_name(id, LIST_TABLE, &name);
      strcpy(after[8], name);
      free(name);
      afterc = 12;
      break;
    case QUOTA_TABLE:
      strcpy(after[0], "?");
      strcpy(after[1], argv[1]);
      strcpy(after[2], "?");
      sprintf(stmt_buf, "SELECT q.quota, fs.name FROM quota q, filesys fs "
              "WHERE %s and fs.filsys_id = q.filsys_id and q.type = '%s'",
              qual, argv[1]);
      /* quota and filesystem name land in after[3] and after[4] */
      dosql(&(after[3]));
      afterc = 5;
      break;
    case LIST_TABLE:
      sprintf(stmt_buf, "SELECT l.name, l.active, l.publicflg, l.hidden, "
              "l.maillist, l.grouplist, l.gid, l.acl_type, l.acl_id, "
              "l.description, l.list_id FROM list l WHERE %s", qual);
      dosql(after);
      afterc = 11;
      break;
    case IMEMBERS_TABLE:
      /* NOTE(review): argv[0] and argv[2] are cast to int, so callers
       * apparently pass the list id and member id in the pointer slots
       * rather than as strings -- confirm against the callers. */
      id = (int) argv[0];
      sprintf(stmt_buf, "SELECT active, publicflg, hidden, maillist, "
              "grouplist, gid FROM list WHERE list_id = %d", id);
      dosql(&(after[3]));
      name = xmalloc(0);
      id_to_name(id, LIST_TABLE, &name);
      name2 = xmalloc(0);
      strcpy(after[0], name);
      strcpy(after[1], argv[1]);
      id = (int) argv[2];
      afterc = 10;
      /* resolve the member id according to the member type in after[1] */
      if (!strcmp(after[1], "USER"))
        {
          id_to_name(id, USERS_TABLE, &name2);
          EXEC SQL SELECT status, users_id INTO :after[9], :after[11]
            FROM users WHERE users_id = :id;
          EXEC SQL SELECT list_id INTO :after[10] FROM list
            WHERE name = :name;
          afterc = 12;
        }
      else if (!strcmp(after[1], "LIST"))
        {
          id_to_name(id, LIST_TABLE, &name2);
          EXEC SQL SELECT list_id INTO :after[9] FROM list
            WHERE name = :name;
          sprintf(after[10], "%d", id);
          afterc = 11;
        }
      else if (!strcmp(after[1], "STRING") || !strcmp(after[1], "KERBEROS"))
        {
          id_to_name(id, STRINGS_TABLE, &name2);
          EXEC SQL SELECT list_id INTO :after[9] FROM list
            WHERE name = :name;
        }
      else if (!strcmp(after[1], "MACHINE"))
        {
          id_to_name(id, MACHINE_TABLE, &name2);
          EXEC SQL SELECT list_id INTO :after[9] FROM list
            WHERE name = :name;
          sprintf(after[10], "%d", id);
          afterc = 11;
        }
      /* NOTE(review): for an unrecognised member type name2 is still the
       * zero-length allocation made above when it is copied here. */
      strcpy(after[2], name2);
      free(name);
      free(name2);
      break;
    case NO_TABLE:
      /* no after image: report the table saved by incremental_before() */
      afterc = 0;
      table = beforetable;
      break;
    default:
        /*
        com_err(whoami, 0, "requested incremental on unexpected table `%s'",
                table_name[table]);
        */
      break;
    }

  /* snapshot the scratch buffers; the queue entry owns the copies */
  iu = xmalloc(sizeof(struct iupdate));
  iu->table = table_name[table];
  iu->beforec = beforec;
  iu->before = copy_argv(before, beforec);
  iu->afterc = afterc;
  iu->after = copy_argv(after, afterc);
  sq_save_data(incremental_sq, iu);
}

/* Queue an incremental record that has no "after" image.  Passing NO_TABLE
 * makes incremental_after() set afterc to 0 and report the table saved by
 * the preceding incremental_before() call. */
void incremental_clear_after(void)
{
  incremental_after(NO_TABLE, NULL, NULL);
}


/* Called when the current transaction is committed to start any queued
 * incremental updates.  This caches the update table the first time it
 * is called.
 */

/* One cached row of the `incremental' database table, kept on a singly
 * linked list built by incremental_update(). */
struct inc_cache {
  struct inc_cache *next;	/* next cached row, or NULL at the end */
  char *table, *service;	/* trimmed copies of the table_name and
				 * service columns */
};


void incremental_update(void)
{
  static int inited = 0;
  static struct inc_cache *cache;
  struct inc_cache *c;
  EXEC SQL BEGIN DECLARE SECTION;
  char tab[INCREMENTAL_TABLE_NAME_SIZE], serv[INCREMENTAL_SERVICE_SIZE];
  EXEC SQL END DECLARE SECTION;
  struct iupdate *iu, *iu_save;

  if (!inited)
    {
      inited++;

      EXEC SQL DECLARE inc CURSOR FOR SELECT table_name, service
	FROM incremental;
      EXEC SQL OPEN inc;
      while (1)
	{
	  EXEC SQL FETCH inc INTO :tab, :serv;
	  if (sqlca.sqlcode)
	    break;
	  c = xmalloc(sizeof(struct inc_cache));
	  c->next = cache;
	  c->table = xstrdup(strtrim(tab));
	  c->service = xstrdup(strtrim(serv));
	  cache = c;
	}
      EXEC SQL CLOSE inc;
      EXEC SQL COMMIT WORK;
    }

  while (sq_remove_data(incremental_sq, &iu))
    {
      for (c = cache; c; c = c->next)
	{
	  if (!strcmp(c->table, iu->table))
	    {
	      iu->service = c->service;
	      iu_save = xmalloc(sizeof(struct iupdate));
	      iu_save->service = iu->service;
	      iu_save->table = iu->table;
	      iu_save->beforec = iu->beforec;
	      iu_save->afterc = iu->afterc;
	      iu_save->before = copy_argv(iu->before, iu->beforec);
	      iu_save->after = copy_argv(iu->after, iu->afterc);
	      sq_save_data(incremental_exec, iu_save);
	    }
	}
      if (!c)
	{
	  free_argv(iu->before, iu->beforec);
	  free_argv(iu->after, iu->afterc);
	  free(iu);
	}
    }
  if (inc_running == 0)
    next_incremental();
}

/* Pro*C 2.2.4 can't cope with the sigset_t below, at least in Solaris 2.6.
   We add DEFINE=_PROC_ to the proc invocation and then #ifndef that around
   this function so proc will pass it through without reading it. */

#ifndef _PROC_
/* Start the helper program for the next queued incremental update.
 *
 * Returns without doing anything when incremental_exec is empty, or when a
 * helper is still marked as running and has been so for less than
 * INC_TIMEOUT.  A helper that exceeded the timeout is only logged; the
 * next update is started regardless.
 *
 * The helper is BIN_DIR/<service>.incr and is invoked as
 *   prog table beforec afterc before... after...
 * SIGCHLD is blocked around the vfork() so that inc_pid, inc_running and
 * inc_started are set before the signal can be delivered.  The dequeued
 * update is freed whether or not the helper could be started.
 */
void next_incremental(void)
{
  struct iupdate *iu;
  /* argv layout: program, table, two counts, up to MAXARGC before values,
   * up to MAXARGC after values, and the terminating NULL (hence + 5). */
  char *argv[MAXARGC * 2 + 5];
  /* large enough for any int printed with %d, sign and NUL included */
  char cafter[16], cbefore[16];
  char prog[MAXPATHLEN];
  int i, len;
  sigset_t sigs;

  if (!incremental_exec)
    incremental_init();

  if (sq_empty(incremental_exec) ||
      (inc_running && now - inc_started < INC_TIMEOUT))
    return;

  if (inc_running)
    com_err(whoami, 0, "incremental timeout on pid %d", inc_pid);

  /* never touch iu unless an entry was actually handed back */
  if (!sq_remove_data(incremental_exec, &iu))
    return;

  len = snprintf(prog, sizeof(prog), "%s/%s.incr", BIN_DIR, iu->service);
  if (len < 0 || (size_t) len >= sizeof(prog))
    {
      /* do not exec a truncated path; the update is dropped below */
      com_err(whoami, 0, "incremental program path too long for service %s",
              iu->service);
    }
  else
    {
      argv[0] = prog;
      argv[1] = iu->table;
      snprintf(cbefore, sizeof(cbefore), "%d", iu->beforec);
      argv[2] = cbefore;
      snprintf(cafter, sizeof(cafter), "%d", iu->afterc);
      argv[3] = cafter;
      for (i = 0; i < iu->beforec; i++)
        argv[4 + i] = iu->before[i];
      for (i = 0; i < iu->afterc; i++)
        argv[4 + iu->beforec + i] = iu->after[i];
      argv[4 + iu->beforec + iu->afterc] = 0;

      sigemptyset(&sigs);
      sigaddset(&sigs, SIGCHLD);
      sigprocmask(SIG_BLOCK, &sigs, NULL);
      inc_pid = vfork();
      switch (inc_pid)
        {
        case 0:
          /* child: only returns from execv() on failure */
          execv(prog, argv);
          _exit(1);
        case -1:
          com_err(whoami, 0, "Failed to start incremental update");
          break;
        default:
          inc_running = 1;
          inc_started = now;
          break;
        }
      sigprocmask(SIG_UNBLOCK, &sigs, NULL);
    }

  free_argv(iu->before, iu->beforec);
  free_argv(iu->after, iu->afterc);
  free(iu);
}
#endif

/* Discard every incremental update queued for the current transaction.
 * Called when the transaction is aborted: the strings and record of each
 * pending entry are freed, the queue itself is destroyed, and a fresh
 * empty queue takes its place.
 */

void incremental_flush(void)
{
  struct iupdate *pending;

  for (;;)
    {
      if (!sq_get_data(incremental_sq, &pending))
        break;

      free_argv(pending->before, pending->beforec);
      free_argv(pending->after, pending->afterc);
      free(pending);
    }

  sq_destroy(incremental_sq);
  incremental_sq = sq_create();
}


/* Build a newly allocated vector holding argc strings, each one an
 * xstrdup() copy of strtrim(argv[i]).  The entries are processed from the
 * last to the first.  The caller releases the result with free_argv().
 */
char **copy_argv(char **argv, int argc)
{
  char **copy = xmalloc(sizeof(char *) * argc);
  int i;

  for (i = argc - 1; i >= 0; i--)
    copy[i] = xstrdup(strtrim(argv[i]));

  return copy;
}

/* Release a string vector: free each of the argc strings (last to first)
 * and then the vector itself.
 */
void free_argv(char **argv, int argc)
{
  int i;

  for (i = argc - 1; i >= 0; i--)
    free(argv[i]);

  free(argv);
}

/* Map a table name to its index in table_name[].
 *
 * Indices num_tables - 1 down to 1 are compared against name; index 0 is
 * never compared and doubles as the "no match" result.  Returns 0 as well
 * when num_tables is 0 or negative instead of walking off the front of
 * the array.
 */
int table_num(char *name)
{
  int i;

  for (i = num_tables - 1; i > 0; i--)
    {
      if (!strcmp(table_name[i], name))
        break;
    }

  return i > 0 ? i : 0; /* 0 = "none" if no match */
}
