Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,5 @@ jobs:
run: uv run pytest
env:
WEBHOOK_SERVER_URL: ${{ steps.tunnel.outputs.url }}
FISHJAM_ID: ${{ secrets.CI_FISHJAM_ID }}
FISHJAM_ID: ${{ vars.CI_FISHJAM_ID }}
FISHJAM_MANAGEMENT_TOKEN: ${{ secrets.CI_FISHJAM_MANAGEMENT_TOKEN }}
6 changes: 2 additions & 4 deletions examples/room_manager/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from flask import Flask, abort, jsonify, request
from room_service import RoomService

from fishjam import receive_binary
from fishjam import decode_server_notifications
from fishjam.room import RoomType


Expand Down Expand Up @@ -36,9 +36,7 @@ def get_room_query():

@app.post("/api/rooms/webhook")
def webhook():
notification = receive_binary(request.data)

if notification:
for notification in decode_server_notifications(request.data):
room_service.handle_notification(notification)

return "Webhook Notification Received", 200
3 changes: 2 additions & 1 deletion fishjam/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from fishjam._openapi_client.models import PeerMetadata

# API
from fishjam._webhook_notifier import receive_binary
from fishjam._webhook_notifier import decode_server_notifications, receive_binary
from fishjam._ws_notifier import FishjamNotifier
from fishjam.api._fishjam_client import (
AgentOptions,
Expand All @@ -31,6 +31,7 @@
__all__ = [
"FishjamClient",
"FishjamNotifier",
"decode_server_notifications",
"receive_binary",
"PeerMetadata",
"PeerOptions",
Expand Down
91 changes: 83 additions & 8 deletions fishjam/_webhook_notifier.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,107 @@
"""Module for decoding received webhook notifications from Fishjam."""

from typing import Union
import warnings
from typing import List, Union

import betterproto

from fishjam.events._protos.fishjam import ServerMessage
from fishjam.events._protos.fishjam import (
ServerMessage,
ServerMessageNotificationBatch,
)
from fishjam.events.allowed_notifications import (
ALLOWED_NOTIFICATIONS,
AllowedNotification,
)


def receive_binary(binary: bytes) -> Union[AllowedNotification, None]:
def _content_of(message: ServerMessage) -> Union[AllowedNotification, None]:
"""Return the message's `content` oneof if it is a supported notification."""
_which, content = betterproto.which_one_of(message, "content")
if isinstance(content, ALLOWED_NOTIFICATIONS):
return content
return None


def _unpack_batch(
batch: ServerMessageNotificationBatch,
) -> List[AllowedNotification]:
"""Flatten a notification batch into its supported notifications, in order.

Members whose content is not a supported notification are skipped.

Returns:
list[AllowedNotification]: The supported notifications from the batch.
"""
notifications = []
for message in batch.notifications:
notification = _content_of(message)
if notification is not None:
notifications.append(notification)
return notifications


def decode_server_notifications(binary: bytes) -> List[AllowedNotification]:
"""Decode a received protobuf payload into a list of notifications.

Handles both single notifications and batches transparently: a single
notification is returned as a one-element list, a batch is unpacked into
its members (in order), and anything unsupported yields an empty list.

The available notifications are listed in the `fishjam.events` module.

Args:
binary: The raw binary data received from the webhook.

Returns:
list[AllowedNotification]: The decoded notifications, in order. Empty
when the payload carries no supported notification.
"""
message = ServerMessage().parse(binary)
_which, content = betterproto.which_one_of(message, "content")

if isinstance(content, ServerMessageNotificationBatch):
return _unpack_batch(content)

if isinstance(content, ALLOWED_NOTIFICATIONS):
return [content]

return []


def receive_binary(
binary: bytes,
) -> Union[AllowedNotification, List[AllowedNotification], None]:
"""Transforms a received protobuf notification into a notification instance.
Comment thread
roznawsk marked this conversation as resolved.

.. deprecated::
Use `decode_server_notifications` instead, which always returns a list
and handles batched payloads with a single, consistent return type.

The available notifications are listed in `fishjam.events` module.

Args:
binary: The raw binary data received from the webhook.

Returns:
AllowedNotification | None: The parsed notification object, or None if
the message type is not supported.
AllowedNotification: A single notification when the payload carries one.
list[AllowedNotification]: The unpacked notifications, in order, when the
payload is a batch (webhook batching enabled).
None: When the payload is not a supported notification.
"""
warnings.warn(
"receive_binary is deprecated; use decode_server_notifications instead.",
DeprecationWarning,
stacklevel=2,
)

message = ServerMessage().parse(binary)
_which, message = betterproto.which_one_of(message, "content")
_which, content = betterproto.which_one_of(message, "content")

if isinstance(content, ServerMessageNotificationBatch):
return _unpack_batch(content)

if isinstance(message, ALLOWED_NOTIFICATIONS):
return message
if isinstance(content, ALLOWED_NOTIFICATIONS):
return content

return None
6 changes: 6 additions & 0 deletions fishjam/api/_fishjam_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ class RoomOptions:
room_type: The use-case of the room. If not provided, this defaults
to conference.
public: True if livestream viewers can omit specifying a token.
batch_webhook_notifications: If true, webhook notifications for this room
are coalesced into a single NotificationBatch per HTTP send instead
of one request per notification.
Comment thread
roznawsk marked this conversation as resolved.
"""

max_peers: int | None = None
Expand All @@ -107,6 +110,8 @@ class RoomOptions:
"""The use-case of the room. If not provided, this defaults to conference."""
public: bool = False
"""True if livestream viewers can omit specifying a token."""
batch_webhook_notifications: bool = False
"""Coalesce webhook notifications into a single NotificationBatch per send."""
Comment thread
roznawsk marked this conversation as resolved.


@dataclass
Expand Down Expand Up @@ -309,6 +314,7 @@ def create_room(self, options: RoomOptions | None = None) -> Room:
webhook_url=options.webhook_url,
room_type=RoomType(options.room_type),
public=options.public,
batch_webhook_notifications=options.batch_webhook_notifications,
)

room = cast(
Expand Down
7 changes: 3 additions & 4 deletions tests/support/webhook_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from flask import Flask, Response, request

from fishjam import receive_binary
from fishjam import decode_server_notifications

app = Flask(__name__)
QUEUES = None
Expand All @@ -16,10 +16,9 @@ def respond_default():
@app.route("/webhook", methods=["POST"])
def respond_root():
data = request.get_data()
msg = receive_binary(data)
if msg is not None:
for notification in decode_server_notifications(data):
for q in QUEUES.values():
q.put(msg)
q.put(notification)

return Response(status=200)

Expand Down
3 changes: 2 additions & 1 deletion tests/test_allowed_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
"ServerMessageTrackForwarding",
"ServerMessageTrackForwardingRemoved",
"ServerMessageVadNotification",
# Webhook-only transport wrapper; the WebSocket notifier never receives it.
# Transport wrapper, not a user-facing event: `decode_server_notifications`
# unpacks it into the individual notifications it carries.
"ServerMessageNotificationBatch",
# Deprecated in the proto.
"ServerMessageStreamConnected",
Expand Down
30 changes: 30 additions & 0 deletions tests/test_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,36 @@ async def test_peer_connected_room_deleted(

self.assert_webhook_events(event_checks, event_queue, room.id)

@pytest.mark.asyncio
async def test_batched_webhook_notifications(
self, room_api: FishjamClient, event_queue
):
Comment thread
roznawsk marked this conversation as resolved.
event_checks = [
ServerMessageRoomCreated,
ServerMessagePeerAdded,
ServerMessagePeerConnected,
ServerMessagePeerDisconnected,
ServerMessagePeerDeleted,
ServerMessageRoomDeleted,
]

options = RoomOptions(webhook_url=WEBHOOK_URL, batch_webhook_notifications=True)
room = room_api.create_room(options=options)
peer, token = room_api.create_peer(room.id)
Comment thread
roznawsk marked this conversation as resolved.

peer_socket = PeerSocket(fishjam_url=FISHJAM_ID)
peer_socket_task = asyncio.ensure_future(peer_socket.connect(token))
try:
await peer_socket.wait_ready()

room_api.delete_peer(room.id, peer.id)
room_api.delete_room(room.id)
finally:
peer_socket_task.cancel()
await asyncio.gather(peer_socket_task, return_exceptions=True)

self.assert_webhook_events(event_checks, event_queue, room.id)

def assert_webhook_events(self, event_checks, event_queue, room_id, timeout=60):
deadline = time.monotonic() + timeout
received = []
Expand Down
Loading
Loading