Coverage for src/dependenpy/dsm.py: 83.77%

185 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-09-04 11:35 +0200

1""" 

2dependenpy dsm module. 

3 

4This is the core module of dependenpy. It contains the following classes: 

5 

6- [`DSM`][dependenpy.dsm.DSM]: to create a DSM-capable object for a list of packages, 

7- [`Package`][dependenpy.dsm.Package]: which represents a Python package, 

8- [`Module`][dependenpy.dsm.Module]: which represents a Python module, 

9- [`Dependency`][dependenpy.dsm.Dependency]: which represents a dependency between two modules. 

10""" 

11 

12from __future__ import annotations 

13 

14import ast 

15import json 

16import sys 

17from os import listdir 

18from os.path import isdir, isfile, join, splitext 

19from pathlib import Path 

20from typing import List 

21 

22from dependenpy.finder import Finder, PackageSpec 

23from dependenpy.helpers import PrintMixin 

24from dependenpy.node import LeafNode, NodeMixin, RootNode 

25 

26 

class DSM(RootNode, NodeMixin, PrintMixin):
    """
    DSM-capable tree of Python packages.

    Strictly speaking an instance is not a dependency structure matrix
    itself: it is a tree mirroring the structure of the requested Python
    packages. It nevertheless has the methods needed to produce a real DSM
    as a square matrix, a dictionary or a tree-map.
    """

    def __init__(
        self, *packages: str, build_tree: bool = True, build_dependencies: bool = True, enforce_init: bool = True
    ):
        """
        Locate the requested packages and optionally build the tree.

        Args:
            *packages: names of the packages to search for.
            build_tree: build the packages tree right away.
            build_dependencies: build the dependencies right away
                (only honored when `build_tree` is also true).
            enforce_init: if True, only treat directories if they contain an `__init__.py` file.
        """
        self.base_packages = packages
        self.finder = Finder()
        self.specs = []
        self.not_found = []
        self.enforce_init = enforce_init

        # Sort every requested name into "found" (a spec) or "not found".
        found_specs = []
        for package_name in packages:
            spec = self.finder.find(package_name, enforce_init=enforce_init)
            if not spec:
                self.not_found.append(package_name)
                continue
            found_specs.append(spec)

        if not found_specs:
            print("** dependenpy: DSM empty.", file=sys.stderr)

        self.specs = PackageSpec.combine(found_specs)

        for missing in self.not_found:
            print(f"** dependenpy: Not found: {missing}.", file=sys.stderr)

        super().__init__(build_tree)

        if build_tree and build_dependencies:
            self.build_dependencies()

    def __str__(self):
        """Return a one-line summary listing the names of the root packages."""
        names = ", ".join(package.name for package in self.packages)
        return f"Dependency DSM for packages: [{names}]"

    @property
    def isdsm(self) -> bool:
        """
        Inherited from NodeMixin. Always True.

        Returns:
            Whether this object is a DSM.
        """
        return True

    def build_tree(self):
        """Build the Python packages tree, one node per combined spec."""
        for spec in self.specs:
            if spec.ismodule:
                # The spec designates a single-file module, not a package.
                self.modules.append(Module(spec.name, spec.path, dsm=self))
                continue
            # Dependencies are not built per package here; `__init__`
            # triggers `build_dependencies` once the whole tree exists.
            root_package = Package(
                spec.name,
                spec.path,
                dsm=self,
                limit_to=spec.limit_to,
                build_tree=True,
                build_dependencies=False,
                enforce_init=self.enforce_init,
            )
            self.packages.append(root_package)

107 

108 

class Package(RootNode, LeafNode, NodeMixin, PrintMixin):  # noqa: WPS215
    """
    Package class.

    This class represents Python packages as nodes in a tree.
    """

    def __init__(
        self,
        name: str,
        path: str,
        dsm: DSM = None,
        package: "Package" = None,
        limit_to: List[str] = None,
        build_tree: bool = True,
        build_dependencies: bool = True,
        enforce_init: bool = True,
    ):
        """
        Store the package attributes and optionally build its tree.

        Args:
            name: name of the package.
            path: path to the package.
            dsm: parent DSM.
            package: parent package.
            limit_to: list of strings limiting the recursive tree-building to what is specified.
            build_tree: build the tree right away.
            build_dependencies: build the dependencies right away
                (only honored when `build_tree` is also true).
            enforce_init: if True, only treat directories if they contain an `__init__.py` file.
        """
        self.name = name
        self.path = path
        self.package = package
        self.dsm = dsm
        self.limit_to = limit_to or []
        self.enforce_init = enforce_init

        # Both bases are initialized explicitly, each with its own arguments.
        RootNode.__init__(self, build_tree)  # noqa: WPS609
        LeafNode.__init__(self)  # noqa: WPS609

        if build_tree and build_dependencies:
            self.build_dependencies()

    @property
    def ispackage(self) -> bool:
        """
        Inherited from NodeMixin. Always True.

        Returns:
            Whether this object is a package.
        """
        return True

    @property
    def issubpackage(self) -> bool:
        """
        Tell whether this node is a sub-package.

        Returns:
            True when this package has a parent package.
        """
        return self.package is not None

    @property
    def isroot(self) -> bool:
        """
        Tell whether this node is a root node.

        Returns:
            True when this package has no parent package.
        """
        return self.package is None

    def split_limits_heads(self) -> tuple[list[str], list[str]]:
        """
        Split each entry of `limit_to` at its first dot.

        Returns:
            A pair `(heads, rests)`: `heads` holds the part before the first
            dot of every entry (the whole entry when it has no dot), `rests`
            holds the part after the first dot of the dotted entries only.
        """
        heads = []
        rests = []
        for limit in self.limit_to:
            head, dot, rest = limit.partition(".")
            heads.append(head)
            if dot:
                # Only dotted entries contribute something to pass down.
                rests.append(rest)
        return heads, rests

    def build_tree(self):  # noqa: WPS231
        """Build the tree for this package from the entries of its directory."""
        for entry in listdir(self.path):
            entry_path = join(self.path, entry)

            if isfile(entry_path) and entry.endswith(".py"):
                module_name = splitext(entry)[0]
                if self.limit_to and module_name not in self.limit_to:
                    continue
                self.modules.append(Module(module_name, entry_path, self.dsm, self))
                continue

            if not isdir(entry_path):
                continue

            # Directories lacking an `__init__.py` are only accepted when
            # `enforce_init` is disabled.
            if not isfile(join(entry_path, "__init__.py")) and self.enforce_init:
                continue

            heads, sub_limits = self.split_limits_heads()
            if heads and entry not in heads:
                continue

            sub_package = Package(
                entry,
                entry_path,
                self.dsm,
                self,
                sub_limits,
                build_tree=True,
                build_dependencies=False,
                enforce_init=self.enforce_init,
            )
            self.packages.append(sub_package)

    def cardinal(self, to) -> int:
        """
        Return the number of dependencies of this package to the given node.

        Args:
            to (Package/Module): target node.

        Returns:
            Sum of the cardinals of every sub-module towards `to`.
        """
        total = 0
        for module in self.submodules:
            total += module.cardinal(to)
        return total

237 

238 

class Module(LeafNode, NodeMixin, PrintMixin):  # noqa: WPS338
    """
    Module class.

    This class represents a Python module (a Python file).
    """

    # AST node types that `get_imports` descends into when looking for
    # nested import statements.
    RECURSIVE_NODES = (ast.ClassDef, ast.FunctionDef, ast.If, ast.IfExp, ast.Try, ast.With, ast.ExceptHandler)

    def __init__(self, name, path, dsm=None, package=None):
        """
        Initialization method.

        Args:
            name (str): name of the module.
            path (str): path to the module.
            dsm (DSM): parent DSM.
            package (Package): parent Package.
        """
        super().__init__()
        self.name = name
        self.path = path
        self.package = package
        self.dsm = dsm
        self.dependencies = []

    def __contains__(self, item) -> bool:
        """
        Whether given item is contained inside this module.

        Args:
            item (Package/Module): a package or module.

        Returns:
            True if self is item or item is self's package and
            self is an `__init__` module.
        """
        if self is item:
            return True
        return self.package is item and self.name == "__init__"

    @property
    def ismodule(self) -> bool:
        """
        Inherited from NodeMixin. Always True.

        Returns:
            Whether this object is a module.
        """
        return True

    def as_dict(self, absolute: bool = False) -> dict:
        """
        Return the dependencies as a dictionary.

        Arguments:
            absolute: Whether to use the absolute name.

        Returns:
            dict: dictionary with the keys `name`, `path` and `dependencies`;
            each dependency is a dict with the keys `target`, `lineno`,
            `what` and `external`.
        """
        return {
            "name": self.absolute_name() if absolute else self.name,
            "path": self.path,
            "dependencies": [
                {
                    # External targets are plain strings, internal ones are nodes.
                    "target": dep.target if dep.external else dep.target.absolute_name(),
                    "lineno": dep.lineno,
                    "what": dep.what,
                    "external": dep.external,
                }
                for dep in self.dependencies
            ],
        }

    def _to_text(self, **kwargs):
        """
        Render the module name followed by one indented line per dependency.

        Recognized keyword arguments: `indent` (default 2) and `base_indent`
        (default None). Without `base_indent`, the name is not indented and
        `indent` is used as the indentation step for the dependencies.
        """
        indent = kwargs.pop("indent", 2)
        base_indent = kwargs.pop("base_indent", None)
        if base_indent is None:
            base_indent = indent
            indent = 0
        text = [" " * indent + self.name + "\n"]
        new_indent = indent + base_indent
        for dep in self.dependencies:
            # External dependencies are flagged with a leading "! ".
            external = "! " if dep.external else ""
            text.append(" " * new_indent + external + str(dep) + "\n")
        return "".join(text)

    def _to_csv(self, **kwargs):
        """
        Render the dependencies as CSV lines, one per dependency.

        Recognized keyword argument: `header` (default True), whether to
        emit the column-names line first.
        """
        header = kwargs.pop("header", True)
        text = ["module,path,target,lineno,what,external\n" if header else ""]
        name = self.absolute_name()
        for dep in self.dependencies:
            target = dep.target if dep.external else dep.target.absolute_name()
            text.append(f"{name},{self.path},{target},{dep.lineno},{dep.what or ''},{dep.external}\n")
        return "".join(text)

    def _to_json(self, **kwargs):
        """
        Render `as_dict()` as JSON.

        The `absolute` keyword argument (default False) is forwarded to
        `as_dict`; every other keyword argument goes to `json.dumps`.
        """
        absolute = kwargs.pop("absolute", False)
        return json.dumps(self.as_dict(absolute=absolute), **kwargs)

    def build_dependencies(self):
        """
        Build the dependencies for this module.

        Parse the code with ast, find all the import statements, convert
        them into Dependency objects.
        """
        highest = self.dsm or self.root
        if self is highest:
            # A module with no parent resolves targets against an empty leaf.
            highest = LeafNode()
        for import_ in self.parse_code():
            target = highest.get_target(import_["target"])
            if target:
                # The import was resolved to a node: when the last dotted
                # part is not that node's name, it is the imported object.
                what = import_["target"].split(".")[-1]
                if what != target.name:
                    import_["what"] = what
                import_["target"] = target
            self.dependencies.append(Dependency(source=self, **import_))

    def parse_code(self) -> list[dict]:
        """
        Read the source code and return all the import statements.

        Returns:
            list of dict: the import statements, or an empty list when the
            source cannot be parsed.
        """
        code = Path(self.path).read_text(encoding="utf-8")
        try:
            body = ast.parse(code).body
        except SyntaxError:
            # Second attempt: parse the same source as bytes.
            code = code.encode("utf-8")  # type: ignore[assignment]
            try:  # noqa: WPS505
                body = ast.parse(code).body
            except SyntaxError:
                return []
        return self.get_imports(body)

    def get_imports(self, ast_body) -> list[dict]:  # noqa: WPS231,WPS615
        """
        Return all the import statements given an AST body (AST nodes).

        Nested statements are searched too: bodies of the node types in
        `RECURSIVE_NODES`, the `else` branch of `if` statements, and the
        `except` handlers, `else` and `finally` branches of `try` statements.

        Args:
            ast_body (compiled code's body): the body to filter.

        Returns:
            The import statements, as dicts with the keys `target`
            (dotted name) and `lineno`.
        """
        imports: list[dict] = []
        for node in ast_body:
            if isinstance(node, ast.Import):
                imports.extend({"target": name.name, "lineno": node.lineno} for name in node.names)
            elif isinstance(node, ast.ImportFrom):
                # `node.level` counts the leading dots of a relative import;
                # the prefix is this module's absolute name cut accordingly.
                abs_name = (self.absolute_name(self.depth - node.level) + ".") if node.level > 0 else ""
                node_module = (node.module + ".") if node.module else ""
                for name in node.names:
                    target = abs_name + node_module + name.name
                    imports.append({"target": target, "lineno": node.lineno})
            elif isinstance(node, Module.RECURSIVE_NODES):
                imports.extend(self.get_imports(node.body))
                if isinstance(node, ast.Try):
                    # Handlers are not part of `body`: without this, fallback
                    # imports in `except ImportError:` blocks are missed.
                    imports.extend(self.get_imports(node.handlers))
                    imports.extend(self.get_imports(node.orelse))
                    imports.extend(self.get_imports(node.finalbody))
                elif isinstance(node, ast.If):
                    # Covers `else:` and `elif` (a nested `If` in `orelse`).
                    imports.extend(self.get_imports(node.orelse))
        return imports

    def cardinal(self, to) -> int:
        """
        Return the number of dependencies of this module to the given node.

        Args:
            to (Package/Module): the target node.

        Returns:
            Number of internal dependencies whose target is contained in `to`.
        """
        return sum(1 for dep in self.dependencies if not dep.external and dep.target in to)

417 

418 

class Dependency(object):
    """
    Dependency class.

    Represent a dependency from a module to another.
    """

    def __init__(self, source, lineno, target, what=None):
        """
        Initialization method.

        Args:
            source (Module): source Module.
            lineno (int): number of line at which import statement occurs.
            target (str/Module/Package): the target node.
            what (str): what is imported (optional).
        """
        self.source = source
        self.lineno = lineno
        self.target = target
        self.what = what

    def __str__(self):
        """
        Return a one-line description of the dependency.

        The form is `<source> imports <what> from <target> (line <n>)`;
        the `<what> from ` part is left out when `what` is not set.
        """
        # Only mention "from" when an imported object is known, so that no
        # dangling " from " (and double space) is printed otherwise.
        what = f"{self.what} from " if self.what else ""
        target = self.target if self.external else self.target.absolute_name()
        return f"{self.source.name} imports {what}{target} (line {self.lineno})"

    @property
    def external(self) -> bool:
        """
        Property to tell if the dependency's target is external.

        Returns:
            True when the target is a plain string rather than a node.
        """
        return isinstance(self.target, str)