diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java index 5ec1adb57a..476b36e82f 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java @@ -17,15 +17,79 @@ package org.apache.unomi.router.api; /** - * Created by amidani on 18/10/2017. + * Facade for the Apache Camel runtime used by the Unomi Router extension. + * Implementations manage dynamic routes for profile import (from sources such as Kafka or files) + * and profile export (collection and producer pipelines), and expose a minimal API so callers do not + * depend on Camel types unless they choose to. + * + *

Key responsibilities: + *

+ *

+ * + *

Typical usage: + *

+ *

+ * + * @since 1.0 */ public interface IRouterCamelContext { + /** + * Stops and removes an existing Camel route by id, if it is currently registered in the context. + * + * @param routeId Camel route identifier (usually aligned with import/export configuration id) + * @param fireEvent when {@code true}, signals that router lifecycle events may be emitted; the concrete + * implementation defines whether events are fired (reserved hook for observability) + * @throws Exception if Camel fails to remove the route definition + */ void killExistingRoute(String routeId, boolean fireEvent) throws Exception; + /** + * Refreshes the profile import reader route for the given configuration: removes any existing route with the + * same id, loads the {@link org.apache.unomi.router.api.ImportConfiguration}, and—for recurrent configs— + * registers a new route built from current settings. + * + * @param configId identifier of the import configuration whose reader route should be updated + * @param fireEvent when {@code true}, signals that router lifecycle events may be emitted after the update + * @throws Exception if route removal or registration fails + */ void updateProfileImportReaderRoute(String configId, boolean fireEvent) throws Exception; + /** + * Refreshes the profile export reader (collect) route for the given configuration: removes any existing route + * with the same id, loads the {@link org.apache.unomi.router.api.ExportConfiguration}, and—for recurrent + * configs—registers a new collect route built from current settings. + * + * @param configId identifier of the export configuration whose reader route should be updated + * @param fireEvent when {@code true}, signals that router lifecycle events may be emitted after the update + * @throws Exception if route removal or registration fails + */ void updateProfileExportReaderRoute(String configId, boolean fireEvent) throws Exception; + /** + * Enables or disables Camel route tracing on the underlying {@code CamelContext} for debugging (message flow, + * exchanges). Intended for diagnostics in development or incident analysis; may have performance impact when on. + * + * @param tracing {@code true} to enable Camel tracing, {@code false} to disable + */ void setTracing(boolean tracing); + + /** + * Returns the underlying Camel context instance. + * The API uses {@link Object} so consumers of this module are not required to depend on Camel at compile time. + * Callers that ship Camel may cast to {@code org.apache.camel.CamelContext}. + * + * @return the Camel context instance, or {@code null} if not initialized or not exposed + */ + default Object getCamelContext() { + return null; + } } diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportExportConfiguration.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportExportConfiguration.java index 10209bd154..7c9b5e2382 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportExportConfiguration.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportExportConfiguration.java @@ -16,15 +16,52 @@ */ package org.apache.unomi.router.api; - import org.apache.unomi.api.Item; +import org.apache.unomi.api.Item; - import java.util.ArrayList; - import java.util.HashMap; - import java.util.List; - import java.util.Map; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** - * Created by amidani on 21/06/2017. + * Base configuration class for import and export operations in Apache Unomi. + * This class serves as the foundation for both ImportConfiguration and ExportConfiguration, + * providing common configuration properties and behaviors needed for data transfer operations. + * + *

Key features and responsibilities: + *

+ *

+ * + *

Usage in Unomi: + *

+ *

+ * + *

Configuration properties include: + *

+ *

+ * + * @see org.apache.unomi.router.api.services.ImportExportConfigurationService + * @since 1.0 */ public class ImportExportConfiguration extends Item { @@ -53,28 +90,32 @@ public void setProperty(String name, Object value) { } /** - * Retrieves the name of the import configuration - * @return the name of the import configuration + * Retrieves the display name of this configuration. + * + * @return the name of this configuration */ public String getName() { return this.name; } /** - * Sets the name of the import configuration - * @param name the name of the import configuration + * Sets the display name of this configuration. + * + * @param name the name of this configuration */ public void setName(String name) { this.name = name; } /** - * Retrieves the description of the import configuration - * @return the description of the import configuration + * Retrieves the human-readable description of this configuration. + * + * @return the description of this configuration */ public String getDescription() { return this.description; } /** - * Sets the description of the import configuration - * @param description the description of the import configuration + * Sets the human-readable description of this configuration. + * + * @param description the description of this configuration */ public void setDescription(String description) { this.description = description; @@ -82,14 +123,18 @@ public void setDescription(String description) { /** - * Retrieves the config type of the import configuration - * @return the config type of the import configuration + * Returns the scheduling mode for this configuration ({@code recurrent} or {@code oneshot}). + * + * @return {@link RouterConstants#IMPORT_EXPORT_CONFIG_TYPE_RECURRENT} or + * {@link RouterConstants#IMPORT_EXPORT_CONFIG_TYPE_ONESHOT} */ public String getConfigType() { return this.configType; } /** - * Sets the config type of the import configuration - * @param configType the config type of the import configuration + * Sets the scheduling mode for this configuration. + * + * @param configType {@link RouterConstants#IMPORT_EXPORT_CONFIG_TYPE_RECURRENT} or + * {@link RouterConstants#IMPORT_EXPORT_CONFIG_TYPE_ONESHOT} */ public void setConfigType(String configType) { this.configType = configType; @@ -106,45 +151,45 @@ public Object getProperty(String name) { } /** - * Retrieves a Map of all property name - value pairs for this import configuration. + * Retrieves a map of all property name/value pairs for this configuration. * - * @return a Map of all property name - value pairs for this import configuration + * @return a map of all property name/value pairs for this configuration */ public Map getProperties() { return properties; } /** - * Retrieves the import configuration active flag. + * Returns whether this configuration is active (eligible for scheduled or triggered runs). * - * @return true if the import configuration is active false if not + * @return {@code true} if this configuration is active, {@code false} otherwise */ public boolean isActive() { return this.active; } /** - * Sets the active flag true/false. + * Sets whether this configuration is active. * - * @param active a boolean to set to active or inactive the import configuration + * @param active {@code true} to activate, {@code false} to deactivate */ public void setActive(boolean active) { this.active = active; } /** - * Retrieves the import configuration status for last execution. + * Retrieves the status of the last execution for this configuration. * - * @return status of the last execution + * @return status of the last execution, or {@code null} if none */ public String getStatus() { return this.status; } /** - * Sets status of the last execution. + * Sets the status of the last execution for this configuration. * - * @param status of the last execution + * @param status the status of the last execution */ public void setStatus(String status) { this.status = status; @@ -159,11 +204,16 @@ public String getColumnSeparator() { } /** - * Sets the column separator. - * @param columnSeparator property used to specify a line separator. Defaults to ',' + * Sets the column separator used when reading or writing delimited text (typically CSV). + * + * @param columnSeparator the column delimiter; must be exactly one character when non-null + * @throws IllegalArgumentException if {@code columnSeparator} is empty or longer than one character */ public void setColumnSeparator(String columnSeparator) { - if(this.columnSeparator !=null) { + if (columnSeparator != null) { + if (columnSeparator.length() != 1) { + throw new IllegalArgumentException("columnSeparator must be exactly one character"); + } this.columnSeparator = columnSeparator; } } @@ -187,9 +237,9 @@ public void setLineSeparator(String lineSeparator) { } /** - * Gets the multi value separator. + * Returns the separator used between multiple values within a single field. * - * @return multiValueSeparator multi value separator + * @return the multi-value separator (often {@code ";"}) */ public String getMultiValueSeparator() { return this.multiValueSeparator; @@ -206,9 +256,9 @@ public void setMultiValueSeparator(String multiValueSeparator) { } /** - * Gets the multi value delimiter. + * Returns the delimiter wrapping multi-valued fields when serialized. * - * @return multiValueDelimiter multi value delimiter + * @return the multi-value delimiter (may be empty when not used) */ public String getMultiValueDelimiter() { return this.multiValueDelimiter; @@ -225,8 +275,9 @@ public void setMultiValueDelimiter(String multiValueDelimiter) { } /** - * Retrieves the executions - * @return executions + * Returns the history of execution records for this configuration (timestamps, counts, errors, etc.). + * + * @return the list of execution maps; may be empty */ public List> getExecutions() { return this.executions; @@ -234,8 +285,9 @@ public List> getExecutions() { /** - * Sets the executions - * @param executions executions + * Replaces the execution history for this configuration. + * + * @param executions the new execution history list */ public void setExecutions(List> executions) { this.executions = executions; diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java index 30e40e0c81..957ac88e9e 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java @@ -22,56 +22,120 @@ import java.util.List; /** - * An extension of {@link Profile} to handle merge strategy and deletion when importing profiles + * A specialized Profile class designed for import operations in Apache Unomi. + * This class extends the standard {@link Profile} with additional properties and behaviors + * specifically needed during the profile import process. + * + *

Key features: + *

    + *
  • Controls which properties should be overwritten during import
  • + *
  • Specifies the property used for merging with existing profiles
  • + *
  • Handles profile deletion flags
  • + *
  • Controls merge vs full-replace behavior for existing profiles (see {@link #isOverwriteExistingProfiles()})
  • + *
+ *

+ * + *

Usage in Unomi: + *

    + *
  • Used by import processors to handle profile data
  • + *
  • Consumed by ProfileImportService for import operations
  • + *
  • Supports different import strategies (merge/overwrite/delete)
  • + *
+ *

+ * + * @see Profile + * @see org.apache.unomi.router.api.services.ProfileImportService + * @since 1.0 */ public class ProfileToImport extends Profile { + /** List of property names that should be overwritten during import */ private List propertiesToOverwrite; + + /** Property used to identify existing profiles for merging */ private String mergingProperty; + + /** Flag indicating if this profile should be deleted */ private boolean profileToDelete; - private boolean overwriteExistingProfiles; + /** Flag controlling whether to overwrite existing profile data */ + private boolean overwriteExistingProfiles; + /** + * Gets the list of properties that should be overwritten during import. + * These properties will be updated even if they already exist in the target profile. + * + * @return list of property names to overwrite + */ public List getPropertiesToOverwrite() { return this.propertiesToOverwrite; } + /** + * Sets the list of properties that should be overwritten during import. + * + * @param propertiesToOverwrite list of property names that should be overwritten + */ public void setPropertiesToOverwrite(List propertiesToOverwrite) { this.propertiesToOverwrite = propertiesToOverwrite; } + /** + * Checks if this profile is marked for deletion. + * When true, the matching profile in the system will be deleted rather than updated. + * + * @return true if the profile should be deleted, false otherwise + */ public boolean isProfileToDelete() { return this.profileToDelete; } + /** + * Sets whether this profile should be deleted during import. + * + * @param profileToDelete true to mark the profile for deletion, false otherwise + */ public void setProfileToDelete(boolean profileToDelete) { this.profileToDelete = profileToDelete; } + /** + * Indicates whether selective property updates are enabled for existing profiles. + * When {@code true} and {@link #getPropertiesToOverwrite()} is non-empty, only the listed properties + * are updated on a matching profile. Otherwise the entire properties map is replaced. + * + * @return {@code true} to apply selective overwrites when {@code propertiesToOverwrite} is set + */ public boolean isOverwriteExistingProfiles() { return this.overwriteExistingProfiles; } /** - * Sets the overwriteExistingProfiles flag. - * @param overwriteExistingProfiles flag used to specify if we want to overwrite existing profiles + * Sets whether selective property updates are enabled for existing profiles. + * + * @param overwriteExistingProfiles {@code true} to update only {@link #getPropertiesToOverwrite()} + * when that list is non-empty; {@code false} replaces the full properties map */ public void setOverwriteExistingProfiles(boolean overwriteExistingProfiles) { this.overwriteExistingProfiles = overwriteExistingProfiles; } + /** + * Gets the property name used for identifying existing profiles during merge operations. + * This property is used to match imported profiles with existing ones in the system. + * + * @return the name of the property used for profile matching + */ public String getMergingProperty() { return this.mergingProperty; } /** - * Sets the merging property. - * @param mergingProperty property used to check if the profile exist when merging + * Sets the property name used for identifying existing profiles during merge operations. + * + * @param mergingProperty the name of the property to use for profile matching */ public void setMergingProperty(String mergingProperty) { this.mergingProperty = mergingProperty; } - - - } diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/exceptions/BadProfileDataFormatException.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/exceptions/BadProfileDataFormatException.java index 85cf5ea807..c4156e929b 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/exceptions/BadProfileDataFormatException.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/exceptions/BadProfileDataFormatException.java @@ -17,18 +17,55 @@ package org.apache.unomi.router.api.exceptions; /** - * Created by amidani on 13/06/2017. + * Exception thrown when profile import line data cannot be parsed or converted during import processing. + * Indicates issues with CSV structure, column mapping, or property value conversion on an import line. + * + *

Common scenarios where this exception is thrown: + *

    + *
  • Invalid CSV format or column count mismatch on an import line
  • + *
  • Missing required profile fields in the mapping
  • + *
  • Property value conversion failures (e.g. unsupported type for a mapped field)
  • + *
  • Malformed multi-value fields
  • + *
  • Empty lines in import files
  • + *
+ *

+ * + *

Usage in Unomi: + *

    + *
  • Thrown by import line processors (e.g. {@code LineSplitProcessor})
  • + *
  • Handled by import route error handlers
  • + *
+ *

+ * + * @see org.apache.unomi.router.api.ProfileToImport + * @since 1.0 */ public class BadProfileDataFormatException extends Exception { + /** + * Constructs a new exception with {@code null} as its detail message. + * The cause is not initialized. + */ public BadProfileDataFormatException() { super(); } + /** + * Constructs a new exception with the specified detail message. + * The cause is not initialized. + * + * @param message the detail message describing the cause of the exception + */ public BadProfileDataFormatException(String message) { super(message); } + /** + * Constructs a new exception with the specified detail message and cause. + * + * @param message the detail message describing the cause of the exception + * @param cause the underlying cause of the exception + */ public BadProfileDataFormatException(String message, Throwable cause) { super(message, cause); } diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java index edb103cc52..023741708f 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java @@ -24,7 +24,40 @@ import java.util.Map; /** - * A service to access and operate on {@link ImportConfiguration}s / {@link ExportConfiguration}s. + * Service interface for managing import and export configurations in Apache Unomi. + * This service provides CRUD operations for {@link ImportConfiguration}s and {@link ExportConfiguration}s, + * as well as functionality to manage the lifecycle of data transfer configurations. + * + *

Key responsibilities: + *

    + *
  • Managing the lifecycle of import/export configurations
  • + *
  • Providing CRUD operations for configurations
  • + *
  • Coordinating with Camel routes for configuration updates
  • + *
  • Tracking configuration changes that need route updates
  • + *
+ *

+ * + *

Usage in Unomi: + *

    + *
  • Used by REST endpoints to manage import/export configurations
  • + *
  • Consumed by Camel routes to get configuration updates
  • + *
  • Utilized by admin interfaces for configuration management
  • + *
+ *

+ * + *

Implementation considerations: + *

    + *
  • Implementations should handle configuration persistence
  • + *
  • Thread safety should be considered for concurrent operations
  • + *
  • Configuration changes should be properly propagated to running routes
  • + *
+ *

+ * + * @param The type of configuration (ImportConfiguration or ExportConfiguration) + * @see ImportConfiguration + * @see ExportConfiguration + * @see RouterConstants.CONFIG_CAMEL_REFRESH + * @since 1.0 */ public interface ImportExportConfigurationService { @@ -38,7 +71,7 @@ public interface ImportExportConfigurationService { /** * Retrieves the import/export configuration identified by the specified identifier. * - * @param configId the identifier of the profile to retrieve + * @param configId the identifier of the configuration to retrieve * @return the import/export configuration identified by the specified identifier or * {@code null} if no such import/export configuration exists */ @@ -61,8 +94,11 @@ public interface ImportExportConfigurationService { void delete(String configId); /** - * Used by camel route system to get the latest changes on configs and reflect changes on camel routes if necessary - * @return map of configId per operation to be done in camel + * Consumes pending configuration changes for the Camel router layer. + * Implementations typically dequeue IDs whose configurations were updated or removed so that + * routes can be refreshed accordingly. + * + * @return a map from configuration ID to the refresh operation ({@link RouterConstants.CONFIG_CAMEL_REFRESH}) */ Map consumeConfigsToBeRefresh(); } diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileExportService.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileExportService.java index dc0d81df2c..ccdc3711b6 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileExportService.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileExportService.java @@ -23,12 +23,61 @@ import java.util.Collection; /** - * Created by amidani on 30/06/2017. + * Service interface for handling the export of profiles from Apache Unomi. + * This service is responsible for extracting profiles based on segment criteria + * and converting them into the appropriate export format (e.g., CSV). + * + *

Key responsibilities: + *

    + *
  • Extracting profiles based on segment criteria
  • + *
  • Converting profiles to export format
  • + *
  • Handling data formatting and transformation
  • + *
  • Managing export file generation
  • + *
+ *

+ * + *

Usage in Unomi: + *

    + *
  • Called by export route processors to handle profile extraction
  • + *
  • Used during scheduled export operations
  • + *
  • Integrated with Unomi's segmentation system
  • + *
+ *

+ * + *

Implementation considerations: + *

    + *
  • Must handle large data sets efficiently
  • + *
  • Should implement proper error handling
  • + *
  • Must respect profile property formatting
  • + *
  • Should handle multi-valued properties
  • + *
+ *

+ * + * @see Profile + * @see ExportConfiguration + * @see PropertyType + * @since 1.0 */ public interface ProfileExportService { + /** + * Extracts profiles belonging to a specified segment and formats them for export. + * Implementations typically query profiles by segment, build CSV content (including line separators + * between rows), append an execution record to the configuration, and persist the updated configuration. + * + * @param exportConfiguration the configuration specifying export parameters and format + * @return CSV (or configured delimited) content for the extracted profiles + */ String extractProfilesBySegment(ExportConfiguration exportConfiguration); + /** + * Converts a single profile to one delimited row according to the export configuration mapping. + * Does not append line separators; callers or export routes add separators between rows. + * + * @param profile the profile to convert + * @param exportConfiguration the configuration specifying the export format + * @return one row of delimited profile data (no trailing line separator) + */ String convertProfileToCSVLine(Profile profile, ExportConfiguration exportConfiguration); } diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java index aa7d1829d7..0d008bbee7 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java @@ -21,9 +21,54 @@ import java.lang.reflect.InvocationTargetException; /** - * Created by amidani on 20/05/2017. + * Service interface for handling the import of individual profiles into Apache Unomi. + * This service is responsible for the actual processing and storage of imported profile data, + * including merging with existing profiles or creating new ones as needed. + * + *

Key responsibilities: + *

    + *
  • Processing individual profile imports
  • + *
  • Merging imported data with existing profiles
  • + *
  • Handling profile creation for new imports
  • + *
  • Managing profile deletion when specified
  • + *
+ *

+ * + *

Usage in Unomi: + *

    + *
  • Called by import route processors to handle individual profile data
  • + *
  • Used during batch import operations
  • + *
  • Integrated with Unomi's profile management system
  • + *
+ *

+ * + *

Implementation considerations: + *

    + *
  • Must handle profile merging strategies defined on {@link ProfileToImport}
  • + *
  • Should implement proper error handling
  • + *
  • Must maintain data consistency
  • + *
  • Expects property values already parsed (type conversion is done upstream, e.g. by import processors)
  • + *
+ *

+ * + * @see ProfileToImport + * @see org.apache.unomi.api.Profile + * @since 1.0 */ public interface ProfileImportService { + /** + * Processes a profile for import, handling the save, merge, or delete operation as specified. + * This method is the core functionality for profile import processing, determining whether to: + * - Create a new profile + * - Merge with an existing profile + * - Delete an existing profile + * + * @param profileToImport the profile data to be imported, containing all necessary information + * for the import operation including the operation type + * @return true if the operation was successful, false otherwise + * @throws InvocationTargetException if there is an error during property mapping + * @throws IllegalAccessException if there is an error accessing profile properties + */ boolean saveMergeDeleteImportedProfile(ProfileToImport profileToImport) throws InvocationTargetException, IllegalAccessException; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java index 1ea03eb3a6..ae31b63006 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java @@ -22,17 +22,42 @@ import java.util.List; /** - * Created by amidani on 28/06/2017. + * A bean that handles the collection of profiles based on segment criteria. + * This class provides functionality to extract profiles from Unomi's persistence + * layer based on segment membership. + * + *

Features: + *

    + *
  • Segment-based profile extraction via persistence queries
  • + *
  • Integration with Unomi's persistence service
  • + *
+ *

+ * + * @since 1.0 */ public class CollectProfileBean { private PersistenceService persistenceService; + /** + * Returns all profiles that belong to the given segment. + *

+ * Note: the current implementation may load a large result set into memory; see UNOMI-759. + *

+ * + * @param segment the segment identifier to match (stored index {@code "segments"}) + * @return profiles for that segment; may be empty, never {@code null} + */ public List extractProfileBySegment(String segment) { // TODO: UNOMI-759 avoid loading all profiles in RAM here return persistenceService.query("segments", segment,null, Profile.class); } + /** + * Sets the persistence service used for profile queries. + * + * @param persistenceService the Unomi persistence service to use + */ public void setPersistenceService(PersistenceService persistenceService) { this.persistenceService = persistenceService; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java index 4d329209d3..cfd167d04f 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java @@ -49,7 +49,25 @@ import java.util.concurrent.TimeUnit; /** - * Created by amidani on 04/05/2017. + * The main Camel context manager for the Unomi Router component. + * This class manages the lifecycle of all import and export routes, + * handles route configuration updates, and maintains the Camel context. + * + *

Features: + *

    + *
  • Initializes and manages the Camel context
  • + *
  • Sets up import and export routes
  • + *
  • Handles route configuration updates
  • + *
  • Manages route lifecycle (start/stop/update)
  • + *
  • Supports Kafka ({@link RouterConstants#CONFIG_TYPE_KAFKA}) and in-process + * {@code direct:} endpoints when configured as {@link RouterConstants#CONFIG_TYPE_NOBROKER}
  • + *
+ *

+ * + *

Dependency-injection setters on this class are intended for OSGi/Blueprint wiring and are not part of the + * {@link IRouterCamelContext} API surface.

+ * + * @since 1.0 */ public class RouterCamelContext implements IRouterCamelContext { @@ -79,8 +97,11 @@ public class RouterCamelContext implements IRouterCamelContext { private Integer configsRefreshInterval = 1000; private ScheduledFuture scheduledFuture; + /** Reserved event topic identifier for future remove notifications (not published by the current implementation). */ public static String EVENT_ID_REMOVE = "org.apache.unomi.router.event.remove"; + /** Event topic related to import lifecycle (reserved for integrations). */ public static String EVENT_ID_IMPORT = "org.apache.unomi.router.event.import"; + /** Event topic related to export lifecycle (reserved for integrations). */ public static String EVENT_ID_EXPORT = "org.apache.unomi.router.event.export"; public void setExecHistorySize(String execHistorySize) { @@ -99,10 +120,17 @@ public void setConfigSharingService(ConfigSharingService configSharingService) { this.configSharingService = configSharingService; } + /** {@inheritDoc} */ + @Override public void setTracing(boolean tracing) { - camelContext.setTracing(true); + camelContext.setTracing(tracing); } + /** + * Initializes the scheduler, shared config properties, the Camel context, and import/export routes. + * + * @throws Exception if Camel or service setup fails + */ public void init() throws Exception { LOGGER.info("Initialize Camel Context..."); scheduler = Executors.newSingleThreadScheduledExecutor(); @@ -116,6 +144,11 @@ public void init() throws Exception { LOGGER.info("Camel Context initialized successfully."); } + /** + * Stops the configuration refresh scheduler and shuts down the Camel context (all routes and components). + * + * @throws Exception if Camel shutdown fails + */ public void destroy() throws Exception { scheduledFuture.cancel(true); if (scheduler != null) { @@ -223,6 +256,8 @@ private void initCamel() throws Exception { camelContext.start(); } + /** {@inheritDoc} */ + @Override public void killExistingRoute(String routeId, boolean fireEvent) throws Exception { //Active routes Route route = camelContext.getRoute(routeId); @@ -234,6 +269,8 @@ public void killExistingRoute(String routeId, boolean fireEvent) throws Exceptio } } + /** {@inheritDoc} */ + @Override public void updateProfileImportReaderRoute(String configId, boolean fireEvent) throws Exception { killExistingRoute(configId, false); @@ -255,6 +292,8 @@ public void updateProfileImportReaderRoute(String configId, boolean fireEvent) t } } + /** {@inheritDoc} */ + @Override public void updateProfileExportReaderRoute(String configId, boolean fireEvent) throws Exception { killExistingRoute(configId, false); @@ -275,6 +314,11 @@ public void updateProfileExportReaderRoute(String configId, boolean fireEvent) t } } + /** + * {@inheritDoc} + *

The concrete type is {@link org.apache.camel.CamelContext}; callers may narrow the reference safely.

+ */ + @Override public CamelContext getCamelContext() { return camelContext; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java index d9794a8de1..7f7a7e7767 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java @@ -30,14 +30,47 @@ import java.util.Map; /** - * Created by amidani on 29/06/2017. + * A Camel processor that handles the completion of profile export routes. + * This processor updates the export configuration with execution statistics + * and manages the execution history of export operations. + * + *

The processor performs the following operations: + *

    + *
  • Records export execution statistics
  • + *
  • Updates the export configuration status
  • + *
  • Maintains execution history within configured size limits
  • + *
  • Persists updated configuration information
  • + *
+ *

+ * + * @since 1.0 */ public class ExportRouteCompletionProcessor implements Processor { private static final Logger LOGGER = LoggerFactory.getLogger(ExportRouteCompletionProcessor.class.getName()); + + /** Service for managing export configurations */ private ImportExportConfigurationService exportConfigurationService; + + /** Maximum number of execution history entries to maintain */ private int executionsHistorySize; + /** + * Processes the completion of an export route by updating its configuration and statistics. + * + *

This method: + *

    + *
  • Loads the current export configuration
  • + *
  • Creates an execution entry with timestamp and statistics
  • + *
  • Updates the configuration with execution results
  • + *
  • Maintains the execution history size limit
  • + *
  • Updates the export status to complete
  • + *
+ *

+ * + * @param exchange the Camel exchange containing export execution details + * @throws Exception if an error occurs during processing + */ @Override public void process(Exchange exchange) throws Exception { // We load the conf from ES because we are going to increment the execution number @@ -59,10 +92,20 @@ public void process(Exchange exchange) throws Exception { LOGGER.info("Processing route {} completed.", exchange.getFromRouteId()); } + /** + * Sets the service used for managing export configurations. + * + * @param exportConfigurationService the service for handling export configurations + */ public void setExportConfigurationService(ImportExportConfigurationService exportConfigurationService) { this.exportConfigurationService = exportConfigurationService; } + /** + * Sets the maximum size of the execution history to maintain. + * + * @param executionsHistorySize the maximum number of execution entries to keep + */ public void setExecutionsHistorySize(int executionsHistorySize) { this.executionsHistorySize = executionsHistorySize; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java index 61c6ed4089..39e5a42d98 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java @@ -26,14 +26,48 @@ import org.slf4j.LoggerFactory; /** - * Created by amidani on 22/05/2017. + * A Camel processor that retrieves import configurations based on file names. + * This processor extracts the configuration ID from the filename and loads + * the corresponding import configuration for processing. + * + *

The processor expects filenames in the format: + *

configurationId.extension
+ * where the configurationId matches an existing import configuration.

+ * + *

Features: + *

    + *
  • Extracts configuration ID from filename
  • + *
  • Loads corresponding import configuration
  • + *
  • Sets configuration in exchange header for processing
  • + *
  • Handles missing configurations gracefully
  • + *
+ *

+ * + * @since 1.0 */ public class ImportConfigByFileNameProcessor implements Processor { private static final Logger LOGGER = LoggerFactory.getLogger(ImportConfigByFileNameProcessor.class.getName()); + /** Service for managing import configurations */ private ImportExportConfigurationService importConfigurationService; + /** + * Processes the exchange by loading an import configuration based on the filename. + * + *

This method: + *

    + *
  • Extracts the filename from the exchange body
  • + *
  • Parses the configuration ID from the filename
  • + *
  • Attempts to load the corresponding import configuration
  • + *
  • Sets the configuration in the exchange header if found
  • + *
  • Stops route processing if no configuration is found
  • + *
+ *

+ * + * @param exchange the Camel exchange containing the file to process + * @throws Exception if an error occurs during processing + */ @Override public void process(Exchange exchange) throws Exception { @@ -49,6 +83,11 @@ public void process(Exchange exchange) throws Exception { } } + /** + * Sets the service used for managing import configurations. + * + * @param importConfigurationService the service for handling import configurations + */ public void setImportConfigurationService(ImportExportConfigurationService importConfigurationService) { this.importConfigurationService = importConfigurationService; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java index 7554ab3fe2..e34d965b62 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java @@ -26,15 +26,51 @@ import java.util.*; /** - * Created by amidani on 14/06/2017. + * A Camel processor that handles the completion of profile import routes. + * This processor manages the final stage of import operations, collecting statistics, + * handling errors, and updating the import configuration with execution results. + * + *

The processor performs the following operations: + *

    + *
  • Collects import statistics persisted on the configuration (success and failure counts, plus error details)
  • + *
  • Manages error reporting with configurable limits
  • + *
  • Updates import configuration status
  • + *
  • Maintains execution history
  • + *
  • Handles both one-shot and recurring imports
  • + *
+ *

+ * + * @since 1.0 */ public class ImportRouteCompletionProcessor implements Processor { private static final Logger LOGGER = LoggerFactory.getLogger(ImportRouteCompletionProcessor.class.getName()); + + /** Service for managing import configurations */ private ImportExportConfigurationService importConfigurationService; + + /** Maximum number of execution history entries to maintain */ private int executionsHistorySize; + + /** Maximum number of errors to report per execution */ private int execErrReportSize; + /** + * Processes the completion of an import route by collecting statistics and updating configuration. + * + *

This method: + *

    + *
  • Identifies the import configuration (one-shot or recurring)
  • + *
  • Counts successful and failed imports (unrecognized line types are skipped and not persisted)
  • + *
  • Collects error information up to the configured limit
  • + *
  • Updates the import configuration with execution results
  • + *
  • Sets the final status based on success/failure counts
  • + *
+ *

+ * + * @param exchange the Camel exchange containing import results + * @throws Exception if an error occurs during processing + */ @Override public void process(Exchange exchange) throws Exception { String importConfigId = null; @@ -89,14 +125,29 @@ public void process(Exchange exchange) throws Exception { LOGGER.info("Processing route {} completed. completion date: {}.", exchange.getFromRouteId(), new Date()); } + /** + * Sets the service used for managing import configurations. + * + * @param importConfigurationService the service for handling import configurations + */ public void setImportConfigurationService(ImportExportConfigurationService importConfigurationService) { this.importConfigurationService = importConfigurationService; } + /** + * Sets the maximum size of the execution history to maintain. + * + * @param executionsHistorySize the maximum number of execution entries to keep + */ public void setExecutionsHistorySize(int executionsHistorySize) { this.executionsHistorySize = executionsHistorySize; } + /** + * Sets the maximum number of errors to report per execution. + * + * @param execErrReportSize the maximum number of errors to store per execution + */ public void setExecErrReportSize(int execErrReportSize) { this.execErrReportSize = execErrReportSize; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java index ecfab189f8..f1748cc0af 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java @@ -23,16 +23,43 @@ import org.apache.unomi.router.api.services.ProfileExportService; /** - * Created by amidani on 28/06/2017. + * A Camel processor that converts Unomi Profile objects into CSV lines for export. + * This processor is responsible for transforming profile data into a formatted string + * according to the export configuration specified in the exchange header. + * + *

The processor works in conjunction with the ProfileExportService to perform + * the actual conversion of profile data to CSV format.

+ * + * @since 1.0 */ public class LineBuildProcessor implements Processor { private ProfileExportService profileExportService; + /** + * Constructs a new LineBuildProcessor with the specified ProfileExportService. + * + * @param profileExportService the service responsible for converting profiles to CSV format + */ public LineBuildProcessor(ProfileExportService profileExportService) { this.profileExportService = profileExportService; } + /** + * Processes the exchange by converting a Profile object into a CSV line. + * + *

This method: + *

    + *
  • Extracts the export configuration from the exchange header
  • + *
  • Gets the Profile object from the exchange body
  • + *
  • Converts the profile to a CSV line using the ProfileExportService
  • + *
  • Sets the resulting string as the new exchange body
  • + *
+ *

+ * + * @param exchange the Camel exchange containing the Profile to convert and export configuration + * @throws Exception if an error occurs during processing + */ @Override public void process(Exchange exchange) throws Exception { ExportConfiguration exportConfiguration = (ExportConfiguration) exchange.getIn().getHeader("exportConfig"); diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java index 3cf8ff6b62..2f4a5950de 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java @@ -25,12 +25,47 @@ import org.slf4j.LoggerFactory; /** - * Created by amidani on 14/06/2017. + * A Camel processor that handles failures during the line splitting process of data import. + * This processor is responsible for creating structured error reports when lines fail to process, + * providing detailed information about the nature of the failure and the problematic data. + * + *

The handler processes different types of exceptions: + *

    + *
  • BadProfileDataFormatException - for data format related errors
  • + *
  • General exceptions - capturing the root cause message
  • + *
+ *

+ * + *

For each failure, it creates an ImportLineError object containing: + *

    + *
  • The error code or message
  • + *
  • The content of the failed line
  • + *
  • The line number in the source file
  • + *
+ *

+ * + * @since 1.0 */ public class LineSplitFailureHandler implements Processor { private static final Logger LOGGER = LoggerFactory.getLogger(LineSplitFailureHandler.class.getName()); + /** + * Processes failures that occur during line splitting and creates structured error reports. + * + *

This method: + *

    + *
  • Logs the failure details including the route ID and exception
  • + *
  • Creates an ImportLineError object with detailed error information
  • + *
  • Extracts the appropriate error message based on the exception type
  • + *
  • Sets the failure information in the exchange for further processing
  • + *
+ *

+ * + * @param exchange the Camel exchange containing the failed message and exception details + * @throws Exception if an error occurs during failure handling + */ + @Override public void process(Exchange exchange) throws Exception { LOGGER.error("Route: {}, Error: {}", exchange.getProperty(Exchange.FAILURE_ROUTE_ID), exchange.getProperty(Exchange.EXCEPTION_CAUGHT)); ImportLineError importLineError = new ImportLineError(); diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java index 265fec8f58..e93d556374 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java @@ -35,25 +35,77 @@ import java.util.*; /** - * Created by amidani on 29/12/2016. + * A Camel processor that splits and processes CSV lines into ProfileToImport objects. + * This processor handles the conversion of CSV data into structured profile data according + * to the import configuration, supporting various data types and multi-value fields. + * + *

Features include: + *

    + *
  • CSV parsing using RFC4180 standard
  • + *
  • Support for header rows
  • + *
  • Field mapping to profile properties
  • + *
  • Multi-value field handling
  • + *
  • Type conversion based on property definitions
  • + *
  • Profile merging configuration
  • + *
  • Delete operation support
  • + *
+ *

+ * + * @since 1.0 */ public class LineSplitProcessor implements Processor { private static final Logger LOGGER = LoggerFactory.getLogger(LineSplitProcessor.class.getName()); + /** Maps field names to their corresponding column indices */ private Map fieldsMapping; + + /** List of properties that should be overwritten during import */ private List propertiesToOverwrite; + + /** Property used for merging profiles */ private String mergingProperty; + + /** Whether to overwrite existing profiles during import */ private boolean overwriteExistingProfiles; + + /** Whether the CSV file contains a header row */ private boolean hasHeader; + + /** Whether the CSV file contains a column for delete operations */ private boolean hasDeleteColumn; + + /** Character used to separate columns in the CSV */ private String columnSeparator; + /** Character used to separate multiple values within a field */ private String multiValueSeparator; + + /** Characters used to delimit multi-value fields */ private String multiValueDelimiter; + /** Collection of property types used for type conversion */ private Collection profilePropertyTypes; + /** + * Processes a single line from a CSV file and converts it into a ProfileToImport object. + * + *

The method performs the following operations: + *

    + *
  • Handles one-shot import configurations if present
  • + *
  • Skips header row if configured
  • + *
  • Parses CSV line using RFC4180 standard
  • + *
  • Validates field mapping against data
  • + *
  • Converts fields according to their property types
  • + *
  • Handles multi-value fields
  • + *
  • Sets up profile merging configuration
  • + *
  • Processes delete operations if configured
  • + *
+ *

+ * + * @param exchange the Camel exchange containing the CSV line to process + * @throws Exception if an error occurs during processing, including BadProfileDataFormatException + */ @Override public void process(Exchange exchange) throws Exception { diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java index 94737b50fb..3caadc8789 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java @@ -28,13 +28,43 @@ import java.util.Set; /** - * Created by amidani on 29/12/2016. + * A Camel processor that handles the storage of imported profiles in the Unomi system. + * This processor is responsible for managing the final stage of profile import, including + * segment calculation and profile persistence. + * + *

The processor performs the following operations: + *

    + *
  • Processes profiles marked for import
  • + *
  • Calculates segments and scores for non-deleted profiles
  • + *
  • Updates profile information with calculated segments
  • + *
  • Persists profiles in the Unomi storage system
  • + *
+ *

+ * + * @since 1.0 */ public class UnomiStorageProcessor implements Processor { + /** Service for handling profile import operations */ private ProfileImportService profileImportService; + + /** Service for managing profile segments and scoring */ private SegmentService segmentService; + /** + * Processes the exchange by storing or updating the profile in Unomi's storage system. + * + *

This method: + *

    + *
  • Extracts the ProfileToImport from the message body
  • + *
  • For non-delete operations, calculates and updates segments and scores
  • + *
  • Persists the profile using the ProfileImportService
  • + *
+ *

+ * + * @param exchange the Camel exchange containing the profile to process + * @throws Exception if an error occurs during processing + */ @Override public void process(Exchange exchange) throws Exception { @@ -59,10 +89,20 @@ public void process(Exchange exchange) } } + /** + * Sets the profile import service used for persisting profiles. + * + * @param profileImportService the service responsible for profile import operations + */ public void setProfileImportService(ProfileImportService profileImportService) { this.profileImportService = profileImportService; } + /** + * Sets the segment service used for calculating profile segments and scores. + * + * @param segmentService the service responsible for segment calculations + */ public void setSegmentService(SegmentService segmentService) { this.segmentService = segmentService; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java index 5529c109b8..9a7a351b86 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java @@ -31,19 +31,58 @@ import java.util.Map; /** - * Created by amidani on 27/06/2017. + * A Camel route builder that handles the collection of profiles for export. + * This route builder creates routes that periodically collect profiles based on + * segment criteria and prepare them for export processing. + * + *

Features: + *

    + *
  • Timer-based profile collection
  • + *
  • Segment-based profile filtering
  • + *
  • Support for multiple export configurations
  • + *
  • Configurable collection intervals
  • + *
  • Security through endpoint allowlist
  • + *
  • Support for Kafka and in-process {@code direct:} endpoints ({@link RouterConstants#CONFIG_TYPE_KAFKA} / {@link RouterConstants#CONFIG_TYPE_NOBROKER})
  • + *
+ *

+ * + * @since 1.0 */ public class ProfileExportCollectRouteBuilder extends RouterAbstractRouteBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(ProfileExportCollectRouteBuilder.class); + /** List of export configurations to process */ private List exportConfigurationList; + + /** Service for persisting and retrieving data */ private PersistenceService persistenceService; + /** + * Constructs a new route builder with Kafka configuration. + * + * @param kafkaProps map containing Kafka configuration properties + * @param configType {@link RouterConstants#CONFIG_TYPE_KAFKA} or {@link RouterConstants#CONFIG_TYPE_NOBROKER} + */ public ProfileExportCollectRouteBuilder(Map kafkaProps, String configType) { super(kafkaProps, configType); } + /** + * Configures the routes for collecting profiles to export. + * Creates a route for each export configuration that matches the criteria. + * + *

Each route: + *

    + *
  • Runs on a configured timer schedule
  • + *
  • Collects profiles based on segment criteria
  • + *
  • Processes profiles for export
  • + *
  • Routes data to appropriate endpoints
  • + *
+ *

+ * + * @throws Exception if an error occurs during route configuration + */ @Override public void configure() throws Exception { if (exportConfigurationList == null || exportConfigurationList.isEmpty()) { @@ -93,10 +132,20 @@ public void configure() throws Exception { } } + /** + * Sets the list of export configurations to process. + * + * @param exportConfigurationList list of export configurations + */ public void setExportConfigurationList(List exportConfigurationList) { this.exportConfigurationList = exportConfigurationList; } + /** + * Sets the persistence service for data operations. + * + * @param persistenceService service for persisting and retrieving data + */ public void setPersistenceService(PersistenceService persistenceService) { this.persistenceService = persistenceService; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java index e11378590c..89619b7da9 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java @@ -29,24 +29,67 @@ import java.util.Map; /** - * Created by amidani on 28/06/2017. + * A Camel route builder that handles the production of export data from collected profiles. + * This route builder creates routes that process collected profiles and formats them + * for export to the configured destination. + * + *

Features: + *

    + *
  • Profile data transformation to export format
  • + *
  • Line-by-line processing with aggregation
  • + *
  • Support for multiple export destinations
  • + *
  • Completion handling and status updates
  • + *
  • Support for Kafka and in-process {@code direct:} endpoints ({@link RouterConstants#CONFIG_TYPE_KAFKA} / {@link RouterConstants#CONFIG_TYPE_NOBROKER})
  • + *
+ *

+ * + * @since 1.0 */ public class ProfileExportProducerRouteBuilder extends RouterAbstractRouteBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(ProfileExportProducerRouteBuilder.class); + /** Processor for handling export completion */ private ExportRouteCompletionProcessor exportRouteCompletionProcessor; + /** Service for profile export operations */ private ProfileExportService profileExportService; + /** + * Constructs a new route builder with Kafka configuration. + * + * @param kafkaProps map containing Kafka configuration properties + * @param configType {@link RouterConstants#CONFIG_TYPE_KAFKA} or {@link RouterConstants#CONFIG_TYPE_NOBROKER} + */ public ProfileExportProducerRouteBuilder(Map kafkaProps, String configType) { super(kafkaProps, configType); } + /** + * Sets the profile export service. + * + * @param profileExportService service for handling profile exports + */ public void setProfileExportService(ProfileExportService profileExportService) { this.profileExportService = profileExportService; } + /** + * Configures the routes for producing export data. + * Creates a route that processes collected profiles and prepares them for export. + * + *

The route: + *

    + *
  • Unmarshals incoming profile data
  • + *
  • Processes profiles into export format
  • + *
  • Aggregates lines for batch processing
  • + *
  • Handles export completion
  • + *
  • Routes data to configured destinations
  • + *
+ *

+ * + * @throws Exception if an error occurs during route configuration + */ @Override public void configure() throws Exception { @@ -69,6 +112,11 @@ public void configure() throws Exception { } + /** + * Sets the processor for handling export completion. + * + * @param exportRouteCompletionProcessor processor for export completion handling + */ public void setExportRouteCompletionProcessor(ExportRouteCompletionProcessor exportRouteCompletionProcessor) { this.exportRouteCompletionProcessor = exportRouteCompletionProcessor; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java index 1ebc5c3884..2b24fdbf83 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java @@ -37,20 +37,59 @@ import java.util.Map; /** - * Created by amidani on 26/04/2017. + * A Camel route builder that handles the import of profiles from configured sources. + * This route builder creates routes that process incoming profile data from various + * sources and prepares it for import into Unomi. + * + *

Features: + *

    + *
  • Support for multiple import configurations
  • + *
  • Line-by-line processing of import data
  • + *
  • Error handling and failure reporting
  • + *
  • Configuration validation and status updates
  • + *
  • Support for Kafka and in-process {@code direct:} endpoints ({@link RouterConstants#CONFIG_TYPE_KAFKA} / {@link RouterConstants#CONFIG_TYPE_NOBROKER})
  • + *
  • Graceful shutdown handling
  • + *
+ *

+ * + * @since 1.0 */ - public class ProfileImportFromSourceRouteBuilder extends RouterAbstractRouteBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(ProfileImportFromSourceRouteBuilder.class.getName()); + /** List of import configurations to process */ private List importConfigurationList; + + /** Service for managing import configurations */ private ImportExportConfigurationService importConfigurationService; + /** + * Constructs a new route builder with Kafka configuration. + * + * @param kafkaProps map containing Kafka configuration properties + * @param configType {@link RouterConstants#CONFIG_TYPE_KAFKA} or {@link RouterConstants#CONFIG_TYPE_NOBROKER} + */ public ProfileImportFromSourceRouteBuilder(Map kafkaProps, String configType) { super(kafkaProps, configType); } + /** + * Configures the routes for importing profiles from sources. + * Creates routes for each import configuration and sets up error handling. + * + *

The routes: + *

    + *
  • Handle data validation and format errors
  • + *
  • Process data line by line
  • + *
  • Update import status and progress
  • + *
  • Route processed data to appropriate endpoints
  • + *
  • Manage graceful completion of imports
  • + *
+ *

+ * + * @throws Exception if an error occurs during route configuration + */ @Override public void configure() throws Exception { @@ -132,10 +171,20 @@ public void process(Exchange exchange) throws Exception { } } + /** + * Sets the list of import configurations to process. + * + * @param importConfigurationList list of import configurations + */ public void setImportConfigurationList(List importConfigurationList) { this.importConfigurationList = importConfigurationList; } + /** + * Sets the service for managing import configurations. + * + * @param importConfigurationService service for handling import configurations + */ public void setImportConfigurationService(ImportExportConfigurationService importConfigurationService) { this.importConfigurationService = importConfigurationService; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java index 863437032e..fd9d6c001e 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java @@ -30,18 +30,59 @@ import java.util.Map; /** - * Created by amidani on 22/05/2017. + * A Camel route builder that handles one-time profile imports from files. + * This route builder creates routes that process CSV files dropped into a + * monitored directory for one-time import operations. + * + *

Features: + *

    + *
  • File-based import processing
  • + *
  • Configuration lookup from filename
  • + *
  • CSV file processing with error handling
  • + *
  • Support for Kafka and in-process {@code direct:} endpoints ({@link RouterConstants#CONFIG_TYPE_KAFKA} / {@link RouterConstants#CONFIG_TYPE_NOBROKER})
  • + *
  • Automatic file movement after processing
  • + *
  • Error reporting and failed file handling
  • + *
+ *

+ * + * @since 1.0 */ public class ProfileImportOneShotRouteBuilder extends RouterAbstractRouteBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(ProfileImportOneShotRouteBuilder.class.getName()); + + /** Processor for extracting import configuration from filenames */ private ImportConfigByFileNameProcessor importConfigByFileNameProcessor; + + /** Directory to monitor for import files */ private String uploadDir; + /** + * Constructs a new route builder with Kafka configuration. + * + * @param kafkaProps map containing Kafka configuration properties + * @param configType {@link RouterConstants#CONFIG_TYPE_KAFKA} or {@link RouterConstants#CONFIG_TYPE_NOBROKER} + */ public ProfileImportOneShotRouteBuilder(Map kafkaProps, String configType) { super(kafkaProps, configType); } + /** + * Configures the route for one-shot profile imports. + * Creates a route that monitors a directory for CSV files and processes them for import. + * + *

The route: + *

    + *
  • Monitors upload directory for CSV files
  • + *
  • Extracts configuration from filename
  • + *
  • Processes file contents line by line
  • + *
  • Handles validation and format errors
  • + *
  • Routes processed data to appropriate endpoints
  • + *
+ *

+ * + * @throws Exception if an error occurs during route configuration + */ @Override public void configure() throws Exception { @@ -81,10 +122,20 @@ public void configure() throws Exception { } } + /** + * Sets the processor for handling import configuration by filename. + * + * @param importConfigByFileNameProcessor processor for filename-based configuration + */ public void setImportConfigByFileNameProcessor(ImportConfigByFileNameProcessor importConfigByFileNameProcessor) { this.importConfigByFileNameProcessor = importConfigByFileNameProcessor; } + /** + * Sets the directory to monitor for import files. + * + * @param uploadDir path to the directory to monitor + */ public void setUploadDir(String uploadDir) { this.uploadDir = uploadDir; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java index ff4942c75a..7f31e57bbb 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java @@ -29,19 +29,56 @@ import java.util.Map; /** - * Created by amidani on 26/04/2017. + * A Camel route builder that handles the final stage of profile imports by storing + * processed profile data into Apache Unomi's storage system. + * + *

Features: + *

    + *
  • Final processing of imported profiles
  • + *
  • Integration with Unomi's storage system
  • + *
  • Support for Kafka and in-process {@code direct:} endpoints ({@link RouterConstants#CONFIG_TYPE_KAFKA} / {@link RouterConstants#CONFIG_TYPE_NOBROKER})
  • + *
  • Import completion handling
  • + *
  • Error handling and reporting
  • + *
+ *

+ * + * @since 1.0 */ public class ProfileImportToUnomiRouteBuilder extends RouterAbstractRouteBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(ProfileImportToUnomiRouteBuilder.class.getName()); + /** Processor for storing profiles in Unomi */ private UnomiStorageProcessor unomiStorageProcessor; + + /** Processor for handling import completion */ private ImportRouteCompletionProcessor importRouteCompletionProcessor; + /** + * Constructs a new route builder with Kafka configuration. + * + * @param kafkaProps map containing Kafka configuration properties + * @param configType {@link RouterConstants#CONFIG_TYPE_KAFKA} or {@link RouterConstants#CONFIG_TYPE_NOBROKER} + */ public ProfileImportToUnomiRouteBuilder(Map kafkaProps, String configType) { super(kafkaProps, configType); } + /** + * Configures the route for storing imported profiles in Unomi. + * Creates a route that processes incoming profile data and stores it in Unomi's storage system. + * + *

The route: + *

    + *
  • Receives processed profile data
  • + *
  • Stores profiles in Unomi's storage system
  • + *
  • Handles import completion
  • + *
  • Manages error reporting
  • + *
+ *

+ * + * @throws Exception if an error occurs during route configuration + */ @Override public void configure() throws Exception { @@ -67,10 +104,20 @@ public void configure() throws Exception { .to("log:org.apache.unomi.router?level=DEBUG"); } + /** + * Sets the processor for storing profiles in Unomi. + * + * @param unomiStorageProcessor processor for Unomi storage operations + */ public void setUnomiStorageProcessor(UnomiStorageProcessor unomiStorageProcessor) { this.unomiStorageProcessor = unomiStorageProcessor; } + /** + * Sets the processor for handling import completion. + * + * @param importRouteCompletionProcessor processor for import completion operations + */ public void setImportRouteCompletionProcessor(ImportRouteCompletionProcessor importRouteCompletionProcessor) { this.importRouteCompletionProcessor = importRouteCompletionProcessor; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java index ad06a00ecb..69586990fa 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java @@ -28,26 +28,67 @@ import java.util.Map; /** - * Created by amidani on 13/06/2017. + * Abstract base class for all Unomi router route builders. + * This class provides common functionality and configuration for both import + * and export routes, supporting Kafka ({@link RouterConstants#CONFIG_TYPE_KAFKA}) and in-process + * {@code direct:} buffer endpoints when configured as {@link RouterConstants#CONFIG_TYPE_NOBROKER}. + * + *

Features: + *

    + *
  • Common Kafka configuration handling
  • + *
  • Endpoint URI generation for Kafka topics or in-vm {@code direct:} buffers
  • + *
  • Shared configuration for JSON data format
  • + *
  • Profile service integration
  • + *
  • Endpoint security through allowlist
  • + *
+ *

+ * + * @since 1.0 */ public abstract class RouterAbstractRouteBuilder extends RouteBuilder { + /** JSON data format configuration */ protected JacksonDataFormat jacksonDataFormat; + /** Kafka broker host */ protected String kafkaHost; + + /** Kafka broker port */ protected String kafkaPort; + + /** Topic for import operations */ protected String kafkaImportTopic; + + /** Topic for export operations */ protected String kafkaExportTopic; + + /** Consumer group ID for import operations */ protected String kafkaImportGroupId; + + /** Consumer group ID for export operations */ protected String kafkaExportGroupId; + + /** Number of Kafka consumers */ protected String kafkaConsumerCount; + + /** Auto-commit configuration for Kafka */ protected String kafkaAutoCommit; + /** Router transport mode ({@link RouterConstants#CONFIG_TYPE_KAFKA} or {@link RouterConstants#CONFIG_TYPE_NOBROKER}) */ protected String configType; + + /** List of allowed endpoint schemes */ protected String allowedEndpoints; + /** Service for profile operations */ protected ProfileService profileService; + /** + * Constructs a new route builder with Kafka configuration. + * + * @param kafkaProps map containing Kafka configuration properties + * @param configType {@link RouterConstants#CONFIG_TYPE_KAFKA} or {@link RouterConstants#CONFIG_TYPE_NOBROKER} + */ public RouterAbstractRouteBuilder(Map kafkaProps, String configType) { this.kafkaHost = kafkaProps.get("kafkaHost"); this.kafkaPort = kafkaProps.get("kafkaPort"); @@ -60,6 +101,21 @@ public RouterAbstractRouteBuilder(Map kafkaProps, String configT this.configType = configType; } + /** + * Gets the appropriate endpoint URI based on configuration type and operation. + * + *

This method: + *

    + *
  • Creates Kafka endpoints with appropriate configuration when using Kafka
  • + *
  • Returns direct endpoint URIs when not using Kafka
  • + *
  • Configures consumer properties for incoming endpoints
  • + *
+ *

+ * + * @param direction the direction of the endpoint (to/from) + * @param operationDepositBuffer the operation buffer identifier + * @return Object either a KafkaEndpoint or String depending on configuration + */ public Object getEndpointURI(String direction, String operationDepositBuffer) { Object endpoint; if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { @@ -91,14 +147,29 @@ public Object getEndpointURI(String direction, String operationDepositBuffer) { return endpoint; } + /** + * Sets the JSON data format configuration. + * + * @param jacksonDataFormat the JSON data format to use + */ public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) { this.jacksonDataFormat = jacksonDataFormat; } + /** + * Sets the list of allowed endpoint schemes. + * + * @param allowedEndpoints comma-separated list of allowed endpoint schemes + */ public void setAllowedEndpoints(String allowedEndpoints) { this.allowedEndpoints = allowedEndpoints; } + /** + * Sets the profile service. + * + * @param profileService the service for profile operations + */ public void setProfileService(ProfileService profileService) { this.profileService = profileService; } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java index ca87ad3bd3..c113e3aebd 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java @@ -22,11 +22,40 @@ import java.util.ArrayList; /** - * Created by amidani on 16/06/2017. + * An implementation of Camel's AggregationStrategy that aggregates exchange bodies into an ArrayList. + * This strategy is useful when you need to collect multiple messages into a single list for batch processing + * or grouped operations within the Unomi Router. + * + *

The strategy maintains the following behavior: + *

    + *
  • For the first message (when oldExchange is null), it creates a new ArrayList and adds the message body to it
  • + *
  • For subsequent messages, it adds the new message body to the existing ArrayList
  • + *
+ *

+ * + *

The ArrayList is maintained in the exchange body, allowing for easy access to all aggregated items + * once the aggregation is complete.

+ * + * @since 1.0 */ public class ArrayListAggregationStrategy implements AggregationStrategy { - + /** + * Aggregates exchange messages by collecting their bodies into an ArrayList. + * + *

This method implements the core aggregation logic where: + *

    + *
  • The new exchange's body is extracted as is (maintaining its original type)
  • + *
  • If this is the first message, a new ArrayList is created to store the messages
  • + *
  • The new body is added to the ArrayList
  • + *
  • The ArrayList is maintained in the exchange body for subsequent aggregations
  • + *
+ *

+ * + * @param oldExchange the previous exchange being aggregated (may be null on first invocation) + * @param newExchange the current exchange being aggregated (contains the new item to add to the list) + * @return the aggregated exchange containing the ArrayList of all aggregated items + */ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { Object newBody = newExchange.getIn().getBody(); ArrayList list = null; diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java index d01859f226..8ccabe6871 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java @@ -22,10 +22,40 @@ import org.apache.unomi.router.api.RouterUtils; /** - * Created by amidani on 29/06/2017. + * An implementation of Camel's AggregationStrategy that combines multiple text lines into a single string + * for export purposes. This strategy is specifically designed to work with the Unomi Router's export functionality, + * where multiple data lines need to be aggregated into a single export file. + * + *

The strategy maintains the following behavior: + *

    + *
  • For the first message (when oldExchange is null), it simply returns the new exchange
  • + *
  • For subsequent messages, it appends the new content to the existing content using the configured line separator
  • + *
+ *

+ * + *

The line separator used for aggregation is obtained from the ExportConfiguration object + * stored in the exchange header under the key "exportConfig".

+ * + * @since 1.0 */ public class StringLinesAggregationStrategy implements AggregationStrategy { + /** + * Aggregates two exchanges by combining their body content with appropriate line separation. + * + *

This method implements the core aggregation logic where: + *

    + *
  • The new exchange's body is extracted as a String
  • + *
  • The line separator is obtained from the export configuration in the exchange header
  • + *
  • If there's an old exchange, the new content is appended to it with the line separator
  • + *
  • If there's no old exchange, the new exchange is returned as is
  • + *
+ *

+ * + * @param oldExchange the previous exchange being aggregated (may be null on first invocation) + * @param newExchange the current exchange being aggregated (contains the new line to append) + * @return the aggregated exchange containing the combined content + */ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { Object newBody = newExchange.getIn().getBody(String.class); String lineSeparator = newExchange.getIn().getHeader("exportConfig", ExportConfiguration.class).getLineSeparator();