# Revision: c5845d670f
# Note that we might want to restore the offset correction at some point
# to allow looking at the line positions after the load is completed.
from __future__ import annotations
|
|
from datetime import timedelta
|
|
import collections
|
|
import difflib
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
from datetime import date, datetime, timedelta
|
|
from enum import Enum
|
|
from typing import Any, cast, Iterator, List, Literal, Optional, Tuple, TypedDict, TypeVar, Union
|
|
|
|
from .types import HeadlineDict
|
|
|
|
from . import dom
|
|
|
|
|
|
# Number of context lines shown when reporting a non-reproducible save.
DEBUG_DIFF_CONTEXT = 10

# Fallback TODO/DONE keyword sets, used when a document declares none.
DEFAULT_TODO_KEYWORDS = ["TODO"]
DEFAULT_DONE_KEYWORDS = ["DONE"]

# Default parser environment, mirroring the relevant org-mode variables.
BASE_ENVIRONMENT = {
    "org-footnote-section": "Footnotes",
    "org-todo-keywords": ' '.join(DEFAULT_TODO_KEYWORDS) + ' | ' + ' '.join(DEFAULT_DONE_KEYWORDS),
    # In-buffer "#+..." option keywords recognized by the parser.
    "org-options-keywords": (
        "ARCHIVE:",
        "AUTHOR:",
        "BIND:",
        "CATEGORY:",
        "COLUMNS:",
        "CREATOR:",
        "DATE:",
        "DESCRIPTION:",
        "DRAWERS:",
        "EMAIL:",
        "EXCLUDE_TAGS:",
        "FILETAGS:",
        "INCLUDE:",
        "INDEX:",
        "KEYWORDS:",
        "LANGUAGE:",
        "MACRO:",
        "OPTIONS:",
        "PROPERTY:",
        "PRIORITIES:",
        "SELECT_TAGS:",
        "SEQ_TODO:",
        "SETUPFILE:",
        "STARTUP:",
        # FIX: was `"TAGS:" "TITLE:"` — implicit string concatenation that
        # produced the single (useless) entry "TAGS:TITLE:"; a comma was
        # clearly intended, as with every other entry in this tuple.
        "TAGS:",
        "TITLE:",
        "TODO:",
        "TYP_TODO:",
        # NOTE(review): the next two entries duplicate ones above; kept to
        # preserve the tuple's existing contents.
        "SELECT_TAGS:",
        "EXCLUDE_TAGS:",
    ),
}
# Trailing tag group on a headline, e.g. ":work:urgent:".
HEADLINE_TAGS_RE = re.compile(r"((:(\w|[0-9_@#%])+)+:)\s*$")
# A headline: leading stars, spacing, then the rest of the line.
HEADLINE_RE = re.compile(r"^(?P<stars>\*+)(?P<spacing>\s+)(?P<line>.*?)$")
# "#+KEY[OPTIONS]: VALUE" keyword lines.
KEYWORDS_RE = re.compile(
    r"^(?P<indentation>\s*)#\+(?P<key>[^:\[]+)(\[(?P<options>[^\]]*)\])?:(?P<spacing>\s*)(?P<value>.*)$"
)
# Drawer delimiters, e.g. ":PROPERTIES:" ... ":END:".
DRAWER_START_RE = re.compile(r"^(?P<indentation>\s*):([^:]+):(?P<end_indentation>\s*)$")
DRAWER_END_RE = re.compile(r"^(?P<indentation>\s*):END:(?P<end_indentation>\s*)$", re.I)
# ":KEY: VALUE" (or ":KEY+: VALUE") lines inside a property drawer.
NODE_PROPERTIES_RE = re.compile(
    r"^(?P<indentation>\s*):(?P<key>[^ ()+:]+)(?P<plus>\+)?:(?P<spacing>\s*)(?P<value>.+)$"
)
# A plain content line: not a headline, keyword, drawer or table line.
RAW_LINE_RE = re.compile(r"^\s*([^\s#:*|]|$)")
# Org timestamp body (without the surrounding <> or []), with named groups
# for date, optional day-of-week, optional time (range) and repetition mark.
BASE_TIME_STAMP_RE = r"(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})( ?(?P<dow>[^ ]+))?( (?P<start_hour>\d{1,2}):(?P<start_minute>\d{1,2})(-+(?P<end_hour>\d{1,2}):(?P<end_minute>\d{1,2}))?)?(?P<repetition> (?P<rep_mark>(\+|\+\+|\.\+|-|--))(?P<rep_value>\d+)(?P<rep_unit>[hdwmy]))?"
# Same shape as BASE_TIME_STAMP_RE but with no named groups, so it can be
# embedded several times inside a single pattern (see PLANNING_RE).
CLEAN_TIME_STAMP_RE = r"\d{4}-\d{2}-\d{2}( ?([^ ]+))?( (\d{1,2}):(\d{1,2})(-+(\d{1,2}):(\d{1,2}))?)?( (\+|\+\+|\.\+|-|--)\d+[hdwmy])?"

# <...> timestamps are "active", [...] timestamps are "inactive".
ACTIVE_TIME_STAMP_RE = re.compile(r"<{}>".format(BASE_TIME_STAMP_RE))
INACTIVE_TIME_STAMP_RE = re.compile(r"\[{}\]".format(BASE_TIME_STAMP_RE))

# Planning line under a headline: any mix of SCHEDULED / CLOSED / DEADLINE,
# each followed by a (possibly ranged) timestamp in <> or [].
PLANNING_RE = re.compile(
    r"(?P<indentation>\s*)"
    + r"(SCHEDULED:\s*(?P<scheduled>[<\[]"
    + CLEAN_TIME_STAMP_RE
    + r"[>\]](--[<\[]"
    + CLEAN_TIME_STAMP_RE
    + r"[>\]])?)\s*"
    + r"|CLOSED:\s*(?P<closed>[<\[]"
    + CLEAN_TIME_STAMP_RE
    + r"[>\]](--[<\[]"
    + CLEAN_TIME_STAMP_RE
    + r"[>\]])?)\s*"
    + r"|DEADLINE:\s*(?P<deadline>[<\[]"
    + CLEAN_TIME_STAMP_RE
    + r"[>\]](--[<\[]"
    + CLEAN_TIME_STAMP_RE
    + r"[>\]])?)\s*"
    r")+\s*"
)
# List items: bullet or counter, optional "[ ]"/"[X]" checkbox, optional
# description-list "tag ::", then the item content.
LIST_ITEM_RE = re.compile(
    r"(?P<indentation>\s*)((?P<bullet>[*\-+])|((?P<counter>\d|[a-zA-Z])(?P<counter_sep>[.)]))) ((?P<checkbox_indentation>\s*)\[(?P<checkbox_value>[ Xx])\])?((?P<tag_indentation>\s*)(?P<tag>.*?)::)?(?P<content>.*)"
)

# Bare http(s) URLs appearing outside explicit [[...]] link markup.
IMPLICIT_LINK_RE = re.compile(r"(https?:[^<> ]*[a-zA-Z0-9])")

# Org-Babel
BEGIN_BLOCK_RE = re.compile(r"^\s*#\+BEGIN_(?P<subtype>[^ ]+)(?P<arguments>.*)$", re.I)
END_BLOCK_RE = re.compile(r"^\s*#\+END_(?P<subtype>[^ ]+)\s*$", re.I)
RESULTS_DRAWER_RE = re.compile(r"^\s*:results:\s*$", re.I)

# A parsed source block: name, code, evaluation result, header arguments.
CodeSnippet = collections.namedtuple(
    "CodeSnippet", ("name", "content", "result", "arguments")
)
# Groupings
# Node types that are still "open" while building the DOM: a new group may
# be attached to them because no explicit terminator has been seen yet.
NON_FINISHED_GROUPS = (
    type(None),
    dom.ListGroupNode,
    dom.ResultsDrawerNode,
    dom.PropertyDrawerNode,
)
# Node types that need no explicit terminator line.
FREE_GROUPS = (dom.CodeBlock,)
# States
class HeadlineState(TypedDict):
    """A single TODO/DONE state keyword declared by a document."""

    # To be extended to handle keyboard shortcuts
    # The state keyword itself (e.g. "TODO").
    name: str
class OrgDocDeclaredStates(TypedDict):
    """TODO-state keywords declared by a document, split by completion."""

    # States that mark a headline as still pending (e.g. TODO).
    not_completed: List[HeadlineState]
    # States that mark a headline as finished (e.g. DONE).
    completed: List[HeadlineState]
class NonReproducibleDocument(Exception):
    """
    Exception thrown when a document would be saved as different contents
    from what it's loaded from.
    """
    # The `pass` statement that used to follow the docstring was redundant
    # (the docstring is already a valid class body) and has been removed.
def get_tokens(value):
    """Return the token list backing `value` (a Text, a RawLine or a list)."""
    if isinstance(value, list):
        return value
    if isinstance(value, Text):
        return value.contents
    if isinstance(value, RawLine):
        return [value.line]
    raise Exception("Unknown how to get tokens from: {}".format(value))
class RangeInRaw:
    """A [start_token, end_token] span inside a token list.

    The anchor tokens are tracked by identity (``id()``), so the range
    stays valid while surrounding tokens are edited, as long as the two
    anchors themselves remain in the list.
    """

    def __init__(self, content, start_token, end_token):
        self._content = content
        self._start_id = id(start_token)
        self._end_id = id(end_token)

    def update_range(self, new_contents):
        """Replace the tokens strictly between the anchors with `new_contents`.

        Raises an Exception if either anchor is no longer present.
        """
        contents = self._content
        if isinstance(self._content, Text):
            contents = self._content.contents

        # Find start token
        for start_idx, tok in enumerate(contents):
            if id(tok) == self._start_id:
                break
        else:
            raise Exception("Start token not found")

        # Find end token (searched from the start anchor onwards; `offset`
        # is its distance from the start anchor).
        for offset, tok in enumerate(contents[start_idx:]):
            if id(tok) == self._end_id:
                break
        else:
            raise Exception("End token not found")

        # Remove old contents: the `offset - 1` tokens strictly between
        # the anchors, popped repeatedly from just after the start anchor.
        for i in range(1, offset):
            contents.pop(start_idx + 1)

        # Add new ones
        for i, element in enumerate(new_contents):
            contents.insert(start_idx + i + 1, element)
def unescape_block_lines(block: str) -> str:
    """
    Remove leading ',' from block lines when it escapes a `*` character.

    Inside org blocks, a line starting with '*' is escaped as ',*' so it
    is not read as a headline; this reverses that escaping.
    """
    unescaped = []
    for raw_line in block.split("\n"):
        without_spaces = raw_line.lstrip(" ")
        if without_spaces.startswith(",") and raw_line.lstrip(" ,").startswith("*"):
            # Drop the first ',' (the escape marker), keeping indentation.
            comma_at = raw_line.index(",")
            raw_line = raw_line[:comma_at] + raw_line[comma_at + 1 :]
        unescaped.append(raw_line)

    return "\n".join(unescaped)
def get_links_from_content(content):
    """Yield every Link found in `content`.

    Explicit [[target][description]] links are reconstructed from
    LinkToken markers; bare http(s) URLs inside plain string tokens are
    yielded as implicit links (with identical target/description and no
    editable range).
    """
    in_link = False
    in_description = False
    link_value: List[str] = []
    link_description: List[str] = []

    for i, tok in enumerate(get_tokens(content)):
        if isinstance(tok, LinkToken):
            if tok.tok_type == LinkTokenType.OPEN_LINK:
                in_link = True
                open_link_token = tok
            elif tok.tok_type == LinkTokenType.OPEN_DESCRIPTION:
                in_description = True
            elif tok.tok_type == LinkTokenType.CLOSE:
                # Closing marker: emit the accumulated link, with an
                # editable range anchored on the open/close tokens.
                rng = RangeInRaw(content, open_link_token, tok)
                yield Link(
                    "".join(link_value),
                    "".join(link_description) if in_description else None,
                    rng,
                )
                in_link = False
                in_description = False
                link_value = []
                link_description = []
        elif isinstance(tok, str) and in_link:
            # Text inside an open link: goes to the description once an
            # OPEN_DESCRIPTION marker has been seen, else to the target.
            if in_description:
                link_description.append(tok)
            else:
                link_value.append(tok)
        elif isinstance(tok, str):
            # Plain text outside links: detect bare URLs.
            implicit_links = IMPLICIT_LINK_RE.findall(tok)
            for link in implicit_links:
                yield Link(cast(str, link), cast(str, link), None)
def text_to_dom(tokens, item):
    """Convert a token stream into DOM-level content.

    Link token runs are folded into Link objects (anchored on `item` via
    RangeInRaw); every other token is passed through unchanged. Returns
    None when `tokens` is None.
    """
    if tokens is None:
        return None

    in_link = False
    in_description = False
    link_value: List[str] = []
    link_description: List[str] = []

    contents = []

    for tok in tokens:
        if isinstance(tok, LinkToken):
            if tok.tok_type == LinkTokenType.OPEN_LINK:
                in_link = True
                open_link_token = tok
            elif tok.tok_type == LinkTokenType.OPEN_DESCRIPTION:
                in_description = True
            elif tok.tok_type == LinkTokenType.CLOSE:
                # Closing marker: materialize the accumulated link.
                rng = RangeInRaw(item, open_link_token, tok)
                contents.append(
                    Link(
                        "".join(link_value),
                        "".join(link_description) if in_description else None,
                        rng,
                    )
                )
                in_link = False
                in_description = False
                link_value = []
                link_description = []
        elif isinstance(tok, str) and in_link:
            # Text inside an open link: target vs. description.
            if in_description:
                link_description.append(tok)
            else:
                link_value.append(tok)
        else:
            contents.append(tok)

    return contents
def get_line(item):
    """Return the line number associated with a parsed element.

    Structural elements are (linenum, content) tuples; everything else
    carries a `linenum` attribute.
    """
    if isinstance(item, (Text, ListItem, Property)):
        return item.linenum
    if isinstance(item, tuple):
        return item[0]
    raise Exception("Unknown item type: {}".format(item))
class Headline:
|
|
def __init__(
    self,
    start_line,
    depth,
    orig,
    properties,
    keywords,
    priority_start,
    priority,
    title_start,
    title,
    state,
    tags_start,
    tags,
    contents,
    children,
    structural,
    delimiters,
    list_items,
    table_rows,
    parent,
    is_todo: bool,
    is_done: bool,
    spacing,
    scheduled: Optional[Time] = None,
    deadline: Optional[Time] = None,
    closed: Optional[Time] = None,
):
    """Build a headline from the elements collected by the reader.

    Most parameters are stored as-is. The `title` string is tokenized
    like any content line, and a planning line (SCHEDULED/CLOSED/
    DEADLINE) directly below the headline — if present — is parsed into
    `scheduled`/`closed`/`deadline` and removed from `contents`.
    """
    self.start_line = start_line
    self.depth = depth
    self.orig = orig
    self.properties = properties
    self.keywords = keywords
    self.priority_start = priority_start
    self.priority = priority
    self.title_start = title_start
    # The title is parsed as a one-line content block (links, markup...).
    self.title = parse_content_block([RawLine(linenum=start_line, line=title)])
    self.state = state
    self.tags_start = tags_start
    # Tags declared on this headline itself; inherited tags are added by
    # the `tags` property.
    self.shallow_tags = tags
    self.contents = contents
    self.children = children
    self.structural = structural
    self.delimiters = delimiters
    self.list_items = list_items
    self.table_rows = table_rows
    self.parent = parent
    self.is_todo = is_todo
    self.is_done = is_done
    self.scheduled = scheduled
    self.deadline = deadline
    self.closed = closed
    self.spacing = spacing

    # Read planning line
    planning_line = self.get_element_in_line(start_line + 1)

    # Ignore if not found or is a structural line
    if planning_line is None or isinstance(planning_line, tuple):
        return

    if m := PLANNING_RE.match(planning_line.get_raw()):
        self._planning_indendation = m.group("indentation")
        self._planning_order = []

        # Remember the order in which the planning keywords appeared so
        # the line can be re-serialized in the same order later.
        keywords = ["SCHEDULED", "CLOSED", "DEADLINE"]
        plan = planning_line.get_raw().split("\n")[0]
        indexes = [(kw, plan.find(kw)) for kw in keywords]

        self._planning_order = [
            kw
            for (kw, idx) in sorted(
                filter(lambda v: v[1] >= 0, indexes), key=lambda v: v[1]
            )
        ]

        if scheduled_m := m.group("scheduled"):
            self.scheduled = parse_time(scheduled_m)
        if closed_m := m.group("closed"):
            self.closed = parse_time(closed_m)
        if deadline_m := m.group("deadline"):
            self.deadline = parse_time(deadline_m)

        # Remove from contents
        self._remove_element_in_line(start_line + 1)
@property
|
|
def doc(self):
|
|
par = self.parent
|
|
while isinstance(par, Headline):
|
|
par = par.parent
|
|
return par
|
|
|
|
def as_dom(self):
    """Build a DOM tree from every element attached to this headline.

    All element kinds (keywords, text, list items, table rows,
    properties, structural markers and block delimiters) are merged and
    replayed in line order; `current_node` tracks the group being built
    and `indentation_tree` the stack of open container nodes.
    """
    everything = (
        self.keywords
        + self.contents
        + self.list_items
        + self.table_rows
        + self.properties
        + self.structural
        + self.delimiters
    )

    tree: List[dom.DomNode] = []
    current_node: Optional[dom.DomNode] = None
    indentation_tree: List[dom.ContainerDomNode] = []
    contents: Optional[str] = None

    for line in sorted(everything, key=get_line):
        if isinstance(current_node, dom.CodeBlock):
            # Inside a code block everything is swallowed until the
            # matching END delimiter is reached.
            if (
                isinstance(line, DelimiterLine)
                and line.delimiter_type == DelimiterLineType.END_BLOCK
            ):

                start = current_node.header.linenum
                end = line.linenum

                lines = self.get_lines_between(start + 1, end)
                contents = unescape_block_lines("\n".join(lines))
                if contents.endswith("\n"):
                    # This is not ideal, but to avoid having to do this maybe
                    # the content parsing must be re-thinked
                    contents = contents[:-1]

                current_node.set_lines(contents)
                tree.append(current_node)
                current_node = None
            else:
                pass  # Ignore

        elif isinstance(line, Property):
            # Properties open (or extend) a property drawer.
            if type(current_node) in NON_FINISHED_GROUPS:
                current_node = dom.PropertyDrawerNode()
                tree.append(current_node)
            assert isinstance(current_node, dom.PropertyDrawerNode)
            current_node.append(dom.PropertyNode(line.key, line.value))

        elif isinstance(line, Text):
            # Attach text to the innermost open block/drawer; pop closed
            # containers on the way; fall back to a top-level text node.
            tree_up = list(indentation_tree)
            while len(tree_up) > 0:
                node: dom.DomNode = tree_up[-1]
                if isinstance(node, dom.BlockNode) or isinstance(
                    node, dom.DrawerNode
                ):
                    node.append(dom.Text(line))
                    current_node = node
                    contents = None
                    break
                elif (not isinstance(node, dom.TableNode)) and (
                    type(node) not in NON_FINISHED_GROUPS
                ):
                    raise NotImplementedError(
                        "Not implemented node type: {} (headline_id={}, line={}, doc={})".format(
                            node,
                            self.id,
                            line.linenum,
                            self.doc.path,
                        )
                    )
                else:
                    tree_up.pop(-1)
            else:
                # No open container accepted the text: top-level text.
                current_node = None
                contents = None
                tree.append(dom.Text(text_to_dom(line.contents, line)))
            indentation_tree = tree_up

        elif isinstance(line, ListItem):
            # Open a new list group when nothing suitable is open, either
            # at top level or nested inside the current container.
            if (
                current_node is None
                or isinstance(current_node, dom.TableNode)
                or isinstance(current_node, dom.BlockNode)
                or isinstance(current_node, dom.DrawerNode)
            ):
                was_node = current_node
                current_node = dom.ListGroupNode()
                if was_node is None:
                    tree.append(current_node)
                else:
                    was_node.append(current_node)
                indentation_tree.append(current_node)
            # NOTE(review): the nested duplicate isinstance check below is
            # redundant — the inner `if` always takes the same branch as
            # the outer one; kept byte-identical.
            if not isinstance(current_node, dom.ListGroupNode):
                if not isinstance(current_node, dom.ListGroupNode):
                    raise Exception(
                        "Expected a {}, found: {} on line {} on {}".format(
                            dom.ListGroupNode,
                            current_node,
                            line.linenum,
                            self.doc.path,
                        )
                    )
                # This can happen. Frequently inside a LogDrawer

            # Deeper indentation than the last list item: open a sublist.
            if len(indentation_tree) > 0 and (
                (len(indentation_tree[-1].children) > 0)
                and len(
                    [
                        c
                        for c in indentation_tree[-1].children
                        if isinstance(c, dom.ListItem)
                    ][-1].orig.indentation
                )
                < len(line.indentation)
            ):
                sublist = dom.ListGroupNode()
                current_node.append(sublist)
                current_node = sublist
                indentation_tree.append(current_node)

            # Shallower indentation: pop sublists until the levels match.
            while len(indentation_tree) > 0:
                list_children = [
                    c
                    for c in indentation_tree[-1].children
                    if isinstance(c, dom.ListItem)
                ]

                if len(list_children) == 0:
                    break
                if len(list_children[-1].orig.indentation) <= len(line.indentation):
                    # No more breaking out of lists, it's indentation
                    # is less than ours
                    break

                rem = indentation_tree.pop(-1)
                if len(indentation_tree) == 0:
                    indentation_tree.append(rem)
                    current_node = rem
                    break
                else:
                    current_node = indentation_tree[-1]

            node = dom.ListItem(
                text_to_dom(line.tag, line),
                text_to_dom(line.content, line),
                orig=line,
            )
            current_node.append(node)

        elif isinstance(line, TableRow):
            if current_node is None:
                current_node = dom.TableNode()
                tree.append(current_node)
                # TODO: Allow indentation of this element inside others
                indentation_tree = [current_node]
            elif not isinstance(current_node, dom.TableNode):
                if isinstance(current_node, dom.ListGroupNode):
                    # As an item inside a list
                    list_node = current_node
                    current_node = dom.TableNode()
                    list_node.append(current_node)
                    indentation_tree.append(current_node)
                else:
                    logging.debug(
                        "Expected a {}, found: {} on line {}".format(
                            dom.TableNode, current_node, line.linenum
                        )
                    )
                    # This can happen. Frequently inside a LogDrawer

            # A first cell starting with "-" marks a |---+---| separator.
            if (
                len(line.cells) > 0
                and len(line.cells[0]) > 0
                and line.cells[0][0] == "-"
            ):
                node = dom.TableSeparatorRow(orig=line)
            else:
                node = dom.TableRow(line.cells, orig=line)
            current_node = cast(dom.ContainerDomNode, current_node)
            current_node.append(node)

        elif (
            isinstance(line, DelimiterLine)
            and line.delimiter_type == DelimiterLineType.BEGIN_BLOCK
        ):
            assert type(current_node) in NON_FINISHED_GROUPS
            current_node = dom.CodeBlock(
                line, line.type_data.subtype, line.arguments
            )

        elif isinstance(line, Keyword):
            logging.warning("Keywords not implemented on `as_dom()`")

        # elif (
        #     isinstance(line, DelimiterLine)
        #     and line.delimiter_type == DelimiterLineType.END_BLOCK
        # ):
        #     assert isinstance(current_node, dom.BlockNode)
        #     current_node = None

        elif (
            isinstance(line, tuple)
            and len(line) == 2
            and isinstance(line[0], int)
            and isinstance(line[1], str)
        ):
            # Structural
            (linenum, content) = line
            if content.strip().upper() == ":PROPERTIES:":
                assert current_node is None
                current_node = dom.PropertyDrawerNode()
                tree.append(current_node)
                # TODO: Check if this can be nested
                indentation_tree = [current_node]
            elif content.strip().upper() == ":LOGBOOK:":
                assert current_node is None
                current_node = dom.LogbookDrawerNode()
                tree.append(current_node)
                # TODO: Check if this can be nested
                indentation_tree = [current_node]
            elif content.strip().upper() == ":END:":
                if current_node is None and len(indentation_tree) == 0:
                    logging.error("Finished node (:END:) with no known starter")
                else:
                    # Close the innermost open drawer on the stack.
                    tree_up = list(indentation_tree)
                    while len(tree_up) > 0:
                        node = tree_up[-1]
                        if isinstance(node, dom.DrawerNode):
                            indentation_tree = tree_up
                            current_node = node
                            tree_up.pop(-1)
                            break
                        else:
                            tree_up.pop(-1)
                    else:
                        raise Exception(
                            "Unexpected node ({}) on headline (id={}), line {}".format(
                                current_node, self.id, linenum
                            )
                        )
                    current_node = None
            elif content.strip().upper() == ":RESULTS:":
                assert current_node is None
                current_node = dom.ResultsDrawerNode()

                # TODO: Allow indentation of these blocks inside others
                indentation_tree = [current_node]
                tree.append(current_node)
            else:
                raise Exception("Unknown structural line: {}".format(line))
        else:
            raise Exception("Unknown node type: {}".format(line))

    return tree
def get_lists(self):
    """Group `self.list_items` into lists.

    Consecutive list items separated only by blank lines belong to the
    same list; any non-blank content between items starts a new list.
    Returns a list of lists of ListItem.
    """
    lists = []
    last_line = None

    for li in self.list_items:
        if last_line is None:
            lists.append([li])
        else:
            num_lines = li.linenum - (last_line + 1)
            lines_between = "".join(
                [
                    "\n" + l
                    for l in self.get_lines_between(last_line + 1, li.linenum)
                ]
            )

            # Only empty lines
            if (num_lines == lines_between.count("\n")) and (
                len(lines_between.strip()) == 0
            ):
                lists[-1].append(li)
            else:
                lists.append([li])

        # Account for multi-line item content when remembering the last
        # line occupied by this item.
        last_line = li.linenum + sum(c.count("\n") for c in li.content)
    return lists
# @DEPRECATED: use `get_lists`
def getLists(self):
    """Deprecated camelCase alias for `get_lists`."""
    return self.get_lists()
def get_tables(self):
|
|
tables: List[List] = [] # TableRow[][]
|
|
last_line = None
|
|
|
|
for row in self.table_rows:
|
|
if last_line == row.linenum - 1:
|
|
tables[-1].append(row)
|
|
else:
|
|
tables.append([row])
|
|
|
|
last_line = row.linenum
|
|
return tables
|
|
|
|
def get_planning_line(self):
    """Serialize the SCHEDULED/CLOSED/DEADLINE planning line.

    Keywords are emitted in the order they originally appeared in the
    parsed line; values set after parsing are appended at the end.
    Returns None when no planning data is set.

    NOTE(review): `_planning_indendation` / `_planning_order` only exist
    when a planning line was parsed in `__init__` — confirm callers
    cannot reach this with programmatically-set planning values only.
    """
    if self.scheduled is None and self.closed is None and self.deadline is None:
        return None

    contents = [self._planning_indendation]

    for el in self._planning_order:
        if el == "SCHEDULED" and self.scheduled is not None:
            contents.append("SCHEDULED: {} ".format(self.scheduled.to_raw()))

        elif el == "CLOSED" and self.closed is not None:
            contents.append("CLOSED: {} ".format(self.closed.to_raw()))

        elif el == "DEADLINE" and self.deadline is not None:
            contents.append("DEADLINE: {} ".format(self.deadline.to_raw()))

    # Consider elements added (not present on planning order)
    if ("SCHEDULED" not in self._planning_order) and (self.scheduled is not None):
        contents.append("SCHEDULED: {} ".format(self.scheduled.to_raw()))

    if ("CLOSED" not in self._planning_order) and (self.closed is not None):
        contents.append("CLOSED: {} ".format(self.closed.to_raw()))

    if ("DEADLINE" not in self._planning_order) and (self.deadline is not None):
        contents.append("DEADLINE: {} ".format(self.deadline.to_raw()))

    return "".join(contents).rstrip()
@property
def id(self):
    # The headline's ID, stored as the "ID" property (None when unset).
    return self.get_property("ID")

@id.setter
def id(self, value):
    self.set_property("ID", value)
@property
def clock(self):
    """Parse every "CLOCK:" entry found in this headline's contents.

    Range entries ("start--end") are parsed as time ranges; single
    stamps as OrgTime values. Unparseable entries are skipped.
    """
    times = []
    for chunk in self.contents:
        for line in chunk.get_raw().split("\n"):
            content = line.strip()
            if not content.startswith("CLOCK:"):
                continue

            time_seg = content[len("CLOCK:") :].strip()

            parsed: Optional[Time] = None
            if "--" in time_seg:
                # TODO: Consider duration
                # Drop the "=> H:MM" duration suffix before splitting.
                start, end = time_seg.split("=")[0].split("--")
                as_time_range = parse_org_time_range(start, end)
                parsed = as_time_range
            else:
                parsed = OrgTime.parse(time_seg)

            if parsed is not None:
                times.append(parsed)

    return times
@property
def tags(self):
    """This headline's tags, including those inherited from ancestors."""
    if isinstance(self.parent, OrgDoc):
        return list(self.shallow_tags)
    else:
        return list(self.shallow_tags) + self.parent.tags
def add_tag(self, tag: str):
    """Add `tag` to this headline's own (non-inherited) tags."""
    self.shallow_tags.append(tag)
def get_property(self, name: str, default=None):
|
|
for prop in self.properties:
|
|
if prop.key == name:
|
|
return prop.value
|
|
|
|
return default
|
|
|
|
def set_property(self, name: str, value: str):
    """Set property `name` to `value`, creating the drawer if needed.

    NOTE(review): `Property` is declared as a namedtuple in this file,
    so the `prop.value = value` assignment below would raise
    AttributeError on such instances — confirm which type
    `self.properties` actually holds at runtime.
    """
    for prop in self.properties:

        # A matching property is found, update it
        if prop.key == name:
            prop.value = value
            return

    # No matching property found, add it
    # (`for`/`else`: runs when the loop above did not return).
    else:
        if len(self.properties) > 0:
            # Anchor the new property next to the last existing one.
            last_prop = self.properties[-1]
            last_line = last_prop.linenum
            last_match = last_prop.match
        else:
            # No drawer yet: create the :PROPERTIES:/:END: markers.
            # NOTE(review): the -2/0/-1 line numbers appear to be
            # headline-relative placeholders — confirm against the
            # serialization code.
            self.structural.append(
                (
                    -2,  # Linenum
                    ":PROPERTIES:",
                )
            )
            self.structural.append(
                (
                    0,  # Linenum
                    ":END:",
                )
            )

            last_line = -1
            last_match = None
        self.properties.append(
            Property(
                linenum=last_line,
                match=last_match,
                key=name,
                value=value,
                options=None,
            )
        )
def get_links(self):
    """Yield every Link in this headline's contents and list items."""
    for content in self.contents:
        yield from get_links_from_content(content)

    for lst in self.get_lists():
        for item in lst:
            if item.tag:
                yield from get_links_from_content(item.tag)
            yield from get_links_from_content(item.content)
def get_lines_between(self, start, end):
|
|
for line in self.contents:
|
|
if start <= line.linenum < end:
|
|
yield "".join(line.get_raw())
|
|
|
|
def get_contents(self, format):
    """Yield this headline's contents in the requested format.

    Only "raw" is supported: each content chunk is converted back to raw
    text, in line-number order. Raises NotImplementedError otherwise.
    """
    if format == "raw":
        yield from map(
            lambda x: token_list_to_raw(x.contents),
            sorted(self.contents, key=lambda x: x.linenum),
        )
    else:
        raise NotImplementedError()
def update_raw_contents(self, new_contents):
    """Replace this headline's body by re-parsing `new_contents`.

    The text is parsed with a fresh OrgDocReader sharing this document's
    environment, and the parsed elements replace the current ones.
    Planning data (scheduled/deadline/closed) is reset.

    Raises NotImplementedError when `new_contents` contains headlines.
    """

    reader = OrgDocReader(environment=self.doc.environment)
    reader.read(new_contents)

    # No need to finalize as we can take the data from the reader instead of from a doc
    if len(reader.headlines) > 0:
        # Probably can be done by just adding the headlines to this one's children
        raise NotImplementedError('new headlines on raw contents not supported yet. This probably should be simple, see comment on code.')

    # Clear elements
    self.keywords = reader.keywords
    self.contents = reader.contents
    self.list_items = reader.list_items
    self.table_rows = reader.table_rows
    self.properties = reader.properties
    self.structural = reader.structural
    self.delimiters = reader.delimiters

    # TODO: Support update of scheduled/deadline/closed line
    self.scheduled = None
    self.deadline = None
    self.closed = None

    # Environment is not used, as it's known
def get_element_in_line(self, linenum):
|
|
for line in self.contents:
|
|
if linenum == line.linenum:
|
|
return line
|
|
|
|
for s_lnum, struc in self.structural:
|
|
if linenum == s_lnum:
|
|
return ("structural", struc)
|
|
|
|
def _remove_element_in_line(self, linenum):
    """Remove the content element starting at `linenum`.

    Single-line Text elements are dropped entirely; multi-line ones lose
    only their first line and are re-parsed starting one line later.
    The element must exist and be a Text.
    """
    found = None
    for i, line in enumerate(self.contents):
        if linenum == line.linenum:
            found = i
            break

    assert found is not None
    el = self.contents[found]
    assert isinstance(el, Text)

    raw = el.get_raw()
    if "\n" not in raw:
        # Remove the element found
        self.contents.pop(found)
    else:
        # Remove the first line
        self.contents[found] = parse_content_block(
            [RawLine(self.contents[found].linenum + 1, raw.split("\n", 1)[1])]
        )
def get_structural_end_after(self, linenum):
|
|
for s_lnum, struc in self.structural:
|
|
if s_lnum > linenum and struc.strip().upper() == ":END:":
|
|
return (s_lnum, struc)
|
|
|
|
def get_code_snippets(self):
    """Collect every #+BEGIN_SRC/#+END_SRC block as a CodeSnippet.

    Block contents are unescaped; a following "#+RESULTS" keyword is
    used to attach the evaluation result, either as plain ": "-prefixed
    lines or as a :RESULTS: drawer.
    """
    inside_code = False

    sections = []
    arguments = None

    for delimiter in self.delimiters:
        if (
            delimiter.delimiter_type == DelimiterLineType.BEGIN_BLOCK
            and delimiter.type_data.subtype.lower() == "src"
        ):
            line_start = delimiter.linenum
            inside_code = True
            arguments = delimiter.arguments
        elif (
            delimiter.delimiter_type == DelimiterLineType.END_BLOCK
            and delimiter.type_data.subtype.lower() == "src"
        ):
            inside_code = False
            start, end = line_start, delimiter.linenum

            lines = self.get_lines_between(start + 1, end)
            contents = unescape_block_lines("\n".join(lines))
            if contents.endswith("\n"):
                # This is not ideal, but to avoid having to do this maybe
                # the content parsing must be re-thinked
                contents = contents[:-1]

            sections.append(
                {
                    "line_first": start + 1,
                    "line_last": end - 1,
                    "content": contents,
                    "arguments": arguments,
                }
            )
            arguments = None
            line_start = None

    # Attach "#+RESULTS" output to the first snippet that precedes it.
    for kword in self.keywords:
        if kword.key.upper() == "RESULTS":
            for snippet in sections:
                if kword.linenum > snippet["line_last"]:
                    result_first = self.get_element_in_line(kword.linenum + 1)

                    if isinstance(result_first, Text):
                        result = "\n".join(result_first.contents)
                        snippet["result"] = result

                        if result.strip().startswith(": "):
                            # Split lines and remove ':'
                            lines = result.split("\n")
                            s_result = []
                            for line in lines:
                                if ": " not in line:
                                    break
                                s_result.append(line.lstrip(" ")[2:])
                            snippet["result"] = "\n".join(s_result)
                    elif (
                        isinstance(result_first, tuple)
                        and len(result_first) == 2
                        and result_first[0] == "structural"
                        and result_first[1].strip().upper() == ":RESULTS:"
                    ):
                        # Results drawer: take everything up to :END: and
                        # strip the drawer's indentation.
                        # NOTE(review): get_structural_end_after may
                        # return None (unterminated drawer), which would
                        # make this unpacking raise TypeError — confirm.
                        (end_line, _) = self.get_structural_end_after(
                            kword.linenum + 1
                        )
                        contents = "\n".join(
                            self.get_lines_between(kword.linenum + 1, end_line)
                        )
                        indentation = result_first[1].index(":")
                        dedented = "\n".join(
                            [line[indentation:] for line in contents.split("\n")]
                        )
                        if dedented.endswith("\n"):
                            dedented = dedented[:-1]

                        snippet["result"] = dedented

                    break

    results = []
    for section in sections:
        # NOTE(review): snippet names (#+NAME:) are not collected yet.
        name = None
        content = section["content"]
        code_result = section.get("result", None)
        arguments = section.get("arguments", None)
        results.append(
            CodeSnippet(
                name=name, content=content, result=code_result, arguments=arguments
            )
        )

    return results
def create_headline_at_end(self) -> Headline:
    """Create an empty child headline one level deeper and return it.

    NOTE(review): `start_line=1` looks like a placeholder — confirm that
    line numbers are recomputed when the document is serialized.
    """
    headline = Headline(
        start_line=1,
        depth=self.depth + 1,
        orig=None,
        properties=[],
        keywords=[],
        priority_start=None,
        priority=None,
        title_start=None,
        title="",
        state="",
        tags_start=None,
        tags=[],
        contents=[],
        children=[],
        structural=[],
        delimiters=[],
        list_items=[],
        table_rows=[],
        parent=self,
        is_todo=False,
        is_done=False,
        spacing=" ",
    )

    self.children.append(headline)
    return headline
# A raw (not yet classified) line of the file and its position.
RawLine = collections.namedtuple("RawLine", ("linenum", "line"))
# A "#+KEY[OPTIONS]: VALUE" line.
Keyword = collections.namedtuple(
    "Keyword", ("linenum", "match", "key", "value", "options")
)
# A ":KEY: VALUE" line inside a property drawer.
Property = collections.namedtuple(
    "Property", ("linenum", "match", "key", "value", "options")
)
class ListItem:
    """A parsed org list item ("- tag :: content", "1. content", ...)."""

    def __init__(
        self,
        linenum,
        match,
        indentation,
        bullet,
        counter,
        counter_sep,
        checkbox_indentation,
        checkbox_value,
        tag_indentation,
        tag,
        content,
    ):
        self.linenum = linenum  # Line where the item starts.
        self.match = match  # Regex match the item was parsed from.
        self.indentation = indentation  # Leading whitespace.
        self.bullet = bullet  # "-", "+" or "*" for unordered items.
        self.counter = counter  # "1", "a", ... for ordered items.
        self.counter_sep = counter_sep  # "." or ")" after the counter.
        self.checkbox_indentation = checkbox_indentation
        self.checkbox_value = checkbox_value  # " ", "X" or "x" when present.
        self.tag_indentation = tag_indentation
        self.tag = tag  # Description-list tag (text before "::"), if any.
        self.content = content  # Tokenized item content.

    @property
    def text_start_pos(self):
        # Column where the item's text begins.
        return len(self.indentation) + 1  # Indentation + bullet

    def append_line(self, line):
        # NOTE(review): parse_content_block is called elsewhere with a
        # list of RawLine; passing a plain string here looks suspicious —
        # confirm this path is exercised.
        self.content += parse_content_block("\n" + line).contents
# A parsed table line: position, leading whitespace, trailing text,
# whether the final cell is closed by "|", and the cell values.
TableRow = collections.namedtuple(
    "TableRow",
    (
        "linenum",
        "indentation",
        "suffix",
        "last_cell_closed",
        "cells",
    ),
)
# @TODO How are [YYYY-MM-DD HH:mm--HH:mm] and ([... HH:mm]--[... HH:mm]) differentiated ?
|
|
# @TODO Consider recurrence annotations
|
|
class Timestamp:
|
|
def __init__(
    self,
    active: bool = True,
    year: Optional[int] = None,
    month: Optional[int] = None,
    day: Optional[int] = None,
    dow: Optional[str] = None,
    hour: Optional[int] = None,
    minute: Optional[int] = None,
    repetition: Optional[str] = None,
    datetime_: Optional[Union[date, datetime]] = None,
):
    """
    Initializes a Timestamp instance.

    Args:
        active (bool): Whether the timestamp is active (<...> vs [...]).
        year (Optional[int]): The year of the timestamp.
        month (Optional[int]): The month of the timestamp.
        day (Optional[int]): The day of the timestamp.
        dow (Optional[str]): The day of the week, if any.
        hour (Optional[int]): The hour of the timestamp, if any.
        minute (Optional[int]): The minute of the timestamp, if any.
        repetition (Optional[str]): The repetition pattern, if any.
        datetime_ (Optional[Union[date, datetime]]): A date or datetime
            object; when given it takes precedence over year/month/day.

    Raises:
        ValueError: If neither datetime_ nor the combination of year,
            month, and day are provided.
    """
    self.active = active

    if datetime_ is not None:
        # A date/datetime object overrides the individual components
        # (including dow/hour/minute, which from_datetime fills in).
        self.from_datetime(datetime_)
    elif year is not None and month is not None and day is not None:
        self._year = year
        self._month = month
        self._day = day
        self.dow = dow
        self.hour = hour
        self.minute = minute
    else:
        raise ValueError(
            "Either datetime_ or year, month, and day must be provided."
        )
    self.repetition = repetition
def to_datetime(self) -> datetime:
|
|
"""
|
|
Converts the Timestamp to a datetime object.
|
|
|
|
Returns:
|
|
datetime: The corresponding datetime object.
|
|
"""
|
|
if self.hour is not None:
|
|
return datetime(self.year, self.month, self.day, self.hour, self.minute or 0)
|
|
else:
|
|
return datetime(self.year, self.month, self.day, 0, 0)
|
|
|
|
def from_datetime(self, dt: Union[datetime, date]) -> None:
|
|
"""
|
|
Updates the current Timestamp instance based on a datetime or date object.
|
|
|
|
Args:
|
|
dt (Union[datetime, date]): The datetime or date object to use for updating the instance.
|
|
"""
|
|
if isinstance(dt, datetime):
|
|
self._year = dt.year
|
|
self._month = dt.month
|
|
self._day = dt.day
|
|
self.hour = dt.hour
|
|
self.minute = dt.minute
|
|
elif isinstance(dt, date):
|
|
self._year = dt.year
|
|
self._month = dt.month
|
|
self._day = dt.day
|
|
self.hour = None
|
|
self.minute = None
|
|
else:
|
|
raise TypeError("Expected datetime or date object")
|
|
|
|
self.dow = None # Day of the week can be set to None
|
|
|
|
def __add__(self, delta: timedelta) -> "Timestamp":
|
|
"""
|
|
Adds a timedelta to the Timestamp.
|
|
|
|
Args:
|
|
delta (timedelta): The time difference to add.
|
|
|
|
Returns:
|
|
Timestamp: The resulting Timestamp instance.
|
|
"""
|
|
as_dt = self.to_datetime()
|
|
to_dt = as_dt + delta
|
|
|
|
return Timestamp(
|
|
self.active,
|
|
year=to_dt.year,
|
|
month=to_dt.month,
|
|
day=to_dt.day,
|
|
dow=None,
|
|
hour=to_dt.hour if self.hour is not None or to_dt.hour != 0 else None,
|
|
minute=(
|
|
to_dt.minute if self.minute is not None or to_dt.minute != 0 else None
|
|
),
|
|
repetition=self.repetition,
|
|
)
|
|
|
|
def __eq__(self, other: object) -> bool:
|
|
"""
|
|
Checks if two Timestamp instances are equal.
|
|
|
|
Args:
|
|
other (object): The other object to compare with.
|
|
|
|
Returns:
|
|
bool: True if the instances are equal, False otherwise.
|
|
"""
|
|
if not isinstance(other, Timestamp):
|
|
return False
|
|
return (
|
|
self.active == other.active
|
|
and self.year == other.year
|
|
and self.month == other.month
|
|
and self.day == other.day
|
|
and self.dow == other.dow
|
|
and self.hour == other.hour
|
|
and self.minute == other.minute
|
|
and self.repetition == other.repetition
|
|
)
|
|
|
|
def __lt__(self, other: object) -> bool:
|
|
"""
|
|
Checks if the Timestamp is less than another Timestamp.
|
|
|
|
Args:
|
|
other (object): The other object to compare with.
|
|
|
|
Returns:
|
|
bool: True if this Timestamp is less than the other, False otherwise.
|
|
"""
|
|
if not isinstance(other, Timestamp):
|
|
return False
|
|
return self.to_datetime() < other.to_datetime()
|
|
|
|
def __gt__(self, other: object) -> bool:
|
|
"""
|
|
Checks if the Timestamp is greater than another Timestamp.
|
|
|
|
Args:
|
|
other (object): The other object to compare with.
|
|
|
|
Returns:
|
|
bool: True if this Timestamp is greater than the other, False otherwise.
|
|
"""
|
|
if not isinstance(other, Timestamp):
|
|
return False
|
|
return self.to_datetime() > other.to_datetime()
|
|
|
|
    def __repr__(self) -> str:
        """
        Returns a string representation of the Timestamp.

        Returns:
            str: The raw org-mode rendering, e.g. ``<2024-03-05 Tue>``
            for an active timestamp, ``[...]`` for an inactive one.
        """
        return timestamp_to_string(self)
|
|
|
|
    @property
    def year(self) -> int:
        """Returns the year of the timestamp."""
        return self._year

    @year.setter
    def year(self, value: int) -> None:
        """Sets the year of the timestamp and resets the day of the week."""
        self._year = value
        # The stored weekday name may no longer match the new date.
        self.dow = None

    @property
    def month(self) -> int:
        """Returns the month of the timestamp."""
        return self._month

    @month.setter
    def month(self, value: int) -> None:
        """Sets the month of the timestamp and resets the day of the week."""
        self._month = value
        # The stored weekday name may no longer match the new date.
        self.dow = None

    @property
    def day(self) -> int:
        """Returns the day of the timestamp."""
        return self._day

    @day.setter
    def day(self, value: int) -> None:
        """Sets the day of the timestamp and resets the day of the week."""
        self._day = value
        # The stored weekday name may no longer match the new date.
        self.dow = None
|
|
|
|
|
|
class DelimiterLineType(Enum):
    """Kind of a block delimiter line (begin vs end)."""

    BEGIN_BLOCK = 1
    END_BLOCK = 2


# Extra data attached to a block delimiter; `subtype` is the block's name
# (presumably e.g. "SRC" for source blocks — TODO confirm with the lexer).
BlockDelimiterTypeData = collections.namedtuple("BlockDelimiterTypeData", ("subtype"))

# A raw delimiter line: its position, original text, DelimiterLineType,
# optional BlockDelimiterTypeData and the trailing arguments string.
DelimiterLine = collections.namedtuple(
    "DelimiterLine", ("linenum", "line", "delimiter_type", "type_data", "arguments")
)
|
|
|
|
|
|
class MarkerType(Enum):
    """Bit-flag values, one per kind of inline text markup."""

    NO_MODE = 0b0
    BOLD_MODE = 0b1
    CODE_MODE = 0b10
    ITALIC_MODE = 0b100
    STRIKE_MODE = 0b1000
    UNDERLINED_MODE = 0b10000
    VERBATIM_MODE = 0b100000


# Marker character -> markup kind.
MARKERS = {
    "*": MarkerType.BOLD_MODE,
    "~": MarkerType.CODE_MODE,
    "/": MarkerType.ITALIC_MODE,
    "+": MarkerType.STRIKE_MODE,
    "_": MarkerType.UNDERLINED_MODE,
    "=": MarkerType.VERBATIM_MODE,
}

# Reverse mapping: markup kind -> marker character.
ModeToMarker = {mode: tok for tok, mode in MARKERS.items()}
|
|
|
|
# Inline markup token; `closing` is True for the closing occurrence of the
# marker character, `tok_type` is a MarkerType.
MarkerToken = collections.namedtuple("MarkerToken", ("closing", "tok_type"))

# Link structure token; `tok_type` is a LinkTokenType.
LinkToken = collections.namedtuple("LinkToken", ("tok_type"))


class LinkTokenType(Enum):
    """Structural positions inside an org link: [[target][description]]."""

    OPEN_LINK = 3
    OPEN_DESCRIPTION = 5
    CLOSE = 4


# Sentinel values marking the boundaries of a :PROPERTIES: drawer.
BEGIN_PROPERTIES = "OPEN_PROPERTIES"
END_PROPERTIES = "CLOSE_PROPERTIES"


def token_from_type(tok_type):
    """Return the marker character for a MarkerType (inverse of MARKERS)."""
    return ModeToMarker[tok_type]
|
|
|
|
|
|
class TimeRange:
    """Represents a range of time with a start and end time.

    Attributes:
        start_time (OrgTime): The start time of the range.
        end_time (OrgTime): The end time of the range.
    """

    def __init__(self, start_time: OrgTime, end_time: OrgTime) -> None:
        """Initializes a TimeRange with a start time and an end time.

        Args:
            start_time (OrgTime): The start time of the range.
            end_time (OrgTime): The end time of the range.

        Raises:
            ValueError: If start_time or end_time is None.
        """
        # FIX: the docstring previously promised AssertionError, but the
        # code raises ValueError; the documentation now matches behavior.
        if start_time is None or end_time is None:
            raise ValueError("start_time and end_time must not be None.")
        self.start_time = start_time
        self.end_time = end_time

    def to_raw(self) -> str:
        """Converts the TimeRange to its raw string representation.

        Returns:
            str: The raw string representation of the TimeRange.
        """
        return timerange_to_string(self)

    @property
    def duration(self) -> timedelta:
        """Calculates the duration of the TimeRange.

        Returns:
            timedelta: The duration between start_time and end_time.
        """
        delta = self.end - self.start
        return delta

    @property
    def start(self) -> datetime:
        """Gets the start time as a datetime object.

        Returns:
            datetime: The start time of the TimeRange.
        """
        return self.start_time.time.to_datetime()

    @property
    def end(self) -> datetime:
        """Gets the end time as a datetime object.

        Returns:
            datetime: The end time of the TimeRange.
        """
        return self.end_time.time.to_datetime()

    def activate(self) -> None:
        """
        Sets the active state for both endpoint times.
        """
        self.start_time.active = True
        self.end_time.active = True

    def deactivate(self) -> None:
        """
        Sets the inactive state for both endpoint times.
        """
        self.start_time.active = False
        self.end_time.active = False
|
|
|
|
|
|
class OrgTime:
    """Represents a point in time with optional end time and repetition.

    Attributes:
        time (Timestamp): The start time of the OrgTime instance.
        end_time (Optional[Timestamp]): The end time of the OrgTime instance, if any.
    """

    def __init__(self, ts: Timestamp, end_time: Optional[Timestamp] = None) -> None:
        """Initializes an OrgTime with a start time and an optional end time.

        Args:
            ts (Timestamp): The start time of the OrgTime instance.
            end_time (Optional[Timestamp], optional): The end time of the OrgTime instance. Defaults to None.

        Raises:
            ValueError: If ts is None.
        """
        if ts is None:
            raise ValueError("Timestamp (ts) must not be None.")
        self.time = ts
        self.end_time = end_time

    @property
    def repetition(self) -> Optional[str]:
        """Gets the repetition information from the start time.

        Returns:
            Optional[str]: The repetition information, or None if not present.
        """
        return self.time.repetition

    @property
    def duration(self) -> timedelta:
        """Calculates the duration between the start and end times.

        Returns:
            timedelta: The duration between the start and end times. If no end time is present, returns zero timedelta.
        """
        if self.end_time is None:
            return timedelta()  # No duration
        return self.end_time.to_datetime() - self.time.to_datetime()

    def to_raw(self) -> str:
        """Converts the OrgTime to its raw string representation.

        Returns:
            str: The raw string representation of the OrgTime.
        """
        return timestamp_to_string(self.time, self.end_time)

    def __repr__(self) -> str:
        """Provides a string representation of the OrgTime instance.

        Returns:
            str: The string representation of the OrgTime.
        """
        return f"OrgTime({self.to_raw()})"

    @classmethod
    def parse(cls, value: str) -> Optional["OrgTime"]:
        """Parses a string into an OrgTime object.

        Args:
            value (str): The string representation of the OrgTime.

        Returns:
            Optional[OrgTime]: The parsed OrgTime instance, or None if parsing fails.
        """
        # The two regexes differ only in the delimiters: <...> is active,
        # [...] is inactive.
        if m := ACTIVE_TIME_STAMP_RE.match(value):
            active = True
        elif m := INACTIVE_TIME_STAMP_RE.match(value):
            active = False
        else:
            return None

        repetition = None
        if m.group("repetition"):
            repetition = m.group("repetition").strip()

        # "HH:MM-HH:MM" form: build start and end Timestamps sharing the
        # same date; the repetition is attached to the start only.
        if m.group("end_hour"):
            return cls(
                Timestamp(
                    active,
                    int(m.group("year")),
                    int(m.group("month")),
                    int(m.group("day")),
                    m.group("dow"),
                    int(m.group("start_hour")),
                    int(m.group("start_minute")),
                    repetition=repetition,
                ),
                Timestamp(
                    active,
                    int(m.group("year")),
                    int(m.group("month")),
                    int(m.group("day")),
                    m.group("dow"),
                    int(m.group("end_hour")),
                    int(m.group("end_minute")),
                ),
            )

        # Single timestamp; hour/minute stay None for date-only stamps.
        return cls(
            Timestamp(
                active,
                int(m.group("year")),
                int(m.group("month")),
                int(m.group("day")),
                m.group("dow"),
                int(m.group("start_hour")) if m.group("start_hour") else None,
                int(m.group("start_minute")) if m.group("start_minute") else None,
                repetition=repetition,
            )
        )

    @property
    def active(self) -> bool:
        """
        Checks if the time is set as active.
        """
        return self.time.active

    @active.setter
    def active(self, value: bool) -> None:
        """
        Sets the active state for the timestamp.
        """
        # NOTE: only the start timestamp carries the active flag;
        # end_time.active is not updated here.
        self.time.active = value

    def activate(self) -> None:
        """
        Sets the active state for the timestamp.
        """
        self.active = True

    def deactivate(self) -> None:
        """
        Sets the inactive state for the timestamp.
        """
        self.active = False

    def from_datetime(self, dt: datetime) -> None:
        """
        Updates the timestamp to use the given datetime.

        Args:
            dt (datetime): The datetime to update the timestamp with.
        """
        self.time.from_datetime(dt)
        # NOTE(review): the end timestamp is collapsed onto the same
        # datetime as the start — confirm this is the intended semantics.
        if self.end_time:
            self.end_time.from_datetime(dt)
|
|
|
|
|
|
def time_from_str(s: str) -> Optional[OrgTime]:
    """Parse a raw org timestamp string; thin alias for OrgTime.parse."""
    return OrgTime.parse(s)
|
|
|
|
|
|
def timerange_to_string(tr: TimeRange):
    """Render a TimeRange as 'START--END' in raw org syntax."""
    return f"{tr.start_time.to_raw()}--{tr.end_time.to_raw()}"
|
|
|
|
|
|
def timestamp_to_string(ts: Timestamp, end_time: Optional[Timestamp] = None) -> str:
    """Render a Timestamp (optionally with an end time) as raw org syntax.

    Produces e.g. ``<2024-03-05 Tue 09:05 +1w>`` for active stamps and
    the same text in square brackets for inactive ones.
    """
    # Local renamed from `date` to avoid shadowing datetime.date.
    date_part = f"{ts.year}-{ts.month:02d}-{ts.day:02d}"
    if ts.dow:
        date_part = date_part + " " + ts.dow

    if ts.hour is not None:
        base = f"{date_part} {ts.hour:02}:{ts.minute or 0:02d}"
    else:
        base = date_part

    if end_time is not None:
        # An end time only makes sense with an explicit HH:MM.
        assert end_time.hour is not None
        assert end_time.minute is not None
        base = f"{base}-{end_time.hour:02}:{end_time.minute:02d}"

    if ts.repetition is not None:
        base = base + " " + ts.repetition

    return f"<{base}>" if ts.active else f"[{base}]"
|
|
|
|
|
|
Time = Union[TimeRange, OrgTime]
|
|
|
|
|
|
def parse_time(value: str) -> Optional[Time]:
    """Parse a raw value into a TimeRange or a single OrgTime.

    A value containing exactly one '>--<' or ']--[' separator is treated
    as a two-date range; anything else is tried as a single timestamp.
    Returns None when nothing parses.
    """
    if (value.count(">--<") == 1) or (value.count("]--[") == 1):
        # Time ranges with two different dates
        # @TODO properly consider "=> DURATION" section
        start, end = value.split("=")[0].split("--")
        as_time_range = parse_org_time_range(start, end)
        if as_time_range is None:
            return None

        # NOTE(review): TimeRange.__init__ rejects None endpoints, so the
        # else-branch below looks unreachable — confirm before removing.
        if (as_time_range.start_time is not None) and (
            as_time_range.end_time is not None
        ):
            return as_time_range
        else:
            raise Exception("Unknown time range format: {}".format(value))
    elif as_time := OrgTime.parse(value):
        return as_time
    else:
        return None
|
|
|
|
|
|
def parse_org_time_range(start, end) -> Optional[TimeRange]:
    """Parse two raw org timestamps into a TimeRange.

    Returns None when either side fails to parse.
    """
    start_time, end_time = OrgTime.parse(start), OrgTime.parse(end)
    if start_time is None or end_time is None:
        return None
    return TimeRange(start_time, end_time)
|
|
|
|
|
|
def get_raw(doc):
    """Return *doc* itself when it is a string, else its get_raw() rendering."""
    return doc if isinstance(doc, str) else doc.get_raw()
|
|
|
|
|
|
class Line:
    """A single file line composed of raw strings and/or styled chunks."""

    def __init__(self, linenum, contents):
        self.linenum = linenum
        self.contents = contents

    def get_raw(self):
        """Render the line back to raw text, including the trailing newline."""
        rendered = "".join(
            part if isinstance(part, str) else part.get_raw()
            for part in self.contents
        )
        return rendered + "\n"
|
|
|
|
|
|
class Link:
    """An org link ([[value][description]]) tied to its place in the raw text.

    Setting `value` or `description` re-renders the token range in the
    original document through the stored origin.
    """

    def __init__(
        self, value: str, description: Optional[str], origin: Optional[RangeInRaw]
    ):
        self._value = value
        self._description = description
        # Where this link lives in the raw token stream, if known.
        self._origin = origin

    def get_raw(self):
        """Render the link back to raw org syntax."""
        if self.description:
            return "[[{}][{}]]".format(self.value, self.description)
        else:
            return "[[{}]]".format(self.value)

    def _update_content(self):
        # Rebuild the token sequence for this link and push it back into
        # the owning document via the origin range (when one is attached).
        new_contents: List[Union[str, LinkToken]] = []
        new_contents.append(self._value)
        if self._description:
            new_contents.append(LinkToken(LinkTokenType.OPEN_DESCRIPTION))
            new_contents.append(self._description)
        if self._origin is not None:
            self._origin.update_range(new_contents)

    @property
    def value(self):
        """The link target."""
        return self._value

    @value.setter
    def value(self, new_value):
        self._value = new_value
        self._update_content()

    @property
    def description(self):
        """The human-readable description, or None."""
        return self._description

    @description.setter
    def description(self, new_description):
        self._description = new_description
        self._update_content()
|
|
|
|
|
|
class Text:
    """A parsed block of text: a token list plus its starting line number."""

    def __init__(self, contents, line):
        # Mixed list of str / MarkerToken / LinkToken chunks.
        self.contents = contents
        self.linenum = line

    def __repr__(self):
        return "{{Text line: {}; content: {} }}".format(self.linenum, self.contents)

    def get_text(self):
        """Return the visible text, with markup and link syntax stripped."""
        return token_list_to_plaintext(self.contents)

    def get_raw(self):
        """Return the original raw org text for this block."""
        return token_list_to_raw(self.contents)
|
|
|
|
|
|
def token_list_to_plaintext(tok_list) -> str:
    """Flatten a token list into its visible plain text.

    Markup markers contribute nothing; links render as their description,
    falling back to the link target when no description exists.
    """
    contents = []
    # State for the link currently being traversed, if any.
    in_link = False
    in_description = False
    link_description = []
    link_url = []
    for chunk in tok_list:
        if isinstance(chunk, str):
            if not in_link:
                contents.append(chunk)
            elif in_description:
                link_description.append(chunk)
            else:
                link_url.append(chunk)
        elif isinstance(chunk, LinkToken):
            if chunk.tok_type == LinkTokenType.OPEN_LINK:
                in_link = True
            elif chunk.tok_type == LinkTokenType.OPEN_DESCRIPTION:
                in_description = True
            else:
                assert chunk.tok_type == LinkTokenType.CLOSE
                if not in_description:
                    # This might happen when link doesn't have a separate description
                    link_description = link_url
                contents.append("".join(link_description))

                # Reset link state for the next link in the stream.
                in_link = False
                in_description = False
                link_description = []
                link_url = []
        else:
            # Markup markers (bold/italic/...) add no visible characters.
            assert isinstance(chunk, MarkerToken)

    return "".join(contents)
|
|
|
|
|
|
def token_list_to_raw(tok_list):
    """Render a token list back into raw org syntax.

    Link tokens become their bracket syntax and marker tokens become
    their single marker character; plain strings pass through unchanged.
    """
    contents = []
    for chunk in tok_list:
        if isinstance(chunk, str):
            contents.append(chunk)
        elif isinstance(chunk, LinkToken):
            if chunk.tok_type == LinkTokenType.OPEN_LINK:
                contents.append("[[")
            elif chunk.tok_type == LinkTokenType.OPEN_DESCRIPTION:
                contents.append("][")
            else:
                assert chunk.tok_type == LinkTokenType.CLOSE
                contents.append("]]")
        else:
            assert isinstance(chunk, MarkerToken)
            contents.append(token_from_type(chunk.tok_type))
    return "".join(contents)
|
|
|
|
|
|
class Bold:
    """Inline bold markup: ``*content*``."""

    Marker = "*"

    def __init__(self, contents, line):
        # `line` is accepted for signature parity with Text but not stored.
        self.contents = contents

    def get_raw(self):
        """Render the content wrapped in the marker character."""
        raw = "".join(map(get_raw, self.contents))
        return f"{self.Marker}{raw}{self.Marker}"
|
|
|
|
|
|
class Code:
    """Inline code markup: ``~content~``."""

    Marker = "~"

    def __init__(self, contents, line):
        # `line` is accepted for signature parity with Text but not stored.
        self.contents = contents

    def get_raw(self):
        """Render the content wrapped in the marker character."""
        raw = "".join(map(get_raw, self.contents))
        return f"{self.Marker}{raw}{self.Marker}"
|
|
|
|
|
|
class Italic:
    """Inline italic markup: ``/content/``."""

    Marker = "/"

    def __init__(self, contents, line):
        # `line` is accepted for signature parity with Text but not stored.
        self.contents = contents

    def get_raw(self):
        """Render the content wrapped in the marker character."""
        raw = "".join(map(get_raw, self.contents))
        return f"{self.Marker}{raw}{self.Marker}"
|
|
|
|
|
|
class Strike:
    """Inline strike-through markup: ``+content+``."""

    Marker = "+"

    def __init__(self, contents, line):
        # `line` is accepted for signature parity with Text but not stored.
        self.contents = contents

    def get_raw(self):
        """Render the content wrapped in the marker character."""
        raw = "".join(map(get_raw, self.contents))
        return f"{self.Marker}{raw}{self.Marker}"
|
|
|
|
|
|
class Underlined:
    """Inline underline markup: ``_content_``."""

    Marker = "_"

    def __init__(self, contents, line):
        # `line` is accepted for signature parity with Text but not stored.
        self.contents = contents

    def get_raw(self):
        """Render the content wrapped in the marker character."""
        raw = "".join(map(get_raw, self.contents))
        return f"{self.Marker}{raw}{self.Marker}"
|
|
|
|
|
|
class Verbatim:
    """Inline verbatim markup: ``=content=``."""

    Marker = "="

    def __init__(self, contents, line):
        # `line` is accepted for signature parity with Text but not stored.
        self.contents = contents

    def get_raw(self):
        """Render the content wrapped in the marker character."""
        raw = "".join(map(get_raw, self.contents))
        return f"{self.Marker}{raw}{self.Marker}"
|
|
|
|
|
|
def is_pre(char: Optional[str]) -> bool:
    """True when *char* may legally precede an opening markup marker.

    Non-string input (e.g. None at the start of the text) counts as a
    valid prefix position.
    """
    if not isinstance(char, str):
        return True
    return char in "\n\r\t -({'\""
|
|
|
|
|
|
def is_marker(char: str) -> bool:
    """True when *char* is one of the inline-markup marker characters."""
    return isinstance(char, str) and char in "*=/+_~"
|
|
|
|
|
|
def is_border(char: str) -> bool:
    """True when *char* can sit flush against a marker (any non-whitespace)."""
    return isinstance(char, str) and char not in "\n\r\t "
|
|
|
|
|
|
def is_body(char: str) -> bool:
    """True for any string character (anything may appear inside markup)."""
    return isinstance(char, str)
|
|
|
|
|
|
def is_post(char: str) -> bool:
    """True when *char* may legally follow a closing markup marker."""
    return isinstance(char, str) and char in "-.,;:!?')}[\""
|
|
|
|
|
|
# Token kinds emitted by tokenize_contents().
TOKEN_TYPE_TEXT = 0
TOKEN_TYPE_OPEN_MARKER = 1
TOKEN_TYPE_CLOSE_MARKER = 2
TOKEN_TYPE_OPEN_LINK = 3
TOKEN_TYPE_CLOSE_LINK = 4
TOKEN_TYPE_OPEN_DESCRIPTION = 5

# A token is a (token-type, payload) pair; payload is the text for TEXT
# tokens, the marker character for marker tokens, and None for link tokens.
TokenItems = Union[Tuple[int, Union[None, str, MarkerToken]],]
|
|
|
|
|
|
def tokenize_contents(contents: str) -> List[TokenItems]:
    """Tokenize raw text into TEXT / marker / link tokens.

    Scans character by character, emitting (TOKEN_TYPE_*, payload) pairs.
    Link brackets are consumed via the shared `cursor` so their second
    character is skipped; markers are only opened when a matching close
    is found within the same (or next) line.
    """
    tokens: List[TokenItems] = []
    last_char = None

    text: List[str] = []
    # Indices already committed as closing-marker positions.
    closes = set()
    in_link = False
    in_link_description = False
    last_link_start = 0

    def cut_string():
        # Flush any accumulated plain text as a TEXT token.
        nonlocal text
        nonlocal tokens

        if len(text) > 0:
            tokens.append((TOKEN_TYPE_TEXT, "".join(text)))
            text = []

    cursor = enumerate(contents)
    for i, char in cursor:
        has_changed = False

        # Possible link opening
        if char == "[":
            if (
                len(contents) > i + 3
                # At least 3 characters more to open and close a link
                and contents[i + 1] == "["
                # TODO: Generalize this to a backtracking, don't just fix the test case...
                and contents[i + 2] != "["
            ):
                close = contents.find("]]", i)

                if close != -1:
                    # Link with no description
                    cut_string()

                    in_link = True
                    tokens.append((TOKEN_TYPE_OPEN_LINK, None))
                    # Consume the second '[' so it is not re-processed.
                    assert "[" == (next(cursor)[1])
                    last_link_start = i
                    continue
                # NOTE(review): every path in the branch above ends in
                # `continue`, so this description-handling branch appears
                # unreachable — confirm against the test suite before
                # removing it.
                if close != -1 and contents[close + 1] == "[":
                    # Link with description?

                    close = contents.find("]", close + 1)
                    if close != -1 and contents[close + 1] == "]":
                        # No match here means this is not an Org link
                        cut_string()

                        in_link = True
                        tokens.append((TOKEN_TYPE_OPEN_LINK, None))
                        assert "[" == (next(cursor)[1])
                        last_link_start = i
                        continue

        # Possible link close or open of description
        if char == "]" and len(contents) > i + 1 and in_link:
            if contents[i + 1] == "]":
                cut_string()

                tokens.append((TOKEN_TYPE_CLOSE_LINK, None))
                # Consume the second ']'.
                assert "]" == (next(cursor)[1])
                in_link = False
                in_link_description = False
                continue

            elif contents[i + 1] == "[":
                cut_string()

                tokens.append((TOKEN_TYPE_OPEN_DESCRIPTION, None))
                # Consume the '[' that opens the description.
                assert "[" == (next(cursor)[1])
                continue

        if in_link and not in_link_description:
            # Link's pointer have no formatting
            pass

        elif (
            (i not in closes)
            and is_marker(char)
            and is_pre(last_char)
            and ((i + 1 < len(contents)) and is_border(contents[i + 1]))
        ):

            is_valid_mark = False
            # Check that is closed later
            text_in_line = True
            for j in range(i, len(contents) - 1):
                if contents[j] == "\n":
                    # A marker may span at most one line break.
                    if not text_in_line:
                        break
                    text_in_line = False
                elif is_border(contents[j]) and contents[j + 1] == char:
                    is_valid_mark = True
                    closes.add(j + 1)
                    break
                else:
                    text_in_line |= is_body(contents[j])

            if is_valid_mark:
                cut_string()
                tokens.append((TOKEN_TYPE_OPEN_MARKER, char))
                has_changed = True
        elif i in closes:
            cut_string()
            tokens.append((TOKEN_TYPE_CLOSE_MARKER, char))
            has_changed = True

        if not has_changed:
            text.append(char)
        last_char = char

    # Flush any trailing plain text.
    if len(text) > 0:
        tokens.append((TOKEN_TYPE_TEXT, "".join(text)))

    return tokens
|
|
|
|
|
|
def parse_contents(raw_contents: List[RawLine]):
    """Group consecutive raw lines into blocks and parse each block.

    Lines with contiguous line numbers belong to one block; a gap in the
    numbering starts a new block.

    Returns:
        A list of Text objects, one per contiguous block (empty list for
        empty input).
    """
    if not raw_contents:
        return []

    blocks: List[List[RawLine]] = []
    current_block: List[RawLine] = [raw_contents[0]]

    for line in raw_contents[1:]:
        if line.linenum == current_block[-1].linenum + 1:
            # Contiguous: extend the current block.
            current_block.append(line)
        else:
            # Gap in line numbers: close the block and start a new one.
            blocks.append(current_block)
            current_block = [line]

    # The final block is still open; close it.
    blocks.append(current_block)

    return [parse_content_block(block) for block in blocks]
|
|
|
|
|
|
def parse_content_block(raw_contents: Union[List[RawLine], str]):
    """Tokenize a block of raw lines (or a plain string) into a Text object.

    The starting line number is taken from the first RawLine, or None
    when a bare string is given.
    """
    contents_buff = []
    if isinstance(raw_contents, str):
        contents_buff.append(raw_contents)
    else:
        for line in raw_contents:
            contents_buff.append(line.line)

    contents_buff_text = "\n".join(contents_buff)
    tokens = tokenize_contents(contents_buff_text)
    if isinstance(raw_contents, str):
        current_line = None
    else:
        current_line = raw_contents[0].linenum

    contents: List[Union[str, MarkerToken, LinkToken]] = []
    # Use tokens to tag chunks of text with it's container type
    for tok_type, tok_val in tokens:
        if tok_type == TOKEN_TYPE_TEXT:
            assert isinstance(tok_val, str)
            contents.append(tok_val)
        elif tok_type == TOKEN_TYPE_OPEN_MARKER:
            assert isinstance(tok_val, str)
            contents.append(MarkerToken(False, MARKERS[tok_val]))
        elif tok_type == TOKEN_TYPE_CLOSE_MARKER:
            assert isinstance(tok_val, str)
            contents.append(MarkerToken(True, MARKERS[tok_val]))
        elif tok_type == TOKEN_TYPE_OPEN_LINK:
            contents.append(LinkToken(LinkTokenType.OPEN_LINK))
        elif tok_type == TOKEN_TYPE_OPEN_DESCRIPTION:
            contents.append(LinkToken(LinkTokenType.OPEN_DESCRIPTION))
        elif tok_type == TOKEN_TYPE_CLOSE_LINK:
            contents.append(LinkToken(LinkTokenType.CLOSE))

    return Text(contents, current_line)
|
|
|
|
|
|
def dump_contents(raw):
    """Serialize a content element back to a (linenum, raw text) pair.

    Handles RawLine, ListItem and TableRow explicitly; any other object
    is asked to render itself via get_raw().
    """
    if isinstance(raw, RawLine):
        return (raw.linenum, raw.line)

    elif isinstance(raw, ListItem):
        # A list item uses either a bullet ('-', '+', ...) or a counter
        # with its separator ('1.', 'a)').
        bullet = raw.bullet if raw.bullet else raw.counter + raw.counter_sep
        content_full = token_list_to_raw(raw.content)
        # NOTE(review): split followed by join on the same separator is a
        # no-op — possibly a leftover from per-line re-indentation.
        content_lines = content_full.split("\n")
        content = "\n".join(content_lines)
        checkbox = f"[{raw.checkbox_value}]" if raw.checkbox_value else ""
        tag = (
            f"{raw.tag_indentation}{token_list_to_raw(raw.tag or '')}::"
            if raw.tag or raw.tag_indentation
            else ""
        )
        return (
            raw.linenum,
            f"{raw.indentation}{bullet} {checkbox}{tag}{content}",
        )

    elif isinstance(raw, TableRow):
        # Re-emit cells joined by '|'; the trailing '|' only when the
        # original row closed its last cell.
        closed = "|" if raw.last_cell_closed else ""
        return (
            raw.linenum,
            f"{' ' * raw.indentation}|{'|'.join(raw.cells)}{closed}{raw.suffix}",
        )

    return (raw.linenum, raw.get_raw())
|
|
|
|
|
|
def parse_headline(hl, doc, parent) -> Headline:
    """Build a Headline (recursively, with children) from a HeadlineDict.

    Extracts depth from the star count, strips trailing tags and a
    leading TODO/DONE keyword from the title, and parses the content
    lines into Text blocks.

    Raises:
        AssertionError: If the child's depth does not exceed its parent's.
    """
    stars = hl["orig"].group("stars")
    depth = len(stars)
    spacing = hl["orig"].group("spacing")

    # TODO: Parse line for priority, cookies and tags
    line = hl["orig"].group("line")
    hl_tags = HEADLINE_TAGS_RE.search(line)

    if hl_tags is None:
        tags = []
    else:
        # Trailing ":tag1:tag2:" group: strip outer colons and split.
        tags = hl_tags.group(0)[1:-1].split(":")
        line = HEADLINE_TAGS_RE.sub("", line)

    hl_state = None
    title = line
    is_done = is_todo = False
    # A headline state keyword must be followed by a space to count.
    for state in doc.todo_keywords or []:
        if title.startswith(state['name'] + " "):
            hl_state = state
            title = title[len(state['name'] + " ") :]
            is_todo = True
            break
    else:
        # Only checked when no TODO keyword matched (for-else).
        for state in doc.done_keywords or []:
            if title.startswith(state['name'] + " "):
                hl_state = state
                title = title[len(state['name'] + " ") :]
                is_done = True
                break

    contents = parse_contents(hl["contents"])

    # Top-level headlines hang off the OrgDoc itself; nested ones must be
    # strictly deeper than their parent.
    if not (isinstance(parent, OrgDoc) or depth > parent.depth):
        raise AssertionError(
            "Incorrectly parsed parent on `{}' > `{}'".format(parent.title, title)
        )

    headline = Headline(
        start_line=hl["linenum"],
        depth=depth,
        orig=hl["orig"],
        title=title,
        state=hl_state,
        contents=contents,
        children=None,
        keywords=hl["keywords"],
        properties=hl["properties"],
        structural=hl["structural"],
        delimiters=hl["delimiters"],
        list_items=hl["list_items"],
        table_rows=hl["table_rows"],
        title_start=None,
        priority=None,
        priority_start=None,
        tags_start=None,
        tags=tags,
        parent=parent,
        is_todo=is_todo,
        is_done=is_done,
        spacing=spacing,
    )

    # Children are built after the Headline exists so they can point back
    # to it as their parent.
    headline.children = [
        parse_headline(child, doc, headline) for child in hl["children"]
    ]
    return headline
|
|
|
|
|
|
def dump_kw(kw):
    """Serialize a keyword object back to its '#+KEY:value' source line.

    Uses the stored regex match for the original indentation and spacing,
    and the object's own key/options/value fields for the content.

    Returns:
        Tuple of (line number, rendered line).
    """
    group = kw.match.group
    rendered = (
        f"{group('indentation')}#+{kw.key}{kw.options}:{group('spacing')}{kw.value}"
    )
    return (kw.linenum, rendered)
|
|
|
|
|
|
def dump_property(prop: Property):
    """Serialize a node property back to its ':KEY: value' drawer line.

    Indentation, the '+' accumulation suffix and spacing are recovered
    from the original regex match when available; otherwise sensible
    defaults are used.  Time values are rendered back to org syntax.

    Returns:
        Tuple of (line number, rendered line).
    """
    plus = ""
    indentation = ""
    spacing = " "
    if prop.match is not None:
        plus = prop.match.group("plus")
        if plus is None:
            plus = ""
        indentation = prop.match.group("indentation")
        spacing = prop.match.group("spacing")

    if isinstance(prop.value, TimeRange):
        value = timerange_to_string(prop.value)
    elif isinstance(prop.value, OrgTime):
        value = prop.value.to_raw()
    else:
        value = prop.value

    return (
        prop.linenum,
        "{indentation}:{key}{plus}:{spacing}{value}".format(
            indentation=indentation,
            key=prop.key,
            plus=plus,
            spacing=spacing,
            value=value,
        ),
    )
|
|
|
|
|
|
def dump_structural(structural: Tuple):
    """Return the (linenum, raw text) pair of a structural entry."""
    linenum, raw_line = structural[0], structural[1]
    return (linenum, raw_line)
|
|
|
|
|
|
def dump_delimiters(line: DelimiterLine):
    """Return the (linenum, raw text) pair of a block delimiter line."""
    return (line.linenum, line.line)
|
|
|
|
|
|
def parse_todo_done_keywords(line: str) -> OrgDocDeclaredStates:
    """Parse a TODO/SEQ_TODO declaration line into headline states.

    The declaration lists not-completed keywords, optionally followed by
    a '|' separator and the completed keywords.  Per standard Org
    behavior, when no '|' is present the LAST keyword counts as the
    completed one.  Fast-access annotations such as "(t)" or "(d!)" are
    stripped before splitting.

    Args:
        line: The raw declaration, e.g. "TODO NEXT(n) | DONE CANCELLED".

    Returns:
        OrgDocDeclaredStates: dict with 'not_completed' and 'completed'
        lists of HeadlineState.
    """
    # Drop per-keyword fast-access/logging annotations, e.g. "(t!)".
    clean_line = re.sub(r"\([^)]+\)", "", line)
    if '|' in clean_line:
        todo_kws, done_kws = clean_line.split("|", 1)
        has_split = True
    else:
        # Standard behavior in this case is: the last state is the one considered as DONE
        todo_kws = clean_line
        # FIX: `has_split` was previously never assigned on this path,
        # raising NameError below for declarations without a '|'.
        has_split = False

    todo_keywords = re.sub(r"\s{2,}", " ", todo_kws.strip()).split()
    if has_split:
        done_keywords = re.sub(r"\s{2,}", " ", done_kws.strip()).split()
    else:
        # FIX: this assignment was misspelled `done_keywods`, leaving
        # `done_keywords` undefined and crashing the return below.
        done_keywords = [todo_keywords[-1]]
        todo_keywords = todo_keywords[:-1]

    return {
        "not_completed": [
            HeadlineState(name=keyword)
            for keyword in todo_keywords
        ],
        "completed": [
            HeadlineState(name=keyword)
            for keyword in done_keywords
        ],
    }
|
|
|
|
|
|
class OrgDoc:
    """A parsed org document: headlines plus file-level metadata.

    TODO/DONE keyword sets are resolved in priority order: a '#+TODO'/
    '#+SEQ_TODO' keyword in the file wins, then the environment's
    'org-todo-keywords', then the module defaults.
    """

    def __init__(
        self, headlines, keywords, contents, list_items, structural, properties,
        environment=BASE_ENVIRONMENT,
    ):
        # Start from the module defaults; possibly overridden below.
        self.todo_keywords = [HeadlineState(name=kw) for kw in DEFAULT_TODO_KEYWORDS]
        self.done_keywords = [HeadlineState(name=kw) for kw in DEFAULT_DONE_KEYWORDS]

        keywords_set_in_file = False
        for keyword in keywords:
            if keyword.key in ("TODO", "SEQ_TODO"):
                states = parse_todo_done_keywords(keyword.value)
                self.todo_keywords, self.done_keywords = states['not_completed'], states['completed']
                keywords_set_in_file = True

        if not keywords_set_in_file and 'org-todo-keywords' in environment:
            # Read keywords from environment
            states = parse_todo_done_keywords(environment['org-todo-keywords'])
            self.todo_keywords, self.done_keywords = states['not_completed'], states['completed']

        self.keywords: List[Property] = keywords
        self.contents: List[RawLine] = contents
        self.list_items: List[ListItem] = list_items
        self.structural: List = structural
        self.properties: List = properties
        self._path = None
        # Headlines are parsed last: parse_headline needs the resolved
        # todo/done keyword lists on `self`.
        self.headlines: List[Headline] = list(
            map(lambda hl: parse_headline(hl, self, self), headlines)
        )
        self.environment = environment

    @property
    def id(self):
        """
        The file-level :ID: property, if present.

        Created by org-roam v2.
        """
        for p in self.properties:
            if p.key == "ID":
                return p.value
        return None

    @property
    def path(self):
        """Filesystem path of the document, when known."""
        return self._path

    ## Querying
    def get_links(self):
        """Yield every link in the document (headlines first, then body)."""
        for headline in self.headlines:
            yield from headline.get_links()

        for content in self.contents:
            yield from get_links_from_content(content)

    def get_keywords(self, name: str, default=None):
        """Return the value of the first '#+NAME:' keyword, or *default*."""
        for prop in self.keywords:
            if prop.key == name:
                return prop.value

        return default

    def get_property(self, name: str, default=None):
        """Return the value of the first file-level property, or *default*."""
        for prop in self.properties:
            if prop.key == name:
                return prop.value

        return default

    def getProperties(self):
        """Return the file-level keywords (legacy camelCase accessor)."""
        return self.keywords

    def getTopHeadlines(self):
        """Return only the top-level headlines (legacy camelCase accessor)."""
        return self.headlines

    def getAllHeadlines(self) -> Iterator[Headline]:
        """Yield every headline, depth-first in document order."""
        todo = self.headlines[::-1]  # We go backwards, to pop/append and go depth-first
        while len(todo) != 0:
            hl = todo.pop()
            todo.extend(hl.children[::-1])

            yield hl

    def get_code_snippets(self):
        """Yield every code snippet found under any headline."""
        for headline in self.headlines:
            yield from headline.get_code_snippets()

    # Writing
    def dump_headline(self, headline, recursive=True):
        """Yield the raw text lines for *headline* (and children if recursive).

        First yields the title line, then the planning line when present,
        then one chunk with all contents sorted by original line number.
        A missing :END: after trailing properties is synthesized.
        """

        tags = ""
        if len(headline.shallow_tags) > 0:
            tags = ":" + ":".join(headline.shallow_tags) + ":"

        state = ""
        if headline.state:
            state = headline.state['name'] + " "

        raw_title = token_list_to_raw(headline.title.contents)
        tags_padding = ""
        if not (raw_title.endswith(" ") or raw_title.endswith("\t")) and tags:
            tags_padding = " "

        yield "*" * headline.depth + headline.spacing + state + raw_title + tags_padding + tags

        planning = headline.get_planning_line()
        if planning is not None:
            yield planning

        # Collect (category, (linenum, text)) pairs so everything can be
        # re-interleaved in original line order.
        lines = []
        KW_T = 0
        CONTENT_T = 1
        PROPERTIES_T = 2
        STRUCTURAL_T = 3
        for keyword in headline.keywords:
            lines.append((KW_T, dump_kw(keyword)))

        for content in headline.contents:
            lines.append((CONTENT_T, dump_contents(content)))

        for li in headline.list_items:
            lines.append((CONTENT_T, dump_contents(li)))

        for row in headline.table_rows:
            lines.append((CONTENT_T, dump_contents(row)))

        for prop in headline.properties:
            lines.append((PROPERTIES_T, dump_property(prop)))

        for struct in headline.structural:
            lines.append((STRUCTURAL_T, dump_structural(struct)))

        for content in headline.delimiters:
            lines.append((STRUCTURAL_T, dump_delimiters(content)))

        # Sort by original line number.
        lines = sorted(lines, key=lambda x: x[1][0])

        structured_lines = []
        last_type = None
        for i, line in enumerate(lines):
            ltype = line[0]
            content = line[1][1]

            content = content + "\n"
            last_type = ltype
            structured_lines.append(content)

        if last_type == PROPERTIES_T:
            # No structural closing

            indentation = 0
            if len(lines) > 0:
                # NOTE(review): after the loop `i` is the last index, so
                # this reads the second-to-last line — `lines[-1]` looks
                # like the intended target; confirm before changing.
                last_line = lines[i - 1][1][1]
                indentation = last_line.index(":")
            structured_lines.append(" " * indentation + ":END:\n")
            logging.warning(
                "Added structural:{}: {}".format(
                    line[1][0], structured_lines[-1].strip()
                )
            )

        if len(structured_lines) > 0:
            content = "".join(structured_lines)

            # Remove the last line jump, which will be accounted for by the "yield operation"
            assert content.endswith("\n")
            content = content[:-1]
            yield content

        if recursive:
            for child in headline.children:
                yield from self.dump_headline(child, recursive=recursive)

    def dump(self):
        """Yield the raw lines of the whole document, in original order."""
        lines = []
        for prop in self.properties:
            lines.append(dump_property(prop))

        for struct in self.structural:
            lines.append(dump_structural(struct))

        for kw in self.keywords:
            lines.append(dump_kw(kw))

        for line in self.contents:
            lines.append(dump_contents(line))

        for li in self.list_items:
            lines.append(dump_contents(li))

        # Pre-headline content, sorted by line number.
        yield from map(lambda x: x[1], sorted(lines, key=lambda x: x[0]))

        for headline in self.headlines:
            yield from self.dump_headline(headline)
|
|
|
|
|
|
class OrgDocReader:
    """Incremental parser that assembles an ``OrgDoc`` from Org-mode text.

    ``read()`` walks the input line by line, classifies each line with the
    module-level regexes and dispatches to the matching ``add_*`` method.
    Elements found before any headline are stored on the reader itself;
    once a headline is open, new elements are attached to the innermost
    entry of ``headline_hierarchy``.  Call ``finalize()`` to obtain the
    resulting document.
    """

    def __init__(self, environment=BASE_ENVIRONMENT):
        # Accumulators for elements found before the first headline.
        self.headlines: List[HeadlineDict] = []
        self.keywords: List[Keyword] = []
        # Stack of currently-open headlines, indexed by depth - 1.  `None`
        # entries are placeholders for skipped depths (e.g. a `***` headline
        # found directly under a `*` one).
        self.headline_hierarchy: List[Optional[HeadlineDict]] = []
        self.contents: List[RawLine] = []
        self.delimiters: List[DelimiterLine] = []
        self.list_items: List[ListItem] = []
        self.table_rows: List[TableRow] = []
        self.structural: List = []
        self.properties: List = []
        # Target list for the drawer currently being read (:PROPERTIES:,
        # results or logbook), or None while outside any drawer.
        self.current_drawer: Optional[List] = None
        self.environment = environment

    def finalize(self):
        """Build and return the ``OrgDoc`` from everything read so far."""
        return OrgDoc(
            self.headlines,
            self.keywords,
            self.contents,
            self.list_items,
            self.structural,
            self.properties,
            self.environment,
        )

    ## Construction
    def add_headline(self, linenum: int, match: re.Match):
        """Open a new headline and position it inside the hierarchy.

        The headline's depth is the number of leading stars; the hierarchy
        stack is padded with ``None`` for skipped depths and truncated when
        the new headline is shallower than the current one.
        """
        # Position reader on the proper headline
        stars = match.group("stars")
        depth = len(stars)

        headline: HeadlineDict = {
            "linenum": linenum,
            "orig": match,
            "title": match.group("line"),
            "contents": [],
            "children": [],
            "keywords": [],
            "properties": [],
            "logbook": [],
            "structural": [],
            "delimiters": [],
            "results": [],  # TODO: Move to each specific code block?
            "list_items": [],
            "table_rows": [],
        }

        while (depth - 1) > len(self.headline_hierarchy):
            # Introduce structural headlines
            self.headline_hierarchy.append(None)
        while depth <= len(self.headline_hierarchy):
            self.headline_hierarchy.pop()

        if depth == 1:
            self.headlines.append(headline)
        else:
            # Attach to the closest real (non-placeholder) ancestor.
            parent_idx = len(self.headline_hierarchy) - 1
            while self.headline_hierarchy[parent_idx] is None:
                parent_idx -= 1
            parent_headline = self.headline_hierarchy[parent_idx]
            assert parent_headline is not None
            parent_headline["children"].append(headline)
        self.headline_hierarchy.append(headline)

        if all([hl is not None for hl in self.headline_hierarchy]):
            # With no placeholders the stack's star counts must be exactly
            # 1..N, otherwise the hierarchy bookkeeping is broken.
            depths = [
                len(cast(HeadlineDict, hl)["orig"].group("stars"))
                for hl in self.headline_hierarchy
            ]
            if depths != list(range(1, len(self.headline_hierarchy) + 1)):
                raise AssertionError("Error on Headline Hierarchy")
        else:
            # This might happen if headlines with more that 1 level deeper are found
            pass

        # We can safely assert this as all the `None`s are there to
        # support the addition of a `HeadlineDict` at the correct
        # depth but not more
        assert self.headline_hierarchy[-1] is not None

    def add_list_item_line(self, linenum: int, match: re.Match) -> ListItem:
        """Create a ``ListItem`` from a matched list line and attach it.

        Returns the item so ``read()`` can keep appending continuation
        lines to it.
        """
        li = ListItem(
            linenum=linenum,
            match=match,
            indentation=match.group("indentation"),
            bullet=match.group("bullet"),
            counter=match.group("counter"),
            counter_sep=match.group("counter_sep"),
            checkbox_indentation=match.group("checkbox_indentation"),
            checkbox_value=match.group("checkbox_value"),
            tag_indentation=match.group("tag_indentation"),
            tag=(
                parse_content_block(
                    [RawLine(linenum=linenum, line=match.group("tag"))]
                ).contents
                if match.group("tag")
                else None
            ),
            content=parse_content_block(
                [RawLine(linenum=linenum, line=match.group("content"))]
            ).contents,
        )

        if len(self.headline_hierarchy) == 0:
            self.list_items.append(li)
        else:
            assert self.headline_hierarchy[-1] is not None
            self.headline_hierarchy[-1]["list_items"].append(li)
        return li

    def add_table_line(self, linenum: int, line: str):
        """Split a ``|``-delimited table line into cells and attach the row."""
        chunks = line.split("|")
        indentation = len(chunks[0])
        if chunks[-1].strip() == "":
            # Trailing `|` present: the last chunk is only whitespace suffix.
            suffix = chunks[-1]
            cells = chunks[1:-1]
            last_cell_closed = True
        else:
            suffix = ""
            cells = chunks[1:]
            last_cell_closed = False

        row = TableRow(
            linenum,
            indentation,
            suffix,
            last_cell_closed,
            cells,
        )

        if len(self.headline_hierarchy) == 0:
            self.table_rows.append(row)
        else:
            assert self.headline_hierarchy[-1] is not None
            self.headline_hierarchy[-1]["table_rows"].append(row)

    def add_keyword_line(self, linenum: int, match: re.Match):
        """Attach a ``#+KEY: value`` keyword line to the current scope."""
        options = match.group("options")
        kw = Keyword(
            linenum,
            match,
            match.group("key"),
            match.group("value"),
            options if options is not None else "",
        )
        if len(self.headline_hierarchy) == 0:
            self.keywords.append(kw)
        else:
            assert self.headline_hierarchy[-1] is not None
            self.headline_hierarchy[-1]["keywords"].append(kw)

    def add_raw_line(self, linenum: int, line: str):
        """Attach an unclassified content line to the current scope."""
        raw = RawLine(linenum, line)
        if len(self.headline_hierarchy) == 0:
            self.contents.append(raw)
        else:
            assert self.headline_hierarchy[-1] is not None
            self.headline_hierarchy[-1]["contents"].append(raw)

    def add_begin_block_line(self, linenum: int, match: re.Match):
        """Attach a ``#+BEGIN_...`` delimiter line to the current scope."""
        line = DelimiterLine(
            linenum,
            match.group(0),
            DelimiterLineType.BEGIN_BLOCK,
            BlockDelimiterTypeData(match.group("subtype")),
            match.group("arguments"),
        )
        if len(self.headline_hierarchy) == 0:
            self.delimiters.append(line)
        else:
            assert self.headline_hierarchy[-1] is not None
            self.headline_hierarchy[-1]["delimiters"].append(line)

    def add_end_block_line(self, linenum: int, match: re.Match):
        """Attach a ``#+END_...`` delimiter line to the current scope."""
        line = DelimiterLine(
            linenum,
            match.group(0),
            DelimiterLineType.END_BLOCK,
            BlockDelimiterTypeData(match.group("subtype")),
            None,
        )
        if len(self.headline_hierarchy) == 0:
            self.delimiters.append(line)
        else:
            assert self.headline_hierarchy[-1] is not None
            self.headline_hierarchy[-1]["delimiters"].append(line)

    def add_property_drawer_line(self, linenum: int, line: str, match: re.Match):
        """Open a :PROPERTIES: drawer: subsequent node properties go to it."""
        if len(self.headline_hierarchy) == 0:
            self.current_drawer = self.properties
            self.structural.append((linenum, line))
        else:
            assert self.headline_hierarchy[-1] is not None
            self.current_drawer = self.headline_hierarchy[-1]["properties"]
            self.headline_hierarchy[-1]["structural"].append((linenum, line))

    def add_results_drawer_line(self, linenum: int, line: str, match: re.Match):
        """Open a results drawer on the current headline (headline required)."""
        assert self.headline_hierarchy[-1] is not None
        self.current_drawer = self.headline_hierarchy[-1]["results"]
        self.headline_hierarchy[-1]["structural"].append((linenum, line))

    def add_logbook_drawer_line(self, linenum: int, line: str, match: re.Match):
        """Open a logbook drawer on the current headline (headline required)."""
        assert self.headline_hierarchy[-1] is not None
        self.current_drawer = self.headline_hierarchy[-1]["logbook"]
        self.headline_hierarchy[-1]["structural"].append((linenum, line))

    def add_drawer_end_line(self, linenum: int, line: str, match: re.Match):
        """Close the currently-open drawer and record the :END: line."""
        self.current_drawer = None
        if len(self.headline_hierarchy) == 0:
            self.structural.append((linenum, line))
        else:
            assert self.headline_hierarchy[-1] is not None
            self.headline_hierarchy[-1]["structural"].append((linenum, line))

    def add_node_properties_line(self, linenum: int, match: re.Match):
        """Append a ``:KEY: value`` property to the open drawer.

        Raises if no drawer is open, which indicates a malformed file.
        """
        key = match.group("key")
        value = match.group("value").strip()

        # Timestamp-looking values are stored parsed rather than as text.
        if as_time := parse_time(value):
            value = as_time

        if self.current_drawer is None:  # Throw a better error on this case
            raise Exception(
                "Found properties before :PROPERTIES: line. Error on Org file?"
            )

        self.current_drawer.append(Property(linenum, match, key, value, None))

    def read(self, s):
        """Parse the whole Org document in string ``s``.

        Maintains three pieces of line-to-line state: whether we are inside
        a babel block (``in_block``), inside a drawer (``in_drawer``), and
        which list item continuation lines should be appended to
        (``list_item`` / ``list_item_indentation``).
        """
        lines = s.split("\n")
        reader = enumerate(lines)
        in_drawer = False
        in_block = False
        list_item_indentation = None
        list_item = None

        def add_raw_line_with_possible_indentation(linenum, line):
            # A raw line indented past the current list item's text start
            # (or a blank line) is a continuation of that item; anything
            # else terminates the item and becomes regular content.
            added = False
            nonlocal list_item
            nonlocal list_item_indentation
            if list_item:
                if (line[: list_item.text_start_pos].strip() == "") or (
                    len(line.strip()) == 0
                ):
                    list_item.append_line(line)
                    added = True
                else:
                    list_item = None
                    list_item_indentation = None

            if not added:
                self.add_raw_line(linenum, line)

        for lnum, line in reader:
            linenum = lnum + 1  # 1-based, like editors report
            try:
                if in_block:
                    # Inside a babel block everything except the END line
                    # is kept verbatim.
                    if m := END_BLOCK_RE.match(line):
                        self.add_end_block_line(linenum, m)
                        in_block = False
                        list_item_indentation = None
                        list_item = None
                    else:
                        add_raw_line_with_possible_indentation(linenum, line)

                elif m := HEADLINE_RE.match(line):
                    list_item_indentation = None
                    list_item = None
                    self.add_headline(linenum, m)
                elif m := LIST_ITEM_RE.match(line):
                    list_item = self.add_list_item_line(linenum, m)
                    list_item_indentation = m.group("indentation")
                elif m := RAW_LINE_RE.match(line):
                    add_raw_line_with_possible_indentation(linenum, line)
                # Org-babel
                elif m := BEGIN_BLOCK_RE.match(line):
                    self.add_begin_block_line(linenum, m)
                    in_block = True
                    list_item_indentation = None
                    list_item = None
                elif m := END_BLOCK_RE.match(line):
                    self.add_end_block_line(linenum, m)
                    in_block = False
                    list_item_indentation = None
                    list_item = None
                # Generic properties
                elif m := KEYWORDS_RE.match(line):
                    self.add_keyword_line(linenum, m)
                elif m := DRAWER_END_RE.match(line):
                    self.add_drawer_end_line(linenum, line, m)
                    in_drawer = False
                    list_item_indentation = None
                    list_item = None
                elif (not in_drawer) and (m := DRAWER_START_RE.match(line)):
                    self.add_property_drawer_line(linenum, line, m)
                    in_drawer = True
                    list_item_indentation = None
                    list_item = None
                elif (not in_drawer) and (m := RESULTS_DRAWER_RE.match(line)):
                    self.add_results_drawer_line(linenum, line, m)
                    in_drawer = True
                    list_item_indentation = None
                    list_item = None
                elif m := NODE_PROPERTIES_RE.match(line):
                    self.add_node_properties_line(linenum, m)
                elif line.strip().startswith("|"):
                    self.add_table_line(linenum, line)
                    list_item_indentation = None
                    list_item = None
                # Not captured
                else:
                    add_raw_line_with_possible_indentation(linenum, line)
            except Exception:
                # `linenum` is already 1-based; report it as-is (the old
                # code added 1 again, pointing at the wrong line).
                logging.error("Error line {}: {}".format(linenum, line))
                raise
|
|
|
|
|
|
def loads(s, environment=BASE_ENVIRONMENT, extra_cautious=True):
    """Parse an Org document from string ``s`` and return an ``OrgDoc``.

    When ``extra_cautious`` is true the document is re-serialized and
    compared against the input; any mismatch is printed as a unified diff
    (with DEBUG_DIFF_CONTEXT lines of context) on stderr and a
    ``NonReproducibleDocument`` error is raised.
    """
    reader = OrgDocReader(environment)
    reader.read(s)
    doc = reader.finalize()
    if extra_cautious:  # Check that all options can be properly re-serialized
        after_dump = dumps(doc)
        if after_dump != s:
            diff = list(
                difflib.Differ().compare(
                    s.splitlines(keepends=True), after_dump.splitlines(keepends=True)
                )
            )

            context_start = None
            context_last_line = None

            def flush_context():
                # Emit the accumulated differing region surrounded by
                # DEBUG_DIFF_CONTEXT lines of unchanged context.
                start = max(0, context_start - DEBUG_DIFF_CONTEXT)
                end = min(len(diff), context_last_line + DEBUG_DIFF_CONTEXT)
                print(
                    "## Lines {} to {}".format(start + 1, end + 1),
                    file=sys.stderr,
                )
                sys.stderr.writelines(diff[start:end])

            for i, line in enumerate(diff):
                if not line.startswith(" "):
                    if context_start is None:
                        context_start = i
                    context_last_line = i
                elif context_start is not None:
                    # NOTE: the old check was `elif context_start:`, which
                    # silently skipped flushing when the diff started at
                    # line 0 (context_start == 0 is falsy).
                    assert context_last_line is not None
                    if i > (context_last_line + DEBUG_DIFF_CONTEXT):
                        flush_context()
                        context_start = None
                        context_last_line = None

            # Flush a trailing chunk that was still open when the diff
            # ended; the old code dropped it entirely.
            if context_start is not None:
                flush_context()

            raise NonReproducibleDocument(
                "Difference found between existing version and dumped"
            )
    return doc
|
|
|
|
|
|
def load(f, environment=BASE_ENVIRONMENT, extra_cautious=False):
    """Parse an Org document from the readable file object ``f``.

    The document remembers the absolute path of ``f`` in ``_path`` so it
    can later be saved back to the same location.
    """
    document = loads(f.read(), environment, extra_cautious)
    document._path = os.path.abspath(f.name)
    return document
|
|
|
|
|
|
def dumps(doc):
    """Serialize ``doc`` back to Org text and return it as a string."""
    # doc.dump() yields one line at a time; glue them with newlines.
    return "\n".join(doc.dump())
|
|
|
|
|
|
def dump(doc, fp):
    """Serialize ``doc`` into the writable text stream ``fp``.

    Lines are written joined by "\\n" with no trailing newline, matching
    the output of ``dumps``.
    """
    it = doc.dump()

    # Write first line separately.  The old code used a bare next(), which
    # raised StopIteration for a document that serializes to nothing; an
    # empty document now simply writes nothing.
    line = next(it, None)
    if line is None:
        return
    fp.write(line)

    # Write following ones preceded by line jump
    for line in it:
        fp.write("\n" + line)
|