Skip to content

logicblocks.event.store

Example

from logicblocks.event.store import EventStore, adapters
from logicblocks.event.types import NewEvent
from logicblocks.event.projection import Projector

# Back the store with the in-memory adapter; nothing is persisted.
adapter = adapters.InMemoryStorageAdapter()
store = EventStore(adapter)

# Scope the store to a single stream, then append two events to it.
stream = store.stream(category="profiles", stream="joe.bloggs")
stream.publish(
    events=[
        NewEvent(
            name="profile-created",
            payload={
                "name": "Joe Bloggs",
                "email": "joe.bloggs@example.com",
            },
        )
    ]
)
stream.publish(
    events=[
        NewEvent(
            name="date-of-birth-set",
            payload={"dob": "1992-07-10"},
        )
    ]
)

# Handlers are keyed by event name. Each one receives the current state and
# the event, and returns the next state. A new mapping is built on every step
# because the initial state is a plain dict, which has no ``merge`` method.
projector = Projector(
    handlers={
        "profile-created": lambda state, event: {
            **state,
            "name": event.payload["name"],
            "email": event.payload["email"],
        },
        "date-of-birth-set": lambda state, event: {
            **state,
            "dob": event.payload["dob"],
        },
    }
)

# ``project`` returns a Projection carrying the folded state together with
# the position of the last event that was applied.
profile = projector.project({}, stream.read())

# profile.state == {
#   "name": "Joe Bloggs",
#   "email": "joe.bloggs@example.com",
#   "dob": "1992-07-10"
# }
# profile.position == 1

Reference

logicblocks.event.store

logicblocks.event.store.EventStore

Bases: object

The primary interface into the store of events.

An EventStore is backed by a StorageAdapter which implements event storage. Typically, events are stored in an immutable append only log, the details of which are storage implementation specific.

The event store is partitioned into streams, a sequence of events relating to the same "thing", such as an entity, a process or a state machine, and categories, a logical grouping of streams that share some characteristics.

For example, a stream might exist for each order in a commerce system, with the category of such streams being "orders".

Streams and categories are each identified by a string name. The combination of a category name and a stream name acts as an identifier for a specific stream of events.

Source code in src/logicblocks/event/store/store.py
class EventStore:
    """The primary interface into the store of events.

    An [`EventStore`][logicblocks.event.store.EventStore] delegates all event
    storage to a
    [`StorageAdapter`][logicblocks.event.store.adapters.StorageAdapter].
    Typically, events are kept in an immutable, append-only log, the details
    of which are specific to the storage implementation.

    The store is partitioned into _streams_ and _categories_. A stream is a
    sequence of events relating to the same "thing", such as an entity, a
    process or a state machine. A category is a logical grouping of streams
    that share some characteristics.

    For example, a commerce system might have one stream per order, with all
    of those streams belonging to the "orders" category.

    Streams and categories are each identified by a string name. A category
    name combined with a stream name identifies one specific stream of events.
    """

    adapter: StorageAdapter

    def __init__(self, adapter: StorageAdapter):
        self.adapter = adapter

    def stream(self, *, category: str, stream: str) -> EventStream:
        """Get a stream of events from the store.

        No IO happens here; the call only builds an event store scoped to the
        stream identified by the category and stream names, as part of a
        fluent interface.

        Categories and streams implicitly exist, so asking for a category or
        stream that has never been written to is not an error.

        Args:
            category (str): The name of the category of the stream.
            stream (str): The name of the stream.

        Returns:
            an event store scoped to the specified stream.
        """
        scoped_stream = EventStream(
            adapter=self.adapter,
            category=category,
            stream=stream,
        )
        return scoped_stream

    def category(self, *, category: str) -> EventCategory:
        """Get a category of events from the store.

        No IO happens here; the call only builds an event store scoped to the
        category identified by the category name, as part of a fluent
        interface.

        Categories implicitly exist, so asking for a category that has never
        been written to is not an error.

        Args:
            category (str): The name of the category.

        Returns:
            an event store scoped to the specified category.
        """
        scoped_category = EventCategory(
            adapter=self.adapter,
            category=category,
        )
        return scoped_category
category
category(*, category: str) -> EventCategory

Get a category of events from the store.

This method alone doesn't result in any IO, it instead returns a scoped event store for the category identified by the category name, as part of a fluent interface.

Categories implicitly exist, i.e., calling this method for a category that has never been written to will not result in an error.

Parameters:

Name Type Description Default
category str

The name of the category.

required

Returns:

Type Description
EventCategory

an event store scoped to the specified category.

Source code in src/logicblocks/event/store/store.py
def category(self, *, category: str) -> EventCategory:
    """Get a category of events from the store.

    No IO happens here; the call only builds an event store scoped to the
    category identified by the category name, as part of a fluent interface.

    Categories implicitly exist, so asking for a category that has never
    been written to is not an error.

    Args:
        category (str): The name of the category.

    Returns:
        an event store scoped to the specified category.
    """
    scoped_category = EventCategory(
        adapter=self.adapter,
        category=category,
    )
    return scoped_category
stream
stream(*, category: str, stream: str) -> EventStream

Get a stream of events from the store.

This method alone doesn't result in any IO, it instead returns a scoped event store for the stream identified by the category and stream names, as part of a fluent interface.

Categories and streams implicitly exist, i.e., calling this method for a category or stream that has never been written to will not result in an error.

Parameters:

Name Type Description Default
category str

The name of the category of the stream.

required
stream str

The name of the stream.

required

Returns:

Type Description
EventStream

an event store scoped to the specified stream.

Source code in src/logicblocks/event/store/store.py
def stream(self, *, category: str, stream: str) -> EventStream:
    """Get a stream of events from the store.

    No IO happens here; the call only builds an event store scoped to the
    stream identified by the category and stream names, as part of a fluent
    interface.

    Categories and streams implicitly exist, so asking for a category or
    stream that has never been written to is not an error.

    Args:
        category (str): The name of the category of the stream.
        stream (str): The name of the stream.

    Returns:
        an event store scoped to the specified stream.
    """
    scoped_stream = EventStream(
        adapter=self.adapter,
        category=category,
        stream=stream,
    )
    return scoped_stream

logicblocks.event.store.EventCategory

Bases: object

A class for interacting with a specific category of events.

Since a category consists of zero or more streams, the category can be narrowed to a specific stream using the stream method.

Events in the category can be read using the read method. Categories are also iterable, supporting iter.

Attributes:

Name Type Description
adapter StorageAdapter

The storage adapter to use for interacting with the category.

category str

The name of the category.

Source code in src/logicblocks/event/store/store.py
class EventCategory:
    """A class for interacting with a specific category of events.

    A category consists of zero or more streams; use the `stream` method to
    narrow the category down to one of them.

    Events in the category can be read using the `read` method. Categories are
    also iterable, supporting `iter`.

    Attributes:
        adapter: The storage adapter to use for interacting with the category.
        category: The name of the category.
    """

    adapter: StorageAdapter
    category: str

    def __init__(self, adapter: StorageAdapter, category: str):
        self.adapter, self.category = adapter, category

    def __iter__(self):
        """Iterate over the events in the category."""
        category_identifier = identifier.Category(category=self.category)
        return self.adapter.scan(target=category_identifier)

    def stream(self, *, stream: str) -> EventStream:
        """Get a stream of events in the category.

        Args:
            stream (str): The name of the stream.

        Returns:
            an event store scoped to the specified stream.
        """
        scoped_stream = EventStream(
            adapter=self.adapter,
            category=self.category,
            stream=stream,
        )
        return scoped_stream

    def read(self) -> Sequence[StoredEvent]:
        """Read all events from the category.

        Every event is loaded into memory at once. Prefer iterating over the
        category instead, which gives storage adapters the opportunity to
        page events as they are read from the underlying persistence.
        """
        return list(self)
__iter__
__iter__()

Iterate over the events in the category.

Source code in src/logicblocks/event/store/store.py
def __iter__(self):
    """Iterate over the events in the category."""
    category_identifier = identifier.Category(category=self.category)
    return self.adapter.scan(target=category_identifier)
read
read() -> Sequence[StoredEvent]

Read all events from the category.

All events will be read into memory, so iterating over the category should be preferred in order to give storage adapters the opportunity to page events as they are read from the underlying persistence.

Source code in src/logicblocks/event/store/store.py
def read(self) -> Sequence[StoredEvent]:
    """Read all events from the category.

    Every event is loaded into memory at once. Prefer iterating over the
    category instead, which gives storage adapters the opportunity to page
    events as they are read from the underlying persistence.
    """
    return list(self)
stream
stream(*, stream: str) -> EventStream

Get a stream of events in the category.

Parameters:

Name Type Description Default
stream str

The name of the stream.

required

Returns:

Type Description
EventStream

an event store scoped to the specified stream.

Source code in src/logicblocks/event/store/store.py
def stream(self, *, stream: str) -> EventStream:
    """Get a stream of events in the category.

    Args:
        stream (str): The name of the stream.

    Returns:
        an event store scoped to the specified stream.
    """
    scoped_stream = EventStream(
        adapter=self.adapter,
        category=self.category,
        stream=stream,
    )
    return scoped_stream

logicblocks.event.store.EventStream

Bases: object

A class for interacting with a specific stream of events.

Events can be published into the stream using the publish method, and the entire stream can be read using the read method. Streams are also iterable, supporting iter.

Attributes:

Name Type Description
adapter StorageAdapter

The storage adapter to use for interacting with the stream.

category str

The name of the category of the stream.

stream str

The name of the stream.

Source code in src/logicblocks/event/store/store.py
class EventStream:
    """A class for interacting with a specific stream of events.

    Events can be published into the stream using the `publish` method, and
    the entire stream can be read using the `read` method. Streams are also
    iterable, supporting `iter`.

    Attributes:
        adapter: The storage adapter to use for interacting with the stream.
        category: The name of the category of the stream.
        stream: The name of the stream.
    """

    adapter: StorageAdapter
    category: str
    stream: str

    def __init__(self, adapter: StorageAdapter, category: str, stream: str):
        self.adapter, self.category, self.stream = adapter, category, stream

    def __iter__(self):
        """Iterate over the events in the stream from position 0 to the end."""
        stream_identifier = identifier.Stream(
            category=self.category,
            stream=self.stream,
        )
        return self.adapter.scan(target=stream_identifier)

    def publish(
        self,
        *,
        events: Sequence[NewEvent],
        conditions: Set[WriteCondition] = frozenset(),
    ) -> Sequence[StoredEvent]:
        """Publish a sequence of events into the stream.

        Args:
            events: The new events to append to the stream.
            conditions: Write conditions handed to the storage adapter along
                with the events; defaults to no conditions.

        Returns:
            the stored events, as returned by the storage adapter.
        """
        return self.adapter.save(
            target=identifier.Stream(
                category=self.category,
                stream=self.stream,
            ),
            events=events,
            conditions=conditions,
        )

    def read(self) -> Sequence[StoredEvent]:
        """Read all events from the stream.

        Every event is loaded into memory at once. Prefer iterating over the
        stream instead, which gives storage adapters the opportunity to page
        events as they are read from the underlying persistence.
        """
        return list(self)
__iter__
__iter__()

Iterate over the events in the stream from position 0 to the end.

Source code in src/logicblocks/event/store/store.py
def __iter__(self):
    """Iterate over the events in the stream from position 0 to the end."""
    stream_identifier = identifier.Stream(
        category=self.category,
        stream=self.stream,
    )
    return self.adapter.scan(target=stream_identifier)
publish
publish(
    *,
    events: Sequence[NewEvent],
    conditions: Set[WriteCondition] = frozenset()
) -> Sequence[StoredEvent]

Publish a sequence of events into the stream.

Source code in src/logicblocks/event/store/store.py
def publish(
    self,
    *,
    events: Sequence[NewEvent],
    conditions: Set[WriteCondition] = frozenset(),
) -> Sequence[StoredEvent]:
    """Publish a sequence of events into the stream.

    Args:
        events: The new events to append to the stream.
        conditions: Write conditions handed to the storage adapter along with
            the events; defaults to no conditions.

    Returns:
        the stored events, as returned by the storage adapter.
    """
    return self.adapter.save(
        target=identifier.Stream(
            category=self.category,
            stream=self.stream,
        ),
        events=events,
        conditions=conditions,
    )
read
read() -> Sequence[StoredEvent]

Read all events from the stream.

All events will be read into memory, so iterating over the stream should be preferred in order to give storage adapters the opportunity to page events as they are read from the underlying persistence.

Source code in src/logicblocks/event/store/store.py
def read(self) -> Sequence[StoredEvent]:
    """Read all events from the stream.

    Every event is loaded into memory at once. Prefer iterating over the
    stream instead, which gives storage adapters the opportunity to page
    events as they are read from the underlying persistence.
    """
    return list(self)

logicblocks.event.store.adapters.StorageAdapter

Bases: ABC

Source code in src/logicblocks/event/store/adapters/base.py
class StorageAdapter(ABC):
    """Abstract interface that every event storage backend implements."""

    @abstractmethod
    def save(
        self,
        *,
        target: Saveable,
        events: Sequence[NewEvent],
        conditions: Set[WriteCondition] = frozenset(),
    ) -> Sequence[StoredEvent]:
        """Store `events` against `target`, subject to `conditions`.

        Args:
            target: The identifier to save the events to.
            events: The new events to store.
            conditions: Write conditions to apply; defaults to none.

        Returns:
            the events as stored.
        """
        raise NotImplementedError

    @abstractmethod
    def scan(
        self,
        *,
        target: Scannable = identifier.Log(),
    ) -> Iterator[StoredEvent]:
        """Iterate over the stored events selected by `target`.

        Args:
            target: The identifier to scan; defaults to `identifier.Log()`.

        Returns:
            an iterator of stored events.
        """
        raise NotImplementedError

logicblocks.event.store.adapters.InMemoryStorageAdapter

Bases: StorageAdapter

Source code in src/logicblocks/event/store/adapters/in_memory.py
class InMemoryStorageAdapter(StorageAdapter):
    _lock: threading.Lock
    _events: list[StoredEvent]
    _log_index: EventPositionList
    _stream_index: EventIndexDict[StreamKey]
    _category_index: EventIndexDict[CategoryKey]

    def __init__(self):
        self._lock = threading.Lock()
        self._events = []
        self._log_index = []
        self._stream_index = defaultdict(lambda: [])
        self._category_index = defaultdict(lambda: [])

    def save(
        self,
        *,
        target: Saveable,
        events: Sequence[NewEvent],
        conditions: Set[WriteCondition] = frozenset(),
    ) -> Sequence[StoredEvent]:
        category_key = target.category
        stream_key = (target.category, target.stream)

        with self._lock:
            stream_indices = self._stream_index[stream_key]
            stream_events = [self._events[i] for i in stream_indices]

            last_event = stream_events[-1] if stream_events else None

            for condition in conditions:
                condition.ensure(last_event)

            last_sequence_number = len(self._events)
            last_stream_position = (
                -1 if len(stream_events) == 0 else stream_events[-1].position
            )

            new_sequence_numbers = [
                last_sequence_number + i for i in range(len(events))
            ]
            new_stored_events = [
                StoredEvent(
                    id=uuid4().hex,
                    name=event.name,
                    stream=target.stream,
                    category=target.category,
                    position=last_stream_position + count + 1,
                    sequence_number=last_sequence_number + count,
                    payload=event.payload,
                    observed_at=event.observed_at,
                    occurred_at=event.occurred_at,
                )
                for event, count in zip(events, range(len(events)))
            ]

            self._events += new_stored_events
            self._log_index += new_sequence_numbers
            self._stream_index[stream_key] += new_sequence_numbers
            self._category_index[category_key] += new_sequence_numbers

            return new_stored_events

    def scan(
        self, *, target: Scannable = identifier.Log()
    ) -> Iterator[StoredEvent]:
        index = self._select_index(target)
        for sequence_number in index:
            yield self._events[sequence_number]

    def _select_index(self, target: Scannable) -> EventPositionList:
        match target:
            case identifier.Log():
                return self._log_index
            case identifier.Category(category):
                return self._category_index[category]
            case identifier.Stream(category, stream):
                return self._stream_index[(category, stream)]

logicblocks.event.store.adapters.PostgresStorageAdapter

Bases: StorageAdapter

Source code in src/logicblocks/event/store/adapters/postgres.py
class PostgresStorageAdapter(StorageAdapter):
    """A storage adapter that reads and writes events through a connection
    pool.

    The adapter either builds its own pool from connection parameters, in
    which case it owns the pool and `close` shuts it down, or it uses a pool
    supplied by the caller, which `close` leaves untouched.
    """

    connection_pool: ConnectionPool[Connection]
    # True only when this adapter created the pool itself.
    connection_pool_owner: bool
    table_parameters: TableParameters

    def __init__(
        self,
        *,
        connection_source: ConnectionSource,
        table_parameters: TableParameters = TableParameters(),
    ):
        """Create the adapter.

        Args:
            connection_source: Either `ConnectionParameters`, from which a new
                pool is opened, or anything else, which is used as the pool
                directly.
            table_parameters: Passed through to the table/query helpers.
        """
        if isinstance(connection_source, ConnectionParameters):
            self.connection_pool_owner = True
            self.connection_pool = ConnectionPool[Connection](
                connection_source.to_connection_string(), open=True
            )
        else:
            # Caller-supplied pool: the caller stays responsible for closing.
            self.connection_pool_owner = False
            self.connection_pool = connection_source

        self.table_parameters = table_parameters

    def close(self) -> None:
        """Close the connection pool, but only if this adapter created it."""
        if self.connection_pool_owner:
            self.connection_pool.close()

    def save(
        self,
        *,
        target: Saveable,
        events: Sequence[NewEvent],
        conditions: Set[WriteCondition] = frozenset(),
    ) -> Sequence[StoredEvent]:
        """Insert `events` for `target` using a single pooled connection.

        The steps run in order on one cursor: lock the table, read the last
        event for the target, check every condition against it, then insert
        each event with consecutive positions.

        Args:
            target: The identifier to save the events to.
            events: The new events to insert, in order.
            conditions: Write conditions to check first; defaults to none.

        Returns:
            the results of `insert` for each event, in order.

        Raises:
            Whatever a condition's `ensure` raises when it is not met.
        """
        with self.connection_pool.connection() as connection:
            with connection.cursor(
                row_factory=class_row(StoredEvent)
            ) as cursor:
                # NOTE(review): presumably serialises concurrent writers so
                # the read-check-insert sequence is atomic - confirm against
                # lock_table.
                lock_table(cursor, table_parameters=self.table_parameters)

                last_event = read_last(
                    cursor,
                    target=target,
                    table_parameters=self.table_parameters,
                )

                for condition in conditions:
                    condition.ensure(last_event)

                # Positions continue from the last event, or start at 0 when
                # no last event was found.
                current_position = last_event.position + 1 if last_event else 0

                return [
                    insert(
                        cursor,
                        target=target,
                        event=event,
                        position=position,
                        table_parameters=self.table_parameters,
                    )
                    for position, event in enumerate(events, current_position)
                ]

    def scan(
        self,
        *,
        target: Scannable = identifier.Log(),
    ) -> Iterator[StoredEvent]:
        """Yield the records returned by the scan query for `target`.

        This is a generator: the connection and cursor are acquired when
        iteration starts and are held by the `with` blocks while records are
        being yielded.

        Args:
            target: The identifier to scan; defaults to `identifier.Log()`.
        """
        with self.connection_pool.connection() as connection:
            with connection.cursor(
                row_factory=class_row(StoredEvent)
            ) as cursor:
                # scan_query's result is unpacked into execute's arguments.
                for record in cursor.execute(
                    *scan_query(target, table_parameters=self.table_parameters)
                ):
                    yield record

logicblocks.event.store.conditions.WriteCondition

Bases: ABC

Source code in src/logicblocks/event/store/conditions.py
class WriteCondition(ABC):
    """Abstract check that is run against a stream before events are saved."""

    @abstractmethod
    def ensure(self, last_event: StoredEvent | None) -> None:
        """Check the condition against the last event of the stream.

        Args:
            last_event: The most recent event in the stream, or `None` when
                there is none.
        """
        raise NotImplementedError

logicblocks.event.store.conditions.PositionIsCondition dataclass

Bases: WriteCondition

Source code in src/logicblocks/event/store/conditions.py
@dataclass(frozen=True)
class PositionIsCondition(WriteCondition):
    """Write condition requiring the stream's last event to be at `position`.

    Attributes:
        position: The position the last event in the stream must have.
    """

    position: int

    def ensure(self, last_event: StoredEvent | None) -> None:
        """Check that the last event sits at the expected position.

        Args:
            last_event: The most recent event in the stream, or `None` when
                the stream has no events.

        Raises:
            UnmetWriteConditionError: If there is no last event, or its
                position differs from the expected one.
        """
        # Compare by value: identity (`is not`) on ints only happens to work
        # for small cached integers and misfires on larger positions.
        if last_event is None or last_event.position != self.position:
            raise UnmetWriteConditionError("unexpected stream position")

logicblocks.event.store.conditions.EmptyStreamCondition dataclass

Bases: WriteCondition

Source code in src/logicblocks/event/store/conditions.py
@dataclass(frozen=True)
class EmptyStreamCondition(WriteCondition):
    """Write condition requiring the stream to contain no events."""

    def ensure(self, last_event: StoredEvent | None):
        """Check that the stream has no last event.

        Args:
            last_event: The most recent event in the stream, or `None` when
                the stream has no events.

        Raises:
            UnmetWriteConditionError: If a last event exists.
        """
        if last_event is None:
            return
        raise UnmetWriteConditionError("stream is not empty")

logicblocks.event.store.conditions.position_is

position_is(position: int) -> WriteCondition
Source code in src/logicblocks/event/store/conditions.py
def position_is(position: int) -> WriteCondition:
    """Build a write condition that expects the stream's last event to be at
    `position`."""
    condition = PositionIsCondition(position=position)
    return condition

logicblocks.event.store.conditions.stream_is_empty

stream_is_empty() -> WriteCondition
Source code in src/logicblocks/event/store/conditions.py
def stream_is_empty() -> WriteCondition:
    """Build a write condition that expects the stream to have no events."""
    condition = EmptyStreamCondition()
    return condition

logicblocks.event.types

logicblocks.event.types.identifier.Log dataclass

Bases: Identifier

Source code in src/logicblocks/event/types/identifier.py
@dataclass(frozen=True)
class Log(Identifier):
    """Identifier for the log as a whole; it carries no fields."""

    def json(self) -> str:
        """Return the identifier serialised as a JSON string."""
        document = {"type": "log"}
        return json.dumps(document)

    def __repr__(self) -> str:
        return "identifier.Log()"

    def __hash__(self):
        # The hash is derived from the JSON form.
        serialised = self.json()
        return hash(serialised)

logicblocks.event.types.identifier.Category dataclass

Bases: Identifier

Source code in src/logicblocks/event/types/identifier.py
@dataclass(frozen=True)
class Category(Identifier):
    """Identifier for a category, by name.

    Attributes:
        category: The name of the category.
    """

    category: str

    def __init__(self, *, category: str):
        # The dataclass is frozen, so assignment must bypass its __setattr__.
        object.__setattr__(self, "category", category)

    def json(self) -> str:
        """Return the identifier serialised as a JSON string."""
        document = dict(type="category", category=self.category)
        return json.dumps(document)

    def __repr__(self) -> str:
        return f"identifier.Category(category={self.category})"

    def __hash__(self):
        # The hash is derived from the JSON form.
        serialised = self.json()
        return hash(serialised)

logicblocks.event.types.identifier.Stream dataclass

Bases: Identifier

Source code in src/logicblocks/event/types/identifier.py
@dataclass(frozen=True)
class Stream(Identifier):
    """Identifier for a stream, by category name and stream name.

    Attributes:
        category: The name of the category of the stream.
        stream: The name of the stream.
    """

    category: str
    stream: str

    def __init__(self, *, category: str, stream: str):
        # The dataclass is frozen, so assignment must bypass its __setattr__.
        for attribute, value in (("category", category), ("stream", stream)):
            object.__setattr__(self, attribute, value)

    def json(self) -> str:
        """Return the identifier serialised as a JSON string."""
        document = dict(
            type="stream",
            category=self.category,
            stream=self.stream,
        )
        return json.dumps(document)

    def __repr__(self) -> str:
        arguments = f"category={self.category},stream={self.stream}"
        return f"identifier.Stream({arguments})"

    def __hash__(self):
        # The hash is derived from the JSON form.
        serialised = self.json()
        return hash(serialised)

logicblocks.event.types.NewEvent dataclass

Bases: object

Source code in src/logicblocks/event/types/event.py
@dataclass(frozen=True)
class NewEvent:
    """An event that has not yet been stored.

    Attributes:
        name: The name of the event.
        payload: The data carried by the event.
        observed_at: When the event was observed.
        occurred_at: When the event occurred.
    """

    name: str
    payload: Mapping[str, Any]
    observed_at: datetime
    occurred_at: datetime

    def __init__(
        self,
        *,
        name: str,
        payload: Mapping[str, Any],
        observed_at: datetime | None = None,
        occurred_at: datetime | None = None,
        clock: Clock = SystemClock(),
    ):
        """Create a new event.

        Args:
            name: The name of the event.
            payload: The data carried by the event.
            observed_at: Defaults to `clock.now(UTC)` when omitted.
            occurred_at: Defaults to the resolved `observed_at` when omitted.
            clock: Consulted only when `observed_at` is omitted.
        """
        resolved_observed_at = (
            clock.now(UTC) if observed_at is None else observed_at
        )
        resolved_occurred_at = (
            resolved_observed_at if occurred_at is None else occurred_at
        )

        # The dataclass is frozen, so assignment must bypass its __setattr__.
        values = {
            "name": name,
            "payload": payload,
            "observed_at": resolved_observed_at,
            "occurred_at": resolved_occurred_at,
        }
        for attribute, value in values.items():
            object.__setattr__(self, attribute, value)

    def json(self):
        """Return the event serialised as a JSON string with sorted keys."""
        document = {
            "name": self.name,
            "payload": self.payload,
            "observedAt": self.observed_at.isoformat(),
            "occurredAt": self.occurred_at.isoformat(),
        }
        return json.dumps(document, sort_keys=True)

    def __repr__(self):
        parts = (
            f"name={self.name}",
            f"payload={dict(self.payload)}",
            f"observed_at={self.observed_at}",
            f"occurred_at={self.occurred_at}",
        )
        return "NewEvent(" + ", ".join(parts) + ")"

    def __hash__(self):
        # The hash is derived from the JSON form.
        serialised = self.json()
        return hash(serialised)

logicblocks.event.types.StoredEvent dataclass

Bases: object

Source code in src/logicblocks/event/types/event.py
@dataclass(frozen=True)
class StoredEvent:
    """An event as it exists in the store.

    Attributes:
        id: The identifier of the event.
        name: The name of the event.
        stream: The name of the stream the event belongs to.
        category: The name of the category the event belongs to.
        position: The position of the event.
        sequence_number: The sequence number of the event.
        payload: The data carried by the event.
        observed_at: When the event was observed.
        occurred_at: When the event occurred.
    """

    id: str
    name: str
    stream: str
    category: str
    position: int
    sequence_number: int
    payload: Mapping[str, Any]
    observed_at: datetime
    occurred_at: datetime

    def __init__(
        self,
        *,
        id: str,
        name: str,
        stream: str,
        category: str,
        position: int,
        sequence_number: int,
        payload: Mapping[str, Any],
        observed_at: datetime,
        occurred_at: datetime,
    ):
        # The dataclass is frozen, so assignment must bypass its __setattr__.
        values = {
            "id": id,
            "name": name,
            "stream": stream,
            "category": category,
            "position": position,
            "sequence_number": sequence_number,
            "payload": payload,
            "observed_at": observed_at,
            "occurred_at": occurred_at,
        }
        for attribute, value in values.items():
            object.__setattr__(self, attribute, value)

    def json(self):
        """Return the event serialised as a JSON string with sorted keys."""
        document = {
            "id": self.id,
            "name": self.name,
            "stream": self.stream,
            "category": self.category,
            "position": self.position,
            "sequence_number": self.sequence_number,
            "payload": self.payload,
            "observedAt": self.observed_at.isoformat(),
            "occurredAt": self.occurred_at.isoformat(),
        }
        return json.dumps(document, sort_keys=True)

    def __repr__(self):
        parts = (
            f"id={self.id}",
            f"name={self.name}",
            f"stream={self.stream}",
            f"category={self.category}",
            f"position={self.position}",
            f"sequence_number={self.sequence_number}",
            f"payload={dict(self.payload)}",
            f"observed_at={self.observed_at}",
            f"occurred_at={self.occurred_at}",
        )
        return "StoredEvent(" + ", ".join(parts) + ")"

    def __hash__(self):
        # The hash is derived from the JSON form.
        serialised = self.json()
        return hash(serialised)

logicblocks.event.types.Projection dataclass

Bases: object

Source code in src/logicblocks/event/types/projection.py
@dataclass(frozen=True)
class Projection:
    """A projected state paired with a position.

    Attributes:
        state: The projected state.
        position: The position associated with the state.
    """

    state: Mapping[str, Any]
    position: int

    def __init__(
        self,
        *,
        state: Mapping[str, Any],
        position: int,
    ):
        # The dataclass is frozen, so assignment must bypass its __setattr__.
        for attribute, value in (("state", state), ("position", position)):
            object.__setattr__(self, attribute, value)

    def json(self):
        """Return the projection serialised as a JSON string with sorted
        keys."""
        document = {
            "state": self.state,
            "position": self.position,
        }
        return json.dumps(document, sort_keys=True)

    def __repr__(self):
        parts = (
            f"state={dict(self.state)}",
            f"position={self.position}",
        )
        return "Projection(" + ", ".join(parts) + ")"

    def __hash__(self):
        # The hash is derived from the JSON form.
        serialised = self.json()
        return hash(serialised)

logicblocks.event.projection

logicblocks.event.projection.Projector dataclass

Source code in src/logicblocks/event/projection/projector.py
@dataclass(frozen=True)
class Projector:
    """Folds a sequence of stored events into a projection.

    Attributes:
        handlers: Maps an event name to a function that takes the current
            state and the event, and returns the next state.
    """

    handlers: Mapping[
        str, Callable[[Mapping[str, Any], StoredEvent], Mapping[str, Any]]
    ]

    def __init__(
        self,
        *,
        handlers: Mapping[
            str, Callable[[Mapping[str, Any], StoredEvent], Mapping[str, Any]]
        ],
    ):
        # The dataclass is frozen, so assignment must bypass its __setattr__.
        object.__setattr__(self, "handlers", handlers)

    def call_handler_func(self, state: Mapping[str, Any], event: StoredEvent):
        """Apply the handler registered for `event.name` to `state`.

        Args:
            state: The state accumulated so far.
            event: The event to apply.

        Returns:
            whatever the handler returns.

        Raises:
            MissingHandlerError: If no handler is registered for the event's
                name.
        """
        if event.name not in self.handlers:
            raise MissingHandlerError(event)

        handler_function = self.handlers[event.name]
        return handler_function(state, event)

    def project(self, state: Mapping[str, Any], events: List[StoredEvent]):
        """Fold `events` over `state`, in order, into a `Projection`.

        Args:
            state: The initial state.
            events: The events to apply. Must be non-empty and indexable,
                since the position is taken from `events[-1]`.

        Returns:
            a `Projection` holding the final state and the position of the
            last event.
        """
        final_state = functools.reduce(self.call_handler_func, events, state)
        last_position = events[-1].position
        return Projection(state=final_state, position=last_position)

logicblocks.event.testing

logicblocks.event.testing.builders

logicblocks.event.testing.data