-
-
Notifications
You must be signed in to change notification settings - Fork 85
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Document embedding script and widget
- Loading branch information
Showing
4 changed files
with
651 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
import unittest | ||
from unittest.mock import patch | ||
import asyncio | ||
from numpy.testing import assert_array_equal | ||
|
||
from orangecontrib.text.vectorization.document_embedder import PretrainedEmbedder | ||
from orangecontrib.text import Corpus | ||
|
||
PATCH_METHOD = 'httpx.AsyncClient.post' | ||
|
||
class DummyResponse: | ||
def __init__(self, content): | ||
self.content = content | ||
|
||
def make_dummy_post(response, sleep=0): | ||
@staticmethod | ||
async def dummy_post(url, headers, data): | ||
await asyncio.sleep(sleep) | ||
return DummyResponse(content=response) | ||
return dummy_post | ||
|
||
class PretrainedEmbedderTest(unittest.TestCase): | ||
|
||
def setUp(self): | ||
self.embedder = PretrainedEmbedder() # default params | ||
self.corpus = Corpus.from_file('deerwester') | ||
|
||
def tearDown(self): | ||
self.embedder.clear_cache() | ||
|
||
@patch(PATCH_METHOD) | ||
def test_with_empty_corpus(self, mock): | ||
self.assertEqual(len(self.embedder(self.corpus[:0])), 0) | ||
mock.request.assert_not_called() | ||
mock.get_response.assert_not_called() | ||
self.assertEqual(self.embedder._embedder._cache._cache_dict, dict()) | ||
|
||
@patch(PATCH_METHOD, make_dummy_post(b'{"embedding": [0.3, 1]}')) | ||
def test_success_subset(self): | ||
res = self.embedder(self.corpus[[0]]) | ||
assert_array_equal(res.X, [[0.3, 1]]) | ||
self.assertEqual(len(self.embedder._embedder._cache._cache_dict), 1) | ||
|
||
@patch(PATCH_METHOD, make_dummy_post(b'{"embedding": [0.3, 1]}')) | ||
def test_success_shapes(self): | ||
res = self.embedder(self.corpus) | ||
self.assertEqual(res.X.shape, (len(self.corpus), 2)) | ||
self.assertEqual(len(res.domain), len(self.corpus.domain) + 2) | ||
|
||
@patch(PATCH_METHOD, make_dummy_post(b'')) | ||
def test_empty_response(self): | ||
with self.assertWarns(RuntimeWarning): | ||
res = self.embedder(self.corpus[[0]]) | ||
self.assertEqual(res.X.shape, (0, 0)) | ||
self.assertEqual(len(self.embedder._embedder._cache._cache_dict), 0) | ||
|
||
@patch(PATCH_METHOD, make_dummy_post(b'str')) | ||
def test_invalid_response(self): | ||
with self.assertWarns(RuntimeWarning): | ||
res = self.embedder(self.corpus[[0]]) | ||
self.assertEqual(res.X.shape, (0, 0)) | ||
self.assertEqual(len(self.embedder._embedder._cache._cache_dict), 0) | ||
|
||
@patch(PATCH_METHOD, make_dummy_post(b'{"embeddings": [0.3, 1]}')) | ||
def test_invalid_json_key(self): | ||
with self.assertWarns(RuntimeWarning): | ||
res = self.embedder(self.corpus[[0]]) | ||
self.assertEqual(res.X.shape, (0, 0)) | ||
self.assertEqual(len(self.embedder._embedder._cache._cache_dict), 0) | ||
|
||
@patch(PATCH_METHOD, make_dummy_post(b'{"embedding": [0.3, 1]}')) | ||
def test_persistent_caching(self): | ||
self.assertEqual(len(self.embedder._embedder._cache._cache_dict), 0) | ||
self.embedder(self.corpus[[0]]) | ||
self.assertEqual(len(self.embedder._embedder._cache._cache_dict), 1) | ||
self.embedder._embedder._cache.persist_cache() | ||
|
||
self.embedder = PretrainedEmbedder() | ||
self.assertEqual(len(self.embedder._embedder._cache._cache_dict), 1) | ||
|
||
self.embedder.clear_cache() | ||
self.embedder = PretrainedEmbedder() | ||
self.assertEqual(len(self.embedder._embedder._cache._cache_dict), 0) | ||
|
||
@patch(PATCH_METHOD, make_dummy_post(b'{"embedding": [0.3, 1]}')) | ||
def test_cache_for_different_languages(self): | ||
embedder = PretrainedEmbedder(language='sl') | ||
embedder.clear_cache() | ||
self.assertEqual(len(embedder._embedder._cache._cache_dict), 0) | ||
embedder(self.corpus[[0]]) | ||
self.assertEqual(len(embedder._embedder._cache._cache_dict), 1) | ||
embedder._embedder._cache.persist_cache() | ||
|
||
self.embedder = PretrainedEmbedder() | ||
self.assertEqual(len(self.embedder._embedder._cache._cache_dict), 0) | ||
self.embedder._embedder._cache.persist_cache() | ||
|
||
embedder = PretrainedEmbedder(language='sl') | ||
self.assertEqual(len(embedder._embedder._cache._cache_dict), 1) | ||
embedder.clear_cache() | ||
self.embedder.clear_cache() | ||
|
||
@patch(PATCH_METHOD, make_dummy_post(b'{"embedding": [0.3, 1]}')) | ||
def test_cache_for_different_aggregators(self): | ||
embedder = PretrainedEmbedder(aggregator='max') | ||
embedder.clear_cache() | ||
self.assertEqual(len(embedder._embedder._cache._cache_dict), 0) | ||
embedder(self.corpus[[0]]) | ||
self.assertEqual(len(embedder._embedder._cache._cache_dict), 1) | ||
embedder._embedder._cache.persist_cache() | ||
|
||
embedder = PretrainedEmbedder(aggregator='min') | ||
self.assertEqual(len(embedder._embedder._cache._cache_dict), 1) | ||
embedder(self.corpus[[0]]) | ||
self.assertEqual(len(embedder._embedder._cache._cache_dict), 2) | ||
|
||
@patch(PATCH_METHOD, make_dummy_post(b'{"embedding": [0.3, 1]}')) | ||
def test_with_statement(self): | ||
with self.embedder as embedder: | ||
res = embedder(self.corpus[[0]]) | ||
assert_array_equal(res.X, [[0.3, 1]]) | ||
|
||
@patch(PATCH_METHOD, make_dummy_post(b'{"embedding": [0.3, 1]}')) | ||
def test_cancel(self): | ||
self.assertFalse(self.embedder._embedder._cancelled) | ||
self.embedder._embedder._cancelled = True | ||
with self.assertRaises(Exception): | ||
self.embedder(self.corpus[[0]]) | ||
|
||
@patch(PATCH_METHOD, side_effect=OSError) | ||
def test_connection_error(self, _): | ||
embedder = PretrainedEmbedder() | ||
with self.assertRaises(ConnectionError): | ||
embedder(self.corpus[[0]]) | ||
|
||
def test_invalid_parameters(self): | ||
with self.assertRaises(ValueError): | ||
self.embedder = PretrainedEmbedder(language='eng') | ||
with self.assertRaises(ValueError): | ||
self.embedder = PretrainedEmbedder(aggregator='average') | ||
|
||
def test_invalid_corpus_type(self): | ||
with self.assertRaises(ValueError): | ||
self.embedder(self.corpus[0]) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,185 @@ | ||
"""This module contains classes used for embedding documents | ||
into a vector space. | ||
""" | ||
import zlib | ||
import base64 | ||
import json | ||
import sys | ||
import warnings | ||
from typing import Tuple, Any, Optional | ||
import numpy as np | ||
|
||
from Orange.misc.server_embedder import ServerEmbedderCommunicator | ||
from orangecontrib.text import Corpus | ||
|
||
|
||
AGGREGATORS = ['mean', 'sum', 'max', 'min'] | ||
LANGS_TO_ISO = {'English': 'en', 'Slovenian': 'sl', 'German': 'de'} | ||
LANGUAGES = list(LANGS_TO_ISO.values()) | ||
|
||
class PretrainedEmbedder: | ||
"""This class is used for obtaining dense embeddings of documents in | ||
corpus using fastText pretrained models from: | ||
E. Grave, P. Bojanowski, P. Gupta, A. Joulin, T. Mikolov, | ||
Learning Word Vectors for 157 Languages. | ||
Proceedings of the International Conference on Language Resources and Evaluation, 2018. | ||
Embedding is performed on server so the internet connection is a | ||
prerequisite for using the class. Currently supported languages are: | ||
- English (en) | ||
- Slovenian (sl) | ||
- German (de) | ||
Attributes | ||
---------- | ||
language : str | ||
ISO 639-1 (two-letter) code of desired language. | ||
aggregator : str | ||
Aggregator which creates document embedding (single | ||
vector) from word embeddings (multiple vectors). | ||
Allowed values are mean, sum, max, min. | ||
""" | ||
|
||
def __init__(self, language: str = 'en', | ||
aggregator: str = 'mean') -> None: | ||
lang_error = '{} is not a valid language. Allowed values: {}' | ||
agg_error = '{} is not a valid aggregator. Allowed values: {}' | ||
if language.lower() not in LANGUAGES: | ||
raise ValueError(lang_error.format(language, ', '.join(LANGUAGES))) | ||
self.language = language.lower() | ||
if aggregator.lower() not in AGGREGATORS: | ||
raise ValueError(agg_error.format(aggregator, ', '.join(AGGREGATORS))) | ||
self.aggregator = aggregator.lower() | ||
|
||
self._dim = None | ||
self._embedder = _ServerEmbedder(self.aggregator, | ||
model_name='fasttext-'+self.language, | ||
max_parallel_requests=100, | ||
server_url='https://example.com', | ||
# TODO set proper url | ||
embedder_type='text') | ||
|
||
def __call__(self, corpus: Corpus, copy: bool = True, | ||
processed_callback=None) -> Corpus: | ||
"""Adds matrix of document embeddings to a corpus. | ||
Parameters | ||
---------- | ||
corpus : Corpus | ||
Corpus on which transform is performed. | ||
copy : bool | ||
If set to True, a copy of corpus is made. | ||
Returns | ||
------- | ||
Corpus | ||
Corpus (original or a copy) with new features added. | ||
Raises | ||
------ | ||
ValueError | ||
If corpus is not instance of Corpus. | ||
RuntimeError | ||
If document in corpus is larger than | ||
50 KB after compression. | ||
""" | ||
if not isinstance(corpus, Corpus): | ||
raise ValueError("Input should be instance of Corpus.") | ||
corpus = corpus.copy() if copy else corpus | ||
embs = self._embedder.embedd_data( | ||
corpus.tokens, | ||
processed_callback=processed_callback) | ||
|
||
for emb in embs: # find embedding dimension | ||
if emb is not None: | ||
self._dim = len(emb) | ||
break | ||
# Check if some documents in corpus in weren't embedded | ||
# for some reason. This is a very rare case. | ||
warnings.simplefilter('always', RuntimeWarning) | ||
inds = list() | ||
for i, emb in enumerate(embs): | ||
if emb is not None: | ||
inds.append(i) | ||
else: | ||
embs[i] = np.zeros(self._dim) * np.nan | ||
warnings.warn(("Some documents were not embedded for " + | ||
"unknown reason. Those documents " + | ||
"are skipped."), | ||
RuntimeWarning) | ||
variable_attrs = { | ||
'hidden': True, | ||
'skip-normalization': True, | ||
'document-embedding-feature': True | ||
} | ||
embs = np.array(embs) | ||
new_corpus = corpus[inds] | ||
|
||
if len(inds) > 0: | ||
# if at least one embedding is not None, | ||
# extend attributes | ||
new_corpus.extend_attributes( | ||
np.array(embs[inds]), | ||
['Dim{}'.format(i) for i in range(self._dim)], | ||
var_attrs=variable_attrs) | ||
return new_corpus | ||
|
||
def report(self) -> Tuple[Tuple[str, str], Tuple[str, str]]: | ||
"""Reports on current parameters of PretrainedEmbedder. | ||
Returns | ||
------- | ||
tuple | ||
Tuple of parameters. | ||
""" | ||
return (('Language', self.language), | ||
('Aggregator', self.aggregator)) | ||
|
||
def set_cancelled(self): | ||
"""Cancels current embedding process""" | ||
if hasattr(self, '_embedder'): | ||
self._embedder.set_cancelled() | ||
|
||
def clear_cache(self): | ||
"""Clears embedder cache""" | ||
if self._embedder: | ||
self._embedder.clear_cache() | ||
|
||
def __enter__(self): | ||
return self | ||
|
||
def __exit__(self, ex_type, value, traceback): | ||
self.set_cancelled() | ||
|
||
def __del__(self): | ||
self.__exit__(None, None, None) | ||
|
||
|
||
class _ServerEmbedder(ServerEmbedderCommunicator): | ||
def __init__(self, aggregator: str, *args, **kwargs) -> None: | ||
super().__init__(*args, **kwargs) | ||
self.content_type = 'application/json' | ||
self.aggregator = aggregator | ||
|
||
async def _encode_data_instance(self, data_instance: Any) -> Optional[bytes]: | ||
data_string = json.dumps(list(data_instance)) | ||
data = base64.b64encode(zlib.compress( | ||
data_string.encode('utf-8', 'replace'), | ||
level=-1)).decode('utf-8', 'replace') | ||
|
||
if sys.getsizeof(data) > 50000: | ||
raise RuntimeError("Document in corpus is too large. \ | ||
Size limit is 50 KB (after compression).") | ||
|
||
data_dict = { | ||
"data": data, | ||
"aggregator": self.aggregator | ||
} | ||
|
||
json_string = json.dumps(data_dict) | ||
return json_string.encode('utf-8', 'replace') | ||
|
||
if __name__ == '__main__': | ||
with PretrainedEmbedder(language='en', aggregator='max') as embedder: | ||
embedder.clear_cache() | ||
embedder(Corpus.from_file('deerwester')) |
Oops, something went wrong.