bash-stuff/python3-pkg-rabbitvcs/git-gittyup/client.py

2214 lines
74 KiB
Python

from __future__ import absolute_import
from __future__ import print_function
#
# client.py
#
import os, errno
import os.path
import re
import shutil
import fnmatch
import time
import struct
from datetime import datetime
from mimetypes import guess_type
import time
import subprocess
import dulwich.errors
import dulwich.repo
import dulwich.porcelain
import dulwich.objects
from dulwich.index import write_index_dict, SHA1Writer
#from dulwich.patch import write_tree_diff
from .exceptions import *
from . import util
from .objects import *
from .command import GittyupCommand
from rabbitvcs.util import helper
from rabbitvcs.util.strings import *
import six.moves.tkinter
import six.moves.tkinter_messagebox
import six
ENCODING = "UTF-8"
RE_STATUS = re.compile("^([\sA-Z\?]+)\s(?:\S+\s->\s)?(.*?)$")
def callback_notify_null(val):
pass
def callback_get_user():
from pwd import getpwuid
pwuid = getpwuid(os.getuid())
user = pwuid[0]
fullname = pwuid[4]
host = os.getenv("HOSTNAME")
return (fullname, "%s@%s" % (user, host))
def callback_get_cancel():
return False
def mkdir_p(path):
# http://stackoverflow.com/questions/600268/mkdir-p-functionality-in-python
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else: raise
def get_tmp_path(filename):
tmpdir = "/tmp/rabbitvcs"
mkdir_p(tmpdir)
return os.path.join(tmpdir, filename)
class GittyupClient(object):
UTF8 = UTF8_ENCODING
def __init__(self, path=None, create=False):
self.callback_notify = callback_notify_null
self.callback_progress_update = None
self.callback_get_user = callback_get_user
self.callback_get_cancel = callback_get_cancel
self.global_ignore_patterns = []
self.git_version = None
self.numberOfCommandStages = 0
self.numberOfCommandStagesExecuted = 0
if path:
try:
self.repo = dulwich.repo.Repo(path)
self._load_config()
self.global_ignore_patterns = self._get_global_ignore_patterns()
except dulwich.errors.NotGitRepository:
if create:
self.initialize_repository(path)
self.global_ignore_patterns = self._get_global_ignore_patterns()
else:
raise NotRepositoryError()
else:
self.repo = None
#
# Start Private Methods
#
def _initialize_index(self):
index_path = self.repo.index_path()
f = open(index_path, "wb")
try:
f = SHA1Writer(f)
write_index_dict(f, {})
except:
pass
f.close()
def _get_index(self):
if not self.repo.has_index():
self._initialize_index()
return self.repo.open_index()
def _get_tree_at_head(self):
try:
tree = self.repo[self.repo[self.repo.head()].tree]
except KeyError as e:
tree = dulwich.objects.Tree()
return tree
def _get_tree_from_sha1(self, sha1):
return self.repo[self.repo[sha1].tree]
def _get_tree_index(self, tree=None):
if tree is None:
tree = self._get_tree_at_head()
tree_index = {}
if tree:
for item in self.repo.object_store.iter_tree_contents(tree.id):
tree_index[item[0].decode(self.UTF8)] = (item[1], item[2].decode(self.UTF8))
return tree_index
def _get_git_version(self):
"""
Gets the local git version
"""
if self.git_version:
return self.git_version
else:
try:
proc = subprocess.Popen(["git", "--version"],
stdout=subprocess.PIPE,
universal_newlines=True)
response = proc.communicate()[0].split()
version = [int(x) for x in response[2].split(".")]
self.git_version = version
return self.git_version
except Exception as e:
return None
def _get_global_ignore_patterns(self):
"""
Get ignore patterns from $GIT_DIR/info/exclude then from
core.excludesfile in gitconfig.
"""
patterns = []
files = self.get_global_ignore_files()
for path in files:
patterns += self.get_ignore_patterns_from_file(path)
return patterns
def get_global_ignore_files(self):
"""
Returns a list of ignore files possible for this repository
"""
try:
git_dir = os.environ["GIT_DIR"]
except KeyError:
git_dir = os.path.join(self.repo.path, ".git")
files = []
excludefile = os.path.join(git_dir, "info", "exclude")
files.append(excludefile)
try:
core_excludesfile = self._config_get(("core", ), "excludesfile")
if core_excludesfile:
files.append(core_excludesfile)
except KeyError:
pass
return files
def get_local_ignore_file(self, path):
if not os.path.exists(path):
return []
if os.path.isfile(path):
path = os.path.basename(path)
return os.path.join(path, ".gitignore")
def get_ignore_patterns_from_file(self, path):
"""
Read in an ignore patterns file (i.e. .gitignore, $GIT_DIR/info/exclude)
and return a list of patterns
"""
patterns = []
if os.path.isfile(path):
file = open(path, "r")
try:
for line in file:
if line == "" or line.startswith("#"):
continue
patterns.append(line.rstrip("\n"))
except:
pass
file.close()
return patterns
def get_local_config_file(self):
try:
git_dir = os.environ["GIT_DIR"]
except KeyError:
git_dir = os.path.join(self.repo.path, ".git")
return git_dir + "/config"
def _ignore_file(self, patterns, filename):
"""
Determine whether the given file should be ignored
"""
for pattern in patterns:
if fnmatch.fnmatch(filename, pattern) and not pattern.startswith("!"):
return True
return False
def _read_directory_tree(self, path, show_ignored_files=False):
files = []
directories = []
for root, dirs, filenames in os.walk(path, topdown=True):
try:
dirs.remove(".git")
removed_git_dir = True
except ValueError:
pass
# Find the relative root path of this folder
if root == self.repo.path:
rel_root = ""
else:
rel_root = self.get_relative_path(root)
for filename in filenames:
files.append(os.path.join(rel_root, filename))
for _d in dirs:
directories.append(os.path.join(rel_root, _d))
directories.append(rel_root)
#Remove duplicates in list
directories=list(set(directories))
return (sorted(files), directories)
def _get_blob_from_file(self, path):
file = open(path, "rb")
try:
blob = dulwich.objects.Blob.from_string(file.read())
finally:
file.close()
return blob
def _write_blob_to_file(self, path, blob):
dirname = os.path.dirname(path)
if not os.path.isdir(dirname):
os.makedirs(dirname)
file = open(path, "wb")
try:
file.write(blob.data)
finally:
file.close()
def _load_config(self):
self.config = self.repo.get_config()
self.glb_config = self.repo.get_config_stack()
def _config_normalize_section(self, section):
# If some old code is using string sections, convert to a tuple
if isinstance(section, six.string_types):
parts = section.split(" ")
s1 = parts.pop(0)
s2 = " ".join(parts).replace('"', "")
section = (s1, s2)
return section
def _config_set(self, section, key, value):
section = self._config_normalize_section(section)
return self.config.set(section, key, value)
def _config_get(self, section, key):
section = self._config_normalize_section(section)
return self.config.get(section, key)
def _config_get_glb(self, section, key):
section = self._config_normalize_section(section)
return self.glb_config.get(section, key)
def _get_config_user(self):
try:
config_user_name = S(self._config_get(("user", ), "name"))
config_user_email = S(self._config_get(("user", ), "email"))
if config_user_name == "" or config_user_email == "":
raise KeyError()
except KeyError:
(config_user_name, config_user_email) = self.callback_get_user()
if config_user_name == None and config_user_email == None:
return None
self._config_set(("user", ), "name", config_user_name)
self._config_set(("user", ), "email", config_user_email)
self.config.write_to_path()
return "%s <%s>" % (config_user_name, config_user_email)
def string_unescape(self, s):
# Portable utf-8 string unescape.
if isinstance(s, six.text_type):
s = s.encode(IDENTITY_ENCODING)
s = S(s.decode("unicode_escape"), IDENTITY_ENCODING)
return S(s.bytes(IDENTITY_ENCODING))
#
# Start Public Methods
#
def initialize_repository(self, path, bare=False):
mkdir_p(path)
if bare:
dulwich.repo.Repo.init_bare(path)
else:
dulwich.repo.Repo.init(path)
self.set_repository(path)
def set_repository(self, path):
try:
self.repo = dulwich.repo.Repo(path)
self._load_config()
except dulwich.errors.NotGitRepository:
raise NotRepositoryError()
def get_repository(self):
return self.repo.path
def find_repository_path(self, path):
path_to_check = S(path)
while path_to_check != "/" and path_to_check != "":
if os.path.isdir(os.path.join(path_to_check, ".git")):
return path_to_check
path_to_check = os.path.split(path_to_check)[0]
return None
def get_relative_path(self, path):
path = S(path)
if path == self.repo.path:
return "."
return util.relativepath(self.repo.path, path)
def get_absolute_path(self, path):
path = S(path)
if path == ".":
return self.repo.path
return os.path.join(self.repo.path, path).rstrip("/")
def track(self, name):
self.repo.refs.set_symbolic_ref(b"HEAD", name)
def is_tracking(self, name):
return (self.repo.refs.read_ref(b"HEAD")[5:] == name)
def tracking(self):
return self.repo.refs.read_ref(b"HEAD")[5:]
def head(self):
return self.repo.refs[b"HEAD"]
def stage(self, paths):
"""
Stage files to be committed or tracked
@type paths: list
@param paths: A list of files
"""
index = self._get_index()
to_stage = []
if isinstance(paths, (str, six.text_type)):
paths = [paths]
for path in paths:
relative_path = self.get_relative_path(path)
absolute_path = self.get_absolute_path(path)
self.notify({
"action": "Staged",
"path": absolute_path,
"mime_type": guess_type(absolute_path)[0]
})
to_stage.append(S(relative_path))
self.repo.stage(to_stage)
def stage_all(self):
"""
Stage all files in a repository to be committed or tracked
"""
index = self._get_index()
for status in self.status():
if status in [AddedStatus, RemovedStatus, ModifiedStatus]:
abs_path = self.get_absolute_path(status.path)
relative_path = self.get_relative_path(status.path)
if os.path.isfile(abs_path):
self.stage(relative_path)
if status == MissingStatus:
del index[status.path]
index.write()
def unstage(self, paths):
"""
Unstage files so they are not committed or tracked
@type paths: list
@param paths: A list of files
"""
index = self._get_index()
tree = self._get_tree_index()
if isinstance(paths, (str, six.text_type)):
paths = [paths]
for path in paths:
relative_path = S(self.get_relative_path(path)).bytes()
if relative_path in index:
if relative_path in tree:
(ctime, mtime, dev, ino, mode, uid, gid, size, blob_id, flags) = index[relative_path]
(mode, blob_id) = tree[relative_path]
# If the file is locally modified, set these vars to 0
# I'm not sure yet why this needs to happen, but it does
# in order for the file to appear modified and not normal
blob = self._get_blob_from_file(path)
if blob.id != blob_id:
ctime = 0
mtime = 0
dev = 0
ino = 0
uid = 0
gid = 0
size = 0
index[relative_path] = (ctime, mtime, dev, ino, mode, uid, gid, size, blob_id, flags)
else:
del index[relative_path]
else:
if relative_path in tree:
index[relative_path] = (0, 0, 0, 0, tree[relative_path][0], 0, 0, 0, tree[relative_path][1], 0)
self.notify({
"action": "Unstaged",
"path": path,
"mime_type": guess_type(path)[0]
})
index.write()
def unstage_all(self):
"""
Unstage all files so they are not committed or tracked
@type paths: list
@param paths: A list of files
"""
index = self._get_index()
for status in self.status():
abs_path = self.get_absolute_path(status.path)
if os.path.isfile(abs_path):
self.unstage(abs_path)
def get_staged(self):
"""
Gets a list of files that are staged
"""
staged = []
tree = self._get_tree_at_head()
index = self._get_index()
if len(tree) > 0:
for item in index.changes_from_tree(self.repo.object_store, tree.id):
((old_name, new_name), (old_mode, new_mode), (old_sha, new_sha)) = item
if new_name:
staged.append(new_name)
if old_name and old_name != new_name:
staged.append(old_name)
else:
for path in index:
staged.append(path)
return staged
def is_staged(self, path, staged_files=None):
"""
Determines if the specified path is staged
@type path: string
@param path: A file path
@rtype boolean
"""
if not staged_files:
staged_files = self.get_staged()
relative_path = self.get_relative_path(path)
return (relative_path in staged_files)
def branch(self, name, commit_sha=None, track=False):
"""
Create a new branch
@type name: string
@param name: The name of the new branch
@type commit_sha: string
@param commit_sha: A commit sha to branch from. If None, branches
from head
@type track: boolean
@param track: Whether or not to track the new branch, or just create it
"""
cmd = ["git", "branch"]
if track:
cmd.append("-t")
if commit_sha is None:
commit_sha = self.repo.head()
cmd += [name, commit_sha]
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
def branch_delete(self, name):
"""
Delete a branch
@type name: string
@param name: The name of the branch
"""
ref_name = "refs/heads/%s" % name
refs = self.repo.get_refs()
if ref_name in refs:
if self.is_tracking(ref_name):
self.track("refs/heads/master")
del self.repo.refs[ref_name]
def branch_rename(self, old_name, new_name):
"""
Rename a branch
@type old_name: string
@param old_name: The name of the branch to be renamed
@type new_name: string
@param new_name: The name of the new branch
"""
old_ref_name = "refs/heads/%s" % old_name
new_ref_name = "refs/heads/%s" % new_name
refs = self.repo.get_refs()
if old_ref_name in refs:
self.repo.refs[new_ref_name] = self.repo.refs[old_ref_name]
if self.is_tracking(old_ref_name):
self.track(new_ref_name)
del self.repo.refs[old_ref_name]
def branch_list(self, commit_sha=None):
"""
List all branches
"""
"""
refs = self.repo.get_refs()
branches = []
for ref,branch_sha in refs.items():
if ref.startswith("refs/heads"):
branch = Branch(ref[11:], branch_sha, self.repo[branch_sha])
branches.append(branch)
return branches
"""
cmd = ["git", "branch", "-lv", "--no-abbrev", "-a"]
if commit_sha:
cmd += ["--contains", commit_sha]
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
branches = []
for line in stdout:
if not line:
continue
components = line.split()
if components[0] != "*":
components.insert(0, "")
tracking = components.pop(0) == "*" and True or False
if components[0] == "(no":
name = components.pop(0) + " " + components.pop(0)
elif components[0] == "(HEAD":
continue # Detached head is not a branch.
else:
name = components.pop(0)
revision = components.pop(0)
message = " ".join(components)
branches.append({
"tracking": tracking,
"name": name,
"revision": revision,
"message": message
})
return branches
def checkout(self, paths=[], revision="HEAD"):
"""
Checkout a series of paths from a tree or commit. If no tree or commit
information is given, it will check out the files from head. If no
paths are given, all files will be checked out from head.
@type paths: list
@param paths: A list of files to checkout
@type revision: string
@param revision: The sha or branch to checkout
"""
if len(paths) == 1 and paths[0] == self.repo.path:
paths = []
cmd = ["git", "checkout", "-m", revision] + paths
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
def clone(self, host, path, bare=False, origin="origin"):
"""
Clone a repository
@type host: string
@param host: The url of the git repository
@type path: string
@param path: The path to clone to
@type bare: boolean
@param bare: Create a bare repository or not
@type origin: string
@param origin: Specify the origin of the repository
"""
self.numberOfCommandStages = 3
more = ["-o", "origin","--progress"]
if bare:
more.append("--bare")
base_dir = os.path.split(path)[0]
cmd = ["git", "clone", host, path] + more
isUsername = False
isPassword = False
self.modifiedHost = host
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=base_dir, notify=self.notify_and_parse_progress, cancel=self.get_cancel()).execute()
if stdout[1].find('could not read Username') > -1:
# Prompt for username if it does not exist in the url.
isUsername, originalRemoteUrl = self.promptUsername(self.modifiedHost)
# Prompt for password if a username exists in the remote url without a password.
isPassword, originalRemoteUrl2 = self.promptPassword(self.modifiedHost)
elif stdout[1].find('could not read Password') > -1:
# Prompt for password if a username exists in the remote url without a password.
isPassword, originalRemoteUrl = self.promptPassword(self.modifiedHost)
if isUsername == True or isPassword == True:
# Update the cmd with the username and password.
cmd = ["git", "clone", self.modifiedHost, path] + more
# Try again.
(status, stdout, stderr) = GittyupCommand(cmd, cwd=base_dir, notify=self.notify_and_parse_progress, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
# If we prompted for a username or password then it will now be written to the config. Remove it now before continuing.
if isUsername == True or isPassword == True:
# Load new config.
self.repo = dulwich.repo.Repo(path)
self._load_config()
# Write original url back to config.
self._config_set("remote \"origin\"", "url", host)
self.config.write_to_path()
def _get_gpg_sig(self):
try:
config_user_signingkey = S(self._config_get_glb(("user", ), "signingkey"))
if config_user_signingkey == "":
raise KeyError()
except KeyError:
return None
try:
config_commit_gpgsign = S(self._config_get_glb(("commit", ), "gpgsign"))
if config_commit_gpgsign == "" or config_commit_gpgsign.lower() != "true":
raise KeyError()
return config_user_signingkey
except KeyError:
return None
def commit(self, message, parents=None, committer=None, commit_time=None,
commit_timezone=None, author=None, author_time=None,
author_timezone=None, encoding=None, commit_all=False):
"""
Commit staged files to the local repository
@type message: string
@param message: The log message
@type parents: list
@param parents: A list of parent SHAs. Defaults to head.
@type committer: string
@param committer: The person committing. Defaults to
"user.name <user.email>"
@type commit_time: int
@param commit_time: The commit time. Defaults to time.time()
@type commit_timezone: int
@param commit_timezone: The commit timezone.
Defaults to local timezone.
@type author: string
@param author: The author of the file changes. Defaults to
"user.name <user.email>"
@type author_time: int
@param author_time: The author time. Defaults to time.time()
@type author_timezone: int
@param author_timezone: The author timezone.
Defaults to commit timezone.
@type encoding: string
@param encoding: The encoding of the commit. Defaults to UTF-8.
@type commit_all: boolean
@param commit_all: Stage all changed files before committing
"""
if commit_all:
self.stage_all()
initial_commit = False
if encoding is None:
encoding = ENCODING
if commit_timezone is None:
commit_timezone = helper.utc_offset()
config_user_signingkey = self._get_gpg_sig()
commit_id = None
if config_user_signingkey is not None:
self.notify("GPG Signing Key: "+S(config_user_signingkey))
commit_id = self.repo.do_commit(**helper.to_bytes({
"message": message,
"committer": committer,
"commit_timestamp": commit_time,
"commit_timezone": commit_timezone,
"author": author,
"author_timestamp": author_time,
"author_timezone": author_timezone,
"encoding": encoding,
"merge_heads": parents,
"sign": str(config_user_signingkey)}, encoding))
else:
commit_id = self.repo.do_commit(**helper.to_bytes({
"message": message,
"committer": committer,
"commit_timestamp": commit_time,
"commit_timezone": commit_timezone,
"author": author,
"author_timestamp": author_time,
"author_timezone": author_timezone,
"encoding": encoding,
"merge_heads": parents}, encoding))
branch_full = self.repo.refs.read_ref(b"HEAD")
if branch_full is not None:
branch_components = re.search(b"refs/heads/(.+)", branch_full)
if (branch_components != None):
branch = branch_components.group(1)
self.notify("[%s] -> %s" % (S(commit_id), S(branch)))
self.notify("To branch: " + S(branch))
#Print tree changes.
#dulwich.patch.write_tree_diff(sys.stdout, self.repo.object_store, commit.tree, commit.id)
return commit_id
def remove(self, paths):
"""
Remove path from the repository. Also deletes the local file.
@type paths: list
@param paths: A list of paths to remove
"""
if isinstance(paths, (str, six.text_type)):
paths = [paths]
index = self._get_index()
for path in paths:
relative_path = self.get_relative_path(path)
if relative_path in index:
del index[relative_path]
os.remove(path)
index.write()
def move(self, source, dest):
"""
Move a file within the repository
@type source: string
@param source: The source file
@type dest: string
@param dest: The destination. If dest exists as a directory, source
will be added as a child. Otherwise, source will be renamed to
dest.
"""
index = self._get_index()
relative_source = self.get_relative_path(source)
relative_dest = self.get_relative_path(dest)
# Get a list of affected files so we can update the index
source_files = []
if os.path.isdir(source):
for name in index:
name = name.decode(self.UTF8)
if name.startswith(relative_source):
source_files.append(name)
else:
source_files.append(relative_source)
# Rename the affected index entries
for source_file in source_files:
new_path = source_file.replace(relative_source, relative_dest)
if os.path.isdir(dest):
new_path = os.path.join(new_path, os.path.basename(source_file))
source_file = source_file.encode(self.UTF8)
index[new_path.encode(self.UTF8)] = index[source_file]
del index[source_file]
index.write()
# Actually move the file/folder
shutil.move(source, dest)
def pull(self, repository="origin", refspec="master", options=None):
"""
Fetch objects from a remote repository and merge with the local
repository
@type repository: string
@param repository: The name of the repository
@type refspec: string
@param refspec: The branch name to pull from
"""
self.numberOfCommandStages = 2
cmd = ["git", "pull", "--progress"]
if options != None:
if options.count("rebase"):
cmd.append("--rebase")
if options.count("all"):
cmd.append("--all")
else:
cmd.append (repository)
cmd.append (refspec)
# Setup the section name in the config for the remote target.
remoteKey = "remote \"" + repository + "\""
isUsername = False
isPassword = False
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify_and_parse_git_push, cancel=self.get_cancel()).execute()
if stdout[0].find('could not read Username') > -1:
# Prompt for username if it does not exist in the url.
isUsername, originalRemoteUrl = self.promptUsername(remoteKey)
# Prompt for password if a username exists in the remote url without a password.
isPassword, originalRemoteUrl2 = self.promptPassword(remoteKey)
elif stdout[0].find('could not read Password') > -1:
# Prompt for password if a username exists in the remote url without a password.
isPassword, originalRemoteUrl = self.promptPassword(remoteKey)
if isUsername == True or isPassword == True:
# Try again.
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify_and_parse_git_push, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
# If we prompted for a password and write it to the config, remove it now before continuing.
if isUsername == True or isPassword == True:
# Write original url back to config.
self._config_set(remoteKey, "url", originalRemoteUrl)
self.config.write_to_path()
def push(self, repository="origin", refspec="master", tags=True, force_with_lease=False):
"""
Push objects from the local repository into the remote repository
and merge them.
@type repository: string
@param repository: The name of the repository
@type refspec: string
@param refspec: The branch name to pull from
@type tags: boolean
@param tags: True to include tags in push, False to omit
"""
self.numberOfCommandStages = 2
cmd = ["git", "push", "--progress"]
if tags:
cmd.extend(["--tags"])
if force_with_lease:
cmd.extend(["--force-with-lease"])
cmd.extend([repository, refspec])
# Setup the section name in the config for the remote target.
remoteKey = "remote \"" + repository + "\""
isUsername = False
isPassword = False
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify_and_parse_git_push, cancel=self.get_cancel()).execute()
if stdout[0].find('could not read Username') > -1:
# Prompt for username if it does not exist in the url.
isUsername, originalRemoteUrl = self.promptUsername(remoteKey)
# Prompt for password if a username exists in the remote url without a password.
isPassword, originalRemoteUrl2 = self.promptPassword(remoteKey)
elif stdout[0].find('could not read Password') > -1:
# Prompt for password if a username exists in the remote url without a password.
isPassword, originalRemoteUrl = self.promptPassword(remoteKey)
if isUsername == True or isPassword == True:
# Try again.
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify_and_parse_git_push, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
# If we prompted for a password and write it to the config, remove it now before continuing.
if isUsername == True or isPassword == True:
# Write original url back to config.
self._config_set(remoteKey, "url", originalRemoteUrl)
self.config.write_to_path()
def onUsername(self, window, username, remoteKey, originalRemoteUrl, isOk):
if isOk == True:
if username == "":
six.moves.tkinter_messagebox.showinfo("Error", "Please enter a username.", parent=window)
return
else:
# Insert password into url.
newRemoteUrl = originalRemoteUrl.replace("://", "://" + username + "@")
if remoteKey.find("://") == -1:
# Write url temporarily back to config.
self._config_set(remoteKey, "url", newRemoteUrl)
self.config.write_to_path()
else:
# Change the url in memory, since we don't have a config yet.
self.modifiedHost = newRemoteUrl
# Close dialog.
window.destroy()
def onPassword(self, window, password, remoteKey, originalRemoteUrl, isOk):
if isOk == True:
if password == "":
six.moves.tkinter_messagebox.showinfo("Error", "Please enter a password.", parent=window)
return
else:
# Insert password into url.
newRemoteUrl = originalRemoteUrl.replace("@", ":" + password + "@")
if remoteKey.find("://") == -1:
# Write url temporarily back to config.
self._config_set(remoteKey, "url", newRemoteUrl)
self.config.write_to_path()
else:
# Change the url in memory, since we don't have a config yet.
self.modifiedHost = newRemoteUrl
# Close dialog.
window.destroy()
def promptUsername(self, remoteKey):
"""
If the github url contains no username, prompt for one and write the url back to the config.
Note, we'll set the url back to its original (without the password) after the call completes.
https://user@github.com/path/repositoryName.git
"""
isUsername = False
originalRemoteUrl = remoteKey
self.modifiedHost = originalRemoteUrl
if remoteKey.find("://") == -1:
# Get existing url from config, otherwise just use what was provided (the url from cloning, etc).
originalRemoteUrl = S(self._config_get(remoteKey, "url"))
if originalRemoteUrl.find('@') == -1:
# No username or password. Prompt for both. Create dialog.
window = six.moves.tkinter.Tk()
window.title("Please enter your username")
window.resizable(0,0)
window["padx"] = 40
window["pady"] = 20
textFrame = six.moves.tkinter.Frame(window)
# Create textbox label.
entryLabel = six.moves.tkinter.Label(textFrame)
entryLabel["text"] = "Username:"
entryLabel.pack(side=six.moves.tkinter.LEFT)
# Create textbox.
entryWidget = six.moves.tkinter.Entry(textFrame)
entryWidget["width"] = 25
entryWidget.bind("<Return>", (lambda event: self.onUsername(window, entryWidget.get(), remoteKey, originalRemoteUrl, True)))
entryWidget.bind("<KP_Enter>", (lambda event: self.onUsername(window, entryWidget.get(), remoteKey, originalRemoteUrl, True)))
entryWidget.pack(side=six.moves.tkinter.LEFT)
entryWidget.focus();
textFrame.pack()
# Create OK button.
button = six.moves.tkinter.Button(window, width=5, text="OK", command = (lambda: self.onUsername(window, entryWidget.get(), remoteKey, originalRemoteUrl, True)))
button.pack(side=six.moves.tkinter.RIGHT)
# Create Cancel button.
button = six.moves.tkinter.Button(window, width=5, text="Cancel", command = (lambda: self.onUsername(window, entryWidget.get(), remoteKey, originalRemoteUrl, False)))
button.pack(side=six.moves.tkinter.RIGHT)
# Position window in center of screen.
self.center(window)
# Show dialog.
window.mainloop()
isUsername = True
return isUsername, originalRemoteUrl
def promptPassword(self, remoteKey):
"""
If a username exists in the github url without a password, prompt the user and write the url back to the config.
Note, we'll set the url back to its original (without the password) after the call completes.
https://user@github.com/path/repositoryName.git
"""
isPassword = False
originalRemoteUrl = remoteKey
self.modifiedHost = originalRemoteUrl
if remoteKey.find("://") == -1:
# Get existing url from config, otherwise just use what was provided (the url from cloning, etc).
originalRemoteUrl = S(self._config_get(remoteKey, "url"))
# If the url contains a username (@) without a password (:), then prompt for a password.
if originalRemoteUrl.find('@') > -1 and originalRemoteUrl.rfind(':') <= 5:
# Prompt for password. Create dialog.
window = six.moves.tkinter.Tk()
window.title("Please enter your password")
window.resizable(0,0)
window["padx"] = 40
window["pady"] = 20
textFrame = six.moves.tkinter.Frame(window)
# Create textbox label.
entryLabel = six.moves.tkinter.Label(textFrame)
entryLabel["text"] = "Password:"
entryLabel.pack(side=six.moves.tkinter.LEFT)
# Create textbox.
entryWidget = six.moves.tkinter.Entry(textFrame)
entryWidget["show"] = "*"
entryWidget["width"] = 25
entryWidget.bind("<Return>", (lambda event: self.onPassword(window, entryWidget.get(), remoteKey, originalRemoteUrl, True)))
entryWidget.bind("<KP_Enter>", (lambda event: self.onPassword(window, entryWidget.get(), remoteKey, originalRemoteUrl, True)))
entryWidget.pack(side=six.moves.tkinter.LEFT)
entryWidget.focus();
textFrame.pack()
# Create OK button.
button = six.moves.tkinter.Button(window, width=5, text="OK", command = (lambda: self.onPassword(window, entryWidget.get(), remoteKey, originalRemoteUrl, True)))
button.pack(side=six.moves.tkinter.RIGHT)
# Create Cancel button.
button = six.moves.tkinter.Button(window, width=5, text="Cancel", command = (lambda: self.onPassword(window, entryWidget.get(), remoteKey, originalRemoteUrl, False)))
button.pack(side=six.moves.tkinter.RIGHT)
# Position window in center of screen.
self.center(window)
# Show dialog.
window.mainloop()
isPassword = True
return isPassword, originalRemoteUrl
def fetch(self, repository, branch=None):
"""
Fetch objects from a remote repository. This will not merge the files
into the local working copy, use pull for that.
@type repository: string
@param repository: The git repository from which to fetch
@type branch: string
@param branch: The git branch from which to fetch
"""
cmd = ["git", "fetch", repository]
if branch:
cmd.append(branch)
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
def fetch_all(self):
cmd = ["git", "fetch", "--all"]
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
def merge(self, branch):
cmd = ["git", "merge", branch]
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
def remote_add(self, name, host):
"""
Add a remote repository
@type name: string
@param name: The name to give to the remote repository
@type host: string
@param host: The git url to add
"""
cmd = ["git", "remote", "add", name, host]
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
def remote_rename(self, current_name, new_name):
"""
Rename a remote repository
@type current_name: string
@param current_name: The current name of the repository
@type new_name: string
@param new_name: The name to give to the remote repository
"""
cmd = ["git", "remote", "rename", current_name, new_name]
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
def remote_set_url(self, name, url):
"""
Change a remote repository's url
@type name: string
@param name: The name of the repository
@type url: string
@param url: The url for the repository
"""
cmd = ["git", "remote", "set-url", name, url]
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
def remote_delete(self, name):
"""
Remove a remote repository
@type name: string
@param name: The name of the remote repository to remove
"""
cmd = ["git", "remote", "rm", name]
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
def remote_list(self):
"""
Return a list of the remote repositories
@rtype list
@return A list of dicts with keys: remote, url, fetch
"""
cmd = ["git", "remote", "-v"]
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
stdout = []
returner = []
for line in stdout:
components = line.split()
if components:
name = components[0]
host = components[1]
add = True
for item in returner:
if item["name"] == name:
add = False
if add:
returner.append({
"name": name,
"host": host
})
return returner
def tag(self, name, message, revision="HEAD"):
"""
Create a tag object
@type name: string
@param name: The name to give the tag
@type message: string
@param message: A log message
@type revision: string
@param revision: The revision to tag. Defaults to HEAD
"""
config_user_signingkey = self._get_gpg_sig()
if config_user_signingkey is not None:
self.notify("GPG Signing Key: "+S(config_user_signingkey))
if hasattr(dulwich.porcelain,'tag'):
dulwich.porcelain.tag(self.repo, name, objectish=revision, message=message, sign=str(config_user_signingkey))
else:
dulwich.porcelain.tag_create(self.repo, name, objectish=revision, message=message, sign=str(config_user_signingkey))
else:
if hasattr(dulwich.porcelain,'tag'):
dulwich.porcelain.tag(self.repo, name, objectish=revision, message=message)
else:
dulwich.porcelain.tag_create(self.repo, name, objectish=revision, message=message)
def tag_delete(self, name):
"""
Delete a tag
@type name: string
@param name: The name of the tag to delete
"""
ref_name = S("refs/tags/%s" % name).bytes()
refs = self.repo.get_refs()
if ref_name in refs:
del self.repo.refs[ref_name]
def tag_list(self):
"""
Return a list of Tag objects
"""
refs = self.repo.get_refs()
tags = []
for ref, tag_sha in list(refs.items()):
if S(ref).startswith("refs/tags"):
if type(self.repo[tag_sha]) == dulwich.objects.Commit:
tag = CommitTag(ref[10:], tag_sha, self.repo[tag_sha])
else:
tag = Tag(tag_sha, self.repo[tag_sha])
tags.append(tag)
return tags
def status_porcelain(self, path):
if os.path.isdir(path):
(files, directories) = self._read_directory_tree(path)
else:
files = [self.get_relative_path(path)]
directories = []
files_hash = {}
for file in files:
files_hash[file] = True
cmd = ["git", "status", "--porcelain", path]
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify).execute()
except GittyupCommandError as e:
self.callback_notify(e)
statuses = []
modified_files = []
for line in stdout:
components = RE_STATUS.match(line)
if components:
status = components.group(1)
strip_status = status.strip()
path = self.string_unescape(components.group(2))
if path[0] == '"' and path[-1] == '"':
path = path[1:-1]
if status == " D":
statuses.append(MissingStatus(path))
elif any(c in strip_status for c in ["M", "R", "U"]):
statuses.append(ModifiedStatus(path))
elif strip_status in ["A", "C"]:
statuses.append(AddedStatus(path))
elif strip_status == "D":
statuses.append(RemovedStatus(path))
elif strip_status == "??":
statuses.append(UntrackedStatus(path))
modified_files.append(path)
try:
del files_hash[path]
except Exception as e:
pass
# Determine untracked directories
cmd = ["git", "clean", "-nd", self.repo.path]
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify).execute()
except GittyupCommandError as e:
self.callback_notify(e)
untracked_directories = []
for line in stdout:
components = re.match("^(Would remove)\s(.*?)$", line)
if components:
untracked_path = components.group(2)
if untracked_path[-1]=='/':
untracked_directories.append(untracked_path[:-1])
#Determine the ignored files and directories in Repo
cmd = ["git", "clean", "-ndX", self.repo.path]
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify).execute()
except GittyupCommandError as e:
self.callback_notify(e)
ignored_directories=[]
for line in stdout:
components = re.match("^(Would remove)\s(.*?)$", line)
if components:
ignored_path=components.group(2)
if ignored_path[-1]=='/':
ignored_directories.append(ignored_path[:-1])
next
statuses.append(IgnoredStatus(ignored_path))
self.ignored_paths.append(ignored_path)
try:
del files_hash[ignored_path]
except Exception as e:
pass
for file,data in list(files_hash.items()):
ignore_file=False
untracked_file=False
for ignored_path in ignored_directories:
if S(ignored_path) in file:
ignore_file=True
break
for untracked_path in untracked_directories:
if S(untracked_path) in file:
untracked_file=True
break
if untracked_file==True:
statuses.append(UntrackedStatus(file))
if ignore_file==True:
self.ignored_paths.append(file)
elif ignore_file==True:
statuses.append(IgnoredStatus(file))
self.ignored_paths.append(file)
else:
statuses.append(NormalStatus(file))
# Determine status of folders based on child contents
for d in directories:
d_status = NormalStatus(d)
# Check if directory is untracked or a sub-directory of an untracked directory
for untracked_path in untracked_directories:
if untracked_path in d:
d_status = UntrackedStatus(d)
break
dirPattern = "/%s/" % d
if len(d) == 0:
dirPattern = "/"
# Check if directory includes modified files
for file in modified_files:
if ("/%s" % file).startswith(dirPattern): # fix, when file startwith same prefix as directory, fix status for root repo path ""
d_status = ModifiedStatus(d)
break
# Check if directory is ignored
for ignored_path in ignored_directories:
if ignored_path in d:
d_status = IgnoredStatus(d)
break
statuses.append(d_status)
return statuses
def status_dulwich(self, path):
tree = self._get_tree_index()
index = self._get_index()
if os.path.isdir(path):
(files, directories) = self._read_directory_tree(path)
else:
files = [self.get_relative_path(path)]
directories = []
files_hash = {}
for file in files:
files_hash[file] = True
statuses = []
# Calculate statuses for files in the current HEAD
modified_files = []
for name in tree:
try:
if index[name]:
inIndex = True
except Exception as e:
inIndex = False
if inIndex:
absolute_path = self.get_absolute_path(name)
if os.path.isfile(absolute_path):
# Cached, determine if modified or not
blob = self._get_blob_from_file(absolute_path)
if blob.id == tree[name][1]:
statuses.append(NormalStatus(name))
else:
modified_files.append(name)
statuses.append(ModifiedStatus(name))
else:
modified_files.append(name)
statuses.append(MissingStatus(name))
else:
modified_files.append(name)
statuses.append(RemovedStatus(name))
try:
del files_hash[name]
except Exception as e:
pass
# Calculate statuses for untracked files
for name,data in list(files_hash.items()):
try:
inTreeIndex = tree[name]
except Exception as e:
inTreeIndex = False
try:
inIndex = index[name]
except Exception as e:
inIndex = False
if inIndex and not inTreeIndex:
modified_files.append(name)
statuses.append(AddedStatus(name))
continue
# Generate a list of appropriate ignore patterns
patterns = []
path_to_check = os.path.dirname(self.get_absolute_path(name))
while path_to_check != self.repo.path:
patterns += self.get_ignore_patterns_from_file(self.get_local_ignore_file(path_to_check))
path_to_check = os.path.split(path_to_check)[0]
patterns += self.get_ignore_patterns_from_file(self.get_local_ignore_file(self.repo.path))
patterns += self.global_ignore_patterns
if not self._ignore_file(patterns, os.path.basename(name)):
statuses.append(UntrackedStatus(name))
else:
self.ignored_paths.append(name)
# Determine status of folders based on child contents
for d in directories:
d_status = NormalStatus(d)
for file in modified_files:
if os.path.join(d, os.path.basename(file)) == file:
d_status = ModifiedStatus(d)
break
statuses.append(d_status)
return statuses
def get_all_ignore_file_paths(self, path):
return self.ignored_paths
def status(self, path):
# TODO - simply get this from the status implementation / avoid global state
self.ignored_paths = []
return self.status_porcelain(path)
def log(self, path="", skip=0, limit=None, revision="", showtype="all"):
cmd = ["git", "--no-pager", "log", "--numstat", "--parents", "--pretty=fuller",
"--date-order", "--date=default", "-m"]
if showtype == "all":
cmd.append("--all")
if limit:
cmd.append("-%s" % limit)
if skip:
cmd.append("--skip=%s" % skip)
if revision:
if showtype=="push":
cmd.append("%s.." % revision)
else:
cmd.append(revision)
if path == self.repo.path:
path = ""
if path:
cmd += ["--", path]
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify).execute()
except GittyupCommandError as e:
self.callback_notify(e)
return []
revisions = []
revision = {}
changed_file = {}
pattern_from = re.compile(r' \(from (.*)\)')
last_commitId = ""
for line in stdout:
if line == "":
continue
if line[0:6] == "commit":
match = pattern_from.search(line)
commit_line = re.sub(" \(from.*\)","", line).split(" ")
fromPath = ""
if match:
fromPath = match.group(1)
if revision:
if "changed_paths" not in revision:
revision["changed_paths"] = {}
if last_commitId != commit_line[1]:
revisions.append(revision)
revision = {}
else:
del revision["message"]
if len(fromPath) > 0:
if "changed_paths" not in revision:
revision["changed_paths"] =[]
changed_file = {
"additions": "-",
"removals": "-",
"path": "Diff with parent : %s " % fromPath
}
revision["changed_paths"].append(changed_file)
changed_file = {}
revision["commit"] = commit_line[1]
last_commitId = revision["commit"]
revision["parents"] = []
for parent in commit_line[2:]:
revision["parents"].append(parent)
elif line[0:7] == "Author:":
revision["author"] = line[7:].strip()
elif line[0:11] == "AuthorDate:":
revision["author_date"] = line[11:].strip()
elif line[0:7] == "Commit:":
revision["committer"] = line[7:].strip()
elif line[0:11] == "CommitDate:":
revision["commit_date"] = line[11:].strip()
elif line[0:4] == " ":
message = line[4:]
if "message" not in revision:
revision["message"] = ""
else:
revision["message"] += "\n"
revision["message"] = revision["message"] + message
elif line[0].isdigit() or line[0] in "-":
file_line = line.split("\t")
if "changed_paths" not in revision:
revision["changed_paths"] = []
if len(file_line) == 3:
changed_file = {
"additions": file_line[0],
"removals": file_line[1],
"path": self.string_unescape(file_line[2])
}
if changed_file['path'][0] == '"' and changed_file['path'][-1] == '"':
changed_file['path'] = changed_file['path'][1:-1]
revision["changed_paths"].append(changed_file)
if revision:
revisions.append(revision)
return revisions
def annotate(self, path, revision_obj="HEAD"):
"""
Returns an annotation for a specified file
@type path: string
@param path: The absolute path to a tracked file
@type revision: string
@param revision: HEAD or a sha1 hash
"""
relative_path = self.get_relative_path(path)
cmd = ["git", "annotate", "-l", revision_obj, relative_path]
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
stdout = []
returner = []
for line in stdout:
components = re.split("\t", line, 3)
if len(components) < 4:
continue
dt = datetime(*time.strptime(components[2][:-6],"%Y-%m-%d %H:%M:%S")[:-2])
message = components[3].split(")", 1)
code = message[1]
if len(components) == 5:
code = components[4]
returner.append({
"revision": components[0],
"author": components[1][1:],
"date": dt,
"line": code,
"number": message[0]
})
return returner
def show(self, path, revision_obj):
"""
Returns a particular file at a given revision object.
@type path: string
@param path: The absolute path to a file
@type revision_obj: git.Revision()
@param revision_obj: The revision object for path
"""
if not revision_obj:
revision_obj = "HEAD"
relative_path = self.get_relative_path(path)
cmd = ["git", "show", "%s:%s" % (revision_obj, relative_path)]
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
stdout = []
return "\n".join(stdout)
def diff(self, path1, revision_obj1, path2=None, revision_obj2=None, summarize=False):
"""
Returns the diff between the path(s)/revision(s)
@type path1: string
@param path1: The absolute path to a file
@type revision_obj1: git.Revision()
@param revision_obj1: The revision object for path1
@type path2: string
@param path2: The absolute path to a file
@type revision_obj2: git.Revision()
@param revision_obj2: The revision object for path2
"""
relative_path1 = None
relative_path2 = None
if path1:
relative_path1 = self.get_relative_path(path1)
if path2:
relative_path2 = self.get_relative_path(path2)
cmd = ["git", "diff"]
if summarize:
cmd.append("--name-status")
if revision_obj1:
cmd += [revision_obj1]
if revision_obj2 and path2:
cmd += [revision_obj2]
if relative_path1:
cmd += [relative_path1]
if relative_path2 and relative_path2 != relative_path1:
cmd += [relative_path2]
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
stdout = []
return ''.join(x + "\n" for x in stdout)
def diff_summarize(self, path1, revision_obj1, path2=None, revision_obj2=None):
results = self.diff(path1, revision_obj1, path2, revision_obj2, True)
summary = []
for line in results.split("\n"):
if not line:
continue
(action, path) = (line + "\t").split("\t")[:2]
summary.append({
"action": action,
"path": path
})
return summary
def export(self, path, dest_path, revision):
"""
Exports a file or directory from a given revision
@type path: string
@param path: The source file/folder to export
@type dest_path: string
@param dest_path: The path to put the exported file(s)
@type revision: string
@param revision: The revision/tree/commit of the source file being exported
"""
tmp_file = get_tmp_path("rabbitvcs-git-export.tar")
cmd1 = ["git", "archive", "--format", "tar", "-o", tmp_file, revision, path]
cmd2 = ["tar", "-xf", tmp_file, "-C", dest_path]
mkdir_p(dest_path)
try:
(status, stdout, stderr) = GittyupCommand(cmd1, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
(status, stdout, stderr) = GittyupCommand(cmd2, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
stdout = []
self.notify("%s at %s exported to %s" % (path, revision, dest_path))
return "\n".join(stdout)
def clean(self, path, remove_dir=True, remove_ignored_too=False,
remove_only_ignored=False, dry_run=False, force=True):
cmd = ["git", "clean"]
if remove_dir:
cmd.append("-d")
if remove_ignored_too:
cmd.append("-x")
if remove_only_ignored:
cmd.append("-X")
if dry_run:
cmd.append("-n")
if force:
cmd.append("-f")
relative_path = self.get_relative_path(path)
cmd.append(relative_path)
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
return
def reset(self, path, revision, type=None):
relative_path = self.get_relative_path(path)
cmd = ["git", "reset"]
if type:
cmd.append("--%s" % type)
cmd.append(revision)
if relative_path:
cmd.append(relative_path)
try:
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
except GittyupCommandError as e:
self.callback_notify(e)
return
def set_callback_notify(self, func):
self.callback_notify = func
def set_callback_progress_update(self, func):
self.callback_progress_update = func
def set_callback_get_user(self, func):
self.callback_get_user = func
def set_callback_get_cancel(self, func):
self.callback_get_cancel = func
def notify(self, data):
self.callback_notify(data)
def notify_and_parse_progress(self, data):
# When progress is requested to a git command, it will
# respond with the current operation, and that operations current progress
# in the following format: "<Command>: <percentage>% (<pieces compeated>/<num pieces>)".
#
# When a command has reached 100% the format of this final message assumes the formatt:
# "<Command>: 100% (<num pieces>/<num pieces>), <total size> <unit>, done."
returnData = {"action":"","path":"","mime_type":""}
#print "parsing message: " + str(data)
# If data is already a dict, we'll assume it's already been parsed, and return.
if isinstance (data, dict):
self.notify (data);
return
# Is this an error?
message_components = re.search("^([eE]rror|[fF]atal): (.+)", data)
if message_components != None:
returnData["action"] = "Error"
returnData["path"] = message_components.group(2)
self.notify (returnData)
return
# Check to see if this is a remote command.
remote_check = re.search("^(remote: )(.+)$", data)
if remote_check != None:
returnData["action"] = "Remote"
message = remote_check.group(2)
else:
message = data
# First, we'll test to see if this is a progress notification.
if "%" not in message:
# No, this is just a regular message.
# Some messages have a strage tendancy to append a non-printable character,
# followed by a right square brace and a capitol "K". This tests for, and
# strips these superfluous characters.
message_components = re.search("^(.+).\[K", message)
if message_components != None:
returnData["path"] = message_components.group(1)
else:
returnData["path"] = message
self.notify (returnData)
return
# Extract the percentage, which will be all numerals directly
# prior to '%'.
message_components = re.search("^(.+): +([0-9]+)%", message)
if message_components == None:
print("Error: failed to parse git string: " + data)
return
fraction = float(message_components.group(2)) / 100 # Convert percentage to fraction.
current_action = message_components.group(1)
# If we're at 0%, then we want to notify which action we're performing.
if fraction == 0:
returnData["path"] = current_action
self.notify(returnData)
#print "stage fraction: " + str (fraction)
# If we're using a number of stages, adjust the fraction acordingly.
if self.numberOfCommandStages > 0:
fraction = (self.numberOfCommandStagesExecuted + fraction) / self.numberOfCommandStages
# If we've finished the current stage (100%).
if "done" in message:
self.numberOfCommandStagesExecuted += 1
# If we've registered a callback for progress, update with the new fraction.
if self.callback_progress_update != None:
#print "setting pbar: " + str(fraction)
self.callback_progress_update(fraction)
# If we've finished the whole command (all stages).
if fraction == 1 and "done" in message:
# Reset stage variables.
self.numberOfCommandStages = 0
self.numberOfCommandStagesExecuted = 0
def notify_and_parse_git_pull (self, data):
return_data = {"action":"","path":"","mime_type":""}
message_parsed = False
# Look for "From" line (e.g. "From ssh://server:22/my_project")
message_components = re.search("^From (.+)", data)
if message_components != None:
return_data["action"] = "From"
return_data["path"] = message_components.group(1)
message_parsed = True
# Look for "Branch" line (e.g. "* branch master -> FETCH_HEAD")
message_components = re.search("\* branch +([A-z0-9]+) +-> (.+)", data)
if message_components != None:
return_data["action"] = "Branch"
return_data["path"] = message_components.group(1) + " -> " + message_components.group(2)
message_parsed = True
# Look for a file line (e.g. "src/somefile.py | 5 -++++")
message_components = re.search(" +(.+) +\| *([0-9]+) ([+-]+)", data)
if message_components != None:
return_data["action"] = "Modified"
return_data["path"] = message_components.group(1)
return_data["mime_type"] = message_components.group(2) + " " + message_components.group(3)
message_parsed = True
# Look for a updating line (e.g. "Updating ffffff..ffffff")
message_components = re.search("^Updating ([a-f0-9.]+)", data)
if message_components != None:
return_data["action"] = "Updating"
return_data["path"] = message_components.group(1)
message_parsed = True
# Look for a "create mode" line (e.g. "create mode 100755 file.py")
message_components = re.search("create mode ([0-9]+) (.+)", data)
if message_components != None:
return_data["action"] = "Create"
return_data["path"] = message_components.group(2)
return_data["mime_type"] = "mode: " + message_components.group(1)
message_parsed = True
# Look for a "delete mode" line (e.g. "create mode 100755 file.py")
message_components = re.search("delete mode ([0-9]+) (.+)", data)
if message_components != None:
return_data["action"] = "Delete"
return_data["path"] = message_components.group(2)
return_data["mime_type"] = "mode: " + message_components.group(1)
message_parsed = True
# Look for an "Auto-merging" line (e.g. "Auto-merging src/file.py")
message_components = re.search("^Auto-merging (.+)", data)
if message_components != None:
return_data["action"] = "Merging"
return_data["path"] = message_components.group(1)
message_parsed = True
# Look for a "binary" line (e.g. "icons/file.png" | Bin 0 -> 55555 bytes)
message_components = re.search("^[ ](.+) +\| Bin ([0-9]+ -> [0-9]+ bytes)", data)
if message_components != None:
return_data["action"] = "Binary"
return_data["path"] = message_components.group(1)
return_data["mime_type"] = message_components.group(2)
message_parsed = True
# Look for a "rename" line (e.g. "rename src/{foo.py => bar.py} (50%)")
message_components = re.search("rename (.+}) \([0-9]+%\)", data)
if message_components != None:
return_data["action"] = "Rename"
return_data["path"] = message_components.group(1)
message_parsed = True
# Look for a "copy" line (e.g. "copy src/{foo.py => bar.py} (50%)")
message_components = re.search("copy (.+}) \([0-9]+%\)", data)
if message_components != None:
return_data["action"] = "Copy"
return_data["path"] = message_components.group(1)
message_parsed = True
# Prepend "Error" to conflict lines. e.g. :
# CONFLICT (content): Merge conflict in file.py.
# Automatic merge failed; fix conflicts and then commit the result.
message_components = re.search("^CONFLICT \(|Automatic merge failed", data)
if message_components != None:
return_data["action"] = "Error"
return_data["path"] = data
message_parsed = True
if message_parsed == False:
return_data = data
self.notify_and_parse_progress (return_data)
def notify_and_parse_git_push (self, data):
return_data = {"action":"","path":"","mime_type":""}
message_parsed = False
# Look for to line. e.g. "To gitosis@server.org:project.git". Exclude any
# lines that include a space (as this could be a message about something else)
message_components = re.search("^To ([^ ]+$)", data)
if message_components != None:
return_data["action"] = "To"
return_data["path"] = message_components.group(1)
message_parsed = True
# Look for "new branch" line. e.g. " * [new branch] master -> master"
message_components = re.search("^ \* \[new branch\] +(.+) -> (.+)", data)
if message_components != None:
return_data["action"] = "New Branch"
return_data["path"] = message_components.group(1) + " -> " + message_components.group(2)
message_parsed = True
# Look for "rejected" line. e.g. " ![rejected] master -> master (non-fast-forward)".
message_components = re.search("!\[rejected\] +(.+)", data)
if message_components != None:
return_data["action"] = "Rejected"
return_data["path"] = message_components.group(1)
message_parsed = True
if message_parsed == False:
return_data = data
self.notify_and_parse_progress (return_data)
def get_cancel(self):
return self.callback_get_cancel
def center(self, window):
# Temporarily hide the window to avoid update_idletasks() drawing the window in the wrong position.
window.withdraw()
# Update "requested size" from geometry manager.
window.update_idletasks()
x = (window.winfo_screenwidth() - window.winfo_reqwidth()) / 2
y = (window.winfo_screenheight() - window.winfo_reqheight()) / 2
window.geometry("+%d+%d" % (x, y))
# Draw the window frame immediately after setting correct window position.
window.deiconify()