#! /usr/bin/env python3 # # check_temperature - Nagios temperature check for RaspberryPi-connected sensors # # Version 1.3 latest version, documentation and bugtracker available at: # https://gitlab.lindenaar.net/scripts/nagios-plugins # # Copyright (c) 2017 - 2024 Frederik Lindenaar # # This script is free software: you can redistribute and/or modify it under the # terms of version 3 of the GNU General Public License as published by the Free # Software Foundation, or (at your option) any later version of the license. # # This script is distributed in the hope that it will be useful but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. See the GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along with # this program. If not, visit <http://www.gnu.org/licenses/> to download it. from sys import exit from os.path import basename, splitext from glob import glob from time import time, sleep from argparse import ArgumentParser as StandardArgumentParser, FileType, \ _StoreAction as StoreAction, _StoreConstAction as StoreConstAction import logging # Constants (no need to change but allows for easy customization) VERSION="1.3" PROG_NAME=splitext(basename(__file__))[0] PROG_VERSION=PROG_NAME + ' ' + VERSION CPU_SENSOR_DEV = '/sys/class/thermal/thermal_zone0/temp' CPU_SENSOR_SCALE=1000 I2C_BMX280_DEFAULT_ADDR=0x77 I2C_BMX280_CALIBRATE_ADDR=0x88 I2C_BMX280_CALIBRATE_LEN=24 I2C_BMX280_CAL_HUM0_ADDR=0xA1 I2C_BMX280_CAL_HUM_ADDR=0xE1 I2C_BMX280_CAL_HUM_LEN=7 I2C_BMX280_CONFIG_ADDR=0xF5 I2C_BMX280_CONFIG = 0xA0 # Stand_by time = 1000 ms I2C_BMX280_CTRL_MEAS_ADDR=0xF4 I2C_BMX280_CTRL_MEAS=0x27 # Normal mode, Pressure # and Temperature Oversampling rate = 1 I2C_BMX280_CTRL_HUM_ADDR=0xF2 I2C_BMX280_CTRL_HUM=1 # Humidity Oversampling rate = 1 I2C_BMX280_MEAS_ADDR=0xF7 I2C_BMX280_MEAS_LEN=8 I2C_BMX280_SENSOR_SCALE=5120.0 I2C_MCP9808_DEFAULT_ADDR=0x18 I2C_MCP9808_CONFIG_ADDR=0x1 I2C_MCP9808_CONFIG = [ 0x00, 0x00 ] # continuous conversion (power-up default) I2C_MCP9808_PRECISION_ADDR=0x08 I2C_MCP9808_PRECISION=3 # 0=0.5, 1=0.25, 2=0.125, 3=0.0625 degr. C I2C_MCP9808_TEMP_ADDR=0x05 I2C_MCP9808_SENSOR_SCALE=16 W1_SENSOR_DEV_DIR = '/sys/bus/w1/devices/' W1_SENSOR_DEV_PREFIX = '28-' W1_SENSOR_DEV_SUFFIX = '/w1_slave' W1_SENSOR_READ_RETRIES=10 W1_SENSOR_SCALE=1000 LOG_FORMAT='%(levelname)s - %(message)s' LOG_FORMAT_FILE='%(asctime)s - ' + LOG_FORMAT LOGGING_NONE=logging.CRITICAL + 10 NAGIOS_OK = ( 'OK', 0) NAGIOS_WARN = ( 'WARNING', 1) NAGIOS_CRITICAL = ( 'CRITICAL', 2 ) NAGIOS_UNKNOWN = ( 'UNKNOWN', 3 ) # Setup logging logging.basicConfig(format=LOG_FORMAT) logging.addLevelName(LOGGING_NONE, 'NONE') logger = logging.getLogger(PROG_NAME) logger.setLevel(logging.CRITICAL) ################[ wrapper to stop ArgumentParser from exiting ]################ # based on http://stackoverflow.com/questions/14728376/i-want-python-argparse-to-throw-an-exception-rather-than-usage/14728477#14728477 # the only way to do this is overriding the error method and throw and Exception class ArgumentParserError(Exception): pass class ArgumentParser(StandardArgumentParser): """ArgumentParser not exiting with non-Nagios format message upon errors""" def error(self, message): raise ArgumentParserError(message) ##################[ Action to immediately set the log level ]################## class SetLogLevel(StoreConstAction): """ArgumentParser action to set log level to provided const value""" def __call__(self, parser, namespace, values, option_string=None): logging.getLogger(PROG_NAME).setLevel(self.const) ####################[ Action to immediately log to a file ]#################### class SetLogFile(StoreAction): """ArgumentParser action to log to file (sets up FileHandler accordingly)""" def __call__(self, parser, namespace, values, option_string=None): super(SetLogFile, self).__call__(parser,namespace,values,option_string) formatter = logging.Formatter(LOG_FORMAT_FILE) handler = logging.FileHandler(values) handler.setFormatter(formatter) logger = logging.getLogger(PROG_NAME) logger.propagate = False logger.addHandler(handler) ############################################################################### def hex_int(string): """Use int()'s auto-detection to parse 10-base and 16-base (0x..) numbers""" return int(string, 0); def convert_celcius(temp_read, scale = 1): """Converts raw temperature sensore value to degrees Celcius""" return float(temp_read) / float(scale) CONVERT_CELCIUS = ( convert_celcius, 'C', 'Celcius' ) def convert_farenheit(temp_read, scale = 1): """Converts raw temperature sensore value to degrees Farenheit""" return float(temp_read * 9) / float(5 * scale) + 32.0 CONVERT_FARENHEIT = ( convert_farenheit, 'F', 'Farenheit' ) ####################[ Get CPU temperature of Raspberry Pi ]#################### def read_rpi_cpu_temp(args): """Reads CPU temperature ands returns it converted to desired unit""" with open(args.file, 'r') as f: lines = f.readlines() logger.debug('Temperature sensor data read from %s: %s', f.name, lines) temp_read = int(lines[0]) temp = args.converter[0](temp_read, CPU_SENSOR_SCALE) logger.debug('Temperature sensor value %d is %.2f%s', temp_read, temp, args.converter[1]) return temp, 1 ###############[ Get I2C (sm)bus object and I2C device address ]############### def i2c_get_smbus_devaddr(args): try: import smbus except ImportError: try: import smbus2 as smbus except ImportError: logger.critical("Unable to import either smbus or smbus2 library"); raise ImportError("missing I2C library, please install smbus2 " "or Debian python-smbus package "); try: return (smbus.SMBus(args.i2cbus), # get i2c bus args.default_address if args.address is None else args.address) except OSError as e: logger.critical(e) raise IOError("Invalid I2C bus: %d" % args.i2cbus) ####################[ Get I2C BME280/BMP280 Sensor values ]#################### # Inspired by https://github.com/ControlEverythingCommunity/BME280 def read_i2c_bmX280(args): """Returns temperature from I2C BME280/BMP280 sensor in desired unit""" i2c_bus, i2c_addr = i2c_get_smbus_devaddr(args) def convertLE16(data, offset, signed=False): result = (data[offset + 1] << 8) | data[offset] if signed and result > 32767: result -= 65536 return result try: # Get Temperature and Pressure Calibration Data data = i2c_bus.read_i2c_block_data(i2c_addr, I2C_BMX280_CALIBRATE_ADDR, I2C_BMX280_CALIBRATE_LEN) dig_T1 = convertLE16(data, 0) dig_T2 = convertLE16(data, 2, True) dig_T3 = convertLE16(data, 4, True) dig_P1 = convertLE16(data, 6) dig_P2 = convertLE16(data, 8, True) dig_P3 = convertLE16(data, 10, True) dig_P4 = convertLE16(data, 12, True) dig_P5 = convertLE16(data, 14, True) dig_P6 = convertLE16(data, 16, True) dig_P7 = convertLE16(data, 18, True) dig_P8 = convertLE16(data, 20, True) dig_P9 = convertLE16(data, 22, True) if args.read_humidity: # Get Humidity Calibration Data dig_H1 = i2c_bus.read_byte_data(i2c_addr,I2C_BMX280_CAL_HUM0_ADDR) data = i2c_bus.read_i2c_block_data(i2c_addr,I2C_BMX280_CAL_HUM_ADDR, I2C_BMX280_CAL_HUM_LEN) dig_H2 = convertLE16(data, 0, True) dig_H3 = data[2] dig_H4 = (data[3] << 4) | (data[4] & 0x0f) dig_H5 = (data[5] << 4) | (data[4] >> 4) dig_H6 = data[6] if dig_H6 > 127: dig_H6 -= 256 i2c_bus.write_byte_data(i2c_addr, I2C_BMX280_CTRL_HUM_ADDR, I2C_BMX280_CTRL_HUM) # Setup BMP280/BME280 configuration and wait 0.5 seconds for things to settle i2c_bus.write_byte_data(i2c_addr, I2C_BMX280_CTRL_MEAS_ADDR, I2C_BMX280_CTRL_MEAS) i2c_bus.write_byte_data(i2c_addr, I2C_BMX280_CONFIG_ADDR, I2C_BMX280_CONFIG) sleep(0.5) # Read sensor data and and convert using calibration data data = i2c_bus.read_i2c_block_data(i2c_addr, I2C_BMX280_MEAS_ADDR, I2C_BMX280_MEAS_LEN) # Convert 20-bits Temperature and calculate value with calibration data adc_t = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4) var1 = ((adc_t) / 16384.0 - (dig_T1) / 1024.0) * (dig_T2) var2 = (((adc_t) / 131072.0 - (dig_T1) / 8192.0) * ((adc_t)/131072.0 - (dig_T1)/8192.0)) * (dig_T3) t_fine = (var1 + var2) # Convert 20-bits Pressure and calculate value with calibration data adc_p = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4) var1 = (t_fine / 2.0) - 64000.0 var2 = var1 * var1 * (dig_P6) / 32768.0 var2 = var2 + var1 * (dig_P5) * 2.0 var2 = (var2 / 4.0) + ((dig_P4) * 65536.0) var1 = ((dig_P3) * var1 * var1 / 524288.0 + ( dig_P2) * var1) / 524288.0 var1 = (1.0 + var1 / 32768.0) * (dig_P1) p = 1048576.0 - adc_p p = (p - (var2 / 4096.0)) * 6250.0 / var1 var1 = (dig_P9) * p * p / 2147483648.0 var2 = p * (dig_P8) / 32768.0 pressure = (p + (var1 + var2 + (dig_P7)) / 16.0) / 100 extra_readings = [ ('pressure', pressure, 'hPa'), ] if args.read_humidity: # Convert 16-byte Humidity and calculate value with calibration data adc_h = (data[6] << 8) | data[7] var_H = ((t_fine) - 76800.0) var_H = (adc_h - (dig_H4 * 64.0 + dig_H5 / 16384.0 * var_H)) * (dig_H2 / 65536.0 * (1.0 + dig_H6 / 67108864.0 * var_H * (1.0 + dig_H3 / 67108864.0 * var_H))) humidity = var_H * (1.0 - dig_H1 * var_H / 524288.0) if humidity > 100.0 : humidity = 100.0 elif humidity < 0.0 : humidity = 0.0 extra_readings += ('humidity', humidity, '%', 0, 100), logger.debug('BME280 sensor data: temperature %#02x: %#x = %d, ' 'pressure: %#x, humidity: %#x', i2c_addr, adc_t, t_fine, adc_p, adc_h) logger.debug('Sensor Humidity value is %.2f %%', humidity) else: logger.debug('BMP280 sensor data: temperature %#02x: %#x = %d, ' 'pressure: %#x',i2c_addr, adc_t, t_fine, adc_p) # convert temperature to right units and scale and return value temp = args.converter[0](t_fine, I2C_BMX280_SENSOR_SCALE) logger.debug('Sensor Temperature value %d is %.2f%s', t_fine, temp, args.converter[1]) logger.debug('Sensor Pressure value is %.2f hPa', pressure) return (temp, extra_readings), 1 except IOError as e: logger.critical(e) raise IOError("Error while communicating with I2C device %#02x" % i2c_addr) ########################[ Get I2C MCP9808 Temperature ]######################## # Inspired by https://github.com/ControlEverythingCommunity/MCP9808 def read_i2c_mcp9808(args): """Returns temperature from I2C mcp9808 sensor in desired unit""" i2c_bus, i2c_addr = i2c_get_smbus_devaddr(args) try: # Setup MCP9808 configuration and wait 0.5 seconds for things to settle i2c_bus.write_i2c_block_data(i2c_addr, I2C_MCP9808_CONFIG_ADDR, I2C_MCP9808_CONFIG) i2c_bus.write_byte_data(i2c_addr, I2C_MCP9808_PRECISION_ADDR, I2C_MCP9808_PRECISION) sleep(0.5) # Read temperature (2 bytes - MSB,LSB) and convert to 13-bit signed int data = i2c_bus.read_i2c_block_data(i2c_addr,I2C_MCP9808_TEMP_ADDR,2) temp_read = ((data[0] & 0xF) << 8) | data[1] if (data[0] & 0x10): temp_read = -temp_read logger.debug('MCP9808 sensor data %#02x: 0x%02x%02x = %d', i2c_addr, data[0], data[1], temp_read) # convert temperatur to right units and scale and return value temp = args.converter[0](temp_read, I2C_MCP9808_SENSOR_SCALE) logger.debug('Temperature sensor value %d is %.2f%s', temp_read, temp, args.converter[1]) return temp, 1 except IOError as e: logger.critical(e) raise IOError("Error while communicating with I2C device %#02x" % i2c_addr) #####################[ Get 1-Wire sensor device filename ]##################### def get_w1_sensor_device_filename(args, dev_dir=W1_SENSOR_DEV_DIR, prefix=W1_SENSOR_DEV_PREFIX, suffix=W1_SENSOR_DEV_SUFFIX): """Auto-determine sensor datafile name (unless args.file is set)""" if not args.file: search_pat = dev_dir + ('/' if dev_dir[-1]!='/' else '') search_pat+= prefix + ('*' + args.serial if args.serial else '*') logger.debug('looking for sensors with search pattern %s', search_pat) device_folders = glob(search_pat) if len(device_folders) == 1: filename = device_folders[0] + suffix else: if len(device_folders) == 0: errmsg = 'no supported temperature sensors in %s' % dev_dir else: serials = [ basename(x)[len(prefix):] if x.find(prefix)>=0 else basename(x) for x in device_folders ] errmsg = 'found multiple temperature sensors (%s), please '\ 'specify which one to use' % ', '.join(serials) logger.critical(errmsg) raise ValueError(errmsg) else: filename = args.file logger.debug('using temperature sensor at %s', filename) return filename ###################[ Get 1-Wire DS18b20 sensor temperature ]################### def read_w1_ds18b20(args): """Returns temperature from 1-wire ds18b20 sensor in desired unit""" device_file = get_w1_sensor_device_filename(args) lines=[ '' ] tries = 0 while tries <= args.retries and lines[0].strip()[-3:] != 'YES': if tries > 0: logger.warn('Temperature sensor data not stable, reading once more') sleep(0.2) tries += 1 with open(device_file, 'r') as f: lines = f.readlines() logger.debug('Temperature sensor data read from %s: %s', f.name, lines) if lines[0].strip()[-3:] != 'YES': errmsg = 'no stable temperature sensor data after %d tries' % tries else: equals_pos = lines[1].find('t=') if equals_pos == -1: errmsg = 'temperature sensor data format is not supported' else: temp_read = int(lines[1][equals_pos+2:]) temp = args.converter[0](temp_read, W1_SENSOR_SCALE) logger.debug('Temperature sensor value %d is %.2f%s', temp_read, temp, args.converter[1]) return temp, tries logger.critical(errmsg) raise ValueError(errmsg) #######################[ Parse Command Line parameters ]####################### def parse_args(): """Parse command line and get parameters from environment, if present""" # Setup argument parser, the workhorse gluing it all together parser = ArgumentParser( description='Nagios check plugin for temperature sensors on RaspberryPi' ) parser.add_argument('-V', '--version',action="version",version=PROG_VERSION) pgroup = parser.add_mutually_exclusive_group(required=False) pgroup.add_argument('-C', '--celcius', action='store_const', dest='converter', const=CONVERT_CELCIUS, help='measure, critical and warn values in Celcius ' '(default)', default=CONVERT_CELCIUS) pgroup.add_argument('-F', '--farenheit',action='store_const', dest='converter', const=CONVERT_FARENHEIT, help='measure, critical and warn values in Farenheit') parser.add_argument('-w', '--warn', type=float, help='temperature for warning status') parser.add_argument('-c','--critical', type=float, help='temperature for critical status') pgroup = parser.add_mutually_exclusive_group(required=False) pgroup.add_argument('-q', '--quiet', default=logging.CRITICAL, action=SetLogLevel, const=LOGGING_NONE, help='quiet (no output, only exit with exit code)') pgroup.add_argument('-v', '--verbose', help='more verbose output', action=SetLogLevel, const=logging.INFO) pgroup.add_argument('-d', '--debug', help='debug output (more verbose)', action=SetLogLevel, const=logging.DEBUG) parser.add_argument('-l', '--logfile', action=SetLogFile, help='send logging output to logfile') subparser = parser.add_subparsers(title='Supported temperature sensors', required=True) cpuparser = ArgumentParser(add_help=False) cpuparser.add_argument('-f', '--file', default=CPU_SENSOR_DEV, help='input file (or device) to obtain data from' ' (defaults to %s)' % CPU_SENSOR_DEV) cmdparser = subparser.add_parser('rpi_cpu', parents=[cpuparser], help='read built-in Raspberry Pi CPU temperature') cmdparser.set_defaults(func=read_rpi_cpu_temp,cmdparser=cmdparser,retries=0) i2cparser = ArgumentParser(add_help=False) i2cparser.add_argument('-a', '--address', type=hex_int, help='I2C Address of sensor, use 0x.. for hex (*)') i2cparser.add_argument('-b', '--i2cbus', default=1, type=int, help='I2C Bus to use (defaults to 1)') cmdparser = subparser.add_parser('i2c_bme280', parents=[i2cparser], help='read I2C connected BME280 sensor', epilog='(*) default I2C address for an BME280 is %#x' % I2C_BMX280_DEFAULT_ADDR) cmdparser.set_defaults(func=read_i2c_bmX280, cmdparser=cmdparser, retries=0, read_humidity=True, default_address=I2C_BMX280_DEFAULT_ADDR) cmdparser = subparser.add_parser('i2c_bmp280', parents=[i2cparser], help='read I2C connected BMP280 sensor', epilog='(*) default I2C address for an BMP280 is %#x' % I2C_BMX280_DEFAULT_ADDR) cmdparser.set_defaults(func=read_i2c_bmX280, cmdparser=cmdparser, retries=0, read_humidity=False, default_address=I2C_BMX280_DEFAULT_ADDR) cmdparser = subparser.add_parser('i2c_mcp9808', parents=[i2cparser], help='read I2C connected MCP9808 sensor', epilog='(*) default I2C address for an MCP9808 is %#x' % I2C_MCP9808_DEFAULT_ADDR) cmdparser.set_defaults(func=read_i2c_mcp9808, cmdparser=cmdparser, retries=0, default_address=I2C_MCP9808_DEFAULT_ADDR) w1parser = ArgumentParser(add_help=False) pgroup = w1parser.add_mutually_exclusive_group(required=False) pgroup.add_argument('-s', '--serial', help='(unique part of) temperature sensor serial (*)') pgroup.add_argument('-f', '--file', help='input file (or device) to obtain data from (*)') w1parser.add_argument('-r', '--retries', type=int, default=W1_SENSOR_READ_RETRIES, help='number of times to retry reading sensor data when' ' unstable (defaults to %d)' % W1_SENSOR_READ_RETRIES) cmdparser = subparser.add_parser('w1_ds18b20', parents=[w1parser], help='read 1-wire connected DS18b20 sensor', epilog='(*) by default the script will look for the first device that ' 'matches %s* in %s, if multiple entries are found -s or -f must ' 'be used to specify which sensor to read.' % (W1_SENSOR_DEV_PREFIX, W1_SENSOR_DEV_DIR)) cmdparser.set_defaults(func=read_w1_ds18b20, cmdparser=cmdparser) # parse arguments and post-process command line options args = parser.parse_args() # if we got here all seems OK return args ############################[ Exit the Nagios way ]############################ def nagios_exit(status, message, data=None): """exit 'nagios-style', print status and message followed by perf. data""" if logger.isEnabledFor(logging.CRITICAL): if data is not None and len(data) > 0: perfdata = ' | ' + ' '.join([ "'%s'=%s" % (k, ';'.join(['' if x is None else str(x) for x in v]) if isinstance(v,list) else v) for k,v in data ]) else: perfdata = '' print('Temperature %s: %s%s' % (status[0], message, perfdata)) exit(status[1]) #################################[ Main logic ]################################# if __name__ == '__main__': try: args = parse_args() except ArgumentParserError as e: nagios_exit(NAGIOS_UNKNOWN,'error with setup: ' + ','.join(e.args)) except (KeyboardInterrupt, EOFError) as e: print() nagios_exit(NAGIOS_UNKNOWN,'initialization aborted') try: starttime = time() temperature, tries = args.func(args) endtime = time() if isinstance(temperature, tuple): temperature, extra_data = temperature else: extra_data = () except (KeyboardInterrupt) as e: nagios_exit(NAGIOS_UNKNOWN,'sensor read aborted by user') except (IOError, ValueError, ImportError) as e: nagios_exit(NAGIOS_UNKNOWN,'sensor read failed: %s' % e) elapse = endtime-starttime logger.info('Got temperature reading of %.2f degrees %s in %fs', temperature, args.converter[2], elapse) unit = args.converter[1] message = 'current temperature is %.2f%s' % (temperature, unit) data = [ ('temperature', [ '%f%s' % (temperature, unit), args.warn, args.critical, None, None]), ('retries', [ tries-1, None, args.retries, 0, None ]), ('checktime', [ '%fs' % elapse, None, None, 0, None]) ] if args.critical is not None and temperature > args.critical: nagiosresult = NAGIOS_CRITICAL message+= ' and exceeds critical threshold %.2f%s' %(args.critical,unit) elif args.warn is not None and temperature > args.warn: nagiosresult = NAGIOS_WARN message+= ' and exceeds warning threshold %.2f%s' % (args.warn,unit) else: nagiosresult = NAGIOS_OK for e in extra_data: message += ', %s is %.2f%s' % (e[0], e[1], e[2]) data += (e[0], [ '%f%s' % (e[1], e[2]), None, None, e[3] if len(e)>3 else None, e[4] if len(e)>4 else None ]), nagios_exit(nagiosresult, message, data)