Merge pull request #160 from statisticsnorway/restrict-numpy
Remove Spark workaround. Loosen pyarrow constraints
mallport authored Jun 19, 2024
2 parents 9ca2760 + faea90a commit 8fb00da
Showing 13 changed files with 1,381 additions and 1,168 deletions.
9 changes: 6 additions & 3 deletions .vscode/settings.json
@@ -1,11 +1,14 @@
 {
+    "python.testing.pytestArgs": [
+        "tests"
+    ],
+    "python.testing.unittestEnabled": false,
+    "python.testing.pytestEnabled": true,
     "python.testing.unittestArgs": [
         "-v",
         "-s",
         "./tests",
         "-p",
         "test*.py"
-    ],
-    "python.testing.pytestEnabled": false,
-    "python.testing.unittestEnabled": true
+    ]
 }
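(This settings change switches VS Code's test integration from the unittest runner to pytest, pointing test discovery at the tests/ directory, the equivalent of running "pytest tests" from the project root.)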
2,435 changes: 1,343 additions & 1,092 deletions poetry.lock

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "dapla-toolbelt"
-version = "2.0.17"
+version = "2.0.18"
 description = "Dapla Toolbelt"
 authors = ["Dapla Developers <[email protected]>"]
 license = "MIT"
@@ -17,7 +17,7 @@ Changelog = "https://github.com/statisticsnorway/dapla-toolbelt/releases"
 [tool.poetry.dependencies]
 python = ">=3.10,<4.0"
 requests = ">=2.27.1"
-pyarrow = ">=14.0.2, <15" # This tight constraint is dependent on fixing https://issues.apache.org/jira/browse/ARROW-7867
+pyarrow = ">=14.0.2"
 pandas = { version = ">=1.4.2", extras = ["excel", "xml"] }
 gcsfs = ">=2022.7.1"
 ipython = ">=8.10.0"
@@ -78,6 +78,10 @@ warn_unreachable = true
 pretty = true
 show_column_numbers = true
 show_error_context = true
+disallow_untyped_calls = false
+# the above is added due to Google libs being untyped
+# note that this *does* enforce typing functions defined in dapla-toolbelt,
+# but it allows calling untyped functions in a typed context.

 [[tool.mypy.overrides]]
 # Allow missing type hints in third-party libraries without type information.
@@ -87,6 +91,7 @@ module = [
"fsspec.*",
"responses.*",
"tomli.*",
"google.*",
"google.cloud.*",
]
ignore_missing_imports = true
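The new disallow_untyped_calls = false setting relaxes one strict-mode check: typed code in dapla-toolbelt may now call unannotated functions, such as those in the Google client libraries, without per-call suppressions. A minimal sketch of what this permits (both function names are hypothetical, for illustration only):

    def untyped_helper(bucket_name):  # unannotated, like many google-* APIs
        return bucket_name


    def typed_caller(name: str) -> str:
        # With disallow_untyped_calls = true, mypy flags this call because
        # untyped_helper has no annotations; with the check relaxed it passes,
        # while typed_caller itself is still fully type-checked.
        return str(untyped_helper(name))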
4 changes: 2 additions & 2 deletions src/dapla/auth.py
@@ -94,7 +94,7 @@ def fetch_local_user_from_jupyter() -> dict[str, Any]:
     hub = HubAuth()
     response = requests.get(
         os.environ["LOCAL_USER_PATH"],
-        headers={"Authorization": "token %s" % hub.api_token},
+        headers={"Authorization": f"token {hub.api_token}"},
         cert=(hub.certfile, hub.keyfile),
         verify=hub.client_ca,
         allow_redirects=False,
@@ -229,6 +229,6 @@ def _print_warning(self) -> None:
             display(
                 HTML(
                     'Your session has timed out. Please <a href="/hub/login">log in</a> to continue.'
-                )  # type: ignore [no-untyped-call]
+                )
             )
         )
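This file, together with collector.py, converter.py, doctor.py, guardian.py, and jupyterhub.py below, also migrates printf-style %-formatting to f-strings. The two spellings are equivalent at runtime; the f-string is simply the more readable, idiomatic form:

    token = "abc123"
    assert "token %s" % token == f"token {token}" == "token abc123"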
6 changes: 3 additions & 3 deletions src/dapla/collector.py
@@ -27,7 +27,7 @@ def start(self, specification: dict[str, Any]) -> Response:
         collector_response = requests.put(
             self.collector_url,
             headers={
-                "Authorization": "Bearer %s" % keycloak_token,
+                "Authorization": f"Bearer {keycloak_token}",
                 "Content-type": "application/json",
             },
             data=json.dumps(specification),
@@ -42,7 +42,7 @@ def running_tasks(self) -> Response:
"""Get all running collector tasks."""
keycloak_token = AuthClient.fetch_personal_token()
collector_response = requests.get(
self.collector_url, headers={"Authorization": "Bearer %s" % keycloak_token}
self.collector_url, headers={"Authorization": f"Bearer {keycloak_token}"}
)
collector_response.raise_for_status()
return collector_response
@@ -59,7 +59,7 @@ def stop(self, task_id: int) -> Response:
         keycloak_token = AuthClient.fetch_personal_token()
         collector_response = requests.delete(
             f"{self.collector_url}/{task_id}",
-            headers={"Authorization": "Bearer %s" % keycloak_token},
+            headers={"Authorization": f"Bearer {keycloak_token}"},
         )
         if collector_response.status_code == 400:
             print(
12 changes: 6 additions & 6 deletions src/dapla/converter.py
@@ -27,7 +27,7 @@ def start(self, job_config: dict[str, Any]) -> Response:
         converter_response = requests.post(
             f"{self.converter_url}/jobs",
             headers={
-                "Authorization": "Bearer %s" % keycloak_token,
+                "Authorization": f"Bearer {keycloak_token}",
                 "Content-type": "application/json",
             },
             data=json.dumps(job_config),
@@ -50,7 +50,7 @@ def start_simulation(self, job_config: dict[str, Any]) -> Response:
         converter_response = requests.post(
             f"{self.converter_url}/jobs/simulation",
             headers={
-                "Authorization": "Bearer %s" % keycloak_token,
+                "Authorization": f"Bearer {keycloak_token}",
                 "Content-type": "application/json",
             },
             data=json.dumps(job_config),
@@ -72,7 +72,7 @@ def get_job_summary(self, job_id: str) -> Response:
         job_summary = requests.get(
             f"{self.converter_url}/jobs/{job_id}/execution-summary",
             headers={
-                "Authorization": "Bearer %s" % keycloak_token,
+                "Authorization": f"Bearer {keycloak_token}",
                 "Content-type": "application/json",
             },
         )
@@ -93,7 +93,7 @@ def stop_job(self, job_id: str) -> Response:
         job_status = requests.post(
             f"{self.converter_url}/jobs/{job_id}/stop",
             headers={
-                "Authorization": "Bearer %s" % keycloak_token,
+                "Authorization": f"Bearer {keycloak_token}",
                 "Content-type": "application/json",
             },
         )
@@ -122,7 +122,7 @@ def get_pseudo_report(self, job_id: str) -> Response:
         pseudo_report = requests.get(
             f"{self.converter_url}/jobs/{job_id}/reports/pseudo",
             headers={
-                "Authorization": "Bearer %s" % keycloak_token,
+                "Authorization": f"Bearer {keycloak_token}",
                 "Content-type": "application/json",
             },
         )
@@ -145,7 +145,7 @@ def get_pseudo_schema(self, job_id: str) -> Response:
         pseudo_report = requests.get(
             f"{self.converter_url}/jobs/{job_id}/reports/pseudo-schema-hierarchy",
             headers={
-                "Authorization": "Bearer %s" % keycloak_token,
+                "Authorization": f"Bearer {keycloak_token}",
                 "Content-type": "application/json",
             },
         )
4 changes: 2 additions & 2 deletions src/dapla/doctor.py
@@ -68,11 +68,11 @@ def gcs_credentials_valid() -> bool:
print("Checking your Google Cloud Storage credentials...")

# Fetch the google token
google_token, _ = AuthClient.fetch_google_credentials().token
google_token = AuthClient.fetch_google_credentials().token

try:
requests.get(
"https://oauth2.googleapis.com/tokeninfo?access_token=%s" % google_token
f"https://oauth2.googleapis.com/tokeninfo?access_token={google_token}"
)
except HttpError as ex:
if str(ex) == "Invalid Credentials, 401":
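The first change here also drops a tuple unpacking: judging by the diff, the token attribute of the fetched credentials is now consumed as a single value rather than a (token, expiry) pair.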
42 changes: 0 additions & 42 deletions src/dapla/gcs.py
@@ -14,50 +14,8 @@ def __init__(
     ) -> None:
         """Initialize GCSFileSystem."""
         super().__init__(token=token, **kwargs)
-        # Temporary bug fix for https://issues.apache.org/jira/browse/ARROW-7867
-        # Spark writes an empty file to GCS (to mimic a folder structure) before writing partitioned data
-        # Resolve this by ignoring the "empty" file when reading partitioned parquet files
-        try:
-            # Constant is moved to core module in Pyarrow 10.0.0
-            from pyarrow.parquet.core import (  # type: ignore [attr-defined]
-                EXCLUDED_PARQUET_PATHS,
-            )
-        except ImportError:
-            # Fallback for Pyarrow versions <10.0.0
-            from pyarrow.parquet.core import (  # type: ignore [attr-defined]
-                EXCLUDED_PARQUET_PATHS,
-            )
-        from pyarrow.parquet import ParquetManifest
-
-        EXCLUDED_PARQUET_PATHS.add("")
-        ParquetManifest._should_silently_exclude = (  # type: ignore [attr-defined]
-            GCSFileSystem._should_silently_exclude
-        )

     def isdir(self, path: str) -> bool:
         """Check if path is a directory."""
         info = super(gcsfs.GCSFileSystem, self).info(path)
         return t.cast(bool, info["type"] == "directory")
-
-    @staticmethod
-    # This code is from from pyarrow.parquet.core
-    def _should_silently_exclude(file_name: str) -> bool:
-        try:
-            # Constant is moved to core module in Pyarrow 10.0.0
-            from pyarrow.parquet.core import (  # type: ignore [attr-defined]
-                EXCLUDED_PARQUET_PATHS,
-            )
-        except ImportError:
-            # Fallback for Pyarrow versions <10.0.0
-            from pyarrow.parquet.core import (  # type: ignore [attr-defined]
-                EXCLUDED_PARQUET_PATHS,
-            )
-
-        return (
-            file_name.endswith(".crc")  # Checksums
-            or file_name.endswith("_$folder$")  # HDFS directories in S3
-            or file_name.startswith(".")  # Hidden files starting with .
-            or file_name.startswith("_")  # Hidden files starting with _
-            or ".tmp" in file_name  # Temp files
-            or file_name in EXCLUDED_PARQUET_PATHS
-        )
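This deletion is the Spark workaround named in the commit message: Spark writes an empty placeholder object to GCS to mimic a folder before writing partitioned data, and older pyarrow releases tripped over that empty path when reading the dataset back (ARROW-7867, the issue linked from the removed pyproject.toml constraint). With the <15 pin lifted, the monkey-patched exclusion list is no longer needed. A minimal sketch of the read path that previously required the patch (the bucket path is illustrative):

    import gcsfs
    import pyarrow.parquet as pq

    fs = gcsfs.GCSFileSystem()
    # Read a Spark-written, partitioned parquet dataset directly from GCS;
    # no custom _should_silently_exclude hook is registered anymore.
    table = pq.read_table("my-bucket/path/to/partitioned_dataset", filesystem=fs)
    df = table.to_pandas()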
4 changes: 2 additions & 2 deletions src/dapla/guardian.py
@@ -42,7 +42,7 @@ def call_api(
         api_response = requests.get(
             api_endpoint_url,
             headers={
-                "Authorization": "Bearer %s" % maskinporten_token,
+                "Authorization": f"Bearer {maskinporten_token}",
                 "Accept": "application/json",
             },
         )
@@ -74,7 +74,7 @@ def get_guardian_token(
         guardian_response = requests.post(
             guardian_endpoint,
             headers={
-                "Authorization": "Bearer %s" % keycloak_token,
+                "Authorization": f"Bearer {keycloak_token}",
                 "Content-type": "application/json",
             },
             json=body,
2 changes: 1 addition & 1 deletion src/dapla/jupyterhub.py
@@ -23,7 +23,7 @@ def generate_api_token(
         + os.environ["JUPYTERHUB_USER"]
         + "/tokens",
         json=body,
-        headers={"Authorization": "token %s" % hub.api_token},
+        headers={"Authorization": f"token {hub.api_token}"},
         allow_redirects=False,
     )
     hub_response.raise_for_status()
3 changes: 1 addition & 2 deletions tests/test_auth.py
@@ -170,12 +170,11 @@ def test_fetch_google_credentials_expired(
    client = AuthClient()
    credentials = client.fetch_google_credentials()

    assert credentials.expired

    fetch_google_token_from_oidc_exchange_mock.return_value = (
        "google_token",
        datetime.now() + timedelta(hours=1),
    )

    credentials.refresh(None)
    assert not credentials.expired

9 changes: 3 additions & 6 deletions tests/test_collector.py
@@ -17,12 +17,9 @@ def test_initiate_201_response(auth_client_mock: Mock) -> None:
     auth_client_mock.fetch_personal_token.return_value = fake_token
     specification: dict[str, str] = {}
     worker_id = "abcd"
-    collector_response = (
-        """{
-    "workerId": "%s"
-    }"""
-        % worker_id
-    )
+    collector_response = f"""{{
+    "workerId": "{worker_id}"
+    }}"""
     responses.add(
         responses.PUT, collector_test_url, json=collector_response, status=201
     )
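Note the brace escaping in the rewritten test: inside an f-string, literal braces must be doubled, so {{ and }} emit the { and } of the JSON body while {worker_id} interpolates. A quick check:

    worker_id = "abcd"
    body = f"""{{
    "workerId": "{worker_id}"
    }}"""
    assert body == '{\n"workerId": "abcd"\n}'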
10 changes: 5 additions & 5 deletions tests/test_gcs.py
@@ -28,11 +28,11 @@ def test_gcs_deadlock(mock_fetch_google_token: Mock, mock_is_ready: Mock) -> None:

     mock_is_ready.return_value = True  # Mock client ready to not use ADC
     mock_fetch_google_token.side_effect = [
-        ("FakeToken1", utcnow()),  # type: ignore[no-untyped-call]
-        ("FakeToken2", utcnow()),  # type: ignore[no-untyped-call]
-        ("FakeToken3", utcnow()),  # type: ignore[no-untyped-call]
-        ("FakeToken4", utcnow()),  # type: ignore[no-untyped-call]
-        ("FakeToken5Valid", utcnow() + timedelta(seconds=30)),  # type: ignore[no-untyped-call]
+        ("FakeToken1", utcnow()),
+        ("FakeToken2", utcnow()),
+        ("FakeToken3", utcnow()),
+        ("FakeToken4", utcnow()),
+        ("FakeToken5Valid", utcnow() + timedelta(seconds=30)),
     ]

     gcs_path = "gs://ssb-dapla-pseudo-data-produkt-test/integration_tests_data/personer.parquet"
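The dropped # type: ignore[no-untyped-call] comments follow from the pyproject.toml change above: with disallow_untyped_calls set to false, mypy no longer reports these calls, so the suppressions became unnecessary.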
