Newer
Older
#!/usr/bin/env python3
"""
A wrapper of apt module that is actually usable.
"""
import os

import utils as u

try:
    import apt
except ModuleNotFoundError:
    u.warning("apt python module not found")
    exit(2)
class AptInstallProgress(apt.progress.base.InstallProgress):
    """Install progress handler that redirects the forked child's output.

    Overrides ``fork()`` so that, in the child process, stdout and stderr
    are written to ``/tmp/envsetup-dpkg.log`` instead of the terminal.
    """

    def fork(self):
        """Fork and, in the child, send stdout/stderr to the log file.

        :return: 0 in the child process, the child's PID in the parent
        :rtype: int
        """
        pid = os.fork()
        if pid == 0:
            # O_APPEND: without it every commit starts writing at offset 0
            # and overwrites the beginning of the previous log, leaving a
            # mix of old and new output in the file.
            logfd = os.open(
                "/tmp/envsetup-dpkg.log",
                os.O_RDWR | os.O_APPEND | os.O_CREAT,
                0o644,
            )
            os.dup2(logfd, 1)
            os.dup2(logfd, 2)
            # The duplicates on fd 1 and 2 are enough; do not leak the
            # original descriptor (unless it already is one of them).
            if logfd > 2:
                os.close(logfd)
        return pid
# cache: apt.cache.Cache
# packages: list
# _purgeable_packages: list
# purgeable_packages: list
def __init__(self, update: bool = False):
    """Build the APT cache and the derived package lists.

    For every package category two attributes are stored: the list of
    package objects (leading underscore) and the list of their ``str()``
    representations (same name without the underscore).

    :param update: Forwarded to ``get_cache()``, default=False
    :type update: bool
    """
    # Overwrite PATH for the whole process with a fixed set of system
    # directories (presumably so tools spawned by APT are always found).
    os.environ["PATH"] = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"

    self.cache = self.get_cache(update)
    self.packages = self.get_packages()

    # Order matters: the removable and upgradable lookups read
    # ``self._installed_packages``.
    installed = self.get_installed_packages()
    self._installed_packages = installed
    self.installed_packages = [str(pkg) for pkg in installed]

    removable = self.get_removable_packages()
    self._removable_packages = removable
    self.removable_packages = [str(pkg) for pkg in removable]

    purgeable = self.get_purgeable_packages()
    self._purgeable_packages = purgeable
    self.purgeable_packages = [str(pkg) for pkg in purgeable]

    upgradable = self.get_upgradable_packages()
    self._upgradable_packages = upgradable
    self.upgradable_packages = [str(pkg) for pkg in upgradable]
def install(self, name: str) -> bool:
    """Mark a package for installation and commit the cache.

    A package that already has an installed version is not marked, but
    the cache is still committed and this object reloaded afterwards.

    :param name: Package name, looked up in ``self.cache``
    :type name: str
    :return: Whether the commit was successful or not
    :rtype: bool
    """
    target = self.cache[name]
    needs_install = not target.installed
    if needs_install:
        target.mark_install(auto_fix=False)

    progress = AptInstallProgress()
    committed = self.cache.commit(install_progress=progress)
    # Rebuild the cache and every package list after the change.
    self.reload()
    return committed
def is_installed(self, name: str) -> bool:
    """Check whether a package is installed.

    Always returns a real ``bool``: a package unknown to the cache is
    reported as not installed instead of yielding ``None``.

    :param name: Package name
    :type name: str
    :return: Whether the package is installed or not
    :rtype: bool
    """
    try:
        pkg = self.cache[name]
    except KeyError:
        # Unknown package name: it cannot be installed.
        return False
    # ``installed`` holds the installed version object, or None.
    return pkg.installed is not None
def remove(self, name: str, purge: bool = False) -> bool:
    """Remove a package with APT.

    With ``purge`` enabled, a package that is no longer installed but
    still has configuration files is marked for deletion too; otherwise
    purging the packages listed by ``get_purgeable_packages()`` would
    silently do nothing.

    :param name: Package name
    :type name: str
    :param purge: Whether to purge package configuration files or not, default=False
    :type purge: bool
    :return: Whether uninstallation is successful or not
    :rtype: bool
    """
    pkg = self.cache[name]
    # Only consult has_config_files when a purge was actually requested.
    leftover_config = purge and pkg.has_config_files
    if pkg.installed or leftover_config:
        pkg.mark_delete(auto_fix=False, purge=purge)
    success = self.cache.commit(install_progress=AptInstallProgress())
    # Rebuild the cache and every package list after the change.
    self.reload()
    return success
def purge(self, name: str) -> bool:
    """Remove a package together with its configuration files.

    Thin shortcut for ``remove(name, purge=True)``.

    :param name: Package name
    :type name: str
    :return: Whether uninstallation is successful or not
    :rtype: bool
    """
    outcome = self.remove(name, purge=True)
    return outcome
def reload(self):
    """Clear the current cache, then rebuild every attribute.

    The rebuild is done by running ``__init__`` again with its default
    arguments.
    """
    current_cache = self.cache
    current_cache.clear()
    # Re-running the initializer recreates the cache and all package lists.
    self.__init__()
def get_cache(self, update: bool = False) -> apt.cache.Cache:
    """Create an APT Cache object, optionally refreshed first.

    :param update: Whether to update cache or not, default=False
    :type update: bool
    :return: An APT Cache object
    :rtype: apt.cache.Cache
    """
    fresh_cache = apt.cache.Cache()
    if not update:
        return fresh_cache

    # Refresh the package lists, then open the cache again so that the
    # returned object reflects the refreshed data.
    fresh_cache.update()
    fresh_cache.open()
    return fresh_cache
def get_packages(self) -> list:
    """Collect every entry of ``self.cache`` into a new list.

    :return: Packages list
    :rtype: list
    """
    return [*self.cache]
def get_installed_packages(self) -> list:
    """Get installed packages list.

    Filters ``self.packages``, which therefore has to be set beforehand.

    :return: Installed packages list
    :rtype: list
    """
    # The list is returned explicitly: callers iterate over the result,
    # so falling off the end (returning None) would break them.
    return [p for p in self.packages if p.is_installed]
def get_removable_packages(self) -> list:
    """Get auto-removable packages list.

    Filters ``self._installed_packages``, which therefore has to be set
    beforehand.

    :return: Auto-removable packages list
    :rtype: list
    """
    # The list is returned explicitly: callers iterate over the result,
    # so falling off the end (returning None) would break them.
    return [p for p in self._installed_packages if p.is_auto_removable]
def get_purgeable_packages(self) -> list:
    """Get purgeable packages list.

    A package is purgeable when it is not installed but still has
    configuration files.

    :return: Purgeable packages list
    :rtype: list
    """
    purgeable = []
    for pkg in self.packages:
        if pkg.is_installed:
            continue
        if pkg.has_config_files:
            purgeable.append(pkg)
    return purgeable
def get_upgradable_packages(self) -> list:
    """Get upgradable packages list.

    Filters ``self._installed_packages``, which therefore has to be set
    beforehand.

    :return: Upgradable packages list
    :rtype: list
    """
    _upgradable_packages = [p for p in self._installed_packages if p.is_upgradable]
    # The list is returned explicitly: callers iterate over the result,
    # so falling off the end (returning None) would break them.
    return _upgradable_packages