Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/base/Headers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,31 @@ inline bool waitOnSocketData(int fd) {
return FD_ISSET(fd, &fdset);
}

/**
* Wait on a fd to be writable (non-blocking check).
*
* @return true if the fd is writable, or false if it would block.
*/
inline bool waitOnSocketWritable(int fd, int64_t sec = 0, int64_t usec = 0) {
fd_set fdset;
FD_ZERO(&fdset);
FD_SET(fd, &fdset);
timeval tv;
tv.tv_sec = sec;
tv.tv_usec = usec;
VLOG(4) << "Before selecting sockFd for write";
const int selectResult = select(fd + 1, NULL, &fdset, NULL, &tv);
if (selectResult < 0) {
if (errno == EINTR) {
// Interrupted by the signal, the caller will retry.
return false;
} else {
FATAL_FAIL(selectResult);
}
}
return FD_ISSET(fd, &fdset);
}

inline string genRandomAlphaNum(int len) {
static const char alphanum[] =
"0123456789"
Expand Down
104 changes: 104 additions & 0 deletions src/base/WriteBuffer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#ifndef __ET_WRITE_BUFFER__
#define __ET_WRITE_BUFFER__

#include "Headers.hpp"

namespace et {
/**
* @brief Bounded buffer for pending write data, enabling flow control.
*
* This buffer is used to queue outgoing data when the socket is not ready
* to accept writes. By limiting the buffer size, we create natural
* backpressure that propagates upstream when the consumer is slow.
*/
class WriteBuffer {
public:
/** @brief Maximum bytes to buffer before applying backpressure. */
static constexpr size_t MAX_BUFFER_SIZE = 256 * 1024; // 256KB

WriteBuffer() : totalBytes(0), writeOffset(0) {}

/**
* @brief Returns true if the buffer has room for more data.
* When false, the caller should stop reading from the source.
*/
bool canAcceptMore() const { return totalBytes < MAX_BUFFER_SIZE; }

/**
* @brief Returns true if there is data waiting to be written.
*/
bool hasPendingData() const { return !pending.empty(); }

/**
* @brief Returns the current amount of buffered data in bytes.
*/
size_t size() const { return totalBytes; }

/**
* @brief Adds data to the end of the buffer.
* @param data The data to enqueue.
*/
void enqueue(const string &data) {
if (data.empty()) return;
pending.push_back(data);
totalBytes += data.size();
}

/**
* @brief Returns a pointer to the next bytes to write and the count.
* @param count Output: number of bytes available for writing.
* @return Pointer to the data, or nullptr if buffer is empty.
*/
const char *peekData(size_t *count) const {
if (pending.empty()) {
*count = 0;
return nullptr;
}
const string &front = pending.front();
*count = front.size() - writeOffset;
return front.data() + writeOffset;
}

/**
* @brief Removes bytesWritten from the front of the buffer.
* @param bytesWritten Number of bytes successfully written to socket.
*/
void consume(size_t bytesWritten) {
if (bytesWritten == 0) return;

while (bytesWritten > 0 && !pending.empty()) {
string &front = pending.front();
size_t available = front.size() - writeOffset;

if (bytesWritten >= available) {
// Consumed the entire front chunk
bytesWritten -= available;
totalBytes -= available;
writeOffset = 0;
pending.pop_front();
} else {
// Partial consumption
writeOffset += bytesWritten;
totalBytes -= bytesWritten;
bytesWritten = 0;
}
}
}

/**
* @brief Clears all pending data.
*/
void clear() {
pending.clear();
totalBytes = 0;
writeOffset = 0;
}

private:
std::deque<string> pending;
size_t totalBytes;
size_t writeOffset; // Offset into the front chunk for partial writes
};
} // namespace et

#endif // __ET_WRITE_BUFFER__
35 changes: 31 additions & 4 deletions src/terminal/TerminalClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "TelemetryService.hpp"
#include "TunnelUtils.hpp"
#include "WriteBuffer.hpp"

namespace et {

Expand Down Expand Up @@ -173,6 +174,10 @@ void TerminalClient::run(const string& command, const bool noexit) {

TerminalInfo lastTerminalInfo;

// Flow control: buffer for pending console output
// This creates backpressure when the console is slow to consume data
WriteBuffer consoleOutputBuffer;

if (!console.get()) {
// NOTE: ../../scripts/ssh-et relies on the wording of this message, so if
// you change it please update it as well.
Expand Down Expand Up @@ -201,15 +206,36 @@ void TerminalClient::run(const string& command, const bool noexit) {
}
int clientFd = connection->getSocketFd();
if (clientFd > 0) {
FD_SET(clientFd, &rfd);
maxfd = max(maxfd, clientFd);
// Only read from server if console output buffer has room
// This creates backpressure when the console is slow
if (consoleOutputBuffer.canAcceptMore()) {
FD_SET(clientFd, &rfd);
maxfd = max(maxfd, clientFd);
}
}
// TODO: set port forward sockets as well for performance reasons.
tv.tv_sec = 0;
tv.tv_usec = 10000;
select(maxfd + 1, &rfd, NULL, NULL, &tv);

try {
// First, drain the console output buffer
// This should be done before reading more data
if (console && consoleOutputBuffer.hasPendingData()) {
// Drain as much as possible from the buffer
while (consoleOutputBuffer.hasPendingData()) {
size_t count;
const char* data = consoleOutputBuffer.peekData(&count);
if (data == nullptr || count == 0) break;

// Write to console (may block if console is slow)
string s(data, count);
VLOG(2) << "Draining buffered bytes to console: " << count;
console->write(s);
consoleOutputBuffer.consume(count);
}
}

if (console) {
// Check for data to send.
if (FD_ISSET(consoleFd, &rfd)) {
Expand Down Expand Up @@ -286,15 +312,16 @@ void TerminalClient::run(const string& command, const bool noexit) {
case et::TerminalPacketType::TERMINAL_BUFFER: {
if (console) {
VLOG(3) << "Got terminal buffer";
// Read from the server and write to our fake terminal
// Read from the server and buffer for later write to console
et::TerminalBuffer tb =
stringToProto<et::TerminalBuffer>(packet.getPayload());
const string& s = tb.buffer();
// VLOG(5) << "Got message: " << s;
// VLOG(1) << "Got byte: " << int(b) << " " << char(b) << " " <<
// connection->getReader()->getSequenceNumber();
keepaliveTime = time(NULL) + keepaliveDuration;
console->write(s);
// Buffer data for flow-controlled sending to console
consoleOutputBuffer.enqueue(s);
}
break;
}
Expand Down
111 changes: 92 additions & 19 deletions src/terminal/TerminalServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "TerminalServer.hpp"

#include "TelemetryService.hpp"
#include "WriteBuffer.hpp"

#define BUF_SIZE (16 * 1024)

Expand Down Expand Up @@ -124,6 +125,11 @@ void TerminalServer::runJumpHost(
terminalFd,
Packet(TerminalPacketType::JUMPHOST_INIT, protoToString(payload)));

// Flow control: buffer for pending jumphost output to client
std::deque<Packet> pendingPackets;
size_t pendingBytes = 0;
const size_t MAX_PENDING_BYTES = 256 * 1024; // 256KB limit

while (true) {
{
lock_guard<std::mutex> guard(terminalThreadMutex);
Expand All @@ -132,27 +138,55 @@ void TerminalServer::runJumpHost(
}
}

fd_set rfd;
fd_set rfd, wfd;
timeval tv;

FD_ZERO(&rfd);
FD_SET(terminalFd, &rfd);
FD_ZERO(&wfd);

// Only read from terminal if we have room in the buffer
if (pendingBytes < MAX_PENDING_BYTES) {
FD_SET(terminalFd, &rfd);
}

int maxfd = terminalFd;
int serverClientFd = serverClientState->getSocketFd();
if (serverClientFd > 0) {
FD_SET(serverClientFd, &rfd);
maxfd = max(maxfd, serverClientFd);

// Monitor write availability if we have pending packets
if (!pendingPackets.empty()) {
FD_SET(serverClientFd, &wfd);
}
}
tv.tv_sec = 0;
tv.tv_usec = 100000;
select(maxfd + 1, &rfd, NULL, NULL, &tv);
select(maxfd + 1, &rfd, &wfd, NULL, &tv);

try {
if (FD_ISSET(terminalFd, &rfd)) {
// First, drain pending packets when socket is writable
if (serverClientFd > 0 && FD_ISSET(serverClientFd, &wfd) &&
!pendingPackets.empty()) {
while (!pendingPackets.empty()) {
serverClientState->writePacket(pendingPackets.front());
pendingBytes -= pendingPackets.front().length();
pendingPackets.pop_front();

// Check if socket is still writable for more writes
if (!waitOnSocketWritable(serverClientFd)) {
break;
}
}
}

// Read from terminal if buffer has room
if (FD_ISSET(terminalFd, &rfd) && pendingBytes < MAX_PENDING_BYTES) {
try {
Packet packet;
if (terminalSocketHandler->readPacket(terminalFd, &packet)) {
serverClientState->writePacket(packet);
pendingPackets.push_back(packet);
pendingBytes += packet.length();
}
} catch (const std::runtime_error &ex) {
LOG(INFO) << "Terminal session ended" << ex.what();
Expand Down Expand Up @@ -266,6 +300,10 @@ void TerminalServer::runTerminal(
terminalFd,
Packet(TerminalPacketType::TERMINAL_INIT, protoToString(termInit)));

// Flow control: buffer for pending terminal output to client
// This creates backpressure when the client is slow to consume data
WriteBuffer terminalOutputBuffer;

while (run) {
{
lock_guard<std::mutex> guard(terminalThreadMutex);
Expand All @@ -276,37 +314,72 @@ void TerminalServer::runTerminal(

// Data structures needed for select() and
// non-blocking I/O.
fd_set rfd;
fd_set rfd, wfd;
timeval tv;

FD_ZERO(&rfd);
FD_SET(terminalFd, &rfd);
FD_ZERO(&wfd);

// Only read from terminal if we have room in the output buffer
// This is key for backpressure: if client is slow, we stop reading
if (terminalOutputBuffer.canAcceptMore()) {
FD_SET(terminalFd, &rfd);
}

int maxfd = terminalFd;
int serverClientFd = serverClientState->getSocketFd();
if (serverClientFd > 0) {
FD_SET(serverClientFd, &rfd);
maxfd = max(maxfd, serverClientFd);

// Monitor write availability if we have pending data
if (terminalOutputBuffer.hasPendingData()) {
FD_SET(serverClientFd, &wfd);
}
}
tv.tv_sec = 0;
tv.tv_usec = 100000;
select(maxfd + 1, &rfd, NULL, NULL, &tv);
select(maxfd + 1, &rfd, &wfd, NULL, &tv);

try {
// Check for data to receive; the received
// data includes also the data previously sent
// on the same master descriptor (line 90).
if (FD_ISSET(terminalFd, &rfd)) {
// Read from terminal and write to client
memset(b, 0, BUF_SIZE);
int rc = read(terminalFd, b, BUF_SIZE);
if (rc > 0) {
VLOG(2) << "Sending bytes from terminal: " << rc << " "
<< serverClientState->getWriter()->getSequenceNumber();
string s(b, rc);
// First, try to drain the output buffer when socket is writable
// This should be done before reading more data
if (serverClientFd > 0 && FD_ISSET(serverClientFd, &wfd) &&
terminalOutputBuffer.hasPendingData()) {
// Drain as much as possible from the buffer
while (terminalOutputBuffer.hasPendingData()) {
size_t count;
const char *data = terminalOutputBuffer.peekData(&count);
if (data == nullptr || count == 0) break;

// Create a TerminalBuffer packet and send it
string s(data, count);
et::TerminalBuffer tb;
tb.set_buffer(s);
VLOG(2) << "Draining buffered bytes to client: " << count << " "
<< serverClientState->getWriter()->getSequenceNumber();
serverClientState->writePacket(
Packet(TerminalPacketType::TERMINAL_BUFFER, protoToString(tb)));
terminalOutputBuffer.consume(count);

// Check if socket is still writable for more writes
if (!waitOnSocketWritable(serverClientFd)) {
break; // Socket would block, stop draining
}
}
}

// Check for data to receive from terminal
// Only if we have room in the buffer (backpressure)
if (FD_ISSET(terminalFd, &rfd) && terminalOutputBuffer.canAcceptMore()) {
// Read from terminal and buffer for later write to client
memset(b, 0, BUF_SIZE);
int rc = read(terminalFd, b, BUF_SIZE);
if (rc > 0) {
VLOG(2) << "Read bytes from terminal: " << rc
<< " buffer size: " << terminalOutputBuffer.size();
// Buffer the data for flow-controlled sending
terminalOutputBuffer.enqueue(string(b, rc));
} else if (rc == 0) {
LOG(INFO) << "Terminal session ended";
run = false;
Expand Down
Loading
Loading