Skip to content

Commit

Permalink
Command time
Browse files Browse the repository at this point in the history
  • Loading branch information
marcin-usielski committed Feb 1, 2024
1 parent f1a67f3 commit 1004314
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 27 deletions.
17 changes: 11 additions & 6 deletions moler/cmd/unix/genericunix.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@
__copyright__ = 'Copyright (C) 2018-2024, Nokia'
__email__ = '[email protected]'

import re
import abc
import six

from moler.cmd.commandtextualgeneric import CommandTextualGeneric
from moler.exceptions import CommandFailure
from moler.helpers import remove_all_known_special_chars
from moler.abstract_moler_connection import AbstractMolerConnection
from moler.runner import ConnectionObserverRunner
from typing import Optional, Pattern, Union, Tuple

import re
import abc
import six


cmd_failure_causes = ['not found',
'No such file or directory',
Expand All @@ -26,9 +31,9 @@

@six.add_metaclass(abc.ABCMeta)
class GenericUnixCommand(CommandTextualGeneric):
# _re_fail = re.compile(r_cmd_failure_cause_alternatives, re.IGNORECASE)

def __init__(self, connection, prompt=None, newline_chars=None, runner=None):
def __init__(self, connection: AbstractMolerConnection, prompt: Optional[Union[str, Pattern]] = None,
newline_chars: Optional[Union[list, tuple]] = None, runner: Optional[ConnectionObserverRunner] = None):
"""
:param connection: Moler connection to device, terminal when command is executed.
:param prompt: prompt (on system where command runs).
Expand All @@ -40,7 +45,7 @@ def __init__(self, connection, prompt=None, newline_chars=None, runner=None):
self.remove_all_known_special_chars_from_terminal_output = True
self.re_fail = re.compile(r_cmd_failure_cause_alternatives, re.IGNORECASE)

def _decode_line(self, line):
def _decode_line(self, line: str) -> str:
"""
Method to delete new line chars and other chars we don not need to parse in on_new_line (color escape character)
Expand Down
126 changes: 126 additions & 0 deletions moler/cmd/unix/time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# -*- coding: utf-8 -*-
"""
Command time module
"""


__author__ = 'Marcin Usielski'
__copyright__ = 'Copyright (C) 2024, Nokia'
__email__ = '[email protected]'

from moler.cmd.unix.genericunix import GenericUnixCommand
from moler.exceptions import ParsingDone
import re


class Time(GenericUnixCommand):

def __init__(self, connection, options: str=None, prompt=None, newline_chars=None, runner=None):
"""
Unix command time.
:param connection: Moler connection to device, terminal when command is executed.
:param options: Options of unix command time
:param prompt: prompt (on system where command runs).
:param newline_chars: Characters to split lines - list.
:param runner: Runner to run command.
"""
super(Time, self).__init__(connection=connection, prompt=prompt, newline_chars=newline_chars, runner=runner)
self.ret_required = False
self.options = options

def build_command_string(self):
"""
Build the command string from parameters passed to object.
:return: String representation of command to send over connection to device.
"""
cmd = "time"
if self.options is not None:
cmd = "{} {}".format(cmd, self.options)
return cmd

def on_new_line(self, line: str, is_full_line: bool) -> None:
"""
Method to parse output from command.
:param line: Line from device.
:param is_full_line: True if line is full (was the end line), False otherwise.
"""
try:
self._parse_time(line)
except ParsingDone:
pass
return super().on_new_line(line, is_full_line)

# real 0m0.000s
_re_time = re.compile(r"^\s*(?P<type>real|user|sys)\s+(?P<value>(?P<min>\d+)m(?P<sec>\d+\.\d+)s)\s*$")

def _parse_time(self, line: str):
"""
Parse time line.
:param line: Line from device.
:return: None
"""
if self._regex_helper.search_compiled(Time._re_time, line):
type = self._regex_helper.group('type')
self.current_ret[type] = dict()
self.current_ret[type]['raw'] = self._regex_helper.group('value')
self.current_ret[type]['sec'] = float(self._regex_helper.group('min')) * 60 + float(self._regex_helper.group('sec')) # noqa
raise ParsingDone()


COMMAND_OUTPUT = """time
real 0m0.000s
user 0m0.000s
sys 0m0.000s
moler_bash#"""


COMMAND_KWARGS = {}


COMMAND_RESULT = {
'real': {
'raw': '0m0.000s',
'sec': 0.
},
'user': {
'raw': '0m0.000s',
'sec': 0.
},
'sys': {
'raw': '0m0.000s',
'sec': 0.
}
}


COMMAND_OUTPUT_sleep_60 = """time sleep 60
real 1m0.004s
user 0m0.001s
sys 0m0.002s
moler_bash#"""


COMMAND_KWARGS_sleep_60 = {
'options': 'sleep 60',
}


COMMAND_RESULT_sleep_60 = {
'real': {
'raw': '1m0.004s',
'sec': 60.004
},
'user': {
'raw': '0m0.001s',
'sec': 0.001
},
'sys': {
'raw': '0m0.002s',
'sec': 0.002
}
}
42 changes: 21 additions & 21 deletions moler/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@
import importlib
import logging
import re
import sys
import collections.abc
from functools import wraps
from types import FunctionType, MethodType
from six import string_types
from math import isclose
from typing import Any


class ClassProperty(property):
def __get__(self, cls, owner):
return classmethod(self.fget).__get__(None, owner)()


def copy_list(src, deep_copy=False):
def copy_list(src, deep_copy: bool = False) -> list:
"""
Copies list, if None then returns empty list
:param src: List to copy
Expand All @@ -38,7 +38,7 @@ def copy_list(src, deep_copy=False):
return list(src)


def copy_dict(src, deep_copy=False):
def copy_dict(src, deep_copy: bool = False) -> dict:
"""
Copies dict, if None then returns empty dict
:param src: List to copy
Expand All @@ -61,7 +61,7 @@ def instance_id(instance):
return instance_id


def camel_case_to_lower_case_underscore(string):
def camel_case_to_lower_case_underscore(string: str) -> str:
"""
Split string by upper case letters.
F.e. useful to convert camel case strings to underscore separated ones.
Expand All @@ -81,7 +81,7 @@ def camel_case_to_lower_case_underscore(string):
_re_escape_codes_cursor = re.compile(r"\x1B(([\dA-F]+)|(\[\d+;\d+r)|(J))")


def remove_escape_codes(line):
def remove_escape_codes(line: str) -> str:
"""
:param line: line from terminal
:return: line without terminal escape codes
Expand All @@ -98,7 +98,7 @@ def remove_escape_codes(line):
_re_cursor_visibility_codes = re.compile(r"\x1B\[\?(12|25)[hl]")


def remove_cursor_visibility_codes(multiline):
def remove_cursor_visibility_codes(multiline: str) -> str:
"""
:param multiline: string from terminal holding single or multiple lines
:return: line(s) without terminal escape codes related to cursor visibility
Expand All @@ -112,7 +112,7 @@ def remove_cursor_visibility_codes(multiline):
_re_text_formatting_codes = re.compile(r"\x1B\[\d*m")


def remove_text_formatting_codes(multiline):
def remove_text_formatting_codes(multiline: str) -> str:
"""
:param multiline: string from terminal holding single or multiple lines
:return: line(s) without terminal escape codes related to text formatting
Expand All @@ -126,7 +126,7 @@ def remove_text_formatting_codes(multiline):
_re_console_title_codes = re.compile(r"\x1B\][02];[^\x07]+\x07")


def remove_window_title_codes(multiline):
def remove_window_title_codes(multiline: str) -> str:
"""
:param multiline: string from terminal holding single or multiple lines
:return: line(s) without terminal escape codes setting console window/icon title
Expand All @@ -140,7 +140,7 @@ def remove_window_title_codes(multiline):
_re_space_fill_to_right_margin = re.compile(r"(\x1B\[\d+[XC])+(\r|\n)")


def remove_fill_spaces_right_codes(multiline):
def remove_fill_spaces_right_codes(multiline: str) -> str:
"""
:param multiline: string from terminal holding single or multiple lines
:return: line(s) without spaces added till right VT-screen margin
Expand All @@ -153,7 +153,7 @@ def remove_fill_spaces_right_codes(multiline):
_re_overwritten_left_writes = re.compile(r"^[^\n\r]*\x1B\[H(.)", flags=re.DOTALL | re.MULTILINE)


def remove_overwritten_left_write(multiline):
def remove_overwritten_left_write(multiline: str) -> str:
"""
:param multiline: string from terminal holding single or multiple lines
:return: line without spaces added till right VT-screen margin
Expand All @@ -165,7 +165,7 @@ def remove_overwritten_left_write(multiline):
_re_remove_terminal_last_cmd_status = re.compile(r'\x1b]777;notify;.*\x07')


def remove_terminal_last_cmd_status(line):
def remove_terminal_last_cmd_status(line: str) -> str:
"""
:param line: line from terminal
:return: line without terminal last cmd status
Expand All @@ -174,7 +174,7 @@ def remove_terminal_last_cmd_status(line):
return line


def remove_all_known_special_chars(line):
def remove_all_known_special_chars(line: str) -> str:
"""
:param line: line from terminal
:return: line without all known special chars
Expand All @@ -189,7 +189,7 @@ def remove_all_known_special_chars(line):
return line


def create_object_from_name(full_class_name, constructor_params):
def create_object_from_name(full_class_name: str, constructor_params: dict) -> object:
name_splitted = full_class_name.split('.')
module_name = ".".join(name_splitted[:-1])
class_name = name_splitted[-1]
Expand All @@ -200,7 +200,7 @@ def create_object_from_name(full_class_name, constructor_params):
return obj


def update_dict(target_dict, expand_dict):
def update_dict(target_dict: dict, expand_dict: dict) -> dict:
for key, value in expand_dict.items():
if (key in target_dict and isinstance(target_dict[key], dict) and isinstance(expand_dict[key],
collections.abc.Mapping)):
Expand All @@ -209,8 +209,8 @@ def update_dict(target_dict, expand_dict):
target_dict[key] = expand_dict[key]


def compare_objects(first_object, second_object, significant_digits=None,
exclude_types=None):
def compare_objects(first_object: Any, second_object: Any, significant_digits=None,
exclude_types=None) -> str:
"""
Return difference between two objects.
:param first_object: first object to compare
Expand All @@ -228,8 +228,8 @@ def compare_objects(first_object, second_object, significant_digits=None,
return diff


def diff_data(first_object, second_object, significant_digits=None,
exclude_types=None, msg=None):
def diff_data(first_object: Any, second_object: Any, significant_digits=None,
exclude_types=None, msg=None) -> str:
"""
Compare two objects recursively and return a message indicating any differences.
Expand Down Expand Up @@ -281,8 +281,8 @@ def diff_data(first_object, second_object, significant_digits=None,
return ""


def _compare_dicts(first_object, second_object, msg, significant_digits=None,
exclude_types=None):
def _compare_dicts(first_object: dict, second_object: dict, msg: str, significant_digits=None,
exclude_types=None) -> str:
"""
Compare two dictionaries recursively and return a message indicating any
differences.
Expand Down Expand Up @@ -321,7 +321,7 @@ def _compare_dicts(first_object, second_object, msg, significant_digits=None,
return ""


def _compare_sets(first_object, second_object, msg):
def _compare_sets(first_object: set, second_object: set, msg: str) -> str:
"""
Compare two sets.
Expand Down

0 comments on commit 1004314

Please sign in to comment.