diff --git a/web/.env_template b/.env_template
similarity index 100%
rename from web/.env_template
rename to .env_template
diff --git a/database-lib/database_lib/main.py b/database-lib/database_lib/main.py
index 7241a1b..99b9b86 100644
--- a/database-lib/database_lib/main.py
+++ b/database-lib/database_lib/main.py
@@ -45,7 +45,9 @@ class CacheResponse:
def __init__(self, key: str, data: Union[CacheData, str]):
self.key: str = key
- self.data: CacheData = CacheData(data) if isinstance(data, str) else data
+ self.data: CacheData = (
+ CacheData(data) if not isinstance(data, CacheData) else data
+ )
def json(self):
return self.data.json()
diff --git a/medium-parser/medium_parser/__init__.py b/medium-parser/medium_parser/__init__.py
index a2b9f04..129d452 100644
--- a/medium-parser/medium_parser/__init__.py
+++ b/medium-parser/medium_parser/__init__.py
@@ -5,4 +5,5 @@ from medium_parser import exceptions as exceptions
from medium_parser import exceptions as medium_parser_exceptions
retry_options = ExponentialRetry(attempts=3)
+jinja_env_debug = jinja2.Environment(undefined=jinja2.DebugUndefined)
jinja_env = jinja2.Environment()
diff --git a/medium-parser/medium_parser/api.py b/medium-parser/medium_parser/api.py
index 4149c00..a88c0c0 100644
--- a/medium-parser/medium_parser/api.py
+++ b/medium-parser/medium_parser/api.py
@@ -15,7 +15,12 @@ from medium_parser.utils import generate_random_sha256_hash
class MediumApi:
__slots__ = ("auth_cookies", "proxy_list", "timeout")
- def __init__(self, auth_cookies: Optional[str] = None, proxy_list: Optional[List[str]] = None, timeout: int = 3):
+ def __init__(
+ self,
+ auth_cookies: Optional[str] = None,
+ proxy_list: Optional[List[str]] = None,
+ timeout: int = 3,
+ ):
self.auth_cookies = auth_cookies
self.proxy_list = proxy_list
self.timeout = timeout
@@ -50,7 +55,7 @@ class MediumApi:
"Connection": "Keep-Alive",
}
- if self.auth_cookies:
+ if self.auth_cookies is not None:
headers["Cookie"] = self.auth_cookies
graphql_data = {
@@ -68,7 +73,11 @@ class MediumApi:
logger.debug(f"Request started...")
async with aiohttp.ClientSession(connector=connector) as session:
- async with RetryClient(client_session=session, raise_for_status=False, retry_options=retry_options) as retry_client:
+ async with RetryClient(
+ client_session=session,
+ raise_for_status=False,
+ retry_options=retry_options,
+ ) as retry_client:
async with retry_client.post(
"https://medium.com/_/graphql",
headers=headers,
@@ -76,7 +85,9 @@ class MediumApi:
timeout=self.timeout,
) as request:
if request.status != 200:
- logger.error(f"Failed to fetch post by ID {post_id} with status code: {request.status}")
+ logger.error(
+ f"Failed to fetch post by ID {post_id} with status code: {request.status}"
+ )
return None
try:
@@ -89,7 +100,9 @@ class MediumApi:
logger.debug(f"Request finished...")
if exception:
- logger.error(f"Exception occured while fetching post {post_id}, so let's just fuck it up")
+ logger.error(
+ f"Exception occured while fetching post {post_id}, so let's just fuck it up"
+ )
raise exception
return response_data
diff --git a/medium-parser/medium_parser/core.py b/medium-parser/medium_parser/core.py
index 35fffba..ed12b83 100644
--- a/medium-parser/medium_parser/core.py
+++ b/medium-parser/medium_parser/core.py
@@ -101,7 +101,7 @@ class MediumParser:
async def _get_from_cache():
logger.debug("Using cache backend")
post_data = self.cache.pull(post_id)
- if post_data and post_data.data.has_data():
+ if post_data:
logger.debug("post query was found on cache")
parsed_data = post_data.json()
if parsed_data:
diff --git a/medium-parser/medium_parser/markups.py b/medium-parser/medium_parser/markups.py
index 7aac76d..1b1a0cc 100644
--- a/medium-parser/medium_parser/markups.py
+++ b/medium-parser/medium_parser/markups.py
@@ -1,4 +1,4 @@
-from medium_parser import jinja_env
+from medium_parser import jinja_env_debug
def raw_render(**kwargs):
@@ -15,7 +15,7 @@ def parse_markups(markups: list[str]):
for markup in markups:
if markup["type"] == "A":
if markup["anchorType"] == "LINK":
- template = jinja_env.from_string(
+ template = jinja_env_debug.from_string(
'{{text}}'
)
template = template.render(
@@ -26,7 +26,7 @@ def parse_markups(markups: list[str]):
)
)
elif markup["anchorType"] == "USER":
- template = jinja_env.from_string(
+ template = jinja_env_debug.from_string(
'{{text}}'
)
template = template.render(userId=markup["userId"])
@@ -43,7 +43,7 @@ def parse_markups(markups: list[str]):
else:
continue
- template = jinja_env.from_string(template)
+ template = jinja_env_debug.from_string(template)
markup["template"] = template
markups_out.append(markup)
diff --git a/medium-parser/tests/example_test.py b/medium-parser/tests/example_test.py
index 38d305e..be8ccb4 100644
--- a/medium-parser/tests/example_test.py
+++ b/medium-parser/tests/example_test.py
@@ -4,6 +4,7 @@ import sys
import jinja2
from loguru import logger
+from medium_parser.api import MediumApi
from medium_parser.core import MediumParser
from database_lib import SQLiteCacheBackend
@@ -11,6 +12,7 @@ jinja2_env = jinja2.Environment(
loader=jinja2.FileSystemLoader("./"),
)
+
async def safe_main():
try:
await main()
@@ -19,6 +21,7 @@ async def safe_main():
async def main():
+ medium_api = MediumApi(timeout=8)
logger.remove()
logger.add(sys.stderr, level="INFO")
# logger.add(sys.stderr, level="TRACE")
@@ -27,13 +30,13 @@ async def main():
# dl = await MediumParser.from_url("")
sqlite = SQLiteCacheBackend("test_db.sqlite")
sqlite.init_db()
- dl = MediumParser("ef85d8e72883", sqlite, 8, "localhost")
- query_result = await dl.query(use_cache=False)
+ dl = MediumParser(sqlite, medium_api, 8, "localhost")
+ query_result = await dl.query("cd842ce3f0a3", use_cache=False)
with open("query_result.json", "w") as f:
json.dump(query_result, f, indent=2)
- result = await dl.render_as_html()
+ result = await dl.render_as_html("cd842ce3f0a3")
with open("medium.html", "w") as f:
template = jinja2_env.get_template("example_base_template.html")
diff --git a/rl_string_helper/rl_string_helper/string_helper.py b/rl_string_helper/rl_string_helper/string_helper.py
index 1a8960f..fed3919 100644
--- a/rl_string_helper/rl_string_helper/string_helper.py
+++ b/rl_string_helper/rl_string_helper/string_helper.py
@@ -9,7 +9,6 @@ from rl_string_helper.mixins.string_assignment import (
jinja_env = Environment(undefined=DebugUndefined)
-# TODO: more clarified description
"""
In JavaScript, the `length` property of a String object returns the number of code units (bytes) in the string, which makes use of UTF-16 encoding.
In UTF-16, each Unicode character may be encoded as one or two code units (byte). This means that for certain scripts, such as emojis, mathematical symbols, or some Chinese characters,
@@ -20,57 +19,70 @@ More info to read: https://habr.com/ru/articles/769256/
"""
-# TODO: doc! Who will read this noodles lol?
-# TODO: check cases when UTF-16 character can be more that 2 bytes
-class RLStringHelper:
- __slots__ = (
- "string",
- "templates",
- "replaces",
- "quote_html_type",
- "quote_replaces",
- "_default_bang_char",
- )
-
- def __init__(
- self,
- string: str,
- quote_html_type: list[str] = ["full"],
- _default_bang_char: str = "R",
- ):
- self.string: str = quote_symbol(string)
- self.templates: list[tuple[tuple[int, int], Template]] = []
- self.quote_replaces: list[tuple[tuple[int, int], str]] = []
- self.replaces: list[tuple[tuple[int, int], str]] = []
- self.quote_html_type = quote_html_type
- self._default_bang_char = _default_bang_char
+class UTF16Handler:
+ def __init__(self, default_bang_char: str = "R"):
+ logger.info(
+ f"Initializing UTF16Handler with default_bang_char: {default_bang_char}"
+ )
+ self._default_bang_char = default_bang_char
def pre_utf_16_bang(
self, string: str, string_pos_matrix: list
) -> tuple[str, list, list[tuple[int, int, int]]]:
+ logger.info("Starting pre_utf_16_bang method")
utf_16_bang_list: list[tuple[int, int, int]] = []
string_len_utf_16 = len(string.encode("utf-16-le")) // 2
+ logger.debug(f"UTF-16 length of string: {string_len_utf_16}")
if string_len_utf_16 == len(string):
logger.trace("String doesn't contain multibyte characters")
return string, string_pos_matrix, utf_16_bang_list
i = 0
while len(string) - 1 > i:
+ logger.debug(f"Processing character at index {i}")
new_i = string_pos_matrix[i]
char = string[new_i]
char_len = len(char.encode("utf-16-le")) // 2
if char_len == 2:
+ logger.debug(f"Multibyte character found at index {i}")
char_len_dif = char_len - 1
char_present = self._default_bang_char * char_len_dif
string, string_pos_matrix = self._paste_char(
string, string_pos_matrix, new_i + 1, char_present
)
+ logger.info(f"Mutation: Inserted '{char_present}' at index {new_i + 1}")
i += 1
utf_16_bang_list.append((i, char_len_dif, i))
i += 1
+ logger.info("Finished pre_utf_16_bang method")
return string, string_pos_matrix, utf_16_bang_list
+ def post_utf_16_bang(
+ self,
+ string: StringAssignmentMixin,
+ string_pos_matrix: list,
+ utf_16_bang_list: list,
+ ):
+ logger.info("Starting post_utf_16_bang method")
+ string = StringAssignmentMixin(str(string))
+ post_transbang = 0
+ for bang_pos, char_len, old_pos in utf_16_bang_list:
+ logger.debug(f"Processing bang at position {bang_pos}")
+ string, string_pos_matrix = self._delete_char(
+ string,
+ string_pos_matrix,
+ bang_pos - post_transbang,
+ char_len,
+ old_pos - post_transbang,
+ )
+ logger.info(
+ f"Mutation: Deleted {char_len} character(s) at index {bang_pos - post_transbang}"
+ )
+ post_transbang += char_len
+ logger.info("Finished post_utf_16_bang method")
+ return string, string_pos_matrix
+
def _paste_char(
self,
string: StringAssignmentMixin,
@@ -78,11 +90,13 @@ class RLStringHelper:
pos: int,
char: str,
) -> tuple[StringAssignmentMixin, list]:
+ logger.debug(f"Pasting character '{char}' at position {pos}")
char_len = len(char)
string_pos_matrix.insert(pos, string_pos_matrix[pos])
for matrix_i in range(pos + 1, len(string_pos_matrix)):
string_pos_matrix[matrix_i] += char_len
string.insert(pos, char)
+ logger.info(f"Mutation: Inserted '{char}' at position {pos}")
return string, string_pos_matrix
def _delete_char(
@@ -93,8 +107,11 @@ class RLStringHelper:
char_len: int,
old_pos: int,
):
+ logger.debug(f"Deleting character at position {pos}")
+ deleted_char = string[pos : pos + char_len]
string.pop(pos)
string_pos_matrix.pop(old_pos)
+ logger.info(f"Mutation: Deleted '{deleted_char}' at position {pos}")
for matrix_i in range(pos, len(string_pos_matrix)):
if isinstance(string_pos_matrix[matrix_i], int):
string_pos_matrix[matrix_i] -= char_len
@@ -105,74 +122,35 @@ class RLStringHelper:
)
return string, string_pos_matrix
- def post_utf_16_bang(
+
+class TemplateRenderer:
+ def render_templates(
self,
- string: StringAssignmentMixin,
+ string: str,
string_pos_matrix: list,
utf_16_bang_list: list,
+ templates: list,
):
- string = StringAssignmentMixin(str(string))
- post_transbang = 0
- for bang_pos, char_len, old_pos in utf_16_bang_list:
- string, string_pos_matrix = self._delete_char(
- string,
- string_pos_matrix,
- bang_pos - post_transbang,
- char_len,
- old_pos - post_transbang,
- )
- post_transbang += char_len
- return string, string_pos_matrix
-
- def set_template(self, start: int, end: int, template: str | Template):
- if not isinstance(template, Template):
- template = jinja_env.from_string(template)
- self.templates.append(((start, end), template))
-
- def set_replace(self, start: int, end: int, replace_with: str):
- self.replaces.append(((start, end), replace_with))
-
- def _render_templates(
- self, string: str, string_pos_matrix: list, utf_16_bang_list: list
- ):
- if not self.templates:
+ logger.info("Starting render_templates method")
+ if not templates:
+ logger.info("No templates to render")
return string, string_pos_matrix, utf_16_bang_list
- templates = reversed(self.templates)
+ templates = reversed(templates)
updated_text = string
- def _get_prefix_len(template_raw: Template, inner_char: str = "{"):
- template = template_raw.render()
- return template.find(inner_char)
-
- def _get_suffix_len(template_raw: Template, outer_char: str = "}"):
- template = template_raw.render()
- return len(template) - template.rfind(outer_char) - 1
-
- def update_nested_positions(start, end, prefix_len, suffix_len):
- for i in range(end, len(string_pos_matrix)):
- string_pos_matrix[i] += suffix_len + prefix_len
- for i in range(start, end):
- string_pos_matrix[i] += prefix_len
- for n in range(len(utf_16_bang_list)):
- utf_16_bang = utf_16_bang_list[n]
- if utf_16_bang[2] > end:
- utf_16_bang_list[n] = (
- utf_16_bang[0] + prefix_len + suffix_len,
- utf_16_bang[1],
- utf_16_bang[2],
- )
- elif utf_16_bang[2] > start:
- utf_16_bang_list[n] = (
- utf_16_bang[0] + prefix_len,
- utf_16_bang[1],
- utf_16_bang[2],
- )
-
for (start, end), template in templates:
- if start >= len(string_pos_matrix) or end - 1 >= len(string_pos_matrix):
+ logger.debug(f"Rendering template for range {start}:{end}")
+ if start >= len(string_pos_matrix):
+ logger.warning("Template start range out of bounds, skipping")
continue
+ if end - 1 >= len(string_pos_matrix):
+ logger.warning(
+ "Template end range out of bounds, fixing end position..."
+ )
+ end = len(string_pos_matrix)
if start == end:
+ logger.warning("Empty template range, skipping")
continue
new_start, new_end = (
@@ -180,71 +158,85 @@ class RLStringHelper:
string_pos_matrix[end - 1] + 1,
)
if new_end < new_start:
+ logger.warning("Invalid template range, skipping")
continue
context_text = template.render(text=updated_text[new_start:new_end])
updated_text_template = jinja_env.from_string(
"{{ updated_text[:new_start] }}{{ context_text }}{{updated_text[new_end:]}}"
)
+ old_text = updated_text[new_start:new_end]
updated_text = updated_text_template.render(
updated_text=updated_text,
context_text=context_text,
new_start=new_start,
new_end=new_end,
)
+ logger.info(
+ f"Mutation: Replaced '{old_text}' with '{context_text}' in range {new_start}:{new_end}"
+ )
- prefix_len = _get_prefix_len(template)
- suffix_len = _get_suffix_len(template)
- update_nested_positions(start, end, prefix_len, suffix_len)
+ prefix_len = self._get_prefix_len(template)
+ suffix_len = self._get_suffix_len(template)
+ self._update_nested_positions(
+ string_pos_matrix, utf_16_bang_list, start, end, prefix_len, suffix_len
+ )
+ logger.info("Finished render_templates method")
return updated_text, string_pos_matrix, utf_16_bang_list
- def _render_replaces(
+ def _get_prefix_len(self, template_raw: Template, inner_char: str = "{"):
+ logger.debug("Calculating prefix length")
+ template = template_raw.render()
+ return template.find(inner_char)
+
+ def _get_suffix_len(self, template_raw: Template, outer_char: str = "}"):
+ logger.debug("Calculating suffix length")
+ template = template_raw.render()
+ return len(template) - template.rfind(outer_char) - 1
+
+ def _update_nested_positions(
+ self, string_pos_matrix, utf_16_bang_list, start, end, prefix_len, suffix_len
+ ):
+ logger.debug(f"Updating nested positions for range {start}:{end}")
+ for i in range(end, len(string_pos_matrix)):
+ string_pos_matrix[i] += suffix_len + prefix_len
+ for i in range(start, end):
+ string_pos_matrix[i] += prefix_len
+ for n in range(len(utf_16_bang_list)):
+ utf_16_bang = utf_16_bang_list[n]
+ if utf_16_bang[2] > end:
+ utf_16_bang_list[n] = (
+ utf_16_bang[0] + prefix_len + suffix_len,
+ utf_16_bang[1],
+ utf_16_bang[2],
+ )
+ elif utf_16_bang[2] > start:
+ utf_16_bang_list[n] = (
+ utf_16_bang[0] + prefix_len,
+ utf_16_bang[1],
+ utf_16_bang[2],
+ )
+ logger.info(f"Mutation: Updated positions for template in range {start}:{end}")
+
+
+class StringReplacer:
+ def render_replaces(
self,
string: StringAssignmentMixin,
string_pos_matrix: list,
utf_16_bang_list: list,
+ replaces: list,
):
- if not self.replaces and not self.quote_replaces:
+ logger.info("Starting render_replaces method")
+ if not replaces:
+ logger.info("No replacements to perform")
return string, string_pos_matrix, utf_16_bang_list
string = StringAssignmentMixin(str(string))
- replaces = self.replaces + self.quote_replaces
-
- def update_positions(
- start: int, end: int, replace_len: int, new_start: int, new_end: int
- ):
- pos_len_diff = replace_len - (end - start)
- for pos_index in range(end, len(string_pos_matrix)):
- if isinstance(string_pos_matrix[pos_index], int):
- string_pos_matrix[pos_index] += pos_len_diff
- elif isinstance(string_pos_matrix[pos_index], tuple):
- string_pos_matrix[pos_index] = (
- string_pos_matrix[pos_index][0] + pos_len_diff,
- string_pos_matrix[pos_index][1] + pos_len_diff,
- )
- if pos_len_diff != 0:
- for i in range(start, end):
- if isinstance(string_pos_matrix[i], int):
- string_pos_matrix[i] = (
- string_pos_matrix[i],
- string_pos_matrix[i] + replace_len,
- )
- elif isinstance(string_pos_matrix[i], tuple):
- string_pos_matrix[i] = (
- string_pos_matrix[i][0] + replace_len,
- string_pos_matrix[i][1] + replace_len,
- )
- for n in range(len(utf_16_bang_list)):
- utf_16_bang = utf_16_bang_list[n]
- if utf_16_bang[0] > end:
- utf_16_bang_list[n] = (
- utf_16_bang[0] + pos_len_diff,
- utf_16_bang[1],
- utf_16_bang[2],
- )
for (start, end), replace_with in replaces:
+ logger.debug(f"Performing replacement for range {start}:{end}")
new_start, new_end = string_pos_matrix[start], string_pos_matrix[end - 1]
if isinstance(new_end, int):
new_end += 1
@@ -255,91 +247,231 @@ class RLStringHelper:
)
new_end = max(new_end) if isinstance(new_end, tuple) else new_end
+ old_text = string[new_start:new_end]
string[new_start:new_end] = replace_with
- update_positions(start, end, len(replace_with), new_start, new_end)
+ logger.info(
+ f"Mutation: Replaced '{old_text}' with '{replace_with}' in range {new_start}:{new_end}"
+ )
+ self._update_positions(
+ string_pos_matrix,
+ utf_16_bang_list,
+ start,
+ end,
+ len(replace_with),
+ new_start,
+ new_end,
+ )
+ logger.info("Finished render_replaces method")
return string, string_pos_matrix, utf_16_bang_list
+ def _update_positions(
+ self,
+ string_pos_matrix,
+ utf_16_bang_list,
+ start,
+ end,
+ replace_len,
+ new_start,
+ new_end,
+ ):
+ logger.debug(f"Updating positions for replacement in range {start}:{end}")
+ pos_len_diff = replace_len - (end - start)
+ for pos_index in range(end, len(string_pos_matrix)):
+ if isinstance(string_pos_matrix[pos_index], int):
+ string_pos_matrix[pos_index] += pos_len_diff
+ elif isinstance(string_pos_matrix[pos_index], tuple):
+ string_pos_matrix[pos_index] = (
+ string_pos_matrix[pos_index][0] + pos_len_diff,
+ string_pos_matrix[pos_index][1] + pos_len_diff,
+ )
+ if pos_len_diff != 0:
+ for i in range(start, end):
+ if isinstance(string_pos_matrix[i], int):
+ string_pos_matrix[i] = (
+ string_pos_matrix[i],
+ string_pos_matrix[i] + replace_len,
+ )
+ elif isinstance(string_pos_matrix[i], tuple):
+ string_pos_matrix[i] = (
+ string_pos_matrix[i][0] + replace_len,
+ string_pos_matrix[i][1] + replace_len,
+ )
+ for n in range(len(utf_16_bang_list)):
+ utf_16_bang = utf_16_bang_list[n]
+ if utf_16_bang[0] > end:
+ utf_16_bang_list[n] = (
+ utf_16_bang[0] + pos_len_diff,
+ utf_16_bang[1],
+ utf_16_bang[2],
+ )
+ logger.info(
+ f"Mutation: Updated positions for replacement in range {start}:{end}"
+ )
+
+
+class RLStringHelper:
+ def __init__(
+ self,
+ string: str,
+ quote_html_type: list[str] = ["full"],
+ _default_bang_char: str = "R",
+ ):
+ logger.info("Initializing RLStringHelper")
+ self.string: str = quote_symbol(string)
+ self.templates: list[tuple[tuple[int, int], Template]] = []
+ self.quote_replaces: list[tuple[tuple[int, int], str]] = []
+ self.replaces: list[tuple[tuple[int, int], str]] = []
+ self.quote_html_type = quote_html_type
+ self.utf16_handler = UTF16Handler(_default_bang_char)
+ self.template_renderer = TemplateRenderer()
+ self.string_replacer = StringReplacer()
+
+ def set_template(self, start: int, end: int, template: str | Template):
+ logger.info(f"Setting template for range {start}:{end}")
+ if not isinstance(template, Template):
+ template = jinja_env.from_string(template)
+ self.templates.append(((start, end), template))
+ logger.info(f"Mutation: Added template for range {start}:{end}")
+
+ def set_replace(self, start: int, end: int, replace_with: str):
+ logger.info(f"Setting replacement for range {start}:{end}")
+ self.replaces.append(((start, end), replace_with))
+ logger.info(
+ f"Mutation: Added replacement '{replace_with}' for range {start}:{end}"
+ )
+
def __str__(self):
+ logger.info("Converting RLStringHelper to string")
string = StringAssignmentMixin(self.string)
string_pos_matrix = list(range(len(string)))
- updated_text, string_pos_matrix, utf_16_bang_list = self.pre_utf_16_bang(
- string, string_pos_matrix
+ updated_text, string_pos_matrix, utf_16_bang_list = (
+ self.utf16_handler.pre_utf_16_bang(string, string_pos_matrix)
)
if self.quote_html_type:
+ logger.info("Applying HTML quoting")
self.quote_replaces = list(
quote_html(str(updated_text), self.quote_html_type)
)
+ logger.info(
+ f"Mutation: Added {len(self.quote_replaces)} HTML quote replacements"
+ )
if not self.templates and not self.replaces and not self.quote_replaces:
+ logger.info("No modifications needed, returning original string")
return self.string
- updated_text, string_pos_matrix, utf_16_bang_list = self._render_templates(
- updated_text, string_pos_matrix, utf_16_bang_list
- )
- updated_text, string_pos_matrix, utf_16_bang_list = self._render_replaces(
- updated_text, string_pos_matrix, utf_16_bang_list
- )
- updated_text, string_pos_matrix = self.post_utf_16_bang(
+ updated_text, string_pos_matrix, utf_16_bang_list = (
+ self.template_renderer.render_templates(
+ updated_text, string_pos_matrix, utf_16_bang_list, self.templates
+ )
+ )
+ updated_text, string_pos_matrix, utf_16_bang_list = (
+ self.string_replacer.render_replaces(
+ updated_text,
+ string_pos_matrix,
+ utf_16_bang_list,
+ self.replaces + self.quote_replaces,
+ )
+ )
+ updated_text, string_pos_matrix = self.utf16_handler.post_utf_16_bang(
updated_text, string_pos_matrix, utf_16_bang_list
)
+ logger.info("Finished string conversion")
return str(updated_text)
def get_text(self):
+ logger.info("Getting text from RLStringHelper")
return self.__str__()
-def split_overlapping_ranges(markups, _retry_count: int = 7):
- for _ in range(len(markups) * _retry_count):
- new_markups = split_overlapping_range_position(markups)
- if len(new_markups) == len(markups):
- break
- markups = new_markups
- return markups
+def split_overlapping_ranges(markups):
+ logger.info("Starting split_overlapping_ranges")
+ new_markups = process_and_optimize_intervals(
+ *[
+ Interval(markup["start"], markup["end"], markup["type"], markup["template"])
+ for markup in markups
+ ]
+ )
+ dict_new_markups = [markup.to_dict() for markup in new_markups]
+ return dict_new_markups
-def split_overlapping_range_position(positions):
- if not positions:
- return []
+class Interval:
+ def __init__(self, start, end, type, template=None):
+ self.start = start
+ self.end = end
+ self.type = type
+ self.template = template
- positions.sort(key=lambda x: x["start"])
- result = [positions[0]]
+ def __repr__(self):
+ return f"start={self.start}, end={self.end}, type={self.type}, template={self.template}"
- for pos in positions[1:]:
- last = result[-1]
- if not pos["start"] < last["end"]:
- result.append(pos.copy())
- continue
+ def to_dict(self):
+ return {
+ "start": self.start,
+ "end": self.end,
+ "type": self.type,
+ "template": self.template,
+ }
- if pos["type"] != last["type"]:
- if pos["end"] <= last["end"]:
- result[-1] = {
- "start": last["start"],
- "end": pos["start"],
- "type": last["type"],
- "template": last["template"],
- }
- result.append(pos.copy())
- if pos["end"] < last["end"]:
- result.append(
- {
- "start": pos["end"],
- "end": last["end"],
- "type": last["type"],
- "template": last["template"],
- }
- )
- else:
- result[-1] = {
- "start": last["start"],
- "end": pos["start"],
- "type": last["type"],
- "template": last["template"],
- }
- result.append(pos.copy())
- else:
- result[-1]["end"] = max(last["end"], pos["end"])
+
+def split_intervals_with_types(*intervals):
+ points = set()
+ for interval in intervals:
+ points.add(interval.start)
+ points.add(interval.end)
+
+ sorted_points = sorted(points)
+ result = []
+
+ for i in range(len(sorted_points) - 1):
+ start = sorted_points[i]
+ end = sorted_points[i + 1]
+ types_and_templates = [
+ (interval.type, interval.template)
+ for interval in intervals
+ if interval.start <= start and interval.end >= end
+ ]
+ result.append(
+ Interval(
+ start,
+ end,
+ [t[0] for t in types_and_templates],
+ [t[1] for t in types_and_templates],
+ )
+ )
return result
+
+
+def convert_to_object_string(input_data):
+ result = []
+
+ for item in input_data:
+ start, end = item.start, item.end
+ for type_, template in zip(item.type, item.template):
+ result.append(Interval(start, end, type_, template))
+
+ return result
+
+
+def process_and_optimize_intervals(*intervals):
+ split = split_intervals_with_types(*intervals)
+ return convert_to_object_string(split)
+
+
+# intervals = [
+# Interval(4, 17, "bold", "{{text}}"),
+# Interval(10, 21, "italic", "{{text}}"),
+# Interval(18, 27, "underline", "{{text}}"),
+# Interval(33, 41, "code", "{{text}}"),
+# Interval(40, 46, "link", '{{text}}'),
+# Interval(0, 46, "span", "{{text}}"),
+# ]
+
+# intervals_result =
+# for interval in intervals_result:
+# print(interval)
diff --git a/rl_string_helper/tests/test_rl_string_helper.py b/rl_string_helper/tests/test_rl_string_helper.py
index 5685f54..e56adca 100644
--- a/rl_string_helper/tests/test_rl_string_helper.py
+++ b/rl_string_helper/tests/test_rl_string_helper.py
@@ -73,62 +73,112 @@ class TestRLStringHelper:
helper.set_template(0, 11, "{{text}}")
assert str(helper) == "Hello world"
- # def test_super_duper_overlapsing(self):
- # # https://medium.com/google-cloud/implementing-semantic-caching-a-step-by-step-guide-to-faster-cost-effective-genai-workflows-ef85d8e72883#bypass
- # text = "Note: The patterns and ideas discussed in this post are broadly applicable and can be adopted for other cloud providers."
- # helper = RLStringHelper(text)
- # markups = (
- # [
- # {
- # "__typename": "Markup",
- # "name": None,
- # "type": "CODE",
- # "start": 0,
- # "end": 5,
- # "href": None,
- # "title": None,
- # "rel": None,
- # "anchorType": None,
- # "userId": None,
- # "creatorIds": None,
- # },
- # {
- # "__typename": "Markup",
- # "name": None,
- # "type": "STRONG",
- # "start": 0,
- # "end": 6,
- # "href": None,
- # "title": None,
- # "rel": None,
- # "anchorType": None,
- # "userId": None,
- # "creatorIds": None,
- # },
- # {
- # "__typename": "Markup",
- # "name": None,
- # "type": "EM",
- # "start": 0,
- # "end": 6,
- # "href": None,
- # "title": None,
- # "rel": None,
- # "anchorType": None,
- # "userId": None,
- # "creatorIds": None,
- # },
- # ],
- # )
- # parsed_markups = parse_markups(markups[0])
- # logger.debug(parsed_markups)
- # parsed_markups = split_overlapping_ranges(parsed_markups)
- # logger.debug(parsed_markups)
- # for markup in parsed_markups:
- # helper.set_template(markup["start"], markup["end"], markup["template"])
+ def test_super_duper_overlapsing(self):
+ text = "Note: The patterns and ideas discussed in this post are broadly applicable."
+ helper = RLStringHelper(text)
- # expected_pattern = r"]*>Note: The patterns and ideas discussed in this post are broadly applicable and can be adopted for other cloud providers\."
- # assert re.match(expected_pattern, str(helper))
+ markups = [
+ {"start": 0, "end": 5, "type": "code", "template": "{{text}}"},
+ {
+ "start": 0,
+ "end": 6,
+ "type": "strong",
+ "template": "{{text}}",
+ },
+ {"start": 0, "end": 6, "type": "em", "template": "{{text}}"},
+ ]
+
+ parsed_markups = split_overlapping_ranges(markups)
+ for markup in parsed_markups:
+ helper.set_template(markup["start"], markup["end"], markup["template"])
+
+ expected_pattern = r"Note: The patterns and ideas discussed in this post are broadly applicable\."
+ result = str(helper)
+ assert re.match(expected_pattern, result)
+
+ def test_complex_overlapping_tags(self):
+ text = "The quick (brown) fox jumps over 13 lazy dogs!"
+ helper = RLStringHelper(text)
+
+ markups = [
+ {
+ "start": 0,
+ "end": 46,
+ "type": "span",
+ "template": "{{text}}",
+ },
+ {"start": 4, "end": 17, "type": "bold", "template": "{{text}}"},
+ {"start": 10, "end": 21, "type": "italic", "template": "{{text}}"},
+ {
+ "start": 18,
+ "end": 27,
+ "type": "underline",
+ "template": "{{text}}",
+ },
+ {
+ "start": 33,
+ "end": 41,
+ "type": "code",
+ "template": "{{text}}",
+ },
+ {
+ "start": 40,
+ "end": 46,
+ "type": "link",
+ "template": '{{text}}',
+ },
+ ]
+
+ parsed_markups = split_overlapping_ranges(markups)
+ for markup in parsed_markups:
+ helper.set_template(markup["start"], markup["end"], markup["template"])
+
+ expected_output = (
+ "The quick (brown) fox jumps "
+ 'over 13 lazy dogs!'
+ )
+ assert str(helper) == expected_output
+
+ def test_nmultibyte_emoji(self):
+ from medium_parser.markups import parse_markups
+
+ data = {
+ "__typename": "Paragraph",
+ "id": "236e7049b537_33",
+ "name": "ba8c",
+ "href": None,
+ "text": "Noah dragged his two printers out from Settings ⚙️ < Printers & Scanners \ud83d\udda8️ and dropped them in Dock or Desktop, I don’t remember — but you can drag to both the places.",
+ "iframe": None,
+ "layout": None,
+ "markups": [
+ {
+ "__typename": "Markup",
+ "name": None,
+ "type": "CODE",
+ "start": 39,
+ "end": 76,
+ "href": None,
+ "title": None,
+ "rel": None,
+ "anchorType": None,
+ "userId": None,
+ "creatorIds": None,
+ }
+ ],
+ "metadata": None,
+ "mixtapeMetadata": None,
+ "type": "P",
+ "hasDropCap": None,
+ "dropCapImage": None,
+ "codeBlockMetadata": None,
+ }
+ helper = RLStringHelper(data["text"])
+ parsed_markups = split_overlapping_ranges(parse_markups(data["markups"]))
+ for markup in parsed_markups:
+ helper.set_template(markup["start"], markup["end"], markup["template"])
+ print(str(helper))
+
+ assert str(helper) == data["text"]
def test_basic_replace(self):
# Replace A to B - ONE to ONE char
@@ -188,26 +238,23 @@ class TestRLStringHelper:
helper = RLStringHelper(issue_text)
assert helper.get_text() == issue_text
- # def test_markup_parser(self):
- # href_markup = {
- # "__typename": 'Markup',
- # "anchorType": 'LINK',
- # "end": 12,
- # "href": 'https://readwise.io/bookreview/{{book_id',
- # "name": None,
- # "rel": 'nofollow',
- # "start": 0,
- # "title": '',
- # "type": 'A',
- # "userId": None
- # }
+ def test_markup_parser(self):
+ href_markup = {
+ "start": 0,
+ "end": 12,
+ "type": "a",
+ "template": '{{ text }}',
+ }
- # helper = RLStringHelper("Hello world")
- # markups = parse_markups([href_markup])
- # parsed_markups = split_overlapping_ranges(markups)
- # for markup in parsed_markups:
- # helper.set_template(markup["start"], markup["end"], markup["template"])
- # assert helper.get_text() == 'Hello world'
+ helper = RLStringHelper("Hello world")
+ parsed_markups = split_overlapping_ranges([href_markup])
+ for markup in parsed_markups:
+ helper.set_template(markup["start"], markup["end"], markup["template"])
+
+ assert (
+ helper.get_text()
+ == 'Hello world'
+ )
def test_medium_all(self):
helper = RLStringHelper("ABC Hello world")
diff --git a/test_lab/pr_brAAAAAAAAAAAAA.py b/test_lab/pr_brAAAAAAAAAAAAA.py
new file mode 100644
index 0000000..7ed8e3f
--- /dev/null
+++ b/test_lab/pr_brAAAAAAAAAAAAA.py
@@ -0,0 +1,59 @@
+from collections import defaultdict
+import heapq
+from icecream import ic
+from loguru import logger
+
+
+def _split_overlapping_ranges(positions):
+ logger.info("Starting improved split_overlapping_range_position")
+ if not positions:
+ logger.info("No positions to split")
+ return []
+
+ events = []
+ for i, pos in enumerate(positions):
+ heapq.heappush(events, (pos["start"], 0, i)) # 0 for start event
+ heapq.heappush(events, (pos["end"], 1, i)) # 1 for end event
+
+ active = set()
+ result = []
+ last_point = None
+ open_ranges = defaultdict(list)
+
+ while events:
+ point, event_type, index = heapq.heappop(events)
+
+ if last_point is not None and point > last_point and active:
+ for act_index in active:
+ open_ranges[act_index].append(
+ {
+ "start": last_point,
+ "end": point,
+ "type": positions[act_index]["type"],
+ "template": positions[act_index]["template"],
+ }
+ )
+
+ if event_type == 0: # Start event
+ active.add(index)
+ else: # End event
+ active.remove(index)
+ if open_ranges[index]:
+ result.extend(open_ranges[index])
+ del open_ranges[index]
+
+ last_point = point
+
+ # Sort the result based on the original order of positions
+ result.sort(
+ key=lambda x: next(
+ i
+ for i, pos in enumerate(positions)
+ if pos["type"] == x["type"] and pos["template"] == x["template"]
+ )
+ )
+
+ logger.info(
+ f"Finished improved split_overlapping_range_position. Generated {len(result)} ranges."
+ )
+ return result