From 4b4cad46bac4778ee2c254bb29107a75c8f17d7d Mon Sep 17 00:00:00 2001 From: Rory Flynn <75283103+roaree@users.noreply.github.com> Date: Wed, 15 Nov 2023 11:40:24 +0100 Subject: [PATCH] Add `CustomJSONEncoder` to handle bytes types (#414) Adds a custom JSON encoder class to fix serialisation issues where modules included bytes types containing non-utf8 bytes, which can't be serialised to JSON. --------- Co-authored-by: Rory Flynn --- mvt/common/module.py | 9 ++++----- mvt/common/utils.py | 23 +++++++++++++++++++++++ tests/common/test_utils.py | 29 +++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 5 deletions(-) diff --git a/mvt/common/module.py b/mvt/common/module.py index b7a63fc..9d7694d 100644 --- a/mvt/common/module.py +++ b/mvt/common/module.py @@ -4,14 +4,13 @@ # https://license.mvt.re/1.1/ import csv +import json import logging import os import re from typing import Any, Dict, List, Optional, Union -import simplejson as json - -from .utils import exec_or_profile +from .utils import CustomJSONEncoder, exec_or_profile class DatabaseNotFoundError(Exception): @@ -103,7 +102,7 @@ class MVTModule: results_json_path = os.path.join(self.results_path, results_file_name) with open(results_json_path, "w", encoding="utf-8") as handle: try: - json.dump(self.results, handle, indent=4, default=str) + json.dump(self.results, handle, indent=4, cls=CustomJSONEncoder) except Exception as exc: self.log.error( "Unable to store results of module %s to file %s: %s", @@ -116,7 +115,7 @@ class MVTModule: detected_file_name = f"{name}_detected.json" detected_json_path = os.path.join(self.results_path, detected_file_name) with open(detected_json_path, "w", encoding="utf-8") as handle: - json.dump(self.detected, handle, indent=4, default=str) + json.dump(self.detected, handle, indent=4, cls=CustomJSONEncoder) def serialize(self, record: dict) -> Union[dict, list, None]: raise NotImplementedError diff --git a/mvt/common/utils.py b/mvt/common/utils.py index 
class CustomJSONEncoder(json.JSONEncoder):
    """
    Custom JSON encoder to handle non-standard types.

    Some modules are storing non-UTF-8 bytes in their results dictionaries.
    This causes exceptions when the results are being encoded as JSON.

    Of course this means that when MVT is run via `check-iocs` with existing
    results, the encoded version will be loaded back into the dictionary.
    Modules should ensure they encode anything that needs to be compared
    against an indicator in a JSON-friendly type.
    """

    def default(self, o):
        """Return a JSON-serialisable replacement for an unsupported object.

        The base encoder only calls this hook for values it cannot encode
        itself, so regular strings, numbers, lists and dicts never get here.

        :param o: the object the standard encoder could not serialise.
        :returns: a ``str``; bytes-like input is decoded as UTF-8, anything
            else is converted with ``str()``.
        """
        # Treat every bytes-like container the same way. Falling through to
        # str() would yield "bytearray(b'...')" for a bytearray and a
        # memory-address repr for a memoryview, losing the actual content.
        if isinstance(o, (bytes, bytearray, memoryview)):
            # Decode as utf-8, replace any invalid UTF-8 bytes with escaped hex
            return bytes(o).decode("utf-8", errors="backslashreplace")

        # For all other types try to use the string representation.
        return str(o)
class TestCustomJSONEncoder:
    """Checks how ``CustomJSONEncoder`` renders different value types."""

    def test__normal_input(self):
        # Values the stock encoder already supports pass through unchanged.
        encoded = json.dumps({"a": "b"}, cls=CustomJSONEncoder)
        assert encoded == '{"a": "b"}'

    def test__datetime_object(self):
        # Non-bytes objects fall back to their string representation.
        moment = datetime(2023, 11, 13, 12, 21, 49, 727467)
        encoded = json.dumps({"timestamp": moment}, cls=CustomJSONEncoder)
        assert encoded == '{"timestamp": "2023-11-13 12:21:49.727467"}'

    def test__bytes_non_utf_8(self):
        # Invalid UTF-8 bytes come out as escaped hex, which JSON then
        # escapes once more (hence the doubled backslashes).
        payload = {"identifier": b"\xa8\xa9"}
        encoded = json.dumps(payload, cls=CustomJSONEncoder)
        assert encoded == """{"identifier": "\\\\xa8\\\\xa9"}"""

    def test__bytes_valid_utf_8(self):
        # Valid UTF-8 bytes are decoded to text before JSON escaping.
        payload = {"name": "家".encode()}
        encoded = json.dumps(payload, cls=CustomJSONEncoder)
        assert encoded == '{"name": "\\u5bb6"}'