Coverage for src/dependenpy/_internal/structures.py: 47.98%

174 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2025-08-24 18:36 +0200

1from __future__ import annotations 

2 

3import copy 

4import json 

5from typing import TYPE_CHECKING, Any 

6 

7from colorama import Style 

8 

9from dependenpy._internal.helpers import PrintMixin 

10 

11if TYPE_CHECKING: 

12 from dependenpy._internal.dsm import DSM, Module, Package 

13 

14 

class Matrix(PrintMixin):
    """Matrix class.

    A class to build a square dependency matrix given a list of nodes.
    After instantiation, it has three attributes: size, the number of keys;
    keys, the absolute names of the entities, sorted alphabetically; and data,
    a 2-dimensions array in which row `i` holds the values computed for
    `keys[i]` against every key, in the same order.
    """

    def __init__(self, *nodes: DSM | Package | Module, depth: int = 0):
        """Initialization method.

        Args:
            *nodes: The nodes on which to build the matrix.
            depth: The depth of the matrix. This depth is always
                absolute, meaning that building a matrix with a sub-package
                "A.B.C" and a depth of 1 will return a matrix of size 1,
                containing A only. To see the matrix for the sub-modules and
                sub-packages in C, you will have to give depth=4.
        """
        # Flatten the given nodes into a list of modules.
        modules: list[Module] = []
        for node in nodes:
            if node.ismodule:
                modules.append(node)  # type: ignore[arg-type]
            elif node.ispackage or node.isdsm:
                modules.extend(node.submodules)  # type: ignore[union-attr]

        if depth < 1:
            # No depth limit: every module is its own key.
            keys = modules
        else:
            keys = []
            for module in modules:
                if module.depth <= depth:
                    keys.append(module)
                    continue
                # The module is too deep: climb up its parent packages until one
                # fits the requested depth, has no parent, or is one of the given nodes.
                package = module.package
                while package.depth > depth and package.package and package not in nodes:  # type: ignore[union-attr]
                    package = package.package  # type: ignore[union-attr]
                if package not in keys:
                    keys.append(package)  # type: ignore[arg-type]

        size = len(keys)
        data = [[0] * size for _ in range(size)]
        keys = sorted(keys, key=lambda key: key.absolute_name())

        if depth < 1:
            # Record each module's position on the module itself, so that the
            # column of a dependency target can be looked up directly below.
            for index, key in enumerate(keys):
                key.index = index  # type: ignore[attr-defined]
            for index, key in enumerate(keys):
                for dep in key.dependencies:
                    if dep.external:
                        continue
                    if dep.target.ismodule and dep.target in keys:  # type: ignore[union-attr]
                        data[index][dep.target.index] += 1  # type: ignore[index,union-attr]
                    elif dep.target.ispackage:  # type: ignore[union-attr]
                        # A dependency on a package is counted against its `__init__` member.
                        init = dep.target.get("__init__")  # type: ignore[union-attr]
                        if init is not None and init in keys:
                            data[index][init.index] += 1  # type: ignore[union-attr]
        else:
            for row, row_key in enumerate(keys):
                for col, col_key in enumerate(keys):
                    data[row][col] = row_key.cardinal(to=col_key)

        self.size = size
        """The size of the matrix."""
        self.keys = [key.absolute_name() for key in keys]
        """The keys of the matrix."""
        self.data = data
        """The data of the matrix."""

    @staticmethod
    def cast(keys: list[str], data: list[list[int]]) -> Matrix:
        """Cast a set of keys and an array to a Matrix object.

        The keys and the data are deep-copied, so later changes to the
        arguments do not affect the returned matrix.

        Arguments:
            keys: The matrix keys.
            data: The matrix data.

        Returns:
            A new matrix.
        """
        matrix = Matrix()
        matrix.keys = copy.deepcopy(keys)
        matrix.data = copy.deepcopy(data)
        # An empty `Matrix()` has size 0: keep `size` in sync with the keys just assigned.
        matrix.size = len(matrix.keys)
        return matrix

    @property
    def total(self) -> int:
        """Return the total number of dependencies within this matrix.

        Returns:
            The total number of dependencies.
        """
        return sum(sum(line) for line in self.data)

    def _to_csv(self, **kwargs: Any) -> str:  # noqa: ARG002
        """Render the matrix as CSV.

        The first line is the header (`module` followed by every key), then
        there is one line per key with its name and its row of values.
        Lines are separated by newlines, without a trailing newline.

        Arguments:
            **kwargs: Ignored.

        Returns:
            The CSV text.
        """
        # The header must be a single line: "module,key1,key2,...".
        text = [f"module,{','.join(self.keys)}"]
        for index, key in enumerate(self.keys):
            line = ",".join(map(str, self.data[index]))
            text.append(f"{key},{line}")
        return "\n".join(text)

    def _to_json(self, **kwargs: Any) -> str:
        """Render the matrix as a JSON object with `keys` and `data` entries.

        Arguments:
            **kwargs: Additional arguments passed to `json.dumps`.

        Returns:
            The JSON text.
        """
        return json.dumps({"keys": self.keys, "data": self.data}, **kwargs)

    def _to_text(self, **kwargs: Any) -> str:
        """Render the matrix as a text table.

        Arguments:
            **kwargs: Supports `zero`, the string printed in place of
                null values (default `"0"`).

        Returns:
            The table, or an empty string when the matrix has no keys or no data.
        """
        if not self.keys or not self.data:
            return ""
        zero = kwargs.pop("zero", "0")
        # Column widths: names column, id column, and value columns.
        max_key_length = max(len(key) for key in [*self.keys, "Module"])
        max_dep_length = max([len(str(col)) for line in self.data for col in line] + [len(zero)])
        key_col_length = len(str(len(self.keys)))
        key_line_length = max(key_col_length, 2)  # at least as wide as the "Id" header
        column_length = max(key_col_length, max_dep_length)
        bold = Style.BRIGHT
        reset = Style.RESET_ALL

        # NOTE(review): the dashes line below contains junction characters while the
        # header and value lines contain no column separators - confirm none were lost.

        # first line left headers
        text = [f"\n {bold}{'Module':>{max_key_length}}{reset}{bold}{'Id':>{key_line_length}}{reset}"]
        # first line column headers
        for index, _ in enumerate(self.keys):
            text.append(f"{bold}{index:^{column_length}}{reset}")
        text.append("\n")
        # line of dashes
        text.append(f" {'─' * max_key_length}─┼─{'─' * key_line_length}─┼")
        for _ in range(len(self.keys) - 1):
            text.append(f"{'─' * column_length}")
        text.append(f"{'─' * column_length}")
        text.append("\n")
        # lines
        for index, key in enumerate(self.keys):
            text.append(f" {key:>{max_key_length}}{bold}{index:>{key_line_length}}{reset}")
            for value in self.data[index]:
                text.append(f"{value if value else zero:>{column_length}}")
            text.append("\n")
        text.append("\n")

        return "".join(text)

152 

153 

class TreeMap(PrintMixin):
    """TreeMap class.

    Only the value of the area is stored for now; the three output
    methods all return an empty string.
    """

    def __init__(self, *nodes: Any, value: int = -1):  # noqa: ARG002
        """Initialization method.

        Arguments:
            *nodes: the nodes from which to build the treemap (currently unused).
            value: the value of the current area.
        """
        # Disabled draft of the treemap construction, kept for reference:
        #
        # if nodes:
        #     matrix_lower_level = Matrix(*nodes, depth=2)
        #     matrix_current_level = Matrix(*nodes, depth=1)
        #     if value == -1:
        #         value = sum(c for row in matrix_current_level.data for c in row)
        #     splits = [0]
        #     key_comp = matrix_lower_level.keys[0].split('.')[0]
        #     i = 1
        #     for key in matrix_lower_level.keys[1:]:
        #         key = key.split('.')[0]
        #         if key != key_comp:
        #             splits.append(i)
        #             key_comp = key
        #         i += 1
        #     splits.append(i)
        #
        #     self.data = []
        #     for i in range(len(splits) - 1):
        #         self.data.append([])
        #         rows = matrix_lower_level.data[splits[i]:splits[i+1]]
        #         for j in range(len(splits) - 1):
        #             self.data[i].append([row[splits[j]:splits[j+1]] for row in rows])

        self.value = value
        """The value of the current area."""

    def _to_csv(self, **kwargs: Any) -> str:  # noqa: ARG002
        """Return the CSV output, which is always empty for now."""
        return ""

    def _to_json(self, **kwargs: Any) -> str:  # noqa: ARG002
        """Return the JSON output, which is always empty for now."""
        return ""

    def _to_text(self, **kwargs: Any) -> str:  # noqa: ARG002
        """Return the text output, which is always empty for now."""
        return ""

198 

199 

class Vertex:
    """A named vertex of a graph, holding its incoming and outgoing edges."""

    def __init__(self, name: str) -> None:
        """Create a vertex without any edge.

        Args:
            name (str): name of the vertex.
        """
        self.name = name
        """Name of the vertex."""
        self.edges_in: set[Edge] = set()
        """Incoming edges."""
        self.edges_out: set[Edge] = set()
        """Outgoing edges."""

    def __str__(self):
        return self.name

    def connect_to(self, vertex: Vertex, weight: int = 1) -> Edge:
        """Connect this vertex to another one.

        Args:
            vertex: Vertex to connect to.
            weight: Weight of the edge, used only when a new edge is created.

        Returns:
            The outgoing edge already pointing to `vertex` if there is one,
            otherwise a newly created edge.
        """
        existing = next((edge for edge in self.edges_out if vertex == edge.vertex_in), None)
        if existing is not None:
            return existing
        return Edge(self, vertex, weight)

    def connect_from(self, vertex: Vertex, weight: int = 1) -> Edge:
        """Connect another vertex to this one.

        Args:
            vertex: Vertex to connect from.
            weight: Weight of the edge, used only when a new edge is created.

        Returns:
            The incoming edge already coming from `vertex` if there is one,
            otherwise a newly created edge.
        """
        existing = next((edge for edge in self.edges_in if vertex == edge.vertex_out), None)
        if existing is not None:
            return existing
        return Edge(vertex, self, weight)

248 

249 

class Edge:
    """A weighted, directed edge between two vertices. Used in Graph class."""

    def __init__(self, vertex_out: Vertex, vertex_in: Vertex, weight: int = 1) -> None:
        """Create the edge and register it on both of its vertices.

        Args:
            vertex_out (Vertex): source vertex (edge going out).
            vertex_in (Vertex): target vertex (edge going in).
            weight (int): weight of the edge.
        """
        self.vertex_out: Vertex | None = None
        """Outgoing vertex."""
        self.vertex_in: Vertex | None = None
        """Incoming vertex."""
        self.weight = weight
        """Weight of the edge."""
        # Both ends start unset so the two calls below only have to register the edge.
        self.go_from(vertex_out)
        self.go_in(vertex_in)

    def __str__(self):
        source = self.vertex_out.name
        weight = self.weight
        target = self.vertex_in.name
        return f"{source} --{weight}--> {target}"

    def go_from(self, vertex: Vertex) -> None:
        """Tell the edge to go out from this vertex.

        The edge is first removed from the outgoing edges of its previous
        source vertex, if any.

        Args:
            vertex (Vertex): vertex to go from.
        """
        previous = self.vertex_out
        if previous:
            previous.edges_out.remove(self)
        self.vertex_out = vertex
        vertex.edges_out.add(self)

    def go_in(self, vertex: Vertex) -> None:
        """Tell the edge to go into this vertex.

        The edge is first removed from the incoming edges of its previous
        target vertex, if any.

        Args:
            vertex (Vertex): vertex to go into.
        """
        previous = self.vertex_in
        if previous:
            previous.edges_in.remove(self)
        self.vertex_in = vertex
        vertex.edges_in.add(self)

294 

295 

class Graph(PrintMixin):
    """Graph class.

    A class to build a graph given a list of nodes. After instantiation,
    it has two attributes: vertices, the set of nodes,
    and edges, the set of edges.
    """

    def __init__(self, *nodes: DSM | Package | Module, depth: int = 0) -> None:
        """Initialization method.

        An intermediary matrix is built to ease the creation of the graph:
        one vertex is created per matrix key, and one edge per strictly
        positive cell, weighted with the value of the cell.

        Args:
            *nodes (list of DSM/Package/Module):
                the nodes on which to build the graph.
            depth (int): the depth of the intermediary matrix. See
                the documentation for Matrix class.
        """
        self.edges: set[Edge] = set()
        """Set of edges in the graph."""
        matrix = Matrix(*nodes, depth=depth)
        # Vertices are kept in a list first, so they can be addressed by matrix index.
        vertices = [Vertex(key) for key in matrix.keys]
        for line_index, line in enumerate(matrix.data):
            for col_index, cell in enumerate(line):
                if cell > 0:
                    self.edges.add(Edge(vertices[line_index], vertices[col_index], weight=cell))
        self.vertices = set(vertices)
        """Set of vertices in the graph."""

    def _to_csv(self, **kwargs: Any) -> str:
        """Render the graph as CSV, one line per edge.

        Vertices without any edge get their own line, with empty weight
        and target columns.

        Arguments:
            **kwargs: Supports `header`, whether to write the header line
                (default `True`).

        Returns:
            The CSV text.
        """
        header = kwargs.pop("header", True)
        text = ["vertex_out,edge_weight,vertex_in\n" if header else ""]
        for edge in self.edges:
            text.append(f"{edge.vertex_out.name},{edge.weight},{edge.vertex_in.name}\n")  # type: ignore[union-attr]
        for vertex in self.vertices:
            if not (vertex.edges_out or vertex.edges_in):
                # f-string required: the name of the isolated vertex must be interpolated.
                text.append(f"{vertex.name},,\n")
        return "".join(text)

    def _to_json(self, **kwargs: Any) -> str:
        """Render the graph as a JSON object with `vertices` and `edges` entries.

        Arguments:
            **kwargs: Additional arguments passed to `json.dumps`.

        Returns:
            The JSON text.
        """
        return json.dumps(
            {
                "vertices": [vertex.name for vertex in self.vertices],
                "edges": [
                    {"out": edge.vertex_out.name, "weight": edge.weight, "in": edge.vertex_in.name}  # type: ignore[union-attr]
                    for edge in self.edges
                ],
            },
            **kwargs,
        )

    def _to_text(self, **kwargs: Any) -> str:  # noqa: ARG002
        """Return the text output, which is always empty for now."""
        return ""