From 2941848893dcbbba7f52a15821cc3655306d3610 Mon Sep 17 00:00:00 2001
From: Marc
Date: Wed, 17 Dec 2025 21:41:47 +0100
Subject: [PATCH] Refactor getTrending.py for improved structure

---
 .github/getTrending.py | 237 ++++++++++++++++-----------------------
 1 file changed, 90 insertions(+), 147 deletions(-)

diff --git a/.github/getTrending.py b/.github/getTrending.py
index c363be6a4d..dc903c5d35 100644
--- a/.github/getTrending.py
+++ b/.github/getTrending.py
@@ -1,156 +1,99 @@
 #!/usr/bin/env python3
-"""Regenerate the Trending PoCs tables in README.md.
-
-Goals (matching the legacy README that worked well):
-- Cover the current year plus the previous three.
-- Keep the familiar heading “Latest 20 of N Repositories”.
-- Only show repos updated in the last WINDOW_DAYS.
-- Require a CVE-shaped repo name for that year and a non-empty description.
-- Sort newest first, then by stars, and cap at MAX_ROWS per year.
-"""
-
-from __future__ import annotations
-
-import os
-import re
-from datetime import datetime, timedelta, timezone
-from pathlib import Path
-from typing import Iterable, List, TypedDict
-
+# -*- coding:utf-8 -*-
 import requests
-
-WINDOW_DAYS = 4
-MAX_ROWS = 20
-YEARS_BACK = 4
-MIN_STARS = 0  # keep low to capture fresh repos
+import json
+from datetime import datetime
 
-class Repo(TypedDict):
-    name: str
-    html_url: str
-    description: str | None
-    stargazers_count: int
-    updated_at: str
-
-
-def github_headers() -> dict:
-    token = os.environ.get("GITHUB_TOKEN") or os.environ.get("GH_TOKEN")
-    headers = {"Accept": "application/vnd.github+json"}
-    if token:
-        headers["Authorization"] = f"Bearer {token}"
-    return headers
-
-
-def time_ago(updated_at: str, now: datetime) -> str:
-    dt = datetime.strptime(updated_at, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
-    delta = now - dt
+def time_ago(datetime_str):
+    datetime_obj = datetime.strptime(datetime_str, "%Y-%m-%dT%H:%M:%SZ")
+    current_datetime = datetime.now()
+    delta = current_datetime - datetime_obj
     if delta.days > 0:
-        return "1 day ago" if delta.days == 1 else f"{delta.days} days ago"
-    hours = delta.seconds // 3600
-    if hours:
-        return "1 hour ago" if hours == 1 else f"{hours} hours ago"
-    minutes = (delta.seconds % 3600) // 60
-    if minutes:
-        return "1 minute ago" if minutes == 1 else f"{minutes} minutes ago"
-    return "just now"
-
-
-def _search_total(year: int) -> int:
-    """Return total repositories matching CVE-year (used for table heading)."""
-    stars_clause = f"stars:>{MIN_STARS}" if MIN_STARS >= 0 else "stars:>0"
-    query = f"CVE-{year} in:name {stars_clause} archived:false"
-    url = "https://api.github.com/search/repositories"
-    resp = requests.get(
-        url, params={"q": query, "per_page": 1}, headers=github_headers(), timeout=30
-    )
-    resp.raise_for_status()
-    return int(resp.json().get("total_count", 0))
-
-
-def fetch_trending(year: int, cutoff: datetime) -> tuple[List[Repo], int]:
-    """Fetch and filter trending repos for a year, returning rows and total_count."""
-    stars_clause = f"stars:>{MIN_STARS}" if MIN_STARS >= 0 else "stars:>0"
-    query = f"CVE-{year} in:name {stars_clause} archived:false pushed:>={cutoff.date().isoformat()}"
-    url = "https://api.github.com/search/repositories"
-    total_count = _search_total(year)
-    pattern = re.compile(rf"cve-{year}-\d+", re.IGNORECASE)
-    filtered: List[Repo] = []
-    seen_urls: set[str] = set()
-
-    # Walk multiple pages to gather enough fresh repos (up to MAX_ROWS).
-    for page in range(1, 2):
-        params = {
-            "q": query,
-            "sort": "updated",
-            "order": "desc",
-            "per_page": 100,
-            "page": page,
-        }
-        resp = requests.get(url, params=params, headers=github_headers(), timeout=30)
-        resp.raise_for_status()
-        items: Iterable[Repo] = resp.json().get("items", [])
-        if not items:
-            break
-        for item in items:
-            name = item.get("name", "")
-            updated_at = item.get("updated_at")
-            description = (item.get("description") or "").strip()
-            html_url = item.get("html_url")
-            if not updated_at or not html_url or not description:
-                continue
-            if not pattern.search(name or ""):
-                continue
-            updated_dt = datetime.strptime(updated_at, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
-            if updated_dt < cutoff:
-                continue
-            if html_url in seen_urls:
-                continue
-            seen_urls.add(html_url)
-            filtered.append(item)
-            if len(filtered) >= MAX_ROWS:
-                break
-
-    # Already sorted by updated desc; break ties by stars
-    filtered.sort(
-        key=lambda r: (
-            -datetime.strptime(r["updated_at"], "%Y-%m-%dT%H:%M:%SZ").timestamp(),
-            -int(r.get("stargazers_count", 0)),
-        )
-    )
-    return filtered[:MAX_ROWS], total_count
-
-
-def build_rows(repos: List[Repo], now: datetime) -> List[str]:
-    rows: List[str] = []
-    for repo in repos:
-        desc = repo.get("description") or ""
-        stars = int(repo.get("stargazers_count", 0))
-        updated = time_ago(repo["updated_at"], now)
-        rows.append(f"| {stars}⭐ | {updated} | [{repo['name']}]({repo['html_url']}) | {desc} |")
-    return rows
-
-
-def main() -> None:
-    now = datetime.now(timezone.utc)
-    current_year = now.year
-    cutoff = now - timedelta(days=WINDOW_DAYS)
-
-    output: List[str] = ['
-<details>
-<summary>Recently updated Proof-of-Concepts</summary>
-
-']
-
-    for year in range(current_year, current_year - YEARS_BACK, -1):
-        repos, total = fetch_trending(year, cutoff)
-        output.append(f"\n\n## {year}\n")
-        output.append(f"### Latest {MAX_ROWS} of {total} Repositories\n")
-        output.append("| Stars | Updated | Name | Description |")
-        output.append("| --- | --- | --- | --- |")
-        if repos:
-            output.extend(build_rows(repos, now))
+        if delta.days == 1:
+            return "1 day ago"
         else:
-            output.append("| 0⭐ | — | No recent CVE PoCs | No repositories matched the filters. |")
+            return f"{delta.days} days ago"
+    elif delta.seconds >= 3600:
+        hours = delta.seconds // 3600
+        if hours == 1:
+            return "1 hour ago"
+        else:
+            return f"{hours} hours ago"
+    elif delta.seconds >= 60:
+        minutes = delta.seconds // 60
+        if minutes == 1:
+            return "1 minute ago"
+        else:
+            return f"{minutes} minutes ago"
+    else:
+        return "just now"
 
-    Path("README.md").write_text("\n".join(output), encoding="utf-8")
-    print(f"Wrote tables for {YEARS_BACK} years ending {current_year}")
+current_year = datetime.now().year
+total_repos_per_year = {}
+#tz_header = {"Time-Zone": "Europe/Amsterdam"}
+repositories_by_year = {}
+for year in range(current_year, current_year - 5, -1):
+    year_repositories = []
+    print(f"Fetching data for {year}")
+    response = requests.get(f'https://api.github.com/search/repositories?q=%22CVE-{year}%22%20in:name%20%20stars:>2%20language:Shell%20language:Go%20language:ASP%20language:WebAssembly%20language:R%20language:Lua%20language:Python%20%20%20language:C++%20language:C%20language:JavaScript%20language:Perl%20language:PowerShell%20language:Ruby%20language:Rust%20language:Java%20%20language:PHP&s=updated&o=desc&page=1&per_page=20')
+    if response.status_code != 200:
+        print(f"Failed to fetch data for year {year}: {response.status_code}")
+        continue
 
-if __name__ == "__main__":
-    main()
+    data = response.json()
+    total_count = data.get("total_count", 0)
+    print(f"Found: {total_count}")
+    total_repos_per_year[year] = total_count
+    if "items" in data:
+        items = data["items"]
+        if items:
+            year_repositories.extend(items)
+        else:
+            print(f"No more items found for year {year}")
+
+    if year_repositories:
+        # Sort the repositories by stargazers_count in descending order
+        #year_repositories.sort(key=lambda repo: repo['stargazers_count'], reverse=True)
+        repositories_by_year[year] = year_repositories
+
+# Define a class to handle repository information
+class RepositoryInfo:
+    def __init__(self, description, stargazers_count, name, html_url, updated_at):
+        self.description = description
+        self.stargazers_count = stargazers_count
+        self.name = name
+        self.html_url = html_url
+        self.updated_at = updated_at
+
+    def __hash__(self):
+        return hash(self.name + self.html_url)
+
+    def __eq__(self, other):
+        return self.html_url == other.html_url and self.name == other.name
+
+final_output = ['
+<details>
+<summary>Recently updated Proof-of-Concepts</summary>
+
+']
+for year in range(current_year, current_year - 5, -1):
+    if year in repositories_by_year:
+        year_repositories = repositories_by_year[year]
+        year_repositories = [RepositoryInfo(repo["description"], repo["stargazers_count"], repo["name"], repo["html_url"], repo["updated_at"]) for repo in year_repositories]
+
+        final_output.append(f"\n\n## {year}\n")
+        final_output.append(f"### Latest 20 of {total_repos_per_year[year]} Repositories\n")
+        final_output.append("| Stars | Updated | Name | Description |")
+        final_output.append("| --- | --- | --- | --- |")
+
+        for repo in year_repositories:
+            try:
+                description = repo.description or ""
+                updated = time_ago(repo.updated_at)
+                final_output.append(f"| {repo.stargazers_count}⭐ | {updated} | [{repo.name}]({repo.html_url}) | {description} |")
+            except Exception as e:
+                print(f"Error generating final output for repository {repo.name}: {e}")
+                pass
+
+if repositories_by_year:
+    with open("README.md", "w", encoding="utf-8") as file:
+        file.write("\n".join(final_output))
+        print("Final output written to README.md")