- *
- * Configuration format is same as PropertiesFileConfigurationProvider
- *
- * Configuration properties
- *
- * agentName - Name of Agent for which configuration needs to be pulled
- *
- * zkConnString - Connection string to ZooKeeper Ensemble
- * (host:port,host1:port1)
- *
- * basePath - Base Path where agent configuration needs to be stored. Defaults
- * to /flume
- */
-public abstract class AbstractZooKeeperConfigurationProvider extends
- AbstractConfigurationProvider {
-
- static final String DEFAULT_ZK_BASE_PATH = "/flume";
-
- protected final String basePath;
-
- protected final String zkConnString;
-
- protected AbstractZooKeeperConfigurationProvider(String agentName,
- String zkConnString, String basePath) {
- super(agentName);
- Preconditions.checkArgument(!Strings.isNullOrEmpty(zkConnString),
- "Invalid Zookeeper Connection String %s", zkConnString);
- this.zkConnString = zkConnString;
- if (basePath == null || basePath.isEmpty()) {
- this.basePath = DEFAULT_ZK_BASE_PATH;
- } else {
- this.basePath = basePath;
- }
- }
-
- protected CuratorFramework createClient() {
- return CuratorFrameworkFactory.newClient(zkConnString,
- new ExponentialBackoffRetry(1000, 1));
- }
-
- protected FlumeConfiguration configFromBytes(byte[] configData)
- throws IOException {
- Map configMap;
- if (configData == null || configData.length == 0) {
- configMap = Collections.emptyMap();
- } else {
- String fileContent = new String(configData, Charsets.UTF_8);
- Properties properties = new Properties();
- properties.load(new StringReader(fileContent));
- configMap = toMap(properties);
- }
- return new FlumeConfiguration(configMap);
- }
-}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/Application.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/Application.java
deleted file mode 100644
index 267313a8dd190..0000000000000
--- a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/Application.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.flume.node;
-
-import com.google.common.base.Throwables;
-import com.google.common.eventbus.Subscribe;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.SinkRunner;
-import org.apache.flume.SourceRunner;
-import org.apache.flume.instrumentation.MonitorService;
-import org.apache.flume.instrumentation.MonitoringType;
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.apache.flume.lifecycle.LifecycleSupervisor;
-import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class Application {
-
- private static final Logger logger = LoggerFactory
- .getLogger(Application.class);
-
- public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
- public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
-
- private final List components;
- private final LifecycleSupervisor supervisor;
- private MaterializedConfiguration materializedConfiguration;
- private MonitorService monitorServer;
- private final ReentrantLock lifecycleLock = new ReentrantLock();
-
- public Application() {
- this(new ArrayList(0));
- }
-
- public Application(List components) {
- this.components = components;
- supervisor = new LifecycleSupervisor();
- }
-
- public void start() {
- lifecycleLock.lock();
- try {
- for (LifecycleAware component : components) {
- supervisor.supervise(component,
- new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
- }
- } finally {
- lifecycleLock.unlock();
- }
- }
-
- @Subscribe
- public void handleConfigurationEvent(MaterializedConfiguration conf) {
- try {
- lifecycleLock.lockInterruptibly();
- stopAllComponents();
- startAllComponents(conf);
- } catch (InterruptedException e) {
- logger.info("Interrupted while trying to handle configuration event");
- return;
- } finally {
- // If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock
- if (lifecycleLock.isHeldByCurrentThread()) {
- lifecycleLock.unlock();
- }
- }
- }
-
- public void stop() {
- lifecycleLock.lock();
- stopAllComponents();
- try {
- supervisor.stop();
- if (monitorServer != null) {
- monitorServer.stop();
- }
- } finally {
- lifecycleLock.unlock();
- }
- }
-
- private void stopAllComponents() {
- if (this.materializedConfiguration != null) {
- logger.info("Shutting down configuration: {}", this.materializedConfiguration);
- for (Entry entry :
- this.materializedConfiguration.getSourceRunners().entrySet()) {
- try {
- logger.info("Stopping Source " + entry.getKey());
- supervisor.unsupervise(entry.getValue());
- } catch (Exception e) {
- logger.error("Error while stopping {}", entry.getValue(), e);
- }
- }
-
- for (Entry entry :
- this.materializedConfiguration.getSinkRunners().entrySet()) {
- try {
- logger.info("Stopping Sink " + entry.getKey());
- supervisor.unsupervise(entry.getValue());
- } catch (Exception e) {
- logger.error("Error while stopping {}", entry.getValue(), e);
- }
- }
-
- for (Entry entry :
- this.materializedConfiguration.getChannels().entrySet()) {
- try {
- logger.info("Stopping Channel " + entry.getKey());
- supervisor.unsupervise(entry.getValue());
- } catch (Exception e) {
- logger.error("Error while stopping {}", entry.getValue(), e);
- }
- }
- }
- if (monitorServer != null) {
- monitorServer.stop();
- }
- }
-
- private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
- logger.info("Starting new configuration:{}", materializedConfiguration);
-
- this.materializedConfiguration = materializedConfiguration;
-
- for (Entry entry :
- materializedConfiguration.getChannels().entrySet()) {
- try {
- logger.info("Starting Channel " + entry.getKey());
- supervisor.supervise(entry.getValue(),
- new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
- } catch (Exception e) {
- logger.error("Error while starting {}", entry.getValue(), e);
- }
- }
-
- /*
- * Wait for all channels to start.
- */
- for (Channel ch : materializedConfiguration.getChannels().values()) {
- while (ch.getLifecycleState() != LifecycleState.START
- && !supervisor.isComponentInErrorState(ch)) {
- try {
- logger.info("Waiting for channel: " + ch.getName()
- + " to start. Sleeping for 500 ms");
- Thread.sleep(500);
- } catch (InterruptedException e) {
- logger.error("Interrupted while waiting for channel to start.", e);
- Throwables.propagate(e);
- }
- }
- }
-
- for (Entry entry : materializedConfiguration.getSinkRunners().entrySet()) {
- try {
- logger.info("Starting Sink " + entry.getKey());
- supervisor.supervise(entry.getValue(),
- new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
- } catch (Exception e) {
- logger.error("Error while starting {}", entry.getValue(), e);
- }
- }
-
- for (Entry entry :
- materializedConfiguration.getSourceRunners().entrySet()) {
- try {
- logger.info("Starting Source " + entry.getKey());
- supervisor.supervise(entry.getValue(),
- new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
- } catch (Exception e) {
- logger.error("Error while starting {}", entry.getValue(), e);
- }
- }
-
- this.loadMonitoring();
- }
-
- @SuppressWarnings("unchecked")
- private void loadMonitoring() {
- Properties systemProps = System.getProperties();
- Set keys = systemProps.stringPropertyNames();
- try {
- if (keys.contains(CONF_MONITOR_CLASS)) {
- String monitorType = systemProps.getProperty(CONF_MONITOR_CLASS);
- Class extends MonitorService> klass;
- try {
- //Is it a known type?
- klass = MonitoringType.valueOf(
- monitorType.toUpperCase(Locale.ENGLISH)).getMonitorClass();
- } catch (Exception e) {
- //Not a known type, use FQCN
- klass = (Class extends MonitorService>) Class.forName(monitorType);
- }
- this.monitorServer = klass.getDeclaredConstructor().newInstance();
- Context context = new Context();
- for (String key : keys) {
- if (key.startsWith(CONF_MONITOR_PREFIX)) {
- context.put(key.substring(CONF_MONITOR_PREFIX.length()),
- systemProps.getProperty(key));
- }
- }
- monitorServer.configure(context);
- monitorServer.start();
- }
- } catch (Exception e) {
- logger.warn("Error starting monitoring. "
- + "Monitoring might not be available.", e);
- }
-
- }
-
-}
\ No newline at end of file
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/ConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/ConfigurationProvider.java
deleted file mode 100644
index e2a7ffe813900..0000000000000
--- a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/ConfigurationProvider.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.flume.node;
-
-public interface ConfigurationProvider {
- MaterializedConfiguration getConfiguration();
-}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/EnvVarResolverProperties.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/EnvVarResolverProperties.java
deleted file mode 100644
index b561dbb893ea1..0000000000000
--- a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/EnvVarResolverProperties.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.flume.node;
-
-import com.google.common.base.Preconditions;
-import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-
-/**
- * A class that extends the Java built-in Properties overriding.
- * {@link java.util.Properties#getProperty(String)} to allow ${ENV_VAR_NAME}-style environment
- * variable inclusions
- */
-public class EnvVarResolverProperties extends Properties {
- /**
- * @param input The input string with ${ENV_VAR_NAME}-style environment variable names
- * @return The output string with ${ENV_VAR_NAME} replaced with their environment variable values
- */
- protected static String resolveEnvVars(String input) {
- Preconditions.checkNotNull(input);
- // match ${ENV_VAR_NAME}
- Pattern p = Pattern.compile("\\$\\{(\\w+)\\}");
- Matcher m = p.matcher(input);
- StringBuffer sb = new StringBuffer();
- while (m.find()) {
- String envVarName = m.group(1);
- String envVarValue = System.getenv(envVarName);
- m.appendReplacement(sb, null == envVarValue ? "" : envVarValue);
- }
- m.appendTail(sb);
- return sb.toString();
- }
-
- /**
- * @param key the property key
- * @return the value of the property key with ${ENV_VAR_NAME}-style environment variables replaced
- */
- @Override
- public String getProperty(String key) {
- return resolveEnvVars(super.getProperty(key));
- }
-}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/MaterializedConfiguration.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/MaterializedConfiguration.java
deleted file mode 100644
index 9b18d042e4a96..0000000000000
--- a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/MaterializedConfiguration.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.flume.node;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.flume.Channel;
-import org.apache.flume.SinkRunner;
-import org.apache.flume.SourceRunner;
-
-/**
- * MaterializedConfiguration represents the materialization of a Flume
- * properties file. That is it's the actual Source, Sink, and Channels
- * represented in the configuration file.
- */
-public interface MaterializedConfiguration {
-
- void addSourceRunner(String name, SourceRunner sourceRunner);
-
- void addSinkRunner(String name, SinkRunner sinkRunner);
-
- void addChannel(String name, Channel channel);
-
- ImmutableMap getSourceRunners();
-
- ImmutableMap getSinkRunners();
-
- ImmutableMap getChannels();
-
-}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingPropertiesFileConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingPropertiesFileConfigurationProvider.java
deleted file mode 100644
index 29f3f0b8ada7f..0000000000000
--- a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingPropertiesFileConfigurationProvider.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.flume.node;
-
-import com.google.common.base.Preconditions;
-import com.google.common.eventbus.EventBus;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.io.File;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.flume.CounterGroup;
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PollingPropertiesFileConfigurationProvider
- extends PropertiesFileConfigurationProvider
- implements LifecycleAware {
-
- private static final Logger LOGGER =
- LoggerFactory.getLogger(PollingPropertiesFileConfigurationProvider.class);
-
- private final EventBus eventBus;
- private final File file;
- private final int interval;
- private final CounterGroup counterGroup;
- private LifecycleState lifecycleState;
-
- private ScheduledExecutorService executorService;
-
- public PollingPropertiesFileConfigurationProvider(String agentName,
- File file, EventBus eventBus, int interval) {
- super(agentName, file);
- this.eventBus = eventBus;
- this.file = file;
- this.interval = interval;
- counterGroup = new CounterGroup();
- lifecycleState = LifecycleState.IDLE;
- }
-
- @Override
- public void start() {
- LOGGER.info("Configuration provider starting");
-
- Preconditions.checkState(file != null,
- "The parameter file must not be null");
-
- executorService = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
- .build());
-
- FileWatcherRunnable fileWatcherRunnable =
- new FileWatcherRunnable(file, counterGroup);
-
- executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
- TimeUnit.SECONDS);
-
- lifecycleState = LifecycleState.START;
-
- LOGGER.debug("Configuration provider started");
- }
-
- @Override
- public void stop() {
- LOGGER.info("Configuration provider stopping");
-
- executorService.shutdown();
- try {
- if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
- LOGGER.debug("File watcher has not terminated. Forcing shutdown of executor.");
- executorService.shutdownNow();
- while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
- LOGGER.debug("Waiting for file watcher to terminate");
- }
- }
- } catch (InterruptedException e) {
- LOGGER.debug("Interrupted while waiting for file watcher to terminate");
- Thread.currentThread().interrupt();
- }
- lifecycleState = LifecycleState.STOP;
- LOGGER.debug("Configuration provider stopped");
- }
-
- @Override
- public synchronized LifecycleState getLifecycleState() {
- return lifecycleState;
- }
-
-
- @Override
- public String toString() {
- return "{ file:" + file + " counterGroup:" + counterGroup + " provider:"
- + getClass().getCanonicalName() + " agentName:" + getAgentName() + " }";
- }
-
- public class FileWatcherRunnable implements Runnable {
-
- private final File file;
- private final CounterGroup counterGroup;
-
- private long lastChange;
-
- public FileWatcherRunnable(File file, CounterGroup counterGroup) {
- super();
- this.file = file;
- this.counterGroup = counterGroup;
- this.lastChange = 0L;
- }
-
- @Override
- public void run() {
- LOGGER.debug("Checking file:{} for changes", file);
-
- counterGroup.incrementAndGet("file.checks");
-
- long lastModified = file.lastModified();
-
- if (lastModified > lastChange) {
- LOGGER.info("Reloading configuration file:{}", file);
-
- counterGroup.incrementAndGet("file.loads");
-
- lastChange = lastModified;
-
- try {
- eventBus.post(getConfiguration());
- } catch (Exception e) {
- LOGGER.error("Failed to load configuration data. Exception follows.",
- e);
- } catch (NoClassDefFoundError e) {
- LOGGER.error("Failed to start agent because dependencies were not "
- + "found in classpath. Error follows.", e);
- } catch (Throwable t) {
- // caught because the caller does not handle or log Throwables
- LOGGER.error("Unhandled error", t);
- }
- }
- }
- }
-
-}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingZooKeeperConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingZooKeeperConfigurationProvider.java
deleted file mode 100644
index 4b5a6e4b7b504..0000000000000
--- a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PollingZooKeeperConfigurationProvider.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.flume.node;
-
-import com.google.common.eventbus.EventBus;
-import java.io.IOException;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.NodeCache;
-import org.apache.flume.FlumeException;
-import org.apache.flume.conf.FlumeConfiguration;
-import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.lifecycle.LifecycleState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PollingZooKeeperConfigurationProvider extends
- AbstractZooKeeperConfigurationProvider implements LifecycleAware {
-
- private static final Logger LOGGER = LoggerFactory
- .getLogger(PollingZooKeeperConfigurationProvider.class);
-
- private final EventBus eventBus;
-
- private final CuratorFramework client;
-
- private NodeCache agentNodeCache;
-
- private FlumeConfiguration flumeConfiguration;
-
- private LifecycleState lifecycleState;
-
- public PollingZooKeeperConfigurationProvider(String agentName,
- String zkConnString, String basePath, EventBus eventBus) {
- super(agentName, zkConnString, basePath);
- this.eventBus = eventBus;
- client = createClient();
- agentNodeCache = null;
- flumeConfiguration = null;
- lifecycleState = LifecycleState.IDLE;
- }
-
- @Override
- protected FlumeConfiguration getFlumeConfiguration() {
- return flumeConfiguration;
- }
-
- @Override
- public void start() {
- LOGGER.debug("Starting...");
- try {
- client.start();
- try {
- agentNodeCache = new NodeCache(client, basePath + "/" + getAgentName());
- agentNodeCache.start();
- agentNodeCache.getListenable().addListener(() -> refreshConfiguration());
- } catch (Exception e) {
- client.close();
- throw e;
- }
- } catch (Exception e) {
- lifecycleState = LifecycleState.ERROR;
- if (e instanceof RuntimeException) {
- throw (RuntimeException) e;
- } else {
- throw new FlumeException(e);
- }
- }
- lifecycleState = LifecycleState.START;
- }
-
- private void refreshConfiguration() throws IOException {
- LOGGER.info("Refreshing configuration from ZooKeeper");
- byte[] data = null;
- ChildData childData = agentNodeCache.getCurrentData();
- if (childData != null) {
- data = childData.getData();
- }
- flumeConfiguration = configFromBytes(data);
- eventBus.post(getConfiguration());
- }
-
- @Override
- public void stop() {
- LOGGER.debug("Stopping...");
- if (agentNodeCache != null) {
- try {
- agentNodeCache.close();
- } catch (IOException e) {
- LOGGER.warn("Encountered exception while stopping", e);
- lifecycleState = LifecycleState.ERROR;
- }
- }
-
- try {
- client.close();
- } catch (Exception e) {
- LOGGER.warn("Error stopping Curator client", e);
- lifecycleState = LifecycleState.ERROR;
- }
-
- if (lifecycleState != LifecycleState.ERROR) {
- lifecycleState = LifecycleState.STOP;
- }
- }
-
- @Override
- public LifecycleState getLifecycleState() {
- return lifecycleState;
- }
-}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PropertiesFileConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PropertiesFileConfigurationProvider.java
deleted file mode 100644
index ebf5868a1d0c5..0000000000000
--- a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/PropertiesFileConfigurationProvider.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.flume.node;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.util.HashMap;
-import java.util.Properties;
-import org.apache.flume.conf.FlumeConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * A configuration provider that uses properties file for specifying
- * configuration. The configuration files follow the Java properties file syntax
- * rules specified at {@link java.util.Properties#load(java.io.Reader)}. Every
- * configuration value specified in the properties file is prefixed by an
- * Agent Name which helps isolate an individual agent's namespace.
- *
- *
- * Valid configuration files must observe the following rules for every agent
- * namespace.
- *
- *
For every <agent name> there must be three lists specified that
- * include <agent name>.sources,
- * <agent name>.sinks, and <agent name>.channels.
- * Each of these lists must contain a space separated list of names
- * corresponding to that particular entity.
- *
For each source named in <agent name>.sources, there must
- * be a non-empty type attribute specified from the valid set of source
- * types. For example:
- * <agent name>.sources.<source name>.type = event
- *
For each source named in <agent name>.sources, there must
- * be a space-separated list of channel names that the source will associate
- * with during runtime. Each of these names must be contained in the channels
- * list specified by <agent name>.channels. For example:
- * <agent name>.sources.<source name>.channels =
- * <channel-1 name> <channel-2 name>
- *
For each source named in the <agent name>.sources, there
- * must be a runner namespace of configuration that configures the
- * associated source runner. For example:
- * <agent name>.sources.<source name>.runner.type = avro.
- * This namespace can also be used to configure other configuration of the
- * source runner as needed. For example:
- * <agent name>.sources.<source name>.runner.port = 10101
- *
- *
For each source named in <sources>.sources there can
- * be an optional selector.type specified that identifies the type
- * of channel selector associated with the source. If not specified, the
- * default replicating channel selector is used.
- *
For each channel named in the <agent name>.channels,
- * there must be a non-empty type attribute specified from the valid
- * set of channel types. For example:
- * <agent name>.channels.<channel name>.type = mem
- *
For each sink named in the <agent name>.sinks, there must
- * be a non-empty type attribute specified from the valid set of sink
- * types. For example:
- * <agent name>.sinks.<sink name>.type = hdfs
- *
For each sink named in the <agent name>.sinks, there must
- * be a non-empty single-valued channel name specified as the value of the
- * channel attribute. This value must be contained in the channels list
- * specified by <agent name>.channels. For example:
- * <agent name>.sinks.<sink name>.channel =
- * <channel name>
- *
For each sink named in the <agent name>.sinks, there must
- * be a runner namespace of configuration that configures the
- * associated sink runner. For example:
- * <agent name>.sinks.<sink name>.runner.type = polling.
- * This namespace can also be used to configure other configuration of the sink
- * runner as needed. For example:
- * <agent name>.sinks.<sink name>.runner.polling.interval =
- * 60
- *
A fourth optional list <agent name>.sinkgroups
- * may be added to each agent, consisting of unique space separated names
- * for groups
- *
Each sinkgroup must specify sinks, containing a list of all sinks
- * belonging to it. These cannot be shared by multiple groups.
- * Further, one can set a processor and behavioral parameters to determine
- * how sink selection is made via <agent name>.sinkgroups.<
- * group name<.processor. For further detail refer to individual processor
- * documentation
- *
Sinks not assigned to a group will be assigned to default single sink
- * groups.
- *
- *
- * Apart from the above required configuration values, each source, sink or
- * channel can have its own set of arbitrary configuration as required by the
- * implementation. Each of these configuration values are expressed by fully
- * namespace qualified configuration keys. For example, the configuration
- * property called capacity for a channel called ch1 for the
- * agent named host1 with value 1000 will be expressed as:
- * host1.channels.ch1.capacity = 1000.
- *
- *
- * Any information contained in the configuration file other than what pertains
- * to the configured agents, sources, sinks and channels via the explicitly
- * enumerated list of sources, sinks and channels per agent name are ignored by
- * this provider. Moreover, if any of the required configuration values are not
- * present in the configuration file for the configured entities, that entity
- * and anything that depends upon it is considered invalid and consequently not
- * configured. For example, if a channel is missing its type attribute,
- * it is considered misconfigured. Also, any sources or sinks that depend upon
- * this channel are also considered misconfigured and not initialized.
- *
- *
- *
- *
- * @see java.util.Properties#load(java.io.Reader)
- */
-public class PropertiesFileConfigurationProvider extends
- AbstractConfigurationProvider {
-
- private static final Logger LOGGER = LoggerFactory
- .getLogger(PropertiesFileConfigurationProvider.class);
- private static final String DEFAULT_PROPERTIES_IMPLEMENTATION = "java.util.Properties";
-
- private final File file;
-
- public PropertiesFileConfigurationProvider(String agentName, File file) {
- super(agentName);
- this.file = file;
- }
-
- @Override
- public FlumeConfiguration getFlumeConfiguration() {
- BufferedReader reader = null;
- try {
- reader = new BufferedReader(new FileReader(file));
- String resolverClassName = System.getProperty("propertiesImplementation",
- DEFAULT_PROPERTIES_IMPLEMENTATION);
- Class extends Properties> propsclass = Class.forName(resolverClassName)
- .asSubclass(Properties.class);
- Properties properties = propsclass.getDeclaredConstructor().newInstance();
- properties.load(reader);
- return new FlumeConfiguration(toMap(properties));
- } catch (IOException ex) {
- LOGGER.error("Unable to load file:" + file
- + " (I/O failure) - Exception follows.", ex);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- LOGGER.error("Configuration resolver class not found", e);
- } catch (InstantiationException e) {
- LOGGER.error("Instantiation exception", e);
- } catch (IllegalAccessException e) {
- LOGGER.error("Illegal access exception", e);
- } catch (InvocationTargetException e) {
- LOGGER.error("Invocation target exception", e);
- } catch (NoSuchMethodException e) {
- LOGGER.error("No such method exception", e);
- } finally {
- if (reader != null) {
- try {
- reader.close();
- } catch (IOException ex) {
- LOGGER.warn(
- "Unable to close file reader for file: " + file, ex);
- }
- }
- }
- return new FlumeConfiguration(new HashMap());
- }
-}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/SimpleMaterializedConfiguration.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/SimpleMaterializedConfiguration.java
deleted file mode 100644
index cbe49431f5d66..0000000000000
--- a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/SimpleMaterializedConfiguration.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.flume.node;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.flume.Channel;
-import org.apache.flume.SinkRunner;
-import org.apache.flume.SourceRunner;
-
-public class SimpleMaterializedConfiguration implements MaterializedConfiguration {
-
- private final Map channels;
- private final Map sourceRunners;
- private final Map sinkRunners;
-
- public SimpleMaterializedConfiguration() {
- channels = new HashMap();
- sourceRunners = new HashMap();
- sinkRunners = new HashMap();
- }
-
- @Override
- public String toString() {
- return "{ sourceRunners:" + sourceRunners + " sinkRunners:" + sinkRunners
- + " channels:" + channels + " }";
- }
-
- @Override
- public void addSourceRunner(String name, SourceRunner sourceRunner) {
- sourceRunners.put(name, sourceRunner);
- }
-
- @Override
- public void addSinkRunner(String name, SinkRunner sinkRunner) {
- sinkRunners.put(name, sinkRunner);
- }
-
- @Override
- public void addChannel(String name, Channel channel) {
- channels.put(name, channel);
- }
-
- @Override
- public ImmutableMap getChannels() {
- return ImmutableMap.copyOf(channels);
- }
-
- @Override
- public ImmutableMap getSourceRunners() {
- return ImmutableMap.copyOf(sourceRunners);
- }
-
- @Override
- public ImmutableMap getSinkRunners() {
- return ImmutableMap.copyOf(sinkRunners);
- }
-
-}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/StaticZooKeeperConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/StaticZooKeeperConfigurationProvider.java
deleted file mode 100644
index 2a927e081672a..0000000000000
--- a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/StaticZooKeeperConfigurationProvider.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.flume.node;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.flume.FlumeException;
-import org.apache.flume.conf.FlumeConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StaticZooKeeperConfigurationProvider extends
- AbstractZooKeeperConfigurationProvider {
-
- private static final Logger LOGGER = LoggerFactory
- .getLogger(StaticZooKeeperConfigurationProvider.class);
-
- public StaticZooKeeperConfigurationProvider(String agentName,
- String zkConnString, String basePath) {
- super(agentName, zkConnString, basePath);
- }
-
- @Override
- protected FlumeConfiguration getFlumeConfiguration() {
- try {
- CuratorFramework cf = createClient();
- cf.start();
- try {
- byte[] data = cf.getData().forPath(basePath + "/" + getAgentName());
- return configFromBytes(data);
- } finally {
- cf.close();
- }
- } catch (Exception e) {
- LOGGER.error("Error getting configuration info from Zookeeper", e);
- throw new FlumeException(e);
- }
- }
-
-}
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/package-info.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/package-info.java
deleted file mode 100644
index 5815e29e75c27..0000000000000
--- a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.flume.node;
\ No newline at end of file
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/package-info.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/package-info.java
deleted file mode 100644
index 2ce89df678cdd..0000000000000
--- a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.flume;
\ No newline at end of file
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/AbstractSink.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/AbstractSink.java
deleted file mode 100644
index 038642edc379b..0000000000000
--- a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/AbstractSink.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.flume.sink;
-
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.flume.FlumeConfig;
-import org.apache.pulsar.io.flume.FlumeConnector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Simple abstract sink class for pulsar to flume.
- */
-public abstract class AbstractSink implements Sink {
-
- private static final Logger log = LoggerFactory.getLogger(AbstractSink.class);
-
-
- public abstract T extractValue(Record record);
-
- protected static BlockingQueue