Adds timeout to update checks (#542)

Co-authored-by: Donncha Ó Cearbhaill <donncha.ocearbhaill@amnesty.org>
This commit is contained in:
Tek
2024-10-17 11:56:05 +02:00
committed by GitHub
parent 9678eb17e5
commit 7575315966
2 changed files with 21 additions and 5 deletions

View File

@@ -3,6 +3,9 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import requests
from rich import print as rich_print
from .updates import IndicatorsUpdates, MVTUpdates
@@ -10,10 +13,13 @@ from .version import MVT_VERSION
def check_updates() -> None:
log = logging.getLogger("mvt")
# First we check for MVT version updates.
mvt_updates = MVTUpdates()
try:
mvt_updates = MVTUpdates()
latest_version = mvt_updates.check()
except requests.exceptions.Timeout:
log.verbose("Couldn't check latest MVT version")
except Exception:
pass
else:
@@ -48,6 +54,8 @@ def check_updates() -> None:
try:
ioc_to_update = ioc_updates.check()
except requests.exceptions.Timeout:
log.verbose("Couldn't check latest indicators")
except Exception:
pass
else:

View File

@@ -23,7 +23,7 @@ INDICATORS_CHECK_FREQUENCY = 12
class MVTUpdates:
def check(self) -> str:
res = requests.get("https://pypi.org/pypi/mvt/json")
res = requests.get("https://pypi.org/pypi/mvt/json", timeout=15)
data = res.json()
latest_version = data.get("info", {}).get("version", "")
@@ -69,6 +69,10 @@ class IndicatorsUpdates:
handle.write(str(timestamp))
def get_latest_update(self) -> int:
"""
Check the time of the latest indicator update.
Returns 0 if this file doesn't exists.
"""
if not os.path.exists(self.latest_update_path):
return 0
@@ -88,7 +92,7 @@ class IndicatorsUpdates:
url = self.github_raw_url.format(
self.index_owner, self.index_repo, self.index_branch, self.index_path
)
res = requests.get(url)
res = requests.get(url, timeout=15)
if res.status_code != 200:
log.error(
"Failed to retrieve indicators index located at %s (error %d)",
@@ -100,7 +104,7 @@ class IndicatorsUpdates:
return yaml.safe_load(res.content)
def download_remote_ioc(self, ioc_url: str) -> Optional[str]:
res = requests.get(ioc_url)
res = requests.get(ioc_url, timeout=15)
if res.status_code != 200:
log.error(
"Failed to download indicators file from %s (error %d)",
@@ -166,7 +170,7 @@ class IndicatorsUpdates:
file_commit_url = (
f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}"
)
res = requests.get(file_commit_url)
res = requests.get(file_commit_url, timeout=15)
if res.status_code != 200:
log.error(
"Failed to get details about file %s (error %d)",
@@ -195,6 +199,10 @@ class IndicatorsUpdates:
return latest_commit_ts
def should_check(self) -> Tuple[bool, int]:
"""
Compare time of the latest indicator check with current time.
Returns bool and number of hours since the last check.
"""
now = datetime.now()
latest_check_ts = self.get_latest_check()
latest_check_dt = datetime.fromtimestamp(latest_check_ts)