diff --git a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb index 9dfe9733..ef87f7e1 100644 --- a/lib/active_record/connection_adapters/clickhouse/schema_statements.rb +++ b/lib/active_record/connection_adapters/clickhouse/schema_statements.rb @@ -6,22 +6,55 @@ module ActiveRecord module ConnectionAdapters module Clickhouse module SchemaStatements - DEFAULT_RESPONSE_FORMAT = 'JSONCompactEachRowWithNamesAndTypes'.freeze - DB_EXCEPTION_REGEXP = /\ACode:\s+\d+\.\s+DB::Exception:/.freeze + def with_settings(**settings) + @block_settings ||= {} + prev_settings = @block_settings + @block_settings = @block_settings.merge(settings) + yield + ensure + @block_settings = prev_settings + end - def execute(sql, name = nil, settings: {}) - do_execute(sql, name, settings: settings) + # Request a specific format for the duration of the provided block. + # Pass `nil` to explicitly send the SQL statement without a `FORMAT` clause. + # @param [String, nil] format + # + # @example Specify CSVWithNamesAndTypes format + # with_response_format('CSVWithNamesAndTypes') do + # Table.connection.execute('SELECT * FROM table') + # end + # # sends and executes "SELECT * FROM table FORMAT CSVWithNamesAndTypes" + # + # @example Specify no format + # with_response_format(nil) do + # Table.connection.execute('SELECT * FROM table') + # end + # # sends and executes "SELECT * FROM table" + def with_response_format(format) + prev_format = @response_format + @response_format = format + yield + ensure + @response_format = prev_format + end + + def execute(sql, name = nil, format: @response_format, settings: {}) + with_response_format(format) do + log(sql, [adapter_name, name].compact.join(' ')) do + raw_execute(sql, settings: settings) + end + end end - def exec_insert(sql, name, _binds, _pk = nil, _sequence_name = nil, returning: nil) - new_sql = sql.dup.sub(/ (DEFAULT )?VALUES/, " VALUES") - do_execute(new_sql, name, format: nil) + def exec_insert(sql, name = nil, _binds = [], _pk = nil, _sequence_name = nil, returning: nil) + new_sql = sql.sub(/ (DEFAULT )?VALUES/, " VALUES") + with_response_format(nil) { execute(new_sql, name) } true end def internal_exec_query(sql, name = nil, binds = [], prepare: false, async: false, allow_retry: false) - result = do_execute(sql, name) + result = execute(sql, name) columns = result['meta'].map { |m| m['name'] } types = {} result['meta'].each_with_index do |m, i| @@ -37,24 +70,25 @@ def internal_exec_query(sql, name = nil, binds = [], prepare: false, async: fals end def exec_insert_all(sql, name) - do_execute(sql, name, format: nil) + with_response_format(nil) { execute(sql, name) } true end # @link https://clickhouse.com/docs/en/sql-reference/statements/alter/update - def exec_update(_sql, _name = nil, _binds = []) - do_execute(_sql, _name, format: nil) + def exec_update(sql, name = nil, _binds = []) + execute(sql, name) 0 end # @link https://clickhouse.com/docs/en/sql-reference/statements/delete - def exec_delete(_sql, _name = nil, _binds = []) - log(_sql, "#{adapter_name} #{_name}") do - res = request(_sql) + def exec_delete(sql, name = nil, _binds = []) + log(sql, "#{adapter_name} #{name}") do + statement = Statement.new(sql, format: @response_format) + res = request(statement) begin data = JSON.parse(res.header['x-clickhouse-summary']) data['result_rows'].to_i - rescue JSONError + rescue JSON::ParserError 0 end end @@ -85,7 +119,9 @@ def functions end def show_create_function(function) - do_execute("SELECT create_query FROM system.functions WHERE origin = 'SQLUserDefined' AND name = '#{function}'", format: nil) + result = do_system_execute("SELECT create_query FROM system.functions WHERE origin = 'SQLUserDefined' AND name = '#{function}'") + return if result.nil? + result['data'].flatten.first end def table_options(table) @@ -110,18 +146,18 @@ def data_sources tables end - def do_system_execute(sql, name = nil) - log_with_debug(sql, "#{adapter_name} #{name}") do - res = request(sql, DEFAULT_RESPONSE_FORMAT) - process_response(res, DEFAULT_RESPONSE_FORMAT, sql) + def do_system_execute(sql, name = nil, except_params: []) + log_with_debug(sql, [adapter_name, name].compact.join(' ')) do + raw_execute(sql, except_params: except_params) end end def do_execute(sql, name = nil, format: DEFAULT_RESPONSE_FORMAT, settings: {}) - log(sql, "#{adapter_name} #{name}") do - res = request(sql, format, settings) - process_response(res, format, sql) - end + ActiveRecord.deprecator.warn(<<~MSG.squish) + `do_execute` is deprecated and will be removed in an upcoming release. + Please use `execute` instead. + MSG + execute(sql, name, format: format, settings: settings) end if ::ActiveRecord::version >= Gem::Version.new('7.2') @@ -154,7 +190,7 @@ def assume_migrated_upto_version(version, migrations_paths = nil) if (duplicate = inserting.detect { |v| inserting.count(v) > 1 }) raise "Duplicate migration #{duplicate}. Please renumber your migrations to resolve the conflict." end - do_execute(insert_versions_sql(inserting), nil, format: nil, settings: {max_partitions_per_insert_block: [100, inserting.size].max}) + execute(insert_versions_sql(inserting), nil, settings: {max_partitions_per_insert_block: [100, inserting.size].max}) end end @@ -168,54 +204,19 @@ def with_yaml_fallback(value) # :nodoc: end end - private - - # Make HTTP request to ClickHouse server - # @param [String] sql - # @param [String, nil] format - # @param [Hash] settings - # @return [Net::HTTPResponse] - def request(sql, format = nil, settings = {}) - formatted_sql = apply_format(sql, format) - request_params = @connection_config || {} - @connection.post("/?#{request_params.merge(settings).to_param}", formatted_sql, { - 'User-Agent' => "Clickhouse ActiveRecord #{ClickhouseActiverecord::VERSION}", - 'Content-Type' => 'application/x-www-form-urlencoded', - }) - end + protected - def apply_format(sql, format) - format ? "#{sql} FORMAT #{format}" : sql - end + def table_structure(table_name) + result = do_system_execute("DESCRIBE TABLE `#{table_name}`", table_name) + data = result['data'] - def process_response(res, format, sql = nil) - case res.code.to_i - when 200 - body = res.body + return data unless data.empty? - if body.include?("DB::Exception") && body.match?(DB_EXCEPTION_REGEXP) - raise ActiveRecord::ActiveRecordError, "Response code: #{res.code}:\n#{res.body}#{sql ? "\nQuery: #{sql}" : ''}" - else - format_body_response(res.body, format) - end - else - case res.body - when /DB::Exception:.*\(UNKNOWN_DATABASE\)/ - raise ActiveRecord::NoDatabaseError - when /DB::Exception:.*\(DATABASE_ALREADY_EXISTS\)/ - raise ActiveRecord::DatabaseAlreadyExists - else - raise ActiveRecord::ActiveRecordError, "Response code: #{res.code}:\n#{res.body}" - end - end - rescue JSON::ParserError - res.body + raise ActiveRecord::StatementInvalid, "Could not find table '#{table_name}'" end + alias column_definitions table_structure - def log_with_debug(sql, name = nil) - return yield unless @debug - log(sql, "#{name} (system)") { yield } - end + private def schema_creation Clickhouse::SchemaCreation.new(self) @@ -234,20 +235,6 @@ def new_column_from_field(table_name, field, _definitions) Clickhouse::Column.new(field[0], default_value, type_metadata, field[1].include?('Nullable'), default_function, codec: field[5].presence) end - protected - - def table_structure(table_name) - result = do_system_execute("DESCRIBE TABLE `#{table_name}`", table_name) - data = result['data'] - - return data unless data.empty? - - raise ActiveRecord::StatementInvalid, "Could not find table '#{table_name}'" - end - alias column_definitions table_structure - - private - # Extracts the value from a PostgreSQL column default definition. def extract_value_from_default(default_expression, default_type) return nil if default_type != 'DEFAULT' || default_expression.blank? @@ -267,42 +254,36 @@ def has_default_function?(default) # :nodoc: (%r{\w+\(.*\)} === default) end - def format_body_response(body, format) - return body if body.blank? - - case format - when 'JSONCompact' - format_from_json_compact(body) - when 'JSONCompactEachRowWithNamesAndTypes' - format_from_json_compact_each_row_with_names_and_types(body) - else - body - end + def raw_execute(sql, settings: {}, except_params: []) + statement = Statement.new(sql, format: @response_format) + statement.response = request(statement, settings: settings, except_params: except_params) + statement.processed_response end - def format_from_json_compact(body) - parse_json_payload(body) + # Make HTTP request to ClickHouse server + # @param [ActiveRecord::ConnectionAdapters::Clickhouse::Statement] statement + # @param [Hash] settings + # @param [Array] except_params + # @return [Net::HTTPResponse] + def request(statement, settings: {}, except_params: []) + @connection.post("/?#{settings_params(settings, except: except_params)}", + statement.formatted_sql, + 'Content-Type' => 'application/x-www-form-urlencoded', + 'User-Agent' => ClickhouseAdapter::USER_AGENT) end - def format_from_json_compact_each_row_with_names_and_types(body) - rows = body.split("\n").map { |row| parse_json_payload(row) } - names, types, *data = rows - - meta = names.zip(types).map do |name, type| - { - 'name' => name, - 'type' => type - } - end - - { - 'meta' => meta, - 'data' => data - } + def log_with_debug(sql, name = nil) + return yield unless @debug + log(sql, "#{name} (system)") { yield } end - def parse_json_payload(payload) - JSON.parse(payload, decimal_class: BigDecimal) + def settings_params(settings = {}, except: []) + request_params = @connection_config || {} + block_settings = @block_settings || {} + request_params.merge(block_settings) + .merge(settings) + .except(*except) + .to_param end end end diff --git a/lib/active_record/connection_adapters/clickhouse/statement.rb b/lib/active_record/connection_adapters/clickhouse/statement.rb new file mode 100644 index 00000000..4a587945 --- /dev/null +++ b/lib/active_record/connection_adapters/clickhouse/statement.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +require 'active_record/connection_adapters/clickhouse/statement/format_manager' +require 'active_record/connection_adapters/clickhouse/statement/response_processor' + +module ActiveRecord + module ConnectionAdapters + module Clickhouse + class Statement + + attr_reader :format + attr_writer :response + + def initialize(sql, format:) + @sql = sql + @format = format + end + + def formatted_sql + @formatted_sql ||= FormatManager.new(@sql, format: @format).apply + end + + def processed_response + ResponseProcessor.new(@response, @format, @sql).process + end + + end + end + end +end diff --git a/lib/active_record/connection_adapters/clickhouse/statement/format_manager.rb b/lib/active_record/connection_adapters/clickhouse/statement/format_manager.rb new file mode 100644 index 00000000..b17da791 --- /dev/null +++ b/lib/active_record/connection_adapters/clickhouse/statement/format_manager.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +module ActiveRecord + module ConnectionAdapters + module Clickhouse + class Statement + class FormatManager + + def initialize(sql, format:) + @sql = sql.strip + @format = format + end + + def apply + return @sql if skip_format? || @format.blank? + + "#{@sql} FORMAT #{@format}" + end + + def skip_format? + system_command? || schema_command? || format_specified? || delete? + end + + private + + def system_command? + /\Asystem|\Aoptimize/i.match?(@sql) + end + + def schema_command? + /\Acreate|\Aalter|\Adrop|\Arename/i.match?(@sql) + end + + def format_specified? + /format [a-z]+\z/i.match?(@sql) + end + + def delete? + /\Adelete from/i.match?(@sql) + end + + end + end + end + end +end diff --git a/lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb b/lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb new file mode 100644 index 00000000..11cb8547 --- /dev/null +++ b/lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb @@ -0,0 +1,99 @@ +# frozen_string_literal: true + +module ActiveRecord + module ConnectionAdapters + module Clickhouse + class Statement + class ResponseProcessor + + DB_EXCEPTION_REGEXP = /\ACode:\s+\d+\.\s+DB::Exception:/.freeze + + def initialize(raw_response, format, sql) + @raw_response = raw_response + @body = raw_response.body + @format = format + @sql = sql + end + + def process + if success? + process_successful_response + else + raise_database_error! + end + rescue JSON::ParserError + @body + end + + private + + def success? + @raw_response.code.to_i == 200 + end + + def process_successful_response + raise_generic!(@sql) if @body.include?('DB::Exception') && @body.match?(DB_EXCEPTION_REGEXP) + + format_body_response + end + + def raise_generic!(sql = nil) + raise ActiveRecord::ActiveRecordError, "Response code: #{@raw_response.code}:\n#{@body}#{"\nQuery: #{sql}" if sql}" + end + + def format_body_response + return @body if @body.blank? + + case @format + when 'JSONCompact' + format_from_json_compact(@body) + when 'JSONCompactEachRowWithNamesAndTypes' + format_from_json_compact_each_row_with_names_and_types(@body) + else + @body + end + rescue JSON::ParserError + @body + end + + def format_from_json_compact(body) + parse_json_payload(body) + end + + def format_from_json_compact_each_row_with_names_and_types(body) + rows = body.each_line.map { |row| parse_json_payload(row) } + names, types, *data = rows + + meta = names.zip(types).map do |name, type| + { + 'name' => name, + 'type' => type + } + end + + { + 'meta' => meta, + 'data' => data + } + end + + def parse_json_payload(payload) + JSON.parse(payload, decimal_class: BigDecimal) + end + + def raise_database_error! + case @body + when /DB::Exception:.*\(UNKNOWN_DATABASE\)/ + raise ActiveRecord::NoDatabaseError + when /DB::Exception:.*\(DATABASE_ALREADY_EXISTS\)/ + raise ActiveRecord::DatabaseAlreadyExists + else + raise_generic! + end + end + + end + end + end + end +end diff --git a/lib/active_record/connection_adapters/clickhouse_adapter.rb b/lib/active_record/connection_adapters/clickhouse_adapter.rb index fb739933..144a1881 100644 --- a/lib/active_record/connection_adapters/clickhouse_adapter.rb +++ b/lib/active_record/connection_adapters/clickhouse_adapter.rb @@ -16,6 +16,7 @@ require 'active_record/connection_adapters/clickhouse/quoting' require 'active_record/connection_adapters/clickhouse/schema_creation' require 'active_record/connection_adapters/clickhouse/schema_statements' +require 'active_record/connection_adapters/clickhouse/statement' require 'active_record/connection_adapters/clickhouse/table_definition' require 'net/http' require 'openssl' @@ -82,6 +83,8 @@ class ClickhouseAdapter < AbstractAdapter include Clickhouse::Quoting ADAPTER_NAME = 'Clickhouse'.freeze + DEFAULT_RESPONSE_FORMAT = 'JSONCompactEachRowWithNamesAndTypes'.freeze + USER_AGENT = "ClickHouse ActiveRecord #{ClickhouseActiverecord::VERSION}" NATIVE_DATABASE_TYPES = { string: { name: 'String' }, integer: { name: 'UInt32' }, @@ -137,6 +140,7 @@ def initialize(config_or_deprecated_connection, deprecated_logger = nil, depreca @connection_config = { user: @config[:username], password: @config[:password], database: @config[:database] }.compact @debug = @config[:debug] || false + @response_format = @config[:format] || DEFAULT_RESPONSE_FORMAT @prepared_statements = false @@ -145,7 +149,7 @@ def initialize(config_or_deprecated_connection, deprecated_logger = nil, depreca # Return ClickHouse server version def server_version - @server_version ||= do_system_execute('SELECT version()')['data'][0][0] + @server_version ||= select_value('SELECT version()') end # Savepoints are not supported, noop @@ -308,10 +312,7 @@ def show_create_table(table) # Create a new ClickHouse database. def create_database(name) sql = apply_cluster "CREATE DATABASE #{quote_table_name(name)}" - log_with_debug(sql, adapter_name) do - res = @connection.post("/?#{@connection_config.except(:database).to_param}", sql) - process_response(res, DEFAULT_RESPONSE_FORMAT) - end + do_system_execute sql, adapter_name, except_params: [:database] end def create_view(table_name, request_settings: {}, **options) @@ -324,7 +325,7 @@ def create_view(table_name, request_settings: {}, **options) drop_table(table_name, options.merge(if_exists: true)) end - do_execute(schema_creation.accept(td), format: nil, settings: request_settings) + execute(schema_creation.accept(td), settings: request_settings) end def create_table(table_name, request_settings: {}, **options, &block) @@ -341,7 +342,7 @@ def create_table(table_name, request_settings: {}, **options, &block) drop_table(table_name, options.merge(if_exists: true)) end - do_execute(schema_creation.accept(td), format: nil, settings: request_settings) + execute(schema_creation.accept(td), settings: request_settings) if options[:with_distributed] distributed_table_name = options.delete(:with_distributed) @@ -356,16 +357,13 @@ def create_table(table_name, request_settings: {}, **options, &block) def create_function(name, body, **options) fd = "CREATE#{' OR REPLACE' if options[:force]} FUNCTION #{apply_cluster(quote_table_name(name))} AS #{body}" - do_execute(fd, format: nil) + execute(fd) end # Drops a ClickHouse database. def drop_database(name) #:nodoc: sql = apply_cluster "DROP DATABASE IF EXISTS #{quote_table_name(name)}" - log_with_debug(sql, adapter_name) do - res = @connection.post("/?#{@connection_config.except(:database).to_param}", sql) - process_response(res, DEFAULT_RESPONSE_FORMAT) - end + do_system_execute sql, adapter_name, except_params: [:database] end def drop_functions @@ -375,7 +373,7 @@ def drop_functions end def rename_table(table_name, new_name) - do_execute apply_cluster "RENAME TABLE #{quote_table_name(table_name)} TO #{quote_table_name(new_name)}" + execute apply_cluster "RENAME TABLE #{quote_table_name(table_name)} TO #{quote_table_name(new_name)}" end def drop_table(table_name, options = {}) # :nodoc: @@ -385,7 +383,7 @@ def drop_table(table_name, options = {}) # :nodoc: query = apply_cluster(query) query = "#{query} SYNC" if options[:sync] - do_execute(query) + execute(query) if options[:with_distributed] distributed_table_name = options.delete(:with_distributed) @@ -400,25 +398,19 @@ def drop_function(name, options = {}) query = apply_cluster(query) query = "#{query} SYNC" if options[:sync] - do_execute(query, format: nil) + execute(query) end def add_column(table_name, column_name, type, **options) - return if options[:if_not_exists] == true && column_exists?(table_name, column_name, type) - - at = create_alter_table table_name - at.add_column(column_name, type, **options) - execute(schema_creation.accept(at), nil, settings: {wait_end_of_query: 1, send_progress_in_http_headers: 1}) + with_settings(wait_end_of_query: 1, send_progress_in_http_headers: 1) { super } end def remove_column(table_name, column_name, type = nil, **options) - return if options[:if_exists] == true && !column_exists?(table_name, column_name) - - execute("ALTER TABLE #{quote_table_name(table_name)} #{remove_column_for_alter(table_name, column_name, type, **options)}", nil, settings: {wait_end_of_query: 1, send_progress_in_http_headers: 1}) + with_settings(wait_end_of_query: 1, send_progress_in_http_headers: 1) { super } end def change_column(table_name, column_name, type, **options) - result = do_execute("ALTER TABLE #{quote_table_name(table_name)} #{change_column_for_alter(table_name, column_name, type, **options)}", nil, settings: {wait_end_of_query: 1, send_progress_in_http_headers: 1}) + result = execute("ALTER TABLE #{quote_table_name(table_name)} #{change_column_for_alter(table_name, column_name, type, **options)}", nil, settings: {wait_end_of_query: 1, send_progress_in_http_headers: 1}) raise "Error parse json response: #{result}" if result.presence && !result.is_a?(Hash) end diff --git a/lib/clickhouse-activerecord/tasks.rb b/lib/clickhouse-activerecord/tasks.rb index 7440a17f..55c5794a 100644 --- a/lib/clickhouse-activerecord/tasks.rb +++ b/lib/clickhouse-activerecord/tasks.rb @@ -66,9 +66,9 @@ def structure_load(*args) if sql.gsub(/[a-z]/i, '').blank? next elsif sql =~ /^INSERT INTO/ - connection.do_execute(sql, nil, format: nil) + connection.execute(sql, nil, format: nil) elsif sql =~ /^CREATE .*?FUNCTION/ - connection.do_execute(sql, nil, format: nil) + connection.execute(sql, nil, format: nil) else connection.execute(sql) end diff --git a/spec/cluster/migration_spec.rb b/spec/cluster/migration_spec.rb index 09edf193..e1451d7a 100644 --- a/spec/cluster/migration_spec.rb +++ b/spec/cluster/migration_spec.rb @@ -70,7 +70,7 @@ let(:directory) { 'dsl_create_function' } it 'creates a function' do - ActiveRecord::Base.connection.do_execute('CREATE FUNCTION forced_fun AS (x, k, b) -> k*x + b', format: nil) + ActiveRecord::Base.connection.execute('CREATE FUNCTION forced_fun AS (x, k, b) -> k*x + b') subject diff --git a/spec/fixtures/migrations/plain_function_creation/1_create_some_function.rb b/spec/fixtures/migrations/plain_function_creation/1_create_some_function.rb index ede6a3e2..e5002738 100644 --- a/spec/fixtures/migrations/plain_function_creation/1_create_some_function.rb +++ b/spec/fixtures/migrations/plain_function_creation/1_create_some_function.rb @@ -5,11 +5,11 @@ def up sql = <<~SQL CREATE FUNCTION multFun AS (x,y) -> x * y SQL - do_execute(sql, format: nil) + execute(sql) sql = <<~SQL CREATE FUNCTION addFun AS (x,y) -> x + y SQL - do_execute(sql, format: nil) + execute(sql) end end diff --git a/spec/single/migration_spec.rb b/spec/single/migration_spec.rb index 9b1c574f..aca41e2b 100644 --- a/spec/single/migration_spec.rb +++ b/spec/single/migration_spec.rb @@ -388,7 +388,7 @@ context 'dsl' do let(:directory) { 'dsl_create_function' } it 'creates a function' do - ActiveRecord::Base.connection.do_execute('CREATE FUNCTION forced_fun AS (x, k, b) -> k*x + b', format: nil) + ActiveRecord::Base.connection.execute('CREATE FUNCTION forced_fun AS (x, k, b) -> k*x + b') subject diff --git a/spec/single/model_spec.rb b/spec/single/model_spec.rb index d32b3320..b7d9f5c1 100644 --- a/spec/single/model_spec.rb +++ b/spec/single/model_spec.rb @@ -36,16 +36,33 @@ class ModelPk < ActiveRecord::Base expect(Model.first.event_name).to eq('DB::Exception') end - describe '#do_execute' do + describe '#execute' do it 'returns formatted result' do - result = Model.connection.do_execute('SELECT 1 AS t') + result = Model.connection.execute('SELECT 1 AS t') + expect(result['data']).to eq([[1]]) + expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) + end + + it 'also works when a different format is passed as a keyword' do + result = Model.connection.execute('SELECT 1 AS t', format: 'JSONCompact') + expect(result['data']).to eq([[1]]) + expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) + end + end + + describe '#with_response_format' do + it 'returns formatted result' do + result = Model.connection.execute('SELECT 1 AS t') expect(result['data']).to eq([[1]]) expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) end context 'with JSONCompact format' do it 'returns formatted result' do - result = Model.connection.do_execute('SELECT 1 AS t', format: 'JSONCompact') + result = + Model.connection.with_response_format('JSONCompact') do + Model.connection.execute('SELECT 1 AS t') + end expect(result['data']).to eq([[1]]) expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) end @@ -53,11 +70,24 @@ class ModelPk < ActiveRecord::Base context 'with JSONCompactEachRowWithNamesAndTypes format' do it 'returns formatted result' do - result = Model.connection.do_execute('SELECT 1 AS t', format: 'JSONCompactEachRowWithNamesAndTypes') + result = + Model.connection.with_response_format('JSONCompactEachRowWithNamesAndTypes') do + Model.connection.execute('SELECT 1 AS t') + end expect(result['data']).to eq([[1]]) expect(result['meta']).to eq([{ 'name' => 't', 'type' => 'UInt8' }]) end end + + context 'with nil format' do + it 'omits the FORMAT clause' do + result = + Model.connection.with_response_format(nil) do + Model.connection.execute('SELECT 1 AS t') + end + expect(result.chomp).to eq('1') + end + end end describe '#create' do @@ -238,6 +268,55 @@ class ModelPk < ActiveRecord::Base end end + describe 'block-style settings' do + let!(:record) { Model.create!(event_name: 'some event', date: Date.current, datetime: Time.now) } + + let(:last_query_finder) do + <<~SQL.squish + SELECT query, Settings, event_time_microseconds + FROM system.query_log + WHERE query ILIKE 'SELECT sample.* FROM sample FORMAT %' + ORDER BY event_date DESC, event_time DESC, event_time_microseconds DESC + LIMIT 1 + SQL + end + + it 'sends the settings to the server' do + expect_any_instance_of(Net::HTTP).to receive(:post).and_wrap_original do |original_method, *args, **kwargs| + resource, sql, * = args + if sql.include?('SELECT sample.*') + query = resource.split('?').second + params = query.split('&').to_h { |pair| pair.split('=').map { |s| CGI.unescape(s) } } + expect(params['cast_keep_nullable']).to eq('1') + expect(params['log_comment']).to eq('Log Comment!') + end + original_method.call(*args, **kwargs) + end + + Model.connection.with_settings(cast_keep_nullable: 1, log_comment: 'Log Comment!') do + Model.all.load + end + end + + it 'resets settings to default outside the block' do + Model.connection.with_settings(cast_keep_nullable: 1, log_comment: 'Log Comment!') do + Model.all.load + end + + expect_any_instance_of(Net::HTTP).to receive(:post).and_wrap_original do |original_method, *args, **kwargs| + resource, sql, * = args + if sql.include?('SELECT sample.*') + query = resource.split('?').second + params = query.split('&').to_h { |pair| pair.split('=').map { |s| CGI.unescape(s) } } + expect(params).not_to include('cast_keep_nullable', 'log_comment') + end + original_method.call(*args, **kwargs) + end + + Model.all.load + end + end + describe '#using' do it 'works' do sql = Model.joins(:joins).using(:event_name, :date).to_sql