class VectorSynchronizer:
    """Handles asynchronous synchronization between Git repository and Qdrant vector index."""

    def __init__(
        self,
        repo_path: Path,
        qdrant_manager: QdrantManager,
        chunker: TreeSitterChunker | None,
        embedding_model_name: str,
        analyzer: TreeSitterAnalyzer | None = None,
        config_loader: "ConfigLoader | None" = None,
        repo_checksum_calculator: RepoChecksumCalculator | None = None,
    ) -> None:
        """
        Initialize the vector synchronizer.

        Args:
            repo_path: Path to the git repository root.
            qdrant_manager: Instance of QdrantManager to handle vector storage.
            chunker: Instance of chunker used to create code chunks.
            embedding_model_name: Name of the embedding model to use.
            analyzer: Optional TreeSitterAnalyzer instance.
            config_loader: Configuration loader instance.
            repo_checksum_calculator: Optional RepoChecksumCalculator instance.
        """
        self.repo_path = repo_path
        self.qdrant_manager = qdrant_manager
        self.git_context = ExtendedGitRepoContext.get_instance()
        self.embedding_model_name = embedding_model_name
        self.analyzer = analyzer or TreeSitterAnalyzer()

        # Ensure RepoChecksumCalculator is instantiated with git_context
        if repo_checksum_calculator is None:
            self.repo_checksum_calculator = RepoChecksumCalculator.get_instance(
                repo_path=self.repo_path, git_context=self.git_context, config_loader=config_loader
            )
        else:
            self.repo_checksum_calculator = repo_checksum_calculator
            # Ensure existing calculator also has git_context, as it might be crucial for branch logic
            if self.repo_checksum_calculator.git_context is None and self.git_context:
                self.repo_checksum_calculator.git_context = self.git_context

        if config_loader:
            self.config_loader = config_loader
        else:
            from codemap.config import ConfigLoader

            self.config_loader = ConfigLoader.get_instance()

        embedding_config = self.config_loader.get.embedding
        self.qdrant_batch_size = embedding_config.qdrant_batch_size

        if chunker is None:
            self.chunker = TreeSitterChunker(
                git_context=self.git_context,
                config_loader=self.config_loader,
                repo_checksum_calculator=self.repo_checksum_calculator,
            )
        else:
            if getattr(chunker, "git_context", None) is None:
                chunker.git_context = self.git_context
            if (
                hasattr(chunker, "repo_checksum_calculator")
                and getattr(chunker, "repo_checksum_calculator", None) is None
                and self.repo_checksum_calculator
            ):
                chunker.repo_checksum_calculator = self.repo_checksum_calculator
            self.chunker = chunker

        logger.info(
            f"VectorSynchronizer initialized for repo: {repo_path} "
            f"using Qdrant collection: '{qdrant_manager.collection_name}' "
            f"and embedding model: {embedding_model_name}"
        )

        if not self.repo_checksum_calculator:
            logger.warning("RepoChecksumCalculator could not be initialized. Checksum-based sync will be skipped.")
    def _get_checksum_cache_path(self) -> Path:
        """Gets the path to the checksum cache file within .codemap_cache."""
        # Ensure .codemap_cache directory is at the root of the repo_path passed to VectorSynchronizer
        cache_dir = self.repo_path / ".codemap_cache"
        return cache_dir / "last_sync_checksum.txt"

    def _read_last_sync_checksum(self) -> str | None:
        """Reads the last successful sync checksum from the cache file."""
        cache_file = self._get_checksum_cache_path()
        try:
            if cache_file.exists():
                return cache_file.read_text().strip()
        except OSError as e:
            logger.warning(f"Error reading checksum cache file {cache_file}: {e}")
        return None

    def _write_last_sync_checksum(self, checksum: str) -> None:
        """Writes the given checksum to the cache file."""
        cache_file = self._get_checksum_cache_path()
        try:
            cache_file.parent.mkdir(parents=True, exist_ok=True)  # Ensure .codemap_cache exists
            cache_file.write_text(checksum)
            logger.info(f"Updated checksum cache file {cache_file} with checksum {checksum[:8]}...")
        except OSError:
            logger.exception(f"Error writing checksum cache file {cache_file}")

    async def _generate_chunks_for_file(self, file_path_str: str, file_hash: str) -> list[dict[str, Any]]:
        """
        Helper coroutine to generate chunks for a single file.

        File hash is passed for context (e.g. git hash or content hash).
        """
        chunks_for_file = []
        absolute_path = self.repo_path / file_path_str
        try:
            # Pass the file_hash (git_hash for tracked files, content hash for untracked files) to the chunker
            file_chunks_generator = self.chunker.chunk_file(absolute_path, git_hash=file_hash)
            if file_chunks_generator:
                chunks_for_file.extend([chunk.model_dump() for chunk in file_chunks_generator])
                logger.debug(f"Generated {len(chunks_for_file)} chunks for {file_path_str}")
            else:
                logger.debug(f"No chunks generated for file: {file_path_str}")
        except Exception:
            logger.exception(f"Error processing file {file_path_str} during chunk generation")
        return chunks_for_file
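    # The dictionaries returned by _generate_chunks_for_file come from `chunk.model_dump()`.
    # A sketch of the fields the rest of this module relies on (illustrative only; the
    # authoritative schema is defined by the chunker's model and may contain more fields):
    #
    #   {
    #       "content": "def foo() -> None: ...",              # text that gets embedded
    #       "metadata": {
    #           "file_path": "src/foo.py",
    #           "content_hash": "<hash of the chunk text>",
    #           "file_metadata": {"file_content_hash": "<hash of the whole file>", ...},
    #           "git_metadata": {"tracked": True, "branch": "main", "git_hash": "<blob OID>", ...},
    #       },
    #   }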
"""awaitself.qdrant_manager.initialize()logger.info("Retrieving current state from Qdrant collection...")qdrant_state:dict[str,set[tuple[str,str]]]=defaultdict(set)all_ids=awaitself.qdrant_manager.get_all_point_ids_with_filter()logger.info(f"[State Check] Retrieved {len(all_ids)} point IDs from Qdrant.")payloads={}ifall_ids:foriinrange(0,len(all_ids),self.qdrant_batch_size):batch_ids=all_ids[i:i+self.qdrant_batch_size]batch_payloads=awaitself.qdrant_manager.get_payloads_by_ids(batch_ids)payloads.update(batch_payloads)logger.info(f"[State Check] Retrieved {len(payloads)} payloads from Qdrant.")processed_count=0forpoint_id,payload_dictinpayloads.items():ifpayload_dict:file_metadata=payload_dict.get("file_metadata")file_path_val:str|None=Nonecomparison_hash_for_state:str|None=Noneifisinstance(file_metadata,dict):file_path_val=file_metadata.get("file_path")file_content_hash_from_meta=file_metadata.get("file_content_hash")git_metadata=payload_dict.get("git_metadata")ifisinstance(git_metadata,dict)andgit_metadata.get("tracked")isTrue:git_hash_from_meta=git_metadata.get("git_hash")ifisinstance(git_hash_from_meta,str):comparison_hash_for_state=git_hash_from_metaelifisinstance(file_content_hash_from_meta,str):comparison_hash_for_state=file_content_hash_from_metaif(isinstance(file_path_val,str)andfile_path_val.strip()andisinstance(comparison_hash_for_state,str)):qdrant_state[file_path_val].add((str(point_id),comparison_hash_for_state))processed_count+=1else:logger.warning(f"Point {point_id} in Qdrant is missing file_path or comparison_hash. "f"Payload components: file_path_val={file_path_val}, "f"comparison_hash_for_state={comparison_hash_for_state}")continueelse:logger.warning(f"Point ID {point_id} has an empty or None payload. Skipping.")logger.info(f"Retrieved state for {len(qdrant_state)} files ({processed_count} chunks) from Qdrant.")returnqdrant_stateasyncdef_compare_states(self,current_file_hashes:dict[str,str],# relative_path -> content_hash (from all_repo_files)previous_nodes_map:dict[str,dict[str,str]]|None,# path -> {"type": "file"|"dir", "hash": hash_val}qdrant_state:dict[str,set[tuple[str,str]]],# file_path -> set of (chunk_id, git_or_content_hash_in_db))->tuple[set[str],set[str]]:""" Compare current file hashes with previous checksum map and Qdrant state. Args: current_file_hashes: Current state of files in repo (path -> content/git hash). previous_nodes_map: Previously stored checksum map (path -> {type, hash}). qdrant_state: Current state of chunks in Qdrant. Returns: tuple[set[str], set[str]]: - files_to_process: Relative paths of files that are new or changed. - chunks_to_delete_from_qdrant: Specific Qdrant chunk_ids to delete. """files_to_process:set[str]=set()chunks_to_delete_from_qdrant:set[str]=set()processed_qdrant_paths_this_cycle:set[str]=set()# 1. Determine files to process based on current vs. previous checksumsforfile_path,current_hashincurrent_file_hashes.items():previous_file_hash:str|None=Noneifprevious_nodes_mapandfile_pathinprevious_nodes_map:node_info=previous_nodes_map[file_path]ifnode_info.get("type")=="file":previous_file_hash=node_info.get("hash")ifprevious_file_hashisNoneorprevious_file_hash!=current_hash:logger.info(f"[Compare] File '{file_path}' is new or changed. 
"f"Old hash: {previous_file_hash}, New hash: {current_hash}")files_to_process.add(file_path)# If the file was present and its hash matches, it can be skippediffile_pathinqdrant_state:logger.info(f"[Compare] Marking existing Qdrant chunks for changed file '{file_path}' for deletion.")chunks_to_delete_from_qdrant.update(cidforcid,_inqdrant_state[file_path])processed_qdrant_paths_this_cycle.add(file_path)# Mark as seen from current repo state# 2. Determine files/chunks to delete based on previous checksums vs. currentifprevious_nodes_map:forold_path,node_infoinprevious_nodes_map.items():ifnode_info.get("type")=="file"andold_pathnotincurrent_file_hashes:logger.info(f"[Compare] File '{old_path}' was in previous checksum but not current. Deleting from Qdrant.")ifold_pathinqdrant_state:forchunk_id,_inqdrant_state[old_path]:chunks_to_delete_from_qdrant.add(chunk_id)processed_qdrant_paths_this_cycle.add(old_path)# Mark as seen from previous repo state# 3. Clean up any orphaned Qdrant entries not covered by current or previous valid states# These might be from very old states or errors.all_known_valid_paths=set(current_file_hashes.keys())ifprevious_nodes_map:all_known_valid_paths.update(pforp,infoinprevious_nodes_map.items()ifinfo.get("type")=="file")forqdrant_file_path,qdrant_chunks_setinqdrant_state.items():ifqdrant_file_pathnotinall_known_valid_paths:logger.warning(f"Orphaned file_path '{qdrant_file_path}' in Qdrant not found in current ""or previous valid checksums. Deleting its chunks.")forchunk_id,_inqdrant_chunks_set:chunks_to_delete_from_qdrant.add(chunk_id)logger.info(f"[Compare States] Result: {len(files_to_process)} files to process/reprocess, "# noqa: S608f"{len(chunks_to_delete_from_qdrant)} specific chunks to delete from Qdrant.")# The second element of the tuple (files_whose_chunks_are_all_deleted) is no longer# explicitly needed with this logic.returnfiles_to_process,chunks_to_delete_from_qdrantasyncdef_process_and_upsert_batch(self,chunk_batch:list[dict[str,Any]])->int:"""Process a batch of chunks by generating embeddings and upserting to Qdrant."""ifnotchunk_batch:return0deduplicated_batch=[]seen_content_hashes=set()forchunkinchunk_batch:content_hash=chunk["metadata"].get("content_hash","")file_metadata_dict=chunk["metadata"].get("file_metadata",{})file_content_hash=(file_metadata_dict.get("file_content_hash","")ifisinstance(file_metadata_dict,dict)else"")dedup_key=f"{content_hash}:{file_content_hash}"ifdedup_keynotinseen_content_hashes:seen_content_hashes.add(dedup_key)deduplicated_batch.append(chunk)iflen(deduplicated_batch)<len(chunk_batch):logger.info(f"Removed {len(chunk_batch)-len(deduplicated_batch)} "f"duplicate chunks, processing {len(deduplicated_batch)} unique chunks")ifnotdeduplicated_batch:# If all chunks were duplicateslogger.info("All chunks in the batch were duplicates. Nothing to process.")return0logger.info(f"Processing batch of {len(deduplicated_batch)} unique chunks for embedding and upsert.")texts_to_embed=[chunk["content"]forchunkindeduplicated_batch]embeddings=generate_embedding(texts_to_embed,self.config_loader)ifembeddingsisNoneorlen(embeddings)!=len(deduplicated_batch):logger.error("Embed batch failed: "f"got {len(embeddings)ifembeddingselse0}, expected {len(deduplicated_batch)}. 
Skipping.")failed_files={chunk["metadata"].get("file_path","unknown")forchunkindeduplicated_batch}logger.error(f"Failed batch involved files: {failed_files}")return0points_to_upsert=[]forchunk,embeddinginzip(deduplicated_batch,embeddings,strict=True):original_file_path_str=chunk["metadata"].get("file_path","unknown")chunk_id=str(uuid.uuid4())chunk["metadata"]["chunk_id"]=chunk_idchunk["metadata"]["file_path"]=original_file_path_strpayload:dict[str,Any]=cast("dict[str, Any]",chunk["metadata"])point=create_qdrant_point(chunk_id,embedding,payload)points_to_upsert.append(point)ifpoints_to_upsert:awaitself.qdrant_manager.upsert_points(points_to_upsert)logger.info(f"Successfully upserted {len(points_to_upsert)} points from batch.")returnlen(points_to_upsert)logger.warning("No points generated from batch to upsert.")return0asyncdefsync_index(self)->bool:""" Asynchronously synchronize the Qdrant index with the current repository state. Returns True if synchronization completed successfully, False otherwise. """sync_success=Falsecurrent_repo_root_checksum:str|None=None# This local variable will hold the map for the current sync operation.current_nodes_map:dict[str,dict[str,str]]={}# Initialize as emptyprevious_root_hash:str|None=Noneprevious_nodes_map:dict[str,dict[str,str]]|None=Noneifself.repo_checksum_calculator:# Attempt to read the checksum from the last successful sync for the current branchlogger.info("Attempting to read latest checksum data for current branch...")prev_hash,prev_map=self.repo_checksum_calculator.read_latest_checksum_data_for_current_branch()ifprev_hash:previous_root_hash=prev_hashifprev_map:previous_nodes_map=prev_maptry:# calculate_repo_checksum returns: tuple[str, dict[str, dict[str, str]]]# Renamed local var to avoid confusion if self.all_nodes_map_from_checksum is used elsewhere(calculated_root_hash,calculated_nodes_map,)=awaitself.repo_checksum_calculator.calculate_repo_checksum()current_repo_root_checksum=calculated_root_hashself.all_nodes_map_from_checksum=calculated_nodes_map# Store the fresh map on selfcurrent_nodes_map=self.all_nodes_map_from_checksum# Use this fresh map for the current sync# Quick sync: If root checksums match, assume no changes and skip detailed comparison.ifprevious_root_hashandcurrent_repo_root_checksum==previous_root_hash:branch_name=(self.repo_checksum_calculator.git_context.get_current_branch()ifself.repo_checksum_calculator.git_contextelse"unknown")logger.info(f"Root checksum ({current_repo_root_checksum}) matches "f"previous state for branch '{branch_name}'. ""Quick sync indicates no changes needed.")# Consider updating a 'last_synced_timestamp' or similar marker here if needed.returnTrue# Successfully synced (no changes)logger.info("Root checksum mismatch or no previous checksum. Proceeding with detailed comparison and sync.")exceptException:# pylint: disable=broad-exceptlogger.exception("Error calculating repository checksum. ""Proceeding with full comparison using potentially stale or no current checksum data.")# current_nodes_map remains {}, signifying we couldn't get a fresh current state.# This will be handled by the check below.else:logger.warning("RepoChecksumCalculator not available. Cannot perform checksum-based ""quick sync, read previous checksum, or get fresh current state. 
""Proceeding with comparison based on Qdrant state only if necessary, ""but sync will likely be incomplete.")# previous_root_hash and previous_nodes_map remain None.# current_nodes_map remains {}, signifying we couldn't get a fresh current state.# This will be handled by the check below.# Populate current_file_hashes from the local current_nodes_map.# current_nodes_map will be populated if checksum calculation succeeded, otherwise it's {}.current_file_hashes:dict[str,str]={}ifnotcurrent_nodes_map:# Checks if the map is empty# This means checksum calculation failed or RepoChecksumCalculator was not available.# We cannot reliably determine the current state of files.logger.error("Current repository file map is empty (failed to calculate checksums ""or RepoChecksumCalculator missing). Cannot proceed with accurate sync ""as current file states are unknown.")returnFalse# Cannot sync without knowing current file states.# If current_nodes_map is not empty, proceed to populate current_file_hashesforpath,node_infoincurrent_nodes_map.items():ifnode_info.get("type")=="file"and"hash"innode_info:# Ensure hash key existscurrent_file_hashes[path]=node_info["hash"]# If current_nodes_map was valid (not empty) but contained no files (e.g. empty repo),# current_file_hashes will be empty. This is a valid state for _compare_states.# Get the current state from Qdrantqdrant_state=awaitself._get_qdrant_state()withprogress_indicator("Comparing repository state with vector state..."):files_to_process,chunks_to_delete=awaitself._compare_states(current_file_hashes,previous_nodes_map,qdrant_state)withprogress_indicator(f"Deleting {len(chunks_to_delete)} outdated vectors..."):ifchunks_to_delete:delete_ids_list=list(chunks_to_delete)foriinrange(0,len(delete_ids_list),self.qdrant_batch_size):batch_ids_to_delete=delete_ids_list[i:i+self.qdrant_batch_size]awaitself.qdrant_manager.delete_points(batch_ids_to_delete)logger.info(f"Deleted batch of {len(batch_ids_to_delete)} vectors.")logger.info(f"Finished deleting {len(chunks_to_delete)} vectors.")else:logger.info("No vectors to delete.")# Step: Update git_metadata for files whose content hasn't changed but Git status might havelogger.info("Checking for Git metadata updates for unchanged files...")# Candidate files: in current repo, content hash same as previous, so not in files_to_processfiles_to_check_for_git_metadata_update=set(current_file_hashes.keys())-files_to_processchunk_ids_to_fetch_payloads_for_meta_check:list[str]=[]# Maps file_path_str to list of its chunk_ids that are candidates for metadata updatecandidate_file_to_chunks_map:dict[str,list[str]]=defaultdict(list)forfile_path_strinfiles_to_check_for_git_metadata_update:iffile_path_strnotinqdrant_state:# No existing chunks for this file in Qdrantcontinue# Consider only chunks that are not already marked for deletionchunks_for_this_file=[cidforcid,_inqdrant_state[file_path_str]ifcidnotinchunks_to_delete]ifchunks_for_this_file:# qdrant_state stores chunk_ids as strings (derived from ExtendedPointId)chunk_ids_to_fetch_payloads_for_meta_check.extend(chunks_for_this_file)candidate_file_to_chunks_map[file_path_str]=chunks_for_this_file# Batch fetch payloads for all potentially affected chunksfetched_payloads_for_meta_check:dict[str,dict[str,Any]]={}ifchunk_ids_to_fetch_payloads_for_meta_check:logger.info(f"Fetching payloads for {len(chunk_ids_to_fetch_payloads_for_meta_check)} chunks to check Git 
metadata.")foriinrange(0,len(chunk_ids_to_fetch_payloads_for_meta_check),self.qdrant_batch_size):batch_ids=chunk_ids_to_fetch_payloads_for_meta_check[i:i+self.qdrant_batch_size]# Cast to satisfy linter for QdrantManager's expected typetyped_batch_ids=cast("list[str | int | uuid.UUID]",batch_ids)batch_payloads=awaitself.qdrant_manager.get_payloads_by_ids(typed_batch_ids)fetched_payloads_for_meta_check.update(batch_payloads)# Dictionary to group chunk_ids by the required new git_metadata# Key: frozenset of new_git_metadata.items() to make it hashable# Value: list of chunk_ids (strings)git_metadata_update_groups:dict[frozenset,list[str]]=defaultdict(list)forfile_path_str,chunk_ids_in_fileincandidate_file_to_chunks_map.items():current_is_tracked=self.git_context.is_file_tracked(file_path_str)current_branch=self.git_context.get_current_branch()current_git_hash_for_file:str|None=Noneifcurrent_is_tracked:try:# This should be the blob OID for the filecurrent_git_hash_for_file=self.git_context.get_file_git_hash(file_path_str)exceptException:# noqa: BLE001logger.warning(f"Could not get git hash for tracked file {file_path_str} during metadata update check.",exc_info=True,)required_new_git_metadata={"tracked":current_is_tracked,"branch":current_branch,"git_hash":current_git_hash_for_file,# Will be None if untracked or error getting hash}forchunk_idinchunk_ids_in_file:# chunk_id is already a stringchunk_payload=fetched_payloads_for_meta_check.get(chunk_id)ifnotchunk_payload:logger.warning(f"Payload not found for chunk {chunk_id} of file {file_path_str} ""during metadata check. Skipping this chunk.")continueold_git_metadata=chunk_payload.get("git_metadata")update_needed=Falseifnotisinstance(old_git_metadata,dict):update_needed=True# If no proper old metadata, or it's missing, update to currentelif(old_git_metadata.get("tracked")!=required_new_git_metadata["tracked"]orold_git_metadata.get("branch")!=required_new_git_metadata["branch"]orold_git_metadata.get("git_hash")!=required_new_git_metadata["git_hash"]):update_needed=Trueifupdate_needed:key=frozenset(required_new_git_metadata.items())git_metadata_update_groups[key].append(chunk_id)ifgit_metadata_update_groups:num_chunks_to_update=sum(len(ids)foridsingit_metadata_update_groups.values())logger.info(f"Found {num_chunks_to_update} chunks requiring Git metadata updates, "f"grouped into {len(git_metadata_update_groups)} unique metadata sets.")total_update_batches=sum((len(chunk_ids_group)+self.qdrant_batch_size-1)//self.qdrant_batch_sizeforchunk_ids_groupingit_metadata_update_groups.values())# Ensure total is at least 1 if there are groups, for progress bar logicprogress_total=(total_update_batchesiftotal_update_batches>0else(1ifgit_metadata_update_groupselse0))ifprogress_total>0:# Only show progress if there's something to dowithprogress_indicator("Applying Git metadata updates to chunks...",total=progress_total,style="progress",transient=True,)asupdate_meta_progress_bar:applied_batches_count=0fornew_meta_fset,chunk_ids_to_update_with_this_metaingit_metadata_update_groups.items():new_meta_dict=dict(new_meta_fset)payload_to_set={"git_metadata":new_meta_dict}foriinrange(0,len(chunk_ids_to_update_with_this_meta),self.qdrant_batch_size):batch_chunk_ids=chunk_ids_to_update_with_this_meta[i:i+self.qdrant_batch_size]ifbatch_chunk_ids:# Ensure batch is not empty# Cast to satisfy linter for QdrantManager's expected typetyped_point_ids=cast("list[str | int | 
uuid.UUID]",batch_chunk_ids)awaitself.qdrant_manager.set_payload(payload=payload_to_set,point_ids=typed_point_ids,filter_condition=models.Filter())logger.info(f"Updated git_metadata for {len(batch_chunk_ids)} chunks with: {new_meta_dict}")applied_batches_count+=1update_meta_progress_bar(None,applied_batches_count,None)# Update progress# Ensure progress bar completes if all batches were empty but groups existedifapplied_batches_count==0andgit_metadata_update_groups:update_meta_progress_bar(None,progress_total,None)# Force completionelifnum_chunks_to_update>0:# Log if groups existed but somehow total_progress was 0logger.info(f"Updating {num_chunks_to_update} chunks' Git metadata without progress bar (zero batches)")fornew_meta_fset,chunk_ids_to_update_with_this_metaingit_metadata_update_groups.items():new_meta_dict=dict(new_meta_fset)payload_to_set={"git_metadata":new_meta_dict}ifchunk_ids_to_update_with_this_meta:# Check if list is not empty# Cast to satisfy linter for QdrantManager's expected typetyped_point_ids_single_batch=cast("list[str | int | uuid.UUID]",chunk_ids_to_update_with_this_meta)awaitself.qdrant_manager.set_payload(payload=payload_to_set,point_ids=typed_point_ids_single_batch,filter_condition=models.Filter(),)logger.info(f"Updated git_metadata for {len(chunk_ids_to_update_with_this_meta)} "f"chunks (in a single batch) with: {new_meta_dict}")else:logger.info("No Git metadata updates required for existing chunks of unchanged files.")num_files_to_process=len(files_to_process)all_chunks:list[dict[str,Any]]=[]# Ensure all_chunks is initializedprocessed_files_count=0msg="Processing new/updated files..."withprogress_indicator(msg,style="progress",total=num_files_to_processifnum_files_to_process>0else1,# total must be > 0transient=True,)asupdate_file_progress:ifnum_files_to_process>0:processed_files_count=0# Wrapper coroutine to update progress as tasks completeasyncdefwrapped_generate_chunks(file_path:str,f_hash:str)->list[dict[str,Any]]:nonlocalprocessed_files_count# Allow modification of the outer scope variabletry:returnawaitself._generate_chunks_for_file(file_path,f_hash)finally:processed_files_count+=1update_file_progress(None,processed_files_count,None)tasks_to_gather=[]forfile_path_to_procinfiles_to_process:file_current_hash=current_file_hashes.get(file_path_to_proc)iffile_current_hash:tasks_to_gather.append(wrapped_generate_chunks(file_path_to_proc,file_current_hash))else:logger.warning(f"File '{file_path_to_proc}' marked to process but its current hash not found. 
Skipping.")# If a file is skipped, increment progress here as it won't be wrappedprocessed_files_count+=1update_file_progress(None,processed_files_count,None)iftasks_to_gather:logger.info(f"Concurrently generating chunks for {len(tasks_to_gather)} files...")list_of_chunk_lists=awaitasyncio.gather(*tasks_to_gather)all_chunks=[chunkforsublistinlist_of_chunk_listsforchunkinsublist]logger.info(f"Total chunks generated: {len(all_chunks)}.")else:logger.info("No files eligible for concurrent chunk generation.")# Final update to ensure the progress bar completes to 100% if some files were skipped# and caused processed_files_count to not reach num_files_to_process via the finally blocks alone.ifprocessed_files_count<num_files_to_process:update_file_progress(None,num_files_to_process,None)else:# num_files_to_process == 0logger.info("No new/updated files to process.")update_file_progress(None,1,None)# Complete the dummy task if total was 1withprogress_indicator("Processing chunks..."):awaitself._process_and_upsert_batch(all_chunks)sync_success=Truelogger.info("Vector index synchronization completed successfully.")returnsync_success