From f34396cf8e2e5b62d76633200ee46c77d6ead546 Mon Sep 17 00:00:00 2001 From: Niellune Date: Wed, 25 Mar 2026 15:55:50 +0200 Subject: [PATCH] Simplify USB reader, back to syncronous reading. --- src/decoder.cpp | 8 + src/pcm_audio.cpp | 8 + src/protocol/aes_cipher.cpp | 28 +-- src/protocol/aes_cipher.h | 15 +- src/protocol/connection.cpp | 356 ++++++++++++++++++++--------- src/protocol/connection.h | 42 ++-- src/protocol/connection_reader.cpp | 217 ------------------ src/protocol/connection_reader.h | 61 ----- src/protocol/imessage_sender.h | 15 -- src/protocol/message.h | 68 +++--- src/recorder.cpp | 2 +- src/recorder.h | 2 +- src/struct/usb_buffer.cpp | 153 ------------- src/struct/usb_buffer.h | 58 ----- 14 files changed, 351 insertions(+), 682 deletions(-) delete mode 100644 src/protocol/connection_reader.cpp delete mode 100644 src/protocol/connection_reader.h delete mode 100644 src/protocol/imessage_sender.h delete mode 100644 src/struct/usb_buffer.cpp delete mode 100644 src/struct/usb_buffer.h diff --git a/src/decoder.cpp b/src/decoder.cpp index 4c2b5a6..79ec6d2 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -172,6 +172,14 @@ void Decoder::loop(AVCodecContext *context, AVCodecParserContext *parser, AVPack { // Get raw data segment from queue std::unique_ptr segment = _data->pop(); + + char error[256]; + if (!segment->decrypt(error)) + { + log_w("Can't decrypt video segment > %s", error); + continue; + } + uint8_t *data_ptr = segment->data(); int data_size = segment->length(); diff --git a/src/pcm_audio.cpp b/src/pcm_audio.cpp index 2d65cec..c701270 100644 --- a/src/pcm_audio.cpp +++ b/src/pcm_audio.cpp @@ -153,6 +153,14 @@ void PcmAudio::play(SDL_AudioDeviceID device, ChannelConfig config, int32_t segm std::unique_ptr segment = _data->pop(); if (!segment) return; + + char error[256]; + if (!segment->decrypt(error)) + { + log_w("Can't decrypt audio segment > %s", error); + continue; + } + if (config != getConfig(segment.get())) return; diff --git a/src/protocol/aes_cipher.cpp b/src/protocol/aes_cipher.cpp index 50cfb92..08b7905 100644 --- a/src/protocol/aes_cipher.cpp +++ b/src/protocol/aes_cipher.cpp @@ -31,52 +31,52 @@ AESCipher::AESCipher(const std::string &baseKey) _initVec[12] = static_cast(_seed >> 24); } -Status AESCipher::Encrypt(uint8_t *data, uint32_t length) const +bool AESCipher::encrypt(uint8_t *data, uint32_t length, char *err) const { if (!data || length == 0) - return Status::Error("Empty data"); + return error(err, "Empty data"); auto ctx = std::unique_ptr(EVP_CIPHER_CTX_new(), EVP_CIPHER_CTX_free); if (!ctx) - return Status::Error("Failed to create cipher context"); + return error(err, "Failed to create cipher context"); if (EVP_EncryptInit_ex(ctx.get(), EVP_aes_128_cfb(), nullptr, _encKey.data(), _initVec.data()) != 1) - return Status::Error("Encryption initialization failed"); + return error(err, "Encryption initialization failed"); std::unique_ptr temp(new uint8_t[length + AES_BLOCK_SIZE]); int out_len = 0; if (EVP_EncryptUpdate(ctx.get(), temp.get(), &out_len, data, length) != 1) - return Status::Error("Encryption failed during update"); + return error(err, "Encryption failed during update"); int final_len = 0; if (EVP_EncryptFinal_ex(ctx.get(), temp.get() + out_len, &final_len) != 1) - return Status::Error("Encryption failed during final"); + return error(err, "Encryption failed during final"); std::copy_n(temp.get(), length, data); - return Status::Success(); + return true; } -Status AESCipher::Decrypt(uint8_t *data, uint32_t length) const +bool AESCipher::decrypt(uint8_t *data, uint32_t length, char *err) const { if (!data || length == 0) - return Status::Error("Empty data"); + return error(err, "Empty data"); auto ctx = std::unique_ptr(EVP_CIPHER_CTX_new(), EVP_CIPHER_CTX_free); if (!ctx) - return Status::Error("Failed to create cipher context"); + return error(err, "Failed to create cipher context"); if (EVP_DecryptInit_ex(ctx.get(), EVP_aes_128_cfb(), nullptr, _encKey.data(), _initVec.data()) != 1) - return Status::Error(" Decryption initialization failed"); + return error(err, "Decryption initialization failed"); std::unique_ptr temp(new uint8_t[length + AES_BLOCK_SIZE]); int out_len = 0; if (EVP_DecryptUpdate(ctx.get(), temp.get(), &out_len, data, length) != 1) - return Status::Error("Decryption failed during update"); + return error(err, "Decryption failed during update"); int final_len = 0; if (EVP_DecryptFinal_ex(ctx.get(), temp.get() + out_len, &final_len) != 1) - return Status::Error("Decryption failed during final"); + return error(err, "Decryption failed during final"); std::copy_n(temp.get(), length, data); - return Status::Success(); + return true; } diff --git a/src/protocol/aes_cipher.h b/src/protocol/aes_cipher.h index 9bc6218..4204194 100644 --- a/src/protocol/aes_cipher.h +++ b/src/protocol/aes_cipher.h @@ -16,11 +16,18 @@ public: AESCipher(const std::string &base_key); ~AESCipher() = default; - Status Encrypt(uint8_t *data, uint32_t length) const; - Status Decrypt(uint8_t *data, uint32_t length) const; + bool encrypt(uint8_t *data, uint32_t length, char *err) const; + bool decrypt(uint8_t *data, uint32_t length, char *err) const; - uint32_t Seed() const { return _seed; } - const std::string& Key() const { return _baseKey; } + uint32_t seed() const { return _seed; } + const std::string& key() const { return _baseKey; } + + static bool error(char* error, const char* message) + { + if(error) + strcpy(error, message); + return false; + } private: std::string _baseKey; diff --git a/src/protocol/connection.cpp b/src/protocol/connection.cpp index 3e06fc4..d5be0e2 100644 --- a/src/protocol/connection.cpp +++ b/src/protocol/connection.cpp @@ -4,6 +4,8 @@ #include #include +#include "libavcodec/defs.h" + #include "protocol/message.h" #include "common/logger.h" #include "protocol/protocol_const.h" @@ -12,19 +14,23 @@ Connection::Connection() : writeQueue(WRITE_QUEUE_SIZE), - videoStream(Settings::videoQueue), - audioStreamMain(Settings::audioQueue), - audioStreamAux(Settings::audioQueue), - _recorder(Settings::audioQueue), + videoStream(VIDEO_QUEUE_SIZE), + audioStreamMain(AUDIO_QUEUE_SIZE), + audioStreamAux(AUDIO_QUEUE_SIZE), + _processQueue(PROCESS_QUEUE_SIZE), + _statusHandler(nullptr), _cipher(nullptr), _context(nullptr), + _handler(nullptr), + _device(nullptr), + _endpointIn(0), _active(false), _connected(false), + _phoneConnected(false), _ecnrypt(false), _state(PROTOCOL_STATUS_INITIALISING), _failCount(0), - _nodeviceCount(0), - _statusHandler(nullptr) + _nodeviceCount(0) { int result = libusb_init(&_context); if (result < 0) @@ -76,7 +82,7 @@ void Connection::start(atomic *statusHandler) log_v("Starting"); _active = true; - _thread = std::thread(&Connection::mainLoop, this); + _writeThread = std::thread(&Connection::mainLoop, this); } void Connection::stop() @@ -87,11 +93,18 @@ void Connection::stop() log_v("Stopping"); _active = false; + _connected = false; writeQueue.notify(); state(PROTOCOL_STATUS_INITIALISING); - if (_thread.joinable()) - _thread.join(); + if (_processThread.joinable()) + _processThread.join(); + + if (_readThread.joinable()) + _readThread.join(); + + if (_writeThread.joinable()) + _writeThread.join(); _statusHandler = nullptr; } @@ -109,8 +122,8 @@ void Connection::mainLoop() libusb_device_handle *handler = libusb_open_device_with_vid_pid(_context, Settings::vendorid, Settings::productid); if (handler) { - uint8_t epIn = 0; - uint8_t epOut = 0; + uint8_t endpointIn = 0; + uint8_t endpointOut = 0; int retry = 0; libusb_device *device = nullptr; _ecnrypt = false; @@ -118,23 +131,15 @@ void Connection::mainLoop() while (!device && retry++ < LINK_RETRY) { - device = link(handler, &epIn, &epOut); + device = link(handler, &endpointIn, &endpointOut); writeQueue.waitFor(_active, LINK_RETRY_TIMEOUT); } if (device) { - _connected = true; - state(PROTOCOL_STATUS_ONLINE); - log_i("Device connected %d:%d speed: %d", libusb_get_bus_number(device), libusb_get_device_address(device), libusb_get_device_speed(device)); - _reader.start(_context, handler, epIn, this); - onDeviceConnect(); - - writeLoop(handler, epOut); - - _connected = false; + onDeviceConnect(handler, device, endpointIn); + writeLoop(handler, endpointOut); onDeviceDisconnect(); - _reader.stop(); } libusb_release_interface(handler, 0); @@ -145,9 +150,225 @@ void Connection::mainLoop() } } +void Connection::onDeviceConnect(libusb_device_handle *handler, libusb_device *device, uint8_t endpointIn) +{ + _connected = true; + _phoneConnected = false; + _handler = handler; + _device = device; + _endpointIn = endpointIn; + state(PROTOCOL_STATUS_ONLINE); + log_i("Device connected %d:%d speed: %d", libusb_get_bus_number(device), libusb_get_device_address(device), libusb_get_device_speed(device)); + writeQueue.clear(); + videoStream.clear(); + audioStreamMain.clear(); + audioStreamAux.clear(); + _processQueue.clear(); + _processThread = std::thread(&Connection::processLoop, this); + _readThread = std::thread(&Connection::readLoop, this); + sendInit(); +} + +void Connection::onDeviceDisconnect() +{ + log_i("Device disconnected"); + _connected = false; + _processQueue.notify(); + onPhoneDisconnect(); +} + +void Connection::onPhoneConnect() +{ + if (_phoneConnected) + return; + log_i("Phone connected"); + _phoneConnected = true; + + if (Settings::onConnect.value.length() > 1) + execute(Settings::onConnect.value.c_str()); +} + +void Connection::onPhoneDisconnect() +{ + if (!_phoneConnected) + return; + log_i("Phone disconnected"); + _phoneConnected = false; + + _recorder.stop(); + if (Settings::onDisconnect.value.length() > 1) + execute(Settings::onDisconnect.value.c_str()); +} + +void Connection::readLoop() +{ + setThreadName("usb-read"); + setThreadPriority(ThreadPriority::Realtime); + + log_d("USB reading thread started"); + + while (_connected) + { + int transferred = 0; + std::unique_ptr message = std::make_unique(_cipher); + + int result = libusb_bulk_transfer(_handler, _endpointIn, message->header(), message->headerSize(), &transferred, READ_TIMEOUT); + + if (result == LIBUSB_ERROR_NO_DEVICE) + { + _connected = false; + break; + } + + if (result != LIBUSB_SUCCESS || transferred != message->headerSize()) + { + if (result != LIBUSB_ERROR_TIMEOUT) + log_w("Header read failed > transfered %d / %d status %d", transferred, message->headerSize(), result); + continue; + } + + if (message->invalidMagic()) + { + log_w("Header read failed > invalid magic"); + continue; + } + + if (message->invalidChecksum()) + { + log_w("Header read failed > invalid checksum"); + continue; + } + + if (message->invalidLength()) + { + log_w("Header read failed > invalid length"); + continue; + } + + if (message->length() > 0) + { + uint32_t padding = message->type() == CMD_VIDEO_DATA ? AV_INPUT_BUFFER_PADDING_SIZE : 0; + uint8_t *buff = message->allocate(padding); + if (!buff) + { + log_w("Message read failed > can't allocate memory %d", message->length() + padding); + continue; + } + result = libusb_bulk_transfer(_handler, _endpointIn, buff, message->length(), &transferred, READ_TIMEOUT); + + if (result == LIBUSB_ERROR_NO_DEVICE) + { + _connected = false; + break; + } + + if (result != LIBUSB_SUCCESS || transferred != message->length()) + { + log_w("Message data read failed > transfered %d / %d status %d", transferred, message->length(), result); + continue; + } + } + + if (message->type() == CMD_VIDEO_DATA && message->setOffset(20)) + { + if (!videoStream.pushDiscard(std::move(message))) + log_w("Discard message > video queue is full"); + continue; + } + + if (message->type() == CMD_AUDIO_DATA && message->length() > 16) + { + int channel = message->getInt(8); + message->setOffset(12); + if (channel == 1) + { + if (!audioStreamMain.pushDiscard(std::move(message))) + log_w("Discard message > main audio queue is full"); + continue; + } + if (channel == 2) + { + if (!audioStreamAux.pushDiscard(std::move(message))) + log_w("Discard message > aux audio queue is full"); + continue; + } + } + + if (!_processQueue.pushDiscard(std::move(message))) + log_w("Discard message > process queue is full"); + } + + log_v("USB reading thread stopped"); +} + +void Connection::processLoop() +{ + setThreadName("usb-process"); + log_d("USB processing thread started"); + + while (_connected) + { + if (!_processQueue.wait(_connected)) + continue; + ; + + std::unique_ptr message = _processQueue.pop(); + if (!message) + continue; + + char error[256]; + if (!message->decrypt(error)) + { + log_w("Can't decrypt message %d > %s", message->type(), error); + continue; + } + + switch (message->type()) + { + + case CMD_CONTROL: + if (message->length() == 4) + { + switch (message->getInt(0)) + { + case 1: + _recorder.start(&writeQueue); + break; + + case 2: + _recorder.stop(); + break; + } + } + break; + + case CMD_PLUGGED: + { + state(PROTOCOL_STATUS_CONNECTED); + onPhoneConnect(); + break; + } + + case CMD_UNPLUGGED: + { + state(PROTOCOL_STATUS_ONLINE); + onPhoneDisconnect(); + break; + } + + case CMD_ENCRYPTION: + if (message->length() == 0) + setEncryption(true); + break; + } + } + + log_v("USB processing thread stopped"); +} + void Connection::writeLoop(libusb_device_handle *handler, uint8_t ep) { - while (_active && _reader.active()) + while (_connected) { std::unique_ptr message = writeQueue.pop(); if (!message) @@ -160,7 +381,7 @@ void Connection::writeLoop(libusb_device_handle *handler, uint8_t ep) if (!message) message = Message::HeartBeat(); - if (!_active || !_reader.active()) + if (!_connected) break; if (!message->allocated()) @@ -173,10 +394,10 @@ void Connection::writeLoop(libusb_device_handle *handler, uint8_t ep) if (_ecnrypt) { - Status s = message->encrypt(_cipher); - if (s.failed()) + char error[256]; + if (!message->encrypt(_cipher, error)) { - log_w("Message encryption failed > %s", s.error()); + log_w("Message encryption failed > %s", error); continue; } } @@ -286,7 +507,7 @@ bool Connection::state(u_int8_t state) return false; } -void Connection::onDeviceConnect() +void Connection::sendInit() { int syncTime = std::time(nullptr); int drivePosition = Settings::leftDrive ? 0 : 1; // 0==left, 1==right @@ -329,7 +550,7 @@ void Connection::onDeviceConnect() if (Settings::encryption) { if (_cipher) - send(Message::Encryption(_cipher->Seed())); + send(Message::Encryption(_cipher->seed())); else log_w("Can't request encryption > Cypher is not initalised"); } @@ -358,80 +579,3 @@ void Connection::onDeviceConnect() if (Settings::autoconnect) send(Message::Control(1002)); } - -void Connection::onDeviceDisconnect() -{ - _recorder.stop(); - if (Settings::onDisconnect.value.length() > 1) - execute(Settings::onDisconnect.value.c_str()); -} - -void Connection::onMessage(std::unique_ptr message) -{ - Status s = message->decrypt(_cipher); - if (s.failed()) - { - log_w("Can't decrypt message %d > %s", message->type(), s.error()); - return; - } - - switch (message->type()) - { - - case CMD_CONTROL: - if (message->length() == 4) - { - switch (message->getInt(0)) - { - case 1: - _recorder.start(&writeQueue); - break; - - case 2: - _recorder.stop(); - break; - } - } - break; - - case CMD_PLUGGED: - { - state(PROTOCOL_STATUS_CONNECTED); - if (Settings::onConnect.value.length() > 1) - execute(Settings::onConnect.value.c_str()); - break; - } - - case CMD_UNPLUGGED: - { - state(PROTOCOL_STATUS_ONLINE); - _recorder.stop(); - if (Settings::onDisconnect.value.length() > 1) - execute(Settings::onDisconnect.value.c_str()); - break; - } - - case CMD_VIDEO_DATA: - { - if (message->setOffset(20)) - videoStream.pushDiscard(std::move(message)); - break; - } - case CMD_AUDIO_DATA: - { - if (message->length() <= 16) - break; - int channel = message->getInt(8); - message->setOffset(12); - if (channel == 1) - audioStreamMain.pushDiscard(std::move(message)); - if (channel == 2) - audioStreamAux.pushDiscard(std::move(message)); - break; - } - case CMD_ENCRYPTION: - if (message->length() == 0) - setEncryption(true); - break; - } -} diff --git a/src/protocol/connection.h b/src/protocol/connection.h index 6eaa4f1..c72894f 100644 --- a/src/protocol/connection.h +++ b/src/protocol/connection.h @@ -1,5 +1,5 @@ -#ifndef SRC_CONNECTOR -#define SRC_CONNECTOR +#ifndef SRC_PROTOCOL_CONNECTION +#define SRC_PROTOCOL_CONNECTION #include @@ -8,22 +8,25 @@ #include #include -#include "protocol/imessage_sender.h" #include "struct/atomic_queue.h" -#include "struct/usb_buffer.h" #include "protocol/aes_cipher.h" -#include "protocol/connection_reader.h" #include "recorder.h" #define LINK_RETRY 5 #define LINK_RETRY_TIMEOUT 100 #define RECONNECT_TIMEOUT 100 #define PROTOCOL_HEARTBEAT_DELAY 3000 +#define READ_TIMEOUT 1000 + +#define WRITE_QUEUE_SIZE 128 +#define VIDEO_QUEUE_SIZE 128 +#define AUDIO_QUEUE_SIZE 128 +#define PROCESS_QUEUE_SIZE 128 + -#define WRITE_QUEUE_SIZE 256 #define ENCRYPTION_BASE "SkBRDy3gmrw1ieH0" -class Connection : public IMessageReceiver +class Connection { public: @@ -40,33 +43,42 @@ public: AtomicQueue audioStreamMain; AtomicQueue audioStreamAux; - virtual void onMessage(std::unique_ptr message) override; - private: void mainLoop(); + void readLoop(); + void processLoop(); void writeLoop(libusb_device_handle *handler, uint8_t ep); libusb_device *link(libusb_device_handle *handler, uint8_t *epIn, uint8_t *epOut); void setEncryption(bool enabled); bool fail(int status, const char *msg); bool state(u_int8_t state); - void onDeviceConnect(); + void sendInit(); + void onDeviceConnect(libusb_device_handle *handler, libusb_device *device, uint8_t endpointIn); void onDeviceDisconnect(); + void onPhoneConnect(); + void onPhoneDisconnect(); + + std::thread _writeThread; + std::thread _readThread; + std::thread _processThread; Recorder _recorder; + AtomicQueue _processQueue; + atomic *_statusHandler; AESCipher *_cipher; - ConnectionReader _reader; libusb_context *_context; - std::thread _thread; + libusb_device_handle *_handler; + libusb_device *_device; + uint8_t _endpointIn; std::atomic _active; std::atomic _connected; + std::atomic _phoneConnected; std::atomic _ecnrypt; uint8_t _state; uint8_t _failCount; uint8_t _nodeviceCount; - - atomic *_statusHandler; }; -#endif /* SRC_CONNECTOR */ +#endif /* SRC_PROTOCOL_CONNECTION */ diff --git a/src/protocol/connection_reader.cpp b/src/protocol/connection_reader.cpp deleted file mode 100644 index 1e33557..0000000 --- a/src/protocol/connection_reader.cpp +++ /dev/null @@ -1,217 +0,0 @@ -#include "protocol/connection_reader.h" - -#include -#include -#include -#include -#include -#include - -#include "settings.h" -#include "common/logger.h" -#include "common/threading.h" -#include "protocol/protocol_const.h" -#include "libavcodec/defs.h" -#include "common/functions.h" - -ConnectionReader::ConnectionReader() - : _active(false), - _buffer(Settings::usbBuffer, Settings::usbTransferSize), - _transfers(Settings::usbQueue), - _receiver(nullptr), - _usbContext(nullptr) -{ - log_v("Created"); -} - -ConnectionReader::~ConnectionReader() -{ - stop(); - - for (Context &context : _transfers) - { - if (context.transfer) - { - libusb_free_transfer(context.transfer); - context.transfer = nullptr; - } - } - log_v("Destroyed"); -} - -bool ConnectionReader::start(libusb_context *context, libusb_device_handle *device, uint8_t endpoint, IMessageReceiver *receiver) -{ - if (_active || !context || !device) - return false; - - _receiver = receiver; - _usbContext = context; - - log_i("Starting to read endpoint %d with %d requests", endpoint, _transfers.size()); - - // Prepare usb transfers - for (Context &context : _transfers) - { - context.owner = this; - if (!context.transfer) - { - context.transfer = libusb_alloc_transfer(0); - if (context.transfer == nullptr) - { - log_e("Can't allocate usb transfer"); - return false; - } - } - } - - // Start processing thread - _buffer.reset(); - _active = true; - _processThread = std::thread(&ConnectionReader::processLoop, this); - _readThread = std::thread(&ConnectionReader::readLoop, this); - - // Start usb reading thread - for (Context &context : _transfers) - { - context.slot = _buffer.get(); - if (context.slot == nullptr) - { - log_e("Can't allocate data slot for usb transfer"); - return false; - } - context.owner = this; - libusb_fill_bulk_transfer(context.transfer, device, endpoint, context.slot->data, context.slot->size, ConnectionReader::onUsbRead, &context, 0); - int status = libusb_submit_transfer(context.transfer); - if (status != LIBUSB_SUCCESS) - { - log_w("USB transfer submit failed with code %d", status); - return false; - } - } - - return true; -} - -void ConnectionReader::stop() -{ - log_v("Stopping"); - - _active = false; - - if (_usbContext) - { - for (Context &context : _transfers) - { - if (context.transfer) - libusb_cancel_transfer(context.transfer); - } - - timeval timeout{0, 100000}; - libusb_handle_events_timeout_completed(_usbContext, &timeout, nullptr); - - log_v("Events canceled"); - } - - _buffer.notify(); - - if (_readThread.joinable()) - _readThread.join(); - - if (_processThread.joinable()) - _processThread.join(); - - _usbContext = nullptr; - - log_v("Threads stopped"); -} - -void ConnectionReader::onUsbRead(libusb_transfer *transfer) -{ - if (!transfer || !transfer->user_data) - return; - - Context *c = static_cast(transfer->user_data); - if (!c->owner->_active) - return; - - log_p("Read %d [%d]: %s", transfer->actual_length, transfer->status, bytes(transfer->buffer, transfer->actual_length, 40).c_str()); - - if (transfer->status == LIBUSB_TRANSFER_CANCELLED) - return; - - if (transfer->status != LIBUSB_TRANSFER_COMPLETED) - { - log_w("USB read failed with status %d", transfer->status); - c->owner->_active = false; - return; - } - - c->slot->commit(transfer->actual_length); - - c->slot = c->owner->_buffer.get(); - if (!c->slot) - { - log_e("Can't allocate data slot for next usb transfer"); - c->owner->_active = false; - return; - } - c->transfer->buffer = c->slot->data; - - if (!c->owner->_active) - return; - - int status = libusb_submit_transfer(c->transfer); - if (status != LIBUSB_SUCCESS) - { - log_w("USB transfer re-submit failed with status %d", status); - c->owner->_active = false; - } -} - -void ConnectionReader::readLoop() -{ - setThreadName("usb-read"); - setThreadPriority(ThreadPriority::Realtime); - timeval timeout{0, 100000}; - - log_d("USB reading thread started"); - - while (_active) - { - libusb_handle_events_timeout_completed(_usbContext, &timeout, nullptr); - } - - log_v("USB reading thread stopped"); -} - -void ConnectionReader::processLoop() -{ - setThreadName("usb-process"); - log_d("USB processing thread started"); - - while (_active) - { - std::unique_ptr message = std::make_unique(); - - if (!_buffer.read(message->header(), message->headerSize(), _active)) - break; - - if (!message->valid()) - { - log_w("Mallformed message %s", message->toString(20).c_str()); - continue; - } - - if (message->length() >= 0) - { - uint8_t *buff = message->allocate(message->type() == CMD_VIDEO_DATA ? AV_INPUT_BUFFER_PADDING_SIZE : 0); - if (!_buffer.read(buff, message->length(), _active)) - break; - } - - if (_receiver && message->allocated()) - _receiver->onMessage(std::move(message)); - } - - log_v("USB processing thread stopped"); -} diff --git a/src/protocol/connection_reader.h b/src/protocol/connection_reader.h deleted file mode 100644 index 8258ac4..0000000 --- a/src/protocol/connection_reader.h +++ /dev/null @@ -1,61 +0,0 @@ -#ifndef SRC_PROTOCOL_CONNECTION_READER -#define SRC_PROTOCOL_CONNECTION_READER - -#include - -#include -#include -#include -#include -#include - -#include "struct/usb_buffer.h" -#include "protocol/aes_cipher.h" -#include "protocol/message.h" - -class IMessageReceiver -{ -public: - virtual ~IMessageReceiver() = default; - virtual void onMessage(std::unique_ptr message) = 0; -}; - -class ConnectionReader -{ -public: - ConnectionReader(); - ~ConnectionReader(); - - ConnectionReader(const ConnectionReader &) = delete; - ConnectionReader &operator=(const ConnectionReader &) = delete; - - bool start(libusb_context *context, libusb_device_handle *device, uint8_t endpoint, IMessageReceiver *receiver); - void stop(); - - int bufferCount() const { return _buffer.count(); } - bool active() const { return _active; } - -private: - struct Context - { - ConnectionReader *owner = nullptr; - DataSlot *slot = nullptr; - libusb_transfer *transfer = nullptr; - }; - - static void onUsbRead(libusb_transfer *transfer); - void readLoop(); - void processLoop(); - - void cancelTransfers(); - - std::atomic _active; - UsbBuffer _buffer; - std::vector _transfers; - IMessageReceiver *_receiver; - std::thread _readThread; - std::thread _processThread; - libusb_context *_usbContext; -}; - -#endif /* SRC_PROTOCOL_CONNECTION_READER */ diff --git a/src/protocol/imessage_sender.h b/src/protocol/imessage_sender.h deleted file mode 100644 index 617e882..0000000 --- a/src/protocol/imessage_sender.h +++ /dev/null @@ -1,15 +0,0 @@ -#ifndef SRC_HELPER_ISENDER -#define SRC_HELPER_ISENDER - -#include - -#include "protocol/message.h" - -class IMessageSender -{ -public: - virtual ~IMessageSender() = default; - virtual bool send(std::unique_ptr packet) = 0; -}; - -#endif /* SRC_HELPER_ISENDER */ diff --git a/src/protocol/message.h b/src/protocol/message.h index 50de96b..4d9588a 100644 --- a/src/protocol/message.h +++ b/src/protocol/message.h @@ -26,8 +26,8 @@ struct Header class Message { public: - Message() - : _header({0, 0, 0, 0}), _data(nullptr), _offset(0), _size(0), _encrypt(false) + Message(AESCipher *cipher) + : _header({0, 0, 0, 0}), _data(nullptr), _offset(0), _size(0), _cipher(cipher), _encrypt(false) { } @@ -99,51 +99,40 @@ public: return true; } - bool valid() const + bool encrypt(AESCipher *cipher, char *err = nullptr) { - if (_header.magic != MAGIC_ENC && _header.magic != MAGIC) - return false; - if (_header.typecheck != ~_header.type) - return false; - if (_header.length < 0 || _header.length > MESSAGE_MAX_PAYLOAD_SIZE) + if (!_encrypt || _header.magic == MAGIC_ENC) + return true; + + if (!cipher) + return AESCipher::error(err, "Cipher is not initialised"); + + if (!allocated()) + return AESCipher::error(err, "Message data is not allocated"); + + if (!cipher->encrypt(_data, _header.length, err)) return false; + + _header.magic = MAGIC_ENC; return true; } - Status encrypt(AESCipher *cipher) - { - if (!_encrypt) - return Status::Success(); - - if (_header.magic == MAGIC_ENC) - return Status::Success(); - - if (!cipher) - return Status::Error("Cipher is not initialised"); - - if (!allocated()) - return Status::Error("Message data is not allocated"); - - Status result = cipher->Encrypt(_data, _header.length); - - if (result.succeed()) - _header.magic = MAGIC_ENC; - - return result; - } - - Status decrypt(AESCipher *cipher) + bool decrypt(char *err = nullptr) { if (_header.magic != MAGIC_ENC) - return Status::Success(); + return true; - if (!cipher) - return Status::Error("Cipher is not initialised"); + if (!_cipher) + return AESCipher::error(err, "Cipher is not initialised"); if (!allocated()) - return Status::Error("Message data is not allocated"); + return AESCipher::error(err, "Message data is not allocated"); - return cipher->Decrypt(_data, _header.length); + if (!_cipher->decrypt(_data, _header.length, err)) + return false; + + _header.magic = MAGIC; + return true; } bool isMotion() const @@ -299,7 +288,11 @@ public: int32_t length() const { return _header.length - _offset; } uint8_t *data() const { return _data ? _data + _offset : nullptr; } - const std::string toString(int count) const + bool invalidMagic() const { return _header.magic != MAGIC_ENC && _header.magic != MAGIC; } + bool invalidChecksum() const { return _header.typecheck != ~_header.type; } + bool invalidLength() const { return _header.length < 0 || _header.length > MESSAGE_MAX_PAYLOAD_SIZE; } + + const std::string toString(int count) const { const char *cmds = "Unknown"; for (size_t i = 0; i < sizeof(protocolCmdList) / sizeof(protocolCmdList[0]); ++i) @@ -369,6 +362,7 @@ private: uint8_t *_data; uint32_t _offset; uint32_t _size; + AESCipher *_cipher; bool _encrypt; }; diff --git a/src/recorder.cpp b/src/recorder.cpp index e48fea7..1bebe48 100644 --- a/src/recorder.cpp +++ b/src/recorder.cpp @@ -9,7 +9,7 @@ #include "protocol/message.h" -Recorder::Recorder(uint16_t buffSize) +Recorder::Recorder() : _queue(nullptr), _active(false), _device(0) { } diff --git a/src/recorder.h b/src/recorder.h index 3869c4c..fb86b87 100644 --- a/src/recorder.h +++ b/src/recorder.h @@ -12,7 +12,7 @@ class Recorder { public: - Recorder(uint16_t buffSize); + Recorder(); ~Recorder(); void start(AtomicQueue *queue); diff --git a/src/struct/usb_buffer.cpp b/src/struct/usb_buffer.cpp deleted file mode 100644 index 7602ed2..0000000 --- a/src/struct/usb_buffer.cpp +++ /dev/null @@ -1,153 +0,0 @@ -#include "struct/usb_buffer.h" - -#include -#include -#include - -DataSlot::DataSlot() - : ready(false), offset(0), length(0), size(0), data(nullptr), _cv(nullptr) -{ -} - -DataSlot::~DataSlot() -{ - size = 0; - if (data) - { - free(data); - data = nullptr; - } -} - -void DataSlot::init(uint32_t slotSize, std::condition_variable *condition) -{ - ready.store(false); - offset = 0; - length = 0; - size = slotSize; - data = static_cast(malloc(size)); - _cv = condition; -} - -void DataSlot::reset() -{ - ready.store(false); - offset = 0; - length = 0; -} - -void DataSlot::commit(size_t dataSize) -{ - length = dataSize; - offset = 0; - ready.store(true); - - if (_cv) - _cv->notify_one(); -} - -bool DataSlot::consume(size_t dataSize) -{ - offset += dataSize; - if (offset < length) - return false; - ready.store(false); - return true; -} - -size_t DataSlot::remain() const -{ - return length > offset ? length - offset : 0; -} - -UsbBuffer::UsbBuffer(uint16_t slotCount, uint32_t slotSize) - : _slots(nullptr), _size(slotCount), _writeSlot(0), _readSlot(0) -{ - if (slotCount == 0 || slotSize == 0) - throw std::invalid_argument("Number of slots and slot size must be greater than 0"); - - _slots = new DataSlot[_size]; - - for (uint16_t i = 0; i < _size; i++) - { - _slots[i].init(slotSize, &_cv); - } -} - -UsbBuffer::~UsbBuffer() -{ - _cv.notify_all(); - if (_slots) - { - delete[] _slots; - } -} - -DataSlot *UsbBuffer::get() -{ - if (_slots[_writeSlot].ready.load()) - return nullptr; - DataSlot *slot = &(_slots[_writeSlot]); - _writeSlot++; - if (_writeSlot >= _size) - _writeSlot = 0; - return slot; -} - -bool UsbBuffer::read(uint8_t *dst, uint32_t length, std::atomic &active) -{ - if (length == 0) - return true; - - size_t done = 0; - while (length > 0) - { - while (!_slots[_readSlot].ready.load()) - { - std::unique_lock lock(_mutex); - _cv.wait(lock, [&]() - { return !active.load() || _slots[_readSlot].ready.load(); }); - if (!active.load()) - return false; - } - - size_t copy = _slots[_readSlot].remain(); - if (copy > length) - copy = length; - if (dst != nullptr) - std::memcpy(dst + done, _slots[_readSlot].data + _slots[_readSlot].offset, copy); - if (_slots[_readSlot].consume(copy)) - { - _readSlot++; - if (_readSlot >= _size) - _readSlot = 0; - } - done += copy; - length -= copy; - } - - return active.load(); -} - -void UsbBuffer::reset() -{ - _readSlot = 0; - _writeSlot = 0; - for (uint16_t i = 0; i < _size; i++) - { - _slots[i].reset(); - } -} - -void UsbBuffer::notify() -{ - _cv.notify_all(); -} - -int UsbBuffer::count() const -{ - int result = _writeSlot - _readSlot; - if (result < 0) - result += _size; - return result; -} diff --git a/src/struct/usb_buffer.h b/src/struct/usb_buffer.h deleted file mode 100644 index b50db93..0000000 --- a/src/struct/usb_buffer.h +++ /dev/null @@ -1,58 +0,0 @@ -#ifndef SRC_STRUCT_USB_BUFFER -#define SRC_STRUCT_USB_BUFFER - -#include -#include -#include -#include -#include - -class DataSlot -{ -public: - DataSlot(); - ~DataSlot(); - - void init(uint32_t slotSize, std::condition_variable *condition); - void reset(); - void commit(size_t dataSize); - bool consume(size_t dataSize); - size_t remain() const; - - std::atomic ready; - size_t offset; - size_t length; - size_t size; - uint8_t *data; - -private: - std::condition_variable *_cv; -}; - -class UsbBuffer -{ -public: - UsbBuffer(uint16_t slotCount, uint32_t slotSize); - ~UsbBuffer(); - - UsbBuffer(const UsbBuffer &) = delete; - UsbBuffer &operator=(const UsbBuffer &) = delete; - - DataSlot *get(); - bool read(uint8_t *dst, uint32_t length, std::atomic &active); - - void reset(); - void notify(); - int count() const; - -private: - std::mutex _mutex; - std::condition_variable _cv; - - DataSlot *_slots; - uint16_t _size; - uint16_t _writeSlot; - uint16_t _readSlot; -}; - -#endif /* SRC_STRUCT_USB_BUFFER */