Skip to content

Commit

Permalink
Add support for dump_database() builtin function. Involved a bunch of…
Browse files Browse the repository at this point in the history
… refactoring of the way the database and loader interface propagate through + a new Config struct passed along.

Fix missing linefeeds in textdump read, without adding unnecessary final linefeed.
Change loader interface to be able to expose for textdump
  • Loading branch information
rdaum committed Jan 12, 2024
1 parent 5f285ee commit 528e411
Show file tree
Hide file tree
Showing 16 changed files with 180 additions and 59 deletions.
26 changes: 21 additions & 5 deletions crates/daemon/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use tokio::signal::unix::{signal, SignalKind};
use tracing::info;

use moor_db::DatabaseBuilder;
use moor_kernel::config::Config;
use moor_kernel::tasks::scheduler::Scheduler;
use moor_kernel::textdump::textdump_load;
use rpc_common::{RpcRequestError, RpcResponse, RpcResult};
Expand Down Expand Up @@ -60,6 +61,13 @@ struct Args {
#[arg(short, long, value_name = "textdump", help = "Path to textdump to import", value_hint = ValueHint::FilePath)]
textdump: Option<PathBuf>,

#[arg(
long,
value_name = "textdump-output",
help = "Path to textdump file to write on `dump_database()`, if any"
)]
textdump_out: Option<PathBuf>,

#[arg(
short,
long,
Expand Down Expand Up @@ -188,7 +196,7 @@ async fn main() {

info!("Daemon starting...");
let db_source_builder = DatabaseBuilder::new().with_path(args.db.clone());
let (mut db_source, freshly_made) = db_source_builder.open_db().await.unwrap();
let (db_source, freshly_made) = db_source_builder.open_db().await.unwrap();
info!(path = ?args.db, "Opened database");

// If the database already existed, do not try to import the textdump...
Expand All @@ -198,10 +206,11 @@ async fn main() {
} else {
info!("Loading textdump...");
let start = std::time::Instant::now();
let mut loader_interface = db_source
let loader_interface = db_source
.clone()
.loader_client()
.expect("Unable to get loader interface from database");
textdump_load(loader_interface.as_mut(), textdump.to_str().unwrap())
textdump_load(loader_interface.clone(), textdump.to_str().unwrap())
.await
.unwrap();
let duration = start.elapsed();
Expand All @@ -219,11 +228,18 @@ async fn main() {
let mut stop_signal =
signal(SignalKind::interrupt()).expect("Unable to register STOP signal handler");

let config = Config {
textdump_output: args.textdump_out,
};

let state_source = db_source
.clone()
.world_state_source()
.expect("Could not get world state source from db");
// The pieces from core we're going to use:
// Our DB.
let state_source = db_source.world_state_source().unwrap();
// Our scheduler.
let scheduler = Scheduler::new(state_source.clone());
let scheduler = Scheduler::new(db_source, config);

// The scheduler thread:
let loop_scheduler = scheduler.clone();
Expand Down
8 changes: 4 additions & 4 deletions crates/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ pub struct DatabaseBuilder {
}

pub trait Database {
fn loader_client(&mut self) -> Result<Box<dyn LoaderInterface>, WorldStateError>;
fn world_state_source(self: Box<Self>) -> Result<Arc<dyn WorldStateSource>, WorldStateError>;
fn loader_client(self: Arc<Self>) -> Result<Arc<dyn LoaderInterface>, WorldStateError>;
fn world_state_source(self: Arc<Self>) -> Result<Arc<dyn WorldStateSource>, WorldStateError>;
}

impl DatabaseBuilder {
Expand All @@ -60,10 +60,10 @@ impl DatabaseBuilder {

/// Returns a new database instance. The second value in the result tuple is true if the
/// database was newly created, and false if it was already present.
pub async fn open_db(&self) -> Result<(Box<dyn Database>, bool), String> {
pub async fn open_db(&self) -> Result<(Arc<dyn Database + Send + Sync>, bool), String> {
let (db, fresh) =
RelBoxWorldState::open(self.path.clone(), self.memory_size.unwrap_or(1 << 40)).await;
Ok((Box::new(db), fresh))
Ok((Arc::new(db), fresh))
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/db/src/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use moor_values::var::Var;
/// Interface exposed to be used by the textdump loader. Overlap of functionality with what
/// WorldState could provide, but potentially different constraints/semantics (e.g. no perms checks)
#[async_trait]
pub trait LoaderInterface {
pub trait LoaderInterface: Send + Sync {
/// For reading textdumps...
async fn create_object(
&self,
Expand Down
8 changes: 4 additions & 4 deletions crates/db/src/odb/rb_worldstate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1093,13 +1093,13 @@ impl RelBoxTransaction {
}

impl Database for RelBoxWorldState {
fn loader_client(&mut self) -> Result<Box<dyn LoaderInterface>, WorldStateError> {
fn loader_client(self: Arc<Self>) -> Result<Arc<dyn LoaderInterface>, WorldStateError> {
let tx = RelBoxTransaction::new(self.db.clone());
Ok(Box::new(DbTxWorldState { tx: Box::new(tx) }))
Ok(Arc::new(DbTxWorldState { tx: Box::new(tx) }))
}

fn world_state_source(self: Box<Self>) -> Result<Arc<dyn WorldStateSource>, WorldStateError> {
Ok(Arc::new(*self))
fn world_state_source(self: Arc<Self>) -> Result<Arc<dyn WorldStateSource>, WorldStateError> {
Ok(self)
}
}

Expand Down
21 changes: 21 additions & 0 deletions crates/kernel/src/builtins/bf_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,26 @@ async fn bf_eval<'a>(bf_args: &mut BfCallState<'a>) -> Result<BfRet, Error> {
}
bf_declare!(eval, bf_eval);

async fn bf_dump_database<'a>(bf_args: &mut BfCallState<'a>) -> Result<BfRet, Error> {
bf_args
.task_perms()
.await
.map_err(world_state_err)?
.check_wizard()
.map_err(world_state_err)?;

bf_args
.scheduler_sender
.send((
bf_args.exec_state.top().task_id,
SchedulerControlMsg::Checkpoint,
))
.map_err(|_| Error::E_QUOTA)?;

Ok(Ret(v_bool(true)))
}
bf_declare!(dump_database, bf_dump_database);

impl VM {
pub(crate) fn register_bf_server(&mut self) {
self.builtins[offset_for_builtin("notify")] = Arc::new(BfNotify {});
Expand Down Expand Up @@ -829,5 +849,6 @@ impl VM {
self.builtins[offset_for_builtin("listeners")] = Arc::new(BfListeners {});
self.builtins[offset_for_builtin("eval")] = Arc::new(BfEval {});
self.builtins[offset_for_builtin("read")] = Arc::new(BfRead {});
self.builtins[offset_for_builtin("dump_database")] = Arc::new(BfDumpDatabase {});
}
}
9 changes: 9 additions & 0 deletions crates/kernel/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//! Config is created by the host daemon, and passed through the scheduler, whereupon it is
//! available to all components. Used to hold things typically configured by CLI flags, etc.
use std::path::PathBuf;

#[derive(Debug, Default)]
pub struct Config {
pub textdump_output: Option<PathBuf>,
}
1 change: 1 addition & 0 deletions crates/kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//

pub mod builtins;
pub mod config;
pub mod matching;
pub mod tasks;
pub mod textdump;
Expand Down
76 changes: 68 additions & 8 deletions crates/kernel/src/tasks/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
//

use std::collections::HashMap;
use std::fs::File;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use crate::textdump::{make_textdump, TextdumpWriter};
use bincode::{Decode, Encode};
use metrics_macros::{gauge, increment_counter};
use thiserror::Error;
Expand All @@ -37,6 +39,7 @@ use SchedulerError::{
TaskAbortedCancelled, TaskAbortedError, TaskAbortedException, TaskAbortedLimit,
};

use crate::config::Config;
use crate::tasks::scheduler::SchedulerError::TaskNotFound;
use crate::tasks::sessions::Session;
use crate::tasks::task::Task;
Expand All @@ -46,6 +49,7 @@ use crate::vm::Fork;
use crate::vm::UncaughtException;
use moor_compiler::codegen::compile;
use moor_compiler::CompileError;
use moor_db::Database;

const SCHEDULER_TICK_TIME: Duration = Duration::from_millis(5);
const METRICS_POLLER_TICK_TIME: Duration = Duration::from_secs(5);
Expand All @@ -57,6 +61,7 @@ pub struct Scheduler {
inner: Arc<RwLock<Inner>>,
control_sender: UnboundedSender<(TaskId, SchedulerControlMsg)>,
control_receiver: Arc<Mutex<UnboundedReceiver<(TaskId, SchedulerControlMsg)>>>,
config: Arc<Config>,
}

#[derive(Clone, Copy, Debug, Eq, PartialEq, Decode, Encode)]
Expand Down Expand Up @@ -147,24 +152,26 @@ enum TaskHandleResult {

struct Inner {
running: bool,
state_source: Arc<dyn WorldStateSource>,
database: Arc<dyn Database + Send + Sync>,
next_task_id: usize,
tasks: HashMap<TaskId, TaskControl>,
input_requests: HashMap<Uuid, TaskId>,
}

/// Public facing interface for the scheduler.
impl Scheduler {
pub fn new(state_source: Arc<dyn WorldStateSource>) -> Self {
pub fn new(database: Arc<dyn Database + Send + Sync>, config: Config) -> Self {
let config = Arc::new(config);
let (control_sender, control_receiver) = tokio::sync::mpsc::unbounded_channel();
Self {
inner: Arc::new(RwLock::new(Inner {
running: Default::default(),
state_source,
database,
next_task_id: Default::default(),
tasks: HashMap::new(),
input_requests: Default::default(),
})),
config: config.clone(),
control_sender,
control_receiver: Arc::new(Mutex::new(control_receiver)),
}
Expand Down Expand Up @@ -539,6 +546,7 @@ impl Scheduler {
}

/// Handle task-origin'd scheduler control messages.
/// Note: this function should never be allowed to panic, as it is called from the scheduler main loop.
async fn handle_task_control_msg(
&self,
task_id: TaskId,
Expand Down Expand Up @@ -739,6 +747,46 @@ impl Scheduler {
// Task is asking to boot a player.
vec![TaskHandleResult::Disconnect(task_id, player)]
}
SchedulerControlMsg::Checkpoint => {
increment_counter!("scheduler,checkpoint");
let Some(textdump_path) = self.config.textdump_output.clone() else {
error!("Cannot textdump as textdump_file not configured");
return vec![];
};

// TODO: fork a task to do this, so we're not blocking the scheduler thread.
let loader_client = {
let inner = self.inner.write().await;

match inner.database.clone().loader_client() {
Ok(tx) => tx,
Err(e) => {
error!(?e, "Could not start transaction for checkpoint");
return vec![];
}
}
};

tokio::spawn(async move {
let Ok(mut output) = File::create(&textdump_path) else {
error!("Could not open textdump file for writing");
return;
};

let textdump = make_textdump(
loader_client,
// TODO: just to be compatible with LambdaMOO import for now, hopefully.
Some("** LambdaMOO Database, Format Version 1 **"),
)
.await;

let mut writer = TextdumpWriter::new(&mut output);
if let Err(e) = writer.write_textdump(&textdump) {
error!(?e, "Could not write textdump");
}
});
vec![]
}
}
}
}
Expand Down Expand Up @@ -848,7 +896,10 @@ impl Inner {
task.suspended = false;

let world_state = self
.state_source
.database
.clone()
.world_state_source()
.expect("Could not get world state source")
.new_world_state()
.await
// This is a rather drastic system problem if it happens, and it's best to just die.
Expand Down Expand Up @@ -1061,7 +1112,10 @@ impl Inner {

// Follow the usual task resume logic.
let world_state = self
.state_source
.database
.clone()
.world_state_source()
.expect("Unable to create world state source from database")
.new_world_state()
.await
.expect("Could not start transaction for resumed task. Panic.");
Expand Down Expand Up @@ -1094,7 +1148,10 @@ impl Inner {

// Create a new transaction.
let world_state = self
.state_source
.database
.clone()
.world_state_source()
.expect("Unable to get world source from database")
.new_world_state()
.await
.expect("Could not start transaction for resumed task. Panic.");
Expand Down Expand Up @@ -1180,7 +1237,11 @@ impl Inner {
self.next_task_id += 1;
let (task_control_sender, task_control_receiver) = tokio::sync::mpsc::unbounded_channel();

let state_source = self.state_source.clone();
let state_source = self
.database
.clone()
.world_state_source()
.expect("Unable to instantiate database");
let task_control = TaskControl {
task_id,
player,
Expand All @@ -1197,7 +1258,6 @@ impl Inner {
// TODO: support a queue-size on concurrent executing tasks and allow them to sit in an
// initially suspended state without spawning a worker thread, until the queue has space.
// Spawn the task's thread.
let state_source = self.state_source.clone();
tokio::spawn(async move {
debug!(?task_id, ?task_start, "Starting up task");
Task::run(
Expand Down
2 changes: 2 additions & 0 deletions crates/kernel/src/tasks/task_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,6 @@ pub enum SchedulerControlMsg {
player: Objid,
sender_permissions: Perms,
},
/// Task is requesting that a textdump checkpoint happen, to the configured file.
Checkpoint,
}
3 changes: 2 additions & 1 deletion crates/kernel/src/textdump/load_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::BTreeMap;
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;

use metrics_macros::increment_counter;
use tracing::{info, span, trace};
Expand Down Expand Up @@ -87,7 +88,7 @@ fn cv_aspec_flag(flags: u16) -> ArgSpec {

#[tracing::instrument(skip(ldr))]
pub async fn textdump_load(
ldr: &mut dyn LoaderInterface,
ldr: Arc<dyn LoaderInterface>,
path: &str,
) -> Result<(), TextdumpReaderError> {
let textdump_import_span = span!(tracing::Level::INFO, "textdump_import");
Expand Down
Loading

0 comments on commit 528e411

Please sign in to comment.