Skip to content

Commit cde0775

Browse files
committed
Fix alias semantics and durable object test harness
1 parent 39fba93 commit cde0775

2 files changed

Lines changed: 60 additions & 9 deletions

File tree

src/persons.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,8 +381,20 @@ pub fn update_from_capture(request: &CaptureRequest) -> Option<PersonUpdate> {
381381
pub fn update_from_identify(request: &IdentifyRequest) -> Option<PersonUpdate> {
382382
let properties = request.properties.as_ref()?;
383383
let props = properties.as_object()?;
384-
let set = extract_object(props.get("$set"));
385-
let set_once = extract_object(props.get("$set_once"));
384+
let (set, mut set_once) = if props.contains_key("$set") || props.contains_key("$set_once")
385+
{
386+
(
387+
extract_object(props.get("$set")),
388+
extract_object(props.get("$set_once")),
389+
)
390+
} else {
391+
(props.clone(), Map::new())
392+
};
393+
394+
let extra_set_once = extract_object(request.extra.get("$set_once"));
395+
if !extra_set_once.is_empty() {
396+
set_once.extend(extra_set_once);
397+
}
386398

387399
let update = PersonUpdate {
388400
distinct_id: request.distinct_id.clone(),

tests/persons_do.rs

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use serde_json::{json, Value};
88
use std::fs;
99
use std::net::TcpListener;
1010
use std::path::PathBuf;
11+
use std::process::Stdio;
1112
use std::time::Duration;
1213
use tempfile::TempDir;
1314
use tokio::process::{Child, Command};
@@ -31,6 +32,7 @@ async fn durable_object_person_updates_apply() -> Result<(), Box<dyn std::error:
3132
&pipeline_endpoint.to_string(),
3233
debug_token,
3334
)?;
35+
patch_worker_bundle()?;
3436

3537
let mut wrangler = spawn_wrangler_dev(&config_path, port)?;
3638
wait_for_health(port).await?;
@@ -43,8 +45,10 @@ async fn durable_object_person_updates_apply() -> Result<(), Box<dyn std::error:
4345
.post(format!("{base_url}/identify"))
4446
.json(&json!({
4547
"distinct_id": "person-1",
46-
"properties": { "email": "person1@example.com" },
47-
"$set_once": { "created_at": "2024-01-01" }
48+
"properties": {
49+
"$set": { "email": "person1@example.com" },
50+
"$set_once": { "created_at": "2024-01-01" }
51+
}
4852
}))
4953
.send()
5054
.await?
@@ -78,8 +82,8 @@ async fn durable_object_person_updates_apply() -> Result<(), Box<dyn std::error:
7882
client
7983
.post(format!("{base_url}/alias"))
8084
.json(&json!({
81-
"distinct_id": "anon-1",
82-
"alias": "person-1"
85+
"distinct_id": "person-1",
86+
"alias": "anon-1"
8387
}))
8488
.send()
8589
.await?
@@ -105,7 +109,7 @@ fn write_wrangler_config(
105109
pipeline_endpoint: &str,
106110
debug_token: &str,
107111
) -> Result<PathBuf, Box<dyn std::error::Error>> {
108-
let main_path = std::env::current_dir()?.join("build/index.js");
112+
let main_path = std::env::current_dir()?.join("build/worker.mjs");
109113
let config = format!(
110114
r#"
111115
name = "hogflare-test"
@@ -117,13 +121,20 @@ CLOUDFLARE_PIPELINE_ENDPOINT = "{pipeline}"
117121
CLOUDFLARE_PIPELINE_TIMEOUT_SECS = "5"
118122
PERSON_DEBUG_TOKEN = "{debug_token}"
119123
124+
[build.upload]
125+
format = "modules"
126+
120127
[[durable_objects.bindings]]
121128
name = "PERSONS"
122129
class_name = "PersonDurableObject"
123130
131+
[[durable_objects.bindings]]
132+
name = "PERSON_ID_COUNTER"
133+
class_name = "PersonIdCounterDurableObject"
134+
124135
[[migrations]]
125136
tag = "v1"
126-
new_classes = ["PersonDurableObject"]
137+
new_classes = ["PersonDurableObject", "PersonIdCounterDurableObject"]
127138
"#,
128139
main = main_path.display(),
129140
pipeline = pipeline_endpoint,
@@ -135,12 +146,38 @@ new_classes = ["PersonDurableObject"]
135146
Ok(path)
136147
}
137148

149+
fn patch_worker_bundle() -> Result<(), Box<dyn std::error::Error>> {
150+
let build_dir = std::env::current_dir()?.join("build");
151+
let bundle_path = build_dir.join("index.js");
152+
if !bundle_path.exists() {
153+
return Err("missing build/index.js; run worker-build before tests".into());
154+
}
155+
let contents = fs::read_to_string(&bundle_path)?;
156+
let patched = if contents.starts_with("import source wasmModule") {
157+
contents.replacen(
158+
"import source wasmModule from",
159+
"import wasmModule from",
160+
1,
161+
)
162+
} else {
163+
contents
164+
};
165+
166+
fs::write(&bundle_path, &patched)?;
167+
fs::write(build_dir.join("index.mjs"), &patched)?;
168+
169+
let worker_shim = r#"export { default } from "./index.mjs";
170+
export * from "./index.mjs";
171+
"#;
172+
fs::write(build_dir.join("worker.mjs"), worker_shim)?;
173+
Ok(())
174+
}
175+
138176
fn spawn_wrangler_dev(config_path: &PathBuf, port: u16) -> Result<Child, Box<dyn std::error::Error>> {
139177
let child = Command::new("bunx")
140178
.arg("wrangler")
141179
.arg("dev")
142180
.arg("--local")
143-
.arg("--no-bundle")
144181
.arg("--config")
145182
.arg(config_path)
146183
.arg("--ip")
@@ -150,6 +187,8 @@ fn spawn_wrangler_dev(config_path: &PathBuf, port: u16) -> Result<Child, Box<dyn
150187
.arg("--log-level")
151188
.arg("error")
152189
.env("WRANGLER_SEND_METRICS", "false")
190+
.stdout(Stdio::null())
191+
.stderr(Stdio::null())
153192
.spawn()?;
154193

155194
Ok(child)

0 commit comments

Comments
 (0)