Coverage for src/markdown_exec/formatters/python.py: 93.10%

44 statements  

« prev     ^ index     » next       coverage.py v7.2.3, created at 2023-04-18 18:20 +0200

1"""Formatter for executing Python code.""" 

2 

from __future__ import annotations

import sys
import traceback
from collections import defaultdict
from functools import partial
from io import StringIO
from typing import Any

from markdown_exec.formatters.base import ExecutionError, base_format
from markdown_exec.rendering import code_block

13 

# Globals dictionary handed to `exec` for each named session, so that blocks
# sharing a session name also share their variables.
_sessions_globals: dict[str, dict] = defaultdict(dict)

# Number of code-block identifiers generated so far per session
# (the `None` key counts blocks that do not belong to any session).
_sessions_counter: dict[str | None, int] = defaultdict(int)

# Source lines of every executed block, keyed by its "<code block: ...>"
# identifier; used to put the source text back into formatted tracebacks.
_code_blocks: dict[str, list[str]] = {}

17 

18 

def _buffer_print(buffer: StringIO, *texts: str, end: str | None = "\n", **kwargs: Any) -> None:
    """Write `texts` to `buffer` the way the built-in `print` would write them.

    Parameters:
        buffer: The text buffer that receives the output.
        *texts: The objects to print; each one is converted with `str()`.
        end: The string appended after the last object. `None` means the
            default newline, as with the built-in `print`.
        **kwargs: Remaining `print` keyword arguments. `sep` is honored
            (`None` means a single space); any other keyword, such as `file`
            or `flush`, is accepted and ignored so that all output ends up
            in `buffer`.
    """
    sep = kwargs.get("sep")
    # The built-in `print` treats `None` as "use the default" for both options.
    if sep is None:
        sep = " "
    if end is None:
        end = "\n"
    buffer.write(sep.join(str(text) for text in texts) + end)

21 

22 

def _code_block_id(
    id: str | None = None,  # noqa: A002
    session: str | None = None,
    title: str | None = None,
) -> str:
    """Build the pseudo-filename that identifies a code block.

    The per-session counter is incremented on every call, including calls
    that pass an explicit `id`.

    Parameters:
        id: Explicit identifier of the block. When truthy it is used alone:
            neither the session, the counter nor the title appear in the result.
        session: Name of the session the block belongs to, if any.
        title: Title of the block; appended only when no `id` is given.

    Returns:
        A string of the form `<code block: ...>`.
    """
    _sessions_counter[session] += 1
    if id:
        code_block_id = f"id {id}"
    else:
        code_block_id = f"n{_sessions_counter[session]}"
        if session:
            code_block_id = f"session {session}; {code_block_id}"
        if title:
            code_block_id = f"{code_block_id}; title {title}"
    return f"<code block: {code_block_id}>"

40 

41 

def _run_python(
    code: str,
    returncode: int | None = None,  # noqa: ARG001
    session: str | None = None,
    id: str | None = None,  # noqa: A002
    **extra: str,
) -> str:
    """Execute Python code and return everything it printed.

    The code is run with `exec`, so it has the full privileges of the
    current process.

    Parameters:
        code: The source code to execute.
        returncode: Unused.
        session: Name of a session. Blocks run with the same session name
            share one globals dictionary; without a session each block gets
            a fresh one.
        id: Explicit identifier used to name the block in tracebacks.
        **extra: Extra options. `title` is used to name the block, and all
            of them are forwarded to `code_block` when rendering an error.

    Returns:
        The text written through `print` while the code was running.

    Raises:
        ExecutionError: When compiling or executing the code raises an
            `Exception`. Its message is the formatted traceback rendered
            as a Python code block.
    """
    title = extra.get("title")
    code_block_id = _code_block_id(id, session, title)
    _code_blocks[code_block_id] = code.split("\n")
    exec_globals = _sessions_globals[session] if session else {}

    # Shadow `print` so that output is captured instead of reaching stdout.
    buffer = StringIO()
    exec_globals["print"] = partial(_buffer_print, buffer)

    try:
        compiled = compile(code, filename=code_block_id, mode="exec")
        exec(compiled, exec_globals)  # noqa: S102
    except Exception as error:  # noqa: BLE001
        trace = traceback.TracebackException.from_exception(error)
        for frame in trace.stack:
            if not frame.filename.startswith("<code block: "):
                continue
            # The pseudo-filename cannot be read from disk, so the source line
            # is injected by hand. Skip frames whose source is unknown rather
            # than failing here and hiding the original error.
            lines = _code_blocks.get(frame.filename)
            lineno = frame.lineno
            if lines is None or lineno is None or not 0 < lineno <= len(lines):
                continue
            source_line = lines[lineno - 1]
            # `FrameSummary` keeps its source text in `_lines` since Python 3.13
            # and in `_line` on earlier versions.
            if sys.version_info >= (3, 13):
                frame._lines = source_line  # type: ignore[attr-defined]
            else:
                frame._line = source_line  # type: ignore[attr-defined]
        raise ExecutionError(code_block("python", "".join(trace.format()), **extra)) from error
    return buffer.getvalue()

67 

68 

def _format_python(**kwargs: Any) -> str:
    """Format a Python code block by delegating to `base_format`.

    Parameters:
        **kwargs: Options forwarded unchanged to `base_format`.

    Returns:
        Whatever `base_format` returns when called with `language="python"`
        and `_run_python` as the runner.
    """
    return base_format(language="python", run=_run_python, **kwargs)