Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
E
envsetup
Manage
Activity
Members
Plan
Redmine
Code
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Deploy
Releases
Container Registry
Model registry
Analyze
Contributor analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
mediaserver
envsetup
Commits
2ffc7926
Verified
Commit
2ffc7926
authored
5 years ago
by
Nicolas KAROLAK
Browse files
Options
Downloads
Patches
Plain Diff
update test email | refs
#28817
parent
98456673
No related branches found
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
tests/test_email.py
+141
-84
141 additions, 84 deletions
tests/test_email.py
with
141 additions
and
84 deletions
tests/test_email.py
+
141
−
84
View file @
2ffc7926
...
@@ -5,9 +5,10 @@ Criticality: High
...
@@ -5,9 +5,10 @@ Criticality: High
Checks that emails can be sent.
Checks that emails can be sent.
"""
"""
import
o
s
from
ipaddress
import
ip_addres
s
from
pathlib
import
Path
from
pathlib
import
Path
import
random
import
random
import
socket
import
subprocess
import
subprocess
import
sys
import
sys
import
time
import
time
...
@@ -20,107 +21,100 @@ sys.path.append(str(Path(__file__).parents[1].resolve()))
...
@@ -20,107 +21,100 @@ sys.path.append(str(Path(__file__).parents[1].resolve()))
from
envsetup
import
utils
as
u
# noqa: E402
from
envsetup
import
utils
as
u
# noqa: E402
def
check_listen
()
->
int
:
def
check_listen
()
->
tuple
:
"""
Check that Postfix is listening on 127.0.0.1:25.
"""
Check that Postfix is listening on 127.0.0.1:25.
:return: Exit return code
:return: Exit return code
s
:rtype:
int
:rtype:
tuple
"""
"""
print
(
"
Checking that postfix is listening locally:
"
)
warnings
=
0
errors
=
0
# get listening state from
netstat
# get listening state from
ss
status
,
out
=
subprocess
.
getstatusoutput
(
"
netstat
-pant | grep master | grep
'
:25
'"
)
status
,
out
=
subprocess
.
getstatusoutput
(
"
ss
-pant | grep master | grep
'
:25
'"
)
if
status
!=
0
or
"
127.0.0.1:25
"
not
in
out
:
if
status
!=
0
or
(
"
127.0.0.1:25
"
not
in
out
and
"
[::1]:25
"
not
in
out
)
:
u
.
error
(
"
Postfix is not listening on
127.0.0.1
:25
"
)
u
.
error
(
"
Postfix is not listening on
localhost
:25
"
)
return
1
errors
+=
1
u
.
success
(
"
Postfix is listening on
127.0.0.1
:25
"
)
u
.
success
(
"
Postfix is listening on
localhost
:25
"
)
return
0
return
warnings
,
errors
def
check_relay
(
conf
:
dict
)
->
int
:
def
check_relay
(
relay
:
str
,
domain
:
str
)
->
tuple
:
"""
Check that Postfix is not an open relay.
"""
Check that Postfix is not an open relay.
:param conf: EnvSetup configuration settings
:param relay: Hostname or IP address of relay host
:type conf: dict
:type relay: str
:return: Exit return code
:param domain: Domain name under which mails will be send
:rtype: int
:type domain: str
:return: Exit return codes
:rtype: tuple
"""
"""
print
(
"
Checking that SMTP relay conforms to conf:
"
)
warnings
=
0
errors
=
0
# get relayhost value from Postfix config
# get relayhost value from Postfix config
status
,
out
=
subprocess
.
getstatusoutput
(
"
grep relayhost /etc/postfix/main.cf
"
)
status
,
out
=
subprocess
.
getstatusoutput
(
"
grep relayhost /etc/postfix/main.cf
"
)
if
status
==
0
:
if
status
==
0
:
configured_relay
=
(
configured_relay
=
(
out
[
len
(
"
relayhost
"
):]
.
strip
(
"
\t
=
"
).
replace
(
"
[
"
,
""
).
replace
(
"
]
"
,
""
)
out
.
replace
(
"
relayhost
"
,
""
)
.
strip
(
"
\t
=
"
).
replace
(
"
[
"
,
""
).
replace
(
"
]
"
,
""
)
)
)
else
:
else
:
configured_relay
=
""
configured_relay
=
""
if
not
configured_relay
:
if
not
configured_relay
:
# check public ip address
ip_addr
=
conf
.
get
(
"
NETWORK_IP_NAT
"
)
or
conf
.
get
(
"
NETWORK_IP
"
)
if
not
ip_addr
:
u
.
warning
(
"
Cannot determine public IP address
"
)
return
3
# check domain origin
# check domain origin
if
os
.
path
.
exists
(
"
/etc/mailname
"
):
if
Path
(
"
/etc/mailname
"
)
.
exists
()
:
with
open
(
"
/etc/mailname
"
,
"
r
"
)
as
mailname
:
with
open
(
"
/etc/mailname
"
,
"
r
"
)
as
mailname
:
myorigin
=
mailname
.
read
().
strip
()
myorigin
=
mailname
.
read
().
strip
()
else
:
else
:
out
=
subprocess
.
getoutput
(
"
grep myorigin /etc/postfix/main.cf
"
)
out
=
subprocess
.
getoutput
(
"
grep myorigin /etc/postfix/main.cf
"
)
myorigin
=
out
.
replace
(
"
myorigin
"
,
""
).
strip
()
myorigin
=
out
.
replace
(
"
myorigin
"
,
""
).
strip
()
if
myorigin
not
in
(
"
ubicast.tv
"
,
"
ubicast.eu
"
):
# possible origin names
u
.
warning
(
"
The
\"
myorigin
\"
setting does not contain ubicast.eu or ubicast.tv
"
)
origins
=
set
(
return
3
(
domain
or
None
,
u
.
exec_cmd
(
"
hostname
"
,
log_output
=
False
)[
1
]
or
None
)
# check spf
)
result
,
_
=
spf
.
check2
(
i
=
ip_addr
,
s
=
"
support@ubicast.eu
"
,
h
=
""
)
if
myorigin
not
in
origins
:
if
result
!=
"
pass
"
:
u
.
warning
(
'"
myorigin
"
setting does not contain a valid domain
'
)
u
.
error
(
"
SPF record for {} in ubicast.eu is missing
"
.
format
(
ip_addr
))
warnings
+=
1
return
3
# get relayhost value from envsetup config
if
relay
!=
configured_relay
:
conf_relay
=
conf
.
get
(
"
EMAIL_SMTP_SERVER
"
,
""
).
replace
(
"
[
"
,
""
).
replace
(
"
]
"
,
""
)
u
.
error
(
"
STMP relay must be {}
"
.
format
(
relay
))
errors
+=
1
if
conf_relay
!=
configured_relay
:
if
not
errors
and
not
warnings
:
u
.
error
(
"
STMP relay must be {}
"
.
format
(
conf_relay
))
u
.
success
(
"
STMP relay is properly set
"
)
return
3
u
.
success
(
"
STMP relay is properly set
"
)
return
warnings
,
errors
return
0
def
check_send
_test_email
(
conf
:
dict
)
->
int
:
def
check_send
(
sender
:
str
)
->
tuple
:
"""
Check that Postfix can send email.
"""
Check that Postfix can send email.
:return: Exit return code
:param sender: Sender mail address
:rtype: int
:type sender: str
:return: Exit return codes
:rtype: tuple
"""
"""
print
(
"
Checking Postfix can send email:
"
)
warnings
=
0
errors
=
0
# check if s-nail is installed, if not set from address
sender
=
""
status
,
out
=
subprocess
.
getstatusoutput
(
"
dpkg -l s-nail | grep -E
'
^ii
'"
)
if
status
==
0
:
u
.
warning
(
"
The package
'
s-nail
'
is installed, the email sender address will be ignored.
"
)
else
:
sender
=
conf
.
get
(
"
EMAIL_SENDER
"
)
or
""
# send email
# send email
email
=
"
noreply+{}-{}@ubicast.eu
"
.
format
(
time
.
time
(),
random
.
randint
(
0
,
1000
))
email
=
"
noreply+{}-{}@ubicast.eu
"
.
format
(
time
.
time
(),
random
.
randint
(
0
,
1000
))
u
.
info
(
"
Sending test email to
'
{}
'"
.
format
(
email
))
if
sender
:
if
sender
:
u
.
info
(
"
Sender address is
'
{}
'"
.
format
(
sender
))
sender
=
"
-a
'
From: {}
'
"
.
format
(
sender
)
sender
=
"
-a
'
From: {}
'
"
.
format
(
sender
)
else
:
else
:
u
.
info
(
"
Sender address is not set
"
)
u
.
warning
(
"
Sender address is not set
"
)
cmd
=
"
echo
'
test email
'
| mail -s
'
Email used to test configuration.
'
{}{}
"
.
format
(
sender
,
email
)
warnings
+=
1
cmd
=
"
echo
'
test email
'
| mail -s
'
Email used to test configuration.
'
{}{}
"
.
format
(
sender
,
email
)
subprocess
.
getoutput
(
cmd
)
subprocess
.
getoutput
(
cmd
)
# init vars
# init vars
...
@@ -130,7 +124,7 @@ def check_send_test_email(conf: dict) -> int:
...
@@ -130,7 +124,7 @@ def check_send_test_email(conf: dict) -> int:
out
=
""
out
=
""
# find logs
# find logs
if
os
.
path
.
isfile
(
"
/var/log/mail.log
"
):
if
Path
(
"
/var/log/mail.log
"
)
.
is_file
()
:
cmd
=
"
grep
'
{}
'
/var/log/mail.log
"
.
format
(
email
)
cmd
=
"
grep
'
{}
'
/var/log/mail.log
"
.
format
(
email
)
else
:
else
:
u
.
info
(
"
/var/log/mail.log not found, trying journalctl
"
)
u
.
info
(
"
/var/log/mail.log not found, trying journalctl
"
)
...
@@ -144,23 +138,13 @@ def check_send_test_email(conf: dict) -> int:
...
@@ -144,23 +138,13 @@ def check_send_test_email(conf: dict) -> int:
if
status
==
0
:
if
status
==
0
:
out
=
out
.
strip
().
split
(
"
\n
"
)[
-
1
]
out
=
out
.
strip
().
split
(
"
\n
"
)[
-
1
]
if
"
status=deferred
"
not
in
out
:
if
"
status=deferred
"
not
in
out
:
u
.
info
(
"
Log entry found after {} seconds
"
.
format
(
waited
))
break
break
# timeout
# timeout
if
waited
>=
timeout
:
if
waited
>=
timeout
:
u
.
error
(
"
Failed to send email.
"
)
u
.
error
(
"
Failed to send email.
"
)
u
.
info
(
u
.
info
(
"
> no log entry found using command: {}
"
.
format
(
cmd
))
"
No log entry found after {} seconds using command: {}
"
.
format
(
errors
+=
1
waited
,
cmd
break
)
)
return
1
# not found yet
u
.
info
(
"
No log entry found after {} seconds, waiting {} more seconds...
"
.
format
(
waited
,
delay
)
)
# wait before next iteration
# wait before next iteration
time
.
sleep
(
delay
)
time
.
sleep
(
delay
)
waited
+=
delay
waited
+=
delay
...
@@ -169,29 +153,102 @@ def check_send_test_email(conf: dict) -> int:
...
@@ -169,29 +153,102 @@ def check_send_test_email(conf: dict) -> int:
# check output for errors
# check output for errors
if
"
bounced
"
in
out
or
"
you tried to reach does not exist
"
in
out
:
if
"
bounced
"
in
out
or
"
you tried to reach does not exist
"
in
out
:
u
.
error
(
"
Failed to send email
"
)
u
.
error
(
"
Failed to send email
"
)
u
.
info
(
"
Sending log line:
\n
{}
"
.
format
(
out
))
u
.
info
(
"
> sending log line:
\n
{}
"
.
format
(
out
))
return
1
errors
+=
1
if
not
errors
:
u
.
success
(
"
Can send email
"
)
return
warnings
,
errors
u
.
success
(
"
Email sent.
"
)
return
0
def
check_spf
(
ip_addr
:
str
,
sender
:
str
,
domain
:
str
)
->
tuple
:
"""
Check that SPF records passes.
:param ip_addr: Host ip address of server or relay
:type ip_addr: str
:param sender: Sender mail address
:type sender: str
:param domain: Domain name under which mails will be send
:type domain: str
:return: Exit return codes
:rtype: tuple
"""
warnings
=
0
errors
=
0
if
ip_address
(
ip_addr
).
is_private
:
u
.
info
(
"
{} is a private address, cannot check SPF
"
)
elif
ip_addr
and
sender
:
# check spf
result
,
_
=
spf
.
check2
(
i
=
ip_addr
,
s
=
domain
,
h
=
""
)
if
result
in
(
"
pass
"
,
"
neutral
"
):
u
.
success
(
"
SPF for {} in {}: {}
"
.
format
(
ip_addr
,
domain
,
result
))
elif
result
==
"
none
"
:
u
.
info
(
"
SPF for {} in {}: {}
"
.
format
(
ip_addr
,
domain
,
result
))
else
:
u
.
warning
(
"
SPF for {} in {}: {}
"
.
format
(
ip_addr
,
domain
,
result
))
warnings
+=
1
else
:
u
.
info
(
"
IP or sender not set, cannot check SPF
"
)
return
warnings
,
errors
def
main
():
def
main
():
"""
Run all checks and exits with corresponding exit code.
"""
"""
Run all checks and exits with corresponding exit code.
"""
if
not
os
.
path
.
exists
(
"
/etc/postfix
"
):
warnings
=
0
u
.
error
(
"
Postfix dir does not exists, please install postfix
"
)
errors
=
0
sys
.
exit
(
1
)
conf
=
u
.
load_conf
(
)
print
(
"
Checking email settings:
"
)
rcode
=
check_listen
()
if
not
Path
(
"
/etc/postfix
"
).
exists
():
if
rcode
==
0
:
u
.
info
(
"
postfix is not installed
"
)
rcode
=
check_relay
(
conf
)
exit
(
2
)
if
check_send_test_email
(
conf
)
==
1
:
rcode
=
1
sys
.
exit
(
rcode
)
# get settings
conf
=
u
.
load_conf
()
relay
=
conf
.
get
(
"
EMAIL_SMTP_SERVER
"
,
""
).
replace
(
"
[
"
,
""
).
replace
(
"
]
"
,
""
)
ip_addr
=
(
(
socket
.
gethostbyname
(
relay
)
if
relay
else
None
)
or
conf
.
get
(
"
NETWORK_IP_NAT
"
)
or
conf
.
get
(
"
NETWORK_IP
"
)
or
None
)
sender
=
conf
.
get
(
"
EMAIL_SENDER
"
)
or
None
if
not
sender
and
Path
(
"
/etc/postfix/generic
"
).
exists
():
with
open
(
"
/etc/postfix/generic
"
)
as
sender_fo
:
sender
=
sender_fo
.
readline
().
split
()[
-
1
]
domain
=
sender
.
split
(
"
@
"
)[
-
1
]
or
None
# check that we are not an open relay
check_listen_warn
,
check_listen_err
=
check_listen
()
warnings
+=
check_listen_warn
if
check_listen_warn
else
warnings
errors
+=
check_listen_err
if
check_listen_err
else
errors
# check that relayhost is correct
check_relay_warn
,
check_relay_err
=
check_relay
(
relay
,
domain
)
warnings
+=
check_relay_warn
if
check_relay_warn
else
warnings
errors
+=
check_relay_err
if
check_relay_err
else
errors
# check that we can send emails
check_send_warn
,
check_send_err
=
check_send
(
sender
)
warnings
+=
check_send_warn
if
check_send_warn
else
warnings
errors
+=
check_send_err
if
check_send_err
else
errors
# check that spf record is correct
check_spf_warn
,
check_spf_err
=
check_spf
(
ip_addr
,
sender
,
domain
)
warnings
+=
check_spf_warn
if
check_spf_warn
else
warnings
errors
+=
check_spf_err
if
check_spf_err
else
errors
if
errors
:
exit
(
1
)
elif
warnings
:
exit
(
3
)
exit
(
0
)
if
__name__
==
"
__main__
"
:
if
__name__
==
"
__main__
"
:
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment