Skip to content

.activity_pub.server

cattle_grid.activity_pub.server

This package contains the overall router for everything needed to connect to the Fediverse. This includes the .well-known endpoints.

webfinger_responder(resource) async

Handles requests to .well-known/webfinger. Results are determined by the identifier property of PublicIdentifier matching the resource parameter.

See RFC 7033 WebFinger.

Source code in cattle_grid/activity_pub/server/__init__.py
@router.get("/.well-known/webfinger")
async def webfinger_responder(resource: str) -> JrdData:
    """Answers WebFinger lookups served under .well-known/webfinger.

    The `resource` parameter is matched against the identifier property of
    [PublicIdentifier][cattle_grid.activity_pub.models.PublicIdentifier].
    When no identifier matches, the request is answered with a 404.

    See [RFC 7033 WebFinger](https://www.rfc-editor.org/rfc/rfc7033).
    """

    logger.info("looking up web finger for resource '%s'", resource)

    public_identifier = await PublicIdentifier.get_or_none(identifier=resource)
    if not public_identifier:
        raise HTTPException(status_code=404, detail="Item not found")

    # The actor relation is only loaded once a matching identifier exists.
    await public_identifier.fetch_related("actor")

    identifier = public_identifier.identifier
    actor_uri = public_identifier.actor.actor_id
    return webfinger_response(identifier, actor_uri)

router

ActivityPub related functionality

APHeaders

Bases: BaseModel

Headers every request should have. These should be added by the remote proxy.

Parameters:

Name Type Description Default
x_cattle_grid_requester str
required
x_ap_location str
required
Source code in cattle_grid/activity_pub/server/router.py
class APHeaders(BaseModel):
    """Headers every request should have. These should be added by the remote proxy.

    Both fields are declared as plain `str` without a default, so both
    are required.
    """

    x_cattle_grid_requester: str
    """URI of the actor making the request"""
    x_ap_location: str
    """URI of the resource being retrieved"""
x_ap_location instance-attribute

URI of the resource being retrieved

x_cattle_grid_requester instance-attribute

URI of the actor making the request

ActivityResponse

Bases: JSONResponse

Response that ensures the content-type is “application/activity+json”

Source code in cattle_grid/activity_pub/server/router.py
class ActivityResponse(JSONResponse):
    """Response that ensures the content-type is
    "application/activity+json"

    Only `media_type` is overridden here; everything else is inherited
    from `JSONResponse`.
    """

    media_type = "application/activity+json"

actor_profile(id_str, headers) async

Returns the actor

Source code in cattle_grid/activity_pub/server/router.py
@ap_router.get("/actor/{id_str}", response_class=ActivityResponse)
async def actor_profile(id_str, headers: ActivityPubHeaders):
    """Returns the actor

    The actor is looked up through the `x_ap_location` header; `id_str`
    itself is not used for the lookup. The requester taken from
    `x_cattle_grid_requester` is checked with `ensure_not_blocked` before
    the actor object is built.
    """
    stored_actor = await Actor.get_or_none(actor_id=headers.x_ap_location)
    actor = validate_actor(stored_actor)
    await ensure_not_blocked(actor, headers.x_cattle_grid_requester)

    # Identifiers are loaded before the actor is converted to its object form.
    await actor.fetch_related("identifiers")

    return actor_to_object(actor)

followers(id_str, headers) async

Returns the followers

Source code in cattle_grid/activity_pub/server/router.py
@ap_router.get("/followers/{id_str}", response_class=ActivityResponse)
async def followers(id_str, headers: ActivityPubHeaders):
    """Returns the followers

    The owning actor is found by matching its `followers_uri` against the
    `x_ap_location` header; `id_str` itself is not used for the lookup.
    The result is an ordered collection whose id is that same location.
    """
    collection_uri = headers.x_ap_location

    stored_actor = await Actor.get_or_none(followers_uri=collection_uri)
    actor = validate_actor(stored_actor)
    await ensure_not_blocked(actor, headers.x_cattle_grid_requester)

    follower_items = await followers_for_actor(actor)
    collection = OrderedCollection(id=collection_uri, items=follower_items)
    return collection.build()

following(id_str, headers) async

Returns the following

Source code in cattle_grid/activity_pub/server/router.py
@ap_router.get("/following/{id_str}", response_class=ActivityResponse)
async def following(id_str, headers: ActivityPubHeaders):
    """Returns the following

    The owning actor is found by matching its `following_uri` against the
    `x_ap_location` header; `id_str` itself is not used for the lookup.
    The result is an ordered collection whose id is that same location.
    """
    collection_uri = headers.x_ap_location

    stored_actor = await Actor.get_or_none(following_uri=collection_uri)
    actor = validate_actor(stored_actor)
    await ensure_not_blocked(actor, headers.x_cattle_grid_requester)

    following_items = await following_for_actor(actor)
    collection = OrderedCollection(id=collection_uri, items=following_items)
    return collection.build()

outbox(id_str, headers) async

Returns an empty ordered collection as outbox

Source code in cattle_grid/activity_pub/server/router.py
@ap_router.get("/outbox/{id_str}", response_class=ActivityResponse)
async def outbox(id_str, headers: ActivityPubHeaders):
    """Returns an empty ordered collection as outbox

    The owning actor is found by matching its `outbox_uri` against the
    `x_ap_location` header; `id_str` itself is not used for the lookup.
    No items are passed to the collection, only its id.
    """
    collection_uri = headers.x_ap_location

    stored_actor = await Actor.get_or_none(outbox_uri=collection_uri)
    actor = validate_actor(stored_actor)
    await ensure_not_blocked(actor, headers.x_cattle_grid_requester)

    empty_collection = OrderedCollection(id=collection_uri)
    return empty_collection.build()

router_inbox

ActivityPub related functionality

APHeadersWithDigest

Bases: APHeaders

Extends APHeaders with the optional digest headers.

Parameters:

Name Type Description Default
x_cattle_grid_requester str
required
x_ap_location str
required
digest str | None
None
content_digest str | None
None
Source code in cattle_grid/activity_pub/server/router_inbox.py
class APHeadersWithDigest(APHeaders):
    """The addition of digest headers

    Extends `APHeaders` with two optional fields. Both default to `None`,
    so a request may carry either digest header, both, or neither.
    """

    digest: str | None = None
    """Legacy digest"""
    content_digest: str | None = None
    """Digest according to [RFC 9530 Digest Fields](https://www.rfc-editor.org/rfc/rfc9530.html)"""
content_digest = None class-attribute instance-attribute

Digest according to RFC 9530 Digest Fields

digest = None class-attribute instance-attribute

Legacy digest

inbox(id_str, request, headers, broker, exchange) async

Processes an inbox message

Source code in cattle_grid/activity_pub/server/router_inbox.py
@ap_router_inbox.post("/inbox/{id_str}", status_code=202)
async def inbox(
    id_str,
    request: Request,
    headers: Annotated[APHeadersWithDigest, Header()],
    broker: Broker,
    exchange: InternalExchange,
):
    """Processes an inbox message

    The receiving actor is looked up by matching its `inbox_uri` against
    the `x_ap_location` header; `id_str` itself is not used for the lookup.
    On success the parsed message is handed to `enqueue_from_inbox` and an
    empty body is returned with status 202.

    Raises:
        HTTPException: 404 if no actor owns the inbox; 400 if
            `validate_digest` rejects the body; 401 if the `actor` field
            of the message differs from the `x_cattle_grid_requester`
            header; 422 if the body is not a JSON object or any other
            error occurs while processing.
    """
    logger.info("Got incoming request")
    actor = await Actor.get_or_none(inbox_uri=headers.x_ap_location)
    if actor is None:
        raise HTTPException(404)

    try:
        data = await request.body()

        # Only the digest headers that were actually sent are forwarded.
        digest_headers = {}
        if headers.digest:
            digest_headers["digest"] = headers.digest
        if headers.content_digest:
            digest_headers["content-digest"] = headers.content_digest

        # The digest is checked against the raw bytes, before JSON parsing.
        if not validate_digest(digest_headers, data):
            raise HTTPException(400)

        data = json.loads(data)
        if not isinstance(data, dict):
            logger.info("Could not parse request body")
            logger.debug(data)
            raise HTTPException(422)

        request_actor = data.get("actor")

        # The actor named in the message must be the requester from the headers.
        if request_actor != headers.x_cattle_grid_requester:
            raise HTTPException(401)

        await enqueue_from_inbox(broker, exchange, actor.actor_id, data)

        return ""

    except HTTPException:
        # Deliberate HTTP errors raised above keep their status code.
        raise
    except Exception as e:
        # Any other failure (e.g. invalid JSON) is logged once, with its
        # traceback, and reported to the client as 422.
        logger.exception("Processing post request failed with %s", e)

        raise HTTPException(422) from e

router_object

ActivityPub related functionality

return_object(obj_id, headers) async

Returns the stored activity identified by obj_id

Source code in cattle_grid/activity_pub/server/router_object.py
@ap_router_object.get("/object/{obj_id}", response_class=ActivityResponse)
async def return_object(obj_id, headers: ActivityPubHeaders):
    """Returns the data of the stored activity with id `obj_id`

    Answers with 404 when no such activity is stored, when its data is not
    a dict, or when the `id` inside the data differs from the
    `x_ap_location` header. Answers with 401 when `is_valid_requester`
    rejects the requester from `x_cattle_grid_requester`.
    """

    stored = await StoredActivity.get_or_none(id=obj_id).prefetch_related("actor")

    if stored is None:
        raise HTTPException(404)

    payload = stored.data
    if not isinstance(payload, dict):
        raise HTTPException(404)

    # The stored id has to agree with the location that was requested.
    if payload.get("id") != headers.x_ap_location:
        raise HTTPException(404)

    requester_allowed = await is_valid_requester(
        headers.x_cattle_grid_requester, stored.actor, payload
    )
    if not requester_allowed:
        raise HTTPException(401)

    return payload