Skip to content

.activity_pub.processing

If you use cattle_grid as a middleware, it takes care of communicating to the Fediverse for you. This means instead of having to handle receiving and sending HTTP post requests, you will be dealing with RabbitMQ routing. The relevant routing keys will be

incoming.ACTIVITY_TYPE
outgoing.ACTIVITY_TYPE

where the message content is given by the ActivityMessage type.

cattle_grid.model

ActivityMessage

Bases: BaseModel

Message that contains an Activity. Activity is used as the name for the ‘data object’ being exchanged, as is common in the Fediverse

Source code in cattle_grid/model/__init__.py
class ActivityMessage(BaseModel):
    """
    Message that contains an Activity. Activity is used as the name for the 'data object' being exchanged, as is common in the Fediverse
    """

    model_config = ConfigDict(
        extra="forbid",
    )
    actor: str
    """
    actor_id of the actor that received the message
    """
    data: Dict[str, Any]
    """
    Activity.
    """
    activity_type: str
    """
    Type of the Activity
    """

activity_type: str instance-attribute

Type of the Activity

actor: str instance-attribute

actor_id of the actor that received the message

data: Dict[str, Any] instance-attribute

Activity.

FetchMessage

Bases: BaseModel

Used to request an ActivityPub object to be retrieved

Source code in cattle_grid/model/__init__.py
class FetchMessage(BaseModel):
    """
    Used to request an ActivityPub object to be retrieved
    """

    model_config = ConfigDict(
        extra="forbid",
    )
    actor: str
    """
    actor_id of the actor that received the message
    """
    uri: str
    """
    URI of the object being retrieved
    """

actor: str instance-attribute

actor_id of the actor that received the message

uri: str instance-attribute

URI of the object being retrieved

JSON Schema

This section provides the json-schema documents used on this page.

ActivityMessage

activity_message.schema.json
{
  "$schema": "http://json-schema.org/draft-06/schema#",
  "definitions": {
    "ActivityMessage": {
      "type": "object",
      "additionalProperties": false,
      "title": "Activity Message",
      "description": "Message that contains an Activity. Activity is used as the name for the 'data object' being exchanged, as is common in the Fediverse",
      "properties": {
        "actor": {
          "type": "string",
          "description": "actor_id of the actor that received the message"
        },
        "data": {
          "type": "object",
          "description": "Activity."
        },
        "activity_type": {
          "type": "string",
          "description": "Type of the Activity"
        }
      },
      "required": ["data", "actor", "activity_type"]
    },
    "FetchMessage": {
      "type": "object",
      "additionalProperties": false,
      "title": "Fetch Message",
      "description": "Used to request an ActivityPub object to be retrieved",
      "properties": {
        "actor": {
          "type": "string",
          "description": "actor_id of the actor that received the message"
        },
        "uri": {
          "type": "string",
          "description": "URI of the object being retrieved"
        }
      },
      "required": ["uri", "actor"]
    }
  }
}

cattle_grid.activity_pub.processing

cattle_grid.activity_pub.processing.incoming

incoming_accept_activity(msg, actor=Depends(actor_for_message), broker=Context()) async

Handles an incoming Accept activity

Source code in cattle_grid/activity_pub/processing/incoming.py
async def incoming_accept_activity(
    msg: ActivityMessage, actor: Actor = Depends(actor_for_message), broker=Context()
):
    """Handles an incoming Accept activity"""
    accept_request = msg.data
    request_being_accepted = id_for_object(accept_request.get("object"))

    following = await Following.get_or_none(request=request_being_accepted)

    if not following:
        logger.warning("Follow request with id '%s' not found", request_being_accepted)
        return

    following.accepted = True
    await following.save()

    logger.info("Processed follow request %s (following)", request_being_accepted)

incoming_block_activity(msg, actor=Depends(actor_for_message), broker=Context()) async

Handles an incoming Block activity

Source code in cattle_grid/activity_pub/processing/incoming.py
async def incoming_block_activity(
    msg: ActivityMessage, actor: Actor = Depends(actor_for_message), broker=Context()
):
    """Handles an incoming Block activity"""
    current_actor_id = msg.data.get("object")
    if current_actor_id != actor.actor_id:
        logger.warning("Mismatch of actor and target of block")
        return

    actor_blocking = msg.data.get("actor")

    following = await Following.get_or_none(following=actor_blocking)

    if not following:
        return

    await following.delete()

incoming_follow_request(msg, actor=Depends(actor_for_message), broker=Context()) async

Handles an incoming follow request

Source code in cattle_grid/activity_pub/processing/incoming.py
async def incoming_follow_request(
    msg: ActivityMessage, actor: Actor = Depends(actor_for_message), broker=Context()
):
    """Handles an incoming follow request"""
    follow_request = msg.data
    to_follow = follow_request.get("object")
    if isinstance(to_follow, dict):
        to_follow = to_follow.get("id")

    if to_follow is None or to_follow != actor.actor_id:
        return

    request_id = follow_request.get("id")
    follower = follow_request.get("actor")

    await Follower.update_or_create(
        actor=actor,
        follower=follower,
        defaults={"request": request_id, "accepted": False},
    )

    if actor.automatically_accept_followers:
        actor_object = actor_to_object(actor)
        activity_factory, _ = factories_for_actor_object(actor_object)
        accept = activity_factory.accept(follow_request).build()

        await broker.publish(
            ActivityMessage(actor=actor.actor_id, activity_type="Accept", data=accept),
            routing_key="outgoing.Accept",
            exchange=internal_exchange(),
        )
        logger.info(
            "Got follow request from %s with id %s (auto accepted)",
            follower,
            request_id,
        )

    else:
        logger.info("Got follow request from %s with id %s", follower, request_id)

incoming_reject_activity(msg, actor=Depends(actor_for_message), broker=Context()) async

Handles an incoming Reject activity

Source code in cattle_grid/activity_pub/processing/incoming.py
async def incoming_reject_activity(
    msg: ActivityMessage, actor: Actor = Depends(actor_for_message), broker=Context()
):
    """Handles an incoming Reject activity"""
    reject_request = msg.data
    request_being_rejected = reject_request.get("object")

    following = await Following.get_or_none(request=request_being_rejected)

    if not following:
        return

    await following.delete()

incoming_undo_activity(msg, actor=Depends(actor_for_message), broker=Context()) async

Handles an incoming Undo activity

Source code in cattle_grid/activity_pub/processing/incoming.py
async def incoming_undo_activity(
    msg: ActivityMessage, actor: Actor = Depends(actor_for_message), broker=Context()
):
    """Handles an incoming Undo activity"""
    accept_request = msg.data
    request_being_undone = accept_request.get("object")

    follower = await Follower.get_or_none(request=request_being_undone)

    if not follower:
        return

    await follower.delete()

cattle_grid.activity_pub.processing.outgoing

outgoing_accept_request(msg, actor=Depends(actor_for_message), broker=Context()) async

Handles an outgoing Accept activity

Source code in cattle_grid/activity_pub/processing/outgoing.py
async def outgoing_accept_request(
    msg: ActivityMessage, actor: Actor = Depends(actor_for_message), broker=Context()
):
    """Handles an outgoing Accept activity"""
    accept_request = msg.data
    request_being_accepted = id_for_object(accept_request.get("object"))

    follower = await Follower.get_or_none(request=request_being_accepted)

    if not follower:
        logger.warning("Follow request with id '%s' not found", request_being_accepted)
        return

    follower.accepted = True
    await follower.save()

    logger.info("Accepted follow request %s", request_being_accepted)

outgoing_block_activity(msg, actor=Depends(actor_for_message), broker=Context()) async

Handles an outgoing Block activity

Source code in cattle_grid/activity_pub/processing/outgoing.py
async def outgoing_block_activity(
    msg: ActivityMessage, actor: Actor = Depends(actor_for_message), broker=Context()
):
    """Handles an outgoing Block activity"""
    block_request = msg.data
    actor_being_blocked = block_request.get("object")

    follower = await Follower.get_or_none(follower=actor_being_blocked)
    if follower:
        await follower.delete()
        return

outgoing_follow_request(msg, actor=Depends(actor_for_message), broker=Context()) async

Handles an outgoing Follow request

Source code in cattle_grid/activity_pub/processing/outgoing.py
async def outgoing_follow_request(
    msg: ActivityMessage, actor: Actor = Depends(actor_for_message), broker=Context()
):
    """Handles an outgoing Follow request"""

    follow_request = msg.data
    to_follow = follow_request.get("object")
    if isinstance(to_follow, dict):
        to_follow = to_follow.get("id")

    if to_follow is None:
        return

    logger.info("Send follow request to %s", to_follow)

    await Following.update_or_create(
        actor=actor,
        following=to_follow,
        defaults={"request": follow_request.get("id"), "accepted": False},
    )

outgoing_message_distribution(msg, broker=Context()) async

Distributes the message to its recipients

Source code in cattle_grid/activity_pub/processing/outgoing.py
async def outgoing_message_distribution(msg: ActivityMessage, broker=Context()):
    """Distributes the message to its recipients"""

    recipients = recipients_for_object(msg.data)
    recipients = remove_public(recipients)

    logger.debug("Got recipients %s", ", ".join(recipients))

    recipients = await update_recipients_for_collections(msg, recipients)

    exchange = internal_exchange()
    logger.info("Got exchange")
    logger.info(exchange)

    for recipient in recipients:
        await broker.publish(
            ToSendMessage(actor=msg.actor, data=msg.data, target=recipient),
            exchange=exchange,
            routing_key="to_send",
        )

outgoing_reject_activity(msg, actor=Depends(actor_for_message), broker=Context()) async

Handles an outgoing Reject activity

Source code in cattle_grid/activity_pub/processing/outgoing.py
async def outgoing_reject_activity(
    msg: ActivityMessage, actor: Actor = Depends(actor_for_message), broker=Context()
):
    """Handles an outgoing Reject activity"""
    reject_request = msg.data
    request_being_rejected = reject_request.get("object")

    follower = await Follower.get_or_none(request=request_being_rejected)
    if follower:
        await follower.delete()
        return

outgoing_undo_request(msg, actor=Depends(actor_for_message), broker=Context()) async

Handles an outgoing Undo activity

Source code in cattle_grid/activity_pub/processing/outgoing.py
async def outgoing_undo_request(
    msg: ActivityMessage, actor: Actor = Depends(actor_for_message), broker=Context()
):
    """Handles an outgoing Undo activity"""
    accept_request = msg.data
    request_being_undone = accept_request.get("object")

    following = await Following.get_or_none(request=request_being_undone)
    if following:
        await following.delete()
        return

cattle_grid.activity_pub.processing.remote

fetch_object(msg, actor=Depends(bovine_actor_for_message), session=Context()) async

Handles retrieving a remote object

Source code in cattle_grid/activity_pub/processing/remote.py
async def fetch_object(
    msg: FetchMessage,
    actor=Depends(bovine_actor_for_message),
    session: aiohttp.ClientSession = Context(),
):
    """Handles retrieving a remote object"""
    await actor.init(session=session)

    result = await actor.get(msg.uri, fail_silently=True)

    return result

resolve_inbox(actor, target) async

Resolves the inbox of target for actor using a cache

Source code in cattle_grid/activity_pub/processing/remote.py
async def resolve_inbox(actor, target):
    """Resolves the inbox of target for actor using
    a cache"""
    cached = await InboxLocation.get_or_none(actor=target)
    if cached:
        return cached.inbox

    target_actor = await actor.get(target)
    if not target_actor:
        return None

    inbox = target_actor.get("inbox")
    if inbox is None:
        return

    await InboxLocation.update_or_create(actor=target, defaults={"inbox": inbox})

    return inbox

sending_message(msg, actor=Depends(bovine_actor_for_message), session=Context()) async

Handles sending a message

Source code in cattle_grid/activity_pub/processing/remote.py
async def sending_message(
    msg: ToSendMessage,
    actor=Depends(bovine_actor_for_message),
    session: aiohttp.ClientSession = Context(),
):
    """Handles sending a message"""
    await actor.init(session=session)

    # FIXME Cache actor?

    inbox = await resolve_inbox(actor, msg.target)

    logger.info("actor send %s", actor.actor_id)
    logger.info("actor from message %s", msg.data.get("actor", ""))

    if inbox:
        result = await actor.post(inbox, msg.data)
        logger.info("Got %s for sending to %s", str(result), inbox)