#### Minimal, reproducible code sample:
```python
import os
from multiprocessing import Process

import numpy as np
import zarr
import numcodecs

# disable Blosc threads; compression is also disabled on the array below
numcodecs.blosc.use_threads = False

synchronizer = zarr.sync.ProcessSynchronizer('foo.sync')
z = zarr.open_array(
    store='example.zarr',
    mode='w',
    shape=(12,),
    chunks=(4,),
    fill_value=0,
    compressor=None,
    dtype='i4',
    synchronizer=synchronizer,
)
z[...]  # read back the freshly initialized, zero-filled array


def write_across_chunks(indices):
    np.random.seed()  # reseed so each forked process writes a different value
    value = np.random.randint(0, 100)
    zarrMultiprocessor = zarr.open_array('example.zarr', mode='r+', synchronizer=synchronizer)
    zarrMultiprocessor[indices[0]:indices[1]] = value  # write a random number across 4 elements and 2 chunks
    print(f'ProcessID: {os.getpid()}, Indices: {indices}, Value: "{value}", Zarr: [{zarrMultiprocessor[...]}]')


# simulate a number of processes, each writing across two chunks at a time
overlap_1_2 = [2, 6]
overlap_2_3 = [6, 10]
indices = [overlap_1_2, overlap_2_3] * 15  # 30 writes, alternating between the two overlapping regions

processes = []
for index in indices:
    process = Process(target=write_across_chunks, args=[index])
    processes.append(process)
    process.start()

for process in processes:
    process.join()
```
#### Output:
```
ProcessID: 42649, Indices: [2:6], Value: "27", Zarr: [[ 0 0 27 27 27 27 0 0 0 0 0 0]]
ProcessID: 42652, Indices: [6:10], Value: "51", Zarr: [[ 0 0 27 27 27 27 51 51 51 51 0 0]]
ProcessID: 42653, Indices: [2:6], Value: "84", Zarr: [[ 0 0 84 84 84 84 51 51 51 51 0 0]]
ProcessID: 42654, Indices: [6:10], Value: "8", Zarr: [[ 0 0 84 84 84 84 61 61 8 8 0 0]] # problem!
ProcessID: 42656, Indices: [6:10], Value: "61", Zarr: [[ 0 0 10 10 84 84 61 61 61 61 0 0]] # problem!
ProcessID: 42657, Indices: [2:6], Value: "10", Zarr: [[ 0 0 10 10 10 10 61 61 61 61 0 0]]
ProcessID: 42658, Indices: [6:10], Value: "58", Zarr: [[ 0 0 10 10 10 10 58 58 58 58 0 0]]
ProcessID: 42655, Indices: [2:6], Value: "21", Zarr: [[ 0 0 21 21 21 21 58 58 58 58 0 0]]
ProcessID: 42662, Indices: [6:10], Value: "37", Zarr: [[ 0 0 77 77 21 21 37 37 37 37 0 0]] # problem!
ProcessID: 42664, Indices: [6:10], Value: "93", Zarr: [[ 0 0 7 7 21 21 7 7 93 93 0 0]] # problem!
ProcessID: 42650, Indices: [6:10], Value: "7", Zarr: [[ 0 0 52 52 21 21 7 7 7 7 0 0]] # problem!
ProcessID: 42651, Indices: [2:6], Value: "52", Zarr: [[ 0 0 52 52 52 52 7 7 7 7 0 0]]
ProcessID: 42659, Indices: [2:6], Value: "7", Zarr: [[ 0 0 18 18 7 7 7 7 7 7 0 0]] # problem!
ProcessID: 42668, Indices: [6:10], Value: "81", Zarr: [[ 0 0 18 18 7 7 81 81 81 81 0 0]] # problem!
ProcessID: 42669, Indices: [2:6], Value: "90", Zarr: [[ 0 0 90 90 90 90 81 81 81 81 0 0]]
ProcessID: 42667, Indices: [2:6], Value: "61", Zarr: [[ 0 0 61 61 61 61 84 84 84 84 0 0]]
ProcessID: 42660, Indices: [6:10], Value: "84", Zarr: [[ 0 0 61 61 61 61 84 84 84 84 0 0]]
ProcessID: 42661, Indices: [2:6], Value: "77", Zarr: [[ 0 0 61 61 77 77 84 84 84 84 0 0]] # problem!
ProcessID: 42673, Indices: [2:6], Value: "3", Zarr: [[ 0 0 3 3 3 3 84 84 84 84 0 0]]
ProcessID: 42674, Indices: [6:10], Value: "51", Zarr: [[ 0 0 89 89 3 3 1 1 51 51 0 0]] # problem!
ProcessID: 42672, Indices: [6:10], Value: "1", Zarr: [[ 0 0 89 89 3 3 1 1 1 1 0 0]] # problem!
ProcessID: 42663, Indices: [2:6], Value: "18", Zarr: [[ 0 0 89 89 18 18 1 1 1 1 0 0]] # problem!
ProcessID: 42675, Indices: [2:6], Value: "74", Zarr: [[ 0 0 74 74 74 74 1 1 1 1 0 0]]
ProcessID: 42665, Indices: [2:6], Value: "89", Zarr: [[ 0 0 74 74 89 89 1 1 1 1 0 0]] # problem!
ProcessID: 42671, Indices: [2:6], Value: "86", Zarr: [[ 0 0 86 86 86 86 1 1 1 1 0 0]]
ProcessID: 42677, Indices: [2:6], Value: "6", Zarr: [[ 0 0 6 6 6 6 33 33 33 33 0 0]]
ProcessID: 42676, Indices: [6:10], Value: "33", Zarr: [[ 0 0 6 6 6 6 33 33 33 33 0 0]]
ProcessID: 42678, Indices: [6:10], Value: "12", Zarr: [[ 0 0 6 6 6 6 12 12 12 12 0 0]]
ProcessID: 42666, Indices: [6:10], Value: "98", Zarr: [[ 0 0 6 6 6 6 98 98 98 98 0 0]]
ProcessID: 42670, Indices: [6:10], Value: "12", Zarr: [[ 0 0 6 6 6 6 12 12 12 12 0 0]]
```
#### Problem description:
I am prototyping for some larger data processing that will eventually run in the cloud and have been testing Zarr's parallel-computing and synchronization capabilities on my local machine. The synchronization tools do not appear to be as process-safe as I would expect.
The toy example above creates a Zarr store that uses a ProcessSynchronizer to coordinate read and write protection when multiple processes access the same chunk in the store. The code initializes a store with 12 elements and a chunk size of 4, for a total of 3 chunks. I then spawn a number of processes, each of which writes across two chunks at a time. My expectation is that each of the print statements above would show a consistent set of numbers across the chunks touched by a write, e.g.:
```
Initially store is set to all zeros:
read --> [0, 0, 0, 0] [0, 0, 0, 0] [0, 0, 0, 0]
Process A writes ProcessSynchronizer-safe 8's across chunks 1 and 2:
write --> [_, _, 8, 8] [8, 8, _, _] [_, _, _, _]
Process B writes ProcessSynchronizer-safe 9's across chunks 2 and 3:
write --> [_, _, _, _] [_, _, 9, 9] [9, 9, _, _]
Final output:
read --> [0, 0, 8, 8] [8, 8, 9, 9] [9, 9, 0, 0]
```
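To make that expectation concrete, here is a small helper (just a sketch of the invariant, not part of the reproduction) that checks a printed snapshot: each overlap region should hold exactly one value after every write. The second example below is the first line flagged "problem!" in the output above, and it fails the check:
```python
import numpy as np

def snapshot_is_consistent(snapshot):
    """Every write covers either elements 2-5 (chunks 1/2) or elements 6-9
    (chunks 2/3), so each of those spans should contain a single value."""
    return len(np.unique(snapshot[2:6])) == 1 and len(np.unique(snapshot[6:10])) == 1

print(snapshot_is_consistent(np.array([0, 0, 27, 27, 27, 27, 51, 51, 51, 51, 0, 0])))  # True  (expected)
print(snapshot_is_consistent(np.array([0, 0, 84, 84, 84, 84, 61, 61, 8, 8, 0, 0])))    # False (first "problem!" line)
```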
Each line marked "problem!" in the output above is a case where the supposedly process-safe writes returned corrupted data.
Additional notes:
Blosc threading is disabled and no compressor is configured for the store. I have seen other issues where those were a problem, so they shouldn't be affecting reads/writes in my example.
I have also tested a threaded version of this example using zarr.ThreadSynchronizer and am seeing similar results; a rough sketch of that variant is below.
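The threaded variant looks roughly like this (a sketch under the same array layout; the store name `example_threads.zarr` is a placeholder, not the exact script I ran):
```python
import threading
import numpy as np
import zarr

synchronizer = zarr.ThreadSynchronizer()
z = zarr.open_array(
    store='example_threads.zarr',  # placeholder store name for the threaded test
    mode='w',
    shape=(12,),
    chunks=(4,),
    fill_value=0,
    compressor=None,
    dtype='i4',
    synchronizer=synchronizer,
)

def write_across_chunks(indices):
    value = np.random.randint(0, 100)
    z[indices[0]:indices[1]] = value  # same cross-chunk write as the process-based example
    print(f'Thread: {threading.current_thread().name}, Indices: {indices}, Zarr: {z[...]}')

threads = [threading.Thread(target=write_across_chunks, args=[idx])
           for idx in [[2, 6], [6, 10]] * 15]
for t in threads:
    t.start()
for t in threads:
    t.join()
```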
#### Version and installation information:
- zarr==2.10.2
- numcodecs==0.9.1
- python interpreter: 3.7.9
- Operating system (Linux/Windows/Mac): macOS Big Sur 11.6
- How Zarr was installed: pip3 install into the base OS Python