fix: notify flush receiver after write buffer is released (#4476)
* fix: notify the worker after write buffer is released

* feat: worker level region count
evenyag authored Aug 1, 2024
1 parent 62a0def commit 6c4b8b6
Showing 7 changed files with 72 additions and 29 deletions.
30 changes: 29 additions & 1 deletion src/mito2/src/flush.rs
@@ -23,7 +23,7 @@ use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::storage::RegionId;
use strum::IntoStaticStr;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, watch};

use crate::access_layer::{AccessLayerRef, OperationType, SstWriteRequest};
use crate::cache::CacheManagerRef;
@@ -88,6 +88,9 @@ pub struct WriteBufferManagerImpl {
memory_used: AtomicUsize,
/// Memory that hasn't been scheduled to free (e.g. used by mutable memtables).
memory_active: AtomicUsize,
/// Optional notifier.
/// The manager can wake up the worker once we free the write buffer.
notifier: Option<watch::Sender<()>>,
}

impl WriteBufferManagerImpl {
@@ -98,9 +101,16 @@ impl WriteBufferManagerImpl {
mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
memory_used: AtomicUsize::new(0),
memory_active: AtomicUsize::new(0),
notifier: None,
}
}

/// Attaches a notifier to the manager.
pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
self.notifier = Some(notifier);
self
}

/// Returns memory usage of mutable memtables.
pub fn mutable_usage(&self) -> usize {
self.memory_active.load(Ordering::Relaxed)
@@ -159,6 +169,12 @@ impl WriteBufferManager for WriteBufferManagerImpl {

fn free_mem(&self, mem: usize) {
self.memory_used.fetch_sub(mem, Ordering::Relaxed);
if let Some(notifier) = &self.notifier {
// Notifies the worker after the memory usage is decreased. When we drop the memtable
// outside of the worker, the worker may still stall requests because the memory usage
// is not updated. So we need to notify the worker to handle stalled requests again.
let _ = notifier.send(());
}
}

fn memory_usage(&self) -> usize {
@@ -786,6 +802,18 @@ mod tests {
assert!(manager.should_flush_engine());
}

#[test]
fn test_manager_notify() {
let (sender, receiver) = watch::channel(());
let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
manager.reserve_mem(500);
assert!(!receiver.has_changed().unwrap());
manager.schedule_free_mem(500);
assert!(!receiver.has_changed().unwrap());
manager.free_mem(500);
assert!(receiver.has_changed().unwrap());
}

#[tokio::test]
async fn test_schedule_empty() {
let env = SchedulerEnv::new().await;
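The core of the fix is the sender side above: `free_mem` decrements the usage first and only then signals the `watch` channel, so a worker that wakes on the notification always observes the reduced value. Below is a minimal, self-contained sketch of that pattern with `tokio::sync::watch`; the `Manager` type and its fields are illustrative stand-ins, not the actual `WriteBufferManagerImpl`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use tokio::sync::watch;

// Illustrative stand-in for the write buffer manager: it frees memory first,
// then notifies whoever is listening on the watch channel.
struct Manager {
    memory_used: AtomicUsize,
    notifier: Option<watch::Sender<()>>,
}

impl Manager {
    fn free_mem(&self, mem: usize) {
        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
        if let Some(notifier) = &self.notifier {
            // Send only after the usage is decreased so a woken receiver sees
            // the updated value; ignore the error if the receiver is gone.
            let _ = notifier.send(());
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(());
    let manager = Arc::new(Manager {
        memory_used: AtomicUsize::new(500),
        notifier: Some(tx),
    });

    // Illustrative worker task: it wakes whenever the buffer is released and
    // can then re-check whether stalled requests may proceed.
    let m = manager.clone();
    let worker = tokio::spawn(async move {
        // `changed()` resolves once a value newer than the last one seen is
        // sent; it errors only if the sender is dropped.
        rx.changed().await.unwrap();
        assert_eq!(m.memory_used.load(Ordering::Relaxed), 0);
    });

    manager.free_mem(500);
    worker.await.unwrap();
}
```

The added `test_manager_notify` test in the diff checks exactly this ordering: reserving and scheduling memory does not mark the channel as changed, only `free_mem` does.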
10 changes: 7 additions & 3 deletions src/mito2/src/metrics.rs
@@ -34,9 +34,13 @@ lazy_static! {
/// Global memtable dictionary size in bytes.
pub static ref MEMTABLE_DICT_BYTES: IntGauge =
register_int_gauge!("greptime_mito_memtable_dict_bytes", "mito memtable dictionary size in bytes").unwrap();
/// Gauge for open regions
pub static ref REGION_COUNT: IntGauge =
register_int_gauge!("greptime_mito_region_count", "mito region count").unwrap();
/// Gauge for open regions in each worker.
pub static ref REGION_COUNT: IntGaugeVec =
register_int_gauge_vec!(
"greptime_mito_region_count",
"mito region count in each worker",
&[WORKER_LABEL],
).unwrap();
/// Elapsed time to handle requests.
pub static ref HANDLE_REQUEST_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_mito_handle_request_elapsed",
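For the metrics change, the single global gauge becomes an `IntGaugeVec` keyed by a worker label, and each worker resolves its own child gauge once. A minimal sketch of that pattern with the `prometheus` crate follows; the metric name and the `"worker"` label are placeholders (the real code uses `WORKER_LABEL`, whose value is not shown in this diff).

```rust
use lazy_static::lazy_static;
use prometheus::{register_int_gauge_vec, IntGauge, IntGaugeVec};

lazy_static! {
    // One gauge per worker instead of a single global gauge.
    static ref REGION_COUNT: IntGaugeVec = register_int_gauge_vec!(
        "example_region_count",
        "region count in each worker",
        &["worker"]
    )
    .unwrap();
}

fn main() {
    // Each worker resolves its own child gauge once and keeps the handle.
    let worker_0: IntGauge = REGION_COUNT.with_label_values(&["0"]);
    let worker_1: IntGauge = REGION_COUNT.with_label_values(&["1"]);

    worker_0.inc();
    worker_0.inc();
    worker_1.inc();

    assert_eq!(worker_0.get(), 2);
    assert_eq!(worker_1.get(), 1);
}
```

Child gauges returned by `with_label_values` are cheap handles onto shared state, which is why `handle_open.rs` later in this diff can clone `region_count` into a spawned task and still update the owning worker's metric.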
26 changes: 16 additions & 10 deletions src/mito2/src/worker.rs
Expand Up @@ -51,7 +51,7 @@ use crate::config::MitoConfig;
use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::WRITE_STALL_TOTAL;
use crate::metrics::{REGION_COUNT, WRITE_STALL_TOTAL};
use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
use crate::request::{
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
@@ -130,9 +130,11 @@ impl WorkerGroup {
object_store_manager: ObjectStoreManagerRef,
plugins: Plugins,
) -> Result<WorkerGroup> {
let write_buffer_manager = Arc::new(WriteBufferManagerImpl::new(
config.global_write_buffer_size.as_bytes() as usize,
));
let (flush_sender, flush_receiver) = watch::channel(());
let write_buffer_manager = Arc::new(
WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
.with_notifier(flush_sender.clone()),
);
let puffin_manager_factory = PuffinManagerFactory::new(
&config.index.aux_path,
config.index.staging_size.as_bytes(),
@@ -165,7 +167,6 @@
.build(),
);
let time_provider = Arc::new(StdTimeProvider);
let (flush_sender, flush_receiver) = watch::channel(());

let workers = (0..config.num_workers)
.map(|id| {
@@ -265,10 +266,12 @@
listener: Option<crate::engine::listener::EventListenerRef>,
time_provider: TimeProviderRef,
) -> Result<WorkerGroup> {
let (flush_sender, flush_receiver) = watch::channel(());
let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
Arc::new(WriteBufferManagerImpl::new(
config.global_write_buffer_size.as_bytes() as usize,
))
Arc::new(
WriteBufferManagerImpl::new(config.global_write_buffer_size.as_bytes() as usize)
.with_notifier(flush_sender.clone()),
)
});
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
@@ -297,7 +300,6 @@
.write_cache(write_cache)
.build(),
);
let (flush_sender, flush_receiver) = watch::channel(());
let workers = (0..config.num_workers)
.map(|id| {
WorkerStarter {
@@ -401,6 +403,7 @@ impl<S: LogStore> WorkerStarter<S> {

let running = Arc::new(AtomicBool::new(true));
let now = self.time_provider.current_time_millis();
let id_string = self.id.to_string();
let mut worker_thread = RegionWorkerLoop {
id: self.id,
config: self.config.clone(),
@@ -436,7 +439,8 @@
last_periodical_check_millis: now,
flush_sender: self.flush_sender,
flush_receiver: self.flush_receiver,
stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&self.id.to_string()]),
stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&id_string]),
region_count: REGION_COUNT.with_label_values(&[&id_string]),
};
let handle = common_runtime::spawn_global(async move {
worker_thread.run().await;
@@ -623,6 +627,8 @@ struct RegionWorkerLoop<S> {
flush_receiver: watch::Receiver<()>,
/// Gauge of stalled request count.
stalled_count: IntGauge,
/// Gauge of regions in the worker.
region_count: IntGauge,
}

impl<S: LogStore> RegionWorkerLoop<S> {
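The worker.rs changes move the channel creation before the write buffer manager is built so the sender can be attached via `with_notifier`, while the receiver is stored as `flush_receiver` on `RegionWorkerLoop`. How the receiver is consumed is not part of this diff; the sketch below is a hypothetical, simplified worker loop showing the intended pattern of selecting over incoming requests and the watch notification, so stalled requests are re-evaluated whenever memory is released. The `Request` enum and `worker_loop` function are invented for illustration.

```rust
use tokio::sync::{mpsc, watch};

// Hypothetical request type; the real worker handles many more variants.
enum Request {
    Write(u64),
    Stop,
}

// Hypothetical, simplified worker loop; the real one is RegionWorkerLoop::run.
async fn worker_loop(
    mut requests: mpsc::Receiver<Request>,
    mut flush_receiver: watch::Receiver<()>,
) {
    let mut stalled: Vec<Request> = Vec::new();
    loop {
        tokio::select! {
            req = requests.recv() => {
                match req {
                    Some(Request::Write(rows)) => {
                        // If the write buffer were full, the request would be
                        // stalled here instead of being applied.
                        stalled.push(Request::Write(rows));
                    }
                    Some(Request::Stop) | None => break,
                }
            }
            // Wakes whenever the write buffer manager frees memory, so stalled
            // requests are re-evaluated even if no new request ever arrives.
            changed = flush_receiver.changed() => {
                if changed.is_err() {
                    // All senders dropped; stop the loop.
                    break;
                }
                // Retry the stalled requests now that memory was released.
                let _retry_now = std::mem::take(&mut stalled);
            }
        }
    }
}
```

Whether the real loop retries stalled requests eagerly or batches them is not visible here; the point is only that the watch channel turns a memory release into a wake-up for the worker.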
7 changes: 3 additions & 4 deletions src/mito2/src/worker/handle_close.rs
@@ -19,7 +19,6 @@ use store_api::region_request::AffectedRows;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::metrics::REGION_COUNT;
use crate::worker::RegionWorkerLoop;

impl<S> RegionWorkerLoop<S> {
@@ -31,7 +30,7 @@ impl<S> RegionWorkerLoop<S> {
return Ok(0);
};

info!("Try to close region {}", region_id);
info!("Try to close region {}, worker: {}", region_id, self.id);

region.stop().await;
self.regions.remove_region(region_id);
@@ -40,9 +39,9 @@
// Clean compaction status.
self.compaction_scheduler.on_region_closed(region_id);

info!("Region {} closed", region_id);
info!("Region {} closed, worker: {}", region_id, self.id);

REGION_COUNT.dec();
self.region_count.dec();

Ok(0)
}
9 changes: 6 additions & 3 deletions src/mito2/src/worker/handle_create.rs
@@ -24,7 +24,6 @@ use store_api::region_request::{AffectedRows, RegionCreateRequest};
use store_api::storage::RegionId;

use crate::error::{InvalidMetadataSnafu, Result};
use crate::metrics::REGION_COUNT;
use crate::region::opener::{check_recovered_region, RegionOpener};
use crate::worker::RegionWorkerLoop;

@@ -70,9 +69,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.create_or_open(&self.config, &self.wal)
.await?;

info!("A new region created, region: {:?}", region.metadata());
info!(
"A new region created, worker: {}, region: {:?}",
self.id,
region.metadata()
);

REGION_COUNT.inc();
self.region_count.inc();

// Insert the MitoRegion into the RegionMap.
self.regions.insert_region(Arc::new(region));
5 changes: 2 additions & 3 deletions src/mito2/src/worker/handle_drop.rs
@@ -28,7 +28,6 @@ use store_api::storage::RegionId;
use tokio::time::sleep;

use crate::error::{OpenDalSnafu, Result};
use crate::metrics::REGION_COUNT;
use crate::region::{RegionMapRef, RegionState};
use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};

@@ -45,7 +44,7 @@
) -> Result<AffectedRows> {
let region = self.regions.writable_region(region_id)?;

info!("Try to drop region: {}", region_id);
info!("Try to drop region: {}, worker: {}", region_id, self.id);

// Marks the region as dropping.
region.set_dropping()?;
@@ -93,7 +92,7 @@
region_id
);

REGION_COUNT.dec();
self.region_count.dec();

// Detaches a background task to delete the region dir
let region_dir = region.access_layer.region_dir().to_owned();
14 changes: 9 additions & 5 deletions src/mito2/src/worker/handle_open.rs
@@ -26,7 +26,6 @@ use store_api::storage::RegionId;
use crate::error::{
ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
};
use crate::metrics::REGION_COUNT;
use crate::region::opener::RegionOpener;
use crate::request::OptionOutputTx;
use crate::wal::entry_distributor::WalEntryReceiver;
@@ -56,7 +55,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.context(OpenDalSnafu)?
{
let result = remove_region_dir_once(&request.region_dir, object_store).await;
info!("Region {} is dropped, result: {:?}", region_id, result);
info!(
"Region {} is dropped, worker: {}, result: {:?}",
region_id, self.id, result
);
return RegionNotFoundSnafu { region_id }.fail();
}

@@ -84,7 +86,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
sender.send(Err(err));
return;
}
info!("Try to open region {}", region_id);
info!("Try to open region {}, worker: {}", region_id, self.id);

// Open region from specific region dir.
let opener = match RegionOpener::new(
@@ -112,12 +114,14 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let wal = self.wal.clone();
let config = self.config.clone();
let opening_regions = self.opening_regions.clone();
let region_count = self.region_count.clone();
let worker_id = self.id;
opening_regions.insert_sender(region_id, sender);
common_runtime::spawn_global(async move {
match opener.open(&config, &wal).await {
Ok(region) => {
info!("Region {} is opened", region_id);
REGION_COUNT.inc();
info!("Region {} is opened, worker: {}", region_id, worker_id);
region_count.inc();

// Insert the Region into the RegionMap.
regions.insert_region(Arc::new(region));
