# Source code for codinglab.receivers.tracking

"""
Tracking receiver implementation for the coding experiments library.

This module provides a receiver implementation that collects detailed
statistics about message reception, including success rates, processing
times, and compression metrics. It's designed for experimental analysis
and performance evaluation.
"""

# Module metadata: authorship, license, and version of this module.
__author__ = "Mikhail Mikhailov"
__license__ = "MIT"
__version__ = "0.1.0"
# Public API: the only names exported by ``from ... import *`` on this module.
__all__ = ["TransmissionStats", "TrackingReceiver"]

import time
from typing import Iterator, Optional
from dataclasses import dataclass
from ..interfaces import Receiver, Decoder
from ..types import (
    TransmissionLog,
    TransmissionEvent,
    SourceChar,
    ChannelChar,
    Message,
    TransmissionLogger,
)
from ..logger import NullLogger
from copy import copy


@dataclass
class TransmissionStats:
    """
    Counters and timings gathered while messages are received and decoded.

    Every field defaults to zero, so a freshly constructed instance
    represents "nothing processed yet". Besides the raw counters, the class
    exposes derived ratios (success rate, compression ratio, average code
    length) as read-only properties; each of them returns 0.0 instead of
    dividing by zero when its denominator is not positive.
    """

    #: Total number of messages processed.
    total_messages: int = 0

    #: Number of correctly decoded messages.
    successful_messages: int = 0

    #: Number of decoded messages.
    decoded_messages: int = 0

    #: Number of messages that failed to decode.
    failed_messages: int = 0

    #: Total number of source symbols processed.
    total_source_symbols: int = 0

    #: Total number of channel symbols processed.
    total_channel_symbols: int = 0

    #: Number of decoding errors encountered.
    decode_errors: int = 0

    #: Number of validation errors (decoded message does not match the
    #: original).
    validation_errors: int = 0

    #: Total time spent processing messages (seconds).
    total_processing_time: float = 0.0

    #: Average processing time per message (seconds).
    avg_message_time: float = 0.0

    @property
    def success_rate(self) -> float:
        """
        Fraction of processed messages that were counted as successful.

        Returns:
            ``successful_messages / total_messages``, or 0.0 when
            ``total_messages`` is not positive
        """
        processed = self.total_messages
        return self.successful_messages / processed if processed > 0 else 0.0

    @property
    def compression_ratio(self) -> float:
        """
        Ratio of source symbols to channel symbols.

        Returns:
            ``total_source_symbols / total_channel_symbols``, or 0.0 when
            ``total_channel_symbols`` is not positive

        Note:
            A value above 1.0 means fewer channel symbols than source
            symbols (compression); a value below 1.0 means expansion.
        """
        channel_count = self.total_channel_symbols
        return (
            self.total_source_symbols / channel_count
            if channel_count > 0
            else 0.0
        )

    @property
    def average_code_len(self) -> float:
        """
        Average number of channel symbols spent per source symbol.

        Returns:
            ``total_channel_symbols / total_source_symbols``, or 0.0 when
            ``total_source_symbols`` is not positive
        """
        source_count = self.total_source_symbols
        return (
            self.total_channel_symbols / source_count
            if source_count > 0
            else 0.0
        )
class TrackingReceiver(Receiver[SourceChar, ChannelChar]):
    """
    Receiver that tracks detailed statistics about message reception.

    This receiver not only decodes messages but also collects statistics
    about the reception process, including timing information, success
    rates, and compression metrics. It is intended for experimental
    analysis and performance evaluation.

    Note:
        Whether a decoded message is *correct* can only be judged against
        what the sender transmitted. When the logger provides a
        ``check_message`` method it is used for that validation; otherwise
        every message that decodes without raising is counted in
        ``successful_messages``. ``failed_messages`` counts every message
        that was not counted as successful, i.e. decode errors plus
        validation errors.
    """

    def __init__(
        self,
        decoder: Decoder[SourceChar, ChannelChar],
        logger: Optional[TransmissionLogger] = None,
    ) -> None:
        """
        Initialize the tracking receiver.

        Args:
            decoder: Decoder instance for converting channel symbols back
                to source messages
            logger: Logger for recording transmission events. When omitted
                (or None), a new NullLogger is created for this receiver.
        """
        self._decoder = decoder
        """Decoder instance for channel-to-source symbol conversion."""

        self._stats = TransmissionStats()
        """Statistics collected during message reception."""

        self._last_message: Optional[Message[SourceChar]] = None
        """The last successfully decoded source message."""

        # Build the default here rather than in the signature: a call in a
        # default argument is evaluated once at definition time and the
        # resulting instance would be shared by every receiver.
        self._logger: TransmissionLogger = (
            logger if logger is not None else NullLogger()
        )
        """Logger for recording transmission events."""

    def receive_stream(
        self, messages: Iterator[Message[ChannelChar]]
    ) -> Iterator[bool]:
        """
        Receive, decode, and track statistics for a stream of messages.

        Each message is decoded, statistics are updated, events are logged,
        and one boolean is yielded per input message.

        Args:
            messages: Iterator of messages received from a channel

        Yields:
            True for each message that was decoded and (when the logger
            offers ``check_message``) validated; False for messages that
            raised during processing or failed validation

        Note:
            Statistics are updated incrementally, before each value is
            yielded, so they can be inspected while the stream is consumed.
            Any exception raised while decoding, logging the decode event,
            or validating is recorded as a decode error instead of being
            propagated.
        """
        for encoded_message in messages:
            # perf_counter() is monotonic and high resolution, so measured
            # durations are not affected by system clock adjustments;
            # wall-clock time.time() is used only for log timestamps.
            start_time = time.perf_counter()

            # Log message reception
            self._logger.log(
                TransmissionLog(
                    timestamp=time.time(),
                    event=TransmissionEvent.RECEIVED,
                    message=encoded_message,
                    data={},
                )
            )
            self._stats.total_messages += 1

            try:
                # Decode the message
                decoded_data = self._decoder.decode(encoded_message.data)
                decoded_message = Message(
                    id=encoded_message.id, data=decoded_data
                )

                # Update tracking state
                self._last_message = decoded_message

                # Symbol counts only include messages that decoded
                self._stats.total_source_symbols += len(decoded_data)
                self._stats.total_channel_symbols += len(encoded_message.data)

                # Log successful decoding
                self._logger.log(
                    TransmissionLog(
                        timestamp=time.time(),
                        event=TransmissionEvent.DECODED,
                        message=decoded_message,
                        data={
                            "encoded_length": len(encoded_message.data),
                            "decoded_length": len(decoded_data),
                            "encoded_message": encoded_message,
                        },
                    )
                )
                self._stats.decoded_messages += 1

                # Validation is optional: only loggers that know what the
                # sender transmitted expose ``check_message``.
                check_message = getattr(self._logger, "check_message", None)
                if check_message is None or check_message(decoded_message):
                    self._stats.successful_messages += 1
                    success = True
                else:
                    self._stats.validation_errors += 1
                    self._stats.failed_messages += 1
                    success = False

            except Exception as e:
                # Deliberately broad: one bad message must not abort the
                # stream. Record the failure and carry on.
                self._stats.decode_errors += 1
                self._stats.failed_messages += 1

                # Log decoding error
                self._logger.log(
                    TransmissionLog(
                        timestamp=time.time(),
                        event=TransmissionEvent.ERROR,
                        message=encoded_message,
                        data={"error": str(e), "error_type": type(e).__name__},
                    )
                )
                success = False

            # Update timing statistics
            processing_time = time.perf_counter() - start_time
            self._stats.total_processing_time += processing_time
            # The guard keeps the division safe should the stats object have
            # been replaced (reset) while this message was being processed.
            if self._stats.total_messages > 0:
                self._stats.avg_message_time = (
                    self._stats.total_processing_time
                    / self._stats.total_messages
                )

            yield success

    def get_last_message(self) -> Optional[Message[SourceChar]]:
        """
        Get the last successfully decoded message.

        The value is stored as soon as decoding succeeds, before any
        validation through the logger takes place.

        Returns:
            The last decoded source message, or None if no message has been
            decoded yet
        """
        return self._last_message

    def get_stats(self) -> TransmissionStats:
        """
        Get the current transmission statistics.

        Returns:
            A copy of the current TransmissionStats object, so later
            processing does not change the returned snapshot
        """
        return copy(self._stats)

    def reset_stats(self) -> None:
        """
        Reset all statistics to their initial state.

        The statistics object is replaced with a fresh TransmissionStats,
        letting the receiver start counting anew for another experiment or
        test run. The logger and the last decoded message are left
        untouched.
        """
        self._stats = TransmissionStats()