mirror of
https://github.com/PlaneQuery/OpenAirframes.git
synced 2026-04-23 19:46:09 +02:00
27da93801e
add clickhouse_connect use 32GB update to no longer do df.copy() Add planequery_adsb_read.ipynb INCREASE: update Fargate task definition to 16 vCPU and 64 GB memory for improved performance on large datasets update notebook remove print(df) Ensure empty strings are preserved in DataFrame columns check if day has data for adsb update notebook
641 lines
24 KiB
Plaintext
641 lines
24 KiB
Plaintext
{
|
|
"cells": [
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "06ae0319",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import os\n",
"import clickhouse_connect\n",
|
|
"client = clickhouse_connect.get_client(\n",
|
|
" host=os.environ[\"CLICKHOUSE_HOST\"],\n",
|
|
" username=os.environ[\"CLICKHOUSE_USERNAME\"],\n",
|
|
" password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n",
|
|
" secure=True,\n",
|
|
")"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "779710f0",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"df = client.query_df(\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '2024-01-01 00:00:00' AND time < '2024-01-02 00:00:00'\")\n",
|
|
"df_copy = df.copy()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "bf024da8",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# -- military = dbFlags & 1; interesting = dbFlags & 2; PIA = dbFlags & 4; LADD = dbFlags & 8;"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "270607b5",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from datetime import datetime\n",
"df = load_raw_adsb_for_day(datetime(2024,1,1))"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "ac06a30e",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"df['aircraft']"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "91edab3e",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n",
|
|
"def compress_df(df):\n",
|
|
" icao = df.name\n",
|
|
" df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n",
|
|
" original_df = df.copy()\n",
|
|
" df = df.groupby(\"_signature\", as_index=False).last() # check if it works with both last and first.\n",
|
|
" # For each row, create a dict of non-empty column values. This is using sets and subsets...\n",
|
|
" def get_non_empty_dict(row):\n",
|
|
" return {col: row[col] for col in COLUMNS if row[col] != ''}\n",
|
|
" \n",
|
|
" df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n",
|
|
" df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n",
|
|
" \n",
|
|
" # Check if row i's non-empty values are a subset of row j's non-empty values\n",
|
|
" def is_subset_of_any(idx):\n",
|
|
" row_dict = df.loc[idx, '_non_empty_dict']\n",
|
|
" row_count = df.loc[idx, '_non_empty_count']\n",
|
|
" \n",
|
|
" for other_idx in df.index:\n",
|
|
" if idx == other_idx:\n",
|
|
" continue\n",
|
|
" other_dict = df.loc[other_idx, '_non_empty_dict']\n",
|
|
" other_count = df.loc[other_idx, '_non_empty_count']\n",
|
|
" \n",
|
|
" # Check if all non-empty values in current row match those in other row\n",
|
|
" if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n",
|
|
" # If they match and other has more defined columns, current row is redundant\n",
|
|
" if other_count > row_count:\n",
|
|
" return True\n",
|
|
" return False\n",
|
|
" \n",
|
|
" # Keep rows that are not subsets of any other row\n",
|
|
" keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n",
|
|
" df = df[keep_mask]\n",
|
|
"\n",
|
|
" if len(df) > 1:\n",
|
|
" original_df = original_df[original_df['_signature'].isin(df['_signature'])]\n",
|
|
" value_counts = original_df[\"_signature\"].value_counts()\n",
|
|
" max_signature = value_counts.idxmax()\n",
|
|
" df = df[df['_signature'] == max_signature]\n",
|
|
"\n",
|
|
" df['icao'] = icao\n",
|
|
" df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n",
|
|
" return df\n",
|
|
"\n",
|
|
"# df = df_copy\n",
|
|
"# df = df_copy.iloc[0:100000]\n",
|
|
"# df = df[df['r'] == \"N4131T\"]\n",
|
|
"# df = df[(df['icao'] == \"008081\")]\n",
|
|
"# df = df.iloc[0:500]\n",
|
|
"df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
|
|
"df = df.drop(columns=['aircraft'])\n",
|
|
"df = df.sort_values(['icao', 'time'])\n",
|
|
"df[COLUMNS] = df[COLUMNS].fillna('')\n",
|
|
"ORIGINAL_COLUMNS = df.columns.tolist()\n",
|
|
"df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n",
|
|
"cols = df_compressed.columns.tolist()\n",
|
|
"cols.remove(\"icao\")\n",
|
|
"cols.insert(1, \"icao\")\n",
|
|
"df_compressed = df_compressed[cols]\n",
|
|
"df_compressed"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "efdfcb2c",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
|
|
"df[~df['aircraft_category'].isna()]"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "495c5025",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# SOME KIND OF MAP REDUCE SYSTEM\n",
|
|
"import os\n",
|
|
"\n",
|
|
"COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n",
|
|
"def compress_df(df):\n",
|
|
" icao = df.name\n",
|
|
" df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n",
|
|
" \n",
|
|
" # Compute signature counts before grouping (avoid copy)\n",
|
|
" signature_counts = df[\"_signature\"].value_counts()\n",
|
|
" \n",
|
|
" df = df.groupby(\"_signature\", as_index=False).first() # check if it works with both last and first.\n",
|
|
" # For each row, create a dict of non-empty column values. This is using sets and subsets...\n",
|
|
" def get_non_empty_dict(row):\n",
|
|
" return {col: row[col] for col in COLUMNS if row[col] != ''}\n",
|
|
" \n",
|
|
" df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n",
|
|
" df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n",
|
|
" \n",
|
|
" # Check if row i's non-empty values are a subset of row j's non-empty values\n",
|
|
" def is_subset_of_any(idx):\n",
|
|
" row_dict = df.loc[idx, '_non_empty_dict']\n",
|
|
" row_count = df.loc[idx, '_non_empty_count']\n",
|
|
" \n",
|
|
" for other_idx in df.index:\n",
|
|
" if idx == other_idx:\n",
|
|
" continue\n",
|
|
" other_dict = df.loc[other_idx, '_non_empty_dict']\n",
|
|
" other_count = df.loc[other_idx, '_non_empty_count']\n",
|
|
" \n",
|
|
" # Check if all non-empty values in current row match those in other row\n",
|
|
" if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n",
|
|
" # If they match and other has more defined columns, current row is redundant\n",
|
|
" if other_count > row_count:\n",
|
|
" return True\n",
|
|
" return False\n",
|
|
" \n",
|
|
" # Keep rows that are not subsets of any other row\n",
|
|
" keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n",
|
|
" df = df[keep_mask]\n",
|
|
"\n",
|
|
" if len(df) > 1:\n",
|
|
" # Use pre-computed signature counts instead of original_df\n",
|
|
" remaining_sigs = df['_signature']\n",
|
|
" sig_counts = signature_counts[remaining_sigs]\n",
|
|
" max_signature = sig_counts.idxmax()\n",
|
|
" df = df[df['_signature'] == max_signature]\n",
|
|
"\n",
|
|
" df['icao'] = icao\n",
|
|
" df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n",
|
|
" return df\n",
|
|
"\n",
|
|
"# names of releases something like\n",
|
|
"# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz\n",
|
|
"\n",
|
|
"# Let's build historical first. \n",
|
|
"\n",
|
|
"_ch_client = None\n",
|
|
"\n",
|
|
"def _get_clickhouse_client():\n",
|
|
" \"\"\"Return a reusable ClickHouse client, with retry/backoff for transient DNS or connection errors.\"\"\"\n",
|
|
" global _ch_client\n",
|
|
" if _ch_client is not None:\n",
|
|
" return _ch_client\n",
|
|
"\n",
|
|
" import clickhouse_connect\n",
|
|
" import time\n",
|
|
"\n",
|
|
" max_retries = 5\n",
|
|
" for attempt in range(1, max_retries + 1):\n",
|
|
" try:\n",
|
|
" _ch_client = clickhouse_connect.get_client(\n",
|
|
" host=os.environ[\"CLICKHOUSE_HOST\"],\n",
|
|
" username=os.environ[\"CLICKHOUSE_USERNAME\"],\n",
|
|
" password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n",
|
|
" secure=True,\n",
|
|
" )\n",
|
|
" return _ch_client\n",
|
|
" except Exception as e:\n",
|
|
" wait = min(2 ** attempt, 30)\n",
|
|
" print(f\" ClickHouse connect attempt {attempt}/{max_retries} failed: {e}\")\n",
|
|
" if attempt == max_retries:\n",
|
|
" raise\n",
|
|
" print(f\" Retrying in {wait}s...\")\n",
|
|
" time.sleep(wait)\n",
|
|
"\n",
|
|
"\n",
|
|
"def load_raw_adsb_for_day(day):\n",
|
|
" \"\"\"Load raw ADS-B data for a day from cache or ClickHouse.\"\"\"\n",
|
|
" from datetime import timedelta\n",
|
|
" from pathlib import Path\n",
|
|
" import pandas as pd\n",
|
|
" import time\n",
|
|
" \n",
|
|
" start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)\n",
|
|
" end_time = start_time + timedelta(days=1)\n",
|
|
" \n",
|
|
" # Set up caching\n",
|
|
" cache_dir = Path(\"data/adsb\")\n",
|
|
" cache_dir.mkdir(parents=True, exist_ok=True)\n",
|
|
" cache_file = cache_dir / f\"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst\"\n",
|
|
" \n",
|
|
" # Check if cache exists\n",
|
|
" if cache_file.exists():\n",
|
|
" print(f\" Loading from cache: {cache_file}\")\n",
|
|
" df = pd.read_csv(cache_file, compression='zstd')\n",
|
|
" df['time'] = pd.to_datetime(df['time'])\n",
|
|
" else:\n",
|
|
" # Format dates for the query\n",
|
|
" start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')\n",
|
|
" end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')\n",
|
|
" \n",
|
|
" max_retries = 3\n",
|
|
" for attempt in range(1, max_retries + 1):\n",
|
|
" try:\n",
|
|
" client = _get_clickhouse_client()\n",
|
|
" print(f\" Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}\")\n",
|
|
" df = client.query_df(f\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '{start_str}' AND time < '{end_str}'\")\n",
|
|
" break\n",
|
|
" except Exception as e:\n",
|
|
" wait = min(2 ** attempt, 30)\n",
|
|
" print(f\" Query attempt {attempt}/{max_retries} failed: {e}\")\n",
|
|
" if attempt == max_retries:\n",
|
|
" raise\n",
|
|
" # Reset client in case connection is stale\n",
|
|
" global _ch_client\n",
|
|
" _ch_client = None\n",
|
|
" print(f\" Retrying in {wait}s...\")\n",
|
|
" time.sleep(wait)\n",
|
|
" \n",
|
|
" # Save to cache\n",
|
|
" df.to_csv(cache_file, index=False, compression='zstd')\n",
|
|
" print(f\" Saved to cache: {cache_file}\")\n",
|
|
" \n",
|
|
" return df\n",
|
|
"\n",
|
|
"def load_historical_for_day(day):\n",
|
|
" from pathlib import Path\n",
|
|
" import pandas as pd\n",
|
|
" \n",
|
|
" df = load_raw_adsb_for_day(day)\n",
|
|
" print(df)\n",
|
|
" df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
|
|
" df = df.drop(columns=['aircraft'])\n",
|
|
" df = df.sort_values(['icao', 'time'])\n",
|
|
" df[COLUMNS] = df[COLUMNS].fillna('')\n",
|
|
" df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n",
|
|
" cols = df_compressed.columns.tolist()\n",
|
|
" cols.remove('time')\n",
|
|
" cols.insert(0, 'time')\n",
|
|
" cols.remove(\"icao\")\n",
|
|
" cols.insert(1, \"icao\")\n",
|
|
" df_compressed = df_compressed[cols]\n",
|
|
" return df_compressed\n",
|
|
"\n",
|
|
"\n",
|
|
"def concat_compressed_dfs(df_base, df_new):\n",
|
|
" \"\"\"Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.\"\"\"\n",
|
|
" import pandas as pd\n",
|
|
" \n",
|
|
" # Combine both dataframes\n",
|
|
" df_combined = pd.concat([df_base, df_new], ignore_index=True)\n",
|
|
" \n",
|
|
" # Sort by ICAO and time\n",
|
|
" df_combined = df_combined.sort_values(['icao', 'time'])\n",
|
|
" \n",
|
|
" # Fill NaN values\n",
|
|
" df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')\n",
|
|
" \n",
|
|
" # Apply compression logic per ICAO to get the best row\n",
|
|
" df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)\n",
|
|
" \n",
|
|
" # Sort by time\n",
|
|
" df_compressed = df_compressed.sort_values('time')\n",
|
|
" \n",
|
|
" return df_compressed\n",
|
|
"\n",
|
|
"\n",
|
|
"def get_latest_aircraft_adsb_csv_df():\n",
|
|
" \"\"\"Download and load the latest ADS-B CSV from GitHub releases.\"\"\"\n",
|
|
" from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv\n",
|
|
" \n",
|
|
" import pandas as pd\n",
|
|
" import re\n",
|
|
" \n",
|
|
" csv_path = download_latest_aircraft_adsb_csv()\n",
|
|
" df = pd.read_csv(csv_path)\n",
|
|
" df = df.fillna(\"\")\n",
|
|
" \n",
|
|
" # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv\n",
|
|
" match = re.search(r\"planequery_aircraft_adsb_(\\d{4}-\\d{2}-\\d{2})_\", str(csv_path))\n",
|
|
" if not match:\n",
|
|
" raise ValueError(f\"Could not extract date from filename: {csv_path.name}\")\n",
|
|
" \n",
|
|
" date_str = match.group(1)\n",
|
|
" return df, date_str\n",
|
|
"\n"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "7f66acf7",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# SOME KIND OF MAP REDUCE SYSTEM\n",
|
|
"\n",
|
|
"\n",
|
|
"COLUMNS = ['dbFlags', 'ownOp', 'year', 'desc', 'aircraft_category', 'r', 't']\n",
|
|
"def compress_df(df):\n",
|
|
" icao = df.name\n",
|
|
" df[\"_signature\"] = df[COLUMNS].astype(str).agg('|'.join, axis=1)\n",
|
|
" original_df = df.copy()\n",
|
|
" df = df.groupby(\"_signature\", as_index=False).first() # check if it works with both last and first.\n",
|
|
" # For each row, create a dict of non-empty column values. This is using sets and subsets...\n",
|
|
" def get_non_empty_dict(row):\n",
|
|
" return {col: row[col] for col in COLUMNS if row[col] != ''}\n",
|
|
" \n",
|
|
" df['_non_empty_dict'] = df.apply(get_non_empty_dict, axis=1)\n",
|
|
" df['_non_empty_count'] = df['_non_empty_dict'].apply(len)\n",
|
|
" \n",
|
|
" # Check if row i's non-empty values are a subset of row j's non-empty values\n",
|
|
" def is_subset_of_any(idx):\n",
|
|
" row_dict = df.loc[idx, '_non_empty_dict']\n",
|
|
" row_count = df.loc[idx, '_non_empty_count']\n",
|
|
" \n",
|
|
" for other_idx in df.index:\n",
|
|
" if idx == other_idx:\n",
|
|
" continue\n",
|
|
" other_dict = df.loc[other_idx, '_non_empty_dict']\n",
|
|
" other_count = df.loc[other_idx, '_non_empty_count']\n",
|
|
" \n",
|
|
" # Check if all non-empty values in current row match those in other row\n",
|
|
" if all(row_dict.get(k) == other_dict.get(k) for k in row_dict.keys()):\n",
|
|
" # If they match and other has more defined columns, current row is redundant\n",
|
|
" if other_count > row_count:\n",
|
|
" return True\n",
|
|
" return False\n",
|
|
" \n",
|
|
" # Keep rows that are not subsets of any other row\n",
|
|
" keep_mask = ~df.index.to_series().apply(is_subset_of_any)\n",
|
|
" df = df[keep_mask]\n",
|
|
"\n",
|
|
" if len(df) > 1:\n",
|
|
" original_df = original_df[original_df['_signature'].isin(df['_signature'])]\n",
|
|
" value_counts = original_df[\"_signature\"].value_counts()\n",
|
|
" max_signature = value_counts.idxmax()\n",
|
|
" df = df[df['_signature'] == max_signature]\n",
|
|
"\n",
|
|
" df['icao'] = icao\n",
|
|
" df = df.drop(columns=['_non_empty_dict', '_non_empty_count', '_signature'])\n",
|
|
" return df\n",
|
|
"\n",
|
|
"# names of releases something like\n",
|
|
"# planequery_aircraft_adsb_2024-06-01T00-00-00Z.csv.gz\n",
|
|
"\n",
|
|
"# Let's build historical first. \n",
|
|
"\n",
|
|
"def load_raw_adsb_for_day(day):\n",
|
|
" \"\"\"Load raw ADS-B data for a day from cache or ClickHouse.\"\"\"\n",
|
|
" from datetime import timedelta\n",
|
|
" import clickhouse_connect\n",
|
|
" from pathlib import Path\n",
|
|
" import pandas as pd\n",
|
|
" \n",
|
|
" start_time = day.replace(hour=0, minute=0, second=0, microsecond=0)\n",
|
|
" end_time = start_time + timedelta(days=1)\n",
|
|
" \n",
|
|
" # Set up caching\n",
|
|
" cache_dir = Path(\"data/adsb\")\n",
|
|
" cache_dir.mkdir(parents=True, exist_ok=True)\n",
|
|
" cache_file = cache_dir / f\"adsb_raw_{start_time.strftime('%Y-%m-%d')}.csv.zst\"\n",
|
|
" \n",
|
|
" # Check if cache exists\n",
|
|
" if cache_file.exists():\n",
|
|
" print(f\" Loading from cache: {cache_file}\")\n",
|
|
" df = pd.read_csv(cache_file, compression='zstd')\n",
|
|
" df['time'] = pd.to_datetime(df['time'])\n",
|
|
" else:\n",
|
|
" # Format dates for the query\n",
|
|
" start_str = start_time.strftime('%Y-%m-%d %H:%M:%S')\n",
|
|
" end_str = end_time.strftime('%Y-%m-%d %H:%M:%S')\n",
|
|
" \n",
|
|
" client = clickhouse_connect.get_client(\n",
|
|
" host=os.environ[\"CLICKHOUSE_HOST\"],\n",
|
|
" username=os.environ[\"CLICKHOUSE_USERNAME\"],\n",
|
|
" password=os.environ[\"CLICKHOUSE_PASSWORD\"],\n",
|
|
" secure=True,\n",
|
|
" )\n",
|
|
" print(f\" Querying ClickHouse for {start_time.strftime('%Y-%m-%d')}\")\n",
|
|
" df = client.query_df(f\"SELECT time, icao,r,t,dbFlags,ownOp,year,desc,aircraft FROM adsb_messages Where time > '{start_str}' AND time < '{end_str}'\")\n",
|
|
" \n",
|
|
" # Save to cache\n",
|
|
" df.to_csv(cache_file, index=False, compression='zstd')\n",
|
|
" print(f\" Saved to cache: {cache_file}\")\n",
|
|
" \n",
|
|
" return df\n",
|
|
"\n",
|
|
"def load_historical_for_day(day):\n",
|
|
" from pathlib import Path\n",
|
|
" import pandas as pd\n",
|
|
" \n",
|
|
" df = load_raw_adsb_for_day(day)\n",
|
|
" \n",
|
|
" df['aircraft_category'] = df['aircraft'].apply(lambda x: x.get('category') if isinstance(x, dict) else None)\n",
|
|
" df = df.drop(columns=['aircraft'])\n",
|
|
" df = df.sort_values(['icao', 'time'])\n",
|
|
" df[COLUMNS] = df[COLUMNS].fillna('')\n",
|
|
" df_compressed = df.groupby('icao',group_keys=False).apply(compress_df)\n",
|
|
" cols = df_compressed.columns.tolist()\n",
|
|
" cols.remove('time')\n",
|
|
" cols.insert(0, 'time')\n",
|
|
" cols.remove(\"icao\")\n",
|
|
" cols.insert(1, \"icao\")\n",
|
|
" df_compressed = df_compressed[cols]\n",
|
|
" return df_compressed\n",
|
|
"\n",
|
|
"\n",
|
|
"def concat_compressed_dfs(df_base, df_new):\n",
|
|
" \"\"\"Concatenate base and new compressed dataframes, keeping the most informative row per ICAO.\"\"\"\n",
|
|
" import pandas as pd\n",
|
|
" \n",
|
|
" # Combine both dataframes\n",
|
|
" df_combined = pd.concat([df_base, df_new], ignore_index=True)\n",
|
|
" \n",
|
|
" # Sort by ICAO and time\n",
|
|
" df_combined = df_combined.sort_values(['icao', 'time'])\n",
|
|
" \n",
|
|
" # Fill NaN values\n",
|
|
" df_combined[COLUMNS] = df_combined[COLUMNS].fillna('')\n",
|
|
" \n",
|
|
" # Apply compression logic per ICAO to get the best row\n",
|
|
" df_compressed = df_combined.groupby('icao', group_keys=False).apply(compress_df)\n",
|
|
" \n",
|
|
" # Sort by time\n",
|
|
" df_compressed = df_compressed.sort_values('time')\n",
|
|
" \n",
|
|
" return df_compressed\n",
|
|
"\n",
|
|
"\n",
|
|
"def get_latest_aircraft_adsb_csv_df():\n",
|
|
" \"\"\"Download and load the latest ADS-B CSV from GitHub releases.\"\"\"\n",
|
|
" from get_latest_planequery_aircraft_release import download_latest_aircraft_adsb_csv\n",
|
|
" \n",
|
|
" import pandas as pd\n",
|
|
" import re\n",
|
|
" \n",
|
|
" csv_path = download_latest_aircraft_adsb_csv()\n",
|
|
" df = pd.read_csv(csv_path)\n",
|
|
" df = df.fillna(\"\")\n",
|
|
" \n",
|
|
" # Extract start date from filename pattern: planequery_aircraft_adsb_{start_date}_{end_date}.csv\n",
|
|
" match = re.search(r\"planequery_aircraft_adsb_(\\d{4}-\\d{2}-\\d{2})_\", str(csv_path))\n",
|
|
" if not match:\n",
|
|
" raise ValueError(f\"Could not extract date from filename: {csv_path.name}\")\n",
|
|
" \n",
|
|
" date_str = match.group(1)\n",
|
|
" return df, date_str\n",
|
|
"\n"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "e14c8363",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from datetime import datetime\n",
|
|
"df = load_historical_for_day(datetime(2024,1,1))"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "3874ba4d",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"len(df)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "bcae50ad",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"df[(df['icao'] == \"008081\")]"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "50921c86",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"df[df['icao'] == \"a4e1d2\"]"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "8194d9aa",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"df[df['r'] == \"N4131T\"]"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "1e3b7aa2",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"df_compressed[df_compressed['icao'].duplicated(keep=False)]\n"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "40613bc1",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import gzip\n",
|
|
"import json\n",
|
|
"\n",
|
|
"path = \"/Users/jonahgoode/Downloads/test_extract/traces/fb/trace_full_acbbfb.json\"\n",
|
|
"\n",
|
|
"with gzip.open(path, \"rt\", encoding=\"utf-8\") as f:\n",
|
|
" data = json.load(f)\n",
|
|
"\n",
|
|
"print(type(data))\n",
|
|
"# use `data` here\n",
|
|
"import json\n",
|
|
"print(json.dumps(data, indent=2)[:2000])\n"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "320109b2",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# First, load the JSON to inspect its structure\n",
|
|
"import json\n",
|
|
"with open(\"/Users/jonahgoode/Documents/PlaneQuery/Other-Code/readsb-protobuf/webapp/src/db/aircrafts.json\", 'r') as f:\n",
|
|
" data = json.load(f)\n",
|
|
"\n",
|
|
"# Check the structure\n",
|
|
"print(type(data))"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "590134f4",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"data['AC97E3']"
|
|
]
|
|
}
|
|
],
|
|
"metadata": {
|
|
"kernelspec": {
|
|
"display_name": ".venv",
|
|
"language": "python",
|
|
"name": "python3"
|
|
},
|
|
"language_info": {
|
|
"codemirror_mode": {
|
|
"name": "ipython",
|
|
"version": 3
|
|
},
|
|
"file_extension": ".py",
|
|
"mimetype": "text/x-python",
|
|
"name": "python",
|
|
"nbconvert_exporter": "python",
|
|
"pygments_lexer": "ipython3",
|
|
"version": "3.12.10"
|
|
}
|
|
},
|
|
"nbformat": 4,
|
|
"nbformat_minor": 5
|
|
}
|