Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] annotation wrapper class #282

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .spellcheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ matrix:
camel-case: true
mode: markdown
sources:
- "**/*.md|!venv/**"
- "**/*.md|!venv*/**"
dictionary:
wordlists:
- .spellcheck-en-custom.txt
Expand Down
249 changes: 249 additions & 0 deletions src/otk/annotation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
import copy
import os
import pathlib
from typing import Any, Optional, TypeVar, Generic, Union

# import yaml
# from yaml._yaml import SequenceNode, ScalarNode, MappingNode
# from yaml.resolver import BaseResolver

AnnotatedNodeT = TypeVar('AnnotatedNodeT')


class AnnotatedNode(Generic[AnnotatedNodeT]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally would just call it OtkValue or OtkNode (as we are not writing a generic library here (yet))


# to be able to print relative paths
_basedirs = [os.path.curdir]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(unrelated but I can't help noticing) - using a class attribute like this is problematic pattern as this is shared access all instance, I think it's best to avoid this kind of shared state as it may lead to subtle surprises down the line.


def __init__(self, value: Optional[AnnotatedNodeT] = None, annotations: Optional[dict[str, Any]] = None):
self.value: AnnotatedNodeT = self.deep_convert(value)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that self.value should be the real value, not anything we convert. If it's a bool, it is a bool, dict etc. This way we avoid the issue that AnnoatedBool(true) is True is actually (and surprisingly) False. With the real value at value this beomces the correct AnnotatedValue(True).value is True == True .

Fwiw, a similar comment as in #280 (comment) - I think the yaml buildup will give us the right types here and we don't need to convert (except for the loading of data fromthe external which we can probably do there instead of here)

self.annotations: dict[str, Any] = annotations or {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would just use a dataclass like here with the minimal amount of informatin we need

# don't nest Path in Path - sometimes happens in wrapped calls
if isinstance(self.value, AnnotatedPath) and isinstance(self, AnnotatedPath):
# type/mypy: TBD fix typing - not sure why mypy has this false positive here
self.squash_annotations([self.value]) # type: ignore[attr-defined]
self.value = self.value.value # type: ignore[attr-defined]

def __iter__(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If .value the real value, could we try to get as minimal as we can on the "magic" methods here? It seems an OtkNode would need __eq__ and __hash__ so that values it can be get/set into dicts. Then we would need a Otk{List,Dict} with __getitem__ so that we can navigate the resulting json in a "natural" way but beyond that I am not sure we need more if we always use ".value" for the real manipulations.

return iter(self.value)

def __getitem__(self, item):
if not isinstance(self.value, (dict, list, str)):
raise KeyError
if isinstance(self.value, dict) and isinstance(item, int):
return list(self.value.keys())[item]
return self.value[item]

def __setitem__(self, key, value):
if not isinstance(self.value, (dict, list)):
raise KeyError
self.value[key] = value

def get_annotations(self):
return copy.deepcopy(self.annotations)

def set_annotations(self, annotations: dict) -> None:
self.annotations = annotations

def __eq__(self, other):
if isinstance(other, AnnotatedNode):
return self.value == other.value
return self.value == other

def __hash__(self):
return hash(self.value)

@classmethod
def get_specific_type(cls, value):
ret = None
if isinstance(value, AnnotatedNode):
ret = value
elif isinstance(value, dict):
ret = AnnotatedDict(value)
elif isinstance(value, list):
ret = AnnotatedList(value)
elif isinstance(value, str):
ret = AnnotatedStr(value)
elif isinstance(value, int):
ret = AnnotatedInt(value)
elif isinstance(value, float):
ret = AnnotatedFloat(value)
elif isinstance(value, bool):
ret = AnnotatedBool(value)
if ret is None:
return value
return ret

@classmethod
def deep_convert(cls, data: Any) -> Any:
"""Recursively convert nested dicts and lists into AnnotatedDict and HiddenAttrList."""
if isinstance(data, AnnotatedNode):
return data
if isinstance(data, dict):
return {key: cls.get_specific_type(value) for key, value in data.items()}
if isinstance(data, list):
return [cls.get_specific_type(item) for item in data]

return data

def deep_dump(self) -> Any:
if isinstance(self.value, dict):
return {key: value.deep_dump() for key, value in self.value.items() if not value is None}
if isinstance(self.value, list):
return [item.deep_dump() for item in self.value]

return self.value

def __str__(self):
return str(self.deep_dump())

def normalize_path(self, path: pathlib.Path) -> pathlib.Path:
filename = path
if isinstance(self._basedirs, list):
for d in self._basedirs:
filename = pathlib.Path(os.path.relpath(filename, d))
else:
filename = os.path.relpath(filename, self._basedirs)
return filename

def add_source_attributes(self, key_data, prefix=""):
line_number = key_data.start_mark.line + 1
line_number_end = key_data.end_mark.line + 1
column = key_data.start_mark.column + 1
column_end = key_data.end_mark.column + 1
filename = self.normalize_path(key_data.start_mark.name)
self.annotations[f"{prefix}src"] = f"{filename}:{line_number}"
self.annotations[f"{prefix}filename"] = filename
self.annotations[f"{prefix}line_number"] = line_number
self.annotations[f"{prefix}line_number_end"] = line_number_end
self.annotations[f"{prefix}column"] = column
self.annotations[f"{prefix}column_end"] = column_end

def squash_annotations(self, annotation_list: list) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe merge_annotations as the other ones are merged?

""" just aggregates all annotations in the list.

new annotation keys get appended to the resulting dict

duplicate annotations will get converted to string
(if you need an exception for a key please implement here)

for "src" there is an exception
this is expected to be in the form "filename:line_number"
and will be sanely unified
"""
ret: dict[str, Any] = {}
files: dict[str, str] = {}
for item in [self.annotations] + annotation_list:
if not isinstance(item, AnnotatedNode):
continue
for k in item.annotations.keys():
if k == "src":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merging via string manipulation is not ideal and the special handling of "src" even less. I think we should go YAGNI on this too and just have "src" as a dataclass and then we can have a list of dataclasses if there are multiple annotations (and a maybe a str to format them nicley?)

src = item.annotations[k].split(":")
if src[0] in files:
if files[src[0]] != src[1]:
files[src[0]] += ", " + src[1]
else:
files[src[0]] = src[1]
else:
if k in ret:
if ret[k] != item.annotations[k]:
ret[k] = str(ret[k]) + ", " + str(item.annotations[k])
else:
ret[k] = item.annotations[k]

src_ret = "\n".join([f"* {k}:{v}" for k, v in files.items()])
if len(src_ret) > 0:
ret["src"] = src_ret

self.annotations = ret


# for isinstance() we need actual classes
class AnnotatedList(AnnotatedNode[list]):
pass


class AnnotatedDict(AnnotatedNode[dict]):
def __init__(self, value: Optional[dict] = None, annotations: Optional[dict[str, Any]] = None):
super().__init__(value, annotations)
# default to an empty dictionary on None
if value is None:
self.value = {}


class AnnotatedStr(AnnotatedNode[str]):
pass


class AnnotatedInt(AnnotatedNode[int]):
pass


class AnnotatedFloat(AnnotatedNode[float]):
pass


class AnnotatedBool(AnnotatedNode[bool]):
pass


class AnnotatedPath(AnnotatedNode[pathlib.Path]):
def __init__(self, value: Optional[Union[pathlib.Path, str]] = None, annotations: Optional[dict[str, Any]] = None):
if isinstance(value, str):
value = pathlib.Path(value)
super().__init__(value, annotations)
if not isinstance(self.value, pathlib.Path):
self.value = pathlib.Path(str(self.value))

def fspath_with_include(self) -> str:
src = None
file_name = self.normalize_path(self.value)
src = self.annotations.get("src")
if src is not None:
return f"{file_name} (included from {src})"
return str(file_name)


# class AnnotatedDumper(yaml.Dumper):
# verbose = True
#
# def represent_data(self, data):
# if isinstance(data, Annotated):
# sequence = []
# if isinstance(data.value, dict):
# if data.annotations and self.verbose:
# sequence.append(self.represent_data(f"### {data.annotations}"))
# for key, value in data.value.items():
# if (isinstance(value, Annotated)
# and not isinstance(value.value, (dict, list))
# and value.annotations):
# sequence.append(self.represent_data(f"### {key}: {value.annotations}"))
# sequence.append(self.represent_data({key: value}))
# elif isinstance(data.value, list):
# if data.annotations and self.verbose:
# sequence.append(self.represent_data(f"### {data.annotations}"))
# for i, entry in enumerate(data.value):
# if (isinstance(entry, Annotated)
# and not isinstance(entry.value, (dict, list))
# and entry.annotations):
# sequence.append(self.represent_data(f"### [{i}] {entry.annotations}"))
# sequence.append(self.represent_data(entry))
# else:
# return self.represent_data(data.value)
# return SequenceNode(tag=BaseResolver.DEFAULT_SEQUENCE_TAG,value=sequence)
#
# elif isinstance(data, dict):
# ret_list = []
# for key, value in data.items():
# ret_list.append(
# (ScalarNode(tag=BaseResolver.DEFAULT_SCALAR_TAG, value=key),
# self.represent_data(value)
# )
# )
# return MappingNode(tag=BaseResolver.DEFAULT_MAPPING_TAG, value=ret_list)
#
# elif isinstance(data, list):
# ret = SequenceNode(tag=BaseResolver.DEFAULT_SEQUENCE_TAG, value=[self.represent_data(v) for v in data])
# else:
# ret = ScalarNode(tag=BaseResolver.DEFAULT_SCALAR_TAG, value=str(data))
# return ret
13 changes: 8 additions & 5 deletions src/otk/command.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import argparse
import logging
import pathlib
import sys
from typing import List

from . import __version__
from .annotation import AnnotatedPath
from .document import Omnifest

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -43,9 +43,9 @@ def _process(arguments: argparse.Namespace, dry_run: bool) -> int:
dst = sys.stdout if arguments.output is None else open(arguments.output, "w", encoding="utf-8")

if arguments.input is None:
path = pathlib.Path(f"/proc/self/fd/{sys.stdin.fileno()}")
path = AnnotatedPath(f"/proc/self/fd/{sys.stdin.fileno()}")
else:
path = pathlib.Path(arguments.input)
path = AnnotatedPath(arguments.input)

# First pass of resolving the otk file is "shallow", it will not run
# externals and not resolve anything under otk.target.*
Expand All @@ -57,12 +57,12 @@ def _process(arguments: argparse.Namespace, dry_run: bool) -> int:
target_available = doc.targets
target_requested = arguments.target
if not target_requested:
if len(target_available) > 1:
if len(target_available.value) > 1:
log.fatal("INPUT contains multiple targets, `-t` is required")
return 1
# set the requested target to the default case now that we know that
# there aren't multiple targets available and none are requested
target_requested = list(target_available.keys())[0]
target_requested = list(target_available.value.keys())[0]

if target_requested not in target_available:
log.fatal("requested target %r does not exist in INPUT", target_requested)
Expand All @@ -78,6 +78,9 @@ def _process(arguments: argparse.Namespace, dry_run: bool) -> int:
if not dry_run:
dst.write(doc.as_target_string())

if arguments.output is not None:
dst.close()

return 0


Expand Down
Loading
Loading