Coverage for src/yore/_internal/lib.py: 57.31%

258 statements  

« prev     ^ index     » next       coverage.py v7.7.0, created at 2025-03-19 16:19 +0100

1from __future__ import annotations 

2 

3import json 

4import logging 

5import re 

6import subprocess 

7import sys 

8from dataclasses import dataclass 

9from datetime import date as Date # noqa: N812 

10from datetime import datetime as DateTime # noqa: N812 

11from datetime import timedelta as TimeDelta # noqa: N812 

12from datetime import timezone as TimeZone # noqa: N812 

13from functools import cache 

14from re import Pattern 

15from typing import TYPE_CHECKING, ClassVar, Literal 

16from urllib.request import urlopen 

17 

18from humanize import naturaldelta 

19from packaging.version import Version 

20 

21if TYPE_CHECKING: 

22 from collections.abc import Iterator 

23 from pathlib import Path 

24 

YoreKind = Literal["bump", "eol", "bol"]
"""The kinds of Yore comments that are recognized."""

Scope = Literal["block", "file", "line"]
"""The extent of code a comment applies to."""

DEFAULT_PREFIX = "YORE"
"""The prefix Yore comments use unless another one is given."""

DEFAULT_EXCLUDE = [
    ".*",
    "__py*",
    "build",
    "dist",
]
"""The patterns excluded by default when scanning directories."""

_logger = logging.getLogger(__name__)

38 

39 

def _indent(line: str) -> int:
    """Return the number of leading whitespace characters of `line`.

    Every character removed by `str.lstrip()` counts, so a line made only of
    whitespace (including its newline) reports its full length.
    """
    stripped = line.lstrip()
    return len(line) - len(stripped)

42 

43 

def _block_size(buffer: list[str], start: int) -> int:
    """Count the lines of the block that begins at `buffer[start]`.

    The block ends at the first non-blank line that is indented less than the
    first line, or that has the same indentation and follows at least one
    blank line. Trailing blank lines are not counted.
    """
    base = _indent(buffer[start])
    counted = 0
    blanks = 0
    for current in buffer[start:]:
        if not current.strip():
            blanks += 1
        else:
            depth = _indent(current)
            # A dedent, or a sibling statement after a blank line, ends the block.
            if depth < base or (depth == base and blanks):
                break
            blanks = 0
        counted += 1
    # Blank lines seen since the last non-blank line do not belong to the block.
    return counted - blanks

60 

61 

def _scope_range(replace: Scope, buffer: list[str], start: int) -> tuple[int, int]:
    """Compute the half-open `(start, end)` index range a scope covers in `buffer`.

    Parameters:
        replace: The scope: "line", "block" or "file" (compared case-insensitively,
            since Yore comments are matched case-insensitively).
        buffer: The lines of the file.
        start: The index of the first line of the scope.

    Returns:
        The start (inclusive) and end (exclusive) indices.

    Raises:
        ValueError: When the scope is not one of the supported values.
    """
    scope = str(replace).lower()
    if scope == "line":
        return start, start + 1
    if scope == "block":
        return start, start + _block_size(buffer, start)
    if scope == "file":
        return 0, len(buffer)
    raise ValueError(f"Invalid replace scope: {replace}")

70 

71 

def _reindent(lines: list[str], indent: int) -> list[str]:
    """Shift `lines` so that their common indentation becomes `indent` spaces.

    Blank (whitespace-only) lines are ignored when computing the common
    indentation and are returned unchanged: their newline would otherwise be
    counted as indentation, skewing the result and stripping the newline.

    Parameters:
        lines: The lines to re-indent.
        indent: The number of spaces the least indented line should start with.

    Returns:
        A new list of re-indented lines (empty when `lines` is empty).
    """
    widths = [len(line) - len(line.lstrip()) for line in lines if line.strip()]
    if not widths:
        # Nothing but blank lines (or nothing at all): there is no indentation to change.
        return list(lines)
    common = min(widths)
    new = indent * " "
    return [f"{new}{line[common:]}" if line.strip() else line for line in lines]

76 

77 

def _match_to_line(match: re.Match) -> int | None:
    """Return the `line` group of `match` as an integer, or `None` when it is empty or unmatched."""
    matched_line = match.group("line")
    return int(matched_line) if matched_line else None

82 

83 

def _match_to_lines(match: re.Match) -> list[int] | None:
    """Expand the `lines` group of `match` into a list of line numbers.

    The group holds numbers and inclusive `start-end` ranges separated by commas
    and/or spaces, for example `1-3, 5`. Spaces around the dash of a range are
    accepted (`1 - 3`), since the comment pattern allows them.

    Returns:
        The line numbers in the order written, or `None` when the group is empty or unmatched.
    """
    matched_lines = match.group("lines")
    if not matched_lines:
        return None
    # Glue range bounds to their dash first, so that splitting on separators keeps ranges whole.
    normalized = re.sub(r"\s*-\s*", "-", matched_lines)
    lines: list[int] = []
    for line_range in re.split(r"[,\s]+", normalized):
        if not line_range:
            # Leading/trailing separators produce empty items.
            continue
        if "-" in line_range:
            start, end = line_range.split("-")
            lines.extend(range(int(start), int(end) + 1))
        else:
            lines.append(int(line_range))
    return lines

97 

98 

def _match_to_comment(match: re.Match, file: Path, lineno: int) -> YoreComment:
    """Build a `YoreComment` from a match of the Yore comment pattern.

    Parameters:
        match: The match object; it must expose the named groups of the comment pattern.
        file: The file the matched line comes from.
        lineno: The line number of the matched line.
    """
    # Groups that are copied verbatim into the field of the same name.
    verbatim = (
        "prefix",
        "suffix",
        "kind",
        "version",
        "remove",
        "replace",
        "string",
        "pattern1",
        "pattern2",
        "within",
    )
    fields = {name: match.group(name) for name in verbatim}
    return YoreComment(
        file=file,
        lineno=lineno,
        raw=match.group(0),
        line=_match_to_line(match),
        lines=_match_to_lines(match),
        regex=bool(match.group("regex")),
        **fields,
    )

118 

119 

def _within(delta: TimeDelta, of: Date) -> bool:
    """Tell whether today (UTC) is on or after the date `of` minus `delta`."""
    today = DateTime.now(tz=TimeZone.utc).date()
    threshold = of - delta
    return today >= threshold

122 

123 

def _delta(until: Date) -> TimeDelta:
    """Return the time left from today (UTC) until the date `until` (negative when it has passed)."""
    today = DateTime.now(tz=TimeZone.utc).date()
    return until - today

126 

127 

# DUE: EOL 3.9: Remove block.
# `kw_only` is only passed to `dataclass` on Python 3.10 and later.
_dataclass_opts: dict[str, bool] = {"kw_only": True} if sys.version_info >= (3, 10) else {}

132 

133 

# DUE: EOL 3.9: Replace `**_dataclass_opts` with `kw_only=True` within line.
@dataclass(**_dataclass_opts)
class YoreComment:
    """A Yore comment."""

    file: Path
    """The file containing comment."""
    lineno: int
    """The line number of the comment."""
    raw: str
    """The raw comment."""
    prefix: str
    """The prefix of the comment."""
    suffix: str
    """The suffix of the comment."""
    kind: YoreKind
    """The kind of comment."""
    version: str
    """The EOL/BOL/bump version."""
    remove: Scope | None = None
    """The removal scope."""
    replace: Scope | None = None
    """The replacement scope."""
    line: int | None = None
    """The line to replace with (1-based, relative to the start of the scope)."""
    lines: list[int] | None = None
    """The lines to replace with (1-based, relative to the start of the scope)."""
    string: str | None = None
    """The string to replace with."""
    regex: bool = False
    """Whether to use regex for replacement."""
    pattern1: str | None = None
    """The pattern to replace."""
    pattern2: str | None = None
    """The replacement pattern."""
    within: Scope | None = None
    """The scope to replace within."""

    @property
    def is_bol(self) -> bool:
        """Whether the comment is a Beginning of Life comment."""
        return self.kind.lower() == "bol"

    @property
    def is_eol(self) -> bool:
        """Whether the comment is an End of Life comment."""
        return self.kind.lower() == "eol"

    @property
    def is_bump(self) -> bool:
        """Whether the comment is a bump comment."""
        return self.kind.lower() == "bump"

    @property
    def bol(self) -> Date:
        """The Beginning of Life date for the Python version."""
        return python_dates[self.version][0]

    @property
    def eol(self) -> Date:
        """The End of Life date for the Python version."""
        return python_dates[self.version][1]

    @property
    def comment(self) -> str:
        """The comment without the prefix and suffix."""
        return self.raw.removeprefix(self.prefix).removesuffix(self.suffix)

    def check(
        self,
        *,
        bump: str | None = None,
        eol_within: TimeDelta | None = None,
        bol_within: TimeDelta | None = None,
    ) -> bool:
        """Check the comment.

        Parameters:
            bump: The next version of the project.
            eol_within: The time delta to start warning before the End of Life of a Python version.
            bol_within: The time delta to start warning before the Beginning of Life of a Python version.

        Returns:
            True when there is nothing to do, False otherwise.
        """
        msg_location = f"{self.file}:{self.lineno}:"
        if self.is_eol:
            if eol_within and _within(eol_within, self.eol):
                _logger.warning(f"{msg_location} in ~{naturaldelta(_delta(self.eol))} {self.comment}")
            elif _within(TimeDelta(days=0), self.eol):
                _logger.error(f"{msg_location} since {self.eol} {self.comment}")
            else:
                return True
        elif self.is_bol:
            if bol_within and _within(bol_within, self.bol):
                _logger.warning(f"{msg_location} in ~{naturaldelta(_delta(self.bol))} {self.comment}")
            elif _within(TimeDelta(days=0), self.bol):
                _logger.error(f"{msg_location} since {self.bol} {self.comment}")
            else:
                return True
        elif self.is_bump and bump and Version(bump) >= Version(self.version):
            _logger.error(f"{msg_location} version {self.version} >= {self.comment}")
        else:
            return True
        return False

    def fix(
        self,
        buffer: list[str] | None = None,
        *,
        bump: str | None = None,
        eol_within: TimeDelta | None = None,
        bol_within: TimeDelta | None = None,
    ) -> bool:
        """Fix the comment and code below it.

        The comment line is deleted, then the code in the comment's scope is
        removed, replaced, or edited in place. Line numbers given in the comment
        (`line N`, `lines N-M`) are 1-based and counted from the first line of
        the scope, once the comment line itself has been deleted.

        Parameters:
            buffer: The buffer to fix. If not provided, read from and write to the file.
            bump: The next version of the project.
            eol_within: The time delta to start fixing before the End of Life of a Python version.
            bol_within: The time delta to start fixing before the Beginning of Life of a Python version.

        Returns:
            Whether the comment was fixed.

        Raises:
            RuntimeError: When a replacement scope is set without anything to replace it with.
        """
        write = buffer is None
        buffer = buffer or self.file.read_text().splitlines(keepends=True)

        # Check if the fix should be applied.
        if (
            (self.is_eol and ((eol_within and _within(eol_within, self.eol)) or _within(TimeDelta(days=0), self.eol)))
            or (
                self.is_bol and ((bol_within and _within(bol_within, self.bol)) or _within(TimeDelta(days=0), self.bol))
            )
            or (self.is_bump and bump and Version(bump) >= Version(self.version))
        ):
            # Start at the comment line, immediately remove it.
            start = self.lineno - 1
            del buffer[start]

            if self.remove:
                start, end = _scope_range(self.remove, buffer, start)
                del buffer[start:end]
                # Comments are matched case-insensitively, so compare the scope the same way.
                if self.remove.lower() == "file":
                    self.file.unlink()

            elif self.replace:
                # Line numbers/ranges are relative to block starts, absolute for the "file" scope.
                start, end = _scope_range(self.replace, buffer, start)
                if self.line:
                    replacement = [buffer[start + self.line - 1]]
                elif self.lines:
                    # Same 1-based numbering as for a single line.
                    replacement = [buffer[start + line - 1] for line in self.lines]
                elif self.string:
                    replacement = [self.string + "\n"]
                else:
                    raise RuntimeError("No replacement specified")
                replacement = _reindent(replacement, _indent(buffer[start]))
                buffer[start:end] = replacement

            elif self.within:
                # Line numbers/ranges are relative to block starts, absolute for the "file" scope.
                start, end = _scope_range(self.within, buffer, start)
                block = buffer[start:end]
                if self.regex:
                    pattern1: Pattern = re.compile(self.pattern1)
                    replacement = [pattern1.sub(self.pattern2, line) for line in block]
                else:
                    replacement = [line.replace(self.pattern1, self.pattern2) for line in block]  # type: ignore[arg-type]
                replacement = _reindent(replacement, _indent(buffer[start]))
                buffer[start:end] = replacement

            # Only write back when the buffer was read from the file and something is left to write.
            if write and buffer:
                self.file.write_text("".join(buffer))

            return True
        return False

311 

312 

COMMENT_PREFIXES: set[str] = {
    # Line comments.
    r"\#\ ",  # Nim, Perl, PHP, Python, R, Ruby, shell, YAML
    r"//\ ",  # C, C++, Go, Java, Javascript, Rust, Swift
    r"--\ ",  # Haskell, Lua, SQL
    r";",  # Lisp, Scheme
    r"%\ ",  # MATLAB
    r"'\ ?",  # VBA
    # Block comment openers.
    r"/\*\ ",  # C, C++, Java, Javascript, CSS
    r"<!--\ ",  # HTML, Markdown, XML
    r"\{\#-?\ ",  # Jinja
    r"\(\*\ ",  # OCaml
}
"""The supported comment prefixes, as regular expression fragments."""

# The literal `PREFIX` placeholder is substituted with the actual prefix when patterns are compiled.
_PATTERN_PREFIX = r"^(?P<prefix>\s*(?:" + "|".join(sorted(COMMENT_PREFIXES)) + r")PREFIX:\ )"
_PATTERN_SUFFIX = r"(?P<suffix>\.?.*)$"

329 

# Compiled with `re.VERBOSE`: layout whitespace is ignored, literal spaces are escaped.
# `bol` is accepted as a kind alongside `bump` and `eol`, matching `YoreKind`.
COMMENT_PATTERN: str = r"""
    (?P<kind>bump|eol|bol)\ (?P<version>[^:]+):\ (?:
        remove\ (?P<remove>block|file|line)
        |
        replace\ (?P<replace>block|file|line)\ with\ (?:
            line\ (?P<line>\d+)
            |
            lines\ (?P<lines>[\d, -]+)
            |
            `(?P<string>.+)`
        )
        |
        (?P<regex>regex-)?replace\ `(?P<pattern1>.+)`\ with\ `(?P<pattern2>.*)`\ within\ (?P<within>block|file|line)
    )
"""
"""The Yore comment pattern, as a regular expression."""

346 

347 

@cache
def get_pattern(prefix: str = DEFAULT_PREFIX) -> Pattern:
    """Get the Yore comment pattern with a specific prefix.

    The result is cached per prefix.

    Parameters:
        prefix: The prefix to use in the pattern.

    Returns:
        The Yore comment pattern, compiled in verbose, case-insensitive mode.
    """
    parts = (_PATTERN_PREFIX.replace("PREFIX", prefix), COMMENT_PATTERN, _PATTERN_SUFFIX)
    flags = re.VERBOSE | re.IGNORECASE
    return re.compile("".join(parts), flags)

362 

363 

@cache
def _get_prematching_pattern(prefix: str = DEFAULT_PREFIX) -> Pattern:
    """Get the pattern matching only the start of a Yore comment (comment marker and prefix)."""
    source = _PATTERN_PREFIX.replace("PREFIX", prefix)
    flags = re.VERBOSE | re.IGNORECASE
    return re.compile(source, flags)

367 

368 

def yield_files(directory: Path, exclude: list[str] | None = None) -> Iterator[Path]:
    """Yield all files in a directory.

    Files are listed with `git ls-files` when possible. When Git is not
    available, or the command fails (for example because the directory is not
    inside a Git repository), the directory is walked recursively instead.

    Parameters:
        directory: The directory to scan.
        exclude: Patterns of sub-directories to skip during the recursive walk.
            Defaults to `DEFAULT_EXCLUDE`.

    Yields:
        File paths.
    """
    exclude = DEFAULT_EXCLUDE if exclude is None else exclude
    _logger.debug(f"{directory}: scanning...")
    try:
        # `check=True` makes a failing Git command raise, so that the fallback below is used.
        git_files = subprocess.run(  # noqa: S603
            ["git", "ls-files", "-z"],  # noqa: S607
            capture_output=True,
            cwd=directory,
            text=True,
            check=True,
        ).stdout
    except (FileNotFoundError, subprocess.CalledProcessError):
        for path in directory.iterdir():
            if path.is_file():
                yield path
            elif path.is_dir() and not any(path.match(pattern) for pattern in exclude):
                yield from yield_files(path, exclude=exclude)
    else:
        for filepath in git_files.split("\0"):
            # Skip empty items (trailing NUL, or no tracked file at all):
            # joining an empty name would yield the directory itself.
            if filepath:
                yield directory / filepath

390 

391 

def yield_buffer_comments(file: Path, lines: list[str], *, prefix: str = DEFAULT_PREFIX) -> Iterator[YoreComment]:
    """Yield all Yore comments in a buffer.

    Lines that start like a Yore comment but do not match the full comment
    pattern are reported with an error log and skipped.

    Parameters:
        file: The file the buffer comes from.
        lines: The buffer to check (pre-read lines).
        prefix: The prefix to look for in the comments.

    Yields:
        Yore comments.
    """
    prefix_re = _get_prematching_pattern(prefix)
    comment_re = get_pattern(prefix)
    for number, text in enumerate(lines, start=1):
        if not prefix_re.match(text):
            continue
        found = comment_re.match(text)
        if found is None:
            _logger.error(f"{file}:{number}: invalid Yore comment")
            continue
        yield _match_to_comment(found, file, number)

411 

412 

def yield_file_comments(file: Path, *, prefix: str = DEFAULT_PREFIX) -> Iterator[YoreComment]:
    """Yield all Yore comments in a file.

    A file that cannot be read or decoded as text yields nothing.

    Parameters:
        file: The file to check.
        prefix: The prefix to look for in the comments.

    Yields:
        Yore comments.
    """
    try:
        content = file.read_text()
    except (OSError, UnicodeDecodeError):
        return
    yield from yield_buffer_comments(file, content.splitlines(), prefix=prefix)

428 

429 

def yield_directory_comments(directory: Path, *, prefix: str = DEFAULT_PREFIX) -> Iterator[YoreComment]:
    """Yield all Yore comments found in the files of a directory.

    Parameters:
        directory: The directory to check.
        prefix: The prefix to look for in the comments.

    Yields:
        Yore comments.
    """
    for filepath in yield_files(directory):
        file_comments = yield_file_comments(filepath, prefix=prefix)
        yield from file_comments

442 

443 

def yield_path_comments(path: Path, *, prefix: str = DEFAULT_PREFIX) -> Iterator[YoreComment]:
    """Yield all Yore comments in a file or directory.

    Parameters:
        path: The file or directory to check.
        prefix: The prefix to look for in the comments.

    Yields:
        Yore comments.
    """
    # Directories are scanned file by file, anything else is read as a single file.
    collect = yield_directory_comments if path.is_dir() else yield_file_comments
    yield from collect(path, prefix=prefix)

458 

459 

class _LazyPythonDates:
    """Mapping-like access to the Beginning/End of Life dates of Python versions.

    The release-cycle data is downloaded from `EOL_DATA_URL` on first lookup and
    kept in a class-level cache shared by all instances.
    """

    EOL_DATA_URL = "https://raw.githubusercontent.com/python/devguide/main/include/release-cycle.json"
    _dates: ClassVar[dict[str, tuple[Date, Date]]] = {}

    def __getitem__(self, version: str) -> tuple[Date, Date]:
        """Return the `(beginning of life, end of life)` dates of a Python version.

        Raises:
            KeyError: When the version is not part of the release-cycle data.
        """
        if not self._dates:
            self._fetch()
        return self._dates[version]

    @staticmethod
    def _to_date(date: str) -> Date:
        """Parse a `YYYY-MM-DD` or `YYYY-MM` string into a date."""
        parts = [int(part) for part in date.split("-")]
        if len(parts) == 2:  # noqa: PLR2004
            # Without a day, assume date to be the first of the next month.
            year, month = parts
            if month == 12:  # noqa: PLR2004
                month = 1
                year += 1
            else:
                month += 1
            day = 1
        else:
            year, month, day = parts
        return Date(year, month, day)

    def _fetch(self) -> None:
        """Download the release-cycle data and fill the cache."""
        # Close the HTTP response once its body has been read.
        with urlopen(self.EOL_DATA_URL) as response:  # noqa: S310
            data = json.loads(response.read())
        # Parse everything before touching the cache: a failure must not leave it
        # partially filled, which would prevent any later fetch.
        dates = {
            version: (self._to_date(info["first_release"]), self._to_date(info["end_of_life"]))
            for version, info in data.items()
        }
        self._dates.update(dates)


python_dates = _LazyPythonDates()
"""A dictionary of Python versions and their Beginning/End of Life dates."""