Skip to content

File Watcher

File watcher module for CodeMap.

logger module-attribute

logger = getLogger(__name__)

FileChangeHandler

Bases: FileSystemEventHandler

Handles file system events and triggers a callback.

Source code in src/codemap/watcher/file_watcher.py (lines 18–123)
class FileChangeHandler(FileSystemEventHandler):
	"""
	Handles file system events and triggers a callback.

	Watchdog delivers events on its observer thread. This handler forwards each
	one to an asyncio event loop, where a debounce task awaits ``callback`` once
	no new event has arrived for ``debounce_delay`` seconds.
	"""

	def __init__(
		self,
		callback: Callable[[], Coroutine[None, None, None]],
		debounce_delay: float = 1.0,
		event_loop: asyncio.AbstractEventLoop | None = None,
	) -> None:
		"""
		Initialize the handler.

		Args:
		    callback: Async function awaited once file activity has settled.
		    debounce_delay: Quiet period (seconds) after the most recent event
		        before ``callback`` fires; each new event restarts the countdown.
		    event_loop: Loop that runs the debounce task and ``callback``. When
		        None, ``asyncio.get_event_loop()`` is used.

		"""
		# Public configuration.
		self.callback = callback
		self.debounce_delay = debounce_delay

		# Debounce state.
		self._last_event_time: float = 0  # monotonic time of the last successful callback
		self._debounce_task: asyncio.Task | None = None
		self._processing = False  # re-entrancy guard for _process_events

		# Thread hand-off: watchdog's observer thread enqueues, the loop drains.
		self._event_queue = queue.Queue()
		self._event_processed = threading.Event()

		self._event_loop = event_loop if event_loop else asyncio.get_event_loop()

	def _schedule_callback(self) -> None:
		"""
		Hand a file-change notification to the event loop.

		Called from watchdog's observer thread. It touches only ``queue.Queue``,
		``threading.Event`` and ``asyncio.run_coroutine_threadsafe``.
		"""
		# Always enqueue. The loop-side re-entrancy guard lives in _process_events;
		# checking it here (from another thread) could silently drop a notification.
		self._event_processed.clear()
		self._event_queue.put_nowait("file_change")

		coro = self._process_events()
		try:
			asyncio.run_coroutine_threadsafe(coro, self._event_loop)
		except RuntimeError:
			# Raised when the loop is closed (e.g. during shutdown). Propagating would
			# surface inside watchdog's observer thread, so drop the notification and
			# close the coroutine to avoid a "never awaited" warning.
			coro.close()
			logger.debug("Event loop unavailable; dropping file change notification.")

	async def _process_events(self) -> None:
		"""
		Drain queued notifications and restart the debounce timer.

		Runs on the event loop. Each drained notification cancels the pending
		debounce task (if any) and replaces it with a fresh one, so ``callback``
		only fires once events stop arriving for ``debounce_delay`` seconds.
		"""
		if self._processing:
			return

		self._processing = True
		try:
			while not self._event_queue.empty():
				_ = self._event_queue.get_nowait()

				# Cancel any existing debounce task
				if self._debounce_task and not self._debounce_task.done():
					self._debounce_task.cancel()
					logger.debug("Cancelled existing debounce task due to new event.")

				# Create a new debounce task within the event loop's context
				logger.debug(f"Scheduling new debounced callback with {self.debounce_delay}s delay.")
				self._debounce_task = self._event_loop.create_task(self._debounced_callback())
		finally:
			self._processing = False
			self._event_processed.set()

	async def _debounced_callback(self) -> None:
		"""
		Sleep for ``debounce_delay`` seconds, then await ``callback``.

		Cancellation (by a newer event) is logged and swallowed. Any other
		exception raised by ``callback`` is logged rather than propagated.
		"""
		try:
			await asyncio.sleep(self.debounce_delay)
			logger.info("Debounce delay finished, triggering sync callback.")
			await self.callback()
			self._last_event_time = time.monotonic()  # Update time after successful execution
			logger.debug("Watcher callback executed successfully.")
		except asyncio.CancelledError:
			logger.debug("Debounce task cancelled before execution.")
			# Do not run the callback if cancelled.
			# NOTE(review): a new event also cancels this task while ``callback`` is
			# already running, interrupting it mid-way — confirm that is intended.
		except Exception:
			logger.exception("Error executing watcher callback")
		finally:
			# A cancelled task reaches this clause only after _process_events has
			# already stored its replacement in ``_debounce_task``. Clearing the
			# reference unconditionally would orphan that newer task: the next event
			# could not cancel it, and two callbacks would fire.
			if self._debounce_task is asyncio.current_task():
				self._debounce_task = None

	def on_any_event(self, event: FileSystemEvent) -> None:
		"""
		Catch all events and schedule the callback after debouncing.

		Directory events and read-only access events (``opened``,
		``closed_no_write``) are ignored. Every other file event restarts the
		debounce timer.

		Args:
		    event: The file system event.

		"""
		if event.is_directory:
			return  # Ignore directory events for now, focus on file changes

		event_type = event.event_type
		if event_type in ("opened", "closed_no_write"):
			# Read-only access does not change content. Reacting to it would let the
			# callback re-trigger itself if it reads files inside the watched tree.
			return

		# Log the specific event detected
		src_path = getattr(event, "src_path", "N/A")
		dest_path = getattr(event, "dest_path", "N/A")  # For moved events

		if event_type == "moved":
			logger.debug(f"Detected file {event_type}: {src_path} -> {dest_path}")
		else:
			logger.debug(f"Detected file {event_type}: {src_path}")

		# Schedule the callback in a thread-safe way
		self._schedule_callback()

__init__

__init__(
	callback: Callable[[], Coroutine[None, None, None]],
	debounce_delay: float = 1.0,
	event_loop: AbstractEventLoop | None = None,
) -> None

Initialize the handler.

Parameters:

Name Type Description Default
callback Callable[[], Coroutine[None, None, None]]

An async function to call when changes are detected.

required
debounce_delay float

Quiet period (seconds) after the most recent file event before the callback fires; each new event restarts the countdown.

1.0
event_loop AbstractEventLoop | None

The asyncio event loop to use, or None to use the current one.

None
Source code in src/codemap/watcher/file_watcher.py (lines 21–46)
def __init__(
	self,
	callback: Callable[[], Coroutine[None, None, None]],
	debounce_delay: float = 1.0,
	event_loop: asyncio.AbstractEventLoop | None = None,
) -> None:
	"""
	Initialize the handler.

	Args:
	    callback: An async function to call when changes are detected.
	    debounce_delay: Minimum time (seconds) between callback triggers.
	    event_loop: The asyncio event loop to use, or None to use the current one.

	"""
	self.callback = callback
	self.debounce_delay = debounce_delay
	self._last_event_time: float = 0
	self._debounce_task: asyncio.Task | None = None
	# Set up a thread-safe way to communicate with the event loop
	self._event_queue = queue.Queue()
	self._event_processed = threading.Event()
	# Store or get the event loop
	self._event_loop = event_loop or asyncio.get_event_loop()
	# Flag to track if we're in the process of handling an event
	self._processing = False

callback instance-attribute

callback = callback

debounce_delay instance-attribute

debounce_delay = debounce_delay

on_any_event

on_any_event(event: FileSystemEvent) -> None

Catch all events and schedule the callback after debouncing.

Parameters:

Name Type Description Default
event FileSystemEvent

The file system event.

required
Source code in src/codemap/watcher/file_watcher.py (lines 101–123)
def on_any_event(self, event: FileSystemEvent) -> None:
	"""
	Catch all events and schedule the callback after debouncing.

	Directory events and read-only access events (``opened``,
	``closed_no_write``) are ignored. Every other file event restarts the
	debounce timer.

	Args:
	    event: The file system event.

	"""
	if event.is_directory:
		return  # Ignore directory events for now, focus on file changes

	event_type = event.event_type
	if event_type in ("opened", "closed_no_write"):
		# Read-only access does not change content. Reacting to it would let the
		# callback re-trigger itself if it reads files inside the watched tree.
		return

	# Log the specific event detected
	src_path = getattr(event, "src_path", "N/A")
	dest_path = getattr(event, "dest_path", "N/A")  # For moved events

	if event_type == "moved":
		logger.debug(f"Detected file {event_type}: {src_path} -> {dest_path}")
	else:
		logger.debug(f"Detected file {event_type}: {src_path}")

	# Schedule the callback in a thread-safe way
	self._schedule_callback()

Watcher

Monitors a directory for changes and triggers a callback.

Source code in src/codemap/watcher/file_watcher.py (lines 126–187)
class Watcher:
	"""
	Monitors a directory for changes and triggers a callback.

	A watchdog ``Observer`` watches ``path_to_watch`` recursively and forwards
	events to a ``FileChangeHandler``, which debounces them and awaits the
	configured callback on an asyncio event loop.
	"""

	def __init__(
		self,
		path_to_watch: str | Path,
		on_change_callback: Callable[[], Coroutine[None, None, None]],
		debounce_delay: float = 1.0,
	) -> None:
		"""
		Initialize the watcher.

		Args:
		    path_to_watch: Directory to monitor; it is resolved to an absolute path.
		    on_change_callback: Async function awaited after changes settle.
		    debounce_delay: Quiet period in seconds after the last change before
		        the callback fires.

		Raises:
		    ValueError: If ``path_to_watch`` is not an existing directory.

		"""
		self.observer = Observer()

		target = Path(path_to_watch).resolve()
		self.path_to_watch = target
		if not target.is_dir():
			msg = f"Path to watch must be a directory: {self.path_to_watch}"
			raise ValueError(msg)

		# Loop on which the handler will schedule callbacks. Fall back to a fresh,
		# thread-installed loop when asyncio reports none is available.
		try:
			loop = asyncio.get_event_loop()
		except RuntimeError:
			loop = asyncio.new_event_loop()
			asyncio.set_event_loop(loop)
		self._event_loop = loop

		self.event_handler = FileChangeHandler(
			on_change_callback,
			debounce_delay,
			event_loop=self._event_loop,
		)
		# Set by stop(); start() waits on it.
		self._stop_event = anyio.Event()

	async def start(self) -> None:
		"""
		Start monitoring the directory and wait until ``stop()`` is called.

		Recreates the directory if it was removed after construction, binds the
		change handler to the event loop running this coroutine, starts the
		observer, then waits on the stop event. ``stop()`` is always called on
		exit, including when this coroutine is cancelled.
		"""
		if not self.path_to_watch.exists():
			logger.warning(f"Watch path {self.path_to_watch} does not exist. Creating it.")
			self.path_to_watch.mkdir(parents=True, exist_ok=True)  # Ensure the directory exists

		# __init__ stored whatever asyncio.get_event_loop() returned at construction
		# time, which need not be the loop now awaiting start(). The handler schedules
		# debounced callbacks on its stored loop, so bind it to the running loop
		# before the observer starts emitting events.
		try:
			running_loop = asyncio.get_running_loop()
		except RuntimeError:
			running_loop = None  # no asyncio loop in this thread; keep the one from __init__
		if running_loop is not None:
			self._event_loop = running_loop
			self.event_handler._event_loop = running_loop

		self.observer.schedule(self.event_handler, str(self.path_to_watch), recursive=True)
		self.observer.start()
		logger.info(f"Started watching directory: {self.path_to_watch}")
		try:
			# Wait until the stop event is set
			await self._stop_event.wait()
		except KeyboardInterrupt:
			logger.info("Watcher stopped by user (KeyboardInterrupt).")
		finally:
			# Ensure stop is called regardless of how wait() exits
			self.stop()

	def stop(self) -> None:
		"""
		Stop monitoring and release any pending ``start()`` call.

		The observer is only stopped and joined while its thread is alive, so
		calling this again after shutdown just re-sets the stop event.
		"""
		observer = self.observer
		if observer.is_alive():
			observer.stop()
			observer.join()  # block until watchdog's thread has exited
			logger.info("Watchdog observer stopped.")

		self._stop_event.set()  # wakes the wait inside start()
		logger.info("Watcher stop event set.")

__init__

__init__(
	path_to_watch: str | Path,
	on_change_callback: Callable[
		[], Coroutine[None, None, None]
	],
	debounce_delay: float = 1.0,
) -> None

Initialize the watcher.

Parameters:

Name Type Description Default
path_to_watch str | Path

The directory path to monitor.

required
on_change_callback Callable[[], Coroutine[None, None, None]]

Async function to call upon detecting changes.

required
debounce_delay float

Quiet period in seconds after the last change before the callback fires (debouncing); each new change restarts the countdown.

1.0
Source code in src/codemap/watcher/file_watcher.py (lines 129–159)
def __init__(
	self,
	path_to_watch: str | Path,
	on_change_callback: Callable[[], Coroutine[None, None, None]],
	debounce_delay: float = 1.0,
) -> None:
	"""
	Initialize the watcher.

	Args:
	    path_to_watch: Directory to monitor; it is resolved to an absolute path.
	    on_change_callback: Async function awaited after changes settle.
	    debounce_delay: Quiet period in seconds after the last change before
	        the callback fires.

	Raises:
	    ValueError: If ``path_to_watch`` is not an existing directory.

	"""
	self.observer = Observer()

	target = Path(path_to_watch).resolve()
	self.path_to_watch = target
	if not target.is_dir():
		msg = f"Path to watch must be a directory: {self.path_to_watch}"
		raise ValueError(msg)

	# Loop on which the handler will schedule callbacks. Fall back to a fresh,
	# thread-installed loop when asyncio reports none is available.
	try:
		loop = asyncio.get_event_loop()
	except RuntimeError:
		loop = asyncio.new_event_loop()
		asyncio.set_event_loop(loop)
	self._event_loop = loop

	self.event_handler = FileChangeHandler(
		on_change_callback,
		debounce_delay,
		event_loop=self._event_loop,
	)
	# Set by stop(); start() waits on it.
	self._stop_event = anyio.Event()

observer instance-attribute

observer = Observer()

path_to_watch instance-attribute

path_to_watch = Path(path_to_watch).resolve()

event_handler instance-attribute

event_handler = FileChangeHandler(
	on_change_callback,
	debounce_delay,
	event_loop=_event_loop,
)

start async

start() -> None

Start monitoring the directory.

Source code in src/codemap/watcher/file_watcher.py (lines 161–177)
async def start(self) -> None:
	"""
	Start monitoring the directory and wait until ``stop()`` is called.

	Recreates the directory if it was removed after construction, binds the
	change handler to the event loop running this coroutine, starts the
	observer, then waits on the stop event. ``stop()`` is always called on
	exit, including when this coroutine is cancelled.
	"""
	if not self.path_to_watch.exists():
		logger.warning(f"Watch path {self.path_to_watch} does not exist. Creating it.")
		self.path_to_watch.mkdir(parents=True, exist_ok=True)  # Ensure the directory exists

	# __init__ stored whatever asyncio.get_event_loop() returned at construction
	# time, which need not be the loop now awaiting start(). The handler schedules
	# debounced callbacks on its stored loop, so bind it to the running loop
	# before the observer starts emitting events.
	try:
		running_loop = asyncio.get_running_loop()
	except RuntimeError:
		running_loop = None  # no asyncio loop in this thread; keep the one from __init__
	if running_loop is not None:
		self._event_loop = running_loop
		self.event_handler._event_loop = running_loop

	self.observer.schedule(self.event_handler, str(self.path_to_watch), recursive=True)
	self.observer.start()
	logger.info(f"Started watching directory: {self.path_to_watch}")
	try:
		# Wait until the stop event is set
		await self._stop_event.wait()
	except KeyboardInterrupt:
		logger.info("Watcher stopped by user (KeyboardInterrupt).")
	finally:
		# Ensure stop is called regardless of how wait() exits
		self.stop()

stop

stop() -> None

Stop monitoring the directory.

Source code in src/codemap/watcher/file_watcher.py (lines 179–187)
def stop(self) -> None:
	"""
	Stop monitoring and release any pending ``start()`` call.

	The observer is only stopped and joined while its thread is alive, so
	calling this again after shutdown just re-sets the stop event.
	"""
	observer = self.observer
	if observer.is_alive():
		observer.stop()
		observer.join()  # block until watchdog's thread has exited
		logger.info("Watchdog observer stopped.")

	self._stop_event.set()  # wakes the wait inside start()
	logger.info("Watcher stop event set.")