diff --git a/core/src/main/java/io/questdb/client/Sender.java b/core/src/main/java/io/questdb/client/Sender.java index 1175db7f..cf320640 100644 --- a/core/src/main/java/io/questdb/client/Sender.java +++ b/core/src/main/java/io/questdb/client/Sender.java @@ -619,6 +619,7 @@ public int getTimeout() { private PrivateKey privateKey; private int protocol = PARAMETER_NOT_SET_EXPLICITLY; private int protocolVersion = PARAMETER_NOT_SET_EXPLICITLY; + private boolean requestDurableAck; private int retryTimeoutMillis = PARAMETER_NOT_SET_EXPLICITLY; private boolean shouldDestroyPrivKey; private boolean tlsEnabled; @@ -932,7 +933,8 @@ public Sender build() { actualAutoFlushIntervalNanos, actualInFlightWindowSize, wsAuthHeader, - actualMaxSchemasPerConnection + actualMaxSchemasPerConnection, + requestDurableAck ); } @@ -1483,6 +1485,27 @@ public LineSenderBuilder protocolVersion(int protocolVersion) { return this; } + /** + * Opts the connection in for STATUS_DURABLE_ACK frames. When enabled, + * servers with primary replication will emit per-table durable-upload + * watermarks as WAL data reaches the object store. + *

+ * This setting is only supported for WebSocket transport. + *

+ * Observe durable progress via + * {@link QwpWebSocketSender#getHighestDurableSeqTxn(CharSequence)}. + * + * @param enabled true to request durable ACKs + * @return this instance for method chaining + */ + public LineSenderBuilder requestDurableAck(boolean enabled) { + if (protocol != PARAMETER_NOT_SET_EXPLICITLY && protocol != PROTOCOL_WEBSOCKET) { + throw new LineSenderException("request_durable_ack is only supported for WebSocket transport"); + } + this.requestDurableAck = enabled; + return this; + } + /** * Configures the maximum time the Sender will spend retrying upon receiving a recoverable error from the server. *
@@ -1875,6 +1898,18 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) { pos = getValue(configurationString, pos, sink, "in_flight_window"); int windowSize = parseIntValue(sink, "in_flight_window"); inFlightWindowSize(windowSize); + } else if (Chars.equals("request_durable_ack", sink)) { + if (protocol != PROTOCOL_WEBSOCKET) { + throw new LineSenderException("request_durable_ack is only supported for WebSocket transport"); + } + pos = getValue(configurationString, pos, sink, "request_durable_ack"); + if (Chars.equalsIgnoreCase("on", sink)) { + requestDurableAck(true); + } else if (Chars.equalsIgnoreCase("off", sink)) { + requestDurableAck(false); + } else { + throw new LineSenderException("invalid request_durable_ack [value=").put(sink).put(", allowed-values=[on, off]]"); + } } else if (Chars.equals("max_schemas_per_connection", sink)) { if (protocol != PROTOCOL_WEBSOCKET) { throw new LineSenderException("max_schemas_per_connection is only supported for WebSocket transport"); @@ -1972,6 +2007,9 @@ private void validateParameters() { .put(", requestedCapacity=").put(bufferCapacity) .put("]"); } + if (requestDurableAck && protocol != PROTOCOL_WEBSOCKET) { + throw new LineSenderException("request_durable_ack is only supported for WebSocket transport"); + } if (protocol == PROTOCOL_HTTP) { if (httpClientConfiguration.getMaximumRequestBufferSize() < httpClientConfiguration.getInitialRequestBufferSize()) { throw new LineSenderException("maximum buffer capacity cannot be less than initial buffer capacity ") diff --git a/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketClient.java b/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketClient.java index fc93a1e5..986a34dd 100644 --- a/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketClient.java +++ b/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketClient.java @@ -107,6 +107,8 @@ public abstract class WebSocketClient implements 
QuietCloseable { // QWP version negotiation private String qwpClientId; private int qwpMaxVersion = 1; + // Opt-in for STATUS_DURABLE_ACK frames; sent as X-QWP-Request-Durable-Ack: true + private boolean qwpRequestDurableAck; // Receive buffer (native memory) private long recvBufPtr; private int recvBufSize; @@ -390,6 +392,16 @@ public void setQwpMaxVersion(int maxVersion) { this.qwpMaxVersion = maxVersion; } + /** + * Enables the opt-in X-QWP-Request-Durable-Ack upgrade header. When set, + * servers with primary replication configured will additionally emit + * STATUS_DURABLE_ACK frames as the WAL containing committed client + * messages reaches the object store. + */ + public void setQwpRequestDurableAck(boolean enabled) { + this.qwpRequestDurableAck = enabled; + } + /** * Non-blocking attempt to receive a WebSocket frame. * Returns immediately if no complete frame is available. @@ -476,6 +488,9 @@ public void upgrade(CharSequence path, int timeout, CharSequence authorizationHe sendBuffer.putAscii(qwpClientId); sendBuffer.putAscii("\r\n"); } + if (qwpRequestDurableAck) { + sendBuffer.putAscii("X-QWP-Request-Durable-Ack: true\r\n"); + } if (authorizationHeader != null) { sendBuffer.putAscii("Authorization: "); sendBuffer.putAscii(authorizationHeader); diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/InFlightWindow.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/InFlightWindow.java index 4a9b0868..1d8e5c46 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/InFlightWindow.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/InFlightWindow.java @@ -377,6 +377,14 @@ public Throwable getLastError() { return lastError.get(); } + /** + * Returns the highest batch sequence acknowledged by the server, or -1 if + * no acknowledgment has been received yet. + */ + public long getHighestAckedSequence() { + return highestAcked; + } + /** * Returns the maximum window size. 
*/ @@ -384,6 +392,13 @@ public int getMaxWindowSize() { return maxWindowSize; } + /** + * Returns the timeout (ms) applied to blocking window operations. + */ + public long getTimeoutMs() { + return timeoutMs; + } + /** * Returns the total number of batches acknowledged. */ diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index 86885e6e..93189006 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -35,6 +35,7 @@ import io.questdb.client.cutlass.line.array.LongArray; import io.questdb.client.cutlass.qwp.protocol.QwpConstants; import io.questdb.client.cutlass.qwp.protocol.QwpTableBuffer; +import io.questdb.client.std.CharSequenceLongHashMap; import io.questdb.client.std.CharSequenceObjHashMap; import io.questdb.client.std.Chars; import io.questdb.client.std.Decimal128; @@ -151,7 +152,7 @@ public class QwpWebSocketSender implements Sender { // Flow control private InFlightWindow inFlightWindow; private int maxSentSchemaId = -1; - // Track highest symbol ID sent to server (for delta encoding) + // Track the highest symbol ID sent to server (for delta encoding) // Once sent over TCP, server is guaranteed to receive it (or connection dies) private int maxSentSymbolId = -1; // Batch sequence counter (must match server's messageSequence) @@ -160,7 +161,11 @@ public class QwpWebSocketSender implements Sender { // Async mode: pending row tracking private long pendingBytes; private int pendingRowCount; + private final CharSequenceLongHashMap syncCommittedSeqTxns = new CharSequenceLongHashMap(); + private final CharSequenceLongHashMap syncDurableSeqTxns = new CharSequenceLongHashMap(); + private boolean requestDurableAck; private boolean sawBinaryAck; + private boolean sawPong; private WebSocketSendQueue sendQueue; private 
QwpWebSocketSender( @@ -303,6 +308,33 @@ public static QwpWebSocketSender connect( return sender; } + public static QwpWebSocketSender connect( + String host, + int port, + ClientTlsConfiguration tlsConfig, + int autoFlushRows, + int autoFlushBytes, + long autoFlushIntervalNanos, + int inFlightWindowSize, + String authorizationHeader, + int maxSchemasPerConnection, + boolean requestDurableAck + ) { + QwpWebSocketSender sender = new QwpWebSocketSender( + host, port, tlsConfig, + autoFlushRows, autoFlushBytes, autoFlushIntervalNanos, + inFlightWindowSize, authorizationHeader, maxSchemasPerConnection + ); + try { + sender.setRequestDurableAck(requestDurableAck); + sender.ensureConnected(); + } catch (Throwable t) { + sender.close(); + throw t; + } + return sender; + } + /** * Creates a sender without connecting. For testing only. *

@@ -315,10 +347,14 @@ public static QwpWebSocketSender connect( * @return unconnected sender */ public static QwpWebSocketSender createForTesting(String host, int port, int inFlightWindowSize) { + return createForTesting(host, port, inFlightWindowSize, null); + } + + public static QwpWebSocketSender createForTesting(String host, int port, int inFlightWindowSize, String authorizationHeader) { return new QwpWebSocketSender( host, port, null, DEFAULT_AUTO_FLUSH_ROWS, DEFAULT_AUTO_FLUSH_BYTES, DEFAULT_AUTO_FLUSH_INTERVAL_NANOS, - inFlightWindowSize, null, DEFAULT_MAX_SCHEMAS_PER_CONNECTION + inFlightWindowSize, authorizationHeader, DEFAULT_MAX_SCHEMAS_PER_CONNECTION ); } @@ -817,6 +853,31 @@ public int getAutoFlushRows() { return autoFlushRows; } + /** + * Returns the highest seqTxn committed (written to WAL) for the given + * table, or -1 if no commit has been acknowledged for that table yet. + */ + public long getHighestAckedSeqTxn(CharSequence tableName) { + if (sendQueue != null) { + return sendQueue.getCommittedSeqTxn(tableName); + } + return syncCommittedSeqTxns.get(tableName); + } + + /** + * Returns the highest seqTxn durably uploaded to object store for the + * given table, or -1 if no durable ACK has been observed for that table. + * Only meaningful when the connection was opened with + * {@link #setRequestDurableAck(boolean)} = true on a server where primary + * replication is enabled. + */ + public long getHighestDurableSeqTxn(CharSequence tableName) { + if (sendQueue != null) { + return sendQueue.getDurableSeqTxn(tableName); + } + return syncDurableSeqTxns.get(tableName); + } + /** * Returns the max symbol ID sent to the server. * Once sent over TCP, server is guaranteed to receive it (or connection dies). @@ -998,6 +1059,31 @@ public QwpWebSocketSender longColumn(CharSequence columnName, long value) { return this; } + /** + * Sends a WebSocket PING and blocks until the PONG arrives, processing + * any STATUS_DURABLE_ACK or STATUS_OK frames along the way. 
+ *

+ * The server flushes pending durable ACKs before sending the PONG, so + * after this method returns, {@link #getHighestDurableSeqTxn(CharSequence)} + * reflects all durable progress up to the moment the server processed + * the PING. + *

+ * In async mode the PING is sent by the I/O thread; the I/O loop + * continues its normal work (sending batches, draining ACKs) while + * waiting for the PONG. + * + * @throws LineSenderException if the connection is closed or the ping times out + */ + public void ping() { + checkNotClosed(); + ensureConnected(); + if (inFlightWindowSize > 1) { + sendQueue.ping(); + } else { + syncPing(); + } + } + @Override public void reset() { checkNotClosed(); @@ -1026,6 +1112,27 @@ public void setGorillaEnabled(boolean enabled) { this.encoder.setGorillaEnabled(enabled); } + /** + * Opts the connection in for STATUS_DURABLE_ACK frames. Must be called + * before any send operation — the flag is consulted once, during WebSocket + * upgrade. Setting this true on a server without primary replication + * enabled is a no-op: the server silently ignores the header. + *

+ * Observe durable progress via {@link #getHighestDurableSeqTxn(CharSequence)}. + * + * @throws LineSenderException if the connection is already established or closed + */ + public void setRequestDurableAck(boolean enabled) { + if (closed) { + throw new LineSenderException("Sender is closed"); + } + if (connected) { + throw new LineSenderException( + "setRequestDurableAck must be called before the first send"); + } + this.requestDurableAck = enabled; + } + /** * Adds a SHORT column value to the current row. * @@ -1263,6 +1370,7 @@ private void ensureConnected() { try { client.setQwpMaxVersion(QwpConstants.MAX_SUPPORTED_VERSION); client.setQwpClientId(QwpConstants.CLIENT_ID); + client.setQwpRequestDurableAck(requestDurableAck); client.connect(host, port); client.upgrade(WRITE_PATH, authorizationHeader); } catch (Exception e) { @@ -1695,6 +1803,58 @@ private boolean shouldAutoFlush() { return false; } + private void syncPing() { + client.sendPing(1000); + long deadline = System.currentTimeMillis() + InFlightWindow.DEFAULT_TIMEOUT_MS; + LineSenderException pingError = null; + while (System.currentTimeMillis() < deadline) { + sawPong = false; + sawBinaryAck = false; + boolean received = client.receiveFrame(ackHandler, 1000); + if (received) { + if (sawBinaryAck) { + if (ackResponse.isDurableAck()) { + updateSyncSeqTxns(syncDurableSeqTxns); + } else if (ackResponse.isSuccess()) { + inFlightWindow.acknowledgeUpTo(ackResponse.getSequence()); + updateSyncSeqTxns(syncCommittedSeqTxns); + } else { + // Server-side error on a pending batch (parse / + // schema / security / internal / write error). + // Route through inFlightWindow.fail so subsequent + // waitForAck / flush calls also see it, capture the + // first error and throw it after PONG so the caller + // of ping() can react. We finish draining the round + // before throwing so durable/committed progress + // observed in this ping is preserved. 
+ LineSenderException err = new LineSenderException(ackResponse.getErrorMessage()); + inFlightWindow.fail(ackResponse.getSequence(), err); + if (pingError == null) { + pingError = err; + } + } + } + if (sawPong) { + if (pingError != null) { + throw pingError; + } + return; + } + } + } + throw new LineSenderException("Ping timed out"); + } + + private void updateSyncSeqTxns(CharSequenceLongHashMap seqTxns) { + for (int i = 0, n = ackResponse.getTableEntryCount(); i < n; i++) { + String name = ackResponse.getTableName(i); + long seqTxn = ackResponse.getTableSeqTxn(i); + if (seqTxn > seqTxns.get(name)) { + seqTxns.put(name, seqTxn); + } + } + } + private long toMicros(long value, ChronoUnit unit) { switch (unit) { case NANOS: @@ -1740,19 +1900,20 @@ private void waitForAck(long expectedSequence) { boolean received = client.receiveFrame(ackHandler, 1000); // 1 second timeout per read attempt if (received) { - // Non-binary frames (e.g. ping/pong/text) are not ACKs. if (!sawBinaryAck) { continue; } - long sequence = ackResponse.getSequence(); if (ackResponse.isSuccess()) { - // Cumulative ACK - acknowledge all batches up to this sequence + long sequence = ackResponse.getSequence(); inFlightWindow.acknowledgeUpTo(sequence); + updateSyncSeqTxns(syncCommittedSeqTxns); if (sequence >= expectedSequence) { - return; // Our batch was acknowledged (cumulative) + return; } - // Got ACK for lower sequence - continue waiting + } else if (ackResponse.isDurableAck()) { + updateSyncSeqTxns(syncDurableSeqTxns); } else { + long sequence = ackResponse.getSequence(); String errorMessage = ackResponse.getErrorMessage(); LineSenderException error = new LineSenderException( "Server error for batch " + sequence + ": " + @@ -1786,19 +1947,22 @@ private static class AckFrameHandler implements WebSocketFrameHandler { @Override public void onBinaryMessage(long payloadPtr, int payloadLen) { sender.sawBinaryAck = true; - if (!WebSocketResponse.isStructurallyValid(payloadPtr, payloadLen)) { + // 
readFrom validates inline; a single pass parses and bounds-checks. + if (!sender.ackResponse.readFrom(payloadPtr, payloadLen)) { throw new LineSenderException( "Invalid ACK response payload [length=" + payloadLen + ']' ); } - if (!sender.ackResponse.readFrom(payloadPtr, payloadLen)) { - throw new LineSenderException("Failed to parse ACK response"); - } } @Override public void onClose(int code, String reason) { throw new LineSenderException("WebSocket closed while waiting for ACK [code=" + code + ", reason=" + reason + ']'); } + + @Override + public void onPong(long payloadPtr, int payloadLen) { + sender.sawPong = true; + } } } diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketResponse.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketResponse.java index 9ed6b9b9..1c892690 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketResponse.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketResponse.java @@ -24,41 +24,56 @@ package io.questdb.client.cutlass.qwp.client; +import io.questdb.client.std.LongList; +import io.questdb.client.std.ObjList; import io.questdb.client.std.Unsafe; +import io.questdb.client.std.Utf8SequenceObjHashMap; +import io.questdb.client.std.str.DirectUtf8String; import io.questdb.client.std.str.Utf8s; +import org.jetbrains.annotations.TestOnly; import java.nio.charset.StandardCharsets; /** * Binary response format for WebSocket QWP v1 protocol. *

- * Response format (little-endian): + * STATUS_OK response format (little-endian): + *

+ * +--------+----------+------------+--------------------------------------+
+ * | status | sequence | tableCount | table entries                         |
+ * | 1 byte | 8 bytes  | 2 bytes    | [nameLen(2)+name(N)+seqTxn(8)] * cnt |
+ * +--------+----------+------------+--------------------------------------+
+ * 
+ *

+ * STATUS_DURABLE_ACK response format: + *

+ * +--------+------------+--------------------------------------+
+ * | status | tableCount | table entries                         |
+ * | 1 byte | 2 bytes    | [nameLen(2)+name(N)+seqTxn(8)] * cnt |
+ * +--------+------------+--------------------------------------+
+ * 
+ *

+ * Error response format: *

  * +--------+----------+------------------+
- * | status | sequence | error (if any)   |
+ * | status | sequence | error            |
  * | 1 byte | 8 bytes  | 2 bytes + UTF-8  |
  * +--------+----------+------------------+
  * 
- *

- * Status codes: - *

- *

- * The sequence number allows correlation with the original request. - * Error message is only present when status != 0. */ public class WebSocketResponse { public static final int MAX_ERROR_MESSAGE_LENGTH = 1024; + public static final int MIN_DURABLE_ACK_SIZE = 3; // status + tableCount public static final int MIN_ERROR_RESPONSE_SIZE = 11; // status + sequence + error length - // Minimum response size: status (1) + sequence (8) - public static final int MIN_RESPONSE_SIZE = 9; + public static final int MIN_OK_RESPONSE_SIZE = 11; // status + sequence + tableCount + /** + * Per-table durable-upload acknowledgment. Emitted by servers where + * primary replication is enabled and the connection opted in via + * X-QWP-Request-Durable-Ack. Payload: status + tableCount + per-table + * entries (nameLen + name + seqTxn). + */ + public static final byte STATUS_DURABLE_ACK = 0x02; public static final byte STATUS_INTERNAL_ERROR = 0x06; // Status codes (must match QWP_SPECIFICATION.md) public static final byte STATUS_OK = 0x00; @@ -66,6 +81,14 @@ public class WebSocketResponse { public static final byte STATUS_SCHEMA_MISMATCH = 0x03; public static final byte STATUS_SECURITY_ERROR = 0x08; public static final byte STATUS_WRITE_ERROR = 0x09; + private final DirectUtf8String lookupKey = new DirectUtf8String(); + // Caches decoded table names so the same native-memory byte sequence resolves to + // the same interned String across frames. First sight of a name allocates a String + // (decoded via Utf8s.stringFromUtf8Bytes) and an owned Utf8String map key; every + // subsequent sight is allocation-free. 
+ private final Utf8SequenceObjHashMap tableNameCache = new Utf8SequenceObjHashMap<>(); + private final ObjList tableNames = new ObjList<>(); + private final LongList tableSeqTxns = new LongList(); private String errorMessage; private int errorMessageUtf8Length; private long sequence; @@ -78,9 +101,23 @@ public WebSocketResponse() { this.errorMessageUtf8Length = -1; } + /** + * Creates a durable-upload ACK response with a single table entry. + */ + @TestOnly + public static WebSocketResponse durableAck(String tableName, long seqTxn) { + WebSocketResponse response = new WebSocketResponse(); + response.status = STATUS_DURABLE_ACK; + response.sequence = -1; + response.tableNames.add(tableName); + response.tableSeqTxns.add(seqTxn); + return response; + } + /** * Creates an error response. */ + @TestOnly public static WebSocketResponse error(long sequence, byte status, String errorMessage) { WebSocketResponse response = new WebSocketResponse(); response.status = status; @@ -92,38 +129,43 @@ public static WebSocketResponse error(long sequence, byte status, String errorMe /** * Validates binary response framing without allocating. - *

- * Accepted formats: - *

* * @param ptr response buffer pointer * @param length response frame payload length * @return true if payload structure is valid */ public static boolean isStructurallyValid(long ptr, int length) { - if (length < MIN_RESPONSE_SIZE) { + if (length < 1) { return false; } byte status = Unsafe.getUnsafe().getByte(ptr); if (status == STATUS_OK) { - return length == MIN_RESPONSE_SIZE; + if (length < MIN_OK_RESPONSE_SIZE) { + return false; + } + return validateTableEntries(ptr + 9, length - 9); } + if (status == STATUS_DURABLE_ACK) { + if (length < MIN_DURABLE_ACK_SIZE) { + return false; + } + return validateTableEntries(ptr + 1, length - 1); + } + + // Error response if (length < MIN_ERROR_RESPONSE_SIZE) { return false; } - - int msgLen = Unsafe.getUnsafe().getShort(ptr + MIN_RESPONSE_SIZE) & 0xFFFF; + int msgLen = Unsafe.getUnsafe().getShort(ptr + 9) & 0xFFFF; return length == MIN_ERROR_RESPONSE_SIZE + msgLen; } /** * Creates a success response. */ + @TestOnly public static WebSocketResponse success(long sequence) { WebSocketResponse response = new WebSocketResponse(); response.status = STATUS_OK; @@ -140,12 +182,20 @@ public String getErrorMessage() { } /** - * Returns the sequence number. + * Returns the client batch sequence number (STATUS_OK and error frames), + * or -1 for STATUS_DURABLE_ACK frames. */ public long getSequence() { return sequence; } + /** + * Returns the raw status byte. + */ + public byte getStatus() { + return status; + } + /** * Returns a human-readable status name. 
*/ @@ -153,6 +203,8 @@ public String getStatusName() { switch (status) { case STATUS_OK: return "OK"; + case STATUS_DURABLE_ACK: + return "DURABLE_ACK"; case STATUS_PARSE_ERROR: return "PARSE_ERROR"; case STATUS_SCHEMA_MISMATCH: @@ -168,8 +220,27 @@ public String getStatusName() { } } + public int getTableEntryCount() { + return tableNames.size(); + } + + public String getTableName(int index) { + return tableNames.getQuick(index); + } + + public long getTableSeqTxn(int index) { + return tableSeqTxns.getQuick(index); + } + + /** + * Returns true when this is a per-table durable-upload ACK (STATUS_DURABLE_ACK). + */ + public boolean isDurableAck() { + return status == STATUS_DURABLE_ACK; + } + /** - * Returns true if this is a success response. + * Returns true if this is a success response (STATUS_OK). */ public boolean isSuccess() { return status == STATUS_OK; @@ -183,50 +254,70 @@ public boolean isSuccess() { * @return true if successfully parsed, false if not enough data */ public boolean readFrom(long ptr, int length) { - if (length < MIN_RESPONSE_SIZE) { + tableNames.clear(); + tableSeqTxns.clear(); + + if (length < 1) { return false; } - int offset = 0; - - // Status (1 byte) - status = Unsafe.getUnsafe().getByte(ptr + offset); - offset += 1; - - // Sequence (8 bytes, little-endian) - sequence = Unsafe.getUnsafe().getLong(ptr + offset); - offset += 8; + status = Unsafe.getUnsafe().getByte(ptr); - // Error message (if status != OK and more data available) - if (status != STATUS_OK && length >= offset + 2) { - int msgLen = Unsafe.getUnsafe().getShort(ptr + offset) & 0xFFFF; - offset += 2; + if (status == STATUS_OK) { + if (length < MIN_OK_RESPONSE_SIZE) { + return false; + } + sequence = Unsafe.getUnsafe().getLong(ptr + 1); + errorMessage = null; + errorMessageUtf8Length = -1; + return readTableEntries(ptr + 9, length - 9); + } - if (length >= offset + msgLen && msgLen > 0) { - byte[] msgBytes = new byte[msgLen]; - for (int i = 0; i < msgLen; i++) { - 
msgBytes[i] = Unsafe.getUnsafe().getByte(ptr + offset + i); - } - errorMessage = new String(msgBytes, StandardCharsets.UTF_8); - errorMessageUtf8Length = -1; - } else { - errorMessage = null; - errorMessageUtf8Length = 0; + if (status == STATUS_DURABLE_ACK) { + if (length < MIN_DURABLE_ACK_SIZE) { + return false; } - } else { + sequence = -1; errorMessage = null; errorMessageUtf8Length = -1; + return readTableEntries(ptr + 1, length - 1); } + // Error response + if (length < MIN_ERROR_RESPONSE_SIZE) { + return false; + } + sequence = Unsafe.getUnsafe().getLong(ptr + 1); + int offset = 9; + int msgLen = Unsafe.getUnsafe().getShort(ptr + offset) & 0xFFFF; + offset += 2; + if (length < offset + msgLen) { + return false; + } + if (msgLen > 0) { + byte[] msgBytes = new byte[msgLen]; + for (int i = 0; i < msgLen; i++) { + msgBytes[i] = Unsafe.getUnsafe().getByte(ptr + offset + i); + } + errorMessage = new String(msgBytes, StandardCharsets.UTF_8); + errorMessageUtf8Length = -1; + } else { + errorMessage = null; + errorMessageUtf8Length = 0; + } return true; } /** * Calculates the serialized size of this response. 
*/ + @TestOnly public int serializedSize() { if (status == STATUS_OK) { - return MIN_RESPONSE_SIZE; + return MIN_OK_RESPONSE_SIZE + tableEntriesSize(); + } + if (status == STATUS_DURABLE_ACK) { + return MIN_DURABLE_ACK_SIZE + tableEntriesSize(); } return MIN_ERROR_RESPONSE_SIZE + getErrorMessageUtf8Length(); } @@ -234,7 +325,9 @@ public int serializedSize() { @Override public String toString() { if (isSuccess()) { - return "WebSocketResponse{status=OK, seq=" + sequence + "}"; + return "WebSocketResponse{status=OK, seq=" + sequence + ", tables=" + tableNames.size() + "}"; + } else if (isDurableAck()) { + return "WebSocketResponse{status=DURABLE_ACK, tables=" + tableNames.size() + "}"; } else { return "WebSocketResponse{status=" + getStatusName() + ", seq=" + sequence + ", error=" + errorMessage + "}"; @@ -247,19 +340,23 @@ public String toString() { * @param ptr destination address * @return number of bytes written */ + @TestOnly public int writeTo(long ptr) { int offset = 0; - // Status (1 byte) Unsafe.getUnsafe().putByte(ptr + offset, status); offset += 1; - // Sequence (8 bytes, little-endian) - Unsafe.getUnsafe().putLong(ptr + offset, sequence); - offset += 8; + if (status == STATUS_OK) { + Unsafe.getUnsafe().putLong(ptr + offset, sequence); + offset += 8; + offset += writeTableEntries(ptr + offset); + } else if (status == STATUS_DURABLE_ACK) { + offset += writeTableEntries(ptr + offset); + } else { + Unsafe.getUnsafe().putLong(ptr + offset, sequence); + offset += 8; - // Error message length and bytes (if any) - if (status != STATUS_OK) { int msgLen = getErrorMessageUtf8Length(); // Length prefix (2 bytes, little-endian) Unsafe.getUnsafe().putShort(ptr + offset, (short) msgLen); @@ -270,12 +367,105 @@ public int writeTo(long ptr) { offset += Utf8s.strCpyUtf8(errorMessage, ptr + offset, msgLen); } } + return offset; + } + + // Validates inline as it parses; returns false on truncation, lying-length + // entries, empty table names, or trailing garbage. 
On false, tableNames / + // tableSeqTxns may hold partial state, but the caller (readFrom) clears + // both lists at the start of every call so partial state never leaks. + private boolean readTableEntries(long ptr, int remaining) { + if (remaining < 2) { + return false; + } + int tableCount = Unsafe.getUnsafe().getShort(ptr) & 0xFFFF; + int offset = 2; + for (int i = 0; i < tableCount; i++) { + if (remaining < offset + 2) { + return false; + } + int nameLen = Unsafe.getUnsafe().getShort(ptr + offset) & 0xFFFF; + offset += 2; + // Empty table names are rejected as structurally invalid - a valid + // table name is never zero bytes, and accepting empty names would + // let a misbehaving server poison the per-table tracker with "" entries. + if (nameLen == 0 || remaining < offset + nameLen + 8) { + return false; + } + long nameLo = ptr + offset; + long nameHi = nameLo + nameLen; + offset += nameLen; + long seqTxn = Unsafe.getUnsafe().getLong(ptr + offset); + offset += 8; + tableNames.add(internTableName(nameLo, nameHi)); + tableSeqTxns.add(seqTxn); + } + return remaining == offset; + } + + private String internTableName(long lo, long hi) { + lookupKey.of(lo, hi); + int keyIndex = tableNameCache.keyIndex(lookupKey); + if (keyIndex < 0) { + return tableNameCache.valueAtQuick(keyIndex); + } + String decoded = Utf8s.stringFromUtf8Bytes(lo, hi); + tableNameCache.putAt(keyIndex, lookupKey, decoded); + return decoded; + } + + private int tableEntriesSize() { + int size = 0; + for (int i = 0, n = tableNames.size(); i < n; i++) { + size += 2 + tableNames.getQuick(i).getBytes(StandardCharsets.UTF_8).length + 8; + } + return size; + } + + private static boolean validateTableEntries(long ptr, int remaining) { + if (remaining < 2) { + return false; + } + int tableCount = Unsafe.getUnsafe().getShort(ptr) & 0xFFFF; + int offset = 2; + for (int i = 0; i < tableCount; i++) { + if (remaining < offset + 2) { + return false; + } + int nameLen = Unsafe.getUnsafe().getShort(ptr + offset) & 
0xFFFF; + offset += 2; + // Empty table names are rejected as structurally invalid - a valid + // table name is never zero bytes, and accepting empty names would + // let a misbehaving server poison the per-table tracker with "" entries. + if (nameLen == 0 || remaining < offset + nameLen + 8) { + return false; + } + offset += nameLen + 8; + } + return remaining == offset; + } + private int writeTableEntries(long ptr) { + int offset = 0; + int count = tableNames.size(); + Unsafe.getUnsafe().putShort(ptr + offset, (short) count); + offset += 2; + for (int i = 0; i < count; i++) { + byte[] nameBytes = tableNames.getQuick(i).getBytes(StandardCharsets.UTF_8); + Unsafe.getUnsafe().putShort(ptr + offset, (short) nameBytes.length); + offset += 2; + for (int j = 0; j < nameBytes.length; j++) { + Unsafe.getUnsafe().putByte(ptr + offset + j, nameBytes[j]); + } + offset += nameBytes.length; + Unsafe.getUnsafe().putLong(ptr + offset, tableSeqTxns.getQuick(i)); + offset += 8; + } return offset; } private int getErrorMessageUtf8Length() { - if (status == STATUS_OK || errorMessage == null || errorMessage.isEmpty()) { + if (status == STATUS_OK || status == STATUS_DURABLE_ACK || errorMessage == null || errorMessage.isEmpty()) { errorMessageUtf8Length = 0; return 0; } diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketSendQueue.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketSendQueue.java index 181602b8..1ac73f81 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketSendQueue.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketSendQueue.java @@ -27,6 +27,7 @@ import io.questdb.client.cutlass.http.client.WebSocketClient; import io.questdb.client.cutlass.http.client.WebSocketFrameHandler; import io.questdb.client.cutlass.line.LineSenderException; +import io.questdb.client.std.CharSequenceLongHashMap; import io.questdb.client.std.QuietCloseable; import org.jetbrains.annotations.Nullable; 
import org.slf4j.Logger; @@ -73,6 +74,7 @@ public class WebSocketSendQueue implements QuietCloseable { private final WebSocketClient client; // Configuration private final long enqueueTimeoutMs; + private final long pingTimeoutMs; @Nullable private final ConnectionFailureListener connectionFailureListener; // Optional InFlightWindow for tracking sent batches awaiting ACK @@ -81,6 +83,11 @@ public class WebSocketSendQueue implements QuietCloseable { // The I/O thread for async send/receive private final Thread ioThread; + // Serializes concurrent ping() callers so each one gets its own PING/PONG + // round-trip. Without this, two callers can race on pingComplete and the + // second caller can return on the first caller's PONG, observing a stale + // durable watermark. + private final Object pingLock = new Object(); // Counter for batches currently being processed by the I/O thread // This tracks batches that have been dequeued but not yet fully sent private final AtomicInteger processingCount = new AtomicInteger(0); @@ -94,6 +101,10 @@ public class WebSocketSendQueue implements QuietCloseable { // Synchronization for flush/close private final CountDownLatch shutdownLatch; private final long shutdownTimeoutMs; + // Per-table seqTxn watermarks. Written by the I/O thread only; read by user threads. + // All accesses synchronize on the map instance itself for publication and monotonic updates. 
+ private final CharSequenceLongHashMap committedSeqTxns = new CharSequenceLongHashMap(); + private final CharSequenceLongHashMap durableSeqTxns = new CharSequenceLongHashMap(); // Statistics - receiving private final AtomicLong totalAcks = new AtomicLong(0); // Statistics - sending @@ -109,6 +120,10 @@ public class WebSocketSendQueue implements QuietCloseable { // Single pending buffer slot (double-buffering means at most 1 item in queue) // Zero allocation - just a volatile reference handoff private volatile MicrobatchBuffer pendingBuffer; + private volatile boolean pingComplete; + private volatile boolean pingRequested; + private volatile boolean pongReceived; + private long pingDeadlineNanos; // Running state private volatile boolean running; private volatile boolean shuttingDown; @@ -146,6 +161,7 @@ public WebSocketSendQueue(WebSocketClient client, @Nullable InFlightWindow inFli this.inFlightWindow = inFlightWindow; this.enqueueTimeoutMs = enqueueTimeoutMs; this.shutdownTimeoutMs = shutdownTimeoutMs; + this.pingTimeoutMs = inFlightWindow != null ? inFlightWindow.getTimeoutMs() : InFlightWindow.DEFAULT_TIMEOUT_MS; this.connectionFailureListener = connectionFailureListener; this.running = true; this.shuttingDown = false; @@ -349,6 +365,61 @@ public Throwable getLastError() { return lastError; } + public long getCommittedSeqTxn(CharSequence tableName) { + synchronized (committedSeqTxns) { + return committedSeqTxns.get(tableName); + } + } + + public long getDurableSeqTxn(CharSequence tableName) { + synchronized (durableSeqTxns) { + return durableSeqTxns.get(tableName); + } + } + + /** + * Requests the I/O thread to send a WebSocket PING and blocks until + * the PONG arrives. The I/O loop continues its normal work (sending + * batches, draining ACKs) while waiting for the PONG. + *
<p>
+ * The server flushes pending durable ACKs before sending the PONG, + * so after this method returns {@code getDurableSeqTxn()} reflects + * all durable progress up to the moment the server processed the PING. + *
<p>
+ * Concurrent ping callers are serialized: each caller gets its own + * PING / PONG round-trip so the post-condition holds for every caller + * independently. A second caller may wait up to {@code pingTimeoutMs} + * for an in-flight ping to complete before its own ping starts. + */ + public void ping() { + synchronized (pingLock) { + checkError(); + synchronized (processingLock) { + pingComplete = false; + pingRequested = true; + processingLock.notifyAll(); + long deadline = System.nanoTime() + pingTimeoutMs * 1_000_000L; + while (!pingComplete && running) { + long remaining = (deadline - System.nanoTime()) / 1_000_000L; + if (remaining <= 0) { + throw new LineSenderException("Ping timed out"); + } + try { + processingLock.wait(remaining); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new LineSenderException("Ping interrupted"); + } + } + if (!pingComplete) { + checkError(); + throw new LineSenderException("Ping aborted: send queue is shutting down"); + } + } + checkError(); + } + } + /** * Returns the total number of batches sent. */ @@ -374,12 +445,12 @@ private void checkError() { } /** - * Computes the current I/O state based on queue and in-flight status. + * Computes the current I/O state based on queue, in-flight, and ping status. 
*/ private IoState computeState(boolean hasInFlight) { if (!isPendingEmpty()) { return IoState.ACTIVE; - } else if (hasInFlight) { + } else if (hasInFlight || pingDeadlineNanos > 0) { return IoState.DRAINING; } else { return IoState.IDLE; @@ -449,6 +520,20 @@ private void ioLoop() { try { int drainIdleCycles = 0; while (running || !isPendingEmpty()) { + // Send a pending PING if requested + if (pingRequested) { + pingRequested = false; + pongReceived = false; + pingDeadlineNanos = System.nanoTime() + pingTimeoutMs * 1_000_000L; + try { + client.sendPing(1000); + } catch (Exception e) { + pingDeadlineNanos = 0; + failConnection(new LineSenderException("Ping failed", e)); + completePing(); + } + } + MicrobatchBuffer batch = null; boolean hasInFlight = (inFlightWindow != null && inFlightWindow.getInFlightCount() > 0); IoState state = computeState(hasInFlight); @@ -460,7 +545,7 @@ private void ioLoop() { // Nothing to do - wait for work under lock synchronized (processingLock) { // Re-check under lock to avoid missed wakeup - if (isPendingEmpty() && running) { + if (isPendingEmpty() && running && !pingRequested) { try { processingLock.wait(100); } catch (InterruptedException e) { @@ -473,10 +558,22 @@ private void ioLoop() { case ACTIVE: case DRAINING: // Try to receive any pending ACKs (non-blocking) - if (hasInFlight && client.isConnected()) { + if (client.isConnected()) { receivedAcks = tryReceiveAcks(); } + // Check if a pending PING has been answered + if (pingDeadlineNanos > 0) { + if (pongReceived) { + pingDeadlineNanos = 0; + completePing(); + } else if (System.nanoTime() >= pingDeadlineNanos) { + pingDeadlineNanos = 0; + failConnection(new LineSenderException("Ping timed out waiting for PONG")); + completePing(); + } + } + // Try to dequeue and send a batch boolean hasWindowSpace = (inFlightWindow == null || inFlightWindow.hasWindowSpace()); if (hasWindowSpace) { @@ -521,6 +618,13 @@ private void ioLoop() { } } + private void completePing() { + synchronized 
(processingLock) { + pingComplete = true; + processingLock.notifyAll(); + } + } + private boolean isPendingEmpty() { return pendingBuffer == null; } @@ -664,7 +768,8 @@ private class ResponseHandler implements WebSocketFrameHandler { @Override public void onBinaryMessage(long payloadPtr, int payloadLen) { - if (!WebSocketResponse.isStructurallyValid(payloadPtr, payloadLen)) { + // readFrom validates inline; a single pass parses and bounds-checks. + if (!response.readFrom(payloadPtr, payloadLen)) { LineSenderException error = new LineSenderException( "Invalid ACK response payload [length=" + payloadLen + ']' ); @@ -673,18 +778,9 @@ public void onBinaryMessage(long payloadPtr, int payloadLen) { return; } - // Parse response from binary payload - if (!response.readFrom(payloadPtr, payloadLen)) { - LineSenderException error = new LineSenderException("Failed to parse ACK response"); - LOG.error("Failed to parse response"); - failConnection(error); - return; - } - long sequence = response.getSequence(); if (response.isSuccess()) { - // Cumulative ACK - acknowledge all batches up to this sequence if (inFlightWindow != null) { int acked = inFlightWindow.acknowledgeUpTo(sequence); if (acked > 0) { @@ -696,6 +792,16 @@ public void onBinaryMessage(long payloadPtr, int payloadLen) { LOG.debug("ACK for already-acknowledged sequences [upTo={}]", sequence); } } + for (int i = 0, n = response.getTableEntryCount(); i < n; i++) { + advanceSeqTxn(committedSeqTxns, response.getTableName(i), response.getTableSeqTxn(i)); + } + } else if (response.isDurableAck()) { + for (int i = 0, n = response.getTableEntryCount(); i < n; i++) { + advanceSeqTxn(durableSeqTxns, response.getTableName(i), response.getTableSeqTxn(i)); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Durable ACK received [tables={}]", response.getTableEntryCount()); + } } else { // Error - fail the batch String errorMessage = response.getErrorMessage(); @@ -714,5 +820,19 @@ public void onClose(int code, String reason) { 
LOG.info("WebSocket closed by server [code={}, reason={}]", code, reason); failConnection(new LineSenderException("WebSocket closed by server [code=" + code + ", reason=" + reason + ']')); } + + @Override + public void onPong(long payloadPtr, int payloadLen) { + pongReceived = true; + } + } + + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + private static void advanceSeqTxn(CharSequenceLongHashMap map, String tableName, long seqTxn) { + synchronized (map) { + if (seqTxn > map.get(tableName)) { + map.put(tableName, seqTxn); + } + } } } diff --git a/core/src/main/java/io/questdb/client/std/CharSequenceLongHashMap.java b/core/src/main/java/io/questdb/client/std/CharSequenceLongHashMap.java new file mode 100644 index 00000000..95fa8c01 --- /dev/null +++ b/core/src/main/java/io/questdb/client/std/CharSequenceLongHashMap.java @@ -0,0 +1,112 @@ +/*+***************************************************************************** + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2026 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + ******************************************************************************/ + +package io.questdb.client.std; + +import org.jetbrains.annotations.NotNull; + +import java.util.Arrays; + + +public class CharSequenceLongHashMap extends AbstractCharSequenceHashSet { + public static final long NO_ENTRY_VALUE = -1L; + private final ObjList list; + private final long noEntryValue; + private long[] values; + + public CharSequenceLongHashMap() { + this(8); + } + + public CharSequenceLongHashMap(int initialCapacity) { + this(initialCapacity, 0.4, NO_ENTRY_VALUE); + } + + public CharSequenceLongHashMap(int initialCapacity, double loadFactor, long noEntryValue) { + super(initialCapacity, loadFactor); + this.noEntryValue = noEntryValue; + this.list = new ObjList<>(capacity); + values = new long[keys.length]; + clear(); + } + + @Override + public final void clear() { + super.clear(); + list.clear(); + Arrays.fill(values, noEntryValue); + } + + public long get(@NotNull CharSequence key) { + return valueAt(keyIndex(key)); + } + + public ObjList keys() { + return list; + } + + public boolean put(@NotNull CharSequence key, long value) { + int index = keyIndex(key); + if (index < 0) { + values[-index - 1] = value; + return false; + } + final String keyString = Chars.toString(key); + putAt0(index, keyString, value); + list.add(keyString); + return true; + } + + public long valueAt(int index) { + int index1 = -index - 1; + return index < 0 ? 
values[index1] : noEntryValue; + } + + private void putAt0(int index, CharSequence key, long value) { + keys[index] = key; + values[index] = value; + if (--free == 0) { + rehash(); + } + } + + private void rehash() { + long[] oldValues = values; + CharSequence[] oldKeys = keys; + int size = capacity - free; + capacity = capacity * 2; + free = capacity - size; + mask = Numbers.ceilPow2((int) (capacity / loadFactor)) - 1; + this.keys = new CharSequence[mask + 1]; + this.values = new long[mask + 1]; + for (int i = oldKeys.length - 1; i > -1; i--) { + CharSequence key = oldKeys[i]; + if (key != null) { + final int index = keyIndex(key); + keys[index] = key; + values[index] = oldValues[i]; + } + } + } +} diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/InFlightWindowTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/InFlightWindowTest.java index 95587c95..40deb626 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/InFlightWindowTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/InFlightWindowTest.java @@ -394,6 +394,42 @@ public void testConcurrentAddAndCumulativeAck() throws Exception { assertEquals(numBatches, window.getTotalAcked()); } + @Test + public void testGetHighestAckedSequenceInitiallyMinusOne() { + InFlightWindow window = new InFlightWindow(8, 1000); + assertEquals(-1, window.getHighestAckedSequence()); + } + + @Test + public void testGetHighestAckedSequenceAdvancesOnAcknowledge() { + InFlightWindow window = new InFlightWindow(8, 1000); + + window.addInFlight(0); + window.addInFlight(1); + window.addInFlight(2); + + window.acknowledge(0); + assertEquals(0, window.getHighestAckedSequence()); + + window.acknowledgeUpTo(2); + assertEquals(2, window.getHighestAckedSequence()); + } + + @Test + public void testGetHighestAckedSequenceDoesNotRegress() { + InFlightWindow window = new InFlightWindow(8, 1000); + + window.addInFlight(0); + window.addInFlight(1); + + 
window.acknowledgeUpTo(1); + assertEquals(1, window.getHighestAckedSequence()); + + // Duplicate/lower ack should not regress + window.acknowledge(0); + assertEquals(1, window.getHighestAckedSequence()); + } + @Test public void testDefaultWindowSize() { InFlightWindow window = new InFlightWindow(); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketAckIntegrationTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketAckIntegrationTest.java index 5950b114..d33ce36f 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketAckIntegrationTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketAckIntegrationTest.java @@ -24,6 +24,7 @@ package io.questdb.client.test.cutlass.qwp.client; +import io.questdb.client.cutlass.line.LineSenderException; import io.questdb.client.cutlass.qwp.client.QwpWebSocketSender; import io.questdb.client.cutlass.qwp.client.WebSocketResponse; import io.questdb.client.cutlass.qwp.websocket.WebSocketCloseCode; @@ -34,8 +35,13 @@ import org.junit.Test; import java.io.IOException; +import java.io.InputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; /** * Integration tests for QWP v1 WebSocket ACK delivery mechanism. 
@@ -195,17 +201,160 @@ public void testSyncFlushIgnoresPingAndWaitsForAck() throws Exception { } } + @Test + public void testDurableAckUpgradeHeaderNotSentByDefault() throws Exception { + int port = TEST_PORT + 31; + AtomicReference capturedRequest = new AtomicReference<>(); + + try (ServerSocket serverSocket = new ServerSocket(port)) { + serverSocket.setSoTimeout(5000); + + Thread serverThread = new Thread(() -> { + try { + Socket client = serverSocket.accept(); + InputStream in = client.getInputStream(); + StringBuilder request = new StringBuilder(); + byte[] buf = new byte[1]; + while (true) { + int read = in.read(buf); + if (read <= 0) { + break; + } + request.append((char) buf[0]); + if (request.toString().endsWith("\r\n\r\n")) { + break; + } + } + capturedRequest.set(request.toString()); + client.close(); + } catch (Exception e) { + // expected + } + }); + serverThread.start(); + + try { + QwpWebSocketSender.connect("localhost", port, null, + 0, 0, 0, 1, null).close(); + } catch (LineSenderException e) { + // expected - server doesn't complete handshake + } + + serverThread.join(5000); + + String request = capturedRequest.get(); + Assert.assertNotNull("Server should have received upgrade request", request); + Assert.assertFalse("Request should NOT contain X-QWP-Request-Durable-Ack header", + request.contains("X-QWP-Request-Durable-Ack")); + } + } + + @Test + public void testDurableAckUpgradeHeaderSent() throws Exception { + int port = TEST_PORT + 30; + AtomicReference capturedRequest = new AtomicReference<>(); + + try (ServerSocket serverSocket = new ServerSocket(port)) { + serverSocket.setSoTimeout(5000); + + Thread serverThread = new Thread(() -> { + try { + Socket client = serverSocket.accept(); + InputStream in = client.getInputStream(); + StringBuilder request = new StringBuilder(); + byte[] buf = new byte[1]; + while (true) { + int read = in.read(buf); + if (read <= 0) { + break; + } + request.append((char) buf[0]); + if 
(request.toString().endsWith("\r\n\r\n")) { + break; + } + } + capturedRequest.set(request.toString()); + client.close(); + } catch (Exception e) { + // expected + } + }); + serverThread.start(); + + try { + QwpWebSocketSender.connect("localhost", port, null, + 0, 0, 0, 1, null, + QwpWebSocketSender.DEFAULT_MAX_SCHEMAS_PER_CONNECTION, + true).close(); + } catch (LineSenderException e) { + // expected - server doesn't complete handshake + } + + serverThread.join(5000); + + String request = capturedRequest.get(); + Assert.assertNotNull("Server should have received upgrade request", request); + Assert.assertTrue("Request should contain X-QWP-Request-Durable-Ack header", + request.contains("X-QWP-Request-Durable-Ack: true")); + } + } + + @Test + public void testSyncDurableAckDuringWaitForAck() throws Exception { + int port = TEST_PORT + 25; + DurableAckThenStatusOkHandler handler = new DurableAckThenStatusOkHandler(); + + try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + server.start(); + Assert.assertTrue("Server failed to start", server.awaitStart(5, TimeUnit.SECONDS)); + + // window=1 for sync mode + try (QwpWebSocketSender sender = QwpWebSocketSender.connect( + "localhost", port, null, 0, 0, 0, 1, null)) { + sender.table("trades") + .longColumn("price", 100) + .atNow(); + sender.flush(); + + Assert.assertEquals(42L, sender.getHighestDurableSeqTxn("trades")); + Assert.assertEquals(10L, sender.getHighestAckedSeqTxn("trades")); + } + } + } + + @Test + public void testSyncFlushUpdatesCommittedSeqTxnsWithTableEntries() throws Exception { + int port = TEST_PORT + 24; + AckWithTableEntriesHandler handler = new AckWithTableEntriesHandler(); + + try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + server.start(); + Assert.assertTrue("Server failed to start", server.awaitStart(5, TimeUnit.SECONDS)); + + // window=1 for sync mode + try (QwpWebSocketSender sender = QwpWebSocketSender.connect( + "localhost", port, null, 0, 0, 
0, 1, null)) { + sender.table("trades") + .longColumn("price", 100) + .atNow(); + sender.flush(); + + Assert.assertEquals(10L, sender.getHighestAckedSeqTxn("trades")); + Assert.assertEquals(20L, sender.getHighestAckedSeqTxn("orders")); + Assert.assertEquals(-1L, sender.getHighestAckedSeqTxn("other")); + } + } + } + /** * Creates a binary ACK response using WebSocketResponse format. - * Format: status (1 byte) + sequence (8 bytes little-endian) + * Format: status (1) + sequence (8) + tableCount (2, zero entries) */ private static byte[] createAckResponse(long sequence) { - byte[] response = new byte[WebSocketResponse.MIN_RESPONSE_SIZE]; + byte[] response = new byte[WebSocketResponse.MIN_OK_RESPONSE_SIZE]; - // Status OK (0) response[0] = WebSocketResponse.STATUS_OK; - // Sequence (little-endian) response[1] = (byte) (sequence & 0xFF); response[2] = (byte) ((sequence >> 8) & 0xFF); response[3] = (byte) ((sequence >> 16) & 0xFF); @@ -215,9 +364,82 @@ private static byte[] createAckResponse(long sequence) { response[7] = (byte) ((sequence >> 48) & 0xFF); response[8] = (byte) ((sequence >> 56) & 0xFF); + // tableCount = 0 + response[9] = 0; + response[10] = 0; + + return response; + } + + private static byte[] createAckResponseWithTables(long sequence, String[] tableNames, long[] seqTxns) { + byte[][] nameBytes = new byte[tableNames.length][]; + int size = 1 + 8 + 2; + for (int i = 0; i < tableNames.length; i++) { + nameBytes[i] = tableNames[i].getBytes(StandardCharsets.UTF_8); + size += 2 + nameBytes[i].length + 8; + } + + byte[] response = new byte[size]; + int offset = 0; + response[offset++] = WebSocketResponse.STATUS_OK; + for (int i = 0; i < 8; i++) { + response[offset++] = (byte) ((sequence >> (i * 8)) & 0xFF); + } + response[offset++] = (byte) (tableNames.length & 0xFF); + response[offset++] = (byte) ((tableNames.length >> 8) & 0xFF); + for (int i = 0; i < tableNames.length; i++) { + response[offset++] = (byte) (nameBytes[i].length & 0xFF); + response[offset++] 
= (byte) ((nameBytes[i].length >> 8) & 0xFF); + System.arraycopy(nameBytes[i], 0, response, offset, nameBytes[i].length); + offset += nameBytes[i].length; + for (int j = 0; j < 8; j++) { + response[offset++] = (byte) ((seqTxns[i] >> (j * 8)) & 0xFF); + } + } return response; } + private static byte[] createDurableAckResponse(String[] tableNames, long[] seqTxns) { + byte[][] nameBytes = new byte[tableNames.length][]; + int size = 1 + 2; + for (int i = 0; i < tableNames.length; i++) { + nameBytes[i] = tableNames[i].getBytes(StandardCharsets.UTF_8); + size += 2 + nameBytes[i].length + 8; + } + + byte[] response = new byte[size]; + int offset = 0; + response[offset++] = WebSocketResponse.STATUS_DURABLE_ACK; + response[offset++] = (byte) (tableNames.length & 0xFF); + response[offset++] = (byte) ((tableNames.length >> 8) & 0xFF); + for (int i = 0; i < tableNames.length; i++) { + response[offset++] = (byte) (nameBytes[i].length & 0xFF); + response[offset++] = (byte) ((nameBytes[i].length >> 8) & 0xFF); + System.arraycopy(nameBytes[i], 0, response, offset, nameBytes[i].length); + offset += nameBytes[i].length; + for (int j = 0; j < 8; j++) { + response[offset++] = (byte) ((seqTxns[i] >> (j * 8)) & 0xFF); + } + } + return response; + } + + private static class AckWithTableEntriesHandler implements TestWebSocketServer.WebSocketServerHandler { + private final AtomicLong nextSequence = new AtomicLong(0); + + @Override + public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) { + long sequence = nextSequence.getAndIncrement(); + try { + client.sendBinary(createAckResponseWithTables(sequence, + new String[]{"trades", "orders"}, + new long[]{10L, 20L})); + } catch (IOException e) { + LOG.error("Failed to send ACK with tables", e); + } + } + } + private static class ClosingServerHandler implements TestWebSocketServer.WebSocketServerHandler { @Override public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) { @@ -259,6 +481,27 
@@ public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] dat } } + private static class DurableAckThenStatusOkHandler implements TestWebSocketServer.WebSocketServerHandler { + private final AtomicLong nextSequence = new AtomicLong(0); + + @Override + public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) { + long sequence = nextSequence.getAndIncrement(); + try { + // Send durable ACK first + client.sendBinary(createDurableAckResponse( + new String[]{"trades"}, + new long[]{42L})); + // Then send STATUS_OK with committed seqTxns + client.sendBinary(createAckResponseWithTables(sequence, + new String[]{"trades"}, + new long[]{10L})); + } catch (IOException e) { + LOG.error("Failed to send ACK frames", e); + } + } + } + private static class InvalidAckPayloadHandler implements TestWebSocketServer.WebSocketServerHandler { @Override public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) { diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderStateTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderStateTest.java index 23ed4774..01ef97d1 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderStateTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderStateTest.java @@ -24,10 +24,17 @@ package io.questdb.client.test.cutlass.qwp.client; +import io.questdb.client.DefaultHttpClientConfiguration; +import io.questdb.client.cutlass.http.client.WebSocketClient; +import io.questdb.client.cutlass.http.client.WebSocketFrameHandler; import io.questdb.client.cutlass.line.LineSenderException; import io.questdb.client.cutlass.qwp.client.InFlightWindow; import io.questdb.client.cutlass.qwp.client.QwpWebSocketSender; +import io.questdb.client.cutlass.qwp.client.WebSocketResponse; import io.questdb.client.cutlass.qwp.protocol.QwpTableBuffer; +import 
io.questdb.client.network.PlainSocketFactory; +import io.questdb.client.std.MemoryTag; +import io.questdb.client.std.Unsafe; import io.questdb.client.test.AbstractTest; import org.junit.Assert; import org.junit.Test; @@ -35,6 +42,9 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; import static io.questdb.client.test.tools.TestUtils.assertMemoryLeak; @@ -53,10 +63,9 @@ public class QwpWebSocketSenderStateTest extends AbstractTest { @Test public void testConnectionFailureIsSenderLevelTerminalState() throws Exception { assertMemoryLeak(() -> { - QwpWebSocketSender sender = QwpWebSocketSender.createForTesting( + try (QwpWebSocketSender sender = QwpWebSocketSender.createForTesting( "localhost", 0, 10_000, 0, 0L, 8 - ); - try { + )) { LineSenderException failure = new LineSenderException( "Server error for batch 7: WRITE_ERROR - disk full" ); @@ -80,12 +89,256 @@ public void testConnectionFailureIsSenderLevelTerminalState() throws Exception { Assert.assertSame(failure, e); assertStackContains(e, "flush"); } + } + }); + } + + @Test + public void testConnectWithDurableAckToClosedPort() throws Exception { + assertMemoryLeak(() -> { + try { + QwpWebSocketSender.connect( + "127.0.0.1", 1, null, + QwpWebSocketSender.DEFAULT_AUTO_FLUSH_ROWS, + QwpWebSocketSender.DEFAULT_AUTO_FLUSH_BYTES, + QwpWebSocketSender.DEFAULT_AUTO_FLUSH_INTERVAL_NANOS, + 1, null, + QwpWebSocketSender.DEFAULT_MAX_SCHEMAS_PER_CONNECTION, + true + ).close(); + Assert.fail("Expected LineSenderException"); + } catch (LineSenderException e) { + Assert.assertTrue(e.getMessage().contains("Failed to connect")); + } + }); + } + + @Test + public void testGetHighestDurableSeqTxnDefaultsToMinusOne() throws Exception { + assertMemoryLeak(() -> { + try (QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1)) { + Assert.assertEquals(-1L, 
sender.getHighestDurableSeqTxn("any_table")); + } + }); + } + + @Test + public void testGetHighestAckedSeqTxnDefaultsToMinusOne() throws Exception { + assertMemoryLeak(() -> { + try (QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1)) { + Assert.assertEquals(-1L, sender.getHighestAckedSeqTxn("any_table")); + } + }); + } + + @Test + public void testSetRequestDurableAckBeforeConnect() throws Exception { + assertMemoryLeak(() -> { + try (QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1)) { + // Must not throw before connection is established + sender.setRequestDurableAck(true); + sender.setRequestDurableAck(false); + } + }); + } + + @Test + public void testSetRequestDurableAckAfterConnectThrows() throws Exception { + assertMemoryLeak(() -> { + QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1); + try { + setField(sender, "connected", true); + try { + sender.setRequestDurableAck(true); + Assert.fail("Expected exception for setRequestDurableAck after connect"); + } catch (LineSenderException e) { + Assert.assertTrue(e.getMessage().contains("before the first send")); + } } finally { + setField(sender, "connected", false); sender.close(); } }); } + @Test + public void testSetRequestDurableAckOnClosedSenderThrows() throws Exception { + assertMemoryLeak(() -> { + QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1); + sender.close(); + try { + sender.setRequestDurableAck(true); + Assert.fail("Expected exception for setRequestDurableAck on closed sender"); + } catch (LineSenderException e) { + Assert.assertTrue(e.getMessage().contains("closed")); + } + }); + } + + @Test + public void testPingAfterCloseThrows() throws Exception { + assertMemoryLeak(() -> { + QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1); + sender.close(); + try { + sender.ping(); + Assert.fail("Expected exception"); + } catch (LineSenderException 
e) { + Assert.assertTrue(e.getMessage().contains("closed")); + } + }); + } + + @Test + public void testSyncPingProcessesDurableAck() throws Exception { + assertMemoryLeak(() -> { + QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1); + PingTestClient client = new PingTestClient(); + try { + client.frameSequence.add(handler -> emitBinaryResponse(handler, WebSocketResponse.durableAck("trades", 5))); + client.frameSequence.add(handler -> handler.onPong(0, 0)); + + setField(sender, "client", client); + setField(sender, "connected", true); + setField(sender, "inFlightWindow", new InFlightWindow(1, InFlightWindow.DEFAULT_TIMEOUT_MS)); + + sender.ping(); + + Assert.assertTrue(client.pingSent); + Assert.assertEquals(5L, sender.getHighestDurableSeqTxn("trades")); + } finally { + setField(sender, "client", null); + setField(sender, "connected", false); + sender.close(); + client.close(); + } + }); + } + + @Test + public void testSyncPingProcessesStatusOk() throws Exception { + assertMemoryLeak(() -> { + QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1); + PingTestClient client = new PingTestClient(); + try { + client.frameSequence.add(handler -> emitBinaryResponse(handler, WebSocketResponse.success(3))); + client.frameSequence.add(handler -> handler.onPong(0, 0)); + + setField(sender, "client", client); + setField(sender, "connected", true); + InFlightWindow window = new InFlightWindow(8, InFlightWindow.DEFAULT_TIMEOUT_MS); + window.addInFlight(0); + window.addInFlight(1); + window.addInFlight(2); + window.addInFlight(3); + setField(sender, "inFlightWindow", window); + + sender.ping(); + + Assert.assertTrue(client.pingSent); + Assert.assertEquals(0, window.getInFlightCount()); + } finally { + setField(sender, "client", null); + setField(sender, "connected", false); + sender.close(); + client.close(); + } + }); + } + + @Test + public void testSyncPingSurfacesServerErrorFrame() throws Exception { + // Regression: 
syncPing used to branch only on isDurableAck() / + // isSuccess(). Any error frame (parse / schema / security / internal + // / write error) arriving between PING and PONG was parsed into + // ackResponse, neither branch fired, and the error was silently + // discarded. A caller using ping() to confirm "all my batches + // landed" would get a false affirmative; the error only surfaced + // on the next flush's waitForAck. + // + // Fix: capture the first error during the ping round and throw it + // after PONG so ping() callers see the failure directly. Also route + // through inFlightWindow.fail so subsequent waitForAck / flush + // calls re-observe it. Frames arriving between the error and PONG + // are still processed so durable/committed progress is preserved. + assertMemoryLeak(() -> { + // inFlightWindowSize=1 routes ping() through syncPing (the code under test). + // The injected inFlightWindow can still hold multiple batches. + QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1); + PingTestClient client = new PingTestClient(); + try { + // Server sends an error frame for seq=2, a durable ack, then PONG. 
+ client.frameSequence.add(handler -> emitBinaryResponse( + handler, + WebSocketResponse.error(2L, WebSocketResponse.STATUS_SCHEMA_MISMATCH, "column type mismatch") + )); + client.frameSequence.add(handler -> emitBinaryResponse(handler, WebSocketResponse.durableAck("trades", 9))); + client.frameSequence.add(handler -> handler.onPong(0, 0)); + + setField(sender, "client", client); + setField(sender, "connected", true); + InFlightWindow window = new InFlightWindow(8, InFlightWindow.DEFAULT_TIMEOUT_MS); + window.addInFlight(0); + window.addInFlight(1); + window.addInFlight(2); + setField(sender, "inFlightWindow", window); + + try { + sender.ping(); + Assert.fail("syncPing must throw on server error frame"); + } catch (LineSenderException expected) { + Assert.assertTrue( + "error message must be propagated from the server frame", + expected.getMessage() != null && expected.getMessage().contains("column type mismatch") + ); + } + + Assert.assertTrue(client.pingSent); + // Durable progress observed before the throw must be preserved. + Assert.assertEquals(9L, sender.getHighestDurableSeqTxn("trades")); + // Error is also recorded on the window so the next waitForAck / flush sees it. 
+ Throwable err = window.getLastError(); + Assert.assertNotNull( + "syncPing must also record the error on the inFlightWindow", + err + ); + Assert.assertTrue(err instanceof LineSenderException); + Assert.assertTrue( + err.getMessage() != null && err.getMessage().contains("column type mismatch") + ); + } finally { + setField(sender, "client", null); + setField(sender, "connected", false); + sender.close(); + client.close(); + } + }); + } + + @Test + public void testSyncPingReturnsOnPong() throws Exception { + assertMemoryLeak(() -> { + QwpWebSocketSender sender = QwpWebSocketSender.createForTesting("localhost", 0, 1); + PingTestClient client = new PingTestClient(); + try { + client.frameSequence.add(handler -> handler.onPong(0, 0)); + + setField(sender, "client", client); + setField(sender, "connected", true); + setField(sender, "inFlightWindow", new InFlightWindow(1, InFlightWindow.DEFAULT_TIMEOUT_MS)); + + sender.ping(); + + Assert.assertTrue(client.pingSent); + } finally { + setField(sender, "client", null); + setField(sender, "connected", false); + sender.close(); + client.close(); + } + }); + } + @Test public void testAutoFlushAccumulatesRowsAcrossAllTables() throws Exception { assertMemoryLeak(() -> { @@ -377,4 +630,56 @@ private static void setField(Object target, String fieldName, Object value) thro f.setAccessible(true); f.set(target, value); } + + private static void emitBinaryResponse(WebSocketFrameHandler handler, WebSocketResponse response) { + int size = response.serializedSize(); + long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT); + try { + response.writeTo(ptr); + handler.onBinaryMessage(ptr, size); + } finally { + Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT); + } + } + + private static class PingTestClient extends WebSocketClient { + final List> frameSequence = new ArrayList<>(); + boolean pingSent = false; + private int nextFrame = 0; + + PingTestClient() { + super(DefaultHttpClientConfiguration.INSTANCE, PlainSocketFactory.INSTANCE); 
+ } + + @Override + public boolean isConnected() { + return true; + } + + @Override + public boolean receiveFrame(WebSocketFrameHandler handler, int timeout) { + if (nextFrame < frameSequence.size()) { + frameSequence.get(nextFrame++).accept(handler); + return true; + } + return false; + } + + @Override + public void sendBinary(long dataPtr, int length) { + } + + @Override + public void sendPing(int timeout) { + pingSent = true; + } + + @Override + protected void ioWait(int timeout, int op) { + } + + @Override + protected void setupIoWait() { + } + } } diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WebSocketResponseTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WebSocketResponseTest.java new file mode 100644 index 00000000..b70bdc4f --- /dev/null +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WebSocketResponseTest.java @@ -0,0 +1,434 @@ +/*+***************************************************************************** + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2026 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + ******************************************************************************/ + +package io.questdb.client.test.cutlass.qwp.client; + +import io.questdb.client.cutlass.qwp.client.WebSocketResponse; +import io.questdb.client.std.MemoryTag; +import io.questdb.client.std.Unsafe; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; + +import static io.questdb.client.test.tools.TestUtils.assertMemoryLeak; + +public class WebSocketResponseTest { + + @Test + public void testDurableAckFactory() throws Exception { + assertMemoryLeak(() -> { + WebSocketResponse response = WebSocketResponse.durableAck("trades", 42L); + Assert.assertTrue(response.isDurableAck()); + Assert.assertFalse(response.isSuccess()); + Assert.assertEquals(1, response.getTableEntryCount()); + Assert.assertEquals("trades", response.getTableName(0)); + Assert.assertEquals(42L, response.getTableSeqTxn(0)); + Assert.assertEquals(WebSocketResponse.STATUS_DURABLE_ACK, response.getStatus()); + Assert.assertEquals("DURABLE_ACK", response.getStatusName()); + Assert.assertNull(response.getErrorMessage()); + }); + } + + @Test + public void testDurableAckIsStructurallyValid() throws Exception { + assertMemoryLeak(() -> { + WebSocketResponse response = WebSocketResponse.durableAck("t", 7L); + int size = response.serializedSize(); + + long ptr = Unsafe.malloc(size + 1, MemoryTag.NATIVE_DEFAULT); + try { + response.writeTo(ptr); + Assert.assertTrue(WebSocketResponse.isStructurallyValid(ptr, size)); + Assert.assertFalse(WebSocketResponse.isStructurallyValid(ptr, size + 1)); + Assert.assertFalse(WebSocketResponse.isStructurallyValid(ptr, size - 1)); + } finally { + Unsafe.free(ptr, size + 1, MemoryTag.NATIVE_DEFAULT); + } + }); + } + + @Test + public void testDurableAckRoundTripThroughNativeMemory() throws Exception { + assertMemoryLeak(() -> { + WebSocketResponse original = WebSocketResponse.durableAck("orders", 12345L); + int size = original.serializedSize(); + long ptr 
= Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT); + try { + original.writeTo(ptr); + + WebSocketResponse parsed = new WebSocketResponse(); + Assert.assertTrue(parsed.readFrom(ptr, size)); + Assert.assertTrue(parsed.isDurableAck()); + Assert.assertFalse(parsed.isSuccess()); + Assert.assertEquals(1, parsed.getTableEntryCount()); + Assert.assertEquals("orders", parsed.getTableName(0)); + Assert.assertEquals(12345L, parsed.getTableSeqTxn(0)); + Assert.assertNull(parsed.getErrorMessage()); + } finally { + Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT); + } + }); + } + + @Test + public void testDurableAckDoesNotCarryErrorMessage() throws Exception { + assertMemoryLeak(() -> { + WebSocketResponse response = WebSocketResponse.durableAck("t", 99L); + int size = response.serializedSize(); + long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT); + try { + response.writeTo(ptr); + + WebSocketResponse parsed = new WebSocketResponse(); + Assert.assertTrue(parsed.readFrom(ptr, size)); + Assert.assertNull(parsed.getErrorMessage()); + } finally { + Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT); + } + }); + } + + @Test + public void testSuccessIsNotDurableAck() throws Exception { + assertMemoryLeak(() -> { + WebSocketResponse response = WebSocketResponse.success(10L); + Assert.assertTrue(response.isSuccess()); + Assert.assertFalse(response.isDurableAck()); + Assert.assertEquals("OK", response.getStatusName()); + }); + } + + @Test + public void testErrorIsNotDurableAck() throws Exception { + assertMemoryLeak(() -> { + WebSocketResponse response = WebSocketResponse.error( + 5L, WebSocketResponse.STATUS_PARSE_ERROR, "bad input"); + Assert.assertFalse(response.isDurableAck()); + Assert.assertFalse(response.isSuccess()); + Assert.assertEquals("PARSE_ERROR", response.getStatusName()); + }); + } + + @Test + public void testSuccessRoundTripUnchanged() throws Exception { + assertMemoryLeak(() -> { + WebSocketResponse original = WebSocketResponse.success(77L); + int size = 
original.serializedSize(); + long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT); + try { + original.writeTo(ptr); + + WebSocketResponse parsed = new WebSocketResponse(); + Assert.assertTrue(parsed.readFrom(ptr, size)); + Assert.assertTrue(parsed.isSuccess()); + Assert.assertFalse(parsed.isDurableAck()); + Assert.assertEquals(77L, parsed.getSequence()); + } finally { + Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT); + } + }); + } + + @Test + public void testSuccessWithTableEntriesRoundTrip() throws Exception { + assertMemoryLeak(() -> { + // Build a STATUS_OK frame with 2 table entries directly in native memory + // Format: status(1) + sequence(8) + tableCount(2) + entries + byte[] name1 = "trades".getBytes(java.nio.charset.StandardCharsets.UTF_8); + byte[] name2 = "orders".getBytes(java.nio.charset.StandardCharsets.UTF_8); + int size = 1 + 8 + 2 + (2 + name1.length + 8) + (2 + name2.length + 8); + long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT); + try { + int offset = 0; + Unsafe.getUnsafe().putByte(ptr + offset, WebSocketResponse.STATUS_OK); + offset += 1; + Unsafe.getUnsafe().putLong(ptr + offset, 42L); + offset += 8; + Unsafe.getUnsafe().putShort(ptr + offset, (short) 2); + offset += 2; + // entry 1: trades, seqTxn=10 + Unsafe.getUnsafe().putShort(ptr + offset, (short) name1.length); + offset += 2; + for (int i = 0; i < name1.length; i++) { + Unsafe.getUnsafe().putByte(ptr + offset + i, name1[i]); + } + offset += name1.length; + Unsafe.getUnsafe().putLong(ptr + offset, 10L); + offset += 8; + // entry 2: orders, seqTxn=20 + Unsafe.getUnsafe().putShort(ptr + offset, (short) name2.length); + offset += 2; + for (int i = 0; i < name2.length; i++) { + Unsafe.getUnsafe().putByte(ptr + offset + i, name2[i]); + } + offset += name2.length; + Unsafe.getUnsafe().putLong(ptr + offset, 20L); + + Assert.assertTrue(WebSocketResponse.isStructurallyValid(ptr, size)); + + WebSocketResponse parsed = new WebSocketResponse(); + Assert.assertTrue(parsed.readFrom(ptr, 
size)); + Assert.assertTrue(parsed.isSuccess()); + Assert.assertFalse(parsed.isDurableAck()); + Assert.assertEquals(42L, parsed.getSequence()); + Assert.assertEquals(2, parsed.getTableEntryCount()); + Assert.assertEquals("trades", parsed.getTableName(0)); + Assert.assertEquals(10L, parsed.getTableSeqTxn(0)); + Assert.assertEquals("orders", parsed.getTableName(1)); + Assert.assertEquals(20L, parsed.getTableSeqTxn(1)); + Assert.assertNull(parsed.getErrorMessage()); + } finally { + Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT); + } + }); + } + + @Test + public void testEmptyTableNameRejected() throws Exception { + assertMemoryLeak(() -> { + // STATUS_OK with nameLen=0 - a well-behaved server never emits + // an empty table name; the frame must be rejected as structurally + // invalid so it cannot corrupt the per-table tracker. + int size = 1 + 8 + 2 + (2 + 8); + long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT); + try { + int offset = 0; + Unsafe.getUnsafe().putByte(ptr + offset, WebSocketResponse.STATUS_OK); + offset += 1; + Unsafe.getUnsafe().putLong(ptr + offset, 1L); + offset += 8; + Unsafe.getUnsafe().putShort(ptr + offset, (short) 1); + offset += 2; + Unsafe.getUnsafe().putShort(ptr + offset, (short) 0); + offset += 2; + Unsafe.getUnsafe().putLong(ptr + offset, 42L); + + Assert.assertFalse(WebSocketResponse.isStructurallyValid(ptr, size)); + + WebSocketResponse parsed = new WebSocketResponse(); + Assert.assertFalse(parsed.readFrom(ptr, size)); + } finally { + Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT); + } + }); + } + + @Test + public void testErrorRoundTrip() throws Exception { + assertMemoryLeak(() -> { + WebSocketResponse original = WebSocketResponse.error( + 3L, WebSocketResponse.STATUS_INTERNAL_ERROR, "oops"); + int size = original.serializedSize(); + long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT); + try { + original.writeTo(ptr); + + WebSocketResponse parsed = new WebSocketResponse(); + Assert.assertTrue(parsed.readFrom(ptr, 
size)); + Assert.assertFalse(parsed.isSuccess()); + Assert.assertFalse(parsed.isDurableAck()); + Assert.assertEquals(3L, parsed.getSequence()); + Assert.assertEquals("oops", parsed.getErrorMessage()); + } finally { + Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT); + } + }); + } + + @Test + public void testLargeTableCountWithInsufficientPayload() throws Exception { + assertMemoryLeak(() -> { + int size = WebSocketResponse.MIN_OK_RESPONSE_SIZE; + long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT); + try { + Unsafe.getUnsafe().putByte(ptr, WebSocketResponse.STATUS_OK); + Unsafe.getUnsafe().putLong(ptr + 1, 1L); + Unsafe.getUnsafe().putShort(ptr + 9, (short) 1000); + + Assert.assertFalse(WebSocketResponse.isStructurallyValid(ptr, size)); + + WebSocketResponse parsed = new WebSocketResponse(); + Assert.assertFalse(parsed.readFrom(ptr, size)); + } finally { + Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT); + } + }); + } + + @Test + public void testTrailingGarbageBytesStatusOk() throws Exception { + assertMemoryLeak(() -> { + WebSocketResponse original = WebSocketResponse.success(1L); + int validSize = original.serializedSize(); + int totalSize = validSize + 5; + long ptr = Unsafe.malloc(totalSize, MemoryTag.NATIVE_DEFAULT); + try { + original.writeTo(ptr); + for (int i = 0; i < 5; i++) { + Unsafe.getUnsafe().putByte(ptr + validSize + i, (byte) 0xFF); + } + + Assert.assertFalse(WebSocketResponse.isStructurallyValid(ptr, totalSize)); + } finally { + Unsafe.free(ptr, totalSize, MemoryTag.NATIVE_DEFAULT); + } + }); + } + + @Test + public void testTrailingGarbageBytesWithTableEntries() throws Exception { + assertMemoryLeak(() -> { + byte[] name = "trades".getBytes(StandardCharsets.UTF_8); + int validSize = 1 + 8 + 2 + (2 + name.length + 8); + int totalSize = validSize + 3; + long ptr = Unsafe.malloc(totalSize, MemoryTag.NATIVE_DEFAULT); + try { + int offset = 0; + Unsafe.getUnsafe().putByte(ptr + offset, WebSocketResponse.STATUS_OK); + offset += 1; + 
Unsafe.getUnsafe().putLong(ptr + offset, 5L); + offset += 8; + Unsafe.getUnsafe().putShort(ptr + offset, (short) 1); + offset += 2; + Unsafe.getUnsafe().putShort(ptr + offset, (short) name.length); + offset += 2; + for (int i = 0; i < name.length; i++) { + Unsafe.getUnsafe().putByte(ptr + offset + i, name[i]); + } + offset += name.length; + Unsafe.getUnsafe().putLong(ptr + offset, 10L); + + Assert.assertFalse(WebSocketResponse.isStructurallyValid(ptr, totalSize)); + + WebSocketResponse parsed = new WebSocketResponse(); + Assert.assertFalse(parsed.readFrom(ptr, totalSize)); + } finally { + Unsafe.free(ptr, totalSize, MemoryTag.NATIVE_DEFAULT); + } + }); + } + + @Test + public void testTruncatedTableEntriesDurableAck() throws Exception { + assertMemoryLeak(() -> { + byte[] name1 = "t1".getBytes(StandardCharsets.UTF_8); + // STATUS_DURABLE_ACK with tableCount=2 but only 1 entry + int size = 1 + 2 + (2 + name1.length + 8); + long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT); + try { + int offset = 0; + Unsafe.getUnsafe().putByte(ptr + offset, WebSocketResponse.STATUS_DURABLE_ACK); + offset += 1; + Unsafe.getUnsafe().putShort(ptr + offset, (short) 2); + offset += 2; + Unsafe.getUnsafe().putShort(ptr + offset, (short) name1.length); + offset += 2; + for (int i = 0; i < name1.length; i++) { + Unsafe.getUnsafe().putByte(ptr + offset + i, name1[i]); + } + offset += name1.length; + Unsafe.getUnsafe().putLong(ptr + offset, 10L); + + Assert.assertFalse(WebSocketResponse.isStructurallyValid(ptr, size)); + + WebSocketResponse parsed = new WebSocketResponse(); + Assert.assertFalse(parsed.readFrom(ptr, size)); + } finally { + Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT); + } + }); + } + + @Test + public void testTruncatedTableEntriesStatusOk() throws Exception { + assertMemoryLeak(() -> { + byte[] name1 = "trades".getBytes(StandardCharsets.UTF_8); + // STATUS_OK with tableCount=2 but only 1 entry + int size = 1 + 8 + 2 + (2 + name1.length + 8); + long ptr = 
Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT); + try { + int offset = 0; + Unsafe.getUnsafe().putByte(ptr + offset, WebSocketResponse.STATUS_OK); + offset += 1; + Unsafe.getUnsafe().putLong(ptr + offset, 1L); + offset += 8; + Unsafe.getUnsafe().putShort(ptr + offset, (short) 2); + offset += 2; + Unsafe.getUnsafe().putShort(ptr + offset, (short) name1.length); + offset += 2; + for (int i = 0; i < name1.length; i++) { + Unsafe.getUnsafe().putByte(ptr + offset + i, name1[i]); + } + offset += name1.length; + Unsafe.getUnsafe().putLong(ptr + offset, 10L); + + Assert.assertFalse(WebSocketResponse.isStructurallyValid(ptr, size)); + + WebSocketResponse parsed = new WebSocketResponse(); + Assert.assertFalse(parsed.readFrom(ptr, size)); + } finally { + Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT); + } + }); + } + + @Test + public void testUnknownStatusByte() throws Exception { + assertMemoryLeak(() -> { + int size = WebSocketResponse.MIN_ERROR_RESPONSE_SIZE; + long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT); + try { + Unsafe.getUnsafe().putByte(ptr, (byte) 0xFF); + Unsafe.getUnsafe().putLong(ptr + 1, 1L); + Unsafe.getUnsafe().putShort(ptr + 9, (short) 0); + + // Unknown status falls through to error validation path + Assert.assertTrue(WebSocketResponse.isStructurallyValid(ptr, size)); + + WebSocketResponse parsed = new WebSocketResponse(); + Assert.assertTrue(parsed.readFrom(ptr, size)); + Assert.assertEquals((byte) 0xFF, parsed.getStatus()); + Assert.assertEquals("UNKNOWN(255)", parsed.getStatusName()); + } finally { + Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT); + } + }); + } + + @Test + public void testZeroLengthPayload() throws Exception { + assertMemoryLeak(() -> { + long ptr = Unsafe.malloc(1, MemoryTag.NATIVE_DEFAULT); + try { + Assert.assertFalse(WebSocketResponse.isStructurallyValid(ptr, 0)); + + WebSocketResponse parsed = new WebSocketResponse(); + Assert.assertFalse(parsed.readFrom(ptr, 0)); + } finally { + Unsafe.free(ptr, 1, 
MemoryTag.NATIVE_DEFAULT); + } + }); + } +} diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WebSocketSendQueueTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WebSocketSendQueueTest.java index 19b4753f..9d3e98e9 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WebSocketSendQueueTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WebSocketSendQueueTest.java @@ -36,9 +36,9 @@ import io.questdb.client.std.MemoryTag; import io.questdb.client.std.Os; import io.questdb.client.std.Unsafe; -import static io.questdb.client.test.tools.TestUtils.assertMemoryLeak; import org.junit.Test; +import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -46,20 +46,17 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import static io.questdb.client.test.tools.TestUtils.assertMemoryLeak; import static org.junit.Assert.*; public class WebSocketSendQueueTest { - + @Test public void testEnqueueTimeoutWhenPendingSlotOccupied() throws Exception { assertMemoryLeak(() -> { InFlightWindow window = new InFlightWindow(1, 1_000); - FakeWebSocketClient client = new FakeWebSocketClient(); - MicrobatchBuffer batch0 = sealedBuffer((byte) 1); - MicrobatchBuffer batch1 = sealedBuffer((byte) 2); WebSocketSendQueue queue = null; - - try { + try (FakeWebSocketClient client = new FakeWebSocketClient(); MicrobatchBuffer batch0 = sealedBuffer((byte) 1); MicrobatchBuffer batch1 = sealedBuffer((byte) 2)) { // Keep window full so I/O thread cannot drain pending slot. 
window.addInFlight(0); queue = new WebSocketSendQueue(client, window, 100, 500); @@ -74,9 +71,6 @@ public void testEnqueueTimeoutWhenPendingSlotOccupied() throws Exception { } finally { window.acknowledgeUpTo(Long.MAX_VALUE); closeQuietly(queue); - batch0.close(); - batch1.close(); - client.close(); } }); } @@ -85,12 +79,8 @@ public void testEnqueueTimeoutWhenPendingSlotOccupied() throws Exception { public void testEnqueueWaitsUntilSlotAvailable() throws Exception { assertMemoryLeak(() -> { InFlightWindow window = new InFlightWindow(1, 1_000); - FakeWebSocketClient client = new FakeWebSocketClient(); - MicrobatchBuffer batch0 = sealedBuffer((byte) 1); - MicrobatchBuffer batch1 = sealedBuffer((byte) 2); WebSocketSendQueue queue = null; - - try { + try (FakeWebSocketClient client = new FakeWebSocketClient(); MicrobatchBuffer batch0 = sealedBuffer((byte) 1); MicrobatchBuffer batch1 = sealedBuffer((byte) 2)) { window.addInFlight(0); queue = new WebSocketSendQueue(client, window, 2_000, 500); final WebSocketSendQueue finalQueue = queue; @@ -124,9 +114,6 @@ public void testEnqueueWaitsUntilSlotAvailable() throws Exception { } finally { window.acknowledgeUpTo(Long.MAX_VALUE); closeQuietly(queue); - batch0.close(); - batch1.close(); - client.close(); } }); } @@ -135,12 +122,10 @@ public void testEnqueueWaitsUntilSlotAvailable() throws Exception { public void testFlushFailsOnInvalidAckPayload() throws Exception { assertMemoryLeak(() -> { InFlightWindow window = new InFlightWindow(8, 5_000); - FakeWebSocketClient client = new FakeWebSocketClient(); WebSocketSendQueue queue = null; - CountDownLatch payloadDelivered = new CountDownLatch(1); - AtomicBoolean fired = new AtomicBoolean(false); - - try { + try (FakeWebSocketClient client = new FakeWebSocketClient()) { + CountDownLatch payloadDelivered = new CountDownLatch(1); + AtomicBoolean fired = new AtomicBoolean(false); window.addInFlight(0); client.setTryReceiveBehavior(handler -> { if (fired.compareAndSet(false, true)) { @@ 
-162,7 +147,6 @@ public void testFlushFailsOnInvalidAckPayload() throws Exception { } } finally { closeQuietly(queue); - client.close(); } }); } @@ -171,11 +155,9 @@ public void testFlushFailsOnInvalidAckPayload() throws Exception { public void testFlushFailsOnReceiveIoError() throws Exception { assertMemoryLeak(() -> { InFlightWindow window = new InFlightWindow(8, 5_000); - FakeWebSocketClient client = new FakeWebSocketClient(); WebSocketSendQueue queue = null; - CountDownLatch receiveAttempted = new CountDownLatch(1); - - try { + try (FakeWebSocketClient client = new FakeWebSocketClient()) { + CountDownLatch receiveAttempted = new CountDownLatch(1); window.addInFlight(0); client.setTryReceiveBehavior(handler -> { receiveAttempted.countDown(); @@ -198,7 +180,6 @@ public void testFlushFailsOnReceiveIoError() throws Exception { } } finally { closeQuietly(queue); - client.close(); } }); } @@ -206,11 +187,8 @@ public void testFlushFailsOnReceiveIoError() throws Exception { @Test public void testFlushFailsOnSendIoError() throws Exception { assertMemoryLeak(() -> { - FakeWebSocketClient client = new FakeWebSocketClient(); - MicrobatchBuffer batch = sealedBuffer((byte) 42); WebSocketSendQueue queue = null; - - try { + try (FakeWebSocketClient client = new FakeWebSocketClient(); MicrobatchBuffer batch = sealedBuffer((byte) 42)) { client.setSendBehavior((dataPtr, length) -> { throw new RuntimeException("send-fail"); }); @@ -228,8 +206,6 @@ public void testFlushFailsOnSendIoError() throws Exception { } } finally { closeQuietly(queue); - batch.close(); - client.close(); } }); } @@ -238,12 +214,10 @@ public void testFlushFailsOnSendIoError() throws Exception { public void testFlushFailsWhenServerClosesConnection() throws Exception { assertMemoryLeak(() -> { InFlightWindow window = new InFlightWindow(8, 5_000); - FakeWebSocketClient client = new FakeWebSocketClient(); WebSocketSendQueue queue = null; - CountDownLatch closeDelivered = new CountDownLatch(1); - AtomicBoolean 
fired = new AtomicBoolean(false); - - try { + try (FakeWebSocketClient client = new FakeWebSocketClient()) { + CountDownLatch closeDelivered = new CountDownLatch(1); + AtomicBoolean fired = new AtomicBoolean(false); window.addInFlight(0); client.setTryReceiveBehavior(handler -> { if (fired.compareAndSet(false, true)) { @@ -265,7 +239,6 @@ public void testFlushFailsWhenServerClosesConnection() throws Exception { } } finally { closeQuietly(queue); - client.close(); } }); } @@ -288,7 +261,7 @@ public void testEnqueueAfterServerErrorAckSurfacesServerError() throws Exception client.setTryReceiveBehavior(handler -> { long sent = highestSent.get(); if (sent >= 0 && fired.compareAndSet(false, true)) { - emitError(handler, sent, WebSocketResponse.STATUS_WRITE_ERROR, "disk full"); + emitError(handler, sent); errorDelivered.countDown(); return true; } @@ -397,11 +370,406 @@ public void testAwaitPendingAcksKeepsDrainNonBlocking() throws Exception { }); } + @Test + public void testStatusOkWithTableEntriesUpdatesCommittedSeqTxn() throws Exception { + assertMemoryLeak(() -> { + InFlightWindow window = new InFlightWindow(8, 5_000); + WebSocketSendQueue queue = null; + try (FakeWebSocketClient client = new FakeWebSocketClient()) { + CountDownLatch ackDelivered = new CountDownLatch(1); + AtomicBoolean fired = new AtomicBoolean(false); + window.addInFlight(0); + client.setTryReceiveBehavior(handler -> { + if (fired.compareAndSet(false, true)) { + emitAckWithTables(handler, + new String[]{"trades", "orders"}, + new long[]{10L, 20L}); + ackDelivered.countDown(); + return true; + } + return false; + }); + + queue = new WebSocketSendQueue(client, window, 1_000, 500); + assertTrue("Expected ACK callback", + ackDelivered.await(2, TimeUnit.SECONDS)); + + long deadline = System.currentTimeMillis() + 2_000; + while (queue.getCommittedSeqTxn("trades") < 0 + && System.currentTimeMillis() < deadline) { + Os.sleep(5); + } + + assertEquals(10L, queue.getCommittedSeqTxn("trades")); + 
assertEquals(20L, queue.getCommittedSeqTxn("orders")); + assertEquals(-1L, queue.getCommittedSeqTxn("other")); + assertEquals(0, window.getInFlightCount()); + } finally { + window.acknowledgeUpTo(Long.MAX_VALUE); + closeQuietly(queue); + } + }); + } + + @Test + public void testDurableAckUpdatesPerTableSeqTxn() throws Exception { + assertMemoryLeak(() -> { + InFlightWindow window = new InFlightWindow(8, 5_000); + WebSocketSendQueue queue = null; + try (FakeWebSocketClient client = new FakeWebSocketClient()) { + CountDownLatch durableDelivered = new CountDownLatch(1); + AtomicBoolean fired = new AtomicBoolean(false); + window.addInFlight(0); + client.setTryReceiveBehavior(handler -> { + if (fired.compareAndSet(false, true)) { + emitDurableAck(handler, "trades", 10); + durableDelivered.countDown(); + return true; + } + return false; + }); + + queue = new WebSocketSendQueue(client, window, 1_000, 500); + assertTrue("Expected durable ACK callback", + durableDelivered.await(2, TimeUnit.SECONDS)); + + long deadline = System.currentTimeMillis() + 2_000; + while (queue.getDurableSeqTxn("trades") < 0 && System.currentTimeMillis() < deadline) { + Os.sleep(5); + } + + assertEquals(10, queue.getDurableSeqTxn("trades")); + assertEquals(-1, queue.getDurableSeqTxn("other")); + assertEquals(1, window.getInFlightCount()); + } finally { + window.acknowledgeUpTo(Long.MAX_VALUE); + closeQuietly(queue); + } + }); + } + + @Test + public void testDurableAckIsMonotonic() throws Exception { + assertMemoryLeak(() -> { + InFlightWindow window = new InFlightWindow(8, 5_000); + WebSocketSendQueue queue = null; + try (FakeWebSocketClient client = new FakeWebSocketClient()) { + AtomicInteger callCount = new AtomicInteger(); + CountDownLatch allDelivered = new CountDownLatch(1); + window.addInFlight(0); + window.addInFlight(1); + window.addInFlight(2); + + client.setTryReceiveBehavior(handler -> { + int n = callCount.getAndIncrement(); + switch (n) { + case 0: + emitDurableAck(handler, "t", 20); + 
return true; + case 1: + emitDurableAck(handler, "t", 10); + allDelivered.countDown(); + return true; + default: + return false; + } + }); + + queue = new WebSocketSendQueue(client, window, 1_000, 500); + assertTrue(allDelivered.await(2, TimeUnit.SECONDS)); + + long deadline = System.currentTimeMillis() + 2_000; + while (queue.getDurableSeqTxn("t") < 20 && System.currentTimeMillis() < deadline) { + Os.sleep(5); + } + + assertEquals(20, queue.getDurableSeqTxn("t")); + } finally { + window.acknowledgeUpTo(Long.MAX_VALUE); + closeQuietly(queue); + } + }); + } + + @Test + public void testDurableAckInterleavedWithStatusOk() throws Exception { + assertMemoryLeak(() -> { + InFlightWindow window = new InFlightWindow(8, 5_000); + WebSocketSendQueue queue = null; + try (FakeWebSocketClient client = new FakeWebSocketClient()) { + AtomicInteger callCount = new AtomicInteger(); + CountDownLatch allDelivered = new CountDownLatch(1); + window.addInFlight(0); + window.addInFlight(1); + + client.setTryReceiveBehavior(handler -> { + int n = callCount.getAndIncrement(); + switch (n) { + case 0: + emitAck(handler, 0); + return true; + case 1: + emitDurableAck(handler, "t", 10); + return true; + case 2: + emitAck(handler, 1); + return true; + case 3: + emitDurableAck(handler, "t", 20); + allDelivered.countDown(); + return true; + default: + return false; + } + }); + + queue = new WebSocketSendQueue(client, window, 1_000, 500); + assertTrue(allDelivered.await(2, TimeUnit.SECONDS)); + + long deadline = System.currentTimeMillis() + 2_000; + while ((queue.getDurableSeqTxn("t") < 20 || window.getInFlightCount() > 0) + && System.currentTimeMillis() < deadline) { + Os.sleep(5); + } + + assertEquals(20, queue.getDurableSeqTxn("t")); + assertEquals(0, window.getInFlightCount()); + } finally { + window.acknowledgeUpTo(Long.MAX_VALUE); + closeQuietly(queue); + } + }); + } + + @Test + public void testPingBlocksUntilPong() throws Exception { + assertMemoryLeak(() -> { + InFlightWindow window = new 
InFlightWindow(8, 5_000); + WebSocketSendQueue queue = null; + try (FakeWebSocketClient client = new FakeWebSocketClient()) { + AtomicInteger callCount = new AtomicInteger(); + client.setTryReceiveBehavior(handler -> { + int n = callCount.getAndIncrement(); + switch (n) { + case 0: + emitDurableAck(handler, "t", 7); + return true; + case 1: + handler.onPong(0, 0); + return true; + default: + return false; + } + }); + + queue = new WebSocketSendQueue(client, window, 1_000, 500); + + queue.ping(); + + // After ping() returns, durable ACK must already be processed + assertEquals(7, queue.getDurableSeqTxn("t")); + } finally { + closeQuietly(queue); + } + }); + } + + @Test + public void testPingWithInFlightBatches() throws Exception { + assertMemoryLeak(() -> { + InFlightWindow window = new InFlightWindow(8, 5_000); + WebSocketSendQueue queue = null; + try (FakeWebSocketClient client = new FakeWebSocketClient()) { + window.addInFlight(0); + window.addInFlight(1); + + AtomicBoolean pingSent = new AtomicBoolean(false); + client.setPingSendBehavior(() -> pingSent.set(true)); + + AtomicInteger callCount = new AtomicInteger(); + client.setTryReceiveBehavior(handler -> { + int n = callCount.get(); + switch (n) { + case 0: + emitAck(handler, 1); + callCount.incrementAndGet(); + return true; + case 1: + emitDurableAck(handler, "t", 5); + callCount.incrementAndGet(); + return true; + case 2: + // Pong can only arrive in response to a PING + if (!pingSent.get()) { + return false; + } + handler.onPong(0, 0); + callCount.incrementAndGet(); + return true; + default: + return false; + } + }); + + queue = new WebSocketSendQueue(client, window, 1_000, 500); + + queue.ping(); + + assertEquals(0, window.getInFlightCount()); + assertEquals(5, queue.getDurableSeqTxn("t")); + } finally { + closeQuietly(queue); + } + }); + } + + @Test + public void testPingTimesOutWhenNoPong() throws Exception { + assertMemoryLeak(() -> { + InFlightWindow window = new InFlightWindow(8, 2_000); + 
WebSocketSendQueue queue = null; + try (FakeWebSocketClient client = new FakeWebSocketClient()) { + // Never emit a PONG + client.setTryReceiveBehavior(handler -> false); + + queue = new WebSocketSendQueue(client, window, 1_000, 500); + + try { + queue.ping(); + fail("Expected ping timeout"); + } catch (LineSenderException e) { + assertTrue(e.getMessage().contains("Ping timed out")); + } + } finally { + closeQuietly(queue); + } + }); + } + + @Test + public void testPingSurfacesTransportError() throws Exception { + assertMemoryLeak(() -> { + InFlightWindow window = new InFlightWindow(8, 5_000); + WebSocketSendQueue queue = null; + try (FakeWebSocketClient client = new FakeWebSocketClient()) { + client.setPingSendBehavior(() -> { + throw new RuntimeException("ping-send-fail"); + }); + + queue = new WebSocketSendQueue(client, window, 1_000, 500); + + try { + queue.ping(); + fail("Expected error from ping"); + } catch (LineSenderException e) { + assertTrue(e.getMessage().contains("Ping failed") + || e.getMessage().contains("Error in send queue")); + } + } finally { + closeQuietly(queue); + } + }); + } + + @Test + public void testConcurrentPingCallersEachGetTheirOwnPing() throws Exception { + // Without serialization, two concurrent ping() callers can both wake up on + // the same PONG and return — the second caller observes a durable watermark + // taken before its own PING was processed. The pingLock around ping() + // guarantees each caller sends its own PING and waits for its own PONG. + // + // To trigger the bug deterministically the I/O thread is held inside the + // first sendPing call until all caller threads are parked, so the buggy + // code has all of them in the synchronized(processingLock) block before + // any PONG is processed and only one or two PINGs are emitted in total. 
+ assertMemoryLeak(() -> { + InFlightWindow window = new InFlightWindow(8, 5_000); + WebSocketSendQueue queue = null; + try (FakeWebSocketClient client = new FakeWebSocketClient()) { + AtomicInteger pingsSent = new AtomicInteger(); + AtomicInteger pendingPongs = new AtomicInteger(); + CountDownLatch firstPingBarrier = new CountDownLatch(1); + client.setPingSendBehavior(() -> { + int n = pingsSent.incrementAndGet(); + pendingPongs.incrementAndGet(); + if (n == 1) { + try { + firstPingBarrier.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }); + client.setTryReceiveBehavior(handler -> { + if (pendingPongs.get() > 0 && pendingPongs.decrementAndGet() >= 0) { + handler.onPong(0, 0); + return true; + } + return false; + }); + + queue = new WebSocketSendQueue(client, window, 5_000, 500); + final WebSocketSendQueue q = queue; + + int callerCount = 3; + CountDownLatch ready = new CountDownLatch(callerCount); + CountDownLatch start = new CountDownLatch(1); + AtomicReference err = new AtomicReference<>(); + Thread[] threads = new Thread[callerCount]; + for (int i = 0; i < callerCount; i++) { + threads[i] = new Thread(() -> { + try { + ready.countDown(); + start.await(); + q.ping(); + } catch (Throwable t) { + err.set(t); + } + }, "ping-caller-" + i); + threads[i].start(); + } + ready.await(); + start.countDown(); + // Wait until every caller is parked: either in processingLock.wait() + // (buggy path) or BLOCKED on pingLock (fixed path). 
+ for (Thread t : threads) { + awaitThreadBlocked(t); + } + firstPingBarrier.countDown(); + for (Thread t : threads) { + t.join(10_000); + assertFalse("ping caller " + t.getName() + " did not complete", t.isAlive()); + } + if (err.get() != null) { + throw new AssertionError("ping caller threw", err.get()); + } + assertEquals("each concurrent caller must send its own PING", + callerCount, pingsSent.get()); + } finally { + closeQuietly(queue); + } + }); + } + + @Test + public void testDurableSeqTxnInitiallyMinusOne() throws Exception { + assertMemoryLeak(() -> { + InFlightWindow window = new InFlightWindow(8, 5_000); + WebSocketSendQueue queue = null; + try (FakeWebSocketClient client = new FakeWebSocketClient()) { + queue = new WebSocketSendQueue(client, window, 1_000, 500); + assertEquals(-1, queue.getDurableSeqTxn("any_table")); + } finally { + closeQuietly(queue); + } + }); + } + private static void awaitThreadBlocked(Thread thread) { long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); while (System.nanoTime() < deadline) { Thread.State state = thread.getState(); - if (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING) { + if (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING || state == Thread.State.BLOCKED) { return; } Os.sleep(1); @@ -427,6 +795,51 @@ private static void emitBinary(WebSocketFrameHandler handler, byte[] payload) { } } + private static void emitDurableAck(WebSocketFrameHandler handler, String tableName, long seqTxn) { + WebSocketResponse response = WebSocketResponse.durableAck(tableName, seqTxn); + int size = response.serializedSize(); + long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT); + try { + response.writeTo(ptr); + handler.onBinaryMessage(ptr, size); + } finally { + Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT); + } + } + + private static void emitAckWithTables(WebSocketFrameHandler handler, + String[] tableNames, long[] seqTxns) { + byte[][] nameBytes = new 
byte[tableNames.length][]; + int size = 1 + 8 + 2; + for (int i = 0; i < tableNames.length; i++) { + nameBytes[i] = tableNames[i].getBytes(StandardCharsets.UTF_8); + size += 2 + nameBytes[i].length + 8; + } + long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT); + try { + int offset = 0; + Unsafe.getUnsafe().putByte(ptr + offset, WebSocketResponse.STATUS_OK); + offset += 1; + Unsafe.getUnsafe().putLong(ptr + offset, 0); + offset += 8; + Unsafe.getUnsafe().putShort(ptr + offset, (short) tableNames.length); + offset += 2; + for (int i = 0; i < tableNames.length; i++) { + Unsafe.getUnsafe().putShort(ptr + offset, (short) nameBytes[i].length); + offset += 2; + for (int j = 0; j < nameBytes[i].length; j++) { + Unsafe.getUnsafe().putByte(ptr + offset + j, nameBytes[i][j]); + } + offset += nameBytes[i].length; + Unsafe.getUnsafe().putLong(ptr + offset, seqTxns[i]); + offset += 8; + } + handler.onBinaryMessage(ptr, size); + } finally { + Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT); + } + } + private static void emitAck(WebSocketFrameHandler handler, long sequence) { WebSocketResponse response = WebSocketResponse.success(sequence); int size = response.serializedSize(); @@ -439,8 +852,8 @@ private static void emitAck(WebSocketFrameHandler handler, long sequence) { } } - private static void emitError(WebSocketFrameHandler handler, long sequence, byte status, String errorMessage) { - WebSocketResponse response = WebSocketResponse.error(sequence, status, errorMessage); + private static void emitError(WebSocketFrameHandler handler, long sequence) { + WebSocketResponse response = WebSocketResponse.error(sequence, WebSocketResponse.STATUS_WRITE_ERROR, "disk full"); int size = response.serializedSize(); long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT); try { @@ -474,6 +887,7 @@ private interface ReceiveBehavior { private static class FakeWebSocketClient extends WebSocketClient { private volatile TryReceiveBehavior behavior = handler -> false; private volatile 
boolean connected = true; + private volatile Runnable pingSendBehavior = () -> {}; private volatile ReceiveBehavior receiveBehavior = (handler, timeout) -> false; private volatile SendBehavior sendBehavior = (dataPtr, length) -> { }; @@ -498,6 +912,15 @@ public void sendBinary(long dataPtr, int length) { sendBehavior.send(dataPtr, length); } + @Override + public void sendPing(int timeout) { + pingSendBehavior.run(); + } + + public void setPingSendBehavior(Runnable pingSendBehavior) { + this.pingSendBehavior = pingSendBehavior; + } + public void setSendBehavior(SendBehavior sendBehavior) { this.sendBehavior = sendBehavior; } diff --git a/core/src/test/java/io/questdb/client/test/std/CharSequenceLongHashMapTest.java b/core/src/test/java/io/questdb/client/test/std/CharSequenceLongHashMapTest.java new file mode 100644 index 00000000..509c972d --- /dev/null +++ b/core/src/test/java/io/questdb/client/test/std/CharSequenceLongHashMapTest.java @@ -0,0 +1,152 @@ +/*+***************************************************************************** + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2026 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + ******************************************************************************/ + +package io.questdb.client.test.std; + +import io.questdb.client.std.CharSequenceLongHashMap; +import io.questdb.client.std.ObjList; +import io.questdb.client.std.Rnd; +import org.junit.Assert; +import org.junit.Test; + +public class CharSequenceLongHashMapTest { + + @Test + public void testClear() { + CharSequenceLongHashMap map = new CharSequenceLongHashMap(); + map.put("a", 1); + map.put("b", 2); + Assert.assertEquals(2, map.size()); + + map.clear(); + Assert.assertEquals(0, map.size()); + Assert.assertEquals(CharSequenceLongHashMap.NO_ENTRY_VALUE, map.get("a")); + Assert.assertEquals(CharSequenceLongHashMap.NO_ENTRY_VALUE, map.get("b")); + } + + @Test + public void testContains() { + CharSequenceLongHashMap map = new CharSequenceLongHashMap(); + Rnd rnd = new Rnd(); + final int N = 1000; + + for (int i = 0; i < N; i++) { + String s = rnd.nextString(10).substring(1, 9); + map.put(s, i); + } + + ObjList keys = map.keys(); + for (int i = 0; i < keys.size(); i++) { + Assert.assertTrue(map.contains(keys.get(i))); + } + } + + @Test + public void testCustomNoEntryValue() { + long customNoEntry = -42L; + CharSequenceLongHashMap map = new CharSequenceLongHashMap(8, 0.4, customNoEntry); + Assert.assertEquals(customNoEntry, map.get("missing")); + map.put("x", 100); + Assert.assertEquals(100, map.get("x")); + Assert.assertEquals(customNoEntry, map.get("y")); + } + + @Test + public void testGetMissingKeyReturnsNoEntryValue() { + CharSequenceLongHashMap map = new CharSequenceLongHashMap(); + Assert.assertEquals(CharSequenceLongHashMap.NO_ENTRY_VALUE, map.get("missing")); + map.put("present", 42); + Assert.assertEquals(CharSequenceLongHashMap.NO_ENTRY_VALUE, map.get("absent")); + } + + @Test + public void testPutAndGet() { + Rnd rnd = new Rnd(); + CharSequenceLongHashMap map = new CharSequenceLongHashMap(); + final int N = 1000; + for (int i = 0; i < N; i++) { + CharSequence cs = 
rnd.nextString(15); + boolean isNew = map.put(cs, rnd.nextLong()); + Assert.assertTrue(isNew); + } + Assert.assertEquals(N, map.size()); + + rnd.reset(); + + // verify all values are retrievable + for (int i = 0; i < N; i++) { + CharSequence cs = rnd.nextString(15); + Assert.assertEquals(rnd.nextLong(), map.get(cs)); + } + } + + @Test + public void testPutUpdatesExistingKey() { + CharSequenceLongHashMap map = new CharSequenceLongHashMap(); + Assert.assertTrue(map.put("key", 1)); + Assert.assertFalse(map.put("key", 2)); + Assert.assertEquals(2, map.get("key")); + Assert.assertEquals(1, map.size()); + } + + @Test + public void testRehash() { + // use a small initial capacity to force multiple rehashes + CharSequenceLongHashMap map = new CharSequenceLongHashMap(8); + Rnd rnd = new Rnd(); + final int N = 500; + for (int i = 0; i < N; i++) { + map.put(rnd.nextString(15), rnd.nextLong()); + } + Assert.assertEquals(N, map.size()); + + // verify all values survive rehash + rnd.reset(); + for (int i = 0; i < N; i++) { + CharSequence cs = rnd.nextString(15); + long expected = rnd.nextLong(); + Assert.assertEquals(expected, map.get(cs)); + } + } + + @Test + public void testValueAtWithKeyIndex() { + CharSequenceLongHashMap map = new CharSequenceLongHashMap(); + map.put("alpha", 10); + map.put("beta", 20); + + int indexAlpha = map.keyIndex("alpha"); + Assert.assertTrue(indexAlpha < 0); + Assert.assertEquals(10, map.valueAt(indexAlpha)); + + int indexBeta = map.keyIndex("beta"); + Assert.assertTrue(indexBeta < 0); + Assert.assertEquals(20, map.valueAt(indexBeta)); + + // missing key returns a positive index + int indexMissing = map.keyIndex("gamma"); + Assert.assertTrue(indexMissing >= 0); + Assert.assertEquals(CharSequenceLongHashMap.NO_ENTRY_VALUE, map.valueAt(indexMissing)); + } +}