Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/modern-clubs-sell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"ff-effect": patch
---

Allow requirement on step.run and use InngestFunction.Any
86 changes: 42 additions & 44 deletions bun.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions packages/effect/src/for/inngest/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ describe('wrapStep', () => {

const wrapped = wrapStep(mockStep);
const result = await Effect.runPromise(
wrapped.run('test', () => Effect.succeed(42)),
Effect.scoped(wrapped.run('test', () => Effect.succeed(42))),
);
expect(result).toBe(42);
expect(mockStep.run).toHaveBeenCalledWith('test', expect.any(Function));
Expand All @@ -55,7 +55,7 @@ describe('wrapStep', () => {

const wrapped = wrapStep(mockStep);
const exit = await Effect.runPromiseExit(
wrapped.run('fail', () => Effect.succeed(1)),
Effect.scoped(wrapped.run('fail', () => Effect.succeed(1))),
);
expect(exit._tag).toBe('Failure');
});
Expand Down
16 changes: 8 additions & 8 deletions packages/effect/src/for/inngest/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ import { HttpApp } from '@effect/platform';
import { type Cron, Data, Effect, FiberSet, Layer } from 'effect';
import * as Context from 'effect/Context';
import * as Inspectable from 'effect/Inspectable';
import type { GetEvents, GetFunctionInput, Inngest } from 'inngest';
import type {
GetEvents,
GetFunctionInput,
Inngest,
InngestFunction,
} from 'inngest';
import { serve } from 'inngest/bun';
import { extract } from '../../extract';
import { cronToString } from './cron';
Expand All @@ -16,11 +21,6 @@ export class InngestError extends Data.TaggedError('ff-effect/InngestError')<{
cause?: unknown;
}> {}

declare const InngestFunctionBrand: unique symbol;

/** Opaque wrapper around inngest's InngestFunction to avoid leaking internal types */
export type InngestFunction = { readonly [InngestFunctionBrand]: true };

// biome-ignore lint/suspicious/noExplicitAny: matches Inngest.Any
type AnyInngest = Inngest<any>;

Expand Down Expand Up @@ -134,11 +134,11 @@ export function createInngest<
>),
);
},
) as unknown as InngestFunction;
) as unknown as InngestFunction.Any;
});

type ServeOpts = {
functions: InngestFunction[];
functions: InngestFunction.Any[];
servePath?: string;
signingKey?: string;
signingKeyFallback?: string;
Expand Down
18 changes: 10 additions & 8 deletions packages/effect/src/for/inngest/step.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Duration, Effect } from 'effect';
import { runPromiseUnwrapped } from '../../run-promise-unwrapped';
import { Duration, Effect, FiberSet } from 'effect';
import { InngestError } from './index';

type OriginalStep = {
Expand All @@ -17,12 +16,15 @@ export function wrapStep<TStep>(step: TStep) {
const s = step as unknown as OriginalStep;

return {
run: <A, E>(id: string, fn: () => Effect.Effect<A, E, never>) =>
Effect.tryPromise({
try: () => s.run(id, () => runPromiseUnwrapped(fn())),
catch: (cause) =>
new InngestError({ message: `Step "${id}" failed`, cause }),
}) as Effect.Effect<A, InngestError>,
run: <A, E, R>(id: string, fn: () => Effect.Effect<A, E, R>) =>
Effect.gen(function* () {
const runPromise = yield* FiberSet.makeRuntimePromise<R>();
return yield* Effect.tryPromise({
try: () => s.run(id, () => runPromise(fn())),
catch: (cause) =>
new InngestError({ message: `Step "${id}" failed`, cause }),
}) as Effect.Effect<A, InngestError, R>;
}),

sleep: (id: string, duration: Duration.DurationInput) =>
Effect.tryPromise({
Expand Down
2 changes: 2 additions & 0 deletions packages/scratchpad/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"@ai-sdk/xai": "^3.0.29",
"@effect/opentelemetry": "^0.60.0",
"@effect/platform": "^0.93.6",
"@effect/platform-bun": "^0.87.1",
"@opentelemetry/exporter-trace-otlp-http": "^0.208.0",
"@opentelemetry/sdk-trace-base": "^2.2.0",
"@orpc/client": "^1.13.2",
Expand All @@ -20,6 +21,7 @@
"ff-ai": "workspace:*",
"ff-effect": "workspace:*",
"ff-serv": "workspace:*",
"inngest": "^3.52.4",
"postgres": "^3.4.7"
},
"devDependencies": {
Expand Down
97 changes: 97 additions & 0 deletions packages/scratchpad/src/examples/inngest.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { FetchHttpClient, HttpClient } from '@effect/platform';
import * as BunContext from '@effect/platform-bun/BunContext';
import * as BunRuntime from '@effect/platform-bun/BunRuntime';
import { Effect, Layer } from 'effect';
import * as S from 'effect/Schema';
import { createInngest } from 'ff-effect/for/inngest';
import { basicHandler, createFetchHandler, Logger } from 'ff-serv';
import * as InngestSdk from 'inngest';

/**
* # How to use
* 1. Start inngest cli `npx --ignore-scripts=false inngest-cli@latest dev`
* 2. Run this
**/

class Nothing extends Effect.Service<Nothing>()('nothing', {
effect: Effect.gen(function* () {
return {
say: (message: string) => Logger.info(message),
};
}),
}) {}

const Inngest = createInngest(
Effect.succeed(
new InngestSdk.Inngest({
id: 'dev',
schemas: new InngestSdk.EventSchemas().fromSchema({
'say-hello': S.standardSchemaV1(
S.Struct({
message: S.String,
}),
),
}),
}),
),
);

const program = Effect.gen(function* () {
const helloWorld = yield* Inngest.createFunction(
{ id: 'asdf' },
{ event: 'say-hello' },
({ event, step, runId }) =>
Effect.gen(function* () {
yield* Logger.info('Workflow starting');

yield* step.run('one', () =>
Effect.gen(function* () {
const svc = yield* Nothing;
yield* svc.say(`Hello ${event.data.message}`);
}),
);
}).pipe(Effect.annotateLogs({ runId })),
);

const server = Bun.serve({
fetch: yield* createFetchHandler(
[
basicHandler(
'/api/inngest',
yield* Inngest.fetchHandler({ functions: [helloWorld] }),
),
basicHandler('/invoke', () =>
Effect.gen(function* () {
yield* Inngest.send({
name: 'say-hello',
data: { message: 'world' },
});
return new Response('ok');
}),
),
],
{ debug: false },
),
});
yield* Effect.addFinalizer(() => Effect.promise(() => server.stop()));
yield* Logger.info(`Server started in port ${server.port}`);

yield* HttpClient.put(`http://localhost:${server.port}/api/inngest`);
yield* Logger.info('Called inngest put');

yield* Effect.never;
});

BunRuntime.runMain(
program.pipe(
Effect.provide(
Layer.mergeAll(
BunContext.layer,
Nothing.Default,
Inngest.layer,
Layer.scope,
FetchHttpClient.layer,
),
),
),
);