Arguments & Results

Repid provides mechanisms to pass arguments to a job and retrieve the job's result.

Preparation

First, decide where the data will be stored. Arguments can be stored either inside the message itself (in which case you only need a message broker) or in an arguments bucket broker. Results, on the other hand, can only be stored in a results bucket broker.

For example, let's create a connection that uses InMemoryBucketBroker for both arguments and results.

from repid import Connection, InMemoryMessageBroker, InMemoryBucketBroker

my_connection = Connection(
    message_broker=InMemoryMessageBroker(),
    args_bucket_broker=InMemoryBucketBroker(),
    results_bucket_broker=InMemoryBucketBroker(
        use_result_bucket=True,
    ),
)

We can then experiment with storing data in our buckets.

Arguments

First of all, let's create an actor with some arguments.

from repid import Router

router = Router()


@router.actor
async def actor_with_args(user_id: int, user_name: str, user_messages: list[str]) -> list[str]:
    user_message = f"Hi {user_name}! Your id is: {user_id}."
    user_messages.append(user_message)
    return user_messages

Now we would like to schedule a job, which will pass those arguments to the actor.

from repid import Job

# code above is omitted

await Job(
    "actor_with_args",
    args=dict(user_id=123, user_name="Alex", user_messages=["This is your first message!"]),
).enqueue()

# code below is omitted

What will happen under the hood?

  1. Job.args is serialized using Config.SERIALIZER.
  2. The serialized string is either
    • encoded into the message, or
    • passed to the arguments bucket broker if Job.use_args_bucketer is set to True
      • this defaults to True if the current connection contains an arguments bucket broker
  3. When the actor receives the message, the arguments are decoded and mapped to the function's parameters using the actor's Converter.convert_inputs.
  4. The actor's return value is encoded using the actor's Converter.convert_outputs.
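
The steps above can be approximated in plain Python. This is only a minimal sketch and not Repid's implementation: `json` stands in for Config.SERIALIZER, a plain dict stands in for the arguments bucket broker, and the `enqueue`, `deliver` and `greet` helpers are hypothetical names used for illustration.

```python
import json
from typing import Any, Callable
from uuid import uuid4

# Hypothetical stand-in for an arguments bucket broker: bucket id -> serialized args.
args_bucket_store: dict[str, str] = {}


def enqueue(args: dict[str, Any], use_args_bucketer: bool) -> dict[str, str]:
    """Steps 1-2: serialize the arguments, then place them in the message or a bucket."""
    payload = json.dumps(args)  # json is only a stand-in for Config.SERIALIZER
    if use_args_bucketer:
        args_id = uuid4().hex
        args_bucket_store[args_id] = payload
        return {"args_id": args_id}  # the message carries only a reference
    return {"payload": payload}  # the message carries the arguments themselves


def deliver(message: dict[str, str], actor: Callable[..., Any]) -> str:
    """Steps 3-4: decode the arguments, call the actor, encode its return value."""
    if "args_id" in message:
        payload = args_bucket_store[message["args_id"]]
    else:
        payload = message["payload"]
    kwargs = json.loads(payload)  # a real converter may also validate the inputs
    return json.dumps(actor(**kwargs))


def greet(user_id: int, user_name: str, user_messages: list[str]) -> list[str]:
    user_messages.append(f"Hi {user_name}! Your id is: {user_id}.")
    return user_messages


job_args = dict(user_id=123, user_name="Alex", user_messages=["This is your first message!"])
inline_result = deliver(enqueue(job_args, use_args_bucketer=False), greet)
bucket_result = deliver(enqueue(job_args, use_args_bucketer=True), greet)
```

Both paths hand the actor the same keyword arguments. The only difference is whether the message holds the serialized payload or just the ID of the bucket that holds it.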

Results

You can control whether a job's result is stored using the Job.store_result argument.

Job("some_job", store_result=True)

By default, the result is stored if the connection contains a results bucket broker, and not stored otherwise.

You can access a job's result using the Job.result async property.
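
The default rule can be written as a small function. This is a sketch of the described behaviour only: `resolve_store_result` is a hypothetical helper, and representing "not specified" as `None` is an assumption rather than something confirmed here.

```python
from typing import Optional


def resolve_store_result(store_result: Optional[bool], has_results_bucket_broker: bool) -> bool:
    """Use the explicit setting if one was given, otherwise follow the connection."""
    if store_result is not None:
        return store_result
    # No explicit choice: store only when a results bucket broker is configured.
    return has_results_bucket_broker


default_with_broker = resolve_store_result(None, has_results_bucket_broker=True)
default_without_broker = resolve_store_result(None, has_results_bucket_broker=False)
explicit_opt_out = resolve_store_result(False, has_results_bucket_broker=True)
```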

import asyncio
from repid import Job

# code above is omitted

myjob = Job(
    "actor_with_args",
    args=dict(user_id=123, user_name="Alex", user_messages=["This is your first message!"]),
)

await myjob.enqueue()

await asyncio.sleep(1.0)  # wait for the job to complete

result_bucket = await myjob.result
print(result_bucket)

# code below is omitted

Info

Results of recurring jobs will be overwritten.

IDs

By default, a UUID4 is generated when a job is created, both for the arguments bucket and for the result bucket.

You can pass your own IDs using appropriate arguments:

Job("some_job", args_id="my_args_id", result_id="my_result_id")
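
The fallback from a custom ID to a generated one might look like the sketch below. `resolve_bucket_id` is a hypothetical helper, and the hex string form of the UUID is an assumption made for illustration.

```python
from typing import Optional
from uuid import UUID, uuid4


def resolve_bucket_id(explicit_id: Optional[str] = None) -> str:
    """Keep a caller-supplied ID, otherwise generate a random UUID4-based one."""
    return explicit_id if explicit_id is not None else uuid4().hex


generated_args_id = resolve_bucket_id()
generated_result_id = resolve_bucket_id()
custom_id = resolve_bucket_id("my_args_id")
```

Because generated IDs are random, two jobs will not share a bucket unless you pass the same ID to both on purpose.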

Chaining jobs

Because a result bucket is technically backwards compatible with an arguments bucket (it just holds more fields), you can use it to chain multiple jobs.

Job("job1", result_id="my_chained_id")
Job("job2", args_id="my_chained_id")

Warning

Keep in mind that you must ensure the jobs execute in the right order and that the result bucket is available to the arguments bucket broker of the second job.
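
To see why this works, here is a broker-free sketch of the idea. It is not Repid's storage format: the `shared_store` dict, the helper names and the extra `success` field are hypothetical, and `json` stands in for the configured serializer. Note that the first actor returns a dict whose keys match the second actor's parameters.

```python
import json
from typing import Any

# Hypothetical storage reachable by both the results and the arguments bucket broker.
shared_store: dict[str, dict[str, Any]] = {}


def save_result_bucket(result_id: str, data: Any) -> None:
    # A result bucket: the "data" field an arguments bucket has, plus extra fields.
    shared_store[result_id] = {"data": json.dumps(data), "success": True}


def load_args_bucket(args_id: str) -> dict[str, Any]:
    # Read the same record as an arguments bucket: only "data" is used.
    return json.loads(shared_store[args_id]["data"])


def add_hello(user_name: str, user_id: int, messages: list[str]) -> dict:
    messages.append(f"Hello {user_name}!")
    return dict(user_id=user_id, messages=messages)


def add_id(user_id: int, messages: list[str]) -> list[str]:
    messages.append(f"Your id is {user_id}.")
    return messages


# First job: its result is stored under the chained ID.
save_result_bucket("my_chained_id", add_hello("Alex", 123, ["This is your first message!"]))

# Second job: must run afterwards, reading its arguments from the same ID.
chained_messages = add_id(**load_args_bucket("my_chained_id"))
```

If the second job ran first, the lookup would fail because nothing is stored under the chained ID yet, which is the ordering requirement mentioned in the warning.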

Example:

import asyncio
import os

from repid import Connection, Job, RedisBucketBroker, RedisMessageBroker, Repid, Router, Worker

redis_messages_dsn = os.environ.get("REDIS_CONNECTION")
redis_args_and_results_dsn = os.environ.get("REDIS_ARGS_CONNECTION")

my_connection = Connection(
    message_broker=RedisMessageBroker(redis_messages_dsn),
    args_bucket_broker=RedisBucketBroker(redis_args_and_results_dsn),
    results_bucket_broker=RedisBucketBroker(
        redis_args_and_results_dsn,
        use_result_bucket=True,
    ),
)

app = Repid(my_connection)

my_router = Router()


@my_router.actor
async def add_hello(user_name: str, user_id: int, messages: list[str]) -> dict:
    message = f"Hello {user_name}!"
    messages.append(message)
    return dict(user_id=user_id, messages=messages)


@my_router.actor
async def add_id(user_id: int, messages: list[str]) -> list[str]:
    message = f"Your id is {user_id}."
    messages.append(message)
    return messages


async def main() -> None:
    async with app.magic(auto_disconnect=True):
        w = Worker(routers=[my_router], messages_limit=1)

        await Job(
            "add_hello",
            args=dict(user_name="Alex", user_id=123, messages=["This is your first message!"]),
            result_id="chained_id",
        ).enqueue()

        await w.run()

        second_job = Job("add_id", args_id="chained_id")

        await second_job.enqueue()

        await w.run()

        result_bucket = await second_job.result
        print(result_bucket.data)


if __name__ == "__main__":
    asyncio.run(main())

TTL

Repid also supports specifying a Time-To-Live (TTL) for both arguments and result buckets.

from datetime import timedelta

Job("some_job", args_ttl=timedelta(weeks=2), result_ttl=timedelta(days=5))

You can also set the TTL to None, which means no expiration.

Job("some_job", args_ttl=None, result_ttl=None)

By default, arguments buckets have no TTL, while result buckets have a TTL of one day.
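
The effect of these defaults can be illustrated with a small expiry check. This is a sketch under assumptions: `BucketSketch` and its `created_at` field are hypothetical and do not mirror Repid's bucket classes.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class BucketSketch:
    created_at: datetime
    ttl: Optional[timedelta]  # None means the bucket never expires

    def is_expired(self, now: datetime) -> bool:
        if self.ttl is None:
            return False
        return now >= self.created_at + self.ttl


created = datetime(2024, 1, 1)
args_bucket = BucketSketch(created, ttl=None)  # default for arguments buckets
result_bucket = BucketSketch(created, ttl=timedelta(days=1))  # default for result buckets

twelve_hours_later = created + timedelta(hours=12)
two_days_later = created + timedelta(days=2)
```

With the defaults, a result read two days after the job finished may already be gone, while the arguments stay until they are removed explicitly.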

Warning

Some bucket brokers may not support TTL natively, so be careful not to run out of memory.
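
For storage without native expiry, one option is a periodic cleanup pass. The sketch below assumes a hypothetical in-process store that maps bucket IDs to a creation time and a TTL. It is not part of Repid's API.

```python
from datetime import datetime, timedelta
from typing import Optional

# Hypothetical store layout: bucket id -> (created_at, ttl); ttl=None never expires.
Store = dict[str, tuple[datetime, Optional[timedelta]]]


def purge_expired(store: Store, now: datetime) -> int:
    """Delete every bucket whose TTL has elapsed and return how many were removed."""
    expired = [
        bucket_id
        for bucket_id, (created_at, ttl) in store.items()
        if ttl is not None and now >= created_at + ttl
    ]
    for bucket_id in expired:
        del store[bucket_id]
    return len(expired)


start = datetime(2024, 1, 1)
store: Store = {
    "args-1": (start, None),
    "result-1": (start, timedelta(days=1)),
    "result-2": (start, timedelta(days=5)),
}
removed = purge_expired(store, now=start + timedelta(days=2))
```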