Add examples for working with named pipes and resistive buttons with ADS1115

This commit is contained in:
Niellun
2025-11-04 01:09:47 +02:00
parent 765a437f1d
commit 2bfe7116dc
4 changed files with 357 additions and 1 deletions
+16
View File
@@ -0,0 +1,16 @@
# Makefile for compiling buttons.cpp with optimizations for release
CXX := g++
CXXFLAGS := -std=c++17 -O3 -Wall -Wextra -march=native -flto
LDFLAGS := -flto
all: keylistener emulator
keylistener: keylistener.cpp
$(CXX) $(CXXFLAGS) keylistener.cpp -o keylistener $(LDFLAGS)
emulator: emulator.cpp
$(CXX) $(CXXFLAGS) emulator.cpp -o emulator $(LDFLAGS)
clean:
rm -f keylistener emulator
+220
View File
@@ -0,0 +1,220 @@
/**
* @brief Example code for intercepting resistive button presses via an ADS1115 ADC module on a Raspberry Pi.
*
* This file demonstrates how to read the voltage levels from the button wires connected to an
* ADS1115 analogtodigital converter over I²C. The voltages are decoded into button
* identifiers, longpress events are detected, and corresponding key codes are sent to the
* main application through a named FIFO pipe.
*
* Important:`FIFO_PATH` must match the pipe name configured in the main application
* settings.
*
* @note The program ignores `SIGPIPE` to avoid termination when the pipe is closed.
* @note Compile with a C++17 (or newer) compiler and link against the I²C library.
*/
#include <iostream>
#include <vector>
#include <thread>
#include <chrono>
#include <cstdint>
#include <functional>
#include <fcntl.h>
#include <unistd.h>
#include <linux/i2c-dev.h>
#include <sys/ioctl.h>
#include <signal.h>
#define FIFO_PATH "/tmp/fastcarplay_pipe"
#define LONG_PRESS 20
#define ADS1115_ADDRESS 0x48
#define FREQUENCY 50
#define BTN_LEFT 100
#define BTN_RIGHT 101
#define BTN_SELECT_DOWN 104
#define BTN_SELECT_UP 105
#define BTN_BACK 106
#define BTN_HOME 200
int fifo_fd = -1;
std::vector<double> voltages = {2.7, 1.9, 1.15, 0.45};
class ButtonProcessor
{
public:
ButtonProcessor(std::function<void(int, bool)> callback, int long_press)
: _callback(callback), _long_press(long_press),
_current(0), _previous(0), _count(0) {}
void process_voltage(double voltage)
{
int button_id = 0;
if (voltage < voltages[0])
{
for (auto v : voltages)
{
button_id++;
if (voltage > v)
break;
}
}
if (button_id == 0)
{
if (_previous > 0 && _count < _long_press)
{
_callback(_previous, false);
}
_previous = 0;
}
else if (_previous == button_id)
{
if (_count < _long_press)
{
_count++;
if (_count == _long_press)
{
_callback(button_id, true);
}
}
}
else if (_current == button_id)
{
_previous = button_id;
_count = 1;
}
_current = button_id;
}
private:
std::function<void(int, bool)> _callback;
int _long_press;
int _current;
int _previous;
int _count;
};
void send_key_code(uint8_t i)
{
if (fifo_fd == -1)
{
fifo_fd = open(FIFO_PATH, O_WRONLY | O_NONBLOCK);
if (fifo_fd == -1)
return; // cannot open FIFO
}
ssize_t n = write(fifo_fd, &i, sizeof(i));
if (n == -1)
{
// handle broken pipe or closed reader
close(fifo_fd);
fifo_fd = -1;
}
}
// Callback examples
void callback0(int button_id, bool long_press)
{
std::cout << "[Channel 0] Button " << button_id << " pressed. Long press: " << long_press << "\n";
if (button_id == 4)
{
if (long_press)
{
send_key_code(BTN_SELECT_DOWN);
send_key_code(BTN_SELECT_UP);
}
else
{
send_key_code(BTN_RIGHT);
}
}
if (button_id == 3)
{
if (long_press)
{
send_key_code(BTN_BACK);
}
else
{
send_key_code(BTN_LEFT);
}
}
}
void callback1(int button_id, bool long_press)
{
std::cout << "[Channel 1] Button " << button_id << " pressed. Long press: " << long_press << "\n";
if (button_id == 3)
{
if (!long_press)
{
send_key_code(BTN_HOME);
}
}
}
double read_diff(int file_i2c, uint8_t mux)
{
uint16_t config = 0x8000 // OS = start conversion
| (mux << 12) // MUX
| (0x01 << 9) // ±4.096V
| (0x01 << 8) // single-shot
| 0x00A3; // 128 SPS, comparator off
uint8_t buffer[3];
buffer[0] = 0x01; // Config register
buffer[1] = (config >> 8) & 0xFF;
buffer[2] = config & 0xFF;
write(file_i2c, buffer, 3);
std::this_thread::sleep_for(std::chrono::milliseconds(5));
uint8_t read_buf[2];
buffer[0] = 0x00; // Conversion register
write(file_i2c, buffer, 1);
read(file_i2c, read_buf, 2);
int16_t value = (read_buf[0] << 8) | read_buf[1];
return static_cast<double>(value) * 4.096 / 32768.0;
}
int main()
{
signal(SIGPIPE, SIG_IGN); // ignore SIGPIPE globally
int file_i2c = open("/dev/i2c-1", O_RDWR);
if (file_i2c < 0 || ioctl(file_i2c, I2C_SLAVE, ADS1115_ADDRESS) < 0)
{
std::cerr << "Failed to open I2C device\n";
return 1;
}
ButtonProcessor proc0(callback0, LONG_PRESS);
ButtonProcessor proc1(callback1, LONG_PRESS);
constexpr double interval = 1.0 / FREQUENCY;
while (true)
{
auto loop_start = std::chrono::steady_clock::now();
double v0 = read_diff(file_i2c, 0x04);
double v1 = read_diff(file_i2c, 0x05);
proc0.process_voltage(v0);
proc1.process_voltage(v1);
auto loop_end = std::chrono::steady_clock::now();
std::chrono::duration<double> elapsed = loop_end - loop_start;
if (elapsed.count() < interval)
{
std::this_thread::sleep_for(std::chrono::duration<double>(interval - elapsed.count()));
}
}
close(file_i2c);
return 0;
}
+120
View File
@@ -0,0 +1,120 @@
#!/usr/bin/env python3
import time
import struct
from smbus2 import SMBus
ADS1115_ADDRESS = 0x48
FIFO_PATH = "/tmp/myfifo"
LONG_PRESS = 20
fifo_file = None
class ButtonProcessor:
def __init__(self, voltages, callback, long_press_threshold):
"""
:param voltages: List of voltages sorted in descending order.
:param callback: Function to call when a button press is detected. Signature: callback(button_id: int, long_press: bool)
:param long_press_threshold: Number of consecutive calls to consider a long press.
"""
self.voltages = voltages
self.callback = callback
self.long_press_threshold = long_press_threshold
self.current_id = 0
self.previous_id = 0
self.press_count = 0
def process_voltage(self, voltage):
"""
Process the next voltage reading.
:param voltage: The voltage to process.
"""
button_id = 0
if voltage < self.voltages[0]:
for v in self.voltages:
button_id = button_id + 1
if voltage > v:
break
if button_id == 0:
if self.previous_id > 0 and self.press_count < self.long_press_threshold:
self.callback(self.previous_id, False)
self.previous_id = 0
elif self.previous_id == button_id:
if self.press_count < self.long_press_threshold:
self.press_count = self.press_count + 1
if self.press_count == self.long_press_threshold:
self.callback(button_id, True)
elif self.current_id == button_id:
self.previous_id = button_id
self.press_count = 1
self.current_id = button_id
def read_diff(bus, mux):
# 128 SPS, ±4.096V, single-shot (sufficient for 25 Hz)
config = (0x8000 # OS = start conversion
| (mux << 12) # MUX select
| (0x01 << 9) # ±4.096V
| (0x01 << 8) # single-shot
| 0x00A3) # 128 SPS, comparator off
bus.write_i2c_block_data(ADS1115_ADDRESS, 0x01, [(config >> 8) & 0xFF, config & 0xFF])
time.sleep(0.005) # ~5ms conversion time at 250 SPS
data = bus.read_i2c_block_data(ADS1115_ADDRESS, 0x00, 2)
value = (data[0] << 8) | data[1]
if value > 0x7FFF:
value -= 0x10000
return value * 4.096 / 32768
def open_fifo():
global fifo_file
try:
fifo_file = open(FIFO_PATH, "wb", buffering=0)
except FileNotFoundError:
fifo_file = None
return False
return True
def send_key_code(i: int):
global fifo_file
if fifo_file is None:
if not open_fifo():
return
try:
fifo_file.write(struct.pack("B", i))
fifo_file.flush()
except (BrokenPipeError, FileNotFoundError):
fifo_file = None
send_key_code(i)
# Define a callback for channel A that prints debug info.
def callback0(button_id: int, long_press: bool):
print(f"[Channel 0] Button {button_id} pressed. Long press: {long_press}")
# Define a callback for channel B that prints debug info.
def callback1(button_id: int, long_press: bool):
print(f"[Channel 1] Button {button_id} pressed. Long press: {long_press}")
def main():
with SMBus(1) as bus:
interval = 1/50
# Configure voltage thresholds (example values). Adjust according to hardware.
voltage_thresholds = [2.7, 1.9, 1.15, 0.45]
# Create ButtonProcessor instances for each channel with its own callback.
proc0 = ButtonProcessor(voltage_thresholds, callback0, LONG_PRESS)
proc1 = ButtonProcessor(voltage_thresholds, callback1, LONG_PRESS)
# Continuously read voltages and feed them to the processors.
while True:
loop_start = time.time()
v0 = read_diff(bus, 0x04) # AIN0-AIN3
v1 = read_diff(bus, 0x05) # AIN1-AIN3
proc0.process_voltage(v0)
proc1.process_voltage(v1)
elapsed = time.time() - loop_start
time.sleep(max(0, interval - elapsed))
if __name__ == "__main__":
main()
+1 -1
View File
@@ -24,7 +24,7 @@ extern "C"
#define FRAME_DELAY_INACTIVE 200
static const char *title = "Fast Car Play v0.5";
static const char *title = "Fast Car Play v0.6";
static SDL_Window *window = nullptr;
static SDL_Renderer *renderer = nullptr;
Uint32 evtStatus = (Uint32)-1;