Websocket requests including supported entity types #47
albaintor wants to merge 16 commits into unfoldedcircle:main
Improved request signatures. Handled unknown entity types.
Force-pushed from 39be2f9 to df90062.
|
All done and tested: #47. To test it and include it in the integrations: This is what I have done for Kodi, and it now works on Remote 2. Thanks @kennymc-c for the work; ChatGPT also helped me with the rest :-)
|
I'm now getting timeouts no matter how high I set the timeout value. Right after the timeout, the response is shown in the log. Could something be blocking the response?
|
I have fixed some stuff in the meantime: after authentication, the integration requests the supported entity types. Normally the remote should respond quickly to this request.
|
@zehnm could advise otherwise |
|
Also with the current version I still get a timeout |
|
The problem comes from the remote core: it cannot accept any requests from the integration while it is itself waiting for a response from the integration after sending a request.
This is the reason why I chose to ignore the timeout in the ucapi and try again later, but the right way would be to accept requests at any time on the core side. I don't know if this is easy to implement.
|
I'm back at my original non-blocking version that works fine in the same case. I also added a timeout to prevent a loop while waiting for an answer. As the timeout is 30 seconds just like the setup timeout I should get a possible exception before the setup itself times out. |
|
Can you be more specific? If I remember correctly, in your version the requests took control of the websocket server until the response arrived. In that case no other requests from the remote would be handled. Anyway, I don't understand how the result would differ from the new implementation: if you are in the setup flow and the remote expects a response from the integration, I don't see how you could send a request and get a response.
|
OK, I have found the cause: the core is not at fault. The websocket handling of requests is blocked until a response is completed.
|
While it now works during the setup process, I think I'm running into a race condition after a driver restart when adding the available entities after receiving the connect event. The remote is asking for the available entities, but they have not all been added yet. Also, the connect event is sent again right after the setup, so I don't think it's the right place for this anyway.
This could be due to this code, but I don't understand the problem: the supported entity types are extracted right after authentication but before the connected event, so before the request for available entities. This should introduce just a request/response but no race condition... Do you have logs?
await self._authenticate(websocket, True)
# Request supported entity types from remote
asyncio.create_task(self._update_supported_entity_types(websocket))
self._events.emit(uc.Events.CLIENT_CONNECTED)
|
According to my log, the authentication response comes after my request:
|
Normally the driver should be started before the setup flow, like this:
await api.init("driver.json", setup_flow.driver_setup_handler)
Anyway, I have moved the call to entity types extraction inside the request for available entities. Hopefully the problem should not occur anymore.
|
Is this ready for review? Otherwise please mark this PR as draft if there are more updates or fixes coming. |
We thought it was, but after testing it needed some adjustments. I will test the modifications before switching it back to review.
|
Thanks, now it works as expected without any additional code that checks for entity types.
|
Same here, just tested on both my Remote 2 and 3. I had misunderstood why the extraction of entity types didn't work: it was related to the blocking request on available entities. Now that tasks are used to handle websocket responses, it works perfectly. @zehnm we are done now :)
zehnm
left a comment
Please explain the reason(s) for using locks and tasks to process received WebSocket messages.
I don't think they are required; they complicate the message processing and might even introduce issues for integration driver usage. But I could also have missed something :-)
ucapi/api.py
        websocket,
        msg: str,
        msg_data: dict[str, Any] | None = None,
        *,
        timeout: float = 10.0,
    ) -> dict[str, Any]:
        """
        Send a request over websocket and await the matching response.

        - Uses a Future stored in self._ws_pending[websocket][req_id]
        - Reader task (_handle_ws -> _process_ws_message) completes the future on 'resp'
        - Raises TimeoutError on timeout
        :param websocket: client connection
        :param msg: event message name
        :param msg_data: message data payload
        :param timeout: timeout for message
        """
        if websocket is None:
            if not self._clients:
                raise RuntimeError("No active websocket connection!")
            websocket = next(iter(self._clients))
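For readers following along, here is a minimal, self-contained sketch of the correlation pattern the docstring describes: one Future per request id, completed when the matching `resp` message is read. `FakeSocket`, `RequestClient`, `on_message` and the message field names are assumptions made for illustration, not the ucapi implementation.

```python
import asyncio
import json
from typing import Any


class FakeSocket:
    """Stand-in for a websocket connection (hypothetical, for illustration)."""

    def __init__(self) -> None:
        self.sent: list[str] = []

    async def send(self, data: str) -> None:
        self.sent.append(data)


class RequestClient:
    """Hypothetical helper that matches responses to requests by id."""

    def __init__(self) -> None:
        self._req_id = 0
        self._pending: dict[int, asyncio.Future] = {}

    async def request(
        self, ws: FakeSocket, msg: str, *, timeout: float = 10.0
    ) -> dict[str, Any]:
        self._req_id += 1
        req_id = self._req_id
        future = asyncio.get_running_loop().create_future()
        self._pending[req_id] = future
        try:
            await ws.send(json.dumps({"kind": "req", "id": req_id, "msg": msg}))
            # Raises a timeout error if no matching response arrives in time.
            return await asyncio.wait_for(future, timeout)
        finally:
            # Always forget the request, whether it succeeded or timed out.
            self._pending.pop(req_id, None)

    def on_message(self, raw: str) -> None:
        """Called by the reader for every received text message."""
        data = json.loads(raw)
        if data.get("kind") == "resp":
            future = self._pending.get(data.get("req_id"))
            if future is not None and not future.done():
                future.set_result(data)


async def demo() -> tuple[dict[str, Any], bool]:
    ws, client = FakeSocket(), RequestClient()
    task = asyncio.create_task(
        client.request(ws, "get_supported_entity_types", timeout=1.0)
    )
    await asyncio.sleep(0)  # let the request run until it waits for the answer
    client.on_message(json.dumps({"kind": "resp", "req_id": 1, "msg_data": ["light"]}))
    answer = await task
    try:
        await client.request(ws, "get_version", timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return answer, timed_out
```

The important property is that `on_message` must keep being called while `request` is waiting; if the reader itself is the one awaiting `request`, the Future can never be completed.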
There was a problem hiding this comment.
websocket parameter is not defined as optional. The doc implies it's required.
I think this should be a mandatory parameter. Using the first available connection if not provided is confusing.
There was a problem hiding this comment.
I removed the optional parameter everywhere. However, the question remains: how should an integration driver call these requests? Should a clients @property be added then, to retrieve the list of websockets?
ucapi/api.py
        async with self._ws_send_locks[websocket]:
            await websocket.send(json.dumps(payload))
There was a problem hiding this comment.
See my first comment about locks for self._req_id_lock = asyncio.Lock().
Even if locks are required, this code could raise a KeyError if the websocket disconnected and the lock was removed between the check on L494 and the usage here. --> every async call can stop the execution and the websocket might disconnect and go away, e.g. on L498.
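The KeyError risk can be reproduced in isolation. This is only a sketch with hypothetical names (`Sender`, `unsafe_send`, `safe_send`), using a string in place of a real connection: the lock is looked up a second time after an `await`, and the disconnect handler removes it in between.

```python
import asyncio


class Sender:
    """Hypothetical holder of per-connection send locks."""

    def __init__(self) -> None:
        self.locks: dict[str, asyncio.Lock] = {"ws1": asyncio.Lock()}

    def disconnect(self, ws: str) -> None:
        self.locks.pop(ws, None)

    async def unsafe_send(self, ws: str) -> str:
        if ws not in self.locks:
            return "not connected"
        await asyncio.sleep(0)  # any await may let a disconnect run
        async with self.locks[ws]:  # second lookup: KeyError if removed meanwhile
            return "sent"

    async def safe_send(self, ws: str) -> str:
        lock = self.locks.get(ws)  # single lookup, keep the reference
        if lock is None:
            return "not connected"
        await asyncio.sleep(0)
        async with lock:
            # A real send on a closed connection would still fail here,
            # but with the library's own connection error, not a KeyError.
            return "sent"


async def demo() -> tuple[str, str]:
    results = []
    for method_name in ("unsafe_send", "safe_send"):
        sender = Sender()
        task = asyncio.create_task(getattr(sender, method_name)("ws1"))
        await asyncio.sleep(0)  # the task runs up to its first await
        sender.disconnect("ws1")
        try:
            results.append(await task)
        except KeyError:
            results.append("KeyError")
    return results[0], results[1]
```

Holding the lock reference in a local variable removes the second dictionary lookup, which is the part that can fail after a disconnect.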
-            await self._process_ws_message(websocket, message)
+            asyncio.create_task(self._process_ws_message(websocket, message))
         elif isinstance(message, (bytes, bytearray, memoryview)):
             # Binary message (protobuf in future)
-            await self._process_ws_binary_message(websocket, bytes(message))
+            asyncio.create_task(
+                self._process_ws_binary_message(websocket, bytes(message))
+            )
There was a problem hiding this comment.
Why are now tasks required to process the received messages?
There was a problem hiding this comment.
This was the result of our tests. Take the following example:
1. We (the integration) receive a request from the remote.
2. While processing the request, we need to send a request back to the remote and await its response.
3. In that case the main task is blocked, because it is still waiting for step 1 to return.
A concrete example that I encountered:
1. Remote -> integration: get available entities
2. Integration -> remote: get supported entity types
3. Remote -> integration: supported entity types
4. Integration -> remote: available entities (filtered with supported entity types)
Without tasks, step 2 is blocked: the response in step 3 is never read, because the receive loop is still busy with step 1.
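The four steps can be simulated without a real websocket. The sketch below is an assumption-laden model, not ucapi code: an `asyncio.Queue` stands in for the connection, and `run_receive_loop`, `handle` and the message strings are hypothetical. It compares awaiting the handler inline with dispatching it as a task.

```python
import asyncio


async def run_receive_loop(use_tasks: bool) -> str:
    incoming: asyncio.Queue = asyncio.Queue()  # stands in for the websocket
    loop = asyncio.get_running_loop()
    pending: asyncio.Future = loop.create_future()  # answer expected in step 3
    outcome: asyncio.Future = loop.create_future()
    tasks: set[asyncio.Task] = set()

    async def handle(message: str) -> None:
        if message == "get_available_entities":
            # Step 2: ask the remote; its answer arrives as a later message.
            incoming.put_nowait("resp:supported_entity_types")
            try:
                await asyncio.wait_for(pending, timeout=0.2)
                outcome.set_result("answered")  # step 4 could be sent now
            except asyncio.TimeoutError:
                outcome.set_result("timeout")
        elif message.startswith("resp:"):
            # Step 3: complete the waiting request, if it is still waiting.
            if not pending.done():
                pending.set_result(message)

    async def receive_loop() -> None:
        while True:
            message = await incoming.get()
            if use_tasks:
                task = asyncio.create_task(handle(message))
                tasks.add(task)  # keep a reference until the task is done
                task.add_done_callback(tasks.discard)
            else:
                await handle(message)  # nothing else is read until this returns

    reader = asyncio.create_task(receive_loop())
    incoming.put_nowait("get_available_entities")  # step 1
    result = await outcome
    reader.cancel()
    return result


async def demo() -> tuple[str, str]:
    return await run_receive_loop(False), await run_receive_loop(True)
```

In the inline variant the response sits unread in the queue until the handler gives up, which matches the observation earlier in the thread that the response shows up in the log right after the timeout.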
There was a problem hiding this comment.
It might be related to websocket's behaviour with asyncio, that there's a missing await asyncio.sleep(0) call: https://websockets.readthedocs.io/en/stable/faq/asyncio.html
But that's pure speculation right now, I'll try to reproduce this and also look into the asyncio task solution if that won't introduce other side effects.
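For context, this is what `await asyncio.sleep(0)` does on its own: it hands control back to the event loop once, so other ready tasks get a turn. The sketch uses hypothetical `worker` and `other` coroutines and says nothing about the websockets library internals.

```python
import asyncio


async def worker(log: list[str], cooperative: bool) -> None:
    for i in range(3):
        log.append(f"work{i}")  # stands in for a synchronous operation
        if cooperative:
            await asyncio.sleep(0)  # hand control back to the event loop


async def other(log: list[str]) -> None:
    log.append("other")


async def demo(cooperative: bool) -> list[str]:
    log: list[str] = []
    await asyncio.gather(worker(log, cooperative), other(log))
    return log
```

A single yield like this helps a task that is merely ready to run. It does not help a handler that is waiting for a message which only the blocked reader could deliver.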
There was a problem hiding this comment.
I think that await asyncio.sleep(0) is kind of a hack to make the event loop look after other awaiting stuff.
Creating a task to handle the response seems to be the clean way to unblock the receive (main) task. It may introduce some overhead, though, I don't know: tasks in asyncio are not threads, but this implies pushing data into the event loop.
There was a problem hiding this comment.
This is no hack in an async context, depending on what the client is doing. It's well documented in the websocket library for the case where the client is doing synchronous operations.
I have to dig deeper in the websocket library documentation about the async message callback. It's very well possible that the _handle_ws callback is awaited inside the websocket library, which means it should not be delayed for too long. Then it's clear why it doesn't work sending another WS message and waiting for the response inside the callback. The callback has to give control back, otherwise it blocks the event loop inside the websocket library. Additional await asyncio.sleep(0) won't solve it.
What I'm afraid of is that simply using an asyncio task for every received message might introduce other side effects. This is a major change in runtime behaviour.
|
I tested the latest changes including extraction of supported entity types |
zehnm
left a comment
I can't approve creating asyncio tasks for every received WS message in the _handle_ws handler at the moment. It's simply too big of a runtime change, one that requires a lot of testing time to make sure that no new side effects are introduced.
Processing a received message needs to yield back to the websocket library as fast as possible, as has been the case so far.
That means: no blocking calls in the message handler. If a received message has to trigger a lengthy operation, or has to wait for another WebSocket request, it has to be done in a separate task.
This PR does two things, and both require proper solutions:
1. Allowing an integration driver to send the requests defined in the Integration-API, e.g. retrieving the localization configuration.
   Not yet solved: how does the integration driver get the websocket handle? See my comment with the additional event parameter.
2. Filtering available entities, based on the supported entity types of the Remote device.
   Not yet solved: when to send the WS request to retrieve the supported types?
   I would approach this with an internal event handler listening for CLIENT_CONNECTED.
One final request: please don't blindly trust what AI spits out. The I in today's AI is just intelligent marketing :-)
@@ -218,7 +216,6 @@ async def _handle_ws(self, websocket) -> None:
         self._clients.add(websocket)
         # Init per-websocket pending requests map + send lock
There was a problem hiding this comment.
Cosmetic: "send lock" comment no longer correct
        if websocket not in self._ws_send_locks:
            self._ws_send_locks[websocket] = asyncio.Lock()

        # Allocate req_id safely
There was a problem hiding this comment.
Cosmetic: "safely" no longer relevant
    async def get_supported_entity_types(
        self, websocket, *, timeout: float = 5.0
    ) -> list[str]:
There was a problem hiding this comment.
Where does an integration driver get the websocket handle from? The same goes for the other new get_ methods below.
Accessing api._clients is not an option! Internal fields might change at any time.
An obvious option could be enhancing the emitted Events with a websocket parameter that a client could use if interested. That would even allow tracking multiple Remote connections in an external integration. Existing integrations would not be affected (as far as I understand the event emitting / Python parameter handling).
But I need to think about that a bit more.
        if self._supported_entity_types is None:
            # Request supported entity types from remote
            await self._update_supported_entity_types(websocket)
        if self._supported_entity_types:
            available_entities = [
                entity
                for entity in available_entities
                if entity.get("entity_type") in self._supported_entity_types
            ]
There was a problem hiding this comment.
This is the main issue: sending another WS message inside the message callback from the websocket library, and waiting for a response.
It also looks hackish: why request the supported entity types here?
If entity types need to be restricted, it should already be known at this time.
If there's no way around it, then this would be the place to create an asyncio task to not block the websocket event loops. It's a much safer approach than putting every received message into a task.
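A hedged sketch of that narrower approach: only the one message type that needs a second round trip is moved into a task, everything else stays inline. `Dispatcher`, `_spawn` and the message names are hypothetical. The task reference is kept in a set because the event loop only holds weak references to tasks, and the done callback records errors so they are not lost.

```python
import asyncio


class Dispatcher:
    """Hypothetical message dispatcher, not the ucapi class."""

    def __init__(self) -> None:
        self._background: set[asyncio.Task] = set()
        self.log: list[str] = []

    def _spawn(self, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._background.add(task)  # strong reference until the task is done
        task.add_done_callback(self._on_done)
        return task

    def _on_done(self, task: asyncio.Task) -> None:
        self._background.discard(task)
        if not task.cancelled() and task.exception() is not None:
            self.log.append(f"error: {task.exception()!r}")

    async def process(self, msg: str) -> None:
        if msg == "get_available_entities":
            # The only message that waits for another request: run it aside.
            self._spawn(self._slow_reply(msg))
        else:
            self.log.append(f"handled {msg} inline")

    async def _slow_reply(self, msg: str) -> None:
        await asyncio.sleep(0.01)  # stands in for a request to the remote
        self.log.append(f"replied to {msg}")

    async def drain(self) -> None:
        """Wait for outstanding background work, e.g. before shutdown."""
        if self._background:
            await asyncio.gather(*self._background, return_exceptions=True)


async def demo() -> list[str]:
    dispatcher = Dispatcher()
    await dispatcher.process("get_available_entities")
    await dispatcher.process("get_driver_version")
    await dispatcher.drain()
    return dispatcher.log
```

Note the ordering consequence: the inline message is handled before the background reply completes, which is the kind of runtime change that would need testing if it were applied to every message.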
|
Regarding 1) with an additional websocket event parameter.
Goal: adding a websocket parameter to event handlers:
@api.listens_to(ucapi.Events.CONNECT)
async def on_connect(websocket) -> None:
    cfg = await api.get_localization_cfg(websocket)
    print(f"Got localization config: {cfg}")
    await api.set_device_state(ucapi.DeviceStates.CONNECTED)
This requires adding the websocket parameter to where the events are emitted. For example:
async def _handle_ws_event_msg(
    self, websocket: Any, msg: str, msg_data: dict[str, Any] | None
) -> None:
    if msg == uc.WsMsgEvents.CONNECT:
        self._events.emit(uc.Events.CONNECT, websocket)
    elif msg == uc.WsMsgEvents.DISCONNECT:
        self._events.emit(uc.Events.DISCONNECT, websocket)
    elif msg == uc.WsMsgEvents.ENTER_STANDBY:
        self._events.emit(uc.Events.ENTER_STANDBY, websocket)
    elif msg == uc.WsMsgEvents.EXIT_STANDBY:
        self._events.emit(uc.Events.EXIT_STANDBY, websocket)
    # ...
Unfortunately this doesn't work with a typical CONNECT event handler in an integration driver:
@api.listens_to(ucapi.Events.CONNECT)
async def on_connect() -> None:
    await api.set_device_state(ucapi.DeviceStates.CONNECTED)
Runtime error: missing parameter
Proposed solution
We could force a breaking change with a new 0.y minor library version and require all integrations to adapt. Even though this is valid with a 0-version, it's still not very developer friendly. It would also be a good idea to stop using position-based arguments when emitting events. By wrapping the event handler we can add backward compatibility. Something like:
    @staticmethod
    def _wrap_event_listener(listener: Callable) -> Callable:
        """
        Wrap an event listener to enforce a kwargs-only event API while keeping
        backward compatibility for listeners that declare fewer (or zero) params.

        Contract:
        - The library MUST emit events using keyword arguments only.
        - Positional arguments are NOT supported for event delivery.
        - Unknown kwargs are dropped unless listener accepts **kwargs.
        """
        try:
            sig = inspect.signature(listener)
        except (TypeError, ValueError):
            # If we can't introspect, just call it and hope it supports **kwargs.
            return listener

        params = list(sig.parameters.values())
        accepts_varkw = any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params)
        accepted_kw = {
            p.name
            for p in params
            if p.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
        }

        @wraps(listener)
        def wrapper(*args: Any, **kwargs: Any):
            # Enforce kwargs-only: never forward positional args to user code.
            # This prevents accidental signature breakage when adding new event params.
            if args:
                raise TypeError(
                    "Event listeners are called with keyword arguments only. "
                    "Library bug: positional args were provided."
                )
            if accepts_varkw:
                return listener(**kwargs)
            filtered = {k: v for k, v in kwargs.items() if k in accepted_kw}
            return listener(**filtered)

        return wrapper

    def add_listener(self, event: uc.Events, f: Callable) -> None:
        """
        Register a callback handler for the given event.

        :param event: the event
        :param f: callback handler
        """
        self._events.add_listener(event, self._wrap_event_listener(f))

    def listens_to(self, event: uc.Events) -> Callable[[Callable], Callable]:
        """
        Register the given event.

        :return: a decorator which will register the decorated function to the specified
                 event.
        """

        def on(f: Callable) -> Callable:
            self._events.add_listener(event, self._wrap_event_listener(f))
            return f

        return on
This wrapper allows both client event handler signatures:
@api.listens_to(ucapi.Events.CONNECT)
async def on_connect_old() -> None:
    await api.set_device_state(ucapi.DeviceStates.CONNECTED)

@api.listens_to(ucapi.Events.CONNECT)
async def on_connect_new(websocket) -> None:
    await api.set_device_state(ucapi.DeviceStates.CONNECTED)
By also documenting the Events enum, this should provide a usable solution with minimal breakage.
class Events(str, Enum):
    """Internal library events."""

    CLIENT_CONNECTED = "client_connected"
    """WebSocket client connected.

    Parameters:
    - websocket: WebSocket client connection
    """
    CLIENT_DISCONNECTED = "client_disconnected"
    """WebSocket client disconnected.

    Parameters:
    - websocket: WebSocket client connection
    """
    ENTITY_ATTRIBUTES_UPDATED = "entity_attributes_updated"
    """Entity attributes updated.

    Parameters:
    - entity_id: entity identifier
    - entity_type: entity type
    - attributes: updated attributes
    """
    SUBSCRIBE_ENTITIES = "subscribe_entities"
    """Integration API `subscribe_events` message.

    Parameters:
    - entity_ids: list of entity IDs to subscribe to
    - websocket: WebSocket client connection
    """
    UNSUBSCRIBE_ENTITIES = "unsubscribe_entities"
    """Integration API `unsubscribe_events` message.

    Parameters:
    - entity_ids: list of entity IDs to unsubscribe
    - websocket: WebSocket client connection
    """
    CONNECT = "connect"
    """Integration-API `connect` event message.

    Parameters:
    - websocket: WebSocket client connection
    """
    DISCONNECT = "disconnect"
    """Integration-API `disconnect` event message.

    Parameters:
    - websocket: WebSocket client connection
    """
    ENTER_STANDBY = "enter_standby"
    """Integration-API `enter_standby` event message.

    Parameters:
    - websocket: WebSocket client connection
    """
    EXIT_STANDBY = "exit_standby"
    """Integration-API `exit_standby` event message.

    Parameters:
    - websocket: WebSocket client connection
    """
@albaintor unless you see a better solution, I can implement this in a separate PR, which you can then merge from main.
|
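To see the signature-filtering idea work end to end, here is a reduced, runnable variant that does not depend on the event emitter. It is a sketch only: `filter_kwargs` and the three listeners are hypothetical names, and the listeners are awaited directly instead of being registered on an emitter.

```python
import asyncio
import inspect
from typing import Any, Callable


def filter_kwargs(listener: Callable) -> Callable:
    """Return a caller that passes only the keyword arguments `listener` declares."""
    params = list(inspect.signature(listener).parameters.values())
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params):
        return listener  # accepts **kwargs: nothing to filter
    accepted = {
        p.name
        for p in params
        if p.kind
        in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
    }

    def wrapper(**kwargs: Any) -> Any:
        # For an async listener this returns the coroutine; the caller awaits it.
        return listener(**{k: v for k, v in kwargs.items() if k in accepted})

    return wrapper


async def on_connect_old() -> str:
    return "old"


async def on_connect_new(websocket) -> str:
    return f"new:{websocket}"


async def on_connect_any(**kwargs) -> str:
    return ",".join(sorted(kwargs))


async def demo() -> list[str]:
    listeners = [
        filter_kwargs(f) for f in (on_connect_old, on_connect_new, on_connect_any)
    ]
    # The same keyword-only "emit" works for all three signatures.
    return [await listener(websocket="ws1") for listener in listeners]
```

The old zero-argument handler keeps working because the unknown `websocket` keyword is dropped before the call, which is what makes adding event parameters a non-breaking change.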
Hi Markus, sounds good |
# ...
elif msg == uc.WsMessages.GET_AVAILABLE_ENTITIES:
    asyncio.create_task(self._get_available_entities(websocket, req_id))
# ...

async def _get_available_entities(self, websocket, req_id) -> None:
    if self._supported_entity_types is None:
        # Request supported entity types from remote
        await self._update_supported_entity_types(websocket)
    available_entities = self._available_entities.get_all()
    if self._supported_entity_types:
        available_entities = [
            entity
            for entity in available_entities
            if entity.get("entity_type") in self._supported_entity_types
        ]
    await self._send_ws_response(
        websocket,
        req_id,
        uc.WsMsgEvents.AVAILABLE_ENTITIES,
        {"available_entities": available_entities},
    )
That's an idea, but it should be for |
|
yes |
This is the PR I have modified from #46