Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 65 additions & 5 deletions packages/core-bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/core-bridge/sdk-core
Submodule sdk-core updated 102 files
41 changes: 16 additions & 25 deletions packages/core-bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,12 +656,10 @@ where
mod config {
use std::collections::HashMap;

use anyhow::Context as _;

use temporalio_client::HttpConnectProxyOptions;
use temporalio_sdk_core::{
ClientOptions as CoreClientOptions, ClientOptionsBuilder,
ClientTlsConfig as CoreClientTlsConfig, TlsConfig as CoreTlsConfig, Url,
ClientOptions as CoreClientOptions, ClientTlsOptions as CoreClientTlsOptions,
TlsOptions as CoreTlsOptions, Url,
};

use bridge_macros::TryFromJs;
Expand All @@ -673,7 +671,7 @@ mod config {
target_url: Url,
client_name: String,
client_version: String,
tls: Option<TlsConfig>,
tls: Option<TlsOptions>,
http_connect_proxy: Option<HttpConnectProxy>,
headers: Option<HashMap<String, MetadataValue>>,
api_key: Option<String>,
Expand All @@ -682,14 +680,14 @@ mod config {

#[derive(Debug, Clone, TryFromJs)]
#[allow(clippy::struct_field_names)]
struct TlsConfig {
struct TlsOptions {
domain: Option<String>,
server_root_ca_cert: Option<Vec<u8>>,
client_tls_config: Option<TlsConfigClientCertPair>,
client_tls_options: Option<TlsOptionsClientCertPair>,
}

#[derive(Debug, Clone, TryFromJs)]
struct TlsConfigClientCertPair {
struct TlsOptionsClientCertPair {
client_cert: Vec<u8>,
client_private_key: Vec<u8>,
}
Expand All @@ -709,42 +707,35 @@ mod config {
impl TryInto<CoreClientOptions> for ClientOptions {
type Error = BridgeError;
fn try_into(self) -> Result<CoreClientOptions, Self::Error> {
let mut builder = ClientOptionsBuilder::default();

if let Some(tls) = self.tls {
builder.tls_cfg(tls.into());
}

let (ascii_headers, bin_headers) = partition_headers(self.headers);

let client_options = builder
let client_options = CoreClientOptions::builder()
.target_url(self.target_url)
.client_name(self.client_name)
.client_version(self.client_version)
// tls_cfg -- above
.http_connect_proxy(self.http_connect_proxy.map(Into::into))
.headers(ascii_headers)
.binary_headers(bin_headers)
.api_key(self.api_key)
.maybe_tls_options(self.tls.map(Into::into))
.maybe_http_connect_proxy(self.http_connect_proxy.map(Into::into))
.maybe_headers(ascii_headers)
.maybe_binary_headers(bin_headers)
.maybe_api_key(self.api_key)
.disable_error_code_metric_tags(self.disable_error_code_metric_tags)
// identity -- skipped: will be set on worker
// retry_config -- skipped: worker overrides anyway
// override_origin -- skipped: will default to tls_cfg.domain
// keep_alive -- skipped: defaults to true; is there any reason to disable this?
// skip_get_system_info -- skipped: defaults to false; is there any reason to set this?
.build()
.context("Invalid Client options")?;
.build();

Ok(client_options)
}
}

impl From<TlsConfig> for CoreTlsConfig {
fn from(val: TlsConfig) -> Self {
impl From<TlsOptions> for CoreTlsOptions {
fn from(val: TlsOptions) -> Self {
Self {
domain: val.domain,
server_root_ca_cert: val.server_root_ca_cert,
client_tls_config: val.client_tls_config.map(|pair| CoreClientTlsConfig {
client_tls_options: val.client_tls_options.map(|pair| CoreClientTlsOptions {
client_cert: pair.client_cert,
client_private_key: pair.client_private_key,
}),
Expand Down
31 changes: 23 additions & 8 deletions packages/core-bridge/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,13 @@ pub struct Runtime {
pub fn runtime_new(
bridge_options: config::RuntimeOptions,
) -> BridgeResult<OpaqueOutboundHandle<Runtime>> {
let (telemetry_options, metrics_options, logging_options) = bridge_options.try_into()?;
let (telemetry_options, metrics_options, logging_options, worker_heartbeat_interval_millis) =
bridge_options.try_into()?;

// Create core runtime which starts tokio multi-thread runtime
let runtime_options = RuntimeOptionsBuilder::default()
.telemetry_options(telemetry_options)
.heartbeat_interval(None)
.heartbeat_interval(worker_heartbeat_interval_millis.map(Duration::from_millis))
.build()
.context("Failed to build runtime options")?;
let mut core_runtime = CoreRuntime::new(runtime_options, TokioRuntimeBuilder::default())
Expand Down Expand Up @@ -266,6 +267,7 @@ mod config {
log_exporter: LogExporterOptions,
telemetry: TelemetryOptions,
metrics_exporter: Option<MetricsExporterOptions>,
worker_heartbeat_interval_millis: Option<u64>,
}

#[derive(Debug, Clone, TryFromJs)]
Expand Down Expand Up @@ -322,6 +324,7 @@ mod config {
CoreTelemetryOptions,
Option<super::BridgeMetricsExporter>,
super::BridgeLogExporter,
Option<u64>,
)> for RuntimeOptions
{
type Error = BridgeError;
Expand All @@ -331,8 +334,16 @@ mod config {
CoreTelemetryOptions,
Option<super::BridgeMetricsExporter>,
super::BridgeLogExporter,
Option<u64>,
)> {
let (telemetry_logger, log_exporter) = match self.log_exporter {
let Self {
log_exporter,
telemetry,
metrics_exporter,
worker_heartbeat_interval_millis,
} = self;

let (telemetry_logger, log_exporter) = match log_exporter {
LogExporterOptions::Console { filter } => (
CoreTelemetryLogger::Console { filter },
BridgeLogExporter::Console,
Expand All @@ -352,17 +363,21 @@ mod config {
let mut telemetry_options = TelemetryOptionsBuilder::default();
let telemetry_options = telemetry_options
.logging(telemetry_logger)
.metric_prefix(self.telemetry.metric_prefix)
.attach_service_name(self.telemetry.attach_service_name)
.metric_prefix(telemetry.metric_prefix)
.attach_service_name(telemetry.attach_service_name)
.build()
.context("Failed to build telemetry options")?;

let metrics_exporter = self
.metrics_exporter
let metrics_exporter = metrics_exporter
.map(std::convert::TryInto::try_into)
.transpose()?;

Ok((telemetry_options, metrics_exporter, log_exporter))
Ok((
telemetry_options,
metrics_exporter,
log_exporter,
worker_heartbeat_interval_millis,
))
}
}

Expand Down
45 changes: 41 additions & 4 deletions packages/core-bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ pub fn worker_complete_workflow_activation(
),
}
}
CompleteWfError::WorkflowNotEnabled => {
BridgeError::UnexpectedError(err.to_string())
}
})
})
}
Expand Down Expand Up @@ -225,6 +228,9 @@ pub fn worker_complete_activity_task(
field: None,
message: format!("Malformed Activity Completion: {reason:?}"),
},
CompleteActivityError::ActivityNotEnabled => {
BridgeError::UnexpectedError(err.to_string())
}
})
})
}
Expand Down Expand Up @@ -296,7 +302,7 @@ pub fn worker_complete_nexus_task(
.await
.map_err(|err| match err {
CompleteNexusError::NexusNotEnabled => {
BridgeError::UnexpectedError(format!("{err}"))
BridgeError::UnexpectedError(err.to_string())
}
CompleteNexusError::MalformedNexusCompletion { reason } => BridgeError::TypeError {
field: None,
Expand Down Expand Up @@ -463,9 +469,10 @@ impl MutableFinalize for HistoryForReplayTunnelHandle {}
////////////////////////////////////////////////////////////////////////////////////////////////////

mod config {
use std::collections::HashSet;
use std::{sync::Arc, time::Duration};

use temporalio_common::protos::temporal::api::enums::v1::VersioningBehavior as CoreVersioningBehavior;
use temporalio_common::protos::temporal::api::worker::v1::PluginInfo;
use temporalio_common::worker::{
ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind,
PollerBehavior as CorePollerBehavior, SlotKind, WorkerConfig, WorkerConfigBuilder,
Expand Down Expand Up @@ -499,14 +506,15 @@ mod config {
workflow_task_poller_behavior: PollerBehavior,
activity_task_poller_behavior: PollerBehavior,
nexus_task_poller_behavior: PollerBehavior,
enable_non_local_activities: bool,
task_types: WorkerTaskTypes,
sticky_queue_schedule_to_start_timeout: Duration,
max_cached_workflows: usize,
max_heartbeat_throttle_interval: Duration,
default_heartbeat_throttle_interval: Duration,
max_activities_per_second: Option<f64>,
max_task_queue_activities_per_second: Option<f64>,
shutdown_grace_time: Option<Duration>,
plugins: Vec<String>,
Copy link
Member

@cretz cretz Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should consider deduping plugins if we don't already. Depending on plugin implementation, it may be very normal to configure multiple of the same plugin on the same worker (maybe each has different settings).

This should be done in every SDK IMO too if we don't already (could consider making that Vec a Set on the Core side).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, a set would ensure deduping. Created temporalio/sdk-core#1072

}

#[derive(TryFromJs)]
Expand Down Expand Up @@ -540,6 +548,26 @@ mod config {
AutoUpgrade,
}

#[derive(TryFromJs)]
#[allow(clippy::struct_excessive_bools)]
pub struct WorkerTaskTypes {
enable_workflows: bool,
enable_local_activities: bool,
enable_remote_activities: bool,
enable_nexus: bool,
}

impl From<&WorkerTaskTypes> for temporalio_common::worker::WorkerTaskTypes {
fn from(t: &WorkerTaskTypes) -> Self {
Self {
enable_workflows: t.enable_workflows,
enable_local_activities: t.enable_local_activities,
enable_remote_activities: t.enable_remote_activities,
enable_nexus: t.enable_nexus,
}
}
}

impl BridgeWorkerOptions {
pub(crate) fn into_core_config(self) -> Result<WorkerConfig, WorkerConfigBuilderError> {
// Set all other options
Expand All @@ -566,14 +594,23 @@ mod config {
.workflow_task_poller_behavior(self.workflow_task_poller_behavior)
.activity_task_poller_behavior(self.activity_task_poller_behavior)
.nexus_task_poller_behavior(self.nexus_task_poller_behavior)
.no_remote_activities(!self.enable_non_local_activities)
.task_types(&self.task_types)
.sticky_queue_schedule_to_start_timeout(self.sticky_queue_schedule_to_start_timeout)
.max_cached_workflows(self.max_cached_workflows)
.max_heartbeat_throttle_interval(self.max_heartbeat_throttle_interval)
.default_heartbeat_throttle_interval(self.default_heartbeat_throttle_interval)
.max_task_queue_activities_per_second(self.max_task_queue_activities_per_second)
.max_worker_activities_per_second(self.max_activities_per_second)
.graceful_shutdown_period(self.shutdown_grace_time)
.plugins(
self.plugins
.into_iter()
.map(|name| PluginInfo {
name,
version: String::new(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my own knowledge, is version from the past or not yet implemented?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, I don't know. haha. The API was originally implemented with version, but not sure what the intent was. Will follow up on this.

})
.collect::<HashSet<_>>(),
)
.build()
}
}
Expand Down
Loading