Skip to content

Client

Client interface for interacting with the database in CodeMap.

logger module-attribute

logger = getLogger(__name__)

DatabaseClient

Provides high-level methods to interact with the CodeMap database.

Source code in src/codemap/db/client.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
class DatabaseClient:
	"""
	Provides high-level methods to interact with the CodeMap database.

	The engine is obtained from the async ``get_engine()`` helper, while the
	chat-history methods are synchronous and open their sessions with
	``get_session(self.engine)``.

	"""

	def __init__(self) -> None:
		"""
		Creates the client and attempts to initialize it immediately.

		When called inside a running event loop, initialization is scheduled
		as a task and a reference to it is kept in ``_init_task``. Otherwise
		``initialize()`` is run to completion on a temporary event loop. If
		that synchronous attempt raises ``RuntimeError`` the client is left
		uninitialized and the engine is set up on first use; any other
		exception propagates to the caller.

		"""
		self.engine = None  # Set by _initialize_engine()
		self.initialized = False  # True once initialize() has completed
		self._init_task = None  # Initialization task when scheduled on a running loop

		# get_running_loop() only reports a loop running in this thread and never
		# creates one, unlike the deprecated implicit behaviour of get_event_loop().
		try:
			asyncio.get_running_loop()
		except RuntimeError:
			loop_is_running = False
		else:
			loop_is_running = True

		if loop_is_running:
			self._init_task = asyncio.create_task(self.initialize())
			return

		# Sync context: drive initialization on a private, short-lived loop.
		loop = asyncio.new_event_loop()
		try:
			loop.run_until_complete(self.initialize())
		except RuntimeError:
			# A RuntimeError during eager initialization is not fatal here;
			# the sync/async entry points retry on first use.
			logger.debug("Eager DatabaseClient initialization raised RuntimeError, will initialize on demand")
		finally:
			# Always release the temporary loop, even when initialization fails.
			loop.close()

	async def initialize(self) -> None:
		"""
		Asynchronously initialize the database client.

		Awaits the engine setup and then marks the client as initialized.
		Exceptions raised by the engine setup propagate and leave
		``initialized`` unchanged.

		"""
		await self._initialize_engine()
		self.initialized = True
		logger.info("Database client successfully initialized")

	async def _initialize_engine(self) -> None:
		"""
		Asynchronously gets the engine and creates tables.

		Does nothing when an engine is already set. Any failure is logged with
		its traceback and re-raised so the caller sees it.

		"""
		if self.engine is not None:
			return
		try:
			self.engine = await get_engine()  # Await the async function
			# create_db_and_tables is synchronous, run it after engine is ready
			create_db_and_tables(self.engine)
			logger.info("Database client initialized with PostgreSQL engine.")
		except RuntimeError:
			logger.exception("Failed to initialize database engine")
			# Re-raise to make the failure explicit.
			raise
		except Exception:
			logger.exception("An unexpected error occurred during database initialization")
			raise

	async def _initialize_engine_if_needed(self) -> None:
		"""Runs ``initialize()`` unless the client is already initialized with an engine."""
		if not self.initialized or self.engine is None:
			await self.initialize()

	async def cleanup(self) -> None:
		"""
		Asynchronously cleanup the database client resources.

		Disposes the engine (when it offers ``dispose``), drops the reference
		and marks the client as uninitialized. This should be called before
		discarding the client.

		"""
		if self.engine:
			# No need to close Engine in SQLAlchemy 2.0, but dispose will close connections
			if hasattr(self.engine, "dispose"):
				self.engine.dispose()
			self.engine = None
		self.initialized = False
		logger.info("Database client cleaned up")

	# Ensure engine is initialized before DB operations
	async def _ensure_engine_initialized(self) -> None:
		"""Ensures the database engine is properly initialized.

		This method checks if the engine is initialized and attempts to initialize it if not.
		If the client is still not ready afterwards, it logs an error and raises a RuntimeError.

		Raises:
		    RuntimeError: If the client is still uninitialized after attempting to initialize.

		Returns:
		    None: This method doesn't return anything but ensures engine is ready for use.
		"""
		if not self.initialized or self.engine is None:
			await self._initialize_engine_if_needed()
			if not self.initialized or self.engine is None:
				msg = "Database client initialization failed."
				logger.error(msg)
				raise RuntimeError(msg)

	def _ensure_engine_sync(self) -> None:
		"""
		Initializes the engine from synchronous code when it is not ready yet.

		Runs ``_ensure_engine_initialized()`` on a temporary event loop that is
		always closed afterwards. Exceptions from initialization propagate.

		NOTE: ``run_until_complete`` cannot be used from a thread that already
		has a running event loop; in that situation await ``initialize()``
		before calling the synchronous methods.

		"""
		if self.initialized and self.engine is not None:
			return
		loop = asyncio.new_event_loop()
		try:
			loop.run_until_complete(self._ensure_engine_initialized())
		finally:
			loop.close()

	def add_chat_message(
		self,
		session_id: str,
		user_query: str,
		ai_response: str | None = None,
		context: str | None = None,
		tool_calls: str | None = None,
	) -> ChatHistory:
		"""
		Adds a chat message to the history.

		Args:
		    session_id (str): The session identifier.
		    user_query (str): The user's query.
		    ai_response (Optional[str]): The AI's response.
		    context (Optional[str]): JSON string of context used.
		    tool_calls (Optional[str]): JSON string of tool calls made.

		Returns:
		    ChatHistory: The newly created chat history record.

		Raises:
		    RuntimeError: If the engine is not available after the initialization check.
		    Exception: Any error raised while writing the record is logged and re-raised.

		"""
		self._ensure_engine_sync()

		if self.engine is None:
			# This should ideally not happen if _ensure_engine_initialized worked
			msg = "Database engine is not initialized after check."
			logger.error(msg)
			raise RuntimeError(msg)

		chat_entry = ChatHistory(
			session_id=session_id,
			user_query=user_query,
			ai_response=ai_response,
			context=context,
			tool_calls=tool_calls,
		)
		try:
			with get_session(self.engine) as session:
				session.add(chat_entry)
				session.commit()
				# Refresh so database-generated fields (e.g. the ID logged below) are loaded
				session.refresh(chat_entry)
				logger.debug(f"Added chat message for session {session_id} to DB (ID: {chat_entry.id}).")
				return chat_entry
		except Exception:
			logger.exception("Error adding chat message")
			raise  # Re-raise after logging

	def get_chat_history(self, session_id: str, limit: int = 50) -> list[ChatHistory]:
		"""
		Retrieves chat history for a session, ordered chronologically.

		Args:
		    session_id (str): The session identifier.
		    limit (int): The maximum number of messages to return.

		Returns:
		    List[ChatHistory]: Chat history records ordered by ascending timestamp.

		Raises:
		    RuntimeError: If the engine is not available after the initialization check.
		    Exception: Any error raised while querying is logged and re-raised.

		"""
		self._ensure_engine_sync()

		if self.engine is None:
			# This should ideally not happen if _ensure_engine_initialized worked
			msg = "Database engine is not initialized after check."
			logger.error(msg)
			raise RuntimeError(msg)

		statement = (
			select(ChatHistory)
			.where(ChatHistory.session_id == session_id)
			# Using type ignore as the linter incorrectly flags the type
			.order_by(asc(ChatHistory.timestamp))  # type: ignore[arg-type]
			.limit(limit)
		)
		try:
			with get_session(self.engine) as session:
				results = session.exec(statement).all()
				logger.debug(f"Retrieved {len(results)} messages for session {session_id}.")
				return list(results)
		except Exception:
			logger.exception("Error retrieving chat history")
			raise

	def update_chat_response(self, message_id: int, ai_response: str) -> bool:
		"""
		Updates the AI response for a specific chat message.

		Args:
		    message_id (int): The primary key ID of the chat message to update.
		    ai_response (str): The new AI response text.

		Returns:
		    bool: True if the update was committed; False if the message does not
		    exist, the engine is unavailable, or the update raised an error.

		Raises:
		    Exception: Errors raised by the initialization check itself are not
		    caught here and propagate to the caller.

		"""
		self._ensure_engine_sync()

		if self.engine is None:
			logger.error("Cannot update chat response: Database engine not initialized.")
			return False

		try:
			with get_session(self.engine) as session:
				db_entry = session.get(ChatHistory, message_id)
				if db_entry:
					db_entry.ai_response = ai_response
					session.commit()
					logger.debug(f"Updated DB entry {message_id} with AI response.")
					return True
				logger.warning(f"Chat message with ID {message_id} not found for update.")
				return False
		except SQLAlchemyError:
			logger.exception(f"Database error updating chat response for message ID {message_id}")
			return False
		except Exception:
			# Best-effort API: unexpected failures are logged and reported as False
			logger.exception(f"Unexpected error updating chat response for message ID {message_id}")
			return False

__init__

__init__() -> None

Initializes the database client.

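It starts in an uninitialized state and then attempts initialization immediately: as a scheduled task when an event loop is already running, otherwise by running the async initialize() method on a temporary event loop. If that attempt raises RuntimeError, initialization is deferred until _initialize_engine_if_needed runs on first use.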

Source code in src/codemap/db/client.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def __init__(self) -> None:
	"""
	Creates the client and attempts to initialize it immediately.

	When called inside a running event loop, initialization is scheduled
	as a task and a reference to it is kept in ``_init_task``. Otherwise
	``initialize()`` is run to completion on a temporary event loop. If
	that synchronous attempt raises ``RuntimeError`` the client is left
	uninitialized and the engine is set up on first use; any other
	exception propagates to the caller.

	"""
	self.engine = None  # Set once the engine has been created
	self.initialized = False  # True once initialize() has completed
	self._init_task = None  # Initialization task when scheduled on a running loop

	# get_running_loop() only reports a loop running in this thread and never
	# creates one, unlike the deprecated implicit behaviour of get_event_loop().
	try:
		asyncio.get_running_loop()
	except RuntimeError:
		loop_is_running = False
	else:
		loop_is_running = True

	if loop_is_running:
		self._init_task = asyncio.create_task(self.initialize())
		return

	# Sync context: drive initialization on a private, short-lived loop.
	loop = asyncio.new_event_loop()
	try:
		loop.run_until_complete(self.initialize())
	except RuntimeError:
		# A RuntimeError during eager initialization is not fatal here;
		# initialization is retried on first use.
		logger.debug("Eager DatabaseClient initialization raised RuntimeError, will initialize on demand")
	finally:
		# Always release the temporary loop, even when initialization fails.
		loop.close()

engine instance-attribute

engine = None

initialized instance-attribute

initialized = False

initialize async

initialize() -> None

Asynchronously initialize the database client.

This should be called after creating the client and before using it.

Source code in src/codemap/db/client.py
46
47
48
49
50
51
52
53
54
55
async def initialize(self) -> None:
	"""
	Asynchronously initialize the database client.

	Awaits ``_initialize_engine()`` and, once that succeeds, marks the client
	as initialized. If the engine setup raises, the exception propagates and
	the ``initialized`` flag is left untouched.

	This should be called after creating the client and before using it.

	"""
	await self._initialize_engine()
	# Only reached when the engine setup above did not raise
	self.initialized = True
	logger.info("Database client successfully initialized")

cleanup async

cleanup() -> None

Asynchronously cleanup the database client resources.

This should be called before discarding the client.

Source code in src/codemap/db/client.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
async def cleanup(self) -> None:
	"""
	Release the resources held by the database client.

	When an engine is set, it is disposed (if it offers ``dispose``) and the
	reference is dropped. The client is marked as uninitialized in every
	case. Call this before discarding the client.

	"""
	engine = self.engine
	if engine:
		# SQLAlchemy 2.0 engines need no explicit close; dispose() closes the connections
		if hasattr(engine, "dispose"):
			engine.dispose()
		self.engine = None
	self.initialized = False
	logger.info("Database client cleaned up")

add_chat_message

add_chat_message(
	session_id: str,
	user_query: str,
	ai_response: str | None = None,
	context: str | None = None,
	tool_calls: str | None = None,
) -> ChatHistory

Adds a chat message to the history.

Parameters:

Name Type Description Default
session_id str

The session identifier.

required
user_query str

The user's query.

required
ai_response Optional[str]

The AI's response.

None
context Optional[str]

JSON string of context used.

None
tool_calls Optional[str]

JSON string of tool calls made.

None

Returns:

Name Type Description
ChatHistory ChatHistory

The newly created chat history record.

Source code in src/codemap/db/client.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def add_chat_message(
	self,
	session_id: str,
	user_query: str,
	ai_response: str | None = None,
	context: str | None = None,
	tool_calls: str | None = None,
) -> ChatHistory:
	"""
	Store one chat exchange in the history table.

	Args:
	    session_id (str): The session identifier.
	    user_query (str): The user's query.
	    ai_response (Optional[str]): The AI's response.
	    context (Optional[str]): JSON string of context used.
	    tool_calls (Optional[str]): JSON string of tool calls made.

	Returns:
	    ChatHistory: The newly created chat history record.

	Raises:
	    RuntimeError: If the engine is still missing after the initialization check.
	    Exception: Any error raised while writing is logged and re-raised.

	"""
	# Lazily initialize on a temporary event loop when the client is not ready
	engine_ready = self.initialized and self.engine is not None
	if not engine_ready:
		bootstrap_loop = asyncio.new_event_loop()
		try:
			bootstrap_loop.run_until_complete(self._ensure_engine_initialized())
		finally:
			bootstrap_loop.close()

	if self.engine is None:
		# Defensive: the initialization check above is expected to raise instead
		error_text = "Database engine is not initialized after check."
		logger.error(error_text)
		raise RuntimeError(error_text)

	fields = {
		"session_id": session_id,
		"user_query": user_query,
		"ai_response": ai_response,
		"context": context,
		"tool_calls": tool_calls,
	}
	record = ChatHistory(**fields)
	try:
		with get_session(self.engine) as db:
			db.add(record)
			db.commit()
			# Reload the record after commit so the ID logged below is available
			db.refresh(record)
			logger.debug(f"Added chat message for session {session_id} to DB (ID: {record.id}).")
			return record
	except Exception:
		logger.exception("Error adding chat message")
		raise  # Propagate after logging

get_chat_history

get_chat_history(
	session_id: str, limit: int = 50
) -> list[ChatHistory]

Retrieves chat history for a session, ordered chronologically.

Parameters:

Name Type Description Default
session_id str

The session identifier.

required
limit int

The maximum number of messages to return.

50

Returns:

Type Description
list[ChatHistory]

A list of chat history records, ordered by ascending timestamp.

Source code in src/codemap/db/client.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def get_chat_history(self, session_id: str, limit: int = 50) -> list[ChatHistory]:
	"""
	Fetch the stored messages of one session, oldest first.

	Args:
	    session_id (str): The session identifier.
	    limit (int): The maximum number of messages to return.

	Returns:
	    List[ChatHistory]: Chat history records ordered by ascending timestamp.

	Raises:
	    RuntimeError: If the engine is still missing after the initialization check.
	    Exception: Any error raised while querying is logged and re-raised.

	"""
	# Lazily initialize on a temporary event loop when the client is not ready
	engine_ready = self.initialized and self.engine is not None
	if not engine_ready:
		bootstrap_loop = asyncio.new_event_loop()
		try:
			bootstrap_loop.run_until_complete(self._ensure_engine_initialized())
		finally:
			bootstrap_loop.close()

	if self.engine is None:
		# Defensive: the initialization check above is expected to raise instead
		error_text = "Database engine is not initialized after check."
		logger.error(error_text)
		raise RuntimeError(error_text)

	# Build the query step by step: filter, sort, then cap the row count
	query = select(ChatHistory).where(ChatHistory.session_id == session_id)
	# Using type ignore as the linter incorrectly flags the type
	query = query.order_by(asc(ChatHistory.timestamp))  # type: ignore[arg-type]
	query = query.limit(limit)

	try:
		with get_session(self.engine) as db:
			rows = list(db.exec(query).all())
			logger.debug(f"Retrieved {len(rows)} messages for session {session_id}.")
			return rows
	except Exception:
		logger.exception("Error retrieving chat history")
		raise

update_chat_response

update_chat_response(
	message_id: int, ai_response: str
) -> bool

Updates the AI response for a specific chat message.

Parameters:

Name Type Description Default
message_id int

The primary key ID of the chat message to update.

required
ai_response str

The new AI response text.

required

Returns:

Name Type Description
bool bool

True if the update was successful, False otherwise.

Source code in src/codemap/db/client.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def update_chat_response(self, message_id: int, ai_response: str) -> bool:
	"""
	Overwrite the stored AI response of one chat message.

	Args:
	    message_id (int): The primary key ID of the chat message to update.
	    ai_response (str): The new AI response text.

	Returns:
	    bool: True if the update was committed; False if the message does not
	    exist, the engine is unavailable, or the update raised an error.

	"""
	# Lazily initialize on a temporary event loop when the client is not ready
	engine_ready = self.initialized and self.engine is not None
	if not engine_ready:
		bootstrap_loop = asyncio.new_event_loop()
		try:
			bootstrap_loop.run_until_complete(self._ensure_engine_initialized())
		finally:
			bootstrap_loop.close()

	if self.engine is None:
		logger.error("Cannot update chat response: Database engine not initialized.")
		return False

	try:
		with get_session(self.engine) as db:
			row = db.get(ChatHistory, message_id)
			if not row:
				# Nothing to update; report it without raising
				logger.warning(f"Chat message with ID {message_id} not found for update.")
				return False
			row.ai_response = ai_response
			db.commit()
			logger.debug(f"Updated DB entry {message_id} with AI response.")
			return True
	except SQLAlchemyError:
		logger.exception(f"Database error updating chat response for message ID {message_id}")
		return False
	except Exception:
		# Unexpected failures are logged and reported as False as well
		logger.exception(f"Unexpected error updating chat response for message ID {message_id}")
		return False