{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "06ae0319",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Connect to ClickHouse. Credentials come from the environment (never hardcoded).\n",
    "import os\n",
    "\n",
    "import clickhouse_connect\n",
    "\n",
    "client = clickhouse_connect.get_client(\n",
    "    host=os.environ[\"CLICKHOUSE_HOST\"],\n",
    "    username=os.environ[\"CLICKHOUSE_USERNAME\"],\n",
    "    password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n",
    "    secure=True,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "779710f0",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Pull one day of raw ADS-B metadata rows; keep an untouched copy for re-runs.\n",
    "df = client.query_df(\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '2024-01-01 00:00:00' AND time < '2024-01-02 00:00:00'\")\n",
    "df_copy = df.copy()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bf024da8",
   "metadata": {},
   "outputs": [],
   "source": [
    "# -- military = dbFlags & 1; interesting = dbFlags & 2; PIA = dbFlags & 4; LADD = dbFlags & 8;"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "270607b5",
   "metadata": {},
   "outputs": [],
   "source": [
    "# NOTE(review): depends on load_raw_adsb_for_day and datetime, which are only\n",
    "# defined/imported in LATER cells — this fails on Restart & Run All.\n",
    "df = load_raw_adsb_for_day(datetime(2024,1,1))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ac06a30e",
   "metadata": {},
   "outputs": [],
   "source": [
    "df['aircraft']"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "91edab3e",
   "metadata": {},
   "outputs": [],
   "source": [
    "COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n",
    "\n",
    "def compress_df(df):\n",
    "    \"\"\"Reduce all rows for one ICAO (groupby group) to the single most informative row.\"\"\"\n",
    "    icao = df.name\n",
    "    df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n",
    "    original_df = df.copy()\n",
    "    df = df.groupby(\"_signature\", as_index=False).last()  # check if it works with both last and first.\n",
    "\n",
    "    # For each row, create a dict of non-empty column values. This is using sets and subsets...\n",
    "    def get_non_empty_dict(row):\n",
    "        return {col: row[col] for col in COLUMNS if row[col] != ''}\n",
    "\n",
    "    df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n",
    "    df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n",
    "\n",
    "    # Check if row i's non-empty values are a subset of row j's non-empty values\n",
    "    def is_subset_of_any(idx):\n",
    "        row_dict = df.loc[idx, '_non_empty_dict']\n",
    "        row_count = df.loc[idx, '_non_empty_count']\n",
    "        for other_idx in df.index:\n",
    "            if idx == other_idx:\n",
    "                continue\n",
    "            other_dict = df.loc[other_idx, '_non_empty_dict']\n",
    "            other_count = df.loc[other_idx, '_non_empty_count']\n",
    "            # Check if all non-empty values in current row match those in other row\n",
    "            if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n",
    "                # If they match and other has more defined columns, current row is redundant\n",
    "                if other_count > row_count:\n",
    "                    return True\n",
    "        return False\n",
    "\n",
    "    # Keep rows that are not subsets of any other row\n",
    "    keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n",
    "    df = df[keep_mask]\n",
    "\n",
    "    if len(df) > 1:\n",
    "        # Tie-break: keep the signature that occurred most often in the raw rows.\n",
    "        original_df = original_df[original_df['_signature'].isin(df['_signature'])]\n",
    "        value_counts = original_df[\"_signature\"].value_counts()\n",
    "        max_signature = value_counts.idxmax()\n",
    "        df = df[df['_signature'] == max_signature]\n",
    "\n",
    "    df['icao'] = icao\n",
    "    df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n",
    "    return df\n",
    "\n",
    "# df = df_copy\n",
    "# df = df_copy.iloc[0:100000]\n",
    "# df = df[df['r'] == \"N4131T\"]\n",
    "# df = df[(df['icao'] == \"008081\")]\n",
    "# df = df.iloc[0:500]\n",
    "df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
    "df = df.drop(columns=['aircraft'])\n",
    "df = df.sort_values(['icao', 'time'])\n",
    "df[COLUMNS] = df[COLUMNS].fillna('')\n",
    "ORIGINAL_COLUMNS = df.columns.tolist()\n",
    "df_compressed = df.groupby('icao', group_keys=False).apply(compress_df)\n",
    "cols = df_compressed.columns.tolist()\n",
    "cols.remove(\"icao\")\n",
    "cols.insert(1, \"icao\")\n",
    "df_compressed = df_compressed[cols]\n",
    "df_compressed"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "efdfcb2c",
   "metadata": {},
   "outputs": [],
   "source": [
    "df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
    "df[~df['aircraft_category'].isna()]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "495c5025",
   "metadata": {},
   "outputs": [],
   "source": [
    "# SOME KIND OF MAP REDUCE SYSTEM\n",
    "import os\n",
    "\n",
    "COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n",
    "\n",
    "def compress_df(df):\n",
    "    \"\"\"Reduce all rows for one ICAO (groupby group) to the single most informative row.\"\"\"\n",
    "    icao = df.name\n",
    "    df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n",
    "\n",
    "    # Compute signature counts before grouping (avoid copy)\n",
    "    signature_counts = df[\"_signature\"].value_counts()\n",
    "\n",
    "    df = df.groupby(\"_signature\", as_index=False).first()  # check if it works with both last and first.\n",
    "\n",
    "    # For each row, create a dict of non-empty column values. This is using sets and subsets...\n",
    "    def get_non_empty_dict(row):\n",
    "        return {col: row[col] for col in COLUMNS if row[col] != ''}\n",
    "\n",
    "    df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n",
    "    df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n",
    "\n",
    "    # Check if row i's non-empty values are a subset of row j's non-empty values\n",
    "    def is_subset_of_any(idx):\n",
    "        row_dict = df.loc[idx, '_non_empty_dict']\n",
    "        row_count = df.loc[idx, '_non_empty_count']\n",
    "        for other_idx in df.index:\n",
    "            if idx == other_idx:\n",
    "                continue\n",
    "            other_dict = df.loc[other_idx, '_non_empty_dict']\n",
    "            other_count = df.loc[other_idx, '_non_empty_count']\n",
    "            # Check if all non-empty values in current row match those in other row\n",
    "            if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n",
    "                # If they match and other has more defined columns, current row is redundant\n",
    "                if other_count > row_count:\n",
    "                    return True\n",
    "        return False\n",
    "\n",
    "    # Keep rows that are not subsets of any other row\n",
    "    keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n",
    "    df = df[keep_mask]\n",
    "\n",
    "    if len(df) > 1:\n",
    "        # Use pre-computed signature counts instead of original_df\n",
    "        remaining_sigs = df['_signature']\n",
    "        sig_counts = signature_counts[remaining_sigs]\n",
    "        max_signature = sig_counts.idxmax()\n",
    "        df = df[df['_signature'] == max_signature]\n",
    "\n",
    "    df['icao'] = icao\n",
    "    df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n",
    "    return df\n",
    "\n",
    "# names of releases something like\n",
    "# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz\n",
    "\n",
    "# Let's build historical first.\n",
    "\n",
    "_ch_client = None\n",
    "\n",
    "def _get_clickhouse_client():\n",
    "    \"\"\"Return a reusable ClickHouse client, with retry/backoff for transient DNS or connection errors.\"\"\"\n",
    "    global _ch_client\n",
    "    if _ch_client is not None:\n",
    "        return _ch_client\n",
    "\n",
    "    import clickhouse_connect\n",
    "    import time\n",
    "\n",
    "    max_retries = 5\n",
    "    for attempt in range(1, max_retries + 1):\n",
    "        try:\n",
    "            _ch_client = clickhouse_connect.get_client(\n",
    "                host=os.environ[\"CLICKHOUSE_HOST\"],\n",
    "                username=os.environ[\"CLICKHOUSE_USERNAME\"],\n",
    "                password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n",
    "                secure=True,\n",
    "            )\n",
    "            return _ch_client\n",
    "        except Exception as e:\n",
    "            wait = min(2 ** attempt, 30)\n",
    "            print(f\" ClickHouse connect attempt {attempt}/{max_retries} failed: {e}\")\n",
    "            if attempt == max_retries:\n",
    "                raise\n",
    "            print(f\" Retrying in {wait}s...\")\n",
    "            time.sleep(wait)\n",
    "\n",
    "\n",
    "def load_raw_adsb_for_day(day):\n",
    "    \"\"\"Load raw ADS-B data for a day from cache or ClickHouse.\"\"\"\n",
    "    global _ch_client  # declared up front; reset on failed query attempts below\n",
    "    from datetime import timedelta\n",
    "    from pathlib import Path\n",
    "    import pandas as pd\n",
    "    import time\n",
    "\n",
    "    start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)\n",
    "    end_time = start_time + timedelta(days=1)\n",
    "\n",
    "    # Set up caching\n",
    "    cache_dir = Path(\"data/adsb\")\n",
    "    cache_dir.mkdir(parents=True, exist_ok=True)\n",
    "    cache_file = cache_dir / f\"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst\"\n",
    "\n",
    "    # Check if cache exists\n",
    "    if cache_file.exists():\n",
    "        print(f\" Loading from cache: {cache_file}\")\n",
    "        df = pd.read_csv(cache_file, compression='zstd')\n",
    "        df['time'] = pd.to_datetime(df['time'])\n",
    "    else:\n",
    "        # Format dates for the query\n",
    "        start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')\n",
    "        end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')\n",
    "\n",
    "        max_retries = 3\n",
    "        for attempt in range(1, max_retries + 1):\n",
    "            try:\n",
    "                client = _get_clickhouse_client()\n",
    "                print(f\" Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}\")\n",
    "                df = client.query_df(f\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '{start_str}' AND time < '{end_str}'\")\n",
    "                break\n",
    "            except Exception as e:\n",
    "                wait = min(2 ** attempt, 30)\n",
    "                print(f\" Query attempt {attempt}/{max_retries} failed: {e}\")\n",
    "                if attempt == max_retries:\n",
    "                    raise\n",
    "                # Reset client in case connection is stale\n",
    "                _ch_client = None\n",
    "                print(f\" Retrying in {wait}s...\")\n",
    "                time.sleep(wait)\n",
    "\n",
    "        # Save to cache\n",
    "        df.to_csv(cache_file, index=False, compression='zstd')\n",
    "        print(f\" Saved to cache: {cache_file}\")\n",
    "\n",
    "    return df\n",
    "\n",
    "\n",
    "def load_historical_for_day(day):\n",
    "    \"\"\"Load one day of raw ADS-B data and compress it to one row per ICAO.\"\"\"\n",
    "    from pathlib import Path\n",
    "    import pandas as pd\n",
    "\n",
    "    df = load_raw_adsb_for_day(day)\n",
    "    print(df)\n",
    "    df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
    "    df = df.drop(columns=['aircraft'])\n",
    "    df = df.sort_values(['icao', 'time'])\n",
    "    df[COLUMNS] = df[COLUMNS].fillna('')\n",
    "    df_compressed = df.groupby('icao', group_keys=False).apply(compress_df)\n",
    "    # Reorder so time and icao lead the frame.\n",
    "    cols = df_compressed.columns.tolist()\n",
    "    cols.remove('time')\n",
    "    cols.insert(0, 'time')\n",
    "    cols.remove(\"icao\")\n",
    "    cols.insert(1, \"icao\")\n",
    "    df_compressed = df_compressed[cols]\n",
    "    return df_compressed\n",
    "\n",
    "\n",
    "def concat_compressed_dfs(df_base, df_new):\n",
    "    \"\"\"Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.\"\"\"\n",
    "    import pandas as pd\n",
    "\n",
    "    # Combine both dataframes\n",
    "    df_combined = pd.concat([df_base, df_new], ignore_index=True)\n",
    "\n",
    "    # Sort by ICAO and time\n",
    "    df_combined = df_combined.sort_values(['icao', 'time'])\n",
    "\n",
    "    # Fill NaN values\n",
    "    df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')\n",
    "\n",
    "    # Apply compression logic per ICAO to get the best row\n",
    "    df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)\n",
    "\n",
    "    # Sort by time\n",
    "    df_compressed = df_compressed.sort_values('time')\n",
    "\n",
    "    return df_compressed\n",
    "\n",
    "\n",
    "def get_latest_aircraft_adsb_csv_df():\n",
    "    \"\"\"Download and load the latest ADS-B CSV from GitHub releases.\"\"\"\n",
    "    from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv\n",
    "\n",
    "    import pandas as pd\n",
    "    import re\n",
    "\n",
    "    csv_path = download_latest_aircraft_adsb_csv()\n",
    "    df = pd.read_csv(csv_path)\n",
    "    df = df.fillna(\"\")\n",
    "\n",
    "    # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv\n",
    "    match = re.search(r\"planequery_aircraft_adsb_(\\d{4}-\\d{2}-\\d{2})_\", str(csv_path))\n",
    "    if not match:\n",
    "        raise ValueError(f\"Could not extract date from filename: {csv_path.name}\")\n",
    "\n",
    "    date_str = match.group(1)\n",
    "    return df, date_str"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7f66acf7",
   "metadata": {},
   "outputs": [],
   "source": [
    "# NOTE(review): older iteration of the previous cell — duplicate definitions of\n",
    "# COLUMNS, compress_df, load_raw_adsb_for_day, load_historical_for_day, etc.\n",
    "# When run top-to-bottom these SHADOW the retrying versions above; consider\n",
    "# deleting this cell once the newer cell is confirmed working.\n",
    "\n",
    "# SOME KIND OF MAP REDUCE SYSTEM\n",
    "\n",
    "COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n",
    "\n",
    "def compress_df(df):\n",
    "    \"\"\"Reduce all rows for one ICAO (groupby group) to the single most informative row.\"\"\"\n",
    "    icao = df.name\n",
    "    df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n",
    "    original_df = df.copy()\n",
    "    df = df.groupby(\"_signature\", as_index=False).first()  # check if it works with both last and first.\n",
    "\n",
    "    # For each row, create a dict of non-empty column values. This is using sets and subsets...\n",
    "    def get_non_empty_dict(row):\n",
    "        return {col: row[col] for col in COLUMNS if row[col] != ''}\n",
    "\n",
    "    df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n",
    "    df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n",
    "\n",
    "    # Check if row i's non-empty values are a subset of row j's non-empty values\n",
    "    def is_subset_of_any(idx):\n",
    "        row_dict = df.loc[idx, '_non_empty_dict']\n",
    "        row_count = df.loc[idx, '_non_empty_count']\n",
    "        for other_idx in df.index:\n",
    "            if idx == other_idx:\n",
    "                continue\n",
    "            other_dict = df.loc[other_idx, '_non_empty_dict']\n",
    "            other_count = df.loc[other_idx, '_non_empty_count']\n",
    "            # Check if all non-empty values in current row match those in other row\n",
    "            if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n",
    "                # If they match and other has more defined columns, current row is redundant\n",
    "                if other_count > row_count:\n",
    "                    return True\n",
    "        return False\n",
    "\n",
    "    # Keep rows that are not subsets of any other row\n",
    "    keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n",
    "    df = df[keep_mask]\n",
    "\n",
    "    if len(df) > 1:\n",
    "        original_df = original_df[original_df['_signature'].isin(df['_signature'])]\n",
    "        value_counts = original_df[\"_signature\"].value_counts()\n",
    "        max_signature = value_counts.idxmax()\n",
    "        df = df[df['_signature'] == max_signature]\n",
    "\n",
    "    df['icao'] = icao\n",
    "    df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n",
    "    return df\n",
    "\n",
    "# names of releases something like\n",
    "# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz\n",
    "\n",
    "# Let's build historical first.\n",
    "\n",
    "def load_raw_adsb_for_day(day):\n",
    "    \"\"\"Load raw ADS-B data for a day from cache or ClickHouse.\"\"\"\n",
    "    from datetime import timedelta\n",
    "    import clickhouse_connect\n",
    "    from pathlib import Path\n",
    "    import os\n",
    "    import pandas as pd\n",
    "\n",
    "    start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)\n",
    "    end_time = start_time + timedelta(days=1)\n",
    "\n",
    "    # Set up caching\n",
    "    cache_dir = Path(\"data/adsb\")\n",
    "    cache_dir.mkdir(parents=True, exist_ok=True)\n",
    "    cache_file = cache_dir / f\"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst\"\n",
    "\n",
    "    # Check if cache exists\n",
    "    if cache_file.exists():\n",
    "        print(f\" Loading from cache: {cache_file}\")\n",
    "        df = pd.read_csv(cache_file, compression='zstd')\n",
    "        df['time'] = pd.to_datetime(df['time'])\n",
    "    else:\n",
    "        # Format dates for the query\n",
    "        start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')\n",
    "        end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')\n",
    "\n",
    "        client = clickhouse_connect.get_client(\n",
    "            host=os.environ[\"CLICKHOUSE_HOST\"],\n",
    "            username=os.environ[\"CLICKHOUSE_USERNAME\"],\n",
    "            password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n",
    "            secure=True,\n",
    "        )\n",
    "        print(f\" Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}\")\n",
    "        df = client.query_df(f\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '{start_str}' AND time < '{end_str}'\")\n",
    "\n",
    "        # Save to cache\n",
    "        df.to_csv(cache_file, index=False, compression='zstd')\n",
    "        print(f\" Saved to cache: {cache_file}\")\n",
    "\n",
    "    return df\n",
    "\n",
    "\n",
    "def load_historical_for_day(day):\n",
    "    \"\"\"Load one day of raw ADS-B data and compress it to one row per ICAO.\"\"\"\n",
    "    from pathlib import Path\n",
    "    import pandas as pd\n",
    "\n",
    "    df = load_raw_adsb_for_day(day)\n",
    "\n",
    "    df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
    "    df = df.drop(columns=['aircraft'])\n",
    "    df = df.sort_values(['icao', 'time'])\n",
    "    df[COLUMNS] = df[COLUMNS].fillna('')\n",
    "    df_compressed = df.groupby('icao', group_keys=False).apply(compress_df)\n",
    "    # Reorder so time and icao lead the frame.\n",
    "    cols = df_compressed.columns.tolist()\n",
    "    cols.remove('time')\n",
    "    cols.insert(0, 'time')\n",
    "    cols.remove(\"icao\")\n",
    "    cols.insert(1, \"icao\")\n",
    "    df_compressed = df_compressed[cols]\n",
    "    return df_compressed\n",
    "\n",
    "\n",
    "def concat_compressed_dfs(df_base, df_new):\n",
    "    \"\"\"Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.\"\"\"\n",
    "    import pandas as pd\n",
    "\n",
    "    # Combine both dataframes\n",
    "    df_combined = pd.concat([df_base, df_new], ignore_index=True)\n",
    "\n",
    "    # Sort by ICAO and time\n",
    "    df_combined = df_combined.sort_values(['icao', 'time'])\n",
    "\n",
    "    # Fill NaN values\n",
    "    df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')\n",
    "\n",
    "    # Apply compression logic per ICAO to get the best row\n",
    "    df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)\n",
    "\n",
    "    # Sort by time\n",
    "    df_compressed = df_compressed.sort_values('time')\n",
    "\n",
    "    return df_compressed\n",
    "\n",
    "\n",
    "def get_latest_aircraft_adsb_csv_df():\n",
    "    \"\"\"Download and load the latest ADS-B CSV from GitHub releases.\"\"\"\n",
    "    from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv\n",
    "\n",
    "    import pandas as pd\n",
    "    import re\n",
    "\n",
    "    csv_path = download_latest_aircraft_adsb_csv()\n",
    "    df = pd.read_csv(csv_path)\n",
    "    df = df.fillna(\"\")\n",
    "\n",
    "    # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv\n",
    "    match = re.search(r\"planequery_aircraft_adsb_(\\d{4}-\\d{2}-\\d{2})_\", str(csv_path))\n",
    "    if not match:\n",
    "        raise ValueError(f\"Could not extract date from filename: {csv_path.name}\")\n",
    "\n",
    "    date_str = match.group(1)\n",
    "    return df, date_str"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e14c8363",
   "metadata": {},
   "outputs": [],
   "source": [
    "from datetime import datetime\n",
    "df = load_historical_for_day(datetime(2024,1,1))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3874ba4d",
   "metadata": {},
   "outputs": [],
   "source": [
    "len(df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bcae50ad",
   "metadata": {},
   "outputs": [],
   "source": [
    "df[(df['icao'] == \"008081\")]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "50921c86",
   "metadata": {},
   "outputs": [],
   "source": [
    "df[df['icao'] == \"a4e1d2\"]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8194d9aa",
   "metadata": {},
   "outputs": [],
   "source": [
    "df[df['r'] == \"N4131T\"]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1e3b7aa2",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_compressed[df_compressed['icao'].duplicated(keep=False)]\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "40613bc1",
   "metadata": {},
   "outputs": [],
   "source": [
    "import gzip\n",
    "import json\n",
    "\n",
    "# NOTE(review): hardcoded absolute local path — not portable.\n",
    "path = \"/Users/jonahgoode/Downloads/test_extract/traces/fb/trace_full_acbbfb.json\"\n",
    "\n",
    "with gzip.open(path, \"rt\", encoding=\"utf-8\") as f:\n",
    "    data = json.load(f)\n",
    "\n",
    "print(type(data))\n",
    "# use `data` here\n",
    "print(json.dumps(data, indent=2)[:2000])\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "320109b2",
   "metadata": {},
   "outputs": [],
   "source": [
    "# First, load the JSON to inspect its structure\n",
    "# NOTE(review): hardcoded absolute local path — not portable.\n",
    "import json\n",
    "with open(\"/Users/jonahgoode/Documents/PlaneQuery/Other-Code/readsb-protobuf/webapp/src/db/aircrafts.json\", 'r') as f:\n",
    "    data = json.load(f)\n",
    "\n",
    "# Check the structure\n",
    "print(type(data))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "590134f4",
   "metadata": {},
   "outputs": [],
   "source": [
    "data['AC97E3']"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": ".venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}