+name: run unittests
+on: ["push"]
+ test:
+ runs-on: "ubuntu-latest"
+ steps:
+ - uses: actions/checkout@v4
+ - uses: actions/setup-python@v4
+ with:
+ python-version: 3.11
+ - name: Install dependencies
+ run: pip install -r prototyping/setup/requirements.txt
+ - run: python -m unittest discover -v -s prototyping/test
+# Prerequisites
+# Compiled Object files
+# Precompiled Headers
+# Compiled Dynamic libraries
+# Fortran module files
+# Compiled Static libraries
+# Executables
+# Virtual environment
+# Models
+# Pycache
+# Log folder
+# COCO dataset
+# Surround dataset
+# ZIP fodlers
+# Vscode cache
+# Irrevelant assets
+# Mac folders
entry: cpplint
+ - repo: https://github.com/PyCQA/isort
+ rev: 5.12.0
+ hooks:
+ - id: isort
+ args:
+ - --filter-files
+ - repo: https://github.com/astral-sh/ruff-pre-commit
+ rev: v0.1.1
+ hooks:
+ - id: ruff
+ args:
+ - --fix
+ - --exit-non-zero-on-fix
+ - repo: https://github.com/cheshirekow/cmake-format-precommit
+ rev: v0.6.13
+ hooks:
+ - id: cmake-format
+ - repo: https://github.com/pre-commit/mirrors-clang-format
+ rev: v17.0.3
+ hooks:
+ - id: clang-format
+ args:
+ - --style=Google
+ - repo: https://github.com/pre-commit/pre-commit-hooks
+ rev: v4.5.0
+ hooks:
+ - id: check-added-large-files
+ - id: check-ast
+ - id: check-executables-have-shebangs
+ - id: check-json
+ - id: check-merge-conflict
+ - id: check-symlinks
+ - id: check-toml
+ - id: check-yaml
+ - id: debug-statements
+ - id: destroyed-symlinks
+ - id: detect-private-key
+ - id: end-of-file-fixer
+ - id: fix-byte-order-marker
+ - id: mixed-line-ending
+ - id: trailing-whitespace
+ - repo: https://github.com/psf/black
+ rev: 23.10.0
+ hooks:
+ - id: black
\ No newline at end of file
+## Prototypage Python
+Pour installer l'environnement virtuel nécessaire au projet, exécutez la commande suivante dans le dossier `prototyping` du dossier du projet :
+bash ./setup/venv_install.sh
+Afin de désinstaller la venv, la procédure est identique mais le script à lancer est `venv_uninstall.sh`, toujours dans le dossier `prototyping`.
+bash ./setup/venv_uninstall.sh
+Le but actuel de cette venv n'est pas de faire tourner le workspace ros, mais de pouvoir avoir un environnement commun aux prototypages.
+# Copyright 2016-2019 Dirk Thomas
+# Licensed under the Apache License, Version 2.0
+import argparse
+from collections import OrderedDict
+import os
+from pathlib import Path
+import sys
+FORMAT_STR_COMMENT_LINE = '# {comment}'
+FORMAT_STR_SET_ENV_VAR = 'export {name}="{value}"'
+FORMAT_STR_INVOKE_SCRIPT = 'COLCON_CURRENT_PREFIX="{prefix}" _colcon_prefix_sh_source_script "{script_path}"'
+FORMAT_STR_REMOVE_LEADING_SEPARATOR = 'if [ "$(echo -n ${name} | head -c 1)" = ":" ]; then export {name}=${{{name}#?}} ; fi'
+FORMAT_STR_REMOVE_TRAILING_SEPARATOR = 'if [ "$(echo -n ${name} | tail -c 1)" = ":" ]; then export {name}=${{{name}%?}} ; fi'
+DSV_TYPE_APPEND_NON_DUPLICATE = 'append-non-duplicate'
+DSV_TYPE_PREPEND_NON_DUPLICATE = 'prepend-non-duplicate'
+DSV_TYPE_PREPEND_NON_DUPLICATE_IF_EXISTS = 'prepend-non-duplicate-if-exists'
+DSV_TYPE_SET = 'set'
+DSV_TYPE_SET_IF_UNSET = 'set-if-unset'
+DSV_TYPE_SOURCE = 'source'
+def main(argv=sys.argv[1:]): # noqa: D103
+ parser = argparse.ArgumentParser(
+ description='Output shell commands for the packages in topological '
+ 'order')
+ parser.add_argument(
+ 'primary_extension',
+ help='The file extension of the primary shell')
+ parser.add_argument(
+ 'additional_extension', nargs='?',
+ help='The additional file extension to be considered')
+ parser.add_argument(
+ '--merged-install', action='store_true',
+ help='All install prefixes are merged into a single location')
+ args = parser.parse_args(argv)
+ packages = get_packages(Path(__file__).parent, args.merged_install)
+ ordered_packages = order_packages(packages)
+ for pkg_name in ordered_packages:
+ if _include_comments():
+ print(
+ {'comment': 'Package: ' + pkg_name}))
+ prefix = os.path.abspath(os.path.dirname(__file__))
+ if not args.merged_install:
+ prefix = os.path.join(prefix, pkg_name)
+ for line in get_commands(
+ pkg_name, prefix, args.primary_extension,
+ args.additional_extension
+ ):
+ print(line)
+ for line in _remove_ending_separators():
+ print(line)
+def get_packages(prefix_path, merged_install):
+ """
+ Find packages based on colcon-specific files created during installation.
+ :param Path prefix_path: The install prefix path of all packages
+ :param bool merged_install: The flag if the packages are all installed
+ directly in the prefix or if each package is installed in a subdirectory
+ named after the package
+ :returns: A mapping from the package name to the set of runtime
+ dependencies
+ :rtype: dict
+ """
+ packages = {}
+ # since importing colcon_core isn't feasible here the following constant
+ # must match colcon_core.location.get_relative_package_index_path()
+ subdirectory = 'share/colcon-core/packages'
+ if merged_install:
+ # return if workspace is empty
+ if not (prefix_path / subdirectory).is_dir():
+ return packages
+ # find all files in the subdirectory
+ for p in (prefix_path / subdirectory).iterdir():
+ if not p.is_file():
+ continue
+ if p.name.startswith('.'):
+ continue
+ add_package_runtime_dependencies(p, packages)
+ else:
+ # for each subdirectory look for the package specific file
+ for p in prefix_path.iterdir():
+ if not p.is_dir():
+ continue
+ if p.name.startswith('.'):
+ continue
+ p = p / subdirectory / p.name
+ if p.is_file():
+ add_package_runtime_dependencies(p, packages)
+ # remove unknown dependencies
+ pkg_names = set(packages.keys())
+ for k in packages.keys():
+ packages[k] = {d for d in packages[k] if d in pkg_names}
+ return packages
+def add_package_runtime_dependencies(path, packages):
+ """
+ Check the path and if it exists extract the packages runtime dependencies.
+ :param Path path: The resource file containing the runtime dependencies
+ :param dict packages: A mapping from package names to the sets of runtime
+ dependencies to add to
+ """
+ content = path.read_text()
+ dependencies = set(content.split(os.pathsep) if content else [])
+ packages[path.name] = dependencies
+def order_packages(packages):
+ """
+ Order packages topologically.
+ :param dict packages: A mapping from package name to the set of runtime
+ dependencies
+ :returns: The package names
+ :rtype: list
+ """
+ # select packages with no dependencies in alphabetical order
+ to_be_ordered = list(packages.keys())
+ ordered = []
+ while to_be_ordered:
+ pkg_names_without_deps = [
+ name for name in to_be_ordered if not packages[name]]
+ if not pkg_names_without_deps:
+ reduce_cycle_set(packages)
+ raise RuntimeError(
+ 'Circular dependency between: ' + ', '.join(sorted(packages)))
+ pkg_names_without_deps.sort()
+ pkg_name = pkg_names_without_deps[0]
+ to_be_ordered.remove(pkg_name)
+ ordered.append(pkg_name)
+ # remove item from dependency lists
+ for k in list(packages.keys()):
+ if pkg_name in packages[k]:
+ packages[k].remove(pkg_name)
+ return ordered
+def reduce_cycle_set(packages):
+ """
+ Reduce the set of packages to the ones part of the circular dependency.
+ :param dict packages: A mapping from package name to the set of runtime
+ dependencies which is modified in place
+ """
+ last_depended = None
+ while len(packages) > 0:
+ # get all remaining dependencies
+ depended = set()
+ for pkg_name, dependencies in packages.items():
+ depended = depended.union(dependencies)
+ # remove all packages which are not dependent on
+ for name in list(packages.keys()):
+ if name not in depended:
+ del packages[name]
+ if last_depended:
+ # if remaining packages haven't changed return them
+ if last_depended == depended:
+ return packages.keys()
+ # otherwise reduce again
+ last_depended = depended
+def _include_comments():
+ # skipping comment lines when COLCON_TRACE is not set speeds up the
+ # processing especially on Windows
+ return bool(os.environ.get('COLCON_TRACE'))
+def get_commands(pkg_name, prefix, primary_extension, additional_extension):
+ commands = []
+ package_dsv_path = os.path.join(prefix, 'share', pkg_name, 'package.dsv')
+ if os.path.exists(package_dsv_path):
+ commands += process_dsv_file(
+ package_dsv_path, prefix, primary_extension, additional_extension)
+ return commands
+def process_dsv_file(
+ dsv_path, prefix, primary_extension=None, additional_extension=None
+ commands = []
+ if _include_comments():
+ commands.append(FORMAT_STR_COMMENT_LINE.format_map({'comment': dsv_path}))
+ with open(dsv_path, 'r') as h:
+ content = h.read()
+ lines = content.splitlines()
+ basenames = OrderedDict()
+ for i, line in enumerate(lines):
+ # skip over empty or whitespace-only lines
+ if not line.strip():
+ continue
+ # skip over comments
+ if line.startswith('#'):
+ continue
+ try:
+ type_, remainder = line.split(';', 1)
+ except ValueError:
+ raise RuntimeError(
+ "Line %d in '%s' doesn't contain a semicolon separating the "
+ 'type from the arguments' % (i + 1, dsv_path))
+ if type_ != DSV_TYPE_SOURCE:
+ # handle non-source lines
+ try:
+ commands += handle_dsv_types_except_source(
+ type_, remainder, prefix)
+ except RuntimeError as e:
+ raise RuntimeError(
+ "Line %d in '%s' %s" % (i + 1, dsv_path, e)) from e
+ else:
+ # group remaining source lines by basename
+ path_without_ext, ext = os.path.splitext(remainder)
+ if path_without_ext not in basenames:
+ basenames[path_without_ext] = set()
+ assert ext.startswith('.')
+ ext = ext[1:]
+ if ext in (primary_extension, additional_extension):
+ basenames[path_without_ext].add(ext)
+ # add the dsv extension to each basename if the file exists
+ for basename, extensions in basenames.items():
+ if not os.path.isabs(basename):
+ basename = os.path.join(prefix, basename)
+ if os.path.exists(basename + '.dsv'):
+ extensions.add('dsv')
+ for basename, extensions in basenames.items():
+ if not os.path.isabs(basename):
+ basename = os.path.join(prefix, basename)
+ if 'dsv' in extensions:
+ # process dsv files recursively
+ commands += process_dsv_file(
+ basename + '.dsv', prefix, primary_extension=primary_extension,
+ additional_extension=additional_extension)
+ elif primary_extension in extensions and len(extensions) == 1:
+ # source primary-only files
+ commands += [
+ 'prefix': prefix,
+ 'script_path': basename + '.' + primary_extension})]
+ elif additional_extension in extensions:
+ # source non-primary files
+ commands += [
+ 'prefix': prefix,
+ 'script_path': basename + '.' + additional_extension})]
+ return commands
+def handle_dsv_types_except_source(type_, remainder, prefix):
+ commands = []
+ try:
+ env_name, value = remainder.split(';', 1)
+ except ValueError:
+ raise RuntimeError(
+ "doesn't contain a semicolon separating the environment name "
+ 'from the value')
+ try_prefixed_value = os.path.join(prefix, value) if value else prefix
+ if os.path.exists(try_prefixed_value):
+ value = try_prefixed_value
+ if type_ == DSV_TYPE_SET:
+ commands += _set(env_name, value)
+ elif type_ == DSV_TYPE_SET_IF_UNSET:
+ commands += _set_if_unset(env_name, value)
+ else:
+ assert False
+ elif type_ in (
+ ):
+ try:
+ env_name_and_values = remainder.split(';')
+ except ValueError:
+ raise RuntimeError(
+ "doesn't contain a semicolon separating the environment name "
+ 'from the values')
+ env_name = env_name_and_values[0]
+ values = env_name_and_values[1:]
+ for value in values:
+ if not value:
+ value = prefix
+ elif not os.path.isabs(value):
+ value = os.path.join(prefix, value)
+ if (
+ not os.path.exists(value)
+ ):
+ comment = f'skip extending {env_name} with not existing ' \
+ f'path: {value}'
+ if _include_comments():
+ commands.append(
+ FORMAT_STR_COMMENT_LINE.format_map({'comment': comment}))
+ commands += _append_unique_value(env_name, value)
+ else:
+ commands += _prepend_unique_value(env_name, value)
+ else:
+ raise RuntimeError(
+ 'contains an unknown environment hook type: ' + type_)
+ return commands
+env_state = {}
+def _append_unique_value(name, value):
+ global env_state
+ if name not in env_state:
+ if os.environ.get(name):
+ env_state[name] = set(os.environ[name].split(os.pathsep))
+ else:
+ env_state[name] = set()
+ # append even if the variable has not been set yet, in case a shell script sets the
+ # same variable without the knowledge of this Python script.
+ # later _remove_ending_separators() will cleanup any unintentional leading separator
+ extend = FORMAT_STR_USE_ENV_VAR.format_map({'name': name}) + os.pathsep
+ line = FORMAT_STR_SET_ENV_VAR.format_map(
+ {'name': name, 'value': extend + value})
+ if value not in env_state[name]:
+ env_state[name].add(value)
+ else:
+ if not _include_comments():
+ return []
+ line = FORMAT_STR_COMMENT_LINE.format_map({'comment': line})
+ return [line]
+def _prepend_unique_value(name, value):
+ global env_state
+ if name not in env_state:
+ if os.environ.get(name):
+ env_state[name] = set(os.environ[name].split(os.pathsep))
+ else:
+ env_state[name] = set()
+ # prepend even if the variable has not been set yet, in case a shell script sets the
+ # same variable without the knowledge of this Python script.
+ # later _remove_ending_separators() will cleanup any unintentional trailing separator
+ extend = os.pathsep + FORMAT_STR_USE_ENV_VAR.format_map({'name': name})
+ line = FORMAT_STR_SET_ENV_VAR.format_map(
+ {'name': name, 'value': value + extend})
+ if value not in env_state[name]:
+ env_state[name].add(value)
+ else:
+ if not _include_comments():
+ return []
+ line = FORMAT_STR_COMMENT_LINE.format_map({'comment': line})
+ return [line]
+# generate commands for removing prepended underscores
+def _remove_ending_separators():
+ # do nothing if the shell extension does not implement the logic
+ return []
+ global env_state
+ commands = []
+ for name in env_state:
+ # skip variables that already had values before this script started prepending
+ if name in os.environ:
+ continue
+ commands += [
+ FORMAT_STR_REMOVE_LEADING_SEPARATOR.format_map({'name': name}),
+ FORMAT_STR_REMOVE_TRAILING_SEPARATOR.format_map({'name': name})]
+ return commands
+def _set(name, value):
+ global env_state
+ env_state[name] = value
+ line = FORMAT_STR_SET_ENV_VAR.format_map(
+ {'name': name, 'value': value})
+ return [line]
+def _set_if_unset(name, value):
+ global env_state
+ line = FORMAT_STR_SET_ENV_VAR.format_map(
+ {'name': name, 'value': value})
+ if env_state.get(name, os.environ.get(name)):
+ line = FORMAT_STR_COMMENT_LINE.format_map({'comment': line})
+ return [line]
+if __name__ == '__main__': # pragma: no cover
+ try:
+ rc = main()
+ except RuntimeError as e:
+ print(str(e), file=sys.stderr)
+ rc = 1
+ sys.exit(rc)
+# generated from colcon_bash/shell/template/prefix.bash.em
+# This script extends the environment with all packages contained in this
+# prefix path.
+# a bash script is able to determine its own path if necessary
+if [ -z "$COLCON_CURRENT_PREFIX" ]; then
+ _colcon_prefix_bash_COLCON_CURRENT_PREFIX="$(builtin cd "`dirname "${BASH_SOURCE[0]}"`" > /dev/null && pwd)"
+# function to prepend a value to a variable
+# which uses colons as separators
+# duplicates as well as trailing separators are avoided
+# first argument: the name of the result variable
+# second argument: the value to be prepended
+_colcon_prefix_bash_prepend_unique_value() {
+ # arguments
+ _listname="$1"
+ _value="$2"
+ # get values from variable
+ eval _values=\"\$$_listname\"
+ # backup the field separator
+ _colcon_prefix_bash_prepend_unique_value_IFS="$IFS"
+ IFS=":"
+ # start with the new value
+ _all_values="$_value"
+ _contained_value=""
+ # iterate over existing values in the variable
+ for _item in $_values; do
+ # ignore empty strings
+ if [ -z "$_item" ]; then
+ continue
+ fi
+ # ignore duplicates of _value
+ if [ "$_item" = "$_value" ]; then
+ _contained_value=1
+ continue
+ fi
+ # keep non-duplicate values
+ _all_values="$_all_values:$_item"
+ done
+ unset _item
+ if [ -z "$_contained_value" ]; then
+ if [ -n "$COLCON_TRACE" ]; then
+ if [ "$_all_values" = "$_value" ]; then
+ echo "export $_listname=$_value"
+ else
+ echo "export $_listname=$_value:\$$_listname"
+ fi
+ fi
+ fi
+ unset _contained_value
+ # restore the field separator
+ IFS="$_colcon_prefix_bash_prepend_unique_value_IFS"
+ unset _colcon_prefix_bash_prepend_unique_value_IFS
+ # export the updated variable
+ eval export $_listname=\"$_all_values\"
+ unset _all_values
+ unset _values
+ unset _value
+ unset _listname
+# add this prefix to the COLCON_PREFIX_PATH
+_colcon_prefix_bash_prepend_unique_value COLCON_PREFIX_PATH "$_colcon_prefix_bash_COLCON_CURRENT_PREFIX"
+unset _colcon_prefix_bash_prepend_unique_value
+# check environment variable for custom Python executable
+if [ -n "$COLCON_PYTHON_EXECUTABLE" ]; then
+ if [ ! -f "$COLCON_PYTHON_EXECUTABLE" ]; then
+ return 1
+ fi
+ _colcon_python_executable="$COLCON_PYTHON_EXECUTABLE"
+ # try the Python executable known at configure time
+ _colcon_python_executable="/usr/bin/python3"
+ # if it doesn't exist try a fall back
+ if [ ! -f "$_colcon_python_executable" ]; then
+ if ! /usr/bin/env python3 --version > /dev/null 2> /dev/null; then
+ echo "error: unable to find python3 executable"
+ return 1
+ fi
+ _colcon_python_executable=`/usr/bin/env python3 -c "import sys; print(sys.executable)"`
+ fi
+# function to source another script with conditional trace output
+# first argument: the path of the script
+_colcon_prefix_sh_source_script() {
+ if [ -f "$1" ]; then
+ if [ -n "$COLCON_TRACE" ]; then
+ echo "# . \"$1\""
+ fi
+ . "$1"
+ else
+ echo "not found: \"$1\"" 1>&2
+ fi
+# get all commands in topological order
+_colcon_ordered_commands="$($_colcon_python_executable "$_colcon_prefix_bash_COLCON_CURRENT_PREFIX/_local_setup_util_sh.py" sh bash)"
+unset _colcon_python_executable
+if [ -n "$COLCON_TRACE" ]; then
+ echo "$(declare -f _colcon_prefix_sh_source_script)"
+ echo "# Execute generated script:"
+ echo "# <<<"
+ echo "${_colcon_ordered_commands}"
+ echo "# >>>"
+ echo "unset _colcon_prefix_sh_source_script"
+eval "${_colcon_ordered_commands}"
+unset _colcon_ordered_commands
+unset _colcon_prefix_sh_source_script
+unset _colcon_prefix_bash_COLCON_CURRENT_PREFIX
+# generated from colcon_powershell/shell/template/prefix.ps1.em
+# This script extends the environment with all packages contained in this
+# prefix path.
+# check environment variable for custom Python executable
+ if (!(Test-Path "$env:COLCON_PYTHON_EXECUTABLE" -PathType Leaf)) {
+ exit 1
+ }
+ $_colcon_python_executable="$env:COLCON_PYTHON_EXECUTABLE"
+} else {
+ # use the Python executable known at configure time
+ $_colcon_python_executable="/usr/bin/python3"
+ # if it doesn't exist try a fall back
+ if (!(Test-Path "$_colcon_python_executable" -PathType Leaf)) {
+ if (!(Get-Command "python3" -ErrorAction SilentlyContinue)) {
+ echo "error: unable to find python3 executable"
+ exit 1
+ }
+ $_colcon_python_executable="python3"
+ }
+# function to source another script with conditional trace output
+# first argument: the path of the script
+function _colcon_prefix_powershell_source_script {
+ param (
+ $_colcon_prefix_powershell_source_script_param
+ )
+ # source script with conditional trace output
+ if (Test-Path $_colcon_prefix_powershell_source_script_param) {
+ if ($env:COLCON_TRACE) {
+ echo ". '$_colcon_prefix_powershell_source_script_param'"
+ }
+ . "$_colcon_prefix_powershell_source_script_param"
+ } else {
+ Write-Error "not found: '$_colcon_prefix_powershell_source_script_param'"
+ }
+# get all commands in topological order
+$_colcon_ordered_commands = & "$_colcon_python_executable" "$(Split-Path $PSCommandPath -Parent)/_local_setup_util_ps1.py" ps1
+# execute all commands in topological order
+if ($env:COLCON_TRACE) {
+ echo "Execute generated script:"
+ echo "<<<"
+ $_colcon_ordered_commands.Split([Environment]::NewLine, [StringSplitOptions]::RemoveEmptyEntries) | Write-Output
+ echo ">>>"
+if ($_colcon_ordered_commands) {
+ $_colcon_ordered_commands.Split([Environment]::NewLine, [StringSplitOptions]::RemoveEmptyEntries) | Invoke-Expression
+# generated from colcon_core/shell/template/prefix.sh.em
+# This script extends the environment with all packages contained in this
+# prefix path.
+# since a plain shell script can't determine its own path when being sourced
+# either use the provided COLCON_CURRENT_PREFIX
+# or fall back to the build time prefix (if it exists)
+if [ -z "$COLCON_CURRENT_PREFIX" ]; then
+ if [ ! -d "$_colcon_prefix_sh_COLCON_CURRENT_PREFIX" ]; then
+ echo "The build time path \"$_colcon_prefix_sh_COLCON_CURRENT_PREFIX\" doesn't exist. Either source a script for a different shell or set the environment variable \"COLCON_CURRENT_PREFIX\" explicitly." 1>&2
+ unset _colcon_prefix_sh_COLCON_CURRENT_PREFIX
+ return 1
+ fi
+# function to prepend a value to a variable
+# which uses colons as separators
+# duplicates as well as trailing separators are avoided
+# first argument: the name of the result variable
+# second argument: the value to be prepended
+_colcon_prefix_sh_prepend_unique_value() {
+ # arguments
+ _listname="$1"
+ _value="$2"
+ # get values from variable
+ eval _values=\"\$$_listname\"
+ # backup the field separator
+ _colcon_prefix_sh_prepend_unique_value_IFS="$IFS"
+ IFS=":"
+ # start with the new value
+ _all_values="$_value"
+ _contained_value=""
+ # iterate over existing values in the variable
+ for _item in $_values; do
+ # ignore empty strings
+ if [ -z "$_item" ]; then
+ continue
+ fi
+ # ignore duplicates of _value
+ if [ "$_item" = "$_value" ]; then
+ _contained_value=1
+ continue
+ fi
+ # keep non-duplicate values
+ _all_values="$_all_values:$_item"
+ done
+ unset _item
+ if [ -z "$_contained_value" ]; then
+ if [ -n "$COLCON_TRACE" ]; then
+ if [ "$_all_values" = "$_value" ]; then
+ echo "export $_listname=$_value"
+ else
+ echo "export $_listname=$_value:\$$_listname"
+ fi
+ fi
+ fi
+ unset _contained_value
+ # restore the field separator
+ IFS="$_colcon_prefix_sh_prepend_unique_value_IFS"
+ unset _colcon_prefix_sh_prepend_unique_value_IFS
+ # export the updated variable
+ eval export $_listname=\"$_all_values\"
+ unset _all_values
+ unset _values
+ unset _value
+ unset _listname
+# add this prefix to the COLCON_PREFIX_PATH
+_colcon_prefix_sh_prepend_unique_value COLCON_PREFIX_PATH "$_colcon_prefix_sh_COLCON_CURRENT_PREFIX"
+unset _colcon_prefix_sh_prepend_unique_value
+# check environment variable for custom Python executable
+if [ -n "$COLCON_PYTHON_EXECUTABLE" ]; then
+ if [ ! -f "$COLCON_PYTHON_EXECUTABLE" ]; then
+ return 1
+ fi
+ _colcon_python_executable="$COLCON_PYTHON_EXECUTABLE"
+ # try the Python executable known at configure time
+ _colcon_python_executable="/usr/bin/python3"
+ # if it doesn't exist try a fall back
+ if [ ! -f "$_colcon_python_executable" ]; then
+ if ! /usr/bin/env python3 --version > /dev/null 2> /dev/null; then
+ echo "error: unable to find python3 executable"
+ return 1
+ fi
+ _colcon_python_executable=`/usr/bin/env python3 -c "import sys; print(sys.executable)"`
+ fi
+# function to source another script with conditional trace output
+# first argument: the path of the script
+_colcon_prefix_sh_source_script() {
+ if [ -f "$1" ]; then
+ if [ -n "$COLCON_TRACE" ]; then
+ echo "# . \"$1\""
+ fi
+ . "$1"
+ else
+ echo "not found: \"$1\"" 1>&2
+ fi
+# get all commands in topological order
+_colcon_ordered_commands="$($_colcon_python_executable "$_colcon_prefix_sh_COLCON_CURRENT_PREFIX/_local_setup_util_sh.py" sh)"
+unset _colcon_python_executable
+if [ -n "$COLCON_TRACE" ]; then
+ echo "_colcon_prefix_sh_source_script() {
+ if [ -f \"\$1\" ]; then
+ if [ -n \"\$COLCON_TRACE\" ]; then
+ echo \"# . \\\"\$1\\\"\"
+ fi
+ . \"\$1\"
+ else
+ echo \"not found: \\\"\$1\\\"\" 1>&2
+ fi
+ }"
+ echo "# Execute generated script:"
+ echo "# <<<"
+ echo "${_colcon_ordered_commands}"
+ echo "# >>>"
+ echo "unset _colcon_prefix_sh_source_script"
+eval "${_colcon_ordered_commands}"
+unset _colcon_ordered_commands
+unset _colcon_prefix_sh_source_script
+unset _colcon_prefix_sh_COLCON_CURRENT_PREFIX
+# generated from colcon_zsh/shell/template/prefix.zsh.em
+# This script extends the environment with all packages contained in this
+# prefix path.
+# a zsh script is able to determine its own path if necessary
+if [ -z "$COLCON_CURRENT_PREFIX" ]; then
+ _colcon_prefix_zsh_COLCON_CURRENT_PREFIX="$(builtin cd -q "`dirname "${(%):-%N}"`" > /dev/null && pwd)"
+# function to convert array-like strings into arrays
+# to workaround SH_WORD_SPLIT not being set
+_colcon_prefix_zsh_convert_to_array() {
+ local _listname=$1
+ local _dollar="$"
+ local _split="{="
+ local _to_array="(\"$_dollar$_split$_listname}\")"
+ eval $_listname=$_to_array
+# function to prepend a value to a variable
+# which uses colons as separators
+# duplicates as well as trailing separators are avoided
+# first argument: the name of the result variable
+# second argument: the value to be prepended
+_colcon_prefix_zsh_prepend_unique_value() {
+ # arguments
+ _listname="$1"
+ _value="$2"
+ # get values from variable
+ eval _values=\"\$$_listname\"
+ # backup the field separator
+ _colcon_prefix_zsh_prepend_unique_value_IFS="$IFS"
+ IFS=":"
+ # start with the new value
+ _all_values="$_value"
+ _contained_value=""
+ # workaround SH_WORD_SPLIT not being set
+ _colcon_prefix_zsh_convert_to_array _values
+ # iterate over existing values in the variable
+ for _item in $_values; do
+ # ignore empty strings
+ if [ -z "$_item" ]; then
+ continue
+ fi
+ # ignore duplicates of _value
+ if [ "$_item" = "$_value" ]; then
+ _contained_value=1
+ continue
+ fi
+ # keep non-duplicate values
+ _all_values="$_all_values:$_item"
+ done
+ unset _item
+ if [ -z "$_contained_value" ]; then
+ if [ -n "$COLCON_TRACE" ]; then
+ if [ "$_all_values" = "$_value" ]; then
+ echo "export $_listname=$_value"
+ else
+ echo "export $_listname=$_value:\$$_listname"
+ fi
+ fi
+ fi
+ unset _contained_value
+ # restore the field separator
+ IFS="$_colcon_prefix_zsh_prepend_unique_value_IFS"
+ unset _colcon_prefix_zsh_prepend_unique_value_IFS
+ # export the updated variable
+ eval export $_listname=\"$_all_values\"
+ unset _all_values
+ unset _values
+ unset _value
+ unset _listname
+# add this prefix to the COLCON_PREFIX_PATH
+_colcon_prefix_zsh_prepend_unique_value COLCON_PREFIX_PATH "$_colcon_prefix_zsh_COLCON_CURRENT_PREFIX"
+unset _colcon_prefix_zsh_prepend_unique_value
+unset _colcon_prefix_zsh_convert_to_array
+# check environment variable for custom Python executable
+if [ -n "$COLCON_PYTHON_EXECUTABLE" ]; then
+ if [ ! -f "$COLCON_PYTHON_EXECUTABLE" ]; then
+ return 1
+ fi
+ _colcon_python_executable="$COLCON_PYTHON_EXECUTABLE"
+ # try the Python executable known at configure time
+ _colcon_python_executable="/usr/bin/python3"
+ # if it doesn't exist try a fall back
+ if [ ! -f "$_colcon_python_executable" ]; then
+ if ! /usr/bin/env python3 --version > /dev/null 2> /dev/null; then
+ echo "error: unable to find python3 executable"
+ return 1
+ fi
+ _colcon_python_executable=`/usr/bin/env python3 -c "import sys; print(sys.executable)"`
+ fi
+# function to source another script with conditional trace output
+# first argument: the path of the script
+_colcon_prefix_sh_source_script() {
+ if [ -f "$1" ]; then
+ if [ -n "$COLCON_TRACE" ]; then
+ echo "# . \"$1\""
+ fi
+ . "$1"
+ else
+ echo "not found: \"$1\"" 1>&2
+ fi
+# get all commands in topological order
+_colcon_ordered_commands="$($_colcon_python_executable "$_colcon_prefix_zsh_COLCON_CURRENT_PREFIX/_local_setup_util_sh.py" sh zsh)"
+unset _colcon_python_executable
+if [ -n "$COLCON_TRACE" ]; then
+ echo "$(declare -f _colcon_prefix_sh_source_script)"
+ echo "# Execute generated script:"
+ echo "# <<<"
+ echo "${_colcon_ordered_commands}"
+ echo "# >>>"
+ echo "unset _colcon_prefix_sh_source_script"
+eval "${_colcon_ordered_commands}"
+unset _colcon_ordered_commands
+unset _colcon_prefix_sh_source_script
+unset _colcon_prefix_zsh_COLCON_CURRENT_PREFIX
+# EASY-INSTALL-ENTRY-SCRIPT: 'py-pubsub==0.0.0','console_scripts','camera_yolo'
+__requires__ = 'py-pubsub==0.0.0'
+import re
+import sys
+from pkg_resources import load_entry_point
+if __name__ == '__main__':
+ sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
+ sys.exit(
+ load_entry_point('py-pubsub==0.0.0', 'console_scripts', 'camera_yolo')()
+ )
+# EASY-INSTALL-ENTRY-SCRIPT: 'py-pubsub==0.0.0','console_scripts','display'
+__requires__ = 'py-pubsub==0.0.0'
+import re
+import sys
+from pkg_resources import load_entry_point
+if __name__ == '__main__':
+ sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
+ sys.exit(
+ load_entry_point('py-pubsub==0.0.0', 'console_scripts', 'display')()
+ )
+# EASY-INSTALL-ENTRY-SCRIPT: 'py-pubsub==0.0.0','console_scripts','listener'
+__requires__ = 'py-pubsub==0.0.0'
+import re
+import sys
+from pkg_resources import load_entry_point
+if __name__ == '__main__':
+ sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
+ sys.exit(
+ load_entry_point('py-pubsub==0.0.0', 'console_scripts', 'listener')()
+ )
+# EASY-INSTALL-ENTRY-SCRIPT: 'py-pubsub==0.0.0','console_scripts','talker'
+__requires__ = 'py-pubsub==0.0.0'
+import re
+import sys
+from pkg_resources import load_entry_point
+if __name__ == '__main__':
+ sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
+ sys.exit(
+ load_entry_point('py-pubsub==0.0.0', 'console_scripts', 'talker')()
+ )
+Metadata-Version: 1.2
+Name: py-pubsub
+Version: 0.0.0
+Summary: Examples of minimal publisher/subscriber using rclpy
+Home-page: UNKNOWN
+Maintainer: pge-2023
+Maintainer-email: pge-2023@todo.todo
+License: TODO: License declaration
+Description: UNKNOWN
+Platform: UNKNOWN
+camera_yolo = py_pubsub.camera_yolo:main
+display = py_pubsub.display:main
+import rclpy
+from rclpy.node import Node
+from sensor_msgs.msg import Image
+from cv_bridge import CvBridge
+import cv2
+import argparse
+from ultralytics import YOLO # YOLOv8 import
+from rclpy.utilities import remove_ros_args
+import sys
+class YOLODetector:
+ def __init__(self):
+ self.model = YOLO("yolov8s-pose.pt")
+ def compute(self, image):
+ if image is not None:
+ results = self.model.track(image, persist=True, tracker="bytetrack.yaml")
+ return results
+class MinimalPublisher(Node):
+ def __init__(self, camera_id):
+ super().__init__("minimal_publisher")
+ self.camera_id = camera_id
+ self.topic_name = f"annotated_images_{camera_id}"
+ self.publisher = self.create_publisher(Image, self.topic_name, 10)
+ self.subscription = self.create_subscription(
+ Image, f"Cam{camera_id}/image_raw", self.listener_callback, 10
+ )
+ self._cv_bridge = CvBridge()
+ self.detector = YOLODetector()
+ def listener_callback(self, image):
+ self.get_logger().info(f"Image received from Camera {self.camera_id}")
+ cv_image = cv2.cvtColor(
+ self._cv_bridge.imgmsg_to_cv2(image, desired_encoding="passthrough"),
+ )
+ results = self.detector.compute(cv_image)
+ if results:
+ annotated_image = results[0].plot()
+ msg = self._cv_bridge.cv2_to_imgmsg(annotated_image, "rgb8")
+ self.publisher.publish(msg)
+def main(args=None):
+ # Initialize ROS without passing args
+ rclpy.init()
+ # Create an argument parser for your script
+ parser = argparse.ArgumentParser(description="ROS 2 YOLO Object Detection Node")
+ # Add your custom argument
+ parser.add_argument("--cam", type=str, default="1", help="Camera identifier")
+ # Parse the command line arguments
+ custom_args = parser.parse_args()
+ # Create and spin your node
+ minimal_publisher = MinimalPublisher(custom_args.cam)
+ rclpy.spin(minimal_publisher)
+ # Shutdown and cleanup
+ minimal_publisher.destroy_node()
+ rclpy.shutdown()
+if __name__ == "__main__":
+ main()
+import rclpy
+from rclpy.node import Node
+from sensor_msgs.msg import Image
+from cv_bridge import CvBridge
+import cv2
+import argparse
+from rclpy.utilities import remove_ros_args
+import sys
+class ImageDisplayNode(Node):
+ def __init__(self, topic_name):
+ self.topic_name = topic_name
+ super().__init__("image_display_node")
+ self.subscription = self.create_subscription(
+ Image, self.topic_name, self.listener_callback, 10
+ )
+ self._cv_bridge = CvBridge()
+ def listener_callback(self, image):
+ cv_image = self._cv_bridge.imgmsg_to_cv2(image, desired_encoding="rgb8")
+ cv2.imshow(f"Annotated Image {self.topic_name}", cv_image)
+ cv2.waitKey(1)
+def main(args=None):
+ # Initialize ROS without passing args
+ rclpy.init()
+ # Create an argument parser for your script
+ parser = argparse.ArgumentParser(description="Image Display Node")
+ # Add your custom argument
+ parser.add_argument(
+ "--topic", type=str, default="1", required=True, help="Topic to subscribe to"
+ )
+ # Parse the command line arguments
+ custom_args = parser.parse_args()
+ # Create and spin your node
+ image_display_node = ImageDisplayNode(topic_name=custom_args.topic)
+ rclpy.spin(image_display_node)
+ # Shutdown and cleanup
+ image_display_node.destroy_node()
+ rclpy.shutdown()
+if __name__ == "__main__":
+ main()
+import rclpy
+from rclpy.node import Node
+from std_msgs.msg import String
+class MinimalPublisher(Node):
+ def __init__(self):
+ super().__init__('minimal_publisher')
+ self.publisher_ = self.create_publisher(String, 'topic', 10)
+ timer_period = 0.5 # seconds
+ self.timer = self.create_timer(timer_period, self.timer_callback)
+ self.i = 0
+ def timer_callback(self):
+ msg = String()
+ msg.data = 'Hello World: %d' % self.i
+ self.publisher_.publish(msg)
+ self.get_logger().info('Publishing: "%s"' % msg.data)
+ self.i += 1
+def main(args=None):
+ rclpy.init(args=args)
+ minimal_publisher = MinimalPublisher()
+ rclpy.spin(minimal_publisher)
+ # Destroy the node explicitly
+ # (optional - otherwise it will be done automatically
+ # when the garbage collector destroys the node object)
+ minimal_publisher.destroy_node()
+ rclpy.shutdown()
+if __name__ == '__main__':
+ main()
\ No newline at end of file
+import rclpy
+from rclpy.node import Node
+from sensor_msgs.msg import Image
+from cv_bridge import CvBridge
+import cv2
+import argparse
+from ultralytics import YOLO # YOLOv8 import
+from rclpy.utilities import remove_ros_args
+import sys
+class YOLODetector:
+ def __init__(self):
+ self.model = YOLO("yolov8s-pose.pt") # Initialize the YOLO model
+ def compute(self, image):
+ # Run YOLOv8 tracking on the image if it's valid
+ if image is not None:
+ results = self.model.track(image, persist=True, tracker="bytetrack.yaml")
+ return results
+class MinimalSubscriber(Node):
+ def __init__(self, camera_id="1"):
+ super().__init__("minimal_subscriber")
+ self.subscription = self.create_subscription(
+ Image, f"Cam{camera_id}/image_raw", self.listener_callback, 10
+ )
+ self._cv_bridge = CvBridge()
+ self.detector = YOLODetector()
+ def listener_callback(self, image):
+ self.get_logger().info("Image received")
+ cv_image = cv2.cvtColor(
+ self._cv_bridge.imgmsg_to_cv2(image, desired_encoding="passthrough"),
+ )
+ results = self.detector.compute(cv_image)
+ if results:
+ annotated_image = results[0].plot()
+ # Add code to display or process the annotated image
+def main(args=None):
+ # Separate ROS arguments from script arguments
+ ros_args = remove_ros_args(sys.argv)
+ rclpy.init(args=ros_args)
+ parser = argparse.ArgumentParser(description="ROS 2 YOLO Object Detection Node")
+ parser.add_argument("--cam", type=str, default="1", help="Camera identifier")
+ # Use parse_known_args to avoid error with unrecognized arguments
+ custom_args, unknown_args = parser.parse_known_args(args=remove_ros_args(sys.argv))
+ minimal_subscriber = MinimalSubscriber(custom_args.cam)
+ rclpy.spin(minimal_subscriber)
+ minimal_subscriber.destroy_node()
+ rclpy.shutdown()
+if __name__ == "__main__":
+ main()
+import cv2
+from ultralytics import YOLO # YOLOv8 import
+from PIL import Image
+class yolo:
+ def __init__(self):
+ self.model = YOLO("yolov8s-pose.pt")
+ def compute(self, image):
+ # Check if the image was successfully loaded
+ if image is not None:
+ # Run YOLOv8 tracking on the image
+ results = self.model.track(image, persist=True, tracker="bytetrack.yaml")
+ return results
+if __name__ == "__main__":
+ instance = yolo()
+ image = cv2.imread("test.jpg")
+ image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
+ results = instance.compute(image)
+ print(results)
+ # Visualize the results on the image
+ annotated_image = results[0].plot()
+ # Display the annotated image
+ cv2.imshow("YOLOv8 Tracking", annotated_image)
+ # Wait for a key press before closing the window
+ cv2.waitKey(0)
+ # Close the display window
+ cv2.destroyAllWindows()
\ No newline at end of file
+# generated from colcon_powershell/shell/template/hook_prepend_value.ps1.em
+colcon_prepend_unique_value AMENT_PREFIX_PATH "$env:COLCON_CURRENT_PREFIX"
+# generated from colcon_core/shell/template/hook_prepend_value.sh.em
+_colcon_prepend_unique_value AMENT_PREFIX_PATH "$COLCON_CURRENT_PREFIX"
+# generated from colcon_powershell/shell/template/hook_prepend_value.ps1.em
+colcon_prepend_unique_value PYTHONPATH "$env:COLCON_CURRENT_PREFIX\lib/python3.8/site-packages"
+# generated from colcon_core/shell/template/hook_prepend_value.sh.em
+_colcon_prepend_unique_value PYTHONPATH "$COLCON_CURRENT_PREFIX/lib/python3.8/site-packages"
+# generated from colcon_bash/shell/template/package.bash.em
+# This script extends the environment for this package.
+# a bash script is able to determine its own path if necessary
+if [ -z "$COLCON_CURRENT_PREFIX" ]; then
+ # the prefix is two levels up from the package specific share directory
+ _colcon_package_bash_COLCON_CURRENT_PREFIX="$(builtin cd "`dirname "${BASH_SOURCE[0]}"`/../.." > /dev/null && pwd)"
+# function to source another script with conditional trace output
+# first argument: the path of the script
+# additional arguments: arguments to the script
+_colcon_package_bash_source_script() {
+ if [ -f "$1" ]; then
+ if [ -n "$COLCON_TRACE" ]; then
+ echo "# . \"$1\""
+ fi
+ . "$@"
+ else
+ echo "not found: \"$1\"" 1>&2
+ fi
+# source sh script of this package
+_colcon_package_bash_source_script "$_colcon_package_bash_COLCON_CURRENT_PREFIX/share/py_pubsub/package.sh"
+unset _colcon_package_bash_source_script
+unset _colcon_package_bash_COLCON_CURRENT_PREFIX
+# generated from colcon_powershell/shell/template/package.ps1.em
+# function to append a value to a variable
+# which uses colons as separators
+# duplicates as well as leading separators are avoided
+# first argument: the name of the result variable
+# second argument: the value to be prepended
+function colcon_append_unique_value {
+ param (
+ $_listname,
+ $_value
+ )
+ # get values from variable
+ if (Test-Path Env:$_listname) {
+ $_values=(Get-Item env:$_listname).Value
+ } else {
+ $_values=""
+ }
+ $_duplicate=""
+ # start with no values
+ $_all_values=""
+ # iterate over existing values in the variable
+ if ($_values) {
+ $_values.Split(";") | ForEach {
+ # not an empty string
+ if ($_) {
+ # not a duplicate of _value
+ if ($_ -eq $_value) {
+ $_duplicate="1"
+ }
+ if ($_all_values) {
+ $_all_values="${_all_values};$_"
+ } else {
+ $_all_values="$_"
+ }
+ }
+ }
+ }
+ # append only non-duplicates
+ if (!$_duplicate) {
+ # avoid leading separator
+ if ($_all_values) {
+ $_all_values="${_all_values};${_value}"
+ } else {
+ $_all_values="${_value}"
+ }
+ }
+ # export the updated variable
+ Set-Item env:\$_listname -Value "$_all_values"
+# function to prepend a value to a variable
+# which uses colons as separators
+# duplicates as well as trailing separators are avoided
+# first argument: the name of the result variable
+# second argument: the value to be prepended
+function colcon_prepend_unique_value {
+ param (
+ $_listname,
+ $_value
+ )
+ # get values from variable
+ if (Test-Path Env:$_listname) {
+ $_values=(Get-Item env:$_listname).Value
+ } else {
+ $_values=""
+ }
+ # start with the new value
+ $_all_values="$_value"
+ # iterate over existing values in the variable
+ if ($_values) {
+ $_values.Split(";") | ForEach {
+ # not an empty string
+ if ($_) {
+ # not a duplicate of _value
+ if ($_ -ne $_value) {
+ # keep non-duplicate values
+ $_all_values="${_all_values};$_"
+ }
+ }
+ }
+ }
+ # export the updated variable
+ Set-Item env:\$_listname -Value "$_all_values"
+# function to source another script with conditional trace output
+# first argument: the path of the script
+# additional arguments: arguments to the script
+function colcon_package_source_powershell_script {
+ param (
+ $_colcon_package_source_powershell_script
+ )
+ # source script with conditional trace output
+ if (Test-Path $_colcon_package_source_powershell_script) {
+ if ($env:COLCON_TRACE) {
+ echo ". '$_colcon_package_source_powershell_script'"
+ }
+ . "$_colcon_package_source_powershell_script"
+ } else {
+ Write-Error "not found: '$_colcon_package_source_powershell_script'"
+ }
+# a powershell script is able to determine its own path
+# the prefix is two levels up from the package specific share directory
+$env:COLCON_CURRENT_PREFIX=(Get-Item $PSCommandPath).Directory.Parent.Parent.FullName
+colcon_package_source_powershell_script "$env:COLCON_CURRENT_PREFIX\share/py_pubsub/hook/pythonpath.ps1"
+colcon_package_source_powershell_script "$env:COLCON_CURRENT_PREFIX\share/py_pubsub/hook/ament_prefix_path.ps1"
+# generated from colcon_core/shell/template/package.sh.em
+# This script extends the environment for this package.
+# function to prepend a value to a variable
+# which uses colons as separators
+# duplicates as well as trailing separators are avoided
+# first argument: the name of the result variable
+# second argument: the value to be prepended
+_colcon_prepend_unique_value() {
+ # arguments
+ _listname="$1"
+ _value="$2"
+ # get values from variable
+ eval _values=\"\$$_listname\"
+ # backup the field separator
+ _colcon_prepend_unique_value_IFS=$IFS
+ IFS=":"
+ # start with the new value
+ _all_values="$_value"
+ # workaround SH_WORD_SPLIT not being set in zsh
+ if [ "$(command -v colcon_zsh_convert_to_array)" ]; then
+ colcon_zsh_convert_to_array _values
+ fi
+ # iterate over existing values in the variable
+ for _item in $_values; do
+ # ignore empty strings
+ if [ -z "$_item" ]; then
+ continue
+ fi
+ # ignore duplicates of _value
+ if [ "$_item" = "$_value" ]; then
+ continue
+ fi
+ # keep non-duplicate values
+ _all_values="$_all_values:$_item"
+ done
+ unset _item
+ # restore the field separator
+ IFS=$_colcon_prepend_unique_value_IFS
+ unset _colcon_prepend_unique_value_IFS
+ # export the updated variable
+ eval export $_listname=\"$_all_values\"
+ unset _all_values
+ unset _values
+ unset _value
+ unset _listname
+# since a plain shell script can't determine its own path when being sourced
+# either use the provided COLCON_CURRENT_PREFIX
+# or fall back to the build time prefix (if it exists)
+if [ -z "$COLCON_CURRENT_PREFIX" ]; then
+ if [ ! -d "$_colcon_package_sh_COLCON_CURRENT_PREFIX" ]; then
+ echo "The build time path \"$_colcon_package_sh_COLCON_CURRENT_PREFIX\" doesn't exist. Either source a script for a different shell or set the environment variable \"COLCON_CURRENT_PREFIX\" explicitly." 1>&2
+ unset _colcon_package_sh_COLCON_CURRENT_PREFIX
+ return 1
+ fi
+unset _colcon_package_sh_COLCON_CURRENT_PREFIX
+# function to source another script with conditional trace output
+# first argument: the path of the script
+# additional arguments: arguments to the script
+_colcon_package_sh_source_script() {
+ if [ -f "$1" ]; then
+ if [ -n "$COLCON_TRACE" ]; then
+ echo "# . \"$1\""
+ fi
+ . "$@"
+ else
+ echo "not found: \"$1\"" 1>&2
+ fi
+# source sh hooks
+_colcon_package_sh_source_script "$COLCON_CURRENT_PREFIX/share/py_pubsub/hook/pythonpath.sh"
+_colcon_package_sh_source_script "$COLCON_CURRENT_PREFIX/share/py_pubsub/hook/ament_prefix_path.sh"
+unset _colcon_package_sh_source_script
+# do not unset _colcon_prepend_unique_value since it might be used by non-primary shell hooks
+ py_pubsub
+ 0.0.0
+ Examples of minimal publisher/subscriber using rclpy
+ pge-2023
+ TODO: License declaration
+ rclpy
+ std_msgs
+ ament_copyright
+ ament_flake8
+ ament_pep257
+ python3-pytest
+ ament_python
+# generated from colcon_zsh/shell/template/package.zsh.em
+# This script extends the environment for this package.
+# a zsh script is able to determine its own path if necessary
+if [ -z "$COLCON_CURRENT_PREFIX" ]; then
+ # the prefix is two levels up from the package specific share directory
+ _colcon_package_zsh_COLCON_CURRENT_PREFIX="$(builtin cd -q "`dirname "${(%):-%N}"`/../.." > /dev/null && pwd)"
+# function to source another script with conditional trace output
+# first argument: the path of the script
+# additional arguments: arguments to the script
+_colcon_package_zsh_source_script() {
+ if [ -f "$1" ]; then
+ if [ -n "$COLCON_TRACE" ]; then
+ echo "# . \"$1\""
+ fi
+ . "$@"
+ else
+ echo "not found: \"$1\"" 1>&2
+ fi
+# function to convert array-like strings into arrays
+# to workaround SH_WORD_SPLIT not being set
+colcon_zsh_convert_to_array() {
+ local _listname=$1
+ local _dollar="$"
+ local _split="{="
+ local _to_array="(\"$_dollar$_split$_listname}\")"
+ eval $_listname=$_to_array
+# source sh script of this package
+_colcon_package_zsh_source_script "$_colcon_package_zsh_COLCON_CURRENT_PREFIX/share/py_pubsub/package.sh"
+unset convert_zsh_to_array
+unset _colcon_package_zsh_source_script
+unset _colcon_package_zsh_COLCON_CURRENT_PREFIX
+# generated from colcon_bash/shell/template/prefix_chain.bash.em
+# This script extends the environment with the environment of other prefix
+# paths which were sourced when this file was generated as well as all packages
+# contained in this prefix path.
+# function to source another script with conditional trace output
+# first argument: the path of the script
+_colcon_prefix_chain_bash_source_script() {
+ if [ -f "$1" ]; then
+ if [ -n "$COLCON_TRACE" ]; then
+ echo "# . \"$1\""
+ fi
+ . "$1"
+ else
+ echo "not found: \"$1\"" 1>&2
+ fi
+# source chained prefixes
+# setting COLCON_CURRENT_PREFIX avoids determining the prefix in the sourced script
+_colcon_prefix_chain_bash_source_script "$COLCON_CURRENT_PREFIX/local_setup.bash"
+# source this prefix
+# setting COLCON_CURRENT_PREFIX avoids determining the prefix in the sourced script
+COLCON_CURRENT_PREFIX="$(builtin cd "`dirname "${BASH_SOURCE[0]}"`" > /dev/null && pwd)"
+_colcon_prefix_chain_bash_source_script "$COLCON_CURRENT_PREFIX/local_setup.bash"
+unset _colcon_prefix_chain_bash_source_script
+# generated from colcon_powershell/shell/template/prefix_chain.ps1.em
+# This script extends the environment with the environment of other prefix
+# paths which were sourced when this file was generated as well as all packages
+# contained in this prefix path.
+# function to source another script with conditional trace output
+# first argument: the path of the script
+function _colcon_prefix_chain_powershell_source_script {
+ param (
+ $_colcon_prefix_chain_powershell_source_script_param
+ )
+ # source script with conditional trace output
+ if (Test-Path $_colcon_prefix_chain_powershell_source_script_param) {
+ if ($env:COLCON_TRACE) {
+ echo ". '$_colcon_prefix_chain_powershell_source_script_param'"
+ }
+ . "$_colcon_prefix_chain_powershell_source_script_param"
+ } else {
+ Write-Error "not found: '$_colcon_prefix_chain_powershell_source_script_param'"
+ }
+# source chained prefixes
+_colcon_prefix_chain_powershell_source_script "/opt/ros/foxy\local_setup.ps1"
+# source this prefix
+$env:COLCON_CURRENT_PREFIX=(Split-Path $PSCommandPath -Parent)
+_colcon_prefix_chain_powershell_source_script "$env:COLCON_CURRENT_PREFIX\local_setup.ps1"
+# generated from colcon_core/shell/template/prefix_chain.sh.em
+# This script extends the environment with the environment of other prefix
+# paths which were sourced when this file was generated as well as all packages
+# contained in this prefix path.
+# since a plain shell script can't determine its own path when being sourced
+# either use the provided COLCON_CURRENT_PREFIX
+# or fall back to the build time prefix (if it exists)
+if [ ! -z "$COLCON_CURRENT_PREFIX" ]; then
+elif [ ! -d "$_colcon_prefix_chain_sh_COLCON_CURRENT_PREFIX" ]; then
+ echo "The build time path \"$_colcon_prefix_chain_sh_COLCON_CURRENT_PREFIX\" doesn't exist. Either source a script for a different shell or set the environment variable \"COLCON_CURRENT_PREFIX\" explicitly." 1>&2
+ unset _colcon_prefix_chain_sh_COLCON_CURRENT_PREFIX
+ return 1
+# function to source another script with conditional trace output
+# first argument: the path of the script
+_colcon_prefix_chain_sh_source_script() {
+ if [ -f "$1" ]; then
+ if [ -n "$COLCON_TRACE" ]; then
+ echo "# . \"$1\""
+ fi
+ . "$1"
+ else
+ echo "not found: \"$1\"" 1>&2
+ fi
+# source chained prefixes
+# setting COLCON_CURRENT_PREFIX avoids relying on the build time prefix of the sourced script
+_colcon_prefix_chain_sh_source_script "$COLCON_CURRENT_PREFIX/local_setup.sh"
+# source this prefix
+# setting COLCON_CURRENT_PREFIX avoids relying on the build time prefix of the sourced script
+_colcon_prefix_chain_sh_source_script "$COLCON_CURRENT_PREFIX/local_setup.sh"
+unset _colcon_prefix_chain_sh_COLCON_CURRENT_PREFIX
+unset _colcon_prefix_chain_sh_source_script
+# generated from colcon_zsh/shell/template/prefix_chain.zsh.em
+# This script extends the environment with the environment of other prefix
+# paths which were sourced when this file was generated as well as all packages
+# contained in this prefix path.
+# function to source another script with conditional trace output
+# first argument: the path of the script
+_colcon_prefix_chain_zsh_source_script() {
+ if [ -f "$1" ]; then
+ if [ -n "$COLCON_TRACE" ]; then
+ echo "# . \"$1\""
+ fi
+ . "$1"
+ else
+ echo "not found: \"$1\"" 1>&2
+ fi
+# source chained prefixes
+# setting COLCON_CURRENT_PREFIX avoids determining the prefix in the sourced script
+_colcon_prefix_chain_zsh_source_script "$COLCON_CURRENT_PREFIX/local_setup.zsh"
+# source this prefix
+# setting COLCON_CURRENT_PREFIX avoids determining the prefix in the sourced script
+COLCON_CURRENT_PREFIX="$(builtin cd -q "`dirname "${(%):-%N}"`" > /dev/null && pwd)"
+_colcon_prefix_chain_zsh_source_script "$COLCON_CURRENT_PREFIX/local_setup.zsh"
+unset _colcon_prefix_chain_zsh_source_script
+# Charger l'environnement ROS
+source install/setup.bash
+# Construire le package spécifié
+colcon build --packages-select py_pubsub
+# Exécuter le noeud ROS avec l'argument de la caméra passé au script
+echo "$cam_id"
+# Exécuter camera_yolo avec l'identifiant de la caméra
+ros2 run py_pubsub camera_yolo --cam $cam_id
\ No newline at end of file
+# Charger l'environnement ROS
+source install/setup.bash
+# Construire le package spécifié
+colcon build --packages-select py_pubsub
+# Exécuter le noeud ROS avec l'argument de la caméra passé au script
+# Exécuter display avec l'identifiant de la caméra
+ros2 run py_pubsub display --topic "$topic_name"
\ No newline at end of file
+ py_pubsub
+ 0.0.0
+ Examples of minimal publisher/subscriber using rclpy
+ pge-2023
+ TODO: License declaration
+ rclpy
+ std_msgs
+ ament_copyright
+ ament_flake8
+ ament_pep257
+ python3-pytest
+ ament_python
+import rclpy
+from rclpy.node import Node
+from sensor_msgs.msg import Image
+from cv_bridge import CvBridge
+import cv2
+import argparse
+from ultralytics import YOLO # YOLOv8 import
+from ultralytics.utils import LOGGER # LOGGER import
+from rclpy.utilities import remove_ros_args
+import sys
+class YOLODetector:
+ def __init__(self):
+ self.model = YOLO("yolov8s-pose.pt")
+ def compute(self, image):
+ if image is not None:
+ results = self.model.track(image, persist=True, tracker="bytetrack.yaml")
+ return results
+class MinimalPublisher(Node):
+ def __init__(self, camera_id):
+ super().__init__("minimal_publisher")
+ self.camera_id = camera_id
+ self.topic_name = f"annotated_images_{camera_id}"
+ self.publisher = self.create_publisher(Image, self.topic_name, 10)
+ self.subscription = self.create_subscription(
+ Image, f"Cam{camera_id}/image_raw", self.listener_callback, 10
+ )
+ self._cv_bridge = CvBridge()
+ self.detector = YOLODetector()
+ def toData(self, result, keypoint=False):
+ """Convert the object to JSON format."""
+ if result.probs is not None:
+ LOGGER.warning("Warning: Classify task do not support tojson yet.")
+ return
+ # Create list of detection dictionaries
+ resultat = []
+ data = result.boxes.data.cpu().tolist()
+ h, w = (1, 1)
+ for i, row in enumerate(data): # xyxy, track_id if tracking, conf, class_id
+ box = {
+ "x1": row[0] / w,
+ "y1": row[1] / h,
+ "x2": row[2] / w,
+ "y2": row[3] / h,
+ }
+ class_id = int(row[-1])
+ name = result.names[class_id]
+ results = {"name": name, "box": box}
+ if result.boxes.is_track:
+ results["track_id"] = int(row[-3]) # track ID
+ if result.masks:
+ # numpy array
+ x, y = result.masks.xy[i][:, 0], result.masks.xy[i][:, 1]
+ results["segments"] = {"x": (x / w).tolist(), "y": (y / h).tolist()}
+ if keypoint:
+ x, y, visible = (
+ result.keypoints[i].data[0].cpu().unbind(dim=1)
+ ) # torch Tensor
+ results["keypoints"] = {
+ "x": (x / w).tolist(),
+ "y": (y / h).tolist(),
+ "visible": visible.tolist(),
+ }
+ resultat.append(results)
+ # Convert detections to JSON
+ return resultat
+ def listener_callback(self, image):
+ self.get_logger().info(f"Image received from Camera {self.camera_id}")
+ cv_image = cv2.cvtColor(
+ self._cv_bridge.imgmsg_to_cv2(image, desired_encoding="passthrough"),
+ )
+ results = self.detector.compute(cv_image)
+ if results:
+ annotated_image = results[0].plot()
+ msg = self._cv_bridge.cv2_to_imgmsg(annotated_image, "rgb8")
+ self.publisher.publish(msg)
+def main(args=None):
+ # Initialize ROS without passing args
+ rclpy.init()
+ # Create an argument parser for your script
+ parser = argparse.ArgumentParser(description="ROS 2 YOLO Object Detection Node")
+ # Add your custom argument
+ parser.add_argument("--cam", type=str, default="1", help="Camera identifier")
+ # Parse the command line arguments
+ custom_args = parser.parse_args()
+ # Create and spin your node
+ minimal_publisher = MinimalPublisher(custom_args.cam)
+ rclpy.spin(minimal_publisher)
+ # Shutdown and cleanup
+ minimal_publisher.destroy_node()
+ rclpy.shutdown()
+if __name__ == "__main__":
+ main()
+import rclpy
+from rclpy.node import Node
+from sensor_msgs.msg import Image
+from cv_bridge import CvBridge
+import cv2
+import argparse
+from rclpy.utilities import remove_ros_args
+import sys
+class ImageDisplayNode(Node):
+ def __init__(self, topic_name):
+ self.topic_name = topic_name
+ super().__init__("image_display_node")
+ self.subscription = self.create_subscription(
+ Image, self.topic_name, self.listener_callback, 10
+ )
+ self._cv_bridge = CvBridge()
+ def listener_callback(self, image):
+ cv_image = self._cv_bridge.imgmsg_to_cv2(image, desired_encoding="rgb8")
+ cv2.imshow(f"Annotated Image {self.topic_name}", cv_image)
+ cv2.waitKey(1)
+def main(args=None):
+ # Initialize ROS without passing args
+ rclpy.init()
+ # Create an argument parser for your script
+ parser = argparse.ArgumentParser(description="Image Display Node")
+ # Add your custom argument
+ parser.add_argument(
+ "--topic", type=str, default="1", required=True, help="Topic to subscribe to"
+ )
+ # Parse the command line arguments
+ custom_args = parser.parse_args()
+ # Create and spin your node
+ image_display_node = ImageDisplayNode(topic_name=custom_args.topic)
+ rclpy.spin(image_display_node)
+ # Shutdown and cleanup
+ image_display_node.destroy_node()
+ rclpy.shutdown()
+if __name__ == "__main__":
+ main()
+from setuptools import setup
+package_name = "py_pubsub"
+ name=package_name,
+ version="0.0.0",
+ packages=[package_name],
+ data_files=[
+ ("share/ament_index/resource_index/packages", ["resource/" + package_name]),
+ ("share/" + package_name, ["package.xml"]),
+ ],
+ install_requires=["setuptools"],
+ zip_safe=True,
+ maintainer="pge-2023",
+ maintainer_email="pge-2023@todo.todo",
+ description="Examples of minimal publisher/subscriber using rclpy",
+ license="TODO: License declaration",
+ tests_require=["pytest"],
+ entry_points={
+ "console_scripts": [
+ "display = py_pubsub.display:main",
+ "camera_yolo = py_pubsub.camera_yolo:main",
+ ],
+ },
+# Copyright 2015 Open Source Robotics Foundation, Inc.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ament_copyright.main import main
+import pytest
+def test_copyright():
+ rc = main(argv=['.', 'test'])
+ assert rc == 0, 'Found errors'
+# Copyright 2017 Open Source Robotics Foundation, Inc.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ament_flake8.main import main_with_errors
+import pytest
+def test_flake8():
+ rc, errors = main_with_errors(argv=[])
+ assert rc == 0, \
+ 'Found %d code style errors / warnings:\n' % len(errors) + \
+ '\n'.join(errors)
+# Copyright 2015 Open Source Robotics Foundation, Inc.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ament_pep257.main import main
+import pytest
+def test_pep257():
+ rc = main(argv=['.', 'test'])
+ assert rc == 0, 'Found code style errors / warnings'