web/database-lib/database_lib/cache_db.py

151 lines
6 KiB
Python
Raw Normal View History

2024-01-31 00:48:20 +00:00
from typing import Union
import sqlite3
import json
2024-02-23 04:49:43 +00:00
from loguru import logger
2024-01-31 00:48:20 +00:00
from warnings import warn
2024-02-23 08:08:05 +00:00
import threading
2024-01-31 00:48:20 +00:00
try:
import sqlite_zstd
except ImportError:
2024-02-23 05:47:31 +00:00
logger.debug("Can't use zstd compression. Please install 'sqlite_zstd' package")
2024-01-31 00:48:20 +00:00
warn("Can't use zstd compression. Please install 'sqlite_zstd' package")
sqlite_zstd = None
class CacheResponse:
2024-02-23 04:49:43 +00:00
__slots__ = ('data',)
2024-01-31 00:48:20 +00:00
def __init__(self, data: str):
self.data = data
def json(self):
return json.loads(self.data)
def __repr__(self):
return self.data
def __str__(self):
return self.data
class SQLiteCacheBackend:
2024-02-24 02:10:47 +00:00
__slots__ = ('connection', 'cursor', 'database', 'lock')
2024-01-31 00:48:20 +00:00
def __init__(self, database: str):
2024-02-23 08:14:30 +00:00
self.database = database
2024-02-24 02:10:47 +00:00
self.connection = sqlite3.connect(database, timeout=10.0)
2024-01-31 00:48:20 +00:00
self.connection.enable_load_extension(True) # Enable loading of extensions
2024-02-23 07:01:31 +00:00
self.connection.execute("PRAGMA foreign_keys = ON;") # Need for working with foreign keys in db
self.connection.execute("PRAGMA journal_mode=WAL;") # Need to properly work with ZSTD compression
self.connection.execute("PRAGMA auto_vacuum=full;") # Same as above thing
2024-01-31 00:48:20 +00:00
self.cursor = self.connection.cursor()
2024-02-24 02:10:47 +00:00
self.lock = threading.Lock()
2024-01-31 00:48:20 +00:00
if sqlite_zstd is not None:
sqlite_zstd.load(self.connection)
def all(self):
with self.connection:
return self.cursor.execute("SELECT * FROM cache").fetchall()
def all_length(self) -> int:
with self.connection:
return self.cursor.execute("SELECT COUNT(*) FROM cache").fetchone()[0]
def random(self, size: int):
with self.connection:
2024-02-23 04:49:43 +00:00
return self.cursor.execute("SELECT * FROM cache ORDER BY RANDOM() LIMIT ?", (size,)).fetchall()
2024-01-31 00:48:20 +00:00
def enable_zstd(self):
if sqlite_zstd is None:
raise ValueError("Can't use zstd compression. Please install 'sqlite_zstd' package")
with self.connection:
try:
2024-02-23 07:01:31 +00:00
self.cursor.execute("SELECT zstd_enable_transparent('{\"table\": \"cache\", \"column\": \"value\", \"compression_level\": 9, \"dict_chooser\": \"''a''\"}')")
2024-01-31 00:48:20 +00:00
except Exception as error:
print(error)
2024-02-23 07:01:31 +00:00
self.connection.execute("PRAGMA auto_vacuum=full")
2024-02-23 08:08:05 +00:00
self.maintenance_thread()
2024-01-31 00:48:20 +00:00
def init_db(self):
with self.connection:
self.cursor.execute("CREATE TABLE IF NOT EXISTS cache (key TEXT PRIMARY KEY, value TEXT)")
2024-02-23 07:01:31 +00:00
# self.cursor.execute("CREATE INDEX IF NOT EXISTS idx_key ON cache (key)")
2024-01-31 00:48:20 +00:00
2024-02-23 05:47:31 +00:00
def pull(self, key: str) -> Union[CacheResponse, None]:
2024-01-31 00:48:20 +00:00
with self.connection:
cache = self.cursor.execute("SELECT value FROM cache WHERE key = :0", {'0': key}).fetchone()
if cache:
2024-02-23 04:49:43 +00:00
logger.debug("Value found in DB, returning it")
2024-01-31 00:48:20 +00:00
return CacheResponse(cache[0])
2024-02-23 05:47:31 +00:00
else:
logger.debug(f"No value found for key: {key}")
return None
2024-01-31 00:48:20 +00:00
2024-02-23 05:47:31 +00:00
def push(self, key: str, value: Union[str, dict]) -> None:
2024-01-31 00:48:20 +00:00
if isinstance(value, dict):
2024-02-23 05:47:31 +00:00
try:
value = json.dumps(value)
except TypeError as e:
raise ValueError(f"Unable to serialize value to JSON: {e}")
2024-01-31 00:48:20 +00:00
elif not isinstance(value, str):
2024-02-23 05:47:31 +00:00
raise ValueError(f"value argument should be a string or dict, not {type(value).__name__}")
2024-02-24 02:10:47 +00:00
with self.lock:
with self.connection:
self.cursor.execute("INSERT OR REPLACE INTO cache VALUES (:0, :1)", {'0': key, '1': value})
2024-02-23 05:47:31 +00:00
2024-01-31 00:48:20 +00:00
def delete(self, key: str) -> None:
with self.connection:
2024-02-23 05:47:31 +00:00
result = self.cursor.execute("SELECT 1 FROM cache WHERE key = :0", {'0': key}).fetchone()
if result:
self.cursor.execute("DELETE FROM cache WHERE key = :0", {'0': key})
logger.debug(f"Deleted key: {key}")
else:
logger.debug(f"Attempted to delete non-existing key: {key}")
2024-02-23 08:08:05 +00:00
def maintenance(self, time: int = None, blocking_time: int = 0.5):
2024-02-23 08:14:30 +00:00
connection = sqlite3.connect(self.database)
cursor = connection.cursor()
2024-02-23 08:16:42 +00:00
connection.enable_load_extension(True) # Enable loading of extensions
connection.execute("PRAGMA foreign_keys = ON;") # Need for working with foreign keys in db
connection.execute("PRAGMA journal_mode=WAL;") # Need to properly work with ZSTD compression
connection.execute("PRAGMA auto_vacuum=full;") # Same as above thing
2024-02-24 02:10:47 +00:00
2024-02-23 08:16:42 +00:00
if sqlite_zstd is not None:
sqlite_zstd.load(connection)
2024-02-24 02:10:47 +00:00
2024-02-23 08:14:30 +00:00
with connection:
2024-02-23 08:08:05 +00:00
if time is not None:
2024-02-23 08:14:30 +00:00
cursor.execute("SELECT zstd_incremental_maintenance(?, ?);", (time, blocking_time))
2024-02-23 08:08:05 +00:00
else:
2024-02-23 08:14:30 +00:00
cursor.execute("SELECT zstd_incremental_maintenance(null, ?);", (blocking_time,))
cursor.execute("VACUUM")
cursor.execute("ANALYZE")
2024-02-24 02:10:47 +00:00
cursor.close()
connection.close()
2024-02-23 05:47:31 +00:00
2024-02-23 08:08:05 +00:00
def maintenance_thread(self):
maintenance_thread = threading.Thread(target=self.maintenance, daemon=True)
maintenance_thread.start()
# Doesn't works with sqlite_zstd
# def migrate_add_index_to_key(self):
# with self.connection:
# # Check if the index already exists
# index_exists = self.cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND name='idx_key'").fetchone()
# if not index_exists:
# # Create the index if it doesn't exist
# self.cursor.execute("CREATE INDEX idx_key ON cache (key)")
# logger.info("Index 'idx_key' on column 'key' created successfully.")
# else:
# logger.info("Index 'idx_key' on column 'key' already exists.")
2024-01-31 00:48:20 +00:00
2024-02-23 07:01:31 +00:00
def show_schema_info(self):
with self.connection:
return self.connection.execute("SELECT sql FROM sqlite_master").fetchall()
2024-01-31 00:48:20 +00:00
def close(self):
self.__del__()
def __del__(self) -> None:
self.connection.close()