Skip to content

Commit

Permalink
feat: ⚡️ Reduce OPCUA subscriptions timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
fungiboletus committed Apr 5, 2024
1 parent f1d9045 commit 832eed2
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 42 deletions.
118 changes: 79 additions & 39 deletions src/ingestors/opcua/opcua_browser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,56 @@ struct BrowsingState {
found_variables: RefCell<HashMap<NodeId, bool>>,
}

fn handle_references(
node_id: NodeId,
state: Rc<BrowsingState>,
references: Vec<ReferenceDescription>,
node_ids_to_browse_again: &mut Vec<NodeId>,
) -> Result<()> {
// If the current node has children and is a variable, we may want to ignore it
if state.skip_variables_with_children && !references.is_empty() {
let mut found_variables = state.found_variables.borrow_mut();
if let Some(true) = found_variables.get(&node_id) {
// This variable has children, we should not keep it
println!("Excluding variable: {}", node_id);
found_variables.insert(node_id.clone(), false);
}
}

for reference in references {
let browse_name = reference.browse_name.name.as_ref();
// Skip references that are in another namespace, if not allowed
if !state.discover_across_namespaces
&& reference.node_id.node_id.namespace != state.namespace
{
continue;
}

// Filter out the references based on their browsing name, if the option
// is enabled.
if let Some(filter_regex) = &state.filter_regex {
if filter_regex.is_match(browse_name) {
continue;
}
}

// Add the reference to the list of nodes to browse.
node_ids_to_browse_again.push(reference.node_id.node_id.clone());

// Save the variable !
if reference.node_class == NodeClass::Variable {
let mut found_variables = state.found_variables.borrow_mut();
if found_variables.len() >= state.max_nodes {
bail!("Max number of nodes reached: {} Perhaps the OPCUA server has a loop in its organisation.", state.max_nodes);
}
println!("Found variable: {}", reference.node_id.node_id);
found_variables.insert(reference.node_id.node_id.clone(), true);
}
}

Ok(())
}

fn browse(
node_id: NodeId,
session: Arc<RwLock<Session>>,
Expand Down Expand Up @@ -92,53 +142,43 @@ fn browse(
}

for result in results {
if let Some(refs) = &result.references {
// If the current node has children and is a variable, we may want to ignore it
if state.skip_variables_with_children && !refs.is_empty() {
let mut found_variables = state.found_variables.borrow_mut();
if let Some(true) = found_variables.get(&node_id) {
// This variable has children, we should not keep it
println!("Excluding variable: {}", node_id);
found_variables.insert(node_id.clone(), false);
}
}

for reference in refs {
let browse_name = reference.browse_name.name.as_ref();
// Skip references that are in another namespace, if not allowed
if !state.discover_across_namespaces
&& reference.node_id.node_id.namespace != state.namespace
{
continue;
}

// Filter out the references based on their browsing name, if the option
// is enabled.
if let Some(filter_regex) = &state.filter_regex {
if filter_regex.is_match(browse_name) {
continue;
}
if let Some(references) = result.references {
handle_references(
node_id.clone(),
state.clone(),
references,
&mut node_ids_to_browse_again,
)?;
}
let mut continuation_point = result.continuation_point;
while !continuation_point.is_null_or_empty() {
let browse_next_results =
session_read.browse_next(true, &[continuation_point])?;
if let Some(browse_next_results) = browse_next_results {
if browse_next_results.len() != 1 {
bail!("Not the right number of continuation point results found");
}

// Add the reference to the list of nodes to browse.
node_ids_to_browse_again.push(reference.node_id.node_id.clone());

// Save the variable !
if reference.node_class == NodeClass::Variable {
let mut found_variables = state.found_variables.borrow_mut();
if found_variables.len() >= state.max_nodes {
bail!("Max number of nodes reached: {} Perhaps the OPCUA server has a loop in its organisation.", state.max_nodes);
}
println!("Found variable: {}", reference.node_id.node_id);
found_variables.insert(reference.node_id.node_id.clone(), true);
let first_result = browse_next_results
.first()
.expect("No results found")
.to_owned();
if let Some(references) = first_result.references {
handle_references(
node_id.clone(),
state.clone(),
references,
&mut node_ids_to_browse_again,
)?;
}
continuation_point = first_result.continuation_point;
} else {
break;
}
}
}
}
}

//let current_node_id = node_id;
for node_id in node_ids_to_browse_again {
browse(
node_id,
Expand Down
10 changes: 7 additions & 3 deletions src/ingestors/opcua/opcua_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,14 @@ where
.map(|identifier| identifier.into())
.collect::<Vec<MonitoredItemCreateRequest>>();

let results =
session.create_monitored_items(subscription_id, TimestampsToReturn::Both, &items)?;
let mut all_results = Vec::with_capacity(items.len());
for chunk in items.chunks(32) {
let results =
session.create_monitored_items(subscription_id, TimestampsToReturn::Both, chunk)?;
all_results.extend(results);
}

Ok((subscription_id, results))
Ok((subscription_id, all_results))
}

pub async fn opcua_client(config: OpcuaConfig, event_bus: Arc<EventBus>) -> Result<()> {
Expand Down

0 comments on commit 832eed2

Please sign in to comment.