Skip to content

Remove Setters and Getters from Cluster's components #2802

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion .github/workflows/ci_plugins.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:

- name: Get changed files for plugins
id: changed-files-specific
uses: tj-actions/changed-files@v44
uses: tj-actions/changed-files@v46.0.5

- name: Filter changed plugins and set output
id: set-matrix
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci_templates.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
- name: Get changed templates
if: env.mode == 'changed'
id: changed-files-specific
uses: tj-actions/changed-files@v44
uses: tj-actions/changed-files@v46.0.5

- name: Filter changed templates and set output
if: env.mode == 'changed'
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release_templates.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:

- name: Get changed files for templates
id: changed-files-specific
uses: tj-actions/changed-files@v44
uses: tj-actions/changed-files@v46.0.5

- name: Filter changed plugins and set output
id: set-matrix
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add redis cache inside `db.metadata` for quick multi-process loading
- Add redis plugin
- Add pydantic schema support
- Modify `DataBackend` to support multiple threads
- Remove setters and getters from cluster components
- Add parameterizable `Builder` for `Datalayer`

#### Bug Fixes

Expand Down
13 changes: 0 additions & 13 deletions plugins/redis/superduper_redis/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,3 @@ def drop(self, force: bool = False):
:param uuid: Component uuid.
"""
self.redis.flushdb()

@property
def db(self):
"""Get the ``db``."""
return self._db

@db.setter
def db(self, value):
"""Set the ``db``.

:param value: The value to set the ``db`` to.
"""
self._db = value
13 changes: 0 additions & 13 deletions plugins/snowflake/superduper_snowflake/data_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,6 @@ def drop_table(self, table: str):
"""
self._run_query(f'DROP TABLE IF EXISTS {self.schema}."{table}"')

@property
def db(self):
"""Return the datalayer."""
return self._db

@db.setter
def db(self, value):
"""Set the datalayer.

:param value: The datalayer.
"""
self._db = value

def create_table_and_schema(self, identifier: str, schema: Schema, primary_id: str):
"""Create a schema in the data-backend.

Expand Down
20 changes: 8 additions & 12 deletions plugins/sql/superduper_sql/data_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,29 +50,25 @@ def _create_connection(self):
@contextmanager
def get_connection(self):
"""Get a connection for the current thread, creating it if it doesn't exist."""
thread_name = threading.current_thread().name

if not hasattr(self.local, "connection"):
with self.lock: # Lock only during connection creation
self.local.connection, self.local.name, self.local.in_memory = (
self._create_connection()
)
logging.info(
f"Created new connection for thread"
f"'{threading.current_thread().name}'"
)

logging.info(f"Created new connection for thread '{thread_name}'")

try:
logging.debug(
f"Reusing connection for thread '{threading.current_thread().name}'"
)
logging.debug(f"Reusing connection for thread '{thread_name}'")
yield self.local.connection
except Exception as e:
# If there's a connection error,
# clear the thread's connection so a new one will be created next time
# If there's a connection error, clear the thread's connection
# so a new one will be created next time
if hasattr(self.local, "connection"):
delattr(self.local, "connection")
logging.error(
f"Connection error in thread '{threading.current_thread().name}: {e}'"
)
logging.error(f"Connection error in thread '{thread_name}': {e}")
raise e


Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ dependencies = [
"httpx",
"toml",
"papermill>=2.6.0",
"Deprecated",
"jupyter"
]

Expand Down
8 changes: 4 additions & 4 deletions superduper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from .base import config, config_settings, configs, logger
from .base.superduper import superduper

# Ensure CFG is imported or defined
CFG = configs.CFG
ROOT = config_settings.ROOT

Expand Down Expand Up @@ -40,10 +41,10 @@
from .components.vector_index import VectorIndex

REQUIRES = [
'superduper=={}'.format(__version__),
f'superduper=={__version__}',
]

__all__ = (
__all__ = [
'CFG',
'ROOT',
'config',
Expand All @@ -68,10 +69,9 @@
'Table',
'Application',
'Template',
'Application',
'Component',
'trigger',
'pickle_serializer',
'dill_serializer',
'Streamlit',
)
]
22 changes: 4 additions & 18 deletions superduper/backends/base/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ def initialize_with_components(self):

This method is executed when a cluster is initialized.
"""
components = self.db.load_all('Table')
components = self._db.load_all('Table')
for component in components:
if isreallyinstance(component, self.cls):
for identifier in self.db.show(component):
component = self.db.load(component, identifier=identifier)
for identifier in self._db.show(component):
component = self._db.load(component, identifier=identifier)
self.put_component(component)

def build_tool(self, component: 'Component'):
Expand All @@ -54,10 +54,9 @@ def put_component(self, component: 'Component', **kwargs):
:param kwargs: kwargs dictionary.
"""
logging.info(
f'Putting component: {component.huuid} on to {self.__class__.__name__}'
f"Putting component '{component.huuid}' to '{self.__class__.__name__}'"
)
tool = self.build_tool(component)
tool.db = self.db
self.component_uuid_mapping[(component.component, component.identifier)].add(
component.uuid
)
Expand Down Expand Up @@ -155,16 +154,3 @@ def drop_component(self, component: str, identifier: str):
:param component: Component name.
:param identifier: Component identifier.
"""

@property
def db(self) -> 'Datalayer':
"""Get the ``db``."""
return self._db

@db.setter
def db(self, value: 'Datalayer'):
"""Set the ``db``.

:param value: ``Datalayer`` instance.
"""
self._db = value
13 changes: 0 additions & 13 deletions superduper/backends/base/cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,3 @@ def handle_event(self, event_type, table, ids):
:param ids: The ids to handle.
"""
pass

@property
def db(self) -> 'Datalayer':
"""Get the ``db``."""
return self._db

@db.setter
def db(self, value: 'Datalayer'):
"""Set the ``db``.

:param value: ``Datalayer`` instance.
"""
self._db = value
42 changes: 7 additions & 35 deletions superduper/backends/base/cluster.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import dataclasses as dc
import time
import typing as t
from abc import ABC, abstractmethod

import click
import deprecated

from superduper.backends.base.cache import Cache
from superduper import logging
from superduper.backends.base.cdc import CDCBackend
from superduper.backends.base.compute import ComputeBackend
from superduper.backends.base.crontab import CrontabBackend
Expand All @@ -17,26 +17,21 @@
class Cluster(ABC):
"""Cluster object for managing the backend.

:param cache: The cache backend.
:param scheduler: The scheduler backend.
:param vector_search: The vector search backend.
:param compute: The compute backend.
:param cdc: The change data capture backend.
:param crontab: The crontab backend.
"""

cache: Cache
scheduler: BaseScheduler
vector_search: VectorSearchBackend
compute: ComputeBackend
cdc: CDCBackend
crontab: CrontabBackend

def __post_init__(self):
self._db = None

def drop(self, force: bool = False):
"""Drop all of the backends.
"""Drop all backends.

:param force: Skip confirmation.
"""
Expand Down Expand Up @@ -65,43 +60,20 @@ def build(cls, CFG, **kwargs):
"""
pass

@property
def db(self):
"""Get the ``db``."""
return self._db

@db.setter
def db(self, value):
"""Set the ``db``.

:param value: ``Datalayer`` instance.
"""
self._db = value
self.scheduler.db = value
self.vector_search.db = value
self.crontab.db = value
if self.compute is not None:
self.compute.db = value
self.cdc.db = value

@deprecated.deprecated(reason="Do we need this function ?")
def load_custom_plugins(self):
"""Load user plugins."""
from superduper import logging

if 'Plugin' in self.db.show('Table'):
if 'Plugin' in self._db.show('Table'):
logging.info("Found custom plugins - loading...")
for plugin in self.db.show('Plugin'):
for plugin in self._db.show('Plugin'):
logging.info(f"Loading plugin: {plugin}")
plugin = self.db.load('Plugin', plugin)
plugin = self._db.load('Plugin', plugin)

def initialize(self):
"""Initialize the cluster."""
from superduper import logging

start = time.time()
assert self.db

self.load_custom_plugins()

self.scheduler.initialize()
self.compute.initialize()
Expand Down
14 changes: 0 additions & 14 deletions superduper/backends/base/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from superduper.misc.importing import isreallysubclass

if t.TYPE_CHECKING:
from superduper.base.datalayer import Datalayer
from superduper.base.event import Job
from superduper.components.component import Component

Expand Down Expand Up @@ -71,19 +70,6 @@ def put_component(self, component: 'Component'):
:param component: Component to put.
"""

@property
def db(self) -> 'Datalayer':
"""Get the ``db``."""
return self._db

@db.setter
def db(self, value: 'Datalayer'):
"""Set the ``db``.

:param value: ``Datalayer`` instance.
"""
self._db = value

@abstractmethod
def drop_component(self, component: str, identifier: str):
"""Drop the component from compute.
Expand Down
26 changes: 0 additions & 26 deletions superduper/backends/base/data_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,6 @@ def random_id(self):
"""Generate random-id."""
pass

@property
def db(self):
"""Return the datalayer."""
return self._db

@db.setter
def db(self, value):
"""Set the datalayer.

:param value: The datalayer.
"""
self._db = value

@abstractmethod
def create_table_and_schema(
self, identifier: str, schema: 'Schema', primary_id: str
Expand Down Expand Up @@ -381,19 +368,6 @@ class DataBackendProxy:
def __init__(self, backend):
self._backend = backend

@property
def db(self):
"""Return the datalayer."""
return self._backend._datalayer

@db.setter
def db(self, value):
"""Set the datalayer.

:param value: The datalayer.
"""
self._backend._db = value

@property
def type(self):
"""Instance of databackend."""
Expand Down
Loading