Logo Search packages:      
Sourcecode: jabber-jit version File versions  Download package

base_accept.c

/* --------------------------------------------------------------------------
 *
 * License
 *
 * The contents of this file are subject to the Jabber Open Source License
 * Version 1.0 (the "License").  You may not copy or use this file, in either
 * source code or executable form, except in compliance with the License.  You
 * may obtain a copy of the License at http://www.jabber.com/license/ or at
 * http://www.opensource.org/.  
 *
 * Software distributed under the License is distributed on an "AS IS" basis,
 * WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
 * for the specific language governing rights and limitations under the
 * License.
 *
 * Copyrights
 * 
 * Portions created by or assigned to Jabber.com, Inc. are 
 * Copyright (c) 1999-2000 Jabber.com, Inc.  All Rights Reserved.  Contact
 * information for Jabber.com, Inc. is available at http://www.jabber.com/.
 *
 * Portions Copyright (c) 1998-1999 Jeremie Miller.
 * 
 * Acknowledgements
 * 
 * Special thanks to the Jabber Open Source Contributors for their
 * suggestions and support of Jabber.
 * 
 * --------------------------------------------------------------------------*/
 
#include "jabberd.h"

/* Pool defined outside this file.  Declared here, but not referenced by any
 * code visible in this file. */
extern pool jabberd__runtime;

/* Values stored in accept_instance->state.
 *   A_ERROR   - set when a new stream root arrives, on MIO_ERROR and on
 *               MIO_CLOSED; outgoing packets are queued while in this state.
 *   A_READY   - set after a successful handshake; packets flow both ways.
 *   A_CLOSING - set by base_accept_kill after "endconnection" was written;
 *               packets are still delivered in both directions.
 * NOTE(review): a freshly configured instance has state 0 (none of the
 * values below), assuming pmalloco zero-fills - confirm. */
#define A_ERROR  -1
#define A_READY   1
#define A_CLOSING 2

/* One entry of the pending-packet FIFO kept per accept instance.
 * Entries are created by base_accept_queue (allocated from the pool of the
 * queued xmlnode), flushed to the socket after a successful handshake and
 * expired by base_accept_beat. */
typedef struct queue_struct
{
  int stamp;                  /* time(NULL) at the moment the packet was queued */
  xmlnode x;                  /* the queued packet */
  struct queue_struct *next;  /* next (younger) entry, NULL at the tail */
} *queue, _queue;

/* Per-<accept> section state: the listening configuration, the currently
 * authenticated component socket and the queue of packets waiting for it. */
typedef struct accept_instance_st
{
    mio m;                  /* authenticated component socket; NULL when none is connected */
    int state;              /* one of A_ERROR / A_READY / A_CLOSING */
    char *id;               /* stream id sent in our stream header; hashed with secret to check the handshake */
    pool p;                 /* pool used for the copy of the stream id (the instance's pool) */
    instance i;             /* instance this section belongs to */
    char *ip;               /* <ip> config value passed to mio_listen */
    char *secret;           /* <secret> config value, shared with the component */
    int port;               /* <port> config value */
    int timeout;            /* <timeout> config value (default 30): beat interval and max age of queued packets */
    int log;                /* <log> config value: index into __wplog->tab; negative (default -1) disables counters */
    queue q_start,q_end;    /* head and tail of the pending-packet FIFO, both NULL when empty */
    dpacket dplast;         /* packet from the socket currently being delivered; used to detect circular delivery */
    pthread_mutex_t sem;    /* guards q_start / q_end */
} *accept_instance, _accept_instance;


/* Shutdown hook (registered through register_shutdown); the original author
 * notes it runs on the main thread.
 *
 * If a component is connected and authenticated (state A_READY), send it an
 * <endconnection/> element and move the instance to A_CLOSING.  In every
 * other state nothing is sent.
 *
 * arg: the accept_instance this hook was registered for. */
void base_accept_kill(void *arg)
{
  accept_instance inst = (accept_instance)arg;
  xmlnode end_req;

  log_debug(ZONE,"base accept kill");

  /* only an authenticated connection gets the end-of-transmission request */
  if (inst->state != A_READY)
    return;

  log_debug(ZONE,"send end");

  /* send request to end transmission to wpj and other transports */
  end_req = xmlnode_new_tag("endconnection");

  if (inst->m == NULL) {
    /* no socket to write to: drop the node we just built */
    xmlnode_free(end_req);
    return;
  }

  mio_write(inst->m, end_req, NULL, -1);
  inst->state = A_CLOSING;
}

/* Append packet x to the instance's pending FIFO, stamped with the current
 * time so base_accept_beat can expire it later.
 *
 * The queue entry is allocated from x's own pool.  The call is a no-op when
 * either argument is NULL.  The list is modified with ai->sem held. */
void base_accept_queue(accept_instance ai, xmlnode x)
{
    queue node;

    if(ai == NULL || x == NULL)
        return;

    pthread_mutex_lock(&(ai->sem));

    node = pmalloco(xmlnode_pool(x),sizeof(_queue));
    node->stamp = time(NULL);
    node->x = x;
    node->next = NULL;

    /* empty queue: the new entry is also the head; otherwise chain it
     * behind the current tail */
    if (ai->q_end == NULL)
      ai->q_start = node;
    else
      ai->q_end->next = node;

    ai->q_end = node;

    pthread_mutex_unlock(&(ai->sem));
}

/* Packet handler for the instance (registered with o_DELIVER).
 *
 * While the component is connected (state A_READY or A_CLOSING) the packet
 * is written to its socket, unless it is the very packet currently being
 * delivered from that socket (ai->dplast), which is bounced instead.
 * In any other state the packet is put on the pending queue.
 *
 * i:   instance the packet is delivered to (unused here).
 * p:   packet to hand to the component.
 * arg: the accept_instance registered with this handler.
 * Always returns r_DONE. */
result base_accept_deliver(instance i, dpacket p, void* arg)
{
    accept_instance ai = (accept_instance)arg;
    int connected = (ai->state == A_READY || ai->state == A_CLOSING);

    /* no authenticated socket yet: park the packet until one shows up */
    if (!connected)
    {
        base_accept_queue(ai, p->x);
        return r_DONE;
    }

    if(ai->dplast != p)
        mio_write(ai->m, p->x, NULL, 0);
    else
        /* don't return packets that they sent us! circular reference! */
        deliver_fail(p,"Circular Refernce Detected");

    return r_DONE;
}


/* Handle incoming packets from the xstream associated with an MIO object */
void base_accept_process_xml(mio m, int state, void* arg, xmlnode x)
{
    accept_instance ai = (accept_instance)arg;
    xmlnode cur;
    queue q, q2;
    char hashbuf[41];
    int socket_size = 32768;
    int s = sizeof(int);

    log_debug(ZONE, "process XML: m:%X state:%d, arg:%X, x:%X", m, state, arg, x);

    switch(state)
    {
        case MIO_NEW:
        setsockopt(m->fd,SOL_SOCKET,SO_SNDBUF,(char *)&socket_size,s);
        setsockopt(m->fd,SOL_SOCKET,SO_RCVBUF,(char *)&socket_size,s);
        socket_size = 1;
        setsockopt(m->fd,SOL_SOCKET,SO_KEEPALIVE,(char *)&socket_size,s);
        return;

        case MIO_XML_ROOT:
            /* Ensure request namespace is correct... */
            if (j_strcmp(xmlnode_get_attrib(x, "xmlns"), "jabber:component:accept") != 0)
            {
                /* Log that the connected component sent an invalid namespace */
                log_warn(ai->i->id, "Recv'd invalid namespace. Closing connection.");
                /* Notify component with stream:error */
                mio_write(m, NULL, SERROR_NAMESPACE, -1);
                /* Close the socket and cleanup */
                mio_close(m);
                break;
            }

            /* Send header w/ proper namespace, using instance i */
            cur = xstream_header("jabber:component:accept", NULL, ai->i->id);
          
            /* Save stream ID for auth'ing later */
            ai->id = pstrdup(ai->p, xmlnode_get_attrib(cur, "id"));

          /* override if other connections */
          ai->state = A_ERROR;

            mio_write(m, NULL, xstream_header_char(cur), -1);
            xmlnode_free(cur);
            break;

        case MIO_XML_NODE:
            /* If aio has been authenticated previously, go ahead and deliver the packet */
          /* deliver packets even if closing */
              if(ai->state == A_READY || ai->state == A_CLOSING)   {
                  /* Hide 1.0 style transports etherx:* attribs */            
                  xmlnode_hide_attrib(x, "etherx:to");
                  xmlnode_hide_attrib(x, "etherx:from");
                  ai->dplast = dpacket_new(x);
                  
                  /* FOR LOG */
                  if ((ai->log >= 0) && (ai->dplast)){
                    cur = NULL;
                    if(ai->dplast->type == p_ROUTE) {
                        cur  = xmlnode_get_firstchild(ai->dplast->x);
                    }
                    else 
                        if(ai->dplast->type == p_NORM) {
                          cur = ai->dplast->x;
                        }
                    
                    if (cur){
                        if(strncmp(xmlnode_get_name(cur),"message",7) == 0) {
                          xmlnode curx;
                          __wplog->tab[ai->log]->messages++;
                          
                          if ((curx = xmlnode_get_tag(cur,"x"))
                                &&(j_strncmp(xmlnode_get_attrib(curx,"type"),"req",3)== 0))
                              __wplog->tab[ai->log]->files++;
                        }
                        else if(strncmp(xmlnode_get_name(cur),"presence",8) == 0){
                          __wplog->tab[ai->log]->presences++;
                        }
                        else if(strncmp(xmlnode_get_name(cur),"iq",2) == 0){
                          __wplog->tab[ai->log]->queries++;
                        }
                        else if(strncmp(xmlnode_get_name(cur),"iq",2) == 0){
                          __wplog->tab[ai->log]->queries++;
                        }
                    }
                  }
                  /* END LOG */
                  
                  /* xdb packet sends through socket goes to mtq */
                  if ((ai->dplast) && (ai->dplast->type == p_XDB)) {
                    /* XXX mtq_send */
                    deliver(ai->dplast, ai->i);
                    ai->dplast = NULL;
                    return;
                  }
                  else {
                    deliver(ai->dplast, ai->i);
                    ai->dplast = NULL;
                    return;
                  }
              }
              /* only other packets are handshakes */
              if(j_strcmp(xmlnode_get_name(x), "handshake") != 0) {
                  mio_write(m, NULL, "<stream:error>Must send handshake first.</stream:error>", -1);
                  mio_close(m);
                  break;
              }
              
              /* Create and check a SHA hash of this instance's password & SID */
              shahash_r(spools(xmlnode_pool(x), ai->id, ai->secret, xmlnode_pool(x)), hashbuf);
              if(j_strcmp(hashbuf, xmlnode_get_data(x)) != 0) {
                  mio_write(m, NULL, "<stream:error>Invalid handshake</stream:error>", -1);
                  mio_close(m);
                  break;
              }
              
              /* Send a handshake confirmation with our start time  */
              cur = xmlnode_new_tag("handshake");
              xmlnode_put_attrib(cur,"time",__wplog->server_start);
              mio_write(m, cur, NULL, -1);
              
              /* check for existing conenction and kill it */
            if(ai->m != NULL)  {
                    log_warn(ai->i->id, "Socket override by another connection from %s",mio_ip(m));
                    mio_write(ai->m, NULL, "<stream:error>Socket override by another connection.</stream:error>", -1);
                    mio_close(ai->m);
            }
                  
            /* hook us up! */
            ai->m = m;
            ai->state = A_READY;
                  
            /* flush old queue */
                  pthread_mutex_lock(&(ai->sem));
            q = ai->q_start;
            while(q != NULL)  {
                    q2 = q->next;
                    mio_write(m, q->x, NULL, 0);
                    q = q2;
            }
            ai->q_start = NULL;
            ai->q_end = NULL;
                  pthread_mutex_unlock(&(ai->sem));
                  
            break;
                  
      case MIO_ERROR:
        /* make sure it's the important one */
          log_debug(ZONE,"m=%d  ai->m = %d",m,ai->m);
          
            if(m != ai->m)
              return;
            
            ai->state = A_ERROR;
            
            /* clean up any tirds */
            while((cur = mio_cleanup(m)) != NULL)
              deliver_fail(dpacket_new(cur), "External Server Error");
            
            return;
            
      case MIO_CLOSED:
        /* make sure it's the important one */
        if(m != ai->m)
                return;
        
        log_debug(ZONE,"closing accepted socket");
        ai->m = NULL;
        ai->state = A_ERROR;
        
        return;
      default:
            return;
    }
    xmlnode_free(x);
}

/* Heartbeat callback: expire packets that have waited in the pending queue
 * for more than ai->timeout seconds.
 *
 * Every expired entry is unlinked from the FIFO and its packet is bounced
 * with deliver_fail("Internal Timeout").  The whole scan runs with ai->sem
 * held.
 *
 * arg: the accept_instance registered with register_beat.
 * Always returns r_DONE. */
result base_accept_beat(void *arg)
{
    accept_instance ai = (accept_instance)arg;
    queue last = NULL;      /* most recent entry that stays in the queue */
    queue cur, next;
    int now = time(NULL);

    pthread_mutex_lock(&(ai->sem));

    cur = ai->q_start;
    while(cur != NULL)
    {
        /* read the successor first: the entry lives in the pool of its
         * packet, which is handed to deliver_fail below */
        next = cur->next;

        if( (now - cur->stamp) <= ai->timeout)
        {
            /* still fresh, keep it */
            last = cur;
            cur = next;
            continue;
        }

        /* timed out: unlink cur.  Without a surviving predecessor it is the
         * head; otherwise the predecessor must skip over it.  This also
         * covers removing the tail, where the predecessor's next pointer
         * has to become NULL so the list never references the bounced
         * entry again. */
        if (last == NULL)
            ai->q_start = next;
        else
            last->next = next;

        /* removing the tail: the surviving predecessor (or NULL for an
         * emptied queue) becomes the new tail */
        if (ai->q_end == cur)
            ai->q_end = last;

        deliver_fail(dpacket_new(cur->x),"Internal Timeout");
        cur = next;
    }

    pthread_mutex_unlock(&(ai->sem));
    return r_DONE;
}

/* Config handler for <accept> sections.
 *
 * Called with id == NULL to validate the section: a non-zero <port> and a
 * <secret> tag are required; returns r_PASS when both are present, otherwise
 * stores a message in x's "error" attribute and returns r_ERR.
 *
 * Called with a real instance to apply the section: builds the
 * accept_instance, starts listening, and registers the delivery handler,
 * the queue-expiry heartbeat and the shutdown hook.  Returns r_DONE, or
 * r_ERR (with the "error" attribute set) when the listener cannot be
 * created.
 *
 * id:  instance being configured, or NULL for validation only.
 * x:   the <accept> config node.
 * arg: unused. */
result base_accept_config(instance id, xmlnode x, void *arg)
{
    accept_instance inst;
    char *secret = xmlnode_get_tag_data(x, "secret");
    int port = j_atoi(xmlnode_get_tag_data(x, "port"),0);

    /* validation pass */
    if(id == NULL)
    {
        log_debug(ZONE,"base_accept_config validating configuration...");

        if (port != 0 && xmlnode_get_tag(x,"secret") != NULL)
        {
            return r_PASS;
        }

        xmlnode_put_attrib(x,"error","<accept> requires the following subtags: <port>, and <secret>");
        return r_ERR;
    }

    log_debug(ZONE,"base_accept_config performing configuration %s\n",xmlnode2str(x));

    /* Build the per-section state out of the instance's pool */
    inst = pmalloco(id->p, sizeof(_accept_instance));

    inst->p = id->p;
    inst->i = id;

    inst->secret = secret;
    inst->ip = xmlnode_get_tag_data(x,"ip");
    inst->port = port;

    /* optional settings: counters disabled and a 30 second timeout unless
     * the section says otherwise */
    inst->log = j_atoi(xmlnode_get_tag_data(x, "log"),-1);
    inst->timeout = j_atoi(xmlnode_get_tag_data(x, "timeout"),30);

    pthread_mutex_init(&(inst->sem),NULL);

    /* Open the listening socket; accepted connections are parsed as XML
     * streams and reported to base_accept_process_xml */
    if(mio_listen(inst->port, inst->ip, base_accept_process_xml, (void*)inst, NULL, mio_handlers_new(NULL, NULL, MIO_XML_PARSER)) == NULL)
    {
        xmlnode_put_attrib(x,"error","<accept> unable to listen on the configured ip and port");
        return r_ERR;
    }

    /* packets routed to this instance go to the component (or the queue) */
    register_phandler(id, o_DELIVER, base_accept_deliver, (void *)inst);

    /* expire stale queued packets every `timeout` seconds */
    register_beat(inst->timeout, base_accept_beat, (void *)inst);

    /* tell the component when the server shuts down */
    register_shutdown(base_accept_kill, (void *)inst);

    return r_DONE;
}


/* Module entry point: registers base_accept_config as the handler for
 * <accept> configuration sections. */
void base_accept(void)
{
    log_debug(ZONE, "base_accept loading...\n");

    register_config("accept", base_accept_config, NULL);
}




Generated by  Doxygen 1.6.0   Back to index