From e87ecbe882ced6f075b665f7032fe56361661393 Mon Sep 17 00:00:00 2001 From: ZhymabekRoman Date: Sat, 21 Sep 2024 20:41:38 +0500 Subject: [PATCH] medium parser: overlapse parser finnal fix --- web/.env_template => .env_template | 0 database-lib/database_lib/main.py | 4 +- medium-parser/medium_parser/__init__.py | 1 + medium-parser/medium_parser/api.py | 23 +- medium-parser/medium_parser/core.py | 2 +- medium-parser/medium_parser/markups.py | 8 +- medium-parser/tests/example_test.py | 9 +- .../rl_string_helper/string_helper.py | 484 +++++++++++------- .../tests/test_rl_string_helper.py | 195 ++++--- test_lab/pr_brAAAAAAAAAAAAA.py | 59 +++ 10 files changed, 521 insertions(+), 264 deletions(-) rename web/.env_template => .env_template (100%) create mode 100644 test_lab/pr_brAAAAAAAAAAAAA.py diff --git a/web/.env_template b/.env_template similarity index 100% rename from web/.env_template rename to .env_template diff --git a/database-lib/database_lib/main.py b/database-lib/database_lib/main.py index 7241a1b..99b9b86 100644 --- a/database-lib/database_lib/main.py +++ b/database-lib/database_lib/main.py @@ -45,7 +45,9 @@ class CacheResponse: def __init__(self, key: str, data: Union[CacheData, str]): self.key: str = key - self.data: CacheData = CacheData(data) if isinstance(data, str) else data + self.data: CacheData = ( + CacheData(data) if not isinstance(data, CacheData) else data + ) def json(self): return self.data.json() diff --git a/medium-parser/medium_parser/__init__.py b/medium-parser/medium_parser/__init__.py index a2b9f04..129d452 100644 --- a/medium-parser/medium_parser/__init__.py +++ b/medium-parser/medium_parser/__init__.py @@ -5,4 +5,5 @@ from medium_parser import exceptions as exceptions from medium_parser import exceptions as medium_parser_exceptions retry_options = ExponentialRetry(attempts=3) +jinja_env_debug = jinja2.Environment(undefined=jinja2.DebugUndefined) jinja_env = jinja2.Environment() diff --git a/medium-parser/medium_parser/api.py b/medium-parser/medium_parser/api.py index 4149c00..a88c0c0 100644 --- a/medium-parser/medium_parser/api.py +++ b/medium-parser/medium_parser/api.py @@ -15,7 +15,12 @@ from medium_parser.utils import generate_random_sha256_hash class MediumApi: __slots__ = ("auth_cookies", "proxy_list", "timeout") - def __init__(self, auth_cookies: Optional[str] = None, proxy_list: Optional[List[str]] = None, timeout: int = 3): + def __init__( + self, + auth_cookies: Optional[str] = None, + proxy_list: Optional[List[str]] = None, + timeout: int = 3, + ): self.auth_cookies = auth_cookies self.proxy_list = proxy_list self.timeout = timeout @@ -50,7 +55,7 @@ class MediumApi: "Connection": "Keep-Alive", } - if self.auth_cookies: + if self.auth_cookies is not None: headers["Cookie"] = self.auth_cookies graphql_data = { @@ -68,7 +73,11 @@ class MediumApi: logger.debug(f"Request started...") async with aiohttp.ClientSession(connector=connector) as session: - async with RetryClient(client_session=session, raise_for_status=False, retry_options=retry_options) as retry_client: + async with RetryClient( + client_session=session, + raise_for_status=False, + retry_options=retry_options, + ) as retry_client: async with retry_client.post( "https://medium.com/_/graphql", headers=headers, @@ -76,7 +85,9 @@ class MediumApi: timeout=self.timeout, ) as request: if request.status != 200: - logger.error(f"Failed to fetch post by ID {post_id} with status code: {request.status}") + logger.error( + f"Failed to fetch post by ID {post_id} with status code: {request.status}" + ) return None try: @@ -89,7 +100,9 @@ class MediumApi: logger.debug(f"Request finished...") if exception: - logger.error(f"Exception occured while fetching post {post_id}, so let's just fuck it up") + logger.error( + f"Exception occured while fetching post {post_id}, so let's just fuck it up" + ) raise exception return response_data diff --git a/medium-parser/medium_parser/core.py b/medium-parser/medium_parser/core.py index 35fffba..ed12b83 100644 --- a/medium-parser/medium_parser/core.py +++ b/medium-parser/medium_parser/core.py @@ -101,7 +101,7 @@ class MediumParser: async def _get_from_cache(): logger.debug("Using cache backend") post_data = self.cache.pull(post_id) - if post_data and post_data.data.has_data(): + if post_data: logger.debug("post query was found on cache") parsed_data = post_data.json() if parsed_data: diff --git a/medium-parser/medium_parser/markups.py b/medium-parser/medium_parser/markups.py index 7aac76d..1b1a0cc 100644 --- a/medium-parser/medium_parser/markups.py +++ b/medium-parser/medium_parser/markups.py @@ -1,4 +1,4 @@ -from medium_parser import jinja_env +from medium_parser import jinja_env_debug def raw_render(**kwargs): @@ -15,7 +15,7 @@ def parse_markups(markups: list[str]): for markup in markups: if markup["type"] == "A": if markup["anchorType"] == "LINK": - template = jinja_env.from_string( + template = jinja_env_debug.from_string( '{{text}}' ) template = template.render( @@ -26,7 +26,7 @@ def parse_markups(markups: list[str]): ) ) elif markup["anchorType"] == "USER": - template = jinja_env.from_string( + template = jinja_env_debug.from_string( '{{text}}' ) template = template.render(userId=markup["userId"]) @@ -43,7 +43,7 @@ def parse_markups(markups: list[str]): else: continue - template = jinja_env.from_string(template) + template = jinja_env_debug.from_string(template) markup["template"] = template markups_out.append(markup) diff --git a/medium-parser/tests/example_test.py b/medium-parser/tests/example_test.py index 38d305e..be8ccb4 100644 --- a/medium-parser/tests/example_test.py +++ b/medium-parser/tests/example_test.py @@ -4,6 +4,7 @@ import sys import jinja2 from loguru import logger +from medium_parser.api import MediumApi from medium_parser.core import MediumParser from database_lib import SQLiteCacheBackend @@ -11,6 +12,7 @@ jinja2_env = jinja2.Environment( loader=jinja2.FileSystemLoader("./"), ) + async def safe_main(): try: await main() @@ -19,6 +21,7 @@ async def safe_main(): async def main(): + medium_api = MediumApi(timeout=8) logger.remove() logger.add(sys.stderr, level="INFO") # logger.add(sys.stderr, level="TRACE") @@ -27,13 +30,13 @@ async def main(): # dl = await MediumParser.from_url("") sqlite = SQLiteCacheBackend("test_db.sqlite") sqlite.init_db() - dl = MediumParser("ef85d8e72883", sqlite, 8, "localhost") - query_result = await dl.query(use_cache=False) + dl = MediumParser(sqlite, medium_api, 8, "localhost") + query_result = await dl.query("cd842ce3f0a3", use_cache=False) with open("query_result.json", "w") as f: json.dump(query_result, f, indent=2) - result = await dl.render_as_html() + result = await dl.render_as_html("cd842ce3f0a3") with open("medium.html", "w") as f: template = jinja2_env.get_template("example_base_template.html") diff --git a/rl_string_helper/rl_string_helper/string_helper.py b/rl_string_helper/rl_string_helper/string_helper.py index 1a8960f..fed3919 100644 --- a/rl_string_helper/rl_string_helper/string_helper.py +++ b/rl_string_helper/rl_string_helper/string_helper.py @@ -9,7 +9,6 @@ from rl_string_helper.mixins.string_assignment import ( jinja_env = Environment(undefined=DebugUndefined) -# TODO: more clarified description """ In JavaScript, the `length` property of a String object returns the number of code units (bytes) in the string, which makes use of UTF-16 encoding. In UTF-16, each Unicode character may be encoded as one or two code units (byte). This means that for certain scripts, such as emojis, mathematical symbols, or some Chinese characters, @@ -20,57 +19,70 @@ More info to read: https://habr.com/ru/articles/769256/ """ -# TODO: doc! Who will read this noodles lol? -# TODO: check cases when UTF-16 character can be more that 2 bytes -class RLStringHelper: - __slots__ = ( - "string", - "templates", - "replaces", - "quote_html_type", - "quote_replaces", - "_default_bang_char", - ) - - def __init__( - self, - string: str, - quote_html_type: list[str] = ["full"], - _default_bang_char: str = "R", - ): - self.string: str = quote_symbol(string) - self.templates: list[tuple[tuple[int, int], Template]] = [] - self.quote_replaces: list[tuple[tuple[int, int], str]] = [] - self.replaces: list[tuple[tuple[int, int], str]] = [] - self.quote_html_type = quote_html_type - self._default_bang_char = _default_bang_char +class UTF16Handler: + def __init__(self, default_bang_char: str = "R"): + logger.info( + f"Initializing UTF16Handler with default_bang_char: {default_bang_char}" + ) + self._default_bang_char = default_bang_char def pre_utf_16_bang( self, string: str, string_pos_matrix: list ) -> tuple[str, list, list[tuple[int, int, int]]]: + logger.info("Starting pre_utf_16_bang method") utf_16_bang_list: list[tuple[int, int, int]] = [] string_len_utf_16 = len(string.encode("utf-16-le")) // 2 + logger.debug(f"UTF-16 length of string: {string_len_utf_16}") if string_len_utf_16 == len(string): logger.trace("String doesn't contain multibyte characters") return string, string_pos_matrix, utf_16_bang_list i = 0 while len(string) - 1 > i: + logger.debug(f"Processing character at index {i}") new_i = string_pos_matrix[i] char = string[new_i] char_len = len(char.encode("utf-16-le")) // 2 if char_len == 2: + logger.debug(f"Multibyte character found at index {i}") char_len_dif = char_len - 1 char_present = self._default_bang_char * char_len_dif string, string_pos_matrix = self._paste_char( string, string_pos_matrix, new_i + 1, char_present ) + logger.info(f"Mutation: Inserted '{char_present}' at index {new_i + 1}") i += 1 utf_16_bang_list.append((i, char_len_dif, i)) i += 1 + logger.info("Finished pre_utf_16_bang method") return string, string_pos_matrix, utf_16_bang_list + def post_utf_16_bang( + self, + string: StringAssignmentMixin, + string_pos_matrix: list, + utf_16_bang_list: list, + ): + logger.info("Starting post_utf_16_bang method") + string = StringAssignmentMixin(str(string)) + post_transbang = 0 + for bang_pos, char_len, old_pos in utf_16_bang_list: + logger.debug(f"Processing bang at position {bang_pos}") + string, string_pos_matrix = self._delete_char( + string, + string_pos_matrix, + bang_pos - post_transbang, + char_len, + old_pos - post_transbang, + ) + logger.info( + f"Mutation: Deleted {char_len} character(s) at index {bang_pos - post_transbang}" + ) + post_transbang += char_len + logger.info("Finished post_utf_16_bang method") + return string, string_pos_matrix + def _paste_char( self, string: StringAssignmentMixin, @@ -78,11 +90,13 @@ class RLStringHelper: pos: int, char: str, ) -> tuple[StringAssignmentMixin, list]: + logger.debug(f"Pasting character '{char}' at position {pos}") char_len = len(char) string_pos_matrix.insert(pos, string_pos_matrix[pos]) for matrix_i in range(pos + 1, len(string_pos_matrix)): string_pos_matrix[matrix_i] += char_len string.insert(pos, char) + logger.info(f"Mutation: Inserted '{char}' at position {pos}") return string, string_pos_matrix def _delete_char( @@ -93,8 +107,11 @@ class RLStringHelper: char_len: int, old_pos: int, ): + logger.debug(f"Deleting character at position {pos}") + deleted_char = string[pos : pos + char_len] string.pop(pos) string_pos_matrix.pop(old_pos) + logger.info(f"Mutation: Deleted '{deleted_char}' at position {pos}") for matrix_i in range(pos, len(string_pos_matrix)): if isinstance(string_pos_matrix[matrix_i], int): string_pos_matrix[matrix_i] -= char_len @@ -105,74 +122,35 @@ class RLStringHelper: ) return string, string_pos_matrix - def post_utf_16_bang( + +class TemplateRenderer: + def render_templates( self, - string: StringAssignmentMixin, + string: str, string_pos_matrix: list, utf_16_bang_list: list, + templates: list, ): - string = StringAssignmentMixin(str(string)) - post_transbang = 0 - for bang_pos, char_len, old_pos in utf_16_bang_list: - string, string_pos_matrix = self._delete_char( - string, - string_pos_matrix, - bang_pos - post_transbang, - char_len, - old_pos - post_transbang, - ) - post_transbang += char_len - return string, string_pos_matrix - - def set_template(self, start: int, end: int, template: str | Template): - if not isinstance(template, Template): - template = jinja_env.from_string(template) - self.templates.append(((start, end), template)) - - def set_replace(self, start: int, end: int, replace_with: str): - self.replaces.append(((start, end), replace_with)) - - def _render_templates( - self, string: str, string_pos_matrix: list, utf_16_bang_list: list - ): - if not self.templates: + logger.info("Starting render_templates method") + if not templates: + logger.info("No templates to render") return string, string_pos_matrix, utf_16_bang_list - templates = reversed(self.templates) + templates = reversed(templates) updated_text = string - def _get_prefix_len(template_raw: Template, inner_char: str = "{"): - template = template_raw.render() - return template.find(inner_char) - - def _get_suffix_len(template_raw: Template, outer_char: str = "}"): - template = template_raw.render() - return len(template) - template.rfind(outer_char) - 1 - - def update_nested_positions(start, end, prefix_len, suffix_len): - for i in range(end, len(string_pos_matrix)): - string_pos_matrix[i] += suffix_len + prefix_len - for i in range(start, end): - string_pos_matrix[i] += prefix_len - for n in range(len(utf_16_bang_list)): - utf_16_bang = utf_16_bang_list[n] - if utf_16_bang[2] > end: - utf_16_bang_list[n] = ( - utf_16_bang[0] + prefix_len + suffix_len, - utf_16_bang[1], - utf_16_bang[2], - ) - elif utf_16_bang[2] > start: - utf_16_bang_list[n] = ( - utf_16_bang[0] + prefix_len, - utf_16_bang[1], - utf_16_bang[2], - ) - for (start, end), template in templates: - if start >= len(string_pos_matrix) or end - 1 >= len(string_pos_matrix): + logger.debug(f"Rendering template for range {start}:{end}") + if start >= len(string_pos_matrix): + logger.warning("Template start range out of bounds, skipping") continue + if end - 1 >= len(string_pos_matrix): + logger.warning( + "Template end range out of bounds, fixing end position..." + ) + end = len(string_pos_matrix) if start == end: + logger.warning("Empty template range, skipping") continue new_start, new_end = ( @@ -180,71 +158,85 @@ class RLStringHelper: string_pos_matrix[end - 1] + 1, ) if new_end < new_start: + logger.warning("Invalid template range, skipping") continue context_text = template.render(text=updated_text[new_start:new_end]) updated_text_template = jinja_env.from_string( "{{ updated_text[:new_start] }}{{ context_text }}{{updated_text[new_end:]}}" ) + old_text = updated_text[new_start:new_end] updated_text = updated_text_template.render( updated_text=updated_text, context_text=context_text, new_start=new_start, new_end=new_end, ) + logger.info( + f"Mutation: Replaced '{old_text}' with '{context_text}' in range {new_start}:{new_end}" + ) - prefix_len = _get_prefix_len(template) - suffix_len = _get_suffix_len(template) - update_nested_positions(start, end, prefix_len, suffix_len) + prefix_len = self._get_prefix_len(template) + suffix_len = self._get_suffix_len(template) + self._update_nested_positions( + string_pos_matrix, utf_16_bang_list, start, end, prefix_len, suffix_len + ) + logger.info("Finished render_templates method") return updated_text, string_pos_matrix, utf_16_bang_list - def _render_replaces( + def _get_prefix_len(self, template_raw: Template, inner_char: str = "{"): + logger.debug("Calculating prefix length") + template = template_raw.render() + return template.find(inner_char) + + def _get_suffix_len(self, template_raw: Template, outer_char: str = "}"): + logger.debug("Calculating suffix length") + template = template_raw.render() + return len(template) - template.rfind(outer_char) - 1 + + def _update_nested_positions( + self, string_pos_matrix, utf_16_bang_list, start, end, prefix_len, suffix_len + ): + logger.debug(f"Updating nested positions for range {start}:{end}") + for i in range(end, len(string_pos_matrix)): + string_pos_matrix[i] += suffix_len + prefix_len + for i in range(start, end): + string_pos_matrix[i] += prefix_len + for n in range(len(utf_16_bang_list)): + utf_16_bang = utf_16_bang_list[n] + if utf_16_bang[2] > end: + utf_16_bang_list[n] = ( + utf_16_bang[0] + prefix_len + suffix_len, + utf_16_bang[1], + utf_16_bang[2], + ) + elif utf_16_bang[2] > start: + utf_16_bang_list[n] = ( + utf_16_bang[0] + prefix_len, + utf_16_bang[1], + utf_16_bang[2], + ) + logger.info(f"Mutation: Updated positions for template in range {start}:{end}") + + +class StringReplacer: + def render_replaces( self, string: StringAssignmentMixin, string_pos_matrix: list, utf_16_bang_list: list, + replaces: list, ): - if not self.replaces and not self.quote_replaces: + logger.info("Starting render_replaces method") + if not replaces: + logger.info("No replacements to perform") return string, string_pos_matrix, utf_16_bang_list string = StringAssignmentMixin(str(string)) - replaces = self.replaces + self.quote_replaces - - def update_positions( - start: int, end: int, replace_len: int, new_start: int, new_end: int - ): - pos_len_diff = replace_len - (end - start) - for pos_index in range(end, len(string_pos_matrix)): - if isinstance(string_pos_matrix[pos_index], int): - string_pos_matrix[pos_index] += pos_len_diff - elif isinstance(string_pos_matrix[pos_index], tuple): - string_pos_matrix[pos_index] = ( - string_pos_matrix[pos_index][0] + pos_len_diff, - string_pos_matrix[pos_index][1] + pos_len_diff, - ) - if pos_len_diff != 0: - for i in range(start, end): - if isinstance(string_pos_matrix[i], int): - string_pos_matrix[i] = ( - string_pos_matrix[i], - string_pos_matrix[i] + replace_len, - ) - elif isinstance(string_pos_matrix[i], tuple): - string_pos_matrix[i] = ( - string_pos_matrix[i][0] + replace_len, - string_pos_matrix[i][1] + replace_len, - ) - for n in range(len(utf_16_bang_list)): - utf_16_bang = utf_16_bang_list[n] - if utf_16_bang[0] > end: - utf_16_bang_list[n] = ( - utf_16_bang[0] + pos_len_diff, - utf_16_bang[1], - utf_16_bang[2], - ) for (start, end), replace_with in replaces: + logger.debug(f"Performing replacement for range {start}:{end}") new_start, new_end = string_pos_matrix[start], string_pos_matrix[end - 1] if isinstance(new_end, int): new_end += 1 @@ -255,91 +247,231 @@ class RLStringHelper: ) new_end = max(new_end) if isinstance(new_end, tuple) else new_end + old_text = string[new_start:new_end] string[new_start:new_end] = replace_with - update_positions(start, end, len(replace_with), new_start, new_end) + logger.info( + f"Mutation: Replaced '{old_text}' with '{replace_with}' in range {new_start}:{new_end}" + ) + self._update_positions( + string_pos_matrix, + utf_16_bang_list, + start, + end, + len(replace_with), + new_start, + new_end, + ) + logger.info("Finished render_replaces method") return string, string_pos_matrix, utf_16_bang_list + def _update_positions( + self, + string_pos_matrix, + utf_16_bang_list, + start, + end, + replace_len, + new_start, + new_end, + ): + logger.debug(f"Updating positions for replacement in range {start}:{end}") + pos_len_diff = replace_len - (end - start) + for pos_index in range(end, len(string_pos_matrix)): + if isinstance(string_pos_matrix[pos_index], int): + string_pos_matrix[pos_index] += pos_len_diff + elif isinstance(string_pos_matrix[pos_index], tuple): + string_pos_matrix[pos_index] = ( + string_pos_matrix[pos_index][0] + pos_len_diff, + string_pos_matrix[pos_index][1] + pos_len_diff, + ) + if pos_len_diff != 0: + for i in range(start, end): + if isinstance(string_pos_matrix[i], int): + string_pos_matrix[i] = ( + string_pos_matrix[i], + string_pos_matrix[i] + replace_len, + ) + elif isinstance(string_pos_matrix[i], tuple): + string_pos_matrix[i] = ( + string_pos_matrix[i][0] + replace_len, + string_pos_matrix[i][1] + replace_len, + ) + for n in range(len(utf_16_bang_list)): + utf_16_bang = utf_16_bang_list[n] + if utf_16_bang[0] > end: + utf_16_bang_list[n] = ( + utf_16_bang[0] + pos_len_diff, + utf_16_bang[1], + utf_16_bang[2], + ) + logger.info( + f"Mutation: Updated positions for replacement in range {start}:{end}" + ) + + +class RLStringHelper: + def __init__( + self, + string: str, + quote_html_type: list[str] = ["full"], + _default_bang_char: str = "R", + ): + logger.info("Initializing RLStringHelper") + self.string: str = quote_symbol(string) + self.templates: list[tuple[tuple[int, int], Template]] = [] + self.quote_replaces: list[tuple[tuple[int, int], str]] = [] + self.replaces: list[tuple[tuple[int, int], str]] = [] + self.quote_html_type = quote_html_type + self.utf16_handler = UTF16Handler(_default_bang_char) + self.template_renderer = TemplateRenderer() + self.string_replacer = StringReplacer() + + def set_template(self, start: int, end: int, template: str | Template): + logger.info(f"Setting template for range {start}:{end}") + if not isinstance(template, Template): + template = jinja_env.from_string(template) + self.templates.append(((start, end), template)) + logger.info(f"Mutation: Added template for range {start}:{end}") + + def set_replace(self, start: int, end: int, replace_with: str): + logger.info(f"Setting replacement for range {start}:{end}") + self.replaces.append(((start, end), replace_with)) + logger.info( + f"Mutation: Added replacement '{replace_with}' for range {start}:{end}" + ) + def __str__(self): + logger.info("Converting RLStringHelper to string") string = StringAssignmentMixin(self.string) string_pos_matrix = list(range(len(string))) - updated_text, string_pos_matrix, utf_16_bang_list = self.pre_utf_16_bang( - string, string_pos_matrix + updated_text, string_pos_matrix, utf_16_bang_list = ( + self.utf16_handler.pre_utf_16_bang(string, string_pos_matrix) ) if self.quote_html_type: + logger.info("Applying HTML quoting") self.quote_replaces = list( quote_html(str(updated_text), self.quote_html_type) ) + logger.info( + f"Mutation: Added {len(self.quote_replaces)} HTML quote replacements" + ) if not self.templates and not self.replaces and not self.quote_replaces: + logger.info("No modifications needed, returning original string") return self.string - updated_text, string_pos_matrix, utf_16_bang_list = self._render_templates( - updated_text, string_pos_matrix, utf_16_bang_list - ) - updated_text, string_pos_matrix, utf_16_bang_list = self._render_replaces( - updated_text, string_pos_matrix, utf_16_bang_list - ) - updated_text, string_pos_matrix = self.post_utf_16_bang( + updated_text, string_pos_matrix, utf_16_bang_list = ( + self.template_renderer.render_templates( + updated_text, string_pos_matrix, utf_16_bang_list, self.templates + ) + ) + updated_text, string_pos_matrix, utf_16_bang_list = ( + self.string_replacer.render_replaces( + updated_text, + string_pos_matrix, + utf_16_bang_list, + self.replaces + self.quote_replaces, + ) + ) + updated_text, string_pos_matrix = self.utf16_handler.post_utf_16_bang( updated_text, string_pos_matrix, utf_16_bang_list ) + logger.info("Finished string conversion") return str(updated_text) def get_text(self): + logger.info("Getting text from RLStringHelper") return self.__str__() -def split_overlapping_ranges(markups, _retry_count: int = 7): - for _ in range(len(markups) * _retry_count): - new_markups = split_overlapping_range_position(markups) - if len(new_markups) == len(markups): - break - markups = new_markups - return markups +def split_overlapping_ranges(markups): + logger.info("Starting split_overlapping_ranges") + new_markups = process_and_optimize_intervals( + *[ + Interval(markup["start"], markup["end"], markup["type"], markup["template"]) + for markup in markups + ] + ) + dict_new_markups = [markup.to_dict() for markup in new_markups] + return dict_new_markups -def split_overlapping_range_position(positions): - if not positions: - return [] +class Interval: + def __init__(self, start, end, type, template=None): + self.start = start + self.end = end + self.type = type + self.template = template - positions.sort(key=lambda x: x["start"]) - result = [positions[0]] + def __repr__(self): + return f"start={self.start}, end={self.end}, type={self.type}, template={self.template}" - for pos in positions[1:]: - last = result[-1] - if not pos["start"] < last["end"]: - result.append(pos.copy()) - continue + def to_dict(self): + return { + "start": self.start, + "end": self.end, + "type": self.type, + "template": self.template, + } - if pos["type"] != last["type"]: - if pos["end"] <= last["end"]: - result[-1] = { - "start": last["start"], - "end": pos["start"], - "type": last["type"], - "template": last["template"], - } - result.append(pos.copy()) - if pos["end"] < last["end"]: - result.append( - { - "start": pos["end"], - "end": last["end"], - "type": last["type"], - "template": last["template"], - } - ) - else: - result[-1] = { - "start": last["start"], - "end": pos["start"], - "type": last["type"], - "template": last["template"], - } - result.append(pos.copy()) - else: - result[-1]["end"] = max(last["end"], pos["end"]) + +def split_intervals_with_types(*intervals): + points = set() + for interval in intervals: + points.add(interval.start) + points.add(interval.end) + + sorted_points = sorted(points) + result = [] + + for i in range(len(sorted_points) - 1): + start = sorted_points[i] + end = sorted_points[i + 1] + types_and_templates = [ + (interval.type, interval.template) + for interval in intervals + if interval.start <= start and interval.end >= end + ] + result.append( + Interval( + start, + end, + [t[0] for t in types_and_templates], + [t[1] for t in types_and_templates], + ) + ) return result + + +def convert_to_object_string(input_data): + result = [] + + for item in input_data: + start, end = item.start, item.end + for type_, template in zip(item.type, item.template): + result.append(Interval(start, end, type_, template)) + + return result + + +def process_and_optimize_intervals(*intervals): + split = split_intervals_with_types(*intervals) + return convert_to_object_string(split) + + +# intervals = [ +# Interval(4, 17, "bold", "{{text}}"), +# Interval(10, 21, "italic", "{{text}}"), +# Interval(18, 27, "underline", "{{text}}"), +# Interval(33, 41, "code", "{{text}}"), +# Interval(40, 46, "link", '{{text}}'), +# Interval(0, 46, "span", "{{text}}"), +# ] + +# intervals_result = +# for interval in intervals_result: +# print(interval) diff --git a/rl_string_helper/tests/test_rl_string_helper.py b/rl_string_helper/tests/test_rl_string_helper.py index 5685f54..e56adca 100644 --- a/rl_string_helper/tests/test_rl_string_helper.py +++ b/rl_string_helper/tests/test_rl_string_helper.py @@ -73,62 +73,112 @@ class TestRLStringHelper: helper.set_template(0, 11, "{{text}}") assert str(helper) == "Hello world" - # def test_super_duper_overlapsing(self): - # # https://medium.com/google-cloud/implementing-semantic-caching-a-step-by-step-guide-to-faster-cost-effective-genai-workflows-ef85d8e72883#bypass - # text = "Note: The patterns and ideas discussed in this post are broadly applicable and can be adopted for other cloud providers." - # helper = RLStringHelper(text) - # markups = ( - # [ - # { - # "__typename": "Markup", - # "name": None, - # "type": "CODE", - # "start": 0, - # "end": 5, - # "href": None, - # "title": None, - # "rel": None, - # "anchorType": None, - # "userId": None, - # "creatorIds": None, - # }, - # { - # "__typename": "Markup", - # "name": None, - # "type": "STRONG", - # "start": 0, - # "end": 6, - # "href": None, - # "title": None, - # "rel": None, - # "anchorType": None, - # "userId": None, - # "creatorIds": None, - # }, - # { - # "__typename": "Markup", - # "name": None, - # "type": "EM", - # "start": 0, - # "end": 6, - # "href": None, - # "title": None, - # "rel": None, - # "anchorType": None, - # "userId": None, - # "creatorIds": None, - # }, - # ], - # ) - # parsed_markups = parse_markups(markups[0]) - # logger.debug(parsed_markups) - # parsed_markups = split_overlapping_ranges(parsed_markups) - # logger.debug(parsed_markups) - # for markup in parsed_markups: - # helper.set_template(markup["start"], markup["end"], markup["template"]) + def test_super_duper_overlapsing(self): + text = "Note: The patterns and ideas discussed in this post are broadly applicable." + helper = RLStringHelper(text) - # expected_pattern = r"]*>Note: The patterns and ideas discussed in this post are broadly applicable and can be adopted for other cloud providers\." - # assert re.match(expected_pattern, str(helper)) + markups = [ + {"start": 0, "end": 5, "type": "code", "template": "{{text}}"}, + { + "start": 0, + "end": 6, + "type": "strong", + "template": "{{text}}", + }, + {"start": 0, "end": 6, "type": "em", "template": "{{text}}"}, + ] + + parsed_markups = split_overlapping_ranges(markups) + for markup in parsed_markups: + helper.set_template(markup["start"], markup["end"], markup["template"]) + + expected_pattern = r"Note: The patterns and ideas discussed in this post are broadly applicable\." + result = str(helper) + assert re.match(expected_pattern, result) + + def test_complex_overlapping_tags(self): + text = "The quick (brown) fox jumps over 13 lazy dogs!" + helper = RLStringHelper(text) + + markups = [ + { + "start": 0, + "end": 46, + "type": "span", + "template": "{{text}}", + }, + {"start": 4, "end": 17, "type": "bold", "template": "{{text}}"}, + {"start": 10, "end": 21, "type": "italic", "template": "{{text}}"}, + { + "start": 18, + "end": 27, + "type": "underline", + "template": "{{text}}", + }, + { + "start": 33, + "end": 41, + "type": "code", + "template": "{{text}}", + }, + { + "start": 40, + "end": 46, + "type": "link", + "template": '{{text}}', + }, + ] + + parsed_markups = split_overlapping_ranges(markups) + for markup in parsed_markups: + helper.set_template(markup["start"], markup["end"], markup["template"]) + + expected_output = ( + "The quick (brown) fox jumps " + 'over 13 lazy dogs!' + ) + assert str(helper) == expected_output + + def test_nmultibyte_emoji(self): + from medium_parser.markups import parse_markups + + data = { + "__typename": "Paragraph", + "id": "236e7049b537_33", + "name": "ba8c", + "href": None, + "text": "Noah dragged his two printers out from Settings ⚙️ < Printers & Scanners \ud83d\udda8️ and dropped them in Dock or Desktop, I don’t remember — but you can drag to both the places.", + "iframe": None, + "layout": None, + "markups": [ + { + "__typename": "Markup", + "name": None, + "type": "CODE", + "start": 39, + "end": 76, + "href": None, + "title": None, + "rel": None, + "anchorType": None, + "userId": None, + "creatorIds": None, + } + ], + "metadata": None, + "mixtapeMetadata": None, + "type": "P", + "hasDropCap": None, + "dropCapImage": None, + "codeBlockMetadata": None, + } + helper = RLStringHelper(data["text"]) + parsed_markups = split_overlapping_ranges(parse_markups(data["markups"])) + for markup in parsed_markups: + helper.set_template(markup["start"], markup["end"], markup["template"]) + print(str(helper)) + + assert str(helper) == data["text"] def test_basic_replace(self): # Replace A to B - ONE to ONE char @@ -188,26 +238,23 @@ class TestRLStringHelper: helper = RLStringHelper(issue_text) assert helper.get_text() == issue_text - # def test_markup_parser(self): - # href_markup = { - # "__typename": 'Markup', - # "anchorType": 'LINK', - # "end": 12, - # "href": 'https://readwise.io/bookreview/{{book_id', - # "name": None, - # "rel": 'nofollow', - # "start": 0, - # "title": '', - # "type": 'A', - # "userId": None - # } + def test_markup_parser(self): + href_markup = { + "start": 0, + "end": 12, + "type": "a", + "template": '{{ text }}', + } - # helper = RLStringHelper("Hello world") - # markups = parse_markups([href_markup]) - # parsed_markups = split_overlapping_ranges(markups) - # for markup in parsed_markups: - # helper.set_template(markup["start"], markup["end"], markup["template"]) - # assert helper.get_text() == 'Hello world' + helper = RLStringHelper("Hello world") + parsed_markups = split_overlapping_ranges([href_markup]) + for markup in parsed_markups: + helper.set_template(markup["start"], markup["end"], markup["template"]) + + assert ( + helper.get_text() + == 'Hello world' + ) def test_medium_all(self): helper = RLStringHelper("ABC Hello world") diff --git a/test_lab/pr_brAAAAAAAAAAAAA.py b/test_lab/pr_brAAAAAAAAAAAAA.py new file mode 100644 index 0000000..7ed8e3f --- /dev/null +++ b/test_lab/pr_brAAAAAAAAAAAAA.py @@ -0,0 +1,59 @@ +from collections import defaultdict +import heapq +from icecream import ic +from loguru import logger + + +def _split_overlapping_ranges(positions): + logger.info("Starting improved split_overlapping_range_position") + if not positions: + logger.info("No positions to split") + return [] + + events = [] + for i, pos in enumerate(positions): + heapq.heappush(events, (pos["start"], 0, i)) # 0 for start event + heapq.heappush(events, (pos["end"], 1, i)) # 1 for end event + + active = set() + result = [] + last_point = None + open_ranges = defaultdict(list) + + while events: + point, event_type, index = heapq.heappop(events) + + if last_point is not None and point > last_point and active: + for act_index in active: + open_ranges[act_index].append( + { + "start": last_point, + "end": point, + "type": positions[act_index]["type"], + "template": positions[act_index]["template"], + } + ) + + if event_type == 0: # Start event + active.add(index) + else: # End event + active.remove(index) + if open_ranges[index]: + result.extend(open_ranges[index]) + del open_ranges[index] + + last_point = point + + # Sort the result based on the original order of positions + result.sort( + key=lambda x: next( + i + for i, pos in enumerate(positions) + if pos["type"] == x["type"] and pos["template"] == x["template"] + ) + ) + + logger.info( + f"Finished improved split_overlapping_range_position. Generated {len(result)} ranges." + ) + return result