Timing Handler
The Timing Handler provides actors with time-related capabilities, including delays, periodic scheduling, and timeout management. It enables actors to control the timing of their operations while maintaining Theater's state verification model.
Overview
The Timing Handler implements the ntwk:theater/timing interface, enabling actors to:
- Introduce controlled delays in their execution
- Implement timeout patterns for operations
- Enforce rate limits and throttling
- Create periodic tasks and scheduled operations
Configuration
To use the Timing Handler, add it to your actor's manifest:
[[handlers]]
type = "timing"
# max_sleep_duration: maximum sleep duration in milliseconds (1 hour)
# min_sleep_duration: minimum sleep duration in milliseconds
# TOML inline tables must stay on a single line
config = { max_sleep_duration = 3600000, min_sleep_duration = 1 }
Configuration options:
- max_sleep_duration: (Optional) Maximum allowed sleep duration in milliseconds. Defaults to 3600000 (1 hour).
- min_sleep_duration: (Optional) Minimum allowed sleep duration in milliseconds. Defaults to 1.
Interface
The Timing Handler is defined using the following WIT interface:
interface timing {
    // Sleep for the specified duration (in milliseconds)
    sleep: func(duration-ms: u64) -> result<_, string>;

    // Get current timestamp (milliseconds since epoch)
    now: func() -> u64;

    // Get high-resolution time for performance measurement (in nanoseconds)
    performance-now: func() -> u64;
}
Basic Timing Operations
Sleep
The sleep function pauses actor execution for a specified duration:
// Sleep for 1 second
match timing::sleep(1000) {
    Ok(_) => {
        println!("Resumed after 1 second");
    }
    Err(error) => {
        println!("Sleep operation failed: {}", error);
    }
}
Note that the sleep duration must fall within the configured min_sleep_duration and max_sleep_duration range. Attempting to sleep for longer than the maximum or shorter than the minimum results in an error.
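The range check can also be mirrored on the caller's side, so that an out-of-range request is caught before it reaches the handler. The following is a minimal sketch, not the handler's actual implementation: `SleepLimits` and `check_sleep_duration` are hypothetical names, and the defaults are simply the documented ones.

```rust
/// Hypothetical caller-side mirror of the handler's sleep limits (not a Theater type).
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct SleepLimits {
    pub min_ms: u64,
    pub max_ms: u64,
}

impl Default for SleepLimits {
    fn default() -> Self {
        // Documented defaults: 1 ms minimum, 3600000 ms (1 hour) maximum
        Self { min_ms: 1, max_ms: 3_600_000 }
    }
}

/// Returns the duration unchanged if it lies within the limits,
/// otherwise an error message naming the bound that was violated.
pub fn check_sleep_duration(duration_ms: u64, limits: SleepLimits) -> Result<u64, String> {
    if duration_ms < limits.min_ms {
        Err(format!(
            "sleep of {} ms is below the minimum of {} ms",
            duration_ms, limits.min_ms
        ))
    } else if duration_ms > limits.max_ms {
        Err(format!(
            "sleep of {} ms exceeds the maximum of {} ms",
            duration_ms, limits.max_ms
        ))
    } else {
        Ok(duration_ms)
    }
}
```

Inside an actor, the validated value could then be passed straight on, for example `timing::sleep(check_sleep_duration(ms, limits)?)`, assuming the limits are kept in sync with the manifest.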
Current Time
The now function returns the current time in milliseconds since the Unix epoch:
let current_time = timing::now();
println!("Current time: {} ms", current_time);
Performance Timing
The performance-now function provides high-resolution timing for performance measurement:
// Measure operation duration
let start = timing::performance_now();

// Perform operation
perform_expensive_operation();

let end = timing::performance_now();
let duration_ns = end - start;
let duration_ms = duration_ns / 1_000_000;
println!("Operation took {} ms", duration_ms);
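Integer division by 1_000_000 truncates, so any operation shorter than one millisecond is reported as 0 ms. As a small sketch of one way around this, the hypothetical helper below converts two nanosecond readings into fractional milliseconds. It takes plain `u64` values, so it does not depend on the handler itself.

```rust
/// Converts two nanosecond readings into fractional milliseconds.
/// `saturating_sub` yields 0 instead of underflowing if `end_ns` is
/// smaller than `start_ns`.
pub fn elapsed_ms(start_ns: u64, end_ns: u64) -> f64 {
    let elapsed_ns = end_ns.saturating_sub(start_ns);
    elapsed_ns as f64 / 1_000_000.0
}
```

With readings taken from `timing::performance_now()`, a call such as `elapsed_ms(start, end)` keeps sub-millisecond detail that the integer version drops.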
Common Patterns
Implementing Timeouts
use std::time::Duration;

// Perform an operation with a timeout.
// The function is async because it awaits the timeout future.
async fn perform_with_timeout<F, T>(operation: F, timeout_ms: u64) -> Result<T, String>
where
    F: FnOnce() -> Result<T, String> + Send + 'static,
    T: Send + 'static,
{
    // Create a oneshot channel for the result
    let (tx, rx) = tokio::sync::oneshot::channel();

    // Run the operation on a blocking thread so it cannot stall the timer.
    // Note: a timed-out operation is not cancelled; it keeps running in the background.
    tokio::task::spawn_blocking(move || {
        // The receiver is dropped after a timeout, so a failed send is ignored
        let _ = tx.send(operation());
    });

    // Wait for the result or timeout
    match tokio::time::timeout(Duration::from_millis(timeout_ms), rx).await {
        Ok(Ok(result)) => result,
        Ok(Err(_)) => Err("Operation ended without producing a result".to_string()),
        Err(_) => Err("Operation timed out".to_string()),
    }
}

// Usage (inside an async context)
let result = perform_with_timeout(|| {
    // Perform potentially long-running operation
    perform_api_call()
}, 5000).await; // 5 second timeout
Rate Limiting
// Simple rate limiter
struct RateLimiter {
    last_operation: u64,
    min_interval_ms: u64,
}

impl RateLimiter {
    fn new(min_interval_ms: u64) -> Self {
        Self {
            last_operation: 0,
            min_interval_ms,
        }
    }

    fn check_and_update(&mut self) -> Result<(), String> {
        let now = timing::now();
        // saturating_sub avoids an underflow if the clock reads earlier than last_operation
        let elapsed = now.saturating_sub(self.last_operation);

        if self.last_operation == 0 || elapsed >= self.min_interval_ms {
            self.last_operation = now;
            Ok(())
        } else {
            // Too soon: sleep for the rest of the interval, then record the new time
            let wait_time = self.min_interval_ms - elapsed;
            timing::sleep(wait_time)?;
            self.last_operation = timing::now();
            Ok(())
        }
    }
}

// Usage
let mut rate_limiter = RateLimiter::new(100); // 100ms between operations

for item in items {
    // Ensure we don't exceed rate limit
    rate_limiter.check_and_update()?;

    // Process item
    process_item(item)?;
}
Periodic Tasks
// Run a task periodically
fn run_periodically<F>(task: F, interval_ms: u64, max_iterations: Option<usize>) -> Result<(), String>
where
    F: Fn() -> Result<(), String>,
{
    let mut iterations = 0;

    loop {
        // Run the task
        task()?;

        // Check if we've reached the maximum iterations
        if let Some(max) = max_iterations {
            iterations += 1;
            if iterations >= max {
                break;
            }
        }

        // Sleep until the next interval
        timing::sleep(interval_ms)?;
    }

    Ok(())
}

// Usage
run_periodically(|| {
    // Periodic task logic
    collect_metrics()
}, 5000, Some(10))?; // Run every 5 seconds, 10 times
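The loop above sleeps for the full interval after every run, so the effective period is the interval plus however long the task took. If a steadier cadence matters, one option is to subtract the task's run time from the sleep. The helper below is a sketch under that assumption; `remaining_sleep_ms` is a hypothetical name, and `min_sleep_ms` stands for the configured min_sleep_duration.

```rust
/// Computes how long to sleep so that tasks start roughly every `interval_ms`.
/// Returns `None` when the task already used up the interval, or when the
/// remainder is below `min_sleep_ms`, meaning the next run should start
/// without sleeping.
pub fn remaining_sleep_ms(
    interval_ms: u64,
    task_elapsed_ms: u64,
    min_sleep_ms: u64,
) -> Option<u64> {
    let remaining = interval_ms.saturating_sub(task_elapsed_ms);
    if remaining == 0 || remaining < min_sleep_ms {
        None
    } else {
        Some(remaining)
    }
}
```

In the periodic loop, the task's run time could be measured with two `timing::now()` readings, and `timing::sleep` called only when this helper returns `Some`.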
State Chain Integration
All timing operations are recorded in the actor's state chain, creating a verifiable history. The chain events include:
- TimingOperation: Records details of timing operations:
  - Operation type (sleep, now, performance-now)
  - Duration (for sleep operations)
  - Timestamp
- Error: Records any errors that occur:
  - Operation type
  - Error message
This integration ensures that all timing activities are:
- Traceable
- Verifiable
- Reproducible
- Auditable
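To make the two event kinds concrete, the sketch below models them as a Rust enum with the fields listed above. The type name, field names, and field types are assumptions made for illustration; they are not taken from Theater's source, and the real chain events may be shaped differently.

```rust
/// Hypothetical shape of the timing-related chain events.
#[derive(Debug, Clone, PartialEq)]
pub enum TimingEvent {
    TimingOperation {
        operation: String,        // "sleep", "now", or "performance-now"
        duration_ms: Option<u64>, // present only for sleep operations
        timestamp_ms: u64,
    },
    Error {
        operation: String,
        message: String,
    },
}

/// Renders an event as a single audit-log line.
pub fn describe(event: &TimingEvent) -> String {
    match event {
        TimingEvent::TimingOperation { operation, duration_ms: Some(d), timestamp_ms } => {
            format!("[{}] {} for {} ms", timestamp_ms, operation, d)
        }
        TimingEvent::TimingOperation { operation, duration_ms: None, timestamp_ms } => {
            format!("[{}] {}", timestamp_ms, operation)
        }
        TimingEvent::Error { operation, message } => {
            format!("{} failed: {}", operation, message)
        }
    }
}
```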
Error Handling
The Timing Handler provides error information for various failure scenarios:
- Duration Errors: When sleep duration is outside allowed range
- Operation Errors: When timing operations fail
- Resource Errors: When system resources are unavailable
Security Considerations
When using the Timing Handler, consider the following security aspects:
- Sleep Limits: The configuration enforces limits on sleep durations
- Resource Consumption: Long or frequent sleeps may impact system resources
- Timing Attacks: Be aware of potential timing side-channel attacks
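As an illustration of the timing side-channel point, comparing a secret with `==` can return as soon as the first differing byte is found, so the time taken hints at how much of a guess was correct. The sketch below shows the usual countermeasure, a comparison that always inspects every byte. `constant_time_eq` is a hypothetical helper written for this example; compilers may still optimize such loops, so an audited constant-time comparison library is the safer choice in production code.

```rust
/// Compares two byte slices without returning early on the first mismatch.
/// The length check is not constant-time, so the length itself may leak.
pub fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // OR together the XOR of every byte pair; the result is 0 only if all match
    let mut diff: u8 = 0;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}
```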
Implementation Details
Under the hood, the Timing Handler:
- Uses the Tokio runtime for asynchronous sleep operations
- Enforces configurable minimum and maximum sleep durations
- Records all operations in the state chain
- Provides consistent time sources across the actor system
Performance Considerations
- Sleep Overhead: There is a small overhead for each sleep operation
- Time Resolution: Time functions have platform-dependent resolution
- Resource Usage: Excessive sleep operations may impact system performance
Best Practices
- Error Handling: Always handle errors from timing functions
- Sleep Duration: Use reasonable sleep durations
- Batch Processing: Consider batching operations instead of sleeping between each
- Timeouts: Implement timeouts for operations that may not complete
- Rate Limiting: Use rate limiting for external API calls
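The batching advice can be sketched as follows: items are processed in groups, and the pause happens once between groups rather than after every item. This is an illustrative helper, not part of the handler. `process_in_batches` is a hypothetical name, and the `pause` closure stands in for a call such as `timing::sleep(pause_ms)` so the function stays independent of the actor environment.

```rust
/// Processes `items` in batches of `batch_size`, calling `pause` once between
/// batches instead of once per item. Returns the number of pauses taken.
pub fn process_in_batches<T, P, S>(
    items: &[T],
    batch_size: usize,
    mut process: P,
    mut pause: S,
) -> Result<usize, String>
where
    P: FnMut(&T) -> Result<(), String>,
    S: FnMut() -> Result<(), String>,
{
    // `chunks(0)` would panic, so reject an empty batch size up front
    if batch_size == 0 {
        return Err("batch_size must be greater than zero".to_string());
    }

    let mut pauses = 0;
    let mut batches = items.chunks(batch_size).peekable();
    while let Some(batch) = batches.next() {
        for item in batch {
            process(item)?;
        }
        // Pause only if another batch follows
        if batches.peek().is_some() {
            pause()?;
            pauses += 1;
        }
    }
    Ok(pauses)
}
```

For five items and a batch size of two, this pauses twice instead of four times, which also means fewer sleep operations recorded per run.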
Examples
Retry Logic
// Retry an operation with exponential backoff
fn retry_with_backoff<F, T>(
    operation: F,
    initial_backoff_ms: u64,
    max_backoff_ms: u64,
    max_retries: usize,
) -> Result<T, String>
where
    F: Fn() -> Result<T, String>,
{
    let mut backoff = initial_backoff_ms;
    let mut attempts = 0;

    loop {
        match operation() {
            Ok(result) => return Ok(result),
            Err(error) => {
                attempts += 1;
                if attempts >= max_retries {
                    return Err(format!("Operation failed after {} attempts: {}", attempts, error));
                }

                // Log the failure and retry plan
                println!("Attempt {} failed: {}. Retrying in {} ms", attempts, error, backoff);

                // Wait before next attempt
                timing::sleep(backoff)?;

                // Exponential backoff with up to 10% jitter (uses the rand crate).
                // Saturating arithmetic avoids overflow for very large backoff values.
                let jitter = (backoff as f64 * 0.1 * rand::random::<f64>()) as u64;
                backoff = std::cmp::min(
                    backoff.saturating_mul(2).saturating_add(jitter),
                    max_backoff_ms,
                );
            }
        }
    }
}

// Usage
let result = retry_with_backoff(
    || external_api_call("https://api.example.com/data"),
    100,   // Initial backoff of 100ms
    30000, // Maximum backoff of 30 seconds
    5,     // Maximum 5 attempts in total
)?;
Debouncing
// Debounce a function call
struct Debouncer {
    last_call: u64,
    timeout_ms: u64,
}

impl Debouncer {
    fn new(timeout_ms: u64) -> Self {
        Self {
            last_call: 0,
            timeout_ms,
        }
    }

    fn should_call(&mut self) -> bool {
        let now = timing::now();
        // saturating_sub avoids an underflow if the clock reads earlier than last_call
        if now.saturating_sub(self.last_call) >= self.timeout_ms {
            self.last_call = now;
            true
        } else {
            false
        }
    }
}

// The debouncer is passed in explicitly, because a nested fn cannot capture local variables
fn process_input(debouncer: &mut Debouncer, input: &str) {
    if debouncer.should_call() {
        // Process the input
        println!("Processing input: {}", input);
    } else {
        // Skip processing this input
        println!("Debounced input: {}", input);
    }
}

// Usage
let mut input_debouncer = Debouncer::new(500); // 500ms debounce
process_input(&mut input_debouncer, "first");
Measuring Request Latency
// Measure and log request latency
fn measure_request_latency<F, T>(operation_name: &str, operation: F) -> Result<T, String>
where
    F: FnOnce() -> Result<T, String>,
{
    // Get start time in high resolution
    let start = timing::performance_now();

    // Perform the operation
    let result = operation()?;

    // Calculate duration
    let end = timing::performance_now();
    let duration_ns = end - start;
    let duration_ms = duration_ns / 1_000_000;

    // Log the latency
    println!("{} took {} ms", operation_name, duration_ms);

    // Record metric
    runtime::record_metric(&format!("{}_latency_ms", operation_name), duration_ms as f64)?;

    // Return the result
    Ok(result)
}

// Usage
let user_data = measure_request_latency("fetch_user_data", || {
    api_client::get_user_data(user_id)
})?;
Related Topics
- Runtime Handler - For runtime information and operations
- Message Server Handler - For actor-to-actor communication
- State Management - For state chain integration