async def simple_asgi_app(scope, receive, send):
    """Minimal function-based ASGI application that answers HTTP requests.

    Sends a fixed ``200`` plain-text response for every ``http`` scope.
    Any other scope type is ignored: the coroutine returns without
    sending anything.

    Args:
        scope: Dictionary with connection information; only
            ``scope['type']`` is read here.
        receive: Async callable for receiving messages (not used by this
            application).
        send: Async callable used to send the response messages.
    """
    # Guard clause: this application only knows how to speak HTTP.
    if scope['type'] != 'http':
        return

    # The response start message (status + headers) must precede the body.
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [[b'content-type', b'text/plain']],
    })
    await send({
        'type': 'http.response.body',
        'body': b'Hello, ASGI World!',
    })
# Class-based ASGI application
class ASGIApp:
    """Class-based ASGI application handling HTTP and WebSocket scopes.

    Instances are callable with the ASGI ``(scope, receive, send)``
    signature. HTTP requests get a fixed HTML response; WebSocket
    connections are accepted and every received message is echoed back.
    Scope types other than ``http`` and ``websocket`` are ignored.
    """

    async def __call__(self, scope, receive, send):
        """Dispatch the connection to the handler for its scope type.

        Args:
            scope: Dictionary with connection information; ``scope['type']``
                selects the handler.
            receive: Async callable for receiving messages.
            send: Async callable for sending messages.
        """
        scope_type = scope['type']
        if scope_type == 'http':
            await self.handle_http(scope, receive, send)
        elif scope_type == 'websocket':
            await self.handle_websocket(scope, receive, send)

    async def handle_http(self, scope, receive, send):
        """Send a fixed ``200`` HTML response.

        Args:
            scope: Connection scope (not inspected by this handler).
            receive: Async callable for receiving messages (not used).
            send: Async callable used to send the response messages.
        """
        response_body = "\nASGI 应用\n".encode('utf-8')
        await send({
            'type': 'http.response.start',
            'status': 200,
            # The body is UTF-8 encoded and contains non-ASCII text, so the
            # charset is declared explicitly to keep clients from guessing.
            'headers': [[b'content-type', b'text/html; charset=utf-8']],
        })
        await send({
            'type': 'http.response.body',
            'body': response_body,
        })

    async def handle_websocket(self, scope, receive, send):
        """Accept the WebSocket and echo messages until disconnect.

        Each ``websocket.receive`` message is answered with a text frame
        ``"Echo: <text>"``. Messages of other types are ignored, and
        ``websocket.disconnect`` ends the loop.

        Args:
            scope: Connection scope (not inspected by this handler).
            receive: Async callable yielding incoming WebSocket messages.
            send: Async callable used to send WebSocket messages.
        """
        await send({'type': 'websocket.accept'})
        while True:
            message = await receive()
            message_type = message['type']
            if message_type == 'websocket.disconnect':
                break
            if message_type == 'websocket.receive':
                # ``text`` may be absent or explicitly None (e.g. for a
                # binary frame); treat both as an empty string rather than
                # echoing the literal "None".
                text = message.get('text') or ''
                await send({
                    'type': 'websocket.send',
                    'text': f"Echo: {text}",
                })
# Module-level ASGIApp instance.
# NOTE(review): presumably the callable an ASGI server is pointed at
# (e.g. "module:app") — confirm against the deployment configuration.
app = ASGIApp()