|
1
|
#! /usr/bin/env python3
|
|
2
3
4
|
#
# check_otp - Nagios check plugin for LinOTP/PrivacyIDEA OTP validation
#
|
|
5
|
# Version 1.2, latest version, documentation and bugtracker available at:
|
|
6
7
|
# https://gitlab.lindenaar.net/scripts/nagios-plugins
#
|
|
8
|
# Copyright (c) 2018 - 2024 Frederik Lindenaar
|
|
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
#
# This script is free software: you can redistribute and/or modify it under the
# terms of version 3 of the GNU General Public License as published by the Free
# Software Foundation, or (at your option) any later version of the license.
#
# This script is distributed in the hope that it will be useful but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, visit <http://www.gnu.org/licenses/> to download it.
import sys, os, logging, socket, hmac, json
from time import time
from struct import pack
from hashlib import sha1
from getpass import getpass
|
|
26
27
|
from urllib.parse import urlencode
from urllib.request import Request, HTTPError, URLError, urlopen
|
|
28
29
30
|
from ssl import CertificateError, \
create_default_context as create_default_SSL_context, \
_create_unverified_context as create_unverified_SSL_context
|
|
31
32
33
34
35
|
from base64 import b16decode, b32decode, standard_b64decode
from argparse import ArgumentParser as StandardArgumentParser, FileType, \
_StoreAction as StoreAction, _StoreConstAction as StoreConstAction
# Constants (no need to change but allows for easy customization)
|
|
36
|
VERSION = "1.2"

# Program name is derived from the script's own file name (without extension)
PROG_NAME = os.path.splitext(os.path.basename(__file__))[0]
PROG_VERSION = '%s %s' % (PROG_NAME, VERSION)
HTTP_AGENT = '%s/%s' % (PROG_NAME, VERSION)     # sent as User-Agent header

# Path appended to the server URL when it is not already present
URL_API_SUFFIX = '/validate/check'

# Names of the environment variables providing defaults for the options
ENV_VAR_USER = 'USER_NAME'
ENV_VAR_PWD = 'USER_PASSWORD'
ENV_VAR_SERIAL = 'TOKEN_SERIAL'
ENV_VAR_KEY = 'TOKEN_KEY'
ENV_VAR_NAME = 'CHECK_NAME'

# Defaults of the HOTP/TOTP implementation below
OTP_DIGITS = 6
OTP_TOTP_WINDOW = 30
OTP_DIGEST_ALG = sha1

# Log formats (console and logfile) and the pseudo level used for --quiet
LOG_FORMAT = '%(levelname)s - %(message)s'
LOG_FORMAT_FILE = '%(asctime)s - ' + LOG_FORMAT
LOGGING_NONE = logging.CRITICAL + 10

# Nagios states as (label, exit code) tuples
NAGIOS_OK = ('OK', 0)
NAGIOS_WARN = ('WARNING', 1)
NAGIOS_CRITICAL = ('CRITICAL', 2)
NAGIOS_UNKNOWN = ('UNKNOWN', 3)

# Setup logging
logging.basicConfig(format=LOG_FORMAT)
logging.addLevelName(LOGGING_NONE, 'NONE')
logger = logging.getLogger(PROG_NAME)
logger.setLevel(logging.CRITICAL)
###################[ minimalistic HOTP/TOTP implementation ]###################
# based on Yoav Aner's code: http://blog.gingerlime.com/2010/once-upon-a-time #
# latest version: https://github.com/gingerlime/hotpie/blob/master/hotpie.py #
def HOTP(K, C, digits=OTP_DIGITS, digest=OTP_DIGEST_ALG):
    """Calculate the HOTP value for Key K and count C, returns OTP value

    Args:
        K (bytes)    : shared secret key used for the HMAC
        C (int)      : counter, packed as unsigned 64-bit big-endian value
        digits (int) : number of decimal digits of the resulting OTP
        digest       : digestmod passed on to hmac.new()
    Returns:
        the OTP as a string of exactly `digits` decimal digits
    """
    mac = hmac.new(key=K, msg=pack(b"!Q", C), digestmod=digest).digest()
    # dynamic truncation: the low nibble of the last byte selects 4 bytes
    offset = mac[-1] & 0x0f
    code = int.from_bytes(mac[offset:offset + 4], 'big') & 0x7fffffff
    # reduce modulo 10^digits and left-pad with zeros; slicing the decimal
    # string instead returns a too short OTP whenever code < 10^(digits-1)
    return str(code % 10 ** digits).zfill(digits)
def TOTP(K, C=None, d=OTP_DIGITS, win=OTP_TOTP_WINDOW, dg=OTP_DIGEST_ALG):
    """Calculate the TOTP value for Key K (using HOTP), returns OTP value

    The HOTP counter is the moment C (the current time() when C is None)
    divided by the window size `win`, truncated to an integer; `d` and `dg`
    are passed on to HOTP as its digits and digest arguments.
    """
    moment = time() if C is None else C
    counter = int(moment / win)
    return HOTP(K, counter, digits=d, digest=dg)
###############################################################################
def Password(K, C):
    """Placeholder 'generator' representing plain password authentication

    Takes the same (key, count) arguments as HOTP and TOTP but ignores them,
    there is no OTP to calculate so the result is always None.
    """
    return None
###################[ patch to force urllib on ipv4 / ipv6 ]###################
# Support functions to allow forcing urllib.request.urlopen() to IPv4 or IPv6
# based on http://stackoverflow.com/questions/2014534/force-python-mechanize-urllib2-to-only-use-a-requests/6319043#6319043
# the trick is to wrap the original socket.getaddrinfo and enforce the family
# keep a reference to the real resolver and the family to enforce (0 = any)
socket.origGetAddrInfo = socket.getaddrinfo
socket.getAddrInfoFamily = 0


def getAddrInfoWrapper(host, port, family=0, socktype=0, proto=0, flags=0):
    """wrapper for socket.getaddrinfo() to connect only to a specific family

    When the caller does not request a family (0), the family stored in
    socket.getAddrInfoFamily is used. The first address returned by the
    original getaddrinfo() is logged at debug level.
    """
    if family == 0:
        family = socket.getAddrInfoFamily
    result = socket.origGetAddrInfo(host, port, family, socktype, proto, flags)
    if family == socket.AF_INET:
        logformat = 'connecting over IPv4 to %s:%d'
    elif family == socket.AF_INET6:
        logformat = 'connecting over IPv6 to %s:%d'
    else:
        logformat = 'connecting to %s:%d'
    sockaddr = result[0][4]
    logger.debug(logformat, sockaddr[0], sockaddr[1])
    return result


# from here on every lookup through socket.getaddrinfo uses the wrapper
socket.getaddrinfo = getAddrInfoWrapper
###############################################################################
################[ wrapper to stop ArgumentParser from exiting ]################
# based on http://stackoverflow.com/questions/14728376/i-want-python-argparse-to-throw-an-exception-rather-than-usage/14728477#14728477
# the only way to do this is overriding the error method and throw an Exception
class ArgumentParserError(Exception):
    """Exception raised by ArgumentParser.error() instead of exiting"""


class ArgumentParser(StandardArgumentParser):
    """ArgumentParser not exiting with non-Nagios format message upon errors"""

    def error(self, message):
        """Report a parsing problem by raising ArgumentParserError(message)"""
        raise ArgumentParserError(message)
###########[ Action to immediately set the socket address family ]############
class SetSocketAddrFamily(StoreConstAction):
    """ArgumentParser action to set socket.getAddrInfo() Addr Family to const"""

    def __call__(self, parser, namespace, values, option_string=None):
        """Store the action's const in socket.getAddrInfoFamily

        Nothing is stored in the namespace, only the socket module attribute
        is updated.
        """
        setattr(socket, 'getAddrInfoFamily', self.const)
##################[ Action to immediately set the log level ]##################
class SetLogLevel(StoreConstAction):
    """ArgumentParser action to set log level to provided const value"""

    def __call__(self, parser, namespace, values, option_string=None):
        """Set the level of the program's logger to the action's const

        Nothing is stored in the namespace, only the logger is changed.
        """
        target = logging.getLogger(PROG_NAME)
        target.setLevel(self.const)
####################[ Action to immediately log to a file ]####################
class SetLogFile(StoreAction):
    """ArgumentParser action to log to file (sets up FileHandler accordingly)"""

    def __call__(self, parser, namespace, values, option_string=None):
        """Store the file name and attach a FileHandler for it to the logger"""
        super().__call__(parser, namespace, values, option_string)
        handler = logging.FileHandler(values)
        handler.setFormatter(logging.Formatter(LOG_FORMAT_FILE))
        proglogger = logging.getLogger(PROG_NAME)
        # stop passing records on to the handlers of the root logger
        proglogger.propagate = False
        proglogger.addHandler(handler)
####################[ Action to load contents from a file ]####################
class LoadFromFile(StoreAction):
    """ArgumentParser action to load file contents into another variable """

    def __init__(self, option_strings, dest, nargs=None, const=None,
                 default=None, type=None, choices=None, required=False,
                 help=None, metavar=None):
        """Set up the action, insisting on a FileLoadType as its type"""
        if not isinstance(type, FileLoadType):
            raise ArgumentParserError('LoadFromFile action option %s requires '
                        'type FileLoadType (got %s)' % (option_strings, type))
        super().__init__(option_strings, dest, nargs, const, default, type,
                         choices, required, help, metavar)

    def __call__(self, parser, namespace, values, option_string=None):
        """Store the opened file and load its first line into the namespace

        The first line is stripped and converted with the value type of the
        FileLoadType, the result is stored under the FileLoadType's name. An
        empty line results in the value type being called without argument.
        Raises ArgumentParserError when reading or converting fails.
        """
        super().__call__(parser, namespace, values, option_string)
        spec = self.type
        logger.info('reading %s from %s', spec.name, values.name)
        try:
            content = values.readline().strip()
            if spec.close:
                values.close()
            if isempty(content):
                loaded = spec.type()
            else:
                loaded = spec.type(content)
            setattr(namespace, spec.name, loaded)
        except (IOError, ValueError) as e:
            raise ArgumentParserError('cannot read %s from %s: %s' %
                                      (spec.name, values.name, e))
####################[ Enhanced FileType for LoadFromFile ]####################
class FileLoadType(FileType):
    """ArgumentParser FileType extension storing data needed by LoadFromFile"""

    def __init__(self, name, valuetype=str, mode='r', close=None, bufsize=-1):
        """Remember the target name, value type and whether to close the file

        Args:
            name (str)   : namespace attribute LoadFromFile stores the value in
            valuetype    : callable converting the file's contents
            mode (str)   : mode to open the file with
            close (bool) : close the file after reading, when omitted (None)
                           this is only the case for mode 'r'
            bufsize (int): buffer size passed on to FileType
        """
        super().__init__(mode, bufsize)
        self.name = name
        self.type = valuetype
        if close is None:
            close = (mode == 'r')
        self.close = close
##############[ Action to prompt for password if value is empty ]##############
class PasswordPrompt(StoreAction):
    """ArgumentParser action to prompt for password when empty (and store it)"""

    def __call__(self, parser, namespace, values, option_string=None):
        """Store the value provided, prompt for it first when it is None"""
        if values is None:
            values = getpass('Please enter password: ')
        super().__call__(parser, namespace, values, option_string)
###############################################################################
def isempty(string):
    """Tells whether 'string' is unset (None) or has a length of zero"""
    if string is None:
        return True
    return len(string) == 0
def envvar(name, default=None):
    """Look up environment variable 'name', falls back to default if not set"""
    try:
        return os.environ[name]
    except KeyError:
        return default
def base16_32_64(string=None):
    """convert encoded string to binary, supports base16, base32 and base64

    The encodings are tried in the order base16 (case-insensitive), base32
    and base64; the first one that decodes the string without error wins.

    Args:
        string (str) : base 16/32/64 encoded string
    Returns:
        binary version of string, provided it is base 16/32/64 encoded, or
        None when string is unset or empty
    Raises:
        ValueError (binascii.Error) when string is valid in none of the
        three encodings
    """
    if isempty(string):
        return None
    # Python 3's decoders signal invalid input with binascii.Error, which is
    # a ValueError; TypeError alone never triggers the fall-back to the next
    # encoding.
    try:
        encoding, value = ('base16', b16decode(string, True))
    except (TypeError, ValueError):
        try:
            encoding, value = ('base32', b32decode(string, False))
        except (TypeError, ValueError):
            encoding, value = ('base64', standard_b64decode(string))
    # NOTE(review): this writes the encoded key to the debug log in clear
    logger.debug("converted %s encoded key '%s' to %d bytes of binary data",
                 encoding, string, len(value))
    return value
def parse_args():
    """Parse command line and get parameters from environment, if present

    Builds the main parser plus the 'password', 'hotp' and 'totp' sub-command
    parsers, parses sys.argv and post-processes the outcome: the URL is
    constructed from host/port/path when no URL was given, CA certificates
    are loaded into the SSL context and the presence of the credentials
    needed for the selected mode is verified.

    Returns:
        the argparse Namespace with the parsed (and post-processed) options
    Raises:
        ArgumentParserError when the command line is invalid or incomplete
    """
    # Setup argument parser, the workhorse gluing it all together
    parser = ArgumentParser(
        description='Nagios check for OTP validation against LinOTP/PrivacyIDEA'
    )
    parser.add_argument('-V', '--version', action="version",
                        version=PROG_VERSION)

    pgroup = parser.add_mutually_exclusive_group(required=True)
    pgroup.add_argument('-u', '--url',
                        help='URL to check OTP authentication against')
    pgroup.add_argument('-H', '--host',
                        help='hostname to test against (to construct URL)')
    parser.add_argument('-p', '--port', type=int,
                        help='port number to connect to (only used with -H)')
    parser.add_argument('-P', '--path',
                        help='URL path to be used (only used with -H)')
    parser.add_argument('-S', '--no-ssl', action='store_true',
                        help='connect WITHOUT SSL (only used with -H)')
    parser.add_argument('--no-ssl-validation',
                        action='store_const', dest='sslcontext',
                        const=create_unverified_SSL_context(),
                        default=create_default_SSL_context(),
                        help='Do not validate server SSL certificate (DANGEROUS)')
    parser.add_argument('--cacert',
                        help='CA Certificate file for SSL server validation')
    parser.add_argument('--capath',
                        help='CA Certificate directory for SSL server validation')
    parser.add_argument('-n', '--name',
                        default=envvar(ENV_VAR_NAME, PROG_NAME),
                        help="name in authentication request for logging "
                             "(defaults to '%s')" % PROG_NAME)
    parser.add_argument('-w', '--warn', type=float,
                        help='Response time for warning status (seconds)')
    parser.add_argument('-c', '--critical', type=float,
                        help='Response time for critical status (seconds)')

    pgroup = parser.add_mutually_exclusive_group(required=False)
    pgroup.add_argument('-4', '--ipv4', action=SetSocketAddrFamily,
                        const=socket.AF_INET, help='connect using IPv4')
    pgroup.add_argument('-6', '--ipv6', action=SetSocketAddrFamily,
                        const=socket.AF_INET6, help='connect using IPv6')

    pgroup = parser.add_mutually_exclusive_group(required=False)
    pgroup.add_argument('-q', '--quiet', default=logging.CRITICAL,
                        action=SetLogLevel, const=LOGGING_NONE,
                        help='quiet (no output, only exit with exit code)')
    pgroup.add_argument('-v', '--verbose', help='more verbose output',
                        action=SetLogLevel, const=logging.INFO)
    pgroup.add_argument('-d', '--debug', help='debug output (more verbose)',
                        action=SetLogLevel, const=logging.DEBUG)
    parser.add_argument('-l', '--logfile', action=SetLogFile,
                        help='send logging output to logfile')

    # Options shared by all test modes (parent of every sub-command parser)
    commonparser = ArgumentParser(add_help=False)
    pgroup = commonparser.add_mutually_exclusive_group(required=False)
    pgroup.add_argument('-l', '--login', default=envvar(ENV_VAR_USER),
                        help='username to login with, can only be omitted when '
                             '%s is set or -s/--serial is used' % ENV_VAR_USER)
    pgroup.add_argument('-s', '--serial', default=envvar(ENV_VAR_SERIAL),
                        help='token serial to use, can only be omitted when %s '
                             'is set or -l/--login is used' % ENV_VAR_SERIAL)
    pgroup = commonparser.add_mutually_exclusive_group(required=False)
    pgroup.add_argument('-p', '--password', nargs='?', action=PasswordPrompt,
                        default=envvar(ENV_VAR_PWD),
                        help='password or OTP+PIN to authenticate, uses env. '
                             'var %s if not present and prompts when PASSWORD '
                             'omitted (use "" for empty password)' % ENV_VAR_PWD)
    pgroup.add_argument('-P', '--passwordfile',
                        action=LoadFromFile, type=FileLoadType('password'),
                        help='read password/authentication secret from file')

    subparser = parser.add_subparsers(title='Implemented test modes / checks')
    cmdparser = subparser.add_parser('password', parents=[commonparser],
                                     help='perform test with provided secret')
    cmdparser.set_defaults(func=Password, cmdparser=cmdparser)

    # Options shared by the OTP based test modes
    otpparser = ArgumentParser(add_help=False)
    pgroup = otpparser.add_mutually_exclusive_group(required=False)
    pgroup.add_argument('-k', '--key', default=envvar(ENV_VAR_KEY),
                        type=base16_32_64, help='HOTP/TOTP key, can only be '
                        'omitted if %s is set or -K is used' % ENV_VAR_KEY)
    pgroup.add_argument('-K', '--keyfile', action=LoadFromFile,
                        type=FileLoadType('key', base16_32_64),
                        help='read HOTP key from the file specified')
    otpparser.add_argument('-m', '--merge', choices=['pwdOTP', 'OTPpwd'],
                           default='pwdOTP',
                           help='how to merge password and OTP')

    cmdparser = subparser.add_parser('hotp', parents=[commonparser, otpparser],
                        help='perform HOTP check using provided key and count')
    pgroup = cmdparser.add_mutually_exclusive_group(required=True)
    pgroup.add_argument('-c', '--count', type=int,
                        help='count to be used to calculate the HOTP value')
    pgroup.add_argument('-C', '--countfile',
                        type=FileLoadType('count', int, 'r+'),
                        action=LoadFromFile,
                        help='read HOTP count from file and update it')
    # default matches the help text: the count advances by one per check
    cmdparser.add_argument('-i', '--increment', type=int, default=1,
                           help='increment value for count (default=1)')
    cmdparser.set_defaults(func=HOTP, cmdparser=cmdparser)

    cmdparser = subparser.add_parser('totp', parents=[commonparser, otpparser],
                                     help='perform TOTP test using provided key')
    cmdparser.set_defaults(func=TOTP, cmdparser=cmdparser)

    # parse arguments and post-process command line options
    args = parser.parse_args()

    # Sub-commands are optional for argparse, without one neither func nor
    # cmdparser is set so fail here with a proper message
    if 'func' not in args:
        parser.error('no test mode specified (password, hotp or totp)')

    # Generate the URL if not provided on the command line
    if isempty(args.url):
        args.url = 'http://' if args.no_ssl else 'https://'
        args.url += args.host
        # port is an int (type=int), so test for None and format as number
        if args.port is not None:
            args.url += ':%d' % args.port
        if not isempty(args.path):
            args.url += args.path if args.path[0] == '/' else '/' + args.path

    # Pass-on CA file/path to SSLContext if provided on the command line
    if not isempty(args.cacert):
        try:
            args.sslcontext.load_verify_locations(cafile=args.cacert)
        except IOError as e:
            args.cmdparser.error("cafile '%s': %s" % (args.cacert, str(e)))
    elif not isempty(args.capath):
        try:
            args.sslcontext.load_verify_locations(capath=args.capath)
        except IOError as e:
            args.cmdparser.error("capath '%s': %s" % (args.capath, str(e)))

    # We should now be ready to authenticate, fail if that's not the case
    if args.func == Password:
        if isempty(args.login) and isempty(args.serial) \
                or isempty(args.password):
            args.cmdparser.error('user/serial and a secret are required!')
    elif args.func == HOTP or args.func == TOTP:
        if isempty(args.login) and isempty(args.serial) or isempty(args.key):
            args.cmdparser.error('user/serial and a key are required!')
    else:
        args.cmdparser.error("BUG: mode %s is not supported"
                             % args.func.__name__)

    # if we got here all seems OK
    return args
|
|
362
|
def checkotp(url, subject, secret, isserial=False, nas=None, sslcontext=None):
    """Check a subject (user or token) with secret against PrivacyIDEA / LinOTP.

    Args:
        url (str)      : URL to connect to, URL_API_SUFFIX is added if missing
        subject (str)  : subject to authenticate (user or a token serial)
        secret (str)   : secret (password+OTP) to authenticate with
        isserial (bool): True if subject is a token serial (optional)
        nas (str)      : string to pass-on as the nas string (optional)
        sslcontext     : SSL context to use for the connection (optional),
                         a default (validating) context is created if omitted
    Returns:
        The result response from the PrivacyIDEA server (mapping object)
    Raises:
        whatever urlopen() raises on connection problems and what json.load()
        raises when the response is not valid JSON
    """
    # Create the default context per call rather than once at definition time
    if sslcontext is None:
        sslcontext = create_default_SSL_context()

    # Complete (fix) URL
    if not url.endswith(URL_API_SUFFIX):
        url += URL_API_SUFFIX[1:] if url.endswith('/') else URL_API_SUFFIX
    logger.info('connecting to %s', url)

    # Prepare the parameters
    params = {'pass': secret, 'serial' if isserial else 'user': subject}
    if not isempty(nas):
        params['nas'] = nas
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug('HTTP request parameters: %s', ', '.join(
            '%s=%s' % (k, v if k != 'pass' else '***MASKED***')
            for k, v in params.items()))

    # Perform the API authentication request; the POST data must be bytes
    # (urlencode() produces ASCII-only text) and the response is closed again
    request = Request(url, data=urlencode(params).encode('ascii'),
                      headers={'User-Agent': HTTP_AGENT})
    with urlopen(request, context=sslcontext) as reply:
        response = json.load(reply)

    if logger.isEnabledFor(logging.DEBUG):
        logger.debug('result: %s', json.dumps(response, indent=4))
    return response
def nagios_exit(status, message, data=None):
    """exit 'nagios-style', print status and message followed by perf. data

    Args:
        status  : (label, exit code) tuple, e.g. one of the NAGIOS_* constants
        message : text to print after the status label
        data    : (name, value) pairs to report as performance data; a list
                  as value is joined with ';' with None as empty field
    """
    # nothing is printed when logging is set above CRITICAL (quiet mode)
    if logger.isEnabledFor(logging.CRITICAL):
        perfdata = ''
        if data is not None and len(data) > 0:
            entries = []
            for label, value in data:
                if isinstance(value, list):
                    value = ';'.join('' if x is None else str(x)
                                     for x in value)
                entries.append("'%s'=%s" % (label, value))
            perfdata = ' | ' + ' '.join(entries)
        print('OTP %s: %s%s' % (status[0], message, perfdata))
    sys.exit(status[1])
if __name__ == '__main__':
    # Parse the command line, problems with the setup are reported as UNKNOWN
    try:
        args = parse_args()
    except ArgumentParserError as e:
        nagios_exit(NAGIOS_UNKNOWN, 'error with setup: ' + ','.join(e.args))
    except (KeyboardInterrupt, EOFError):
        print()
        nagios_exit(NAGIOS_UNKNOWN, 'initialization aborted')

    message = args.func.__name__ + ' authentication'

    # Determine the secret: the OTP (merged with the password, if any) for
    # the modes that have a key, the plain password otherwise
    if 'key' in args:
        secret = args.func(args.key, args.count if 'count' in args else None)
        if not isempty(args.password):
            secret = (secret + args.password) if args.merge == 'OTPpwd' \
                else (args.password + secret)
    else:
        secret = args.password

    # Perform the (timed) authentication request
    try:
        starttime = time()
        response = checkotp(args.url,
                            args.login if isempty(args.serial) else args.serial,
                            secret, isserial=not isempty(args.serial),
                            nas=args.name, sslcontext=args.sslcontext)
        endtime = time()
    # OSError covers socket level failures not wrapped in an URLError and
    # ValueError a response that is not valid JSON
    except (HTTPError, URLError, CertificateError, OSError, ValueError) as e:
        nagios_exit(NAGIOS_CRITICAL, '%s request failed: %s' % (message, e))
    except (KeyboardInterrupt, EOFError):
        nagios_exit(NAGIOS_UNKNOWN, '%s request aborted' % message)

    resultdata = response.get('result')
    if resultdata is None or not resultdata.get('status'):
        nagios_exit(NAGIOS_CRITICAL, '%s request processing failed' % message)
    authenticated = resultdata.get('status') and resultdata.get('value')
    # the response may lack a 'detail' section, treat that as an empty one
    detaildata = response.get('detail') or {}
    elapse = endtime - starttime

    if logger.isEnabledFor(logging.INFO):
        logger.info('Got response from : %s', response.get('version'))
        logger.info('Processing time : %.2fs', elapse)
        logger.info('Got valid result : %s', resultdata.get('status'))
        logger.info('Authenticated : %s', authenticated)
        for field in 'message', 'type', 'serial':
            if field in detaildata:
                logger.info('Token %-12s: %s', field, detaildata.get(field))

    errmsgs = []

    # Update the count file after a successful HOTP check. The countfile
    # attribute also exists (as None) when -c/--count was used, so test its
    # value rather than its presence.
    if authenticated and getattr(args, 'countfile', None) is not None \
            and args.increment > 0:
        fname = args.countfile.name
        nwcnt = args.count + args.increment
        logger.info('updating count in %s from %d to %d',
                    fname, args.count, nwcnt)
        try:
            with args.countfile as countfile:
                countfile.seek(0)
                countfile.write(str(nwcnt))
                countfile.truncate()
        except IOError as e:
            errmsgs.append('unable to update countfile %s: %s' % (fname, e))
            logger.critical(errmsgs[-1])

    # Compose the status message and determine the resulting Nagios state
    for k, src, g in ('serial', detaildata, ' of '), \
                     ('version', response, ' with '):
        if k in src:
            message += g + str(src.get(k))

    if authenticated:
        nagiosresult = NAGIOS_OK
        message += ' succeeded'
    else:
        nagiosresult = NAGIOS_CRITICAL
        message += ' failed'

    if args.critical is not None and elapse > args.critical:
        errmsgs.append('took too long (%.2fs > %.2fs)'
                       % (elapse, args.critical))
        logger.critical('response %s', errmsgs[-1])
    elif args.warn is not None and elapse > args.warn:
        if nagiosresult != NAGIOS_CRITICAL:
            nagiosresult = NAGIOS_WARN
        errmsgs.append('is slow (%.2fs > %.2fs)' % (elapse, args.warn))
        logger.warning('response time %s', errmsgs[-1])
    else:
        message += ' in %.1fs' % elapse
        logger.info('response completed in %.1fs', elapse)

    if len(errmsgs) > 0:
        message += ' and ' if nagiosresult == NAGIOS_CRITICAL else ' but '
        message += ' and '.join(errmsgs)
        # any problem reported turns an otherwise OK result into CRITICAL
        if nagiosresult == NAGIOS_OK:
            nagiosresult = NAGIOS_CRITICAL

    if 'message' in detaildata:
        message += ': ' + str(detaildata.get('message'))

    nagios_exit(nagiosresult, message, [
        ('time', [elapse, args.warn, args.critical, 0, None])])
|
|
506
|
|