Skip to content

JSON-RPC

JSON-RPC client and data models.

JSONValueType = Union[str, int, float, bool, None] module-attribute

JSONRPCPayload

Bases: typing_extensions.TypedDict

A rpc call is represented by sending a Request object to a Server.

See: https://www.jsonrpc.org/specification#request_object

jsonrpc: str instance-attribute

id: int instance-attribute

method: str instance-attribute

params: list[JSONValueType] instance-attribute

JSONRPCResponse

Bases: typing_extensions.TypedDict

When a rpc call is made, the Server MUST reply with a Response.

See: https://www.jsonrpc.org/specification#response_object

jsonrpc: str instance-attribute

id: int instance-attribute

result: str instance-attribute

error: NotRequired[JSONRPCError] instance-attribute

JSONRPCError

Bases: typing_extensions.TypedDict

When a rpc call encounters an error.

See: https://www.jsonrpc.org/specification#error_object

code: int instance-attribute

message: str instance-attribute

data: NotRequired[Any] instance-attribute

JSONRPCResponseError(error: JSONRPCError)

Bases: Exception

Exception used when a rpc call encounters an error.

Source code in pytvpaint/george/client/rpc.py
60
61
def __init__(self, error: JSONRPCError) -> None:
    super().__init__(f"JSON-RPC Server error ({error['code']}): {error['message']}")

JSONRPCClient(url: str, timeout: int | None = 60, version: str = '2.0')

Simple JSON-RPC 2.0 client over websockets with thread-safe automatic reconnection.

See: https://www.jsonrpc.org/specification#notification

Initialize a new JSON-RPC client with a WebSocket url endpoint.

Parameters:

Name Type Description Default
url str

the WebSocket url endpoint

required
timeout int | None

the continuous reconnection timeout threshold

60
version str

The JSON-RPC version. Defaults to "2.0".

'2.0'
Source code in pytvpaint/george/client/rpc.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def __init__(self, url: str, timeout: int | None = 60, version: str = "2.0") -> None:
    """Initialize a new JSON-RPC client with a WebSocket url endpoint.

    Args:
        url: the WebSocket url endpoint
        timeout: the continuous reconnection timeout threshold
        version: The JSON-RPC version. Defaults to "2.0".
    """
    self.ws_handle = WebSocket()
    self.url = url
    self.rpc_id = 0
    self.timeout = timeout
    self.jsonrpc_version = version

    self.stop_ping = threading.Event()
    self.run_forever = False
    self.ping_thread: threading.Thread | None = None

ws_handle = WebSocket() instance-attribute

url = url instance-attribute

rpc_id = 0 instance-attribute

timeout = timeout instance-attribute

jsonrpc_version = version instance-attribute

stop_ping = threading.Event() instance-attribute

run_forever = False instance-attribute

ping_thread: threading.Thread | None = None instance-attribute

is_connected: bool property

Returns True if the client is connected and the socket is physically active.

connect() -> None

Connects to the WebSocket endpoint and initializes the monitoring thread.

Source code in pytvpaint/george/client/rpc.py
136
137
138
139
140
141
142
143
144
145
146
def connect(self) -> None:
    """Connects to the WebSocket endpoint and initializes the monitoring thread."""
    if self.timeout == 0:
        raise ValueError("Timeout cannot be 0. Use None for infinite blocking or > 0 for a timed block.")
    self.ws_handle.connect(self.url, timeout=self.timeout)

    if not self.ping_thread or not self.ping_thread.is_alive():
        self.stop_ping.clear()
        self.run_forever = True
        self.ping_thread = threading.Thread(target=self._auto_reconnect, daemon=True)
        self.ping_thread.start()

disconnect() -> None

Disconnects from the server and halts the monitoring thread.

Source code in pytvpaint/george/client/rpc.py
148
149
150
151
152
153
154
155
156
def disconnect(self) -> None:
    """Disconnects from the server and halts the monitoring thread."""
    self.run_forever = False
    self.stop_ping.set()

    if self.ping_thread and self.ping_thread.is_alive():
        self.ping_thread.join(timeout=2)

    self.ws_handle.close()

increment_rpc_id() -> None

Increments the internal RPC id until it reaches sys.maxsize.

Source code in pytvpaint/george/client/rpc.py
158
159
160
def increment_rpc_id(self) -> None:
    """Increments the internal RPC id until it reaches `sys.maxsize`."""
    self.rpc_id = (self.rpc_id + 1) % sys.maxsize

execute_remote(method: str, params: list[JSONValueType] | None = None) -> JSONRPCResponse

Executes a remote procedure call synchronously.

Parameters:

Name Type Description Default
method str

the name of the method to be invoked

required
params list[pytvpaint.george.client.rpc.JSONValueType] | None

the parameter values to be used during the invocation of the method. Defaults to None.

None

Raises:

Type Description
ConnectionError

if the client is not connected

pytvpaint.george.client.rpc.JSONRPCResponseError

if there was an error server-side

Returns:

Name Type Description
JSONRPCResponse pytvpaint.george.client.rpc.JSONRPCResponse

the JSON-RPC response payload

Source code in pytvpaint/george/client/rpc.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def execute_remote(
    self,
    method: str,
    params: list[JSONValueType] | None = None,
) -> JSONRPCResponse:
    """Executes a remote procedure call synchronously.

    Args:
        method: the name of the method to be invoked
        params: the parameter values to be used during the invocation of the method. Defaults to None.

    Raises:
        ConnectionError: if the client is not connected
        JSONRPCResponseError: if there was an error server-side

    Returns:
        JSONRPCResponse: the JSON-RPC response payload
    """
    if not self.is_connected:
        raise ConnectionError(f"Can't send rpc message because the client is not connected to {self.url}")

    payload: JSONRPCPayload = {
        "jsonrpc": self.jsonrpc_version,
        "id": self.rpc_id,
        "method": method,
        "params": params or [],
    }

    self.ws_handle.send(json.dumps(payload))
    self.increment_rpc_id()

    result = self.ws_handle.recv()
    response = cast(JSONRPCResponse, json.loads(result))

    if "error" in response:
        raise JSONRPCResponseError(response["error"])

    return response