Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ service StoreGrpcService {
rpc checkResourceCleanupForStoreCreation(ClusterStoreGrpcInfo) returns (ResourceCleanupCheckGrpcResponse) {}
rpc validateStoreDeleted(ValidateStoreDeletedGrpcRequest) returns (ValidateStoreDeletedGrpcResponse);
rpc listStores(ListStoresGrpcRequest) returns (ListStoresGrpcResponse);
rpc deleteStore(DeleteStoreGrpcRequest) returns (DeleteStoreGrpcResponse);
}

message CreateStoreGrpcRequest {
Expand Down Expand Up @@ -82,4 +83,14 @@ message ListStoresGrpcRequest {
message ListStoresGrpcResponse {
string clusterName = 1;
repeated string storeNames = 2;
}

message DeleteStoreGrpcRequest {
ClusterStoreGrpcInfo storeInfo = 1;
optional bool isAbortMigrationCleanup = 2;
}

message DeleteStoreGrpcResponse {
ClusterStoreGrpcInfo storeInfo = 1;
optional int64 executionId = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo;
import com.linkedin.venice.protocols.controller.CreateStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.DeleteStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.DeleteStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.DiscoverClusterGrpcRequest;
import com.linkedin.venice.protocols.controller.DiscoverClusterGrpcResponse;
import com.linkedin.venice.protocols.controller.LeaderControllerGrpcRequest;
Expand Down Expand Up @@ -249,6 +251,105 @@ public void testValidateStoreDeletedOverSecureGrpcChannel() {
assertEquals(response.getStoreInfo().getClusterName(), veniceCluster.getClusterName());
}

@Test(timeOut = TIMEOUT_MS)
public void testDeleteStoreGrpcEndpoint() {
String storeName = Utils.getUniqueString("test_delete_store");
String controllerGrpcUrl = veniceCluster.getLeaderVeniceController().getControllerGrpcUrl();
ManagedChannel channel = Grpc.newChannelBuilder(controllerGrpcUrl, InsecureChannelCredentials.create()).build();
StoreGrpcServiceGrpc.StoreGrpcServiceBlockingStub storeBlockingStub = StoreGrpcServiceGrpc.newBlockingStub(channel);

ClusterStoreGrpcInfo storeGrpcInfo = ClusterStoreGrpcInfo.newBuilder()
.setClusterName(veniceCluster.getClusterName())
.setStoreName(storeName)
.build();

// Step 1: Create the store
CreateStoreGrpcRequest createStoreGrpcRequest = CreateStoreGrpcRequest.newBuilder()
.setStoreInfo(storeGrpcInfo)
.setOwner("owner")
.setKeySchema(DEFAULT_KEY_SCHEMA)
.setValueSchema("\"string\"")
.build();
CreateStoreGrpcResponse createResponse = storeBlockingStub.createStore(createStoreGrpcRequest);
assertNotNull(createResponse, "Response should not be null");
assertEquals(createResponse.getStoreInfo().getStoreName(), storeName);

// Step 2: Verify store exists
veniceCluster.useControllerClient(controllerClient -> {
StoreResponse storeResponse = TestUtils.assertCommand(controllerClient.getStore(storeName));
assertNotNull(storeResponse.getStore(), "Store should exist after creation");
});

// Step 3: Disable the store before deletion (required for deleteStore)
veniceCluster.useControllerClient(controllerClient -> {
TestUtils.assertCommand(controllerClient.enableStoreReadWrites(storeName, false));
});

// Step 4: Delete the store via gRPC
DeleteStoreGrpcRequest deleteStoreRequest = DeleteStoreGrpcRequest.newBuilder().setStoreInfo(storeGrpcInfo).build();
DeleteStoreGrpcResponse deleteResponse = storeBlockingStub.deleteStore(deleteStoreRequest);
assertNotNull(deleteResponse, "Response should not be null");
assertEquals(deleteResponse.getStoreInfo().getClusterName(), veniceCluster.getClusterName());
assertEquals(deleteResponse.getStoreInfo().getStoreName(), storeName);

// Step 5: Verify store is deleted
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
ValidateStoreDeletedGrpcRequest validateRequest =
ValidateStoreDeletedGrpcRequest.newBuilder().setStoreInfo(storeGrpcInfo).build();
ValidateStoreDeletedGrpcResponse response = storeBlockingStub.validateStoreDeleted(validateRequest);
assertNotNull(response, "Response should not be null");
assertTrue(response.getStoreDeleted(), "Store should be marked as deleted after deletion");
});
}

@Test(timeOut = TIMEOUT_MS)
public void testDeleteStoreOverSecureGrpcChannel() {
String storeName = Utils.getUniqueString("test_delete_store_secure");
String controllerSecureGrpcUrl = veniceCluster.getLeaderVeniceController().getControllerSecureGrpcUrl();
ChannelCredentials credentials = GrpcUtils.buildChannelCredentials(sslFactory);
ManagedChannel channel = Grpc.newChannelBuilder(controllerSecureGrpcUrl, credentials).build();
StoreGrpcServiceGrpc.StoreGrpcServiceBlockingStub storeBlockingStub = StoreGrpcServiceGrpc.newBlockingStub(channel);

ClusterStoreGrpcInfo storeGrpcInfo = ClusterStoreGrpcInfo.newBuilder()
.setClusterName(veniceCluster.getClusterName())
.setStoreName(storeName)
.build();

// First, create the store with an insecure channel (creation requires allowlist)
mockDynamicAccessController.addResourceToAllowList(storeName);
CreateStoreGrpcRequest createStoreGrpcRequest = CreateStoreGrpcRequest.newBuilder()
.setStoreInfo(storeGrpcInfo)
.setOwner("owner")
.setKeySchema(DEFAULT_KEY_SCHEMA)
.setValueSchema("\"string\"")
.build();
CreateStoreGrpcResponse createResponse = storeBlockingStub.createStore(createStoreGrpcRequest);
assertNotNull(createResponse, "Response should not be null");

// Disable the store
veniceCluster.useControllerClient(controllerClient -> {
TestUtils.assertCommand(controllerClient.enableStoreReadWrites(storeName, false));
});

DeleteStoreGrpcRequest deleteRequest = DeleteStoreGrpcRequest.newBuilder().setStoreInfo(storeGrpcInfo).build();

// Case 1: User not in allowlist for the resource - should get permission denied
mockDynamicAccessController.removeResourceFromAllowList(storeName);
assertFalse(
mockDynamicAccessController.isAllowlistUsers(null, storeName, Method.GET.name()),
"User should not be in allowlist");
StatusRuntimeException exception =
Assert.expectThrows(StatusRuntimeException.class, () -> storeBlockingStub.deleteStore(deleteRequest));
assertEquals(exception.getStatus().getCode(), io.grpc.Status.Code.PERMISSION_DENIED);

// Case 2: User in allowlist - should succeed
mockDynamicAccessController.addResourceToAllowList(storeName);
DeleteStoreGrpcResponse deleteResponse = storeBlockingStub.deleteStore(deleteRequest);
assertNotNull(deleteResponse, "Response should not be null");
assertEquals(deleteResponse.getStoreInfo().getStoreName(), storeName);
assertEquals(deleteResponse.getStoreInfo().getClusterName(), veniceCluster.getClusterName());
}

@Test(timeOut = TIMEOUT_MS)
public void testListStoresGrpcEndpoint() {
String storeName1 = Utils.getUniqueString("test_list_stores_1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.DeleteStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.DeleteStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.ListStoresGrpcRequest;
Expand Down Expand Up @@ -147,4 +149,25 @@ public void listStores(ListStoresGrpcRequest grpcRequest, StreamObserver<ListSto
clusterName,
null);
}

/**
* Deletes a store from a cluster.
* Only allowlist users are allowed to delete stores.
*/
@Override
public void deleteStore(
DeleteStoreGrpcRequest grpcRequest,
StreamObserver<DeleteStoreGrpcResponse> responseObserver) {
LOGGER.debug("Received deleteStore with args: {}", grpcRequest);
String clusterName = grpcRequest.getStoreInfo().getClusterName();
String storeName = grpcRequest.getStoreInfo().getStoreName();
handleRequest(StoreGrpcServiceGrpc.getDeleteStoreMethod(), () -> {
if (!isAllowListUser(accessManager, storeName, Context.current())) {
throw new VeniceUnauthorizedAccessException(
ACL_CHECK_FAILURE_WARN_MESSAGE_PREFIX + StoreGrpcServiceGrpc.getDeleteStoreMethod().getFullMethodName()
+ " on resource: " + storeName);
}
return storeRequestHandler.deleteStore(grpcRequest);
}, responseObserver, clusterName, storeName);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.venice.controller.server;

import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controller.AdminCommandExecutionTracker;
import com.linkedin.venice.controller.ControllerRequestHandlerDependencies;
import com.linkedin.venice.controller.StoreDeletedValidation;
import com.linkedin.venice.exceptions.VeniceException;
Expand All @@ -11,6 +12,8 @@
import com.linkedin.venice.protocols.controller.CreateStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.DeleteAclForStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.DeleteStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.DeleteStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.GetAclForStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.ListStoresGrpcRequest;
Expand Down Expand Up @@ -255,4 +258,43 @@ public ListStoresGrpcResponse listStores(ListStoresGrpcRequest request) {
LOGGER.info("Found {} stores in cluster: {}", selectedStoreNames.size(), clusterName);
return ListStoresGrpcResponse.newBuilder().setClusterName(clusterName).addAllStoreNames(selectedStoreNames).build();
}

/**
* Deletes a store from the specified Venice cluster.
* @param request the request containing cluster name, store name, and optional migration cleanup flag
* @return response with store info and optional execution ID for tracking
*/
public DeleteStoreGrpcResponse deleteStore(DeleteStoreGrpcRequest request) {
ClusterStoreGrpcInfo storeInfo = request.getStoreInfo();
ControllerRequestParamValidator.validateClusterStoreInfo(storeInfo);
String clusterName = storeInfo.getClusterName();
String storeName = storeInfo.getStoreName();
boolean isAbortMigrationCleanup = request.hasIsAbortMigrationCleanup() && request.getIsAbortMigrationCleanup();

LOGGER.info(
"Deleting store: {} in cluster: {} with isAbortMigrationCleanup: {}",
storeName,
clusterName,
isAbortMigrationCleanup);

DeleteStoreGrpcResponse.Builder responseBuilder = DeleteStoreGrpcResponse.newBuilder().setStoreInfo(storeInfo);

Optional<AdminCommandExecutionTracker> adminCommandExecutionTracker =
admin.getAdminCommandExecutionTracker(clusterName);

if (adminCommandExecutionTracker.isPresent()) {
// Lock the tracker to get the execution id for the last admin command.
// It will not make our performance worse, because we lock the whole cluster while handling the admin
// operation in parent admin.
synchronized (adminCommandExecutionTracker) {
admin.deleteStore(clusterName, storeName, isAbortMigrationCleanup, Store.IGNORE_VERSION, false);
responseBuilder.setExecutionId(adminCommandExecutionTracker.get().getLastExecutionId());
}
} else {
admin.deleteStore(clusterName, storeName, isAbortMigrationCleanup, Store.IGNORE_VERSION, false);
}

LOGGER.info("Successfully deleted store: {} in cluster: {}", storeName, clusterName);
return responseBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.protocols.controller.ClusterStoreGrpcInfo;
import com.linkedin.venice.protocols.controller.DeleteStoreGrpcRequest;
import com.linkedin.venice.protocols.controller.DeleteStoreGrpcResponse;
import com.linkedin.venice.protocols.controller.ListStoresGrpcRequest;
import com.linkedin.venice.protocols.controller.ListStoresGrpcResponse;
import com.linkedin.venice.protocols.controller.ValidateStoreDeletedGrpcRequest;
Expand Down Expand Up @@ -571,22 +573,24 @@ public void internalHandle(Request request, TrackableControllerResponse veniceRe
String storeName = request.queryParams(NAME);
boolean abortMigratingStore =
Utils.parseBooleanOrFalse(request.queryParams(IS_ABORT_MIGRATION_CLEANUP), IS_ABORT_MIGRATION_CLEANUP);
veniceResponse.setCluster(clusterName);
veniceResponse.setName(storeName);

Optional<AdminCommandExecutionTracker> adminCommandExecutionTracker =
admin.getAdminCommandExecutionTracker(clusterName);
// Build gRPC request from HTTP parameters
ClusterStoreGrpcInfo storeInfo =
ClusterStoreGrpcInfo.newBuilder().setClusterName(clusterName).setStoreName(storeName).build();
DeleteStoreGrpcRequest.Builder grpcRequestBuilder =
DeleteStoreGrpcRequest.newBuilder().setStoreInfo(storeInfo);
if (abortMigratingStore) {
grpcRequestBuilder.setIsAbortMigrationCleanup(true);
}

if (adminCommandExecutionTracker.isPresent()) {
// Lock the tracker to get the execution id for the last admin command.
// It will not make our performance worse, because we lock the whole cluster while handling the admin
// operation in parent admin.
synchronized (adminCommandExecutionTracker) {
admin.deleteStore(clusterName, storeName, abortMigratingStore, Store.IGNORE_VERSION, false);
veniceResponse.setExecutionId(adminCommandExecutionTracker.get().getLastExecutionId());
}
} else {
admin.deleteStore(clusterName, storeName, abortMigratingStore, Store.IGNORE_VERSION, false);
// Call handler
DeleteStoreGrpcResponse grpcResponse = storeRequestHandler.deleteStore(grpcRequestBuilder.build());

// Map response back to HTTP
veniceResponse.setCluster(grpcResponse.getStoreInfo().getClusterName());
veniceResponse.setName(grpcResponse.getStoreInfo().getStoreName());
if (grpcResponse.hasExecutionId()) {
veniceResponse.setExecutionId(grpcResponse.getExecutionId());
}
} catch (Throwable e) {
veniceResponse.setError(e);
Expand Down
Loading
Loading