Tower: upload cetmix_tower_webhook 18.0.1.0.1 (was 18.0.1.0.1, via marketplace)
This commit is contained in:
7
addons/cetmix_tower_webhook/tests/__init__.py
Normal file
7
addons/cetmix_tower_webhook/tests/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
||||
# Copyright (C) 2025 Cetmix OÜ
|
||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
||||
|
||||
from . import test_cx_tower_webhook_authenticator
|
||||
from . import test_cx_tower_webhook_log
|
||||
from . import test_cx_tower_webhook
|
||||
from . import test_webhook_controller
|
||||
38
addons/cetmix_tower_webhook/tests/common.py
Normal file
38
addons/cetmix_tower_webhook/tests/common.py
Normal file
@@ -0,0 +1,38 @@
|
||||
# Copyright (C) 2025 Cetmix OÜ
|
||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
||||
|
||||
from odoo.tests import TransactionCase
|
||||
|
||||
|
||||
class CetmixTowerWebhookCommon(TransactionCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
||||
# Set base url for correct link generation
|
||||
self.web_base_url = "https://example.com"
|
||||
self.env["ir.config_parameter"].sudo().set_param(
|
||||
"web.base.url", self.web_base_url
|
||||
)
|
||||
|
||||
# Create simple authenticator that allows all requests
|
||||
self.WebhookAuthenticator = self.env["cx.tower.webhook.authenticator"]
|
||||
self.simple_authenticator = self.WebhookAuthenticator.create(
|
||||
{
|
||||
"name": "Simple Authenticator",
|
||||
"code": "result = {'allowed': True, 'message': 'OK'}",
|
||||
}
|
||||
)
|
||||
|
||||
# Create Simple Webhook
|
||||
self.Webhook = self.env["cx.tower.webhook"]
|
||||
self.simple_webhook = self.Webhook.create(
|
||||
{
|
||||
"name": "Simple Webhook",
|
||||
"endpoint": "simple_webhook",
|
||||
"code": "result = {'exit_code': 0, 'message': 'OK'}",
|
||||
"authenticator_id": self.simple_authenticator.id,
|
||||
}
|
||||
)
|
||||
|
||||
# Log model
|
||||
self.Log = self.env["cx.tower.webhook.log"]
|
||||
154
addons/cetmix_tower_webhook/tests/test_cx_tower_webhook.py
Normal file
154
addons/cetmix_tower_webhook/tests/test_cx_tower_webhook.py
Normal file
@@ -0,0 +1,154 @@
|
||||
# Copyright (C) 2025 Cetmix OÜ
|
||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
||||
|
||||
from odoo.exceptions import ValidationError
|
||||
|
||||
from .common import CetmixTowerWebhookCommon
|
||||
|
||||
|
||||
class TestCetmixTowerWebhook(CetmixTowerWebhookCommon):
|
||||
def test_simple_webhook_success(self):
|
||||
"""
|
||||
Test that webhook is successful
|
||||
"""
|
||||
result = self.simple_webhook.execute(
|
||||
headers={}, payload={}, raw_data="", raise_on_error=False
|
||||
)
|
||||
self.assertEqual(result["exit_code"], 0)
|
||||
|
||||
def test_simple_webhook_without_optional_params(self):
|
||||
"""
|
||||
Test that webhook is successful without optional params
|
||||
"""
|
||||
result = self.simple_webhook.execute(raise_on_error=False)
|
||||
self.assertEqual(result["exit_code"], 0)
|
||||
|
||||
def test_webhook_code_custom_message(self):
|
||||
"""
|
||||
Test that custom message is returned from webhook code
|
||||
"""
|
||||
self.simple_webhook.write(
|
||||
{"code": "result = {'exit_code': 0, 'message': 'Webhook OK!'}"}
|
||||
)
|
||||
result = self.simple_webhook.execute(raise_on_error=False)
|
||||
self.assertEqual(result["exit_code"], 0)
|
||||
self.assertEqual(result["message"], "Webhook OK!")
|
||||
|
||||
def test_webhook_code_failure(self):
|
||||
"""
|
||||
Test that webhook returns error when code sets exit_code != 0
|
||||
"""
|
||||
self.simple_webhook.write(
|
||||
{"code": "result = {'exit_code': 42, 'message': 'Error occurred'}"}
|
||||
)
|
||||
result = self.simple_webhook.execute(raise_on_error=False)
|
||||
self.assertEqual(result["exit_code"], 42)
|
||||
self.assertEqual(result["message"], "Error occurred")
|
||||
|
||||
def test_webhook_code_raises_exception(self):
|
||||
"""
|
||||
Test that exception in webhook code is handled and returns exit_code 1
|
||||
"""
|
||||
self.simple_webhook.write({"code": "raise Exception('Webhook boom!')"})
|
||||
result = self.simple_webhook.execute(raise_on_error=False)
|
||||
self.assertEqual(result["exit_code"], 1)
|
||||
self.assertIn("Webhook boom!", result["message"])
|
||||
|
||||
def test_webhook_code_returns_non_dict(self):
|
||||
"""
|
||||
Test that webhook fails gracefully if code returns non-dict
|
||||
"""
|
||||
self.simple_webhook.write({"code": "result = 'not a dict'"})
|
||||
result = self.simple_webhook.execute(raise_on_error=False)
|
||||
self.assertEqual(result["exit_code"], 1)
|
||||
self.assertEqual(
|
||||
result["message"], "Webhook/Authenticator code error: result is not a dict"
|
||||
)
|
||||
|
||||
def test_webhook_execute_raises_exception(self):
|
||||
"""
|
||||
Test that webhook raises ValidationError if raise_on_error is True
|
||||
"""
|
||||
self.simple_webhook.write({"code": "raise Exception('Validation failed!')"})
|
||||
with self.assertRaises(ValidationError):
|
||||
self.simple_webhook.execute(raise_on_error=True)
|
||||
|
||||
def test_webhook_execute_with_payload(self):
|
||||
"""
|
||||
Test that webhook receives and processes payload correctly
|
||||
"""
|
||||
self.simple_webhook.write(
|
||||
{
|
||||
"code": "result = {'exit_code': 0, 'message': str(payload.get('key', 'none'))}" # noqa: E501
|
||||
}
|
||||
)
|
||||
payload = {"key": "value123"}
|
||||
result = self.simple_webhook.execute(payload=payload, raise_on_error=False)
|
||||
self.assertEqual(result["exit_code"], 0)
|
||||
self.assertEqual(result["message"], "value123")
|
||||
|
||||
def test_webhook_execute_with_user(self):
|
||||
"""
|
||||
Test that webhook executes as specified user
|
||||
"""
|
||||
test_user = self.env.ref("base.user_demo")
|
||||
self.simple_webhook.user_id = test_user
|
||||
self.simple_webhook.write(
|
||||
{"code": "result = {'exit_code': 0, 'message': user.login}"}
|
||||
)
|
||||
result = self.simple_webhook.execute(raise_on_error=False)
|
||||
self.assertEqual(result["message"], test_user.login)
|
||||
|
||||
def test_webhook_context_isolation(self):
|
||||
"""
|
||||
Test that only payload is available in eval context;
|
||||
extra kwargs are not accessible
|
||||
"""
|
||||
self.simple_webhook.write(
|
||||
{
|
||||
"code": (
|
||||
"fail = []\n"
|
||||
"for var in ['headers', 'raw_data', 'custom_param']:\n"
|
||||
" try:\n"
|
||||
" _ = eval(var)\n"
|
||||
" fail.append(var)\n"
|
||||
" except Exception:\n"
|
||||
" pass\n"
|
||||
"if fail:\n"
|
||||
" result = {'exit_code': 99, 'message': 'Leaked vars: ' + ','.join(fail)}\n" # noqa: E501
|
||||
"else:\n"
|
||||
" result = {'exit_code': 0, 'message': 'Context clean'}\n"
|
||||
)
|
||||
}
|
||||
)
|
||||
result = self.simple_webhook.execute(
|
||||
payload={"key": "val"},
|
||||
headers={"x": "y"},
|
||||
raw_data="boom",
|
||||
custom_param="xxx",
|
||||
raise_on_error=False,
|
||||
)
|
||||
self.assertEqual(result["exit_code"], 0, result["message"])
|
||||
self.assertIn("Context clean", result["message"])
|
||||
|
||||
def test_webhook_execute_runs_as_user_id(self):
|
||||
"""
|
||||
Test that the webhook code is always executed as the specified user_id,
|
||||
regardless of the caller's user context or extra kwargs.
|
||||
"""
|
||||
# set specific user
|
||||
test_user = self.env.ref("base.user_demo")
|
||||
self.simple_webhook.user_id = test_user
|
||||
self.simple_webhook.write(
|
||||
{"code": "result = {'exit_code': 0, 'message': user.login}"}
|
||||
)
|
||||
|
||||
# run execute() with another user and try to pass user_id via kwargs
|
||||
other_user = self.env.ref("base.user_admin")
|
||||
result = self.simple_webhook.with_user(other_user).execute(
|
||||
payload={},
|
||||
user_id=self.env.ref("base.user_root").id, # try to pass own user_id
|
||||
raise_on_error=False,
|
||||
)
|
||||
# the result should be from user_demo anyway
|
||||
self.assertEqual(result["message"], test_user.login)
|
||||
@@ -0,0 +1,143 @@
|
||||
# Copyright (C) 2025 Cetmix OÜ
|
||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
||||
|
||||
from odoo.exceptions import ValidationError
|
||||
|
||||
from .common import CetmixTowerWebhookCommon
|
||||
|
||||
|
||||
class TestCetmixTowerWebhookAuthenticator(CetmixTowerWebhookCommon):
|
||||
def test_simple_authentication_success(self):
|
||||
"""
|
||||
Test that authentication is successful
|
||||
"""
|
||||
# check that authentication is successful for authenticator
|
||||
# that allows all requests
|
||||
result = self.simple_authenticator.authenticate(
|
||||
headers={}, payload={}, raw_data=""
|
||||
)
|
||||
self.assertTrue(result["allowed"])
|
||||
|
||||
def test_simple_authentication_without_optional_params(self):
|
||||
"""
|
||||
Test that authentication is successful without optional params
|
||||
"""
|
||||
result = self.simple_authenticator.authenticate()
|
||||
self.assertTrue(result["allowed"])
|
||||
|
||||
def test_token_authentication_success(self):
|
||||
"""
|
||||
Test that authentication is successful for authenticator that allows requests
|
||||
with specific token in header
|
||||
"""
|
||||
auth_token_header = "X-Token"
|
||||
auth_token = "secret123"
|
||||
code = f"result = {{'allowed': headers.get('{auth_token_header}') == '{auth_token}'}}" # noqa: E501
|
||||
self.simple_authenticator.code = code
|
||||
result = self.simple_authenticator.authenticate(
|
||||
headers={auth_token_header: auth_token}
|
||||
)
|
||||
self.assertTrue(result["allowed"])
|
||||
|
||||
def test_token_authentication_failure(self):
|
||||
"""
|
||||
Test that authentication is failed for authenticator that allows
|
||||
requests with specific token in header
|
||||
"""
|
||||
auth_token_header = "X-Token"
|
||||
auth_token = "secret123"
|
||||
code = f"result = {{'allowed': headers.get('{auth_token_header}') == '{auth_token}'}}" # noqa: E501
|
||||
self.simple_authenticator.code = code
|
||||
result = self.simple_authenticator.authenticate(
|
||||
headers={auth_token_header: "wrong_token"}, raise_on_error=False
|
||||
)
|
||||
self.assertFalse(result["allowed"])
|
||||
|
||||
def test_token_authentication_failure_without_optional_params(self):
|
||||
"""
|
||||
Test that authentication is failed without optional params
|
||||
"""
|
||||
auth_token_header = "X-Token"
|
||||
auth_token = "secret123"
|
||||
code = f"result = {{'allowed': headers.get('{auth_token_header}') == '{auth_token}'}}" # noqa: E501
|
||||
self.simple_authenticator.code = code
|
||||
result = self.simple_authenticator.authenticate(raise_on_error=False)
|
||||
self.assertFalse(result["allowed"])
|
||||
self.assertEqual(result["http_code"], 500)
|
||||
self.assertIn("object has no attribute 'get'", result["message"])
|
||||
|
||||
def test_authentication_code_error(self):
|
||||
"""
|
||||
Test that authentication is failed with invalid code
|
||||
"""
|
||||
self.simple_authenticator.code = "1/0"
|
||||
result = self.simple_authenticator.authenticate(raise_on_error=False)
|
||||
self.assertFalse(result["allowed"])
|
||||
self.assertEqual(result["http_code"], 500)
|
||||
self.assertEqual(result["message"], "division by zero")
|
||||
|
||||
# test with raise_on_error=True
|
||||
with self.assertRaises(ValidationError) as e:
|
||||
self.simple_authenticator.authenticate()
|
||||
self.assertEqual(
|
||||
str(e.exception), "Authentication code error: division by zero"
|
||||
)
|
||||
|
||||
def test_authenticator_custom_http_code_and_message(self):
|
||||
"""
|
||||
Test that custom http_code and message returned from code are respected
|
||||
"""
|
||||
message = "I am a teapot!"
|
||||
self.simple_authenticator.code = (
|
||||
f"result = {{'allowed': False, 'http_code': 418, 'message': '{message}'}}"
|
||||
)
|
||||
result = self.simple_authenticator.authenticate(headers={})
|
||||
self.assertFalse(result["allowed"])
|
||||
self.assertEqual(result.get("http_code"), 418)
|
||||
self.assertEqual(result.get("message"), message)
|
||||
|
||||
def test_authenticator_returns_non_dict(self):
|
||||
"""
|
||||
Test that authentication fails if code returns non-dict result
|
||||
"""
|
||||
self.simple_authenticator.write({"code": "result = 'not a dict'"})
|
||||
result = self.simple_authenticator.authenticate(
|
||||
headers={}, raise_on_error=False
|
||||
)
|
||||
self.assertFalse(result["allowed"])
|
||||
self.assertEqual(result["http_code"], 500)
|
||||
self.assertIn("result is not a dict", result["message"])
|
||||
|
||||
def test_authentication_with_raw_data(self):
|
||||
"""
|
||||
Test that authentication works with raw_data and without headers
|
||||
"""
|
||||
self.simple_authenticator.write(
|
||||
{"code": "result = {'allowed': raw_data == 'magic'}"}
|
||||
)
|
||||
result = self.simple_authenticator.authenticate(raw_data="magic")
|
||||
self.assertTrue(result["allowed"])
|
||||
result = self.simple_authenticator.authenticate(raw_data="not_magic")
|
||||
self.assertFalse(result["allowed"])
|
||||
|
||||
def test_authentication_code_exception(self):
|
||||
"""
|
||||
Test that authentication code exception is captured in result['message']
|
||||
"""
|
||||
self.simple_authenticator.write({"code": "raise Exception('custom failure')"})
|
||||
result = self.simple_authenticator.authenticate(
|
||||
headers={}, raise_on_error=False
|
||||
)
|
||||
self.assertFalse(result["allowed"])
|
||||
self.assertEqual(result["http_code"], 500)
|
||||
self.assertIn("custom failure", result["message"])
|
||||
|
||||
def test_authentication_minimal_false(self):
|
||||
"""
|
||||
Test minimal code with only allowed: False
|
||||
"""
|
||||
self.simple_authenticator.write({"code": "result = {'allowed': False}"})
|
||||
result = self.simple_authenticator.authenticate(headers={})
|
||||
self.assertFalse(result["allowed"])
|
||||
self.assertIsNone(result.get("http_code"))
|
||||
self.assertIsNone(result.get("message"))
|
||||
@@ -0,0 +1,68 @@
|
||||
# Copyright (C) 2025 Cetmix OÜ
|
||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
||||
|
||||
import json
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from .common import CetmixTowerWebhookCommon
|
||||
|
||||
|
||||
class TestCetmixTowerWebhookLog(CetmixTowerWebhookCommon):
|
||||
def test_create_log_from_call(self):
|
||||
"""Test creating a log entry via create_from_call()."""
|
||||
vals = {
|
||||
"result_message": "Manual log",
|
||||
"http_status": 201,
|
||||
"authentication_status": "success",
|
||||
"code_status": "success",
|
||||
"request_payload": json.dumps({"foo": "bar"}),
|
||||
"request_headers": json.dumps({"X-Test": "test"}),
|
||||
"webhook_id": self.simple_webhook.id,
|
||||
}
|
||||
log = self.Log.create_from_call(webhook=self.simple_webhook, **vals)
|
||||
self.assertEqual(log.webhook_id, self.simple_webhook)
|
||||
self.assertEqual(log.result_message, "Manual log")
|
||||
self.assertEqual(log.http_status, 201)
|
||||
self.assertEqual(log.authentication_status, "success")
|
||||
self.assertIn("foo", log.request_payload)
|
||||
self.assertIn("X-Test", log.request_headers)
|
||||
|
||||
def test_gc_delete_old_logs(self):
|
||||
"""Test auto-removal of old logs via _gc_delete_old_logs()."""
|
||||
# Create an "old" log
|
||||
old_log = self.Log.create_from_call(
|
||||
webhook=self.simple_webhook,
|
||||
authentication_status="success",
|
||||
code_status="success",
|
||||
http_status=200,
|
||||
)
|
||||
# Set create_date in the past (we cannot use write
|
||||
# because the create_date is MAGIC Field)
|
||||
past_date = (datetime.now() - timedelta(days=100)).strftime("%Y-%m-%d %H:%M:%S")
|
||||
self.env.cr.execute(
|
||||
"UPDATE cx_tower_webhook_log SET create_date = %s WHERE id = %s",
|
||||
(past_date, old_log.id),
|
||||
)
|
||||
self.env.invalidate_all()
|
||||
# Create a new log
|
||||
new_log = self.Log.create_from_call(
|
||||
webhook=self.simple_webhook,
|
||||
authentication_status="success",
|
||||
code_status="success",
|
||||
http_status=200,
|
||||
)
|
||||
# Set log duration to 30 days
|
||||
self.env["ir.config_parameter"].sudo().set_param(
|
||||
"cetmix_tower_webhook.webhook_log_duration", 30
|
||||
)
|
||||
# Enter test mode to run the autovacuum cron because
|
||||
# `_run_vacuum_cleaner` makes a commit
|
||||
self.registry.enter_test_mode(self.cr)
|
||||
self.addCleanup(self.registry.leave_test_mode)
|
||||
env = self.env(cr=self.registry.cursor())
|
||||
|
||||
# Run the autovacuum cron
|
||||
env.ref("base.autovacuum_job").method_direct_trigger()
|
||||
|
||||
self.assertFalse(self.Log.browse(old_log.id).exists())
|
||||
self.assertTrue(self.Log.browse(new_log.id).exists())
|
||||
608
addons/cetmix_tower_webhook/tests/test_webhook_controller.py
Normal file
608
addons/cetmix_tower_webhook/tests/test_webhook_controller.py
Normal file
@@ -0,0 +1,608 @@
|
||||
# Copyright (C) 2025 Cetmix OÜ
|
||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
||||
|
||||
import json
|
||||
from unittest.mock import patch
|
||||
|
||||
from odoo.tests import HttpCase, tagged
|
||||
|
||||
|
||||
@tagged("-at_install", "post_install")
|
||||
class TestCxTowerWebhookController(HttpCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
env = cls.env
|
||||
# Authenticator that always allows requests
|
||||
cls.authenticator = env["cx.tower.webhook.authenticator"].create(
|
||||
{"name": "Always OK", "code": "result = {'allowed': True}"}
|
||||
)
|
||||
# POST webhook
|
||||
cls.webhook_post = env["cx.tower.webhook"].create(
|
||||
{
|
||||
"name": "Test Webhook POST",
|
||||
"endpoint": "webhook_post",
|
||||
"method": "post",
|
||||
"authenticator_id": cls.authenticator.id,
|
||||
"code": "result = {'exit_code': 0, 'message': 'POST ok'}",
|
||||
}
|
||||
)
|
||||
# GET webhook
|
||||
cls.webhook_get = env["cx.tower.webhook"].create(
|
||||
{
|
||||
"name": "Test Webhook GET",
|
||||
"endpoint": "webhook_get",
|
||||
"method": "get",
|
||||
"authenticator_id": cls.authenticator.id,
|
||||
"code": "result = {'exit_code': 0, 'message': 'GET ok'}",
|
||||
}
|
||||
)
|
||||
# Log model
|
||||
cls.Log = env["cx.tower.webhook.log"]
|
||||
|
||||
def url_for(self, endpoint):
|
||||
"""Helper to build webhook url"""
|
||||
url = f"/cetmix_tower_webhooks/{endpoint}"
|
||||
return self.base_url() + url
|
||||
|
||||
def assert_log(self, log=None, request_payload=None, **expected):
|
||||
"""
|
||||
Universal log checker for webhook log model.
|
||||
Checks expected field values and substrings.
|
||||
"""
|
||||
self.assertIsNotNone(log, "Log record was not created")
|
||||
if request_payload is not None:
|
||||
try:
|
||||
log_payload = log.request_payload
|
||||
# try to convert both to Python dict for comparison
|
||||
if isinstance(log_payload, str):
|
||||
log_payload = log_payload.strip()
|
||||
self.assertDictEqual(
|
||||
json.loads(
|
||||
log_payload.replace("'", '"')
|
||||
), # try to make JSON from possible str(dict)
|
||||
json.loads(request_payload),
|
||||
)
|
||||
except Exception as ex:
|
||||
self.fail(
|
||||
f"Payload comparison failed: {ex}\nLog: {log.request_payload}\nExpected: {request_payload}" # noqa: E501
|
||||
)
|
||||
for field, value in expected.items():
|
||||
if field == "request_payload":
|
||||
continue # Already checked
|
||||
actual = getattr(log, field)
|
||||
self.assertEqual(actual, value, f"{field}: expected {value}, got {actual}")
|
||||
|
||||
def test_post_webhook_success(self):
|
||||
"""Success test for POST request with correct payload."""
|
||||
data = json.dumps({"some": "data"})
|
||||
response = self.url_open(
|
||||
self.url_for(self.webhook_post.endpoint),
|
||||
data=data,
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertIn(b"POST ok", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", self.webhook_post.id)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="success",
|
||||
authentication_status="success",
|
||||
http_status=200,
|
||||
endpoint=self.webhook_post.endpoint,
|
||||
request_payload=data,
|
||||
)
|
||||
|
||||
def test_get_webhook_success(self):
|
||||
"""Success test for GET request with correct payload."""
|
||||
response = self.url_open(
|
||||
f"{self.url_for(self.webhook_get.endpoint)}?foo=bar",
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertIn(b"GET ok", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", self.webhook_get.id)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="success",
|
||||
authentication_status="success",
|
||||
http_status=200,
|
||||
endpoint=self.webhook_get.endpoint,
|
||||
)
|
||||
self.assertIn("foo", log.request_payload)
|
||||
|
||||
def test_webhook_not_found(self):
|
||||
"""Test request to a non-existing webhook endpoint."""
|
||||
data = json.dumps({"test": 1})
|
||||
response = self.url_open(
|
||||
self.url_for("missing"),
|
||||
data=data,
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 404)
|
||||
self.assertIn(b"Webhook not found", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", False)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="skipped",
|
||||
authentication_status="failed",
|
||||
http_status=404,
|
||||
endpoint="missing",
|
||||
error_message="Webhook not found",
|
||||
request_payload=data,
|
||||
)
|
||||
|
||||
def test_wrong_method(self):
|
||||
"""
|
||||
Test GET request to POST-only webhook.
|
||||
"""
|
||||
response = self.url_open(
|
||||
self.url_for(self.webhook_post.endpoint),
|
||||
)
|
||||
self.assertEqual(response.status_code, 404)
|
||||
self.assertIn(b"Webhook not found", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", False)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="skipped",
|
||||
authentication_status="failed",
|
||||
http_status=404,
|
||||
error_message="Webhook not found",
|
||||
endpoint=self.webhook_post.endpoint,
|
||||
request_method="get",
|
||||
)
|
||||
|
||||
def test_missing_payload_post(self):
|
||||
"""
|
||||
Test POST request with empty payload.
|
||||
"""
|
||||
# use opener instead of url_open to avoid checking of data
|
||||
response = self.opener.post(
|
||||
self.url_for(self.webhook_post.endpoint),
|
||||
timeout=1200000,
|
||||
headers={"Content-Type": "application/json"},
|
||||
allow_redirects=True,
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertIn(b"POST ok", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", self.webhook_post.id)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="success",
|
||||
authentication_status="success",
|
||||
http_status=200,
|
||||
endpoint=self.webhook_post.endpoint,
|
||||
request_payload="{}",
|
||||
)
|
||||
|
||||
def test_authentication_failed(self):
|
||||
"""
|
||||
Test POST request with authenticator that always denies.
|
||||
"""
|
||||
bad_auth = self.env["cx.tower.webhook.authenticator"].create(
|
||||
{
|
||||
"name": "Never OK",
|
||||
"code": "result = {'allowed': False, 'custom_message': 'Forbidden'}",
|
||||
}
|
||||
)
|
||||
webhook = self.env["cx.tower.webhook"].create(
|
||||
{
|
||||
"name": "Forbidden Webhook",
|
||||
"endpoint": "forbidden",
|
||||
"method": "post",
|
||||
"authenticator_id": bad_auth.id,
|
||||
"code": "result = {'exit_code': 0, 'message': 'Should not run'}",
|
||||
}
|
||||
)
|
||||
data = json.dumps({"fail": 1})
|
||||
response = self.url_open(
|
||||
self.url_for(webhook.endpoint),
|
||||
data=data,
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 403)
|
||||
self.assertIn(b"Authentication not allowed", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", webhook.id)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="skipped",
|
||||
authentication_status="failed",
|
||||
http_status=403,
|
||||
endpoint=webhook.endpoint,
|
||||
request_payload=data,
|
||||
)
|
||||
|
||||
def test_webhook_code_failure(self):
|
||||
"""
|
||||
Test POST request to a webhook that raises an exception in code.
|
||||
"""
|
||||
self.webhook_post.code = "raise Exception('Some error!')"
|
||||
response = self.url_open(
|
||||
self.url_for(self.webhook_post.endpoint),
|
||||
data=json.dumps({}),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn(b"Some error!", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", self.webhook_post.id)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="failed",
|
||||
authentication_status="success",
|
||||
http_status=500,
|
||||
endpoint=self.webhook_post.endpoint,
|
||||
request_payload="{}",
|
||||
)
|
||||
self.assertIn("Some error!", log.error_message)
|
||||
|
||||
def test_json_headers_are_stored(self):
|
||||
"""
|
||||
Test that request headers and payload are saved in webhook log record.
|
||||
"""
|
||||
payload = {"foo": "bar"}
|
||||
headers = {"X-Test-Header": "xxx", "Content-Type": "application/json"}
|
||||
response = self.url_open(
|
||||
self.url_for(self.webhook_post.endpoint),
|
||||
data=json.dumps(payload),
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", self.webhook_post.id)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="success",
|
||||
authentication_status="success",
|
||||
http_status=200,
|
||||
endpoint=self.webhook_post.endpoint,
|
||||
)
|
||||
self.assertIn("foo", log.request_payload)
|
||||
self.assertIn("X-Test-Header", log.request_headers)
|
||||
self.assertIn(log.result_message, response.text)
|
||||
|
||||
def test_log_contains_ip(self):
|
||||
"""
|
||||
Test that the log contains the client's IP address and country (if available).
|
||||
"""
|
||||
payload = {"check": "ip"}
|
||||
self.url_open(
|
||||
self.url_for(self.webhook_post.endpoint),
|
||||
data=json.dumps(payload),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", self.webhook_post.id)])
|
||||
self.assertTrue(log.ip_address)
|
||||
|
||||
def test_inactive_webhook(self):
|
||||
"""Test that inactive webhooks are not callable."""
|
||||
self.webhook_post.active = False
|
||||
response = self.url_open(
|
||||
self.url_for(self.webhook_post.endpoint),
|
||||
data=json.dumps({"a": 1}),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 404)
|
||||
self.assertIn(b"Webhook not found", response.content)
|
||||
|
||||
def test_authenticator_code_raises(self):
|
||||
"""
|
||||
Test that if authenticator's code raises an error,
|
||||
proper log is created and 403 returned.
|
||||
"""
|
||||
bad_auth = self.env["cx.tower.webhook.authenticator"].create(
|
||||
{"name": "Broken Auth", "code": "raise Exception('auth fail')"}
|
||||
)
|
||||
webhook = self.env["cx.tower.webhook"].create(
|
||||
{
|
||||
"name": "Web with bad auth",
|
||||
"endpoint": "bad_auth",
|
||||
"method": "post",
|
||||
"authenticator_id": bad_auth.id,
|
||||
"code": "result = {'exit_code': 0, 'message': 'Should not run'}",
|
||||
}
|
||||
)
|
||||
response = self.url_open(
|
||||
self.url_for(webhook.endpoint),
|
||||
data=json.dumps({"x": 1}),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 403)
|
||||
self.assertIn(b"auth fail", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", webhook.id)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="skipped",
|
||||
authentication_status="failed",
|
||||
http_status=403,
|
||||
endpoint=webhook.endpoint,
|
||||
)
|
||||
self.assertIn("auth fail", log.error_message)
|
||||
|
||||
def test_post_webhook_json_content_type(self):
|
||||
"""
|
||||
Test POST request with content_type json.
|
||||
"""
|
||||
self.webhook_post.content_type = "json"
|
||||
self.webhook_post.code = "result = {'exit_code': 0, 'message': 'POST JSON ok'}"
|
||||
|
||||
data = json.dumps({"json_test": "ok"})
|
||||
response = self.url_open(
|
||||
self.url_for(self.webhook_post.endpoint),
|
||||
data=data,
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertIn(b"POST JSON ok", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", self.webhook_post.id)])
|
||||
self.assert_log(
|
||||
log,
|
||||
code_status="success",
|
||||
authentication_status="success",
|
||||
http_status=200,
|
||||
endpoint=self.webhook_post.endpoint,
|
||||
request_payload=data,
|
||||
)
|
||||
|
||||
def test_post_webhook_form_content_type(self):
|
||||
"""
|
||||
Test POST request with content_type form.
|
||||
"""
|
||||
self.webhook_post.content_type = "form"
|
||||
self.webhook_post.code = "result = {'exit_code': 0, 'message': 'POST FORM ok'}"
|
||||
|
||||
data = {"form_field": "ok"}
|
||||
response = self.url_open(
|
||||
self.url_for(self.webhook_post.endpoint),
|
||||
data=data,
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertIn(b"POST FORM ok", response.content)
|
||||
|
||||
log = self.Log.search([("webhook_id", "=", self.webhook_post.id)])
|
||||
self.assertIn("form_field", log.request_payload)
|
||||
|
||||
def test_authenticator_ipv4_and_ipv6(self):
|
||||
"""
|
||||
Test IP filter for IPv4, IPv6, and networks
|
||||
by monkeypatching REMOTE_ADDR in environ.
|
||||
"""
|
||||
auth = self.env["cx.tower.webhook.authenticator"].create(
|
||||
{
|
||||
"name": "IP Test",
|
||||
"allowed_ip_addresses": "203.0.113.5,2001:db8::42,198.51.100.0/24,2001:db8:abcd::/48", # noqa: E501
|
||||
"code": "result = {'allowed': True}",
|
||||
}
|
||||
)
|
||||
webhook = self.env["cx.tower.webhook"].create(
|
||||
{
|
||||
"name": "IP Webhook",
|
||||
"endpoint": "webhook_iptest",
|
||||
"method": "post",
|
||||
"authenticator_id": auth.id,
|
||||
"code": "result = {'exit_code': 0, 'message': 'IP OK'}",
|
||||
}
|
||||
)
|
||||
|
||||
data = json.dumps({"ip": "test"})
|
||||
|
||||
def do_req(ip):
|
||||
# Patch _get_remote_addr to simulate requests coming
|
||||
# from different IP addresses
|
||||
with patch(
|
||||
"odoo.addons.cetmix_tower_webhook.controllers.main.CetmixTowerWebhookController._get_remote_addr",
|
||||
return_value=ip,
|
||||
):
|
||||
return self.url_open(
|
||||
self.url_for(webhook.endpoint),
|
||||
data=data,
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
|
||||
# IPv4 address allowed
|
||||
resp = do_req("203.0.113.5")
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
self.assertIn(b"IP OK", resp.content)
|
||||
|
||||
# IPv6 address allowed
|
||||
resp = do_req("2001:db8::42")
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
self.assertIn(b"IP OK", resp.content)
|
||||
|
||||
# IPv4 network allowed
|
||||
resp = do_req("198.51.100.99")
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
self.assertIn(b"IP OK", resp.content)
|
||||
|
||||
# IPv6 network allowed
|
||||
resp = do_req("2001:db8:abcd::abcd")
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
self.assertIn(b"IP OK", resp.content)
|
||||
|
||||
# Denied IPv4 address
|
||||
resp = do_req("203.0.113.99")
|
||||
self.assertEqual(resp.status_code, 403)
|
||||
self.assertIn(b"Address not allowed", resp.content)
|
||||
|
||||
# Denied IPv6 address
|
||||
resp = do_req("2001:db8:ffff::1")
|
||||
self.assertEqual(resp.status_code, 403)
|
||||
self.assertIn(b"Address not allowed", resp.content)
|
||||
|
||||
def _make_proxy_webhook(
|
||||
self,
|
||||
allowed,
|
||||
trusted=None,
|
||||
code="result = {'exit_code': 0, 'message': 'OK via proxy'}",
|
||||
):
|
||||
"""
|
||||
Helper to create a webhook with a dedicated authenticator configured
|
||||
for proxy-aware tests.
|
||||
"""
|
||||
auth = self.env["cx.tower.webhook.authenticator"].create(
|
||||
{
|
||||
"name": "Proxy Aware",
|
||||
"allowed_ip_addresses": allowed,
|
||||
"trusted_proxy_ips": trusted or "",
|
||||
"code": "result = {'allowed': True}",
|
||||
}
|
||||
)
|
||||
wh = self.env["cx.tower.webhook"].create(
|
||||
{
|
||||
"name": "Proxy Webhook",
|
||||
"endpoint": "proxy_webhook",
|
||||
"method": "post",
|
||||
"authenticator_id": auth.id,
|
||||
"code": code,
|
||||
}
|
||||
)
|
||||
return wh, auth
|
||||
|
||||
def test_proxy_headers_ignored_without_trusted_proxy(self):
|
||||
"""
|
||||
When trusted_proxy_ips is empty, XFF/X-Real-IP must be ignored.
|
||||
We fallback to immediate peer (proxy IP), which is not allowed -> 403.
|
||||
"""
|
||||
# Allow only the real client network, not the proxy itself
|
||||
webhook, _auth = self._make_proxy_webhook(
|
||||
allowed="203.0.113.0/24", trusted=None
|
||||
)
|
||||
|
||||
data = json.dumps({"k": "v"})
|
||||
proxy_ip = "10.0.0.5" # immediate peer (undocumented as trusted)
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"X-Forwarded-For": "203.0.113.7, 10.0.0.5", # should be ignored
|
||||
"X-Real-IP": "203.0.113.7", # should be ignored
|
||||
}
|
||||
with patch(
|
||||
"odoo.addons.cetmix_tower_webhook.controllers.main.CetmixTowerWebhookController._get_remote_addr",
|
||||
return_value=proxy_ip,
|
||||
):
|
||||
resp = self.url_open(
|
||||
self.url_for(webhook.endpoint), data=data, headers=headers
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, 403)
|
||||
self.assertIn(b"Address not allowed", resp.content)
|
||||
|
||||
def test_proxy_xff_honored_with_trusted_proxy(self):
|
||||
"""
|
||||
With trusted proxy configured, take the left-most IP from X-Forwarded-For.
|
||||
"""
|
||||
webhook, _auth = self._make_proxy_webhook(
|
||||
allowed="203.0.113.0/24",
|
||||
trusted="10.0.0.5",
|
||||
code="result = {'exit_code': 0, 'message': 'OK XFF'}",
|
||||
)
|
||||
|
||||
data = json.dumps({"k": "v"})
|
||||
proxy_ip = "10.0.0.5"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
# XFF list: client, proxy
|
||||
"X-Forwarded-For": "203.0.113.7, 10.0.0.5",
|
||||
}
|
||||
with patch(
|
||||
"odoo.addons.cetmix_tower_webhook.controllers.main.CetmixTowerWebhookController._get_remote_addr",
|
||||
return_value=proxy_ip,
|
||||
):
|
||||
resp = self.url_open(
|
||||
self.url_for(webhook.endpoint), data=data, headers=headers
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
self.assertIn(b"OK XFF", resp.content)
|
||||
|
||||
def test_proxy_x_real_ip_fallback_when_xff_missing(self):
|
||||
"""
|
||||
If XFF is missing/invalid but trusted proxy is set, fall back to X-Real-IP.
|
||||
"""
|
||||
webhook, _auth = self._make_proxy_webhook(
|
||||
allowed="203.0.113.0/24",
|
||||
trusted="10.0.0.5",
|
||||
code="result = {'exit_code': 0, 'message': 'OK X-Real-IP'}",
|
||||
)
|
||||
|
||||
data = json.dumps({"k": "v"})
|
||||
proxy_ip = "10.0.0.5"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"X-Forwarded-For": "garbage, not_an_ip", # invalids should be skipped
|
||||
"X-Real-IP": "203.0.113.8",
|
||||
}
|
||||
with patch(
|
||||
"odoo.addons.cetmix_tower_webhook.controllers.main.CetmixTowerWebhookController._get_remote_addr",
|
||||
return_value=proxy_ip,
|
||||
):
|
||||
resp = self.url_open(
|
||||
self.url_for(webhook.endpoint), data=data, headers=headers
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
self.assertIn(b"OK X-Real-IP", resp.content)
|
||||
|
||||
def test_proxy_invalid_headers_fall_back_to_immediate_peer(self):
|
||||
"""
|
||||
If headers are invalid even with trusted proxy, fall back to immediate peer.
|
||||
Since the proxy IP is not in allowlist, the request is denied.
|
||||
"""
|
||||
webhook, _auth = self._make_proxy_webhook(
|
||||
allowed="203.0.113.0/24", # does NOT include proxy IP
|
||||
trusted="10.0.0.5",
|
||||
)
|
||||
|
||||
data = json.dumps({"k": "v"})
|
||||
proxy_ip = "10.0.0.5"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"X-Forwarded-For": "not_an_ip, also_bad",
|
||||
"X-Real-IP": "bad_ip_value",
|
||||
}
|
||||
with patch(
|
||||
"odoo.addons.cetmix_tower_webhook.controllers.main.CetmixTowerWebhookController._get_remote_addr",
|
||||
return_value=proxy_ip,
|
||||
):
|
||||
resp = self.url_open(
|
||||
self.url_for(webhook.endpoint), data=data, headers=headers
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, 403)
|
||||
self.assertIn(b"Address not allowed", resp.content)
|
||||
|
||||
def test_proxy_allows_via_immediate_peer_when_proxy_ip_in_allowlist(self):
|
||||
"""
|
||||
If headers are ignored/invalid, but the proxy IP itself is allowed,
|
||||
access should be granted based on immediate peer.
|
||||
"""
|
||||
webhook, _auth = self._make_proxy_webhook(
|
||||
allowed="10.0.0.5", # allow the proxy itself
|
||||
trusted="", # no trusted proxies => headers ignored
|
||||
code="result = {'exit_code': 0, 'message': 'OK immediate peer'}",
|
||||
)
|
||||
|
||||
data = json.dumps({"k": "v"})
|
||||
proxy_ip = "10.0.0.5"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"X-Forwarded-For": "203.0.113.7", # should be ignored
|
||||
}
|
||||
with patch(
|
||||
"odoo.addons.cetmix_tower_webhook.controllers.main.CetmixTowerWebhookController._get_remote_addr",
|
||||
return_value=proxy_ip,
|
||||
):
|
||||
resp = self.url_open(
|
||||
self.url_for(webhook.endpoint), data=data, headers=headers
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
self.assertIn(b"OK immediate peer", resp.content)
|
||||
Reference in New Issue
Block a user