#!/usr/bin/env python3
"""Static-site generator: renders a tree of org-mode notes into linked HTML pages.

Pipeline (see ``regen_all``):
  1. Walk ``src_top`` and parse every ``*.org`` file with ``org_rw``.
  2. Assign a UUID to any headline lacking an ID (and write the file back).
  3. Build a link graph between headlines / documents.
  4. Index every headline into an sqlite FTS5 table for full-text search.
  5. Render one ``<id>.node.html`` page per headline, plus ``index.html``,
     ``graph.json`` and a ``graph.html`` explorer page.
Optionally keeps watching the source tree via inotify and rebuilds on change.

NOTE(review): this copy of the file appears to have passed through an
HTML-stripping step — many string literals that clearly once contained HTML
markup (``<ul>``, ``<td>``, ``<a href=...>``, page templates, ...) are now
empty or whitespace-only.  Those spots are flagged inline below; the original
markup should be recovered from version control before trusting the output.
"""
import sqlite3
import time
import json
import html
import logging
import os
import sys
import uuid
from datetime import datetime  # NOTE(review): unused in the visible code
import traceback
import re

import inotify.adapters

import org_rw
from org_rw import OrgTime, dom, Link  # NOTE(review): OrgTime unused here
from org_rw import dump as dump_org
from org_rw import load as load_org
from org_rw import token_list_to_raw  # NOTE(review): unused in the visible code

# Set custom states
# Extend org_rw's keyword sets so parsing recognises this site's custom
# TODO/DONE states instead of treating them as headline text.
for state in ("NEXT", "MEETING", "Q", "PAUSED", "SOMETIME", "TRACK", "WAITING"):
    org_rw.DEFAULT_TODO_KEYWORDS.append(state)
for state in ("DISCARDED", "VALIDATING"):
    org_rw.DEFAULT_DONE_KEYWORDS.append(state)

# File extensions considered org documents.
EXTENSIONS = [
    ".org",
    ".org.txt",
]

# Watch-and-rebuild mode is on unless WATCH_AND_REBUILD=0 in the environment.
WATCH = True
if os.getenv('WATCH_AND_REBUILD', '1') == '0':
    WATCH = False

# Headlines at this depth or deeper would start collapsed (currently unused —
# see the commented-out block in ``render``).
MIN_HIDDEN_HEADLINE_LEVEL = 2

# Headline ID whose page is duplicated as the site's index.html.
INDEX_ID = "ea48ec1d-f9d4-4fb7-b39a-faa7b6e2ba95"

SITE_NAME = "Código para llevar"

# inotify event types that trigger a rebuild.
MONITORED_EVENT_TYPES = (
    'IN_CREATE',
    # 'IN_MODIFY',
    'IN_CLOSE_WRITE',
    'IN_DELETE',
    'IN_MOVED_FROM',
    'IN_MOVED_TO',
    'IN_DELETE_SELF',
    'IN_MOVE_SELF',
)

# Splits free text into "word-ish" runs (no whitespace/brackets) or single
# characters, so bare URLs can be detected and auto-linked.
TEXT_OR_LINK_RE = re.compile(r'([^\s\[\]]+|.)')

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
STATIC_PATH = os.path.join(ROOT_DIR, 'static')


def is_git_path(path):
    """Return True if ``path`` contains a ``.git`` component (skip VCS files)."""
    return any([chunk == ".git" for chunk in path.split(os.sep)])


def create_db(path):
    """Create a fresh sqlite database at ``path`` with an FTS5 search table.

    Any pre-existing database file is deleted first, so each build starts
    from an empty index.  Returns the open connection.
    """
    if os.path.exists(path):
        os.unlink(path)

    db = sqlite3.connect(path)
    db.execute('CREATE VIRTUAL TABLE note_search USING fts5(note_id, title, body, top_level_title, is_done, is_todo);')
    return db


def load_all(top_dir_relative):
    """Parse every org file under ``top_dir_relative`` and return the documents.

    Exits the process (status 1) on the first file that fails to parse,
    after printing the traceback and the offending path.
    NOTE(review): the file handle passed to ``load_org`` is never closed,
    and ``err`` is unused; ``traceback`` is also already imported at module
    level, so the local import is redundant.
    """
    top = os.path.abspath(top_dir_relative)

    docs = []
    for root, dirs, files in os.walk(top):
        for name in files:
            # Substring match, not suffix match — also accepts ".org.txt".
            if ".org" not in name:
                continue

            path = os.path.join(root, name)
            try:
                doc = load_org(open(path), extra_cautious=True)
                docs.append(doc)
            except Exception as err:
                import traceback
                traceback.print_exc()
                print(f"== On {path}")
                sys.exit(1)

    logging.info("Collected {} files".format(len(docs)))
    return docs


def regen_all(src_top, dest_top, *, docs=None, db=None):
    """Rebuild the whole site from ``src_top`` into ``dest_top``.

    Clears and repopulates the FTS index in ``db``, assigns missing headline
    IDs (saving changed files back to disk), builds the inter-note link
    graph, and writes all HTML/JSON outputs.

    NOTE(review): the ``docs`` parameter is accepted but immediately
    overwritten by ``load_all(src_top)``, and the function returns ``None``
    even though ``main`` assigns its result to ``docs`` — confirm intent.
    """
    files_generated = 0
    cur = db.cursor()
    cur.execute('DELETE FROM note_search;')
    docs = load_all(src_top)

    # Maps 'id:<doc-id>' -> 'id:<main-headline-id>' so links that target a
    # whole document get rewritten to its main headline's page.
    doc_to_headline_remapping = {}

    os.makedirs(dest_top, exist_ok=True)

    ## Build headline list
    # This includes a virtual headline for ID-referenced documents.
    all_headlines = []
    main_headlines_by_path = {}
    main_headline_to_docid = {}
    for doc in docs:
        relpath = os.path.relpath(doc.path, src_top)
        changed = False
        headlines = list(doc.getAllHeadlines())
        related = None

        # Only files under public/ are published.
        if not relpath.startswith("public/"):
            # print("Skip:", relpath)
            continue

        # Walk backwards so pop(i) doesn't disturb unvisited indices; pull
        # out the depth-1 "Related" section, it is rendered with the main
        # headline instead of as its own page.
        i = len(headlines)
        while i > 0:
            i -= 1
            headline = headlines[i]
            if headline.title.get_text().strip().lower() == "related" and headline.depth == 1:
                if related is not None:
                    print(
                        "Found duplicated related: {} vs {}".format(
                            related.id, headline.id
                        )
                    )
                    # NOTE(review): this assert is only reached when the
                    # condition is False, so it always fails — effectively a
                    # hard error for duplicate "Related" sections (and
                    # stripped entirely under ``python -O``).
                    assert related is None
                related = headline
                headlines.pop(i)

        # Give every published headline a stable UUID.
        for headline in headlines:
            if headline.id is None:
                headline.id = str(uuid.uuid4())
                changed = True

        if changed:
            print("Updated", relpath)
            save_changes(doc)

        all_headlines.extend(headlines)

        # A document has a "main" headline only when it has exactly one
        # top-level headline (besides the optional Related section).
        main_headline = None
        topHeadlines = doc.getTopHeadlines()
        if ((len(topHeadlines) == 1 and related is None)
                or (len(topHeadlines) == 2 and related is not None)):
            main_headline = [h for h in topHeadlines if h != related][0]
            main_headlines_by_path[doc.path] = main_headline

            if doc.id is not None:
                doc_to_headline_remapping['id:' + doc.id] = 'id:' + main_headline.id
                main_headline_to_docid[main_headline.id] = doc.id

            files_generated += 1
        elif doc.id is not None:
            logging.error("Cannot render document from id: {}. {} headlines {} related".format(
                relpath,
                len(topHeadlines),
                'with' if related is not None else 'without'
            ))

    # Build graph
    graph = {}
    for headline in all_headlines:
        links = []
        headline_links = list(headline.get_links())

        # NOTE(review): ``main_headline``, ``related`` and (below) ``doc``
        # are leftovers from the *last* iteration of the previous per-doc
        # loop, not per-headline values — this looks unintended; confirm.
        if headline == main_headline and related is not None:
            # Merge the Related section's links into the main headline.
            headline_links.extend(list(related.get_links()))

        for l in headline_links:
            if l.value.startswith('http://') or l.value.startswith('https://'):
                pass  # Ignore for now, external URL
            elif l.value.startswith('id:'):
                links.append({'target': l.value})
            elif l.value.startswith('attachment:'):
                pass  # Ignore, attachment
            elif l.value.startswith('file:'):
                pass  # Ignore, attachment
            elif l.value.startswith('notmuch:'):
                pass  # Ignore, mail
            elif l.value.startswith('orgit-rev:'):
                pass  # Ignore, mail
            elif l.value.startswith('*'):
                pass  # Ignore, internal
            elif not ':' in l.value.split()[0]:
                pass  # Ignore, internal
            elif l.value.startswith('./'):
                pass  # TODO: Properly handle
            else:
                logging.warning('On document {}, unknown link to {}'.format(doc.path, l.value))

        # Child headlines point "in" to their parent.
        if headline.parent:
            if isinstance(headline.parent, org_rw.Headline):
                links.append({
                    "target": headline.parent.id,
                    "relation": "in"
                })

        graph[headline.id] = {
            "title": org_rw.org_rw.token_list_to_plaintext(headline.title.contents).strip(),
            "links": links,
            "depth": headline.depth,
        }
        # Document IDs alias their main headline's graph node.
        if headline.id in main_headline_to_docid:
            graph[main_headline_to_docid[headline.id]] = graph[headline.id]

        # Find the top-level ancestor for the FTS "top_level_title" column.
        topLevelHeadline = headline
        while isinstance(topLevelHeadline.parent, org_rw.Headline):
            topLevelHeadline = topLevelHeadline.parent

        # Save for full-text-search
        cur.execute('''INSERT INTO note_search(note_id, title, body, top_level_title, is_done, is_todo)
                       VALUES (?, ?, ?, ?, ?, ?);''',
                    (
                        headline.id,
                        headline.title.get_text(),
                        ''.join(headline.get_contents('raw')),
                        topLevelHeadline.title.get_text(),
                        headline.is_done,
                        headline.is_todo,
                    ))

    # Update graph, replace document ids with headline ids
    for headline_data in graph.values():
        for link in headline_data['links']:
            if link['target'] in doc_to_headline_remapping:
                link['target'] = doc_to_headline_remapping[link['target']]

    # Render docs after we've built the graph
    # Render main headlines
    full_graph_info = {
        "nodes": graph,
        "main_headlines": main_headlines_by_path
    }
    for _docpath, main_headline in main_headlines_by_path.items():
        if main_headline.doc.id:
            # The document ID gets its own page showing the main headline.
            endpath = os.path.join(dest_top, main_headline.doc.id + ".node.html")
            with open(endpath, "wt") as f:
                f.write(render_as_document(main_headline, main_headline.doc,
                                           headlineLevel=0,
                                           graph=full_graph_info,
                                           title=org_rw.token_list_to_plaintext(main_headline.title.contents)))

    # Render all headlines
    for headline in all_headlines:
        endpath = os.path.join(dest_top, headline.id + ".node.html")

        # Render HTML
        with open(endpath, "wt") as f:
            f.write(render_as_document(headline, headline.doc,
                                       headlineLevel=0,
                                       graph=full_graph_info,
                                       title=org_rw.token_list_to_plaintext(headline.title.contents)))
        files_generated += 1

        # The designated index headline is also written as index.html.
        if headline.id == INDEX_ID:
            index_endpath = os.path.join(dest_top, "index.html")
            with open(index_endpath, "wt") as f:
                f.write(render_as_document(headline, headline.doc,
                                           headlineLevel=0,
                                           graph=full_graph_info,
                                           title=org_rw.token_list_to_plaintext(headline.title.contents)))
            files_generated += 1

    # Output graph files
    graphpath = os.path.join(dest_top, "graph.json")
    graph_explorer_path = os.path.join(dest_top, "graph.html")
    with open(graphpath, "wt") as f:
        json.dump(obj=graph, fp=f, indent=2)

    # NOTE(review): ``graph_explorer_path`` is assigned twice (identical
    # value) — the first assignment above is dead.
    graph_explorer_path = os.path.join(dest_top, "graph.html")
    with open(graph_explorer_path, 'wt') as f:
        with open(os.path.join(os.path.dirname(os.path.abspath(dest_top)), '..', 'static', 'graph_explorer.html'), 'rt') as template:
            source = template.read()
            # NOTE(review): the replace() needle is an empty string here,
            # which would splice the JSON between every character.  The
            # original placeholder token (presumably something like a
            # "GRAPH_GOES_HERE" marker in the template) appears to have
            # been stripped from this copy — recover it from VCS.
            f.write(source.replace('', json.dumps(graph)))

    logging.info("Generated {} files".format(files_generated))
    cur.close()
    db.commit()


def main(src_top, dest_top):
    """Entry point: build the site once, then (unless WATCH is off) keep
    watching ``src_top`` and the static dir, rebuilding on every change.

    Returns 0 on a one-shot build; otherwise loops forever.
    """
    notifier = inotify.adapters.InotifyTrees([src_top, STATIC_PATH])

    ## Initial load
    t0 = time.time()
    os.makedirs(dest_top, exist_ok=True)
    db = create_db(os.path.join(dest_top, 'db.sqlite3'))
    # NOTE(review): ``regen_all`` has no return statement, so ``docs`` is
    # always None here and in the loop below — confirm whether regen_all
    # was meant to return the loaded documents.
    docs = regen_all(src_top, dest_top, db=db)

    if not WATCH:
        logging.info("Build completed in {:.2f}s".format(time.time() - t0))
        return 0

    logging.info("Initial load completed in {:.2f}s".format(time.time() - t0))

    ## Updating
    for event in notifier.event_gen(yield_nones=False):
        (ev, types, directory, file) = event
        # ``type`` here shadows the builtin; harmless but worth renaming.
        if not any([type in MONITORED_EVENT_TYPES for type in types]):
            continue
        if is_git_path(directory):
            continue

        filepath = os.path.join(directory, file)
        print("CHANGED: {}".format(filepath))
        t0 = time.time()
        try:
            docs = regen_all(src_top, dest_top, docs=docs, db=db)
        except:
            # Deliberate best-effort: log the failure and keep watching.
            # NOTE(review): bare except also swallows KeyboardInterrupt.
            logging.error(traceback.format_exc())
            logging.error("Loading new templates failed 😿")
            continue
        logging.info("Updated all in {:.2f}s".format(time.time() - t0))


def get_headline_with_name(target_name, doc):
    """Return the first headline in ``doc`` whose title equals ``target_name``
    (whitespace-trimmed), or None if there is no match."""
    target_name = target_name.strip()
    for headline in doc.getAllHeadlines():
        if headline.title.get_text().strip() == target_name:
            return headline
    return None


def assert_id_exists(id, src_headline, graph):
    """Raise AssertionError if ``id`` is not a node in ``graph`` — used to
    fail the build on links that point at non-existent headlines."""
    if id not in graph["nodes"]:
        raise AssertionError("Cannot follow link to '{}' on headline '{}' ({})"
                             .format(id, src_headline.id, src_headline.title.get_text()))


def print_tree(tree, indentation=0, headline=None):
    """Debug helper: dump a DOM tree. Currently disabled via the early
    ``return`` — everything below it is dead code kept for debugging."""
    # if headline and headline.id != INDEX_ID:
    #     return
    return
    for element in tree:
        if "children" in dir(element):
            if len(element.children) > 0:
                print_element(element.children, indentation + 1, headline)
                print()
        elif "content" in dir(element):
            for content in element.content:
                print_element(content, indentation + 1, headline)


def print_element(element, indentation, headline):
    """Debug helper: print a single DOM element (companion of print_tree)."""
    if isinstance(element, org_rw.Link):
        print(" " * indentation, "Link:", element.get_raw())
    elif isinstance(element, str):
        print(" " * indentation, "{" + element + "}", type(element))
    else:
        print_tree(element, indentation, headline)


# --- DOM-node renderers -----------------------------------------------------
# Each renderer appends HTML fragments for one org_rw DOM node type onto
# ``acc`` (a list of strings joined later).  Dispatch happens in render_tag.
# NOTE(review): throughout this section, many appended string literals are
# empty or whitespace-only — the HTML tag text appears to have been stripped
# from this copy of the file (see module docstring).

def render_property_drawer(element, acc, headline, graph):
    # Property drawers are intentionally not rendered.
    pass


def render_logbook_drawer(element, acc, headline, graph):
    # Logbook drawers are intentionally not rendered.
    pass


def render_property_node(element, acc, headline, graph):
    # Individual property nodes are intentionally not rendered.
    pass


def render_list_group(element, acc, headline, graph):
    # NOTE(review): appends an empty literal and never renders the group's
    # children — presumably the list markup (and possibly a render_tree
    # call) was lost to the HTML-stripping; confirm against VCS.
    acc.append("")


def render_table(element, acc, graph, headline):
    # NOTE(review): parameter order here is (element, acc, graph, headline),
    # but render_tag calls every renderer as (element, acc, headline, graph)
    # — so within this body ``headline`` actually holds the graph and vice
    # versa, and they are forwarded swapped to render_tree. Confirm intent.
    acc.append("")
    render_tree(element.children, acc, headline, graph)
    acc.append("")


def render_table_row(element, acc, headline, graph):
    # Row/cell wrappers stripped (see section note); cell text is escaped.
    acc.append("")
    for cell in element.cells:
        acc.append("")
        acc.append(html.escape(cell))
        acc.append("")
    acc.append("")


def render_table_separator_row(element, acc, headline, graph):
    # Separator rows produce no visible output (literal stripped/empty).
    acc.append("")


def render_list_item(element, acc, headline, graph):
    # Renders a list item; a "tag" (description-list term) is rendered
    # before the content when present.  Literals below look like converted
    # bullet text rather than the original HTML (see section note).
    acc.append("\n  • ")
    if element.tag is not None:
        acc.append("")
        render_text_tokens(element.tag, acc, headline, graph)
        acc.append("")
    acc.append("")
    render_text_tokens(element.content, acc, headline, graph)
    acc.append("\n  • ")


def render_code_block(element, acc, headline, graph):
    """Render a source block: escape it and strip the indentation common to
    all non-empty lines, preserving relative indentation."""
    # NOTE(review): the opening literal is formatted with the block's
    # language (element.subtype) but contains no markup — the original
    # (presumably a <pre>/<code> tag with a class) was stripped.
    acc.append('\n    '.format(element.subtype.lower()))
    content = html.escape(element.lines)

    # Remove indentation common to all lines
    # NOTE(review): min() raises ValueError if every line is blank — an
    # all-whitespace code block would crash the build.
    base_indentation = min([
        len(l) - len(l.lstrip(' '))
        for l in content.split('\n')
        if len(l.strip()) > 0
    ])
    content_lines = [
        l[base_indentation:]
        for l in content.split('\n')
    ]

    acc.append('\n'.join(content_lines))
    acc.append('\n    ')


def render_results_block(element, acc, headline, graph):
    # Results drawers are not rendered yet.
    # TODO:
    # acc.append('')
    # render_tree(element.children, acc)
    # acc.append('')
    pass


def render_org_text(element, acc, headline, graph):
    """Render an org_rw.Text chunk by converting it to DOM tokens first."""
    as_dom = org_rw.text_to_dom(element.contents, element)
    render_text_tokens(as_dom, acc, headline, graph)


def render_text(element, acc, headline, graph):
    """Render a plain text DOM node (paragraph wrapper stripped — see note)."""
    acc.append('\n    ')
    render_text_tokens(element.content, acc, headline, graph)
    acc.append('\n    ')


def render_text_tokens(tokens, acc, headline, graph):
    """Render a sequence of text tokens (strings and Links) onto ``acc``.

    Plain strings are split into paragraphs on blank lines; bare URLs inside
    them are auto-linked; everything else is HTML-escaped.  Link tokens are
    resolved against the site graph (id:, relative-file, ``* Headline`` and
    external targets) and rendered as anchors.
    """
    acc.append('\n\n    ')
    for chunk in tokens:
        if isinstance(chunk, str):
            # '\n\n' separates paragraphs in org text.
            lines = chunk.split('\n\n')
            contents = []
            for line in lines:
                line_chunks = []
                for word in TEXT_OR_LINK_RE.findall(line):
                    # Heuristic: a token containing ':/' is probably a URL.
                    if ':/' in word and not (word.startswith('org-protocol://')):
                        if not (word.startswith('http://')
                                or word.startswith('https://')
                                or word.startswith('ftp://')
                                or word.startswith('ftps://')
                                ):
                            # NOTE(review): ``chunks`` is not defined in this
                            # scope (NameError when this path is hit) —
                            # probably meant ``line_chunks``.
                            raise Exception('Is this a link? {} (on {})\nLine: {}\nChunks: {}'.format(word, headline.doc.path, line, chunks))
                        # NOTE(review): anchor markup stripped — the format
                        # string has named args (url, description) but no
                        # placeholders left besides {description}.
                        line_chunks.append('{description}'
                                           .format(url=word,
                                                   description=html.escape(word)))
                    else:
                        line_chunks.append(html.escape(word))
                contents.append(''.join(line_chunks))
            acc.append('{}'.format('\n\n    '.join(contents)))

        elif isinstance(chunk, Link):
            link_target = chunk.value
            is_internal_link = True
            if link_target.startswith('id:'):
                # Direct headline reference; must exist in the graph.
                assert_id_exists(link_target[3:], headline, graph)
                link_target = './' + link_target[3:] + '.node.html'
            elif link_target.startswith('./') or link_target.startswith('../'):
                # Relative file link — resolve to that file's main headline.
                # NOTE(review): logging.warn is deprecated; use
                # logging.warning (several occurrences below too).
                if '::' in link_target:
                    logging.warn('Not implemented headline links to other files. Used on {}'.format(link_target))
                else:
                    target_path = os.path.abspath(os.path.join(os.path.dirname(headline.doc.path), link_target))
                    if target_path not in graph['main_headlines']:
                        logging.warn('Link to doc not in graph: {}'.format(target_path))
                    else:
                        assert_id_exists(graph['main_headlines'][target_path].id, headline, graph)
                        link_target = './' + graph['main_headlines'][target_path].id + '.node.html'
            elif link_target.startswith('attachment:'):
                logging.warn('Not implemented `attachment:` links. Used on {}'.format(link_target))
            elif link_target.startswith('* '):
                # Same-document headline link, matched by title text.
                # NOTE(review): lstrip('* ') strips a character class, not a
                # prefix — it would also eat leading '*'/' ' runs from the
                # headline name itself.
                target_headline = get_headline_with_name(link_target.lstrip('* '), headline.doc)
                if target_headline is None:
                    logging.warn('No headline found corresponding to {}. On file {}'.format(link_target, headline.doc.path))
                else:
                    assert_id_exists(target_headline.id, headline, graph)
                    link_target = './' + target_headline.id + '.node.html'
            else:
                is_internal_link = False
                if not (
                        link_target.startswith('https://')
                        or link_target.startswith('http://')
                        or link_target.startswith('/')
                ):
                    raise NotImplementedError('Unknown link type: {}'
                                              .format(link_target))

            description = chunk.description
            if description is None:
                description = chunk.value

            # NOTE(review): format string has one placeholder but three
            # arguments — the anchor markup (href + internal/external class)
            # was evidently stripped from the literal.
            acc.append('{}'.format(
                html.escape(link_target),
                'internal' if is_internal_link else 'external',
                html.escape(description),
            ))
        # else:
        #     raise NotImplementedError('TextToken: {}'.format(chunk))
    acc.append('\n\n    ')


def render_tag(element, acc, headline, graph):
    """Dispatch ``element`` to the renderer for its exact DOM type.

    Raises KeyError for unknown node types (intentionally loud).
    """
    return {
        dom.PropertyDrawerNode: render_property_drawer,
        dom.LogbookDrawerNode: render_logbook_drawer,
        dom.PropertyNode: render_property_node,
        dom.ListGroupNode: render_list_group,
        dom.ListItem: render_list_item,
        dom.TableNode: render_table,
        dom.TableSeparatorRow: render_table_separator_row,
        dom.TableRow: render_table_row,
        dom.CodeBlock: render_code_block,
        dom.Text: render_text,
        dom.ResultsDrawerNode: render_results_block,
        org_rw.Text: render_org_text,
    }[type(element)](element, acc, headline, graph)


def render_tree(tree, acc, headline, graph):
    """Render every element of ``tree`` in order onto ``acc``."""
    for element in tree:
        render_tag(element, acc, headline, graph)


def render_inline(tree, f, headline, graph):
    """Run renderer ``f`` over ``tree`` and return the joined HTML string."""
    acc = []
    f(tree, acc, headline, graph)
    return ''.join(acc)


def render_as_document(headline, doc, headlineLevel, graph, title):
    """Render a full HTML page for ``headline``.

    Non-top-level headlines get a stub page that points the reader to the
    top-level note they belong to; top-level headlines get the real
    rendered document.
    NOTE(review): the stub template below has lost its HTML (doctype, head,
    redirect/link markup) to the stripping pass — only the text remains.
    """
    if isinstance(headline.parent, org_rw.Headline):
        topLevelHeadline = headline.parent
        while isinstance(topLevelHeadline.parent, org_rw.Headline):
            topLevelHeadline = topLevelHeadline.parent
        return f""" {title} @ {SITE_NAME} Sending you to the main note...
[{org_rw.token_list_to_plaintext(topLevelHeadline.title.contents)}] """
    else:
        return as_document(render(headline, doc, graph=graph, headlineLevel=headlineLevel),
                           title)


def render(headline, doc, graph, headlineLevel):
    """Recursively render ``headline`` (and its children) to an HTML fragment.

    NOTE(review): ``todo_state`` and ``display_state`` are computed but never
    interpolated into the returned template, and ``state``/``tags`` wrap
    their values in markup-less f-strings — all consistent with the HTML
    having been stripped from this copy.
    """
    try:
        # Shadows the imported ``dom`` module inside this function.
        dom = headline.as_dom()
    except:
        logging.error("Error generating DOM for {}".format(doc.path))
        raise

    print_tree(dom, indentation=2, headline=headline)

    content = []
    render_tree(dom, content, headline, graph)

    for child in headline.children:
        content.append(render(child, doc,
                              headlineLevel=headlineLevel + 1,
                              graph=graph))

    if headline.state is None:
        state = ""
    else:
        state = f'{headline.state}'

    if headline.is_todo:
        todo_state = "todo"
    else:
        todo_state = "done"

    tag_list = []
    for tag in headline.shallow_tags:
        tag_list.append(f'{html.escape(tag)}')
    tags = f'{"".join(tag_list)}'

    # display_state = 'collapsed'
    # if headlineLevel < MIN_HIDDEN_HEADLINE_LEVEL:
    #     display_state = 'expanded'
    display_state = 'expanded'

    title = render_inline(headline.title, render_tag, headline, graph)
    if headlineLevel > 0:
        title = f"{title}"

    return f"""
    {state} {title} {tags}

    {''.join(content)}
    """


def as_document(html, title):
    """Wrap rendered body ``html`` in the full-page template.

    NOTE(review): the ``html`` parameter shadows the imported ``html``
    module (harmless here, but rename-worthy), and the page template has
    lost its markup to the stripping pass.
    """
    return f""" {title} @ {SITE_NAME} {html} """


def save_changes(doc):
    """Write ``doc`` back to the file it was loaded from (used after
    assigning fresh headline IDs)."""
    assert doc.path is not None
    with open(doc.path, "wt") as f:
        dump_org(doc, f)


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: {} SOURCE_TOP DEST_TOP".format(sys.argv[0]))
        exit(0)  # NOTE(review): usage error conventionally exits non-zero.

    logging.basicConfig(level=logging.INFO, format="%(levelname)-8s %(message)s")
    exit(main(sys.argv[1], sys.argv[2]))