/* Distributed Checksum Clearinghouse * * deal with outgoing floods of checksums * * Copyright (c) 2006 by Rhyolite Software, LLC * * This agreement is not applicable to any entity which sells anti-spam * solutions to others or provides an anti-spam solution as part of a * security solution sold to other entities, or to a private network * which employs the DCC or uses data provided by operation of the DCC * but does not provide corresponding data to other users. * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above * copyright notice and this permission notice appear in all copies. * * Parties not eligible to receive a license under this agreement can * obtain a commercial license to use DCC and permission to use * U.S. Patent 6,330,590 by contacting Commtouch at http://www.commtouch.com/ * or by email to nospam@commtouch.com. * * A commercial license would be for Distributed Checksum and Reputation * Clearinghouse software. That software includes additional features. This * free license for Distributed ChecksumClearinghouse Software does not in any * way grant permision to use Distributed Checksum and Reputation Clearinghouse * software * * THE SOFTWARE IS PROVIDED "AS IS" AND RHYOLITE SOFTWARE, LLC DISCLAIMS ALL * WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL RHYOLITE SOFTWARE, LLC * BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES * OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, * WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, * ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS * SOFTWARE. 
*
* Rhyolite Software DCC 1.3.50-1.108 $Revision$
*/

#include "dccd_defs.h"
#include "dcc_ck.h"

int flods_off;				/* # of reasons flooding is off */
int flod_db_sick;			/* # of flooding DB sicknesses */

u_char too_busy;			/* too busy to flood */

time_t next_flods_ck;
enum FLODS_ST flods_st = FLODS_ST_OFF;
OFLODS oflods;

/* records after this have not been flooded
 *	0 if invalid */
DB_PTR oflods_max_pos;

/* set from the st_mtime of the flod file by oflods_load();
 *	zeroed there after a failure so that the fopen() and stat()
 *	complaints are not repeated */
static time_t flod_mtime = 1;

int summarize_delay_secs;		/* delay summaries by this */

static void oflod_fill(OFLOD_INFO *);


/* Reset one output flood entry to all zeros with no socket.
 *	the socket must already be closed */
static void
oflod_clear(OFLOD_INFO *ofp)
{
	memset(ofp, 0, sizeof(*ofp));
	ofp->s = -1;			/* other code tests ofp->s >= 0 */
}



/* Reset every output flood entry and the global output flood counters. */
static void
oflods_clear(void)
{
	OFLOD_INFO *ofp;

	for (ofp = oflods.infos; ofp <= LAST(oflods.infos); ++ofp)
		oflod_clear(ofp);
	oflods.total = 0;
	oflods.active = 0;
	complained_many_iflods = 0;
	oflods_max_pos = 0;		/* 0=invalid */
}



/* Forget every pointer into the mapped flood file and then unmap it. */
void
oflods_unmap(void)
{
	OFLOD_INFO *ofp;

	for (ofp = oflods.infos; ofp <= LAST(oflods.infos); ++ofp)
		ofp->mp = 0;
	flod_unmap(0, &dccd_stats);
}



/* parse ID1->tgt in a flood file entry
 *	str0 is one option of the form "self->X", "all->X", "ID->X" or
 *	"ID-ID->X", where X is "self", "reject" or "ok".
 *	On success the next free slot of opts->srvr_map is filled and
 *	opts->num_maps is incremented.
 *	Returns 1 if the mapping was added or 0 after logging a complaint. */
static char
oflod_parse_map(OFLOD_OPTS *opts, const char *str0, int lno)
{
	DCC_FNM_LNO_BUF fnm_buf;
	const char *str;
	OFLOD_SRVR_ID_MAP *imp;

	if (opts->num_maps >= DIM(opts->srvr_map)) {
		dcc_error_msg("too many ID mappings with\"%s\"%s",
			      str0, fnm_lno(fnm_buf, flod_path, lno));
		return 0;
	}
	/* the slot is claimed only by the ++opts->num_maps at the end */
	imp = &opts->srvr_map[opts->num_maps];

	if (!CSTRCMP(str0, "self->")) {
		str = str0+STRZ("self->");
		imp->from_lo = imp->from_hi = my_srvr_id;
	} else if (!CSTRCMP(str0, "all->")) {
		str = str0+STRZ("all->");
		imp->from_lo = DCC_SRVR_ID_MIN;
		imp->from_hi = DCC_SRVR_ID_MAX;
	} else {
		/* get ID1 */
		str = dcc_get_srvr_id(0, &imp->from_lo, str0, "-",
				      flod_path, lno);
		if (!str)
			return 0;
		if (str[0] == '-' && str[1] == '>') {
			/* ID1 is not a range */
			imp->from_hi = imp->from_lo;
		} else {
			/* ID1 is a range of IDs */
			str = dcc_get_srvr_id(0, &imp->from_hi, str+1, "-",
					      flod_path, lno);
			if (!str)
				return 0;
			if (imp->from_hi < imp->from_lo) {
				dcc_error_msg("invalid ID mapping range "
					      "\"%d-%d\"%s",
					      imp->from_lo, imp->from_hi,
					      fnm_lno(fnm_buf, flod_path, lno));
				return 0;
			}
		}
		/* the ID or range must be followed by "->" */
		if (*str++ != '-' || *str++ != '>') {
			dcc_error_msg("invalid server-ID mapping \"%s\"%s",
				      str0, fnm_lno(fnm_buf, flod_path, lno));
			return 0;
		}
	}

	/* what to do with the matching server-IDs */
	if (!strcasecmp(str, "self")) {
		imp->result = ID_MAP_SELF;
	} else if (!strcasecmp(str, "reject")) {
		imp->result = ID_MAP_REJ;
	} else if (!strcasecmp(str, "ok")) {
		imp->result = ID_MAP_NO;
	} else {
		dcc_error_msg("invalid ID mapping result \"%s\"%s",
			      str, fnm_lno(fnm_buf, flod_path, lno));
		return 0;
	}

	++opts->num_maps;
	return 1;
}



/* parse remote or local options that can be any of
 *	a "off", "del", "no-del", "log-del", "passive", ID->map, etc.
 *
 *	ofp is the flood entry being built.
 *	opts is the option set (ofp->o_opts or ofp->i_opts) that receives
 *	    the options that are specific to one direction.
 *	buf is the rest of the flod file line; its first blank delimited
 *	    word is a comma separated list of options.
 *	Returns the text after the word or 0 after a complaint. */
static const char *			/* rest of the line */
oflod_parse_opts(OFLOD_INFO *ofp,
		 OFLOD_OPTS *opts,
		 const char *buf,
		 int lno)
{
	DCC_FNM_LNO_BUF fnm_buf;
	char opts_buf[200];
	char opt[20];
	const char *buf_ptr, *p;
	char *end;
	unsigned long l;
	u_int olen;

	/* pick out the blank delimited string of options */
	buf = dcc_parse_word(0, opts_buf, sizeof(opts_buf),
			     buf, "flood options", flod_path, lno);
	if (!buf)
		return 0;

	/* defaults */
	opts->path_len = DCC_MAX_FLOD_PATH;
	if (grey_on)
		opts->flags |= (FLOD_OPT_DEL_OK
				| FLOD_OPT_NO_LOG_DEL
				| FLOD_OPT_DEL_SET);

	/* parse the options */
	buf_ptr = opts_buf;
	while (*buf_ptr != '\0') {
		while (*buf_ptr == ',')
			++buf_ptr;
		/* copy the next option, truncated to fit opt[];
		 * strncpy() does not terminate it, so do that here */
		olen = strcspn(buf_ptr, ",");
		if (olen >= sizeof(opt))
			olen = sizeof(opt)-1;
		strncpy(opt, buf_ptr, olen);
		opt[olen] = '\0';
		buf_ptr += olen;

		/* ignore "-" */
		if (!strcmp(opt, "-"))
			continue;

		if (!strcasecmp(opt, "off")) {
			opts->flags |= FLOD_OPT_OFF;
			continue;
		}

		/* these are recognized only when grey_on is not set */
		if (!grey_on) {
			if (!strcasecmp(opt, "traps")) {
				opts->flags |= FLOD_OPT_TRAPS;
				continue;
			}
			if (!strcasecmp(opt, "no-del")) {
				opts->flags &= ~FLOD_OPT_DEL_OK;
				opts->flags |= FLOD_OPT_DEL_SET;
				continue;
			}
			if (!strcasecmp(opt, "del")) {
				opts->flags |= FLOD_OPT_DEL_OK;
				opts->flags |= FLOD_OPT_DEL_SET;
				continue;
			}
		}

		/* put some options in one or the other no matter
		 * for which they are specified */
		if (!strcasecmp(opt, "no-log-del")) {
			ofp->i_opts.flags |= FLOD_OPT_NO_LOG_DEL;
			continue;
		}
		if (!strcasecmp(opt, "log-del")) {
			ofp->i_opts.flags &= ~FLOD_OPT_NO_LOG_DEL;
			continue;
		}
		if (!strcasecmp(opt, "passive")) {
			if (ofp->o_opts.flags & FLOD_OPT_SOCKS) {
				dcc_error_msg("\"passive\" and \"SOCKS\";"
					      " cannot both be%s",
					      fnm_lno(fnm_buf, flod_path, lno));
				return 0;
			}
			ofp->o_opts.flags |= FLOD_OPT_PASSIVE;
			continue;
		}
		if (!strcasecmp(opt, "socks")) {
			if (ofp->o_opts.flags & FLOD_OPT_PASSIVE) {
				dcc_error_msg("\"passive\" and \"SOCKS\";"
					      " cannot both be%s",
					      fnm_lno(fnm_buf, flod_path, lno));
				return 0;
			}
			/* complain about a local name or port but keep
			 * going after discarding them */
			if (ofp->loc_hostname[0] != '\0'
			    || ofp->loc_port != 0) {
				dcc_error_msg("local host name or port number"
					      " and \"SOCKS\";"
					      " cannot both be%s",
					      fnm_lno(fnm_buf, flod_path, lno));
				ofp->loc_hostname[0] = '\0';
				ofp->loc_port = 0;
			}
			ofp->o_opts.flags |= FLOD_OPT_SOCKS;
			continue;
		}
		if (!strcasecmp(opt, "IPv4")) {
			if (ofp->o_opts.flags & FLOD_OPT_IPv6) {
				dcc_error_msg("\"IPv4\" and \"IPv6\";"
					      " cannot both be%s",
					      fnm_lno(fnm_buf, flod_path, lno));
				return 0;
			}
			ofp->o_opts.flags |= FLOD_OPT_IPv4;
			continue;
		}
		if (!strcasecmp(opt, "IPv6")) {
			if (ofp->o_opts.flags & FLOD_OPT_IPv4) {
				dcc_error_msg("\"IPv4\" and \"IPv6\";"
					      " cannot both be%s",
					      fnm_lno(fnm_buf, flod_path, lno));
				return 0;
			}
			ofp->o_opts.flags |= FLOD_OPT_IPv6;
			continue;
		}
		/* accepted but has no effect here */
		if (!strcasecmp(opt, "no-reputations")) {
			continue;
		}
		/* "leaf=N" limits the path length of flooded reports;
		 * it is accepted only if N is entirely decimal digits */
		if (!CSTRCMP(opt, "leaf=")
		    && (l = strtoul(opt+STRZ("leaf="), &end, 10),
			*end == '\0')) {
			if (l > DCC_MAX_FLOD_PATH)
				l = DCC_MAX_FLOD_PATH;
			ofp->o_opts.path_len = l;
			continue;
		}
#ifdef DCC_FLOD_VERSION7
		if (!strcasecmp(opt, "version7")) {
			ofp->oversion = DCC_FLOD_VERSION7;
			continue;
		}
#endif

		/* parse an ID->map */
		p = strchr(opt, '>');
		if (p && p > opt && *(p-1) == '-') {
			if (!oflod_parse_map(opts, opt, lno))
				return 0;
			continue;
		}

		dcc_error_msg("unknown option \"%s\"%s",
			      opt, fnm_lno(fnm_buf, flod_path, lno));
		return 0;
	}

	return buf;
}



static const char *			/* rest of a flod file line
*/ oflod_parse_id(DCC_SRVR_ID *id, const char *buf, const char *type, int lno) { char id_buf[20]; buf = dcc_parse_word(0, id_buf, sizeof(id_buf), buf, type, flod_path, lno); if (!buf) return 0; if (!strcmp(id_buf, "-") || id_buf[0] == '\0') { *id = DCC_ID_INVALID; return buf; } if (!dcc_get_srvr_id(0, id, id_buf, 0, flod_path, lno)) return 0; /* do not check whether we know the local ID here, because * changes in the ids file can make that check moot */ return buf; } /* compute the maximum position among all floods */ static void get_oflods_max_pos(void) { OFLOD_INFO *ofp; oflods_max_pos = DB_PTR_BASE; for (ofp = oflods.infos; ofp <= LAST(oflods.infos); ++ofp) { if (ofp->rem_hostname[0] != '\0' && oflods_max_pos < ofp->cur_pos) oflods_max_pos = ofp->cur_pos; } } static void copy_opts2mp(OFLOD_INFO *ofp) { FLOD_MMAP *mp = ofp->mp; mp->flags &= (FLOD_MMAP_FG_REWINDING | FLOD_MMAP_FG_NEED_REWIND | FLOD_MMAP_FG_FFWD_IN); if (db_parms.flags & DB_PARM_FG_CLEARED) { mp->flags |= FLOD_MMAP_FG_NEED_REWIND; mp->flags &= ~FLOD_MMAP_FG_FFWD_IN; } if (ofp->o_opts.flags & FLOD_OPT_PASSIVE) mp->flags |= FLOD_MMAP_FG_PASSIVE; else if (ofp->o_opts.flags & FLOD_OPT_SOCKS) mp->flags |= FLOD_MMAP_FG_SOCKS; if (ofp->o_opts.path_len != DCC_MAX_FLOD_PATH) mp->flags |= FLOD_MMAP_FG_LEAF; if (ofp->o_opts.num_maps != 0 || ofp->i_opts.num_maps != 0) mp->flags |= FLOD_MMAP_FG_MAPPED; if (ofp->o_opts.flags & FLOD_OPT_IPv4) { if (mp->flags & FLOD_MMAP_FG_IPv6) { mp->flags &= ~FLOD_MMAP_FG_IPv6; got_hosts = 0; } mp->flags |= FLOD_MMAP_FG_IPv4; } else if (ofp->o_opts.flags & FLOD_OPT_IPv6) { if (mp->flags & FLOD_MMAP_FG_IPv4) { mp->flags &= ~FLOD_MMAP_FG_IPv4; got_hosts = 0; } mp->flags |= FLOD_MMAP_FG_IPv6; } mp->flags &= ~(FLOD_MMAP_FG_FLOD_REP_OFF | FLOD_MMAP_FG_PEER_REP_OFF); /* get new hostname if it changes */ if (strcasecmp(mp->rem_hostname, ofp->rem_hostname)) { BUFCPY(mp->rem_hostname, ofp->rem_hostname); /* force name resolution for new name */ got_hosts = 0; new_peer(ofp); } /* always 
get new port name * in case the name but not the number changes */ BUFCPY(mp->rem_portname, ofp->rem_portname); if (mp->rem_port != ofp->rem_port) { mp->rem_port = ofp->rem_port; new_peer(ofp); } } /* Load the hostnames of DCC server peers and their output flood positions. * flod_names_resolve_ck() must say ok before this function is called, * to avoid races with changing host names. * * oflods.active and iflods.active must both be 0 to ensure that * there are no old pointers to the ofp structures. * * Parse lines of the form * hostname port rem-ID [passwd-id [out-opts [in-opts [versionX]]]] */ u_char /* 1=ok to start flooding */ oflods_load(void) { DCC_FNM_LNO_BUF fnm_buf; OFLOD_INFO *ofp, *ofp1; FILE *f; struct stat sb; int lno; char buf[200]; char hostname[60]; const char *bufp, *bufp1; FLOD_MMAP *mp, *mp1; union { OFLOD_INFO info; FLOD_MMAP map; } swap; char *p; int i; /* forget everything about output flooding */ oflods_clear(); /* keep the map open and locked most of the time */ if (!flod_mmap(dcc_emsg, db_parms.sn, &dccd_stats, 1, (DCC_TRACE_FLOD_BIT & dccd_tracemask) != 0)) { dcc_error_msg("%s", dcc_emsg); flod_mtime = 0; return 0; } f = fopen(flod_path, "r"); if (!f) { int serrno = errno; if (flod_mtime != 0) { dcc_error_msg("fopen(%s): %s", flod_path, ERROR_STR()); flod_mtime = 0; } /* do not clear or unmap the flod map file if the text file * is missing in case it has been temporarily deleted * while being edited */ if (serrno != ENOENT) flod_unmap(0, &dccd_stats); return 0; } if (0 > fstat(fileno(f), &sb)) { if (flod_mtime!= 0) dcc_error_msg("stat(%s): %s", flod_path, ERROR_STR()); fclose(f); flod_mtime = 0; return 0; } flod_mtime = sb.st_mtime; /* Parse the file of names and numbers first so that we do not * destroy the position information if there is a problem with names */ ofp = oflods.infos; lno = 0; for (;;) { /* clear the entry in case we started to set it with the * preceding line from the /var/dcc/flod file */ if (ofp <= LAST(oflods.infos)) 
oflod_clear(ofp); ++lno; bufp = fgets(buf, sizeof(buf), f); if (!bufp) { if (ferror(f)) { dcc_error_msg("fgets(%s): %s", flod_path, ERROR_STR()); break; } if (fclose(f) == EOF) { dcc_error_msg("fclose(%s): %s", flod_path, ERROR_STR()); } f = 0; break; } i = strlen(bufp); if (i >= ISZ(buf)-1) { dcc_error_msg("too many characters%s", fnm_lno(fnm_buf, flod_path, lno)); do { i = getc(f); } while (i != '\n' && i != EOF); continue; } /* ignore comments */ p = strchr(bufp, '#'); if (p) *p = '\0'; else p = &buf[i]; /* trim trailing blanks */ while (--p > bufp && (*p == ' ' || *p == '\t' || *p == '\n')) *p = '\0'; /* skip blank lines */ bufp += strspn(bufp, DCC_WHITESPACE); if (*bufp == '\0') continue; if (oflods.total >= DIM(oflods.infos)) { dcc_error_msg("too many DCC peers in %s; max=%d", flod_path, DIM(oflods.infos)); continue; } ofp->lno = lno; /* get IP address and port number of remote DCC server */ bufp1 = bufp+strcspn(bufp, DCC_WHITESPACE";"); if (*bufp1 != ';') { bufp1 = 0; } else { /* Allow the local or client TCP IP address and * port number to be specified. 
*/ buf[bufp1++ - buf] = '\0'; } bufp = dcc_parse_nm_port(0, bufp, def_port, ofp->rem_hostname, sizeof(ofp->rem_hostname), &ofp->rem_port, ofp->rem_portname, sizeof(ofp->rem_portname), flod_path, lno); if (!bufp) continue; if (bufp1) { /* parse the local IP address first */ bufp = dcc_parse_nm_port(0, bufp1, 0, ofp->loc_hostname, sizeof(ofp->loc_hostname), &ofp->loc_port, 0, 0, flod_path, lno); if (!bufp) continue; } bufp = oflod_parse_id(&ofp->rem_id, bufp, "rem-id", lno); if (!bufp) continue; if (ofp->rem_id == DCC_ID_INVALID) { dcc_error_msg("missing rem-id%s", fnm_lno(fnm_buf, flod_path, lno)); continue; } bufp = oflod_parse_id(&ofp->out_passwd_id, bufp, "passwd-id", lno); if (!bufp) continue; if (ofp->out_passwd_id == DCC_ID_INVALID) { ofp->out_passwd_id = my_srvr_id; ofp->in_passwd_id = ofp->rem_id; } else { ofp->in_passwd_id = ofp->out_passwd_id; } ofp->oversion = DCC_FLOD_VERSION_DEF; bufp = oflod_parse_opts(ofp, &ofp->o_opts, bufp, lno); if (!bufp) continue; bufp = oflod_parse_opts(ofp, &ofp->i_opts, bufp, lno); if (!bufp) continue; if (*bufp != '\0') dcc_error_msg("trailing garbage \"%s\" ignored%s", bufp, fnm_lno(fnm_buf, flod_path, lno)); /* both servers having spam traps and assuming the other * doesn't makes no sense */ if (((ofp->o_opts.flags & FLOD_OPT_TRAPS) || (ofp->i_opts.flags & FLOD_OPT_TRAPS)) && !(ofp->i_opts.flags & FLOD_OPT_OFF) && !(ofp->o_opts.flags & FLOD_OPT_OFF)) { dcc_error_msg("symmetric trap-only link%s", fnm_lno(fnm_buf, flod_path, lno)); continue; } for (ofp1 = oflods.infos; ofp1 < ofp; ++ofp1) { if ((!strcmp(ofp1->rem_hostname, ofp->rem_hostname) && ofp1->rem_port == ofp->rem_port) || ofp1->rem_id == ofp->rem_id) break; } if (ofp1 != ofp) { dcc_error_msg("duplicate DCC peer%s", fnm_lno(fnm_buf, flod_path, lno)); continue; } /* ignore ourself */ if (ofp->rem_id == my_srvr_id) continue; ofp->limit_reset = db_time.tv_sec + FLOD_LIM_CLEAR_SECS; ++ofp; ++oflods.total; } if (f) fclose(f); /* sort the list by server-ID so that `cdcc 
"flood list"` is sorted */ ofp = oflods.infos; while (ofp < LAST(oflods.infos)) { ofp1 = ofp+1; if (ofp1->rem_hostname[0] == '\0') break; if (ofp->rem_id <= ofp1->rem_id) { ofp = ofp1; continue; } /* bubble sort because the list is usually already * ordered and almost always tiny */ memcpy(&swap.info, ofp1, sizeof(swap.info)); memcpy(ofp1, ofp, sizeof(*ofp1)); memcpy(ofp, &swap.info, sizeof(*ofp)); ofp = oflods.infos; } /* Bubble sort the list in the /var/dcc/flod/map file so that is * sorted for `dblist -Hv`. The file will usually already be sorted * and is almost always very short. */ mp = flod_mmaps->mmaps; mp1 = mp+1; while (mp1 <= LAST(flod_mmaps->mmaps)) { if (mp1->rem_hostname[0] == '\0') { ++mp1; continue; } if (mp->rem_hostname[0] == '\0' || mp->rem_id <= mp1->rem_id) { mp = mp1++; continue; } memcpy(&swap.map, mp1, sizeof(swap.map)); memcpy(mp1, mp, sizeof(*mp1)); memcpy(mp, &swap.map, sizeof(*mp)); mp = flod_mmaps->mmaps; mp1 = mp+1; } /* combine our list that is based on the /var/dcc/flod file * with the memory mapped /var/dcc/flod/map file list of what has * been sent to each peer */ for (mp = flod_mmaps->mmaps; mp <= LAST(flod_mmaps->mmaps); ++mp) mp->oflod_index = -1; /* make one pass matching old names with their slots in the * mapped file */ for (ofp = oflods.infos; ofp <= LAST(oflods.infos); ++ofp) { if (ofp->rem_hostname[0] == '\0') break; for (i = 0; i < DIM(flod_mmaps->mmaps); ++i) { if (++mp > LAST(flod_mmaps->mmaps)) mp = flod_mmaps->mmaps; if (mp->rem_hostname[0] == '\0') continue; if (mp->oflod_index < 0 && ofp->rem_id == mp->rem_id) { /* found the old slot */ if (DB_PTR_IS_BAD(mp->confirm_pos) || mp->confirm_pos > db_csize) { dcc_error_msg("bogus position "L_HPAT " for %s in %s", mp->confirm_pos, ofp->rem_hostname, flod_mmap_path); mp->rem_hostname[0] = '\0'; continue; } ofp->cur_pos = mp->confirm_pos; ofp->rewind_pos = db_csize; ofp->mp = mp; mp->oflod_index = ofp - oflods.infos; copy_opts2mp(ofp); break; } } } /* use a free or obsolete 
slot in the mapped file for new entries */ mp = flod_mmaps->mmaps; for (ofp = oflods.infos; ofp <= LAST(oflods.infos); ++ofp) { if (ofp->rem_hostname[0] == '\0') break; if (ofp->mp != 0) continue; /* find a free or no longer used entry */ while (mp->oflod_index >= 0) { if (++mp > LAST(flod_mmaps->mmaps)) { bad_stop("too few oflod mmap slots"); goto out; } } if (mp->rem_hostname[0] != '\0') dcc_error_msg("forget flood to %s %d", dcc_host_portname(hostname, sizeof(hostname), mp->rem_hostname, mp->rem_portname), mp->rem_id); memset(mp, 0, sizeof(*mp)); ofp->mp = mp; mp->rem_su.sa.sa_family = AF_UNSPEC; mp->rem_id = ofp->rem_id; copy_opts2mp(ofp); mp->cnts.cnts_cleared = db_time.tv_sec; ofp->cur_pos = mp->confirm_pos = DB_PTR_BASE; ofp->recv_pos = ofp->xmit_pos = DB_PTR_BASE; mp->oflod_index = ofp - oflods.infos; dcc_error_msg("initialize flood to %s %d%s", dcc_host_portname(hostname, sizeof(hostname), mp->rem_hostname, mp->rem_portname), mp->rem_id, fnm_lno(fnm_buf, flod_path, ofp->lno)); } out:; /* clear the slots that contain forgotten hosts */ for (mp = flod_mmaps->mmaps; mp <= LAST(flod_mmaps->mmaps); ++mp) { if (mp->oflod_index == -1) { if (mp->rem_hostname[0] != '\0') dcc_error_msg("forget flood to %s %d", dcc_host_portname(hostname, sizeof(hostname), mp->rem_hostname, mp->rem_portname), mp->rem_id); memset(mp, 0, sizeof(*mp)); } } flod_mmap_sync(0, 1); db_parms.flags &= ~DB_PARM_FG_CLEARED; db_flush_parms(0); get_oflods_max_pos(); return 1; } /* put the flood counters in stable storage */ void save_flod_cnts(OFLOD_INFO *ofp) { FLOD_MMAP *mp; time_t delta; dccd_stats.iflod_total += ofp->cnts.total; dccd_stats.iflod_accepted += ofp->cnts.accepted; dccd_stats.iflod_stale += ofp->cnts.stale.val; dccd_stats.iflod_dup += ofp->cnts.dup.val; dccd_stats.iflod_ok2 += ofp->cnts.ok2.val; dccd_stats.iflod_not_deleted += ofp->cnts.not_deleted.val; mp = ofp->mp; if (mp) { if (ofp->xmit_pos == ofp->recv_pos) ofp->mp->confirm_pos = ofp->cur_pos; mp->cnts.total += 
ofp->cnts.total; mp->cnts.accepted += ofp->cnts.accepted; mp->cnts.stale += ofp->cnts.stale.val; mp->cnts.dup += ofp->cnts.dup.val; mp->cnts.ok2 += ofp->cnts.ok2.val; mp->cnts.not_deleted += ofp->cnts.not_deleted.val; mp->cnts.out_reports += ofp->cnts.out_reports; delta = db_time.tv_sec - ofp->cnts.saved; if (delta < 0) delta = 0; if (ofp->ifp) { if (mp->flags & FLOD_MMAP_FG_IN_CONN) { mp->cnts.in_total_conn += delta; } else { mp->flags |= FLOD_MMAP_FG_IN_CONN; mp->cnts.in_conn_changed = db_time.tv_sec; } } else { if (mp->flags & FLOD_MMAP_FG_IN_CONN) { mp->flags &= ~FLOD_MMAP_FG_IN_CONN; mp->cnts.in_conn_changed = db_time.tv_sec; } } if (ofp->flags & OFLOD_FG_CONNECTED) { if (mp->flags & FLOD_MMAP_FG_OUT_CONN) { mp->cnts.out_total_conn += delta; } else { mp->flags |= FLOD_MMAP_FG_OUT_CONN; mp->cnts.out_conn_changed = db_time.tv_sec; } } else { if (mp->flags & FLOD_MMAP_FG_OUT_CONN) { mp->flags &= ~FLOD_MMAP_FG_OUT_CONN; mp->cnts.out_conn_changed = db_time.tv_sec; } } } memset(&ofp->cnts, 0, sizeof(ofp->cnts)); ofp->cnts.saved = db_time.tv_sec; } void oflod_close(OFLOD_INFO *ofp, u_char fail, enum FLOD_ERR_OP err_op, int new_errno) { const char *opstr; int perrno; if (ofp->rem_hostname[0] == '\0') return; if (ofp->s >= 0) { if (0 > close(ofp->s) && !fail) dcc_error_msg("close(oflod %s %s): %s", ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su), ERROR_STR()); if (!ofp->mp) { TMSG2(FLOD, "close(oflod %s %s) (stranger)", ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su)); } else if (err_op == FLOD_ERR_SAME) { opstr = flod_stats_str(&perrno, ofp->mp->o_err.old_errno, ofp->mp->o_err.op); if (perrno) TMSG4(FLOD, "close(oflod %s %s) same %s: %s", ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su), opstr, ERROR_STR1(perrno)); else TMSG3(FLOD, "close(oflod %s %s) same %s", ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su), opstr); } else { opstr = flod_stats_str(&perrno, new_errno, err_op); if (perrno == 0) { rpt_err(&ofp->mp->o_err, 0, err_op, new_errno, "close(oflod %s %s): %s", 
ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su), opstr); } else { rpt_err(&ofp->mp->o_err, 0, err_op, new_errno, "close(oflod %s %s): %s %s", ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su), opstr, ERROR_STR1(perrno)); } } ofp->s = -1; ofp->flags &= ~OFLOD_FG_CONNECTED; ofp->obuf_len = 0; ofp->cnts.out_reports = 0; save_flod_cnts(ofp); if (--oflods.active == 0 && iflods.active == 0 && flods_st != FLODS_ST_ON) oflods_unmap(); } if (fail) { ofp->out_try_again = db_time.tv_sec + ofp->out_try_secs; TMSG2(FLOD, "postpone restarting flood to %s for %d seconds", ofp->rem_hostname, ofp->out_try_secs); if (ofp->out_try_secs < FLOD_RETRY_SECS) ofp->out_try_secs = FLOD_RETRY_SECS; } } /* get ready to shut down */ static void start_shutdown(OFLOD_INFO *ofp) { if (ofp->flags & OFLOD_FG_SHUTDOWN_REQ) return; /* arrange to ask the peer to ask us to stop */ ofp->flags |= OFLOD_FG_SHUTDOWN_REQ; oflod_fill(ofp); if (stopint) ofp->keep_out_time = db_time.tv_sec + SHUTDOWN_DELAY; else ofp->keep_out_time = db_time.tv_sec + KEEPALIVE_OUT_STOP; } /* Half-close the TCP connection. * The other DCC server will notice and send our final position * to acknowledge dealing with our reports. 
*/
static void
oflod_shutdown(OFLOD_INFO *ofp)
{
	struct linger nowait;

	/* wait until the output buffer is empty */
	if (ofp->obuf_len != 0)
		return;

	/* do it only once */
	if ((ofp->flags & OFLOD_FG_SHUTDOWN))
		return;
	ofp->flags |= OFLOD_FG_SHUTDOWN;

	/* on Solaris and Linux you must set SO_LINGER before shutdown() */
	nowait.l_onoff = 1;
	nowait.l_linger = SHUTDOWN_DELAY;
	if (0 > setsockopt(ofp->s, SOL_SOCKET, SO_LINGER,
			   &nowait, sizeof(nowait)))
		rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_IO, 0,
			"setsockopt(SO_LINGER flood to %s %s): %s",
			ofp->rem_hostname,
			dcc_su2str_err(&ofp->rem_su),
			ERROR_STR());

	/* 1 is the "how" argument of shutdown();
	 * presumably SHUT_WR to stop only sending -- confirm */
	if (0 > shutdown(ofp->s, 1)) {
		rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_IO, 0,
			"shutdown(flood to %s %s): %s",
			ofp->rem_hostname,
			dcc_su2str_err(&ofp->rem_su),
			ERROR_STR());
		oflod_close(ofp, 0, FLOD_ERR_SAME, 0);
	}
}



/* see if a report should be put into the output buffer for a flood
 *	db_sts.rcd.d.r points to the current record.
 *	ofp->cur_pos has already been advanced
 *
 *	The decision depends only on the record and on global state
 *	(db_parms.nokeep_cks, db_ck_fuzziness[], stale_ts[]), not on
 *	the peer. */
static u_char				/* 1=flood this report, 0=don't */
oflod_ck_put(void)
{
	const DB_RCD_CK *cur_rcd_ck;
	DCC_TGTS rcd_tgts, ck_tgts;
	int num_cks;
	DCC_CK_TYPES type;
	u_char obs_lvl, result;

	/* skip padding, whitelisted, compressed, trimmed
	 * and deleted entries */
	if (DB_RCD_ID(db_sts.rcd.d.r) == DCC_ID_WHITE
	    || DB_RCD_ID(db_sts.rcd.d.r) == DCC_ID_COMP
	    || (rcd_tgts = DB_TGTS_RCD_RAW(db_sts.rcd.d.r)) == 0
	    || DB_RCD_TRIMMED(db_sts.rcd.d.r))
		return 0;

	/* Skip reports that should not be flooded yet
	 * The flooding thresholds are used to set the delay flag.
	 * Small reports are marked with the delay flag when they are added
	 * to the database.  If later it seems they should be flooded,
	 * they are summarized in a new report that is flooded. */
	if (DB_RCD_DELAY(db_sts.rcd.d.r))
		return 0;

	result = 0;
	obs_lvl = 0;			/* fuzziest level seen so far */
	cur_rcd_ck = db_sts.rcd.d.r->cks;
	for (num_cks = DB_NUM_CKS(db_sts.rcd.d.r);
	     num_cks != 0;
	     ++cur_rcd_ck, --num_cks) {
		type = DB_CK_TYPE(cur_rcd_ck);
		/* ignore junk for deciding whether we can send this report. */
		if (DB_TEST_NOKEEP(db_parms.nokeep_cks, type))
			continue;

		if (DB_CK_OBS(cur_rcd_ck)) {
			/* an obsolete fuzzier checksum
			 * makes less fuzzy checksums obsolete */
			if (obs_lvl < db_ck_fuzziness[type]) {
				obs_lvl = db_ck_fuzziness[type];
				result = 0;
			}
			continue;
		}

		/* send server-ID declarations */
		if (type == DCC_CK_SRVR_ID)
			return 1;

		/* do not send what we think are stale reports */
		if (DCC_TS_OLDER_TS(db_sts.rcd.d.r->ts, stale_ts[type]))
			continue;

		/* do not send whitelisted reports */
		ck_tgts = DB_TGTS_CK(cur_rcd_ck);
		if (ck_tgts == DCC_TGTS_OK || ck_tgts == DCC_TGTS_OK2)
			return 0;

		/* send non-obsolete results */
		if (obs_lvl <= db_ck_fuzziness[type]) {
			obs_lvl = db_ck_fuzziness[type];
			result = 1;
		}
	}
	return result;
}



/* Append the report in cur_rcd to the output buffer of a flood if it
 *	should be sent to the peer.
 *	The report is built at ofp->obuf.c[ofp->obuf_len], but the buffer
 *	length, ofp->obuf_reports and ofp->xmit_pos are advanced only
 *	after every test has passed, so every early return leaves the
 *	buffer logically unchanged.
 *	The caller must have left room for a whole DCC_FLOD. */
static void
put_rcd_obuf(OFLOD_INFO *ofp, const DB_RCD *cur_rcd)
{
	DCC_FLOD *bp;
	DCC_TGTS tgts;
	DCC_SRVR_ID srvr, psrvr;
	const DB_RCD_CK *cur_rcd_ck;
	DCC_CK *buf_ck;
	DCC_FLOD_PATH_ID *new_path_idp, *new_path_id_limp, *rcd_path_id;
	int path_id_max;
	DCC_CK_TYPES type;
	ID_MAP_RESULT srvr_mapped;
	u_char reflecting;		/* 1=report is pointed at its source */
	u_char non_path, all_spam;
	int num_cks, i, j;

	/* decide whether to send this report */
	if (!oflod_ck_put())
		return;			/* skip it */

	bp = (DCC_FLOD *)&ofp->obuf.c[ofp->obuf_len];
	db_ptr2flod_pos(bp->pos, ofp->cur_pos);
	tgts = DB_TGTS_RCD_RAW(cur_rcd);
	if (tgts == DCC_TGTS_DEL) {
		/* don't send delete requests to systems that don't want them */
		if (!(ofp->o_opts.flags & FLOD_OPT_DEL_OK))
			return;
	} else if (ofp->o_opts.flags & FLOD_OPT_TRAPS) {
		tgts = DCC_TGTS_TOO_MANY;
	}

	srvr = DB_RCD_ID(cur_rcd);
	/* translate the source server-ID */
	srvr_mapped = id_map(srvr, &ofp->o_opts);
	switch (srvr_mapped) {
	case ID_MAP_NO:
		break;
	case ID_MAP_REJ:
		return;
	case ID_MAP_SELF:
		srvr = my_srvr_id;
		break;
	}
	/* this loses the DCC_SRVR_ID_AUTH bit */
	bp->srvr_id_auth[0] = srvr>>8;
	bp->srvr_id_auth[1] = srvr;

	reflecting = (srvr == ofp->rem_id);
	non_path = 0;			/* 1=have something besides a path */

	memcpy(bp->ts, cur_rcd->ts, sizeof(bp->ts));

	cur_rcd_ck = cur_rcd->cks;

	/* Add a path if we are not the source of the report
	 * or if it already has a path */
	buf_ck = bp->cks;
	if (srvr != my_srvr_id
	    || DB_CK_TYPE(cur_rcd_ck) == DCC_CK_FLOD_PATH) {
		/* Add a checksum entry for a path consisting of only our
		 * server-ID.  If the report contains a path, we will
		 * concatenate to this entry */
		memset(buf_ck, 0, sizeof(*buf_ck));
		buf_ck->len = sizeof(*buf_ck);
		buf_ck->type = DCC_CK_FLOD_PATH;
		new_path_idp = (DCC_FLOD_PATH_ID *)buf_ck->sum;
		new_path_idp->hi = my_srvr_id>>8;
		new_path_idp->lo = my_srvr_id;
		new_path_id_limp = new_path_idp + DCC_NUM_FLOD_PATH;
		/* our own ID uses one of the allowed path entries */
		path_id_max = ofp->o_opts.path_len-1;
		++new_path_idp;
		++buf_ck;
		bp->num_cks = 1;
	} else {
		/* do not add a path */
		new_path_idp = new_path_id_limp = 0;
		path_id_max = 0;
		bp->num_cks = 0;
	}

	all_spam = 1;
	for (num_cks = DB_NUM_CKS(cur_rcd);
	     num_cks != 0;
	     ++cur_rcd_ck, --num_cks) {
		type = DB_CK_TYPE(cur_rcd_ck);
		if (type == DCC_CK_FLOD_PATH) {
			rcd_path_id = (DCC_FLOD_PATH_ID *)&cur_rcd_ck->sum;
			for (j = 0; j < DCC_NUM_FLOD_PATH; ++j, ++rcd_path_id) {
				psrvr = ((rcd_path_id->hi<<8)
					 | rcd_path_id->lo);
				/* stop copying the path at its end */
				if (psrvr == DCC_ID_INVALID)
					break;
				/* don't send report if its path is too long */
				if (--path_id_max < 0)
					return;
				/* add another "checksum" to continue path */
				if (new_path_idp >= new_path_id_limp) {
					memset(buf_ck, 0, sizeof(*buf_ck));
					buf_ck->len = sizeof(*buf_ck);
					buf_ck->type = DCC_CK_FLOD_PATH;
					new_path_idp = (DCC_FLOD_PATH_ID *
							)buf_ck->sum;
					new_path_id_limp = (new_path_idp
							+ DCC_NUM_FLOD_PATH);
					++buf_ck;
					++bp->num_cks;
				}
				/* Do not send reports from the target back
				 * to the target unless the report is a
				 * Server-ID declaration */
				if (psrvr == ofp->rem_id)
					reflecting = 1;
				/* translate the server-IDs in the path */
				switch (id_map(psrvr, &ofp->o_opts)) {
				case ID_MAP_NO:
					break;
				case ID_MAP_REJ:
					return;
				case ID_MAP_SELF:
					psrvr = my_srvr_id;
					break;
				}
				new_path_idp->hi = psrvr>>8;
				new_path_idp->lo = psrvr;
				++new_path_idp;
			}

		} else {
			/* Do not send translated server-ID declarations
			 * or checksums in our own or in translated server-ID
			 * reports that we wouldn't have kept if we had
			 * received the original reports */
			if (srvr_mapped == ID_MAP_SELF) {
				if (type == DCC_CK_SRVR_ID)
					return;
				if (DB_TEST_NOKEEP(db_parms.nokeep_cks, type))
					continue;
			}
			/* Do not send reports from the target to the target
			 * unless the report is a Server-ID declaration */
			if (reflecting && type != DCC_CK_SRVR_ID)
				return;

			/* send everything else */
			buf_ck->type = type;
			buf_ck->len = sizeof(*buf_ck);
			memcpy(buf_ck->sum, cur_rcd_ck->sum,
			       sizeof(buf_ck->sum));
			++buf_ck;
			++bp->num_cks;

			non_path = 1;

			if (all_spam
			    && DB_TGTS_CK(cur_rcd_ck) != DCC_TGTS_TOO_MANY)
				all_spam = 0;
		}
	}

	/* quit if we found nothing but the path to send */
	if (!non_path)
		return;

	/* our own report whose checksums all have the maximum count
	 * is sent with the maximum count */
	if (all_spam && srvr == my_srvr_id)
		tgts = DCC_TGTS_TOO_MANY;
	/* the count goes into the buffer in network byte order */
	tgts = htonl(tgts);
	memcpy(&bp->tgts, &tgts, sizeof(bp->tgts));

	/* commit the report to the buffer */
	i = (char *)buf_ck - (char *)bp;
	ofp->obuf_len += i;
	++ofp->obuf_reports;
	ofp->xmit_pos = ofp->cur_pos;
}



/* send reports from the database to a peer DCC server
 *	This routine only fills the buffer.  The buffer is eventually
 *	written by oflod_write().
*/
static void
oflod_fill(OFLOD_INFO *ofp)
{
	u_int cur_rcd_len;
	int work;

	ofp->flags &= ~(OFLOD_FG_TOO_BUSY | OFLOD_FG_NEW);

	/* stop when things are not ready or shutting down */
	if (!(ofp->flags & OFLOD_FG_CONNECTED)
	    || (ofp->flags & OFLOD_FG_SHUTDOWN))
		return;

	/* stop when we are about to clean the database for a deletion so
	 * that we will not be shut down cleaning along with our neighbors */
	if (need_del_dbclean)
		return;

	/* after having started sending the buffer,
	 * wait for it to drain before adding more */
	if (ofp->obuf_off != 0)
		return;

	work = 0;			/* records examined since time check */
	/* continue while there is room for one more whole report */
	while (ofp->obuf_len < sizeof(ofp->obuf) - sizeof(DCC_FLOD)) {
		if (ofp->cur_pos >= db_csize) {
			/* nothing to send
			 * shut down if needed */
			if (ofp->xmit_pos == ofp->recv_pos)
				ofp->mp->confirm_pos = ofp->cur_pos;
			if (ofp->mp->confirm_pos >= ofp->rewind_pos)
				ofp->mp->flags &= ~FLOD_MMAP_FG_REWINDING;
			if (ofp->flags & OFLOD_FG_SHUTDOWN_REQ)
				oflod_shutdown(ofp);
			break;
		}

		/* don't try to look at reports crossing page boundaries */
		if (ofp->cur_pos%db_pagesize >= db_page_max) {
			ofp->cur_pos += DB_RCD_HDR_LEN;
			++work;
			continue;
		}

		/* after a database problem, give up by jumping to the end */
		if (!db_map_rcd(dcc_emsg, &db_sts.rcd, ofp->cur_pos,
				&cur_rcd_len)) {
			dcc_error_msg("oflod_fill() starting at "L_HPAT
				      " for %s: %s",
				      ofp->cur_pos, ofp->rem_hostname,
				      dcc_emsg);
			ofp->cur_pos = db_csize;
			break;
		}

		if (DB_NUM_CKS(db_sts.rcd.d.r) > DCC_DIM_CKS) {
			dcc_error_msg("impossible %d checksums in "L_HPAT,
				      DB_NUM_CKS(db_sts.rcd.d.r),
				      ofp->cur_pos);
			ofp->cur_pos = db_csize;
			break;
		}

		/* start a new entry unless we are shutting down */
		if (ofp->flags & OFLOD_FG_SHUTDOWN_REQ) {
			oflod_shutdown(ofp);
			break;
		}

		/* stop if we've spent enough time here so that
		 * incoming requests aren't lost */
		if (++work >= 100) {
			if (too_busy) {
				ofp->flags |= OFLOD_FG_TOO_BUSY;
				break;
			}
			gettimeofday(&db_time, 0);
			if (tv_diff2us(&db_time, &wake_time) > DCC_US/10) {
				too_busy = 1;
				ofp->flags |= OFLOD_FG_TOO_BUSY;
				break;
			}
			work = 0;
		}

		/* advance before put_rcd_obuf(), which records
		 * ofp->cur_pos as the position of the report */
		ofp->cur_pos += cur_rcd_len;

		/* send the record */
		put_rcd_obuf(ofp, db_sts.rcd.d.r);
	}

	if (oflods_max_pos < ofp->cur_pos)
		oflods_max_pos = ofp->cur_pos;
}



/* figure out what version to tell the peer
 *	Both known values of ofp->oversion yield the current version
 *	string.  Any other value is reported with dcc_logbad().
 *	NOTE(review): nothing is returned after dcc_logbad();
 *	presumably it does not return -- confirm */
const char *
version_str(const OFLOD_INFO *ofp)
{
	if (ofp->oversion == 0)
		return DCC_FLOD_VERSION_CUR_STR;
#ifdef DCC_FLOD_VERSION7
	if (ofp->oversion == DCC_FLOD_VERSION7)
		return DCC_FLOD_VERSION_CUR_STR;
#endif
	dcc_logbad(EX_SOFTWARE, "unknown ofp->oversion=%d",
		   ofp->oversion);
}



/* Double the delay before the next attempt to start an output flood,
 *	keeping it between FLOD_RETRY_SECS and FLOD_SLOW_RETRY_SECS. */
static void
oflod_retry_backoff(OFLOD_INFO *ofp)
{
	ofp->out_try_secs *= 2;
	if (ofp->out_try_secs > FLOD_SLOW_RETRY_SECS)
		ofp->out_try_secs = FLOD_SLOW_RETRY_SECS;
	else if (ofp->out_try_secs < FLOD_RETRY_SECS)
		ofp->out_try_secs = FLOD_RETRY_SECS;
}



/* finish connecting output flood by sending our version number and signature
 *	The signed version message is only placed in the output buffer.
 *	Returns 0 without touching the connection if no current password
 *	is known for ofp->out_passwd_id, or 1 after the entry has been
 *	marked connected.
 *	NOTE(review): ofp->mp is tested only on the failure path and is
 *	used without a test afterwards -- presumably never 0 here */
u_char
oflod_connect_fin(OFLOD_INFO *ofp)
{
	DCC_FNM_LNO_BUF fnm_buf;
	ID_TBL *tp;

	tp = find_id_tbl(ofp->out_passwd_id);
	if (!tp || tp->cur_passwd[0] == '\0') {
		if (ofp->mp != 0)
			rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_SIGN, 0,
				"oflod "DCC_FLOD_PASSWD_ID_MSG" %d%s",
				ofp->out_passwd_id,
				fnm_lno(fnm_buf, flod_path, ofp->lno));
		return 0;
	}

	ofp->keep_out_time = db_time.tv_sec + KEEPALIVE_OUT;
	ofp->mp->o_err.ok = db_time.tv_sec + LAST_ERROR_OK_SECS;

	ofp->flags |= OFLOD_FG_CONNECTED;
	save_flod_cnts(ofp);

	ofp->flags &= ~OFLOD_FG_SHUTDOWN;
	ofp->flags &= ~OFLOD_FG_SHUTDOWN_REQ;

	/* start again at the last confirmed position */
	ofp->recv_pos = ofp->xmit_pos = ofp->cur_pos = ofp->mp->confirm_pos;
	get_oflods_max_pos();

	ofp->ibuf_len = 0;
	ofp->obuf_off = 0;

	/* convince the peer we're sane */
	ofp->obuf_len = sizeof(ofp->obuf.vers);
	memset(&ofp->obuf.vers, 0, sizeof(ofp->obuf.vers));
	strcpy(ofp->obuf.vers.body.str, version_str(ofp));
	ofp->obuf.vers.body.sender_srvr_id = htons(my_srvr_id);

	/* note whether there is a next password that might be used */
	if (tp->next_passwd[0] != '\0') {
		ofp->flags |= OFLOD_FG_PASSWD_NEXT;
	} else {
		ofp->flags &= ~OFLOD_FG_PASSWD_NEXT;
		ofp->mp->flags &= ~FLOD_MMAP_FG_PASSWD_NEXT;
	}
	/* go back to the current password when ids_mtime has changed */
	if (ofp->ids_mtime != ids_mtime) {
		ofp->ids_mtime = ids_mtime;
		ofp->mp->flags &= ~FLOD_MMAP_FG_PASSWD_NEXT;
	}
	if (ofp->mp->flags & FLOD_MMAP_FG_PASSWD_NEXT)
		dcc_sign(tp->next_passwd, sizeof(tp->next_passwd),
			 &ofp->obuf.vers, ofp->obuf_len);
	else
		dcc_sign(tp->cur_passwd, sizeof(tp->cur_passwd),
			 &ofp->obuf.vers, ofp->obuf_len);

	TMSG2(FLOD, "start flood to %s %s",
	      ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su));

	ofp->flags |= OFLOD_FG_NEW;	/* pause to let peer reject us */

	return 1;
}



/* start to connect an out-going flood
 *	op and msg label a failure of the connection attempt.
 *	On failure the flood is closed with its restart postponed. */
static int				/* -1=failure, 0=not yet, 1=done */
oflod_connect_start(OFLOD_INFO *ofp,
		    enum FLOD_ERR_OP op, const char *msg)
{
	DCC_FNM_LNO_BUF fnm_buf;
	int i;

	if (ofp->o_opts.flags & FLOD_OPT_SOCKS)
		i = Rconnect(ofp->s, &ofp->rem_su.sa,
			     DCC_SU_LEN(&ofp->rem_su));
	else
		i = connect(ofp->s, &ofp->rem_su.sa,
			    DCC_SU_LEN(&ofp->rem_su));
	/* EISCONN is treated like success */
	if (0 > i && errno != EISCONN) {
		/* the connection is still being made */
		if (errno == EAGAIN
		    || errno == EINPROGRESS
		    || errno == EALREADY)
			return 0;

		/* several UNIX-like systems return EINVAL for the second
		 * connect() after a Unreachable ICMP message or timeout */
		if (errno == EINVAL)
			errno = ECONNREFUSED;
		rpt_err(&ofp->mp->o_err, 0, op, errno,
			"%s(oflod %s %s%s): %s",
			msg, ofp->rem_hostname,
			dcc_su2str_err(&ofp->rem_su),
			fnm_lno(fnm_buf, flod_path, ofp->lno),
			ERROR_STR());
		oflod_close(ofp, 1, FLOD_ERR_SAME, 0);
		return -1;
	}

	if (!oflod_connect_fin(ofp)) {
		oflod_close(ofp, 1, FLOD_ERR_SAME, 0);
		return -1;
	}

	return 1;
}



void
oflod_open(OFLOD_INFO *ofp)
{
	DCC_FNM_LNO_BUF fnm_buf;
	DCC_SOCKU loc_su, su2;
	const DCC_SOCKU *sup;
	const SRVR_SOC *sp;
	int error;

	if (ofp->s >= 0
	    || ofp->rem_hostname[0] == '\0'
	    || (ofp->o_opts.flags & FLOD_OPT_OFF)
	    || flods_st != FLODS_ST_ON
	    || (ofp->o_opts.flags & FLOD_OPT_PASSIVE))
		return;

	if (!DB_IS_TIME(ofp->out_try_again, ofp->out_try_secs))
		return;

	if (!flod_names_resolve_start())
		return;			/* wait for name resolution */
	if (ofp->mp->rem_su.sa.sa_family == AF_UNSPEC) {
		if (ofp->mp->host_error != 0)
			rpt_err(&ofp->mp->o_err, 0,
				FLOD_ERR_GET_HOST, ofp->mp->host_error,
				"flood peer name %s: %s%s",
				ofp->rem_hostname,
				DCC_HSTRERROR(ofp->mp->host_error),
				fnm_lno(fnm_buf, flod_path, ofp->lno));
		oflod_close(ofp, 1, FLOD_ERR_SAME, 0);
		return;
	}
	ofp->rem_su = ofp->mp->rem_su;

	ofp->s = socket(ofp->rem_su.sa.sa_family, SOCK_STREAM, 0);
	if (ofp->s < 0) {
		rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_IO, errno,
			"oflod%s socket(): %s",
			fnm_lno(fnm_buf, flod_path, ofp->lno),
			ERROR_STR());
		oflod_close(ofp, 1, FLOD_ERR_SAME, 0);
		return;
	}
	++oflods.active;

	if (!set_flod_socket(ofp->s, ofp->rem_hostname, &ofp->rem_su, 0)) {
		oflod_close(ofp, 1, FLOD_ERR_SAME, 0);
		return;
	}

	memset(&loc_su, 0, sizeof(loc_su));
	if (ofp->loc_hostname[0] != '\0'
	    || ofp->loc_port != 0) {
		/* Resolve the local host name.
		 * This should not take significant time because
		 * the hostnames should be locally known.  That
		 * means we don't need to use a separate thread. */
		if (ofp->loc_hostname[0] != '\0') {
			dcc_host_lock();
			if (!dcc_get_host(ofp->loc_hostname,
					  ofp->rem_su.sa.sa_family == AF_INET
					  ? 0 : 1,
					  &error)) {
				dcc_error_msg("flood local name %s: %s%s",
					      ofp->loc_hostname,
					      DCC_HSTRERROR(error),
					      fnm_lno(fnm_buf, flod_path,
						      ofp->lno));
			} else {
				/* match local address family to remote */
				sup = dcc_hostaddrs;
				for (;;) {
					if (sup->sa.sa_family
					    == ofp->rem_su.sa.sa_family) {
						loc_su = *sup;
						break;
					}
					if (++sup >= dcc_hostaddrs_end) {
						dcc_error_msg("family matching %s"
							" %s not available for"
							" %s",
							ofp->rem_hostname,
							dcc_su2str_err(&ofp
								->rem_su),
							ofp->loc_hostname);
						ofp->loc_hostname[0] = '\0';
						break;
					}
				}
			}
			dcc_host_unlock();
		}
	}

	/* if there is a single "-a address" other than localhost
	 * and of the right family, then default to it */
	if (!(ofp->o_opts.flags & FLOD_OPT_SOCKS)
	    && loc_su.sa.sa_family == AF_UNSPEC) {
		for (sp = srvr_socs; sp; sp = sp->fwd) {
			if (dcc_ipv6sutoipv4(&su2, &sp->su)
			    && su2.ipv4.sin_addr.s_addr == ntohl(0x7f000001))
				continue;
			if (sp->su.sa.sa_family != ofp->rem_su.sa.sa_family)
				continue;
			if (loc_su.sa.sa_family != AF_UNSPEC) {
				memset(&loc_su, 0, sizeof(loc_su));
				break;
			}
			loc_su = sp->su;
		}
	}

	if (loc_su.sa.sa_family != AF_UNSPEC
	    || ofp->loc_port != 0) {
		loc_su.sa.sa_family =
ofp->rem_su.sa.sa_family; *DCC_SU_PORTP(&loc_su) = ofp->loc_port; if (0 > bind(ofp->s, &loc_su.sa, DCC_SU_LEN(&loc_su))) dcc_error_msg("bind(oflod %s%s): %s", dcc_su2str_err(&loc_su), fnm_lno(fnm_buf, flod_path, ofp->lno), ERROR_STR()); } oflod_connect_start(ofp, FLOD_ERR_CONNECT, "connect"); } void oflod_write(OFLOD_INFO *ofp) { int i; if (ofp->obuf_len == 0) { if (!(ofp->flags & OFLOD_FG_CONNECTED) && 0 >= oflod_connect_start(ofp, FLOD_ERR_CONNECT2, "connect2")) return; oflod_fill(ofp); if (ofp->obuf_len == 0) return; } if (ofp->o_opts.flags & FLOD_OPT_SOCKS) i = Rsend(ofp->s, &ofp->obuf.c[ofp->obuf_off], ofp->obuf_len - ofp->obuf_off, 0); else i = send(ofp->s, &ofp->obuf.c[ofp->obuf_off], ofp->obuf_len - ofp->obuf_off, 0); if (i > 0) { ofp->obuf_off += i; if (ofp->obuf_off >= ofp->obuf_len) { /* that emptied the buffer */ ofp->obuf_len = 0; ofp->obuf_off = 0; ofp->cnts.out_reports += ofp->obuf_reports; ofp->obuf_reports = 0; /* we might want to shut down, so fill buffer again */ oflod_fill(ofp); } ofp->flags &= ~OFLOD_FG_EAGAIN; return; } /* we had an error or EOF */ if (i < 0) { /* we come here only when select() has said that we can send(). 
* However, it seems that Solaris nevertheless sometimes * says EAGAIN */ if (DCC_BLOCK_ERROR()) { ofp->flags |= OFLOD_FG_EAGAIN; TMSG3(FLOD, "pause after send(oflod to %s %s): %s", ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su), ERROR_STR()); return; } rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_IO, errno, "send(oflod to %s %s): %s", ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su), ERROR_STR()); } else { rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_IO, 0, "premature end of oflod to %s %s", ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su)); } oflod_read(ofp); /* get any last error message */ oflod_close(ofp, 1, FLOD_ERR_SAME, 0); } static void oflod_read_emsg(OFLOD_INFO *ofp) { enum FLOD_ERR_OP op; u_char fail = 1; if (!strncmp(ofp->ibuf.end.msg, DCC_FLOD_BAD_VER_MSG_BASE, STRZ(DCC_FLOD_BAD_VER_MSG_BASE))) { op = FLOD_ERR_BAD_VERS; if (ofp->oversion != ofp->iversion) { /* notice if this peer demands a version * other than what we have been trying */ ofp->oversion = ofp->iversion; fail = 0; } else { /* otherwise don't try for a while */ oflod_retry_backoff(ofp); } } else if (!memcmp(ofp->ibuf.end.msg, DCC_FLOD_BAD_ID_MSG, STRZ(DCC_FLOD_BAD_ID_MSG))) { op = FLOD_ERR_ID; oflod_retry_backoff(ofp); } else if (!memcmp(ofp->ibuf.end.msg, DCC_FLOD_PASSWD_ID_MSG, STRZ(DCC_FLOD_PASSWD_ID_MSG)) || !memcmp(ofp->ibuf.end.msg, DCC_FLOD_NO_PASSWD_MSG, STRZ(DCC_FLOD_NO_PASSWD_MSG))) { op = FLOD_ERR_SIGN; oflod_retry_backoff(ofp); } else if (!memcmp(ofp->ibuf.end.msg, DCC_FLOD_BAD_AUTH_MSG, STRZ(DCC_FLOD_BAD_AUTH_MSG))) { op = FLOD_ERR_SIGN; /* try the second password if possible * after the peer rejects the first */ if ((ofp->flags & OFLOD_FG_PASSWD_NEXT) && !(ofp->mp->flags & FLOD_MMAP_FG_PASSWD_NEXT)) { ofp->mp->flags |= FLOD_MMAP_FG_PASSWD_NEXT; ofp->out_try_again = 0; } else { oflod_retry_backoff(ofp); } } else { ofp->out_try_secs = FLOD_RETRY_SECS/2; op = ((flods_st == FLODS_ST_OFF || (ofp->o_opts.flags & FLOD_OPT_OFF)) ? 
FLOD_ERR_LOCAL_OFF : FLOD_ERR_REMOTE_OFF); } rpt_err(&ofp->mp->o_err, 0, op, 0, "oflod end status from %s %s: %s", ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su), ofp->ibuf.end.msg); oflod_close(ofp, fail, FLOD_ERR_SAME, 0); } /* see what the target has to say about the reports we have been sending */ void oflod_read(OFLOD_INFO *ofp) { int used, i; DB_PTR pos; again:; if (ofp->o_opts.flags & FLOD_OPT_SOCKS) i = Rrecv(ofp->s, &ofp->ibuf.b[ofp->ibuf_len], sizeof(ofp->ibuf) - ofp->ibuf_len, 0); else i = recv(ofp->s, &ofp->ibuf.b[ofp->ibuf_len], sizeof(ofp->ibuf) - ofp->ibuf_len, 0); if (i <= 0) { if (i < 0 && errno != ECONNRESET) { if (!DCC_BLOCK_ERROR()) { rpt_err(&ofp->mp->o_err, 0, FLOD_ERR_IO, errno, "recv(oflod %s %s): %s", ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su), ERROR_STR()); oflod_close(ofp, 1, FLOD_ERR_SAME, 0); } return; } /* before closing, the peer is supposed to send a * "position" of DCC_FLOD_POS_END followed by * an ASCII message */ if (ofp->ibuf_len <= sizeof(DCC_FLOD_POS)) { if (ofp->obuf_len != 0) rpt_err(&ofp->mp->o_err, 0, FLOD_ERR_BAD_DATA,0, "truncated oflod end status from %s %s", ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su)); oflod_close(ofp, 1, FLOD_ERR_SAME, 0); return; } pos = flod_pos2db_ptr(ofp->ibuf.end.z); if (pos != DCC_FLOD_POS_END) { dcc_error_msg("oflod end status from %s %s flag="L_HPAT, ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su), pos); } ofp->ibuf.b[ofp->ibuf_len] = '\0'; if (!strncmp(ofp->ibuf.end.msg, DCC_FLOD_OK_STR, sizeof(DCC_FLOD_OK_STR)-1)) { TMSG3(FLOD, "oflod end status from %s %s: %s", ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su), ofp->ibuf.end.msg+sizeof(DCC_FLOD_OK_STR)-1); oflod_close(ofp, 0, (flods_st == FLODS_ST_OFF || (ofp->o_opts.flags & FLOD_OPT_OFF)) ? 
FLOD_ERR_LOCAL_OFF : FLOD_ERR_REMOTE_OFF, 0); } else { oflod_read_emsg(ofp); } return; } ofp->ibuf_len += i; if (ofp->mp != 0) ofp->mp->o_err.ok = db_time.tv_sec + LAST_ERROR_OK_SECS; if (!(ofp->flags & OFLOD_FG_SHUTDOWN_REQ)) ofp->keep_out_time = db_time.tv_sec + KEEPALIVE_OUT; used = 0; while (ofp->ibuf_len >= sizeof(ofp->ibuf.pos)) { if (used > 0) { ofp->ibuf_len -= used; if (ofp->ibuf_len == 0) return; memmove(ofp->ibuf.b, &ofp->ibuf.b[used], ofp->ibuf_len); used = 0; } pos = flod_pos2db_ptr(ofp->ibuf.end.z); if (pos < DCC_FLOD_POS_MIN) switch ((DCC_FLOD_POS_OPS)pos) { case DCC_FLOD_POS_END: /* Wait for all of final status message * until the target closes the TCP connection. * Do not worry if the target stops without * asking nicely, since at worst we will * resend whatever was in the pipe next time. */ if (ofp->ibuf_len >= sizeof(ofp->ibuf.end)) { rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_BAD_DATA,0, "babbling flod receiver %s %s", ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su)); oflod_close(ofp, 1, FLOD_ERR_SAME, 0); return; } goto again; /* wait for the status */ case DCC_FLOD_POS_END_REQ: /* since the peer asked us to stop, * do not start again for a while */ ofp->out_try_secs = FLOD_RETRY_SECS; ofp->out_try_again = (db_time.tv_sec + ofp->out_try_secs); TMSG2(FLOD, "postpone restarting flood to " "%s for %d seconds", ofp->rem_hostname, ofp->out_try_secs); start_shutdown(ofp); used = sizeof(ofp->ibuf.pos); continue; case DCC_FLOD_POS_NOTE: /* wait until we get the length of the complaint */ if (ofp->ibuf_len < FLOD_NOTE_OVHD) goto again; used = ofp->ibuf.note.len; if (used > ISZ(ofp->ibuf.note) || used <= FLOD_NOTE_OVHD) { rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_BAD_DATA,0, "bogus oflod note length %d from %s %s", used, ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su)); oflod_close(ofp, 1, FLOD_ERR_SAME, 0); return; } if ((int)ofp->ibuf_len < used) goto again; TMSG4(FLOD, "oflod note from %s %s: %.*s", ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su), 
used-FLOD_NOTE_OVHD, ofp->ibuf.note.str); continue; case DCC_FLOD_POS_COMPLAINT: /* wait until we get the length of the complaint */ if (ofp->ibuf_len < FLOD_NOTE_OVHD) goto again; used = ofp->ibuf.note.len; if (used > ISZ(ofp->ibuf.note) || used <= FLOD_NOTE_OVHD) { rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_BAD_DATA,0, "bogus oflod complaint length %d" " from %s %s", used, ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su)); oflod_close(ofp, 1, FLOD_ERR_SAME, 0); return; } if ((int)ofp->ibuf_len < used) goto again; i = FLOD_CNTERR(&ofp->cnts.complaint); if (i <= 0) dcc_error_msg("oflod complaint from %s %s:" " %.*s%s", ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su), used-FLOD_NOTE_OVHD, ofp->ibuf.note.str, (i<0 ? "" : "; stop complaints")); continue; case DCC_FLOD_POS_REWIND: used = sizeof(ofp->ibuf.pos); dcc_trace_msg("oflod rewind from %s %s", ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su)); ofp->mp->flags |= FLOD_MMAP_FG_REWINDING; ofp->cur_pos = ofp->mp->confirm_pos = DB_PTR_BASE; ofp->recv_pos = ofp->xmit_pos = DB_PTR_BASE; ofp->rewind_pos = db_csize; get_oflods_max_pos(); oflod_fill(ofp); continue; case DCC_FLOD_POS_FFWD_IN: used = sizeof(ofp->ibuf.pos); dcc_trace_msg("FFWD its input from %s %s", ofp->rem_hostname, dcc_su2str_err(&ofp->rem_su)); ofp->cur_pos = db_csize; get_oflods_max_pos(); continue; } /* The position from the peer must be one we sent, * and in the window we expect unless our * window has been broken by rewinding. * Even if our window is broken, the position must * be reasonable. 
*/ if ((pos < ofp->recv_pos || pos > ofp->xmit_pos) && (!(ofp->mp->flags & FLOD_MMAP_FG_REWINDING) || pos < DCC_FLOD_POS_MIN || pos > db_csize)) { rpt_err(&ofp->mp->o_err, 1, FLOD_ERR_BAD_DATA,0, "bogus confirmed flood position" " "L_HPAT" from %s;" " recv_pos="L_HPAT" xmit_pos="L_HPAT, pos, ofp->rem_hostname, ofp->recv_pos, ofp->xmit_pos); oflod_close(ofp, 1, FLOD_ERR_SAME, 0); return; } ofp->recv_pos = pos; if (ofp->xmit_pos == ofp->recv_pos) ofp->mp->confirm_pos = ofp->cur_pos; else if (ofp->mp->confirm_pos < ofp->recv_pos) ofp->mp->confirm_pos = ofp->recv_pos; used = sizeof(ofp->ibuf.pos); /* things are going ok, so reset restart backoff */ ofp->out_try_secs = 0; } } static void oflods_ck(void) { OFLOD_INFO *ofp; for (ofp = oflods.infos; ofp <= LAST(oflods.infos); ++ofp) { if (ofp->rem_hostname[0] == '\0') break; if (ofp->flags & OFLOD_FG_EAGAIN) { TMSG1(FLOD, "resume flooding %s after EAGAIN", ofp->rem_hostname); ofp->flags &= ~OFLOD_FG_EAGAIN; } /* shut down any streams that have been quiet for too long */ if ((ofp->flags & OFLOD_FG_CONNECTED) && DB_IS_TIME(ofp->keep_out_time, KEEPALIVE_OUT)) oflod_close(ofp, 0, FLOD_ERR_KEEPALIVE, 0); } } void oflods_stop(u_char force) { OFLOD_INFO *ofp; flods_st = FLODS_ST_OFF; for (ofp = oflods.infos; ofp <= LAST(oflods.infos); ++ofp) { if (ofp->rem_hostname[0] == '\0') break; if (ofp->s < 0 || !(ofp->flags & OFLOD_FG_CONNECTED)) { oflod_close(ofp, 0, FLOD_ERR_LOCAL_OFF, 0); } else if (force) { oflod_close(ofp, 1, FLOD_ERR_LOCAL_OFF, 0); } else { start_shutdown(ofp); } } if (oflods.active == 0 && iflods.active == 0) oflods_unmap(); } void PATTRIB(3,4) db_broken(int linenum, const char *file, const char *p, ...) 
{ va_list args; if (!db_failed_line) { db_failed_line = linenum; db_failed_file = file; } va_start(args, p); dcc_verror_msg(p, args); va_end(args); } /* (re)start listening for incoming floods and sending outgoing floods */ void flods_restart(const char *msg) { /* unlocked database keeps flooding off */ if (flods_off < 0) flods_off = 0; if (flods_off > 0 || db_minimum_map) return; if (msg) iflods_stop(msg, 0); flods_st = FLODS_ST_RESTART; flods_ck(0); } /* called periodically and at need */ void flods_ck(u_char force) { static int map_delayed; IFLOD_INFO *ifp; OFLOD_INFO *ofp; struct stat flod_sb; struct timeval; DCC_TS past, present; u_int rcd_len; int work; u_char mapped; int i, j; if (force) /* force hostname resolution */ got_hosts = 0; for (ifp = iflods.infos; ifp <= LAST(iflods.infos); ++ifp) { if (ifp->s < 0) continue; /* end connections that failed to be completed */ ofp = ifp->ofp; if (!ofp) { if (DB_IS_TIME(ifp->quit_connect, IFLOD_CONNECT_SECS)) iflod_close(ifp, 1, 1, FLOD_ERR_SIGN, 0, "%s failed to authenticate itself", ifp->hostname); continue; } /* allow more complaints */ if (DB_IS_TIME(ofp->limit_reset, FLOD_LIM_CLEAR_SECS) || force) { complained_many_iflods = 0; ofp->cnts.stale.limit = ofp->cnts.stale.val; ofp->cnts.dup.limit = ofp->cnts.dup.val; ofp->cnts.ok2.limit = ofp->cnts.ok2.val; ofp->cnts.not_deleted.limit = ofp->cnts.not_deleted.val; ofp->cnts.bad_id.limit = ofp->cnts.bad_id.val; ofp->cnts.complaint.limit = ofp->cnts.complaint.val; ofp->cnts.iflod_bad.limit = ofp->cnts.iflod_bad.val; ofp->limit_reset = db_time.tv_sec+FLOD_LIM_CLEAR_SECS; } if (!(ifp->flags & IFLOD_FG_VERS_CK)) continue; save_flod_cnts(ofp); if (!DB_IS_TIME(ifp->ofp->keep_in_time, KEEPALIVE_IN)) { /* Tell the peer how much we've processed. */ iflod_send_pos(ifp, 0); } else { /* If we have not heard from the peer for a long time, * then repeat our position or close the connection * as a "keepalive". 
* If we have asked the peer to stop and it has not * done so, then break the link. */ if (ifp->flags & IFLOD_FG_END_REQ) { iflod_close(ifp, 1, 0, FLOD_ERR_KEEPALIVE, 0, "%s ignored close request", ifp->hostname); } else if (!iflod_send_pos(ifp, 1)) { iflod_stop(ifp, "keepalive"); } } } if (!flods_off && !db_minimum_map) { /* stop and restart the pumps if the list of peers has * changed or if our map has disappeared */ if (0 > stat(flod_path, &flod_sb)) { if (errno != ENOENT && flod_mtime != 0) dcc_error_msg("stat(%s): %s", flod_path, ERROR_STR()); flod_sb.st_mtime = 0; } if (flod_mtime != 0 && 0 > access(flod_mmap_path, W_OK | R_OK)) { if (errno != ENOENT) dcc_error_msg("access(%s): %s", flod_mmap_path, ERROR_STR()); flod_sb.st_mtime = 0; } if (flods_st != FLODS_ST_RESTART && flod_sb.st_mtime != flod_mtime) { if (flod_mtime != 0) { dcc_trace_msg("%s has changed", flod_path); flod_mtime = 0; } flods_st = FLODS_ST_RESTART; } } if (flods_st != FLODS_ST_ON) { oflods_stop(0); iflods_stop(0, 0); /* wait until the previous floods have stopped to restart */ if (!flods_off && !db_minimum_map) { if (oflods.active != 0 || iflods.active != 0 || !flod_names_resolve_ck()) { flods_st = FLODS_ST_RESTART; /* check again soon but not immediately */ RUSH_NEXT_FLODS_CK(); } else { if (oflods_load()) flods_st = FLODS_ST_ON; } } } /* that is all we can do if flooding is off */ if (flods_st != FLODS_ST_ON || flods_off || db_minimum_map) { oflods_ck(); return; } iflods_start(); /* generate delayed summaries */ if (summarize_delay_secs == 0) { /* must summarize before reports are compressed */ summarize_delay_secs = MAX_SUMMARIZE_DELAY_SECS; for (i = 0; i < DIM(db_parms.ex_secs); ++i) { j = db_parms.ex_secs[i].all; if (j != 0 && j < summarize_delay_secs) j = summarize_delay_secs; } } dcc_timeval2ts(past, &db_time, -summarize_delay_secs); dcc_timeval2ts(present, &db_time, 0); if (flod_mmaps) { if (flod_mmaps->delay_pos > db_csize || flod_mmaps->delay_pos < DB_PTR_BASE) flod_mmaps->delay_pos 
= DB_PTR_BASE; work = 0; while (flod_mmaps->delay_pos < db_csize) { if (!db_map_rcd(0, &db_sts.sumrcd, flod_mmaps->delay_pos, &rcd_len)) { flod_mmaps->delay_pos = db_csize; break; } if (DB_RCD_DELAY(db_sts.sumrcd.d.r)) { /* wait until it is time */ if (DCC_TS_NEWER_TS(db_sts.sumrcd.d.r->ts, past) && !DCC_TS_NEWER_TS(db_sts.sumrcd.d.r->ts, present)) break; if (!summarize_dly()) { flod_mmaps->delay_pos = db_csize; break; } } flod_mmaps->delay_pos += rcd_len; if (++work >= 100) { /* spend at most 0.1 second at this * and then let other processes run*/ if (too_busy) break; gettimeofday(&db_time, 0); if (tv_diff2us(&db_time,&wake_time)>DCC_US/10) { too_busy = 1; break; } work = 0; } } /* prime the outgoing pumps */ for (ofp = oflods.infos; ofp <= LAST(oflods.infos); ++ofp) { if (ofp->rem_hostname[0] == '\0') break; /* force error messages soon but not now to give * new connetions a chance to be made */ if (force) new_peer(ofp); if (ofp->s >= 0) { if (ofp->flags & OFLOD_FG_NEW) ofp->flags &= ~OFLOD_FG_NEW; else oflod_fill(ofp); } else { oflod_open(ofp); } if ((ofp->o_opts.flags & FLOD_OPT_SOCKS) && !(ofp->i_opts.flags & FLOD_OPT_OFF)) { iflod_socks_start(ofp); } } } mapped = 0; if (flod_mmaps) { map_delayed = 0; } else if (!force && ++map_delayed > 10) { oflods_load(); mapped = 1; map_delayed = 0; } for (ofp = oflods.infos; ofp <= LAST(oflods.infos); ++ofp) { if (ofp->rem_hostname[0] == '\0') break; if (!ofp->mp) continue; if (ofp->s < 0 && !(ofp->o_opts.flags & FLOD_OPT_OFF)) { rpt_err(&ofp->mp->o_err, 0, FLOD_ERR_SAME, 0, "no outgoing flood connection to" " %s, server-ID %d", ofp->rem_hostname, ofp->rem_id); } if (!ofp->ifp && !(ofp->i_opts.flags & FLOD_OPT_OFF)) { rpt_err(&ofp->mp->i_err, 0, FLOD_ERR_SAME, 0, "no incoming flood connection from" " %s, server-ID %d", ofp->rem_hostname, ofp->rem_id); } } if (mapped) oflods_unmap(); oflods_ck(); /* try to reap the hostname resolving child */ flod_names_resolve_ck(); } void flods_init(void) { IFLOD_INFO *ifp; for (ifp = 
iflods.infos; ifp <= LAST(iflods.infos); ++ifp) ifp->s = -1; oflods_clear(); flods_restart(0); }