Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion core/src/main/java/io/questdb/client/Sender.java
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ public int getTimeout() {
private PrivateKey privateKey;
private int protocol = PARAMETER_NOT_SET_EXPLICITLY;
private int protocolVersion = PARAMETER_NOT_SET_EXPLICITLY;
private boolean requestDurableAck;
private int retryTimeoutMillis = PARAMETER_NOT_SET_EXPLICITLY;
private boolean shouldDestroyPrivKey;
private boolean tlsEnabled;
Expand Down Expand Up @@ -932,7 +933,8 @@ public Sender build() {
actualAutoFlushIntervalNanos,
actualInFlightWindowSize,
wsAuthHeader,
actualMaxSchemasPerConnection
actualMaxSchemasPerConnection,
requestDurableAck
);
}

Expand Down Expand Up @@ -1483,6 +1485,27 @@ public LineSenderBuilder protocolVersion(int protocolVersion) {
return this;
}

/**
* Opts the connection in for STATUS_DURABLE_ACK frames. When enabled,
* servers with primary replication will emit per-table durable-upload
* watermarks as WAL data reaches the object store.
* <p>
* This setting is only supported for WebSocket transport.
* <p>
* Observe durable progress via
* {@link QwpWebSocketSender#getHighestDurableSeqTxn(CharSequence)}.
*
* @param enabled true to request durable ACKs
* @return this instance for method chaining
*/
public LineSenderBuilder requestDurableAck(boolean enabled) {
if (protocol != PARAMETER_NOT_SET_EXPLICITLY && protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException("request_durable_ack is only supported for WebSocket transport");
}
this.requestDurableAck = enabled;
return this;
}

/**
* Configures the maximum time the Sender will spend retrying upon receiving a recoverable error from the server.
* <br>
Expand Down Expand Up @@ -1875,6 +1898,18 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) {
pos = getValue(configurationString, pos, sink, "in_flight_window");
int windowSize = parseIntValue(sink, "in_flight_window");
inFlightWindowSize(windowSize);
} else if (Chars.equals("request_durable_ack", sink)) {
if (protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException("request_durable_ack is only supported for WebSocket transport");
}
pos = getValue(configurationString, pos, sink, "request_durable_ack");
if (Chars.equalsIgnoreCase("on", sink)) {
requestDurableAck(true);
} else if (Chars.equalsIgnoreCase("off", sink)) {
requestDurableAck(false);
} else {
throw new LineSenderException("invalid request_durable_ack [value=").put(sink).put(", allowed-values=[on, off]]");
}
} else if (Chars.equals("max_schemas_per_connection", sink)) {
if (protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException("max_schemas_per_connection is only supported for WebSocket transport");
Expand Down Expand Up @@ -1972,6 +2007,9 @@ private void validateParameters() {
.put(", requestedCapacity=").put(bufferCapacity)
.put("]");
}
if (requestDurableAck && protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException("request_durable_ack is only supported for WebSocket transport");
}
if (protocol == PROTOCOL_HTTP) {
if (httpClientConfiguration.getMaximumRequestBufferSize() < httpClientConfiguration.getInitialRequestBufferSize()) {
throw new LineSenderException("maximum buffer capacity cannot be less than initial buffer capacity ")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public abstract class WebSocketClient implements QuietCloseable {
// QWP version negotiation
private String qwpClientId;
private int qwpMaxVersion = 1;
// Opt-in for STATUS_DURABLE_ACK frames; sent as X-QWP-Request-Durable-Ack: true
private boolean qwpRequestDurableAck;
// Receive buffer (native memory)
private long recvBufPtr;
private int recvBufSize;
Expand Down Expand Up @@ -390,6 +392,16 @@ public void setQwpMaxVersion(int maxVersion) {
this.qwpMaxVersion = maxVersion;
}

/**
* Enables the opt-in X-QWP-Request-Durable-Ack upgrade header. When set,
* servers with primary replication configured will additionally emit
* STATUS_DURABLE_ACK frames as the WAL containing committed client
* messages reaches the object store.
*/
public void setQwpRequestDurableAck(boolean enabled) {
this.qwpRequestDurableAck = enabled;
}

/**
* Non-blocking attempt to receive a WebSocket frame.
* Returns immediately if no complete frame is available.
Expand Down Expand Up @@ -476,6 +488,9 @@ public void upgrade(CharSequence path, int timeout, CharSequence authorizationHe
sendBuffer.putAscii(qwpClientId);
sendBuffer.putAscii("\r\n");
}
if (qwpRequestDurableAck) {
sendBuffer.putAscii("X-QWP-Request-Durable-Ack: true\r\n");
}
if (authorizationHeader != null) {
sendBuffer.putAscii("Authorization: ");
sendBuffer.putAscii(authorizationHeader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,28 @@ public Throwable getLastError() {
return lastError.get();
}

/**
* Returns the highest batch sequence acknowledged by the server, or -1 if
* no acknowledgment has been received yet.
*/
public long getHighestAckedSequence() {
return highestAcked;
}

/**
* Returns the maximum window size.
*/
public int getMaxWindowSize() {
return maxWindowSize;
}

/**
* Returns the timeout (ms) applied to blocking window operations.
*/
public long getTimeoutMs() {
return timeoutMs;
}

/**
* Returns the total number of batches acknowledged.
*/
Expand Down
Loading
Loading