Source code for codinglab.senders.fixed

"""
Fixed messages sender implementation for the coding experiments library.

This module provides a sender implementation that cycles through a fixed
set of pre-defined messages. It's useful for reproducible experiments and
testing scenarios where specific message sequences are required.
"""

# Module metadata
__author__ = "Mikhail Mikhailov"
__license__ = "MIT"
__version__ = "0.1.0"
__all__ = ["FixedMessagesSender"]

import time
from typing import Sequence, Iterator, List, Optional
from .base import BaseSender
from ..interfaces import Encoder
from ..types import (
    Message,
    SourceChar,
    ChannelChar,
    TransmissionLogger,
    TransmissionEvent,
    TransmissionLog,
)
from ..logger import NullLogger


[docs] class FixedMessagesSender(BaseSender[SourceChar, ChannelChar]): """ Sender that cycles through a fixed set of source messages. This sender repeatedly sends messages from a provided list, wrapping around to the beginning when the end is reached. Each message is assigned a unique ID for tracking purposes. Attributes: _messages: List of source messages to send _message_id: Counter for assigning unique message IDs _index: Current position in the messages list """
[docs] def __init__( self, encoder: Encoder[SourceChar, ChannelChar], messages: List[Sequence[SourceChar]], logger: TransmissionLogger = NullLogger(), ) -> None: """ Initialize the fixed messages sender. Args: encoder: Encoder instance for converting source to channel symbols messages: List of source messages to send logger: Logger for recording transmission events (defaults to NullLogger) Raises: ValueError: If messages list is empty """ if not messages: raise ValueError("Messages list cannot be empty") super().__init__(encoder, logger) self._messages = messages """List of source messages to send.""" self._message_id = 0 """Counter for assigning unique message IDs.""" self._index = 0 """Current position in the messages list."""
[docs] def message_stream( self, stream_len: Optional[int] = None ) -> Iterator[Message[ChannelChar]]: """ Generate a stream of encoded messages from the fixed list. If stream_len is not specified, generates one encoded message for each source message in the list. If stream_len exceeds the number of messages, the sender wraps around to the beginning. Args: stream_len: Number of messages to generate. If None, uses the length of the messages list. Yields: Encoded messages wrapped in Message containers with unique IDs Raises: ValueError: If stream_len <= 0 """ if stream_len is None: stream_len = len(self._messages) if stream_len <= 0: raise ValueError(f"stream_len must be positive, got {stream_len}") for _ in range(stream_len): # Get the next source message from the list source_message = Message( id=self._message_id, data=self._messages[self._index] ) # Update tracking state self._last_message = source_message self._message_id += 1 self._index = (self._index + 1) % len(self._messages) self._logger.log( TransmissionLog( timestamp=time.time(), event=TransmissionEvent.SOURCE_GENERATED, message=source_message, data={"index": self._index}, ) ) # Encode the message encoded_data = self._encoder.encode(source_message.data) encoded_message = Message(id=source_message.id, data=encoded_data) self._logger.log( TransmissionLog( timestamp=time.time(), event=TransmissionEvent.ENCODED, message=encoded_message, data={}, ) ) yield encoded_message
[docs] def reset(self) -> None: """ Reset the sender to its initial state. This method resets the message index and ID counter, allowing the sender to restart from the beginning of the messages list. """ self._index = 0 self._message_id = 0