Simplify USB reader, back to syncronous reading.

This commit is contained in:
Niellune
2026-03-25 15:55:50 +02:00
parent 45886d00db
commit f34396cf8e
14 changed files with 351 additions and 682 deletions
+8
View File
@@ -172,6 +172,14 @@ void Decoder::loop(AVCodecContext *context, AVCodecParserContext *parser, AVPack
{
// Get raw data segment from queue
std::unique_ptr<Message> segment = _data->pop();
char error[256];
if (!segment->decrypt(error))
{
log_w("Can't decrypt video segment > %s", error);
continue;
}
uint8_t *data_ptr = segment->data();
int data_size = segment->length();
+8
View File
@@ -153,6 +153,14 @@ void PcmAudio::play(SDL_AudioDeviceID device, ChannelConfig config, int32_t segm
std::unique_ptr<Message> segment = _data->pop();
if (!segment)
return;
char error[256];
if (!segment->decrypt(error))
{
log_w("Can't decrypt audio segment > %s", error);
continue;
}
if (config != getConfig(segment.get()))
return;
+14 -14
View File
@@ -31,52 +31,52 @@ AESCipher::AESCipher(const std::string &baseKey)
_initVec[12] = static_cast<uint8_t>(_seed >> 24);
}
Status AESCipher::Encrypt(uint8_t *data, uint32_t length) const
bool AESCipher::encrypt(uint8_t *data, uint32_t length, char *err) const
{
if (!data || length == 0)
return Status::Error("Empty data");
return error(err, "Empty data");
auto ctx = std::unique_ptr<EVP_CIPHER_CTX, decltype(&EVP_CIPHER_CTX_free)>(EVP_CIPHER_CTX_new(), EVP_CIPHER_CTX_free);
if (!ctx)
return Status::Error("Failed to create cipher context");
return error(err, "Failed to create cipher context");
if (EVP_EncryptInit_ex(ctx.get(), EVP_aes_128_cfb(), nullptr, _encKey.data(), _initVec.data()) != 1)
return Status::Error("Encryption initialization failed");
return error(err, "Encryption initialization failed");
std::unique_ptr<uint8_t[]> temp(new uint8_t[length + AES_BLOCK_SIZE]);
int out_len = 0;
if (EVP_EncryptUpdate(ctx.get(), temp.get(), &out_len, data, length) != 1)
return Status::Error("Encryption failed during update");
return error(err, "Encryption failed during update");
int final_len = 0;
if (EVP_EncryptFinal_ex(ctx.get(), temp.get() + out_len, &final_len) != 1)
return Status::Error("Encryption failed during final");
return error(err, "Encryption failed during final");
std::copy_n(temp.get(), length, data);
return Status::Success();
return true;
}
Status AESCipher::Decrypt(uint8_t *data, uint32_t length) const
bool AESCipher::decrypt(uint8_t *data, uint32_t length, char *err) const
{
if (!data || length == 0)
return Status::Error("Empty data");
return error(err, "Empty data");
auto ctx = std::unique_ptr<EVP_CIPHER_CTX, decltype(&EVP_CIPHER_CTX_free)>(EVP_CIPHER_CTX_new(), EVP_CIPHER_CTX_free);
if (!ctx)
return Status::Error("Failed to create cipher context");
return error(err, "Failed to create cipher context");
if (EVP_DecryptInit_ex(ctx.get(), EVP_aes_128_cfb(), nullptr, _encKey.data(), _initVec.data()) != 1)
return Status::Error(" Decryption initialization failed");
return error(err, "Decryption initialization failed");
std::unique_ptr<uint8_t[]> temp(new uint8_t[length + AES_BLOCK_SIZE]);
int out_len = 0;
if (EVP_DecryptUpdate(ctx.get(), temp.get(), &out_len, data, length) != 1)
return Status::Error("Decryption failed during update");
return error(err, "Decryption failed during update");
int final_len = 0;
if (EVP_DecryptFinal_ex(ctx.get(), temp.get() + out_len, &final_len) != 1)
return Status::Error("Decryption failed during final");
return error(err, "Decryption failed during final");
std::copy_n(temp.get(), length, data);
return Status::Success();
return true;
}
+11 -4
View File
@@ -16,11 +16,18 @@ public:
AESCipher(const std::string &base_key);
~AESCipher() = default;
Status Encrypt(uint8_t *data, uint32_t length) const;
Status Decrypt(uint8_t *data, uint32_t length) const;
bool encrypt(uint8_t *data, uint32_t length, char *err) const;
bool decrypt(uint8_t *data, uint32_t length, char *err) const;
uint32_t Seed() const { return _seed; }
const std::string& Key() const { return _baseKey; }
uint32_t seed() const { return _seed; }
const std::string& key() const { return _baseKey; }
static bool error(char* error, const char* message)
{
if(error)
strcpy(error, message);
return false;
}
private:
std::string _baseKey;
+250 -106
View File
@@ -4,6 +4,8 @@
#include <iostream>
#include <iomanip>
#include "libavcodec/defs.h"
#include "protocol/message.h"
#include "common/logger.h"
#include "protocol/protocol_const.h"
@@ -12,19 +14,23 @@
Connection::Connection()
: writeQueue(WRITE_QUEUE_SIZE),
videoStream(Settings::videoQueue),
audioStreamMain(Settings::audioQueue),
audioStreamAux(Settings::audioQueue),
_recorder(Settings::audioQueue),
videoStream(VIDEO_QUEUE_SIZE),
audioStreamMain(AUDIO_QUEUE_SIZE),
audioStreamAux(AUDIO_QUEUE_SIZE),
_processQueue(PROCESS_QUEUE_SIZE),
_statusHandler(nullptr),
_cipher(nullptr),
_context(nullptr),
_handler(nullptr),
_device(nullptr),
_endpointIn(0),
_active(false),
_connected(false),
_phoneConnected(false),
_ecnrypt(false),
_state(PROTOCOL_STATUS_INITIALISING),
_failCount(0),
_nodeviceCount(0),
_statusHandler(nullptr)
_nodeviceCount(0)
{
int result = libusb_init(&_context);
if (result < 0)
@@ -76,7 +82,7 @@ void Connection::start(atomic<int8_t> *statusHandler)
log_v("Starting");
_active = true;
_thread = std::thread(&Connection::mainLoop, this);
_writeThread = std::thread(&Connection::mainLoop, this);
}
void Connection::stop()
@@ -87,11 +93,18 @@ void Connection::stop()
log_v("Stopping");
_active = false;
_connected = false;
writeQueue.notify();
state(PROTOCOL_STATUS_INITIALISING);
if (_thread.joinable())
_thread.join();
if (_processThread.joinable())
_processThread.join();
if (_readThread.joinable())
_readThread.join();
if (_writeThread.joinable())
_writeThread.join();
_statusHandler = nullptr;
}
@@ -109,8 +122,8 @@ void Connection::mainLoop()
libusb_device_handle *handler = libusb_open_device_with_vid_pid(_context, Settings::vendorid, Settings::productid);
if (handler)
{
uint8_t epIn = 0;
uint8_t epOut = 0;
uint8_t endpointIn = 0;
uint8_t endpointOut = 0;
int retry = 0;
libusb_device *device = nullptr;
_ecnrypt = false;
@@ -118,23 +131,15 @@ void Connection::mainLoop()
while (!device && retry++ < LINK_RETRY)
{
device = link(handler, &epIn, &epOut);
device = link(handler, &endpointIn, &endpointOut);
writeQueue.waitFor(_active, LINK_RETRY_TIMEOUT);
}
if (device)
{
_connected = true;
state(PROTOCOL_STATUS_ONLINE);
log_i("Device connected %d:%d speed: %d", libusb_get_bus_number(device), libusb_get_device_address(device), libusb_get_device_speed(device));
_reader.start(_context, handler, epIn, this);
onDeviceConnect();
writeLoop(handler, epOut);
_connected = false;
onDeviceConnect(handler, device, endpointIn);
writeLoop(handler, endpointOut);
onDeviceDisconnect();
_reader.stop();
}
libusb_release_interface(handler, 0);
@@ -145,9 +150,225 @@ void Connection::mainLoop()
}
}
void Connection::onDeviceConnect(libusb_device_handle *handler, libusb_device *device, uint8_t endpointIn)
{
_connected = true;
_phoneConnected = false;
_handler = handler;
_device = device;
_endpointIn = endpointIn;
state(PROTOCOL_STATUS_ONLINE);
log_i("Device connected %d:%d speed: %d", libusb_get_bus_number(device), libusb_get_device_address(device), libusb_get_device_speed(device));
writeQueue.clear();
videoStream.clear();
audioStreamMain.clear();
audioStreamAux.clear();
_processQueue.clear();
_processThread = std::thread(&Connection::processLoop, this);
_readThread = std::thread(&Connection::readLoop, this);
sendInit();
}
void Connection::onDeviceDisconnect()
{
log_i("Device disconnected");
_connected = false;
_processQueue.notify();
onPhoneDisconnect();
}
void Connection::onPhoneConnect()
{
if (_phoneConnected)
return;
log_i("Phone connected");
_phoneConnected = true;
if (Settings::onConnect.value.length() > 1)
execute(Settings::onConnect.value.c_str());
}
void Connection::onPhoneDisconnect()
{
if (!_phoneConnected)
return;
log_i("Phone disconnected");
_phoneConnected = false;
_recorder.stop();
if (Settings::onDisconnect.value.length() > 1)
execute(Settings::onDisconnect.value.c_str());
}
void Connection::readLoop()
{
setThreadName("usb-read");
setThreadPriority(ThreadPriority::Realtime);
log_d("USB reading thread started");
while (_connected)
{
int transferred = 0;
std::unique_ptr<Message> message = std::make_unique<Message>(_cipher);
int result = libusb_bulk_transfer(_handler, _endpointIn, message->header(), message->headerSize(), &transferred, READ_TIMEOUT);
if (result == LIBUSB_ERROR_NO_DEVICE)
{
_connected = false;
break;
}
if (result != LIBUSB_SUCCESS || transferred != message->headerSize())
{
if (result != LIBUSB_ERROR_TIMEOUT)
log_w("Header read failed > transfered %d / %d status %d", transferred, message->headerSize(), result);
continue;
}
if (message->invalidMagic())
{
log_w("Header read failed > invalid magic");
continue;
}
if (message->invalidChecksum())
{
log_w("Header read failed > invalid checksum");
continue;
}
if (message->invalidLength())
{
log_w("Header read failed > invalid length");
continue;
}
if (message->length() > 0)
{
uint32_t padding = message->type() == CMD_VIDEO_DATA ? AV_INPUT_BUFFER_PADDING_SIZE : 0;
uint8_t *buff = message->allocate(padding);
if (!buff)
{
log_w("Message read failed > can't allocate memory %d", message->length() + padding);
continue;
}
result = libusb_bulk_transfer(_handler, _endpointIn, buff, message->length(), &transferred, READ_TIMEOUT);
if (result == LIBUSB_ERROR_NO_DEVICE)
{
_connected = false;
break;
}
if (result != LIBUSB_SUCCESS || transferred != message->length())
{
log_w("Message data read failed > transfered %d / %d status %d", transferred, message->length(), result);
continue;
}
}
if (message->type() == CMD_VIDEO_DATA && message->setOffset(20))
{
if (!videoStream.pushDiscard(std::move(message)))
log_w("Discard message > video queue is full");
continue;
}
if (message->type() == CMD_AUDIO_DATA && message->length() > 16)
{
int channel = message->getInt(8);
message->setOffset(12);
if (channel == 1)
{
if (!audioStreamMain.pushDiscard(std::move(message)))
log_w("Discard message > main audio queue is full");
continue;
}
if (channel == 2)
{
if (!audioStreamAux.pushDiscard(std::move(message)))
log_w("Discard message > aux audio queue is full");
continue;
}
}
if (!_processQueue.pushDiscard(std::move(message)))
log_w("Discard message > process queue is full");
}
log_v("USB reading thread stopped");
}
void Connection::processLoop()
{
setThreadName("usb-process");
log_d("USB processing thread started");
while (_connected)
{
if (!_processQueue.wait(_connected))
continue;
;
std::unique_ptr<Message> message = _processQueue.pop();
if (!message)
continue;
char error[256];
if (!message->decrypt(error))
{
log_w("Can't decrypt message %d > %s", message->type(), error);
continue;
}
switch (message->type())
{
case CMD_CONTROL:
if (message->length() == 4)
{
switch (message->getInt(0))
{
case 1:
_recorder.start(&writeQueue);
break;
case 2:
_recorder.stop();
break;
}
}
break;
case CMD_PLUGGED:
{
state(PROTOCOL_STATUS_CONNECTED);
onPhoneConnect();
break;
}
case CMD_UNPLUGGED:
{
state(PROTOCOL_STATUS_ONLINE);
onPhoneDisconnect();
break;
}
case CMD_ENCRYPTION:
if (message->length() == 0)
setEncryption(true);
break;
}
}
log_v("USB processing thread stopped");
}
void Connection::writeLoop(libusb_device_handle *handler, uint8_t ep)
{
while (_active && _reader.active())
while (_connected)
{
std::unique_ptr<Message> message = writeQueue.pop();
if (!message)
@@ -160,7 +381,7 @@ void Connection::writeLoop(libusb_device_handle *handler, uint8_t ep)
if (!message)
message = Message::HeartBeat();
if (!_active || !_reader.active())
if (!_connected)
break;
if (!message->allocated())
@@ -173,10 +394,10 @@ void Connection::writeLoop(libusb_device_handle *handler, uint8_t ep)
if (_ecnrypt)
{
Status s = message->encrypt(_cipher);
if (s.failed())
char error[256];
if (!message->encrypt(_cipher, error))
{
log_w("Message encryption failed > %s", s.error());
log_w("Message encryption failed > %s", error);
continue;
}
}
@@ -286,7 +507,7 @@ bool Connection::state(u_int8_t state)
return false;
}
void Connection::onDeviceConnect()
void Connection::sendInit()
{
int syncTime = std::time(nullptr);
int drivePosition = Settings::leftDrive ? 0 : 1; // 0==left, 1==right
@@ -329,7 +550,7 @@ void Connection::onDeviceConnect()
if (Settings::encryption)
{
if (_cipher)
send(Message::Encryption(_cipher->Seed()));
send(Message::Encryption(_cipher->seed()));
else
log_w("Can't request encryption > Cypher is not initalised");
}
@@ -358,80 +579,3 @@ void Connection::onDeviceConnect()
if (Settings::autoconnect)
send(Message::Control(1002));
}
void Connection::onDeviceDisconnect()
{
_recorder.stop();
if (Settings::onDisconnect.value.length() > 1)
execute(Settings::onDisconnect.value.c_str());
}
void Connection::onMessage(std::unique_ptr<Message> message)
{
Status s = message->decrypt(_cipher);
if (s.failed())
{
log_w("Can't decrypt message %d > %s", message->type(), s.error());
return;
}
switch (message->type())
{
case CMD_CONTROL:
if (message->length() == 4)
{
switch (message->getInt(0))
{
case 1:
_recorder.start(&writeQueue);
break;
case 2:
_recorder.stop();
break;
}
}
break;
case CMD_PLUGGED:
{
state(PROTOCOL_STATUS_CONNECTED);
if (Settings::onConnect.value.length() > 1)
execute(Settings::onConnect.value.c_str());
break;
}
case CMD_UNPLUGGED:
{
state(PROTOCOL_STATUS_ONLINE);
_recorder.stop();
if (Settings::onDisconnect.value.length() > 1)
execute(Settings::onDisconnect.value.c_str());
break;
}
case CMD_VIDEO_DATA:
{
if (message->setOffset(20))
videoStream.pushDiscard(std::move(message));
break;
}
case CMD_AUDIO_DATA:
{
if (message->length() <= 16)
break;
int channel = message->getInt(8);
message->setOffset(12);
if (channel == 1)
audioStreamMain.pushDiscard(std::move(message));
if (channel == 2)
audioStreamAux.pushDiscard(std::move(message));
break;
}
case CMD_ENCRYPTION:
if (message->length() == 0)
setEncryption(true);
break;
}
}
+27 -15
View File
@@ -1,5 +1,5 @@
#ifndef SRC_CONNECTOR
#define SRC_CONNECTOR
#ifndef SRC_PROTOCOL_CONNECTION
#define SRC_PROTOCOL_CONNECTION
#include <libusb-1.0/libusb.h>
@@ -8,22 +8,25 @@
#include <mutex>
#include <string>
#include "protocol/imessage_sender.h"
#include "struct/atomic_queue.h"
#include "struct/usb_buffer.h"
#include "protocol/aes_cipher.h"
#include "protocol/connection_reader.h"
#include "recorder.h"
#define LINK_RETRY 5
#define LINK_RETRY_TIMEOUT 100
#define RECONNECT_TIMEOUT 100
#define PROTOCOL_HEARTBEAT_DELAY 3000
#define READ_TIMEOUT 1000
#define WRITE_QUEUE_SIZE 128
#define VIDEO_QUEUE_SIZE 128
#define AUDIO_QUEUE_SIZE 128
#define PROCESS_QUEUE_SIZE 128
#define WRITE_QUEUE_SIZE 256
#define ENCRYPTION_BASE "SkBRDy3gmrw1ieH0"
class Connection : public IMessageReceiver
class Connection
{
public:
@@ -40,33 +43,42 @@ public:
AtomicQueue<Message> audioStreamMain;
AtomicQueue<Message> audioStreamAux;
virtual void onMessage(std::unique_ptr<Message> message) override;
private:
void mainLoop();
void readLoop();
void processLoop();
void writeLoop(libusb_device_handle *handler, uint8_t ep);
libusb_device *link(libusb_device_handle *handler, uint8_t *epIn, uint8_t *epOut);
void setEncryption(bool enabled);
bool fail(int status, const char *msg);
bool state(u_int8_t state);
void onDeviceConnect();
void sendInit();
void onDeviceConnect(libusb_device_handle *handler, libusb_device *device, uint8_t endpointIn);
void onDeviceDisconnect();
void onPhoneConnect();
void onPhoneDisconnect();
std::thread _writeThread;
std::thread _readThread;
std::thread _processThread;
Recorder _recorder;
AtomicQueue<Message> _processQueue;
atomic<int8_t> *_statusHandler;
AESCipher *_cipher;
ConnectionReader _reader;
libusb_context *_context;
std::thread _thread;
libusb_device_handle *_handler;
libusb_device *_device;
uint8_t _endpointIn;
std::atomic<bool> _active;
std::atomic<bool> _connected;
std::atomic<bool> _phoneConnected;
std::atomic<bool> _ecnrypt;
uint8_t _state;
uint8_t _failCount;
uint8_t _nodeviceCount;
atomic<int8_t> *_statusHandler;
};
#endif /* SRC_CONNECTOR */
#endif /* SRC_PROTOCOL_CONNECTION */
-217
View File
@@ -1,217 +0,0 @@
#include "protocol/connection_reader.h"
#include <algorithm>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <stdexcept>
#include <utility>
#include "settings.h"
#include "common/logger.h"
#include "common/threading.h"
#include "protocol/protocol_const.h"
#include "libavcodec/defs.h"
#include "common/functions.h"
ConnectionReader::ConnectionReader()
: _active(false),
_buffer(Settings::usbBuffer, Settings::usbTransferSize),
_transfers(Settings::usbQueue),
_receiver(nullptr),
_usbContext(nullptr)
{
log_v("Created");
}
ConnectionReader::~ConnectionReader()
{
stop();
for (Context &context : _transfers)
{
if (context.transfer)
{
libusb_free_transfer(context.transfer);
context.transfer = nullptr;
}
}
log_v("Destroyed");
}
bool ConnectionReader::start(libusb_context *context, libusb_device_handle *device, uint8_t endpoint, IMessageReceiver *receiver)
{
if (_active || !context || !device)
return false;
_receiver = receiver;
_usbContext = context;
log_i("Starting to read endpoint %d with %d requests", endpoint, _transfers.size());
// Prepare usb transfers
for (Context &context : _transfers)
{
context.owner = this;
if (!context.transfer)
{
context.transfer = libusb_alloc_transfer(0);
if (context.transfer == nullptr)
{
log_e("Can't allocate usb transfer");
return false;
}
}
}
// Start processing thread
_buffer.reset();
_active = true;
_processThread = std::thread(&ConnectionReader::processLoop, this);
_readThread = std::thread(&ConnectionReader::readLoop, this);
// Start usb reading thread
for (Context &context : _transfers)
{
context.slot = _buffer.get();
if (context.slot == nullptr)
{
log_e("Can't allocate data slot for usb transfer");
return false;
}
context.owner = this;
libusb_fill_bulk_transfer(context.transfer, device, endpoint, context.slot->data, context.slot->size, ConnectionReader::onUsbRead, &context, 0);
int status = libusb_submit_transfer(context.transfer);
if (status != LIBUSB_SUCCESS)
{
log_w("USB transfer submit failed with code %d", status);
return false;
}
}
return true;
}
void ConnectionReader::stop()
{
log_v("Stopping");
_active = false;
if (_usbContext)
{
for (Context &context : _transfers)
{
if (context.transfer)
libusb_cancel_transfer(context.transfer);
}
timeval timeout{0, 100000};
libusb_handle_events_timeout_completed(_usbContext, &timeout, nullptr);
log_v("Events canceled");
}
_buffer.notify();
if (_readThread.joinable())
_readThread.join();
if (_processThread.joinable())
_processThread.join();
_usbContext = nullptr;
log_v("Threads stopped");
}
void ConnectionReader::onUsbRead(libusb_transfer *transfer)
{
if (!transfer || !transfer->user_data)
return;
Context *c = static_cast<Context *>(transfer->user_data);
if (!c->owner->_active)
return;
log_p("Read %d [%d]: %s", transfer->actual_length, transfer->status, bytes(transfer->buffer, transfer->actual_length, 40).c_str());
if (transfer->status == LIBUSB_TRANSFER_CANCELLED)
return;
if (transfer->status != LIBUSB_TRANSFER_COMPLETED)
{
log_w("USB read failed with status %d", transfer->status);
c->owner->_active = false;
return;
}
c->slot->commit(transfer->actual_length);
c->slot = c->owner->_buffer.get();
if (!c->slot)
{
log_e("Can't allocate data slot for next usb transfer");
c->owner->_active = false;
return;
}
c->transfer->buffer = c->slot->data;
if (!c->owner->_active)
return;
int status = libusb_submit_transfer(c->transfer);
if (status != LIBUSB_SUCCESS)
{
log_w("USB transfer re-submit failed with status %d", status);
c->owner->_active = false;
}
}
void ConnectionReader::readLoop()
{
setThreadName("usb-read");
setThreadPriority(ThreadPriority::Realtime);
timeval timeout{0, 100000};
log_d("USB reading thread started");
while (_active)
{
libusb_handle_events_timeout_completed(_usbContext, &timeout, nullptr);
}
log_v("USB reading thread stopped");
}
void ConnectionReader::processLoop()
{
setThreadName("usb-process");
log_d("USB processing thread started");
while (_active)
{
std::unique_ptr<Message> message = std::make_unique<Message>();
if (!_buffer.read(message->header(), message->headerSize(), _active))
break;
if (!message->valid())
{
log_w("Mallformed message %s", message->toString(20).c_str());
continue;
}
if (message->length() >= 0)
{
uint8_t *buff = message->allocate(message->type() == CMD_VIDEO_DATA ? AV_INPUT_BUFFER_PADDING_SIZE : 0);
if (!_buffer.read(buff, message->length(), _active))
break;
}
if (_receiver && message->allocated())
_receiver->onMessage(std::move(message));
}
log_v("USB processing thread stopped");
}
-61
View File
@@ -1,61 +0,0 @@
#ifndef SRC_PROTOCOL_CONNECTION_READER
#define SRC_PROTOCOL_CONNECTION_READER
#include <libusb-1.0/libusb.h>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <vector>
#include "struct/usb_buffer.h"
#include "protocol/aes_cipher.h"
#include "protocol/message.h"
class IMessageReceiver
{
public:
virtual ~IMessageReceiver() = default;
virtual void onMessage(std::unique_ptr<Message> message) = 0;
};
class ConnectionReader
{
public:
ConnectionReader();
~ConnectionReader();
ConnectionReader(const ConnectionReader &) = delete;
ConnectionReader &operator=(const ConnectionReader &) = delete;
bool start(libusb_context *context, libusb_device_handle *device, uint8_t endpoint, IMessageReceiver *receiver);
void stop();
int bufferCount() const { return _buffer.count(); }
bool active() const { return _active; }
private:
struct Context
{
ConnectionReader *owner = nullptr;
DataSlot *slot = nullptr;
libusb_transfer *transfer = nullptr;
};
static void onUsbRead(libusb_transfer *transfer);
void readLoop();
void processLoop();
void cancelTransfers();
std::atomic<bool> _active;
UsbBuffer _buffer;
std::vector<Context> _transfers;
IMessageReceiver *_receiver;
std::thread _readThread;
std::thread _processThread;
libusb_context *_usbContext;
};
#endif /* SRC_PROTOCOL_CONNECTION_READER */
-15
View File
@@ -1,15 +0,0 @@
#ifndef SRC_HELPER_ISENDER
#define SRC_HELPER_ISENDER
#include <memory>
#include "protocol/message.h"
class IMessageSender
{
public:
virtual ~IMessageSender() = default;
virtual bool send(std::unique_ptr<Message> packet) = 0;
};
#endif /* SRC_HELPER_ISENDER */
+31 -37
View File
@@ -26,8 +26,8 @@ struct Header
class Message
{
public:
Message()
: _header({0, 0, 0, 0}), _data(nullptr), _offset(0), _size(0), _encrypt(false)
Message(AESCipher *cipher)
: _header({0, 0, 0, 0}), _data(nullptr), _offset(0), _size(0), _cipher(cipher), _encrypt(false)
{
}
@@ -99,51 +99,40 @@ public:
return true;
}
bool valid() const
bool encrypt(AESCipher *cipher, char *err = nullptr)
{
if (_header.magic != MAGIC_ENC && _header.magic != MAGIC)
return false;
if (_header.typecheck != ~_header.type)
return false;
if (_header.length < 0 || _header.length > MESSAGE_MAX_PAYLOAD_SIZE)
if (!_encrypt || _header.magic == MAGIC_ENC)
return true;
if (!cipher)
return AESCipher::error(err, "Cipher is not initialised");
if (!allocated())
return AESCipher::error(err, "Message data is not allocated");
if (!cipher->encrypt(_data, _header.length, err))
return false;
_header.magic = MAGIC_ENC;
return true;
}
Status encrypt(AESCipher *cipher)
{
if (!_encrypt)
return Status::Success();
if (_header.magic == MAGIC_ENC)
return Status::Success();
if (!cipher)
return Status::Error("Cipher is not initialised");
if (!allocated())
return Status::Error("Message data is not allocated");
Status result = cipher->Encrypt(_data, _header.length);
if (result.succeed())
_header.magic = MAGIC_ENC;
return result;
}
Status decrypt(AESCipher *cipher)
bool decrypt(char *err = nullptr)
{
if (_header.magic != MAGIC_ENC)
return Status::Success();
return true;
if (!cipher)
return Status::Error("Cipher is not initialised");
if (!_cipher)
return AESCipher::error(err, "Cipher is not initialised");
if (!allocated())
return Status::Error("Message data is not allocated");
return AESCipher::error(err, "Message data is not allocated");
return cipher->Decrypt(_data, _header.length);
if (!_cipher->decrypt(_data, _header.length, err))
return false;
_header.magic = MAGIC;
return true;
}
bool isMotion() const
@@ -299,7 +288,11 @@ public:
int32_t length() const { return _header.length - _offset; }
uint8_t *data() const { return _data ? _data + _offset : nullptr; }
const std::string toString(int count) const
bool invalidMagic() const { return _header.magic != MAGIC_ENC && _header.magic != MAGIC; }
bool invalidChecksum() const { return _header.typecheck != ~_header.type; }
bool invalidLength() const { return _header.length < 0 || _header.length > MESSAGE_MAX_PAYLOAD_SIZE; }
const std::string toString(int count) const
{
const char *cmds = "Unknown";
for (size_t i = 0; i < sizeof(protocolCmdList) / sizeof(protocolCmdList[0]); ++i)
@@ -369,6 +362,7 @@ private:
uint8_t *_data;
uint32_t _offset;
uint32_t _size;
AESCipher *_cipher;
bool _encrypt;
};
+1 -1
View File
@@ -9,7 +9,7 @@
#include "protocol/message.h"
Recorder::Recorder(uint16_t buffSize)
Recorder::Recorder()
: _queue(nullptr), _active(false), _device(0)
{
}
+1 -1
View File
@@ -12,7 +12,7 @@
class Recorder
{
public:
Recorder(uint16_t buffSize);
Recorder();
~Recorder();
void start(AtomicQueue<Message> *queue);
-153
View File
@@ -1,153 +0,0 @@
#include "struct/usb_buffer.h"
#include <cstdlib>
#include <cstring>
#include <stdexcept>
DataSlot::DataSlot()
: ready(false), offset(0), length(0), size(0), data(nullptr), _cv(nullptr)
{
}
DataSlot::~DataSlot()
{
size = 0;
if (data)
{
free(data);
data = nullptr;
}
}
void DataSlot::init(uint32_t slotSize, std::condition_variable *condition)
{
ready.store(false);
offset = 0;
length = 0;
size = slotSize;
data = static_cast<uint8_t *>(malloc(size));
_cv = condition;
}
void DataSlot::reset()
{
ready.store(false);
offset = 0;
length = 0;
}
void DataSlot::commit(size_t dataSize)
{
length = dataSize;
offset = 0;
ready.store(true);
if (_cv)
_cv->notify_one();
}
bool DataSlot::consume(size_t dataSize)
{
offset += dataSize;
if (offset < length)
return false;
ready.store(false);
return true;
}
size_t DataSlot::remain() const
{
return length > offset ? length - offset : 0;
}
UsbBuffer::UsbBuffer(uint16_t slotCount, uint32_t slotSize)
: _slots(nullptr), _size(slotCount), _writeSlot(0), _readSlot(0)
{
if (slotCount == 0 || slotSize == 0)
throw std::invalid_argument("Number of slots and slot size must be greater than 0");
_slots = new DataSlot[_size];
for (uint16_t i = 0; i < _size; i++)
{
_slots[i].init(slotSize, &_cv);
}
}
UsbBuffer::~UsbBuffer()
{
_cv.notify_all();
if (_slots)
{
delete[] _slots;
}
}
DataSlot *UsbBuffer::get()
{
if (_slots[_writeSlot].ready.load())
return nullptr;
DataSlot *slot = &(_slots[_writeSlot]);
_writeSlot++;
if (_writeSlot >= _size)
_writeSlot = 0;
return slot;
}
bool UsbBuffer::read(uint8_t *dst, uint32_t length, std::atomic<bool> &active)
{
if (length == 0)
return true;
size_t done = 0;
while (length > 0)
{
while (!_slots[_readSlot].ready.load())
{
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait(lock, [&]()
{ return !active.load() || _slots[_readSlot].ready.load(); });
if (!active.load())
return false;
}
size_t copy = _slots[_readSlot].remain();
if (copy > length)
copy = length;
if (dst != nullptr)
std::memcpy(dst + done, _slots[_readSlot].data + _slots[_readSlot].offset, copy);
if (_slots[_readSlot].consume(copy))
{
_readSlot++;
if (_readSlot >= _size)
_readSlot = 0;
}
done += copy;
length -= copy;
}
return active.load();
}
void UsbBuffer::reset()
{
_readSlot = 0;
_writeSlot = 0;
for (uint16_t i = 0; i < _size; i++)
{
_slots[i].reset();
}
}
void UsbBuffer::notify()
{
_cv.notify_all();
}
int UsbBuffer::count() const
{
int result = _writeSlot - _readSlot;
if (result < 0)
result += _size;
return result;
}
-58
View File
@@ -1,58 +0,0 @@
#ifndef SRC_STRUCT_USB_BUFFER
#define SRC_STRUCT_USB_BUFFER
#include <cstddef>
#include <cstdint>
#include <atomic>
#include <mutex>
#include <condition_variable>
class DataSlot
{
public:
DataSlot();
~DataSlot();
void init(uint32_t slotSize, std::condition_variable *condition);
void reset();
void commit(size_t dataSize);
bool consume(size_t dataSize);
size_t remain() const;
std::atomic<bool> ready;
size_t offset;
size_t length;
size_t size;
uint8_t *data;
private:
std::condition_variable *_cv;
};
class UsbBuffer
{
public:
UsbBuffer(uint16_t slotCount, uint32_t slotSize);
~UsbBuffer();
UsbBuffer(const UsbBuffer &) = delete;
UsbBuffer &operator=(const UsbBuffer &) = delete;
DataSlot *get();
bool read(uint8_t *dst, uint32_t length, std::atomic<bool> &active);
void reset();
void notify();
int count() const;
private:
std::mutex _mutex;
std::condition_variable _cv;
DataSlot *_slots;
uint16_t _size;
uint16_t _writeSlot;
uint16_t _readSlot;
};
#endif /* SRC_STRUCT_USB_BUFFER */