Resolves file integrity constraints for semantic groups.
File integrity refers to the requirement that all changes to a specific file be
included in the same commit, even if they are semantically different. This prevents
fragmenting changes to the same file across multiple commits, which can leave
intermediate commits with broken builds or an inconsistent state.
The resolver works by:
1. Identifying files that appear in multiple semantic groups
2. Calculating the semantic similarity between these overlapping groups
3. Either merging similar groups or reassigning chunks from less relevant groups to the most appropriate group
This process ensures that each file is modified in exactly one commit, while still
maintaining semantic coherence within commits when possible.
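As a minimal, self-contained sketch of the first step (detecting files claimed by more than one group), using plain sets of file paths in place of SemanticGroup objects; the find_violations helper is hypothetical and not part of the resolver's API:

from collections import defaultdict

def find_violations(group_files: list[set[str]]) -> dict[str, list[int]]:
    # Map each file to the indices of the groups that touch it,
    # then keep only files claimed by more than one group.
    file_to_groups: dict[str, list[int]] = defaultdict(list)
    for i, files in enumerate(group_files):
        for f in files:
            file_to_groups[f].append(i)
    return {f: idxs for f, idxs in file_to_groups.items() if len(idxs) > 1}

# "b.py" is claimed by groups 0 and 1, so it is reported as a violation.
print(find_violations([{"a.py", "b.py"}, {"b.py"}, {"c.py"}]))  # {'b.py': [0, 1]}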
Source code in src/codemap/git/semantic_grouping/resolver.py
class FileIntegrityResolver:
"""
Resolves file integrity constraints for semantic groups.
File integrity refers to the requirement that all changes to a specific file should
be included in the same commit, even if they are semantically different. This prevents
fragmented changes to the same file across multiple commits, which can lead to broken builds
or inconsistent states.
The resolver works by:
1. Identifying files that appear in multiple semantic groups
2. Calculating the semantic similarity between these overlapping groups
3. Either merging similar groups or reassigning chunks from less relevant groups
to the most appropriate group
This process ensures that each file is modified in exactly one commit, while still
maintaining semantic coherence within commits when possible.
"""
def __init__(
self,
similarity_threshold: float = 0.6,
config_loader: "ConfigLoader | None" = None,
) -> None:
"""
Initialize the resolver.
Args:
similarity_threshold: Threshold for group similarity to trigger merging (0.0-1.0).
config_loader: Optional ConfigLoader instance.
"""
if config_loader:
self.config_loader = config_loader
else:
from codemap.config import ConfigLoader
self.config_loader = ConfigLoader()
self.similarity_threshold = similarity_threshold
# Import here to avoid making sklearn a hard dependency
try:
from sklearn.metrics.pairwise import cosine_similarity
self.cosine_similarity = cosine_similarity
except ImportError as e:
logger.exception("Failed to import scikit-learn. Please install it with: uv add scikit-learn")
msg = "scikit-learn is required for file integrity resolution"
raise ImportError(msg) from e
def calculate_group_similarity(
self, group1: "SemanticGroup", group2: "SemanticGroup", chunk_embeddings: dict[DiffChunk, np.ndarray]
) -> float:
"""
Calculate similarity between two groups based on their chunks' embeddings.
This method computes the average pairwise cosine similarity between all combinations
of chunks from the two groups. The similarity is based on the semantic embeddings
of the chunks' content.
Process:
1. Extract embeddings for all chunks in both groups
2. Compute pairwise cosine similarities between each pair of chunks
3. Return the average similarity score
Args:
group1: First semantic group to compare
group2: Second semantic group to compare
chunk_embeddings: Dict mapping chunks to their embeddings
Returns:
float: Similarity score between 0 and 1, where:
- 0 indicates completely unrelated changes
- 1 indicates identical or extremely similar changes
- Values around 0.6-0.8 typically indicate related functionality
"""
# Get embeddings for chunks in each group
embeddings1 = [chunk_embeddings[chunk] for chunk in group1.chunks if chunk in chunk_embeddings]
embeddings2 = [chunk_embeddings[chunk] for chunk in group2.chunks if chunk in chunk_embeddings]
if not embeddings1 or not embeddings2:
return 0.0
# Calculate pairwise similarities
similarities = []
for emb1 in embeddings1:
for emb2 in embeddings2:
sim = self.cosine_similarity([emb1], [emb2])[0][0]
similarities.append(sim)
# Return average similarity
return sum(similarities) / len(similarities) if similarities else 0.0
def resolve_violations(
self, groups: list["SemanticGroup"], chunk_embeddings: dict[DiffChunk, np.ndarray]
) -> list["SemanticGroup"]:
"""
Resolve file integrity violations by merging or reassigning chunks.
A violation occurs when the same file appears in multiple semantic groups.
This needs to be resolved because a file should be modified in only one commit.
Args:
groups: List of SemanticGroup objects to resolve
chunk_embeddings: Dict mapping chunks to their embeddings
Returns:
List of SemanticGroup objects with all violations resolved
"""
# Keep iterating until no violations remain
while True:
# Build file to groups mapping
file_to_groups: dict[str, list[int]] = {}
for i, group in enumerate(groups):
for file in group.files:
if file not in file_to_groups:
file_to_groups[file] = []
file_to_groups[file].append(i)
# Find violations (files in multiple groups)
violations = {file: indices for file, indices in file_to_groups.items() if len(indices) > 1}
if not violations:
break # No violations, we're done
# Process the first violation
file = next(iter(violations))
group_indices = violations[file]
# Try to find groups to merge based on similarity
max_similarity = 0
groups_to_merge = None
# Calculate similarities between all pairs of groups containing this file
for i in range(len(group_indices)):
for j in range(i + 1, len(group_indices)):
idx1, idx2 = group_indices[i], group_indices[j]
similarity = self.calculate_group_similarity(groups[idx1], groups[idx2], chunk_embeddings)
if similarity > max_similarity:
max_similarity = similarity
groups_to_merge = (idx1, idx2)
# Decide whether to merge or reassign based on similarity threshold
if max_similarity >= self.similarity_threshold and groups_to_merge:
# STRATEGY 1: Merge groups if they're similar enough
idx1, idx2 = groups_to_merge
merged_group = groups[idx1].merge_with(groups[idx2])
# Replace the first group with the merged one and remove the second
groups[idx1] = merged_group
groups.pop(idx2)
else:
# STRATEGY 2: Reassign chunks to the primary group for this file
# Find the primary group (group with most chunks containing this file)
file_chunks_count = []
for idx in group_indices:
count = sum(1 for chunk in groups[idx].chunks if file in chunk.files)
file_chunks_count.append((idx, count))
# Sort by count descending
file_chunks_count.sort(key=lambda x: x[1], reverse=True)
primary_idx = file_chunks_count[0][0]
# Move chunks containing this file to the primary group
for idx in group_indices:
if idx != primary_idx:
# Find chunks containing this file
chunks_to_move = [chunk for chunk in groups[idx].chunks if file in chunk.files]
# Move chunks to primary group
groups[primary_idx].chunks.extend(chunks_to_move)
# Remove moved chunks from original group
groups[idx].chunks = [chunk for chunk in groups[idx].chunks if file not in chunk.files]
# Remove empty groups
groups = [group for group in groups if group.chunks]
return groups
__init__
__init__(
similarity_threshold: float = 0.6,
config_loader: ConfigLoader | None = None,
) -> None
Initialize the resolver.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| similarity_threshold | float | Threshold for group similarity to trigger merging (0.0-1.0). | 0.6 |
| config_loader | ConfigLoader \| None | Optional ConfigLoader instance. | None |
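For illustration, constructing the resolver might look like this; the import path is inferred from the source location shown above, and the threshold value is arbitrary:

from codemap.git.semantic_grouping.resolver import FileIntegrityResolver

# A higher threshold makes merging stricter, so more violations are
# resolved by reassigning chunks instead of merging whole groups.
resolver = FileIntegrityResolver(similarity_threshold=0.7)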
Source code in src/codemap/git/semantic_grouping/resolver.py
def __init__(
self,
similarity_threshold: float = 0.6,
config_loader: "ConfigLoader | None" = None,
) -> None:
"""
Initialize the resolver.
Args:
similarity_threshold: Threshold for group similarity to trigger merging (0.0-1.0).
config_loader: Optional ConfigLoader instance.
"""
if config_loader:
self.config_loader = config_loader
else:
from codemap.config import ConfigLoader
self.config_loader = ConfigLoader()
self.similarity_threshold = similarity_threshold
# Import here to avoid making sklearn a hard dependency
try:
from sklearn.metrics.pairwise import cosine_similarity
self.cosine_similarity = cosine_similarity
except ImportError as e:
logger.exception("Failed to import scikit-learn. Please install it with: uv add scikit-learn")
msg = "scikit-learn is required for file integrity resolution"
raise ImportError(msg) from e
config_loader
instance-attribute
config_loader = config_loader
similarity_threshold
instance-attribute
similarity_threshold = similarity_threshold
cosine_similarity
instance-attribute
cosine_similarity = cosine_similarity
calculate_group_similarity
calculate_group_similarity(
group1: SemanticGroup,
group2: SemanticGroup,
chunk_embeddings: dict[DiffChunk, ndarray],
) -> float
Calculate similarity between two groups based on their chunks' embeddings.
This method computes the average pairwise cosine similarity between all combinations
of chunks from the two groups. The similarity is based on the semantic embeddings
of the chunks' content.
Process:
1. Extract embeddings for all chunks in both groups
2. Compute pairwise cosine similarities between each pair of chunks
3. Return the average similarity score
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| group1 | SemanticGroup | First semantic group to compare | required |
| group2 | SemanticGroup | Second semantic group to compare | required |
| chunk_embeddings | dict[DiffChunk, ndarray] | Dict mapping chunks to their embeddings | required |
Returns:

| Name | Type | Description |
| --- | --- | --- |
| float | float | Similarity score between 0 and 1: 0 indicates completely unrelated changes, 1 indicates identical or extremely similar changes, and values around 0.6-0.8 typically indicate related functionality. |
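The pairwise loop in the source below can also be expressed as a single vectorized call; this equivalent sketch assumes the embeddings are plain numpy arrays (the helper name is illustrative):

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

def average_group_similarity(embeddings1: list[np.ndarray], embeddings2: list[np.ndarray]) -> float:
    # Mirror the 0.0 fallback for empty groups in the source.
    if not embeddings1 or not embeddings2:
        return 0.0
    # cosine_similarity on the stacked matrices yields a len(embeddings1) x
    # len(embeddings2) grid of pairwise scores; its mean equals the loop's average.
    sims = cosine_similarity(np.vstack(embeddings1), np.vstack(embeddings2))
    return float(sims.mean())

For large groups this avoids a Python-level loop over every chunk pair.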
Source code in src/codemap/git/semantic_grouping/resolver.py
def calculate_group_similarity(
self, group1: "SemanticGroup", group2: "SemanticGroup", chunk_embeddings: dict[DiffChunk, np.ndarray]
) -> float:
"""
Calculate similarity between two groups based on their chunks' embeddings.
This method computes the average pairwise cosine similarity between all combinations
of chunks from the two groups. The similarity is based on the semantic embeddings
of the chunks' content.
Process:
1. Extract embeddings for all chunks in both groups
2. Compute pairwise cosine similarities between each pair of chunks
3. Return the average similarity score
Args:
group1: First semantic group to compare
group2: Second semantic group to compare
chunk_embeddings: Dict mapping chunks to their embeddings
Returns:
float: Similarity score between 0 and 1, where:
- 0 indicates completely unrelated changes
- 1 indicates identical or extremely similar changes
- Values around 0.6-0.8 typically indicate related functionality
"""
# Get embeddings for chunks in each group
embeddings1 = [chunk_embeddings[chunk] for chunk in group1.chunks if chunk in chunk_embeddings]
embeddings2 = [chunk_embeddings[chunk] for chunk in group2.chunks if chunk in chunk_embeddings]
if not embeddings1 or not embeddings2:
return 0.0
# Calculate pairwise similarities
similarities = []
for emb1 in embeddings1:
for emb2 in embeddings2:
sim = self.cosine_similarity([emb1], [emb2])[0][0]
similarities.append(sim)
# Return average similarity
return sum(similarities) / len(similarities) if similarities else 0.0
resolve_violations
resolve_violations(
groups: list[SemanticGroup],
chunk_embeddings: dict[DiffChunk, ndarray],
) -> list[SemanticGroup]
Resolve file integrity violations by merging or reassigning chunks.
A violation occurs when the same file appears in multiple semantic groups.
This needs to be resolved because a file should be modified in only one commit.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| groups | list[SemanticGroup] | List of SemanticGroup objects to resolve | required |
| chunk_embeddings | dict[DiffChunk, ndarray] | Dict mapping chunks to their embeddings | required |
Returns:

| Type | Description |
| --- | --- |
| list[SemanticGroup] | List of SemanticGroup objects with all violations resolved |
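To make the merge-versus-reassign decision concrete, here is a hypothetical walk-through with stand-in types; the real SemanticGroup and DiffChunk classes live elsewhere in codemap, and the chunk names, files, and threshold below are illustrative:

from dataclasses import dataclass, field

@dataclass(frozen=True)
class Chunk:
    name: str
    files: tuple[str, ...]

@dataclass
class Group:
    chunks: list[Chunk] = field(default_factory=list)

    @property
    def files(self) -> set[str]:
        return {f for c in self.chunks for f in c.files}

g1 = Group([Chunk("refactor-auth", ("auth.py",))])
g2 = Group([Chunk("fix-auth-typo", ("auth.py", "README.md"))])

# "auth.py" appears in both groups, which is a violation. If the groups'
# average embedding similarity clears similarity_threshold (default 0.6),
# the two groups are merged into one. Otherwise, the chunks touching
# "auth.py" are moved into whichever group already holds the most of them.

The resolution loop repeats this choice until no file is claimed by more than one group.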
Source code in src/codemap/git/semantic_grouping/resolver.py
def resolve_violations(
self, groups: list["SemanticGroup"], chunk_embeddings: dict[DiffChunk, np.ndarray]
) -> list["SemanticGroup"]:
"""
Resolve file integrity violations by merging or reassigning chunks.
A violation occurs when the same file appears in multiple semantic groups.
This needs to be resolved because a file should be modified in only one commit.
Args:
groups: List of SemanticGroup objects to resolve
chunk_embeddings: Dict mapping chunks to their embeddings
Returns:
List of SemanticGroup objects with all violations resolved
"""
# Keep iterating until no violations remain
while True:
# Build file to groups mapping
file_to_groups: dict[str, list[int]] = {}
for i, group in enumerate(groups):
for file in group.files:
if file not in file_to_groups:
file_to_groups[file] = []
file_to_groups[file].append(i)
# Find violations (files in multiple groups)
violations = {file: indices for file, indices in file_to_groups.items() if len(indices) > 1}
if not violations:
break # No violations, we're done
# Process the first violation
file = next(iter(violations))
group_indices = violations[file]
# Try to find groups to merge based on similarity
max_similarity = 0
groups_to_merge = None
# Calculate similarities between all pairs of groups containing this file
for i in range(len(group_indices)):
for j in range(i + 1, len(group_indices)):
idx1, idx2 = group_indices[i], group_indices[j]
similarity = self.calculate_group_similarity(groups[idx1], groups[idx2], chunk_embeddings)
if similarity > max_similarity:
max_similarity = similarity
groups_to_merge = (idx1, idx2)
# Decide whether to merge or reassign based on similarity threshold
if max_similarity >= self.similarity_threshold and groups_to_merge:
# STRATEGY 1: Merge groups if they're similar enough
idx1, idx2 = groups_to_merge
merged_group = groups[idx1].merge_with(groups[idx2])
# Replace the first group with the merged one and remove the second
groups[idx1] = merged_group
groups.pop(idx2)
else:
# STRATEGY 2: Reassign chunks to the primary group for this file
# Find the primary group (group with most chunks containing this file)
file_chunks_count = []
for idx in group_indices:
count = sum(1 for chunk in groups[idx].chunks if file in chunk.files)
file_chunks_count.append((idx, count))
# Sort by count descending
file_chunks_count.sort(key=lambda x: x[1], reverse=True)
primary_idx = file_chunks_count[0][0]
# Move chunks containing this file to the primary group
for idx in group_indices:
if idx != primary_idx:
# Find chunks containing this file
chunks_to_move = [chunk for chunk in groups[idx].chunks if file in chunk.files]
# Move chunks to primary group
groups[primary_idx].chunks.extend(chunks_to_move)
# Remove moved chunks from original group
groups[idx].chunks = [chunk for chunk in groups[idx].chunks if file not in chunk.files]
# Remove empty groups
groups = [group for group in groups if group.chunks]
return groups