# Mirror of https://github.com/Lissy93/awesome-privacy.git
# Synced 2026-03-11 08:55:33 +00:00 - 293 lines, 9.8 KiB, Python
"""Validates data quality for added/modified services using the diff JSON."""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
|
|
import yaml
|
|
|
|
# Repository root: three directory levels above this script's absolute path.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.dirname(os.path.dirname(_SCRIPT_DIR))

# Files read and written by main().
DATA_PATH = os.path.join(PROJECT_ROOT, "awesome-privacy.yml")
DIFF_PATH = "/tmp/pr-diff.json"
FINDINGS_PATH = "/tmp/findings-data.json"

# Service fields that check_required_fields() treats as mandatory.
REQUIRED_FIELDS = ("name", "description", "url", "icon")

# Base URL used to build the documentation links embedded in the messages.
CONTRIBUTING = "https://github.com/Lissy93/awesome-privacy/blob/main/.github/CONTRIBUTING.md"

# --- Finding messages -------------------------------------------------------
# Names ending in _TPL / containing {placeholders} are filled in with
# str.format() by the check that emits them.

SCHEMA_MSG = (
    "Some of the schema checks have failed. "
    "Please check that your addition contains all the required fields, "
    "with acceptable values, nothing additional "
    "and that it is following valid YAML syntax"
)

MULTIPLE_MSG = "Please make just one addition per pull request"

# Placeholder: {fields}
MISSING_TPL = (
    "Did you include all required fields? "
    "Looks like {fields} is missing or invalid. "
    "Please see the [required fields](" + CONTRIBUTING + "#service-fields) "
    "for available fields."
)

POSITION_MSG = (
    "New entries must be added to the end of the section, "
    "unless otherwise requested"
)

OPENSOURCE_MSG = (
    "You indicated this app/service is not open source. "
    "This will likely make it ineligible for listing on Awesome Privacy "
    f"in accordance with our [Requirements]({CONTRIBUTING}#requirements). "
    "Please ensure that this is justified in your PR body."
)

# Placeholders: {name}, {location}
DUPLICATE_NAME_MSG = (
    "A service named `{name}` already exists (in {location}). "
    "If this is a different service, "
    "please clarify in your PR description"
)

# Placeholders: {url}, {existing}
DUPLICATE_URL_MSG = (
    "The URL `{url}` is already associated with `{existing}`. "
    "Please check this isn't a duplicate submission"
)

# Placeholder: {length}
DESC_LENGTH_MSG = (
    "Description length ({length} chars) is outside the recommended "
    "50\u2013250 character range. "
    "Please see our [Contributing Guidelines](" + CONTRIBUTING + "#description)"
)

OPENSOURCE_GITHUB_MSG = (
    "You marked this service as open source "
    "but didn't include a `github` field. "
    "Please add the repository link"
)
|
|
|
|
|
|
def load_json(path):
    """Load and parse a JSON file, returning None on any error.

    The file is decoded as UTF-8 explicitly instead of with the platform's
    locale encoding, so non-ASCII content parses the same on every runner.

    Args:
        path: Location of the JSON file.

    Returns:
        The parsed JSON value, or None if the file cannot be opened,
        decoded or parsed.
    """
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except Exception:
        # Deliberately best-effort: main() skips the checks when this is None.
        return None
|
|
|
|
|
|
def load_yaml_data(path):
    """Load and parse a YAML file, returning None on any error.

    The file is decoded as UTF-8 explicitly instead of with the platform's
    locale encoding, so non-ASCII content parses the same on every runner.
    Parsing uses ``yaml.safe_load``, which only builds plain Python data.

    Args:
        path: Location of the YAML file.

    Returns:
        The parsed document, or None if the file cannot be opened,
        decoded or parsed.
    """
    try:
        with open(path, encoding="utf-8") as f:
            return yaml.safe_load(f)
    except Exception:
        # Deliberately best-effort: the checks that need this data skip
        # themselves when it is None.
        return None
|
|
|
|
|
|
def find_section_services(head, category, section):
    """Locate the services list of one section in the parsed data file.

    Args:
        head: Parsed data mapping with a top-level "categories" list.
        category: Category name to match exactly.
        section: Section name to match exactly within that category.

    Returns:
        The "services" value of the first matching section (an empty list
        when the section has no "services" key), or None when no
        category/section pair matches.
    """
    # Lazily walk categories, then sections, in document order; next() stops
    # at the first section whose names match.
    matches = (
        sec.get("services", [])
        for cat in head.get("categories", [])
        if cat.get("name") == category
        for sec in cat.get("sections", [])
        if sec.get("name") == section
    )
    return next(matches, None)
|
|
|
|
|
|
def find_service_fields(head, category, section, service_name):
    """Fetch a single service entry from the parsed data file.

    Args:
        head: Parsed data mapping passed through to find_section_services().
        category: Category name the service lives in.
        section: Section name the service lives in.
        service_name: Exact "name" value of the wanted service.

    Returns:
        The first service mapping in that section whose "name" equals
        ``service_name``, or None when the section or service is not found.
    """
    # A missing or empty section yields nothing to scan.
    candidates = find_section_services(head, category, section) or ()
    return next(
        (entry for entry in candidates if entry.get("name") == service_name),
        None,
    )
|
|
|
|
|
|
def check_required_fields(diff, head):
    """Report required fields that are absent on added or modified services.

    Args:
        diff: Parsed diff mapping; reads ``services.added`` and
            ``services.modified``.
        head: Parsed data file, or a falsy value when unavailable (modified
            services are then not checked).

    Returns:
        MISSING_TPL filled with the sorted, backtick-quoted field names, or
        None when nothing is missing.
    """
    absent = set()

    # Added services carry their field values in the diff itself.
    for entry in diff.get("services", {}).get("added", []):
        values = entry.get("fields", {})
        absent.update(key for key in REQUIRED_FIELDS if values.get(key) is None)

    # Modified services only list which fields changed, so the current values
    # are looked up in the data file; only changed required fields count.
    for entry in diff.get("services", {}).get("modified", []):
        if not head:
            continue
        touched = entry.get("changed_fields", [])
        current = find_service_fields(
            head, entry["category"], entry["section"], entry["service"]
        )
        if not current:
            continue
        absent.update(
            key
            for key in REQUIRED_FIELDS
            if key in touched and current.get(key) is None
        )

    if not absent:
        return None
    listed = ", ".join(f"`{key}`" for key in sorted(absent))
    return MISSING_TPL.format(fields=listed)
|
|
|
|
|
|
def check_position(diff, head):
    """Report an added service that is not the last entry of its section.

    Args:
        diff: Parsed diff mapping; reads ``services.added``, where each entry
            must provide "category", "section" and "service".
        head: Parsed data file, or a falsy value when unavailable.

    Returns:
        POSITION_MSG for the first added service whose section ends with a
        differently named service, otherwise None. Sections that cannot be
        found or are empty are skipped.
    """
    if not head:
        return None

    for entry in diff.get("services", {}).get("added", []):
        siblings = find_section_services(head, entry["category"], entry["section"])
        if not siblings:
            continue
        tail = siblings[-1]
        if tail.get("name") != entry["service"]:
            return POSITION_MSG
    return None
|
|
|
|
|
|
def check_open_source(diff):
    """Report an added service that does not look open source.

    A service is reported when its ``openSource`` field is anything other
    than the boolean True *and* it has no truthy ``github`` field.

    Args:
        diff: Parsed diff mapping; reads ``services.added[*].fields``.

    Returns:
        OPENSOURCE_MSG for the first such service, otherwise None.
    """
    for entry in diff.get("services", {}).get("added", []):
        values = entry.get("fields", {})
        # Either signal is enough to let the entry pass.
        if values.get("openSource") is True or values.get("github"):
            continue
        return OPENSOURCE_MSG
    return None
|
|
|
|
|
|
def check_single_entry(diff):
    """Report a diff that introduces more than one new service or section.

    Args:
        diff: Parsed diff mapping; reads ``services.added`` and the
            top-level ``sections`` list.

    Returns:
        MULTIPLE_MSG when more than one service was added, or when no
        service was added but more than one section has
        ``change_type == "added_section"``; otherwise None.
    """
    total = len(diff.get("services", {}).get("added", []))
    if total > 1:
        return MULTIPLE_MSG
    if total == 1:
        # Exactly one added service: new sections are not counted.
        return None

    new_sections = sum(
        1
        for sec in diff.get("sections", [])
        if sec.get("change_type") == "added_section"
    )
    return MULTIPLE_MSG if new_sections > 1 else None
|
|
|
|
|
|
def _added_keys(diff):
|
|
"""Build a set of (category, section, lowercase_name) for added services."""
|
|
keys = set()
|
|
for svc in diff.get("services", {}).get("added", []):
|
|
name = svc.get("fields", {}).get("name", "").lower().strip()
|
|
keys.add((svc.get("category", ""), svc.get("section", ""), name))
|
|
return keys
|
|
|
|
|
|
def build_name_index(head, diff):
    """Map every existing service name to where it is listed.

    Services that the diff itself adds (same category, section and
    normalised name) are left out, so an addition is not reported as a
    duplicate of itself.

    Args:
        head: Parsed data file, or a falsy value when unavailable.
        diff: Parsed diff mapping, used to find the added services.

    Returns:
        ``{normalised_name: "category > section"}`` where the name is
        lower-cased and stripped. When a name occurs more than once, the
        last occurrence wins. Empty when ``head`` is falsy.
    """
    if not head:
        return {}

    skip = _added_keys(diff)
    locations = {}
    for category in head.get("categories", []):
        cat_name = category.get("name", "")
        for section in category.get("sections", []):
            sec_name = section.get("name", "")
            where = f"{cat_name} > {sec_name}"
            for service in section.get("services", []):
                key = service.get("name", "").lower().strip()
                if not key or (cat_name, sec_name, key) in skip:
                    continue
                locations[key] = where
    return locations
|
|
|
|
|
|
def build_url_index(head, diff):
    """Map every existing service URL to the name of its service.

    Services that the diff itself adds (same category, section and
    normalised name) are left out, so an addition is not reported as a
    duplicate of itself.

    Args:
        head: Parsed data file, or a falsy value when unavailable.
        diff: Parsed diff mapping, used to find the added services.

    Returns:
        ``{url: service_name}`` keyed by the exact ``url`` string; services
        without a URL are skipped and, for repeated URLs, the last
        occurrence wins. Empty when ``head`` is falsy.
    """
    if not head:
        return {}

    skip = _added_keys(diff)
    owners = {}
    for category in head.get("categories", []):
        cat_name = category.get("name", "")
        for section in category.get("sections", []):
            sec_name = section.get("name", "")
            for service in section.get("services", []):
                display_name = service.get("name", "")
                if (cat_name, sec_name, display_name.lower().strip()) in skip:
                    continue
                link = service.get("url", "")
                if link:
                    owners[link] = display_name
    return owners
|
|
|
|
|
|
def check_duplicate_name(diff, name_index):
    """Report an added service whose name is already listed.

    Args:
        diff: Parsed diff mapping; reads ``services.added[*].fields.name``.
        name_index: Mapping of lower-cased, stripped service names to a
            location string.

    Returns:
        DUPLICATE_NAME_MSG filled with the name as submitted and the indexed
        location for the first clash, otherwise None.
    """
    for entry in diff.get("services", {}).get("added", []):
        given = entry.get("fields", {}).get("name", "")
        key = given.lower().strip()
        if not key or key not in name_index:
            continue
        return DUPLICATE_NAME_MSG.format(name=given, location=name_index[key])
    return None
|
|
|
|
|
|
def check_duplicate_url(diff, url_index):
    """Report an added service whose URL is already listed.

    Args:
        diff: Parsed diff mapping; reads ``services.added[*].fields.url``.
        url_index: Mapping of URL strings to the owning service name.

    Returns:
        DUPLICATE_URL_MSG filled with the URL and the existing service name
        for the first exact match, otherwise None.
    """
    for entry in diff.get("services", {}).get("added", []):
        link = entry.get("fields", {}).get("url", "")
        if not link or link not in url_index:
            continue
        return DUPLICATE_URL_MSG.format(url=link, existing=url_index[link])
    return None
|
|
|
|
|
|
def check_description_length(diff):
    """Report an added service whose description is outside 50-250 chars.

    The bounds are inclusive and match the range quoted in DESC_LENGTH_MSG.
    A missing or null description is treated as empty and is therefore
    reported as too short.

    Args:
        diff: Parsed diff mapping; reads
            ``services.added[*].fields.description``.

    Returns:
        DESC_LENGTH_MSG filled with the offending length for the first
        out-of-range description, otherwise None.
    """
    for svc in diff.get("services", {}).get("added", []):
        # `or ""` also covers an explicit null, which would break len().
        desc = svc.get("fields", {}).get("description") or ""
        length = len(desc)
        if length < 50 or length > 250:
            return DESC_LENGTH_MSG.format(length=length)
    return None
|
|
|
|
|
|
def check_opensource_github(diff):
    """Report an added open-source service that has no repository link.

    Args:
        diff: Parsed diff mapping; reads ``services.added[*].fields``.

    Returns:
        OPENSOURCE_GITHUB_MSG for the first added service whose
        ``openSource`` is the boolean True while ``github`` is missing or
        falsy, otherwise None.
    """
    for entry in diff.get("services", {}).get("added", []):
        values = entry.get("fields", {})
        if values.get("openSource") is not True:
            continue
        if not values.get("github"):
            return OPENSOURCE_GITHUB_MSG
    return None
|
|
|
|
|
|
def main():
    """Run all data-quality checks and write the findings to FINDINGS_PATH.

    Reads the diff from DIFF_PATH and the data file from DATA_PATH, runs the
    checks in a fixed order and dumps the collected findings as a JSON list.
    Schema and required-field findings are recorded as
    ``{"msg": ..., "level": "error"}`` mappings; every other finding is
    appended as its plain message string.

    The checks are best-effort: an unexpected exception stops the remaining
    checks but keeps the findings collected so far. The exception is
    reported on stderr instead of being discarded silently.

    Exits with status 1 when a required field is missing, otherwise 0.
    """
    findings = []
    critical = False
    try:
        # Outcome of a schema validation step, handed over via the environment.
        if os.environ.get("SCHEMA_OUTCOME") == "failure":
            findings.append({"msg": SCHEMA_MSG, "level": "error"})

        diff = load_json(DIFF_PATH)
        head = load_yaml_data(DATA_PATH)

        if diff:
            finding = check_single_entry(diff)
            if finding:
                findings.append(finding)

            # The only check that makes the run fail.
            finding = check_required_fields(diff, head)
            if finding:
                findings.append({"msg": finding, "level": "error"})
                critical = True

            finding = check_position(diff, head)
            if finding:
                findings.append(finding)

            finding = check_open_source(diff)
            if finding:
                findings.append(finding)

            name_index = build_name_index(head, diff)
            url_index = build_url_index(head, diff)

            finding = check_duplicate_name(diff, name_index)
            if finding:
                findings.append(finding)

            finding = check_duplicate_url(diff, url_index)
            if finding:
                findings.append(finding)

            finding = check_description_length(diff)
            if finding:
                findings.append(finding)

            finding = check_opensource_github(diff)
            if finding:
                findings.append(finding)
    except Exception as exc:
        # Still write out what was collected, but leave a trace of the
        # failure so skipped checks can be diagnosed.
        print(f"Data-quality checks aborted early: {exc!r}", file=sys.stderr)

    with open(FINDINGS_PATH, "w") as f:
        json.dump(findings, f)
    sys.exit(1 if critical else 0)
|
|
|
|
|
|
# Run the checks only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|