Async USB read

This commit is contained in:
Niellune
2026-03-21 03:28:52 +02:00
parent 6e1cf00086
commit a06e58133f
11 changed files with 271 additions and 149 deletions
+1 -1
View File
@@ -320,7 +320,7 @@ void Application::loop()
interface.drawHome(true, PROTOCOL_STATUS_UNKNOWN);
VideoBuffer videoBuffer;
Protocol protocol(Settings::width, Settings::height, Settings::sourceFps, AV_INPUT_BUFFER_PADDING_SIZE);
Protocol protocol(Settings::width, Settings::height, Settings::sourceFps);
Decoder decoder;
PcmAudio audioMain("Main"), audioAux("Aux");
+91 -66
View File
@@ -9,8 +9,7 @@
#include "helper/functions.h"
#include "settings.h"
Connector::Connector(uint16_t videoPadding)
: _videoPadding(videoPadding)
Connector::Connector()
{
try
{
@@ -25,22 +24,58 @@ Connector::Connector(uint16_t videoPadding)
int result = libusb_init(&_context);
if (result < 0)
throw std::runtime_error(std::string("Can't initialise USB: ") + libusb_error_name(result));
_usbTransfers = Settings::usbQueue;
if(_usbTransfers > MAX_USB_REQUESTS)
_usbTransfers = MAX_USB_REQUESTS;
for (int i = 0; i < _usbTransfers; i++)
{
_usbTransfer[i] = libusb_alloc_transfer(0);
uint8_t *buf = static_cast<uint8_t *>(malloc(Settings::usbBufferSize));
libusb_fill_bulk_transfer(_usbTransfer[i], _device, _endpoint_in, buf, Settings::usbBufferSize, Connector::onUsbRead, this, READ_TIMEOUT);
}
}
Connector::~Connector()
{
_active = false;
_queue.notify();
libusb_interrupt_event_handler(_context);
if (_write_thread.joinable())
_write_thread.join();
if (_read_thread.joinable())
_read_thread.join();
if (_cipher)
{
delete _cipher;
_cipher = nullptr;
}
for (int i = 0; i < _usbTransfers; i++)
{
if (_usbTransfer[i])
{
timeval timeout{0, 100000};
libusb_cancel_transfer(_usbTransfer[i]);
libusb_handle_events_timeout_completed(_context, &timeout, nullptr);
}
}
for (int i = 0; i < _usbTransfers; i++)
{
if (_usbTransfer[i])
{
if (_usbTransfer[i]->buffer)
free(_usbTransfer[i]->buffer);
libusb_free_transfer(_usbTransfer[i]);
_usbTransfer[i] = nullptr;
}
}
if (_device)
{
libusb_release_interface(_device, 0);
@@ -71,11 +106,15 @@ void Connector::stop()
_active = false;
_queue.notify();
libusb_interrupt_event_handler(_context);
state(PROTOCOL_STATUS_INITIALISING);
if (_write_thread.joinable())
_write_thread.join();
if (_read_thread.joinable())
_read_thread.join();
}
bool Connector::connect(uint16_t vendor_id, uint16_t product_id)
@@ -138,7 +177,7 @@ void Connector::release()
}
}
bool Connector::nextState(u_int8_t state)
bool Connector::state(u_int8_t state)
{
if (state == _state)
return false;
@@ -154,6 +193,7 @@ bool Connector::nextState(u_int8_t state)
_nodeviceCount = 0;
_failCount = 0;
_state = state;
onStatus(state);
return true;
}
@@ -161,18 +201,13 @@ bool Connector::nextState(u_int8_t state)
{
_failCount = 0;
_state = state;
onStatus(state);
return true;
}
return false;
}
void Connector::state(u_int8_t state)
{
if (nextState(state))
onStatus(state);
}
bool Connector::linkFail(int status, const char *msg)
{
if (status == 0)
@@ -183,6 +218,14 @@ bool Connector::linkFail(int status, const char *msg)
return true;
}
void Connector::onDisconnect()
{
if (!_connected)
return;
std::cout << "[Connection] Device disconnected" << std::endl;
_connected = false;
}
bool Connector::send(std::unique_ptr<Command> packet)
{
if (!_connected || !packet)
@@ -323,68 +366,50 @@ void Connector::printMessage(uint32_t cmd, uint32_t length, uint8_t *data, bool
std::cout << oss.str() << std::endl;
}
void Connector::onUsbRead(libusb_transfer *transfer)
{
Connector *c = static_cast<Connector *>(transfer->user_data);
if (!c->_active)
return;
if (transfer->status == LIBUSB_TRANSFER_NO_DEVICE)
{
c->onDisconnect();
return;
}
if (c->_active && transfer->status == LIBUSB_TRANSFER_COMPLETED && transfer->actual_length > 0)
c->onData(transfer->buffer, transfer->actual_length);
if (c->_active && (libusb_submit_transfer(transfer) != LIBUSB_SUCCESS))
{
std::cout << "[Connection] USB transfer re-submit failed" << std::endl;
c->onDisconnect();
}
}
void Connector::readLoop()
{
Header header;
int transferred = 0;
uint8_t *data = nullptr;
// Set thread name
setThreadName("protocol-reader");
timeval timeout{0, 100000};
for (int i = 0; i < _usbTransfers; i++)
{
_usbTransfer[i]->dev_handle = _device;
_usbTransfer[i]->endpoint = _endpoint_in;
int status = libusb_submit_transfer(_usbTransfer[i]);
if (status != LIBUSB_SUCCESS)
{
std::cout << "[Connection] USB transfer submit " << i << " failed: " << status << std::endl;
onDisconnect();
return;
}
}
while (_active && _connected)
{
int result = libusb_bulk_transfer(_device, _endpoint_in, reinterpret_cast<uint8_t *>(&header), sizeof(Header), &transferred, READ_TIMEOUT);
if (result == LIBUSB_ERROR_NO_DEVICE)
{
std::cout << "[Connection] Device disconnected" << std::endl;
_connected = false;
continue;
}
if (result != LIBUSB_SUCCESS || transferred != sizeof(Header))
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
continue;
}
int padding = 0;
if (header.type == 6)
padding = _videoPadding;
if (header.length > 0)
{
data = (uint8_t *)malloc(header.length + padding);
if (!data)
continue;
libusb_bulk_transfer(_device, _endpoint_in, data, header.length, &transferred, READ_TIMEOUT);
}
if (header.magic == MAGIC_ENC)
{
if (!_cipher)
{
std::cout << "[Connection] Received encrypted command " << header.type << " but cipher is not initialised" << std::endl;
if (data)
free(data);
continue;
}
if (!_cipher->Decrypt(data, header.length))
{
std::cout << "[Connection] Can't decrypt command " << header.type << std::endl;
if (data)
free(data);
continue;
}
}
#ifdef PROTOCOL_DEBUG
printMessage(header.type, header.length, data, header.magic == MAGIC_ENC, false);
#endif
if (padding > 0)
std::fill(data + header.length, data + header.length + padding, 0);
onData(header.type, header.length, data);
libusb_handle_events_timeout_completed(_context, &timeout, nullptr);
}
}
+12 -18
View File
@@ -13,7 +13,9 @@
#include "struct/atomic_queue.h"
#include "struct/command.h"
#define READ_TIMEOUT 3000
#define READ_TIMEOUT 10000
#define MAX_USB_REQUESTS 64
#define COMMAND_QUEUE_SIZE 256
#define ENCRYPTION_BASE "SkBRDy3gmrw1ieH0"
#define PROTOCOL_DEBUG_NONE 0
@@ -22,21 +24,12 @@
#define PROTOCOL_DEBUG_OUT 3
#define PROTOCOL_DEBUG_ALL 4
#pragma pack(push, 1)
struct Header
{
uint32_t magic;
int32_t length;
uint32_t type;
uint32_t typecheck;
};
#pragma pack(pop)
class Connector : public ISender
{
public:
Connector(uint16_t videoPadding);
Connector();
virtual ~Connector();
void start();
@@ -44,7 +37,7 @@ public:
bool send(std::unique_ptr<Command> packet) override;
protected:
virtual void onData(uint32_t cmd, uint32_t length, uint8_t *data) = 0;
virtual void onData(uint8_t *data, uint32_t length) = 0;
virtual void onStatus(u_int8_t status) = 0;
virtual void onDevice(bool connected) = 0;
@@ -58,15 +51,16 @@ protected:
AESCipher *_cipher = nullptr;
private:
static void onUsbRead(libusb_transfer *transfer);
void readLoop();
void writeLoop();
void onDisconnect();
bool connect(uint16_t vendor_id, uint16_t product_id);
bool link();
void release();
void state(u_int8_t state);
bool nextState(u_int8_t state);
bool state(u_int8_t state);
bool linkFail(int status, const char *msg);
int write(int cmd, bool encrypt, uint8_t *data, uint32_t size);
@@ -74,6 +68,7 @@ private:
libusb_device_handle *_device = nullptr;
uint8_t _endpoint_in;
uint8_t _endpoint_out;
uint8_t _usbTransfers;
std::atomic<bool> _connected = false;
std::atomic<bool> _ecnrypt = false;
@@ -86,9 +81,8 @@ private:
std::thread _write_thread;
std::mutex _write_mutex;
std::atomic<bool> _active = false;
AtomicQueue<Command> _queue{256};
u_int16_t _videoPadding;
AtomicQueue<Command> _queue{COMMAND_QUEUE_SIZE};
libusb_transfer *_usbTransfer[MAX_USB_REQUESTS] = {};
};
#endif /* SRC_CONNECTOR */
+2 -1
View File
@@ -58,10 +58,11 @@
#define BTN_300 300
#define BTN_301 301
#define AUDIO_BUFFER_SIZE 2560
#define AUDIO_BUFFER_OFFSET 12
#define OFFSET_AUDIO_FORMAT 0
struct ProtocolCmdEntry
{
int cmd;
+1
View File
@@ -1,5 +1,6 @@
#include "pcm_audio.h"
#include "helper/functions.h"
#include "helper/protocol_const.h"
#include "settings.h"
// Add sample size (buffer size in samples) to ChannelConfig
+57 -38
View File
@@ -1,14 +1,14 @@
#include "protocol.h"
#include "helper/protocol_const.h"
#include "helper/functions.h"
#include "settings.h"
#include <cstring>
#include <iomanip>
#include <cctype>
#include <algorithm>
#include <ctime>
#include <iostream>
Protocol::Protocol(uint16_t width, uint16_t height, uint16_t fps, uint16_t padding)
: Connector(padding),
videoData(Settings::videoQueue),
Protocol::Protocol(uint16_t width, uint16_t height, uint16_t fps)
: videoData(Settings::videoQueue),
audioStreamMain(Settings::audioQueue),
audioStreamAux(Settings::audioQueue),
_recorder(Settings::audioQueue),
@@ -17,6 +17,7 @@ Protocol::Protocol(uint16_t width, uint16_t height, uint16_t fps, uint16_t paddi
_fps(fps),
_phoneConnected(false)
{
}
Protocol::~Protocol()
@@ -161,19 +162,52 @@ void Protocol::onControl(int cmd)
}
}
void Protocol::onData(uint32_t cmd, uint32_t length, uint8_t *data)
void Protocol::onData(uint8_t *data, uint32_t length)
{
bool dispose = true;
switch (cmd)
uint32_t offset = 0;
while (offset < length)
{
if (_message == nullptr)
_message = std::make_unique<Message>();
offset += _message->parse(data + offset, length - offset);
if (!_message->ready())
continue;
if (!_message->valid())
{
std::cout << "[Connection] Can't allocate message " << _message->type() << " with size " << _message->length() << std::endl;
_message = nullptr;
continue;
}
if (_message->encrypted() && !_message->decrypt(_cipher))
{
if (!_cipher)
std::cout << "[Connection] Received encrypted command " << _message->type() << " but cipher is not initialised" << std::endl;
else
std::cout << "[Connection] Can't decrypt command " << _message->type() << std::endl;
_message = nullptr;
continue;
}
#ifdef PROTOCOL_DEBUG
printMessage(_message->type(), _message->length(), _message->data(), _message->encrypted(), false);
#endif
dispatch(std::move(_message));
_message = nullptr;
}
}
void Protocol::dispatch(std::unique_ptr<Message> msg)
{
switch (msg->type())
{
case CMD_CONTROL:
if (length == 4)
{
int cmd = 0;
memcpy(&cmd, data, sizeof(int));
onControl(cmd);
}
if (msg->length() == 4)
onControl(msg->getInt(0));
break;
case CMD_PLUGGED:
@@ -186,40 +220,25 @@ void Protocol::onData(uint32_t cmd, uint32_t length, uint8_t *data)
case CMD_VIDEO_DATA:
{
if (length <= 20)
break;
videoData.pushDiscard(std::make_unique<Message>(data, length, 20));
dispose = false;
if(msg->setOffset(20))
videoData.pushDiscard(std::move(msg));
break;
}
case CMD_AUDIO_DATA:
{
if (length <= 16)
{
if (msg->length() <= 16)
break;
}
int channel = 0;
memcpy(&channel, data + 8, sizeof(int));
int channel = msg->getInt(8);
msg->setOffset(12);
if (channel == 1)
{
audioStreamMain.pushDiscard(std::make_unique<Message>(data, length, 12));
dispose = false;
break;
}
audioStreamMain.pushDiscard(std::move(msg));
if (channel == 2)
{
audioStreamAux.pushDiscard(std::make_unique<Message>(data, length, 12));
dispose = false;
break;
}
audioStreamAux.pushDiscard(std::move(msg));
break;
}
case CMD_ENCRYPTION:
if (length == 0)
if (msg->length() == 0)
setEncryption(true);
break;
}
if (dispose && length > 0 && data)
free(data);
}
+7 -7
View File
@@ -3,8 +3,6 @@
#include "struct/atomic_queue.h"
#include "struct/message.h"
#include "struct/multitouch.h"
#include "settings.h"
#include "connector.h"
#include "recorder.h"
@@ -12,7 +10,7 @@ class Protocol : public Connector
{
public:
Protocol(uint16_t width, uint16_t height, uint16_t fps, uint16_t padding);
Protocol(uint16_t width, uint16_t height, uint16_t fps);
~Protocol() override;
Protocol(const Protocol &) = delete;
@@ -30,11 +28,11 @@ private:
void onStatus(uint8_t status) override;
void onDevice(bool connected) override;
void onControl(int cmd);
void onData(uint32_t cmd, uint32_t length, uint8_t *data) override;
void onPhone(bool connected);
void onData(uint8_t *data, uint32_t length) override;
static const char *cmdString(int cmd);
void dispatch(std::unique_ptr<Message> msg);
void onControl(int cmd);
void onPhone(bool connected);
Recorder _recorder;
uint16_t _width;
@@ -44,6 +42,8 @@ private:
uint32_t _evtStatusId = (uint32_t)-1;
uint32_t _evtPhoneId = (uint32_t)-1;
std::unique_ptr<Message> _message;
};
#endif /* SRC_PROTOCOL */
+1 -1
View File
@@ -8,7 +8,7 @@ bool Settings::load(const std::string &filename)
std::ifstream file(filename);
if (!file.is_open())
{
std::cerr << "[Settings] Cannot open " << filename << "" << std::endl;
std::cerr << "[Settings] Cannot open file: " << filename << std::endl;
return false;
}
+8 -6
View File
@@ -42,14 +42,16 @@ public:
static inline Setting<int> forceRedraw{"force-redraw", 0};
static inline Setting<float> aspectCorrection{"aspect-correction", 1};
static inline Setting<std::string> renderDriver{"renderer-driver", ""};
static inline Setting<bool> alternativeRendering{"alternative-rendering", false};
static inline Setting<bool> alternativeRendering{"alternative-rendering", true};
static inline Setting<bool> fastScale{"fast-render-scale", false};
static inline Setting<int> videoQueue{"video-buffer-size", 32};
static inline Setting<int> audioQueue{"audio-buffer-size", 16};
static inline Setting<int> usbQueue{"async-usb-calls", 8};
static inline Setting<int> usbBufferSize{"usb-buffer-size", 131072};
static inline Setting<int> videoQueue{"video-buffer-size", 64};
static inline Setting<int> audioQueue{"audio-buffer-size", 64};
static inline Setting<int> audioDelay{"audio-buffer-wait", 2};
static inline Setting<int> audioDelayCall{"audio-buffer-wait-call", 8};
static inline Setting<int> audioDelayCall{"audio-buffer-wait-call", 6};
static inline Setting<float> audioFade{"audio-fade", 0.3};
static inline Setting<int> audioBuffer{"audio-buffer-samples", 2048};
static inline Setting<int> audioBuffer{"audio-buffer-samples", 512};
static inline Setting<std::string> audioDriver{"audio-driver", ""};
static inline Setting<std::string> onConnect{"on-connect-script", ""};
@@ -83,7 +85,7 @@ public:
// Debug section
static inline Setting<int> protocolDebug{"protocol-debug", 0};
static inline Setting<bool> codecLowDelay{"decode-low-delay", true};
static inline Setting<bool> codecFast{"decode-fast", false};
static inline Setting<bool> codecFast{"decode-fast", true};
static bool load(const std::string &filename);
static void print();
+81 -5
View File
@@ -1,17 +1,30 @@
#ifndef SRC_STRUCT_MESSAGE
#define SRC_STRUCT_MESSAGE
#include "libavcodec/defs.h"
#include "helper/protocol_const.h"
#include <cstdint>
#include <cstring>
#include <memory>
#include <cstdlib>
#define OFFSET_AUDIO_FORMAT 0
#include "aes_cipher.h"
#pragma pack(push, 1)
struct Header
{
uint32_t magic;
int32_t length;
uint32_t type;
uint32_t typecheck;
};
#pragma pack(pop)
class Message
{
public:
Message(uint8_t *data, uint32_t data_length, uint32_t offset) : _data(data), _length(data_length), _offset(offset)
Message() : _header({0, 0, 0, 0}), _data(nullptr), _offset(0), _headerLegth(0), _dataLength(0)
{
}
@@ -24,21 +37,84 @@ public:
}
}
uint32_t parse(uint8_t *data, uint32_t data_length)
{
uint32_t result = 0;
if (!hasHeader())
{
uint8_t copy = sizeof(Header) - _headerLegth;
if (copy > data_length)
copy = data_length;
memcpy(reinterpret_cast<uint8_t *>(&_header) + _headerLegth, data, copy);
_headerLegth += copy;
result += copy;
}
if (!hasHeader() || result >= data_length || _header.length <= 0)
return result;
if (_data == nullptr)
{
uint32_t padding = _header.type == CMD_VIDEO_DATA ? AV_INPUT_BUFFER_PADDING_SIZE : 0;
_data = (uint8_t *)malloc(_header.length + padding);
if (padding > 0 && _data)
std::fill(_data + _header.length, _data + _header.length + padding, 0);
}
uint32_t copy = _header.length - _dataLength;
if (copy > data_length - result)
copy = data_length - result;
if (_data)
memcpy(_data + _dataLength, data + result, copy);
_dataLength += copy;
result += copy;
return result;
}
int getInt(uint32_t offset) const
{
int result = 0;
if (_length - sizeof(int) >= offset)
if (_data && _dataLength - sizeof(int) >= offset)
memcpy(&result, _data + offset, sizeof(int));
return result;
}
bool ready() const { return _headerLegth == sizeof(Header) && (_header.length <= 0 || _dataLength >= _header.length); }
bool valid() const { return _header.length <= 0 || _data; }
bool setOffset(uint32_t offset)
{
if (offset >= _dataLength)
return false;
_offset = offset;
return true;
}
bool encrypted() const { return _header.magic == MAGIC_ENC; }
bool decrypt(AESCipher *cipher)
{
if (!cipher)
return false;
return cipher->Decrypt(_data, _dataLength);
}
uint8_t *data() const { return _data + _offset; }
uint32_t length() const { return _length - _offset; }
uint32_t length() const { return _dataLength - _offset; }
uint32_t type() const { return _header.type; }
private:
bool hasHeader() const { return _headerLegth == sizeof(Header); }
Header _header;
uint8_t *_data;
uint32_t _length;
uint32_t _offset;
u_int8_t _headerLegth;
uint32_t _dataLength;
};
#endif /* SRC_STRUCT_MESSAGE */