使用
基本的使用
在 examples/basic/main.py
中创建一个 FastAPI 应用,并定义一些 JSON-RPC 方法:
from fastapi import FastAPI
from pydantic import BaseModel, Field
from starlette.requests import Request
from krpc import Entrypoint, RpcException, RpcErrorCode
class OperationParams(BaseModel):
a: int = Field(..., json_schema_extra={"example": 3}, description='A 变量')
b: int = Field(..., json_schema_extra={"example": 3}, description='B 变量')
app = FastAPI(title="Krpc API")
# 创建一个 JSON-RPC 入口点
api_v1 = Entrypoint('/api/v1/jsonrpc')
# 处理全局异常
@app.exception_handler(RpcException)
async def unicorn_exception_handler(request: Request, exc: RpcException):
message = api_v1.get_message(request)
return message.response_handle(error=exc.to_dict)
# 定义一个 JSON-RPC 方法 add
@api_v1.method
async def add(params: OperationParams, speak: str) -> int:
print(speak)
if params.a is None or params.b is None:
raise RpcException.parse(RpcErrorCode.INVALID_PARAMS)
return params.a + params.b
# 定义一个 JSON-RPC 方法 subtract
@api_v1.method
async def subtract(params: OperationParams) -> int:
if params.a is None or params.b is None:
raise RpcException.parse(RpcErrorCode.INVALID_PARAMS)
return params.a - params.b
# 将 JSON-RPC 入口点注册到 FastAPI 应用中
app.include_router(api_v1)
# 运行应用
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
运行示例
确保已经安装了依赖,可以运行以下命令来启动服务:
python examples/basic/main.py
现在,你可以发送 JSON-RPC 请求到 http://localhost:8000/api/v1/jsonrpc
来调用定义的方法。例如:
客户端
在 examples/basic/client.py
中编写的代码:
import asyncio
from pydantic import BaseModel
from krpc import RpcClient
class OperationParams(BaseModel):
a: int
b: int
class AddParams(BaseModel):
params: OperationParams
speak: str
model_config = {
'method_name': 'add'
}
async def main():
rpc_client = RpcClient(url="http://127.0.0.1:8000/api/v1/jsonrpc")
params = AddParams(params=OperationParams(a=1, b=2), speak="hello")
response = rpc_client.call_model(params)
print(f"同步调用结果: {response}")
response = await rpc_client.call_model_async(params)
print(f"异步调用结果: {response}")
response = rpc_client.call('add', {'params': {"a": 1, "b": 2}})
print(f"同步少参调用结果: {response}")
asyncio.run(main())
运行:
python examples/basic/client.py
如果不出意外你会看到类似如下输出:
同步调用结果: {'id': '8ad9a6cc-d332-4c3f-b6f8-ba734b773a34', 'result': 3, 'error': None}
异步调用结果: {'id': 'da25ef99-5fc4-4e01-8393-b3afad5a0eaa', 'result': 3, 'error': None}
同步少参调用结果: {'id': '16a913d6-8930-4178-ae1c-ce9ee757883e', 'result': None, 'error': {'code': -32602, 'message': 'Invalid params', 'data': 'Missing required parameter: speak'}}
自定义消息解码器
在 examples/cust_messages/cust_messages.py
例子中继承于 Message 编写自定义 GzipJsonMessage 消息解码器:
import io
import json
from typing import Any, Dict
from krpc import Message, message_management
import gzip
# 自定义加解码器
class GzipJsonMessage(Message):
rpc_media_type = "gzip-json"
def decode(self, data: bytes) -> Dict[str, Any] | None:
"""
使用gzip解压缩数据,并尝试将其反序列化为字典
:param data: 压缩的字节数据
:return: 反序列化后的字典
"""
# 使用gzip解压缩
buffer = io.BytesIO(data)
with gzip.GzipFile(fileobj=buffer, mode='rb') as un_gzipped_data:
data_json_bytes = un_gzipped_data.read()
# 将解压缩后的字节串反序列化为字典
data_dict = json.loads(data_json_bytes.decode('utf-8'))
return data_dict
def encode(self, data: Dict[str, Any]) -> bytes | None:
# 将字典转换为JSON格式的字节串
data_json_bytes = json.dumps(data).encode('utf-8')
buffer = io.BytesIO()
with gzip.GzipFile(fileobj=buffer, mode='w') as gzipped_data:
gzipped_data.write(data_json_bytes)
return buffer.getvalue()
# 添加 GzipJsonMessage 实例到 message_management 字典中
message_management['gzip-json'] = GzipJsonMessage()
在 examples/cust_messages/main.py
例子中创建一个 FastAPI 应用,并定义一些 GzipJSON-RPC 方法:
from fastapi import FastAPI
from pydantic import BaseModel, Field
from starlette.requests import Request
from krpc import Entrypoint, RpcException, RpcErrorCode
from cust_messages import message_management
class OperationParams(BaseModel):
a: int = Field(..., json_schema_extra={"example": 3}, description='A 变量')
b: int = Field(..., json_schema_extra={"example": 3}, description='B 变量')
app = FastAPI(title="Krpc API")
# 创建一个 GzipJSON-RPC 入口点
api_v1 = Entrypoint(
'/api/v1/gzip/jsonrpc',
cust_messages=message_management # <== 添加我们自定义消息加解码器管理
)
# 处理全局异常
@app.exception_handler(RpcException)
async def unicorn_exception_handler(request: Request, exc: RpcException):
message = api_v1.get_message(request)
return message.response_handle(error=exc.to_dict)
# 定义一个 GzipJSON-RPC 方法 add
@api_v1.method
async def add(params: OperationParams, speak: str) -> int:
print(speak)
if params.a is None or params.b is None:
raise RpcException.parse(RpcErrorCode.INVALID_PARAMS)
return params.a + params.b
# 定义一个 GzipJSON-RPC 方法 subtract
@api_v1.method
async def subtract(params: OperationParams) -> int:
if params.a is None or params.b is None:
raise RpcException.parse(RpcErrorCode.INVALID_PARAMS)
return params.a - params.b
# 将 GzipJSON-RPC 入口点注册到 FastAPI 应用中
app.include_router(api_v1)
# 运行应用
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
运行示例
确保已经安装了依赖,可以运行以下命令来启动服务:
python examples/cust_messages/main.py
现在,你可以发送 GzipJSON-RPC 请求到 http://localhost:8000/api/v1/gzip/jsonrpc
来调用定义的方法。例如:
客户端
在 examples/cust_messages/client.py
中编写的代码:
import asyncio
from pydantic import BaseModel
from krpc import RpcClient
class OperationParams(BaseModel):
a: int
b: int
class AddParams(BaseModel):
params: OperationParams
speak: str
model_config = {
'method_name': 'add'
}
async def main():
rpc_client = RpcClient(
url="http://127.0.0.1:8000/api/v1/gzip/jsonrpc",
rpc_media_type='gzip-json', # <== 指定我们自定义的加解码器
)
params = AddParams(params=OperationParams(a=1, b=2), speak="hello")
response = rpc_client.call_model(params)
print(f"同步调用结果: {response}")
response = await rpc_client.call_model_async(params)
print(f"异步调用结果: {response}")
response = rpc_client.call('add', {'params': {"a": 1, "b": 2}})
print(f"同步少参调用结果: {response}")
asyncio.run(main())
运行:
python examples/cust_messages/client.py
如果不出意外你会看到类似如下输出:
同步调用结果: {'id': '8ad9a6cc-d332-4c3f-b6f8-ba734b773a34', 'result': 3, 'error': None}
异步调用结果: {'id': 'da25ef99-5fc4-4e01-8393-b3afad5a0eaa', 'result': 3, 'error': None}
同步少参调用结果: {'id': '16a913d6-8930-4178-ae1c-ce9ee757883e', 'result': None, 'error': {'code': -32602, 'message': 'Invalid params', 'data': 'Missing required parameter: speak'}}
消息编码器支持
krpc
内置支持以下几种消息编码器允许编码器自定义,以满足多样化的数据传输和存储需求:
编码器名称 |
媒体类型 |
特点 |
适用场景 |
---|---|---|---|
JSON |
|
简洁、可读性强、广泛支持 |
数据交换、配置文件、日志记录 |
MessagePack |
|
高效、体积小、速度快 |
性能关键型应用、移动应用、游戏 |