Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 38 additions & 15 deletions lib/td/command/connector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,18 @@ def connector_preview(op)
$stdout.puts cmd_render_table(rows, :fields => fields, :render_format => op.render_format, :resize => false)

$stdout.puts "Update #{config_file} and use '#{$prog} " + Config.cl_options_string + "connector:preview #{config_file}' to preview again."
$stdout.puts "Use '#{$prog} " + Config.cl_options_string + "connector:issue #{config_file}' to run Server-side bulk load."
$stdout.puts "Use '#{$prog} " + Config.cl_options_string + "connector:issue #{config_file}' to run data connector."
end

def connector_issue(op)
database = table = nil
time_column = nil
wait = exclude = false
auto_create = false

op.on('--database DB_NAME', "destination database") { |s| database = s }
op.on('--table TABLE_NAME', "destination table") { |s| table = s }
op.on('--time-column COLUMN_NAME', "data partitioning key") { |s| time_column = s } # unnecessary but for backward compatibility
wait = exclude = false
auto_create = false
config_option = {}

on_with_obsolute_and_overwrite_config_warning(op, '--database DB_NAME') { |s| config_option['database'] = s }
on_with_obsolute_and_overwrite_config_warning(op, '--table TABLE_NAME') { |s| config_option['table'] = s }
on_with_obsolute_and_overwrite_config_warning(op, '--time-column COLUMN_NAME') { |s| config_option['time_column'] = s }

op.on('-w', '--wait', 'wait for finishing the job', TrueClass) { |b| wait = b }
op.on('-x', '--exclude', 'do not automatically retrieve the job result', TrueClass) { |b| exclude = b }
op.on('--auto-create-table', "Create table and database if doesn't exist", TrueClass) { |b|
Expand All @@ -124,19 +124,20 @@ def connector_issue(op)

config_file = op.cmd_parse

required('--database', database)
required('--table', table)

config = prepare_bulkload_job_config(config_file)
(config['out'] ||= {})['time_column'] = time_column if time_column # TODO will not work once embulk implements multi-job
overwrite_out_config(config, config_option)

client = get_client()

required('database', config['out']['database'])
required('table', config['out']['table'])

# TODO need fix if embulk support multi-job
if auto_create
create_database_and_table_if_not_exist(client, database, table)
create_database_and_table_if_not_exist(client, config['out']['database'], config['out']['table'])
end

job_id = client.bulk_load_issue(database, table, config: config)
job_id = client.bulk_load_issue(config['out']['database'], config['out']['table'], config: config)

$stdout.puts "Job #{job_id} is queued."
$stdout.puts "Use '#{$prog} " + Config.cl_options_string + "job:show #{job_id}' to show the status."
Expand All @@ -146,6 +147,28 @@ def connector_issue(op)
end
end

def on_with_obsolute_and_overwrite_config_warning(op, *args, &block)
options = args.each_with_object([]) do |arg, o|
arg.split("\s").each do |word|
o << word if word =~ /\A-/
end
end

op.on(*args, '(obsoleted)') do |s|
$stderr.puts "#{options.join(',')} #{options.size > 1 ? 'are' : 'is'} obsolete option. You should write to configuration file. Even if you wrote in the configuration file, #{s} is used."
block.call(s)
end
end

def overwrite_out_config(config, out_args)
# TODO will not work once embulk implements multi-job
config['out'] ||= {}

out_args.each do |key, value|
config['out'][key] = value if value
end
end

def connector_list(op)
set_render_format_option(op)
op.cmd_parse
Expand Down
2 changes: 1 addition & 1 deletion lib/td/command/list.rb
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ def self.finishup
add_list 'connector:guess', %w[config?], 'Run guess to generate connector config file', ["connector:guess config.yml -o td-bulkload.yml\n\nexample config.yml:#{connector_guess_example_config}"]
add_list 'connector:preview', %w[config], 'Show preview of connector execution', ['connector:preview td-bulkload.yml']

add_list 'connector:issue', %w[config], 'Run one time connector execution', ['connector:issue td-bulkload.yml']
add_list 'connector:issue', %w[config], 'Run one time connector execution', ['connector:issue example_db event_logs td-bulkload.yml']

add_list 'connector:list', %w[], 'Show list of connector sessions', ['connector:list']
add_list 'connector:create', %w[name cron database table config], 'Create new connector session', ['connector:create connector1 "0 * * * *" connector_database connector_table td-bulkload.yml']
Expand Down
62 changes: 62 additions & 0 deletions spec/td/command/connector_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,68 @@ module TreasureData::Command
end
end

describe 'database and table arguments' do
let(:database) { 'database' }
let(:table) { 'table' }

before do
client = double(:client)
command.stub(:get_client).and_return(client)
command.stub(:create_database_and_table_if_not_exist)
command.stub(:prepare_bulkload_job_config).and_return(config)
client.should_receive(:bulk_load_issue).with(database, table, {config: expect_config}).and_return(1234)
end

context 'set from config file' do
let(:expect_config) {
{'out' => {'database' => database, 'table' => table}}
}

context 'without arguments' do
let(:option) {
List::CommandParser.new("connector:issue", ["config"], [], nil, [File.join("spec", "td", "fixture", "bulk_load.yml")], true)
}
let(:config) {
{'out' => {'database' => database, 'table' => table}}
}

it { expect { subject }.not_to raise_error }
end

context 'with arguments' do
let(:option) {
List::CommandParser.new("connector:issue", ["config"], ['database', 'table'], nil, [File.join("spec", "td", "fixture", "bulk_load.yml"), '--database', database, '--table', table], true)
}
let(:config) {
{'out' => {'database' => 'config_database', 'table' => 'config_table'}}
}

it {
expect { subject }.not_to raise_error
expect(stderr_io.string).to include "#{database} is used."
expect(stderr_io.string).to include "#{table} is used."
}
end
end

context 'set --database and --table' do
let(:option) {
List::CommandParser.new("connector:issue", ["config"], ['database', 'table'], nil, [File.join("spec", "td", "fixture", "bulk_load.yml"), '--database', database, '--table', table], true)
}
let(:config) { {} }
let(:expect_config) {
{'out' => {'database' => database, 'table' => table}}
}

it 'show warning' do
subject

expect(stderr_io.string).to include '--database is obsolete option'
expect(stderr_io.string).to include '--table is obsolete option'
end
end
end

describe 'queueing job' do
let(:option) {
List::CommandParser.new("connector:issue", ["config"], ['database', 'table'], nil, [File.join("spec", "td", "fixture", "bulk_load.yml"), '--database', 'database', '--table', 'table'], true)
Expand Down