Coverage for src/dependenpy/structures.py: 51.89%

164 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-09-04 11:35 +0200

1"""dependenpy structures module.""" 

2 

3from __future__ import annotations 

4 

5import copy 

6import json 

7from typing import TYPE_CHECKING, Any 

8 

9from colorama import Style 

10 

11from dependenpy.helpers import PrintMixin 

12 

13if TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14, because the condition on line 13 was never true

14 from dependenpy.dsm import DSM, Module, Package 

15 

16 

17class Matrix(PrintMixin): 

18 """ 

19 Matrix class. 

20 

21 A class to build a matrix given a list of nodes. After instantiation, 

22 it has two attributes: data, a 2-dimensions array, and keys, the names 

23 of the entities in the corresponding order. 

24 """ 

25 

26 def __init__(self, *nodes: DSM | Package | Module, depth: int = 0): # noqa: WPS231 

27 """ 

28 Initialization method. 

29 

30 Args: 

31 *nodes: The nodes on which to build the matrix. 

32 depth: The depth of the matrix. This depth is always 

33 absolute, meaning that building a matrix with a sub-package 

34 "A.B.C" and a depth of 1 will return a matrix of size 1, 

35 containing A only. To see the matrix for the sub-modules and 

36 sub-packages in C, you will have to give depth=4. 

37 """ 

38 modules: list[Module] = [] 

39 for node in nodes: 

40 if node.ismodule: 40 ↛ 41line 40 didn't jump to line 41, because the condition on line 40 was never true

41 modules.append(node) # type: ignore[arg-type] 

42 elif node.ispackage or node.isdsm: 42 ↛ 39line 42 didn't jump to line 39, because the condition on line 42 was never false

43 modules.extend(node.submodules) # type: ignore[union-attr] 

44 

45 if depth < 1: 45 ↛ 46line 45 didn't jump to line 46, because the condition on line 45 was never true

46 keys = modules 

47 else: 

48 keys = [] 

49 for module in modules: 

50 if module.depth <= depth: 

51 keys.append(module) 

52 continue 

53 package = module.package 

54 while package.depth > depth and package.package and package not in nodes: 

55 package = package.package 

56 if package not in keys: 

57 keys.append(package) 

58 

59 size = len(keys) 

60 data = [[0] * size for _ in range(size)] # noqa: WPS435 

61 keys = sorted(keys, key=lambda key: key.absolute_name()) 

62 

63 if depth < 1: 63 ↛ 64line 63 didn't jump to line 64, because the condition on line 63 was never true

64 for index, key in enumerate(keys): # noqa: WPS440 

65 key.index = index # type: ignore[attr-defined] 

66 for index, key in enumerate(keys): # noqa: WPS440 

67 for dep in key.dependencies: 

68 if dep.external: 

69 continue 

70 if dep.target.ismodule and dep.target in keys: 

71 data[index][dep.target.index] += 1 

72 elif dep.target.ispackage: 

73 init = dep.target.get("__init__") 

74 if init is not None and init in keys: 

75 data[index][init.index] += 1 

76 else: 

77 for row, row_key in enumerate(keys): 

78 for col, col_key in enumerate(keys): 

79 data[row][col] = row_key.cardinal(to=col_key) 

80 

81 self.size = size 

82 self.keys = [key.absolute_name() for key in keys] # noqa: WPS441 

83 self.data = data 

84 

85 @staticmethod # noqa: WPS602 

86 def cast(keys: list[str], data: list[list[int]]) -> Matrix: # noqa: WPS602 

87 """ 

88 Cast a set of keys and an array to a Matrix object. 

89 

90 Arguments: 

91 keys: The matrix keys. 

92 data: The matrix data. 

93 

94 Returns: 

95 A new matrix. 

96 """ 

97 matrix = Matrix() 

98 matrix.keys = copy.deepcopy(keys) 

99 matrix.data = copy.deepcopy(data) 

100 return matrix 

101 

102 @property 

103 def total(self) -> int: 

104 """ 

105 Return the total number of dependencies within this matrix. 

106 

107 Returns: 

108 The total number of dependencies. 

109 """ 

110 return sum(cell for line in self.data for cell in line) 

111 

112 def _to_csv(self, **kwargs): 

113 text = ["module,", ",".join(self.keys)] 

114 for index, key in enumerate(self.keys): 

115 line = ",".join(map(str, self.data[index])) 

116 text.append(f"{key},{line}") 

117 return "\n".join(text) 

118 

119 def _to_json(self, **kwargs): 

120 return json.dumps({"keys": self.keys, "data": self.data}, **kwargs) 

121 

122 def _to_text(self, **kwargs): 

123 if not self.keys or not self.data: 123 ↛ 124line 123 didn't jump to line 124, because the condition on line 123 was never true

124 return "" 

125 zero = kwargs.pop("zero", "0") 

126 max_key_length = max(len(key) for key in self.keys + ["Module"]) 

127 max_dep_length = max([len(str(col)) for line in self.data for col in line] + [len(zero)]) 

128 key_col_length = len(str(len(self.keys))) 

129 key_line_length = max(key_col_length, 2) 

130 column_length = max(key_col_length, max_dep_length) 

131 bold = Style.BRIGHT 

132 reset = Style.RESET_ALL 

133 

134 # first line left headers 

135 text = [f"\n {bold}{'Module':>{max_key_length}}{reset} │ {bold}{'Id':>{key_line_length}}{reset} │"] 

136 # first line column headers 

137 for index, _ in enumerate(self.keys): 

138 text.append(f"{bold}{index:^{column_length}}{reset}│") 

139 text.append("\n") 

140 # line of dashes 

141 text.append(f" {'─' * max_key_length}─┼─{'─' * key_line_length}─┼") 

142 for _ in range(len(self.keys) - 1): 

143 text.append(f"{'─' * column_length}┼") 

144 text.append(f"{'─' * column_length}┤") 

145 text.append("\n") 

146 # lines 

147 for index, key in enumerate(self.keys): # noqa: WPS440 

148 text.append(f" {key:>{max_key_length}} │ {bold}{index:>{key_line_length}}{reset} │") 

149 for value in self.data[index]: 

150 text.append((f"{value if value else zero:>{column_length}}│")) 

151 text.append("\n") 

152 text.append("\n") 

153 

154 return "".join(text) 

155 

156 

157class TreeMap(PrintMixin): 

158 """TreeMap class.""" 

159 

160 def __init__(self, *nodes: Any, value: int = -1): 

161 """ 

162 Initialization method. 

163 

164 Arguments: 

165 *nodes: the nodes from which to build the treemap. 

166 value: the value of the current area. 

167 """ 

168 # if nodes: 

169 # matrix_lower_level = Matrix(*nodes, depth=2) 

170 # matrix_current_level = Matrix(*nodes, depth=1) 

171 # if value == -1: 

172 # value = sum(c for row in matrix_current_level.data for c in row) 

173 # splits = [0] 

174 # key_comp = matrix_lower_level.keys[0].split('.')[0] 

175 # i = 1 

176 # for key in matrix_lower_level.keys[1:]: 

177 # key = key.split('.')[0] 

178 # if key != key_comp: 

179 # splits.append(i) 

180 # key_comp = key 

181 # i += 1 

182 # splits.append(i) 

183 # 

184 # self.data = [] 

185 # for i in range(len(splits) - 1): 

186 # self.data.append([]) 

187 # rows = matrix_lower_level.data[splits[i]:splits[i+1]] 

188 # for j in range(len(splits) - 1): 

189 # self.data[i].append([row[splits[j]:splits[j+1]] for row in rows]) 

190 

191 self.value = value 

192 

193 def _to_csv(self, **kwargs): 

194 return "" 

195 

196 def _to_json(self, **kwargs): 

197 return "" 

198 

199 def _to_text(self, **kwargs): 

200 return "" 

201 

202 

203class Vertex(object): 

204 """Vertex class. Used in Graph class.""" 

205 

206 def __init__(self, name): 

207 """ 

208 Initialization method. 

209 

210 Args: 

211 name (str): name of the vertex. 

212 """ 

213 self.name = name 

214 self.edges_in = set() 

215 self.edges_out = set() 

216 

217 def __str__(self): 

218 return self.name 

219 

220 def connect_to(self, vertex: Vertex, weight: int = 1) -> Edge: 

221 """ 

222 Connect this vertex to another one. 

223 

224 Args: 

225 vertex: Vertex to connect to. 

226 weight: Weight of the edge. 

227 

228 Returns: 

229 The newly created edge. 

230 """ 

231 for edge in self.edges_out: 

232 if vertex == edge.vertex_in: 

233 return edge 

234 return Edge(self, vertex, weight) 

235 

236 def connect_from(self, vertex: Vertex, weight: int = 1) -> Edge: 

237 """ 

238 Connect another vertex to this one. 

239 

240 Args: 

241 vertex: Vertex to connect from. 

242 weight: Weight of the edge. 

243 

244 Returns: 

245 The newly created edge. 

246 """ 

247 for edge in self.edges_in: 

248 if vertex == edge.vertex_out: 

249 return edge 

250 return Edge(vertex, self, weight) 

251 

252 

253class Edge(object): 

254 """Edge class. Used in Graph class.""" 

255 

256 def __init__(self, vertex_out, vertex_in, weight=1): 

257 """ 

258 Initialization method. 

259 

260 Args: 

261 vertex_out (Vertex): source vertex (edge going out). 

262 vertex_in (Vertex): target vertex (edge going in). 

263 weight (int): weight of the edge. 

264 """ 

265 self.vertex_out = None 

266 self.vertex_in = None 

267 self.weight = weight 

268 self.go_from(vertex_out) 

269 self.go_in(vertex_in) 

270 

271 def __str__(self): 

272 return f"{self.vertex_out.name} --{self.weight}--> {self.vertex_in.name}" 

273 

274 def go_from(self, vertex): 

275 """ 

276 Tell the edge to go out from this vertex. 

277 

278 Args: 

279 vertex (Vertex): vertex to go from. 

280 """ 

281 if self.vertex_out: 

282 self.vertex_out.edges_out.remove(self) 

283 self.vertex_out = vertex 

284 vertex.edges_out.add(self) 

285 

286 def go_in(self, vertex): 

287 """ 

288 Tell the edge to go into this vertex. 

289 

290 Args: 

291 vertex (Vertex): vertex to go into. 

292 """ 

293 if self.vertex_in: 

294 self.vertex_in.edges_in.remove(self) 

295 self.vertex_in = vertex 

296 vertex.edges_in.add(self) 

297 

298 

299class Graph(PrintMixin): 

300 """ 

301 Graph class. 

302 

303 A class to build a graph given a list of nodes. After instantiation, 

304 it has two attributes: vertices, the set of nodes, 

305 and edges, the set of edges. 

306 """ 

307 

308 def __init__(self, *nodes, depth=0): 

309 """ 

310 Initialization method. 

311 

312 An intermediary matrix is built to ease the creation of the graph. 

313 

314 Args: 

315 *nodes (list of DSM/Package/Module): 

316 the nodes on which to build the graph. 

317 depth (int): the depth of the intermediary matrix. See 

318 the documentation for Matrix class. 

319 """ 

320 self.edges = set() 

321 vertices = [] 

322 matrix = Matrix(*nodes, depth=depth) 

323 for key in matrix.keys: 

324 vertices.append(Vertex(key)) 

325 for line_index, line in enumerate(matrix.data): 

326 for col_index, cell in enumerate(line): 

327 if cell > 0: 

328 self.edges.add(Edge(vertices[line_index], vertices[col_index], weight=cell)) 

329 self.vertices = set(vertices) 

330 

331 def _to_csv(self, **kwargs): 

332 header = kwargs.pop("header", True) 

333 text = ["vertex_out,edge_weight,vertex_in\n" if header else ""] 

334 for edge in self.edges: 

335 text.append(f"{edge.vertex_out.name},{edge.weight},{edge.vertex_in.name}\n") 

336 for vertex in self.vertices: 

337 if not (vertex.edges_out or vertex.edges_in): 

338 text.append("{vertex.name},,\n") 

339 return "".join(text) 

340 

341 def _to_json(self, **kwargs): 

342 return json.dumps( 

343 { 

344 "vertices": [vertex.name for vertex in self.vertices], 

345 "edges": [ 

346 {"out": edge.vertex_out.name, "weight": edge.weight, "in": edge.vertex_in.name} 

347 for edge in self.edges 

348 ], 

349 }, 

350 **kwargs, 

351 ) 

352 

353 def _to_text(self, **kwargs): 

354 return ""