Skip to content

cattle_grid.extensions.examples.simple_storage

This extension is an example of storing activities and then serving them through a HTTP API.

I will possibly extend it to also store objects (and provide some Create activity creation), but not much more.

A real storage mechanism should have several features this simple API has not, e.g.

  • Serving a HTML page through content negotiation
  • Allowing one to update the content of the database
  • Collecting and adding metadata, e.g. a replies collection for objects

Usage:

[[extensions]]
module = "cattle_grid.extensions.examples.simple_storage"
api_prefix = "/simple_storage"

config = { prefix = "/simple_storage/" }

extension = Extension(name='simple storage', module=__name__, lifespan=lifespan, config_class=SimpleStorageConfiguration) module-attribute

The simple storage extension

get_activity(uuid, headers, session) async

Returns the activity or object

Source code in cattle_grid/extensions/examples/simple_storage/__init__.py
@extension.get("/{uuid}")
async def get_activity(
    uuid: uuid.UUID, headers: ActivityPubHeaders, session: FastApiSession
):
    """Returns the activity or object"""
    result = await session.scalar(
        select(StoredActivity).where(StoredActivity.id == uuid)
    )

    if result is None:
        result = await session.scalar(
            select(StoredObject).where(StoredObject.id == uuid)
        )

    if result is None:
        raise HTTPException(status_code=404, detail="Activity not found")

    actor = await Actor.get_or_none(actor_id=result.actor)

    if actor is None:
        raise HTTPException(status_code=410, detail="Activity no longer available")

    if not await is_valid_requester(
        headers.x_cattle_grid_requester, actor, result.data
    ):
        raise HTTPException(status_code=401)

    if result.data.get("id") != headers.x_ap_location:
        raise HTTPException(status_code=400, detail="Location header does not match id")

    return result.data

main() async

Basic endpoint that just returns a string, so requesting with an uuid doesn’t return an error.

Source code in cattle_grid/extensions/examples/simple_storage/__init__.py
@extension.get("/")
async def main():
    """Basic endpoint that just returns a string, so
    requesting with an uuid doesn't return an error."""
    return "simple storage cattle grid sample extension"

simple_storage_publish_activity(msg, config, session, broker=Context()) async

Method to subscribe to the publish_activity routing_key.

An activity send to this endpoint will be stored in the database, and then published through the send_message mechanism.

The stored activity can then be retrieved through the HTTP API.

Source code in cattle_grid/extensions/examples/simple_storage/__init__.py
@extension.subscribe("publish_activity")
async def simple_storage_publish_activity(
    msg: PublishActivity,
    config: extension.Config,
    session: CommittingSession,
    broker: RabbitBroker = Context(),
):
    """Method to subscribe to the `publish_activity` routing_key.

    An activity send to this endpoint will be stored in the
    database, and then published through the `send_message`
    mechanism.

    The stored activity can then be retrieved through the
    HTTP API.
    """
    if msg.data.get("id"):
        raise ValueError("Activity ID must not be set")

    if msg.data.get("actor") != msg.actor:
        raise ValueError("Actor must match message actor")

    activity = msg.data
    activity["id"], uuid = config.make_id(msg.actor)

    logger.info("Publishing activity with id %s for %s", msg.actor, activity["id"])

    session.add(
        StoredActivity(
            id=uuid,
            data=activity,
            actor=msg.actor,
        )
    )

    await broker.publish(
        ActivityMessage(actor=msg.actor, data=activity),
        routing_key="send_message",
        exchange=exchange(),
    )

simple_storage_publish_object(msg, config, session, broker=Context()) async

Publishes an object, subscribed to the routing key publish_object.

We note that this routine creates a Create activity for the object.

Source code in cattle_grid/extensions/examples/simple_storage/__init__.py
@extension.subscribe("publish_object")
async def simple_storage_publish_object(
    msg: PublishObject,
    config: extension.Config,
    session: CommittingSession,
    broker: RabbitBroker = Context(),
):
    """Publishes an object, subscribed to the routing key
    `publish_object`.

    We note that this routine creates a `Create` activity for the object."""

    if msg.data.get("id"):
        raise ValueError("Object ID must not be set")

    if msg.data.get("attributedTo") != msg.actor:
        raise ValueError("Actor must match object attributedTo")

    obj = msg.data
    obj["id"], obj_uuid = config.make_id(msg.actor)

    logger.info("Publishing object with id %s for %s", msg.actor, obj["id"])

    actor = await Actor.get_or_none(actor_id=msg.actor)

    if not actor:
        raise ValueError(f"Unknown actor {msg.actor}")

    actor_profile = actor_to_object(actor)
    activity_factory, _ = factories_for_actor_object(actor_profile)

    activity_id, activity_uuid = config.make_id(msg.actor)

    activity = activity_factory.create(obj, id=activity_id).build()

    logger.info(activity)

    await broker.publish(
        ActivityMessage(actor=msg.actor, data=activity),
        routing_key="send_message",
        exchange=exchange(),
    )

    session.add(
        StoredActivity(
            id=activity_uuid,
            data=activity,
            actor=msg.actor,
        )
    )
    session.add(
        StoredObject(
            id=obj_uuid,
            data=obj,
            actor=msg.actor,
        )
    )

config

SimpleStorageConfiguration

Bases: BaseModel

Configuration of the simple storage extension

Parameters:

Name Type Description Default
prefix str
'/simple_storage/'
Source code in cattle_grid/extensions/examples/simple_storage/config.py
class SimpleStorageConfiguration(BaseModel):
    """Configuration of the simple storage extension"""

    prefix: str = "/simple_storage/"
    """
    Path to use before the generated uuid. The protocol and domain will be extracted from the actor id. See [cattle_grid.extensions.examples.simple_storage.config.determine_url_start][].
    """

    def url_start(self, actor_id):
        return determine_url_start(actor_id, self.prefix)

    def make_id(self, actor_id):
        new_uuid = uuid6.uuid7()
        return self.url_start(actor_id) + str(new_uuid), new_uuid

prefix = '/simple_storage/' class-attribute instance-attribute

Path to use before the generated uuid. The protocol and domain will be extracted from the actor id. See cattle_grid.extensions.examples.simple_storage.config.determine_url_start.

determine_url_start(actor_id, prefix)

Used to determine the url of a stored object

>>> determine_url_start("http://abel.example/actor/alice",
...     "/simple/storage/")
'http://abel.example/simple/storage/'
Source code in cattle_grid/extensions/examples/simple_storage/config.py
def determine_url_start(actor_id, prefix):
    """
    Used to determine the url of a stored object

    ```pycon
    >>> determine_url_start("http://abel.example/actor/alice",
    ...     "/simple/storage/")
    'http://abel.example/simple/storage/'

    ```
    """
    parsed = urlparse(actor_id)

    return f"{parsed.scheme}://{parsed.netloc}{prefix}"

database

CommittingSession = Annotated[AsyncSession, Depends(with_session_commit)] module-attribute

Session that commits the transaction

FastApiSession = Annotated[AsyncSession, FADepends(with_fast_api_session)] module-attribute

Session annotation to be used with FastAPI

lifespan(engine) async

The lifespan ensure that the necessary database table is created.

Source code in cattle_grid/extensions/examples/simple_storage/database.py
@asynccontextmanager
async def lifespan(engine: SqlAsyncEngine):
    """The lifespan ensure that the necessary database table is
    created."""
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    yield

message_types

PublishActivity

Bases: BaseModel

Used when publishing an activity

Parameters:

Name Type Description Default
actor str
required
data dict
required
Source code in cattle_grid/extensions/examples/simple_storage/message_types.py
class PublishActivity(BaseModel):
    """Used when publishing an activity"""

    actor: str = Field(examples=["http://alice.example"])
    """The actor performing the activity"""

    data: dict = Field(
        examples=[
            {
                "@context": "https://www.w3.org/ns/activitystreams",
                "type": "AnimalSound",
                "actor": "http://alice.example",
                "to": ["http://bob.example"],
                "content": "moo",
            }
        ]
    )
    """Activity to publish"""

actor = Field(examples=['http://alice.example']) class-attribute instance-attribute

The actor performing the activity

data = Field(examples=[{'@context': 'https://www.w3.org/ns/activitystreams', 'type': 'AnimalSound', 'actor': 'http://alice.example', 'to': ['http://bob.example'], 'content': 'moo'}]) class-attribute instance-attribute

Activity to publish

PublishObject

Bases: BaseModel

Used when publishing an object

Parameters:

Name Type Description Default
actor str
required
data dict
required
Source code in cattle_grid/extensions/examples/simple_storage/message_types.py
class PublishObject(BaseModel):
    """Used when publishing an object"""

    actor: str = Field(examples=["http://alice.example"])
    """The actor performing the activity"""

    data: dict = Field(
        examples=[
            {
                "@context": "https://www.w3.org/ns/activitystreams",
                "type": "Note",
                "attributedTo": "http://alice.example",
                "to": ["http://bob.example"],
                "content": "moo",
            }
        ]
    )
    """Object to publish"""

actor = Field(examples=['http://alice.example']) class-attribute instance-attribute

The actor performing the activity

data = Field(examples=[{'@context': 'https://www.w3.org/ns/activitystreams', 'type': 'Note', 'attributedTo': 'http://alice.example', 'to': ['http://bob.example'], 'content': 'moo'}]) class-attribute instance-attribute

Object to publish

models

StoredActivity

Bases: Base

Stored activity in the database

Source code in cattle_grid/extensions/examples/simple_storage/models.py
class StoredActivity(Base):
    """Stored activity in the database"""

    __tablename__ = "simple_storage_stored_activity"
    """name of the table"""

    id: Mapped[bytes] = mapped_column(UUIDType(binary=True), primary_key=True)
    """The id (uuid as bytes)"""
    data: Mapped[dict] = mapped_column(JSON)
    """The activity as JSON"""
    actor: Mapped[str] = mapped_column()
    """The actor that created the activity"""
    create_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
    """The create timestamp"""

__tablename__ = 'simple_storage_stored_activity' class-attribute instance-attribute

name of the table

actor = mapped_column() class-attribute instance-attribute

The actor that created the activity

create_date = mapped_column(server_default=func.now()) class-attribute instance-attribute

The create timestamp

data = mapped_column(JSON) class-attribute instance-attribute

The activity as JSON

id = mapped_column(UUIDType(binary=True), primary_key=True) class-attribute instance-attribute

The id (uuid as bytes)

StoredObject

Bases: Base

Stored object in the database

Source code in cattle_grid/extensions/examples/simple_storage/models.py
class StoredObject(Base):
    """Stored object in the database"""

    __tablename__ = "simple_storage_stored_object"
    """name of the table"""

    id: Mapped[bytes] = mapped_column(UUIDType(binary=True), primary_key=True)
    """The id (uuid as bytes)"""
    data: Mapped[dict] = mapped_column(JSON)
    """The object as JSON"""
    actor: Mapped[str] = mapped_column()
    """The actor that created the object"""
    create_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
    """The create timestamp"""

__tablename__ = 'simple_storage_stored_object' class-attribute instance-attribute

name of the table

actor = mapped_column() class-attribute instance-attribute

The actor that created the object

create_date = mapped_column(server_default=func.now()) class-attribute instance-attribute

The create timestamp

data = mapped_column(JSON) class-attribute instance-attribute

The object as JSON

id = mapped_column(UUIDType(binary=True), primary_key=True) class-attribute instance-attribute

The id (uuid as bytes)

test_api

create_actor(base_url, preferred_username=None, identifiers={}, profile={}) async

Creates a new actor in the database

Source code in cattle_grid/activity_pub/actor.py
async def create_actor(
    base_url: str,
    preferred_username: str | None = None,
    identifiers: dict = {},
    profile: dict = {},
):
    """Creates a new actor in the database"""

    public_key, private_key = generate_rsa_public_private_key()
    public_key_name = "legacy-key-1"
    actor_id = new_url(base_url, "actor")

    actor = await Actor.create(
        actor_id=actor_id,
        inbox_uri=new_url(base_url, "inbox"),
        outbox_uri=new_url(base_url, "outbox"),
        following_uri=new_url(base_url, "following"),
        followers_uri=new_url(base_url, "followers"),
        public_key_name=public_key_name,
        public_key=public_key,
        profile=profile,
        automatically_accept_followers=False,
    )
    await Credential.create(
        actor_id=actor_id,
        identifier=f"{actor_id}#{public_key_name}",
        secret=private_key,
    )

    if preferred_username:
        if "webfinger" in identifiers:
            raise ValueError("webfinger key set in identifiers")
        identifiers = {
            **identifiers,
            "webfinger": compute_acct_uri(base_url, preferred_username),
        }

    if "activitypub_id" not in identifiers:
        identifiers = {**identifiers, "activitypub_id": actor_id}

    for name, identifier in identifiers.items():
        await PublicIdentifier.create(actor=actor, name=name, identifier=identifier)

    logging.info("Created actor with id '%s'", actor_id)

    await actor.fetch_related("identifiers")

    return actor

database() async

Fixture so that the database is initialized

Source code in cattle_grid/testing/fixtures.py
@pytest.fixture(autouse=True)
async def database():
    """Fixture so that the database is initialized"""
    async with with_database(db_uri="sqlite://:memory:", generate_schemas=True):
        yield

new_auth_config(actor_id, username=None)

Creates a new authorization configuration

Source code in cattle_grid/config/auth.py
def new_auth_config(actor_id: str, username: str | None = None) -> AuthConfig:
    """Creates a new authorization configuration"""
    if not username:
        username = secrets.token_urlsafe(12)

    domain = urlparse(actor_id).netloc
    acct_uri = f"acct:{username}@{domain}"

    public_key, private_key = generate_rsa_public_private_key()

    auth_config = AuthConfig(
        actor_id=actor_id,
        actor_acct_id=acct_uri,
        public_key=public_key,
        private_key=private_key,
        domain_blocks=set(),
    )

    return auth_config

save_auth_config(filename, config)

Saves the authorization configuration to a file

Source code in cattle_grid/config/auth.py
def save_auth_config(filename: str, config: AuthConfig) -> None:
    """Saves the authorization configuration to a file"""
    with open(filename, "wb") as fp:
        tomli_w.dump({"auth": config.model_dump()}, fp, multiline_strings=True)

test_actor() async

Fixture to create an actor

Source code in cattle_grid/testing/fixtures.py
@pytest.fixture
async def test_actor():
    """Fixture to create an actor"""
    return await create_actor("http://localhost/ap")

with_database(db_uri='sqlite://:memory:', generate_schemas=False) async

Opens the connection to the database using tortoise

Source code in cattle_grid/database.py
@asynccontextmanager
async def database(db_uri: str = "sqlite://:memory:", generate_schemas: bool = False):
    """Opens the connection to the database using tortoise"""
    await Tortoise.init(config=tortoise_config(db_uri))
    if generate_schemas:
        await Tortoise.generate_schemas()

    try:
        yield
    finally:
        await Tortoise.close_connections()

test_handlers

create_actor(base_url, preferred_username=None, identifiers={}, profile={}) async

Creates a new actor in the database

Source code in cattle_grid/activity_pub/actor.py
async def create_actor(
    base_url: str,
    preferred_username: str | None = None,
    identifiers: dict = {},
    profile: dict = {},
):
    """Creates a new actor in the database"""

    public_key, private_key = generate_rsa_public_private_key()
    public_key_name = "legacy-key-1"
    actor_id = new_url(base_url, "actor")

    actor = await Actor.create(
        actor_id=actor_id,
        inbox_uri=new_url(base_url, "inbox"),
        outbox_uri=new_url(base_url, "outbox"),
        following_uri=new_url(base_url, "following"),
        followers_uri=new_url(base_url, "followers"),
        public_key_name=public_key_name,
        public_key=public_key,
        profile=profile,
        automatically_accept_followers=False,
    )
    await Credential.create(
        actor_id=actor_id,
        identifier=f"{actor_id}#{public_key_name}",
        secret=private_key,
    )

    if preferred_username:
        if "webfinger" in identifiers:
            raise ValueError("webfinger key set in identifiers")
        identifiers = {
            **identifiers,
            "webfinger": compute_acct_uri(base_url, preferred_username),
        }

    if "activitypub_id" not in identifiers:
        identifiers = {**identifiers, "activitypub_id": actor_id}

    for name, identifier in identifiers.items():
        await PublicIdentifier.create(actor=actor, name=name, identifier=identifier)

    logging.info("Created actor with id '%s'", actor_id)

    await actor.fetch_related("identifiers")

    return actor

database() async

Fixture so that the database is initialized

Source code in cattle_grid/testing/fixtures.py
@pytest.fixture(autouse=True)
async def database():
    """Fixture so that the database is initialized"""
    async with with_database(db_uri="sqlite://:memory:", generate_schemas=True):
        yield

new_auth_config(actor_id, username=None)

Creates a new authorization configuration

Source code in cattle_grid/config/auth.py
def new_auth_config(actor_id: str, username: str | None = None) -> AuthConfig:
    """Creates a new authorization configuration"""
    if not username:
        username = secrets.token_urlsafe(12)

    domain = urlparse(actor_id).netloc
    acct_uri = f"acct:{username}@{domain}"

    public_key, private_key = generate_rsa_public_private_key()

    auth_config = AuthConfig(
        actor_id=actor_id,
        actor_acct_id=acct_uri,
        public_key=public_key,
        private_key=private_key,
        domain_blocks=set(),
    )

    return auth_config

save_auth_config(filename, config)

Saves the authorization configuration to a file

Source code in cattle_grid/config/auth.py
def save_auth_config(filename: str, config: AuthConfig) -> None:
    """Saves the authorization configuration to a file"""
    with open(filename, "wb") as fp:
        tomli_w.dump({"auth": config.model_dump()}, fp, multiline_strings=True)

test_actor() async

Fixture to create an actor

Source code in cattle_grid/testing/fixtures.py
@pytest.fixture
async def test_actor():
    """Fixture to create an actor"""
    return await create_actor("http://localhost/ap")

with_database(db_uri='sqlite://:memory:', generate_schemas=False) async

Opens the connection to the database using tortoise

Source code in cattle_grid/database.py
@asynccontextmanager
async def database(db_uri: str = "sqlite://:memory:", generate_schemas: bool = False):
    """Opens the connection to the database using tortoise"""
    await Tortoise.init(config=tortoise_config(db_uri))
    if generate_schemas:
        await Tortoise.generate_schemas()

    try:
        yield
    finally:
        await Tortoise.close_connections()