/*
 * sfeed.c Copyright 1999 Christopher M Sedore.  All Rights Reserved.
 * Please see the "COPYING" file for license information. 
 *
 * This file implements outgoing feeds.  This feeder is quite
 * simple-minded in that it is lock step--it doesn't support
 * out-of-order responses.  It should probably be rewritten to
 * be smarter.  All the lower-level I/O support is there; it's just
 * a simple matter of application logic :)
 * 
 * Each destination for a feed is a "site".  Sites have "connections" 
 * that exchange nntp sequences with the site.
 */

#include "main.h"

/* Count of articles handed to CopyArt() successfully by the transmit routines. */
int artsFed=0;

/* Queue of struct ofeed entries; not referenced in the visible part of this file. */
TAILQ_HEAD(feedtqh,ofeed) headArtWait;

/* Every outgoing site created by SiteCreate(), linked through osite.list. */
TAILQ_HEAD(livesitetqg,osite) activeSites;

/* Connections parked until new articles show up, linked through context.list. */
TAILQ_HEAD(connwhead,context) waitNewArticles;
	
/*
 * One outgoing feed destination ("site") and its bookkeeping.
 */
struct osite {
	char hostname[255];	/* key used by SiteFind() and for DNS lookups */
	int port;		/* port handed to AsyncConnect() */

	int num;		/* site number passed to the feed database routines */

	int mark;		/* set by SiteMarkAll(), acted on by SiteDeleteMarked() */
	int flags;		/* NNTP_FEED_CLOSE once SiteDelete() has been called */

	/* Counters printed by PrintStats(). */
	int accept;
	int refuse;
	int reject;
	int errors;
	int flaggedfor;
	int missing;
	int errc;		/* not referenced in the visible part of this file */

	int numConnections;	/* connection count managed by SiteSet("connections") */
	int connected;		/* connections that got past the greeting */

	int maxQueue;		/* limit passed to FeedDBGetArticles() */

	void *dist;		/* result of CompileMatchString(diststr), or NULL */
	char *diststr;		/* strdup'd distribution pattern, or NULL */

	TAILQ_HEAD(chead,context) conns;	/* this site's connections (context.clist) */
	TAILQ_ENTRY(osite) list;		/* linkage on activeSites */
};

/* Connection callbacks; StartConn() installs them before they are defined. */
int FeedMain(struct context *oc);
int FeedConnError(struct context *oc);

/*
 * SiteConnWakeup()
 *
 * Walk the connections parked on waitNewArticles and try to feed each
 * one.  This is called from the callback that has just finished putting
 * an article on disk.  The walk stops at the first connection whose feed
 * routine returns non-zero; that connection is handed to NextIo().
 */

void
SiteConnWakeup()
{
	struct context *conn,*following;
	int fed;

	conn=TAILQ_FIRST(&waitNewArticles);
	while (conn!=NULL) {
		/*
		 * Grab the successor before touching the list: the feed
		 * routines re-insert the connection at the head when there
		 * is still nothing to send.
		 */
		following=TAILQ_NEXT(conn,list);
		assert(conn->callback!=NULL);
		TAILQ_REMOVE(&waitNewArticles,conn,list);

		if (conn->flags & NNTP_STREAMING)
			fed=StreamFeed(conn);
		else
			fed=IhaveFeed(conn);

		if (fed) {
			NextIo(conn);
			return;
		}

		conn=following;
	}
}

/*
 * SiteFind()
 *
 * Look up a site on activeSites by hostname (case-insensitive).
 * Returns the first matching site, or NULL when none matches.
 */

struct osite *
SiteFind(char *name)
{
	struct osite *site=TAILQ_FIRST(&activeSites);

	while (site!=NULL && strcasecmp(name,site->hostname)!=0)
		site=TAILQ_NEXT(site,list);

	return site;
}

/*
 * SiteMarkAll()
 *
 * Set the mark flag on every active site.  The flag was intended to
 * identify sites that were not "recreated" by a re-read of the config
 * files: any site still marked afterwards was absent from the edited
 * files and needs to be deleted (see SiteDeleteMarked()).
 *
 * Always returns NULL.
 */

struct osite *
SiteMarkAll()
{
	struct osite *site=TAILQ_FIRST(&activeSites);

	while (site) {
		site->mark=1;
		site=TAILQ_NEXT(site,list);
	}

	return NULL;
}

/*
 * SiteDeleteMarked()
 *
 * Call SiteDelete() on every active site whose mark flag is set.
 * Always returns NULL.
 */

struct osite *
SiteDeleteMarked()
{
	struct osite *site,*following;

	site=TAILQ_FIRST(&activeSites);
	while (site!=NULL) {
		/* SiteDelete() may free the site, so fetch its successor first */
		following=TAILQ_NEXT(site,list);
		if (site->mark)
			SiteDelete(site->hostname);
		site=following;
	}

	return NULL;
}

/* 
 * SiteCreate()
 *
 * Create a new outgoing site and append it to activeSites.
 *
 *   num       site number stored in the site (later passed to the feed
 *             database routines)
 *   hostname  name of the remote host; also the key used by SiteFind()
 *
 * The port comes from the "DestinationPort" config item (119 when it is
 * missing or below 1) and the queue limit from "DefaultMaxQueue"
 * (2000000 when missing, never less than 5).
 *
 * Returns 0 on success.  Returns -1 when hostname is NULL, does not fit
 * in the site's hostname field, already names a site, or when memory
 * cannot be allocated.
 */

int 
SiteCreate(int num,char *hostname)
{
	struct osite *s;
	int destinationport=0;	/* stays 0 if GetConfigInt() stores nothing */
	
	if (hostname==NULL) {
		return -1;
	}
	
	/* the name is copied into a fixed-size field; refuse what won't fit */
	if (strlen(hostname)>=sizeof(s->hostname)) {
		return -1;
	}
	
	if (SiteFind(hostname)) {
		return -1;
	}
	
	s=malloc(sizeof(struct osite));
	if (s==NULL) {
		return -1;
	}
	bzero(s,sizeof(struct osite));
	
	strcpy(s->hostname,hostname);	/* length checked above */
	
	s->dist=NULL;
	s->diststr=NULL;
	s->num=num;
	GetConfigInt("DestinationPort",&destinationport);
	if (destinationport<1)
	{
		printf("No DestinationPort defined - defaulting to 119");
		destinationport=119;
	}
	s->port=destinationport;
	
	if (GetConfigInt("DefaultMaxQueue",&s->maxQueue)==NULL) {
		s->maxQueue=2000000;
	} 

	if (s->maxQueue<5) {
		s->maxQueue=5;
	}

	DnsAddName(hostname);
	
	TAILQ_INIT(&s->conns);
	
	TAILQ_INSERT_TAIL(&activeSites,s,list);
	
	return 0;
}

/*
 * SiteDelete()
 *
 * Delete the site called name.  The site is flagged NNTP_FEED_CLOSE.
 * If it still has connections configured, they are asked to close via
 * SiteSet(name,"connections","0") and the site itself is left in place;
 * otherwise the site is unlinked from activeSites and freed right away.
 *
 * Returns 0 on success, -1 when no such site exists.
 */

int
SiteDelete(char *name)
{
	struct osite *s;
	
	s=SiteFind(name);
	
	if (!s)
		return -1;
	
	s->flags|=NNTP_FEED_CLOSE;
	if (s->numConnections) {
		SiteSet(name,"connections","0");
	} else {
		TAILQ_REMOVE(&activeSites,s,list);
		/* diststr is strdup'd by SiteSet(); release it with the site */
		free(s->diststr);
		/*
		 * NOTE(review): s->dist comes from CompileMatchString(); how
		 * it should be released is not visible here, so it is left
		 * untouched -- confirm whether it needs freeing.
		 */
		free(s);
	}
	
	return 0;
}

/*
 * SiteSet()
 * 
 * Set a parameter for a site.
 *
 *   name   hostname of the site (looked up with SiteFind())
 *   item   parameter name, case-insensitive; one of
 *            "dist"         distribution pattern (compiled with
 *                           CompileMatchString())
 *            "port"         destination port, 1..65535
 *            "connections"  desired number of connections, 0..20
 *            "maxqueue"     queue limit; values below 5 become 5
 *   value  new value, as text
 *
 * Returns 0 on success, -1 for a NULL argument, an unknown site or
 * item, an out-of-range value, or an allocation failure.
 */

int
SiteSet(char *name,char *item,char *value)
{
	
	struct osite *s;
	struct context *c;
	
	/* the control-command parser can hand us missing (NULL) tokens */
	if ((name==NULL) || (item==NULL) || (value==NULL))
		return -1;
	
	s=SiteFind(name);
	
	if (s==NULL)
		return -1;
	
	if (!strcasecmp(item,"dist")) {
		char *copy;
		
		/* unchanged pattern: nothing to recompile */
		if (s->diststr && !strcmp(s->diststr,value)) {
			return 0;
		}
		
		copy=strdup(value);
		if (copy==NULL)
			return -1;
		
		free(s->diststr);
		s->diststr=copy;
		
		/*
		 * NOTE(review): the previous s->dist is overwritten without
		 * being released; its release routine is not visible here.
		 */
		s->dist=CompileMatchString(value);
		
		return 0;
	}
	
	if (!strcasecmp(item,"port")) {
		int port=atoi(value);
		
		if ((port<1) || (port>65535))
			return -1;
		s->port=port;
		return 0;
	}
	
	if (!strcasecmp(item,"connections")) {
		int nc=atoi(value);
		if ((nc<0) || (nc>20))
			return -1;
		if (nc<s->numConnections) {
			/*
			 * Flag one connection for close per step until the
			 * count reaches the target (or none are left on the
			 * list, e.g. because some are still waiting to
			 * reconnect).
			 */
			while ((s->numConnections>nc) &&
				((c=TAILQ_FIRST(&s->conns))!=NULL)) {
				TAILQ_REMOVE(&s->conns,c,clist);
				c->flags|=NNTP_FEED_CLOSE;
				s->numConnections--;
			}
			s->numConnections=nc;
		} else {
			while (nc>s->numConnections) {
				StartConn(s);
				s->numConnections++;
			}
		}
		return 0;
	}
	
	if (!strcasecmp(item,"maxqueue")) {
		/* signed, so that negative input is clamped instead of wrapping */
		int mq=atoi(value);

		if (mq<5) mq=5;

		s->maxQueue=mq;

		return 0;
	}
	
	return -1;
}

/*
 * FlagForSites()
 *
 * Given an article, check it against each site's distribution and set
 * that site's bit flag (FeedFlagSite()) when it matches.  A site with no
 * distribution set receives everything.  Each match also bumps the
 * site's flaggedfor counter.
 *
 * Always returns 0.
 */

int
FlagForSites(struct article *art)
{
	struct osite *s;
	int dist;
	
	for (s=TAILQ_FIRST(&activeSites);s;s=TAILQ_NEXT(s,list)) {
		/* no compiled distribution means "send everything" */
		dist=1;
		
		if (s->dist) {
			dist=CheckMatch(art,s->dist);
		}
		if (dist) {
			FeedFlagSite(s->num,art->distflags);
			s->flaggedfor++;
		} 
	}
	
	return 0;
}

/*
 * StartConn()
 * 
 * Create a new connection to a site.
 *
 * Resolves the site's hostname, starts an asynchronous connect and sets
 * up a context whose callbacks are FeedMain() and FeedConnError().  When
 * the address is not available yet, the connect cannot be started, or
 * memory is short, the attempt is rescheduled for 30 seconds later.
 *
 * Returns 0 when a connection attempt was started, -1 when it was
 * deferred.
 */

int
StartConn(struct osite *s)
{
	int c;
	struct context *oc;
	int addr;
	
	addr=DnsGetAddr(s->hostname);
	
	if (addr==0) {
		ScheduleCallback(30,0,StartConn,s);		
		return -1;
	}
	
	/*
	 * Allocate before connecting so that an allocation failure cannot
	 * leave a connected descriptor without an owner.
	 */
	oc=malloc(sizeof(struct context));
	
	if (oc==NULL) {
		ScheduleCallback(30,0,StartConn,s);		
		return -1;
	}
	
	c=AsyncConnect(addr,s->port);
	
	if (c<0) {
		free(oc);
		ScheduleCallback(30,0,StartConn,s);		
		return -1;
	}
	
	bzero(oc,sizeof(struct context));
	
	oc->site=s;
	
	oc->bufsz=8192;
	oc->obufsz=8192;
	
	oc->cb.cb.aio_fildes=c;
 	oc->fd=c;
	
	oc->callback=FeedMain;
	/* cleared by FeedMain() once the greeting has been accepted */
	oc->flags|=NNTP_FEED_START;
	oc->bp=oc->buf;
	
	oc->errorCallback=(int (*)(void *))FeedConnError;
	
	oc->state="connecting";
	
	TAILQ_INSERT_TAIL(&s->conns,oc,clist);
	
	NextIo(oc);
	
	return 0;
}

/*
 * StreamFeed()
 *
 * Queue a batch of "check <msgid>" commands in the output buffer.
 *
 * Up to 32 articles are requested from FeedDBGetArticles().  A negative
 * result is returned unchanged.  When nothing is available the
 * connection is parked on waitNewArticles and 0 is returned.  Otherwise
 * one command is appended per article for as long as the buffer has
 * room, and the number of articles fetched is returned.
 */

int
StreamFeed(struct context *oc)
{
	static const char verb[]="check ";
	const char *v;
	char *out,*id;
	int fetched,queued;
	
	fetched=FeedDBGetArticles(oc->site->num,32,&oc->pae,oc->site->maxQueue);
	
	if (fetched<0)
		return fetched; // XXX XXX XXX
	
	if (fetched==0) {
		oc->state="artwait";
		TAILQ_INSERT_HEAD(&waitNewArticles,oc,list);
		return fetched;
	}
	
	oc->state="check";
	
	out=&oc->obuf[oc->obuflen];
	
	/* stop early when fewer than MAX_ARTICLEID+8 bytes would remain */
	for (queued=0;
	     (queued<fetched) && (out-oc->obuf<oc->obufsz-(MAX_ARTICLEID+8));
	     queued++) {
		for (v=verb;*v;v++)
			*out++=*v;
		for (id=oc->pae[queued]->mid;*id;id++)
			*out++=*id;
		*out++='\r';
		*out++='\n';
		
		UpdatePrecommit(HashArticleID(oc->pae[queued]->mid,0),oc->pae[queued]);
		oc->outstanding++;
	}
	
	/* only part of the batch fit: record the last article that was queued */
	if (queued && (queued<fetched))
		FeedSetOffset(oc->pae[queued-1]);
	
	assert(out-oc->obuf<=oc->obufsz);	
	
	oc->obuflen=out-oc->obuf;
	oc->lastbatch=queued;
	
	return fetched;
}

/*
 * IhaveFeed()
 *
 * Queue an "ihave <msgid>" command in the (empty) output buffer.
 *
 * One article is requested from FeedDBGetArticles().  A negative result
 * is returned unchanged.  When nothing is available the connection is
 * parked on waitNewArticles and 0 is returned.  Otherwise the command is
 * written and the number of articles fetched is returned.
 */

int
IhaveFeed(struct context *oc)
{
	static const char verb[]="ihave ";
	const char *v;
	char *out,*id;
	int fetched,queued;
	
	assert(oc->obuflen==0);
	fetched=FeedDBGetArticles(oc->site->num,1,&oc->pae,oc->site->maxQueue);
	
	if (fetched<0)
		return fetched; // XXX XXX XXX
	
	if (fetched==0) {
		oc->state="artwait";
		TAILQ_INSERT_HEAD(&waitNewArticles,oc,list);
		return fetched;
	}
	
	oc->state="ihave";
	
	out=&oc->obuf[oc->obuflen];
	
	for (queued=0;
	     (queued<fetched) && (out-oc->obuf<MAX_ARTICLEID+8);
	     queued++) {
		for (v=verb;*v;v++)
			*out++=*v;
		for (id=oc->pae[queued]->mid;*id;id++)
			*out++=*id;
		*out++='\r';
		*out++='\n';
		
		oc->outstanding++;
	}
	
	oc->obuflen=out-oc->obuf;
	
	return fetched;
}

/*
 * TakethisTransmit()
 * 
 * We got a "238 <msgid>"--try to send the associated article.
 */

int
TakethisTransmit(struct context *oc)
{
	int x,y;
	struct artent ae;
	
	
	if (GetArticleID(oc)) {
		return -1;
	}
	
	y=0;
	if (!GetPrecommitArtEnt(oc->mid64,&ae)) {
		oc->site->missing++;
		return -1;
	}
	
//	assert(FeedCheckFlags(oc->site->num,ae.siteflags));
	
	sprintf(&oc->obuf[oc->obuflen],"takethis %s\r\n",oc->mid);
	x=oc->obuflen;
	oc->obuflen+=strlen(&oc->obuf[oc->obuflen]);
	
	if (CopyArt(oc,&ae)) {
		oc->site->missing++;
		oc->obuflen=x;
		return -1;
	}
	
	artsFed++;
	
	oc->state="takethis";
	
	oc->outstanding++;
	return 0;
}

/*
 * IhaveTransmit()
 *
 * We got a "335"--send the article we recently IHAVE'd.
 *
 * If the article cannot be copied into the output buffer, a lone
 * ".\r\n" is queued instead and NextIo() is called.
 */

void
IhaveTransmit(struct context *oc)
{
	char *tail;
	
	assert(oc->obuflen==0);
	
	oc->outstanding++;
	
	if (!CopyArt(oc,oc->pae[0])) {
		artsFed++;
		oc->state="ihavesnd";
		return;
	}
	
	/* article unavailable: count it and send just the terminator line */
	oc->site->missing++;
	tail=&oc->obuf[oc->obuflen];
	sprintf(tail,".\r\n");
	oc->obuflen+=strlen(tail);
	NextIo(oc);
}

/*
 * FeedConnError() 
 *
 * Callback for when an "error" occurs.  This might be an actual error 
 * (reset connection, etc), or a connection gracefully closed by the
 * remote host. 
 */

int
FeedConnError(struct context *oc) 
{
	TAILQ_REMOVE(&oc->site->conns,oc,clist);
	
	if (!(oc->flags & NNTP_FEED_START)) 
		oc->site->connected--;
	
	if (oc->site->flags & NNTP_FEED_CLOSE) {
		TAILQ_REMOVE(&oc->site->conns,oc,clist);
		if (TAILQ_FIRST(&oc->site->conns)==NULL) {
			free(oc->site);
		}
		return;
	}
	
	
	if (!(oc->flags & NNTP_FEED_CLOSE)) {
//		printf("reconnecting\n");		
		ScheduleCallback(10,0,StartConn,oc->site);
	} 
	
	TAILQ_REMOVE(&oc->site->conns,oc,clist);
}

/*
 * FeedMain()
 *
 * The main upcall.  This receives responses from the remote site, 
 * executes commands, and does basic checking to see if we need to
 * send another command.
 *
 * While NNTP_FEED_START is set the connection is still waiting for the
 * greeting: a 200..209 response queues "mode stream", anything else
 * flags the connection for close.  After that, each response code
 * updates the site counters and/or triggers a transmit, and when no
 * responses are outstanding the next batch is requested.
 *
 * Returns 1 after an article transmit was set up, or when the feed
 * routine returned 0 (nothing to send, connection parked); returns 0 in
 * every other case.
 * NOTE(review): how the caller interprets 0 vs 1 is not visible here.
 */

int
FeedMain(struct context *oc)
{
	int ret=0;
	
	
	if (oc->flags & NNTP_FEED_START) {
		
		/* our read may come back empty because we were awoken
		 * when the connection completed, but before any data
		 * could be sent
		 */
		
		if (oc->buflen==0) {
			if (!oc->cmdcount) {
				/* first empty read: tolerate it once */
				oc->connerror=0;
				oc->cmdcount++;
				return 0;
			}
			FLAG_FOR_CONNECTION_CLOSE(oc);
			return 0;
		}
		
		ret=atoi(oc->buf);
		
		if ((ret>199) && (ret<210)) {
			int val=0;
			
			/* val is 0, i.e. FIONBIO is being switched off */
			if (ioctl(oc->fd,FIONBIO,&val)) {
				perror("ioctl");
			}
			
			oc->state="mode";
			strcpy(oc->obuf,"mode stream\r\n");
			oc->obuflen+=strlen(oc->obuf);
			oc->flags&=~NNTP_FEED_START;
			oc->buflen=0;
			oc->site->connected++;
		} else {
			printf("oc->buf=%s,oc->bp=%s,oc->buflen=%i\n",oc->buf,oc->bp,oc->buflen);
			FLAG_FOR_CONNECTION_CLOSE(oc);
		}
		return 0;
	}
	
	oc->state="loop";
	
	if (oc->buflen) {
		switch (atoi(oc->bp)) {
			/* counted as refused */
			case 	431 :
			case	435 :
			case	438 : {
					oc->site->refuse++;
					oc->outstanding--;
				} break;	
			/* remote wants the article we offered with "check" */
			case	238 : {
					if (!TakethisTransmit(oc)) {
						ret=1;
					}
					oc->outstanding--;
				} break;
			/* counted as accepted */
			case	235 :
			case	239 : {
					oc->site->accept++;
					oc->outstanding--;
				} break;				
			/* counted as rejected */
			case	437:
			case	439 : {
					oc->site->reject++;
					oc->outstanding--;
				} break;	
			/* remote wants the article we offered with "ihave" */
			case	335 : {
					IhaveTransmit(oc);
					oc->outstanding--;
					ret=1;
				} break;
			/* reply to "mode stream": switch to check/takethis */
			case	203 : {
					oc->flags|=NNTP_STREAMING;
				} break;
			case 	205:  {
					FLAG_FOR_CONNECTION_CLOSE_AOP(oc);
				} break;
				
			case	200 :
			case	201 : break;
			default: {
					oc->site->errors++;
					oc->connerror++;
					/* non-streaming connections give up on the second error */
					if ((!(oc->flags & NNTP_STREAMING)) && 
						(oc->connerror>1)) {
						FLAG_FOR_CONNECTION_CLOSE(oc);
						return 0;
					}
					if (oc->outstanding)
						oc->outstanding--;
					if (oc->connerror>5) {
						FLAG_FOR_CONNECTION_CLOSE(oc);
						return 0;
					}
					printf("err: %s\n",oc->bp);
				} break;
		}
		
		if (ret) {
			return ret;
		}
	}
	
	/* lock step: only ask for more once every response has arrived */
	if (!oc->outstanding) {
		if (oc->flags & NNTP_STREAMING) {
			if (!StreamFeed(oc)) {
				return 1;
			} 
		} else {
			if (!IhaveFeed(oc)) {
				return 1;
			}
		}
	}
	
	return 0;
}

/*
 * NntpFeedOut()
 *
 * Handle the "feedout" control command:
 *
 *   feedout create <num> <name>
 *   feedout set <name> <param> <value>
 *
 * The reply is appended to cc's output buffer.  Missing arguments and
 * unknown subcommands produce the usage line.
 *
 * The command line is tokenised in place with strtok(), starting at
 * cc->bp+7 (presumably just past the 7-character command word).
 * NOTE(review): replies are written with unbounded sprintf(); confirm
 * the output buffer is large enough for the longest accepted line.
 */

void
NntpFeedOut(struct context *cc)
{
	char *s,*p,*v;
	
	
	s=strtok(cc->bp+7,"\t ");
	
	/* strtok() returns NULL, not "", when there is no token */
	if (!s) {
		goto error;
	}
	
	if (!strcasecmp(s,"create")) {
		s=strtok(NULL,"\t ");
		p=strtok(NULL,"\t ");
		if ((!p) || (!s) || (atoi(s)==0))
			goto error;
		if (SiteCreate(atoi(s),p)==-1) {
			strcpy(&cc->obuf[cc->obuflen],"500 failed\r\n");
			cc->obuflen+=strlen(&cc->obuf[cc->obuflen]);
		} else {
			sprintf(&cc->obuf[cc->obuflen],"%s created as %i\r\n", 
				p,atoi(s));
			cc->obuflen+=strlen(&cc->obuf[cc->obuflen]);
		}
		return;
	}
	
	if (!strcasecmp(s,"set")) {
		s=strtok(NULL,"\t ");
		p=strtok(NULL,"\t ");
		v=strtok(NULL,"\n");
		/* all three arguments are required */
		if ((!s) || (!p) || (!v))
			goto error;
		if (SiteSet(s,p,v)==-1) {
			strcpy(&cc->obuf[cc->obuflen],"500 failed\r\n");
			cc->obuflen+=strlen(&cc->obuf[cc->obuflen]);
		} else {
			sprintf(&cc->obuf[cc->obuflen],"%s set to %s for %s\r\n", 
				p,v,s);
			cc->obuflen+=strlen(&cc->obuf[cc->obuflen]);
		}
		
		return;
	}
	
	error:
	
	strcpy(&cc->obuf[cc->obuflen],"500 feedout [create num name]|[set name param value]\r\n");
	cc->obuflen+=strlen(&cc->obuf[cc->obuflen]);
	return;
}

/*
 * PrintStats()
 *
 * Print one line of counters per active site to stdout, followed by one
 * line per connection on that site showing its state strings and the
 * number of outstanding responses.
 */

void
PrintStats()
{
	struct osite *site;
	struct context *conn;
	
	site=TAILQ_FIRST(&activeSites);
	while (site!=NULL) {
		printf("%s - accepted %u refused %u rejected %u errors %u flagged %u\n\t\tmissing %u connected %u\n",
			site->hostname,
			site->accept,site->refuse,site->reject,
			site->errors,site->flaggedfor,
			site->missing,site->connected);
		
		conn=TAILQ_FIRST(&site->conns);
		while (conn!=NULL) {
			printf("\tconn state=%s (%s) (%i)\n",
				conn->state,conn->cb.state,conn->outstanding);
			conn=TAILQ_NEXT(conn,clist);
		}
		
		site=TAILQ_NEXT(site,list);
	}
}


/*
 * FeedInit()
 *
 * Initialise the site list and the list of waiting connections, and
 * register PrintStats() with the scheduler.
 */

void
FeedInit()
{
	TAILQ_INIT(&activeSites);
	TAILQ_INIT(&waitNewArticles);
	
	ScheduleCallback(30,9999999,PrintStats,0);
}
