Skip to content

Fargate

Warning

This deployment is currently in alpha stage. Expect breaking changes.

swerex.deployment.fargate.FargateDeployment

FargateDeployment(*, logger: Logger | None = None, **kwargs: Any)

Bases: AbstractDeployment

Source code in swerex/deployment/fargate.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def __init__(
    self,
    *,
    logger: logging.Logger | None = None,
    **kwargs: Any,
):
    self._config = FargateDeploymentConfig(**kwargs)
    self._runtime: RemoteRuntime | None = None
    self._container_process = None
    self._container_name = None
    self.logger = logger or get_logger("rex-deploy")
    # we need to setup ecs and ec2 to run containers
    self._cluster_arn = None
    self._execution_role_arn = None
    self._vpc_id = None
    self._subnet_id = None
    self._task_arn = None
    self._security_group_id = None
    self._hooks = CombinedDeploymentHook()

container_name property

container_name: str | None

logger instance-attribute

logger = logger or get_logger('rex-deploy')

runtime property

runtime: RemoteRuntime

Returns the runtime if running.

Raises:

Type Description
DeploymentNotStartedError

If the deployment was not started.

add_hook

add_hook(hook: DeploymentHook)
Source code in swerex/deployment/fargate.py
52
53
def add_hook(self, hook: DeploymentHook):
    self._hooks.add_hook(hook)

from_config classmethod

from_config(config: FargateDeploymentConfig) -> Self
Source code in swerex/deployment/fargate.py
55
56
57
@classmethod
def from_config(cls, config: FargateDeploymentConfig) -> Self:
    return cls(**config.model_dump())

is_alive async

is_alive(*, timeout: float | None = None) -> IsAliveResponse

Checks if the runtime is alive. The return value can be tested with bool().

Raises:

Type Description
DeploymentNotStartedError

If the deployment was not started.

Source code in swerex/deployment/fargate.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
async def is_alive(self, *, timeout: float | None = None) -> IsAliveResponse:
    """Checks if the runtime is alive. The return value can be
    tested with bool().

    Raises:
        DeploymentNotStartedError: If the deployment was not started.
    """
    if self._runtime is None or self._task_arn is None:
        raise DeploymentNotStartedError()
    else:
        # check if the task is running
        ecs_client = boto3.client("ecs")
        task_details = ecs_client.describe_tasks(cluster=self._cluster_arn, tasks=[self._task_arn])
        if task_details["tasks"][0]["lastStatus"] != "RUNNING":
            msg = f"Container process not running: {task_details['tasks'][0]['lastStatus']}"
            raise RuntimeError(msg)
    return await self._runtime.is_alive(timeout=timeout)

start async

start()

Starts the runtime.

Source code in swerex/deployment/fargate.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
async def start(
    self,
):
    """Starts the runtime."""
    self._init_aws()
    self._container_name = self._get_container_name()
    self.logger.info(f"Starting runtime with container name {self._container_name}")
    token = self._get_token()
    self._task_arn = run_fargate_task(
        command=self._get_command(token=token),
        name=self._container_name,
        task_definition_arn=self._task_definition["taskDefinitionArn"],
        subnet_id=self._subnet_id,
        security_group_id=self._security_group_id,
        cluster_arn=self._cluster_arn,
        **self._config.fargate_args,
    )
    self.logger.info(f"Container task submitted: {self._task_arn} - waiting for it to start...")
    # wait until the container is running
    t0 = time.time()
    ecs_client = boto3.client("ecs")
    waiter = ecs_client.get_waiter("tasks_running")
    waiter.wait(cluster=self._cluster_arn, tasks=[self._task_arn])
    self.logger.info(f"Fargate container started in {time.time() - t0:.2f}s")
    if self._config.log_group:
        try:
            log_url = get_cloudwatch_log_url(
                task_arn=self._task_arn,
                task_definition=self._task_definition,
                container_name=self._container_name,
            )
            self.logger.info(f"Monitor logs at: {log_url}")
        except Exception as e:
            self.logger.warning(f"Failed to get CloudWatch Logs URL: {str(e)}")
    public_ip = get_public_ip(self._task_arn, self._cluster_arn)
    self.logger.info(f"Container public IP: {public_ip}")
    self._runtime = RemoteRuntime(host=public_ip, port=self._config.port, auth_token=token, logger=self.logger)
    t0 = time.time()
    await self._wait_until_alive(timeout=self._config.runtime_timeout)
    self.logger.info(f"Runtime started in {time.time() - t0:.2f}s")

stop async

stop()

Stops the runtime.

Source code in swerex/deployment/fargate.py
164
165
166
167
168
169
170
171
172
173
async def stop(self):
    """Stops the runtime."""
    if self._runtime is not None:
        await self._runtime.close()
        self._runtime = None
    if self._task_arn is not None:
        ecs_client = boto3.client("ecs")
        ecs_client.stop_task(task=self._task_arn, cluster=self._cluster_arn)
    self._task_arn = None
    self._container_name = None