Skip to content

Add more logging for confluence perm-sync + handle case where permiss… #4586

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
93 changes: 77 additions & 16 deletions backend/ee/onyx/external_permissions/confluence/doc_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
from onyx.connectors.confluence.onyx_confluence import OnyxConfluence
from onyx.connectors.credentials_provider import OnyxDBCredentialsProvider
from onyx.connectors.models import SlimDocument
from onyx.db.document import (
get_document_ids_for_connector_credential_pair,
)
from onyx.db.engine import get_session_with_current_tenant
from onyx.db.models import ConnectorCredentialPair
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.utils.logger import setup_logger
Expand Down Expand Up @@ -160,9 +164,17 @@ def _get_space_permissions(

# Stores the permissions for each space
space_permissions_by_space_key[space_key] = space_permissions
logger.info(
f"Found space permissions for space '{space_key}': {space_permissions}"
)
if (
not space_permissions.is_public
and not space_permissions.external_user_emails
and not space_permissions.external_user_group_ids
):
logger.warning(
f"No permissions found for space '{space_key}'. This is very unlikely"
"to be correct and is more likely caused by an access token with"
"insufficient permissions. Make sure that the access token has Admin"
f"permissions for space '{space_key}'"
)

return space_permissions_by_space_key

Expand Down Expand Up @@ -275,6 +287,10 @@ def _get_all_page_restrictions(
# if inheriting restrictions from the parent, then the first one we run into
# should be applied (the reason why we'd traverse more than one ancestor is if
# the ancestor also is in "inherit" mode.)
logger.info(
f"Found user restrictions {ancestor_user_emails} and group restrictions {ancestor_group_names}"
f"for document {perm_sync_data.get('id')} based on ancestor {ancestor}"
)
return ExternalAccess(
external_user_emails=ancestor_user_emails,
external_user_group_ids=ancestor_group_names,
Expand Down Expand Up @@ -312,6 +328,7 @@ def _fetch_all_page_restrictions(
confluence_client=confluence_client,
perm_sync_data=slim_doc.perm_sync_data,
):
logger.info(f"Found restrictions {restrictions} for document {slim_doc.id}")
yield DocExternalAccess(
doc_id=slim_doc.id,
external_access=restrictions,
Expand All @@ -321,8 +338,9 @@ def _fetch_all_page_restrictions(

space_key = slim_doc.perm_sync_data.get("space_key")
if not (space_permissions := space_permissions_by_space_key.get(space_key)):
logger.debug(
f"Individually fetching space permissions for space {space_key}"
logger.warning(
f"Individually fetching space permissions for space {space_key}. This is "
"unexpected. It means the permissions were not able to fetched initially."
)
try:
# If the space permissions are not in the cache, then fetch them
Expand All @@ -345,6 +363,15 @@ def _fetch_all_page_restrictions(
logger.warning(
f"No permissions found for document {slim_doc.id} in space {space_key}"
)
# be safe, if we can't get the permissions then make the document inaccessible
yield DocExternalAccess(
doc_id=slim_doc.id,
external_access=ExternalAccess(
external_user_emails=set(),
external_user_group_ids=set(),
is_public=False,
),
)
continue

# If there are no restrictions, then use the space's restrictions
Expand All @@ -359,24 +386,23 @@ def _fetch_all_page_restrictions(
):
logger.warning(
f"Permissions are empty for document: {slim_doc.id}\n"
"This means space permissions are may be wrong for"
"This means space permissions may be wrong for"
f" Space key: {space_key}"
)

logger.debug("Finished fetching all page restrictions for space")
logger.info("Finished fetching all page restrictions")


def confluence_doc_sync(
cc_pair: ConnectorCredentialPair,
callback: IndexingHeartbeatInterface | None,
) -> Generator[DocExternalAccess, None, None]:
"""
Adds the external permissions to the documents in postgres
if the document doesn't already exists in postgres, we create
it in postgres so that when it gets created later, the permissions are
already populated
Fetches document permissions from Confluence and yields DocExternalAccess objects.
Compares fetched documents against existing documents in the DB for the connector.
If a document exists in the DB but not in the Confluence fetch, it's marked as restricted.
"""
logger.debug("Starting confluence doc sync")
logger.info(f"Starting confluence doc sync for CC Pair ID: {cc_pair.id}")
confluence_connector = ConfluenceConnector(
**cc_pair.connector.connector_specific_config
)
Expand All @@ -392,13 +418,16 @@ def confluence_doc_sync(
confluence_client=confluence_connector.confluence_client,
is_cloud=is_cloud,
)
logger.info("Space permissions by space key:")
for space_key, space_permissions in space_permissions_by_space_key.items():
logger.info(f"Space key: {space_key}, Permissions: {space_permissions}")

slim_docs = []
logger.debug("Fetching all slim documents from confluence")
slim_docs: list[SlimDocument] = []
logger.info("Fetching all slim documents from confluence")
for doc_batch in confluence_connector.retrieve_all_slim_documents(
callback=callback
):
logger.debug(f"Got {len(doc_batch)} slim documents from confluence")
logger.info(f"Got {len(doc_batch)} slim documents from confluence")
if callback:
if callback.should_stop():
raise RuntimeError("confluence_doc_sync: Stop signal detected")
Expand All @@ -407,11 +436,43 @@ def confluence_doc_sync(

slim_docs.extend(doc_batch)

logger.debug("Fetching all page restrictions for space")
# Find documents that are no longer accessible in Confluence
logger.info(f"Querying existing document IDs for CC Pair ID: {cc_pair.id}")
with get_session_with_current_tenant() as db_session:
existing_doc_ids = get_document_ids_for_connector_credential_pair(
db_session=db_session,
connector_id=cc_pair.connector.id,
credential_id=cc_pair.credential.id,
)

# Find missing doc IDs
fetched_doc_ids = {doc.id for doc in slim_docs}
missing_doc_ids = set(existing_doc_ids) - fetched_doc_ids

# Yield access removal for missing docs. Better to be safe.
if missing_doc_ids:
logger.warning(
f"Found {len(missing_doc_ids)} documents that are in the DB but "
"not present in Confluence fetch. Making them inaccessible."
)
for missing_id in missing_doc_ids:
logger.warning(f"Removing access for document ID: {missing_id}")
yield DocExternalAccess(
doc_id=missing_id,
external_access=ExternalAccess(
external_user_emails=set(),
external_user_group_ids=set(),
is_public=False,
),
)

logger.info("Fetching all page restrictions for fetched documents")
yield from _fetch_all_page_restrictions(
confluence_client=confluence_connector.confluence_client,
slim_docs=slim_docs,
space_permissions_by_space_key=space_permissions_by_space_key,
is_cloud=is_cloud,
callback=callback,
)

logger.info("Finished confluence doc sync")
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def _build_group_member_email_map(
) -> dict[str, set[str]]:
group_member_emails: dict[str, set[str]] = {}
for user in confluence_client.paginated_cql_user_retrieval():
logger.debug(f"Processing groups for user: {user}")
logger.info(f"Processing groups for user: {user}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider keeping this as debug level since it could be very noisy in production with many users


email = user.email
if not email:
Expand Down Expand Up @@ -52,7 +52,7 @@ def _build_group_member_email_map(
emit_background_error(msg, cc_pair_id=cc_pair_id)
logger.error(msg)
else:
logger.debug(f"Found groups {all_users_groups} for user with email {email}")
logger.info(f"Found groups {all_users_groups} for user with email {email}")

if not group_member_emails:
msg = "No groups found for any users."
Expand Down
5 changes: 5 additions & 0 deletions backend/ee/onyx/external_permissions/sync_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ def source_requires_external_group_sync(source: DocumentSource) -> bool:
}


def source_group_sync_is_cc_pair_agnostic(source: DocumentSource) -> bool:
    """Checks if external group syncing for the given DocumentSource is
    cc-pair agnostic, i.e. whether `source` is listed in
    GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC.

    This is distinct from `source_requires_external_group_sync`: it does not
    say whether a group sync is needed, only whether the source is registered
    as one whose group sync is not tied to a single connector credential pair.
    """
    return source in GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC


# If nothing is specified here, we run the doc_sync every time the celery beat runs
DOC_PERMISSION_SYNC_PERIODS: dict[DocumentSource, int] = {
# Polling is not supported so we fetch all doc permissions every 5 minutes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from sqlalchemy.orm import Session

from ee.onyx.external_permissions.sync_params import (
source_group_sync_is_cc_pair_agnostic,
)
from onyx.db.connector import mark_cc_pair_as_external_group_synced
from onyx.db.connector_credential_pair import get_connector_credential_pairs_for_source
from onyx.db.models import ConnectorCredentialPair


def _get_all_cc_pair_ids_to_mark_as_group_synced(
    db_session: Session, cc_pair: ConnectorCredentialPair
) -> list[int]:
    """Collects the ids of the cc pairs that should be marked as group-synced.

    If the source of `cc_pair` is cc-pair agnostic for group sync, the ids of
    every cc pair returned by `get_connector_credential_pairs_for_source` for
    that source are returned. Otherwise only the id of `cc_pair` itself is
    returned.
    """
    source = cc_pair.connector.source

    if source_group_sync_is_cc_pair_agnostic(source):
        # One sync covers the whole source, so gather every cc pair of it.
        pairs_for_source = get_connector_credential_pairs_for_source(
            db_session, source
        )
        return [pair.id for pair in pairs_for_source]

    # Group sync is specific to this cc pair only.
    return [cc_pair.id]


def mark_all_relevant_cc_pairs_as_external_group_synced(
    db_session: Session, cc_pair: ConnectorCredentialPair
) -> None:
    """Marks every cc pair covered by this group sync run as group-synced.

    For some source types, one successful group sync run should count for all
    cc pairs of that type; for the rest, only `cc_pair` itself is marked. The
    set of ids to mark is decided by
    `_get_all_cc_pair_ids_to_mark_as_group_synced`.
    """
    ids_to_mark = _get_all_cc_pair_ids_to_mark_as_group_synced(db_session, cc_pair)
    for synced_cc_pair_id in ids_to_mark:
        mark_cc_pair_as_external_group_synced(db_session, synced_cc_pair_id)
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
from onyx.background.celery.apps.app_base import task_logger
from onyx.background.celery.celery_redis import celery_find_task
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
from onyx.background.celery.tasks.external_group_syncing.group_sync_utils import (
mark_all_relevant_cc_pairs_as_external_group_synced,
)
from onyx.background.error_logging import emit_background_error
from onyx.configs.app_configs import JOB_TIMEOUT
from onyx.configs.constants import CELERY_EXTERNAL_GROUP_SYNC_LOCK_TIMEOUT
Expand All @@ -38,8 +41,6 @@
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import OnyxRedisSignals
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.factory import validate_ccpair_for_user
from onyx.db.connector import mark_cc_pair_as_external_group_synced
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.engine import get_session_with_current_tenant
from onyx.db.enums import AccessType
Expand Down Expand Up @@ -386,24 +387,6 @@ def connector_external_group_sync_generator_task(
f"No connector credential pair found for id: {cc_pair_id}"
)

try:
created = validate_ccpair_for_user(
cc_pair.connector.id,
cc_pair.credential.id,
db_session,
enforce_creation=False,
)
if not created:
task_logger.warning(
f"Unable to create connector credential pair for id: {cc_pair_id}"
)
except Exception:
task_logger.exception(
f"validate_ccpair_permissions_sync exceptioned: cc_pair={cc_pair_id}"
)
# TODO: add some notification to the admins here
raise

source_type = cc_pair.connector.source

ext_group_sync_func = GROUP_PERMISSIONS_FUNC_MAP.get(source_type)
Expand Down Expand Up @@ -440,7 +423,7 @@ def connector_external_group_sync_generator_task(
f"Synced {len(external_user_groups)} external user groups for {source_type}"
)

mark_cc_pair_as_external_group_synced(db_session, cc_pair.id)
mark_all_relevant_cc_pairs_as_external_group_synced(db_session, cc_pair)

update_sync_record_status(
db_session=db_session,
Expand Down
2 changes: 1 addition & 1 deletion backend/onyx/connectors/confluence/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
from datetime import timedelta
from datetime import timezone
from typing import Any
from urllib.error import HTTPError
from urllib.parse import quote

from requests.exceptions import HTTPError
from typing_extensions import override

from onyx.configs.app_configs import CONFLUENCE_CONNECTOR_LABELS_TO_SKIP
Expand Down
Loading
Loading