#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2017, Florent Thiery
'''
Criticality: Normal
Checks that partitions are in conformity with expected norms, and that sufficient free space is available
'''
import subprocess
import sys
import os

# Presumably a block size in bytes; not referenced by the visible code - TODO confirm before relying on it.
BS = 512

# ANSI escape sequences used to colour the messages printed on the terminal.
RED = '\033[91m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
# Resets the terminal colour to its default.
DEF = '\033[0m'


def to_gbytes(size_bytes):
    '''
    Convert a size in bytes to a whole number of GB (1024 * 1024 * 1024 bytes),
    rounded with the built-in round().
    '''
    return int(round(size_bytes / (1024 * 1024 * 1024)))


def read_file(fname):
    '''
    Return the whole content of the text file ``fname`` as a single string.
    '''
    with open(fname, 'r') as stream:
        content = stream.read()
    return content

# Expected partitions, checked one by one by check_path().
# Keys read by check_path():
#   mount_point          - path whose partition is checked
#   type                 - 'swap' for the swap entry (used when there is no mount_point)
#   recommended_types    - file system types that do not trigger a warning
#   min_size_gbytes      - a smaller size is reported as an error
#   reco_size_gbytes     - a smaller size is reported as a warning
#   min_available_gbytes - less available space is reported as an error
paths = [
    {
        'mount_point': '/',
        'recommended_types': ('ext4',),
        'min_size_gbytes': 9,
        'reco_size_gbytes': 14,
        'min_available_gbytes': 4,
    },
    {
        'mount_point': '/home/msuser/msinstance',
        'recommended_types': ('ext4', 'nfs', 'nfs4'),
        'min_size_gbytes': 5,
        'reco_size_gbytes': 300,
        'min_available_gbytes': 5,
    },
    {
        'mount_point': '/home/skyreach',
        'recommended_types': ('ext4', 'nfs', 'nfs4'),
        'min_size_gbytes': 5,
        'reco_size_gbytes': 9,
        'min_available_gbytes': 2,
    },
    {
        'type': 'swap',
        'min_size_gbytes': 1,
        'reco_size_gbytes': 2,
    },
]

def get_swap_gbytes():
    '''
    Return the total swap size in GB, read from the SwapTotal line of /proc/meminfo.

    A warning is printed when the value is not expressed in kB.
    Returns None when no usable SwapTotal line is found.
    '''
    for line in read_file('/proc/meminfo').splitlines():
        if not line.startswith('SwapTotal:'):
            continue
        # Expected form: "SwapTotal:        2097148 kB"
        fields = line.split(':', 1)[1].split()
        if not fields:
            return None
        swap_kbytes = fields[0]
        unit = fields[1] if len(fields) > 1 else ''
        if unit != 'kB':
            print('Warning, unexpected unit %s.' % unit)
        return int(round(int(swap_kbytes) / (1024 * 1024)))
    return None

def get_path(path):
    '''
    Return (device, fs type, size in GB, available GB) for the file system holding ``path``.

    The values are parsed from the last line printed by ``df``.
    (None, None, None, None) is returned when df cannot be run, exits with an
    error or prints a line that cannot be parsed.
    '''
    # Filesystem     Type   1B-blocks       Avail
    # /dev/loop2     ext4 52710469632 11755397120
    failure = (None, None, None, None)
    # An argument list (no shell) keeps df's own exit status visible and copes with
    # spaces in the path; "--" stops a path starting with "-" being read as an option.
    cmd = ['df', '--output=source,fstype,size,avail', '-B', '1', '--', path]
    try:
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, universal_newlines=True)
    except OSError:
        # df is missing or cannot be executed.
        return failure
    if result.returncode != 0:
        return failure
    lines = result.stdout.strip().splitlines()
    if not lines:
        return failure
    # Split from the right so that a source name containing spaces stays in one piece.
    fields = lines[-1].rsplit(None, 3)
    if len(fields) != 4:
        return failure
    dev, fstype, size, available = fields
    try:
        size = int(size)
        available = int(available)
    except ValueError:
        return failure
    return dev, fstype, to_gbytes(size), to_gbytes(available)

def check_allocation(dev):
    '''
    Check that the disk holding ``dev`` has no significant unallocated space.

    The size of the whole disk is compared with the sum of the sizes of its
    partitions, both read from /proc/partitions. A warning is printed when the
    difference, rounded to GB, is greater than 1 GB.

    dev: device path (for example /dev/sda1); an empty or None value is accepted.
    Returns False when the warning was printed, True otherwise.
    '''
    if not dev:
        return True
    # NOTE(review): the disk name is taken as the first 3 characters of the device
    # name (sda1 -> sda); this does not fit names such as nvme0n1p1 or mmcblk0p1.
    root_dev = os.path.basename(dev)[:3]
    if not root_dev:
        return True

    max_size = 0
    total_size = 0
    for line in read_file('/proc/partitions').splitlines():
        fields = line.split()
        # Skip the header, blank lines and anything that is not "major minor #blocks name".
        if len(fields) != 4 or not fields[2].isdigit():
            continue
        name = fields[3]
        # Match on the name column only, not anywhere in the line.
        if not name.startswith(root_dev):
            continue
        # /proc/partitions counts 1024-byte blocks.
        size = int(fields[2]) * 1024
        if name == root_dev:
            max_size = size
        else:
            total_size += size

    unallocated_gbytes = to_gbytes(max_size - total_size)
    if unallocated_gbytes > 1:
        print('Warning, %s GB are unallocated on %s.' % (unallocated_gbytes, root_dev))
        return False
    return True

# Global result flags: set to True by check_path() and read at the end of the script.
error = False
warning = False

def check_path(p):
    '''
    Check one entry of ``paths`` and print the problems found.

    For an entry with a 'mount_point', the file system type, the disk allocation
    (skipped for NFS), the available space and the partition size are checked.
    For an entry whose 'type' is 'swap', only the swap size is checked.

    Nothing is returned: the global flags ``error`` and ``warning`` are set to
    True when a problem of the corresponding severity is found.
    '''
    global error
    global warning

    name = None
    psize = None
    mount_point = p.get('mount_point')
    if mount_point:
        if os.path.exists(mount_point):
            mount_point = os.path.realpath(mount_point)
            name = mount_point
            dev, fstype, psize, available = get_path(mount_point)
            if fstype is None:
                # get_path() could not obtain the partition information.
                print('Unable to get partition information of %s, cannot check.' % name)
                warning = True
                return
            recommended_types = p.get('recommended_types') or ()
            if fstype not in recommended_types:
                print('Warning, partition of %s fs type not recommended %s(current: %s, recommended: %s)%s.' % (name, YELLOW, fstype, recommended_types, DEF))
                warning = True
            # The allocation check is skipped for NFS mounts.
            if 'nfs' not in fstype and not check_allocation(dev):
                # Only ever raise the flag: a warning set earlier must not be cleared.
                warning = True
            min_available_gbytes = p.get('min_available_gbytes')
            if min_available_gbytes and available < min_available_gbytes:
                print('Partition of %s has less than %s GB available %s(%s GB available)%s.' % (name, min_available_gbytes, RED, available, DEF))
                error = True
            else:
                print('Partition of %s has %s GB available.' % (name, available))
        else:
            print('%s not found, cannot check.' % mount_point)
    elif p.get('type') == 'swap':
        name = 'swap'
        psize = get_swap_gbytes()

    # None means that no size could be measured; 0 is a measured size and must be
    # checked, otherwise a missing or tiny swap would never be reported.
    if psize is not None:
        if psize < p['min_size_gbytes']:
            print('Partition of %s is smaller than the minimum required size %s(%s GB < %s GB)%s.' % (name, RED, psize, p['min_size_gbytes'], DEF))
            error = True
        elif psize < p['reco_size_gbytes']:
            print('Partition of %s is smaller than the recommended size %s(%s GB < %s GB)%s.' % (name, YELLOW, psize, p['reco_size_gbytes'], DEF))
            warning = True
# Run every configured check, then summarise the result and exit.
for p in paths:
    check_path(p)

# NOTE(review): the exit codes below are an assumption (0: success, 1: error,
# 3: warning) - confirm them against the convention of the tool running this test.
if error:
    print('Errors found.')
    code = 1
elif warning:
    print('Some warnings were found.')
    code = 3
else:
    print(GREEN + 'All OK.' + DEF)
    code = 0

sys.exit(code)