15
15
16
16
import asyncio
17
17
import logging
18
+ import time
18
19
from asyncio .tasks import ALL_COMPLETED
19
20
from dataclasses import dataclass , replace
21
+ from datetime import timedelta
20
22
from math import isnan
21
- from typing import ( # pylint: disable=unused-import
22
- Any ,
23
- Dict ,
24
- Iterable ,
25
- List ,
26
- Optional ,
27
- Set ,
28
- Tuple ,
29
- )
23
+ from typing import Any , Dict , Iterable , List , Optional , Self , Set , Tuple
30
24
31
25
import grpc
32
26
from frequenz .channels import Bidirectional , Peekable , Receiver , Sender
@@ -62,6 +56,40 @@ class _User:
62
56
"""The bidirectional channel to communicate with the user."""
63
57
64
58
59
+ @dataclass
60
+ class _CacheEntry :
61
+ """Represents an entry in the cache with expiry time."""
62
+
63
+ inv_bat_pair : InvBatPair
64
+ """The inverter and adjacent battery data pair."""
65
+
66
+ expiry_time : int
67
+ """The expiration time (taken from the monotonic clock) of the cache entry."""
68
+
69
+ @classmethod
70
+ def from_ttl (
71
+ cls , inv_bat_pair : InvBatPair , ttl : timedelta = timedelta (hours = 2.5 )
72
+ ) -> Self :
73
+ """Initialize a CacheEntry instance from a TTL (Time-To-Live).
74
+
75
+ Args:
76
+ inv_bat_pair: the inverter and adjacent battery data pair to cache.
77
+ ttl: the time a cache entry is kept alive.
78
+
79
+ Returns:
80
+ this class instance.
81
+ """
82
+ return cls (inv_bat_pair , time .monotonic_ns () + int (ttl .total_seconds () * 1e9 ))
83
+
84
+ def has_expired (self ) -> bool :
85
+ """Check whether the cache entry has expired.
86
+
87
+ Returns:
88
+ whether the cache entry has expired.
89
+ """
90
+ return time .monotonic_ns () >= self .expiry_time
91
+
92
+
65
93
@actor
66
94
class PowerDistributingActor :
67
95
# pylint: disable=too-many-instance-attributes
@@ -211,6 +239,10 @@ def __init__(
211
239
max_data_age_sec = 10.0 ,
212
240
)
213
241
242
+ self ._cached_metrics : dict [int , _CacheEntry | None ] = {
243
+ bat_id : None for bat_id , _ in self ._bat_inv_map .items ()
244
+ }
245
+
214
246
def _create_users_tasks (self ) -> List [asyncio .Task [None ]]:
215
247
"""For each user create a task to wait for request.
216
248
@@ -224,37 +256,45 @@ def _create_users_tasks(self) -> List[asyncio.Task[None]]:
224
256
)
225
257
return tasks
226
258
227
- def _get_upper_bound (self , batteries : Set [int ]) -> float :
259
+ def _get_upper_bound (self , batteries : Set [int ], include_broken : bool ) -> float :
228
260
"""Get total upper bound of power to be set for given batteries.
229
261
230
262
Note, output of that function doesn't guarantee that this bound will be
231
263
the same when the request is processed.
232
264
233
265
Args:
234
266
batteries: List of batteries
267
+ include_broken: whether all batteries in the batteries set in the
268
+ request must be used regardless the status.
235
269
236
270
Returns:
237
271
Upper bound for `set_power` operation.
238
272
"""
239
- pairs_data : List [InvBatPair ] = self ._get_components_data (batteries )
273
+ pairs_data : List [InvBatPair ] = self ._get_components_data (
274
+ batteries , include_broken
275
+ )
240
276
return sum (
241
277
min (battery .power_upper_bound , inverter .active_power_upper_bound )
242
278
for battery , inverter in pairs_data
243
279
)
244
280
245
- def _get_lower_bound (self , batteries : Set [int ]) -> float :
281
+ def _get_lower_bound (self , batteries : Set [int ], include_broken : bool ) -> float :
246
282
"""Get total lower bound of power to be set for given batteries.
247
283
248
284
Note, output of that function doesn't guarantee that this bound will be
249
285
the same when the request is processed.
250
286
251
287
Args:
252
288
batteries: List of batteries
289
+ include_broken: whether all batteries in the batteries set in the
290
+ request must be used regardless the status.
253
291
254
292
Returns:
255
293
Lower bound for `set_power` operation.
256
294
"""
257
- pairs_data : List [InvBatPair ] = self ._get_components_data (batteries )
295
+ pairs_data : List [InvBatPair ] = self ._get_components_data (
296
+ batteries , include_broken
297
+ )
258
298
return sum (
259
299
max (battery .power_lower_bound , inverter .active_power_lower_bound )
260
300
for battery , inverter in pairs_data
@@ -282,21 +322,19 @@ async def run(self) -> None:
282
322
283
323
try :
284
324
pairs_data : List [InvBatPair ] = self ._get_components_data (
285
- request .batteries
325
+ request .batteries , request . include_broken
286
326
)
287
327
except KeyError as err :
288
328
await user .channel .send (Error (request = request , msg = str (err )))
289
329
continue
290
330
291
- if len ( pairs_data ) == 0 :
331
+ if not pairs_data and not request . include_broken :
292
332
error_msg = f"No data for the given batteries { str (request .batteries )} "
293
333
await user .channel .send (Error (request = request , msg = str (error_msg )))
294
334
continue
295
335
296
336
try :
297
- distribution = self .distribution_algorithm .distribute_power (
298
- request .power , pairs_data
299
- )
337
+ distribution = self ._get_power_distribution (request , pairs_data )
300
338
except ValueError as err :
301
339
error_msg = f"Couldn't distribute power, error: { str (err )} "
302
340
await user .channel .send (Error (request = request , msg = str (error_msg )))
@@ -379,6 +417,44 @@ async def _set_distributed_power(
379
417
380
418
return self ._parse_result (tasks , distribution .distribution , timeout_sec )
381
419
420
+ def _get_power_distribution (
421
+ self , request : Request , inv_bat_pairs : List [InvBatPair ]
422
+ ) -> DistributionResult :
423
+ """Get power distribution result for the batteries in the request.
424
+
425
+ Args:
426
+ request: the power request to process.
427
+ inv_bat_pairs: the battery and adjacent inverter data pairs.
428
+
429
+ Returns:
430
+ the power distribution result.
431
+ """
432
+ available_bat_ids = {battery .component_id for battery , _ in inv_bat_pairs }
433
+ unavailable_bat_ids = request .batteries - available_bat_ids
434
+ unavailable_inv_ids = {
435
+ self ._bat_inv_map [battery_id ] for battery_id in unavailable_bat_ids
436
+ }
437
+
438
+ if request .include_broken and not available_bat_ids :
439
+ return self .distribution_algorithm .distribute_power_equally (
440
+ request .power , unavailable_inv_ids
441
+ )
442
+
443
+ result = self .distribution_algorithm .distribute_power (
444
+ request .power , inv_bat_pairs
445
+ )
446
+
447
+ if request .include_broken and unavailable_inv_ids :
448
+ additional_result = self .distribution_algorithm .distribute_power_equally (
449
+ result .remaining_power , unavailable_inv_ids
450
+ )
451
+
452
+ for inv_id , power in additional_result .distribution .items ():
453
+ result .distribution [inv_id ] = power
454
+ result .remaining_power = 0.0
455
+
456
+ return result
457
+
382
458
def _check_request (self , request : Request ) -> Optional [Result ]:
383
459
"""Check whether the given request if correct.
384
460
@@ -388,6 +464,9 @@ def _check_request(self, request: Request) -> Optional[Result]:
388
464
Returns:
389
465
Result for the user if the request is wrong, None otherwise.
390
466
"""
467
+ if not request .batteries :
468
+ return Error (request = request , msg = "Empty battery IDs in the request" )
469
+
391
470
for battery in request .batteries :
392
471
if battery not in self ._battery_receivers :
393
472
msg = (
@@ -398,11 +477,11 @@ def _check_request(self, request: Request) -> Optional[Result]:
398
477
399
478
if not request .adjust_power :
400
479
if request .power < 0 :
401
- bound = self ._get_lower_bound (request .batteries )
480
+ bound = self ._get_lower_bound (request .batteries , request . include_broken )
402
481
if request .power < bound :
403
482
return OutOfBound (request = request , bound = bound )
404
483
else :
405
- bound = self ._get_upper_bound (request .batteries )
484
+ bound = self ._get_upper_bound (request .batteries , request . include_broken )
406
485
if request .power > bound :
407
486
return OutOfBound (request = request , bound = bound )
408
487
@@ -551,29 +630,15 @@ def _get_components_pairs(
551
630
552
631
return bat_inv_map , inv_bat_map
553
632
554
- def _get_working_batteries (self , batteries : Set [int ]) -> Set [int ]:
555
- """Get subset with working batteries.
556
-
557
- If none of the given batteries are working, then treat all of them
558
- as working.
559
-
560
- Args:
561
- batteries: requested batteries
562
-
563
- Returns:
564
- Subset with working batteries or input set if none of the given batteries
565
- are working.
566
- """
567
- working_batteries = self ._all_battery_status .get_working_batteries (batteries )
568
- if len (working_batteries ) == 0 :
569
- return batteries
570
- return working_batteries
571
-
572
- def _get_components_data (self , batteries : Set [int ]) -> List [InvBatPair ]:
633
+ def _get_components_data (
634
+ self , batteries : Set [int ], include_broken : bool
635
+ ) -> List [InvBatPair ]:
573
636
"""Get data for the given batteries and adjacent inverters.
574
637
575
638
Args:
576
639
batteries: Batteries that needs data.
640
+ include_broken: whether all batteries in the batteries set in the
641
+ request must be used regardless the status.
577
642
578
643
Raises:
579
644
KeyError: If any battery in the given list doesn't exists in microgrid.
@@ -582,7 +647,11 @@ def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]:
582
647
Pairs of battery and adjacent inverter data.
583
648
"""
584
649
pairs_data : List [InvBatPair ] = []
585
- working_batteries = self ._get_working_batteries (batteries )
650
+ working_batteries = (
651
+ batteries
652
+ if include_broken
653
+ else self ._all_battery_status .get_working_batteries (batteries )
654
+ )
586
655
587
656
for battery_id in working_batteries :
588
657
if battery_id not in self ._battery_receivers :
@@ -594,6 +663,12 @@ def _get_components_data(self, batteries: Set[int]) -> List[InvBatPair]:
594
663
inverter_id : int = self ._bat_inv_map [battery_id ]
595
664
596
665
data = self ._get_battery_inverter_data (battery_id , inverter_id )
666
+ if not data and include_broken :
667
+ cached_entry = self ._cached_metrics [battery_id ]
668
+ if cached_entry and not cached_entry .has_expired ():
669
+ data = cached_entry .inv_bat_pair
670
+ else :
671
+ data = None
597
672
if data is None :
598
673
_logger .warning (
599
674
"Skipping battery %d because its message isn't correct." ,
@@ -661,7 +736,9 @@ def _get_battery_inverter_data(
661
736
662
737
# If all values are ok then return them.
663
738
if not any (map (isnan , replaceable_metrics )):
664
- return InvBatPair (battery_data , inverter_data )
739
+ inv_bat_pair = InvBatPair (battery_data , inverter_data )
740
+ self ._cached_metrics [battery_id ] = _CacheEntry .from_ttl (inv_bat_pair )
741
+ return inv_bat_pair
665
742
666
743
# Replace NaN with the corresponding value in the adjacent component.
667
744
# If both metrics are None, return None to ignore this battery.
@@ -683,10 +760,12 @@ def _get_battery_inverter_data(
683
760
elif isnan (inv_bound ):
684
761
inverter_new_metrics [inv_attr ] = bat_bound
685
762
686
- return InvBatPair (
763
+ inv_bat_pair = InvBatPair (
687
764
replace (battery_data , ** battery_new_metrics ),
688
765
replace (inverter_data , ** inverter_new_metrics ),
689
766
)
767
+ self ._cached_metrics [battery_id ] = _CacheEntry .from_ttl (inv_bat_pair )
768
+ return inv_bat_pair
690
769
691
770
async def _create_channels (self ) -> None :
692
771
"""Create channels to get data of components in microgrid."""
0 commit comments