Tower: upload cetmix_tower_git 16.0.2.0.2 (via marketplace)
This commit is contained in:
415
addons/cetmix_tower_git/models/cx_tower_git_remote.py
Normal file
415
addons/cetmix_tower_git/models/cx_tower_git_remote.py
Normal file
@@ -0,0 +1,415 @@
|
||||
# Copyright (C) 2024 Cetmix OÜ
|
||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
||||
import giturlparse
|
||||
|
||||
from odoo import _, api, fields, models
|
||||
from odoo.exceptions import ValidationError
|
||||
|
||||
|
||||
class CxTowerGitRemote(models.Model):
|
||||
"""
|
||||
Git Remote.
|
||||
Implements single git remote.
|
||||
Eg a branch or a pull request.
|
||||
"""
|
||||
|
||||
_name = "cx.tower.git.remote"
|
||||
_inherit = [
|
||||
"cx.tower.reference.mixin",
|
||||
"cx.tower.yaml.mixin",
|
||||
]
|
||||
_description = "Cetmix Tower Git Remote"
|
||||
_order = "sequence, name"
|
||||
|
||||
# Used to detect git ssh urls
|
||||
GIT_SSH_URL_PATTERN = r"^[\w\.-]+@[\w\.-]+:.*\.git$"
|
||||
GIT_HTTPS_URL_PATTERN = r"^https://.*\.git$"
|
||||
GIT_GIT_URL_PATTERN = r"^git://.*\.git$"
|
||||
|
||||
active = fields.Boolean(related="source_id.active", store=True, readonly=True)
|
||||
enabled = fields.Boolean(
|
||||
default=True, help="Enable in configuration and exported to files"
|
||||
)
|
||||
sequence = fields.Integer(default=10)
|
||||
name = fields.Char(compute="_compute_name", store=True, default="remote")
|
||||
source_id = fields.Many2one(
|
||||
comodel_name="cx.tower.git.source",
|
||||
required=True,
|
||||
ondelete="cascade",
|
||||
auto_join=True,
|
||||
)
|
||||
git_project_id = fields.Many2one(
|
||||
comodel_name="cx.tower.git.project",
|
||||
related="source_id.git_project_id",
|
||||
store=True,
|
||||
readonly=True,
|
||||
)
|
||||
repo_id = fields.Many2one(
|
||||
comodel_name="cx.tower.git.repo",
|
||||
string="Repository",
|
||||
required=True,
|
||||
ondelete="restrict",
|
||||
help="If selected, the remote URL will be filled from the"
|
||||
" repo settings based on the remote protocol",
|
||||
)
|
||||
repo_provider = fields.Selection(
|
||||
related="repo_id.provider",
|
||||
readonly=True,
|
||||
)
|
||||
# -- Repo related fields
|
||||
url_protocol = fields.Selection(
|
||||
string="Protocol",
|
||||
selection=[
|
||||
("ssh", "SSH"),
|
||||
("https", "HTTPS"),
|
||||
("git", "GIT"),
|
||||
],
|
||||
required=True,
|
||||
default=lambda self: self._get_default_url_protocol(),
|
||||
)
|
||||
is_private = fields.Boolean(
|
||||
string="Private",
|
||||
help="Repository is private",
|
||||
related="repo_id.is_private",
|
||||
store=True,
|
||||
readonly=True,
|
||||
)
|
||||
head_type = fields.Selection(
|
||||
selection=[
|
||||
("branch", "Branch"),
|
||||
("pr", "Pull/Merge Request"),
|
||||
("commit", "Commit"),
|
||||
],
|
||||
required=True,
|
||||
)
|
||||
head = fields.Char(
|
||||
help="Git remote head. Link to branch, PR, commit or commit hash.",
|
||||
required=True,
|
||||
index=True,
|
||||
)
|
||||
|
||||
def _get_default_url_protocol(self):
|
||||
"""Default URL protocol for new remote.
|
||||
|
||||
Returns:
|
||||
Char: Default URL protocol.
|
||||
"""
|
||||
return "https"
|
||||
|
||||
@api.depends("source_id", "sequence")
|
||||
def _compute_name(self):
|
||||
"""
|
||||
Compute remote name.
|
||||
By default all remotes are named `remote_<position>`
|
||||
where position is the position of the remote in the source.
|
||||
Eg first remote is `remote_1`, second is `remote_2`, etc.
|
||||
"""
|
||||
for remote in self:
|
||||
if remote.source_id:
|
||||
for index, source_remote in enumerate(remote.source_id.remote_ids):
|
||||
source_remote.name = f"remote_{index + 1}"
|
||||
|
||||
@api.onchange("head")
|
||||
def onchange_head(self):
|
||||
"""
|
||||
Extract head number from head url
|
||||
and set it as head.
|
||||
"""
|
||||
for remote in self:
|
||||
if remote.head and "/" in remote.head:
|
||||
remote.head = self._sanitize_head(remote.head)
|
||||
|
||||
@api.model_create_multi
|
||||
def create(self, vals_list):
|
||||
# Sanitize head
|
||||
for vals in vals_list:
|
||||
head = vals.get("head")
|
||||
if head and "/" in head:
|
||||
vals["head"] = self._sanitize_head(head)
|
||||
res = super().create(vals_list)
|
||||
# Export project to related files and templates
|
||||
res._update_related_files_and_templates()
|
||||
return res
|
||||
|
||||
def write(self, vals):
|
||||
# Sanitize head
|
||||
if "head" in vals:
|
||||
head = vals["head"]
|
||||
if head and "/" in head:
|
||||
vals["head"] = self._sanitize_head(head)
|
||||
res = super().write(vals)
|
||||
# Update related files and templates on update
|
||||
self._update_related_files_and_templates()
|
||||
return res
|
||||
|
||||
def unlink(self):
|
||||
"""
|
||||
Override to update related files and templates on unlink
|
||||
"""
|
||||
related_files = self.mapped("git_project_id").mapped("git_project_rel_ids")
|
||||
related_templates = self.mapped("git_project_id").mapped(
|
||||
"git_project_file_template_rel_ids"
|
||||
)
|
||||
res = super().unlink()
|
||||
|
||||
# Update related files and templates on unlink
|
||||
if related_files:
|
||||
related_files._save_to_file()
|
||||
if related_templates:
|
||||
related_templates._save_to_file_template()
|
||||
return res
|
||||
|
||||
def _sanitize_head(self, head):
|
||||
"""Sanitize head.
|
||||
Extract head number from head url
|
||||
and set it as head.
|
||||
|
||||
Args:
|
||||
head (Char): Head to sanitize
|
||||
|
||||
Returns:
|
||||
Char: Sanitized head
|
||||
"""
|
||||
if head and "/" in head:
|
||||
return head.split("/")[-1].strip()
|
||||
return head
|
||||
|
||||
@api.model
|
||||
def get_head_data(self):
|
||||
"""
|
||||
This method is used to get values for the dropdown dynamic widget.
|
||||
It is designed for integrations with repo providers using APIs.
|
||||
|
||||
Returns:
|
||||
List: List of tuples(selection, name)
|
||||
eg [('18.0', '18.0'), ('main', 'main'), ('develop', 'develop')]
|
||||
"""
|
||||
values = [
|
||||
("18.0", "18.0"),
|
||||
("main", "Main"),
|
||||
("develop", "Develop"),
|
||||
("17.0", "17.0"),
|
||||
]
|
||||
return values
|
||||
|
||||
def _update_related_files_and_templates(self):
|
||||
# Update related files on update
|
||||
related_files = self.mapped("git_project_id").mapped("git_project_rel_ids")
|
||||
if related_files:
|
||||
related_files._save_to_file()
|
||||
related_templates = self.mapped("git_project_id").mapped(
|
||||
"git_project_file_template_rel_ids"
|
||||
)
|
||||
if related_templates:
|
||||
related_templates._save_to_file_template()
|
||||
|
||||
# ------------------------------
|
||||
# Reference mixin methods
|
||||
# ------------------------------
|
||||
def _get_pre_populated_model_data(self):
|
||||
res = super()._get_pre_populated_model_data()
|
||||
res.update({"cx.tower.git.remote": ["cx.tower.git.source", "source_id"]})
|
||||
return res
|
||||
|
||||
# ------------------------------
|
||||
# YAML mixin methods
|
||||
# ------------------------------
|
||||
def _get_fields_for_yaml(self):
|
||||
res = super()._get_fields_for_yaml()
|
||||
res += [
|
||||
"name",
|
||||
"enabled",
|
||||
"sequence",
|
||||
"repo_id",
|
||||
"head",
|
||||
"head_type",
|
||||
]
|
||||
return res
|
||||
|
||||
# ------------------------------
|
||||
# Git Aggregator related methods
|
||||
# ------------------------------
|
||||
def _git_aggregator_prepare_url(self):
|
||||
"""Prepare url for git aggregator
|
||||
|
||||
Returns:
|
||||
Char: Prepared url for git aggregator
|
||||
"""
|
||||
self.ensure_one()
|
||||
|
||||
if not self.repo_id:
|
||||
raise ValidationError(_("Repository is required"))
|
||||
if not self.repo_id.url:
|
||||
raise ValidationError(_("Repository URL is not set"))
|
||||
|
||||
url = self.repo_id.url
|
||||
prepared_url = giturlparse.parse(url).urls.get(self.url_protocol, url)
|
||||
|
||||
# If repo is public or is not using HTTPS protocol return URL as is
|
||||
if not self.is_private or self.url_protocol != "https":
|
||||
return prepared_url
|
||||
|
||||
if self.repo_provider == "github":
|
||||
prepared_url = self._git_aggregator_prepare_url_github(prepared_url)
|
||||
elif self.repo_provider == "gitlab":
|
||||
prepared_url = self._git_aggregator_prepare_url_gitlab(prepared_url)
|
||||
elif self.repo_provider == "bitbucket":
|
||||
prepared_url = self._git_aggregator_prepare_url_bitbucket(prepared_url)
|
||||
|
||||
return prepared_url
|
||||
|
||||
def _git_aggregator_prepare_url_github(self, url):
|
||||
"""Prepare url for git aggregator
|
||||
for private Github repo using https protocol.
|
||||
|
||||
Args:
|
||||
url (Char): URL to prepare
|
||||
|
||||
Returns:
|
||||
Char: Prepared url for git aggregator
|
||||
"""
|
||||
self.ensure_one()
|
||||
|
||||
# This is how final url will look like
|
||||
# https://$GITHUB_TOKEN:x-oauth-basic@github.com/soem_org/some_private_repo.git
|
||||
url_without_protocol = url.replace("https://", "")
|
||||
url = f"https://$GITHUB_TOKEN:x-oauth-basic@{url_without_protocol}"
|
||||
return url
|
||||
|
||||
def _git_aggregator_prepare_url_gitlab(self, url):
|
||||
"""Prepare url for git aggregator
|
||||
for private GitLab repo using https protocol.
|
||||
|
||||
Args:
|
||||
url (Char): URL to prepare
|
||||
|
||||
Returns:
|
||||
Char: Prepared url for git aggregator
|
||||
"""
|
||||
self.ensure_one()
|
||||
|
||||
# This is how final url will look like
|
||||
# https://<token-name>:<token-value>@<gitlaburl-repository>.git
|
||||
url_without_protocol = url.replace("https://", "")
|
||||
url = f"https://$GITLAB_TOKEN_NAME:$GITLAB_TOKEN@{url_without_protocol}"
|
||||
return url
|
||||
|
||||
def _git_aggregator_prepare_url_bitbucket(self, url):
|
||||
"""Prepare url for git aggregator
|
||||
for private Github repo using https protocol.
|
||||
|
||||
Args:
|
||||
url (Char): URL to prepare
|
||||
|
||||
Returns:
|
||||
Char: Prepared url for git aggregator
|
||||
"""
|
||||
self.ensure_one()
|
||||
|
||||
# This is how final url will look like
|
||||
# https://x-token-auth:{access_token}@bitbucket.org/user/repo.git
|
||||
# From https://support.atlassian.com/bitbucket-cloud/docs/use-oauth-on-bitbucket-cloud/
|
||||
url_without_protocol = url.replace("https://", "")
|
||||
url = f"https://x-token-auth:$BITBUCKET_TOKEN@{url_without_protocol}"
|
||||
return url
|
||||
|
||||
def _git_aggregator_prepare_head(self):
|
||||
"""Prepare head for git aggregator
|
||||
|
||||
Returns:
|
||||
Char: Prepared head for git aggregator
|
||||
"""
|
||||
self.ensure_one()
|
||||
if self.repo_provider == "github":
|
||||
return self._git_aggregator_prepare_head_github()
|
||||
if self.repo_provider == "gitlab":
|
||||
return self._git_aggregator_prepare_head_gitlab()
|
||||
if self.repo_provider == "bitbucket":
|
||||
return self._git_aggregator_prepare_head_bitbucket()
|
||||
return self.head
|
||||
|
||||
def _git_aggregator_prepare_head_github(self):
|
||||
"""Prepare head for git aggregator for Github.
|
||||
|
||||
Returns:
|
||||
Char: Prepared head for git aggregator
|
||||
"""
|
||||
|
||||
# Extract branch name, PR/MR or commit number from head
|
||||
head_number = self.head.split("/")[-1]
|
||||
if not head_number:
|
||||
raise ValidationError(
|
||||
_("Git Aggregator: " "Head number is empty in %(head)s", head=self.head)
|
||||
)
|
||||
|
||||
# PR/MR
|
||||
if self.head_type == "pr":
|
||||
return f"refs/pull/{head_number}/head"
|
||||
|
||||
# Commit
|
||||
if self.head_type in ["commit", "branch"]:
|
||||
return f"{head_number}"
|
||||
|
||||
# Fallback to original head
|
||||
return self.head
|
||||
|
||||
def _git_aggregator_prepare_head_gitlab(self):
|
||||
"""Prepare head for git aggregator for GitLab.
|
||||
|
||||
Returns:
|
||||
Char: Prepared head for git aggregator
|
||||
"""
|
||||
# Extract branch name, PR/MR or commit number from head
|
||||
head_number = self.head.split("/")[-1]
|
||||
if not head_number:
|
||||
raise ValidationError(
|
||||
_("Git Aggregator: " "Head number is empty in %(head)s", head=self.head)
|
||||
)
|
||||
|
||||
# PR/MR
|
||||
if self.head_type == "pr":
|
||||
return f"merge-requests/{head_number}/head"
|
||||
|
||||
# Commit
|
||||
# https://gitlab.com/cetmix/test/-/tree/17.0-test-branch?ref_type=heads
|
||||
if self.head_type in ["commit", "branch"]:
|
||||
head_parts = head_number.split("?")
|
||||
return f"{head_parts[0]}"
|
||||
|
||||
# Fallback to original head
|
||||
return self.head
|
||||
|
||||
def _git_aggregator_prepare_head_bitbucket(self):
|
||||
"""Prepare head for git aggregator for Bitbucket.
|
||||
|
||||
Returns:
|
||||
Char: Prepared head for git aggregator
|
||||
"""
|
||||
# Extract branch name, PR/MR or commit number from head
|
||||
head_number = self.head.split("/")[-1]
|
||||
if not head_number:
|
||||
raise ValidationError(
|
||||
_("Git Aggregator: " "Head number is empty in %(head)s", head=self.head)
|
||||
)
|
||||
# PR/MR
|
||||
if self.head_type == "pr":
|
||||
raise ValidationError(
|
||||
_(
|
||||
"Git Aggregator: "
|
||||
"Bitbucket does not support"
|
||||
" fetching PRs. Please use branch instead.\n\n"
|
||||
"Source: %(src)s\n"
|
||||
"URL: %(url)s\n"
|
||||
"Head: %(head)s",
|
||||
src=self.source_id.name,
|
||||
url=self.repo_id.url,
|
||||
head=self.head,
|
||||
)
|
||||
)
|
||||
|
||||
# Commit
|
||||
if self.head_type in ["commit", "branch"]:
|
||||
return f"{head_number}"
|
||||
|
||||
# Fallback to original head
|
||||
return self.head
|
||||
Reference in New Issue
Block a user