From a7b1130a14f139d7d042fc215083e2df09886233 Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ushio Date: Thu, 22 Jan 2026 14:56:47 -0800 Subject: [PATCH] Add first-class Kafka trigger and output binding support - Add types/kafka.d.ts with full type definitions for Kafka bindings - Update types/index.d.ts to export Kafka types - Update types/app.d.ts with kafka() function declaration - Update types/trigger.d.ts with kafka trigger function - Update types/output.d.ts with kafka output function - Update src/app.ts with kafka() implementation - Update src/trigger.ts with kafkaTrigger binding - Update src/output.ts with kafka output binding Kafka binding options include: - brokerList, topic, consumerGroup (trigger) - schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword - authenticationMode, protocol, username, password - SSL certificate locations - cardinality, lagThreshold (trigger) - batchSize, maxMessageBytes, enableIdempotence (output) --- src/app.ts | 5 + src/output.ts | 9 ++ src/trigger.ts | 9 ++ types/app.d.ts | 8 ++ types/index.d.ts | 1 + types/kafka.d.ts | 341 +++++++++++++++++++++++++++++++++++++++++++++ types/output.d.ts | 6 + types/trigger.d.ts | 6 + 8 files changed, 385 insertions(+) create mode 100644 types/kafka.d.ts diff --git a/src/app.ts b/src/app.ts index 84d4be7..e2eed3c 100644 --- a/src/app.ts +++ b/src/app.ts @@ -11,6 +11,7 @@ import { HttpHandler, HttpMethod, HttpMethodFunctionOptions, + KafkaFunctionOptions, McpToolFunctionOptions, MySqlFunctionOptions, ServiceBusQueueFunctionOptions, @@ -146,6 +147,10 @@ export function webPubSub(name: string, options: WebPubSubFunctionOptions): void generic(name, convertToGenericOptions(options, trigger.webPubSub)); } +export function kafka(name: string, options: KafkaFunctionOptions): void { + generic(name, convertToGenericOptions(options, trigger.kafka)); +} + /** * Registers an MCP Tool function in your app. 
* This function is triggered by MCP Tool events and allows you to define the behavior of the function. diff --git a/src/output.ts b/src/output.ts index 0f0a781..563e907 100644 --- a/src/output.ts +++ b/src/output.ts @@ -12,6 +12,8 @@ import { GenericOutputOptions, HttpOutput, HttpOutputOptions, + KafkaOutput, + KafkaOutputOptions, MySqlOutput, MySqlOutputOptions, ServiceBusQueueOutput, @@ -115,6 +117,13 @@ export function webPubSub(options: WebPubSubOutputOptions): WebPubSubOutput { }); } +export function kafka(options: KafkaOutputOptions): KafkaOutput { + return addOutputBindingName({ + ...options, + type: 'kafka', + }); +} + export function generic(options: GenericOutputOptions): FunctionOutput { return addOutputBindingName(options); } diff --git a/src/trigger.ts b/src/trigger.ts index 1a25f69..403c16d 100644 --- a/src/trigger.ts +++ b/src/trigger.ts @@ -12,6 +12,8 @@ import { GenericTriggerOptions, HttpTrigger, HttpTriggerOptions, + KafkaTrigger, + KafkaTriggerOptions, McpToolTrigger, McpToolTriggerOptions, MySqlTrigger, @@ -129,6 +131,13 @@ export function webPubSub(options: WebPubSubTriggerOptions): WebPubSubTrigger { }); } +export function kafka(options: KafkaTriggerOptions): KafkaTrigger { + return addTriggerBindingName({ + ...options, + type: 'kafkaTrigger', + }); +} + /** * Creates an MCP Tool trigger configuration. * This function is used to define an MCP Tool trigger for an Azure Function. 
diff --git a/types/app.d.ts b/types/app.d.ts index b49952c..cb16136 100644 --- a/types/app.d.ts +++ b/types/app.d.ts @@ -5,6 +5,7 @@ import { CosmosDBFunctionOptions } from './cosmosDB'; import { EventGridEvent, EventGridFunctionOptions } from './eventGrid'; import { EventHubFunctionOptions } from './eventHub'; import { GenericFunctionOptions } from './generic'; +import { KafkaFunctionOptions } from './kafka'; import { HttpFunctionOptions, HttpHandler, HttpMethodFunctionOptions } from './http'; import { McpToolFunctionOptions } from './mcpTool'; import { MySqlFunctionOptions } from './mySql'; @@ -182,6 +183,13 @@ export function sql(name: string, options: SqlFunctionOptions): */ export function mySql(name: string, options: MySqlFunctionOptions): void; +/** + * Registers a function in your app that will be triggered whenever a message is added to a Kafka topic + * @param name The name of the function. The name must be unique within your app and will mostly be used for your own tracking purposes + * @param options Configuration options describing the inputs, outputs, and handler for this function + */ +export function kafka(name: string, options: KafkaFunctionOptions): void; + /** * Registers a generic function in your app that will be triggered based on the type specified in `options.trigger.type` * Use this method if your desired trigger type does not already have its own method diff --git a/types/index.d.ts b/types/index.d.ts index c25dbc5..db93014 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -10,6 +10,7 @@ export * from './cosmosDB.v4'; export * from './eventGrid'; export * from './eventHub'; export * from './generic'; +export * from './kafka'; export * from './hooks/appHooks'; export * from './hooks/HookContext'; export * from './hooks/invocationHooks'; diff --git a/types/kafka.d.ts b/types/kafka.d.ts new file mode 100644 index 0000000..b355d78 --- /dev/null +++ b/types/kafka.d.ts @@ -0,0 +1,341 @@ +// Copyright (c) .NET Foundation. 
All rights reserved. +// Licensed under the MIT License. + +import { FunctionOptions, FunctionOutput, FunctionResult, FunctionTrigger, RetryOptions } from './index'; +import { InvocationContext } from './InvocationContext'; + +export type KafkaHandler<T = unknown> = (messages: T, context: InvocationContext) => FunctionResult; + +export interface KafkaFunctionOptions extends KafkaTriggerOptions, Partial<FunctionOptions> { + handler: KafkaHandler; + + trigger?: KafkaTrigger; + + /** + * An optional retry policy to rerun a failed execution until either successful completion occurs or the maximum number of retries is reached. + * Learn more [here](https://learn.microsoft.com/azure/azure-functions/functions-bindings-error-pages) + */ + retry?: RetryOptions; +} + +/** + * SASL mechanism to use for authentication. + */ +export type BrokerAuthenticationMode = 'NotSet' | 'Gssapi' | 'Plain' | 'ScramSha256' | 'ScramSha512' | 'OAuthBearer'; + +/** + * Security protocol used to communicate with brokers. + */ +export type BrokerProtocol = 'NotSet' | 'Plaintext' | 'Ssl' | 'SaslPlaintext' | 'SaslSsl'; + +/** + * OAuth Bearer method for authentication. + */ +export type OAuthBearerMethod = 'Default' | 'Oidc'; + +export interface KafkaTriggerOptions { + /** + * The Kafka broker list (comma-separated list of broker addresses). + * Can be an app setting name. + */ + brokerList: string; + + /** + * The Kafka topic to consume messages from. + */ + topic: string; + + /** + * The consumer group ID. + */ + consumerGroup?: string; + + /** + * The EventHub connection string when using Kafka protocol header feature of Azure EventHubs. + */ + eventHubConnectionString?: string; + + /** + * Set to `many` in order to enable batching. If omitted or set to `one`, a single message is passed to the function. + */ + cardinality?: 'many' | 'one'; + + /** + * Defines how Functions runtime should treat the parameter value. + * Use `string` for string data, `binary` for binary data. 
+ */ + dataType?: 'string' | 'binary'; + + /** + * Gets or sets the Avro schema for generic record deserialization. + * Should be used only if a generic record should be generated. + */ + avroSchema?: string; + + /** + * SASL mechanism to use for authentication. + * Default: NotSet + */ + authenticationMode?: BrokerAuthenticationMode; + + /** + * SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms. + */ + username?: string; + + /** + * SASL password for use with the PLAIN and SASL-SCRAM-.. mechanisms. + */ + password?: string; + + /** + * Security protocol used to communicate with brokers. + * Default: NotSet + */ + protocol?: BrokerProtocol; + + /** + * Path to client's private key (PEM) used for authentication. + * ssl.key.location in librdkafka + */ + sslKeyLocation?: string; + + /** + * Path to CA certificate file for verifying the broker's certificate. + * ssl.ca.location in librdkafka + */ + sslCaLocation?: string; + + /** + * Path to client's certificate. + * ssl.certificate.location in librdkafka + */ + sslCertificateLocation?: string; + + /** + * Password for client's certificate. + * ssl.key.password in librdkafka + */ + sslKeyPassword?: string; + + /** + * URL for the Avro Schema Registry. + */ + schemaRegistryUrl?: string; + + /** + * Username for the Avro Schema Registry. + */ + schemaRegistryUsername?: string; + + /** + * Password for the Avro Schema Registry. + */ + schemaRegistryPassword?: string; + + /** + * Maximum number of unprocessed messages a worker is expected to have at an instance. + * Used for target-based scaling to determine the number of worker instances. + * Default: 1000 + */ + lagThreshold?: number; + + /** + * OAuth Bearer method. + * Either 'Default' or 'Oidc' + */ + oAuthBearerMethod?: OAuthBearerMethod; + + /** + * OAuth Bearer Client Id. + * Specify only when OAuthBearerMethod is 'Oidc' + */ + oAuthBearerClientId?: string; + + /** + * OAuth Bearer Client Secret. 
+ * Specify only when OAuthBearerMethod is 'Oidc' + */ + oAuthBearerClientSecret?: string; + + /** + * OAuth Bearer scope. + * Specify only when OAuthBearerMethod is 'Oidc' + */ + oAuthBearerScope?: string; + + /** + * OAuth Bearer token endpoint URL. + * Specify only when OAuthBearerMethod is 'Oidc' + */ + oAuthBearerTokenEndpointUrl?: string; + + /** + * OAuth Bearer extensions. + * Allow additional information to be provided to the broker. + * Comma-separated list of key=value pairs. + */ + oAuthBearerExtensions?: string; +} +export type KafkaTrigger = FunctionTrigger & KafkaTriggerOptions; + +export interface KafkaOutputOptions { + /** + * The Kafka broker list (comma-separated list of broker addresses). + * Can be an app setting name. + */ + brokerList: string; + + /** + * The Kafka topic to produce messages to. + */ + topic: string; + + /** + * Gets or sets the Avro schema for generic record serialization. + * Should be used only if a generic record should be generated. + */ + avroSchema?: string; + + /** + * Gets or sets the maximum transmit message size in bytes. + * Default: 1MB + */ + maxMessageBytes?: number; + + /** + * Maximum number of messages batched in one MessageSet. + * Default: 10000 + */ + batchSize?: number; + + /** + * When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. + * Default: false + */ + enableIdempotence?: boolean; + + /** + * Local message timeout in milliseconds. + * Default: 300000 + */ + messageTimeoutMs?: number; + + /** + * The ack timeout of the producer request in milliseconds. + * Default: 5000 + */ + requestTimeoutMs?: number; + + /** + * How many times to retry sending a failing message. + * Default: 2147483647 + */ + maxRetries?: number; + + /** + * SASL mechanism to use for authentication. + * Default: NotSet + */ + authenticationMode?: BrokerAuthenticationMode; + + /** + * SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms. 
+ */ + username?: string; + + /** + * SASL password for use with the PLAIN and SASL-SCRAM-.. mechanisms. + */ + password?: string; + + /** + * Security protocol used to communicate with brokers. + * Default: NotSet + */ + protocol?: BrokerProtocol; + + /** + * Path to client's private key (PEM) used for authentication. + * ssl.key.location in librdkafka + */ + sslKeyLocation?: string; + + /** + * Path to CA certificate file for verifying the broker's certificate. + * ssl.ca.location in librdkafka + */ + sslCaLocation?: string; + + /** + * Path to client's certificate. + * ssl.certificate.location in librdkafka + */ + sslCertificateLocation?: string; + + /** + * Password for client's certificate. + * ssl.key.password in librdkafka + */ + sslKeyPassword?: string; + + /** + * Linger.MS property provides the time between batches of messages being sent to cluster. + * Larger value allows more batching results in high throughput. + * Default: 5 + */ + lingerMs?: number; + + /** + * URL for the Avro Schema Registry. + */ + schemaRegistryUrl?: string; + + /** + * Username for the Avro Schema Registry. + */ + schemaRegistryUsername?: string; + + /** + * Password for the Avro Schema Registry. + */ + schemaRegistryPassword?: string; + + /** + * OAuth Bearer method. + * Either 'Default' or 'Oidc' + */ + oAuthBearerMethod?: OAuthBearerMethod; + + /** + * OAuth Bearer Client Id. + * Specify only when OAuthBearerMethod is 'Oidc' + */ + oAuthBearerClientId?: string; + + /** + * OAuth Bearer Client Secret. + * Specify only when OAuthBearerMethod is 'Oidc' + */ + oAuthBearerClientSecret?: string; + + /** + * OAuth Bearer scope. + * Specify only when OAuthBearerMethod is 'Oidc' + */ + oAuthBearerScope?: string; + + /** + * OAuth Bearer token endpoint URL. + * Specify only when OAuthBearerMethod is 'Oidc' + */ + oAuthBearerTokenEndpointUrl?: string; + + /** + * OAuth Bearer extensions. + * Allow additional information to be provided to the broker. 
+ * Comma-separated list of key=value pairs. + */ + oAuthBearerExtensions?: string; +} +export type KafkaOutput = FunctionOutput & KafkaOutputOptions; diff --git a/types/output.d.ts b/types/output.d.ts index e9e08d9..a6c7890 100644 --- a/types/output.d.ts +++ b/types/output.d.ts @@ -5,6 +5,7 @@ import { CosmosDBOutput, CosmosDBOutputOptions } from './cosmosDB'; import { EventGridOutput, EventGridOutputOptions } from './eventGrid'; import { EventHubOutput, EventHubOutputOptions } from './eventHub'; import { GenericOutputOptions } from './generic'; +import { KafkaOutput, KafkaOutputOptions } from './kafka'; import { HttpOutput, HttpOutputOptions } from './http'; import { FunctionOutput } from './index'; import { MySqlOutput, MySqlOutputOptions } from './mySql'; @@ -79,6 +80,11 @@ export function mySql(options: MySqlOutputOptions): MySqlOutput; */ export function webPubSub(options: WebPubSubOutputOptions): WebPubSubOutput; +/** + * [Link to docs and examples](https://docs.microsoft.com/azure/azure-functions/functions-bindings-kafka-output?pivots=programming-language-javascript) + */ +export function kafka(options: KafkaOutputOptions): KafkaOutput; + /** * A generic option that can be used for any output type * Use this method if your desired output type does not already have its own method diff --git a/types/trigger.d.ts b/types/trigger.d.ts index e8105d2..63eb12a 100644 --- a/types/trigger.d.ts +++ b/types/trigger.d.ts @@ -5,6 +5,7 @@ import { CosmosDBTrigger, CosmosDBTriggerOptions } from './cosmosDB'; import { EventGridTrigger, EventGridTriggerOptions } from './eventGrid'; import { EventHubTrigger, EventHubTriggerOptions } from './eventHub'; import { GenericTriggerOptions } from './generic'; +import { KafkaTrigger, KafkaTriggerOptions } from './kafka'; import { HttpTrigger, HttpTriggerOptions } from './http'; import { FunctionTrigger } from './index'; import { McpToolFunctionOptions, McpToolTrigger } from './mcpTool'; @@ -96,6 +97,11 @@ export function 
webPubSub(options: WebPubSubTriggerOptions): WebPubSubTrigger; */ export function mcpTool(options: McpToolFunctionOptions): McpToolTrigger; +/** + * [Link to docs and examples](https://docs.microsoft.com/azure/azure-functions/functions-bindings-kafka-trigger?pivots=programming-language-javascript) + */ +export function kafka(options: KafkaTriggerOptions): KafkaTrigger; + /** * A generic option that can be used for any trigger type * Use this method if your desired trigger type does not already have its own method