# Source code for fastsnmp.snmp_poller (extracted from generated docs)

#!/usr/bin/python3
# -*- coding: utf-8 -*-
import sys

# Guard: this module only works under Python 3.
if sys.version_info[0] < 3:
    sys.stderr.write("This program work only with python3. Sorry.")
    sys.exit(1)
import select

# Prefer epoll (Linux) and fall back to generic poll(); the imports are
# aliased so the rest of the module can use a single
# poll/POLLIN/POLLOUT/POLLERR API regardless of platform.
if hasattr(select, 'epoll'):
    from select import epoll as poll
    from select import EPOLLOUT as POLLOUT
    from select import EPOLLIN as POLLIN
    from select import EPOLLERR as POLLERR
elif hasattr(select, 'poll'):
    from select import poll
    from select import POLLOUT
    from select import POLLIN
    from select import POLLERR
else:
    # Neither epoll nor poll is available on this platform.
    print("The current platform does not support epoll", file=sys.stderr)
    sys.exit(1)

import logging
import socket
import queue
import collections
from fastsnmp import snmp_parser
from time import time
import random
from itertools import cycle

try:
    import mass_resolver
except ImportError:
    mass_resolver = None

# Set to True to enable verbose per-packet debug logging in poller().
DEBUG = False
logger = logging.getLogger('fastsnmp.snmp_poller')
# Upper bound on the number of UDP sockets opened by poller().
MAX_SOCKETS_COUNT = 100


def resolve(hosts):
    """Resolve hostnames to IPv4 addresses.

    Uses the optional ``mass_resolver`` module when it was importable
    (bulk resolution); otherwise falls back to one blocking
    ``socket.gethostbyname`` call per host.

    :param hosts: hostnames to resolve
    :type hosts: iterable
    :return: mapping of hostname -> list of IP addresses; a host that
        failed to resolve maps to an empty list
    :rtype: dict
    """
    if mass_resolver:
        res = mass_resolver.resolve(hosts)
    else:
        # slow way: sequential per-host lookups
        res = dict()
        for host in hosts:
            # Create the entry before the lookup so unresolvable hosts
            # are still present (with an empty list) in the result.
            host_ips = res.setdefault(host, list())
            try:
                host_ip = socket.gethostbyname(host)
            except socket.gaierror:
                # Lazy %-args instead of eager string interpolation.
                logger.error("unable to resolve %s. skipping this host", host)
                continue
            host_ips.append(host_ip)
    return res
def poller(hosts, oids_groups, community, timeout=3, backoff=2, retry=2, msg_type="GetBulk"):
    """
    A generator that yields SNMP data

    :param hosts: hosts
    :param oids_groups: oids_groups
    :param community: community
    :param timeout: seconds before an unanswered query is considered timed out
    :param backoff: timeout multiplier applied on retries
    :param retry: maximum number of resends per query
    :param msg_type: SNMP request type, "GetBulk" walks tables; anything
        else is treated as a plain Get
    :type hosts: list | tuple
    :type oids_groups: list | tuple
    :type community: str
    :return: host, main_oid, index_part, value
    :rtype: tuple
    """
    job_queue = queue.Queue()
    socksize = 0x2000000
    # query -> number of resend attempts
    retried_req = collections.defaultdict(int)
    # message cache: (reqid, oids) -> encoded SNMP packet
    reqid_to_msg = {}
    # (ip, reqid) -> send timestamp, for timeout tracking
    pending_query = {}
    # ip => fqdn
    target_info = {}
    # fqdn => ips
    target_info_r = resolve(hosts)
    # ip -> UserDict(reqid -> (oids_to_poll, main_oids)); the extra
    # ``by_oids`` attribute is the reverse index (oid group -> reqid).
    varbinds_cache = {}
    for fqdn, ips in list(target_info_r.items()):
        if ips:
            target_info[ips[0]] = fqdn
            varbinds_cache[ips[0]] = collections.UserDict()
            varbinds_cache[ips[0]].by_oids = {}
        else:
            logger.error("unable to resolve %s. skipping this host", fqdn)
            del target_info_r[fqdn]

    # preparation of targets: every oids group gets its own reqid range,
    # shared across hosts (reqids only need to be unique per host).
    start_reqid = random.randint(1, 999) * 10000
    for oids_group in oids_groups:
        if isinstance(oids_group, list):
            oids_group = tuple(oids_group)
        target_oid_group = (oids_group, oids_group)
        for fqdn, ips in target_info_r.items():
            varbinds_cache[ips[0]][start_reqid] = target_oid_group
            varbinds_cache[ips[0]].by_oids[target_oid_group] = start_reqid
        start_reqid += 10000

    # add initial jobs
    for ip, poll_data in varbinds_cache.items():
        for reqid in poll_data:
            job_queue.put((ip, reqid))

    # preparation of sockets
    socket_map = {}
    epoll = poll()
    socket_count = min(MAX_SOCKETS_COUNT, len(target_info_r))
    for _ in range(socket_count):
        new_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        new_sock.bind(('0.0.0.0', 0))
        socket_map[new_sock.fileno()] = new_sock
        epoll.register(new_sock, POLLOUT)

    # main loop
    while True:
        try:
            events = epoll.poll(0.1)
            for fileno, event in events:
                if event & POLLOUT:
                    fdfmt = POLLIN
                    if not job_queue.empty():
                        host, pdudata_reqid = job_queue.get()
                        oids_to_poll, main_oids = varbinds_cache[host][pdudata_reqid]
                        q = (pdudata_reqid, oids_to_poll)
                        if q in reqid_to_msg:
                            message = reqid_to_msg[q]
                        else:
                            message = snmp_parser.msg_encode(pdudata_reqid, community, oids_to_poll,
                                                             max_repetitions=20, msg_type=msg_type)
                            reqid_to_msg[q] = message
                        socket_map[fileno].sendto(message, (host, 161))
                        pending_query[(host, pdudata_reqid)] = int(time())
                        if DEBUG:
                            logger.debug('sendto %s reqid=%s get oids=%s', host, pdudata_reqid, oids_to_poll)
                        job_queue.task_done()
                        if not job_queue.empty():
                            # more work queued: keep the socket writable
                            fdfmt = fdfmt | POLLOUT
                    epoll.modify(fileno, fdfmt)
                elif event & POLLIN:
                    data, remotehost = socket_map[fileno].recvfrom(socksize)
                    host_ip = remotehost[0]
                    try:
                        pdudata_reqid, error_status, error_index, var_bind_list = snmp_parser.msg_decode(data)
                    except Exception as e:
                        # FIX: was %r combined with repr(data), which double-escaped
                        # the payload in the log message.
                        logger.critical("%s. unable to decode PDU from %s. data=%r", e, host_ip, data)
                        continue
                    if pending_query.pop((host_ip, pdudata_reqid), None) is None:
                        if DEBUG:
                            logger.debug("received answer after timeout from %s reqid=%s", host_ip, pdudata_reqid)
                        continue
                    if error_status:
                        logger.error('%s get error_status %s at %s. query=%s',
                                     target_info[host_ip], error_status, error_index,
                                     varbinds_cache[host_ip][pdudata_reqid][0])
                        continue
                    if DEBUG:
                        logger.debug('%s recv reqid=%s' % (host_ip, pdudata_reqid))
                    if pdudata_reqid not in varbinds_cache[host_ip]:
                        if DEBUG:
                            logger.debug('received unknown reqid=%s for host=%s. skipping', pdudata_reqid, host_ip)
                        continue
                    oids_to_poll, main_oids = varbinds_cache[host_ip][pdudata_reqid]
                    main_oids_len = len(main_oids)
                    main_oids_positions = cycle(range(main_oids_len))
                    var_bind_list_len = len(var_bind_list)
                    skip_column = {}
                    # if some oid in requested oids is not supported, column with
                    # its index will be filled with another oid. need to skip
                    last_seen_index = {}
                    for var_bind_pos in range(var_bind_list_len):
                        oid, value = var_bind_list[var_bind_pos]
                        # oids in received var_bind_list in round-robin order respectively query
                        main_oids_pos = next(main_oids_positions)
                        if value is None:
                            if DEBUG:
                                logger.debug('found none value %s %s %s' % (host_ip, oid, value))
                            skip_column[main_oids_pos] = True
                        if main_oids_pos in skip_column:
                            continue
                        main_oid = main_oids[main_oids_pos]
                        if msg_type == "GetBulk":
                            if oid.startswith(main_oid + '.'):
                                index_part = oid[len(main_oid) + 1:]
                                last_seen_index[main_oids_pos] = index_part
                                yield (target_info[host_ip], main_oid, index_part, value)
                            else:
                                # walked past the end of this column
                                if DEBUG:
                                    logger.debug('host_ip=%s column_pos=%s skip oid %s=%s, reqid=%s. Not found in %s'
                                                 % (host_ip, main_oids_pos, oid, value, pdudata_reqid, main_oids))
                                    logger.debug('vp=%s oid=%s main_oid=%s main_oids_pos=%s main_oids=%s',
                                                 var_bind_pos, oid, main_oid, main_oids_pos, main_oids)
                                skip_column[main_oids_pos] = True
                                if len(skip_column) == var_bind_list_len:
                                    break
                        else:
                            # plain Get: a single value per OID, no walking
                            yield (target_info[host_ip], main_oid, "", value)
                            skip_column[main_oids_pos] = True
                    if len(skip_column) < main_oids_len:
                        # some columns still have rows: build the follow-up query
                        if len(skip_column):
                            oids_to_poll = list()
                            new_main_oids = list()
                            for pos in range(main_oids_len):
                                if pos in skip_column:
                                    continue
                                oids_to_poll.append("%s.%s" % (main_oids[pos], last_seen_index[pos]))
                                new_main_oids.append(main_oids[pos])
                            oids_to_poll = tuple(oids_to_poll)
                            new_main_oids = tuple(new_main_oids)
                        else:
                            oids_to_poll = tuple("%s.%s" % (main_oids[p], last_seen_index[p])
                                                 for p in range(main_oids_len))
                            new_main_oids = main_oids
                        oid_group = (oids_to_poll, new_main_oids)
                        # FIX: look up the reverse index; checking the reqid-keyed
                        # dict itself could never match a (oids, main_oids) tuple,
                        # so every response allocated a fresh reqid.
                        if oid_group in varbinds_cache[host_ip].by_oids:
                            next_reqid = varbinds_cache[host_ip].by_oids[oid_group]
                        else:
                            next_reqid = pdudata_reqid + 10
                            varbinds_cache[host_ip][next_reqid] = oid_group
                            varbinds_cache[host_ip].by_oids[oid_group] = next_reqid
                        job_queue.put((host_ip, next_reqid))
                    else:
                        if DEBUG:
                            logger.debug('found not interested in oid=%s value=%s host=%s reqid=%s' % (
                                oid, value, host_ip, pdudata_reqid))
                    epoll.modify(fileno, POLLOUT | POLLIN)
                elif event & POLLERR:
                    logger.critical('socket error')
                    raise Exception('epoll error')
            # all work sent and answered: stop the generator
            if not events and job_queue.empty() and not pending_query:
                break
            if pending_query:
                # check timeouts
                cur_time = int(time())
                timeouted_querys = []
                for query, query_time in pending_query.items():
                    attempt = retried_req.get(query, 1)
                    if attempt == 1:
                        query_timeout = attempt * timeout
                    else:
                        # exponential-ish backoff on retries
                        query_timeout = attempt * backoff * timeout
                    if cur_time - query_time > query_timeout:
                        timeouted_querys.append(query)
                        if DEBUG:
                            logger.warning('timeout %s > %s. attempt=%s, %s',
                                           cur_time - query_time, query_timeout, attempt, query)
                for timeouted_query in timeouted_querys:
                    if timeouted_query not in retried_req or retried_req[timeouted_query] < retry:
                        if DEBUG:
                            logger.debug('resend %s', timeouted_query)
                        job_queue.put(timeouted_query)
                        retried_req[timeouted_query] += 1
                    else:
                        logger.warning("%s ip=%s query timeout for OID's: %s",
                                       target_info[timeouted_query[0]], timeouted_query[0],
                                       varbinds_cache[timeouted_query[0]][timeouted_query[1]][0])
                    del pending_query[timeouted_query]
                if not job_queue.empty():
                    # re-arm enough sockets for writing to drain the queue
                    sockets_write_count = min(job_queue.qsize(), len(socket_map))
                    for sock in list(socket_map.values())[0:sockets_write_count]:
                        epoll.modify(sock, POLLOUT | POLLIN)
        except InterruptedError:
            # signal in syscall. suppressed by default on python 3.5
            pass