import subprocess from typing import List from AbstractModule import AbstractModule from PyInquirer import prompt def _run_shell_command(cmd: List[str]) -> str: return subprocess.check_output(cmd).decode("UTF-8").strip() class SwapModule(AbstractModule): def __init__(self): super().__init__() def run(self): actions = { "Create/Resize temp. swap": self._create_resize_swap, "Create/Resize persistent swap": self._create_resize_persistent_swap, "Delete swap": self._delete_swap, "Make temp. swap persistent": self._make_swap_persistent, "Get swap location": self._get_swap_location, "Get swap size": self._get_swap_size, "Show swapiness": self._show_swapiness, "Adjust temp. swapiness": self._adjust_swapiness_temp, "Check fstab for entry": self._fstab_entry_exists } menu = [ { 'type': 'list', 'message': 'Select action', 'name': 'action', 'choices': list(map(lambda x: {"name": x}, actions.keys())) } ] selected_action = prompt(menu)['action'] actions[selected_action]() def _create_resize_persistent_swap(self): self._create_resize_swap() self._make_swap_persistent() def _get_swap_location(self): output_swaps = _run_shell_command(["cat", "/proc/swaps"]) try: swap_location = output_swaps.split()[5] print("Swap is located here:", swap_location) return swap_location except IndexError: print("Swap file doesn´t exist!") return None def _get_swap_size(self): swap_size = _run_shell_command(['swapon', '--show']) try: swap_size = swap_size.split()[7] print("Swap is {}b big!".format(swap_size)) except IndexError: print("Swap file doesn´t exist!") def _create_resize_swap(self): output_swapon = _run_shell_command(['swapon', '--show']) if output_swapon is not "": swap_size = output_swapon.split()[7] print("") print("Swap already installed! You can resize it!") print("Curr. swap size: {}b".format(swap_size)) resize = input("How big should your swap become (numbers in Gigabyte)? ") resize_swapfile = "sudo swapoff /swapfile && " + \ "sudo fallocate -l {}G /swapfile && ".format(resize) + \ "sudo mkswap /swapfile && " + \ "sudo swapon /swapfile" subprocess.call(resize_swapfile, shell=True) output_free = _run_shell_command(["free", "-h"]) print(output_free.strip()) else: size = input("How big should the swap be (numbers in Gigabyte)? ") create_swapfile = "sudo fallocate -l {}G /swapfile && ".format(size) + \ "sudo chmod 600 /swapfile && " + \ "sudo mkswap /swapfile && " + \ "sudo swapon /swapfile" subprocess.call(create_swapfile, shell=True) output_free = _run_shell_command(["free", "-h"]) print(output_free.strip()) def _make_swap_persistent(self): swap_location = self._get_swap_location() if swap_location is None: print("Swap file doesn't exist!") return backup_fstab = "sudo cp /etc/fstab /etc/fstab.bak" enable_persistence = "echo '{} none swap sw 0 0' | sudo tee -a /etc/fstab".format(swap_location) if self._fstab_entry_exists(): print("Swap is already persistent!") else: subprocess.call(backup_fstab, shell=True) subprocess.call(enable_persistence, shell=True) print("Swap is now persistent!") def _show_swapiness(self): print(_run_shell_command(["cat", "/proc/sys/vm/swappiness"])) def _adjust_swapiness_temp(self): options = { "Light": 25, "Default": 60, "Aggressive": 100 } menu = [ { 'type': 'list', 'message': 'Select action', 'name': 'action', 'choices': list(map(lambda x: {"name": x}, options.keys())) } ] selected_swapiness = prompt(menu)['action'] adjust = "sudo sysctl vm.swappiness=" + str(options[selected_swapiness]) subprocess.call(adjust, shell=True) print("Temporary swapiness is " + str(options[selected_swapiness])) def _delete_swap(self): swap_location = self._get_swap_location() if swap_location is None: return disable_swapfile = "sudo swapoff {} && ".format(swap_location) + \ "sudo rm {}".format(swap_location) if self._fstab_entry_exists(): with open("/etc/fstab", "r") as fstab_out: content = fstab_out.readlines() with open("/etc/fstab", "w") as fstab_in: content = content[:-1] for line in content: fstab_in.write(line) else: print("No entry in /etc/fstab!") subprocess.call(disable_swapfile, shell=True) output_swapon = _run_shell_command(['swapon', '--show']) output_free = _run_shell_command(["free", "-h"]) if not output_swapon: print("Swap deleted!") print(output_free.strip()) def _fstab_entry_exists(self): swap_location = self._get_swap_location() fstab_entry = "{} none swap sw 0 0\n".format(swap_location) with open("/etc/fstab", "r") as fstab_file: line = fstab_file.readlines()[-1] if line != fstab_entry: print("No entry in /etc/fstab") return False else: print("fstab entry:", line.strip()) return True