Coverage for NeuralTSNE/NeuralTSNE/TSNE/tests/test_parametric_tsne.py: 100%
143 statements
coverage.py v7.8.0, created at 2025-05-18 16:32 +0000
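
# Unit tests for the ParametricTSNE class: constructor wiring, loss-function
# selection, model save/read, dataset splitting, dataloader creation, and
# joint-probability (P) matrix computation.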
import random
from collections import OrderedDict
from typing import Any, List, Tuple
from unittest.mock import MagicMock, patch

import pytest
import torch
from torch.utils.data import DataLoader, TensorDataset

from NeuralTSNE.TSNE.ParametricTSNE import ParametricTSNE

from NeuralTSNE.TSNE.tests.fixtures.dataloader_fixtures import mock_dataloaders
from NeuralTSNE.TSNE.tests.fixtures.parametric_tsne_fixtures import (
    parametric_tsne_instance,
    default_parametric_tsne_instance,
)
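

# Construction: the mocked loss-function resolver and network constructor should
# receive the configured values, and the remaining settings should be stored as
# instance attributes.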
@pytest.mark.parametrize(
    "parametric_tsne_instance",
    [
        {
            "loss_fn": "mse",
            "n_components": 2,
            "perplexity": 30,
            "batch_size": 64,
            "early_exaggeration_epochs": 5,
            "early_exaggeration_value": 12.0,
            "max_iterations": 300,
            "features": 256,
            "multipliers": [1.0, 2.0],
            "n_jobs": 0,
            "tolerance": 1e-5,
            "force_cpu": False,
        },
        {
            "loss_fn": "kl_divergence",
            "n_components": 3,
            "perplexity": 50,
            "batch_size": 128,
            "early_exaggeration_epochs": 10,
            "early_exaggeration_value": 8.0,
            "max_iterations": 500,
            "features": 512,
            "multipliers": [1.0, 1.5],
            "n_jobs": 2,
            "tolerance": 1e-6,
            "force_cpu": True,
        },
    ],
    indirect=True,
)
def test_parametric_tsne_init(parametric_tsne_instance):
    tsne_instance, params, mocks = parametric_tsne_instance
    tsne_dict = tsne_instance.__dict__
    del tsne_dict["device"], tsne_dict["model"]

    mocks["loss_fn"].assert_called_once_with(params["loss_fn"])
    mocks["nn"].assert_called_once_with(
        params["features"],
        params["n_components"],
        params["multipliers"],
    )
    del (
        params["features"],
        params["n_components"],
        params["multipliers"],
        params["force_cpu"],
    )
    assert isinstance(tsne_instance, ParametricTSNE)
    assert tsne_dict == params
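

# A recognized loss-function name should be resolved and stored on the instance.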
@pytest.mark.parametrize("loss_fn", ["kl_divergence"])
def test_set_loss_fn(default_parametric_tsne_instance, loss_fn: str):
    tsne_instance, _ = default_parametric_tsne_instance
    tsne_instance.loss_fn = None
    tsne_instance.set_loss_fn(loss_fn)

    assert tsne_instance.loss_fn is not None
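

# An unknown loss-function name should raise an AttributeError.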
@pytest.mark.parametrize("loss_fn", ["dummy"])
def test_set_invalid_loss_fn(default_parametric_tsne_instance, loss_fn: str):
    tsne_instance, _ = default_parametric_tsne_instance
    tsne_instance.loss_fn = None
    with pytest.raises(AttributeError):
        tsne_instance.set_loss_fn(loss_fn)
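

# save_model should pass an OrderedDict state and the target filename to torch.save.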
@pytest.mark.parametrize("filename", ["test", "model"])
@patch("NeuralTSNE.TSNE.neural_tsne.torch.save")
def test_save_model(
    mock_save: MagicMock, filename: str, default_parametric_tsne_instance
):
    tsne_instance, _ = default_parametric_tsne_instance
    tsne_instance.save_model(filename)

    mock_save.assert_called_once()
    args = mock_save.call_args_list[0].args
    assert isinstance(args[0], OrderedDict)
    assert args[1] == filename
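

# read_model should load the checkpoint via torch.load and feed it to
# NeuralNetwork.load_state_dict.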
@pytest.mark.parametrize("filename", ["test", "model"])
@patch("NeuralTSNE.TSNE.ParametricTSNE.parametric_tsne.torch.load")
@patch("NeuralTSNE.TSNE.NeuralNetwork.neural_network.NeuralNetwork.load_state_dict")
def test_read_model(
    mock_load_dict: MagicMock,
    mock_load: MagicMock,
    filename: str,
    default_parametric_tsne_instance,
):
    tsne_instance, _ = default_parametric_tsne_instance
    tsne_instance.read_model(filename)

    mock_load.assert_called_once_with(filename)
    mock_load_dict.assert_called_once_with(mock_load(filename))
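

# split_dataset should honour the requested train/test proportions and return
# DataLoaders; the mock_dataloaders fixture intercepts the dataloader-creation
# call so the underlying split datasets can be inspected. A zero-sized side of
# the split yields None for that dataset.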
@pytest.mark.parametrize(
    "split", [(0.8, 0.2), (0.6, 0.4), (0.55, 0.45), (0, 1), (1, 0)]
)
@pytest.mark.parametrize("labels", [True, False])
def test_split_dataset(
    mock_dataloaders,
    default_parametric_tsne_instance,
    split: Tuple[float, float],
    labels: bool,
):
    tsne_instance, _ = default_parametric_tsne_instance
    y = None
    X = torch.randn(100, 10)
    if labels:
        y = torch.randint(0, 2, (100,))
    train_dataloader, test_dataloader = tsne_instance.split_dataset(
        X, y, train_size=split[0], test_size=split[1]
    )

    assert isinstance(train_dataloader, DataLoader)
    assert isinstance(test_dataloader, DataLoader)
    assert mock_dataloaders.call_count == 1

    args = mock_dataloaders.call_args_list[0].args
    train = args[1]
    test = args[2]

    train_len = len(train) if split[0] != 0 else None
    test_len = len(test) if split[1] != 0 else None
    tensors_number = 1 if not labels else 2

    if train_len is None or test_len is None:
        if split[0] == 0:
            assert train is None
            assert len(test.dataset.tensors) == tensors_number
            assert test_len == X.shape[0]
        else:
            assert test is None
            assert len(train.dataset.tensors) == tensors_number
            assert train_len == X.shape[0]
    else:
        assert len(train.dataset.tensors) == tensors_number
        assert len(test.dataset.tensors) == tensors_number

        eps = 1e-4
        assert split[0] - eps < train_len / (train_len + test_len) < split[0] + eps
        assert split[1] - eps < test_len / (train_len + test_len) < split[1] + eps
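

# _determine_train_test_split should fall back to the 0.8/0.2 default, derive a
# missing fraction from the one provided, and clamp inconsistent pairs so they
# sum to one.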
@pytest.mark.parametrize(
    "input_values, output",
    [
        ((None, None), (0.8, 0.2)),
        ((0.7, None), (0.7, 0.3)),
        ((None, 0.4), (0.6, 0.4)),
        ((0.6, 0.4), (0.6, 0.4)),
        ((0.8, 0.5), (0.8, 0.2)),
        ((0.5, 0.8), (0.5, 0.5)),
    ],
)
def test_determine_train_test_split(
    default_parametric_tsne_instance,
    input_values: Tuple[float | None, float | None],
    output: Tuple[float, float],
):
    tsne_instance, _ = default_parametric_tsne_instance
    train_size, test_size = tsne_instance._determine_train_test_split(*input_values)
    eps = 1e-4
    assert train_size - eps < output[0] < train_size + eps
    assert test_size - eps < output[1] < test_size + eps
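

# create_dataloaders should wrap each provided TensorDataset in a DataLoader
# and pass None through unchanged.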
@pytest.mark.parametrize(
    "train_dataset",
    [TensorDataset(torch.randn(100, 10), torch.randint(0, 2, (100,))), None],
)
@pytest.mark.parametrize(
    "test_dataset",
    [TensorDataset(torch.randn(20, 10), torch.randint(0, 2, (20,))), None],
)
def test_create_dataloaders(
    default_parametric_tsne_instance,
    train_dataset: TensorDataset | None,
    test_dataset: TensorDataset | None,
):
    tsne_instance, _ = default_parametric_tsne_instance
    train_loader, test_loader = tsne_instance.create_dataloaders(
        train_dataset, test_dataset
    )

    if train_dataset is None:
        assert train_loader is None
    else:
        assert isinstance(train_loader, DataLoader)

    if test_dataset is None:
        assert test_loader is None
    else:
        assert isinstance(test_loader, DataLoader)
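

# _calculate_P should return a matrix with one row per sample and
# batch_size columns.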
def test_calculate_P(default_parametric_tsne_instance):
    TQDM_DISABLE = 1
    tsne_instance, params = default_parametric_tsne_instance

    dataloader = DataLoader(
        TensorDataset(torch.randn(50, 15), torch.randint(0, 2, (50,))),
        batch_size=params["batch_size"],
    )

    result_P = tsne_instance._calculate_P(dataloader)

    assert result_P.shape == (50, params["batch_size"])
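

# With x2p mocked to return a constant block with a single marked index,
# _calculate_P should symmetrize and normalize each batch-sized block of the
# resulting P matrix.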
@pytest.mark.parametrize("fill_with", [0, "NaN"])
@patch("NeuralTSNE.TSNE.ParametricTSNE.parametric_tsne.x2p")
def test_calculate_P_mocked(
    mock_x2p: MagicMock, default_parametric_tsne_instance, fill_with: Any
):
    TQDM_DISABLE = 1
    tsne_instance, params = default_parametric_tsne_instance
    samples = 50

    dataloader = DataLoader(
        TensorDataset(torch.randn(samples, 15), torch.randint(0, 2, (50,))),
        batch_size=params["batch_size"],
    )

    select_one = random.randint(0, params["batch_size"] - 1)
    if fill_with == 0:
        ret = torch.zeros(params["batch_size"], params["batch_size"])
    if fill_with == "NaN":
        ret = torch.full((params["batch_size"], params["batch_size"]), torch.nan)

    ret[select_one] = 1
    mock_x2p.return_value = ret

    result_P = tsne_instance._calculate_P(dataloader)

    assert result_P.shape == (samples, params["batch_size"])

    expected_tensor = torch.zeros(samples, params["batch_size"])
    expected_tensor[:, select_one] = 1
    for i in range(0, samples, params["batch_size"]):
        cut = expected_tensor[i : i + params["batch_size"]]
        cut = cut + cut.T
        cut = cut / cut.sum()
        expected_tensor[i : i + params["batch_size"]] = cut
    assert torch.allclose(result_P, expected_tensor)
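

# With x2p returning an all-zero or all-NaN block, the resulting P matrix
# should be entirely NaN.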
@pytest.mark.parametrize("fill_with", [0, "NaN"])
@patch("NeuralTSNE.TSNE.ParametricTSNE.parametric_tsne.x2p")
def test_calculate_P_mocked_nan(
    mock_x2p: MagicMock, default_parametric_tsne_instance, fill_with: Any
):
    TQDM_DISABLE = 1
    tsne_instance, params = default_parametric_tsne_instance
    samples = 50

    dataloader = DataLoader(
        TensorDataset(torch.randn(samples, 15), torch.randint(0, 2, (50,))),
        batch_size=params["batch_size"],
    )

    if fill_with == 0:
        ret = torch.zeros(params["batch_size"], params["batch_size"])
    if fill_with == "NaN":
        ret = torch.full((params["batch_size"], params["batch_size"]), torch.nan)

    mock_x2p.return_value = ret

    result_P = tsne_instance._calculate_P(dataloader)

    assert result_P.shape == (samples, params["batch_size"])
    assert torch.allclose(
        result_P, torch.full((samples, params["batch_size"]), torch.nan), equal_nan=True
    )