D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
saltstack
/
salt
/
lib
/
python3.10
/
site-packages
/
salt
/
utils
/
Filename :
url.py
back
Copy
""" URL utils """ import re import sys from urllib.parse import urlparse, urlunparse import salt.utils.data import salt.utils.path import salt.utils.platform import salt.utils.versions def parse(url): """ Parse a salt:// URL; return the path and a possible saltenv query. """ if not url.startswith("salt://"): return url, None # urlparse will split on valid filename chars such as '?' and '&' resource = url.split("salt://", 1)[-1] if "?env=" in resource: # "env" is not supported; Use "saltenv". path, saltenv = resource.split("?env=", 1)[0], None elif "?saltenv=" in resource: path, saltenv = resource.split("?saltenv=", 1) else: path, saltenv = resource, None if salt.utils.platform.is_windows(): path = salt.utils.path.sanitize_win_path(path) return path, saltenv def create(path, saltenv=None): """ join `path` and `saltenv` into a 'salt://' URL. """ path = path.replace("\\", "/") if salt.utils.platform.is_windows(): path = salt.utils.path.sanitize_win_path(path) path = salt.utils.data.decode(path) query = "saltenv={}".format(saltenv) if saltenv else "" url = salt.utils.data.decode(urlunparse(("file", "", path, "", query, ""))) return "salt://{}".format(url[len("file:///") :]) def is_escaped(url): """ test whether `url` is escaped with `|` """ scheme = urlparse(url).scheme if not scheme: return url.startswith("|") elif scheme == "salt": path, saltenv = parse(url) if salt.utils.platform.is_windows() and "|" in url: return path.startswith("_") else: return path.startswith("|") else: return False def escape(url): """ add escape character `|` to `url` """ if salt.utils.platform.is_windows(): return url scheme = urlparse(url).scheme if not scheme: if url.startswith("|"): return url else: return "|{}".format(url) elif scheme == "salt": path, saltenv = parse(url) if path.startswith("|"): return create(path, saltenv) else: return create("|{}".format(path), saltenv) else: return url def unescape(url): """ remove escape character `|` from `url` """ scheme = urlparse(url).scheme if not scheme: return url.lstrip("|") elif scheme == "salt": path, saltenv = parse(url) if salt.utils.platform.is_windows() and "|" in url: return create(path.lstrip("_"), saltenv) else: return create(path.lstrip("|"), saltenv) else: return url def add_env(url, saltenv): """ append `saltenv` to `url` as a query parameter to a 'salt://' url """ if not url.startswith("salt://"): return url path, senv = parse(url) return create(path, saltenv) def split_env(url): """ remove the saltenv query parameter from a 'salt://' url """ if not url.startswith("salt://"): return url, None path, senv = parse(url) return create(path), senv def validate(url, protos): """ Return true if the passed URL scheme is in the list of accepted protos """ if urlparse(url).scheme in protos: return True return False def strip_proto(url): """ Return a copy of the string with the protocol designation stripped, if one was present. """ return re.sub("^[^:/]+://", "", url) def add_http_basic_auth(url, user=None, password=None, https_only=False): """ Return a string with http basic auth incorporated into it """ if user is None and password is None: return url else: urltuple = urlparse(url) if https_only and urltuple.scheme != "https": raise ValueError("Basic Auth only supported for HTTPS") if password is None: netloc = "{}@{}".format(user, urltuple.netloc) urltuple = urltuple._replace(netloc=netloc) return urlunparse(urltuple) else: netloc = "{}:{}@{}".format(user, password, urltuple.netloc) urltuple = urltuple._replace(netloc=netloc) return urlunparse(urltuple) def redact_http_basic_auth(output): """ Remove HTTP user and password """ # We can't use re.compile because re.compile(someregex).sub() doesn't # support flags even in Python 2.7. url_re = "(https?)://.*@" redacted = r"\1://<redacted>@" if sys.version_info >= (2, 7): # re.sub() supports flags as of 2.7, use this to do a case-insensitive # match. return re.sub(url_re, redacted, output, flags=re.IGNORECASE) else: # We're on python 2.6, test if a lowercased version of the output # string matches the regex... if re.search(url_re, output.lower()): # ... and if it does, perform the regex substitution. return re.sub(url_re, redacted, output.lower()) # No match, just return the original string return output