Source code for copy_guardian.copy_utils
import os
import subprocess
def _expand_key(private_key):
if private_key is None:
private_key = "id_rsa"
if os.path.exists(private_key):
return os.path.abspath(private_key)
if "/" not in private_key:
if private_key == "":
private_key = "id_rsa"
private_key_system = os.path.join(os.path.expanduser("~"), ".ssh", private_key)
if os.path.exists(private_key_system):
return os.path.abspath(private_key_system)
raise ValueError(f"can not find {private_key}")
def _ssh_command(user, private_key, host, port):
ssh = (
f"ssh -o 'StrictHostKeyChecking=no' -i {private_key} -o 'BatchMode=yes'"
f" -p {port} {user}@{host}"
)
return ssh
def _rsync_to(ssh_command, host, source, target=""):
cmd = [
"rsync",
"-rav",
"-e",
ssh_command,
source,
f":{target}",
]
subprocess.run(cmd)
def _rsync_from(ssh_command, host, source, target=""):
cmd = [
"rsync",
"-rav",
"-e",
ssh_command,
f":{source}",
target,
]
subprocess.run(cmd)
[docs]def copy_local_folder(source: str, target: str):
"""
copies a folder with all files locally. this function is optimized for many
files, especially on an hpc system.
:param source: source folder
:param target: target folder, the basename of ``source`` will be a subfolder in
``target``.
"""
if not os.path.exists(source):
raise IOError(f"source folder {source} does not exist.")
if not os.path.exists(target):
raise IOError(f"source folder {target} does not exist.")
if not os.path.isdir(source):
raise IOError(f"source folder {source} is not a folder.")
cmd = f"tar cf - {os.path.basename(source)} | {{ cd {target}; tar xf -; }}"
subprocess.run(cmd, shell=True, cwd=os.path.dirname(source))