Skip to content

Docker

swerex.deployment.docker.DockerDeployment

DockerDeployment(*, logger: Logger | None = None, **kwargs: Any)

Bases: AbstractDeployment

Deployment to local docker image.

Parameters:

Name Type Description Default
**kwargs Any

Keyword arguments (see DockerDeploymentConfig for details).

{}
Source code in swerex/deployment/docker.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def __init__(
    self,
    *,
    logger: logging.Logger | None = None,
    **kwargs: Any,
):
    """Deployment to local docker image.

    Args:
        **kwargs: Keyword arguments (see `DockerDeploymentConfig` for details).
    """
    self._config = DockerDeploymentConfig(**kwargs)
    self._runtime: RemoteRuntime | None = None
    self._container_process = None
    self._container_name = None
    self.logger = logger or get_logger("rex-deploy")
    self._runtime_timeout = 0.15
    self._hooks = CombinedDeploymentHook()

container_name property

container_name: str | None

glibc_dockerfile property

glibc_dockerfile: str

logger instance-attribute

logger = logger or get_logger('rex-deploy')

runtime property

runtime: RemoteRuntime

Returns the runtime if running.

Raises:

Type Description
DeploymentNotStartedError

If the deployment was not started.

add_hook

add_hook(hook: DeploymentHook)
Source code in swerex/deployment/docker.py
65
66
def add_hook(self, hook: DeploymentHook):
    self._hooks.add_hook(hook)

from_config classmethod

from_config(config: DockerDeploymentConfig) -> Self
Source code in swerex/deployment/docker.py
68
69
70
@classmethod
def from_config(cls, config: DockerDeploymentConfig) -> Self:
    return cls(**config.model_dump())

is_alive async

is_alive(*, timeout: float | None = None) -> IsAliveResponse

Checks if the runtime is alive. The return value can be tested with bool().

Raises:

Type Description
DeploymentNotStartedError

If the deployment was not started.

Source code in swerex/deployment/docker.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
async def is_alive(self, *, timeout: float | None = None) -> IsAliveResponse:
    """Checks if the runtime is alive. The return value can be
    tested with bool().

    Raises:
        DeploymentNotStartedError: If the deployment was not started.
    """
    if self._runtime is None:
        msg = "Runtime not started"
        raise RuntimeError(msg)
    if self._container_process is None:
        msg = "Container process not started"
        raise RuntimeError(msg)
    if self._container_process.poll() is not None:
        msg = "Container process terminated."
        output = "stdout:\n" + self._container_process.stdout.read().decode()  # type: ignore
        output += "\nstderr:\n" + self._container_process.stderr.read().decode()  # type: ignore
        msg += "\n" + output
        raise RuntimeError(msg)
    return await self._runtime.is_alive(timeout=timeout)

start async

start()

Starts the runtime.

Source code in swerex/deployment/docker.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
async def start(self):
    """Starts the runtime."""
    self._pull_image()
    if self._config.python_standalone_dir:
        image_id = self._build_image()
    else:
        image_id = self._config.image
    if self._config.port is None:
        self._config.port = find_free_port()
    assert self._container_name is None
    self._container_name = self._get_container_name()
    token = self._get_token()
    platform_arg = []
    if self._config.platform is not None:
        platform_arg = ["--platform", self._config.platform]
    rm_arg = []
    if self._config.remove_container:
        rm_arg = ["--rm"]
    cmds = [
        "docker",
        "run",
        *rm_arg,
        "-p",
        f"{self._config.port}:8000",
        *platform_arg,
        *self._config.docker_args,
        "--name",
        self._container_name,
        image_id,
        *self._get_swerex_start_cmd(token),
    ]
    cmd_str = shlex.join(cmds)
    self.logger.info(
        f"Starting container {self._container_name} with image {self._config.image} serving on port {self._config.port}"
    )
    self.logger.debug(f"Command: {cmd_str!r}")
    # shell=True required for && etc.
    self._container_process = subprocess.Popen(cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    self._hooks.on_custom_step("Starting runtime")
    self.logger.info(f"Starting runtime at {self._config.port}")
    self._runtime = RemoteRuntime.from_config(
        RemoteRuntimeConfig(port=self._config.port, timeout=self._runtime_timeout, auth_token=token)
    )
    t0 = time.time()
    await self._wait_until_alive(timeout=self._config.startup_timeout)
    self.logger.info(f"Runtime started in {time.time() - t0:.2f}s")

stop async

stop()

Stops the runtime.

Source code in swerex/deployment/docker.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
async def stop(self):
    """Stops the runtime."""
    if self._runtime is not None:
        await self._runtime.close()
        self._runtime = None

    if self._container_process is not None:
        try:
            subprocess.check_call(
                ["docker", "kill", self._container_name],  # type: ignore
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=10,
            )
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
            self.logger.warning(
                f"Failed to kill container {self._container_name}: {e}. Will try harder.", exc_info=False
            )
        for _ in range(3):
            self._container_process.kill()
            try:
                self._container_process.wait(timeout=5)
                break
            except subprocess.TimeoutExpired:
                continue
        else:
            self.logger.warning(f"Failed to kill container {self._container_name} with SIGKILL")

        self._container_process = None
        self._container_name = None

    if self._config.remove_images:
        if _is_image_available(self._config.image):
            self.logger.info(f"Removing image {self._config.image}")
            try:
                _remove_image(self._config.image)
            except subprocess.CalledProcessError:
                self.logger.error(f"Failed to remove image {self._config.image}", exc_info=True)