Skip to content

Commit d8f9b60

Browse files
committed
Formalize job naming
Signed-off-by: Dominik Dębowczyk <dominik.debowczyk@getindata.com>
1 parent 0e10834 commit d8f9b60

3 files changed

Lines changed: 117 additions & 13 deletions

File tree

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
/* Copyright 2018-2025 contributors to the OpenLineage project
3+
/* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.openlineage.client.job;
7+
8+
import java.util.Objects;

import javax.annotation.Nullable;
import lombok.Builder;
10+
11+
/**
12+
* Utility class for constructing job names according to the OpenLineage job naming conventions.
13+
*
14+
* <p>Supported job types include:
15+
*
16+
* <ul>
17+
* <li>Spark: {@code {appName}.{command}.{table}}
18+
* </ul>
19+
*/
20+
@SuppressWarnings("PMD.MissingStaticMethodInNonInstantiatableClass")
21+
public class Naming {
22+
23+
private Naming() {}
24+
25+
/** Interface representing a job name that can be resolved to a string. */
26+
public interface JobName {
27+
/**
28+
* Returns the formatted job name.
29+
*
30+
* @return a string representing the job name.
31+
*/
32+
String getName();
33+
}
34+
35+
/** Represents a Spark job name using the format: {@code {appName}.{command}.{table}}. */
36+
@Builder
37+
public static class Spark implements JobName {
38+
private final String appName;
39+
private final String command;
40+
private final String table;
41+
42+
/**
43+
* Constructs a new {@link Spark} job name.
44+
*
45+
* @param appName the Spark application name; must be non-null and non-empty
46+
* @param command the command or function being run; must be non-null and non-empty
47+
* @param table the target table; must be non-null and non-empty
48+
* @throws IllegalArgumentException if any argument is empty
49+
* @throws NullPointerException if any argument is null
50+
*/
51+
public Spark(String appName, @Nullable String command, @Nullable String table) {
52+
if (appName.isEmpty()) {
53+
throw new IllegalArgumentException("appName, command, and table must be non-empty");
54+
}
55+
this.appName = appName;
56+
this.command = command;
57+
this.table = table;
58+
}
59+
60+
/**
61+
* {@inheritDoc}
62+
*
63+
* @return the job name in the format: {@code {appName}.{command}.{table}}
64+
*/
65+
@Override
66+
public String getName() {
67+
return appName + (command != null ? "." + command : "") + (table != null ? "." + table : "");
68+
}
69+
}
70+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
/* Copyright 2018-2025 contributors to the OpenLineage project
3+
/* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.openlineage.client.job;
7+
8+
import static org.junit.jupiter.api.Assertions.assertEquals;
9+
import static org.junit.jupiter.api.Assertions.assertThrows;
10+
11+
import org.junit.jupiter.api.Test;
12+
13+
class NamingTest {
14+
15+
@Test
16+
void testSparkJobName() {
17+
Naming.Spark spark =
18+
Naming.Spark.builder()
19+
.appName("my_awesome_app")
20+
.command("execute_insert_into_hive_table")
21+
.table("mydb_mytable")
22+
.build();
23+
24+
assertEquals("my_awesome_app.execute_insert_into_hive_table.mydb_mytable", spark.getName());
25+
}
26+
27+
@Test
28+
void testSparkEmptyFieldsThrowException() {
29+
assertThrows(IllegalArgumentException.class, () -> new Naming.Spark("", "cmd", "table"));
30+
}
31+
32+
@Test
33+
void testSparkNullFieldsThrowException() {
34+
assertThrows(
35+
NullPointerException.class,
36+
() -> Naming.Spark.builder().appName(null).command("cmd").table("table").build());
37+
}
38+
}

integration/spark/shared/src/main/java/io/openlineage/spark/api/naming/JobNameBuilder.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import static io.openlineage.spark.agent.util.DatabricksUtils.prettifyDatabricksJobName;
99

10+
import io.openlineage.client.job.Naming;
1011
import io.openlineage.spark.agent.util.DatabricksUtils;
1112
import io.openlineage.spark.api.JobNameSuffixProvider;
1213
import io.openlineage.spark.api.OpenLineageContext;
@@ -34,32 +35,27 @@ public static String build(OpenLineageContext context) {
3435
}
3536

3637
Optional<SparkConf> sparkConf = context.getSparkContext().map(SparkContext::getConf);
37-
StringBuilder jobNameBuilder =
38-
new StringBuilder(applicationJobNameResolver.getJobName(context));
38+
Naming.Spark.SparkBuilder jobNameBuilder =
39+
Naming.Spark.builder().appName(applicationJobNameResolver.getJobName(context));
3940

4041
sparkNodeName(context)
4142
.ifPresent(
4243
nodeName ->
43-
jobNameBuilder
44-
.append(JOB_NAME_PARTS_SEPARATOR)
45-
.append(replaceDots(context, NameNormalizer.normalize(nodeName))));
44+
jobNameBuilder.command(replaceDots(context, NameNormalizer.normalize(nodeName))));
4645

4746
String jobName;
4847
if (context.getOpenLineageConfig().getJobName() != null
4948
&& !context.getOpenLineageConfig().getJobName().getAppendDatasetName()) {
5049
// no need to append output dataset name
51-
jobName = jobNameBuilder.toString();
50+
jobName = jobNameBuilder.build().getName();
5251
} else {
5352
// append output dataset as job suffix
54-
jobNameBuilder.append(
53+
jobNameBuilder.table(
5554
getJobSuffix(context)
56-
.map(
57-
suffix ->
58-
JOB_NAME_PARTS_SEPARATOR
59-
+ suffix.replace(JOB_NAME_PARTS_SEPARATOR, INNER_SEPARATOR))
60-
.orElse(""));
55+
.map(suffix -> suffix.replace(JOB_NAME_PARTS_SEPARATOR, INNER_SEPARATOR))
56+
.orElse(null));
6157

62-
jobName = jobNameBuilder.toString();
58+
jobName = jobNameBuilder.build().getName();
6359
if (sparkConf.isPresent() && DatabricksUtils.isRunOnDatabricksPlatform(sparkConf.get())) {
6460
jobName = prettifyDatabricksJobName(sparkConf.get(), jobName);
6561
}

0 commit comments

Comments (0)