Skip to content

Commit 089add0

Browse files
author
Andrius Mozūraitis
committed
Improved overal quality of fileGroupBy function
1 parent 93e4451 commit 089add0

File tree

3 files changed

+109
-20
lines changed

3 files changed

+109
-20
lines changed

docs/data/data.json

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1128,6 +1128,93 @@
11281128
"canonicalReferenceGroup": "iterparse!DownloadOptions",
11291129
"tags": []
11301130
},
1131+
{
1132+
"members": [
1133+
{
1134+
"kind": "Function",
1135+
"comment": {
1136+
"description": "",
1137+
"examples": [],
1138+
"parsed": []
1139+
},
1140+
"excerptTokens": [
1141+
{
1142+
"kind": "Content",
1143+
"text": "export declare function fileGroupBy<T>(args: "
1144+
},
1145+
{
1146+
"kind": "Reference",
1147+
"text": "FileGroupByOptions",
1148+
"canonicalReference": "iterparse!FileGroupByOptions:interface"
1149+
},
1150+
{
1151+
"kind": "Content",
1152+
"text": "<T>"
1153+
},
1154+
{
1155+
"kind": "Content",
1156+
"text": "): "
1157+
},
1158+
{
1159+
"kind": "Reference",
1160+
"text": "AsyncIterable",
1161+
"canonicalReference": "ix!AsyncIterableX:class"
1162+
},
1163+
{
1164+
"kind": "Content",
1165+
"text": "<"
1166+
},
1167+
{
1168+
"kind": "Reference",
1169+
"text": "GroupedItems",
1170+
"canonicalReference": "iterparse!GroupedItems:interface"
1171+
},
1172+
{
1173+
"kind": "Content",
1174+
"text": "<T>>"
1175+
},
1176+
{
1177+
"kind": "Content",
1178+
"text": ";"
1179+
}
1180+
],
1181+
"canonicalReference": "iterparse!fileGroupBy:function(1)",
1182+
"canonicalReferenceGroup": "iterparse!fileGroupBy"
1183+
}
1184+
],
1185+
"kind": "Function",
1186+
"name": "fileGroupBy",
1187+
"canonicalReference": "iterparse!fileGroupBy:function(1)",
1188+
"package": "iterparse",
1189+
"canonicalReferenceGroup": "iterparse!fileGroupBy",
1190+
"tags": []
1191+
},
1192+
{
1193+
"members": [
1194+
{
1195+
"kind": "Interface",
1196+
"comment": {
1197+
"description": "",
1198+
"examples": [],
1199+
"parsed": []
1200+
},
1201+
"excerptTokens": [
1202+
{
1203+
"kind": "Content",
1204+
"text": "export interface FileGroupByOptions<T> "
1205+
}
1206+
],
1207+
"canonicalReference": "iterparse!FileGroupByOptions:interface",
1208+
"canonicalReferenceGroup": "iterparse!FileGroupByOptions"
1209+
}
1210+
],
1211+
"kind": "Interface",
1212+
"name": "FileGroupByOptions",
1213+
"canonicalReference": "iterparse!FileGroupByOptions:interface",
1214+
"package": "iterparse",
1215+
"canonicalReferenceGroup": "iterparse!FileGroupByOptions",
1216+
"tags": []
1217+
},
11311218
{
11321219
"members": [
11331220
{

src/fileGroupBy.ts

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ class GroupingProgressDisplay {
1919
return `Grouping idle...`
2020
}
2121
if (this.progress.state === 'GROUPING') {
22-
return `Grouping, Items: ${this.progress.groupedItems.toLocaleString()}, Total Groups: ${json.groupedGroups}, Speed: ${json.groupingSpeed}, Grouped Size: ${formatBytes(json.groupedBytes)}, Memory: ${formatBytes(json.memory)}`
22+
return `Grouping, Items: ${this.progress.groupedItems.toLocaleString()}, Total Groups: ${json.groupedGroups}, Grouped Size: ${formatBytes(json.groupedBytes)}, Memory: ${formatBytes(json.memory)}`
2323
}
24-
return `Reading, Progress: ${((json.readedItems/json.groupedItems) * 100).toFixed(2)}%, Items: ${this.progress.readedItems.toLocaleString()}/${json.groupedItems.toLocaleString()}, Groups: ${json.readedGroups}/${json.groupedGroups}, Speed: ${json.readingSpeed}, Read: ${formatBytes(json.readedBytes)}, Memory: ${formatBytes(json.memory)}`
24+
return `Reading, Progress: ${((json.readedItems/json.groupedItems) * 100).toFixed(2)}%, Groups: ${json.readedGroups}/${json.groupedGroups}, Memory: ${formatBytes(json.memory)}`
2525
}
2626

2727
toJSON() {
@@ -31,14 +31,12 @@ class GroupingProgressDisplay {
3131

3232
const readingDiff = Math.floor(Date.now() - this.progress.parsingStartTime)
3333
const readingBytesPerMs = Math.floor(this.progress.groupedBytes / readingDiff) || 0
34-
const readingBytesPerSecond = Math.floor(groupingBytesPerMs * 1000)
34+
const readingBytesPerSecond = Math.floor(readingBytesPerMs * 1000)
3535

3636
return {
3737
state: this.progress.state,
3838
groupingBytesPerSecond,
3939
readingBytesPerSecond,
40-
groupingSpeed: `${formatBytes(groupingBytesPerSecond)}/s`,
41-
readingSpeed: `${formatBytes(readingBytesPerSecond)}/s`,
4240
memory: process.memoryUsage().heapUsed,
4341
groupingStartTime: this.progress.parsingStartTime,
4442
groupingStopTime: this.progress.parsingStopTime,
@@ -143,11 +141,7 @@ export interface FileGroupByOptions<T> {
143141
export function fileGroupBy<T>(args: FileGroupByOptions<T>): AsyncIterable<GroupedItems<T>> {
144142
const progress: GroupingProgress = new GroupingProgress()
145143

146-
const interval = setInterval(() => {
147-
args.progress?.(new GroupingProgressDisplay(progress))
148-
}, args.progressFrequency || 1000)
149-
150-
144+
let interval: NodeJS.Timeout | undefined
151145
async function* groupProcess() {
152146
const tmpFile = tmpNameSync()
153147
const encoding = 'utf8'
@@ -161,9 +155,13 @@ export function fileGroupBy<T>(args: FileGroupByOptions<T>): AsyncIterable<Group
161155
groups: new Map(),
162156
lastPosition: 0,
163157
}
164-
158+
interval = setInterval(() => {
159+
if (progress.state === 'IDLE') return
160+
args.progress?.(new GroupingProgressDisplay(progress))
161+
}, args.progressFrequency || 1000)
165162

166163
for await (const value of args.source) {
164+
167165
if (progress.groupedGroups === 0) {
168166
progress.start('GROUPING')
169167
}
@@ -172,19 +170,22 @@ export function fileGroupBy<T>(args: FileGroupByOptions<T>): AsyncIterable<Group
172170
const groupId = args.groupingFn(value).toString()
173171
await appendFile(fd, parsedValue, { encoding })
174172

175-
const group = groupFileMap.groups.get(groupId) || []
176-
177173

178174
progress.addItem(1)
179175
progress.addChunk(size)
180-
181-
group.push([groupFileMap.lastPosition, size])
182-
groupFileMap.groups.set(groupId, group)
176+
const group = groupFileMap.groups.get(groupId)
177+
if (group == null) {
178+
progress.addGroup(1)
179+
}
180+
181+
const newGroup = group || []
182+
newGroup.push([groupFileMap.lastPosition, size])
183+
groupFileMap.groups.set(groupId, newGroup)
183184
groupFileMap.lastPosition = groupFileMap.lastPosition + size
184185
continue
185186
}
186187

187-
188+
args.progress?.(new GroupingProgressDisplay(progress))
188189
progress.start('READING')
189190

190191
for (const [groupId, mapData] of groupFileMap.groups) {
@@ -193,14 +194,14 @@ export function fileGroupBy<T>(args: FileGroupByOptions<T>): AsyncIterable<Group
193194
key: groupId,
194195
items: await Promise.all(
195196
mapData.map(async ([location ,size])=> {
196-
progress.readItem()
197+
progress.readItem(1)
197198
progress.readChunk(size)
198199
const buffer = Buffer.alloc(size)
199200
await read(fd, buffer, 0, size, location)
200201
const item = P.canFail(() => {
201202
return JSON.parse(buffer.toString(encoding))
202203
})
203-
if (P.canFail(item)) {
204+
if (P.isError(item)) {
204205
throw new Error(`Critical error: something went wrong in grouping process`)
205206
}
206207
return item as T
@@ -213,6 +214,7 @@ export function fileGroupBy<T>(args: FileGroupByOptions<T>): AsyncIterable<Group
213214
}
214215

215216
return AsyncIterable.from(groupProcess()).finally(()=>{
217+
if (interval == null) return
216218
clearInterval(interval)
217219
})
218220
}

src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ export { jsonRead, jsonWrite, JSONReadOptions, JSONWriteOptions, } from './json'
44
export { xmlRead, xmlWrite, XMLReadOptions, XMLWriteOptions } from './xml'
55
export { download, DownloadOptions } from './download'
66
export { cacheIter, CacheIterOptions } from './cache'
7-
export { fileGroupBy, FileGroupOptions } from './fileGroupBy'
7+
export { fileGroupBy, FileGroupByOptions} from './fileGroupBy'
88

99
export { AnyIterable, AnyIterableValueOf } from './types'

0 commit comments

Comments
 (0)