Contents
Nameko 内置扩展 (Built-in Extensions)
Nameko 包含许多内置扩展。本节介绍它们,并给出其用法的简要示例。
RPC
Nameko 包含了一个 RPC over AMQP 的实现。它包括 @rpc
装饰器的入口点,一个用于与其他服务对话的服务的代理 (proxy) ,以及非 Nameko 客户端可用来对集群进行 RPC 调用的独立代理:
服务端
from nameko.rpc import rpc, RpcProxy
class ServiceY:
name = "service_y"
@rpc
def append_identifier(self, value):
return u"{}-y".format(value)
class ServiceX:
name = "service_x"
y = RpcProxy("service_y")
@rpc
def remote_method(self, value):
res = u"{}-x".format(value)
return self.y.append_identifier(res
客户端
from nameko.standalone.rpc import ClusterRpcProxy
config = {
'AMQP_URI': AMQP_URI # e.g. "pyamqp://guest:guest@localhost"
}
with ClusterRpcProxy(config) as cluster_rpc:
cluster_rpc.service_x.remote_method("hellø") # "hellø-x-y"
正常的RPC调用会阻塞,直到远程方法完成为止,但是代理也具有异步调用模式来后台或并行化RPC调用:
with ClusterRpcProxy(config) as cluster_rpc:
hello_res = cluster_rpc.service_x.remote_method.call_async("hello")
world_res = cluster_rpc.service_x.remote_method.call_async("world")
# do work while waiting
hello_res.result() # "hello-x-y"
world_res.result() # "world-x-y"
在具有多个目标服务实例的群集中,RPC请求实例之间的轮询。该请求将仅由目标服务的一个实例处理。
仅在成功处理请求后,才会确认 AMQP 消息。如果服务未能确认消息并且AMQP连接已关闭(例如,如果服务进程被终止),则代理将撤消然后将消息分配给可用的服务实例。
事件(发布-订阅模式) / Event (Pub-Sub)
Nameko Events 是一个异步消息传递系统,实现了 Publish-Subscriber 模式。服务调度可能被零个或多个其他事件接收的事件:
from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc
class ServiceA:
""" Event dispatching service. """
name = "service_a"
dispatch = EventDispatcher()
@rpc
def dispatching_method(self, payload):
self.dispatch("event_type", payload)
class ServiceB:
""" Event listening service. """
name = "service_b"
@event_handler("service_a", "event_type")
def handle_event(self, payload):
print("service b received:", payload)
EventHandler 入口点有三种 handler_types
, 这三种分别表示确定的事件消息是如何在集群中收到的:
- SERVICE_POOL – 事件处理程序按服务名称进行池化,每个池中的一个实例接收事件,类似于RPC入口点的群集行为。这是默认的处理程序类型。
- BROADCAST – 每个侦听服务实例将接收事件。
- SINGLETON – 恰好一个侦听服务实例将接收该事件。
使用 BROADCAST
模式的示例:
from nameko.events import BROADCAST, event_handler
class ListenerService:
name = "listener"
@event_handler(
"monitor", "ping", handler_type=BROADCAST, reliable_delivery=False
)
def ping(self, payload):
# all running services will respond
print("pong from {}".format(self.name))
HTTP
HTTP入口点是基于 werkzeug 开发的,并支持所有标准HTTP方法(GET / POST / DELETE / PUT等)
HTTP 入口点可以为单个 URL 指定多个 HTTP 方法作为逗号分隔的列表。请参见下面的示例。
服务方法必须返回以下之一:
- a string, which becomes the response body
- a 2-tuple (status code, response body)
- a 3-tuple (status_code, headers dict, response body)
- an instance of werkzeug.wrappers.Response
示例如下
# http.py
import json
from nameko.web.handlers import http
class HttpService:
name = "http_service"
@http('GET', '/get/<int:value>')
def get_method(self, request, value):
return json.dumps({'value': value})
@http('POST', '/post')
def do_post(self, request):
return u"received: {}".format(request.get_data(as_text=True))
@http('GET,PUT,POST,DELETE', '/multi')
def do_multi(self, request):
return request.method
运行 HTTP 微服务
$ nameko run http
starting services: http_service
测试相关功能 1
$ curl -i localhost:8000/get/42
HTTP/1.1 200 OK
Content-Type: text/plain; charset=utf-8
Content-Length: 13
Date: Fri, 13 Feb 2015 14:51:18 GMT
{'value': 42}
测试相关功能 2
$ curl -i -d "post body" localhost:8000/post
HTTP/1.1 200 OK
Content-Type: text/plain; charset=utf-8
Content-Length: 19
Date: Fri, 13 Feb 2015 14:55:01 GMT
received: post body
更复杂的示例
# advanced_http.py
from nameko.web.handlers import http
from werkzeug.wrappers import Response
class Service:
name = "advanced_http_service"
@http('GET', '/privileged')
def forbidden(self, request):
return 403, "Forbidden"
@http('GET', '/headers')
def redirect(self, request):
return 201, {'Location': 'https://www.example.com/widget/1'}, ""
@http('GET', '/custom')
def custom(self, request):
return Response("payload")
运行
$ nameko run advanced_http
starting services: advanced_http_service
示例1
$ curl -i localhost:8000/privileged
HTTP/1.1 403 FORBIDDEN
Content-Type: text/plain; charset=utf-8
Content-Length: 9
Date: Fri, 13 Feb 2015 14:58:02 GMT
示例2
$ curl -i localhost:8000/headers
HTTP/1.1 201 CREATED
Location: https://www.example.com/widget/1
Content-Type: text/plain; charset=utf-8
Content-Length: 0
Date: Fri, 13 Feb 2015 14:58:48 GMT
(异常处理)
您可以通过重写来控制服务返回的错误的格式 response_from_exception()
:
import json
from nameko.web.handlers import HttpRequestHandler
from werkzeug.wrappers import Response
from nameko.exceptions import safe_for_serialization
class HttpError(Exception):
error_code = 'BAD_REQUEST'
status_code = 400
class InvalidArgumentsError(HttpError):
error_code = 'INVALID_ARGUMENTS'
class HttpEntrypoint(HttpRequestHandler):
def response_from_exception(self, exc):
if isinstance(exc, HttpError):
response = Response(
json.dumps({
'error': exc.error_code,
'message': safe_for_serialization(exc),
}),
status=exc.status_code,
mimetype='application/json'
)
return response
return HttpRequestHandler.response_from_exception(self, exc)
http = HttpEntrypoint.decorator
class Service:
name = "http_service"
@http('GET', '/custom_exception')
def custom_exception(self, request):
raise InvalidArgumentsError("Argument `foo` is required.")
http exception
$ nameko run http_exceptions
starting services: http_service
测试响应
$ curl -i http://localhost:8000/custom_exception
HTTP/1.1 400 BAD REQUEST
Content-Type: application/json
Content-Length: 72
Date: Thu, 06 Aug 2015 09:53:56 GMT
{"message": "Argument `foo` is required.", "error": "INVALID_ARGUMENTS"}
您可以使用 WEB_SERVER_ADDRESS 配置设置来更改 HTTP 端口 和 IP :
# foobar.yaml
AMQP_URI: 'pyamqp://guest:guest@localhost'
WEB_SERVER_ADDRESS: '0.0.0.0:8000'
计时器 (Timer)
计时器 (Timer) 是一个简单的入口点,每可配置的秒数触发一次。计时器不是“群集感知”的,并且会在所有服务实例上触发。
from nameko.timer import timer
class Service:
name ="service"
@timer(interval=1)
def ping(self):
# method executed every second
print("pong")
Leave a Reply