Skip to content

Pipeline

Unified pipeline for CodeMap data processing, synchronization, and retrieval.

This module defines the ProcessingPipeline, which acts as the central orchestrator for managing and interacting with the Qdrant vector database. It handles initialization, synchronization with the Git repository, and provides semantic search capabilities.

logger module-attribute

logger = getLogger(__name__)

ProcessingPipeline

Orchestrates data processing, synchronization, and retrieval for CodeMap using Qdrant.

Manages connections and interactions with the Qdrant vector database, ensuring it is synchronized with the Git repository state. Provides methods for semantic search. Uses asyncio for database and embedding operations.

Source code in src/codemap/processor/pipeline.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
class ProcessingPipeline:
	"""
	Orchestrates data processing, synchronization, and retrieval for CodeMap using Qdrant.

	Manages connections and interactions with the Qdrant vector database,
	ensuring it is synchronized with the Git repository state. Provides
	methods for semantic search. Uses asyncio for database and embedding
	operations.

	"""

	def __init__(
		self,
		config_loader: ConfigLoader | None = None,
	) -> None:
		"""
		Initialize the processing pipeline synchronously.

		Resolves the configuration loader and repository path, sets up the
		optional repository checksum calculator, derives the Qdrant collection
		name from the repository path and current branch, and constructs the
		QdrantManager. Core async initialization is done via `async_init`.

		Args:
		    config_loader: Application configuration loader. If None, a default one is created.

		Raises:
		    ConfigError: If the default loader returned by `ConfigLoader.get_instance()`
		        is not a ConfigLoader instance.
		"""
		if config_loader:
			self.config_loader = config_loader
		else:
			from codemap.config import ConfigLoader

			self.config_loader = ConfigLoader.get_instance()
			# Only the default loader is validated; a caller-supplied loader is used as given.
			if not isinstance(self.config_loader, ConfigLoader):
				from codemap.config import ConfigError

				logger.error(f"Config loading failed or returned unexpected type: {type(self.config_loader)}")
				msg = "Failed to load a valid Config object."
				raise ConfigError(msg)

		self.git_context = GitRepoContext.get_instance()

		# Resolution order: configured repo_root, then the git context's repo_root
		# attribute, then its get_repo_root() lookup.
		self.repo_path = (
			self.config_loader.get.repo_root or self.git_context.repo_root or self.git_context.get_repo_root()
		)

		if self.repo_path:
			from pathlib import Path

			self.repo_path = Path(self.repo_path)
		else:
			# Not fatal here: the pipeline is still constructed, but checksum sync,
			# the watcher and the vector synchronizer all require a repo path.
			logger.critical(
				"Repository path could not be determined. Please ensure it's a git repository or set in config."
			)
			logger.error("Critical: repo_path is None, RepoChecksumCalculator cannot be initialized.")

		self.repo_checksum_calculator: RepoChecksumCalculator | None = None
		if self.repo_path and self.repo_path.is_dir():
			self.repo_checksum_calculator = RepoChecksumCalculator.get_instance(
				repo_path=self.repo_path, git_context=self.git_context, config_loader=self.config_loader
			)
			logger.info(f"RepoChecksumCalculator initialized for {self.repo_path}")
		else:
			logger.warning(
				"RepoChecksumCalculator could not be initialized because repo_path is invalid or not set. "
				"Checksum-based quick sync will be skipped."
			)

		# --- Defer Shared Components Initialization --- #
		self._analyzer: TreeSitterAnalyzer | None = None
		self._chunker: TreeSitterChunker | None = None
		self._db_client: DatabaseClient | None = None

		# --- Load Configuration --- #
		embedding_config = self.config_loader.get.embedding
		embedding_model = embedding_config.model_name
		qdrant_dimension = embedding_config.dimension
		distance_metric = embedding_config.dimension_metric
		qdrant_url = embedding_config.url
		qdrant_api_key = embedding_config.api_key

		self.embedding_model_name: str = "minishlab/potion-base-8M"
		if embedding_model and isinstance(embedding_model, str):
			self.embedding_model_name = embedding_model

		if not qdrant_dimension:
			logger.warning("Missing qdrant dimension in configuration, using default 256")
			qdrant_dimension = 256

		logger.info(f"Using embedding model: {self.embedding_model_name} with dimension: {qdrant_dimension}")

		if self.repo_path:
			# Ensure the local cache directory exists. The path itself is not passed to
			# QdrantManager below, which is configured from url/api_key.
			(self.repo_path / ".codemap_cache" / "qdrant").mkdir(parents=True, exist_ok=True)

		# Fall back to cosine distance for a missing or unrecognized metric name.
		distance_enum = qdrant_models.Distance.COSINE
		if distance_metric and distance_metric.upper() in ["COSINE", "EUCLID", "DOT", "MANHATTAN"]:
			distance_enum = getattr(qdrant_models.Distance, distance_metric.upper())

		# Collection name = short hash of the resolved repo path + sanitized branch name.
		branch_str = self.git_context.branch or "no_branch"
		stable_repo_id = str(self.repo_path.resolve()) if self.repo_path else "unknown_repo"
		collection_base_name = hashlib.sha256(stable_repo_id.encode()).hexdigest()[:16]

		import re

		# Replace every character outside [a-zA-Z0-9_-] (e.g. "/" in branch names) with "_".
		safe_branch_str = re.sub(r"[^a-zA-Z0-9_-]", "_", branch_str)
		collection_name = f"codemap_{collection_base_name}_{safe_branch_str}"

		logger.info(f"Configuring Qdrant client for URL: {qdrant_url}, Collection: {collection_name}")

		self.qdrant_manager = QdrantManager(
			config_loader=self.config_loader,
			collection_name=collection_name,
			dim=qdrant_dimension,
			distance=distance_enum,
			url=qdrant_url,
			api_key=qdrant_api_key,
		)
		self._vector_synchronizer: VectorSynchronizer | None = None

		logger.info(f"ProcessingPipeline synchronous initialization complete for repo: {self.repo_path}")
		self.is_async_initialized = False
		self.watcher: Watcher | None = None
		self._watcher_task: asyncio.Task | None = None
		self._sync_lock = asyncio.Lock()

	@property
	def analyzer(self) -> TreeSitterAnalyzer:
		"""
		Lazily initialize and return a shared TreeSitterAnalyzer instance.

		Returns:
			TreeSitterAnalyzer: The shared analyzer instance.
		"""
		if self._analyzer is None:
			from codemap.processor.tree_sitter import TreeSitterAnalyzer

			self._analyzer = TreeSitterAnalyzer()
		return self._analyzer

	@property
	def chunker(self) -> TreeSitterChunker:
		"""
		Lazily initialize and return a TreeSitterChunker.

		Returns:
			TreeSitterChunker: The chunker instance.
		"""
		if self._chunker is None:
			from codemap.processor.lod import LODGenerator
			from codemap.processor.vector.chunking import TreeSitterChunker

			lod_generator = LODGenerator(analyzer=self.analyzer)
			self._chunker = TreeSitterChunker(
				lod_generator=lod_generator,
				config_loader=self.config_loader,
				git_context=self.git_context,
				repo_checksum_calculator=self.repo_checksum_calculator,
			)
		return self._chunker

	@property
	def db_client(self) -> DatabaseClient:
		"""
		Lazily initialize and return a DatabaseClient instance.

		Returns:
			DatabaseClient: The database client instance.

		Raises:
			RuntimeError: If the DatabaseClient cannot be initialized.
		"""
		if self._db_client is None:  # Only attempt initialization if not already done
			try:
				from codemap.db.client import DatabaseClient

				self._db_client = DatabaseClient()  # Add necessary args if any
			except ImportError:
				logger.exception(
					"DatabaseClient could not be imported. DB features will be unavailable. "
					"Ensure database dependencies are installed if needed."
				)
				# We will raise a RuntimeError below if _db_client is still None.
				# Allow to proceed to the check below
			except Exception:
				# Catch other potential errors during DatabaseClient instantiation
				logger.exception("Error initializing DatabaseClient")
				# We will raise a RuntimeError below if _db_client is still None.

		# After attempting initialization, check if it was successful.
		if self._db_client is None:
			msg = (
				"Failed to initialize DatabaseClient. It remains None after attempting import and instantiation. "
				"Check logs for import errors or instantiation issues."
			)
			logger.critical(msg)  # Use critical for such a failure
			raise RuntimeError(msg)

		return self._db_client

	@property
	def vector_synchronizer(self) -> VectorSynchronizer:
		"""
		Lazily initialize and return a VectorSynchronizer.

		Returns:
			VectorSynchronizer: The synchronizer instance.
		"""
		if self._vector_synchronizer is None:
			from codemap.processor.vector.synchronizer import VectorSynchronizer

			if self.repo_path is None:
				msg = "repo_path must not be None for VectorSynchronizer"
				logger.error(msg)
				raise RuntimeError(msg)
			if self.qdrant_manager is None:
				msg = "qdrant_manager must not be None for VectorSynchronizer"
				logger.error(msg)
				raise RuntimeError(msg)

			self._vector_synchronizer = VectorSynchronizer(
				repo_path=self.repo_path,
				qdrant_manager=self.qdrant_manager,
				chunker=self.chunker,
				embedding_model_name=self.embedding_model_name,
				analyzer=self.analyzer,
				config_loader=self.config_loader,
				repo_checksum_calculator=self.repo_checksum_calculator,
			)
		return self._vector_synchronizer

	async def async_init(self, sync_on_init: bool = True) -> None:
		"""
		Perform asynchronous initialization steps, including Qdrant connection and initial sync.

		Steps, in order: for a local Qdrant URL (containing "localhost" or
		"127.0.0.1") ensure the Qdrant container is running; initialize the
		Qdrant manager; mark the pipeline as initialized; optionally run
		`sync_databases`. Calling this on an already initialized pipeline is a
		no-op.

		Args:
		    sync_on_init: If True, run database synchronization during initialization.

		Raises:
		    RuntimeError: If `qdrant_manager` is not set.
		    Exception: Any error raised by the steps above is logged and re-raised.
		        If the error comes from the initial sync, `is_async_initialized`
		        is already True at that point.

		"""
		if self.is_async_initialized:
			logger.info("Pipeline already async initialized.")
			return

		with progress_indicator("Initializing pipeline components..."):
			try:
				qdrant_url = self.config_loader.get.embedding.url

				# Check for Docker containers
				if qdrant_url:
					with progress_indicator("Checking Docker containers..."):
						# Only check Docker if we're using a URL that looks like localhost/127.0.0.1
						if "localhost" in qdrant_url or "127.0.0.1" in qdrant_url:
							logger.info("Ensuring Qdrant container is running")
							success, message = await ensure_qdrant_running(wait_for_health=True, qdrant_url=qdrant_url)

							# A failed Docker check is only logged; initialization continues.
							if not success:
								logger.warning(f"Docker check failed: {message}")
							else:
								logger.info(f"Docker container check: {message}")

				if not self.qdrant_manager:
					# This case should theoretically not happen if __init__ succeeded
					msg = "QdrantManager was not initialized in __init__."
					logger.error(msg)
					raise RuntimeError(msg)

				# Initialize Qdrant client (connects, creates collection if needed)
				with progress_indicator("Initializing Qdrant manager..."):
					await self.qdrant_manager.initialize()
					logger.info("Qdrant manager initialized asynchronously.")

				if sync_on_init:
					logger.info("`sync_on_init` is True. Performing index synchronization...")
				else:
					logger.info("Skipping sync on init as requested.")

				# Set the flag *before* the sync: sync_databases refuses to run until it is True.
				self.is_async_initialized = True
				logger.info("ProcessingPipeline async core components initialized.")

				if sync_on_init:
					await self.sync_databases()

			except Exception:
				logger.exception("Failed during async initialization")
				raise

	async def stop(self) -> None:
		"""Stops the pipeline and releases resources, including closing Qdrant connection."""
		logger.info("Stopping ProcessingPipeline asynchronously...")
		if self.qdrant_manager:
			await self.qdrant_manager.close()
			self.qdrant_manager = None  # type: ignore[assignment]
		else:
			logger.warning("Qdrant Manager already None during stop.")

		# Stop the watcher if it's running
		if self._watcher_task and not self._watcher_task.done():
			logger.info("Stopping file watcher...")
			self._watcher_task.cancel()
			try:
				await self._watcher_task  # Allow cancellation to propagate
			except asyncio.CancelledError:
				logger.info("File watcher task cancelled.")
			if self.watcher:
				self.watcher.stop()
				logger.info("File watcher stopped.")
			self.watcher = None
			self._watcher_task = None

		# Other cleanup if needed
		self.is_async_initialized = False
		logger.info("ProcessingPipeline stopped.")

	# --- Synchronization --- #

	async def _sync_callback_wrapper(self) -> None:
		"""
		Watcher callback: run a sync unless one is already in progress.

		The lock itself is taken inside `sync_databases`; this wrapper only
		skips the run when the lock is currently held by another sync.
		"""
		if self._sync_lock.locked():
			logger.info("Sync already in progress, skipping watcher-triggered sync.")
			return

		logger.info("Watcher triggered sync starting...")
		# Run sync without progress bars from watcher
		await self.sync_databases()
		logger.info("Watcher triggered sync finished.")

	async def sync_databases(self) -> None:
		"""
		Asynchronously synchronize the Qdrant index with the Git repository state.

		Runs are serialized on the pipeline's sync lock: a call made while
		another sync is in progress waits for it to finish instead of running
		concurrently. The lock is not reentrant, so callers must not hold
		`_sync_lock` when calling this method.

		Does nothing (apart from logging an error) if `async_init` has not
		completed.

		"""
		if not self.is_async_initialized:
			logger.error("Cannot sync databases, async initialization not complete.")
			return

		# Always acquire the lock here. Inspecting `locked()` cannot tell "held by my
		# caller" from "held by an unrelated task", which previously let two syncs overlap.
		async with self._sync_lock:
			logger.info("Starting vector index synchronization using VectorSynchronizer...")
			# VectorSynchronizer handles its own progress updates internally now
			await self.vector_synchronizer.sync_index()
			# Final status message/logging is handled by sync_index

	# --- Watcher Methods --- #

	def initialize_watcher(self, debounce_delay: float = 2.0) -> None:
		"""
		Create the file watcher for the repository path.

		Logs and returns without creating anything when there is no repository
		path or a watcher already exists. Construction errors are logged and
		leave `watcher` set to None; they are not raised.

		Args:
		    debounce_delay: Delay in seconds before triggering sync after a file change.

		"""
		repo_root = self.repo_path
		if not repo_root:
			logger.error("Cannot initialize watcher without a repository path.")
			return

		if self.watcher:
			logger.warning("Watcher already initialized.")
			return

		logger.info(f"Initializing file watcher for path: {self.repo_path}")
		try:
			# File changes are routed through the wrapper, which skips overlapping syncs.
			self.watcher = Watcher(
				path_to_watch=repo_root,
				on_change_callback=self._sync_callback_wrapper,
				debounce_delay=debounce_delay,
			)
			logger.info("File watcher initialized.")
		except Exception as exc:
			if isinstance(exc, ValueError):
				logger.exception("Failed to initialize watcher")
			else:
				logger.exception("Unexpected error initializing watcher.")
			self.watcher = None

	async def start_watcher(self) -> None:
		"""
		Launch the file watcher as a background asyncio task.

		Requires a prior call to `initialize_watcher`. Logs and returns if no
		watcher exists or a watcher task is still running.

		"""
		active_watcher = self.watcher
		if not active_watcher:
			logger.error("Watcher not initialized. Call initialize_watcher() first.")
			return

		current_task = self._watcher_task
		if current_task and not current_task.done():
			logger.warning("Watcher task is already running.")
			return

		logger.info("Starting file watcher task in the background...")
		# The task is stored but not awaited here, so this method returns immediately.
		self._watcher_task = asyncio.create_task(active_watcher.start())

	# --- Retrieval Methods --- #

	async def semantic_search(
		self,
		query: str,
		k: int = 5,
		filter_params: dict[str, Any] | None = None,
	) -> list[dict[str, Any]] | None:
		"""
		Find the code chunks most similar to a query via Qdrant vector search.

		Args:
		    query: The text to search for.
		    k: The number of top similar results to request.
		    filter_params: Optional filter description, converted by `_build_qdrant_filter`. Supports:
		        - exact match: {"field": "value"} or {"match": {"field": "value"}}
		        - multiple values: {"match_any": {"field": ["value1", "value2"]}}
		        - range: {"range": {"field": {"gt": value, "lt": value}}}
		        - complex: {"must": [...], "should": [...], "must_not": [...]}

		Returns:
		    A list of dicts with "id", "score" and "payload" keys (empty when nothing matched),
		    or None when the pipeline is not ready, no embedding was produced, or an error occurred.

		"""
		pipeline_ready = self.is_async_initialized and self.qdrant_manager
		if not pipeline_ready:
			logger.error("QdrantManager not available for semantic search.")
			return None

		logger.debug("Performing semantic search for query: '%s', k=%d", query, k)

		try:
			# Embed the query. The helper is called synchronously with a one-item list,
			# and element 0 of its result is used as the query vector.
			embeddings = generate_embedding([query], self.config_loader)
			if embeddings is None:
				logger.error("Failed to generate embedding for query.")
				return None
			vector = embeddings[0]

			# Translate the optional filter description into a Qdrant filter.
			qdrant_filter = None
			if filter_params:
				qdrant_filter = self._build_qdrant_filter(filter_params)
				logger.debug("Using filter for search: %s", qdrant_filter)

			hits = await self.qdrant_manager.search(vector, k, query_filter=qdrant_filter)
			if not hits:
				logger.debug("Qdrant search returned no results.")
				return []

			# Flatten each scored point into a plain dict; the id is always stringified.
			results = [{"id": str(hit.id), "score": hit.score, "payload": hit.payload} for hit in hits]
			logger.debug("Semantic search found %d results.", len(results))
			return results

		except Exception:
			# Errors are logged and reported to the caller as None rather than raised.
			logger.exception("Error during semantic search.")
			return None

	def _build_qdrant_filter(self, filter_params: dict[str, Any]) -> qdrant_models.Filter:
		"""
		Convert filter parameters to Qdrant filter format.

		The first matching shape wins, checked in this order:

		1. An existing `qdrant_models.Filter` is returned unchanged.
		2. Clause form (`must` / `should` / `must_not`): every listed condition is
		   converted recursively and the results are placed under the same clause.
		3. `{"match": {field: value}}`: exact match on one field.
		4. `{"match_any": {field: values}}`: one exact-match condition per value,
		   combined under `should`.
		5. `{"range": {field: {...}}}`: the inner dict is passed to `qdrant_models.Range`.
		6. Anything else: every key/value pair becomes an exact-match `must` condition.

		For shapes 3-5 only the first entry of the inner mapping is used, and
		other top-level keys in `filter_params` are ignored.

		Args:
		    filter_params: Dictionary of filter parameters

		Returns:
		    Qdrant filter object

		Raises:
		    StopIteration: If the mapping under `match`, `match_any` or `range` is empty.

		"""
		# If already a proper Qdrant filter, return as is
		if isinstance(filter_params, qdrant_models.Filter):
			return filter_params

		def exact_match(field: str, value: Any) -> qdrant_models.FieldCondition:
			"""Build an exact-match condition for a single field/value pair."""
			return qdrant_models.FieldCondition(key=field, match=qdrant_models.MatchValue(value=value))

		# Clause-based filter: must (AND), should (OR), must_not (NOT)
		clause_keys = ("must", "should", "must_not")
		if any(key in filter_params for key in clause_keys):
			clauses = {
				key: [self._build_qdrant_filter(cond) for cond in filter_params[key]]
				for key in clause_keys
				if key in filter_params
			}
			return qdrant_models.Filter(**clauses)

		# Condition-based filters (match, match_any, range)
		if "match" in filter_params:
			field, value = next(iter(filter_params["match"].items()))
			return qdrant_models.Filter(must=[exact_match(field, value)])

		if "match_any" in filter_params:
			field, values = next(iter(filter_params["match_any"].items()))
			# Same conditions for every value type, so no need to inspect the first element.
			return qdrant_models.Filter(should=[exact_match(field, value) for value in values])

		if "range" in filter_params:
			field, range_values = next(iter(filter_params["range"].items()))
			return qdrant_models.Filter(
				must=[qdrant_models.FieldCondition(key=field, range=qdrant_models.Range(**range_values))]
			)

		# Default: treat as simple field-value pairs (exact match)
		return qdrant_models.Filter(must=[exact_match(field, value) for field, value in filter_params.items()])

	# Context manager support for async operations
	async def __aenter__(self) -> Self:
		"""Return self for use as async context manager."""
		# Basic initialization is sync, async init must be called separately
		# Consider if automatic async_init here is desired, or keep it explicit
		# await self.async_init() # Example if auto-init is desired
		return self

	async def __aexit__(
		self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
	) -> None:
		"""Clean up resources when exiting the async context manager."""
		await self.stop()

__init__

__init__(config_loader: ConfigLoader | None = None) -> None

Initialize the processing pipeline synchronously.

Core async initialization is done via async_init.

Parameters:

Name Type Description Default
config_loader ConfigLoader | None

Application configuration loader. If None, a default one is created.

None
Source code in src/codemap/processor/pipeline.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def __init__(
	self,
	config_loader: ConfigLoader | None = None,
) -> None:
	"""
	Initialize the processing pipeline synchronously.

	Resolves the configuration loader and repository path, sets up the
	optional repository checksum calculator, derives the Qdrant collection
	name from the repository path and current branch, and constructs the
	QdrantManager. Core async initialization is done via `async_init`.

	Args:
	    config_loader: Application configuration loader. If None, a default one is created.

	Raises:
	    ConfigError: If the default loader returned by `ConfigLoader.get_instance()`
	        is not a ConfigLoader instance.
	"""
	if config_loader:
		self.config_loader = config_loader
	else:
		from codemap.config import ConfigLoader

		self.config_loader = ConfigLoader.get_instance()
		# Only the default loader is validated; a caller-supplied loader is used as given.
		if not isinstance(self.config_loader, ConfigLoader):
			from codemap.config import ConfigError

			logger.error(f"Config loading failed or returned unexpected type: {type(self.config_loader)}")
			msg = "Failed to load a valid Config object."
			raise ConfigError(msg)

	self.git_context = GitRepoContext.get_instance()

	# Resolution order: configured repo_root, then the git context's repo_root
	# attribute, then its get_repo_root() lookup.
	self.repo_path = (
		self.config_loader.get.repo_root or self.git_context.repo_root or self.git_context.get_repo_root()
	)

	if self.repo_path:
		from pathlib import Path

		self.repo_path = Path(self.repo_path)
	else:
		# Not fatal here: the pipeline is still constructed, but checksum sync,
		# the watcher and the vector synchronizer all require a repo path.
		logger.critical(
			"Repository path could not be determined. Please ensure it's a git repository or set in config."
		)
		logger.error("Critical: repo_path is None, RepoChecksumCalculator cannot be initialized.")

	self.repo_checksum_calculator: RepoChecksumCalculator | None = None
	if self.repo_path and self.repo_path.is_dir():
		self.repo_checksum_calculator = RepoChecksumCalculator.get_instance(
			repo_path=self.repo_path, git_context=self.git_context, config_loader=self.config_loader
		)
		logger.info(f"RepoChecksumCalculator initialized for {self.repo_path}")
	else:
		logger.warning(
			"RepoChecksumCalculator could not be initialized because repo_path is invalid or not set. "
			"Checksum-based quick sync will be skipped."
		)

	# --- Defer Shared Components Initialization --- #
	self._analyzer: TreeSitterAnalyzer | None = None
	self._chunker: TreeSitterChunker | None = None
	self._db_client: DatabaseClient | None = None

	# --- Load Configuration --- #
	embedding_config = self.config_loader.get.embedding
	embedding_model = embedding_config.model_name
	qdrant_dimension = embedding_config.dimension
	distance_metric = embedding_config.dimension_metric
	qdrant_url = embedding_config.url
	qdrant_api_key = embedding_config.api_key

	self.embedding_model_name: str = "minishlab/potion-base-8M"
	if embedding_model and isinstance(embedding_model, str):
		self.embedding_model_name = embedding_model

	if not qdrant_dimension:
		logger.warning("Missing qdrant dimension in configuration, using default 256")
		qdrant_dimension = 256

	logger.info(f"Using embedding model: {self.embedding_model_name} with dimension: {qdrant_dimension}")

	if self.repo_path:
		# Ensure the local cache directory exists. The path itself is not passed to
		# QdrantManager below, which is configured from url/api_key.
		(self.repo_path / ".codemap_cache" / "qdrant").mkdir(parents=True, exist_ok=True)

	# Fall back to cosine distance for a missing or unrecognized metric name.
	distance_enum = qdrant_models.Distance.COSINE
	if distance_metric and distance_metric.upper() in ["COSINE", "EUCLID", "DOT", "MANHATTAN"]:
		distance_enum = getattr(qdrant_models.Distance, distance_metric.upper())

	# Collection name = short hash of the resolved repo path + sanitized branch name.
	branch_str = self.git_context.branch or "no_branch"
	stable_repo_id = str(self.repo_path.resolve()) if self.repo_path else "unknown_repo"
	collection_base_name = hashlib.sha256(stable_repo_id.encode()).hexdigest()[:16]

	import re

	# Replace every character outside [a-zA-Z0-9_-] (e.g. "/" in branch names) with "_".
	safe_branch_str = re.sub(r"[^a-zA-Z0-9_-]", "_", branch_str)
	collection_name = f"codemap_{collection_base_name}_{safe_branch_str}"

	logger.info(f"Configuring Qdrant client for URL: {qdrant_url}, Collection: {collection_name}")

	self.qdrant_manager = QdrantManager(
		config_loader=self.config_loader,
		collection_name=collection_name,
		dim=qdrant_dimension,
		distance=distance_enum,
		url=qdrant_url,
		api_key=qdrant_api_key,
	)
	self._vector_synchronizer: VectorSynchronizer | None = None

	logger.info(f"ProcessingPipeline synchronous initialization complete for repo: {self.repo_path}")
	self.is_async_initialized = False
	self.watcher: Watcher | None = None
	self._watcher_task: asyncio.Task | None = None
	self._sync_lock = asyncio.Lock()

config_loader instance-attribute

config_loader = config_loader

git_context instance-attribute

git_context = get_instance()

repo_path instance-attribute

repo_path = repo_root

repo_checksum_calculator instance-attribute

repo_checksum_calculator: RepoChecksumCalculator | None = (
	None
)

embedding_model_name instance-attribute

embedding_model_name: str = 'minishlab/potion-base-8M'

qdrant_manager instance-attribute

qdrant_manager = QdrantManager(**qdrant_init_args)

is_async_initialized instance-attribute

is_async_initialized = False

watcher instance-attribute

watcher: Watcher | None = None

analyzer property

Lazily initialize and return a shared TreeSitterAnalyzer instance.

Returns:

Name Type Description
TreeSitterAnalyzer TreeSitterAnalyzer

The shared analyzer instance.

chunker property

Lazily initialize and return a TreeSitterChunker.

Returns:

Name Type Description
TreeSitterChunker TreeSitterChunker

The chunker instance.

db_client property

db_client: DatabaseClient

Lazily initialize and return a DatabaseClient instance.

Returns:

Name Type Description
DatabaseClient DatabaseClient

The database client instance.

Raises:

Type Description
RuntimeError

If the DatabaseClient cannot be initialized.

vector_synchronizer property

vector_synchronizer: VectorSynchronizer

Lazily initialize and return a VectorSynchronizer.

Returns:

Name Type Description
VectorSynchronizer VectorSynchronizer

The synchronizer instance.

async_init async

async_init(sync_on_init: bool = True) -> None

Perform asynchronous initialization steps, including Qdrant connection and initial sync.

Parameters:

Name Type Description Default
sync_on_init bool

If True, run database synchronization during initialization.

True
update_progress

Optional ProgressUpdater instance for progress updates.

required
Source code in src/codemap/processor/pipeline.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
async def async_init(self, sync_on_init: bool = True) -> None:
	"""
	Perform asynchronous initialization steps, including Qdrant connection and initial sync.

	Steps, in order:

	1. If the configured embedding URL mentions ``localhost`` or ``127.0.0.1``,
	   call ``ensure_qdrant_running`` for that URL. A failed check is only
	   logged as a warning; initialization continues.
	2. Initialize the Qdrant manager.
	3. Mark the pipeline as async-initialized.
	4. If requested, run ``sync_databases()``.

	Calling this on an already initialized pipeline logs a message and returns.

	Args:
	    sync_on_init: If True, run database synchronization during initialization.

	Raises:
	    RuntimeError: If ``self.qdrant_manager`` is not set.
	    Exception: Any error raised by a step is logged and re-raised unchanged.

	"""
	if self.is_async_initialized:
		logger.info("Pipeline already async initialized.")
		return

	with progress_indicator("Initializing pipeline components..."):
		try:
			# The Qdrant URL is read from the embedding section of the configuration.
			qdrant_url = self.config_loader.get.embedding.url

			if qdrant_url:
				with progress_indicator("Checking Docker containers..."):
					# Only check Docker if we're using a URL that looks like localhost/127.0.0.1
					if "localhost" in qdrant_url or "127.0.0.1" in qdrant_url:
						logger.info("Ensuring Qdrant container is running")
						success, message = await ensure_qdrant_running(wait_for_health=True, qdrant_url=qdrant_url)

						if success:
							logger.info("Docker container check: %s", message)
						else:
							# Best effort: a failed check does not abort initialization.
							logger.warning("Docker check failed: %s", message)

			if not self.qdrant_manager:
				# This case should theoretically not happen if __init__ succeeded
				msg = "QdrantManager was not initialized in __init__."
				logger.error(msg)
				raise RuntimeError(msg)

			# Initialize Qdrant client (connects, creates collection if needed)
			with progress_indicator("Initializing Qdrant manager..."):
				await self.qdrant_manager.initialize()
				logger.info("Qdrant manager initialized asynchronously.")

			if sync_on_init:
				logger.info("`sync_on_init` is True. Performing index synchronization...")
			else:
				# Optional: Could add a check here if Qdrant collection is empty
				# requires another call to qdrant_manager, e.g., get_count()
				logger.info("Skipping sync on init as requested.")

			# The flag must be set *before* the potentially long sync:
			# sync_databases() returns early while it is still False.
			self.is_async_initialized = True
			logger.info("ProcessingPipeline async core components initialized.")

			if sync_on_init:
				await self.sync_databases()

		except Exception:
			logger.exception("Failed during async initialization")
			raise

stop async

stop() -> None

Stops the pipeline and releases resources, including closing Qdrant connection.

Source code in src/codemap/processor/pipeline.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
async def stop(self) -> None:
	"""
	Stop the pipeline and release resources, including closing the Qdrant connection.

	The file watcher is shut down first so that it cannot trigger a
	synchronization against a Qdrant manager that has already been closed.
	The watcher reference is cleared even when its background task was never
	started or has already finished, so ``initialize_watcher()`` can be called
	again afterwards. ``is_async_initialized`` is reset even if cleanup raises.
	"""
	logger.info("Stopping ProcessingPipeline asynchronously...")
	try:
		try:
			watcher_task = self._watcher_task
			if watcher_task and not watcher_task.done():
				logger.info("Stopping file watcher...")
				watcher_task.cancel()
				try:
					await watcher_task  # Allow cancellation to propagate
				except asyncio.CancelledError:
					logger.info("File watcher task cancelled.")
			if self.watcher:
				# NOTE(review): assumes Watcher.stop() is safe to call on a watcher
				# whose task never ran or already finished - confirm against Watcher.
				self.watcher.stop()
				logger.info("File watcher stopped.")
		finally:
			# Always drop the references so a stale watcher is never reused.
			self.watcher = None
			self._watcher_task = None

		if self.qdrant_manager:
			await self.qdrant_manager.close()
			self.qdrant_manager = None  # type: ignore[assignment]
		else:
			logger.warning("Qdrant Manager already None during stop.")
	finally:
		# The pipeline must not report itself as initialized after a stop attempt.
		self.is_async_initialized = False
	logger.info("ProcessingPipeline stopped.")

sync_databases async

sync_databases() -> None

Asynchronously synchronize the Qdrant index with the Git repository state.

Parameters:

Name Type Description Default
update_progress

Optional ProgressUpdater instance for progress updates.

required
Source code in src/codemap/processor/pipeline.py
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
async def sync_databases(self) -> None:
	"""
	Asynchronously synchronize the Qdrant index with the Git repository state.

	Does nothing (beyond logging an error) until async initialization has
	completed. The actual work is delegated to
	``self.vector_synchronizer.sync_index()``, which is run under
	``self._sync_lock`` unless that lock is already held.
	"""
	if not self.is_async_initialized:
		logger.error("Cannot sync databases, async initialization not complete.")
		return

	synchronizer_lock = self._sync_lock

	# NOTE(review): locked() cannot tell *who* holds the lock. The branch below
	# presumably exists for the watcher callback, which is expected to hold the
	# lock already - confirm; any other concurrent caller would also skip it.
	if synchronizer_lock.locked():
		logger.info("Starting vector index synchronization (lock already held)...")
		await self.vector_synchronizer.sync_index()
		return

	async with synchronizer_lock:
		logger.info("Starting vector index synchronization using VectorSynchronizer...")
		# Progress reporting and the final status message come from sync_index itself.
		await self.vector_synchronizer.sync_index()

initialize_watcher

initialize_watcher(debounce_delay: float = 2.0) -> None

Initialize the file watcher.

Parameters:

Name Type Description Default
debounce_delay float

Delay in seconds before triggering sync after a file change.

2.0
Source code in src/codemap/processor/pipeline.py
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
def initialize_watcher(self, debounce_delay: float = 2.0) -> None:
	"""
	Create the file watcher for the repository without starting it.

	Logs and returns without doing anything when no repository path is set or
	when a watcher already exists. If constructing the watcher fails, the
	error is logged and ``self.watcher`` is left as ``None``; nothing is raised.

	Args:
	    debounce_delay: Delay in seconds before triggering sync after a file change.

	"""
	if not self.repo_path:
		logger.error("Cannot initialize watcher without a repository path.")
		return
	if self.watcher:
		logger.warning("Watcher already initialized.")
		return

	logger.info(f"Initializing file watcher for path: {self.repo_path}")
	try:
		new_watcher = Watcher(
			path_to_watch=self.repo_path,
			# Change notifications go through the lock wrapper rather than
			# calling the sync method directly.
			on_change_callback=self._sync_callback_wrapper,
			debounce_delay=debounce_delay,
		)
	except ValueError:
		logger.exception("Failed to initialize watcher")
		self.watcher = None
	except Exception:
		logger.exception("Unexpected error initializing watcher.")
		self.watcher = None
	else:
		self.watcher = new_watcher
		logger.info("File watcher initialized.")

start_watcher async

start_watcher() -> None

Start the file watcher in the background.

initialize_watcher must be called first.

Source code in src/codemap/processor/pipeline.py
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
async def start_watcher(self) -> None:
	"""
	Launch the file watcher as a background asyncio task.

	Requires a prior call to `initialize_watcher`; otherwise an error is
	logged and nothing is started. If a watcher task exists and has not
	finished, a warning is logged and no second task is created.

	"""
	active_watcher = self.watcher
	if not active_watcher:
		logger.error("Watcher not initialized. Call initialize_watcher() first.")
		return

	existing_task = self._watcher_task
	if existing_task and not existing_task.done():
		logger.warning("Watcher task is already running.")
		return

	logger.info("Starting file watcher task in the background...")
	# Schedule the watcher's start() coroutine; the task handle is kept so
	# the running state can be checked later.
	self._watcher_task = asyncio.create_task(active_watcher.start())

semantic_search async

semantic_search(
	query: str,
	k: int = 5,
	filter_params: dict[str, Any] | None = None,
) -> list[dict[str, Any]] | None

Perform semantic search for code chunks similar to the query using Qdrant.

Parameters:

Name Type Description Default
query str

The search query string.

required
k int

The number of top similar results to retrieve.

5
filter_params dict[str, Any] | None

Optional dictionary for filtering results. Supports: - exact match: {"field": "value"} or {"match": {"field": "value"}} - multiple values: {"match_any": {"field": ["value1", "value2"]}} - range: {"range": {"field": {"gt": value, "lt": value}}} - complex: {"must": [...], "should": [...], "must_not": [...]}

None

Returns:

Type Description
list[dict[str, Any]] | None

A list of search result dictionaries (Qdrant ScoredPoint converted to dict),

list[dict[str, Any]] | None

or None if an error occurs.

Source code in src/codemap/processor/pipeline.py
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
async def semantic_search(
	self,
	query: str,
	k: int = 5,
	filter_params: dict[str, Any] | None = None,
) -> list[dict[str, Any]] | None:
	"""
	Perform semantic search for code chunks similar to the query using Qdrant.

	The query is embedded, an optional filter is built from ``filter_params``,
	and the Qdrant manager is asked for the ``k`` nearest points.

	Args:
	    query: The search query string.
	    k: The number of top similar results to retrieve.
	    filter_params: Optional dictionary for filtering results. Supports:
	        - exact match: {"field": "value"} or {"match": {"field": "value"}}
	        - multiple values: {"match_any": {"field": ["value1", "value2"]}}
	        - range: {"range": {"field": {"gt": value, "lt": value}}}
	        - complex: {"must": [...], "should": [...], "must_not": [...]}

	Returns:
	    A list of dictionaries with the keys ``"id"`` (as a string), ``"score"``
	    and ``"payload"``, one per hit; an empty list when the search yields
	    nothing; or None if the pipeline is not initialized, the embedding could
	    not be generated, or any error occurs.

	"""
	if not (self.is_async_initialized and self.qdrant_manager):
		logger.error("QdrantManager not available for semantic search.")
		return None

	logger.debug("Performing semantic search for query: '%s', k=%d", query, k)

	try:
		# Step 1: embed the query. This call is synchronous (it is not awaited).
		embeddings = generate_embedding([query], self.config_loader)
		if embeddings is None:
			logger.error("Failed to generate embedding for query.")
			return None

		# A single query was submitted, so its vector is the first entry.
		query_vector = embeddings[0]

		# Step 2: translate the caller's filter description, if any.
		query_filter = None
		if filter_params:
			query_filter = self._build_qdrant_filter(filter_params)
			logger.debug("Using filter for search: %s", query_filter)

		# Step 3: run the vector search.
		hits: list[qdrant_models.ScoredPoint] = await self.qdrant_manager.search(
			query_vector, k, query_filter=query_filter
		)
		if not hits:
			logger.debug("Qdrant search returned no results.")
			return []

		# Step 4: flatten each scored point into a plain dictionary.
		formatted_results = [
			{
				"id": str(hit.id),  # IDs are always reported as strings
				"score": hit.score,
				"payload": hit.payload,
			}
			for hit in hits
		]

		logger.debug("Semantic search found %d results.", len(formatted_results))
		return formatted_results

	except Exception:
		logger.exception("Error during semantic search.")
		return None

__aenter__ async

__aenter__() -> Self

Return self for use as async context manager.

Source code in src/codemap/processor/pipeline.py
627
628
629
630
631
632
async def __aenter__(self) -> Self:
	"""Return self for use as async context manager."""
	# Basic initialization is sync, async init must be called separately
	# Consider if automatic async_init here is desired, or keep it explicit
	# await self.async_init() # Example if auto-init is desired
	return self

__aexit__ async

__aexit__(
	exc_type: type[BaseException] | None,
	exc_val: BaseException | None,
	exc_tb: TracebackType | None,
) -> None

Clean up resources when exiting the async context manager.

Source code in src/codemap/processor/pipeline.py
634
635
636
637
638
async def __aexit__(
	self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None:
	"""Clean up resources when exiting the async context manager."""
	await self.stop()