remove notebook

This commit is contained in:
ggman12
2026-02-12 16:07:28 -05:00
parent fecf9ff0ea
commit 2826dfd450
-640
View File
@@ -1,640 +0,0 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "06ae0319",
"metadata": {},
"outputs": [],
"source": [
"import clickhouse_connect\n",
"client = clickhouse_connect.get_client(\n",
" host=os.environ[\"CLICKHOUSE_HOST\"],\n",
" username=os.environ[\"CLICKHOUSE_USERNAME\"],\n",
" password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n",
" secure=True,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "779710f0",
"metadata": {},
"outputs": [],
"source": [
"df = client.query_df(\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '2024-01-01 00:00:00' AND time < '2024-01-02 00:00:00'\")\n",
"df_copy = df.copy()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bf024da8",
"metadata": {},
"outputs": [],
"source": [
"# -- military = dbFlags & 1; interesting = dbFlags & 2; PIA = dbFlags & 4; LADD = dbFlags & 8;"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "270607b5",
"metadata": {},
"outputs": [],
"source": [
"df = load_raw_adsb_for_day(datetime(2024,1,1))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ac06a30e",
"metadata": {},
"outputs": [],
"source": [
"df['aircraft']"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "91edab3e",
"metadata": {},
"outputs": [],
"source": [
"COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n",
"def compress_df(df):\n",
" icao = df.name\n",
" df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n",
" original_df = df.copy()\n",
" df = df.groupby(\"_signature\", as_index=False).last() # check if it works with both last and first.\n",
" # For each row, create a dict of non-empty column values. This is using sets and subsets...\n",
" def get_non_empty_dict(row):\n",
" return {col: row[col] for col in COLUMNS if row[col] != ''}\n",
" \n",
" df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n",
" df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n",
" \n",
" # Check if row i's non-empty values are a subset of row j's non-empty values\n",
" def is_subset_of_any(idx):\n",
" row_dict = df.loc[idx, '_non_empty_dict']\n",
" row_count = df.loc[idx, '_non_empty_count']\n",
" \n",
" for other_idx in df.index:\n",
" if idx == other_idx:\n",
" continue\n",
" other_dict = df.loc[other_idx, '_non_empty_dict']\n",
" other_count = df.loc[other_idx, '_non_empty_count']\n",
" \n",
" # Check if all non-empty values in current row match those in other row\n",
" if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n",
" # If they match and other has more defined columns, current row is redundant\n",
" if other_count > row_count:\n",
" return True\n",
" return False\n",
" \n",
" # Keep rows that are not subsets of any other row\n",
" keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n",
" df = df[keep_mask]\n",
"\n",
" if len(df) > 1:\n",
" original_df = original_df[original_df['_signature'].isin(df['_signature'])]\n",
" value_counts = original_df[\"_signature\"].value_counts()\n",
" max_signature = value_counts.idxmax()\n",
" df = df[df['_signature'] == max_signature]\n",
"\n",
" df['icao'] = icao\n",
" df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n",
" return df\n",
"\n",
"# df = df_copy\n",
"# df = df_copy.iloc[0:100000]\n",
"# df = df[df['r'] == \"N4131T\"]\n",
"# df = df[(df['icao'] == \"008081\")]\n",
"# df = df.iloc[0:500]\n",
"df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
"df = df.drop(columns=['aircraft'])\n",
"df = df.sort_values(['icao', 'time'])\n",
"df[COLUMNS] = df[COLUMNS].fillna('')\n",
"ORIGINAL_COLUMNS = df.columns.tolist()\n",
"df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n",
"cols = df_compressed.columns.tolist()\n",
"cols.remove(\"icao\")\n",
"cols.insert(1, \"icao\")\n",
"df_compressed = df_compressed[cols]\n",
"df_compressed"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "efdfcb2c",
"metadata": {},
"outputs": [],
"source": [
"df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
"df[~df['aircraft_category'].isna()]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "495c5025",
"metadata": {},
"outputs": [],
"source": [
"# SOME KIND OF MAP REDUCE SYSTEM\n",
"import os\n",
"\n",
"COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n",
"def compress_df(df):\n",
" icao = df.name\n",
" df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n",
" \n",
" # Compute signature counts before grouping (avoid copy)\n",
" signature_counts = df[\"_signature\"].value_counts()\n",
" \n",
" df = df.groupby(\"_signature\", as_index=False).first() # check if it works with both last and first.\n",
" # For each row, create a dict of non-empty column values. This is using sets and subsets...\n",
" def get_non_empty_dict(row):\n",
" return {col: row[col] for col in COLUMNS if row[col] != ''}\n",
" \n",
" df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n",
" df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n",
" \n",
" # Check if row i's non-empty values are a subset of row j's non-empty values\n",
" def is_subset_of_any(idx):\n",
" row_dict = df.loc[idx, '_non_empty_dict']\n",
" row_count = df.loc[idx, '_non_empty_count']\n",
" \n",
" for other_idx in df.index:\n",
" if idx == other_idx:\n",
" continue\n",
" other_dict = df.loc[other_idx, '_non_empty_dict']\n",
" other_count = df.loc[other_idx, '_non_empty_count']\n",
" \n",
" # Check if all non-empty values in current row match those in other row\n",
" if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n",
" # If they match and other has more defined columns, current row is redundant\n",
" if other_count > row_count:\n",
" return True\n",
" return False\n",
" \n",
" # Keep rows that are not subsets of any other row\n",
" keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n",
" df = df[keep_mask]\n",
"\n",
" if len(df) > 1:\n",
" # Use pre-computed signature counts instead of original_df\n",
" remaining_sigs = df['_signature']\n",
" sig_counts = signature_counts[remaining_sigs]\n",
" max_signature = sig_counts.idxmax()\n",
" df = df[df['_signature'] == max_signature]\n",
"\n",
" df['icao'] = icao\n",
" df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n",
" return df\n",
"\n",
"# names of releases something like\n",
"# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz\n",
"\n",
"# Let's build historical first. \n",
"\n",
"_ch_client = None\n",
"\n",
"def _get_clickhouse_client():\n",
" \"\"\"Return a reusable ClickHouse client, with retry/backoff for transient DNS or connection errors.\"\"\"\n",
" global _ch_client\n",
" if _ch_client is not None:\n",
" return _ch_client\n",
"\n",
" import clickhouse_connect\n",
" import time\n",
"\n",
" max_retries = 5\n",
" for attempt in range(1, max_retries + 1):\n",
" try:\n",
" _ch_client = clickhouse_connect.get_client(\n",
" host=os.environ[\"CLICKHOUSE_HOST\"],\n",
" username=os.environ[\"CLICKHOUSE_USERNAME\"],\n",
" password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n",
" secure=True,\n",
" )\n",
" return _ch_client\n",
" except Exception as e:\n",
" wait = min(2 ** attempt, 30)\n",
" print(f\" ClickHouse connect attempt {attempt}/{max_retries} failed: {e}\")\n",
" if attempt == max_retries:\n",
" raise\n",
" print(f\" Retrying in {wait}s...\")\n",
" time.sleep(wait)\n",
"\n",
"\n",
"def load_raw_adsb_for_day(day):\n",
" \"\"\"Load raw ADS-B data for a day from cache or ClickHouse.\"\"\"\n",
" from datetime import timedelta\n",
" from pathlib import Path\n",
" import pandas as pd\n",
" import time\n",
" \n",
" start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)\n",
" end_time = start_time + timedelta(days=1)\n",
" \n",
" # Set up caching\n",
" cache_dir = Path(\"data/adsb\")\n",
" cache_dir.mkdir(parents=True, exist_ok=True)\n",
" cache_file = cache_dir / f\"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst\"\n",
" \n",
" # Check if cache exists\n",
" if cache_file.exists():\n",
" print(f\" Loading from cache: {cache_file}\")\n",
" df = pd.read_csv(cache_file, compression='zstd')\n",
" df['time'] = pd.to_datetime(df['time'])\n",
" else:\n",
" # Format dates for the query\n",
" start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')\n",
" end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')\n",
" \n",
" max_retries = 3\n",
" for attempt in range(1, max_retries + 1):\n",
" try:\n",
" client = _get_clickhouse_client()\n",
" print(f\" Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}\")\n",
" df = client.query_df(f\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '{start_str}' AND time < '{end_str}'\")\n",
" break\n",
" except Exception as e:\n",
" wait = min(2 ** attempt, 30)\n",
" print(f\" Query attempt {attempt}/{max_retries} failed: {e}\")\n",
" if attempt == max_retries:\n",
" raise\n",
" # Reset client in case connection is stale\n",
" global _ch_client\n",
" _ch_client = None\n",
" print(f\" Retrying in {wait}s...\")\n",
" time.sleep(wait)\n",
" \n",
" # Save to cache\n",
" df.to_csv(cache_file, index=False, compression='zstd')\n",
" print(f\" Saved to cache: {cache_file}\")\n",
" \n",
" return df\n",
"\n",
"def load_historical_for_day(day):\n",
" from pathlib import Path\n",
" import pandas as pd\n",
" \n",
" df = load_raw_adsb_for_day(day)\n",
" print(df)\n",
" df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
" df = df.drop(columns=['aircraft'])\n",
" df = df.sort_values(['icao', 'time'])\n",
" df[COLUMNS] = df[COLUMNS].fillna('')\n",
" df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n",
" cols = df_compressed.columns.tolist()\n",
" cols.remove('time')\n",
" cols.insert(0, 'time')\n",
" cols.remove(\"icao\")\n",
" cols.insert(1, \"icao\")\n",
" df_compressed = df_compressed[cols]\n",
" return df_compressed\n",
"\n",
"\n",
"def concat_compressed_dfs(df_base, df_new):\n",
" \"\"\"Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.\"\"\"\n",
" import pandas as pd\n",
" \n",
" # Combine both dataframes\n",
" df_combined = pd.concat([df_base, df_new], ignore_index=True)\n",
" \n",
" # Sort by ICAO and time\n",
" df_combined = df_combined.sort_values(['icao', 'time'])\n",
" \n",
" # Fill NaN values\n",
" df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')\n",
" \n",
" # Apply compression logic per ICAO to get the best row\n",
" df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)\n",
" \n",
" # Sort by time\n",
" df_compressed = df_compressed.sort_values('time')\n",
" \n",
" return df_compressed\n",
"\n",
"\n",
"def get_latest_aircraft_adsb_csv_df():\n",
" \"\"\"Download and load the latest ADS-B CSV from GitHub releases.\"\"\"\n",
" from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv\n",
" \n",
" import pandas as pd\n",
" import re\n",
" \n",
" csv_path = download_latest_aircraft_adsb_csv()\n",
" df = pd.read_csv(csv_path)\n",
" df = df.fillna(\"\")\n",
" \n",
" # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv\n",
" match = re.search(r\"planequery_aircraft_adsb_(\\d{4}-\\d{2}-\\d{2})_\", str(csv_path))\n",
" if not match:\n",
" raise ValueError(f\"Could not extract date from filename: {csv_path.name}\")\n",
" \n",
" date_str = match.group(1)\n",
" return df, date_str\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7f66acf7",
"metadata": {},
"outputs": [],
"source": [
"# SOME KIND OF MAP REDUCE SYSTEM\n",
"\n",
"\n",
"COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n",
"def compress_df(df):\n",
" icao = df.name\n",
" df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n",
" original_df = df.copy()\n",
" df = df.groupby(\"_signature\", as_index=False).first() # check if it works with both last and first.\n",
" # For each row, create a dict of non-empty column values. This is using sets and subsets...\n",
" def get_non_empty_dict(row):\n",
" return {col: row[col] for col in COLUMNS if row[col] != ''}\n",
" \n",
" df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n",
" df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n",
" \n",
" # Check if row i's non-empty values are a subset of row j's non-empty values\n",
" def is_subset_of_any(idx):\n",
" row_dict = df.loc[idx, '_non_empty_dict']\n",
" row_count = df.loc[idx, '_non_empty_count']\n",
" \n",
" for other_idx in df.index:\n",
" if idx == other_idx:\n",
" continue\n",
" other_dict = df.loc[other_idx, '_non_empty_dict']\n",
" other_count = df.loc[other_idx, '_non_empty_count']\n",
" \n",
" # Check if all non-empty values in current row match those in other row\n",
" if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n",
" # If they match and other has more defined columns, current row is redundant\n",
" if other_count > row_count:\n",
" return True\n",
" return False\n",
" \n",
" # Keep rows that are not subsets of any other row\n",
" keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n",
" df = df[keep_mask]\n",
"\n",
" if len(df) > 1:\n",
" original_df = original_df[original_df['_signature'].isin(df['_signature'])]\n",
" value_counts = original_df[\"_signature\"].value_counts()\n",
" max_signature = value_counts.idxmax()\n",
" df = df[df['_signature'] == max_signature]\n",
"\n",
" df['icao'] = icao\n",
" df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n",
" return df\n",
"\n",
"# names of releases something like\n",
"# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz\n",
"\n",
"# Let's build historical first. \n",
"\n",
"def load_raw_adsb_for_day(day):\n",
" \"\"\"Load raw ADS-B data for a day from cache or ClickHouse.\"\"\"\n",
" from datetime import timedelta\n",
" import clickhouse_connect\n",
" from pathlib import Path\n",
" import pandas as pd\n",
" \n",
" start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)\n",
" end_time = start_time + timedelta(days=1)\n",
" \n",
" # Set up caching\n",
" cache_dir = Path(\"data/adsb\")\n",
" cache_dir.mkdir(parents=True, exist_ok=True)\n",
" cache_file = cache_dir / f\"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst\"\n",
" \n",
" # Check if cache exists\n",
" if cache_file.exists():\n",
" print(f\" Loading from cache: {cache_file}\")\n",
" df = pd.read_csv(cache_file, compression='zstd')\n",
" df['time'] = pd.to_datetime(df['time'])\n",
" else:\n",
" # Format dates for the query\n",
" start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')\n",
" end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')\n",
" \n",
" client = clickhouse_connect.get_client(\n",
" host=os.environ[\"CLICKHOUSE_HOST\"],\n",
" username=os.environ[\"CLICKHOUSE_USERNAME\"],\n",
" password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n",
" secure=True,\n",
" )\n",
" print(f\" Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}\")\n",
" df = client.query_df(f\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '{start_str}' AND time < '{end_str}'\")\n",
" \n",
" # Save to cache\n",
" df.to_csv(cache_file, index=False, compression='zstd')\n",
" print(f\" Saved to cache: {cache_file}\")\n",
" \n",
" return df\n",
"\n",
"def load_historical_for_day(day):\n",
" from pathlib import Path\n",
" import pandas as pd\n",
" \n",
" df = load_raw_adsb_for_day(day)\n",
" \n",
" df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
" df = df.drop(columns=['aircraft'])\n",
" df = df.sort_values(['icao', 'time'])\n",
" df[COLUMNS] = df[COLUMNS].fillna('')\n",
" df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n",
" cols = df_compressed.columns.tolist()\n",
" cols.remove('time')\n",
" cols.insert(0, 'time')\n",
" cols.remove(\"icao\")\n",
" cols.insert(1, \"icao\")\n",
" df_compressed = df_compressed[cols]\n",
" return df_compressed\n",
"\n",
"\n",
"def concat_compressed_dfs(df_base, df_new):\n",
" \"\"\"Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.\"\"\"\n",
" import pandas as pd\n",
" \n",
" # Combine both dataframes\n",
" df_combined = pd.concat([df_base, df_new], ignore_index=True)\n",
" \n",
" # Sort by ICAO and time\n",
" df_combined = df_combined.sort_values(['icao', 'time'])\n",
" \n",
" # Fill NaN values\n",
" df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')\n",
" \n",
" # Apply compression logic per ICAO to get the best row\n",
" df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)\n",
" \n",
" # Sort by time\n",
" df_compressed = df_compressed.sort_values('time')\n",
" \n",
" return df_compressed\n",
"\n",
"\n",
"def get_latest_aircraft_adsb_csv_df():\n",
" \"\"\"Download and load the latest ADS-B CSV from GitHub releases.\"\"\"\n",
" from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv\n",
" \n",
" import pandas as pd\n",
" import re\n",
" \n",
" csv_path = download_latest_aircraft_adsb_csv()\n",
" df = pd.read_csv(csv_path)\n",
" df = df.fillna(\"\")\n",
" \n",
" # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv\n",
" match = re.search(r\"planequery_aircraft_adsb_(\\d{4}-\\d{2}-\\d{2})_\", str(csv_path))\n",
" if not match:\n",
" raise ValueError(f\"Could not extract date from filename: {csv_path.name}\")\n",
" \n",
" date_str = match.group(1)\n",
" return df, date_str\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e14c8363",
"metadata": {},
"outputs": [],
"source": [
"from datetime import datetime\n",
"df = load_historical_for_day(datetime(2024,1,1))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3874ba4d",
"metadata": {},
"outputs": [],
"source": [
"len(df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bcae50ad",
"metadata": {},
"outputs": [],
"source": [
"df[(df['icao'] == \"008081\")]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "50921c86",
"metadata": {},
"outputs": [],
"source": [
"df[df['icao'] == \"a4e1d2\"]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8194d9aa",
"metadata": {},
"outputs": [],
"source": [
"df[df['r'] == \"N4131T\"]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1e3b7aa2",
"metadata": {},
"outputs": [],
"source": [
"df_compressed[df_compressed['icao'].duplicated(keep=False)]\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "40613bc1",
"metadata": {},
"outputs": [],
"source": [
"import gzip\n",
"import json\n",
"\n",
"path = \"/Users/jonahgoode/Downloads/test_extract/traces/fb/trace_full_acbbfb.json\"\n",
"\n",
"with gzip.open(path, \"rt\", encoding=\"utf-8\") as f:\n",
" data = json.load(f)\n",
"\n",
"print(type(data))\n",
"# use `data` here\n",
"import json\n",
"print(json.dumps(data, indent=2)[:2000])\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "320109b2",
"metadata": {},
"outputs": [],
"source": [
"# First, load the JSON to inspect its structure\n",
"import json\n",
"with open(\"/Users/jonahgoode/Documents/PlaneQuery/Other-Code/readsb-protobuf/webapp/src/db/aircrafts.json\", 'r') as f:\n",
" data = json.load(f)\n",
"\n",
"# Check the structure\n",
"print(type(data))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "590134f4",
"metadata": {},
"outputs": [],
"source": [
"data['AC97E3']"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.10"
}
},
"nbformat": 4,
"nbformat_minor": 5
}