diff --git a/src/openstack_mcp_server/tools/network_tools.py b/src/openstack_mcp_server/tools/network_tools.py index 62e5d55..c1101df 100644 --- a/src/openstack_mcp_server/tools/network_tools.py +++ b/src/openstack_mcp_server/tools/network_tools.py @@ -11,6 +11,8 @@ Port, Router, RouterInterface, + SecurityGroup, + SecurityGroupRule, Subnet, ) @@ -56,6 +58,11 @@ def register_tools(self, mcp: FastMCP): mcp.tool()(self.add_router_interface) mcp.tool()(self.get_router_interfaces) mcp.tool()(self.remove_router_interface) + mcp.tool()(self.get_security_groups) + mcp.tool()(self.create_security_group) + mcp.tool()(self.get_security_group_detail) + mcp.tool()(self.update_security_group) + mcp.tool()(self.delete_security_group) def get_networks( self, @@ -171,9 +178,9 @@ def update_network( update_args = {} - if name is not None: + if name: update_args["name"] = name - if description is not None: + if description: update_args["description"] = description if is_admin_state_up is not None: update_args["admin_state_up"] = is_admin_state_up @@ -305,11 +312,11 @@ def create_subnet( "ip_version": ip_version, "enable_dhcp": is_dhcp_enabled, } - if name is not None: + if name: subnet_args["name"] = name - if description is not None: + if description: subnet_args["description"] = description - if gateway_ip is not None: + if gateway_ip: subnet_args["gateway_ip"] = gateway_ip if dns_nameservers is not None: subnet_args["dns_nameservers"] = dns_nameservers @@ -378,13 +385,13 @@ def update_subnet( """ conn = get_openstack_conn() update_args: dict = {} - if name is not None: + if name: update_args["name"] = name - if description is not None: + if description: update_args["description"] = description if clear_gateway: update_args["gateway_ip"] = None - elif gateway_ip is not None: + elif gateway_ip: update_args["gateway_ip"] = gateway_ip if is_dhcp_enabled is not None: update_args["enable_dhcp"] = is_dhcp_enabled @@ -492,9 +499,9 @@ def set_port_binding( """ conn = get_openstack_conn() update_args: dict = {} - if host_id is not None: + if host_id: update_args["binding_host_id"] = host_id - if vnic_type is not None: + if vnic_type: update_args["binding_vnic_type"] = vnic_type if profile is not None: update_args["binding_profile"] = profile @@ -531,11 +538,11 @@ def create_port( "network_id": network_id, "admin_state_up": is_admin_state_up, } - if name is not None: + if name: port_args["name"] = name - if description is not None: + if description: port_args["description"] = description - if device_id is not None: + if device_id: port_args["device_id"] = device_id if fixed_ips is not None: port_args["fixed_ips"] = fixed_ips @@ -604,13 +611,13 @@ def update_port( """ conn = get_openstack_conn() update_args: dict = {} - if name is not None: + if name: update_args["name"] = name - if description is not None: + if description: update_args["description"] = description if is_admin_state_up is not None: update_args["admin_state_up"] = is_admin_state_up - if device_id is not None: + if device_id: update_args["device_id"] = device_id if security_group_ids is not None: update_args["security_groups"] = security_group_ids @@ -717,13 +724,13 @@ def create_floating_ip( """ conn = get_openstack_conn() ip_args: dict = {"floating_network_id": floating_network_id} - if description is not None: + if description: ip_args["description"] = description - if fixed_ip_address is not None: + if fixed_ip_address: ip_args["fixed_ip_address"] = fixed_ip_address - if port_id is not None: + if port_id: ip_args["port_id"] = port_id - if project_id is not None: + if project_id: ip_args["project_id"] = project_id ip = conn.network.create_ip(**ip_args) return self._convert_to_floating_ip_model(ip) @@ -744,7 +751,7 @@ def attach_floating_ip_to_port( """ conn = get_openstack_conn() update_args: dict = {"port_id": port_id} - if fixed_ip_address is not None: + if fixed_ip_address: update_args["fixed_ip_address"] = fixed_ip_address ip = conn.network.update_ip(floating_ip_id, **update_args) return self._convert_to_floating_ip_model(ip) @@ -783,11 +790,11 @@ def update_floating_ip( """ conn = get_openstack_conn() update_args: dict = {} - if description is not None: + if description: update_args["description"] = description - if port_id is not None: + if port_id: update_args["port_id"] = port_id - if fixed_ip_address is not None: + if fixed_ip_address: update_args["fixed_ip_address"] = fixed_ip_address else: if clear_port: @@ -947,13 +954,13 @@ def create_router( """ conn = get_openstack_conn() router_args: dict = {"admin_state_up": is_admin_state_up} - if name is not None: + if name: router_args["name"] = name - if description is not None: + if description: router_args["description"] = description if is_distributed is not None: router_args["distributed"] = is_distributed - if project_id is not None: + if project_id: router_args["project_id"] = project_id if external_gateway_info is not None: router_args["external_gateway_info"] = ( @@ -1008,9 +1015,9 @@ def update_router( """ conn = get_openstack_conn() update_args: dict = {} - if name is not None: + if name: update_args["name"] = name - if description is not None: + if description: update_args["description"] = description if is_admin_state_up is not None: update_args["admin_state_up"] = is_admin_state_up @@ -1114,9 +1121,9 @@ def remove_router_interface( """ conn = get_openstack_conn() args: dict = {} - if subnet_id is not None: + if subnet_id: args["subnet_id"] = subnet_id - if port_id is not None: + if port_id: args["port_id"] = port_id res = conn.network.remove_interface_from_router(router_id, **args) return RouterInterface( @@ -1161,6 +1168,134 @@ def _sanitize_server_filters(self, filters: dict) -> dict: if not filters: return {} attrs = dict(filters) - # Remove client-only or unsupported filters attrs.pop("status", None) return attrs + + def get_security_groups( + self, + project_id: str | None = None, + name: str | None = None, + id: str | None = None, + ) -> list[SecurityGroup]: + """ + Get the list of Security Groups with optional filtering. + + :param project_id: Filter by project ID + :param name: Filter by security group name + :param id: Filter by security group ID + :return: List of SecurityGroup objects + """ + conn = get_openstack_conn() + filters: dict = {} + if project_id: + filters["project_id"] = project_id + if name: + filters["name"] = name + if id: + filters["id"] = id + security_groups = conn.network.security_groups(**filters) + return [ + self._convert_to_security_group_model(sg) for sg in security_groups + ] + + def create_security_group( + self, + name: str, + description: str | None = None, + project_id: str | None = None, + ) -> SecurityGroup: + """ + Create a new Security Group. + + :param name: Security group name + :param description: Security group description + :param project_id: Project ID to assign ownership + :return: Created SecurityGroup object + """ + conn = get_openstack_conn() + args: dict = {"name": name} + if description: + args["description"] = description + if project_id: + args["project_id"] = project_id + sg = conn.network.create_security_group(**args) + return self._convert_to_security_group_model(sg) + + def get_security_group_detail( + self, security_group_id: str + ) -> SecurityGroup: + """ + Get detailed information about a specific Security Group. + + :param security_group_id: ID of the security group to retrieve + :return: SecurityGroup details + """ + conn = get_openstack_conn() + sg = conn.network.get_security_group(security_group_id) + return self._convert_to_security_group_model(sg) + + def update_security_group( + self, + security_group_id: str, + name: str | None = None, + description: str | None = None, + ) -> SecurityGroup: + """ + Update an existing Security Group. + + :param security_group_id: ID of the security group to update + :param name: New security group name + :param description: New security group description + :return: Updated SecurityGroup object + """ + conn = get_openstack_conn() + update_args: dict = {} + if name: + update_args["name"] = name + if description: + update_args["description"] = description + if not update_args: + current = conn.network.get_security_group(security_group_id) + return self._convert_to_security_group_model(current) + sg = conn.network.update_security_group( + security_group_id, **update_args + ) + return self._convert_to_security_group_model(sg) + + def delete_security_group(self, security_group_id: str) -> None: + """ + Delete a Security Group. + + :param security_group_id: ID of the security group to delete + :return: None + """ + conn = get_openstack_conn() + conn.network.delete_security_group( + security_group_id, ignore_missing=False + ) + return None + + def _convert_to_security_group_model(self, openstack_sg) -> SecurityGroup: + """ + Convert an OpenStack Security Group object to a SecurityGroup pydantic model. + + :param openstack_sg: OpenStack security group object + :return: Pydantic SecurityGroup model + """ + rule_ids: list[str] | None = None + rules = getattr(openstack_sg, "security_group_rules", None) + if rules is not None: + dto_rules = [ + SecurityGroupRule.model_validate(r, from_attributes=True) + for r in rules + ] + rule_ids = [str(r.id) for r in dto_rules if getattr(r, "id", None)] + + return SecurityGroup( + id=openstack_sg.id, + name=getattr(openstack_sg, "name", None), + status=getattr(openstack_sg, "status", None), + description=getattr(openstack_sg, "description", None), + project_id=getattr(openstack_sg, "project_id", None), + security_group_rule_ids=rule_ids, + ) diff --git a/tests/tools/test_network_tools.py b/tests/tools/test_network_tools.py index c3c7c13..5108bed 100644 --- a/tests/tools/test_network_tools.py +++ b/tests/tools/test_network_tools.py @@ -11,6 +11,7 @@ Port, Router, RouterInterface, + SecurityGroup, Subnet, ) @@ -1368,6 +1369,138 @@ def test_update_reassign_bulk_and_auto_assign_floating_ip( auto = tools.assign_first_available_floating_ip("ext-net", "port-9") assert isinstance(auto, FloatingIP) + def test_get_security_groups_filters(self, mock_openstack_connect_network): + """Test getting security groups with filters.""" + mock_conn = mock_openstack_connect_network + + sg = Mock() + sg.id = "sg-1" + sg.name = "default" + sg.status = None + sg.description = "desc" + sg.project_id = "proj-1" + sg.security_group_rules = [ + {"id": "r-1"}, + {"id": "r-2"}, + ] + + expected_sg = SecurityGroup( + id="sg-1", + name="default", + status=None, + description="desc", + project_id="proj-1", + security_group_rule_ids=["r-1", "r-2"], + ) + + tools = self.get_network_tools() + + # Test by project_id and name + mock_conn.network.security_groups.return_value = [sg] + res = tools.get_security_groups(project_id="proj-1", name="default") + assert res == [expected_sg] + mock_conn.network.security_groups.assert_called_with( + project_id="proj-1", name="default" + ) + + # Test by id + mock_conn.network.security_groups.return_value = [sg] + res = tools.get_security_groups(id="sg-1") + assert res == [expected_sg] + mock_conn.network.security_groups.assert_called_with(id="sg-1") + + def test_create_security_group(self, mock_openstack_connect_network): + mock_conn = mock_openstack_connect_network + sg = Mock() + sg.id = "sg-2" + sg.name = "web" + sg.status = None + sg.description = "for web" + sg.project_id = "proj-1" + sg.security_group_rules = [] + mock_conn.network.create_security_group.return_value = sg + + tools = self.get_network_tools() + res = tools.create_security_group( + name="web", description="for web", project_id="proj-1" + ) + assert res == SecurityGroup( + id="sg-2", + name="web", + status=None, + description="for web", + project_id="proj-1", + security_group_rule_ids=[], + ) + mock_conn.network.create_security_group.assert_called_once_with( + name="web", description="for web", project_id="proj-1" + ) + + def test_get_security_group_detail(self, mock_openstack_connect_network): + mock_conn = mock_openstack_connect_network + sg = Mock() + sg.id = "sg-3" + sg.name = "db" + sg.status = None + sg.description = None + sg.project_id = None + sg.security_group_rules = None + mock_conn.network.get_security_group.return_value = sg + + tools = self.get_network_tools() + res = tools.get_security_group_detail("sg-3") + assert res.id == "sg-3" + mock_conn.network.get_security_group.assert_called_once_with("sg-3") + + def test_update_security_group(self, mock_openstack_connect_network): + mock_conn = mock_openstack_connect_network + sg = Mock() + sg.id = "sg-4" + sg.name = "new-name" + sg.status = None + sg.description = "new-desc" + sg.project_id = None + sg.security_group_rules = [] + mock_conn.network.update_security_group.return_value = sg + + tools = self.get_network_tools() + res = tools.update_security_group( + security_group_id="sg-4", name="new-name", description="new-desc" + ) + assert res.name == "new-name" + mock_conn.network.update_security_group.assert_called_once_with( + "sg-4", name="new-name", description="new-desc" + ) + + def test_update_security_group_no_fields_returns_current( + self, mock_openstack_connect_network + ): + mock_conn = mock_openstack_connect_network + current = Mock() + current.id = "sg-5" + current.name = "cur" + current.status = None + current.description = None + current.project_id = None + current.security_group_rules = None + mock_conn.network.get_security_group.return_value = current + + tools = self.get_network_tools() + res = tools.update_security_group("sg-5") + assert res.id == "sg-5" + mock_conn.network.get_security_group.assert_called_once_with("sg-5") + + def test_delete_security_group(self, mock_openstack_connect_network): + mock_conn = mock_openstack_connect_network + mock_conn.network.delete_security_group.return_value = None + + tools = self.get_network_tools() + res = tools.delete_security_group("sg-6") + assert res is None + mock_conn.network.delete_security_group.assert_called_once_with( + "sg-6", ignore_missing=False + ) + def test_get_routers_with_filters(self, mock_openstack_connect_network): mock_conn = mock_openstack_connect_network