diff --git a/core/src/main/java/io/questdb/client/Sender.java b/core/src/main/java/io/questdb/client/Sender.java
index 1175db7f..cf320640 100644
--- a/core/src/main/java/io/questdb/client/Sender.java
+++ b/core/src/main/java/io/questdb/client/Sender.java
@@ -619,6 +619,7 @@ public int getTimeout() {
private PrivateKey privateKey;
private int protocol = PARAMETER_NOT_SET_EXPLICITLY;
private int protocolVersion = PARAMETER_NOT_SET_EXPLICITLY;
+ private boolean requestDurableAck;
private int retryTimeoutMillis = PARAMETER_NOT_SET_EXPLICITLY;
private boolean shouldDestroyPrivKey;
private boolean tlsEnabled;
@@ -932,7 +933,8 @@ public Sender build() {
actualAutoFlushIntervalNanos,
actualInFlightWindowSize,
wsAuthHeader,
- actualMaxSchemasPerConnection
+ actualMaxSchemasPerConnection,
+ requestDurableAck
);
}
@@ -1483,6 +1485,27 @@ public LineSenderBuilder protocolVersion(int protocolVersion) {
return this;
}
+ /**
+ * Opts the connection in for STATUS_DURABLE_ACK frames. When enabled,
+ * servers with primary replication will emit per-table durable-upload
+ * watermarks as WAL data reaches the object store.
+ *
+ * This setting is only supported for WebSocket transport.
+ *
+ * Observe durable progress via
+ * {@link QwpWebSocketSender#getHighestDurableSeqTxn(CharSequence)}.
+ *
+ * @param enabled true to request durable ACKs
+ * @return this instance for method chaining
+ */
+ public LineSenderBuilder requestDurableAck(boolean enabled) {
+ if (protocol != PARAMETER_NOT_SET_EXPLICITLY && protocol != PROTOCOL_WEBSOCKET) {
+ throw new LineSenderException("request_durable_ack is only supported for WebSocket transport");
+ }
+ this.requestDurableAck = enabled;
+ return this;
+ }
+
/**
* Configures the maximum time the Sender will spend retrying upon receiving a recoverable error from the server.
*
@@ -1875,6 +1898,18 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) {
pos = getValue(configurationString, pos, sink, "in_flight_window");
int windowSize = parseIntValue(sink, "in_flight_window");
inFlightWindowSize(windowSize);
+ } else if (Chars.equals("request_durable_ack", sink)) {
+ if (protocol != PROTOCOL_WEBSOCKET) {
+ throw new LineSenderException("request_durable_ack is only supported for WebSocket transport");
+ }
+ pos = getValue(configurationString, pos, sink, "request_durable_ack");
+ if (Chars.equalsIgnoreCase("on", sink)) {
+ requestDurableAck(true);
+ } else if (Chars.equalsIgnoreCase("off", sink)) {
+ requestDurableAck(false);
+ } else {
+ throw new LineSenderException("invalid request_durable_ack [value=").put(sink).put(", allowed-values=[on, off]]");
+ }
} else if (Chars.equals("max_schemas_per_connection", sink)) {
if (protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException("max_schemas_per_connection is only supported for WebSocket transport");
@@ -1972,6 +2007,9 @@ private void validateParameters() {
.put(", requestedCapacity=").put(bufferCapacity)
.put("]");
}
+ if (requestDurableAck && protocol != PROTOCOL_WEBSOCKET) {
+ throw new LineSenderException("request_durable_ack is only supported for WebSocket transport");
+ }
if (protocol == PROTOCOL_HTTP) {
if (httpClientConfiguration.getMaximumRequestBufferSize() < httpClientConfiguration.getInitialRequestBufferSize()) {
throw new LineSenderException("maximum buffer capacity cannot be less than initial buffer capacity ")
diff --git a/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketClient.java b/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketClient.java
index fc93a1e5..986a34dd 100644
--- a/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketClient.java
+++ b/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketClient.java
@@ -107,6 +107,8 @@ public abstract class WebSocketClient implements QuietCloseable {
// QWP version negotiation
private String qwpClientId;
private int qwpMaxVersion = 1;
+ // Opt-in for STATUS_DURABLE_ACK frames; sent as X-QWP-Request-Durable-Ack: true
+ private boolean qwpRequestDurableAck;
// Receive buffer (native memory)
private long recvBufPtr;
private int recvBufSize;
@@ -390,6 +392,16 @@ public void setQwpMaxVersion(int maxVersion) {
this.qwpMaxVersion = maxVersion;
}
+ /**
+ * Enables the opt-in X-QWP-Request-Durable-Ack upgrade header. When set,
+ * servers with primary replication configured will additionally emit
+ * STATUS_DURABLE_ACK frames as the WAL containing committed client
+ * messages reaches the object store.
+ */
+ public void setQwpRequestDurableAck(boolean enabled) {
+ this.qwpRequestDurableAck = enabled;
+ }
+
/**
* Non-blocking attempt to receive a WebSocket frame.
* Returns immediately if no complete frame is available.
@@ -476,6 +488,9 @@ public void upgrade(CharSequence path, int timeout, CharSequence authorizationHe
sendBuffer.putAscii(qwpClientId);
sendBuffer.putAscii("\r\n");
}
+ if (qwpRequestDurableAck) {
+ sendBuffer.putAscii("X-QWP-Request-Durable-Ack: true\r\n");
+ }
if (authorizationHeader != null) {
sendBuffer.putAscii("Authorization: ");
sendBuffer.putAscii(authorizationHeader);
diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/InFlightWindow.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/InFlightWindow.java
index 4a9b0868..1d8e5c46 100644
--- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/InFlightWindow.java
+++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/InFlightWindow.java
@@ -377,6 +377,14 @@ public Throwable getLastError() {
return lastError.get();
}
+ /**
+ * Returns the highest batch sequence acknowledged by the server, or -1 if
+ * no acknowledgment has been received yet.
+ */
+ public long getHighestAckedSequence() {
+ return highestAcked;
+ }
+
/**
* Returns the maximum window size.
*/
@@ -384,6 +392,13 @@ public int getMaxWindowSize() {
return maxWindowSize;
}
+ /**
+ * Returns the timeout (ms) applied to blocking window operations.
+ */
+ public long getTimeoutMs() {
+ return timeoutMs;
+ }
+
/**
* Returns the total number of batches acknowledged.
*/
diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java
index 86885e6e..93189006 100644
--- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java
+++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java
@@ -35,6 +35,7 @@
import io.questdb.client.cutlass.line.array.LongArray;
import io.questdb.client.cutlass.qwp.protocol.QwpConstants;
import io.questdb.client.cutlass.qwp.protocol.QwpTableBuffer;
+import io.questdb.client.std.CharSequenceLongHashMap;
import io.questdb.client.std.CharSequenceObjHashMap;
import io.questdb.client.std.Chars;
import io.questdb.client.std.Decimal128;
@@ -151,7 +152,7 @@ public class QwpWebSocketSender implements Sender {
// Flow control
private InFlightWindow inFlightWindow;
private int maxSentSchemaId = -1;
- // Track highest symbol ID sent to server (for delta encoding)
+ // Track the highest symbol ID sent to server (for delta encoding)
// Once sent over TCP, server is guaranteed to receive it (or connection dies)
private int maxSentSymbolId = -1;
// Batch sequence counter (must match server's messageSequence)
@@ -160,7 +161,11 @@ public class QwpWebSocketSender implements Sender {
// Async mode: pending row tracking
private long pendingBytes;
private int pendingRowCount;
+ private final CharSequenceLongHashMap syncCommittedSeqTxns = new CharSequenceLongHashMap();
+ private final CharSequenceLongHashMap syncDurableSeqTxns = new CharSequenceLongHashMap();
+ private boolean requestDurableAck;
private boolean sawBinaryAck;
+ private boolean sawPong;
private WebSocketSendQueue sendQueue;
private QwpWebSocketSender(
@@ -303,6 +308,33 @@ public static QwpWebSocketSender connect(
return sender;
}
+ public static QwpWebSocketSender connect(
+ String host,
+ int port,
+ ClientTlsConfiguration tlsConfig,
+ int autoFlushRows,
+ int autoFlushBytes,
+ long autoFlushIntervalNanos,
+ int inFlightWindowSize,
+ String authorizationHeader,
+ int maxSchemasPerConnection,
+ boolean requestDurableAck
+ ) {
+ QwpWebSocketSender sender = new QwpWebSocketSender(
+ host, port, tlsConfig,
+ autoFlushRows, autoFlushBytes, autoFlushIntervalNanos,
+ inFlightWindowSize, authorizationHeader, maxSchemasPerConnection
+ );
+ try {
+ sender.setRequestDurableAck(requestDurableAck);
+ sender.ensureConnected();
+ } catch (Throwable t) {
+ sender.close();
+ throw t;
+ }
+ return sender;
+ }
+
/**
* Creates a sender without connecting. For testing only.
*
@@ -315,10 +347,14 @@ public static QwpWebSocketSender connect(
* @return unconnected sender
*/
public static QwpWebSocketSender createForTesting(String host, int port, int inFlightWindowSize) {
+ return createForTesting(host, port, inFlightWindowSize, null);
+ }
+
+ public static QwpWebSocketSender createForTesting(String host, int port, int inFlightWindowSize, String authorizationHeader) {
return new QwpWebSocketSender(
host, port, null,
DEFAULT_AUTO_FLUSH_ROWS, DEFAULT_AUTO_FLUSH_BYTES, DEFAULT_AUTO_FLUSH_INTERVAL_NANOS,
- inFlightWindowSize, null, DEFAULT_MAX_SCHEMAS_PER_CONNECTION
+ inFlightWindowSize, authorizationHeader, DEFAULT_MAX_SCHEMAS_PER_CONNECTION
);
}
@@ -817,6 +853,31 @@ public int getAutoFlushRows() {
return autoFlushRows;
}
+ /**
+ * Returns the highest seqTxn committed (written to WAL) for the given
+ * table, or -1 if no commit has been acknowledged for that table yet.
+ */
+ public long getHighestAckedSeqTxn(CharSequence tableName) {
+ if (sendQueue != null) {
+ return sendQueue.getCommittedSeqTxn(tableName);
+ }
+ return syncCommittedSeqTxns.get(tableName);
+ }
+
+ /**
+ * Returns the highest seqTxn durably uploaded to object store for the
+ * given table, or -1 if no durable ACK has been observed for that table.
+ * Only meaningful when the connection was opened with
+ * {@link #setRequestDurableAck(boolean)} = true on a server where primary
+ * replication is enabled.
+ */
+ public long getHighestDurableSeqTxn(CharSequence tableName) {
+ if (sendQueue != null) {
+ return sendQueue.getDurableSeqTxn(tableName);
+ }
+ return syncDurableSeqTxns.get(tableName);
+ }
+
/**
* Returns the max symbol ID sent to the server.
* Once sent over TCP, server is guaranteed to receive it (or connection dies).
@@ -998,6 +1059,31 @@ public QwpWebSocketSender longColumn(CharSequence columnName, long value) {
return this;
}
+ /**
+ * Sends a WebSocket PING and blocks until the PONG arrives, processing
+ * any STATUS_DURABLE_ACK or STATUS_OK frames along the way.
+ *
+ * The server flushes pending durable ACKs before sending the PONG, so
+ * after this method returns, {@link #getHighestDurableSeqTxn(CharSequence)}
+ * reflects all durable progress up to the moment the server processed
+ * the PING.
+ *
+ * In async mode the PING is sent by the I/O thread; the I/O loop
+ * continues its normal work (sending batches, draining ACKs) while
+ * waiting for the PONG.
+ *
+ * @throws LineSenderException if the connection is closed or the ping times out
+ */
+ public void ping() {
+ checkNotClosed();
+ ensureConnected();
+ if (inFlightWindowSize > 1) {
+ sendQueue.ping();
+ } else {
+ syncPing();
+ }
+ }
+
@Override
public void reset() {
checkNotClosed();
@@ -1026,6 +1112,27 @@ public void setGorillaEnabled(boolean enabled) {
this.encoder.setGorillaEnabled(enabled);
}
+ /**
+ * Opts the connection in for STATUS_DURABLE_ACK frames. Must be called
+ * before any send operation — the flag is consulted once, during WebSocket
+ * upgrade. Setting this true on a server without primary replication
+ * enabled is a no-op: the server silently ignores the header.
+ *
+ * Observe durable progress via {@link #getHighestDurableSeqTxn(CharSequence)}.
+ *
+ * @throws LineSenderException if the connection is already established or closed
+ */
+ public void setRequestDurableAck(boolean enabled) {
+ if (closed) {
+ throw new LineSenderException("Sender is closed");
+ }
+ if (connected) {
+ throw new LineSenderException(
+ "setRequestDurableAck must be called before the first send");
+ }
+ this.requestDurableAck = enabled;
+ }
+
/**
* Adds a SHORT column value to the current row.
*
@@ -1263,6 +1370,7 @@ private void ensureConnected() {
try {
client.setQwpMaxVersion(QwpConstants.MAX_SUPPORTED_VERSION);
client.setQwpClientId(QwpConstants.CLIENT_ID);
+ client.setQwpRequestDurableAck(requestDurableAck);
client.connect(host, port);
client.upgrade(WRITE_PATH, authorizationHeader);
} catch (Exception e) {
@@ -1695,6 +1803,58 @@ private boolean shouldAutoFlush() {
return false;
}
+ private void syncPing() {
+ client.sendPing(1000);
+ long deadline = System.currentTimeMillis() + InFlightWindow.DEFAULT_TIMEOUT_MS;
+ LineSenderException pingError = null;
+ while (System.currentTimeMillis() < deadline) {
+ sawPong = false;
+ sawBinaryAck = false;
+ boolean received = client.receiveFrame(ackHandler, 1000);
+ if (received) {
+ if (sawBinaryAck) {
+ if (ackResponse.isDurableAck()) {
+ updateSyncSeqTxns(syncDurableSeqTxns);
+ } else if (ackResponse.isSuccess()) {
+ inFlightWindow.acknowledgeUpTo(ackResponse.getSequence());
+ updateSyncSeqTxns(syncCommittedSeqTxns);
+ } else {
+ // Server-side error on a pending batch (parse /
+ // schema / security / internal / write error).
+ // Route through inFlightWindow.fail so subsequent
+ // waitForAck / flush calls also see it, capture the
+ // first error and throw it after PONG so the caller
+ // of ping() can react. We finish draining the round
+ // before throwing so durable/committed progress
+ // observed in this ping is preserved.
+ LineSenderException err = new LineSenderException(ackResponse.getErrorMessage());
+ inFlightWindow.fail(ackResponse.getSequence(), err);
+ if (pingError == null) {
+ pingError = err;
+ }
+ }
+ }
+ if (sawPong) {
+ if (pingError != null) {
+ throw pingError;
+ }
+ return;
+ }
+ }
+ }
+ throw new LineSenderException("Ping timed out");
+ }
+
+ private void updateSyncSeqTxns(CharSequenceLongHashMap seqTxns) {
+ for (int i = 0, n = ackResponse.getTableEntryCount(); i < n; i++) {
+ String name = ackResponse.getTableName(i);
+ long seqTxn = ackResponse.getTableSeqTxn(i);
+ if (seqTxn > seqTxns.get(name)) {
+ seqTxns.put(name, seqTxn);
+ }
+ }
+ }
+
private long toMicros(long value, ChronoUnit unit) {
switch (unit) {
case NANOS:
@@ -1740,19 +1900,20 @@ private void waitForAck(long expectedSequence) {
boolean received = client.receiveFrame(ackHandler, 1000); // 1 second timeout per read attempt
if (received) {
- // Non-binary frames (e.g. ping/pong/text) are not ACKs.
if (!sawBinaryAck) {
continue;
}
- long sequence = ackResponse.getSequence();
if (ackResponse.isSuccess()) {
- // Cumulative ACK - acknowledge all batches up to this sequence
+ long sequence = ackResponse.getSequence();
inFlightWindow.acknowledgeUpTo(sequence);
+ updateSyncSeqTxns(syncCommittedSeqTxns);
if (sequence >= expectedSequence) {
- return; // Our batch was acknowledged (cumulative)
+ return;
}
- // Got ACK for lower sequence - continue waiting
+ } else if (ackResponse.isDurableAck()) {
+ updateSyncSeqTxns(syncDurableSeqTxns);
} else {
+ long sequence = ackResponse.getSequence();
String errorMessage = ackResponse.getErrorMessage();
LineSenderException error = new LineSenderException(
"Server error for batch " + sequence + ": " +
@@ -1786,19 +1947,22 @@ private static class AckFrameHandler implements WebSocketFrameHandler {
@Override
public void onBinaryMessage(long payloadPtr, int payloadLen) {
sender.sawBinaryAck = true;
- if (!WebSocketResponse.isStructurallyValid(payloadPtr, payloadLen)) {
+ // readFrom validates inline; a single pass parses and bounds-checks.
+ if (!sender.ackResponse.readFrom(payloadPtr, payloadLen)) {
throw new LineSenderException(
"Invalid ACK response payload [length=" + payloadLen + ']'
);
}
- if (!sender.ackResponse.readFrom(payloadPtr, payloadLen)) {
- throw new LineSenderException("Failed to parse ACK response");
- }
}
@Override
public void onClose(int code, String reason) {
throw new LineSenderException("WebSocket closed while waiting for ACK [code=" + code + ", reason=" + reason + ']');
}
+
+ @Override
+ public void onPong(long payloadPtr, int payloadLen) {
+ sender.sawPong = true;
+ }
}
}
diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketResponse.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketResponse.java
index 9ed6b9b9..1c892690 100644
--- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketResponse.java
+++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketResponse.java
@@ -24,41 +24,56 @@
package io.questdb.client.cutlass.qwp.client;
+import io.questdb.client.std.LongList;
+import io.questdb.client.std.ObjList;
import io.questdb.client.std.Unsafe;
+import io.questdb.client.std.Utf8SequenceObjHashMap;
+import io.questdb.client.std.str.DirectUtf8String;
import io.questdb.client.std.str.Utf8s;
+import org.jetbrains.annotations.TestOnly;
import java.nio.charset.StandardCharsets;
/**
* Binary response format for WebSocket QWP v1 protocol.
*
- * Response format (little-endian):
+ * STATUS_OK response format (little-endian):
+ *
+ * +--------+----------+------------+--------------------------------------+
+ * | status | sequence | tableCount | table entries |
+ * | 1 byte | 8 bytes | 2 bytes | [nameLen(2)+name(N)+seqTxn(8)] * cnt |
+ * +--------+----------+------------+--------------------------------------+
+ *
+ *
+ * STATUS_DURABLE_ACK response format:
+ *
+ * +--------+------------+--------------------------------------+
+ * | status | tableCount | table entries |
+ * | 1 byte | 2 bytes | [nameLen(2)+name(N)+seqTxn(8)] * cnt |
+ * +--------+------------+--------------------------------------+
+ *
+ *
+ * Error response format:
*
* +--------+----------+------------------+
- * | status | sequence | error (if any) |
+ * | status | sequence | error |
* | 1 byte | 8 bytes | 2 bytes + UTF-8 |
* +--------+----------+------------------+
*
- *
- * Status codes:
- *
- * - 0x00: Success (ACK)
- * - 0x05: Parse error
- * - 0x03: Schema mismatch
- * - 0x09: Write error
- * - 0x08: Security error
- * - 0x06: Internal error
- *
- *
- * The sequence number allows correlation with the original request.
- * Error message is only present when status != 0.
*/
public class WebSocketResponse {
public static final int MAX_ERROR_MESSAGE_LENGTH = 1024;
+ public static final int MIN_DURABLE_ACK_SIZE = 3; // status + tableCount
public static final int MIN_ERROR_RESPONSE_SIZE = 11; // status + sequence + error length
- // Minimum response size: status (1) + sequence (8)
- public static final int MIN_RESPONSE_SIZE = 9;
+ public static final int MIN_OK_RESPONSE_SIZE = 11; // status + sequence + tableCount
+ /**
+ * Per-table durable-upload acknowledgment. Emitted by servers where
+ * primary replication is enabled and the connection opted in via
+ * X-QWP-Request-Durable-Ack. Payload: status + tableCount + per-table
+ * entries (nameLen + name + seqTxn).
+ */
+ public static final byte STATUS_DURABLE_ACK = 0x02;
public static final byte STATUS_INTERNAL_ERROR = 0x06;
// Status codes (must match QWP_SPECIFICATION.md)
public static final byte STATUS_OK = 0x00;
@@ -66,6 +81,14 @@ public class WebSocketResponse {
public static final byte STATUS_SCHEMA_MISMATCH = 0x03;
public static final byte STATUS_SECURITY_ERROR = 0x08;
public static final byte STATUS_WRITE_ERROR = 0x09;
+ private final DirectUtf8String lookupKey = new DirectUtf8String();
+ // Caches decoded table names so the same native-memory byte sequence resolves to
+ // the same interned String across frames. First sight of a name allocates a String
+ // (decoded via Utf8s.stringFromUtf8Bytes) and an owned Utf8String map key; every
+ // subsequent sight is allocation-free.
+ private final Utf8SequenceObjHashMap tableNameCache = new Utf8SequenceObjHashMap<>();
+ private final ObjList tableNames = new ObjList<>();
+ private final LongList tableSeqTxns = new LongList();
private String errorMessage;
private int errorMessageUtf8Length;
private long sequence;
@@ -78,9 +101,23 @@ public WebSocketResponse() {
this.errorMessageUtf8Length = -1;
}
+ /**
+ * Creates a durable-upload ACK response with a single table entry.
+ */
+ @TestOnly
+ public static WebSocketResponse durableAck(String tableName, long seqTxn) {
+ WebSocketResponse response = new WebSocketResponse();
+ response.status = STATUS_DURABLE_ACK;
+ response.sequence = -1;
+ response.tableNames.add(tableName);
+ response.tableSeqTxns.add(seqTxn);
+ return response;
+ }
+
/**
* Creates an error response.
*/
+ @TestOnly
public static WebSocketResponse error(long sequence, byte status, String errorMessage) {
WebSocketResponse response = new WebSocketResponse();
response.status = status;
@@ -92,38 +129,43 @@ public static WebSocketResponse error(long sequence, byte status, String errorMe
/**
* Validates binary response framing without allocating.
- *
- * Accepted formats:
- *
- * - OK: exactly 9 bytes (status + sequence)
- * - Error: exactly 11 + errorLength bytes
- *
*
* @param ptr response buffer pointer
* @param length response frame payload length
* @return true if payload structure is valid
*/
public static boolean isStructurallyValid(long ptr, int length) {
- if (length < MIN_RESPONSE_SIZE) {
+ if (length < 1) {
return false;
}
byte status = Unsafe.getUnsafe().getByte(ptr);
if (status == STATUS_OK) {
- return length == MIN_RESPONSE_SIZE;
+ if (length < MIN_OK_RESPONSE_SIZE) {
+ return false;
+ }
+ return validateTableEntries(ptr + 9, length - 9);
}
+ if (status == STATUS_DURABLE_ACK) {
+ if (length < MIN_DURABLE_ACK_SIZE) {
+ return false;
+ }
+ return validateTableEntries(ptr + 1, length - 1);
+ }
+
+ // Error response
if (length < MIN_ERROR_RESPONSE_SIZE) {
return false;
}
-
- int msgLen = Unsafe.getUnsafe().getShort(ptr + MIN_RESPONSE_SIZE) & 0xFFFF;
+ int msgLen = Unsafe.getUnsafe().getShort(ptr + 9) & 0xFFFF;
return length == MIN_ERROR_RESPONSE_SIZE + msgLen;
}
/**
* Creates a success response.
*/
+ @TestOnly
public static WebSocketResponse success(long sequence) {
WebSocketResponse response = new WebSocketResponse();
response.status = STATUS_OK;
@@ -140,12 +182,20 @@ public String getErrorMessage() {
}
/**
- * Returns the sequence number.
+ * Returns the client batch sequence number (STATUS_OK and error frames),
+ * or -1 for STATUS_DURABLE_ACK frames.
*/
public long getSequence() {
return sequence;
}
+ /**
+ * Returns the raw status byte.
+ */
+ public byte getStatus() {
+ return status;
+ }
+
/**
* Returns a human-readable status name.
*/
@@ -153,6 +203,8 @@ public String getStatusName() {
switch (status) {
case STATUS_OK:
return "OK";
+ case STATUS_DURABLE_ACK:
+ return "DURABLE_ACK";
case STATUS_PARSE_ERROR:
return "PARSE_ERROR";
case STATUS_SCHEMA_MISMATCH:
@@ -168,8 +220,27 @@ public String getStatusName() {
}
}
+ public int getTableEntryCount() {
+ return tableNames.size();
+ }
+
+ public String getTableName(int index) {
+ return tableNames.getQuick(index);
+ }
+
+ public long getTableSeqTxn(int index) {
+ return tableSeqTxns.getQuick(index);
+ }
+
+ /**
+ * Returns true when this is a per-table durable-upload ACK (STATUS_DURABLE_ACK).
+ */
+ public boolean isDurableAck() {
+ return status == STATUS_DURABLE_ACK;
+ }
+
/**
- * Returns true if this is a success response.
+ * Returns true if this is a success response (STATUS_OK).
*/
public boolean isSuccess() {
return status == STATUS_OK;
@@ -183,50 +254,70 @@ public boolean isSuccess() {
* @return true if successfully parsed, false if not enough data
*/
public boolean readFrom(long ptr, int length) {
- if (length < MIN_RESPONSE_SIZE) {
+ tableNames.clear();
+ tableSeqTxns.clear();
+
+ if (length < 1) {
return false;
}
- int offset = 0;
-
- // Status (1 byte)
- status = Unsafe.getUnsafe().getByte(ptr + offset);
- offset += 1;
-
- // Sequence (8 bytes, little-endian)
- sequence = Unsafe.getUnsafe().getLong(ptr + offset);
- offset += 8;
+ status = Unsafe.getUnsafe().getByte(ptr);
- // Error message (if status != OK and more data available)
- if (status != STATUS_OK && length >= offset + 2) {
- int msgLen = Unsafe.getUnsafe().getShort(ptr + offset) & 0xFFFF;
- offset += 2;
+ if (status == STATUS_OK) {
+ if (length < MIN_OK_RESPONSE_SIZE) {
+ return false;
+ }
+ sequence = Unsafe.getUnsafe().getLong(ptr + 1);
+ errorMessage = null;
+ errorMessageUtf8Length = -1;
+ return readTableEntries(ptr + 9, length - 9);
+ }
- if (length >= offset + msgLen && msgLen > 0) {
- byte[] msgBytes = new byte[msgLen];
- for (int i = 0; i < msgLen; i++) {
- msgBytes[i] = Unsafe.getUnsafe().getByte(ptr + offset + i);
- }
- errorMessage = new String(msgBytes, StandardCharsets.UTF_8);
- errorMessageUtf8Length = -1;
- } else {
- errorMessage = null;
- errorMessageUtf8Length = 0;
+ if (status == STATUS_DURABLE_ACK) {
+ if (length < MIN_DURABLE_ACK_SIZE) {
+ return false;
}
- } else {
+ sequence = -1;
errorMessage = null;
errorMessageUtf8Length = -1;
+ return readTableEntries(ptr + 1, length - 1);
}
+ // Error response
+ if (length < MIN_ERROR_RESPONSE_SIZE) {
+ return false;
+ }
+ sequence = Unsafe.getUnsafe().getLong(ptr + 1);
+ int offset = 9;
+ int msgLen = Unsafe.getUnsafe().getShort(ptr + offset) & 0xFFFF;
+ offset += 2;
+ if (length < offset + msgLen) {
+ return false;
+ }
+ if (msgLen > 0) {
+ byte[] msgBytes = new byte[msgLen];
+ for (int i = 0; i < msgLen; i++) {
+ msgBytes[i] = Unsafe.getUnsafe().getByte(ptr + offset + i);
+ }
+ errorMessage = new String(msgBytes, StandardCharsets.UTF_8);
+ errorMessageUtf8Length = -1;
+ } else {
+ errorMessage = null;
+ errorMessageUtf8Length = 0;
+ }
return true;
}
/**
* Calculates the serialized size of this response.
*/
+ @TestOnly
public int serializedSize() {
if (status == STATUS_OK) {
- return MIN_RESPONSE_SIZE;
+ return MIN_OK_RESPONSE_SIZE + tableEntriesSize();
+ }
+ if (status == STATUS_DURABLE_ACK) {
+ return MIN_DURABLE_ACK_SIZE + tableEntriesSize();
}
return MIN_ERROR_RESPONSE_SIZE + getErrorMessageUtf8Length();
}
@@ -234,7 +325,9 @@ public int serializedSize() {
@Override
public String toString() {
if (isSuccess()) {
- return "WebSocketResponse{status=OK, seq=" + sequence + "}";
+ return "WebSocketResponse{status=OK, seq=" + sequence + ", tables=" + tableNames.size() + "}";
+ } else if (isDurableAck()) {
+ return "WebSocketResponse{status=DURABLE_ACK, tables=" + tableNames.size() + "}";
} else {
return "WebSocketResponse{status=" + getStatusName() + ", seq=" + sequence +
", error=" + errorMessage + "}";
@@ -247,19 +340,23 @@ public String toString() {
* @param ptr destination address
* @return number of bytes written
*/
+ @TestOnly
public int writeTo(long ptr) {
int offset = 0;
- // Status (1 byte)
Unsafe.getUnsafe().putByte(ptr + offset, status);
offset += 1;
- // Sequence (8 bytes, little-endian)
- Unsafe.getUnsafe().putLong(ptr + offset, sequence);
- offset += 8;
+ if (status == STATUS_OK) {
+ Unsafe.getUnsafe().putLong(ptr + offset, sequence);
+ offset += 8;
+ offset += writeTableEntries(ptr + offset);
+ } else if (status == STATUS_DURABLE_ACK) {
+ offset += writeTableEntries(ptr + offset);
+ } else {
+ Unsafe.getUnsafe().putLong(ptr + offset, sequence);
+ offset += 8;
- // Error message length and bytes (if any)
- if (status != STATUS_OK) {
int msgLen = getErrorMessageUtf8Length();
// Length prefix (2 bytes, little-endian)
Unsafe.getUnsafe().putShort(ptr + offset, (short) msgLen);
@@ -270,12 +367,105 @@ public int writeTo(long ptr) {
offset += Utf8s.strCpyUtf8(errorMessage, ptr + offset, msgLen);
}
}
+ return offset;
+ }
+
+ // Validates inline as it parses; returns false on truncation, lying-length
+ // entries, empty table names, or trailing garbage. On false, tableNames /
+ // tableSeqTxns may hold partial state, but the caller (readFrom) clears
+ // both lists at the start of every call so partial state never leaks.
+ private boolean readTableEntries(long ptr, int remaining) {
+ if (remaining < 2) {
+ return false;
+ }
+ int tableCount = Unsafe.getUnsafe().getShort(ptr) & 0xFFFF;
+ int offset = 2;
+ for (int i = 0; i < tableCount; i++) {
+ if (remaining < offset + 2) {
+ return false;
+ }
+ int nameLen = Unsafe.getUnsafe().getShort(ptr + offset) & 0xFFFF;
+ offset += 2;
+ // Empty table names are rejected as structurally invalid - a valid
+ // table name is never zero bytes, and accepting empty names would
+ // let a misbehaving server poison the per-table tracker with "" entries.
+ if (nameLen == 0 || remaining < offset + nameLen + 8) {
+ return false;
+ }
+ long nameLo = ptr + offset;
+ long nameHi = nameLo + nameLen;
+ offset += nameLen;
+ long seqTxn = Unsafe.getUnsafe().getLong(ptr + offset);
+ offset += 8;
+ tableNames.add(internTableName(nameLo, nameHi));
+ tableSeqTxns.add(seqTxn);
+ }
+ return remaining == offset;
+ }
+
+ private String internTableName(long lo, long hi) {
+ lookupKey.of(lo, hi);
+ int keyIndex = tableNameCache.keyIndex(lookupKey);
+ if (keyIndex < 0) {
+ return tableNameCache.valueAtQuick(keyIndex);
+ }
+ String decoded = Utf8s.stringFromUtf8Bytes(lo, hi);
+ tableNameCache.putAt(keyIndex, lookupKey, decoded);
+ return decoded;
+ }
+
+ private int tableEntriesSize() {
+ int size = 0;
+ for (int i = 0, n = tableNames.size(); i < n; i++) {
+ size += 2 + tableNames.getQuick(i).getBytes(StandardCharsets.UTF_8).length + 8;
+ }
+ return size;
+ }
+
+ private static boolean validateTableEntries(long ptr, int remaining) {
+ if (remaining < 2) {
+ return false;
+ }
+ int tableCount = Unsafe.getUnsafe().getShort(ptr) & 0xFFFF;
+ int offset = 2;
+ for (int i = 0; i < tableCount; i++) {
+ if (remaining < offset + 2) {
+ return false;
+ }
+ int nameLen = Unsafe.getUnsafe().getShort(ptr + offset) & 0xFFFF;
+ offset += 2;
+ // Empty table names are rejected as structurally invalid - a valid
+ // table name is never zero bytes, and accepting empty names would
+ // let a misbehaving server poison the per-table tracker with "" entries.
+ if (nameLen == 0 || remaining < offset + nameLen + 8) {
+ return false;
+ }
+ offset += nameLen + 8;
+ }
+ return remaining == offset;
+ }
+ private int writeTableEntries(long ptr) {
+ int offset = 0;
+ int count = tableNames.size();
+ Unsafe.getUnsafe().putShort(ptr + offset, (short) count);
+ offset += 2;
+ for (int i = 0; i < count; i++) {
+ byte[] nameBytes = tableNames.getQuick(i).getBytes(StandardCharsets.UTF_8);
+ Unsafe.getUnsafe().putShort(ptr + offset, (short) nameBytes.length);
+ offset += 2;
+ for (int j = 0; j < nameBytes.length; j++) {
+ Unsafe.getUnsafe().putByte(ptr + offset + j, nameBytes[j]);
+ }
+ offset += nameBytes.length;
+ Unsafe.getUnsafe().putLong(ptr + offset, tableSeqTxns.getQuick(i));
+ offset += 8;
+ }
return offset;
}
private int getErrorMessageUtf8Length() {
- if (status == STATUS_OK || errorMessage == null || errorMessage.isEmpty()) {
+ if (status == STATUS_OK || status == STATUS_DURABLE_ACK || errorMessage == null || errorMessage.isEmpty()) {
errorMessageUtf8Length = 0;
return 0;
}
diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketSendQueue.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketSendQueue.java
index 181602b8..1ac73f81 100644
--- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketSendQueue.java
+++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketSendQueue.java
@@ -27,6 +27,7 @@
import io.questdb.client.cutlass.http.client.WebSocketClient;
import io.questdb.client.cutlass.http.client.WebSocketFrameHandler;
import io.questdb.client.cutlass.line.LineSenderException;
+import io.questdb.client.std.CharSequenceLongHashMap;
import io.questdb.client.std.QuietCloseable;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
@@ -73,6 +74,7 @@ public class WebSocketSendQueue implements QuietCloseable {
private final WebSocketClient client;
// Configuration
private final long enqueueTimeoutMs;
+ private final long pingTimeoutMs;
@Nullable
private final ConnectionFailureListener connectionFailureListener;
// Optional InFlightWindow for tracking sent batches awaiting ACK
@@ -81,6 +83,11 @@ public class WebSocketSendQueue implements QuietCloseable {
// The I/O thread for async send/receive
private final Thread ioThread;
+ // Serializes concurrent ping() callers so each one gets its own PING/PONG
+ // round-trip. Without this, two callers can race on pingComplete and the
+ // second caller can return on the first caller's PONG, observing a stale
+ // durable watermark.
+ private final Object pingLock = new Object();
// Counter for batches currently being processed by the I/O thread
// This tracks batches that have been dequeued but not yet fully sent
private final AtomicInteger processingCount = new AtomicInteger(0);
@@ -94,6 +101,10 @@ public class WebSocketSendQueue implements QuietCloseable {
// Synchronization for flush/close
private final CountDownLatch shutdownLatch;
private final long shutdownTimeoutMs;
+ // Per-table seqTxn watermarks. Written by the I/O thread only; read by user threads.
+ // All accesses synchronize on the map instance itself for publication and monotonic updates.
+ private final CharSequenceLongHashMap committedSeqTxns = new CharSequenceLongHashMap();
+ private final CharSequenceLongHashMap durableSeqTxns = new CharSequenceLongHashMap();
// Statistics - receiving
private final AtomicLong totalAcks = new AtomicLong(0);
// Statistics - sending
@@ -109,6 +120,10 @@ public class WebSocketSendQueue implements QuietCloseable {
// Single pending buffer slot (double-buffering means at most 1 item in queue)
// Zero allocation - just a volatile reference handoff
private volatile MicrobatchBuffer pendingBuffer;
+ private volatile boolean pingComplete;
+ private volatile boolean pingRequested;
+ private volatile boolean pongReceived;
+ private long pingDeadlineNanos;
// Running state
private volatile boolean running;
private volatile boolean shuttingDown;
@@ -146,6 +161,7 @@ public WebSocketSendQueue(WebSocketClient client, @Nullable InFlightWindow inFli
this.inFlightWindow = inFlightWindow;
this.enqueueTimeoutMs = enqueueTimeoutMs;
this.shutdownTimeoutMs = shutdownTimeoutMs;
+ this.pingTimeoutMs = inFlightWindow != null ? inFlightWindow.getTimeoutMs() : InFlightWindow.DEFAULT_TIMEOUT_MS;
this.connectionFailureListener = connectionFailureListener;
this.running = true;
this.shuttingDown = false;
@@ -349,6 +365,61 @@ public Throwable getLastError() {
return lastError;
}
+ public long getCommittedSeqTxn(CharSequence tableName) {
+ synchronized (committedSeqTxns) {
+ return committedSeqTxns.get(tableName);
+ }
+ }
+
+ public long getDurableSeqTxn(CharSequence tableName) {
+ synchronized (durableSeqTxns) {
+ return durableSeqTxns.get(tableName);
+ }
+ }
+
+ /**
+ * Requests the I/O thread to send a WebSocket PING and blocks until
+ * the PONG arrives. The I/O loop continues its normal work (sending
+ * batches, draining ACKs) while waiting for the PONG.
+ *
+ * The server flushes pending durable ACKs before sending the PONG,
+ * so after this method returns {@code getDurableSeqTxn()} reflects
+ * all durable progress up to the moment the server processed the PING.
+ *
+ * Concurrent ping callers are serialized: each caller gets its own
+ * PING / PONG round-trip so the post-condition holds for every caller
+ * independently. A second caller may wait up to {@code pingTimeoutMs}
+ * for an in-flight ping to complete before its own ping starts.
+ */
+ public void ping() {
+ synchronized (pingLock) {
+ checkError();
+ synchronized (processingLock) {
+ pingComplete = false;
+ pingRequested = true;
+ processingLock.notifyAll();
+ long deadline = System.nanoTime() + pingTimeoutMs * 1_000_000L;
+ while (!pingComplete && running) {
+ long remaining = (deadline - System.nanoTime()) / 1_000_000L;
+ if (remaining <= 0) {
+ throw new LineSenderException("Ping timed out");
+ }
+ try {
+ processingLock.wait(remaining);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new LineSenderException("Ping interrupted");
+ }
+ }
+ if (!pingComplete) {
+ checkError();
+ throw new LineSenderException("Ping aborted: send queue is shutting down");
+ }
+ }
+ checkError();
+ }
+ }
+
/**
* Returns the total number of batches sent.
*/
@@ -374,12 +445,12 @@ private void checkError() {
}
/**
- * Computes the current I/O state based on queue and in-flight status.
+ * Computes the current I/O state based on queue, in-flight, and ping status.
*/
private IoState computeState(boolean hasInFlight) {
if (!isPendingEmpty()) {
return IoState.ACTIVE;
- } else if (hasInFlight) {
+ } else if (hasInFlight || pingDeadlineNanos > 0) {
return IoState.DRAINING;
} else {
return IoState.IDLE;
@@ -449,6 +520,20 @@ private void ioLoop() {
try {
int drainIdleCycles = 0;
while (running || !isPendingEmpty()) {
+ // Send a pending PING if requested
+ if (pingRequested) {
+ pingRequested = false;
+ pongReceived = false;
+ pingDeadlineNanos = System.nanoTime() + pingTimeoutMs * 1_000_000L;
+ try {
+ client.sendPing(1000);
+ } catch (Exception e) {
+ pingDeadlineNanos = 0;
+ failConnection(new LineSenderException("Ping failed", e));
+ completePing();
+ }
+ }
+
MicrobatchBuffer batch = null;
boolean hasInFlight = (inFlightWindow != null && inFlightWindow.getInFlightCount() > 0);
IoState state = computeState(hasInFlight);
@@ -460,7 +545,7 @@ private void ioLoop() {
// Nothing to do - wait for work under lock
synchronized (processingLock) {
// Re-check under lock to avoid missed wakeup
- if (isPendingEmpty() && running) {
+ if (isPendingEmpty() && running && !pingRequested) {
try {
processingLock.wait(100);
} catch (InterruptedException e) {
@@ -473,10 +558,22 @@ private void ioLoop() {
case ACTIVE:
case DRAINING:
// Try to receive any pending ACKs (non-blocking)
- if (hasInFlight && client.isConnected()) {
+ if (client.isConnected()) {
receivedAcks = tryReceiveAcks();
}
+ // Check if a pending PING has been answered
+ if (pingDeadlineNanos > 0) {
+ if (pongReceived) {
+ pingDeadlineNanos = 0;
+ completePing();
+ } else if (System.nanoTime() >= pingDeadlineNanos) {
+ pingDeadlineNanos = 0;
+ failConnection(new LineSenderException("Ping timed out waiting for PONG"));
+ completePing();
+ }
+ }
+
// Try to dequeue and send a batch
boolean hasWindowSpace = (inFlightWindow == null || inFlightWindow.hasWindowSpace());
if (hasWindowSpace) {
@@ -521,6 +618,13 @@ private void ioLoop() {
}
}
+ private void completePing() {
+ synchronized (processingLock) {
+ pingComplete = true;
+ processingLock.notifyAll();
+ }
+ }
+
private boolean isPendingEmpty() {
return pendingBuffer == null;
}
@@ -664,7 +768,8 @@ private class ResponseHandler implements WebSocketFrameHandler {
@Override
public void onBinaryMessage(long payloadPtr, int payloadLen) {
- if (!WebSocketResponse.isStructurallyValid(payloadPtr, payloadLen)) {
+ // readFrom validates inline; a single pass parses and bounds-checks.
+ if (!response.readFrom(payloadPtr, payloadLen)) {
LineSenderException error = new LineSenderException(
"Invalid ACK response payload [length=" + payloadLen + ']'
);
@@ -673,18 +778,9 @@ public void onBinaryMessage(long payloadPtr, int payloadLen) {
return;
}
- // Parse response from binary payload
- if (!response.readFrom(payloadPtr, payloadLen)) {
- LineSenderException error = new LineSenderException("Failed to parse ACK response");
- LOG.error("Failed to parse response");
- failConnection(error);
- return;
- }
-
long sequence = response.getSequence();
if (response.isSuccess()) {
- // Cumulative ACK - acknowledge all batches up to this sequence
if (inFlightWindow != null) {
int acked = inFlightWindow.acknowledgeUpTo(sequence);
if (acked > 0) {
@@ -696,6 +792,16 @@ public void onBinaryMessage(long payloadPtr, int payloadLen) {
LOG.debug("ACK for already-acknowledged sequences [upTo={}]", sequence);
}
}
+ for (int i = 0, n = response.getTableEntryCount(); i < n; i++) {
+ advanceSeqTxn(committedSeqTxns, response.getTableName(i), response.getTableSeqTxn(i));
+ }
+ } else if (response.isDurableAck()) {
+ for (int i = 0, n = response.getTableEntryCount(); i < n; i++) {
+ advanceSeqTxn(durableSeqTxns, response.getTableName(i), response.getTableSeqTxn(i));
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Durable ACK received [tables={}]", response.getTableEntryCount());
+ }
} else {
// Error - fail the batch
String errorMessage = response.getErrorMessage();
@@ -714,5 +820,19 @@ public void onClose(int code, String reason) {
LOG.info("WebSocket closed by server [code={}, reason={}]", code, reason);
failConnection(new LineSenderException("WebSocket closed by server [code=" + code + ", reason=" + reason + ']'));
}
+
+ @Override
+ public void onPong(long payloadPtr, int payloadLen) {
+ pongReceived = true;
+ }
+ }
+
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ private static void advanceSeqTxn(CharSequenceLongHashMap map, String tableName, long seqTxn) {
+ synchronized (map) {
+ if (seqTxn > map.get(tableName)) {
+ map.put(tableName, seqTxn);
+ }
+ }
}
}
diff --git a/core/src/main/java/io/questdb/client/std/CharSequenceLongHashMap.java b/core/src/main/java/io/questdb/client/std/CharSequenceLongHashMap.java
new file mode 100644
index 00000000..95fa8c01
--- /dev/null
+++ b/core/src/main/java/io/questdb/client/std/CharSequenceLongHashMap.java
@@ -0,0 +1,112 @@
+/*+*****************************************************************************
+ * ___ _ ____ ____
+ * / _ \ _ _ ___ ___| |_| _ \| __ )
+ * | | | | | | |/ _ \/ __| __| | | | _ \
+ * | |_| | |_| | __/\__ \ |_| |_| | |_) |
+ * \__\_\\__,_|\___||___/\__|____/|____/
+ *
+ * Copyright (c) 2014-2019 Appsicle
+ * Copyright (c) 2019-2026 QuestDB
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ ******************************************************************************/
+
+package io.questdb.client.std;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.Arrays;
+
+
+public class CharSequenceLongHashMap extends AbstractCharSequenceHashSet {
+ public static final long NO_ENTRY_VALUE = -1L;
+ private final ObjList list;
+ private final long noEntryValue;
+ private long[] values;
+
+ public CharSequenceLongHashMap() {
+ this(8);
+ }
+
+ public CharSequenceLongHashMap(int initialCapacity) {
+ this(initialCapacity, 0.4, NO_ENTRY_VALUE);
+ }
+
+ public CharSequenceLongHashMap(int initialCapacity, double loadFactor, long noEntryValue) {
+ super(initialCapacity, loadFactor);
+ this.noEntryValue = noEntryValue;
+ this.list = new ObjList<>(capacity);
+ values = new long[keys.length];
+ clear();
+ }
+
+ @Override
+ public final void clear() {
+ super.clear();
+ list.clear();
+ Arrays.fill(values, noEntryValue);
+ }
+
+ public long get(@NotNull CharSequence key) {
+ return valueAt(keyIndex(key));
+ }
+
+ public ObjList keys() {
+ return list;
+ }
+
+ public boolean put(@NotNull CharSequence key, long value) {
+ int index = keyIndex(key);
+ if (index < 0) {
+ values[-index - 1] = value;
+ return false;
+ }
+ final String keyString = Chars.toString(key);
+ putAt0(index, keyString, value);
+ list.add(keyString);
+ return true;
+ }
+
+ public long valueAt(int index) {
+ int index1 = -index - 1;
+ return index < 0 ? values[index1] : noEntryValue;
+ }
+
+ private void putAt0(int index, CharSequence key, long value) {
+ keys[index] = key;
+ values[index] = value;
+ if (--free == 0) {
+ rehash();
+ }
+ }
+
+ private void rehash() {
+ long[] oldValues = values;
+ CharSequence[] oldKeys = keys;
+ int size = capacity - free;
+ capacity = capacity * 2;
+ free = capacity - size;
+ mask = Numbers.ceilPow2((int) (capacity / loadFactor)) - 1;
+ this.keys = new CharSequence[mask + 1];
+ this.values = new long[mask + 1];
+ for (int i = oldKeys.length - 1; i > -1; i--) {
+ CharSequence key = oldKeys[i];
+ if (key != null) {
+ final int index = keyIndex(key);
+ keys[index] = key;
+ values[index] = oldValues[i];
+ }
+ }
+ }
+}
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/InFlightWindowTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/InFlightWindowTest.java
index 95587c95..40deb626 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/InFlightWindowTest.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/InFlightWindowTest.java
@@ -394,6 +394,42 @@ public void testConcurrentAddAndCumulativeAck() throws Exception {
assertEquals(numBatches, window.getTotalAcked());
}
+ @Test
+ public void testGetHighestAckedSequenceInitiallyMinusOne() {
+ InFlightWindow window = new InFlightWindow(8, 1000);
+ assertEquals(-1, window.getHighestAckedSequence());
+ }
+
+ @Test
+ public void testGetHighestAckedSequenceAdvancesOnAcknowledge() {
+ InFlightWindow window = new InFlightWindow(8, 1000);
+
+ window.addInFlight(0);
+ window.addInFlight(1);
+ window.addInFlight(2);
+
+ window.acknowledge(0);
+ assertEquals(0, window.getHighestAckedSequence());
+
+ window.acknowledgeUpTo(2);
+ assertEquals(2, window.getHighestAckedSequence());
+ }
+
+ @Test
+ public void testGetHighestAckedSequenceDoesNotRegress() {
+ InFlightWindow window = new InFlightWindow(8, 1000);
+
+ window.addInFlight(0);
+ window.addInFlight(1);
+
+ window.acknowledgeUpTo(1);
+ assertEquals(1, window.getHighestAckedSequence());
+
+ // Duplicate/lower ack should not regress
+ window.acknowledge(0);
+ assertEquals(1, window.getHighestAckedSequence());
+ }
+
@Test
public void testDefaultWindowSize() {
InFlightWindow window = new InFlightWindow();
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketAckIntegrationTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketAckIntegrationTest.java
index 5950b114..d33ce36f 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketAckIntegrationTest.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketAckIntegrationTest.java
@@ -24,6 +24,7 @@
package io.questdb.client.test.cutlass.qwp.client;
+import io.questdb.client.cutlass.line.LineSenderException;
import io.questdb.client.cutlass.qwp.client.QwpWebSocketSender;
import io.questdb.client.cutlass.qwp.client.WebSocketResponse;
import io.questdb.client.cutlass.qwp.websocket.WebSocketCloseCode;
@@ -34,8 +35,13 @@
import org.junit.Test;
import java.io.IOException;
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Integration tests for QWP v1 WebSocket ACK delivery mechanism.
@@ -195,17 +201,160 @@ public void testSyncFlushIgnoresPingAndWaitsForAck() throws Exception {
}
}
+ @Test
+ public void testDurableAckUpgradeHeaderNotSentByDefault() throws Exception {
+ int port = TEST_PORT + 31;
+ AtomicReference capturedRequest = new AtomicReference<>();
+
+ try (ServerSocket serverSocket = new ServerSocket(port)) {
+ serverSocket.setSoTimeout(5000);
+
+ Thread serverThread = new Thread(() -> {
+ try {
+ Socket client = serverSocket.accept();
+ InputStream in = client.getInputStream();
+ StringBuilder request = new StringBuilder();
+ byte[] buf = new byte[1];
+ while (true) {
+ int read = in.read(buf);
+ if (read <= 0) {
+ break;
+ }
+ request.append((char) buf[0]);
+ if (request.toString().endsWith("\r\n\r\n")) {
+ break;
+ }
+ }
+ capturedRequest.set(request.toString());
+ client.close();
+ } catch (Exception e) {
+ // expected
+ }
+ });
+ serverThread.start();
+
+ try {
+ QwpWebSocketSender.connect("localhost", port, null,
+ 0, 0, 0, 1, null).close();
+ } catch (LineSenderException e) {
+ // expected - server doesn't complete handshake
+ }
+
+ serverThread.join(5000);
+
+ String request = capturedRequest.get();
+ Assert.assertNotNull("Server should have received upgrade request", request);
+ Assert.assertFalse("Request should NOT contain X-QWP-Request-Durable-Ack header",
+ request.contains("X-QWP-Request-Durable-Ack"));
+ }
+ }
+
+ @Test
+ public void testDurableAckUpgradeHeaderSent() throws Exception {
+ int port = TEST_PORT + 30;
+ AtomicReference capturedRequest = new AtomicReference<>();
+
+ try (ServerSocket serverSocket = new ServerSocket(port)) {
+ serverSocket.setSoTimeout(5000);
+
+ Thread serverThread = new Thread(() -> {
+ try {
+ Socket client = serverSocket.accept();
+ InputStream in = client.getInputStream();
+ StringBuilder request = new StringBuilder();
+ byte[] buf = new byte[1];
+ while (true) {
+ int read = in.read(buf);
+ if (read <= 0) {
+ break;
+ }
+ request.append((char) buf[0]);
+ if (request.toString().endsWith("\r\n\r\n")) {
+ break;
+ }
+ }
+ capturedRequest.set(request.toString());
+ client.close();
+ } catch (Exception e) {
+ // expected
+ }
+ });
+ serverThread.start();
+
+ try {
+ QwpWebSocketSender.connect("localhost", port, null,
+ 0, 0, 0, 1, null,
+ QwpWebSocketSender.DEFAULT_MAX_SCHEMAS_PER_CONNECTION,
+ true).close();
+ } catch (LineSenderException e) {
+ // expected - server doesn't complete handshake
+ }
+
+ serverThread.join(5000);
+
+ String request = capturedRequest.get();
+ Assert.assertNotNull("Server should have received upgrade request", request);
+ Assert.assertTrue("Request should contain X-QWP-Request-Durable-Ack header",
+ request.contains("X-QWP-Request-Durable-Ack: true"));
+ }
+ }
+
+ @Test
+ public void testSyncDurableAckDuringWaitForAck() throws Exception {
+ int port = TEST_PORT + 25;
+ DurableAckThenStatusOkHandler handler = new DurableAckThenStatusOkHandler();
+
+ try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
+ server.start();
+ Assert.assertTrue("Server failed to start", server.awaitStart(5, TimeUnit.SECONDS));
+
+ // window=1 for sync mode
+ try (QwpWebSocketSender sender = QwpWebSocketSender.connect(
+ "localhost", port, null, 0, 0, 0, 1, null)) {
+ sender.table("trades")
+ .longColumn("price", 100)
+ .atNow();
+ sender.flush();
+
+ Assert.assertEquals(42L, sender.getHighestDurableSeqTxn("trades"));
+ Assert.assertEquals(10L, sender.getHighestAckedSeqTxn("trades"));
+ }
+ }
+ }
+
+ @Test
+ public void testSyncFlushUpdatesCommittedSeqTxnsWithTableEntries() throws Exception {
+ int port = TEST_PORT + 24;
+ AckWithTableEntriesHandler handler = new AckWithTableEntriesHandler();
+
+ try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
+ server.start();
+ Assert.assertTrue("Server failed to start", server.awaitStart(5, TimeUnit.SECONDS));
+
+ // window=1 for sync mode
+ try (QwpWebSocketSender sender = QwpWebSocketSender.connect(
+ "localhost", port, null, 0, 0, 0, 1, null)) {
+ sender.table("trades")
+ .longColumn("price", 100)
+ .atNow();
+ sender.flush();
+
+ Assert.assertEquals(10L, sender.getHighestAckedSeqTxn("trades"));
+ Assert.assertEquals(20L, sender.getHighestAckedSeqTxn("orders"));
+ Assert.assertEquals(-1L, sender.getHighestAckedSeqTxn("other"));
+ }
+ }
+ }
+
/**
* Creates a binary ACK response using WebSocketResponse format.
- * Format: status (1 byte) + sequence (8 bytes little-endian)
+ * Format: status (1) + sequence (8) + tableCount (2, zero entries)
*/
private static byte[] createAckResponse(long sequence) {
- byte[] response = new byte[WebSocketResponse.MIN_RESPONSE_SIZE];
+ byte[] response = new byte[WebSocketResponse.MIN_OK_RESPONSE_SIZE];
- // Status OK (0)
response[0] = WebSocketResponse.STATUS_OK;
- // Sequence (little-endian)
response[1] = (byte) (sequence & 0xFF);
response[2] = (byte) ((sequence >> 8) & 0xFF);
response[3] = (byte) ((sequence >> 16) & 0xFF);
@@ -215,9 +364,82 @@ private static byte[] createAckResponse(long sequence) {
response[7] = (byte) ((sequence >> 48) & 0xFF);
response[8] = (byte) ((sequence >> 56) & 0xFF);
+ // tableCount = 0
+ response[9] = 0;
+ response[10] = 0;
+
+ return response;
+ }
+
+ private static byte[] createAckResponseWithTables(long sequence, String[] tableNames, long[] seqTxns) {
+ byte[][] nameBytes = new byte[tableNames.length][];
+ int size = 1 + 8 + 2;
+ for (int i = 0; i < tableNames.length; i++) {
+ nameBytes[i] = tableNames[i].getBytes(StandardCharsets.UTF_8);
+ size += 2 + nameBytes[i].length + 8;
+ }
+
+ byte[] response = new byte[size];
+ int offset = 0;
+ response[offset++] = WebSocketResponse.STATUS_OK;
+ for (int i = 0; i < 8; i++) {
+ response[offset++] = (byte) ((sequence >> (i * 8)) & 0xFF);
+ }
+ response[offset++] = (byte) (tableNames.length & 0xFF);
+ response[offset++] = (byte) ((tableNames.length >> 8) & 0xFF);
+ for (int i = 0; i < tableNames.length; i++) {
+ response[offset++] = (byte) (nameBytes[i].length & 0xFF);
+ response[offset++] = (byte) ((nameBytes[i].length >> 8) & 0xFF);
+ System.arraycopy(nameBytes[i], 0, response, offset, nameBytes[i].length);
+ offset += nameBytes[i].length;
+ for (int j = 0; j < 8; j++) {
+ response[offset++] = (byte) ((seqTxns[i] >> (j * 8)) & 0xFF);
+ }
+ }
return response;
}
+ private static byte[] createDurableAckResponse(String[] tableNames, long[] seqTxns) {
+ byte[][] nameBytes = new byte[tableNames.length][];
+ int size = 1 + 2;
+ for (int i = 0; i < tableNames.length; i++) {
+ nameBytes[i] = tableNames[i].getBytes(StandardCharsets.UTF_8);
+ size += 2 + nameBytes[i].length + 8;
+ }
+
+ byte[] response = new byte[size];
+ int offset = 0;
+ response[offset++] = WebSocketResponse.STATUS_DURABLE_ACK;
+ response[offset++] = (byte) (tableNames.length & 0xFF);
+ response[offset++] = (byte) ((tableNames.length >> 8) & 0xFF);
+ for (int i = 0; i < tableNames.length; i++) {
+ response[offset++] = (byte) (nameBytes[i].length & 0xFF);
+ response[offset++] = (byte) ((nameBytes[i].length >> 8) & 0xFF);
+ System.arraycopy(nameBytes[i], 0, response, offset, nameBytes[i].length);
+ offset += nameBytes[i].length;
+ for (int j = 0; j < 8; j++) {
+ response[offset++] = (byte) ((seqTxns[i] >> (j * 8)) & 0xFF);
+ }
+ }
+ return response;
+ }
+
+ private static class AckWithTableEntriesHandler implements TestWebSocketServer.WebSocketServerHandler {
+ private final AtomicLong nextSequence = new AtomicLong(0);
+
+ @Override
+ public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) {
+ long sequence = nextSequence.getAndIncrement();
+ try {
+ client.sendBinary(createAckResponseWithTables(sequence,
+ new String[]{"trades", "orders"},
+ new long[]{10L, 20L}));
+ } catch (IOException e) {
+ LOG.error("Failed to send ACK with tables", e);
+ }
+ }
+ }
+
private static class ClosingServerHandler implements TestWebSocketServer.WebSocketServerHandler {
@Override
public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) {
@@ -259,6 +481,27 @@ public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] dat
}
}
+ private static class DurableAckThenStatusOkHandler implements TestWebSocketServer.WebSocketServerHandler {
+ private final AtomicLong nextSequence = new AtomicLong(0);
+
+ @Override
+ public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) {
+ long sequence = nextSequence.getAndIncrement();
+ try {
+ // Send durable ACK first
+ client.sendBinary(createDurableAckResponse(
+ new String[]{"trades"},
+ new long[]{42L}));
+ // Then send STATUS_OK with committed seqTxns
+ client.sendBinary(createAckResponseWithTables(sequence,
+ new String[]{"trades"},
+ new long[]{10L}));
+ } catch (IOException e) {
+ LOG.error("Failed to send ACK frames", e);
+ }
+ }
+ }
+
private static class InvalidAckPayloadHandler implements TestWebSocketServer.WebSocketServerHandler {
@Override
public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) {
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderStateTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderStateTest.java
index 23ed4774..01ef97d1 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderStateTest.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderStateTest.java
@@ -24,10 +24,17 @@
package io.questdb.client.test.cutlass.qwp.client;
+import io.questdb.client.DefaultHttpClientConfiguration;
+import io.questdb.client.cutlass.http.client.WebSocketClient;
+import io.questdb.client.cutlass.http.client.WebSocketFrameHandler;
import io.questdb.client.cutlass.line.LineSenderException;
import io.questdb.client.cutlass.qwp.client.InFlightWindow;
import io.questdb.client.cutlass.qwp.client.QwpWebSocketSender;
+import io.questdb.client.cutlass.qwp.client.WebSocketResponse;
import io.questdb.client.cutlass.qwp.protocol.QwpTableBuffer;
+import io.questdb.client.network.PlainSocketFactory;
+import io.questdb.client.std.MemoryTag;
+import io.questdb.client.std.Unsafe;
import io.questdb.client.test.AbstractTest;
import org.junit.Assert;
import org.junit.Test;
@@ -35,6 +42,9 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
import static io.questdb.client.test.tools.TestUtils.assertMemoryLeak;
@@ -53,10 +63,9 @@ public class QwpWebSocketSenderStateTest extends AbstractTest {
@Test
public void testConnectionFailureIsSenderLevelTerminalState() throws Exception {
assertMemoryLeak(() -> {
- QwpWebSocketSender sender = QwpWebSocketSender.createForTesting(
+ try (QwpWebSocketSender sender = QwpWebSocketSender.createForTesting(
"localhost", 0, 10_000, 0, 0L, 8
- );
- try {
+ )) {
LineSenderException failure = new LineSenderException(
"Server error for batch 7: WRITE_ERROR - disk full"
);
@@ -80,12 +89,256 @@ public void testConnectionFailureIsSenderLevelTerminalState() throws Exception {
Assert.assertSame(failure, e);
assertStackContains(e, "flush");
}
+ }
+ });
+ }
+
+ @Test
+ public void testConnectWithDurableAckToClosedPort() throws Exception {
+ assertMemoryLeak(() -> {
+ try {
+ QwpWebSocketSender.connect(
+ "127.0.0.1", 1, null,
+ QwpWebSocketSender.DEFAULT_AUTO_FLUSH_ROWS,
+ QwpWebSocketSender.DEFAULT_AUTO_FLUSH_BYTES,
+ QwpWebSocketSender.DEFAULT_AUTO_FLUSH_INTERVAL_NANOS,
+ 1, null,
+ QwpWebSocketSender.DEFAULT_MAX_SCHEMAS_PER_CONNECTION,
+ true
+ ).close();
+ Assert.fail("Expected LineSenderException");
+ } catch (LineSenderException e) {
+ Assert.assertTrue(e.getMessage().contains("Failed to connect"));
+ }
+ });
+ }
+
+ @Test
+ public void testGetHighestDurableSeqTxnDefaultsToMinusOne() throws Exception {
+ assertMemoryLeak(() -> {
+ try (QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1)) {
+ Assert.assertEquals(-1L, sender.getHighestDurableSeqTxn("any_table"));
+ }
+ });
+ }
+
+ @Test
+ public void testGetHighestAckedSeqTxnDefaultsToMinusOne() throws Exception {
+ assertMemoryLeak(() -> {
+ try (QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1)) {
+ Assert.assertEquals(-1L, sender.getHighestAckedSeqTxn("any_table"));
+ }
+ });
+ }
+
+ @Test
+ public void testSetRequestDurableAckBeforeConnect() throws Exception {
+ assertMemoryLeak(() -> {
+ try (QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1)) {
+ // Must not throw before connection is established
+ sender.setRequestDurableAck(true);
+ sender.setRequestDurableAck(false);
+ }
+ });
+ }
+
+ @Test
+ public void testSetRequestDurableAckAfterConnectThrows() throws Exception {
+ assertMemoryLeak(() -> {
+ QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1);
+ try {
+ setField(sender, "connected", true);
+ try {
+ sender.setRequestDurableAck(true);
+ Assert.fail("Expected exception for setRequestDurableAck after connect");
+ } catch (LineSenderException e) {
+ Assert.assertTrue(e.getMessage().contains("before the first send"));
+ }
} finally {
+ setField(sender, "connected", false);
sender.close();
}
});
}
+ @Test
+ public void testSetRequestDurableAckOnClosedSenderThrows() throws Exception {
+ assertMemoryLeak(() -> {
+ QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1);
+ sender.close();
+ try {
+ sender.setRequestDurableAck(true);
+ Assert.fail("Expected exception for setRequestDurableAck on closed sender");
+ } catch (LineSenderException e) {
+ Assert.assertTrue(e.getMessage().contains("closed"));
+ }
+ });
+ }
+
+ @Test
+ public void testPingAfterCloseThrows() throws Exception {
+ assertMemoryLeak(() -> {
+ QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1);
+ sender.close();
+ try {
+ sender.ping();
+ Assert.fail("Expected exception");
+ } catch (LineSenderException e) {
+ Assert.assertTrue(e.getMessage().contains("closed"));
+ }
+ });
+ }
+
+ @Test
+ public void testSyncPingProcessesDurableAck() throws Exception {
+ assertMemoryLeak(() -> {
+ QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1);
+ PingTestClient client = new PingTestClient();
+ try {
+ client.frameSequence.add(handler -> emitBinaryResponse(handler, WebSocketResponse.durableAck("trades", 5)));
+ client.frameSequence.add(handler -> handler.onPong(0, 0));
+
+ setField(sender, "client", client);
+ setField(sender, "connected", true);
+ setField(sender, "inFlightWindow", new InFlightWindow(1, InFlightWindow.DEFAULT_TIMEOUT_MS));
+
+ sender.ping();
+
+ Assert.assertTrue(client.pingSent);
+ Assert.assertEquals(5L, sender.getHighestDurableSeqTxn("trades"));
+ } finally {
+ setField(sender, "client", null);
+ setField(sender, "connected", false);
+ sender.close();
+ client.close();
+ }
+ });
+ }
+
+ @Test
+ public void testSyncPingProcessesStatusOk() throws Exception {
+ assertMemoryLeak(() -> {
+ QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1);
+ PingTestClient client = new PingTestClient();
+ try {
+ client.frameSequence.add(handler -> emitBinaryResponse(handler, WebSocketResponse.success(3)));
+ client.frameSequence.add(handler -> handler.onPong(0, 0));
+
+ setField(sender, "client", client);
+ setField(sender, "connected", true);
+ InFlightWindow window = new InFlightWindow(8, InFlightWindow.DEFAULT_TIMEOUT_MS);
+ window.addInFlight(0);
+ window.addInFlight(1);
+ window.addInFlight(2);
+ window.addInFlight(3);
+ setField(sender, "inFlightWindow", window);
+
+ sender.ping();
+
+ Assert.assertTrue(client.pingSent);
+ Assert.assertEquals(0, window.getInFlightCount());
+ } finally {
+ setField(sender, "client", null);
+ setField(sender, "connected", false);
+ sender.close();
+ client.close();
+ }
+ });
+ }
+
+ @Test
+ public void testSyncPingSurfacesServerErrorFrame() throws Exception {
+ // Regression: syncPing used to branch only on isDurableAck() /
+ // isSuccess(). Any error frame (parse / schema / security / internal
+ // / write error) arriving between PING and PONG was parsed into
+ // ackResponse, neither branch fired, and the error was silently
+ // discarded. A caller using ping() to confirm "all my batches
+ // landed" would get a false affirmative; the error only surfaced
+ // on the next flush's waitForAck.
+ //
+ // Fix: capture the first error during the ping round and throw it
+ // after PONG so ping() callers see the failure directly. Also route
+ // through inFlightWindow.fail so subsequent waitForAck / flush
+ // calls re-observe it. Frames arriving between the error and PONG
+ // are still processed so durable/committed progress is preserved.
+ assertMemoryLeak(() -> {
+ // inFlightWindowSize=1 routes ping() through syncPing (the code under test).
+ // The injected inFlightWindow can still hold multiple batches.
+ QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1);
+ PingTestClient client = new PingTestClient();
+ try {
+ // Server sends an error frame for seq=2, a durable ack, then PONG.
+ client.frameSequence.add(handler -> emitBinaryResponse(
+ handler,
+ WebSocketResponse.error(2L, WebSocketResponse.STATUS_SCHEMA_MISMATCH, "column type mismatch")
+ ));
+ client.frameSequence.add(handler -> emitBinaryResponse(handler, WebSocketResponse.durableAck("trades", 9)));
+ client.frameSequence.add(handler -> handler.onPong(0, 0));
+
+ setField(sender, "client", client);
+ setField(sender, "connected", true);
+ InFlightWindow window = new InFlightWindow(8, InFlightWindow.DEFAULT_TIMEOUT_MS);
+ window.addInFlight(0);
+ window.addInFlight(1);
+ window.addInFlight(2);
+ setField(sender, "inFlightWindow", window);
+
+ try {
+ sender.ping();
+ Assert.fail("syncPing must throw on server error frame");
+ } catch (LineSenderException expected) {
+ Assert.assertTrue(
+ "error message must be propagated from the server frame",
+ expected.getMessage() != null && expected.getMessage().contains("column type mismatch")
+ );
+ }
+
+ Assert.assertTrue(client.pingSent);
+ // Durable progress observed before the throw must be preserved.
+ Assert.assertEquals(9L, sender.getHighestDurableSeqTxn("trades"));
+ // Error is also recorded on the window so the next waitForAck / flush sees it.
+ Throwable err = window.getLastError();
+ Assert.assertNotNull(
+ "syncPing must also record the error on the inFlightWindow",
+ err
+ );
+ Assert.assertTrue(err instanceof LineSenderException);
+ Assert.assertTrue(
+ err.getMessage() != null && err.getMessage().contains("column type mismatch")
+ );
+ } finally {
+ setField(sender, "client", null);
+ setField(sender, "connected", false);
+ sender.close();
+ client.close();
+ }
+ });
+ }
+
+ @Test
+ public void testSyncPingReturnsOnPong() throws Exception {
+ assertMemoryLeak(() -> {
+ QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1);
+ PingTestClient client = new PingTestClient();
+ try {
+ client.frameSequence.add(handler -> handler.onPong(0, 0));
+
+ setField(sender, "client", client);
+ setField(sender, "connected", true);
+ setField(sender, "inFlightWindow", new InFlightWindow(1, InFlightWindow.DEFAULT_TIMEOUT_MS));
+
+ sender.ping();
+
+ Assert.assertTrue(client.pingSent);
+ } finally {
+ setField(sender, "client", null);
+ setField(sender, "connected", false);
+ sender.close();
+ client.close();
+ }
+ });
+ }
+
@Test
public void testAutoFlushAccumulatesRowsAcrossAllTables() throws Exception {
assertMemoryLeak(() -> {
@@ -377,4 +630,56 @@ private static void setField(Object target, String fieldName, Object value) thro
f.setAccessible(true);
f.set(target, value);
}
+
+ private static void emitBinaryResponse(WebSocketFrameHandler handler, WebSocketResponse response) {
+ int size = response.serializedSize();
+ long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT);
+ try {
+ response.writeTo(ptr);
+ handler.onBinaryMessage(ptr, size);
+ } finally {
+ Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT);
+ }
+ }
+
+ private static class PingTestClient extends WebSocketClient {
+ final List> frameSequence = new ArrayList<>();
+ boolean pingSent = false;
+ private int nextFrame = 0;
+
+ PingTestClient() {
+ super(DefaultHttpClientConfiguration.INSTANCE, PlainSocketFactory.INSTANCE);
+ }
+
+ @Override
+ public boolean isConnected() {
+ return true;
+ }
+
+ @Override
+ public boolean receiveFrame(WebSocketFrameHandler handler, int timeout) {
+ if (nextFrame < frameSequence.size()) {
+ frameSequence.get(nextFrame++).accept(handler);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void sendBinary(long dataPtr, int length) {
+ }
+
+ @Override
+ public void sendPing(int timeout) {
+ pingSent = true;
+ }
+
+ @Override
+ protected void ioWait(int timeout, int op) {
+ }
+
+ @Override
+ protected void setupIoWait() {
+ }
+ }
}
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WebSocketResponseTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WebSocketResponseTest.java
new file mode 100644
index 00000000..b70bdc4f
--- /dev/null
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WebSocketResponseTest.java
@@ -0,0 +1,434 @@
+/*+*****************************************************************************
+ * ___ _ ____ ____
+ * / _ \ _ _ ___ ___| |_| _ \| __ )
+ * | | | | | | |/ _ \/ __| __| | | | _ \
+ * | |_| | |_| | __/\__ \ |_| |_| | |_) |
+ * \__\_\\__,_|\___||___/\__|____/|____/
+ *
+ * Copyright (c) 2014-2019 Appsicle
+ * Copyright (c) 2019-2026 QuestDB
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ ******************************************************************************/
+
+package io.questdb.client.test.cutlass.qwp.client;
+
+import io.questdb.client.cutlass.qwp.client.WebSocketResponse;
+import io.questdb.client.std.MemoryTag;
+import io.questdb.client.std.Unsafe;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static io.questdb.client.test.tools.TestUtils.assertMemoryLeak;
+
+public class WebSocketResponseTest {
+
+ @Test
+ public void testDurableAckFactory() throws Exception {
+ assertMemoryLeak(() -> {
+ WebSocketResponse response = WebSocketResponse.durableAck("trades", 42L);
+ Assert.assertTrue(response.isDurableAck());
+ Assert.assertFalse(response.isSuccess());
+ Assert.assertEquals(1, response.getTableEntryCount());
+ Assert.assertEquals("trades", response.getTableName(0));
+ Assert.assertEquals(42L, response.getTableSeqTxn(0));
+ Assert.assertEquals(WebSocketResponse.STATUS_DURABLE_ACK, response.getStatus());
+ Assert.assertEquals("DURABLE_ACK", response.getStatusName());
+ Assert.assertNull(response.getErrorMessage());
+ });
+ }
+
+ @Test
+ public void testDurableAckIsStructurallyValid() throws Exception {
+ assertMemoryLeak(() -> {
+ WebSocketResponse response = WebSocketResponse.durableAck("t", 7L);
+ int size = response.serializedSize();
+
+ long ptr = Unsafe.malloc(size + 1, MemoryTag.NATIVE_DEFAULT);
+ try {
+ response.writeTo(ptr);
+ Assert.assertTrue(WebSocketResponse.isStructurallyValid(ptr, size));
+ Assert.assertFalse(WebSocketResponse.isStructurallyValid(ptr, size + 1));
+ Assert.assertFalse(WebSocketResponse.isStructurallyValid(ptr, size - 1));
+ } finally {
+ Unsafe.free(ptr, size + 1, MemoryTag.NATIVE_DEFAULT);
+ }
+ });
+ }
+
+ @Test
+ public void testDurableAckRoundTripThroughNativeMemory() throws Exception {
+ assertMemoryLeak(() -> {
+ WebSocketResponse original = WebSocketResponse.durableAck("orders", 12345L);
+ int size = original.serializedSize();
+ long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT);
+ try {
+ original.writeTo(ptr);
+
+ WebSocketResponse parsed = new WebSocketResponse();
+ Assert.assertTrue(parsed.readFrom(ptr, size));
+ Assert.assertTrue(parsed.isDurableAck());
+ Assert.assertFalse(parsed.isSuccess());
+ Assert.assertEquals(1, parsed.getTableEntryCount());
+ Assert.assertEquals("orders", parsed.getTableName(0));
+ Assert.assertEquals(12345L, parsed.getTableSeqTxn(0));
+ Assert.assertNull(parsed.getErrorMessage());
+ } finally {
+ Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT);
+ }
+ });
+ }
+
+ @Test
+ public void testDurableAckDoesNotCarryErrorMessage() throws Exception {
+ assertMemoryLeak(() -> {
+ WebSocketResponse response = WebSocketResponse.durableAck("t", 99L);
+ int size = response.serializedSize();
+ long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT);
+ try {
+ response.writeTo(ptr);
+
+ WebSocketResponse parsed = new WebSocketResponse();
+ Assert.assertTrue(parsed.readFrom(ptr, size));
+ Assert.assertNull(parsed.getErrorMessage());
+ } finally {
+ Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT);
+ }
+ });
+ }
+
+ @Test
+ public void testSuccessIsNotDurableAck() throws Exception {
+ assertMemoryLeak(() -> {
+ WebSocketResponse response = WebSocketResponse.success(10L);
+ Assert.assertTrue(response.isSuccess());
+ Assert.assertFalse(response.isDurableAck());
+ Assert.assertEquals("OK", response.getStatusName());
+ });
+ }
+
+ @Test
+ public void testErrorIsNotDurableAck() throws Exception {
+ assertMemoryLeak(() -> {
+ WebSocketResponse response = WebSocketResponse.error(
+ 5L, WebSocketResponse.STATUS_PARSE_ERROR, "bad input");
+ Assert.assertFalse(response.isDurableAck());
+ Assert.assertFalse(response.isSuccess());
+ Assert.assertEquals("PARSE_ERROR", response.getStatusName());
+ });
+ }
+
+ @Test
+ public void testSuccessRoundTripUnchanged() throws Exception {
+ assertMemoryLeak(() -> {
+ WebSocketResponse original = WebSocketResponse.success(77L);
+ int size = original.serializedSize();
+ long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT);
+ try {
+ original.writeTo(ptr);
+
+ WebSocketResponse parsed = new WebSocketResponse();
+ Assert.assertTrue(parsed.readFrom(ptr, size));
+ Assert.assertTrue(parsed.isSuccess());
+ Assert.assertFalse(parsed.isDurableAck());
+ Assert.assertEquals(77L, parsed.getSequence());
+ } finally {
+ Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT);
+ }
+ });
+ }
+
+ @Test
+ public void testSuccessWithTableEntriesRoundTrip() throws Exception {
+ assertMemoryLeak(() -> {
+ // Build a STATUS_OK frame with 2 table entries directly in native memory
+ // Format: status(1) + sequence(8) + tableCount(2) + entries
+ byte[] name1 = "trades".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ byte[] name2 = "orders".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ int size = 1 + 8 + 2 + (2 + name1.length + 8) + (2 + name2.length + 8);
+ long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT);
+ try {
+ int offset = 0;
+ Unsafe.getUnsafe().putByte(ptr + offset, WebSocketResponse.STATUS_OK);
+ offset += 1;
+ Unsafe.getUnsafe().putLong(ptr + offset, 42L);
+ offset += 8;
+ Unsafe.getUnsafe().putShort(ptr + offset, (short) 2);
+ offset += 2;
+ // entry 1: trades, seqTxn=10
+ Unsafe.getUnsafe().putShort(ptr + offset, (short) name1.length);
+ offset += 2;
+ for (int i = 0; i < name1.length; i++) {
+ Unsafe.getUnsafe().putByte(ptr + offset + i, name1[i]);
+ }
+ offset += name1.length;
+ Unsafe.getUnsafe().putLong(ptr + offset, 10L);
+ offset += 8;
+ // entry 2: orders, seqTxn=20
+ Unsafe.getUnsafe().putShort(ptr + offset, (short) name2.length);
+ offset += 2;
+ for (int i = 0; i < name2.length; i++) {
+ Unsafe.getUnsafe().putByte(ptr + offset + i, name2[i]);
+ }
+ offset += name2.length;
+ Unsafe.getUnsafe().putLong(ptr + offset, 20L);
+
+ Assert.assertTrue(WebSocketResponse.isStructurallyValid(ptr, size));
+
+ WebSocketResponse parsed = new WebSocketResponse();
+ Assert.assertTrue(parsed.readFrom(ptr, size));
+ Assert.assertTrue(parsed.isSuccess());
+ Assert.assertFalse(parsed.isDurableAck());
+ Assert.assertEquals(42L, parsed.getSequence());
+ Assert.assertEquals(2, parsed.getTableEntryCount());
+ Assert.assertEquals("trades", parsed.getTableName(0));
+ Assert.assertEquals(10L, parsed.getTableSeqTxn(0));
+ Assert.assertEquals("orders", parsed.getTableName(1));
+ Assert.assertEquals(20L, parsed.getTableSeqTxn(1));
+ Assert.assertNull(parsed.getErrorMessage());
+ } finally {
+ Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT);
+ }
+ });
+ }
+
+ @Test
+ public void testEmptyTableNameRejected() throws Exception {
+ assertMemoryLeak(() -> {
+ // STATUS_OK with nameLen=0 - a well-behaved server never emits
+ // an empty table name; the frame must be rejected as structurally
+ // invalid so it cannot corrupt the per-table tracker.
+ int size = 1 + 8 + 2 + (2 + 8);
+ long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT);
+ try {
+ int offset = 0;
+ Unsafe.getUnsafe().putByte(ptr + offset, WebSocketResponse.STATUS_OK);
+ offset += 1;
+ Unsafe.getUnsafe().putLong(ptr + offset, 1L);
+ offset += 8;
+ Unsafe.getUnsafe().putShort(ptr + offset, (short) 1);
+ offset += 2;
+ Unsafe.getUnsafe().putShort(ptr + offset, (short) 0);
+ offset += 2;
+ Unsafe.getUnsafe().putLong(ptr + offset, 42L);
+
+ Assert.assertFalse(WebSocketResponse.isStructurallyValid(ptr, size));
+
+ WebSocketResponse parsed = new WebSocketResponse();
+ Assert.assertFalse(parsed.readFrom(ptr, size));
+ } finally {
+ Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT);
+ }
+ });
+ }
+
+ @Test
+ public void testErrorRoundTrip() throws Exception {
+ assertMemoryLeak(() -> {
+ WebSocketResponse original = WebSocketResponse.error(
+ 3L, WebSocketResponse.STATUS_INTERNAL_ERROR, "oops");
+ int size = original.serializedSize();
+ long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT);
+ try {
+ original.writeTo(ptr);
+
+ WebSocketResponse parsed = new WebSocketResponse();
+ Assert.assertTrue(parsed.readFrom(ptr, size));
+ Assert.assertFalse(parsed.isSuccess());
+ Assert.assertFalse(parsed.isDurableAck());
+ Assert.assertEquals(3L, parsed.getSequence());
+ Assert.assertEquals("oops", parsed.getErrorMessage());
+ } finally {
+ Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT);
+ }
+ });
+ }
+
+ @Test
+ public void testLargeTableCountWithInsufficientPayload() throws Exception {
+ assertMemoryLeak(() -> {
+ int size = WebSocketResponse.MIN_OK_RESPONSE_SIZE;
+ long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT);
+ try {
+ Unsafe.getUnsafe().putByte(ptr, WebSocketResponse.STATUS_OK);
+ Unsafe.getUnsafe().putLong(ptr + 1, 1L);
+ Unsafe.getUnsafe().putShort(ptr + 9, (short) 1000);
+
+ Assert.assertFalse(WebSocketResponse.isStructurallyValid(ptr, size));
+
+ WebSocketResponse parsed = new WebSocketResponse();
+ Assert.assertFalse(parsed.readFrom(ptr, size));
+ } finally {
+ Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT);
+ }
+ });
+ }
+
+ @Test
+ public void testTrailingGarbageBytesStatusOk() throws Exception {
+ assertMemoryLeak(() -> {
+ WebSocketResponse original = WebSocketResponse.success(1L);
+ int validSize = original.serializedSize();
+ int totalSize = validSize + 5;
+ long ptr = Unsafe.malloc(totalSize, MemoryTag.NATIVE_DEFAULT);
+ try {
+ original.writeTo(ptr);
+ for (int i = 0; i < 5; i++) {
+ Unsafe.getUnsafe().putByte(ptr + validSize + i, (byte) 0xFF);
+ }
+
+ Assert.assertFalse(WebSocketResponse.isStructurallyValid(ptr, totalSize));
+ } finally {
+ Unsafe.free(ptr, totalSize, MemoryTag.NATIVE_DEFAULT);
+ }
+ });
+ }
+
+ @Test
+ public void testTrailingGarbageBytesWithTableEntries() throws Exception {
+ assertMemoryLeak(() -> {
+ byte[] name = "trades".getBytes(StandardCharsets.UTF_8);
+ int validSize = 1 + 8 + 2 + (2 + name.length + 8);
+ int totalSize = validSize + 3;
+ long ptr = Unsafe.malloc(totalSize, MemoryTag.NATIVE_DEFAULT);
+ try {
+ int offset = 0;
+ Unsafe.getUnsafe().putByte(ptr + offset, WebSocketResponse.STATUS_OK);
+ offset += 1;
+ Unsafe.getUnsafe().putLong(ptr + offset, 5L);
+ offset += 8;
+ Unsafe.getUnsafe().putShort(ptr + offset, (short) 1);
+ offset += 2;
+ Unsafe.getUnsafe().putShort(ptr + offset, (short) name.length);
+ offset += 2;
+ for (int i = 0; i < name.length; i++) {
+ Unsafe.getUnsafe().putByte(ptr + offset + i, name[i]);
+ }
+ offset += name.length;
+ Unsafe.getUnsafe().putLong(ptr + offset, 10L);
+
+ Assert.assertFalse(WebSocketResponse.isStructurallyValid(ptr, totalSize));
+
+ WebSocketResponse parsed = new WebSocketResponse();
+ Assert.assertFalse(parsed.readFrom(ptr, totalSize));
+ } finally {
+ Unsafe.free(ptr, totalSize, MemoryTag.NATIVE_DEFAULT);
+ }
+ });
+ }
+
+ @Test
+ public void testTruncatedTableEntriesDurableAck() throws Exception {
+ assertMemoryLeak(() -> {
+ byte[] name1 = "t1".getBytes(StandardCharsets.UTF_8);
+ // STATUS_DURABLE_ACK with tableCount=2 but only 1 entry
+ int size = 1 + 2 + (2 + name1.length + 8);
+ long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT);
+ try {
+ int offset = 0;
+ Unsafe.getUnsafe().putByte(ptr + offset, WebSocketResponse.STATUS_DURABLE_ACK);
+ offset += 1;
+ Unsafe.getUnsafe().putShort(ptr + offset, (short) 2);
+ offset += 2;
+ Unsafe.getUnsafe().putShort(ptr + offset, (short) name1.length);
+ offset += 2;
+ for (int i = 0; i < name1.length; i++) {
+ Unsafe.getUnsafe().putByte(ptr + offset + i, name1[i]);
+ }
+ offset += name1.length;
+ Unsafe.getUnsafe().putLong(ptr + offset, 10L);
+
+ Assert.assertFalse(WebSocketResponse.isStructurallyValid(ptr, size));
+
+ WebSocketResponse parsed = new WebSocketResponse();
+ Assert.assertFalse(parsed.readFrom(ptr, size));
+ } finally {
+ Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT);
+ }
+ });
+ }
+
+ @Test
+ public void testTruncatedTableEntriesStatusOk() throws Exception {
+ assertMemoryLeak(() -> {
+ byte[] name1 = "trades".getBytes(StandardCharsets.UTF_8);
+ // STATUS_OK with tableCount=2 but only 1 entry
+ int size = 1 + 8 + 2 + (2 + name1.length + 8);
+ long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT);
+ try {
+ int offset = 0;
+ Unsafe.getUnsafe().putByte(ptr + offset, WebSocketResponse.STATUS_OK);
+ offset += 1;
+ Unsafe.getUnsafe().putLong(ptr + offset, 1L);
+ offset += 8;
+ Unsafe.getUnsafe().putShort(ptr + offset, (short) 2);
+ offset += 2;
+ Unsafe.getUnsafe().putShort(ptr + offset, (short) name1.length);
+ offset += 2;
+ for (int i = 0; i < name1.length; i++) {
+ Unsafe.getUnsafe().putByte(ptr + offset + i, name1[i]);
+ }
+ offset += name1.length;
+ Unsafe.getUnsafe().putLong(ptr + offset, 10L);
+
+ Assert.assertFalse(WebSocketResponse.isStructurallyValid(ptr, size));
+
+ WebSocketResponse parsed = new WebSocketResponse();
+ Assert.assertFalse(parsed.readFrom(ptr, size));
+ } finally {
+ Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT);
+ }
+ });
+ }
+
+ @Test
+ public void testUnknownStatusByte() throws Exception {
+ assertMemoryLeak(() -> {
+ int size = WebSocketResponse.MIN_ERROR_RESPONSE_SIZE;
+ long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT);
+ try {
+ Unsafe.getUnsafe().putByte(ptr, (byte) 0xFF);
+ Unsafe.getUnsafe().putLong(ptr + 1, 1L);
+ Unsafe.getUnsafe().putShort(ptr + 9, (short) 0);
+
+ // Unknown status falls through to error validation path
+ Assert.assertTrue(WebSocketResponse.isStructurallyValid(ptr, size));
+
+ WebSocketResponse parsed = new WebSocketResponse();
+ Assert.assertTrue(parsed.readFrom(ptr, size));
+ Assert.assertEquals((byte) 0xFF, parsed.getStatus());
+ Assert.assertEquals("UNKNOWN(255)", parsed.getStatusName());
+ } finally {
+ Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT);
+ }
+ });
+ }
+
+ @Test
+ public void testZeroLengthPayload() throws Exception {
+ assertMemoryLeak(() -> {
+ long ptr = Unsafe.malloc(1, MemoryTag.NATIVE_DEFAULT);
+ try {
+ Assert.assertFalse(WebSocketResponse.isStructurallyValid(ptr, 0));
+
+ WebSocketResponse parsed = new WebSocketResponse();
+ Assert.assertFalse(parsed.readFrom(ptr, 0));
+ } finally {
+ Unsafe.free(ptr, 1, MemoryTag.NATIVE_DEFAULT);
+ }
+ });
+ }
+}
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WebSocketSendQueueTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WebSocketSendQueueTest.java
index 19b4753f..9d3e98e9 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WebSocketSendQueueTest.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WebSocketSendQueueTest.java
@@ -36,9 +36,9 @@
import io.questdb.client.std.MemoryTag;
import io.questdb.client.std.Os;
import io.questdb.client.std.Unsafe;
-import static io.questdb.client.test.tools.TestUtils.assertMemoryLeak;
import org.junit.Test;
+import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -46,20 +46,17 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import static io.questdb.client.test.tools.TestUtils.assertMemoryLeak;
import static org.junit.Assert.*;
public class WebSocketSendQueueTest {
-
+
@Test
public void testEnqueueTimeoutWhenPendingSlotOccupied() throws Exception {
assertMemoryLeak(() -> {
InFlightWindow window = new InFlightWindow(1, 1_000);
- FakeWebSocketClient client = new FakeWebSocketClient();
- MicrobatchBuffer batch0 = sealedBuffer((byte) 1);
- MicrobatchBuffer batch1 = sealedBuffer((byte) 2);
WebSocketSendQueue queue = null;
-
- try {
+ try (FakeWebSocketClient client = new FakeWebSocketClient(); MicrobatchBuffer batch0 = sealedBuffer((byte) 1); MicrobatchBuffer batch1 = sealedBuffer((byte) 2)) {
// Keep window full so I/O thread cannot drain pending slot.
window.addInFlight(0);
queue = new WebSocketSendQueue(client, window, 100, 500);
@@ -74,9 +71,6 @@ public void testEnqueueTimeoutWhenPendingSlotOccupied() throws Exception {
} finally {
window.acknowledgeUpTo(Long.MAX_VALUE);
closeQuietly(queue);
- batch0.close();
- batch1.close();
- client.close();
}
});
}
@@ -85,12 +79,8 @@ public void testEnqueueTimeoutWhenPendingSlotOccupied() throws Exception {
public void testEnqueueWaitsUntilSlotAvailable() throws Exception {
assertMemoryLeak(() -> {
InFlightWindow window = new InFlightWindow(1, 1_000);
- FakeWebSocketClient client = new FakeWebSocketClient();
- MicrobatchBuffer batch0 = sealedBuffer((byte) 1);
- MicrobatchBuffer batch1 = sealedBuffer((byte) 2);
WebSocketSendQueue queue = null;
-
- try {
+ try (FakeWebSocketClient client = new FakeWebSocketClient(); MicrobatchBuffer batch0 = sealedBuffer((byte) 1); MicrobatchBuffer batch1 = sealedBuffer((byte) 2)) {
window.addInFlight(0);
queue = new WebSocketSendQueue(client, window, 2_000, 500);
final WebSocketSendQueue finalQueue = queue;
@@ -124,9 +114,6 @@ public void testEnqueueWaitsUntilSlotAvailable() throws Exception {
} finally {
window.acknowledgeUpTo(Long.MAX_VALUE);
closeQuietly(queue);
- batch0.close();
- batch1.close();
- client.close();
}
});
}
@@ -135,12 +122,10 @@ public void testEnqueueWaitsUntilSlotAvailable() throws Exception {
public void testFlushFailsOnInvalidAckPayload() throws Exception {
assertMemoryLeak(() -> {
InFlightWindow window = new InFlightWindow(8, 5_000);
- FakeWebSocketClient client = new FakeWebSocketClient();
WebSocketSendQueue queue = null;
- CountDownLatch payloadDelivered = new CountDownLatch(1);
- AtomicBoolean fired = new AtomicBoolean(false);
-
- try {
+ try (FakeWebSocketClient client = new FakeWebSocketClient()) {
+ CountDownLatch payloadDelivered = new CountDownLatch(1);
+ AtomicBoolean fired = new AtomicBoolean(false);
window.addInFlight(0);
client.setTryReceiveBehavior(handler -> {
if (fired.compareAndSet(false, true)) {
@@ -162,7 +147,6 @@ public void testFlushFailsOnInvalidAckPayload() throws Exception {
}
} finally {
closeQuietly(queue);
- client.close();
}
});
}
@@ -171,11 +155,9 @@ public void testFlushFailsOnInvalidAckPayload() throws Exception {
public void testFlushFailsOnReceiveIoError() throws Exception {
assertMemoryLeak(() -> {
InFlightWindow window = new InFlightWindow(8, 5_000);
- FakeWebSocketClient client = new FakeWebSocketClient();
WebSocketSendQueue queue = null;
- CountDownLatch receiveAttempted = new CountDownLatch(1);
-
- try {
+ try (FakeWebSocketClient client = new FakeWebSocketClient()) {
+ CountDownLatch receiveAttempted = new CountDownLatch(1);
window.addInFlight(0);
client.setTryReceiveBehavior(handler -> {
receiveAttempted.countDown();
@@ -198,7 +180,6 @@ public void testFlushFailsOnReceiveIoError() throws Exception {
}
} finally {
closeQuietly(queue);
- client.close();
}
});
}
@@ -206,11 +187,8 @@ public void testFlushFailsOnReceiveIoError() throws Exception {
@Test
public void testFlushFailsOnSendIoError() throws Exception {
assertMemoryLeak(() -> {
- FakeWebSocketClient client = new FakeWebSocketClient();
- MicrobatchBuffer batch = sealedBuffer((byte) 42);
WebSocketSendQueue queue = null;
-
- try {
+ try (FakeWebSocketClient client = new FakeWebSocketClient(); MicrobatchBuffer batch = sealedBuffer((byte) 42)) {
client.setSendBehavior((dataPtr, length) -> {
throw new RuntimeException("send-fail");
});
@@ -228,8 +206,6 @@ public void testFlushFailsOnSendIoError() throws Exception {
}
} finally {
closeQuietly(queue);
- batch.close();
- client.close();
}
});
}
@@ -238,12 +214,10 @@ public void testFlushFailsOnSendIoError() throws Exception {
public void testFlushFailsWhenServerClosesConnection() throws Exception {
assertMemoryLeak(() -> {
InFlightWindow window = new InFlightWindow(8, 5_000);
- FakeWebSocketClient client = new FakeWebSocketClient();
WebSocketSendQueue queue = null;
- CountDownLatch closeDelivered = new CountDownLatch(1);
- AtomicBoolean fired = new AtomicBoolean(false);
-
- try {
+ try (FakeWebSocketClient client = new FakeWebSocketClient()) {
+ CountDownLatch closeDelivered = new CountDownLatch(1);
+ AtomicBoolean fired = new AtomicBoolean(false);
window.addInFlight(0);
client.setTryReceiveBehavior(handler -> {
if (fired.compareAndSet(false, true)) {
@@ -265,7 +239,6 @@ public void testFlushFailsWhenServerClosesConnection() throws Exception {
}
} finally {
closeQuietly(queue);
- client.close();
}
});
}
@@ -288,7 +261,7 @@ public void testEnqueueAfterServerErrorAckSurfacesServerError() throws Exception
client.setTryReceiveBehavior(handler -> {
long sent = highestSent.get();
if (sent >= 0 && fired.compareAndSet(false, true)) {
- emitError(handler, sent, WebSocketResponse.STATUS_WRITE_ERROR, "disk full");
+ emitError(handler, sent);
errorDelivered.countDown();
return true;
}
@@ -397,11 +370,406 @@ public void testAwaitPendingAcksKeepsDrainNonBlocking() throws Exception {
});
}
+ @Test
+ public void testStatusOkWithTableEntriesUpdatesCommittedSeqTxn() throws Exception {
+ assertMemoryLeak(() -> {
+ InFlightWindow window = new InFlightWindow(8, 5_000);
+ WebSocketSendQueue queue = null;
+ try (FakeWebSocketClient client = new FakeWebSocketClient()) {
+ CountDownLatch ackDelivered = new CountDownLatch(1);
+ AtomicBoolean fired = new AtomicBoolean(false);
+ window.addInFlight(0);
+ client.setTryReceiveBehavior(handler -> {
+ if (fired.compareAndSet(false, true)) {
+ emitAckWithTables(handler,
+ new String[]{"trades", "orders"},
+ new long[]{10L, 20L});
+ ackDelivered.countDown();
+ return true;
+ }
+ return false;
+ });
+
+ queue = new WebSocketSendQueue(client, window, 1_000, 500);
+ assertTrue("Expected ACK callback",
+ ackDelivered.await(2, TimeUnit.SECONDS));
+
+ long deadline = System.currentTimeMillis() + 2_000;
+ while (queue.getCommittedSeqTxn("trades") < 0
+ && System.currentTimeMillis() < deadline) {
+ Os.sleep(5);
+ }
+
+ assertEquals(10L, queue.getCommittedSeqTxn("trades"));
+ assertEquals(20L, queue.getCommittedSeqTxn("orders"));
+ assertEquals(-1L, queue.getCommittedSeqTxn("other"));
+ assertEquals(0, window.getInFlightCount());
+ } finally {
+ window.acknowledgeUpTo(Long.MAX_VALUE);
+ closeQuietly(queue);
+ }
+ });
+ }
+
+ @Test
+ public void testDurableAckUpdatesPerTableSeqTxn() throws Exception {
+ assertMemoryLeak(() -> {
+ InFlightWindow window = new InFlightWindow(8, 5_000);
+ WebSocketSendQueue queue = null;
+ try (FakeWebSocketClient client = new FakeWebSocketClient()) {
+ CountDownLatch durableDelivered = new CountDownLatch(1);
+ AtomicBoolean fired = new AtomicBoolean(false);
+ window.addInFlight(0);
+ client.setTryReceiveBehavior(handler -> {
+ if (fired.compareAndSet(false, true)) {
+ emitDurableAck(handler, "trades", 10);
+ durableDelivered.countDown();
+ return true;
+ }
+ return false;
+ });
+
+ queue = new WebSocketSendQueue(client, window, 1_000, 500);
+ assertTrue("Expected durable ACK callback",
+ durableDelivered.await(2, TimeUnit.SECONDS));
+
+ long deadline = System.currentTimeMillis() + 2_000;
+ while (queue.getDurableSeqTxn("trades") < 0 && System.currentTimeMillis() < deadline) {
+ Os.sleep(5);
+ }
+
+ assertEquals(10, queue.getDurableSeqTxn("trades"));
+ assertEquals(-1, queue.getDurableSeqTxn("other"));
+ assertEquals(1, window.getInFlightCount());
+ } finally {
+ window.acknowledgeUpTo(Long.MAX_VALUE);
+ closeQuietly(queue);
+ }
+ });
+ }
+
+ @Test
+ public void testDurableAckIsMonotonic() throws Exception {
+ assertMemoryLeak(() -> {
+ InFlightWindow window = new InFlightWindow(8, 5_000);
+ WebSocketSendQueue queue = null;
+ try (FakeWebSocketClient client = new FakeWebSocketClient()) {
+ AtomicInteger callCount = new AtomicInteger();
+ CountDownLatch allDelivered = new CountDownLatch(1);
+ window.addInFlight(0);
+ window.addInFlight(1);
+ window.addInFlight(2);
+
+ client.setTryReceiveBehavior(handler -> {
+ int n = callCount.getAndIncrement();
+ switch (n) {
+ case 0:
+ emitDurableAck(handler, "t", 20);
+ return true;
+ case 1:
+ emitDurableAck(handler, "t", 10);
+ allDelivered.countDown();
+ return true;
+ default:
+ return false;
+ }
+ });
+
+ queue = new WebSocketSendQueue(client, window, 1_000, 500);
+ assertTrue(allDelivered.await(2, TimeUnit.SECONDS));
+
+ long deadline = System.currentTimeMillis() + 2_000;
+ while (queue.getDurableSeqTxn("t") < 20 && System.currentTimeMillis() < deadline) {
+ Os.sleep(5);
+ }
+
+ assertEquals(20, queue.getDurableSeqTxn("t"));
+ } finally {
+ window.acknowledgeUpTo(Long.MAX_VALUE);
+ closeQuietly(queue);
+ }
+ });
+ }
+
+ @Test
+ public void testDurableAckInterleavedWithStatusOk() throws Exception {
+ assertMemoryLeak(() -> {
+ InFlightWindow window = new InFlightWindow(8, 5_000);
+ WebSocketSendQueue queue = null;
+ try (FakeWebSocketClient client = new FakeWebSocketClient()) {
+ AtomicInteger callCount = new AtomicInteger();
+ CountDownLatch allDelivered = new CountDownLatch(1);
+ window.addInFlight(0);
+ window.addInFlight(1);
+
+ client.setTryReceiveBehavior(handler -> {
+ int n = callCount.getAndIncrement();
+ switch (n) {
+ case 0:
+ emitAck(handler, 0);
+ return true;
+ case 1:
+ emitDurableAck(handler, "t", 10);
+ return true;
+ case 2:
+ emitAck(handler, 1);
+ return true;
+ case 3:
+ emitDurableAck(handler, "t", 20);
+ allDelivered.countDown();
+ return true;
+ default:
+ return false;
+ }
+ });
+
+ queue = new WebSocketSendQueue(client, window, 1_000, 500);
+ assertTrue(allDelivered.await(2, TimeUnit.SECONDS));
+
+ long deadline = System.currentTimeMillis() + 2_000;
+ while ((queue.getDurableSeqTxn("t") < 20 || window.getInFlightCount() > 0)
+ && System.currentTimeMillis() < deadline) {
+ Os.sleep(5);
+ }
+
+ assertEquals(20, queue.getDurableSeqTxn("t"));
+ assertEquals(0, window.getInFlightCount());
+ } finally {
+ window.acknowledgeUpTo(Long.MAX_VALUE);
+ closeQuietly(queue);
+ }
+ });
+ }
+
+ @Test
+ public void testPingBlocksUntilPong() throws Exception {
+ assertMemoryLeak(() -> {
+ InFlightWindow window = new InFlightWindow(8, 5_000);
+ WebSocketSendQueue queue = null;
+ try (FakeWebSocketClient client = new FakeWebSocketClient()) {
+ AtomicInteger callCount = new AtomicInteger();
+ client.setTryReceiveBehavior(handler -> {
+ int n = callCount.getAndIncrement();
+ switch (n) {
+ case 0:
+ emitDurableAck(handler, "t", 7);
+ return true;
+ case 1:
+ handler.onPong(0, 0);
+ return true;
+ default:
+ return false;
+ }
+ });
+
+ queue = new WebSocketSendQueue(client, window, 1_000, 500);
+
+ queue.ping();
+
+ // After ping() returns, durable ACK must already be processed
+ assertEquals(7, queue.getDurableSeqTxn("t"));
+ } finally {
+ closeQuietly(queue);
+ }
+ });
+ }
+
+ @Test
+ public void testPingWithInFlightBatches() throws Exception {
+ assertMemoryLeak(() -> {
+ InFlightWindow window = new InFlightWindow(8, 5_000);
+ WebSocketSendQueue queue = null;
+ try (FakeWebSocketClient client = new FakeWebSocketClient()) {
+ window.addInFlight(0);
+ window.addInFlight(1);
+
+ AtomicBoolean pingSent = new AtomicBoolean(false);
+ client.setPingSendBehavior(() -> pingSent.set(true));
+
+ AtomicInteger callCount = new AtomicInteger();
+ client.setTryReceiveBehavior(handler -> {
+ int n = callCount.get();
+ switch (n) {
+ case 0:
+ emitAck(handler, 1);
+ callCount.incrementAndGet();
+ return true;
+ case 1:
+ emitDurableAck(handler, "t", 5);
+ callCount.incrementAndGet();
+ return true;
+ case 2:
+ // Pong can only arrive in response to a PING
+ if (!pingSent.get()) {
+ return false;
+ }
+ handler.onPong(0, 0);
+ callCount.incrementAndGet();
+ return true;
+ default:
+ return false;
+ }
+ });
+
+ queue = new WebSocketSendQueue(client, window, 1_000, 500);
+
+ queue.ping();
+
+ assertEquals(0, window.getInFlightCount());
+ assertEquals(5, queue.getDurableSeqTxn("t"));
+ } finally {
+ closeQuietly(queue);
+ }
+ });
+ }
+
+ @Test
+ public void testPingTimesOutWhenNoPong() throws Exception {
+ assertMemoryLeak(() -> {
+ InFlightWindow window = new InFlightWindow(8, 2_000);
+ WebSocketSendQueue queue = null;
+ try (FakeWebSocketClient client = new FakeWebSocketClient()) {
+ // Never emit a PONG
+ client.setTryReceiveBehavior(handler -> false);
+
+ queue = new WebSocketSendQueue(client, window, 1_000, 500);
+
+ try {
+ queue.ping();
+ fail("Expected ping timeout");
+ } catch (LineSenderException e) {
+ assertTrue(e.getMessage().contains("Ping timed out"));
+ }
+ } finally {
+ closeQuietly(queue);
+ }
+ });
+ }
+
+ @Test
+ public void testPingSurfacesTransportError() throws Exception {
+ assertMemoryLeak(() -> {
+ InFlightWindow window = new InFlightWindow(8, 5_000);
+ WebSocketSendQueue queue = null;
+ try (FakeWebSocketClient client = new FakeWebSocketClient()) {
+ client.setPingSendBehavior(() -> {
+ throw new RuntimeException("ping-send-fail");
+ });
+
+ queue = new WebSocketSendQueue(client, window, 1_000, 500);
+
+ try {
+ queue.ping();
+ fail("Expected error from ping");
+ } catch (LineSenderException e) {
+ assertTrue(e.getMessage().contains("Ping failed")
+ || e.getMessage().contains("Error in send queue"));
+ }
+ } finally {
+ closeQuietly(queue);
+ }
+ });
+ }
+
+ @Test
+ public void testConcurrentPingCallersEachGetTheirOwnPing() throws Exception {
+ // Without serialization, two concurrent ping() callers can both wake up on
+ // the same PONG and return — the second caller observes a durable watermark
+ // taken before its own PING was processed. The pingLock around ping()
+ // guarantees each caller sends its own PING and waits for its own PONG.
+ //
+ // To trigger the bug deterministically the I/O thread is held inside the
+ // first sendPing call until all caller threads are parked, so the buggy
+ // code has all of them in the synchronized(processingLock) block before
+ // any PONG is processed and only one or two PINGs are emitted in total.
+ assertMemoryLeak(() -> {
+ InFlightWindow window = new InFlightWindow(8, 5_000);
+ WebSocketSendQueue queue = null;
+ try (FakeWebSocketClient client = new FakeWebSocketClient()) {
+ AtomicInteger pingsSent = new AtomicInteger();
+ AtomicInteger pendingPongs = new AtomicInteger();
+ CountDownLatch firstPingBarrier = new CountDownLatch(1);
+ client.setPingSendBehavior(() -> {
+ int n = pingsSent.incrementAndGet();
+ pendingPongs.incrementAndGet();
+ if (n == 1) {
+ try {
+ firstPingBarrier.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ });
+ client.setTryReceiveBehavior(handler -> {
+ if (pendingPongs.get() > 0 && pendingPongs.decrementAndGet() >= 0) {
+ handler.onPong(0, 0);
+ return true;
+ }
+ return false;
+ });
+
+ queue = new WebSocketSendQueue(client, window, 5_000, 500);
+ final WebSocketSendQueue q = queue;
+
+ int callerCount = 3;
+ CountDownLatch ready = new CountDownLatch(callerCount);
+ CountDownLatch start = new CountDownLatch(1);
+ AtomicReference err = new AtomicReference<>();
+ Thread[] threads = new Thread[callerCount];
+ for (int i = 0; i < callerCount; i++) {
+ threads[i] = new Thread(() -> {
+ try {
+ ready.countDown();
+ start.await();
+ q.ping();
+ } catch (Throwable t) {
+ err.set(t);
+ }
+ }, "ping-caller-" + i);
+ threads[i].start();
+ }
+ ready.await();
+ start.countDown();
+ // Wait until every caller is parked: either in processingLock.wait()
+ // (buggy path) or BLOCKED on pingLock (fixed path).
+ for (Thread t : threads) {
+ awaitThreadBlocked(t);
+ }
+ firstPingBarrier.countDown();
+ for (Thread t : threads) {
+ t.join(10_000);
+ assertFalse("ping caller " + t.getName() + " did not complete", t.isAlive());
+ }
+ if (err.get() != null) {
+ throw new AssertionError("ping caller threw", err.get());
+ }
+ assertEquals("each concurrent caller must send its own PING",
+ callerCount, pingsSent.get());
+ } finally {
+ closeQuietly(queue);
+ }
+ });
+ }
+
+ @Test
+ public void testDurableSeqTxnInitiallyMinusOne() throws Exception {
+ assertMemoryLeak(() -> {
+ InFlightWindow window = new InFlightWindow(8, 5_000);
+ WebSocketSendQueue queue = null;
+ try (FakeWebSocketClient client = new FakeWebSocketClient()) {
+ queue = new WebSocketSendQueue(client, window, 1_000, 500);
+ assertEquals(-1, queue.getDurableSeqTxn("any_table"));
+ } finally {
+ closeQuietly(queue);
+ }
+ });
+ }
+
private static void awaitThreadBlocked(Thread thread) {
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
while (System.nanoTime() < deadline) {
Thread.State state = thread.getState();
- if (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING) {
+ if (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING || state == Thread.State.BLOCKED) {
return;
}
Os.sleep(1);
@@ -427,6 +795,51 @@ private static void emitBinary(WebSocketFrameHandler handler, byte[] payload) {
}
}
+ private static void emitDurableAck(WebSocketFrameHandler handler, String tableName, long seqTxn) {
+ WebSocketResponse response = WebSocketResponse.durableAck(tableName, seqTxn);
+ int size = response.serializedSize();
+ long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT);
+ try {
+ response.writeTo(ptr);
+ handler.onBinaryMessage(ptr, size);
+ } finally {
+ Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT);
+ }
+ }
+
+ private static void emitAckWithTables(WebSocketFrameHandler handler,
+ String[] tableNames, long[] seqTxns) {
+ byte[][] nameBytes = new byte[tableNames.length][];
+ int size = 1 + 8 + 2;
+ for (int i = 0; i < tableNames.length; i++) {
+ nameBytes[i] = tableNames[i].getBytes(StandardCharsets.UTF_8);
+ size += 2 + nameBytes[i].length + 8;
+ }
+ long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT);
+ try {
+ int offset = 0;
+ Unsafe.getUnsafe().putByte(ptr + offset, WebSocketResponse.STATUS_OK);
+ offset += 1;
+ Unsafe.getUnsafe().putLong(ptr + offset, 0);
+ offset += 8;
+ Unsafe.getUnsafe().putShort(ptr + offset, (short) tableNames.length);
+ offset += 2;
+ for (int i = 0; i < tableNames.length; i++) {
+ Unsafe.getUnsafe().putShort(ptr + offset, (short) nameBytes[i].length);
+ offset += 2;
+ for (int j = 0; j < nameBytes[i].length; j++) {
+ Unsafe.getUnsafe().putByte(ptr + offset + j, nameBytes[i][j]);
+ }
+ offset += nameBytes[i].length;
+ Unsafe.getUnsafe().putLong(ptr + offset, seqTxns[i]);
+ offset += 8;
+ }
+ handler.onBinaryMessage(ptr, size);
+ } finally {
+ Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT);
+ }
+ }
+
private static void emitAck(WebSocketFrameHandler handler, long sequence) {
WebSocketResponse response = WebSocketResponse.success(sequence);
int size = response.serializedSize();
@@ -439,8 +852,8 @@ private static void emitAck(WebSocketFrameHandler handler, long sequence) {
}
}
- private static void emitError(WebSocketFrameHandler handler, long sequence, byte status, String errorMessage) {
- WebSocketResponse response = WebSocketResponse.error(sequence, status, errorMessage);
+ private static void emitError(WebSocketFrameHandler handler, long sequence) {
+ WebSocketResponse response = WebSocketResponse.error(sequence, WebSocketResponse.STATUS_WRITE_ERROR, "disk full");
int size = response.serializedSize();
long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT);
try {
@@ -474,6 +887,7 @@ private interface ReceiveBehavior {
private static class FakeWebSocketClient extends WebSocketClient {
private volatile TryReceiveBehavior behavior = handler -> false;
private volatile boolean connected = true;
+ private volatile Runnable pingSendBehavior = () -> {};
private volatile ReceiveBehavior receiveBehavior = (handler, timeout) -> false;
private volatile SendBehavior sendBehavior = (dataPtr, length) -> {
};
@@ -498,6 +912,15 @@ public void sendBinary(long dataPtr, int length) {
sendBehavior.send(dataPtr, length);
}
+ @Override
+ public void sendPing(int timeout) {
+ pingSendBehavior.run();
+ }
+
+ public void setPingSendBehavior(Runnable pingSendBehavior) {
+ this.pingSendBehavior = pingSendBehavior;
+ }
+
public void setSendBehavior(SendBehavior sendBehavior) {
this.sendBehavior = sendBehavior;
}
diff --git a/core/src/test/java/io/questdb/client/test/std/CharSequenceLongHashMapTest.java b/core/src/test/java/io/questdb/client/test/std/CharSequenceLongHashMapTest.java
new file mode 100644
index 00000000..509c972d
--- /dev/null
+++ b/core/src/test/java/io/questdb/client/test/std/CharSequenceLongHashMapTest.java
@@ -0,0 +1,152 @@
+/*+*****************************************************************************
+ * ___ _ ____ ____
+ * / _ \ _ _ ___ ___| |_| _ \| __ )
+ * | | | | | | |/ _ \/ __| __| | | | _ \
+ * | |_| | |_| | __/\__ \ |_| |_| | |_) |
+ * \__\_\\__,_|\___||___/\__|____/|____/
+ *
+ * Copyright (c) 2014-2019 Appsicle
+ * Copyright (c) 2019-2026 QuestDB
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ ******************************************************************************/
+
+package io.questdb.client.test.std;
+
+import io.questdb.client.std.CharSequenceLongHashMap;
+import io.questdb.client.std.ObjList;
+import io.questdb.client.std.Rnd;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CharSequenceLongHashMapTest {
+
+ @Test
+ public void testClear() {
+ CharSequenceLongHashMap map = new CharSequenceLongHashMap();
+ map.put("a", 1);
+ map.put("b", 2);
+ Assert.assertEquals(2, map.size());
+
+ map.clear();
+ Assert.assertEquals(0, map.size());
+ Assert.assertEquals(CharSequenceLongHashMap.NO_ENTRY_VALUE, map.get("a"));
+ Assert.assertEquals(CharSequenceLongHashMap.NO_ENTRY_VALUE, map.get("b"));
+ }
+
+ @Test
+ public void testContains() {
+ CharSequenceLongHashMap map = new CharSequenceLongHashMap();
+ Rnd rnd = new Rnd();
+ final int N = 1000;
+
+ for (int i = 0; i < N; i++) {
+ String s = rnd.nextString(10).substring(1, 9);
+ map.put(s, i);
+ }
+
+ ObjList keys = map.keys();
+ for (int i = 0; i < keys.size(); i++) {
+ Assert.assertTrue(map.contains(keys.get(i)));
+ }
+ }
+
+ @Test
+ public void testCustomNoEntryValue() {
+ long customNoEntry = -42L;
+ CharSequenceLongHashMap map = new CharSequenceLongHashMap(8, 0.4, customNoEntry);
+ Assert.assertEquals(customNoEntry, map.get("missing"));
+ map.put("x", 100);
+ Assert.assertEquals(100, map.get("x"));
+ Assert.assertEquals(customNoEntry, map.get("y"));
+ }
+
+ @Test
+ public void testGetMissingKeyReturnsNoEntryValue() {
+ CharSequenceLongHashMap map = new CharSequenceLongHashMap();
+ Assert.assertEquals(CharSequenceLongHashMap.NO_ENTRY_VALUE, map.get("missing"));
+ map.put("present", 42);
+ Assert.assertEquals(CharSequenceLongHashMap.NO_ENTRY_VALUE, map.get("absent"));
+ }
+
+ @Test
+ public void testPutAndGet() {
+ Rnd rnd = new Rnd();
+ CharSequenceLongHashMap map = new CharSequenceLongHashMap();
+ final int N = 1000;
+ for (int i = 0; i < N; i++) {
+ CharSequence cs = rnd.nextString(15);
+ boolean isNew = map.put(cs, rnd.nextLong());
+ Assert.assertTrue(isNew);
+ }
+ Assert.assertEquals(N, map.size());
+
+ rnd.reset();
+
+ // verify all values are retrievable
+ for (int i = 0; i < N; i++) {
+ CharSequence cs = rnd.nextString(15);
+ Assert.assertEquals(rnd.nextLong(), map.get(cs));
+ }
+ }
+
+ @Test
+ public void testPutUpdatesExistingKey() {
+ CharSequenceLongHashMap map = new CharSequenceLongHashMap();
+ Assert.assertTrue(map.put("key", 1));
+ Assert.assertFalse(map.put("key", 2));
+ Assert.assertEquals(2, map.get("key"));
+ Assert.assertEquals(1, map.size());
+ }
+
+ @Test
+ public void testRehash() {
+ // use a small initial capacity to force multiple rehashes
+ CharSequenceLongHashMap map = new CharSequenceLongHashMap(8);
+ Rnd rnd = new Rnd();
+ final int N = 500;
+ for (int i = 0; i < N; i++) {
+ map.put(rnd.nextString(15), rnd.nextLong());
+ }
+ Assert.assertEquals(N, map.size());
+
+ // verify all values survive rehash
+ rnd.reset();
+ for (int i = 0; i < N; i++) {
+ CharSequence cs = rnd.nextString(15);
+ long expected = rnd.nextLong();
+ Assert.assertEquals(expected, map.get(cs));
+ }
+ }
+
+ @Test
+ public void testValueAtWithKeyIndex() {
+ CharSequenceLongHashMap map = new CharSequenceLongHashMap();
+ map.put("alpha", 10);
+ map.put("beta", 20);
+
+ int indexAlpha = map.keyIndex("alpha");
+ Assert.assertTrue(indexAlpha < 0);
+ Assert.assertEquals(10, map.valueAt(indexAlpha));
+
+ int indexBeta = map.keyIndex("beta");
+ Assert.assertTrue(indexBeta < 0);
+ Assert.assertEquals(20, map.valueAt(indexBeta));
+
+ // missing key returns a positive index
+ int indexMissing = map.keyIndex("gamma");
+ Assert.assertTrue(indexMissing >= 0);
+ Assert.assertEquals(CharSequenceLongHashMap.NO_ENTRY_VALUE, map.valueAt(indexMissing));
+ }
+}