Skip to content

Commit 1efa70e

Browse files
author
Andrius Mozūraitis
committed
Fixed XML parse bug
1 parent 086d0c4 commit 1efa70e

File tree

4 files changed

+206
-5
lines changed

4 files changed

+206
-5
lines changed

src/index.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ export { jsonRead, jsonWrite, JSONReadOptions, JSONWriteOptions, } from './json'
44
export { xmlRead, xmlWrite, XMLReadOptions, XMLWriteOptions } from './xml'
55
export { download, DownloadOptions } from './download'
66
export { cacheIter, CacheIterOptions } from './cache'
7-
export { fileGroupBy, FileGroupByOptions} from './fileGroupBy'
8-
9-
export { AnyIterable, AnyIterableValueOf } from './types'
7+
export { fileGroupBy, FileGroupByOptions } from './fileGroupBy'
8+
export { AnyIterable, AnyIterableValueOf } from './types'
9+
export { TrailingGroupByArgs, TrailingMapArgs, onDone, onProgress, trailingGroupBy, trailingMap } from './iteratorHelpers'

src/iteratorHelpers.ts

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
import { AsyncIterable } from "ix"
2+
import * as P from 'ts-prime'
3+
import { GroupedItems } from "./fileGroupBy"
4+
import { AnyIterable } from "./types"
5+
6+
7+
export interface TrailingGroupByArgs<T> {
8+
groupBy: ((data: T) => string | number) | ((data: T) => Promise<string | number>);
9+
maxGroupSize: number;
10+
totalItemsInMemory: number
11+
}
12+
13+
function _trailingGroupBy<T>(data: globalThis.AsyncIterable<T>, args: TrailingGroupByArgs<T>) {
14+
async function* iter() {
15+
const stats = {
16+
totalItemsInMemory: 0
17+
}
18+
const groups: Record<string, Array<T>> = {}
19+
for await (const item of data) {
20+
const id = await args.groupBy(item).toString()
21+
if (groups[id] == null) {
22+
groups[id] = []
23+
}
24+
25+
if (stats.totalItemsInMemory >= args.totalItemsInMemory) {
26+
const item = P.first(P.maxBy(Object.entries(groups), ([, items]) => items.length))
27+
if (item == null) {
28+
stats.totalItemsInMemory = 0
29+
} else {
30+
const [key, items] = item
31+
yield {
32+
key,
33+
items
34+
}
35+
groups[key] = []
36+
}
37+
}
38+
39+
groups[id].push(item)
40+
stats.totalItemsInMemory += 1
41+
if (groups[id].length >= args.maxGroupSize) {
42+
yield {
43+
key: id,
44+
items: groups[id]
45+
}
46+
stats.totalItemsInMemory -= groups[id].length
47+
groups[id] = []
48+
}
49+
continue
50+
}
51+
}
52+
53+
return AsyncIterable.from(iter())
54+
}
55+
56+
export function trailingGroupBy<T>(args: TrailingGroupByArgs<T>): (data: AnyIterable<T>) => AsyncIterable<GroupedItems<T>>
57+
export function trailingGroupBy<T>(data: AnyIterable<T>, args: TrailingGroupByArgs<T>): AsyncIterable<GroupedItems<T>>
58+
export function trailingGroupBy() {
59+
return P.purry(_trailingGroupBy, arguments)
60+
}
61+
62+
63+
export interface TrailingMapArgs<T, R> {
64+
mapFunc: (data: T) => Promise<R>
65+
maxConcurrency: number
66+
}
67+
68+
function _trailingMap<T, R>(data: AnyIterable<T>, args: TrailingMapArgs<T, R>) {
69+
async function* iter() {
70+
let done = false
71+
const iter = AsyncIterable.from(data)[Symbol.asyncIterator]()
72+
const requestQueue: Array<{ id: number, request: Promise<{ id: number, result: R }> }> = []
73+
74+
while (!done) {
75+
const id = Date.now()
76+
let value = iter.next().then(async (q) => {
77+
const req = await args.mapFunc(q.value)
78+
if (q.done) {
79+
done = true
80+
}
81+
return {
82+
id,
83+
result: req
84+
}
85+
})
86+
requestQueue.push({
87+
id, request: value
88+
})
89+
90+
if (requestQueue.length === args.maxConcurrency) {
91+
const result = await Promise.race(requestQueue.map((q) => q.request))
92+
requestQueue.splice(requestQueue.findIndex((q) => q.id === result.id), 1)
93+
}
94+
}
95+
96+
while (requestQueue.length !== 0) {
97+
const result = await Promise.race(requestQueue.map((q) => q.request))
98+
requestQueue.splice(requestQueue.findIndex((q) => q.id === result.id), 1)
99+
}
100+
}
101+
return AsyncIterable.from(iter())
102+
}
103+
104+
105+
export function trailingMap<T, R>(args: TrailingMapArgs<T, R>): (data: AnyIterable<T>) => AsyncIterable<R>
106+
export function trailingMap<T, R>(data: AnyIterable<T>, args: TrailingMapArgs<T, R>): AsyncIterable<R>
107+
export function trailingMap() {
108+
return P.purry(_trailingMap, arguments)
109+
}
110+
111+
function _onDone<T>(data: AnyIterable<T>, callback: () => void) {
112+
async function* iter() {
113+
for await (const item of data) {
114+
yield item
115+
}
116+
callback()
117+
}
118+
119+
return AsyncIterable.from(iter())
120+
}
121+
122+
123+
export function onDone<T>(callback: () => void): (data: AnyIterable<T>) => AsyncIterable<T>
124+
export function onDone<T>(data: AnyIterable<T>, callback: () => void): AsyncIterable<T>
125+
export function onDone() {
126+
return P.purry(_onDone, arguments)
127+
}
128+
129+
class ProgressTrack {
130+
items = 0
131+
isRunning = false
132+
startTime = 0
133+
134+
rollingDurations: number[] = []
135+
addItem() {
136+
if (this.startTime !== 0) {
137+
this.rollingDurations.push(Date.now() - this.startTime)
138+
if (this.rollingDurations.length >= 20) {
139+
this.rollingDurations.shift()
140+
}
141+
}
142+
this.items += 1
143+
this.isRunning = true
144+
this.startTime = Date.now()
145+
}
146+
147+
get average() {
148+
const mean = P.stats(this.rollingDurations, (q) => q).arithmetic_mean
149+
return mean
150+
}
151+
}
152+
153+
class Progress {
154+
constructor(private progress: ProgressTrack) { }
155+
toString() {
156+
const speed = this.progress.items > 1 ? ` Speed: ${(1 / (this.progress.average / 1000)).toFixed(2)} items/s,` : ""
157+
return `Items: ${this.progress.items.toLocaleString()},${speed} Memory: ${P.formatBytes(process.memoryUsage().heapUsed)}`
158+
}
159+
toJSON() {
160+
return {
161+
speed: 1 / (this.progress.average / 1000),
162+
items: this.progress.items
163+
}
164+
}
165+
}
166+
167+
interface OnProgressArgs {
168+
progress: (data: Progress) => void
169+
progressFrequency?: number
170+
}
171+
172+
173+
function _onProgress<T>(data: AnyIterable<T>, args: OnProgressArgs) {
174+
const progressInstance = new ProgressTrack()
175+
176+
const interval = setInterval(() => {
177+
args.progress(new Progress(progressInstance))
178+
}, args.progressFrequency || 2000)
179+
180+
async function* iter() {
181+
for await (const item of data) {
182+
progressInstance.addItem()
183+
yield item
184+
}
185+
}
186+
187+
args.progress(new Progress(progressInstance))
188+
189+
return AsyncIterable.from(iter()).finally(() => {
190+
clearInterval(interval)
191+
})
192+
}
193+
194+
export function onProgress<T>(args: OnProgressArgs): (data: AnyIterable<T>) => AsyncIterable<T>
195+
export function onProgress<T>(data: AnyIterable<T>, args: OnProgressArgs): AsyncIterable<T>
196+
export function onProgress() {
197+
return P.purry(_onProgress, arguments)
198+
}

src/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { download } from './download';
2+
import { xmlRead } from './xml';
13

24
export interface FileReference {
35
/**

src/xml.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ export function xmlRead<T>(options: XMLReadOptions): IX<T> {
250250
arrayMode: false, //"strict"
251251
}
252252
let count = 0
253+
const elMatch = new RegExp(`<${options.nodeName}( .*)?>`, 'gm')
253254
async function* iter() {
254255
for await (const buffer of bufferRead({
255256
...options,
@@ -261,7 +262,7 @@ export function xmlRead<T>(options: XMLReadOptions): IX<T> {
261262
}
262263
})) {
263264
const full = `${last}${buffer.toString()}`
264-
const beef = full.replace(new RegExp(`<${options.nodeName}`, 'gm'), `!@###@!<${options.nodeName}`).split(`!@###@!`)
265+
const beef = full.replace(elMatch, `!@###@!<${options.nodeName}>`).split(`!@###@!`)
265266
last = beef.pop() || ''
266267
for (const qwe of beef) {
267268
if (!qwe.includes(`<${options.nodeName}`)) {
@@ -280,7 +281,7 @@ export function xmlRead<T>(options: XMLReadOptions): IX<T> {
280281
}
281282

282283

283-
const last_chunk = last.replace(new RegExp(`<${options.nodeName}`, 'gm'), `!@###@!<${options.nodeName}`).split(`!@###@!`)
284+
const last_chunk = last.replace(elMatch, `!@###@!<${options.nodeName}`).split(`!@###@!`)
284285
const lastOfLast = last_chunk.pop() || ''
285286
for (const qwe of last_chunk) {
286287
if (!qwe.includes(`<${options.nodeName}`)) {

0 commit comments

Comments
 (0)