-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbuilder.go
More file actions
227 lines (196 loc) · 7.84 KB
/
builder.go
File metadata and controls
227 lines (196 loc) · 7.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package itt
import (
"fmt"
"time"
"github.com/MatheusGrego/itt-engine/gpu"
)
// Builder configures and creates an Engine.
type Builder struct {
// Algorithms
divergence DivergenceFunc
curvature CurvatureFunc
topology TopologyFunc
threshold float64
thresholdFunc ThresholdFunc
// Weights
weightFunc WeightFunc
nodeTypeFunc NodeTypeFunc
aggregation AggregationFunc
// MVCC
gcSnapshotWarning time.Duration
gcSnapshotForce time.Duration
// Compaction
compactionStrategy CompactionStrategy
compactionThreshold int
compactionInterval time.Duration
// Callbacks
onChange func(Delta)
onAnomaly func(TensionResult)
onCompact func(CompactStats)
onGC func(GCStats)
onError func(error)
// Observability
logger Logger
// Storage
storage Storage
baseGraph *GraphData
// Calibration
calibrator Calibrator
curvatureAlpha float64
detectabilityAlpha float64
// Concealment
concealmentLambda float64
concealmentHops int
// Temporal
temporalCapacity int
diffusivityAlpha float64
onTensionSpike func(nodeID string, delta float64)
tensionSpikeThreshold float64
// Performance
parallelWorkers int // number of goroutines for parallel analysis (0 = auto-detect)
// GPU
gpuBackend gpu.ComputeBackend
gpuThreshold int // minimum node count to route to GPU (0 = disabled)
// Cache
cacheEnabled bool
cacheTTL time.Duration
// Internal
channelSize int
}
func (b *Builder) Divergence(d DivergenceFunc) *Builder { b.divergence = d; return b }
func (b *Builder) Curvature(c CurvatureFunc) *Builder {
b.curvature = c
if b.curvatureAlpha == 0 {
b.curvatureAlpha = 0.5
}
return b
}
func (b *Builder) CurvatureAlpha(alpha float64) *Builder { b.curvatureAlpha = alpha; return b }
func (b *Builder) DetectabilityAlpha(alpha float64) *Builder { b.detectabilityAlpha = alpha; return b }
func (b *Builder) Topology(t TopologyFunc) *Builder { b.topology = t; return b }
func (b *Builder) Threshold(t float64) *Builder { b.threshold = t; return b }
func (b *Builder) ThresholdFunc(f ThresholdFunc) *Builder { b.thresholdFunc = f; return b }
func (b *Builder) WeightFunc(f WeightFunc) *Builder { b.weightFunc = f; return b }
func (b *Builder) NodeTypeFunc(f NodeTypeFunc) *Builder { b.nodeTypeFunc = f; return b }
func (b *Builder) AggregationFunc(f AggregationFunc) *Builder { b.aggregation = f; return b }
func (b *Builder) GCSnapshotWarning(d time.Duration) *Builder { b.gcSnapshotWarning = d; return b }
func (b *Builder) GCSnapshotForce(d time.Duration) *Builder { b.gcSnapshotForce = d; return b }
func (b *Builder) CompactionStrategy(s CompactionStrategy) *Builder {
b.compactionStrategy = s
return b
}
func (b *Builder) CompactionThreshold(n int) *Builder { b.compactionThreshold = n; return b }
func (b *Builder) CompactionInterval(d time.Duration) *Builder { b.compactionInterval = d; return b }
func (b *Builder) OnChange(f func(Delta)) *Builder { b.onChange = f; return b }
func (b *Builder) OnAnomaly(f func(TensionResult)) *Builder { b.onAnomaly = f; return b }
func (b *Builder) OnCompact(f func(CompactStats)) *Builder { b.onCompact = f; return b }
func (b *Builder) OnGC(f func(GCStats)) *Builder { b.onGC = f; return b }
func (b *Builder) OnError(f func(error)) *Builder { b.onError = f; return b }
func (b *Builder) SetLogger(l Logger) *Builder { b.logger = l; return b }
func (b *Builder) SetStorage(s Storage) *Builder { b.storage = s; return b }
func (b *Builder) BaseGraph(g *GraphData) *Builder { b.baseGraph = g; return b }
func (b *Builder) SetCalibrator(c Calibrator) *Builder { b.calibrator = c; return b }
// WithLogger sets the structured logger.
func (b *Builder) WithLogger(l Logger) *Builder { b.logger = l; return b }
// WithStorage sets the persistence backend.
func (b *Builder) WithStorage(s Storage) *Builder { b.storage = s; return b }
// WithCalibrator sets the anomaly calibrator.
func (b *Builder) WithCalibrator(c Calibrator) *Builder { b.calibrator = c; return b }
// Concealment configures concealment cost analysis.
// lambda is the exponential decay parameter, maxHops is the BFS neighborhood depth.
// Set lambda to 0 to disable (default).
func (b *Builder) Concealment(lambda float64, maxHops int) *Builder {
b.concealmentLambda = lambda
b.concealmentHops = maxHops
return b
}
// TemporalCapacity sets the ring buffer size for per-node tension history.
func (b *Builder) TemporalCapacity(n int) *Builder {
b.temporalCapacity = n
return b
}
// DiffusivityAlpha sets the diffusivity constant for temporal calculations.
func (b *Builder) DiffusivityAlpha(alpha float64) *Builder {
b.diffusivityAlpha = alpha
return b
}
// OnTensionSpike registers a callback fired when a node's tension delta exceeds the spike threshold.
func (b *Builder) OnTensionSpike(f func(string, float64)) *Builder {
b.onTensionSpike = f
return b
}
// TensionSpikeThreshold sets the minimum tension delta to trigger a spike callback.
func (b *Builder) TensionSpikeThreshold(t float64) *Builder {
b.tensionSpikeThreshold = t
return b
}
func (b *Builder) ChannelSize(n int) *Builder { b.channelSize = n; return b }
// WithParallelWorkers sets the number of goroutines for parallel analysis.
// If workers <= 0, uses runtime.NumCPU() (auto-detect).
// For graphs < 100 nodes, parallel analysis is automatically disabled regardless of this setting.
func (b *Builder) WithParallelWorkers(workers int) *Builder {
b.parallelWorkers = workers
return b
}
// WithGPU enables GPU-accelerated analysis for graphs with at least threshold nodes.
// Attempts to initialize the default GoSL backend. If GPU initialization fails,
// the engine falls back to CPU silently — no error is returned.
// A threshold of 0 disables GPU routing (same as not calling WithGPU).
func (b *Builder) WithGPU(threshold int) *Builder {
if threshold <= 0 {
return b
}
backend, err := gpu.NewGoSLBackend()
if err != nil {
// GPU unavailable — degrade gracefully to CPU-only
return b
}
b.gpuBackend = backend
b.gpuThreshold = threshold
return b
}
// WithGPUBackend sets a specific GPU backend (useful for testing or alternative implementations).
// The threshold controls the minimum node count to route analysis to GPU.
func (b *Builder) WithGPUBackend(backend gpu.ComputeBackend, threshold int) *Builder {
b.gpuBackend = backend
b.gpuThreshold = threshold
return b
}
// WithCache enables result caching with the given TTL.
// Cache dramatically improves read performance (100-1000x) for workloads with high cache hit rates.
// TTL acts as a safety fallback; cache entries are invalidated when their MVCC version is GC'd.
// Default TTL: 60 seconds.
func (b *Builder) WithCache(ttl time.Duration) *Builder {
b.cacheEnabled = true
if ttl > 0 {
b.cacheTTL = ttl
} else {
b.cacheTTL = 60 * time.Second
}
return b
}
// Build validates configuration and returns a new Engine.
func (b *Builder) Build() (*Engine, error) {
if b.threshold < 0 {
return nil, fmt.Errorf("%w: threshold must be >= 0", ErrInvalidConfig)
}
if b.gcSnapshotForce > 0 && b.gcSnapshotWarning > 0 && b.gcSnapshotForce < b.gcSnapshotWarning {
return nil, fmt.Errorf("%w: gcSnapshotForce must be >= gcSnapshotWarning", ErrInvalidConfig)
}
if b.channelSize <= 0 {
return nil, fmt.Errorf("%w: channelSize must be > 0", ErrInvalidConfig)
}
if b.detectabilityAlpha <= 0 || b.detectabilityAlpha >= 1 {
return nil, fmt.Errorf("%w: detectabilityAlpha must be in (0, 1)", ErrInvalidConfig)
}
if b.concealmentLambda < 0 {
return nil, fmt.Errorf("%w: concealmentLambda must be >= 0", ErrInvalidConfig)
}
if b.concealmentHops < 0 {
return nil, fmt.Errorf("%w: concealmentHops must be >= 0", ErrInvalidConfig)
}
if b.gpuThreshold < 0 {
return nil, fmt.Errorf("%w: gpuThreshold must be >= 0", ErrInvalidConfig)
}
return newEngine(b), nil
}