2214 lines
74 KiB
Python
2214 lines
74 KiB
Python
from __future__ import absolute_import
|
|
from __future__ import print_function
|
|
#
|
|
# client.py
|
|
#
|
|
|
|
import os, errno
|
|
import os.path
|
|
import re
|
|
import shutil
|
|
import fnmatch
|
|
import time
|
|
import struct
|
|
from datetime import datetime
|
|
from mimetypes import guess_type
|
|
import time
|
|
|
|
import subprocess
|
|
|
|
import dulwich.errors
|
|
import dulwich.repo
|
|
import dulwich.porcelain
|
|
import dulwich.objects
|
|
from dulwich.index import write_index_dict, SHA1Writer
|
|
#from dulwich.patch import write_tree_diff
|
|
|
|
from .exceptions import *
|
|
from . import util
|
|
from .objects import *
|
|
from .command import GittyupCommand
|
|
|
|
from rabbitvcs.util import helper
|
|
from rabbitvcs.util.strings import *
|
|
|
|
import six.moves.tkinter
|
|
import six.moves.tkinter_messagebox
|
|
import six
|
|
|
|
ENCODING = "UTF-8"
|
|
|
|
RE_STATUS = re.compile("^([\sA-Z\?]+)\s(?:\S+\s->\s)?(.*?)$")
|
|
|
|
|
|
def callback_notify_null(val):
|
|
pass
|
|
|
|
def callback_get_user():
|
|
from pwd import getpwuid
|
|
pwuid = getpwuid(os.getuid())
|
|
|
|
user = pwuid[0]
|
|
fullname = pwuid[4]
|
|
host = os.getenv("HOSTNAME")
|
|
|
|
return (fullname, "%s@%s" % (user, host))
|
|
|
|
def callback_get_cancel():
|
|
return False
|
|
|
|
def mkdir_p(path):
|
|
# http://stackoverflow.com/questions/600268/mkdir-p-functionality-in-python
|
|
try:
|
|
os.makedirs(path)
|
|
except OSError as exc: # Python >2.5
|
|
if exc.errno == errno.EEXIST and os.path.isdir(path):
|
|
pass
|
|
else: raise
|
|
|
|
def get_tmp_path(filename):
|
|
tmpdir = "/tmp/rabbitvcs"
|
|
mkdir_p(tmpdir)
|
|
return os.path.join(tmpdir, filename)
|
|
|
|
|
|
class GittyupClient(object):
|
|
UTF8 = UTF8_ENCODING
|
|
|
|
def __init__(self, path=None, create=False):
|
|
self.callback_notify = callback_notify_null
|
|
self.callback_progress_update = None
|
|
self.callback_get_user = callback_get_user
|
|
self.callback_get_cancel = callback_get_cancel
|
|
|
|
self.global_ignore_patterns = []
|
|
|
|
self.git_version = None
|
|
|
|
self.numberOfCommandStages = 0
|
|
self.numberOfCommandStagesExecuted = 0
|
|
|
|
if path:
|
|
try:
|
|
self.repo = dulwich.repo.Repo(path)
|
|
self._load_config()
|
|
self.global_ignore_patterns = self._get_global_ignore_patterns()
|
|
except dulwich.errors.NotGitRepository:
|
|
if create:
|
|
self.initialize_repository(path)
|
|
self.global_ignore_patterns = self._get_global_ignore_patterns()
|
|
else:
|
|
raise NotRepositoryError()
|
|
else:
|
|
self.repo = None
|
|
|
|
#
|
|
# Start Private Methods
|
|
#
|
|
|
|
def _initialize_index(self):
|
|
index_path = self.repo.index_path()
|
|
f = open(index_path, "wb")
|
|
try:
|
|
f = SHA1Writer(f)
|
|
write_index_dict(f, {})
|
|
except:
|
|
pass
|
|
|
|
f.close()
|
|
|
|
def _get_index(self):
|
|
if not self.repo.has_index():
|
|
self._initialize_index()
|
|
|
|
return self.repo.open_index()
|
|
|
|
def _get_tree_at_head(self):
|
|
try:
|
|
tree = self.repo[self.repo[self.repo.head()].tree]
|
|
except KeyError as e:
|
|
tree = dulwich.objects.Tree()
|
|
|
|
return tree
|
|
|
|
def _get_tree_from_sha1(self, sha1):
|
|
return self.repo[self.repo[sha1].tree]
|
|
|
|
def _get_tree_index(self, tree=None):
|
|
if tree is None:
|
|
tree = self._get_tree_at_head()
|
|
|
|
tree_index = {}
|
|
if tree:
|
|
for item in self.repo.object_store.iter_tree_contents(tree.id):
|
|
tree_index[item[0].decode(self.UTF8)] = (item[1], item[2].decode(self.UTF8))
|
|
|
|
return tree_index
|
|
|
|
def _get_git_version(self):
|
|
"""
|
|
Gets the local git version
|
|
"""
|
|
|
|
if self.git_version:
|
|
return self.git_version
|
|
else:
|
|
try:
|
|
proc = subprocess.Popen(["git", "--version"],
|
|
stdout=subprocess.PIPE,
|
|
universal_newlines=True)
|
|
response = proc.communicate()[0].split()
|
|
version = [int(x) for x in response[2].split(".")]
|
|
self.git_version = version
|
|
return self.git_version
|
|
except Exception as e:
|
|
return None
|
|
|
|
def _get_global_ignore_patterns(self):
|
|
"""
|
|
Get ignore patterns from $GIT_DIR/info/exclude then from
|
|
core.excludesfile in gitconfig.
|
|
|
|
"""
|
|
patterns = []
|
|
|
|
files = self.get_global_ignore_files()
|
|
for path in files:
|
|
patterns += self.get_ignore_patterns_from_file(path)
|
|
|
|
return patterns
|
|
|
|
def get_global_ignore_files(self):
|
|
"""
|
|
Returns a list of ignore files possible for this repository
|
|
"""
|
|
|
|
try:
|
|
git_dir = os.environ["GIT_DIR"]
|
|
except KeyError:
|
|
git_dir = os.path.join(self.repo.path, ".git")
|
|
|
|
files = []
|
|
|
|
excludefile = os.path.join(git_dir, "info", "exclude")
|
|
files.append(excludefile)
|
|
|
|
try:
|
|
core_excludesfile = self._config_get(("core", ), "excludesfile")
|
|
if core_excludesfile:
|
|
files.append(core_excludesfile)
|
|
except KeyError:
|
|
pass
|
|
|
|
return files
|
|
|
|
def get_local_ignore_file(self, path):
|
|
if not os.path.exists(path):
|
|
return []
|
|
|
|
if os.path.isfile(path):
|
|
path = os.path.basename(path)
|
|
|
|
return os.path.join(path, ".gitignore")
|
|
|
|
def get_ignore_patterns_from_file(self, path):
|
|
"""
|
|
Read in an ignore patterns file (i.e. .gitignore, $GIT_DIR/info/exclude)
|
|
and return a list of patterns
|
|
"""
|
|
|
|
patterns = []
|
|
if os.path.isfile(path):
|
|
file = open(path, "r")
|
|
try:
|
|
for line in file:
|
|
if line == "" or line.startswith("#"):
|
|
continue
|
|
|
|
patterns.append(line.rstrip("\n"))
|
|
except:
|
|
pass
|
|
|
|
file.close()
|
|
|
|
return patterns
|
|
|
|
def get_local_config_file(self):
|
|
try:
|
|
git_dir = os.environ["GIT_DIR"]
|
|
except KeyError:
|
|
git_dir = os.path.join(self.repo.path, ".git")
|
|
|
|
return git_dir + "/config"
|
|
|
|
def _ignore_file(self, patterns, filename):
|
|
"""
|
|
Determine whether the given file should be ignored
|
|
|
|
"""
|
|
for pattern in patterns:
|
|
if fnmatch.fnmatch(filename, pattern) and not pattern.startswith("!"):
|
|
return True
|
|
|
|
return False
|
|
|
|
def _read_directory_tree(self, path, show_ignored_files=False):
|
|
files = []
|
|
directories = []
|
|
for root, dirs, filenames in os.walk(path, topdown=True):
|
|
try:
|
|
dirs.remove(".git")
|
|
removed_git_dir = True
|
|
except ValueError:
|
|
pass
|
|
|
|
# Find the relative root path of this folder
|
|
if root == self.repo.path:
|
|
rel_root = ""
|
|
else:
|
|
rel_root = self.get_relative_path(root)
|
|
|
|
for filename in filenames:
|
|
files.append(os.path.join(rel_root, filename))
|
|
|
|
for _d in dirs:
|
|
directories.append(os.path.join(rel_root, _d))
|
|
|
|
directories.append(rel_root)
|
|
|
|
#Remove duplicates in list
|
|
directories=list(set(directories))
|
|
return (sorted(files), directories)
|
|
|
|
def _get_blob_from_file(self, path):
|
|
file = open(path, "rb")
|
|
try:
|
|
blob = dulwich.objects.Blob.from_string(file.read())
|
|
finally:
|
|
file.close()
|
|
|
|
return blob
|
|
|
|
def _write_blob_to_file(self, path, blob):
|
|
dirname = os.path.dirname(path)
|
|
if not os.path.isdir(dirname):
|
|
os.makedirs(dirname)
|
|
|
|
file = open(path, "wb")
|
|
try:
|
|
file.write(blob.data)
|
|
finally:
|
|
file.close()
|
|
|
|
def _load_config(self):
|
|
self.config = self.repo.get_config()
|
|
self.glb_config = self.repo.get_config_stack()
|
|
|
|
def _config_normalize_section(self, section):
|
|
# If some old code is using string sections, convert to a tuple
|
|
if isinstance(section, six.string_types):
|
|
parts = section.split(" ")
|
|
s1 = parts.pop(0)
|
|
s2 = " ".join(parts).replace('"', "")
|
|
section = (s1, s2)
|
|
|
|
return section
|
|
|
|
def _config_set(self, section, key, value):
|
|
section = self._config_normalize_section(section)
|
|
return self.config.set(section, key, value)
|
|
|
|
def _config_get(self, section, key):
|
|
section = self._config_normalize_section(section)
|
|
return self.config.get(section, key)
|
|
|
|
def _config_get_glb(self, section, key):
|
|
section = self._config_normalize_section(section)
|
|
return self.glb_config.get(section, key)
|
|
|
|
def _get_config_user(self):
|
|
try:
|
|
config_user_name = S(self._config_get(("user", ), "name"))
|
|
config_user_email = S(self._config_get(("user", ), "email"))
|
|
if config_user_name == "" or config_user_email == "":
|
|
raise KeyError()
|
|
except KeyError:
|
|
(config_user_name, config_user_email) = self.callback_get_user()
|
|
|
|
if config_user_name == None and config_user_email == None:
|
|
return None
|
|
|
|
self._config_set(("user", ), "name", config_user_name)
|
|
self._config_set(("user", ), "email", config_user_email)
|
|
self.config.write_to_path()
|
|
return "%s <%s>" % (config_user_name, config_user_email)
|
|
|
|
def string_unescape(self, s):
|
|
# Portable utf-8 string unescape.
|
|
if isinstance(s, six.text_type):
|
|
s = s.encode(IDENTITY_ENCODING)
|
|
s = S(s.decode("unicode_escape"), IDENTITY_ENCODING)
|
|
return S(s.bytes(IDENTITY_ENCODING))
|
|
|
|
#
|
|
# Start Public Methods
|
|
#
|
|
|
|
def initialize_repository(self, path, bare=False):
|
|
mkdir_p(path)
|
|
|
|
if bare:
|
|
dulwich.repo.Repo.init_bare(path)
|
|
else:
|
|
dulwich.repo.Repo.init(path)
|
|
self.set_repository(path)
|
|
|
|
def set_repository(self, path):
|
|
try:
|
|
self.repo = dulwich.repo.Repo(path)
|
|
self._load_config()
|
|
except dulwich.errors.NotGitRepository:
|
|
raise NotRepositoryError()
|
|
|
|
def get_repository(self):
|
|
return self.repo.path
|
|
|
|
def find_repository_path(self, path):
|
|
path_to_check = S(path)
|
|
while path_to_check != "/" and path_to_check != "":
|
|
if os.path.isdir(os.path.join(path_to_check, ".git")):
|
|
return path_to_check
|
|
|
|
path_to_check = os.path.split(path_to_check)[0]
|
|
|
|
return None
|
|
|
|
def get_relative_path(self, path):
|
|
path = S(path)
|
|
if path == self.repo.path:
|
|
return "."
|
|
return util.relativepath(self.repo.path, path)
|
|
|
|
def get_absolute_path(self, path):
|
|
path = S(path)
|
|
if path == ".":
|
|
return self.repo.path
|
|
return os.path.join(self.repo.path, path).rstrip("/")
|
|
|
|
def track(self, name):
|
|
self.repo.refs.set_symbolic_ref(b"HEAD", name)
|
|
|
|
def is_tracking(self, name):
|
|
return (self.repo.refs.read_ref(b"HEAD")[5:] == name)
|
|
|
|
def tracking(self):
|
|
return self.repo.refs.read_ref(b"HEAD")[5:]
|
|
|
|
def head(self):
|
|
return self.repo.refs[b"HEAD"]
|
|
|
|
def stage(self, paths):
|
|
"""
|
|
Stage files to be committed or tracked
|
|
|
|
@type paths: list
|
|
@param paths: A list of files
|
|
|
|
"""
|
|
index = self._get_index()
|
|
to_stage = []
|
|
|
|
if isinstance(paths, (str, six.text_type)):
|
|
paths = [paths]
|
|
|
|
for path in paths:
|
|
relative_path = self.get_relative_path(path)
|
|
absolute_path = self.get_absolute_path(path)
|
|
|
|
self.notify({
|
|
"action": "Staged",
|
|
"path": absolute_path,
|
|
"mime_type": guess_type(absolute_path)[0]
|
|
})
|
|
to_stage.append(S(relative_path))
|
|
self.repo.stage(to_stage)
|
|
|
|
def stage_all(self):
|
|
"""
|
|
Stage all files in a repository to be committed or tracked
|
|
|
|
"""
|
|
|
|
index = self._get_index()
|
|
for status in self.status():
|
|
if status in [AddedStatus, RemovedStatus, ModifiedStatus]:
|
|
abs_path = self.get_absolute_path(status.path)
|
|
relative_path = self.get_relative_path(status.path)
|
|
if os.path.isfile(abs_path):
|
|
self.stage(relative_path)
|
|
|
|
if status == MissingStatus:
|
|
del index[status.path]
|
|
index.write()
|
|
|
|
def unstage(self, paths):
|
|
"""
|
|
Unstage files so they are not committed or tracked
|
|
|
|
@type paths: list
|
|
@param paths: A list of files
|
|
|
|
"""
|
|
|
|
index = self._get_index()
|
|
tree = self._get_tree_index()
|
|
|
|
if isinstance(paths, (str, six.text_type)):
|
|
paths = [paths]
|
|
|
|
for path in paths:
|
|
relative_path = S(self.get_relative_path(path)).bytes()
|
|
if relative_path in index:
|
|
if relative_path in tree:
|
|
(ctime, mtime, dev, ino, mode, uid, gid, size, blob_id, flags) = index[relative_path]
|
|
(mode, blob_id) = tree[relative_path]
|
|
|
|
# If the file is locally modified, set these vars to 0
|
|
# I'm not sure yet why this needs to happen, but it does
|
|
# in order for the file to appear modified and not normal
|
|
blob = self._get_blob_from_file(path)
|
|
if blob.id != blob_id:
|
|
ctime = 0
|
|
mtime = 0
|
|
dev = 0
|
|
ino = 0
|
|
uid = 0
|
|
gid = 0
|
|
size = 0
|
|
|
|
index[relative_path] = (ctime, mtime, dev, ino, mode, uid, gid, size, blob_id, flags)
|
|
else:
|
|
del index[relative_path]
|
|
else:
|
|
if relative_path in tree:
|
|
index[relative_path] = (0, 0, 0, 0, tree[relative_path][0], 0, 0, 0, tree[relative_path][1], 0)
|
|
|
|
self.notify({
|
|
"action": "Unstaged",
|
|
"path": path,
|
|
"mime_type": guess_type(path)[0]
|
|
})
|
|
index.write()
|
|
|
|
def unstage_all(self):
|
|
"""
|
|
Unstage all files so they are not committed or tracked
|
|
|
|
@type paths: list
|
|
@param paths: A list of files
|
|
|
|
"""
|
|
|
|
index = self._get_index()
|
|
for status in self.status():
|
|
abs_path = self.get_absolute_path(status.path)
|
|
if os.path.isfile(abs_path):
|
|
self.unstage(abs_path)
|
|
|
|
def get_staged(self):
|
|
"""
|
|
Gets a list of files that are staged
|
|
|
|
"""
|
|
|
|
staged = []
|
|
tree = self._get_tree_at_head()
|
|
index = self._get_index()
|
|
|
|
if len(tree) > 0:
|
|
for item in index.changes_from_tree(self.repo.object_store, tree.id):
|
|
((old_name, new_name), (old_mode, new_mode), (old_sha, new_sha)) = item
|
|
|
|
if new_name:
|
|
staged.append(new_name)
|
|
if old_name and old_name != new_name:
|
|
staged.append(old_name)
|
|
else:
|
|
for path in index:
|
|
staged.append(path)
|
|
|
|
return staged
|
|
|
|
def is_staged(self, path, staged_files=None):
|
|
"""
|
|
Determines if the specified path is staged
|
|
|
|
@type path: string
|
|
@param path: A file path
|
|
|
|
@rtype boolean
|
|
|
|
"""
|
|
|
|
if not staged_files:
|
|
staged_files = self.get_staged()
|
|
|
|
relative_path = self.get_relative_path(path)
|
|
return (relative_path in staged_files)
|
|
|
|
def branch(self, name, commit_sha=None, track=False):
|
|
"""
|
|
Create a new branch
|
|
|
|
@type name: string
|
|
@param name: The name of the new branch
|
|
|
|
@type commit_sha: string
|
|
@param commit_sha: A commit sha to branch from. If None, branches
|
|
from head
|
|
|
|
@type track: boolean
|
|
@param track: Whether or not to track the new branch, or just create it
|
|
|
|
"""
|
|
|
|
cmd = ["git", "branch"]
|
|
if track:
|
|
cmd.append("-t")
|
|
|
|
if commit_sha is None:
|
|
commit_sha = self.repo.head()
|
|
|
|
cmd += [name, commit_sha]
|
|
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
|
|
def branch_delete(self, name):
|
|
"""
|
|
Delete a branch
|
|
|
|
@type name: string
|
|
@param name: The name of the branch
|
|
|
|
"""
|
|
|
|
ref_name = "refs/heads/%s" % name
|
|
refs = self.repo.get_refs()
|
|
if ref_name in refs:
|
|
if self.is_tracking(ref_name):
|
|
self.track("refs/heads/master")
|
|
|
|
del self.repo.refs[ref_name]
|
|
|
|
def branch_rename(self, old_name, new_name):
|
|
"""
|
|
Rename a branch
|
|
|
|
@type old_name: string
|
|
@param old_name: The name of the branch to be renamed
|
|
|
|
@type new_name: string
|
|
@param new_name: The name of the new branch
|
|
|
|
"""
|
|
|
|
old_ref_name = "refs/heads/%s" % old_name
|
|
new_ref_name = "refs/heads/%s" % new_name
|
|
refs = self.repo.get_refs()
|
|
if old_ref_name in refs:
|
|
self.repo.refs[new_ref_name] = self.repo.refs[old_ref_name]
|
|
if self.is_tracking(old_ref_name):
|
|
self.track(new_ref_name)
|
|
|
|
del self.repo.refs[old_ref_name]
|
|
|
|
def branch_list(self, commit_sha=None):
|
|
"""
|
|
List all branches
|
|
|
|
"""
|
|
"""
|
|
refs = self.repo.get_refs()
|
|
branches = []
|
|
for ref,branch_sha in refs.items():
|
|
if ref.startswith("refs/heads"):
|
|
branch = Branch(ref[11:], branch_sha, self.repo[branch_sha])
|
|
branches.append(branch)
|
|
|
|
return branches
|
|
"""
|
|
cmd = ["git", "branch", "-lv", "--no-abbrev", "-a"]
|
|
if commit_sha:
|
|
cmd += ["--contains", commit_sha]
|
|
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
|
|
branches = []
|
|
for line in stdout:
|
|
if not line:
|
|
continue
|
|
|
|
components = line.split()
|
|
if components[0] != "*":
|
|
components.insert(0, "")
|
|
tracking = components.pop(0) == "*" and True or False
|
|
if components[0] == "(no":
|
|
name = components.pop(0) + " " + components.pop(0)
|
|
elif components[0] == "(HEAD":
|
|
continue # Detached head is not a branch.
|
|
else:
|
|
name = components.pop(0)
|
|
revision = components.pop(0)
|
|
message = " ".join(components)
|
|
|
|
branches.append({
|
|
"tracking": tracking,
|
|
"name": name,
|
|
"revision": revision,
|
|
"message": message
|
|
})
|
|
|
|
return branches
|
|
|
|
def checkout(self, paths=[], revision="HEAD"):
|
|
"""
|
|
Checkout a series of paths from a tree or commit. If no tree or commit
|
|
information is given, it will check out the files from head. If no
|
|
paths are given, all files will be checked out from head.
|
|
|
|
@type paths: list
|
|
@param paths: A list of files to checkout
|
|
|
|
@type revision: string
|
|
@param revision: The sha or branch to checkout
|
|
|
|
"""
|
|
|
|
if len(paths) == 1 and paths[0] == self.repo.path:
|
|
paths = []
|
|
|
|
cmd = ["git", "checkout", "-m", revision] + paths
|
|
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
|
|
|
|
def clone(self, host, path, bare=False, origin="origin"):
|
|
"""
|
|
Clone a repository
|
|
|
|
@type host: string
|
|
@param host: The url of the git repository
|
|
|
|
@type path: string
|
|
@param path: The path to clone to
|
|
|
|
@type bare: boolean
|
|
@param bare: Create a bare repository or not
|
|
|
|
@type origin: string
|
|
@param origin: Specify the origin of the repository
|
|
|
|
"""
|
|
|
|
self.numberOfCommandStages = 3
|
|
|
|
more = ["-o", "origin","--progress"]
|
|
if bare:
|
|
more.append("--bare")
|
|
|
|
base_dir = os.path.split(path)[0]
|
|
|
|
cmd = ["git", "clone", host, path] + more
|
|
|
|
isUsername = False
|
|
isPassword = False
|
|
self.modifiedHost = host
|
|
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=base_dir, notify=self.notify_and_parse_progress, cancel=self.get_cancel()).execute()
|
|
|
|
if stdout[1].find('could not read Username') > -1:
|
|
# Prompt for username if it does not exist in the url.
|
|
isUsername, originalRemoteUrl = self.promptUsername(self.modifiedHost)
|
|
|
|
# Prompt for password if a username exists in the remote url without a password.
|
|
isPassword, originalRemoteUrl2 = self.promptPassword(self.modifiedHost)
|
|
elif stdout[1].find('could not read Password') > -1:
|
|
# Prompt for password if a username exists in the remote url without a password.
|
|
isPassword, originalRemoteUrl = self.promptPassword(self.modifiedHost)
|
|
|
|
if isUsername == True or isPassword == True:
|
|
# Update the cmd with the username and password.
|
|
cmd = ["git", "clone", self.modifiedHost, path] + more
|
|
|
|
# Try again.
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=base_dir, notify=self.notify_and_parse_progress, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
|
|
# If we prompted for a username or password then it will now be written to the config. Remove it now before continuing.
|
|
if isUsername == True or isPassword == True:
|
|
# Load new config.
|
|
self.repo = dulwich.repo.Repo(path)
|
|
self._load_config()
|
|
|
|
# Write original url back to config.
|
|
self._config_set("remote \"origin\"", "url", host)
|
|
self.config.write_to_path()
|
|
|
|
def _get_gpg_sig(self):
|
|
try:
|
|
config_user_signingkey = S(self._config_get_glb(("user", ), "signingkey"))
|
|
if config_user_signingkey == "":
|
|
raise KeyError()
|
|
except KeyError:
|
|
return None
|
|
|
|
try:
|
|
config_commit_gpgsign = S(self._config_get_glb(("commit", ), "gpgsign"))
|
|
if config_commit_gpgsign == "" or config_commit_gpgsign.lower() != "true":
|
|
raise KeyError()
|
|
return config_user_signingkey
|
|
except KeyError:
|
|
return None
|
|
|
|
def commit(self, message, parents=None, committer=None, commit_time=None,
|
|
commit_timezone=None, author=None, author_time=None,
|
|
author_timezone=None, encoding=None, commit_all=False):
|
|
"""
|
|
Commit staged files to the local repository
|
|
|
|
@type message: string
|
|
@param message: The log message
|
|
|
|
@type parents: list
|
|
@param parents: A list of parent SHAs. Defaults to head.
|
|
|
|
@type committer: string
|
|
@param committer: The person committing. Defaults to
|
|
"user.name <user.email>"
|
|
|
|
@type commit_time: int
|
|
@param commit_time: The commit time. Defaults to time.time()
|
|
|
|
@type commit_timezone: int
|
|
@param commit_timezone: The commit timezone.
|
|
Defaults to local timezone.
|
|
|
|
@type author: string
|
|
@param author: The author of the file changes. Defaults to
|
|
"user.name <user.email>"
|
|
|
|
@type author_time: int
|
|
@param author_time: The author time. Defaults to time.time()
|
|
|
|
@type author_timezone: int
|
|
@param author_timezone: The author timezone.
|
|
Defaults to commit timezone.
|
|
|
|
@type encoding: string
|
|
@param encoding: The encoding of the commit. Defaults to UTF-8.
|
|
|
|
@type commit_all: boolean
|
|
@param commit_all: Stage all changed files before committing
|
|
|
|
"""
|
|
if commit_all:
|
|
self.stage_all()
|
|
|
|
initial_commit = False
|
|
|
|
if encoding is None:
|
|
encoding = ENCODING
|
|
|
|
if commit_timezone is None:
|
|
commit_timezone = helper.utc_offset()
|
|
|
|
config_user_signingkey = self._get_gpg_sig()
|
|
|
|
commit_id = None
|
|
if config_user_signingkey is not None:
|
|
self.notify("GPG Signing Key: "+S(config_user_signingkey))
|
|
commit_id = self.repo.do_commit(**helper.to_bytes({
|
|
"message": message,
|
|
"committer": committer,
|
|
"commit_timestamp": commit_time,
|
|
"commit_timezone": commit_timezone,
|
|
"author": author,
|
|
"author_timestamp": author_time,
|
|
"author_timezone": author_timezone,
|
|
"encoding": encoding,
|
|
"merge_heads": parents,
|
|
"sign": str(config_user_signingkey)}, encoding))
|
|
else:
|
|
commit_id = self.repo.do_commit(**helper.to_bytes({
|
|
"message": message,
|
|
"committer": committer,
|
|
"commit_timestamp": commit_time,
|
|
"commit_timezone": commit_timezone,
|
|
"author": author,
|
|
"author_timestamp": author_time,
|
|
"author_timezone": author_timezone,
|
|
"encoding": encoding,
|
|
"merge_heads": parents}, encoding))
|
|
|
|
branch_full = self.repo.refs.read_ref(b"HEAD")
|
|
|
|
if branch_full is not None:
|
|
branch_components = re.search(b"refs/heads/(.+)", branch_full)
|
|
|
|
if (branch_components != None):
|
|
branch = branch_components.group(1)
|
|
|
|
self.notify("[%s] -> %s" % (S(commit_id), S(branch)))
|
|
self.notify("To branch: " + S(branch))
|
|
|
|
#Print tree changes.
|
|
#dulwich.patch.write_tree_diff(sys.stdout, self.repo.object_store, commit.tree, commit.id)
|
|
|
|
return commit_id
|
|
|
|
def remove(self, paths):
|
|
"""
|
|
Remove path from the repository. Also deletes the local file.
|
|
|
|
@type paths: list
|
|
@param paths: A list of paths to remove
|
|
|
|
"""
|
|
|
|
if isinstance(paths, (str, six.text_type)):
|
|
paths = [paths]
|
|
|
|
index = self._get_index()
|
|
|
|
for path in paths:
|
|
relative_path = self.get_relative_path(path)
|
|
if relative_path in index:
|
|
del index[relative_path]
|
|
os.remove(path)
|
|
|
|
index.write()
|
|
|
|
def move(self, source, dest):
|
|
"""
|
|
Move a file within the repository
|
|
|
|
@type source: string
|
|
@param source: The source file
|
|
|
|
@type dest: string
|
|
@param dest: The destination. If dest exists as a directory, source
|
|
will be added as a child. Otherwise, source will be renamed to
|
|
dest.
|
|
"""
|
|
|
|
index = self._get_index()
|
|
relative_source = self.get_relative_path(source)
|
|
relative_dest = self.get_relative_path(dest)
|
|
|
|
# Get a list of affected files so we can update the index
|
|
source_files = []
|
|
if os.path.isdir(source):
|
|
for name in index:
|
|
name = name.decode(self.UTF8)
|
|
if name.startswith(relative_source):
|
|
source_files.append(name)
|
|
else:
|
|
source_files.append(relative_source)
|
|
|
|
# Rename the affected index entries
|
|
for source_file in source_files:
|
|
new_path = source_file.replace(relative_source, relative_dest)
|
|
if os.path.isdir(dest):
|
|
new_path = os.path.join(new_path, os.path.basename(source_file))
|
|
|
|
source_file = source_file.encode(self.UTF8)
|
|
index[new_path.encode(self.UTF8)] = index[source_file]
|
|
del index[source_file]
|
|
|
|
index.write()
|
|
|
|
# Actually move the file/folder
|
|
shutil.move(source, dest)
|
|
|
|
def pull(self, repository="origin", refspec="master", options=None):
|
|
"""
|
|
Fetch objects from a remote repository and merge with the local
|
|
repository
|
|
|
|
@type repository: string
|
|
@param repository: The name of the repository
|
|
|
|
@type refspec: string
|
|
@param refspec: The branch name to pull from
|
|
|
|
"""
|
|
self.numberOfCommandStages = 2
|
|
|
|
cmd = ["git", "pull", "--progress"]
|
|
|
|
if options != None:
|
|
if options.count("rebase"):
|
|
cmd.append("--rebase")
|
|
|
|
if options.count("all"):
|
|
cmd.append("--all")
|
|
else:
|
|
cmd.append (repository)
|
|
cmd.append (refspec)
|
|
|
|
# Setup the section name in the config for the remote target.
|
|
remoteKey = "remote \"" + repository + "\""
|
|
isUsername = False
|
|
isPassword = False
|
|
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify_and_parse_git_push, cancel=self.get_cancel()).execute()
|
|
if stdout[0].find('could not read Username') > -1:
|
|
# Prompt for username if it does not exist in the url.
|
|
isUsername, originalRemoteUrl = self.promptUsername(remoteKey)
|
|
|
|
# Prompt for password if a username exists in the remote url without a password.
|
|
isPassword, originalRemoteUrl2 = self.promptPassword(remoteKey)
|
|
elif stdout[0].find('could not read Password') > -1:
|
|
# Prompt for password if a username exists in the remote url without a password.
|
|
isPassword, originalRemoteUrl = self.promptPassword(remoteKey)
|
|
|
|
if isUsername == True or isPassword == True:
|
|
# Try again.
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify_and_parse_git_push, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
|
|
# If we prompted for a password and write it to the config, remove it now before continuing.
|
|
if isUsername == True or isPassword == True:
|
|
# Write original url back to config.
|
|
self._config_set(remoteKey, "url", originalRemoteUrl)
|
|
self.config.write_to_path()
|
|
|
|
def push(self, repository="origin", refspec="master", tags=True, force_with_lease=False):
|
|
"""
|
|
Push objects from the local repository into the remote repository
|
|
and merge them.
|
|
|
|
@type repository: string
|
|
@param repository: The name of the repository
|
|
|
|
@type refspec: string
|
|
@param refspec: The branch name to pull from
|
|
|
|
@type tags: boolean
|
|
@param tags: True to include tags in push, False to omit
|
|
|
|
"""
|
|
|
|
self.numberOfCommandStages = 2
|
|
|
|
cmd = ["git", "push", "--progress"]
|
|
if tags:
|
|
cmd.extend(["--tags"])
|
|
if force_with_lease:
|
|
cmd.extend(["--force-with-lease"])
|
|
cmd.extend([repository, refspec])
|
|
|
|
# Setup the section name in the config for the remote target.
|
|
remoteKey = "remote \"" + repository + "\""
|
|
isUsername = False
|
|
isPassword = False
|
|
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify_and_parse_git_push, cancel=self.get_cancel()).execute()
|
|
if stdout[0].find('could not read Username') > -1:
|
|
# Prompt for username if it does not exist in the url.
|
|
isUsername, originalRemoteUrl = self.promptUsername(remoteKey)
|
|
|
|
# Prompt for password if a username exists in the remote url without a password.
|
|
isPassword, originalRemoteUrl2 = self.promptPassword(remoteKey)
|
|
elif stdout[0].find('could not read Password') > -1:
|
|
# Prompt for password if a username exists in the remote url without a password.
|
|
isPassword, originalRemoteUrl = self.promptPassword(remoteKey)
|
|
|
|
if isUsername == True or isPassword == True:
|
|
# Try again.
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify_and_parse_git_push, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
|
|
# If we prompted for a password and write it to the config, remove it now before continuing.
|
|
if isUsername == True or isPassword == True:
|
|
# Write original url back to config.
|
|
self._config_set(remoteKey, "url", originalRemoteUrl)
|
|
self.config.write_to_path()
|
|
|
|
def onUsername(self, window, username, remoteKey, originalRemoteUrl, isOk):
|
|
if isOk == True:
|
|
if username == "":
|
|
six.moves.tkinter_messagebox.showinfo("Error", "Please enter a username.", parent=window)
|
|
return
|
|
else:
|
|
# Insert password into url.
|
|
newRemoteUrl = originalRemoteUrl.replace("://", "://" + username + "@")
|
|
|
|
if remoteKey.find("://") == -1:
|
|
# Write url temporarily back to config.
|
|
self._config_set(remoteKey, "url", newRemoteUrl)
|
|
self.config.write_to_path()
|
|
else:
|
|
# Change the url in memory, since we don't have a config yet.
|
|
self.modifiedHost = newRemoteUrl
|
|
|
|
# Close dialog.
|
|
window.destroy()
|
|
|
|
def onPassword(self, window, password, remoteKey, originalRemoteUrl, isOk):
|
|
if isOk == True:
|
|
if password == "":
|
|
six.moves.tkinter_messagebox.showinfo("Error", "Please enter a password.", parent=window)
|
|
return
|
|
else:
|
|
# Insert password into url.
|
|
newRemoteUrl = originalRemoteUrl.replace("@", ":" + password + "@")
|
|
|
|
if remoteKey.find("://") == -1:
|
|
# Write url temporarily back to config.
|
|
self._config_set(remoteKey, "url", newRemoteUrl)
|
|
self.config.write_to_path()
|
|
else:
|
|
# Change the url in memory, since we don't have a config yet.
|
|
self.modifiedHost = newRemoteUrl
|
|
|
|
# Close dialog.
|
|
window.destroy()
|
|
|
|
def promptUsername(self, remoteKey):
|
|
"""
|
|
If the github url contains no username, prompt for one and write the url back to the config.
|
|
Note, we'll set the url back to its original (without the password) after the call completes.
|
|
https://user@github.com/path/repositoryName.git
|
|
"""
|
|
isUsername = False
|
|
originalRemoteUrl = remoteKey
|
|
self.modifiedHost = originalRemoteUrl
|
|
|
|
if remoteKey.find("://") == -1:
|
|
# Get existing url from config, otherwise just use what was provided (the url from cloning, etc).
|
|
originalRemoteUrl = S(self._config_get(remoteKey, "url"))
|
|
|
|
if originalRemoteUrl.find('@') == -1:
|
|
# No username or password. Prompt for both. Create dialog.
|
|
window = six.moves.tkinter.Tk()
|
|
|
|
window.title("Please enter your username")
|
|
window.resizable(0,0)
|
|
window["padx"] = 40
|
|
window["pady"] = 20
|
|
textFrame = six.moves.tkinter.Frame(window)
|
|
|
|
# Create textbox label.
|
|
entryLabel = six.moves.tkinter.Label(textFrame)
|
|
entryLabel["text"] = "Username:"
|
|
entryLabel.pack(side=six.moves.tkinter.LEFT)
|
|
|
|
# Create textbox.
|
|
entryWidget = six.moves.tkinter.Entry(textFrame)
|
|
entryWidget["width"] = 25
|
|
entryWidget.bind("<Return>", (lambda event: self.onUsername(window, entryWidget.get(), remoteKey, originalRemoteUrl, True)))
|
|
entryWidget.bind("<KP_Enter>", (lambda event: self.onUsername(window, entryWidget.get(), remoteKey, originalRemoteUrl, True)))
|
|
entryWidget.pack(side=six.moves.tkinter.LEFT)
|
|
entryWidget.focus();
|
|
|
|
textFrame.pack()
|
|
|
|
# Create OK button.
|
|
button = six.moves.tkinter.Button(window, width=5, text="OK", command = (lambda: self.onUsername(window, entryWidget.get(), remoteKey, originalRemoteUrl, True)))
|
|
button.pack(side=six.moves.tkinter.RIGHT)
|
|
|
|
# Create Cancel button.
|
|
button = six.moves.tkinter.Button(window, width=5, text="Cancel", command = (lambda: self.onUsername(window, entryWidget.get(), remoteKey, originalRemoteUrl, False)))
|
|
button.pack(side=six.moves.tkinter.RIGHT)
|
|
|
|
# Position window in center of screen.
|
|
self.center(window)
|
|
|
|
# Show dialog.
|
|
window.mainloop()
|
|
|
|
isUsername = True
|
|
|
|
return isUsername, originalRemoteUrl
|
|
|
|
def promptPassword(self, remoteKey):
|
|
"""
|
|
If a username exists in the github url without a password, prompt the user and write the url back to the config.
|
|
Note, we'll set the url back to its original (without the password) after the call completes.
|
|
https://user@github.com/path/repositoryName.git
|
|
"""
|
|
isPassword = False
|
|
originalRemoteUrl = remoteKey
|
|
self.modifiedHost = originalRemoteUrl
|
|
|
|
if remoteKey.find("://") == -1:
|
|
# Get existing url from config, otherwise just use what was provided (the url from cloning, etc).
|
|
originalRemoteUrl = S(self._config_get(remoteKey, "url"))
|
|
|
|
# If the url contains a username (@) without a password (:), then prompt for a password.
|
|
if originalRemoteUrl.find('@') > -1 and originalRemoteUrl.rfind(':') <= 5:
|
|
# Prompt for password. Create dialog.
|
|
window = six.moves.tkinter.Tk()
|
|
|
|
window.title("Please enter your password")
|
|
window.resizable(0,0)
|
|
window["padx"] = 40
|
|
window["pady"] = 20
|
|
textFrame = six.moves.tkinter.Frame(window)
|
|
|
|
# Create textbox label.
|
|
entryLabel = six.moves.tkinter.Label(textFrame)
|
|
entryLabel["text"] = "Password:"
|
|
entryLabel.pack(side=six.moves.tkinter.LEFT)
|
|
|
|
# Create textbox.
|
|
entryWidget = six.moves.tkinter.Entry(textFrame)
|
|
entryWidget["show"] = "*"
|
|
entryWidget["width"] = 25
|
|
entryWidget.bind("<Return>", (lambda event: self.onPassword(window, entryWidget.get(), remoteKey, originalRemoteUrl, True)))
|
|
entryWidget.bind("<KP_Enter>", (lambda event: self.onPassword(window, entryWidget.get(), remoteKey, originalRemoteUrl, True)))
|
|
entryWidget.pack(side=six.moves.tkinter.LEFT)
|
|
entryWidget.focus();
|
|
|
|
textFrame.pack()
|
|
|
|
# Create OK button.
|
|
button = six.moves.tkinter.Button(window, width=5, text="OK", command = (lambda: self.onPassword(window, entryWidget.get(), remoteKey, originalRemoteUrl, True)))
|
|
button.pack(side=six.moves.tkinter.RIGHT)
|
|
|
|
# Create Cancel button.
|
|
button = six.moves.tkinter.Button(window, width=5, text="Cancel", command = (lambda: self.onPassword(window, entryWidget.get(), remoteKey, originalRemoteUrl, False)))
|
|
button.pack(side=six.moves.tkinter.RIGHT)
|
|
|
|
# Position window in center of screen.
|
|
self.center(window)
|
|
|
|
# Show dialog.
|
|
window.mainloop()
|
|
|
|
isPassword = True
|
|
|
|
return isPassword, originalRemoteUrl
|
|
|
|
def fetch(self, repository, branch=None):
|
|
"""
|
|
Fetch objects from a remote repository. This will not merge the files
|
|
into the local working copy, use pull for that.
|
|
|
|
@type repository: string
|
|
@param repository: The git repository from which to fetch
|
|
|
|
@type branch: string
|
|
@param branch: The git branch from which to fetch
|
|
"""
|
|
|
|
cmd = ["git", "fetch", repository]
|
|
if branch:
|
|
cmd.append(branch)
|
|
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
|
|
def fetch_all(self):
|
|
cmd = ["git", "fetch", "--all"]
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
|
|
def merge(self, branch):
|
|
cmd = ["git", "merge", branch]
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
|
|
def remote_add(self, name, host):
|
|
"""
|
|
Add a remote repository
|
|
|
|
@type name: string
|
|
@param name: The name to give to the remote repository
|
|
|
|
@type host: string
|
|
@param host: The git url to add
|
|
|
|
"""
|
|
|
|
cmd = ["git", "remote", "add", name, host]
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
|
|
def remote_rename(self, current_name, new_name):
|
|
"""
|
|
Rename a remote repository
|
|
|
|
@type current_name: string
|
|
@param current_name: The current name of the repository
|
|
|
|
@type new_name: string
|
|
@param new_name: The name to give to the remote repository
|
|
|
|
"""
|
|
|
|
cmd = ["git", "remote", "rename", current_name, new_name]
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
|
|
def remote_set_url(self, name, url):
|
|
"""
|
|
Change a remote repository's url
|
|
|
|
@type name: string
|
|
@param name: The name of the repository
|
|
|
|
@type url: string
|
|
@param url: The url for the repository
|
|
|
|
"""
|
|
|
|
cmd = ["git", "remote", "set-url", name, url]
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
|
|
def remote_delete(self, name):
|
|
"""
|
|
Remove a remote repository
|
|
|
|
@type name: string
|
|
@param name: The name of the remote repository to remove
|
|
|
|
"""
|
|
|
|
cmd = ["git", "remote", "rm", name]
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
|
|
def remote_list(self):
|
|
"""
|
|
Return a list of the remote repositories
|
|
|
|
@rtype list
|
|
@return A list of dicts with keys: remote, url, fetch
|
|
|
|
"""
|
|
|
|
cmd = ["git", "remote", "-v"]
|
|
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
stdout = []
|
|
|
|
returner = []
|
|
for line in stdout:
|
|
components = line.split()
|
|
if components:
|
|
name = components[0]
|
|
host = components[1]
|
|
|
|
add = True
|
|
for item in returner:
|
|
if item["name"] == name:
|
|
add = False
|
|
|
|
if add:
|
|
returner.append({
|
|
"name": name,
|
|
"host": host
|
|
})
|
|
|
|
return returner
|
|
|
|
def tag(self, name, message, revision="HEAD"):
|
|
"""
|
|
Create a tag object
|
|
|
|
@type name: string
|
|
@param name: The name to give the tag
|
|
|
|
@type message: string
|
|
@param message: A log message
|
|
|
|
@type revision: string
|
|
@param revision: The revision to tag. Defaults to HEAD
|
|
|
|
"""
|
|
config_user_signingkey = self._get_gpg_sig()
|
|
|
|
if config_user_signingkey is not None:
|
|
self.notify("GPG Signing Key: "+S(config_user_signingkey))
|
|
if hasattr(dulwich.porcelain,'tag'):
|
|
dulwich.porcelain.tag(self.repo, name, objectish=revision, message=message, sign=str(config_user_signingkey))
|
|
else:
|
|
dulwich.porcelain.tag_create(self.repo, name, objectish=revision, message=message, sign=str(config_user_signingkey))
|
|
else:
|
|
if hasattr(dulwich.porcelain,'tag'):
|
|
dulwich.porcelain.tag(self.repo, name, objectish=revision, message=message)
|
|
else:
|
|
dulwich.porcelain.tag_create(self.repo, name, objectish=revision, message=message)
|
|
|
|
def tag_delete(self, name):
|
|
"""
|
|
Delete a tag
|
|
|
|
@type name: string
|
|
@param name: The name of the tag to delete
|
|
|
|
"""
|
|
|
|
ref_name = S("refs/tags/%s" % name).bytes()
|
|
refs = self.repo.get_refs()
|
|
if ref_name in refs:
|
|
del self.repo.refs[ref_name]
|
|
|
|
def tag_list(self):
|
|
"""
|
|
Return a list of Tag objects
|
|
|
|
"""
|
|
|
|
refs = self.repo.get_refs()
|
|
|
|
tags = []
|
|
for ref, tag_sha in list(refs.items()):
|
|
if S(ref).startswith("refs/tags"):
|
|
if type(self.repo[tag_sha]) == dulwich.objects.Commit:
|
|
tag = CommitTag(ref[10:], tag_sha, self.repo[tag_sha])
|
|
else:
|
|
tag = Tag(tag_sha, self.repo[tag_sha])
|
|
tags.append(tag)
|
|
|
|
return tags
|
|
|
|
def status_porcelain(self, path):
|
|
if os.path.isdir(path):
|
|
(files, directories) = self._read_directory_tree(path)
|
|
else:
|
|
files = [self.get_relative_path(path)]
|
|
directories = []
|
|
|
|
files_hash = {}
|
|
for file in files:
|
|
files_hash[file] = True
|
|
|
|
cmd = ["git", "status", "--porcelain", path]
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
|
|
statuses = []
|
|
modified_files = []
|
|
for line in stdout:
|
|
components = RE_STATUS.match(line)
|
|
if components:
|
|
status = components.group(1)
|
|
strip_status = status.strip()
|
|
path = self.string_unescape(components.group(2))
|
|
if path[0] == '"' and path[-1] == '"':
|
|
path = path[1:-1]
|
|
|
|
if status == " D":
|
|
statuses.append(MissingStatus(path))
|
|
elif any(c in strip_status for c in ["M", "R", "U"]):
|
|
statuses.append(ModifiedStatus(path))
|
|
elif strip_status in ["A", "C"]:
|
|
statuses.append(AddedStatus(path))
|
|
elif strip_status == "D":
|
|
statuses.append(RemovedStatus(path))
|
|
elif strip_status == "??":
|
|
statuses.append(UntrackedStatus(path))
|
|
|
|
modified_files.append(path)
|
|
try:
|
|
del files_hash[path]
|
|
except Exception as e:
|
|
pass
|
|
|
|
# Determine untracked directories
|
|
cmd = ["git", "clean", "-nd", self.repo.path]
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
|
|
untracked_directories = []
|
|
for line in stdout:
|
|
components = re.match("^(Would remove)\s(.*?)$", line)
|
|
if components:
|
|
untracked_path = components.group(2)
|
|
if untracked_path[-1]=='/':
|
|
untracked_directories.append(untracked_path[:-1])
|
|
|
|
#Determine the ignored files and directories in Repo
|
|
cmd = ["git", "clean", "-ndX", self.repo.path]
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
ignored_directories=[]
|
|
for line in stdout:
|
|
components = re.match("^(Would remove)\s(.*?)$", line)
|
|
if components:
|
|
ignored_path=components.group(2)
|
|
if ignored_path[-1]=='/':
|
|
ignored_directories.append(ignored_path[:-1])
|
|
next
|
|
statuses.append(IgnoredStatus(ignored_path))
|
|
self.ignored_paths.append(ignored_path)
|
|
try:
|
|
del files_hash[ignored_path]
|
|
except Exception as e:
|
|
pass
|
|
for file,data in list(files_hash.items()):
|
|
ignore_file=False
|
|
untracked_file=False
|
|
for ignored_path in ignored_directories:
|
|
if S(ignored_path) in file:
|
|
ignore_file=True
|
|
break
|
|
for untracked_path in untracked_directories:
|
|
if S(untracked_path) in file:
|
|
untracked_file=True
|
|
break
|
|
if untracked_file==True:
|
|
statuses.append(UntrackedStatus(file))
|
|
if ignore_file==True:
|
|
self.ignored_paths.append(file)
|
|
elif ignore_file==True:
|
|
statuses.append(IgnoredStatus(file))
|
|
self.ignored_paths.append(file)
|
|
else:
|
|
statuses.append(NormalStatus(file))
|
|
|
|
# Determine status of folders based on child contents
|
|
for d in directories:
|
|
d_status = NormalStatus(d)
|
|
|
|
# Check if directory is untracked or a sub-directory of an untracked directory
|
|
for untracked_path in untracked_directories:
|
|
if untracked_path in d:
|
|
d_status = UntrackedStatus(d)
|
|
break
|
|
|
|
dirPattern = "/%s/" % d
|
|
if len(d) == 0:
|
|
dirPattern = "/"
|
|
|
|
# Check if directory includes modified files
|
|
for file in modified_files:
|
|
if ("/%s" % file).startswith(dirPattern): # fix, when file startwith same prefix as directory, fix status for root repo path ""
|
|
d_status = ModifiedStatus(d)
|
|
break
|
|
|
|
# Check if directory is ignored
|
|
for ignored_path in ignored_directories:
|
|
if ignored_path in d:
|
|
d_status = IgnoredStatus(d)
|
|
break
|
|
statuses.append(d_status)
|
|
|
|
return statuses
|
|
|
|
def status_dulwich(self, path):
|
|
tree = self._get_tree_index()
|
|
index = self._get_index()
|
|
|
|
if os.path.isdir(path):
|
|
(files, directories) = self._read_directory_tree(path)
|
|
else:
|
|
files = [self.get_relative_path(path)]
|
|
directories = []
|
|
|
|
files_hash = {}
|
|
for file in files:
|
|
files_hash[file] = True
|
|
|
|
statuses = []
|
|
# Calculate statuses for files in the current HEAD
|
|
modified_files = []
|
|
for name in tree:
|
|
try:
|
|
if index[name]:
|
|
inIndex = True
|
|
except Exception as e:
|
|
inIndex = False
|
|
|
|
if inIndex:
|
|
absolute_path = self.get_absolute_path(name)
|
|
if os.path.isfile(absolute_path):
|
|
# Cached, determine if modified or not
|
|
blob = self._get_blob_from_file(absolute_path)
|
|
if blob.id == tree[name][1]:
|
|
statuses.append(NormalStatus(name))
|
|
else:
|
|
modified_files.append(name)
|
|
statuses.append(ModifiedStatus(name))
|
|
else:
|
|
modified_files.append(name)
|
|
statuses.append(MissingStatus(name))
|
|
else:
|
|
modified_files.append(name)
|
|
statuses.append(RemovedStatus(name))
|
|
|
|
try:
|
|
del files_hash[name]
|
|
except Exception as e:
|
|
pass
|
|
|
|
# Calculate statuses for untracked files
|
|
for name,data in list(files_hash.items()):
|
|
try:
|
|
inTreeIndex = tree[name]
|
|
except Exception as e:
|
|
inTreeIndex = False
|
|
|
|
try:
|
|
inIndex = index[name]
|
|
except Exception as e:
|
|
inIndex = False
|
|
|
|
if inIndex and not inTreeIndex:
|
|
modified_files.append(name)
|
|
statuses.append(AddedStatus(name))
|
|
continue
|
|
|
|
# Generate a list of appropriate ignore patterns
|
|
patterns = []
|
|
path_to_check = os.path.dirname(self.get_absolute_path(name))
|
|
while path_to_check != self.repo.path:
|
|
patterns += self.get_ignore_patterns_from_file(self.get_local_ignore_file(path_to_check))
|
|
path_to_check = os.path.split(path_to_check)[0]
|
|
|
|
patterns += self.get_ignore_patterns_from_file(self.get_local_ignore_file(self.repo.path))
|
|
patterns += self.global_ignore_patterns
|
|
|
|
if not self._ignore_file(patterns, os.path.basename(name)):
|
|
statuses.append(UntrackedStatus(name))
|
|
else:
|
|
self.ignored_paths.append(name)
|
|
|
|
# Determine status of folders based on child contents
|
|
for d in directories:
|
|
d_status = NormalStatus(d)
|
|
|
|
for file in modified_files:
|
|
if os.path.join(d, os.path.basename(file)) == file:
|
|
d_status = ModifiedStatus(d)
|
|
break
|
|
|
|
statuses.append(d_status)
|
|
|
|
return statuses
|
|
|
|
def get_all_ignore_file_paths(self, path):
|
|
return self.ignored_paths
|
|
|
|
|
|
def status(self, path):
|
|
# TODO - simply get this from the status implementation / avoid global state
|
|
self.ignored_paths = []
|
|
|
|
return self.status_porcelain(path)
|
|
|
|
def log(self, path="", skip=0, limit=None, revision="", showtype="all"):
|
|
|
|
cmd = ["git", "--no-pager", "log", "--numstat", "--parents", "--pretty=fuller",
|
|
"--date-order", "--date=default", "-m"]
|
|
|
|
if showtype == "all":
|
|
cmd.append("--all")
|
|
|
|
if limit:
|
|
cmd.append("-%s" % limit)
|
|
if skip:
|
|
cmd.append("--skip=%s" % skip)
|
|
if revision:
|
|
if showtype=="push":
|
|
cmd.append("%s.." % revision)
|
|
else:
|
|
cmd.append(revision)
|
|
|
|
if path == self.repo.path:
|
|
path = ""
|
|
if path:
|
|
cmd += ["--", path]
|
|
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
return []
|
|
|
|
revisions = []
|
|
revision = {}
|
|
changed_file = {}
|
|
pattern_from = re.compile(r' \(from (.*)\)')
|
|
last_commitId = ""
|
|
for line in stdout:
|
|
if line == "":
|
|
continue
|
|
|
|
if line[0:6] == "commit":
|
|
match = pattern_from.search(line)
|
|
commit_line = re.sub(" \(from.*\)","", line).split(" ")
|
|
fromPath = ""
|
|
if match:
|
|
fromPath = match.group(1)
|
|
if revision:
|
|
if "changed_paths" not in revision:
|
|
revision["changed_paths"] = {}
|
|
if last_commitId != commit_line[1]:
|
|
revisions.append(revision)
|
|
revision = {}
|
|
else:
|
|
del revision["message"]
|
|
|
|
|
|
if len(fromPath) > 0:
|
|
if "changed_paths" not in revision:
|
|
revision["changed_paths"] =[]
|
|
changed_file = {
|
|
"additions": "-",
|
|
"removals": "-",
|
|
"path": "Diff with parent : %s " % fromPath
|
|
}
|
|
revision["changed_paths"].append(changed_file)
|
|
|
|
changed_file = {}
|
|
revision["commit"] = commit_line[1]
|
|
last_commitId = revision["commit"]
|
|
revision["parents"] = []
|
|
for parent in commit_line[2:]:
|
|
revision["parents"].append(parent)
|
|
elif line[0:7] == "Author:":
|
|
revision["author"] = line[7:].strip()
|
|
elif line[0:11] == "AuthorDate:":
|
|
revision["author_date"] = line[11:].strip()
|
|
elif line[0:7] == "Commit:":
|
|
revision["committer"] = line[7:].strip()
|
|
elif line[0:11] == "CommitDate:":
|
|
revision["commit_date"] = line[11:].strip()
|
|
elif line[0:4] == " ":
|
|
message = line[4:]
|
|
if "message" not in revision:
|
|
revision["message"] = ""
|
|
else:
|
|
revision["message"] += "\n"
|
|
|
|
revision["message"] = revision["message"] + message
|
|
elif line[0].isdigit() or line[0] in "-":
|
|
file_line = line.split("\t")
|
|
if "changed_paths" not in revision:
|
|
revision["changed_paths"] = []
|
|
|
|
if len(file_line) == 3:
|
|
changed_file = {
|
|
"additions": file_line[0],
|
|
"removals": file_line[1],
|
|
"path": self.string_unescape(file_line[2])
|
|
}
|
|
if changed_file['path'][0] == '"' and changed_file['path'][-1] == '"':
|
|
changed_file['path'] = changed_file['path'][1:-1]
|
|
revision["changed_paths"].append(changed_file)
|
|
|
|
if revision:
|
|
revisions.append(revision)
|
|
|
|
return revisions
|
|
|
|
def annotate(self, path, revision_obj="HEAD"):
|
|
"""
|
|
Returns an annotation for a specified file
|
|
|
|
@type path: string
|
|
@param path: The absolute path to a tracked file
|
|
|
|
@type revision: string
|
|
@param revision: HEAD or a sha1 hash
|
|
|
|
"""
|
|
|
|
relative_path = self.get_relative_path(path)
|
|
|
|
cmd = ["git", "annotate", "-l", revision_obj, relative_path]
|
|
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
stdout = []
|
|
|
|
returner = []
|
|
for line in stdout:
|
|
components = re.split("\t", line, 3)
|
|
if len(components) < 4:
|
|
continue
|
|
|
|
dt = datetime(*time.strptime(components[2][:-6],"%Y-%m-%d %H:%M:%S")[:-2])
|
|
|
|
message = components[3].split(")", 1)
|
|
code = message[1]
|
|
if len(components) == 5:
|
|
code = components[4]
|
|
|
|
returner.append({
|
|
"revision": components[0],
|
|
"author": components[1][1:],
|
|
"date": dt,
|
|
"line": code,
|
|
"number": message[0]
|
|
})
|
|
|
|
return returner
|
|
|
|
def show(self, path, revision_obj):
|
|
"""
|
|
Returns a particular file at a given revision object.
|
|
|
|
@type path: string
|
|
@param path: The absolute path to a file
|
|
|
|
@type revision_obj: git.Revision()
|
|
@param revision_obj: The revision object for path
|
|
|
|
"""
|
|
if not revision_obj:
|
|
revision_obj = "HEAD"
|
|
|
|
relative_path = self.get_relative_path(path)
|
|
|
|
cmd = ["git", "show", "%s:%s" % (revision_obj, relative_path)]
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
stdout = []
|
|
|
|
return "\n".join(stdout)
|
|
|
|
def diff(self, path1, revision_obj1, path2=None, revision_obj2=None, summarize=False):
|
|
"""
|
|
Returns the diff between the path(s)/revision(s)
|
|
|
|
@type path1: string
|
|
@param path1: The absolute path to a file
|
|
|
|
@type revision_obj1: git.Revision()
|
|
@param revision_obj1: The revision object for path1
|
|
|
|
@type path2: string
|
|
@param path2: The absolute path to a file
|
|
|
|
@type revision_obj2: git.Revision()
|
|
@param revision_obj2: The revision object for path2
|
|
|
|
"""
|
|
relative_path1 = None
|
|
relative_path2 = None
|
|
if path1:
|
|
relative_path1 = self.get_relative_path(path1)
|
|
if path2:
|
|
relative_path2 = self.get_relative_path(path2)
|
|
|
|
cmd = ["git", "diff"]
|
|
|
|
if summarize:
|
|
cmd.append("--name-status")
|
|
|
|
if revision_obj1:
|
|
cmd += [revision_obj1]
|
|
if revision_obj2 and path2:
|
|
cmd += [revision_obj2]
|
|
if relative_path1:
|
|
cmd += [relative_path1]
|
|
if relative_path2 and relative_path2 != relative_path1:
|
|
cmd += [relative_path2]
|
|
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
stdout = []
|
|
|
|
return ''.join(x + "\n" for x in stdout)
|
|
|
|
def diff_summarize(self, path1, revision_obj1, path2=None, revision_obj2=None):
|
|
results = self.diff(path1, revision_obj1, path2, revision_obj2, True)
|
|
summary = []
|
|
for line in results.split("\n"):
|
|
if not line:
|
|
continue
|
|
|
|
(action, path) = (line + "\t").split("\t")[:2]
|
|
summary.append({
|
|
"action": action,
|
|
"path": path
|
|
})
|
|
|
|
return summary
|
|
|
|
def export(self, path, dest_path, revision):
|
|
"""
|
|
Exports a file or directory from a given revision
|
|
|
|
@type path: string
|
|
@param path: The source file/folder to export
|
|
|
|
@type dest_path: string
|
|
@param dest_path: The path to put the exported file(s)
|
|
|
|
@type revision: string
|
|
@param revision: The revision/tree/commit of the source file being exported
|
|
|
|
"""
|
|
|
|
tmp_file = get_tmp_path("rabbitvcs-git-export.tar")
|
|
cmd1 = ["git", "archive", "--format", "tar", "-o", tmp_file, revision, path]
|
|
cmd2 = ["tar", "-xf", tmp_file, "-C", dest_path]
|
|
|
|
mkdir_p(dest_path)
|
|
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd1, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
|
|
(status, stdout, stderr) = GittyupCommand(cmd2, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
stdout = []
|
|
|
|
self.notify("%s at %s exported to %s" % (path, revision, dest_path))
|
|
return "\n".join(stdout)
|
|
|
|
def clean(self, path, remove_dir=True, remove_ignored_too=False,
|
|
remove_only_ignored=False, dry_run=False, force=True):
|
|
|
|
cmd = ["git", "clean"]
|
|
if remove_dir:
|
|
cmd.append("-d")
|
|
|
|
if remove_ignored_too:
|
|
cmd.append("-x")
|
|
|
|
if remove_only_ignored:
|
|
cmd.append("-X")
|
|
|
|
if dry_run:
|
|
cmd.append("-n")
|
|
|
|
if force:
|
|
cmd.append("-f")
|
|
|
|
relative_path = self.get_relative_path(path)
|
|
cmd.append(relative_path)
|
|
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
return
|
|
|
|
def reset(self, path, revision, type=None):
|
|
relative_path = self.get_relative_path(path)
|
|
|
|
cmd = ["git", "reset"]
|
|
if type:
|
|
cmd.append("--%s" % type)
|
|
|
|
cmd.append(revision)
|
|
if relative_path:
|
|
cmd.append(relative_path)
|
|
|
|
try:
|
|
(status, stdout, stderr) = GittyupCommand(cmd, cwd=self.repo.path, notify=self.notify, cancel=self.get_cancel()).execute()
|
|
except GittyupCommandError as e:
|
|
self.callback_notify(e)
|
|
return
|
|
|
|
def set_callback_notify(self, func):
|
|
self.callback_notify = func
|
|
|
|
def set_callback_progress_update(self, func):
|
|
self.callback_progress_update = func
|
|
|
|
def set_callback_get_user(self, func):
|
|
self.callback_get_user = func
|
|
|
|
def set_callback_get_cancel(self, func):
|
|
self.callback_get_cancel = func
|
|
|
|
def notify(self, data):
|
|
self.callback_notify(data)
|
|
|
|
def notify_and_parse_progress(self, data):
|
|
# When progress is requested to a git command, it will
|
|
# respond with the current operation, and that operations current progress
|
|
# in the following format: "<Command>: <percentage>% (<pieces compeated>/<num pieces>)".
|
|
#
|
|
# When a command has reached 100% the format of this final message assumes the formatt:
|
|
# "<Command>: 100% (<num pieces>/<num pieces>), <total size> <unit>, done."
|
|
|
|
|
|
returnData = {"action":"","path":"","mime_type":""}
|
|
|
|
#print "parsing message: " + str(data)
|
|
|
|
# If data is already a dict, we'll assume it's already been parsed, and return.
|
|
if isinstance (data, dict):
|
|
self.notify (data);
|
|
return
|
|
|
|
# Is this an error?
|
|
message_components = re.search("^([eE]rror|[fF]atal): (.+)", data)
|
|
|
|
if message_components != None:
|
|
returnData["action"] = "Error"
|
|
returnData["path"] = message_components.group(2)
|
|
self.notify (returnData)
|
|
return
|
|
|
|
# Check to see if this is a remote command.
|
|
remote_check = re.search("^(remote: )(.+)$", data)
|
|
|
|
if remote_check != None:
|
|
returnData["action"] = "Remote"
|
|
message = remote_check.group(2)
|
|
|
|
else:
|
|
message = data
|
|
|
|
# First, we'll test to see if this is a progress notification.
|
|
if "%" not in message:
|
|
# No, this is just a regular message.
|
|
# Some messages have a strage tendancy to append a non-printable character,
|
|
# followed by a right square brace and a capitol "K". This tests for, and
|
|
# strips these superfluous characters.
|
|
message_components = re.search("^(.+).\[K", message)
|
|
if message_components != None:
|
|
returnData["path"] = message_components.group(1)
|
|
else:
|
|
returnData["path"] = message
|
|
|
|
self.notify (returnData)
|
|
return
|
|
|
|
# Extract the percentage, which will be all numerals directly
|
|
# prior to '%'.
|
|
message_components = re.search("^(.+): +([0-9]+)%", message)
|
|
|
|
if message_components == None:
|
|
print("Error: failed to parse git string: " + data)
|
|
return
|
|
|
|
fraction = float(message_components.group(2)) / 100 # Convert percentage to fraction.
|
|
current_action = message_components.group(1)
|
|
|
|
# If we're at 0%, then we want to notify which action we're performing.
|
|
if fraction == 0:
|
|
returnData["path"] = current_action
|
|
self.notify(returnData)
|
|
|
|
#print "stage fraction: " + str (fraction)
|
|
|
|
# If we're using a number of stages, adjust the fraction acordingly.
|
|
if self.numberOfCommandStages > 0:
|
|
fraction = (self.numberOfCommandStagesExecuted + fraction) / self.numberOfCommandStages
|
|
|
|
# If we've finished the current stage (100%).
|
|
if "done" in message:
|
|
self.numberOfCommandStagesExecuted += 1
|
|
|
|
# If we've registered a callback for progress, update with the new fraction.
|
|
if self.callback_progress_update != None:
|
|
#print "setting pbar: " + str(fraction)
|
|
self.callback_progress_update(fraction)
|
|
|
|
# If we've finished the whole command (all stages).
|
|
if fraction == 1 and "done" in message:
|
|
# Reset stage variables.
|
|
self.numberOfCommandStages = 0
|
|
self.numberOfCommandStagesExecuted = 0
|
|
|
|
def notify_and_parse_git_pull (self, data):
|
|
return_data = {"action":"","path":"","mime_type":""}
|
|
|
|
message_parsed = False
|
|
|
|
# Look for "From" line (e.g. "From ssh://server:22/my_project")
|
|
message_components = re.search("^From (.+)", data)
|
|
|
|
if message_components != None:
|
|
return_data["action"] = "From"
|
|
return_data["path"] = message_components.group(1)
|
|
message_parsed = True
|
|
|
|
# Look for "Branch" line (e.g. "* branch master -> FETCH_HEAD")
|
|
message_components = re.search("\* branch +([A-z0-9]+) +-> (.+)", data)
|
|
|
|
if message_components != None:
|
|
return_data["action"] = "Branch"
|
|
return_data["path"] = message_components.group(1) + " -> " + message_components.group(2)
|
|
message_parsed = True
|
|
|
|
# Look for a file line (e.g. "src/somefile.py | 5 -++++")
|
|
message_components = re.search(" +(.+) +\| *([0-9]+) ([+-]+)", data)
|
|
|
|
if message_components != None:
|
|
return_data["action"] = "Modified"
|
|
return_data["path"] = message_components.group(1)
|
|
return_data["mime_type"] = message_components.group(2) + " " + message_components.group(3)
|
|
message_parsed = True
|
|
|
|
# Look for a updating line (e.g. "Updating ffffff..ffffff")
|
|
message_components = re.search("^Updating ([a-f0-9.]+)", data)
|
|
|
|
if message_components != None:
|
|
return_data["action"] = "Updating"
|
|
return_data["path"] = message_components.group(1)
|
|
message_parsed = True
|
|
|
|
# Look for a "create mode" line (e.g. "create mode 100755 file.py")
|
|
message_components = re.search("create mode ([0-9]+) (.+)", data)
|
|
|
|
if message_components != None:
|
|
return_data["action"] = "Create"
|
|
return_data["path"] = message_components.group(2)
|
|
return_data["mime_type"] = "mode: " + message_components.group(1)
|
|
message_parsed = True
|
|
|
|
# Look for a "delete mode" line (e.g. "create mode 100755 file.py")
|
|
message_components = re.search("delete mode ([0-9]+) (.+)", data)
|
|
|
|
if message_components != None:
|
|
return_data["action"] = "Delete"
|
|
return_data["path"] = message_components.group(2)
|
|
return_data["mime_type"] = "mode: " + message_components.group(1)
|
|
message_parsed = True
|
|
|
|
# Look for an "Auto-merging" line (e.g. "Auto-merging src/file.py")
|
|
message_components = re.search("^Auto-merging (.+)", data)
|
|
|
|
if message_components != None:
|
|
return_data["action"] = "Merging"
|
|
return_data["path"] = message_components.group(1)
|
|
message_parsed = True
|
|
|
|
# Look for a "binary" line (e.g. "icons/file.png" | Bin 0 -> 55555 bytes)
|
|
message_components = re.search("^[ ](.+) +\| Bin ([0-9]+ -> [0-9]+ bytes)", data)
|
|
|
|
if message_components != None:
|
|
return_data["action"] = "Binary"
|
|
return_data["path"] = message_components.group(1)
|
|
return_data["mime_type"] = message_components.group(2)
|
|
message_parsed = True
|
|
|
|
# Look for a "rename" line (e.g. "rename src/{foo.py => bar.py} (50%)")
|
|
message_components = re.search("rename (.+}) \([0-9]+%\)", data)
|
|
|
|
if message_components != None:
|
|
return_data["action"] = "Rename"
|
|
return_data["path"] = message_components.group(1)
|
|
message_parsed = True
|
|
|
|
# Look for a "copy" line (e.g. "copy src/{foo.py => bar.py} (50%)")
|
|
message_components = re.search("copy (.+}) \([0-9]+%\)", data)
|
|
|
|
if message_components != None:
|
|
return_data["action"] = "Copy"
|
|
return_data["path"] = message_components.group(1)
|
|
message_parsed = True
|
|
|
|
# Prepend "Error" to conflict lines. e.g. :
|
|
# CONFLICT (content): Merge conflict in file.py.
|
|
# Automatic merge failed; fix conflicts and then commit the result.
|
|
message_components = re.search("^CONFLICT \(|Automatic merge failed", data)
|
|
|
|
if message_components != None:
|
|
return_data["action"] = "Error"
|
|
return_data["path"] = data
|
|
message_parsed = True
|
|
|
|
if message_parsed == False:
|
|
return_data = data
|
|
|
|
self.notify_and_parse_progress (return_data)
|
|
|
|
def notify_and_parse_git_push (self, data):
|
|
return_data = {"action":"","path":"","mime_type":""}
|
|
|
|
message_parsed = False
|
|
|
|
# Look for to line. e.g. "To gitosis@server.org:project.git". Exclude any
|
|
# lines that include a space (as this could be a message about something else)
|
|
message_components = re.search("^To ([^ ]+$)", data)
|
|
|
|
if message_components != None:
|
|
return_data["action"] = "To"
|
|
return_data["path"] = message_components.group(1)
|
|
message_parsed = True
|
|
|
|
# Look for "new branch" line. e.g. " * [new branch] master -> master"
|
|
message_components = re.search("^ \* \[new branch\] +(.+) -> (.+)", data)
|
|
|
|
if message_components != None:
|
|
return_data["action"] = "New Branch"
|
|
return_data["path"] = message_components.group(1) + " -> " + message_components.group(2)
|
|
message_parsed = True
|
|
|
|
# Look for "rejected" line. e.g. " ![rejected] master -> master (non-fast-forward)".
|
|
message_components = re.search("!\[rejected\] +(.+)", data)
|
|
|
|
if message_components != None:
|
|
return_data["action"] = "Rejected"
|
|
return_data["path"] = message_components.group(1)
|
|
message_parsed = True
|
|
|
|
if message_parsed == False:
|
|
return_data = data
|
|
|
|
self.notify_and_parse_progress (return_data)
|
|
|
|
def get_cancel(self):
|
|
return self.callback_get_cancel
|
|
|
|
def center(self, window):
|
|
# Temporarily hide the window to avoid update_idletasks() drawing the window in the wrong position.
|
|
window.withdraw()
|
|
|
|
# Update "requested size" from geometry manager.
|
|
window.update_idletasks()
|
|
|
|
x = (window.winfo_screenwidth() - window.winfo_reqwidth()) / 2
|
|
y = (window.winfo_screenheight() - window.winfo_reqheight()) / 2
|
|
window.geometry("+%d+%d" % (x, y))
|
|
|
|
# Draw the window frame immediately after setting correct window position.
|
|
window.deiconify()
|