Skip to content

.exchange

cattle_grid.exchange

create_router(main_exchange=exchange())

Creates a router to be used to manage users

Parameters:

Name Type Description Default
main_exchange RabbitExchange Exchange on which the handler queues are subscribed.
exchange()

Returns:

Type Description
RabbitRouter
Source code in cattle_grid/exchange/__init__.py
def create_router(main_exchange: RabbitExchange = exchange()) -> RabbitRouter:
    """Creates a router to be used to manage users

    For every handler a queue named `cattle_grid_<routing_key>` is
    subscribed on `main_exchange` with `<routing_key>` as its routing key.

    Note that the default value `exchange()` is evaluated once, when this
    function is defined.

    :param main_exchange: exchange on which the handler queues are subscribed
    :returns: router with one subscriber per handler
    """

    router = RabbitRouter()

    # Each routing key matches the name of the handler it dispatches to.
    routing_config: List[Tuple[str, Callable[[Any], Awaitable[Any]]]] = [
        ("create_actor", create_actor),
        ("update_actor", update_actor),
        ("delete_actor", delete_actor),
        ("send_message", send_message),
        # Previously misspelled as "ffetch_object", which bound the handler
        # to a routing key (and queue name) inconsistent with all the others.
        ("fetch_object", fetch_object),
    ]

    prefix = "cattle_grid_"

    for routing_key, coroutine in routing_config:
        # `subscriber(...)` returns a decorator; applying it registers the
        # coroutine as the consumer of the queue.
        router.subscriber(
            RabbitQueue(prefix + routing_key, routing_key=routing_key),
            exchange=main_exchange,
        )(coroutine)

    return router

actor_handlers

create_actor(msg) async

Creates an actor. Routing key: create_actor (the key this handler is registered under in create_router).

Parameters:

Name Type Description Default
msg CreateActorMessage
required

Returns:

Type Description
CreateActorResult
Source code in cattle_grid/exchange/actor_handlers.py
async def create_actor(msg: CreateActorMessage) -> CreateActorResult:
    """Creates an actor from the message and returns it as an object.

    Registered by `create_router` under the routing key `create_actor`.

    Any exception raised while creating, saving or converting the actor is
    logged and answered with an empty actor instead of being propagated.

    :param msg: carries `baseUrl`, `preferredUsername`, `profile` and
        `autoFollow`
    :return: result wrapping the created actor, or `{}` on failure
    """
    try:
        new_actor = await ap_create_actor(
            msg.baseUrl,
            preferred_username=msg.preferredUsername,
            profile=msg.profile,
        )

        # Only persist again when the auto-accept flag has to be switched on.
        if msg.autoFollow:
            new_actor.automatically_accept_followers = True
            await new_actor.save()

        actor_object = actor_to_object(new_actor)
        logger.info("Created actor with id %s", actor_object.get("id"))

        return CreateActorResult(actor=actor_object)
    except Exception as error:
        logger.exception(error)
        return CreateActorResult(actor={})

delete_actor(msg) async

Deletes the actor by id. Should be used asynchronously.

Source code in cattle_grid/exchange/actor_handlers.py
async def delete_actor(msg: DeleteActorMessage) -> None:
    """
    Deletes the actor by id. Should be used asynchronously.

    :param msg: message whose `actor` field holds the URI of the actor
        being deleted
    """
    # NOTE(review): only the docstring is visible in this listing; no
    # deletion logic appears in the body shown here -- confirm against the
    # actual module.

update_actor(msg) async

Should be used asynchronously

Source code in cattle_grid/exchange/actor_handlers.py
async def update_actor(msg: UpdateActorMessage) -> None:
    """Applies the changes in `msg` to a stored actor.

    Should be used asynchronously. The actor is looked up by `msg.actor`;
    when no such actor exists the message is ignored.

    :param msg: update request; `profile` replaces the actor's profile when
        non-empty, `autoFollow` sets automatic acceptance of followers when
        it is not `None`
    """
    actor = await Actor.get_or_none(actor_id=msg.actor)
    if actor is None:
        return

    if msg.profile:
        actor.profile = msg.profile

    # `autoFollow` is a declared field of UpdateActorMessage but used to be
    # dropped silently here. `None` means "not specified"; an explicit
    # True/False is applied as given.
    if msg.autoFollow is not None:
        actor.automatically_accept_followers = msg.autoFollow

    await actor.save()

data_types

ActivityMessage

Bases: BaseModel

Message that contains an Activity. Activity is used as the name for the ‘data object’ being exchanged, as is common in the Fediverse

Source code in cattle_grid/exchange/data_types.py
class ActivityMessage(BaseModel):
    """
    Message that contains an Activity. Activity is used as the name for the 'data object' being exchanged, as is common in the Fediverse
    """

    # extra="forbid": pydantic rejects input that carries keys other than
    # the fields declared below.
    model_config = ConfigDict(
        extra="forbid",
    )
    # Actor id as a string. No default is given to Field, so the field is
    # required. The bare string after the field is an empty attribute
    # docstring (a plain expression statement).
    actor: str = Field(
        description="""
    actor_id of the actor that received the message
    """,
        examples=["http://local.example/actor"],
    )
    """ """
    # The activity payload; `Activity` is defined outside this block.
    data: Activity = Field(
        description="""
    Activity.
    """
    )
    """ """
actor: str = Field(description='\n actor_id of the actor that received the message\n ', examples=['http://local.example/actor']) class-attribute instance-attribute
data: Activity = Field(description='\n Activity.\n ') class-attribute instance-attribute

DeleteActorMessage

Bases: BaseModel

Allows one to delete the actor object

Source code in cattle_grid/exchange/data_types.py
class DeleteActorMessage(BaseModel):
    """
    Allows one to delete the actor object
    """

    # extra="forbid": pydantic rejects input that carries keys other than
    # the fields declared below.
    model_config = ConfigDict(
        extra="forbid",
    )
    # `...` as the default marks the field as required.
    actor: str = Field(
        ...,
        examples=["http://local.example/actor"],
        description="""
    The URI of the actor being deleted.
    """,
    )

UpdateActorMessage

Bases: BaseModel

Allows one to update the actor object

Source code in cattle_grid/exchange/data_types.py
class UpdateActorMessage(BaseModel):
    """
    Allows one to update the actor object
    """

    # extra="forbid": pydantic rejects input that carries keys other than
    # the fields declared below.
    model_config = ConfigDict(
        extra="forbid",
    )
    # `...` as the default marks the field as required.
    actor: str = Field(
        ...,
        examples=["http://local.example/actor"],
        description="""
    The URI of the actor being updated. Must be managed by cattle_grid
    """,
    )
    # Optional; defaults to None.
    # NOTE(review): the description text ends mid-sentence ("The fields.")
    # -- it looks truncated; confirm the intended wording.
    profile: Dict[str, Any] | None = Field(
        None,
        examples=[{"summary": "A new description of the actor"}],
        description="""
    New profile object for the actor. The fields.
    """,
    )
    # Optional tri-state: None (the default), True or False.
    autoFollow: bool | None = Field(
        None,
        examples=[True],
        description="""
    Enables setting actors to automatically accept follow requests
    """,
    )

gateway

The exchanges used by cattle_grid use routing keys to make processing easier. The cattle_grid gateway takes these messages and readdresses them with routing keys based on a user. Here, a user can have multiple actors.

Furthermore, convenience methods are provided to manage users and actors through an HTTP API. This is in contrast to interacting with the Fediverse, which is done through a message queue.

create_gateway_router()

Creates a router that moves messages to be routed by user.

Parameters:

Name Type Description Default
config
required

Returns:

Type Description
RabbitRouter
Source code in cattle_grid/exchange/gateway/__init__.py
def create_gateway_router() -> RabbitRouter:
    """Builds the router that moves messages to be routed by user.

    Takes no arguments.

    :returns: a fresh `RabbitRouter` that includes the `router` object
        defined outside this function
    """
    combined = RabbitRouter()
    combined.include_router(router)
    return combined

data_types

Action

Bases: str, Enum

Action to be taken by the gateway

Source code in cattle_grid/exchange/gateway/data_types.py
class Action(str, Enum):
    """
    Action to be taken by the gateway
    """

    # Used as the type of `GatewayMessage.action`. Because of the `str`
    # mixin, each member is also a string equal to its value.
    # Note: `self` below is an ordinary enum member name, not an instance
    # reference.
    self = "self"
    send = "send"
    fetch = "fetch"
    profile = "profile"
    options = "options"
    manage_actor = "manage_actor"
    delete_actor = "delete_actor"
Action1

Bases: str, Enum

Action taken by the gateway

Source code in cattle_grid/exchange/gateway/data_types.py
class Action1(str, Enum):
    """
    Action taken by the gateway
    """

    # Used as the type of `GatewayMessageResult.action`. Because of the
    # `str` mixin, each member is also a string equal to its value.
    # Note: `self` below is an ordinary enum member name, not an instance
    # reference.
    self = "self"
    fetch_result = "fetch_result"
    incoming = "incoming"
    profile = "profile"
    options = "options"
GatewayMessage

Bases: BaseModel

Message sent to the gateway and then forwarded to cattle_grid

Source code in cattle_grid/exchange/gateway/data_types.py
class GatewayMessage(BaseModel):
    """
    Message send to the gateway then forwarded to cattle_grid
    """

    # extra="forbid": pydantic rejects input that carries keys other than
    # the three fields declared below. All three are required (no defaults).
    model_config = ConfigDict(
        extra="forbid",
    )
    action: Action
    """
    Action to be taken by the gateway
    """
    actor: str
    """
    actor_id of the actor that is performing the action. Actor must belong to the user
    """
    data: Dict[str, Any]
    """
    Activity to be send or object to fetch/delete etc ... not sure yet. this seems awkward
    """
action: Action instance-attribute

Action to be taken by the gateway

actor: str instance-attribute

actor_id of the actor that is performing the action. Actor must belong to the user

data: Dict[str, Any] instance-attribute

Activity to be sent, or object to fetch/delete, etc. … not sure yet; this seems awkward.

GatewayMessageResult

Bases: BaseModel

Message sent from the gateway

Source code in cattle_grid/exchange/gateway/data_types.py
class GatewayMessageResult(BaseModel):
    """
    Message send from the gateway
    """

    # extra="forbid": pydantic rejects input that carries keys other than
    # the three fields declared below. All three are required (no defaults).
    model_config = ConfigDict(
        extra="forbid",
    )
    action: Action1
    """
    Action taken by the gateway
    """
    actor: str
    """
    actor_id of the actor that is receiving the message.
    """
    data: Dict[str, Any]
    """
    Received data.
    """
action: Action1 instance-attribute

Action taken by the gateway

actor: str instance-attribute

actor_id of the actor that is receiving the message.

data: Dict[str, Any] instance-attribute

Received data.

router

handle_manage_actor(msg, actor=Depends(actor), broker=Context()) async

Updates the actor

Parameters:

Name Type Description Default
broker RabbitBroker
Context()
actor Actor
Depends(actor)
msg GatewayMessage
required
Source code in cattle_grid/exchange/gateway/router.py
@send_subscriber(filter=filter_for_action(Action.manage_actor))
async def handle_manage_actor(
    msg: GatewayMessage,
    actor: Actor = Depends(actor),
    broker: RabbitBroker = Context(),
):
    """Updates the actor

    `msg.data` is validated as an `UpdateActorMessage` (validation errors
    from `model_validate` propagate). A non-empty `profile` replaces the
    actor's profile and is additionally published as a
    `StoreActivityMessage` with routing key `store_activity` on the
    internal exchange. The actor is saved in every case.

    :param broker: broker used to publish the profile update
    :param actor: actor to modify, injected through `Depends(actor)`
    :param msg: gateway message whose `data` holds the update request
    """
    logger.info(msg.data)

    data = UpdateActorMessage.model_validate(msg.data)
    logger.info("Updating actor %s", data.actor)

    # `autoFollow` is `bool | None`: None means "not specified". Testing
    # truthiness here used to drop an explicit False, so automatic
    # acceptance could be turned on but never off again.
    if data.autoFollow is not None:
        actor.automatically_accept_followers = data.autoFollow
    if data.profile:
        actor.profile = data.profile

        # Built from the actor after the new profile has been assigned.
        await broker.publish(
            StoreActivityMessage(
                actor=data.actor, data=update_for_actor_profile(actor)
            ),
            routing_key="store_activity",
            exchange=internal_exchange(),
        )

    await actor.save()
name_from_routing_key(routing_key)
>>> name_from_routing_key("receiving.alice")
'alice'
Source code in cattle_grid/exchange/gateway/router.py
def name_from_routing_key(routing_key: str) -> str:
    """Returns the second dot-separated segment of a routing key.

    ```pycon
    >>> name_from_routing_key("receiving.alice")
    'alice'

    ```

    A key without any "." has no second segment and raises `IndexError`.
    """
    # At most two splits are needed; everything after the second dot is
    # irrelevant for picking out segment 1.
    segments = routing_key.split(".", maxsplit=2)
    return segments[1]

message_handlers

send_message(msg, broker=Context()) async

Takes a message and ensures it is distributed appropriately

Source code in cattle_grid/exchange/message_handlers.py
async def send_message(
    msg: TrueActivityMessage,
    broker: RabbitBroker = Context(),
) -> None:
    """Takes a message and ensures it is distributed appropriately.

    The activity in `msg.data` is dumped to a dict (aliases applied, `None`
    values dropped) and its type determined. If no type can be determined,
    nothing is published. Otherwise the same `RawActivityMessage` is
    published with routing key `outgoing.<activity_type>`, first on
    `exchange()` and then on `internal_exchange()`.

    :param msg: message carrying the sending actor and the activity
    :param broker: broker used for publishing
    """
    payload = msg.data.model_dump(exclude_none=True, by_alias=True)
    kind = determine_activity_type(payload)

    if not kind:
        return

    outgoing = RawActivityMessage(actor=msg.actor, data=payload, activity_type=kind)
    routing_key = f"outgoing.{kind}"

    # The factories are called one at a time so that each exchange is
    # resolved immediately before its own publish.
    for exchange_factory in (exchange, internal_exchange):
        await broker.publish(
            outgoing, exchange=exchange_factory(), routing_key=routing_key
        )

server

gateway_auth_router = APIRouter(prefix='/admin') module-attribute

Blueprint to be included to provide the endpoints for authentication with rabbitmq

create_exchange_api_router(config)

Creates a blueprint for HTTP methods of the gateway. One should note that these mostly exist to fulfill secondary concerns of the gateway. Most of the work is done by the router.

Parameters:

Name Type Description Default
config LazySettings
required

Returns:

Type Description
APIRouter
Source code in cattle_grid/exchange/server/__init__.py
def create_exchange_api_router(config: LazySettings) -> APIRouter:
    """Assembles the HTTP router of the gateway.

    These endpoints mostly serve secondary concerns of the gateway; most
    of the work is done by the message router.

    :param config: settings; `gateway.enable_authentication` decides
        whether `gateway_auth_router` is included
    :return: router that always contains the user router under the prefix
        `/ap/admin`
    """
    api_router = APIRouter()

    gateway_settings = config.get("gateway", {})
    if gateway_settings.get("enable_authentication"):
        api_router.include_router(gateway_auth_router)

    user_router = create_user_router(config)
    api_router.include_router(user_router, prefix="/ap/admin")

    return api_router

rabbit

Implementation of an HTTP auth backend for rabbitmq.

A possible configuration can be in the form of

/etc/rabbitmq/conf.d/03_http_auth.conf
auth_backends.1 = internal

auth_backends.2 = http
auth_http.http_method = post
auth_http.user_path = http://cattle_grid_app/admin/rabbitmq/user
auth_http.vhost_path = http://cattle_grid_app/admin/rabbitmq/vhost
auth_http.resource_path = http://cattle_grid_app/admin/rabbitmq/resource
auth_http.topic_path = http://cattle_grid_app/admin/rabbitmq/topic
topic_auth(username, name, routing_key) async

Checks if topic is allowed. Currently allowed are

exchange = "amq.topic"

and the routing keys send.username and receive.username

Source code in cattle_grid/exchange/server/rabbit.py
@rabbit_router.post("/topic", response_class=PlainTextResponse)
async def topic_auth(
    username: Annotated[str, Form()],
    name: Annotated[str, Form()],
    routing_key: Annotated[str, Form()],
) -> str:
    """Decides whether a topic access is allowed.

    Access is allowed only on

    ```
    exchange = "amq.topic"
    ```

    and only for the routing keys `send.username` and `receive.username`.
    Every refusal is logged as a warning.

    :param username: name of the user requesting access
    :param name: name of the exchange being accessed
    :param routing_key: routing key being requested
    :return: the plain text `"allow"` or `"deny"`
    """
    if name != "amq.topic":
        logger.warning("User %s tried to access exchange %s", username, name)
        return "deny"

    permitted_keys = (f"send.{username}", f"receive.{username}")
    if routing_key in permitted_keys:
        return "allow"

    logger.warning(
        "User %s tried to subscribe to routing_key %s",
        username,
        routing_key,
    )
    return "deny"
user_auth(username, password) async

Checks login with username/password

Source code in cattle_grid/exchange/server/rabbit.py
@rabbit_router.post("/user", response_class=PlainTextResponse)
async def user_auth(
    username: Annotated[str, Form()], password: Annotated[str, Form()]
) -> str:
    """Checks login with username/password"""
    user = await user_with_username_password(username, password)

    if not user:
        logger.warning("Failed login to message broker with username '%s'", username)
        return "deny"

    return "allow"
vhost_auth(username, vhost) async

Authentication for vhosts, currently only “/” is allowed

Source code in cattle_grid/exchange/server/rabbit.py
@rabbit_router.post("/vhost", response_class=PlainTextResponse)
async def vhost_auth(
    username: Annotated[str, Form()], vhost: Annotated[str, Form()]
) -> str:
    """Authentication for vhosts, currently only "/" is allowed"""
    if vhost != "/":
        logger.warning("User %s tried to access vhost %s", username, vhost)
        return "deny"
    return "allow"

shovel

incoming_shovel(msg, broker=Context(), routing_key=Context('message.raw_message.routing_key')) async

Transforms and resends the incoming activity

Source code in cattle_grid/exchange/shovel.py
async def incoming_shovel(
    msg: RawActivityMessage,
    broker: RabbitBroker = Context(),
    routing_key: str = Context("message.raw_message.routing_key"),
) -> None:
    """Normalizes the incoming activity and publishes it again.

    :param msg: raw message with the actor and the activity data
    :param broker: broker used for re-publishing
    :param routing_key: routing key of the received raw message (taken
        from the context); reused unchanged for the outgoing message
    """
    normalized = normalize_activity(msg.data, msg.actor)
    outgoing = ActivityMessage(actor=msg.actor, data=normalized)

    # NOTE(review): "FIXME" is passed as the exchange and looks like an
    # unfinished placeholder rather than a real exchange name -- confirm
    # the intended target exchange.
    await broker.publish(outgoing, routing_key=routing_key, exchange="FIXME")