Bases: AbstractDeployment
Deployment to local docker image.
Parameters:
Name |
Type |
Description |
Default |
**kwargs
|
Any
|
Keyword arguments (see DockerDeploymentConfig for details).
|
{}
|
Source code in swerex/deployment/docker.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def __init__(
    self,
    *,
    logger: logging.Logger | None = None,
    **kwargs: Any,
):
    """Deployment to local docker image.

    Only bookkeeping state is initialized here; no container is started.

    Args:
        logger: Logger to use. If omitted (or falsy), the logger returned by
            `get_logger("rex-deploy")` is used instead.
        **kwargs: Keyword arguments (see `DockerDeploymentConfig` for details).
    """
    # Build the config first: everything below is plain state initialization.
    self._config = DockerDeploymentConfig(**kwargs)
    # Both stay None until `start` has launched the container.
    self._runtime: RemoteRuntime | None = None
    self._container_process = None
    self._container_name = None
    if logger:
        self.logger = logger
    else:
        self.logger = get_logger("rex-deploy")
    # Passed as the timeout of the remote runtime and of each liveness probe.
    self._runtime_timeout = 0.15
    self._hooks = CombinedDeploymentHook()
|
_config
instance-attribute
_container_name
instance-attribute
_container_process
instance-attribute
_container_process = None
_hooks
instance-attribute
_hooks = CombinedDeploymentHook()
_runtime
instance-attribute
_runtime_timeout
instance-attribute
container_name
property
container_name: str | None
glibc_dockerfile
property
logger
instance-attribute
logger = logger or get_logger('rex-deploy')
runtime
property
Returns the runtime if running.
Raises:
Type |
Description |
DeploymentNotStartedError
|
If the deployment was not started.
|
_build_image
Builds image, returns image ID.
Source code in swerex/deployment/docker.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def _build_image(self) -> str:
    """Builds image, returns image ID.

    The dockerfile text from `self.glibc_dockerfile` is piped to
    `docker build -q` on stdin, with the configured image passed as the
    `BASE_IMAGE` build argument.

    Returns:
        The stripped output of `docker build -q`, which must start with
        "sha256:".

    Raises:
        RuntimeError: If the build output does not start with "sha256:".
        subprocess.CalledProcessError: If `docker build` exits with a
            non-zero status (raised by `subprocess.check_output`).
    """
    config = self._config
    self.logger.info(
        f"Building image {config.image} to install a standalone python to {config.python_standalone_dir}. "
        "This might take a while (but you only have to do it once). To skip this step, set `python_standalone_dir` to None."
    )
    dockerfile_text = self.glibc_dockerfile

    # Assemble the command piece by piece; "--platform" is only added when set.
    build_cmd = ["docker", "build", "-q"]
    if config.platform:
        build_cmd += ["--platform", config.platform]
    # The trailing "-" makes docker read the dockerfile from stdin.
    build_cmd += ["--build-arg", f"BASE_IMAGE={config.image}", "-"]

    raw_output = subprocess.check_output(build_cmd, input=dockerfile_text.encode())
    image_id = raw_output.decode().strip()
    if image_id.startswith("sha256:"):
        return image_id
    msg = f"Failed to build image. Image ID is not a SHA256: {image_id}"
    raise RuntimeError(msg)
|
_get_container_name
_get_container_name() -> str
Returns a unique container name based on the image name.
Source code in swerex/deployment/docker.py
def _get_container_name(self) -> str:
    """Returns a unique container name based on the image name.

    Characters of the configured image name that are neither alphanumeric
    nor one of "-", "_", "." are dropped; a random UUID4 is appended after a
    dash to make the name unique.
    """
    allowed_punctuation = "-_."
    kept_chars = [ch for ch in self._config.image if ch.isalnum() or ch in allowed_punctuation]
    prefix = "".join(kept_chars)
    unique_suffix = uuid.uuid4()
    return f"{prefix}-{unique_suffix}"
|
_get_swerex_start_cmd
_get_swerex_start_cmd(token: str) -> list[str]
Source code in swerex/deployment/docker.py
116
117
118
119
120
121
122
123
124
125
126
127
128
def _get_swerex_start_cmd(self, token: str) -> list[str]:
    """Build the command that launches the remote server inside the container.

    Args:
        token: Auth token passed to the server via `--auth-token`.

    Returns:
        An argument list of the form `["/bin/sh", "-c", <command string>]`.
    """
    rex_args = f"--auth-token {token}"
    standalone_dir = self._config.python_standalone_dir
    if standalone_dir:
        # Use the executable from the standalone python installation.
        shell_cmd = f"{standalone_dir}/python3.11/bin/{REMOTE_EXECUTABLE_NAME} {rex_args}"
    else:
        # Try the executable directly; if that fails, install pipx and run
        # the package through it.
        pipx_install = "python3 -m pip install pipx && python3 -m pipx ensurepath"
        fallback = f"{pipx_install} && pipx run {PACKAGE_NAME} {rex_args}"
        shell_cmd = f"{REMOTE_EXECUTABLE_NAME} {rex_args} || ({fallback})"
    # The command string contains shell operators ('||', '&&'), so it is
    # handed to `/bin/sh -c` as a single argument.
    return ["/bin/sh", "-c", shell_cmd]
|
_get_token
Source code in swerex/deployment/docker.py
def _get_token(self) -> str:
    """Return a freshly generated random UUID4, rendered as a string."""
    token = uuid.uuid4()
    return str(token)
|
_pull_image
Source code in swerex/deployment/docker.py
131
132
133
134
135
136
137
138
139
140
141
142
143
def _pull_image(self) -> None:
    """Pull the configured image, honoring the `pull` policy of the config.

    Nothing is pulled when the policy is "never", or when it is "missing"
    and `_is_image_available` reports the image as present. Otherwise the
    hooks are notified and the module-level `_pull_image` helper is called.

    Raises:
        DockerPullError: If the pull command fails. The message contains the
            captured stderr/stdout of the failed command where available.
    """

    def _as_text(data) -> str:
        # `CalledProcessError.stderr` / `.output` are None when the stream
        # was not captured and `str` when text mode was used; calling
        # `.decode()` unconditionally would raise and mask the pull error.
        if data is None:
            return ""
        if isinstance(data, bytes):
            return data.decode(errors="replace")
        return str(data)

    if self._config.pull == "never":
        return
    if self._config.pull == "missing" and _is_image_available(self._config.image):
        return
    self.logger.info(f"Pulling image {self._config.image!r}")
    self._hooks.on_custom_step("Pulling docker image")
    try:
        _pull_image(self._config.image)
    except subprocess.CalledProcessError as e:
        msg = f"Failed to pull image {self._config.image}. "
        msg += f"Error: {_as_text(e.stderr)}"
        msg += f"Output: {_as_text(e.output)}"
        raise DockerPullError(msg) from e
|
_wait_until_alive
async
_wait_until_alive(timeout: float = 10.0)
Source code in swerex/deployment/docker.py
102
103
104
105
106
107
108
109
110
async def _wait_until_alive(self, timeout: float = 10.0):
    """Wait until `is_alive` succeeds, or clean up and re-raise on timeout.

    Args:
        timeout: Overall timeout passed to the module-level
            `_wait_until_alive` helper.

    Returns:
        Whatever the module-level `_wait_until_alive` helper returns.

    Raises:
        TimeoutError: If the runtime did not come up in time. Before
            re-raising, the deployment is stopped and the captured output of
            the container process is logged.
    """
    try:
        return await _wait_until_alive(self.is_alive, timeout=timeout, function_timeout=self._runtime_timeout)
    except TimeoutError:
        self.logger.error("Runtime did not start within timeout. Here's the output from the container process.")
        # Keep a reference: `stop` resets `self._container_process` to None.
        process = self._container_process
        try:
            # Stop first. `read()` on the pipes blocks until EOF, so reading
            # while the container is still running could hang indefinitely.
            await self.stop()
        finally:
            if process is not None:
                for stream in (process.stdout, process.stderr):
                    if stream is not None:
                        self.logger.error(stream.read().decode(errors="replace"))
        # Bare `raise` re-raises the TimeoutError with its original traceback.
        raise
|
add_hook
add_hook(hook: DeploymentHook)
Source code in swerex/deployment/docker.py
def add_hook(self, hook: DeploymentHook):
    """Register `hook` by forwarding it to the deployment's combined hook.

    Args:
        hook: The hook to add.
    """
    combined = self._hooks
    combined.add_hook(hook)
|
from_config
classmethod
Source code in swerex/deployment/docker.py
@classmethod
def from_config(cls, config: DockerDeploymentConfig) -> Self:
    """Alternate constructor: create the deployment from a config object.

    The result of `config.model_dump()` is passed to the constructor as
    keyword arguments.

    Args:
        config: The deployment configuration.

    Returns:
        A new instance of `cls`.
    """
    init_kwargs = config.model_dump()
    return cls(**init_kwargs)
|
is_alive
async
Checks if the runtime is alive. The return value can be
tested with bool().
Raises:
Type |
Description |
RuntimeError
|
If the runtime or the container process was not started, or if the container process has terminated.
|
Source code in swerex/deployment/docker.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
async def is_alive(self, *, timeout: float | None = None) -> IsAliveResponse:
    """Checks if the runtime is alive. The return value can be
    tested with bool().

    Args:
        timeout: Forwarded to the runtime's own `is_alive` call.

    Raises:
        RuntimeError: If the runtime or the container process was not
            started, or if the container process has already terminated (the
            message then includes its captured stdout and stderr).
    """
    runtime = self._runtime
    if runtime is None:
        msg = "Runtime not started"
        raise RuntimeError(msg)

    process = self._container_process
    if process is None:
        msg = "Container process not started"
        raise RuntimeError(msg)

    # `poll()` returns None while the process is still running.
    if process.poll() is None:
        return await runtime.is_alive(timeout=timeout)

    # The process is gone: report whatever it wrote before exiting.
    captured_stdout = process.stdout.read().decode()  # type: ignore
    captured_stderr = process.stderr.read().decode()  # type: ignore
    msg = f"Container process terminated.\nstdout:\n{captured_stdout}\nstderr:\n{captured_stderr}"
    raise RuntimeError(msg)
|
start
async
Starts the runtime.
Source code in swerex/deployment/docker.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
async def start(self):
    """Starts the runtime.

    Pulls the image (subject to the pull policy), optionally builds the
    derived image with the standalone python, launches the container with
    `docker run`, and then waits until the runtime inside responds.
    """
    self._pull_image()
    config = self._config

    # Run the derived image only when a standalone python dir is configured.
    if config.python_standalone_dir is None:
        image_id = config.image
    else:
        image_id = self._build_image()

    if config.port is None:
        config.port = find_free_port()

    assert self._container_name is None
    self._container_name = self._get_container_name()
    token = self._get_token()

    # Assemble the `docker run` invocation; optional flags are added only
    # when the corresponding config value is set.
    run_cmd = ["docker", "run"]
    if config.remove_container:
        run_cmd.append("--rm")
    run_cmd += ["-p", f"{config.port}:8000"]
    if config.platform is not None:
        run_cmd += ["--platform", config.platform]
    run_cmd += [
        *config.docker_args,
        "--name",
        self._container_name,
        image_id,
        *self._get_swerex_start_cmd(token),
    ]
    printable_cmd = shlex.join(run_cmd)
    self.logger.info(
        f"Starting container {self._container_name} with image {config.image} serving on port {config.port}"
    )
    self.logger.debug(f"Command: {printable_cmd!r}")

    # The argument list goes to Popen directly (no host shell involved);
    # stdout/stderr are piped so they can be reported if startup fails.
    self._container_process = subprocess.Popen(run_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    self._hooks.on_custom_step("Starting runtime")
    self.logger.info(f"Starting runtime at {config.port}")
    runtime_config = RemoteRuntimeConfig(port=config.port, timeout=self._runtime_timeout, auth_token=token)
    self._runtime = RemoteRuntime.from_config(runtime_config)

    started_at = time.time()
    await self._wait_until_alive(timeout=config.startup_timeout)
    self.logger.info(f"Runtime started in {time.time() - started_at:.2f}s")
|
stop
async
Stops the runtime.
Source code in swerex/deployment/docker.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
async def stop(self):
    """Stops the runtime.

    Closes the runtime (if any), kills the container and its client process
    (if any), and, when `remove_images` is set in the config, removes the
    configured image if it is available.
    """
    runtime = self._runtime
    if runtime is not None:
        await runtime.close()
        self._runtime = None

    process = self._container_process
    if process is not None:
        kill_cmd = ["docker", "kill", self._container_name]  # type: ignore
        try:
            subprocess.check_call(
                kill_cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=10,
            )
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
            self.logger.warning(
                f"Failed to kill container {self._container_name}: {e}. Will try harder.", exc_info=False
            )

        # Kill the local process as well, giving it up to three attempts.
        terminated = False
        for _ in range(3):
            process.kill()
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                continue
            terminated = True
            break
        if not terminated:
            self.logger.warning(f"Failed to kill container {self._container_name} with SIGKILL")

        self._container_process = None
        self._container_name = None

    if self._config.remove_images and _is_image_available(self._config.image):
        self.logger.info(f"Removing image {self._config.image}")
        try:
            _remove_image(self._config.image)
        except subprocess.CalledProcessError:
            # Best effort: a failed removal is logged, not raised.
            self.logger.error(f"Failed to remove image {self._config.image}", exc_info=True)
|