Tower: upload cetmix_tower_webhook 16.0.1.0.5 (via marketplace)
This commit is contained in:
608
addons/cetmix_tower_webhook/tests/test_webhook_controller.py
Normal file
608
addons/cetmix_tower_webhook/tests/test_webhook_controller.py
Normal file
@@ -0,0 +1,608 @@
|
||||
# Copyright (C) 2025 Cetmix OÜ
|
||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
||||
|
||||
import json
|
||||
from unittest.mock import patch
|
||||
|
||||
from odoo.tests import HttpCase, tagged
|
||||
|
||||
|
||||
@tagged("-at_install", "post_install")
|
||||
class TestCxTowerWebhookController(HttpCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
env = cls.env
|
||||
# Authenticator that always allows requests
|
||||
cls.authenticator = env["cx.tower.webhook.authenticator"].create(
|
||||
{"name": "Always OK", "code": "result = {'allowed': True}"}
|
||||
)
|
||||
# POST webhook
|
||||
cls.webhook_post = env["cx.tower.webhook"].create(
|
||||
{
|
||||
"name": "Test Webhook POST",
|
||||
"endpoint": "webhook_post",
|
||||
"method": "post",
|
||||
"authenticator_id": cls.authenticator.id,
|
||||
"code": "result = {'exit_code': 0, 'message': 'POST ok'}",
|
||||
}
|
||||
)
|
||||
# GET webhook
|
||||
cls.webhook_get = env["cx.tower.webhook"].create(
|
||||
{
|
||||
"name": "Test Webhook GET",
|
||||
"endpoint": "webhook_get",
|
||||
"method": "get",
|
||||
"authenticator_id": cls.authenticator.id,
|
||||
"code": "result = {'exit_code': 0, 'message': 'GET ok'}",
|
||||
}
|
||||
)
|
||||
# Log model
|
||||
cls.Log = env["cx.tower.webhook.log"]
|
||||
|
||||
def url_for(self, endpoint):
|
||||
"""Helper to build webhook url"""
|
||||
url = f"/cetmix_tower_webhooks/{endpoint}"
|
||||
return self.base_url() + url
|
||||
|
||||
def assert_log(self, log=None, request_payload=None, **expected):
|
||||
"""
|
||||
Universal log checker for webhook log model.
|
||||
Checks expected field values and substrings.
|
||||
"""
|
||||
self.assertIsNotNone(log, "Log record was not created")
|
||||
if request_payload is not None:
|
||||
try:
|
||||
log_payload = log.request_payload
|
||||
# try to convert both to Python dict for comparison
|
||||
if isinstance(log_payload, str):
|
||||
log_payload = log_payload.strip()
|
||||
self.assertDictEqual(
|
||||
json.loads(
|
||||
log_payload.replace("'", '"')
|
||||
), # try to make JSON from possible str(dict)
|
||||
json.loads(request_payload),
|
||||
)
|
||||
except Exception as ex:
|
||||
self.fail(
|
||||
f"Payload comparison failed: {ex}\nLog: {log.request_payload}\nExpected: {request_payload}" # noqa: E501
|
||||
)
|
||||
for field, value in expected.items():
|
||||
if field == "request_payload":
|
||||
continue # Already checked
|
||||
actual = getattr(log, field)
|
||||
self.assertEqual(actual, value, f"{field}: expected {value}, got {actual}")
|
||||
|
||||
def test_post_webhook_success(self):
|
||||
"""Success test for POST request with correct payload."""
|
||||
data = json.dumps({"some": "data"})
|
||||
response = self.url_open(
|
||||
self.url_for(self.webhook_post.endpoint),
|
||||
data=data,
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertIn(b"POST ok", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", self.webhook_post.id)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="success",
|
||||
authentication_status="success",
|
||||
http_status=200,
|
||||
endpoint=self.webhook_post.endpoint,
|
||||
request_payload=data,
|
||||
)
|
||||
|
||||
def test_get_webhook_success(self):
|
||||
"""Success test for GET request with correct payload."""
|
||||
response = self.url_open(
|
||||
f"{self.url_for(self.webhook_get.endpoint)}?foo=bar",
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertIn(b"GET ok", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", self.webhook_get.id)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="success",
|
||||
authentication_status="success",
|
||||
http_status=200,
|
||||
endpoint=self.webhook_get.endpoint,
|
||||
)
|
||||
self.assertIn("foo", log.request_payload)
|
||||
|
||||
def test_webhook_not_found(self):
|
||||
"""Test request to a non-existing webhook endpoint."""
|
||||
data = json.dumps({"test": 1})
|
||||
response = self.url_open(
|
||||
self.url_for("missing"),
|
||||
data=data,
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 404)
|
||||
self.assertIn(b"Webhook not found", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", False)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="skipped",
|
||||
authentication_status="failed",
|
||||
http_status=404,
|
||||
endpoint="missing",
|
||||
error_message="Webhook not found",
|
||||
request_payload=data,
|
||||
)
|
||||
|
||||
def test_wrong_method(self):
|
||||
"""
|
||||
Test GET request to POST-only webhook.
|
||||
"""
|
||||
response = self.url_open(
|
||||
self.url_for(self.webhook_post.endpoint),
|
||||
)
|
||||
self.assertEqual(response.status_code, 404)
|
||||
self.assertIn(b"Webhook not found", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", False)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="skipped",
|
||||
authentication_status="failed",
|
||||
http_status=404,
|
||||
error_message="Webhook not found",
|
||||
endpoint=self.webhook_post.endpoint,
|
||||
request_method="get",
|
||||
)
|
||||
|
||||
def test_missing_payload_post(self):
|
||||
"""
|
||||
Test POST request with empty payload.
|
||||
"""
|
||||
# use opener instead of url_open to avoid checking of data
|
||||
response = self.opener.post(
|
||||
self.url_for(self.webhook_post.endpoint),
|
||||
timeout=1200000,
|
||||
headers={"Content-Type": "application/json"},
|
||||
allow_redirects=True,
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertIn(b"POST ok", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", self.webhook_post.id)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="success",
|
||||
authentication_status="success",
|
||||
http_status=200,
|
||||
endpoint=self.webhook_post.endpoint,
|
||||
request_payload="{}",
|
||||
)
|
||||
|
||||
def test_authentication_failed(self):
|
||||
"""
|
||||
Test POST request with authenticator that always denies.
|
||||
"""
|
||||
bad_auth = self.env["cx.tower.webhook.authenticator"].create(
|
||||
{
|
||||
"name": "Never OK",
|
||||
"code": "result = {'allowed': False, 'custom_message': 'Forbidden'}",
|
||||
}
|
||||
)
|
||||
webhook = self.env["cx.tower.webhook"].create(
|
||||
{
|
||||
"name": "Forbidden Webhook",
|
||||
"endpoint": "forbidden",
|
||||
"method": "post",
|
||||
"authenticator_id": bad_auth.id,
|
||||
"code": "result = {'exit_code': 0, 'message': 'Should not run'}",
|
||||
}
|
||||
)
|
||||
data = json.dumps({"fail": 1})
|
||||
response = self.url_open(
|
||||
self.url_for(webhook.endpoint),
|
||||
data=data,
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 403)
|
||||
self.assertIn(b"Authentication not allowed", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", webhook.id)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="skipped",
|
||||
authentication_status="failed",
|
||||
http_status=403,
|
||||
endpoint=webhook.endpoint,
|
||||
request_payload=data,
|
||||
)
|
||||
|
||||
def test_webhook_code_failure(self):
|
||||
"""
|
||||
Test POST request to a webhook that raises an exception in code.
|
||||
"""
|
||||
self.webhook_post.code = "raise Exception('Some error!')"
|
||||
response = self.url_open(
|
||||
self.url_for(self.webhook_post.endpoint),
|
||||
data=json.dumps({}),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn(b"Some error!", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", self.webhook_post.id)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="failed",
|
||||
authentication_status="success",
|
||||
http_status=500,
|
||||
endpoint=self.webhook_post.endpoint,
|
||||
request_payload="{}",
|
||||
)
|
||||
self.assertIn("Some error!", log.error_message)
|
||||
|
||||
def test_json_headers_are_stored(self):
|
||||
"""
|
||||
Test that request headers and payload are saved in webhook log record.
|
||||
"""
|
||||
payload = {"foo": "bar"}
|
||||
headers = {"X-Test-Header": "xxx", "Content-Type": "application/json"}
|
||||
response = self.url_open(
|
||||
self.url_for(self.webhook_post.endpoint),
|
||||
data=json.dumps(payload),
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", self.webhook_post.id)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="success",
|
||||
authentication_status="success",
|
||||
http_status=200,
|
||||
endpoint=self.webhook_post.endpoint,
|
||||
)
|
||||
self.assertIn("foo", log.request_payload)
|
||||
self.assertIn("X-Test-Header", log.request_headers)
|
||||
self.assertIn(log.result_message, response.text)
|
||||
|
||||
def test_log_contains_ip(self):
|
||||
"""
|
||||
Test that the log contains the client's IP address and country (if available).
|
||||
"""
|
||||
payload = {"check": "ip"}
|
||||
self.url_open(
|
||||
self.url_for(self.webhook_post.endpoint),
|
||||
data=json.dumps(payload),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", self.webhook_post.id)])
|
||||
self.assertTrue(log.ip_address)
|
||||
|
||||
def test_inactive_webhook(self):
|
||||
"""Test that inactive webhooks are not callable."""
|
||||
self.webhook_post.active = False
|
||||
response = self.url_open(
|
||||
self.url_for(self.webhook_post.endpoint),
|
||||
data=json.dumps({"a": 1}),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 404)
|
||||
self.assertIn(b"Webhook not found", response.content)
|
||||
|
||||
def test_authenticator_code_raises(self):
|
||||
"""
|
||||
Test that if authenticator's code raises an error,
|
||||
proper log is created and 403 returned.
|
||||
"""
|
||||
bad_auth = self.env["cx.tower.webhook.authenticator"].create(
|
||||
{"name": "Broken Auth", "code": "raise Exception('auth fail')"}
|
||||
)
|
||||
webhook = self.env["cx.tower.webhook"].create(
|
||||
{
|
||||
"name": "Web with bad auth",
|
||||
"endpoint": "bad_auth",
|
||||
"method": "post",
|
||||
"authenticator_id": bad_auth.id,
|
||||
"code": "result = {'exit_code': 0, 'message': 'Should not run'}",
|
||||
}
|
||||
)
|
||||
response = self.url_open(
|
||||
self.url_for(webhook.endpoint),
|
||||
data=json.dumps({"x": 1}),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 403)
|
||||
self.assertIn(b"auth fail", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", webhook.id)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="skipped",
|
||||
authentication_status="failed",
|
||||
http_status=403,
|
||||
endpoint=webhook.endpoint,
|
||||
)
|
||||
self.assertIn("auth fail", log.error_message)
|
||||
|
||||
def test_post_webhook_json_content_type(self):
|
||||
"""
|
||||
Test POST request with content_type json.
|
||||
"""
|
||||
self.webhook_post.content_type = "json"
|
||||
self.webhook_post.code = "result = {'exit_code': 0, 'message': 'POST JSON ok'}"
|
||||
|
||||
data = json.dumps({"json_test": "ok"})
|
||||
response = self.url_open(
|
||||
self.url_for(self.webhook_post.endpoint),
|
||||
data=data,
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertIn(b"POST JSON ok", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", self.webhook_post.id)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="success",
|
||||
authentication_status="success",
|
||||
http_status=200,
|
||||
endpoint=self.webhook_post.endpoint,
|
||||
request_payload=data,
|
||||
)
|
||||
|
||||
def test_post_webhook_form_content_type(self):
|
||||
"""
|
||||
Test POST request with content_type form.
|
||||
"""
|
||||
self.webhook_post.content_type = "form"
|
||||
self.webhook_post.code = "result = {'exit_code': 0, 'message': 'POST FORM ok'}"
|
||||
|
||||
data = {"form_field": "ok"}
|
||||
response = self.url_open(
|
||||
self.url_for(self.webhook_post.endpoint),
|
||||
data=data,
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertIn(b"POST FORM ok", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", self.webhook_post.id)])
|
||||
self.assertIn("form_field", log.request_payload)
|
||||
|
||||
def test_authenticator_ipv4_and_ipv6(self):
|
||||
"""
|
||||
Test IP filter for IPv4, IPv6, and networks
|
||||
by monkeypatching REMOTE_ADDR in environ.
|
||||
"""
|
||||
auth = self.env["cx.tower.webhook.authenticator"].create(
|
||||
{
|
||||
"name": "IP Test",
|
||||
"allowed_ip_addresses": "203.0.113.5,2001:db8::42,198.51.100.0/24,2001:db8:abcd::/48", # noqa: E501
|
||||
"code": "result = {'allowed': True}",
|
||||
}
|
||||
)
|
||||
webhook = self.env["cx.tower.webhook"].create(
|
||||
{
|
||||
"name": "IP Webhook",
|
||||
"endpoint": "webhook_iptest",
|
||||
"method": "post",
|
||||
"authenticator_id": auth.id,
|
||||
"code": "result = {'exit_code': 0, 'message': 'IP OK'}",
|
||||
}
|
||||
)
|
||||
|
||||
data = json.dumps({"ip": "test"})
|
||||
|
||||
def do_req(ip):
|
||||
# Patch _get_remote_addr to simulate requests coming
|
||||
# from different IP addresses
|
||||
with patch(
|
||||
"odoo.addons.cetmix_tower_webhook.controllers.main.CetmixTowerWebhookController._get_remote_addr",
|
||||
return_value=ip,
|
||||
):
|
||||
return self.url_open(
|
||||
self.url_for(webhook.endpoint),
|
||||
data=data,
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
|
||||
# IPv4 address allowed
|
||||
resp = do_req("203.0.113.5")
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
self.assertIn(b"IP OK", resp.content)
|
||||
|
||||
# IPv6 address allowed
|
||||
resp = do_req("2001:db8::42")
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
self.assertIn(b"IP OK", resp.content)
|
||||
|
||||
# IPv4 network allowed
|
||||
resp = do_req("198.51.100.99")
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
self.assertIn(b"IP OK", resp.content)
|
||||
|
||||
# IPv6 network allowed
|
||||
resp = do_req("2001:db8:abcd::abcd")
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
self.assertIn(b"IP OK", resp.content)
|
||||
|
||||
# Denied IPv4 address
|
||||
resp = do_req("203.0.113.99")
|
||||
self.assertEqual(resp.status_code, 403)
|
||||
self.assertIn(b"Address not allowed", resp.content)
|
||||
|
||||
# Denied IPv6 address
|
||||
resp = do_req("2001:db8:ffff::1")
|
||||
self.assertEqual(resp.status_code, 403)
|
||||
self.assertIn(b"Address not allowed", resp.content)
|
||||
|
||||
def _make_proxy_webhook(
|
||||
self,
|
||||
allowed,
|
||||
trusted=None,
|
||||
code="result = {'exit_code': 0, 'message': 'OK via proxy'}",
|
||||
):
|
||||
"""
|
||||
Helper to create a webhook with a dedicated authenticator configured
|
||||
for proxy-aware tests.
|
||||
"""
|
||||
auth = self.env["cx.tower.webhook.authenticator"].create(
|
||||
{
|
||||
"name": "Proxy Aware",
|
||||
"allowed_ip_addresses": allowed,
|
||||
"trusted_proxy_ips": trusted or "",
|
||||
"code": "result = {'allowed': True}",
|
||||
}
|
||||
)
|
||||
wh = self.env["cx.tower.webhook"].create(
|
||||
{
|
||||
"name": "Proxy Webhook",
|
||||
"endpoint": "proxy_webhook",
|
||||
"method": "post",
|
||||
"authenticator_id": auth.id,
|
||||
"code": code,
|
||||
}
|
||||
)
|
||||
return wh, auth
|
||||
|
||||
def test_proxy_headers_ignored_without_trusted_proxy(self):
|
||||
"""
|
||||
When trusted_proxy_ips is empty, XFF/X-Real-IP must be ignored.
|
||||
We fallback to immediate peer (proxy IP), which is not allowed -> 403.
|
||||
"""
|
||||
# Allow only the real client network, not the proxy itself
|
||||
webhook, _auth = self._make_proxy_webhook(
|
||||
allowed="203.0.113.0/24", trusted=None
|
||||
)
|
||||
|
||||
data = json.dumps({"k": "v"})
|
||||
proxy_ip = "10.0.0.5" # immediate peer (undocumented as trusted)
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"X-Forwarded-For": "203.0.113.7, 10.0.0.5", # should be ignored
|
||||
"X-Real-IP": "203.0.113.7", # should be ignored
|
||||
}
|
||||
with patch(
|
||||
"odoo.addons.cetmix_tower_webhook.controllers.main.CetmixTowerWebhookController._get_remote_addr",
|
||||
return_value=proxy_ip,
|
||||
):
|
||||
resp = self.url_open(
|
||||
self.url_for(webhook.endpoint), data=data, headers=headers
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, 403)
|
||||
self.assertIn(b"Address not allowed", resp.content)
|
||||
|
||||
def test_proxy_xff_honored_with_trusted_proxy(self):
|
||||
"""
|
||||
With trusted proxy configured, take the left-most IP from X-Forwarded-For.
|
||||
"""
|
||||
webhook, _auth = self._make_proxy_webhook(
|
||||
allowed="203.0.113.0/24",
|
||||
trusted="10.0.0.5",
|
||||
code="result = {'exit_code': 0, 'message': 'OK XFF'}",
|
||||
)
|
||||
|
||||
data = json.dumps({"k": "v"})
|
||||
proxy_ip = "10.0.0.5"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
# XFF list: client, proxy
|
||||
"X-Forwarded-For": "203.0.113.7, 10.0.0.5",
|
||||
}
|
||||
with patch(
|
||||
"odoo.addons.cetmix_tower_webhook.controllers.main.CetmixTowerWebhookController._get_remote_addr",
|
||||
return_value=proxy_ip,
|
||||
):
|
||||
resp = self.url_open(
|
||||
self.url_for(webhook.endpoint), data=data, headers=headers
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
self.assertIn(b"OK XFF", resp.content)
|
||||
|
||||
def test_proxy_x_real_ip_fallback_when_xff_missing(self):
|
||||
"""
|
||||
If XFF is missing/invalid but trusted proxy is set, fall back to X-Real-IP.
|
||||
"""
|
||||
webhook, _auth = self._make_proxy_webhook(
|
||||
allowed="203.0.113.0/24",
|
||||
trusted="10.0.0.5",
|
||||
code="result = {'exit_code': 0, 'message': 'OK X-Real-IP'}",
|
||||
)
|
||||
|
||||
data = json.dumps({"k": "v"})
|
||||
proxy_ip = "10.0.0.5"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"X-Forwarded-For": "garbage, not_an_ip", # invalids should be skipped
|
||||
"X-Real-IP": "203.0.113.8",
|
||||
}
|
||||
with patch(
|
||||
"odoo.addons.cetmix_tower_webhook.controllers.main.CetmixTowerWebhookController._get_remote_addr",
|
||||
return_value=proxy_ip,
|
||||
):
|
||||
resp = self.url_open(
|
||||
self.url_for(webhook.endpoint), data=data, headers=headers
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
self.assertIn(b"OK X-Real-IP", resp.content)
|
||||
|
||||
def test_proxy_invalid_headers_fall_back_to_immediate_peer(self):
|
||||
"""
|
||||
If headers are invalid even with trusted proxy, fall back to immediate peer.
|
||||
Since the proxy IP is not in allowlist, the request is denied.
|
||||
"""
|
||||
webhook, _auth = self._make_proxy_webhook(
|
||||
allowed="203.0.113.0/24", # does NOT include proxy IP
|
||||
trusted="10.0.0.5",
|
||||
)
|
||||
|
||||
data = json.dumps({"k": "v"})
|
||||
proxy_ip = "10.0.0.5"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"X-Forwarded-For": "not_an_ip, also_bad",
|
||||
"X-Real-IP": "bad_ip_value",
|
||||
}
|
||||
with patch(
|
||||
"odoo.addons.cetmix_tower_webhook.controllers.main.CetmixTowerWebhookController._get_remote_addr",
|
||||
return_value=proxy_ip,
|
||||
):
|
||||
resp = self.url_open(
|
||||
self.url_for(webhook.endpoint), data=data, headers=headers
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, 403)
|
||||
self.assertIn(b"Address not allowed", resp.content)
|
||||
|
||||
def test_proxy_allows_via_immediate_peer_when_proxy_ip_in_allowlist(self):
|
||||
"""
|
||||
If headers are ignored/invalid, but the proxy IP itself is allowed,
|
||||
access should be granted based on immediate peer.
|
||||
"""
|
||||
webhook, _auth = self._make_proxy_webhook(
|
||||
allowed="10.0.0.5", # allow the proxy itself
|
||||
trusted="", # no trusted proxies => headers ignored
|
||||
code="result = {'exit_code': 0, 'message': 'OK immediate peer'}",
|
||||
)
|
||||
|
||||
data = json.dumps({"k": "v"})
|
||||
proxy_ip = "10.0.0.5"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"X-Forwarded-For": "203.0.113.7", # should be ignored
|
||||
}
|
||||
with patch(
|
||||
"odoo.addons.cetmix_tower_webhook.controllers.main.CetmixTowerWebhookController._get_remote_addr",
|
||||
return_value=proxy_ip,
|
||||
):
|
||||
resp = self.url_open(
|
||||
self.url_for(webhook.endpoint), data=data, headers=headers
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
self.assertIn(b"OK immediate peer", resp.content)
|
||||
Reference in New Issue
Block a user