Source code for codinglab.experiment

"""
Experiment framework for the coding experiments library.

This module provides the infrastructure for running coding experiments,
including experiment runners and result data structures. It enables
systematic evaluation of different coding schemes and channel models.
"""

# Module metadata
__author__ = "Mikhail Mikhailov"
__license__ = "MIT"
__version__ = "0.1.0"
__all__ = ["ExperimentResult", "ExperimentRunner"]

import time
from typing import Any, Dict
from dataclasses import dataclass, field
from .interfaces import Sender, Channel, SourceChar, ChannelChar
from .receivers.tracking import TrackingReceiver, TransmissionStats


[docs] @dataclass class ExperimentResult: """ Results from a coding experiment run. This class encapsulates all data collected during an experiment, including statistics, timing information, and any additional metadata. It provides computed properties for common analysis and can be serialized for storage or further processing. """ stats: TransmissionStats """Transmission statistics collected during the experiment.""" start_time: float """Experiment start time in seconds since epoch.""" end_time: float """Experiment end time in seconds since epoch.""" metadata: Dict[str, Any] = field(default_factory=dict) """Additional experiment metadata as key-value pairs.""" @property def duration(self) -> float: """ Calculate the total duration of the experiment. Returns: Experiment duration in seconds """ return self.end_time - self.start_time
[docs] def summary(self) -> str: """ Generate a human-readable summary of experiment results. Returns: Formatted string with key experiment metrics """ return ( f"Experiment Summary:\n" f" Duration: {self.duration:.3f} seconds\n" f" Messages: {self.stats.total_messages}\n" f" Success Rate: {self.stats.success_rate:.2%}\n" f" Source Symbols: {self.stats.total_source_symbols}\n" f" Channel Symbols: {self.stats.total_channel_symbols}\n" f" Compression Ratio: {self.stats.compression_ratio:.3f}\n" f" Avg. Code Length: {self.stats.average_code_len:.3f}\n" f" Avg. Processing Time: {self.stats.avg_message_time:.6f} s/msg" )
[docs] class ExperimentRunner: """ Orchestrates and runs coding experiments. This class coordinates the interaction between sender, channel, and receiver components to run complete transmission experiments. It manages the data flow, timing, and result collection. """
[docs] def __init__( self, sender: Sender[SourceChar, ChannelChar], channel: Channel[ChannelChar], receiver: TrackingReceiver[SourceChar, ChannelChar], ) -> None: """ Initialize the experiment runner with component instances. Args: sender: Sender instance for generating and encoding messages channel: Channel instance for transmitting messages receiver: TrackingReceiver instance for collecting stats Raises: TypeError: If receiver is not a TrackingReceiver instance """ if not isinstance(receiver, TrackingReceiver): raise TypeError("Receiver must be a TrackingReceiver instance") self.sender = sender """Sender instance for generating and encoding messages.""" self.channel = channel """Channel instance for transmitting messages.""" self.receiver = receiver """TrackingReceiver instance for receiving and decoding messages."""
[docs] def run( self, num_messages: int = 100, ) -> ExperimentResult: """ Run a coding experiment with the configured components. This method executes the complete transmission pipeline: 1. Generate and encode messages using the sender 2. Transmit messages through the channel 3. Receive, decode, and track messages using the receiver 4. Collect and return experiment results Args: num_messages: Number of messages to transmit (default: 100) Returns: ExperimentResult object containing all experiment data Raises: ValueError: If num_messages <= 0 """ if num_messages <= 0: raise ValueError(f"num_messages must be positive, got {num_messages}") start_time = time.time() # Reset receiver statistics before starting self.receiver.reset_stats() # Execute the transmission pipeline messages = self.sender.message_stream(num_messages) transmitted = self.channel.transmit_stream(messages) # Process messages and track progress results = [] for i, success in enumerate(self.receiver.receive_stream(transmitted)): results.append(success) end_time = time.time() # Create and return experiment results return ExperimentResult( stats=self.receiver.get_stats(), start_time=start_time, end_time=end_time, metadata={ "num_messages": num_messages, "success_count": sum(results), "failure_count": len(results) - sum(results), "components": { "sender_type": type(self.sender).__name__, "channel_type": type(self.channel).__name__, "receiver_type": type(self.receiver).__name__, }, }, )