1 引言

积跬步以至千里,积怠情以至深渊。

RYU是由日本NTT公司研发的开源SDN控制器,由Python语言编写。支持OpenFlow1.0、1.2、1.3、1.4和1.5版本的协议。本人将在此文档中持续更新关于RYU控制器的一些学习笔记,包括RYU的运行流程、部分源码解读、应用开发以及RYU具体使用。望读者能在共同学习的同时,批评指正。

2 RYU源码解读

2.1 RYU源文件目录结构

ryu/ryu目录下的主要目录内容如下:

(1) base:

base中有一个非常重要的文件:app_manager.py,其作用是RYU应用的管理中心,即对其他组件的管理。用于加载RYU应用程序,接受从APP发送过来的信息,同时也完成消息的路由。会被ryu-manager自动调用。

其主要的函数有app注册、注销、查找、并定义了RyuApp基类,定义了RYUAPP的基本属性。包含name, threads, events, event_handlers和observers等成员,以及对应的许多基本函数。如:start(), stop()等。

这个文件中还定义了AppManager基类,用于管理APP。定义了加载APP等函数。不过如果仅仅是开发APP的话,这个类可以不必关心。

(2) controller: 实现controller和交换机之间的互联和事件处理

controller文件夹中许多非常重要的文件,如events.py, ofp_handler.py, controller.py等。

在controller.py中定义了OpenFlowController基类,是控制器组件,管理与OF交换机连接的安全通道,接收OF消息,调用ofp_events,并发布相应的“事件”,以触发订阅了该“事件”的组件的处理逻辑。

在ofp_handler.py中定义了基本的handler,完成了基本的如:握手,错误信息处理和keep alive 等功能。更多的如packet_in_handler应该在app中定义。

在dpset.py文件中,定义了交换机端的一些消息,如端口状态信息等,用于描述和操作交换机。如添加端口,删除端口等操作。

(3) lib:网络基本协议的实现和使用

lib中定义了我们需要使用到的基本的数据结构,如dpid, mac和ip等数据结构。在lib/packet目录下,还定义了许多网络协议,如ICMP, DHCP, MPLS和IGMP等协议内容。而每一个数据包的类中都有parser和serialize两个函数。用于解析和序列化数据包。

lib目录下,还有ovs, netconf目录,对应的目录下有一些定义好的数据类型

(4) ofproto

在这个目录下,基本分为两类文件,一类是协议的数据结构定义,另一类是协议解析,也即数据包处理函数文件。

(5) topology:交换机和链路的查询模块

包含了switches.py等文件,基本定义了一套交换机的数据结构。event.py定义了交换上的事件。dumper.py定义了获取网络拓扑的内容。最后api.py向上提供了一套调用topology目录中定义函数的接口。

(6) contrib:第三方库

这个文件夹主要存放的是开源社区贡献者的代码。

(7) cmd:入口函数

定义了RYU的命令系统,为controller的执行创建环境,接收和处理相关命令

(8) services

完成了BGP (路由技术) 和vrrp (交换技术) 的实现

(9) tests

tests目录下存放了单元测试以及整合测试的代码。

2.2 ryu/app目录下源码解读及相关API的使用

2.2.1 ryu/app/simple_switch_13.py实现功能为:传统的2层交换机策略

对于OpenFlow交换机,接受RYU控制器的指令,并达到以下功能:对于接收到的数据包进行修改或针对指定端口进行转发;对于接收到的数据包进行转发到控制器的动作(packet-in);对于接收到来自控制器的数据包进行转发到指定的端口(packet-out)

对于RYU来说,接收到任何一个OpenFlow消息,都会产生一个相对应事件,因此RYU应用必须实现事件管理以处理相对应的事件。

NT: simple_switch.py、 simple_switch_12.py、simple_switch_13.py、simple_switch_14.py和simple_switch_15.py分别对应OpenFlow1.0、1.2、1.4、1.5版本的。

from ryu.base import app_manager

from ryu.controller import ofp_event

from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER

from ryu.controller.handler import set_ev_cls

from ryu.ofproto import ofproto_v1_3

from ryu.lib.packet import packet

from ryu.lib.packet import ethernet

from ryu.lib.packet import ether_types

# 继承ryu.base.app_manager.RyuApp

class SimpleSwitch13(app_manager.RyuApp):

    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]  # 指定OpenFlow 1.3版本

    def __init__(self, *args, **kwargs):

        super(SimpleSwitch13, self).__init__(*args, **kwargs)

        self.mac_to_port = {}  # 定义MAC地址列表

# set_ev_cls指定事件类别得以接受消息和交换机状态作为参数

# 其中事件类别名称为ryu.controller.ofp_event.EventOFP+<OpenFlow消息名称>

# 例如:在 Packet-In 消息的状态下的事件名称为EventOFPPacketIn

# 对于交换机的状态来说,可指定以下中的一项

# ryu.controller.handler.HANDSHAKE_DISPATCHER 交换 HELLO 消息

# ryu.controller.handler.CONFIG_DISPATCHER   接收SwitchFeatures消息

# ryu.controller.handler.MAIN_DISPATCHER   一般状态

# ryu.controller.handler.DEAD_DISPATCHER   连线中断

    @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)

    def switch_features_handler(self, ev):

    # ev.msg 是用来存储对应事件的 OpenFlow 消息类别实体

    # msg.datapath是用来存储OpenFlow交换机的 ryu.controller.controller.Datapath 类别所对应的实体

        datapath = ev.msg.datapath  

        ofproto = datapath.ofproto  # ofproto表示使用的OpenFlow版本所对应的ryu.ofproto.ofproto_v1_3

        parser = datapath.ofproto_parser  # 和ofproto一样,有对应版本ryu.ofproto.ofproto_v1_3_parser

        

# 下发table-miss流表项,让交换机对于不会处理的数据包通过packet-in消息上交给Ryu控制器!!!

        # 匹配数据包

        # 若数据包没有 match 任何一个普通 Flow Entry 时,则触发 Packet-In

        match = parser.OFPMatch()  

        # 通过预留端口ofproto.OFPP_CONTROLLER,将packet-in消息发送给controller,并通过ofproto.OFPCML_NO_BUFFE指明Racket-in消息的原因是table miss

        actions = [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,

                                          ofproto.OFPCML_NO_BUFFER)]

        # 执行 add_flow() 方法以发送 Flow Mod 消息

        self.add_flow(datapath, 0, match, actions)

    def add_flow(self, datapath, priority, match, actions, buffer_id=None):

    # 新增流表项

        ofproto = datapath.ofproto

        parser = datapath.ofproto_parser

# Apply Actions 是用来设定那些必须立即执行的 action 所使用

        inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,

                                             actions)]

        # 通过 Flow Mod 消息将 Flow Entry 新增到 Flow table 中

        if buffer_id:

            mod = parser.OFPFlowMod(datapath=datapath, buffer_id=buffer_id,

                                    priority=priority, match=match,

                                    instructions=inst)

        else:

            mod = parser.OFPFlowMod(datapath=datapath, priority=priority,

                                    match=match, instructions=inst)

        datapath.send_msg(mod)

    @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)

    def _packet_in_handler(self, ev):

        # If you hit this you might want to increase

        # the "miss_send_length" of your switch

        if ev.msg.msg_len < ev.msg.total_len:

            self.logger.debug("packet truncated: only %s of %s bytes",

                              ev.msg.msg_len, ev.msg.total_len)

        # 为了接收处理未知目的地的数据包,需要执行Packet-In 事件管理

        msg = ev.msg  # 每一个事件类ev中都有msg成员,用于携带触发事件的数据包

        datapath = msg.datapath  # 已经格式化的msg其实就是一个packet_in报文,msg.datapath直接可以获得packet_in报文的datapath结构

        # datapath用于描述一个交换网桥,也是和控制器通信的实体单元。

        # datapath.send_msg()函数用于发送数据到指定datapath。

        # 通过datapath.id可获得dpid数据。

        ofproto = datapath.ofproto  # datapath.ofproto对象是一个OpenFlow协议数据结构的对象,成员包含OpenFlow协议的数据结构,如动作类型OFPP_FLOOD

        parser = datapath.ofproto_parser  # datapath.ofp_parser则是一个按照OpenFlow解析的数据结构。

# 更新Mac地址表

        in_port = msg.match['in_port']

        pkt = packet.Packet(msg.data)

        eth = pkt.get_protocols(ethernet.ethernet)[0]

        if eth.ethertype == ether_types.ETH_TYPE_LLDP:

            # ignore lldp packet

            return

        dst = eth.dst

        src = eth.src

        dpid = datapath.id

        self.mac_to_port.setdefault(dpid, {})

        self.logger.info("packet in %s %s %s %s", dpid, src, dst, in_port)

        # learn a mac address to avoid FLOOD next time.

        self.mac_to_port[dpid][src] = in_port

# 判断转发的数据包的连接端口

# 目的 MAC 位址若存在于 MAC 地址表,则判断该连接端口号码为输出。

# 反之若不存在于 MAC 地址表则 OUTPUT action 类别的实体并生成 flooding( OFPP_FLOOD )给目的连接端口使用。

        if dst in self.mac_to_port[dpid]:

            out_port = self.mac_to_port[dpid][dst]

        else:

            out_port = ofproto.OFPP_FLOOD

        actions = [parser.OFPActionOutput(out_port)]

        # install a flow to avoid packet_in next time

        if out_port != ofproto.OFPP_FLOOD:

            match = parser.OFPMatch(in_port=in_port, eth_dst=dst, eth_src=src)

            # verify if we have a valid buffer_id, if yes avoid to send both

            # flow_mod & packet_out

            if msg.buffer_id != ofproto.OFP_NO_BUFFER:

                self.add_flow(datapath, 1, match, actions, msg.buffer_id)

                return

            else:

                self.add_flow(datapath, 1, match, actions)

# 在 MAC 地址表中找寻目的 MAC 地址,若是有找到则发送 Packet-Out 讯息,并且转送数据包。

        data = None

        if msg.buffer_id == ofproto.OFP_NO_BUFFER:

            data = msg.data

        out = parser.OFPPacketOut(datapath=datapath, buffer_id=msg.buffer_id,

                                  in_port=in_port, actions=actions, data=data)

        datapath.send_msg(out)

可结合Mininet进行测试,相关操作如下:

创建topo:

sudo mn –topo single,3 –mac –switch ovsk –controller remote -x

查看ovs-vswitchd的配置信息:

ovs-vsctl show

查看datapath的信息:

ovs-dpctl show 

设定交换机s1的Openflow版本:

ovs-vsctl set Bridge s1 protocols=OpenFlow13

查看交换机s1的流表信息:

ovs-ofctl -O OpenFlow13 dump-flows s1

主机h1网络抓包:

tcpdump -en -i h1-eth0 

NT:实验过程可结合ifconfig 命令用来查看和配置网络设备!!!例如:

ifconfig eth0 up        # 启动 <br>ifcfg etho up         # 启动

ifconfig eth0 down      # 关闭<br>ifcfg eth0 down        # 关闭

ifconfig eth0 hw ether 00:AA:BB:CC:DD:EE                # 修改MAC地址

ifconfig eth0 reload    # 重启

ifconfig eth0 add 33ffe:3240:800:1005::2/64              # 为网卡eth0配置IPv6地址 

ifconfig eth0 del 33ffe:3240:800:1005::2/64              # 为网卡eth0删除IPv6地址

ifconfig eth0 192.168.25.166 netmask 255.255.255.0 up    # 为网卡eth0配置IPv4地址

ifconfig eth0:ws arp # 启用ARP协议

ifconfig eth0:ws -arp # 关闭ARP协议

ifconfig eth0 mtu 1500 # 设置最大传输单元

2.2.2 ryu/app/simple_monitor_13.py实现功能为:定期检查网络状态

当网络发生异常时,需快速找到原因,并且尽快回复原状。而找出网络中的错误、发现真正的原因需要清楚地知道网络地状态,假设网络中某个端口正处于高流量的状态,不论是因为它是一个不正常的状态或是任何原因导致,变成一个由于没有持续监控所发生的问题。

from operator import attrgetter

from ryu.app import simple_switch_13

from ryu.controller import ofp_event

from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER

from ryu.controller.handler import set_ev_cls

from ryu.lib import hub

class SimpleMonitor13(simple_switch_13.SimpleSwitch13):

# 定期的向交换机发出要求以取得需要统计的数据

    def __init__(self, *args, **kwargs):

        super(SimpleMonitor13, self).__init__(*args, **kwargs)

        self.datapaths = {}

        self.monitor_thread = hub.spawn(self._monitor)  # 建立一个绿色线程,运行监控程序

        

# EventOFPStatureChange的信息类用来监测交换器的连线中断,会被触发在#Dathpath状态改变时

# 参数二表示 一般状态和连线中断状态

    @set_ev_cls(ofp_event.EventOFPStateChange,

                [MAIN_DISPATCHER, DEAD_DISPATCHER])

    def _state_change_handler(self, ev):   

    # 通过判断当前状态从监测列表添加或移除当前datapath

    # 连线中断状态用于确认连线中的交换机可以持续被监控

        datapath = ev.datapath

        # 当datapath的状态变成MAIN_DISPATCHER时,代表交换机已经被注册,并且正处于被监视的状态

        if ev.state == MAIN_DISPATCHER:

            if datapath.id not in self.datapaths:

                self.logger.debug('register datapath: %016x', datapath.id)

                self.datapaths[datapath.id] = datapath

        # 当datapath的状态变成DEAD_DISPATCHER时,代表注册状态已经解除

        elif ev.state == DEAD_DISPATCHER:

            if datapath.id in self.datapaths:

                self.logger.debug('unregister datapath: %016x', datapath.id)

                del self.datapaths[datapath.id]

    def _monitor(self):

        while True:

        # 不断地注册的向交换机发送要求取得的统计信息

            for dp in self.datapaths.values():

                self._request_stats(dp)

            # 每隔10s查询一次当前的监视datapath名单中的各个#datapath状况

            hub.sleep(10)

    def _request_stats(self, datapath):

        self.logger.debug('send stats request: %016x', datapath.id)

        ofproto = datapath.ofproto

        parser = datapath.ofproto_parser

# 向指定的datapath发送OFPFlowStatsRequest和OFPStatsResquest消息类实体,即对相关统计信息进行

# 请求

# OFPFlowStatsRequest主要用来对交换机的Flow Entry取得统计信息

# 对于交换即发出的要求可以使用table ID、output port、cookie 值和 match 条件来限定范围,但是以下实现的是取得所有的 Flow Entry。

        req = parser.OFPFlowStatsRequest(datapath)

        datapath.send_msg(req)

        

# OFPPortStatsRequest 是用来取得关于交换机的端口相关信息以及统计信息。

# 使用的使用可以指定端口号,以下使用OFPP_ANY,目的是要取得所有的端口统计信息。

        req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)

        datapath.send_msg(req)

# 对FlowStatsReply消息的回复进行事件处理

    @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)

    def _flow_stats_reply_handler(self, ev):

    # body中存放了OFPFlowStats的列表,存储了每一个Flow Entry的统计资料,并作为OFPFlowStatsRequest的回应

        body = ev.msg.body  

        self.logger.info('datapath         '

                         'in-port  eth-dst           '

                         'out-port packets  bytes')

        self.logger.info('—————- '

                         '——– —————– '

                         '——– ——– ——–')

        # 对各个优先级非0的流表项按接收端口和目的MAC地址进行排序后遍历

        for stat in sorted([flow for flow in body if flow.priority == 1],

                           key=lambda flow: (flow.match['in_port'],

                                             flow.match['eth_dst'])):

            # 对交换机的datapath.id,目的MAC地址,输出端口和包以及字节流量进行打印

            self.logger.info('%016x %8x %17s %8x %8d %8d',

                             ev.msg.datapath.id,

                             stat.match['in_port'], stat.match['eth_dst'],

                             stat.instructions[0].actions[0].port,

                             stat.packet_count, stat.byte_count)

                             

# 对PortStatsReply消息的回复事件进行处理

    @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)

    def _port_stats_reply_handler(self, ev):

        body = ev.msg.body

        self.logger.info('datapath         port     '

                         'rx-pkts  rx-bytes rx-error '

                         'tx-pkts  tx-bytes tx-error')

        self.logger.info('—————- ——– '

                         '——– ——– ——– '

                         '——– ——– ——–')

        # 根据端口号进行排序并遍历

        for stat in sorted(body, key=attrgetter('port_no')):

        # 打印交换机id,端口号和接收及发送的包的数量字节数和错误数

            self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',

                             ev.msg.datapath.id, stat.port_no,

                             stat.rx_packets, stat.rx_bytes, stat.rx_errors,

                             stat.tx_packets, stat.tx_bytes, stat.tx_errors)

在Ryubook中给出了OFPFlowStats的JSON格式的全部信息,下面给出OFPPortStats的JSON数据信息:

在这里插入图片描述

2.2.3 /ryu/app/simple_switch_rest_13.py

关于ofctl_rest.py的用法参考链接

RYU本身提供了一个类似WSGI的web服务器功能。借助这个功能,我们可以创建一个REST API。基于创建的REST API,可以快速的将RYU系统与其他系统或者是浏览器相连接。

REST:表征性状态传输(英文:Representational State Transfer,简称REST)是Roy Fielding博士在2000年他的博士论文中提出来的一种软件架构风格。REST架构风格中,资源是通过URI来描述的。对资源的操作采用了HTTP的GET,POST,PUT和DELETE方法相对应。资源的表现形式可以是json或xml。REST的架构是Client-Server架构,同时链接是无状态的。所以要求在传输的过程中需要包含状态信息。

代码解析:

import json

from ryu.app import simple_switch_13

from ryu.controller import ofp_event

from ryu.controller.handler import CONFIG_DISPATCHER

from ryu.controller.handler import set_ev_cls

from ryu.app.wsgi import ControllerBase

from ryu.app.wsgi import Response

from ryu.app.wsgi import route

from ryu.app.wsgi import WSGIApplication

from ryu.lib import dpid as dpid_lib

simple_switch_instance_name = 'simple_switch_api_app'

# 指定url为如下方式,其中,{dpid} 的部分必須與 ryu/lib/dpid.py

url = '/simpleswitch/mactable/{dpid}'

class SimpleSwitchRest13(simple_switch_13.SimpleSwitch13):

"""

更新交换机的MAC地址表

"""

    _CONTEXTS = {'wsgi': WSGIApplication}  # 用来建立Ryu中WSGI网页服务器所对应的类别,因此可通过wsgi Key来取得网页服务器的实体

    def __init__(self, *args, **kwargs):

        super(SimpleSwitchRest13, self).__init__(*args, **kwargs)

        self.switches = {}

        wsgi = kwargs['wsgi']  # 通过上一步设置的_CONTEXTS成员变量,可以通过kwargs进行实例化一个WSGIApplication。

        wsgi.register(SimpleSwitchController,

                      {simple_switch_instance_name: self})  # 使用register方法注册该服务到controller类上。

# 重写父类的switch_features_handler函数

    @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)

    def switch_features_handler(self, ev):

        super(SimpleSwitchRest13, self).switch_features_handler(ev)

        datapath = ev.msg.datapath

        self.switches[datapath.id] = datapath  # 存储datapath到switches

        self.mac_to_port.setdefault(datapath.id, {})  # 初始化MAC地址表

    def set_mac_to_port(self, dpid, entry):

    """

    该方法将MAC地址和端口注册到指定的交换机。该方法主要被REST API的PUT方法所调用。

    """

    # 获取MAC table

        mac_table = self.mac_to_port.setdefault(dpid, {})

         # 获取datapath,如果为None,证明没有该交换机

        datapath = self.switches.get(dpid)

        entry_port = entry['port']

        entry_mac = entry['mac']

        if datapath is not None:

            parser = datapath.ofproto_parser

            # # 如果entry_port不在mac_table中

            if entry_port not in mac_table.values():

# 下发流表

                for mac, port in mac_table.items():

                    # from known device to new device

                    actions = [parser.OFPActionOutput(entry_port)]

                    match = parser.OFPMatch(in_port=port, eth_dst=entry_mac)

                    self.add_flow(datapath, 1, match, actions)

                    # from new device to known device

                    actions = [parser.OFPActionOutput(port)]

                    match = parser.OFPMatch(in_port=entry_port, eth_dst=mac)

                    self.add_flow(datapath, 1, match, actions)

# 添加entry_mac, entry_port到mac_table

                mac_table.update({entry_mac: entry_port})

        return mac_table

class SimpleSwitchController(ControllerBase):

"""

定义收到HTTP请求时所需要回应的操作

"""

    def __init__(self, req, link, data, **config):

        super(SimpleSwitchController, self).__init__(req, link, data, **config)

        self.simple_switch_app = data[simple_switch_instance_name]

# 借助route装饰器关联方法和URL, 

# 第一个参数:任何自定义名称; 

# 第二个参数:指明URL;

# 第三个参数:指定http方法;

# 第四个参数:指明指定位置的格式,URL(/simpleswitch/mactable/{dpid} 匹配DPID_PATTERN的描述

# 当使用GET方式访问到该REST API接口时,调用list_mac_table函数!!

    @route('simpleswitch', url, methods=['GET'],

           requirements={'dpid': dpid_lib.DPID_PATTERN})

    def list_mac_table(self, req, **kwargs):

        simple_switch = self.simple_switch_app

        # 获取{dpid}

        dpid = dpid_lib.str_to_dpid(kwargs['dpid'])

# 如果没有dpid,返回404

        if dpid not in simple_switch.mac_to_port:

            return Response(status=404)

# 获取mac_table

        mac_table = simple_switch.mac_to_port.get(dpid, {})

        body = json.dumps(mac_table)

        return Response(content_type='application/json', body=body)

# 使用PUT方式设置mac_table

    @route('simpleswitch', url, methods=['PUT'],

           requirements={'dpid': dpid_lib.DPID_PATTERN})

    def put_mac_table(self, req, **kwargs):

        simple_switch = self.simple_switch_app

        dpid = dpid_lib.str_to_dpid(kwargs['dpid'])

        try:

            new_entry = req.json if req.body else {}

        except ValueError:

            raise Response(status=400)

        if dpid not in simple_switch.mac_to_port:

            return Response(status=404)

        try:

            mac_table = simple_switch.set_mac_to_port(dpid, new_entry)

            body = json.dumps(mac_table)

            return Response(content_type='application/json', body=body)

        except Exception as e:

            return Response(status=500)

2.2.4 ryu/app/gui_topology/gui_topology.py实现功能为:定期检查网络状态

ryu4.3版本和ryu3.14版本打开gui方式有所不同,本人安装版本为4.3。进入gui_topology文件夹进行,并运行如下命令

ryu-manager –observe-links gui_topology.py

1

用mininet模拟了一个深度为3的树状网络拓扑,并连接到ryu,命令如下:

sudo mn –controller remote –mac –topo tree,2,2

1

连接成功后,访问http://localhost:8080(我的ryu和mininet均运行在本机),即可看到如下界面:

在这里插入图片描述

物理实验,打开一个openflow交换机,访问http://localhost:8080,看到界面如下:

在这里插入图片描述

2.2.5 /ryu/app/simple_switch_lacp_13.py实现的功能为:链路聚合

网络聚合( Link Aggregation )是由 IEEE802.1AX-2008 所制定的,多条实体线路合并为一条逻辑线路(即,多个物理端口汇聚在一起,形成一个逻辑端口),以实现出/入流量吞吐量在各成员端口的负荷分担,交换机根据用户配置的端口负荷分担策略决定网络封包从哪个成员端口发送到对端的交换机。当交换机检测到其中一个成员端口的链路发生故障时,就停止在此端口上发送封包,并根据负荷分担策略在剩下的链路中重新计算报文的发送端口,故障端口回复再次担任收发端口。

因此链路聚合的实现可以让网络中特定的装置间通信速度提升、同时确保支援能力、提升容错的功能。可通过LACP( Link Aggregation Control Protocol )协议动态方法实现。

from ryu.base import app_manager

from ryu.controller import ofp_event

from ryu.controller.handler import CONFIG_DISPATCHER

from ryu.controller.handler import MAIN_DISPATCHER

from ryu.controller.handler import set_ev_cls

from ryu.ofproto import ofproto_v1_3

from ryu.lib import lacplib

from ryu.lib.dpid import str_to_dpid

from ryu.lib.packet import packet

from ryu.lib.packet import ethernet

from ryu.app import simple_switch_13

class SimpleSwitchLacp13(simple_switch_13.SimpleSwitch13):

    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    _CONTEXTS = {'lacplib': lacplib.LacpLib}

    def __init__(self, *args, **kwargs):

        super(SimpleSwitchLacp13, self).__init__(*args, **kwargs)

        self.mac_to_port = {}

        # 通过成员变量_CONTEXTS,可以通过kwargs进行实例化一个lacplib的应用程序。

        self._lacp = kwargs['lacplib']

        # 通过LACP的add()方法来完成初始化设置,

        # 以下代码的初始化设置为:

        # datapath ID为0000000000000001的OpenFlow交换机的端口1和端口2整合为一个网络聚合群组

        self._lacp.add(

            dpid=str_to_dpid('0000000000000001'), ports=[1, 2])

    def del_flow(self, datapath, match):

    """

    当端口有效/无效状态变更时,被逻辑链路所使用的实体链路会因为数据包的通过而改变状态。

    因此,需要删除已经被记录的流表项

    """

        ofproto = datapath.ofproto

        parser = datapath.ofproto_parser

        mod = parser.OFPFlowMod(datapath=datapath,

                                command=ofproto.OFPFC_DELETE,

                                out_port=ofproto.OFPP_ANY,

                                out_group=ofproto.OFPG_ANY,

                                match=match)

        datapath.send_msg(mod)

    @set_ev_cls(lacplib.EventPacketIn, MAIN_DISPATCHER)

    def _packet_in_handler(self, ev):

    """

    需要由lacplib.EventPacketIn来装饰自定义的Packet-In函数

    """

        msg = ev.msg

        datapath = msg.datapath

        ofproto = datapath.ofproto

        parser = datapath.ofproto_parser

        in_port = msg.match['in_port']

        pkt = packet.Packet(msg.data)

        eth = pkt.get_protocols(ethernet.ethernet)[0]

        dst = eth.dst

        src = eth.src

        dpid = datapath.id

        self.mac_to_port.setdefault(dpid, {})

        self.logger.info("packet in %s %s %s %s", dpid, src, dst, in_port)

        # learn a mac address to avoid FLOOD next time.

        self.mac_to_port[dpid][src] = in_port

        if dst in self.mac_to_port[dpid]:

            out_port = self.mac_to_port[dpid][dst]

        else:

            out_port = ofproto.OFPP_FLOOD

        actions = [parser.OFPActionOutput(out_port)]

        # install a flow to avoid packet_in next time

        if out_port != ofproto.OFPP_FLOOD:

            match = parser.OFPMatch(in_port=in_port, eth_dst=dst)

            self.add_flow(datapath, 1, match, actions)

        data = None

        if msg.buffer_id == ofproto.OFP_NO_BUFFER:

            data = msg.data

        out = parser.OFPPacketOut(datapath=datapath, buffer_id=msg.buffer_id,

                                  in_port=in_port, actions=actions, data=data)

        datapath.send_msg(out)

    @set_ev_cls(lacplib.EventSlaveStateChanged, MAIN_DISPATCHER)

    def _slave_state_changed_handler(self, ev):

    """

    当端口的状态变更为有效或者无效时,需要通过EventSlaveStateChanged事件来进行处理

    """

        datapath = ev.datapath

        dpid = datapath.id

        port_no = ev.port

        enabled = ev.enabled

        self.logger.info("slave state changed port: %d enabled: %s",

                         port_no, enabled)

        if dpid in self.mac_to_port:

            for mac in self.mac_to_port[dpid]:

                match = datapath.ofproto_parser.OFPMatch(eth_dst=mac)

                self.del_flow(datapath, match)

            del self.mac_to_port[dpid]

        self.mac_to_port.setdefault(dpid, {})

2.2.6 /ryu/app/simple_switch_stp_13.py实现功能为:生成树协议

生成树是为了防止网络的拓扑中出现环路而产生广播风暴、占用交换机大量资源的技术。生成树有许多种类,例如STP、RSTP、PVST+、MSTP等不同的种类。详细介绍参考博客。

下边将介绍Ryu中的STP协议的实现

STP可以消除网络中的环路。其基本理论依据是根据网络拓扑构建(生成)无回路的连通图(就是树),从而保证数据传输路径的唯一性,避免出现环路导致报文流量的增加和循环。STP是工作在OSI第二层(Data Link Layer)的协议,通过在交换机之间传递特殊的消息并进行分布式的计算,来决定在一个有环路的网络中,某台交换机的某个端口应该被阻塞,用这种方法来避免掉环路。

STP作用: 消除环路:通过阻断冗余链路来消除网络中可能存在的环路;链路备份:当活动路径发生故障时,激活备份链路,及时恢复网络连通性。

from ryu.base import app_manager

from ryu.controller import ofp_event

from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER

from ryu.controller.handler import set_ev_cls

from ryu.ofproto import ofproto_v1_3

from ryu.lib import dpid as dpid_lib

from ryu.lib import stplib

from ryu.lib.packet import packet

from ryu.lib.packet import ethernet

from ryu.app import simple_switch_13

class SimpleSwitch13(simple_switch_13.SimpleSwitch13):

    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    _CONTEXTS = {'stplib': stplib.Stp}

    def __init__(self, *args, **kwargs):

        super(SimpleSwitch13, self).__init__(*args, **kwargs)

        self.mac_to_port = {}

        self.stp = kwargs['stplib']

        # Sample of stplib config.

        #  please refer to stplib.Stp.set_config() for details.

        config = {dpid_lib.str_to_dpid('0000000000000001'):

                  {'bridge': {'priority': 0x8000}},

                  dpid_lib.str_to_dpid('0000000000000002'):

                  {'bridge': {'priority': 0x9000}},

                  dpid_lib.str_to_dpid('0000000000000003'):

                  {'bridge': {'priority': 0xa000}}}

        self.stp.set_config(config)

    def delete_flow(self, datapath):

        ofproto = datapath.ofproto

        parser = datapath.ofproto_parser

        for dst in self.mac_to_port[datapath.id].keys():

            match = parser.OFPMatch(eth_dst=dst)

            mod = parser.OFPFlowMod(

                datapath, command=ofproto.OFPFC_DELETE,

                out_port=ofproto.OFPP_ANY, out_group=ofproto.OFPG_ANY,

                priority=1, match=match)

            datapath.send_msg(mod)

    @set_ev_cls(stplib.EventPacketIn, MAIN_DISPATCHER)

    def _packet_in_handler(self, ev):

        msg = ev.msg

        datapath = msg.datapath

        ofproto = datapath.ofproto

        parser = datapath.ofproto_parser

        in_port = msg.match['in_port']

        pkt = packet.Packet(msg.data)

        eth = pkt.get_protocols(ethernet.ethernet)[0]

        dst = eth.dst

        src = eth.src

        dpid = datapath.id

        self.mac_to_port.setdefault(dpid, {})

        self.logger.info("packet in %s %s %s %s", dpid, src, dst, in_port)

        # learn a mac address to avoid FLOOD next time.

        self.mac_to_port[dpid][src] = in_port

        if dst in self.mac_to_port[dpid]:

            out_port = self.mac_to_port[dpid][dst]

        else:

            out_port = ofproto.OFPP_FLOOD

        actions = [parser.OFPActionOutput(out_port)]

        # install a flow to avoid packet_in next time

        if out_port != ofproto.OFPP_FLOOD:

            match = parser.OFPMatch(in_port=in_port, eth_dst=dst)

            self.add_flow(datapath, 1, match, actions)

        data = None

        if msg.buffer_id == ofproto.OFP_NO_BUFFER:

            data = msg.data

        out = parser.OFPPacketOut(datapath=datapath, buffer_id=msg.buffer_id,

                                  in_port=in_port, actions=actions, data=data)

        datapath.send_msg(out)

    @set_ev_cls(stplib.EventTopologyChange, MAIN_DISPATCHER)

    def _topology_change_handler(self, ev):

        dp = ev.dp

        dpid_str = dpid_lib.dpid_to_str(dp.id)

        msg = 'Receive topology change event. Flush MAC table.'

        self.logger.debug("[dpid=%s] %s", dpid_str, msg)

        if dp.id in self.mac_to_port:

            self.delete_flow(dp)

            del self.mac_to_port[dp.id]

    @set_ev_cls(stplib.EventPortStateChange, MAIN_DISPATCHER)

    def _port_state_change_handler(self, ev):

        dpid_str = dpid_lib.dpid_to_str(ev.dp.id)

        of_state = {stplib.PORT_STATE_DISABLE: 'DISABLE',

                    stplib.PORT_STATE_BLOCK: 'BLOCK',

                    stplib.PORT_STATE_LISTEN: 'LISTEN',

                    stplib.PORT_STATE_LEARN: 'LEARN',

                    stplib.PORT_STATE_FORWARD: 'FORWARD'}

        self.logger.debug("[dpid=%s][port=%d] state=%s",

                          dpid_str, ev.port_no, of_state[ev.port_state])

3 RYU的运行

3.1 RYU基本操作

a、RYU使用命令

使用命令格式如下:

ryu-manager <yourapp>

如,进入ryu/app目录,运行simple_switch_13.py

ryu-manager simple_switch_13.py

b、RYU启动参数

ryu-manager 启动ryu, 如果不加任何参数,则默认启动ofphandler模块。

ryu-manager –h 查看帮助信息

–verbose 打印详细debug信息

–version 显示程序的版本号并退出

–observe-links 自动下发LLDP,用于拓扑发现

–ofp-tcp-listen-port  修改ryu的openflow tcp监听端口

说明:若在启动RYU时使用了–observe-links参数,则RYU会收到非常大量的包含LLDP协议报文的PacketIn消息如果不对这一PacketIn消息进行特殊处理的话,很容易导致Ryu奔溃,无法正常工作!!!所以,为了避免这一问题,当你计划使用–observe-links启动Ryu时,在你处理PacketIn消息的函数开头,建议包含如下代码,即可解决问题:

 if eth.ethertype == ether_types.ETH_TYPE_LLDP:

  # ignore lldp packet

  return

3.2 RYU运行流程

RYU的main函数在ryu/cmd/maneger.py文件中,main函数主要内容如下代码。

在main函数中,首先从CONF文件中读取出app list。如果ryu-manager命令中不带任何参数,则默认应用为ofp_handler应用

NT: ofp_handler应用主要用于处理OpenFlow消息,完成了基本的消息处理,如hello_handler,用于处理hello报文

紧接着实例化一个AppManager对象,调用load_apps函数将应用加载。调用create_contexts函数创建对应的contexts

然后调用instantiate_apps函数将app_list和context中的app均实例化。启动wsgi架构,提供web应用

最后将所有的应用作为任务,作为coroutine的task去执行,joinall使得程序必须等待所有的task都执行完成才可以退出程序。最后调用close函数,关闭程序,释放资源

def main(args=None, prog=None):

    _parse_user_flags()

    # 根据配置项的注册,读取配置文件/usr/loca/etc/ryu/ryu.conf的配置

    try:

        CONF(args=args, prog=prog,

             project='ryu', version='ryu-manager %s' % version,

             default_config_files=['/usr/local/etc/ryu/ryu.conf'])

    except cfg.ConfigFilesNotFoundError:

        CONF(args=args, prog=prog,

             project='ryu', version='ryu-manager %s' % version)

    log.init_log()  # 初始化打印log

    logger = logging.getLogger(__name__)

# 根据配置文件的配置执行 log、pidfile

    if CONF.enable_debugger:

        msg = 'debugging is available (–enable-debugger option is turned on)'

        logger.info(msg)

    else:

        hub.patch(thread=True)

    if CONF.pid_file:

        with open(CONF.pid_file, 'w') as pid_file:

            pid_file.write(str(os.getpid()))

# 启动applist中的应用,若applist为空,则启动ofp_handler应用

    app_lists = CONF.app_lists + CONF.app

    # keep old behavior, run ofp if no application is specified.

    if not app_lists:

        app_lists = ['ryu.controller.ofp_handler']

    app_mgr = AppManager.get_instance()  # 在AppManager类中获取实例

    app_mgr.load_apps(app_lists)  # 加载App

    contexts = app_mgr.create_contexts()  # 创建运行环境,"dpset"/"wsgi"

    services = []

    services.extend(app_mgr.instantiate_apps(**contexts))

# 启动App线程,App实例化

    # ryu.controller.dpset.DPSet / rest_firewall.RestFirewallAPI / ryu.controller.ofp_handler.OFPHandler

    webapp = wsgi.start_service(app_mgr)  # webapp启动

    if webapp:

        thr = hub.spawn(webapp)

        services.append(thr)

    try:

        hub.joinall(services)  # 调用t.wait(),执行等待,wait()方法使当前线程暂停执行并释放对象锁标志

       # 循环join,直到有异常或者外部中断推迟

    except KeyboardInterrupt:

        logger.debug("Keyboard Interrupt received. "

                     "Closing RYU application manager…")

    finally:

        app_mgr.close()

 

NT: Datapath类在RYU中极为重要,位于ryu/controller/controller.py中,每当一个datapath实体与控制器建立连接时,就会实例化一个Datapath的对象。Datapath为OVS内核模块,类似网桥,负责执行数据交换,也就是把从接受端口收到的数据包在流表中进行匹配,并执行匹配到的动作,一个Datapath关联一个flow table,一个flow table包含多个条目。

3.3 RYU应用开发

ryu/base/app_manager.py文件中,实现了两个类RyuApp和AppManager。

RyuApp类是RYU封装好的APP基类,为应用开发提供基本的模板,用户只需要继承该类,即可开发应用,而注册对应的observer和handler都使用@derocator的形式,使得开发非常的简单高效。

AppManager类用于Application的管理,加载应用,运行应用,消息路由等功能,是RYU应用的调度中心。

3.3.1 Event Handle

在实现RYU各项功能时,需要用到事件管理(Event Handle)。因为对于RYU来说,接收到的任何一个OpenFlow消息都会产生一个相对应的事件。而在RYU的应用程序开发时,必须实现事件管理以处理相对应发生的事件。

Event Handle是一个拥有事件物件(Event Object)作为参数,并且使用"ryu.controller.handler.set_ev_cls"装饰的函数。

set_ev_cls 参数包括:指定事件类别得以接受消息、 交换机状态,其中

事件类别:名称命名规则ryu.controller.ofp_event.EventOFP + <OpenFlow消息名称>,例如在 Packet-In 消息的状态下的事件名称为EventOFPPacketIn。OpenFlow消息在ryu\ofproto\ofproto_v1_X_parser中可以查看。详细内容可参考RYU的API资料,或者这篇博客

交换机状态:对于交换机状态来说,可指定以下其一

ryu.controller.handler.HANDSHAKE_DISPATCHER 交换HELLO 消息

ryu.controller.handler.CONFIG_DISPATCHER 接收 SwitchFeatures消息

ryu.controller.handler.MAIN_DISPATCHER 一般状态

ryu.controller.handler.DEAD_DISPATCHER 连线中断

因此,处理事件函数的标准模板如下:

@set_ev_cls(ofp_event.Event, DISPATCHER(s))

def your_function(self, ev):

简单说,@set_ev_cls(ofp_event.Event, DISPATCHER(s))的含义就是,当接收DISPATCHER(s)情况的Event事件进行your_function处理。

3.3.2 源代码其他细节说明

OpenFlow协议中的细节:Match、Instructions 和 Action 在 OpenFlow 协议中的细节参考链接

ofproto函数库的使用:ofproto函数库是用来产生和解析OpenFlow消息的函数库。可参考链接

数据包处理协议:Ryu中提供了许多解析和包装数据包的协议,比如ARP、BGP、IPV4等,可参考链接

of-config函数库:of-config是用来管理OpenFlow交换机的一个通讯协议。of-config通讯协议被定义在 NETCONF(RFC 6241)标准中,可以对逻辑交换机的Port和Queue进行设定,参考链接

4 常见问题

4.1 ryu-manager运行报错

当ryu-manager被多次执行,或者ryu的监听端口6633被占用时,会运行报错。ryu控制器默认使用6633端口,因此应该首先查看哪个进程占用了这个端口,执行命令sudo lsof -i :6633。若该端口被占用,有如下两种方案解决办法:

直接将占用进程kill掉:sudo kill -9 pid(进程号)

将ryu的端口号设为其他不被占用的端口,如5555端口:ryu-manager –ofp-tcp-listen-port 5555 -verbose

参考

本文部分内容参考:

https://www.cnblogs.com/zxqstrong/p/4789105.html

https://osrg.github.io/ryu-book/zh_tw/html/index.html

————————————————

版权声明:本文为CSDN博主「梵高的向日葵丶」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

原文链接:https://blog.csdn.net/weixin_42094589/article/details/104160571