diff --git a/README.md b/README.md index 5605a06..b2482be 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,47 @@ # go-diskqueue -[![Build Status](https://secure.travis-ci.org/nsqio/go-diskqueue.png?branch=master)](http://travis-ci.org/nsqio/go-diskqueue) [![GoDoc](https://godoc.org/github.com/nsqio/go-diskqueue?status.svg)](https://godoc.org/github.com/nsqio/go-diskqueue) [![GitHub release](https://img.shields.io/github/release/nsqio/go-diskqueue.svg)](https://github.com/nsqio/go-diskqueue/releases/latest) +This is a fork of https://github.com/nsqio/go-diskqueue with additional support for a total disk space limit. + +[![Build Status](https://travis-ci.com/kev1n80/go-diskqueue.svg?branch=master)](https://travis-ci.com/github/kev1n80/go-diskqueue) [![Go Reference](https://pkg.go.dev/badge/github.com/kev1n80/go-diskqueue.svg)](https://pkg.go.dev/github.com/kev1n80/go-diskqueue) [![GitHub release](https://img.shields.io/github/release/kev1n80/go-diskqueue.svg)](https://github.com/kev1n80/go-diskqueue/releases/latest) A Go package providing a filesystem-backed FIFO queue Pulled out of https://github.com/nsqio/nsq + +# Description +Diskqueue is a synchronized "filesystem-backed FIFO queue", meaning it stores the data you pass in by writing it to files. + +Diskqueue writes messages and their message length to files in the order: message length in binary and then message. This allows Diskqueue to know how much of the file to read in order to get the next message. Once Diskqueue reads a file completely (when the number of bytes read surpasses the size of the file), it deletes the file. + +In terms of threads, creating a Diskqueue object starts a “worker thread” (a goroutine) by calling the private function `ioLoop`, which is a continuous loop that accepts requests to read, write, empty, get depth, or exit. This worker thread DOES NOT create other worker threads to handle tasks asynchronously. It is important to note that Diskqueue will sync if needed (i.e.
set by the sync flag after the user retrieves read data) before handling a new request. Calling a public function can be seen as sending a request to the Diskqueue object’s “worker thread”, which is implemented using channels. + +# Disk Space Limit Feature +The original DiskQueue package did not contain a disk size limit feature; however, this forked repo does! By using a separate constructor `NewWithDiskSpace`, the user can use this disk space limit feature, which will delete the oldest files in order to create space for new, incoming data. + +In order to accurately adjust `depth` when a file is deleted, DiskQueue will store the number of messages in a file by writing this number to the end of the file. That way we can access this number and decrement `depth` accordingly. + +Note: The disk size limit must be greater than `maxBytesPerFile` plus 56 bytes; the 56 bytes are reserved for the metadata file. + +# Public Functions + +## Put([]byte) error +Adds data to the queue; if a failure occurs, none of the data will be written. + +## ReadChan() <-chan []byte +This is expected to be an *unbuffered* channel that will not close until `Close()` or `Delete()` is called. + +## Close() error +Cleans up the queue and persists the current state to metadata. + +## Delete() error +Cleans up the queue, but does not save the current state to metadata. + +## Depth() int64 +Returns the number of messages in the queue; however, this number can become inaccurate if a file becomes corrupted or inaccessible. +Although there are times when this number can be inaccurate, this number will always be 0 when there is nothing in the queue due to the `checkTailCorruption(depth int64)` private function. + +## Empty() error +Empties out the queue by deleting all of the files containing data. + +## TotalBytesFolderSize() int64 +Returns the total number of bytes taken up by the files in the targeted folder.
diff --git a/diskqueue.go b/diskqueue.go index 26b3438..d92c24e 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -7,9 +7,11 @@ import ( "errors" "fmt" "io" + "io/ioutil" "math/rand" "os" "path" + "regexp" "sync" "time" ) @@ -19,11 +21,13 @@ import ( type LogLevel int const ( - DEBUG = LogLevel(1) - INFO = LogLevel(2) - WARN = LogLevel(3) - ERROR = LogLevel(4) - FATAL = LogLevel(5) + DEBUG = LogLevel(1) + INFO = LogLevel(2) + WARN = LogLevel(3) + ERROR = LogLevel(4) + FATAL = LogLevel(5) + numFileMsgBytes = 8 + maxMetaDataFileSize = 56 ) type AppLogFunc func(lvl LogLevel, f string, args ...interface{}) @@ -47,10 +51,12 @@ func (l LogLevel) String() string { type Interface interface { Put([]byte) error ReadChan() <-chan []byte // this is expected to be an *unbuffered* channel + PeekChan() <-chan []byte // this is expected to be an *unbuffered* channel Close() error Delete() error Depth() int64 Empty() error + TotalBytesFolderSize() int64 } // diskQueue implements a filesystem backed FIFO queue @@ -58,17 +64,21 @@ type diskQueue struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms // run-time state (also persisted to disk) - readPos int64 - writePos int64 - readFileNum int64 - writeFileNum int64 - depth int64 + readPos int64 + writePos int64 + readFileNum int64 + writeFileNum int64 + readMessages int64 // Number of read messages. It's used to update depth. + writeMessages int64 // Number of write messages. It's used to update depth. 
+ totalDiskSpaceUsed int64 + depth int64 sync.RWMutex // instantiation time metadata name string dataPath string + maxBytesDiskSpace int64 maxBytesPerFile int64 // cannot change once created maxBytesPerFileRead int64 minMsgSize int32 @@ -91,6 +101,9 @@ type diskQueue struct { // exposed via ReadChan() readChan chan []byte + // exposed via PeekChan() + peekChan chan []byte + // internal channels depthChan chan int64 writeChan chan []byte @@ -101,6 +114,11 @@ type diskQueue struct { exitSyncChan chan int logf AppLogFunc + + // disk limit implementation flag + enableDiskLimitation bool + + badFileNameRegexp, fileNameRegexp *regexp.Regexp } // New instantiates an instance of diskQueue, retrieving metadata @@ -108,23 +126,63 @@ type diskQueue struct { func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { + + return NewWithDiskSpace(name, dataPath, + 0, maxBytesPerFile, + minMsgSize, maxMsgSize, + syncEvery, syncTimeout, logf) +} + +// Another constructor that allows users to use Disk Space Limit feature +// If user is not using Disk Space Limit feature, maxBytesDiskSpace will +// be 0 +func NewWithDiskSpace(name string, dataPath string, + maxBytesDiskSpace int64, maxBytesPerFile int64, + minMsgSize int32, maxMsgSize int32, + syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { + enableDiskLimitation := true + if maxBytesDiskSpace <= 0 { + enableDiskLimitation = false + } d := diskQueue{ - name: name, - dataPath: dataPath, - maxBytesPerFile: maxBytesPerFile, - minMsgSize: minMsgSize, - maxMsgSize: maxMsgSize, - readChan: make(chan []byte), - depthChan: make(chan int64), - writeChan: make(chan []byte), - writeResponseChan: make(chan error), - emptyChan: make(chan int), - emptyResponseChan: make(chan error), - exitChan: make(chan int), - exitSyncChan: make(chan int), - syncEvery: syncEvery, - syncTimeout: syncTimeout, - logf: logf, + 
name: name, + dataPath: dataPath, + maxBytesDiskSpace: maxBytesDiskSpace, + maxBytesPerFile: maxBytesPerFile, + minMsgSize: minMsgSize, + maxMsgSize: maxMsgSize, + readChan: make(chan []byte), + peekChan: make(chan []byte), + depthChan: make(chan int64), + writeChan: make(chan []byte), + writeResponseChan: make(chan error), + emptyChan: make(chan int), + emptyResponseChan: make(chan error), + exitChan: make(chan int), + exitSyncChan: make(chan int), + syncEvery: syncEvery, + syncTimeout: syncTimeout, + logf: logf, + enableDiskLimitation: enableDiskLimitation, + } + + err := d.start() + if err != nil { + return nil + } + + return &d +} + +// start validates the disk space limit, restores the last known state of DiskQueue from metadata, compiles the data-file name patterns (queue name quoted so it is matched literally) and starts ioLoop; it returns an error only when the limit is too small +func (d *diskQueue) start() error { + // ensure that DiskQueue has enough space to write the metadata file + at least one data file with max size + message size + if d.enableDiskLimitation && (d.maxBytesDiskSpace <= maxMetaDataFileSize+d.maxBytesPerFile) { + errorMsg := fmt.Sprintf( + "disk size limit too small(%d): not enough space for MetaData file (size=%d) and at least one data file with max size (maxBytesPerFile=%d).", + d.maxBytesDiskSpace, maxMetaDataFileSize, d.maxBytesPerFile) + d.logf(ERROR, "DISKQUEUE(%s) - %s", d.name, errorMsg) + return errors.New(errorMsg) } // no need to lock here, nothing else could possibly be touching this instance @@ -133,8 +191,18 @@ func New(name string, dataPath string, maxBytesPerFile int64, d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err) } + d.fileNameRegexp = regexp.MustCompile(`^` + regexp.QuoteMeta(d.name) + `\.diskqueue\.\d+\.dat$`) + d.badFileNameRegexp = regexp.MustCompile(`^` + regexp.QuoteMeta(d.name) + `\.diskqueue\.\d+\.dat\.bad$`) + + d.updateTotalDiskSpaceUsed() + go d.ioLoop() - return &d + + return nil +} + +func (d *diskQueue) PeekChan() <-chan []byte { + return d.peekChan } // Depth returns the depth of the queue @@ -147,6 +215,20 @@ func (d *diskQueue) Depth() int64 { return depth } +// Returns to total size of the contents
(files) in the directory located in the dataPath +func (d *diskQueue) TotalBytesFolderSize() int64 { + var totalFolderSizeBytes int64 + + getTotalFolderSizeBytes := func(fileInfo os.FileInfo) error { + totalFolderSizeBytes += fileInfo.Size() + return nil + } + + d.walkDiskQueueDir(getTotalFolderSizeBytes) + + return totalFolderSizeBytes +} + // ReadChan returns the receive-only []byte channel for reading data func (d *diskQueue) ReadChan() <-chan []byte { return d.readChan @@ -267,6 +349,12 @@ func (d *diskQueue) skipToNextRWFile() error { d.nextReadPos = 0 d.depth = 0 + if d.enableDiskLimitation { + d.totalDiskSpaceUsed = 0 + d.readMessages = 0 + d.writeMessages = 0 + } + return err } @@ -301,6 +389,10 @@ func (d *diskQueue) readOne() ([]byte, error) { stat, err := d.readFile.Stat() if err == nil { d.maxBytesPerFileRead = stat.Size() + if d.enableDiskLimitation { + // last 8 bytes are reserved for the number of messages in this file + d.maxBytesPerFileRead -= numFileMsgBytes + } } } @@ -353,6 +445,197 @@ func (d *diskQueue) readOne() ([]byte, error) { return readBuf, nil } +func (d *diskQueue) removeBadFile(oldestBadFileInfo os.FileInfo) error { + var err error + badFileFilePath := path.Join(d.dataPath, oldestBadFileInfo.Name()) + + // remove file if it exists + err = os.Remove(badFileFilePath) + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to remove .bad file(%s) - %s", d.name, oldestBadFileInfo.Name(), err) + d.updateTotalDiskSpaceUsed() + return err + } else { + // recaclulate total bad files disk size to get most accurate info + d.totalDiskSpaceUsed -= oldestBadFileInfo.Size() + d.logf(INFO, "DISKQUEUE(%s) removed .bad file(%s) of size(%d bytes) to free up disk space", d.name, oldestBadFileInfo.Name(), oldestBadFileInfo.Size()) + } + + return nil +} + +func (d *diskQueue) readNumOfMessages(fileName string) (int64, error) { + var err error + + if d.readFile == nil { + d.readFile, err = os.OpenFile(fileName, os.O_RDONLY, 0600) + if err != nil { + return 
0, err + } + } + + closeReadFile := func() { + d.readFile.Close() + d.readFile = nil + } + defer closeReadFile() + + // read total messages number at the end of the file + _, err = d.readFile.Seek(-numFileMsgBytes, 2) + if err != nil { + return 0, err + } + + var totalMessages int64 + err = binary.Read(d.readFile, binary.BigEndian, &totalMessages) + if err != nil { + return 0, err + } + + return totalMessages, nil +} + +func (d *diskQueue) removeReadFile() error { + if d.readFileNum == d.writeFileNum { + d.skipToNextRWFile() + return nil + } + + curFileName := d.fileName(d.readFileNum) + totalMessages, err := d.readNumOfMessages(curFileName) + if err != nil { + return err + } + + // update depth with the remaining number of messages + d.depth -= totalMessages - d.readMessages + + // we have not finished reading this file + if d.readFileNum == d.nextReadFileNum { + d.nextReadFileNum++ + d.nextReadPos = 0 + } + + d.moveToNextReadFile() + + return nil +} + +// walk through all of the files in the DiskQueue directory +func (d *diskQueue) walkDiskQueueDir(fn func(os.FileInfo) error) error { + fileInfos, err := ioutil.ReadDir(d.dataPath) + + if err != nil { + return err + } + + for _, fileInfo := range fileInfos { + // only go through files and skip directories + if !fileInfo.IsDir() { + err = fn(fileInfo) + if err != nil { + return err + } + } + } + + return nil +} + +func (d *diskQueue) getAllBadFileInfo() ([]os.FileInfo, error) { + var badFileInfos []os.FileInfo + + getAllBadFileInfo := func(fileInfo os.FileInfo) error { + // only accept "bad" files created by this DiskQueue object + if d.badFileNameRegexp.MatchString(fileInfo.Name()) { + badFileInfos = append(badFileInfos, fileInfo) + } + + return nil + } + + err := d.walkDiskQueueDir(getAllBadFileInfo) + + return badFileInfos, err +} + +// get the accurate total non-"bad" file size +func (d *diskQueue) updateTotalDiskSpaceUsed() { + d.totalDiskSpaceUsed = maxMetaDataFileSize + + updateTotalDiskSpaceUsed := 
func(fileInfo os.FileInfo) error { + // only accept files created by this DiskQueue object + if d.fileNameRegexp.MatchString(fileInfo.Name()) || d.badFileNameRegexp.MatchString(fileInfo.Name()) { + d.totalDiskSpaceUsed += fileInfo.Size() + } + + return nil + } + + err := d.walkDiskQueueDir(updateTotalDiskSpaceUsed) + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to update write bytes - %s", d.name, err) + } +} + +func (d *diskQueue) freeDiskSpace(expectedBytesIncrease int64) error { + var err error + var badFileInfos []os.FileInfo + + badFileInfos, err = d.getAllBadFileInfo() + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to retrieve all .bad file info - %s", d.name, err) + } + + // keep freeing up disk space until we have enough space to write this message + for _, badFileInfo := range badFileInfos { + if d.totalDiskSpaceUsed+expectedBytesIncrease <= d.maxBytesDiskSpace { + return nil + } + d.removeBadFile(badFileInfo) + } + for d.readFileNum <= d.writeFileNum { + if d.totalDiskSpaceUsed+expectedBytesIncrease <= d.maxBytesDiskSpace { + return nil + } + // delete the read file (make space) + readFileToDeleteNum := d.readFileNum + err = d.removeReadFile() + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to remove file(%s) - %s", d.name, d.fileName(readFileToDeleteNum), err) + d.handleReadError() + return err + } else { + d.logf(INFO, "DISKQUEUE(%s) removed file(%s) to free up disk space", d.name, d.fileName(readFileToDeleteNum)) + } + d.updateTotalDiskSpaceUsed() + } + + if d.totalDiskSpaceUsed+expectedBytesIncrease > d.maxBytesDiskSpace { + return fmt.Errorf("could not make space for totalDiskSpaceUsed = %d, expectedBytesIncrease = %d, with maxBytesDiskSpace = %d ", d.totalDiskSpaceUsed, expectedBytesIncrease, d.maxBytesDiskSpace) + } + + return nil +} + +// check if there is enough available disk space to write new data to file +func (d *diskQueue) checkDiskSpace(expectedBytesIncrease int64) error { + // If the data to be written is bigger 
than the disk size limit, do not write + if expectedBytesIncrease > d.maxBytesDiskSpace { + errorMsg := fmt.Sprintf( + "message size(%d) surpasses disk size limit(%d)", + expectedBytesIncrease, d.maxBytesDiskSpace) + d.logf(ERROR, "DISKQUEUE(%s) - %s", d.name, errorMsg) + return errors.New(errorMsg) + } + + // check if we have enough space to write this message + if d.totalDiskSpaceUsed+expectedBytesIncrease > d.maxBytesDiskSpace { + return d.freeDiskSpace(expectedBytesIncrease) + } + return nil +} + // writeOne performs a low level filesystem write for a single []byte // while advancing write positions and rolling files, if necessary func (d *diskQueue) writeOne(data []byte) error { @@ -383,6 +666,28 @@ func (d *diskQueue) writeOne(data []byte) error { return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize) } + totalBytes := int64(4 + dataLen) + reachedFileSizeLimit := false + + if d.enableDiskLimitation { + expectedBytesIncrease := totalBytes + // check if we will reach or surpass file size limit + if d.writePos+totalBytes+numFileMsgBytes >= d.maxBytesPerFile { + reachedFileSizeLimit = true + expectedBytesIncrease += numFileMsgBytes + } + + // free disk space if needed + err = d.checkDiskSpace(expectedBytesIncrease) + if err != nil { + return err + } + } else if d.writePos+totalBytes >= d.maxBytesPerFile { + reachedFileSizeLimit = true + } + + // add all data to writeBuf before writing to file + // this causes everything to be written to file or nothing d.writeBuf.Reset() err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) if err != nil { @@ -394,6 +699,15 @@ func (d *diskQueue) writeOne(data []byte) error { return err } + // check if we reached the file size limit with this message + if d.enableDiskLimitation && reachedFileSizeLimit { + // write number of messages in binary to file + err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1) + if err != nil { + return err + } + } + // 
only write to the file once _, err = d.writeFile.Write(d.writeBuf.Bytes()) if err != nil { @@ -402,11 +716,15 @@ func (d *diskQueue) writeOne(data []byte) error { return err } - totalBytes := int64(4 + dataLen) d.writePos += totalBytes d.depth += 1 - if d.writePos >= d.maxBytesPerFile { + if d.enableDiskLimitation { + d.totalDiskSpaceUsed += totalBytes + d.writeMessages += 1 + } + + if reachedFileSizeLimit { if d.readFileNum == d.writeFileNum { d.maxBytesPerFileRead = d.writePos } @@ -414,6 +732,10 @@ func (d *diskQueue) writeOne(data []byte) error { d.writeFileNum++ d.writePos = 0 + if d.enableDiskLimitation { + d.writeMessages = 0 + } + // sync every time we start writing to a new file err = d.sync() if err != nil { @@ -440,6 +762,10 @@ func (d *diskQueue) sync() error { } } + if d.enableDiskLimitation { + d.updateTotalDiskSpaceUsed() + } + err := d.persistMetaData() if err != nil { return err @@ -462,13 +788,23 @@ func (d *diskQueue) retrieveMetaData() error { defer f.Close() var depth int64 - _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", - &depth, - &d.readFileNum, &d.readPos, - &d.writeFileNum, &d.writePos) + // if user is using disk space limit feature + if d.enableDiskLimitation { + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + &depth, + &d.readFileNum, &d.readMessages, &d.readPos, + &d.writeFileNum, &d.writeMessages, &d.writePos) + } else { + _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", + &depth, + &d.readFileNum, &d.readPos, + &d.writeFileNum, &d.writePos) + } + if err != nil { return err } + d.depth = depth d.nextReadFileNum = d.readFileNum d.nextReadPos = d.readPos @@ -490,10 +826,18 @@ func (d *diskQueue) persistMetaData() error { return err } - _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", - d.depth, - d.readFileNum, d.readPos, - d.writeFileNum, d.writePos) + // if user is using disk space limit feature + if d.enableDiskLimitation { + _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + d.depth, + d.readFileNum, d.readMessages, d.readPos, + 
d.writeFileNum, d.writeMessages, d.writePos) + } else { + _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", + d.depth, + d.readFileNum, d.readPos, + d.writeFileNum, d.writePos) + } if err != nil { f.Close() return err @@ -553,24 +897,45 @@ func (d *diskQueue) checkTailCorruption(depth int64) { } } -func (d *diskQueue) moveForward() { +func (d *diskQueue) moveToNextReadFile() { oldReadFileNum := d.readFileNum d.readFileNum = d.nextReadFileNum d.readPos = d.nextReadPos - d.depth -= 1 // see if we need to clean up the old file if oldReadFileNum != d.nextReadFileNum { + // sync every time we start reading from a new file d.needSync = true fn := d.fileName(oldReadFileNum) + oldFileInfo, _ := os.Stat(fn) + err := os.Remove(fn) if err != nil { d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, fn, err) + } else { + d.logf(INFO, "DISKQUEUE(%s) removed(%s) of size(%d bytes)", d.name, fn, oldFileInfo.Size()) } + + if d.enableDiskLimitation { + d.readMessages = 0 + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to update write bytes - %s", d.name, err) + } + } + } +} + +func (d *diskQueue) moveForward() { + d.depth -= 1 + + if d.enableDiskLimitation { + d.readMessages += 1 } + d.moveToNextReadFile() + d.checkTailCorruption(d.depth) } @@ -585,6 +950,10 @@ func (d *diskQueue) handleReadError() { } d.writeFileNum++ d.writePos = 0 + + if d.enableDiskLimitation { + d.writeMessages = 0 + } } badFn := d.fileName(d.readFileNum) @@ -605,9 +974,14 @@ func (d *diskQueue) handleReadError() { d.readPos = 0 d.nextReadFileNum = d.readFileNum d.nextReadPos = 0 + if d.enableDiskLimitation { + d.readMessages = 0 + } // significant state change, schedule a sync on the next iteration d.needSync = true + + d.checkTailCorruption(d.depth) } // ioLoop provides the backend for exposing a go channel (via ReadChan()) @@ -623,6 +997,7 @@ func (d *diskQueue) ioLoop() { var err error var count int64 var r chan []byte + var p chan []byte syncTicker := time.NewTicker(d.syncTimeout) @@ 
-651,13 +1026,16 @@ func (d *diskQueue) ioLoop() { } } r = d.readChan + p = d.peekChan } else { r = nil + p = nil } select { // the Go channel spec dictates that nil channel operations (read or write) // in a select are skipped, we set r to d.readChan only when there is data to read + case p <- dataRead: case r <- dataRead: count++ // moveForward sets needSync flag if a file is removed diff --git a/diskqueue_test.go b/diskqueue_test.go index ba5879c..ef43ccf 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -9,6 +9,7 @@ import ( "path" "path/filepath" "reflect" + "regexp" "runtime" "strconv" "sync" @@ -130,6 +131,83 @@ func TestDiskQueueRoll(t *testing.T) { } } +func TestDiskQueuePeek(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_peek" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + msg := bytes.Repeat([]byte{0}, 10) + ml := int64(len(msg)) + dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l) + defer dq.Close() + NotNil(t, dq) + Equal(t, int64(0), dq.Depth()) + + t.Run("roll", func(t *testing.T) { + for i := 0; i < 10; i++ { + err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(i+1), dq.Depth()) + } + + for i := 10; i > 0; i-- { + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(i-1), dq.Depth()) + } + + Nil(t, dq.Empty()) + }) + + t.Run("peek-read", func(t *testing.T) { + for i := 0; i < 10; i++ { + err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(i+1), dq.Depth()) + } + + for i := 10; i > 0; i-- { + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(i-1), dq.Depth()) + } + + Nil(t, dq.Empty()) + }) + + t.Run("read-peek", func(t *testing.T) { + for i := 0; i < 10; i++ { 
+ err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(i+1), dq.Depth()) + } + + for i := 10; i > 1; i-- { + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(i-1), dq.Depth()) + + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i-1), dq.Depth()) + } + + Nil(t, dq.Empty()) + }) + +} + func assertFileNotExist(t *testing.T, fn string) { f, err := os.OpenFile(fn, os.O_RDONLY, 0600) Equal(t, (*os.File)(nil), f) @@ -231,6 +309,11 @@ func TestDiskQueueCorruption(t *testing.T) { Equal(t, msg, <-dq.ReadChan()) } + badFilesCount := numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 1 { + panic("fail") + } + // corrupt the 4th (current) file dqFn = dq.(*diskQueue).fileName(3) os.Truncate(dqFn, 100) @@ -238,6 +321,10 @@ func TestDiskQueueCorruption(t *testing.T) { dq.Put(msg) // in 5th file Equal(t, msg, <-dq.ReadChan()) + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 2 { + panic("fail") + } // write a corrupt (len 0) message at the 5th (current) file dq.(*diskQueue).writeFile.Write([]byte{0, 0, 0, 0}) @@ -247,17 +334,23 @@ func TestDiskQueueCorruption(t *testing.T) { dq.Put(msg) Equal(t, msg, <-dq.ReadChan()) + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 3 { + panic("fail") + } } type md struct { - depth int64 - readFileNum int64 - writeFileNum int64 - readPos int64 - writePos int64 + depth int64 + readFileNum int64 + writeFileNum int64 + readMessages int64 + writeMessages int64 + readPos int64 + writePos int64 } -func readMetaDataFile(fileName string, retried int) md { +func readMetaDataFile(fileName string, retried int, enableDiskLimitation bool) md { f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { // provide a simple retry that results in up to @@ -265,17 +358,25 @@ func readMetaDataFile(fileName string, retried int) md { if retried < 9 { retried++ time.Sleep(50 * time.Millisecond) - return readMetaDataFile(fileName, retried) + return 
readMetaDataFile(fileName, retried, enableDiskLimitation) } panic(err) } defer f.Close() var ret md - _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", - &ret.depth, - &ret.readFileNum, &ret.readPos, - &ret.writeFileNum, &ret.writePos) + + if enableDiskLimitation { + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + &ret.depth, + &ret.readFileNum, &ret.readMessages, &ret.readPos, + &ret.writeFileNum, &ret.writeMessages, &ret.writePos) + } else { + _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", + &ret.depth, + &ret.readFileNum, &ret.readPos, + &ret.writeFileNum, &ret.writePos) + } if err != nil { panic(err) } @@ -297,7 +398,7 @@ func TestDiskQueueSyncAfterRead(t *testing.T) { dq.Put(msg) for i := 0; i < 10; i++ { - d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0) + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, false) if d.depth == 1 && d.readFileNum == 0 && d.writeFileNum == 0 && @@ -315,7 +416,7 @@ next: <-dq.ReadChan() for i := 0; i < 10; i++ { - d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0) + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, false) if d.depth == 1 && d.readFileNum == 0 && d.writeFileNum == 0 && @@ -331,6 +432,567 @@ next: done: } +func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_read_with_disk_size_implementation" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + dq := NewWithDiskSpace(dqName, tmpDir, 6040, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) + defer dq.Close() + + msgSize := 1000 + msg := make([]byte, msgSize) + dq.Put(msg) + + if dq.Depth() != 1 { + panic("fail") + } + + for i := 0; i < 10; i++ { + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 1 && + d.readFileNum == 0 && + d.writeFileNum == 0 && + d.readMessages == 0 && + 
d.writeMessages == 1 && + d.readPos == 0 && + d.writePos == 1004 && + dq.(*diskQueue).totalDiskSpaceUsed == 1004+maxMetaDataFileSize { + // success + goto next + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +next: + dq.Put(msg) + <-dq.ReadChan() + + if dq.Depth() != 1 { + panic("fail") + } + + for i := 0; i < 10; i++ { + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 1 && + d.readFileNum == 0 && + d.writeFileNum == 0 && + d.readMessages == 1 && + d.writeMessages == 2 && + d.readPos == 1004 && + d.writePos == 2008 && + dq.(*diskQueue).totalDiskSpaceUsed == 2008+maxMetaDataFileSize { + // success + goto completeWriteFile + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +completeWriteFile: + // meet the file size limit exactly (2048 bytes) when writeFileNum + // equals readFileNum + totalBytes := 2 * (msgSize + 4) + bytesRemaining := 2048 - (totalBytes + 8) + oneByteMsgSizeIncrease := 5 + dq.Put(make([]byte, bytesRemaining-4-oneByteMsgSizeIncrease)) + dq.Put(make([]byte, 1)) + + if dq.Depth() != 3 { + panic("fail") + } + + for i := 0; i < 10; i++ { + // test that write position and messages reset when a new file is created + // test the writeFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 3 && + d.readFileNum == 0 && + d.writeFileNum == 1 && + d.readMessages == 1 && + d.writeMessages == 0 && + d.readPos == 1004 && + d.writePos == 0 && + dq.(*diskQueue).totalDiskSpaceUsed == 2048+maxMetaDataFileSize { + // success + goto completeReadFile + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +completeReadFile: + dq.Put(msg) + + <-dq.ReadChan() + <-dq.ReadChan() + <-dq.ReadChan() + + if dq.Depth() != 1 { + panic("fail") + } + + for i := 0; i < 10; i++ { + // test that read position and messages reset when a file is completely read + // test the readFileNum correctly increments + d := 
readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 1 && + d.readFileNum == 1 && + d.writeFileNum == 1 && + d.readMessages == 0 && + d.writeMessages == 1 && + d.readPos == 0 && + d.writePos == 1004 && + dq.(*diskQueue).totalDiskSpaceUsed == 1004+maxMetaDataFileSize { + // success + goto completeWriteFileAgain + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +completeWriteFileAgain: + // make writeFileNum ahead of readFileNum + dq.Put(msg) + dq.Put(msg) + + // meet the file size limit exactly (2048 bytes) when writeFileNum + // is ahead of readFileNum + dq.Put(msg) + dq.Put(msg) + dq.Put(make([]byte, bytesRemaining-4-oneByteMsgSizeIncrease)) + dq.Put(make([]byte, 1)) + + if dq.Depth() != 7 { + panic("fail") + } + + for i := 0; i < 10; i++ { + // test that write position and messages reset when a file is completely read + // test the writeFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 7 && + d.readFileNum == 1 && + d.writeFileNum == 3 && + d.readMessages == 0 && + d.writeMessages == 0 && + d.readPos == 0 && + d.writePos == 0 && + dq.(*diskQueue).totalDiskSpaceUsed == 5068+maxMetaDataFileSize { + // success + goto completeReadFileAgain + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +completeReadFileAgain: + <-dq.ReadChan() + <-dq.ReadChan() + <-dq.ReadChan() + + <-dq.ReadChan() + <-dq.ReadChan() + <-dq.ReadChan() + <-dq.ReadChan() + + if dq.Depth() != 0 { + panic("fail") + } + + for i := 0; i < 10; i++ { + // test that read position and messages reset when a file is completely read + // test the readFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 0 && + d.readFileNum == 3 && + d.writeFileNum == 3 && + d.readMessages == 0 && + d.writeMessages == 0 && + d.readPos == 0 && + d.writePos == 0 && + dq.(*diskQueue).totalDiskSpaceUsed == maxMetaDataFileSize { + // success + goto done + } + 
time.Sleep(100 * time.Millisecond) + } + panic("fail") + +done: +} + +func TestDiskSizeImplementationDiskSizeLimit(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_implementation_disk_size_limit" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + dq := NewWithDiskSpace(dqName, tmpDir, 6040, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) + defer dq.Close() + + msgSize := 1000 + msg := make([]byte, msgSize) + + // meet disk size limit + // write a complete file + dq.Put(msg) + dq.Put(msg) + dq.Put(msg) + + // meet the disk size limit exactly (6040 bytes) when writeFileNum + // is ahead of readFileNum + dq.Put(msg) + dq.Put(msg) + + totalDiskBytes := int64(5*(msgSize+4) + 8) + + // save space for msg len and number of msgs in file + diskBytesRemaining := 6040 - maxMetaDataFileSize - (totalDiskBytes + 12) + dq.Put(make([]byte, diskBytesRemaining)) + + depth := dq.Depth() + if depth != 6 { + panic("fail") + } + + for i := 0; i < 10; i++ { + // test that read position and messages reset when a file is completely read + // test the readFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 6 && + d.readFileNum == 0 && + d.writeFileNum == 2 && + d.readMessages == 0 && + d.writeMessages == 0 && + d.readPos == 0 && + d.writePos == 0 && + dq.(*diskQueue).totalDiskSpaceUsed == 6040 { + // success + goto surpassDiskSizeLimit + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +surpassDiskSizeLimit: + dq.Put(make([]byte, 1)) + + if dq.Depth() != 4 { + panic("fail") + } + + for i := 0; i < 10; i++ { + // test that read position and messages reset when a file is completely read + // test the readFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 4 && + d.readFileNum == 1 && + d.writeFileNum 
== 2 && + d.readMessages == 0 && + d.writeMessages == 1 && + d.readPos == 0 && + d.writePos == 5 && + dq.(*diskQueue).totalDiskSpaceUsed == 3025 { + // success + goto done + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +done: +} + +func TestDiskSizeImplementationMsgSizeGreaterThanFileSize(t *testing.T) { + // write three files + + l := NewTestLogger(t) + dqName := "test_disk_queue_implementation_msg_size_greater_than_file_size" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + dq := NewWithDiskSpace(dqName, tmpDir, 1<<12, 1<<10, 0, 1<<12, 2500, 50*time.Millisecond, l) + defer dq.Close() + + msgSize := 1000 + msg := make([]byte, msgSize) + + // file size: 1496 + dq.Put(msg) + dq.Put(make([]byte, 480)) + + // file size: 1032 + dq.Put(msg) + dq.Put(make([]byte, 16)) + + // file size: 1512 + dq.Put(make([]byte, 1500)) + + if dq.Depth() != 5 { + panic("fail") + } + + for i := 0; i < 10; i++ { + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 5 && + d.readFileNum == 0 && + d.writeFileNum == 3 && + d.readMessages == 0 && + d.writeMessages == 0 && + d.readPos == 0 && + d.writePos == 0 && + dq.(*diskQueue).totalDiskSpaceUsed == 4040+maxMetaDataFileSize { + // success + goto writeLargeMsg + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +writeLargeMsg: + // Write a large message that causes the deletion of three files + dq.Put(make([]byte, 3000)) + + if dq.Depth() != 1 { + panic("fail") + } + + for i := 0; i < 10; i++ { + // test that read position and messages reset when a file is completely read + // test the readFileNum correctly increments + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 1 && + d.readFileNum == 3 && + d.writeFileNum == 4 && + d.readMessages == 0 && + d.writeMessages == 0 && + d.readPos == 0 && + d.writePos == 0 && + 
dq.(*diskQueue).totalDiskSpaceUsed == 3012+maxMetaDataFileSize { + // success + goto done + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +done: +} + +func createBadFile(dqName string, filePath string, fileNum int64, numBytes int) error { + fn := fmt.Sprintf(path.Join(filePath, "%s.diskqueue.%06d.dat.bad"), dqName, fileNum) + + badFile, err := os.OpenFile(fn, os.O_RDWR|os.O_CREATE, 0600) + if err != nil { + return err + } + + defer badFile.Close() + + _, err = badFile.Write(make([]byte, numBytes)) + + return err +} + +func numberOfBadFiles(diskQueueName string, dataPath string) int64 { + var badFilesCount int64 + + fileInfos, _ := ioutil.ReadDir(dataPath) + for _, fileInfo := range fileInfos { + regExp, _ := regexp.Compile(`^` + diskQueueName + `.diskqueue.\d\d\d\d\d\d.dat.bad$`) + if regExp.MatchString(fileInfo.Name()) { + badFilesCount++ + } + } + + return badFilesCount +} + +func TestDiskSizeImplementationWithBadFiles(t *testing.T) { + // write three files + + l := NewTestLogger(t) + dqName := "test_disk_queue_implementation_with_bad_files" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + + // there should be no .bad files + var badFilesCount int64 + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 0 { + panic("fail") + } + + // make 2 bad files + createBadFile(dqName, tmpDir, 0, 1503) + createBadFile(dqName, tmpDir, 1, 1032) + + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 2 { + panic("fail") + } + + dq := NewWithDiskSpace(dqName, tmpDir, 1<<12, 1<<10, 10, 1600, 2500, 50*time.Millisecond, l) + defer dq.Close() + + msgSize := 1000 + msg := make([]byte, msgSize) + + // file 0 size: 1497 + dq.Put(msg) + dq.Put(make([]byte, 481)) + + // no bad files should have been deleted + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 2 { + 
panic("fail") + } + + // file 1 size: 1032 + dq.Put(msg) + dq.Put(make([]byte, 16)) + + // one .bad file should be deleted in order to make space + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 1 { + panic("fail") + } + + // file 2 size: 1503 + dq.Put(make([]byte, 1491)) + + // check if all the .bad files were deleted + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 0 { + panic("fail") + } + + depth := dq.Depth() + if depth != 5 { + panic("fail") + } + + for i := 0; i < 10; i++ { + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.depth == 5 && + d.readFileNum == 0 && + d.writeFileNum == 3 && + d.readMessages == 0 && + d.writeMessages == 0 && + d.readPos == 0 && + d.writePos == 0 && + dq.(*diskQueue).totalDiskSpaceUsed == 4032+maxMetaDataFileSize { + // success + goto corruptFiles + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +corruptFiles: + // test removeReadFile when file is corrupted + // create bad files see if totalDiskSpaceUsed is updated properly + // check that after corrupting files, we make space appropriately + + // corrupt file 0 + dqFn := dq.(*diskQueue).fileName(0) + os.Truncate(dqFn, 1017) // 1 valid message, 1 corrupted message + + dq.Put(make([]byte, 100)) + + // check if the .bad files were deleted + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 0 { + panic("fail") + } + + for i := 0; i < 10; i++ { + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.readFileNum == 1 && + d.writeFileNum == 3 && + d.readMessages == 0 && + d.writeMessages == 1 && + d.readPos == 0 && + d.writePos == 104 && + dq.(*diskQueue).totalDiskSpaceUsed == 2639+maxMetaDataFileSize { + // success + goto readCorruptedFile + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +readCorruptedFile: + // test handleReadError + + // there should be no "bad" files at this point + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if 
badFilesCount != 0 { + panic("fail") + } + + // corrupt file 2 + // have readOne turn it into a bad file and then try to make space + dqFn = dq.(*diskQueue).fileName(2) + os.Truncate(dqFn, 1020) + + // read file 1 + <-dq.ReadChan() + <-dq.ReadChan() + + // wait for DiskQueue to notice that file 2 is corrupted + time.Sleep(20 * time.Millisecond) + + // check if the file was converted into a .bad file + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 1 { + panic("fail") + } + + // go over the disk limit + dq.Put(msg) + + // write a complete file + dq.Put(msg) + dq.Put(msg) + + // check if the corrupted file was deleted to make space + badFilesCount = numberOfBadFiles(dqName, tmpDir) + if badFilesCount != 0 { + panic("fail") + } + + for i := 0; i < 10; i++ { + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) + if d.readFileNum == 3 && + d.writeFileNum == 5 && + d.readMessages == 0 && + d.writeMessages == 0 && + d.readPos == 0 && + d.writePos == 0 && + dq.(*diskQueue).totalDiskSpaceUsed == 3132+maxMetaDataFileSize { + // success + goto done + } + time.Sleep(100 * time.Millisecond) + } + panic("fail") + +done: +} + func TestDiskQueueTorture(t *testing.T) { var wg sync.WaitGroup diff --git a/go.mod b/go.mod index 30a1fc9..7e4f9b8 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ -module github.com/nsqio/go-diskqueue +module github.com/kev1n80/go-diskqueue -go 1.13 +go 1.21