diff --git a/databuilder/databuilder/publisher/elasticsearch_publisher.py b/databuilder/databuilder/publisher/elasticsearch_publisher.py index b024844bc1..4eb7cd3045 100644 --- a/databuilder/databuilder/publisher/elasticsearch_publisher.py +++ b/databuilder/databuilder/publisher/elasticsearch_publisher.py @@ -90,7 +90,8 @@ def publish_impl(self) -> None: cnt = 0 # create new index with mapping - self.elasticsearch_client.indices.create(index=self.elasticsearch_new_index, body=self.elasticsearch_mapping) + self.elasticsearch_client.indices.create(index=self.elasticsearch_new_index, body=self.elasticsearch_mapping, + params={'include_type_name': 'true'}) for action in actions: index_row = dict(index=dict(_index=self.elasticsearch_new_index, _type=self.elasticsearch_type)) diff --git a/databuilder/requirements.txt b/databuilder/requirements.txt index e885aaa191..8439187f59 100644 --- a/databuilder/requirements.txt +++ b/databuilder/requirements.txt @@ -1,7 +1,7 @@ # Copyright Contributors to the Amundsen project. # SPDX-License-Identifier: Apache-2.0 -elasticsearch>=6.2.0,<7.0 +elasticsearch>=6.2.0,<8.0 neo4j-driver>=1.7.2,<2.0 requests>=2.25.0,<3.0 diff --git a/databuilder/tests/unit/publisher/test_elasticsearch_publisher.py b/databuilder/tests/unit/publisher/test_elasticsearch_publisher.py index 22b00b3c7f..bdcc23e63f 100644 --- a/databuilder/tests/unit/publisher/test_elasticsearch_publisher.py +++ b/databuilder/tests/unit/publisher/test_elasticsearch_publisher.py @@ -69,7 +69,8 @@ def test_publish_with_data_and_no_old_index(self) -> None: # ensure indices create endpoint was called default_mapping = ElasticsearchPublisher.DEFAULT_ELASTICSEARCH_INDEX_MAPPING self.mock_es_client.indices.create.assert_called_once_with(index=self.test_es_new_index, - body=default_mapping) + body=default_mapping, + params={'include_type_name': 'true'}) # bulk endpoint called once self.mock_es_client.bulk.assert_called_once_with( @@ -102,7 +103,8 @@ def test_publish_with_data_and_old_index(self) -> None: # ensure indices create endpoint was called default_mapping = ElasticsearchPublisher.DEFAULT_ELASTICSEARCH_INDEX_MAPPING self.mock_es_client.indices.create.assert_called_once_with(index=self.test_es_new_index, - body=default_mapping) + body=default_mapping, + params={'include_type_name': 'true'}) # bulk endpoint called once self.mock_es_client.bulk.assert_called_once_with( diff --git a/docker-amundsen-local.yml b/docker-amundsen-local.yml index fdda8cc9ee..8129d0ed58 100644 --- a/docker-amundsen-local.yml +++ b/docker-amundsen-local.yml @@ -18,7 +18,7 @@ services: networks: - amundsennet elasticsearch: - image: elasticsearch:6.7.0 + image: elasticsearch:7.13.3 container_name: es_amundsen ports: - 9200:9200 @@ -28,6 +28,8 @@ services: nofile: soft: 65536 hard: 65536 + environment: + - discovery.type=single-node amundsensearch: build: context: . diff --git a/docker-amundsen.yml b/docker-amundsen.yml index 3dd7365cc7..6fa73711f7 100644 --- a/docker-amundsen.yml +++ b/docker-amundsen.yml @@ -20,7 +20,7 @@ services: networks: - amundsennet elasticsearch: - image: elasticsearch:6.7.0 + image: elasticsearch:7.13.3 container_name: es_amundsen ports: - 9200:9200 @@ -32,6 +32,8 @@ services: nofile: soft: 65536 hard: 65536 + environment: + - discovery.type=single-node amundsensearch: image: amundsendev/amundsen-search:2.4.1 container_name: amundsensearch diff --git a/search/requirements.txt b/search/requirements.txt index e67d257a13..9078ef2158 100644 --- a/search/requirements.txt +++ b/search/requirements.txt @@ -2,5 +2,5 @@ # SPDX-License-Identifier: Apache-2.0 pyatlasclient==1.0.3 -elasticsearch==6.8.2 -elasticsearch-dsl==6.4.0 +elasticsearch==7.13.3 +elasticsearch-dsl==7.4.0 diff --git a/search/search_service/config.py b/search/search_service/config.py index 78244d902f..78e880307f 100644 --- a/search/search_service/config.py +++ b/search/search_service/config.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 import os +from typing import Any, Optional ELASTICSEARCH_INDEX_KEY = 'ELASTICSEARCH_INDEX' SEARCH_PAGE_SIZE_KEY = 'SEARCH_PAGE_SIZE' @@ -47,7 +48,7 @@ class LocalConfig(Config): PORT=PROXY_PORT) ) PROXY_CLIENT = PROXY_CLIENTS[os.environ.get('PROXY_CLIENT', 'ELASTICSEARCH')] - PROXY_CLIENT_KEY = os.environ.get('PROXY_CLIENT_KEY') + PROXY_CLIENT_KEY = os.environ.get('PROXY_CLIENT_KEY') # type: Optional[Any] PROXY_USER = os.environ.get('CREDENTIALS_PROXY_USER', 'elastic') PROXY_PASSWORD = os.environ.get('CREDENTIALS_PROXY_PASSWORD', 'elastic') diff --git a/search/search_service/proxy/elasticsearch.py b/search/search_service/proxy/elasticsearch.py index f3d940ffc3..c4d2f02995 100644 --- a/search/search_service/proxy/elasticsearch.py +++ b/search/search_service/proxy/elasticsearch.py @@ -14,6 +14,7 @@ from elasticsearch import Elasticsearch from elasticsearch.exceptions import ConnectionError as ElasticConnectionError, NotFoundError from elasticsearch_dsl import Search, query +from elasticsearch_dsl.utils import AttrDict from flask import current_app from search_service import config @@ -303,7 +304,13 @@ def _get_search_result(self, page_index: int, except Exception: LOGGING.exception('The record doesnt contain specified field.') - return search_result_model(total_results=response.hits.total, + # This is to support ESv7.x, and newer version of elasticsearch_dsl + if isinstance(response.hits.total, AttrDict): + _total = response.hits.total.value + else: + _total = response.hits.total + + return search_result_model(total_results=_total, results=results) def _get_instance(self, attr: str, val: Any) -> Any: @@ -330,6 +337,9 @@ def _search_helper(self, page_index: int, :param query_name: name of query to query the ES :return: """ + # This is to support ESv7.x + # ref: https://www.elastic.co/guide/en/elasticsearch/reference/7.0/breaking-changes-7.0.html#track-total-hits-10000-default # noqa: E501 + client = client.extra(track_total_hits=True) if query_name: q = query.Q(query_name) @@ -735,7 +745,7 @@ def _build_delete_actions(self, data: List[str], index_key: str, type: str) -> L return [{'delete': {'_index': index_key, '_id': id, '_type': type}} for id in data] def _bulk_helper(self, actions: List[Dict[str, Any]]) -> None: - result = self.elasticsearch.bulk(actions) + result = self.elasticsearch.bulk(body=actions) if result['errors']: # ES's error messages are nested within elasticsearch objects and can @@ -751,7 +761,7 @@ def _fetch_old_index(self, alias: str) -> List[str]: :return: list of elasticsearch indices """ try: - indices = self.elasticsearch.indices.get_alias(alias).keys() + indices = self.elasticsearch.indices.get_alias(index=alias).keys() return indices except NotFoundError: LOGGING.warn('Received index not found error from Elasticsearch', exc_info=True) @@ -776,5 +786,5 @@ def _get_mapping(alias: str) -> str: # alias our new index index_actions = {'actions': [{'add': {'index': index_key, 'alias': alias}}]} - self.elasticsearch.indices.update_aliases(index_actions) + self.elasticsearch.indices.update_aliases(body=index_actions) return index_key diff --git a/search/tests/unit/proxy/test_elasticsearch.py b/search/tests/unit/proxy/test_elasticsearch.py index deffb62332..c89933feca 100644 --- a/search/tests/unit/proxy/test_elasticsearch.py +++ b/search/tests/unit/proxy/test_elasticsearch.py @@ -5,6 +5,7 @@ from typing import ( # noqa: F401 Any, Iterable, List, ) +from unittest import mock from unittest.mock import MagicMock, patch from amundsen_common.models.api import health_check @@ -164,8 +165,9 @@ def test_setup_client(self) -> None: ) a = self.es_proxy.elasticsearch for client in [a, a.cat, a.cluster, a.indices, a.ingest, a.nodes, a.snapshot, a.tasks]: - self.assertEqual(client.transport.hosts[0]['host'], "0.0.0.0") - self.assertEqual(client.transport.hosts[0]['port'], 9200) + _host = client.transport.hosts[0] # type: ignore + self.assertEqual(_host['host'], "0.0.0.0") + self.assertEqual(_host['port'], 9200) @patch('search_service.proxy.elasticsearch.Elasticsearch', autospec=True) def test_setup_client_with_username_and_password(self, elasticsearch_mock: MagicMock) -> None: @@ -527,7 +529,6 @@ def test_create_document(self, mock_uuid: MagicMock) -> None: mock_elasticsearch = self.es_proxy.elasticsearch new_index_name = 'tester_index_name' mock_uuid.return_value = new_index_name - mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})]) start_data = [ Table(id='snowflake://blue.test_schema/bank_accounts', cluster='blue', column_names=['1', '2'], database='snowflake', schema='test_schema', description='A table for something', @@ -592,12 +593,17 @@ def test_create_document(self, mock_uuid: MagicMock) -> None: 'programmatic_descriptions': ["test"] } ] - mock_elasticsearch.bulk.return_value = {'errors': False} + + _get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias) + _get_alias.return_value = dict([(new_index_name, {})]) + + _bulk = mock.create_autospec(mock_elasticsearch.indices.get_alias) + _bulk.return_value = {'errors': False} expected_alias = 'table_search_index' result = self.es_proxy.create_document(data=start_data, index=expected_alias) self.assertEqual(expected_alias, result) - mock_elasticsearch.bulk.assert_called_with(expected_data) + _bulk.assert_called_with(body=expected_data) def test_update_document_with_no_data(self) -> None: expected = '' @@ -608,7 +614,8 @@ def test_update_document_with_no_data(self) -> None: def test_update_document(self, mock_uuid: MagicMock) -> None: mock_elasticsearch = self.es_proxy.elasticsearch new_index_name = 'tester_index_name' - mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})]) + _get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias) + _get_alias.return_value = dict([(new_index_name, {})]) mock_uuid.return_value = new_index_name table_key = 'snowflake://blue.test_schema/bitcoin_wallets' expected_alias = 'table_search_index' @@ -650,14 +657,18 @@ def test_update_document(self, mock_uuid: MagicMock) -> None: ] result = self.es_proxy.update_document(data=data, index=expected_alias) self.assertEqual(expected_alias, result) - mock_elasticsearch.bulk.assert_called_with(expected_data) + _bulk = mock.create_autospec(mock_elasticsearch.bulk) + _bulk.assert_called_with(body=expected_data) @patch('uuid.uuid4') def test_delete_table_document(self, mock_uuid: MagicMock) -> None: mock_elasticsearch = self.es_proxy.elasticsearch new_index_name = 'tester_index_name' mock_uuid.return_value = new_index_name - mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})]) + + _get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias) + _get_alias.return_value = dict([(new_index_name, {})]) + expected_alias = 'table_search_index' data = ['id1', 'id2'] @@ -668,14 +679,16 @@ def test_delete_table_document(self, mock_uuid: MagicMock) -> None: result = self.es_proxy.delete_document(data=data, index=expected_alias) self.assertEqual(expected_alias, result) - mock_elasticsearch.bulk.assert_called_with(expected_data) + _bulk = mock.create_autospec(mock_elasticsearch.bulk) + _bulk.assert_called_with(body=expected_data) @patch('uuid.uuid4') def test_delete_user_document(self, mock_uuid: MagicMock) -> None: mock_elasticsearch = self.es_proxy.elasticsearch new_index_name = 'tester_index_name' mock_uuid.return_value = new_index_name - mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})]) + _get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias) + _get_alias.return_value = dict([(new_index_name, {})]) expected_alias = 'user_search_index' data = ['id1', 'id2'] @@ -686,14 +699,16 @@ def test_delete_user_document(self, mock_uuid: MagicMock) -> None: result = self.es_proxy.delete_document(data=data, index=expected_alias) self.assertEqual(expected_alias, result) - mock_elasticsearch.bulk.assert_called_with(expected_data) + _bulk = mock.create_autospec(mock_elasticsearch.bulk) + _bulk.assert_called_with(body=expected_data) @patch('uuid.uuid4') def test_delete_feature_document(self, mock_uuid: MagicMock) -> None: mock_elasticsearch = self.es_proxy.elasticsearch new_index_name = 'test_indx' mock_uuid.return_value = new_index_name - mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})]) + _get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias) + _get_alias.return_value = dict([(new_index_name, {})]) expected_alias = 'feature_search_index' data = ['id1', 'id2'] @@ -704,7 +719,8 @@ def test_delete_feature_document(self, mock_uuid: MagicMock) -> None: result = self.es_proxy.delete_document(data=data, index=expected_alias) self.assertEqual(expected_alias, result) - mock_elasticsearch.bulk.assert_called_with(expected_data) + _bulk = mock.create_autospec(mock_elasticsearch.bulk) + _bulk.assert_called_with(body=expected_data) def test_get_instance_string(self) -> None: result = self.es_proxy._get_instance('column', 'value')