Skip to content

Commit 40b43aa

Browse files
committed
first commit
0 parents  commit 40b43aa

11 files changed

Lines changed: 937 additions & 0 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/sqlparser.exe

cmd/sqlparser/main.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"os"
7+
"runtime"
8+
9+
"sqlparser/pkg/models"
10+
"sqlparser/pkg/parser"
11+
"sqlparser/pkg/writer"
12+
)
13+
14+
func main() {
15+
format := flag.String("format", "txt", "Output format (txt, csv, json)")
16+
output := flag.String("output", "", "Output file (if not specified, prints to stdout)")
17+
workers := flag.Int("workers", runtime.NumCPU(), "Number of worker threads (default: number of CPU cores)")
18+
flag.Parse()
19+
20+
args := flag.Args()
21+
if len(args) < 1 {
22+
fmt.Printf("Usage: sqlparser [-format=txt|csv|json] [-output=filename] [-workers=N] <sqlfile>\n")
23+
fmt.Printf(" -format: Output format (default: txt)\n")
24+
fmt.Printf(" -output: Output file (default: stdout)\n")
25+
fmt.Printf(" -workers: Number of worker threads (default: %d)\n", runtime.NumCPU())
26+
os.Exit(1)
27+
}
28+
29+
filename := args[0]
30+
outputFormat := models.OutputFormat(*format)
31+
32+
// Create output file or use stdout
33+
var out *os.File
34+
var err error
35+
if *output != "" {
36+
out, err = os.Create(*output)
37+
if err != nil {
38+
fmt.Printf("Error creating output file: %v\n", err)
39+
os.Exit(1)
40+
}
41+
defer out.Close()
42+
} else {
43+
out = os.Stdout
44+
}
45+
46+
// Initialize the output writer based on format
47+
w, err := writer.CreateWriter(outputFormat, out)
48+
if err != nil {
49+
fmt.Printf("Error creating writer: %v\n", err)
50+
os.Exit(1)
51+
}
52+
defer w.Close()
53+
54+
// Process the file
55+
fmt.Printf("Processing with %d workers...\n", *workers)
56+
err = parser.ProcessSQLFileInBatches(filename, w, *workers)
57+
if err != nil {
58+
fmt.Printf("Error processing SQL file: %v\n", err)
59+
os.Exit(1)
60+
}
61+
}

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
module sqlparser
2+
3+
go 1.21

pkg/models/types.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package models
2+
3+
const (
4+
BatchSize = 100000
5+
)
6+
7+
type OutputFormat string
8+
9+
const (
10+
FormatText OutputFormat = "txt"
11+
FormatCSV OutputFormat = "csv"
12+
FormatJSON OutputFormat = "json"
13+
)
14+
15+
type Row struct {
16+
RowNumber int `json:"row_number"`
17+
Data map[string]interface{} `json:"data"`
18+
}
19+
20+
type TableData struct {
21+
TableName string `json:"table_name"`
22+
RowCount int `json:"row_count"`
23+
Rows []Row `json:"rows"`
24+
}

pkg/parser/parser.go

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
package parser
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"os"
7+
"strings"
8+
"sync"
9+
"time"
10+
11+
"sqlparser/pkg/models"
12+
"sqlparser/pkg/writer"
13+
)
14+
15+
func ProcessSQLFileInBatches(filename string, writer writer.Writer, numWorkers int) error {
16+
startTime := time.Now()
17+
18+
file, err := os.Open(filename)
19+
if err != nil {
20+
return fmt.Errorf("error opening file: %v", err)
21+
}
22+
defer file.Close()
23+
24+
// Use a larger buffer for scanning
25+
buf := make([]byte, 64*1024)
26+
scanner := bufio.NewScanner(file)
27+
scanner.Buffer(buf, 10*1024*1024) // 10MB max line length
28+
29+
statementChan := make(chan string, numWorkers*2)
30+
resultChan := make(chan *struct {
31+
tableName string
32+
rows []models.Row
33+
err error
34+
}, numWorkers*2)
35+
36+
fmt.Printf("Starting to process file: %s at %s\n", filename, startTime.Format(time.RFC3339))
37+
38+
// Start worker pool
39+
var wg sync.WaitGroup
40+
for i := 0; i < numWorkers; i++ {
41+
wg.Add(1)
42+
go func(workerID int) {
43+
defer wg.Done()
44+
for statement := range statementChan {
45+
tableName, rows, err := processStatement(statement, numWorkers)
46+
resultChan <- &struct {
47+
tableName string
48+
rows []models.Row
49+
err error
50+
}{tableName, rows, err}
51+
}
52+
}(i)
53+
}
54+
55+
// Start result processor
56+
var processWg sync.WaitGroup
57+
processWg.Add(1)
58+
var currentTableName string
59+
var rowCount int
60+
var currentBatch []models.Row
61+
var batchCount int
62+
var totalStatements int
63+
var tableStartTime time.Time
64+
65+
go func() {
66+
defer processWg.Done()
67+
for result := range resultChan {
68+
if result.err != nil {
69+
fmt.Printf("Error processing statement: %v\n", result.err)
70+
continue
71+
}
72+
73+
if result.rows != nil {
74+
// Handle new table
75+
if result.tableName != currentTableName {
76+
if currentTableName != "" {
77+
if len(currentBatch) > 0 {
78+
if err := writer.WriteRows(currentBatch); err != nil {
79+
fmt.Printf("Error writing rows: %v\n", err)
80+
continue
81+
}
82+
batchCount++
83+
fmt.Printf("Processed batch %d for table %s (%d rows)\n", batchCount, currentTableName, len(currentBatch))
84+
currentBatch = nil // Help GC
85+
}
86+
if err := writer.WriteTableEnd(); err != nil {
87+
fmt.Printf("Error ending table: %v\n", err)
88+
continue
89+
}
90+
tableDuration := time.Since(tableStartTime)
91+
fmt.Printf("Finished processing table %s (total %d rows in %d batches) in %v\n",
92+
currentTableName, rowCount, batchCount, tableDuration)
93+
}
94+
currentTableName = result.tableName
95+
if err := writer.WriteTableStart(currentTableName); err != nil {
96+
fmt.Printf("Error starting table: %v\n", err)
97+
continue
98+
}
99+
currentBatch = make([]models.Row, 0, models.BatchSize)
100+
rowCount = 0
101+
batchCount = 0
102+
tableStartTime = time.Now()
103+
fmt.Printf("Started processing new table: %s at %s\n", currentTableName, tableStartTime.Format(time.RFC3339))
104+
}
105+
106+
// Process rows immediately
107+
for _, row := range result.rows {
108+
rowCount++
109+
row.RowNumber = rowCount
110+
currentBatch = append(currentBatch, row)
111+
112+
// Write batch if it reaches the batch size
113+
if len(currentBatch) >= models.BatchSize {
114+
batchStartTime := time.Now()
115+
if err := writer.WriteRows(currentBatch); err != nil {
116+
fmt.Printf("Error writing rows: %v\n", err)
117+
continue
118+
}
119+
batchCount++
120+
batchDuration := time.Since(batchStartTime)
121+
fmt.Printf("Processed batch %d for table %s (%d rows) in %v\n",
122+
batchCount, currentTableName, len(currentBatch), batchDuration)
123+
currentBatch = make([]models.Row, 0, models.BatchSize)
124+
}
125+
}
126+
// Clear the rows to help GC
127+
result.rows = nil
128+
}
129+
totalStatements++
130+
}
131+
}()
132+
133+
// Read and send statements to workers
134+
var currentStatement strings.Builder
135+
for scanner.Scan() {
136+
line := scanner.Text()
137+
138+
if strings.TrimSpace(line) == "" || strings.HasPrefix(strings.TrimSpace(line), "--") {
139+
continue
140+
}
141+
142+
currentStatement.WriteString(line)
143+
currentStatement.WriteString(" ")
144+
145+
if strings.HasSuffix(strings.TrimSpace(line), ";") {
146+
statementChan <- currentStatement.String()
147+
currentStatement.Reset()
148+
}
149+
}
150+
151+
// Close channels and wait for completion
152+
close(statementChan)
153+
wg.Wait()
154+
close(resultChan)
155+
processWg.Wait()
156+
157+
// Write any remaining rows in the last batch
158+
if len(currentBatch) > 0 {
159+
if err := writer.WriteRows(currentBatch); err != nil {
160+
return err
161+
}
162+
batchCount++
163+
fmt.Printf("Processed final batch %d for table %s (%d rows)\n", batchCount, currentTableName, len(currentBatch))
164+
currentBatch = nil // Help GC
165+
}
166+
167+
// Close the last table if any
168+
if currentTableName != "" {
169+
if err := writer.WriteTableEnd(); err != nil {
170+
return err
171+
}
172+
tableDuration := time.Since(tableStartTime)
173+
fmt.Printf("Finished processing table %s (total %d rows in %d batches) in %v\n",
174+
currentTableName, rowCount, batchCount, tableDuration)
175+
}
176+
177+
totalDuration := time.Since(startTime)
178+
fmt.Printf("\nProcessing Summary:\n")
179+
fmt.Printf("File: %s\n", filename)
180+
fmt.Printf("Total Statements: %d\n", totalStatements)
181+
fmt.Printf("Total Duration: %v\n", totalDuration)
182+
fmt.Printf("Average Time per Statement: %v\n", totalDuration/time.Duration(totalStatements))
183+
fmt.Printf("Workers Used: %d\n", numWorkers)
184+
185+
return scanner.Err()
186+
}
187+
188+
func processStatement(statement string, numWorkers int) (string, []models.Row, error) {
189+
statement = strings.TrimSpace(strings.TrimSuffix(statement, ";"))
190+
191+
if strings.HasPrefix(strings.ToUpper(statement), "INSERT INTO") {
192+
return parseInsert(statement, numWorkers)
193+
}
194+
return "", nil, nil
195+
}
196+
197+
func parseInsert(statement string, numWorkers int) (string, []models.Row, error) {
198+
parts := strings.SplitN(statement, "VALUES", 2)
199+
if len(parts) != 2 {
200+
return "", nil, fmt.Errorf("invalid INSERT statement format")
201+
}
202+
203+
insertPart := strings.TrimSpace(parts[0])
204+
if !strings.HasPrefix(strings.ToUpper(insertPart), "INSERT INTO") {
205+
return "", nil, fmt.Errorf("invalid INSERT statement format")
206+
}
207+
208+
tableParts := strings.SplitN(insertPart[11:], "(", 2)
209+
if len(tableParts) != 2 {
210+
return "", nil, fmt.Errorf("invalid INSERT statement format")
211+
}
212+
213+
tableName := strings.Trim(strings.TrimSpace(tableParts[0]), "`")
214+
columnsPart := strings.TrimRight(tableParts[1], ")")
215+
columns := parseColumnList(columnsPart)
216+
217+
valuesPart := strings.TrimSpace(parts[1])
218+
values := parseValuesList(valuesPart)
219+
220+
// Process rows in parallel if we have enough rows
221+
if len(values) > 1000 {
222+
return parseRowsParallel(tableName, columns, values, numWorkers)
223+
}
224+
225+
return parseRowsSequential(tableName, columns, values)
226+
}
227+
228+
func parseColumnList(columnsPart string) []string {
229+
columns := strings.Split(columnsPart, ",")
230+
result := make([]string, 0, len(columns))
231+
232+
for _, col := range columns {
233+
col = strings.Trim(strings.TrimSpace(col), "`")
234+
if col != "" {
235+
result = append(result, col)
236+
}
237+
}
238+
239+
return result
240+
}

0 commit comments

Comments
 (0)