Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3562] Migrate to Quarkus JDBC implementation #3563

Merged
merged 1 commit into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion services/base-jdbc/pom.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2020, 2021 Contributors to the Eclipse Foundation
Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation

See the NOTICE file(s) distributed with this work for additional
information regarding copyright ownership.
Expand Down Expand Up @@ -86,6 +86,10 @@
<scope>provided</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-agroal</artifactId>
</dependency>

<!-- testing -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2022, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -85,6 +85,30 @@ public interface JdbcOptions {
@WithDefault("3600")
int maximumIdleTime();

/**
* Gets the maximum connection time for acquiring a connection from the DB connection pool.
*
* @return The maximum connection time for acquiring a connection from the pool.
*/
@WithDefault("30")
int maximumConnectionTime();

/**
* Gets the connection validation time interval in the DB connection pool.
*
* @return The connection validation time interval in the pool.
*/
@WithDefault("30")
int validationTime();

/**
* Gets the connection leak time limit from the DB connection pool.
*
* @return The connection leak time limit from the pool.
*/
@WithDefault("60")
int leakTime();

/**
* Gets the name of the table that contains the data.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand All @@ -13,14 +13,22 @@

package org.eclipse.hono.service.base.jdbc.config;

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;

import io.agroal.api.configuration.AgroalConnectionPoolConfiguration.ConnectionValidator;
import io.agroal.api.configuration.AgroalDataSourceConfiguration.DataSourceImplementation;
import io.agroal.api.configuration.supplier.AgroalDataSourceConfigurationSupplier;
import io.agroal.api.security.NamePrincipal;
import io.agroal.api.security.SimplePassword;
import io.agroal.pool.DataSource;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
Expand All @@ -35,6 +43,9 @@ public class JdbcProperties {
public static final int DEFAULT_MINIMUM_POOL_SIZE = 3;
public static final int DEFAULT_INITIAL_POOL_SIZE = 3;
public static final int DEFAULT_MAXIMUM_IDLE_TIME = 3600;
public static final int DEFAULT_MAXIMUM_CONNECTION_TIME = 30;
public static final int DEFAULT_VALIDATION_TIME = 30;
public static final int DEFAULT_LEAK_TIME = 60;
private static final Logger log = LoggerFactory.getLogger(JdbcProperties.class);

private String url;
Expand All @@ -45,6 +56,9 @@ public class JdbcProperties {
private int minimumPoolSize = DEFAULT_MINIMUM_POOL_SIZE;
private int initialPoolSize = DEFAULT_INITIAL_POOL_SIZE;
private int maximumIdleTime = DEFAULT_MAXIMUM_IDLE_TIME;
private int maximumConnectionTime = DEFAULT_MAXIMUM_CONNECTION_TIME;
private int validationTime = DEFAULT_VALIDATION_TIME;
private int leakTime = DEFAULT_LEAK_TIME;
mattkaem marked this conversation as resolved.
Show resolved Hide resolved
private String tableName;

/**
Expand All @@ -67,6 +81,9 @@ public JdbcProperties(final JdbcOptions options) {
setMinimumPoolSize(options.minimumPoolSize());
setInitialPoolSize(options.initialPoolSize());
setMaximumIdleTime(options.maximumIdleTime());
setMaximumConnectionTime(options.maximumConnectionTime());
setValidationTime(options.validationTime());
setLeakTime(options.leakTime());
options.password().ifPresent(this::setPassword);
options.tableName().ifPresent(this::setTableName);
setUrl(options.url());
Expand Down Expand Up @@ -129,6 +146,27 @@ public int getMaximumIdleTime() {
return maximumIdleTime;
}

public void setMaximumConnectionTime(final int maximumConnectionTime) {
this.maximumConnectionTime = maximumConnectionTime;
}
public int getMaximumConnectionTime() {
return maximumConnectionTime;
}

public void setValidationTime(final int validationTime) {
this.validationTime = validationTime;
}
public int getValidationTime() {
return validationTime;
}

public void setLeakTime(final int leakTime) {
this.leakTime = leakTime;
}
public int getLeakTime() {
return leakTime;
}

public String getTableName() {
return tableName;
}
Expand All @@ -155,11 +193,18 @@ public static JDBCClient dataSource(final Vertx vertx, final JdbcProperties data
config.put("driver_class", dataSourceProperties.getDriverClass());
}

final String maxIdleLabel = "max_idle_time";
final String maxConnectionLabel = "max_connection_time";
final String validationLabel = "validation_time";
final String leakLabel = "leak_time";
final String minSizeLabel = "min_pool_size";
final String maxSizeLabel = "max_pool_size";
final String initSizeLabel = "initial_pool_size";

putValidValueIntoConfig(config, "max_idle_time", dataSourceProperties.getMaximumIdleTime(), 0, true);
putValidValueIntoConfig(config, maxIdleLabel, dataSourceProperties.getMaximumIdleTime(), 0, true);
putValidValueIntoConfig(config, maxConnectionLabel, dataSourceProperties.getMaximumConnectionTime(), 0, true);
putValidValueIntoConfig(config, validationLabel, dataSourceProperties.getValidationTime(), 0, true);
putValidValueIntoConfig(config, leakLabel, dataSourceProperties.getLeakTime(), 0, true);
putValidValueIntoConfig(config, minSizeLabel, dataSourceProperties.getMinimumPoolSize(), 0, true);
putValidValueIntoConfig(config, maxSizeLabel, dataSourceProperties.getMaximumPoolSize(), Math.max(1, config.getInteger(minSizeLabel)), true);
// check that initial pool size is between min and max pool size
Expand All @@ -168,14 +213,43 @@ public static JDBCClient dataSource(final Vertx vertx, final JdbcProperties data

log.info("Creating new SQL client: {} - table: {}", config, dataSourceProperties.getTableName());

// put password after logging

config
.put("password", dataSourceProperties.getPassword());

// create new client

return JDBCClient.create(vertx, config);
final int minSize = config.getInteger(minSizeLabel);
final int maxSize = config.getInteger(maxSizeLabel);
final int initSize = config.getInteger(initSizeLabel);
final Duration idleTime = Duration.ofSeconds(config.getInteger(maxIdleLabel));
final Duration connectionTime = Duration.ofSeconds(config.getInteger(maxConnectionLabel));
final Duration validationTime = Duration.ofSeconds(config.getInteger(validationLabel));
final Duration leakTime = Duration.ofSeconds(config.getInteger(leakLabel));
final NamePrincipal username = Optional
.ofNullable(dataSourceProperties.getUsername())
.map(NamePrincipal::new)
.orElse(null);
final SimplePassword password = Optional
.ofNullable(dataSourceProperties.getPassword())
.map(SimplePassword::new)
.orElse(null);

final AgroalDataSourceConfigurationSupplier configuration = new AgroalDataSourceConfigurationSupplier()
.metricsEnabled(false)
.dataSourceImplementation(DataSourceImplementation.AGROAL)
.connectionPoolConfiguration(poolConfig -> poolConfig
.minSize(minSize)
.maxSize(maxSize)
.initialSize(initSize)
.acquisitionTimeout(connectionTime)
.validationTimeout(validationTime)
.leakTimeout(leakTime)
.reapTimeout(idleTime)
.connectionValidator(ConnectionValidator.defaultValidator())
.connectionFactoryConfiguration(connConfig -> connConfig
.jdbcUrl(dataSourceProperties.getUrl())
.connectionProviderClassName(dataSourceProperties.getDriverClass())
.principal(username)
.credential(password)));

return JDBCClient.create(vertx, new DataSource(configuration.get()));

}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -124,7 +124,7 @@ protected Future<ResultSet> read(final SQLOperations operations, final DeviceKey

return expanded
.trace(this.tracer, spanContext)
.query(this.client);
.query(operations);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.ResultSet;
import io.vertx.ext.sql.SQLConnection;
import io.vertx.ext.sql.SQLOperations;
import io.vertx.ext.sql.UpdateResult;

/**
Expand Down Expand Up @@ -328,14 +329,14 @@ public Future<Versioned<Void>> createDevice(

log.debug("createDevice - statement: {}", expanded);

return getDeviceCount(key.getTenantId(), span.context(), this.countDevicesOfTenantStatement, null, null)
return getDeviceCount(connection, key.getTenantId(), span.context(), this.countDevicesOfTenantStatement, null, null)
.compose(currentDeviceCount -> tenant.checkDeviceLimitReached(
key.getTenantId(),
currentDeviceCount,
globalDevicesPerTenantLimit))
.compose(ok -> expanded
.trace(this.tracer, context)
.update(this.client)
.update(connection)
.recover(SQL::translateException))

.compose(x -> createGroups(connection, key, new HashSet<>(device.getMemberOf()), context));
Expand Down Expand Up @@ -649,6 +650,7 @@ public Future<UpdateResult> dropTenant(final String tenantId, final SpanContext
/**
* Gets the number of devices that are registered for a tenant.
*
* @param operations The SQL operations instance to use.
* @param tenantId The tenant to count devices for.
* @param spanContext The span to contribute to.
* @param countStatement The count statement to use.
Expand All @@ -657,7 +659,7 @@ public Future<UpdateResult> dropTenant(final String tenantId, final SpanContext
* @return A future tracking the outcome of the operation.
* @throws NullPointerException if tenant is {@code null}.
*/
public Future<Integer> getDeviceCount(final String tenantId, final SpanContext spanContext, final Statement countStatement, final String field, final String value) {
public Future<Integer> getDeviceCount(final SQLOperations operations, final String tenantId, final SpanContext spanContext, final Statement countStatement, final String field, final String value) {

Objects.requireNonNull(tenantId);

Expand All @@ -675,7 +677,7 @@ public Future<Integer> getDeviceCount(final String tenantId, final SpanContext s

return expanded
.trace(this.tracer, span.context())
.query(this.client)
.query(operations)
.map(r -> {
final var entries = r.getRows(true);
switch (entries.size()) {
Expand Down Expand Up @@ -1007,7 +1009,7 @@ public Future<SearchResult<DeviceWithId>> findDevices(final String tenantId, fin
.withTag(TracingHelper.TAG_TENANT_ID, tenantId)
.start();

final Future<Integer> deviceCountFuture = getDeviceCount(tenantId, span.context(), countStatement, field, value);
final Future<Integer> deviceCountFuture = getDeviceCount(this.client, tenantId, span.context(), countStatement, field, value);

return deviceCountFuture
.compose(count -> expanded.trace(this.tracer, span.context()).query(this.client))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public Future<Versioned<Void>> create(final String tenantId, final Tenant tenant
log.debug("create - statement: {}", expanded);
return expanded
.trace(this.tracer, span.context())
.update(this.client)
.update(connection)
.recover(SQL::translateException)

// insert all trust anchors
Expand Down Expand Up @@ -443,13 +443,13 @@ protected Future<UpdateResult> updateJsonField(
// execute update
final var result = expanded
.trace(this.tracer, span.context())
.update(this.client);
.update(operations);

// process result, check optimistic lock
return checkOptimisticLock(
result, span,
resourceVersion,
checkSpan -> readTenantEntryById(this.client, tenantId, checkSpan.context()));
checkSpan -> readTenantEntryById(operations, tenantId, checkSpan.context()));
}

/**
Expand Down
14 changes: 13 additions & 1 deletion services/device-registry-jdbc/pom.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2022 Contributors to the Eclipse Foundation
Copyright (c) 2022, 2023 Contributors to the Eclipse Foundation

See the NOTICE file(s) distributed with this work for additional
information regarding copyright ownership.
Expand Down Expand Up @@ -62,8 +62,20 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-agroal</artifactId>
</dependency>

<!-- JDBC drivers -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jdbc-postgresql</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jdbc-h2</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ quarkus.jackson.accept-case-insensitive-enums=true
# fail deserialization of JSON objects sent by clients if they contain unexpected content
quarkus.jackson.fail-on-unknown-properties=true

# enable h2 and postgres extensions
quarkus.datasource.h2.db-kind=h2
quarkus.datasource.pg.db-kind=pg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2020, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2020, 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -153,6 +153,8 @@ void startDevices(final Vertx vertx) throws IOException, SQLException {

private JdbcProperties resolveJdbcProperties() {
final var jdbc = new JdbcProperties();
jdbc.setInitialPoolSize(0);
mattkaem marked this conversation as resolved.
Show resolved Hide resolved
jdbc.setMinimumPoolSize(0);
if (DATABASE_TYPE != DatabaseType.H2) {
final JdbcDatabaseContainer<?> databaseContainer = getDatabaseContainer();
jdbc.setDriverClass(databaseContainer.getDriverClassName());
Expand Down
Loading