[refactor] Dynamic/streaming scene processing needs improvement. #660

@kaori-seasons

Description:

UnAlignedComputeWorker's handling of dynamic/streaming scenarios needs improvement.

Proposed Solution:

I. Problem

1.1 Core Issue

Issue 1: Incomplete Window Initialization Logic

The current init() method has the following problems:

  • The condition processingWindowIdQueue.isEmpty() && windowId <= context.getWindowId() is overly restrictive: a window is initialized only when the queue is empty and its ID does not exceed the context's window ID
  • In dynamic graph scenarios, windows may arrive out of order, which this logic cannot handle correctly
  • There is no protection against duplicate initialization
  • Continuous window processing in streaming scenarios is not considered
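
To make the first two points concrete, the following minimal sketch replays the existing guard against an out-of-order arrival sequence. OriginalInitGuard, its fields, and the arrival order are illustrative assumptions rather than GeaFlow code; only the condition itself is taken from the description above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for the worker: only the init() guard is modelled.
final class OriginalInitGuard {
    private final Deque<Long> processingWindowIdQueue = new ArrayDeque<>();
    private final List<Long> initialized = new ArrayList<>();
    private final long contextWindowId;

    OriginalInitGuard(long contextWindowId) {
        this.contextWindowId = contextWindowId;
    }

    void init(long windowId) {
        // Same condition as the current init(): empty queue AND windowId <= context window.
        if (processingWindowIdQueue.isEmpty() && windowId <= contextWindowId) {
            initialized.add(windowId); // stands in for super.init(windowId)
        }
        processingWindowIdQueue.add(windowId);
    }

    List<Long> initializedWindows() {
        return initialized;
    }

    int queueSize() {
        return processingWindowIdQueue.size();
    }
}
```

In this simplified model, a context window of 3 with arrivals 5, 3, 4 queues all three windows but passes none of them through the guard: window 5 is ahead of the context, and windows 3 and 4 find the queue already non-empty.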

Issue 2: Forced Alignment Processing in LoadGraphProcessEvent

LoadGraphProcessEvent is always processed in aligned mode, which severely limits the performance advantage of unaligned workers in dynamic graph loading scenarios.

Issue 3: Window Queue Management Defects

In the finishBarrier() method:

  • Queue poll operation may fail due to timeout
  • Window ID validation range [currentWindowId - 1, currentWindowId] may not be flexible enough for streaming scenarios
  • Missing queue recovery mechanism in exception cases

Issue 4: Finish Method Lacks State Synchronization

The finish method directly calls the processor's finish without:

  • Checking if queue state is consistent
  • Verifying if windows have truly completed processing
  • Handling potential race conditions

II. Detailed Improvement Plan

2.1 Improved UnAlignedComputeWorker Class

Solution 1: Enhanced Window Initialization Management

Improvement Goals:

  • Support out-of-order window arrival
  • Add window state tracking
  • Prevent duplicate initialization
  • Support streaming continuous window processing

Implementation Plan:

public class UnAlignedComputeWorker<T, R> extends AbstractUnAlignedWorker<T, R> {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(UnAlignedComputeWorker.class);
    
    // Added: Window state management
    private enum WindowState {
        PENDING,      // Waiting for initialization
        INITIALIZED,  // Initialized
        PROCESSING,   // Processing
        FINISHED      // Completed
    }
    
    // Added: Window state tracking map
    private final Map<Long, WindowState> windowStateMap = new ConcurrentHashMap<>();
    
    // Added: Initialized window set
    private final Set<Long> initializedWindows = ConcurrentHashMap.newKeySet();
    
    // Added: Configuration parameters
    private int maxPendingWindows = 10; // Maximum pending windows
    private boolean allowOutOfOrderWindows = true; // Allow out-of-order windows
    
    @Override
    public void init(long windowId) {
        // Check if window is already initialized
        if (initializedWindows.contains(windowId)) {
            LOGGER.warn("taskId {} windowId {} already initialized, skipping", 
                context.getTaskId(), windowId);
            return;
        }
        
        // Check queue capacity
        if (processingWindowIdQueue.size() >= maxPendingWindows) {
            LOGGER.warn("taskId {} processing queue is full, size: {}, windowId: {}", 
                context.getTaskId(), processingWindowIdQueue.size(), windowId);
            // Optional: Block waiting or throw exception
            throw new GeaflowRuntimeException("Processing window queue is full");
        }
        
        long currentWindowId = context.getWindowId();
        
        // Dynamic/streaming scenario improvement logic
        if (allowOutOfOrderWindows) {
            // Allow out-of-order: process as long as window is not initialized
            if (windowId < currentWindowId - maxPendingWindows) {
                LOGGER.error("taskId {} windowId {} is too old, current: {}", 
                    context.getTaskId(), windowId, currentWindowId);
                throw new GeaflowRuntimeException("Window ID too old: " + windowId);
            }
            
            // Update window state
            windowStateMap.put(windowId, WindowState.INITIALIZED);
            initializedWindows.add(windowId);
            
            // Call parent initialization (only when queue is empty and windowId is ordered)
            if (processingWindowIdQueue.isEmpty() && windowId <= currentWindowId) {
                super.init(windowId);
            }
            
            // Add to processing queue
            processingWindowIdQueue.add(windowId);
            
            LOGGER.info("taskId {} init windowId {} (out-of-order mode), current: {}, queue size: {}", 
                context.getTaskId(), windowId, currentWindowId, processingWindowIdQueue.size());
        } else {
            // Strict ordering mode (original logic)
            if (processingWindowIdQueue.isEmpty() && windowId <= currentWindowId) {
                super.init(windowId);
                windowStateMap.put(windowId, WindowState.INITIALIZED);
                initializedWindows.add(windowId);
            }
            processingWindowIdQueue.add(windowId);
            
            LOGGER.info("taskId {} init windowId {} (ordered mode), current: {}, queue size: {}", 
                context.getTaskId(), windowId, currentWindowId, processingWindowIdQueue.size());
        }
    }
    
    @Override
    public void finish(long windowId) {
        LOGGER.info("taskId {} finishing windowId {}, currentBatchId {}, real currentBatchId {}, queue size: {}",
            context.getTaskId(), windowId, windowId, context.getCurrentWindowId(), 
            processingWindowIdQueue.size());
        
        // Check window state
        WindowState state = windowStateMap.get(windowId);
        if (state == null) {
            LOGGER.warn("taskId {} windowId {} has no state record", context.getTaskId(), windowId);
        } else if (state == WindowState.FINISHED) {
            LOGGER.warn("taskId {} windowId {} already finished", context.getTaskId(), windowId);
            return;
        }
        
        // Update state to finished
        windowStateMap.put(windowId, WindowState.FINISHED);
        
        // Call processor finish
        context.getProcessor().finish(windowId);
        
        // Complete window processing
        finishWindow(windowId);
        
        // Clean up old window states (prevent memory leaks)
        cleanupOldWindows(windowId);
    }
    
    // Added: Clean up old window states
    private void cleanupOldWindows(long currentWindowId) {
        // Retain recent window states, clean up old ones
        long threshold = currentWindowId - maxPendingWindows * 2;
        windowStateMap.keySet().removeIf(wid -> wid < threshold);
        initializedWindows.removeIf(wid -> wid < threshold);
        
        LOGGER.debug("taskId {} cleaned up windows before {}, remaining states: {}", 
            context.getTaskId(), threshold, windowStateMap.size());
    }
    
    @Override
    public WorkerType getWorkerType() {
        return WorkerType.unaligned_compute;
    }
    
    // Added: Configuration methods
    public void setMaxPendingWindows(int maxPendingWindows) {
        this.maxPendingWindows = maxPendingWindows;
    }
    
    public void setAllowOutOfOrderWindows(boolean allowOutOfOrderWindows) {
        this.allowOutOfOrderWindows = allowOutOfOrderWindows;
    }
}
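
One detail in init() above: the contains() check and the later add() are separate steps, so two threads initializing the same window could both pass the check. The issue does not say whether init() is ever called concurrently; if it is, a sketch like the following could close the gap by using the return value of Set.add as the guard. WindowInitGuard and its method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: claims a window ID exactly once, even with concurrent callers.
final class WindowInitGuard {
    private final Set<Long> initializedWindows = ConcurrentHashMap.newKeySet();

    /** Returns true only for the first caller that claims the given window. */
    boolean tryClaim(long windowId) {
        // add() on a ConcurrentHashMap-backed set is atomic and reports whether the element was new.
        return initializedWindows.add(windowId);
    }

    /** Releases a claim, for example if initialization fails after tryClaim succeeded. */
    void release(long windowId) {
        initializedWindows.remove(windowId);
    }
}
```

With this shape, init() would start with `if (!guard.tryClaim(windowId)) return;` and call release() on any failure path (such as the queue-full exception) so that the window can be retried.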

2.2 Improve AbstractUnAlignedWorker Base Class

Solution 2: Optimize LoadGraphProcessEvent Handling

Improvement Goals:

  • Support unaligned processing of LoadGraphProcessEvent
  • Maintain data consistency
  • Improve dynamic graph loading performance

Implementation Plan:

Add new methods in AbstractUnAlignedWorker:

/**
 * Process graph loading events in unaligned manner
 * Ensure consistency through phased checkpoints
 */
public void processLoadGraphUnaligned(long fetchCount) {
    LOGGER.info("taskId {} start unaligned graph loading, fetchCount: {}", 
        context.getTaskId(), fetchCount);
    
    // Phase 1: Asynchronously load vertex and edge data
    long processedCount = 0;
    Set<Long> receivedWindowIds = new HashSet<>();
    // Barriers that already arrived during phase 1 (window ID -> total count),
    // so that phase 2 does not wait for them a second time
    Map<Long, Long> receivedBarriers = new HashMap<>();
    
    while (processedCount < fetchCount && running) {
        try {
            InputMessage input = inputReader.poll(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            if (input != null) {
                long windowId = input.getWindowId();
                receivedWindowIds.add(windowId);
                
                if (input.getMessage() != null) {
                    PipelineMessage message = input.getMessage();
                    processMessage(windowId, message);
                    processedCount++;
                } else {
                    // Encounter barrier, record it and continue processing
                    long totalCount = input.getWindowCount();
                    receivedBarriers.put(windowId, totalCount);
                    LOGGER.debug("taskId {} received barrier for windowId {}, totalCount: {}", 
                        context.getTaskId(), windowId, totalCount);
                }
            }
        } catch (Throwable t) {
            if (running) {
                LOGGER.error("Error during unaligned graph loading", t);
                throw new GeaflowRuntimeException(t);
            }
        }
    }
    
    // Phase 2: Wait for barriers of all windows to arrive (ensure consistency)
    LOGGER.info("taskId {} graph data loaded, waiting for barriers, windows: {}", 
        context.getTaskId(), receivedWindowIds);
    
    for (Long windowId : receivedWindowIds) {
        Long totalCount = receivedBarriers.get(windowId);
        if (totalCount != null) {
            // Barrier already arrived during phase 1
            processBarrier(windowId, totalCount);
        } else {
            // Wait for barrier of this window
            waitForWindowBarrier(windowId);
        }
    }
    
    LOGGER.info("taskId {} completed unaligned graph loading", context.getTaskId());
}

private void waitForWindowBarrier(long windowId) {
    // Wait for barrier of specific window to arrive
    long startTime = System.currentTimeMillis();
    long timeout = 30000; // 30-second timeout
    
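    while (System.currentTimeMillis() - startTime < timeout) {
        try {
            InputMessage input = inputReader.poll(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            // NOTE: inputs that are not the target barrier (data messages, or barriers of
            // other windows) are discarded by this loop and would need to be buffered
            if (input != null && input.getMessage() == null && input.getWindowId() == windowId) {
                // Received barrier of target window
                long totalCount = input.getWindowCount();
                processBarrier(windowId, totalCount);
                return;
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating
            Thread.currentThread().interrupt();
            throw new GeaflowRuntimeException(e);
        }
    }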
    
    throw new GeaflowRuntimeException(
        String.format("Timeout waiting for barrier of window %d", windowId));
}
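
Because waitForWindowBarrier() polls the shared reader, it can receive a barrier that belongs to a different window than the one it is waiting for. The sketch below shows one way to keep such barriers for a later call. It is a simplified model under stated assumptions: Input and BarrierBuffer are hypothetical types, a plain BlockingQueue stands in for inputReader, and data messages are skipped rather than processed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified input record: an input without a payload is a barrier.
final class Input {
    final long windowId;
    final String payload;   // null marks a barrier
    final long windowCount; // only meaningful for barriers

    Input(long windowId, String payload, long windowCount) {
        this.windowId = windowId;
        this.payload = payload;
        this.windowCount = windowCount;
    }

    boolean isBarrier() {
        return payload == null;
    }
}

final class BarrierBuffer {
    private final BlockingQueue<Input> reader;
    private final Map<Long, Long> seenBarriers = new HashMap<>();

    BarrierBuffer(BlockingQueue<Input> reader) {
        this.reader = reader;
    }

    /** Returns the barrier count for windowId, buffering barriers of other windows. */
    long awaitBarrier(long windowId, long timeoutMs) {
        Long buffered = seenBarriers.remove(windowId);
        if (buffered != null) {
            return buffered; // barrier was already received while waiting for another window
        }
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            long remaining;
            while ((remaining = deadline - System.nanoTime()) > 0) {
                Input input = reader.poll(remaining, TimeUnit.NANOSECONDS);
                if (input == null) {
                    break; // overall deadline reached
                }
                if (!input.isBarrier()) {
                    continue; // data handling is omitted in this sketch
                }
                if (input.windowId == windowId) {
                    return input.windowCount;
                }
                seenBarriers.put(input.windowId, input.windowCount);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for barrier", e);
        }
        throw new IllegalStateException("Timeout waiting for barrier of window " + windowId);
    }
}
```

The deadline is computed once and the remaining time is passed to each poll, so the total wait stays bounded by timeoutMs regardless of how many unrelated inputs arrive.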

2.3 Improve AbstractIterationComputeCommand

Solution 3: Dynamically Select Processing Mode

Modify AbstractIterationComputeCommand.execute() method:

@Override
public void execute(ITaskContext taskContext) {
    final long start = System.currentTimeMillis();
    super.execute(taskContext);
    AbstractWorker abstractWorker = (AbstractWorker) worker;
    abstractWorker.init(windowId);
    fetcherRunner.add(new FetchRequest(((WorkerContext) this.context).getTaskId(), 
        fetchWindowId, fetchCount));
    
    // Improvement: Select processing mode based on configuration and event type
    boolean useAligned = determineProcessingMode(abstractWorker);
    abstractWorker.process(fetchCount, useAligned);
    
    ((AbstractWorkerContext) this.context).getEventMetrics()
        .addProcessCostMs(System.currentTimeMillis() - start);
}

private boolean determineProcessingMode(AbstractWorker worker) {
    // If it's an aligned worker, always use aligned mode
    if (worker instanceof AbstractAlignedWorker) {
        return true;
    }
    
    // Check if it's LoadGraphProcessEvent
    if (this instanceof LoadGraphProcessEvent) {
        // Read from configuration whether unaligned graph loading is enabled.
        // Uses the key proposed in section 3.1, which defaults to false (aligned).
        Configuration config = ((AbstractWorkerContext) context).getConfiguration();
        boolean unalignedGraphLoad = config.getBoolean(
            "geaflow.graph.load.unaligned.enable", false);
        
        if (unalignedGraphLoad) {
            LOGGER.info("taskId {} using unaligned mode for graph loading", 
                context.getTaskId());
            return false;
        } else {
            LOGGER.info("taskId {} using aligned mode for graph loading", 
                context.getTaskId());
            return true;
        }
    }
    
    // Other cases, use unaligned mode
    return false;
}
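
The decision above depends on only three inputs, so it can be restated as a pure function that is easy to unit test without a worker or context. This is a hypothetical restatement: ProcessingModeDecision and its parameter names are not part of the proposal, and graphLoadAligned stands for the outcome of the configuration lookup.

```java
// Hypothetical, framework-free restatement of the processing mode decision.
final class ProcessingModeDecision {
    private ProcessingModeDecision() {
    }

    /**
     * @param alignedWorker    true if the worker is an aligned worker
     * @param graphLoadEvent   true if the command is a graph loading event
     * @param graphLoadAligned true if configuration asks for aligned graph loading
     * @return true if aligned processing should be used
     */
    static boolean useAligned(boolean alignedWorker, boolean graphLoadEvent,
                              boolean graphLoadAligned) {
        if (alignedWorker) {
            return true; // aligned workers never run unaligned
        }
        if (graphLoadEvent) {
            return graphLoadAligned; // graph loading follows configuration
        }
        return false; // everything else on an unaligned worker runs unaligned
    }
}
```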

2.4 Enhanced Window Queue Management

Solution 4: Improve finishBarrier Method

Improve finishBarrier() in AbstractUnAlignedWorker:

@Override
protected void finishBarrier(long totalCount, long processedCount) {
    // Validate counts
    if (totalCount != processedCount) {
        LOGGER.error("taskId {} count mismatch, TotalCount:{} != ProcessCount:{}",
            context.getTaskId(), totalCount, processedCount);
        // Decide whether to throw exception or continue based on configuration
        boolean strictMode = context.getConfiguration()
            .getBoolean("geaflow.strict.count.check", true);
        if (strictMode) {
            throw new GeaflowRuntimeException(
                String.format("Count mismatch: %d != %d", totalCount, processedCount));
        }
    }
    context.getEventMetrics().addShuffleReadRecords(totalCount);

    // Improvement: Add retry and timeout handling
    long currentWindowId = pollWindowIdWithRetry();
    
    // Validate window ID (relax restrictions to support streaming scenarios)
    validateWindowId(currentWindowId);
    
    // Complete window processing
    finish(currentWindowId);

    // Initialize next window
    super.init(currentWindowId + 1);
}

private long pollWindowIdWithRetry() {
    int maxRetries = 3;
    int retryCount = 0;
    long timeout = DEFAULT_TIMEOUT_MS;
    
    while (retryCount < maxRetries) {
        try {
            Long windowId = processingWindowIdQueue.poll(timeout, TimeUnit.MILLISECONDS);
            if (windowId != null) {
                LOGGER.debug("taskId {} polled windowId {} from queue", 
                    context.getTaskId(), windowId);
                return windowId;
            }
            
            retryCount++;
            timeout *= 2; // Exponential backoff
            LOGGER.warn("taskId {} failed to poll windowId, retry {}/{}", 
                context.getTaskId(), retryCount, maxRetries);
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new GeaflowRuntimeException("Interrupted while polling window ID", e);
        }
    }
    
    // All retries failed
    throw new GeaflowRuntimeException(
        String.format("Failed to poll window ID after %d retries", maxRetries));
}
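
The retry loop above can be exercised in isolation. The sketch below is a framework-free version under stated assumptions: RetryingPoller is a hypothetical name and IllegalStateException stands in for GeaflowRuntimeException. It also makes the worst-case blocking time explicit, which matters when choosing DEFAULT_TIMEOUT_MS and the retry count.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper mirroring pollWindowIdWithRetry() without framework types.
final class RetryingPoller {
    private RetryingPoller() {
    }

    /** Polls up to maxRetries times, doubling the timeout after each empty poll. */
    static <T> T pollWithRetry(BlockingQueue<T> queue, long initialTimeoutMs, int maxRetries) {
        long timeout = initialTimeoutMs;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                T item = queue.poll(timeout, TimeUnit.MILLISECONDS);
                if (item != null) {
                    return item;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                throw new IllegalStateException("Interrupted while polling", e);
            }
            timeout *= 2; // exponential backoff
        }
        throw new IllegalStateException("Queue still empty after " + maxRetries + " attempts");
    }

    /** Worst-case total wait: initial * (1 + 2 + ... + 2^(maxRetries - 1)). */
    static long worstCaseWaitMs(long initialTimeoutMs, int maxRetries) {
        return initialTimeoutMs * ((1L << maxRetries) - 1);
    }
}
```

With three attempts the worker can block for up to seven times the initial timeout before the exception is raised, since the waits are t, 2t and 4t.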

private void validateWindowId(long currentWindowId) {
    long contextWindowId = context.getCurrentWindowId();
    
    // Relax validation range to support dynamic/streaming scenarios
    // Allow windows in [contextWindowId - maxWindowSkew, contextWindowId + maxWindowSkew] range
    int maxWindowSkew = context.getConfiguration()
        .getInteger("geaflow.window.skew.tolerance", 2);
    
    long lowerBound = contextWindowId - maxWindowSkew;
    long upperBound = contextWindowId + maxWindowSkew;
    
    if (currentWindowId < lowerBound || currentWindowId > upperBound) {
        String errorMessage = String.format(
            "Window ID %d out of valid range [%d, %d], context window: %d",
            currentWindowId, lowerBound, upperBound, contextWindowId);
        LOGGER.error("taskId {} {}", context.getTaskId(), errorMessage);
        throw new GeaflowRuntimeException(errorMessage);
    }
    
    if (currentWindowId != contextWindowId) {
        LOGGER.warn("taskId {} window ID mismatch: queue={}, context={}", 
            context.getTaskId(), currentWindowId, contextWindowId);
    }
}
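
To compare the relaxed check with the range named in Issue 3, both can be written as small predicates. This is an illustrative sketch; WindowSkewCheck and its method names are hypothetical.

```java
// Hypothetical predicates contrasting the original and the relaxed window ID ranges.
final class WindowSkewCheck {
    private WindowSkewCheck() {
    }

    /** Range from Issue 3: [contextWindowId - 1, contextWindowId]. */
    static boolean withinOriginalRange(long candidate, long contextWindowId) {
        return candidate >= contextWindowId - 1 && candidate <= contextWindowId;
    }

    /** Relaxed range: [contextWindowId - maxSkew, contextWindowId + maxSkew]. */
    static boolean withinSkew(long candidate, long contextWindowId, int maxSkew) {
        return candidate >= contextWindowId - maxSkew
            && candidate <= contextWindowId + maxSkew;
    }
}
```

Note that the relaxed range is symmetric: besides tolerating older windows, it also accepts window IDs ahead of the context window, which the original range never did. Whether that is desirable for every job is a design decision the skew tolerance setting leaves open.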

III. Configuration Parameter Design

3.1 New Configuration Items

Add in FrameworkConfigKeys:

// Whether to allow unaligned mode for graph loading
public static final ConfigKey GRAPH_LOAD_UNALIGNED_ENABLE = ConfigKeys
    .key("geaflow.graph.load.unaligned.enable")
    .defaultValue(false)
    .description("enable unaligned processing for graph loading, default is false");

// Window ID tolerance range
public static final ConfigKey WINDOW_SKEW_TOLERANCE = ConfigKeys
    .key("geaflow.window.skew.tolerance")
    .defaultValue(2)
    .description("tolerance for window ID skew in dynamic scenarios");

// Maximum pending windows
public static final ConfigKey MAX_PENDING_WINDOWS = ConfigKeys
    .key("geaflow.max.pending.windows")
    .defaultValue(10)
    .description("maximum number of pending windows in unaligned worker");

// Allow out-of-order windows
public static final ConfigKey ALLOW_OUT_OF_ORDER_WINDOWS = ConfigKeys
    .key("geaflow.allow.out.of.order.windows")
    .defaultValue(true)
    .description("allow out-of-order window processing in stream scenarios");

// Strict count checking
public static final ConfigKey STRICT_COUNT_CHECK = ConfigKeys
    .key("geaflow.strict.count.check")
    .defaultValue(true)
    .description("enable strict count checking for barriers");

IV. Test Plan

4.1 Unit Tests

@Test
public void testOutOfOrderWindowInitialization() {
    UnAlignedComputeWorker worker = new UnAlignedComputeWorker();
    worker.setAllowOutOfOrderWindows(true);
    
    // Test out-of-order window initialization
    worker.init(5);
    worker.init(3);
    worker.init(4);
    
    // Verify all windows are correctly initialized
    // (assumes a getInitializedWindows() accessor is added to the worker)
    assertEquals(3, worker.getInitializedWindows().size());
}

@Test
public void testDuplicateWindowInitialization() {
    UnAlignedComputeWorker worker = new UnAlignedComputeWorker();
    
    // Test duplicate initialization
    worker.init(1);
    worker.init(1); // Should be ignored
    
    // Assumes a getProcessingWindowIdQueue() accessor is exposed for tests
    assertEquals(1, worker.getProcessingWindowIdQueue().size());
}

@Test
public void testWindowQueueCapacity() {
    UnAlignedComputeWorker worker = new UnAlignedComputeWorker();
    worker.setMaxPendingWindows(3);
    
    // Test queue capacity limit
    worker.init(1);
    worker.init(2);
    worker.init(3);
    
    assertThrows(GeaflowRuntimeException.class, () -> {
        worker.init(4); // Should throw exception
    });
}

4.2 Integration Tests

@Test
public void testDynamicGraphStreamProcessing() {
    // Create dynamic graph streaming scenario
    Configuration config = new Configuration();
    config.put(FrameworkConfigKeys.ASP_ENABLE, true);
    config.put(FrameworkConfigKeys.ALLOW_OUT_OF_ORDER_WINDOWS, true);
    config.put(FrameworkConfigKeys.GRAPH_LOAD_UNALIGNED_ENABLE, true);
    
    // Build incremental graph view
    PIncGraphView<Integer, Integer, Integer> incGraphView = 
        buildIncrementalGraphView(config);
    
    // Execute traversal for multiple windows
    for (int i = 0; i < 10; i++) {
        incGraphView.incrementalTraversal(new TestTraversal())
            .start(getRequests())
            .sink(new TestSink());
    }
    
    // Verify result correctness
    verifyResults();
}

V. Performance Optimization Recommendations

5.1 Memory Management

  1. Automatic Window State Cleanup: Periodically clean up completed window states to prevent memory leaks
  2. Queue Capacity Limitation: Set reasonable queue size limits to prevent OOM
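  3. Use Weak References: Consider using WeakHashMap for historical window states. Note that WeakHashMap holds its keys weakly, and boxed Long window IDs are usually referenced by nothing else, so entries could disappear at any garbage collection; threshold-based cleanup as in cleanupOldWindows() is more predictable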
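
For the automatic cleanup in item 1, a sorted concurrent map allows all windows below a threshold to be dropped through a range view instead of scanning every key. The following is a minimal sketch of that idea, not part of the proposal: WindowStateStore is a hypothetical name and window states are reduced to strings.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical store that keeps window states ordered by window ID.
final class WindowStateStore {
    private final ConcurrentSkipListMap<Long, String> states = new ConcurrentSkipListMap<>();

    void put(long windowId, String state) {
        states.put(windowId, state);
    }

    boolean contains(long windowId) {
        return states.containsKey(windowId);
    }

    int size() {
        return states.size();
    }

    /** Drops every window with an ID strictly below threshold and returns the number removed. */
    int evictBelow(long threshold) {
        // headMap is a live view of all keys < threshold; clearing it removes them from the map.
        ConcurrentNavigableMap<Long, String> old = states.headMap(threshold);
        int removed = old.size(); // exact only when no other thread modifies the map concurrently
        old.clear();
        return removed;
    }
}
```

The same threshold as in cleanupOldWindows() (current window minus twice the pending limit) could be passed to evictBelow() after each finished window.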

5.2 Concurrency Optimization

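  1. Lock-Free Queues: Consider using ConcurrentLinkedQueue instead of LinkedBlockingDeque. ConcurrentLinkedQueue has no timed poll, so the timeout-based polling in finishBarrier() would need a different waiting strategy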
  2. Segmented Locks: Use segmented locks for window state map to reduce contention
  3. Batch Processing: Batch process window completion events to reduce RPC calls

5.3 Monitoring Metrics

Add the following monitoring metrics:

// Queue length
metrics.gauge("unaligned.worker.queue.size", 
    () -> processingWindowIdQueue.size());

// Pending windows count
metrics.gauge("unaligned.worker.pending.windows", 
    () -> windowStateMap.size());

// Window processing latency
metrics.histogram("unaligned.worker.window.latency");

// Out-of-order window count
metrics.counter("unaligned.worker.out.of.order.windows");

VI. Migration Path

6.1 Phase 1: Basic Improvements (1-2 weeks)

  • Implement enhanced window state management
  • Improve init() and finish() methods
  • Add basic configuration parameters

6.2 Phase 2: Performance Optimization (2-3 weeks)

  • Implement unaligned processing for LoadGraphProcessEvent
  • Optimize queue management and barrier handling
  • Complete monitoring and logging

6.3 Phase 3: Comprehensive Testing (2 weeks)

  • Unit test coverage
  • Integration test verification
  • Performance stress testing and tuning

6.4 Phase 4: Gradual Rollout (2-3 weeks)

  • Small traffic validation
  • Gradually expand scope
  • Monitoring and issue resolution
