from __future__ import annotations import collections import difflib import logging import os import re import sys from datetime import date, datetime, timedelta from enum import Enum from typing import ( Dict, Iterator, List, Literal, Optional, TextIO, Tuple, TypedDict, Union, cast, ) from . import dom from .types import HeadlineDict DEBUG_DIFF_CONTEXT = 10 DEFAULT_TODO_KEYWORDS = ["TODO"] DEFAULT_DONE_KEYWORDS = ["DONE"] BASE_ENVIRONMENT = { "org-footnote-section": "Footnotes", "org-todo-keywords": " ".join(DEFAULT_TODO_KEYWORDS) + " | " + " ".join(DEFAULT_DONE_KEYWORDS), "org-options-keywords": ( "ARCHIVE:", "AUTHOR:", "BIND:", "CATEGORY:", "COLUMNS:", "CREATOR:", "DATE:", "DESCRIPTION:", "DRAWERS:", "EMAIL:", "EXCLUDE_TAGS:", "FILETAGS:", "INCLUDE:", "INDEX:", "KEYWORDS:", "LANGUAGE:", "MACRO:", "OPTIONS:", "PROPERTY:", "PRIORITIES:", "SELECT_TAGS:", "SEQ_TODO:", "SETUPFILE:", "STARTUP:", "TAGS:" "TITLE:", "TODO:", "TYP_TODO:", "SELECT_TAGS:", "EXCLUDE_TAGS:", ), } HEADLINE_TAGS_RE = re.compile(r"((:(\w|[0-9_@#%])+)+:)\s*$") HEADLINE_RE = re.compile(r"^(?P\*+)(?P\s+)(?P.*?)$") KEYWORDS_RE = re.compile( r"^(?P\s*)#\+(?P[^:\[]+)(\[(?P[^\]]*)\])?:(?P\s*)(?P.*)$" ) DRAWER_START_RE = re.compile(r"^(?P\s*):([^:]+):(?P\s*)$") DRAWER_END_RE = re.compile(r"^(?P\s*):END:(?P\s*)$", re.I) NODE_PROPERTIES_RE = re.compile( r"^(?P\s*):(?P[^ ()+:]+)(?P\+)?:(?P\s*)(?P.+)$" ) RAW_LINE_RE = re.compile(r"^\s*([^\s#:*|]|$)") BASE_TIME_STAMP_RE = r"(?P\d{4})-(?P\d{2})-(?P\d{2})( ?(?P[^ ]+))?( (?P\d{1,2}):(?P\d{1,2})(-+(?P\d{1,2}):(?P\d{1,2}))?)?(?P (?P(\+|\+\+|\.\+|-|--))(?P\d+)(?P[hdwmy]))?" CLEAN_TIME_STAMP_RE = r"\d{4}-\d{2}-\d{2}( ?([^ ]+))?( (\d{1,2}):(\d{1,2})(-+(\d{1,2}):(\d{1,2}))?)?( (\+|\+\+|\.\+|-|--)\d+[hdwmy])?" ACTIVE_TIME_STAMP_RE = re.compile(r"<{}>".format(BASE_TIME_STAMP_RE)) INACTIVE_TIME_STAMP_RE = re.compile(r"\[{}\]".format(BASE_TIME_STAMP_RE)) PLANNING_RE = re.compile( r"(?P\s*)" + r"(SCHEDULED:\s*(?P[<\[]" + CLEAN_TIME_STAMP_RE + r"[>\]](--[<\[]" + CLEAN_TIME_STAMP_RE + r"[>\]])?)\s*" + r"|CLOSED:\s*(?P[<\[]" + CLEAN_TIME_STAMP_RE + r"[>\]](--[<\[]" + CLEAN_TIME_STAMP_RE + r"[>\]])?)\s*" + r"|DEADLINE:\s*(?P[<\[]" + CLEAN_TIME_STAMP_RE + r"[>\]](--[<\[]" + CLEAN_TIME_STAMP_RE + r"[>\]])?)\s*" r")+\s*" ) LIST_ITEM_RE = re.compile( r"(?P\s*)((?P[*\-+])|((?P\d|[a-zA-Z])(?P[.)]))) ((?P\s*)\[(?P[ Xx])\])?((?P\s*)((?P.*?)\s::))?(?P.*)" ) IMPLICIT_LINK_RE = re.compile(r"(https?:[^<> ]*[a-zA-Z0-9])") # Org-Babel BEGIN_BLOCK_RE = re.compile(r"^\s*#\+BEGIN_(?P[^ ]+)(?P.*)$", re.I) END_BLOCK_RE = re.compile(r"^\s*#\+END_(?P[^ ]+)\s*$", re.I) RESULTS_DRAWER_RE = re.compile(r"^\s*:results:\s*$", re.I) CodeSnippet = collections.namedtuple( "CodeSnippet", ("name", "content", "result", "arguments") ) # Groupings NON_FINISHED_GROUPS = ( type(None), dom.ListGroupNode, dom.ResultsDrawerNode, dom.PropertyDrawerNode, ) FREE_GROUPS = (dom.CodeBlock,) # States class HeadlineState(TypedDict): # To be extended to handle keyboard shortcuts name: str class OrgDocDeclaredStates(TypedDict): not_completed: List[HeadlineState] completed: List[HeadlineState] class NonReproducibleDocument(Exception): """ Exception thrown when a document would be saved as different contents from what it's loaded from. """ pass def get_tokens(value): if isinstance(value, Text): return value.contents if isinstance(value, RawLine): return [value.line] if isinstance(value, list): return value raise Exception("Unknown how to get tokens from: {}".format(value)) class RangeInRaw: def __init__(self, content, start_token, end_token): self._content = content self._start_id = id(start_token) self._end_id = id(end_token) def update_range(self, new_contents): contents = self._content if isinstance(self._content, Text): contents = self._content.contents # Find start token for start_idx, tok in enumerate(contents): if id(tok) == self._start_id: break else: raise Exception("Start token not found") # Find end token for offset, tok in enumerate(contents[start_idx:]): if id(tok) == self._end_id: break else: raise Exception("End token not found") # Remove old contents for i in range(1, offset): contents.pop(start_idx + 1) # Add new ones for i, element in enumerate(new_contents): contents.insert(start_idx + i + 1, element) def unescape_block_lines(block: str) -> str: """ Remove leading ',' from block_lines if they escape `*` characters. """ i = 0 lines = block.split("\n") while i < len(lines): line = lines[i] if line.lstrip(" ").startswith(",") and line.lstrip(" ,").startswith("*"): # Remove leading ',' lead_pos = line.index(",") line = line[:lead_pos] + line[lead_pos + 1 :] lines[i] = line i += 1 return "\n".join(lines) def get_links_from_content(content): in_link = False in_description = False link_value: List[str] = [] link_description: List[str] = [] for i, tok in enumerate(get_tokens(content)): if isinstance(tok, LinkToken): if tok.tok_type == LinkTokenType.OPEN_LINK: in_link = True open_link_token = tok elif tok.tok_type == LinkTokenType.OPEN_DESCRIPTION: in_description = True elif tok.tok_type == LinkTokenType.CLOSE: rng = RangeInRaw(content, open_link_token, tok) yield Link( "".join(link_value), "".join(link_description) if in_description else None, rng, ) in_link = False in_description = False link_value = [] link_description = [] elif isinstance(tok, str) and in_link: if in_description: link_description.append(tok) else: link_value.append(tok) elif isinstance(tok, str): implicit_links = IMPLICIT_LINK_RE.findall(tok) for link in implicit_links: yield Link(cast(str, link), cast(str, link), None) def text_to_dom(tokens, item): if tokens is None: return None in_link = False in_description = False link_value: List[str] = [] link_description: List[str] = [] contents = [] for tok in tokens: if isinstance(tok, LinkToken): if tok.tok_type == LinkTokenType.OPEN_LINK: in_link = True open_link_token = tok elif tok.tok_type == LinkTokenType.OPEN_DESCRIPTION: in_description = True elif tok.tok_type == LinkTokenType.CLOSE: rng = RangeInRaw(item, open_link_token, tok) contents.append( Link( "".join(link_value), "".join(link_description) if in_description else None, rng, ) ) in_link = False in_description = False link_value = [] link_description = [] elif isinstance(tok, str) and in_link: if in_description: link_description.append(tok) else: link_value.append(tok) else: contents.append(tok) return contents def get_line(item): if isinstance(item, Text): return item.linenum elif isinstance(item, ListItem): return item.linenum elif isinstance(item, Property): return item.linenum elif isinstance(item, tuple): return item[0] else: raise Exception("Unknown item type: {}".format(item)) class Headline: def __init__( self, start_line, depth, orig, properties, keywords, priority_start, priority, title_start, title, state, tags_start, tags, contents, children, structural, delimiters, list_items, table_rows, parent, is_todo: bool, is_done: bool, spacing, scheduled: Optional[Time] = None, deadline: Optional[Time] = None, closed: Optional[Time] = None, ): self.start_line = start_line self.depth = depth self.orig = orig self.properties = properties self.keywords = keywords self.priority_start = priority_start self.priority = priority self.title_start = title_start self.title = parse_content_block([RawLine(linenum=start_line, line=title)]) self.state = state self.tags_start = tags_start self.shallow_tags = tags self.contents = contents self.children = children self.structural = structural self.delimiters = delimiters self.list_items = list_items self.table_rows = table_rows self.parent = parent self.is_todo = is_todo self.is_done = is_done self.scheduled = scheduled self.deadline = deadline self.closed = closed self.spacing = spacing # Read planning line planning_line = self.get_element_in_line(start_line + 1) # Ignore if not found or is a structural line if planning_line is None or isinstance(planning_line, tuple): return if m := PLANNING_RE.match(planning_line.get_raw()): self._planning_indendation = m.group("indentation") self._planning_order = [] keywords = ["SCHEDULED", "CLOSED", "DEADLINE"] plan = planning_line.get_raw().split("\n")[0] indexes = [(kw, plan.find(kw)) for kw in keywords] self._planning_order = [ kw for (kw, idx) in sorted( filter(lambda v: v[1] >= 0, indexes), key=lambda v: v[1] ) ] if scheduled_m := m.group("scheduled"): self.scheduled = parse_time(scheduled_m) if closed_m := m.group("closed"): self.closed = parse_time(closed_m) if deadline_m := m.group("deadline"): self.deadline = parse_time(deadline_m) # Remove from contents self._remove_element_in_line(start_line + 1) @property def doc(self): par = self.parent while isinstance(par, Headline): par = par.parent return par def as_dom(self): everything = ( self.keywords + self.contents + self.list_items + self.table_rows + self.properties + self.structural + self.delimiters ) tree: List[dom.DomNode] = [] current_node: Optional[dom.DomNode] = None indentation_tree: List[dom.ContainerDomNode] = [] contents: Optional[str] = None for line in sorted(everything, key=get_line): if isinstance(current_node, dom.CodeBlock): if ( isinstance(line, DelimiterLine) and line.delimiter_type == DelimiterLineType.END_BLOCK ): start = current_node.header.linenum end = line.linenum lines = self.get_lines_between(start + 1, end) contents = unescape_block_lines("\n".join(lines)) if contents.endswith("\n"): # This is not ideal, but to avoid having to do this maybe # the content parsing must be re-thinked contents = contents[:-1] current_node.set_lines(contents) tree.append(current_node) current_node = None else: pass # Ignore elif isinstance(line, Property): if type(current_node) in NON_FINISHED_GROUPS: current_node = dom.PropertyDrawerNode() tree.append(current_node) assert isinstance(current_node, dom.PropertyDrawerNode) current_node.append(dom.PropertyNode(line.key, line.value)) elif isinstance(line, Text): tree_up = list(indentation_tree) while len(tree_up) > 0: node: dom.DomNode = tree_up[-1] if isinstance(node, dom.BlockNode) or isinstance( node, dom.DrawerNode ): node.append(dom.Text(line)) current_node = node contents = None break elif (not isinstance(node, dom.TableNode)) and ( type(node) not in NON_FINISHED_GROUPS ): raise NotImplementedError( "Not implemented node type: {} (headline_id={}, line={}, doc={})".format( node, self.id, line.linenum, self.doc.path, ) ) else: tree_up.pop(-1) else: current_node = None contents = None tree.append(dom.Text(text_to_dom(line.contents, line))) indentation_tree = tree_up elif isinstance(line, ListItem): if ( current_node is None or isinstance(current_node, dom.TableNode) or isinstance(current_node, dom.BlockNode) or isinstance(current_node, dom.DrawerNode) ): was_node = current_node current_node = dom.ListGroupNode() if was_node is None: tree.append(current_node) else: was_node.append(current_node) indentation_tree.append(current_node) if not isinstance(current_node, dom.ListGroupNode): if not isinstance(current_node, dom.ListGroupNode): raise Exception( "Expected a {}, found: {} on line {} on {}".format( dom.ListGroupNode, current_node, line.linenum, self.doc.path, ) ) # This can happen. Frequently inside a LogDrawer if len(indentation_tree) > 0 and ( (len(indentation_tree[-1].children) > 0) and len( [ c for c in indentation_tree[-1].children if isinstance(c, dom.ListItem) ][-1].orig.indentation ) < len(line.indentation) ): sublist = dom.ListGroupNode() current_node.append(sublist) current_node = sublist indentation_tree.append(current_node) while len(indentation_tree) > 0: list_children = [ c for c in indentation_tree[-1].children if isinstance(c, dom.ListItem) ] if len(list_children) == 0: break if len(list_children[-1].orig.indentation) <= len(line.indentation): # No more breaking out of lists, it's indentation # is less than ours break rem = indentation_tree.pop(-1) if len(indentation_tree) == 0: indentation_tree.append(rem) current_node = rem break else: current_node = indentation_tree[-1] node = dom.ListItem( text_to_dom(line.tag, line), text_to_dom(line.content, line), orig=line, ) current_node.append(node) elif isinstance(line, TableRow): if current_node is None: current_node = dom.TableNode() tree.append(current_node) # TODO: Allow indentation of this element inside others indentation_tree = [current_node] elif not isinstance(current_node, dom.TableNode): if isinstance(current_node, dom.ListGroupNode): # As an item inside a list list_node = current_node current_node = dom.TableNode() list_node.append(current_node) indentation_tree.append(current_node) else: logging.debug( "Expected a {}, found: {} on line {}".format( dom.TableNode, current_node, line.linenum ) ) # This can happen. Frequently inside a LogDrawer if ( len(line.cells) > 0 and len(line.cells[0]) > 0 and line.cells[0][0] == "-" ): node = dom.TableSeparatorRow(orig=line) else: node = dom.TableRow(line.cells, orig=line) current_node = cast(dom.ContainerDomNode, current_node) current_node.append(node) elif ( isinstance(line, DelimiterLine) and line.delimiter_type == DelimiterLineType.BEGIN_BLOCK ): assert type(current_node) in NON_FINISHED_GROUPS current_node = dom.CodeBlock( line, line.type_data.subtype, line.arguments ) elif isinstance(line, Keyword): logging.warning("Keywords not implemented on `as_dom()`") # elif ( # isinstance(line, DelimiterLine) # and line.delimiter_type == DelimiterLineType.END_BLOCK # ): # assert isinstance(current_node, dom.BlockNode) # current_node = None elif ( isinstance(line, tuple) and len(line) == 2 and isinstance(line[0], int) and isinstance(line[1], str) ): # Structural (linenum, content) = line if content.strip().upper() == ":PROPERTIES:": assert current_node is None current_node = dom.PropertyDrawerNode() tree.append(current_node) # TODO: Check if this can be nested indentation_tree = [current_node] elif content.strip().upper() == ":LOGBOOK:": assert current_node is None current_node = dom.LogbookDrawerNode() tree.append(current_node) # TODO: Check if this can be nested indentation_tree = [current_node] elif content.strip().upper() == ":END:": if current_node is None and len(indentation_tree) == 0: logging.error("Finished node (:END:) with no known starter") else: tree_up = list(indentation_tree) while len(tree_up) > 0: node = tree_up[-1] if isinstance(node, dom.DrawerNode): indentation_tree = tree_up current_node = node tree_up.pop(-1) break else: tree_up.pop(-1) else: raise Exception( "Unexpected node ({}) on headline (id={}), line {}".format( current_node, self.id, linenum ) ) current_node = None elif content.strip().upper() == ":RESULTS:": assert current_node is None current_node = dom.ResultsDrawerNode() # TODO: Allow indentation of these blocks inside others indentation_tree = [current_node] tree.append(current_node) else: raise Exception("Unknown structural line: {}".format(line)) else: raise Exception("Unknown node type: {}".format(line)) return tree def get_lists(self): lists = [] last_line = None for li in self.list_items: if last_line is None: lists.append([li]) else: num_lines = li.linenum - (last_line + 1) lines_between = "".join( [ "\n" + l for l in self.get_lines_between(last_line + 1, li.linenum) ] ) # Only empty lines if (num_lines == lines_between.count("\n")) and ( len(lines_between.strip()) == 0 ): lists[-1].append(li) else: lists.append([li]) last_line = li.linenum + sum(c.count("\n") for c in li.content) return lists # @DEPRECATED: use `get_lists` def getLists(self): return self.get_lists() def get_tables(self): tables: List[List] = [] # TableRow[][] last_line = None for row in self.table_rows: if last_line == row.linenum - 1: tables[-1].append(row) else: tables.append([row]) last_line = row.linenum return tables def get_planning_line(self): if self.scheduled is None and self.closed is None and self.deadline is None: return None contents = [self._planning_indendation] for el in self._planning_order: if el == "SCHEDULED" and self.scheduled is not None: contents.append("SCHEDULED: {} ".format(self.scheduled.to_raw())) elif el == "CLOSED" and self.closed is not None: contents.append("CLOSED: {} ".format(self.closed.to_raw())) elif el == "DEADLINE" and self.deadline is not None: contents.append("DEADLINE: {} ".format(self.deadline.to_raw())) # Consider elements added (not present on planning order) if ("SCHEDULED" not in self._planning_order) and (self.scheduled is not None): contents.append("SCHEDULED: {} ".format(self.scheduled.to_raw())) if ("CLOSED" not in self._planning_order) and (self.closed is not None): contents.append("CLOSED: {} ".format(self.closed.to_raw())) if ("DEADLINE" not in self._planning_order) and (self.deadline is not None): contents.append("DEADLINE: {} ".format(self.deadline.to_raw())) return "".join(contents).rstrip() @property def id(self): return self.get_property("ID") @id.setter def id(self, value): self.set_property("ID", value) @property def clock(self): times = [] for chunk in self.contents: for line in chunk.get_raw().split("\n"): content = line.strip() if not content.startswith("CLOCK:"): continue time_seg = content[len("CLOCK:") :].strip() parsed: Optional[Time] = None if "--" in time_seg: # TODO: Consider duration start, end = time_seg.split("=")[0].split("--") as_time_range = parse_org_time_range(start, end) parsed = as_time_range else: parsed = OrgTime.parse(time_seg) if parsed is not None: times.append(parsed) return times @property def tags(self) -> list[str]: parent_tags = self.parent.tags if self.doc.environment.get('org-use-tag-inheritance'): accepted_tags = [] for tag in self.doc.environment.get('org-use-tag-inheritance'): if tag in parent_tags: accepted_tags.append(tag) parent_tags = accepted_tags elif self.doc.environment.get('org-tags-exclude-from-inheritance'): for tag in self.doc.environment.get('org-tags-exclude-from-inheritance'): if tag in parent_tags: parent_tags.remove(tag) return list(self.shallow_tags) + parent_tags def add_tag(self, tag: str): self.shallow_tags.append(tag) def get_property(self, name: str, default=None): for prop in self.properties: if prop.key == name: return prop.value return default def set_property(self, name: str, value: str): for prop in self.properties: # A matching property is found, update it if prop.key == name: prop.value = value return # No matching property found, add it else: if len(self.properties) > 0: last_prop = self.properties[-1] last_line = last_prop.linenum last_match = last_prop.match else: self.structural.append( ( -2, # Linenum ":PROPERTIES:", ) ) self.structural.append( ( 0, # Linenum ":END:", ) ) last_line = -1 last_match = None self.properties.append( Property( linenum=last_line, match=last_match, key=name, value=value, options=None, ) ) def get_links(self): for content in self.contents: yield from get_links_from_content(content) for lst in self.get_lists(): for item in lst: if item.tag: yield from get_links_from_content(item.tag) yield from get_links_from_content(item.content) def get_lines_between(self, start, end): for line in self.contents: if start <= line.linenum < end: yield "".join(line.get_raw()) def get_contents(self, format): if format == "raw": yield from map( lambda x: token_list_to_raw(x.contents), sorted(self.contents, key=lambda x: x.linenum), ) else: raise NotImplementedError() def get_element_in_line(self, linenum): for line in self.contents: if linenum == line.linenum: return line for s_lnum, struc in self.structural: if linenum == s_lnum: return ("structural", struc) def _remove_element_in_line(self, linenum): found = None for i, line in enumerate(self.contents): if linenum == line.linenum: found = i break assert found is not None el = self.contents[found] assert isinstance(el, Text) raw = el.get_raw() if "\n" not in raw: # Remove the element found self.contents.pop(found) else: # Remove the first line self.contents[found] = parse_content_block( [RawLine(self.contents[found].linenum + 1, raw.split("\n", 1)[1])] ) def get_structural_end_after(self, linenum): for s_lnum, struc in self.structural: if s_lnum > linenum and struc.strip().upper() == ":END:": return (s_lnum, struc) def get_code_snippets(self): inside_code = False sections = [] arguments = None for delimiter in self.delimiters: if ( delimiter.delimiter_type == DelimiterLineType.BEGIN_BLOCK and delimiter.type_data.subtype.lower() == "src" ): line_start = delimiter.linenum inside_code = True arguments = delimiter.arguments elif ( delimiter.delimiter_type == DelimiterLineType.END_BLOCK and delimiter.type_data.subtype.lower() == "src" ): inside_code = False start, end = line_start, delimiter.linenum lines = self.get_lines_between(start + 1, end) contents = unescape_block_lines("\n".join(lines)) if contents.endswith("\n"): # This is not ideal, but to avoid having to do this maybe # the content parsing must be re-thinked contents = contents[:-1] sections.append( { "line_first": start + 1, "line_last": end - 1, "content": contents, "arguments": arguments, } ) arguments = None line_start = None for kword in self.keywords: if kword.key.upper() == "RESULTS": for snippet in sections: if kword.linenum > snippet["line_last"]: result_first = self.get_element_in_line(kword.linenum + 1) if isinstance(result_first, Text): result = "\n".join(result_first.contents) snippet["result"] = result if result.strip().startswith(": "): # Split lines and remove ':' lines = result.split("\n") s_result = [] for line in lines: if ": " not in line: break s_result.append(line.lstrip(" ")[2:]) snippet["result"] = "\n".join(s_result) elif ( isinstance(result_first, tuple) and len(result_first) == 2 and result_first[0] == "structural" and result_first[1].strip().upper() == ":RESULTS:" ): (end_line, _) = self.get_structural_end_after( kword.linenum + 1 ) contents = "\n".join( self.get_lines_between(kword.linenum + 1, end_line) ) indentation = result_first[1].index(":") dedented = "\n".join( [line[indentation:] for line in contents.split("\n")] ) if dedented.endswith("\n"): dedented = dedented[:-1] snippet["result"] = dedented break results = [] for section in sections: name = None content = section["content"] code_result = section.get("result", None) arguments = section.get("arguments", None) results.append( CodeSnippet( name=name, content=content, result=code_result, arguments=arguments ) ) return results def create_headline_at_end(self) -> Headline: headline = Headline( start_line=1, depth=self.depth + 1, orig=None, properties=[], keywords=[], priority_start=None, priority=None, title_start=None, title="", state="", tags_start=None, tags=[], contents=[], children=[], structural=[], delimiters=[], list_items=[], table_rows=[], parent=self, is_todo=False, is_done=False, spacing=" ", ) self.children.append(headline) return headline RawLine = collections.namedtuple("RawLine", ("linenum", "line")) Keyword = collections.namedtuple( "Keyword", ("linenum", "match", "key", "value", "options") ) Property = collections.namedtuple( "Property", ("linenum", "match", "key", "value", "options") ) class ListItem: def __init__( self, linenum, match, indentation, bullet, counter, counter_sep, checkbox_indentation, checkbox_value, tag_indentation, tag, content, ): self.linenum = linenum self.match = match self.indentation = indentation self.bullet = bullet self.counter = counter self.counter_sep = counter_sep self.checkbox_indentation = checkbox_indentation self.checkbox_value = checkbox_value self.tag_indentation = tag_indentation self.tag = tag self.content = content @property def text_start_pos(self): return len(self.indentation) + 1 # Indentation + bullet def append_line(self, line): self.content += parse_content_block("\n" + line).contents TableRow = collections.namedtuple( "TableRow", ( "linenum", "indentation", "suffix", "last_cell_closed", "cells", ), ) # @TODO How are [YYYY-MM-DD HH:mm--HH:mm] and ([... HH:mm]--[... HH:mm]) differentiated ? # @TODO Consider recurrence annotations class Timestamp: def __init__( self, active: bool = True, year: Optional[int] = None, month: Optional[int] = None, day: Optional[int] = None, dow: Optional[str] = None, hour: Optional[int] = None, minute: Optional[int] = None, repetition: Optional[str] = None, datetime_: Optional[Union[date, datetime]] = None, ): """ Initializes a Timestamp instance. Args: active (bool): Whether the timestamp is active. year (Optional[int]): The year of the timestamp. month (Optional[int]): The month of the timestamp. day (Optional[int]): The day of the timestamp. dow (Optional[str]): The day of the week, if any. hour (Optional[int]): The hour of the timestamp, if any. minute (Optional[int]): The minute of the timestamp, if any. repetition (Optional[str]): The repetition pattern, if any. datetime_ (Optional[Union[date, datetime]]): A date or datetime object. Raises: ValueError: If neither datetime_ nor the combination of year, month, and day are provided. """ self.active = active if datetime_ is not None: self.from_datetime(datetime_) elif year is not None and month is not None and day is not None: self._year = year self._month = month self._day = day self.dow = dow self.hour = hour self.minute = minute else: raise ValueError( "Either datetime_ or year, month, and day must be provided." ) self.repetition = repetition def to_datetime(self) -> datetime: """ Converts the Timestamp to a datetime object. Returns: datetime: The corresponding datetime object. """ if self.hour is not None: return datetime( self.year, self.month, self.day, self.hour, self.minute or 0 ) else: return datetime(self.year, self.month, self.day, 0, 0) def from_datetime(self, dt: Union[datetime, date]) -> None: """ Updates the current Timestamp instance based on a datetime or date object. Args: dt (Union[datetime, date]): The datetime or date object to use for updating the instance. """ if isinstance(dt, datetime): self._year = dt.year self._month = dt.month self._day = dt.day self.hour = dt.hour self.minute = dt.minute elif isinstance(dt, date): self._year = dt.year self._month = dt.month self._day = dt.day self.hour = None self.minute = None else: raise TypeError("Expected datetime or date object") self.dow = None # Day of the week can be set to None def __add__(self, delta: timedelta) -> "Timestamp": """ Adds a timedelta to the Timestamp. Args: delta (timedelta): The time difference to add. Returns: Timestamp: The resulting Timestamp instance. """ as_dt = self.to_datetime() to_dt = as_dt + delta return Timestamp( self.active, year=to_dt.year, month=to_dt.month, day=to_dt.day, dow=None, hour=to_dt.hour if self.hour is not None or to_dt.hour != 0 else None, minute=( to_dt.minute if self.minute is not None or to_dt.minute != 0 else None ), repetition=self.repetition, ) def __eq__(self, other: object) -> bool: """ Checks if two Timestamp instances are equal. Args: other (object): The other object to compare with. Returns: bool: True if the instances are equal, False otherwise. """ if not isinstance(other, Timestamp): return False return ( self.active == other.active and self.year == other.year and self.month == other.month and self.day == other.day and self.dow == other.dow and self.hour == other.hour and self.minute == other.minute and self.repetition == other.repetition ) def __lt__(self, other: object) -> bool: """ Checks if the Timestamp is less than another Timestamp. Args: other (object): The other object to compare with. Returns: bool: True if this Timestamp is less than the other, False otherwise. """ if not isinstance(other, Timestamp): return False return self.to_datetime() < other.to_datetime() def __gt__(self, other: object) -> bool: """ Checks if the Timestamp is greater than another Timestamp. Args: other (object): The other object to compare with. Returns: bool: True if this Timestamp is greater than the other, False otherwise. """ if not isinstance(other, Timestamp): return False return self.to_datetime() > other.to_datetime() def __repr__(self) -> str: """ Returns a string representation of the Timestamp. Returns: str: The string representation of the Timestamp. """ return timestamp_to_string(self) @property def year(self) -> int: """Returns the year of the timestamp.""" return self._year @year.setter def year(self, value: int) -> None: """Sets the year of the timestamp and resets the day of the week.""" self._year = value self.dow = None @property def month(self) -> int: """Returns the month of the timestamp.""" return self._month @month.setter def month(self, value: int) -> None: """Sets the month of the timestamp and resets the day of the week.""" self._month = value self.dow = None @property def day(self) -> int: """Returns the day of the timestamp.""" return self._day @day.setter def day(self, value: int) -> None: """Sets the day of the timestamp and resets the day of the week.""" self._day = value self.dow = None class DelimiterLineType(Enum): BEGIN_BLOCK = 1 END_BLOCK = 2 BlockDelimiterTypeData = collections.namedtuple("BlockDelimiterTypeData", ("subtype")) DelimiterLine = collections.namedtuple( "DelimiterLine", ("linenum", "line", "delimiter_type", "type_data", "arguments") ) class MarkerType(Enum): NO_MODE = 0b0 BOLD_MODE = 0b1 CODE_MODE = 0b10 ITALIC_MODE = 0b100 STRIKE_MODE = 0b1000 UNDERLINED_MODE = 0b10000 VERBATIM_MODE = 0b100000 MARKERS = { "*": MarkerType.BOLD_MODE, "~": MarkerType.CODE_MODE, "/": MarkerType.ITALIC_MODE, "+": MarkerType.STRIKE_MODE, "_": MarkerType.UNDERLINED_MODE, "=": MarkerType.VERBATIM_MODE, } ModeToMarker = {} for tok, mode in MARKERS.items(): ModeToMarker[mode] = tok MarkerToken = collections.namedtuple("MarkerToken", ("closing", "tok_type")) LinkToken = collections.namedtuple("LinkToken", ("tok_type")) class LinkTokenType(Enum): OPEN_LINK = 3 OPEN_DESCRIPTION = 5 CLOSE = 4 BEGIN_PROPERTIES = "OPEN_PROPERTIES" END_PROPERTIES = "CLOSE_PROPERTIES" def token_from_type(tok_type): return ModeToMarker[tok_type] class TimeRange: """Represents a range of time with a start and end time. Attributes: start_time (OrgTime): The start time of the range. end_time (OrgTime): The end time of the range. """ def __init__(self, start_time: OrgTime, end_time: OrgTime) -> None: """Initializes a TimeRange with a start time and an end time. Args: start_time (OrgTime): The start time of the range. end_time (OrgTime): The end time of the range. Raises: AssertionError: If start_time or end_time is None. """ if start_time is None or end_time is None: raise ValueError("start_time and end_time must not be None.") self.start_time = start_time self.end_time = end_time def to_raw(self) -> str: """Converts the TimeRange to its raw string representation. Returns: str: The raw string representation of the TimeRange. """ return timerange_to_string(self) @property def duration(self) -> timedelta: """Calculates the duration of the TimeRange. Returns: timedelta: The duration between start_time and end_time. """ delta = self.end - self.start return delta @property def start(self) -> datetime: """Gets the start time as a datetime object. Returns: datetime: The start time of the TimeRange. """ return self.start_time.time.to_datetime() @property def end(self) -> datetime: """Gets the end time as a datetime object. Returns: datetime: The end time of the TimeRange. """ return self.end_time.time.to_datetime() def activate(self) -> None: """ Sets the active state for the times. """ self.start_time.active = True self.end_time.active = True def deactivate(self) -> None: """ Sets the inactive state for the times. """ self.start_time.active = False self.end_time.active = False class OrgTime: """Represents a point in time with optional end time and repetition. Attributes: time (Timestamp): The start time of the OrgTime instance. end_time (Optional[Timestamp]): The end time of the OrgTime instance, if any. """ def __init__(self, ts: Timestamp, end_time: Optional[Timestamp] = None) -> None: """Initializes an OrgTime with a start time and an optional end time. Args: ts (Timestamp): The start time of the OrgTime instance. end_time (Optional[Timestamp], optional): The end time of the OrgTime instance. Defaults to None. Raises: ValueError: If ts is None. """ if ts is None: raise ValueError("Timestamp (ts) must not be None.") self.time = ts self.end_time = end_time @property def repetition(self) -> Optional[str]: """Gets the repetition information from the start time. Returns: Optional[str]: The repetition information, or None if not present. """ return self.time.repetition @property def duration(self) -> timedelta: """Calculates the duration between the start and end times. Returns: timedelta: The duration between the start and end times. If no end time is present, returns zero timedelta. """ if self.end_time is None: return timedelta() # No duration return self.end_time.to_datetime() - self.time.to_datetime() def to_raw(self) -> str: """Converts the OrgTime to its raw string representation. Returns: str: The raw string representation of the OrgTime. """ return timestamp_to_string(self.time, self.end_time) def __repr__(self) -> str: """Provides a string representation of the OrgTime instance. Returns: str: The string representation of the OrgTime. """ return f"OrgTime({self.to_raw()})" @classmethod def parse(cls, value: str) -> Optional["OrgTime"]: """Parses a string into an OrgTime object. Args: value (str): The string representation of the OrgTime. Returns: Optional[OrgTime]: The parsed OrgTime instance, or None if parsing fails. """ if m := ACTIVE_TIME_STAMP_RE.match(value): active = True elif m := INACTIVE_TIME_STAMP_RE.match(value): active = False else: return None repetition = None if m.group("repetition"): repetition = m.group("repetition").strip() if m.group("end_hour"): return cls( Timestamp( active, int(m.group("year")), int(m.group("month")), int(m.group("day")), m.group("dow"), int(m.group("start_hour")), int(m.group("start_minute")), repetition=repetition, ), Timestamp( active, int(m.group("year")), int(m.group("month")), int(m.group("day")), m.group("dow"), int(m.group("end_hour")), int(m.group("end_minute")), ), ) return cls( Timestamp( active, int(m.group("year")), int(m.group("month")), int(m.group("day")), m.group("dow"), int(m.group("start_hour")) if m.group("start_hour") else None, int(m.group("start_minute")) if m.group("start_minute") else None, repetition=repetition, ) ) @property def active(self) -> bool: """ Checks if the time is set as active. """ return self.time.active @active.setter def active(self, value: bool) -> None: """ Sets the active state for the timestamp. """ self.time.active = value def activate(self) -> None: """ Sets the active state for the timestamp. """ self.active = True def deactivate(self) -> None: """ Sets the inactive state for the timestamp. """ self.active = False def from_datetime(self, dt: datetime) -> None: """ Updates the timestamp to use the given datetime. Args: dt (datetime): The datetime to update the timestamp with. """ self.time.from_datetime(dt) if self.end_time: self.end_time.from_datetime(dt) def time_from_str(s: str) -> Optional[OrgTime]: return OrgTime.parse(s) def timerange_to_string(tr: TimeRange): return tr.start_time.to_raw() + "--" + tr.end_time.to_raw() def timestamp_to_string(ts: Timestamp, end_time: Optional[Timestamp] = None) -> str: date = "{year}-{month:02d}-{day:02d}".format( year=ts.year, month=ts.month, day=ts.day ) if ts.dow: date = date + " " + ts.dow if ts.hour is not None: base = "{date} {hour:02}:{minute:02d}".format( date=date, hour=ts.hour, minute=ts.minute or 0 ) else: base = date if end_time is not None: assert end_time.hour is not None assert end_time.minute is not None base = "{base}-{hour:02}:{minute:02d}".format( base=base, hour=end_time.hour, minute=end_time.minute ) if ts.repetition is not None: base = base + " " + ts.repetition if ts.active: return "<{}>".format(base) else: return "[{}]".format(base) Time = Union[TimeRange, OrgTime] def parse_time(value: str) -> Optional[Time]: if (value.count(">--<") == 1) or (value.count("]--[") == 1): # Time ranges with two different dates # @TODO properly consider "=> DURATION" section start, end = value.split("=")[0].split("--") as_time_range = parse_org_time_range(start, end) if as_time_range is None: return None if (as_time_range.start_time is not None) and ( as_time_range.end_time is not None ): return as_time_range else: raise Exception("Unknown time range format: {}".format(value)) elif as_time := OrgTime.parse(value): return as_time else: return None def parse_org_time_range(start, end) -> Optional[TimeRange]: start_time = OrgTime.parse(start) end_time = OrgTime.parse(end) if start_time is None or end_time is None: return None return TimeRange(start_time, end_time) def get_raw(doc): if isinstance(doc, str): return doc else: return doc.get_raw() class Line: def __init__(self, linenum, contents): self.linenum = linenum self.contents = contents def get_raw(self): rawchunks = [] for chunk in self.contents: if isinstance(chunk, str): rawchunks.append(chunk) else: rawchunks.append(chunk.get_raw()) return "".join(rawchunks) + "\n" class Link: def __init__( self, value: str, description: Optional[str], origin: Optional[RangeInRaw] ): self._value = value self._description = description self._origin = origin def get_raw(self): if self.description: return "[[{}][{}]]".format(self.value, self.description) else: return "[[{}]]".format(self.value) def _update_content(self): new_contents: List[Union[str, LinkToken]] = [] new_contents.append(self._value) if self._description: new_contents.append(LinkToken(LinkTokenType.OPEN_DESCRIPTION)) new_contents.append(self._description) if self._origin is not None: self._origin.update_range(new_contents) @property def value(self): return self._value @value.setter def value(self, new_value): self._value = new_value self._update_content() @property def description(self): return self._description @description.setter def description(self, new_description): self._description = new_description self._update_content() class Text: def __init__(self, contents, line): self.contents = contents self.linenum = line def __repr__(self): return "{{Text line: {}; content: {} }}".format(self.linenum, self.contents) def get_text(self) -> str: return token_list_to_plaintext(self.contents) def get_raw(self): return token_list_to_raw(self.contents) def token_list_to_plaintext(tok_list) -> str: contents = [] in_link = False in_description = False link_description = [] link_url = [] for chunk in tok_list: if isinstance(chunk, str): if not in_link: contents.append(chunk) elif in_description: link_description.append(chunk) else: link_url.append(chunk) elif isinstance(chunk, LinkToken): if chunk.tok_type == LinkTokenType.OPEN_LINK: in_link = True elif chunk.tok_type == LinkTokenType.OPEN_DESCRIPTION: in_description = True else: assert chunk.tok_type == LinkTokenType.CLOSE if not in_description: # This might happen when link doesn't have a separate description link_description = link_url contents.append("".join(link_description)) in_link = False in_description = False link_description = [] link_url = [] else: assert isinstance(chunk, MarkerToken) return "".join(contents) def token_list_to_raw(tok_list): contents = [] for chunk in tok_list: if isinstance(chunk, str): contents.append(chunk) elif isinstance(chunk, LinkToken): if chunk.tok_type == LinkTokenType.OPEN_LINK: contents.append("[[") elif chunk.tok_type == LinkTokenType.OPEN_DESCRIPTION: contents.append("][") else: assert chunk.tok_type == LinkTokenType.CLOSE contents.append("]]") else: assert isinstance(chunk, MarkerToken) contents.append(token_from_type(chunk.tok_type)) return "".join(contents) class Bold: Marker = "*" def __init__(self, contents, line): self.contents = contents def get_raw(self): raw = "".join(map(get_raw, self.contents)) return f"{self.Marker}{raw}{self.Marker}" class Code: Marker = "~" def __init__(self, contents, line): self.contents = contents def get_raw(self): raw = "".join(map(get_raw, self.contents)) return f"{self.Marker}{raw}{self.Marker}" class Italic: Marker = "/" def __init__(self, contents, line): self.contents = contents def get_raw(self): raw = "".join(map(get_raw, self.contents)) return f"{self.Marker}{raw}{self.Marker}" class Strike: Marker = "+" def __init__(self, contents, line): self.contents = contents def get_raw(self): raw = "".join(map(get_raw, self.contents)) return f"{self.Marker}{raw}{self.Marker}" class Underlined: Marker = "_" def __init__(self, contents, line): self.contents = contents def get_raw(self): raw = "".join(map(get_raw, self.contents)) return f"{self.Marker}{raw}{self.Marker}" class Verbatim: Marker = "=" def __init__(self, contents, line): self.contents = contents def get_raw(self): raw = "".join(map(get_raw, self.contents)) return f"{self.Marker}{raw}{self.Marker}" def is_pre(char: Optional[str]) -> bool: if isinstance(char, str): return char in "\n\r\t -({'\"" else: return True def is_marker(char: str) -> bool: if isinstance(char, str): return char in "*=/+_~" else: return False def is_border(char: str) -> bool: if isinstance(char, str): return char not in "\n\r\t " else: return False def is_body(char: str) -> bool: if isinstance(char, str): return True else: return False def is_post(char: str) -> bool: if isinstance(char, str): return char in "-.,;:!?')}[\"" else: return False TOKEN_TYPE_TEXT = 0 TOKEN_TYPE_OPEN_MARKER = 1 TOKEN_TYPE_CLOSE_MARKER = 2 TOKEN_TYPE_OPEN_LINK = 3 TOKEN_TYPE_CLOSE_LINK = 4 TOKEN_TYPE_OPEN_DESCRIPTION = 5 TokenItems = Union[Tuple[int, Union[None, str, MarkerToken]],] def tokenize_contents(contents: str) -> List[TokenItems]: tokens: List[TokenItems] = [] last_char = None text: List[str] = [] closes = set() in_link = False in_link_description = False last_link_start = 0 def cut_string(): nonlocal text nonlocal tokens if len(text) > 0: tokens.append((TOKEN_TYPE_TEXT, "".join(text))) text = [] cursor = enumerate(contents) for i, char in cursor: has_changed = False # Possible link opening if char == "[": if ( len(contents) > i + 3 # At least 3 characters more to open and close a link and contents[i + 1] == "[" # TODO: Generalize this to a backtracking, don't just fix the test case... and contents[i + 2] != "[" ): close = contents.find("]]", i) if close != -1: # Link with no description cut_string() in_link = True tokens.append((TOKEN_TYPE_OPEN_LINK, None)) assert "[" == (next(cursor)[1]) last_link_start = i continue if close != -1 and contents[close + 1] == "[": # Link with description? close = contents.find("]", close + 1) if close != -1 and contents[close + 1] == "]": # No match here means this is not an Org link cut_string() in_link = True tokens.append((TOKEN_TYPE_OPEN_LINK, None)) assert "[" == (next(cursor)[1]) last_link_start = i continue # Possible link close or open of description if ( char == "]" and len(contents) > i + 1 and in_link and contents[i + 1] in "][" ): if contents[i + 1] == "]": cut_string() tokens.append((TOKEN_TYPE_CLOSE_LINK, None)) assert "]" == (next(cursor)[1]) in_link = False in_link_description = False continue elif contents[i + 1] == "[": cut_string() tokens.append((TOKEN_TYPE_OPEN_DESCRIPTION, None)) assert "[" == (next(cursor)[1]) continue if in_link and not in_link_description: # Link's pointer have no formatting pass elif ( (i not in closes) and is_marker(char) and is_pre(last_char) and ((i + 1 < len(contents)) and is_border(contents[i + 1])) ): is_valid_mark = False # Check that is closed later text_in_line = True for j in range(i, len(contents) - 1): if contents[j] == "\n": if not text_in_line: break text_in_line = False elif is_border(contents[j]) and contents[j + 1] == char: is_valid_mark = True closes.add(j + 1) break else: text_in_line |= is_body(contents[j]) if is_valid_mark: cut_string() tokens.append((TOKEN_TYPE_OPEN_MARKER, char)) has_changed = True elif i in closes: cut_string() tokens.append((TOKEN_TYPE_CLOSE_MARKER, char)) has_changed = True closes.remove(i) if not has_changed: text.append(char) last_char = char if len(text) > 0: tokens.append((TOKEN_TYPE_TEXT, "".join(text))) return tokens def parse_contents(raw_contents: List[RawLine]): if len(raw_contents) == 0: return [] blocks = [] current_block: List[RawLine] = [] for line in raw_contents: if len(current_block) == 0: # Seed the first block current_line = line.linenum current_block.append(line) else: current_line = cast(int, current_line) if line.linenum == current_line + 1: # Continue with the current block current_line = line.linenum current_block.append(line) else: # Split the blocks blocks.append(current_block) current_line = line.linenum current_block = [line] # Check that the current block is not left behind if len(current_block) > 0: blocks.append(current_block) return [parse_content_block(block) for block in blocks] def parse_content_block(raw_contents: Union[List[RawLine], str]) -> Text: contents_buff = [] if isinstance(raw_contents, str): contents_buff.append(raw_contents) else: for line in raw_contents: contents_buff.append(line.line) contents_buff_text = "\n".join(contents_buff) tokens = tokenize_contents(contents_buff_text) if isinstance(raw_contents, str): current_line = None else: current_line = raw_contents[0].linenum contents: List[Union[str, MarkerToken, LinkToken]] = [] # Use tokens to tag chunks of text with it's container type for tok_type, tok_val in tokens: if tok_type == TOKEN_TYPE_TEXT: assert isinstance(tok_val, str) contents.append(tok_val) elif tok_type == TOKEN_TYPE_OPEN_MARKER: assert isinstance(tok_val, str) contents.append(MarkerToken(False, MARKERS[tok_val])) elif tok_type == TOKEN_TYPE_CLOSE_MARKER: assert isinstance(tok_val, str) contents.append(MarkerToken(True, MARKERS[tok_val])) elif tok_type == TOKEN_TYPE_OPEN_LINK: contents.append(LinkToken(LinkTokenType.OPEN_LINK)) elif tok_type == TOKEN_TYPE_OPEN_DESCRIPTION: contents.append(LinkToken(LinkTokenType.OPEN_DESCRIPTION)) elif tok_type == TOKEN_TYPE_CLOSE_LINK: contents.append(LinkToken(LinkTokenType.CLOSE)) return Text(contents, current_line) def dump_contents(raw): if isinstance(raw, RawLine): return (raw.linenum, raw.line) elif isinstance(raw, ListItem): bullet = raw.bullet if raw.bullet else raw.counter + raw.counter_sep content_full = token_list_to_raw(raw.content) content_lines = content_full.split("\n") content = "\n".join(content_lines) checkbox = f"[{raw.checkbox_value}]" if raw.checkbox_value else "" tag = ( f"{raw.tag_indentation}{token_list_to_raw(raw.tag or '')} ::" if raw.tag or raw.tag_indentation else "" ) return ( raw.linenum, f"{raw.indentation}{bullet} {checkbox}{tag}{content}", ) elif isinstance(raw, TableRow): closed = "|" if raw.last_cell_closed else "" return ( raw.linenum, f"{' ' * raw.indentation}|{'|'.join(raw.cells)}{closed}{raw.suffix}", ) return (raw.linenum, raw.get_raw()) def parse_headline(hl, doc, parent) -> Headline: stars = hl["orig"].group("stars") depth = len(stars) spacing = hl["orig"].group("spacing") # TODO: Parse line for priority, cookies and tags line = hl["orig"].group("line") hl_tags = HEADLINE_TAGS_RE.search(line) if hl_tags is None: tags = [] else: tags = hl_tags.group(0)[1:-1].split(":") line = HEADLINE_TAGS_RE.sub("", line) hl_state = None title = line is_done = is_todo = False for state in doc.todo_keywords or []: if title.startswith(state["name"] + " "): hl_state = state title = title[len(state["name"] + " ") :] is_todo = True break else: for state in doc.done_keywords or []: if title.startswith(state["name"] + " "): hl_state = state title = title[len(state["name"] + " ") :] is_done = True break contents = parse_contents(hl["contents"]) if not (isinstance(parent, OrgDoc) or depth > parent.depth): raise AssertionError( "Incorrectly parsed parent on `{}' > `{}'".format(parent.title, title) ) headline = Headline( start_line=hl["linenum"], depth=depth, orig=hl["orig"], title=title, state=hl_state, contents=contents, children=None, keywords=hl["keywords"], properties=hl["properties"], structural=hl["structural"], delimiters=hl["delimiters"], list_items=hl["list_items"], table_rows=hl["table_rows"], title_start=None, priority=None, priority_start=None, tags_start=None, tags=tags, parent=parent, is_todo=is_todo, is_done=is_done, spacing=spacing, ) headline.children = [ parse_headline(child, doc, headline) for child in hl["children"] ] return headline def dump_kw(kw): options = kw.match.group("options") if not options: options = "" return ( kw.linenum, "{indentation}#+{key}{options}:{spacing}{value}".format( indentation=kw.match.group("indentation"), key=kw.key, options=kw.options, spacing=kw.match.group("spacing"), value=kw.value, ), ) def dump_property(prop: Property): plus = "" indentation = "" spacing = " " if prop.match is not None: plus = prop.match.group("plus") if plus is None: plus = "" indentation = prop.match.group("indentation") spacing = prop.match.group("spacing") if isinstance(prop.value, TimeRange): value = timerange_to_string(prop.value) elif isinstance(prop.value, OrgTime): value = prop.value.to_raw() else: value = prop.value return ( prop.linenum, "{indentation}:{key}{plus}:{spacing}{value}".format( indentation=indentation, key=prop.key, plus=plus, spacing=spacing, value=value, ), ) def dump_structural(structural: Tuple): return (structural[0], structural[1]) def dump_delimiters(line: DelimiterLine): return (line.linenum, line.line) def parse_todo_done_keywords(line: str) -> OrgDocDeclaredStates: clean_line = re.sub(r"\([^)]+\)", "", line) if "|" in clean_line: todo_kws, done_kws = clean_line.split("|", 1) has_split = True else: # Standard behavior in this case is: the last state is the one considered as DONE todo_kws = clean_line todo_keywords = re.sub(r"\s{2,}", " ", todo_kws.strip()).split() if has_split: done_keywords = re.sub(r"\s{2,}", " ", done_kws.strip()).split() else: done_keywods = [todo_keywords[-1]] todo_keywords = todo_keywords[:-1] return { "not_completed": [HeadlineState(name=keyword) for keyword in todo_keywords], "completed": [HeadlineState(name=keyword) for keyword in done_keywords], } class OrgDoc: def __init__( self, headlines, keywords, contents, list_items, structural, properties, environment=BASE_ENVIRONMENT, ): self.todo_keywords = [HeadlineState(name=kw) for kw in DEFAULT_TODO_KEYWORDS] self.done_keywords = [HeadlineState(name=kw) for kw in DEFAULT_DONE_KEYWORDS] self.environment = environment keywords_set_in_file = False for keyword in keywords: if keyword.key in ("TODO", "SEQ_TODO"): states = parse_todo_done_keywords(keyword.value) self.todo_keywords, self.done_keywords = ( states["not_completed"], states["completed"], ) keywords_set_in_file = True if not keywords_set_in_file and "org-todo-keywords" in environment: # Read keywords from environment states = parse_todo_done_keywords(environment["org-todo-keywords"]) self.todo_keywords, self.done_keywords = ( states["not_completed"], states["completed"], ) self.keywords: List[Property] = keywords self.contents: List[RawLine] = contents self.list_items: List[ListItem] = list_items self.structural: List = structural self.properties: List = properties self._path = None self.headlines: List[Headline] = list( map(lambda hl: parse_headline(hl, self, self), headlines) ) @property def id(self): """ Created by org-roam v2. """ for p in self.properties: if p.key == "ID": return p.value return None @property def path(self): return self._path @property def tags(self) -> list[str]: for kw in self.keywords: if kw.key == "FILETAGS": return kw.value.strip(':').split(':') return [] @property def shallow_tags(self) -> list[str]: return self.tags ## Querying def get_links(self): for headline in self.headlines: yield from headline.get_links() for content in self.contents: yield from get_links_from_content(content) def get_keywords(self, name: str, default=None): for prop in self.keywords: if prop.key == name: return prop.value return default def get_property(self, name: str, default=None): for prop in self.properties: if prop.key == name: return prop.value return default def getProperties(self): return self.keywords def getTopHeadlines(self): return self.headlines def getAllHeadlines(self) -> Iterator[Headline]: todo = self.headlines[::-1] # We go backwards, to pop/append and go depth-first while len(todo) != 0: hl = todo.pop() todo.extend(hl.children[::-1]) yield hl def get_code_snippets(self): for headline in self.headlines: yield from headline.get_code_snippets() # Writing def dump_headline(self, headline, recursive=True): tags = "" if len(headline.shallow_tags) > 0: tags = ":" + ":".join(headline.shallow_tags) + ":" state = "" if headline.state: state = headline.state["name"] + " " raw_title = token_list_to_raw(headline.title.contents) tags_padding = "" if not (raw_title.endswith(" ") or raw_title.endswith("\t")) and tags: tags_padding = " " yield "*" * headline.depth + headline.spacing + state + raw_title + tags_padding + tags planning = headline.get_planning_line() if planning is not None: yield planning lines = [] KW_T = 0 CONTENT_T = 1 PROPERTIES_T = 2 STRUCTURAL_T = 3 for keyword in headline.keywords: lines.append((KW_T, dump_kw(keyword))) for content in headline.contents: lines.append((CONTENT_T, dump_contents(content))) for li in headline.list_items: lines.append((CONTENT_T, dump_contents(li))) for row in headline.table_rows: lines.append((CONTENT_T, dump_contents(row))) for prop in headline.properties: lines.append((PROPERTIES_T, dump_property(prop))) for struct in headline.structural: lines.append((STRUCTURAL_T, dump_structural(struct))) for content in headline.delimiters: lines.append((STRUCTURAL_T, dump_delimiters(content))) lines = sorted(lines, key=lambda x: x[1][0]) structured_lines = [] last_type = None for i, line in enumerate(lines): ltype = line[0] content = line[1][1] content = content + "\n" last_type = ltype structured_lines.append(content) if last_type == PROPERTIES_T: # No structural closing indentation = 0 if len(lines) > 0: last_line = lines[i - 1][1][1] indentation = last_line.index(":") structured_lines.append(" " * indentation + ":END:\n") logging.warning( "Added structural:{}: {}".format( line[1][0], structured_lines[-1].strip() ) ) if len(structured_lines) > 0: content = "".join(structured_lines) # Remove the last line jump, which will be accounted for by the "yield operation" assert content.endswith("\n") content = content[:-1] yield content if recursive: for child in headline.children: yield from self.dump_headline(child, recursive=recursive) def dump(self): lines = [] for prop in self.properties: lines.append(dump_property(prop)) for struct in self.structural: lines.append(dump_structural(struct)) for kw in self.keywords: lines.append(dump_kw(kw)) for line in self.contents: lines.append(dump_contents(line)) for li in self.list_items: lines.append(dump_contents(li)) yield from map(lambda x: x[1], sorted(lines, key=lambda x: x[0])) for headline in self.headlines: yield from self.dump_headline(headline) class OrgDocReader: def __init__(self, environment=BASE_ENVIRONMENT): self.headlines: List[HeadlineDict] = [] self.keywords: List[Keyword] = [] self.headline_hierarchy: List[Optional[HeadlineDict]] = [] self.contents: List[RawLine] = [] self.delimiters: List[DelimiterLine] = [] self.list_items: List[ListItem] = [] self.table_rows: List[TableRow] = [] self.structural: List = [] self.properties: List = [] self.current_drawer: Optional[List] = None self.environment = environment def finalize(self) -> OrgDoc: return OrgDoc( self.headlines, self.keywords, self.contents, self.list_items, self.structural, self.properties, self.environment, ) ## Construction def add_headline(self, linenum: int, match: re.Match): # Position reader on the proper headline stars = match.group("stars") depth = len(stars) headline: HeadlineDict = { "linenum": linenum, "orig": match, "title": match.group("line"), "contents": [], "children": [], "keywords": [], "properties": [], "logbook": [], "structural": [], "delimiters": [], "results": [], # TODO: Move to each specific code block? "list_items": [], "table_rows": [], } while (depth - 1) > len(self.headline_hierarchy): # Introduce structural headlines self.headline_hierarchy.append(None) while depth <= len(self.headline_hierarchy): self.headline_hierarchy.pop() if depth == 1: self.headlines.append(headline) else: parent_idx = len(self.headline_hierarchy) - 1 while self.headline_hierarchy[parent_idx] is None: parent_idx -= 1 parent_headline = self.headline_hierarchy[parent_idx] assert parent_headline is not None parent_headline["children"].append(headline) self.headline_hierarchy.append(headline) if all([hl is not None for hl in self.headline_hierarchy]): if not ( [ len(cast(HeadlineDict, hl)["orig"].group("stars")) for hl in self.headline_hierarchy ] == list(range(1, len(self.headline_hierarchy) + 1)) ): raise AssertionError("Error on Headline Hierarchy") else: # This might happen if headlines with more that 1 level deeper are found pass # We can safely assert this as all the `None`s are there to # support the addition of a `HeadlineDict` at the correct # depth but not more assert self.headline_hierarchy[-1] is not None def add_list_item_line(self, linenum: int, match: re.Match) -> ListItem: li = ListItem( linenum=linenum, match=match, indentation=match.group("indentation"), bullet=match.group("bullet"), counter=match.group("counter"), counter_sep=match.group("counter_sep"), checkbox_indentation=match.group("checkbox_indentation"), checkbox_value=match.group("checkbox_value"), tag_indentation=match.group("tag_indentation"), tag=( parse_content_block( [RawLine(linenum=linenum, line=match.group("tag"))] ).contents if match.group("tag") else None ), content=parse_content_block( [RawLine(linenum=linenum, line=match.group("content"))] ).contents, ) if len(self.headline_hierarchy) == 0: self.list_items.append(li) else: assert self.headline_hierarchy[-1] is not None self.headline_hierarchy[-1]["list_items"].append(li) return li def add_table_line(self, linenum: int, line: str): chunks = line.split("|") indentation = len(chunks[0]) if chunks[-1].strip() == "": suffix = chunks[-1] cells = chunks[1:-1] last_cell_closed = True else: suffix = "" cells = chunks[1:] last_cell_closed = False row = TableRow( linenum, indentation, suffix, last_cell_closed, cells, ) if len(self.headline_hierarchy) == 0: self.table_rows.append(row) else: assert self.headline_hierarchy[-1] is not None self.headline_hierarchy[-1]["table_rows"].append(row) def add_keyword_line(self, linenum: int, match: re.Match): options = match.group("options") kw = Keyword( linenum, match, match.group("key"), match.group("value"), options if options is not None else "", ) if len(self.headline_hierarchy) == 0: self.keywords.append(kw) else: assert self.headline_hierarchy[-1] is not None self.headline_hierarchy[-1]["keywords"].append(kw) def add_raw_line(self, linenum: int, line: str): raw = RawLine(linenum, line) if len(self.headline_hierarchy) == 0: self.contents.append(raw) else: assert self.headline_hierarchy[-1] is not None self.headline_hierarchy[-1]["contents"].append(raw) def add_begin_block_line(self, linenum: int, match: re.Match): line = DelimiterLine( linenum, match.group(0), DelimiterLineType.BEGIN_BLOCK, BlockDelimiterTypeData(match.group("subtype")), match.group("arguments"), ) if len(self.headline_hierarchy) == 0: self.delimiters.append(line) else: assert self.headline_hierarchy[-1] is not None self.headline_hierarchy[-1]["delimiters"].append(line) def add_end_block_line(self, linenum: int, match: re.Match): line = DelimiterLine( linenum, match.group(0), DelimiterLineType.END_BLOCK, BlockDelimiterTypeData(match.group("subtype")), None, ) if len(self.headline_hierarchy) == 0: self.delimiters.append(line) else: assert self.headline_hierarchy[-1] is not None self.headline_hierarchy[-1]["delimiters"].append(line) def add_property_drawer_line(self, linenum: int, line: str, match: re.Match): if len(self.headline_hierarchy) == 0: self.current_drawer = self.properties self.structural.append((linenum, line)) else: assert self.headline_hierarchy[-1] is not None self.current_drawer = self.headline_hierarchy[-1]["properties"] self.headline_hierarchy[-1]["structural"].append((linenum, line)) def add_results_drawer_line(self, linenum: int, line: str, match: re.Match): assert self.headline_hierarchy[-1] is not None self.current_drawer = self.headline_hierarchy[-1]["results"] self.headline_hierarchy[-1]["structural"].append((linenum, line)) def add_logbook_drawer_line(self, linenum: int, line: str, match: re.Match): assert self.headline_hierarchy[-1] is not None self.current_drawer = self.headline_hierarchy[-1]["logbook"] self.headline_hierarchy[-1]["structural"].append((linenum, line)) def add_drawer_end_line(self, linenum: int, line: str, match: re.Match): self.current_drawer = None if len(self.headline_hierarchy) == 0: self.structural.append((linenum, line)) else: assert self.headline_hierarchy[-1] is not None self.headline_hierarchy[-1]["structural"].append((linenum, line)) def add_node_properties_line(self, linenum: int, match: re.Match): key = match.group("key") value = match.group("value").strip() if as_time := parse_time(value): value = as_time if self.current_drawer is None: # Throw a better error on this case raise Exception( "Found properties before :PROPERTIES: line. Error on Org file?" ) self.current_drawer.append(Property(linenum, match, key, value, None)) def read(self, s): lines = s.split("\n") line_count = len(lines) reader = enumerate(lines) in_drawer = False in_block = False list_item_indentation = None list_item = None def add_raw_line_with_possible_indentation(linenum, line): added = False nonlocal list_item nonlocal list_item_indentation if list_item: if (line[: list_item.text_start_pos].strip() == "") or ( len(line.strip()) == 0 ): list_item.append_line(line) added = True else: list_item = None list_item_indentation = None if not added: self.add_raw_line(linenum, line) for lnum, line in reader: linenum = lnum + 1 try: if in_block: if m := END_BLOCK_RE.match(line): self.add_end_block_line(linenum, m) in_block = False list_item_indentation = None list_item = None else: add_raw_line_with_possible_indentation(linenum, line) elif m := HEADLINE_RE.match(line): list_item_indentation = None list_item = None self.add_headline(linenum, m) elif m := LIST_ITEM_RE.match(line): list_item = self.add_list_item_line(linenum, m) list_item_indentation = m.group("indentation") elif m := RAW_LINE_RE.match(line): add_raw_line_with_possible_indentation(linenum, line) # Org-babel elif m := BEGIN_BLOCK_RE.match(line): self.add_begin_block_line(linenum, m) in_block = True list_item_indentation = None list_item = None elif m := END_BLOCK_RE.match(line): self.add_end_block_line(linenum, m) in_block = False list_item_indentation = None list_item = None # Generic properties elif m := KEYWORDS_RE.match(line): self.add_keyword_line(linenum, m) elif m := DRAWER_END_RE.match(line): self.add_drawer_end_line(linenum, line, m) in_drawer = False list_item_indentation = None list_item = None elif (not in_drawer) and (m := DRAWER_START_RE.match(line)): self.add_property_drawer_line(linenum, line, m) in_drawer = True list_item_indentation = None list_item = None elif (not in_drawer) and (m := RESULTS_DRAWER_RE.match(line)): self.add_results_drawer_line(linenum, line, m) in_drawer = True list_item_indentation = None list_item = None elif m := NODE_PROPERTIES_RE.match(line): self.add_node_properties_line(linenum, m) elif line.strip().startswith("|"): self.add_table_line(linenum, line) list_item_indentation = None list_item = None # Not captured else: add_raw_line_with_possible_indentation(linenum, line) except: logging.error("Error line {}: {}".format(linenum + 1, line)) raise def loads( s: str, environment: Optional[Dict] = BASE_ENVIRONMENT, extra_cautious: bool = True ) -> OrgDoc: """ Load an Org-mode document from a string. Args: s (str): The string representation of the Org-mode document. environment (Optional[dict]): The environment for parsing. Defaults to `BASE_ENVIRONMENT`. extra_cautious (bool): If True, perform an extra check to ensure that the document can be re-serialized to the original string. Defaults to True. Returns: OrgDoc: The loaded Org-mode document. Raises: NonReproducibleDocument: If `extra_cautious` is True and there is a difference between the original string and the re-serialized document. """ reader = OrgDocReader(environment) reader.read(s) doc = reader.finalize() if extra_cautious: # Check that all options can be properly re-serialized after_dump = dumps(doc) if after_dump != s: diff = list( difflib.Differ().compare( s.splitlines(keepends=True), after_dump.splitlines(keepends=True) ) ) context_start = None context_last_line = None for i, line in enumerate(diff): if not line.startswith(" "): if context_start is None: context_start = i context_last_line = i elif context_start: assert context_last_line is not None if i > (context_last_line + DEBUG_DIFF_CONTEXT): start = max(0, context_start - DEBUG_DIFF_CONTEXT) end = min(len(diff), context_last_line + DEBUG_DIFF_CONTEXT) print( "## Lines {} to {}".format(start + 1, end + 1), file=sys.stderr, ) sys.stderr.writelines(diff[start:end]) context_start = None context_last_line = None # print("---\n" + after_dump + "\n---") raise NonReproducibleDocument( "Difference found between existing version and dumped" ) return doc def load( f: TextIO, environment: Optional[dict] = BASE_ENVIRONMENT, extra_cautious: bool = False, ) -> OrgDoc: """ Load an Org-mode document from a file object. Args: f (TextIO): The file object containing the Org-mode document. environment (Optional[dict]): The environment for parsing. Defaults to `BASE_ENVIRONMENT`. extra_cautious (bool): If True, perform an extra check to ensure that the document can be re-serialized to the original string. Defaults to False. Returns: OrgDoc: The loaded Org-mode document. """ doc = loads(f.read(), environment, extra_cautious) doc._path = os.path.abspath(f.name) return doc def dumps(doc: OrgDoc) -> str: """ Serialize an OrgDoc object to a string. Args: doc (OrgDoc): The OrgDoc object to serialize. Returns: str: The serialized string representation of the OrgDoc object. """ dump = list(doc.dump()) result = "\n".join(dump) return result def dump(doc: OrgDoc, fp: TextIO) -> None: """ Serialize an OrgDoc object to a file. Args: doc (OrgDoc): The OrgDoc object to serialize. fp (TextIO): The file-like object to write the serialized data to. Returns: None """ it = doc.dump() # Write first line separately line = next(it) fp.write(line) # Write following ones preceded by line jump for line in it: fp.write("\n" + line)