Compare commits

...

8 Commits

Author SHA1 Message Date
c130e8edea updated
Signed-off-by: Robert Shaw <robshaw@redhat.com>
2025-07-19 20:02:37 +00:00
88b009b540 updated
Signed-off-by: Robert Shaw <robshaw@redhat.com>
2025-07-19 19:47:01 +00:00
ca3156dffc updated
Signed-off-by: Robert Shaw <robshaw@redhat.com>
2025-07-19 18:17:53 +00:00
b7ac93cb08 updated
Signed-off-by: Robert Shaw <robshaw@redhat.com>
2025-07-19 18:13:13 +00:00
e0dd33aeff updated
Signed-off-by: Robert Shaw <robshaw@redhat.com>
2025-07-19 17:54:07 +00:00
edf35b63ce updated
Signed-off-by: Robert Shaw <robshaw@redhat.com>
2025-07-19 16:31:48 +00:00
4127593c30 updated
Signed-off-by: Robert Shaw <robshaw@redhat.com>
2025-07-19 16:30:26 +00:00
14f13ed690 added debug logging
Signed-off-by: Robert Shaw <robshaw@redhat.com>
2025-07-19 16:27:38 +00:00
2 changed files with 13 additions and 2 deletions

View File

@ -874,13 +874,17 @@ class NixlConnectorWorker:
Returns:
set of req_ids that have all done xfers
"""
current_time = time.perf_counter()
done_req_ids: set[str] = set()
for req_id, handles in list(transfers.items()):
in_progress = False
for handle, _xfer_stime in handles:
for handle, _xfer_stime, remote_id in handles:
xfer_state = self.nixl_wrapper.check_xfer_state(handle)
if xfer_state == "DONE":
self.nixl_wrapper.release_xfer_handle(handle)
logger.info("========= TRANSFER: req_id %s remote_engine_id %s transfer time: %s",
req_id, remote_id, current_time - _xfer_stime)
elif xfer_state == "PROC":
in_progress = True
continue
@ -1012,6 +1016,7 @@ class NixlConnectorWorker:
assert len(local_block_descs_ids) == len(remote_block_descs_ids)
# Prepare transfer with Nixl.
start = time.perf_counter()
handle = self.nixl_wrapper.make_prepped_xfer(
"READ",
local_xfer_side_handle,
@ -1019,15 +1024,20 @@ class NixlConnectorWorker:
remote_xfer_side_handle,
remote_block_descs_ids,
notif_msg=notif_id,
skip_desc_merge=True,
)
# Begin async xfer.
self.nixl_wrapper.transfer(handle)
end = time.perf_counter()
# logger.debug(
# "[nixl connector]: req_id %s transfer launch time %s",
# request_id, end - start)
# Use handle to check completion in future step().
# TODO (NickLucche) surface xfer elapsed time
self._recving_transfers[request_id].append(
(handle, time.perf_counter()))
(handle, time.perf_counter(), dst_engine_id))
def _get_block_descs_ids(self,
engine_id: str,

View File

@ -60,6 +60,7 @@ class AsyncLLM(EngineClient):
client_addresses: Optional[dict[str, str]] = None,
client_index: int = 0,
) -> None:
print("====== HELLO ======")
"""
Create an AsyncLLM.