[           ]
宙飒天下		1. XMLRPC接口服务
XMLRPC是python标准库xmlrpc提供的RPC框架,一般我们会根据不同的需求配合使用标准库中的socketserver.ThreadingMixIn或者socketserver.ForkingMixIn来使用多线程或者多进程(注意只能在支持fork的平台使用)处理请求.注意XMLRPC并不安全,需要防止被人利用XML漏洞
下面是一个简单的例子,我们把它放在C0中.这个例子我们计算输入的字符串的md5哈希值
1.1. 最简单的服务端
logger.py用于定义结构化log. xmlrpc默认的会使用方法SimpleXMLRPCRequestHandler.log_error(format, *args)和SimpleXMLRPCRequestHandler.log_request(code='-', size="-")打印请求信息和错误信息到stderr,默认的打印方式是plain text格式的,这种非结构化的数据往往难以收集分析,我们使用structlog将其结构化.
import sys import logging import structlog LOG_LEVEL = logging.INFO SERVER_NAME = "xmlrpc-server" structlog.configure(     processors=[         structlog.stdlib.filter_by_level,  # 判断是否接受某个level的log消息         structlog.stdlib.add_logger_name,  # 增加字段logger         structlog.stdlib.add_log_level,  # 增加字段level         structlog.stdlib.PositionalArgumentsFormatter(),         structlog.processors.TimeStamper(fmt="iso"),  # 增加字段timestamp且使用iso格式输出         structlog.processors.StackInfoRenderer(),         structlog.processors.format_exc_info,  # 捕获异常的栈信息         structlog.processors.StackInfoRenderer(),  # 详细栈信息         structlog.processors.JSONRenderer()  # json格式输出,第一个参数会被放入event字段     ],     context_class=dict,     logger_factory=structlog.stdlib.LoggerFactory(),     wrapper_class=structlog.stdlib.BoundLogger,     cache_logger_on_first_use=True, )  handler = logging.StreamHandler(sys.stdout) root_logger = logging.getLogger() root_logger.addHandler(handler) root_logger.setLevel(LOG_LEVEL)  # 设置最低log等级 log = structlog.get_logger(SERVER_NAME) server.py用于实现算法逻辑.
import time from hashlib import md5 from http import HTTPStatus from xmlrpc.server import SimpleXMLRPCServer from xmlrpc.server import SimpleXMLRPCRequestHandler from socketserver import ThreadingMixIn from logger import log   HOST = "localhost" PORT = 5000  class RequestHandler(SimpleXMLRPCRequestHandler):     rpc_paths = ('/XMLRPC',)     def log_error(self, format, *args):         """Log an error.          This is called when a request cannot be fulfilled.  By         default it passes the message on to log_message().          Arguments are the same as for log_message().          XXX This should go to the separate error log.          """         log.error("server error",address=self.address_string(),errormsg=format%args)      def log_request(self, code='-', size="-"):         """Log an accepted request.          This is called by send_response().          """         if isinstance(code, HTTPStatus):             code = code.value         log.info("request info",address=self.address_string(),requestline=self.requestline, code=str(code), size=str(size))  class ThreadingXMLRPCServer(ThreadingMixIn, SimpleXMLRPCServer):     pass   def md5_func(text: str)->str:     """md5哈希."""     start = time.time()     result = md5(text.encode("utf-8")).hexdigest()     end = time.time()     log.info("time it",seconds=end-start)     return result   def main():     with ThreadingXMLRPCServer((HOST, PORT), requestHandler=RequestHandler, allow_none=True) as server:         # 注册所有可调用函数的名字到system.listMethods方法         # 注册可调用函数的docstring到system.methodHelp(func_name)方法         # 注册可调用函数的签名到system.methodSignature(func_name)方法         server.register_introspection_functions()          # 这个函数的作用是可以使客户端同时调用服务端的的多个函数。         server.register_multicall_functions()          # 注册一个函数,使它可以被调用,后面的字符串就是被调用的函数名         server.register_function(md5_func, md5_func.__name__)         try:             log.info("server start", msg=f"xmlrpc start @ {HOST}:{PORT}!")             server.serve_forever()         except Exception:             log.info("server error", exc_info=True, stack_info=True)             raise         finally:             log.info("server stoped", msg=f"xmlrpc @ {HOST}:{PORT} stoped!")   if __name__ == "__main__":     main() xmlrpc只要将函数注册到server对象,然后执行server_forever()方法即可.
1.2. 最简单的客户端
import xmlrpc.client  HOST = "localhost" PORT = 5000  with xmlrpc.client.ServerProxy(f"http://{HOST}:{PORT}/XMLRPC") as proxy:     result = proxy.md5_func("这是一个测试")  print(result) 这个例子中我们使用多线程以提高rpc服务的并发性能,但一般来说rpc是计算密集型任务,并发上去并不解决问题,更关键的是将每个核利用起来,我们当然也可以多起实例通多代理做负载均衡,但更简单的方案是利用多核将重计算的部分放在多进行中执行.
1.3. 利用多核
在多进程部分我们讲过如何使用concurrent.futures进行高层抽象的多进程操作,这边我们还是使用这种方式,代码C1展示了如何改造上面的例子
- server.py
... from concurrent.futures import ProcessPoolExecutor,wait  ...  WORKER = 4  ...  # 注意此处修改函数名字 def _md5_func(text: str)->str:     """md5哈希."""     start = time.time()     result = md5(text.encode("utf-8")).hexdigest()     end = time.time()     log.info("time it",seconds=end-start)     return result   def main():     with ProcessPoolExecutor(WORKER) as executor:         # 将要执行的业务包装,实际执行交给执行器         def md5_func(text:str)->str:             fut = executor.submit(_md5_func, text)             wait([fut])             return fut.result()          with ThreadingXMLRPCServer((HOST, PORT), requestHandler=RequestHandler, allow_none=True) as server:             # 注册所有可调用函数的名字到system.listMethods方法             # 注册可调用函数的docstring到system.methodHelp(func_name)方法             # 注册可调用函数的签名到system.methodSignature(func_name)方法             server.register_introspection_functions()              # 这个函数的作用是可以使客户端同时调用服务端的的多个函数。             server.register_multicall_functions()              # 注册一个函数,使它可以被调用,后面的字符串就是被调用的函数名             server.register_function(md5_func, md5_func.__name__)             try:                 log.info("server start", msg=f"xmlrpc start @ {HOST}:{PORT}!")                 server.serve_forever()             except Exception:                 log.info("server error", exc_info=True, stack_info=True)                 raise             finally:                 log.info("server stoped", msg=f"xmlrpc @ {HOST}:{PORT} stoped!")   if __name__ == "__main__":     main() 


 
		 
		 
		

还没有评论,来说两句吧...