Back
Featured image of post Flask-SocketIO官方文档翻译

Flask-SocketIO官方文档翻译

低延迟、双向的客户端、服务端通信。

Flask-SocketIO 帮助Flask实现低延迟、双向的客户端、服务端通信。客户端通过任何SocketIO官方库,都能建立与服务器的持久连接。

安装

通过pip快速安装:

pip install flask-socketio

依赖

Flask-SocketIO兼容Python2和Python3。异步实现有三种框架可供选择:

  • eventlet 性能最好,支持长轮询和Websocket协议。

  • gevent 支持多样的设置。gevent支持长轮询,但不像eventlet,不支持原生WebSocket。为了能支持WebSocket,有两种方案:一、安装 gevent-websocket 的方式使其支持WebSocket;二、通过uWSGI 实现WebSocket功能。性能方面,gevent表现不错,但不如eventlet。

  • Flask自带的Werkzeug开发服务器也能使用,但相比来说,性能很差,所以建议只用于开发测试。另外,自动服务器也只支持长轮询。

异步服务会自动加载安装的框架,优先级是eventlet、gevent。其中,gevent内的Websocket服务,其优先级是uWSGI、gevent-websocket。如果没有安装eventlet或gevent,那么会调用Flask自带的开发服务器。

如果使用的是多进程,消息队列通过广播形式进行协调操作。支持的队列包括RedisRabbitMQ。其他的消息队列需要通过包Kombu来支持。

关于客户端,javascript的Socketio库就能建立与服务器的连接。同样,Swift,Java以及C++的官方库也支持。其实,只要实现了Socket.IO协议的客户端都可以与服务器进行连接。


初始化

添加Flask-SocketIO插件:

from flask import Flask, render_template
from flask_socketio import SocketIO

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)

if __name__ == '__main__':
    socketio.run(app)

支持init_app() 的方式初始化。注意这里的服务器启动方式, socketio.run() 封装了Flask的启动功能,代替了 Flask自带的app.run()debug模式可以在socketio.run()内部配置。在生产环境中,eventlet服务器在可用的情况下,会优先使用。如果eventletgevent 不可用,就会启动Werkzeug开发服务器。

支持Flask 0.11中的命令行交互。Flask-SocketIO实现了Socket.IO的启动命令flask run

$ FLASK_APP=my_app.py flask run

客户端必须加载Socket.IO库,并建立连接:

<script type="text/javascript" src="//cdnjs.cloudflare.com/ajax/libs/socket.io/1.3.6/socket.io.min.js"></script>
<script type="text/javascript" charset="utf-8">
    var socket = io.connect('http://' + document.domain + ':' + location.port);
    socket.on('connect', function() {
        socket.emit('my event', {data: 'I\'m connected!'});
    });
</script>

接收消息

客户端和服务端使用SocketIO时,消息都被当作事件进行接收。在客户端,Javascript 通过回调函数处理事件;在Flask-SocketIO服务端,每个事件都有对应的事件函数,类似原生Flask中,路由都有对应的视图函数。

服务端处理匿名事件:

@socketio.on('message')
def handle_message(message):
    print('received message: ' + message)

上面匿名事件接收的是字符串,匿名事件也可以接收JSON数据:

@socketio.on('json')
def handle_json(json):
    print('received json: ' + str(json))

也可以自定义事件名称,事件消息的数据可以是string,bytes,int或JSON格式:

@socketio.on('my event')
def handle_my_custom_event(json):
    print('received json: ' + str(json))

自定义事件也支持接收多参数:

@socketio.on('my event')
def handle_my_custom_event(arg1, arg2, arg3):
    print('received args: ' + arg1 + arg2 + arg3)

自定义事件非常灵活,可以不用声明数据类型。

Flask-SocketIO也支持SocketIO的命名空间;命名空间可以让客户端多路独立连接在同一个物理socket:

@socketio.on('my event', namespace='/test')
def handle_my_custom_namespace_event(json):
    print('received json: ' + str(json))

默认的命名空间是“/”。

有时,使用装饰器会让代码结构冗余,就可以使用on_event 方法

def my_function_handler(data):
    pass

socketio.on_event('my event', my_function_handler, namespace='/test')

客户端可能会有送达回调,用来接收服务端的送达回执,来确保消息送达。在服务端,事件函数的返回值会回传给客户端的送达回调,作为客户端送达回调的参数:

@socketio.on('my event')
def handle_my_custom_event(json):
    print('received json: ' + str(json))
    return 'one', 2

上面例子中,客户端会把’one’和2作为送达回调的参数。如果事件函数不返回值,送达回调也会被调用,但不含任何参数。


发送消息

定义在事件函数中的send()emit() 语句,可以发送回信给已连接的客户端。

下面例子中,数据会原封不动地返回给发送事件的客户端:

from flask_socketio import send, emit

@socketio.on('message')
def handle_message(message):
    send(message)

@socketio.on('json')
def handle_json(json):
    send(json, json=True)

@socketio.on('my event')
def handle_my_custom_event(json):
    emit('my response', json)

注意场景,send() 用于匿名事件; emit()用于自定义事件。

在有命名空间的情况下, send()emit()默认使用消息源的命名空间。如果要用不同的命名空间,可以指定namespace 参数。

@socketio.on('message')
def handle_message(message):
    send(message, namespace='/chat')

@socketio.on('my event')
def handle_my_custom_event(json):
    emit('my response', json, namespace='/chat')

如果要发送多个参数,可以用tuple:

@socketio.on('my event')
def handle_my_custom_event(json):
    emit('my response', ('foo', 'bar', json), namespace='/chat')

SocketIO 服务端也支持送达回调,确保消息送达客户端。

def ack():
    print '消息已收到!'

@socketio.on('my event')
def handle_my_custom_event(json):
    emit('my response', json, callback=ack)

Javascript客户端使用送达回调接收服务端的回执。客户端调用完送达回调后,服务端也会调用送达回调。如果客户端送达回调包含参数,那么服务端也会收到同样的参数。


广播

SocketIO的广播消息功能用途广泛。在Flask-SocketIO中,send()emit()设置参数broadcast=True就会启动广播功能。

@socketio.on('my event')
def handle_my_custom_event(data):
    emit('my response', data, broadcast=True)

广播功能开启时,所有连接这个命名空间的客户端(包括发送者在内)都会收到这个消息。命名空间未指定时,所有连接全局命名空间的客户端会接收消息。注意,广播消息不会触发回调函数。

上述所有的例子,都是客户端先发送消息,服务端再回应消息。但在实际应用中,服务端可能需要先主动发送消息。比如,在服务端后台线程中,发送事件通知给客户端。socketio.send()socketio.emit() 方法能用于广播给所有连接的客户端:

def some_function():
    socketio.emit('some event', {'data': 42})

注意,socketio.send()socketio.emit()方法不同于处在事件函数上下文中的send()emit()。另外,上例中,由于是在一个普通函数中,没有客户端上下文信息,所以 broadcast=True是默认的,不必指定。


房间

实际应用场景中,可能需要给用户分组。比如,聊天室,不同用户只能收到他们所在房间的消息。通过join_room()leave_room() 可以实现上述功能:

from flask_socketio import join_room, leave_room

@socketio.on('join')
def on_join(data):
    username = data['username']
    room = data['room']
    join_room(room)
    send(username + ' has entered the room.', room=room)

@socketio.on('leave')
def on_leave(data):
    username = data['username']
    room = data['room']
    leave_room(room)
    send(username + ' has left the room.', room=room)

send()emit() 函数接受room 参数。

所有客户端连接时,会被分配一个房间。默认房间名称为连接的session ID,Flask中通过request.sid获取该ID。客户端能加入所有存在的房间。客户端断开时,所有它加入的房间都会移除它。上下文外的socketio.send()socketio.emit()也可以接收room参数,来给房间中所有客户端广播。

因为所有客户端在加入时,都被指定了一个私人的房间,所以,如果想要发送消息给指定客户端,也可以通过指定消息的room参数为该客户端session ID来实现。


连接事件

Flask-SocketIO会发送连接和断开事件。下面的例子展示的就是注册相应的事件函数:

@socketio.on('connect', namespace='/chat')
def test_connect():
    emit('my response', {'data': 'Connected'})

@socketio.on('disconnect', namespace='/chat')
def test_disconnect():
    print('Client disconnected')

连接事件函数可以选择返回False来拒接连接请求。实际应用中,可以通过这种方式来验证用户权限。

注意,连接和断开事件的对象是命名空间。


基于类的命名空间

上面的例子都是通过装饰器来实现命名空间,但实际上可以通过类的方式实现同样的功能。继承flask_socketio.Namespace类,就能实现一个基于类的命名空间。

from flask_socketio import Namespace, emit

class MyCustomNamespace(Namespace):
    def on_connect(self):
        pass

    def on_disconnect(self):
        pass

    def on_my_event(self, data):
        emit('my_response', data)

socketio.on_namespace(MyCustomNamespace('/test'))

使用基于类的命名空间时,事件会被派发到on_事件名方法中去执行。比如,事件my_event会被 on_my_event方法处理。如果一个事件没有对应的方法,事件就会被忽略。另外,on_事件名称一定要符合python的语法规则。

当命名空间未指定时,会自动选择合适的命名空间。

当事件函数被同时定义于装饰器和类中时,只有装饰器的函数是有效的。


异常处理

@socketio.on_error()        # Handles the default namespace
def error_handler(e):
    pass

@socketio.on_error('/chat') # handles the '/chat' namespace
def error_handler_chat(e):
    pass

@socketio.on_error_default  # handles all namespaces without an explicit error handler
def default_error_handler(e):
    pass

异常处理函数将异常对象作为一个参数。

消息和数据能通过request.event参数获取,可以便于排错和记录错误日志:

from flask import request

@socketio.on("my error event")
def on_my_event(data):
    raise RuntimeError()

@socketio.on_error_default
def default_error_handler(e):
    print(request.event["message"]) # "my error event"
    print(request.event["args"])    # (data,)

访问Flask全局变量

SocketIO事件函数和Flask路由视图函数并不一样,最大的区别,就是所有SocketIO事件都发生于独立一次与客户端的长请求中。

尽管有差异,但Flask-SocketIO尽量保持像Flask处理HTTP请求一样,去处理事件。以下就是所有SocketIO使用规则:

  • 在调用事件函数前,应用上下文就已经被推入栈,以此确保current_appg在事件函数中可用。
  • 请求上下文,也在调用事件函数之前,被推入栈,确保requestsession可用。但请注意,WebSocket事件没有独立的请求,所以,连接时请求上下文被推入栈,所有派发的事件都发生一个连接生命周期中。
  • request 多了sid成员,用于为连接设定的session ID,其默认值为客户端房间号。
  • request 还有namespaceevent成员,其包括当前的命名空间和事件参数。event是一个字典包括message键和args键。
  • session也与flask普通情况下不同。一样的地方是,用户session的副本在建立连接后就可用了。但是,如果SocketIO 事件函数修改了session,那么修改的session就会被保存,之后SocketIO 事件函数中的session也是被修改过的,但普通HTTP路由视图函数看不到这些修改。实际上,当session被修改了,只是创建了这个session的副本分支。做出这个限制的技术原因是,为了保存用户的session,需要将session加密到cookie发送给客户端,这就需要HTTP的请求和响应,但SocketIO连接中并没有这些。当使用的是服务端session,比如,Flask-Session或Flask-KVSession插件,只要session修改不是发生在SocketIO 事件函数中,在路由视图函数中修改的session在SocketIO 视图函数中就是可见的。
  • before_requestafter_request钩子函数,不会被SocketIO 事件函数调用。
  • SocketIO 事件函数可以定制装饰器,但大多数Flask装饰器不能用于SocketIO 事件函数,因为SocketIO中没有Response的概念。

认证

验证用户的登录状态是一个很常见的需求,但传统基于表单和HTTP请求的机制并不适用于SocketIO连接,因为SocketIO并没有不会发送HTTP请求和接收response响应。必要的情况下,应用可以实现一个类似HTTP请求,基于SocketIO的登录验证系统。

但有一个更方便的做法,就是在建立SocketIO连接前,用传统方式完成对用户的认证;然后,将身份信息识别号保存在session或者cookie中;最后,当SocketIO连接建立时,SocketIO事件函数就能调用这些信息了。


使用Flask-Login插件

Flask-SocketIO可以访问保存在Flask-Login的登录信息。当Flask-Login验证完毕后,login_user()就被调用,用户信息就会被记录进session。至此,所有SocketIO的连接就能访问current_user上下文中的变量:

@socketio.on('connect')
def connect_handler():
    if current_user.is_authenticated:
        emit('my response',
             {'message': '{0} has joined'.format(current_user.name)},
             broadcast=True)
    else:
        return False  # not allowed here

要注意,login_required 装饰器不能用于SocketIO事件函数,但要实现类似功能,可以定制一个类似的如下方法:

import functools
from flask import request
from flask_login import current_user
from flask_socketio import disconnect

def authenticated_only(f):
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        if not current_user.is_authenticated:
            disconnect()
        else:
            return f(*args, **kwargs)
    return wrapped

@socketio.on('my event')
@authenticated_only
def handle_my_custom_event(data):
    emit('my response', {'message': '{0} has joined'.format(current_user.name)},
         broadcast=True)

部署

部署Flask-SocketIO服务器,有简单到复杂的各种参数。这里将介绍一些最常用的参数:

内置服务器

最简单的部署方式,就是安装eventlet或gevent,然后调用socketio.run(app),相应的例子已在上方提到。SocketIO会自动选择eventlet或gevent。

需要注意的是,socketio.run(app)是用于生产环境的,但前提必须确保eventlet或gevent已经安装;否则只会调用Flask自带的服务器,这个服务器仅限于测试环境使用。

另外,gevent也不能搭配uWSGI服务器使用。请查看下方uWSGI部分了解更多信息。

Gunicorn服务器

socketio.run(app)还可以使用 gunicorn 作为服务器,同时使用eventlet或gevent作为workers。这种方式一样需要安装gevent或eventlet,及gunicorn。命令行启动evenlet的gunicorn服务器是:

gunicorn --worker-class eventlet -w 1 module:app

gevent的gunicorn服务器:

gunicorn -k gevent -w 1 module:app

gevent-websocket插件支持gunicorn服务器搭配gevent worker和Websocket。

gunicorn -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker -w 1 module:app

所有这些命令,module是python的定义在应用实例当中的包或模块,app就是应用实例本身。

由于gunicorn算法的问题,服务器启动只能使用一个worker进程。因此,上面都必须加上 -w 1

uWSGI服务器

使用gevent的uWSGI服务器时,Socket.IO可以支持uWSGI原生的WebSocket。

uWSGI服务器具体使用方法不在本文探讨范围内。uWSGI很复杂,有许多配置选项。WebSocket传输必须要有WebSocket和SSL支持。下面的命令就是启动映射端口为5000的app.py:

$ uwsgi --http :5000 --gevent 1000 --http-websockets --master --wsgi-file app.py --callable app

使用nginx作为WebSocket反向代理

通过nginx可以实现前后端反向代理,即传递请求给应用。但是,只有nginx 1.4及以上版本才支持代理WebSocket协议。下面就是基本的nginx代理HTTP和WebSocket请求的配置:

server {
    listen 80;
    server_name _;

    location / {
        include proxy_params;
        proxy_pass http://127.0.0.1:5000;
    }

    location /socket.io {
        include proxy_params;
        proxy_http_version 1.1;
        proxy_buffering off;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "Upgrade";
        proxy_pass http://127.0.0.1:5000/socket.io;
    }
}

下面是负载均衡Socket.IO的例子:

upstream socketio_nodes {
    ip_hash;

    server 127.0.0.1:5000;
    server 127.0.0.1:5001;
    server 127.0.0.1:5002;
    # to scale the app, just add more nodes here!
}

server {
    listen 80;
    server_name _;

    location / {
        include proxy_params;
        proxy_pass http://127.0.0.1:5000;
    }

    location /socket.io {
        include proxy_params;
        proxy_http_version 1.1;
        proxy_buffering off;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "Upgrade";
        proxy_pass http://socketio_nodes/socket.io;
    }
}

上面的例子,可以作为原始配置,但要注意,应用到生产环境,需要一个更加完整的配置,覆盖其他部署信息,比如静态文件配置、SSL支持等。

使用多个worker

从2.0版本起,Flask-SocketIO支持在同一个负载均衡服务器下,使用多个worker模式。部署了多个worker的Flask-SocketIO应用,可以拓展客户端连接到多个进程及主机。这样就可以支持高并发量的场景。

使用Flask-SocketIO多个workers有两个条件:

  • 必须配置好负载均衡器,所有来自同一客户端的请求始终交给同一个worker。这通常被称为粘滞会话(Sticky Sessions)。对于nginx,使用ip_hash指定可以达到这个目标。Gunicorn不能使用多worker模式,就是因为负载均衡算法不支持粘滞会话。
  • 每个服务器都有自己的客户端集合,所以消息队列比如Redis 或 RabbitMQ被用来协调操作,比如广播和房间。

当使用消息队列时,须安装如下依赖:

  • Redis:安装redis包(pip install redis)
  • RabbitMQ:安装kombu包(pip install kombu)
  • 其他支持Kombu的消息队列:具体查看Kombu documentation文档查询依赖
  • 如果使用了evenlet或gevent,就需要打上monkey patching,强制消息队列包使用协程方法和类。

开启多个Flask-SocketIO服务器,首先必须确保消息队列处于运行中。启动Socket.IO服务器,使其连接消息队列,添加 message_queue 参数给SocketIO构造器:

socketio = SocketIO(app, message_queue='redis://')

message_queue的值是队列服务的URL连接。对于运行在同一主机下的redis队列,可以使用'redis://'。同样,RabbitMQ队列可以使用'amqp://'。Kombu可以参考 documentation section ,查看不同队列的URL格式。

从外部进程发送事件

在某些场景下,需要在外部进程中发送事件,而非从服务器,比如从Celery worker发送事件。如果SocketIO服务器被配置成侦听消息队列,那么其他进程,就可以通过自己创建的SocketIO实例,用和服务器一样的方式发送事件。

比如,应用跑在eventlet服务器上,使用Redis消息队列,下面代码就可以给所有客户端广播事件:

socketio = SocketIO(message_queue='redis://')
socketio.emit('my event', {'data': 'foo'}, namespace='/test')

使用SocketIO实例时,Flask app实例并没有传给构造器。

传给SocketIO的channel参数可以通过消息队列,指定具体的通信频道。在共享队列,且使用多个独立的SocketIO服务时,须使用自定义的频道名。

在用eventlet或gevent时,Flask-SocketIO不会应用monkey patching。但是,如果不打上monkey patching,使用消息队列时,Python用于与消息队列沟通的包可能会被挂起。

重要提醒,连接SocketIO服务的外部进程,不需要像主服务器一样,使用eventlet或gevent。只要主服务器使用了协程框架,那么就不用管外部进程的异步问题。比如,Celery的workers不需要设置eventlet或gevent,因为主服务器已经设置了。但是,如果外部进程由于某些原因,使用了异步框架,那么monkey pathcing就一定要打上了,这样才可以让消息队列使用协程异步的函数和类。

沪ICP备20004885号-2
jonathan.nuance@outlook.com
Built with Hugo
Theme Stack designed by Jimmy