Coverage for src/yore/_internal/lib.py: 57.31%
258 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-19 16:19 +0100
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-19 16:19 +0100
1from __future__ import annotations
3import json
4import logging
5import re
6import subprocess
7import sys
8from dataclasses import dataclass
9from datetime import date as Date # noqa: N812
10from datetime import datetime as DateTime # noqa: N812
11from datetime import timedelta as TimeDelta # noqa: N812
12from datetime import timezone as TimeZone # noqa: N812
13from functools import cache
14from re import Pattern
15from typing import TYPE_CHECKING, ClassVar, Literal
16from urllib.request import urlopen
18from humanize import naturaldelta
19from packaging.version import Version
21if TYPE_CHECKING:
22 from collections.abc import Iterator
23 from pathlib import Path
YoreKind = Literal["bump", "eol", "bol"]
"""The supported kinds of Yore comments."""

Scope = Literal["block", "file", "line"]
"""The scope of a comment."""

DEFAULT_PREFIX = "YORE"
"""The default prefix for Yore comments."""

DEFAULT_EXCLUDE = [".*", "__py*", "build", "dist"]
"""The default patterns to exclude when scanning directories."""

_logger = logging.getLogger(__name__)
40def _indent(line: str) -> int:
41 return len(line) - len(line.lstrip())
def _block_size(buffer: list[str], start: int) -> int:
    """Return the number of lines of the block starting at index `start`.

    The block ends before the first non-blank line that is either indented
    less than the first line, or indented the same as the first line
    while being preceded by at least one blank line.
    Trailing blank lines are not counted.

    Parameters:
        buffer: The lines to scan.
        start: The index of the first line of the block.

    Returns:
        The size of the block, 0 when `start` is past the end of the buffer.
    """
    # Nothing follows (e.g. the comment was the last line of the file).
    if start >= len(buffer):
        return 0
    size = 0
    consecutive_blank = 0
    indent = _indent(buffer[start])
    for line in buffer[start:]:
        if line.strip():
            line_indent = _indent(line)
            if line_indent < indent:
                break
            if line_indent == indent and consecutive_blank:
                break
            consecutive_blank = 0
        else:
            consecutive_blank += 1
        size += 1
    # Blank lines seen right before the end do not belong to the block.
    return size - consecutive_blank
62def _scope_range(replace: Scope, buffer: list[str], start: int) -> tuple[int, int]:
63 if replace == "line": 63 ↛ 64line 63 didn't jump to line 64 because the condition on line 63 was never true
64 return start, start + 1
65 if replace == "block": 65 ↛ 66line 65 didn't jump to line 66 because the condition on line 65 was never true
66 return start, start + _block_size(buffer, start)
67 if replace == "file": 67 ↛ 69line 67 didn't jump to line 69 because the condition on line 67 was always true
68 return 0, len(buffer)
69 raise ValueError(f"Invlid replace scope: {replace}")
def _reindent(lines: list[str], indent: int) -> list[str]:
    """Re-indent lines so that the least indented one starts at `indent` spaces.

    Blank (whitespace-only) lines are returned unchanged and are ignored
    when computing the common indentation: a bare newline would otherwise
    count as one leading whitespace character, skew the common indentation
    and lose its line ending.

    Parameters:
        lines: The lines to re-indent.
        indent: The number of spaces the least indented line must start with.

    Returns:
        The re-indented lines (an empty list when `lines` is empty).
    """
    widths = [_indent(line) for line in lines if line.strip()]
    if not widths:
        # Empty input or blank lines only: nothing to re-indent.
        return list(lines)
    common = min(widths)
    new = indent * " "
    return [f"{new}{line[common:]}" if line.strip() else line for line in lines]
78def _match_to_line(match: re.Match) -> int | None:
79 if matched_line := match.group("line"): 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true
80 return int(matched_line)
81 return None
84def _match_to_lines(match: re.Match) -> list[int] | None:
85 if matched_lines := match.group("lines"):
86 lines: list[int] = []
87 matched_lines = matched_lines.replace(" ", ",").strip(",")
88 matched_lines = re.sub(",+", ",", matched_lines)
89 for line_range in matched_lines.split(","):
90 if "-" in line_range:
91 start, end = line_range.split("-")
92 lines.extend(range(int(start), int(end) + 1))
93 else:
94 lines.append(int(line_range))
95 return lines
96 return None
def _match_to_comment(match: re.Match, file: Path, lineno: int) -> YoreComment:
    """Build a Yore comment from a match of the Yore comment pattern.

    Parameters:
        match: The match, exposing the named groups of the comment pattern.
        file: The file in which the comment was found.
        lineno: The line number of the comment.

    Returns:
        The corresponding Yore comment.
    """
    return YoreComment(
        file=file,
        lineno=lineno,
        raw=match.group(0),
        prefix=match.group("prefix"),
        suffix=match.group("suffix"),
        kind=match.group("kind"),
        version=match.group("version"),
        remove=match.group("remove"),
        replace=match.group("replace"),
        line=_match_to_line(match),
        lines=_match_to_lines(match),
        string=match.group("string"),
        # The group holds the optional `regex-` marker: only its presence matters.
        regex=bool(match.group("regex")),
        pattern1=match.group("pattern1"),
        pattern2=match.group("pattern2"),
        within=match.group("within"),
    )
120def _within(delta: TimeDelta, of: Date) -> bool:
121 return DateTime.now(tz=TimeZone.utc).date() >= of - delta
124def _delta(until: Date) -> TimeDelta:
125 return until - DateTime.now(tz=TimeZone.utc).date()
128# DUE: EOL 3.9: Remove block.
129_dataclass_opts: dict[str, bool] = {}
130if sys.version_info >= (3, 10):
131 _dataclass_opts["kw_only"] = True
# DUE: EOL 3.9: Replace `**_dataclass_opts` with `kw_only=True` within line.
@dataclass(**_dataclass_opts)
class YoreComment:
    """A Yore comment.

    A comment describes a change (removal or replacement of a line, a block
    or a whole file) to apply once a Python version reaches its Beginning
    or End of Life, or once the project is bumped to a given version.
    """

    file: Path
    """The file containing the comment."""
    lineno: int
    """The line number of the comment (1-based)."""
    raw: str
    """The raw comment."""
    prefix: str
    """The prefix of the comment."""
    suffix: str
    """The suffix of the comment."""
    kind: YoreKind
    """The kind of comment."""
    version: str
    """The BOL/EOL/bump version."""
    remove: Scope | None = None
    """The removal scope."""
    replace: Scope | None = None
    """The replacement scope."""
    line: int | None = None
    """The line to replace with (1-based, relative to the start of the scope)."""
    lines: list[int] | None = None
    """The lines to replace with (1-based, relative to the start of the scope)."""
    string: str | None = None
    """The string to replace with."""
    regex: bool = False
    """Whether to use regex for replacement."""
    pattern1: str | None = None
    """The pattern to replace."""
    pattern2: str | None = None
    """The replacement pattern."""
    within: Scope | None = None
    """The scope to replace within."""

    @property
    def is_bol(self) -> bool:
        """Whether the comment is a Beginning of Life comment."""
        return self.kind.lower() == "bol"

    @property
    def is_eol(self) -> bool:
        """Whether the comment is an End of Life comment."""
        return self.kind.lower() == "eol"

    @property
    def is_bump(self) -> bool:
        """Whether the comment is a bump comment."""
        return self.kind.lower() == "bump"

    @property
    def bol(self) -> Date:
        """The Beginning of Life date for the Python version."""
        return python_dates[self.version][0]

    @property
    def eol(self) -> Date:
        """The End of Life date for the Python version."""
        return python_dates[self.version][1]

    @property
    def comment(self) -> str:
        """The comment without the prefix and the suffix."""
        return self.raw.removeprefix(self.prefix).removesuffix(self.suffix)

    def check(
        self,
        *,
        bump: str | None = None,
        eol_within: TimeDelta | None = None,
        bol_within: TimeDelta | None = None,
    ) -> bool:
        """Check the comment.

        Dates that are already reached are reported as errors,
        dates that are only within the given time delta as warnings.

        Parameters:
            bump: The next version of the project.
            eol_within: The time delta to start warning before the End of Life of a Python version.
            bol_within: The time delta to start warning before the Beginning of Life of a Python version.

        Returns:
            True when there is nothing to do, False otherwise.
        """
        msg_location = f"{self.file}:{self.lineno}:"
        if self.is_eol:
            # Test the reached date first: a date in the past is also "within" any delta,
            # and must not be downgraded to an "in ~..." warning.
            if _within(TimeDelta(days=0), self.eol):
                _logger.error(f"{msg_location} since {self.eol} {self.comment}")
            elif eol_within and _within(eol_within, self.eol):
                _logger.warning(f"{msg_location} in ~{naturaldelta(_delta(self.eol))} {self.comment}")
            else:
                return True
        elif self.is_bol:
            if _within(TimeDelta(days=0), self.bol):
                _logger.error(f"{msg_location} since {self.bol} {self.comment}")
            elif bol_within and _within(bol_within, self.bol):
                _logger.warning(f"{msg_location} in ~{naturaldelta(_delta(self.bol))} {self.comment}")
            else:
                return True
        elif self.is_bump and bump and Version(bump) >= Version(self.version):
            _logger.error(f"{msg_location} version {self.version} >= {self.comment}")
        else:
            return True
        return False

    def fix(
        self,
        buffer: list[str] | None = None,
        *,
        bump: str | None = None,
        eol_within: TimeDelta | None = None,
        bol_within: TimeDelta | None = None,
    ) -> bool:
        """Fix the comment and code below it.

        The comment line itself is always removed when the fix applies.
        The given buffer is modified in place.

        Parameters:
            buffer: The buffer to fix. If not provided, read from and write to the file.
            bump: The next version of the project.
            eol_within: The time delta to start fixing before the End of Life of a Python version.
            bol_within: The time delta to start fixing before the Beginning of Life of a Python version.

        Returns:
            Whether the comment was fixed.

        Raises:
            RuntimeError: When a replacement comment specifies no line, lines or string.
        """
        write = buffer is None
        buffer = buffer or self.file.read_text().splitlines(keepends=True)

        # Check if the fix should be applied.
        if (
            (self.is_eol and ((eol_within and _within(eol_within, self.eol)) or _within(TimeDelta(days=0), self.eol)))
            or (
                self.is_bol and ((bol_within and _within(bol_within, self.bol)) or _within(TimeDelta(days=0), self.bol))
            )
            or (self.is_bump and bump and Version(bump) >= Version(self.version))
        ):
            # Start at the comment line, immediately remove it:
            # `start` then points at the first line following the comment.
            start = self.lineno - 1
            del buffer[start]

            if self.remove:
                start, end = _scope_range(self.remove, buffer, start)
                del buffer[start:end]
                if self.remove == "file":
                    self.file.unlink()

            elif self.replace:
                # Line numbers/ranges are relative to block starts, absolute for the "file" scope.
                start, end = _scope_range(self.replace, buffer, start)
                if self.line:
                    replacement = [buffer[start + self.line - 1]]
                elif self.lines:
                    # Same 1-based numbering as for a single line.
                    replacement = [buffer[start + line - 1] for line in self.lines]
                elif self.string:
                    replacement = [self.string + "\n"]
                else:
                    raise RuntimeError("No replacement specified")
                replacement = _reindent(replacement, _indent(buffer[start]))
                buffer[start:end] = replacement

            elif self.within:
                # Line numbers/ranges are relative to block starts, absolute for the "file" scope.
                start, end = _scope_range(self.within, buffer, start)
                block = buffer[start:end]
                if self.regex:
                    pattern1: Pattern = re.compile(self.pattern1)
                    replacement = [pattern1.sub(self.pattern2, line) for line in block]
                else:
                    replacement = [line.replace(self.pattern1, self.pattern2) for line in block]  # type: ignore[arg-type]
                replacement = _reindent(replacement, _indent(buffer[start]))
                buffer[start:end] = replacement

            # Only write back when the lines were read from the file, and some are left.
            if write and buffer:
                self.file.write_text("".join(buffer))

            return True
        return False
313COMMENT_PREFIXES: set[str] = {
314 r"\#\ ", # Nim, Perl, PHP, Python, R, Ruby, shell, YAML
315 r"//\ ", # C, C++, Go, Java, Javascript, Rust, Swift
316 r"--\ ", # Haskell, Lua, SQL
317 r";", # Lisp, Scheme
318 r"%\ ", # MATLAB
319 r"'\ ?", # VBA
320 r"/\*\ ", # C, C++, Java, Javascript, CSS
321 r"<!--\ ", # HTML, Markdown, XML
322 r"\{\#-?\ ", # Jinja
323 r"\(\*\ ", # OCaml
324}
325"""The supported comment prefixes."""
327_PATTERN_PREFIX = rf"^(?P<prefix>\s*(?:{'|'.join(sorted(COMMENT_PREFIXES))})PREFIX:\ )"
328_PATTERN_SUFFIX = r"(?P<suffix>\.?.*)$"
COMMENT_PATTERN: str = r"""
    (?P<kind>bump|eol|bol)\ (?P<version>[^:]+):\ (?:
        remove\ (?P<remove>block|file|line)
        |
        replace\ (?P<replace>block|file|line)\ with\ (?:
            line\ (?P<line>\d+)
            |
            lines\ (?P<lines>[\d, -]+)
            |
            `(?P<string>.+)`
        )
        |
        (?P<regex>regex-)?replace\ `(?P<pattern1>.+)`\ with\ `(?P<pattern2>.*)`\ within\ (?P<within>block|file|line)
    )
"""
"""The Yore comment pattern, as a (verbose) regular expression."""
@cache
def get_pattern(prefix: str = DEFAULT_PREFIX) -> Pattern:
    """Get the Yore comment pattern with a specific prefix.

    The prefix is inserted as is (unescaped) into a verbose, case-insensitive
    regular expression. Compiled patterns are cached per prefix.

    Parameters:
        prefix: The prefix to use in the pattern.

    Returns:
        The Yore comment pattern.
    """
    return re.compile(
        _PATTERN_PREFIX.replace("PREFIX", prefix) + COMMENT_PATTERN + _PATTERN_SUFFIX,
        re.VERBOSE | re.IGNORECASE,
    )
@cache
def _get_prematching_pattern(prefix: str = DEFAULT_PREFIX) -> Pattern:
    """Get the pattern matching only the start (comment marker and prefix) of a Yore comment.

    The prefix is inserted as is (unescaped) into a verbose, case-insensitive
    regular expression. Compiled patterns are cached per prefix.
    """
    return re.compile(_PATTERN_PREFIX.replace("PREFIX", prefix), re.VERBOSE | re.IGNORECASE)
def yield_files(directory: Path, exclude: list[str] | None = None) -> Iterator[Path]:
    """Yield all files in a directory.

    Files listed by `git ls-files` are yielded when the command succeeds.
    Otherwise (Git unavailable, or the command failed, for example
    outside of a repository) the directory is walked recursively,
    skipping sub-directories matching the exclusion patterns.

    Parameters:
        directory: The directory to scan.
        exclude: Patterns of directories to skip when walking the directory
            (`DEFAULT_EXCLUDE` when not provided). Not applied to Git listings.

    Yields:
        File paths.
    """
    exclude = DEFAULT_EXCLUDE if exclude is None else exclude
    _logger.debug(f"{directory}: scanning...")
    try:
        # `check=False`: a failure is detected through the return code below,
        # no `CalledProcessError` is ever raised.
        result = subprocess.run(  # noqa: S603
            ["git", "ls-files", "-z"],  # noqa: S607
            capture_output=True,
            cwd=directory,
            text=True,
            check=False,
        )
    except FileNotFoundError:
        result = None

    if result is not None and result.returncode == 0:
        # Entries are NUL-terminated: skip the empty strings produced by splitting.
        for filepath in result.stdout.split("\0"):
            if filepath:
                yield directory / filepath
        return

    for path in directory.iterdir():
        if path.is_file():
            yield path
        elif path.is_dir() and not any(path.match(pattern) for pattern in exclude):
            yield from yield_files(path, exclude=exclude)
def yield_buffer_comments(file: Path, lines: list[str], *, prefix: str = DEFAULT_PREFIX) -> Iterator[YoreComment]:
    """Yield all Yore comments in a buffer.

    Lines starting like a Yore comment but not matching
    the full comment pattern are logged as errors and skipped.

    Parameters:
        file: The file to check.
        lines: The buffer to check (pre-read lines).
        prefix: The prefix to look for in the comments.

    Yields:
        Yore comments.
    """
    prepattern = _get_prematching_pattern(prefix)
    pattern = get_pattern(prefix)
    for lineno, line in enumerate(lines, 1):
        if not prepattern.match(line):
            continue
        if match := pattern.match(line):
            yield _match_to_comment(match, file, lineno)
        else:
            _logger.error(f"{file}:{lineno}: invalid Yore comment")
def yield_file_comments(file: Path, *, prefix: str = DEFAULT_PREFIX) -> Iterator[YoreComment]:
    """Yield all Yore comments in a file.

    Files that cannot be read or decoded as text are silently skipped.

    Parameters:
        file: The file to check.
        prefix: The prefix to look for in the comments.

    Yields:
        Yore comments.
    """
    try:
        lines = file.read_text().splitlines()
    except (OSError, UnicodeDecodeError):
        return
    yield from yield_buffer_comments(file, lines, prefix=prefix)
def yield_directory_comments(directory: Path, *, prefix: str = DEFAULT_PREFIX) -> Iterator[YoreComment]:
    """Yield all Yore comments in a directory.

    Parameters:
        directory: The directory to check.
        prefix: The prefix to look for in the comments.

    Yields:
        Yore comments.
    """
    for file in yield_files(directory):
        yield from yield_file_comments(file, prefix=prefix)
def yield_path_comments(path: Path, *, prefix: str = DEFAULT_PREFIX) -> Iterator[YoreComment]:
    """Yield all Yore comments in a file or directory.

    Parameters:
        path: The file or directory to check.
        prefix: The prefix to look for in the comments.

    Yields:
        Yore comments.
    """
    if path.is_dir():
        yield from yield_directory_comments(path, prefix=prefix)
    else:
        yield from yield_file_comments(path, prefix=prefix)
460class _LazyPythonDates:
461 EOL_DATA_URL = "https://raw.githubusercontent.com/python/devguide/main/include/release-cycle.json"
462 _dates: ClassVar[dict[str, tuple[Date, Date]]] = {}
464 def __getitem__(self, version: str) -> tuple[Date, Date]:
465 if not self._dates:
466 self._fetch()
467 return self._dates[version]
469 @staticmethod
470 def _to_date(date: str) -> Date:
471 parts = [int(part) for part in date.split("-")]
472 if len(parts) == 2: # noqa: PLR2004
473 # Without a day, assume date to be the first of the next month.
474 year, month = parts
475 if month == 12: # noqa: PLR2004
476 month = 1
477 year += 1
478 else:
479 month += 1
480 day = 1
481 else:
482 year, month, day = parts
483 return Date(year, month, day)
485 def _fetch(self) -> None:
486 data = json.loads(urlopen(self.EOL_DATA_URL).read()) # noqa: S310
487 for version, info in data.items():
488 bol_date = self._to_date(info["first_release"])
489 eol_date = self._to_date(info["end_of_life"])
490 self._dates[version] = (bol_date, eol_date)
python_dates = _LazyPythonDates()
"""A lazy mapping of Python versions to their Beginning/End of Life dates."""