In-memory EventBusPort adapter for Beignet applications.
Use it for tests, local development, and single-process apps. Distributed
systems should adapt a queue, stream, outbox, or message broker behind the same
EventBusPort interface.
bun add @beignet/provider-event-bus-memory
import { defineEvent } from "@beignet/core/events";
import { createInMemoryEventBus } from "@beignet/provider-event-bus-memory";
import { z } from "zod";

// Define your domain events
const UserRegistered = defineEvent("user.registered", {
  payload: z.object({
    userId: z.string(),
    email: z.string().email(),
  }),
});

// Create the event bus
const eventBus = createInMemoryEventBus();

// Subscribe to events
const unsubscribe = eventBus.subscribe(UserRegistered, (payload) => {
  console.log(`User registered: ${payload.email}`);
  // Send welcome email, update analytics, etc.
});

// Publish events
await eventBus.publish(UserRegistered, {
  userId: "123",
  email: "user@example.com",
});

// Unsubscribe when done
unsubscribe();
import { createNextServer } from "@beignet/next";
import { createInMemoryEventBusProvider } from "@beignet/provider-event-bus-memory";
import { appPorts } from "@/infra/app-ports";
import { routes } from "@/server/routes";

export const server = await createNextServer({
  ports: appPorts,
  providers: [createInMemoryEventBusProvider()],
  createContext: ({ ports }) => ({
    ports,
  }),
  routes,
});
Use createInMemoryEventBus() directly when you want to manually assign an
event bus under ports.
Pass a provider instrumentation target when creating the direct event bus to
record published events under the eventBus watcher:
const eventBus = createInMemoryEventBus({
  instrumentation: ports,
});
import { defineEvent } from "@beignet/core/events";
import { z } from "zod";

const OrderPlaced = defineEvent("order.placed", {
  payload: z.object({
    orderId: z.string(),
    total: z.number(),
  }),
});

// Subscribe to events in your application setup
ctx.ports.eventBus.subscribe(OrderPlaced, async (payload) => {
  // Send order confirmation email.
  // `customer` is not part of the payload; it is assumed to be looked up elsewhere.
  await ctx.ports.mailer.send({
    to: customer.email,
    subject: "Order Confirmation",
    text: `Your order ${payload.orderId} has been placed!`,
  });
});
const placeOrder = useCase
  .command("orders.place")
  .input(PlaceOrderInput)
  .output(OrderOutput)
  .emits([OrderPlaced])
  .run(async ({ ctx, input, events }) => {
    return ctx.ports.uow.transaction(async (tx) => {
      const order = await tx.orders.create(input);
      await events.record(tx.events, OrderPlaced, {
        orderId: order.id,
        total: order.total,
      });
      return order;
    });
  });
publish<E>(event: E, payload: InferEventPayload<E>): Promise<void> | void
Publish a domain event with a typed payload.
await eventBus.publish(UserRegistered, {
  userId: "123",
  email: "user@example.com",
});
By default, the in-memory bus awaits handlers so local development and tests are
deterministic. Handler errors are rethrown unless onHandlerError is provided.
Use delivery: "fire-and-forget" when you intentionally want detached
in-process delivery.
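The delivery rules above can be illustrated with a small self-contained sketch. It is not the package's implementation: `createSketchBus`, `SketchBusOptions`, the `"awaited"` mode name, and running handlers one at a time are all assumptions made for illustration. Only `delivery: "fire-and-forget"` and `onHandlerError` are names taken from the text.

```typescript
// Hypothetical sketch; not the @beignet/provider-event-bus-memory source.
type Handler<P> = (payload: P) => void | Promise<void>;

interface SketchBusOptions {
  // "fire-and-forget" comes from the text above; "awaited" is an assumed name for the default.
  delivery?: "awaited" | "fire-and-forget";
  onHandlerError?: (error: unknown) => void;
}

function createSketchBus<P>(options: SketchBusOptions = {}) {
  const handlers = new Set<Handler<P>>();

  // Run handlers one at a time (sequential order is an assumption of this sketch).
  async function deliver(payload: P): Promise<void> {
    for (const handler of Array.from(handlers)) {
      try {
        await handler(payload);
      } catch (error) {
        // Without onHandlerError the failure propagates to whoever awaits deliver().
        if (!options.onHandlerError) throw error;
        options.onHandlerError(error);
      }
    }
  }

  return {
    subscribe(handler: Handler<P>): () => void {
      handlers.add(handler);
      return () => {
        handlers.delete(handler);
      };
    },
    publish(payload: P): Promise<void> {
      if (options.delivery === "fire-and-forget") {
        // Detached: start delivery, return immediately, and swallow any rejection
        // that was not already routed to onHandlerError.
        deliver(payload).catch(() => {});
        return Promise.resolve();
      }
      return deliver(payload);
    },
  };
}

// Awaited delivery: the handler's side effect is visible once publish resolves.
async function demoAwaited(): Promise<string[]> {
  const log: string[] = [];
  const bus = createSketchBus<{ email: string }>();
  bus.subscribe(async (payload) => {
    await Promise.resolve(); // stand-in for real async work
    log.push(`handled ${payload.email}`);
  });
  await bus.publish({ email: "user@example.com" });
  log.push("publish returned");
  return log;
}
```

In this sketch, awaiting `publish` is what makes assertions in tests deterministic: by the time it resolves, every handler has either finished or thrown.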
subscribe<E>(event: E, handler: (payload) => void | Promise<void>): () => void
Subscribe to a domain event. Returns an unsubscribe function.
const unsubscribe = eventBus.subscribe(UserRegistered, (payload) => {
  console.log(`New user: ${payload.email}`);
});

// Later, when you want to stop listening:
unsubscribe();
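One common way to implement a subscribe call that returns its own unsubscribe function is a closure over a handler set. The sketch below assumes that approach; `HandlerRegistry` and `emit` are hypothetical names and say nothing about how the package stores its handlers.

```typescript
// Hypothetical registry; names are illustrative, not Beignet APIs.
type Handler = (payload: unknown) => void;

class HandlerRegistry {
  private readonly handlersByEvent = new Map<string, Set<Handler>>();

  // Register a handler and return a closure that removes exactly that registration.
  subscribe(eventName: string, handler: Handler): () => void {
    let handlers = this.handlersByEvent.get(eventName);
    if (!handlers) {
      handlers = new Set<Handler>();
      this.handlersByEvent.set(eventName, handlers);
    }
    handlers.add(handler);
    const registered = handlers;
    return () => {
      // Set.delete is a no-op for a missing entry, so calling this twice is safe.
      registered.delete(handler);
    };
  }

  // Invoke the handlers currently registered for an event name.
  emit(eventName: string, payload: unknown): void {
    const handlers = this.handlersByEvent.get(eventName);
    if (!handlers) return;
    // Copy first so a handler that unsubscribes mid-delivery does not disturb iteration.
    for (const handler of Array.from(handlers)) handler(payload);
  }
}

const registry = new HandlerRegistry();
const seen: unknown[] = [];
const stopListening = registry.subscribe("user.registered", (payload) => {
  seen.push(payload);
});

registry.emit("user.registered", { email: "a@example.com" }); // delivered
stopListening();
stopListening(); // second call has no effect
registry.emit("user.registered", { email: "b@example.com" }); // not delivered
```

Returning the closure means callers never need to keep a reference to the handler itself in order to remove it.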
Payload types are inferred from each event definition, and the bus can be typed as an EventBusPort when you define your ports:
import type { EventBusPort } from "@beignet/core/ports";
import { definePorts } from "@beignet/core/ports";
import { createInMemoryEventBus } from "@beignet/provider-event-bus-memory";

// Type-safe ports definition
const appPorts = definePorts({
  eventBus: createInMemoryEventBus() as EventBusPort,
  // ... other ports
});

type AppPorts = typeof appPorts;
Because delivery is awaited by default, the in-memory event bus works well in tests:
import { describe, expect, it, mock } from "bun:test";
import { createInMemoryEventBus } from "@beignet/provider-event-bus-memory";

describe("User Registration", () => {
  it("should publish UserRegistered event", async () => {
    const eventBus = createInMemoryEventBus();
    const handler = mock(() => {});
    eventBus.subscribe(UserRegistered, handler);

    // Perform registration. `registerUser` and `ctx` are app-specific;
    // ctx.ports.eventBus must be the bus created above for the handler to fire.
    await registerUser(ctx, { email: "test@example.com" });

    expect(handler).toHaveBeenCalledWith({
      userId: expect.any(String),
      email: "test@example.com",
    });
  });
});
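publish(...) waits for subscribed handlers unless delivery: "fire-and-forget" is configured.
Handler errors are rethrown from publish(...) unless onHandlerError is configured.
Good for: tests, local development, and single-process apps.
Not suitable for: distributed systems that need delivery across processes.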
For production distributed systems, implement EventBusPort with a proper message broker.
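As a rough sketch of what such an adapter could look like, the code below puts publish and subscribe in front of a broker client. Everything here is hypothetical: `EventDef`, `BrokerClient`, `createBrokerEventBus`, and `createFakeBroker` are illustrative stand-ins, not the real EventBusPort type or any broker SDK. The fake broker exists only so the sketch runs in one process.

```typescript
// All names below are hypothetical; none come from @beignet/core or a real broker SDK.
interface EventDef<P> {
  name: string;
  parse(raw: unknown): P; // stand-in for schema validation such as a zod parse
}

type Listener = (body: string) => void | Promise<void>;

interface BrokerClient {
  send(topic: string, body: string): Promise<void>;
  onMessage(topic: string, listener: Listener): () => void;
}

function createBrokerEventBus(broker: BrokerClient) {
  return {
    async publish<P>(event: EventDef<P>, payload: P): Promise<void> {
      // Validate before the payload leaves the process, then serialize.
      await broker.send(event.name, JSON.stringify(event.parse(payload)));
    },
    subscribe<P>(
      event: EventDef<P>,
      handler: (payload: P) => void | Promise<void>,
    ): () => void {
      // Validate again on receipt: the sender may be a different service version.
      // Retries and acknowledgements are left to the broker in this sketch.
      return broker.onMessage(event.name, async (body) => {
        await handler(event.parse(JSON.parse(body)));
      });
    },
  };
}

// In-process fake so the sketch is runnable without a real broker.
function createFakeBroker(): BrokerClient {
  const listeners = new Map<string, Set<Listener>>();
  return {
    async send(topic, body) {
      for (const listener of Array.from(listeners.get(topic) ?? [])) {
        await listener(body);
      }
    },
    onMessage(topic, listener) {
      const set = listeners.get(topic) ?? new Set<Listener>();
      set.add(listener);
      listeners.set(topic, set);
      return () => {
        set.delete(listener);
      };
    },
  };
}

// Hypothetical event definition with a hand-written validator.
const OrderShipped: EventDef<{ orderId: string }> = {
  name: "order.shipped",
  parse(raw) {
    const orderId = (raw as { orderId?: unknown } | null)?.orderId;
    if (typeof orderId !== "string") throw new Error("invalid order.shipped payload");
    return { orderId };
  },
};
```

The point of the sketch is that callers keep the same publish and subscribe shape while serialization, validation, and transport move into the adapter.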
MIT