#! /usr/bin/env python3
#
# check_otp - Nagios check plugin for LinOTP/PrivacyIDEA OTP validation
#
# Version 1.2, latest version, documentation and bugtracker available at:
#              https://gitlab.lindenaar.net/scripts/nagios-plugins
#
# Copyright (c) 2018 - 2024 Frederik Lindenaar
#
# This script is free software: you can redistribute and/or modify it under the
# terms of version 3 of the GNU General Public License as published by the Free
# Software Foundation, or (at your option) any later version of the license.
#
# This script is distributed in the hope that it will be useful but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program.  If not, visit <http://www.gnu.org/licenses/> to download it.

import sys, os, logging, socket, hmac, json
from time import time
from struct import pack
from hashlib import sha1
from getpass import getpass
from urllib.parse import urlencode
from urllib.request import Request, HTTPError, URLError, urlopen
from ssl import CertificateError, \
                create_default_context as create_default_SSL_context, \
                _create_unverified_context as create_unverified_SSL_context
from base64 import b16decode, b32decode, standard_b64decode
from argparse import ArgumentParser as StandardArgumentParser, FileType, \
              _StoreAction as StoreAction, _StoreConstAction as StoreConstAction

# Constants (no need to change but allows for easy customization)
VERSION="1.2"
PROG_NAME=os.path.splitext(os.path.basename(__file__))[0]
PROG_VERSION=PROG_NAME + ' ' + VERSION
HTTP_AGENT=PROG_NAME + '/' + VERSION
URL_API_SUFFIX='/validate/check'
ENV_VAR_USER='USER_NAME'
ENV_VAR_PWD='USER_PASSWORD'
ENV_VAR_SERIAL='TOKEN_SERIAL'
ENV_VAR_KEY='TOKEN_KEY'
ENV_VAR_NAME='CHECK_NAME'
OTP_DIGITS=6
OTP_TOTP_WINDOW=30
OTP_DIGEST_ALG=sha1
LOG_FORMAT='%(levelname)s - %(message)s'
LOG_FORMAT_FILE='%(asctime)s - ' + LOG_FORMAT
LOGGING_NONE=logging.CRITICAL + 10
NAGIOS_OK = ( 'OK', 0)
NAGIOS_WARN = ( 'WARNING', 1)
NAGIOS_CRITICAL = ( 'CRITICAL', 2 )
NAGIOS_UNKNOWN = ( 'UNKNOWN', 3 )

# Setup logging
logging.basicConfig(format=LOG_FORMAT)
logging.addLevelName(LOGGING_NONE, 'NONE')
logger = logging.getLogger(PROG_NAME)
logger.setLevel(logging.CRITICAL)

###################[ minimalistic HOTP/TOTP implementation ]###################
# based on Yoav Aner's code: http://blog.gingerlime.com/2010/once-upon-a-time #
# latest version: https://github.com/gingerlime/hotpie/blob/master/hotpie.py  #

def HOTP(K, C, digits=OTP_DIGITS, digest=OTP_DIGEST_ALG):
    """Calculate the HOTP value for Key K and count C, returns OTP value"""
    digest = hmac.new(key=K, msg=pack(b"!Q", C), digestmod=digest).hexdigest()
    offset = int(digest[-1], 16)
    return str(int(digest[(offset<<1):((offset<<1)+8)],16)&0x7fffffff)[-digits:]

def TOTP(K, C=None, d=OTP_DIGITS, win=OTP_TOTP_WINDOW, dg=OTP_DIGEST_ALG):
    """Calculate the TOTP value for Key K (using HOTP), returns OTP value"""
    return HOTP(K, int((time() if C is None else C) / win), digits=d, digest=dg)

###############################################################################

def Password(K, C):
    """Dummy routine to represent Password authentication (without OTP)"""
    return None

###################[ patch to force urllib on ipv4 / ipv6  ]###################
# Support functions to allow forcing urllib2.urlopen() IPv4 or IPv6 connections
# based on http://stackoverflow.com/questions/2014534/force-python-mechanize-urllib2-to-only-use-a-requests/6319043#6319043
# the trick is to wrap the original socket.getaddrinfo and enforce the family
socket.origGetAddrInfo = socket.getaddrinfo
socket.getAddrInfoFamily = 0
def getAddrInfoWrapper(host, port, family=0, socktype=0, proto=0, flags=0):
    """wrapper for socket.getaddrinfo() to connect only to a specific family"""
    if family == 0:
        family = socket.getAddrInfoFamily
    result=socket.origGetAddrInfo(host, port, family, socktype, proto, flags)
    logger.debug('connecting over IPv4 to %s:%d' if family==socket.AF_INET  else
                 'connecting over IPv6 to %s:%d' if family==socket.AF_INET6 else
                 'connecting to %s:%d' ,result[0][4][0],result[0][4][1])
    return result
socket.getaddrinfo = getAddrInfoWrapper
###############################################################################

################[ wrapper to stop ArgumentParser from exiting ]################
# based on http://stackoverflow.com/questions/14728376/i-want-python-argparse-to-throw-an-exception-rather-than-usage/14728477#14728477
# the only way to do this is overriding the error method and throw and Exception
class ArgumentParserError(Exception): pass

class ArgumentParser(StandardArgumentParser):
    """ArgumentParser not exiting with non-Nagios format message upon errors"""
    def error(self, message):
        raise ArgumentParserError(message)

##################[ Action to immediately set the log level ]##################
class SetSocketAddrFamily(StoreConstAction):
    """ArgumentParser action to set socket.getAddrInfo() Addr Family to const"""
    def __call__(self, parser, namespace, values, option_string=None):
        socket.getAddrInfoFamily = self.const

##################[ Action to immediately set the log level ]##################
class SetLogLevel(StoreConstAction):
    """ArgumentParser action to set log level to provided const value"""
    def __call__(self, parser, namespace, values, option_string=None):
        logging.getLogger(PROG_NAME).setLevel(self.const)

####################[ Action to immediately log to a file ]####################
class SetLogFile(StoreAction):
    """ArgumentParser action to log to file (sets up FileHandler accordingly)"""
    def __call__(self, parser, namespace, values, option_string=None):
        super(SetLogFile, self).__call__(parser,namespace,values,option_string)
        formatter = logging.Formatter(LOG_FORMAT_FILE)
        handler = logging.FileHandler(values)
        handler.setFormatter(formatter)
        logger = logging.getLogger(PROG_NAME)
        logger.propagate = False
        logger.addHandler(handler)

####################[ Action to load contents from a file ]####################
class LoadFromFile(StoreAction):
    """ArgumentParser action to load file contents into another variable """
    def __init__(self,option_strings,dest,nargs=None,const=None,default=None,
                 type=None,choices=None,required=False,help=None,metavar=None):
        if not isinstance(type, FileLoadType):
            raise ArgumentParserError('LoadFromFile action option %s requires '
                          'type FileLoadType (got %s)' % (option_strings, type))
        super(LoadFromFile, self).__init__(option_strings,dest,nargs,const,
                                     default,type,choices,required,help,metavar)

    def __call__(self, parser, namespace, values, option_string=None):
        super(LoadFromFile,self).__call__(parser,namespace,values,option_string)
        logger.info('reading %s from %s', self.type.name, values.name)
        try:
            content = values.readline().strip()
            if self.type.close:
                values.close()
            setattr(namespace, self.type.name, self.type.type(content) \
                                if not isempty(content) else self.type.type())
        except (IOError, ValueError) as e:
            raise ArgumentParserError('cannot read %s from %s: %s' %
                                            (self.type.name, values.name, e))

####################[ Enhanced FileType for LoadFromFile ]####################
class FileLoadType(FileType):
    """ArgumentParser FileType extension storing data needed by LoadFromFile"""
    def __init__(self, name, valuetype=str, mode='r', close=None, bufsize=-1):
        self.name = name
        self.type = valuetype
        self.close = mode == 'r' if close is None else close
        super(FileLoadType, self).__init__(mode, bufsize)

##############[ Action to prompt for password if value is empty ]##############
class PasswordPrompt(StoreAction):
    """ArgumentParser action to prompt for password when empty (and store it)"""
    def __call__(self, parser, namespace, values, option_string=None):
        pwd = getpass('Please enter password: ') if values is None else values
        super(PasswordPrompt, self).__call__(parser,namespace,pwd,option_string)

###############################################################################


def isempty(string):
    """Checks whether string 'str' provided is unset or empty"""
    return string is None or len(string) == 0


def envvar(name, default=None):
    """Returns value of environment var 'name', or value of default otherwise"""
    return os.environ.get(name, default)


def base16_32_64(string=None):
    """convert encoded string to binary, supports base16, base32 and base64

        Args:
            string (str) : base 16/32/64 encoded string

        Returns:
            binary version of string, provided it is base 16/32/64 encoded
    """
    if not isempty(string):
        try:
            encoding, value = ('base16', b16decode(string, True))
        except TypeError:
            try:
                encoding, value = ('base32', b32decode(string, False))
            except TypeError:
                encoding, value = ('base64', standard_b64decode(string))
        logger.debug("converted %s encoded key '%s' to %d bytes of binary data",
                                                encoding, string, len(value))
        return value


def parse_args():
    """Parse command line and get parameters from environment, if present"""

    # Setup argument parser, the workhorse gluing it all together
    parser = ArgumentParser(
        description='Nagios check for OTP validation against LinOTP/PrivacyIDEA'
    )
    parser.add_argument('-V', '--version',action="version",version=PROG_VERSION)

    pgroup = parser.add_mutually_exclusive_group(required=True)
    pgroup.add_argument('-u', '--url',
                        help='URL to check OTP authentication against')
    pgroup.add_argument('-H', '--host',
                        help='hostname to test against (to construct URL)')

    parser.add_argument('-p', '--port',     type=int,
                        help='port number to connect to (only used with -H)')
    parser.add_argument('-P', '--path',
                        help='URL path to be used (only used with -H)')

    parser.add_argument('-S', '--no-ssl',   action='store_true',
                        help='connect WITHOUT SSL (only used with -H)')
    parser.add_argument('--no-ssl-validation',
                        action='store_const', dest='sslcontext',
                        const=create_unverified_SSL_context(),
                        default=create_default_SSL_context(),
                        help='Do not validate server SSL certificate (DANGEROUS)')
    parser.add_argument('--cacert',
                        help='CA Certificate file for SSL server validation')
    parser.add_argument('--capath',
                        help='CA Certificate directory for SSL server validation')

    parser.add_argument('-n', '--name', default=envvar(ENV_VAR_NAME, PROG_NAME),
                        help="name in authentication request for logging "
                             "(defaults to '%s')" % PROG_NAME)

    parser.add_argument('-w', '--warn',     type=float,
                        help='Response time for warning status (seconds)')
    parser.add_argument('-c','--critical',  type=float,
                        help='Response time for critical status (seconds)')

    pgroup = parser.add_mutually_exclusive_group(required=False)
    pgroup.add_argument('-4', '--ipv4',     action=SetSocketAddrFamily,
                        const=socket.AF_INET, help='connect using IPv4')
    pgroup.add_argument('-6', '--ipv6',     action=SetSocketAddrFamily,
                        const=socket.AF_INET6,help='connect using IPv6')

    pgroup = parser.add_mutually_exclusive_group(required=False)
    pgroup.add_argument('-q', '--quiet',    default=logging.CRITICAL,
                        action=SetLogLevel, const=LOGGING_NONE,
                        help='quiet (no output, only exit with exit code)')
    pgroup.add_argument('-v', '--verbose',  help='more verbose output',
                        action=SetLogLevel, const=logging.INFO)
    pgroup.add_argument('-d', '--debug',    help='debug output (more verbose)',
                        action=SetLogLevel, const=logging.DEBUG)

    parser.add_argument('-l', '--logfile',  action=SetLogFile,
                        help='send logging output to logfile')

    commonparser = ArgumentParser(add_help=False)
    pgroup = commonparser.add_mutually_exclusive_group(required=False)
    pgroup.add_argument('-l', '--login',    default=envvar(ENV_VAR_USER),
                        help='username to login with, can only be omitted when '
                             '%s is set or -s/--serial is used' % ENV_VAR_USER)
    pgroup.add_argument('-s', '--serial',   default=envvar(ENV_VAR_SERIAL),
                        help='token serial to use, can only be omitted when %s '
                             'is set or -l/--login is used' % ENV_VAR_SERIAL)

    pgroup = commonparser.add_mutually_exclusive_group(required=False)
    pgroup.add_argument('-p', '--password', nargs='?', action=PasswordPrompt,
                        default=envvar(ENV_VAR_PWD),
                        help='password or OTP+PIN to authenticate, uses env. '
                             'var %s if not present and prompts when PASSWORD '
                             'omitted (use "" for empty password)'%ENV_VAR_PWD)
    pgroup.add_argument('-P', '--passwordfile',
                        action=LoadFromFile,type=FileLoadType('password'),
                        help='read password/authentication secret from file'),

    subparser = parser.add_subparsers(title='Implemented test modes / checks')

    cmdparser = subparser.add_parser('password', parents=[commonparser],
                                    help='perform test with provided secret')
    cmdparser.set_defaults(func=Password,   cmdparser=cmdparser)

    otpparser = ArgumentParser(add_help=False)
    pgroup = otpparser.add_mutually_exclusive_group(required=False)
    pgroup.add_argument('-k', '--key',      default=envvar(ENV_VAR_KEY),
                        type=base16_32_64,  help='HOTP/TOTP key, can only be'
                             'omitted if %s is set or -K is used' % ENV_VAR_KEY)
    pgroup.add_argument('-K', '--keyfile',  action=LoadFromFile,
                        type=FileLoadType('key', base16_32_64),
                        help='read HOTP key from the file specified'),

    otpparser.add_argument('-m','--merge',  choices=['pwdOTP', 'OTPpwd' ],
                        default='pwdOTP', help='how to merge password and OTP')


    cmdparser = subparser.add_parser('hotp', parents=[commonparser, otpparser],
                        help='perform HOTP check using provided key and count')

    pgroup = cmdparser.add_mutually_exclusive_group(required=True)
    pgroup.add_argument('-c', '--count',    type=int,
                        help='count to be used to calculate the HTOP value')
    pgroup.add_argument('-C', '--countfile',type=FileLoadType('count',int,'r+'),
                        action=LoadFromFile,
                        help='read HOTP count from file and update it')
    cmdparser.add_argument('-i', '--increment',  type=int, default=2,
                           help='increment value for count (default=1)')
    cmdparser.set_defaults(func=HOTP,       cmdparser=cmdparser)

    cmdparser = subparser.add_parser('totp',parents=[commonparser, otpparser],
                                    help='perform TOTP test using provided key')
    cmdparser.set_defaults(func=TOTP, cmdparser=cmdparser)

    # parse arguments and post-process command line options
    args = parser.parse_args()

    # Generate the URL if not provided on the command line
    if isempty(args.url):
        args.url = 'http://' if args.no_ssl else 'https://'
        args.url+= args.host
        if not isempty(args.port):
            args.url+= ':' + args.port
        if not isempty(args.path):
            args.url+= args.path if args.path[0]=='/' else '/' + args.path

    # Pass-on CA file/path to SSLContext if provided on the command line
    if not isempty(args.cacert):
        try:
            args.sslcontext.load_verify_locations(cafile=args.cacert)
        except IOError as e:
            args.cmdparser.error("cafile '%s': %s" % (args.cacert, str(e)))
    elif not isempty(args.capath):
        args.sslcontext.load_verify_locations(capath=args.capath)

    # We should now be ready to authenticate, fail if that's not the case
    if args.func == Password:
        if isempty(args.login) and isempty(args.serial) \
                               or isempty(args.password):
            args.cmdparser.error('user/serial and a secret are required!')

    elif args.func == HOTP or args.func == TOTP:
        if isempty(args.login) and isempty(args.serial) or isempty(args.key):
            args.cmdparser.error('user/serial and a key are required!')

    else:
        args.cmdparser.error("BUG: mode %s is not supported"%args.func.__name__)

    # if we got here all seems OK
    return args


def checkotp(url, subject, secret, isserial=False, nas=None, sslcontext=create_default_SSL_context()):
    """Check a subject (user or token) with secret against PrivacyIDEA / LinOTP.

    Args:
        url      (str) : URL to connect to, URL_API_SUFFIX is added if missing
        subject  (str) : subject to authenticate (user or a token serial)
        secret   (str) : secret (password+OTP) to authenticate with
        isserial (bool): True if subject is a token serial (optional)
        nas      (str) : string to pass-on as the nas string (optional)

    Returns:
        The result response from the PrivacyIDEA server (mapping object)
    """
    # Complete (fix) URL
    if not url.endswith(URL_API_SUFFIX):
        url += URL_API_SUFFIX[1:] if url[-1] == '/' else URL_API_SUFFIX
    logger.info('connecting to %s', url)

    # Prepare the parameters
    params = { 'pass': secret, 'serial' if isserial else 'user': subject }
    if not isempty(nas):
        params['nas'] = nas
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug('HTTP request parameters: %s', ', '.join([ '%s=%s' % (k,
            v if k!='pass' else '***MASKED***') for k,v in params.iteritems()]))

     # Perform the API authentication request
    response = json.load(urlopen(Request(url, data=urlencode(params),
                                         headers={'User-Agent':HTTP_AGENT}),
                                 context=sslcontext))
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug('result: %s', json.dumps(response, indent=4))

    return response


def nagios_exit(status, message, data=None):
    """exit 'nagios-style', print status and message followed by perf. data"""
    if logger.isEnabledFor(logging.CRITICAL):
        if data is not None and len(data) > 0:
            perfdata = ' | ' + ' '.join([ "'%s'=%s" % (k,
                ';'.join(['' if x is None else str(x) for x in v])
                        if isinstance(v,list) else v) for k,v in data ])
        else:
            perfdata = ''
        print('OTP %s: %s%s' % (status[0], message, perfdata))
    sys.exit(status[1])


if __name__ == '__main__':
    try:
        args = parse_args()
    except ArgumentParserError as e:
        nagios_exit(NAGIOS_UNKNOWN,'error with setup: ' + ','.join(e.args))
    except (KeyboardInterrupt, EOFError) as e:
        print()
        nagios_exit(NAGIOS_UNKNOWN,'initialization aborted')

    message = args.func.__name__ + ' authentication'

    if 'key' in args:
        secret = args.func(args.key, args.count if 'count' in args else None)
        if not isempty(args.password):
            secret = (secret + args.password) if args.merge=='OTPpwd' \
                                              else (args.password + secret)
    else:
        secret = args.password

    try:
        starttime = time()
        response=checkotp(args.url,
                          args.login if isempty(args.serial) else args.serial,
                          secret, isserial=not isempty(args.serial),
                          nas=args.name, sslcontext=args.sslcontext)
        endtime = time()

    except (HTTPError, URLError, CertificateError) as e:
        nagios_exit(NAGIOS_CRITICAL,'%s request failed: %s' % (message, e))

    except (KeyboardInterrupt, EOFError) as e:
        nagios_exit(NAGIOS_UNKNOWN,'%s request aborted' % message)

    resultdata = response.get('result')
    if resultdata is None or not resultdata['status']:
        nagios_exit(NAGIOS_CRITICAL,'%s request processing failed' % message)

    authenticated = resultdata['status'] and resultdata['value']
    detaildata = response.get('detail')
    elapse = endtime-starttime
    if logger.isEnabledFor(logging.INFO):
        logger.info('Got response from : %s', response.get('version'))
        logger.info('Processing time   : %.2fs', elapse)
        logger.info('Got valid result  : %s', resultdata.get('status'))
        logger.info('Authenticated     : %s', authenticated)
        for field in 'message', 'type', 'serial':
            if field in detaildata:
                logger.info('Token %-12s: %s', field, detaildata.get(field))

    errmsgs = []
    if authenticated and 'countfile' in args and args.increment > 0:
        fname = args.countfile.name
        nwcnt = args.count + args.increment
        logger.info('updating count in %s from %d to %d',fname,args.count,nwcnt)
        try:
            with args.countfile as countfile:
                countfile.seek(0)
                countfile.write(str(nwcnt))
                countfile.truncate()
        except IOError as e:
            errmsgs.append('unable to update countfile %s: %s' % (fname, e))
            logger.critical(errmsgs[-1])

    for k, src, g in ('serial',detaildata,' of '),('version',response,' with '):
        if k in src:
            message += g + src.get(k)
    if authenticated:
        nagiosresult = NAGIOS_OK
        message += ' succeeded'
    else:
        nagiosresult = NAGIOS_CRITICAL
        message += ' failed'

    if args.critical is not None and elapse > args.critical:
        errmsgs.append('took too long (%.2fs > %.2fs)' % (elapse,args.critical))
        logger.critical('response %s', errmsgs[-1])
    elif args.warn is not None and elapse > args.warn:
        if nagiosresult != NAGIOS_CRITICAL:
            nagiosresult = NAGIOS_WARN
        errmsgs.append('is slow (%.2fs > %.2fs)' % (elapse, args.warn))
        logger.warn('response time %s', errmsgs[-1])
    else:
        message+= ' in %.1fs' % elapse
        logger.info('response completed in %.1fs', elapse)

    if len(errmsgs) > 0:
        message+= ' and ' if nagiosresult == NAGIOS_CRITICAL else ' but '
        message+= ' and '.join(errmsgs)
        if nagiosresult == NAGIOS_OK:
            nagiosresult = NAGIOS_CRITICAL
    if 'message' in detaildata:
        message += ': ' + detaildata.get('message')

    nagios_exit(nagiosresult, message, [
                  ('time', [ elapse, args.warn, args.critical, 0, None ])])