From d23ee1adba2a68b299b4e37f7f755e9826f9fa31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Mart=C3=ADnez=20Portela?= Date: Mon, 22 Jun 2020 00:40:47 +0200 Subject: [PATCH] Add base dumping/serialization mechanism. --- org_dom/__init__.py | 2 +- org_dom/org_dom.py | 201 ++++++++++++++++++++++++++++------ tests/test_dom.py | 11 +- tests/utils/dom_assertions.py | 4 +- 4 files changed, 177 insertions(+), 41 deletions(-) diff --git a/org_dom/__init__.py b/org_dom/__init__.py index cff8cf0..522a603 100644 --- a/org_dom/__init__.py +++ b/org_dom/__init__.py @@ -1 +1 @@ -from .org_dom import OrgDom, load, loads +from .org_dom import * diff --git a/org_dom/org_dom.py b/org_dom/org_dom.py index c3e3f53..a4af677 100644 --- a/org_dom/org_dom.py +++ b/org_dom/org_dom.py @@ -1,6 +1,7 @@ +import logging import re import collections -from typing import List +from typing import List, Tuple BASE_ENVIRONMENT = { 'org-footnote-section': 'Footnotes', @@ -62,10 +63,17 @@ Headline = collections.namedtuple('Headline', ('start_line', 'depth', 'children', )) -Property = collections.namedtuple('Property', ('name', 'value', 'options')) -TimeRange = collections.namedtuple('TimeRange', ('start_time', 'end_time')) -Timestamp = collections.namedtuple('Timestamp', ('year', 'month', 'day', 'dow', 'hour', 'minute')) +RawLine = collections.namedtuple('RawLine', ('linenum', 'line')) +Keyword = collections.namedtuple('Keyword', ('linenum', 'match', 'key', 'value', 'options')) +Property = collections.namedtuple('Property', ('linenum', 'match', 'key', 'value', 'options')) +# @TODO How are [YYYY-MM-DD HH:mm--HH:mm] and ([... HH:mm]--[... HH:mm]) differentiated ? +# @TODO Consider recurrence annotations +TimeRange = collections.namedtuple('TimeRange', ('start_time', 'end_time')) +Timestamp = collections.namedtuple('Timestamp', ('active', 'year', 'month', 'day', 'dow', 'hour', 'minute')) + +BEGIN_PROPERTIES = 'OPEN_PROPERTIES' +END_PROPERTIES = 'CLOSE_PROPERTIES' def parse_org_time(value): if m := ACTIVE_TIME_STAMP_RE.match(value): @@ -76,42 +84,152 @@ def parse_org_time(value): return None if m.group('end_hour'): - return TimeRange(Timestamp(int(m.group('year')), int(m.group('month')), int(m.group('day')), m.group('dow'), int(m.group('start_hour')), int(m.group('start_minute'))), - Timestamp(int(m.group('year')), int(m.group('month')), int(m.group('day')), m.group('dow'), int(m.group('end_hour')), int(m.group('end_minute')))) - return Timestamp(int(m.group('year')), int(m.group('month')), int(m.group('day')), m.group('dow'), int(m.group('start_hour')), int(m.group('start_minute'))) + return TimeRange(Timestamp(active, int(m.group('year')), int(m.group('month')), int(m.group('day')), m.group('dow'), int(m.group('start_hour')), int(m.group('start_minute'))), + Timestamp(active, int(m.group('year')), int(m.group('month')), int(m.group('day')), m.group('dow'), int(m.group('end_hour')), int(m.group('end_minute')))) + return Timestamp(active, int(m.group('year')), int(m.group('month')), int(m.group('day')), m.group('dow'), int(m.group('start_hour')), int(m.group('start_minute'))) +def timestamp_to_string(ts): + date = '{year}-{month:02d}-{day:02d}'.format( + year=ts.year, + month=ts.month, + day=ts.day + ) + if ts.dow: + date = date + ' ' + ts.dow + + if ts.hour is not None: + base = '{date} {hour:02}:{minute:02d}'.format(date=date, hour=ts.hour, minute=ts.minute) + else: + base = date + + if ts.active: + return '<{}>'.format(base) + else: + return '[{}]'.format(base) class OrgDom: - def __init__(self, headlines, keywords): + def __init__(self, headlines, keywords, contents): self.headlines: List[Headline] = headlines self.keywords: List[Property] = keywords + self.contents: List[RawLine] = contents def serialize(self): raise NotImplementedError() - ## Querying def getProperties(self): - return [ - Property(name=kw.group('key'), - value=kw.group('value'), - options=kw.group('options'), - ) - for kw in self.keywords - ] + return self.keywords def getTopHeadlines(self): return self.headlines + # Writing + def dump_kw(self, kw): + options = kw.match.group('options') + if not options: + options = '' + + return (kw.linenum, + '{indentation}#+{key}{options}:{spacing}{value}'.format( + indentation=kw.match.group('indentation'), + key=kw.key, + options=kw.options, + spacing=kw.match.group('spacing'), + value=kw.value, + )) + + def dump_property(self, prop: Property): + plus = prop.match.group('plus') + if plus is None: plus = '' + + if isinstance(prop.value, Timestamp): + value = timestamp_to_string(prop.value) + else: + value = prop.value + + return (prop.linenum, '{indentation}:{key}{plus}:{spacing}{value}'.format( + indentation=prop.match.group('indentation'), + key=prop.key, + plus=plus, + spacing=prop.match.group('spacing'), + value=value, + )) + + def dump_contents(self, raw: RawLine): + return (raw.linenum, raw.line) + + def dump_structural(self, structural: Tuple): + return (structural[0], structural[1]) + + def dump_headline(self, headline): + yield headline['orig'].group('stars') + ' ' + headline['orig'].group('spacing') + headline['orig'].group('line') + + lines = [] + KW_T = 0 + CONTENT_T = 1 + PROPERTIES_T = 2 + STRUCTURAL_T = 3 + for keyword in headline['keywords']: + lines.append((KW_T, self.dump_kw(keyword))) + + for content in headline['contents']: + lines.append((CONTENT_T, self.dump_contents(content))) + + for prop in headline['properties']: + lines.append((PROPERTIES_T, self.dump_property(prop))) + + for struct in headline['structural']: + lines.append((STRUCTURAL_T, self.dump_structural(struct))) + + lines = sorted(lines, key=lambda x: x[1][0]) + + structured_lines = [] + last_type = None + for i, line in enumerate(lines): + ltype = line[0] + content = line[1][1] + + if ltype == PROPERTIES_T and last_type not in (STRUCTURAL_T, PROPERTIES_T): + # No structural opening + structured_lines.append(' ' * content.index(':') + ':PROPERTIES:') + logging.warning("Added structural: ".format(line[1][0], structured_lines[-1].strip())) + elif ltype not in (STRUCTURAL_T, PROPERTIES_T) and last_type == PROPERTIES_T: + # No structural closing + last_line = lines[i - 1][1][1] + structured_lines.append(' ' * last_line.index(':') + ':END:') + logging.warning("Added structural:{}: {}".format(line[1][0], structured_lines[-1].strip())) + + last_type = ltype + structured_lines.append(content) + + yield from structured_lines + + for child in headline['children']: + yield from self.dump_headline(child) + + def dump(self): + lines = [] + for kw in self.keywords: + lines.append(self.dump_kw(kw)) + + for line in self.contents: + lines.append(self.dump_contents(line)) + + yield from map(lambda x: x[1], sorted(lines, key=lambda x: x[0])) + + for headline in self.headlines: + yield from self.dump_headline(headline) + class OrgDomReader: def __init__(self): self.headlines: List[Headline] = [] self.keywords: List[Property] = [] self.headline_hierarchy: List[OrgDom] = [] + self.contents: List[RawLine] = [] def finalize(self): - return OrgDom(self.headlines, self.keywords) + return OrgDom(self.headlines, self.keywords, self.contents) ## Construction def add_headline(self, linenum: int, match: re.Match) -> int: @@ -127,6 +245,7 @@ class OrgDomReader: 'children': [], 'keywords': [], 'properties': [], + 'structural': [], } while (depth - 1) > len(self.headline_hierarchy): @@ -143,20 +262,27 @@ class OrgDomReader: def add_keyword_line(self, linenum: int, match: re.Match) -> int: + options = match.group('options') + kw = Keyword(linenum, match, match.group('key'), match.group('value'), options if options is not None else '') if len(self.headline_hierarchy) == 0: - self.keywords.append(match) + self.keywords.append(kw) else: - self.headline_hierarchy[-1]['keywords'].append('match') + self.headline_hierarchy[-1]['keywords'].append(kw) def add_raw_line(self, linenum: int, line: str) -> int: - print('>>', line) - pass + raw = RawLine(linenum, line) + if len(self.headline_hierarchy) == 0: + self.contents.append(raw) + else: + self.headline_hierarchy[-1]['contents'].append(raw) - def add_property_drawer_line(self, linenum: int, match: re.Match) -> int: + def add_property_drawer_line(self, linenum: int, line: str, match: re.Match) -> int: self.current_drawer = self.headline_hierarchy[-1]['properties'] + self.headline_hierarchy[-1]['structural'].append((linenum, line)) - def add_drawer_end_line(self, linenum: int, match: re.Match) -> int: + def add_drawer_end_line(self, linenum: int, line: str, match: re.Match) -> int: self.current_drawer = None + self.headline_hierarchy[-1]['structural'].append((linenum, line)) def add_node_properties_line(self, linenum: int, match: re.Match) -> int: key = match.group('key') @@ -172,7 +298,7 @@ class OrgDomReader: elif as_time := parse_org_time(value): value = as_time - self.current_drawer.append(Property(key, value, None)) + self.current_drawer.append(Property(linenum, match, key, value, None)) def read(self, s, environment): lines = s.split('\n') @@ -180,32 +306,35 @@ class OrgDomReader: for linenum, line in reader: if m := RAW_LINE_RE.match(line): - # TODO: Parse line self.add_raw_line(linenum, line) elif m := HEADLINE_RE.match(line): - # TODO: Parse headline self.add_headline(linenum, m) elif m := KEYWORDS_RE.match(line): - # TODO: Parse line self.add_keyword_line(linenum, m) elif m := PROPERTY_DRAWER_RE.match(line): - # TODO: Parse line - self.add_property_drawer_line(linenum, m) + self.add_property_drawer_line(linenum, line, m) elif m := DRAWER_END_RE.match(line): - # TODO: Parse line - self.add_drawer_end_line(linenum, m) + self.add_drawer_end_line(linenum, line, m) elif m := NODE_PROPERTIES_RE.match(line): - # TODO: Parse line self.add_node_properties_line(linenum, m) else: raise NotImplementedError('{}: ‘{}’'.format(linenum, line)) -def loads(s, environment=BASE_ENVIRONMENT): +def loads(s, environment=BASE_ENVIRONMENT, extra_cautious=False): doc = OrgDomReader() doc.read(s, environment) - return doc.finalize() + dom = doc.finalize() + if extra_cautious: # Check that all options can be properly re-serialized + if dumps(dom) != s: + raise NotImplementedError("Error re-serializing, file uses something not implemented") + return dom -def load(f, environment=BASE_ENVIRONMENT): - return loads(f.read(), environment) +def load(f, environment=BASE_ENVIRONMENT, extra_cautious=False): + return loads(f.read(), environment, extra_cautious) + + +def dumps(doc): + result = '\n'.join(doc.dump()) + return result diff --git a/tests/test_dom.py b/tests/test_dom.py index ddf4249..bef59ec 100644 --- a/tests/test_dom.py +++ b/tests/test_dom.py @@ -1,10 +1,9 @@ import logging import os -import sys import unittest from datetime import datetime as DT -from org_dom import load, loads +from org_dom import dumps, load, loads from utils.dom_assertions import HL, Dom DIR = os.path.dirname(os.path.abspath(__file__)) @@ -37,3 +36,11 @@ class TestSerde(unittest.TestCase): ]))) ex.assert_matches(self, doc) + + def test_mimic_write_file_01(self): + """A goal of this library is to be able to update a file without changing parts not directly modified.""" + with open(os.path.join(DIR, '01-simple.org')) as f: + orig = f.read() + doc = loads(orig) + + self.assertEqual(dumps(doc), orig) diff --git a/tests/utils/dom_assertions.py b/tests/utils/dom_assertions.py index 61a33c1..3562687 100644 --- a/tests/utils/dom_assertions.py +++ b/tests/utils/dom_assertions.py @@ -23,7 +23,7 @@ class Dom: test_case.assertEqual(len(doc_props), len(self.props)) for i, prop in enumerate(self.props): - test_case.assertEqual(doc_props[i].name, prop[0]) + test_case.assertEqual(doc_props[i].key, prop[0]) test_case.assertEqual(doc_props[i].value, prop[1]) # @TODO: Check properties @@ -58,7 +58,7 @@ class HL: test_case.assertEqual(len(doc_props), len(self.props)) for i, prop in enumerate(self.props): - test_case.assertEqual(doc_props[i].name, prop[0]) + test_case.assertEqual(doc_props[i].key, prop[0]) if isinstance(prop[1], datetime): test_case.assertEqual( timestamp_to_datetime(doc_props[i].value), prop[1])