From bc4354e287663dbda4e1393d568d0628f40feb47 Mon Sep 17 00:00:00 2001 From: vgerman-softheme Date: Fri, 14 Apr 2017 22:50:41 +0300 Subject: [PATCH] Parallel input file split implemented. Some file close glitches are still present. --- Common/Algorithm.cs | 48 +++++++++ Common/Common.csproj | 2 + Common/Constants.cs | 4 +- Common/OffsetLength.cs | 9 ++ DataGenerator/Program.cs | 6 +- ExternalSort.Tests/Generic.cs | 13 +++ ExternalSort/App.config | 1 + ExternalSort/LineComparer.cs | 2 +- ExternalSort/MergeSort.cs | 189 +++++++++++++++++++++++----------- ExternalSort/Program.cs | 18 ++-- ExternalSort/Settings.cs | 9 +- 11 files changed, 227 insertions(+), 74 deletions(-) create mode 100644 Common/Algorithm.cs create mode 100644 Common/OffsetLength.cs diff --git a/Common/Algorithm.cs b/Common/Algorithm.cs new file mode 100644 index 0000000..5ca36c2 --- /dev/null +++ b/Common/Algorithm.cs @@ -0,0 +1,48 @@ +using System; +using System.Text; + +namespace Common +{ + public static class Algorithm + { + public static readonly byte[] EolUtf8Bytes = Encoding.UTF8.GetBytes(Environment.NewLine); + + /// + /// Finds subsequence of bytes inside another byte array + /// + /// + /// + /// Offset of the subsequence or -1 of the subsequence was not found + public static int FindByteSubsequence(byte[] buffer, byte[] searchedBytes) + { + for (var i = 0; i < buffer.Length - searchedBytes.Length; ++i) + { + var match = false; + for (var j = 0; j < searchedBytes.Length; ++j) + { + if (buffer[i + j] == searchedBytes[j]) + { + match = true; + } + else + { + match = false; + break; + } + } + + if (match) + { + return i; + } + } + + return -1; + } + + public static int FindEolOffset(byte[] buffer) + { + return FindByteSubsequence(buffer, EolUtf8Bytes); + } + } +} diff --git a/Common/Common.csproj b/Common/Common.csproj index 2a3f25d..3b3056a 100644 --- a/Common/Common.csproj +++ b/Common/Common.csproj @@ -41,9 +41,11 @@ + + diff --git a/Common/Constants.cs b/Common/Constants.cs index 551ce2d..8d0444c 100644 --- a/Common/Constants.cs +++ b/Common/Constants.cs @@ -2,8 +2,8 @@ { public static class Constants { - public const uint Mb = 1024 * 1024; + public const uint MB = 1024 * 1024; - public const uint Gb = 1024 * Mb; + public const uint GB = 1024 * MB; } } diff --git a/Common/OffsetLength.cs b/Common/OffsetLength.cs new file mode 100644 index 0000000..43a2988 --- /dev/null +++ b/Common/OffsetLength.cs @@ -0,0 +1,9 @@ +namespace Common +{ + public class OffsetLength + { + public T Offset { get; set; } + + public T Length { get; set; } + } +} diff --git a/DataGenerator/Program.cs b/DataGenerator/Program.cs index fb18c13..92df968 100644 --- a/DataGenerator/Program.cs +++ b/DataGenerator/Program.cs @@ -13,7 +13,7 @@ namespace DataGenerator { public class Program { - private const ulong DafaultSize = Constants.Mb; + private const ulong DafaultSize = Constants.MB; private const int OutFileBuffer = 128 * 1024; private const int ChunkSize = 128; @@ -47,11 +47,11 @@ static void Main(string[] args) var lowArg = args[0].ToLowerInvariant(); if (lowArg.EndsWith("mb")) { - outSize = (ulong)size * Constants.Mb; + outSize = (ulong)size * Constants.MB; } else if (lowArg.EndsWith("gb")) { - outSize = (ulong)size * Constants.Gb; + outSize = (ulong)size * Constants.GB; } else { diff --git a/ExternalSort.Tests/Generic.cs b/ExternalSort.Tests/Generic.cs index 4e89f98..ffd385c 100644 --- a/ExternalSort.Tests/Generic.cs +++ b/ExternalSort.Tests/Generic.cs @@ -83,5 +83,18 @@ public void TestBadLinesCopmparer() Assert.AreEqual("3. Apple", lines.First()); Assert.AreEqual("End!", lines.Last()); } + + [TestMethod] + public void TestFindSubByte() + { + var hello = "Hello"; + var input = hello + Environment.NewLine + "!"; + + var endlineBytes = Encoding.UTF8.GetBytes(Environment.NewLine); + var inputBytes = Encoding.UTF8.GetBytes(input); + + var found = Algorithm.FindByteSubsequence(inputBytes, endlineBytes); + Assert.AreEqual(hello.Length, found); + } } } diff --git a/ExternalSort/App.config b/ExternalSort/App.config index ea019a8..78cb1f0 100644 --- a/ExternalSort/App.config +++ b/ExternalSort/App.config @@ -5,5 +5,6 @@ + diff --git a/ExternalSort/LineComparer.cs b/ExternalSort/LineComparer.cs index f98a007..7acae7d 100644 --- a/ExternalSort/LineComparer.cs +++ b/ExternalSort/LineComparer.cs @@ -7,7 +7,7 @@ namespace ExternalSort public class LineComparer : IComparer { /// - /// This method tries hart to follow the spec. rules but nevertheless it never fails. + /// This method tries hard to follow the specs but nevertheless it never fails. /// /// /// diff --git a/ExternalSort/MergeSort.cs b/ExternalSort/MergeSort.cs index 2c42a42..b4c6804 100644 --- a/ExternalSort/MergeSort.cs +++ b/ExternalSort/MergeSort.cs @@ -1,10 +1,12 @@ using System; using System.Collections.Generic; +using System.Configuration; using System.Diagnostics; using System.IO; using System.IO.Compression; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; using Common; @@ -36,7 +38,7 @@ public async Task MergeSortFile(string inputFile, string outputFile) var inputFileLength = new FileInfo(inputFile).Length; _inputFileLength = inputFileLength; - var files = await Split(inputFile, Path.GetTempPath()); + var files = Split(inputFile, Path.GetTempPath()); await MergeSortFiles(files.Item2, files.Item1, outputFile, Comparator); foreach (var file in files.Item2) @@ -101,61 +103,48 @@ private async Task MergeSortFiles(IList files, long totalLines, string o } } - private async Task LinesWriter(IList lines, string tempPath) + private async Task LinesWriter(IList lines, string fileName) { - var tempFileName = Path.Combine(tempPath, Path.GetRandomFileName()); - using (var stringStream = OpenForAsyncTextWrite(tempFileName)) + using (var stringStream = OpenForAsyncTextWrite(fileName)) { foreach (var line in lines) { await stringStream.WriteLineAsync(line); } } - - return tempFileName; } - private async Task>> Split(string file, string tempLocationPath) + private Tuple> Split(string file, string tempLocationPath) { var files = new List(); - var writeTask = Task.CompletedTask; var lineCount = 0L; + var maxProcessors = Settings.MaxProcessors; + var maxFileSize = (long)Settings.MaxMemoryUsageBytes / maxProcessors; - using (new AutoStopwatch("Creating temp files", _inputFileLength)) + using (new AutoStopwatch($"Creating temp files using {maxProcessors} threads", _inputFileLength)) { - using (var countedList = new CountedList(Settings.MaxMemoryUsageBytes / 2)) + using (var bigInputFile = OpenForAsyncRead(file)) { - countedList.MaxIntemsReached += (fullList, bytesCount) => - { - writeTask.Wait(); - writeTask = Task.Run(async () => - { - fullList.Sort(_comparer); - files.Add(await LinesWriter(fullList, tempLocationPath)); - }); - }; + var rangeList = TextLinesAwareSplit(bigInputFile, maxFileSize); + bigInputFile.Close(); + MakeTempFiles(files, rangeList.Count, tempLocationPath); - var bigInputFile = OpenForAsyncRead(file); - using (var inputStream = new StreamReader(bigInputFile)) + Parallel.ForEach(rangeList, + new ParallelOptions { MaxDegreeOfParallelism = maxProcessors }, + (range, state, id) => { - var done = false; - while (!done) + List linesList; + using (var inputFile = new StreamReader(OpenForAsyncRead(file))) { - var res = await ReadLines(inputStream); - var lines = res.Item1; - done = res.Item2; - - lineCount += lines.Count; - - foreach (var line in lines) - { - countedList.Add(line); - } + linesList = ReadAllLinesInRange(inputFile, range).Result; + linesList.Sort(_comparer); } - } + + Interlocked.Add(ref lineCount, linesList.Count); + LinesWriter(linesList, files[(int)id]).Wait(); + }); } - await writeTask; if (lineCount > 0) { var bytesPerLine = _inputFileLength / lineCount; @@ -166,41 +155,25 @@ private async Task>> Split(string file, string tempLoc } } - Console.WriteLine("{0} files created. Total lines {1}. Max Queue size {2}", files.Count, lineCount, Settings.MaxQueueRecords); + Console.WriteLine("{0} files created. Total lines {1}. Max Queue size {2}. Max file size {3}", files.Count, lineCount, Settings.MaxQueueRecords, maxFileSize); return new Tuple>(lineCount, files); } - private async Task, bool>> ReadLines(StreamReader reader, int maxCount = 1024) + private async Task> ReadAllLinesInRange(StreamReader reader, OffsetLength range) { var result = new List(); var done = false; - for (int i = 0; i < maxCount; i++) + var endOffset = range.Offset + range.Length; + + while (reader.BaseStream.Position < endOffset) { var line = await reader.ReadLineAsync(); - if (line != null) - { - result.Add(line); - } - else - { - done = true; - } - } - - return new Tuple, bool>(result, done); - } + Debug.Assert(line != null, "Sanity check"); - private static void AddToQueue(IDictionary> sortedChunks, AutoFileQueue queue) - { - var newTop = queue.Peek(); - if (sortedChunks.ContainsKey(newTop)) - { - sortedChunks[newTop].Add(queue); - } - else - { - sortedChunks.Add(newTop, new List { queue }); + result.Add(line); } + + return new List(result); } private List NWayMerge(IDictionary> sortedChunks, int maxCount = 1024) @@ -246,6 +219,74 @@ private List NWayMerge(IDictionary> sortedCh return outList; } + private IList> TextLinesAwareSplit(Stream input, long maxChunkLenght) + { + var length = input.Length; + var partsCount = length / maxChunkLenght + length % maxChunkLenght > 0 ? 1 : 0; + var partList = new List>(partsCount); + + for (var i = 0L; i < length; i += maxChunkLenght) + { + var maxLength = Math.Min(maxChunkLenght, length - i); + partList.Add(new OffsetLength { Offset = i, Length = maxLength }); + } + + for (var j = 0; j < partList.Count - 1; ++j) + { + var current = partList[j]; + var next = partList[j + 1]; + + var endOffset = current.Offset + current.Length; + Debug.Assert(next.Offset == endOffset, "Sanity check"); + + var offsetDelta = SeekForwardEolOffset(input, endOffset); + if (offsetDelta != 0) + { + current.Length += offsetDelta; + next.Offset += offsetDelta; + next.Length -= offsetDelta; + } + } + + return partList; + } + + private long SeekForwardEolOffset(Stream input, long startPos) + { + var plusOffset = 0L; + if (startPos >= input.Length) + { + throw new ArgumentException("Offset is out of file bounds!"); + } + + input.Seek(startPos, SeekOrigin.Begin); + + var buffer = new byte[256]; + var found = false; + while (input.CanRead && !found) + { + var maxRead = (int)Math.Min(buffer.Length, input.Length - plusOffset); + var readThisTime = input.Read(buffer, 0, maxRead); + if (readThisTime <= 0) + { + throw new IOException("Unexpected EOF reached"); + } + + var resultOffset = Algorithm.FindEolOffset(buffer); + if (resultOffset >= 0) + { + plusOffset += resultOffset; + found = true; + } + else + { + plusOffset += readThisTime; + } + } + + return found ? plusOffset : -1; + } + private StreamWriter OpenForAsyncTextWrite(string fileName) { var fileStream = OpenForAsyncWrite(fileName); @@ -271,5 +312,33 @@ private static FileStream OpenForAsyncRead(string fileName) { return new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read, FileBufferSize, FileOptions.Asynchronous); } + + private static void MakeTempFiles(IList destination, int count, string tempLocationPath) + { + destination.Clear(); + while (destination.Count < count) + { + var tempFileName = Path.Combine(tempLocationPath, Path.GetRandomFileName()); + if (File.Exists(tempFileName)) + { + continue; + } + + destination.Add(tempFileName); + } + } + + private static void AddToQueue(IDictionary> sortedChunks, AutoFileQueue queue) + { + var newTop = queue.Peek(); + if (sortedChunks.ContainsKey(newTop)) + { + sortedChunks[newTop].Add(queue); + } + else + { + sortedChunks.Add(newTop, new List { queue }); + } + } } } diff --git a/ExternalSort/Program.cs b/ExternalSort/Program.cs index 59e29ab..4889a2e 100644 --- a/ExternalSort/Program.cs +++ b/ExternalSort/Program.cs @@ -39,14 +39,20 @@ static void Main(string[] args) var appSettings = ConfigurationManager.AppSettings; var deflate = appSettings["DeflateTemp"]; + var maxMem = appSettings["MaxMemoryUsageBytes"]; + var st = new Settings + { + OrdinalStringSortOrder = option.StartsWith("/ord"), + DeflateTempFiles = deflate == "true", + }; - var ms = new MergeSort( - new Settings - { - OrdinalStringSortOrder = option.StartsWith("/ord"), - DeflateTempFiles = deflate == "true", - }); + ulong mmem; + if (ulong.TryParse(maxMem, out mmem)) + { + st.MaxMemoryUsageBytes = mmem; + } + var ms = new MergeSort(st); ms.MergeSortFile(imputFile, outputFile).Wait(); } } diff --git a/ExternalSort/Settings.cs b/ExternalSort/Settings.cs index e68e43f..1dda981 100644 --- a/ExternalSort/Settings.cs +++ b/ExternalSort/Settings.cs @@ -1,4 +1,5 @@ using System; +using Common; namespace ExternalSort { @@ -6,8 +7,8 @@ public class Settings { public Settings() { - //512 MB - MaxMemoryUsageBytes = 512 * 1024 * 1024; + //2 GB will work for 32 bit systems! + MaxMemoryUsageBytes = 2 * Constants.GB; // Safe value MaxQueueRecords = 1000; @@ -15,12 +16,16 @@ public Settings() OrdinalStringSortOrder = false; DeflateTempFiles = true; + + MaxProcessors = Environment.ProcessorCount; } public ulong MaxMemoryUsageBytes { get; set; } public int MaxQueueRecords { get; set; } + public int MaxProcessors { get; set; } + public bool OrdinalStringSortOrder { get; set; } public bool DeflateTempFiles { get; set; }