feat: If core.autocrlf is enabled, replace CRLF with LF when adding to index #1441

Closed · wants to merge 2 commits

210 changes: 134 additions & 76 deletions git/index/base.py
@@ -38,6 +38,8 @@

import git.diff as git_diff
import os.path as osp
from pathlib import Path
from typing import Optional

from .fun import (
entry_key,
@@ -89,6 +91,40 @@
# ------------------------------------------------------------------------------------


class _FileStore:
"""An utility class that stores original files somewhere and restores them
to the original content at the exit"""

_dir: PathLike

def __init__(self, tmp_dir: Optional[PathLike] = None):

self._file_map: dict[PathLike, PathLike] = {}
self._tmp_dir = tempfile.TemporaryDirectory(prefix=str(tmp_dir))

def __enter__(self):
return self

def __exit__(self, exc, value, tb):
for file, store_file in self._file_map.items():
with open(store_file, "rb") as rf, open(file, "wb") as wf:
for line in rf:
wf.write(line)
Path(store_file).unlink()
self._dir.rmdir()

@property
def _dir(self) -> Path:
return Path(self._tmp_dir.name)

def save(self, file: PathLike) -> None:
store_file = self._dir / tempfile.mktemp()
self._file_map[file] = store_file
with open(store_file, "wb") as wf, open(file, "rb") as rf:
for line in rf:
wf.write(line)
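
# A minimal usage sketch (not part of this diff), assuming _FileStore behaves
# as defined above; "some_file.txt" is a hypothetical path:
#
#     with _FileStore() as fs:
#         fs.save("some_file.txt")               # snapshot the current bytes
#         with open("some_file.txt", "wb") as f:
#             f.write(b"temporarily modified")   # e.g. a CRLF -> LF rewrite
#     # on __exit__ the snapshot is copied back, restoring the original content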


__all__ = ("IndexFile", "CheckoutError")


@@ -611,17 +647,18 @@ def _to_relative_path(self, path: PathLike) -> PathLike:
return os.path.relpath(path, self.repo.working_tree_dir)

def _preprocess_add_items(
self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, "Submodule"]]
self, items: Sequence[Union[PathLike, Blob, BaseIndexEntry, "Submodule"]], file_store: _FileStore
) -> Tuple[List[PathLike], List[BaseIndexEntry]]:
"""Split the items into two lists of path strings and BaseEntries."""
paths = []
entries = []
# if it is a string put in list
if isinstance(items, str):
if isinstance(items, (str, os.PathLike)):
items = [items]

for item in items:
if isinstance(item, str):
if isinstance(item, (str, os.PathLike)):
self._autocrlf(item, file_store)
paths.append(self._to_relative_path(item))
elif isinstance(item, (Blob, Submodule)):
entries.append(BaseIndexEntry.from_blob(item))
@@ -632,6 +669,30 @@ def _preprocess_add_items(
# END for each item
return paths, entries

def _autocrlf(self, file: PathLike, file_store: _FileStore) -> None:
"""If the config option `autocrlf` is True, replace CRLF with LF"""

reader = self.repo.config_reader()

autocrlf = reader.get_value("core", "autocrlf", False)

if not autocrlf:
return

file_store.save(file)

with tempfile.TemporaryFile("wb+") as tf:
with open(file, "rb") as f:
for line in f:
line = line.replace(b"\r\n", b"\n")
tf.write(line)

tf.seek(0)

with open(file, "wb") as f:
for line in tf:
f.write(line)
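
# A minimal illustration (not part of this diff) of the caller-side effect,
# assuming a Repo instance `repo` and a working-tree file "data.txt" that
# contains CRLF line endings:
#
#     with repo.config_writer() as cw:
#         cw.set_value("core", "autocrlf", True)
#     repo.index.add(["data.txt"])   # CRLF is rewritten to LF before hashing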

def _store_path(self, filepath: PathLike, fprogress: Callable) -> BaseIndexEntry:
"""Store file at filepath in the database and return the base index entry
Needs the git_working_dir decorator active ! This must be assured in the calling code"""
@@ -802,82 +863,79 @@ def add(
Objects that do not have a null sha will be added even if their paths
do not exist.
"""
# sort the entries into strings and Entries, Blobs are converted to entries
# automatically
# paths can be git-added, for everything else we use git-update-index
paths, entries = self._preprocess_add_items(items)
entries_added: List[BaseIndexEntry] = []
# This code needs a working tree, therefore we try not to run it unless required.
# That way, we are OK on a bare repository as well.
# If there are no paths, the rewriter has nothing to do either
if paths:
entries_added.extend(self._entries_for_paths(paths, path_rewriter, fprogress, entries))

# HANDLE ENTRIES
if entries:
null_mode_entries = [e for e in entries if e.mode == 0]
if null_mode_entries:
raise ValueError(
"At least one Entry has a null-mode - please use index.remove to remove files for clarity"
)
# END null mode should be remove

# HANDLE ENTRY OBJECT CREATION
# create objects if required, otherwise go with the existing shas
null_entries_indices = [i for i, e in enumerate(entries) if e.binsha == Object.NULL_BIN_SHA]
if null_entries_indices:

@git_working_dir
def handle_null_entries(self: "IndexFile") -> None:
for ei in null_entries_indices:
null_entry = entries[ei]
new_entry = self._store_path(null_entry.path, fprogress)

# update null entry
entries[ei] = BaseIndexEntry(
(
null_entry.mode,
new_entry.binsha,
null_entry.stage,
null_entry.path,

with _FileStore() as file_store:
# sort the entries into strings and Entries, Blobs are converted to entries
# automatically
# paths can be git-added, for everything else we use git-update-index
paths, entries = self._preprocess_add_items(items, file_store)
entries_added: List[BaseIndexEntry] = []
# This code needs a working tree, therefore we try not to run it unless required.
# That way, we are OK on a bare repository as well.
# If there are no paths, the rewriter has nothing to do either
if paths:
entries_added.extend(self._entries_for_paths(paths, path_rewriter, fprogress, entries))

# HANDLE ENTRIES
if entries:
null_mode_entries = [e for e in entries if e.mode == 0]
if null_mode_entries:
raise ValueError(
"At least one Entry has a null-mode - please use index.remove to remove files for clarity"
)
# END null mode should be remove

# HANDLE ENTRY OBJECT CREATION
# create objects if required, otherwise go with the existing shas
null_entries_indices = [i for i, e in enumerate(entries) if e.binsha == Object.NULL_BIN_SHA]
if null_entries_indices:

@git_working_dir
def handle_null_entries(self: "IndexFile") -> None:
for ei in null_entries_indices:
null_entry = entries[ei]
new_entry = self._store_path(null_entry.path, fprogress)

# update null entry
entries[ei] = BaseIndexEntry(
(null_entry.mode, new_entry.binsha, null_entry.stage, null_entry.path)
)
)
# END for each entry index

# end closure
handle_null_entries(self)
# END null_entry handling

# REWRITE PATHS
# If we have to rewrite the entries, do so now, after we have generated
# all object sha's
if path_rewriter:
for i, e in enumerate(entries):
entries[i] = BaseIndexEntry((e.mode, e.binsha, e.stage, path_rewriter(e)))
# END for each entry index

# end closure
handle_null_entries(self)
# END null_entry handling

# REWRITE PATHS
# If we have to rewrite the entries, do so now, after we have generated
# all object sha's
if path_rewriter:
for i, e in enumerate(entries):
entries[i] = BaseIndexEntry((e.mode, e.binsha, e.stage, path_rewriter(e)))
# END for each entry
# END handle path rewriting

# just go through the remaining entries and provide progress info
for i, entry in enumerate(entries):
progress_sent = i in null_entries_indices
if not progress_sent:
fprogress(entry.path, False, entry)
fprogress(entry.path, True, entry)
# END handle progress
# END for each entry
# END handle path rewriting

# just go through the remaining entries and provide progress info
for i, entry in enumerate(entries):
progress_sent = i in null_entries_indices
if not progress_sent:
fprogress(entry.path, False, entry)
fprogress(entry.path, True, entry)
# END handle progress
# END for each entry
entries_added.extend(entries)
# END if there are base entries

# FINALIZE
# add the new entries to this instance
for entry in entries_added:
self.entries[(entry.path, 0)] = IndexEntry.from_base(entry)

if write:
self.write(ignore_extension_data=not write_extension_data)
# END handle write
entries_added.extend(entries)
# END if there are base entries

return entries_added
# FINALIZE
# add the new entries to this instance
for entry in entries_added:
self.entries[(entry.path, 0)] = IndexEntry.from_base(entry)

if write:
self.write(ignore_extension_data=not write_extension_data)
# END handle write

return entries_added

def _items_to_rela_paths(
self,
53 changes: 51 additions & 2 deletions test/test_index.py
@@ -9,7 +9,7 @@
import os
from stat import S_ISLNK, ST_MODE
import tempfile
from unittest import skipIf
from unittest import mock, skipIf
import shutil

from git import (
@@ -24,7 +24,11 @@
CheckoutError,
)
from git.compat import is_win
from git.exc import HookExecutionError, InvalidGitRepositoryError
from git.exc import (
HookExecutionError,
InvalidGitRepositoryError
)
from git.index.base import _FileStore
from git.index.fun import hook_path
from git.index.typ import BaseIndexEntry, IndexEntry
from git.objects import Blob
@@ -37,6 +41,8 @@
import os.path as osp
from git.cmd import Git

from pathlib import Path

HOOKS_SHEBANG = "#!/usr/bin/env sh\n"

is_win_without_bash = is_win and not shutil.which("bash.exe")
@@ -943,3 +949,46 @@ def test_commit_msg_hook_fail(self, rw_repo):
assert str(err)
else:
raise AssertionError("Should have caught a HookExecutionError")

@with_rw_repo('HEAD')
def test_index_add_pathlike(self, rw_repo):
        worktree_dir = Path(rw_repo.working_tree_dir)

        file = worktree_dir / "file.txt"
file.touch()

rw_repo.index.add(file)

def test_autocrlf(self):
file_store = mock.MagicMock()

with tempfile.TemporaryDirectory() as d:
dummy_file = Path(d) / "dummy.txt"

with open(dummy_file, "w") as f:
f.write("Hello\r\n")

index = self.rorepo.index

index._autocrlf(dummy_file, file_store)

with open(dummy_file, "r") as f:
assert f.read() == "Hello\n"


def test_filestore(tmp_path):
dummy_file = tmp_path / "dummy.txt"

content = "Dummy\n"

with open(dummy_file, "w") as f:
f.write(content)

with _FileStore() as fs:
fs.save(dummy_file)

with open(dummy_file, "w") as f:
f.write(r"Something else\n")

with open(dummy_file, "r") as f:
assert f.read() == content
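
# A complementary check, sketched only (it assumes the same imports and
# fixtures as the tests above): when core.autocrlf is not enabled,
# _autocrlf() returns early and the file must stay untouched.
#
#     def test_autocrlf_disabled(tmp_path):
#         dummy_file = tmp_path / "dummy.txt"
#         dummy_file.write_bytes(b"Hello\r\n")
#         index = ...  # an index whose repo has core.autocrlf unset or false
#         index._autocrlf(dummy_file, mock.MagicMock())
#         assert dummy_file.read_bytes() == b"Hello\r\n"   # unchanged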