Skip to main content

CommonLibrary/Transport/
TransportStrategy.rs

1#![allow(non_snake_case, non_camel_case_types, non_upper_case_globals)]
2//! # TransportStrategy Trait
3//!
4//! Defines the core trait that all transport implementations must implement.
5//! This trait provides a unified, transport-agnostic interface for sending
6//! requests and notifications, with optional event streaming capabilities.
7//!
8//! All transports must be async and thread-safe (`Send + Sync`).
9
10use async_trait::async_trait;
11use serde::{Deserialize, Serialize};
12
13use super::{
14	Common::TransportType,
15	TransportConfig::TransportConfig,
16	TransportError::TransportError,
17	UnifiedRequest::UnifiedRequest,
18	UnifiedResponse::UnifiedResponse,
19};
20
21/// Core transport strategy trait.
22///
23/// This trait defines the essential operations that any transport mechanism
24/// must provide. Components interact with transports through this trait,
25/// allowing them to be transport-agnostic.
26#[async_trait]
27pub trait TransportStrategy: Send + Sync {
28	/// Establishes a connection to the transport endpoint.
29	async fn Connect(&mut self) -> Result<(), TransportError>;
30
31	/// Closes the connection and releases any associated resources.
32	async fn Disconnect(&mut self) -> Result<(), TransportError>;
33
34	/// Sends a request and waits for a response.
35	async fn SendRequest(&mut self, Request:UnifiedRequest) -> Result<UnifiedResponse, TransportError>;
36
37	/// Sends a notification (fire-and-forget message).
38	async fn SendNotification(&mut self, Notification:UnifiedRequest) -> Result<(), TransportError>;
39
40	/// Creates a stream of events from the transport.
41	fn StreamEvents(&self)
42	-> std::result::Result<futures::stream::BoxStream<'static, UnifiedResponse>, TransportError>;
43
44	/// Checks if the transport is currently connected.
45	fn IsConnected(&self) -> bool;
46
47	/// Returns the estimated round-trip latency in milliseconds.
48	fn LatencyMilliseconds(&self) -> u64;
49
50	/// Returns the type of transport (gRPC, IPC, WASM, etc.).
51	fn TransportKind(&self) -> TransportType;
52
53	/// Returns the transport's configuration.
54	fn Configuration(&self) -> &TransportConfig;
55
56	/// Checks if the transport supports bidirectional streaming.
57	fn SupportsStreaming(&self) -> bool;
58
59	/// Returns the transport's current capabilities and limits.
60	fn Capabilities(&self) -> TransportCapabilities;
61
62	/// Collects and returns current performance metrics.
63	fn Metrics(&self) -> TransportMetrics;
64}
65
66/// Transport capabilities and limits.
67#[derive(Debug, Clone, Copy, PartialEq, Eq)]
68pub struct TransportCapabilities {
69	/// Maximum size of a single message in bytes.
70	pub MaximumMessageSize:usize,
71
72	/// Whether the transport supports request-response pattern.
73	pub SupportsRequestResponse:bool,
74
75	/// Whether the transport supports server-side streaming.
76	pub SupportsServerStreaming:bool,
77
78	/// Whether the transport supports client-side streaming.
79	pub SupportsClientStreaming:bool,
80
81	/// Whether the transport supports bidirectional streaming.
82	pub SupportsBidirectionalStreaming:bool,
83
84	/// Whether the transport supports broadcast/notifications.
85	pub SupportsNotifications:bool,
86
87	/// Estimated maximum concurrent requests/connections.
88	pub MaximumConcurrent:usize,
89
90	/// Whether the transport requires network connectivity.
91	pub RequiresNetwork:bool,
92
93	/// Whether the transport supports encryption/TLS.
94	pub SupportsEncryption:bool,
95
96	/// Whether the transport supports compression.
97	pub SupportsCompression:bool,
98}
99
100impl Default for TransportCapabilities {
101	fn default() -> Self {
102		Self {
103			MaximumMessageSize:1024 * 1024, // 1MB
104			SupportsRequestResponse:true,
105
106			SupportsServerStreaming:false,
107
108			SupportsClientStreaming:false,
109
110			SupportsBidirectionalStreaming:false,
111
112			SupportsNotifications:true,
113
114			MaximumConcurrent:100,
115
116			RequiresNetwork:false,
117
118			SupportsEncryption:false,
119
120			SupportsCompression:false,
121		}
122	}
123}
124
125/// Transport performance metrics.
126#[derive(Debug, Clone, Default)]
127pub struct TransportMetrics {
128	/// Total number of requests sent (including retries).
129	pub RequestsTotal:u64,
130
131	/// Total number of successful requests (2xx/OK responses).
132	pub RequestsSuccessful:u64,
133
134	/// Total number of failed requests (excludes timeouts/retries).
135	pub RequestsFailed:u64,
136
137	/// Total number of notifications sent.
138	pub NotificationsSent:u64,
139
140	/// Total number of connections established (includes reconnections).
141	pub ConnectionsEstablished:u64,
142
143	/// Total number of connection failures.
144	pub ConnectionFailures:u64,
145
146	/// Total bytes sent (compressed size if compression enabled).
147	pub BytesSent:u64,
148
149	/// Total bytes received (compressed size if compression enabled).
150	pub BytesReceived:u64,
151
152	/// Counter for circuit breaker state changes.
153	pub CircuitBreakerState:u32,
154
155	/// Histogram of request latencies in milliseconds (p50, p95, p99).
156	/// Stored as (count, sum, sum of squares) for online variance calculation.
157	pub LatencyMillisecondsHistogram:Option<(u64, f64, f64)>,
158
159	/// Current active connections (gauge).
160	pub ActiveConnections:u32,
161
162	/// Current pending requests (gauge).
163	pub PendingRequests:u32,
164}
165
166impl TransportMetrics {
167	/// Creates a new, empty metrics container.
168	pub fn New() -> Self { Self::default() }
169
170	/// Resets all cumulative metrics to zero.
171	pub fn Reset(&mut self) { *self = Self::New(); }
172
173	/// Computes the success rate as a percentage (0-100).
174	pub fn SuccessRate(&self) -> Option<f64> {
175		let Total = self.RequestsTotal;
176
177		if Total == 0 {
178			None
179		} else {
180			Some((self.RequestsSuccessful as f64 / Total as f64) * 100.0)
181		}
182	}
183
184	/// Computes the average request latency in milliseconds.
185	pub fn AverageLatency(&self) -> Option<f64> {
186		let (Count, Sum, _) = self.LatencyMillisecondsHistogram?;
187
188		if Count == 0 { None } else { Some(Sum / Count as f64) }
189	}
190
191	/// Computes the 95th percentile latency from the histogram.
192	pub fn LatencyPercentile95(&self) -> Option<f64> {
193		let (Count, Mean, SumSquared) = self.LatencyMillisecondsHistogram?;
194
195		if Count < 20 {
196			return None;
197		}
198
199		let Variance = (SumSquared / Count as f64) - (Mean * Mean);
200
201		let StandardDeviation = Variance.sqrt();
202
203		Some(Mean + 1.645 * StandardDeviation)
204	}
205
206	/// Records a request latency sample.
207	pub fn RecordLatency(&mut self, LatencyMilliseconds:f64) {
208		let (Count, Sum, SumSquared) = self.LatencyMillisecondsHistogram.get_or_insert((0, 0.0, 0.0));
209
210		*Count += 1;
211		*Sum += LatencyMilliseconds;
212		*SumSquared += LatencyMilliseconds * LatencyMilliseconds;
213	}
214
215	/// Increments the RequestsTotal and RequestsSuccessful counters.
216	pub fn IncrementRequestSuccess(&mut self) {
217		self.RequestsTotal += 1;
218
219		self.RequestsSuccessful += 1;
220	}
221
222	/// Increments the RequestsTotal and RequestsFailed counters.
223	pub fn IncrementRequestFailure(&mut self) {
224		self.RequestsTotal += 1;
225
226		self.RequestsFailed += 1;
227	}
228
229	/// Updates the circuit breaker state.
230	pub fn SetCircuitBreakerState(&mut self, State:CircuitBreakerState) {
231		let StateCode = match State {
232			CircuitBreakerState::Closed => 1,
233
234			CircuitBreakerState::Open => 0,
235
236			CircuitBreakerState::HalfOpen => 2,
237		};
238
239		let OldState = self.CircuitBreakerState;
240
241		self.CircuitBreakerState = (OldState & 0xFFFF_0000) | StateCode as u32;
242	}
243}
244
245/// Circuit breaker state.
246#[derive(Debug, Clone, Copy, PartialEq, Eq)]
247pub enum CircuitBreakerState {
248	/// Circuit is closed; requests flow normally.
249	Closed,
250
251	/// Circuit is open; requests are rejected immediately.
252	Open,
253
254	/// Circuit is half-open; limited requests are allowed to test recovery.
255	HalfOpen,
256}
257
258/// Transport-specific error codes.
259#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
260#[repr(u16)]
261pub enum TransportErrorCode {
262	/// Connection to endpoint failed or was lost.
263	ConnectionFailed = 100,
264
265	/// Operation timed out.
266	Timeout = 101,
267
268	/// Target endpoint not found/service unavailable.
269	NotFound = 102,
270
271	/// Invalid request format or parameters.
272	InvalidRequest = 103,
273
274	/// Remote endpoint returned an application error.
275	RemoteError = 104,
276
277	/// Message too large for transport.
278	MessageTooLarge = 105,
279
280	/// Encryption/decryption failed.
281	EncryptionError = 106,
282
283	/// Serialization/deserialization failed.
284	SerializationError = 107,
285
286	/// Authentication/authorization failed.
287	Unauthorized = 108,
288
289	/// Rate limit exceeded.
290	RateLimited = 109,
291
292	/// Feature not supported by this transport.
293	NotSupported = 110,
294
295	/// Internal transport error (bug, corrupted state).
296	InternalError = 111,
297
298	/// Circuit breaker is open; request rejected.
299	CircuitBreakerOpen = 112,
300
301	/// Stream already in use or closed.
302	StreamError = 113,
303
304	/// Configuration error (invalid settings).
305	ConfigurationError = 114,
306}
307
308impl TransportErrorCode {
309	/// Returns `true` if this error code is retryable.
310	pub fn IsRetryable(&self) -> bool {
311		matches!(
312			self,
313			TransportErrorCode::ConnectionFailed
314				| TransportErrorCode::Timeout
315				| TransportErrorCode::RateLimited
316				| TransportErrorCode::RemoteError
317		)
318	}
319
320	/// Returns the recommended retry delay in milliseconds for this error.
321	pub fn RecommendedRetryDelayMilliseconds(&self) -> u64 {
322		match self {
323			TransportErrorCode::ConnectionFailed => 1000,
324
325			TransportErrorCode::Timeout => 500,
326
327			TransportErrorCode::RateLimited => 2000,
328
329			TransportErrorCode::RemoteError => 300,
330
331			_ => 0,
332		}
333	}
334}
335
336#[cfg(test)]
337mod tests {
338
339	use super::*;
340
341	#[test]
342	fn TestRetryableErrorCodes() {
343		assert!(TransportErrorCode::ConnectionFailed.IsRetryable());
344
345		assert!(TransportErrorCode::Timeout.IsRetryable());
346
347		assert!(TransportErrorCode::RateLimited.IsRetryable());
348
349		assert!(!TransportErrorCode::InvalidRequest.IsRetryable());
350
351		assert!(!TransportErrorCode::NotFound.IsRetryable());
352	}
353
354	#[test]
355	fn TestErrorRecommendedDelays() {
356		assert_eq!(TransportErrorCode::ConnectionFailed.RecommendedRetryDelayMilliseconds(), 1000);
357
358		assert_eq!(TransportErrorCode::Timeout.RecommendedRetryDelayMilliseconds(), 500);
359
360		assert_eq!(TransportErrorCode::RateLimited.RecommendedRetryDelayMilliseconds(), 2000);
361
362		assert_eq!(TransportErrorCode::InvalidRequest.RecommendedRetryDelayMilliseconds(), 0);
363	}
364}