[DPE-5827] Set all nodes to synchronous replicas #784

Open
wants to merge 24 commits into main
6 changes: 6 additions & 0 deletions config.yaml
@@ -2,6 +2,12 @@
# See LICENSE file for licensing details.

options:
synchronous_node_count:
description: |
Sets the number of synchronous nodes to be maintained in the cluster. Should be
either "all", "majority" or a positive integer value.
type: string
default: "all"
durability_synchronous_commit:
description: |
Sets the current transactions synchronization level. This charm allows only the
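For reference, a minimal sketch of switching this policy at runtime with python-libjuju, the same set_config call the integration tests below use; the application name "postgresql-k8s" is an assumption, not something this diff defines. The same effect is available from the CLI via juju config.

import asyncio

from juju.model import Model


async def set_sync_policy(value: str = "majority") -> None:
    """Set synchronous_node_count on a running deployment (illustrative only)."""
    model = Model()
    await model.connect()  # connect to the currently active controller/model
    try:
        app = model.applications["postgresql-k8s"]  # assumed application name
        await app.set_config({"synchronous_node_count": value})  # "all", "majority" or e.g. "2"
    finally:
        await model.disconnect()


if __name__ == "__main__":
    asyncio.run(set_sync_policy("majority"))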
22 changes: 21 additions & 1 deletion src/charm.py
@@ -445,13 +445,25 @@ def get_unit_ip(self, unit: Unit) -> Optional[str]:
else:
return None

def updated_synchronous_node_count(self) -> bool:
"""Tries to update synchronous_node_count configuration and reports the result."""
try:
self._patroni.update_synchronous_node_count()
return True
except RetryError:
logger.debug("Unable to set synchronous_node_count")
return False

def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
"""The leader removes the departing units from the list of cluster members."""
# Allow leader to update endpoints if it isn't leaving.
if not self.unit.is_leader() or event.departing_unit == self.unit:
return

if "cluster_initialised" not in self._peers.data[self.app]:
if (
"cluster_initialised" not in self._peers.data[self.app]
or not self.updated_synchronous_node_count()
):
logger.debug(
"Deferring on_peer_relation_departed: Cluster must be initialized before members can leave"
)
@@ -655,6 +667,10 @@ def _on_config_changed(self, event) -> None:
self.unit.status = BlockedStatus("Configuration Error. Please check the logs")
logger.error("Invalid configuration: %s", str(e))
return
if not self.updated_synchronous_node_count():
logger.debug("Defer on_config_changed: unable to set synchronous node count")
event.defer()
return

if self.is_blocked and "Configuration Error" in self.unit.status.message:
self._set_active_status()
@@ -668,6 +684,9 @@ def _on_config_changed(self, event) -> None:
# Enable and/or disable the extensions.
self.enable_disable_extensions()

self._unblock_extensions()

def _unblock_extensions(self) -> None:
# Unblock the charm after extensions are enabled (only if it's blocked due to application
# charms requesting extensions).
if self.unit.status.message != EXTENSIONS_BLOCKING_MESSAGE:
@@ -778,6 +797,7 @@ def _add_members(self, event) -> None:
for member in self._hosts - self._patroni.cluster_members:
logger.debug("Adding %s to cluster", member)
self.add_cluster_member(member)
self._patroni.update_synchronous_node_count()
except NotReadyError:
logger.info("Deferring reconfigure: another member doing sync right now")
event.defer()
5 changes: 3 additions & 2 deletions src/config.py
@@ -5,17 +5,18 @@
"""Structured configuration for the PostgreSQL charm."""

import logging
from typing import Optional
from typing import Literal, Optional

from charms.data_platform_libs.v0.data_models import BaseConfigModel
from pydantic import validator
from pydantic import PositiveInt, validator

logger = logging.getLogger(__name__)


class CharmConfig(BaseConfigModel):
"""Manager for the structured configuration."""

synchronous_node_count: Literal["all", "majority"] | PositiveInt
durability_synchronous_commit: Optional[str]
instance_default_text_search_config: Optional[str]
instance_password_encryption: Optional[str]
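As a standalone illustration of what that annotation accepts (a sketch, not the charm's BaseConfigModel; the model name is made up): "all" and "majority" match the Literal branch, numeric strings are coerced to a positive integer, and zero or arbitrary text is rejected.

from typing import Literal, Union

from pydantic import BaseModel, PositiveInt, ValidationError


class SyncCountConfig(BaseModel):
    # Same constraint as the charm's field, written with Union for clarity.
    synchronous_node_count: Union[Literal["all", "majority"], PositiveInt]


print(SyncCountConfig(synchronous_node_count="all").synchronous_node_count)  # all
print(SyncCountConfig(synchronous_node_count="2").synchronous_node_count)  # 2

try:
    SyncCountConfig(synchronous_node_count="0")
except ValidationError:
    print("rejected: synchronous_node_count must be a positive integer")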
36 changes: 35 additions & 1 deletion src/patroni.py
@@ -53,6 +53,10 @@ class SwitchoverFailedError(Exception):
"""Raised when a switchover failed for some reason."""


class UpdateSyncNodeCountError(Exception):
"""Raised when updating synchronous_node_count failed for some reason."""


class Patroni:
"""This class handles the communication with Patroni API and configuration files."""

@@ -126,6 +130,36 @@ def _get_alternative_patroni_url(
url = self._patroni_url
return url

@property
def _synchronous_node_count(self) -> int:
planned_units = self._charm.app.planned_units()
if self._charm.config.synchronous_node_count == "all":
return planned_units - 1
elif self._charm.config.synchronous_node_count == "majority":
return planned_units // 2
return (
self._charm.config.synchronous_node_count
if self._charm.config.synchronous_node_count < self._members_count - 1
else planned_units - 1
)

def update_synchronous_node_count(self) -> None:
"""Update synchronous_node_count."""
# Try to update synchronous_node_count.
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
with attempt:
r = requests.patch(
f"{self._patroni_url}/config",
json={"synchronous_node_count": self._synchronous_node_count},
verify=self._verify,
auth=self._patroni_auth,
timeout=PATRONI_TIMEOUT,
)

# Check whether the update was unsuccessful.
if r.status_code != 200:
raise UpdateSyncNodeCountError(f"received {r.status_code}")

def get_primary(
self, unit_name_pattern=False, alternative_endpoints: Optional[List[str]] = None
) -> str:
@@ -525,7 +559,7 @@ def render_patroni_yml_file(
restore_to_latest=restore_to_latest,
stanza=stanza,
restore_stanza=restore_stanza,
minority_count=self._members_count // 2,
synchronous_node_count=self._synchronous_node_count,
version=self.rock_postgresql_version.split(".")[0],
pg_parameters=parameters,
primary_cluster_endpoint=self._charm.async_replication.get_primary_cluster_endpoint(),
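To make the mapping in _synchronous_node_count concrete, here is a simplified, self-contained sketch; it assumes the live member count equals planned_units, so an integer setting is simply capped at planned_units - 1, and the function name is illustrative rather than charm API. The resulting value is applied both at bootstrap (via the template change below) and at runtime through the PATCH to Patroni's /config endpoint shown above.

def effective_sync_count(policy, planned_units: int) -> int:
    """Map the config policy to Patroni's synchronous_node_count (sketch)."""
    if policy == "all":
        return planned_units - 1  # every other member stays synchronous
    if policy == "majority":
        return planned_units // 2  # half the cluster, rounded down
    return min(policy, planned_units - 1)  # fixed count, capped at cluster size - 1


for policy in ("all", "majority", 2, 10):
    print(policy, "->", effective_sync_count(policy, planned_units=5))
# all -> 4, majority -> 2, 2 -> 2, 10 -> 4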
1 change: 1 addition & 0 deletions src/upgrade.py
@@ -152,6 +152,7 @@ def _on_upgrade_changed(self, event) -> None:
return

self.charm.update_config()
self.charm.updated_synchronous_node_count()

def _on_upgrade_charm_check_legacy(self, event: UpgradeCharmEvent) -> None:
if not self.peer_relation:
2 changes: 1 addition & 1 deletion templates/patroni.yml.j2
@@ -2,7 +2,7 @@ bootstrap:
dcs:
synchronous_mode: true
failsafe_mode: true
synchronous_node_count: {{ minority_count }}
synchronous_node_count: {{ synchronous_node_count }}
postgresql:
use_pg_rewind: true
remove_data_directory_on_rewind_failure: true
41 changes: 41 additions & 0 deletions tests/integration/ha_tests/helpers.py
@@ -502,6 +502,26 @@ async def get_postgresql_parameter(ops_test: OpsTest, parameter_name: str) -> Op
return parameter_value


async def get_leader(model: Model, application_name: str) -> str:
"""Get the standby leader name.

Args:
model: the model instance.
application_name: the name of the application to get the value for.

Returns:
the name of the leader.
"""
status = await model.get_status()
first_unit_ip = next(
unit for unit in status["applications"][application_name]["units"].values()
)["address"]
cluster = get_patroni_cluster(first_unit_ip)
for member in cluster["members"]:
if member["role"] == "leader":
return member["name"]


async def get_standby_leader(model: Model, application_name: str) -> str:
"""Get the standby leader name.

@@ -1146,3 +1166,24 @@ async def remove_unit_force(ops_test: OpsTest, num_units: int):
timeout=1000,
wait_for_exact_units=scale,
)


async def get_cluster_roles(
ops_test: OpsTest, unit_name: str
) -> dict[str, str | list[str] | None]:
"""Returns whether the unit a replica in the cluster."""
unit_ip = await get_unit_address(ops_test, unit_name)
members = {"replicas": [], "primaries": [], "sync_standbys": []}
member_list = get_patroni_cluster(unit_ip)["members"]
logger.info(f"Cluster members are: {member_list}")
for member in member_list:
role = member["role"]
name = "/".join(member["name"].rsplit("-", 1))
if role == "leader":
members["primaries"].append(name)
elif role == "sync_standby":
members["sync_standbys"].append(name)
else:
members["replicas"].append(name)

return members
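For orientation, the helper returns a dict shaped roughly like the following for a healthy three-unit cluster under the default "all" policy (unit names are assumptions):

example_roles = {
    "primaries": ["postgresql-k8s/0"],
    "sync_standbys": ["postgresql-k8s/1", "postgresql-k8s/2"],
    "replicas": [],
}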
16 changes: 8 additions & 8 deletions tests/integration/ha_tests/test_async_replication.py
@@ -31,8 +31,8 @@
from .helpers import (
are_writes_increasing,
check_writes,
get_leader,
get_standby_leader,
get_sync_standby,
start_continuous_writes,
)

@@ -415,11 +415,11 @@ async def test_async_replication_failover_in_main_cluster(
logger.info("checking whether writes are increasing")
await are_writes_increasing(ops_test)

sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME)
logger.info(f"Sync-standby: {sync_standby}")
logger.info("deleting the sync-standby pod")
primary = await get_leader(first_model, DATABASE_APP_NAME)
logger.info(f"Primary: {primary}")
logger.info("deleting the primary pod")
client = Client(namespace=first_model.info.name)
client.delete(Pod, name=sync_standby.replace("/", "-"))
client.delete(Pod, name=primary.replace("/", "-"))

async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
await gather(
@@ -432,9 +432,9 @@
)

# Check that the sync-standby unit is not the same as before.
new_sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME)
logger.info(f"New sync-standby: {new_sync_standby}")
assert new_sync_standby != sync_standby, "Sync-standby is the same as before"
Comment on lines -435 to -437 (Contributor Author):

Sync standby will not change, since all the replicas are sync standbys.

new_primary = await get_leader(first_model, DATABASE_APP_NAME)
logger.info(f"New sync-standby: {new_primary}")
assert new_primary != primary, "Sync-standby is the same as before"

logger.info("Ensure continuous_writes after the crashed unit")
await are_writes_increasing(ops_test)
84 changes: 84 additions & 0 deletions tests/integration/ha_tests/test_synchronous_policy.py
@@ -0,0 +1,84 @@
#!/usr/bin/env python3
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.
import pytest
from pytest_operator.plugin import OpsTest
from tenacity import Retrying, stop_after_attempt, wait_fixed

from ..helpers import app_name, build_and_deploy
from .helpers import get_cluster_roles


@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_build_and_deploy(ops_test: OpsTest) -> None:
"""Build and deploy three unit of PostgreSQL."""
wait_for_apps = False
# It is possible for users to provide their own cluster for HA testing. Hence, check if there
# is a pre-existing cluster.
if not await app_name(ops_test):
wait_for_apps = True
await build_and_deploy(ops_test, 3, wait_for_idle=False)

if wait_for_apps:
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(status="active", timeout=1000, raise_on_error=False)


@pytest.mark.group(1)
async def test_default_all(ops_test: OpsTest) -> None:
app = await app_name(ops_test)

async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=300)

for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
with attempt:
roles = await get_cluster_roles(
ops_test, ops_test.model.applications[app].units[0].name
)

assert len(roles["primaries"]) == 1
assert len(roles["sync_standbys"]) == 2
assert len(roles["replicas"]) == 0


@pytest.mark.group(1)
async def test_majority(ops_test: OpsTest) -> None:
app = await app_name(ops_test)

await ops_test.model.applications[app].set_config({"synchronous_node_count": "majority"})

async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(apps=[app], status="active")

for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
with attempt:
roles = await get_cluster_roles(
ops_test, ops_test.model.applications[app].units[0].name
)

assert len(roles["primaries"]) == 1
assert len(roles["sync_standbys"]) == 1
assert len(roles["replicas"]) == 1


@pytest.mark.group(1)
async def test_constant(ops_test: OpsTest) -> None:
"""Kill primary unit, check reelection."""
app = await app_name(ops_test)

await ops_test.model.applications[app].set_config({"synchronous_node_count": "2"})

async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=300)

for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
with attempt:
roles = await get_cluster_roles(
ops_test, ops_test.model.applications[app].units[0].name
)

assert len(roles["primaries"]) == 1
assert len(roles["sync_standbys"]) == 2
assert len(roles["replicas"]) == 0
3 changes: 1 addition & 2 deletions tests/integration/helpers.py
@@ -761,15 +761,14 @@ async def switchover(
)
assert response.status_code == 200, f"Switchover status code is {response.status_code}"
app_name = current_primary.split("/")[0]
minority_count = len(ops_test.model.applications[app_name].units) // 2
for attempt in Retrying(stop=stop_after_attempt(30), wait=wait_fixed(2), reraise=True):
with attempt:
response = requests.get(f"http://{primary_ip}:8008/cluster")
assert response.status_code == 200
standbys = len([
member for member in response.json()["members"] if member["role"] == "sync_standby"
])
assert standbys >= minority_count
assert standbys == len(ops_test.model.applications[app_name].units) - 1


async def wait_for_idle_on_blocked(
4 changes: 4 additions & 0 deletions tests/integration/test_config.py
@@ -30,6 +30,10 @@ async def test_config_parameters(ops_test: OpsTest) -> None:
test_string = "abcXYZ123"

configs = [
{"synchronous_node_count": ["0", "1"]}, # config option is greater than 0
{
"synchronous_node_count": [test_string, "all"]
}, # config option is one of `all`, `majority` or a positive integer
{
"durability_synchronous_commit": [test_string, "on"]
}, # config option is one of `on`, `remote_apply` or `remote_write`
2 changes: 1 addition & 1 deletion tests/integration/test_tls.py
@@ -99,7 +99,7 @@ async def test_tls(ops_test: OpsTest) -> None:
patroni_password = await get_password(ops_test, "patroni")
cluster_info = requests.get(f"https://{primary_address}:8008/cluster", verify=False)
for member in cluster_info.json()["members"]:
if member["role"] == "replica":
if member["role"] != "leader":
replica = "/".join(member["name"].rsplit("-", 1))

# Check if TLS enabled for replication