Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ public boolean rename(Path src, Path dst) throws IOException {
return wrap(() -> fileIO(src).rename(src, dst));
}

@Override
public boolean tryToWriteAtomic(Path path, String content) throws IOException {
return wrap(() -> fileIO(path).tryToWriteAtomic(path, content));
}

private FileIO fileIO(Path path) throws IOException {
if (lazyFileIO == null) {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,21 @@
import org.apache.paimon.utils.ReflectionUtils;

import com.aliyun.oss.OSSClient;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.common.comm.ServiceClient;
import com.aliyun.oss.model.ObjectMetadata;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -180,6 +184,34 @@ protected AliyunOSSFileSystem createFileSystem(org.apache.hadoop.fs.Path path) {
}
}

@Override
public boolean tryToWriteAtomic(Path path, String content) throws IOException {
URI uri = path.toUri();
String bucket = uri.getHost();
String objectKey = uri.getPath().substring(1);
byte[] bytes = content.getBytes(StandardCharsets.UTF_8);

ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(bytes.length);
metadata.setHeader("x-oss-forbid-overwrite", "true");

AliyunOSSFileSystem fs = (AliyunOSSFileSystem) getFileSystem(path(path));
AliyunOSSFileSystemStore store = fs.getStore();
try {
OSSClient ossClient = ReflectionUtils.getPrivateFieldValue(store, "ossClient");
ossClient.putObject(bucket, objectKey, new ByteArrayInputStream(bytes), metadata);
return true;
} catch (OSSException e) {
if ("FileAlreadyExists".equals(e.getErrorCode())) {
LOG.warn("Failed to atomic write {}: object already exists", path);
return false;
}
throw new IOException("Failed to atomic write " + path, e);
} catch (Exception e) {
throw new IOException("Failed to atomic write " + path, e);
}
}

@Override
public void close() {
if (!allowCache) {
Expand Down