Logo Search packages:      
Sourcecode: dcc version File versions

oflod.c

/* Distributed Checksum Clearinghouse
 *
 * deal with outgoing floods of checksums
 *
 * Copyright (c) 2005 by Rhyolite Software, LLC
 *
 * This agreement is not applicable to any entity which sells anti-spam
 * solutions to others or provides an anti-spam solution as part of a
 * security solution sold to other entities, or to a private network
 * which employs the DCC or uses data provided by operation of the DCC
 * but does not provide corresponding data to other users.
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * Parties not eligible to receive a license under this agreement can
 * obtain a commercial license to use DCC and permission to use
 * U.S. Patent 6,330,590 by contacting Commtouch at http://www.commtouch.com/
 * or by email to nospam@commtouch.com.
 *
 * A commercial license would be for Distributed Checksum and Reputation
 * Clearinghouse software.  That software includes additional features.  This
 * free license for Distributed ChecksumClearinghouse Software does not in any
 * way grant permision to use Distributed Checksum and Reputation Clearinghouse
 * software
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND RHYOLITE SOFTWARE, LLC DISCLAIMS ALL
 * WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL RHYOLITE SOFTWARE, LLC
 * BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES
 * OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
 * WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION,
 * ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
 * SOFTWARE.
 *
 * Rhyolite Software DCC 1.3.42-1.97 $Revision$
 */

#include "dccd_defs.h"
#include "dcc_ck.h"


int flods_off;                      /* # of reasons flooding is off */
int flod_db_sick;             /* # of flooding DB sicknesses */


u_char too_busy;              /* too busy to flood */
time_t next_flods_ck;
enum FLODS_ST flods_st = FLODS_ST_OFF;
OFLODS oflods;

/* records after this have not been flooded
 *    0 if invalid */
DB_PTR oflods_max_pos;

static time_t flod_mtime = 1;

int summarize_delay_secs;           /* delay summaries by this */


static void oflod_fill(OFLOD_INFO *);


/* the socket must already be closed */
static void
oflod_clear(OFLOD_INFO *ofp)
{
      memset(ofp, 0, sizeof(*ofp));
      ofp->s = -1;
}



static void
oflods_clear(void)
{
      OFLOD_INFO *ofp;

      for (ofp = oflods.infos; ofp <= LAST(oflods.infos); ++ofp)
            oflod_clear(ofp);
      oflods.total = 0;
      oflods.active = 0;
      complained_many_iflods = 0;
      oflods_max_pos = 0;
}



void
oflods_unmap(void)
{
      OFLOD_INFO *ofp;

      for (ofp = oflods.infos; ofp <= LAST(oflods.infos); ++ofp)
            ofp->mp = 0;
      flod_unmap(0, &dccd_stats);
}



/* parse ID1->tgt in a flood file entry */
static char
oflod_parse_map(OFLOD_OPTS *opts, const char *str0, int lno)
{
      DCC_FNM_LNO_BUF fnm_buf;
      const char *str;
      OFLOD_SRVR_ID_MAP *imp;

      if (opts->num_maps >= DIM(opts->srvr_map)) {
            dcc_error_msg("too many ID mappings with\"%s\"%s",
                        str0, fnm_lno(fnm_buf, flod_path, lno));
            return 0;
      }
      imp = &opts->srvr_map[opts->num_maps];

      if (!CSTRCMP(str0, "self->")) {
            str = str0+STRZ("self->");
            imp->from_lo = imp->from_hi = my_srvr_id;
      } else if (!CSTRCMP(str0, "all->")) {
            str = str0+STRZ("all->");
            imp->from_lo = DCC_SRVR_ID_MIN;
            imp->from_hi = DCC_SRVR_ID_MAX;
      } else {
            /* get ID1 */
            str = dcc_get_srvr_id(0, &imp->from_lo, str0, "-",
                              flod_path, lno);
            if (!str)
                  return 0;
            if (str[0] == '-' && str[1] == '>') {
                  /* ID1 is not a range */
                  imp->from_hi = imp->from_lo;
            } else {
                  /* ID1 is a range of IDs */
                  str = dcc_get_srvr_id(0, &imp->from_hi,
                                    str+1, "-", flod_path, lno);
                  if (!str)
                        return 0;
                  if (imp->from_hi < imp->from_lo) {
                        dcc_error_msg("invalid ID mapping range "
                                    "\"%d-%d\"%s",
                                    imp->from_lo, imp->from_hi,
                                    fnm_lno(fnm_buf, flod_path, lno));
                        return 0;
                  }
            }
            if (*str++ != '-' || *str++ != '>') {
                  dcc_error_msg("invalid server-ID mapping \"%s\"%s",
                              str0, fnm_lno(fnm_buf, flod_path, lno));
                  return 0;
            }
      }
      if (!strcasecmp(str, "self")) {
            imp->result = ID_MAP_SELF;
      } else if (!strcasecmp(str, "reject")) {
            imp->result = ID_MAP_REJ;
      } else if (!strcasecmp(str, "ok")) {
            imp->result = ID_MAP_NO;
      } else {
            dcc_error_msg("invalid ID mapping result \"%s\"%s",
                        str, fnm_lno(fnm_buf, flod_path, lno));
            return 0;
      }

      ++opts->num_maps;
      return 1;
}



/* parse remote or local options that can be any of
 *  a "off", "del", "no-del", "log-del", "passive", ID->map, etc. */
static const char *                 /* rest of the line */
oflod_parse_opts(OFLOD_INFO *ofp,
             OFLOD_OPTS *opts,
             const char *buf, int lno)
{
      DCC_FNM_LNO_BUF fnm_buf;
      char opts_buf[200];
      char opt[20];
      const char *buf_ptr, *p;
      char *end;
      unsigned long l;
      u_int olen;

      /* pick out the blank delimited string of options */
      buf = dcc_parse_word(0, opts_buf, sizeof(opts_buf),
                       buf, "flood options", flod_path, lno);
      if (!buf)
            return 0;

      opts->path_len = DCC_MAX_FLOD_PATH;
      if (grey_on)
            opts->flags |= (FLOD_OPT_DEL_OK
                        | FLOD_OPT_NO_LOG_DEL
                        | FLOD_OPT_DEL_SET);

      /* parse the options */
      buf_ptr = opts_buf;
      while (*buf_ptr != '\0') {
            while (*buf_ptr == ',')
                  ++buf_ptr;
            olen = strcspn(buf_ptr, ",");
            if (olen >= sizeof(opt))
                  olen = sizeof(opt)-1;
            strncpy(opt, buf_ptr, olen);
            opt[olen] = '\0';
            buf_ptr += olen;

            /* ignore "-" */
            if (!strcmp(opt, "-"))
                  continue;

            if (!strcasecmp(opt, "off")) {
                  opts->flags |= FLOD_OPT_OFF;
                  continue;
            }
            if (!grey_on) {
                  if (!strcasecmp(opt, "traps")) {
                        opts->flags |= FLOD_OPT_TRAPS;
                        continue;
                  }
                  if (!strcasecmp(opt, "no-del")) {
                        opts->flags &= ~FLOD_OPT_DEL_OK;
                        opts->flags |= FLOD_OPT_DEL_SET;
                        continue;
                  }
                  if (!strcasecmp(opt, "del")) {
                        opts->flags |= FLOD_OPT_DEL_OK;
                        opts->flags |= FLOD_OPT_DEL_SET;
                        continue;
                  }
            }

            /* put some options in one or or the other no matter
             * for which they are specified */
            if (!strcasecmp(opt, "no-log-del")) {
                  ofp->i_opts.flags |= FLOD_OPT_NO_LOG_DEL;
                  continue;
            }
            if (!strcasecmp(opt, "log-del")) {
                  ofp->i_opts.flags &= ~FLOD_OPT_NO_LOG_DEL;
                  continue;
            }
            if (!strcasecmp(opt, "passive")) {
                  if (ofp->o_opts.flags & FLOD_OPT_SOCKS) {
                        dcc_error_msg("\"passive\" and \"SOCKS\";"
                                    " cannot both be%s",
                                    fnm_lno(fnm_buf, flod_path, lno));
                        return 0;
                  }
                  ofp->o_opts.flags |= FLOD_OPT_PASSIVE;
                  continue;
            }
            if (!strcasecmp(opt, "socks")) {
                  if (ofp->o_opts.flags & FLOD_OPT_PASSIVE) {
                        dcc_error_msg("\"passive\" and \"SOCKS\";"
                                    " cannot both be%s",
                                    fnm_lno(fnm_buf, flod_path, lno));
                        return 0;
                  }
                  if (ofp->loc_hostname[0] != '\0'
                      || ofp->loc_port != 0) {
                        dcc_error_msg("local host name or port number"
                                    " and \"SOCKS\";"
                                    " cannot both be%s",
                                    fnm_lno(fnm_buf, flod_path, lno));
                        ofp->loc_hostname[0] = '\0';
                        ofp->loc_port = 0;
                  }
                  ofp->o_opts.flags |= FLOD_OPT_SOCKS;
                  continue;
            }
            if (!strcasecmp(opt, "IPv4")) {
                  if (ofp->o_opts.flags & FLOD_OPT_IPv6) {
                        dcc_error_msg("\"IPv4\" and \"IPv6\";"
                                    " cannot both be%s",
                                    fnm_lno(fnm_buf, flod_path, lno));
                        return 0;
                  }
                  ofp->o_opts.flags |= FLOD_OPT_IPv4;
                  continue;
            }
            if (!strcasecmp(opt, "IPv6")) {
                  if (ofp->o_opts.flags & FLOD_OPT_IPv4) {
                        dcc_error_msg("\"IPv4\" and \"IPv6\";"
                                    " cannot both be%s",
                                    fnm_lno(fnm_buf, flod_path, lno));
                        return 0;
                  }
                  ofp->o_opts.flags |= FLOD_OPT_IPv6;
                  continue;
            }
            if (!strcasecmp(opt, "no-reputations")) {
                  continue;
            }
            if (!CSTRCMP(opt, "leaf=")
                && (l = strtoul(opt+STRZ("leaf="), &end, 10),
                  *end == '\0')) {
                  if (l > DCC_MAX_FLOD_PATH)
                        l = DCC_MAX_FLOD_PATH;
                  ofp->o_opts.path_len = l;
                  continue;
            }

#ifdef DCC_FLOD_VERSION7
            if (!strcasecmp(opt, "version7")) {
                  ofp->oversion = DCC_FLOD_VERSION7;
                  continue;
            }

#endif
            /* parse an ID->map */
            p = strchr(opt, '>');
            if (p && p > opt && *(p-1) == '-') {
                  if (!oflod_parse_map(opts, opt, lno))
                        return 0;
                  continue;
            }

            dcc_error_msg("unknown option \"%s\"%s",
                        opt, fnm_lno(fnm_buf, flod_path, lno));
            return 0;
      }

      return buf;
}



static const char *                 /* rest of a flod file line */
oflod_parse_id(DCC_SRVR_ID *id,
             const char *buf, const char *type, int lno)
{
      char id_buf[20];

      buf = dcc_parse_word(0, id_buf, sizeof(id_buf),
                       buf, type, flod_path, lno);
      if (!buf)
            return 0;

      if (!strcmp(id_buf, "-")
          || id_buf[0] == '\0') {
            *id = DCC_ID_INVALID;
            return buf;
      }

      if (!dcc_get_srvr_id(0, id, id_buf, 0, flod_path, lno))
            return 0;

      /* do not check whether we know the local ID here, because
       * changes in the ids file can make that check moot */

      return buf;
}



/* compute the maximum position among all floods */
static void
get_oflods_max_pos(void)
{
      OFLOD_INFO *ofp;

      oflods_max_pos = DB_PTR_BASE;
      for (ofp = oflods.infos; ofp <= LAST(oflods.infos); ++ofp) {
            if (ofp->rem_hostname[0] != '\0'
                && oflods_max_pos < ofp->cur_pos)
                  oflods_max_pos = ofp->cur_pos;
      }
}



static void
copy_opts2mp(OFLOD_INFO *ofp)
{
      FLOD_MMAP *mp = ofp->mp;

      mp->flags &= (FLOD_MMAP_FG_REWINDING
                  | FLOD_MMAP_FG_NEED_REWIND
                  | FLOD_MMAP_FG_FFWD_IN);

      if (db_parms.flags & DB_PARM_FG_CLEARED) {
            mp->flags |= FLOD_MMAP_FG_NEED_REWIND;
            mp->flags &= ~FLOD_MMAP_FG_FFWD_IN;
      }

      if (ofp->o_opts.flags & FLOD_OPT_PASSIVE)
            mp->flags |= FLOD_MMAP_FG_PASSIVE;
      else if (ofp->o_opts.flags & FLOD_OPT_SOCKS)
            mp->flags |= FLOD_MMAP_FG_SOCKS;

      if (ofp->o_opts.path_len != DCC_MAX_FLOD_PATH)
            mp->flags |= FLOD_MMAP_FG_LEAF;

      if (ofp->o_opts.num_maps != 0
          || ofp->i_opts.num_maps != 0)
            mp->flags |= FLOD_MMAP_FG_MAPPED;

      if (ofp->o_opts.flags & FLOD_OPT_IPv4) {
            if (mp->flags & FLOD_MMAP_FG_IPv6) {
                  mp->flags &= ~FLOD_MMAP_FG_IPv6;
                  got_hosts = 0;
            }
            mp->flags |= FLOD_MMAP_FG_IPv4;
      } else if (ofp->o_opts.flags & FLOD_OPT_IPv6) {
            if (mp->flags & FLOD_MMAP_FG_IPv4) {
                  mp->flags &= ~FLOD_MMAP_FG_IPv4;
                  got_hosts = 0;
            }
            mp->flags |= FLOD_MMAP_FG_IPv6;
      }

      mp->flags &= ~(FLOD_MMAP_FG_FLOD_REP_OFF
                   | FLOD_MMAP_FG_PEER_REP_OFF);

      /* get new hostname if it changes */
      if (strcasecmp(mp->rem_hostname, ofp->rem_hostname)) {
            BUFCPY(mp->rem_hostname, ofp->rem_hostname);
            /* force name resolution for new name */
            got_hosts = 0;
            new_peer(ofp);
      }
      /* always get new port name
       * in case the name but not the number changes */
      BUFCPY(mp->rem_portname, ofp->rem_portname);
      if (mp->rem_port != ofp->rem_port) {
            mp->rem_port = ofp->rem_port;
            new_peer(ofp);
      }
}



/* Load the hostnames of DCC server peers and their output flood positions.
 *    flod_names_resolve_ck() must say ok before this function is called,
 *    to avoid races with changing host names.
 *
 *    oflods.active and iflods.active must both be 0 to ensure that
 *    there are no old pointers to the ofp structures.
 *
 *    Parse lines of the form
 *      hostname port rem-ID [passwd-id [out-opts [in-opts [versionX]]]]
 */
u_char                              /* 1=ok to start flooding */
oflods_load(void)
{
      DCC_FNM_LNO_BUF fnm_buf;
      OFLOD_INFO *ofp, *ofp1;
      FILE *f;
      struct stat sb;
      int lno;
      char buf[200];
      char hostname[60];
      const char *bufp, *bufp1;
      FLOD_MMAP *mp, *mp1;
      union {
          OFLOD_INFO    info;
          FLOD_MMAP     map;
      } swap;
      char *p;
      int i;

      /* forget everything about output flooding */
      oflods_clear();

      /* keep the map open and locked most of the time */
      if (!flod_mmap(dcc_emsg, db_parms.sn, &dccd_stats, 1,
                   (DCC_TRACE_FLOD_BIT & dccd_tracemask) != 0)) {
            dcc_error_msg("%s", dcc_emsg);
            flod_mtime = 0;
            return 0;
      }

      f = fopen(flod_path, "r");
      if (!f) {
            int serrno = errno;

            if (flod_mtime != 0) {
                  dcc_error_msg("fopen(%s): %s",
                              flod_path, ERROR_STR());
                  flod_mtime = 0;
            }

            /* do not clear or unmap the flod map file if the text file
             * is missing in case it has been temporarily deleted
             * while being edited */
            if (serrno != ENOENT)
                  flod_unmap(0, &dccd_stats);
            return 0;
      }
      if (0 > fstat(fileno(f), &sb)) {
            if (flod_mtime!= 0)
                  dcc_error_msg("stat(%s): %s",
                              flod_path, ERROR_STR());
            fclose(f);
            flod_mtime = 0;
            return 0;
      }
      flod_mtime = sb.st_mtime;

      /* Parse the file of names and numbers first so that we do not
       * destroy the position information if there is a problem with names */
      ofp = oflods.infos;
      lno = 0;
      for (;;) {
            /* clear the entry in case we started to set it with the
             * preceding line from the /var/dcc/flod file */
            if (ofp <= LAST(oflods.infos))
                  oflod_clear(ofp);

            ++lno;
            bufp = fgets(buf, sizeof(buf), f);
            if (!bufp) {
                  if (ferror(f)) {
                        dcc_error_msg("fgets(%s): %s",
                                    flod_path, ERROR_STR());
                        break;
                  }
                  if (fclose(f) == EOF) {
                        dcc_error_msg("fclose(%s): %s",
                                    flod_path, ERROR_STR());
                  }
                  f = 0;
                  break;
            }
            i = strlen(bufp);
            if (i >= ISZ(buf)-1) {
                  dcc_error_msg("too many characters%s",
                              fnm_lno(fnm_buf, flod_path, lno));
                  do {
                        i = getc(f);
                  } while (i != '\n' && i != EOF);
                  continue;
            }
            /* ignore comments */
            p = strchr(bufp, '#');
            if (p)
                  *p = '\0';
            else
                  p = &buf[i];
            /* trim trailing blanks */
            while (--p > bufp && (*p == ' ' || *p == '\t' || *p == '\n'))
                  *p = '\0';
            /* skip blank lines */
            bufp += strspn(bufp, DCC_WHITESPACE);
            if (*bufp == '\0')
                  continue;

            if (oflods.total >= DIM(oflods.infos)) {
                  dcc_error_msg("too many DCC peers in %s; max=%d",
                              flod_path, DIM(oflods.infos));
                  continue;
            }

            ofp->lno = lno;

            /* get IP address and port number of remote DCC server */
            bufp1 = bufp+strcspn(bufp, DCC_WHITESPACE";");
            if (*bufp1 != ';') {
                  bufp1 = 0;
            } else {
                  /* Allow the local or client TCP IP address and
                   * port number to be specified. */
                  buf[bufp1++ - buf] = '\0';
            }
            bufp = dcc_parse_nm_port(0, bufp, def_port,
                               ofp->rem_hostname,
                               sizeof(ofp->rem_hostname),
                               &ofp->rem_port,
                               ofp->rem_portname,
                               sizeof(ofp->rem_portname),
                               flod_path, lno);
            if (!bufp)
                  continue;
            if (bufp1) {
                  /* parse the local IP address first */
                  bufp = dcc_parse_nm_port(0, bufp1, 0,
                                     ofp->loc_hostname,
                                     sizeof(ofp->loc_hostname),
                                     &ofp->loc_port, 0, 0,
                                     flod_path, lno);
                  if (!bufp)
                        continue;
            }

            bufp = oflod_parse_id(&ofp->rem_id, bufp,
                              "rem-id", lno);
            if (!bufp)
                  continue;
            if (ofp->rem_id == DCC_ID_INVALID) {
                  dcc_error_msg("missing rem-id%s",
                              fnm_lno(fnm_buf, flod_path, lno));
                  continue;
            }

            bufp = oflod_parse_id(&ofp->out_passwd_id, bufp,
                              "passwd-id", lno);
            if (!bufp)
                  continue;
            if (ofp->out_passwd_id == DCC_ID_INVALID) {
                  ofp->out_passwd_id = my_srvr_id;
                  ofp->in_passwd_id = ofp->rem_id;
            } else {
                  ofp->in_passwd_id = ofp->out_passwd_id;
            }

            ofp->oversion = DCC_FLOD_VERSION_DEF;
            bufp = oflod_parse_opts(ofp, &ofp->o_opts, bufp, lno);
            if (!bufp)
                  continue;
            bufp = oflod_parse_opts(ofp, &ofp->i_opts, bufp, lno);
            if (!bufp)
                  continue;
            if (*bufp != '\0')
                  dcc_error_msg("trailing garbage \"%s\" ignored%s",
                              bufp, fnm_lno(fnm_buf, flod_path, lno));

            /* both servers having spam traps and assuming the other
             * doesn't makes no sense */
            if (((ofp->o_opts.flags & FLOD_OPT_TRAPS)
                 || (ofp->i_opts.flags & FLOD_OPT_TRAPS))
                && !(ofp->i_opts.flags & FLOD_OPT_OFF)
                && !(ofp->o_opts.flags & FLOD_OPT_OFF)) {
                  dcc_error_msg("symmetric trap-only link%s",
                              fnm_lno(fnm_buf, flod_path, lno));
                  continue;
            }

            for (ofp1 = oflods.infos; ofp1 < ofp; ++ofp1) {
                  if ((!strcmp(ofp1->rem_hostname, ofp->rem_hostname)
                       && ofp1->rem_port == ofp->rem_port)
                      || ofp1->rem_id == ofp->rem_id)
                        break;
            }
            if (ofp1 != ofp) {
                  dcc_error_msg("duplicate DCC peer%s",
                              fnm_lno(fnm_buf, flod_path, lno));
                  continue;
            }

            /* ignore ourself */
            if (ofp->rem_id == my_srvr_id)
                  continue;

            ofp->limit_reset = db_time.tv_sec + FLOD_LIM_CLEAR_SECS;

            ++ofp;
            ++oflods.total;
      }
      if (f)
            fclose(f);

      /* sort the list by server-ID so that `cdcc "flood list"` is sorted */
      ofp = oflods.infos;
      while (ofp < LAST(oflods.infos)) {
            ofp1 = ofp+1;
            if (ofp1->rem_hostname[0] == '\0')
                  break;
            if (ofp->rem_id <= ofp1->rem_id) {
                  ofp = ofp1;
                  continue;
            }
            /* bubble sort because the list is usually already
             * ordered and almost always tiny */
            memcpy(&swap.info, ofp1, sizeof(swap.info));
            memcpy(ofp1, ofp, sizeof(*ofp1));
            memcpy(ofp, &swap.info, sizeof(*ofp));
            ofp = oflods.infos;
      }

      /* Bubble sort the list in the /var/dcc/flod/map file so that is
       * sorted for `dblist -Hv`.  The file will usually already be sorted
       * and is almost always very short. */
      mp = flod_mmaps->mmaps;
      mp1 = mp+1;
      while (mp1 <= LAST(flod_mmaps->mmaps)) {
            if (mp1->rem_hostname[0] == '\0') {
                  ++mp1;
                  continue;
            }
            if (mp->rem_hostname[0] == '\0'
                || mp->rem_id <= mp1->rem_id) {
                  mp = mp1++;
                  continue;
            }
            memcpy(&swap.map, mp1, sizeof(swap.map));
            memcpy(mp1, mp, sizeof(*mp1));
            memcpy(mp, &swap.map, sizeof(*mp));
            mp = flod_mmaps->mmaps;
            mp1 = mp+1;
      }


      /* combine our list that is based on the /var/dcc/flod file
       * with the memory mapped /var/dcc/flod/map file list of what has
       * been sent to each peer */
      for (mp = flod_mmaps->mmaps; mp <= LAST(flod_mmaps->mmaps); ++mp)
            mp->oflod_index = -1;

      /* make one pass matching old names with their slots in the
       * mapped file */
      for (ofp = oflods.infos; ofp <= LAST(oflods.infos); ++ofp) {
            if (ofp->rem_hostname[0] == '\0')
                  break;
            for (i = 0; i < DIM(flod_mmaps->mmaps); ++i) {
                  if (++mp > LAST(flod_mmaps->mmaps))
                        mp = flod_mmaps->mmaps;
                  if (mp->rem_hostname[0] == '\0')
                        continue;
                  if (mp->oflod_index < 0
                      && ofp->rem_id == mp->rem_id) {
                        /* found the old slot */
                        if (DB_PTR_IS_BAD_FULL(mp->confirm_pos)
                            || mp->confirm_pos > db_csize) {
                              dcc_error_msg("bogus position "L_HPAT
                                        " for %s in %s",
                                        mp->confirm_pos,
                                        ofp->rem_hostname,
                                        flod_mmap_path);
                              mp->rem_hostname[0] = '\0';
                              continue;
                        }
                        ofp->cur_pos = mp->confirm_pos;
                        ofp->rewind_pos = db_csize;
                        ofp->mp = mp;
                        mp->oflod_index = ofp - oflods.infos;
                        copy_opts2mp(ofp);
                        break;
                  }
            }
      }


      /* use a free or obsolete slot in the mapped file for new entries */
      mp = flod_mmaps->mmaps;
      for (ofp = oflods.infos; ofp <= LAST(oflods.infos); ++ofp) {
            if (ofp->rem_hostname[0] == '\0')
                  break;
            if (ofp->mp != 0)
                  continue;

            /* find a free or no longer used entry */
            while (mp->oflod_index >= 0) {
                  if (++mp > LAST(flod_mmaps->mmaps)) {
                        bad_stop("too few oflod mmap slots");
                        goto out;
                  }
            }
            if (mp->rem_hostname[0] != '\0')
                  dcc_error_msg("forget flood to %s %d",
                              dcc_host_portname(hostname,
                                          sizeof(hostname),
                                          mp->rem_hostname,
                                          mp->rem_portname),
                              mp->rem_id);
            memset(mp, 0, sizeof(*mp));
            ofp->mp = mp;
            mp->rem_su.sa.sa_family = AF_UNSPEC;
            mp->rem_id = ofp->rem_id;
            copy_opts2mp(ofp);

            mp->cnts.cnts_cleared = db_time.tv_sec;

            ofp->cur_pos = mp->confirm_pos = DB_PTR_BASE;
            ofp->recv_pos = ofp->xmit_pos = DB_PTR_BASE;
            mp->oflod_index = ofp - oflods.infos;
            dcc_error_msg("initialize flood to %s %d%s",
                        dcc_host_portname(hostname, sizeof(hostname),
                                    mp->rem_hostname,
                                    mp->rem_portname),
                        mp->rem_id,
                        fnm_lno(fnm_buf, flod_path, ofp->lno));
      }
out:;

      /* clear the slots that contain forgotten hosts */
      for (mp = flod_mmaps->mmaps; mp <= LAST(flod_mmaps->mmaps); ++mp) {
            if (mp->oflod_index == -1) {
                  if (mp->rem_hostname[0] != '\0')
                        dcc_error_msg("forget flood to %s %d",
                                    dcc_host_portname(hostname,
                                          sizeof(hostname),
                                          mp->rem_hostname,
                                          mp->rem_portname),
                                    mp->rem_id);
                  memset(mp, 0, sizeof(*mp));
            }
      }

      flod_mmap_sync(0, 1);

      db_parms.flags &= ~DB_PARM_FG_CLEARED;
      db_flush_parms(0);

      get_oflods_max_pos();
      return 1;
}



/* put the flood counters in stable storage */
void
save_flod_cnts(OFLOD_INFO *ofp)
{
      FLOD_MMAP *mp;
      time_t delta;

      dccd_stats.iflod_total += ofp->cnts.total;
      dccd_stats.iflod_accepted += ofp->cnts.accepted;
      dccd_stats.iflod_stale += ofp->cnts.stale.val;
      dccd_stats.iflod_dup += ofp->cnts.dup.val;
      dccd_stats.iflod_ok2 += ofp->cnts.ok2.val;
      dccd_stats.iflod_not_deleted += ofp->cnts.not_deleted.val;

      mp = ofp->mp;
      if (mp) {
            if (ofp->xmit_pos == ofp->recv_pos)
                  ofp->mp->confirm_pos = ofp->cur_pos;

            mp->cnts.total += ofp->cnts.total;
            mp->cnts.accepted += ofp->cnts.accepted;
            mp->cnts.stale += ofp->cnts.stale.val;
            mp->cnts.dup += ofp->cnts.dup.val;
            mp->cnts.ok2 += ofp->cnts.ok2.val;
            mp->cnts.not_deleted += ofp->cnts.not_deleted.val;

            mp->cnts.out_reports += ofp->cnts.out_reports;

            delta = db_time.tv_sec - ofp->cnts.saved;
            if (delta < 0)
                  delta = 0;
            if (ofp->ifp) {
                  if (mp->flags & FLOD_MMAP_FG_IN_CONN) {
                        mp->cnts.in_total_conn += delta;
                  } else {
                        mp->flags |= FLOD_MMAP_FG_IN_CONN;
                        mp->cnts.in_conn_changed = db_time.tv_sec;
                  }
            } else {
                  if (mp->flags & FLOD_MMAP_FG_IN_CONN) {
                        mp->flags &= ~FLOD_MMAP_FG_IN_CONN;
                        mp->cnts.in_conn_changed = db_time.tv_sec;
                  }
            }

            if (ofp->flags & OFLOD_FG_CONNECTED) {
                  if (mp->flags & FLOD_MMAP_FG_OUT_CONN) {
                        mp->cnts.out_total_conn += delta;
                  } else {
                        mp->flags |= FLOD_MMAP_FG_OUT_CONN;
                        mp->cnts.out_conn_changed = db_time.tv_sec;
                  }
            } else {
                  if (mp->flags & FLOD_MMAP_FG_OUT_CONN) {
                        mp->flags &= ~FLOD_MMAP_FG_OUT_CONN;
                        mp->cnts.out_conn_changed = db_time.tv_sec;
                  }
            }
      }

      memset(&ofp->cnts, 0, sizeof(ofp->cnts));
      ofp->cnts.saved = db_time.tv_sec;
}



void
oflod_close(OFLOD_INFO *ofp, u_char fail,
          enum FLOD_ERR_OP err_op, int new_errno)
{
      const char *opstr;
      int perrno;

      if (ofp->rem_hostname[0] == '\0')
            return;

      if (ofp->s >= 0) {
            if (0 > close(ofp->s)
                && !fail)
                  dcc_error_msg("close(oflod %s %s): %s",
                              ofp->rem_hostname,
                              dcc_su2str_err(&ofp->rem_su),
                              ERROR_STR());
            if (!ofp->mp) {
                  TMSG2(FLOD, "close(oflod %s %s) (stranger)",
                        ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su));
            } else if (err_op == FLOD_ERR_SAME) {
                  opstr = flod_stats_str(&perrno,
                                     ofp->mp->o_err.old_errno,
                                     ofp->mp->o_err.op);
                  if (perrno)
                        TMSG4(FLOD, "close(oflod %s %s) same %s: %s",
                              ofp->rem_hostname,
                              dcc_su2str_err(&ofp->rem_su),
                              opstr, ERROR_STR1(perrno));
                  else
                        TMSG3(FLOD, "close(oflod %s %s) same %s",
                              ofp->rem_hostname,
                              dcc_su2str_err(&ofp->rem_su),
                              opstr);
            } else {
                  opstr = flod_stats_str(&perrno, new_errno, err_op);
                  if (perrno == 0) {
                        rpt_err(&ofp->mp->o_err, 0, err_op, new_errno,
                              "close(oflod %s %s): %s",
                              ofp->rem_hostname,
                              dcc_su2str_err(&ofp->rem_su),
                              opstr);
                  } else {
                        rpt_err(&ofp->mp->o_err, 0, err_op, new_errno,
                              "close(oflod %s %s): %s %s",
                              ofp->rem_hostname,
                              dcc_su2str_err(&ofp->rem_su),
                              opstr, ERROR_STR1(perrno));
                  }
            }
            ofp->s = -1;
            ofp->flags &= ~OFLOD_FG_CONNECTED;
            ofp->obuf_len = 0;
            ofp->cnts.out_reports = 0;

            save_flod_cnts(ofp);

            if (--oflods.active == 0
                && iflods.active == 0
                && flods_st != FLODS_ST_ON)
                  oflods_unmap();
      }

      if (fail) {
            ofp->out_try_again = db_time.tv_sec + ofp->out_try_secs;
            TMSG2(FLOD, "postpone restarting flood to %s for %d seconds",
                  ofp->rem_hostname, ofp->out_try_secs);
            if (ofp->out_try_secs < FLOD_RETRY_SECS)
                  ofp->out_try_secs = FLOD_RETRY_SECS;
      }
}



/* get ready to shut down */
static void
start_shutdown(OFLOD_INFO *ofp)
{
      if (ofp->flags & OFLOD_FG_SHUTDOWN_REQ)
            return;

      /* arrange to ask the peer to ask us to stop */
      ofp->flags  |= OFLOD_FG_SHUTDOWN_REQ;
      oflod_fill(ofp);

      if (stopint)
            ofp->keep_out_time = db_time.tv_sec +     SHUTDOWN_DELAY;
      else
            ofp->keep_out_time = db_time.tv_sec + KEEPALIVE_OUT_STOP;
}



/* Half-close the TCP connection.
 *    The other DCC server will notice and send our final position
 *    to acknowledge dealing with our reports. */
static void
oflod_shutdown(OFLOD_INFO *ofp)
{
      struct linger nowait;

      /* wait until the output buffer is empty */
      if (ofp->obuf_len != 0)
            return;

      /* do it only once */
      if ((ofp->flags & OFLOD_FG_SHUTDOWN))
            return;
      ofp->flags |= OFLOD_FG_SHUTDOWN;

      /* on Solaris and Linux you must set SO_LINGER before shutdown() */
      nowait.l_onoff = 1;
      nowait.l_linger = SHUTDOWN_DELAY;
      if (0 > setsockopt(ofp->s, SOL_SOCKET, SO_LINGER,
                     &nowait, sizeof(nowait)))
            rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_IO, 0,
                  "setsockopt(SO_LINGER flood to %s %s): %s",
                  ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su),
                  ERROR_STR());

      if (0 > shutdown(ofp->s, 1)) {
            rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_IO, 0,
                  "shutdown(flood to %s %s): %s",
                  ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su),
                  ERROR_STR());
            oflod_close(ofp, 0, FLOD_ERR_SAME, 0);
      }
}



/* see if a report should be put into the output buffer for a flood
 *  db_sts.rcd.d.r points to the current record.
 *  ofp->cur_pos has already been advanced */
static u_char                       /* 1=flood this report, 0=don't */
oflod_ck_put(void)
{
      const DB_RCD_CK *cur_rcd_ck;
      DCC_TGTS rcd_tgts, ck_tgts;
      int num_cks;
      DCC_CK_TYPES type;
      u_char obs_lvl, result;

      /* skip padding, whitelisted, compressed, trimmed
       * and deleted entries */
      if (DB_RCD_ID(db_sts.rcd.d.r) == DCC_ID_WHITE
          || DB_RCD_ID(db_sts.rcd.d.r) == DCC_ID_COMP
          || (rcd_tgts = DB_TGTS_RCD_RAW(db_sts.rcd.d.r)) == 0
          || DB_RCD_TRIMMED(db_sts.rcd.d.r))
            return 0;

      /* Skip reports that should not be flooded yet
       * The flooding thresholds are used to set the delay flag.
       * Small reports are marked with the delay flag when they are added
       * to the database.  If later it seems they should be flooded,
       * they are summarized in a new report that is flooded. */
      if (DB_RCD_DELAY(db_sts.rcd.d.r))
            return 0;

      result = 0;
      obs_lvl = 0;
      cur_rcd_ck = db_sts.rcd.d.r->cks;
      for (num_cks = DB_NUM_CKS(db_sts.rcd.d.r);
           num_cks != 0;
           ++cur_rcd_ck, --num_cks) {
            type = DB_CK_TYPE(cur_rcd_ck);

            /* ignore junk for deciding whether we can send this report. */
            if (DB_TEST_NOKEEP(db_parms.nokeep_cks, type))
                  continue;

            if (DB_CK_OBS(cur_rcd_ck)) {
                  /* an obsolete fuzzier checksum
                   * makes less fuzzy checksums obsolete */
                  if (obs_lvl < db_ck_fuzziness[type]) {
                        obs_lvl = db_ck_fuzziness[type];
                        result = 0;
                  }
                  continue;
            }

            /* send server-ID declarations */
            if (type == DCC_CK_SRVR_ID)
                  return 1;

            /* do not send what we think are stale reports */
            if (DCC_TS_OLDER_TS(db_sts.rcd.d.r->ts,
                            db_parms.ex_ts[type].all))
                  continue;

            /* do not send whitelisted reports */
            ck_tgts = DB_TGTS_CK(cur_rcd_ck);
            if (ck_tgts == DCC_TGTS_OK || ck_tgts == DCC_TGTS_OK2)
                  return 0;

            /* send non-obsolete results */
            if (obs_lvl <= db_ck_fuzziness[type]) {
                  obs_lvl = db_ck_fuzziness[type];
                  result = 1;
            }
      }
      return result;
}



static void
put_rcd_obuf(OFLOD_INFO *ofp, const DB_RCD *cur_rcd)
{
      DCC_FLOD *bp;
      DCC_TGTS tgts;
      DCC_SRVR_ID srvr, psrvr;
      const DB_RCD_CK *cur_rcd_ck;
      DCC_CK *buf_ck;
      DCC_FLOD_PATH_ID *new_path_idp, *new_path_id_limp, *rcd_path_id;
      int path_id_max;
      DCC_CK_TYPES type;
      ID_MAP_RESULT srvr_mapped;
      u_char reflecting;            /* 1=report is pointed at its source */
      u_char non_path, all_spam;
      int num_cks, i, j;

      /* decide whether to send this report */
      if (!oflod_ck_put())
            return;                 /* skip it */

      bp = (DCC_FLOD *)&ofp->obuf.c[ofp->obuf_len];
      db_ptr2flod_pos(bp->pos, ofp->cur_pos);
      tgts = DB_TGTS_RCD_RAW(cur_rcd);
      if (tgts == DCC_TGTS_DEL) {
            /* don't send delete requests to systems that don't want them */
            if (!(ofp->o_opts.flags & FLOD_OPT_DEL_OK))
                  return;
      } else if (ofp->o_opts.flags & FLOD_OPT_TRAPS) {
            tgts = DCC_TGTS_TOO_MANY;
      }

      srvr = DB_RCD_ID(cur_rcd);
      /* translate the source server-ID */
      srvr_mapped = id_map(srvr, &ofp->o_opts);
      switch (srvr_mapped) {
      case ID_MAP_NO:
            break;
      case ID_MAP_REJ:
            return;
      case ID_MAP_SELF:
            srvr = my_srvr_id;
            break;
      }
      /* this loses the DCC_SRVR_ID_AUTH bit */
      bp->srvr_id_auth[0] = srvr>>8;
      bp->srvr_id_auth[1] = srvr;

      reflecting = (srvr == ofp->rem_id);
      non_path = 0;

      memcpy(bp->ts, cur_rcd->ts, sizeof(bp->ts));

      cur_rcd_ck = cur_rcd->cks;

      /* Add a path if we are not the source of the report
       * or if it already has a path */
      buf_ck = bp->cks;
      if (srvr != my_srvr_id
          || DB_CK_TYPE(cur_rcd_ck) == DCC_CK_FLOD_PATH) {
            /* Add a checksum entry for a path consisting of only our
             * server-ID.  If the report contains a path, we will
             * concatenate to this entry */
            memset(buf_ck, 0, sizeof(*buf_ck));
            buf_ck->len = sizeof(*buf_ck);
            buf_ck->type = DCC_CK_FLOD_PATH;
            new_path_idp = (DCC_FLOD_PATH_ID *)buf_ck->sum;
            new_path_idp->hi = my_srvr_id>>8;
            new_path_idp->lo = my_srvr_id;
            new_path_id_limp = new_path_idp + DCC_NUM_FLOD_PATH;
            path_id_max = ofp->o_opts.path_len-1;
            ++new_path_idp;
            ++buf_ck;
            bp->num_cks = 1;
      } else {
            /* do not add a path */
            new_path_idp = new_path_id_limp = 0;
            path_id_max = 0;
            bp->num_cks = 0;
      }

      all_spam = 1;
      for (num_cks = DB_NUM_CKS(cur_rcd);
           num_cks != 0;
           ++cur_rcd_ck, --num_cks) {
            type = DB_CK_TYPE(cur_rcd_ck);
            if (type == DCC_CK_FLOD_PATH) {
                  rcd_path_id = (DCC_FLOD_PATH_ID *)&cur_rcd_ck->sum;
                  for (j = 0; j < DCC_NUM_FLOD_PATH; ++j, ++rcd_path_id) {
                        psrvr = ((rcd_path_id->hi<<8)
                               | rcd_path_id->lo);
                        /* stop copying the path at its end */
                        if (psrvr == DCC_ID_INVALID)
                              break;
                        /* don't send report if its path is too long */
                        if (--path_id_max < 0)
                              return;
                        /* add another "checksum" to continue path */
                        if (new_path_idp >= new_path_id_limp) {
                              memset(buf_ck, 0, sizeof(*buf_ck));
                              buf_ck->len = sizeof(*buf_ck);
                              buf_ck->type = DCC_CK_FLOD_PATH;
                              new_path_idp = (DCC_FLOD_PATH_ID *
                                          )buf_ck->sum;
                              new_path_id_limp = (new_path_idp
                                          + DCC_NUM_FLOD_PATH);
                              ++buf_ck;
                              ++bp->num_cks;
                        }
                        /* Do not send reports from the target back
                         * to the target unless the report is a
                         * Server-ID declaration */
                        if (psrvr == ofp->rem_id)
                              reflecting = 1;
                        switch (id_map(psrvr, &ofp->o_opts)) {
                        case ID_MAP_NO:
                              break;
                        case ID_MAP_REJ:
                              return;
                        case ID_MAP_SELF:
                              psrvr = my_srvr_id;
                              break;
                        }
                        new_path_idp->hi = psrvr>>8;
                        new_path_idp->lo = psrvr;
                        ++new_path_idp;
                  }

            } else {
                  /* Do not send translated server-ID declarations
                   * or checksums in our own or in translated server-ID
                   * reports that we wouldn't have kept if we had
                   * received the original reports */
                  if (srvr_mapped == ID_MAP_SELF) {
                        if (type == DCC_CK_SRVR_ID)
                              return;
                        if (DB_TEST_NOKEEP(db_parms.nokeep_cks, type))
                              continue;
                  }
                  /* Do not send reports from the target to the target
                   * unless the report is a Server-ID declaration */
                  if (reflecting && type != DCC_CK_SRVR_ID)
                        return;
                  /* send everything else */
                  buf_ck->type = type;
                  buf_ck->len = sizeof(*buf_ck);
                  memcpy(buf_ck->sum, cur_rcd_ck->sum,
                         sizeof(buf_ck->sum));
                  ++buf_ck;
                  ++bp->num_cks;

                  non_path = 1;
                  if (all_spam
                      && DB_TGTS_CK(cur_rcd_ck) != DCC_TGTS_TOO_MANY)
                        all_spam = 0;
            }
      }

      /* quit if we found nothing but the path to send */
      if (!non_path)
            return;

      if (all_spam && srvr == my_srvr_id)
            tgts = DCC_TGTS_TOO_MANY;
      tgts = htonl(tgts);
      memcpy(&bp->tgts, &tgts, sizeof(bp->tgts));

      i = (char *)buf_ck - (char *)bp;
      ofp->obuf_len += i;
      ++ofp->obuf_reports;

      ofp->xmit_pos = ofp->cur_pos;
}



/* send reports from the database to a peer DCC server
 *    This routine only fills the buffer.  The buffer is eventually
 *    written by oflod_write(). */
static void
oflod_fill(OFLOD_INFO *ofp)
{
      u_int cur_rcd_len;
      int work;

      ofp->flags &= ~(OFLOD_FG_TOO_BUSY | OFLOD_FG_NEW);

      /* stop when things are not ready or shutting down */
      if (!(ofp->flags & OFLOD_FG_CONNECTED)
          || (ofp->flags & OFLOD_FG_SHUTDOWN))
            return;

      /* stop when we are about to clean the database for a deletion so
       * that we will not be shut down cleaning along with our neighbors */
      if (need_del_dbclean)
            return;

      /* after having started sending the buffer,
       * wait for it to drain before adding more */
      if (ofp->obuf_off != 0)
            return;

      work = 0;
      while (ofp->obuf_len < sizeof(ofp->obuf) - sizeof(DCC_FLOD)) {
            if (ofp->cur_pos >= db_csize) {
                  /* nothing to send
                   * shut down if needed */
                  if (ofp->xmit_pos == ofp->recv_pos)
                        ofp->mp->confirm_pos = ofp->cur_pos;
                  if (ofp->mp->confirm_pos >= ofp->rewind_pos)
                        ofp->mp->flags &= ~FLOD_MMAP_FG_REWINDING;
                  if (ofp->flags & OFLOD_FG_SHUTDOWN_REQ)
                        oflod_shutdown(ofp);
                  break;
            }

            /* don't try to look at reports crossing page bounardies */
            if (ofp->cur_pos%db_page_size >= db_page_max) {
                  ofp->cur_pos += DB_RCD_HDR_LEN;
                  ++work;
                  continue;
            }

            if (!db_map_rcd(dcc_emsg, &db_sts.rcd, ofp->cur_pos,
                        &cur_rcd_len)) {
                  dcc_error_msg("oflod_fill() starting at "L_HPAT
                              " for %s: %s",
                              ofp->cur_pos, ofp->rem_hostname,
                              dcc_emsg);
                  ofp->cur_pos = db_csize;
                  break;
            }

            if (DB_NUM_CKS(db_sts.rcd.d.r) > DCC_DIM_CKS) {
                  dcc_error_msg("impossible %d checksums in "L_HPAT,
                              DB_NUM_CKS(db_sts.rcd.d.r),
                              ofp->cur_pos);
                  ofp->cur_pos = db_csize;
                  break;
            }

            /* start a new entry unless we are shutting down */
            if (ofp->flags & OFLOD_FG_SHUTDOWN_REQ) {
                  oflod_shutdown(ofp);
                  break;
            }

            /* stop if we've spent enough time here so that
             * incoming requests aren't lost */
            if (++work >= 10) {
                  if (too_busy) {
                        ofp->flags |= OFLOD_FG_TOO_BUSY;
                        break;
                  }
                  gettimeofday(&db_time, 0);
                  if (tv_diff2us(&db_time, &wake_time) > DCC_USECS) {
                        too_busy = 1;
                        ofp->flags |= OFLOD_FG_TOO_BUSY;
                        break;
                  }
                  work = 0;
            }

            ofp->cur_pos += cur_rcd_len;

            /* send the record */
            put_rcd_obuf(ofp, db_sts.rcd.d.r);
      }

      if (oflods_max_pos < ofp->cur_pos)
            oflods_max_pos = ofp->cur_pos;
}



/* figure out what version to tell the peer */
const char *
version_str(const OFLOD_INFO *ofp)
{
      if (ofp->oversion == 0)
            return DCC_FLOD_VERSION_CUR_STR;
#ifdef DCC_FLOD_VERSION7
      if (ofp->oversion == DCC_FLOD_VERSION7)
            return DCC_FLOD_VERSION_CUR_STR;
#endif
      dcc_logbad(EX_SOFTWARE, "unknown ofp->oversion=%d",
               ofp->oversion);
}



static void
oflod_retry_backoff(OFLOD_INFO *ofp)
{
      ofp->out_try_secs *= 2;
      if (ofp->out_try_secs > FLOD_SLOW_RETRY_SECS)
            ofp->out_try_secs = FLOD_SLOW_RETRY_SECS;
      else if (ofp->out_try_secs < FLOD_RETRY_SECS)
            ofp->out_try_secs = FLOD_RETRY_SECS;
}



/* finish connecting output flood by sending our version number and signature */
u_char
oflod_connect_fin(OFLOD_INFO *ofp)
{
      DCC_FNM_LNO_BUF fnm_buf;
      ID_TBL *tp;

      tp = find_id_tbl(ofp->out_passwd_id);
      if (!tp || tp->cur_passwd[0] == '\0') {
            if (ofp->mp != 0)
                  rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_SIGN, 0,
                        "oflod "DCC_FLOD_PASSWD_ID_MSG" %d%s",
                        ofp->out_passwd_id,
                        fnm_lno(fnm_buf, flod_path, ofp->lno));
            return 0;
      }

      ofp->keep_out_time = db_time.tv_sec + KEEPALIVE_OUT;
      ofp->mp->o_err.ok = db_time.tv_sec + LAST_ERROR_OK_SECS;
      ofp->flags |= OFLOD_FG_CONNECTED;
      save_flod_cnts(ofp);

      ofp->flags &= ~OFLOD_FG_SHUTDOWN;
      ofp->flags &= ~OFLOD_FG_SHUTDOWN_REQ;

      ofp->recv_pos = ofp->xmit_pos = ofp->cur_pos = ofp->mp->confirm_pos;
      get_oflods_max_pos();

      ofp->ibuf_len = 0;
      ofp->obuf_off = 0;

      /* convince the peer we're sane */
      ofp->obuf_len = sizeof(ofp->obuf.vers);
      memset(&ofp->obuf.vers, 0, sizeof(ofp->obuf.vers));

      strcpy(ofp->obuf.vers.body.str, version_str(ofp));
      ofp->obuf.vers.body.sender_srvr_id = htons(my_srvr_id);

      if (tp->next_passwd[0] != '\0') {
            ofp->flags |= OFLOD_FG_PASSWD_NEXT;
      } else {
            ofp->flags &= ~OFLOD_FG_PASSWD_NEXT;
            ofp->mp->flags &= ~FLOD_MMAP_FG_PASSWD_NEXT;
      }
      if (ofp->ids_mtime != ids_mtime) {
            ofp->ids_mtime = ids_mtime;
            ofp->mp->flags &= ~FLOD_MMAP_FG_PASSWD_NEXT;
      }
      if (ofp->mp->flags & FLOD_MMAP_FG_PASSWD_NEXT)
            dcc_sign(tp->next_passwd, sizeof(tp->next_passwd),
                   &ofp->obuf.vers, ofp->obuf_len);
      else
            dcc_sign(tp->cur_passwd, sizeof(tp->cur_passwd),
                   &ofp->obuf.vers, ofp->obuf_len);
      TMSG2(FLOD, "start flood to %s %s",
            ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su));

      ofp->flags |= OFLOD_FG_NEW;   /* pause to let peer reject us */

      return 1;
}



/* start to connect an out-going flood */
static int                    /* -1=failure, 0=not yet, 1=done */
oflod_connect_start(OFLOD_INFO *ofp,
                enum FLOD_ERR_OP op,
                const char *msg)
{
      DCC_FNM_LNO_BUF fnm_buf;
      int i;

      if (ofp->o_opts.flags & FLOD_OPT_SOCKS)
            i = Rconnect(ofp->s, &ofp->rem_su.sa, DCC_SU_LEN(&ofp->rem_su));
      else
            i = connect(ofp->s, &ofp->rem_su.sa, DCC_SU_LEN(&ofp->rem_su));
      if (0 > i && errno != EISCONN) {
            if (errno == EAGAIN
                || errno == EINPROGRESS
                || errno == EALREADY)
                  return 0;

            /* several UNIX-like systems return EINVAL for the second
             * connect() after a Unreachable ICMP message or timeout */
            if (errno == EINVAL)
                  errno = ECONNREFUSED;
            rpt_err(&ofp->mp->o_err, 0, op, errno,
                  "%s(oflod %s %s%s): %s",
                  msg, ofp->rem_hostname,
                  dcc_su2str_err(&ofp->rem_su),
                  fnm_lno(fnm_buf, flod_path, ofp->lno),
                  ERROR_STR());
            oflod_close(ofp, 1, FLOD_ERR_SAME, 0);
            return -1;
      }

      if (!oflod_connect_fin(ofp)) {
            oflod_close(ofp, 1, FLOD_ERR_SAME, 0);
            return -1;
      }

      return 1;
}



void
oflod_open(OFLOD_INFO *ofp)
{
      DCC_FNM_LNO_BUF fnm_buf;
      DCC_SOCKU loc_su, su2;
      const DCC_SOCKU *sup;
      const SRVR_SOC *sp;
      int error;

      if (ofp->s >= 0
          || ofp->rem_hostname[0] == '\0'
          || (ofp->o_opts.flags & FLOD_OPT_OFF)
          || flods_st != FLODS_ST_ON
          || (ofp->o_opts.flags & FLOD_OPT_PASSIVE))
            return;

      if (!DB_IS_TIME(ofp->out_try_again, ofp->out_try_secs))
            return;

      if (!flod_names_resolve_start())
            return;                 /* wait for name resolution */

      if (ofp->mp->rem_su.sa.sa_family == AF_UNSPEC) {
            if (ofp->mp->host_error != 0)
                  rpt_err(&ofp->mp->o_err, 0,
                        FLOD_ERR_GET_HOST, ofp->mp->host_error,
                        "flood peer name %s: %s%s",
                        ofp->rem_hostname,
                        DCC_HSTRERROR(ofp->mp->host_error),
                        fnm_lno(fnm_buf, flod_path, ofp->lno));
            oflod_close(ofp, 1, FLOD_ERR_SAME, 0);
            return;
      }
      ofp->rem_su = ofp->mp->rem_su;

      ofp->s = socket(ofp->rem_su.sa.sa_family, SOCK_STREAM, 0);
      if (ofp->s < 0) {
            rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_IO, errno,
                  "oflod%s socket(): %s",
                  fnm_lno(fnm_buf, flod_path, ofp->lno),
                  ERROR_STR());
            oflod_close(ofp, 1, FLOD_ERR_SAME, 0);
            return;
      }
      ++oflods.active;

      if (!set_flod_socket(ofp->s, ofp->rem_hostname, &ofp->rem_su, 0)) {
            oflod_close(ofp, 1, FLOD_ERR_SAME, 0);
            return;
      }

      memset(&loc_su, 0, sizeof(loc_su));
      if (ofp->loc_hostname[0] != '\0'
               || ofp->loc_port != 0) {
            /* Resolve the local host name.
             * This should not take significant time because
             * the hostnames should be locally known.  That
             * means we don't need to use a separate thread. */
            if (ofp->loc_hostname[0] != '\0') {
                  dcc_host_lock();
                  if (!dcc_get_host(ofp->loc_hostname,
                                ofp->rem_su.sa.sa_family == AF_INET
                                ? 0 : 1,
                                &error)) {
                        dcc_error_msg("flood local name %s: %s%s",
                                    ofp->loc_hostname,
                                    DCC_HSTRERROR(error),
                                    fnm_lno(fnm_buf, flod_path,
                                          ofp->lno));
                  } else {
                        /* match local address family to remote */
                        sup = dcc_hostaddrs;
                        for (;;) {
                              if (sup->sa.sa_family
                                  == ofp->rem_su.sa.sa_family) {
                                  loc_su = *sup;
                                  break;
                              }
                              if (++sup >= dcc_hostaddrs_end) {
                                  dcc_error_msg("family matching %s"
                                          " %s not available for"
                                          " %s",
                                          ofp->rem_hostname,
                                          dcc_su2str_err(&ofp
                                              ->rem_su),
                                          ofp->loc_hostname);
                                  ofp->loc_hostname[0] = '\0';
                                  break;
                              }
                        }
                  }
                  dcc_host_unlock();
            }
      }

      /* if there is a single "-a address" other than localhost
       * and of the right family, then default to it */
      if (!(ofp->o_opts.flags & FLOD_OPT_SOCKS)
          && loc_su.sa.sa_family == AF_UNSPEC) {
            for (sp = srvr_socs; sp; sp = sp->fwd) {
                  if (dcc_ipv6sutoipv4(&su2, &sp->su)
                      && su2.ipv4.sin_addr.s_addr == ntohl(0x7f000001))
                        continue;
                  if (sp->su.sa.sa_family != ofp->rem_su.sa.sa_family)
                        continue;
                  if (loc_su.sa.sa_family != AF_UNSPEC) {
                        memset(&loc_su, 0, sizeof(loc_su));
                        break;
                  }
                  loc_su = sp->su;
            }
      }

      if (loc_su.sa.sa_family != AF_UNSPEC
          || ofp->loc_port != 0) {
            loc_su.sa.sa_family = ofp->rem_su.sa.sa_family;
            *DCC_SU_PORT(&loc_su) = ofp->loc_port;
            if (0 > bind(ofp->s, &loc_su.sa, DCC_SU_LEN(&loc_su)))
                  dcc_error_msg("bind(oflod %s%s): %s",
                              dcc_su2str_err(&loc_su),
                              fnm_lno(fnm_buf, flod_path, ofp->lno),
                              ERROR_STR());
      }

      oflod_connect_start(ofp, FLOD_ERR_CONNECT, "connect");
}



void
oflod_write(OFLOD_INFO *ofp)
{
      int i;

      if (ofp->obuf_len == 0) {
            if (!(ofp->flags & OFLOD_FG_CONNECTED)
                && 0 >= oflod_connect_start(ofp, FLOD_ERR_CONNECT2,
                                    "connect2"))
                  return;
            oflod_fill(ofp);
            if (ofp->obuf_len == 0)
                  return;
      }

      if (ofp->o_opts.flags & FLOD_OPT_SOCKS)
            i = Rsend(ofp->s, &ofp->obuf.c[ofp->obuf_off],
                    ofp->obuf_len - ofp->obuf_off, 0);
      else
            i = send(ofp->s, &ofp->obuf.c[ofp->obuf_off],
                   ofp->obuf_len - ofp->obuf_off, 0);
      if (i > 0) {
            ofp->obuf_off += i;
            if (ofp->obuf_off >= ofp->obuf_len) {
                  /* that emptied the buffer */
                  ofp->obuf_len = 0;
                  ofp->obuf_off = 0;
                  ofp->cnts.out_reports += ofp->obuf_reports;
                  /* we might want to shut down, so fill buffer again */
                  oflod_fill(ofp);
            }
            ofp->flags &= ~OFLOD_FG_EAGAIN;
            return;
      }

      /* we had an error or EOF */
      if (i < 0) {
            /* we come here only when select() has said that we can send().
             * However, it seems that Solaris nevertheless sometimes
             * says EAGAIN */
            if (DCC_BLOCK_ERROR()) {
                  ofp->flags |= OFLOD_FG_EAGAIN;
                  TMSG3(FLOD, "pause after send(oflod to %s %s): %s",
                        ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su),
                        ERROR_STR());
                  return;
            }

            rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_IO, errno,
                  "send(oflod to %s %s): %s",
                  ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su),
                  ERROR_STR());
      } else {
            rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_IO, 0,
                  "premature end of oflod to %s %s",
                  ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su));
      }
      oflod_read(ofp);        /* get any last error message */
      oflod_close(ofp, 1, FLOD_ERR_SAME, 0);
}



static void
oflod_read_emsg(OFLOD_INFO *ofp)
{
      enum FLOD_ERR_OP op;
      u_char fail = 1;

      if (!strncmp(ofp->ibuf.end.msg, DCC_FLOD_BAD_VER_MSG_BASE,
                 STRZ(DCC_FLOD_BAD_VER_MSG_BASE))) {
            op = FLOD_ERR_BAD_VERS;
            if (ofp->oversion != ofp->iversion) {
                  /* notice if this peer demands a version
                   * other than what we have been trying */
                  ofp->oversion = ofp->iversion;
                  fail = 0;

            } else {
                  /* otherwise don't try for a while */
                  oflod_retry_backoff(ofp);
            }

      } else if (!memcmp(ofp->ibuf.end.msg, DCC_FLOD_BAD_ID_MSG,
                     STRZ(DCC_FLOD_BAD_ID_MSG))) {
            op = FLOD_ERR_ID;
            oflod_retry_backoff(ofp);

      } else if (!memcmp(ofp->ibuf.end.msg, DCC_FLOD_PASSWD_ID_MSG,
                     STRZ(DCC_FLOD_PASSWD_ID_MSG))
               || !memcmp(ofp->ibuf.end.msg, DCC_FLOD_NO_PASSWD_MSG,
                        STRZ(DCC_FLOD_NO_PASSWD_MSG))) {
            op = FLOD_ERR_SIGN;
            oflod_retry_backoff(ofp);

      } else if (!memcmp(ofp->ibuf.end.msg, DCC_FLOD_BAD_AUTH_MSG,
                     STRZ(DCC_FLOD_BAD_AUTH_MSG))) {
            op = FLOD_ERR_SIGN;
            /* try the second password if possible
             * after the peer rejects the first */
            if ((ofp->flags & OFLOD_FG_PASSWD_NEXT)
                && !(ofp->mp->flags & FLOD_MMAP_FG_PASSWD_NEXT)) {
                  ofp->mp->flags |= FLOD_MMAP_FG_PASSWD_NEXT;
                  ofp->out_try_again = 0;
            } else {
                  oflod_retry_backoff(ofp);
            }

      } else {
            ofp->out_try_secs = FLOD_RETRY_SECS/2;
            op = ((flods_st == FLODS_ST_OFF
                   || (ofp->o_opts.flags & FLOD_OPT_OFF))
                  ? FLOD_ERR_LOCAL_OFF
                  : FLOD_ERR_REMOTE_OFF);
      }

      rpt_err(&ofp->mp->o_err, 0, op, 0,
            "oflod end status from %s %s: %s",
            ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su),
            ofp->ibuf.end.msg);
      oflod_close(ofp, fail, FLOD_ERR_SAME, 0);
}



/* see what the target has to say about the reports we have been sending */
void
oflod_read(OFLOD_INFO *ofp)
{
      int used, i;
      DB_PTR pos;

again:;
      if (ofp->o_opts.flags & FLOD_OPT_SOCKS)
            i = Rrecv(ofp->s, &ofp->ibuf.b[ofp->ibuf_len],
                    sizeof(ofp->ibuf) - ofp->ibuf_len, 0);
      else
            i = recv(ofp->s, &ofp->ibuf.b[ofp->ibuf_len],
                   sizeof(ofp->ibuf) - ofp->ibuf_len, 0);
      if (i <= 0) {
            if (i < 0 && errno != ECONNRESET) {
                  if (!DCC_BLOCK_ERROR()) {
                        rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_IO, errno,
                              "recv(oflod %s %s): %s",
                              ofp->rem_hostname,
                              dcc_su2str_err(&ofp->rem_su),
                              ERROR_STR());
                        oflod_close(ofp, 1, FLOD_ERR_SAME, 0);
                  }
                  return;
            }

            /* before closing, the peer is supposed to send a
             * "position" of DCC_FLOD_POS_END followed by
             * an ASCII message */
            if (ofp->ibuf_len <= sizeof(DCC_FLOD_POS)) {
                  if (ofp->obuf_len != 0)
                        rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_BAD_DATA,0,
                              "truncated oflod end status from %s %s",
                              ofp->rem_hostname,
                              dcc_su2str_err(&ofp->rem_su));
                  oflod_close(ofp, 1, FLOD_ERR_SAME, 0);
                  return;
            }

            pos = flod_pos2db_ptr(ofp->ibuf.end.z);
            if (pos != DCC_FLOD_POS_END) {
                  dcc_error_msg("oflod end status from %s %s flag="L_HPAT,
                              ofp->rem_hostname,
                              dcc_su2str_err(&ofp->rem_su),
                              pos);
            }
            ofp->ibuf.b[ofp->ibuf_len] = '\0';
            if (!strncmp(ofp->ibuf.end.msg, DCC_FLOD_OK_STR,
                       sizeof(DCC_FLOD_OK_STR)-1)) {
                  TMSG3(FLOD, "oflod end status from %s %s: %s",
                        ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su),
                        ofp->ibuf.end.msg+sizeof(DCC_FLOD_OK_STR)-1);
                  oflod_close(ofp, 0,
                            (flods_st == FLODS_ST_OFF
                             || (ofp->o_opts.flags & FLOD_OPT_OFF))
                            ? FLOD_ERR_LOCAL_OFF
                            : FLOD_ERR_REMOTE_OFF,
                            0);
            } else {
                  oflod_read_emsg(ofp);
            }
            return;
      }
      ofp->ibuf_len += i;
      if (ofp->mp != 0)
            ofp->mp->o_err.ok = db_time.tv_sec + LAST_ERROR_OK_SECS;
      if (!(ofp->flags & OFLOD_FG_SHUTDOWN_REQ))
            ofp->keep_out_time = db_time.tv_sec + KEEPALIVE_OUT;

      used = 0;
      while (ofp->ibuf_len >= sizeof(ofp->ibuf.pos)) {
            if (used > 0) {
                  ofp->ibuf_len -= used;
                  if (ofp->ibuf_len == 0)
                        return;

                  memmove(ofp->ibuf.b, &ofp->ibuf.b[used],
                        ofp->ibuf_len);
                  used = 0;
            }

            pos = flod_pos2db_ptr(ofp->ibuf.end.z);
            if (pos < DCC_FLOD_POS_MIN) switch ((DCC_FLOD_POS_OPS)pos) {
            case DCC_FLOD_POS_END:
                  /* Wait for all of final status message
                   * until the target closes the TCP connection.
                   * Do not worry if the target stops without
                   * asking nicely, since at worst we will
                   * resend whatever was in the pipe next time. */
                  if (ofp->ibuf_len >= sizeof(ofp->ibuf.end)) {
                        rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_BAD_DATA,0,
                              "babbling flod receiver %s %s",
                              ofp->rem_hostname,
                              dcc_su2str_err(&ofp->rem_su));
                        oflod_close(ofp, 1, FLOD_ERR_SAME, 0);
                        return;
                  }
                  goto again; /* wait for the status */

            case DCC_FLOD_POS_END_REQ:
                  /* since the peer asked us to stop,
                   * do not start again for a while */
                  ofp->out_try_secs = FLOD_RETRY_SECS;
                  ofp->out_try_again = (db_time.tv_sec
                                    + ofp->out_try_secs);
                  TMSG2(FLOD, "postpone restarting flood to "
                        "%s for %d seconds",
                        ofp->rem_hostname, ofp->out_try_secs);
                  start_shutdown(ofp);
                  used = sizeof(ofp->ibuf.pos);
                  continue;

            case DCC_FLOD_POS_NOTE:
                  /* wait until we get the length of the complaint */
                  if (ofp->ibuf_len < FLOD_NOTE_OVHD)
                        goto again;
                  used = ofp->ibuf.note.len;
                  if (used > ISZ(ofp->ibuf.note)
                      || used <= FLOD_NOTE_OVHD) {
                        rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_BAD_DATA,0,
                              "bogus oflod note length %d from %s %s",
                              used, ofp->rem_hostname,
                              dcc_su2str_err(&ofp->rem_su));
                        oflod_close(ofp, 1, FLOD_ERR_SAME, 0);
                        return;
                  }
                  if ((int)ofp->ibuf_len < used)
                        goto again;
                  TMSG4(FLOD, "oflod note from %s %s: %.*s",
                        ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su),
                        used-FLOD_NOTE_OVHD, ofp->ibuf.note.str);
                  continue;

            case DCC_FLOD_POS_COMPLAINT:
                  /* wait until we get the length of the complaint */
                  if (ofp->ibuf_len < FLOD_NOTE_OVHD)
                        goto again;
                  used = ofp->ibuf.note.len;
                  if (used > ISZ(ofp->ibuf.note)
                      || used <= FLOD_NOTE_OVHD) {
                        rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_BAD_DATA,0,
                              "bogus oflod complaint length %d"
                              " from %s %s",
                              used, ofp->rem_hostname,
                              dcc_su2str_err(&ofp->rem_su));
                        oflod_close(ofp, 1, FLOD_ERR_SAME, 0);
                        return;
                  }
                  if ((int)ofp->ibuf_len < used)
                        goto again;
                  i = FLOD_CNTERR(&ofp->cnts.complaint);
                  if (i <= 0)
                        dcc_error_msg("oflod complaint from %s %s:"
                                    " %.*s%s",
                                    ofp->rem_hostname,
                                    dcc_su2str_err(&ofp->rem_su),
                                    used-FLOD_NOTE_OVHD,
                                    ofp->ibuf.note.str,
                                    (i<0 ? "" : "; stop complaints"));
                  continue;

            case DCC_FLOD_POS_REWIND:
                  used = sizeof(ofp->ibuf.pos);
                  dcc_trace_msg("oflod rewind from %s %s",
                              ofp->rem_hostname,
                              dcc_su2str_err(&ofp->rem_su));
                  ofp->mp->flags |= FLOD_MMAP_FG_REWINDING;
                  ofp->cur_pos = ofp->mp->confirm_pos = DB_PTR_BASE;
                  ofp->recv_pos = ofp->xmit_pos = DB_PTR_BASE;
                  ofp->rewind_pos = db_csize;
                  get_oflods_max_pos();
                  oflod_fill(ofp);
                  continue;

            case DCC_FLOD_POS_FFWD_IN:
                  used = sizeof(ofp->ibuf.pos);
                  dcc_trace_msg("FFWD its input from %s %s",
                              ofp->rem_hostname,
                              dcc_su2str_err(&ofp->rem_su));
                  ofp->cur_pos = db_csize;
                  get_oflods_max_pos();
                  continue;
            }


            /* The position from the peer must be one we sent,
             * and in the window we expect unless our
             * window has been broken by rewinding.
             * Even if our window is broken, the position must
             * be reasonable. */
            if ((pos < ofp->recv_pos
                 || pos > ofp->xmit_pos)
                && (!(ofp->mp->flags & FLOD_MMAP_FG_REWINDING)
                  || pos < DCC_FLOD_POS_MIN
                  || pos > db_csize)) {
                  rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_BAD_DATA,0,
                        "bogus confirmed flood position"
                        " "L_HPAT" from %s;"
                        " recv_pos="L_HPAT"  xmit_pos="L_HPAT,
                        pos, ofp->rem_hostname,
                        ofp->recv_pos, ofp->xmit_pos);
                  oflod_close(ofp, 1, FLOD_ERR_SAME, 0);
                  return;
            }
            ofp->recv_pos = pos;
            if (ofp->xmit_pos == ofp->recv_pos)
                  ofp->mp->confirm_pos = ofp->cur_pos;
            else if (ofp->mp->confirm_pos < ofp->recv_pos)
                  ofp->mp->confirm_pos = ofp->recv_pos;
            used = sizeof(ofp->ibuf.pos);

            /* things are going ok, so reset restart backoff */
            ofp->out_try_secs = 0;
      }
}



static void
oflods_ck(void)
{
      OFLOD_INFO *ofp;

      for (ofp = oflods.infos; ofp <= LAST(oflods.infos); ++ofp) {
            if (ofp->rem_hostname[0] == '\0')
                  break;

            if (ofp->flags & OFLOD_FG_EAGAIN) {
                  TMSG1(FLOD, "resume flooding %s after EAGAIN",
                        ofp->rem_hostname);
                  ofp->flags &= ~OFLOD_FG_EAGAIN;
            }

            /* shut down any streams that have been quiet for too long */
            if ((ofp->flags & OFLOD_FG_CONNECTED)
                && DB_IS_TIME(ofp->keep_out_time, KEEPALIVE_OUT))
                  oflod_close(ofp, 0, FLOD_ERR_KEEPALIVE, 0);
      }
}



void
oflods_stop(u_char force)
{
      OFLOD_INFO *ofp;

      flods_st = FLODS_ST_OFF;

      flod_mmap_sync(0, 0);

      for (ofp = oflods.infos; ofp <= LAST(oflods.infos); ++ofp) {
            if (ofp->rem_hostname[0] == '\0')
                  break;
            if (ofp->s < 0 || !(ofp->flags & OFLOD_FG_CONNECTED)) {
                  oflod_close(ofp, 0, FLOD_ERR_LOCAL_OFF, 0);
            } else if (force) {
                  oflod_close(ofp, 1, FLOD_ERR_LOCAL_OFF, 0);
            } else {
                  start_shutdown(ofp);
            }
      }

      if (oflods.active == 0
          && iflods.active == 0)
            oflods_unmap();
}



void PATTRIB(3,4)
db_broken(int linenum, const char *file, const char *p, ...)
{
      va_list args;

      if (!db_failed_line) {
            db_failed_line = linenum;
            db_failed_file = file;
      }
      va_start(args, p);
      dcc_verror_msg(p, args);
      va_end(args);
}



/* (re)start listening for incoming floods and sending outgoing floods */
void
flods_restart(const char *msg)
{
      /* unlocked database keeps flooding off */
      if (flods_off < 0)
            flods_off  = 0;
      if (flods_off > 0 || !DB_IS_LOCKED())
            return;

      if (msg)
            iflods_stop(msg, 0);

      flods_st = FLODS_ST_RESTART;

      flods_ck(0);
}



/* called periodically and at need */
void
flods_ck(u_char force)
{
      static int map_delayed;
      IFLOD_INFO *ifp;
      OFLOD_INFO *ofp;
      struct stat flod_sb;
      struct timeval;
      DCC_TS past, present;
      u_int rcd_len;
      int work;
      u_char mapped;
      int i, j;

      if (force)              /* force hostname resolution */
            got_hosts = 0;

      for (ifp = iflods.infos; ifp <= LAST(iflods.infos); ++ifp) {
            if (ifp->s < 0)
                  continue;

            /* end connections that failed to be completed */
            ofp = ifp->ofp;
            if (!ofp) {
                  if (DB_IS_TIME(ifp->quit_connect, IFLOD_CONNECT_SECS))
                        iflod_close(ifp, 1, FLOD_ERR_SIGN, 0,
                                  "%s failed to authenticate itself",
                                  ifp->hostname);
                  continue;
            }

            /* allow more complaints */
            if (DB_IS_TIME(ofp->limit_reset, FLOD_LIM_CLEAR_SECS)
                || force) {
                  complained_many_iflods = 0;
                  ofp->cnts.stale.limit = ofp->cnts.stale.val;
                  ofp->cnts.dup.limit = ofp->cnts.dup.val;
                  ofp->cnts.ok2.limit = ofp->cnts.ok2.val;
                  ofp->cnts.not_deleted.limit = ofp->cnts.not_deleted.val;
                  ofp->cnts.bad_id.limit = ofp->cnts.bad_id.val;
                  ofp->cnts.complaint.limit = ofp->cnts.complaint.val;
                  ofp->cnts.iflod_bad.limit = ofp->cnts.iflod_bad.val;
                  ofp->limit_reset = db_time.tv_sec+FLOD_LIM_CLEAR_SECS;
            }

            if (!(ifp->flags & IFLOD_FG_VERS_CK))
                  continue;
            save_flod_cnts(ofp);
            if (!DB_IS_TIME(ifp->ofp->keep_in_time, KEEPALIVE_IN)) {
                  /* Tell the peer how much we've processed. */
                  iflod_send_pos(ifp, 0);
            } else {
                  /* If we have not heard from the peer for a long time,
                   * then repeat our position or close the connection
                   * as a "keepalive".
                   * If we have asked the peer to stop and it has not
                   * done so, then break the link. */
                  if (ifp->flags & IFLOD_FG_END_REQ) {
                        iflod_close(ifp, 1, FLOD_ERR_KEEPALIVE, 0,
                                  "%s ignored close request",
                                  ifp->hostname);
                  } else if (!iflod_send_pos(ifp, 1)) {
                        iflod_stop(ifp, "keepalive");
                  }
            }
      }

      if (!flods_off && DB_IS_LOCKED()) {
            /* stop and restart the pumps if the list of peers has
             * changed or if our map has disappeared */
            if (0 > stat(flod_path, &flod_sb)) {
                  if (errno != ENOENT
                      && flod_mtime != 0)
                        dcc_error_msg("stat(%s): %s",
                                    flod_path, ERROR_STR());
                  flod_sb.st_mtime = 0;
            }
            if (flod_mtime != 0
                && 0 > access(flod_mmap_path, W_OK | R_OK)) {
                  if (errno != ENOENT)
                        dcc_error_msg("access(%s): %s",
                                    flod_mmap_path, ERROR_STR());
                  flod_sb.st_mtime = 0;
            }
            if (flods_st != FLODS_ST_RESTART
                && flod_sb.st_mtime != flod_mtime) {
                  if (flod_mtime != 0) {
                        dcc_trace_msg("%s has changed", flod_path);
                        flod_mtime = 0;
                  }
                  flods_st = FLODS_ST_RESTART;
            }
      }

      if (flods_st != FLODS_ST_ON) {
            oflods_stop(0);
            iflods_stop(0, 0);

            /* wait until the previous floods have stopped to restart */
            if (!flods_off && DB_IS_LOCKED()) {
                  if (oflods.active != 0 || iflods.active != 0
                      || !flod_names_resolve_ck()) {
                        flods_st = FLODS_ST_RESTART;
                        /* check again soon but not immediately */
                        if (next_flods_ck > db_time.tv_sec + 1)
                              next_flods_ck = db_time.tv_sec + 1;
                  } else {
                        if (oflods_load())
                              flods_st = FLODS_ST_ON;
                  }
            }
      }

      /* that is all we can do if flooding is off */
      if (flods_st != FLODS_ST_ON || flods_off || !DB_IS_LOCKED()) {
            oflods_ck();
            return;
      }

      iflods_start();

      /* generate delayed summaries */
      if (summarize_delay_secs == 0) {
            /* must summarize before reports are compressed */
            summarize_delay_secs = MAX_SUMMARIZE_DELAY_SECS;
            for (i = 0; i < DIM(db_parms.ex_secs); ++i) {
                  j = db_parms.ex_secs[i].all;
                  if (j != 0
                      && j < summarize_delay_secs)
                        j = summarize_delay_secs;
            }
      }
      dcc_timeval2ts(past, &db_time, -summarize_delay_secs);
      dcc_timeval2ts(present, &db_time, 0);
      if (flod_mmaps) {
            if (flod_mmaps->delay_pos > db_csize
                || flod_mmaps->delay_pos < DB_PTR_BASE)
                  flod_mmaps->delay_pos = DB_PTR_BASE;
            work = 0;
            while (flod_mmaps->delay_pos < db_csize) {
                  if (!db_map_rcd(0, &db_sts.sumrcd,
                              flod_mmaps->delay_pos,
                              &rcd_len)) {
                        flod_mmaps->delay_pos = db_csize;
                        break;
                  }
                  if (DB_RCD_DELAY(db_sts.sumrcd.d.r)) {
                        /* wait until it is time */
                        if (DCC_TS_NEWER_TS(db_sts.sumrcd.d.r->ts,
                                        past)
                            && !DCC_TS_NEWER_TS(db_sts.sumrcd.d.r->ts,
                                          present))
                              break;
                        if (!summarize_dly()) {
                              flod_mmaps->delay_pos = db_csize;
                              break;
                        }
                  }
                  flod_mmaps->delay_pos += rcd_len;

                  if (++work >= 20) {
                        /* spend at most 1 second at this
                         * and then let other processes run*/
                        if (too_busy)
                              break;
                        gettimeofday(&db_time, 0);
                        if (tv_diff2us(&db_time,&wake_time)>DCC_USECS) {
                              too_busy = 1;
                              break;
                        }
                        work = 0;
                  }
            }

            /* prime the outgoing pumps */
            for (ofp = oflods.infos;
                 ofp <= LAST(oflods.infos);
                 ++ofp) {
                  if (ofp->rem_hostname[0] == '\0')
                        break;

                  /* force error messages soon but not now to give
                   * new connetions a chance to be made */
                  if (force)
                        new_peer(ofp);

                  if (ofp->s >= 0) {
                        if (ofp->flags & OFLOD_FG_NEW)
                              ofp->flags &= ~OFLOD_FG_NEW;
                        else
                              oflod_fill(ofp);
                  } else {
                        oflod_open(ofp);
                  }

                  if ((ofp->o_opts.flags & FLOD_OPT_SOCKS)
                      && !(ofp->i_opts.flags & FLOD_OPT_OFF)) {
                        iflod_socks_start(ofp);
                  }
            }
      }

      mapped = 0;
      if (flod_mmaps) {
            map_delayed = 0;
      } else if (!force && ++map_delayed > 10) {
            oflods_load();
            mapped = 1;
            map_delayed = 0;
      }
      for (ofp = oflods.infos; ofp <= LAST(oflods.infos); ++ofp) {
            if (ofp->rem_hostname[0] == '\0')
                  break;
            if (!ofp->mp)
                  continue;

            if (ofp->s < 0
                && !(ofp->o_opts.flags & FLOD_OPT_OFF)) {
                  rpt_err(&ofp->mp->o_err, 0, FLOD_ERR_SAME, 0,
                        "no outgoing flood connection to"
                        " %s, server-ID %d",
                        ofp->rem_hostname, ofp->rem_id);
            }
            if (!ofp->ifp
                && !(ofp->i_opts.flags & FLOD_OPT_OFF)) {
                  rpt_err(&ofp->mp->i_err, 0, FLOD_ERR_SAME, 0,
                        "no incoming flood connection from"
                        " %s, server-ID %d",
                        ofp->rem_hostname, ofp->rem_id);
            }
      }
      if (mapped)
            oflods_unmap();

      oflods_ck();

      /* send the flood positions to the disk so that dblist can see them */
      flod_mmap_sync(0, 0);

      /* try to reap the hostname resolving child */
      flod_names_resolve_ck();
}



void
flods_init(void)
{
      IFLOD_INFO *ifp;

      for (ifp = iflods.infos; ifp <= LAST(iflods.infos); ++ifp)
            ifp->s = -1;

      oflods_clear();

      flods_restart(0);
}

Generated by  Doxygen 1.6.0   Back to index