Skip to content

Commit df1e149

Browse files
...
1 parent 97b0e53 commit df1e149

File tree

2 files changed

+99
-14
lines changed

2 files changed

+99
-14
lines changed

src/cluster.jl

+15-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,18 @@ struct Node
2525
end
2626
end
2727

28+
"""
    Ctx(cid, xid)

Handle naming one context of a registered cluster: `cid` is a key of `cluster_table[]`
and `xid` is an index into that cluster's `contexts` collection.

# Throws
- `ArgumentError` if `cid` is not a key of `cluster_table[]`, if `xid` is not a valid
  index of the cluster's `contexts`, or if the entry stored at `xid` is `nothing`.
"""
struct Ctx
    cid::Integer
    xid::Integer
    function Ctx(cid, xid)
        table = cluster_table[]
        # Explicit checks instead of `@assert`: assertions may be compiled out, and
        # these conditions validate caller-supplied arguments.
        haskey(table, cid) ||
            throw(ArgumentError("unknown cluster id $cid"))
        ctxs = table[cid].contexts
        # Also rejects indices below the first one, which the previous
        # `xid <= length(...)` test let through to a BoundsError.
        xid in eachindex(ctxs) ||
            throw(ArgumentError("context index $xid is out of range for cluster $cid"))
        isnothing(ctxs[xid]) &&
            throw(ArgumentError("context $xid of cluster $cid is not available"))
        return new(cid, xid)
    end
end
38+
39+
2840
# Module-level registry of clusters: a `Ref` wrapping a `Dict` whose values are
# `ClusterInfo` records. Code in this file looks entries up by a handle's `cid`.
cluster_table = Ref(Dict{Integer,ClusterInfo}())
2941

3042
function addcluster(access_node, nw; kwargs...)
@@ -106,7 +118,9 @@ Distributed.nprocs(cluster_handle::Cluster) = Distributed.remotecall_fetch(nproc
106118

107119
# Return whatever `procs` yields when fetched from the process `cluster_handle.cid`;
# the `role = :master` keyword is handed to `remotecall_fetch` as-is.
function Distributed.procs(cluster_handle::Cluster)
    entry = cluster_handle.cid
    return Distributed.remotecall_fetch(procs, entry; role = :master)
end
108120

109-
Distributed.workers(cluster_handle::Cluster) = Distributed.remotecall_fetch(workers, cluster_handle.cid; role=:master)
121+
"""
    workers(cluster_handle::Cluster)

Return the concatenation (`vcat`) of every non-`nothing` entry of the `contexts`
collection registered for `cluster_handle.cid` in `cluster_table[]`.

When no entry is live, an empty `Int[]` is returned instead of letting `reduce` fail
on an empty collection.
"""
function Distributed.workers(cluster_handle::Cluster)
    live = filter(!isnothing, cluster_table[][cluster_handle.cid].contexts)
    # NOTE(review): `Int[]` assumes worker ids are `Int`s, as with
    # `Distributed.workers()` — confirm against the element type of `contexts`.
    isempty(live) && return Int[]
    return reduce(vcat, live)
end
122+
123+
# Return the entry stored at index `cluster_handle.xid` in the `contexts` collection
# of the cluster registered under `cluster_handle.cid`.
function Distributed.workers(cluster_handle::Ctx)
    info = cluster_table[][cluster_handle.cid]
    return info.contexts[cluster_handle.xid]
end
110124

111125
# Return the `contexts` collection stored in `cluster_table[]` for this cluster's `cid`.
function contexts(cluster_handle::Cluster)
    info = cluster_table[][cluster_handle.cid]
    return info.contexts
end
112126

src/remotecall.jl

+84-13
Original file line numberDiff line numberDiff line change
@@ -20,35 +20,59 @@ future_table = Ref(Dict{Future,Integer}())
2020

2121

2222
"""
    remotecall(f, node_handle::Node, args...; kwargs...)

Ask process `node_handle.cid` to issue `remotecall(f, node_handle.pid, args...; kwargs...)`
and return the `Future` of that outer call. The future is recorded in `future_table[]`
under the handle's `cid`.
"""
function Distributed.remotecall(f, node_handle::Node, args...; kwargs...)
    # The handle parameter here is `node_handle`; reading `cluster_handle.cid`
    # raised an UndefVarError on every call.
    cid = node_handle.cid
    pid = node_handle.pid
    r = remotecall(() -> remotecall(f, pid, args...; kwargs...), cid)
    future_table[][r] = cid
    return r
end
2829

2930
"""
    remotecall(f, cluster_handle::Cluster, args...; kwargs...)

Ask process `cluster_handle.cid` to `asyncmap` a `remotecall(f, w, args...; kwargs...)`
over every `w` in `workers(cluster_handle)`. The worker list is computed on the calling
process before the request is sent. Returns the `Future` of the outer call and records
it in `future_table[]` under the cluster's `cid`.
"""
function Distributed.remotecall(f, cluster_handle::Cluster, args...; kwargs...)
    entry = cluster_handle.cid
    targets = workers(cluster_handle)
    fut = remotecall(entry) do
        asyncmap(targets) do w
            remotecall(f, w, args...; kwargs...)
        end
    end
    future_table[][fut] = entry
    return fut
end
3437

38+
"""
    remotecall(f, cluster_handle::Ctx, args...; kwargs...)

Ask process `cluster_handle.cid` to `asyncmap` a `remotecall(f, w, args...; kwargs...)`
over every `w` in `workers(cluster_handle)` (the entries of the selected context).
Returns the `Future` of the outer call and records it in `future_table[]` under `cid`.
"""
function Distributed.remotecall(f, cluster_handle::Ctx, args...; kwargs...)
    entry = cluster_handle.cid
    targets = workers(cluster_handle)
    fut = remotecall(entry) do
        asyncmap(targets) do w
            remotecall(f, w, args...; kwargs...)
        end
    end
    future_table[][fut] = entry
    return fut
end
3545

3646

3747
"""
    remotecall_fetch(f, node_handle::Node, args...; kwargs...)

Ask process `node_handle.cid` to run
`remotecall_fetch(f, node_handle.pid, args...; kwargs...)` and return the fetched result.
"""
function Distributed.remotecall_fetch(f, node_handle::Node, args...; kwargs...)
    # The handle parameter here is `node_handle`; reading `cluster_handle.cid`
    # raised an UndefVarError on every call.
    cid = node_handle.cid
    pid = node_handle.pid
    return remotecall_fetch(() -> remotecall_fetch(f, pid, args...; kwargs...), cid)
end
4152

4253
"""
    remotecall_fetch(f, cluster_handle::Cluster, args...; kwargs...)

Ask process `cluster_handle.cid` to `asyncmap` a
`remotecall_fetch(f, pid, args...; kwargs...)` over every `pid` in
`workers(cluster_handle)` and return the fetched collection of per-worker results.
"""
function Distributed.remotecall_fetch(f, cluster_handle::Cluster, args...; kwargs...)
    entry = cluster_handle.cid
    targets = workers(cluster_handle)
    return remotecall_fetch(entry) do
        asyncmap(targets) do pid
            remotecall_fetch(f, pid, args...; kwargs...)
        end
    end
end
4658

59+
"""
    remotecall_fetch(f, cluster_handle::Ctx, args...; kwargs...)

Ask process `cluster_handle.cid` to `asyncmap` a
`remotecall_fetch(f, pid, args...; kwargs...)` over every `pid` in
`workers(cluster_handle)` (the selected context) and return the fetched results.
"""
function Distributed.remotecall_fetch(f, cluster_handle::Ctx, args...; kwargs...)
    entry = cluster_handle.cid
    targets = workers(cluster_handle)
    return remotecall_fetch(entry) do
        asyncmap(targets) do pid
            remotecall_fetch(f, pid, args...; kwargs...)
        end
    end
end
4764

4865
"""
    remotecall_fetch(reducer, f, cluster_handle::Cluster, args...; kwargs...)

Ask process `cluster_handle.cid` to fetch `f(args...; kwargs...)` from every worker in
`workers(cluster_handle)` via `asyncmap`, combine the results there with
`reduce(reducer, ...)`, and return the reduced value.
"""
function Distributed.remotecall_fetch(reducer, f, cluster_handle::Cluster, args...; kwargs...)
    entry = cluster_handle.cid
    targets = workers(cluster_handle)
    return remotecall_fetch(entry) do
        partials = asyncmap(targets) do w
            remotecall_fetch(f, w, args...; kwargs...)
        end
        reduce(reducer, partials)
    end
end
5170

71+
"""
    remotecall_fetch(reducer, f, cluster_handle::Ctx, args...; kwargs...)

Ask process `cluster_handle.cid` to fetch `f(args...; kwargs...)` from every worker in
`workers(cluster_handle)` (the selected context) via `asyncmap`, combine the results
there with `reduce(reducer, ...)`, and return the reduced value.
"""
function Distributed.remotecall_fetch(reducer, f, cluster_handle::Ctx, args...; kwargs...)
    entry = cluster_handle.cid
    targets = workers(cluster_handle)
    return remotecall_fetch(entry) do
        partials = asyncmap(targets) do w
            remotecall_fetch(f, w, args...; kwargs...)
        end
        reduce(reducer, partials)
    end
end
5276

5377

5478
function Distributed.remotecall_wait(f, node_handle::Node, args...; kwargs...)
@@ -59,8 +83,18 @@ function Distributed.remotecall_wait(f, node_handle::Node, args...; kwargs...)
5983
end
6084

6185
"""
    remotecall_wait(f, cluster_handle::Cluster, args...; kwargs...)

Ask process `cluster_handle.cid` to `asyncmap` a
`remotecall_wait(f, w, args...; kwargs...)` over every `w` in `workers(cluster_handle)`.
Returns the `Future` of the outer `remotecall_wait` and records it in `future_table[]`
under the cluster's `cid`.
"""
function Distributed.remotecall_wait(f, cluster_handle::Cluster, args...; kwargs...)
    entry = cluster_handle.cid
    targets = workers(cluster_handle)
    fut = remotecall_wait(entry) do
        asyncmap(targets) do w
            remotecall_wait(f, w, args...; kwargs...)
        end
    end
    future_table[][fut] = entry
    return fut
end
92+
93+
"""
    remotecall_wait(f, cluster_handle::Ctx, args...; kwargs...)

Ask process `cluster_handle.cid` to `asyncmap` a
`remotecall_wait(f, w, args...; kwargs...)` over every `w` in `workers(cluster_handle)`
(the selected context). Returns the `Future` of the outer `remotecall_wait` and records
it in `future_table[]` under `cid`.
"""
function Distributed.remotecall_wait(f, cluster_handle::Ctx, args...; kwargs...)
    entry = cluster_handle.cid
    targets = workers(cluster_handle)
    fut = remotecall_wait(entry) do
        asyncmap(targets) do w
            remotecall_wait(f, w, args...; kwargs...)
        end
    end
    future_table[][fut] = entry
    return fut
end
66100

@@ -90,7 +124,15 @@ function Distributed.remote_do(f, node_handle::Node, args...; kwargs...)
90124
end
91125

92126
"""
    remote_do(f, cluster_handle::Cluster, args...; kwargs...)

Ask process `cluster_handle.cid` to issue `remote_do(f, w, args...; kwargs...)` for each
`w` in `workers(cluster_handle)`, one after another. Returns what the outer `remote_do`
returns.
"""
function Distributed.remote_do(f, cluster_handle::Cluster, args...; kwargs...)
    entry = cluster_handle.cid
    targets = workers(cluster_handle)
    return remote_do(entry) do
        for w in targets
            remote_do(f, w, args...; kwargs...)
        end
    end
end
131+
132+
"""
    remote_do(f, cluster_handle::Ctx, args...; kwargs...)

Ask process `cluster_handle.cid` to issue `remote_do(f, w, args...; kwargs...)` for each
`w` in `workers(cluster_handle)` (the selected context), one after another. Returns what
the outer `remote_do` returns.
"""
function Distributed.remote_do(f, cluster_handle::Ctx, args...; kwargs...)
    entry = cluster_handle.cid
    targets = workers(cluster_handle)
    return remote_do(entry) do
        for w in targets
            remote_do(f, w, args...; kwargs...)
        end
    end
end
95137

96138
# @spawn ???
@@ -100,7 +142,18 @@ end
100142
"""
    @spawnat_cluster cluster_handle expr

Spawn, on process `cluster_handle.cid`, an `asyncmap` that runs `@spawnat(w, expr)` for
every `w` in `workers(cluster_handle)`. Evaluates to the `Future` of the outer
`@spawnat`, which is also recorded in `future_table[]` under the handle's `cid`.

`cluster_handle` and `expr` are evaluated in the caller's scope; `cluster_handle` is
evaluated exactly once.
"""
macro spawnat_cluster(cluster_handle, arg)
    quote
        # Escape the user's handle so it resolves in the caller's scope, and bind it
        # once so an expression with side effects is not evaluated twice.
        handle = $(esc(cluster_handle))
        cid = handle.cid
        wids = workers(handle)
        # `wids` is a runtime local of this block; the previous `\$wids` tried to
        # interpolate it at macro-expansion time, where no such variable exists.
        f = @spawnat(cid, asyncmap(w -> @spawnat(w, $(esc(arg))), wids))
        $future_table[][f] = cid
        f
    end
end
151+
152+
macro spawnat_context(cluster_handle, arg)
153+
quote
154+
cid = $cluster_handle.cid
155+
wids = workers($cluster_handle)
156+
f = @spawnat(cid, asyncmap(w->@spawnat(w, $arg), $wids))
104157
$future_table[][f] = cid
105158
f
106159
end
@@ -120,14 +173,32 @@ end
120173
"""
    @fetchfrom_cluster cluster_handle expr

Fetch, from process `cluster_handle.cid`, the result of an `asyncmap` that runs
`@fetchfrom(w, expr)` for every `w` in `workers(cluster_handle)`.

`cluster_handle` and `expr` are evaluated in the caller's scope; `cluster_handle` is
evaluated exactly once.
"""
macro fetchfrom_cluster(cluster_handle, arg)
    quote
        # Escape the user's handle so it resolves in the caller's scope, and bind it
        # once so an expression with side effects is not evaluated twice.
        handle = $(esc(cluster_handle))
        cid = handle.cid
        wids = workers(handle)
        # `wids` is a runtime local of this block; the previous `\$wids` tried to
        # interpolate it at macro-expansion time, where no such variable exists.
        @fetchfrom(cid, asyncmap(w -> @fetchfrom(w, $(esc(arg))), wids))
    end
end
180+
181+
"""
    @fetchfrom_context cluster_handle expr

Fetch, from process `cluster_handle.cid`, the result of an `asyncmap` that runs
`@fetchfrom(w, expr)` for every `w` in `workers(cluster_handle)`.

`cluster_handle` and `expr` are evaluated in the caller's scope; `cluster_handle` is
evaluated exactly once.
"""
macro fetchfrom_context(cluster_handle, arg)
    quote
        # Escape the user's handle so it resolves in the caller's scope, and bind it
        # once so an expression with side effects is not evaluated twice.
        handle = $(esc(cluster_handle))
        cid = handle.cid
        wids = workers(handle)
        # `wids` is a runtime local of this block; the previous `\$wids` tried to
        # interpolate it at macro-expansion time, where no such variable exists.
        @fetchfrom(cid, asyncmap(w -> @fetchfrom(w, $(esc(arg))), wids))
    end
end
126188

127189
"""
    @fetchfrom_cluster reducer cluster_handle expr

Fetch, from process `cluster_handle.cid`, the value of
`reduce(reducer, asyncmap(w -> @fetchfrom(w, expr), workers(cluster_handle)))`; the
reduction runs on process `cid`.

`reducer`, `cluster_handle` and `expr` are evaluated in the caller's scope;
`cluster_handle` is evaluated exactly once.
"""
macro fetchfrom_cluster(reducer, cluster_handle, arg)
    quote
        # Escape the user's handle so it resolves in the caller's scope, and bind it
        # once so an expression with side effects is not evaluated twice.
        handle = $(esc(cluster_handle))
        cid = handle.cid
        wids = workers(handle)
        # `wids` is a runtime local of this block; the previous `\$wids` tried to
        # interpolate it at macro-expansion time, where no such variable exists.
        @fetchfrom(
            cid,
            reduce($(esc(reducer)), asyncmap(w -> @fetchfrom(w, $(esc(arg))), wids))
        )
    end
end
196+
197+
"""
    @fetchfrom_context reducer cluster_handle expr

Fetch, from process `cluster_handle.cid`, the value of
`reduce(reducer, asyncmap(w -> @fetchfrom(w, expr), workers(cluster_handle)))`; the
reduction runs on process `cid`.

`reducer`, `cluster_handle` and `expr` are evaluated in the caller's scope;
`cluster_handle` is evaluated exactly once.
"""
macro fetchfrom_context(reducer, cluster_handle, arg)
    quote
        # Escape the user's handle so it resolves in the caller's scope, and bind it
        # once so an expression with side effects is not evaluated twice.
        handle = $(esc(cluster_handle))
        cid = handle.cid
        wids = workers(handle)
        # `wids` is a runtime local of this block; the previous `\$wids` tried to
        # interpolate it at macro-expansion time, where no such variable exists.
        @fetchfrom(
            cid,
            reduce($(esc(reducer)), asyncmap(w -> @fetchfrom(w, $(esc(arg))), wids))
        )
    end
end
133204

0 commit comments

Comments
 (0)