Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue bandit scheduler #489

Open
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

Tekexa
Copy link

@Tekexa Tekexa commented Dec 6, 2024

Description

Fixes the BanditScheduler class because it could sometimes have unexpected number of active emitters.

Done

  • Added a check that enough emitters are active (before PR: only checked if there was 0 active emitters)
  • Added a check that there are not too many active emitters
  • Both checks comply with the BanditScheduler's expected behaviour, i.e.:
    - activate emitters that were never used, if there are none, those that are expected to perform better
    - deactivate least performing emitters first

Questions

  • I am quite sure this is not the most efficient way to do this, which would slow down any use of the BanditScheduler. I would appreciate any feedback on how to improve this.
  • I added a test, but it seems more complicated than other tests of the file. Should I put it somewhere else?

Status

  • I have read the guidelines in
    CONTRIBUTING.md
  • I have formatted my code using yapf
  • I have tested my code by running pytest
  • I have linted my code with pylint
  • I have added a one-line description of my change to the changelog in
    HISTORY.md
  • This PR is ready to go

@Tekexa Tekexa marked this pull request as ready for review December 6, 2024 22:07
Copy link
Member

@btjanaka btjanaka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @Tekexa, thank you for sending in this PR! I really appreciate it when people give feedback on pyribs.

I am taking a look and have some questions. My primary question is, do you have an example where the number of active emitters is unstable? It seems to me that every time an emitter needs to restart, we simply mark it for reselection in BanditScheduler.ask. If we always do this, shouldn't the number of active emitters always be the same?

Please keep in mind it has been a long time since I looked at this portion of pyribs, so I may need a bit more help understanding your issue. Thanks!

ribs/schedulers/_bandit_scheduler.py Outdated Show resolved Hide resolved
# If no emitters are active, activate the first num_active.
if not self._active_arr.any():
# If not enough emitters are active, activate the first num_active.
diff = self._num_active - self._active_arr.sum()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you make these names more descriptive, like num_needed and active_i?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand where you expect active_i.
No problem for `num_needed, you're right it's more explicit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I meant that i would be renamed to active_i. Makes more sense to me since i is a common name and it's going to be available after the loop is done, so it would be easy for it to get confused in later pieces of code.

@Tekexa
Copy link
Author

Tekexa commented Dec 7, 2024

To be honest this is also a fix I did months ago because I ran into the issue. I was not sure about my fix at the time, so that’s why I am proposing one only now.

It happened when I had both classic MAP-Elites and CMA-ES emitters in the emitters pool, so I think it is because these 2 have different periods of activity. It also has to do with the fact that the code initially checks if there is absolutely 0 active emitter, but there could be some active emitters, just not enough of them.

I should be able to provide an example to reproduce next week.

@btjanaka
Copy link
Member

btjanaka commented Dec 7, 2024

That's weird because this line in ask seems to check for emitters like GaussianEmitter that do not have a restarts attribute and thus can be considered as "always restarting."

I'll take a look at the example when it's ready. Thanks!

@Tekexa
Copy link
Author

Tekexa commented Dec 10, 2024

[EDIT]: See last comment for the up to date fix in the example
Here is an example where the number of active emitters is not the stable without my fix, and it is stable with my fix.
I am sorry this is long, I tried to make it as simple as possible.
I just found out that there is still a problem even with my fix, because if you comment the GaussienEmitter in emitters in the example I provide, the number of active emitters is not stable in both version of BanditScheduler. I am working on it and will update if I find a solution.

import numpy as np
import tqdm

from ribs.archives import GridArchive
from ribs.schedulers import BanditScheduler
from ribs.emitters import EvolutionStrategyEmitter, GaussianEmitter

class DebugBanditScheduler(BanditScheduler):
    def ask(self):
        """Generates a batch of solutions by calling ask() on all active
        emitters.

        The emitters used by ask are determined by the UCB1 algorithm. Briefly,
        emitters that have never been selected before are prioritized, then
        emitters are sorted in descending order based on the accurary of their
        past prediction.

        .. note:: The order of the solutions returned from this method is
            important, so do not rearrange them.

        Returns:
            (batch_size, dim) array: An array of n solutions to evaluate. Each
            row contains a single solution.
        Raises:
            RuntimeError: This method was called immediately after calling an
                ask method.
        """
        if self._last_called == "ask":
            raise RuntimeError("ask cannot be called immediately after " +
                               self._last_called)
        self._last_called = "ask"

        if self._reselect == "terminated":
            # Reselect terminated emitters. Emitters are terminated if their
            # restarts attribute have incremented.
            emitter_restarts = np.array([
                emitter.restarts if hasattr(emitter, "restarts") else -1
                for emitter in self._emitter_pool
            ])
            reselect = emitter_restarts > self._restarts

            # If the emitter does not have "restarts" attribute, assume it
            # restarts every iteration.
            reselect[emitter_restarts < 0] = True

            self._restarts = emitter_restarts
        else:
            # Reselect all emitters.
            reselect = self._active_arr.copy()

        # If not enough emitters are active, activate the first num_active.
        nb_activate= self._num_active - self._active_arr.sum()
        i = 0
        while nb_activate> 0:
            reselect[i] = False
            if not self._active_arr[i]:
                self._active_arr[i] = True
                nb_activate-= 1
            i += 1

        # Deactivate emitters to be reselected.
        self._active_arr[reselect] = False

        # Select emitters based on the UCB1 formula.
        # The ranking of emitters also follows these rules:
        # - Emitters that have never been selected are prioritized.
        # - If reselect is "terminated", then only active emitters that have
        #   terminated/restarted will be reselected. Otherwise, if reselect is
        #   "all", then all emitters are reselected.
        if reselect.any():
            ucb1 = np.full_like(
                self._emitter_pool, np.inf
            )  # np.inf forces to select emitters that were not yet selected
            update_ucb = self._selection != 0
            if update_ucb.any():
                ucb1[update_ucb] = (
                    self._success[update_ucb] / self._selection[update_ucb] +
                    self._zeta * np.sqrt(
                        np.log(self._success.sum()) /
                        self._selection[update_ucb]))
            # Activate top emitters based on UCB1.
            activate = np.argsort(ucb1)[-reselect.sum():]
            self._active_arr[activate] = True

            # Deactivate emitters if there are too many active emitters.
            nb_deactivate = self.emitters.sum() - self._num_active
            deactivate = np.argsort(ucb1)
            for i in deactivate:
                if nb_deactivate == 0:
                    break
                if self._active_arr[i]:
                    self._active_arr[i] = False
                    nb_deactivate -= 1

        self._cur_solutions = []

        for i in np.where(self._active_arr)[0]:
            emitter = self._emitter_pool[i]
            emitter_sols = emitter.ask()
            self._cur_solutions.append(emitter_sols)
            self._num_emitted[i] = len(emitter_sols)

        # In case the emitters didn't return any solutions.
        self._cur_solutions = np.concatenate(
            self._cur_solutions, axis=0) if self._cur_solutions else np.empty(
                (0, self._solution_dim))
        return self._cur_solutions

# set the random seed
np.random.seed(42)

nb_variables = 7
nb_emitters = 5
nb_generations = 100
nb_dimensions = 2

archives = [
    GridArchive(
        solution_dim = nb_variables,
        dims = [10] * nb_dimensions,
        ranges = [(-2, 2)] * nb_dimensions
    ) for i in range(2)
]

me_map_elites_archives = archives[0]
me_map_elites_archives_debug = archives[1]


emitters_me_map_elites = [  # Create a pool of different emitters
    EvolutionStrategyEmitter(
        archive = me_map_elites_archives,
        x0 = [0.5] * nb_variables,  # This is arbitrary
        sigma0 = 0.3,
        ranker = "2imp",
    ) for i in range(nb_emitters * 2)
] + [
    EvolutionStrategyEmitter(
        archive = me_map_elites_archives,
        x0 = [0.5] * nb_variables,  # This is arbitrary
        sigma0 = 0.3,
        ranker = "2imp",
    ) for i in range(nb_emitters * 2)
] + [
        GaussianEmitter(
        archive = me_map_elites_archives,
        x0 = [0.5] * nb_variables,
        sigma = 0.3,
    ) for i in range(nb_emitters * 2)
]


emitters_me_map_elites_debug = \
[  # Create a pool of different emitters
    EvolutionStrategyEmitter(
        archive = me_map_elites_archives_debug,
        x0 = [0.5] * nb_variables,  # This is arbitrary
        sigma0 = 0.3,
        ranker = "2imp",
    ) for i in range(nb_emitters * 2)
] + [
    EvolutionStrategyEmitter(
        archive = me_map_elites_archives_debug,
        x0 = [0.5] * nb_variables,  # This is arbitrary
        sigma0 = 0.3,
        ranker = "2imp",
    ) for i in range(nb_emitters * 2)
] +  \
[
        GaussianEmitter(
        archive = me_map_elites_archives_debug,
        x0 = [0.5] * nb_variables,
        sigma = 0.3,
    ) for i in range(nb_emitters * 2)
]

emitter_calls = np.array([0] * len(emitters_me_map_elites))

scheduler_me_map_elites = BanditScheduler(
    archive = me_map_elites_archives,
    emitter_pool = emitters_me_map_elites,
    num_active = nb_emitters
)

scheduler_me_map_elites_debug = DebugBanditScheduler(
    archive = me_map_elites_archives_debug,
    emitter_pool = emitters_me_map_elites_debug,
    num_active = nb_emitters
)

print("Starting ME-MAP-Elites")
for itr in tqdm.trange(1, nb_generations + 1):
    if itr % 10 == 0:
        print(f"Iteration {itr}/{nb_generations}")
    
    # === CLASSIC ME-MAP-Elites ===
    sols = scheduler_me_map_elites.ask()
    
    obj_array = np.random.rand(len(sols))
    feat_array = np.random.rand(len(sols), nb_dimensions)
    
    scheduler_me_map_elites.tell(obj_array, feat_array)

    if np.sum(scheduler_me_map_elites._active_arr) != nb_emitters:
        print("Classic - Nb. active emitters: "
              f"{np.sum(scheduler_me_map_elites._active_arr)}"
              f" instead of {nb_emitters}"
              f" at iteration {itr}")
    
    # === DEBUG ME-MAP-Elites ===
    sols_debug = scheduler_me_map_elites_debug.ask()

    obj_array = np.random.rand(len(sols_debug))
    feat_array = np.random.rand(len(sols_debug), nb_dimensions)

    scheduler_me_map_elites_debug.tell(obj_array, feat_array)

    if np.sum(scheduler_me_map_elites_debug._active_arr) != nb_emitters:
        print("Debug - Nb. active emitters: "
              f"{np.sum(scheduler_me_map_elites._active_arr)}"
              f" instead of {nb_emitters}"
              f" at iteration {itr}")

@Tekexa Tekexa force-pushed the fix-issue-bandit-scheduler branch from 21d9bc6 to 4329d4a Compare December 10, 2024 21:43
@Tekexa
Copy link
Author

Tekexa commented Dec 10, 2024

Now, everything works as intended. Below, I provide the updated example to make sure everything works. This examples shows that the fixes help when:

  • The emitter pool contains only EvolutionStrategyEmitter emitters, the previous version would not activate enough emitters (you need to comment the GaussianEmitter lines to illustrate this)
  • The emitter pool contain both EvolutionStrategyEmitter and GaussianEmitter emitters, the previous version would activate too much of them
import numpy as np
import tqdm

from ribs.archives import GridArchive
from ribs.schedulers import BanditScheduler
from ribs.emitters import EvolutionStrategyEmitter, GaussianEmitter

class DebugBanditScheduler(BanditScheduler):
    def ask(self):
        """Generates a batch of solutions by calling ask() on all active
        emitters.

        The emitters used by ask are determined by the UCB1 algorithm. Briefly,
        emitters that have never been selected before are prioritized, then
        emitters are sorted in descending order based on the accurary of their
        past prediction.

        .. note:: The order of the solutions returned from this method is
            important, so do not rearrange them.

        Returns:
            (batch_size, dim) array: An array of n solutions to evaluate. Each
            row contains a single solution.
        Raises:
            RuntimeError: This method was called immediately after calling an
                ask method.
        """
        if self._last_called == "ask":
            raise RuntimeError("ask cannot be called immediately after " +
                               self._last_called)
        self._last_called = "ask"

        if self._reselect == "terminated":
            # Reselect terminated emitters. Emitters are terminated if their
            # restarts attribute have incremented.
            emitter_restarts = np.array([
                emitter.restarts if hasattr(emitter, "restarts") else -1
                for emitter in self._emitter_pool
            ])
            reselect = emitter_restarts > self._restarts

            # If the emitter does not have "restarts" attribute, assume it
            # restarts every iteration.
            reselect[emitter_restarts < 0] = True

            self._restarts = emitter_restarts
        else:
            # Reselect all emitters.
            reselect = self._active_arr.copy()

        # If not enough emitters are active, activate the first num_active.
        # This always happens on the first iteration(s).
        num_needed = self._num_active - self._active_arr.sum()
        i = 0
        while num_needed > 0:
            reselect[i] = False
            if not self._active_arr[i]:
                self._active_arr[i] = True
                num_needed -= 1
            i += 1

        # Deactivate emitters to be reselected.
        self._active_arr[reselect] = False

        # Select emitters based on the UCB1 formula.
        # The ranking of emitters also follows these rules:
        # - Emitters that have never been selected are prioritized.
        # - If reselect is "terminated", then only active emitters that have
        #   terminated/restarted will be reselected. Otherwise, if reselect is
        #   "all", then all emitters are reselected.
        if reselect.any():
            ucb1 = np.full_like(
                self._emitter_pool, np.inf
            )  # np.inf forces to select emitters that were not yet selected
            update_ucb = self._selection != 0
            if update_ucb.any():
                ucb1[update_ucb] = (
                    self._success[update_ucb] / self._selection[update_ucb] +
                    self._zeta * np.sqrt(
                        np.log(self._success.sum()) /
                        self._selection[update_ucb]))
            # Activate top emitters based on UCB1, until there are num_active
            # active emitters. Activate only inactive emitters.
            activate = np.argsort(ucb1)[::-1]
            for i in activate:
                if self._active_arr.sum() == self._num_active:
                    break
                if not self._active_arr[i]:
                    self._active_arr[i] = True

        self._cur_solutions = []

        for i in np.where(self._active_arr)[0]:
            emitter = self._emitter_pool[i]
            emitter_sols = emitter.ask()
            self._cur_solutions.append(emitter_sols)
            self._num_emitted[i] = len(emitter_sols)

        # In case the emitters didn't return any solutions.
        self._cur_solutions = np.concatenate(
            self._cur_solutions, axis=0) if self._cur_solutions else np.empty(
                (0, self._solution_dim))
        return self._cur_solutions

# set the random seed
np.random.seed(42)

nb_variables = 7
nb_emitters = 3
nb_generations = 100
nb_dimensions = 2

archives = [
    GridArchive(
        solution_dim = nb_variables,
        dims = [10] * nb_dimensions,
        ranges = [(-2, 2)] * nb_dimensions
    ) for i in range(2)
]

me_map_elites_archives = archives[0]
me_map_elites_archives_debug = archives[1]


emitters_me_map_elites = [  # Create a pool of different emitters
    EvolutionStrategyEmitter(
        archive = me_map_elites_archives,
        x0 = [0.5] * nb_variables,
        sigma0 = 0.3,
        ranker = "2imp",
    ) for i in range(nb_emitters * 2)
] + [
    EvolutionStrategyEmitter(
        archive = me_map_elites_archives,
        x0 = [0.5] * nb_variables,
        sigma0 = 0.3,
        ranker = "2obj",
    ) for i in range(nb_emitters * 2)
] + [
        GaussianEmitter(
        archive = me_map_elites_archives,
        x0 = [0.5] * nb_variables,
        sigma = 0.3,
    ) for i in range(nb_emitters * 2)
]


emitters_me_map_elites_debug = [  # Create a pool of different emitters
    EvolutionStrategyEmitter(
        archive = me_map_elites_archives_debug,
        x0 = [0.5] * nb_variables,
        sigma0 = 0.3,
        ranker = "2imp",
    ) for i in range(nb_emitters * 2)
] + [
    EvolutionStrategyEmitter(
        archive = me_map_elites_archives_debug,
        x0 = [0.5] * nb_variables,
        sigma0 = 0.3,
        ranker = "2obj",
    ) for i in range(nb_emitters * 2)
] + [
        GaussianEmitter(
        archive = me_map_elites_archives_debug,
        x0 = [0.5] * nb_variables,
        sigma = 0.3,
    ) for i in range(nb_emitters * 2)
]

emitter_calls = np.array([0] * len(emitters_me_map_elites))

scheduler_me_map_elites = BanditScheduler(
    archive = me_map_elites_archives,
    emitter_pool = emitters_me_map_elites,
    num_active = nb_emitters
)

scheduler_me_map_elites_debug = DebugBanditScheduler(
    archive = me_map_elites_archives_debug,
    emitter_pool = emitters_me_map_elites_debug,
    num_active = nb_emitters
)

print("Starting ME-MAP-Elites")
for itr in tqdm.trange(1, nb_generations + 1):
    if itr % 10 == 0:
        print(f"Iteration {itr}/{nb_generations}")
    
    # === CLASSIC ME-MAP-Elites ===
    sols = scheduler_me_map_elites.ask()
    
    obj_array = np.random.rand(len(sols))
    feat_array = np.random.rand(len(sols), nb_dimensions)
    
    scheduler_me_map_elites.tell(obj_array, feat_array)

    if np.sum(scheduler_me_map_elites._active_arr) != nb_emitters:
        print("Classic - Nb. active emitters: "
              f"{np.sum(scheduler_me_map_elites._active_arr)}"
              f" instead of {nb_emitters}"
              f" at iteration {itr}")
    
    # === DEBUG ME-MAP-Elites ===
    sols_debug = scheduler_me_map_elites_debug.ask()

    obj_array = np.random.rand(len(sols_debug))
    feat_array = np.random.rand(len(sols_debug), nb_dimensions)

    scheduler_me_map_elites_debug.tell(obj_array, feat_array)

    if np.sum(scheduler_me_map_elites_debug._active_arr) != nb_emitters:
        print("Debug - Nb. active emitters: "
              f"{np.sum(scheduler_me_map_elites._active_arr)}"
              f" instead of {nb_emitters}"
              f" at iteration {itr}")

Copy link
Member

@btjanaka btjanaka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @Tekexa, I ran your example and saw that the bug is exactly as described.

To answer your questions:

  • The code looks fine to me in terms of efficiency. I did add a small comment though.
  • The tests are in the right place!

# active emitters. Activate only inactive emitters.
activate = np.argsort(ucb1)[::-1]
for i in activate:
if self._active_arr.sum() == self._num_active:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind adding a variable that counts the number of active emitters so that we don't keep calling .sum() here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants