Clustering Module

Purpose of the Clustering Module

When operating multiple agents, it is crucial to decide how to coordinate them. In RRS, many teams adopt coordination methods that assign each agent a designated area of responsibility, alongside other coordination methods. To assign these areas, the objects on the map must be divided into groups. The clustering module manages these groupings.

In this document, we will implement a clustering module using algorithms commonly employed by many teams participating in world championships.

Overview of the Clustering Module to Be Developed

As shown in the image below, the module developed in this document:

  1. Divides the objects on the map into as many sections as there are agents, using the k-means++ algorithm.

  2. Assigns the sections to the agents one-to-one using the Hungarian algorithm, minimizing the total distance between agents and their sections.

Clustering Image
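For readers unfamiliar with k-means++, the following is a minimal sketch of its seeding step, which is what distinguishes it from plain k-means: the first center is picked uniformly at random, and each further center is sampled with probability proportional to its squared distance from the nearest center chosen so far. The function name kmeans_pp_seeds and the sample points are hypothetical and only for illustration; this is the textbook procedure, not necessarily the exact variant scikit-learn runs internally.

```python
import numpy as np


def kmeans_pp_seeds(points: np.ndarray, k: int, seed: int = 42) -> np.ndarray:
    """Pick k initial centers from points using textbook k-means++ seeding."""
    rng = np.random.default_rng(seed)
    # First center: chosen uniformly at random.
    centers = [points[rng.integers(len(points))]]
    for _ in range(k - 1):
        # Squared distance from every point to its nearest chosen center.
        diffs = points[:, np.newaxis, :] - np.array(centers)[np.newaxis, :, :]
        d2 = np.min(np.sum(diffs**2, axis=2), axis=1)
        # Next center: sampled with probability proportional to that distance,
        # so far-away points are favored and already chosen points get weight 0.
        centers.append(points[rng.choice(len(points), p=d2 / d2.sum())])
    return np.array(centers)


# Hypothetical map: four tight groups of objects near the corners of a square.
corners = np.array([[0, 0], [0, 1000], [1000, 0], [1000, 1000]], dtype=float)
offsets = np.array([[0, 0], [10, 0], [0, 10], [10, 10]], dtype=float)
points = (corners[:, np.newaxis, :] + offsets[np.newaxis, :, :]).reshape(-1, 2)

seeds = kmeans_pp_seeds(points, k=4)
print(seeds)
```

Because distant points are favored, the initial centers tend to be spread across the map, which usually gives the subsequent k-means iterations a better starting point than purely random centers.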

Implementation of the Clustering Module

Note

The following tasks assume that the current directory is the root directory of the project.

First, create the file that will contain the clustering module.

mkdir -p src/<your_team_name>/module/algorithm
touch src/<your_team_name>/module/algorithm/k_means_pp_clustering.py

Next, implement the clustering module. Write the following code into k_means_pp_clustering.py.

import numpy as np
from adf_core_python.core.agent.develop.develop_data import DevelopData
from adf_core_python.core.agent.info.agent_info import AgentInfo
from adf_core_python.core.agent.info.scenario_info import ScenarioInfo, ScenarioInfoKeys
from adf_core_python.core.agent.info.world_info import WorldInfo
from adf_core_python.core.agent.module.module_manager import ModuleManager
from adf_core_python.core.component.module.algorithm.clustering import Clustering
from adf_core_python.core.logger.logger import get_logger
from rcrs_core.connection.URN import Entity as EntityURN
from rcrs_core.entities.ambulanceCenter import AmbulanceCentre
from rcrs_core.entities.building import Building
from rcrs_core.entities.entity import Entity
from rcrs_core.entities.fireStation import FireStation
from rcrs_core.entities.gasStation import GasStation
from rcrs_core.entities.hydrant import Hydrant
from rcrs_core.entities.policeOffice import PoliceOffice
from rcrs_core.entities.refuge import Refuge
from rcrs_core.entities.road import Road
from rcrs_core.worldmodel.entityID import EntityID
from scipy.optimize import linear_sum_assignment
from sklearn.cluster import KMeans

# Random seed for clustering
SEED = 42


class KMeansPPClustering(Clustering):
    def __init__(
        self,
        agent_info: AgentInfo,
        world_info: WorldInfo,
        scenario_info: ScenarioInfo,
        module_manager: ModuleManager,
        develop_data: DevelopData,
    ) -> None:
        super().__init__(
            agent_info, world_info, scenario_info, module_manager, develop_data
        )
        # Get the logger
        self._logger = get_logger(f"{self.__class__.__name__}")

        # Set the number of clusters
        self._cluster_number: int = 1
        match agent_info.get_myself().get_urn():
            # Set the number of clusters according to the agent's class
            case EntityURN.AMBULANCE_TEAM:
                self._cluster_number = scenario_info.get_value(
                    ScenarioInfoKeys.SCENARIO_AGENTS_AT,
                    1,
                )
            case EntityURN.POLICE_FORCE:
                self._cluster_number = scenario_info.get_value(
                    ScenarioInfoKeys.SCENARIO_AGENTS_PF,
                    1,
                )
            case EntityURN.FIRE_BRIGADE:
                self._cluster_number = scenario_info.get_value(
                    ScenarioInfoKeys.SCENARIO_AGENTS_FB,
                    1,
                )

        # Get the list of agents of the same class as this agent
        self._agents: list[Entity] = world_info.get_entities_of_types(
            [
                agent_info.get_myself().__class__,
            ]
        )

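        # Holds the clustering result
        self._cluster_entities: list[list[Entity]] = []
        # Holds the agent-to-cluster assignment (filled in by prepare())
        self._agent_cluster_indices: dict[EntityID, int] = {}

        # Get the list of entities to cluster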
        self._entities: list[Entity] = world_info.get_entities_of_types(
            [
                AmbulanceCentre,
                FireStation,
                GasStation,
                Hydrant,
                PoliceOffice,
                Refuge,
                Road,
                Building,
            ]
        )

    def calculate(self) -> Clustering:
        return self

    def get_cluster_number(self) -> int:
        """
        Get the number of clusters

        Returns
        -------
        int
            Number of clusters
        """
        return self._cluster_number

    def get_cluster_index(self, entity_id: EntityID) -> int:
        """
        Get the index of the cluster assigned to an agent

        Parameters
        ----------
        entity_id : EntityID
            ID of the agent

        Returns
        -------
        int
            Index of the cluster
        """
        return self._agent_cluster_indices.get(entity_id, 0)

    def get_cluster_entities(self, cluster_index: int) -> list[Entity]:
        """
        Get the list of entities in a cluster

        Parameters
        ----------
        cluster_index : int
            Index of the cluster

        Returns
        -------
        list[Entity]
            List of entities in the cluster
        """
        if cluster_index >= len(self._cluster_entities):
            return []
        return self._cluster_entities[cluster_index]

    def get_cluster_entity_ids(self, cluster_index: int) -> list[EntityID]:
        """
        Get the list of IDs of the entities in a cluster

        Parameters
        ----------
        cluster_index : int
            Index of the cluster

        Returns
        -------
        list[EntityID]
            List of IDs of the entities in the cluster
        """
        if cluster_index >= len(self._cluster_entities):
            return []
        return [entity.get_id() for entity in self._cluster_entities[cluster_index]]

    def prepare(self) -> Clustering:
        """
        Processing that runs only once, when the agent starts
        """
        super().prepare()
        if self.get_count_prepare() > 1:
            return self

        # Run the clustering
        kmeans_pp = self._perform_kmeans_pp(self._entities, self._cluster_number)

        # Store the clustering result
        self._cluster_entities = [[] for _ in range(self._cluster_number)]
        for entity, cluster_index in zip(self._entities, kmeans_pp.labels_):
            self._cluster_entities[cluster_index].append(entity)

        # Compute the distances between agents and cluster centers, then match agents to clusters so that the total distance is minimized
        agent_cluster_indices = self._agent_cluster_assignment(
            self._agents, kmeans_pp.cluster_centers_
        )

        # Store the agent-to-cluster assignment
        self._agent_cluster_indices = {
            entity.get_id(): cluster_index
            for entity, cluster_index in zip(self._agents, agent_cluster_indices)
        }

        # Log output for debugging
        self._logger.info(
            f"Clustered entities: {[[entity.get_id().get_value() for entity in cluster] for cluster in self._cluster_entities]}"
        )

        self._logger.info(
            f"Agent cluster indices: {[([self._world_info.get_entity(entity_id).get_x(), self._world_info.get_entity(entity_id).get_y()], int(cluster_index)) for entity_id, cluster_index in self._agent_cluster_indices.items()]}"
        )

        return self

    def _perform_kmeans_pp(self, entities: list[Entity], n_clusters: int = 1) -> KMeans:
        """
        Run clustering with the k-means++ method

        Parameters
        ----------
        entities : list[Entity]
            List of entities to cluster

        n_clusters : int, optional
            Number of clusters, by default 1

        Returns
        -------
        KMeans
            Fitted clustering result
        """
        entity_positions: np.ndarray = np.array(
            [
                [entity.get_x(), entity.get_y()]
                for entity in entities
                if entity.get_x() is not None and entity.get_y() is not None
            ]
        )

        entity_positions = entity_positions.reshape(-1, 2)
        kmeans_pp = KMeans(
            n_clusters=n_clusters,
            init="k-means++",
            random_state=SEED,
        )
        kmeans_pp.fit(entity_positions)
        return kmeans_pp

    def _agent_cluster_assignment(
        self, agents: list[Entity], cluster_positions: np.ndarray
    ) -> np.ndarray:
        """
        Match agents to clusters

        Parameters
        ----------
        agents : list[Entity]
            List of agents

        cluster_positions : np.ndarray
            Array of cluster center positions

        Returns
        -------
        np.ndarray
            Cluster index assigned to each agent
        """
        # Get the list of agent positions
        agent_positions = np.array(
            [
                [agent.get_x(), agent.get_y()]
                for agent in agents
                if agent.get_x() is not None and agent.get_y() is not None
            ]
        )

        # Compute the agent-to-cluster distance matrix
        agent_positions = agent_positions.reshape(-1, 2)
        cost_matrix = np.linalg.norm(
            agent_positions[:, np.newaxis] - cluster_positions, axis=2
        )
        # Match agents to clusters using the Hungarian algorithm
        _, col_ind = linear_sum_assignment(cost_matrix)
        return col_ind

The k-means++ implementation uses the KMeans class from scikit-learn. KMeans groups the objects on the map into the number of clusters specified by n_clusters. After fitting, the labels_ attribute holds the cluster index assigned to each object, and the cluster_centers_ attribute holds the coordinates of each cluster center.
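As a small illustration of how labels_ and cluster_centers_ are read, the sketch below fits KMeans on six made-up positions and groups the point indices by label, much as prepare() groups entities. The positions and the n_init value are assumptions chosen for the example, not values taken from the module.

```python
import numpy as np
from sklearn.cluster import KMeans

# Hypothetical object positions: two well-separated groups of three points.
positions = np.array(
    [[0, 0], [10, 0], [0, 10], [1000, 1000], [1010, 1000], [1000, 1010]],
    dtype=float,
)

kmeans = KMeans(n_clusters=2, init="k-means++", n_init=10, random_state=42)
kmeans.fit(positions)

# labels_[i] is the cluster index assigned to positions[i].
groups: list[list[int]] = [[] for _ in range(2)]
for point_index, cluster_index in enumerate(kmeans.labels_):
    groups[cluster_index].append(point_index)

print(groups)
# cluster_centers_ has one row of (x, y) coordinates per cluster.
print(kmeans.cluster_centers_)
```

Note that labels_ follows the order of the rows passed to fit, so any grouping step has to iterate over the same sequence, in the same order, that produced those rows.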

The Hungarian algorithm implementation uses the linear_sum_assignment function from scipy. linear_sum_assignment takes a cost matrix (here, the distance from each agent to each cluster center) and returns the row and column indices of the assignment with the minimum total cost.
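The sketch below shows the same two steps on a tiny example: building the distance matrix with NumPy broadcasting and passing it to linear_sum_assignment. The three agent positions and three cluster centers are made up for illustration.

```python
import numpy as np
from scipy.optimize import linear_sum_assignment

# Hypothetical positions: three agents and three cluster centers.
agent_positions = np.array([[0, 0], [100, 0], [0, 100]], dtype=float)
cluster_centers = np.array([[90, 10], [10, 90], [5, 5]], dtype=float)

# Broadcasting (3, 1, 2) - (3, 2) gives a (3, 3, 2) array of differences;
# the norm over the last axis is the agent-to-center distance matrix.
cost_matrix = np.linalg.norm(
    agent_positions[:, np.newaxis] - cluster_centers, axis=2
)

row_ind, col_ind = linear_sum_assignment(cost_matrix)
total = cost_matrix[row_ind, col_ind].sum()

# col_ind[i] is the cluster index assigned to agent i.
print(col_ind)  # → [2 0 1]
print(total)
```

Here the agent at the origin takes the nearby center (5, 5), and the other two agents take the centers closest to them. Choosing the nearest center for each agent independently would not guarantee a one-to-one assignment in general, which is why an assignment solver is used.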

Next, register the created module. Edit config/module.yaml as follows.

SampleSearch:
  PathPlanning: adf_core_python.implement.module.algorithm.a_star_path_planning.AStarPathPlanning
  Clustering: src.<your_team_name>.module.algorithm.k_means_pp_clustering.KMeansPPClustering

SampleHumanDetector:
  Clustering: src.<your_team_name>.module.algorithm.k_means_pp_clustering.KMeansPPClustering

Open two terminals.

In the first terminal, start the simulation server with the following command:

# Terminal A
cd WORKING_DIR/rcrs-server/scripts
./start-comprun.sh -m ../maps/tutorial_ambulance_team_only/map -c ../maps/tutorial_ambulance_team_only/config

Then, in the second terminal, start the agent:

# Terminal B
cd WORKING_DIR/<your_team_name>
python main.py

When the agent starts, the clustering results are printed to standard output.

[info     ] Clustered entities: [[257, 259, 262, 263, 270, 278, 280, 297, 336, 913, 914, 915, 916, 917, 918, 919, 933, 941, 942, 943, 944, 945, 946, 947, 974, 250, 253], [349, 896, 899, 902, 934, 960, 968, 969, 970, 971, 248, 251], [258, 266, 268, 269, 274, 275, 279, 920, 921, 922, 923, 924, 925, 926, 927, 928, 929, 932, 948, 949, 950, 951, 952, 953, 954, 955, 956, 957, 958, 959, 975, 976, 254, 255], [256, 271, 273, 281, 296, 298, 314, 330, 903, 904, 905, 910, 911, 912, 935, 936, 937, 938, 939, 940, 247, 249]] [KMeansPPClustering]
[info     ] Agent cluster indices: [([89544, 19925], 1), ([69989, 120063], 0), ([130029, 50380], 2), ([29898, 59056], 3)] [KMeansPPClustering]

The raw output is hard to interpret, so let's display the clustering results on the map.

Download the cluster visualization script and extract it. Then modify the following part of its main.py.

# Clustering result
clusters = []

Copy the array that follows Clustered entities: in the output and paste it as the value of clusters.

Example

# Clustering result
clusters = [[257, 259, 262, 263, 270, 278, 280, 297, 336, 913, 914, 915, 916, 917, 918, 919, 933, 941, 942, 943, 944, 945, 946, 947, 974, 250, 253], [349, 896, 899, 902, 934, 960, 968, 969, 970, 971, 248, 251], [258, 266, 268, 269, 274, 275, 279, 920, 921, 922, 923, 924, 925, 926, 927, 928, 929, 932, 948, 949, 950, 951, 952, 953, 954, 955, 956, 957, 958, 959, 975, 976, 254, 255], [256, 271, 273, 281, 296, 298, 314, 330, 903, 904, 905, 910, 911, 912, 935, 936, 937, 938, 939, 940, 247, 249]]
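As an alternative to copying the array by hand, it can be extracted from the log line programmatically. The following is a minimal sketch, assuming the log line has the format shown above; parse_clusters is a hypothetical helper and is not part of the visualization script.

```python
import ast


def parse_clusters(log_line: str) -> list[list[int]]:
    """Extract the nested list that follows 'Clustered entities:' in a log line."""
    marker = "Clustered entities:"
    start = log_line.index(marker) + len(marker)
    # The nested list ends at the last ']]'; the trailing logger tag
    # '[KMeansPPClustering]' ends with a single ']' and is therefore skipped.
    end = log_line.rindex("]]") + 2
    return ast.literal_eval(log_line[start:end].strip())


# Shortened sample line in the same format as the agent's output.
line = "[info     ] Clustered entities: [[257, 259], [349, 896, 899]] [KMeansPPClustering]"
clusters = parse_clusters(line)
print(clusters)  # → [[257, 259], [349, 896, 899]]
```

ast.literal_eval only evaluates Python literals, so it is a safer choice than eval for turning log text back into a list.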

After pasting, run the following commands.

pip install -r requirements.txt
python main.py

An image like the one below is generated.

Clustering Image