commit ceba2958fa9b607f038b8f865cad70f0d8bccc42 Author: 0xFEEDC0DE64 Date: Wed Jul 16 20:25:01 2025 +0200 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a5d18fa --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +build/ +*.user* diff --git a/README.md b/README.md new file mode 100644 index 0000000..438ef6f --- /dev/null +++ b/README.md @@ -0,0 +1,16 @@ +# less_shitty_proxyjs implementation using redis and traefik + +Install the traefik proxyjs.yml file (using the file provider!) to register the default handler that does load balancing to all proxyjs instances +Install redis in traefik, to allow for dynamic route updates + +## Build and run + +``` +git clone https://code.brunner.ninja/feedc0de/less_shitty_proxyjs.git +cd less_shitty_proxyjs/ +mkdir build +cd build/ +qmake6 .. +make -j8 +parallel -j10 --linebuffer ./less_shitty_proxyjs 'test{}' '950{}' ::: {0..9} +``` diff --git a/client.cpp b/client.cpp new file mode 100644 index 0000000..384ae8d --- /dev/null +++ b/client.cpp @@ -0,0 +1,86 @@ +#include "client.h" + +#include + +#include "webserver.h" + +Client::Client(WebServer &server, std::unique_ptr &&socket, + const QString &serial, const std::set> &serialClients) : + QObject{&server}, + m_server{server}, + m_socket{std::move(socket)}, + m_serial{serial} +{ + qDebug() << "new ws connection!!" << m_socket->requestUrl(); + + m_socket->sendTextMessage(QString{"Hello from server %0, you requested serial %1, there are %2 clients connected"} + .arg(m_server.m_identity, serial).arg(serialClients.size())); + + for (auto &other : serialClients) + { + other->sendTextMessage(QString{"A new client connected, number of connected clients: %0"}.arg(serialClients.size() + 1)); + + connect(this, &Client::sendTextMessageToOthers, + other.get(), &Client::sendTextMessage); + connect(other.get(), &Client::sendTextMessageToOthers, + this, &Client::sendTextMessage); + } + + QObject::connect(m_socket.get(), &QWebSocket::disconnected, + this, &Client::socketDisconnected); + + QObject::connect(m_socket.get(), &QWebSocket::destroyed, + this, &Client::socketDestroyed); +} + +Client::~Client() = default; + +void Client::sendTextMessage(const QString &text) +{ + qDebug() << text; + if (m_socket) + m_socket->sendTextMessage(text); + else + qWarning() << "tried to send with invalid socket"; +} + +void Client::textMessageReceived(const QString &text) +{ + qDebug() << text; + emit sendTextMessageToOthers(text); +} + +void Client::socketDestroyed() +{ + qDebug() << "destroyed"; + m_socket.release(); + assert(!m_socket); +} + +void Client::socketDisconnected() +{ + qDebug() << "disconnected"; + + std::set> &serialClients = m_server.m_clients[m_serial]; + + { + auto iter = std::find_if(std::begin(serialClients), std::end(serialClients), [this](const auto &ptr){ + return ptr.get() == this; + }); + if (iter == std::end(serialClients)) + qWarning() << "couldnt fint ourself when trying to erase us"; + else + { + qDebug() << serialClients.size(); + serialClients.extract(iter).value().release()->deleteLater(); + qDebug() << serialClients.size(); + } + } + + if (serialClients.empty()) + m_server.destroyTraefikRoute(m_serial); + else for (auto &ptr : serialClients) + ptr->sendTextMessage(QString{"A client disconnected, number of remaining connected clients: %0"}.arg(serialClients.size())); + + qDebug() << "done with sending..."; +} diff --git a/client.h b/client.h new file mode 100644 index 0000000..695db8f --- /dev/null +++ b/client.h @@ -0,0 +1,35 @@ +#pragma once + +#include + +#include +#include + +class WebServer; +class QWebSocket; + +class Client : public QObject +{ + Q_OBJECT + +public: + explicit Client(WebServer &server, std::unique_ptr &&socket, + const QString &serial, const std::set> &serialClients); + ~Client() override; + +signals: + void sendTextMessageToOthers(const QString &text); + +public slots: + void sendTextMessage(const QString &text); + +private slots: + void textMessageReceived(const QString &text); + void socketDestroyed(); + void socketDisconnected(); + +private: + WebServer &m_server; + std::unique_ptr m_socket; + const QString m_serial; +}; diff --git a/client.html b/client.html new file mode 100644 index 0000000..171c792 --- /dev/null +++ b/client.html @@ -0,0 +1,29 @@ + + + + less shitty proxyjs + + + + + +
+

client.html

+

Server-Identity: {{identity}}

+

Request-Serial: {{serial}}

+

WS status: Disconnected

+ + +
+
+ + +
+
+ + + + + + + diff --git a/index.html b/index.html new file mode 100644 index 0000000..dae342b --- /dev/null +++ b/index.html @@ -0,0 +1,16 @@ + + + + less shitty proxyjs + + + + +
+

index.html

+

Server-Identity: {{identity}}

+
+ + + + diff --git a/less_shitty_proxyjs.pro b/less_shitty_proxyjs.pro new file mode 100644 index 0000000..35c5471 --- /dev/null +++ b/less_shitty_proxyjs.pro @@ -0,0 +1,28 @@ +QT = core network websockets httpserver + +CONFIG += c++latest cmdline + +DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000 + +SOURCES += \ + client.cpp \ + main.cpp \ + redisqtadapter.cpp \ + webserver.cpp + +RESOURCES += \ + resources.qrc + +OTHER_FILES += \ + client.html \ + index.html \ + script.js \ + style.css + +HEADERS += \ + client.h \ + redisqtadapter.h \ + webserver.h + +LIBS += \ + -L/usr/lib -lhiredis diff --git a/main.cpp b/main.cpp new file mode 100644 index 0000000..907eab1 --- /dev/null +++ b/main.cpp @@ -0,0 +1,85 @@ +#include +#include + +#include "webserver.h" + +#include +#include + +int main(int argc, char *argv[]) +{ + qSetMessagePattern("%{time dd.MM.yyyy HH:mm:ss.zzz} " + "[" + "%{if-debug}D%{endif}" + "%{if-info}I%{endif}" + "%{if-warning}W%{endif}" + "%{if-critical}C%{endif}" + "%{if-fatal}F%{endif}" + "] " + "%{function}(): " + "%{message}"); + + QCoreApplication app{argc, argv}; + QCoreApplication::setApplicationName("less_shitty_proxyjs"); + QCoreApplication::setApplicationVersion("1.0"); + + QString identity = "test"; + uint16_t port = 8000; + + { + QCommandLineParser parser; + parser.setApplicationDescription("Test helper"); + parser.addHelpOption(); + parser.addVersionOption(); + parser.addPositionalArgument("identiy", QCoreApplication::translate("main", "The name of this service instance")); + parser.addPositionalArgument("port", QCoreApplication::translate("main", "The port to listen on (will only listen on localhost).")); + parser.process(app); + + const auto &args = parser.positionalArguments(); + if (args.size() > 0) + identity = args.first(); + if (args.size() > 1) + { + bool ok; + port = args.at(1).toInt(&ok); + if (!ok) + { + qCritical("could not parse port: %s", qPrintable(args.at(1))); + return -1; + } + } + } + + qSetMessagePattern(QString{"%{time dd.MM.yyyy HH:mm:ss.zzz} %0 " + "[" + "%{if-debug}D%{endif}" + "%{if-info}I%{endif}" + "%{if-warning}W%{endif}" + "%{if-critical}C%{endif}" + "%{if-fatal}F%{endif}" + "] " + "%{function}(): " + "%{message}"}.arg(identity)); + + WebServer server{identity, QString{"http://localhost:%0"}.arg(port)}; + + QTcpServer tcpServer; + if (!tcpServer.listen(QHostAddress::LocalHost, port)) + { + qCritical("failed to start listening on port %hu: %s", port, qPrintable(tcpServer.errorString())); + return -1; + } + + if (!server.bind(&tcpServer)) + { + qCritical("failed to webserver on socket!"); + return -2; + } + + qDebug() << "server started"; + + auto result = app.exec(); + qDebug() << "bey bey!"; + return result; +} + diff --git a/proxyjs.yml b/proxyjs.yml new file mode 100644 index 0000000..7f8aaf7 --- /dev/null +++ b/proxyjs.yml @@ -0,0 +1,25 @@ +http: + routers: + proxyjs: + rule: "Host(`proxyjs.brunner.ninja`)" + service: proxyjs + entryPoints: + - websecure + tls: + certResolver: myresolver + priority: 10 + + services: + proxyjs: + loadBalancer: + servers: + - url: "http://localhost:9500" + - url: "http://localhost:9501" + - url: "http://localhost:9502" + - url: "http://localhost:9503" + - url: "http://localhost:9504" + - url: "http://localhost:9505" + - url: "http://localhost:9506" + - url: "http://localhost:9507" + - url: "http://localhost:9508" + - url: "http://localhost:9509" diff --git a/redisqtadapter.cpp b/redisqtadapter.cpp new file mode 100644 index 0000000..4343baa --- /dev/null +++ b/redisqtadapter.cpp @@ -0,0 +1,2 @@ +#include "redisqtadapter.h" + diff --git a/redisqtadapter.h b/redisqtadapter.h new file mode 100644 index 0000000..e7b4162 --- /dev/null +++ b/redisqtadapter.h @@ -0,0 +1,108 @@ +#pragma once + +#include +#include + +static void RedisQtAddRead(void *); +static void RedisQtDelRead(void *); +static void RedisQtAddWrite(void *); +static void RedisQtDelWrite(void *); +static void RedisQtCleanup(void *); + +class RedisQtAdapter : public QObject { + + Q_OBJECT + + friend + void RedisQtAddRead(void * adapter) { + RedisQtAdapter * a = static_cast(adapter); + a->addRead(); + } + + friend + void RedisQtDelRead(void * adapter) { + RedisQtAdapter * a = static_cast(adapter); + a->delRead(); + } + + friend + void RedisQtAddWrite(void * adapter) { + RedisQtAdapter * a = static_cast(adapter); + a->addWrite(); + } + + friend + void RedisQtDelWrite(void * adapter) { + RedisQtAdapter * a = static_cast(adapter); + a->delWrite(); + } + + friend + void RedisQtCleanup(void * adapter) { + RedisQtAdapter * a = static_cast(adapter); + a->cleanup(); + } + +public: + RedisQtAdapter(QObject * parent = 0) + : QObject(parent), m_ctx(0), m_read(0), m_write(0) { } + + ~RedisQtAdapter() { + if (m_ctx != 0) { + m_ctx->ev.data = NULL; + } + } + + int setContext(redisAsyncContext * ac) { + if (ac->ev.data != NULL) { + return REDIS_ERR; + } + m_ctx = ac; + m_ctx->ev.data = this; + m_ctx->ev.addRead = RedisQtAddRead; + m_ctx->ev.delRead = RedisQtDelRead; + m_ctx->ev.addWrite = RedisQtAddWrite; + m_ctx->ev.delWrite = RedisQtDelWrite; + m_ctx->ev.cleanup = RedisQtCleanup; + return REDIS_OK; + } + +private: + void addRead() { + if (m_read) return; + m_read = new QSocketNotifier(m_ctx->c.fd, QSocketNotifier::Read, 0); + connect(m_read, SIGNAL(activated(int)), this, SLOT(read())); + } + + void delRead() { + if (!m_read) return; + delete m_read; + m_read = 0; + } + + void addWrite() { + if (m_write) return; + m_write = new QSocketNotifier(m_ctx->c.fd, QSocketNotifier::Write, 0); + connect(m_write, SIGNAL(activated(int)), this, SLOT(write())); + } + + void delWrite() { + if (!m_write) return; + delete m_write; + m_write = 0; + } + + void cleanup() { + delRead(); + delWrite(); + } + +private slots: + void read() { redisAsyncHandleRead(m_ctx); } + void write() { redisAsyncHandleWrite(m_ctx); } + +private: + redisAsyncContext * m_ctx; + QSocketNotifier * m_read; + QSocketNotifier * m_write; +}; diff --git a/resources.qrc b/resources.qrc new file mode 100644 index 0000000..9cbe7a0 --- /dev/null +++ b/resources.qrc @@ -0,0 +1,8 @@ + + + client.html + index.html + script.js + style.css + + diff --git a/script.js b/script.js new file mode 100644 index 0000000..79dd784 --- /dev/null +++ b/script.js @@ -0,0 +1,63 @@ +jQuery(document).ready(function($){ + let ws = null; + + $('#reconnectButton').click(function(ev){ + ev.preventDefault(); + + if (ws) { + $('#reconnectButton').text('Connect'); + ws.close(); + ws = null; + } else { + $('#reconnectButton').text('Disconnect'); + setWsStatus('Connecting'); + ws = new WebSocket(window.location.href.replace(/^http/, "ws")); + ws.onopen = function(event) { + setWsStatus('Connected'); + $('#sendForm input').focus(); + } + ws.onclose = function(event) { + setWsStatus('Closed'); + $('#reconnectButton').text('Connect'); + ws.close(); + ws = null; + } + ws.onerror = function(event) { + setWsStatus('Error'); + } + ws.onmessage = function(event) { + if (typeof event.data == 'string') { + pushLine('Received ' + event.data); + } else if (typeof event.data == 'object') { + pushLine('Received binary message!'); + } else { + pushLine('Unknown msg data type ' + typeof event.data); + } + }; + } + }).click(); + + $('#sendForm').submit(function(ev){ + ev.preventDefault(); + + if (!ws) { + alert('please connect first'); + return; + } + + const input = $('#sendForm input'); + const val = input.val(); + pushLine('Sending ' + val); + ws.send(val); + input.val('').focus(); + }); +}); + +function pushLine(line) { + $('#logView').append($('

').text(new Date().toLocaleTimeString() + ': ' + line)); +} + +function setWsStatus(status) { + $('#wsStatus').text(status); + pushLine('Status changed to ' + status); +} diff --git a/style.css b/style.css new file mode 100644 index 0000000..6d969c4 --- /dev/null +++ b/style.css @@ -0,0 +1,6 @@ +#logView { + width: 100%; + min-height: 400px; + + background-color: yellow; +} diff --git a/webserver.cpp b/webserver.cpp new file mode 100644 index 0000000..45a9616 --- /dev/null +++ b/webserver.cpp @@ -0,0 +1,166 @@ +#include "webserver.h" + +#include +#include +#include + +#include "client.h" + +namespace { +QHttpServerResponse serveHtmlWithPlaceholders(const QString &filePath, const QMap &placeholders); +} // namespace + +WebServer::WebServer(const QString &identity, const QString &url, QObject *parent) : + QObject{parent}, + m_identity{identity} +{ + m_redis = redisAsyncConnect("localhost", 6379); + + if (m_redis->err) + qFatal("error with redis: %s", m_redis->errstr); + + m_redisAdapter.setContext(m_redis); + + redisAsyncCommand(m_redis, NULL, NULL, "SET %s %s", + QString{"traefik/http/services/proxyjs_%0/loadbalancer/servers/0/url"}.arg(m_identity).toUtf8().constData(), + url.toUtf8().constData()); + + m_server.route("/", [&]() { + return serveHtmlWithPlaceholders(":/lspjs/index.html", { + {"identity", identity} + }); + }); + + m_server.route("/", [&](int serial) { + return serveHtmlWithPlaceholders(":/lspjs/client.html", { + {"identity", identity}, + {"serial", QString::number(serial)} + }); + }); + + m_server.route("/script.js", []() { + return QHttpServerResponse::fromFile(":/lspjs/script.js"); + }); + + m_server.route("/style.css", []() { + return QHttpServerResponse::fromFile(":/lspjs/style.css"); + }); + + m_server.addWebSocketUpgradeVerifier(this, &WebServer::verifySocketUpgrade); + + QObject::connect(&m_server, &QHttpServer::newWebSocketConnection, + this, qOverload<>(&WebServer::newWebSocketConnection)); +} + +bool WebServer::bind(QTcpServer *server) +{ + return m_server.bind(server); +} + +WebServer::~WebServer() = default; + +void WebServer::newWebSocketConnection() +{ + while (auto socket = m_server.nextPendingWebSocketConnection()) + newWebSocketConnection(std::move(socket)); +} + +void WebServer::createTraefikRoute(const QString &serial) +{ + qDebug() << "create traefik route for" << serial << "to" << m_identity; + + redisAsyncCommand(m_redis, NULL, NULL, "SET %s %s", + QString{"traefik/http/routers/proxyjs_%0/rule"}.arg(serial).toUtf8().constData(), + QString{"Host(`proxyjs.brunner.ninja`) && Path(`/%0`)"}.arg(serial).toUtf8().constData()); + + redisAsyncCommand(m_redis, NULL, NULL, "SET %s %s", + QString{"traefik/http/routers/proxyjs_%0/entrypoints/0"}.arg(serial).toUtf8().constData(), + "websecure"); + + redisAsyncCommand(m_redis, NULL, NULL, "SET %s %s", + QString{"traefik/http/routers/proxyjs_%0/service"}.arg(serial).toUtf8().constData(), + QString{"proxyjs_%0"}.arg(m_identity).toUtf8().constData()); + + redisAsyncCommand(m_redis, NULL, NULL, "SET %s %s", + QString{"traefik/http/routers/proxyjs_%0/tls/certresolver"}.arg(serial).toUtf8().constData(), + "myresolver"); + + redisAsyncCommand(m_redis, NULL, NULL, "SET %s %s", + QString{"traefik/http/routers/proxyjs_%0/priority"}.arg(serial).toUtf8().constData(), + "15"); +} + +void WebServer::destroyTraefikRoute(const QString &serial) +{ + qDebug() << "destroy traefik route for" << serial << "to" << m_identity; + + redisAsyncCommand(m_redis, NULL, NULL, "DEL %s", + QString{"traefik/http/routers/proxyjs_%0/rule"}.arg(serial).toUtf8().constData()); + + redisAsyncCommand(m_redis, NULL, NULL, "DEL %s", + QString{"traefik/http/routers/proxyjs_%0/entrypoints/0"}.arg(serial).toUtf8().constData()); + + redisAsyncCommand(m_redis, NULL, NULL, "DEL %s", + QString{"traefik/http/routers/proxyjs_%0/service"}.arg(serial).toUtf8().constData()); + + redisAsyncCommand(m_redis, NULL, NULL, "DEL %s", + QString{"traefik/http/routers/proxyjs_%0/tls/certresolver"}.arg(serial).toUtf8().constData()); + + redisAsyncCommand(m_redis, NULL, NULL, "DEL %s", + QString{"traefik/http/routers/proxyjs_%0/priority"}.arg(serial).toUtf8().constData()); +} + +QHttpServerWebSocketUpgradeResponse WebServer::verifySocketUpgrade(const QHttpServerRequest &request) +{ + auto path = request.url().path(); + while (path.startsWith('/')) + path.removeFirst(); + bool ok; + path.toInt(&ok); + if (!ok) + return QHttpServerWebSocketUpgradeResponse::passToNext(); + + return QHttpServerWebSocketUpgradeResponse::accept(); +} + +void WebServer::newWebSocketConnection(std::unique_ptr &&socket) +{ + auto serial = socket->requestUrl().path(); + if (serial.startsWith('/')) + serial.removeFirst(); + + auto &set = m_clients[serial]; + if (set.empty()) + createTraefikRoute(serial); + + set.emplace(std::make_unique(*this, std::move(socket), serial, set)); +} + +namespace { +QHttpServerResponse serveHtmlWithPlaceholders(const QString &filePath, const QMap &placeholders) +{ + QString content; + + { + QFile file{filePath}; + if (!file.open(QIODevice::ReadOnly | QIODevice::Text)) + { + qWarning("could not open file %s because %s", qPrintable(filePath), qPrintable(file.errorString())); + return QHttpServerResponse{"Failed to open file", QHttpServerResponder::StatusCode::InternalServerError}; + } + + QTextStream in(&file); + content = in.readAll(); + } + + QString modified = content; + for (auto it = placeholders.begin(); it != placeholders.end(); ++it) + modified.replace("{{" + it.key() + "}}", it.value()); + + QMimeDatabase db; + QMimeType mime = db.mimeTypeForFile(filePath); + + QHttpServerResponse response(mime.name().toUtf8(), modified.toUtf8()); + return response; +} +} // namespace diff --git a/webserver.h b/webserver.h new file mode 100644 index 0000000..f264eec --- /dev/null +++ b/webserver.h @@ -0,0 +1,45 @@ +#pragma once + +#include +#include + +#include +#include +#include + +#include +#include "redisqtadapter.h" + +class QWebSocket; +class Client; + +class WebServer : public QObject +{ + Q_OBJECT + friend class Client; + +public: + explicit WebServer(const QString &identity, const QString &url, QObject *parent = nullptr); + ~WebServer() override; + + bool bind(QTcpServer *server); + +private slots: + void newWebSocketConnection(); + +private: + void createTraefikRoute(const QString &serial); + void destroyTraefikRoute(const QString &serial); + + QHttpServerWebSocketUpgradeResponse verifySocketUpgrade(const QHttpServerRequest &request); + void newWebSocketConnection(std::unique_ptr &&socket); + + QHttpServer m_server; + + QString m_identity; + + std::unordered_map>> m_clients; + + redisAsyncContext *m_redis; + RedisQtAdapter m_redisAdapter; +};