/* $Id: qrtn.pc,v 2.31 2009-07-06 16:23:15 zacheiss Exp $
 *
 * Query-processing routines
 *
 * Copyright (C) 1987-1998 by the Massachusetts Institute of Technology
 * For copying and distribution information, please see the file
 * <mit-copyright.h>.
 *
 */

#include <mit-copyright.h>
#include "mr_server.h"
#include "qrtn.h"
#include "query.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

EXEC SQL INCLUDE sqlca;  /* SQL Communications Area */
EXEC SQL INCLUDE sqlda;  /* SQL Descriptor Area */

RCSID("$Header: /afs/athena.mit.edu/astaff/project/moiradev/repository/moira/server/qrtn.pc,v 2.31 2009-07-06 16:23:15 zacheiss Exp $");

/* SQL descriptor area shared by the dynamic-SQL helpers in this file;
   allocated on the first call to mr_open_database(). */
SQLDA *mr_sqlda;
EXEC SQL BEGIN DECLARE SECTION;
/* Scratch buffer holding the text of the dynamic SQL statement to run. */
char stmt_buf[MR_STMTBUF_LEN];
/* list_id of the 'proxy' capability ACL, loaded by mr_open_database(). */
int proxy_acl;
EXEC SQL END DECLARE SECTION;

/* Writable, server-private copy of the current query's arguments; the
   buffers are allocated by mr_open_database() and filled in by
   mr_verify_query(). */
char *Argv[QMAXARGS];
extern char *table_name[];
extern char *sqlbuffer[QMAXARGS];

/* Last DBMS error (negated sqlcode) and the matching Moira error code.
   Both are set by dbmserr() and reset to 0 at the start of each request. */
int dbms_errno = 0;
int mr_errcode = 0;
EXEC SQL BEGIN DECLARE SECTION;
/* Not referenced anywhere else in this file. */
int query_timeout = 30;
/* Used as both the user name and the password in the CONNECT statement. */
char *database = "moira";
EXEC SQL END DECLARE SECTION;
extern char *whoami;
extern FILE *journal;
extern int QueryCount, max_version;
extern struct query Queries[];

/* Put this in a variable so that we can patch it if necessary */
int max_row_count = 8192;

/* Forward declarations for routines defined later in this file. */
int mr_verify_query(client *cl, struct query *q, int argc, char *argv_ro[]);
int do_retrieve(struct query *q, char *pqual,
		int (*action)(int, char *[], void *), void *actarg);
int do_update(struct query *q, char *argv[], char *qual,
	      int (*action)(int, char *[], void *), void *actarg);
int do_append(struct query *q, char *argv[], char *pqual,
	      int (*action)(int, char *[], void *), void *actarg);
int do_delete(struct query *q, char *qual,
	      int (*action)(int, char *[], void *), void *actarg);
void build_sql_stmt(char *result_buf, char *cmd, char *targetlist,
		    char *argv[], char *qual);

/* Defined outside this file. */
SQLDA *mr_alloc_sqlda(void);
void sqlglm(char *, int *, int *);

/*
 * dbmserr: Called when the DBMS indicates an error.
 *
 * Stores the negated sqlcode in dbms_errno and MR_DBMS_ERR in mr_errcode,
 * then reports the error code and the DBMS message text through com_err()
 * and critical_alert().
 */

void dbmserr(void)
{
  EXEC SQL BEGIN DECLARE SECTION;
  char err_msg[256];
  EXEC SQL END DECLARE SECTION;
  /* Offer sqlglm() one byte less than the buffer so that there is always
     room for the terminating NUL we add below. */
  int bufsize = sizeof(err_msg) - 1, msglength = 0;

  dbms_errno = -sqlca.sqlcode;
  mr_errcode = MR_DBMS_ERR;
  com_err(whoami, MR_DBMS_ERR, " code %d\n", dbms_errno);
  sqlglm(err_msg, &bufsize, &msglength);

  /* Never trust the reported length further than the buffer we own. */
  if (msglength < 0)
    msglength = 0;
  else if (msglength > (int)sizeof(err_msg) - 1)
    msglength = sizeof(err_msg) - 1;
  err_msg[msglength] = 0;

  com_err(whoami, 0, "SQL error text = %s", err_msg);
  critical_alert(whoami, "MOIRA", "Moira server encountered DBMS ERROR %d\n%s",
                 dbms_errno, err_msg);
}

/* This is declarative, not executed.  Applies from here on, in this file:
   every embedded SQL statement below this line calls dbmserr() when it
   reports an SQL error. */
EXEC SQL WHENEVER SQLERROR DO dbmserr();

/*
 * mr_open_database: connect to the database.
 *
 * The first call also performs the one-time setup of this module: it
 * allocates the Argv[] argument buffers and the shared SQLDA and calls
 * incremental_init().  Every call clears the error state, connects, and
 * loads the list_id of the 'proxy' capability into proxy_acl.
 *
 * Returns MR_SUCCESS, or mr_errcode if either SQL statement recorded a
 * DBMS error.
 */
int mr_open_database(void)
{
  static int first_open = 1;
  int slot;

  if (first_open)
    {
      first_open = 0;

      /* give every Argv slot its own modifiable buffer */
      slot = 0;
      while (slot < QMAXARGS)
        Argv[slot++] = xmalloc(MAX_FIELD_WIDTH);

      mr_sqlda = mr_alloc_sqlda();

      incremental_init();
    }

  dbms_errno = mr_errcode = 0;

  /* the database name doubles as the password */
  EXEC SQL CONNECT :database IDENTIFIED BY :database;
  if (dbms_errno != 0)
    return mr_errcode;

  EXEC SQL SELECT list_id INTO :proxy_acl FROM capacls
    WHERE capability = 'proxy';

  return dbms_errno != 0 ? mr_errcode : MR_SUCCESS;
}

/* mr_close_database: issue COMMIT RELEASE, i.e. commit outstanding work
   and give up the database connection. */
void mr_close_database(void)
{
  EXEC SQL COMMIT RELEASE;
}

/*
 * mr_check_access: check whether client `cl' could run the query `name'
 * with the given arguments, without running it.
 *
 * Clears the DBMS error state, looks the query up for the client's
 * protocol version and hands it to mr_verify_query().  Returns
 * MR_NO_HANDLE when no such query exists, otherwise whatever
 * mr_verify_query() returns.
 */
int mr_check_access(client *cl, char *name, int argc, char *argv_ro[])
{
  struct query *found;

  mr_errcode = 0;
  dbms_errno = 0;

  found = get_query_by_name(name, cl->version);

  return found ? mr_verify_query(cl, found, argc, argv_ro) : MR_NO_HANDLE;
}

/*
 * mr_process_query: run the query called `name' for client `cl'.
 *
 * cl      - the requesting client
 * name    - query name; "_list_queries" and "_help" are handled specially
 * argc    - number of entries in argv_ro
 * argv_ro - the client's arguments (never modified; a private copy is made
 *           in Argv[] by mr_verify_query())
 * action  - callback given to the retrieve/list/help routines to deliver
 *           each result tuple
 * actarg  - opaque argument passed through to `action'
 *
 * After access and argument verification and the optional pre-processing
 * routine, the work depends on the query type: retrieves run a SELECT
 * (optionally saving the rows for a followup routine); updates, appends
 * and deletes run the corresponding statement, notify the incremental
 * code, bump the per-table statistics and call the followup routine.
 *
 * On the way out the transaction is committed for retrieves and for
 * successful modifications (which are also written to the journal when
 * one is open), and rolled back for failed modifications.
 *
 * Returns MR_SUCCESS or a Moira error code.
 */
int mr_process_query(client *cl, char *name, int argc, char *argv_ro[],
                     int (*action)(int, char *[], void *), void *actarg)
{
  struct query *q;
  int status;
  struct validate *v;
  char *qual = NULL;
  EXEC SQL BEGIN DECLARE SECTION;
  char *table;
  EXEC SQL END DECLARE SECTION;
  struct save_queue *sq;

  dbms_errno = 0;
  mr_errcode = 0;

  /* list queries command */
  if (!strcmp(name, "_list_queries"))
    {
      list_queries(cl, action, actarg);
      return MR_SUCCESS;
    }

  /* help query command */
  if (!strcmp(name, "_help"))
    {
      if (argc < 1)
        return MR_ARGS;
      q = get_query_by_name(argv_ro[0], cl->version);
      if (!q)
        return MR_NO_HANDLE;
      help_query(q, action, actarg);
      return MR_SUCCESS;
    }

  /* get query structure, return error if named query does not exist */
  q = get_query_by_name(name, cl->version);
  if (!q)
    return MR_NO_HANDLE;
  v = q->validate;

  /* setup argument vector, verify access and arguments */
  if ((status = mr_verify_query(cl, q, argc, argv_ro)) != MR_SUCCESS)
    goto out;

  /* perform any special query pre-processing */
  if (v && v->pre_rtn)
    {
      status = (*v->pre_rtn)(q, Argv, cl);
      if (status != MR_SUCCESS)
        goto out;
    }

  switch (q->type)
    {
    case MR_Q_RETRIEVE:
      /* for queries that do not permit wildcarding, check if row
         uniquely exists */
      if (v && v->field)
        {
          status = validate_row(q, Argv, v);
          if (status != MR_EXISTS)
            break;
        }

      /* build "where" clause if needed */
      if (q->qual)
        qual = build_qual(q->qual, q->argc, Argv);

      /* if there is a followup routine, then we must save the results */
      /* of the first query for use by the followup routine */
      /* if q->rvar = NULL, perform post_rtn only */
      if (q->rvar)
        {
          if (v && v->post_rtn)
            {
              sq = sq_create();
              status = do_retrieve(q, qual, sq_save_args, sq);
              if (status != MR_SUCCESS)
                {
                  char **argv;
                  int i;

                  /* the followup routine will never see these rows, so
                     release them here */
                  while (sq_get_data(sq, &argv))
                    {
                      for (i = 0; i < q->vcnt; i++)
                        free(argv[i]);
                      free(argv);
                    }
                  sq_destroy(sq);
                  break;
                }
              status = (*v->post_rtn)(q, sq, v, action, actarg, cl);
            }
          else
            {
              /* normal retrieve */
              status = do_retrieve(q, qual, action, actarg);
            }
          if (status != MR_SUCCESS)
            break;
        }
      else
        status = (*v->post_rtn)(q, Argv, cl, action, actarg);

      break;

    case MR_Q_UPDATE:
      /* see if row already exists */
      if (v->field)
        {
          status = validate_row(q, Argv, v);
          if (status != MR_EXISTS)
            break;
        }

      /* build "where" clause and perform update */
      /* if q->rvar = NULL, perform post_rtn only */
      if (q->rvar)
        {
          qual = build_qual(q->qual, q->argc, Argv);
          incremental_before(q->rtable, qual, argv_ro);
          status = do_update(q, &Argv[q->argc], qual, action, actarg);
          incremental_after(q->rtable, qual, argv_ro);
          if (status != MR_SUCCESS)
            break;
          table = table_name[q->rtable];
          /* the "sshi" and "ssif" queries do not count as table updates */
          if (strcmp(q->shortname, "sshi") && strcmp(q->shortname, "ssif"))
            {
              EXEC SQL UPDATE tblstats
                SET updates = updates + 1, modtime = SYSDATE
                WHERE table_name = :table;
            }
        }

      /* execute followup routine (if any) */
      if (v->post_rtn)
        status = (*v->post_rtn)(q, Argv, cl);

      break;

    case MR_Q_APPEND:
      /* see if row already exists */
      if (v->field)
        {
          status = validate_row(q, Argv, v);
          if (status != MR_NO_MATCH)
            break;
        }

      /* build "where" clause if needed */
      if (q->qual)
        qual = build_qual(q->qual, q->argc, Argv);

      /* perform the append */
      /* if q->rvar = NULL, perform post_rtn only */
      if (q->rvar)
        {
          incremental_clear_before();
          status = do_append(q, &Argv[q->argc], qual, action, actarg);
          if (status != MR_SUCCESS)
            break;
          if (v && v->object_id)
            {
              char *id_arg = Argv[q->argc + q->vcnt];
              char *id_qual;

              /* Room for "<rvar>.<object_id> = <id>" and its NUL; the
                 sizeof of the literal covers '.', " = " and the NUL. */
              id_qual = realloc(qual, strlen(q->rvar) +
                                strlen(v->object_id) + strlen(id_arg) +
                                sizeof(". = "));
              if (!id_qual)
                {
                  /* qual is still valid and is freed at `out' */
                  status = MR_NO_MEM;
                  break;
                }
              qual = id_qual;
              sprintf(qual, "%s.%s = %s", q->rvar, v->object_id, id_arg);
              incremental_after(q->rtable, qual, argv_ro);
            }
          else
            incremental_after(q->rtable, qual, argv_ro);

          table = table_name[q->rtable];
          EXEC SQL UPDATE tblstats
            SET appends = appends + 1, modtime = SYSDATE
            WHERE table_name = :table;
        }

      /* execute followup routine */
      if (v->post_rtn)
        status = (*v->post_rtn)(q, Argv, cl);
      break;

    case MR_Q_DELETE:
      /* see if row already exists */
      if (v->field)
        {
          status = validate_row(q, Argv, v);
          if (status != MR_EXISTS)
            break;
        }

      /* build "where" clause and perform delete */
      /* if q->rvar = NULL, perform post_rtn only */
      if (q->rvar)
        {
          qual = build_qual(q->qual, q->argc, Argv);
          table = table_name[q->rtable];
          incremental_before(q->rtable, qual, argv_ro);
          status = do_delete(q, qual, action, actarg);
          incremental_clear_after();
          if (status != MR_SUCCESS)
            break;
          EXEC SQL UPDATE tblstats
            SET deletes = deletes + 1, modtime = SYSDATE
            WHERE table_name = :table;
        }

      /* execute followup routine */
      if (v->post_rtn)
        status = (*v->post_rtn)(q, Argv, cl);
      break;

    case MR_Q_SPECIAL:
      break;
    }

out:
  free(qual);

  /* a DBMS error that nobody turned into a failure status still fails
     the query */
  if (status == MR_SUCCESS && dbms_errno != 0)
    {
      com_err(whoami, MR_INTERNAL, "Server didn't notice DBMS ERROR %d",
              dbms_errno);
      status = mr_errcode;
    }

  if (q->type == MR_Q_RETRIEVE)
    {
      EXEC SQL COMMIT WORK;
    }
  else
    {
      if (status == MR_SUCCESS)
        {
          EXEC SQL COMMIT WORK;
          if (journal)
            {
              char *buf;
              int i;
              extern time_t now;

              /* journal entry: a "% client entity time" header line
                 followed by the query name and its requoted arguments */
              fprintf(journal, "%% %s %s %s",
                      cl->clname, cl->entity, ctime(&now));
              fprintf(journal, "%s ", q->name);
              for (i = 0; i < argc; i++)
                {
                  if (i != 0)
                    putc(' ', journal);
                  buf = requote(argv_ro[i]);
                  fputs(buf, journal);
                  free(buf);
                }
              putc('\n', journal);
              fflush(journal);
            }
          incremental_update();
        }
      else
        {
          EXEC SQL ROLLBACK WORK;
          incremental_flush();
        }
    }

  if (status != MR_SUCCESS)
    com_err(whoami, status, " (Query failed)");
  return status;
}

/*
 * build_qual: expand a qualifier ("WHERE" clause) template.
 *
 * fmt_buf - template containing "%s", "%d" and "%%" escapes; an escape
 *           that follows the word LIKE is treated as a pattern argument
 * argc    - number of arguments available; 0 means "copy the template
 *           unchanged"
 * argv    - the arguments: strings for "%s", pointers to int for "%d"
 *
 * Plain "%s" arguments are copied with single quotes doubled.  For a LIKE
 * argument the UNIX wildcards '*' and '?' become the SQL wildcards '%'
 * (written as "%%", because build_sql_stmt() halves it again) and '_',
 * while literal '%' and '_' are escaped with '*' and an " ESCAPE '*'"
 * clause is appended.  A LIKE whose argument contains neither wildcards
 * nor escapes is rewritten as " =  ".
 *
 * Returns a newly allocated string which the caller must free.
 */
char *build_qual(char *fmt_buf, int argc, char *argv[])
{
  char *res, *result_buf, *fmt, *arg, *like, *p;

  result_buf = xmalloc(2 * (strlen(fmt_buf) + argc * ARGLEN));

  res = result_buf;
  fmt = fmt_buf;

  like = strstr(fmt, "LIKE");
  arg = strchr(fmt, '%');

  /* Look through the format for LIKE expressions and arguments.
     Substitute in the arguments, simplify the `LIKE's to `='s
     where possible, and insert ESCAPE clauses where needed */

  while (*fmt)
    {
      /* With no '%' left there is nothing more to substitute, whether or
         not a LIKE remains; both branches below rely on arg being
         non-NULL. */
      if (!arg || argc == 0)
        {
          /* only plain text remains */
          strcpy(res, fmt);
          res = strchr(res, '\0');
          break;
        }
      else if (!like || arg < like)
        {
          /* regular arg: copy up to arg, then substitute */
          memcpy(res, fmt, arg - fmt);
          res += arg - fmt;
          if (*++arg)
            {
              switch (*arg++)
                {
                case '%':
                  *res++ = '%';
                  break;

                case 's':
                  p = *argv;
                  /* copy string, doubling single quotes */
                  while (*p)
                    {
                      if (*p == '\'')
                        *res++ = '\'';
                      *res++ = *p++;
                    }
                  argv++;
                  break;

                case 'd':
                  res += sprintf(res, "%d", *(int *)*argv++);
                  break;
                }
            }
          fmt = arg;
          arg = strchr(fmt, '%');
        }
      else
        {
          /* LIKE arg: copy over up to the arg, then copy and convert arg */
          int escape = 0, pattern = 0;
          char *likepos = res + (like - fmt);

          memcpy(res, fmt, arg - fmt);
          res += arg - fmt;

          /* copy arg, converting UNIX globs to `SQL voodoo', and noting
             if we'll need an ESCAPE clause */
          for (p = *argv++; *p; p++)
            {
              switch (*p)
                {
                case '*':
                  *res++ = '%';
                  *res++ = '%'; /* need to double for build_sql_stmt */
                  pattern = 1;
                  break;

                case '?':
                  *res++ = '_';
                  pattern = 1;
                  break;

                case '%':
                case '_':
                  *res++ = '*';
                  *res++ = *p;
                  if (*p == '%')
                    *res++ = *p;
                  escape = 1;
                  break;

                case '\'':
                  *res++ = '\'';
                  /* fall through */

                default:
                  *res++ = *p;
                }
            }

          /* if no pattern characters, write over "LIKE" with " =  " */
          if (!pattern && !escape)
            memcpy(likepos, " =  ", 4);

          /* skip the two-character escape, then copy whatever is glued
             to it (such as a closing quote) up to the next space */
          fmt = arg + 2;
          while (*fmt && *fmt != ' ')
            *res++ = *fmt++;

          if (escape)
            res += sprintf(res, " ESCAPE '*'");

          arg = strchr(fmt, '%');
          like = strstr(fmt, "LIKE");
        }
    }

  *res = '\0';

  /* Hand back only as much memory as the result needs; if the shrink
     fails the original (larger) buffer is still perfectly usable. */
  p = realloc(result_buf, strlen(result_buf) + 1);
  return p ? p : result_buf;
}

/* Build argument vector, verify query and arguments */

/* Set by mr_verify_query(): non-zero when the client passed the
   capability ACL check for the current query. */
int privileged;

/*
 * mr_verify_query: copy the client's arguments into Argv[] and decide
 * whether the client may run query `q' with them.
 *
 * cl      - the requesting client
 * q       - the query to be run
 * argc    - number of client arguments; must equal q->argc, plus q->vcnt
 *           for updates and appends
 * argv_ro - the client's arguments (left untouched)
 *
 * Returns MR_SUCCESS when access is granted, MR_PERM when it is not,
 * MR_ARGS for a wrong argument count, MR_ARG_TOO_LONG when an argument
 * does not fit in an Argv[] buffer, or the error reported by the access
 * check or the field validation.
 */
int mr_verify_query(client *cl, struct query *q, int argc, char *argv_ro[])
{
  int argreq;
  int status;
  struct validate *v = q->validate;
  int i;
  char *to, *fr, *stop;

  privileged = 0;

  /* check argument count */
  argreq = q->argc;
  if (q->type == MR_Q_UPDATE || q->type == MR_Q_APPEND)
    argreq += q->vcnt;
  if (argc != argreq)
    return MR_ARGS;
  /* Argv[] has only QMAXARGS slots */
  if (argc > QMAXARGS)
    return MR_ARGS;

  /* copy the arguments into a local argv that we can modify */
  for (i = 0; i < argc; i++)
    {
      to = Argv[i];
      /* each buffer holds MAX_FIELD_WIDTH bytes including the NUL */
      stop = to + MAX_FIELD_WIDTH - 1;
      for (fr = argv_ro[i]; *fr && to < stop; )
        *to++ = *fr++;

      if (*fr)
        return MR_ARG_TOO_LONG;
      *to = '\0';
    }

  /* Check initial query access.  If we're acting as a proxy, only allow
   * access if the query has "default" as a capacl.
   */
  status = check_query_access(q, Argv, cl);
  if (status != MR_SUCCESS && status != MR_PERM)
    return status;
  if (status == MR_SUCCESS && (!cl->proxy_id || q->everybody))
    privileged++;

  /* validate arguments */
  if (v && v->valobj)
    {
      status = validate_fields(q, Argv, v->valobj, v->objcnt);
      if (status != MR_SUCCESS)
        return status;
    }

  /* perform special query access check */
  if (!privileged && v && v->acs_rtn)
    {
      status = (*v->acs_rtn)(q, Argv, cl);
      if (status != MR_SUCCESS && status != MR_PERM)
        return status;
      if (status == MR_SUCCESS)
        return MR_SUCCESS;
    }

  return privileged ? MR_SUCCESS : MR_PERM;
}

/*
 * check_query_access: capability-ACL check for query `q'.
 *
 * Looks up the ACL list registered in capacls under the query's short
 * name and stores it in q->acl.  q->everybody is set to 1 when the user
 * whose login is 'default' is a member of that list, else to 0.
 *
 * Returns MR_SUCCESS if the 'default' user or the client is on the list,
 * MR_PERM if not (or if the query has no capacls entry), and MR_DBMS_ERR
 * if the capacls lookup failed.  `argv' is not used.
 */
int check_query_access(struct query *q, char *argv[], client *cl)
{
  EXEC SQL BEGIN DECLARE SECTION;
  char *qtag;
  int acl;
  static int def_uid;
  EXEC SQL END DECLARE SECTION;

  /* look up the 'default' user's id until a non-zero one is cached */
  if (!def_uid)
    {
      EXEC SQL SELECT users_id INTO :def_uid FROM users WHERE login = 'default';
    }

  qtag = q->shortname;
  EXEC SQL SELECT list_id INTO :acl FROM capacls WHERE tag = :qtag;
  if (sqlca.sqlcode < 0)
    return MR_DBMS_ERR;
  if (sqlca.sqlcode == SQL_NO_MATCH)
    return MR_PERM;
  q->acl = acl;

  /* check for default access; note that the same host variable is the
     list_id going in and the member_id coming out */
  EXEC SQL SELECT member_id INTO :acl FROM imembers
    WHERE list_id = :acl AND member_type = 'USER'
    AND member_id = :def_uid;
  q->everybody = (sqlca.sqlerrd[2] != 0);

  if (q->everybody)
    return MR_SUCCESS;

  return find_member("LIST", acl, cl) ? MR_SUCCESS : MR_PERM;
}

/*
 * find_member: does client `cl' match the given ACL entry?
 *
 * list_type - "USER", "KERBEROS" or "LIST" (compared after strtrim())
 * list_id   - the user id, the negated client id, or the list id to
 *             test against, depending on list_type
 *
 * Returns non-zero on a match: 1 for a USER or KERBEROS match, and for a
 * LIST the number of imembers rows naming the client as a USER or
 * KERBEROS member.  Returns 0 otherwise, including when the membership
 * lookup does not complete with sqlcode 0.
 */
int find_member(char *list_type, int list_id, client *cl)
{
  EXEC SQL BEGIN DECLARE SECTION;
  int hits, uid, kid;
  EXEC SQL END DECLARE SECTION;

  if (strcmp(strtrim(list_type), "USER") == 0 && cl->users_id == list_id)
    return 1;

  if (strcmp(strtrim(list_type), "KERBEROS") == 0 &&
      -cl->client_id == list_id)
    return 1;

  if (strcmp(strtrim(list_type), "LIST") != 0)
    return 0;

  /* see if client is a member of list */
  hits = 0;
  uid = cl->users_id;
  kid = -cl->client_id;
  EXEC SQL SELECT COUNT(member_id) INTO :hits FROM imembers
    WHERE list_id = :list_id
    AND ( ( member_type = 'USER' AND member_id = :uid )
          OR (member_type = 'KERBEROS' AND member_id = :kid ) );

  return sqlca.sqlcode == 0 ? hits : 0;
}


/*
 * do_retrieve: build the SELECT statement for query `q' in stmt_buf,
 * restricted by `pqual' when that is non-NULL and ordered by q->sort when
 * that is set, and pass every resulting row to `action' through
 * do_for_all_rows().  Returns the status of do_for_all_rows().
 */
int do_retrieve(struct query *q, char *pqual,
                int (*action)(int, char *[], void *), void *actarg)
{
  build_sql_stmt(stmt_buf, "SELECT", q->tlist, NULL, pqual);

  if (q->sort)
    strcat(strcat(stmt_buf, " ORDER BY "), q->sort);

  return do_for_all_rows(stmt_buf, q->vcnt, action, actarg);
}

/*
 * build_sql_stmt: write "<cmd> <targetlist>" (followed by " WHERE <qual>"
 * when qual is non-NULL) into result_buf, expanding the printf-like
 * escapes found anywhere in that text:
 *
 *   %%  a single '%'
 *   %s  the next argv string, with every single quote doubled
 *   %d  the int which the next argv entry points at, in decimal
 *
 * Any other "%x" pair is dropped, and a '%' that is the very last
 * character produces nothing.  When argv is NULL there is nothing to
 * substitute, so "%s" and "%d" are copied through literally instead of
 * dereferencing a null pointer.
 *
 * result_buf must be large enough for the expanded statement and must not
 * overlap cmd, targetlist or qual; no length check is made here.
 */
void build_sql_stmt(char *result_buf, char *cmd, char *targetlist,
                    char *argv[], char *qual)
{
  const char *pieces[5];
  const char *fmt;
  char *res = result_buf, *p;
  int npieces = 0, i, value;
  int after_percent = 0;	/* previous character was an unconsumed '%' */

  /* The statement is expanded piece by piece, so no intermediate
     fixed-size copy of the template is needed. */
  pieces[npieces++] = cmd;
  pieces[npieces++] = " ";
  pieces[npieces++] = targetlist;
  if (qual)
    {
      pieces[npieces++] = " WHERE ";
      pieces[npieces++] = qual;
    }

  for (i = 0; i < npieces; i++)
    {
      for (fmt = pieces[i]; *fmt; fmt++)
        {
          if (!after_percent)
            {
              if (*fmt == '%')
                after_percent = 1;
              else
                *res++ = *fmt;		/* text -> result buffer */
              continue;
            }

          /* *fmt is the character following a '%' */
          after_percent = 0;
          switch (*fmt)
            {
            case '%':			/* %% -> % */
              *res++ = '%';
              break;

            case 's':
              if (!argv)
                {
                  *res++ = '%';
                  *res++ = 's';
                  break;
                }
              for (p = *argv++; *p; p++)
                {
                  if (*p == '\'')
                    *res++ = '\'';	/* double the ' */
                  *res++ = *p;
                }
              break;

            case 'd':
              if (!argv)
                {
                  *res++ = '%';
                  *res++ = 'd';
                  break;
                }
              /* the argument is a char * that really addresses an int */
              memcpy(&value, *argv++, sizeof(value));
              res += sprintf(res, "%d", value);
              break;

            default:			/* Swallow other %? pairs */
              break;
            }
        }
    }
  *res = '\0';
}

/*
 * do_update: build the UPDATE statement for query `q' from its target
 * list, the new values in `argv' and the qualifier `qual', and execute
 * it.  Returns mr_errcode if that is non-zero afterwards, else
 * MR_SUCCESS.  `action' and `actarg' are not used.
 */
int do_update(struct query *q, char *argv[], char *qual,
              int (*action)(int, char *[], void *), void *actarg)
{
  build_sql_stmt(stmt_buf, "UPDATE", q->tlist, argv, qual);
  EXEC SQL EXECUTE IMMEDIATE :stmt_buf;

  return mr_errcode ? mr_errcode : MR_SUCCESS;
}

/*
 * do_append: build the INSERT statement for query `q' from its target
 * list, the values in `argv' and the optional qualifier `pqual', and
 * execute it.  Returns mr_errcode if that is non-zero afterwards, else
 * MR_SUCCESS.  `action' and `actarg' are not used.
 */
int do_append(struct query *q, char *argv[], char *pqual,
              int (*action)(int, char *[], void *), void *actarg)
{
  build_sql_stmt(stmt_buf, "INSERT", q->tlist, argv, pqual);
  EXEC SQL EXECUTE IMMEDIATE :stmt_buf;

  return mr_errcode ? mr_errcode : MR_SUCCESS;
}

/*
 * do_delete: delete the rows matching `qual' from the table that query
 * `q' operates on.  The qualifier is inserted into the statement as is.
 * Returns mr_errcode if that is non-zero afterwards, else MR_SUCCESS.
 * `action' and `actarg' are not used.
 */
int do_delete(struct query *q, char *qual,
              int (*action)(int, char *[], void *), void *actarg)
{
  sprintf(stmt_buf, "DELETE FROM %s WHERE %s", table_name[q->rtable], qual);
  EXEC SQL EXECUTE IMMEDIATE :stmt_buf;

  return mr_errcode ? mr_errcode : MR_SUCCESS;
}


/**
 ** set_next_object_id - set next object id in values table
 **
 ** Inputs: object - name of the counter in numvalues; also the name of
 **		     the id column in the object table
 **	    table - the table the objects are found in
 **	    limit - non-zero if the id must stay within
 **		    MIN_ID_VALUE..MAX_ID_VALUE
 **
 ** Starting from the value stored in numvalues, probes successive ids
 ** until one is found that no row of the table uses, then stores that id
 ** back into numvalues.
 **
 ** Returns: MR_SUCCESS, MR_NO_ID if the counter row is missing or (with
 **	     limit set) the search came back round to where it started,
 **	     or mr_errcode if a probe failed.
 **
 ** - called before an MR_Q_APPEND operation to set the next object id to
 **   be used for the new record to the next free value
 **
 **/

int set_next_object_id(char *object, enum tables table, int limit)
{
  EXEC SQL BEGIN DECLARE SECTION;
  int candidate;
  char *obj = object;
  EXEC SQL END DECLARE SECTION;
  int first_tried;

  EXEC SQL SELECT value INTO :candidate FROM numvalues WHERE name = :obj;
  if (sqlca.sqlerrd[2] != 1)
    return MR_NO_ID;

  first_tried = candidate;
  for (;;)
    {
#ifdef ULTRIX_ID_HOLE
      if (limit && candidate > 31999 && candidate < 32768)
        candidate = 32768;
#endif
      /* wrap around to the bottom of the permitted range */
      if (limit && candidate > MAX_ID_VALUE)
        candidate = MIN_ID_VALUE;

      /* is this id already taken? */
      sprintf(stmt_buf, "SELECT %s FROM %s WHERE %s = %d",
              object, table_name[table], object, candidate);
      dosql(sqlbuffer);
      if (sqlca.sqlcode < 0)
        return mr_errcode;
      if (sqlca.sqlcode == SQL_NO_MATCH)
        break;

      candidate++;
      if (limit && candidate == first_tried)
        {
          com_err(whoami, 0, "All id values have been used");
          return MR_NO_ID;
        }
    }

  com_err(whoami, 0, "setting ID %s to %d", object, candidate);
  EXEC SQL UPDATE numvalues SET value = :candidate WHERE name = :obj;
  return MR_SUCCESS;
}


/* Turn a kerberos name into the user's ID of the account that principal
 * owns.  Sets the kerberos ID and user ID.
 *
 * name  - the kerberos principal name
 * login - login name to fall back on when the principal has no krbmap
 *         entry
 * ok    - if zero, the fallback to `login' is not made and *uid is set to
 *         the same value as *kid
 * kid   - receives the negated string id of the principal, or 0 (or the
 *         user id, see below) when the principal is not in the strings
 *         table
 * uid   - receives the user id, or 0 when none was found
 *
 * Returns MR_SUCCESS, or mr_errcode when a DBMS error was recorded.
 */

int set_krb_mapping(char *name, char *login, int ok, int *kid, int *uid)
{
  EXEC SQL BEGIN DECLARE SECTION;
  int mapped_uid, princ_id;
  char *krbname;
  EXEC SQL END DECLARE SECTION;

  *kid = 0;
  *uid = 0;
  krbname = name;

  /* first choice: an explicit principal -> user mapping */
  EXEC SQL SELECT km.users_id, km.string_id INTO :mapped_uid, :princ_id
    FROM krbmap km, strings str
    WHERE km.string_id = str.string_id AND str.string = :krbname;
  EXEC SQL COMMIT WORK;

  if (dbms_errno)
    return mr_errcode;

  /* NOTE(review): sqlerrd[2] is read after the COMMIT above - confirm
     that it still holds the row count of the SELECT at this point */
  if (sqlca.sqlerrd[2] == 1)
    {
      *kid = -princ_id;
      *uid = mapped_uid;
      return MR_SUCCESS;
    }

  /* no mapping: the principal may still be a known string */
  if (name_to_id(name, STRINGS_TABLE, &princ_id) == MR_SUCCESS)
    *kid = -princ_id;

  if (!ok)
    {
      *uid = *kid;
      return MR_SUCCESS;
    }

  /* fall back on the account named by the login */
  if (name_to_id(login, USERS_TABLE, uid) != MR_SUCCESS)
    *uid = 0;

  if (*kid == 0)
    *kid = *uid;

  return dbms_errno ? mr_errcode : MR_SUCCESS;
}


void sanity_check_queries(void)
{
  int i;
  int maxv = 0, maxa = 0;

#define MAX(x, y) ((x) > (y) ? (x) : (y))

  for (i = 0; i < QueryCount; i++)
    {
      maxv = MAX(maxv, Queries[i].vcnt);
      maxa = MAX(maxa, Queries[i].argc);
      max_version = MAX(max_version, Queries[i].version);
    }
  if (MAX(maxv, maxa) > QMAXARGS)
    {
      com_err(whoami, 0, "A query has more args than QMAXARGS");
      exit(1);
    }
}


/* Generically do a SELECT, storing the results in the provided buffers.
 *
 * The statement text is taken from stmt_buf.  Only the first row is
 * fetched; column i is delivered into buffers[i], each of which is
 * described to the DBMS as MAX_FIELD_WIDTH bytes long.  Nothing is
 * returned: callers look at sqlca, where the outcome of the FETCH is
 * preserved across the closing of the cursor.
 */

void dosql(char *buffers[])
{
  int col, fetch_code = 0, fetch_msglen = 0;

  EXEC SQL PREPARE inc_stmt FROM :stmt_buf;
  if (sqlca.sqlcode != 0)
    return;
  EXEC SQL DECLARE inc_crs CURSOR FOR inc_stmt;
  EXEC SQL OPEN inc_crs;

  /* describe the select list, then use as many entries as were found */
  mr_sqlda->N = QMAXARGS;
  EXEC SQL DESCRIBE SELECT LIST FOR inc_stmt INTO mr_sqlda;
  mr_sqlda->N = mr_sqlda->F;

  col = 0;
  while (col < mr_sqlda->N)
    {
      mr_sqlda->V[col] = buffers[col];
      /* NOTE(review): 97 is presumably the code of a NUL-terminated
         character datatype - confirm against the Pro*C datatype table */
      mr_sqlda->T[col] = 97;
      mr_sqlda->L[col] = MAX_FIELD_WIDTH;
      col++;
    }
  EXEC SQL FETCH inc_crs USING DESCRIPTOR mr_sqlda;

  /* if we got an error from the FETCH, we have to preserve it or the
     close will reset it and the caller will think nothing happened */
  fetch_code = sqlca.sqlcode;
  if (fetch_code)
    fetch_msglen = sqlca.sqlerrm.sqlerrml;

  EXEC SQL CLOSE inc_crs;

  if (fetch_code)
    {
      sqlca.sqlcode = fetch_code;
      sqlca.sqlerrm.sqlerrml = fetch_msglen;
    }
}

/*
 * do_for_all_rows: run the SELECT statement `query' and call
 * (*action)(count, sqlbuffer, actarg) once for every row fetched, up to
 * max_row_count rows.  The return value of `action' is ignored.
 *
 * Returns MR_INTERNAL if the statement could not be prepared, mr_errcode
 * if an error was recorded, MR_NO_MEM (after raising a critical alert) if
 * the row limit was reached, MR_NO_MATCH if no row was fetched, and
 * MR_SUCCESS otherwise.
 */
int do_for_all_rows(char *query, int count,
                    int (*action)(int, char *[], void *), void *actarg)
{
  int col, rowcount;
  EXEC SQL BEGIN DECLARE SECTION;
  char *sql_text = query;
  EXEC SQL END DECLARE SECTION;

  EXEC SQL PREPARE stmt FROM :sql_text;
  if (sqlca.sqlcode != 0)
    return MR_INTERNAL;
  EXEC SQL DECLARE curs CURSOR FOR stmt;
  EXEC SQL OPEN curs;

  /* describe the select list and bind every column to its sqlbuffer */
  mr_sqlda->N = count;
  EXEC SQL DESCRIBE SELECT LIST FOR stmt INTO mr_sqlda;
  mr_sqlda->N = mr_sqlda->F;
  col = 0;
  while (col < mr_sqlda->N)
    {
      mr_sqlda->V[col] = sqlbuffer[col];
      mr_sqlda->T[col] = 97;
      mr_sqlda->L[col] = MAX_FIELD_WIDTH;
      col++;
    }

  /* any non-zero sqlcode (end of data or error) ends the fetch loop */
  for (rowcount = 0; rowcount < max_row_count; rowcount++)
    {
      EXEC SQL FETCH curs USING DESCRIPTOR mr_sqlda;
      if (sqlca.sqlcode != 0)
        break;
      (*action)(count, sqlbuffer, actarg);
    }
  EXEC SQL CLOSE curs;

  if (mr_errcode)
    return mr_errcode;

  if (rowcount == max_row_count)
    {
      critical_alert(whoami, "moirad", "attempted query with too many rows");
      return MR_NO_MEM;
    }

  return rowcount == 0 ? MR_NO_MATCH : MR_SUCCESS;
}

/* Do a name to ID translation.  Moved from cache.pc because we did away
 * with the cache, but what this function does is still useful to us.
 *
 * name - the name to look up
 * type - which table to look in
 * id   - receives the id on success
 *
 * Returns MR_SUCCESS, MR_NO_MATCH if nothing has that name,
 * MR_NOT_UNIQUE if more than one row matched, MR_DBMS_ERR for any other
 * non-zero sqlcode, and MR_INTERNAL for a table type not handled here.
 */

int name_to_id(char *name, enum tables type, int *id)
{
  EXEC SQL BEGIN DECLARE SECTION;
  char *key;
  int found;
  EXEC SQL END DECLARE SECTION;

  key = name;

  switch (type)
    {
    case USERS_TABLE:
      /* a name containing '@' or longer than 8 characters is reported
         as "no match" without asking the database */
      if (strchr(key, '@') || (strlen(key) > 8))
        {
          sqlca.sqlcode = SQL_NO_MATCH;
          break;
        }
      EXEC SQL SELECT users_id INTO :found FROM users WHERE login = :key;
      break;
    case LIST_TABLE:
      EXEC SQL SELECT list_id INTO :found FROM list WHERE name = :key;
      break;
    case MACHINE_TABLE:
      EXEC SQL SELECT mach_id INTO :found FROM machine WHERE name = UPPER(:key);
      break;
    case SUBNET_TABLE:
      EXEC SQL SELECT snet_id INTO :found FROM subnet WHERE name = UPPER(:key);
      break;
    case CLUSTERS_TABLE:
      EXEC SQL SELECT clu_id INTO :found FROM clusters WHERE name = :key;
      break;
    case FILESYS_TABLE:
      EXEC SQL SELECT filsys_id INTO :found FROM filesys WHERE label = :key;
      break;
    case STRINGS_TABLE:
      if (key[0] == '\0')	/* special-case empty string */
        {
          *id = 0;
          return MR_SUCCESS;
        }
      EXEC SQL SELECT string_id INTO :found FROM strings WHERE string = :key;
      break;
    case CONTAINERS_TABLE:
      EXEC SQL SELECT cnt_id INTO :found FROM containers WHERE LOWER(name) =
        LOWER(:key);
      break;
    default:
      return MR_INTERNAL;
    }

  if (sqlca.sqlcode == SQL_NO_MATCH)
    return MR_NO_MATCH;
  else if (sqlca.sqlerrd[2] > 1)
    return MR_NOT_UNIQUE;
  else if (sqlca.sqlcode != 0)
    return MR_DBMS_ERR;

  *id = found;
  return MR_SUCCESS;
}

/* Perform an ID to name mapping.  name should be a pointer to a pointer to
 * malloc'ed data.  The buffer it refers to will be freed, and a new buffer
 * allocated with the answer.
 *
 * When no row has the given id the answer is the id itself written as
 * "#<id>" and MR_NO_MATCH is returned.  On MR_INTERNAL (unhandled table
 * type, or more than one row) and MR_DBMS_ERR, *name is left untouched.
 *
 * This used to be in cache.pc, but we've removed the cache, and this function
 * is still useful to us.
 */

int id_to_name(int id, enum tables type, char **name)
{
  EXEC SQL BEGIN DECLARE SECTION;
  char text[MAX_FIELD_WIDTH];
  int key;
  EXEC SQL END DECLARE SECTION;

  key = id;

  switch (type)
    {
    case USERS_TABLE:
      EXEC SQL SELECT login INTO :text FROM users WHERE users_id = :key;
      break;
    case LIST_TABLE:
      EXEC SQL SELECT name INTO :text FROM list WHERE list_id = :key;
      break;
    case MACHINE_TABLE:
      EXEC SQL SELECT name INTO :text FROM machine WHERE mach_id = :key;
      break;
    case SUBNET_TABLE:
      EXEC SQL SELECT name INTO :text FROM subnet WHERE snet_id = :key;
      break;
    case CLUSTERS_TABLE:
      EXEC SQL SELECT name INTO :text FROM clusters WHERE clu_id = :key;
      break;
    case FILESYS_TABLE:
      EXEC SQL SELECT label INTO :text FROM filesys WHERE filsys_id = :key;
      break;
    case STRINGS_TABLE:
      EXEC SQL SELECT string INTO :text FROM strings WHERE string_id = :key;
      break;
    case CONTAINERS_TABLE:
      EXEC SQL SELECT name INTO :text FROM containers WHERE cnt_id = :key;
      break;
    default:
      return MR_INTERNAL;
    }

  if (sqlca.sqlcode == SQL_NO_MATCH)
    {
      /* unknown id: hand back "#<id>" in place of a name */
      free(*name);
      sprintf(text, "#%d", key);
      *name = xstrdup(text);
      return MR_NO_MATCH;
    }
  else if (sqlca.sqlerrd[2] > 1)
    return MR_INTERNAL;
  else if (sqlca.sqlcode != 0)
    return MR_DBMS_ERR;

  free(*name);
  *name = xstrdup(strtrim(text));

  return MR_SUCCESS;
}

/* eof:qrtn.pc */
