Utils

Utility functions for diff splitting.

get_language_specific_patterns

get_language_specific_patterns(language: str) -> list[str]

Get language-specific regex patterns for code structure.

Parameters:

    language (str, required): Programming language identifier

Returns:

    list[str]: A list of regex patterns for the language, or an empty list if the language is not supported

Source code in src/codemap/git/diff_splitter/utils.py
def get_language_specific_patterns(language: str) -> list[str]:
	"""
	Get language-specific regex patterns for code structure.

	Args:
	    language: Programming language identifier

	Returns:
	    A list of regex patterns for the language, or empty list if not supported

	"""
	# Define pattern strings (used for semantic boundary detection)
	pattern_strings = {
		"py": [
			r"^import\s+.*",  # Import statements
			r"^from\s+.*",  # From imports
			r"^class\s+\w+",  # Class definitions
			r"^def\s+\w+",  # Function definitions
			r"^if\s+__name__\s*==\s*['\"]__main__['\"]",  # Main block
		],
		"js": [
			r"^import\s+.*",  # ES6 imports
			r"^const\s+\w+\s*=\s*require",  # CommonJS imports
			r"^function\s+\w+",  # Function declarations
			r"^const\s+\w+\s*=\s*function",  # Function expressions
			r"^class\s+\w+",  # Class declarations
			r"^export\s+",  # Exports
		],
		"ts": [
			r"^import\s+.*",  # Imports
			r"^export\s+",  # Exports
			r"^interface\s+",  # Interfaces
			r"^type\s+",  # Type definitions
			r"^class\s+",  # Classes
			r"^function\s+",  # Functions
		],
		"jsx": [
			r"^import\s+.*",  # ES6 imports
			r"^const\s+\w+\s*=\s*require",  # CommonJS imports
			r"^function\s+\w+",  # Function declarations
			r"^const\s+\w+\s*=\s*function",  # Function expressions
			r"^class\s+\w+",  # Class declarations
			r"^export\s+",  # Exports
		],
		"tsx": [
			r"^import\s+.*",  # Imports
			r"^export\s+",  # Exports
			r"^interface\s+",  # Interfaces
			r"^type\s+",  # Type definitions
			r"^class\s+",  # Classes
			r"^function\s+",  # Functions
		],
		"java": [
			r"^import\s+.*",  # Import statements
			r"^public\s+class",  # Public class
			r"^private\s+class",  # Private class
			r"^(public|private|protected)(\s+static)?\s+\w+\s+\w+\(",  # Methods
		],
		"go": [
			r"^import\s+",  # Import statements
			r"^func\s+",  # Function definitions
			r"^type\s+\w+\s+struct",  # Struct definitions
		],
		"rb": [
			r"^require\s+",  # Requires
			r"^class\s+",  # Class definitions
			r"^def\s+",  # Method definitions
			r"^module\s+",  # Module definitions
		],
		"php": [
			r"^namespace\s+",  # Namespace declarations
			r"^use\s+",  # Use statements
			r"^class\s+",  # Class definitions
			r"^(public|private|protected)\s+function",  # Methods
		],
		"cs": [
			r"^using\s+",  # Using directives
			r"^namespace\s+",  # Namespace declarations
			r"^(public|private|protected|internal)\s+class",  # Classes
			r"^(public|private|protected|internal)(\s+static)?\s+\w+\s+\w+\(",  # Methods
		],
		"kt": [
			r"^import\s+.*",  # Import statements
			r"^class\s+\w+",  # Class definitions
			r"^fun\s+\w+",  # Function definitions
			r"^val\s+\w+",  # Val declarations
			r"^var\s+\w+",  # Var declarations
		],
		"scala": [
			r"^import\s+.*",  # Import statements
			r"^class\s+\w+",  # Class definitions
			r"^object\s+\w+",  # Object definitions
			r"^def\s+\w+",  # Method definitions
			r"^val\s+\w+",  # Val declarations
			r"^var\s+\w+",  # Var declarations
		],
	}

	# Return pattern strings for the language or empty list if not supported
	return pattern_strings.get(language, [])
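
Example: a minimal usage sketch (assuming the codemap package is importable; the sample source line is hypothetical):

import re

from codemap.git.diff_splitter.utils import get_language_specific_patterns

# Compile the Python patterns and test a line against them.
patterns = [re.compile(p) for p in get_language_specific_patterns("py")]
print(any(p.match("def process(data):") for p in patterns))  # True: matches ^def\s+\w+
print(get_language_specific_patterns("cobol"))  # []: unsupported language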

determine_commit_type

determine_commit_type(files: list[str]) -> str

Determine the appropriate commit type based on the files.

Parameters:

    files (list[str], required): List of file paths

Returns:

    str: Commit type string (e.g., "feat", "fix", "test", "docs", "chore")

Source code in src/codemap/git/diff_splitter/utils.py
def determine_commit_type(files: list[str]) -> str:
	"""
	Determine the appropriate commit type based on the files.

	Args:
	    files: List of file paths

	Returns:
	    Commit type string (e.g., "feat", "fix", "test", "docs", "chore")

	"""
	# Check for test files
	if any(f.startswith("tests/") or "_test." in f or "test_" in f for f in files):
		return "test"

	# Check for documentation files
	if any(f.startswith("docs/") or f.endswith(".md") for f in files):
		return "docs"

	# Check for configuration files
	if any(f.endswith((".json", ".yml", ".yaml", ".toml", ".ini", ".cfg")) for f in files):
		return "chore"

	# Default to "chore" for general updates
	return "chore"

create_chunk_description

create_chunk_description(
	commit_type: str, files: list[str]
) -> str

Create a meaningful description for a chunk.

Parameters:

    commit_type (str, required): Type of commit (e.g., "feat", "fix")
    files (list[str], required): List of file paths

Returns:

    str: Description string

Source code in src/codemap/git/diff_splitter/utils.py
def create_chunk_description(commit_type: str, files: list[str]) -> str:
	"""
	Create a meaningful description for a chunk.

	Args:
	    commit_type: Type of commit (e.g., "feat", "fix")
	    files: List of file paths

	Returns:
	    Description string

	"""
	if len(files) == 1:
		return f"{commit_type}: update {files[0]}"

	# Try to find a common directory using Path for better cross-platform compatibility
	try:
		common_dir = Path(os.path.commonpath(files))
		if str(common_dir) not in (".", ""):
			return f"{commit_type}: update files in {common_dir}"
	except ValueError:
		# commonpath raises ValueError if files are on different drives
		pass

	return f"{commit_type}: update {len(files)} related files"

get_deleted_tracked_files

get_deleted_tracked_files() -> tuple[set, set]

Get list of deleted but tracked files from git status.

Returns:

    tuple[set, set]: Tuple of (deleted_unstaged_files, deleted_staged_files) as sets

Source code in src/codemap/git/diff_splitter/utils.py
def get_deleted_tracked_files() -> tuple[set, set]:
	"""
	Get list of deleted but tracked files from git status.

	Returns:
	    Tuple of (deleted_unstaged_files, deleted_staged_files) as sets

	"""
	deleted_unstaged_files = set()
	deleted_staged_files = set()
	try:
		# Parse git status to find deleted files
		context = ExtendedGitRepoContext.get_instance()
		status = context.repo.status()
		for filepath, flags in status.items():
			if flags & FileStatus.WT_DELETED:  # Worktree deleted (unstaged)
				deleted_unstaged_files.add(filepath)
			if flags & FileStatus.INDEX_DELETED:  # Index deleted (staged)
				deleted_staged_files.add(filepath)
		logger.debug("Found %d deleted unstaged files in git status", len(deleted_unstaged_files))
		logger.debug("Found %d deleted staged files in git status", len(deleted_staged_files))
	except GitError as e:  # Catch specific GitError from context operations
		logger.warning(
			"Failed to get git status for deleted files via context: %s. Proceeding without deleted file info.", e
		)
	except Exception:  # Catch any other unexpected error
		logger.exception("Unexpected error getting git status: %s. Proceeding without deleted file info.")

	return deleted_unstaged_files, deleted_staged_files
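
A minimal usage sketch (assuming codemap is importable and the process runs inside a Git repository; both sets come back empty if the status lookup fails):

from codemap.git.diff_splitter.utils import get_deleted_tracked_files

unstaged, staged = get_deleted_tracked_files()
print(f"{len(unstaged)} unstaged deletions, {len(staged)} staged deletions")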

filter_valid_files

filter_valid_files(
	files: list[str],
	repo_root: Path,
	is_test_environment: bool = False,
) -> tuple[list[str], list[str]]

Filter invalid filenames and files based on existence and Git tracking.

Parameters:

    files (list[str], required): List of file paths to filter
    repo_root (Path, required): Path to the repository root
    is_test_environment (bool, default False): Whether running in a test environment

Returns:

    tuple[list[str], list[str]]: Tuple of (valid_files, empty_list); the second element is always an empty list.

Source code in src/codemap/git/diff_splitter/utils.py
def filter_valid_files(
	files: list[str], repo_root: Path, is_test_environment: bool = False
) -> tuple[list[str], list[str]]:
	"""
	Filter invalid filenames and files based on existence and Git tracking.

	Args:
	    files: List of file paths to filter
	    repo_root: Path to the repository root
	    is_test_environment: Whether running in a test environment

	Returns:
	    Tuple of (valid_files, empty_list) - The second element is always an empty list now.

	"""
	if not files:
		return [], []

	valid_files_intermediate = []
	# Keep track of files filtered due to large size if needed elsewhere,
	# but don't remove them from processing yet.

	for file in files:
		# Skip files that look like patterns or templates
		if any(char in file for char in ["*", "+", "{", "}", "\\"]) or file.startswith('"'):
			logger.warning("Skipping invalid filename in diff processing: %s", file)
			continue
		valid_files_intermediate.append(file)

	# --- File Existence and Git Tracking Checks ---
	valid_files = []  # Reset valid_files to populate after existence checks

	# Skip file existence checks in test environments
	if is_test_environment:
		logger.debug("In test environment - skipping file existence checks for %d files", len(valid_files_intermediate))
		# In test env, assume all intermediate files are valid regarding existence/tracking
		valid_files = valid_files_intermediate
	else:
		# Get deleted files
		deleted_unstaged_files, deleted_staged_files = get_deleted_tracked_files()

		# Check if files exist in the repository (tracked by git) or filesystem
		original_count = len(valid_files_intermediate)
		try:
			context = ExtendedGitRepoContext.get_instance()
			tracked_files = set(context.tracked_files.keys())

			# Keep files that either:
			# 1. Exist in filesystem
			# 2. Are tracked by git
			# 3. Are known deleted files from git status
			# 4. Are already staged deletions
			filtered_files = []
			for file in valid_files_intermediate:
				try:
					path_exists = Path(get_absolute_path(file, repo_root)).exists()
				except OSError as e:
					logger.warning("OS error checking existence for %s: %s. Skipping file.", file, e)
					continue
				except Exception:
					logger.exception("Unexpected error checking existence for %s. Skipping file.", file)
					continue

				if (
					path_exists
					or file in tracked_files
					or file in deleted_unstaged_files
					or file in deleted_staged_files
				):
					filtered_files.append(file)
				else:
					logger.warning("Skipping non-existent/untracked/not-deleted file in diff: %s", file)

			valid_files = filtered_files
			if len(valid_files) < original_count:
				logger.warning(
					"Filtered out %d files that don't exist or aren't tracked/deleted",
					original_count - len(valid_files),
				)
		except GitError as e:  # Catch GitError from context operations
			logger.warning("Failed to get tracked files from git context: %s. Filtering based on existence only.", e)
			# If we can't check git tracked files, filter by filesystem existence and git status
			filtered_files_fallback = []
			for file in valid_files_intermediate:
				try:
					path_exists = Path(get_absolute_path(file, repo_root)).exists()
				except OSError as e:
					logger.warning("OS error checking existence for %s: %s. Skipping file.", file, e)
					continue
				except Exception:
					logger.exception("Unexpected error checking existence for %s. Skipping file.", file)
					continue

				if path_exists or file in deleted_unstaged_files or file in deleted_staged_files:
					filtered_files_fallback.append(file)
				else:
					logger.warning("Skipping non-existent/not-deleted file in diff (git check failed): %s", file)

			valid_files = filtered_files_fallback  # Replace valid_files with the fallback list
			if len(valid_files) < original_count:
				# Adjust log message if git check failed
				logger.warning(
					"Filtered out %d files that don't exist (git check failed)",
					original_count - len(valid_files),
				)
		except Exception:  # Catch any other unexpected errors during the initial try block
			logger.exception("Unexpected error during file filtering. Proceeding with potentially incorrect list.")
			# If a catastrophic error occurs, proceed with the intermediate list
			valid_files = valid_files_intermediate

	# Return only the list of valid files. The concept of 'filtered_large_files' is removed.
	# Size checking will now happen within the splitting strategy.
	return valid_files, []  # Return empty list for the second element now.
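
A minimal usage sketch with hypothetical paths (assuming codemap is importable; is_test_environment=True skips the existence and tracking checks, so the example is self-contained):

from pathlib import Path

from codemap.git.diff_splitter.utils import filter_valid_files

# The glob-like name is filtered out; the second return value is always [].
valid, _ = filter_valid_files(["src/app.py", "src/*.py"], Path("."), is_test_environment=True)
print(valid)  # ['src/app.py']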

is_test_environment

is_test_environment() -> bool

Check if the code is running in a test environment.

Returns:

    bool: True if in a test environment, False otherwise

Source code in src/codemap/git/diff_splitter/utils.py
def is_test_environment() -> bool:
	"""
	Check if the code is running in a test environment.

	Returns:
	    True if in a test environment, False otherwise

	"""
	# Check multiple environment indicators for tests
	return "PYTEST_CURRENT_TEST" in os.environ or "pytest" in sys.modules or os.environ.get("TESTING") == "1"

calculate_semantic_similarity

calculate_semantic_similarity(
	emb1: list[float], emb2: list[float]
) -> float

Calculate semantic similarity (cosine similarity) between two embedding vectors.

Parameters:

    emb1 (list[float], required): First embedding vector
    emb2 (list[float], required): Second embedding vector

Returns:

    float: Similarity score between 0 and 1

Source code in src/codemap/git/diff_splitter/utils.py
def calculate_semantic_similarity(emb1: list[float], emb2: list[float]) -> float:
	"""
	Calculate semantic similarity (cosine similarity) between two embedding vectors.

	Args:
	    emb1: First embedding vector
	    emb2: Second embedding vector

	Returns:
	    Similarity score between 0 and 1

	"""
	if not emb1 or not emb2:
		return 0.0

	try:
		# Convert to numpy arrays
		vec1 = np.array(emb1, dtype=np.float64)
		vec2 = np.array(emb2, dtype=np.float64)

		# Calculate cosine similarity
		dot_product = np.dot(vec1, vec2)
		norm1 = np.linalg.norm(vec1)
		norm2 = np.linalg.norm(vec2)

		if norm1 <= EPSILON or norm2 <= EPSILON:
			return 0.0

		similarity = float(dot_product / (norm1 * norm2))

		# Handle potential numeric issues
		if not np.isfinite(similarity):
			return 0.0

		return max(0.0, min(1.0, similarity))  # Clamp to [0, 1]

	except (ValueError, TypeError, ArithmeticError, OverflowError):
		logger.warning("Failed to calculate similarity")
		return 0.0
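
A minimal usage sketch with toy vectors (assuming codemap is importable):

from codemap.git.diff_splitter.utils import calculate_semantic_similarity

a = [1.0, 0.0, 1.0]
b = [1.0, 0.5, 1.0]
print(calculate_semantic_similarity(a, b))   # ~0.94: nearly parallel vectors
print(calculate_semantic_similarity(a, []))  # 0.0: empty embedding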

match_test_file_patterns

match_test_file_patterns(file1: str, file2: str) -> bool

Check if files match common test file patterns.

Source code in src/codemap/git/diff_splitter/utils.py
def match_test_file_patterns(file1: str, file2: str) -> bool:
	"""Check if files match common test file patterns."""
	# test_X.py and X.py patterns
	if file1.startswith("test_") and file1[5:] == file2:
		return True
	if file2.startswith("test_") and file2[5:] == file1:
		return True

	# X_test.py and X.py patterns
	if file1.endswith("_test.py") and file1[:-8] + ".py" == file2:
		return True
	return bool(file2.endswith("_test.py") and file2[:-8] + ".py" == file1)
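
A minimal usage sketch with hypothetical file names (assuming codemap is importable):

from codemap.git.diff_splitter.utils import match_test_file_patterns

print(match_test_file_patterns("test_parser.py", "parser.py"))  # True: test_X.py pattern
print(match_test_file_patterns("parser_test.py", "parser.py"))  # True: X_test.py pattern
print(match_test_file_patterns("parser.py", "lexer.py"))        # False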

have_similar_names

have_similar_names(file1: str, file2: str) -> bool

Check if files have similar base names.

Source code in src/codemap/git/diff_splitter/utils.py
def have_similar_names(file1: str, file2: str) -> bool:
	"""Check if files have similar base names."""
	base1 = file1.rsplit(".", 1)[0] if "." in file1 else file1
	base2 = file2.rsplit(".", 1)[0] if "." in file2 else file2

	return (base1 in base2 or base2 in base1) and min(len(base1), len(base2)) >= MIN_NAME_LENGTH_FOR_SIMILARITY
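
A minimal usage sketch with hypothetical file names (assuming codemap is importable; the exact cutoff depends on the module's MIN_NAME_LENGTH_FOR_SIMILARITY constant):

from codemap.git.diff_splitter.utils import have_similar_names

print(have_similar_names("parser.py", "parser_utils.py"))  # True: "parser" is contained in "parser_utils"
print(have_similar_names("a.py", "ab.py"))                 # False: base names fall below the length threshold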

has_related_file_pattern

has_related_file_pattern(
	file1: str,
	file2: str,
	related_file_patterns: Iterable[
		tuple[Pattern, Pattern]
	],
) -> bool

Check if files match known related patterns.

Parameters:

    file1 (str, required): First file path
    file2 (str, required): Second file path
    related_file_patterns (Iterable[tuple[Pattern, Pattern]], required): Compiled regex pattern pairs to check against

Returns:

    bool: True if the files match a known pattern, False otherwise

Source code in src/codemap/git/diff_splitter/utils.py
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
def has_related_file_pattern(file1: str, file2: str, related_file_patterns: Iterable[tuple[Pattern, Pattern]]) -> bool:
	"""
	Check if files match known related patterns.

	Args:
	    file1: First file path
	    file2: Second file path
	    related_file_patterns: Compiled regex pattern pairs to check against

	Returns:
	    True if the files match a known pattern, False otherwise

	"""
	for pattern1, pattern2 in related_file_patterns:
		if (pattern1.match(file1) and pattern2.match(file2)) or (pattern2.match(file1) and pattern1.match(file2)):
			return True
	return False
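
A minimal usage sketch (assuming codemap is importable; the pattern pair below is hypothetical):

import re

from codemap.git.diff_splitter.utils import has_related_file_pattern

# Relate a component to its stylesheet; the match works in either order.
pairs = [(re.compile(r".*\.jsx$"), re.compile(r".*\.css$"))]
print(has_related_file_pattern("Button.jsx", "Button.css", pairs))  # True
print(has_related_file_pattern("Button.jsx", "utils.py", pairs))    # False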

are_files_related

are_files_related(
	file1: str,
	file2: str,
	related_file_patterns: Iterable[
		tuple[Pattern, Pattern]
	],
) -> bool

Determine if two files are semantically related based on various criteria.

Parameters:

    file1 (str, required): First file path
    file2 (str, required): Second file path
    related_file_patterns (Iterable[tuple[Pattern, Pattern]], required): Compiled regex pattern pairs for pattern matching

Returns:

    bool: True if the files are related, False otherwise

Source code in src/codemap/git/diff_splitter/utils.py
def are_files_related(file1: str, file2: str, related_file_patterns: Iterable[tuple[Pattern, Pattern]]) -> bool:
	"""
	Determine if two files are semantically related based on various criteria.

	Args:
	    file1: First file path
	    file2: Second file path
	    related_file_patterns: Compiled regex pattern pairs for pattern matching

	Returns:
	    True if the files are related, False otherwise

	"""
	# 1. Files in the same directory
	dir1 = file1.rsplit("/", 1)[0] if "/" in file1 else ""
	dir2 = file2.rsplit("/", 1)[0] if "/" in file2 else ""
	if dir1 and dir1 == dir2:
		return True

	# 2. Files in closely related directories (parent/child or same root directory)
	if dir1 and dir2:
		if dir1.startswith(dir2 + "/") or dir2.startswith(dir1 + "/"):
			return True
		# Check if they share the same top-level directory
		top_dir1 = dir1.split("/", 1)[0] if "/" in dir1 else dir1
		top_dir2 = dir2.split("/", 1)[0] if "/" in dir2 else dir2
		if top_dir1 and top_dir1 == top_dir2:
			return True

	# 3. Test files and implementation files (simple check)
	if (file1.startswith("tests/") and file2 in file1) or (file2.startswith("tests/") and file1 in file2):
		return True

	# 4. Test file patterns
	file1_name = file1.rsplit("/", 1)[-1] if "/" in file1 else file1
	file2_name = file2.rsplit("/", 1)[-1] if "/" in file2 else file2
	if match_test_file_patterns(file1_name, file2_name):
		return True

	# 5. Files with similar names
	if have_similar_names(file1_name, file2_name):
		return True

	# 6. Check for related file patterns
	return has_related_file_pattern(file1, file2, related_file_patterns)
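
A minimal usage sketch with hypothetical paths and no extra pattern pairs (assuming codemap is importable):

from codemap.git.diff_splitter.utils import are_files_related

print(are_files_related("src/app.py", "src/config.py", []))            # True: same directory
print(are_files_related("src/parser.py", "tests/test_parser.py", []))  # True: test file pattern
print(are_files_related("src/app.py", "docs/readme.md", []))           # False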