Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/aws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *AwsTest) SetUpSuite(t *C) {
func (s *AwsTest) TestRegionDetection(t *C) {
s.s3.bucket = "goofys-eu-west-1.kahing.xyz"

if TigrisDetected(s.s3.flags) {
if TigrisDetectedForTests(s.s3.flags) {
t.Skip("Not relevant for Tigris detected")
}

Expand All @@ -55,7 +55,7 @@ func (s *AwsTest) TestRegionDetection(t *C) {
func (s *AwsTest) TestBucket404(t *C) {
s.s3.bucket = RandStringBytesMaskImprSrc(63)

if TigrisDetected(s.s3.flags) {
if TigrisDetectedForTests(s.s3.flags) {
t.Skip("Not relevant for Tigris detected")
}

Expand Down
5 changes: 3 additions & 2 deletions core/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ import (
type Capabilities struct {
MaxMultipartSize uint64
// indicates that the blob store has native support for directories
DirBlob bool
Name string
DirBlob bool
Name string
IsTigris bool
}

type HeadBlobInput struct {
Expand Down
47 changes: 45 additions & 2 deletions core/backend_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,10 @@ func (s *S3Backend) detectBucketLocationByHEAD() (err error, isAws bool) {
isAws = true
}

if server != nil && strings.Contains(server[0], "Tigris") {
s.cap.IsTigris = true
}

switch resp.StatusCode {
case 200:
// note that this only happen if the bucket is in us-east-1
Expand Down Expand Up @@ -408,7 +412,7 @@ func (s *S3Backend) Init(key string) error {
}

if !s.config.RegionSet {
_, _ = s.detectBucketLocationByHEAD()
_, isAws = s.detectBucketLocationByHEAD()
// if err == nil {
// we detected a region header, this is probably AWS S3,
// or we can use anonymous access, or both
Expand Down Expand Up @@ -809,7 +813,46 @@ func (s *S3Backend) DeleteBlobs(param *DeleteBlobsInput) (*DeleteBlobsOutput, er
}

func (s *S3Backend) RenameBlob(param *RenameBlobInput) (*RenameBlobOutput, error) {
return nil, syscall.ENOTSUP
from := s.bucket + "/" + param.Source

params := &s3.CopyObjectInput{
Bucket: &s.bucket,
CopySource: aws.String(pathEscape(from)),
Key: &param.Destination,
MetadataDirective: aws.String(s3.MetadataDirectiveCopy),
}

S3Debug(s3Log, params, "RenameObject")

if s.config.UseSSE {
params.ServerSideEncryption = &s.sseType
if s.config.UseKMS && s.config.KMSKeyID != "" {
params.SSEKMSKeyId = &s.config.KMSKeyID
}
} else if s.config.SseC != "" {
params.SSECustomerAlgorithm = PString("AES256")
params.SSECustomerKey = &s.config.SseC
params.SSECustomerKeyMD5 = &s.config.SseCDigest
params.CopySourceSSECustomerAlgorithm = PString("AES256")
params.CopySourceSSECustomerKey = &s.config.SseC
params.CopySourceSSECustomerKeyMD5 = &s.config.SseCDigest
}

if s.config.ACL != "" {
params.ACL = &s.config.ACL
}

req, _ := s.CopyObjectRequest(params)

withHeader(req, "X-Tigris-Rename", "true")

err := req.Send()
if err != nil {
s3Log.Warn().Interface("params", params).Err(err).Msg("RenameObject failed")
return nil, err
}

return &RenameBlobOutput{s.getRequestId(req)}, nil
}

func (s *S3Backend) mpuCopyPart(from string, to string, mpuId string, bytes string, part int64, srcEtag *string) (*string, error) {
Expand Down
9 changes: 6 additions & 3 deletions core/cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ type FlagStorage struct {

TigrisPrefetch bool
TigrisListContent bool
TigrisRename bool

IsTigris bool
}

func (flags *FlagStorage) GetMimeType(fileName string) (retMime *string) {
Expand Down Expand Up @@ -163,9 +166,9 @@ func (flags *FlagStorage) Cleanup() {
}
}

func (flags *FlagStorage) IsTigris() bool {
return strings.Contains(flags.Endpoint, "tigris.dev") ||
strings.Contains(flags.Endpoint, "storage.dev")
func (flags *FlagStorage) IsTigrisEndpoint() {
flags.IsTigris = strings.Contains(flags.Endpoint, ".tigris.dev") ||
strings.Contains(flags.Endpoint, ".storage.dev")
}

var defaultHTTPTransport = http.Transport{
Expand Down
31 changes: 20 additions & 11 deletions core/cfg/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,6 @@ MISC OPTIONS:
Usage: "Drop root group and change to this group ID (defaults to --gid).",
},

cli.BoolFlag{
Name: "tigris-prefetch",
Usage: "Enable Tigris prefetch on list (default: off)",
},

cli.BoolFlag{
Name: "tigris-list-content",
Usage: "Include inlined objects content in list (default: on)",
},

cli.BoolFlag{
Name: "refresh-dirs",
Usage: "Automatically refresh open directories using notifications under Windows",
Expand Down Expand Up @@ -660,6 +650,21 @@ MISC OPTIONS:
Value: 512,
Usage: "Simultaneously opened cache file descriptor limit",
},

cli.BoolFlag{
Name: "tigris-prefetch",
Usage: "Enable Tigris prefetch on list (default: off)",
},

cli.BoolFlag{
Name: "tigris-list-content",
Usage: "Include inlined objects content in list (default: on)",
},

cli.BoolFlag{
Name: "no-instant-rename",
Usage: "Disable Tigris 'instant' rename (default: off)",
},
}

if runtime.GOOS == "windows" {
Expand Down Expand Up @@ -956,6 +961,7 @@ func PopulateFlags(c *cli.Context) (ret *FlagStorage) {
ClusterGrpcReflection: c.Bool("grpc-reflection"),

TigrisPrefetch: c.Bool("tigris-prefetch"),
TigrisRename: !c.Bool("no-instant-rename"),
TigrisListContent: c.Bool("tigris-list-content"),
}

Expand Down Expand Up @@ -1001,8 +1007,10 @@ func PopulateFlags(c *cli.Context) (ret *FlagStorage) {
panic("Unknown --iam-flavor: " + config.IAMFlavor)
}

flags.IsTigrisEndpoint()

// special enabled for the Tigris by default
if flags.IsTigris() {
if flags.IsTigris {
flags.EnableSpecials = !c.IsSet("no-specials")
}

Expand Down Expand Up @@ -1140,5 +1148,6 @@ func DefaultFlags() *FlagStorage {
},
TigrisPrefetch: false,
TigrisListContent: true,
TigrisRename: true,
}
}
4 changes: 2 additions & 2 deletions core/cfg/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ func InitLoggers(flags *FlagStorage) error {
Format: flags.LogFormat,
}

if (lib.IsTTY(os.Stdout) || lib.IsTTY(os.Stderr)) && log.DefaultLogConfig.Format == "" && lf == "stderr" {
if (lib.IsTTY(os.Stdout) || lib.IsTTY(os.Stderr)) && log.DefaultLogConfig.Format == "" && (lf == "stderr" || lf == "syslog") {
log.DefaultLogConfig.Format = "console"
}

log.DefaultLogConfig.Color = true
if flags.NoLogColor {
if flags.NoLogColor || lf == "syslog" {
log.DefaultLogConfig.Color = false
}

Expand Down
155 changes: 146 additions & 9 deletions core/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,14 +768,145 @@ func (inode *Inode) sendUpload(priority int) bool {
return false
}

func (inode *Inode) sendRename() {
cloud, key := inode.cloud()
if inode.isDir() {
key += "/"
func (inode *Inode) finishErrorSendRenameSpecial(err error, from string, key string, oldParent *Inode, oldName string) {
mappedErr := mapAwsError(err)
if mappedErr == syscall.ENOENT || mappedErr == syscall.ERANGE {
s3Log.Warnf("Conflict detected (inode %v): failed to copy %v to %v: %v. File is removed remotely, dropping cache", inode.Id, from, key, err)
inode.mu.Lock()
newParent := inode.Parent
oldParent := inode.oldParent
oldName := inode.oldName
inode.oldParent = nil
inode.oldName = ""
inode.renamingTo = false
inode.resetCache()
inode.mu.Unlock()
newParent.removeChild(inode)
if oldParent != nil {
oldParent.mu.Lock()
if _, ok := oldParent.dir.DeletedChildren[oldName]; ok {
delete(oldParent.dir.DeletedChildren, oldName)
oldParent.addModified(-1)
}
oldParent.mu.Unlock()
}
} else {
fuseLog.Warnf("Failed to copy %v to %v (rename): %v", from, key, err)
inode.mu.Lock()
inode.recordFlushError(err)
if inode.Parent == oldParent && inode.Name == oldName {
// Someone renamed the inode back to the original name
// ...while we failed to copy it :)
inode.oldParent = nil
inode.oldName = ""
inode.renamingTo = false
inode.Parent.addModified(-1)
if (inode.CacheState == ST_MODIFIED || inode.CacheState == ST_CREATED) &&
!inode.isStillDirty() {
inode.SetCacheState(ST_CACHED)
inode.SetAttrTime(time.Now())
}
}
inode.mu.Unlock()
}
}

func (inode *Inode) finishSuccessSendRenameSpecial(from string, key string, oldParent *Inode, oldName string, newParent *Inode, newName string) {
fuseLog.Debugf("Renamed %v to %v (rename)", from, key)
inode.mu.Lock()

// Now we know that the object is accessible by the new name
if inode.Parent == newParent && inode.Name == newName {
// Just clear the old path
inode.oldParent = nil
inode.oldName = ""
} else if inode.Parent == oldParent && inode.Name == oldName {
// Someone renamed the inode back to the original name(!)
inode.oldParent = nil
inode.oldName = ""
// Delete the new key instead of the old one (?)
} else {
// Someone renamed the inode again(!)
inode.oldParent = newParent
inode.oldName = newName
}
if (inode.CacheState == ST_MODIFIED || inode.CacheState == ST_CREATED) &&
!inode.isStillDirty() {
inode.SetCacheState(ST_CACHED)
inode.SetAttrTime(time.Now())
}
inode.renamingTo = false
inode.mu.Unlock()

oldParent.mu.Lock()
delete(oldParent.dir.DeletedChildren, oldName)
oldParent.mu.Unlock()
// And track ModifiedChildren because rename is special - it takes two parents
oldParent.addModified(-1)
}

func (inode *Inode) finishRenameFlush() {
inode.mu.Lock()
inode.IsFlushing -= inode.fs.flags.MaxParallelParts
atomic.AddInt64(&inode.fs.activeFlushers, -1)
inode.fs.WakeupFlusher()
inode.mu.Unlock()
}

func (inode *Inode) startRenameFlush() {
inode.IsFlushing += inode.fs.flags.MaxParallelParts
atomic.AddInt64(&inode.fs.stats.flushes, 1)
atomic.AddInt64(&inode.fs.activeFlushers, 1)
}

func (inode *Inode) sendRenameSpecial() {
if inode.isDir() && inode.fs.flags.NoDirObject {
return
}

cloud, key := inode.cloud()
_, from := inode.oldParent.cloud()

from = appendChildName(from, inode.oldName)
inode.renamingTo = true
oldParent := inode.oldParent
oldName := inode.oldName
newParent := inode.Parent
newName := inode.Name
if inode.isDir() {
from += "/"
key += "/"
}

inode.startRenameFlush()

go func() {
inode.fs.addInflightChange(key)
_, err := cloud.RenameBlob(&RenameBlobInput{
Source: from,
Destination: key,
})
inode.fs.completeInflightChange(key)
if err != nil {
mappedErr := mapAwsError(err)
if mappedErr != syscall.ENOENT || !inode.isDir() {
inode.finishErrorSendRenameSpecial(err, from, key, oldParent, oldName)
} else {
inode.finishSuccessSendRenameSpecial(from, key, oldParent, oldName, newParent, newName)
}
} else {
inode.finishSuccessSendRenameSpecial(from, key, oldParent, oldName, newParent, newName)
}
inode.finishRenameFlush()
}()
}

func (inode *Inode) sendRenameCopy() {
cloud, key := inode.cloud()
if inode.isDir() {
key += "/"
}
inode.startRenameFlush()
_, from := inode.oldParent.cloud()
from = appendChildName(from, inode.oldName)
oldParent := inode.oldParent
Expand Down Expand Up @@ -914,14 +1045,20 @@ func (inode *Inode) sendRename() {
}
}
}
inode.mu.Lock()
inode.IsFlushing -= inode.fs.flags.MaxParallelParts
atomic.AddInt64(&inode.fs.activeFlushers, -1)
inode.fs.WakeupFlusher()
inode.mu.Unlock()
inode.finishRenameFlush()
}()
}

func (inode *Inode) sendRename() {
flags := inode.fs.flags
cloud := *inode.fs.cloud.Load()
if cloud.Capabilities().IsTigris && flags.TigrisRename {
inode.sendRenameSpecial()
return
}
inode.sendRenameCopy()
}

func (inode *Inode) sendUpdateMeta() {
// Update metadata by COPYing into the same object
// It results in the optimized implementation in S3
Expand Down
Loading