Make typed functions pass mypy
check.
This commit is contained in:
parent
99e44fd8b2
commit
5ed34df57a
@ -1,3 +1,6 @@
|
||||
from typing import Union
|
||||
|
||||
|
||||
class DrawerNode:
|
||||
def __init__(self):
|
||||
self.children = []
|
||||
@ -102,4 +105,15 @@ class CodeBlock(BlockNode):
|
||||
def __repr__(self):
|
||||
return "<Code: {}>".format(len(self.lines))
|
||||
|
||||
DomNode = Union[DrawerNode,
|
||||
PropertyNode,
|
||||
ListGroupNode,
|
||||
TableNode,
|
||||
TableSeparatorRow,
|
||||
TableRow,
|
||||
Text,
|
||||
ListItem,
|
||||
BlockNode,
|
||||
]
|
||||
|
||||
from .utils import get_raw_contents
|
||||
|
116
org_rw/org_rw.py
116
org_rw/org_rw.py
@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import collections
|
||||
from ctypes import ArgumentError
|
||||
import difflib
|
||||
import logging
|
||||
import os
|
||||
@ -8,7 +9,9 @@ import re
|
||||
import sys
|
||||
from datetime import date, datetime, timedelta
|
||||
from enum import Enum
|
||||
from typing import Generator, List, Optional, Tuple, Union
|
||||
from typing import cast, Iterator, List, Optional, Tuple, Union
|
||||
|
||||
from .types import HeadlineDict
|
||||
|
||||
from . import dom
|
||||
|
||||
@ -154,12 +157,12 @@ class RangeInRaw:
|
||||
contents.insert(start_idx + i + 1, element)
|
||||
|
||||
|
||||
def unescape_block_lines(lines: str) -> str:
|
||||
def unescape_block_lines(block: str) -> str:
|
||||
"""
|
||||
Remove leading ',' from block_lines if they escape `*` characters.
|
||||
"""
|
||||
i = 0
|
||||
lines = lines.split('\n')
|
||||
lines = block.split('\n')
|
||||
while i < len(lines):
|
||||
line = lines[i]
|
||||
if (line.lstrip(' ').startswith(',')
|
||||
@ -177,8 +180,8 @@ def unescape_block_lines(lines: str) -> str:
|
||||
def get_links_from_content(content):
|
||||
in_link = False
|
||||
in_description = False
|
||||
link_value = []
|
||||
link_description = []
|
||||
link_value: List[str] = []
|
||||
link_description: List[str] = []
|
||||
|
||||
for i, tok in enumerate(get_tokens(content)):
|
||||
if isinstance(tok, LinkToken):
|
||||
@ -210,8 +213,8 @@ def text_to_dom(tokens, item):
|
||||
|
||||
in_link = False
|
||||
in_description = False
|
||||
link_value = []
|
||||
link_description = []
|
||||
link_value: List[str] = []
|
||||
link_description: List[str] = []
|
||||
|
||||
contents = []
|
||||
|
||||
@ -361,9 +364,10 @@ class Headline:
|
||||
+ self.delimiters
|
||||
)
|
||||
|
||||
tree = []
|
||||
current_node = None
|
||||
indentation_tree = []
|
||||
tree: List[dom.DomNode] = []
|
||||
current_node: Optional[dom.DomNode] = None
|
||||
indentation_tree: List[dom.DomNode] = []
|
||||
contents: Optional[str] = None
|
||||
|
||||
for line in sorted(everything, key=get_line):
|
||||
if isinstance(current_node, dom.CodeBlock):
|
||||
@ -404,7 +408,7 @@ class Headline:
|
||||
):
|
||||
node.append(dom.Text(line))
|
||||
current_node = node
|
||||
contents = []
|
||||
contents = None
|
||||
break
|
||||
elif ((not isinstance(node, dom.TableNode)) and
|
||||
(type(node) not in NON_FINISHED_GROUPS)
|
||||
@ -419,7 +423,7 @@ class Headline:
|
||||
tree_up.pop(-1)
|
||||
else:
|
||||
current_node = None
|
||||
contents = []
|
||||
contents = None
|
||||
tree.append(dom.Text(text_to_dom(line.contents, line)))
|
||||
indentation_tree = tree_up
|
||||
|
||||
@ -669,6 +673,8 @@ class Headline:
|
||||
parsed = as_time_range
|
||||
else:
|
||||
parsed = OrgTime.parse(time_seg)
|
||||
|
||||
if parsed is not None:
|
||||
times.append(parsed)
|
||||
|
||||
return times
|
||||
@ -1130,6 +1136,9 @@ def parse_time(value: str) -> Union[None, TimeRange, OrgTime]:
|
||||
# @TODO properly consider "=> DURATION" section
|
||||
start, end = value.split("=")[0].split("--")
|
||||
as_time_range = parse_org_time_range(start, end)
|
||||
if as_time_range is None:
|
||||
return None
|
||||
|
||||
if (as_time_range.start_time is not None) and (
|
||||
as_time_range.end_time is not None
|
||||
):
|
||||
@ -1142,8 +1151,13 @@ def parse_time(value: str) -> Union[None, TimeRange, OrgTime]:
|
||||
return None
|
||||
|
||||
|
||||
def parse_org_time_range(start, end) -> TimeRange:
|
||||
return TimeRange(OrgTime.parse(start), OrgTime.parse(end))
|
||||
def parse_org_time_range(start, end) -> Optional[TimeRange]:
|
||||
start_time = OrgTime.parse(start)
|
||||
end_time = OrgTime.parse(end)
|
||||
|
||||
if start_time is None or end_time is None:
|
||||
return None
|
||||
return TimeRange(start_time, end_time)
|
||||
|
||||
|
||||
class OrgTime:
|
||||
@ -1170,12 +1184,13 @@ class OrgTime:
|
||||
return f"OrgTime({self.to_raw()})"
|
||||
|
||||
@classmethod
|
||||
def parse(self, value: str) -> OrgTime:
|
||||
def parse(self, value: str) -> Optional[OrgTime]:
|
||||
if m := ACTIVE_TIME_STAMP_RE.match(value):
|
||||
active = True
|
||||
elif m := INACTIVE_TIME_STAMP_RE.match(value):
|
||||
active = False
|
||||
else:
|
||||
# raise ArgumentError("Cannot parse `{}` as OrgTime".format(value))
|
||||
return None
|
||||
|
||||
repetition = None
|
||||
@ -1219,7 +1234,7 @@ class OrgTime:
|
||||
)
|
||||
|
||||
|
||||
def time_from_str(s: str) -> OrgTime:
|
||||
def time_from_str(s: str) -> Optional[OrgTime]:
|
||||
return OrgTime.parse(s)
|
||||
|
||||
|
||||
@ -1280,7 +1295,7 @@ class Line:
|
||||
|
||||
|
||||
class Link:
|
||||
def __init__(self, value: str, description: str, origin: RangeInRaw):
|
||||
def __init__(self, value: str, description: Optional[str], origin: RangeInRaw):
|
||||
self._value = value
|
||||
self._description = description
|
||||
self._origin = origin
|
||||
@ -1452,7 +1467,7 @@ class Verbatim:
|
||||
return f"{self.Marker}{raw}{self.Marker}"
|
||||
|
||||
|
||||
def is_pre(char: str) -> bool:
|
||||
def is_pre(char: Optional[str]) -> bool:
|
||||
if isinstance(char, str):
|
||||
return char in "\n\r\t -({'\""
|
||||
else:
|
||||
@ -1499,7 +1514,7 @@ def tokenize_contents(contents: str):
|
||||
tokens = []
|
||||
last_char = None
|
||||
|
||||
text = []
|
||||
text: List[str] = []
|
||||
closes = set()
|
||||
in_link = False
|
||||
in_link_description = False
|
||||
@ -1619,7 +1634,7 @@ def parse_contents(raw_contents: List[RawLine]):
|
||||
return []
|
||||
|
||||
blocks = []
|
||||
current_block = []
|
||||
current_block: List[RawLine] = []
|
||||
|
||||
for line in raw_contents:
|
||||
if len(current_block) == 0:
|
||||
@ -1627,6 +1642,7 @@ def parse_contents(raw_contents: List[RawLine]):
|
||||
current_line = line.linenum
|
||||
current_block.append(line)
|
||||
else:
|
||||
current_line = cast(int, current_line)
|
||||
if line.linenum == current_line + 1:
|
||||
# Continue with the current block
|
||||
current_line = line.linenum
|
||||
@ -1652,8 +1668,8 @@ def parse_content_block(raw_contents: Union[List[RawLine],str]):
|
||||
for line in raw_contents:
|
||||
contents_buff.append(line.line)
|
||||
|
||||
contents = "\n".join(contents_buff)
|
||||
tokens = tokenize_contents(contents)
|
||||
contents_buff_text = "\n".join(contents_buff)
|
||||
tokens = tokenize_contents(contents_buff_text)
|
||||
if isinstance(raw_contents, str):
|
||||
current_line = None
|
||||
else:
|
||||
@ -1893,7 +1909,7 @@ class OrgDoc:
|
||||
def getTopHeadlines(self):
|
||||
return self.headlines
|
||||
|
||||
def getAllHeadlines(self) -> Generator[Headline]:
|
||||
def getAllHeadlines(self) -> Iterator[Headline]:
|
||||
todo = self.headlines[::-1] # We go backwards, to pop/append and go depth-first
|
||||
while len(todo) != 0:
|
||||
hl = todo.pop()
|
||||
@ -2016,15 +2032,16 @@ class OrgDoc:
|
||||
|
||||
class OrgDocReader:
|
||||
def __init__(self):
|
||||
self.headlines: List[Headline] = []
|
||||
self.keywords: List[Property] = []
|
||||
self.headline_hierarchy: List[OrgDoc] = []
|
||||
self.headlines: List[HeadlineDict] = []
|
||||
self.keywords: List[Keyword] = []
|
||||
self.headline_hierarchy: List[HeadlineDict] = []
|
||||
self.contents: List[RawLine] = []
|
||||
self.delimiters: List[DelimiterLine] = []
|
||||
self.list_items: List[ListItem] = []
|
||||
self.table_rows: List[TableRow] = []
|
||||
self.structural: List = []
|
||||
self.properties: List = []
|
||||
self.current_drawer: Optional[List] = None
|
||||
|
||||
def finalize(self):
|
||||
return OrgDoc(
|
||||
@ -2037,12 +2054,12 @@ class OrgDocReader:
|
||||
)
|
||||
|
||||
## Construction
|
||||
def add_headline(self, linenum: int, match: re.Match) -> int:
|
||||
def add_headline(self, linenum: int, match: re.Match):
|
||||
# Position reader on the proper headline
|
||||
stars = match.group("stars")
|
||||
depth = len(stars)
|
||||
|
||||
headline = {
|
||||
headline: HeadlineDict = {
|
||||
"linenum": linenum,
|
||||
"orig": match,
|
||||
"title": match.group("line"),
|
||||
@ -2058,27 +2075,35 @@ class OrgDocReader:
|
||||
"table_rows": [],
|
||||
}
|
||||
|
||||
while (depth - 1) > len(self.headline_hierarchy):
|
||||
headline_hierarchy: List[Optional[HeadlineDict]] = list(self.headline_hierarchy)
|
||||
|
||||
while (depth - 1) > len(headline_hierarchy):
|
||||
# Introduce structural headlines
|
||||
self.headline_hierarchy.append(None)
|
||||
while depth <= len(self.headline_hierarchy):
|
||||
self.headline_hierarchy.pop()
|
||||
headline_hierarchy.append(None)
|
||||
while depth <= len(headline_hierarchy):
|
||||
headline_hierarchy.pop()
|
||||
|
||||
if depth == 1:
|
||||
self.headlines.append(headline)
|
||||
else:
|
||||
parent_idx = len(self.headline_hierarchy) - 1
|
||||
while self.headline_hierarchy[parent_idx] is None:
|
||||
parent_idx = len(headline_hierarchy) - 1
|
||||
while headline_hierarchy[parent_idx] is None:
|
||||
parent_idx -= 1
|
||||
self.headline_hierarchy[parent_idx]["children"].append(headline)
|
||||
self.headline_hierarchy.append(headline)
|
||||
parent_headline = headline_hierarchy[parent_idx]
|
||||
assert parent_headline is not None
|
||||
parent_headline["children"].append(headline)
|
||||
headline_hierarchy.append(headline)
|
||||
|
||||
if all([hl is not None for hl in self.headline_hierarchy]):
|
||||
if all([hl is not None for hl in headline_hierarchy]):
|
||||
if not ([ len(hl['orig'].group('stars')) for hl in self.headline_hierarchy ]
|
||||
== list(range(1, len(self.headline_hierarchy) + 1))):
|
||||
raise AssertionError('Error on Headline Hierarchy')
|
||||
else:
|
||||
raise AssertionError('None found on headline hierarchy')
|
||||
|
||||
def add_list_item_line(self, linenum: int, match: re.Match) -> int:
|
||||
self.headline_hierarchy = cast(List[HeadlineDict], headline_hierarchy)
|
||||
|
||||
def add_list_item_line(self, linenum: int, match: re.Match) -> ListItem:
|
||||
li = ListItem(
|
||||
linenum=linenum,
|
||||
match=match,
|
||||
@ -2103,7 +2128,7 @@ class OrgDocReader:
|
||||
self.headline_hierarchy[-1]["list_items"].append(li)
|
||||
return li
|
||||
|
||||
def add_table_line(self, linenum: int, line: str) -> int:
|
||||
def add_table_line(self, linenum: int, line: str):
|
||||
chunks = line.split('|')
|
||||
indentation = len(chunks[0])
|
||||
if chunks[-1].strip() == '':
|
||||
@ -2128,7 +2153,7 @@ class OrgDocReader:
|
||||
else:
|
||||
self.headline_hierarchy[-1]["table_rows"].append(row)
|
||||
|
||||
def add_keyword_line(self, linenum: int, match: re.Match) -> int:
|
||||
def add_keyword_line(self, linenum: int, match: re.Match):
|
||||
options = match.group("options")
|
||||
kw = Keyword(
|
||||
linenum,
|
||||
@ -2188,22 +2213,19 @@ class OrgDocReader:
|
||||
else:
|
||||
self.headline_hierarchy[-1]["structural"].append((linenum, line))
|
||||
|
||||
def add_node_properties_line(self, linenum: int, match: re.Match) -> int:
|
||||
def add_node_properties_line(self, linenum: int, match: re.Match):
|
||||
key = match.group("key")
|
||||
value = match.group("value").strip()
|
||||
|
||||
if as_time := parse_time(value):
|
||||
value = as_time
|
||||
|
||||
try:
|
||||
self.current_drawer.append(Property(linenum, match, key, value, None))
|
||||
except Exception:
|
||||
if "current_drawer" not in dir(self): # Throw a better error on this case
|
||||
if self.current_drawer is None: # Throw a better error on this case
|
||||
raise Exception(
|
||||
"Found properties before :PROPERTIES: line. Error on Org file?"
|
||||
)
|
||||
else:
|
||||
raise # Let the exception pass
|
||||
|
||||
self.current_drawer.append(Property(linenum, match, key, value, None))
|
||||
|
||||
def read(self, s, environment):
|
||||
lines = s.split("\n")
|
||||
|
17
org_rw/types.py
Normal file
17
org_rw/types.py
Normal file
@ -0,0 +1,17 @@
|
||||
import re
|
||||
from typing import List, TypedDict
|
||||
|
||||
class HeadlineDict(TypedDict):
|
||||
linenum: int
|
||||
orig: re.Match
|
||||
title: str
|
||||
contents: List
|
||||
children: List
|
||||
keywords: List
|
||||
properties: List
|
||||
logbook: List
|
||||
structural: List
|
||||
delimiters: List
|
||||
results: List # TODO: Move to each specific code block?
|
||||
list_items: List
|
||||
table_rows: List
|
Loading…
Reference in New Issue
Block a user