Skip to content

Docker Utils

Utilities for working with Docker containers directly via Python.

logger module-attribute

logger = getLogger(__name__)

DEFAULT_TIMEOUT module-attribute

DEFAULT_TIMEOUT = 60

HTTP_OK module-attribute

HTTP_OK = 200

QDRANT_IMAGE module-attribute

QDRANT_IMAGE = 'qdrant/qdrant:latest'

QDRANT_CONTAINER_NAME module-attribute

QDRANT_CONTAINER_NAME = 'codemap-qdrant'

QDRANT_HOST_PORT module-attribute

QDRANT_HOST_PORT = 6333

QDRANT_HTTP_PORT module-attribute

QDRANT_HTTP_PORT = 6333

QDRANT_GRPC_PORT module-attribute

QDRANT_GRPC_PORT = 6334

QDRANT_STORAGE_PATH module-attribute

QDRANT_STORAGE_PATH = '.codemap_cache/qdrant'

POSTGRES_IMAGE module-attribute

POSTGRES_IMAGE = 'postgres:latest'

POSTGRES_CONTAINER_NAME module-attribute

POSTGRES_CONTAINER_NAME = 'codemap-postgres'

POSTGRES_HOST_PORT module-attribute

POSTGRES_HOST_PORT = 5432

POSTGRES_ENV module-attribute

POSTGRES_ENV = {
	"POSTGRES_PASSWORD": "postgres",
	"POSTGRES_USER": "postgres",
	"POSTGRES_DB": "codemap",
}

POSTGRES_STORAGE_PATH module-attribute

POSTGRES_STORAGE_PATH = '.codemap_cache/postgres_data'

is_docker_running async

is_docker_running() -> bool

Check if the Docker daemon is running.

Source code in src/codemap/utils/docker_utils.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
async def is_docker_running() -> bool:
	"""Check if the Docker daemon is running."""

	def _check_docker_sync() -> bool:
		client = None
		try:
			client = docker.from_env()
			client.ping()
			return True
		except DockerException:
			return False
		finally:
			if client is not None:
				client.close()

	return await asyncio.to_thread(_check_docker_sync)

is_container_running async

is_container_running(container_name: str) -> bool

Check if a specific Docker container is running.

Parameters:

Name Type Description Default
container_name str

Name of the container to check

required

Returns:

Type Description
bool

True if the container is running, False otherwise

Source code in src/codemap/utils/docker_utils.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
async def is_container_running(container_name: str) -> bool:
	"""
	Check if a specific Docker container is running.

	Args:
	    container_name: Name of the container to check

	Returns:
	    True if the container is running, False otherwise

	"""

	def _check_container_sync(name: str) -> bool:
		client = None
		try:
			client = docker.from_env()
			try:
				container = client.containers.get(name)
				return container.status == "running"
			except NotFound:
				return False
		except DockerException:
			logger.exception("Docker error while checking container status for %s", name)
			return False
		finally:
			if client is not None:
				client.close()

	return await asyncio.to_thread(_check_container_sync, container_name)

pull_image_if_needed async

pull_image_if_needed(image_name: str) -> bool

Pull a Docker image if it's not already available locally.

Parameters:

Name Type Description Default
image_name str

Name of the image to pull

required

Returns:

Type Description
bool

True if successful, False otherwise

Source code in src/codemap/utils/docker_utils.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
async def pull_image_if_needed(image_name: str) -> bool:
	"""
	Pull a Docker image if it's not already available locally.

	Args:
	    image_name: Name of the image to pull

	Returns:
	    True if successful, False otherwise

	"""

	def _pull_image_sync(name: str) -> bool:
		client = None
		try:
			client = docker.from_env()
			try:
				client.images.get(name)
				logger.info(f"Image {name} already exists locally")
				return True
			except ImageNotFound:
				logger.info(f"Pulling image {name}...")
				try:
					client.images.pull(name)
					logger.info(f"Successfully pulled image {name}")
					return True
				except APIError:
					logger.exception(f"Failed to pull image {name}")
					return False
		except DockerException:  # Catch potential errors from images.get()
			logger.exception(f"Docker error while checking image {name}")
			return False
		finally:
			if client is not None:
				client.close()

	return await asyncio.to_thread(_pull_image_sync, image_name)

ensure_volume_path_exists async

ensure_volume_path_exists(path: str) -> None

Ensure the host path for a volume exists.

Parameters:

Name Type Description Default
path str

Path to ensure exists

required
Source code in src/codemap/utils/docker_utils.py
119
120
121
122
123
124
125
126
127
128
129
130
131
async def ensure_volume_path_exists(path: str) -> None:
	"""
	Ensure the host path for a volume exists.

	Args:
	    path: Path to ensure exists

	"""

	def _ensure_path_sync(p: str) -> None:
		Path(p).mkdir(parents=True, exist_ok=True)

	await asyncio.to_thread(_ensure_path_sync, path)

start_qdrant_container async

start_qdrant_container() -> bool

Start the Qdrant container.

Returns:

Type Description
bool

True if successful, False otherwise

Source code in src/codemap/utils/docker_utils.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
async def start_qdrant_container() -> bool:
	"""
	Start the Qdrant container.

	Returns:
	    True if successful, False otherwise

	"""

	def _start_qdrant_sync() -> bool:
		client = None
		try:
			client = docker.from_env()

			# Ensure image is available (This function is already async, called separately)
			# if not await pull_image_if_needed(QDRANT_IMAGE):
			# 	return False

			# Ensure storage directory exists (This function is already async, called separately)
			# await ensure_volume_path_exists(QDRANT_STORAGE_PATH)

			# Check if container already exists
			try:
				container = client.containers.get(QDRANT_CONTAINER_NAME)
				if container.status == "running":
					logger.info(f"Container {QDRANT_CONTAINER_NAME} is already running")
					return True

				# If container exists but is not running, start it
				logger.info(f"Starting existing container {QDRANT_CONTAINER_NAME}")
				container.start()
				logger.info(f"Started container {QDRANT_CONTAINER_NAME}")
				return True

			except NotFound:
				# Container doesn't exist, create and start it
				abs_storage_path = str(Path(QDRANT_STORAGE_PATH).absolute())

				logger.info(f"Creating and starting container {QDRANT_CONTAINER_NAME}")

				# Define volume binding in Docker SDK format
				volumes: list[str] = [f"{abs_storage_path}:/qdrant/storage:rw"]

				# Define port mapping
				ports: dict[str, int | list[int] | tuple[str, int] | None] = {
					f"{QDRANT_HTTP_PORT}/tcp": QDRANT_HOST_PORT,
					f"{QDRANT_GRPC_PORT}/tcp": QDRANT_GRPC_PORT,
				}

				restart_policy = {"Name": "always"}

				client.containers.run(
					image=QDRANT_IMAGE,
					name=QDRANT_CONTAINER_NAME,
					ports=ports,
					volumes=volumes,
					detach=True,
					restart_policy=restart_policy,  # type: ignore[arg-type]
				)
				logger.info(f"Created and started container {QDRANT_CONTAINER_NAME}")
				return True

		except DockerException:
			logger.exception("Docker error while starting Qdrant container")
			return False
		finally:
			if client is not None:
				client.close()

	# Ensure image is available
	if not await pull_image_if_needed(QDRANT_IMAGE):
		return False

	# Ensure storage directory exists
	await ensure_volume_path_exists(QDRANT_STORAGE_PATH)

	return await asyncio.to_thread(_start_qdrant_sync)

start_postgres_container async

start_postgres_container() -> bool

Start the PostgreSQL container.

Returns:

Type Description
bool

True if successful, False otherwise

Source code in src/codemap/utils/docker_utils.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
async def start_postgres_container() -> bool:
	"""
	Start the PostgreSQL container.

	Returns:
	    True if successful, False otherwise

	"""

	def _start_postgres_sync() -> bool:
		client = None
		try:
			client = docker.from_env()

			# Ensure image is available (This function is already async, called separately)
			# if not await pull_image_if_needed(POSTGRES_IMAGE):
			# 	return False

			# Ensure storage directory exists (This function is already async, called separately)
			# await ensure_volume_path_exists(POSTGRES_STORAGE_PATH)

			# Check if container already exists
			try:
				container = client.containers.get(POSTGRES_CONTAINER_NAME)
				if container.status == "running":
					logger.info(f"Container {POSTGRES_CONTAINER_NAME} is already running")
					return True

				# If container exists but is not running, start it
				logger.info(f"Starting existing container {POSTGRES_CONTAINER_NAME}")
				container.start()
				logger.info(f"Started container {POSTGRES_CONTAINER_NAME}")
				return True

			except NotFound:
				# Container doesn't exist, create and start it
				abs_storage_path = str(Path(POSTGRES_STORAGE_PATH).absolute())

				logger.info(f"Creating and starting container {POSTGRES_CONTAINER_NAME}")

				# Define volume binding in Docker SDK format
				volumes: list[str] = [f"{abs_storage_path}:/var/lib/postgresql/data:rw"]

				# Define port mapping
				ports: dict[str, int | list[int] | tuple[str, int] | None] = {"5432/tcp": POSTGRES_HOST_PORT}

				restart_policy = {"Name": "always"}

				client.containers.run(
					image=POSTGRES_IMAGE,
					name=POSTGRES_CONTAINER_NAME,
					ports=ports,
					volumes=volumes,
					environment=POSTGRES_ENV,
					detach=True,
					restart_policy=restart_policy,  # type: ignore[arg-type]
				)
				logger.info(f"Created and started container {POSTGRES_CONTAINER_NAME}")
				return True

		except DockerException:
			logger.exception("Docker error while starting PostgreSQL container")
			return False
		finally:
			if client is not None:
				client.close()

	# Ensure image is available
	if not await pull_image_if_needed(POSTGRES_IMAGE):
		return False

	# Ensure storage directory exists
	await ensure_volume_path_exists(POSTGRES_STORAGE_PATH)

	return await asyncio.to_thread(_start_postgres_sync)

check_qdrant_health async

check_qdrant_health(
	url: str = f"http://localhost:{QDRANT_HOST_PORT}",
) -> bool

Check if Qdrant service is healthy and ready to accept connections.

Parameters:

Name Type Description Default
url str

Base URL of the Qdrant service

f'http://localhost:{QDRANT_HOST_PORT}'

Returns:

Type Description
bool

True if Qdrant is healthy, False otherwise

Source code in src/codemap/utils/docker_utils.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
async def check_qdrant_health(url: str = f"http://localhost:{QDRANT_HOST_PORT}") -> bool:
	"""
	Check if Qdrant service is healthy and ready to accept connections.

	Args:
	    url: Base URL of the Qdrant service

	Returns:
	    True if Qdrant is healthy, False otherwise

	"""
	from httpx import AsyncClient, RequestError

	health_url = f"{url}/healthz"
	start_time = time.time()

	async with AsyncClient() as client:
		try:
			async with asyncio.timeout(DEFAULT_TIMEOUT):
				while True:
					try:
						response = await client.get(health_url)
						if response.status_code == HTTP_OK:
							logger.info("Qdrant service is healthy (responded 200 OK)")
							return True
					except RequestError:
						pass

					if time.time() - start_time >= DEFAULT_TIMEOUT:
						break

					# Wait before trying again
					await asyncio.sleep(1)
		except TimeoutError:
			pass

	logger.error(f"Qdrant service did not become healthy within {DEFAULT_TIMEOUT} seconds")
	return False

ensure_qdrant_running async

ensure_qdrant_running(
	wait_for_health: bool = True,
	qdrant_url: str = f"http://localhost:{QDRANT_HOST_PORT}",
) -> tuple[bool, str]

Ensure the Qdrant container is running, starting it if needed.

Parameters:

Name Type Description Default
wait_for_health bool

Whether to wait for Qdrant to be healthy

True
qdrant_url str

URL of the Qdrant service

f'http://localhost:{QDRANT_HOST_PORT}'

Returns:

Type Description
tuple[bool, str]

Tuple of (success, message)

Source code in src/codemap/utils/docker_utils.py
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
async def ensure_qdrant_running(
	wait_for_health: bool = True, qdrant_url: str = f"http://localhost:{QDRANT_HOST_PORT}"
) -> tuple[bool, str]:
	"""
	Ensure the Qdrant container is running, starting it if needed.

	Args:
	    wait_for_health: Whether to wait for Qdrant to be healthy
	    qdrant_url: URL of the Qdrant service

	Returns:
	    Tuple of (success, message)

	"""
	if not await is_docker_running():
		return False, "Docker daemon is not running"

	# Check if Qdrant service is already running
	qdrant_running = False

	from httpx import AsyncClient, HTTPError, RequestError

	try:
		# Try a direct HTTP request first to see if Qdrant is up
		async with AsyncClient(timeout=3.0) as client:
			try:
				response = await client.get(f"{qdrant_url}/health")
				if response.status_code == HTTP_OK:
					logger.info("Qdrant is already available via HTTP")
					qdrant_running = True
			except RequestError:
				# HTTP request failed, now check if it's running in Docker
				qdrant_running = await is_container_running(QDRANT_CONTAINER_NAME)
	except (HTTPError, ConnectionError, OSError) as e:
		logger.warning(f"Error checking Qdrant service: {e}")

	# Start services if needed
	if not qdrant_running:
		logger.info("Qdrant service is not running, starting container...")
		started = await start_qdrant_container()
		if not started:
			return False, "Failed to start Qdrant container"

		if wait_for_health:
			# Wait for Qdrant to be healthy
			logger.info(f"Waiting for Qdrant service to be healthy (timeout: {DEFAULT_TIMEOUT}s)...")
			healthy = await check_qdrant_health(qdrant_url)
			if not healthy:
				return False, "Qdrant service failed to become healthy within the timeout period"

	return True, "Qdrant container is running"

ensure_postgres_running async

ensure_postgres_running() -> tuple[bool, str]

Ensure the PostgreSQL container is running, starting it if needed.

Returns:

Type Description
tuple[bool, str]

Tuple of (success, message)

Source code in src/codemap/utils/docker_utils.py
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
async def ensure_postgres_running() -> tuple[bool, str]:
	"""
	Ensure the PostgreSQL container is running, starting it if needed.

	Returns:
	    Tuple of (success, message)

	"""
	if not await is_docker_running():
		return False, "Docker daemon is not running"

	# Check if PostgreSQL container is already running
	postgres_running = await is_container_running(POSTGRES_CONTAINER_NAME)

	# Start container if needed
	if not postgres_running:
		logger.info("PostgreSQL service is not running, starting container...")
		started = await start_postgres_container()
		if not started:
			return False, "Failed to start PostgreSQL container"

	return True, "PostgreSQL container is running"

stop_container async

stop_container(container_name: str) -> bool

Stop a Docker container.

Parameters:

Name Type Description Default
container_name str

Name of the container to stop

required

Returns:

Type Description
bool

True if successful, False otherwise

Source code in src/codemap/utils/docker_utils.py
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
async def stop_container(container_name: str) -> bool:
	"""
	Stop a Docker container.

	Args:
	    container_name: Name of the container to stop

	Returns:
	    True if successful, False otherwise

	"""

	def _stop_sync(name: str) -> bool:
		client = None
		try:
			client = docker.from_env()
			try:
				container = client.containers.get(name)
				if container.status == "running":
					logger.info(f"Stopping container {name}")
					container.stop(timeout=10)  # Wait up to 10 seconds for clean shutdown
					logger.info(f"Stopped container {name}")
				return True
			except NotFound:
				logger.info(f"Container {name} does not exist")
				return True
		except DockerException:  # Catch DockerException from client.containers.get or container.stop
			logger.exception(f"Docker error while stopping container {name}")
			return False
		finally:
			if client is not None:
				client.close()

	return await asyncio.to_thread(_stop_sync, container_name)

stop_all_codemap_containers async

stop_all_codemap_containers() -> tuple[bool, str]

Stop all CodeMap containers.

Returns:

Type Description
tuple[bool, str]

Tuple of (success, message)

Source code in src/codemap/utils/docker_utils.py
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
async def stop_all_codemap_containers() -> tuple[bool, str]:
	"""
	Stop all CodeMap containers.

	Returns:
	    Tuple of (success, message)

	"""
	containers = [QDRANT_CONTAINER_NAME, POSTGRES_CONTAINER_NAME]
	success = True

	for container_name in containers:
		if not await stop_container(container_name):
			success = False

	if success:
		return True, "All CodeMap containers stopped successfully"
	return False, "Failed to stop some CodeMap containers"