Optimise queues and rendering

This commit is contained in:
Niellune
2025-05-20 23:16:06 +03:00
parent 7f3ea88b21
commit 64f709072f
23 changed files with 585 additions and 476 deletions
+105
View File
@@ -0,0 +1,105 @@
#ifndef SRC_STRUCT_ATOMIC_QUEUE
#define SRC_STRUCT_ATOMIC_QUEUE
#include <cstdint>
#include <atomic>
#include <memory>
#include <condition_variable>
#include <mutex>
using namespace std;
template <typename T>
class AtomicQueue
{
public:
AtomicQueue(uint16_t size)
: _size(size), _data(new unique_ptr<T>[size]), _first(0), _last(0), _count(0)
{
}
AtomicQueue(const AtomicQueue &) = delete;
AtomicQueue &operator=(const AtomicQueue &) = delete;
~AtomicQueue() = default;
bool pushDiscard(unique_ptr<T> obj)
{
if (_count == _size)
return false;
_first = (_first + 1) % _size;
_data[_first] = std::move(obj);
++_count;
_lock.notify_one();
return true;
}
bool pushReplace(unique_ptr<T> obj)
{
if (_count == _size)
{
_data[_first] = std::move(obj);
return false;
}
_first = (_first + 1) % _size;
_data[_first] = std::move(obj);
++_count;
_lock.notify_one();
return true;
}
unique_ptr<T> pop()
{
if (_count == 0)
return nullptr;
_last = (_last + 1) % _size;
auto item = std::move(_data[_last]);
--_count;
return item;
}
unique_ptr<T> wait(atomic<bool> &waitFlag)
{
unique_lock<std::mutex> lock(_mtx);
_lock.wait(lock, [&]
{ return _count > 0 || !waitFlag; });
if (!waitFlag)
return nullptr;
_last = (_last + 1) % _size;
auto item = std::move(_data[_last]);
--_count;
return item;
}
void clear()
{
_data = std::make_unique<std::unique_ptr<T>[]>(_size);
_first = 0;
_last = 0;
_count = 0;
}
void notify()
{
_lock.notify_all();
}
uint16_t count() { return _count; }
private:
uint16_t _size;
unique_ptr<unique_ptr<T>[]> _data;
uint16_t _first;
uint16_t _last;
atomic<uint16_t> _count;
mutex _mtx;
condition_variable _lock;
};
#endif /* SRC_STRUCT_ATOMIC_QUEUE */
+43
View File
@@ -0,0 +1,43 @@
#ifndef SRC_STRUCT_MESSAGE
#define SRC_STRUCT_MESSAGE
#include <cstdint>
#include <cstring>
#include <memory>
#define OFFSET_AUDIO_FORMAT 0
class Message
{
public:
Message(uint8_t *data, uint32_t data_length, uint32_t offset) : _data(data), _length(data_length), _offset(offset)
{
}
~Message()
{
if (_data)
{
free(_data);
_data = nullptr;
}
}
int getInt(uint32_t offset) const
{
int result = 0;
if (_length - sizeof(int) >= offset)
memcpy(&result, _data + offset, sizeof(int));
return result;
}
uint8_t *data() const { return _data + _offset; }
uint32_t length() const { return _length - _offset; }
private:
uint8_t *_data;
uint32_t _length;
uint32_t _offset;
};
#endif /* SRC_STRUCT_MESSAGE */
-69
View File
@@ -1,69 +0,0 @@
#include "raw_queue.h"
RawQueue::RawQueue(uint16_t capacity)
: _buffer(capacity), _head(0), _tail(0), _size(0), _capacity(capacity)
{
}
RawQueue::~RawQueue()
{
clear();
}
bool RawQueue::push(uint8_t *data, int offset, int size)
{
std::lock_guard<std::mutex> lock(_mutex);
if (_size == _buffer.size())
{
free(data);
return false; // queue full
}
_buffer[_tail] = RawEntry{data, offset, size};
_tail = (_tail + 1) % _capacity;
_size++;
_condition.notify_one();
return true;
}
RawEntry RawQueue::wait(const std::atomic<bool> &reading)
{
std::unique_lock<std::mutex> lock(_mutex);
_condition.wait(lock, [&]
{ return !reading.load() || _size > 0; });
if (!reading || _size == 0)
return RawEntry{nullptr, 0, 0};
RawEntry entry = _buffer[_head];
_head = (_head + 1) % _capacity;
_size--;
return entry;
}
void RawQueue::clear()
{
std::lock_guard<std::mutex> lock(_mutex);
// Free any remaining buffers
while (_size > 0)
{
RawEntry &e = _buffer[_head];
if (e.data)
{
free(e.data);
e.data = nullptr;
}
_head = (_head + 1) % _capacity;
_size--;
}
// Reset indices
_head = _tail = 0;
}
void RawQueue::notify()
{
_condition.notify_all();
}
-46
View File
@@ -1,46 +0,0 @@
#ifndef SRC_RAW_QUEUE
#define SRC_RAW_QUEUE
#include <cstdint>
#include <vector>
#include <atomic>
#include <mutex>
#include <condition_variable>
struct RawEntry
{
uint8_t *data;
int offset;
int size;
};
// Single entry: raw buffer pointer + metadata
class RawQueue
{
public:
RawQueue(uint16_t capacity = 256);
~RawQueue();
// Non-blocking push: returns false if full
bool push(uint8_t *data, int offset, int size);
// Blocks until an entry is available or reader_active == false
RawEntry wait(const std::atomic<bool> &reading);
// Clears the queue and frees any pending buffers
void clear();
// Unlock queus
void notify();
private:
std::vector<RawEntry> _buffer;
uint16_t _head;
uint16_t _tail;
uint16_t _size;
uint16_t _capacity;
std::mutex _mutex;
std::condition_variable _condition;
};
#endif
-111
View File
@@ -1,111 +0,0 @@
#include "video_buffer.h"
extern "C"
{
#include <libavutil/imgutils.h>
}
#include <stdexcept>
#include <string>
#include "error.h"
VideoBuffer::VideoBuffer()
: _width(0), _height(0)
{
}
// Allocate two YUV420P frames for double buffering and initialize to black
Error VideoBuffer::allocate(uint16_t width, uint16_t height)
{
_width = width;
_height = height;
deallocate();
reset();
Error e;
for (uint8_t i = 0; i < BUFFER_VIDEO_FRAMES; ++i)
{
// Allocate AVFrame
_frames[i] = av_frame_alloc();
if (e.null(_frames[i], "Failed to allocate AVFrame"))
break;
_frames[i]->format = AV_PIX_FMT_YUV420P;
_frames[i]->width = width;
_frames[i]->height = height;
// Allocate data buffer with 32 byte allingment
if (e.avFail(av_frame_get_buffer(_frames[i], 32), "Failed to allocate AVFrame buffer"))
break;
// Set Y plane to black (0)
memset(_frames[i]->data[0], 0, _frames[i]->linesize[0] * height);
// Set U plane to 128 (neutral)
memset(_frames[i]->data[1], 128, _frames[i]->linesize[1] * (height / 2));
// Set V plane to 128 (neutral)
memset(_frames[i]->data[2], 128, _frames[i]->linesize[2] * (height / 2));
}
return e;
}
VideoBuffer::~VideoBuffer()
{
deallocate();
}
void VideoBuffer::deallocate()
{
for (uint8_t i = 0; i < BUFFER_VIDEO_FRAMES; ++i)
{
if (_frames[i])
{
// Free the frame itself
av_frame_free(&_frames[i]);
// Clear
_frames[i] = nullptr;
}
}
}
bool VideoBuffer::getLatest(AVFrame **frame, uint32_t *id)
{
_reading.store(_latest.load());
int index = _reading.load();
if (index < 0)
return false;
*frame = _frames[index];
*id = _ids[index];
return true;
}
void VideoBuffer::consumeLatest()
{
_reading.store(-1);
}
const AVFrame *VideoBuffer::writeFrame(uint32_t id)
{
int index = _writing.load();
while (index == _reading.load() || index == _latest.load())
{
index = (index + 1) % BUFFER_VIDEO_FRAMES;
}
_writing.store(index);
_ids[index] = id;
return _frames[index];
}
void VideoBuffer::commitFrame()
{
_latest.store(_writing.load());
}
void VideoBuffer::reset()
{
_writing.store(0);
_reading.store(-1);
_latest.store(-1);
for (uint8_t i = 0; i < BUFFER_VIDEO_FRAMES; i++)
{
_ids[i] = 0;
}
}
+64 -20
View File
@@ -1,5 +1,5 @@
#ifndef SRC_VIDEO_BUFFER
#define SRC_VIDEO_BUFFER
#ifndef SRC_STRUCT_VIDEO_BUFFER
#define SRC_STRUCT_VIDEO_BUFFER
extern "C"
{
@@ -7,31 +7,75 @@ extern "C"
}
#include <atomic>
#include "helper/error.h"
#include <stdexcept>
#define BUFFER_VIDEO_FRAMES 3
class VideoBuffer
{
public:
VideoBuffer();
~VideoBuffer();
Error allocate(uint16_t width, uint16_t height);
uint16_t width() const { return _width; };
uint16_t height() const { return _height; };
void reset();
bool getLatest(AVFrame **frame, uint32_t *id);
void consumeLatest();
const AVFrame *writeFrame(uint32_t id);
void commitFrame();
VideoBuffer()
{
_writing.store(0);
_reading.store(-1);
_latest.store(-1);
for (uint8_t i = 0; i < BUFFER_VIDEO_FRAMES; ++i)
{
_ids[i] = 0;
_frames[i] = av_frame_alloc();
if (!_frames[i])
{
throw std::runtime_error("Failed to allocate AVFrame");
}
}
}
~VideoBuffer()
{
for (uint8_t i = 0; i < BUFFER_VIDEO_FRAMES; ++i)
{
if (_frames[i])
{
av_frame_free(&_frames[i]);
_frames[i] = nullptr;
}
}
}
bool latest(AVFrame **frame, uint32_t *id)
{
_reading.store(_latest.load());
int index = _reading.load();
if (index < 0)
return false;
*frame = _frames[index];
*id = _ids[index];
return true;
}
void consume()
{
_reading.store(-1);
}
AVFrame *write(uint32_t id)
{
int index = _writing.load();
while (index == _reading.load() || index == _latest.load())
{
index = (index + 1) % BUFFER_VIDEO_FRAMES;
}
_writing.store(index);
_ids[index] = id;
return _frames[index];
}
void commit()
{
_latest.store(_writing.load());
}
private:
void deallocate();
uint16_t _width;
uint16_t _height;
std::atomic<int8_t> _latest;
std::atomic<int8_t> _reading;
std::atomic<int8_t> _writing;
@@ -39,4 +83,4 @@ private:
uint32_t _ids[BUFFER_VIDEO_FRAMES];
};
#endif /* SRC_VIDEO_BUFFER */
#endif /* SRC_STRUCT_VIDEO_BUFFER */