Skip to content

.activity_pub

cattle_grid.activity_pub

get_async_api_schema()

Returns the async api schema for cattle_grid ActivityPub processing

Source code in cattle_grid/activity_pub/__init__.py
def get_async_api_schema() -> Schema:
    """Returns the async api schema for cattle_grid ActivityPub processing"""
    from faststream.asyncapi import get_app_schema

    app = get_mock_faststream_app()

    return get_app_schema(app)

get_fastapi_app()

Returns the fast api app for ActivityPub processing

Source code in cattle_grid/activity_pub/__init__.py
def get_fastapi_app() -> FastAPI:
    """Returns the fast api app for ActivityPub processing"""

    app = FastAPI(title="cattle_grid ActivityPub routes", version=__version__)
    app.include_router(router)

    return app

get_mock_faststream_app()

Creates a mock faststream app for ActivityPub processing

Source code in cattle_grid/activity_pub/__init__.py
def get_mock_faststream_app() -> FastStream:
    """Creates a mock faststream app for ActivityPub processing"""

    from faststream.rabbit import RabbitBroker

    broker = RabbitBroker()
    broker.include_router(create_processing_router())

    return FastStream(
        broker,
        title="cattle_grid ActivityPub processing",
        version=__version__,
        description="Illustrates how cattle grid processes ActivityPub",
    )

cattle_grid.activity_pub.activity

actor_deletes_themselves(activity)

Checks if activity is self delete of actor

>>> actor_deletes_themselves({"type": "Delete",
...     "actor": "http://actor.test/",
...     "object": "http://actor.test/"})
True

>>> actor_deletes_themselves({"type": "Delete",
...     "actor": "http://actor.test/",
...     "object": "http://other.test/"})
False
Source code in cattle_grid/activity_pub/activity.py
def actor_deletes_themselves(activity: Dict[str, Any]) -> bool:
    """
    Checks if activity is self delete of actor

    ```pycon
    >>> actor_deletes_themselves({"type": "Delete",
    ...     "actor": "http://actor.test/",
    ...     "object": "http://actor.test/"})
    True

    >>> actor_deletes_themselves({"type": "Delete",
    ...     "actor": "http://actor.test/",
    ...     "object": "http://other.test/"})
    False

    ```
    """

    activity_type = activity.get("type")
    if activity_type != "Delete":
        return False

    actor_id = activity.get("actor")
    object_id = id_for_object(activity.get("object"))

    if actor_id is None or object_id is None:
        return False

    return actor_id == object_id

cattle_grid.activity_pub.actor

actor_to_object(actor)

Transform the actor to an object

Parameters:

Name Type Description Default
actor Actor
required

Returns:

Type Description
dict
Source code in cattle_grid/activity_pub/actor.py
def actor_to_object(actor: Actor) -> dict:
    """Transform the actor to an object

    :params actor:
    :returns:
    """

    try:
        sorted_identifiers = [
            identifier.identifier
            for identifier in sorted(
                actor.identifiers, key=lambda x: x.preference, reverse=True
            )
        ]
    except Exception:
        sorted_identifiers = []

    preferred_username = determine_preferred_username(
        sorted_identifiers, actor.actor_id
    )

    result = AsActor(
        id=actor.actor_id,
        outbox=actor.outbox_uri,
        inbox=actor.inbox_uri,
        followers=actor.followers_uri,
        following=actor.following_uri,
        public_key=actor.public_key,
        public_key_name=actor.public_key_name,
        preferred_username=preferred_username,
        type=actor.profile.get("type", "Person"),
        name=actor.profile.get("name"),
        summary=actor.profile.get("summary"),
        url=actor.profile.get("url"),
        icon=actor.profile.get("image"),
    ).build(visibility=Visibility.OWNER)

    result["identifiers"] = sorted_identifiers

    return result

bovine_actor_for_actor_id(actor_id) async

Uses the information stored in Credential to construct a bovine actor

Parameters:

Name Type Description Default
actor_id str
required

Returns:

Type Description
BovineActor | None
Source code in cattle_grid/activity_pub/actor.py
async def bovine_actor_for_actor_id(actor_id: str) -> BovineActor | None:
    """Uses the information stored in [Credential][cattle_grid.activity_pub.models.Credential] to construct a bovine actor

    :params actor_id:
    :returns:
    """
    credential = await Credential.get_or_none(actor_id=actor_id)

    if credential is None:
        return None

    return BovineActor(
        public_key_url=credential.identifier,
        actor_id=actor_id,
        secret=credential.secret,
    )

compute_acct_uri(base_url, preferred_username)

Computes the acct uri

>>> compute_acct_uri("http://host.example/somewhere", "alice")
'acct:alice@host.example'
Source code in cattle_grid/activity_pub/actor.py
def compute_acct_uri(base_url: str, preferred_username: str):
    """Computes the acct uri

    ```pycon
    >>> compute_acct_uri("http://host.example/somewhere", "alice")
    'acct:alice@host.example'

    ```

    """
    host = urlparse(base_url).hostname

    return f"acct:{preferred_username}@{host}"

create_actor(base_url, preferred_username=None, identifiers={}, profile={}) async

Creates a new actor in the database

Source code in cattle_grid/activity_pub/actor.py
async def create_actor(
    base_url: str,
    preferred_username: str | None = None,
    identifiers: dict = {},
    profile: dict = {},
):
    """Creates a new actor in the database"""

    public_key, private_key = generate_rsa_public_private_key()
    public_key_name = "legacy-key-1"
    actor_id = new_url(base_url, "actor")

    actor = await Actor.create(
        actor_id=actor_id,
        inbox_uri=new_url(base_url, "inbox"),
        outbox_uri=new_url(base_url, "outbox"),
        following_uri=new_url(base_url, "following"),
        followers_uri=new_url(base_url, "followers"),
        public_key_name=public_key_name,
        public_key=public_key,
        profile=profile,
        automatically_accept_followers=False,
    )
    await Credential.create(
        actor_id=actor_id,
        identifier=f"{actor_id}#{public_key_name}",
        secret=private_key,
    )

    if preferred_username:
        if "webfinger" in identifiers:
            raise ValueError("webfinger key set in identifiers")
        identifiers = {
            **identifiers,
            "webfinger": compute_acct_uri(base_url, preferred_username),
        }

    if "activitypub_id" not in identifiers:
        identifiers = {**identifiers, "activitypub_id": actor_id}

    for name, identifier in identifiers.items():
        await PublicIdentifier.create(actor=actor, name=name, identifier=identifier)

    logging.info("Created actor with id '%s'", actor_id)

    await actor.fetch_related("identifiers")

    return actor

delete_actor(actor) async

Deletes an actor

Parameters:

Name Type Description Default
actor Actor

Actor to be deleted

required
Source code in cattle_grid/activity_pub/actor.py
async def delete_actor(actor: Actor):
    """Deletes an actor

    :param actor: Actor to be deleted
    """

    # await Credential.filter(actor_id=actor.actor_id).delete()
    await PublicIdentifier.filter(actor=actor).delete()

    actor.status = ActorStatus.deleted
    await actor.save()

delete_for_actor_profile(actor)

Creates a delete activity for the Actor

Source code in cattle_grid/activity_pub/actor.py
def delete_for_actor_profile(actor: Actor) -> dict:
    """Creates a delete activity for the Actor"""

    actor_profile = actor_to_object(actor)
    activity_factory, _ = factories_for_actor_object(actor_profile)

    result = (
        activity_factory.delete(
            actor_profile.get("id"), followers=actor_profile["followers"]
        )
        .as_public()
        .build()
    )

    result["cc"].append(actor_profile["following"])

    return result

determine_preferred_username(identifiers, actor_id)

Determine the preferred username from the sorted identifiers

Source code in cattle_grid/activity_pub/actor.py
def determine_preferred_username(identifiers: List[str], actor_id: str) -> str | None:
    """Determine the preferred username from the sorted identifiers"""
    actor_domain = urlparse(actor_id).netloc
    for identifier in identifiers:
        if identifier.startswith("acct:"):
            handle, domain = identifier.removeprefix("acct:").split("@")
            if domain == actor_domain:
                return handle

    return None

followers_for_actor(actor) async

Returns the list of accepted followers

Parameters:

Name Type Description Default
actor Actor
required

Returns:

Type Description
List[str]
Source code in cattle_grid/activity_pub/actor.py
async def followers_for_actor(actor: Actor) -> List[str]:
    """Returns the list of accepted followers

    :param actor:
    :returns:
    """

    await actor.fetch_related("followers")
    return [x.follower for x in actor.followers if x.accepted]

following_for_actor(actor) async

Returns the list of accepted people to follow said actor. This is the following table.

Parameters:

Name Type Description Default
actor Actor
required

Returns:

Type Description
List[str]
Source code in cattle_grid/activity_pub/actor.py
async def following_for_actor(actor: Actor) -> List[str]:
    """Returns the list of accepted people to follow said actor.
    This is the following table.

    :param actor:
    :returns:
    """

    await actor.fetch_related("following")
    return [x.following for x in actor.following if x.accepted]

is_valid_requester(requester, actor, obj) async

Checks if the requested is allowed to view the object

Source code in cattle_grid/activity_pub/actor.py
async def is_valid_requester(requester: str, actor: Actor, obj: dict):
    """Checks if the requested is allowed to view the object"""

    blocked = await Blocking.get_or_none(blocking=requester, actor=actor, active=True)

    if blocked:
        return False

    if is_public(obj):
        return True

    recipients = recipients_for_object(obj)
    self_delete = actor_deletes_themselves(obj)

    recipients = await update_recipients_for_actor(actor, recipients, self_delete)

    valid_requesters = recipients

    if "actor" in obj:
        valid_requesters = valid_requesters | {obj["actor"]}
    if "attributedTo" in obj:
        valid_requesters = valid_requesters | {obj["attributedTo"]}

    return requester in valid_requesters

remove_from_followers_following(actor_id_to_remove) async

Removes actor_id from all occurring followers and following

Source code in cattle_grid/activity_pub/actor.py
async def remove_from_followers_following(actor_id_to_remove: str):
    """Removes actor_id from all occurring followers and following"""

    await Follower.filter(follower=actor_id_to_remove).delete()
    await Following.filter(following=actor_id_to_remove).delete()

update_for_actor_profile(actor)

Creates an update for the Actor

Source code in cattle_grid/activity_pub/actor.py
def update_for_actor_profile(actor: Actor) -> dict:
    """Creates an update for the Actor"""

    actor_profile = actor_to_object(actor)
    activity_factory, _ = factories_for_actor_object(actor_profile)

    return (
        activity_factory.update(actor_profile, followers=actor_profile["followers"])
        .as_public()
        .build()
    )

update_recipients_for_actor(actor, recipients, self_delete=False) async

Updates set of recipients by removing the followers and following collections, and replacing them with the actual sets.

The following collecting is only allowed for self delete activities.

Source code in cattle_grid/activity_pub/actor.py
async def update_recipients_for_actor(actor, recipients, self_delete=False):
    """Updates set of recipients by removing the followers and following collections, and replacing
    them with the actual sets.

    The following collecting is only allowed for self delete activities.
    """
    if actor.followers_uri in recipients:
        recipients = recipients - {actor.followers_uri} | set(
            await followers_for_actor(actor)
        )

        logger.info("Got recipients %s after handling followers", ", ".join(recipients))

    if actor.following_uri in recipients:
        recipients = recipients - {actor.following_uri}

        if self_delete:
            recipients = recipients | set(await following_for_actor(actor))
        else:
            logger.warning(
                "Actor '%s' included following collection in recipients where not allowed",
                actor.actor_id,
            )

    return recipients

cattle_grid.activity_pub.enqueuer

determine_activity_type(activity)

Determines the type of an activity

>>> determine_activity_type({"type": "Follow"})
'Follow'
>>> determine_activity_type({}) is None
True

In the case of multiple types, these are concatenated. This means that they are probably missed by processing, but don’t get ignored.

>>> determine_activity_type({"type": ["Follow", "WhileSkipping"]})
'FollowWhileSkipping'

Parameters:

Name Type Description Default
activity dict
required

Returns:

Type Description
str | None
Source code in cattle_grid/activity_pub/enqueuer.py
def determine_activity_type(activity: dict) -> str | None:
    """Determines the type of an activity

    ```pycon
    >>> determine_activity_type({"type": "Follow"})
    'Follow'

    ```


    ```pycon
    >>> determine_activity_type({}) is None
    True

    ```

    In the case of multiple types, these are concatenated. This means
    that they are probably missed by processing, but don't get ignored.

    ```pycon
    >>> determine_activity_type({"type": ["Follow", "WhileSkipping"]})
    'FollowWhileSkipping'

    ```

    :params activity:
    :returns:

    """

    activity_type = activity.get("type")
    if activity_type is None:
        return None
    if isinstance(activity_type, list):
        activity_type = "".join(activity_type)

    return activity_type

enqueue_from_inbox(broker, exchange, receiving_actor_id, content) async

Enqueues a new message arrived from the inbox

The routing key will be incoming.${activity_type}

Source code in cattle_grid/activity_pub/enqueuer.py
async def enqueue_from_inbox(
    broker: RabbitBroker,
    exchange: RabbitExchange,
    receiving_actor_id: str,
    content: dict,
):
    """Enqueues a new message arrived from the inbox

    The routing key will be `incoming.${activity_type}`
    """
    activity_type = determine_activity_type(content)
    if activity_type is None:
        return

    msg = ActivityMessage(actor=receiving_actor_id, data=content)

    await broker.publish(
        msg, exchange=exchange, routing_key=f"incoming.{activity_type}"
    )