CommonLibrary/Telemetry/
EmitOTLPSpan.rs1#![allow(non_snake_case)]
2
3use std::{
10 collections::hash_map::DefaultHasher,
11 hash::{Hash, Hasher},
12 sync::{
13 OnceLock,
14 atomic::{AtomicBool, Ordering},
15 },
16 time::{SystemTime, UNIX_EPOCH},
17};
18
19use crate::Telemetry::{Client, IsAllowed};
20
21static OTLP_AVAILABLE:AtomicBool = AtomicBool::new(true);
22
23static OTLP_TRACE_ID:OnceLock<String> = OnceLock::new();
24
25fn NowNano() -> u64 {
26 SystemTime::now()
27 .duration_since(UNIX_EPOCH)
28 .map(|D| D.as_nanos() as u64)
29 .unwrap_or(0)
30}
31
32fn TraceId() -> &'static str {
33 OTLP_TRACE_ID.get_or_init(|| {
34 let mut H = DefaultHasher::new();
35 std::process::id().hash(&mut H);
36 NowNano().hash(&mut H);
37 format!("{:032x}", H.finish() as u128)
38 })
39}
40
41fn RandU64() -> u64 {
42 let mut H = DefaultHasher::new();
43
44 std::thread::current().id().hash(&mut H);
45
46 NowNano().hash(&mut H);
47
48 H.finish()
49}
50
51fn ParseEndpoint(Endpoint:&str) -> (String, String) {
52 let WithoutScheme = Endpoint
53 .strip_prefix("http://")
54 .or_else(|| Endpoint.strip_prefix("https://"))
55 .unwrap_or(Endpoint);
56
57 let (HostPort, Path) = match WithoutScheme.split_once('/') {
58 Some((HP, Rest)) => (HP.to_string(), format!("/{}", Rest.trim_start_matches('/'))),
59
60 None => (WithoutScheme.to_string(), "/v1/traces".to_string()),
61 };
62
63 let PathFinal = if Path == "/" { "/v1/traces".to_string() } else { Path };
64
65 (HostPort, PathFinal)
66}
67
68pub fn Fn(Name:&str, StartNano:u64, EndNano:u64, Attributes:&[(&str, &str)]) {
71 if !IsAllowed::OTLP() {
72 return;
73 }
74
75 if !OTLP_AVAILABLE.load(Ordering::Relaxed) {
76 return;
77 }
78
79 let Configuration = IsAllowed::Cached();
80
81 let TierStr = Client::TIER.get().map(|T| T.AsStr()).unwrap_or("common");
82
83 let SpanId = format!("{:016x}", RandU64());
84
85 let TraceIdString = TraceId().to_string();
86
87 let SpanName = Name.to_string();
88
89 let AttributesJson:Vec<String> = Attributes
90 .iter()
91 .map(|(K, V)| {
92 format!(
93 r#"{{"key":"{}","value":{{"stringValue":"{}"}}}}"#,
94 K,
95 V.replace('\\', "\\\\").replace('"', "\\\"")
96 )
97 })
98 .collect();
99
100 let IsError = SpanName.contains("error");
101
102 let StatusCode = if IsError { 2 } else { 1 };
103
104 let ServiceName = format!("land-editor-{}", TierStr);
105
106 let Payload = format!(
107 concat!(
108 r#"{{"resourceSpans":[{{"resource":{{"attributes":["#,
109 r#"{{"key":"service.name","value":{{"stringValue":"{}"}}}},"#,
110 r#"{{"key":"service.version","value":{{"stringValue":"0.0.1"}}}},"#,
111 r#"{{"key":"land.tier","value":{{"stringValue":"{}"}}}}"#,
112 r#"]}},"scopeSpans":[{{"scope":{{"name":"land.{}","version":"1.0.0"}},"#,
113 r#""spans":[{{"traceId":"{}","spanId":"{}","name":"{}","kind":1,"#,
114 r#""startTimeUnixNano":"{}","endTimeUnixNano":"{}","#,
115 r#""attributes":[{}],"status":{{"code":{}}}}}]}}]}}]}}"#,
116 ),
117 ServiceName,
118 TierStr,
119 TierStr,
120 TraceIdString,
121 SpanId,
122 SpanName,
123 StartNano,
124 EndNano,
125 AttributesJson.join(","),
126 StatusCode,
127 );
128
129 let (HostAddress, PathSegment) = ParseEndpoint(&Configuration.OTLPEndpoint);
130
131 std::thread::spawn(move || {
132 use std::{
133 io::{Read as IoRead, Write as IoWrite},
134 net::TcpStream,
135 time::Duration,
136 };
137
138 let Ok(SocketAddress) = HostAddress.parse() else {
139 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
140 return;
141 };
142 let Ok(mut Stream) = TcpStream::connect_timeout(&SocketAddress, Duration::from_millis(200)) else {
143 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
144 return;
145 };
146 let _ = Stream.set_write_timeout(Some(Duration::from_millis(200)));
147 let _ = Stream.set_read_timeout(Some(Duration::from_millis(200)));
148
149 let HttpReq = format!(
150 "POST {} HTTP/1.1\r\nHost: {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: \
151 close\r\n\r\n",
152 PathSegment,
153 HostAddress,
154 Payload.len()
155 );
156 if Stream.write_all(HttpReq.as_bytes()).is_err() {
157 return;
158 }
159 if Stream.write_all(Payload.as_bytes()).is_err() {
160 return;
161 }
162 let mut Buf = [0u8; 32];
163 let _ = Stream.read(&mut Buf);
164 if !(Buf.starts_with(b"HTTP/1.1 2") || Buf.starts_with(b"HTTP/1.0 2")) {
165 OTLP_AVAILABLE.store(false, Ordering::Relaxed);
166 }
167 });
168}
169
170pub fn NowNanoPub() -> u64 { NowNano() }