#!/usr/bin/env python3

import sqlite3
import time
import json
import html
import logging
import os
import sys
import uuid
from datetime import datetime
import traceback
import re
from itertools import chain

import inotify.adapters

import org_rw
from org_rw import OrgTime, dom, Link
from org_rw import dump as dump_org
from org_rw import load as load_org
from org_rw import token_list_to_raw

import pygments
import pygments.formatters
import pygments.lexers
import pygments.util

# Set custom states
for state in ("NEXT", "MEETING", "Q", "PAUSED", "SOMETIME", "TRACK", "WAITING"):
    org_rw.DEFAULT_TODO_KEYWORDS.append(state)
for state in ("DISCARDED", "VALIDATING"):
    org_rw.DEFAULT_DONE_KEYWORDS.append(state)

EXTENSIONS = [
    ".org",
    ".org.txt",
]

WATCH = True
if os.getenv('WATCH_AND_REBUILD', '1') == '0':
    WATCH = False

MIN_HIDDEN_HEADLINE_LEVEL = 2

INDEX_ID = "ea48ec1d-f9d4-4fb7-b39a-faa7b6e2ba95"
SITE_NAME = "Código para llevar"

MONITORED_EVENT_TYPES = (
    'IN_CREATE',
    # 'IN_MODIFY',
    'IN_CLOSE_WRITE',
    'IN_DELETE',
    'IN_MOVED_FROM',
    'IN_MOVED_TO',
    'IN_DELETE_SELF',
    'IN_MOVE_SELF',
)

TEXT_OR_LINK_RE = re.compile(r'([^\s\[\]]+|.)')

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
STATIC_PATH = os.path.join(ROOT_DIR, 'static')

# Token inside static/graph_explorer.html that gets replaced with the graph
# JSON. NOTE: the original token was lost when this file's HTML literals were
# stripped; this value is an assumption and must match the template.
GRAPH_PLACEHOLDER = '/* REPLACE_THIS_WITH_GRAPH */'


class NonExistingLocalNoteError(AssertionError):
    def __init__(self, note_id, src_headline):
        AssertionError.__init__(self)
        self.note_id = note_id
        self.src_headline = src_headline

    def get_message(self):
        return ("Cannot follow link to '{}' on headline '{}' ({})"
                .format(self.note_id,
                        self.src_headline.id,
                        self.src_headline.title.get_text().strip()))


def is_git_path(path):
    return any([chunk == ".git" for chunk in path.split(os.sep)])


def create_db(path):
    if os.path.exists(path):
        os.unlink(path)

    db = sqlite3.connect(path)
    # note_search is an FTS5 table; it can later be queried with, e.g.:
    #   SELECT note_id, title FROM note_search WHERE note_search MATCH 'query';
    db.execute('CREATE VIRTUAL TABLE note_search USING fts5(note_id, title, body, top_level_title, is_done, is_todo, tokenize="trigram");')
    return db


def load_all(top_dir_relative):
    top = os.path.abspath(top_dir_relative)

    docs = []
    for root, dirs, files in os.walk(top):
        for name in files:
            # Cheap filter for Org documents (covers the suffixes in EXTENSIONS).
            if ".org" not in name:
                continue

            path = os.path.join(root, name)
            try:
                with open(path) as f:
                    doc = load_org(f, extra_cautious=True)
                docs.append(doc)
            except Exception:
                traceback.print_exc()
                print(f"== On {path}")
                sys.exit(1)

    logging.info("Collected {} files".format(len(docs)))
    return docs


def regen_all(src_top, dest_top, *, docs=None, db=None):
    files_generated = 0
    cur = db.cursor()
    cleaned_db = False
    try:
        cur.execute('DELETE FROM note_search;')
        cleaned_db = True
    except sqlite3.OperationalError:
        if WATCH:
            logging.warning("Error pre-cleaning DB, search won't be updated")
        else:
            raise

    # Documents are fully reloaded on every regeneration.
    docs = load_all(src_top)
    doc_to_headline_remapping = {}
    os.makedirs(dest_top, exist_ok=True)

    ## Build headline list
    # This includes a virtual headline for ID-referenced documents.
    all_headlines = []
    main_headlines_by_path = {}
    main_headline_to_docid = {}
    for doc in docs:
        relpath = os.path.relpath(doc.path, src_top)
        changed = False
        headlines = list(doc.getAllHeadlines())
        related = None

        if not relpath.startswith("public/"):
            # print("Skip:", relpath)
            continue

        # Pull the top-level "Related" headline (if any) out of the list,
        # walking backwards so pop() doesn't disturb the indices still to visit.
        i = len(headlines)
        while i > 0:
            i -= 1
            headline = headlines[i]
            if headline.title.get_text().strip().lower() == "related" and headline.depth == 1:
                if related is not None:
                    print("Found duplicated related: {} vs {}".format(related.id, headline.id))
                assert related is None
                related = headline
                headlines.pop(i)

        for headline in headlines:
            if headline.id is None:
                headline.id = str(uuid.uuid4())
                changed = True

        if changed:
            print("Updated", relpath)
            save_changes(doc)

        all_headlines.extend(headlines)

        main_headline = None
        topHeadlines = doc.getTopHeadlines()
        if ((len(topHeadlines) == 1 and related is None)
                or (len(topHeadlines) == 2 and related is not None)):
            main_headline = [h for h in topHeadlines if h != related][0]
            main_headlines_by_path[doc.path] = main_headline

            if doc.id is not None:
                doc_to_headline_remapping['id:' + doc.id] = 'id:' + main_headline.id
                main_headline_to_docid[main_headline.id] = doc.id
            files_generated += 1

        elif doc.id is not None:
            logging.error("Cannot render document from id: {}. {} headlines {} related".format(
                relpath,
                len(topHeadlines),
                'with' if related is not None else 'without',
            ))

    # Build graph
    graph = {}
    backlink_graph = {}
    for headline in all_headlines:
        links = []
        headline_links = list(headline.get_links())
        # NOTE: `main_headline` and `related` still hold the values from the
        # last document processed above, so this only merges "Related" links
        # for that document's main headline.
        if headline == main_headline and related is not None:
            headline_links.extend(list(related.get_links()))

        for l in headline_links:
            if l.value.startswith('http://') or l.value.startswith('https://'):
                pass  # Ignore for now, external URL
            elif l.value.startswith('id:'):
                links.append({'target': l.value})
            elif l.value.startswith('attachment:'):
                pass  # Ignore, attachment
            elif l.value.startswith('file:'):
                pass  # Ignore, file
            elif l.value.startswith('notmuch:'):
                pass  # Ignore, mail
            elif l.value.startswith('orgit-rev:'):
                pass  # Ignore, git revision
            elif l.value.startswith('*'):
                pass  # Ignore, internal
            elif ':' not in l.value.split()[0]:
                pass  # Ignore, internal
            elif l.value.startswith('./'):
                pass  # TODO: Properly handle
            else:
                logging.warning('On document {}, unknown link to {}'.format(headline.doc.path, l.value))

        if headline.parent:
            if isinstance(headline.parent, org_rw.Headline):
                links.append({
                    "target": headline.parent.id,
                    "relation": "in",
                })

        for backlink in links:
            if 'relation' in backlink and backlink['relation'] == 'in':
                continue
            target = backlink['target']
            if target.startswith('id:'):
                target = target[len('id:'):]

            if target not in backlink_graph:
                backlink_graph[target] = set()
            backlink_graph[target].add(headline.id)

        graph[headline.id] = {
            "title": org_rw.org_rw.token_list_to_plaintext(headline.title.contents).strip(),
            "links": links,
            "depth": headline.depth,
        }
        if headline.id in main_headline_to_docid:
            graph[main_headline_to_docid[headline.id]] = graph[headline.id]

        topLevelHeadline = headline
        while isinstance(topLevelHeadline.parent, org_rw.Headline):
            topLevelHeadline = topLevelHeadline.parent

        # Save for full-text search
        cur.execute('''INSERT INTO note_search(note_id, title, body, top_level_title, is_done, is_todo)
                       VALUES (?, ?, ?, ?, ?, ?);''',
                    (
                        headline.id,
                        headline.title.get_text(),
                        '\n'.join(headline.doc.dump_headline(headline, recursive=False)),
                        topLevelHeadline.title.get_text(),
                        headline.is_done,
                        headline.is_todo,
                    ))

    # Update graph, replace document ids with headline ids
    for headline_data in graph.values():
        for link in headline_data['links']:
            if link['target'] in doc_to_headline_remapping:
                link['target'] = doc_to_headline_remapping[link['target']]

    # Remap document-id backlinks to main headlines
    for doc_id, main_headline_id in doc_to_headline_remapping.items():
        if doc_id.startswith('id:'):
            doc_id = doc_id[len('id:'):]
        if main_headline_id.startswith('id:'):
            main_headline_id = main_headline_id[len('id:'):]

        for backlink in backlink_graph.get(doc_id, []):
            if main_headline_id not in backlink_graph:
                backlink_graph[main_headline_id] = set()
            backlink_graph[main_headline_id].add(backlink)

    # Render docs after we've built the graph

    # Render main headlines
    full_graph_info = {
        "nodes": graph,
        "backlinks": backlink_graph,
        "main_headlines": main_headlines_by_path,
    }
    for _docpath, main_headline in main_headlines_by_path.items():
        if main_headline.doc.id:
            endpath = os.path.join(dest_top, main_headline.doc.id + ".node.html")
            with open(endpath, "wt") as f:
                f.write(render_as_document(main_headline, main_headline.doc,
                                           headlineLevel=0,
                                           graph=full_graph_info,
                                           title=org_rw.token_list_to_plaintext(main_headline.title.contents)))

    # Render all headlines
    for headline in all_headlines:
        endpath = os.path.join(dest_top, headline.id + ".node.html")

        # Render HTML
        with open(endpath, "wt") as f:
            f.write(render_as_document(headline, headline.doc,
                                       headlineLevel=0,
                                       graph=full_graph_info,
                                       title=org_rw.token_list_to_plaintext(headline.title.contents)))
        files_generated += 1

        if headline.id == INDEX_ID:
            index_endpath = os.path.join(dest_top, "index.html")
            with open(index_endpath, "wt") as f:
                f.write(render_as_document(headline, headline.doc,
                                           headlineLevel=0,
                                           graph=full_graph_info,
                                           title=org_rw.token_list_to_plaintext(headline.title.contents)))
            files_generated += 1

    # Output graph files
    graphpath = os.path.join(dest_top, "graph.json")
    with open(graphpath, "wt") as f:
        json.dump(obj=graph, fp=f, indent=2)

    graph_explorer_path = os.path.join(dest_top, "graph.html")
    with open(graph_explorer_path, 'wt') as f:
        with open(os.path.join(os.path.dirname(os.path.abspath(dest_top)), '..', 'static', 'graph_explorer.html'), 'rt') as template:
            source = template.read()
            # NOTE: the placeholder token inside graph_explorer.html was lost
            # in extraction; GRAPH_PLACEHOLDER is an assumed stand-in for it.
            f.write(source.replace(GRAPH_PLACEHOLDER, json.dumps(graph)))

    logging.info("Generated {} files".format(files_generated))
    cur.close()
    db.commit()
    return docs


def main(src_top, dest_top):
    notifier = inotify.adapters.InotifyTrees([src_top, STATIC_PATH])

    ## Initial load
    t0 = time.time()
    os.makedirs(dest_top, exist_ok=True)
    db = create_db(os.path.join(dest_top, 'db.sqlite3'))
    docs = regen_all(src_top, dest_top, db=db)

    if not WATCH:
        logging.info("Build completed in {:.2f}s".format(time.time() - t0))
        return 0

    logging.info("Initial load completed in {:.2f}s".format(time.time() - t0))

    ## Updating
    for event in notifier.event_gen(yield_nones=False):
        (_ev, types, directory, file) = event

        if not any([event_type in MONITORED_EVENT_TYPES for event_type in types]):
            continue

        if is_git_path(directory):
            continue

        filepath = os.path.join(directory, file)
        print("CHANGED: {}".format(filepath))
        t0 = time.time()
        try:
            docs = regen_all(src_top, dest_top, docs=docs, db=db)
        except Exception:
            logging.error(traceback.format_exc())
            logging.error("Loading new templates failed 😿")
            continue
        logging.info("Updated all in {:.2f}s".format(time.time() - t0))


def get_headline_with_name(target_name, doc):
    target_name = target_name.strip()
    for headline in doc.getAllHeadlines():
        if headline.title.get_text().strip() == target_name:
            return headline
    return None


def assert_id_exists(id, src_headline, graph):
    if id not in graph["nodes"]:
        raise NonExistingLocalNoteError(id, src_headline)
graph["nodes"]: raise NonExistingLocalNoteError(id, src_headline) def print_tree(tree, indentation=0, headline=None): # if headline and headline.id != INDEX_ID: # return return for element in tree: if "children" in dir(element): if len(element.children) > 0: print_element(element.children, indentation + 1, headline) print() elif "content" in dir(element): for content in element.content: print_element(content, indentation + 1, headline) def print_element(element, indentation, headline): if isinstance(element, org_rw.Link): print(" " * indentation, "Link:", element.get_raw()) elif isinstance(element, str): print(" " * indentation, "{" + element + "}", type(element)) else: print_tree(element, indentation, headline) def render_property_drawer(element, acc, headline, graph): pass def render_logbook_drawer(element, acc, headline, graph): pass def render_property_node(element, acc, headline, graph): pass def render_list_group(element, acc, headline, graph): acc.append("
'.format(_class))
if is_code:
acc.append('')
# Remove indentation common to all lines
base_indentation = min([
len(l) - len(l.lstrip(' '))
for l in content.split('\n')
if len(l.strip()) > 0
])
content_lines = [
l[base_indentation:]
for l in content.split('\n')
]
acc.append('\n'.join(content_lines))
if is_code:
acc.append('
')
acc.append('
')
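# For illustration, a sketch of what render_block() produces, assuming the
# markup reconstructed above: the shared leading indentation is stripped
# before the content is wrapped.
#
#     acc = []
#     render_block("    x = 1\n    y = 2", acc, _class='code python', is_code=True)
#     ''.join(acc)
#     # -> '<pre class="code python"><code>x = 1\ny = 2</code></pre>'
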
def render_code_block(element, acc, headline, graph):
    code = element.lines
    if element.arguments is not None and len(element.arguments) > 0:
        try:
            lexer = pygments.lexers.get_lexer_by_name(element.arguments.split()[0], stripall=True)
            content = pygments.highlight(code,
                                         lexer,
                                         pygments.formatters.HtmlFormatter())
            acc.append(content)
            return
        except pygments.util.ClassNotFound:
            pass

    logging.error("Cannot find lexer for {}".format(element.subtype.lower()))
    content = html.escape(code)
    render_block(content, acc, _class='code ' + element.subtype.lower(), is_code=True)

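# render_code_block() defers to Pygments when the block declares a language
# (e.g. "#+BEGIN_SRC python"). A minimal sketch of that path:
#
#     lexer = pygments.lexers.get_lexer_by_name('python', stripall=True)
#     out = pygments.highlight('print(1)', lexer, pygments.formatters.HtmlFormatter())
#     # `out` is wrapped in <div class="highlight"><pre>…</pre></div> and is
#     # styled by a Pygments stylesheet (HtmlFormatter().get_style_defs()).
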
def render_results_block(element, acc, headline, graph):
    items = [e.get_raw() for e in element.children]
    content = '\n'.join(items)
    if len(content.strip()) > 0:
        render_block(content, acc, _class='results lang-text', is_code=False)

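# Example: an Org results drawer such as
#
#     #+RESULTS:
#     : 4
#
# reaches render_results_block() as its raw child tokens and, when non-empty,
# is emitted as a plain-text <pre class="results lang-text"> block (no <code>
# wrapper, since is_code=False).
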
def render_org_text(element, acc, headline, graph):
    as_dom = org_rw.text_to_dom(element.contents, element)
    render_text_tokens(as_dom, acc, headline, graph)

def render_text(element, acc, headline, graph):
    render_text_tokens(element.content, acc, headline, graph)


def render_text_tokens(tokens, acc, headline, graph):
    # NOTE: the wrapping markup emitted here was lost to HTML-stripping in the
    # source; wrapping each text block in <p>…</p> is an assumption.
    acc.append('<p>')
    if isinstance(tokens, org_rw.Text):
        tokens = tokens.contents

    for chunk in tokens:
        if isinstance(chunk, str):
            lines = chunk.split('\n\n')
            contents = []
            for line in lines:
                line_chunks = []
                for word in TEXT_OR_LINK_RE.findall(line):
                    if '://' in word and not (word.startswith('org-protocol://')):
                        if not (word.startswith('http://')
                                or word.startswith('https://')
                                or word.startswith('ftp://')
                                or word.startswith('ftps://')):
                            logging.warning('Is this a link? {} (on {})\nLine: {}\nChunks: {}'
                                            .format(word, headline.doc.path, line, line_chunks))
                            line_chunks.append(html.escape(word))
                        else:
                            # The anchor markup was lost in the source; the tag
                            # shape is reconstructed from the format kwargs.
                            line_chunks.append('<a href="{url}">{description}</a>'
                                               .format(url=word,
                                                       description=html.escape(word)))
                    else:
                        line_chunks.append(html.escape(word))
                contents.append(''.join(line_chunks))
            # NOTE: the separator tag was lost in the source; <br/> is assumed.
            acc.append('{}'.format('<br/>'.join(contents)))

        elif isinstance(chunk, Link):
            link_target = chunk.value
            is_internal_link = True
            description = chunk.description
            if description is None:
                description = chunk.value

            try:
                if link_target.startswith('id:'):
                    assert_id_exists(link_target[3:], headline, graph)
                    link_target = './' + link_target[3:] + '.node.html'

                elif link_target.startswith('./') or link_target.startswith('../'):
                    if '::' in link_target:
                        logging.warning('Not implemented headline links to other files. Used on {}'.format(link_target))
                    else:
                        target_path = os.path.abspath(os.path.join(os.path.dirname(headline.doc.path), link_target))
                        if target_path not in graph['main_headlines']:
                            logging.warning('Link to doc not in graph: {}'.format(target_path))
                        else:
                            assert_id_exists(graph['main_headlines'][target_path].id, headline, graph)
                            link_target = './' + graph['main_headlines'][target_path].id + '.node.html'

                elif link_target.startswith('attachment:'):
                    logging.warning('Not implemented `attachment:` links. Used on {}'.format(link_target))

                elif link_target.startswith('* '):
                    target_headline = get_headline_with_name(link_target.lstrip('* '), headline.doc)
                    if target_headline is None:
                        logging.warning('No headline found corresponding to {}. On file {}'.format(link_target, headline.doc.path))
                    else:
                        assert_id_exists(target_headline.id, headline, graph)
                        link_target = './' + target_headline.id + '.node.html'

                else:
                    is_internal_link = False
                    if not (link_target.startswith('https://')
                            or link_target.startswith('http://')
                            or link_target.startswith('/')):
                        raise NotImplementedError('Unknown link type: {}'.format(link_target))

                # The anchor markup was lost in the source; this tag shape is
                # reconstructed from the three format arguments below.
                acc.append('<a href="{}" class="{}">{}</a>'.format(
                    html.escape(link_target),
                    'internal' if is_internal_link else 'external',
                    html.escape(description),
                ))
            except NonExistingLocalNoteError as err:
                logging.warning(err.get_message())
                acc.append(html.escape(description))

        # else:
        #     raise NotImplementedError('TextToken: {}'.format(chunk))

    acc.append('</p>')


def render_tag(element, acc, headline, graph):
    return {
        dom.PropertyDrawerNode: render_property_drawer,
        dom.LogbookDrawerNode: render_logbook_drawer,
        dom.PropertyNode: render_property_node,
        dom.ListGroupNode: render_list_group,
        dom.ListItem: render_list_item,
        dom.TableNode: render_table,
        dom.TableSeparatorRow: render_table_separator_row,
        dom.TableRow: render_table_row,
        dom.CodeBlock: render_code_block,
        dom.Text: render_text,
        dom.ResultsDrawerNode: render_results_block,
        org_rw.Text: render_org_text,
    }[type(element)](element, acc, headline, graph)


def render_tree(tree, acc, headline, graph):
    for element in tree:
        render_tag(element, acc, headline, graph)


def render_inline(tree, f, headline, graph):
    acc = []
    f(tree, acc, headline, graph)
    return ''.join(acc)


def render_as_document(headline, doc, headlineLevel, graph, title):
    if isinstance(headline.parent, org_rw.Headline):
        topLevelHeadline = headline.parent
        while isinstance(topLevelHeadline.parent, org_rw.Headline):
            topLevelHeadline = topLevelHeadline.parent
    return f"""