Files
anoracleofra-code 668ce16dc7 v0.9.6: InfoNet hashchain, Wormhole gate encryption, mesh reputation, 16 community contributors
Gate messages now propagate via the Infonet hashchain as encrypted blobs — every node syncs them
through normal chain sync while only Gate members with MLS keys can decrypt. Added mesh reputation
system, peer push workers, voluntary Wormhole opt-in for node participation, fork recovery,
killwormhole scripts, obfuscated terminology, and hardened the self-updater to protect encryption
keys and chain state during updates.

New features: Shodan search, train tracking, Sentinel Hub imagery, 8 new intelligence layers,
CCTV expansion to 11,000+ cameras across 6 countries, Mesh Terminal CLI, prediction markets,
desktop-shell scaffold, and comprehensive mesh test suite (215 frontend + backend tests passing).

Community contributors: @wa1id, @AlborzNazari, @adust09, @Xpirix, @imqdcr, @csysp, @suranyami,
@chr0n1x, @johan-martensson, @singularfailure, @smithbh, @OrfeoTerkuci, @deuza, @tm-const,
@Elhard1, @ttulttul
2026-03-26 05:58:04 -06:00

441 lines
17 KiB
Python

"""ctypes bridge for the Rust privacy-core crate.
This module follows the architecture docs in extra/docs-internal:
- Python orchestrates
- Rust owns private protocol state
- Python sees opaque integer handles and serialized ciphertext only
"""
from __future__ import annotations
import ctypes
import json
import os
from pathlib import Path
from typing import Iterable
class PrivacyCoreError(RuntimeError):
"""Raised when the Rust privacy-core returns an error."""
class PrivacyCoreUnavailable(PrivacyCoreError):
"""Raised when the shared library cannot be found or loaded."""
class _ByteBuffer(ctypes.Structure):
_fields_ = [
("data", ctypes.POINTER(ctypes.c_uint8)),
("len", ctypes.c_size_t),
]
class PrivacyCoreClient:
"""Handle-based interface to the local Rust privacy-core."""
def __init__(self, library: ctypes.CDLL, library_path: Path) -> None:
self._library = library
self.library_path = library_path
self._configure_functions()
@classmethod
def load(cls, library_path: str | os.PathLike[str] | None = None) -> "PrivacyCoreClient":
resolved = cls._resolve_library_path(library_path)
try:
library = ctypes.CDLL(str(resolved))
except OSError as exc:
raise PrivacyCoreUnavailable(f"failed to load privacy-core library: {resolved}") from exc
return cls(library, resolved)
@staticmethod
def _resolve_library_path(library_path: str | os.PathLike[str] | None) -> Path:
if library_path:
resolved = Path(library_path).expanduser().resolve()
if not resolved.exists():
raise PrivacyCoreUnavailable(f"privacy-core library not found: {resolved}")
return resolved
env_override = os.environ.get("PRIVACY_CORE_LIB")
if env_override:
resolved = Path(env_override).expanduser().resolve()
if not resolved.exists():
raise PrivacyCoreUnavailable(f"privacy-core library not found: {resolved}")
return resolved
repo_root = Path(__file__).resolve().parents[2]
candidates = []
for profile in ("debug", "release"):
target_dir = repo_root / "privacy-core" / "target" / profile
candidates.extend(
[
target_dir / "privacy_core.dll",
target_dir / "libprivacy_core.so",
target_dir / "libprivacy_core.dylib",
]
)
for candidate in candidates:
if candidate.exists():
return candidate.resolve()
searched = "\n".join(str(candidate) for candidate in candidates)
raise PrivacyCoreUnavailable(
"privacy-core shared library not found. Looked in:\n" f"{searched}"
)
def _configure_functions(self) -> None:
self._library.privacy_core_version.argtypes = []
self._library.privacy_core_version.restype = _ByteBuffer
self._library.privacy_core_last_error_message.argtypes = []
self._library.privacy_core_last_error_message.restype = _ByteBuffer
self._library.privacy_core_free_buffer.argtypes = [_ByteBuffer]
self._library.privacy_core_free_buffer.restype = None
self._library.privacy_core_create_identity.argtypes = []
self._library.privacy_core_create_identity.restype = ctypes.c_uint64
self._library.privacy_core_export_key_package.argtypes = [ctypes.c_uint64]
self._library.privacy_core_export_key_package.restype = _ByteBuffer
self._library.privacy_core_import_key_package.argtypes = [
ctypes.POINTER(ctypes.c_uint8),
ctypes.c_size_t,
]
self._library.privacy_core_import_key_package.restype = ctypes.c_uint64
self._library.privacy_core_create_group.argtypes = [ctypes.c_uint64]
self._library.privacy_core_create_group.restype = ctypes.c_uint64
self._library.privacy_core_add_member.argtypes = [ctypes.c_uint64, ctypes.c_uint64]
self._library.privacy_core_add_member.restype = ctypes.c_uint64
self._library.privacy_core_remove_member.argtypes = [ctypes.c_uint64, ctypes.c_uint32]
self._library.privacy_core_remove_member.restype = ctypes.c_uint64
self._library.privacy_core_encrypt_group_message.argtypes = [
ctypes.c_uint64,
ctypes.POINTER(ctypes.c_uint8),
ctypes.c_size_t,
]
self._library.privacy_core_encrypt_group_message.restype = _ByteBuffer
self._library.privacy_core_decrypt_group_message.argtypes = [
ctypes.c_uint64,
ctypes.POINTER(ctypes.c_uint8),
ctypes.c_size_t,
]
self._library.privacy_core_decrypt_group_message.restype = _ByteBuffer
self._library.privacy_core_export_public_bundle.argtypes = [ctypes.c_uint64]
self._library.privacy_core_export_public_bundle.restype = _ByteBuffer
self._library.privacy_core_handle_stats.argtypes = [
ctypes.POINTER(ctypes.c_uint8),
ctypes.c_size_t,
]
self._library.privacy_core_handle_stats.restype = ctypes.c_int64
self._library.privacy_core_commit_message_bytes.argtypes = [ctypes.c_uint64]
self._library.privacy_core_commit_message_bytes.restype = _ByteBuffer
self._library.privacy_core_commit_welcome_message_bytes.argtypes = [
ctypes.c_uint64,
ctypes.c_size_t,
]
self._library.privacy_core_commit_welcome_message_bytes.restype = _ByteBuffer
self._library.privacy_core_commit_joined_group_handle.argtypes = [
ctypes.c_uint64,
ctypes.c_size_t,
]
self._library.privacy_core_commit_joined_group_handle.restype = ctypes.c_uint64
self._library.privacy_core_create_dm_session.argtypes = [
ctypes.c_uint64,
ctypes.c_uint64,
]
self._library.privacy_core_create_dm_session.restype = ctypes.c_int64
self._library.privacy_core_dm_encrypt.argtypes = [
ctypes.c_uint64,
ctypes.POINTER(ctypes.c_uint8),
ctypes.c_size_t,
ctypes.POINTER(ctypes.c_uint8),
ctypes.c_size_t,
]
self._library.privacy_core_dm_encrypt.restype = ctypes.c_int64
self._library.privacy_core_dm_decrypt.argtypes = [
ctypes.c_uint64,
ctypes.POINTER(ctypes.c_uint8),
ctypes.c_size_t,
ctypes.POINTER(ctypes.c_uint8),
ctypes.c_size_t,
]
self._library.privacy_core_dm_decrypt.restype = ctypes.c_int64
self._library.privacy_core_dm_session_welcome.argtypes = [
ctypes.c_uint64,
ctypes.POINTER(ctypes.c_uint8),
ctypes.c_size_t,
]
self._library.privacy_core_dm_session_welcome.restype = ctypes.c_int64
self._library.privacy_core_join_dm_session.argtypes = [
ctypes.c_uint64,
ctypes.POINTER(ctypes.c_uint8),
ctypes.c_size_t,
]
self._library.privacy_core_join_dm_session.restype = ctypes.c_int64
self._library.privacy_core_release_dm_session.argtypes = [ctypes.c_uint64]
self._library.privacy_core_release_dm_session.restype = ctypes.c_int32
self._library.privacy_core_release_identity.argtypes = [ctypes.c_uint64]
self._library.privacy_core_release_identity.restype = ctypes.c_bool
self._library.privacy_core_release_key_package.argtypes = [ctypes.c_uint64]
self._library.privacy_core_release_key_package.restype = ctypes.c_bool
self._library.privacy_core_release_group.argtypes = [ctypes.c_uint64]
self._library.privacy_core_release_group.restype = ctypes.c_bool
self._library.privacy_core_release_commit.argtypes = [ctypes.c_uint64]
self._library.privacy_core_release_commit.restype = ctypes.c_bool
self._library.privacy_core_reset_all_state.argtypes = []
self._library.privacy_core_reset_all_state.restype = ctypes.c_bool
def version(self) -> str:
return self._consume_string(self._library.privacy_core_version())
def create_identity(self) -> int:
return self._ensure_handle(self._library.privacy_core_create_identity(), "create_identity")
def export_key_package(self, identity_handle: int) -> bytes:
return self._consume_bytes(
self._library.privacy_core_export_key_package(ctypes.c_uint64(identity_handle)),
"export_key_package",
)
def import_key_package(self, data: bytes) -> int:
buffer = self._as_ubyte_buffer(data)
handle = self._library.privacy_core_import_key_package(buffer, len(data))
return self._ensure_handle(handle, "import_key_package")
def create_group(self, identity_handle: int) -> int:
handle = self._library.privacy_core_create_group(ctypes.c_uint64(identity_handle))
return self._ensure_handle(handle, "create_group")
def add_member(self, group_handle: int, key_package_handle: int) -> int:
handle = self._library.privacy_core_add_member(
ctypes.c_uint64(group_handle),
ctypes.c_uint64(key_package_handle),
)
return self._ensure_handle(handle, "add_member")
def remove_member(self, group_handle: int, member_ref: int) -> int:
handle = self._library.privacy_core_remove_member(
ctypes.c_uint64(group_handle),
ctypes.c_uint32(member_ref),
)
return self._ensure_handle(handle, "remove_member")
def encrypt_group_message(self, group_handle: int, plaintext: bytes) -> bytes:
buffer = self._as_ubyte_buffer(plaintext)
return self._consume_bytes(
self._library.privacy_core_encrypt_group_message(
ctypes.c_uint64(group_handle),
buffer,
len(plaintext),
),
"encrypt_group_message",
)
def decrypt_group_message(self, group_handle: int, ciphertext: bytes) -> bytes:
buffer = self._as_ubyte_buffer(ciphertext)
return self._consume_bytes(
self._library.privacy_core_decrypt_group_message(
ctypes.c_uint64(group_handle),
buffer,
len(ciphertext),
),
"decrypt_group_message",
)
def export_public_bundle(self, identity_handle: int) -> bytes:
return self._consume_bytes(
self._library.privacy_core_export_public_bundle(ctypes.c_uint64(identity_handle)),
"export_public_bundle",
)
def handle_stats(self) -> dict:
payload = self._call_i64_bytes_op(
"handle_stats",
lambda out_buf, out_cap: self._library.privacy_core_handle_stats(out_buf, out_cap),
)
try:
return json.loads(payload.decode("utf-8"))
except Exception as exc:
raise PrivacyCoreError(f"handle_stats failed: invalid JSON: {exc}") from exc
def commit_message_bytes(self, commit_handle: int) -> bytes:
return self._consume_bytes(
self._library.privacy_core_commit_message_bytes(ctypes.c_uint64(commit_handle)),
"commit_message_bytes",
)
def commit_welcome_message_bytes(self, commit_handle: int, index: int = 0) -> bytes:
return self._consume_bytes(
self._library.privacy_core_commit_welcome_message_bytes(
ctypes.c_uint64(commit_handle),
ctypes.c_size_t(index),
),
"commit_welcome_message_bytes",
)
def commit_joined_group_handle(self, commit_handle: int, index: int = 0) -> int:
handle = self._library.privacy_core_commit_joined_group_handle(
ctypes.c_uint64(commit_handle),
ctypes.c_size_t(index),
)
return self._ensure_handle(handle, "commit_joined_group_handle")
def create_dm_session(self, initiator_identity: int, responder_key_package: int) -> int:
handle = self._library.privacy_core_create_dm_session(
ctypes.c_uint64(initiator_identity),
ctypes.c_uint64(responder_key_package),
)
if handle > 0:
return int(handle)
raise self._error_for("create_dm_session")
def dm_encrypt(self, session_handle: int, plaintext: bytes) -> bytes:
buffer = self._as_ubyte_buffer(plaintext)
return self._call_i64_bytes_op(
"dm_encrypt",
lambda out_buf, out_cap: self._library.privacy_core_dm_encrypt(
ctypes.c_uint64(session_handle),
buffer,
len(plaintext),
out_buf,
out_cap,
),
)
def dm_decrypt(self, session_handle: int, ciphertext: bytes) -> bytes:
buffer = self._as_ubyte_buffer(ciphertext)
return self._call_i64_bytes_op(
"dm_decrypt",
lambda out_buf, out_cap: self._library.privacy_core_dm_decrypt(
ctypes.c_uint64(session_handle),
buffer,
len(ciphertext),
out_buf,
out_cap,
),
)
def dm_session_welcome(self, session_handle: int) -> bytes:
return self._call_i64_bytes_op(
"dm_session_welcome",
lambda out_buf, out_cap: self._library.privacy_core_dm_session_welcome(
ctypes.c_uint64(session_handle),
out_buf,
out_cap,
),
)
def join_dm_session(self, responder_identity: int, welcome: bytes) -> int:
buffer = self._as_ubyte_buffer(welcome)
handle = self._library.privacy_core_join_dm_session(
ctypes.c_uint64(responder_identity),
buffer,
len(welcome),
)
if handle > 0:
return int(handle)
raise self._error_for("join_dm_session")
def release_dm_session(self, handle: int) -> bool:
return bool(self._library.privacy_core_release_dm_session(ctypes.c_uint64(handle)))
def release_identity(self, handle: int) -> bool:
return bool(self._library.privacy_core_release_identity(ctypes.c_uint64(handle)))
def release_key_package(self, handle: int) -> bool:
return bool(self._library.privacy_core_release_key_package(ctypes.c_uint64(handle)))
def release_group(self, handle: int) -> bool:
return bool(self._library.privacy_core_release_group(ctypes.c_uint64(handle)))
def release_commit(self, handle: int) -> bool:
return bool(self._library.privacy_core_release_commit(ctypes.c_uint64(handle)))
def reset_all_state(self) -> bool:
return bool(self._library.privacy_core_reset_all_state())
def _consume_string(self, buffer: _ByteBuffer) -> str:
payload = self._consume_buffer(buffer)
return payload.decode("utf-8")
def _consume_bytes(self, buffer: _ByteBuffer, operation: str) -> bytes:
payload = self._consume_buffer(buffer)
if payload:
return payload
raise self._error_for(operation)
def _consume_buffer(self, buffer: _ByteBuffer) -> bytes:
try:
if not buffer.data or buffer.len == 0:
return b""
return bytes(ctypes.string_at(buffer.data, buffer.len))
finally:
self._library.privacy_core_free_buffer(buffer)
def _ensure_handle(self, handle: int, operation: str) -> int:
if handle:
return int(handle)
raise self._error_for(operation)
def _call_i64_bytes_op(self, operation: str, invoker) -> bytes:
required = int(invoker(None, 0))
if required < 0:
raise self._error_for(operation)
if required == 0:
return b""
output = (ctypes.c_uint8 * required)()
written = int(invoker(output, required))
if written < 0:
raise self._error_for(operation)
return bytes(output[:written])
def _error_for(self, operation: str) -> PrivacyCoreError:
message = self._last_error()
if message:
return PrivacyCoreError(f"{operation} failed: {message}")
return PrivacyCoreError(f"{operation} failed without an error message")
def _last_error(self) -> str:
return self._consume_string(self._library.privacy_core_last_error_message())
@staticmethod
def _as_ubyte_buffer(data: bytes | bytearray | memoryview) -> ctypes.Array[ctypes.c_uint8]:
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError("privacy-core byte arguments must be bytes-like")
raw = bytes(data)
return (ctypes.c_uint8 * len(raw)).from_buffer_copy(raw)
def candidate_library_paths() -> Iterable[Path]:
"""Expose the default search order for diagnostics/tests."""
repo_root = Path(__file__).resolve().parents[2]
for profile in ("debug", "release"):
target_dir = repo_root / "privacy-core" / "target" / profile
yield target_dir / "privacy_core.dll"
yield target_dir / "libprivacy_core.so"
yield target_dir / "libprivacy_core.dylib"