Skip to content

Commit 8e4e5f6

Browse files
Merge pull request #1 from ops0-ai/feature/kafka-ssl
feat: adding ssl for kafka connect
2 parents e6c614b + a24a8da commit 8e4e5f6

1 file changed

Lines changed: 33 additions & 20 deletions

File tree

main.go

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ func main() {
9999
var interactiveMode bool
100100
var adminMode string
101101
var kafkaBrokers string
102+
var kafkaCommandConfig string
102103

103104
flag.BoolVar(&showVersion, "version", false, "show version information")
104105
flag.BoolVar(&displayHelp, "help", false, "show help information")
@@ -110,6 +111,7 @@ func main() {
110111
flag.BoolVar(&interactiveMode, "o", false, "operations interactive mode")
111112
flag.StringVar(&adminMode, "admin", "", "enter admin mode for a specific service (e.g., 'kafka')")
112113
flag.StringVar(&kafkaBrokers, "brokers", "", "comma-separated list of Kafka brokers for admin mode")
114+
flag.StringVar(&kafkaCommandConfig, "command-config", "", "path to Kafka command config file for SSL/SASL")
113115
flag.Parse()
114116

115117
if installAll {
@@ -124,7 +126,7 @@ func main() {
124126
fmt.Println("❌ ops0: --brokers flag is required for Kafka admin mode")
125127
os.Exit(1)
126128
}
127-
runKafkaAdminSession(kafkaBrokers)
129+
runKafkaAdminSession(kafkaBrokers, kafkaCommandConfig)
128130
default:
129131
fmt.Printf("❌ ops0: Unknown admin mode '%s'. Supported modes: kafka\n", adminMode)
130132
os.Exit(1)
@@ -264,10 +266,12 @@ func showHelp() {
264266
fmt.Println("\n Kafka Admin Mode:")
265267
fmt.Println(" Usage: ops0 --admin kafka --brokers <broker_list>")
266268
fmt.Println(" Flags:")
267-
fmt.Println(" --admin kafka Enter Kafka admin mode.")
268-
fmt.Println(" --brokers <list> Comma-separated list of Kafka brokers (required).")
269+
fmt.Println(" --admin kafka Enter Kafka admin mode.")
270+
fmt.Println(" --brokers <list> Comma-separated list of Kafka brokers (required).")
271+
fmt.Println(" --command-config <path> Path to client config file for SSL/SASL.")
269272
fmt.Println(" Example:")
270273
fmt.Println(" ops0 --admin kafka --brokers localhost:9092")
274+
fmt.Println(" ops0 --admin kafka --brokers ssl-broker:9093 --command-config client.properties")
271275

272276
// Supported Tools
273277
fmt.Println("\n🛠️ Supported Tools:")
@@ -1922,7 +1926,7 @@ func runInteractiveSession() {
19221926
}
19231927
}
19241928

1925-
func runKafkaAdminSession(brokers string) {
1929+
func runKafkaAdminSession(brokers string, commandConfig string) {
19261930
// Prerequisite check for Homebrew on macOS
19271931
if runtime.GOOS == "darwin" {
19281932
if _, err := findCommand("brew"); err != nil {
@@ -1951,7 +1955,7 @@ func runKafkaAdminSession(brokers string) {
19511955
}
19521956
}
19531957

1954-
// 1. Check if kafka-topics.sh is available
1958+
// 1. Check if kafka-topics is available
19551959
cmdPath, err := findCommand("kafka-topics")
19561960
if err != nil {
19571961
if err.Error() == "found_not_in_path" {
@@ -1996,7 +2000,11 @@ func runKafkaAdminSession(brokers string) {
19962000

19972001
// 2. Test connection to the cluster
19982002
fmt.Printf("Connecting to Kafka cluster at %s...\n", brokers)
1999-
testCmd := exec.Command(cmdPath, "--bootstrap-server", brokers, "--list")
2003+
args := []string{"--bootstrap-server", brokers, "--list"}
2004+
if commandConfig != "" {
2005+
args = append(args, "--command-config", commandConfig)
2006+
}
2007+
testCmd := exec.Command(cmdPath, args...)
20002008
testCmd.Stderr = os.Stderr
20012009
if err := testCmd.Run(); err != nil {
20022010
fmt.Printf(red+"❌ Could not connect to Kafka cluster. Please check your broker addresses and network connectivity. Error: %v"+reset+"\n", err)
@@ -2032,7 +2040,7 @@ func runKafkaAdminSession(brokers string) {
20322040
continue
20332041
}
20342042

2035-
suggestion := getKafkaAISuggestion(claudeConfig, input, brokers)
2043+
suggestion := getKafkaAISuggestion(claudeConfig, input, brokers, commandConfig)
20362044

20372045
if suggestion != nil {
20382046
// Show operation details and prompt for confirmation
@@ -2069,26 +2077,31 @@ func runKafkaAdminSession(brokers string) {
20692077
}
20702078
}
20712079

2072-
func getKafkaAISuggestion(config *ClaudeConfig, userInput, brokers string) *CommandSuggestion {
2080+
func getKafkaAISuggestion(config *ClaudeConfig, userInput, brokers, commandConfig string) *CommandSuggestion {
2081+
connectionFlags := fmt.Sprintf("--bootstrap-server %s", brokers)
2082+
if commandConfig != "" {
2083+
connectionFlags += fmt.Sprintf(" --command-config %s", commandConfig)
2084+
}
2085+
20732086
systemPrompt := fmt.Sprintf(`You are an expert Kafka administrator's assistant. Your sole job is to translate natural language user requests into the appropriate Kafka command-line tool command (e.g., kafka-topics, kafka-console-consumer, kafka-configs).
20742087
2075-
The user is connected to the Kafka cluster at: %s
2076-
**You must inject '--bootstrap-server %s' into every command you generate.** Do not use full paths for the kafka commands (e.g. use 'kafka-topics' not '/usr/local/bin/kafka-topics').
2088+
The user is connected to a Kafka cluster.
2089+
**You must inject '%s' into every command you generate.** Do not use full paths for the kafka commands (e.g. use 'kafka-topics' not '/usr/local/bin/kafka-topics').
20772090
20782091
Here are some examples of Kafka commands:
2079-
- List topics: kafka-topics --bootstrap-server %s --list
2080-
- Describe a topic: kafka-topics --bootstrap-server %s --describe --topic my-topic
2081-
- Create a topic: kafka-topics --bootstrap-server %s --create --topic new-topic --partitions 1 --replication-factor 1
2082-
- Delete a topic: kafka-topics --bootstrap-server %s --delete --topic old-topic
2083-
- Consume messages: kafka-console-consumer --bootstrap-server %s --topic my-topic --from-beginning --max-messages 10
2084-
- Produce a message: kafka-console-producer --bootstrap-server %s --topic my-topic
2085-
- Describe configs: kafka-configs --bootstrap-server %s --describe --entity-type topics --entity-name my-topic
2092+
- List topics: kafka-topics %s --list
2093+
- Describe a topic: kafka-topics %s --describe --topic my-topic
2094+
- Create a topic: kafka-topics %s --create --topic new-topic --partitions 1 --replication-factor 1
2095+
- Delete a topic: kafka-topics %s --delete --topic old-topic
2096+
- Consume messages: kafka-console-consumer %s --topic my-topic --from-beginning --max-messages 10
2097+
- Produce a message: kafka-console-producer %s --topic my-topic
2098+
- Describe configs: kafka-configs %s --describe --entity-type topics --entity-name my-topic
20862099
20872100
Respond with a JSON object in this exact format, with no extra text or explanations.
20882101
Use one of the following standardized intents: 'list_topics', 'describe_topic', 'create_topic', 'delete_topic', 'produce_message', 'consume_message', 'alter_configs', 'describe_configs', 'list_consumer_groups', 'describe_consumer_group', 'get_cluster_info'.
20892102
{
20902103
"tool": "kafka",
2091-
"command": "kafka-topics --bootstrap-server %s --list",
2104+
"command": "kafka-topics %s --list",
20922105
"dry_run_command": "",
20932106
"description": "This command will list all topics in the Kafka cluster.",
20942107
"intent": "list_topics",
@@ -2097,9 +2110,9 @@ Use one of the following standardized intents: 'list_topics', 'describe_topic',
20972110
}
20982111
20992112
If the user says "produce a message 'hello world' to topic 'test'", the command should be:
2100-
"echo 'hello world' | kafka-console-producer --bootstrap-server %s --topic test"
2113+
"echo 'hello world' | kafka-console-producer %s --topic test"
21012114
2102-
User Request: %s`, brokers, brokers, brokers, brokers, brokers, brokers, brokers, brokers, brokers, brokers, brokers, userInput)
2115+
User Request: %s`, connectionFlags, connectionFlags, connectionFlags, connectionFlags, connectionFlags, connectionFlags, connectionFlags, connectionFlags, connectionFlags, connectionFlags, userInput)
21032116

21042117
response := callClaude(config, systemPrompt, userInput)
21052118
if response == "" {

0 commit comments

Comments
 (0)