Search Module
Overview of the Search Module
The module to be developed this time is the information search target determination (Search
) module using the KMeansPPClustering
module. The clustering module divides the areas of responsibility among agents and randomly selects search targets within the assigned areas.
Preparation for Implementing the Search Module
Note
The following tasks assume that the current directory is the root directory of the project.
First, create a file to write the search module.
touch src/<your_team_name>/module/complex/k_means_pp_search.py
Next, implement the search module. Write the following code into k_means_pp_search.py
. This will be the template for the search module to be implemented this time.
import random
from typing import Optional, cast
from rcrs_core.entities.building import Building
from rcrs_core.entities.entity import Entity
from rcrs_core.entities.refuge import Refuge
from rcrs_core.worldmodel.entityID import EntityID
from adf_core_python.core.agent.develop.develop_data import DevelopData
from adf_core_python.core.agent.info.agent_info import AgentInfo
from adf_core_python.core.agent.info.scenario_info import ScenarioInfo
from adf_core_python.core.agent.info.world_info import WorldInfo
from adf_core_python.core.agent.module.module_manager import ModuleManager
from adf_core_python.core.component.module.algorithm.clustering import Clustering
from adf_core_python.core.component.module.complex.search import Search
from adf_core_python.core.logger.logger import get_agent_logger
class KMeansPPSearch(Search):
def __init__(
self,
agent_info: AgentInfo,
world_info: WorldInfo,
scenario_info: ScenarioInfo,
module_manager: ModuleManager,
develop_data: DevelopData,
) -> None:
super().__init__(
agent_info, world_info, scenario_info, module_manager, develop_data
)
self._result: Optional[EntityID] = None
# ロガーの取得
self._logger = get_agent_logger(
f"{self.__class__.__module__}.{self.__class__.__qualname__}",
self._agent_info,
)
def calculate(self) -> Search:
return self
def get_target_entity_id(self) -> Optional[EntityID]:
return self._result
Module Registration
Next, register the created module. Modify the relevant section of config/module.yaml
as follows.
DefaultTacticsAmbulanceTeam:
Search: src.<your_team_name>.module.complex.k_means_pp_search.KMeansPPSearch
DefaultTacticsFireBrigade:
Search: src.<your_team_name>.module.complex.k_means_pp_search.KMeansPPSearch
DefaultTacticsPoliceForce:
Search: src.<your_team_name>.module.complex.k_means_pp_search.KMeansPPSearch
Module Implementation
First, make it possible to call the KMeansPPClustering
module.
Add the following code to config/module.yaml
.
KMeansPPSearch:
Clustering: src.<your_team_name>.module.algorithm.k_means_pp_clustering.KMeansPPClustering
Next, make it possible to call the KMeansPPClustering
module in the KMeansPPSearch
module.
Add the following code to k_means_pp_search.py
.
class KMeansPPSearch(Search):
def __init__(
self,
agent_info: AgentInfo,
world_info: WorldInfo,
scenario_info: ScenarioInfo,
module_manager: ModuleManager,
develop_data: DevelopData,
) -> None:
super().__init__(
agent_info, world_info, scenario_info, module_manager, develop_data
)
self._result: Optional[EntityID] = None
# ロガーの取得
self._logger = get_agent_logger(
f"{self.__class__.__module__}.{self.__class__.__qualname__}",
self._agent_info,
)
# クラスタリングモジュールの読み込み
self._clustering: Clustering = cast(
Clustering,
module_manager.get_module(
# config.yamlに登録したkey
"KMeansPPSearch.Clustering",
# 上記のkeyが登録されていなかった場合のデフォルトモジュール
"adf_core_python.implement.module.algorithm.k_means_clustering.KMeansClustering",
),
)
# クラスタリングモジュールの登録
self.register_sub_module(self._clustering)
Then, modify the calculate
method to call the clustering module and determine the search target.
Add the following code to k_means_pp_search.py
.
def calculate(self) -> Search:
# 自エージェントのエンティティIDを取得
me: EntityID = self._agent_info.get_entity_id()
# 自エージェントが所属するクラスターのインデックスを取得
allocated_cluster_index: int = self._clustering.get_cluster_index(me)
# クラスター内のエンティティIDを取得
cluster_entity_ids: list[EntityID] = self._clustering.get_cluster_entity_ids(
allocated_cluster_index
)
# 乱数で選択
if cluster_entity_ids:
self._result = random.choice(cluster_entity_ids)
# ログ出力
self._logger.info(f"Target entity ID: {self._result}")
return self
This completes the implementation of the KMeansPPSearch
module using the KMeansPPClustering
module.
Open two terminals.
Open one terminal and start the simulation server with the following command:
# Terminal A
cd WORKING_DIR/rcrs-server/scripts
./start-comprun.sh -m ../maps/tutorial_ambulance_team_only/map -c ../maps/tutorial_ambulance_team_only/config
Then open another terminal and start the agent:
# Terminal B
cd WORKING_DIR/<your_team_name>
python main.py
Module Improvement
The KMeansPPSearch
module randomly selects search targets within the assigned areas using the clustering module. As a result, the following issues arise:
The search target changes with each step
The search target changes before reaching the goal, making it difficult to reach the goal
Randomly selecting search targets in various places makes efficient searching impossible
Selecting already searched entities as search targets again makes efficient searching impossible
Selecting distant entities as search targets even though there are unexplored entities nearby
These are some of the issues.
Challenges
Improve the KMeansPPSearch
module and implement a module that performs more efficient searches.
Warning
There may be other points for improvement besides the issues listed here, so feel free to address them as well.
Warning
There are also some points for improvement in the example program, so try to fix them if you have time.
Issue of the search target changing with each step
Hints for the Approach
Do not change the search target until the initially selected target is reached
Example Program
def calculate(self) -> Search:
# 自エージェントのエンティティIDを取得
me: EntityID = self._agent_info.get_entity_id()
# 自エージェントが所属するクラスターのインデックスを取得
allocated_cluster_index: int = self._clustering.get_cluster_index(me)
# クラスター内のエンティティIDを取得
cluster_entity_ids: list[EntityID] = self._clustering.get_cluster_entity_ids(
allocated_cluster_index
)
# 探索対象をすでに選んでいる場合
if self._result:
# 自エージェントのいる場所のエンティティIDを取得
my_position = self._agent_info.get_position_entity_id()
# 探索対象の場所のエンティティIDを取得
target_position = self._world_info.get_entity_position(self._result)
# 自エージェントのいる場所と探索対象の場所が一致している場合、探索対象をリセット
if my_position == target_position:
# 探索対象をリセット
self._result = None
# 探索対象が未選択の場合
if not self._result and cluster_entity_ids:
self._result = random.choice(cluster_entity_ids)
# ログ出力
self._logger.info(f"Target entity ID: {self._result}")
return self
Issue of selecting already searched entities as search targets again
Hints for the Approach
Record already searched entities in some way to avoid selecting them as search targets again
Example Program
def __init__(
self,
agent_info: AgentInfo,
world_info: WorldInfo,
scenario_info: ScenarioInfo,
module_manager: ModuleManager,
develop_data: DevelopData,
) -> None:
super().__init__(
agent_info, world_info, scenario_info, module_manager, develop_data
)
self._result: Optional[EntityID] = None
# ロガーの取得
self._logger = get_agent_logger(
f"{self.__class__.__module__}.{self.__class__.__qualname__}",
self._agent_info,
)
# クラスタリングモジュールの読み込み
self._clustering: Clustering = cast(
Clustering,
module_manager.get_module(
# config.yamlに登録したkey
"KMeansPPSearch.Clustering",
# 上記のkeyが登録されていなかった場合のデフォルトモジュール
"adf_core_python.implement.module.algorithm.k_means_clustering.KMeansClustering",
),
)
# クラスタリングモジュールの要録
self.register_sub_module(self._clustering)
# 探索したいエンティティIDのリスト(追加)
self._search_entity_ids: list[EntityID] = []
def calculate(self) -> Search:
# 探索したいエンティティIDのリストが空の場合
if not self._search_entity_ids:
# 自エージェントのエンティティIDを取得
me: EntityID = self._agent_info.get_entity_id()
# 自エージェントが所属するクラスターのインデックスを取得
allocated_cluster_index: int = self._clustering.get_cluster_index(me)
# クラスター内のエンティティIDを取得(変更)
self._search_entity_ids: list[EntityID] = (
self._clustering.get_cluster_entity_ids(allocated_cluster_index)
)
# 探索対象をすでに選んでいる場合
if self._result:
# 自エージェントのいる場所のエンティティIDを取得
my_position = self._agent_info.get_position_entity_id()
# 探索対象の場所のエンティティIDを取得
target_position = self._world_info.get_entity_position(self._result)
# 自エージェントのいる場所と探索対象の場所が一致している場合、探索対象をリセット
if my_position == target_position:
# 探索したいエンティティIDのリストから探索対象を削除
self._search_entity_ids.remove(self._result)
# 探索対象をリセット
self._result = None
# 探索対象が未選択の場合(変更)
if not self._result and self._search_entity_ids:
self._result = random.choice(self._search_entity_ids)
# ログ出力
self._logger.info(f"Target entity ID: {self._result}")
return self
Selecting distant entities as search targets even though there are unexplored entities nearby
Hints for the Approach
Calculate the distance between entities and select the closest entity as the search target
Example Program
def calculate(self) -> Search:
# 探索したいエンティティIDのリストが空の場合
if not self._search_entity_ids:
# 自エージェントのエンティティIDを取得
me: EntityID = self._agent_info.get_entity_id()
# 自エージェントが所属するクラスターのインデックスを取得
allocated_cluster_index: int = self._clustering.get_cluster_index(me)
# クラスター内のエンティティIDを取得
self._search_entity_ids: list[EntityID] = (
self._clustering.get_cluster_entity_ids(allocated_cluster_index)
)
# 探索対象をすでに選んでいる場合
if self._result:
# 自エージェントのいる場所のエンティティIDを取得
my_position = self._agent_info.get_position_entity_id()
# 探索対象の場所のエンティティIDを取得
target_position = self._world_info.get_entity_position(self._result)
# 自エージェントのいる場所と探索対象の場所が一致している場合、探索対象をリセット
if my_position == target_position:
# 探索したいエンティティIDのリストから探索対象を削除
self._search_entity_ids.remove(self._result)
# 探索対象をリセット
self._result = None
# 探索対象が未選択の場合
if not self._result and self._search_entity_ids:
nearest_entity_id: Optional[EntityID] = None
nearest_distance: float = float("inf")
me: EntityID = self._agent_info.get_entity_id()
# 探索対象の中で自エージェントに最も近いエンティティIDを選択(変更)
for entity_id in self._search_entity_ids:
distance = self._world_info.get_distance(me, entity_id)
if distance < nearest_distance:
nearest_entity_id = entity_id
nearest_distance = distance
self._result = nearest_entity_id
# ログ出力
self._logger.info(f"Target entity ID: {self._result}")
return self