2584 lines
110 KiB
Diff
2584 lines
110 KiB
Diff
Author: wangkuntian <wangkuntian@uniontech.com>
|
|
Date: Fri Oct 13 16:25:17 2023 +0800
|
|
|
|
feat: add distributed traffic feature
|
|
---
|
|
agent/l3/dvr_edge_ha_router.py | 4 +-
|
|
agent/l3/extensions/rg_port_forwarding.py | 398 ++++++++++++++++++++++++++++
|
|
agent/l3/ha.py | 9 -
|
|
agent/l3/ha_router.py | 74 +++---
|
|
agent/l3/keepalived_state_change.py | 12 +-
|
|
agent/l3/router_info.py | 19 +-
|
|
agent/linux/dhcp.py | 137 +++++++++-
|
|
agent/linux/interface.py | 43 +--
|
|
agent/linux/keepalived.py | 1 +
|
|
api/rpc/agentnotifiers/dhcp_rpc_agent_api.py | 9 +
|
|
api/rpc/callbacks/resources.py | 3 +
|
|
conf/agent/l3/keepalived.py | 2 +
|
|
conf/common.py | 16 +-
|
|
conf/policies/__init__.py | 2 +
|
|
conf/policies/rg_port_forwarding.py | 76 ++++++
|
|
db/l3_attrs_db.py | 7 +-
|
|
db/l3_db.py | 27 +-
|
|
db/l3_hamode_db.py | 97 +++++++
|
|
db/migration/alembic_migrations/versions/EXPAND_HEAD | 2 +-
|
|
.../train/expand/1c19a98b5eef_add_router_configurations.py | 36 +++
|
|
.../expand/cab12b72ed90_add_router_gateway_port_forwarding.py | 55 ++++
|
|
db/models/l3_attrs.py | 2 +
|
|
db/models/rg_port_forwarding.py | 59 +++++
|
|
extensions/rg_port_forwarding.py | 119 +++++++++
|
|
objects/rg_port_forwarding.py | 87 ++++++
|
|
objects/router.py | 15 +-
|
|
scheduler/l3_agent_scheduler.py | 107 ++++++--
|
|
services/l3_router/l3_router_plugin.py | 12 +
|
|
services/rg_portforwarding/__init__.py | 0
|
|
services/rg_portforwarding/common/__init__.py | 0
|
|
services/rg_portforwarding/common/exceptions.py | 77 ++++++
|
|
services/rg_portforwarding/pf_plugin.py | 369 ++++++++++++++++++++++++++
|
|
32 files changed, 1749 insertions(+), 127 deletions(-)
|
|
|
|
diff --git a/agent/l3/dvr_edge_ha_router.py b/agent/l3/dvr_edge_ha_router.py
|
|
index 71f740bef9..b92f70b70f 100644
|
|
--- a/agent/l3/dvr_edge_ha_router.py
|
|
+++ b/agent/l3/dvr_edge_ha_router.py
|
|
@@ -114,9 +114,7 @@ class DvrEdgeHaRouter(dvr_edge_router.DvrEdgeRouter,
|
|
|
|
def _external_gateway_added(self, ex_gw_port, interface_name,
|
|
ns_name, preserve_ips):
|
|
- link_up = self.external_gateway_link_up()
|
|
- self._plug_external_gateway(ex_gw_port, interface_name, ns_name,
|
|
- link_up=link_up)
|
|
+ self._plug_external_gateway(ex_gw_port, interface_name, ns_name)
|
|
|
|
def _is_this_snat_host(self):
|
|
return self.agent_conf.agent_mode == constants.L3_AGENT_MODE_DVR_SNAT
|
|
diff --git a/agent/l3/extensions/rg_port_forwarding.py b/agent/l3/extensions/rg_port_forwarding.py
|
|
new file mode 100644
|
|
index 0000000000..a159e4df34
|
|
--- /dev/null
|
|
+++ b/agent/l3/extensions/rg_port_forwarding.py
|
|
@@ -0,0 +1,398 @@
|
|
+# Copyright (c) 2023 UnionTech
|
|
+# All rights reserved
|
|
+#
|
|
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
+# not use this file except in compliance with the License. You may obtain
|
|
+# a copy of the License at
|
|
+#
|
|
+# http://www.apache.org/licenses/LICENSE-2.0
|
|
+#
|
|
+# Unless required by applicable law or agreed to in writing, software
|
|
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
+# License for the specific language governing permissions and limitations
|
|
+# under the License.
|
|
+
|
|
+import collections
|
|
+from typing import Optional, List
|
|
+from oslo_concurrency import lockutils
|
|
+from oslo_log import log as logging
|
|
+
|
|
+from neutron_lib import constants
|
|
+from neutron_lib.rpc import Connection
|
|
+from neutron_lib.context import Context
|
|
+from neutron_lib.agent import l3_extension
|
|
+
|
|
+from neutron.agent.linux.ip_lib import IPDevice
|
|
+from neutron.agent.l3.router_info import RouterInfo
|
|
+from neutron.agent.linux.iptables_manager import IptablesManager
|
|
+
|
|
+from neutron.api.rpc.handlers import resources_rpc
|
|
+from neutron.api.rpc.callbacks import resources, events
|
|
+from neutron.api.rpc.callbacks.consumer import registry
|
|
+
|
|
+from neutron.common import coordination
|
|
+
|
|
+from neutron.objects.ports import Port
|
|
+from neutron.objects.router import Router
|
|
+from neutron.objects.rg_port_forwarding import RGPortForwarding
|
|
+
|
|
+LOG = logging.getLogger(__name__)
|
|
+
|
|
+PORT_FORWARDING_PREFIX = 'rg_portforwarding-'
|
|
+DEFAULT_PORT_FORWARDING_CHAIN = 'rg-pf'
|
|
+PORT_FORWARDING_CHAIN_PREFIX = 'pf-'
|
|
+
|
|
+
|
|
+def _get_port_forwarding_chain_name(pf_id):
|
|
+ chain_name = PORT_FORWARDING_CHAIN_PREFIX + pf_id
|
|
+ return chain_name[:constants.MAX_IPTABLES_CHAIN_LEN_WRAP]
|
|
+
|
|
+
|
|
+class RGPortForwardingMapping(object):
|
|
+ def __init__(self):
|
|
+ self.managed_port_forwardings = {}
|
|
+ self.router_pf_mapping = collections.defaultdict(set)
|
|
+
|
|
+ @lockutils.synchronized('rg-port-forwarding-cache')
|
|
+ def check_port_forwarding_changes(self, new_pf: RGPortForwarding) -> bool:
|
|
+ old_pf = self.managed_port_forwardings.get(new_pf.id)
|
|
+ return old_pf != new_pf
|
|
+
|
|
+ @lockutils.synchronized('rg-port-forwarding-cache')
|
|
+ def set_port_forwardings(self, port_forwardings: List[RGPortForwarding]):
|
|
+ for port_forwarding in port_forwardings:
|
|
+ self._set_router_port_forwarding(port_forwarding,
|
|
+ port_forwarding.router_id)
|
|
+
|
|
+ def _set_router_port_forwarding(self,
|
|
+ port_forwarding: RGPortForwarding,
|
|
+ router_id: str):
|
|
+ self.router_pf_mapping[router_id].add(port_forwarding.id)
|
|
+ self.managed_port_forwardings[port_forwarding.id] = port_forwarding
|
|
+
|
|
+ @lockutils.synchronized('rg-port-forwarding-cache')
|
|
+ def update_port_forwardings(self, port_forwardings):
|
|
+ for port_forwarding in port_forwardings:
|
|
+ self.managed_port_forwardings[port_forwarding.id] = port_forwarding
|
|
+
|
|
+ @lockutils.synchronized('rg-port-forwarding-cache')
|
|
+ def del_port_forwardings(self, port_forwardings):
|
|
+ for port_forwarding in port_forwardings:
|
|
+ if not self.managed_port_forwardings.get(port_forwarding.id):
|
|
+ continue
|
|
+ self.managed_port_forwardings.pop(port_forwarding.id)
|
|
+ self.router_pf_mapping[port_forwarding.router_id].discard(
|
|
+ port_forwarding.id)
|
|
+ if not self.router_pf_mapping[port_forwarding.router_id]:
|
|
+ self.router_pf_mapping.pop(port_forwarding.router_id)
|
|
+
|
|
+ @lockutils.synchronized('rg-port-forwarding-cache')
|
|
+ def clean_port_forwardings_by_router_id(self, router_id: str):
|
|
+ pf_ids = self.router_pf_mapping.pop(router_id, [])
|
|
+ for pf_id in pf_ids:
|
|
+ self.managed_port_forwardings.pop(pf_id, None)
|
|
+
|
|
+
|
|
+class RGPortForwardingAgentExtension(l3_extension.L3AgentExtension):
|
|
+ SUPPORTED_RESOURCE_TYPES = [resources.RGPORTFORWARDING]
|
|
+
|
|
+ def consume_api(self, agent_api):
|
|
+ self.agent_api = agent_api
|
|
+
|
|
+ def initialize(self, connection, driver_type):
|
|
+ self.mapping = RGPortForwardingMapping()
|
|
+ self.resource_rpc = resources_rpc.ResourcesPullRpcApi()
|
|
+ self._register_rpc_consumers()
|
|
+
|
|
+ def _register_rpc_consumers(self):
|
|
+ registry.register(self._handle_notification,
|
|
+ resources.RGPORTFORWARDING)
|
|
+ self._connection = Connection()
|
|
+ endpoints = [resources_rpc.ResourcesPushRpcCallback()]
|
|
+ topic = resources_rpc.resource_type_versioned_topic(
|
|
+ resources.RGPORTFORWARDING)
|
|
+ self._connection.create_consumer(topic, endpoints, fanout=True)
|
|
+ self._connection.consume_in_threads()
|
|
+
|
|
+ def _handle_notification(self, context: Context,
|
|
+ resource_type: str,
|
|
+ forwardings, event_type):
|
|
+ for forwarding in forwardings:
|
|
+ self._process_port_forwarding_event(
|
|
+ context, forwarding, event_type)
|
|
+
|
|
+ def _get_gw_port_and_ip(self,
|
|
+ ri: RouterInfo) -> (Optional[Port], Optional[str]):
|
|
+ ex_gw_port = ri.get_ex_gw_port()
|
|
+ ex_gw_port_ip = self._get_gw_port_ip(ex_gw_port)
|
|
+ if not ex_gw_port_ip:
|
|
+ LOG.error(f"Router {ri.router_id} external port "
|
|
+ f"{ex_gw_port['id']} does not have any IP addresses")
|
|
+ return None, None
|
|
+ return ex_gw_port, ex_gw_port_ip
|
|
+
|
|
+ def _process_port_forwarding_event(self, context: Context,
|
|
+ port_forwarding: RGPortForwarding,
|
|
+ event_type: str):
|
|
+ router_id = port_forwarding.router_id
|
|
+ ri = self._get_router_info(router_id)
|
|
+ if not self._check_if_need_process(ri, force=True):
|
|
+ return
|
|
+
|
|
+ ex_gw_port, ex_gw_port_ip = self._get_gw_port_and_ip(ri)
|
|
+ if not ex_gw_port or not ex_gw_port_ip:
|
|
+ return
|
|
+
|
|
+ (interface_name, namespace,
|
|
+ iptables_manager) = self._get_resource_by_router(ri, ex_gw_port)
|
|
+
|
|
+ if event_type == events.CREATED:
|
|
+ self._process_create([port_forwarding], ri, interface_name,
|
|
+ ex_gw_port_ip, namespace, iptables_manager)
|
|
+ elif event_type == events.UPDATED:
|
|
+ self._process_update([port_forwarding], interface_name,
|
|
+ ex_gw_port_ip, namespace, iptables_manager)
|
|
+ elif event_type == events.DELETED:
|
|
+ self._process_delete([port_forwarding], interface_name,
|
|
+ ex_gw_port_ip, namespace, iptables_manager)
|
|
+
|
|
+ def ha_state_change(self, context: Context, data: Router) -> None:
|
|
+ pass
|
|
+
|
|
+ def update_network(self, context: Context, data: dict) -> None:
|
|
+ pass
|
|
+
|
|
+ def add_router(self, context: Context, data: Router) -> None:
|
|
+ LOG.info(f"call add_router for {data['id']}")
|
|
+ self.process_port_forwarding(context, data)
|
|
+
|
|
+ def update_router(self, context: Context, data: Router) -> None:
|
|
+ LOG.info(f"call update_router for {data['id']}")
|
|
+ self.process_port_forwarding(context, data)
|
|
+
|
|
+ def delete_router(self, context: Context, data: Router) -> None:
|
|
+ self.mapping.clean_port_forwardings_by_router_id(data['id'])
|
|
+
|
|
+ def _get_router_info(self, router_id) -> Optional[RouterInfo]:
|
|
+ router_info = self.agent_api.get_router_info(router_id)
|
|
+ if router_info:
|
|
+ return router_info
|
|
+ LOG.debug("Router %s is not managed by this agent. "
|
|
+ "It was possibly deleted concurrently.", router_id)
|
|
+
|
|
+ @staticmethod
|
|
+ def _check_if_need_process(ri: RouterInfo, force: bool = False) -> bool:
|
|
+ if not ri or not ri.get_ex_gw_port():
|
|
+ return False
|
|
+
|
|
+ if force:
|
|
+ return True
|
|
+
|
|
+ is_distributed = ri.router.get('distributed')
|
|
+ agent_mode = ri.agent_conf.agent_mode
|
|
+ if (is_distributed and
|
|
+ agent_mode in [constants.L3_AGENT_MODE_DVR_NO_EXTERNAL,
|
|
+ constants.L3_AGENT_MODE_DVR]):
|
|
+ # just support centralized cases
|
|
+ return False
|
|
+
|
|
+ if is_distributed and not ri.snat_namespace.exists():
|
|
+ return False
|
|
+
|
|
+ return True
|
|
+
|
|
+ def process_port_forwarding(self, context: Context, data: Router):
|
|
+ ri = self._get_router_info(data['id'])
|
|
+ if not self._check_if_need_process(ri):
|
|
+ return
|
|
+ self.check_local_port_forwardings(context, ri)
|
|
+
|
|
+ @staticmethod
|
|
+ def _get_gw_port_ip(gw_port: dict) -> Optional[str]:
|
|
+ fixed_ips = gw_port.get('fixed_ips', [])
|
|
+ if not fixed_ips:
|
|
+ return
|
|
+ return fixed_ips[0].get('ip_address', None)
|
|
+
|
|
+ @staticmethod
|
|
+ def _get_resource_by_router(ri: RouterInfo, ex_gw_port: dict) -> (
|
|
+ str, str, IptablesManager):
|
|
+ is_distributed = ri.router.get('distributed')
|
|
+ if is_distributed:
|
|
+ interface_name = ri.get_snat_external_device_interface_name(
|
|
+ ex_gw_port)
|
|
+ namespace = ri.snat_namespace.name
|
|
+ iptables_manager = ri.snat_iptables_manager
|
|
+ else:
|
|
+ interface_name = ri.get_external_device_interface_name(ex_gw_port)
|
|
+ namespace = ri.ns_name
|
|
+ iptables_manager = ri.iptables_manager
|
|
+
|
|
+ return interface_name, namespace, iptables_manager
|
|
+
|
|
+ def check_local_port_forwardings(self, context: Context, ri: RouterInfo):
|
|
+ pfs = self.resource_rpc.bulk_pull(
|
|
+ context, resources.RGPORTFORWARDING,
|
|
+ filter_kwargs={'router_id': ri.router_id})
|
|
+ if not pfs:
|
|
+ return
|
|
+ ex_gw_port, ex_gw_port_ip = self._get_gw_port_and_ip(ri)
|
|
+ if not ex_gw_port_ip or not ex_gw_port_ip:
|
|
+ return
|
|
+ (interface_name, namespace,
|
|
+ iptables_manager) = self._get_resource_by_router(ri, ex_gw_port)
|
|
+ local_pfs = set(self.mapping.managed_port_forwardings.keys())
|
|
+ new_pfs = []
|
|
+ updated_pfs = []
|
|
+ current_pfs = set()
|
|
+ for pf in pfs:
|
|
+ if pf.id in self.mapping.managed_port_forwardings:
|
|
+ if self.mapping.check_port_forwarding_changes(pf):
|
|
+ updated_pfs.append(pf)
|
|
+ else:
|
|
+ new_pfs.append(pf)
|
|
+ current_pfs.add(pf.id)
|
|
+
|
|
+ remove_pf_ids_set = local_pfs - current_pfs
|
|
+ remove_pfs = [self.mapping.managed_port_forwardings[pf_id]
|
|
+ for pf_id in remove_pf_ids_set]
|
|
+
|
|
+ self._process_create(new_pfs, ri, interface_name, ex_gw_port_ip,
|
|
+ namespace, iptables_manager)
|
|
+
|
|
+ self._process_update(updated_pfs, interface_name, ex_gw_port_ip,
|
|
+ namespace, iptables_manager)
|
|
+
|
|
+ self._process_delete(remove_pfs, interface_name, ex_gw_port_ip,
|
|
+ namespace, iptables_manager)
|
|
+
|
|
+ @staticmethod
|
|
+ def _install_default_rules(iptables_manager: IptablesManager):
|
|
+ default_rule = '-j %s-%s' % (iptables_manager.wrap_name,
|
|
+ DEFAULT_PORT_FORWARDING_CHAIN)
|
|
+ LOG.info(f'Add default chain {DEFAULT_PORT_FORWARDING_CHAIN}')
|
|
+ LOG.info(f'Add default rule {default_rule}')
|
|
+ iptables_manager.ipv4['nat'].add_chain(DEFAULT_PORT_FORWARDING_CHAIN)
|
|
+ iptables_manager.ipv4['nat'].add_rule('PREROUTING', default_rule)
|
|
+ iptables_manager.apply()
|
|
+
|
|
+ @staticmethod
|
|
+ def _get_rg_rules(port_forward: RGPortForwarding, wrap_name: str):
|
|
+ chain_rule_list = []
|
|
+ pf_chain_name = _get_port_forwarding_chain_name(port_forward.id)
|
|
+ chain_rule_list.append(
|
|
+ (DEFAULT_PORT_FORWARDING_CHAIN, f'-j {wrap_name}-{pf_chain_name}'))
|
|
+ gw_ip_address = port_forward.gw_ip_address
|
|
+ protocol = port_forward.protocol
|
|
+ internal_ip_address = str(port_forward.internal_ip_address)
|
|
+ internal_port = port_forward.internal_port
|
|
+ external_port = port_forward.external_port
|
|
+ chain_rule = (
|
|
+ pf_chain_name,
|
|
+ f'-d {gw_ip_address}/32 -p {protocol} -m {protocol} '
|
|
+ f'--dport {external_port} '
|
|
+ f'-j DNAT --to-destination {internal_ip_address}:{internal_port}'
|
|
+ )
|
|
+ chain_rule_list.append(chain_rule)
|
|
+ return chain_rule_list
|
|
+
|
|
+ def _rule_apply(self,
|
|
+ iptables_manager: IptablesManager,
|
|
+ port_forwarding: RGPortForwarding,
|
|
+ rule_tag: str):
|
|
+ iptables_manager.ipv4['nat'].clear_rules_by_tag(rule_tag)
|
|
+ if (DEFAULT_PORT_FORWARDING_CHAIN not in
|
|
+ iptables_manager.ipv4['nat'].chains):
|
|
+ self._install_default_rules(iptables_manager)
|
|
+
|
|
+ for chain, rule in self._get_rg_rules(port_forwarding,
|
|
+ iptables_manager.wrap_name):
|
|
+ LOG.info(f'Add router gateway port forwarding '
|
|
+ f'rule {rule} in {chain}')
|
|
+ if chain not in iptables_manager.ipv4['nat'].chains:
|
|
+ iptables_manager.ipv4['nat'].add_chain(chain)
|
|
+ iptables_manager.ipv4['nat'].add_rule(chain, rule, tag=rule_tag)
|
|
+
|
|
+ def _store_local(self, pf_objs: List[RGPortForwarding], event_type: str):
|
|
+ if event_type == events.CREATED:
|
|
+ self.mapping.set_port_forwardings(pf_objs)
|
|
+ elif event_type == events.UPDATED:
|
|
+ self.mapping.update_port_forwardings(pf_objs)
|
|
+ elif event_type == events.DELETED:
|
|
+ self.mapping.del_port_forwardings(pf_objs)
|
|
+
|
|
+ def _process_create(self,
|
|
+ port_forwardings: List[RGPortForwarding],
|
|
+ ri: RouterInfo,
|
|
+ interface_name: str,
|
|
+ interface_ip: str,
|
|
+ namespace: str,
|
|
+ iptables_manager: IptablesManager):
|
|
+ if not port_forwardings:
|
|
+ return
|
|
+
|
|
+ ha_port = ri.router.get(constants.HA_INTERFACE_KEY, None)
|
|
+ if ha_port and ha_port['status'] == constants.PORT_STATUS_ACTIVE:
|
|
+ ri.enable_keepalived()
|
|
+
|
|
+ for port_forwarding in port_forwardings:
|
|
+ if port_forwarding.id in self.mapping.managed_port_forwardings:
|
|
+ LOG.debug("Skip port forwarding %s for create, as it had been "
|
|
+ "managed by agent", port_forwarding.id)
|
|
+ continue
|
|
+ rule_tag = PORT_FORWARDING_PREFIX + port_forwarding.id
|
|
+ port_forwarding.gw_ip_address = interface_ip
|
|
+ self._rule_apply(iptables_manager, port_forwarding, rule_tag)
|
|
+ iptables_manager.apply()
|
|
+ self._store_local(port_forwardings, events.CREATED)
|
|
+
|
|
+ def _process_update(self,
|
|
+ port_forwardings: List[RGPortForwarding],
|
|
+ interface_name: str,
|
|
+ interface_ip: str,
|
|
+ namespace: str,
|
|
+ iptables_manager: IptablesManager):
|
|
+ if not port_forwardings:
|
|
+ return
|
|
+ device = IPDevice(interface_name, namespace=namespace)
|
|
+ for port_forwarding in port_forwardings:
|
|
+ # check if port forwarding change from OVO and router rpc
|
|
+ if not self.mapping.check_port_forwarding_changes(port_forwarding):
|
|
+ LOG.debug("Skip port forwarding %s for update, as there is no "
|
|
+ "difference between the memory managed by agent",
|
|
+ port_forwarding.id)
|
|
+ continue
|
|
+ current_chain = _get_port_forwarding_chain_name(port_forwarding.id)
|
|
+ iptables_manager.ipv4['nat'].remove_chain(current_chain)
|
|
+ ori_pf = self.mapping.managed_port_forwardings[port_forwarding.id]
|
|
+ device.delete_socket_conntrack_state(interface_ip,
|
|
+ ori_pf.external_port,
|
|
+ protocol=ori_pf.protocol)
|
|
+ rule_tag = PORT_FORWARDING_PREFIX + port_forwarding.id
|
|
+ port_forwarding.gw_ip_address = interface_ip
|
|
+ self._rule_apply(iptables_manager, port_forwarding, rule_tag)
|
|
+ iptables_manager.apply()
|
|
+ self._store_local(port_forwardings, events.UPDATED)
|
|
+
|
|
+ @coordination.synchronized('router-lock-ns-{namespace}')
|
|
+ def _process_delete(self,
|
|
+ port_forwardings: List[RGPortForwarding],
|
|
+ interface_name: str,
|
|
+ interface_ip: str,
|
|
+ namespace: str,
|
|
+ iptables_manager: IptablesManager):
|
|
+ if not port_forwardings:
|
|
+ return
|
|
+ device = IPDevice(interface_name, namespace=namespace)
|
|
+ for port_forwarding in port_forwardings:
|
|
+ current_chain = _get_port_forwarding_chain_name(port_forwarding.id)
|
|
+ iptables_manager.ipv4['nat'].remove_chain(current_chain)
|
|
+ device.delete_socket_conntrack_state(
|
|
+ interface_ip,
|
|
+ port_forwarding.external_port,
|
|
+ protocol=port_forwarding.protocol)
|
|
+
|
|
+ iptables_manager.apply()
|
|
+
|
|
+ self._store_local(port_forwardings, events.DELETED)
|
|
diff --git a/agent/l3/ha.py b/agent/l3/ha.py
|
|
index 17891dc983..182fa68175 100644
|
|
--- a/agent/l3/ha.py
|
|
+++ b/agent/l3/ha.py
|
|
@@ -163,15 +163,6 @@ class AgentMixin(object):
|
|
'agent %(host)s',
|
|
state_change_data)
|
|
|
|
- # Set external gateway port link up or down according to state
|
|
- if state == 'master':
|
|
- ri.set_external_gw_port_link_status(link_up=True, set_gw=True)
|
|
- elif state == 'backup':
|
|
- ri.set_external_gw_port_link_status(link_up=False)
|
|
- else:
|
|
- LOG.warning('Router %s has status %s, '
|
|
- 'no action to router gateway device.',
|
|
- router_id, state)
|
|
# TODO(dalvarez): Fix bug 1677279 by moving the IPv6 parameters
|
|
# configuration to keepalived-state-change in order to remove the
|
|
# dependency that currently exists on l3-agent running for the IPv6
|
|
diff --git a/agent/l3/ha_router.py b/agent/l3/ha_router.py
|
|
index 0a21902771..ef10ff76e4 100644
|
|
--- a/agent/l3/ha_router.py
|
|
+++ b/agent/l3/ha_router.py
|
|
@@ -17,6 +17,7 @@ import shutil
|
|
import signal
|
|
|
|
import netaddr
|
|
+from typing import Optional, List
|
|
from neutron_lib.api.definitions import portbindings
|
|
from neutron_lib import constants as n_consts
|
|
from neutron_lib.utils import runtime
|
|
@@ -137,6 +138,22 @@ class HaRouter(router.RouterInfo):
|
|
else:
|
|
return False
|
|
|
|
+ @property
|
|
+ def configurations(self) -> Optional[dict]:
|
|
+ return self.router.get('configurations', {})
|
|
+
|
|
+ @property
|
|
+ def master(self) -> Optional[str]:
|
|
+ if self.configurations:
|
|
+ return self.configurations.get('master_agent', None)
|
|
+ return None
|
|
+
|
|
+ @property
|
|
+ def slaves(self) -> Optional[List[str]]:
|
|
+ if self.configurations:
|
|
+ return self.configurations.get('slave_agents', [])
|
|
+ return []
|
|
+
|
|
def initialize(self, process_monitor):
|
|
ha_port = self.router.get(n_consts.HA_INTERFACE_KEY)
|
|
if not ha_port:
|
|
@@ -162,19 +179,32 @@ class HaRouter(router.RouterInfo):
|
|
throttle_restart_value=(
|
|
self.agent_conf.ha_vrrp_advert_int * THROTTLER_MULTIPLIER))
|
|
|
|
+ # The following call is required to ensure that if the state path does
|
|
+ # not exist it gets created.
|
|
+ self.keepalived_manager.get_full_config_file_path('test')
|
|
+
|
|
config = self.keepalived_manager.config
|
|
|
|
interface_name = self.get_ha_device_name()
|
|
subnets = self.ha_port.get('subnets', [])
|
|
ha_port_cidrs = [subnet['cidr'] for subnet in subnets]
|
|
+ nopreempt = True
|
|
+ state = 'BACKUP'
|
|
+ priority = self.ha_priority
|
|
+ if self.slaves and self.master:
|
|
+ nopreempt = False
|
|
+ if self.master == self.agent_conf.host:
|
|
+ state = 'MASTER'
|
|
+ priority = keepalived.HA_DEFAULT_MASTER_PRIORITY
|
|
+
|
|
instance = keepalived.KeepalivedInstance(
|
|
- 'BACKUP',
|
|
+ state,
|
|
interface_name,
|
|
self.ha_vr_id,
|
|
ha_port_cidrs,
|
|
- nopreempt=True,
|
|
+ nopreempt=nopreempt,
|
|
advert_int=self.agent_conf.ha_vrrp_advert_int,
|
|
- priority=self.ha_priority,
|
|
+ priority=priority,
|
|
vrrp_health_check_interval=(
|
|
self.agent_conf.ha_vrrp_health_check_interval),
|
|
ha_conf_dir=self.keepalived_manager.get_conf_dir())
|
|
@@ -396,13 +426,16 @@ class HaRouter(router.RouterInfo):
|
|
ha_device = self.get_ha_device_name()
|
|
ha_cidr = self._get_primary_vip()
|
|
config_dir = self.keepalived_manager.get_conf_dir()
|
|
- state_change_log = (
|
|
- "%s/neutron-keepalived-state-change.log") % config_dir
|
|
+ state_change_log = f"{config_dir}/neutron-keepalived-state-change.log"
|
|
|
|
def callback(pid_file):
|
|
+ LOG.info(f'Router: {self.router_id} master is {self.master}, '
|
|
+ f'salves are {self.slaves}.')
|
|
cmd = [
|
|
STATE_CHANGE_PROC_NAME,
|
|
'--router_id=%s' % self.router_id,
|
|
+ '--master_agent=%s' % self.master,
|
|
+ '--slave_agents=%s' % ','.join(self.slaves),
|
|
'--namespace=%s' % self.ha_namespace,
|
|
'--conf_dir=%s' % config_dir,
|
|
'--log-file=%s' % state_change_log,
|
|
@@ -453,9 +486,7 @@ class HaRouter(router.RouterInfo):
|
|
return port1_filtered == port2_filtered
|
|
|
|
def external_gateway_added(self, ex_gw_port, interface_name):
|
|
- link_up = self.external_gateway_link_up()
|
|
- self._plug_external_gateway(ex_gw_port, interface_name,
|
|
- self.ns_name, link_up=link_up)
|
|
+ self._plug_external_gateway(ex_gw_port, interface_name, self.ns_name)
|
|
self._add_gateway_vip(ex_gw_port, interface_name)
|
|
self._disable_ipv6_addressing_on_interface(interface_name)
|
|
|
|
@@ -519,30 +550,3 @@ class HaRouter(router.RouterInfo):
|
|
if (self.keepalived_manager.get_process().active and
|
|
self.ha_state == 'master'):
|
|
super(HaRouter, self).enable_radvd(internal_ports)
|
|
-
|
|
- def external_gateway_link_up(self):
|
|
- # Check HA router ha_state for its gateway port link state.
|
|
- # 'backup' instance will not link up the gateway port.
|
|
- return self.ha_state == 'master'
|
|
-
|
|
- def set_external_gw_port_link_status(self, link_up, set_gw=False):
|
|
- link_state = "up" if link_up else "down"
|
|
- LOG.info('Set router %s gateway device link state to %s.',
|
|
- self.router_id, link_state)
|
|
-
|
|
- ex_gw_port = self.get_ex_gw_port()
|
|
- ex_gw_port_id = (ex_gw_port and ex_gw_port['id'] or
|
|
- self.ex_gw_port and self.ex_gw_port['id'])
|
|
- if ex_gw_port_id:
|
|
- interface_name = self.get_external_device_name(ex_gw_port_id)
|
|
- ns_name = self.get_gw_ns_name()
|
|
- if (not self.driver.set_link_status(
|
|
- interface_name, namespace=ns_name, link_up=link_up) and
|
|
- link_up):
|
|
- LOG.error('Gateway interface for router %s was not set up; '
|
|
- 'router will not work properly', self.router_id)
|
|
- if link_up and set_gw:
|
|
- preserve_ips = self.get_router_preserve_ips()
|
|
- self._external_gateway_settings(ex_gw_port, interface_name,
|
|
- ns_name, preserve_ips)
|
|
- self.routes_updated([], self.routes)
|
|
diff --git a/agent/l3/keepalived_state_change.py b/agent/l3/keepalived_state_change.py
|
|
index 7fd9e4269e..8c10a8b00f 100644
|
|
--- a/agent/l3/keepalived_state_change.py
|
|
+++ b/agent/l3/keepalived_state_change.py
|
|
@@ -47,9 +47,12 @@ class KeepalivedUnixDomainConnection(agent_utils.UnixDomainHTTPConnection):
|
|
|
|
|
|
class MonitorDaemon(daemon.Daemon):
|
|
- def __init__(self, pidfile, router_id, user, group, namespace, conf_dir,
|
|
- interface, cidr):
|
|
+ def __init__(self, pidfile, host, router_id, master, slaves, user, group,
|
|
+ namespace, conf_dir, interface, cidr):
|
|
+ self.host = host
|
|
self.router_id = router_id
|
|
+ self.master = master
|
|
+ self.slaves = slaves
|
|
self.namespace = namespace
|
|
self.conf_dir = conf_dir
|
|
self.interface = interface
|
|
@@ -62,6 +65,8 @@ class MonitorDaemon(daemon.Daemon):
|
|
user=user, group=group)
|
|
|
|
def run(self):
|
|
+ LOG.debug(f'Router: {self.router_id} master is {self.master}, '
|
|
+ f'salves are {self.slaves}.')
|
|
self._thread_ip_monitor = threading.Thread(
|
|
target=ip_lib.ip_monitor,
|
|
args=(self.namespace, self.queue, self.event_stop,
|
|
@@ -169,7 +174,10 @@ def main():
|
|
keepalived.register_l3_agent_keepalived_opts()
|
|
configure(cfg.CONF)
|
|
MonitorDaemon(cfg.CONF.pid_file,
|
|
+ cfg.CONF.host,
|
|
cfg.CONF.router_id,
|
|
+ cfg.CONF.master_agent,
|
|
+ cfg.CONF.slave_agents,
|
|
cfg.CONF.user,
|
|
cfg.CONF.group,
|
|
cfg.CONF.namespace,
|
|
diff --git a/agent/l3/router_info.py b/agent/l3/router_info.py
|
|
index ea2b488cd2..eabdcc6e54 100644
|
|
--- a/agent/l3/router_info.py
|
|
+++ b/agent/l3/router_info.py
|
|
@@ -697,16 +697,14 @@ class RouterInfo(BaseRouterInfo):
|
|
return [common_utils.ip_to_cidr(ip['floating_ip_address'])
|
|
for ip in floating_ips]
|
|
|
|
- def _plug_external_gateway(self, ex_gw_port, interface_name, ns_name,
|
|
- link_up=True):
|
|
+ def _plug_external_gateway(self, ex_gw_port, interface_name, ns_name):
|
|
self.driver.plug(ex_gw_port['network_id'],
|
|
ex_gw_port['id'],
|
|
interface_name,
|
|
ex_gw_port['mac_address'],
|
|
namespace=ns_name,
|
|
prefix=EXTERNAL_DEV_PREFIX,
|
|
- mtu=ex_gw_port.get('mtu'),
|
|
- link_up=link_up)
|
|
+ mtu=ex_gw_port.get('mtu'))
|
|
|
|
def _get_external_gw_ips(self, ex_gw_port):
|
|
gateway_ips = []
|
|
@@ -766,11 +764,7 @@ class RouterInfo(BaseRouterInfo):
|
|
LOG.debug("External gateway added: port(%s), interface(%s), ns(%s)",
|
|
ex_gw_port, interface_name, ns_name)
|
|
self._plug_external_gateway(ex_gw_port, interface_name, ns_name)
|
|
- self._external_gateway_settings(ex_gw_port, interface_name,
|
|
- ns_name, preserve_ips)
|
|
|
|
- def _external_gateway_settings(self, ex_gw_port, interface_name,
|
|
- ns_name, preserve_ips):
|
|
# Build up the interface and gateway IP addresses that
|
|
# will be added to the interface.
|
|
ip_cidrs = common_utils.fixed_ip_cidrs(ex_gw_port['fixed_ips'])
|
|
@@ -815,19 +809,18 @@ class RouterInfo(BaseRouterInfo):
|
|
return any(netaddr.IPAddress(gw_ip).version == 6
|
|
for gw_ip in gateway_ips)
|
|
|
|
- def get_router_preserve_ips(self):
|
|
+ def external_gateway_added(self, ex_gw_port, interface_name):
|
|
preserve_ips = self._list_floating_ip_cidrs() + list(
|
|
self.centralized_port_forwarding_fip_set)
|
|
preserve_ips.extend(self.agent.pd.get_preserve_ips(self.router_id))
|
|
- return preserve_ips
|
|
|
|
- def external_gateway_added(self, ex_gw_port, interface_name):
|
|
- preserve_ips = self.get_router_preserve_ips()
|
|
self._external_gateway_added(
|
|
ex_gw_port, interface_name, self.ns_name, preserve_ips)
|
|
|
|
def external_gateway_updated(self, ex_gw_port, interface_name):
|
|
- preserve_ips = self.get_router_preserve_ips()
|
|
+ preserve_ips = self._list_floating_ip_cidrs() + list(
|
|
+ self.centralized_port_forwarding_fip_set)
|
|
+ preserve_ips.extend(self.agent.pd.get_preserve_ips(self.router_id))
|
|
self._external_gateway_added(
|
|
ex_gw_port, interface_name, self.ns_name, preserve_ips)
|
|
|
|
diff --git a/agent/linux/dhcp.py b/agent/linux/dhcp.py
|
|
index 249e1a8199..5b82fe328f 100644
|
|
--- a/agent/linux/dhcp.py
|
|
+++ b/agent/linux/dhcp.py
|
|
@@ -19,6 +19,7 @@ import os
|
|
import re
|
|
import shutil
|
|
import time
|
|
+from typing import List
|
|
|
|
import netaddr
|
|
from neutron_lib.api.definitions import extra_dhcp_opt as edo_ext
|
|
@@ -40,6 +41,7 @@ from neutron.agent.linux import iptables_manager
|
|
from neutron.cmd import runtime_checks as checks
|
|
from neutron.common import ipv6_utils
|
|
from neutron.common import utils as common_utils
|
|
+from neutron.conf.common import NETWORK_HOST_OPTS
|
|
from neutron.ipam import utils as ipam_utils
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
@@ -81,6 +83,51 @@ def port_requires_dhcp_configuration(port):
|
|
constants.DEVICE_OWNER_DHCP]
|
|
|
|
|
|
+class Octopus(object):
|
|
+ def __init__(self):
|
|
+ self.tentacles = collections.defaultdict(Tentacle)
|
|
+
|
|
+ def __str__(self):
|
|
+ lines = ['']
|
|
+ for subnet_id, tentacle in self.tentacles.items():
|
|
+ line = (f"Subnet {subnet_id} has multi routers "
|
|
+ f"{len(tentacle.gateway_ports) > 1} "
|
|
+ f"{tentacle}")
|
|
+ lines.append(line)
|
|
+ return '\n'.join(lines)
|
|
+
|
|
+
|
|
+class Tentacle(object):
|
|
+ def __init__(self, gateway_ip: str):
|
|
+ self.gateway_ip = gateway_ip
|
|
+ self.tags = []
|
|
+ self.gateway_ports = collections.defaultdict(Sucker)
|
|
+ self.suckers = collections.defaultdict(Sucker)
|
|
+
|
|
+ def __str__(self):
|
|
+ lines = [""]
|
|
+ for port_id, sucker in self.suckers.items():
|
|
+ line = f" Port {port_id} {sucker}"
|
|
+ lines.append(line)
|
|
+ return '\n'.join(lines)
|
|
+
|
|
+
|
|
+class Sucker(object):
|
|
+ def __init__(self, host: str, device_owner: str, ip_address: str):
|
|
+ self.host = host
|
|
+ self.device_owner = device_owner
|
|
+ self.ip_address = ip_address
|
|
+ self.tag = None
|
|
+
|
|
+ def subnet_tag(self, subnet_id: str):
|
|
+ return f'{self.host}-subnet-{subnet_id}'
|
|
+
|
|
+ def __str__(self):
|
|
+ return (f"ip: {self.ip_address} \t"
|
|
+ f"binding_host: {self.host} \t"
|
|
+ f"device_owner: {self.device_owner}")
|
|
+
|
|
+
|
|
class DictModel(dict):
|
|
"""Convert dict into an object that provides attribute access to values."""
|
|
|
|
@@ -149,6 +196,10 @@ class DhcpBase(object):
|
|
self.process_monitor = process_monitor
|
|
self.device_manager = DeviceManager(self.conf, plugin)
|
|
self.version = version
|
|
+ self.octopus = Octopus()
|
|
+ self._init_octopus()
|
|
+ self.compute_to_network = dict()
|
|
+ self._init_compute_to_network()
|
|
|
|
@abc.abstractmethod
|
|
def enable(self):
|
|
@@ -193,6 +244,31 @@ class DhcpBase(object):
|
|
"""True if the metadata-proxy should be enabled for the network."""
|
|
raise NotImplementedError()
|
|
|
|
+ def _init_compute_to_network(self):
|
|
+ for network_node in self.conf.network_nodes:
|
|
+ self.conf.register_opts(NETWORK_HOST_OPTS, group=network_node)
|
|
+ network_group = self.conf.get(network_node, None)
|
|
+ if network_group:
|
|
+ compute_nodes = network_group.get('compute_nodes', [])
|
|
+ for compute_node in compute_nodes:
|
|
+ self.compute_to_network[compute_node] = network_node
|
|
+
|
|
+ def _init_octopus(self):
|
|
+ for subnet in self.network.subnets:
|
|
+ self.octopus.tentacles[subnet.id] = Tentacle(subnet.gateway_ip)
|
|
+
|
|
+ for port in self.network.ports:
|
|
+ for ip in port.fixed_ips:
|
|
+ host = port.get('binding:host_id')
|
|
+ device_owner = port.get('device_owner')
|
|
+ ip_address = ip.get('ip_address')
|
|
+ tentacle = self.octopus.tentacles[ip.subnet_id]
|
|
+ sucker = Sucker(host, device_owner, ip_address)
|
|
+ tentacle.suckers[port.id] = sucker
|
|
+ if (device_owner in (constants.DEVICE_OWNER_HA_REPLICATED_INT,
|
|
+ constants.DEVICE_OWNER_ROUTER_INTF)):
|
|
+ tentacle.gateway_ports[host] = sucker
|
|
+
|
|
|
|
@six.add_metaclass(abc.ABCMeta)
|
|
class DhcpLocalProcess(DhcpBase):
|
|
@@ -841,8 +917,18 @@ class Dnsmasq(DhcpLocalProcess):
|
|
(port.mac_address, tag, name, ip_address,
|
|
'set:', self._PORT_TAG_PREFIX % port.id))
|
|
else:
|
|
- buf.write('%s,%s%s,%s\n' %
|
|
- (port.mac_address, tag, name, ip_address))
|
|
+ subnet_tag = f'subnet-{alloc.subnet_id}'
|
|
+ if self.conf.enable_set_route_for_single_port:
|
|
+ tentacle = self.octopus.tentacles[alloc.subnet_id]
|
|
+ sucker = tentacle.suckers[port.id]
|
|
+ tentacle.tags.append(subnet_tag)
|
|
+ if sucker.device_owner.startswith(
|
|
+ constants.DEVICE_OWNER_COMPUTE_PREFIX):
|
|
+ subnet_tag = sucker.subnet_tag(alloc.subnet_id)
|
|
+ sucker.tag = subnet_tag
|
|
+
|
|
+ buf.write(f'{port.mac_address},{tag}{name},{ip_address},'
|
|
+ f'set:{subnet_tag}\n')
|
|
|
|
file_utils.replace_file(filename, buf.getvalue())
|
|
LOG.debug('Done building host file %s', filename)
|
|
@@ -1059,7 +1145,8 @@ class Dnsmasq(DhcpLocalProcess):
|
|
"""Write a dnsmasq compatible options file."""
|
|
options, subnet_index_map = self._generate_opts_per_subnet()
|
|
options += self._generate_opts_per_port(subnet_index_map)
|
|
-
|
|
+ if self.conf.enable_set_route_for_single_port:
|
|
+ options += self._generate_opts_for_compute_port(options)
|
|
name = self.get_conf_file_name('opts')
|
|
file_utils.replace_file(name, '\n'.join(options))
|
|
return name
|
|
@@ -1220,6 +1307,50 @@ class Dnsmasq(DhcpLocalProcess):
|
|
vx_ips))))
|
|
return options
|
|
|
|
+ def _generate_opts_for_compute_port(self, options: List[str]) -> List[str]:
|
|
+ new_options = []
|
|
+ LOG.debug(self.octopus)
|
|
+ if not self.compute_to_network:
|
|
+ LOG.warning('CONF.enable_set_route_for_single_port is True, '
|
|
+ 'but not configured.')
|
|
+ return new_options
|
|
+ for subnet_id, tentacle in self.octopus.tentacles.items():
|
|
+ if len(tentacle.tags) == 0:
|
|
+ continue
|
|
+ if len(tentacle.gateway_ports) <= 1:
|
|
+ LOG.info(f'Subnet {subnet_id} is not bound '
|
|
+ f'to different routers, '
|
|
+ f'so skip generate options for compute ports.')
|
|
+ continue
|
|
+ for port_id, sucker in tentacle.suckers.items():
|
|
+ if not sucker.tag:
|
|
+ continue
|
|
+ if not sucker.device_owner.startswith(
|
|
+ constants.DEVICE_OWNER_COMPUTE_PREFIX):
|
|
+ continue
|
|
+ network_node = self.compute_to_network.get(sucker.host, None)
|
|
+ if not network_node:
|
|
+ LOG.warning(f'Compute host {sucker.host} not configured.')
|
|
+ continue
|
|
+ port = tentacle.gateway_ports.get(network_node, None)
|
|
+ if not port:
|
|
+ LOG.warning(f'Subnet {subnet_id} does not have gateway '
|
|
+ f'port on network host {network_node}.')
|
|
+ continue
|
|
+ if tentacle.gateway_ip == port.ip_address:
|
|
+ continue
|
|
+ for option in options.copy():
|
|
+ if subnet_id in option:
|
|
+ option = option.replace(f'subnet-{subnet_id}',
|
|
+ sucker.tag)
|
|
+ if ('option:classless-static-route' in option or
|
|
+ ',249,' in option or 'option:router' in option):
|
|
+ gateway_ip = option.split(',')[-1]
|
|
+ option = option.replace(gateway_ip,
|
|
+ port.ip_address)
|
|
+ new_options.append(option)
|
|
+ return new_options
|
|
+
|
|
def _make_subnet_interface_ip_map(self):
|
|
subnet_lookup = dict(
|
|
(netaddr.IPNetwork(subnet.cidr), subnet.id)
|
|
diff --git a/agent/linux/interface.py b/agent/linux/interface.py
|
|
index 3ac476d7ba..2e6455707c 100644
|
|
--- a/agent/linux/interface.py
|
|
+++ b/agent/linux/interface.py
|
|
@@ -259,17 +259,16 @@ class LinuxInterfaceDriver(object):
|
|
|
|
@abc.abstractmethod
|
|
def plug_new(self, network_id, port_id, device_name, mac_address,
|
|
- bridge=None, namespace=None, prefix=None, mtu=None,
|
|
- link_up=True):
|
|
+ bridge=None, namespace=None, prefix=None, mtu=None):
|
|
"""Plug in the interface only for new devices that don't exist yet."""
|
|
|
|
def plug(self, network_id, port_id, device_name, mac_address,
|
|
- bridge=None, namespace=None, prefix=None, mtu=None, link_up=True):
|
|
+ bridge=None, namespace=None, prefix=None, mtu=None):
|
|
if not ip_lib.device_exists(device_name,
|
|
namespace=namespace):
|
|
self._safe_plug_new(
|
|
network_id, port_id, device_name, mac_address, bridge,
|
|
- namespace, prefix, mtu, link_up)
|
|
+ namespace, prefix, mtu)
|
|
else:
|
|
LOG.info("Device %s already exists", device_name)
|
|
if mtu:
|
|
@@ -279,11 +278,11 @@ class LinuxInterfaceDriver(object):
|
|
LOG.warning("No MTU configured for port %s", port_id)
|
|
|
|
def _safe_plug_new(self, network_id, port_id, device_name, mac_address,
|
|
- bridge=None, namespace=None, prefix=None, mtu=None, link_up=True):
|
|
+ bridge=None, namespace=None, prefix=None, mtu=None):
|
|
try:
|
|
self.plug_new(
|
|
network_id, port_id, device_name, mac_address, bridge,
|
|
- namespace, prefix, mtu, link_up)
|
|
+ namespace, prefix, mtu)
|
|
except TypeError:
|
|
LOG.warning("Interface driver's plug_new() method should now "
|
|
"accept additional optional parameter 'link_up'. "
|
|
@@ -321,27 +320,10 @@ class LinuxInterfaceDriver(object):
|
|
LOG.warning("Interface driver cannot update MTU for ports")
|
|
self._mtu_update_warn_logged = True
|
|
|
|
- def set_link_status(self, device_name, namespace=None, link_up=True):
|
|
- ns_dev = ip_lib.IPWrapper(namespace=namespace).device(device_name)
|
|
- try:
|
|
- utils.wait_until_true(ns_dev.exists, timeout=3)
|
|
- except utils.WaitTimeout:
|
|
- LOG.debug('Device %s may have been deleted concurrently',
|
|
- device_name)
|
|
- return False
|
|
-
|
|
- if link_up:
|
|
- ns_dev.link.set_up()
|
|
- else:
|
|
- ns_dev.link.set_down()
|
|
-
|
|
- return True
|
|
-
|
|
|
|
class NullDriver(LinuxInterfaceDriver):
|
|
def plug_new(self, network_id, port_id, device_name, mac_address,
|
|
- bridge=None, namespace=None, prefix=None, mtu=None,
|
|
- link_up=True):
|
|
+ bridge=None, namespace=None, prefix=None, mtu=None):
|
|
pass
|
|
|
|
def unplug(self, device_name, bridge=None, namespace=None, prefix=None):
|
|
@@ -377,8 +359,7 @@ class OVSInterfaceDriver(LinuxInterfaceDriver):
|
|
ovs.replace_port(device_name, *attrs)
|
|
|
|
def plug_new(self, network_id, port_id, device_name, mac_address,
|
|
- bridge=None, namespace=None, prefix=None, mtu=None,
|
|
- link_up=True):
|
|
+ bridge=None, namespace=None, prefix=None, mtu=None):
|
|
"""Plug in the interface."""
|
|
if not bridge:
|
|
bridge = self.conf.ovs_integration_bridge
|
|
@@ -442,8 +423,8 @@ class OVSInterfaceDriver(LinuxInterfaceDriver):
|
|
else:
|
|
LOG.warning("No MTU configured for port %s", port_id)
|
|
|
|
- if link_up:
|
|
- ns_dev.link.set_up()
|
|
+ ns_dev.link.set_up()
|
|
+
|
|
if self.conf.ovs_use_veth:
|
|
# ovs-dpdk does not do checksum calculations for veth interface
|
|
# (bug 1832021)
|
|
@@ -488,8 +469,7 @@ class BridgeInterfaceDriver(LinuxInterfaceDriver):
|
|
DEV_NAME_PREFIX = 'ns-'
|
|
|
|
def plug_new(self, network_id, port_id, device_name, mac_address,
|
|
- bridge=None, namespace=None, prefix=None, mtu=None,
|
|
- link_up=True):
|
|
+ bridge=None, namespace=None, prefix=None, mtu=None):
|
|
"""Plugin the interface."""
|
|
ip = ip_lib.IPWrapper()
|
|
|
|
@@ -508,8 +488,7 @@ class BridgeInterfaceDriver(LinuxInterfaceDriver):
|
|
LOG.warning("No MTU configured for port %s", port_id)
|
|
|
|
root_veth.link.set_up()
|
|
- if link_up:
|
|
- ns_veth.link.set_up()
|
|
+ ns_veth.link.set_up()
|
|
|
|
def unplug(self, device_name, bridge=None, namespace=None, prefix=None):
|
|
"""Unplug the interface."""
|
|
diff --git a/agent/linux/keepalived.py b/agent/linux/keepalived.py
|
|
index f47a27f1d1..405a781f0b 100644
|
|
--- a/agent/linux/keepalived.py
|
|
+++ b/agent/linux/keepalived.py
|
|
@@ -32,6 +32,7 @@ from neutron.common import utils
|
|
VALID_STATES = ['MASTER', 'BACKUP']
|
|
VALID_AUTH_TYPES = ['AH', 'PASS']
|
|
HA_DEFAULT_PRIORITY = 50
|
|
+HA_DEFAULT_MASTER_PRIORITY = 100
|
|
PRIMARY_VIP_RANGE_SIZE = 24
|
|
KEEPALIVED_SERVICE_NAME = 'keepalived'
|
|
KEEPALIVED_EMAIL_FROM = 'neutron@openstack.local'
|
|
diff --git a/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py b/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py
|
|
index 29b61b2c9b..fbad5f1d9a 100644
|
|
--- a/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py
|
|
+++ b/api/rpc/agentnotifiers/dhcp_rpc_agent_api.py
|
|
@@ -312,6 +312,15 @@ class DhcpAgentNotifyAPI(object):
|
|
return False
|
|
if set(orig.keys()) != set(new.keys()):
|
|
return False
|
|
+
|
|
+ if cfg.CONF.enable_set_route_for_single_port:
|
|
+ device_owner = new.get('device_owner', None)
|
|
+ orig_device_owner = orig.get('device_owner', None)
|
|
+ if (not orig_device_owner and device_owner and
|
|
+ device_owner.startswith(
|
|
+ constants.DEVICE_OWNER_COMPUTE_PREFIX)):
|
|
+ return True
|
|
+
|
|
for k in orig.keys():
|
|
if k in ('status', 'updated_at', 'revision_number'):
|
|
continue
|
|
diff --git a/api/rpc/callbacks/resources.py b/api/rpc/callbacks/resources.py
|
|
index 734f05eb6f..de56211c53 100644
|
|
--- a/api/rpc/callbacks/resources.py
|
|
+++ b/api/rpc/callbacks/resources.py
|
|
@@ -15,6 +15,7 @@ from neutron.objects import conntrack_helper
|
|
from neutron.objects.logapi import logging_resource as log_object
|
|
from neutron.objects import network
|
|
from neutron.objects import port_forwarding
|
|
+from neutron.objects import rg_port_forwarding
|
|
from neutron.objects import ports
|
|
from neutron.objects.qos import policy
|
|
from neutron.objects import securitygroup
|
|
@@ -33,6 +34,7 @@ SUBNET = subnet.Subnet.obj_name()
|
|
SECURITYGROUP = securitygroup.SecurityGroup.obj_name()
|
|
SECURITYGROUPRULE = securitygroup.SecurityGroupRule.obj_name()
|
|
PORTFORWARDING = port_forwarding.PortForwarding.obj_name()
|
|
+RGPORTFORWARDING = rg_port_forwarding.RGPortForwarding.obj_name()
|
|
CONNTRACKHELPER = conntrack_helper.ConntrackHelper.obj_name()
|
|
|
|
|
|
@@ -47,6 +49,7 @@ _VALID_CLS = (
|
|
securitygroup.SecurityGroupRule,
|
|
log_object.Log,
|
|
port_forwarding.PortForwarding,
|
|
+ rg_port_forwarding.RGPortForwarding,
|
|
conntrack_helper.ConntrackHelper,
|
|
)
|
|
|
|
diff --git a/conf/agent/l3/keepalived.py b/conf/agent/l3/keepalived.py
|
|
index bd46c723fc..5ff3492280 100644
|
|
--- a/conf/agent/l3/keepalived.py
|
|
+++ b/conf/agent/l3/keepalived.py
|
|
@@ -20,6 +20,8 @@ from neutron._i18n import _
|
|
|
|
CLI_OPTS = [
|
|
cfg.StrOpt('router_id', help=_('ID of the router')),
|
|
+ cfg.StrOpt('master_agent', help=_('The master agent of router')),
|
|
+ cfg.ListOpt('slave_agents', help=_('The slave agents of router')),
|
|
cfg.StrOpt('namespace', help=_('Namespace of the router')),
|
|
cfg.StrOpt('conf_dir', help=_('Path to the router directory')),
|
|
cfg.StrOpt('monitor_interface', help=_('Interface to monitor')),
|
|
diff --git a/conf/common.py b/conf/common.py
|
|
index f885429613..45e0b48723 100644
|
|
--- a/conf/common.py
|
|
+++ b/conf/common.py
|
|
@@ -145,7 +145,21 @@ core_opts = [
|
|
"Setting to any positive integer means that on failure "
|
|
"the connection is retried that many times. "
|
|
"For example, setting to 3 means total attempts to "
|
|
- "connect will be 4."))
|
|
+ "connect will be 4.")),
|
|
+ cfg.BoolOpt('enable_set_route_for_single_port', default=False,
|
|
+ help=_("To set route path for every single port "
|
|
+ "when the same subnet has multi ports on router.")),
|
|
+ cfg.ListOpt('network_nodes',
|
|
+ default=[],
|
|
+ help=_("The list of network hosts to "
|
|
+ "make a network map "
|
|
+ "with compute node and network node.")),
|
|
+]
|
|
+
|
|
+NETWORK_HOST_OPTS = [
|
|
+ cfg.ListOpt('compute_nodes',
|
|
+ default=[],
|
|
+ help=_("The list of compute hosts."))
|
|
]
|
|
|
|
core_cli_opts = [
|
|
diff --git a/conf/policies/__init__.py b/conf/policies/__init__.py
|
|
index aa4dda63d0..15cdaea45a 100644
|
|
--- a/conf/policies/__init__.py
|
|
+++ b/conf/policies/__init__.py
|
|
@@ -34,6 +34,7 @@ from neutron.conf.policies import port
|
|
from neutron.conf.policies import qos
|
|
from neutron.conf.policies import rbac
|
|
from neutron.conf.policies import router
|
|
+from neutron.conf.policies import rg_port_forwarding
|
|
from neutron.conf.policies import security_group
|
|
from neutron.conf.policies import segment
|
|
from neutron.conf.policies import service_type
|
|
@@ -63,6 +64,7 @@ def list_rules():
|
|
qos.list_rules(),
|
|
rbac.list_rules(),
|
|
router.list_rules(),
|
|
+ rg_port_forwarding.list_rules(),
|
|
security_group.list_rules(),
|
|
segment.list_rules(),
|
|
service_type.list_rules(),
|
|
diff --git a/conf/policies/rg_port_forwarding.py b/conf/policies/rg_port_forwarding.py
|
|
new file mode 100644
|
|
index 0000000000..19e2cd5e2f
|
|
--- /dev/null
|
|
+++ b/conf/policies/rg_port_forwarding.py
|
|
@@ -0,0 +1,76 @@
|
|
+# Copyright (c) 2023 UnionTech
|
|
+# All rights reserved
|
|
+#
|
|
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
+# not use this file except in compliance with the License. You may obtain
|
|
+# a copy of the License at
|
|
+#
|
|
+# http://www.apache.org/licenses/LICENSE-2.0
|
|
+#
|
|
+# Unless required by applicable law or agreed to in writing, software
|
|
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
+# License for the specific language governing permissions and limitations
|
|
+# under the License.
|
|
+
|
|
+
|
|
+from oslo_policy import policy
|
|
+from neutron.conf.policies import base
|
|
+
|
|
+COLLECTION_PATH = '/routers/{router_id}/port_forwardings'
|
|
+RESOURCE_PATH = '/routers/{router_id}/port_forwardings/{port_forwarding_id}'
|
|
+
|
|
+rules = [
|
|
+ policy.DocumentedRuleDefault(
|
|
+ 'create_router_gateway_port_forwarding',
|
|
+ base.RULE_ADMIN_OR_PARENT_OWNER,
|
|
+ 'Create a router gateway port forwarding',
|
|
+ [
|
|
+ {
|
|
+ 'method': 'POST',
|
|
+ 'path': COLLECTION_PATH,
|
|
+ },
|
|
+ ]
|
|
+ ),
|
|
+ policy.DocumentedRuleDefault(
|
|
+ 'get_router_gateway_forwarding',
|
|
+ base.RULE_ADMIN_OR_PARENT_OWNER,
|
|
+ 'Get a router gateway port forwarding',
|
|
+ [
|
|
+ {
|
|
+ 'method': 'GET',
|
|
+ 'path': COLLECTION_PATH,
|
|
+ },
|
|
+ {
|
|
+ 'method': 'GET',
|
|
+ 'path': RESOURCE_PATH,
|
|
+ },
|
|
+ ]
|
|
+ ),
|
|
+ policy.DocumentedRuleDefault(
|
|
+ 'update_router_gateway_port_forwarding',
|
|
+ base.RULE_ADMIN_OR_PARENT_OWNER,
|
|
+ 'Update a floating IP port forwarding',
|
|
+ [
|
|
+ {
|
|
+ 'method': 'PUT',
|
|
+ 'path': RESOURCE_PATH,
|
|
+ },
|
|
+ ]
|
|
+ ),
|
|
+ policy.DocumentedRuleDefault(
|
|
+ 'delete_router_gateway_port_forwarding',
|
|
+ base.RULE_ADMIN_OR_PARENT_OWNER,
|
|
+ 'Delete a floating IP port forwarding',
|
|
+ [
|
|
+ {
|
|
+ 'method': 'DELETE',
|
|
+ 'path': RESOURCE_PATH,
|
|
+ },
|
|
+ ]
|
|
+ ),
|
|
+]
|
|
+
|
|
+
|
|
+def list_rules():
|
|
+ return rules
|
|
diff --git a/db/l3_attrs_db.py b/db/l3_attrs_db.py
|
|
index e6d4e298b1..f292b7aa32 100644
|
|
--- a/db/l3_attrs_db.py
|
|
+++ b/db/l3_attrs_db.py
|
|
@@ -19,6 +19,7 @@ from oslo_config import cfg
|
|
|
|
from neutron._i18n import _
|
|
from neutron.db.models import l3_attrs
|
|
+from neutron.objects.base import NeutronDbObject
|
|
|
|
|
|
def get_attr_info():
|
|
@@ -29,7 +30,11 @@ def get_attr_info():
|
|
'availability_zone_hints': {
|
|
'default': '[]',
|
|
'transform_to_db': az_validator.convert_az_list_to_string,
|
|
- 'transform_from_db': az_validator.convert_az_string_to_list}
|
|
+ 'transform_from_db': az_validator.convert_az_string_to_list},
|
|
+ 'configurations': {
|
|
+ 'default': '{}',
|
|
+ 'transform_to_db': NeutronDbObject.filter_to_json_str,
|
|
+ 'transform_from_db': NeutronDbObject.load_json_from_str}
|
|
}
|
|
|
|
|
|
diff --git a/db/l3_db.py b/db/l3_db.py
|
|
index 565b422532..b625dc1959 100644
|
|
--- a/db/l3_db.py
|
|
+++ b/db/l3_db.py
|
|
@@ -47,6 +47,7 @@ from neutron.common import ipv6_utils
|
|
from neutron.common import utils
|
|
from neutron.db import _utils as db_utils
|
|
from neutron.db.models import l3 as l3_models
|
|
+from neutron.plugins.ml2 import models as ml2_models
|
|
from neutron.db import models_v2
|
|
from neutron.db import standardattrdescription_db as st_attr
|
|
from neutron.extensions import l3
|
|
@@ -1086,21 +1087,37 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
|
|
# with subnet's gateway-ip, return that router.
|
|
# Otherwise return the first router.
|
|
RouterPort = l3_models.RouterPort
|
|
+ RouterPortBinding = orm.aliased(ml2_models.PortBinding,
|
|
+ name="router_port_binding")
|
|
+ ComputePortBinding = orm.aliased(ml2_models.PortBinding,
|
|
+ name="compute_port_binding")
|
|
gw_port = orm.aliased(models_v2.Port, name="gw_port")
|
|
# TODO(lujinluo): Need IPAllocation and Port object
|
|
routerport_qry = context.session.query(
|
|
- RouterPort.router_id, models_v2.IPAllocation.ip_address).join(
|
|
- RouterPort.port, models_v2.Port.fixed_ips).filter(
|
|
+ RouterPort.router_id, models_v2.IPAllocation.ip_address,
|
|
+ RouterPortBinding.host, ComputePortBinding.host,
|
|
+ ).join(
|
|
+ RouterPort.port, models_v2.Port.fixed_ips
|
|
+ ).filter(
|
|
models_v2.Port.network_id == internal_port['network_id'],
|
|
RouterPort.port_type.in_(constants.ROUTER_INTERFACE_OWNERS),
|
|
- models_v2.IPAllocation.subnet_id == internal_subnet['id']
|
|
- ).join(gw_port, gw_port.device_id == RouterPort.router_id).filter(
|
|
+ models_v2.IPAllocation.subnet_id == internal_subnet['id'],
|
|
+ ComputePortBinding.port_id == internal_port['id'],
|
|
+ ).join(
|
|
+ gw_port, gw_port.device_id == RouterPort.router_id
|
|
+ ).filter(
|
|
gw_port.network_id == external_network_id,
|
|
gw_port.device_owner == DEVICE_OWNER_ROUTER_GW
|
|
+ ).join(
|
|
+ RouterPortBinding, RouterPortBinding.port_id == models_v2.Port.id
|
|
).distinct()
|
|
|
|
first_router_id = None
|
|
- for router_id, interface_ip in routerport_qry:
|
|
+ for (router_id, interface_ip,
|
|
+ network_host, compute_host) in routerport_qry:
|
|
+ network_node = self.compute_to_network.get(compute_host, None)
|
|
+ if network_node and network_node == network_host:
|
|
+ return router_id
|
|
if interface_ip == internal_subnet['gateway_ip']:
|
|
return router_id
|
|
if not first_router_id:
|
|
diff --git a/db/l3_hamode_db.py b/db/l3_hamode_db.py
|
|
index bff388e166..4c414016dd 100644
|
|
--- a/db/l3_hamode_db.py
|
|
+++ b/db/l3_hamode_db.py
|
|
@@ -32,6 +32,7 @@ from neutron_lib import constants
|
|
from neutron_lib.db import api as db_api
|
|
from neutron_lib import exceptions as n_exc
|
|
from neutron_lib.exceptions import l3 as l3_exc
|
|
+from neutron_lib.exceptions import agent as agent_exc
|
|
from neutron_lib.exceptions import l3_ext_ha_mode as l3ha_exc
|
|
from neutron_lib.objects import exceptions as obj_base
|
|
from neutron_lib.plugins import utils as p_utils
|
|
@@ -54,6 +55,7 @@ from neutron.db import l3_dvr_db
|
|
from neutron.objects import base
|
|
from neutron.objects import l3_hamode
|
|
from neutron.objects import router as l3_obj
|
|
+from neutron.objects import agent as agent_obj
|
|
|
|
|
|
VR_ID_RANGE = set(range(1, 255))
|
|
@@ -378,12 +380,85 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
|
|
if not self.get_ha_network(context, router['tenant_id']):
|
|
self._create_ha_network(context, router['tenant_id'])
|
|
|
|
+ @staticmethod
|
|
+ def _check_router_configurations_creation(context,
|
|
+ configurations: dict,
|
|
+ is_ha: bool = True):
|
|
+ agents = agent_obj.Agent.get_objects(context,
|
|
+ binary='neutron-l3-agent')
|
|
+ agents = [agent['host'] for agent in agents]
|
|
+ master = configurations.get('master_agent', None)
|
|
+ slaves = configurations.get('slave_agents', [])
|
|
+ preferred_agent = configurations.get('preferred_agent', None)
|
|
+ if is_ha:
|
|
+ if master and slaves:
|
|
+ for agent in [master] + slaves:
|
|
+ if agent not in agents:
|
|
+ raise agent_exc.AgentNotFound(id=agent)
|
|
+ if master in slaves:
|
|
+ raise l3_exc.RouterAgentConflict()
|
|
+ else:
|
|
+ if master or slaves:
|
|
+ raise l3_exc.RouterAgentNotGiven()
|
|
+ else:
|
|
+ if preferred_agent and preferred_agent not in agents:
|
|
+ raise agent_exc.AgentNotFound(id=preferred_agent)
|
|
+
|
|
+ @staticmethod
|
|
+ def _check_router_configurations_update(context,
|
|
+ configurations: dict,
|
|
+ old_configurations: dict,
|
|
+ is_ha: bool = True) -> bool:
|
|
+ if configurations == old_configurations:
|
|
+ return False
|
|
+
|
|
+ agents = agent_obj.Agent.get_objects(context,
|
|
+ binary='neutron-l3-agent')
|
|
+ agents = [agent['host'] for agent in agents]
|
|
+ master = configurations.get('master_agent', None)
|
|
+ slaves = configurations.get('slave_agents', [])
|
|
+ preferred_agent = configurations.get('preferred_agent', None)
|
|
+ old_master = old_configurations.get('master_agent', None)
|
|
+ old_slaves = old_configurations.get('slave_agents', [])
|
|
+ old_preferred_agent = old_configurations.get('preferred_agent', None)
|
|
+ if is_ha:
|
|
+ if master:
|
|
+ if master != old_master:
|
|
+ if master not in agents:
|
|
+ raise agent_exc.AgentNotFound(id=master)
|
|
+ old_configurations['master_agent'] = master
|
|
+ if slaves:
|
|
+ if slaves != old_slaves:
|
|
+ for slave in slaves:
|
|
+ if slave not in agents:
|
|
+ raise agent_exc.AgentNotFound(id=slave)
|
|
+ old_configurations['slave_agents'] = slaves
|
|
+ if (old_configurations['master_agent'] in
|
|
+ old_configurations['slave_agents']):
|
|
+ raise l3_exc.RouterAgentConflict()
|
|
+ else:
|
|
+ if preferred_agent:
|
|
+ if preferred_agent != old_preferred_agent:
|
|
+ if preferred_agent not in agents:
|
|
+ raise agent_exc.AgentNotFound(id=preferred_agent)
|
|
+ old_configurations['preferred_agent'] = preferred_agent
|
|
+ else:
|
|
+ old_configurations['preferred_agent'] = None
|
|
+
|
|
+ return True
|
|
+
|
|
@registry.receives(resources.ROUTER, [events.PRECOMMIT_CREATE],
|
|
priority_group.PRIORITY_ROUTER_EXTENDED_ATTRIBUTE)
|
|
def _precommit_router_create(self, resource, event, trigger, context,
|
|
router, router_db, **kwargs):
|
|
"""Event handler to set ha flag and status on creation."""
|
|
is_ha = self._is_ha(router)
|
|
+ configurations = router.get('configurations', {})
|
|
+ if configurations:
|
|
+ self._check_router_configurations_creation(context, configurations,
|
|
+ is_ha)
|
|
+ self.set_extra_attr_value(context, router_db, 'configurations',
|
|
+ configurations)
|
|
router['ha'] = is_ha
|
|
self.set_extra_attr_value(context, router_db, 'ha', is_ha)
|
|
if not is_ha:
|
|
@@ -465,6 +540,28 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin,
|
|
self.set_extra_attr_value(
|
|
payload.context, payload.desired_state, 'ha', requested_ha_state)
|
|
|
|
+ @registry.receives(resources.ROUTER, [events.PRECOMMIT_UPDATE],
|
|
+ priority_group.PRIORITY_ROUTER_EXTENDED_ATTRIBUTE)
|
|
+ def _validate_configurations(self, resource, event, trigger, payload=None):
|
|
+ old_configurations = payload.states[0].get('configurations', {})
|
|
+ configurations = payload.request_body.get('configurations', {})
|
|
+
|
|
+ if not configurations:
|
|
+ return
|
|
+
|
|
+ if payload.desired_state.admin_state_up:
|
|
+ msg = _('Cannot change configurations of active routers. Please '
|
|
+ 'set router admin_state_up to False prior to upgrade')
|
|
+ raise n_exc.BadRequest(resource='router', msg=msg)
|
|
+
|
|
+ need_update = self._check_router_configurations_update(
|
|
+ payload.context, configurations, old_configurations,
|
|
+ payload.states[0]['ha'])
|
|
+
|
|
+ if need_update:
|
|
+ self.set_extra_attr_value(payload.context, payload.desired_state,
|
|
+ 'configurations', old_configurations)
|
|
+
|
|
@registry.receives(resources.ROUTER, [events.AFTER_UPDATE],
|
|
priority_group.PRIORITY_ROUTER_EXTENDED_ATTRIBUTE)
|
|
def _reconfigure_ha_resources(self, resource, event, trigger, context,
|
|
diff --git a/db/migration/alembic_migrations/versions/EXPAND_HEAD b/db/migration/alembic_migrations/versions/EXPAND_HEAD
|
|
index ffa2bbaaf6..0c8e4a2178 100644
|
|
--- a/db/migration/alembic_migrations/versions/EXPAND_HEAD
|
|
+++ b/db/migration/alembic_migrations/versions/EXPAND_HEAD
|
|
@@ -1 +1 @@
|
|
-c613d0b82681
|
|
+1c19a98b5eef
|
|
diff --git a/db/migration/alembic_migrations/versions/train/expand/1c19a98b5eef_add_router_configurations.py b/db/migration/alembic_migrations/versions/train/expand/1c19a98b5eef_add_router_configurations.py
|
|
new file mode 100644
|
|
index 0000000000..f12600ef4f
|
|
--- /dev/null
|
|
+++ b/db/migration/alembic_migrations/versions/train/expand/1c19a98b5eef_add_router_configurations.py
|
|
@@ -0,0 +1,36 @@
|
|
+# Copyright 2023 OpenStack Foundation
|
|
+#
|
|
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
+# not use this file except in compliance with the License. You may obtain
|
|
+# a copy of the License at
|
|
+#
|
|
+# http://www.apache.org/licenses/LICENSE-2.0
|
|
+#
|
|
+# Unless required by applicable law or agreed to in writing, software
|
|
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
+# License for the specific language governing permissions and limitations
|
|
+# under the License.
|
|
+#
|
|
+
|
|
+from alembic import op
|
|
+import sqlalchemy as sa
|
|
+
|
|
+"""add router configurations
|
|
+
|
|
+Revision ID: 1c19a98b5eef
|
|
+Revises: cab12b72ed90
|
|
+Create Date: 2023-08-01 10:05:56.412167
|
|
+
|
|
+"""
|
|
+
|
|
+# revision identifiers, used by Alembic.
|
|
+revision = '1c19a98b5eef'
|
|
+down_revision = 'cab12b72ed90'
|
|
+
|
|
+
|
|
+def upgrade():
|
|
+ # ### commands auto generated by Alembic - please adjust! ###
|
|
+ op.add_column('router_extra_attributes',
|
|
+ sa.Column('configurations', sa.String(length=4095)))
|
|
+ # ### end Alembic commands ###
|
|
diff --git a/db/migration/alembic_migrations/versions/train/expand/cab12b72ed90_add_router_gateway_port_forwarding.py b/db/migration/alembic_migrations/versions/train/expand/cab12b72ed90_add_router_gateway_port_forwarding.py
|
|
new file mode 100644
|
|
index 0000000000..ad511a7ed8
|
|
--- /dev/null
|
|
+++ b/db/migration/alembic_migrations/versions/train/expand/cab12b72ed90_add_router_gateway_port_forwarding.py
|
|
@@ -0,0 +1,55 @@
|
|
+# Copyright 2023 OpenStack Foundation
|
|
+#
|
|
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
+# not use this file except in compliance with the License. You may obtain
|
|
+# a copy of the License at
|
|
+#
|
|
+# http://www.apache.org/licenses/LICENSE-2.0
|
|
+#
|
|
+# Unless required by applicable law or agreed to in writing, software
|
|
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
+# License for the specific language governing permissions and limitations
|
|
+# under the License.
|
|
+#
|
|
+
|
|
+from alembic import op
|
|
+import sqlalchemy as sa
|
|
+
|
|
+"""add router gateway port forwarding
|
|
+
|
|
+Revision ID: cab12b72ed90
|
|
+Revises: c613d0b82681
|
|
+Create Date: 2023-07-04 10:27:54.485453
|
|
+
|
|
+"""
|
|
+
|
|
+# revision identifiers, used by Alembic.
|
|
+revision = 'cab12b72ed90'
|
|
+down_revision = 'c613d0b82681'
|
|
+
|
|
+
|
|
+def upgrade():
|
|
+ # ### commands auto generated by Alembic - please adjust! ###
|
|
+ op.create_table(
|
|
+ 'rgportforwardings',
|
|
+ sa.Column('id', sa.String(length=36), nullable=False),
|
|
+ sa.Column('router_id', sa.String(length=36), nullable=False),
|
|
+ sa.Column('external_port', sa.Integer(), nullable=False),
|
|
+ sa.Column('internal_neutron_port_id', sa.String(length=36),
|
|
+ nullable=False),
|
|
+ sa.Column('protocol', sa.String(length=40), nullable=False),
|
|
+ sa.Column('socket', sa.String(length=36), nullable=False),
|
|
+ sa.ForeignKeyConstraint(['internal_neutron_port_id'], ['ports.id'],
|
|
+ ondelete='CASCADE'),
|
|
+ sa.ForeignKeyConstraint(['router_id'], ['routers.id'],
|
|
+ ondelete='CASCADE'),
|
|
+ sa.PrimaryKeyConstraint('id'),
|
|
+ sa.UniqueConstraint(
|
|
+ 'internal_neutron_port_id', 'socket', 'protocol',
|
|
+ name='uniq_port_forwardings0internal_neutron_port_id0socket0protocol'),
|
|
+ sa.UniqueConstraint(
|
|
+ 'router_id', 'external_port', 'protocol',
|
|
+ name='uniq_rg_port_forwardings0router_id0external_port0protocol')
|
|
+ )
|
|
+ # ### end Alembic commands ###
|
|
diff --git a/db/models/l3_attrs.py b/db/models/l3_attrs.py
|
|
index 6c30ac2c16..904f4ef08d 100644
|
|
--- a/db/models/l3_attrs.py
|
|
+++ b/db/models/l3_attrs.py
|
|
@@ -41,6 +41,8 @@ class RouterExtraAttributes(model_base.BASEV2):
|
|
# Availability Zone support
|
|
availability_zone_hints = sa.Column(sa.String(255))
|
|
|
|
+ configurations = sa.Column(sa.String(4095))
|
|
+
|
|
router = orm.relationship(
|
|
'Router', load_on_pending=True,
|
|
backref=orm.backref("extra_attributes", lazy='joined',
|
|
diff --git a/db/models/rg_port_forwarding.py b/db/models/rg_port_forwarding.py
|
|
new file mode 100644
|
|
index 0000000000..e7963169a8
|
|
--- /dev/null
|
|
+++ b/db/models/rg_port_forwarding.py
|
|
@@ -0,0 +1,59 @@
|
|
+# Copyright (c) 2023 UnionTech
|
|
+# All rights reserved
|
|
+#
|
|
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
+# not use this file except in compliance with the License. You may obtain
|
|
+# a copy of the License at
|
|
+#
|
|
+# http://www.apache.org/licenses/LICENSE-2.0
|
|
+#
|
|
+# Unless required by applicable law or agreed to in writing, software
|
|
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
+# License for the specific language governing permissions and limitations
|
|
+# under the License.
|
|
+
|
|
+import sqlalchemy as sa
|
|
+from sqlalchemy import orm
|
|
+from neutron_lib.db import model_base
|
|
+from neutron_lib.db import constants as db_const
|
|
+
|
|
+from neutron.db.models import l3
|
|
+from neutron.db import models_v2
|
|
+
|
|
+
|
|
+class RGPortForwarding(model_base.BASEV2, model_base.HasId):
|
|
+ __table_args__ = (
|
|
+ sa.UniqueConstraint('router_id', 'external_port', 'protocol',
|
|
+ name='uniq_rg_port_forwardings0router_id0'
|
|
+ 'external_port0protocol'),
|
|
+ sa.UniqueConstraint('internal_neutron_port_id', 'socket', 'protocol',
|
|
+ name='uniq_port_forwardings0'
|
|
+ 'internal_neutron_port_id0socket0'
|
|
+ 'protocol')
|
|
+ )
|
|
+
|
|
+ router_id = sa.Column(sa.String(db_const.UUID_FIELD_SIZE),
|
|
+ sa.ForeignKey('routers.id',
|
|
+ ondelete="CASCADE"),
|
|
+ nullable=False)
|
|
+ external_port = sa.Column(sa.Integer, nullable=False)
|
|
+ internal_neutron_port_id = sa.Column(
|
|
+ sa.String(db_const.UUID_FIELD_SIZE),
|
|
+ sa.ForeignKey('ports.id', ondelete="CASCADE"),
|
|
+ nullable=False)
|
|
+ protocol = sa.Column(sa.String(40), nullable=False)
|
|
+ socket = sa.Column(sa.String(36), nullable=False)
|
|
+ port = orm.relationship(
|
|
+ models_v2.Port, load_on_pending=True,
|
|
+ backref=orm.backref("rg_port_forwardings",
|
|
+ lazy='subquery', uselist=True,
|
|
+ cascade='delete')
|
|
+ )
|
|
+ router = orm.relationship(
|
|
+ l3.Router, load_on_pending=True,
|
|
+ backref=orm.backref("rg_port_forwardings",
|
|
+ lazy='subquery', uselist=True,
|
|
+ cascade='delete')
|
|
+ )
|
|
+ revises_on_change = ('router', 'port',)
|
|
diff --git a/extensions/rg_port_forwarding.py b/extensions/rg_port_forwarding.py
|
|
new file mode 100644
|
|
index 0000000000..c9888cd1ec
|
|
--- /dev/null
|
|
+++ b/extensions/rg_port_forwarding.py
|
|
@@ -0,0 +1,119 @@
|
|
+# Copyright (c) 2023 UnionTech
|
|
+# All rights reserved
|
|
+#
|
|
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
+# not use this file except in compliance with the License. You may obtain
|
|
+# a copy of the License at
|
|
+#
|
|
+# http://www.apache.org/licenses/LICENSE-2.0
|
|
+#
|
|
+# Unless required by applicable law or agreed to in writing, software
|
|
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
+# License for the specific language governing permissions and limitations
|
|
+# under the License.
|
|
+
|
|
+import six
|
|
+import abc
|
|
+import itertools
|
|
+from typing import List
|
|
+
|
|
+from neutron_lib.context import Context
|
|
+from neutron_lib.plugins import constants
|
|
+from neutron_lib.plugins import directory
|
|
+from neutron_lib.services import base as service_base
|
|
+from neutron_lib.api.extensions import APIExtensionDescriptor
|
|
+from neutron_lib.api.definitions import rg_port_forwarding as apidef
|
|
+
|
|
+from neutron.api.v2 import base
|
|
+from neutron.api.v2 import resource_helper
|
|
+from neutron.api.extensions import ResourceExtension
|
|
+
|
|
+
|
|
+class Rg_port_forwarding(APIExtensionDescriptor):
|
|
+ api_definition = apidef
|
|
+
|
|
+ @classmethod
|
|
+ def get_plugin_interface(cls):
|
|
+ return RGPortForwardingPluginBase
|
|
+
|
|
+ @classmethod
|
|
+ def get_resources(cls):
|
|
+ special_mappings = {'routers': 'router'}
|
|
+ plural_mappings = resource_helper.build_plural_mappings(
|
|
+ special_mappings,
|
|
+ itertools.chain(
|
|
+ apidef.RESOURCE_ATTRIBUTE_MAP,
|
|
+ apidef.SUB_RESOURCE_ATTRIBUTE_MAP
|
|
+ )
|
|
+ )
|
|
+
|
|
+ resources = resource_helper.build_resource_info(
|
|
+ plural_mappings,
|
|
+ apidef.RESOURCE_ATTRIBUTE_MAP,
|
|
+ constants.ROUTER_GATEWAY_PORTFORWARDING,
|
|
+ translate_name=True,
|
|
+ allow_bulk=True)
|
|
+
|
|
+ plugin = directory.get_plugin(constants.ROUTER_GATEWAY_PORTFORWARDING)
|
|
+
|
|
+ parent = apidef.SUB_RESOURCE_ATTRIBUTE_MAP[apidef.COLLECTION_NAME].get(
|
|
+ 'parent')
|
|
+ params = apidef.SUB_RESOURCE_ATTRIBUTE_MAP[apidef.COLLECTION_NAME].get(
|
|
+ 'parameters')
|
|
+ controller = base.create_resource(
|
|
+ apidef.COLLECTION_NAME, apidef.RESOURCE_NAME, plugin, params,
|
|
+ allow_bulk=True, parent=parent, allow_pagination=True,
|
|
+ allow_sorting=True)
|
|
+
|
|
+ resource = ResourceExtension(
|
|
+ apidef.COLLECTION_NAME, controller, parent, attr_map=params)
|
|
+ resources.append(resource)
|
|
+
|
|
+ return resources
|
|
+
|
|
+
|
|
+@six.add_metaclass(abc.ABCMeta)
|
|
+class RGPortForwardingPluginBase(service_base.ServicePluginBase):
|
|
+ path_prefix = apidef.API_PREFIX
|
|
+
|
|
+ @classmethod
|
|
+ def get_plugin_type(cls):
|
|
+ return constants.ROUTER_GATEWAY_PORTFORWARDING
|
|
+
|
|
+ def get_plugin_description(self):
|
|
+ return "Router Gateway Port Forwarding Service Plugin"
|
|
+
|
|
+ @abc.abstractmethod
|
|
+ def create_router_gateway_port_forwarding(self, context: Context,
|
|
+ router_id: str,
|
|
+ gateway_port_forwarding: dict):
|
|
+ pass
|
|
+
|
|
+ @abc.abstractmethod
|
|
+ def update_router_gateway_port_forwarding(self, context: Context, id: str,
|
|
+ router_id: str,
|
|
+ gateway_port_forwarding: dict):
|
|
+ pass
|
|
+
|
|
+ @abc.abstractmethod
|
|
+ def get_router_gateway_port_forwarding(self, context: Context, id: str,
|
|
+ router_id: str,
|
|
+ fields: List[str] = None):
|
|
+ pass
|
|
+
|
|
+ @abc.abstractmethod
|
|
+ def get_router_gateway_port_forwardings(self, context: Context,
|
|
+ router_id: str,
|
|
+ filters: List[str] = None,
|
|
+ fields: List[str] = None,
|
|
+ sorts: List[str] = None,
|
|
+ limit: int = None,
|
|
+ marker: str = None,
|
|
+ page_reverse: bool = False):
|
|
+ pass
|
|
+
|
|
+ @abc.abstractmethod
|
|
+ def delete_router_gateway_port_forwarding(self, context: Context, id: str,
|
|
+ router_id: str):
|
|
+ pass
|
|
diff --git a/objects/rg_port_forwarding.py b/objects/rg_port_forwarding.py
|
|
new file mode 100644
|
|
index 0000000000..28cb4e1d4d
|
|
--- /dev/null
|
|
+++ b/objects/rg_port_forwarding.py
|
|
@@ -0,0 +1,87 @@
|
|
+# Copyright (c) 2023 UnionTech
|
|
+# All rights reserved
|
|
+#
|
|
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
+# not use this file except in compliance with the License. You may obtain
|
|
+# a copy of the License at
|
|
+#
|
|
+# http://www.apache.org/licenses/LICENSE-2.0
|
|
+#
|
|
+# Unless required by applicable law or agreed to in writing, software
|
|
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
+# License for the specific language governing permissions and limitations
|
|
+# under the License.
|
|
+
|
|
+import netaddr
|
|
+from neutron_lib import constants as lib_const
|
|
+from oslo_versionedobjects import fields as obj_fields
|
|
+
|
|
+from neutron.objects import base, common_types
|
|
+from neutron.db.models import rg_port_forwarding as models
|
|
+
|
|
+FIELDS_NOT_SUPPORT_FILTER = ['internal_ip_address', 'internal_port']
|
|
+
|
|
+
|
|
+@base.NeutronObjectRegistry.register
|
|
+class RGPortForwarding(base.NeutronDbObject):
|
|
+ VERSION = '1.0'
|
|
+
|
|
+ db_model = models.RGPortForwarding
|
|
+
|
|
+ primary_keys = ['id']
|
|
+ foreign_keys = {
|
|
+ 'Router': {'router_id': 'id'},
|
|
+ 'Port': {'internal_port_id': 'id'}
|
|
+ }
|
|
+ fields_need_translation = {
|
|
+ 'socket': 'socket',
|
|
+ 'internal_port_id': 'internal_neutron_port_id'
|
|
+ }
|
|
+
|
|
+ fields = {
|
|
+ 'id': common_types.UUIDField(),
|
|
+ 'router_id': common_types.UUIDField(nullable=False),
|
|
+ 'external_port': common_types.PortRangeField(nullable=False),
|
|
+ 'protocol': common_types.IpProtocolEnumField(nullable=False),
|
|
+ 'internal_port_id': common_types.UUIDField(nullable=False),
|
|
+ 'internal_ip_address': obj_fields.IPV4AddressField(),
|
|
+ 'internal_port': common_types.PortRangeField(nullable=False),
|
|
+ 'gw_ip_address': obj_fields.IPV4AddressField(),
|
|
+ }
|
|
+
|
|
+ synthetic_fields = ['gw_ip_address']
|
|
+ fields_no_update = {'id', 'router_id'}
|
|
+
|
|
+ def __eq__(self, other):
|
|
+ for attr in self.fields:
|
|
+ if getattr(self, attr) != getattr(other, attr):
|
|
+ return False
|
|
+ return True
|
|
+
|
|
+ def obj_load_attr(self, attrname):
|
|
+ super(RGPortForwarding, self).obj_load_attr(attrname)
|
|
+
|
|
+ def from_db_object(self, db_obj):
|
|
+ super(RGPortForwarding, self).from_db_object(db_obj)
|
|
+
|
|
+ @classmethod
|
|
+ def modify_fields_from_db(cls, db_obj):
|
|
+ result = super(RGPortForwarding, cls).modify_fields_from_db(db_obj)
|
|
+ if 'socket' in result:
|
|
+ groups = result['socket'].split(":")
|
|
+ result['internal_ip_address'] = netaddr.IPAddress(
|
|
+ groups[0], version=lib_const.IP_VERSION_4)
|
|
+ result['internal_port'] = int(groups[1])
|
|
+ del result['socket']
|
|
+ return result
|
|
+
|
|
+ @classmethod
|
|
+ def modify_fields_to_db(cls, fields):
|
|
+ result = super(RGPortForwarding, cls).modify_fields_to_db(fields)
|
|
+ if 'internal_ip_address' in result and 'internal_port' in result:
|
|
+ result['socket'] = (f"{result['internal_ip_address']}:"
|
|
+ f"{result['internal_port']}")
|
|
+ del result['internal_ip_address']
|
|
+ del result['internal_port']
|
|
+ return result
|
|
diff --git a/objects/router.py b/objects/router.py
|
|
index 1373f89515..9590a109f6 100644
|
|
--- a/objects/router.py
|
|
+++ b/objects/router.py
|
|
@@ -18,6 +18,7 @@ from neutron_lib.api.definitions import availability_zone as az_def
|
|
from neutron_lib.api.validators import availability_zone as az_validator
|
|
from neutron_lib import constants as n_const
|
|
from neutron_lib.utils import net as net_utils
|
|
+from neutron_lib.objects import utils as obj_utils
|
|
from oslo_versionedobjects import fields as obj_fields
|
|
import six
|
|
from sqlalchemy import func
|
|
@@ -70,7 +71,8 @@ class RouterRoute(base.NeutronDbObject):
|
|
@base.NeutronObjectRegistry.register
|
|
class RouterExtraAttributes(base.NeutronDbObject):
|
|
# Version 1.0: Initial version
|
|
- VERSION = '1.0'
|
|
+ # Version 1.1: Add configurations
|
|
+ VERSION = '1.1'
|
|
|
|
db_model = l3_attrs.RouterExtraAttributes
|
|
|
|
@@ -80,7 +82,8 @@ class RouterExtraAttributes(base.NeutronDbObject):
|
|
'service_router': obj_fields.BooleanField(default=False),
|
|
'ha': obj_fields.BooleanField(default=False),
|
|
'ha_vr_id': obj_fields.IntegerField(nullable=True),
|
|
- 'availability_zone_hints': obj_fields.ListOfStringsField(nullable=True)
|
|
+ 'availability_zone_hints': obj_fields.ListOfStringsField(nullable=True),
|
|
+ 'configurations': common_types.DictOfMiscValuesField(nullable=True),
|
|
}
|
|
|
|
primary_keys = ['router_id']
|
|
@@ -95,6 +98,9 @@ class RouterExtraAttributes(base.NeutronDbObject):
|
|
result[az_def.AZ_HINTS] = (
|
|
az_validator.convert_az_string_to_list(
|
|
result[az_def.AZ_HINTS]))
|
|
+ if 'configurations' in result:
|
|
+ result['configurations'] = cls.load_json_from_str(
|
|
+ result['configurations'], default={})
|
|
return result
|
|
|
|
@classmethod
|
|
@@ -104,6 +110,11 @@ class RouterExtraAttributes(base.NeutronDbObject):
|
|
result[az_def.AZ_HINTS] = (
|
|
az_validator.convert_az_list_to_string(
|
|
result[az_def.AZ_HINTS]))
|
|
+ if ('configurations' in result and
|
|
+ not isinstance(result['configurations'],
|
|
+ obj_utils.StringMatchingFilterObj)):
|
|
+ result['configurations'] = (
|
|
+ cls.filter_to_json_str(result['configurations']))
|
|
return result
|
|
|
|
@classmethod
|
|
diff --git a/scheduler/l3_agent_scheduler.py b/scheduler/l3_agent_scheduler.py
|
|
index 5810cf85b8..7a428aef03 100644
|
|
--- a/scheduler/l3_agent_scheduler.py
|
|
+++ b/scheduler/l3_agent_scheduler.py
|
|
@@ -14,13 +14,15 @@
|
|
# under the License.
|
|
|
|
import abc
|
|
-import collections
|
|
+import random
|
|
import functools
|
|
import itertools
|
|
-import random
|
|
+import collections
|
|
+from typing import List, Optional
|
|
|
|
from neutron_lib.api.definitions import availability_zone as az_def
|
|
from neutron_lib import constants as lib_const
|
|
+from neutron_lib.context import Context
|
|
from neutron_lib.db import api as lib_db_api
|
|
from neutron_lib.exceptions import l3 as l3_exc
|
|
from oslo_config import cfg
|
|
@@ -31,8 +33,9 @@ import six
|
|
from neutron.common import utils
|
|
from neutron.conf.db import l3_hamode_db
|
|
from neutron.db.models import l3agent as rb_model
|
|
+from neutron.objects.agent import Agent
|
|
from neutron.objects import l3agent as rb_obj
|
|
-
|
|
+from neutron.services.l3_router.l3_router_plugin import L3RouterPlugin
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
cfg.CONF.register_opts(l3_hamode_db.L3_HA_OPTS)
|
|
@@ -228,26 +231,25 @@ class L3Scheduler(object):
|
|
if not candidates:
|
|
return
|
|
elif sync_router.get('ha', False):
|
|
- chosen_agents = self._bind_ha_router(plugin, context,
|
|
- router_id,
|
|
- sync_router.get('tenant_id'),
|
|
+ chosen_agents = self._bind_ha_router(plugin, context, sync_router,
|
|
candidates)
|
|
if not chosen_agents:
|
|
return
|
|
chosen_agent = chosen_agents[-1]
|
|
else:
|
|
chosen_agent = self._choose_router_agent(
|
|
- plugin, context, candidates)
|
|
+ context, plugin, candidates, sync_router)
|
|
self.bind_router(plugin, context, router_id, chosen_agent.id)
|
|
return chosen_agent
|
|
|
|
@abc.abstractmethod
|
|
- def _choose_router_agent(self, plugin, context, candidates):
|
|
+ def _choose_router_agent(self, context, plugin, candidates, sync_router):
|
|
"""Choose an agent from candidates based on a specific policy."""
|
|
pass
|
|
|
|
@abc.abstractmethod
|
|
- def _choose_router_agents_for_ha(self, plugin, context, candidates):
|
|
+ def _choose_router_agents_for_ha(self, context, plugin, candidates,
|
|
+ sync_router):
|
|
"""Choose agents from candidates based on a specific policy."""
|
|
pass
|
|
|
|
@@ -315,19 +317,19 @@ class L3Scheduler(object):
|
|
hosting_list = [tuple(host) for host in hosting]
|
|
return list(set(candidates) - set(hosting_list))
|
|
|
|
- def _bind_ha_router(self, plugin, context, router_id,
|
|
- tenant_id, candidates):
|
|
+ def _bind_ha_router(self, plugin, context, sync_router, candidates):
|
|
"""Bind a HA router to agents based on a specific policy."""
|
|
-
|
|
+ router_id = sync_router.get('id')
|
|
+ tenant_id = sync_router.get('tenant_id')
|
|
candidates = self._filter_scheduled_agents(plugin, context, router_id,
|
|
candidates)
|
|
|
|
chosen_agents = self._choose_router_agents_for_ha(
|
|
- plugin, context, candidates)
|
|
+ context, plugin, candidates, sync_router)
|
|
|
|
for agent in chosen_agents:
|
|
- self.create_ha_port_and_bind(plugin, context, router_id,
|
|
- tenant_id, agent)
|
|
+ self.create_ha_port_and_bind(plugin, context, router_id, tenant_id,
|
|
+ agent)
|
|
|
|
return chosen_agents
|
|
|
|
@@ -335,10 +337,11 @@ class L3Scheduler(object):
|
|
class ChanceScheduler(L3Scheduler):
|
|
"""Randomly allocate an L3 agent for a router."""
|
|
|
|
- def _choose_router_agent(self, plugin, context, candidates):
|
|
+ def _choose_router_agent(self, context, plugin, candidates, sync_router):
|
|
return random.choice(candidates)
|
|
|
|
- def _choose_router_agents_for_ha(self, plugin, context, candidates):
|
|
+ def _choose_router_agents_for_ha(self, context, plugin, candidates,
|
|
+ sync_router):
|
|
num_agents = self._get_num_of_agents_for_ha(len(candidates))
|
|
return random.sample(candidates, num_agents)
|
|
|
|
@@ -346,13 +349,14 @@ class ChanceScheduler(L3Scheduler):
|
|
class LeastRoutersScheduler(L3Scheduler):
|
|
"""Allocate to an L3 agent with the least number of routers bound."""
|
|
|
|
- def _choose_router_agent(self, plugin, context, candidates):
|
|
+ def _choose_router_agent(self, context, plugin, candidates, sync_router):
|
|
candidate_ids = [candidate['id'] for candidate in candidates]
|
|
chosen_agent = plugin.get_l3_agent_with_min_routers(
|
|
context, candidate_ids)
|
|
return chosen_agent
|
|
|
|
- def _choose_router_agents_for_ha(self, plugin, context, candidates):
|
|
+ def _choose_router_agents_for_ha(self, context, plugin, candidates,
|
|
+ sync_router):
|
|
num_agents = self._get_num_of_agents_for_ha(len(candidates))
|
|
ordered_agents = plugin.get_l3_agents_ordered_by_num_routers(
|
|
context, [candidate['id'] for candidate in candidates])
|
|
@@ -397,7 +401,8 @@ class AZLeastRoutersScheduler(LeastRoutersScheduler):
|
|
|
|
return candidates
|
|
|
|
- def _choose_router_agents_for_ha(self, plugin, context, candidates):
|
|
+ def _choose_router_agents_for_ha(self, context, plugin, candidates,
|
|
+ sync_router):
|
|
ordered_agents = plugin.get_l3_agents_ordered_by_num_routers(
|
|
context, [candidate['id'] for candidate in candidates])
|
|
num_agents = self._get_num_of_agents_for_ha(len(ordered_agents))
|
|
@@ -416,3 +421,65 @@ class AZLeastRoutersScheduler(LeastRoutersScheduler):
|
|
if len(selected_agents) >= num_agents:
|
|
break
|
|
return selected_agents
|
|
+
|
|
+
|
|
+class PreferredL3AgentRoutersScheduler(LeastRoutersScheduler):
|
|
+
|
|
+ @staticmethod
|
|
+ def get_preferred_agent(sync_router: dict) -> Optional[str]:
|
|
+ configurations = sync_router.get('configurations', {})
|
|
+ if configurations:
|
|
+ return configurations.get('preferred_agent', None)
|
|
+ return None
|
|
+
|
|
+ @staticmethod
|
|
+ def get_agents(sync_router: dict) -> Optional[List[str]]:
|
|
+ configurations = sync_router.get('configurations', {})
|
|
+ if configurations:
|
|
+ master = configurations.get('master_agent', None)
|
|
+ slaves = configurations.get('slave_agents', [])
|
|
+ if master and slaves:
|
|
+ return slaves + [master]
|
|
+ return []
|
|
+
|
|
+ def _choose_router_agent(self, context: Context,
|
|
+ plugin: L3RouterPlugin,
|
|
+ candidates: List[Agent],
|
|
+ sync_router: dict) -> Agent:
|
|
+ agent = self.get_preferred_agent(sync_router)
|
|
+ if agent:
|
|
+ new_candidates = [candidate for candidate in candidates
|
|
+ if candidate['host'] == agent]
|
|
+ if not new_candidates:
|
|
+ LOG.warning(f"Router {sync_router['id']} failed to "
|
|
+ f"schedule l3 agent on {agent}.")
|
|
+ else:
|
|
+ agent = new_candidates[0]
|
|
+ LOG.debug(f"Router {sync_router['id']} l3 agent is {agent}.")
|
|
+ return agent
|
|
+ agent = super()._choose_router_agent(context, plugin, candidates,
|
|
+ sync_router)
|
|
+ return agent
|
|
+
|
|
+ def _choose_router_agents_for_ha(self, context: Context,
|
|
+ plugin: L3RouterPlugin,
|
|
+ candidates: List[Agent],
|
|
+ sync_router: dict) -> List[Agent]:
|
|
+
|
|
+ agents = self.get_agents(sync_router)
|
|
+ if agents:
|
|
+ if self.max_ha_agents < len(agents):
|
|
+ agents = agents[len(agents) - self.max_ha_agents:]
|
|
+ new_candidates = [candidate for candidate in candidates if
|
|
+ candidate['host'] in agents]
|
|
+ if len(new_candidates) != len(agents):
|
|
+ LOG.warning(f"Router {sync_router['id']} failed to "
|
|
+ f"schedule l3 agents on {agents}.")
|
|
+ else:
|
|
+ LOG.debug(f"Router {sync_router['id']} l3 agents are "
|
|
+ f"{new_candidates}.")
|
|
+ return new_candidates
|
|
+
|
|
+ return super(
|
|
+ PreferredL3AgentRoutersScheduler, self
|
|
+ )._choose_router_agents_for_ha(context, plugin, candidates, sync_router)
|
|
diff --git a/services/l3_router/l3_router_plugin.py b/services/l3_router/l3_router_plugin.py
|
|
index 2e8a762764..9825138261 100644
|
|
--- a/services/l3_router/l3_router_plugin.py
|
|
+++ b/services/l3_router/l3_router_plugin.py
|
|
@@ -38,6 +38,7 @@ from oslo_utils import importutils
|
|
|
|
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
|
|
from neutron.api.rpc.handlers import l3_rpc
|
|
+from neutron.conf.common import NETWORK_HOST_OPTS
|
|
from neutron.db import dns_db
|
|
from neutron.db import extraroute_db
|
|
from neutron.db import l3_dvr_ha_scheduler_db
|
|
@@ -135,6 +136,17 @@ class L3RouterPlugin(service_base.ServicePluginBase,
|
|
|
|
self.add_worker(rpc_worker)
|
|
self.l3_driver_controller = driver_controller.DriverController(self)
|
|
+ self.compute_to_network = dict()
|
|
+ self._init_compute_to_network()
|
|
+
|
|
+ def _init_compute_to_network(self):
|
|
+ for network_node in cfg.CONF.network_nodes:
|
|
+ cfg.CONF.register_opts(NETWORK_HOST_OPTS, group=network_node)
|
|
+ network_group = cfg.CONF.get(network_node, None)
|
|
+ if network_group:
|
|
+ compute_nodes = network_group.get('compute_nodes', [])
|
|
+ for compute_node in compute_nodes:
|
|
+ self.compute_to_network[compute_node] = network_node
|
|
|
|
@property
|
|
def supported_extension_aliases(self):
|
|
diff --git a/services/rg_portforwarding/__init__.py b/services/rg_portforwarding/__init__.py
|
|
new file mode 100644
|
|
index 0000000000..e69de29bb2
|
|
diff --git a/services/rg_portforwarding/common/__init__.py b/services/rg_portforwarding/common/__init__.py
|
|
new file mode 100644
|
|
index 0000000000..e69de29bb2
|
|
diff --git a/services/rg_portforwarding/common/exceptions.py b/services/rg_portforwarding/common/exceptions.py
|
|
new file mode 100644
|
|
index 0000000000..73cea68e32
|
|
--- /dev/null
|
|
+++ b/services/rg_portforwarding/common/exceptions.py
|
|
@@ -0,0 +1,77 @@
|
|
+# Copyright (c) 2023 UnionTech
|
|
+# All rights reserved
|
|
+#
|
|
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
+# not use this file except in compliance with the License. You may obtain
|
|
+# a copy of the License at
|
|
+#
|
|
+# http://www.apache.org/licenses/LICENSE-2.0
|
|
+#
|
|
+# Unless required by applicable law or agreed to in writing, software
|
|
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
+# License for the specific language governing permissions and limitations
|
|
+# under the License.
|
|
+
|
|
+from neutron._i18n import _
|
|
+from neutron_lib import exceptions
|
|
+
|
|
+
|
|
+class PortForwardingNotSupportFilterField(exceptions.BadRequest):
|
|
+ message = _("Port Forwarding filter %(filter)s is not supported.")
|
|
+
|
|
+
|
|
+class RouterDoesNotHaveGateway(exceptions.BadRequest):
|
|
+ message = _("Router %(router_id)s does not have any gateways.")
|
|
+
|
|
+
|
|
+class RouterGatewayPortNotFound(exceptions.NotFound):
|
|
+ message = _("Router %(router_id)s 's gateway port %(gw_port_id)s "
|
|
+ "could not be found.")
|
|
+
|
|
+
|
|
+class RouterGatewayPortDoesNotHaveAnyIPAddresses(exceptions.NotFound):
|
|
+ message = _("Router %(router_id)s 's gateway port %(gw_port_id)s "
|
|
+ "does not have any IP addresses.")
|
|
+
|
|
+
|
|
+class RouterGatewayPortForwardingNotFound(exceptions.NotFound):
|
|
+ message = _("Router Gateway Port Forwarding %(id)s could not be found.")
|
|
+
|
|
+
|
|
+class PortHasBindingFloatingIP(exceptions.InUse):
|
|
+ message = _("Cannot create port forwarding to floating IP "
|
|
+ "%(floating_ip_address)s (%(fip_id)s) with port %(port_id)s "
|
|
+ "using fixed IP %(fixed_ip)s, as that port already "
|
|
+ "has a binding floating IP.")
|
|
+
|
|
+
|
|
+class InconsistentPortAndIP(exceptions.BadRequest):
|
|
+ message = _("Port %(port_id)s does not have ip address %(ip_address)s.")
|
|
+
|
|
+
|
|
+class RouterGatewayPortForwardingAlreadyExists(exceptions.BadRequest):
|
|
+ message = _("A duplicate router gateway port forwarding entry "
|
|
+ "with same attributes already exists, "
|
|
+ "conflicting values are %(conflict)s.")
|
|
+
|
|
+
|
|
+class PortNetworkNotBindOnRouter(exceptions.BadRequest):
|
|
+ message = _("Port %(port_id)s 's network %(network_id)s "
|
|
+ "not bind on router %(router_id)s.")
|
|
+
|
|
+
|
|
+class RouterGatewayPortForwardingUpdateFailed(exceptions.BadRequest):
|
|
+ message = _("Another router port forwarding entry with the same "
|
|
+ "attributes already exists, conflicting "
|
|
+ "values are %(conflict)s.")
|
|
+
|
|
+
|
|
+class DeletedRouterWithRGForwarding(exceptions.InUse):
|
|
+ message = _("Cant not delete router, "
|
|
+ "router %(router_id)s has port forwardings to remove.")
|
|
+
|
|
+
|
|
+class DeletedRouterGatewayWithRGForwarding(exceptions.InUse):
|
|
+ message = _("Cant not delete or update router gateway, "
|
|
+ "router %(router_id)s has port forwardings to remove.")
|
|
diff --git a/services/rg_portforwarding/pf_plugin.py b/services/rg_portforwarding/pf_plugin.py
|
|
new file mode 100644
|
|
index 0000000000..ed8e68e53c
|
|
--- /dev/null
|
|
+++ b/services/rg_portforwarding/pf_plugin.py
|
|
@@ -0,0 +1,369 @@
|
|
+# Copyright (c) 2023 UnionTech
|
|
+# All rights reserved
|
|
+#
|
|
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
+# not use this file except in compliance with the License. You may obtain
|
|
+# a copy of the License at
|
|
+#
|
|
+# http://www.apache.org/licenses/LICENSE-2.0
|
|
+#
|
|
+# Unless required by applicable law or agreed to in writing, software
|
|
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
+# License for the specific language governing permissions and limitations
|
|
+# under the License.
|
|
+
|
|
+
|
|
+from oslo_log import log as logging
|
|
+from typing import List, Dict, Optional
|
|
+
|
|
+from neutron_lib import constants
|
|
+from neutron_lib.context import Context
|
|
+from neutron_lib.plugins import directory
|
|
+from neutron_lib.db import resource_extend
|
|
+from neutron_lib.plugins.constants import L3
|
|
+from neutron_lib.db.api import CONTEXT_WRITER
|
|
+from neutron_lib.exceptions import PortNotFound
|
|
+from neutron_lib.exceptions.l3 import RouterNotFound
|
|
+from neutron_lib.callbacks import registry, resources
|
|
+from neutron_lib.callbacks import events as lib_events
|
|
+from neutron_lib.callbacks.events import DBEventPayload
|
|
+from neutron_lib.api.definitions import rg_port_forwarding as apidef
|
|
+from neutron_lib.objects.exceptions import NeutronDbObjectDuplicateEntry
|
|
+
|
|
+from neutron.db import db_base_plugin_common
|
|
+from neutron.db.l3_dvr_db import is_distributed_router
|
|
+from neutron.db.l3_hamode_db import is_ha_router
|
|
+
|
|
+from neutron.objects.base import Pager
|
|
+from neutron.objects.ports import Port
|
|
+from neutron.objects.router import Router, FloatingIP
|
|
+from neutron.objects.rg_port_forwarding import RGPortForwarding
|
|
+from neutron.objects.rg_port_forwarding import FIELDS_NOT_SUPPORT_FILTER
|
|
+
|
|
+from neutron.extensions.rg_port_forwarding import RGPortForwardingPluginBase
|
|
+from neutron.services.l3_router.l3_router_plugin import L3RouterPlugin
|
|
+from neutron.services.rg_portforwarding.common import exceptions
|
|
+
|
|
+from neutron.api.rpc.callbacks import events
|
|
+from neutron.api.rpc.handlers import resources_rpc
|
|
+
|
|
+LOG = logging.getLogger(__name__)
|
|
+
|
|
+
|
|
+@resource_extend.has_resource_extenders
|
|
+@registry.has_registry_receivers
|
|
+class RGPortForwardingPlugin(RGPortForwardingPluginBase):
|
|
+ required_service_plugins = ['router']
|
|
+
|
|
+ supported_extension_aliases = [apidef.ALIAS]
|
|
+
|
|
+ __native_pagination_support = True
|
|
+ __native_sorting_support = True
|
|
+ __filter_validation_support = True
|
|
+
|
|
+ def __init__(self):
|
|
+ super(RGPortForwardingPlugin, self).__init__()
|
|
+ self.push_api = resources_rpc.ResourcesPushRpcApi()
|
|
+ self.l3_plugin = directory.get_plugin(L3)
|
|
+ self.core_plugin = directory.get_plugin()
|
|
+
|
|
+ @staticmethod
|
|
+ def _get_router(context: Context, router_id: str) -> Optional[Router]:
|
|
+ router = Router.get_object(context, id=router_id)
|
|
+ if not router:
|
|
+ raise RouterNotFound(router_id=router_id)
|
|
+ return router
|
|
+
|
|
+ @staticmethod
|
|
+ def _get_router_gateway(context: Context, router: Router) -> str:
|
|
+ gw_port_id = router.get('gw_port_id', None)
|
|
+ if not gw_port_id:
|
|
+ raise exceptions.RouterDoesNotHaveGateway(router_id=router.id)
|
|
+ gw_port = Port.get_object(
|
|
+ context, id=gw_port_id)
|
|
+ if not gw_port:
|
|
+ raise exceptions.RouterGatewayPortNotFound(router_id=router.id,
|
|
+ gw_port_id=gw_port_id)
|
|
+ gw_port_ips = gw_port.get("fixed_ips", [])
|
|
+ if len(gw_port_ips) <= 0:
|
|
+ raise exceptions.RouterGatewayPortDoesNotHaveAnyIPAddresses(
|
|
+ router_id=router.id, gw_port_id=gw_port_id)
|
|
+ gw_ip_address = gw_port_ips[0].get('ip_address')
|
|
+ return gw_ip_address
|
|
+
|
|
+ @staticmethod
|
|
+ def _get_port(context: Context, port_id: str) -> Optional[Port]:
|
|
+ port = Port.get_object(context, id=port_id)
|
|
+ if not port:
|
|
+ raise PortNotFound(port_id=port_id)
|
|
+ return port
|
|
+
|
|
+ @staticmethod
|
|
+ def _get_ports(context: Context, router_id: str, port: Port,
|
|
+ device_owner: str) -> Optional[List[Port]]:
|
|
+ ports = Port.get_ports_by_router_and_network(
|
|
+ context, router_id, device_owner, port.network_id)
|
|
+ if not ports:
|
|
+ raise exceptions.PortNetworkNotBindOnRouter(
|
|
+ port_id=port.id,
|
|
+ network_id=port.network_id,
|
|
+ router_id=router_id)
|
|
+ return ports
|
|
+
|
|
+ @staticmethod
|
|
+ def _validate_filter_for_port_forwarding(filters: Dict[str, str]) -> None:
|
|
+ if not filters:
|
|
+ return
|
|
+ for filter_member_key in filters.keys():
|
|
+ if filter_member_key in FIELDS_NOT_SUPPORT_FILTER:
|
|
+ raise exceptions.PortForwardingNotSupportFilterField(
|
|
+ filter=filter_member_key)
|
|
+
|
|
+ @staticmethod
|
|
+ def _check_port_has_binding_floating_ip(context: Context, port_id: str,
|
|
+ ip_address: str) -> None:
|
|
+ floatingip_objs = FloatingIP.get_objects(
|
|
+ context.elevated(),
|
|
+ fixed_port_id=port_id)
|
|
+ if floatingip_objs:
|
|
+ floating_ip_address = floatingip_objs[0].floating_ip_address
|
|
+ raise exceptions.PortHasBindingFloatingIP(
|
|
+ floating_ip_address=floating_ip_address,
|
|
+ fip_id=floatingip_objs[0].id,
|
|
+ port_id=port_id,
|
|
+ fixed_ip=ip_address)
|
|
+
|
|
+ @staticmethod
|
|
+ def _get_device_owner(router: Router) -> str:
|
|
+ if is_distributed_router(router):
|
|
+ return constants.DEVICE_OWNER_DVR_INTERFACE
|
|
+ elif is_ha_router(router):
|
|
+ return constants.DEVICE_OWNER_HA_REPLICATED_INT
|
|
+ return constants.DEVICE_OWNER_ROUTER_INTF
|
|
+
|
|
+ def _check_router_port(self, context: Context, router: Router,
|
|
+ port: Port):
|
|
+ device_owner = self._get_device_owner(router)
|
|
+ self._get_ports(context, router.id, port, device_owner)
|
|
+
|
|
+ def _check_port(self, context: Context, port_id: str, ip: str) -> Port:
|
|
+ port = self._get_port(context, port_id)
|
|
+ self._check_port_has_binding_floating_ip(context, port_id, ip)
|
|
+ fixed_ips = port.get('fixed_ips', [])
|
|
+ result = list(map(lambda x: str(x.get('ip_address')) == ip, fixed_ips))
|
|
+ if not any(result):
|
|
+ raise exceptions.InconsistentPortAndIP(port_id=port, ip_address=ip)
|
|
+ return port
|
|
+
|
|
+ def _check_router(self, context: Context, router_id: str) -> (Router, str):
|
|
+ router = self._get_router(context, router_id)
|
|
+ gw_ip_address = self._get_router_gateway(context, router)
|
|
+ return router, gw_ip_address
|
|
+
|
|
+ def _check_port_forwarding_create(self, context: Context, router_id: str,
|
|
+ pf_dict: Dict) -> None:
|
|
+ router, gw_ip_address = self._check_router(context, router_id)
|
|
+ pf_dict['router_id'] = router_id
|
|
+ pf_dict[apidef.GW_IP_ADDRESS] = gw_ip_address
|
|
+ internal_port_id = pf_dict[apidef.INTERNAL_PORT_ID]
|
|
+ internal_ip_address = pf_dict[apidef.INTERNAL_IP_ADDRESS]
|
|
+ internal_port = self._check_port(context, internal_port_id,
|
|
+ internal_ip_address)
|
|
+ self._check_router_port(context, router, internal_port)
|
|
+
|
|
+ @staticmethod
|
|
+ def _check_port_forwarding(context: Context, pf_obj: RGPortForwarding):
|
|
+ pf_objs = RGPortForwarding.get_objects(
|
|
+ context,
|
|
+ router_id=pf_obj.router_id,
|
|
+ protocol=pf_obj.protocol)
|
|
+
|
|
+ for obj in pf_objs:
|
|
+ if obj.id == pf_obj.get('id', None):
|
|
+ continue
|
|
+ # Ensure there are no conflicts on the outside
|
|
+ if obj.external_port == pf_obj.external_port:
|
|
+ raise exceptions.RouterGatewayPortForwardingAlreadyExists(
|
|
+ conflict={
|
|
+ 'router_id': pf_obj.router_id,
|
|
+ 'protocol': pf_obj.protocol,
|
|
+ 'external_port': obj.external_port,
|
|
+ }
|
|
+ )
|
|
+ # Ensure there are no conflicts in the inside
|
|
+ # socket: internal_ip_address + internal_port
|
|
+ if (obj.internal_port_id == pf_obj.internal_port_id and
|
|
+ obj.internal_ip_address == pf_obj.internal_ip_address and
|
|
+ obj.internal_port == pf_obj.internal_port):
|
|
+ raise exceptions.RouterGatewayPortForwardingAlreadyExists(
|
|
+ conflict={
|
|
+ 'router_id': pf_obj.router_id,
|
|
+ 'protocol': pf_obj.protocol,
|
|
+ 'internal_port_id': obj.internal_port_id,
|
|
+ 'internal_ip_address': str(obj.internal_ip_address),
|
|
+ 'internal_port': obj.internal_port
|
|
+ }
|
|
+ )
|
|
+
|
|
+ @staticmethod
|
|
+ def _find_existing_rg_port_forwarding(context: Context,
|
|
+ router_id: str,
|
|
+ port_forwarding: Dict,
|
|
+ specify_params: List = None):
|
|
+ # Because the session had been flushed by NeutronDbObjectDuplicateEntry
|
|
+ # so if we want to use the context to get another db queries, we need
|
|
+ # to rollback first.
|
|
+ context.session.rollback()
|
|
+ if not specify_params:
|
|
+ specify_params = [
|
|
+ {
|
|
+ 'router_id': router_id,
|
|
+ 'external_port': port_forwarding['external_port'],
|
|
+ 'protocol': port_forwarding['protocol']
|
|
+ },
|
|
+ {
|
|
+ 'internal_port_id': port_forwarding['internal_port_id'],
|
|
+ 'internal_ip_address': port_forwarding[
|
|
+ 'internal_ip_address'],
|
|
+ 'internal_port': port_forwarding['internal_port'],
|
|
+ 'protocol': port_forwarding['protocol']
|
|
+ }]
|
|
+ for param in specify_params:
|
|
+ objs = RGPortForwarding.get_objects(context, **param)
|
|
+ if objs:
|
|
+ return objs[0], param
|
|
+
|
|
+ @db_base_plugin_common.make_result_with_fields
|
|
+ @db_base_plugin_common.convert_result_to_dict
|
|
+ def get_router_gateway_port_forwardings(self, context: Context,
|
|
+ router_id: str,
|
|
+ filters: List[str] = None,
|
|
+ fields: List[str] = None,
|
|
+ sorts: List[str] = None,
|
|
+ limit: int = None,
|
|
+ marker: str = None,
|
|
+ page_reverse: bool = False):
|
|
+
|
|
+ router, gw_ip_address = self._check_router(context, router_id)
|
|
+ filters = filters or {}
|
|
+ self._validate_filter_for_port_forwarding(filters)
|
|
+ pager = Pager(sorts, limit, page_reverse, marker)
|
|
+ port_forwardings = RGPortForwarding.get_objects(
|
|
+ context, _pager=pager, router_id=router_id, **filters)
|
|
+ for pf in port_forwardings:
|
|
+ setattr(pf, 'gw_ip_address', gw_ip_address)
|
|
+ return port_forwardings
|
|
+
|
|
+ @db_base_plugin_common.convert_result_to_dict
|
|
+ def create_router_gateway_port_forwarding(self, context: Context,
|
|
+ router_id: str,
|
|
+ gateway_port_forwarding: dict):
|
|
+ port_forwarding = gateway_port_forwarding.get(apidef.RESOURCE_NAME)
|
|
+ self._check_port_forwarding_create(context, router_id, port_forwarding)
|
|
+ with CONTEXT_WRITER.using(context):
|
|
+ pf_obj = RGPortForwarding(context, **port_forwarding)
|
|
+ self._check_port_forwarding(context, pf_obj)
|
|
+ try:
|
|
+ pf_obj.create()
|
|
+ except NeutronDbObjectDuplicateEntry:
|
|
+ _, conflict = self._find_existing_rg_port_forwarding(
|
|
+ context, router_id, port_forwarding)
|
|
+ raise exceptions.RouterGatewayPortForwardingAlreadyExists(
|
|
+ conflict=conflict
|
|
+ )
|
|
+ self.push_api.push(context, [pf_obj], events.CREATED)
|
|
+ return pf_obj
|
|
+
|
|
+ @db_base_plugin_common.convert_result_to_dict
|
|
+ def update_router_gateway_port_forwarding(self, context: Context, id: str,
|
|
+ router_id: str,
|
|
+ gateway_port_forwarding: dict):
|
|
+
|
|
+ router = self._get_router(context, router_id)
|
|
+ gw_ip_address = self._get_router_gateway(context, router)
|
|
+ pf_obj = RGPortForwarding.get_object(context, id=id)
|
|
+ if not pf_obj:
|
|
+ raise exceptions.RouterGatewayPortForwardingNotFound(id=id)
|
|
+
|
|
+ port_forwarding = gateway_port_forwarding.get(apidef.RESOURCE_NAME, {})
|
|
+ port_forwarding[apidef.GW_IP_ADDRESS] = gw_ip_address
|
|
+ new_port_id = port_forwarding.get(apidef.INTERNAL_PORT_ID)
|
|
+ new_internal_ip = port_forwarding.get(apidef.INTERNAL_IP_ADDRESS, None)
|
|
+
|
|
+ if new_port_id and new_port_id != pf_obj.internal_port_id:
|
|
+ self._check_port_has_binding_floating_ip(context,
|
|
+ new_port_id,
|
|
+ new_internal_ip)
|
|
+
|
|
+ if any([new_internal_ip, new_port_id]):
|
|
+ port_forwarding.update({
|
|
+ apidef.INTERNAL_IP_ADDRESS: new_internal_ip
|
|
+ if new_internal_ip else
|
|
+ str(pf_obj.internal_ip_address),
|
|
+ apidef.INTERNAL_PORT_ID: new_port_id
|
|
+ if new_port_id else pf_obj.internal_port
|
|
+ })
|
|
+
|
|
+ with CONTEXT_WRITER.using(context):
|
|
+ pf_obj.update_fields(port_forwarding, reset_changes=True)
|
|
+ self._check_port_forwarding(context, pf_obj)
|
|
+ try:
|
|
+ pf_obj.update()
|
|
+ except NeutronDbObjectDuplicateEntry:
|
|
+ _, conflict = self._find_existing_rg_port_forwarding(
|
|
+ context, router_id, port_forwarding)
|
|
+ raise exceptions.RouterGatewayPortForwardingAlreadyExists(
|
|
+ conflict=conflict
|
|
+ )
|
|
+ self.push_api.push(context, [pf_obj], events.UPDATED)
|
|
+ return pf_obj
|
|
+
|
|
+ @db_base_plugin_common.make_result_with_fields
|
|
+ @db_base_plugin_common.convert_result_to_dict
|
|
+ def get_router_gateway_port_forwarding(self, context: Context, id: str,
|
|
+ router_id: str,
|
|
+ fields: List[str] = None):
|
|
+ _, gw_ip_address = self._check_router(context, router_id)
|
|
+ pf_obj = RGPortForwarding.get_object(context, id=id)
|
|
+ if not pf_obj:
|
|
+ raise exceptions.RouterGatewayPortForwardingNotFound(id=id)
|
|
+ setattr(pf_obj, apidef.GW_IP_ADDRESS, gw_ip_address)
|
|
+ return pf_obj
|
|
+
|
|
+ def delete_router_gateway_port_forwarding(self, context: Context, id: str,
|
|
+ router_id: str):
|
|
+ pf_obj = RGPortForwarding.get_object(context, id=id)
|
|
+ if not pf_obj:
|
|
+ raise exceptions.RouterGatewayPortForwardingNotFound(id=id)
|
|
+ with CONTEXT_WRITER.using(context):
|
|
+ pf_obj.delete()
|
|
+ self.push_api.push(context, [pf_obj], events.DELETED)
|
|
+
|
|
+ @registry.receives(resources.ROUTER, [lib_events.BEFORE_DELETE])
|
|
+ def _receive_router_before_delete(self, resource: str, event: str,
|
|
+ trigger: L3RouterPlugin,
|
|
+ payload: DBEventPayload):
|
|
+ router_id = payload.resource_id
|
|
+ context = payload.context
|
|
+ port_forwardings = RGPortForwarding.get_objects(context,
|
|
+ router_id=router_id)
|
|
+ if port_forwardings:
|
|
+ ex = exceptions.DeletedRouterWithRGForwarding(router_id=router_id)
|
|
+ LOG.info(ex.msg)
|
|
+ raise ex
|
|
+
|
|
+ @registry.receives(resources.ROUTER_GATEWAY, [lib_events.BEFORE_DELETE,
|
|
+ lib_events.BEFORE_UPDATE])
|
|
+ def _receive_router_gateway_before_delete(self, resource: str, event: str,
|
|
+ trigger: L3RouterPlugin,
|
|
+ payload: DBEventPayload):
|
|
+ router_id = payload.resource_id
|
|
+ context = payload.context
|
|
+ port_forwardings = RGPortForwarding.get_objects(context,
|
|
+ router_id=router_id)
|
|
+ if port_forwardings:
|
|
+ ex = exceptions.DeletedRouterGatewayWithRGForwarding(
|
|
+ router_id=router_id)
|
|
+ LOG.info(ex.msg)
|
|
+ raise ex
|