Skip to content

Commit 548cfc5

Browse files
authored
Merge pull request #63 from fdarian/ff-71-inngest-adapter-fix
feat(ff-effect): inngest adapter fix
2 parents ce3a742 + c20b239 commit 548cfc5

7 files changed

Lines changed: 166 additions & 62 deletions

File tree

.changeset/modern-clubs-sell.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"ff-effect": patch
3+
---
4+
5+
Allow requirement on step.run and use InngestFunction.Any

bun.lock

Lines changed: 42 additions & 44 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/effect/src/for/inngest/index.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ describe('wrapStep', () => {
4444

4545
const wrapped = wrapStep(mockStep);
4646
const result = await Effect.runPromise(
47-
wrapped.run('test', () => Effect.succeed(42)),
47+
Effect.scoped(wrapped.run('test', () => Effect.succeed(42))),
4848
);
4949
expect(result).toBe(42);
5050
expect(mockStep.run).toHaveBeenCalledWith('test', expect.any(Function));
@@ -55,7 +55,7 @@ describe('wrapStep', () => {
5555

5656
const wrapped = wrapStep(mockStep);
5757
const exit = await Effect.runPromiseExit(
58-
wrapped.run('fail', () => Effect.succeed(1)),
58+
Effect.scoped(wrapped.run('fail', () => Effect.succeed(1))),
5959
);
6060
expect(exit._tag).toBe('Failure');
6161
});

packages/effect/src/for/inngest/index.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@ import { HttpApp } from '@effect/platform';
22
import { type Cron, Data, Effect, FiberSet, Layer } from 'effect';
33
import * as Context from 'effect/Context';
44
import * as Inspectable from 'effect/Inspectable';
5-
import type { GetEvents, GetFunctionInput, Inngest } from 'inngest';
5+
import type {
6+
GetEvents,
7+
GetFunctionInput,
8+
Inngest,
9+
InngestFunction,
10+
} from 'inngest';
611
import { serve } from 'inngest/bun';
712
import { extract } from '../../extract';
813
import { cronToString } from './cron';
@@ -16,11 +21,6 @@ export class InngestError extends Data.TaggedError('ff-effect/InngestError')<{
1621
cause?: unknown;
1722
}> {}
1823

19-
declare const InngestFunctionBrand: unique symbol;
20-
21-
/** Opaque wrapper around inngest's InngestFunction to avoid leaking internal types */
22-
export type InngestFunction = { readonly [InngestFunctionBrand]: true };
23-
2424
// biome-ignore lint/suspicious/noExplicitAny: matches Inngest.Any
2525
type AnyInngest = Inngest<any>;
2626

@@ -134,11 +134,11 @@ export function createInngest<
134134
>),
135135
);
136136
},
137-
) as unknown as InngestFunction;
137+
) as unknown as InngestFunction.Any;
138138
});
139139

140140
type ServeOpts = {
141-
functions: InngestFunction[];
141+
functions: InngestFunction.Any[];
142142
servePath?: string;
143143
signingKey?: string;
144144
signingKeyFallback?: string;

packages/effect/src/for/inngest/step.ts

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
import { Duration, Effect } from 'effect';
2-
import { runPromiseUnwrapped } from '../../run-promise-unwrapped';
1+
import { Duration, Effect, FiberSet } from 'effect';
32
import { InngestError } from './index';
43

54
type OriginalStep = {
@@ -17,12 +16,15 @@ export function wrapStep<TStep>(step: TStep) {
1716
const s = step as unknown as OriginalStep;
1817

1918
return {
20-
run: <A, E>(id: string, fn: () => Effect.Effect<A, E, never>) =>
21-
Effect.tryPromise({
22-
try: () => s.run(id, () => runPromiseUnwrapped(fn())),
23-
catch: (cause) =>
24-
new InngestError({ message: `Step "${id}" failed`, cause }),
25-
}) as Effect.Effect<A, InngestError>,
19+
run: <A, E, R>(id: string, fn: () => Effect.Effect<A, E, R>) =>
20+
Effect.gen(function* () {
21+
const runPromise = yield* FiberSet.makeRuntimePromise<R>();
22+
return yield* Effect.tryPromise({
23+
try: () => s.run(id, () => runPromise(fn())),
24+
catch: (cause) =>
25+
new InngestError({ message: `Step "${id}" failed`, cause }),
26+
}) as Effect.Effect<A, InngestError, R>;
27+
}),
2628

2729
sleep: (id: string, duration: Duration.DurationInput) =>
2830
Effect.tryPromise({

packages/scratchpad/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
"@ai-sdk/xai": "^3.0.29",
1212
"@effect/opentelemetry": "^0.60.0",
1313
"@effect/platform": "^0.93.6",
14+
"@effect/platform-bun": "^0.87.1",
1415
"@opentelemetry/exporter-trace-otlp-http": "^0.208.0",
1516
"@opentelemetry/sdk-trace-base": "^2.2.0",
1617
"@orpc/client": "^1.13.2",
@@ -20,6 +21,7 @@
2021
"ff-ai": "workspace:*",
2122
"ff-effect": "workspace:*",
2223
"ff-serv": "workspace:*",
24+
"inngest": "^3.52.4",
2325
"postgres": "^3.4.7"
2426
},
2527
"devDependencies": {
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import { FetchHttpClient, HttpClient } from '@effect/platform';
2+
import * as BunContext from '@effect/platform-bun/BunContext';
3+
import * as BunRuntime from '@effect/platform-bun/BunRuntime';
4+
import { Effect, Layer } from 'effect';
5+
import * as S from 'effect/Schema';
6+
import { createInngest } from 'ff-effect/for/inngest';
7+
import { basicHandler, createFetchHandler, Logger } from 'ff-serv';
8+
import * as InngestSdk from 'inngest';
9+
10+
/**
11+
* # How to use
12+
* 1. Start inngest cli `npx --ignore-scripts=false inngest-cli@latest dev`
13+
* 2. Run this
14+
**/
15+
16+
class Nothing extends Effect.Service<Nothing>()('nothing', {
17+
effect: Effect.gen(function* () {
18+
return {
19+
say: (message: string) => Logger.info(message),
20+
};
21+
}),
22+
}) {}
23+
24+
const Inngest = createInngest(
25+
Effect.succeed(
26+
new InngestSdk.Inngest({
27+
id: 'dev',
28+
schemas: new InngestSdk.EventSchemas().fromSchema({
29+
'say-hello': S.standardSchemaV1(
30+
S.Struct({
31+
message: S.String,
32+
}),
33+
),
34+
}),
35+
}),
36+
),
37+
);
38+
39+
const program = Effect.gen(function* () {
40+
const helloWorld = yield* Inngest.createFunction(
41+
{ id: 'asdf' },
42+
{ event: 'say-hello' },
43+
({ event, step, runId }) =>
44+
Effect.gen(function* () {
45+
yield* Logger.info('Workflow starting');
46+
47+
yield* step.run('one', () =>
48+
Effect.gen(function* () {
49+
const svc = yield* Nothing;
50+
yield* svc.say(`Hello ${event.data.message}`);
51+
}),
52+
);
53+
}).pipe(Effect.annotateLogs({ runId })),
54+
);
55+
56+
const server = Bun.serve({
57+
fetch: yield* createFetchHandler(
58+
[
59+
basicHandler(
60+
'/api/inngest',
61+
yield* Inngest.fetchHandler({ functions: [helloWorld] }),
62+
),
63+
basicHandler('/invoke', () =>
64+
Effect.gen(function* () {
65+
yield* Inngest.send({
66+
name: 'say-hello',
67+
data: { message: 'world' },
68+
});
69+
return new Response('ok');
70+
}),
71+
),
72+
],
73+
{ debug: false },
74+
),
75+
});
76+
yield* Effect.addFinalizer(() => Effect.promise(() => server.stop()));
77+
yield* Logger.info(`Server started in port ${server.port}`);
78+
79+
yield* HttpClient.put(`http://localhost:${server.port}/api/inngest`);
80+
yield* Logger.info('Called inngest put');
81+
82+
yield* Effect.never;
83+
});
84+
85+
BunRuntime.runMain(
86+
program.pipe(
87+
Effect.provide(
88+
Layer.mergeAll(
89+
BunContext.layer,
90+
Nothing.Default,
91+
Inngest.layer,
92+
Layer.scope,
93+
FetchHttpClient.layer,
94+
),
95+
),
96+
),
97+
);

0 commit comments

Comments
 (0)