Skip to content

Commit

Permalink
split importer in two separated classes (one for each pass)
Browse files Browse the repository at this point in the history
  • Loading branch information
marph91 committed Dec 3, 2024
1 parent c58d5e8 commit 5d3aba3
Show file tree
Hide file tree
Showing 3 changed files with 255 additions and 234 deletions.
346 changes: 129 additions & 217 deletions src/importer.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
"""Convert the intermediate format to Markdown."""

import dataclasses
import logging
from pathlib import Path
import re
import shutil
import urllib.parse

import frontmatter

import common
import intermediate_format as imf

Expand All @@ -29,39 +25,13 @@ def get_quoted_relative_path(source: Path, target: Path) -> str:
return urllib.parse.quote(str(target.relative_to(source, walk_up=True)))


OBSIDIAN_TAG_REGEX = re.compile(r"[^\w/_-]", re.UNICODE)


def normalize_obsidian_tag(tag: str) -> str:
class PathDeterminer:
"""
tag format: https://help.obsidian.md/Editing+and+formatting/Tags#Tag+format
>>> normalize_obsidian_tag("nested/tag")
'nested/tag'
>>> normalize_obsidian_tag("kebab-case")
'kebab-case'
>>> normalize_obsidian_tag("snake_case")
'snake_case'
>>> normalize_obsidian_tag("grüße-cześć-привет-你好")
'grüße-cześć-привет-你好'
>>> normalize_obsidian_tag("mul & tip...le")
'mul___tip___le'
>>> normalize_obsidian_tag("1984")
'1984_'
>>> normalize_obsidian_tag("y1984")
'y1984'
Determine the final paths of notebooks, notes and resources.
Create a note ID - path map for linking notes in the next pass.
"""
valid_char_tag = OBSIDIAN_TAG_REGEX.sub("_", tag)
if valid_char_tag.isdigit():
valid_char_tag += "_"
return valid_char_tag


class FilesystemImporter:
"""Import notebooks, notes and related data to the filesystem."""

def __init__(self, progress_bars, config):
self.frontmatter = config.frontmatter
def __init__(self, config):
self.root_path = None
self.global_resource_folder = (
None
Expand All @@ -80,209 +50,151 @@ def __init__(self, progress_bars, config):
)
# reference id - path (new id)
self.note_id_map: dict[str, Path] = {}
self.progress_bars = progress_bars

def import_resources(self, note: imf.Note):
# pylint: disable=too-many-branches
# TODO
assert note.path is not None
def determine_resource_path(self, note: imf.Note, resource: imf.Resource):
# determine new resource path
assert self.root_path is not None
for resource in note.resources:
self.progress_bars["resources"].update(1)
resource_title = (
resource.title
if resource.title not in [None, ""]
else resource.filename.name
)

# determine new resource path
if self.global_resource_folder is None:
if self.local_image_folder is not None and resource.is_image:
target_folder = self.local_image_folder
else:
target_folder = self.local_resource_folder
# local resources (next to the markdown files)
resource_folder = note.path.parent / target_folder
# TODO: done for each resource in each note
if target_folder != Path("."):
resource_folder.mkdir(
exist_ok=True, parents=len(target_folder.parts) > 1
)
resource.path = resource_folder / common.safe_path(
resource.filename.name
)
else:
# global resource folder
resource.path = (
self.root_path
/ self.global_resource_folder
/ common.safe_path(resource.filename.name)
)
# add extension if possible
assert resource.path is not None
if resource.path.suffix == "":
if (
resource.title is not None
and (suffix := Path(resource.title).suffix) != ""
):
resource.path = resource.path.with_suffix(suffix)
else:
guessed_suffix = common.guess_suffix(resource.filename)
resource.path = resource.path.with_suffix(guessed_suffix)

if resource.filename.is_file():
# Don't create multiple resources for the same file.
# Cache the original file paths and their corresponding ID.
if not resource.path.is_file():
shutil.copy(resource.filename, resource.path)
assert note.path is not None
if self.global_resource_folder is None:
if self.local_image_folder is not None and resource.is_image:
target_folder = self.local_image_folder
else:
LOGGER.warning(f'Resource "{resource.filename}" does not exist.')
relative_path = get_quoted_relative_path(note.path.parent, resource.path)
resource_markdown = (
f"{'!' * resource.is_image}[{resource_title}]({relative_path})"
target_folder = self.local_resource_folder
# local resources (next to the markdown files)
resource_folder = note.path.parent / target_folder
resource.path = resource_folder / common.safe_path(resource.filename.name)
else:
# global resource folder
resource.path = (
self.root_path
/ self.global_resource_folder
/ common.safe_path(resource.filename.name)
)
if resource.original_text is None:
# append
note.body = f"{note.body}\n\n{resource_markdown}"
# add extension if possible
assert resource.path is not None
if resource.path.suffix == "":
if (
resource.title is not None
and (suffix := Path(resource.title).suffix) != ""
):
resource.path = resource.path.with_suffix(suffix)
else:
# replace existing link
note.body = note.body.replace(resource.original_text, resource_markdown)
guessed_suffix = common.guess_suffix(resource.filename)
resource.path = resource.path.with_suffix(guessed_suffix)

def write_note(self, note: imf.Note):
assert note.path is not None
match self.frontmatter:
case "all":
metadata = {}
for field in dataclasses.fields(imf.Note):
match field.name:
case (
"body"
| "resources"
| "note_links"
| "original_id"
| "path"
):
continue # included elsewhere or no metadata
case "tags":
metadata["tags"] = [tag.title for tag in note.tags]
case _:
if (value := getattr(note, field.name)) is not None:
metadata[field.name] = value
post = frontmatter.Post(note.body, **metadata)
frontmatter.dump(post, note.path)
case "joplin":
# https://joplinapp.org/help/dev/spec/interop_with_frontmatter/
# Arbitrary metadata will be ignored.
metadata = {}
if note.tags:
# Convert the tags to lower case before the import to avoid issues
# with special first characters.
# See: https://github.com/laurent22/joplin/issues/11179
metadata["tags"] = [tag.title.lower() for tag in note.tags]
supported_keys = [
"title",
"created",
"updated",
"author",
"latitude",
"longitude",
"altitude",
]
for key in supported_keys:
if (value := getattr(note, key)) is not None:
metadata[key] = value
post = frontmatter.Post(note.body, **metadata)
frontmatter.dump(post, note.path)
case "obsidian":
# frontmatter format:
# https://help.obsidian.md/Editing+and+formatting/Properties#Property+format
metadata = {}
if note.tags:
metadata["tags"] = [
normalize_obsidian_tag(tag.title) for tag in note.tags
]
post = frontmatter.Post(note.body, **metadata)
frontmatter.dump(post, note.path)
else:
note.path.write_text(note.body, encoding="utf-8")
case "qownnotes":
# space separated tags, as supported by:
# - https://github.com/qownnotes/scripts/tree/master/epsilon-notes-tags
# - https://github.com/qownnotes/scripts/tree/master/yaml-nested-tags
if note.tags:
post = frontmatter.Post(
note.body, tags=" ".join([tag.title for tag in note.tags])
)
frontmatter.dump(post, note.path)
else:
note.path.write_text(note.body, encoding="utf-8")
case _:
note.path.write_text(note.body, encoding="utf-8")

@common.catch_all_exceptions
def import_note(self, note: imf.Note):
assert note.path is not None
self.progress_bars["notes"].update(1)

# Handle resources first, since the note body changes.
self.import_resources(note)

# needed to properly link notes later
self.note_id_map[note.reference_id] = note.path

if len(note.tags) > 0:
self.progress_bars["tags"].update(len(note.tags))
if not self.frontmatter:
LOGGER.warning(
f'Tags of note "{note.title}" will be lost without frontmatter. '
'You can add frontmatter by using "--frontmatter all".'
)
self.write_note(note)

def import_notebook(self, notebook: imf.Notebook):
def determine_paths(self, notebook: imf.Notebook):
assert notebook.path is not None
self.progress_bars["notebooks"].update(1)
if self.root_path is None:
self.root_path = notebook.path
if self.global_resource_folder is not None:
(self.root_path / self.global_resource_folder).mkdir(
exist_ok=True, parents=True
)
notebook.path.mkdir(exist_ok=True, parents=True) # TODO
for note in notebook.child_notes:
note.path = notebook.path / common.safe_path(note.title)
# Don't overwrite existing suffices.
if note.path.suffix != ".md":
note.path = note.path.with_suffix(note.path.suffix + ".md")
self.import_note(note)
# needed to properly link notes later
self.note_id_map[note.reference_id] = note.path

for resource in note.resources:
self.determine_resource_path(note, resource)
for child_notebook in notebook.child_notebooks:
child_notebook.path = notebook.path / common.safe_path(child_notebook.title)
self.import_notebook(child_notebook)
self.determine_paths(child_notebook)


def update_note_links(self, note: imf.Note):
class FilesystemImporter:
"""Import notebooks, notes and related data to the filesystem."""

def __init__(self, progress_bars, config, stats, note_id_map):
self.include_title = config.title_as_header
self.frontmatter = config.frontmatter
self.progress_bars = progress_bars
self.note_id_map: dict[str, Path] = note_id_map

if stats.tags > 0 and not self.frontmatter:
LOGGER.warning(
"Tags will be lost without frontmatter. "
'Frontmatter can be added by "--frontmatter all".'
)

def update_resource_links(self, note: imf.Note, resource: imf.Resource):
"""Replace the original ID of resources with their path in the filesystem."""
assert note.path is not None
if not note.note_links or not note.body:
return # nothing to link
for note_link in note.note_links:
self.progress_bars["note_links"].update(1)
new_path = self.note_id_map.get(note_link.original_id)
if new_path is None:
LOGGER.debug(
f'Note "{note.title}": '
f'could not find linked note: "{note_link.original_text}"',
# prevent [[]] syntax titles to be handled as markup
extra={"markup": None},
)
continue
assert resource.path is not None
resource_title = (
resource.title
if resource.title not in [None, ""]
else resource.filename.name
)

relative_path = get_quoted_relative_path(note.path.parent, new_path)
note.body = note.body.replace(
note_link.original_text, f"[{note_link.title}]({relative_path})"
relative_path = get_quoted_relative_path(note.path.parent, resource.path)
resource_markdown = (
f"{'!' * resource.is_image}[{resource_title}]({relative_path})"
)
if resource.original_text is None:
# append
note.body = f"{note.body}\n\n{resource_markdown}"
else:
# replace existing link
note.body = note.body.replace(resource.original_text, resource_markdown)

def write_resource(self, resource: imf.Resource):
if resource.filename.is_file():
# Don't create multiple resources for the same file.
# Cache the original file paths and their corresponding ID.
if resource.path is not None and not resource.path.is_file():
# TODO: done for each resource in each note
resource.path.parent.mkdir(exist_ok=True, parents=True)
shutil.copy(resource.filename, resource.path)
else:
LOGGER.warning(f'Resource "{resource.filename}" does not exist.')

def update_note_links(self, note: imf.Note, note_link: imf.NoteLink):
"""Replace the original ID of notes with their path in the filesystem."""
assert note.path is not None
new_path = self.note_id_map.get(note_link.original_id)
if new_path is None:
LOGGER.debug(
f'Note "{note.title}": '
f'could not find linked note: "{note_link.original_text}"',
# prevent [[]] syntax titles to be handled as markup
extra={"markup": None},
)
self.write_note(note) # update note
return

def link_notes(self, notebook: imf.Notebook):
relative_path = get_quoted_relative_path(note.path.parent, new_path)
note.body = note.body.replace(
note_link.original_text, f"[{note_link.title}]({relative_path})"
)

def write_note(self, note: imf.Note):
self.progress_bars["notes"].update(1)
if "tags" in self.progress_bars:
self.progress_bars["tags"].update(len(note.tags))
assert note.path is not None
note.path.write_text(
note.get_finalized_body(self.include_title, self.frontmatter),
encoding="utf-8",
)

@common.catch_all_exceptions
def import_note(self, note: imf.Note):
# Handle resources and note links first, since the note body changes.
for resource in note.resources:
self.progress_bars["resources"].update(1)
self.update_resource_links(note, resource)
self.write_resource(resource)
for note_link in note.note_links:
self.progress_bars["note_links"].update(1)
self.update_note_links(note, note_link)
# Finally write the note to the filesystem.
self.write_note(note)

@common.catch_all_exceptions
def import_notebook(self, notebook: imf.Notebook):
assert notebook.path is not None
self.progress_bars["notebooks"].update(1)
notebook.path.mkdir(exist_ok=True, parents=True)
for note in notebook.child_notes:
self.update_note_links(note)
self.import_note(note)
for child_notebook in notebook.child_notebooks:
self.link_notes(child_notebook)
self.import_notebook(child_notebook)
Loading

0 comments on commit 5d3aba3

Please sign in to comment.