Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 46 additions & 36 deletions livekit-rtc/livekit/rtc/_ffi_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,27 @@
atexit.register(_resource_files.close)


def _lib_name():
if platform.system() == "Linux":
return "liblivekit_ffi.so"
elif platform.system() == "Darwin":
return "liblivekit_ffi.dylib"
elif platform.system() == "Windows":
return "livekit_ffi.dll"
return None


def get_ffi_lib():
# allow to override the lib path using an env var
libpath = os.environ.get("LIVEKIT_LIB_PATH", "").strip()
if libpath:
return ctypes.CDLL(libpath)

if platform.system() == "Linux":
libname = "liblivekit_ffi.so"
elif platform.system() == "Darwin":
libname = "liblivekit_ffi.dylib"
elif platform.system() == "Windows":
libname = "livekit_ffi.dll"
else:
libname = _lib_name()
if libname is None:
raise Exception(
f"no ffi library found for platform {platform.system()}. \
Set LIVEKIT_LIB_PATH to specify a the lib path"
f"no ffi library found for platform {platform.system()}. "
"Set LIVEKIT_LIB_PATH to specify the lib path"
)

res = importlib.resources.files("livekit.rtc.resources") / libname
Expand All @@ -58,32 +63,8 @@ def get_ffi_lib():
return ctypes.CDLL(str(path))


ffi_lib = get_ffi_lib()
ffi_cb_fnc = ctypes.CFUNCTYPE(None, ctypes.POINTER(ctypes.c_uint8), ctypes.c_size_t)

# C function types
ffi_lib.livekit_ffi_initialize.argtypes = [
ffi_cb_fnc,
ctypes.c_bool,
ctypes.c_char_p,
ctypes.c_char_p,
]

ffi_lib.livekit_ffi_request.argtypes = [
ctypes.POINTER(ctypes.c_ubyte),
ctypes.c_size_t,
ctypes.POINTER(ctypes.POINTER(ctypes.c_ubyte)),
ctypes.POINTER(ctypes.c_size_t),
]
ffi_lib.livekit_ffi_request.restype = ctypes.c_uint64

ffi_lib.livekit_ffi_drop_handle.argtypes = [ctypes.c_uint64]
ffi_lib.livekit_ffi_drop_handle.restype = ctypes.c_bool


ffi_lib.livekit_ffi_dispose.argtypes = []
ffi_lib.livekit_ffi_dispose.restype = None

INVALID_HANDLE = 0


Expand All @@ -102,7 +83,7 @@ def disposed(self) -> bool:
def dispose(self) -> None:
if self.handle != INVALID_HANDLE and not self._disposed:
self._disposed = True
assert ffi_lib.livekit_ffi_drop_handle(ctypes.c_uint64(self.handle))
assert FfiClient.instance._ffi_lib.livekit_ffi_drop_handle(ctypes.c_uint64(self.handle))

def __repr__(self) -> str:
return f"FfiHandle({self.handle})"
Expand Down Expand Up @@ -214,10 +195,39 @@ def __init__(self) -> None:
self._lock = threading.RLock()
self._queue = FfiQueue[proto_ffi.FfiEvent]()

ffi_lib.livekit_ffi_initialize(
try:
self._ffi_lib = get_ffi_lib()
except Exception as e:
libname = _lib_name() or "livekit_ffi"
raise ImportError(
"failed to load %s: %s\n"
"Install the livekit package with: pip install livekit\n"
"Or set LIVEKIT_LIB_PATH to the path of the native library." % (libname, e)
) from None
self._ffi_lib.livekit_ffi_initialize.argtypes = [
ffi_cb_fnc,
ctypes.c_bool,
ctypes.c_char_p,
ctypes.c_char_p,
]
self._ffi_lib.livekit_ffi_request.argtypes = [
ctypes.POINTER(ctypes.c_ubyte),
ctypes.c_size_t,
ctypes.POINTER(ctypes.POINTER(ctypes.c_ubyte)),
ctypes.POINTER(ctypes.c_size_t),
]
self._ffi_lib.livekit_ffi_request.restype = ctypes.c_uint64
self._ffi_lib.livekit_ffi_drop_handle.argtypes = [ctypes.c_uint64]
self._ffi_lib.livekit_ffi_drop_handle.restype = ctypes.c_bool
self._ffi_lib.livekit_ffi_dispose.argtypes = []
self._ffi_lib.livekit_ffi_dispose.restype = None

self._ffi_lib.livekit_ffi_initialize(
ffi_event_callback, True, b"python", __version__.encode("ascii")
)

ffi_lib = self._ffi_lib

@atexit.register
def _dispose_lk_ffi():
ffi_lib.livekit_ffi_dispose()
Expand All @@ -233,7 +243,7 @@ def request(self, req: proto_ffi.FfiRequest) -> proto_ffi.FfiResponse:

resp_ptr = ctypes.POINTER(ctypes.c_ubyte)()
resp_len = ctypes.c_size_t()
handle = ffi_lib.livekit_ffi_request(
handle = self._ffi_lib.livekit_ffi_request(
data, proto_len, ctypes.byref(resp_ptr), ctypes.byref(resp_len)
)
assert handle != INVALID_HANDLE
Expand Down
4 changes: 1 addition & 3 deletions livekit-rtc/livekit/rtc/participant.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,7 @@ async def perform_rpc(
queue = FfiClient.instance.queue.subscribe()
try:
resp = FfiClient.instance.request(req)
cb = await queue.wait_for(
lambda e: (e.perform_rpc.async_id == resp.perform_rpc.async_id)
)
cb = await queue.wait_for(lambda e: e.perform_rpc.async_id == resp.perform_rpc.async_id)
finally:
FfiClient.instance.queue.unsubscribe(queue)

Expand Down
18 changes: 12 additions & 6 deletions tests/rtc/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,11 @@ def on_room2_track_unpublished(
await room1.connect(url, token1)

await assert_eventually(
lambda: len(events["connection_state_changed"]) > 0
and events["connection_state_changed"][-1]
== f"room1-{rtc.ConnectionState.CONN_CONNECTED}",
lambda: (
len(events["connection_state_changed"]) > 0
and events["connection_state_changed"][-1]
== f"room1-{rtc.ConnectionState.CONN_CONNECTED}"
),
message="room1 connection_state_changed event not fired or did not reach CONN_CONNECTED state",
)

Expand Down Expand Up @@ -370,9 +372,13 @@ def on_room2_track_unpublished(
await room1.disconnect()

await assert_eventually(
lambda: lambda: len(events["connection_state_changed"]) > 0
and events["connection_state_changed"][-1]
== f"room1-{rtc.ConnectionState.CONN_DISCONNECTED}",
lambda: (
lambda: (
len(events["connection_state_changed"]) > 0
and events["connection_state_changed"][-1]
== f"room1-{rtc.ConnectionState.CONN_DISCONNECTED}"
)
),
message="room1 disconnected event not fired",
)

Expand Down
Loading