import asyncio
import math
import textwrap
import typing
import urllib.parse
from typing import Optional

import jinja2
import tld
from asyncer import asyncify
from loguru import logger
from rl_string_helper import RLStringHelper, parse_markups, split_overlapping_ranges

from . import jinja_env
from .exceptions import InvalidMediumPostID, InvalidMediumPostURL, InvalidURL, MediumParserException, MediumPostQueryError
from .api import MediumApi
from .models.html_result import HtmlResult
from .time import convert_datetime_to_human_readable
from .utils import correct_url, extract_hex_string, getting_percontage_of_match, is_has_valid_medium_post_id, is_valid_medium_url, is_valid_url, resolve_medium_url

if typing.TYPE_CHECKING:
    from database_lib import AbstractCacheBackend


class MediumParser:
    """Resolve Medium post IDs, fetch post data (cache first, then API) and render it as HTML."""

    __slots__ = ("cache", "host_address", "jinja_template", "post_template", "timeout", "medium_api")

    def __init__(self, cache: "AbstractCacheBackend", medium_api: MediumApi, timeout: int, host_address: str, template_folder: str = "./templates"):
        """Store the collaborators and load ``post.html`` from ``template_folder``.

        Args:
            cache: Backend used through its ``pull``/``push``/``delete`` methods.
            medium_api: Client whose ``query_post_by_id`` coroutine fetches a post.
            timeout: Timeout passed to ``asyncio.wait_for`` and to URL resolution.
            host_address: Value handed to the IFRAME template when rendering.
            template_folder: Directory searched by the Jinja file-system loader.
        """
        self.timeout: int = timeout
        self.cache: "AbstractCacheBackend" = cache
        self.host_address: str = host_address
        self.jinja_template: jinja2.Environment = jinja2.Environment(loader=jinja2.FileSystemLoader(template_folder))
        self.post_template: jinja2.Template = self.jinja_template.get_template("post.html")
        self.medium_api: MediumApi = medium_api

    async def resolve(self, unknown: str) -> str:
        """Return a post ID for ``unknown``, which may be a post ID or a URL.

        Raises:
            InvalidURL, InvalidMediumPostURL: propagated from ``resolve_url``.
        """
        logger.debug(f"We got some unknown data: {unknown=}. Trying resolve them...///")
        if is_has_valid_medium_post_id(unknown):
            logger.debug("Seems like it's valid post_id")
            return extract_hex_string(unknown)

        logger.debug("...maybe it's URL. Let's checkout...")
        return await self.resolve_url(unknown)

    async def resolve_url(self, url: str) -> str:
        """Resolve ``url`` to a Medium post ID.

        Raises:
            InvalidURL: if the URL fails either validity check.
            InvalidMediumPostURL: if no post ID could be resolved for the URL.
        """
        sanitized_url = correct_url(url)
        # NOTE(review): the first check runs on the raw `url`, everything else on
        # `sanitized_url` - confirm this asymmetry is intended.
        if not is_valid_url(url) or not await is_valid_medium_url(sanitized_url):
            raise InvalidURL(f"Invalid Medium URL: {sanitized_url}")

        post_id = await resolve_medium_url(sanitized_url, self.timeout)
        if not post_id:
            raise InvalidMediumPostURL(f"Could not find Medium post ID for URL: {sanitized_url}")
        return post_id

    async def delete_from_cache(self, post_id: str):
        """Delete ``post_id`` from the cache backend and return ``True``."""
        self.cache.delete(post_id)
        return True

    async def get_post_data_from_cache(self, post_id: str):
        """Return the cached post data for ``post_id``, or ``None``.

        ``None`` is returned when nothing is cached, on timeout, and on any
        other error (which is logged rather than raised).
        """

        async def _get_from_cache():
            logger.debug("Using cache backend")
            # `pull` is called without `await`; the result's `.json()` is returned.
            post_data = self.cache.pull(post_id)
            if post_data:
                logger.debug("post query was found on cache")
                return post_data.json()
            logger.debug(f"No data found in cache by {post_id}")
            return None

        try:
            return await asyncio.wait_for(_get_from_cache(), timeout=self.timeout)
        except asyncio.TimeoutError:
            logger.debug("Timeout while waiting for cache")
            return None
        except Exception as e:
            logger.error(f"Error while waiting for cache: {e}")
            return None

    async def get_post_data_from_api(self, post_id: str):
        """Return the post data fetched from the Medium API, or ``None``.

        ``None`` is returned on timeout and on any error (which is logged
        rather than raised).
        """

        async def _get_from_api():
            logger.debug("Using API to gather post data")
            try:
                return await self.medium_api.query_post_by_id(post_id)
            except Exception as ex:
                logger.debug("Error while querying post data from Medium API")
                logger.exception(ex)
                return None

        try:
            return await asyncio.wait_for(_get_from_api(), timeout=self.timeout)
        except asyncio.TimeoutError:
            # These messages previously said "cache", which misattributed API failures.
            logger.debug("Timeout while waiting for API")
            return None
        except Exception as e:
            logger.error(f"Error while waiting for API: {e}")
            return None

    async def query_get(self, post_id: str, use_cache: bool, force_cache: bool = False):
        """Fetch post data once and report whether the cache path was used.

        Args:
            post_id: Post to fetch.
            use_cache: Try the cache before the API.
            force_cache: Never fall back to the API, even on a cache miss.

        Returns:
            ``(post_data, cache_used)``; ``cache_used`` is ``False`` only when
            the API fallback was taken.
        """
        cache_used = True
        post_data = await self.get_post_data_from_cache(post_id) if use_cache else None

        if not post_data and not force_cache:
            logger.debug("Getting value from cache failed, using API")
            cache_used = False
            post_data = await self.get_post_data_from_api(post_id)

        return post_data, cache_used

    @staticmethod
    def _find_post_data_problem(post_data) -> Optional[str]:
        """Return why ``post_data`` is unusable, or ``None`` when it looks valid."""
        if not post_data:
            return "No post data returned"
        if not isinstance(post_data, dict):
            return f"Post data is not a dictionary: {post_data=}"
        if post_data.get("error"):
            return f"Post data contains an error: {post_data=}"
        data = post_data.get("data")
        if not data:
            return f"Post data missing 'data' key: {post_data=}"
        # Guard against a non-dict `data` so validation itself cannot raise.
        if not isinstance(data, dict) or not data.get("post"):
            return f"Post data missing 'data.post' key: {post_data=}"
        return None

    async def query(self, post_id: str, use_cache: bool = True, retry: int = 2, force_cache: bool = False):
        """Fetch and validate post data, retrying while nothing is returned.

        Up to ``retry`` attempts are made. An attempt that returns no data or
        raises is retried (with exponential back-off after an exception); an
        attempt that returns non-empty but invalid data stops immediately.
        Data that did not come through the cache path is pushed to the cache.

        Raises:
            MediumPostQueryError: if no valid post data was obtained.
        """
        logger.debug(f"Medium QUERY: {use_cache=}, {retry=}, {force_cache=}")

        post_data, is_cache_used = None, False
        reason: Optional[str] = None
        succeeded = False

        for attempt in range(retry):
            # Judge every attempt on its own: a stale reason from an earlier
            # attempt used to turn a later success into a failure.
            reason = None
            try:
                post_data, is_cache_used = await self.query_get(post_id, use_cache, force_cache)
            except Exception as e:
                reason = f"Exception while querying post data: {e}"
                logger.error(f"Attempt {attempt + 1} failed with exception: {e}")
                if attempt + 1 < retry:  # no point in backing off after the last attempt
                    logger.debug(f"Retrying in {2 ** attempt} seconds...")
                    await asyncio.sleep(2**attempt)
                continue

            reason = self._find_post_data_problem(post_data)
            if reason is None:
                logger.debug("Post data was successfully queried")
                succeeded = True
                break
            if post_data:
                # Non-empty but invalid data is not retried (unchanged behaviour).
                break

        if not succeeded:
            raise MediumPostQueryError(f"Could not query post by ID from API: {post_id}. Reason: {reason or 'Unknown'}")

        if not is_cache_used:
            logger.debug("Pushing post data to cache")
            self.cache.push(post_id, post_data)

        logger.trace("Query: done")
        return post_data

    def _parse_and_render_content_html_post(self, content: dict, title: str, subtitle: str, preview_image_id: str, highlights: list, tags: list) -> tuple[list, str, str]:
        """Render the paragraphs of ``content`` to HTML fragments.

        Within the first four paragraphs, headings matching the title, H4
        paragraphs matching a tag, paragraphs matching the subtitle and the
        preview image are skipped instead of rendered.

        Returns:
            ``(rendered_paragraphs, title, subtitle)`` where title/subtitle may
            have been replaced by text found in the leading paragraphs.

        NOTE(review): this method was recovered from a copy of the file whose
        line breaks, indentation and inline template markup had been stripped.
        Statement nesting is inferred from syntax, the template strings below
        are only what survived, and at least two stretches of code are missing
        (marked inline). Diff against version control before relying on it.
        """
        paragraphs = content["bodyModel"]["paragraphs"]
        tags_list = [tag["displayTitle"] for tag in tags]
        out_paragraphs: list[str] = []
        current_pos = 0

        def parse_paragraph_text(text: str, markups: list, is_code: bool = False) -> RLStringHelper:
            """Wrap ``text`` in an RLStringHelper and register each markup range as a template."""
            quote_html_type = ["minimal"] if is_code else ["full"]
            text_formater = RLStringHelper(text, quote_html_type=quote_html_type)
            parsed_markups = parse_markups(markups)
            fixed_markups = split_overlapping_ranges(parsed_markups)
            for markup in fixed_markups:
                text_formater.set_template(markup["start"], markup["end"], markup["template"])
            return text_formater

        while len(paragraphs) > current_pos:
            paragraph = paragraphs[current_pos]
            logger.trace(f"Current paragraph #{current_pos} data: {paragraph}")

            # For debugging stuff...
            # if paragraph["id"] != "":
            #     current_pos += 1
            #     continue

            if current_pos in range(4):
                if paragraph["type"] in ["H3", "H4", "H2"]:
                    if getting_percontage_of_match(paragraph["text"], title) > 80:
                        if title.endswith("…"):
                            logger.trace("Title was detected, replace...")
                            title = paragraph["text"]
                        else:
                            logger.trace("Title was detected, ignore...")
                        current_pos += 1
                        continue
                if paragraph["type"] in ["H4"]:
                    if paragraph["text"] in tags_list:
                        logger.trace("Tag was detected, ignore...")
                        current_pos += 1
                        continue
                if paragraph["type"] in ["H4", "P"]:
                    is_paragraph_subtitle = getting_percontage_of_match(paragraph["text"], subtitle) > 80
                    if is_paragraph_subtitle and not subtitle.endswith("…"):
                        logger.trace("Subtitle was detected, ignore...")
                        subtitle = paragraph["text"]
                        current_pos += 1
                        continue
                    elif subtitle and subtitle.endswith("…") and len(paragraph["text"]) > 100:
                        subtitle = ""
                elif paragraph["type"] == "IMG":
                    if paragraph["metadata"] and paragraph["metadata"]["id"] == preview_image_id:
                        logger.trace("Preview image was detected, ignore...")
                        current_pos += 1
                        continue

            if paragraph["text"] is not None:
                text_formater = parse_paragraph_text(paragraph["text"], paragraph["markups"])
            else:
                text_formater = None

            # Highlights can only be applied to paragraphs that have text; without
            # this guard a matching highlight crashed on `None.get_text()`.
            if text_formater is not None:
                for highlight in highlights:
                    for highlight_paragraph in highlight["paragraphs"]:
                        if highlight_paragraph["name"] == paragraph["name"]:
                            logger.trace("Apply highlight to this paragraph")
                            if highlight_paragraph["text"] != text_formater.get_text():
                                logger.warning("Highlighted text and paragraph text are not the same! Skip...")
                                break
                            # NOTE(review): wrapping markup appears to have been stripped from this template.
                            quote_markup_template = '{{ text }}'
                            text_formater.set_template(
                                highlight["startOffset"],
                                highlight["endOffset"],
                                quote_markup_template,
                            )
                            break

            if paragraph["type"] == "H2":
                css_class = []
                if out_paragraphs:
                    css_class.append("pt-12")
                # NOTE(review): template markup stripped; `header_template` is never rendered in
                # the surviving text, and the code between here and the next statement (including
                # the definition of `paragraph_template`) is missing.
                header_template = jinja_env.from_string('{{ text }}')
                if paragraphs[current_pos - 1]["type"] in ["H4", "H3"]:
                    css_class.append("mt-3")
                else:
                    css_class.append("mt-7")
                paragraph_template_rendered = paragraph_template.render(text=text_formater.get_text(), css_class=" ".join(css_class))
                out_paragraphs.append(paragraph_template_rendered)
            elif paragraph["type"] == "ULI":
                # NOTE(review): this string looks like the remains of two different templates;
                # the code between it and `code_block_template` (including the definition of
                # `pre_template`, and presumably the branch header that owns the PRE handling
                # below) is missing.
                uli_template = jinja_env.from_string('{{code_block}}')
                code_block_template = jinja_env.from_string('{{ text }}')
                code_css_class = []
                if paragraph["codeBlockMetadata"] and paragraph["codeBlockMetadata"]["lang"] is not None:
                    code_css_class.append(f'language-{paragraph["codeBlockMetadata"]["lang"]}')
                else:
                    code_css_class.append("nohighlight")
                    # code_css_class.append("auto")

                # Collect the run of consecutive PRE paragraphs into a single code block.
                code_list = []
                _tmp_current_pos = current_pos
                while len(paragraphs) > _tmp_current_pos:
                    _paragraph = paragraphs[_tmp_current_pos]
                    if _paragraph["type"] == "PRE":
                        text_formater = parse_paragraph_text(_paragraph["text"], _paragraph["markups"], is_code=True)
                        code_list.append(text_formater.get_text())
                    else:
                        break
                    _tmp_current_pos += 1

                code_block_template_rendered = code_block_template.render(text="\n".join(code_list), code_css_class=" ".join(code_css_class))
                pre_template_rendered = pre_template.render(code_block=code_block_template_rendered)
                out_paragraphs.append(pre_template_rendered)
                # Step back one: the shared `current_pos += 1` below moves past the run.
                current_pos = _tmp_current_pos - 1
            elif paragraph["type"] == "BQ":
                # NOTE(review): template body is empty in the surviving text.
                bq_template = jinja_env.from_string('')
                bq_template_rendered = bq_template.render(text=text_formater.get_text())
                logger.trace(bq_template_rendered)
                out_paragraphs.append(bq_template_rendered)
            elif paragraph["type"] == "PQ":
                # NOTE(review): wrapping markup appears to have been stripped from this template.
                pq_template = jinja_env.from_string('{{ text }}')
                pq_template_rendered = pq_template.render(text=text_formater.get_text())
                logger.trace(pq_template_rendered)
                out_paragraphs.append(pq_template_rendered)
            elif paragraph["type"] == "MIXTAPE_EMBED":
                # TODO: redirect all Medium embeding articles to Fredium
                # NOTE(review): template body is empty in the surviving text.
                embed_template = jinja_env.from_string('')
                if paragraph.get("mixtapeMetadata") is not None:
                    url = paragraph["mixtapeMetadata"]["href"]
                else:
                    logger.warning("Ignore MIXTAPE_EMBED paragraph type, since we can't get url")
                    current_pos += 1
                    continue

                text_raw = paragraph["text"]
                if len(paragraph["markups"]) != 3:
                    logger.warning("Ignore MIXTAPE_EMBED paragraph type, since we can't split text")
                    current_pos += 1
                    continue

                # The second and third markups are used as the title and description ranges.
                title_range = paragraph["markups"][1]
                description_range = paragraph["markups"][2]
                logger.trace(f"{title_range=}")
                logger.trace(f"{description_range=}")

                embed_title = text_raw[title_range["start"] : title_range["end"]]
                embed_description = text_raw[description_range["start"] : description_range["end"]]
                logger.trace(f"{embed_title=}")
                logger.trace(f"{embed_description=}")

                try:
                    embed_site = tld.get_fld(url)
                except Exception as ex:
                    logger.warning(f"Can't get embed site fld: {ex}. Using custom logic...")
                    parsed_url = urllib.parse.urlparse(url)
                    embed_site = parsed_url.hostname
                logger.trace(f"{embed_site=}")

                embed_template_rendered = embed_template.render(paragraph=paragraph, url=url, embed_title=embed_title, embed_description=embed_description, embed_site=embed_site)
                out_paragraphs.append(embed_template_rendered)
            elif paragraph["type"] == "IFRAME":
                # NOTE(review): template body is empty in the surviving text.
                iframe_template = jinja_env.from_string('')
                iframe_template_rendered = iframe_template.render(host_address=self.host_address, iframe_id=paragraph["iframe"]["mediaResource"]["id"])
                out_paragraphs.append(iframe_template_rendered)
            else:
                logger.error(f"Unknown {paragraph['type']}: {paragraph}")

            current_pos += 1

        return out_paragraphs, title, subtitle

    async def render_as_html(self, post_id: str):
        """Query ``post_id`` and return its rendered ``HtmlResult``.

        Exceptions from querying or rendering propagate unchanged.
        """
        post_data = await self.query(post_id)
        return await self._render_as_html(post_data, post_id)

    async def generate_metadata(self, post_data: dict, post_id: str, as_dict: bool = False) -> typing.Union[tuple, dict]:
        """Extract display metadata from ``post_data["data"]["post"]``.

        Returns:
            A dict keyed by field name when ``as_dict`` is true; otherwise the
            tuple ``(title, subtitle, description, url, creator, collection,
            reading_time, free_access, updated_at, first_published_at,
            preview_image_id, tags)``.
        """
        post = post_data["data"]["post"]

        title = RLStringHelper(post["title"], ["minimal"]).get_text()
        subtitle = RLStringHelper(post["previewContent"]["subtitle"]).get_text()
        description = RLStringHelper(textwrap.shorten(subtitle, width=100, placeholder="...")).get_text()
        preview_image_id = post["previewImage"]["id"]
        creator = post["creator"]
        collection = post["collection"]
        url = post["mediumUrl"]
        reading_time = math.ceil(post["readingTime"])
        free_access = "No" if post["isLocked"] else "Yes"
        updated_at = convert_datetime_to_human_readable(post["updatedAt"])
        first_published_at = convert_datetime_to_human_readable(post["firstPublishedAt"])
        tags = post["tags"]

        if as_dict:
            return {
                "post_id": post_id,
                "title": title,
                "subtitle": subtitle,
                "description": description,
                "url": url,
                "creator": creator,
                "collection": collection,
                "reading_time": reading_time,
                "free_access": free_access,
                "updated_at": updated_at,
                "first_published_at": first_published_at,
                "preview_image_id": preview_image_id,
                "tags": tags,
            }

        return title, subtitle, description, url, creator, collection, reading_time, free_access, updated_at, first_published_at, preview_image_id, tags

    async def _render_as_html(self, post_data: dict, post_id: str) -> "HtmlResult":
        """Render ``post_data`` with the post template and wrap it in an ``HtmlResult``."""
        post = post_data["data"]["post"]

        # Schedule metadata generation so it can run while the content is parsed.
        metadata_task = asyncio.create_task(self.generate_metadata(post_data, post_id))

        # The synchronous content parser is run through asyncer's `asyncify`.
        # NOTE(review): the title/subtitle it returns have always been overwritten
        # by the metadata values unpacked below - confirm that is intended.
        content, _parsed_title, _parsed_subtitle = await asyncify(self._parse_and_render_content_html_post)(
            post["content"],
            post["title"],
            post["previewContent"]["subtitle"],
            post["previewImage"]["id"],
            post["highlights"],
            post["tags"],
        )

        title, subtitle, description, url, creator, collection, reading_time, free_access, updated_at, first_published_at, preview_image_id, tags = await metadata_task

        post_page_title_raw = "{{ title }} | by {{ creator.name }}"
        if collection:
            post_page_title_raw += " | in {{ collection.name }}"
        post_page_title = jinja_env.from_string(post_page_title_raw)
        post_page_title_rendered = post_page_title.render(title=title, creator=creator, collection=collection)

        post_context = {
            "subtitle": subtitle,
            "title": title,
            "url": url,
            "creator": creator,
            "collection": collection,
            "readingTime": reading_time,
            "freeAccess": free_access,
            "updatedAt": updated_at,
            "firstPublishedAt": first_published_at,
            "previewImageId": preview_image_id,
            "content": content,
            "tags": tags,
        }
        post_template_rendered = self.post_template.render(post_context)

        return HtmlResult(post_page_title_rendered, description, url, post_template_rendered)

    async def render_as_markdown(self) -> str:
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError("Markdown rendering is not implemented. Please use HTML rendering instead")