Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Kevin Lyda
chkcrontab
Commits
dc322abe
Commit
dc322abe
authored
Jan 03, 2017
by
Kevin Lyda
💬
Browse files
Merge branch 'spahan' into spahan-merge
parents
e637a817
3c37ed3c
Pipeline
#1048
passed with stage
in 3 minutes and 58 seconds
Changes
5
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
chkcrontab
View file @
dc322abe
...
...
@@ -30,33 +30,27 @@ import sys
import
chkcrontab_lib
as
check
from
optparse
import
OptionParser
from
argparse
import
ArgumentParser
def
main
(
argv
):
"""main program."""
if
len
(
argv
)
==
1
:
print
(
'ERROR: No crontab file was specified.'
)
sys
.
exit
(
1
)
parser
=
ArgumentParser
(
description
=
"Parse crontab files and check each type of line for potential syntax errors."
)
parser
.
add_argument
(
'crontab'
,
help
=
'the crontab file to parse'
)
parser
.
add_argument
(
'-w'
,
'--whitelist'
,
action
=
'append'
,
dest
=
'whitelisted_users'
,
help
=
'A user to ignore when warning of unrecognized users This argument may be passed multiple times.'
,
default
=
[
'postgres'
,
'buildbot'
])
parser
.
add_argument
(
'-n'
,
'--no-check-passwd'
,
action
=
'store_false'
,
dest
=
'check_passwd'
,
help
=
'Disable user lookup'
)
parser
.
add_argument
(
'-q'
,
'--quiet'
,
action
=
'store_true'
,
dest
=
'quiet'
,
help
=
"Be quiet"
)
args
=
parser
.
parse_args
()
log
=
check
.
LogCounter
()
(
options
,
args
)
=
parse_chkcrontab_options
(
argv
)
for
crontab
in
args
[
1
:]:
print
(
'Checking correctness of %s'
%
crontab
)
return
check
.
check_crontab
(
crontab
,
log
,
options
.
whitelisted_users
)
def
parse_chkcrontab_options
(
argv
):
"""Parse the options for chkcrontab.
Args:
argv: The argument string supplied by the caller.
Returns:
options: A dictionary of options and their values.
args: Remaining arguments.
"""
parser
=
OptionParser
()
parser
.
add_option
(
'-w'
,
'--whitelist'
,
dest
=
'whitelisted_users'
,
action
=
'append'
,
help
=
'A user to ignore when warning of unrecognized users This argument may be passed multiple times.'
)
return
parser
.
parse_args
(
argv
)
if
not
args
.
quiet
:
print
(
'Checking correctness of %s'
%
args
.
crontab
)
return
check
.
check_crontab
(
args
,
log
)
if
__name__
==
'__main__'
:
sys
.
exit
(
main
(
sys
.
argv
))
chkcrontab_lib.py
View file @
dc322abe
...
...
@@ -78,9 +78,6 @@ import re
import
string
# The following usernames are created locally by packages.
USER_WHITELIST
=
set
((
'postgres'
,
'buildbot'
,
))
# The following extensions imply further postprocessing or that the slack
# role was for a cron that allowed dots in cron scripts.
FILE_RE_WHITELIST
=
[
re
.
compile
(
x
)
for
x
in
...
...
@@ -681,6 +678,11 @@ class CronLineAssignment(object):
'Variable assignments in crontabs are not like shell.'
' $VAR is not expanded.'
)
if
re
.
match
(
'".+" ?#'
,
self
.
variable
)
or
re
.
match
(
'[^"].*#'
,
self
.
variable
):
log
.
LineError
(
log
.
MSG_COMMENT
,
'Variable assignments in crontabs are not like shell.'
' # comment is not allowed.'
)
class
CronLineTimeAction
(
object
):
"""Checks cron lines that specify a time and an action.
...
...
@@ -688,10 +690,16 @@ class CronLineTimeAction(object):
Must be used as a subclass - subclass must implement _CheckTimeField.
"""
def
__init__
(
self
,
time_field
,
user
,
command
):
def
__init__
(
self
,
time_field
,
user
,
command
,
options
):
self
.
time_field
=
time_field
self
.
user
=
user
self
.
command
=
command
self
.
whitelisted_users
=
[]
if
hasattr
(
options
,
'whitelisted_users'
):
self
.
whitelisted_users
=
options
.
whitelisted_users
self
.
check_passwd
=
True
if
hasattr
(
options
,
'check_passwd'
):
self
.
check_passwd
=
options
.
check_passwd
def
_CheckTimeField
(
self
,
log
):
"""Virtual method to be implemented by subclasses to check time field."""
...
...
@@ -706,7 +714,7 @@ class CronLineTimeAction(object):
self
.
_CheckTimeField
(
log
)
# User checks.
if
self
.
user
in
USER_WHITELIST
:
if
self
.
user
in
self
.
whitelisted_users
:
pass
elif
len
(
self
.
user
)
>
31
:
log
.
LineError
(
log
.
MSG_INVALID_USER
,
...
...
@@ -715,12 +723,15 @@ class CronLineTimeAction(object):
log
.
LineError
(
log
.
MSG_INVALID_USER
,
'Invalid username "%s"'
%
self
.
user
)
elif
re
.
search
(
r
'[\s!"#$%&\'()*+,/:;<=>?@[\\\]^`{|}~]'
,
self
.
user
):
log
.
LineError
(
log
.
MSG_INVALID_USER
,
'Invalid username "%s"'
%
self
.
user
)
el
se
:
el
if
self
.
check_passwd
:
try
:
pwd
.
getpwnam
(
self
.
user
)
except
KeyError
:
log
.
LineWarn
(
log
.
MSG_USER_NOT_FOUND
,
'User "%s" not found.'
%
self
.
user
)
else
:
log
.
LineWarn
(
log
.
MSG_USER_NOT_FOUND
,
'User "%s" not found.'
%
self
.
user
)
# Command checks.
if
self
.
command
.
startswith
(
'%'
)
or
re
.
search
(
r
'[^\\]%'
,
self
.
command
):
...
...
@@ -798,11 +809,12 @@ class CronLineFactory(object):
def
__init__
(
self
):
pass
def
ParseLine
(
self
,
line
):
def
ParseLine
(
self
,
line
,
options
):
"""Classify a line.
Args:
line: The line to classify.
options: a dictionary with options to pass to the CronLineAction Object
Returns:
A CronLine* class (must have a ValidateAndLog method).
...
...
@@ -833,7 +845,7 @@ class CronLineFactory(object):
match
=
at_line_re
.
match
(
line
)
if
match
:
return
CronLineAt
(
match
.
groups
()[
0
],
match
.
groups
()[
1
],
match
.
groups
()[
2
])
match
.
groups
()[
2
]
,
options
)
# Is this line a cron job specifier?
match
=
time_field_job_line_re
.
match
(
line
)
...
...
@@ -845,7 +857,7 @@ class CronLineFactory(object):
'month'
:
match
.
groups
()[
3
],
'day of week'
:
match
.
groups
()[
4
],
}
return
CronLineTime
(
field
,
match
.
groups
()[
5
],
match
.
groups
()[
6
])
return
CronLineTime
(
field
,
match
.
groups
()[
5
],
match
.
groups
()[
6
]
,
options
)
return
CronLineUnknown
()
...
...
@@ -881,7 +893,8 @@ class LogCounter(object):
'QUOTE_VALUES'
,
'SHELL_VAR'
,
'USER_NOT_FOUND'
,
'HOURS_NOT_MINUTES'
))
'HOURS_NOT_MINUTES'
,
'COMMENT'
))
def
__init__
(
self
):
"""Inits LogCounter."""
...
...
@@ -1043,16 +1056,15 @@ class LogCounter(object):
return
self
.
_error_count
def
check_crontab
(
crontab_file
,
log
,
whitelisted_users
=
None
):
def
check_crontab
(
arguments
,
log
):
"""Check a crontab file.
Checks crontab_file for a variety of errors or potential errors. This only
works with the crontab format found in /etc/crontab and /etc/cron.d.
Args:
crontab_file: Name of
the crontab file
to check.
arguments: ArgumentPArser Object containing
the crontab file
and options
log: A LogCounter object.
whitelisted_users: A comma delimited list of users to ignore when warning on unrecognized users.
Returns:
0 if there were no errors.
...
...
@@ -1061,19 +1073,19 @@ def check_crontab(crontab_file, log, whitelisted_users=None):
"""
# Check if the file even exists.
if
not
os
.
path
.
exists
(
crontab
_file
):
if
not
os
.
path
.
exists
(
arguments
.
crontab
):
log
.
Warn
(
'File "%s" does not exist.'
%
crontab_file
)
return
log
.
Summary
()
# Add the any specified users to the whitelist
if
whitelisted_users
:
USER_WHITELIST
.
update
(
whitelisted_users
)
#
if
arguments.
whitelisted_users:
#
USER_WHITELIST.update(
arguments.
whitelisted_users)
# Check the file name.
if
re
.
search
(
'[^A-Za-z0-9_-]'
,
os
.
path
.
basename
(
crontab
_file
)):
if
re
.
search
(
'[^A-Za-z0-9_-]'
,
os
.
path
.
basename
(
arguments
.
crontab
)):
in_whitelist
=
False
for
pattern
in
FILE_RE_WHITELIST
:
if
pattern
.
search
(
os
.
path
.
basename
(
crontab
_file
)):
if
pattern
.
search
(
os
.
path
.
basename
(
arguments
.
crontab
)):
in_whitelist
=
True
break
if
not
in_whitelist
:
...
...
@@ -1082,14 +1094,14 @@ def check_crontab(crontab_file, log, whitelisted_users=None):
line_no
=
0
cron_line_factory
=
CronLineFactory
()
with
open
(
crontab
_file
,
'r'
)
as
crontab_f
:
with
open
(
arguments
.
crontab
,
'r'
)
as
crontab_f
:
for
line
in
crontab_f
:
missing_newline
=
line
[
-
1
]
!=
"
\n
"
line
=
line
.
strip
()
line_no
+=
1
cron_line
=
cron_line_factory
.
ParseLine
(
line
)
cron_line
=
cron_line_factory
.
ParseLine
(
line
,
arguments
)
cron_line
.
ValidateAndLog
(
log
)
log
.
Emit
(
line_no
,
line
)
...
...
tests/test_check.py
View file @
dc322abe
...
...
@@ -307,11 +307,10 @@ class CheckCrontabUnitTest(unittest.TestCase):
exp_rc
=
0
return
(
exp_warn
,
exp_fail
,
exp_rc
)
def
CheckACrontab
(
self
,
crontab
,
whitelisted_users
=
None
):
def
CheckACrontab
(
self
,
arguments
):
log
=
check
.
LogCounter
()
crontab_file
=
os
.
path
.
join
(
BASE_PATH
,
crontab
)
(
exp_warn
,
exp_fail
,
exp_rc
)
=
self
.
GetExpWFRs
(
crontab_file
)
self
.
assertEquals
(
check
.
check_crontab
(
crontab_file
,
log
,
whitelisted_users
),
exp_rc
,
(
exp_warn
,
exp_fail
,
exp_rc
)
=
self
.
GetExpWFRs
(
arguments
.
crontab
)
self
.
assertEquals
(
check
.
check_crontab
(
arguments
,
log
),
exp_rc
,
'Failed to return %d for crontab errors.'
%
exp_rc
)
self
.
assertEquals
(
log
.
warn_count
,
exp_warn
,
'Found %d warns not %d.'
%
(
log
.
warn_count
,
exp_warn
))
...
...
@@ -319,19 +318,36 @@ class CheckCrontabUnitTest(unittest.TestCase):
'Found %d errors not %d.'
%
(
log
.
error_count
,
exp_fail
))
def
testCheckBadCrontab
(
self
):
self
.
CheckACrontab
(
'test_crontab'
)
args
=
type
(
""
,
(),
{})()
args
.
crontab
=
os
.
path
.
join
(
BASE_PATH
,
'test_crontab'
)
self
.
CheckACrontab
(
args
)
def
testCheckWarnCrontab
(
self
):
self
.
CheckACrontab
(
'test_crontab.warn'
)
args
=
type
(
""
,
(),
{})()
args
.
crontab
=
os
.
path
.
join
(
BASE_PATH
,
'test_crontab.warn'
)
self
.
CheckACrontab
(
args
)
def
testCheckWarnWithDisablesCrontab
(
self
):
self
.
CheckACrontab
(
'test_crontab.no-warn'
)
args
=
type
(
""
,
(),
{})()
args
.
crontab
=
os
.
path
.
join
(
BASE_PATH
,
'test_crontab.no-warn'
)
self
.
CheckACrontab
(
args
)
def
testCheckBadWithDisablesCrontab
(
self
):
self
.
CheckACrontab
(
'test_crontab.disable'
)
args
=
type
(
""
,
(),
{})()
args
.
crontab
=
os
.
path
.
join
(
BASE_PATH
,
'test_crontab.disable'
)
self
.
CheckACrontab
(
args
)
def
testCheckWarnWithWhitelistedUser
(
self
):
self
.
CheckACrontab
(
'test_crontab.whitelist'
,
[
'not_a_user'
])
args
=
type
(
""
,
(),
{})()
args
.
crontab
=
os
.
path
.
join
(
BASE_PATH
,
'test_crontab.whitelist'
)
args
.
whitelisted_users
=
[
'not_a_user'
]
self
.
CheckACrontab
(
args
)
def
testCheckBadWithUserLookup
(
self
):
args
=
type
(
""
,
(),
{})()
args
.
crontab
=
os
.
path
.
join
(
BASE_PATH
,
'test_crontab.lookup'
)
args
.
check_passwd
=
False
self
.
CheckACrontab
(
args
)
if
__name__
==
'__main__'
:
...
...
tests/test_crontab.disable
View file @
dc322abe
...
...
@@ -24,6 +24,11 @@ OK_EMPTY=""
BAD_SPACE=
OK_SPACE=" "
# disable fail 1 for comment in variable assignment.
NO_COMMENT="something" # comment
NO_COMMENT=something # comment
NO_COMMENT="something # comment"
# disable warn 1 for bad time spec.
* 3 * * * root Warn for hours not minutes
# disable fail 1 for bad time spec.
...
...
@@ -74,7 +79,7 @@ OK_SPACE=" "
1,4,6,*/5 */3,2,7 * * mOn-Fri root Good Day Range
1,4,6,*/5 */3,2,7 * * mOn-Fri/2 root Good Day Range Step
# chkcrontab: enable-msg=FIELD_VALUE_ERROR
# FAIL
1
for bad time spec.
# FAIL
3
for bad time spec.
1,4,6,*/5 */3,2,7 * * mOn/2 root Bad Day Step
1,4,6,*/5 */3,2,7 * * mOn-Fri/8 root Good Day Range Step
# disable warn 1 for probable missing user.
...
...
tests/test_crontab.lookup
0 → 100644
View file @
dc322abe
# WARN 1 for questionable file name.
# WARN 1 for missing user
1 * * * * root Command
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment