/* ** Copyright (C) 2004-2011 by Carnegie Mellon University. ** ** @OPENSOURCE_HEADER_START@ ** ** Use of the SILK system and related source code is subject to the terms ** of the following licenses: ** ** GNU Public License (GPL) Rights pursuant to Version 2, June 1991 ** Government Purpose License Rights (GPLR) pursuant to DFARS 252.227.7013 ** ** NO WARRANTY ** ** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER ** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY ** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN ** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY ** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT ** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE, ** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE ** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT, ** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY ** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF ** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES. ** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF ** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON ** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE ** DELIVERABLES UNDER THIS LICENSE. 
** ** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie ** Mellon University, its trustees, officers, employees, and agents from ** all claims or demands made against them (and any related losses, ** expenses, or attorney's fees) arising out of, or relating to Licensee's ** and/or its sub licensees' negligent use or willful misuse of or ** negligent conduct or willful misconduct regarding the Software, ** facilities, or other rights or assistance granted by Carnegie Mellon ** University under this License, including, but not limited to, any ** claims of product liability, personal injury, death, damage to ** property, or violation of any laws or regulations. ** ** Carnegie Mellon University Software Engineering Institute authored ** documents are sponsored by the U.S. Department of Defense under ** Contract F19628-00-C-0003. Carnegie Mellon University retains ** copyrights in all material produced under this contract. The U.S. ** Government retains a non-exclusive, royalty-free license to publish or ** reproduce these documents, or allow others to do so, for U.S. ** Government purposes only pursuant to the copyright license under the ** contract clause at 252.227.7013. ** ** @OPENSOURCE_HEADER_END@ */ /* ** Interface to pull a single flow from a NetFlow v5 PDU ** */ #include RCSIDENT("$Id: pdusource.c 16619 2011-01-11 21:07:54Z mthomas $"); #include #include "v5pdu.h" #include "udpsource.h" #include #include /* Number of milliseconds the calculated router boot time for a PDU * packet must be beyond the last packet in order to consider the * router to have rebooted. */ #define ROUTER_BOOT_FUZZ 1e3 /* Encapsulate timing information from a NetFlow v5 PDU header. 
*/ typedef struct cflowdTimeInfo_st { /* router boot time as milliseconds since the UNIX epoch */ intmax_t router_boot; /* milliseconds since the router booted */ intmax_t sysUptime; } cflowdTimeInfo_t; typedef struct pdu_source_st { flowsource_stats_t statistics; pthread_mutex_t stats_mutex; udpSource_t *source; cflowdTimeInfo_t ti; intmax_t last_router_boot; uint8_t count; /* Number of recs left in pdu */ v5PDU *pdu; /* Current pdu */ uint32_t flowSeqNumbers[0xffff]; BITMAP_DECLARE( engineMasks, 0xffff); uint8_t logopt; unsigned file : 1; /* File-based source */ unsigned stopped : 1; } pdu_source_t; /* typedef struct pdu_source_st *pduSource_t; // libflowsource.h */ #define COUNT_BAD_PACKET(source, pdu) \ { \ pthread_mutex_lock(&((source)->stats_mutex)); \ (source)->statistics.badPkts++; \ pthread_mutex_unlock(&((source)->stats_mutex)); \ } #define COUNT_BAD_RECORD(source, pdu) \ { \ pthread_mutex_lock(&((source)->stats_mutex)); \ (source)->statistics.badRecs++; \ pthread_mutex_unlock(&((source)->stats_mutex)); \ } /* * TIME VALUES IN THE NETFLOW V5 PDU * * The naive ordering of events with respect to time in the router * would be to collect the flows and generate the PDU. Thus, one * would expect: * * flow.Start < flow.End < hdr.sysUptime * * where all values are given as milliseconds since the router's * interface was booted, and hdr.sysUptime is advertised as the * "current" time. * * However, since values are given as 32bit numbers, the values will * roll-over after about 49.7 days. If the values roll-over in the * middle of writing the PDU, we will see one of these two * conditions: * * hdr.sysUptime << flow.Start < flow.End * * flow.End < hdr.sysUptime << flow.Start * * Thus, if flow.End less than flow.Start, we need to account for the * roll-over when computing the flow's duration. 
* * In practice, the PDU's header gets filled in before flows are * added, making the hdr.sysUptime not have any true time ordering * with respect to the flow.Start and flow.End, and we have seen * cases in real NetFlow data where hdr.sysUptime is slightly less * than flow.End: * * flow.Start < hdr.sysUptime < flow.End * * Moreover, some naive NetFlow PDU generators simply pin the * hdr.sysUptime to zero, and don't account for rollover at all. * This can make hdr.sysUptime much less than flow.Start. * * In order to make the determination whether the flow.Start or * hdr.sysUptime values have overflown their values and rolled-over, * we look at the difference between them. If the absolute value of * the difference is greater than some very large maximum defined in * maximumFlowTimeDeviation (currently 45 days), we assume that one * of the two has rolled over, and adjust based on that assumption. */ static const intmax_t maximumFlowTimeDeviation = (intmax_t)45 * 24 * 3600 * 1000; /* 45 days */ static const intmax_t maximumSequenceDeviation = (intmax_t)3600 * 1000; /* 1 hour of flows at 1k flows/sec */ #define ROLLOVER32 ((intmax_t)UINT32_MAX + 1) /* FUNCTION PROTOTYPES */ static void cflowdTimeInfoSetup( const v5Header *hdr, cflowdTimeInfo_t *ti); /* FUNCTION DEFINITIONS */ pduSource_t pduSourceCreate( const sk_sockaddr_array_t *from_address, const sk_sockaddr_array_t *listen_address, uint32_t max_pkts) { int rv; udpSource_t *udpsource; pduSource_t source; rv = udpSourceNetworkCreate(&udpsource, from_address, listen_address, V5PDU_LEN, max_pkts); if (rv != 0) { return NULL; } source = (pduSource_t)calloc(1, sizeof(*source)); if (source == NULL) { udpSourceDestroy(udpsource); return NULL; } source->source = udpsource; pthread_mutex_init(&source->stats_mutex, NULL); source->logopt = SOURCE_LOG_ALL; return source; } pduSource_t pduFileSourceCreate( const char *path) { udpSource_t *udpsource; pduSource_t source; int rv; rv = udpFileSourceCreate(&udpsource, path, 
V5PDU_LEN); if (rv != 0) { return NULL; } source = (pduSource_t)calloc(1, sizeof(*source)); if (source == NULL) { udpSourceDestroy(udpsource); return NULL; } source->file = 1; source->source = udpsource; return source; } void pduSourceStop(pduSource_t source) { source->stopped = 1; udpSourceStop(source->source); } void pduSourceDestroy(pduSource_t source) { if (!source->stopped) { pduSourceStop(source); } udpSourceDestroy(source->source); pthread_mutex_destroy(&source->stats_mutex); free(source); } /* * cflowdTimeInfoSetup(hdr, ti); * * Given the NetFlow v5 header 'hdr', initialize the time info * structure 'ti' with the boot time of the router. */ static void cflowdTimeInfoSetup( const v5Header *hdr, cflowdTimeInfo_t *ti) { intmax_t now; /* get the sysUptime, which is the current time in milliseconds * since the export device booted */ ti->sysUptime = ntohl(hdr->SysUptime); /* use the PDU header to get the "current" time as milliseconds * since the UNIX epoch; round nanoseconds by adding 5e5 before * dividing. */ now = ((intmax_t)1000 * ntohl(hdr->unix_secs) + ((ntohl(hdr->unix_nsecs) + 5e5L) / 1e6L)); /* subtract sysUpTime from current-time to get router boot time as * milliseconds since UNIX epoch */ ti->router_boot = now - ti->sysUptime; } static v5PDU *pduNext(pduSource_t source) { /* keep this array and the enum in sync. we use these and the * macro to count the number of invalid PDUs we saw consecutively, * and we print a log message for the bad PDUs as a group. 
*/ static const char *err_msgs[] = { "No Error", "not marked as version 5", "reporting more than " V5PDU_MAX_RECS_STR " records", "reporting zero records" }; enum err_status_en { PDU_OK = 0, PDU_BAD_VERSION, PDU_ZERO_RECORDS, PDU_OVERFLOW_RECORDS } err_status = PDU_OK; uint32_t err_count = 0; #define LOG_BAD_PDUS(new_err_status) \ if (new_err_status == err_status) { \ ++err_count; \ } else { \ if (err_count) { \ assert(PDU_OK != err_status); \ NOTICEMSG(("Rejected %" PRIu32 " PDU record%s %s"), \ err_count, ((err_count == 1) ? "" : "s"), \ err_msgs[err_status]); \ } \ err_count = 1; \ err_status = new_err_status; \ } assert (source != NULL); /* Infloop; exit by return only */ for (;;) { v5PDU *pdu; uint16_t count; uint16_t engine; uint32_t flow_sequence; pdu = (v5PDU *)udpNext(source->source); if (pdu == NULL) { /* if we saw any bad PDUs, print message before returning */ LOG_BAD_PDUS(PDU_OK); return NULL; } pthread_mutex_lock(&source->stats_mutex); source->statistics.procPkts++; pthread_mutex_unlock(&source->stats_mutex); if (ntohs(pdu->hdr.version) != 5) { /* reject packet */ LOG_BAD_PDUS(PDU_BAD_VERSION); COUNT_BAD_PACKET(source, pdu); continue; } count = ntohs(pdu->hdr.count); if (count > V5PDU_MAX_RECS) { /* reject packet */ LOG_BAD_PDUS(PDU_OVERFLOW_RECORDS); COUNT_BAD_PACKET(source, pdu); continue; } if (count == 0) { /* reject packet */ LOG_BAD_PDUS(PDU_ZERO_RECORDS); COUNT_BAD_PACKET(source, pdu); continue; } /* at this point we are guaranteed to return from this * function. 
Use the PDU_OK "error" to log about any bad PDUs * we saw previously */ LOG_BAD_PDUS(PDU_OK); /* Calculate the offset from which times in the records are * calculated */ cflowdTimeInfoSetup(&pdu->hdr, &source->ti); /* Handle router reboots by resetting the engine masks */ if (((source->ti.router_boot > source->last_router_boot) && ((source->ti.router_boot - source->last_router_boot) > ROUTER_BOOT_FUZZ)) || ((source->last_router_boot - source->ti.router_boot) > ROUTER_BOOT_FUZZ)) { DEBUGMSG("Router reboot noticed"); BITMAP_INIT(source->engineMasks); } source->last_router_boot = source->ti.router_boot; /* handle seq numbers here */ flow_sequence = ntohl(pdu->hdr.flow_sequence); engine = (pdu->hdr.engine_type << 8) | pdu->hdr.engine_id; if (!BITMAP_GETBIT(source->engineMasks, engine)) { /* A new engine. Mark and record */ BITMAP_SETBIT(source->engineMasks, engine); source->flowSeqNumbers[engine] = flow_sequence + count; } else if (flow_sequence == source->flowSeqNumbers[engine]) { /* This packet is in sequence. Update the next expected * seq */ source->flowSeqNumbers[engine] = flow_sequence + count; } else { intmax_t expected; intmax_t actual; pthread_mutex_lock(&source->stats_mutex); actual = flow_sequence; expected = source->flowSeqNumbers[engine]; if (actual > expected) { if ((actual - expected) > maximumSequenceDeviation) { expected += ROLLOVER32; } } else if ((expected - actual) > maximumSequenceDeviation) { actual += ROLLOVER32; } if (actual < expected) { /* ** Out of sequence packet. Reduce missing flow count. ** However, do not change the expected seq num. 
*/ source->statistics.missingRecs -= count; } else { /* Increase missing flow count */ source->statistics.missingRecs += actual - expected; if (source->logopt & SOURCE_LOG_MISSING) { uint64_t allrecs = source->statistics.goodRecs + source->statistics.badRecs + source->statistics.missingRecs; INFOMSG(("Missing netflow records: " "%" PRId64 "/%" PRIu64 " == %7.4g%%"), source->statistics.missingRecs, allrecs, ((float)source->statistics.missingRecs / (float)allrecs * 100.0)); } /* Update the next expected seq */ source->flowSeqNumbers[engine] = flow_sequence + count; } pthread_mutex_unlock(&source->stats_mutex); } return pdu; } } static v5Record *pduSourceGetNextRec( pduSource_t source) { assert (source != NULL); /* Infloop; exit by return only */ for (;;) { v5Record *v5RPtr; intmax_t difference; if (source->stopped) { return NULL; } /* If we need a pdu, get a new one, otherwise we are not finished with the last. */ if (source->count == 0) { source->pdu = pduNext(source); if (source->pdu == NULL) { return NULL; } source->count = ntohs(source->pdu->hdr.count); } /* Get next record, and decrement counter*/ v5RPtr = &source->pdu->data[ntohs(source->pdu->hdr.count) - source->count--]; /* Check for zero packets or bytes. No need for byteswapping when checking zero. */ if (v5RPtr->dPkts == 0 || v5RPtr->dOctets == 0) { if (source->logopt & SOURCE_LOG_BAD) { NOTICEMSG("Netflow record has zero packets or bytes."); } COUNT_BAD_RECORD(source, source->pdu); continue; } /* Check to see if more packets them bytes. */ if (ntohl(v5RPtr->dPkts) > ntohl(v5RPtr->dOctets)) { if (source->logopt & SOURCE_LOG_BAD) { NOTICEMSG("Netflow record has more packets them bytes."); } COUNT_BAD_RECORD(source, source->pdu); continue; } /* Check to see if the First and Last timestamps for the flow * record are reasonable, accounting for rollover. If the * absolute value of the difference is greater than * maximumFlowTimeDeviation, we assume it has rolled over. 
*/ difference = (intmax_t)ntohl(v5RPtr->Last) - ntohl(v5RPtr->First); if ((difference > maximumFlowTimeDeviation) || ((difference < 0) && (difference > (-maximumFlowTimeDeviation)))) { if (source->logopt & SOURCE_LOG_BAD) { NOTICEMSG(("Netflow record has earlier end time" " than start time.")); } COUNT_BAD_RECORD(source, source->pdu); continue; } /* Check for bogosities in how the ICMP type/code are set. It should be in dest port, but sometimes it is backwards in src port. */ if (v5RPtr->prot == 1 && /* ICMP */ v5RPtr->dstport == 0) /* No byteswapping for check against 0 */ { uint32_t *ports = (uint32_t *)&v5RPtr->srcport; *ports = BSWAP32(*ports); /* This will swap src into dest, while byteswapping. */ } pthread_mutex_lock(&source->stats_mutex); source->statistics.goodRecs++; pthread_mutex_unlock(&source->stats_mutex); return v5RPtr; } } int pduSourceGetGeneric( pduSource_t source, rwRec *rwrec) { const v5Record *v5RPtr; intmax_t v5_first, v5_last; intmax_t sTime; intmax_t difference; v5RPtr = pduSourceGetNextRec(source); if (v5RPtr == NULL) { return -1; } /* Setup start and duration */ v5_first = ntohl(v5RPtr->First); v5_last = ntohl(v5RPtr->Last); if (v5_first > v5_last) { /* End has rolled over, while start has not. Adjust end by * 2^32 msecs in order to allow us to subtract start from end * and get a correct value for the duration. */ v5_last += ROLLOVER32; } /* v5_first is milliseconds since the router booted. To get UNIX * epoch milliseconds, add the router's boot time. */ sTime = v5_first + source->ti.router_boot; /* Check to see if the difference between the 32bit start time and * the sysUptime is overly large. If it is, one of the two has * more than likely rolled over. We need to adjust based on * this. 
*/ difference = source->ti.sysUptime - v5_first; if (difference > maximumFlowTimeDeviation) { /* sTime rollover */ sTime += ROLLOVER32; } else if (difference < (-maximumFlowTimeDeviation)) { /* sysUptime rollover */ sTime -= ROLLOVER32; } RWREC_CLEAR(rwrec); /* Convert NetFlow v5 to SiLK */ rwRecSetSIPv4(rwrec, ntohl(v5RPtr->srcaddr)); rwRecSetDIPv4(rwrec, ntohl(v5RPtr->dstaddr)); rwRecSetSPort(rwrec, ntohs(v5RPtr->srcport)); rwRecSetDPort(rwrec, ntohs(v5RPtr->dstport)); rwRecSetProto(rwrec, v5RPtr->prot); rwRecSetFlags(rwrec, v5RPtr->tcp_flags); rwRecSetInput(rwrec, ntohs(v5RPtr->input)); rwRecSetOutput(rwrec, ntohs(v5RPtr->output)); rwRecSetNhIPv4(rwrec, ntohl(v5RPtr->nexthop)); rwRecSetStartTime(rwrec, (sktime_t)sTime); rwRecSetPkts(rwrec, ntohl(v5RPtr->dPkts)); rwRecSetBytes(rwrec, ntohl(v5RPtr->dOctets)); rwRecSetElapsed(rwrec, (uint32_t)(v5_last - v5_first)); rwRecSetRestFlags(rwrec, 0); rwRecSetTcpState(rwrec, SK_TCPSTATE_NO_INFO); #if SK_ENABLE_TOS_APPLICATION_HACK rwRecSetApplication(rwrec, v5RPtr->tos); #else rwRecSetApplication(rwrec, 0); #endif return 0; } pduSource_t pduSourceCreateFromProbeDef( const skpc_probe_t *probe, uint32_t max_pkts) { const sk_sockaddr_array_t *paddr; const sk_sockaddr_array_t *haddr; pduSource_t source; int rv; uint8_t flags; assert(probe); flags = skpcProbeGetLogFlags(probe); rv = skpcProbeGetListenOnSockaddr(probe, &paddr); if (rv == -1) { return NULL; } rv = skpcProbeGetAcceptFromHost(probe, &haddr); if (rv == -1) { haddr = NULL; } source = pduSourceCreate(haddr, paddr, max_pkts); if (source == NULL) { return NULL; } pduSourceSetLogopt(source, flags); return source; } /* Get statistics associated with a pdu source. 
*/
void
pduSourceGetStats(
    pduSource_t         source,
    flowStats_t         stats)
{
    pthread_mutex_t *lock = &source->stats_mutex;

    /* Take the copy under stats_mutex, the same mutex held by the
     * code that updates these counters. */
    pthread_mutex_lock(lock);
    *stats = source->statistics;
    pthread_mutex_unlock(lock);
}


/*
 * pduSourceClearStats(source);
 *
 * Reset every byte of the statistics in 'source' to zero while
 * holding its statistics mutex.
 */
void
pduSourceClearStats(
    pduSource_t         source)
{
    pthread_mutex_t *lock = &source->stats_mutex;

    pthread_mutex_lock(lock);
    memset(&source->statistics, 0, sizeof source->statistics);
    pthread_mutex_unlock(lock);
}


/*
 * pduSourceSetLogopt(source, opt);
 *
 * Replace the logging options of 'source' with 'opt'.
 */
void
pduSourceSetLogopt(
    pduSource_t         source,
    uint8_t             opt)
{
    source->logopt = opt;
}


/*
** Local Variables:
** mode:c
** indent-tabs-mode:nil
** c-basic-offset:4
** End:
*/