#!/usr/bin/env python3

import html
import json
import logging
import os
import re
import shutil
import sqlite3
import sys
import time
import traceback
import uuid

import inotify.adapters

import org_rw
from org_rw import Link, dom
from org_rw import dump as dump_org
from org_rw import load as load_org

import pygments
import pygments.formatters
import pygments.lexers
import pygments.util

import gen_centered_graph

# Set custom states
for state in ("NEXT", "MEETING", "Q", "PAUSED", "SOMETIME", "TRACK", "WAITING"):
    org_rw.DEFAULT_TODO_KEYWORDS.append(state)
for state in ("DISCARDED", "VALIDATING"):
    org_rw.DEFAULT_DONE_KEYWORDS.append(state)

EXTENSIONS = [
    ".org",
    ".org.txt",
]

IMG_EXTENSIONS = set([
    "svg",
    "png",
    "jpg",
    "jpeg",
    "gif",
])

SKIPPED_TAGS = set(['attach'])

PARSER_NAMESPACE = 'codigoparallevar.com/notes'

WATCH = True
if os.getenv('WATCH_AND_REBUILD', '1') == '0':
    WATCH = False

MIN_HIDDEN_HEADLINE_LEVEL = 2
INDEX_ID = "ea48ec1d-f9d4-4fb7-b39a-faa7b6e2ba95"
SITE_NAME = "Código para llevar"

MONITORED_EVENT_TYPES = (
    'IN_CREATE',
    # 'IN_MODIFY',
    'IN_CLOSE_WRITE',
    'IN_DELETE',
    'IN_MOVED_FROM',
    'IN_MOVED_TO',
    'IN_DELETE_SELF',
    'IN_MOVE_SELF',
)

TEXT_OR_LINK_RE = re.compile(r'([^\s\[\]]+|.)')

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
STATIC_PATH = os.path.join(ROOT_DIR, 'static')


class NonExistingLocalNoteError(AssertionError):
    def __init__(self, note_id, src_headline):
        AssertionError.__init__(self)
        self.note_id = note_id
        self.src_headline = src_headline

    def get_message(self):
        return ("Cannot follow link to '{}' on headline '{}' ({})"
                .format(self.note_id,
                        self.src_headline.id,
                        self.src_headline.title.get_text().strip()))


def is_git_path(path):
    return any([chunk == ".git" for chunk in path.split(os.sep)])


def create_db(path):
    db = sqlite3.connect(path)
    # `tokenize` is an fts5 option, so it needs a comma after the last
    # column (`url`).
    db.execute('CREATE VIRTUAL TABLE IF NOT EXISTS note_search '
               'USING fts5(note_id, title, body, top_level_title, '
               'is_done, is_todo, parser_namespace, url, tokenize="trigram");')
    db.execute('DELETE FROM note_search WHERE parser_namespace = ?;',
               (PARSER_NAMESPACE,))
    return db


def load_all(top_dir_relative):
    top = os.path.abspath(top_dir_relative)
    docs = []
    for root, dirs, files in os.walk(top):
        for name in files:
            if not any([name.endswith(ext) for ext in EXTENSIONS]):
                continue

            path = os.path.join(root, name)
            try:
                with open(path) as f:
                    doc = load_org(f, extra_cautious=True)
                docs.append(doc)
            except Exception:
                traceback.print_exc()
                print(f"== On {path}")
                sys.exit(1)

    logging.info("Collected {} files".format(len(docs)))
    return docs

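# A minimal sketch (not part of the generator's own flow) of how the fts5
# table created above can be queried. `search_notes` is a hypothetical
# helper; note that with the trigram tokenizer, terms shorter than three
# characters won't match.
def search_notes(db, query):
    """Full-text search over the indexed notes, best matches first."""
    cur = db.execute(
        'SELECT note_id, title, url FROM note_search '
        'WHERE note_search MATCH ? ORDER BY rank;',
        (query,))
    return cur.fetchall()
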
def regen_all(src_top, dest_top, *, docs=None, db=None):
    files_generated = 0
    cur = db.cursor()
    try:
        cur.execute('DELETE FROM note_search WHERE parser_namespace = ?;',
                    (PARSER_NAMESPACE,))
    except sqlite3.OperationalError:
        if WATCH:
            logging.warning("Error pre-cleaning DB, search won't be updated")
        else:
            raise

    docs = load_all(src_top)
    base_dirs = set()
    doc_to_headline_remapping = {}
    os.makedirs(dest_top, exist_ok=True)

    ## Build headline list
    # This includes a virtual headline for ID-referenced documents.
    all_headlines = []
    main_headlines_by_path = {}
    main_headline_to_docid = {}
    for doc in docs:
        relpath = os.path.relpath(doc.path, src_top)
        changed = False
        headlines = list(doc.getAllHeadlines())
        related = None

        if not relpath.startswith("public/"):
            # print("Skip:", relpath)
            continue

        base_dirs.add(os.path.dirname(relpath))

        # Detach the top-level "Related" headline, if present
        i = len(headlines)
        while i > 0:
            i -= 1
            headline = headlines[i]
            if headline.title.get_text().strip().lower() == "related" and headline.depth == 1:
                if related is not None:
                    print("Found duplicated related: {} vs {}".format(
                        related.id, headline.id))
                    assert related is None
                related = headline
                headlines.pop(i)

        # Make sure every headline has a stable ID, saving the document if
        # any had to be assigned
        for headline in headlines:
            if headline.id is None:
                headline.id = str(uuid.uuid4())
                changed = True
        if changed:
            print("Updated", relpath)
            save_changes(doc)
        all_headlines.extend(headlines)

        main_headline = None
        topHeadlines = doc.getTopHeadlines()
        if ((len(topHeadlines) == 1 and related is None)
                or (len(topHeadlines) == 2 and related is not None)):
            main_headline = [h for h in topHeadlines if h != related][0]
            main_headlines_by_path[doc.path] = main_headline
            if doc.id is not None:
                doc_to_headline_remapping['id:' + doc.id] = 'id:' + main_headline.id
                main_headline_to_docid[main_headline.id] = doc.id
            files_generated += 1
        elif doc.id is not None:
            logging.error("Cannot render document from id: {}. {} headlines {} related".format(
                relpath,
                len(topHeadlines),
                'with' if related is not None else 'without',
            ))

    # Build graph
    graph = {}
    backlink_graph = {}
    for headline in all_headlines:
        links = []
        headline_links = list(headline.get_links())
        # NOTE: `main_headline`, `related` and `doc` still hold the values
        # from the last document of the loop above.
        if headline == main_headline and related is not None:
            headline_links.extend(list(related.get_links()))
        for l in headline_links:
            if l.value.startswith('http://') or l.value.startswith('https://'):
                pass  # Ignore for now, external URL
            elif l.value.startswith('id:'):
                links.append({'target': l.value})
            elif l.value.startswith('attachment:'):
                pass  # Ignore, attachment
            elif l.value.startswith('file:'):
                pass  # Ignore, file
            elif l.value.startswith('notmuch:'):
                pass  # Ignore, mail
            elif l.value.startswith('orgit-rev:'):
                pass  # Ignore, git revision
            elif l.value.startswith('*'):
                pass  # Ignore, internal
            elif ':' not in l.value.split()[0]:
                pass  # Ignore, internal
            elif l.value.startswith('./'):
                pass  # TODO: Properly handle
            else:
                logging.warning('On document {}, unknown link to {}'.format(doc.path, l.value))

        if headline.parent:
            if isinstance(headline.parent, org_rw.Headline):
                links.append({
                    "target": headline.parent.id,
                    "relation": "in",
                })

        for backlink in links:
            if 'relation' in backlink and backlink['relation'] == 'in':
                continue
            target = backlink['target']
            if target.startswith('id:'):
                target = target[len('id:'):]
            if target not in backlink_graph:
                backlink_graph[target] = set()
            backlink_graph[target].add(headline.id)

        graph[headline.id] = {
            "title": org_rw.token_list_to_plaintext(headline.title.contents).strip(),
            "links": links,
            "depth": headline.depth,
        }
        if headline.id in main_headline_to_docid:
            graph[main_headline_to_docid[headline.id]] = graph[headline.id]

        topLevelHeadline = headline
        while isinstance(topLevelHeadline.parent, org_rw.Headline):
            topLevelHeadline = topLevelHeadline.parent

        # Save for full-text-search
        cur.execute('''INSERT INTO note_search(note_id, title, body, top_level_title,
                                               is_done, is_todo, parser_namespace, url)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?);''',
                    (
                        headline.id,
                        headline.title.get_text(),
                        '\n'.join(headline.doc.dump_headline(headline, recursive=False)),
                        topLevelHeadline.title.get_text(),
                        headline.is_done,
                        headline.is_todo,
                        PARSER_NAMESPACE,
                        headline.id + '.node.html',
                    ))

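    # For reference, a node in `graph` ends up shaped roughly like this
    # (illustrative ids):
    #
    #   "8f8f2f33-...": {
    #       "title": "Some note",
    #       "links": [{"target": "id:77aa0f3e-..."},
    #                 {"target": "1c8316e0-...", "relation": "in"}],
    #       "depth": 2,
    #   }
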
    # Update graph, replace document ids with headline ids
    for headline_data in graph.values():
        for link in headline_data['links']:
            if link['target'] in doc_to_headline_remapping:
                link['target'] = doc_to_headline_remapping[link['target']]

    # Remap document-id backlinks to main headlines
    for doc_id, main_headline_id in doc_to_headline_remapping.items():
        if doc_id.startswith('id:'):
            doc_id = doc_id[len('id:'):]
        if main_headline_id.startswith('id:'):
            main_headline_id = main_headline_id[len('id:'):]

        for backlink in backlink_graph.get(doc_id, []):
            if main_headline_id not in backlink_graph:
                backlink_graph[main_headline_id] = set()
            backlink_graph[main_headline_id].add(backlink)

    # Output graph files
    graphpath = os.path.join(dest_top, "graph.json")
    with open(graphpath, "wt") as f:
        json.dump(obj=graph, fp=f, indent=2)

    graph_explorer_path = os.path.join(dest_top, "graph.html")
    with open(graph_explorer_path, 'wt') as f:
        with open(os.path.join(os.path.dirname(os.path.abspath(dest_top)),
                               '..', 'static', 'graph_explorer.html'), 'rt') as template:
            source = template.read()
        # '<!-- GRAPH -->' is assumed here as the substitution marker inside
        # static/graph_explorer.html; adjust to whatever token the template
        # actually uses.
        f.write(source.replace('<!-- GRAPH -->', json.dumps(graph)))

    logging.info("Generated {} files".format(files_generated))

    # Render docs after we've built the graph
    full_graph_info = {
        "nodes": graph,
        "backlinks": backlink_graph,
        "main_headlines": main_headlines_by_path,
    }

    # Render main headlines
    for _docpath, main_headline in main_headlines_by_path.items():
        if main_headline.doc.id:
            endpath = os.path.join(dest_top, main_headline.doc.id + ".node.html")
            with open(endpath, "wt") as f:
                f.write(render_as_document(
                    main_headline, main_headline.doc,
                    headlineLevel=0,
                    graph=full_graph_info,
                    doc_to_headline_remapping=doc_to_headline_remapping,
                    title=org_rw.token_list_to_plaintext(main_headline.title.contents)))

    # Render all headlines
    for headline in all_headlines:
        endpath = os.path.join(dest_top, headline.id + ".node.html")

        # Render HTML
        with open(endpath, "wt") as f:
            f.write(render_as_document(
                headline, headline.doc,
                headlineLevel=0,
                graph=full_graph_info,
                doc_to_headline_remapping=doc_to_headline_remapping,
                title=org_rw.token_list_to_plaintext(headline.title.contents)))
        files_generated += 1

        if headline.id == INDEX_ID:
            index_endpath = os.path.join(dest_top, "index.html")
            with open(index_endpath, "wt") as f:
                f.write(render_as_document(
                    headline, headline.doc,
                    headlineLevel=0,
                    graph=full_graph_info,
                    doc_to_headline_remapping=doc_to_headline_remapping,
                    title=org_rw.token_list_to_plaintext(headline.title.contents)))
            files_generated += 1

    cur.close()
    db.commit()

    logging.info("Copying attachments")
    attachments_dir = os.path.join(dest_top, 'attachments')
    os.makedirs(attachments_dir, exist_ok=True)
    for base in base_dirs:
        data_dir = os.path.join(src_top, base, 'data')
        logging.info("Copying attachments from: {}".format(data_dir))
        if not os.path.exists(data_dir):
            continue
        for subdir in os.listdir(data_dir):
            shutil.copytree(os.path.join(data_dir, subdir),
                            os.path.join(attachments_dir, subdir),
                            dirs_exist_ok=True)

    return docs

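# main() below does an initial build and then, unless WATCH_AND_REBUILD=0,
# keeps rebuilding on filesystem changes. inotify's event_gen() yields
# (header, type_names, watch_path, filename) tuples; anything outside
# MONITORED_EVENT_TYPES, or under a .git/ directory, is ignored.
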
def main(src_top, dest_top):
    notifier = inotify.adapters.InotifyTrees([src_top, STATIC_PATH])

    ## Initial load
    t0 = time.time()
    os.makedirs(dest_top, exist_ok=True)
    db = create_db(os.path.join(dest_top, '..', 'db.sqlite3'))
    docs = regen_all(src_top, dest_top, db=db)

    if not WATCH:
        logging.info("Build completed in {:.2f}s".format(time.time() - t0))
        return 0

    logging.info("Initial load completed in {:.2f}s".format(time.time() - t0))

    ## Updating
    for event in notifier.event_gen(yield_nones=False):
        (ev, types, directory, file) = event
        if not any([t in MONITORED_EVENT_TYPES for t in types]):
            continue
        if is_git_path(directory):
            continue

        filepath = os.path.join(directory, file)
        print("CHANGED: {}".format(filepath))
        t0 = time.time()
        try:
            docs = regen_all(src_top, dest_top, docs=docs, db=db)
        except Exception:
            logging.error(traceback.format_exc())
            logging.error("Loading new templates failed 😿")
            continue
        logging.info("Updated all in {:.2f}s".format(time.time() - t0))


def get_headline_with_name(target_name, doc):
    target_name = target_name.strip()
    for headline in doc.getAllHeadlines():
        if headline.title.get_text().strip() == target_name:
            return headline
    return None


def assert_id_exists(id, src_headline, graph):
    if id not in graph["nodes"]:
        raise NonExistingLocalNoteError(id, src_headline)


def print_tree(tree, indentation=0, headline=None):
    # Debugging helper, short-circuited by default.
    # if headline and headline.id != INDEX_ID:
    #     return
    return

    for element in tree:
        if "children" in dir(element):
            if len(element.children) > 0:
                print_element(element.children, indentation + 1, headline)
                print()

        elif "content" in dir(element):
            for content in element.content:
                print_element(content, indentation + 1, headline)


def print_element(element, indentation, headline):
    if isinstance(element, org_rw.Link):
        print(" " * indentation, "Link:", element.get_raw())
    elif isinstance(element, str):
        print(" " * indentation, "{" + element + "}", type(element))
    else:
        print_tree(element, indentation, headline)


def render_property_drawer(element, acc, headline, graph):
    pass


def render_logbook_drawer(element, acc, headline, graph):
    pass


def render_property_node(element, acc, headline, graph):
    pass


def render_list_group(element, acc, headline, graph):
    acc.append("<ul>")
    render_tree(element.children, acc, headline, graph)
    acc.append("</ul>")


def render_table(element, acc, headline, graph):
    acc.append("<table>")
    render_tree(element.children, acc, headline, graph)
    acc.append("</table>")

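# For reference, an org table like:
#
#   | a | b |
#   |---+---|
#   | 1 | 2 |
#
# comes out as <table><tr><td>a</td><td>b</td></tr><tr><td>1</td><td>2</td></tr></table>;
# separator rows contribute no markup of their own (see
# render_table_separator_row below).
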
") def render_table_row(element, acc, headline, graph): acc.append("") for cell in element.cells: acc.append("") acc.append(html.escape(cell)) acc.append("") acc.append("") def render_table_separator_row(element, acc, headline, graph): acc.append("") def render_list_item(element, acc, headline, graph): acc.append("
  • ") if element.tag is not None: acc.append("") render_text_tokens(element.tag, acc, headline, graph) acc.append("") acc.append("") render_text_tokens(element.content, acc, headline, graph) acc.append("
  • ") def render_block(content, acc, _class, is_code): acc.append('
def render_block(content, acc, _class, is_code):
    acc.append('<pre class="{}">'.format(_class))
    if is_code:
        acc.append('<code>')

    # Remove indentation common to all lines
    acc.append(unindent(content))
    if is_code:
        acc.append('</code>')
    acc.append('</pre>')


def unindent(content):
    base_indentation = min([
        len(l) - len(l.lstrip(' '))
        for l in content.split('\n')
        if len(l.strip()) > 0
    ])
    content_lines = [
        l[base_indentation:]
        for l in content.split('\n')
    ]
    return '\n'.join(content_lines)

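# For example:
#   unindent("    for x in y:\n        f(x)") == "for x in y:\n    f(x)"
# (blank lines are skipped when computing the common indentation).

# render_code_block below emits pygments markup, which relies on CSS classes
# rather than inline styles. A sketch of generating a matching stylesheet,
# assuming the default ".highlight" wrapper class and a hypothetical output
# path (presumably the real styles ship under static/):
def write_pygments_css(path):
    style = pygments.formatters.HtmlFormatter().get_style_defs('.highlight')
    with open(path, 'wt') as f:
        f.write(style)
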
def render_code_block(element, acc, headline, graph):
    code = element.lines
    if element.arguments is not None and len(element.arguments) > 0:
        try:
            lexer = pygments.lexers.get_lexer_by_name(
                element.arguments.split()[0], stripall=True)
            content = pygments.highlight(unindent(code),
                                         lexer,
                                         pygments.formatters.HtmlFormatter())
            acc.append(content)
            return
        except pygments.util.ClassNotFound:
            pass

    logging.error("Cannot find lexer for {}".format(element.subtype.lower()))
    content = html.escape(code)
    render_block(content, acc, _class='code ' + element.subtype.lower(), is_code=True)


def render_results_block(element, acc, headline, graph):
    items = [e.get_raw() for e in element.children]
    content = '\n'.join(items)
    if len(content.strip()) > 0:
        render_block(content, acc, _class='results lang-text', is_code=False)


def render_org_text(element, acc, headline, graph):
    as_dom = org_rw.text_to_dom(element.contents, element)
    render_text_tokens(as_dom, acc, headline, graph)


def render_text(element, acc, headline, graph):
    # The wrapper tag/class is an assumed reconstruction of stripped markup.
    acc.append('<div class="text">')
    render_text_tokens(element.content, acc, headline, graph)
    acc.append('</div>')

def render_text_tokens(tokens, acc, headline, graph):
    acc.append('<p>')
    if isinstance(tokens, org_rw.Text):
        tokens = tokens.contents

    for chunk in tokens:
        if isinstance(chunk, str):
            # Split on blank lines (paragraph breaks) and autolink bare URLs
            lines = chunk.split('\n\n')
            contents = []
            for line in lines:
                line_chunks = []
                for word in TEXT_OR_LINK_RE.findall(line):
                    if '://' in word and not word.startswith('org-protocol://'):
                        if not (word.startswith('http://')
                                or word.startswith('https://')
                                or word.startswith('ftp://')
                                or word.startswith('ftps://')):
                            logging.warning('Is this a link? {} (on {})\nLine: {}\nChunks: {}'
                                            .format(word, headline.doc.path, line, line_chunks))
                            line_chunks.append(html.escape(word))
                        else:
                            line_chunks.append('<a href="{url}">{description}</a>'
                                               .format(url=word,
                                                       description=html.escape(word)))
                    else:
                        line_chunks.append(html.escape(word))
                contents.append(' '.join(line_chunks))
            acc.append('{}'.format('</p>\n<p>'.join(contents)))

        elif isinstance(chunk, Link):
            link_target = chunk.value
            is_internal_link = True
            description = chunk.description
            if description is None:
                description = chunk.value
            try:
                if link_target.startswith('id:'):
                    assert_id_exists(link_target[3:], headline, graph)
                    link_target = './' + link_target[3:] + '.node.html'
                elif link_target.startswith('./') or link_target.startswith('../'):
                    if '::' in link_target:
                        logging.warning('Not implemented headline links to other files. Used on {}'
                                        .format(link_target))
                    else:
                        target_path = os.path.abspath(
                            os.path.join(os.path.dirname(headline.doc.path), link_target))
                        if target_path not in graph['main_headlines']:
                            logging.warning('Link to doc not in graph: {}'.format(target_path))
                        else:
                            assert_id_exists(graph['main_headlines'][target_path].id,
                                             headline, graph)
                            link_target = './' + graph['main_headlines'][target_path].id + '.node.html'
                elif link_target.startswith('attachment:'):
                    inner_target = link_target.split(':', 1)[1]
                    link_target = 'attachments/{}/{}/{}'.format(
                        headline.id[:2], headline.id[2:], inner_target)
                    logging.warning('Not implemented `attachment:` links. Used on {}'
                                    .format(link_target))
                elif link_target.startswith('* '):
                    target_headline = get_headline_with_name(link_target.lstrip('* '),
                                                             headline.doc)
                    if target_headline is None:
                        logging.warning('No headline found corresponding to {}. On file {}'
                                        .format(link_target, headline.doc.path))
                    else:
                        assert_id_exists(target_headline.id, headline, graph)
                        link_target = './' + target_headline.id + '.node.html'
                else:
                    is_internal_link = False
                    if link_target.startswith('orgit-rev'):
                        raise NonExistingLocalNoteError(link_target, headline)
                    elif link_target.startswith('file:'):
                        raise NonExistingLocalNoteError(link_target, headline)
                    elif not (link_target.startswith('https://')
                              or link_target.startswith('http://')
                              or link_target.startswith('/')):
                        raise NotImplementedError('Unknown link type: {}'.format(link_target))

                if link_target.rsplit('.', 1)[-1].lower() in IMG_EXTENSIONS:
                    acc.append('<img src="{}" class="{}" alt="{}" />'.format(
                        html.escape(link_target),
                        'internal' if is_internal_link else 'external',
                        html.escape(link_target),
                    ))
                else:
                    acc.append('<a href="{}" class="{}">{}</a>'.format(
                        html.escape(link_target),
                        'internal' if is_internal_link else 'external',
                        html.escape(description),
                    ))
            except NonExistingLocalNoteError as err:
                logging.warning(err.get_message())
                acc.append(html.escape(description))

        elif isinstance(chunk, org_rw.MarkerToken):
            tag = '<'
            if chunk.closing:
                tag += '/'
            tag += {
                org_rw.MarkerType.BOLD_MODE: 'strong',
                org_rw.MarkerType.CODE_MODE: 'code',
                org_rw.MarkerType.ITALIC_MODE: 'em',
                org_rw.MarkerType.STRIKE_MODE: 's',
                org_rw.MarkerType.UNDERLINED_MODE: 'span class="underlined"' if not chunk.closing else 'span',
                org_rw.MarkerType.VERBATIM_MODE: 'span class="verbatim"' if not chunk.closing else 'span',
            }[chunk.tok_type]
            tag += '>'
            acc.append(tag)

        else:
            raise NotImplementedError('TextToken: {}'.format(chunk))
    acc.append('</p>')


def render_tag(element, acc, headline, graph):
    return {
        dom.PropertyDrawerNode: render_property_drawer,
        dom.LogbookDrawerNode: render_logbook_drawer,
        dom.PropertyNode: render_property_node,
        dom.ListGroupNode: render_list_group,
        dom.ListItem: render_list_item,
        dom.TableNode: render_table,
        dom.TableSeparatorRow: render_table_separator_row,
        dom.TableRow: render_table_row,
        dom.CodeBlock: render_code_block,
        dom.Text: render_text,
        dom.ResultsDrawerNode: render_results_block,
        org_rw.Text: render_org_text,
    }[type(element)](element, acc, headline, graph)


def render_tree(tree, acc, headline, graph):
    for element in tree:
        render_tag(element, acc, headline, graph)


def render_inline(tree, f, headline, graph):
    acc = []
    f(tree, acc, headline, graph)
    return ''.join(acc)


def render_as_document(headline, doc, headlineLevel, graph, title, doc_to_headline_remapping):
    if isinstance(headline.parent, org_rw.Headline):
        # Non-top headlines get a stub page that sends the reader to the page
        # of their top-level headline. The anchor-based redirect target is an
        # assumed reconstruction of the stripped markup.
        topLevelHeadline = headline.parent
        while isinstance(topLevelHeadline.parent, org_rw.Headline):
            topLevelHeadline = topLevelHeadline.parent
        target_url = f"./{topLevelHeadline.id}.node.html#{headline.id}"
        return f"""<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8" />
    <title>{title} @ {SITE_NAME}</title>
    <meta http-equiv="refresh" content="0; url={target_url}" />
  </head>
  <body>
    Sending you to the main note...
    <a href="{target_url}">[{org_rw.token_list_to_plaintext(topLevelHeadline.title.contents)}]</a>
  </body>
</html>
"""
    else:
        return as_document(render(headline, doc, graph=graph,
                                  headlineLevel=headlineLevel,
                                  doc_to_headline_remapping=doc_to_headline_remapping),
                           title,
                           render_toc(doc))

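# The MarkerToken branch above maps org emphasis onto plain HTML, e.g.
# "*bold* /italic/ ~code~ =verbatim=" renders as
# <strong>bold</strong> <em>italic</em> <code>code</code>
# <span class="verbatim">verbatim</span>.
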
def render_toc(doc):
    # Builds a nested list over the document's headlines; the anchors target
    # the id attributes emitted by render().
    acc = ['<ul class="toc">']
    for headline in doc.getTopHeadlines():
        render_toc_headline(headline, acc)
    acc.append('</ul>')

    if sum([chunk == '<li>' for chunk in acc]) < 2:
        # If < 2 headlines, ignore it
        return None
    return ''.join(acc)


def render_toc_headline(headline, acc):
    acc.append('<li>')
    acc.append(f'<a href="#{headline.id}">{html.escape(headline.title.get_text())}</a>')
    children = list(headline.children)
    if children:
        acc.append('<ul>')
        for child in children:
            render_toc_headline(child, acc)
        acc.append('</ul>')
    acc.append('</li>')

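# For a document with top headline "A" containing "B" and "C", render_toc
# yields roughly:
#   <ul class="toc">
#     <li><a href="#id-of-A">A</a>
#       <ul><li><a href="#id-of-B">B</a></li>
#           <li><a href="#id-of-C">C</a></li></ul>
#     </li>
#   </ul>
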
def render_connections(headline_id, content, graph, doc_to_headline_remapping):
    # if headline_id != 'aa29be89-70e7-4465-91ed-361cf0ce62f2':
    #     return
    logging.info("Generating centered graph for {}".format(headline_id))
    svg = gen_centered_graph.gen(headline_id, graph['nodes'], doc_to_headline_remapping)
    # The SVG is embedded inline; the wrapper class is an assumed
    # reconstruction.
    content.append('<div class="connections">{}</div>'.format(svg))

def render(headline, doc, graph, headlineLevel, doc_to_headline_remapping):
    try:
        headline_dom = headline.as_dom()
    except Exception:
        logging.error("Error generating DOM for {}".format(doc.path))
        raise
    print_tree(headline_dom, indentation=2, headline=headline)

    content = []
    if headline.id and headlineLevel == 0:
        render_connections(headline.id, content, graph,
                           doc_to_headline_remapping=doc_to_headline_remapping)
    render_tree(headline_dom, content, headline, graph)

    for child in headline.children:
        content.append(render(child, doc, headlineLevel=headlineLevel + 1,
                              graph=graph,
                              doc_to_headline_remapping=doc_to_headline_remapping))

    if headline.state is None:
        state = ""
    else:
        state = f'<span class="state">{headline.state}</span>'

    if headline.is_todo:
        todo_state = "todo"
    else:
        todo_state = "done"

    tag_list = []
    for tag in headline.shallow_tags:
        if tag.lower() not in SKIPPED_TAGS:
            tag_list.append(f'<span class="tag">{html.escape(tag)}</span>')
    tags = f'<span class="tags">{"".join(tag_list)}</span>'

    display_state = 'expanded'
    # # Update display based on document STARTUP config
    # visual_level = doc.get_keywords('STARTUP', 'showall')
    # if visual_level.startswith('show') and visual_level.endswith('levels'):
    #     visual_level_num = int(visual_level[len('show'):-len('levels')]) - 1
    #     # Note that level is 0 indexed inside this loop
    #     if headlineLevel >= visual_level_num:
    #         display_state = 'collapsed'

    title = render_inline(headline.title, render_tag, headline, graph)
    if headlineLevel > 0:
        # Sub-headline titles link to their own stub page (an assumption;
        # the original href was stripped).
        title = f'<a href="./{headline.id}.node.html">{title}</a>'

    # The exact classes in this wrapper are assumed reconstructions of the
    # stripped markup.
    return f"""<div id="{headline.id}" class="headline {todo_state} {display_state}">
  <h1 class="title">{state} {title} {tags}</h1>
  <div class="contents">
    {''.join(content)}
  </div>
</div>
"""


def as_document(html, title, global_toc):
    body_classes = []
    if global_toc is None:
        toc_section = ""
        body_classes.append('no-toc')
    else:
        toc_section = f"""<nav class="table-of-contents">
  <h2>Table of contents</h2>
  {global_toc}
</nav>
"""

    # The document shell below is an assumed reconstruction of the stripped
    # markup; the stylesheet path in particular is a guess.
    return f"""<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8" />
    <title>{title} @ {SITE_NAME}</title>
    <link rel="stylesheet" href="../static/style.css" />
  </head>
  <body class="{' '.join(body_classes)}">
    {toc_section}
    {html}
  </body>
</html>
"""


def save_changes(doc):
    assert doc.path is not None
    with open(doc.path, "wt") as f:
        dump_org(doc, f)


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: {} SOURCE_TOP DEST_TOP".format(sys.argv[0]))
        exit(1)

    logging.basicConfig(level=logging.INFO, format="%(levelname)-8s %(message)s")
    exit(main(sys.argv[1], sys.argv[2]))