Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private void assertRunningResult(JobIdentity ji) {

try {
Thread.sleep(60 * 1000L);
Task<?> task = ThreadPoolTaskExecutor.getInstance().getTask(ji);
Task<?> task = ThreadPoolTaskExecutor.getInstance().getTaskRuntimeInfo(ji);
Assert.assertSame(JobStatus.DONE, task.getStatus());
} catch (InterruptedException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,9 @@
import com.oceanbase.odc.service.common.response.Responses;
import com.oceanbase.odc.service.common.response.SuccessResponse;
import com.oceanbase.odc.service.common.util.UrlUtils;
import com.oceanbase.odc.service.task.Task;
import com.oceanbase.odc.service.task.base.BaseTask;
import com.oceanbase.odc.service.task.constants.JobExecutorUrls;
import com.oceanbase.odc.service.task.executor.DefaultTaskResult;
import com.oceanbase.odc.service.task.executor.DefaultTaskResultBuilder;
import com.oceanbase.odc.service.task.executor.TaskMonitor;
import com.oceanbase.odc.service.task.executor.logger.LogBiz;
import com.oceanbase.odc.service.task.executor.logger.LogBizImpl;
import com.oceanbase.odc.service.task.executor.logger.LogUtils;
Expand Down Expand Up @@ -60,7 +57,8 @@ public ExecutorRequestHandler() {
this.executorBiz = new LogBizImpl();
}

public SuccessResponse<Object> process(HttpMethod httpMethod, String uri, String requestData) {
public SuccessResponse<Object>
process(HttpMethod httpMethod, String uri, String requestData) {
if (uri == null || uri.trim().isEmpty()) {
return Responses.single("request error: uri is empty.");
}
Expand Down Expand Up @@ -89,21 +87,21 @@ public SuccessResponse<Object> process(HttpMethod httpMethod, String uri, String
matcher = modifyParametersPattern.matcher(path);
if (matcher.find()) {
JobIdentity ji = getJobIdentity(matcher);
Task<?> task = ThreadPoolTaskExecutor.getInstance().getTask(ji);
boolean result = task.modify(JobUtils.fromJsonToMap(requestData));
TaskRuntimeInfo runtimeInfo = ThreadPoolTaskExecutor.getInstance().getTaskRuntimeInfo(ji);
boolean result = runtimeInfo.getTask().modify(JobUtils.fromJsonToMap(requestData));
return Responses.ok(result);
}

matcher = getResultPattern.matcher(path);
if (matcher.find()) {
JobIdentity ji = getJobIdentity(matcher);
BaseTask<?> task = ThreadPoolTaskExecutor.getInstance().getTask(ji);
TaskMonitor taskMonitor = task.getTaskMonitor();
DefaultTaskResult result = DefaultTaskResultBuilder.build(task);
TaskRuntimeInfo runtimeInfo = ThreadPoolTaskExecutor.getInstance().getTaskRuntimeInfo(ji);
TaskMonitor taskMonitor = runtimeInfo.getTaskMonitor();
DefaultTaskResult result = DefaultTaskResultBuilder.build(runtimeInfo.getTask());
if (taskMonitor != null && MapUtils.isNotEmpty(taskMonitor.getLogMetadata())) {
result.setLogMetadata(taskMonitor.getLogMetadata());
// assign final error message
DefaultTaskResultBuilder.assignErrorMessage(result, task);
DefaultTaskResultBuilder.assignErrorMessage(result, runtimeInfo.getTask());
taskMonitor.markLogMetaCollected();
}
DefaultTaskResult copiedResult = ObjectUtil.deepCopy(result, DefaultTaskResult.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ public interface TaskExecutor {

boolean cancel(JobIdentity ji);

BaseTask<?> getTask(JobIdentity ji);
TaskRuntimeInfo getTaskRuntimeInfo(JobIdentity ji);

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.oceanbase.odc.service.task.executor;
package com.oceanbase.odc.agent.runtime;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -35,6 +35,11 @@
import com.oceanbase.odc.service.task.constants.JobParametersKeyConstants;
import com.oceanbase.odc.service.task.constants.JobServerUrls;
import com.oceanbase.odc.service.task.enums.JobStatus;
import com.oceanbase.odc.service.task.executor.DefaultTaskResult;
import com.oceanbase.odc.service.task.executor.DefaultTaskResultBuilder;
import com.oceanbase.odc.service.task.executor.HeartbeatRequest;
import com.oceanbase.odc.service.task.executor.TaskReporter;
import com.oceanbase.odc.service.task.executor.TraceDecoratorThreadFactory;
import com.oceanbase.odc.service.task.executor.logger.LogBizImpl;
import com.oceanbase.odc.service.task.util.JobUtils;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2023 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.oceanbase.odc.agent.runtime;

import java.util.concurrent.Future;

import com.oceanbase.odc.service.task.base.BaseTask;

import lombok.Data;

/**
* @author longpeng.zlp
* @date 2024/10/24 14:32
*/
@Data
public class TaskRuntimeInfo {
private final BaseTask<?> task;
private final Future<?> future;
private final TaskMonitor taskMonitor;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
*/
package com.oceanbase.odc.agent.runtime;

import java.util.HashMap;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -27,13 +32,19 @@
import com.oceanbase.odc.common.concurrent.ExecutorUtils;
import com.oceanbase.odc.core.shared.PreConditions;
import com.oceanbase.odc.core.task.TaskThreadFactory;
import com.oceanbase.odc.service.objectstorage.cloud.CloudObjectStorageService;
import com.oceanbase.odc.service.objectstorage.cloud.model.ObjectStorageConfiguration;
import com.oceanbase.odc.service.task.ExceptionListener;
import com.oceanbase.odc.service.task.SharedStorage;
import com.oceanbase.odc.service.task.Task;
import com.oceanbase.odc.service.task.TaskContext;
import com.oceanbase.odc.service.task.TaskEventListener;
import com.oceanbase.odc.service.task.base.BaseTask;
import com.oceanbase.odc.service.task.caller.JobContext;
import com.oceanbase.odc.service.task.executor.TraceDecoratorThreadFactory;
import com.oceanbase.odc.service.task.executor.task.ExceptionListener;
import com.oceanbase.odc.service.task.executor.task.TaskContext;
import com.oceanbase.odc.service.task.schedule.JobIdentity;
import com.oceanbase.odc.service.task.util.CloudObjectStorageServiceBuilder;
import com.oceanbase.odc.service.task.util.JobUtils;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -47,8 +58,7 @@
public class ThreadPoolTaskExecutor implements TaskExecutor {

private static final TaskExecutor TASK_EXECUTOR = new ThreadPoolTaskExecutor();
private final Map<JobIdentity, BaseTask<?>> tasks = new HashMap<>();
private final Map<JobIdentity, Future<?>> futures = new HashMap<>();
private final Map<JobIdentity, TaskRuntimeInfo> tasks = new ConcurrentHashMap<>();
private final ExecutorService executor;

private ThreadPoolTaskExecutor() {
Expand All @@ -68,6 +78,9 @@ synchronized public void execute(BaseTask<?> task, JobContext jc) {
if (tasks.containsKey(jobIdentity)) {
throw new IllegalArgumentException("Task already exists, jobIdentity=" + jobIdentity.getId());
}
// init cloud objet storage service and task monitor
CloudObjectStorageService cloudObjectStorageService = buildCloudStorageService(jc);
TaskMonitor taskMonitor = new TaskMonitor(task, cloudObjectStorageService);
Future<?> future = executor.submit(() -> {
try {
task.start(new TaskContext() {
Expand All @@ -80,19 +93,108 @@ public ExceptionListener getExceptionListener() {
public JobContext getJobContext() {
return jc;
}

@Override
public TaskEventListener getTaskEventListener() {
return taskEventListener(taskMonitor);
}

@Override
public SharedStorage getSharedStorage() {
return sharedStorage(cloudObjectStorageService);
}
});
} catch (Exception e) {
log.error("Task start failed, jobIdentity={}.", jobIdentity.getId(), e);
task.onException(e);
}
});
futures.put(jobIdentity, future);
tasks.put(jobIdentity, task);
tasks.put(jobIdentity, new TaskRuntimeInfo(task, future, taskMonitor));
}

private SharedStorage sharedStorage(CloudObjectStorageService cloudObjectStorageService) {
return new SharedStorage() {
@Override
public boolean available() {
return null != cloudObjectStorageService && cloudObjectStorageService.supported();
}

@Override
public InputStream download(String objectId) throws IOException {
return cloudObjectStorageService.getObject(objectId);
}

@Override
public String upload(String expectName, File localFile) throws IOException {
return cloudObjectStorageService.upload(expectName, localFile);
}

@Override
public String uploadTemp(String expectName, File localFile) throws IOException {
return cloudObjectStorageService.uploadTemp(expectName, localFile);
}

@Override
public URL getDownloadURL(String objectId) throws IOException {
return cloudObjectStorageService.generateDownloadUrl(objectId);
}

@Override
public String getPathPrefix() {
return cloudObjectStorageService.getBucketName();
}
};
}

/**
* build task event listener
*
* @param taskMonitor
* @return
*/
private TaskEventListener taskEventListener(TaskMonitor taskMonitor) {
return new TaskEventListener() {
@Override
public void onTaskStart(Task<?> task) {
taskMonitor.monitor();
}

@Override
public void onTaskStop(Task<?> task) {}

@Override
public void onTaskModify(Task<?> task) {}

@Override
public void onTaskFinalize(Task<?> task) {
taskMonitor.finalWork();
}
};
}

/**
* build task monitor
*
* @param jobContext
* @return
*/
protected CloudObjectStorageService buildCloudStorageService(JobContext jobContext) {
Optional<ObjectStorageConfiguration> storageConfig = JobUtils.getObjectStorageConfiguration();
CloudObjectStorageService cloudObjectStorageService = null;
try {
if (storageConfig.isPresent()) {
cloudObjectStorageService = CloudObjectStorageServiceBuilder.build(storageConfig.get());
}
} catch (Throwable e) {
log.warn("Init cloud object storage service failed, id={}.", jobContext.getJobIdentity().getId(), e);
}
return cloudObjectStorageService;
}

@Override
public boolean cancel(JobIdentity ji) {
Task<?> task = getTask(ji);
TaskRuntimeInfo runtimeInfo = getTaskRuntimeInfo(ji);
BaseTask<?> task = runtimeInfo.getTask();
Future<Boolean> stopFuture = executor.submit(task::stop);
boolean result = false;
try {
Expand All @@ -117,9 +219,9 @@ public boolean cancel(JobIdentity ji) {
}

@Override
public BaseTask<?> getTask(JobIdentity ji) {
BaseTask<?> task = tasks.get(ji);
PreConditions.notNull(task, "task", "Task not found, jobIdentity=" + ji.getId());
return task;
public TaskRuntimeInfo getTaskRuntimeInfo(JobIdentity ji) {
TaskRuntimeInfo runtimeInfo = tasks.get(ji);
PreConditions.notNull(runtimeInfo, "task", "Task not found, jobIdentity=" + ji.getId());
return runtimeInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,18 @@
*/
package com.oceanbase.odc.service.objectstorage;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.io.InputStream;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.springframework.core.io.Resource;

import com.oceanbase.odc.core.shared.exception.InternalServerError;
import com.oceanbase.odc.service.objectstorage.cloud.CloudObjectStorageService;
import com.oceanbase.odc.service.objectstorage.model.ObjectMetadata;
import com.oceanbase.odc.service.objectstorage.model.StorageObject;
import com.oceanbase.odc.service.objectstorage.operator.LocalFileOperator;
import com.oceanbase.odc.service.objectstorage.util.ObjectStorageUtils;
import com.oceanbase.odc.service.task.SharedStorage;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -42,27 +39,22 @@
public class ObjectStorageHandler {

private final LocalFileOperator localFileOperator;
private final CloudObjectStorageService cloudObjectStorageService;
private final SharedStorage sharedStorage;

public ObjectStorageHandler(CloudObjectStorageService cloudObjectStorageService, String localDir) {
public ObjectStorageHandler(SharedStorage sharedStorage, String localDir) {
this.localFileOperator = new LocalFileOperator(localDir);
this.cloudObjectStorageService = cloudObjectStorageService;
}

public String loadObjectContentAsString(ObjectMetadata metadata) throws IOException {
StorageObject storageObject = loadObject(metadata);
return IOUtils.toString(storageObject.getContent(), StandardCharsets.UTF_8);
this.sharedStorage = sharedStorage;
}

public StorageObject loadObject(ObjectMetadata metadata) {
Resource resource;
try {
if (cloudObjectStorageService.supported()) {
if (sharedStorage.available()) {
// OSS supported, load file from local if exists, otherwise load file from oss
if (localFileOperator.isLocalFileAbsent(metadata)) {
log.info("File is absent in local, load file from oss, bucket={}, objectId={}",
metadata.getBucketName(), metadata.getObjectId());
loadObjectFromOss(metadata);
loadObjectFromSharedObject(metadata);
}
resource = localFileOperator.loadAsResource(metadata.getBucketName(), metadata.getObjectId());
} else {
Expand All @@ -78,14 +70,11 @@ public StorageObject loadObject(ObjectMetadata metadata) {
}
}

private void loadObjectFromOss(ObjectMetadata metadata) throws IOException {
File tempFile = cloudObjectStorageService.downloadToTempFile(metadata.getObjectId());
try (FileInputStream inputStream = new FileInputStream(tempFile)) {
private void loadObjectFromSharedObject(ObjectMetadata metadata) throws IOException {
try (InputStream inputStream = sharedStorage.download(metadata.getObjectId())) {
FileUtils.copyInputStreamToFile(inputStream,
localFileOperator.getOrCreateLocalFile(metadata.getBucketName(),
metadata.getObjectId()));
} finally {
FileUtils.deleteQuietly(tempFile);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import com.oceanbase.odc.core.shared.Verify;
import com.oceanbase.odc.service.flow.task.model.SizeAwareInputStream;
import com.oceanbase.odc.service.objectstorage.ObjectStorageHandler;
import com.oceanbase.odc.service.objectstorage.cloud.CloudObjectStorageService;
import com.oceanbase.odc.service.objectstorage.model.ObjectMetadata;
import com.oceanbase.odc.service.objectstorage.model.StorageObject;
import com.oceanbase.odc.service.task.SharedStorage;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -49,7 +49,7 @@ public static String concatObjectId(String objectId, String extension) {
}

public static SizeAwareInputStream loadObjectsForTask(@NonNull List<ObjectMetadata> metadatas,
CloudObjectStorageService cloudOSS, String executorDataPath, long maxReadBytes) throws IOException {
SharedStorage cloudOSS, String executorDataPath, long maxReadBytes) throws IOException {
InputStream inputStream = new ByteArrayInputStream(new byte[0]);
long totalBytes = 0;
for (ObjectMetadata metadata : metadatas) {
Expand Down
Loading