feat(beta): httpx migration, Tor/proxy refactor, JSON results, alt-link fixes, tests, optional static bundling, HTTP/2 env toggle, cleanup

This commit is contained in:
Don-Swanson 2025-09-21 00:11:54 -05:00
parent 418d9df89c
commit 7f80eb1e51
No known key found for this signature in database
GPG key ID: C6A6ACD574A005E5
27 changed files with 825 additions and 113 deletions

13
.pre-commit-config.yaml Normal file
View file

@ -0,0 +1,13 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.6.9
hooks:
- id: ruff
args: [--fix]
- id: ruff-format
- repo: https://github.com/psf/black
rev: 24.8.0
hooks:
- id: black
args: [--quiet]

View file

@ -75,6 +75,7 @@ Contents
- User-defined [custom bangs](#custom-bangs)
- Optional location-based searching (i.e. results near \<city\>)
- Optional NoJS mode to view search results in a separate window with JavaScript blocked
- JSON output for results via content negotiation (see "JSON results (API)")
<sup>*No third party JavaScript. Whoogle can be used with JavaScript disabled, but if enabled, uses JavaScript for things like presenting search suggestions.</sup>
@ -463,6 +464,8 @@ There are a few optional environment variables available for customizing a Whoog
| WHOOGLE_SHOW_FAVICONS | Show/hide favicons next to search result URLs. Default on. |
| WHOOGLE_UPDATE_CHECK | Enable/disable the automatic daily check for new versions of Whoogle. Default on. |
| WHOOGLE_FALLBACK_ENGINE_URL | Set a fallback Search Engine URL when there is internal server error or instance is rate-limited. Search query is appended to the end of the URL (eg. https://duckduckgo.com/?k1=-1&q=). |
| WHOOGLE_BUNDLE_STATIC | When set to 1, serve a single bundled CSS and JS file generated at startup to reduce requests. Default off. |
| WHOOGLE_HTTP2 | Enable HTTP/2 for upstream requests (via httpx). Default on — set to 0 to force HTTP/1.1. |
### Config Environment Variables
These environment variables allow setting default config values, but can be overwritten manually by using the home page config menu. These allow a shortcut for destroying/rebuilding an instance to the same config state every time.
@ -495,6 +498,28 @@ Same as most search engines, with the exception of filtering by time range.
To filter by a range of time, append ":past <time>" to the end of your search, where <time> can be `hour`, `day`, `month`, or `year`. Example: `coronavirus updates :past hour`
### JSON results (API)
Whoogle can return filtered results as JSON using the same sanitization rules as the HTML view.
- Send `Accept: application/json` or append `format=json` to the search URL.
- Example: `/search?q=whoogle` with `Accept: application/json`, or `/search?q=whoogle&format=json`.
- Response shape:
```
{
"query": "whoogle",
"search_type": "",
"results": [
{"href": "https://example.com/page", "text": "Example Page"},
...
]
}
```
Special cases:
- Feeling Lucky returns HTTP 303 with body `{ "redirect": "<url>" }`.
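- Temporary blocks (captcha) return HTTP 503 with `{ "blocked": true, "error_message": "...", "query": "..." }`. If `WHOOGLE_FALLBACK_ENGINE_URL` is set, they instead return HTTP 302 with body `{ "redirect": "<fallback url + query>" }`.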
## Extra Steps
### Set Whoogle as your primary search engine
@ -630,6 +655,14 @@ server {
You can then add SSL support using LetsEncrypt by following a guide such as [this one](https://www.nginx.com/blog/using-free-ssltls-certificates-from-lets-encrypt-with-nginx/).
### Static asset bundling (optional)
Whoogle can optionally serve one bundled CSS file and one bundled JS file to reduce the number of HTTP requests.
- Enable by setting `WHOOGLE_BUNDLE_STATIC=1` and restarting the app.
- On startup, Whoogle concatenates local CSS/JS into hashed files under `app/static/build/` and templates will prefer those bundles.
- When disabled (default), templates load individual CSS/JS files for easier development.
- Note: Theme CSS files (`*-theme.css`) are still loaded separately to honor user theme selection.
## Contributing
Under the hood, Whoogle is a basic Flask app with the following structure:

View file

@ -18,6 +18,8 @@ import warnings
from werkzeug.middleware.proxy_fix import ProxyFix
from app.utils.misc import read_config_bool
from app.services.http_client import HttpxClient
from app.services.provider import close_all_clients
from app.version import __version__
app = Flask(__name__, static_folder=os.path.dirname(
@ -50,6 +52,7 @@ app.config['STATIC_FOLDER'] = os.getenv(
app.config['BUILD_FOLDER'] = os.path.join(
app.config['STATIC_FOLDER'], 'build')
app.config['CACHE_BUSTING_MAP'] = {}
app.config['BUNDLE_STATIC'] = read_config_bool('WHOOGLE_BUNDLE_STATIC')
app.config['LANGUAGES'] = json.load(open(
os.path.join(app.config['STATIC_FOLDER'], 'settings/languages.json'),
encoding='utf-8'))
@ -86,6 +89,17 @@ app.config['BANG_FILE'] = os.path.join(
app.config['BANG_PATH'],
'bangs.json')
# Global services registry (simple DI)
app.services = {}
import atexit


def _teardown_clients(exception=None):
    """Close every pooled upstream HTTP client (best effort).

    Args:
        exception: unused; kept (now optional) so the function can still be
            called with the signature Flask teardown handlers receive.
    """
    try:
        close_all_clients()
    except Exception:
        # Cleanup at shutdown must never raise
        pass


# The clients handed out by app.services.provider are cached and shared
# between requests. Closing them from an app-context teardown would run after
# every request, throwing the pooled connections away and potentially closing
# a client that another in-flight request is still using. Close them once,
# when the process exits, instead.
atexit.register(_teardown_clients)
# Ensure all necessary directories exist
if not os.path.exists(app.config['CONFIG_PATH']):
os.makedirs(app.config['CONFIG_PATH'])
@ -174,10 +188,54 @@ for cb_dir in cache_busting_dirs:
map_path = map_path[1:]
app.config['CACHE_BUSTING_MAP'][cb_file] = map_path
# Optionally create simple bundled assets (opt-in via WHOOGLE_BUNDLE_STATIC=1)
def _build_static_bundle(src_dir, extension, bundle_name, separator,
                         skip_suffix=None):
    """Concatenate static assets into a single hashed file in the build folder.

    Args:
        src_dir: directory whose files are bundled (processed in sorted order)
        extension: only files ending with this suffix are included
        bundle_name: temporary bundle file name, e.g. 'app.css'
        separator: text inserted between the concatenated file contents
        skip_suffix: optional suffix; files ending with it are left out

    Returns:
        The cache busting map path of the hashed bundle, or None when there
        was nothing to bundle.
    """
    parts = []
    for name in sorted(os.listdir(src_dir)):
        if not name.endswith(extension):
            continue
        if skip_suffix and name.endswith(skip_suffix):
            continue
        try:
            with open(os.path.join(src_dir, name), 'r',
                      encoding='utf-8') as src:
                parts.append(src.read())
        except (OSError, UnicodeError) as exc:
            # Bundling is best effort: skip unreadable files, but say so
            app.logger.warning(
                'Skipping %s while bundling static assets: %s', name, exc)

    bundle = separator.join(parts)
    if not bundle:
        return None

    build_dir = app.config['BUILD_FOLDER']
    os.makedirs(build_dir, exist_ok=True)
    tmp_path = os.path.join(build_dir, bundle_name)
    with open(tmp_path, 'w', encoding='utf-8') as out:
        out.write(bundle)

    # The hash is derived from the file just written; the temporary file is
    # then renamed to its hashed name
    hashed_name = gen_file_hash(build_dir, bundle_name)
    os.replace(tmp_path, os.path.join(build_dir, hashed_name))
    return os.path.join('app/static/build', hashed_name)


if app.config['BUNDLE_STATIC']:
    # CSS bundle: include all css except theme files (end with -theme.css),
    # which stay separate so the user's theme selection is honored
    css_map_path = _build_static_bundle(
        os.path.join(app.config['STATIC_FOLDER'], 'css'),
        '.css', 'app.css', '\n', skip_suffix='-theme.css')
    if css_map_path:
        app.config['CACHE_BUSTING_MAP']['bundle.css'] = css_map_path

    # JS bundle: include all js files. The ';' guards against a script that
    # does not end its last statement
    js_map_path = _build_static_bundle(
        os.path.join(app.config['STATIC_FOLDER'], 'js'),
        '.js', 'app.js', '\n;')
    if js_map_path:
        app.config['CACHE_BUSTING_MAP']['bundle.js'] = js_map_path
# Helpers made available to every Jinja template:
#   clean_query   - the imported clean_query function
#   cb_url        - looks up a (lower-cased) file name in the cache busting map
#   bundle_static - reports whether bundled static assets are enabled
app.jinja_env.globals.update(
    clean_query=clean_query,
    cb_url=lambda f: app.config['CACHE_BUSTING_MAP'][f.lower()],
    bundle_static=lambda: app.config.get('BUNDLE_STATIC', False),
)
# Attempt to acquire tor identity, to determine if Tor config is available
send_tor_signal(Signal.HEARTBEAT)

View file

@ -649,50 +649,94 @@ class Filter:
"""Replaces link locations and page elements if "alts" config
is enabled
"""
for site, alt in SITE_ALTS.items():
if site != "medium.com" and alt != "":
# Ignore medium.com replacements since these are handled
# specifically in the link description replacement, and medium
# results are never given their own "card" result where this
# replacement would make sense.
# Also ignore if the alt is empty, since this is used to indicate
# that the alt is not enabled.
for div in self.soup.find_all('div', text=re.compile(site)):
# Use the number of words in the div string to determine if the
# string is a result description (shouldn't replace domains used
# in desc text).
if len(div.string.split(' ')) == 1:
# Precompute regex for sites (escape dots) and common prefixes
site_keys = list(SITE_ALTS.keys())
if not site_keys:
return
sites_pattern = re.compile('|'.join([re.escape(k) for k in site_keys]))
prefix_pattern = re.compile(r'^(?:https?:\/\/)?(?:(?:www|mobile|m)\.)?')
# 1) Replace bare domain divs (single token) once, avoiding duplicates
for div in self.soup.find_all('div', text=sites_pattern):
if not div or not div.string:
continue
if len(div.string.split(' ')) != 1:
continue
match = sites_pattern.search(div.string)
if not match:
continue
site = match.group(0)
alt = SITE_ALTS.get(site, '')
if not alt:
continue
# Skip if already contains the alt to avoid old.old.* repetition
if alt in div.string:
continue
div.string = div.string.replace(site, alt)
# 2) Update link hrefs and descriptions in a single pass
for link in self.soup.find_all('a', href=True):
# Search and replace all link descriptions
# with alternative location
link['href'] = get_site_alt(link['href'])
link_desc = link.find_all(
text=re.compile('|'.join(SITE_ALTS.keys())))
if len(link_desc) == 0:
# Find a description text node matching a known site
desc_nodes = link.find_all(text=sites_pattern)
if not desc_nodes:
continue
desc_node = desc_nodes[0]
link_str = str(desc_node)
# Determine which site key is present in the description
site_match = sites_pattern.search(link_str)
if not site_match:
continue
site = site_match.group(0)
alt = SITE_ALTS.get(site, '')
if not alt:
continue
# Replace link description
link_desc = link_desc[0]
if site not in link_desc or not alt:
# Avoid duplication if alt already present
if alt in link_str:
continue
new_desc = BeautifulSoup(features='html.parser').new_tag('div')
link_str = str(link_desc)
# Medium links should be handled differently, since 'medium.com'
# is a common substring of domain names, but shouldn't be
# replaced (i.e. 'philomedium.com' should stay as it is).
# Medium-specific handling remains to avoid matching substrings
if 'medium.com' in link_str:
if link_str.startswith('medium.com') or '.medium.com' in link_str:
link_str = SITE_ALTS['medium.com'] + link_str[
link_str.find('medium.com') + len('medium.com'):]
new_desc.string = link_str
replaced = SITE_ALTS['medium.com'] + link_str[
link_str.find('medium.com') + len('medium.com'):
]
else:
new_desc.string = link_str.replace(site, alt)
replaced = link_str
else:
# If the description looks like a URL with scheme, replace only the host
if '://' in link_str:
scheme, rest = link_str.split('://', 1)
host, sep, path = rest.partition('/')
# Drop common prefixes from host when swapping to a fully-qualified alt
alt_parsed = urlparse.urlparse(alt)
alt_host = alt_parsed.netloc if alt_parsed.netloc else alt.replace('https://', '').replace('http://', '')
# If alt includes a scheme, prefer its host; otherwise use alt as host
if alt_parsed.scheme:
new_host = alt_host
else:
# When alt has no scheme, still replace entire host
new_host = alt
# Prevent replacing if host already equals target
if host == new_host:
replaced = link_str
else:
replaced = f"{scheme}://{new_host}{sep}{path}"
else:
# No scheme in the text; include optional prefixes in replacement
# Replace any leading www./m./mobile. + site with alt host (no scheme)
alt_parsed = urlparse.urlparse(alt)
alt_host = alt_parsed.netloc if alt_parsed.netloc else alt.replace('https://', '').replace('http://', '')
# Build a pattern that includes optional prefixes for the specific site
site_with_prefix = re.compile(rf'(?:(?:www|mobile|m)\.)?{re.escape(site)}')
replaced = site_with_prefix.sub(alt_host, link_str, count=1)
link_desc.replace_with(new_desc)
new_desc = BeautifulSoup(features='html.parser').new_tag('div')
new_desc.string = replaced
desc_node.replace_with(new_desc)
def view_image(self, soup) -> BeautifulSoup:
"""Replaces the soup with a new one that handles mobile results and

View file

@ -1,10 +1,10 @@
from app.models.config import Config
from app.utils.misc import read_config_bool
from app.services.provider import get_http_client
from datetime import datetime
from defusedxml import ElementTree as ET
import random
import requests
from requests import Response, ConnectionError
import httpx
import urllib.parse as urlparse
import os
from stem import Signal, SocketError
@ -202,7 +202,7 @@ class Request:
config: the user's current whoogle configuration
"""
def __init__(self, normal_ua, root_path, config: Config):
def __init__(self, normal_ua, root_path, config: Config, http_client=None):
self.search_url = 'https://www.google.com/search?gbv=1&num=' + str(
os.getenv('WHOOGLE_RESULTS_PER_PAGE', 10)) + '&q='
# Send heartbeat to Tor, used in determining if the user can or cannot
@ -249,6 +249,8 @@ class Request:
self.tor = config.tor
self.tor_valid = False
self.root_path = root_path
# Initialize HTTP client (shared per proxies)
self.http_client = http_client or get_http_client(self.proxies)
def __getitem__(self, name):
return getattr(self, name)
@ -286,7 +288,7 @@ class Request:
return []
def send(self, base_url='', query='', attempt=0,
force_mobile=False, user_agent='') -> Response:
force_mobile=False, user_agent=''):
"""Sends an outbound request to a URL. Optionally sends the request
using Tor, if enabled by the user.
@ -339,8 +341,9 @@ class Request:
# Make sure that the tor connection is valid, if enabled
if self.tor:
try:
tor_check = requests.get('https://check.torproject.org/',
proxies=self.proxies, headers=headers)
tor_check = self.http_client.get('https://check.torproject.org/',
headers=headers,
retries=1)
self.tor_valid = 'Congratulations' in tor_check.text
if not self.tor_valid:
@ -348,16 +351,18 @@ class Request:
"Tor connection succeeded, but the connection could "
"not be validated by torproject.org",
disable=True)
except ConnectionError:
except httpx.RequestError:
raise TorError(
"Error raised during Tor connection validation",
disable=True)
response = requests.get(
try:
response = self.http_client.get(
(base_url or self.search_url) + query,
proxies=self.proxies,
headers=headers,
cookies=cookies)
except httpx.HTTPError as e:
raise
# Retry query with new identity if using Tor (max 10 attempts)
if 'form id="captcha-form"' in response.text and self.tor:

View file

@ -32,8 +32,7 @@ from app.utils.session import valid_user_session
from bs4 import BeautifulSoup as bsoup
from flask import jsonify, make_response, request, redirect, render_template, \
send_file, session, url_for, g
from requests import exceptions
from requests.models import PreparedRequest
import httpx
from cryptography.fernet import Fernet, InvalidToken
from cryptography.exceptions import InvalidSignature
from werkzeug.datastructures import MultiDict
@ -166,7 +165,8 @@ def before_request_func():
g.user_request = Request(
request.headers.get('User-Agent'),
get_request_url(request.url_root),
config=g.user_config)
config=g.user_config
)
g.app_location = g.user_config.url
@ -299,7 +299,7 @@ def search():
get_req_str = urlparse.urlencode(post_data)
return redirect(url_for('.search') + '?' + get_req_str)
search_util = Search(request, g.user_config, g.session_key)
search_util = Search(request, g.user_config, g.session_key, user_request=g.user_request)
query = search_util.new_search_query()
bang = resolve_bang(query)
@ -320,7 +320,15 @@ def search():
'tor']
return redirect(url_for('.index'))
wants_json = (
request.args.get('format') == 'json' or
'application/json' in request.headers.get('Accept', '') or
'application/*+json' in request.headers.get('Accept', '')
)
if search_util.feeling_lucky:
if wants_json:
return jsonify({'redirect': response}), 303
return redirect(response, code=303)
# If the user is attempting to translate a string, determine the correct
@ -341,8 +349,17 @@ def search():
app.logger.error('503 (CAPTCHA)')
fallback_engine = os.environ.get('WHOOGLE_FALLBACK_ENGINE_URL', '')
if (fallback_engine):
if wants_json:
return jsonify({'redirect': fallback_engine + query}), 302
return redirect(fallback_engine + query)
if wants_json:
return jsonify({
'blocked': True,
'error_message': translation['ratelimit'],
'query': urlparse.unquote(query)
}), 503
else:
return render_template(
'error.html',
blocked=True,
@ -382,6 +399,29 @@ def search():
home_url = f"home?preferences={preferences}" if preferences else "home"
cleanresponse = str(response).replace("andlt;","&lt;").replace("andgt;","&gt;")
if wants_json:
# Build a parsable JSON from the filtered soup
json_soup = bsoup(str(response), 'html.parser')
results = []
seen = set()
for a in json_soup.find_all('a', href=True):
href = a['href']
if not href.startswith('http'):
continue
if href in seen:
continue
text = a.get_text(strip=True)
if not text:
continue
seen.add(href)
results.append({'href': href, 'text': text})
return jsonify({
'query': urlparse.unquote(query),
'search_type': search_util.search_type,
'results': results
})
return render_template(
'display.html',
has_update=app.config['HAS_UPDATE'],
@ -521,7 +561,7 @@ def element():
tmp_mem.seek(0)
return send_file(tmp_mem, mimetype=src_type)
except exceptions.RequestException:
except httpx.HTTPError:
pass
return send_file(io.BytesIO(empty_gif), mimetype='image/gif')

2
app/services/__init__.py Normal file
View file

@ -0,0 +1,2 @@

107
app/services/http_client.py Normal file
View file

@ -0,0 +1,107 @@
import threading
import time
from typing import Any, Dict, Optional, Tuple
import httpx
from cachetools import TTLCache
class HttpxClient:
    """Thin wrapper around httpx.Client providing simple retries and optional TTL caching.

    The client is intended to be safe for reuse across requests. Per-request
    overrides for headers/cookies are supported.
    """

    def __init__(
            self,
            proxies: Optional[Dict[str, str]] = None,
            timeout_seconds: float = 15.0,
            cache_ttl_seconds: int = 30,
            cache_maxsize: int = 256,
            http2: bool = True) -> None:
        """Create the underlying httpx client.

        Args:
            proxies: optional mapping of scheme ('http', 'https') to proxy URL
            timeout_seconds: timeout applied to every request
            cache_ttl_seconds: lifetime of cached responses
            cache_maxsize: maximum number of cached responses
            http2: whether to enable HTTP/2 for upstream requests
        """
        self._proxies = proxies or {}
        self._timeout_seconds = timeout_seconds
        self._client = self._build_client(
            self._proxies,
            dict(http2=http2,
                 timeout=timeout_seconds,
                 follow_redirects=True))
        self._cache = TTLCache(maxsize=cache_maxsize, ttl=cache_ttl_seconds)
        self._cache_lock = threading.Lock()

    @staticmethod
    def _build_client(proxies: Dict[str, str],
                      client_kwargs: Dict[str, Any]) -> httpx.Client:
        """Build an httpx.Client, coping with the proxy API of the installed httpx.

        Tries, in order: `proxy=` (only when every scheme uses the same proxy),
        `proxies=`, and finally per-scheme transports passed via `mounts=`.
        A TypeError from the constructor is taken to mean the keyword is not
        supported by the installed httpx version.
        """
        if not proxies:
            return httpx.Client(**client_kwargs)

        proxy_urls = list(proxies.values())
        first_url = proxy_urls[0]
        if first_url and all(url == first_url for url in proxy_urls):
            try:
                return httpx.Client(proxy=first_url, **client_kwargs)
            except TypeError:
                pass

        try:
            return httpx.Client(proxies=proxies, **client_kwargs)
        except TypeError:
            pass

        # `mounts` maps URL patterns to transports (not httpx.Proxy objects).
        # Mounted transports do not inherit the client's http2 setting, so it
        # is passed to each transport explicitly.
        mounts: Dict[str, httpx.HTTPTransport] = {}
        for scheme_key, url in proxies.items():
            pattern = scheme_key if '://' in scheme_key else f"{scheme_key}://"
            mounts[pattern] = httpx.HTTPTransport(
                proxy=url, http2=client_kwargs.get('http2', False))
        return httpx.Client(mounts=mounts, **client_kwargs)

    @property
    def proxies(self) -> Dict[str, str]:
        """The proxy mapping this client was created with (possibly empty)."""
        return self._proxies

    def _cache_key(
            self,
            method: str,
            url: str,
            headers: Optional[Dict[str, str]],
            cookies: Optional[Dict[str, str]] = None,
    ) -> Tuple[str, str, Tuple[Tuple[str, str], ...], Tuple[Tuple[str, str], ...]]:
        """Return a hashable cache key for a request.

        Cookies are part of the key so that a response fetched with one set
        of cookies is never served to a request carrying different ones.
        """
        normalized_headers = tuple(sorted((headers or {}).items()))
        normalized_cookies = tuple(sorted((cookies or {}).items()))
        return (method.upper(), url, normalized_headers, normalized_cookies)

    def get(self,
            url: str,
            headers: Optional[Dict[str, str]] = None,
            cookies: Optional[Dict[str, str]] = None,
            retries: int = 2,
            backoff_seconds: float = 0.5,
            use_cache: bool = False) -> httpx.Response:
        """Send a GET request, retrying on httpx errors.

        Args:
            url: the URL to fetch
            headers: optional per-request headers
            cookies: optional per-request cookies
            retries: number of additional attempts after the first failure
                (negative values are treated as 0)
            backoff_seconds: base delay between attempts, doubled each retry
            use_cache: serve/store HTTP 200 responses from the TTL cache

        Returns:
            The httpx.Response (possibly a cached one when use_cache is set).

        Raises:
            httpx.HTTPError: if the final attempt fails.
        """
        key = None
        if use_cache:
            key = self._cache_key('GET', url, headers, cookies)
            with self._cache_lock:
                cached = self._cache.get(key)
            if cached is not None:
                return cached

        last_attempt = max(retries, 0)
        for attempt in range(last_attempt + 1):
            try:
                response = self._client.get(
                    url, headers=headers, cookies=cookies)
            except httpx.HTTPError:
                if attempt == last_attempt:
                    raise
                # Exponential backoff before the next attempt
                time.sleep(backoff_seconds * (2 ** attempt))
                continue

            if use_cache and response.status_code == 200:
                with self._cache_lock:
                    self._cache[key] = response
            return response

        # Unreachable: the loop above always returns or raises
        raise httpx.HTTPError('Unknown HTTP error')

    def close(self) -> None:
        """Close the underlying httpx client."""
        self._client.close()

40
app/services/provider.py Normal file
View file

@ -0,0 +1,40 @@
import os
from typing import Dict, Tuple
from app.services.http_client import HttpxClient
_clients: Dict[tuple, HttpxClient] = {}
def _proxies_key(
        proxies: Dict[str, str]
) -> Tuple[Tuple[Tuple[str, str], ...], Tuple[Tuple[str, str], ...]]:
    """Return a hashable, order-independent key for a proxies mapping.

    Args:
        proxies: mapping of scheme to proxy URL (may be empty or None)

    Returns:
        A pair whose two elements are both the sorted tuple of the mapping's
        (scheme, url) items; a pair of empty tuples when no proxies are given.
    """
    if not proxies:
        return (), ()
    # Sorting makes the key independent of dict insertion order. Both halves
    # hold the same items; the two-element shape is kept so the key layout
    # does not change.
    items = tuple(sorted(proxies.items()))
    return items, items
def get_http_client(proxies: Dict[str, str]) -> HttpxClient:
    """Return the cached HttpxClient for a proxy configuration.

    One client is kept per combination of proxy mapping and HTTP/2 setting;
    it is created on first use and returned on every later call.

    Args:
        proxies: mapping of scheme to proxy URL; falsy values mean no proxy
    """
    # WHOOGLE_HTTP2 defaults to '1'; only these spellings enable HTTP/2
    http2_flag = os.environ.get('WHOOGLE_HTTP2', '1').lower()
    use_http2 = http2_flag in ('1', 'true', 't', 'yes', 'y')

    cache_key = (_proxies_key(proxies or {}), use_http2)
    if _clients.get(cache_key) is None:
        _clients[cache_key] = HttpxClient(
            proxies=proxies or None, http2=use_http2)
    return _clients[cache_key]
def close_all_clients() -> None:
    """Close every cached client and empty the registry.

    Errors raised while closing an individual client are ignored so that the
    remaining clients are still closed.
    """
    # Iterate over a snapshot so the registry can be cleared afterwards
    pooled_clients = list(_clients.values())
    for pooled in pooled_clients:
        try:
            pooled.close()
        except Exception:
            # Best-effort cleanup: keep closing the others
            pass
    _clients.clear()

View file

@ -9,10 +9,14 @@
{% endif %}
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="referrer" content="no-referrer">
{% if bundle_static() %}
<link rel="stylesheet" href="/{{ cb_url('bundle.css') }}">
{% else %}
<link rel="stylesheet" href="{{ cb_url('logo.css') }}">
<link rel="stylesheet" href="{{ cb_url('input.css') }}">
<link rel="stylesheet" href="{{ cb_url('search.css') }}">
<link rel="stylesheet" href="{{ cb_url('header.css') }}">
{% endif %}
{% if config.theme %}
{% if config.theme == 'system' %}
<style>
@ -39,10 +43,14 @@
{{ response|safe }}
</body>
{% include 'footer.html' %}
{% if bundle_static() %}
<script src="/{{ cb_url('bundle.js') }}" defer></script>
{% else %}
{% if autocomplete_enabled == '1' %}
<script src="{{ cb_url('autocomplete.js') }}"></script>
{% endif %}
<script src="{{ cb_url('utils.js') }}"></script>
<script src="{{ cb_url('keyboard.js') }}"></script>
<script src="{{ cb_url('currency.js') }}"></script>
{% endif %}
</html>

View file

@ -10,8 +10,12 @@
{% else %}
<link rel="stylesheet" href="{{ cb_url(('dark' if config.dark else 'light') + '-theme.css') }}"/>
{% endif %}
{% if bundle_static() %}
<link rel="stylesheet" href="/{{ cb_url('bundle.css') }}">
{% else %}
<link rel="stylesheet" href="{{ cb_url('main.css') }}">
<link rel="stylesheet" href="{{ cb_url('error.css') }}">
{% endif %}
<style>{{ config.style }}</style>
<div>
<h1>Error</h1>

View file

@ -155,4 +155,8 @@
</div>
</div>
{% if bundle_static() %}
<script src="/{{ cb_url('bundle.js') }}" defer></script>
{% else %}
<script type="text/javascript" src="{{ cb_url('header.js') }}"></script>
{% endif %}

View file

@ -17,13 +17,21 @@
<meta name="referrer" content="no-referrer">
<meta name="msapplication-TileColor" content="#ffffff">
<meta name="msapplication-TileImage" content="static/img/favicon/ms-icon-144x144.png">
{% if bundle_static() %}
<script src="/{{ cb_url('bundle.js') }}" defer></script>
{% else %}
{% if autocomplete_enabled == '1' %}
<script src="{{ cb_url('autocomplete.js') }}"></script>
{% endif %}
<script type="text/javascript" src="{{ cb_url('controller.js') }}"></script>
{% endif %}
<link rel="search" href="opensearch.xml" type="application/opensearchdescription+xml" title="Whoogle Search">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
{% if bundle_static() %}
<link rel="stylesheet" href="/{{ cb_url('bundle.css') }}">
{% else %}
<link rel="stylesheet" href="{{ cb_url('logo.css') }}">
{% endif %}
{% if config.theme %}
{% if config.theme == 'system' %}
<style>
@ -36,7 +44,9 @@
{% else %}
<link rel="stylesheet" href="{{ cb_url(('dark' if config.dark else 'light') + '-theme.css') }}"/>
{% endif %}
{% if not bundle_static() %}
<link rel="stylesheet" href="{{ cb_url('main.css') }}">
{% endif %}
<noscript>
<style>
#main {

View file

@ -1,5 +1,5 @@
import json
import requests
import httpx
import urllib.parse as urlparse
import os
import glob
@ -63,12 +63,9 @@ def gen_bangs_json(bangs_file: str) -> None:
None
"""
try:
# Request full list from DDG
r = requests.get(DDG_BANGS)
r = httpx.get(DDG_BANGS)
r.raise_for_status()
except requests.exceptions.HTTPError as err:
raise SystemExit(err)
# Convert to json
data = json.loads(r.text)

View file

@ -5,7 +5,7 @@ import io
import os
import re
from requests import exceptions, get
import httpx
from urllib.parse import urlparse
from bs4 import BeautifulSoup as bsoup
from cryptography.fernet import Fernet
@ -97,8 +97,8 @@ def get_proxy_host_url(r: Request, default: str, root=False) -> str:
def check_for_update(version_url: str, current: str) -> int:
# Check for the latest version of Whoogle
has_update = ''
with contextlib.suppress(exceptions.ConnectionError, AttributeError):
update = bsoup(get(version_url).text, 'html.parser')
with contextlib.suppress(httpx.RequestError, AttributeError):
update = bsoup(httpx.get(version_url).text, 'html.parser')
latest = update.select_one('[class="Link--primary"]').string[1:]
current = int(''.join(filter(str.isdigit, current)))
latest = int(''.join(filter(str.isdigit, latest)))

View file

@ -55,7 +55,7 @@ class Search:
config: the current user config settings
session_key: the flask user fernet key
"""
def __init__(self, request, config, session_key, cookies_disabled=False):
def __init__(self, request, config, session_key, cookies_disabled=False, user_request=None):
method = request.method
self.request = request
self.request_params = request.args if method == 'GET' else request.form
@ -66,6 +66,7 @@ class Search:
self.query = ''
self.widget = ''
self.cookies_disabled = cookies_disabled
self.user_request = user_request
self.search_type = self.request_params.get(
'tbm') if 'tbm' in self.request_params else ''
@ -152,7 +153,8 @@ class Search:
# and self.config.view_image
# and not g.user_request.mobile)
get_body = g.user_request.send(query=full_query,
client = self.user_request or g.user_request
get_body = client.send(query=full_query,
force_mobile=self.config.view_image,
user_agent=self.user_agent)
@ -166,7 +168,7 @@ class Search:
# html_soup = content_filter.view_image(html_soup)
# Indicate whether or not a Tor connection is active
if g.user_request.tor_valid:
if (self.user_request or g.user_request).tor_valid:
html_soup.insert(0, bsoup(TOR_BANNER, 'html.parser'))
formatted_results = content_filter.clean(html_soup)

View file

@ -1,6 +1,6 @@
import json
import pathlib
import requests
import httpx
lingva = 'https://lingva.ml/api/v1/en'
@ -25,7 +25,7 @@ def translate(v: str, lang: str) -> str:
lingva_req = f'{lingva}/{lang}/{v}'
response = requests.get(lingva_req).json()
response = httpx.get(lingva_req).json()
if 'translation' in response:
return response['translation']

View file

@ -1,3 +1,16 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
[tool.ruff]
line-length = 100
target-version = "py311"
lint.select = [
"E", "F", "W", # pycodestyle/pyflakes
"I", # isort
]
lint.ignore = []
[tool.black]
line-length = 100
target-version = ['py311']

View file

@ -1,10 +1,8 @@
attrs==22.2.0
beautifulsoup4==4.11.2
brotli==1.0.9
cachelib==0.10.2
certifi==2024.7.4
cffi==1.17.1
chardet==5.1.0
click==8.1.3
cryptography==3.3.2; platform_machine == 'armv7l'
cryptography==45.0.7; platform_machine != 'armv7l'
@ -23,13 +21,14 @@ pycparser==2.22
pyOpenSSL==19.1.0; platform_machine == 'armv7l'
pyOpenSSL==25.3.0; platform_machine != 'armv7l'
pyparsing==3.0.9
PySocks==1.7.1
pytest==7.2.1
python-dateutil==2.8.2
requests==2.32.2
httpx[http2,socks]==0.28.1
cachetools==5.5.0
soupsieve==2.4
stem==1.8.1
urllib3==1.26.19
httpcore>=1.0.9
h11>=0.16.0
validators==0.22.0
waitress==3.0.1
wcwidth==0.2.6

View file

@ -25,7 +25,7 @@ install_requires=
defusedxml
Flask
python-dotenv
requests
httpx[http2,socks]
stem
validators
waitress

110
test/test_alts.py Normal file
View file

@ -0,0 +1,110 @@
import copy
import os
from bs4 import BeautifulSoup
from app.filter import Filter
from app.models.config import Config
from app.utils.session import generate_key
from app.utils import results as results_mod
def build_soup(html: str):
    """Parse an HTML string with the 'html.parser' backend."""
    parsed = BeautifulSoup(html, 'html.parser')
    return parsed
def make_filter(soup: BeautifulSoup):
    """Return a Filter with the "alts" config enabled, operating on `soup`."""
    alts_filter = Filter(
        user_key=generate_key(),
        config=Config(**{'alts': True}))
    alts_filter.soup = soup
    return alts_filter
def test_no_duplicate_alt_prefix_reddit(monkeypatch):
    """An alt that contains the original domain must not be applied twice."""
    saved_alts = copy.deepcopy(results_mod.SITE_ALTS)
    try:
        # Simulate user setting alt to old.reddit.com
        monkeypatch.setitem(
            results_mod.SITE_ALTS, 'reddit.com', 'old.reddit.com')

        page = build_soup('''
<div id="main">
<a href="https://www.reddit.com/r/whoogle">www.reddit.com</a>
<div>www.reddit.com</div>
<div>old.reddit.com</div>
</div>
''')
        make_filter(page).site_alt_swap()

        # Href replaced once
        assert page.find('a')['href'].startswith('https://old.reddit.com')

        # Bare domain replaced, but text that already is the alt stays
        # unchanged (no old.old...)
        div_texts = []
        for div in page.find_all('div'):
            content = div.get_text()
            if content.strip():
                div_texts.append(content)
        assert 'old.reddit.com' in div_texts
        assert 'old.old.reddit.com' not in ''.join(div_texts)
    finally:
        results_mod.SITE_ALTS.clear()
        results_mod.SITE_ALTS.update(saved_alts)
def test_wikipedia_simple_no_lang_param(monkeypatch):
    """simple.wikipedia.org links are swapped to the alt without a ?lang param."""
    saved_alts = copy.deepcopy(results_mod.SITE_ALTS)
    try:
        monkeypatch.setitem(
            results_mod.SITE_ALTS, 'wikipedia.org', 'https://wikiless.example')

        page = build_soup('''
<div id="main">
<a href="https://simple.wikipedia.org/wiki/Whoogle">https://simple.wikipedia.org/wiki/Whoogle</a>
<div>simple.wikipedia.org</div>
</div>
''')
        make_filter(page).site_alt_swap()

        # Should be rewritten to the alt host, without ?lang
        href = page.find('a')['href']
        assert href.startswith('https://wikiless.example')
        assert '?lang=' not in href

        # Description host replaced once
        description = page.find('div').get_text()
        assert 'wikiless.example' in description
        assert 'simple.wikipedia.org' not in description
    finally:
        results_mod.SITE_ALTS.clear()
        results_mod.SITE_ALTS.update(saved_alts)
def test_single_pass_description_replacement(monkeypatch):
    """Link descriptions get their host swapped exactly once."""
    saved_alts = copy.deepcopy(results_mod.SITE_ALTS)
    try:
        monkeypatch.setitem(
            results_mod.SITE_ALTS, 'twitter.com', 'https://nitter.example')

        page = build_soup('''
<div id="main">
<a href="https://twitter.com/whoogle">https://twitter.com/whoogle</a>
<div>https://www.twitter.com</div>
</div>
''')
        make_filter(page).site_alt_swap()

        assert page.find('a')['href'].startswith('https://nitter.example')

        # Ensure description got host swapped once, no double scheme or
        # duplication
        description = page.find('div').get_text()
        assert description.startswith('https://nitter.example')
        for unwanted in ('https://https://', 'nitter.examplenitter.example'):
            assert unwanted not in description
    finally:
        results_mod.SITE_ALTS.clear()
        results_mod.SITE_ALTS.update(saved_alts)

View file

@ -0,0 +1,29 @@
from app.request import Request
from app.models.config import Config
class FakeHttpClient:
    """Stand-in HTTP client whose get() always returns canned autocomplete XML."""

    def get(self, url, headers=None, cookies=None, retries=0, backoff_seconds=0.5, use_cache=False):
        """Return an object exposing the canned XML as its `text` attribute.

        All arguments are accepted for signature compatibility and ignored.
        """
        # Minimal XML in Google Toolbar Autocomplete format
        body_lines = [
            '<?xml version="1.0"?>\n',
            '<topp>\n',
            ' <CompleteSuggestion><suggestion data="whoogle"/></CompleteSuggestion>\n',
            ' <CompleteSuggestion><suggestion data="whoogle search"/></CompleteSuggestion>\n',
            '</topp>',
        ]
        response_cls = type('R', (), {'text': ''.join(body_lines)})
        return response_cls()

    def close(self):
        """Nothing to release."""
        pass
def test_autocomplete_parsing():
    """Request.autocomplete extracts the suggestions from the XML response."""
    fake_client = FakeHttpClient()
    req = Request(
        normal_ua='UA',
        root_path='http://localhost:5000',
        config=Config(**{}),
        http_client=fake_client)

    suggestions = req.autocomplete('who')

    for expected in ('whoogle', 'whoogle search'):
        assert expected in suggestions

33
test/test_http_client.py Normal file
View file

@ -0,0 +1,33 @@
import types
import httpx
import pytest
from app.services.http_client import HttpxClient
def test_httpxclient_follow_redirects_and_proxy(monkeypatch):
    """``HttpxClient`` builds one ``httpx.Client`` with redirects and a proxy.

    ``httpx.Client`` is swapped for a recorder so the keyword arguments passed
    by the ``HttpxClient`` constructor can be inspected without any network.
    """
    recorded_kwargs = []

    class FakeClient:
        """Stand-in for ``httpx.Client`` that records constructor kwargs."""

        def __init__(self, *args, **kwargs):
            recorded_kwargs.append(kwargs)

        def get(self, *args, **kwargs):
            class R:
                status_code = 200
                text = ''

            return R()

        def close(self):
            pass

    monkeypatch.setattr(httpx, 'Client', FakeClient)

    socks_proxy = 'socks5://127.0.0.1:9050'
    # Only the construction side effect matters; the instance is kept bound
    # so it stays alive for the duration of the assertions.
    _client = HttpxClient(proxies={'http': socks_proxy, 'https': socks_proxy})

    # Ensure the constructor attempted to set follow_redirects and one of proxy/proxies
    assert len(recorded_kwargs) == 1
    ctor_kwargs = recorded_kwargs[0]
    assert ctor_kwargs.get('follow_redirects') is True
    assert any(key in ctor_kwargs for key in ('proxy', 'proxies', 'mounts'))

73
test/test_json.py Normal file
View file

@ -0,0 +1,73 @@
import json
import types
import pytest
from app.models.endpoint import Endpoint
from app.utils import search as search_mod
@pytest.fixture
def stubbed_search_response(monkeypatch):
    """Patch ``Search`` so requests get a fixed query and a fixed HTML body.

    ``new_search_query`` is replaced to set and return the query ``'whoogle'``;
    ``generate_response`` is replaced to return a snippet holding two absolute
    links and one relative link.
    """
    # Return a minimal filtered HTML snippet with a couple of links
    canned_html = ''.join((
        '<div id="main">',
        ' <a href="https://example.com/page">Example Page</a>',
        ' <a href="/relative">Relative</a>',
        ' <a href="https://example.org/other">Other</a>',
        '</div>',
    ))

    # Stub Search.new_search_query to return a stable query
    def fake_new_query(self):
        self.query = 'whoogle'
        return self.query

    def fake_generate(self):
        return canned_html

    for attr_name, replacement in (
        ('new_search_query', fake_new_query),
        ('generate_response', fake_generate),
    ):
        monkeypatch.setattr(search_mod.Search, attr_name, replacement)
def test_search_json_accept(client, stubbed_search_response):
    """An ``Accept: application/json`` header yields JSON search results.

    The payload must echo the query, list both absolute links from the stubbed
    page, and leave out the relative one.
    """
    response = client.get(
        f'/{Endpoint.search}?q=whoogle',
        headers={'Accept': 'application/json'},
    )
    assert response._status_code == 200

    payload = json.loads(response.data)
    assert payload['query'] == 'whoogle'
    assert isinstance(payload['results'], list)

    hrefs = {entry['href'] for entry in payload['results']}
    assert 'https://example.com/page' in hrefs
    assert 'https://example.org/other' in hrefs
    # Relative href should be excluded
    assert all(not href.endswith('/relative') for href in hrefs)
def test_search_json_format_param(client, stubbed_search_response):
    """``format=json`` in the query string also selects the JSON response."""
    response = client.get(f'/{Endpoint.search}?q=whoogle&format=json')
    assert response._status_code == 200

    payload = json.loads(response.data)
    assert payload['query'] == 'whoogle'
    assert len(payload['results']) >= 2
def test_search_json_feeling_lucky(client, monkeypatch):
    """A feeling-lucky query in JSON mode answers 303 with the redirect URL."""

    # Force query to be interpreted as feeling lucky and return a redirect URL
    def fake_new_query(self):
        # emulate behavior of new_search_query setting feeling_lucky
        self.query = 'whoogle !'
        self.feeling_lucky = True
        return self.query

    def fake_generate(self):
        return 'https://example.com/lucky'

    search_cls = search_mod.Search
    monkeypatch.setattr(search_cls, 'new_search_query', fake_new_query)
    monkeypatch.setattr(search_cls, 'generate_response', fake_generate)

    response = client.get(
        f'/{Endpoint.search}?q=whoogle%20!',
        headers={'Accept': 'application/json'},
    )
    assert response._status_code == 303

    payload = json.loads(response.data)
    assert payload['redirect'] == 'https://example.com/lucky'

View file

@ -3,6 +3,7 @@ from app.filter import Filter
from app.models.config import Config
from app.models.endpoint import Endpoint
from app.utils import results
from app.utils import search as search_mod
from app.utils.session import generate_key
from datetime import datetime
from dateutil.parser import ParserError, parse
@ -32,18 +33,24 @@ def get_search_results(data):
return result_divs
def test_get_results(client, monkeypatch):
    """A search page exposes between 10 and 15 result divs.

    ``Search.generate_response`` is stubbed with ten canned results under
    ``#main`` so the test does not depend on a live upstream search.
    """

    def fake_generate(self):
        # Build 10 results under #main, each with a single inner div
        items = [
            f'<div><div><a href="https://example.com/{i}">Item {i}</a></div></div>'
            for i in range(10)
        ]
        return f'<div id="main">{"".join(items)}</div>'

    monkeypatch.setattr(search_mod.Search, 'generate_response', fake_generate)

    rv = client.get(f'/{Endpoint.search}?q=test')
    assert rv._status_code == 200

    # Depending on the search, there can be more
    # than 10 result divs
    results_divs = get_search_results(rv.data)
    assert len(results_divs) >= 10
    assert len(results_divs) <= 15
def test_post_results(client):
@ -87,9 +94,12 @@ def test_block_results(client):
assert result_site not in 'pinterest.com'
def test_view_my_ip(client):
# FIXME: Temporary fix while #1211 is investigated
return
def test_view_my_ip(client, monkeypatch):
def fake_generate(self):
# Minimal page; ip card is injected later by routes when widget == 'ip'
return '<div id="main"></div>'
monkeypatch.setattr(search_mod.Search, 'generate_response', fake_generate)
rv = client.get(f'/{Endpoint.search}?q=my ip address')
assert rv._status_code == 200
@ -100,9 +110,16 @@ def test_view_my_ip(client):
assert '127.0.0.1' in str_data
def test_recent_results(client):
# FIXME: Temporary fix while #1211 is investigated
return
def test_recent_results(client, monkeypatch):
def fake_generate(self):
# Create results with a span containing today's date so it passes all windows
today = datetime.now().strftime('%b %d, %Y')
items = []
for i in range(5):
items.append(f'<div><div><span>{today}</span></div></div>')
return f'<div id="main">{"".join(items)}</div>'
monkeypatch.setattr(search_mod.Search, 'generate_response', fake_generate)
times = {
'tbs=qdr:y': 365,

26
test/test_routes_json.py Normal file
View file

@ -0,0 +1,26 @@
import json
import pytest
from app.models.endpoint import Endpoint
from app.utils import search as search_mod
def test_captcha_json_block(client, monkeypatch):
    """A captcha marker in the upstream body yields a 503 JSON error.

    The JSON body must carry ``blocked: true`` and an ``error_message`` key.
    """

    def fake_new_query(self):
        self.query = 'test'
        return self.query

    def fake_generate(self):
        # Inject a captcha marker into HTML so route returns 503 JSON
        return '<div>div class="g-recaptcha"</div>'

    search_cls = search_mod.Search
    monkeypatch.setattr(search_cls, 'new_search_query', fake_new_query)
    monkeypatch.setattr(search_cls, 'generate_response', fake_generate)

    response = client.get(f'/{Endpoint.search}?q=test&format=json')
    assert response._status_code == 503

    payload = json.loads(response.data)
    assert payload['blocked'] is True
    assert 'error_message' in payload

45
test/test_tor.py Normal file
View file

@ -0,0 +1,45 @@
import pytest
from app.request import Request, TorError
from app.models.config import Config
class FakeResponse:
    """Bare response object exposing ``text``, ``status_code`` and ``content``."""

    def __init__(self, text: str = '', status_code: int = 200, content: bytes = b''):
        self.text = text
        self.status_code = status_code
        # Any falsy value (including None) is stored as empty bytes.
        self.content = content if content else b''
class FakeHttpClient:
    """HTTP client stub whose Tor-check answer is fixed at construction."""

    def __init__(self, tor_ok: bool):
        self._tor_ok = tor_ok

    def get(self, url, headers=None, cookies=None, retries=0, backoff_seconds=0.5, use_cache=False):
        """Return a canned ``FakeResponse`` chosen by the requested URL.

        URLs containing ``check.torproject.org`` get a body of
        ``'Congratulations'`` or ``'Not Tor'`` depending on ``tor_ok``; every
        other URL gets an empty-text 200 response with content ``b'OK'``.
        """
        if 'check.torproject.org' not in url:
            return FakeResponse(text='', status_code=200, content=b'OK')

        if self._tor_ok:
            verdict = 'Congratulations'
        else:
            verdict = 'Not Tor'
        return FakeResponse(text=verdict)

    def close(self):
        """No-op; there is no connection to release."""
        pass
def build_config(tor: bool) -> Config:
    """Return a ``Config`` built with only the ``tor`` option supplied."""
    # Minimal config with tor flag
    return Config(tor=tor)
def test_tor_validation_success():
    """With a passing Tor check, ``send`` succeeds and ``tor_valid`` is set."""
    request = Request(
        normal_ua='TestUA',
        root_path='http://localhost:5000',
        config=build_config(tor=True),
        http_client=FakeHttpClient(tor_ok=True),
    )

    response = request.send(base_url='https://example.com', query='')

    assert request.tor_valid is True
    assert response.status_code == 200
def test_tor_validation_failure():
    """With a failing Tor check, ``send`` raises ``TorError``."""
    request = Request(
        normal_ua='TestUA',
        root_path='http://localhost:5000',
        config=build_config(tor=True),
        http_client=FakeHttpClient(tor_ok=False),
    )

    with pytest.raises(TorError):
        request.send(base_url='https://example.com', query='')