Skip to content

Farmeronlywjb #19712

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions chia/_tests/core/farmer/test_farmer_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,13 @@ async def test_farmer_ignores_concurrent_duplicate_signage_points(
sp = farmer_protocol.NewSignagePoint(
std_hash(b"1"), std_hash(b"2"), std_hash(b"3"), uint64(1), uint64(1000000), uint8(2), uint32(1), uint32(0)
)

node_connection, _ = await add_dummy_connection_wsc(farmer_server, self_hostname, 8444, NodeType.FULL_NODE, [])

await gather(
farmer_api.new_signage_point(sp),
farmer_api.new_signage_point(sp),
farmer_api.new_signage_point(sp),
farmer_api.new_signage_point(sp, node_connection),
farmer_api.new_signage_point(sp, node_connection),
farmer_api.new_signage_point(sp, node_connection),
)
# Wait a bit for the queue to fill
await sleep(1)
Expand Down
18 changes: 16 additions & 2 deletions chia/_tests/core/test_farmer_harvester_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from chia_rs.sized_ints import uint8, uint32, uint64

from chia._tests.conftest import HarvesterFarmerEnvironment
from chia._tests.connection_utils import add_dummy_connection_wsc
from chia._tests.plot_sync.test_delta import dummy_plot
from chia._tests.util.misc import assert_rpc_error
from chia._tests.util.rpc import validate_get_routes
Expand All @@ -34,6 +35,7 @@
from chia.plotting.util import add_plot_directory
from chia.protocols import farmer_protocol
from chia.protocols.harvester_protocol import Plot
from chia.protocols.outbound_message import NodeType
from chia.simulator.block_tools import get_plot_dir
from chia.util.bech32m import decode_puzzle_hash, encode_puzzle_hash
from chia.util.config import load_config, lock_and_load_config, save_config
Expand Down Expand Up @@ -133,7 +135,13 @@ async def have_signage_points() -> bool:
sp = farmer_protocol.NewSignagePoint(
std_hash(b"1"), std_hash(b"2"), std_hash(b"3"), uint64(1), uint64(1000000), uint8(2), uint32(1), uint32(0)
)
await farmer_api.new_signage_point(sp)

farmer_server = farmer_service._server
node_connection, _ = await add_dummy_connection_wsc(
farmer_server, farmer_service.self_hostname, 8444, NodeType.FULL_NODE, []
)

await farmer_api.new_signage_point(sp, node_connection)

await time_out_assert(5, have_signage_points, True)
assert (await farmer_rpc_client.get_signage_point(std_hash(b"2"))) is not None
Expand Down Expand Up @@ -243,7 +251,13 @@ async def test_farmer_get_pool_state(
sp = farmer_protocol.NewSignagePoint(
std_hash(b"1"), std_hash(b"2"), std_hash(b"3"), uint64(1), uint64(1000000), uint8(2), uint32(1), uint32(0)
)
await farmer_api.new_signage_point(sp)

farmer_server = farmer_service._server
node_connection, _ = await add_dummy_connection_wsc(
farmer_server, farmer_service.self_hostname, 8444, NodeType.FULL_NODE, []
)

await farmer_api.new_signage_point(sp, node_connection)
client_pool_state = await farmer_rpc_client.get_pool_state()
for pool_dict in client_pool_state["pool_state"]:
for key in ["points_found_24h", "points_acknowledged_24h"]:
Expand Down
9 changes: 8 additions & 1 deletion chia/_tests/farmer_harvester/test_farmer_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from chia_rs.sized_ints import uint8, uint32, uint64

from chia._tests.conftest import HarvesterFarmerEnvironment
from chia._tests.connection_utils import add_dummy_connection_wsc
from chia._tests.util.split_managers import split_async_manager
from chia._tests.util.time_out_assert import time_out_assert
from chia.cmds.cmds_util import get_any_service_client
Expand Down Expand Up @@ -285,7 +286,13 @@ def state_changed(change: str, data: dict[str, Any]) -> None:

farmer.state_changed_callback = state_changed # type: ignore
_, sp_for_farmer_api = create_sp(index=2, challenge_hash=std_hash(b"4"))
await farmer_api.new_signage_point(sp_for_farmer_api)

farmer_server = farmer_service._server
node_connection, _ = await add_dummy_connection_wsc(
farmer_server, farmer_service.self_hostname, 8444, NodeType.FULL_NODE, []
)

await farmer_api.new_signage_point(sp_for_farmer_api, node_connection)
assert number_of_missing_sps == uint32(1)


Expand Down
10 changes: 9 additions & 1 deletion chia/_tests/farmer_harvester/test_filter_prefix_bits.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
from chia_rs.sized_ints import uint8, uint32, uint64

from chia._tests.conftest import ConsensusMode
from chia._tests.connection_utils import add_dummy_connection_wsc
from chia._tests.core.test_farmer_harvester_rpc import wait_for_plot_sync
from chia._tests.util.setup_nodes import setup_farmer_multi_harvester
from chia._tests.util.time_out_assert import time_out_assert
from chia.farmer.farmer_api import FarmerAPI
from chia.farmer.farmer_rpc_client import FarmerRpcClient
from chia.harvester.harvester_rpc_client import HarvesterRpcClient
from chia.protocols import farmer_protocol
from chia.protocols.outbound_message import NodeType
from chia.server.aliases import HarvesterService
from chia.simulator.block_tools import create_block_tools_async, test_constants
from chia.types.blockchain_format.proof_of_space import get_plot_id, passes_plot_filter
Expand Down Expand Up @@ -123,7 +125,13 @@ def state_has_changed() -> bool:
peak_height=peak_height,
last_tx_height=uint32(0),
)
await farmer_api.new_signage_point(sp)

harvester_server = harvester_service._server # just to chew on
node_connection, _ = await add_dummy_connection_wsc(
harvester_server, harvester_service.self_hostname, 8444, NodeType.FULL_NODE, []
)

await farmer_api.new_signage_point(sp, node_connection)
await time_out_assert(5, state_has_changed, True)
# We're intercepting the harvester's state changes as we're expecting
# a farming_info one.
Expand Down
103 changes: 102 additions & 1 deletion chia/farmer/farmer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
)
from chia.protocols.protocol_message_types import ProtocolMessageTypes
from chia.rpc.rpc_server import StateChangedProtocol, default_get_connections
from chia.server.node_discovery import FarmerPeers
from chia.server.server import ChiaServer, ssl_context_for_root
from chia.server.ws_connection import WSChiaConnection
from chia.ssl.create_ssl import get_mozilla_ca_crt
Expand Down Expand Up @@ -129,6 +130,10 @@ def __init__(
consensus_constants: ConsensusConstants,
local_keychain: Optional[Keychain] = None,
):
self.farmer_peers: Optional[FarmerPeers] = None

self.peer_with_sps: set[bytes32] = set()

self.keychain_proxy: Optional[KeychainProxy] = None
self.local_keychain = local_keychain
self._root_path = root_path
Expand All @@ -154,6 +159,8 @@ def __init__(

self.cache_clear_task: Optional[asyncio.Task[None]] = None
self.update_pool_state_task: Optional[asyncio.Task[None]] = None
self.sp_task: Optional[asyncio.Task[None]] = None

self.constants = consensus_constants
self._shut_down = False
self.server: Any = None
Expand All @@ -179,6 +186,88 @@ def __init__(
# Use to find missing signage points. (new_signage_point, time)
self.prev_signage_point: Optional[tuple[uint64, farmer_protocol.NewSignagePoint]] = None

async def _sp_task_handler(self) -> None:
log.debug("WJB _sp_task_handler")

while True:
await asyncio.sleep(120)

if self.farmer_peers is None:
continue

allgood = False
found = False
count = 0
ngcloseit = []
goodcloseit = []

for peer in self.server.get_connections(NodeType.FULL_NODE):
if peer in self.farmer_peers.farm_list:
goodcloseit.append(peer)
else:
if peer.peer_node_id in self.peer_with_sps:
allgood = True
found = True
count += 1
continue
if not found and peer.peer_node_id in self.peer_with_sps:
found = True
count += 1
continue
ngcloseit.append(peer)

untrusted = len(self.farmer_peers.farm_list)
log.debug(f"WJB _sp_task_handler allgood {allgood} found {found} count {count} untrusted {untrusted}")

self.peer_with_sps = set()

if found:
count = 0
else:
count += 1

if allgood:
count = 0
for peer in goodcloseit:
await peer.close()
else:
for peer in ngcloseit:
await peer.close()

removepeer = []
for peer in self.farmer_peers.farm_list:
if peer.closed:
removepeer.append(peer)
for peer in removepeer:
self.farmer_peers.farm_list.remove(peer)

self.farmer_peers.target_outbound_count = count

def initialize_farmer_peers(self) -> None:
log.debug("WJB initialize_farmer_peers")
network_name = self.config["selected_network"]
try:
default_port = self.config["network_overrides"]["config"][network_name]["default_full_node_port"]
except KeyError:
self.log.info("Default port field not found in config.")
default_port = None
connect_to_unknown_peers = self.config.get("connect_to_unknown_peers", True)
testing = self.config.get("testing", False)
if self.farmer_peers is None and connect_to_unknown_peers and not testing:
self.farmer_peers = FarmerPeers(
server=self.server,
target_outbound_count=0,
peers_file_path=self._root_path
/ Path(self.config.get("farmer_peers_file_path", "farmer/db/farmer_peers.dat")),
introducer_info=self.config["introducer_peer"],
dns_servers=self.config.get("dns_servers", ["dns-introducer.chia.net"]),
peer_connect_interval=self.config["peer_connect_interval"],
selected_network=network_name,
default_port=default_port,
log=self.log,
)
create_referenced_task(self.farmer_peers.start())

@contextlib.asynccontextmanager
async def manage(self) -> AsyncIterator[None]:
async def start_task() -> None:
Expand All @@ -188,7 +277,9 @@ async def start_task() -> None:
if await self.setup_keys():
self.update_pool_state_task = create_referenced_task(self._periodically_update_pool_state_task())
self.cache_clear_task = create_referenced_task(self._periodically_clear_cache_and_refresh_task())
self.sp_task = create_referenced_task(self._sp_task_handler())
log.debug("start_task: initialized")

self.started = True
return
await asyncio.sleep(1)
Expand All @@ -209,6 +300,12 @@ async def start_task() -> None:
await self.cache_clear_task
if self.update_pool_state_task is not None:
await self.update_pool_state_task
if self.sp_task is not None:
await self.sp_task
if self.farmer_peers is not None:
await self.farmer_peers.ensure_is_closed()
self.farmer_peers = None

if self.keychain_proxy is not None:
proxy = self.keychain_proxy
self.keychain_proxy = None
Expand Down Expand Up @@ -308,12 +405,16 @@ async def handshake_task() -> None:
await peer.send_message(msg)
self.harvester_handshake_task = None

if peer.connection_type is NodeType.HARVESTER:
if peer.connection_type == NodeType.HARVESTER:
self.plot_sync_receivers[peer.peer_node_id] = Receiver(peer, self.plot_sync_callback)
self.harvester_handshake_task = create_referenced_task(handshake_task())

def set_server(self, server: ChiaServer) -> None:
self.server = server
try:
self.initialize_farmer_peers()
except Exception:
self.log.debug("WJB self.initialize_farmer_peers failed")

def state_changed(self, change: str, data: dict[str, Any]) -> None:
if self.state_changed_callback is not None:
Expand Down
10 changes: 8 additions & 2 deletions chia/farmer/farmer_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,14 @@ async def respond_signatures(self, response: harvester_protocol.RespondSignature
FARMER PROTOCOL (FARMER <-> FULL NODE)
"""

@metadata.request()
async def new_signage_point(self, new_signage_point: farmer_protocol.NewSignagePoint) -> None:
@metadata.request(peer_required=True)
async def new_signage_point(
self, new_signage_point: farmer_protocol.NewSignagePoint, peer: WSChiaConnection
) -> None:
if self.farmer.farmer_peers is not None:
self.farmer.peer_with_sps.add(peer.peer_node_id)
self.farmer.log.debug(f"WJB new_signage_point {self.farmer.peer_with_sps}")

if new_signage_point.challenge_chain_sp not in self.farmer.sps:
self.farmer.sps[new_signage_point.challenge_chain_sp] = []
if new_signage_point in self.farmer.sps[new_signage_point.challenge_chain_sp]:
Expand Down
24 changes: 23 additions & 1 deletion chia/server/node_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class FullNodeDiscovery:
pending_outbound_connections: set[str] = field(default_factory=set)
pending_tasks: set[asyncio.Task[None]] = field(default_factory=set)
introducer_info_obj: Optional[UnresolvedPeerInfo] = field(default=None)
farm_list: list[WSChiaConnection] = field(default_factory=list)
is_farmer: bool = field(default=False)

def __post_init__(self) -> None:
random.shuffle(self.dns_servers) # Don't always start with the same DNS server
Expand Down Expand Up @@ -210,6 +212,8 @@ async def _query_dns(self, dns_address: str) -> None:
self.log.warning(f"querying DNS introducer failed: {e}")

async def on_connect_callback(self, peer: WSChiaConnection) -> None:
if self.is_farmer:
self.farm_list.append(peer)
if self.server.on_connect is not None:
await self.server.on_connect(peer)
else:
Expand Down Expand Up @@ -319,7 +323,7 @@ async def _connect_to_peers(self, random: Random) -> None:

is_feeler = False
has_collision = False
if self._num_needed_peers() == 0:
if not self.is_farmer and self._num_needed_peers() == 0:
if time.time() * 1000 * 1000 > next_feeler:
next_feeler = self._poisson_next_send(time.time() * 1000 * 1000, 240, random)
is_feeler = True
Expand Down Expand Up @@ -675,3 +679,21 @@ async def add_peers(
self, peer_list: list[TimestampedPeerInfo], peer_src: Optional[PeerInfo], is_full_node: bool
) -> None:
await self._add_peers_common(peer_list, peer_src, is_full_node)


@dataclass
class FarmerPeers(FullNodeDiscovery):
def __post_init__(self) -> None:
super().__post_init__()

async def start(self) -> None:
self.initial_wait = 1
self.is_farmer = True
self.farm_list = []
await self.initialize_address_manager()
await self.start_tasks()

async def ensure_is_closed(self) -> None:
if self.is_closed:
return None
await self._close_common()
6 changes: 6 additions & 0 deletions chia/util/initial-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ farmer:
full_node_peers:
- host: *self_hostname
port: 8444
introducer_peer:
host: introducer.chia.net # Chia AWS introducer IPv4/IPv6
port: 8444
enable_private_networks: False
peer_connect_interval: 60
testing: false

pool_public_keys: !!set {}

Expand Down
Loading