Skip to content

Commit 329d1c6

Browse files
committed
Fix tests and update README
1 parent 1046d8b commit 329d1c6

7 files changed

Lines changed: 141 additions & 16 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ tests/js_client/node_modules/
88
dist/
99
wrangler.toml
1010
scripts/setup-cloudflare.sh
11+
scripts/pipeline-schema.json
1112
.claude/

Cargo.lock

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ version = "0.1.0"
44
edition = "2021"
55

66
[lib]
7-
crate-type = ["cdylib"]
7+
crate-type = ["cdylib", "rlib"]
88

99
[dependencies]
1010
axum = { version = "0.7", default-features = false, features = ["json"] }
@@ -29,6 +29,7 @@ worker = { version = "0.7", features = ["http"] }
2929
tower-service = "0.3.3"
3030

3131
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
32+
axum = { version = "0.7", default-features = false, features = ["json", "tokio", "http1"] }
3233
dotenvy = "0.15"
3334
reqwest = { version = "0.12", features = ["json", "rustls-tls"] }
3435
tokio = { version = "1.37", features = ["macros", "rt-multi-thread", "process", "net", "time"] }

README.md

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Hogflare is a Cloudflare Workers service that accepts PostHog ingestion requests
66

77
## Why?
88

9-
PostHog is a nice-to-use web & product analytics platform. However, self-hosting PostHog is prohibitively complex so most users seem to rely on the cloud offering. This is an alternative for cost-conscious data folks & businesses interested in a low maintenance way to ingest web & product analytics directly into a managed data lake.
9+
PostHog is a nice-to-use web & product analytics platform. However, self-hosting PostHog is prohibitively complex so most users seem to rely on the cloud offering. This is an alternative for cost-conscious data folks & businesses interested in a low maintenance way to ingest web & product analytics directly into a managed data lake.
1010

1111
A [hobby deployment of PostHog](https://github.com/PostHog/posthog/blob/master/docker-compose.hobby.yml) includes: postgres, redis, redis7, clickhouse, zookeeper, kafka, worker, web, plugins, proxy, objectstorage, seaweedfs, asyncmigrationscheck, temporal, elasticsearch, temporal-admin-tools, temporal-ui, temporal-django-worker, cyclotron-janitor, capture, replay-capture, property-defs-rs, livestream, feature-flags, cymbal
1212

@@ -16,7 +16,7 @@ Admittedly, PostHog does a *lot* more than this package, but some folks really j
1616

1717
1) Create a Pipeline stream and sink in the Cloudflare dashboard or via `wrangler pipelines setup`.
1818
2) Use the schema below for the stream.
19-
3) Create a local `wrangler.toml` (gitignored) and set variables.
19+
3) Copy `wrangler.toml.example` to `wrangler.toml` and set variables.
2020
4) Set Wrangler secrets.
2121
5) Deploy the Worker.
2222

@@ -38,11 +38,17 @@ Admittedly, PostHog does a *lot* more than this package, but some folks really j
3838
}
3939
```
4040

41-
### Local `wrangler.toml` (gitignored)
41+
### Wrangler config
42+
43+
Copy the example to a local, gitignored `wrangler.toml` and fill in your stream endpoint:
44+
45+
```bash
46+
cp wrangler.toml.example wrangler.toml
47+
```
4248

4349
```toml
4450
name = "hogflare"
45-
main = "build/index.js"
51+
main = "build/index.js" # generated entrypoint from worker-build for the Rust worker
4652
compatibility_date = "2025-01-09"
4753

4854
[vars]
@@ -78,6 +84,35 @@ curl -X POST https://<your-worker>.workers.dev/capture \
7884
]'
7985
```
8086

87+
## PostHog SDK config (posthog-js)
88+
89+
```js
90+
import posthog from "posthog-js";
91+
92+
posthog.init("<project_api_key>", {
93+
api_host: "https://<your-worker>.workers.dev",
94+
capture_pageview: true,
95+
});
96+
```
97+
98+
## Local development (fake pipeline)
99+
100+
The repo includes a lightweight fake pipeline (FastAPI + DuckDB) used by tests.
101+
102+
```bash
103+
docker compose up --build -d fake-pipeline
104+
```
105+
106+
```bash
107+
# .env.local (not committed)
108+
CLOUDFLARE_PIPELINE_ENDPOINT=http://127.0.0.1:8088/
109+
CLOUDFLARE_PIPELINE_TIMEOUT_SECS=5
110+
```
111+
112+
```bash
113+
cargo run
114+
```
115+
81116
## Query data (DuckDB)
82117

83118
```sql
@@ -86,14 +121,6 @@ INSTALL iceberg;
86121
LOAD httpfs;
87122
LOAD iceberg;
88123

89-
CREATE SECRET r2_secret (
90-
TYPE S3,
91-
KEY_ID '<R2_ACCESS_KEY_ID>',
92-
SECRET '<R2_SECRET_ACCESS_KEY>',
93-
ENDPOINT '<ACCOUNT_ID>.r2.cloudflarestorage.com',
94-
REGION 'auto'
95-
);
96-
97124
CREATE SECRET r2_catalog_secret (
98125
TYPE ICEBERG,
99126
TOKEN '<CLOUDFLARE_API_TOKEN>'
@@ -132,6 +159,5 @@ Hogflare adds Cloudflare request data into `properties` when those keys are not
132159

133160
## Limitations
134161

135-
- This repo does not include a local Pipelines emulator. You need a real Cloudflare Pipeline endpoint.
136162
- `/decide` returns placeholders, not evaluated flags.
137163
- `/s` stores raw session recording chunks only.

src/extractors.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -769,7 +769,7 @@ mod tests {
769769
use chrono::TimeZone;
770770
use flate2::write::{GzEncoder, ZlibEncoder};
771771
use flate2::Compression;
772-
use serde_json::json;
772+
use serde_json::{json, Value};
773773
use std::{io::Write, sync::Arc, time::Duration};
774774

775775
use crate::{models::CaptureRequest, pipeline::PipelineClient, AppState};
@@ -1039,4 +1039,42 @@ mod tests {
10391039
let expected = chrono::Utc.with_ymd_and_hms(2025, 2, 2, 0, 0, 0).unwrap();
10401040
assert_eq!(payload.batch.sent_at, Some(expected));
10411041
}
1042+
1043+
#[test]
1044+
fn enrichment_reads_cloudflare_headers() {
1045+
let request = Request::builder()
1046+
.uri("/capture")
1047+
.header("cf-connecting-ip", "203.0.113.10")
1048+
.header("cf-ray", "ray-123")
1049+
.body(Body::empty())
1050+
.unwrap();
1051+
let (parts, _) = request.into_parts();
1052+
1053+
let props = build_enrichment_properties(&parts);
1054+
1055+
assert_eq!(
1056+
props.get("$ip"),
1057+
Some(&Value::String("203.0.113.10".to_string()))
1058+
);
1059+
assert_eq!(
1060+
props.get("cf_ray"),
1061+
Some(&Value::String("ray-123".to_string()))
1062+
);
1063+
}
1064+
1065+
#[test]
1066+
fn enrichment_ignores_empty_header_values() {
1067+
let request = Request::builder()
1068+
.uri("/capture")
1069+
.header("cf-connecting-ip", " ")
1070+
.header("cf-ray", "")
1071+
.body(Body::empty())
1072+
.unwrap();
1073+
let (parts, _) = request.into_parts();
1074+
1075+
let props = build_enrichment_properties(&parts);
1076+
1077+
assert!(props.get("$ip").is_none());
1078+
assert!(props.get("cf_ray").is_none());
1079+
}
10421080
}

src/pipeline.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ pub enum PipelineError {
334334
mod tests {
335335
use super::*;
336336
use chrono::TimeZone;
337-
use serde_json::json;
337+
use serde_json::{json, Value};
338338

339339
#[test]
340340
fn converts_group_identify_payload() {
@@ -430,4 +430,46 @@ mod tests {
430430
assert_eq!(event.api_key.as_deref(), Some("phc_session"));
431431
assert_eq!(event.properties, Some(payload));
432432
}
433+
434+
#[test]
435+
fn merges_enrichment_without_overwrite() {
436+
let payload = CaptureRequest {
437+
api_key: Some("phc_capture".to_string()),
438+
event: "test-event".to_string(),
439+
distinct_id: "user-1".to_string(),
440+
properties: Some(json!({
441+
"$ip": "203.0.113.1",
442+
"existing": true
443+
})),
444+
timestamp: None,
445+
context: None,
446+
extra: std::collections::HashMap::new(),
447+
};
448+
449+
let mut enrichment = Map::new();
450+
enrichment.insert(
451+
"$ip".to_string(),
452+
Value::String("198.51.100.2".to_string()),
453+
);
454+
enrichment.insert(
455+
"cf_ray".to_string(),
456+
Value::String("ray-xyz".to_string()),
457+
);
458+
459+
let event = PipelineEvent::from_capture(payload).with_enrichment(&enrichment);
460+
let props = match event.properties {
461+
Some(Value::Object(props)) => props,
462+
other => panic!("expected properties object, got {other:?}"),
463+
};
464+
465+
assert_eq!(
466+
props.get("$ip"),
467+
Some(&Value::String("203.0.113.1".to_string()))
468+
);
469+
assert_eq!(
470+
props.get("cf_ray"),
471+
Some(&Value::String("ray-xyz".to_string()))
472+
);
473+
assert_eq!(props.get("existing"), Some(&Value::Bool(true)));
474+
}
433475
}

wrangler.toml.example

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
name = "hogflare"
2+
main = "build/index.js" # generated entrypoint from worker-build for the Rust worker
3+
compatibility_date = "2025-01-09"
4+
5+
[vars]
6+
CLOUDFLARE_PIPELINE_ENDPOINT = "https://<stream-id>.ingest.cloudflare.com"
7+
CLOUDFLARE_PIPELINE_TIMEOUT_SECS = "10"

0 commit comments

Comments
 (0)