Neurohazard
暮雲煙月,皓首窮經;森羅萬象,如是我聞。

Nameko 内置扩展 (Built-in Extensions)

wpadmin~October 11, 2019 /Software Engineering

Contents

Nameko 内置扩展 (Built-in Extensions)

Nameko 包含许多内置扩展。本节介绍它们,并给出其用法的简要示例。

RPC

Nameko 包含了一个 RPC over AMQP 的实现。它包括 @rpc 装饰器的入口点,一个用于与其他服务对话的服务的代理 (proxy) ,以及非 Nameko 客户端可用来对集群进行 RPC 调用的独立代理:

服务端

from nameko.rpc import rpc, RpcProxy

class ServiceY:
    name = "service_y"

    @rpc
    def append_identifier(self, value):
        return u"{}-y".format(value)


class ServiceX:
    name = "service_x"

    y = RpcProxy("service_y")

    @rpc
    def remote_method(self, value):
        res = u"{}-x".format(value)
        return self.y.append_identifier(res

客户端

from nameko.standalone.rpc import ClusterRpcProxy

config = {
    'AMQP_URI': AMQP_URI  # e.g. "pyamqp://guest:guest@localhost"
}

with ClusterRpcProxy(config) as cluster_rpc:
    cluster_rpc.service_x.remote_method("hellø")  # "hellø-x-y"

正常的RPC调用会阻塞,直到远程方法完成为止,但是代理也具有异步调用模式来后台或并行化RPC调用:

with ClusterRpcProxy(config) as cluster_rpc:
    hello_res = cluster_rpc.service_x.remote_method.call_async("hello")
    world_res = cluster_rpc.service_x.remote_method.call_async("world")
    # do work while waiting
    hello_res.result()  # "hello-x-y"
    world_res.result()  # "world-x-y"

在具有多个目标服务实例的群集中,RPC请求实例之间的轮询。该请求将仅由目标服务的一个实例处理。

仅在成功处理请求后,才会确认 AMQP 消息。如果服务未能确认消息并且AMQP连接已关闭(例如,如果服务进程被终止),则代理将撤消然后将消息分配给可用的服务实例。

事件(发布-订阅模式) / Event (Pub-Sub)

Nameko Events 是一个异步消息传递系统,实现了 Publish-Subscriber 模式。服务调度可能被零个或多个其他事件接收的事件:

from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc

class ServiceA:
    """ Event dispatching service. """
    name = "service_a"

    dispatch = EventDispatcher()

    @rpc
    def dispatching_method(self, payload):
        self.dispatch("event_type", payload)


class ServiceB:
    """ Event listening service. """
    name = "service_b"

    @event_handler("service_a", "event_type")
    def handle_event(self, payload):
        print("service b received:", payload)

EventHandler 入口点有三种 handler_types, 这三种分别表示确定的事件消息是如何在集群中收到的:

使用 BROADCAST 模式的示例:

from nameko.events import BROADCAST, event_handler

class ListenerService:
    name = "listener"

    @event_handler(
        "monitor", "ping", handler_type=BROADCAST, reliable_delivery=False
    )
    def ping(self, payload):
        # all running services will respond
        print("pong from {}".format(self.name))

HTTP

HTTP入口点是基于 werkzeug 开发的,并支持所有标准HTTP方法(GET / POST / DELETE / PUT等)

HTTP 入口点可以为单个 URL 指定多个 HTTP 方法作为逗号分隔的列表。请参见下面的示例。

服务方法必须返回以下之一:

示例如下

# http.py

import json
from nameko.web.handlers import http

class HttpService:
    name = "http_service"

    @http('GET', '/get/<int:value>')
    def get_method(self, request, value):
        return json.dumps({'value': value})

    @http('POST', '/post')
    def do_post(self, request):
        return u"received: {}".format(request.get_data(as_text=True))

    @http('GET,PUT,POST,DELETE', '/multi')
    def do_multi(self, request):
        return request.method

运行 HTTP 微服务

$ nameko run http
starting services: http_service

测试相关功能 1

$ curl -i localhost:8000/get/42
HTTP/1.1 200 OK
Content-Type: text/plain; charset=utf-8
Content-Length: 13
Date: Fri, 13 Feb 2015 14:51:18 GMT

{'value': 42}

测试相关功能 2


$ curl -i -d "post body" localhost:8000/post HTTP/1.1 200 OK Content-Type: text/plain; charset=utf-8 Content-Length: 19 Date: Fri, 13 Feb 2015 14:55:01 GMT received: post body

更复杂的示例

# advanced_http.py

from nameko.web.handlers import http
from werkzeug.wrappers import Response

class Service:
    name = "advanced_http_service"

    @http('GET', '/privileged')
    def forbidden(self, request):
        return 403, "Forbidden"

    @http('GET', '/headers')
    def redirect(self, request):
        return 201, {'Location': 'https://www.example.com/widget/1'}, ""

    @http('GET', '/custom')
    def custom(self, request):
        return Response("payload")

运行

$ nameko run advanced_http
starting services: advanced_http_service

示例1

$ curl -i localhost:8000/privileged
HTTP/1.1 403 FORBIDDEN
Content-Type: text/plain; charset=utf-8
Content-Length: 9
Date: Fri, 13 Feb 2015 14:58:02 GMT

示例2

$ curl -i localhost:8000/headers
HTTP/1.1 201 CREATED
Location: https://www.example.com/widget/1
Content-Type: text/plain; charset=utf-8
Content-Length: 0
Date: Fri, 13 Feb 2015 14:58:48 GMT

(异常处理)
您可以通过重写来控制服务返回的错误的格式 response_from_exception()

import json
from nameko.web.handlers import HttpRequestHandler
from werkzeug.wrappers import Response
from nameko.exceptions import safe_for_serialization


class HttpError(Exception):
    error_code = 'BAD_REQUEST'
    status_code = 400


class InvalidArgumentsError(HttpError):
    error_code = 'INVALID_ARGUMENTS'


class HttpEntrypoint(HttpRequestHandler):
    def response_from_exception(self, exc):
        if isinstance(exc, HttpError):
            response = Response(
                json.dumps({
                    'error': exc.error_code,
                    'message': safe_for_serialization(exc),
                }),
                status=exc.status_code,
                mimetype='application/json'
            )
            return response
        return HttpRequestHandler.response_from_exception(self, exc)


http = HttpEntrypoint.decorator


class Service:
    name = "http_service"

    @http('GET', '/custom_exception')
    def custom_exception(self, request):
        raise InvalidArgumentsError("Argument `foo` is required.")

http exception

$ nameko run http_exceptions
starting services: http_service

测试响应

$ curl -i http://localhost:8000/custom_exception
HTTP/1.1 400 BAD REQUEST
Content-Type: application/json
Content-Length: 72
Date: Thu, 06 Aug 2015 09:53:56 GMT

{"message": "Argument `foo` is required.", "error": "INVALID_ARGUMENTS"}

您可以使用 WEB_SERVER_ADDRESS 配置设置来更改 HTTP 端口 和 IP :

# foobar.yaml

AMQP_URI: 'pyamqp://guest:guest@localhost'
WEB_SERVER_ADDRESS: '0.0.0.0:8000'

计时器 (Timer)

计时器 (Timer) 是一个简单的入口点,每可配置的秒数触发一次。计时器不是“群集感知”的,并且会在所有服务实例上触发。

from nameko.timer import timer

class Service:
    name ="service"

    @timer(interval=1)
    def ping(self):
        # method executed every second
        print("pong")

Leave a Reply

Your email address will not be published. Required fields are marked *