Coverage for src/markdown_exec/formatters/python.py: 93.10%
44 statements
« prev ^ index » next coverage.py v7.2.3, created at 2023-04-18 18:20 +0200
« prev ^ index » next coverage.py v7.2.3, created at 2023-04-18 18:20 +0200
1"""Formatter for executing Python code."""
3from __future__ import annotations
5import traceback
6from collections import defaultdict
7from functools import partial
8from io import StringIO
9from typing import Any
11from markdown_exec.formatters.base import ExecutionError, base_format
12from markdown_exec.rendering import code_block
14_sessions_globals: dict[str, dict] = defaultdict(dict)
15_sessions_counter: dict[str | None, int] = defaultdict(int)
16_code_blocks: dict[str, list[str]] = {}
19def _buffer_print(buffer: StringIO, *texts: str, end: str = "\n", **kwargs: Any) -> None: # noqa: ARG001
20 buffer.write(" ".join(str(text) for text in texts) + end)
23def _code_block_id(
24 id: str | None = None, # noqa: A002
25 session: str | None = None,
26 title: str | None = None,
27) -> str:
28 _sessions_counter[session] += 1
29 if id:
30 code_block_id = f"id {id}"
31 elif session:
32 code_block_id = f"session {session}; n{_sessions_counter[session]}"
33 if title: 33 ↛ 34line 33 didn't jump to line 34, because the condition on line 33 was never true
34 code_block_id = f"{code_block_id}; title {title}"
35 else:
36 code_block_id = f"n{_sessions_counter[session]}"
37 if title: 37 ↛ 38line 37 didn't jump to line 38, because the condition on line 37 was never true
38 code_block_id = f"{code_block_id}; title {title}"
39 return f"<code block: {code_block_id}>"
42def _run_python(
43 code: str,
44 returncode: int | None = None, # noqa: ARG001
45 session: str | None = None,
46 id: str | None = None, # noqa: A002
47 **extra: str,
48) -> str:
49 title = extra.get("title", None)
50 code_block_id = _code_block_id(id, session, title)
51 _code_blocks[code_block_id] = code.split("\n")
52 exec_globals = _sessions_globals[session] if session else {}
54 buffer = StringIO()
55 exec_globals["print"] = partial(_buffer_print, buffer)
57 try:
58 compiled = compile(code, filename=code_block_id, mode="exec")
59 exec(compiled, exec_globals) # noqa: S102
60 except Exception as error: # noqa: BLE001
61 trace = traceback.TracebackException.from_exception(error)
62 for frame in trace.stack:
63 if frame.filename.startswith("<code block: "):
64 frame._line = _code_blocks[frame.filename][frame.lineno - 1] # type: ignore[attr-defined,operator]
65 raise ExecutionError(code_block("python", "".join(trace.format()), **extra)) from error
66 return buffer.getvalue()
69def _format_python(**kwargs: Any) -> str:
70 return base_format(language="python", run=_run_python, **kwargs)