Skip to content

Pr Git Utils

Utility functions for PR generation.

logger module-attribute

logger = getLogger(__name__)

PRGitUtils

Bases: ExtendedGitRepoContext

Provides Git operations for PR generation using pygit2.

Source code in src/codemap/git/pr_generator/pr_git_utils.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
class PRGitUtils(ExtendedGitRepoContext):
	"""
	Provides Git operations for PR generation using pygit2.

	All public methods translate failures into codemap's ``GitError`` so callers
	only need to handle a single exception type.
	"""

	# Lazily created shared instance returned by get_instance().
	_pr_git_utils_instance: PRGitUtils | None = None

	@classmethod
	def get_instance(cls) -> PRGitUtils:
		"""Return the shared PRGitUtils instance, creating it on first use."""
		if cls._pr_git_utils_instance is None:
			cls._pr_git_utils_instance = cls()
		return cls._pr_git_utils_instance

	def __init__(self) -> None:
		"""Initialize the instance by delegating to the parent class constructor."""
		super().__init__()

	def create_branch(self, branch_name: str, from_reference: str | None = None) -> None:
		"""
		Create a new branch and switch to it using pygit2.

		Args:
		    branch_name: Name of the branch to create.
		    from_reference: Optional reference (branch name, commit SHA) to create the branch from.
		                    Defaults to current HEAD.

		Raises:
		    GitError: If branch creation or checkout fails.
		"""
		try:
			if from_reference:
				commit_obj = self.repo.revparse_single(from_reference)
				if not commit_obj:
					msg = f"Could not resolve 'from_reference': {from_reference}"
					logger.error(msg)
					raise GitError(msg)
				# Peel so that tags and other indirections end up at the underlying commit.
				source_commit = commit_obj.peel(Commit)
			else:
				# An unborn HEAD has no commit to branch from.
				if self.repo.head_is_unborn:
					msg = "Cannot create branch from unborn HEAD. Please make an initial commit."
					logger.error(msg)
					raise GitError(msg)
				source_commit = self.repo.head.peel(Commit)

			self.repo.create_branch(branch_name, source_commit)
			logger.info(f"Branch '{branch_name}' created from '{source_commit.id}'.")
			self.checkout_branch(branch_name)  # Checkout after creation
		except GitError as e:
			# Also wraps the GitError raised above and by checkout_branch, adding branch context.
			msg = f"Failed to create branch '{branch_name}' using pygit2: {e}"
			logger.exception(msg)
			raise GitError(msg) from e
		except Exception as e:
			msg = f"An unexpected error occurred while creating branch '{branch_name}': {e}"
			logger.exception(msg)
			raise GitError(msg) from e

	def checkout_branch(self, branch_name: str) -> None:
		"""
		Checkout an existing local branch using pygit2.

		After the checkout, ``self.branch`` is refreshed from HEAD and the
		``post-checkout`` hook is run when one exists. A failing hook is only
		logged because the checkout has already happened.

		Args:
		    branch_name: Name of the branch to checkout.

		Raises:
		    GitError: If checkout fails.
		"""
		try:
			# Construct the full ref name
			ref_name = f"refs/heads/{branch_name}"
			branch_obj = self.repo.lookup_reference(ref_name)
			self.repo.checkout(branch_obj)
			# Update self.branch after checkout, consistent with GitRepoContext constructor
			if not self.repo.head_is_detached:
				self.branch = self.repo.head.shorthand
			else:
				self.branch = ""  # Or perhaps the SHA for detached head
			logger.info(f"Checked out branch '{branch_name}' using pygit2.")
			# Run post-checkout hook if present
			if hook_exists("post-checkout"):
				exit_code = run_hook("post-checkout")
				if exit_code != 0:
					logger.warning("post-checkout hook failed (branch already checked out)")
		except GitError as e:
			msg = f"Failed to checkout branch '{branch_name}' using pygit2: {e}"
			logger.exception(msg)
			raise GitError(msg) from e
		except Exception as e:
			msg = f"An unexpected error occurred while checking out branch '{branch_name}': {e}"
			logger.exception(msg)
			raise GitError(msg) from e

	def push_branch(
		self, branch_name: str, force: bool = False, remote_name: str = "origin", ignore_hooks: bool = False
	) -> None:
		"""
		Push a branch to the remote using pygit2.

		Authentication is handled by a credential callback that offers, in order:
		the SSH agent, the conventional key files under ``~/.ssh``, credentials
		obtained from ``git credential fill``, and finally a username-only
		credential. Each SSH candidate is offered at most once per push so a
		rejected credential is not retried.

		Args:
		    branch_name: Name of the branch to push.
		    force: Whether to force push.
		    remote_name: Name of the remote (e.g., "origin").
		    ignore_hooks: If True, skip running the pre-push hook.

		Raises:
		    GitError: If push fails or pre-push hook fails.
		"""
		# Run pre-push hook if not ignored
		if not ignore_hooks:
			exit_code = run_hook("pre-push")
			if exit_code != 0:
				msg = "pre-push hook failed, aborting push."
				logger.error(msg)
				raise GitError(msg)
		try:
			remote = self.repo.remotes[remote_name]
			local_ref = f"refs/heads/{branch_name}"
			remote_ref = f"refs/heads/{branch_name}"

			# A leading '+' in the refspec requests a forced update.
			refspec = f"{'+' if force else ''}{local_ref}:{remote_ref}"

			logger.info(
				f"Attempting to push branch '{branch_name}' to remote "
				f"'{remote_name}' with refspec '{refspec}' using pygit2."
			)

			# Import the proper pygit2 credential classes
			import shlex
			import subprocess
			from collections.abc import Iterator
			from pathlib import Path
			from urllib.parse import urlparse

			from pygit2.callbacks import RemoteCallbacks
			from pygit2.enums import CredentialType

			# Create a credential class to handle SSH and username/password authentication
			class GitCredential:
				"""Minimal credential object exposing the attributes pygit2 reads."""

				def __init__(self, cred_type: CredentialType, *args: str | None) -> None:
					self.credential_type = cred_type
					self.credential_tuple = args

			def _ssh_key_candidates() -> Iterator[tuple[str | None, str | None]]:
				"""Yield (public_key, private_key) pairs to offer: the agent first, then key files."""
				# A (None, None) key pair asks for SSH agent authentication.
				yield None, None
				ssh_dir = Path.home() / ".ssh"
				for key_name in ("id_rsa", "id_ed25519", "id_ecdsa", "id_dsa"):
					private_key_path = ssh_dir / key_name
					public_key_path = Path(f"{private_key_path}.pub")
					if private_key_path.exists() and public_key_path.exists():
						yield str(public_key_path), str(private_key_path)

			# Shared across callback invocations: the callback is invoked again after a
			# rejected credential, and each SSH candidate must be offered only once.
			ssh_candidates = _ssh_key_candidates()

			def credential_callback(
				url: str, username_from_url: str | None, allowed_types: CredentialType
			) -> GitCredential:
				"""
				Callback to handle credential requests from pygit2.

				Args:
					url: The URL being authenticated against
					username_from_url: Username extracted from the URL if present
					allowed_types: Bitmask of allowed credential types

				Returns:
					A credential object for authentication

				Raises:
					Pygit2GitError: If no remaining authentication method applies.
				"""
				logger.debug(f"Authentication required for {url} (allowed types: {allowed_types})")

				# Get username from URL or use default from git config
				username = username_from_url
				if not username:
					try:
						config = self.repo.config
						username = config["user.name"]
					except (KeyError, AttributeError) as e:
						# Default if we can't get from config
						logger.debug(f"Could not get username from git config: {e}")
						username = "git"

				# Offer the next untried SSH credential: agent first, then key files.
				if CredentialType.SSH_KEY in allowed_types:
					try:
						candidate = next(ssh_candidates, None)
					except OSError as e:
						logger.debug(f"SSH key authentication failed: {e}")
						candidate = None
					if candidate is not None:
						public_key, private_key = candidate
						if private_key is None:
							logger.debug(f"Attempting SSH agent authentication for {username}")
						else:
							logger.debug(f"Attempting SSH key authentication with {private_key}")
						return GitCredential(CredentialType.SSH_KEY, username, public_key, private_key, "")

				# Try username/password if SSH is not available or didn't work
				if CredentialType.USERPASS_PLAINTEXT in allowed_types:
					try:
						# Extract hostname from URL
						parsed_url = urlparse(url)
						hostname = parsed_url.netloc

						# Use git credential fill to get credentials - this command is safe as it's hardcoded
						cmd = "git credential fill"
						# Use shlex.split for secure command execution
						process = subprocess.Popen(  # noqa: S603
							shlex.split(cmd),
							stdin=subprocess.PIPE,
							stdout=subprocess.PIPE,
							stderr=subprocess.PIPE,
							text=True,
						)

						# Provide input for git credential fill
						input_data = f"protocol={parsed_url.scheme}\nhost={hostname}\n\n"
						stdout, _ = process.communicate(input=input_data)

						if process.returncode == 0 and stdout:
							# Parse the key=value lines of the output
							credentials = {}
							for line in stdout.splitlines():
								if "=" in line:
									key, value = line.split("=", 1)
									credentials[key] = value

							if "username" in credentials and "password" in credentials:
								logger.debug(f"Using username/password authentication for {credentials['username']}")
								return GitCredential(
									CredentialType.USERPASS_PLAINTEXT, credentials["username"], credentials["password"]
								)
					except (subprocess.SubprocessError, OSError) as e:
						logger.debug(f"Username/password authentication failed: {e}")

				# If nothing else works, try username-only authentication
				if CredentialType.USERNAME in allowed_types:
					logger.debug(f"Falling back to username-only authentication for {username}")
					return GitCredential(CredentialType.USERNAME, username)

				# If we get here, we couldn't find suitable credentials
				logger.warning(f"No suitable authentication method found for {url}")
				msg = "No suitable authentication method available"
				raise Pygit2GitError(msg)

			# Create callback object with our credential callback
			callbacks = RemoteCallbacks(credentials=credential_callback)

			# Pass callbacks to the push method
			remote.push([refspec], callbacks=callbacks)
			logger.info(f"Branch '{branch_name}' pushed to remote '{remote_name}' using pygit2.")
		except (Pygit2GitError, KeyError) as e:  # KeyError for remote_name not found
			msg = f"Failed to push branch '{branch_name}' to remote '{remote_name}' using pygit2: {e}"
			logger.exception(msg)
			raise GitError(msg) from e
		except GitError as e:  # Catch codemap's GitError if it somehow occurred before
			msg = f"Git operation error while pushing branch '{branch_name}': {e}"
			logger.exception(msg)
			raise GitError(msg) from e
		except Exception as e:
			msg = f"An unexpected error occurred while pushing branch '{branch_name}': {e}"
			logger.exception(msg)
			raise GitError(msg) from e

	def get_commit_messages(self, base_branch: str, head_branch: str) -> list[str]:
		"""
		Get commit messages (summaries) between two branches using pygit2.

		This lists commits that are in head_branch but not in base_branch.

		Args:
		    base_branch: Base branch name/ref (e.g., "main").
		    head_branch: Head branch name/ref (e.g., "feature-branch").

		Returns:
		    List of commit message summaries (first line of each message), in
		    the order produced by a topological walk from head_branch. Empty
		    when either argument is empty.

		Raises:
		    GitError: If retrieving commits fails.
		"""
		try:
			if not base_branch or not head_branch:
				logger.warning("Base or head branch is None/empty, cannot get commit messages.")
				return []

			def _resolve_to_commit_oid(branch_spec: str) -> Oid:
				"""Resolve a revision spec to the id of the commit it points at."""
				obj = self.repo.revparse_single(branch_spec)
				if not obj:
					msg = f"Could not resolve '{branch_spec}'"
					logger.error(msg)
					raise GitError(msg)
				# Ensure it's a commit (could be a tag pointing to another tag, etc.)
				commit_obj = obj.peel(Commit)
				return commit_obj.id

			base_oid = _resolve_to_commit_oid(base_branch)
			head_oid = _resolve_to_commit_oid(head_branch)

			# Walk from head and hide everything reachable from base.
			walker = self.repo.walk(head_oid, SortMode.TOPOLOGICAL)
			walker.hide(base_oid)

			commit_messages = []
			for commit_pygit2 in walker:
				# commit_pygit2.message is the full message. Get summary (first line).
				message_summary = commit_pygit2.message.splitlines()[0].strip() if commit_pygit2.message else ""
				commit_messages.append(message_summary)

			logger.info(f"Found {len(commit_messages)} commit messages between '{base_branch}' and '{head_branch}'.")
			return commit_messages

		except (Pygit2GitError, GitError) as e:
			msg = f"Failed to get commit messages between '{base_branch}' and '{head_branch}' using pygit2: {e}"
			logger.exception(msg)
			raise GitError(msg) from e
		except Exception as e:
			msg = (
				f"An unexpected error occurred while getting commit messages "
				f"between '{base_branch}' and '{head_branch}': {e}"
			)
			logger.exception(msg)
			raise GitError(msg) from e

	def get_branch_relation(self, branch_ref_name: str, target_branch_ref_name: str) -> tuple[bool, int]:
		"""
		Get the relationship between two branches using pygit2.

		Args:
			branch_ref_name: The branch to check (e.g., "main", "origin/main").
			target_branch_ref_name: The target branch to compare against (e.g., "feature/foo").

		Returns:
			Tuple of (is_ancestor, commit_count)
			- is_ancestor: True if branch_ref_name is an ancestor of target_branch_ref_name.
			- commit_count: Number of commits in target_branch_ref_name that are not in branch_ref_name.
						(i.e., how many commits target is "ahead" of branch).
			Returns (False, 0) when either name is empty.

		Raises:
			GitError: If branches cannot be resolved or other git issues occur.
		"""
		try:
			if not branch_ref_name or not target_branch_ref_name:
				logger.warning("Branch or target branch name is None/empty for relation check.")
				return False, 0

			# Resolve branch names to Oids. revparse_single can handle local and remote-like refs.
			branch_commit_obj = self.repo.revparse_single(branch_ref_name)
			if not branch_commit_obj:
				msg = f"Could not resolve branch: {branch_ref_name}"
				logger.error(msg)
				raise GitError(msg)
			branch_oid = branch_commit_obj.peel(Commit).id

			target_commit_obj = self.repo.revparse_single(target_branch_ref_name)
			if not target_commit_obj:
				msg = f"Could not resolve target branch: {target_branch_ref_name}"
				logger.error(msg)
				raise GitError(msg)
			target_oid = target_commit_obj.peel(Commit).id

			# Check if branch_oid is an ancestor of target_oid
			# pygit2's descendant_of(A, B) means "is A a descendant of B?"
			# So, is_ancestor (branch is ancestor of target) means target is descendant of branch.
			# NOTE(review): libgit2 documents that a commit is not a descendant of itself, so two
			# refs at the same commit presumably yield False here - confirm callers expect that.
			is_ancestor = self.repo.descendant_of(target_oid, branch_oid)

			# Get commit count: commits in target_oid that are not in branch_oid.
			# ahead_behind(A, B) returns (commits in A not in B, commits in B not in A)
			# We want commits in target_oid not in branch_oid.
			# So, if A=target_oid, B=branch_oid, we want the first value (ahead).
			ahead, _ = self.repo.ahead_behind(target_oid, branch_oid)
			commit_count_target_ahead = ahead  # Renaming for clarity

			logger.debug(
				f"Branch relation: {branch_ref_name} vs {target_branch_ref_name}. "
				f"Is ancestor: {is_ancestor}, Target ahead by: {commit_count_target_ahead}"
			)
			return is_ancestor, commit_count_target_ahead

		except Pygit2GitError as e:
			msg = (
				f"Pygit2 error determining branch relation between "
				f"'{branch_ref_name}' and '{target_branch_ref_name}': {e}"
			)
			logger.warning(msg)
			raise GitError(msg) from e  # Wrap in codemap's GitError
		except GitError as e:  # Catch codemap's GitError raised by the resolution checks above
			msg = (
				f"Codemap GitError determining branch relation between '{branch_ref_name}' and "
				f"'{target_branch_ref_name}': {e}"
			)
			logger.warning(msg)
			raise  # Re-raise as it's already the correct type
		except Exception as e:  # Catch any other unexpected non-Git errors
			msg = (
				f"Unexpected error determining branch relation between '{branch_ref_name}' and "
				f"'{target_branch_ref_name}': {e}"
			)
			logger.warning(msg)
			raise GitError(msg) from e  # Wrap in codemap's GitError

get_instance classmethod

get_instance() -> PRGitUtils

Get an instance of the PRGitUtils class.

Source code in src/codemap/git/pr_generator/pr_git_utils.py
27
28
29
30
31
32
@classmethod
def get_instance(cls) -> PRGitUtils:
	"""Return the shared PRGitUtils instance, constructing it on the first call."""
	shared = cls._pr_git_utils_instance
	if shared is not None:
		return shared
	# First call: build the instance and remember it on the class.
	cls._pr_git_utils_instance = cls()
	return cls._pr_git_utils_instance

__init__

__init__() -> None

Initialize the PRGitUtils instance by delegating to the parent class constructor.

Source code in src/codemap/git/pr_generator/pr_git_utils.py
34
35
36
def __init__(self) -> None:
	"""Initialize the instance by delegating to the parent class constructor."""
	super().__init__()

create_branch

create_branch(
	branch_name: str, from_reference: str | None = None
) -> None

Create a new branch and switch to it using pygit2.

Parameters:

Name Type Description Default
branch_name str

Name of the branch to create.

required
from_reference str | None

Optional reference (branch name, commit SHA) to create the branch from. Defaults to current HEAD.

None

Raises:

Type Description
GitError

If branch creation or checkout fails.

Source code in src/codemap/git/pr_generator/pr_git_utils.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def create_branch(self, branch_name: str, from_reference: str | None = None) -> None:
	"""
	Create a local branch with pygit2 and immediately check it out.

	Args:
	    branch_name: Name of the branch to create.
	    from_reference: Revision (branch name, commit SHA) the new branch should
	                    start from. When omitted or empty, the current HEAD is used.

	Raises:
	    GitError: If the start point cannot be resolved, HEAD is unborn, or
	              branch creation / checkout fails.
	"""
	try:
		if not from_reference:
			# No explicit start point: branch from HEAD, which must already have a commit.
			if self.repo.head_is_unborn:
				msg = "Cannot create branch from unborn HEAD. Please make an initial commit."
				logger.error(msg)
				raise GitError(msg)
			source_commit = self.repo.head.peel(Commit)
		else:
			resolved = self.repo.revparse_single(from_reference)
			if not resolved:
				msg = f"Could not resolve 'from_reference': {from_reference}"
				logger.error(msg)
				raise GitError(msg)
			# Peel so the start point is always a commit object.
			source_commit = resolved.peel(Commit)

		self.repo.create_branch(branch_name, source_commit)
		logger.info(f"Branch '{branch_name}' created from '{source_commit.id}'.")
		# Switch to the freshly created branch.
		self.checkout_branch(branch_name)
	except GitError as git_err:
		msg = f"Failed to create branch '{branch_name}' using pygit2: {git_err}"
		logger.exception(msg)
		raise GitError(msg) from git_err
	except Exception as unexpected:
		msg = f"An unexpected error occurred while creating branch '{branch_name}': {unexpected}"
		logger.exception(msg)
		raise GitError(msg) from unexpected

checkout_branch

checkout_branch(branch_name: str) -> None

Checkout an existing branch using pygit2.

Parameters:

Name Type Description Default
branch_name str

Name of the branch to checkout.

required

Raises:

Type Description
GitError

If checkout fails.

Source code in src/codemap/git/pr_generator/pr_git_utils.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def checkout_branch(self, branch_name: str) -> None:
	"""
	Switch the working tree to an existing local branch using pygit2.

	After the checkout, ``self.branch`` is refreshed from HEAD (empty string when
	HEAD is detached) and the ``post-checkout`` hook is run if one exists. A
	failing hook is only logged, since the checkout has already taken place.

	Args:
	    branch_name: Name of the branch to checkout.

	Raises:
	    GitError: If checkout fails.
	"""
	try:
		# Local branches live under refs/heads/.
		local_ref = self.repo.lookup_reference(f"refs/heads/{branch_name}")
		self.repo.checkout(local_ref)

		# Keep the cached branch name in sync with the new HEAD.
		repo = self.repo
		self.branch = "" if repo.head_is_detached else repo.head.shorthand
		logger.info(f"Checked out branch '{branch_name}' using pygit2.")

		# The hook runs only when present; a non-zero exit is reported but not fatal.
		if hook_exists("post-checkout") and run_hook("post-checkout") != 0:
			logger.warning("post-checkout hook failed (branch already checked out)")
	except GitError as git_err:
		msg = f"Failed to checkout branch '{branch_name}' using pygit2: {git_err}"
		logger.exception(msg)
		raise GitError(msg) from git_err
	except Exception as unexpected:
		msg = f"An unexpected error occurred while checking out branch '{branch_name}': {unexpected}"
		logger.exception(msg)
		raise GitError(msg) from unexpected

push_branch

push_branch(
	branch_name: str,
	force: bool = False,
	remote_name: str = "origin",
	ignore_hooks: bool = False,
) -> None

Push a branch to the remote using pygit2.

Parameters:

Name Type Description Default
branch_name str

Name of the branch to push.

required
force bool

Whether to force push.

False
remote_name str

Name of the remote (e.g., "origin").

'origin'
ignore_hooks bool

If True, skip running the pre-push hook.

False

Raises:

Type Description
GitError

If push fails or pre-push hook fails.

Source code in src/codemap/git/pr_generator/pr_git_utils.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def push_branch(
	self, branch_name: str, force: bool = False, remote_name: str = "origin", ignore_hooks: bool = False
) -> None:
	"""
	Push a branch to the remote using pygit2.

	Authentication is handled by a credential callback that offers, in order:
	the SSH agent, the conventional key files under ``~/.ssh``, credentials
	obtained from ``git credential fill``, and finally a username-only
	credential. Each SSH candidate is offered at most once per push so a
	rejected credential is not retried.

	Args:
	    branch_name: Name of the branch to push.
	    force: Whether to force push.
	    remote_name: Name of the remote (e.g., "origin").
	    ignore_hooks: If True, skip running the pre-push hook.

	Raises:
	    GitError: If push fails or pre-push hook fails.
	"""
	# Run pre-push hook if not ignored
	if not ignore_hooks:
		exit_code = run_hook("pre-push")
		if exit_code != 0:
			msg = "pre-push hook failed, aborting push."
			logger.error(msg)
			raise GitError(msg)
	try:
		remote = self.repo.remotes[remote_name]
		local_ref = f"refs/heads/{branch_name}"
		remote_ref = f"refs/heads/{branch_name}"

		# A leading '+' in the refspec requests a forced update.
		refspec = f"{'+' if force else ''}{local_ref}:{remote_ref}"

		logger.info(
			f"Attempting to push branch '{branch_name}' to remote "
			f"'{remote_name}' with refspec '{refspec}' using pygit2."
		)

		# Import the proper pygit2 credential classes
		import shlex
		import subprocess
		from collections.abc import Iterator
		from pathlib import Path
		from urllib.parse import urlparse

		from pygit2.callbacks import RemoteCallbacks
		from pygit2.enums import CredentialType

		# Create a credential class to handle SSH and username/password authentication
		class GitCredential:
			"""Minimal credential object exposing the attributes pygit2 reads."""

			def __init__(self, cred_type: CredentialType, *args: str | None) -> None:
				self.credential_type = cred_type
				self.credential_tuple = args

		def _ssh_key_candidates() -> Iterator[tuple[str | None, str | None]]:
			"""Yield (public_key, private_key) pairs to offer: the agent first, then key files."""
			# A (None, None) key pair asks for SSH agent authentication.
			yield None, None
			ssh_dir = Path.home() / ".ssh"
			for key_name in ("id_rsa", "id_ed25519", "id_ecdsa", "id_dsa"):
				private_key_path = ssh_dir / key_name
				public_key_path = Path(f"{private_key_path}.pub")
				if private_key_path.exists() and public_key_path.exists():
					yield str(public_key_path), str(private_key_path)

		# Shared across callback invocations: the callback is invoked again after a
		# rejected credential, and each SSH candidate must be offered only once.
		ssh_candidates = _ssh_key_candidates()

		def credential_callback(
			url: str, username_from_url: str | None, allowed_types: CredentialType
		) -> GitCredential:
			"""
			Callback to handle credential requests from pygit2.

			Args:
				url: The URL being authenticated against
				username_from_url: Username extracted from the URL if present
				allowed_types: Bitmask of allowed credential types

			Returns:
				A credential object for authentication

			Raises:
				Pygit2GitError: If no remaining authentication method applies.
			"""
			logger.debug(f"Authentication required for {url} (allowed types: {allowed_types})")

			# Get username from URL or use default from git config
			username = username_from_url
			if not username:
				try:
					config = self.repo.config
					username = config["user.name"]
				except (KeyError, AttributeError) as e:
					# Default if we can't get from config
					logger.debug(f"Could not get username from git config: {e}")
					username = "git"

			# Offer the next untried SSH credential: agent first, then key files.
			if CredentialType.SSH_KEY in allowed_types:
				try:
					candidate = next(ssh_candidates, None)
				except OSError as e:
					logger.debug(f"SSH key authentication failed: {e}")
					candidate = None
				if candidate is not None:
					public_key, private_key = candidate
					if private_key is None:
						logger.debug(f"Attempting SSH agent authentication for {username}")
					else:
						logger.debug(f"Attempting SSH key authentication with {private_key}")
					return GitCredential(CredentialType.SSH_KEY, username, public_key, private_key, "")

			# Try username/password if SSH is not available or didn't work
			if CredentialType.USERPASS_PLAINTEXT in allowed_types:
				try:
					# Extract hostname from URL
					parsed_url = urlparse(url)
					hostname = parsed_url.netloc

					# Use git credential fill to get credentials - this command is safe as it's hardcoded
					cmd = "git credential fill"
					# Use shlex.split for secure command execution
					process = subprocess.Popen(  # noqa: S603
						shlex.split(cmd),
						stdin=subprocess.PIPE,
						stdout=subprocess.PIPE,
						stderr=subprocess.PIPE,
						text=True,
					)

					# Provide input for git credential fill
					input_data = f"protocol={parsed_url.scheme}\nhost={hostname}\n\n"
					stdout, _ = process.communicate(input=input_data)

					if process.returncode == 0 and stdout:
						# Parse the key=value lines of the output
						credentials = {}
						for line in stdout.splitlines():
							if "=" in line:
								key, value = line.split("=", 1)
								credentials[key] = value

						if "username" in credentials and "password" in credentials:
							logger.debug(f"Using username/password authentication for {credentials['username']}")
							return GitCredential(
								CredentialType.USERPASS_PLAINTEXT, credentials["username"], credentials["password"]
							)
				except (subprocess.SubprocessError, OSError) as e:
					logger.debug(f"Username/password authentication failed: {e}")

			# If nothing else works, try username-only authentication
			if CredentialType.USERNAME in allowed_types:
				logger.debug(f"Falling back to username-only authentication for {username}")
				return GitCredential(CredentialType.USERNAME, username)

			# If we get here, we couldn't find suitable credentials
			logger.warning(f"No suitable authentication method found for {url}")
			msg = "No suitable authentication method available"
			raise Pygit2GitError(msg)

		# Create callback object with our credential callback
		callbacks = RemoteCallbacks(credentials=credential_callback)

		# Pass callbacks to the push method
		remote.push([refspec], callbacks=callbacks)
		logger.info(f"Branch '{branch_name}' pushed to remote '{remote_name}' using pygit2.")
	except (Pygit2GitError, KeyError) as e:  # KeyError for remote_name not found
		msg = f"Failed to push branch '{branch_name}' to remote '{remote_name}' using pygit2: {e}"
		logger.exception(msg)
		raise GitError(msg) from e
	except GitError as e:  # Catch codemap's GitError if it somehow occurred before
		msg = f"Git operation error while pushing branch '{branch_name}': {e}"
		logger.exception(msg)
		raise GitError(msg) from e
	except Exception as e:
		msg = f"An unexpected error occurred while pushing branch '{branch_name}': {e}"
		logger.exception(msg)
		raise GitError(msg) from e

get_commit_messages

get_commit_messages(
	base_branch: str, head_branch: str
) -> list[str]

Get commit messages (summaries) between two branches using pygit2.

This lists commits that are in head_branch but not in base_branch.

Parameters:

Name Type Description Default
base_branch str

Base branch name/ref (e.g., "main").

required
head_branch str

Head branch name/ref (e.g., "feature-branch").

required

Returns:

Type Description
list[str]

List of commit message summaries.

Raises:

Type Description
GitError

If retrieving commits fails.

Source code in src/codemap/git/pr_generator/pr_git_utils.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
def get_commit_messages(self, base_branch: str, head_branch: str) -> list[str]:
	"""
	Collect the summary line of each commit on head_branch that is not on base_branch.

	Args:
	    base_branch: Base branch name/ref (e.g., "main").
	    head_branch: Head branch name/ref (e.g., "feature-branch").

	Returns:
	    The first line of every matching commit message, in the order produced
	    by a topological walk from head_branch. Empty when either argument is empty.

	Raises:
	    GitError: If retrieving commits fails.
	"""
	try:
		if not (base_branch and head_branch):
			logger.warning("Base or head branch is None/empty, cannot get commit messages.")
			return []

		def _commit_oid_for(branch_spec: str) -> Oid:
			"""Resolve a revision spec to the id of the commit it ultimately points at."""
			resolved = self.repo.revparse_single(branch_spec)
			if resolved:
				# Peel through tags and similar indirections down to the commit.
				return resolved.peel(Commit).id
			msg = f"Could not resolve '{branch_spec}'"
			logger.error(msg)
			raise GitError(msg)

		base_oid = _commit_oid_for(base_branch)
		head_oid = _commit_oid_for(head_branch)

		# Start from head and exclude everything reachable from base.
		history = self.repo.walk(head_oid, SortMode.TOPOLOGICAL)
		history.hide(base_oid)

		# Keep only the first line of each message; commits without a message give "".
		summaries = [
			entry.message.splitlines()[0].strip() if entry.message else "" for entry in history
		]

		logger.info(f"Found {len(summaries)} commit messages between '{base_branch}' and '{head_branch}'.")
		return summaries

	except (Pygit2GitError, GitError) as git_err:
		msg = f"Failed to get commit messages between '{base_branch}' and '{head_branch}' using pygit2: {git_err}"
		logger.exception(msg)
		raise GitError(msg) from git_err
	except Exception as unexpected:
		msg = (
			f"An unexpected error occurred while getting commit messages "
			f"between '{base_branch}' and '{head_branch}': {unexpected}"
		)
		logger.exception(msg)
		raise GitError(msg) from unexpected

get_branch_relation

get_branch_relation(
	branch_ref_name: str, target_branch_ref_name: str
) -> tuple[bool, int]

Get the relationship between two branches using pygit2.

Parameters:

Name Type Description Default
branch_ref_name str

The branch to check (e.g., "main", "origin/main").

required
target_branch_ref_name str

The target branch to compare against (e.g., "feature/foo").

required

Returns:

Type Description
tuple[bool, int]

Tuple of (is_ancestor, commit_count):
  • is_ancestor: True if branch_ref_name is an ancestor of target_branch_ref_name.
  • commit_count: Number of commits in target_branch_ref_name that are not in branch_ref_name (i.e., how many commits target is "ahead" of branch).

Raises:

Type Description
GitError

If branches cannot be resolved or other git issues occur.

Source code in src/codemap/git/pr_generator/pr_git_utils.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
def get_branch_relation(self, branch_ref_name: str, target_branch_ref_name: str) -> tuple[bool, int]:
	"""
	Describe how target_branch_ref_name relates to branch_ref_name using pygit2.

	Args:
		branch_ref_name: The branch to check (e.g., "main", "origin/main").
		target_branch_ref_name: The target branch to compare against (e.g., "feature/foo").

	Returns:
		Tuple of (is_ancestor, commit_count)
		- is_ancestor: True if branch_ref_name is an ancestor of target_branch_ref_name.
		- commit_count: Number of commits in target_branch_ref_name that are not in
					branch_ref_name (how far target is "ahead" of branch).
		Returns (False, 0) when either name is empty.

	Raises:
		GitError: If branches cannot be resolved or other git issues occur.
	"""
	try:
		if not (branch_ref_name and target_branch_ref_name):
			logger.warning("Branch or target branch name is None/empty for relation check.")
			return False, 0

		def _tip_oid(ref_name: str, failure_msg: str) -> Oid:
			"""Resolve a ref (local or remote-like) to the id of its commit."""
			resolved = self.repo.revparse_single(ref_name)
			if not resolved:
				logger.error(failure_msg)
				raise GitError(failure_msg)
			return resolved.peel(Commit).id

		branch_oid = _tip_oid(branch_ref_name, f"Could not resolve branch: {branch_ref_name}")
		target_oid = _tip_oid(target_branch_ref_name, f"Could not resolve target branch: {target_branch_ref_name}")

		# descendant_of(A, B) asks "is A a descendant of B?", so branch is an
		# ancestor of target exactly when target descends from branch.
		# NOTE(review): libgit2 documents that a commit is not a descendant of itself, so two
		# refs at the same commit presumably yield False here - confirm callers expect that.
		is_ancestor = self.repo.descendant_of(target_oid, branch_oid)

		# ahead_behind(A, B) -> (commits only in A, commits only in B); with
		# A=target and B=branch the first element is how far target is ahead.
		commits_ahead, _behind = self.repo.ahead_behind(target_oid, branch_oid)

		logger.debug(
			f"Branch relation: {branch_ref_name} vs {target_branch_ref_name}. "
			f"Is ancestor: {is_ancestor}, Target ahead by: {commits_ahead}"
		)
		return is_ancestor, commits_ahead

	except Pygit2GitError as pygit_err:
		msg = (
			f"Pygit2 error determining branch relation between "
			f"'{branch_ref_name}' and '{target_branch_ref_name}': {pygit_err}"
		)
		logger.warning(msg)
		# Surface pygit2 failures as codemap's GitError.
		raise GitError(msg) from pygit_err
	except GitError as git_err:
		msg = (
			f"Codemap GitError determining branch relation between '{branch_ref_name}' and "
			f"'{target_branch_ref_name}': {git_err}"
		)
		logger.warning(msg)
		# Already the right exception type; propagate unchanged.
		raise
	except Exception as unexpected:
		msg = (
			f"Unexpected error determining branch relation between '{branch_ref_name}' and "
			f"'{target_branch_ref_name}': {unexpected}"
		)
		logger.warning(msg)
		# Any other failure is wrapped in codemap's GitError as well.
		raise GitError(msg) from unexpected