Skip to content
Snippets Groups Projects
network.py 4.29 KiB
Newer Older
#!/usr/bin/env python3

"""EnvSetup network utilities."""

from pathlib import Path
import socket
import subprocess

from .commands import exec_cmd
from .logging import log


def add_hosts_to_localhost(hosts: list):
    """Add a list of hosts to 127.0.0.1 in /etc/hosts.

    :param hosts: List of commands
    :type hosts: list
    :raises Exception: Houston we have a problem
    """

    rc, hostname = exec_cmd("hostname")
    if rc == 0 and hostname not in hosts:
        hosts.insert(0, hostname)
    with open("/etc/hosts", "r") as fo:
        content = fo.read()
    new_content = list()
    found_127 = False
    for line in content.split("\n"):
        if not found_127 and line.startswith("127.0.0.1"):
            found_127 = True
            for host in hosts:
                if " " + host not in line:
                    line += " " + host
                    log("Adding host %s to /etc/hosts 127.0.0.1 aliases." % host)
        new_content.append(line)
    if not found_127:
        new_content.append("127.0.0.1 %s" % " ".join(hosts))
    new_content = "\n".join(new_content)
    if new_content != content:
        with open("/etc/hosts", "w") as fo:
            fo.write(new_content)
        log("/etc/hosts updated.")
    else:
        log("/etc/hosts is already up to date.")


OPENSSL_CONFIG_TEMPLATE = """
[ req ]

prompt             = no
default_bits       = 4096
default_keyfile    = envsetup.csr.pem
distinguished_name = subject
req_extensions     = req_ext
x509_extensions    = x509_ext
string_mask        = utf8only

[ subject ]

C            = FR
ST           = IDF
L            = Paris
O            = UbiCast
CN           = MediaServer
emailAddress = root@localhost

[ x509_ext ]

subjectKeyIdentifier   = hash
authorityKeyIdentifier = keyid,issuer
basicConstraints       = CA:FALSE
keyUsage               = digitalSignature, keyEncipherment
subjectAltName         = @alternate_names
nsComment              = "OpenSSL Generated Certificate"

[ req_ext ]

subjectKeyIdentifier = hash
basicConstraints     = CA:FALSE
keyUsage             = digitalSignature, keyEncipherment
subjectAltName       = @alternate_names

[ alternate_names ]

"""


def mkcert(
    domains: list,
    ecc: bool = True,
    days: int = 3650,
    config_tpl: str = OPENSSL_CONFIG_TEMPLATE,
):
    """Generate a self-signed certificate for the domains list.

    :param domains: Domains for which the certificates will be self-signed
    :type domains: list
    :param ecc: Wether to use Elliptic Curve cryptography or not, defaults to True, if Fasle RSA is used
    :type ecc: bool, optional
    :param days: Validity lifetime of the certificate, defaults to 3650
    :type days: int, optional
    :param config_tpl: OpenSSL config file template, defaults to OPENSSL_CONFIG_TEMPLATE
    :type config_tpl: str, optional
    """

    # create certs dir
    cert_dir = "/etc/ssl/envsetup"
    Path(cert_dir).mkdir(mode=0o755, parents=True, exist_ok=True)
    # populate template with domains
    for i, domain in enumerate(domains, start=1):
        config_tpl = config_tpl + "DNS.{} = {}\n".format(i, domain)
    # write openssl config file
    with open(cert_dir + "/conf", "w") as config_fh:
        config_fh.write(config_tpl)
    # key type: elliptic curve (default) or rsa
    if ecc:
        subprocess.check_call(
            ["openssl", "ecparam", "-name", "secp384r1", "-out", cert_dir + "/ecparam"]
        )
        keytype = "ec:" + cert_dir + "/ecparam"
    else:
        keytype = "rsa"
    # execute openssl to generate keypair
    subprocess.check_call(
        [
            "openssl",
            "req",
            "-config",
            cert_dir + "/conf",
            "-new",
            "-x509",
            "-sha256",
            "-nodes",
            "-newkey",
            keytype,
            "-keyout",
            cert_dir + "/key.pem",
            "-days",
            str(days),
            "-out",
            cert_dir + "/cert.pem",
        ]
    )


def get_ip() -> str:
    """Get the "primary" ip address, the one used by the default route.

    :return: IP address
    :rtype: str
    """

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # doesn't have to be reachable
        s.connect(("10.255.255.255", 1))
        IP = s.getsockname()[0]
    except Exception:
        IP = "127.0.0.1"
    finally:
        s.close()

    return IP