Skip to content

.exchange

cattle_grid.exchange

create_router(main_exchange=exchange(), include_shovels=True)

Creates a router to be used to manage users

Source code in cattle_grid/exchange/__init__.py
def create_router(
    main_exchange: RabbitExchange = exchange(), include_shovels=True
) -> RabbitRouter:
    """Creates a router to be used to manage users

    :param main_exchange: exchange the handler queues are bound to. The
        default is evaluated once, when the function is defined.
    :param include_shovels: if true, the incoming and outgoing shovels are
        subscribed on the internal exchange as well
    :return: router with all subscribers registered
    """

    router = RabbitRouter()

    if include_shovels:
        router.subscriber("incoming.#", internal_exchange())(incoming_shovel)
        # The outgoing queue name gets a random suffix from uuid4
        router.subscriber(
            RabbitQueue("outgoing" + str(uuid4()), routing_key="outgoing.#"),
            internal_exchange(),
        )(outgoing_shovel)

    # Pairs of (routing key, handler coroutine); the routing key is also
    # used as the subscriber title and as the suffix of the queue name.
    routing_config: List[Tuple[str, Any]] = [
        ("update_actor", update_actor),
        ("delete_actor", delete_actor_handler),
        ("send_message", send_message),
        # was misspelled "ffetch_object", which bound the handler to a
        # routing key that does not match the handler's name
        ("fetch_object", fetch_object),
    ]

    prefix = "cattle_grid_"

    for routing_key, coroutine in routing_config:
        router.subscriber(
            RabbitQueue(prefix + routing_key, routing_key=routing_key),
            exchange=main_exchange,
            title=routing_key,
        )(coroutine)

    return router

handlers

delete_actor_handler(msg, broker) async

Deletes the actor by id. Should be used asynchronously.

Source code in cattle_grid/exchange/handlers.py
async def delete_actor_handler(
    msg: DeleteActorMessage, broker: Annotated[RabbitBroker, Context()]
) -> None:
    """
    Deletes the actor whose id is given in ``msg.actor``. Should be used
    asynchronously.

    Before the actor is deleted, a message containing the delete activity
    for the actor profile is published with routing key ``store_activity``
    on the internal exchange.

    :param msg: message naming the actor to delete
    :param broker: broker used to publish the delete activity
    :raises ValueError: if no actor with this id is found
    """

    actor_id = msg.actor
    logger.info("Deleting actor %s", actor_id)

    actor = await Actor.get_or_none(actor_id=actor_id)
    if actor is None:
        raise ValueError("Actor not found")

    delete_activity = StoreActivityMessage(
        actor=actor.actor_id, data=delete_for_actor_profile(actor)
    )
    await broker.publish(
        delete_activity,
        routing_key="store_activity",
        exchange=internal_exchange(),
    )

    await delete_actor(actor)

update_actor(msg, broker=Context()) async

Should be used asynchronously

Source code in cattle_grid/exchange/handlers.py
async def update_actor(
    msg: UpdateActorMessage, broker: RabbitBroker = Context()
) -> None:
    """Applies the requested changes to an actor. Should be used
    asynchronously.

    Returns without doing anything if the actor is not found. An update
    activity for the actor profile is published with routing key
    ``store_activity`` on the internal exchange if at least one action
    was handled or a profile was supplied; changing only ``autoFollow``
    does not publish anything.

    :param msg: the requested actions, profile and autoFollow setting
    :param broker: broker used to publish the update activity
    """

    actor = await Actor.get_or_none(actor_id=msg.actor)
    if actor is None:
        return

    needs_publish = False

    for action in msg.actions:
        handled = await handle_actor_action(actor, action)
        if not handled:
            continue
        # Reload the actor so that later steps see the result of the action
        await actor.refresh_from_db()
        await actor.fetch_related("identifiers")
        needs_publish = True

    if msg.profile:
        actor.profile = msg.profile

        logger.info("Updating actor %s", actor.actor_id)
        await actor.save()
        await actor.fetch_related("identifiers")
        needs_publish = True

    if msg.autoFollow is not None:
        actor.automatically_accept_followers = msg.autoFollow
        await actor.save()

    if not needs_publish:
        return

    update_activity = StoreActivityMessage(
        actor=msg.actor, data=update_for_actor_profile(actor)
    )
    await broker.publish(
        update_activity,
        routing_key="store_activity",
        exchange=internal_exchange(),
    )

message_handlers

send_message(msg, broker=Context()) async

Takes a message and ensures it is distributed appropriately

Source code in cattle_grid/exchange/message_handlers.py
async def send_message(
    msg: ActivityMessage,
    broker: RabbitBroker = Context(),
) -> None:
    """Takes a message and ensures it is distributed appropriately

    Nothing is published if no activity type can be determined from the
    message data. Otherwise the message is published with routing key
    ``outgoing.<activity type>``, first on the exchange returned by
    ``exchange()`` and then on the internal exchange.

    :param msg: the activity to send
    :param broker: broker used for publishing
    """

    payload = msg.data
    activity_type = determine_activity_type(payload)

    if not activity_type:
        return

    outgoing = ActivityMessage(actor=msg.actor, data=payload)
    routing_key = f"outgoing.{activity_type}"

    # The order matters: first exchange(), then the internal exchange
    for target_exchange in (exchange, internal_exchange):
        await broker.publish(
            outgoing, exchange=target_exchange(), routing_key=routing_key
        )

server

create_exchange_api_router(config)

Creates an API router for the HTTP methods of the gateway. Note that these mostly exist to fulfill secondary concerns of the gateway; most of the work is done by the router.

Parameters:

Name Type Description Default
config LazySettings
required

Returns:

Type Description
APIRouter
Source code in cattle_grid/exchange/server/__init__.py
def create_exchange_api_router(config: LazySettings) -> APIRouter:
    """Creates an API router for the HTTP methods of the gateway.
    One should note that these mostly exist to fulfill secondary
    concerns of the gateway. Most of the work is done by the router.

    The authentication router is only included if
    ``gateway.enable_authentication`` is set to a truthy value in the
    configuration. The user router is always included under the prefix
    ``/ap/admin``.

    :param config: the configuration
    :return: the assembled router
    """
    api_router = APIRouter()

    gateway_settings = config.get("gateway", {})
    if gateway_settings.get("enable_authentication"):
        api_router.include_router(gateway_auth_router)

    user_router = create_user_router(config)
    api_router.include_router(user_router, prefix="/ap/admin")

    return api_router

shovel

should_shovel_activity(activity) async

Some activities like Block or Undo Block should not be visible to the user. This method returns False if this is the case.

Source code in cattle_grid/exchange/shovel.py
async def should_shovel_activity(activity: dict) -> bool:
    """Some activities like Block or Undo Block should not be visible to the user. This method
    returns False if this is the case.

    :param activity: the activity to inspect; its ``type`` and ``object``
        entries are read
    :return: False for activities of type ``Block`` and for activities of
        type ``Undo`` whose object id matches the ``request`` of a stored
        ``Blocking``; True otherwise
    """

    activity_type = activity.get("type")

    if activity_type == "Block":
        return False

    # The database is only consulted for Undo activities. A previous debug
    # statement loaded and logged every Blocking entry for each activity.
    if activity_type == "Undo":
        object_id = id_for_object(activity.get("object"))
        blocking = await Blocking.get_or_none(request=object_id)

        if blocking:
            return False

    return True

test_actor_update

create_actor(base_url, preferred_username=None, identifiers={}, profile={}) async

Creates a new actor in the database

Source code in cattle_grid/activity_pub/actor.py
async def create_actor(
    base_url: str,
    preferred_username: str | None = None,
    identifiers: dict = {},
    profile: dict = {},
):
    """Creates a new actor in the database

    Besides the actor, a credential holding the private key and one public
    identifier per entry of the identifiers are created.

    :param base_url: base from which the actor id and the inbox, outbox,
        following and followers URIs are generated
    :param preferred_username: if given, a ``webfinger`` identifier is
        computed from it and the base url
    :param identifiers: mapping of identifier name to identifier; an
        ``activitypub_id`` entry with the actor id is added if missing.
        The mapping passed in is not modified.
    :param profile: profile stored with the actor; a shallow copy is stored
    :return: the created actor with its identifiers fetched
    :raises ValueError: if preferred_username is given and identifiers
        already contains a ``webfinger`` key
    """

    public_key, private_key = generate_rsa_public_private_key()
    public_key_name = "legacy-key-1"
    actor_id = new_url(base_url, "actor")

    actor = await Actor.create(
        actor_id=actor_id,
        inbox_uri=new_url(base_url, "inbox"),
        outbox_uri=new_url(base_url, "outbox"),
        following_uri=new_url(base_url, "following"),
        followers_uri=new_url(base_url, "followers"),
        public_key_name=public_key_name,
        public_key=public_key,
        # Copy so that the stored profile never aliases the shared default
        # dictionary (or the caller's dictionary)
        profile=dict(profile),
        automatically_accept_followers=False,
    )
    await Credential.create(
        actor_id=actor_id,
        identifier=f"{actor_id}#{public_key_name}",
        secret=private_key,
    )

    # identifiers is always rebound to a new dictionary below, so neither
    # the default nor the caller's dictionary is mutated
    if preferred_username:
        if "webfinger" in identifiers:
            raise ValueError("webfinger key set in identifiers")
        identifiers = {
            **identifiers,
            "webfinger": compute_acct_uri(base_url, preferred_username),
        }

    if "activitypub_id" not in identifiers:
        identifiers = {**identifiers, "activitypub_id": actor_id}

    for name, identifier in identifiers.items():
        await PublicIdentifier.create(actor=actor, name=name, identifier=identifier)

    logging.info("Created actor with id '%s'", actor_id)

    await actor.fetch_related("identifiers")

    return actor

database() async

Fixture so that the database is initialized

Source code in cattle_grid/testing/fixtures.py
@pytest.fixture(autouse=True)
async def database():
    """Autouse fixture wrapping the test in an initialized in-memory
    database with generated schemas"""
    db_context = with_database(db_uri="sqlite://:memory:", generate_schemas=True)
    async with db_context:
        yield

new_auth_config(actor_id, username=None)

Creates a new authorization configuration

Source code in cattle_grid/config/auth.py
def new_auth_config(actor_id: str, username: str | None = None) -> AuthConfig:
    """Creates a new authorization configuration

    :param actor_id: id of the actor; its network location is used as the
        domain of the acct uri
    :param username: user part of the acct uri, a random url safe token is
        used if it is missing or empty
    :return: configuration with a newly generated key pair and an empty
        set of domain blocks
    """
    account_name = username or secrets.token_urlsafe(12)
    host = urlparse(actor_id).netloc

    public_key, private_key = generate_rsa_public_private_key()

    return AuthConfig(
        actor_id=actor_id,
        actor_acct_id=f"acct:{account_name}@{host}",
        public_key=public_key,
        private_key=private_key,
        domain_blocks=set(),
    )

save_auth_config(filename, config)

Saves the authorization configuration to a file

Source code in cattle_grid/config/auth.py
def save_auth_config(filename: str, config: AuthConfig) -> None:
    """Writes the authorization configuration as TOML to a file

    The dumped configuration is stored under the top-level key ``auth``.

    :param filename: file to write to
    :param config: the configuration to save
    """
    with open(filename, "wb") as toml_file:
        document = {"auth": config.model_dump()}
        tomli_w.dump(document, toml_file, multiline_strings=True)

test_actor() async

Fixture to create an actor

Source code in cattle_grid/testing/fixtures.py
@pytest.fixture
async def test_actor():
    """Fixture providing an actor created with base url http://localhost/ap"""
    actor = await create_actor("http://localhost/ap")
    return actor

with_database(db_uri='sqlite://:memory:', generate_schemas=False) async

Opens the connection to the database using tortoise

Source code in cattle_grid/database.py
@asynccontextmanager
async def database(db_uri: str = "sqlite://:memory:", generate_schemas: bool = False):
    """Opens the connection to the database using tortoise

    The connections are closed again when the context is left.

    :param db_uri: uri of the database to connect to
    :param generate_schemas: if true, the schemas are generated after
        initialization
    """
    await Tortoise.init(config=tortoise_config(db_uri))

    try:
        # Schema generation happens inside the try block so that the
        # connections opened by init are closed even if it fails
        if generate_schemas:
            await Tortoise.generate_schemas()

        yield
    finally:
        await Tortoise.close_connections()

test_handlers

create_actor(base_url, preferred_username=None, identifiers={}, profile={}) async

Creates a new actor in the database

Source code in cattle_grid/activity_pub/actor.py
async def create_actor(
    base_url: str,
    preferred_username: str | None = None,
    identifiers: dict = {},
    profile: dict = {},
):
    """Creates a new actor in the database

    Besides the actor, a credential holding the private key and one public
    identifier per entry of the identifiers are created.

    :param base_url: base from which the actor id and the inbox, outbox,
        following and followers URIs are generated
    :param preferred_username: if given, a ``webfinger`` identifier is
        computed from it and the base url
    :param identifiers: mapping of identifier name to identifier; an
        ``activitypub_id`` entry with the actor id is added if missing.
        The mapping passed in is not modified.
    :param profile: profile stored with the actor; a shallow copy is stored
    :return: the created actor with its identifiers fetched
    :raises ValueError: if preferred_username is given and identifiers
        already contains a ``webfinger`` key
    """

    public_key, private_key = generate_rsa_public_private_key()
    public_key_name = "legacy-key-1"
    actor_id = new_url(base_url, "actor")

    actor = await Actor.create(
        actor_id=actor_id,
        inbox_uri=new_url(base_url, "inbox"),
        outbox_uri=new_url(base_url, "outbox"),
        following_uri=new_url(base_url, "following"),
        followers_uri=new_url(base_url, "followers"),
        public_key_name=public_key_name,
        public_key=public_key,
        # Copy so that the stored profile never aliases the shared default
        # dictionary (or the caller's dictionary)
        profile=dict(profile),
        automatically_accept_followers=False,
    )
    await Credential.create(
        actor_id=actor_id,
        identifier=f"{actor_id}#{public_key_name}",
        secret=private_key,
    )

    # identifiers is always rebound to a new dictionary below, so neither
    # the default nor the caller's dictionary is mutated
    if preferred_username:
        if "webfinger" in identifiers:
            raise ValueError("webfinger key set in identifiers")
        identifiers = {
            **identifiers,
            "webfinger": compute_acct_uri(base_url, preferred_username),
        }

    if "activitypub_id" not in identifiers:
        identifiers = {**identifiers, "activitypub_id": actor_id}

    for name, identifier in identifiers.items():
        await PublicIdentifier.create(actor=actor, name=name, identifier=identifier)

    logging.info("Created actor with id '%s'", actor_id)

    await actor.fetch_related("identifiers")

    return actor

database() async

Fixture so that the database is initialized

Source code in cattle_grid/testing/fixtures.py
@pytest.fixture(autouse=True)
async def database():
    """Autouse fixture wrapping the test in an initialized in-memory
    database with generated schemas"""
    db_context = with_database(db_uri="sqlite://:memory:", generate_schemas=True)
    async with db_context:
        yield

new_auth_config(actor_id, username=None)

Creates a new authorization configuration

Source code in cattle_grid/config/auth.py
def new_auth_config(actor_id: str, username: str | None = None) -> AuthConfig:
    """Creates a new authorization configuration

    :param actor_id: id of the actor; its network location is used as the
        domain of the acct uri
    :param username: user part of the acct uri, a random url safe token is
        used if it is missing or empty
    :return: configuration with a newly generated key pair and an empty
        set of domain blocks
    """
    account_name = username or secrets.token_urlsafe(12)
    host = urlparse(actor_id).netloc

    public_key, private_key = generate_rsa_public_private_key()

    return AuthConfig(
        actor_id=actor_id,
        actor_acct_id=f"acct:{account_name}@{host}",
        public_key=public_key,
        private_key=private_key,
        domain_blocks=set(),
    )

save_auth_config(filename, config)

Saves the authorization configuration to a file

Source code in cattle_grid/config/auth.py
def save_auth_config(filename: str, config: AuthConfig) -> None:
    """Writes the authorization configuration as TOML to a file

    The dumped configuration is stored under the top-level key ``auth``.

    :param filename: file to write to
    :param config: the configuration to save
    """
    with open(filename, "wb") as toml_file:
        document = {"auth": config.model_dump()}
        tomli_w.dump(document, toml_file, multiline_strings=True)

test_actor() async

Fixture to create an actor

Source code in cattle_grid/testing/fixtures.py
@pytest.fixture
async def test_actor():
    """Fixture providing an actor created with base url http://localhost/ap"""
    actor = await create_actor("http://localhost/ap")
    return actor

with_database(db_uri='sqlite://:memory:', generate_schemas=False) async

Opens the connection to the database using tortoise

Source code in cattle_grid/database.py
@asynccontextmanager
async def database(db_uri: str = "sqlite://:memory:", generate_schemas: bool = False):
    """Opens the connection to the database using tortoise

    The connections are closed again when the context is left.

    :param db_uri: uri of the database to connect to
    :param generate_schemas: if true, the schemas are generated after
        initialization
    """
    await Tortoise.init(config=tortoise_config(db_uri))

    try:
        # Schema generation happens inside the try block so that the
        # connections opened by init are closed even if it fails
        if generate_schemas:
            await Tortoise.generate_schemas()

        yield
    finally:
        await Tortoise.close_connections()