Skip to content

[HZ-5410] PNCounter for Asyncio#804

Open
yuce wants to merge 2 commits intohazelcast:masterfrom
yuce:asyncio-pncounter
Open

[HZ-5410] PNCounter for Asyncio#804
yuce wants to merge 2 commits intohazelcast:masterfrom
yuce:asyncio-pncounter

Conversation

@yuce
Copy link
Copy Markdown
Contributor

@yuce yuce commented Apr 22, 2026

Straightforward port of PNCounter proxy and its test to the asyncio client.

Proxy:

Tests:

Diff:

1c1
< import functools
---
> import asyncio
5,7c5,7
< from hazelcast.future import Future
< from hazelcast.proxy.base import Proxy
< from hazelcast.cluster import VectorClock
---
> from hazelcast.errors import NoDataMemberInClusterError
> from hazelcast.internal.asyncio_cluster import VectorClock
> from hazelcast.internal.asyncio_proxy.base import Proxy
9d8
<     pn_counter_add_codec,
10a10
>     pn_counter_add_codec,
13d12
< from hazelcast.errors import NoDataMemberInClusterError
18c17
< class PNCounter(Proxy["BlockingPNCounter"]):
---
> class PNCounter(Proxy):
63c62
<     def get(self) -> Future[int]:
---
>     async def get(self) -> int:
74c73
<         return self._invoke_internal(pn_counter_get_codec)
---
>         return await self._invoke_internal(pn_counter_get_codec)
76c75
<     def get_and_add(self, delta: int) -> Future[int]:
---
>     async def get_and_add(self, delta: int) -> int:
92c91,93
<         return self._invoke_internal(pn_counter_add_codec, delta=delta, get_before_update=True)
---
>         return await self._invoke_internal(
>             pn_counter_add_codec, delta=delta, get_before_update=True
>         )
94c95
<     def add_and_get(self, delta: int) -> Future[int]:
---
>     async def add_and_get(self, delta: int) -> int:
110c111,113
<         return self._invoke_internal(pn_counter_add_codec, delta=delta, get_before_update=False)
---
>         return await self._invoke_internal(
>             pn_counter_add_codec, delta=delta, get_before_update=False
>         )
112c115
<     def get_and_subtract(self, delta: int) -> Future[int]:
---
>     async def get_and_subtract(self, delta: int) -> int:
128c131,133
<         return self._invoke_internal(pn_counter_add_codec, delta=-1 * delta, get_before_update=True)
---
>         return await self._invoke_internal(
>             pn_counter_add_codec, delta=-1 * delta, get_before_update=True
>         )
130c135
<     def subtract_and_get(self, delta: int) -> Future[int]:
---
>     async def subtract_and_get(self, delta: int) -> int:
146c151
<         return self._invoke_internal(
---
>         return await self._invoke_internal(
150c155
<     def get_and_decrement(self) -> Future[int]:
---
>     async def get_and_decrement(self) -> int:
162c167
<         return self._invoke_internal(pn_counter_add_codec, delta=-1, get_before_update=True)
---
>         return await self._invoke_internal(pn_counter_add_codec, delta=-1, get_before_update=True)
164c169
<     def decrement_and_get(self) -> Future[int]:
---
>     async def decrement_and_get(self) -> int:
176c181
<         return self._invoke_internal(pn_counter_add_codec, delta=-1, get_before_update=False)
---
>         return await self._invoke_internal(pn_counter_add_codec, delta=-1, get_before_update=False)
178c183
<     def get_and_increment(self) -> Future[int]:
---
>     async def get_and_increment(self) -> int:
190c195
<         return self._invoke_internal(pn_counter_add_codec, delta=1, get_before_update=True)
---
>         return await self._invoke_internal(pn_counter_add_codec, delta=1, get_before_update=True)
192c197
<     def increment_and_get(self) -> Future[int]:
---
>     async def increment_and_get(self) -> int:
204c209
<         return self._invoke_internal(pn_counter_add_codec, delta=1, get_before_update=False)
---
>         return await self._invoke_internal(pn_counter_add_codec, delta=1, get_before_update=False)
216,222c221,224
<     def blocking(self) -> "BlockingPNCounter":
<         return BlockingPNCounter(self)
< 
<     def _invoke_internal(self, codec, **kwargs):
<         delegated_future = Future()
<         self._set_result_or_error(delegated_future, [], None, codec, **kwargs)
<         return delegated_future
---
>     async def _invoke_internal(self, codec, **kwargs) -> int:
>         delegated_future = asyncio.get_running_loop().create_future()
>         await self._set_result_or_error(delegated_future, [], None, codec, **kwargs)
>         return await delegated_future
224c226
<     def _set_result_or_error(
---
>     async def _set_result_or_error(
227c229
<         target = self._get_crdt_operation_target(excluded_addresses)
---
>         target = await self._get_crdt_operation_target(excluded_addresses)
246,260d247
<         future = self._invoke_on_target(request, target.uuid, codec.decode_response)
< 
<         checker_func = functools.partial(
<             self._check_invocation_result,
<             delegated_future=delegated_future,
<             excluded_addresses=excluded_addresses,
<             target=target,
<             codec=codec,
<             **kwargs
<         )
<         future.add_done_callback(checker_func)
< 
<     def _check_invocation_result(
<         self, future, delegated_future, excluded_addresses, target, codec, **kwargs
<     ):
262c249
<             result = future.result()
---
>             result = await self._ainvoke_on_target(request, target.uuid, codec.decode_response)
272c259,261
<             self._set_result_or_error(delegated_future, excluded_addresses, ex, codec, **kwargs)
---
>             await self._set_result_or_error(
>                 delegated_future, excluded_addresses, ex, codec, **kwargs
>             )
274c263
<     def _get_crdt_operation_target(self, excluded_addresses):
---
>     async def _get_crdt_operation_target(self, excluded_addresses):
281c270
<         self._current_target_replica_address = self._choose_target_replica(excluded_addresses)
---
>         self._current_target_replica_address = await self._choose_target_replica(excluded_addresses)
284,285c273,274
<     def _choose_target_replica(self, excluded_addresses):
<         replica_addresses = self._get_replica_addresses(excluded_addresses)
---
>     async def _choose_target_replica(self, excluded_addresses):
>         replica_addresses = await self._get_replica_addresses(excluded_addresses)
293c282
<     def _get_replica_addresses(self, excluded_addresses):
---
>     async def _get_replica_addresses(self, excluded_addresses):
297c286
<         replica_count = self._get_max_configured_replica_count()
---
>         replica_count = await self._get_max_configured_replica_count()
309c298
<     def _get_max_configured_replica_count(self):
---
>     async def _get_max_configured_replica_count(self):
314c303
<         count = self._invoke(
---
>         count = await self._invoke(
316c305
<         ).result()
---
>         )
325c314,315
<     def _to_vector_clock(self, timestamps):
---
>     @classmethod
>     def _to_vector_clock(cls, timestamps):
333,402c323,324
< class BlockingPNCounter(PNCounter):
<     __slots__ = ("_wrapped", "name", "service_name")
< 
<     def __init__(self, wrapped: PNCounter):
<         self.name = wrapped.name
<         self.service_name = wrapped.service_name
<         self._wrapped = wrapped
< 
<     def get(  # type: ignore[override]
<         self,
<     ) -> int:
<         return self._wrapped.get().result()
< 
<     def get_and_add(  # type: ignore[override]
<         self,
<         delta: int,
<     ) -> int:
<         return self._wrapped.get_and_add(delta).result()
< 
<     def add_and_get(  # type: ignore[override]
<         self,
<         delta: int,
<     ) -> int:
<         return self._wrapped.add_and_get(delta).result()
< 
<     def get_and_subtract(  # type: ignore[override]
<         self,
<         delta: int,
<     ) -> int:
<         return self._wrapped.get_and_subtract(delta).result()
< 
<     def subtract_and_get(  # type: ignore[override]
<         self,
<         delta: int,
<     ) -> int:
<         return self._wrapped.subtract_and_get(delta).result()
< 
<     def get_and_decrement(  # type: ignore[override]
<         self,
<     ) -> int:
<         return self._wrapped.get_and_decrement().result()
< 
<     def decrement_and_get(  # type: ignore[override]
<         self,
<     ) -> int:
<         return self._wrapped.decrement_and_get().result()
< 
<     def get_and_increment(  # type: ignore[override]
<         self,
<     ) -> int:
<         return self._wrapped.get_and_increment().result()
< 
<     def increment_and_get(  # type: ignore[override]
<         self,
<     ) -> int:
<         return self._wrapped.increment_and_get().result()
< 
<     def reset(  # type: ignore[override]
<         self,
<     ) -> None:
<         self._wrapped.reset()
< 
<     def destroy(self) -> bool:
<         return self._wrapped.destroy()
< 
<     def blocking(self) -> "BlockingPNCounter":
<         return self
< 
<     def __repr__(self) -> str:
<         return self._wrapped.__repr__()
---
> async def create_pn_counter_proxy(service_name, name, context):
>     return PNCounter(service_name, name, context)

@yuce yuce changed the title PNCounter for Asyncio [HZ-5410] PNCounter for Asyncio Apr 22, 2026
@codecov-commenter
Copy link
Copy Markdown

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 94.48%. Comparing base (0b838ef) to head (359f771).

Additional details and impacted files
@@            Coverage Diff             @@
##           master     #804      +/-   ##
==========================================
+ Coverage   94.42%   94.48%   +0.05%     
==========================================
  Files         400      401       +1     
  Lines       26079    26186     +107     
==========================================
+ Hits        24626    24741     +115     
+ Misses       1453     1445       -8     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@yuce yuce requested a review from emreyigit April 22, 2026 08:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants