Source code for okra.repo_mgmt

""" GitHub Repo Management

Related to downloading and updating GitHub repos. See
the 'assn1' script in bin/assn1 to handle get and update
features.
"""
import logging
import os
import subprocess
from urllib.parse import urljoin

from okra.error_handling import DirectoryNotCreatedError, NetworkError

logger = logging.getLogger(__name__)


def read_repos(fpath: str) -> list:
    """ Read list of repos from disk

    The file is read line by line. Surrounding whitespace is stripped
    from every line and whitespace-only lines are skipped, so a trailing
    newline or a spacer line never becomes an empty repo name.

    :param fpath: path to a text file holding one entry per line
    :return: stripped, non-empty lines in file order; an empty list
        when the file does not exist (the error is logged)
    :rtype: list of str
    """
    try:
        with open(fpath, "r") as infile:
            # Iterate the file lazily rather than materialising it with
            # readlines(); keep only lines that still have content after
            # stripping.
            stripped = (line.strip() for line in infile)
            return [entry for entry in stripped if entry]

    except FileNotFoundError:
        logger.error("File not found: {}".format(fpath))
        return []
def create_parent_dir(repo_name: str, dirpath: str) -> bool:
    """ Create parent directory before cloning repo.

    https://github.com/tbonza/EDS19/issues/8

    The parent directory is the part of ``repo_name`` before the first
    '/', created directly under ``dirpath``. Only that single directory
    level is created; ``dirpath`` itself must already exist.

    :param repo_name: repo name with owner included; tensorflow/tensorflow
    :param dirpath: directory under which the owner directory is created
    :return: True if the directory already exists or was created,
        False if it could not be created (the error is logged)
    :rtype: bool
    """
    parent_dir = repo_name.split("/")[0]

    # Join as a filesystem path. URL joining would replace the last
    # segment of a base that has no trailing slash
    # ("/tmp/cache" + "owner" -> "/tmp/owner").
    ppath = os.path.join(dirpath or "", parent_dir)

    if os.path.exists(ppath):
        return True

    try:
        os.mkdir(ppath)
    except FileExistsError:
        # Something created the path between the check above and the
        # mkdir call; treat it the same as the "already exists" case.
        return True
    except OSError as exc:
        logger.error(exc)
        return False

    return True
def gcloud_clone_or_fetch_repo(repo_name: str, ssh=False) -> bool:
    """ Clone or fetch updates from git repo

    GCloud operations only work on one repository at a time so we
    don't have to use a parent directory.

    The cache directory is read from the ``CACHE`` environment
    variable. When it is unset or empty, the repo path is simply
    ``repo_name``, i.e. relative to the current working directory.

    :param repo_name: '<repo owner>/<repo name>'
    :param ssh: passed through to ``clone_repo``; clone over SSH
        instead of HTTPS when True
    :return: result of ``update_repo`` when the repo path already
        exists, otherwise the result of ``clone_repo``
    :rtype: bool
    """
    # Normalise the cache to end with a path separator (an empty value
    # stays empty) so the directory is unambiguous for the helpers no
    # matter how they join it with the repo name.
    cache = os.path.join(os.getenv("CACHE") or "", "")

    # Filesystem join; URL-style joining would drop the last segment of
    # a cache path that has no trailing slash.
    repo_path = os.path.join(cache, repo_name)

    if os.path.exists(repo_path):
        return update_repo(repo_name, cache)

    return clone_repo(repo_name, cache, ssh)
def clone_repo(repo_name: str, dirpath: str, ssh=False) -> bool:
    """ Clone GitHub repo.

    Runs ``git clone`` from github.com into ``<dirpath>/<repo_name>``.

    :param repo_name: '<repo owner>/<repo name>'
    :param dirpath: directory the repo is cloned under
    :param ssh: use the 'git@github.com:' remote instead of the
        'https://github.com/' remote when True
    :return: True when git exits with 0 and the target path exists
    :rtype: bool
    :raises: :class:`okra.error_handling.NetworkError` when git exits
        non-zero or the target path is missing afterwards. Any
        exception raised while launching git is logged and re-raised.
    """
    if ssh:
        remote = "git@github.com:{}.git".format(repo_name)
    else:
        remote = "https://github.com/{}.git".format(repo_name)

    # Filesystem join; URL-style joining would drop the last segment of
    # a dirpath that has no trailing slash.
    rpath = os.path.join(dirpath or "", repo_name)

    try:
        res = subprocess.run(["git", "clone", remote, rpath],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    except Exception as exc:
        # No process result exists when the launch itself fails (e.g.
        # git is not installed), so log the exception rather than
        # reading stderr from an unbound result.
        logger.error("Issue with git clone: {}".format(exc))
        raise

    if res.returncode == 0 and os.path.exists(rpath):
        return True

    logger.error("Issue with git clone: {}".format(res.stderr))
    raise NetworkError(
        expression = res.stderr,
        message = "Issue with git clone"
    )
def update_repo(repo_name: str, dirpath: str) -> bool:
    """ Update repo with new code.

    Runs ``git fetch`` inside ``<dirpath>/<repo_name>``.

    :param repo_name: '<repo owner>/<repo name>'
    :param dirpath: directory the repo was cloned under
    :return: True when git exits with 0, otherwise False (git's
        stderr is logged)
    :rtype: bool
    """
    # Filesystem join; URL-style joining would drop the last segment of
    # a dirpath that has no trailing slash and point git at the wrong
    # working directory.
    rpath = os.path.join(dirpath or "", repo_name)

    res = subprocess.run(["git", "fetch"], cwd=rpath,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)

    if res.returncode != 0:
        logger.error(res.stderr)
        return False

    return True
def compress_repo(repo_name: str, cache: str, repo_comp: str) -> bool:
    """ Compress repo for upload.

    Runs ``tar -zcf <repo_comp> <repo_name>`` with ``cache`` as the
    working directory.

    :param repo_name: git repo name with owner included;
        tensorflow/tensorflow. Passed to tar as the path to archive.
    :param cache: directory tar is run from
    :param repo_comp: path of the compressed archive to write
    :return: creates a compressed file of github repo
    :rtype: True if git repo successfully compressed, False otherwise
        (tar's stderr is logged)
    """
    completed = subprocess.run(
        ["tar", "-zcf", repo_comp, repo_name],
        cwd=cache,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    # Guard clause: report the failure first, fall through on success.
    if completed.returncode != 0:
        logger.error(completed.stderr)
        return False

    return True
def decompress_repo(repo_comp: str, cache) -> bool:
    """ Decompress repo to a directory.

    Runs ``tar -zxf <repo_comp> -C <cache>``.

    :param repo_comp: path to file to be decompressed
    :param cache: directory the archive contents are extracted into
    :return: True when tar exits with 0, otherwise False (tar's
        stderr is logged)
    :rtype: boolean
    """
    completed = subprocess.run(
        ["tar", "-zxf", repo_comp, "-C", cache],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    # Guard clause: report the failure first, fall through on success.
    if completed.returncode != 0:
        logger.error(completed.stderr)
        return False

    return True