from __future__ import annotations

import asyncio
import math
import textwrap
import typing
import urllib.parse

import jinja2
import tld
from asyncer import asyncify
from loguru import logger
from rl_string_helper import RLStringHelper, split_overlapping_ranges

from . import jinja_env
from .api import MediumApi
from .exceptions import (
    InvalidMediumPostURL,
    InvalidURL,
    MediumPostQueryError,
)
from .markups import parse_markups
from .models.html_result import HtmlResult
from .time import convert_datetime_to_human_readable
from .utils import (
    correct_url,
    extract_hex_string,
    getting_percontage_of_match,
    is_has_valid_medium_post_id,
    is_valid_medium_url,
    is_valid_url,
    resolve_medium_url,
)

if typing.TYPE_CHECKING:
    from database_lib import AbstractCacheBackend


class MediumParser:
    """Resolve, fetch (cache first, then API) and render Medium posts as HTML.

    The instance holds a cache backend, a ``MediumApi`` client, a timeout (in
    seconds, as passed to ``asyncio.wait_for``), the host address used to build
    fallback iframe URLs, and the Jinja environment/template used for the
    final page.
    """

    __slots__ = (
        "cache",
        "host_address",
        "jinja_template",
        "post_template",
        "timeout",
        "medium_api",
    )

    def __init__(
        self,
        cache: AbstractCacheBackend,
        medium_api: MediumApi,
        timeout: int,
        host_address: str,
        template_folder: str = "./templates",
    ):
        """Store collaborators and load ``post.html`` from *template_folder*."""
        self.timeout: int = timeout
        self.cache: AbstractCacheBackend = cache
        self.host_address: str = host_address
        self.jinja_template: jinja2.Environment = jinja2.Environment(
            loader=jinja2.FileSystemLoader(template_folder)
        )
        self.post_template: jinja2.Template = self.jinja_template.get_template(
            "post.html"
        )
        self.medium_api: MediumApi = medium_api

    async def resolve(self, unknown: str) -> str:
        """Turn *unknown* (a URL or something containing a post id) into a post id.

        The value is first treated as a URL. If that fails for any reason and
        the value itself passes ``is_has_valid_medium_post_id``, the hex id
        extracted from it is returned instead. Otherwise the original
        exception from URL resolution is re-raised.
        """
        logger.debug(f"We got some unknown data: {unknown=}. Trying resolve them...///")
        try:
            logger.debug("...maybe it's URL. Let's checkout...")
            return await self.resolve_url(unknown)
        except Exception as e:
            logger.exception(e)
            logger.error(f"Error while resolving URL: {e}")
            if is_has_valid_medium_post_id(unknown):
                logger.debug("Seems like it's valid post_id")
                return extract_hex_string(unknown)
            logger.error(f"Unknown data: {unknown}")
            # Bare raise keeps the original traceback intact.
            raise

    async def resolve_url(self, url: str) -> str:
        """Validate *url* and resolve it to a Medium post id.

        Raises:
            InvalidURL: the URL fails ``is_valid_url`` / ``is_valid_medium_url``.
            InvalidMediumPostURL: no post id could be resolved for the URL.
        """
        sanitized_url = correct_url(url)
        # NOTE(review): is_valid_url() is given the raw URL while every other
        # call uses the sanitized one - confirm this asymmetry is intentional.
        if not is_valid_url(url) or not await is_valid_medium_url(sanitized_url):
            raise InvalidURL(f"Invalid Medium URL: {sanitized_url}")

        post_id = await resolve_medium_url(sanitized_url, self.timeout)
        if not post_id:
            raise InvalidMediumPostURL(
                f"Could not find Medium post ID for URL: {sanitized_url}"
            )
        return post_id

    async def delete_from_cache(self, post_id: str):
        """Delete *post_id* from the cache backend and return ``True``."""
        self.cache.delete(post_id)
        return True

    async def get_post_data_from_cache(self, post_id: str):
        """Return the cached, JSON-decoded post data for *post_id*, or ``None``.

        Any failure (timeout or exception) is logged and reported as ``None``
        so that callers can fall back to the API.
        """

        async def _get_from_cache():
            logger.debug("Using cache backend")
            post_data = self.cache.pull(post_id)
            if post_data:
                logger.debug("post query was found on cache")
                parsed_data = post_data.json()
                if parsed_data:
                    return parsed_data
            logger.debug(f"No data found in cache by {post_id}")
            return None

        try:
            # NOTE(review): the inner coroutine has no await points, so
            # wait_for cannot interrupt a slow cache.pull(); the timeout is
            # effectively advisory here.
            return await asyncio.wait_for(_get_from_cache(), timeout=self.timeout)
        except asyncio.TimeoutError:
            logger.debug("Timeout while waiting for cache")
            return None
        except Exception as e:
            logger.exception(e)
            logger.error(f"Error while waiting for cache: {e}")
            return None

    async def get_post_data_from_api(self, post_id: str):
        """Query the Medium API for *post_id*; return the payload or ``None``.

        Errors and timeouts are logged and swallowed (best effort); the caller
        decides whether to retry.
        """

        async def _get_from_api():
            logger.debug("Using API to gather post data")
            try:
                return await self.medium_api.query_post_by_id(post_id)
            except Exception as ex:
                logger.debug("Error while querying post data from Medium API")
                logger.exception(ex)
                return None

        try:
            return await asyncio.wait_for(_get_from_api(), timeout=self.timeout)
        except asyncio.TimeoutError:
            # These messages used to say "cache", which was misleading.
            logger.debug("Timeout while waiting for Medium API")
            return None
        except Exception as e:
            logger.error(f"Error while waiting for Medium API: {e}")
            return None

    async def query_get(self, post_id: str, use_cache: bool, force_cache: bool = False):
        """Fetch post data once, preferring the cache when *use_cache* is set.

        Returns:
            ``(post_data, cache_used)``. With *force_cache* the API is never
            consulted, so ``post_data`` may be ``None``.
        """
        cache_used = True
        post_data = await self.get_post_data_from_cache(post_id) if use_cache else None

        if not post_data and not force_cache:
            logger.debug("Getting value from cache failed, using API")
            cache_used = False
            post_data = await self.get_post_data_from_api(post_id)

        return post_data, cache_used

    async def query(
        self,
        post_id: str,
        use_cache: bool = True,
        retry: int = 2,
        force_cache: bool = False,
    ):
        """Fetch and validate post data, retrying while nothing is returned.

        Up to *retry* attempts are made while no payload has been obtained. A
        payload that is returned but fails validation is not retried. Data that
        did not come from the cache is pushed to the cache before returning.

        Raises:
            MediumPostQueryError: no valid payload was obtained.
        """
        logger.debug(f"Medium QUERY: {use_cache=}, {retry=}, {force_cache=}")

        post_data, is_cache_used = None, False
        attempt = 0
        reason = None

        while not post_data and attempt < retry:
            # Reset per attempt: a stale failure reason from an earlier try
            # must not make a later, valid response look like a failure.
            reason = None
            try:
                post_data, is_cache_used = await self.query_get(
                    post_id, use_cache, force_cache
                )

                if not post_data:
                    reason = "No post data returned"
                elif not isinstance(post_data, dict):
                    reason = f"Post data is not a dictionary: {post_data=}"
                elif post_data.get("error"):
                    reason = f"Post data contains an error: {post_data=}"
                elif not post_data.get("data"):
                    reason = f"Post data missing 'data' key: {post_data=}"
                elif not post_data.get("data", {}).get("post"):
                    reason = f"Post data missing 'data.post' key: {post_data=}"

                if reason is None:
                    logger.debug("Post data was successfully queried")
                    break
            except Exception as e:
                reason = f"Exception while querying: {e}"
                logger.error(f"Attempt {attempt + 1} failed with exception: {e}")
                # Back off only when another attempt will actually follow.
                if attempt + 1 < retry:
                    logger.debug(f"Retrying in {2 ** attempt} seconds...")
                    await asyncio.sleep(2**attempt)
            finally:
                attempt += 1
        else:
            # Reached only when the loop ended without `break`, i.e. no
            # attempt produced a valid payload.
            if not reason:
                reason = "Unknown"
            raise MediumPostQueryError(
                f"Could not query post by ID from API: {post_id}. Reason: {reason}"
            )

        if not is_cache_used:
            logger.debug("Pushing post data to cache")
            self.cache.push(post_id, post_data)

        logger.trace("Query: done")
        return post_data

    def _parse_and_render_content_html_post(
        self,
        content: dict,
        title: str,
        subtitle: str,
        preview_image_id: str,
        highlights: list,
        tags: list,
        post_data: dict,
    ) -> tuple[list, str, str]:
        """Render the post body paragraph by paragraph.

        Walks ``content["bodyModel"]["paragraphs"]`` and renders each paragraph
        according to its ``type`` (H2/H3/H4, IMG, P, ULI, OLI, PRE, BQ, PQ,
        MIXTAPE_EMBED, IFRAME). Within the first four paragraphs, headings that
        repeat the title, tag names, the subtitle and the preview image are
        skipped. Runs of consecutive list items / code paragraphs / image-row
        paragraphs are merged into one rendered fragment.

        Returns:
            ``(rendered_fragments, title, subtitle)`` where title/subtitle may
            have been replaced by their untruncated paragraph text.

        NOTE(review): none of the inline templates below contains any HTML
        tag, although the ``render()`` calls pass values such as ``id``,
        ``css_class``, ``src``, ``ratio``, ``url`` and ``code_css_class`` that
        no visible template references. The markup appears to have been
        stripped from the reviewed copy of this file; restore the template
        strings from the canonical source before shipping.
        """
        paragraphs = content["bodyModel"]["paragraphs"]
        tags_list = [tag["displayTitle"] for tag in tags]
        out_paragraphs: list[str] = []
        current_pos = 0

        def parse_paragraph_text(
            text: str, markups: list, is_code: bool = False
        ) -> RLStringHelper:
            """Wrap *text* in an RLStringHelper and apply *markups* to it."""
            # Hotfix, workaround for code block
            has_code_block = any(markup["type"] == "CODE" for markup in markups)
            if is_code or has_code_block:
                quote_html_type = ["minimal"]
            else:
                quote_html_type = ["full"]

            text_formater = RLStringHelper(text, quote_html_type=quote_html_type)
            parsed_markups = parse_markups(markups)
            fixed_markups = split_overlapping_ranges(parsed_markups)
            for markup in fixed_markups:
                text_formater.set_template(
                    markup["start"], markup["end"], markup["template"]
                )
            return text_formater

        while len(paragraphs) > current_pos:
            paragraph = paragraphs[current_pos]
            logger.trace(f"Current paragraph #{current_pos} data: {paragraph}")

            # The first paragraphs often repeat metadata already shown in the
            # page header (title, tags, subtitle, preview image): skip those.
            if current_pos < 4:
                if paragraph["type"] in ["H3", "H4", "H2"]:
                    if getting_percontage_of_match(paragraph["text"], title) > 80:
                        if title.endswith("…"):
                            logger.trace("Title was detected, replace...")
                            title = paragraph["text"]
                        else:
                            logger.trace("Title was detected, ignore...")
                        current_pos += 1
                        continue

                if paragraph["type"] == "H4":
                    if paragraph["text"] in tags_list:
                        logger.trace("Tag was detected, ignore...")
                        current_pos += 1
                        continue

                if paragraph["type"] in ["H4", "P"]:
                    is_paragraph_subtitle = (
                        getting_percontage_of_match(paragraph["text"], subtitle) > 80
                    )
                    if is_paragraph_subtitle and not subtitle.endswith("…"):
                        logger.trace("Subtitle was detected, ignore...")
                        subtitle = paragraph["text"]
                        current_pos += 1
                        continue
                    elif (
                        subtitle
                        and subtitle.endswith("…")
                        and len(paragraph["text"]) > 100
                    ):
                        # Truncated subtitle with a long first paragraph: drop
                        # the subtitle, the paragraph itself is still rendered.
                        subtitle = ""
                elif paragraph["type"] == "IMG":
                    if (
                        paragraph["metadata"]
                        and paragraph["metadata"]["id"] == preview_image_id
                    ):
                        logger.trace("Preview image was detected, ignore...")
                        current_pos += 1
                        continue

            if paragraph["text"] is not None:
                text_formater = parse_paragraph_text(
                    paragraph["text"], paragraph["markups"]
                )
            else:
                text_formater = parse_paragraph_text("", [])

            for highlight in highlights:
                for highlight_paragraph in highlight["paragraphs"]:
                    if highlight_paragraph["name"] == paragraph["name"]:
                        logger.trace("Apply highlight to this paragraph")
                        if highlight_paragraph["text"] != text_formater.get_text():
                            logger.warning(
                                "Highlighted text and paragraph text are not the same! Skip..."
                            )
                            break
                        quote_markup_template = "{{ text }}"
                        text_formater.set_template(
                            highlight["startOffset"],
                            highlight["endOffset"],
                            quote_markup_template,
                        )
                        break

            if paragraph["type"] == "H2":
                css_class = []
                if out_paragraphs:
                    css_class.append("pt-12")
                header_template = jinja_env.from_string("\n\n{{ text }}\n\n")
                header_template_rendered = header_template.render(
                    id=paragraph["name"],
                    text=text_formater.get_text(),
                    css_class="".join(css_class),
                )
                out_paragraphs.append(header_template_rendered)

            elif paragraph["type"] == "H3":
                css_class = []
                if out_paragraphs:
                    css_class.append("pt-12")
                header_template = jinja_env.from_string("\n\n{{ text }}\n\n")
                header_template_rendered = header_template.render(
                    id=paragraph["name"],
                    text=text_formater.get_text(),
                    css_class="".join(css_class),
                )
                out_paragraphs.append(header_template_rendered)

            elif paragraph["type"] == "H4":
                css_class = []
                if out_paragraphs:
                    css_class.append("pt-8")
                header_template = jinja_env.from_string("\n\n{{ text }}\n\n")
                header_template_rendered = header_template.render(
                    id=paragraph["name"],
                    text=text_formater.get_text(),
                    css_class="".join(css_class),
                )
                out_paragraphs.append(header_template_rendered)

            elif paragraph["type"] == "IMG":
                image_template = jinja_env.from_string(
                    "\n{{ paragraph.metadata.alt }}\n"
                )
                image_caption_template = jinja_env.from_string("\n{{ text }}\n")

                if paragraph["layout"] == "OUTSET_ROW":
                    # Collect this image and every following
                    # OUTSET_ROW_CONTINUE paragraph into one row.
                    img_row_template = jinja_env.from_string("\n{{ images }}\n")
                    image_templates_row = [image_template.render(paragraph=paragraph)]

                    _tmp_current_pos = current_pos + 1
                    while len(paragraphs) > _tmp_current_pos:
                        _paragraph = paragraphs[_tmp_current_pos]
                        if _paragraph["layout"] != "OUTSET_ROW_CONTINUE":
                            break
                        image_templates_row.append(
                            image_template.render(paragraph=_paragraph)
                        )
                        _tmp_current_pos += 1

                    img_row_template_rendered = img_row_template.render(
                        images="".join(image_templates_row)
                    )
                    out_paragraphs.append(img_row_template_rendered)
                    # The loop-level increment moves past the last row member.
                    current_pos = _tmp_current_pos - 1
                elif paragraph["layout"] == "FULL_WIDTH":
                    logger.warning("IMG: not implemented FULL_WIDTH layout")
                    current_pos += 1
                    continue
                else:
                    image_template_rendered = image_template.render(paragraph=paragraph)
                    out_paragraphs.append(image_template_rendered)
                    if paragraph["text"]:
                        out_paragraphs.append(
                            image_caption_template.render(text=text_formater.get_text())
                        )

            elif paragraph["type"] == "P":
                css_class = ["leading-8"]
                if paragraph.get("hasDropCap", False):
                    # It is unclear how this logic is triggered upstream;
                    # sometimes two letters seem to go into the drop cap, e.g.
                    # https://medium.com/write-a-catalyst/trumps-gaza-proposal-a-negotiation-tactic-for-real-change-0291df856c77
                    css_class.extend(
                        [
                            "first-letter:text-7xl",
                            "first-letter:float-left",
                            "first-letter:mr-2",
                            "first-letter:pt-2",
                        ]
                    )

                paragraph_template = jinja_env.from_string("\n\n{{ text }}\n\n")

                # Guard current_pos == 0: index -1 would silently inspect the
                # LAST paragraph of the post instead of "no previous one".
                follows_heading = (
                    current_pos > 0
                    and paragraphs[current_pos - 1]["type"] in ["H4", "H3"]
                )
                css_class.append("mt-3" if follows_heading else "mt-7")

                paragraph_template_rendered = paragraph_template.render(
                    text=text_formater.get_text(), css_class=" ".join(css_class)
                )
                out_paragraphs.append(paragraph_template_rendered)

            elif paragraph["type"] == "ULI":
                uli_template = jinja_env.from_string("")
                li_template = jinja_env.from_string("\n  • {{ text }}\n  • ")
                li_templates = []

                # Merge the run of consecutive ULI paragraphs into one list.
                _tmp_current_pos = current_pos
                while len(paragraphs) > _tmp_current_pos:
                    _paragraph = paragraphs[_tmp_current_pos]
                    if _paragraph["type"] != "ULI":
                        break
                    text_formater = parse_paragraph_text(
                        _paragraph["text"], _paragraph["markups"]
                    )
                    li_templates.append(
                        li_template.render(text=text_formater.get_text())
                    )
                    _tmp_current_pos += 1

                uli_template_rendered = uli_template.render(li="".join(li_templates))
                out_paragraphs.append(uli_template_rendered)
                current_pos = _tmp_current_pos - 1

            elif paragraph["type"] == "OLI":
                ol_template = jinja_env.from_string("\n      {{ li }}\n    ")
                li_template = jinja_env.from_string("\n  • {{ text }}\n  • ")
                li_templates = []

                # Merge the run of consecutive OLI paragraphs into one list.
                _tmp_current_pos = current_pos
                while len(paragraphs) > _tmp_current_pos:
                    _paragraph = paragraphs[_tmp_current_pos]
                    if _paragraph["type"] != "OLI":
                        break
                    text_formater = parse_paragraph_text(
                        _paragraph["text"], _paragraph["markups"]
                    )
                    li_templates.append(
                        li_template.render(text=text_formater.get_text())
                    )
                    _tmp_current_pos += 1

                ol_template_rendered = ol_template.render(li="".join(li_templates))
                out_paragraphs.append(ol_template_rendered)
                current_pos = _tmp_current_pos - 1

            elif paragraph["type"] == "PRE":
                pre_template = jinja_env.from_string("\n    {{code_block}}\n    ")
                code_block_template = jinja_env.from_string("{{ text }}")

                code_css_class = []
                if (
                    paragraph["codeBlockMetadata"]
                    and paragraph["codeBlockMetadata"]["lang"] is not None
                ):
                    code_css_class.append(
                        f'language-{paragraph["codeBlockMetadata"]["lang"]}'
                    )
                else:
                    code_css_class.append("nohighlight")

                # Merge the run of consecutive PRE paragraphs into one block.
                code_list = []
                _tmp_current_pos = current_pos
                while len(paragraphs) > _tmp_current_pos:
                    _paragraph = paragraphs[_tmp_current_pos]
                    if _paragraph["type"] != "PRE":
                        break
                    text_formater = parse_paragraph_text(
                        _paragraph["text"], _paragraph["markups"], is_code=True
                    )
                    code_list.append(text_formater.get_text())
                    _tmp_current_pos += 1

                code_block_template_rendered = code_block_template.render(
                    text="\n".join(code_list), code_css_class=" ".join(code_css_class)
                )
                pre_template_rendered = pre_template.render(
                    code_block=code_block_template_rendered
                )
                out_paragraphs.append(pre_template_rendered)
                current_pos = _tmp_current_pos - 1

            elif paragraph["type"] == "BQ":
                bq_template = jinja_env.from_string("\n\n    {{ text }}\n\n    ")
                bq_template_rendered = bq_template.render(text=text_formater.get_text())
                logger.trace(bq_template_rendered)
                out_paragraphs.append(bq_template_rendered)

            elif paragraph["type"] == "PQ":
                pq_template = jinja_env.from_string("\n\n    {{ text }}\n\n    ")
                pq_template_rendered = pq_template.render(text=text_formater.get_text())
                logger.trace(pq_template_rendered)
                out_paragraphs.append(pq_template_rendered)

            elif paragraph["type"] == "MIXTAPE_EMBED":
                # TODO: redirect all Medium embeding articles to Fredium
                mixtape_embed_html = (
                    "\n\n    {{ embed_title }}\n\n    {{ embed_description }}"
                    "\n\n    {{ embed_site }}\n\n    "
                )
                embed_template = jinja_env.from_string(mixtape_embed_html)

                if paragraph.get("mixtapeMetadata") is not None:
                    url = paragraph["mixtapeMetadata"]["href"]
                else:
                    logger.warning(
                        "Ignore MIXTAPE_EMBED paragraph type, since we can't get url"
                    )
                    current_pos += 1
                    continue

                text_raw = paragraph["text"]
                # Title and description are sliced using markups[1] and
                # markups[2]; exactly three markups are required.
                if len(paragraph["markups"]) != 3:
                    logger.warning(
                        "Ignore MIXTAPE_EMBED paragraph type, since we can't split text"
                    )
                    current_pos += 1
                    continue

                title_range = paragraph["markups"][1]
                description_range = paragraph["markups"][2]
                logger.trace(f"{title_range=}")
                logger.trace(f"{description_range=}")

                embed_title = text_raw[title_range["start"] : title_range["end"]]
                embed_description = text_raw[
                    description_range["start"] : description_range["end"]
                ]
                logger.trace(f"{embed_title=}")
                logger.trace(f"{embed_description=}")

                try:
                    embed_site = tld.get_fld(url)
                except Exception as ex:
                    logger.warning(
                        f"Can't get embed site fld: {ex}. Using custom logic..."
                    )
                    parsed_url = urllib.parse.urlparse(url)
                    embed_site = parsed_url.hostname
                logger.trace(f"{embed_site=}")

                embed_template_rendered = embed_template.render(
                    paragraph=paragraph,
                    url=url,
                    embed_title=embed_title,
                    embed_description=embed_description,
                    embed_site=embed_site,
                )
                out_paragraphs.append(embed_template_rendered)

            elif paragraph["type"] == "IFRAME":
                logger.debug("Processing IFRAME paragraph")

                # `or {}` also covers keys that are present but set to None,
                # which a plain .get(key, {}) default does not.
                iframe_meta = paragraph.get("iframe") or {}
                # First check if we have direct mediaResource in the iframe
                media_resource = iframe_meta.get("mediaResource") or {}

                # If mediaResource is just a reference, look it up in post_data
                media_resource_ref = media_resource.get("__ref")
                if (
                    media_resource_ref
                    and not media_resource.get("id")
                    and not media_resource.get("iframeSrc")
                ):
                    logger.debug(f"Found media resource reference: {media_resource_ref}")
                    data_payload = post_data.get("data", {})
                    if media_resource_ref in data_payload:
                        media_resource = data_payload[media_resource_ref]
                        logger.debug(f"Found media resource for ref: {media_resource_ref}")
                    else:
                        logger.warning(
                            f"Could not find media resource for ref: {media_resource_ref}"
                        )

                # Get iframe source from mediaResource
                iframe_src_val = media_resource.get("iframeSrc")
                iframe_id = media_resource.get("id")

                # Determine the source URL for the iframe
                src = iframe_src_val
                if not src and iframe_id:
                    logger.debug(f"Using fallback iframe URL with ID: {iframe_id}")
                    src = f"{self.host_address}/render_iframe/{iframe_id}"

                if not src:
                    logger.warning("No iframe source found, skipping iframe")
                    current_pos += 1
                    continue

                # Get iframe dimensions; fall back to the values stored
                # directly on paragraph.iframe when mediaResource has none.
                iframe_width = media_resource.get("iframeWidth")
                iframe_height = media_resource.get("iframeHeight")
                if not iframe_width and iframe_meta.get("iframeWidth"):
                    iframe_width = iframe_meta["iframeWidth"]
                if not iframe_height and iframe_meta.get("iframeHeight"):
                    iframe_height = iframe_meta["iframeHeight"]

                logger.debug(f"Iframe dimensions: {iframe_width}x{iframe_height}")

                # Render with aspect ratio if we have valid dimensions
                if iframe_width and iframe_height and iframe_width > 0:
                    ratio = (iframe_height / iframe_width) * 100
                    iframe_template = jinja_env.from_string("\n    ")
                    iframe_template_rendered = iframe_template.render(
                        src=src,
                        ratio=f"{ratio:.4f}",
                        iframe_width=iframe_width or "100%",
                        iframe_height=iframe_height or "100%",
                    )
                    out_paragraphs.append(iframe_template_rendered)
                else:
                    # Fallback to responsive iframe without aspect ratio
                    iframe_template = jinja_env.from_string("\n    ")
                    iframe_template_rendered = iframe_template.render(
                        src=src,
                        iframe_width=iframe_width or "100%",
                        iframe_height=iframe_height or "100%",
                    )
                    out_paragraphs.append(iframe_template_rendered)

            else:
                logger.error(f"Unknown {paragraph['type']}: {paragraph}")

            current_pos += 1

        return out_paragraphs, title, subtitle

    async def render_as_html(self, post_id: str):
        """Query the post identified by *post_id* and render it as HTML.

        Exceptions from querying or rendering propagate unchanged.
        """
        post_data = await self.query(post_id)
        return await self._render_as_html(post_data, post_id)

    async def generate_metadata(
        self, post_data: dict, post_id: str, as_dict: bool = False
    ) -> tuple | dict[str, str]:
        """Extract display metadata from ``post_data["data"]["post"]``.

        Returns a dict (including ``post_id``) when *as_dict* is true,
        otherwise a 12-tuple in the order: title, subtitle, description, url,
        creator, collection, reading_time, free_access, updated_at,
        first_published_at, preview_image_id, tags.
        """
        post = post_data["data"]["post"]

        title = RLStringHelper(post["title"], ["minimal"]).get_text()
        subtitle = RLStringHelper(post["previewContent"]["subtitle"]).get_text()
        description = RLStringHelper(
            textwrap.shorten(subtitle, width=100, placeholder="...")
        ).get_text()
        preview_image_id = post["previewImage"]["id"]
        creator = post["creator"]
        collection = post["collection"]
        url = post["mediumUrl"]

        reading_time = math.ceil(post["readingTime"])
        free_access = "No" if post["isLocked"] else "Yes"
        updated_at = convert_datetime_to_human_readable(post["updatedAt"])
        first_published_at = convert_datetime_to_human_readable(
            post["firstPublishedAt"]
        )
        tags = post["tags"]

        if as_dict:
            return {
                "post_id": post_id,
                "title": title,
                "subtitle": subtitle,
                "description": description,
                "url": url,
                "creator": creator,
                "collection": collection,
                "reading_time": reading_time,
                "free_access": free_access,
                "updated_at": updated_at,
                "first_published_at": first_published_at,
                "preview_image_id": preview_image_id,
                "tags": tags,
            }

        return (
            title,
            subtitle,
            description,
            url,
            creator,
            collection,
            reading_time,
            free_access,
            updated_at,
            first_published_at,
            preview_image_id,
            tags,
        )

    async def _render_as_html(self, post_data: dict, post_id: str) -> "HtmlResult":
        """Render *post_data* into an ``HtmlResult`` (page title, description, url, body)."""
        post = post_data["data"]["post"]

        # Metadata extraction is scheduled as a task while the body is parsed
        # in a worker thread via asyncify.
        metadata_task = asyncio.create_task(self.generate_metadata(post_data, post_id))

        # NOTE(review): the parser may return an untruncated title/subtitle,
        # but those values have always been overwritten by the metadata ones
        # below. The underscore names make that explicit; confirm whether the
        # parsed values were meant to win.
        content, _parsed_title, _parsed_subtitle = await asyncify(
            self._parse_and_render_content_html_post
        )(
            post["content"],
            post["title"],
            post["previewContent"]["subtitle"],
            post["previewImage"]["id"],
            post["highlights"],
            post["tags"],
            post_data,
        )

        (
            title,
            subtitle,
            description,
            url,
            creator,
            collection,
            reading_time,
            free_access,
            updated_at,
            first_published_at,
            preview_image_id,
            tags,
        ) = await metadata_task

        post_page_title_raw = "{{ title }} | by {{ creator.name }}"
        if collection:
            post_page_title_raw += " | in {{ collection.name }}"
        post_page_title = jinja_env.from_string(post_page_title_raw)
        post_page_title_rendered = post_page_title.render(
            title=title, creator=creator, collection=collection
        )

        post_context = {
            "subtitle": subtitle,
            "title": title,
            "url": url,
            "creator": creator,
            "collection": collection,
            "readingTime": reading_time,
            "freeAccess": free_access,
            "updatedAt": updated_at,
            "firstPublishedAt": first_published_at,
            "previewImageId": preview_image_id,
            "content": content,
            "tags": tags,
        }
        post_template_rendered = self.post_template.render(post_context)

        return HtmlResult(
            post_page_title_rendered, description, url, post_template_rendered
        )

    async def render_as_markdown(self) -> str:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError(
            "Markdown rendering is not implemented. Please use HTML rendering instead"
        )