Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void testTimePartition() throws Exception {
}
timestatmps.forEach(
t -> {
long timePartitionId = TimePartitionUtils.getTimePartitionId(t);
long timePartitionId = TimePartitionUtils.getTimePartitionId(t, "root.sg1");
assertTrue(timePartitions.contains(timePartitionId));
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public void testManageDatabase() {
String[] TTLs = new String[] {"INF"};
int[] schemaReplicaFactors = new int[] {1};
int[] dataReplicaFactors = new int[] {1};
long[] timePartitionOrigin = new long[] {0L};
int[] timePartitionInterval = new int[] {604800000};

// show
Expand Down Expand Up @@ -156,9 +157,10 @@ public void testManageDatabase() {
assertEquals(TTLs[cnt], resultSet.getString(2));
assertEquals(schemaReplicaFactors[cnt], resultSet.getInt(3));
assertEquals(dataReplicaFactors[cnt], resultSet.getInt(4));
assertEquals(timePartitionInterval[cnt], resultSet.getLong(5));
assertEquals(schemaRegionGroupNum[cnt], resultSet.getInt(6));
assertEquals(dataRegionGroupNum[cnt], resultSet.getInt(7));
assertEquals(timePartitionOrigin[cnt], resultSet.getLong(5));
assertEquals(timePartitionInterval[cnt], resultSet.getLong(6));
assertEquals(schemaRegionGroupNum[cnt], resultSet.getInt(7));
assertEquals(dataRegionGroupNum[cnt], resultSet.getInt(8));
cnt++;
}
assertEquals(databaseNames.length, cnt);
Expand Down Expand Up @@ -196,9 +198,10 @@ public void testManageDatabase() {

// Test create database with properties
statement.execute(
"create database test_prop with (ttl=300, schema_region_group_num=DEFAULT, time_partition_interval=100000)");
"create database test_prop with (ttl=300, schema_region_group_num=DEFAULT, time_partition_origin=2000, time_partition_interval=100000)");
databaseNames = new String[] {"test_prop"};
TTLs = new String[] {"300"};
timePartitionOrigin = new long[] {2000L};
timePartitionInterval = new int[] {100000};

// show
Expand All @@ -223,6 +226,23 @@ public void testManageDatabase() {
assertEquals(databaseNames.length, cnt);
}

try (final ResultSet resultSet = statement.executeQuery("SHOW DATABASES DETAILS")) {
int cnt = 0;
while (resultSet.next()) {
if (resultSet.getString(1).equals("information_schema")) {
continue;
}
assertEquals(databaseNames[cnt], resultSet.getString(1));
assertEquals(TTLs[cnt], resultSet.getString(2));
assertEquals(schemaReplicaFactors[cnt], resultSet.getInt(3));
assertEquals(dataReplicaFactors[cnt], resultSet.getInt(4));
assertEquals(timePartitionOrigin[cnt], resultSet.getLong(5));
assertEquals(timePartitionInterval[cnt], resultSet.getLong(6));
cnt++;
}
assertEquals(databaseNames.length, cnt);
}

try {
statement.execute("create database test_prop_2 with (non_exist_prop=DEFAULT)");
fail(
Expand Down Expand Up @@ -428,6 +448,7 @@ public void testInformationSchema() throws SQLException {
"ttl(ms),STRING,ATTRIBUTE,",
"schema_replication_factor,INT32,ATTRIBUTE,",
"data_replication_factor,INT32,ATTRIBUTE,",
"time_partition_origin,INT64,ATTRIBUTE,",
"time_partition_interval,INT64,ATTRIBUTE,",
"schema_region_group_num,INT32,ATTRIBUTE,",
"data_region_group_num,INT32,ATTRIBUTE,")));
Expand Down Expand Up @@ -638,11 +659,11 @@ public void testInformationSchema() throws SQLException {

TestUtils.assertResultSetEqual(
statement.executeQuery("select * from databases"),
"database,ttl(ms),schema_replication_factor,data_replication_factor,time_partition_interval,schema_region_group_num,data_region_group_num,",
"database,ttl(ms),schema_replication_factor,data_replication_factor,time_partition_origin,time_partition_interval,schema_region_group_num,data_region_group_num,",
new HashSet<>(
Arrays.asList(
"information_schema,INF,null,null,null,null,null,",
"test,INF,1,1,604800000,0,0,")));
"information_schema,INF,null,null,null,null,null,null,",
"test,INF,1,1,0,604800000,0,0,")));
TestUtils.assertResultSetEqual(
statement.executeQuery("show devices from tables where status = 'USING'"),
"database,table_name,ttl(ms),status,comment,table_type,",
Expand Down Expand Up @@ -844,8 +865,8 @@ public void testDBAuth() throws SQLException {
Collections.singleton("information_schema,INF,null,null,null,"));
TestUtils.assertResultSetEqual(
userStmt.executeQuery("select * from information_schema.databases"),
"database,ttl(ms),schema_replication_factor,data_replication_factor,time_partition_interval,schema_region_group_num,data_region_group_num,",
Collections.singleton("information_schema,INF,null,null,null,null,null,"));
"database,ttl(ms),schema_replication_factor,data_replication_factor,time_partition_origin,time_partition_interval,schema_region_group_num,data_region_group_num,",
Collections.singleton("information_schema,INF,null,null,null,null,null,null,"));
}

try (final Connection adminCon = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ databaseAttributeClause
databaseAttributeKey
: TTL
| TIME_PARTITION_INTERVAL
| TIME_PARTITION_ORIGIN
| SCHEMA_REGION_GROUP_NUM
| DATA_REGION_GROUP_NUM
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,10 @@ TIME_PARTITION_INTERVAL
: T I M E '_' P A R T I T I O N '_' I N T E R V A L
;

TIME_PARTITION_ORIGIN
: T I M E '_' P A R T I T I O N '_' O R I G I N
;

SCHEMA_REGION_GROUP_NUM
: S C H E M A '_' R E G I O N '_' G R O U P '_' N U M
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
Expand Down Expand Up @@ -234,6 +235,9 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
case SetTimePartitionInterval:
plan = new SetTimePartitionIntervalPlan();
break;
case SetTimePartitionOrigin:
plan = new SetTimePartitionOriginPlan();
break;
case AdjustMaxRegionGroupNum:
plan = new AdjustMaxRegionGroupNumPlan();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public enum ConfigPhysicalPlanType {
SetSchemaReplicationFactor((short) 202),
SetDataReplicationFactor((short) 203),
SetTimePartitionInterval((short) 204),
SetTimePartitionOrigin((short) 212),
AdjustMaxRegionGroupNum((short) 205),
DeleteDatabase((short) 206),
PreDeleteDatabase((short) 207),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.consensus.request.write.database;

import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

public class SetTimePartitionOriginPlan extends ConfigPhysicalPlan {

private String storageGroup;

private long timePartitionOrigin;

public SetTimePartitionOriginPlan() {
super(ConfigPhysicalPlanType.SetTimePartitionOrigin);
}

public SetTimePartitionOriginPlan(String storageGroup, long timePartitionOrigin) {
this();
this.storageGroup = storageGroup;
this.timePartitionOrigin = timePartitionOrigin;
}

public String getDatabase() {
return storageGroup;
}

public long getTimePartitionOrigin() {
return timePartitionOrigin;
}

@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());

BasicStructureSerDeUtil.write(storageGroup, stream);
stream.writeLong(timePartitionOrigin);
}

@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
storageGroup = BasicStructureSerDeUtil.readString(buffer);
timePartitionOrigin = buffer.getLong();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SetTimePartitionOriginPlan that = (SetTimePartitionOriginPlan) o;
return timePartitionOrigin == that.timePartitionOrigin
&& storageGroup.equals(that.storageGroup);
}

@Override
public int hashCode() {
return Objects.hash(storageGroup, timePartitionOrigin);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan;
import org.apache.iotdb.confignode.consensus.response.ainode.AINodeRegisterResp;
Expand Down Expand Up @@ -722,6 +723,16 @@ public TSStatus setTimePartitionInterval(
}
}

@Override
public TSStatus setTimePartitionOrigin(SetTimePartitionOriginPlan setTimePartitionOriginPlan) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return clusterSchemaManager.setTimePartitionOrigin(setTimePartitionOriginPlan);
} else {
return status;
}
}

@Override
public DataSet countMatchedDatabases(CountDatabasePlan countDatabasePlan) {
TSStatus status = confirmLeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.cq.CQManager;
Expand Down Expand Up @@ -388,6 +389,8 @@ public interface IManager {

TSStatus setTimePartitionInterval(SetTimePartitionIntervalPlan configPhysicalPlan);

TSStatus setTimePartitionOrigin(SetTimePartitionOriginPlan configPhysicalPlan);

/**
* Count Databases.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan;
import org.apache.iotdb.confignode.consensus.request.write.table.PreAlterColumnDataTypePlan;
import org.apache.iotdb.confignode.consensus.request.write.table.SetTableColumnCommentPlan;
Expand Down Expand Up @@ -471,6 +472,17 @@ public TSStatus setTimePartitionInterval(
}
}

public TSStatus setTimePartitionOrigin(SetTimePartitionOriginPlan setTimePartitionOriginPlan) {
try {
return getConsensusManager().write(setTimePartitionOriginPlan);
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
result.setMessage(e.getMessage());
return result;
}
}

/**
* Only leader use this interface. Adjust the maxSchemaRegionGroupNum and maxDataRegionGroupNum of
* each Database based on existing cluster resources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionOriginPlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
Expand Down Expand Up @@ -439,6 +440,8 @@ public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan)
case SetTimePartitionInterval:
return clusterSchemaInfo.setTimePartitionInterval(
(SetTimePartitionIntervalPlan) physicalPlan);
case SetTimePartitionOrigin:
return clusterSchemaInfo.setTimePartitionOrigin((SetTimePartitionOriginPlan) physicalPlan);
case CreateRegionGroups:
return partitionInfo.createRegionGroups((CreateRegionGroupsPlan) physicalPlan);
case OfferRegionMaintainTasks:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ public Map<TSeriesPartitionSlot, TConsensusGroupId> getLastDataAllotTable() {
*/
public void autoCleanPartitionTable(long TTL, TTimePartitionSlot currentTimeSlot) {
long[] removedTimePartitionSlots =
dataPartitionTable.autoCleanPartitionTable(TTL, currentTimeSlot).stream()
dataPartitionTable.autoCleanPartitionTable(TTL, currentTimeSlot, databaseName).stream()
.map(TTimePartitionSlot::getStartTime)
.collect(Collectors.toList())
.stream()
Expand Down
Loading