Skip to content

Commit

Permalink
chore: Upgrades ES to 7.13.3 (amundsen-io#1386)
Browse files Browse the repository at this point in the history
* Upgrades ES to 7.13.3

Signed-off-by: verdan <[email protected]>

* typo

Signed-off-by: verdan <[email protected]>

* Fixes the test cases, and code review

Signed-off-by: verdan <[email protected]>
  • Loading branch information
verdan authored and Zachary Ruiz committed May 13, 2022
1 parent a1b0a16 commit c8f0c60
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 26 deletions.
3 changes: 2 additions & 1 deletion databuilder/databuilder/publisher/elasticsearch_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ def publish_impl(self) -> None:
cnt = 0

# create new index with mapping
self.elasticsearch_client.indices.create(index=self.elasticsearch_new_index, body=self.elasticsearch_mapping)
self.elasticsearch_client.indices.create(index=self.elasticsearch_new_index, body=self.elasticsearch_mapping,
params={'include_type_name': 'true'})
for action in actions:
index_row = dict(index=dict(_index=self.elasticsearch_new_index,
_type=self.elasticsearch_type))
Expand Down
2 changes: 1 addition & 1 deletion databuilder/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

elasticsearch>=6.2.0,<7.0
elasticsearch>=6.2.0,<8.0
neo4j-driver>=1.7.2,<2.0
requests>=2.25.0,<3.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ def test_publish_with_data_and_no_old_index(self) -> None:
# ensure indices create endpoint was called
default_mapping = ElasticsearchPublisher.DEFAULT_ELASTICSEARCH_INDEX_MAPPING
self.mock_es_client.indices.create.assert_called_once_with(index=self.test_es_new_index,
body=default_mapping)
body=default_mapping,
params={'include_type_name': 'true'})

# bulk endpoint called once
self.mock_es_client.bulk.assert_called_once_with(
Expand Down Expand Up @@ -102,7 +103,8 @@ def test_publish_with_data_and_old_index(self) -> None:
# ensure indices create endpoint was called
default_mapping = ElasticsearchPublisher.DEFAULT_ELASTICSEARCH_INDEX_MAPPING
self.mock_es_client.indices.create.assert_called_once_with(index=self.test_es_new_index,
body=default_mapping)
body=default_mapping,
params={'include_type_name': 'true'})

# bulk endpoint called once
self.mock_es_client.bulk.assert_called_once_with(
Expand Down
4 changes: 3 additions & 1 deletion docker-amundsen-local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ services:
networks:
- amundsennet
elasticsearch:
image: elasticsearch:6.7.0
image: elasticsearch:7.13.3
container_name: es_amundsen
ports:
- 9200:9200
Expand All @@ -28,6 +28,8 @@ services:
nofile:
soft: 65536
hard: 65536
environment:
- discovery.type=single-node
amundsensearch:
build:
context: .
Expand Down
4 changes: 3 additions & 1 deletion docker-amundsen.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ services:
networks:
- amundsennet
elasticsearch:
image: elasticsearch:6.7.0
image: elasticsearch:7.13.3
container_name: es_amundsen
ports:
- 9200:9200
Expand All @@ -32,6 +32,8 @@ services:
nofile:
soft: 65536
hard: 65536
environment:
- discovery.type=single-node
amundsensearch:
image: amundsendev/amundsen-search:2.4.1
container_name: amundsensearch
Expand Down
4 changes: 2 additions & 2 deletions search/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
# SPDX-License-Identifier: Apache-2.0

pyatlasclient==1.0.3
elasticsearch==6.8.2
elasticsearch-dsl==6.4.0
elasticsearch==7.13.3
elasticsearch-dsl==7.4.0
3 changes: 2 additions & 1 deletion search/search_service/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0

import os
from typing import Any, Optional

ELASTICSEARCH_INDEX_KEY = 'ELASTICSEARCH_INDEX'
SEARCH_PAGE_SIZE_KEY = 'SEARCH_PAGE_SIZE'
Expand Down Expand Up @@ -47,7 +48,7 @@ class LocalConfig(Config):
PORT=PROXY_PORT)
)
PROXY_CLIENT = PROXY_CLIENTS[os.environ.get('PROXY_CLIENT', 'ELASTICSEARCH')]
PROXY_CLIENT_KEY = os.environ.get('PROXY_CLIENT_KEY')
PROXY_CLIENT_KEY = os.environ.get('PROXY_CLIENT_KEY') # type: Optional[Any]
PROXY_USER = os.environ.get('CREDENTIALS_PROXY_USER', 'elastic')
PROXY_PASSWORD = os.environ.get('CREDENTIALS_PROXY_PASSWORD', 'elastic')

Expand Down
18 changes: 14 additions & 4 deletions search/search_service/proxy/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ConnectionError as ElasticConnectionError, NotFoundError
from elasticsearch_dsl import Search, query
from elasticsearch_dsl.utils import AttrDict
from flask import current_app

from search_service import config
Expand Down Expand Up @@ -303,7 +304,13 @@ def _get_search_result(self, page_index: int,
except Exception:
LOGGING.exception('The record doesnt contain specified field.')

return search_result_model(total_results=response.hits.total,
# This is to support ESv7.x, and newer version of elasticsearch_dsl
if isinstance(response.hits.total, AttrDict):
_total = response.hits.total.value
else:
_total = response.hits.total

return search_result_model(total_results=_total,
results=results)

def _get_instance(self, attr: str, val: Any) -> Any:
Expand All @@ -330,6 +337,9 @@ def _search_helper(self, page_index: int,
:param query_name: name of query to query the ES
:return:
"""
# This is to support ESv7.x
# ref: https://www.elastic.co/guide/en/elasticsearch/reference/7.0/breaking-changes-7.0.html#track-total-hits-10000-default # noqa: E501
client = client.extra(track_total_hits=True)

if query_name:
q = query.Q(query_name)
Expand Down Expand Up @@ -735,7 +745,7 @@ def _build_delete_actions(self, data: List[str], index_key: str, type: str) -> L
return [{'delete': {'_index': index_key, '_id': id, '_type': type}} for id in data]

def _bulk_helper(self, actions: List[Dict[str, Any]]) -> None:
result = self.elasticsearch.bulk(actions)
result = self.elasticsearch.bulk(body=actions)

if result['errors']:
# ES's error messages are nested within elasticsearch objects and can
Expand All @@ -751,7 +761,7 @@ def _fetch_old_index(self, alias: str) -> List[str]:
:return: list of elasticsearch indices
"""
try:
indices = self.elasticsearch.indices.get_alias(alias).keys()
indices = self.elasticsearch.indices.get_alias(index=alias).keys()
return indices
except NotFoundError:
LOGGING.warn('Received index not found error from Elasticsearch', exc_info=True)
Expand All @@ -776,5 +786,5 @@ def _get_mapping(alias: str) -> str:

# alias our new index
index_actions = {'actions': [{'add': {'index': index_key, 'alias': alias}}]}
self.elasticsearch.indices.update_aliases(index_actions)
self.elasticsearch.indices.update_aliases(body=index_actions)
return index_key
42 changes: 29 additions & 13 deletions search/tests/unit/proxy/test_elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import ( # noqa: F401
Any, Iterable, List,
)
from unittest import mock
from unittest.mock import MagicMock, patch

from amundsen_common.models.api import health_check
Expand Down Expand Up @@ -164,8 +165,9 @@ def test_setup_client(self) -> None:
)
a = self.es_proxy.elasticsearch
for client in [a, a.cat, a.cluster, a.indices, a.ingest, a.nodes, a.snapshot, a.tasks]:
self.assertEqual(client.transport.hosts[0]['host'], "0.0.0.0")
self.assertEqual(client.transport.hosts[0]['port'], 9200)
_host = client.transport.hosts[0] # type: ignore
self.assertEqual(_host['host'], "0.0.0.0")
self.assertEqual(_host['port'], 9200)

@patch('search_service.proxy.elasticsearch.Elasticsearch', autospec=True)
def test_setup_client_with_username_and_password(self, elasticsearch_mock: MagicMock) -> None:
Expand Down Expand Up @@ -527,7 +529,6 @@ def test_create_document(self, mock_uuid: MagicMock) -> None:
mock_elasticsearch = self.es_proxy.elasticsearch
new_index_name = 'tester_index_name'
mock_uuid.return_value = new_index_name
mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})])
start_data = [
Table(id='snowflake://blue.test_schema/bank_accounts', cluster='blue', column_names=['1', '2'],
database='snowflake', schema='test_schema', description='A table for something',
Expand Down Expand Up @@ -592,12 +593,17 @@ def test_create_document(self, mock_uuid: MagicMock) -> None:
'programmatic_descriptions': ["test"]
}
]
mock_elasticsearch.bulk.return_value = {'errors': False}

_get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias)
_get_alias.return_value = dict([(new_index_name, {})])

_bulk = mock.create_autospec(mock_elasticsearch.indices.get_alias)
_bulk.return_value = {'errors': False}

expected_alias = 'table_search_index'
result = self.es_proxy.create_document(data=start_data, index=expected_alias)
self.assertEqual(expected_alias, result)
mock_elasticsearch.bulk.assert_called_with(expected_data)
_bulk.assert_called_with(body=expected_data)

def test_update_document_with_no_data(self) -> None:
expected = ''
Expand All @@ -608,7 +614,8 @@ def test_update_document_with_no_data(self) -> None:
def test_update_document(self, mock_uuid: MagicMock) -> None:
mock_elasticsearch = self.es_proxy.elasticsearch
new_index_name = 'tester_index_name'
mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})])
_get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias)
_get_alias.return_value = dict([(new_index_name, {})])
mock_uuid.return_value = new_index_name
table_key = 'snowflake://blue.test_schema/bitcoin_wallets'
expected_alias = 'table_search_index'
Expand Down Expand Up @@ -650,14 +657,18 @@ def test_update_document(self, mock_uuid: MagicMock) -> None:
]
result = self.es_proxy.update_document(data=data, index=expected_alias)
self.assertEqual(expected_alias, result)
mock_elasticsearch.bulk.assert_called_with(expected_data)
_bulk = mock.create_autospec(mock_elasticsearch.bulk)
_bulk.assert_called_with(body=expected_data)

@patch('uuid.uuid4')
def test_delete_table_document(self, mock_uuid: MagicMock) -> None:
mock_elasticsearch = self.es_proxy.elasticsearch
new_index_name = 'tester_index_name'
mock_uuid.return_value = new_index_name
mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})])

_get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias)
_get_alias.return_value = dict([(new_index_name, {})])

expected_alias = 'table_search_index'
data = ['id1', 'id2']

Expand All @@ -668,14 +679,16 @@ def test_delete_table_document(self, mock_uuid: MagicMock) -> None:
result = self.es_proxy.delete_document(data=data, index=expected_alias)

self.assertEqual(expected_alias, result)
mock_elasticsearch.bulk.assert_called_with(expected_data)
_bulk = mock.create_autospec(mock_elasticsearch.bulk)
_bulk.assert_called_with(body=expected_data)

@patch('uuid.uuid4')
def test_delete_user_document(self, mock_uuid: MagicMock) -> None:
mock_elasticsearch = self.es_proxy.elasticsearch
new_index_name = 'tester_index_name'
mock_uuid.return_value = new_index_name
mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})])
_get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias)
_get_alias.return_value = dict([(new_index_name, {})])
expected_alias = 'user_search_index'
data = ['id1', 'id2']

Expand All @@ -686,14 +699,16 @@ def test_delete_user_document(self, mock_uuid: MagicMock) -> None:
result = self.es_proxy.delete_document(data=data, index=expected_alias)

self.assertEqual(expected_alias, result)
mock_elasticsearch.bulk.assert_called_with(expected_data)
_bulk = mock.create_autospec(mock_elasticsearch.bulk)
_bulk.assert_called_with(body=expected_data)

@patch('uuid.uuid4')
def test_delete_feature_document(self, mock_uuid: MagicMock) -> None:
mock_elasticsearch = self.es_proxy.elasticsearch
new_index_name = 'test_indx'
mock_uuid.return_value = new_index_name
mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})])
_get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias)
_get_alias.return_value = dict([(new_index_name, {})])
expected_alias = 'feature_search_index'
data = ['id1', 'id2']

Expand All @@ -704,7 +719,8 @@ def test_delete_feature_document(self, mock_uuid: MagicMock) -> None:
result = self.es_proxy.delete_document(data=data, index=expected_alias)

self.assertEqual(expected_alias, result)
mock_elasticsearch.bulk.assert_called_with(expected_data)
_bulk = mock.create_autospec(mock_elasticsearch.bulk)
_bulk.assert_called_with(body=expected_data)

def test_get_instance_string(self) -> None:
result = self.es_proxy._get_instance('column', 'value')
Expand Down

0 comments on commit c8f0c60

Please sign in to comment.