Coverage for src/dependenpy/_internal/finder.py: 61.61%

78 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2025-08-24 18:36 +0200

1from __future__ import annotations 

2 

3from importlib.util import find_spec 

4from os.path import basename, exists, isdir, isfile, join, splitext 

5from typing import Any 

6 

7 

8class PackageSpec: 

9 """Holder for a package specification (given as argument to DSM).""" 

10 

11 def __init__(self, name: str, path: str, limit_to: list[str] | None = None) -> None: 

12 """Initialization method. 

13 

14 Args: 

15 name (str): name of the package. 

16 path (str): path to the package. 

17 limit_to (list of str): limitations. 

18 """ 

19 self.name = name 

20 """Name of the package.""" 

21 self.path = path 

22 """Path to the package.""" 

23 self.limit_to = limit_to or [] 

24 """List of limitations.""" 

25 

26 def __hash__(self): 

27 """Hash method. 

28 

29 The hash is computed based on the package name and path. 

30 """ 

31 return hash((self.name, self.path)) 

32 

33 @property 

34 def ismodule(self) -> bool: 

35 """Property to tell if the package is in fact a module (a file). 

36 

37 Returns: 

38 Whether this package is in fact a module. 

39 """ 

40 return self.path.endswith(".py") 

41 

42 def add(self, spec: PackageSpec) -> None: 

43 """Add limitations of given spec to self's. 

44 

45 Args: 

46 spec: Another spec. 

47 """ 

48 for limit in spec.limit_to: 

49 if limit not in self.limit_to: 

50 self.limit_to.append(limit) 

51 

52 @staticmethod 

53 def combine(specs: list[PackageSpec]) -> list[PackageSpec]: 

54 """Combine package specifications' limitations. 

55 

56 Args: 

57 specs: The package specifications. 

58 

59 Returns: 

60 The new, merged list of PackageSpec. 

61 """ 

62 new_specs: dict[PackageSpec, PackageSpec] = {} 

63 for spec in specs: 

64 if new_specs.get(spec) is None: 64 ↛ 67line 64 didn't jump to line 67 because the condition on line 64 was always true

65 new_specs[spec] = spec 

66 else: 

67 new_specs[spec].add(spec) 

68 return list(new_specs.values()) 

69 

70 

71class PackageFinder: 

72 """Abstract package finder class.""" 

73 

74 def find(self, package: str, **kwargs: Any) -> PackageSpec | None: 

75 """Find method. 

76 

77 Args: 

78 package: package to find. 

79 **kwargs: additional keyword arguments. 

80 

81 Returns: 

82 Package spec or None. 

83 """ 

84 raise NotImplementedError 

85 

86 

87class LocalPackageFinder(PackageFinder): 

88 """Finder to find local packages (directories on the disk).""" 

89 

90 def find(self, package: str, **kwargs: Any) -> PackageSpec | None: 

91 """Find method. 

92 

93 Args: 

94 package: package to find. 

95 **kwargs: additional keyword arguments. 

96 

97 Returns: 

98 Package spec or None. 

99 """ 

100 if not exists(package): 100 ↛ 102line 100 didn't jump to line 102 because the condition on line 100 was always true

101 return None 

102 name, path = None, None 

103 enforce_init = kwargs.pop("enforce_init", True) 

104 if isdir(package): 

105 if isfile(join(package, "__init__.py")) or not enforce_init: 

106 name, path = basename(package), package 

107 elif isfile(package) and package.endswith(".py"): 

108 name, path = splitext(basename(package))[0], package 

109 if name and path: 

110 return PackageSpec(name, path) 

111 return None 

112 

113 

114class InstalledPackageFinder(PackageFinder): 

115 """Finder to find installed Python packages using importlib.""" 

116 

117 def find(self, package: str, **kwargs: Any) -> PackageSpec | None: # noqa: ARG002 

118 """Find method. 

119 

120 Args: 

121 package: package to find. 

122 **kwargs: additional keyword arguments. 

123 

124 Returns: 

125 Package spec or None. 

126 """ 

127 spec = find_spec(package) 

128 if spec is None: 

129 return None 

130 if "." in package: 130 ↛ 131line 130 didn't jump to line 131 because the condition on line 130 was never true

131 package, rest = package.split(".", 1) 

132 limit = [rest] 

133 spec = find_spec(package) 

134 else: 

135 limit = [] 

136 if spec is not None: 136 ↛ 144line 136 didn't jump to line 144 because the condition on line 136 was always true

137 if spec.submodule_search_locations: 137 ↛ 139line 137 didn't jump to line 139 because the condition on line 137 was always true

138 path = spec.submodule_search_locations[0] 

139 elif spec.origin and spec.origin != "built-in": 

140 path = spec.origin 

141 else: 

142 return None 

143 return PackageSpec(spec.name, path, limit) 

144 return None 

145 

146 

147class Finder: 

148 """Main package finder class. 

149 

150 Initialize it with a list of package finder classes (not instances). 

151 """ 

152 

153 def __init__(self, finders: list[type] | None = None): 

154 """Initialization method. 

155 

156 Args: 

157 finders: list of package finder classes (not instances) in a specific 

158 order. Default: [LocalPackageFinder, InstalledPackageFinder]. 

159 """ 

160 self.finders: list[PackageFinder] 

161 """Selected finders.""" 

162 if finders is None: 162 ↛ 165line 162 didn't jump to line 165 because the condition on line 162 was always true

163 finder_instances = [LocalPackageFinder(), InstalledPackageFinder()] 

164 else: 

165 finder_instances = [finder() for finder in finders] 

166 self.finders = finder_instances 

167 

168 def find(self, package: str, **kwargs: Any) -> PackageSpec | None: 

169 """Find a package using package finders. 

170 

171 Return the first package found. 

172 

173 Args: 

174 package: package to find. 

175 **kwargs: additional keyword arguments used by finders. 

176 

177 Returns: 

178 Package spec or None. 

179 """ 

180 for finder in self.finders: 

181 package_spec = finder.find(package, **kwargs) 

182 if package_spec: 

183 return package_spec 

184 return None