diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c825fd6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.idea/* +fasts3 diff --git a/.goreleaser.yml b/.goreleaser.yml new file mode 100644 index 0000000..ff1867a --- /dev/null +++ b/.goreleaser.yml @@ -0,0 +1,45 @@ +project_name: fasts3 +release: + github: + owner: tuneinc + name: fasts3 + name_template: '{{.Tag}}' +brew: + description: "Fast s3 utility is a faster version of s3cmd's ls, get, and cp functions ideal for buckets containing millions of keys." + homepage: "https://github.com/tuneinc/fasts3" + github: + owner: tuneinc + name: homebrew-tap + commit_author: + name: goreleaserbot + email: goreleaser@carlosbecker.com + install: bin.install "fasts3" +builds: +- goos: + - linux + - darwin + goarch: + - amd64 + - "386" + main: main.go + ldflags: -s -w -X main.version={{.Version}} -X main.commit={{.Commit}} -X main.date={{.Date}} + binary: fasts3 +archive: + format: tar.gz + name_template: '{{ .Binary }}_{{ .Version }}_{{ .Os }}_{{ .Arch }}{{ if .Arm }}v{{ + .Arm }}{{ end }}' +snapshot: + name_template: SNAPSHOT-{{ .Commit }} +checksum: + name_template: '{{ .ProjectName }}_{{ .Version }}_checksums.txt' +changelog: + filters: + # commit messages matching the regexp listed here will be removed from + # the changelog + # Default is empty + exclude: + - '^\(docs\)' + - '^\(travis\)' + - '^\(coverage\)' + - '^\(tests?' + - '^Merge pull request' diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..ee79e1f --- /dev/null +++ b/.travis.yml @@ -0,0 +1,28 @@ +language: go + +go: + - 1.9.x + - 1.10.x + - master + +stages: + - test + - name: deploy + if: tag IS present + +jobs: + include: + - stage: test + script: go build ./... + - stage: deploy + go: 1.10.x + script: curl -sL https://git.io/goreleaser | bash + +matrix: + allow_failures: + - go: master + fast_finish: true + +env: + global: + secure: "Iby/cnnH2C7SmMJoLBT1khL/jfiNxpoZ2axHyEHSeiaoixS75vkKtyHrrcs8A+71fKgMLOq4w5VGbNfHtrUWt6Ny4IFWiNk6RYPnFu72Bzm6bKFb5xS0yGPQ0Yo1GHGlNwHeBi9gI+MUQ4EDoKj4CGvVL60ELcZIaJjnNvXWRGU=" diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..65d909a --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 TUNE, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/README.md b/README.md
index a66f663..7536c43 100644
--- a/README.md
+++ b/README.md
@@ -1,157 +1,90 @@
+[![Build Status](https://travis-ci.org/tuneinc/fasts3.svg?branch=master)](https://travis-ci.org/tuneinc/fasts3) [![Go Report Card](https://goreportcard.com/badge/github.com/tuneinc/fasts3)](https://goreportcard.com/report/github.com/tuneinc/fasts3)
 ![FastS3](http://i.imgur.com/A42azaA.png)
 ---
-Fast s3 utility is a faster version of s3cmd's ls and del functions ideal for listing and deleting buckets containing millions of keys.
+Fast s3 utility is a faster version of s3cmd's ls, get, and cp functions, ideal for buckets containing millions of keys.
 
-#Installation
+![autocomplete demo for zsh](autocomplete_demo.gif)
+
+# Installation
 ```bash
-go get github.com/TuneOSS/fasts3
-cd $GOPATH/src/github.com/TuneOSS/fasts3
-go build
+go get -u github.com/metaverse/fasts3
 ```
+This should install the binary under `$GOPATH/bin/`.
 
-#Configuration
+# Configuration
 
-use `fasts3 init` command which will create a template file in ~/.fs3cfg
+Use the `aws configure` command from the AWS CLI tool (https://aws.amazon.com/cli/), which will create the necessary config files under `~/.aws/`.
+Add your region to your credentials file to support tab-completion of buckets:
 ```ini
 [default]
-access_key=
-secret_key=
+aws_access_key_id=xxxx
+aws_secret_access_key=xxxx
+region=us-east-1
 ```
-fill in the template file with your s3 credentials
-
-alternatively you can set these environment variables:
+Alternatively you can set these environment variables, which take precedence over the credentials file:
 ```bash
 export AWS_ACCESS_KEY_ID=
 export AWS_SECRET_ACCESS_KEY=
+export AWS_REGION=us-east-1
 ```
 
-#Usage
-
-```
-usage: fasts3 [<flags>] <command> [<args> ...]
-
-Multi-threaded s3 utility
-
-Flags:
-  --help  Show help.
-
-Commands:
-  help [<command>...]
-    Show help for a command.
-
-  ls [<flags>] <s3uri>
-    List s3 prefixes.
-
-  del [<flags>] [<prefixes>...]
-    Delete s3 keys
-
-  get [<flags>] [<prefixes>...]
-    Fetch files from s3
-
-  stream [<flags>] [<prefixes>...]
-    Stream s3 files to stdout
-
-  init
-    Initialize .fs3cfg file in home directory
-
-```
-
-#####ls
-```
-usage: fasts3 [<flags>] ls [<flags>] <s3uri>
-
-List s3 prefixes.
-
-Flags:
-  --help                Show help.
-  -r, --recursive       Get all keys for this prefix.
-  --search-depth=0      search depth to search for work.
-  -H, --human-readable  human readable key size.
-  -d, --with-date       include the last modified date.
-
-Args:
-  <s3uri>  paritial s3 uri to list, ex: s3://mary/had/a/little/lamb/
-
-```
-
-#####del
-```
-usage: fasts3 [<flags>] del [<flags>] [<prefixes>...]
-
-Delete s3 keys
-
-Flags:
-  --help            Show help.
-  -r, --recursive   Delete all keys with prefix
-  --search-depth=0  search depth to search for work.
-
-Args:
-  [<prefixes>]  1 or more partial s3 uris to delete delimited by space
-
-```
-
-#####get
-```
-usage: fasts3 get [<flags>] [<prefixes>...]
-
-Fetch files from s3
-
-Flags:
-  --search-depth=0  search depth to search for work.
-
-Args:
-  [<prefixes>]  list of prefixes or s3Uris to retrieve
-```
-
-#####stream
-```
-usage: fasts3 stream [<flags>] [<prefixes>...]
-
-Stream s3 files to stdout
-
-Flags:
-  --search-depth=0  search depth to search for work.
-  --key-regex=KEY-REGEX
-                    regex filter for keys
-
-Args:
-  [<prefixes>]  list of prefixes or s3Uris to retrieve
-```
+# Usage
+Use:
+```
+fasts3 --help
+fasts3 <command> --help
+```
 
-####Using search depth to *go* faster
-Many times you know the structure of your s3 bucket, this can be used to optimize listings. Say you have a structure like so:
+### Using search depth to *go* faster
+Many times you know the structure of your s3 bucket, and this can be used to optimize listings. Say you have a structure like so:
 ```bash
 fasts3 ls s3://mybuck/logs/
-DIR s3://mybuck/logs/2010/
+DIR s3://mybuck/logs/2011/
 DIR s3://mybuck/logs/2012/
 DIR s3://mybuck/logs/2013/
 DIR s3://mybuck/logs/2014/
 DIR s3://mybuck/logs/2015/
 ```
-doing a `fasts3 ls -r s3://mybuck/logs/` will read all keys under `logs` sequentially. We can make this faster by adding a `--search-depth 1` flag to the command which gives each of the underlying directories it's own thread increasing throughput.
+Doing a `fasts3 ls -r s3://mybuck/logs/` will read all keys under `logs` sequentially. We can make this faster by adding a `--search-depth 1` flag to the command, which gives each of the underlying directories its own thread, increasing throughput.
+
+### Concurrency
+The concurrency level of s3 command execution can be tweaked based on your usage needs. By default, up to 10 S3 calls are made simultaneously; use the global `-p`/`--max-parallel` flag to override this value: `fasts3 ls -p 64 -r s3://mybuck/logs/` will execute up to 64 s3 commands concurrently.
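The `--max-parallel` limit is enforced inside the s3wrapper package with a buffered-channel semaphore (the `concurrencySemaphore` introduced later in this diff). Below is a minimal, self-contained sketch of that pattern; `fakeS3Call` is a stand-in for a real S3 request, not part of fasts3:

```go
package main

import (
	"fmt"
	"sync"
)

func fakeS3Call(n int) { fmt.Println("request", n) }

func main() {
	maxParallel := 4 // fasts3 exposes this as -p/--max-parallel
	sem := make(chan struct{}, maxParallel)
	var wg sync.WaitGroup

	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire one of maxParallel slots
			defer func() { <-sem }() // release the slot when done
			fakeS3Call(n)
		}(i)
	}
	wg.Wait()
}
```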
-####Examples
+### Examples
 ```bash
 # ls
 fasts3 ls s3://mybucket/ # lists top level directories and keys
 fasts3 ls -r s3://mybucket/ # lists all keys in the bucket
 fasts3 ls -r --search-depth 1 s3://mybucket/ # lists all keys in the bucket using the directories 1 level down to thread
+fasts3 ls -r s3://mybucket/ | awk '{s += $1}END{print s}' # sum sizes of all objects in the bucket
 
-# del
-fasts3 del -r s3://mybuck/logs/ # deletes all keys in the prefix
-fasts3 del s3://mybuck/logs/2015/01/12/api.log.201501122359.gz # deletes single key
-fasts3 del $(fasts3 ls s3://mybuck/logs/2015/01/12 | awk -F " " '/api.log/{print $2}') # delete all keys that have "api.log" in them
-
-#get
+# get
 fasts3 get s3://mybuck/logs/ # fetches all logs in the prefix
 
 # stream
 fasts3 stream s3://mybuck/logs/ # streams all logs under prefix to stdout
-fasts3 stream --key-filter ".*2015-01-01" s3://mybuck/logs/ # streams all logs with 2015-01-01 in the key name stdout
+fasts3 stream --key-regex ".*2015-01-01" s3://mybuck/logs/ # streams all logs with 2015-01-01 in the key name to stdout
+
+# cp
+fasts3 cp -r s3://mybuck/logs/ s3://otherbuck/ # copies all subdirectories to another bucket
+fasts3 cp -r -f s3://mybuck/logs/ s3://otherbuck/all-logs/ # copies all source files into the same destination directory
 ```
+
+### Completion
+Bash and ZSH completion are available.
+
+To install for bash:
+```
+source completion.sh
+```
+
+For zsh:
+```
+source completion.zsh
+```
diff --git a/autocomplete_demo.gif b/autocomplete_demo.gif
new file mode 100644
index 0000000..06b2500
Binary files /dev/null and b/autocomplete_demo.gif differ
diff --git a/awswrapper/aws.go b/awswrapper/aws.go
deleted file mode 100644
index 7494f0f..0000000
--- a/awswrapper/aws.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package awswrapper
-
-import (
-	"os"
-	"os/user"
-	"path"
-
-	"github.com/AdRoll/goamz/aws"
-	"github.com/vaughan0/go-ini"
-)
-
-func GetAwsAuth() (aws.Auth, error) {
-	env_access_key_id := os.Getenv("AWS_ACCESS_KEY_ID")
-	env_secret_key_id := os.Getenv("AWS_SECRET_ACCESS_KEY")
-	if env_access_key_id != "" && env_secret_key_id != "" {
-		return aws.Auth{AccessKey: env_access_key_id, SecretKey: env_secret_key_id}, nil
-	}
-
-	usr, err := user.Current()
-	if err != nil {
-		return aws.Auth{}, err
-	}
-
-	file, err := ini.LoadFile(path.Join(usr.HomeDir, ".fs3cfg"))
-	if err != nil {
-		// try looking for .s3cfg (common for s3cmd users)
-		file, err = ini.LoadFile(path.Join(usr.HomeDir, ".s3cfg"))
-		if err != nil {
-			return aws.Auth{}, err
-		}
-	}
-	return aws.Auth{AccessKey: file["default"]["access_key"], SecretKey: file["default"]["secret_key"]}, nil
-}
diff --git a/cmd/cp.go b/cmd/cp.go
new file mode 100644
index 0000000..0d6b2d2
--- /dev/null
+++ b/cmd/cp.go
@@ -0,0 +1,60 @@
+package cmd
+
+import (
+	"fmt"
+	"log"
+	"strings"
+
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/metaverse/fasts3/s3wrapper"
+	"github.com/spf13/cobra"
+)
+
+// cpCmd represents the cp command
+var cpCmd = &cobra.Command{
+	Use:   "cp <source S3 URI> <dest S3 URI>",
+	Short: "Copy files within S3",
+	Long:  ``,
+	Args:  validateS3URIs(cobra.ExactArgs(2)),
+	Run: func(cmd *cobra.Command, args []string) {
+		recursive, err := cmd.Flags().GetBool("recursive")
+		if err != nil {
+			log.Fatal(err)
+		}
+		flat, err := cmd.Flags().GetBool("flat")
+		if err != nil {
+			log.Fatal(err)
+		}
+		err = Cp(GetS3Client(), args, recursive, delimiter, searchDepth, keyRegex, flat)
+		if err != nil {
+			log.Fatal(err)
+		}
+	},
+}
+
+// Cp copies files from one s3 location to another using svc. s3Uris is a list of source and dest s3 URIs; recurse tells
+// whether to list all keys under the source prefix; delimiter is the delimiter to use when listing; searchDepth determines
+// the number of prefixes to list before parallelizing list calls; keyRegex is a regex filter on keys; when flat is
+// true, only the last part of the prefix is kept as the filename.
+func Cp(svc *s3.S3, s3Uris []string, recurse bool, delimiter string, searchDepth int, keyRegex string, flat bool) error {
+	listCh, err := Ls(svc, []string{s3Uris[0]}, recurse, delimiter, searchDepth, keyRegex)
+	if err != nil {
+		return err
+	}
+
+	wrap := s3wrapper.New(svc, maxParallel)
+
+	copiedFiles := wrap.CopyAll(listCh, s3Uris[0], s3Uris[1], delimiter, recurse, flat)
+	for file := range copiedFiles {
+		fmt.Printf("Copied %s -> %s%s%s\n", file.FullKey, strings.TrimRight(s3Uris[1], delimiter), delimiter, file.Key)
+	}
+
+	return nil
+}
+
+func init() {
+	rootCmd.AddCommand(cpCmd)
+
+	cpCmd.Flags().BoolP("recursive", "r", false, "Copy all keys for this prefix.")
+	cpCmd.Flags().BoolP("flat", "f", false, "Copy all source files into a flat destination folder (vs. corresponding subfolders)")
+}
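The `--flat` flag changes how the destination key for each copied object is derived. A simplified model of the trimming `CopyAll` performs later in this diff; `destKey` is an illustrative helper under those assumptions, not the exact code (the real implementation trims the common source prefix element by element and honors the configured delimiter):

```go
package main

import (
	"fmt"
	"strings"
)

// destKey derives a destination key: with flat=true only the base filename
// lands under destPrefix; otherwise the key keeps its path relative to the
// source prefix.
func destKey(key, sourcePrefix, destPrefix string, flat bool) string {
	if flat {
		parts := strings.Split(key, "/")
		return destPrefix + parts[len(parts)-1]
	}
	return destPrefix + strings.TrimPrefix(key, sourcePrefix)
}

func main() {
	fmt.Println(destKey("logs/2015/01/api.log", "logs/", "all-logs/", true))  // all-logs/api.log
	fmt.Println(destKey("logs/2015/01/api.log", "logs/", "backup/", false))   // backup/2015/01/api.log
}
```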
diff --git a/cmd/get.go b/cmd/get.go
new file mode 100644
index 0000000..d74acd5
--- /dev/null
+++ b/cmd/get.go
@@ -0,0 +1,63 @@
+package cmd
+
+import (
+	"log"
+
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/metaverse/fasts3/s3wrapper"
+	"github.com/spf13/cobra"
+)
+
+// getCmd represents the get command
+var getCmd = &cobra.Command{
+	Use:   "get <S3 URIs...>",
+	Short: "Download files from S3",
+	Long:  ``,
+	Args:  validateS3URIs(cobra.MinimumNArgs(1)),
+	Run: func(cmd *cobra.Command, args []string) {
+		recursive, err := cmd.Flags().GetBool("recursive")
+		if err != nil {
+			log.Fatal(err)
+		}
+		skipExisting, err := cmd.Flags().GetBool("skip-existing")
+		if err != nil {
+			log.Fatal(err)
+		}
+		err = Get(GetS3Client(), args, recursive, delimiter, searchDepth, keyRegex, skipExisting)
+		if err != nil {
+			log.Fatal(err)
+		}
+	},
+}
+
+func init() {
+	rootCmd.AddCommand(getCmd)
+
+	getCmd.Flags().BoolP("recursive", "r", false, "Get all keys for this prefix")
+	getCmd.Flags().BoolP("skip-existing", "x", false, "Skips downloading keys which already exist on the local file system")
+}
+
+// Get downloads a file to the local filesystem using svc. s3Uris specifies the
+// S3 Prefixes/Keys to download; recurse tells whether or not to download
+// everything under s3Uris; delimiter is the delimiter to use when listing;
+// searchDepth determines how many prefixes to list before parallelizing list
+// calls; keyRegex is a regex filter on Keys; skipExisting skips files which
+// already exist on the filesystem.
+func Get(svc *s3.S3, s3Uris []string, recurse bool, delimiter string, searchDepth int, keyRegex string, skipExisting bool) error {
+	listCh, err := Ls(svc, s3Uris, recurse, delimiter, searchDepth, keyRegex)
+	if err != nil {
+		return err
+	}
+
+	wrap, err := s3wrapper.New(svc, maxParallel).WithRegionFrom(s3Uris[0])
+	if err != nil {
+		return err
+	}
+
+	downloadedFiles := wrap.GetAll(listCh, skipExisting)
+	for file := range downloadedFiles {
+		log.Printf("Downloaded %s -> %s\n", file.FullKey, file.Key)
+	}
+
+	return nil
+}
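The `--skip-existing` behavior reduces to an `os.Stat` check before each download, which is what `GetAll` does later in this diff. A minimal sketch under that assumption; `shouldDownload` is illustrative and not part of the codebase:

```go
package main

import (
	"fmt"
	"os"
)

// shouldDownload reports whether a key should be fetched: always when
// skipExisting is off, otherwise only when no local file exists at localPath.
func shouldDownload(localPath string, skipExisting bool) bool {
	if !skipExisting {
		return true
	}
	_, err := os.Stat(localPath)
	return os.IsNotExist(err)
}

func main() {
	fmt.Println(shouldDownload("logs/2015/01/api.log", true))
}
```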
diff --git a/cmd/ls.go b/cmd/ls.go
new file mode 100644
index 0000000..1bd3231
--- /dev/null
+++ b/cmd/ls.go
@@ -0,0 +1,147 @@
+package cmd
+
+import (
+	"fmt"
+	"log"
+	"regexp"
+	"strings"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/s3"
+	humanize "github.com/dustin/go-humanize"
+	"github.com/metaverse/fasts3/s3wrapper"
+	"github.com/spf13/cobra"
+)
+
+// lsCmd represents the ls command
+var lsCmd = &cobra.Command{
+	Use:   "ls <S3 URIs...>",
+	Short: "List S3 prefixes",
+	Long:  ``,
+	Args:  validateS3URIs(cobra.MinimumNArgs(1)),
+	Run: func(cmd *cobra.Command, args []string) {
+		recursive, err := cmd.Flags().GetBool("recursive")
+		if err != nil {
+			log.Fatal(err)
+		}
+		humanReadable, err := cmd.Flags().GetBool("human-readable")
+		if err != nil {
+			log.Fatal(err)
+		}
+		includeDates, err := cmd.Flags().GetBool("with-date")
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		listChan, err := Ls(GetS3Client(), args, recursive, delimiter, searchDepth, keyRegex)
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		for listOutput := range listChan {
+			if listOutput.IsPrefix {
+				fmt.Printf("%10s %s\n", "DIR", listOutput.FullKey)
+			} else {
+				var size string
+				if humanReadable {
+					size = fmt.Sprintf("%10s", humanize.Bytes(uint64(listOutput.Size)))
+				} else {
+					size = fmt.Sprintf("%10d", listOutput.Size)
+				}
+				date := ""
+				if includeDates {
+					date = " " + (listOutput.LastModified).Format("2006-01-02T15:04:05")
+				}
+				fmt.Printf("%s%s %s\n", size, date, listOutput.FullKey)
+			}
+		}
+	},
+}
+
+// Ls lists S3 keys and prefixes using svc. s3Uris specifies which S3 prefixes/keys to list; recursive tells whether or not to list everything
+// under s3Uris; delimiter tells which character to use as the delimiter for listing prefixes; searchDepth determines how many prefixes to list
+// before parallelizing list calls; keyRegex is a regex filter on Keys.
+func Ls(svc *s3.S3, s3Uris []string, recursive bool, delimiter string, searchDepth int, keyRegex string) (chan *s3wrapper.ListOutput, error) {
+	wrap, err := s3wrapper.New(svc, maxParallel).WithRegionFrom(s3Uris[0])
+	if err != nil {
+		return nil, err
+	}
+	outChan := make(chan *s3wrapper.ListOutput, 10000)
+
+	slashRegex := regexp.MustCompile("/")
+	bucketExpandedS3Uris := make([]string, 0, 1000)
+
+	// transforms uris with partial or no bucket (e.g. s3://)
+	// into a listable uri
+	for _, uri := range s3Uris {
+		// filters uris without bucket or partial bucket specified:
+		// s3 key/prefix queries will always have 3 slashes, whereas
+		// bucket queries will always have 2 (e.g. s3://<bucket>/<prefix> vs s3://<bucket>)
+		if len(slashRegex.FindAllString(uri, -1)) == 2 {
+			buckets, err := wrap.ListBuckets(uri)
+			if err != nil {
+				return nil, err
+			}
+			for _, bucket := range buckets {
+				// add the bucket back to the list of s3 uris in cases where
+				// we are searching beyond the bucket
+				if recursive || searchDepth > 0 {
+					resp, err := svc.GetBucketLocation(&s3.GetBucketLocationInput{Bucket: aws.String(bucket)})
+					if err != nil {
+						return nil, err
+					}
+					// if the region is location constrained and not in the region we specified in our config
+					// then don't list it, otherwise we will get an error from the AWS API
+					if resp.LocationConstraint == nil || *resp.LocationConstraint == *svc.Client.Config.Region {
+						bucketExpandedS3Uris = append(bucketExpandedS3Uris, s3wrapper.FormatS3Uri(bucket, ""))
+					}
+				} else {
+					key := ""
+					fullKey := s3wrapper.FormatS3Uri(bucket, "")
+					outChan <- &s3wrapper.ListOutput{
+						IsPrefix:     true,
+						Key:          key,
+						FullKey:      fullKey,
+						LastModified: time.Time{},
+						Size:         0,
+						Bucket:       bucket,
+					}
+				}
+			}
+		} else {
+			bucketExpandedS3Uris = append(bucketExpandedS3Uris, uri)
+		}
+	}
+	s3Uris = bucketExpandedS3Uris
+
+	go func() {
+		defer close(outChan)
+
+		for i := 0; i < searchDepth; i++ {
+			newS3Uris := make([]string, 0)
+			for itm := range wrap.ListAll(s3Uris, false, delimiter, keyRegex) {
+				if itm.IsPrefix {
+					newS3Uris = append(newS3Uris, strings.TrimRight(itm.FullKey, delimiter)+delimiter)
+				} else {
+					outChan <- itm
+				}
+			}
+			s3Uris = newS3Uris
+		}
+
+		for itm := range wrap.ListAll(s3Uris, recursive, delimiter, keyRegex) {
+			outChan <- itm
+		}
+	}()
+
+	return outChan, nil
+}
+
+func init() {
+	rootCmd.AddCommand(lsCmd)
+
+	lsCmd.Flags().BoolP("recursive", "r", false, "Get all keys for this prefix")
+	lsCmd.Flags().BoolP("human-readable", "H", false, "Output human-readable object sizes")
+	lsCmd.Flags().BoolP("with-date", "d", false, "Include the last modified date")
+}
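`Ls` decides whether a URI needs bucket expansion by counting slashes. An equivalent standalone check, using `strings.Count` in place of the regexp the code above uses:

```go
package main

import (
	"fmt"
	"strings"
)

// isBucketQuery reports whether a URI is a bare or partial bucket query:
// s3:// or s3://buck has exactly two slashes, while a key/prefix URI like
// s3://buck/logs/ has at least three.
func isBucketQuery(uri string) bool {
	return strings.Count(uri, "/") == 2
}

func main() {
	fmt.Println(isBucketQuery("s3://mybuck"))       // true: expand matching buckets
	fmt.Println(isBucketQuery("s3://mybuck/logs/")) // false: list the prefix directly
}
```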
diff --git a/cmd/rm.go b/cmd/rm.go
new file mode 100644
index 0000000..d9183cc
--- /dev/null
+++ b/cmd/rm.go
@@ -0,0 +1,54 @@
+package cmd
+
+import (
+	"fmt"
+	"log"
+
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/metaverse/fasts3/s3wrapper"
+	"github.com/spf13/cobra"
+)
+
+// rmCmd represents the rm command
+var rmCmd = &cobra.Command{
+	Use:   "rm <S3 URIs...>",
+	Short: "Delete files within S3",
+	Long:  ``,
+	Args:  validateS3URIs(cobra.MinimumNArgs(1)),
+	Run: func(cmd *cobra.Command, args []string) {
+		recursive, err := cmd.Flags().GetBool("recursive")
+		if err != nil {
+			log.Fatal(err)
+		}
+		if err := Rm(GetS3Client(), args, recursive, delimiter, searchDepth, keyRegex); err != nil {
+			log.Fatal(err)
+		}
+	},
+}
+
+// Rm removes files from S3 using svc. s3Uris is a list of prefixes/keys to delete; recurse tells whether or not to delete
+// everything under the prefixes; delimiter is the delimiter to use when listing; searchDepth determines the number of
+// prefixes to list before parallelizing list calls; keyRegex is a regex filter on keys.
+func Rm(svc *s3.S3, s3Uris []string, recurse bool, delimiter string, searchDepth int, keyRegex string) error {
+	listCh, err := Ls(svc, s3Uris, recurse, delimiter, searchDepth, keyRegex)
+	if err != nil {
+		return err
+	}
+
+	wrap, err := s3wrapper.New(svc, maxParallel).WithRegionFrom(s3Uris[0])
+	if err != nil {
+		return err
+	}
+
+	deleted := wrap.DeleteObjects(listCh)
+	for key := range deleted {
+		fmt.Printf("Deleted %s\n", key.FullKey)
+	}
+	return nil
+}
+
+func init() {
+	rootCmd.AddCommand(rmCmd)
+
+	rmCmd.Flags().BoolP("recursive", "r", false, "Delete all keys for this prefix")
+}
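`Rm` itself only drains the listing channel; the actual deletes are batched by the wrapper's `DeleteObjects` later in this diff, because the S3 DeleteObjects API accepts at most 1000 keys per request. A rough sketch of that chunking; `batchKeys` is illustrative, not the wrapper's actual code:

```go
package main

import "fmt"

// batchKeys splits keys into groups of at most n, mirroring the 1000-key
// limit the S3 DeleteObjects API imposes per request.
func batchKeys(keys []string, n int) [][]string {
	var batches [][]string
	for len(keys) > 0 {
		if len(keys) < n {
			n = len(keys)
		}
		batches = append(batches, keys[:n])
		keys = keys[n:]
	}
	return batches
}

func main() {
	keys := make([]string, 2500)
	for i := range keys {
		keys[i] = fmt.Sprintf("logs/%d.gz", i)
	}
	fmt.Println(len(batchKeys(keys, 1000))) // 3 requests: 1000 + 1000 + 500
}
```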
diff --git a/cmd/root.go b/cmd/root.go
new file mode 100644
index 0000000..bd29879
--- /dev/null
+++ b/cmd/root.go
@@ -0,0 +1,95 @@
+package cmd
+
+import (
+	"fmt"
+	"log"
+	"regexp"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/spf13/cobra"
+)
+
+// rootCmd represents the base command when called without any subcommands
+var rootCmd = &cobra.Command{
+	Use:   "fasts3",
+	Short: "A faster S3 utility",
+	Long:  ``,
+	Run: func(cmd *cobra.Command, args []string) {
+		if showVersion, err := cmd.Flags().GetBool("version"); err == nil && showVersion {
+			versionCmd.Run(cmd, args)
+			return
+		}
+		cmd.Help()
+	},
+}
+
+var (
+	keyRegex               string
+	delimiter              string
+	searchDepth            int
+	maxParallel            int
+	endpoint               string
+	usePathStyleAddressing bool
+)
+
+func init() {
+	rootCmd.Flags().Bool("version", false, "Show the version")
+	rootCmd.PersistentFlags().StringVar(&keyRegex, "key-regex", "", "Regex filter for keys")
+	rootCmd.PersistentFlags().StringVar(&delimiter, "delimiter", "/", "Delimiter to use while listing")
+	rootCmd.PersistentFlags().IntVar(&searchDepth, "search-depth", 0, "Dictates how many prefix groups to walk down")
+	rootCmd.PersistentFlags().IntVarP(&maxParallel, "max-parallel", "p", 10, "Maximum number of calls to make to S3 simultaneously")
+	rootCmd.PersistentFlags().StringVar(&endpoint, "endpoint", "", "Endpoint to make S3 requests against")
+	rootCmd.PersistentFlags().BoolVar(&usePathStyleAddressing, "path-style-addressing", false, "Enables path-style addressing (deprecated in normal AWS environments)")
+}
+
+// Execute adds all child commands to the root command and sets flags appropriately.
+// This is called by main.main(). It only needs to happen once to the rootCmd.
+func Execute() {
+	if err := rootCmd.Execute(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func GetS3Client() *s3.S3 {
+	awsSession, err := session.NewSessionWithOptions(session.Options{
+		SharedConfigState: session.SharedConfigEnable,
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	config := aws.NewConfig()
+	if endpoint != "" {
+		config = config.WithEndpoint(endpoint)
+	}
+	config = config.WithS3ForcePathStyle(usePathStyleAddressing)
+
+	return s3.New(awsSession, config)
+}
+
+func validateS3URIs(pArgs ...cobra.PositionalArgs) func(cmd *cobra.Command, args []string) error {
+	return func(cmd *cobra.Command, args []string) error {
+		for _, pArg := range pArgs {
+			err := pArg(cmd, args)
+			if err != nil {
+				return err
+			}
+		}
+
+		for _, a := range args {
+			hasMatch, err := regexp.MatchString("^s3://", a)
+			if err != nil {
+				return err
+			}
+			if !hasMatch {
+				return fmt.Errorf("%s is not a valid S3 URI; please enter a valid S3 URI, e.g. s3://mary/had/a/little/lamb", a)
+			}
+		}
+		return nil
+	}
+}
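The `--endpoint` and `--path-style-addressing` flags exist mainly for S3-compatible stores. This is roughly the client `GetS3Client` builds when both are set; the MinIO-style address is a hypothetical example, not something the tool assumes:

```go
package main

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

// newCustomClient builds a client equivalent to running fasts3 with
// --endpoint http://localhost:9000 --path-style-addressing.
func newCustomClient() *s3.S3 {
	sess := session.Must(session.NewSessionWithOptions(session.Options{
		SharedConfigState: session.SharedConfigEnable,
	}))
	return s3.New(sess, aws.NewConfig().
		WithEndpoint("http://localhost:9000"). // hypothetical MinIO endpoint
		WithS3ForcePathStyle(true))
}

func main() { _ = newCustomClient() }
```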
diff --git a/cmd/stream.go b/cmd/stream.go
new file mode 100644
index 0000000..1d17920
--- /dev/null
+++ b/cmd/stream.go
@@ -0,0 +1,93 @@
+package cmd
+
+import (
+	"fmt"
+	"log"
+	"os"
+
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/metaverse/fasts3/s3wrapper"
+	"github.com/spf13/cobra"
+)
+
+// streamCmd represents the stream command
+var streamCmd = &cobra.Command{
+	Use:   "stream <S3 URIs...>",
+	Short: "Stream the S3 objects' contents to STDOUT",
+	Args:  validateS3URIs(cobra.MinimumNArgs(1)),
+	Run: func(cmd *cobra.Command, args []string) {
+		includeKeyName, err := cmd.Flags().GetBool("include-key-name")
+		if err != nil {
+			log.Fatal(err)
+		}
+		ordered, err := cmd.Flags().GetBool("ordered")
+		if err != nil {
+			log.Fatal(err)
+		}
+		raw, err := cmd.Flags().GetBool("raw")
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		err = Stream(
+			GetS3Client(),
+			args,
+			delimiter,
+			searchDepth,
+			includeKeyName,
+			keyRegex,
+			ordered,
+			raw)
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "Encountered an error: %s\n", err)
+			return
+		}
+	},
+}
+
+// Stream streams S3 Key content to stdout using svc. s3Uris specifies the
+// S3 Prefixes/Keys to stream; delimiter is the delimiter to use when listing;
+// searchDepth determines how many prefixes to list before parallelizing list
+// calls; includeKeyName will prefix each line with the key the line came
+// from; keyRegex is a regex filter on Keys; ordered determines whether lines
+// from different files may be inter-mingled or must be kept in order; raw
+// outputs the raw bytes of each file instead of lines (together these are
+// helpful for binary files).
+func Stream(
+	svc *s3.S3,
+	s3Uris []string,
+	delimiter string,
+	searchDepth int,
+	includeKeyName bool,
+	keyRegex string,
+	ordered bool,
+	raw bool,
+) error {
+	listCh, err := Ls(svc, s3Uris, true, delimiter, searchDepth, keyRegex)
+	if err != nil {
+		return err
+	}
+	wrap, err := s3wrapper.New(svc, maxParallel).WithRegionFrom(s3Uris[0])
+	if err != nil {
+		return err
+	}
+
+	if ordered {
+		wrap.WithMaxConcurrency(1)
+	}
+
+	lines := wrap.Stream(listCh, includeKeyName, raw)
+	for line := range lines {
+		fmt.Print(line)
+	}
+
+	return nil
+}
+
+func init() {
+	rootCmd.AddCommand(streamCmd)
+
+	streamCmd.Flags().BoolP("include-key-name", "i", false, "Include the key name in streamed output")
+	streamCmd.Flags().BoolP("ordered", "o", false, "Read the keys in-order, not mixing output from different keys (this will reduce the parallelism to 1)")
+	streamCmd.Flags().BoolP("raw", "r", false, "Raw object stream (do not uncompress or delimit stream)")
+}
diff --git a/cmd/version.go b/cmd/version.go
new file mode 100644
index 0000000..1923ca2
--- /dev/null
+++ b/cmd/version.go
@@ -0,0 +1,24 @@
+package cmd
+
+import (
+	"fmt"
+
+	"github.com/spf13/cobra"
+)
+
+// Version is set by main during build
+var Version string
+
+// versionCmd represents the version command
+var versionCmd = &cobra.Command{
+	Use:   "version",
+	Short: "Show the version",
+	Long:  ``,
+	Run: func(cmd *cobra.Command, args []string) {
+		fmt.Println(Version)
+	},
+}
+
+func init() {
+	rootCmd.AddCommand(versionCmd)
+}
diff --git a/completion.sh b/completion.sh
new file mode 100644
index 0000000..070d0d1
--- /dev/null
+++ b/completion.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+#_fasts3_complete() {
+#	local s3Prefix=$(${COMP_LINE} | grep -oh "s3://.*")
+#
+#	COMPREPLY=()
+#	local completions="1,2,3"
+#	COMPREPLY=( $(compgen"1,2,3" )
+#	return 0
+#}
+#
+#complete -f -F _fasts3_complete fasts3
+_foo()
+{
+	ORIG_COMP_WORDBREAKS="${COMP_WORDBREAKS}"
+	COMP_WORDBREAKS=" "
+	local cur
+	_get_comp_words_by_ref -n : cur
+	next=$(fasts3 ls $cur | awk '{print $2}' | sed 's/s3://g')
+	COMPREPLY=($next)
+	COMP_WORDBREAKS=${ORIG_COMP_WORDBREAKS}
+}
+complete -o nospace -F _foo fasts3
diff --git a/completion.zsh b/completion.zsh
new file mode 100644
index 0000000..6f2d80f
--- /dev/null
+++ b/completion.zsh
@@ -0,0 +1,11 @@
+#!/bin/zsh
+_fasts3_complete() {
+	local s3Prefix completions
+	s3Prefix="$1"
+	if [[ "$s3Prefix" =~ ^s3:// ]]; then
+		completions="$(fasts3 ls $s3Prefix | awk '{print $2}')"
+		reply=("${(ps:\n:)completions}")
+	fi
+}
+
+compctl -K _fasts3_complete -S '' fasts3
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..11b8ee2
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,15 @@
+module github.com/metaverse/fasts3
+
+go 1.17
+
+require (
+	github.com/aws/aws-sdk-go v1.43.40
+	github.com/dustin/go-humanize v1.0.0
+	github.com/spf13/cobra v1.4.0
+)
+
+require (
+	github.com/inconshreveable/mousetrap v1.0.0 // indirect
+	github.com/jmespath/go-jmespath v0.4.0 // indirect
+	github.com/spf13/pflag v1.0.5 // indirect
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..d2b8636
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,34 @@
+github.com/aws/aws-sdk-go v1.43.40 h1:xeymFmt2atvG7C9nTjYR1PUt3QZC2sCKvySu/UNdXhM=
+github.com/aws/aws-sdk-go v1.43.40/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
+github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
+github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
+github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
+github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
+github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
+github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
+github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
+github.com/pkg/errors v0.9.1/go.mod
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/spf13/cobra v1.4.0 h1:y+wJpx64xcgO1V+RcnwW0LEHxTKRi2ZDPSBjWnrg88Q= +github.com/spf13/cobra v1.4.0/go.mod h1:Wo4iy3BUC+X2Fybo0PDqwJIv3dNRiZLHQymsfxlB84g= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk= +golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/main.go b/main.go index a475c36..34e3028 100644 --- a/main.go +++ b/main.go @@ -1,431 +1,17 @@ package main -/** - * A utility for doing operations on s3 faster than s3cmd - */ import ( - "bufio" - "bytes" - "compress/gzip" - "fmt" - "io/ioutil" "log" - "os" - "os/user" - "path" - "regexp" - "runtime" - "strings" - "sync" - "github.com/AdRoll/goamz/aws" - "github.com/AdRoll/goamz/s3" - "github.com/TuneOSS/fasts3/awswrapper" - "github.com/TuneOSS/fasts3/s3wrapper" - "github.com/alecthomas/kingpin" - "github.com/dustin/go-humanize" + "github.com/metaverse/fasts3/cmd" ) -type s3List []string - -// Set overrides kingping's Set method to validate value for s3 URIs -func (s *s3List) Set(value string) error { - hasMatch, err := regexp.MatchString("^s3://", value) - if err != nil { - return err - } - if !hasMatch { - return fmt.Errorf("%s not a valid S3 uri, Please enter a valid S3 uri. 
Ex: s3://mary/had/a/little/lamb\n", *lsS3Uri) - } else { - *s = append(*s, value) - return nil - } -} - -func (s *s3List) String() string { - return "" -} - -// IsCumulative specifies S3List as a cumulative argument -func (s *s3List) IsCumulative() bool { - return true -} - -// S3List creates a new S3List kingpin setting -func S3List(s kingpin.Settings) (target *[]string) { - target = new([]string) - s.SetValue((*s3List)(target)) - return -} - var ( - app = kingpin.New("fasts3", "Multi-threaded s3 utility") - - ls = app.Command("ls", "List s3 prefixes.") - lsS3Uri = ls.Arg("s3uri", "paritial s3 uri to list, ex: s3://mary/had/a/little/lamb/").Required().String() - lsRecurse = ls.Flag("recursive", "Get all keys for this prefix.").Short('r').Bool() - lsSearchDepth = ls.Flag("search-depth", "search depth to search for work.").Default("0").Int() - humanReadable = ls.Flag("human-readable", "human readable key size.").Short('H').Bool() - withDate = ls.Flag("with-date", "include the last modified date.").Short('d').Bool() - - del = app.Command("del", "Delete s3 keys") - delPrefixes = S3List(del.Arg("prefixes", "1 or more partial s3 uris to delete delimited by space")) - delRecurse = del.Flag("recursive", "Delete all keys with prefix").Short('r').Bool() - delSearchDepth = del.Flag("search-depth", "search depth to search for work.").Default("0").Int() - - get = app.Command("get", "Fetch files from s3") - getS3Uris = S3List(get.Arg("prefixes", "list of prefixes or s3Uris to retrieve")) - getSearchDepth = get.Flag("search-depth", "search depth to search for work.").Default("0").Int() - - stream = app.Command("stream", "Stream s3 files to stdout") - streamS3Uris = S3List(stream.Arg("prefixes", "list of prefixes or s3Uris to retrieve")) - streamSearchDepth = stream.Flag("search-depth", "search depth to search for work.").Default("0").Int() - streamKeyRegex = stream.Flag("key-regex", "regex filter for keys").Default("").String() - streamIncludeKeyName = stream.Flag("include-key-name", "regex filter for keys").Bool() - - initApp = app.Command("init", "Initialize .fs3cfg file in home directory") + version = "master" ) -// parseS3Uri parses a s3 uri into it's bucket and prefix -func parseS3Uri(s3Uri string) (bucket string, prefix string) { - s3UriParts := strings.Split(s3Uri, "/") - prefix = strings.Join(s3UriParts[3:], "/") - bucket = s3UriParts[2] - return -} - -// GetBucket builds a s3 connection retrieving the bucket -func GetBucket(bucket string) *s3.Bucket { - auth, err := awswrapper.GetAwsAuth() - if err != nil { - log.Fatalln(err) - } - b := s3.New(auth, aws.USEast).Bucket(bucket) - loc, err := b.Location() - if err != nil { - log.Fatalln(err) - - } - if aws.GetRegion(loc) != aws.USEast { - b = s3.New(auth, aws.GetRegion(loc)).Bucket(bucket) - } - return b -} - -// Ls lists directorys or keys under a prefix -func Ls(s3Uri string, searchDepth int, isRecursive, isHumanReadable, includeDate bool) { - bucket, prefix := parseS3Uri(s3Uri) - b := GetBucket(bucket) - - var ch <-chan s3.Key - if isRecursive { - ch = s3wrapper.ListRecurse(b, prefix, searchDepth) - } else { - ch = s3wrapper.ListWithCommonPrefixes(b, prefix) - } - - for k := range ch { - if k.Size < 0 { - fmt.Printf("%10s s3://%s/%s\n", "DIR", bucket, k.Key) - } else { - var size string - if isHumanReadable { - size = fmt.Sprintf("%10s", humanize.Bytes(uint64(k.Size))) - } else { - size = fmt.Sprintf("%10d", k.Size) - } - date := "" - if includeDate { - date = " " + k.LastModified - } - fmt.Printf("%s%s s3://%s/%s\n", size, date, bucket, k.Key) 
- } - } - -} - -// Del deletes a set of prefixes(s3 keys or partial keys -func Del(prefixes []string, searchDepth int, isRecursive bool) { - if len(*delPrefixes) == 0 { - fmt.Printf("No prefixes provided\n Usage: fasts3 del ") - return - } - keys := make(chan string, len(prefixes)*2+1) - var b *s3.Bucket = nil - go func() { - for _, delPrefix := range prefixes { - bucket, prefix := parseS3Uri(delPrefix) - - if b == nil { - b = GetBucket(bucket) - } - - keys <- prefix - if *delRecurse { - keyExists, err := b.Exists(prefix) - if err != nil { - log.Fatalln(err) - } - - if keyExists { - keys <- prefix - } else if *delRecurse { - for key := range s3wrapper.ListRecurse(b, prefix, searchDepth) { - keys <- key.Key - } - - } else { - fmt.Printf("trying to delete a prefix, please add --recursive or -r to proceed\n") - } - } - } - close(keys) - }() - - var wg sync.WaitGroup - msgs := make(chan string, 1000) - for i := 1; i <= 10; i++ { - wg.Add(1) - go func() { - batch := make([]string, 0, 100) - for key := range keys { - batch = append(batch, key) - if len(batch) >= 100 { - err := s3wrapper.DeleteMulti(b, batch) - if err != nil { - log.Fatalln(err) - } - for _, k := range batch { - msgs <- fmt.Sprintf("File %s Deleted\n", k) - } - batch = batch[:0] - } - } - - if len(batch) > 0 { - err := s3wrapper.DeleteMulti(b, batch) - if err != nil { - log.Fatalln(err) - } - for _, k := range batch { - msgs <- fmt.Sprintf("File %s Deleted\n", k) - } - } - wg.Done() - }() - } - go func() { - wg.Wait() - close(msgs) - }() - for msg := range msgs { - fmt.Print(msg) - } -} - -// initializes configs necessary for fasts3 utility -func Init() error { - usr, err := user.Current() - if err != nil { - return err - } - - fs3cfg_path := path.Join(usr.HomeDir, ".fs3cfg") - if _, err := os.Stat(fs3cfg_path); os.IsNotExist(err) { - cfg := `[default] -access_key= -secret_key=` - ioutil.WriteFile(fs3cfg_path, []byte(cfg), 0644) - fmt.Printf("created template file %s\n", fs3cfg_path) - } else { - fmt.Print(".fs3cfg already exists in home directory") - } - - return nil -} - -type GetRequest struct { - Key string - OriginalPrefix string -} - -// Get lists and retrieves s3 keys given a list of prefixes -// searchDepth can also be specified to increase speed of listing -func Get(prefixes []string, searchDepth int) { - if len(prefixes) == 0 { - fmt.Printf("No prefixes provided\n Usage: fasts3 get ") - return - } - getRequests := make(chan GetRequest, len(prefixes)*2+1) - var b *s3.Bucket = nil - go func() { - for _, prefix := range prefixes { - bucket, prefix := parseS3Uri(prefix) - - if b == nil { - b = GetBucket(bucket) - } - - keyExists, err := b.Exists(prefix) - if err != nil { - log.Fatalln(err) - } - - if keyExists { - keyParts := strings.Split(prefix, "/") - ogPrefix := strings.Join(keyParts[0:len(keyParts)-1], "/") + "/" - getRequests <- GetRequest{Key: prefix, OriginalPrefix: ogPrefix} - } else { - for key := range s3wrapper.ListRecurse(b, prefix, searchDepth) { - getRequests <- GetRequest{Key: key.Key, OriginalPrefix: prefix} - } - - } - } - close(getRequests) - }() - - var wg sync.WaitGroup - msgs := make(chan string, 1000) - workingDirectory, err := os.Getwd() - if err != nil { - log.Fatalln(err) - } - for i := 1; i <= 10; i++ { - wg.Add(1) - go func() { - for rq := range getRequests { - dest := path.Join(workingDirectory, strings.Replace(rq.Key, rq.OriginalPrefix, "", 1)) - msgs <- fmt.Sprintf("Getting %s -> %s\n", rq.Key, dest) - err := s3wrapper.GetToFile(b, rq.Key, dest) - if err != nil { - log.Fatalln(err) - } - } - 
wg.Done() - }() - } - go func() { - wg.Wait() - close(msgs) - }() - for msg := range msgs { - fmt.Print(msg) - } -} - -// getReaderByExt is a factory for reader based on the extension of the key -func getReaderByExt(bts []byte, key string) (*bufio.Reader, error) { - ext := path.Ext(key) - reader := bytes.NewReader(bts) - if ext == ".gz" { - gzReader, err := gzip.NewReader(reader) - if err != nil { - return nil, err - } - return bufio.NewReader(gzReader), nil - } else { - return bufio.NewReader(reader), nil - } -} - -// Stream takes a set of prefixes lists them and -// streams the contents by line -func Stream(prefixes []string, searchDepth int, keyRegex string, includeKeyName bool) { - if len(prefixes) == 0 { - fmt.Printf("No prefixes provided\n Usage: fasts3 get ") - return - } - keys := make(chan string, len(prefixes)*2+1) - var keyRegexFilter *regexp.Regexp - if keyRegex != "" { - keyRegexFilter = regexp.MustCompile(keyRegex) - } else { - keyRegexFilter = nil - } - var b *s3.Bucket = nil - go func() { - for _, prefix := range prefixes { - bucket, prefix := parseS3Uri(prefix) - - if b == nil { - b = GetBucket(bucket) - } - - keyExists, err := b.Exists(prefix) - if err != nil { - log.Fatalln(err) - } - - if keyExists { - if keyRegexFilter != nil && !keyRegexFilter.MatchString(prefix) { - continue - } - keys <- prefix - } else { - for key := range s3wrapper.ListRecurse(b, prefix, searchDepth) { - if keyRegexFilter != nil && !keyRegexFilter.MatchString(key.Key) { - continue - } - keys <- key.Key - } - - } - } - close(keys) - }() - - var wg sync.WaitGroup - msgs := make(chan string, 1000) - for i := 1; i <= 10; i++ { - wg.Add(1) - go func() { - for key := range keys { - bts, err := s3wrapper.Get(b, key) - reader, err := getReaderByExt(bts, key) - if err != nil { - panic(err) - } - for { - line, _, err := reader.ReadLine() - if err != nil { - if err.Error() == "EOF" { - break - } else { - log.Fatalln(err) - } - } - msg := fmt.Sprintf("%s\n", string(line)) - if includeKeyName { - msg = fmt.Sprintf("[%s] %s", key, msg) - } - msgs <- msg - } - } - wg.Done() - }() - } - go func() { - wg.Wait() - close(msgs) - }() - for msg := range msgs { - fmt.Print(msg) - } -} - func main() { - runtime.GOMAXPROCS(runtime.NumCPU()) - switch kingpin.MustParse(app.Parse(os.Args[1:])) { - case "ls": - Ls(*lsS3Uri, *lsSearchDepth, *lsRecurse, *humanReadable, *withDate) - case "del": - Del(*delPrefixes, *lsSearchDepth, *delRecurse) - case "get": - Get(*getS3Uris, *getSearchDepth) - case "stream": - Stream(*streamS3Uris, *streamSearchDepth, *streamKeyRegex, *streamIncludeKeyName) - case "init": - Init() - } + log.SetFlags(log.Lshortfile) + cmd.Version = version + cmd.Execute() } diff --git a/s3wrapper/s3.go b/s3wrapper/s3.go index d1d9e51..770c969 100644 --- a/s3wrapper/s3.go +++ b/s3wrapper/s3.go @@ -1,274 +1,489 @@ package s3wrapper import ( - "io/ioutil" + "bufio" + "compress/gzip" + "context" + "fmt" + "io" "log" + "net/url" "os" + "path" + "regexp" "strings" "sync" "time" - "github.com/AdRoll/goamz/s3" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" ) -const defaultDelimiter string = "/" +// ListOutput represents the pruned and +// normalized result of a list call to S3, +// this is meant to cut down on memory and +// overhead being used in the channels +type ListOutput struct { + IsPrefix bool + Size int64 + Key string + LastModified time.Time + Bucket string + FullKey string +} -const 
numListRoutines int = 30
+// S3Wrapper is a wrapper for the S3
+// library which aims to make some of
+// its functions faster
+type S3Wrapper struct {
+	concurrencySemaphore chan struct{}
+	svc                  *s3.S3
+}

-const maxRetries int = 5
+// parseS3Uri parses a s3 uri into its bucket and prefix
+func parseS3Uri(s3Uri string) (bucket string, prefix string) {
+	s3UriParts := strings.Split(s3Uri, "/")
+	prefix = strings.Join(s3UriParts[3:], "/")
+	bucket = s3UriParts[2]
+	return bucket, prefix
+}

-type listWork struct {
-	prefix    string
-	delimiter string
+// FormatS3Uri takes a bucket and a prefix and turns it into
+// a S3 URI
+func FormatS3Uri(bucket string, key string) string {
+	return fmt.Sprintf("s3://%s", path.Join(bucket, key))
 }

-// StripS3Path gets the key part from an s3 uri. The key part is everything after
-// the header (s3://)
-func StripS3Path(key string) string {
-	key = strings.TrimRight(key, "\n")
-	if strings.Index(key, "s3://") < 0 {
-		return key // IF s3:// is not located, assume the file is already stripped
+// New creates a new S3Wrapper
+func New(svc *s3.S3, maxParallel int) *S3Wrapper {
+	return &S3Wrapper{
+		svc:                  svc,
+		concurrencySemaphore: make(chan struct{}, maxParallel),
 	}
+}

-	// Take the first element after splitting from tab (Should be 's3://.../.../.../...')
-	metaFilePath := strings.Split(key, "\t")[0]
-	// Clear everything off the line before 's3://' to allow taking s3cmd ls output as input
-	s3Path := metaFilePath[strings.Index(metaFilePath, "s3://"):]
-	// The bucket is the second element if you split the string 's3://<bucket>/<key>' by '/'
-	bucket := strings.Split(s3Path, "/")[2]
-	header := "s3://" + bucket
-	return metaFilePath[strings.Index(metaFilePath, header)+len(header):]
+func (w *S3Wrapper) WithRegionFrom(uri string) (*S3Wrapper, error) {
+	bucket, _ := parseS3Uri(uri)
+	region, err := s3manager.GetBucketRegionWithClient(context.Background(), w.svc, bucket)
+	if err != nil {
+		log.Printf("WARN: unable to autodetect region, falling back to default. Cause: '%s'\n", err)
+		return w, nil
+	}
+	sess, err := session.NewSessionWithOptions(session.Options{SharedConfigState: session.SharedConfigEnable})
+	if err != nil {
+		return nil, err
+	}
+	w.svc = s3.New(sess, aws.NewConfig().WithRegion(region))
+	return w, nil
+}

-// getListWork generates a list of prefixes based on prefix by searching down the searchDepth
-// using DELIMITER as a delimiter
-func getListWork(bucket *s3.Bucket, prefix string, searchDepth int) []listWork {
-	currentPrefixes := []listWork{listWork{prefix, ""}}
-	results := []listWork{}
-	for i := 0; i < searchDepth; i++ {
-		newPrefixes := []listWork{}
-		for _, pfx := range currentPrefixes {
-			for res := range List(bucket, pfx.prefix, defaultDelimiter) {
-				if len(res.CommonPrefixes) != 0 {
-					for _, newPfx := range res.CommonPrefixes {
-						newPrefixes = append(newPrefixes, listWork{newPfx, ""})
-					}
-					// catches the case where keys and common prefixes live in the same place
-					if len(res.Contents) > 0 {
-						results = append(results, listWork{pfx.prefix, "/"})
-					}
-				} else {
-					results = append(results, listWork{pfx.prefix, ""})
-				}
+// WithMaxConcurrency sets the maximum concurrency for the S3 operations
+func (w *S3Wrapper) WithMaxConcurrency(maxConcurrency int) *S3Wrapper {
+	w.concurrencySemaphore = make(chan struct{}, maxConcurrency)
+	return w
+}
+
+// ListAll is a convenience function for listing and collating all the results for multiple S3 URIs
+func (w *S3Wrapper) ListAll(s3Uris []string, recursive bool, delimiter string, keyRegex string) chan *ListOutput {
+	ch := make(chan *ListOutput, 10000)
+	var wg sync.WaitGroup
+	for _, s3Uri := range s3Uris {
+		wg.Add(1)
+		go func(s3Uri string) {
+			defer wg.Done()
+			for itm := range w.List(s3Uri, recursive, delimiter, keyRegex) {
+				ch <- itm
 			}
-		}
-		currentPrefixes = newPrefixes
-	}
-	for _, pfx := range currentPrefixes {
-		results = append(results, pfx)
+		}(s3Uri)
 	}
-	return results
+	go func() {
+		wg.Wait()
+		close(ch)
+	}()
+
+	return ch
 }

-// List function with retry and support for listing all keys in a prefix
-func List(bucket *s3.Bucket, prefix string, delimiter string) <-chan *s3.ListResp {
-	ch := make(chan *s3.ListResp, 100)
-	go func(pfix string, del string) {
+// List is a wrapping function to parallelize listings and normalize the results from the API
+func (w *S3Wrapper) List(s3Uri string, recursive bool, delimiter string, keyRegex string) chan *ListOutput {
+	bucket, prefix := parseS3Uri(s3Uri)
+	if recursive {
+		delimiter = ""
+	}
+	var keyRegexFilter *regexp.Regexp
+	if keyRegex != "" {
+		keyRegexFilter = regexp.MustCompile(keyRegex)
+	}
+
+	params := &s3.ListObjectsV2Input{
+		Bucket:       aws.String(bucket), // Required
+		Delimiter:    aws.String(delimiter),
+		EncodingType: aws.String(s3.EncodingTypeUrl),
+		FetchOwner:   aws.Bool(false),
+		MaxKeys:      aws.Int64(1000),
+		Prefix:       aws.String(prefix),
+	}
+
+	ch := make(chan *ListOutput, 10000)
+	go func() {
 		defer close(ch)
-		isTruncated := true
-		nextMarker := ""
-		for isTruncated {
-			attempts := 0
-			for {
-				attempts++
-				res, err := bucket.List(pfix, del, nextMarker, 1000)
-				if err != nil {
-					if err.Error() == "runtime error: index out of range" {
-						break
if attempts >= maxRetries { - log.Panic(err) + formattedKey := FormatS3Uri(bucket, escapedPrefix) + ch <- &ListOutput{ + IsPrefix: true, + Key: escapedPrefix, + FullKey: formattedKey, + LastModified: time.Time{}, + Size: 0, + Bucket: bucket, } + } + } - time.Sleep(time.Second * 3) - } else { - ch <- res - if len(res.Contents) > 0 { - nextMarker = res.Contents[len(res.Contents)-1].Key - } else if len(res.CommonPrefixes) > 0 { - nextMarker = res.CommonPrefixes[len(res.CommonPrefixes)-1] - } - isTruncated = res.IsTruncated - break + for _, key := range page.Contents { + escapedKey, err := url.QueryUnescape(*key.Key) + if err != nil { + escapedKey = *key.Key + } + formattedKey := FormatS3Uri(bucket, escapedKey) + if keyRegexFilter != nil && !keyRegexFilter.MatchString(formattedKey) { + continue + } + ch <- &ListOutput{ + IsPrefix: false, + Key: escapedKey, + FullKey: formattedKey, + LastModified: *key.LastModified, + Size: *key.Size, + Bucket: bucket, } } + return true + }) + if err != nil { + panic(err) } - }(prefix, delimiter) + }() + return ch } -func Put(bucket *s3.Bucket, key string, contents []byte, contentType string, permissions s3.ACL, options s3.Options) error { - attempts := 0 - for { - attempts++ - err := bucket.Put(key, contents, contentType, permissions, options) - if err == nil { - return nil - } - if attempts >= maxRetries && err != nil { - return err - } - - time.Sleep(time.Second * 3) +// GetReader retrieves an appropriate reader for the given bucket and key +func (w *S3Wrapper) GetReader(bucket string, key string) (io.ReadCloser, error) { + params := &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), } + resp, err := w.svc.GetObject(params) + if err != nil { + return nil, err + } + return resp.Body, nil } -func Get(bucket *s3.Bucket, key string) ([]byte, error) { - attempts := 0 - for { - attempts++ - buff, err := bucket.Get(key) - if err == nil { - return buff, nil - } - if attempts >= maxRetries && err != nil { - return nil, err - } - } +// Stream provides a channel with data from the keys +func (w *S3Wrapper) Stream(keys chan *ListOutput, includeKeyName bool, raw bool) chan string { + lines := make(chan string, 10000) + var wg sync.WaitGroup + go func() { + for key := range keys { + wg.Add(1) + go func(key *ListOutput) { + defer wg.Done() + w.concurrencySemaphore <- struct{}{} + defer func() { <-w.concurrencySemaphore }() -} + reader, err := w.GetReader(key.Bucket, key.Key) + if err != nil { + panic(err) + } + defer reader.Close() + if !raw { + extReader, err := getReaderByExt(reader, key.Key) + if err != nil { + panic(err) + } + bufExtReader := bufio.NewReader(extReader) -// createPathIfNotExists takes a path and creates -// it if it doesn't exist -func createPathIfNotExists(path string) error { - _, err := os.Stat(path) - if os.IsNotExist(err) { - err := os.MkdirAll(path, 0755) - if err != nil { - return err - } - } else { - return nil - } - return nil -} + for { + line, err := bufExtReader.ReadBytes('\n') -// GetToFile takes a bucket and key and puts the bytes to a file -// in specified by dest -func GetToFile(bucket *s3.Bucket, key string, dest string) error { - destParts := strings.Split(dest, "/") - path := strings.Join(destParts[0:len(destParts)-1], "/") + if err != nil && err.Error() != "EOF" { + log.Fatalln(err) + } - if err := createPathIfNotExists(path); err != nil { - return err - } + if includeKeyName { + lines <- fmt.Sprintf("[%s] %s", key.FullKey, string(line)) + } else { + lines <- string(line) + } + if err != nil { + break + } + } + 
} else {
+					buf := make([]byte, 64)
+					for {
+						numBytes, err := reader.Read(buf)
+						if err != nil && err.Error() != "EOF" {
+							log.Fatalln(err)
+						}

-	bts, err := Get(bucket, key)
-	if err != nil {
-		return err
-	}
-	err = ioutil.WriteFile(dest, bts, 0644)
-	if err != nil {
-		return err
-	}
-	return nil
-}
+						if includeKeyName {
+							lines <- fmt.Sprintf("[%s] %s", key.FullKey, string(buf[0:numBytes]))
+						} else {
+							lines <- string(buf[0:numBytes])
+						}

-func toDeleteStruct(keys []string) s3.Delete {
-	objs := make([]s3.Object, 0)
-	for _, key := range keys {
-		if key != "" {
-			objs = append(objs, s3.Object{Key: strings.TrimSpace(key)})
+						if err != nil {
+							break
+						}
+					}
+				}
+			}(key)
 		}
-	}
-	return s3.Delete{false, objs}
+		go func() {
+			wg.Wait()
+			close(lines)
+		}()
+	}()
+
+	return lines
 }

-// DeleteMulti deletes multiple keys
-func DeleteMulti(bucket *s3.Bucket, keys []string) error {
-	attempts := 0
-	for {
-		attempts++
-		err := bucket.DelMulti(toDeleteStruct(keys))
-		if err != nil {
-			if attempts >= maxRetries {
-				return err
-			}
+// GetAll retrieves all keys to the local filesystem; it repurposes ListOutput as its
+// output, which contains the local paths to the keys
+func (w *S3Wrapper) GetAll(keys chan *ListOutput, skipExisting bool) chan *ListOutput {
+	listOut := make(chan *ListOutput, 10000)
+	var wg sync.WaitGroup
+	for key := range keys {
+		if _, err := os.Stat(key.Key); !skipExisting || os.IsNotExist(err) {
+			wg.Add(1)
+			go func(k *ListOutput) {
+				defer wg.Done()
+				w.concurrencySemaphore <- struct{}{}
+				defer func() { <-w.concurrencySemaphore }()

-			time.Sleep(time.Second * 3)
-		} else {
-			break
+				if !k.IsPrefix {
+					// TODO: this assumes '/' as a delimiter
+					parts := strings.Split(k.Key, "/")
+					dir := strings.Join(parts[0:len(parts)-1], "/")
+					createPathIfNotExists(dir)
+					reader, err := w.GetReader(k.Bucket, k.Key)
+					if err != nil {
+						panic(err)
+					}
+					defer reader.Close()
+					outFile, err := os.Create(k.Key)
+					if err != nil {
+						panic(err)
+					}
+					defer outFile.Close()
+					_, err = io.Copy(outFile, reader)
+					if err != nil {
+						panic(err)
+					}
+					listOut <- k
+				}
+			}(key)
 		}
 	}
-	return nil
-}

 	go func() {
+		wg.Wait()
+		close(listOut)
 	}()
-	return ch
+
+	return listOut
 }

-// lists a prefix and includes common prefixes
-func ListWithCommonPrefixes(bucket *s3.Bucket, prefix string) <-chan s3.Key {
-	ch := make(chan s3.Key, 1000)
-	go func() {
-		defer close(ch)
-		for listResp := range List(bucket, prefix, defaultDelimiter) {
-			for _, prefix := range listResp.CommonPrefixes {
-				ch <- s3.Key{prefix, "", -1, "", "", s3.Owner{}}
-			}
-			for _, key := range listResp.Contents {
-				ch <- key
-			}
-		}
-	}()
-	return ch
-}
+
+// CopyAll copies keys to the dest; source defines what the base prefix is
+func (w *S3Wrapper) CopyAll(keys chan *ListOutput, source, dest string, delimiter string, recurse, flat bool) chan *ListOutput {
+	_, sourcePrefix := parseS3Uri(source)
+	destBucket, destPrefix := parseS3Uri(dest)
+
+	listOut := make(chan *ListOutput, 1e4)
+	var wg sync.WaitGroup
+	for key := range keys {
+		wg.Add(1)
+		go func(k *ListOutput) {
+			defer wg.Done()
+			w.concurrencySemaphore <- struct{}{}
+			defer func() { <-w.concurrencySemaphore }()
+
+			if !k.IsPrefix {
+				keyBucket, keyPrefix := parseS3Uri(k.FullKey)
+				sourcePath := "/" + path.Join(keyBucket, keyPrefix)
+
+				// trim common path prefixes from k.Key and sourcePrefix
+				trimDest := strings.Split(k.Key, delimiter)
+				if flat {
+					trimDest = trimDest[len(trimDest)-1:]
+				} else if recurse {
+					trimSource :=
strings.Split(sourcePrefix, delimiter) + for len(trimDest) > 1 && len(trimSource) > 1 { + if trimDest[0] != trimSource[0] { + break + } + trimDest = trimDest[1:] + trimSource = trimSource[1:] + } + } + fullDest := destPrefix + strings.Join(trimDest, delimiter) + + _, err := w.svc.CopyObject(&s3.CopyObjectInput{ + Bucket: &destBucket, + CopySource: &sourcePath, + Key: &fullDest, + }) + if err != nil { + fmt.Println("error:", err) + } else { + k.Key = fullDest + listOut <- k + } + } + }(key) } - return y + + go func() { + wg.Wait() + close(listOut) + }() + + return listOut } -func intMax(x, y int) int { - if x < y { - return y +// ListBuckets returns a list of bucket names and does a prefix +// filter based on s3Uri (of the form s3://) +func (w *S3Wrapper) ListBuckets(s3Uri string) ([]string, error) { + + bucketPrefix, _ := parseS3Uri(s3Uri) + results, err := w.svc.ListBuckets(&s3.ListBucketsInput{}) + if err != nil { + return nil, err } - return x -} -// partition partitions list into list of lists where len(lists) <= partitions -func partition(list []listWork, partitionSize int) [][]listWork { - partitions := [][]listWork{} - step := intMax(len(list)/partitionSize, 1) - for i := 0; i < len(list); i += step { - outerBound := intMin(len(list), i+step) - partitions = append(partitions, list[i:outerBound]) + buckets := make([]string, 0, len(results.Buckets)) + for _, bucket := range results.Buckets { + if *bucket.Name != "" && !strings.HasPrefix(*bucket.Name, bucketPrefix) { + continue + } + buckets = append(buckets, *bucket.Name) } - return partitions + return buckets, nil } -// listRecurse lists prefix in parallel using searchDepth to search for routine's work -func ListRecurse(bucket *s3.Bucket, prefix string, searchDepth int) <-chan s3.Key { - ch := make(chan s3.Key, 2000) +const maxKeysPerDeleteObjectsRequest = 1000 + +// DeleteObjects deletes all keys in the given keys channel +func (w *S3Wrapper) DeleteObjects(keys chan *ListOutput) chan *ListOutput { + listOut := make(chan *ListOutput, 1e4) var wg sync.WaitGroup - workPartitions := partition(getListWork(bucket, prefix, searchDepth), numListRoutines) - for _, partition := range workPartitions { + for i := 0; i < cap(w.concurrencySemaphore); i++ { wg.Add(1) - go func(work []listWork) { + go func() { + w.concurrencySemaphore <- struct{}{} + defer func() { <-w.concurrencySemaphore }() defer wg.Done() - for _, workItem := range work { - for res := range List(bucket, workItem.prefix, workItem.delimiter) { - for _, c := range res.Contents { - ch <- c + objects := make([]*s3.ObjectIdentifier, 0, maxKeysPerDeleteObjectsRequest) + listOutCache := make([]*ListOutput, 0, maxKeysPerDeleteObjectsRequest) + params := &s3.DeleteObjectsInput{ + Bucket: aws.String(""), + Delete: &s3.Delete{}, + } + for item := range keys { + if item.IsPrefix { + continue + } + + if *params.Bucket == "" { + params.Bucket = aws.String(item.Bucket) + } + // only maxKeysPerDeleteObjectsRequest objects can fit in + // one DeleteObjects request also if the bucket changes we cannot + // put it in the same request so we flush and start a new one + if len(objects) >= maxKeysPerDeleteObjectsRequest || *params.Bucket != item.Bucket { + // flush + params.Delete = &s3.Delete{ + Objects: objects, + } + _, err := w.svc.DeleteObjects(params) + if err != nil { + panic(err) } + + // write the keys deleted to the results channel + for _, cacheItem := range listOutCache { + listOut <- cacheItem + } + + // reset + listOutCache = make([]*ListOutput, 0, maxKeysPerDeleteObjectsRequest) + 
params.Bucket = aws.String(item.Bucket) + objects = make([]*s3.ObjectIdentifier, 0, maxKeysPerDeleteObjectsRequest) + } + objects = append(objects, &s3.ObjectIdentifier{ + Key: aws.String(item.Key), + }) + listOutCache = append(listOutCache, item) + } + if len(objects) > 0 { + // flush again for any remaining keys + params.Delete = &s3.Delete{ + Objects: objects, + } + _, err := w.svc.DeleteObjects(params) + if err != nil { + panic(err) + } + + for _, cacheItem := range listOutCache { + listOut <- cacheItem } } - }(partition) + }() } go func() { wg.Wait() - close(ch) + close(listOut) }() - return ch + + return listOut +} + +// getReaderByExt is a factory for reader based on the extension of the key +func getReaderByExt(reader io.ReadCloser, key string) (io.ReadCloser, error) { + ext := path.Ext(key) + if ext == ".gz" || ext == ".gzip" { + gzReader, err := gzip.NewReader(reader) + if err != nil { + return reader, nil + } + return gzReader, nil + } + + return reader, nil +} + +// createPathIfNotExists takes a path and creates +// it if it doesn't exist +func createPathIfNotExists(path string) error { + if _, err := os.Stat(path); !os.IsNotExist(err) { + return nil + } + return os.MkdirAll(path, 0755) }
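Taken together, the wrapper introduced by this diff can also be used as a small library: build a client, pin it to the bucket's region, and drain the listing channel. A minimal end-to-end sketch; `s3://mybuck/logs/` is a placeholder URI:

```go
package main

import (
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/metaverse/fasts3/s3wrapper"
)

func main() {
	// Shared-config session, as cmd.GetS3Client does.
	sess := session.Must(session.NewSessionWithOptions(session.Options{
		SharedConfigState: session.SharedConfigEnable,
	}))

	// 10 matches the default --max-parallel; WithRegionFrom retargets the
	// client at the region owning the bucket (falling back with a warning).
	wrap, err := s3wrapper.New(s3.New(sess), 10).WithRegionFrom("s3://mybuck/logs/")
	if err != nil {
		log.Fatal(err)
	}

	// Recursive listing with the default "/" delimiter and no key filter.
	for itm := range wrap.ListAll([]string{"s3://mybuck/logs/"}, true, "/", "") {
		fmt.Println(itm.Size, itm.FullKey)
	}
}
```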