Skip to content
2 changes: 1 addition & 1 deletion .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ promotions:
parameters:
env_vars:
- name: TEST_SUITE
default_value: "@smoke|@topic-message-viewer|@evolve-schema|@produce-message-to-topic|@project-scaffolding|@flink-statements|@direct-connection-crud|@flink-artifacts"
default_value: "@smoke|@topic-message-viewer|@evolve-schema|@produce-message-to-topic|@project-scaffolding|@flink-statements|@direct-connection-crud|@flink-artifacts|@flink-udfs"
description: "Playwright E2E test tags to run, separated by |. By default, runs the smoketests and CUJ-related tests."
- name: VSCODE_VERSION
default_value: "stable"
Expand Down
4 changes: 2 additions & 2 deletions service.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ semaphore:
- name: TEST_SUITE
required: false
description: The test name or tag(s) (pipe-separated, e.g. `@ccloud|@direct`) to run. If not specified, will run all E2E tests.
default_value: "@smoke|@topic-message-viewer|@evolve-schema|@produce-message-to-topic|@project-scaffolding|@flink-statements|@direct-connection-crud|@flink-artifacts"
default_value: "@smoke|@topic-message-viewer|@evolve-schema|@produce-message-to-topic|@project-scaffolding|@flink-statements|@direct-connection-crud|@flink-artifacts|@flink-udfs"
- name: PLATFORM
required: true
options:
Expand Down Expand Up @@ -119,7 +119,7 @@ semaphore:
- name: TEST_SUITE
required: true
description: The test tag(s) (pipe-separated, e.g. `@smoke|@topic-message-viewer`) to run. By default, runs the smoke tests and critical user flow tests.
default_value: "@smoke|@topic-message-viewer|@evolve-schema|@produce-message-to-topic|@project-scaffolding|@flink-statements|@direct-connection-crud|@flink-artifacts"
default_value: "@smoke|@topic-message-viewer|@evolve-schema|@produce-message-to-topic|@project-scaffolding|@flink-statements|@direct-connection-crud|@flink-artifacts|@flink-udfs"
- name: PLATFORM
required: true
options:
Expand Down
1 change: 1 addition & 0 deletions src/models/flinkUDF.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ export class FlinkUdfTreeItem extends TreeItem {
this.id = resource.id;
this.resource = resource;
this.contextValue = `${resource.connectionType.toLowerCase()}-flink-udf`;
this.accessibilityInformation = { label: `Flink UDF: ${resource.name}` };

this.description = `${resource.parametersSignature} → ${formatSqlType(resource.returnType)}`;
this.tooltip = createFlinkUdfToolTip(resource);
Expand Down
63 changes: 63 additions & 0 deletions tests/e2e/baseTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import { stubAllDialogs, stubDialog } from "electron-playwright-helpers";
import { createWriteStream, existsSync, mkdtempSync, readFileSync } from "fs";
import { tmpdir } from "os";
import path from "path";
import { fileURLToPath } from "url";
import { DEBUG_LOGGING_ENABLED } from "./constants";
import { Notification } from "./objects/notifications/Notification";
import { NotificationArea } from "./objects/notifications/NotificationArea";
import { Quickpick } from "./objects/quickInputs/Quickpick";
import { FlinkDatabaseView, SelectFlinkDatabase } from "./objects/views/FlinkDatabaseView";
import {
DEFAULT_CCLOUD_TOPIC_REPLICATION_FACTOR,
SelectKafkaCluster,
Expand All @@ -17,6 +19,7 @@ import {
import type { CCloudConnectionItem } from "./objects/views/viewItems/CCloudConnectionItem";
import type { DirectConnectionItem } from "./objects/views/viewItems/DirectConnectionItem";
import type { LocalConnectionItem } from "./objects/views/viewItems/LocalConnectionItem";
import type { ArtifactConfig } from "./types/artifact";
import type { DirectConnectionOptions, LocalConnectionOptions } from "./types/connection";
import { ConnectionType, FormConnectionType, SupportedAuthType } from "./types/connection";
import type { TopicConfig } from "./types/topic";
Expand Down Expand Up @@ -98,6 +101,16 @@ interface VSCodeFixtures {
* `name` for tests to reference.
*/
topic: string;

/**
* Configuration options for creating an artifact with the {@linkcode artifact} fixture.
*/
artifactConfig: ArtifactConfig | undefined;
/**
* Set up a Flink artifact based on the {@linkcode artifactConfig} option and return the
* artifact `name` for tests to reference. Tears down the artifact after the test completes.
*/
artifact: string;
}

export const test = testBase.extend<VSCodeFixtures>({
Expand Down Expand Up @@ -352,6 +365,56 @@ export const test = testBase.extend<VSCodeFixtures>({
await topicsView.loadTopics(connectionType, SelectKafkaCluster.FromResourcesView, clusterLabel);
await topicsView.deleteTopic(topicName);
},

// no default value, must be provided by test
artifactConfig: [undefined, { option: true }],

artifact: async ({ electronApp, page, connectionItem, artifactConfig }, use) => {
if (!artifactConfig) {
throw new Error("artifactConfig must be set, like `test.use({ artifactConfig: {} })`");
}

await expect(connectionItem.locator).toHaveAttribute("aria-expanded", "true");

const entrypoint = artifactConfig.entrypoint ?? SelectFlinkDatabase.FromDatabaseViewButton;
const jarPath =
artifactConfig.jarPath ??
path.join(
path.dirname(fileURLToPath(import.meta.url)),
"..",
"fixtures",
"flink-artifacts",
"udfs-simple.jar",
);

const artifactsView = new FlinkDatabaseView(page);

if (artifactConfig.provider && artifactConfig.region) {
await artifactsView.selectKafkaClusterByProviderRegion(
artifactConfig.provider,
artifactConfig.region,
);
} else {
await artifactsView.ensureExpanded();
await artifactsView.loadArtifacts(entrypoint);
}

// track the artifact name across try/finally so cleanup runs even if a
// post-upload setup step throws before use()
let artifactName: string | undefined;
try {
artifactName = await artifactsView.uploadFlinkArtifact(electronApp, jarPath);
await artifactsView.waitForUploadSuccess();
await use(artifactName);
} finally {
if (artifactName) {
await openConfluentSidebar(page);
await artifactsView.ensureExpanded();
// deleteFlinkArtifact searches internally; expanding here could be hidden by a prior search
await artifactsView.deleteFlinkArtifact(artifactName);
}
}
},
});

/**
Expand Down
74 changes: 71 additions & 3 deletions tests/e2e/objects/views/FlinkDatabaseView.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ export class FlinkDatabaseView extends SearchableView {
return this.treeItems.and(this.page.locator('[aria-label^="Flink Artifact: "]'));
}

/** Get the UDFs container item. */
get udfsContainer(): Locator {
return this.treeItems.filter({ hasText: /^UDFs/ }).first();
}

/** UDF items based on their accessibilityInformation label. */
get udfs(): Locator {
return this.treeItems.and(this.page.locator('[aria-label^="Flink UDF: "]'));
}

/**
* Load the Flink Artifacts view by selecting a Kafka cluster as the Flink database,
* using the specified entrypoint.
Expand Down Expand Up @@ -204,6 +214,63 @@ export class FlinkDatabaseView extends SearchableView {
await this.expandContainer(this.artifactsContainer);
}

/** Expand the UDFs container to show any available UDF tree items. */
async expandUdfsContainer(): Promise<void> {
await this.expandContainer(this.udfsContainer);
}

/**
* Run the "Register As UDF" guided flow for the given artifact, providing `className` and
* `functionName` to the series of quickinput boxes.
*/
async startGuidedUdfCreation(
artifactName: string,
className: string,
functionName: string,
): Promise<void> {
await this.expandArtifactsContainer();
const artifactLocator = this.artifacts.filter({ hasText: artifactName });
await expect(artifactLocator).toHaveCount(1);
const artifactItem = new ViewItem(this.page, artifactLocator.first());
await artifactItem.rightClickContextMenuAction("Register As UDF");

const classNameInput = new InputBox(this.page);
await expect(classNameInput.locator).toBeVisible();
await classNameInput.input.fill(className);
await classNameInput.confirm();

const functionNameInput = new InputBox(this.page);
await expect(functionNameInput.locator).toBeVisible();
await functionNameInput.input.fill(functionName);
await functionNameInput.confirm();
}

/** Open the "Register With Flink SQL" document for the given artifact. */
async openUdfRegistrationDocument(artifactName: string): Promise<void> {
await this.expandArtifactsContainer();
const artifactLocator = this.artifacts.filter({ hasText: artifactName });
await expect(artifactLocator).toHaveCount(1);
const artifactItem = new ViewItem(this.page, artifactLocator.first());
await artifactItem.rightClickContextMenuAction("Register With Flink SQL");
}

/** Delete a Flink UDF via its tree-item context menu. */
async deleteFlinkUdf(functionName: string): Promise<void> {
// scope to UDF tree items so the search result can't match a same-named item from a
// different container (e.g. an artifact reusing the suffix)
const udfLocator = await this.getItemByLabel(functionName, this.udfs);
const udfItem = new ViewItem(this.page, udfLocator);
await udfItem.locator.scrollIntoViewIfNeeded();
await expect(udfItem.locator).toBeVisible();

await udfItem.rightClickContextMenuAction("Delete UDF");
const notificationArea = new NotificationArea(this.page);
const successNotifications = notificationArea.infoNotifications.filter({
hasText: "deleted successfully",
});
await expect(successNotifications).not.toHaveCount(0);
}

/**
* Click the upload button on the Artifacts container to initiate the artifact upload flow.
*/
Expand Down Expand Up @@ -284,9 +351,10 @@ export class FlinkDatabaseView extends SearchableView {
* @param artifactName - The name of the artifact to delete
*/
async deleteFlinkArtifact(artifactName: string): Promise<void> {
const artifactLocator = this.artifacts.filter({ hasText: artifactName });
await expect(artifactLocator).toHaveCount(1);
const artifactItem = new ViewItem(this.page, artifactLocator.first());
// scope to artifact tree items so the search result can't match a same-named item from a
// different container (e.g. a UDF reusing the suffix)
const artifactLocator = await this.getItemByLabel(artifactName, this.artifacts);
const artifactItem = new ViewItem(this.page, artifactLocator);
await artifactItem.locator.scrollIntoViewIfNeeded();
await expect(artifactItem.locator).toBeVisible();

Expand Down
36 changes: 29 additions & 7 deletions tests/e2e/specs/flinkArtifact.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,23 @@ test.describe("Flink Artifacts", { tag: [Tag.CCloud, Tag.FlinkArtifacts] }, () =
await expect(connectionItem.locator).toHaveAttribute("aria-expanded", "true");
});

test.afterEach(async () => {
test.afterEach(async ({ page }) => {
// server-side cleanup: if the happy-path test didn't reach its inline delete
// (e.g. an earlier assertion failed), drop the orphaned artifact here so
// reruns don't pile up dangling artifacts on the cluster. Best-effort: a
// failed cleanup must not mask the original test failure, hence the catch.
if (uploadedArtifactName) {
try {
const artifactsView = new FlinkDatabaseView(page);
await openConfluentSidebar(page);
await artifactsView.ensureExpanded();
await artifactsView.deleteFlinkArtifact(uploadedArtifactName);
} catch (err) {
console.warn(`failed to clean up orphaned artifact "${uploadedArtifactName}":`, err);
}
uploadedArtifactName = undefined;
}

// Only delete temporary test files, not permanent fixtures
const permanentFixture = path.join(fixturesDir, "udfs-simple.jar");
if (artifactPath && existsSync(artifactPath) && artifactPath !== permanentFixture) {
Expand All @@ -36,6 +52,8 @@ test.describe("Flink Artifacts", { tag: [Tag.CCloud, Tag.FlinkArtifacts] }, () =

/** The path to an artifact created by the test suite, to be cleaned up in afterEach. */
let artifactPath: string | undefined;
/** The name of an artifact uploaded to CCloud, tracked so afterEach can clean it up if a happy-path test fails before its inline deletion runs. */
let uploadedArtifactName: string | undefined;

const fixturesDir = path.join(__dirname, "..", "..", "fixtures", "flink-artifacts");

Expand Down Expand Up @@ -83,7 +101,7 @@ test.describe("Flink Artifacts", { tag: [Tag.CCloud, Tag.FlinkArtifacts] }, () =

await artifactsView.ensureExpanded();
await artifactsView.loadArtifacts(entrypoint);
const uploadedArtifactName = await startUploadFlow(
const artifactName = await startUploadFlow(
entrypoint,
page,
electronApp,
Expand All @@ -92,22 +110,26 @@ test.describe("Flink Artifacts", { tag: [Tag.CCloud, Tag.FlinkArtifacts] }, () =
region,
artifactPath,
);
// record the upload so afterEach can clean it up if the rest of the
// test throws before the inline deletion below
uploadedArtifactName = artifactName;

await artifactsView.waitForUploadSuccess();
const notificationArea = new NotificationArea(page);
const successNotifications = notificationArea.infoNotifications.filter({
hasText: "uploaded successfully",
});
await expect(successNotifications.first()).toBeVisible();
const artifactViewItem = await artifactsView.getDatabaseResourceByLabel(
uploadedArtifactName,
artifactName,
artifactsView.artifactsContainer,
);

await expect(artifactViewItem).toBeVisible();
await artifactsView.deleteFlinkArtifact(uploadedArtifactName);
await expect(artifactsView.artifacts.filter({ hasText: uploadedArtifactName })).toHaveCount(
0,
);
await artifactsView.deleteFlinkArtifact(artifactName);
await expect(artifactsView.artifacts.filter({ hasText: artifactName })).toHaveCount(0);
// inline deletion succeeded, so afterEach has no server-side work to do
uploadedArtifactName = undefined;
});

test(`should fail to upload a jar exceeding the file limit [${provider}/${region}] - ${testName}`, async ({
Expand Down
88 changes: 88 additions & 0 deletions tests/e2e/specs/flinkUdf.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { expect } from "@playwright/test";
import { test } from "../baseTest";
import { TextDocument } from "../objects/editor/TextDocument";
import { NotificationArea } from "../objects/notifications/NotificationArea";
import { FlinkDatabaseView } from "../objects/views/FlinkDatabaseView";
import { Tag } from "../tags";
import { ConnectionType } from "../types/connection";
import { randomHexString } from "../utils/strings";

test.describe("Flink UDFs", { tag: [Tag.CCloud, Tag.FlinkUDFs] }, () => {
test.use({ connectionType: ConnectionType.Ccloud });

// TODO: add GCP, see https://github.com/confluentinc/vscode/issues/2817
const providers = [
{ provider: "AWS", region: "us-east-2" },
{ provider: "AZURE", region: "eastus" },
];

for (const { provider, region } of providers) {
test.describe(provider, () => {
test.use({ artifactConfig: { provider, region } });

test("should create a UDF via the guided flow", async ({ page, artifact }) => {
const flinkDatabaseView = new FlinkDatabaseView(page);
const functionName = `test_udf_${randomHexString(6)}`;

await flinkDatabaseView.startGuidedUdfCreation(
artifact,
"io.confluent.udf.examples.scalar.SumScalarFunction",
functionName,
);

const notificationArea = new NotificationArea(page);
const successNotifications = notificationArea.infoNotifications.filter({
hasText: "function created successfully",
});
await expect(successNotifications.first()).toBeVisible({ timeout: 60_000 });

// search auto-expands the matching container, so we don't need to expand UDFs manually
const udfItem = await flinkDatabaseView.getItemByLabel(functionName);
await expect(udfItem).toBeVisible();
});

test("should open a Flink SQL document with the appropriate content", async ({
page,
artifact,
}) => {
const flinkDatabaseView = new FlinkDatabaseView(page);

await flinkDatabaseView.openUdfRegistrationDocument(artifact);

// the registration command opens a fresh untitled doc with snippets
const registrationDoc = new TextDocument(page, "Untitled-1");
await expect(registrationDoc.locator).toBeVisible();
await expect(registrationDoc.editorContent).toContainText(/CREATE\s+FUNCTION/);
await expect(registrationDoc.editorContent).toContainText(/USING\s+JAR/);
await expect(registrationDoc.editorContent).toContainText("confluent-artifact://");
// not testing the statement submission behavior here since those are covered by the
// @flink-statements tests, and the document is just a template with snippet placeholders
});

test("should delete a UDF", async ({ page, artifact }) => {
const flinkDatabaseView = new FlinkDatabaseView(page);
const functionName = `test_udf_delete_${randomHexString(6)}`;

await flinkDatabaseView.startGuidedUdfCreation(
artifact,
"io.confluent.udf.examples.scalar.ConcatScalarFunction",
functionName,
);

const notificationArea = new NotificationArea(page);
const createNotifications = notificationArea.infoNotifications.filter({
hasText: "function created successfully",
});
await expect(createNotifications.first()).toBeVisible({ timeout: 60_000 });

const udfItemBefore = await flinkDatabaseView.getItemByLabel(functionName);
await expect(udfItemBefore).toBeVisible();

await flinkDatabaseView.deleteFlinkUdf(functionName);
// search filter from deleteFlinkUdf is still applied, so the deleted UDF drops out
const udfItemAfter = flinkDatabaseView.udfs.filter({ hasText: functionName });
await expect(udfItemAfter).toHaveCount(0);
});
});
}
});
2 changes: 2 additions & 0 deletions tests/e2e/tags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ export enum Tag {
DirectConnectionCRUD = "@direct-connection-crud",
/** Tests that upload and delete Flink artifacts. */
FlinkArtifacts = "@flink-artifacts",
/** Tests that create and delete Flink UDFs. */
FlinkUDFs = "@flink-udfs",

// Resource-/fixture-specific tags

Expand Down
Loading