[           ]
宙飒天下网		1. WebSocket接口服务
当我们希望向客户端推送数据时,websocket接口服务是我们最常用也最合适的应用层协议.当然不是说websocket不能用于请求响应模式的接口服务,只是推送数据更适合它.
python下最合适写Websocket接口服务的是包websockets,这是一个基于asyncio的websocket包.
一个典型的例子--我们每秒向客户端推送当前的时间:
- server.py:
import asyncio import datetime import json import websockets from websockets.exceptions import (     ConnectionClosedError,     ConnectionClosedOK,     ConnectionClosed )  from logger import log  ROOMS = {     "/nowtime": set() } HOST = 'localhost' PORT = 5000   async def notify_user(websocket, path, message):     """广播用户加入了房间"""     if ROOMS.get(path):         message = json.dumps({             "status": 200,             "event": "notification",             "data": message         })         await websocket.send(message)   async def join_room(websocket, path) -> bool:     room = ROOMS.get(path)     if isinstance(room, set):         room.add(websocket)         await notify_user(websocket, path, "Welcom")         return True     else:         await notify_user(websocket, path, "Unknown url")         return False   async def leave_room(websocket, path) -> bool:     room = ROOMS.get(path)     if isinstance(room, set):         if websocket in room:             room.remove(websocket)             log.info(f"Leave Room {path}")             return True         else:             log.info(f"user not in Room {path}")             return False     else:         log.info("Unknown url")         return False   async def send_time(websocket):     while True:         now = datetime.datetime.utcnow().isoformat() + "Z"         data = {             "data": now,             "event": "nowtime",             "status": 200         }         await websocket.send(json.dumps(data))         await asyncio.sleep(3)   async def listen_close(websocket, task):     async for message in websocket:         if json.loads(message).get("event") == "close":             log.info("get event close from client")             break     try:         task.cancel()     except Exception as e:         log.info(f"set task a exception error {type(e)} {str(e)}")     else:         log.info("set task a exception")   async def nowtime(websocket, path):     loop = asyncio.get_event_loop()     log.info(path)     is_in_room = await join_room(websocket, path)     if is_in_room:         task = loop.create_task(send_time(websocket))         task.add_done_callback(lambda x: log.info("done"))         listener = loop.create_task(listen_close(websocket, task))         try:             await task         except asyncio.CancelledError as ce:             log.info("client closed connection itself")             await websocket.send(json.dumps({"message":"bye"}))         except ConnectionClosedError as close_error:             log.info("client force closed connection ")         except ConnectionClosedOK as close_ok:             log.info("client closed connection")         except ConnectionClosed as close:             log.info("client closed connection finally")         except Exception as e:             raise e         finally:             listener.cancel()             await leave_room(websocket, path)   asyncio.get_event_loop().run_until_complete(     websockets.serve(nowtime, HOST, PORT))  log.info(f"server start @ {HOST}:{PORT}") asyncio.get_event_loop().run_forever() 我们通过websockets.serve(handdler,host,port)构造项目,handdler是一个有参数websocket和path的函数,其中path是访问服务时指定的uri,websocket则是客户端和服务端的连接对象.
websocket无非是两个操作:
- 监听消息: - async for message in websocket:
- 发送消息: - websocket.send(str)
这个例子中我们使用url来构造room的概念,每个连接可以加入或退出一个room.每个room实际是一个set对象,当建立连接时我们根据url判断加入的room,当客户端退出时我们将这个对象从room中删除.有了这个room来存放连接集合,我们就可以很方便的做广播--遍历room中的连接对象每个都发送相同的消息即可.
1.1. websocket客户端
我们实现收到10条信息后发送一个close事件让服务端关闭这个连接.
1.1.1. 异步客户端
websockets包同样提供了异步的客户端,:
- client_async.py:
import asyncio import json import websockets  async def hello():     uri = "ws://localhost:5000/nowtime"     async with websockets.connect(uri) as websocket:         count = 0         async for message in websocket:             if websocket.closed:                 break             if count == 10:                 await websocket.send(json.dumps({"event":"close"}))                 print("send close")             else:                 count += 1             print(message)   asyncio.get_event_loop().run_until_complete(hello()) websockets包客户端的websocket对象和服务端一致.
1.1.2. 同步客户端
websocket-client提供了同步客户端.
- client_sync.py
import json import websocket  ws = websocket.create_connection("ws://localhost:5000/nowtime") try:     count = 0     for message in ws:         if not ws.connected:             break         if count == 10:             ws.send(json.dumps({"event":"close"}))             print("send close")         else:             count += 1         print(message) except Exception:     ws.close()   


 
		 
		 
		

还没有评论,来说两句吧...