diff --git a/src/ros2_medkit_gateway/src/http/handlers/data_handlers.cpp b/src/ros2_medkit_gateway/src/http/handlers/data_handlers.cpp index 78017c2..9f960f1 100644 --- a/src/ros2_medkit_gateway/src/http/handlers/data_handlers.cpp +++ b/src/ros2_medkit_gateway/src/http/handlers/data_handlers.cpp @@ -81,7 +81,10 @@ void DataHandlers::handle_list_data(const httplib::Request & req, httplib::Respo ext.ros2_type(topic_type); try { auto type_info = type_introspection->get_type_info(topic_type); - ext.type_info(type_info.schema); + json type_info_obj; + type_info_obj["schema"] = type_info.schema; + type_info_obj["default_value"] = type_info.default_value; + ext.type_info(type_info_obj); } catch (const std::exception & e) { RCLCPP_DEBUG(HandlerContext::logger(), "Could not get type info for topic '%s': %s", topic.name.c_str(), e.what()); @@ -176,7 +179,10 @@ void DataHandlers::handle_get_data_item(const httplib::Request & req, httplib::R auto type_introspection = data_access_mgr->get_type_introspection(); try { auto type_info = type_introspection->get_type_info(sample.message_type); - ext.type_info(type_info.schema); + json type_info_obj; + type_info_obj["schema"] = type_info.schema; + type_info_obj["default_value"] = type_info.default_value; + ext.type_info(type_info_obj); } catch (const std::exception & e) { RCLCPP_DEBUG(HandlerContext::logger(), "Could not get type info for topic '%s': %s", full_topic_path.c_str(), e.what()); diff --git a/src/ros2_medkit_gateway/src/http/handlers/fault_handlers.cpp b/src/ros2_medkit_gateway/src/http/handlers/fault_handlers.cpp index 14d6268..b1f92fd 100644 --- a/src/ros2_medkit_gateway/src/http/handlers/fault_handlers.cpp +++ b/src/ros2_medkit_gateway/src/http/handlers/fault_handlers.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -89,6 +90,35 @@ std::string validate_fault_code(const std::string & fault_code) { return ""; } +/// Helper to filter faults JSON array by a set of namespace prefixes +/// Keeps faults where any reporting_source starts with any of the given prefixes +json filter_faults_by_sources(const json & faults_array, const std::set & source_prefixes) { + json filtered = json::array(); + for (const auto & fault : faults_array) { + if (!fault.contains("reporting_sources")) { + continue; + } + const auto & sources = fault["reporting_sources"]; + bool matches = false; + for (const auto & src : sources) { + const std::string src_str = src.get(); + for (const auto & prefix : source_prefixes) { + if (src_str.rfind(prefix, 0) == 0) { + matches = true; + break; + } + } + if (matches) { + break; + } + } + if (matches) { + filtered.push_back(fault); + } + } + return filtered; +} + } // namespace void FaultHandlers::handle_list_all_faults(const httplib::Request & req, httplib::Response & res) { @@ -170,7 +200,6 @@ void FaultHandlers::handle_list_faults(const httplib::Request & req, httplib::Re {{"entity_id", entity_id}}); return; } - std::string namespace_path = entity_info.namespace_path; auto filter = parse_fault_status_param(req); if (!filter.is_valid) { @@ -188,6 +217,92 @@ void FaultHandlers::handle_list_faults(const httplib::Request & req, httplib::Re bool include_clusters = req.get_param_value("include_clusters") == "true"; auto fault_mgr = ctx_.node()->get_fault_manager(); + + // For Functions, aggregate faults from all host apps + // Functions don't have a single namespace_path - they host apps from potentially different namespaces + if (entity_info.type == EntityType::FUNCTION) { + // Get all faults (no namespace filter) + auto result = fault_mgr->get_faults("", filter.include_pending, filter.include_confirmed, filter.include_cleared, + include_muted, include_clusters); + + if (!result.success) { + HandlerContext::send_error(res, StatusCode::ServiceUnavailable_503, ERR_SERVICE_UNAVAILABLE, + "Failed to get faults", + {{"details", result.error_message}, {entity_info.id_field, entity_id}}); + return; + } + + // Collect host app FQNs for filtering + const auto & cache = ctx_.node()->get_thread_safe_cache(); + auto agg_configs = cache.get_entity_configurations(entity_id); + std::set host_fqns; + for (const auto & node : agg_configs.nodes) { + if (!node.node_fqn.empty()) { + host_fqns.insert(node.node_fqn); + } + } + + // Filter faults to only those from function's host apps + json filtered_faults = filter_faults_by_sources(result.data["faults"], host_fqns); + + // Build response + json response = {{"items", filtered_faults}}; + + XMedkit ext; + ext.entity_id(entity_id); + ext.add("aggregation_level", "function"); + ext.add("count", filtered_faults.size()); + ext.add("host_count", host_fqns.size()); + + response["x-medkit"] = ext.build(); + HandlerContext::send_json(res, response); + return; + } + + // For Components, aggregate faults from all hosted apps + // Components group Apps, so we filter by the apps' FQNs rather than namespace (which is too broad) + if (entity_info.type == EntityType::COMPONENT) { + // Get all faults (no namespace filter) + auto result = fault_mgr->get_faults("", filter.include_pending, filter.include_confirmed, filter.include_cleared, + include_muted, include_clusters); + + if (!result.success) { + HandlerContext::send_error(res, StatusCode::ServiceUnavailable_503, ERR_SERVICE_UNAVAILABLE, + "Failed to get faults", + {{"details", result.error_message}, {entity_info.id_field, entity_id}}); + return; + } + + // Collect hosted app FQNs for filtering + const auto & cache = ctx_.node()->get_thread_safe_cache(); + auto app_ids = cache.get_apps_for_component(entity_id); + std::set app_fqns; + for (const auto & app_id : app_ids) { + auto app = cache.get_app(app_id); + if (app && app->bound_fqn.has_value() && !app->bound_fqn->empty()) { + app_fqns.insert(app->bound_fqn.value()); + } + } + + // Filter faults to only those from component's hosted apps + json filtered_faults = filter_faults_by_sources(result.data["faults"], app_fqns); + + // Build response + json response = {{"items", filtered_faults}}; + + XMedkit ext; + ext.entity_id(entity_id); + ext.add("aggregation_level", "component"); + ext.add("count", filtered_faults.size()); + ext.add("app_count", app_fqns.size()); + + response["x-medkit"] = ext.build(); + HandlerContext::send_json(res, response); + return; + } + + // For other entity types (App, Area), use namespace_path filtering + std::string namespace_path = entity_info.namespace_path; auto result = fault_mgr->get_faults(namespace_path, filter.include_pending, filter.include_confirmed, filter.include_cleared, include_muted, include_clusters); diff --git a/src/ros2_medkit_gateway/src/http/handlers/handler_context.cpp b/src/ros2_medkit_gateway/src/http/handlers/handler_context.cpp index 83ce391..afe882c 100644 --- a/src/ros2_medkit_gateway/src/http/handlers/handler_context.cpp +++ b/src/ros2_medkit_gateway/src/http/handlers/handler_context.cpp @@ -101,8 +101,8 @@ EntityInfo HandlerContext::get_entity_info(const std::string & entity_id) const // Search areas (O(1) lookup) if (auto area = cache.get_area(entity_id)) { info.type = EntityType::AREA; - info.namespace_path = ""; // Areas don't have namespace_path - info.fqn = ""; + info.namespace_path = area->namespace_path; // Use area's namespace for fault filtering + info.fqn = area->namespace_path; info.id_field = "area_id"; info.error_name = "Area"; return info; diff --git a/src/ros2_medkit_gateway/src/native_topic_sampler.cpp b/src/ros2_medkit_gateway/src/native_topic_sampler.cpp index 423d666..5034bb9 100644 --- a/src/ros2_medkit_gateway/src/native_topic_sampler.cpp +++ b/src/ros2_medkit_gateway/src/native_topic_sampler.cpp @@ -280,24 +280,17 @@ TopicSampleResult NativeTopicSampler::sample_topic(const std::string & topic_nam RCLCPP_DEBUG(node_->get_logger(), "sample_topic: Created GenericSubscription for '%s'", topic_name.c_str()); - // Spin with timeout + // Wait for message using future with timeout + // The main executor will deliver callbacks - we just wait for the result + // Note: This works because HTTP requests are handled in a separate thread, + // while the main executor processes ROS callbacks const auto timeout = std::chrono::duration(timeout_sec); - const auto start_time = std::chrono::steady_clock::now(); - - while (!received.load()) { - auto elapsed = std::chrono::steady_clock::now() - start_time; - if (elapsed >= timeout) { - RCLCPP_DEBUG(node_->get_logger(), "sample_topic: Timeout waiting for message on '%s'", topic_name.c_str()); - result.has_data = false; - return result; - } - - // Spin with small timeout to allow checking received flag - rclcpp::spin_some(node_->get_node_base_interface()); + auto future_status = message_future.wait_for(timeout); - if (!received.load()) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } + if (future_status == std::future_status::timeout) { + RCLCPP_DEBUG(node_->get_logger(), "sample_topic: Timeout waiting for message on '%s'", topic_name.c_str()); + result.has_data = false; + return result; } // Deserialize message using JsonSerializer