diff --git a/.github/workflows/container.yml b/.github/workflows/container.yml index 0ed6475e2..faf309aba 100644 --- a/.github/workflows/container.yml +++ b/.github/workflows/container.yml @@ -33,7 +33,7 @@ jobs: run: | echo is_release=${{ contains(github.ref, 'refs/tags/') }} | tee -a $GITHUB_OUTPUT echo is_dev=${{ ! contains(github.ref, 'refs/tags/') }} | tee -a $GITHUB_OUTPUT - echo version=$(git describe --always --tags) | tee -a $GITHUB_OUTPUT + echo version=$(git describe --tags | cut -d '-' -f -2 | sed 's/-/.dev/g') | tee -a $GITHUB_OUTPUT # QEMU is used to set up VMs for building non-x86_64 images. - name: Set up QEMU diff --git a/README.rst b/README.rst index 0cc7489e4..8e1e7c633 100644 --- a/README.rst +++ b/README.rst @@ -491,6 +491,11 @@ Keys to take special care are the ones needed to configure Kafka and advertised_ * - ``log_format`` - ``%(name)-20s\t%(threadName)s\t%(levelname)-8s\t%(message)s`` - Log format + * - ``waiting_time_before_acting_as_master_ms`` + - ``5000`` + - The time that a master wait before becoming an active master if at the previous round of election wasn't the master (in that case the waiting time its skipped). + Should be an upper bound of the time required for a master to write a message in the kafka topic + the time required from a node in the cluster to consume the + Log of messages. If the value its too low there is the risk under high load of producing different schemas with the ID. Authentication and authorization of Karapace Schema Registry REST API diff --git a/src/karapace/backup/api.py b/src/karapace/backup/api.py index d06c99ebe..3da9a2304 100644 --- a/src/karapace/backup/api.py +++ b/src/karapace/backup/api.py @@ -373,13 +373,20 @@ def _handle_restore_topic( instruction: RestoreTopic, config: Config, skip_topic_creation: bool = False, + override_replication_factor: int | None = None, ) -> None: if skip_topic_creation: return + repl_factor = instruction.replication_factor + if override_replication_factor is not None: + LOG.info( + "Overriding replication factor with: %d (was: %d)", override_replication_factor, instruction.replication_factor + ) + repl_factor = override_replication_factor if not _maybe_create_topic( config=config, name=instruction.topic_name, - replication_factor=instruction.replication_factor, + replication_factor=repl_factor, topic_configs=instruction.topic_configs, ): raise BackupTopicAlreadyExists(f"Topic to restore '{instruction.topic_name}' already exists") @@ -426,6 +433,7 @@ def restore_backup( backup_location: ExistingFile, topic_name: TopicName, skip_topic_creation: bool = False, + override_replication_factor: int | None = None, ) -> None: """Restores a backup from the specified location into the configured topic. @@ -475,7 +483,7 @@ def _check_producer_exception() -> None: _handle_restore_topic_legacy(instruction, config, skip_topic_creation) producer = stack.enter_context(_producer(config, instruction.topic_name)) elif isinstance(instruction, RestoreTopic): - _handle_restore_topic(instruction, config, skip_topic_creation) + _handle_restore_topic(instruction, config, skip_topic_creation, override_replication_factor) producer = stack.enter_context(_producer(config, instruction.topic_name)) elif isinstance(instruction, ProducerSend): if producer is None: diff --git a/src/karapace/backup/cli.py b/src/karapace/backup/cli.py index 5e3d72854..7125b1e04 100644 --- a/src/karapace/backup/cli.py +++ b/src/karapace/backup/cli.py @@ -76,6 +76,15 @@ def parse_args() -> argparse.Namespace: ), ) + parser_restore.add_argument( + "--override-replication-factor", + help=( + "Override the replication factor that is save in the backup. This is needed when restoring a backup from a" + "downsized cluster (like scaling down from 6 to 3 nodes). This has effect only for V3 backups." + ), + type=int, + ) + return parser.parse_args() @@ -115,6 +124,7 @@ def dispatch(args: argparse.Namespace) -> None: backup_location=api.locate_backup_file(location), topic_name=api.normalize_topic_name(args.topic, config), skip_topic_creation=args.skip_topic_creation, + override_replication_factor=args.override_replication_factor, ) except BackupDataRestorationError: traceback.print_exc() diff --git a/src/karapace/config.py b/src/karapace/config.py index 7f02b7712..c7f597092 100644 --- a/src/karapace/config.py +++ b/src/karapace/config.py @@ -85,6 +85,7 @@ class Config(TypedDict): kafka_schema_reader_strict_mode: bool kafka_retriable_errors_silenced: bool use_protobuf_formatter: bool + waiting_time_before_acting_as_master_ms: int sentry: NotRequired[Mapping[str, object]] tags: NotRequired[Mapping[str, object]] @@ -163,6 +164,7 @@ class ConfigDefaults(Config, total=False): "kafka_schema_reader_strict_mode": False, "kafka_retriable_errors_silenced": True, "use_protobuf_formatter": False, + "waiting_time_before_acting_as_master_ms": 5000, } SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD] diff --git a/src/karapace/coordinator/master_coordinator.py b/src/karapace/coordinator/master_coordinator.py index 5abd4bd31..172c50f34 100644 --- a/src/karapace/coordinator/master_coordinator.py +++ b/src/karapace/coordinator/master_coordinator.py @@ -13,25 +13,42 @@ from karapace.config import Config from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS +from karapace.typing import SchemaReaderStoppper +from threading import Thread from typing import Final import asyncio import logging +import time __all__ = ("MasterCoordinator",) + LOG = logging.getLogger(__name__) class MasterCoordinator: - """Handles primary election""" + """Handles primary election + + The coordination is run in own dedicated thread, under stress situation the main + eventloop could have queue of items to work and having own thread will give more + runtime for the coordination tasks as Python intrepreter will switch the active + thread by the configured thread switch interval. Default interval in CPython is + 5 milliseconds. + """ def __init__(self, config: Config) -> None: super().__init__() self._config: Final = config self._kafka_client: AIOKafkaClient | None = None - self._running = True self._sc: SchemaCoordinator | None = None + self._closing = asyncio.Event() + self._thread: Thread = Thread(target=self._start_loop, daemon=True) + self._loop: asyncio.AbstractEventLoop | None = None + self._schema_reader_stopper: SchemaReaderStoppper | None = None + + def set_stoppper(self, schema_reader_stopper: SchemaReaderStoppper) -> None: + self._schema_reader_stopper = schema_reader_stopper @property def schema_coordinator(self) -> SchemaCoordinator | None: @@ -41,7 +58,18 @@ def schema_coordinator(self) -> SchemaCoordinator | None: def config(self) -> Config: return self._config - async def start(self) -> None: + def start(self) -> None: + self._thread.start() + + def _start_loop(self) -> None: + # we should avoid the reassignment otherwise we leak resources + assert self._loop is None, "Loop already started" + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + self._loop.create_task(self._async_loop()) + self._loop.run_forever() + + async def _async_loop(self) -> None: self._kafka_client = self.init_kafka_client() # Wait until schema coordinator is ready. # This probably needs better synchronization than plain waits. @@ -61,10 +89,22 @@ async def start(self) -> None: await asyncio.sleep(0.5) self._sc = self.init_schema_coordinator() - while True: - if self._sc.ready(): - return - await asyncio.sleep(0.5) + + # keeping the thread sleeping until it die. + # we need to keep the schema_coordinator running + # it contains the `heartbeat` and coordination logic. + await self._closing.wait() + + LOG.info("Closing master_coordinator") + if self._sc: + await self._sc.close() + while self._loop is not None and not self._loop.is_closed(): + self._loop.stop() + if not self._loop.is_running(): + self._loop.close() + time.sleep(0.5) + if self._kafka_client: + await self._kafka_client.close() def init_kafka_client(self) -> AIOKafkaClient: ssl_context = create_ssl_context( @@ -90,8 +130,10 @@ def init_kafka_client(self) -> AIOKafkaClient: def init_schema_coordinator(self) -> SchemaCoordinator: assert self._kafka_client is not None + assert self._schema_reader_stopper is not None schema_coordinator = SchemaCoordinator( client=self._kafka_client, + schema_reader_stopper=self._schema_reader_stopper, election_strategy=self._config.get("master_election_strategy", "lowest"), group_id=self._config["group_id"], hostname=self._config["advertised_hostname"], @@ -99,6 +141,7 @@ def init_schema_coordinator(self) -> SchemaCoordinator: port=self._config["advertised_port"], scheme=self._config["advertised_protocol"], session_timeout_ms=self._config["session_timeout_ms"], + waiting_time_before_acting_as_master_ms=self._config["waiting_time_before_acting_as_master_ms"], ) schema_coordinator.start() return schema_coordinator @@ -107,7 +150,7 @@ def get_coordinator_status(self) -> SchemaCoordinatorStatus: assert self._sc is not None generation = self._sc.generation if self._sc is not None else OffsetCommitRequest.DEFAULT_GENERATION_ID return SchemaCoordinatorStatus( - is_primary=self._sc.are_we_master if self._sc is not None else None, + is_primary=self._sc.are_we_master() if self._sc is not None else None, is_primary_eligible=self._config["master_eligibility"], primary_url=self._sc.master_url if self._sc is not None else None, is_running=True, @@ -116,12 +159,22 @@ def get_coordinator_status(self) -> SchemaCoordinatorStatus: def get_master_info(self) -> tuple[bool | None, str | None]: """Return whether we're the master, and the actual master url that can be used if we're not""" - assert self._sc is not None - return self._sc.are_we_master, self._sc.master_url + if not self._sc: + return False, None + + if not self._sc.ready(): + # we should wait for a while after we have been elected master, we should also consume + # all the messages in the log before proceeding, check the doc of `self._sc.are_we_master` + # for more details + return False, None + + return self._sc.are_we_master(), self._sc.master_url + + def __send_close_event(self) -> None: + self._closing.set() async def close(self) -> None: - LOG.info("Closing master_coordinator") - if self._sc: - await self._sc.close() - if self._kafka_client: - await self._kafka_client.close() + LOG.info("Sending the close signal to the master coordinator thread") + if self._loop is None: + raise ValueError("Cannot stop the loop before `.start()` is called") + self._loop.call_soon_threadsafe(self.__send_close_event) diff --git a/src/karapace/coordinator/schema_coordinator.py b/src/karapace/coordinator/schema_coordinator.py index 151a1db26..d99f1c79f 100644 --- a/src/karapace/coordinator/schema_coordinator.py +++ b/src/karapace/coordinator/schema_coordinator.py @@ -27,7 +27,7 @@ from aiokafka.util import create_future, create_task from collections.abc import Coroutine, Sequence from karapace.dataclasses import default_dataclass -from karapace.typing import JsonData +from karapace.typing import JsonData, SchemaReaderStoppper from karapace.utils import json_decode, json_encode from karapace.version import __version__ from typing import Any, Final @@ -118,12 +118,12 @@ class SchemaCoordinator: Contains original comments and also Schema Registry specific comments. """ - are_we_master: bool | None = None master_url: str | None = None def __init__( self, client: AIOKafkaClient, + schema_reader_stopper: SchemaReaderStoppper, hostname: str, port: int, scheme: str, @@ -135,6 +135,7 @@ def __init__( rebalance_timeout_ms: int = 30000, retry_backoff_ms: int = 100, session_timeout_ms: int = 10000, + waiting_time_before_acting_as_master_ms: int = 5000, ) -> None: # Coordination flags and futures self._client: Final = client @@ -147,7 +148,17 @@ def __init__( self.scheme: Final = scheme self.master_eligibility: Final = master_eligibility self.master_url: str | None = None - self.are_we_master = False + self._schema_reader_stopper = schema_reader_stopper + self._are_we_master: bool | None = False + # a value that its strictly higher than any clock, so we are sure + # we are never going to consider this the leader without explictly passing + # from False to True for the `_are_we_master` variable. + self._initial_election_sec: float | None = float("infinity") + # used to understand if I need to wait the `waiting_time_before_acting_as_master_ms` + # before acting as a leader or not, if the last time I was leader was less than 5 seconds + # ago I can skip the waiting phase (note that I'm always using my own time, no problems due + # to skew of clocks between machines). + self._last_time_i_was_leader: float = float("-infinity") self.rejoin_needed_fut: asyncio.Future[None] | None = None self._coordinator_dead_fut: asyncio.Future[None] | None = None @@ -163,6 +174,7 @@ def __init__( self._rebalance_timeout_ms: Final = rebalance_timeout_ms self._retry_backoff_ms: Final = retry_backoff_ms self._session_timeout_ms: Final = session_timeout_ms + self._waiting_time_before_acting_as_master_ms: Final = waiting_time_before_acting_as_master_ms self._coordinator_lookup_lock: Final = asyncio.Lock() self._coordination_task: asyncio.Future[None] | None = None @@ -182,6 +194,48 @@ def __init__( self._metadata_snapshot: list[Assignment] = [] + def is_master_assigned_to_myself(self) -> bool: + return self._are_we_master or False + + def are_we_master(self) -> bool | None: + """ + After a new election its made we should wait for a while since the previous master could have produced + a new message shortly before being disconnected from the cluster. + If this is true the max id selected for the next schema ID, so we can create + two schemas with the same id (or even more if rapid elections are one after another). + The fix its to wait for ~= 5 seconds if new messages arrives before becoming available as a master. + The condition to resume being the master its: + no new messages are still to be processed + at least 5 seconds have passed since we were elected master + """ + if self._are_we_master is None: + # `self._are_we_master` is `None` only during the perform of the assignment + # where we don't know if we are master yet + LOG.warning("No new elections performed yet.") + return None + + if not self._ready or not self._schema_reader_stopper.ready(): + return False + + if self._are_we_master and self._initial_election_sec is not None: + # `time.monotonic()` because we don't want the time to go back or forward because of + # e.g. ntp + if time.monotonic() > self._initial_election_sec + (self._waiting_time_before_acting_as_master_ms / 1000): + # set the value to `None` since it's expensive to call each time the monotonic clock. + LOG.info("Declaring myself as master since %s are passed!", self._waiting_time_before_acting_as_master_ms) + self._initial_election_sec = None + # this is the last point in time were we wait till to the end of the log queue for new + # incoming messages. + self._schema_reader_stopper.set_not_ready() + return False + + LOG.info( + "Newly elected as master, waiting %s ms before writing any schema to let other requests complete", + self._waiting_time_before_acting_as_master_ms, + ) + return False + + return self._are_we_master + def start(self) -> None: """Must be called after creating SchemaCoordinator object to initialize futures and start the coordination task. @@ -281,6 +335,10 @@ async def _maybe_leave_group(self) -> None: LOG.warning("LeaveGroup request failed: %s", err) else: LOG.info("LeaveGroup request succeeded") + # to avoid sleeping if we were the master, a new actor join the cluster + # and we are immediately elected as leader again. + if self.are_we_master(): + self._last_time_i_was_leader = time.monotonic() self.reset_generation() def _handle_metadata_update(self, _: ClusterMetadata) -> None: @@ -349,7 +407,7 @@ async def perform_assignment( response_data.protocol, response_data.members, ) - self.are_we_master = None + self._are_we_master = None error = NO_ERROR urls = {} fallback_urls = {} @@ -417,13 +475,40 @@ async def _on_join_complete( # On Kafka protocol we can be assigned to be master, but if not master eligible, then we're not master for real if member_assignment["master"] == member_id and member_identity["master_eligibility"]: self.master_url = master_url - self.are_we_master = True + self._are_we_master = True + ive_never_been_a_master = self._last_time_i_was_leader == float("-inf") + another_master_could_have_been_elected = ( + self._last_time_i_was_leader + (self._waiting_time_before_acting_as_master_ms / 1000) < time.monotonic() + ) + if ive_never_been_a_master or another_master_could_have_been_elected: + # we need to wait late record arrivals only in the case there + # was a master change, the time before acting its always respect + # to which was the previous master (if we were master no need + # to wait more before acting) + self._schema_reader_stopper.set_not_ready() + # flag that its set at startup, fix this + # `time.monotonic()` because we don't want the time to go back or forward because of e.g. ntp + self._initial_election_sec = time.monotonic() + + LOG.info( + "Declaring myself as not master for %s milliseconds, " + "another master meanwhile could have added other records", + self._waiting_time_before_acting_as_master_ms, + ) + else: + LOG.info( + "Starting immediately serving requests since I was master less than %s milliseconds ago, " + "no other masters could have written a new schema meanwhile", + self._waiting_time_before_acting_as_master_ms, + ) elif not member_identity["master_eligibility"]: + LOG.warning("Member %s is not eligible as a master", member_id) self.master_url = None - self.are_we_master = False + self._are_we_master = False else: + LOG.info("We are not elected as master") self.master_url = master_url - self.are_we_master = False + self._are_we_master = False self._ready = True return None @@ -434,6 +519,7 @@ def coordinator_dead(self) -> None: """ if self._coordinator_dead_fut is not None and self.coordinator_id is not None: LOG.warning("Marking the coordinator dead (node %s)for group %s.", self.coordinator_id, self.group_id) + self._are_we_master = False self.coordinator_id = None self._coordinator_dead_fut.set_result(None) @@ -441,6 +527,8 @@ def reset_generation(self) -> None: """Coordinator did not recognize either generation or member_id. Will need to re-join the group. """ + LOG.info("Resetting generation status") + self._are_we_master = False self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID self.request_rejoin() @@ -514,6 +602,8 @@ async def __coordination_routine(self) -> None: try: await self.ensure_coordinator_known() if self.need_rejoin(): + if self.are_we_master(): + self._last_time_i_was_leader = time.monotonic() new_assignment = await self.ensure_active_group() if not new_assignment: continue diff --git a/src/karapace/key_format.py b/src/karapace/key_format.py index e39d4c49e..64f0e525a 100644 --- a/src/karapace/key_format.py +++ b/src/karapace/key_format.py @@ -5,16 +5,31 @@ See LICENSE for details """ +from collections import OrderedDict from enum import Enum from karapace.typing import ArgJsonObject from karapace.utils import json_encode -from typing import Optional +from types import MappingProxyType +from typing import Final, Optional -SCHEMA_KEY_ORDER = ["keytype", "subject", "version", "magic"] -CONFIG_KEY_ORDER = ["keytype", "subject", "magic"] -NOOP_KEY_ORDER = ["keytype", "magic"] +# used by the OrderedDict for the relative order of keys. +SCHEMA_KEY_ORDER: Final[tuple[str, str, str, str]] = ("keytype", "subject", "version", "magic") +CONFIG_KEY_ORDER: Final[tuple[str, str, str]] = ("keytype", "subject", "magic") +NOOP_KEY_ORDER: Final[tuple[str, str]] = ("keytype", "magic") -CANONICAL_KEY_ORDERS = [SCHEMA_KEY_ORDER, CONFIG_KEY_ORDER, NOOP_KEY_ORDER] +KEY_ORDER = MappingProxyType( + { + tuple(sorted(SCHEMA_KEY_ORDER)): SCHEMA_KEY_ORDER, + tuple(sorted(CONFIG_KEY_ORDER)): CONFIG_KEY_ORDER, + tuple(sorted(NOOP_KEY_ORDER)): NOOP_KEY_ORDER, + } +) + +CANONICAL_KEY_ORDERS: tuple[tuple[str, str, str, str], tuple[str, str, str], tuple[str, str]] = ( + SCHEMA_KEY_ORDER, + CONFIG_KEY_ORDER, + NOOP_KEY_ORDER, +) class KeyMode(Enum): @@ -72,8 +87,11 @@ def format_key( corrected_key["version"] = key["version"] # Magic is the last element corrected_key["magic"] = key["magic"] - return json_encode(corrected_key, binary=True, sort_keys=False, compact=True) + + fixed_order = KEY_ORDER[tuple(sorted(corrected_key.keys()))] + fixed_order_dict = OrderedDict(list(sorted(corrected_key.items(), key=lambda t: fixed_order.index(t[0])))) + return json_encode(fixed_order_dict, binary=True, sort_keys=False, compact=True) def is_key_in_canonical_format(key: ArgJsonObject) -> bool: - return list(key.keys()) in CANONICAL_KEY_ORDERS + return tuple(key.keys()) in CANONICAL_KEY_ORDERS diff --git a/src/karapace/protobuf/serialization.py b/src/karapace/protobuf/serialization.py index 6c3ca61fd..123e80c8f 100644 --- a/src/karapace/protobuf/serialization.py +++ b/src/karapace/protobuf/serialization.py @@ -93,17 +93,31 @@ def _deserialize_msg(msgtype: Any) -> MessageElement: for nested_enum in msgtype.enum_type: nested_types.append(_deserialize_enum(nested_enum)) - one_ofs: list[OneOfElement] = [OneOfElement(oneof.name) for oneof in msgtype.oneof_decl] + one_ofs: list[OneOfElement | None] = [OneOfElement(oneof.name) for oneof in msgtype.oneof_decl] for f in msgtype.field: sf = _deserialize_field(f) - if f.HasField("oneof_index"): + is_oneof = f.HasField("oneof_index") + is_proto3_optional = f.HasField("oneof_index") and f.HasField("proto3_optional") and f.proto3_optional + if is_proto3_optional: + # Every proto3 optional field is placed into a one-field oneof, called a "synthetic" oneof, + # as it was not present in the source .proto file. + # This will make sure that we don't interpret those optionals as oneof. + one_ofs[f.oneof_index] = None + fields.append(sf) + elif is_oneof: one_ofs[f.oneof_index].fields.append(sf) else: fields.append(sf) + one_ofs_filtered: list[OneOfElement] = [oneof for oneof in one_ofs if oneof is not None] return MessageElement( - DEFAULT_LOCATION, msgtype.name, nested_types=nested_types, reserveds=reserveds, fields=fields, one_ofs=one_ofs + DEFAULT_LOCATION, + msgtype.name, + nested_types=nested_types, + reserveds=reserveds, + fields=fields, + one_ofs=one_ofs_filtered, ) diff --git a/src/karapace/schema_reader.py b/src/karapace/schema_reader.py index b20487631..d7c02ba03 100644 --- a/src/karapace/schema_reader.py +++ b/src/karapace/schema_reader.py @@ -43,9 +43,9 @@ from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents from karapace.statsd import StatsClient -from karapace.typing import JsonObject, SchemaId, Subject, Version +from karapace.typing import JsonObject, SchemaId, SchemaReaderStoppper, Subject, Version from karapace.utils import json_decode, JSONDecodeError, shutdown -from threading import Event, Thread +from threading import Event, Lock, Thread from typing import Final import asyncio @@ -129,7 +129,7 @@ def _create_admin_client_from_config(config: Config) -> KafkaAdminClient: ) -class KafkaSchemaReader(Thread): +class KafkaSchemaReader(Thread, SchemaReaderStoppper): def __init__( self, config: Config, @@ -167,7 +167,10 @@ def __init__( # old stale version that has not been deleted yet.) self.offset = OFFSET_UNINITIALIZED self._highest_offset = OFFSET_UNINITIALIZED - self.ready = False + # when a master its elected as master we should read the last arrived messages at least + # once. This lock prevent the concurrent modification of the `ready` flag. + self._ready_lock = Lock() + self._ready = False # This event controls when the Reader should stop running, it will be # set by another thread (e.g. `KarapaceSchemaRegistry`) @@ -302,7 +305,7 @@ def _get_beginning_offset(self) -> int: try: beginning_offset, _ = self.consumer.get_watermark_offsets(TopicPartition(self.config["topic_name"], 0)) # The `-1` decrement here is due to historical reasons (evolution of schema reader and offset watcher): - # * The first `OffsetWatcher` implementation neeeded this for flagging empty offsets + # * The first `OffsetWatcher` implementation needed this for flagging empty offsets # * Then synchronization and locking was changed and this remained # * See https://github.com/Aiven-Open/karapace/pull/364/files # * See `OFFSET_EMPTY` and `OFFSET_UNINITIALIZED` @@ -319,9 +322,10 @@ def _get_beginning_offset(self) -> int: return OFFSET_UNINITIALIZED def _is_ready(self) -> bool: - if self.ready: - return True - + """ + Always call `_is_ready` only if `self._ready` is False. + Removed the check since now with the Lock the lookup it's a costly operation. + """ assert self.consumer is not None, "Thread must be started" try: @@ -365,6 +369,14 @@ def _is_ready(self) -> bool: def highest_offset(self) -> int: return max(self._highest_offset, self._offset_watcher.greatest_offset()) + def ready(self) -> bool: + with self._ready_lock: + return self._ready + + def set_not_ready(self) -> None: + with self._ready_lock: + self._ready = False + @staticmethod def _parse_message_value(raw_value: str | bytes) -> JsonObject | None: value = json_decode(raw_value) @@ -376,10 +388,8 @@ def _parse_message_value(raw_value: str | bytes) -> JsonObject | None: def handle_messages(self) -> None: assert self.consumer is not None, "Thread must be started" - msgs: list[Message] = self.consumer.consume(timeout=self.timeout_s, num_messages=self.max_messages_to_process) - if self.ready is False: - self.ready = self._is_ready() + self._update_is_ready_flag() watch_offsets = False if self.master_coordinator is not None: @@ -433,9 +443,8 @@ def consume_messages(self, msgs: list[Message], watch_offsets: bool) -> None: # Default keymode is CANONICAL and preferred unless any data consumed # has key in non-canonical format. If keymode is set to DEPRECATED_KARAPACE # the subsequent keys are omitted from detection. - if not self.ready and self.key_formatter.get_keymode() == KeyMode.CANONICAL: - if msg_keymode == KeyMode.DEPRECATED_KARAPACE: - self.key_formatter.set_keymode(KeyMode.DEPRECATED_KARAPACE) + if self.key_formatter.get_keymode() == KeyMode.CANONICAL and msg_keymode == KeyMode.DEPRECATED_KARAPACE: + self.key_formatter.set_keymode(KeyMode.DEPRECATED_KARAPACE) value = None message_value = msg.value() @@ -461,14 +470,28 @@ def consume_messages(self, msgs: list[Message], watch_offsets: bool) -> None: else: schema_records_processed_keymode_deprecated_karapace += 1 - if self.ready and watch_offsets: - self._offset_watcher.offset_seen(self.offset) + with self._ready_lock: + if self._ready and watch_offsets: + self._offset_watcher.offset_seen(self.offset) self._report_schema_metrics( schema_records_processed_keymode_canonical, schema_records_processed_keymode_deprecated_karapace, ) + def _update_is_ready_flag(self) -> None: + update_ready_flag = False + + # to keep the lock as few as possible. + with self._ready_lock: + if self._ready is False: + update_ready_flag = True + + if update_ready_flag: + new_ready_flag = self._is_ready() + with self._ready_lock: + self._ready = new_ready_flag + def _report_schema_metrics( self, schema_records_processed_keymode_canonical: int, diff --git a/src/karapace/schema_registry.py b/src/karapace/schema_registry.py index 67f58fddd..5c553b0db 100644 --- a/src/karapace/schema_registry.py +++ b/src/karapace/schema_registry.py @@ -58,6 +58,7 @@ def __init__(self, config: Config) -> None: master_coordinator=self.mc, database=self.database, ) + self.mc.set_stoppper(self.schema_reader) self.schema_lock = asyncio.Lock() self._master_lock = asyncio.Lock() @@ -74,7 +75,7 @@ def get_schemas(self, subject: Subject, *, include_deleted: bool = False) -> lis return list(schema_versions.values()) async def start(self) -> None: - await self.mc.start() + self.mc.start() self.schema_reader.start() self.producer.initialize_karapace_producer() @@ -96,7 +97,7 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str | are_we_master, master_url = self.mc.get_master_info() if are_we_master is None: LOG.info("No master set: %r, url: %r", are_we_master, master_url) - elif not ignore_readiness and self.schema_reader.ready is False: + elif not ignore_readiness and self.schema_reader.ready() is False: LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready) else: return are_we_master, master_url diff --git a/src/karapace/schema_registry_apis.py b/src/karapace/schema_registry_apis.py index fbb8f5a0c..919742982 100644 --- a/src/karapace/schema_registry_apis.py +++ b/src/karapace/schema_registry_apis.py @@ -31,7 +31,7 @@ ) from karapace.karapace import HealthCheck, KarapaceBase from karapace.protobuf.exception import ProtobufUnresolvedDependencyException -from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE, SERVER_NAME +from karapace.rapu import HTTPRequest, HTTPResponse, JSON_CONTENT_TYPE, SERVER_NAME from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema, Versioner from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping from karapace.schema_registry import KarapaceSchemaRegistry @@ -103,8 +103,8 @@ async def schema_registry_health(self) -> HealthCheck: resp = {} if self._auth is not None: resp["schema_registry_authfile_timestamp"] = self._auth.authfile_last_modified - resp["schema_registry_ready"] = self.schema_registry.schema_reader.ready - if self.schema_registry.schema_reader.ready: + resp["schema_registry_ready"] = self.schema_registry.schema_reader.ready() + if self.schema_registry.schema_reader.ready(): resp["schema_registry_startup_time_sec"] = ( self.schema_registry.schema_reader.last_check - self._process_start_time ) @@ -140,20 +140,21 @@ def _check_authorization(self, user: User | None, operation: Operation, resource if not self._auth.check_authorization(user, operation, resource): self.r(body={"message": "Forbidden"}, content_type=JSON_CONTENT_TYPE, status=HTTPStatus.FORBIDDEN) - async def _forward_if_not_ready_to_serve(self, request: HTTPRequest) -> None: - if self.schema_registry.schema_reader.ready: + async def _forward_if_not_ready_to_serve(self, request: HTTPRequest, content_type: str | None = None) -> None: + if self.schema_registry.schema_reader.ready(): pass else: # Not ready, still loading the state. # Needs only the master_url _, master_url = await self.schema_registry.get_master(ignore_readiness=True) + returned_content_type = request.get_header("Content-Type") if content_type is None else content_type if not master_url: self.no_master_error(request.content_type) elif f"{self.config['advertised_hostname']}:{self.config['advertised_port']}" in master_url: # If master url is the same as the url of this Karapace respond 503. self.r( body="", - content_type=request.get_header("Content-Type"), + content_type=returned_content_type, status=HTTPStatus.SERVICE_UNAVAILABLE, ) else: @@ -162,7 +163,7 @@ async def _forward_if_not_ready_to_serve(self, request: HTTPRequest) -> None: request=request, body=request.json, url=url, - content_type=request.get_header("Content-Type"), + content_type=returned_content_type, method=request.method, ) @@ -206,6 +207,12 @@ def _add_schema_registry_routes(self) -> None: schema_request=True, auth=self._auth, ) + self.route( + "/master_available", + callback=self.master_available, + method="GET", + with_request=True, + ) self.route( "/config", callback=self.config_set, @@ -676,6 +683,34 @@ async def config_subject_delete( self.r({"compatibility": self.schema_registry.schema_reader.config["compatibility"]}, content_type) + async def master_available(self, *, request: HTTPRequest) -> None: + no_cache_header = {"Cache-Control": "no-store, no-cache, must-revalidate"} + are_we_master, master_url = await self.schema_registry.get_master() + self.log.info("are master %s, master url %s", are_we_master, master_url) + + if ( + self.schema_registry.schema_reader.master_coordinator._sc is not None # pylint: disable=protected-access + and self.schema_registry.schema_reader.master_coordinator._sc.is_master_assigned_to_myself() # pylint: disable=protected-access + ): + raise HTTPResponse( + body={"master_available": are_we_master}, + status=HTTPStatus.OK, + content_type=JSON_CONTENT_TYPE, + headers=no_cache_header, + ) + + if master_url is None or f"{self.config['advertised_hostname']}:{self.config['advertised_port']}" in master_url: + raise HTTPResponse( + body={"master_available": False}, + status=HTTPStatus.OK, + content_type=JSON_CONTENT_TYPE, + headers=no_cache_header, + ) + + await self._forward_request_remote( + request=request, body={}, url=f"{master_url}/master_available", content_type=JSON_CONTENT_TYPE, method="GET" + ) + async def subjects_list(self, content_type: str, *, request: HTTPRequest, user: User | None = None) -> None: deleted = request.query.get("deleted", "false").lower() == "true" subjects = self.schema_registry.database.find_subjects(include_deleted=deleted) @@ -1307,14 +1342,16 @@ async def _forward_request_remote( if auth_header is not None: headers["Authorization"] = auth_header - with async_timeout.timeout(timeout): + async with async_timeout.timeout(timeout): async with func(url, headers=headers, json=body) as response: if response.headers.get("content-type", "").startswith(JSON_CONTENT_TYPE): resp_content = await response.json() else: resp_content = await response.text() - self.r(body=resp_content, content_type=content_type, status=HTTPStatus(response.status)) + raise HTTPResponse( + body=resp_content, content_type=content_type, status=HTTPStatus(response.status), headers=response.headers + ) def no_master_error(self, content_type: str) -> None: self.r( diff --git a/src/karapace/sentry/sentry_client.py b/src/karapace/sentry/sentry_client.py index 776214c7f..c4dc99d33 100644 --- a/src/karapace/sentry/sentry_client.py +++ b/src/karapace/sentry/sentry_client.py @@ -41,6 +41,8 @@ def _initialize_sentry(self) -> None: # Don't send library logged errors to Sentry as there is also proper return value or raised exception to calling code from sentry_sdk.integrations.logging import ignore_logger + ignore_logger("aiokafka") + ignore_logger("aiokafka.*") ignore_logger("kafka") ignore_logger("kafka.*") diff --git a/src/karapace/typing.py b/src/karapace/typing.py index 1268db001..f7fba570f 100644 --- a/src/karapace/typing.py +++ b/src/karapace/typing.py @@ -4,6 +4,7 @@ """ from __future__ import annotations +from abc import ABC, abstractmethod from collections.abc import Mapping, Sequence from enum import Enum, unique from karapace.errors import InvalidVersion @@ -102,3 +103,13 @@ def value(self) -> int: @property def is_latest(self) -> bool: return self.value == self.MINUS_1_VERSION_TAG + + +class SchemaReaderStoppper(ABC): + @abstractmethod + def ready(self) -> bool: + pass + + @abstractmethod + def set_not_ready(self) -> None: + pass diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py index 332b09f0a..6f2e5df35 100644 --- a/tests/integration/backup/test_v3_backup.py +++ b/tests/integration/backup/test_v3_backup.py @@ -4,7 +4,7 @@ """ from __future__ import annotations -from aiokafka.errors import UnknownTopicOrPartitionError +from aiokafka.errors import InvalidReplicationFactorError, UnknownTopicOrPartitionError from collections.abc import Iterator from confluent_kafka import Message, TopicPartition from confluent_kafka.admin import NewTopic @@ -698,6 +698,56 @@ def __exit__(self, exc_type, exc_value, exc_traceback): ) +def test_backup_restoration_override_replication_factor( + admin_client: KafkaAdminClient, + kafka_servers: KafkaServers, + producer: KafkaProducer, + new_topic: NewTopic, +) -> None: + backup_directory = Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / new_topic.topic + metadata_path = backup_directory / f"{new_topic.topic}.metadata" + config = set_config_defaults( + { + "bootstrap_uri": kafka_servers.bootstrap_servers, + } + ) + + # pupulate the topic and create a backup + for i in range(10): + producer.send( + new_topic.topic, + key=f"message-key-{i}", + value=f"message-value-{i}-" + 1000 * "X", + ) + producer.flush() + api.create_backup( + config=config, + backup_location=backup_directory, + topic_name=TopicName(new_topic.topic), + version=BackupVersion.V3, + replication_factor=6, + ) + + # make sure topic doesn't exist beforehand. + _delete_topic(admin_client, new_topic.topic) + + # assert that the restore would fail without the replication factor override + with pytest.raises(InvalidReplicationFactorError): + api.restore_backup( + config=config, + backup_location=metadata_path, + topic_name=TopicName(new_topic.topic), + ) + + # finally restore the backup with override + api.restore_backup( + config=config, + backup_location=metadata_path, + topic_name=TopicName(new_topic.topic), + override_replication_factor=1, + ) + + def no_color_env() -> dict[str, str]: env = os.environ.copy() try: diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 1673445ba..583008b27 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -35,10 +35,11 @@ from tests.integration.utils.rest_client import RetryRestClient from tests.integration.utils.synchronization import lock_path_for from tests.integration.utils.zookeeper import configure_and_start_zk -from tests.utils import repeat_until_successful_request +from tests.utils import repeat_until_master_is_available, repeat_until_successful_request from urllib.parse import urlparse import asyncio +import contextlib import json import os import pathlib @@ -282,6 +283,7 @@ async def fixture_rest_async( "bootstrap_uri": kafka_servers.bootstrap_servers, # Use non-default max request size for REST producer. "producer_max_request_size": REST_PRODUCER_MAX_REQUEST_BYTES, + "waiting_time_before_acting_as_master_ms": 300, } ) write_config(config_path, config) @@ -356,6 +358,7 @@ async def fixture_rest_async_novalidation( # Use non-default max request size for REST producer. "producer_max_request_size": REST_PRODUCER_MAX_REQUEST_BYTES, "name_strategy_validation": False, # This should be only difference from rest_async + "waiting_time_before_acting_as_master_ms": 300, } ) write_config(config_path, config) @@ -429,6 +432,7 @@ async def fixture_rest_async_registry_auth( "registry_port": registry.port, "registry_user": "admin", "registry_password": "admin", + "waiting_time_before_acting_as_master_ms": 300, } ) rest = KafkaRest(config=config) @@ -490,7 +494,8 @@ async def fixture_registry_async_pair( config_templates=[config1, config2], data_dir=session_logdir / _clear_test_name(request.node.name), ) as endpoints: - yield [server.endpoint.to_url() for server in endpoints] + async with after_master_is_available(endpoints, request.config.getoption("server_ca")): + yield [server.endpoint.to_url() for server in endpoints] @pytest.fixture(scope="function", name="registry_cluster") @@ -542,6 +547,7 @@ async def fixture_registry_async_client( timeout=10, sleep=0.3, ) + await repeat_until_master_is_available(client) yield client finally: await client.close() @@ -682,11 +688,27 @@ async def fixture_registry_async_client_auth( timeout=10, sleep=0.3, ) + await repeat_until_master_is_available(client) yield client finally: await client.close() +@contextlib.asynccontextmanager +async def after_master_is_available( + registry_instances: list[RegistryDescription], server_ca: str | None +) -> AsyncIterator[None]: + client = Client( + server_uri=registry_instances[0].endpoint.to_url(), + server_ca=server_ca, + ) + try: + await repeat_until_master_is_available(client) + yield + finally: + await client.close() + + @pytest.fixture(scope="function", name="registry_async_retry_client_auth") async def fixture_registry_async_retry_client_auth(registry_async_client_auth: Client) -> RetryRestClient: return RetryRestClient(registry_async_client_auth) @@ -714,7 +736,8 @@ async def fixture_registry_async_auth_pair( config_templates=[config1, config2], data_dir=session_logdir / _clear_test_name(request.node.name), ) as endpoints: - yield [server.endpoint.to_url() for server in endpoints] + async with after_master_is_available(endpoints, request.config.getoption("server_ca")): + yield [server.endpoint.to_url() for server in endpoints] @pytest.fixture(scope="function", name="new_topic") diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py index 3de98acca..c4c8c95a8 100644 --- a/tests/integration/test_master_coordinator.py +++ b/tests/integration/test_master_coordinator.py @@ -6,6 +6,7 @@ """ from karapace.config import set_config_defaults from karapace.coordinator.master_coordinator import MasterCoordinator +from karapace.typing import SchemaReaderStoppper from tests.integration.utils.kafka_server import KafkaServers from tests.integration.utils.network import allocate_port from tests.integration.utils.rest_client import RetryRestClient @@ -16,9 +17,18 @@ import pytest +class AlwaysAvailableSchemaReaderStoppper(SchemaReaderStoppper): + def ready(self) -> bool: + return True + + def set_not_ready(self) -> None: + pass + + async def init_admin(config): mc = MasterCoordinator(config=config) - await mc.start() + mc.set_stoppper(AlwaysAvailableSchemaReaderStoppper()) + mc.start() return mc @@ -28,15 +38,15 @@ def is_master(mc: MasterCoordinator) -> bool: This takes care of a race condition were the flag `master` is set but `master_url` is not yet set. """ - return bool(mc.schema_coordinator and mc.schema_coordinator.are_we_master and mc.schema_coordinator.master_url) + return bool(mc.schema_coordinator and mc.schema_coordinator.are_we_master() and mc.schema_coordinator.master_url) def has_master(mc: MasterCoordinator) -> bool: """True if `mc` has a master.""" - return bool(mc.schema_coordinator and not mc.schema_coordinator.are_we_master and mc.schema_coordinator.master_url) + return bool(mc.schema_coordinator and not mc.schema_coordinator.are_we_master() and mc.schema_coordinator.master_url) -@pytest.mark.timeout(60) # Github workflows need a bit of extra time +@pytest.mark.timeout(65) # Github workflows need a bit of extra time @pytest.mark.parametrize("strategy", ["lowest", "highest"]) async def test_master_selection(kafka_servers: KafkaServers, strategy: str) -> None: # Use random port to allow for parallel runs. @@ -185,11 +195,11 @@ async def test_no_eligible_master(kafka_servers: KafkaServers) -> None: mc = await init_admin(config_aa) try: # Wait for the election to happen, ie. flag is not None - while not mc.schema_coordinator or mc.schema_coordinator.are_we_master is None: + while not mc.schema_coordinator or mc.schema_coordinator.are_we_master() is None: await asyncio.sleep(0.5) # Make sure the end configuration is as expected - assert mc.schema_coordinator.are_we_master is False + assert mc.schema_coordinator.are_we_master() is False assert mc.schema_coordinator.master_url is None finally: await mc.close() diff --git a/tests/integration/test_schema_coordinator.py b/tests/integration/test_schema_coordinator.py index 5af1d17c0..d8b02c0c9 100644 --- a/tests/integration/test_schema_coordinator.py +++ b/tests/integration/test_schema_coordinator.py @@ -23,6 +23,7 @@ from karapace.utils import json_encode from karapace.version import __version__ from tenacity import retry, stop_after_delay, TryAgain, wait_fixed +from tests.integration.test_master_coordinator import AlwaysAvailableSchemaReaderStoppper from tests.integration.utils.kafka_server import KafkaServers from tests.utils import new_random_name from typing import Final @@ -33,6 +34,7 @@ import contextlib import logging import pytest +import time UNKNOWN_MEMBER_ID = JoinGroupRequest.UNKNOWN_MEMBER_ID @@ -55,6 +57,7 @@ async def fixture_admin( ) -> AsyncGenerator: coordinator = SchemaCoordinator( mocked_client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", @@ -111,8 +114,11 @@ async def test_coordinator_workflow( # Check if 2 coordinators will coordinate rebalances correctly # Check if the initial group join is performed correctly with minimal # setup + + waiting_time_before_acting_as_master_sec = 6 coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host-1", 10101, "https", @@ -122,17 +128,26 @@ async def test_coordinator_workflow( session_timeout_ms=10000, heartbeat_interval_ms=500, retry_backoff_ms=100, + # removing 1 second to consider the network latency in the rest of the test + waiting_time_before_acting_as_master_ms=(waiting_time_before_acting_as_master_sec - 1) * 1000, ) coordinator.start() assert coordinator.coordinator_id is None await wait_for_ready(coordinator) await coordinator.ensure_coordinator_known() - await wait_for_primary_state(coordinator) + assert coordinator.coordinator_id is not None + + assert not coordinator.are_we_master() + # waiting a little bit more since the cluster needs to setup. + await asyncio.sleep(2 * waiting_time_before_acting_as_master_sec) + assert not coordinator.are_we_master(), "last fetch before being available as master" + assert coordinator.are_we_master(), f"after {waiting_time_before_acting_as_master_sec} seconds we can act as a master" # Check if adding an additional coordinator will rebalance correctly client2 = await _get_client(kafka_servers=kafka_servers) coordinator2 = SchemaCoordinator( client2, + AlwaysAvailableSchemaReaderStoppper(), "test-host-2", 10100, "https", @@ -155,14 +170,45 @@ async def test_coordinator_workflow( secondary = coordinator if primary_selection_strategy == "highest" else coordinator2 secondary_client = client if primary_selection_strategy == "highest" else client2 - await wait_for_primary_state(primary) - assert not secondary.are_we_master + if primary == coordinator2: + # we need to disable the master for `waiting_time_before_acting_as_master_sec` seconds each time, + # a new node its elected as master. + # if the coordinator its `coordinator1` since isn't changed we don't have to wait + # for the `waiting_time_before_acting_as_master_sec` seconds. + + # give time to the election to be forwarded to all the coordinators. + await asyncio.sleep(3) + + assert ( + not primary.are_we_master() + ), "after a change in the coordinator we can act as a master until we wait for the required time" + assert not secondary.are_we_master(), "also the second cannot be immediately a master" + # after that time the primary can act as a master + await asyncio.sleep(waiting_time_before_acting_as_master_sec) + assert not primary.are_we_master(), "Last fetch before being available as master" + assert not secondary.are_we_master(), "secondary cannot be master" + + assert primary.are_we_master() + assert not secondary.are_we_master() # Check is closing the primary coordinator will rebalance the secondary to change to primary await primary.close() await primary_client.close() - await wait_for_primary_state(secondary) + now = time.time() + while time.time() - now < waiting_time_before_acting_as_master_sec: + assert not secondary.are_we_master(), ( + f"Cannot become master before {waiting_time_before_acting_as_master_sec} seconds " + f"for the late records that can arrive from the previous master" + ) + await asyncio.sleep(0.5) + + attempts = 0 + while not secondary.are_we_master(): + attempts += 1 + if attempts >= 1000: + raise ValueError("The master should have been elected") + await asyncio.sleep(0.5) await secondary.close() await secondary_client.close() @@ -385,6 +431,7 @@ async def test_coordinator_metadata_update(client: AIOKafkaClient) -> None: try: coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", @@ -427,6 +474,7 @@ async def test_coordinator__send_req(client: AIOKafkaClient) -> None: try: coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", @@ -465,6 +513,7 @@ async def test_coordinator_ensure_coordinator_known(client: AIOKafkaClient) -> N try: coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", @@ -546,6 +595,7 @@ async def test_coordinator__do_heartbeat(client: AIOKafkaClient) -> None: try: coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", @@ -634,6 +684,7 @@ async def test_coordinator__heartbeat_routine(client: AIOKafkaClient) -> None: try: coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", @@ -705,6 +756,7 @@ async def test_coordinator__coordination_routine(client: AIOKafkaClient) -> None try: coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", diff --git a/tests/integration/test_schema_reader.py b/tests/integration/test_schema_reader.py index 4d00a5581..a1cf382b4 100644 --- a/tests/integration/test_schema_reader.py +++ b/tests/integration/test_schema_reader.py @@ -15,6 +15,7 @@ from karapace.schema_reader import KafkaSchemaReader from karapace.utils import json_encode from tests.base_testcase import BaseTestCase +from tests.integration.test_master_coordinator import AlwaysAvailableSchemaReaderStoppper from tests.integration.utils.kafka_server import KafkaServers from tests.schemas.json_schemas import FALSE_SCHEMA, TRUE_SCHEMA from tests.utils import create_group_name_factory, create_subject_name_factory, new_random_name, new_topic @@ -70,8 +71,9 @@ async def test_regression_soft_delete_schemas_should_be_registered( } ) master_coordinator = MasterCoordinator(config=config) + master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper()) try: - await master_coordinator.start() + master_coordinator.start() database = InMemoryDatabase() offset_watcher = OffsetWatcher() schema_reader = KafkaSchemaReader( @@ -162,8 +164,9 @@ async def test_regression_config_for_inexisting_object_should_not_throw( } ) master_coordinator = MasterCoordinator(config=config) + master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper()) try: - await master_coordinator.start() + master_coordinator.start() database = InMemoryDatabase() offset_watcher = OffsetWatcher() schema_reader = KafkaSchemaReader( @@ -266,8 +269,9 @@ async def test_key_format_detection( } ) master_coordinator = MasterCoordinator(config=config) + master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper()) try: - await master_coordinator.start() + master_coordinator.start() key_formatter = KeyFormatter() database = InMemoryDatabase() offset_watcher = OffsetWatcher() diff --git a/tests/schemas/protobuf.py b/tests/schemas/protobuf.py index 1bd3f05d5..afbb3f890 100644 --- a/tests/schemas/protobuf.py +++ b/tests/schemas/protobuf.py @@ -261,3 +261,21 @@ "lzdGVyLk1ldGFkYXRhEhYKDmNvbXBhbnlfbnVtYmVyGAIgASgJGhYKCE1ldGFkYXRhEgoK" "AmlkGAEgASgJYgZwcm90bzM=" ) + +schema_protobuf_optionals_bin = ( + "Cgp0ZXN0LnByb3RvIqYBCgpEaW1lbnNpb25zEhEKBHNpemUYASABKAFIAIgBARISCgV3aWR0aBgCIAEoAUgBiAEBEhMKBmhlaWdodBgDIAEo" + + "AUgCiAEBEhMKBmxlbmd0aBgEIAEoAUgDiAEBEhMKBndlaWdodBgFIAEoAUgEiAEBQgcKBV9zaXplQggKBl93aWR0aEIJCgdfaGVpZ2h0Qg" + + "kKB19sZW5ndGhCCQoHX3dlaWdodGIGcHJvdG8z" +) + +schema_protobuf_optionals = """\ +syntax = "proto3"; + +message Dimensions { + optional double size = 1; + optional double width = 2; + optional double height = 3; + optional double length = 4; + optional double weight = 5; +} +""" diff --git a/tests/unit/test_protobuf_binary_serialization.py b/tests/unit/test_protobuf_binary_serialization.py index 6950066d3..99bfe375e 100644 --- a/tests/unit/test_protobuf_binary_serialization.py +++ b/tests/unit/test_protobuf_binary_serialization.py @@ -16,6 +16,8 @@ schema_protobuf_nested_message4_bin_protoc, schema_protobuf_oneof, schema_protobuf_oneof_bin, + schema_protobuf_optionals, + schema_protobuf_optionals_bin, schema_protobuf_order_after, schema_protobuf_order_after_bin, schema_protobuf_plain, @@ -89,6 +91,7 @@ (schema_protobuf_references, schema_protobuf_references_bin), (schema_protobuf_references2, schema_protobuf_references2_bin), (schema_protobuf_complex, schema_protobuf_complex_bin), + (schema_protobuf_optionals, schema_protobuf_optionals_bin), ], ) def test_schema_deserialize(schema_plain, schema_serialized): @@ -125,6 +128,7 @@ def test_protoc_serialized_schema_deserialize(schema_plain, schema_serialized): schema_protobuf_references, schema_protobuf_references2, schema_protobuf_complex, + schema_protobuf_optionals, ], ) def test_simple_schema_serialize(schema): diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index 552fa0be7..d500c5b03 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -173,7 +173,7 @@ def test_readiness_check(testcase: ReadinessTestCase) -> None: schema_reader.offset = testcase.cur_offset schema_reader.handle_messages() - assert schema_reader.ready is testcase.expected + assert schema_reader.ready() is testcase.expected def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None: @@ -196,7 +196,7 @@ def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None: assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_ON_STARTUP schema_reader.handle_messages() - assert schema_reader.ready is True + assert schema_reader.ready() is True assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP @@ -242,16 +242,16 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche schema_reader.handle_messages() assert schema_reader.offset == 1 - assert schema_reader.ready is False + assert schema_reader.ready() is False schema_reader.handle_messages() assert schema_reader.offset == 2 - assert schema_reader.ready is False + assert schema_reader.ready() is False schema_reader.handle_messages() assert schema_reader.offset == 3 - assert schema_reader.ready is False + assert schema_reader.ready() is False schema_reader.handle_messages() # call last time to call _is_ready() assert schema_reader.offset == 3 - assert schema_reader.ready is True + assert schema_reader.ready() is True assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP @@ -598,7 +598,7 @@ def test_message_error_handling( schema_reader.handle_messages() assert schema_reader.offset == 1 - assert not schema_reader.ready + assert not schema_reader.ready() for log in caplog.records: assert log.name == "karapace.schema_reader" assert log.levelname == "WARNING" @@ -640,7 +640,7 @@ def test_message_error_handling_with_invalid_reference_schema_protobuf( schema_reader.handle_messages() assert schema_reader.offset == 1 - assert not schema_reader.ready + assert not schema_reader.ready() # When handling the schema schema_reader.consumer.consume.side_effect = ([message_using_ref],) @@ -650,7 +650,7 @@ def test_message_error_handling_with_invalid_reference_schema_protobuf( schema_reader.handle_messages() assert schema_reader.offset == 1 - assert not schema_reader.ready + assert not schema_reader.ready() warn_records = [r for r in caplog.records if r.levelname == "WARNING"] diff --git a/tests/unit/test_schema_registry_api.py b/tests/unit/test_schema_registry_api.py index 7fcecd47e..b4d87f35b 100644 --- a/tests/unit/test_schema_registry_api.py +++ b/tests/unit/test_schema_registry_api.py @@ -33,7 +33,7 @@ async def test_validate_schema_request_body() -> None: async def test_forward_when_not_ready() -> None: with patch("karapace.schema_registry_apis.KarapaceSchemaRegistry") as schema_registry_class: schema_reader_mock = Mock(spec=KafkaSchemaReader) - ready_property_mock = PropertyMock(return_value=False) + ready_property_mock = PropertyMock(return_value=lambda: False) schema_registry = AsyncMock(spec=KarapaceSchemaRegistry) type(schema_reader_mock).ready = ready_property_mock schema_registry.schema_reader = schema_reader_mock diff --git a/tests/utils.py b/tests/utils.py index 191fba348..4c255ff2f 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -19,6 +19,7 @@ import os import ssl import sys +import time import uuid consumer_valid_payload = { @@ -319,6 +320,15 @@ async def repeat_until_successful_request( return res +async def repeat_until_master_is_available(client: Client) -> None: + while True: + res = await client.get("/master_available", json={}) + reply = res.json() + if reply is not None and "master_available" in reply and reply["master_available"] is True: + break + time.sleep(1) + + def write_ini(file_path: Path, ini_data: dict) -> None: ini_contents = (f"{key}={value}" for key, value in ini_data.items()) file_contents = "\n".join(ini_contents)