Coverage for NeuralTSNE/NeuralTSNE/TSNE/tests/test_parametric_tsne.py: 100%

143 statements  

coverage.py v7.8.0, created at 2025-05-18 16:32 +0000

import random
from collections import OrderedDict
from typing import Any, List, Tuple
from unittest.mock import MagicMock, patch

import pytest
import torch
from torch.utils.data import DataLoader, TensorDataset

from NeuralTSNE.TSNE.ParametricTSNE import ParametricTSNE

from NeuralTSNE.TSNE.tests.fixtures.dataloader_fixtures import mock_dataloaders
from NeuralTSNE.TSNE.tests.fixtures.parametric_tsne_fixtures import (
    parametric_tsne_instance,
    default_parametric_tsne_instance,
)


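# Constructor test: the indirect `parametric_tsne_instance` fixture builds a
# ParametricTSNE from each parameter set below. The mocked loss-function resolver and
# network constructor should receive the expected arguments, and the remaining
# parameters should land unchanged as attributes on the instance.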

@pytest.mark.parametrize(
    "parametric_tsne_instance",
    [
        {
            "loss_fn": "mse",
            "n_components": 2,
            "perplexity": 30,
            "batch_size": 64,
            "early_exaggeration_epochs": 5,
            "early_exaggeration_value": 12.0,
            "max_iterations": 300,
            "features": 256,
            "multipliers": [1.0, 2.0],
            "n_jobs": 0,
            "tolerance": 1e-5,
            "force_cpu": False,
        },
        {
            "loss_fn": "kl_divergence",
            "n_components": 3,
            "perplexity": 50,
            "batch_size": 128,
            "early_exaggeration_epochs": 10,
            "early_exaggeration_value": 8.0,
            "max_iterations": 500,
            "features": 512,
            "multipliers": [1.0, 1.5],
            "n_jobs": 2,
            "tolerance": 1e-6,
            "force_cpu": True,
        },
    ],
    indirect=True,
)
def test_parametric_tsne_init(parametric_tsne_instance):
    tsne_instance, params, mocks = parametric_tsne_instance
    tsne_dict = tsne_instance.__dict__
    del tsne_dict["device"], tsne_dict["model"]

    mocks["loss_fn"].assert_called_once_with(params["loss_fn"])
    mocks["nn"].assert_called_once_with(
        params["features"],
        params["n_components"],
        params["multipliers"],
    )
    del (
        params["features"],
        params["n_components"],
        params["multipliers"],
        params["force_cpu"],
    )
    assert isinstance(tsne_instance, ParametricTSNE)
    assert tsne_dict == params


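# A recognized loss-function name should make set_loss_fn populate the loss_fn attribute.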

@pytest.mark.parametrize("loss_fn", ["kl_divergence"])
def test_set_loss_fn(default_parametric_tsne_instance, loss_fn: List[str]):
    tsne_instance, _ = default_parametric_tsne_instance
    tsne_instance.loss_fn = None
    tsne_instance.set_loss_fn(loss_fn)

    assert tsne_instance.loss_fn is not None


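# An unrecognized loss-function name ("dummy") is expected to raise AttributeError.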

@pytest.mark.parametrize("loss_fn", ["dummy"])
def test_set_invalid_loss_fn(default_parametric_tsne_instance, loss_fn: List[str]):
    tsne_instance, _ = default_parametric_tsne_instance
    tsne_instance.loss_fn = None
    with pytest.raises(AttributeError):
        tsne_instance.set_loss_fn(loss_fn)


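# save_model should delegate to torch.save, passing an OrderedDict (presumably the
# model's state dict) and the target filename.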

@pytest.mark.parametrize("filename", ["test", "model"])
@patch("NeuralTSNE.TSNE.neural_tsne.torch.save")
def test_save_model(
    mock_save: MagicMock, filename: str, default_parametric_tsne_instance
):
    tsne_instance, _ = default_parametric_tsne_instance
    tsne_instance.save_model(filename)

    mock_save.assert_called_once()
    args = mock_save.call_args_list[0].args
    assert isinstance(args[0], OrderedDict)
    assert args[1] == filename


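# read_model should call torch.load on the filename and pass the loaded object to
# NeuralNetwork.load_state_dict; both are patched, so no real file I/O takes place.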

@pytest.mark.parametrize("filename", ["test", "model"])
@patch("NeuralTSNE.TSNE.ParametricTSNE.parametric_tsne.torch.load")
@patch("NeuralTSNE.TSNE.NeuralNetwork.neural_network.NeuralNetwork.load_state_dict")
def test_read_model(
    mock_load_dict: MagicMock,
    mock_load: MagicMock,
    filename: str,
    default_parametric_tsne_instance,
):
    tsne_instance, _ = default_parametric_tsne_instance
    tsne_instance.read_model(filename)

    mock_load.assert_called_once_with(filename)
    mock_load_dict.assert_called_once_with(mock_load(filename))


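# split_dataset is exercised with several train/test ratios, with and without labels.
# The downstream dataloader-creation call is mocked (mock_dataloaders), so the test
# inspects the train/test datasets handed to it: number of tensors per sample,
# handling of all-train/all-test splits, and the realized split ratios.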

@pytest.mark.parametrize(
    "split", [(0.8, 0.2), (0.6, 0.4), (0.55, 0.45), (0, 1), (1, 0)]
)
@pytest.mark.parametrize("labels", [True, False])
def test_split_dataset(
    mock_dataloaders,
    default_parametric_tsne_instance,
    split: Tuple[float, float],
    labels: bool,
):
    tsne_instance, _ = default_parametric_tsne_instance
    y = None
    X = torch.randn(100, 10)
    if labels:
        y = torch.randint(0, 2, (100,))
    train_dataloader, test_dataloader = tsne_instance.split_dataset(
        X, y, train_size=split[0], test_size=split[1]
    )

    assert isinstance(train_dataloader, DataLoader)
    assert isinstance(test_dataloader, DataLoader)
    assert mock_dataloaders.call_count == 1

    args = mock_dataloaders.call_args_list[0].args
    train = args[1]
    test = args[2]

    train_len = len(train) if split[0] != 0 else None
    test_len = len(test) if split[1] != 0 else None
    tensors_number = 1 if not labels else 2

    if train_len is None or test_len is None:
        if split[0] == 0:
            assert train is None
            assert len(test.dataset.tensors) == tensors_number
            assert test_len == X.shape[0]
        else:
            assert test is None
            assert len(train.dataset.tensors) == tensors_number
            assert train_len == X.shape[0]
    else:
        assert len(train.dataset.tensors) == tensors_number
        assert len(test.dataset.tensors) == tensors_number

        eps = 1e-4
        assert (
            split[0] - eps < train_len / (train_len + test_len) < split[0] + eps
        ) is True
        assert (
            split[1] - eps < test_len / (train_len + test_len) < split[1] + eps
        ) is True


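# _determine_train_test_split should fill in a missing size from the other one,
# default to (0.8, 0.2) when both are None, and reconcile inconsistent pairs so the
# returned fractions match the expected (train, test) tuple.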

@pytest.mark.parametrize(
    "input_values, output",
    [
        ((None, None), (0.8, 0.2)),
        ((0.7, None), (0.7, 0.3)),
        ((None, 0.4), (0.6, 0.4)),
        ((0.6, 0.4), (0.6, 0.4)),
        ((0.8, 0.5), (0.8, 0.2)),
        ((0.5, 0.8), (0.5, 0.5)),
    ],
)
def test_determine_train_test_split(
    default_parametric_tsne_instance,
    input_values: Tuple[float | None, float | None],
    output: Tuple[float, float],
):
    tsne_instance, _ = default_parametric_tsne_instance
    train_size, test_size = tsne_instance._determine_train_test_split(*input_values)
    eps = 1e-4
    assert (train_size - eps < output[0] < train_size + eps) is True
    assert (test_size - eps < output[1] < test_size + eps) is True


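# create_dataloaders should wrap each provided TensorDataset in a DataLoader and
# return None in place of a missing dataset.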

@pytest.mark.parametrize(
    "train_dataset",
    [TensorDataset(torch.randn(100, 10), torch.randint(0, 2, (100,))), None],
)
@pytest.mark.parametrize(
    "test_dataset",
    [TensorDataset(torch.randn(20, 10), torch.randint(0, 2, (20,))), None],
)
def test_create_dataloaders(
    default_parametric_tsne_instance,
    train_dataset: TensorDataset | None,
    test_dataset: TensorDataset | None,
):
    tsne_instance, _ = default_parametric_tsne_instance
    train_loader, test_loader = tsne_instance.create_dataloaders(
        train_dataset, test_dataset
    )

    if train_dataset is None:
        assert train_loader is None
    else:
        assert isinstance(train_loader, DataLoader)

    if test_dataset is None:
        assert test_loader is None
    else:
        assert isinstance(test_loader, DataLoader)


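# _calculate_P on an unmocked dataloader: only the output shape is checked (one row
# of probabilities per sample). TQDM_DISABLE is presumably intended to silence tqdm
# progress output during the test.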

def test_calculate_P(default_parametric_tsne_instance):
    TQDM_DISABLE = 1
    tsne_instance, params = default_parametric_tsne_instance

    dataloader = DataLoader(
        TensorDataset(torch.randn(50, 15), torch.randint(0, 2, (50,))),
        batch_size=params["batch_size"],
    )

    result_P = tsne_instance._calculate_P(dataloader)

    assert result_P.shape == (50, params["batch_size"])


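# x2p is patched to return a batch-sized matrix that is all zeros or all NaNs except
# for a single row of ones. The expected result is rebuilt below with per-batch
# symmetrization and normalization (cut + cut.T, divide by the sum), which is what the
# test expects _calculate_P to apply.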

@pytest.mark.parametrize("fill_with", [0, "NaN"])
@patch("NeuralTSNE.TSNE.ParametricTSNE.parametric_tsne.x2p")
def test_calculate_P_mocked(
    mock_x2p: MagicMock, default_parametric_tsne_instance, fill_with: Any
):
    TQDM_DISABLE = 1
    tsne_instance, params = default_parametric_tsne_instance
    samples = 50

    dataloader = DataLoader(
        TensorDataset(torch.randn(samples, 15), torch.randint(0, 2, (50,))),
        batch_size=params["batch_size"],
    )

    select_one = random.randint(0, params["batch_size"] - 1)
    if fill_with == 0:
        ret = torch.zeros(params["batch_size"], params["batch_size"])
    if fill_with == "NaN":
        ret = torch.full((params["batch_size"], params["batch_size"]), torch.nan)

    ret[select_one] = 1
    mock_x2p.return_value = ret

    result_P = tsne_instance._calculate_P(dataloader)

    assert result_P.shape == (samples, params["batch_size"])

    expected_tensor = torch.zeros(samples, params["batch_size"])
    expected_tensor[:, select_one] = 1
    for i in range(0, samples, params["batch_size"]):
        cut = expected_tensor[i : i + params["batch_size"]]
        cut = cut + cut.T
        cut = cut / cut.sum()
        expected_tensor[i : i + params["batch_size"]] = cut
    assert torch.allclose(result_P, expected_tensor)


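# With x2p returning all zeros or all NaNs and no row set to one, the per-batch
# normalization has nothing meaningful to divide by, so _calculate_P is expected to
# yield an all-NaN matrix.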

@pytest.mark.parametrize("fill_with", [0, "NaN"])
@patch("NeuralTSNE.TSNE.ParametricTSNE.parametric_tsne.x2p")
def test_calculate_P_mocked_nan(
    mock_x2p: MagicMock, default_parametric_tsne_instance, fill_with: Any
):
    TQDM_DISABLE = 1
    tsne_instance, params = default_parametric_tsne_instance
    samples = 50

    dataloader = DataLoader(
        TensorDataset(torch.randn(samples, 15), torch.randint(0, 2, (50,))),
        batch_size=params["batch_size"],
    )

    if fill_with == 0:
        ret = torch.zeros(params["batch_size"], params["batch_size"])
    if fill_with == "NaN":
        ret = torch.full((params["batch_size"], params["batch_size"]), torch.nan)

    mock_x2p.return_value = ret

    result_P = tsne_instance._calculate_P(dataloader)

    assert result_P.shape == (samples, params["batch_size"])
    assert torch.allclose(
        result_P, torch.full((samples, params["batch_size"]), torch.nan), equal_nan=True
    )