D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
alt
/
python37
/
lib
/
python3.7
/
site-packages
/
clwizard
/
Filename :
utils.py
back
Copy
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENCE.TXT # from __future__ import absolute_import import os import subprocess import logging from contextlib import contextmanager from typing import Iterable # NOQA from .constants import CRASH_LOG_PATH from .exceptions import PackageMissingError import psutil @contextmanager def atomic_write(filepath, fsync=True): """ Writeable file object that atomically updates a file (using a temporary file). :param filepath: the file path to be opened :param fsync: whether to force write the file to disk """ tmppath = filepath + '.tmp' try: with open(tmppath, 'w') as f: yield f if fsync: f.flush() os.fsync(f.fileno()) os.rename(tmppath, filepath) finally: try: os.remove(tmppath) except (IOError, OSError): pass def is_background_process_running(): """ Detects if the process installing modules is running in the background. """ def _is_wizard_cmdline(cmdline): # type: (Iterable[str]) -> bool return ( {'/usr/sbin/cloudlinux-wizard', 'install', '--no-async'} <= set(cmdline) or {'/sbin/cloudlinux-wizard', 'install', '--no-async'} <= set(cmdline) ) for proc in psutil.process_iter(): # cmdline example: # ['/opt/cloudlinux/venv/bin/python3', # 'usr/sbin/cloudlinux-wizard', # '--json', 'install', '--json-data', '{json obj}'] try: cmdline = list(proc.cmdline()) except psutil.NoSuchProcess: continue if _is_wizard_cmdline(cmdline): return True return False def run_background(cmd): fnull = open(os.devnull, 'w') log_crash = open(CRASH_LOG_PATH, 'w') return subprocess.Popen( cmd, stdin=fnull, stdout=log_crash, stderr=log_crash ) def installed_interpreters_list(interpreter): try: from clselect.clselectctl import interpreter_versions_short_summary except ImportError: raise PackageMissingError('lvemanager') return interpreter_versions_short_summary(interpreter) def convert_package_version(version, version_size): """ Format version string e.g: 3.6.0 -> 3.6 if version size = 2 9.11.3 -> 9 if version size = 1 :param version given version to convert :param version_size expected size of result version string """ return '.'.join(version.split('.')[:version_size]) def setup_logger(logger_name, log_file): """ Logger setup for all modules :return: """ app_logger = logging.getLogger(logger_name) app_logger.setLevel(logging.INFO) try: fh = logging.FileHandler(log_file) except IOError: pass else: fh.formatter = logging.Formatter( '[%(levelname)s | %(asctime)s]: %(message)s') app_logger.addHandler(fh) return app_logger