Skip to content

Commit ceabf1c

Browse files
author
Yuepeng Pan
committed
[FLINK-33653][runtime] Introduce a benchmark for balanced tasks scheduling.
1 parent 6810df8 commit ceabf1c

6 files changed

Lines changed: 590 additions & 0 deletions

File tree

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.scheduler.benchmark.slot.matching.resolver;
20+
21+
import org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
22+
import org.apache.flink.runtime.clusterframework.types.AllocationID;
23+
import org.apache.flink.runtime.clusterframework.types.ResourceID;
24+
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
25+
import org.apache.flink.runtime.jobgraph.JobVertexID;
26+
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
27+
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
28+
import org.apache.flink.runtime.scheduler.adaptive.allocator.*;
29+
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
30+
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
31+
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
32+
import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
33+
34+
import org.openjdk.jmh.annotations.Benchmark;
35+
import org.openjdk.jmh.annotations.BenchmarkMode;
36+
import org.openjdk.jmh.annotations.Level;
37+
import org.openjdk.jmh.annotations.Mode;
38+
import org.openjdk.jmh.annotations.Param;
39+
import org.openjdk.jmh.annotations.Setup;
40+
import org.openjdk.jmh.infra.Blackhole;
41+
import org.openjdk.jmh.runner.RunnerException;
42+
43+
import java.net.InetAddress;
44+
import java.net.UnknownHostException;
45+
import java.util.ArrayList;
46+
import java.util.Collection;
47+
import java.util.HashSet;
48+
import java.util.List;
49+
import java.util.Set;
50+
51+
/** The executor to drive {@link SlotMatchingResolver}. */
52+
public class SlotMatchingResolverBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
53+
54+
/**
55+
* We set the number of slots is very smaller than the number of task managers
56+
* to simulate the production environment to the greatest extent possible.
57+
*/
58+
public static final int SLOTS_PER_TASKS_MANAGER = 8;
59+
public static final int TASK_MANAGERS = 128;
60+
61+
private static final int requestedSlotSharingGroups = 3;
62+
private static final List<SlotSharingGroup> slotSharingGroups = new ArrayList<>();
63+
private static final Collection<ExecutionSlotSharingGroup> requestGroups = new ArrayList<>();
64+
private static final Collection<PhysicalSlot> slots = new ArrayList<>();
65+
66+
static {
67+
// For ResourceProfile.UNKNOWN.
68+
slotSharingGroups.add(new SlotSharingGroup());
69+
// For other resource profiles.
70+
for (int i = 1; i < requestedSlotSharingGroups; i++) {
71+
SlotSharingGroup sharingGroup = new SlotSharingGroup();
72+
sharingGroup.setResourceProfile(newGrainfinedResourceProfile(i));
73+
slotSharingGroups.add(sharingGroup);
74+
}
75+
// For requested groups and slots.
76+
for (int tmIndex = 0; tmIndex < TASK_MANAGERS; tmIndex++) {
77+
78+
TaskManagerLocation tml = getTaskManagerLocation(tmIndex + 1);
79+
80+
for (int slotIndex = 0; slotIndex < SLOTS_PER_TASKS_MANAGER; slotIndex++) {
81+
ResourceProfile profile = newGrainfinedResourceProfile(slotIndex);
82+
83+
slots.add(new TestingSlot(new AllocationID(), profile, tml));
84+
requestGroups.add(getExecutionSlotSharingGroup(slotIndex + 1, slotIndex));
85+
}
86+
}
87+
}
88+
89+
private static ExecutionSlotSharingGroup getExecutionSlotSharingGroup(
90+
int loading, int slotIndex) {
91+
Set<ExecutionVertexID> executionVertexIDSet = new HashSet<>();
92+
JobVertexID jobVertexID = new JobVertexID();
93+
for (int i = 0; i < loading; i++) {
94+
executionVertexIDSet.add(new ExecutionVertexID(jobVertexID, i));
95+
}
96+
return new ExecutionSlotSharingGroup(
97+
slotSharingGroups.get(slotIndex % 3), executionVertexIDSet);
98+
}
99+
100+
public static TaskManagerLocation getTaskManagerLocation(int dataPort) {
101+
try {
102+
InetAddress inetAddress = InetAddress.getByName("1.2.3.4");
103+
return new TaskManagerLocation(ResourceID.generate(), inetAddress, dataPort);
104+
} catch (UnknownHostException e) {
105+
throw new RuntimeException(e);
106+
}
107+
}
108+
109+
public static ResourceProfile newGrainfinedResourceProfile(int slotIndex) {
110+
return ResourceProfile.newBuilder()
111+
.setCpuCores(slotIndex % 2 == 0 ? 1 : 2)
112+
.setTaskHeapMemoryMB(100)
113+
.setTaskOffHeapMemoryMB(100)
114+
.setManagedMemoryMB(100)
115+
.build();
116+
}
117+
118+
@Param({"NONE", "SLOTS", "TASKS"})
119+
private TaskManagerLoadBalanceMode taskManagerLoadBalanceMode;
120+
121+
private SlotMatchingResolver slotMatchingResolver;
122+
123+
public static void main(String[] args) throws RunnerException {
124+
runBenchmark(SlotMatchingResolverBenchmarkExecutor.class);
125+
}
126+
127+
@Setup(Level.Trial)
128+
public void setup() throws Exception {
129+
slotMatchingResolver = getSlotMatchingResolver();
130+
}
131+
132+
@Benchmark
133+
@BenchmarkMode(Mode.SingleShotTime)
134+
public void runSlotsMatching(Blackhole blackhole) {
135+
blackhole.consume(
136+
slotMatchingResolver.matchSlotSharingGroupWithSlots(requestGroups, slots));
137+
}
138+
139+
private SlotMatchingResolver getSlotMatchingResolver() {
140+
switch (taskManagerLoadBalanceMode) {
141+
case NONE:
142+
this.slotMatchingResolver = SimpleSlotMatchingResolver.INSTANCE;
143+
break;
144+
case SLOTS:
145+
this.slotMatchingResolver =
146+
SlotsBalancedSlotMatchingResolver.INSTANCE;
147+
break;
148+
case TASKS:
149+
this.slotMatchingResolver =
150+
TasksBalancedSlotMatchingResolver.INSTANCE;
151+
break;
152+
default:
153+
throw new UnsupportedOperationException(
154+
String.format(
155+
"Unsupported task manager load balance mode '%s' in %s",
156+
taskManagerLoadBalanceMode,
157+
getClass().getName()));
158+
}
159+
return slotMatchingResolver;
160+
}
161+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.scheduler.benchmark.slot.matching.strategy;
20+
21+
import org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
22+
import org.apache.flink.runtime.clusterframework.types.AllocationID;
23+
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
24+
import org.apache.flink.runtime.jobmaster.SlotRequestId;
25+
import org.apache.flink.runtime.jobmaster.slotpool.PendingRequest;
26+
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
27+
import org.apache.flink.runtime.jobmaster.slotpool.RequestSlotMatchingStrategy;
28+
import org.apache.flink.runtime.jobmaster.slotpool.SimpleRequestSlotMatchingStrategy;
29+
import org.apache.flink.runtime.jobmaster.slotpool.TasksBalancedRequestSlotMatchingStrategy;
30+
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
31+
import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
32+
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
33+
import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
34+
35+
import org.openjdk.jmh.annotations.Benchmark;
36+
import org.openjdk.jmh.annotations.BenchmarkMode;
37+
import org.openjdk.jmh.annotations.Level;
38+
import org.openjdk.jmh.annotations.Mode;
39+
import org.openjdk.jmh.annotations.Param;
40+
import org.openjdk.jmh.annotations.Setup;
41+
import org.openjdk.jmh.infra.Blackhole;
42+
import org.openjdk.jmh.runner.RunnerException;
43+
44+
import java.util.ArrayList;
45+
import java.util.Collection;
46+
import java.util.Collections;
47+
import java.util.HashMap;
48+
49+
import static org.apache.flink.scheduler.benchmark.slot.matching.resolver.SlotMatchingResolverBenchmarkExecutor.SLOTS_PER_TASKS_MANAGER;
50+
import static org.apache.flink.scheduler.benchmark.slot.matching.resolver.SlotMatchingResolverBenchmarkExecutor.TASK_MANAGERS;
51+
import static org.apache.flink.scheduler.benchmark.slot.matching.resolver.SlotMatchingResolverBenchmarkExecutor.getTaskManagerLocation;
52+
import static org.apache.flink.scheduler.benchmark.slot.matching.resolver.SlotMatchingResolverBenchmarkExecutor.newGrainfinedResourceProfile;
53+
54+
/** The executor to drive {@link RequestSlotMatchingStrategy}. */
55+
public class RequestSlotMatchingStrategyBenchmarkExecutor
56+
extends SchedulerBenchmarkExecutorBase {
57+
58+
private static final Collection<PhysicalSlot> slots = new ArrayList<>();
59+
private static final Collection<PendingRequest> slotRequests = new ArrayList<>();
60+
61+
static {
62+
// For requested groups and slots.
63+
for (int tmIndex = 0; tmIndex < TASK_MANAGERS; tmIndex++) {
64+
65+
TaskManagerLocation tml = getTaskManagerLocation(tmIndex + 1);
66+
67+
for (int slotIndex = 0; slotIndex < SLOTS_PER_TASKS_MANAGER; slotIndex++) {
68+
ResourceProfile profile = newGrainfinedResourceProfile(slotIndex);
69+
70+
slots.add(new TestingSlot(new AllocationID(), profile, tml));
71+
slotRequests.add(getPendingRequest(slotIndex + 1, slotIndex));
72+
}
73+
}
74+
}
75+
76+
private static PendingRequest getPendingRequest(float loading, int slotIndex) {
77+
return PendingRequest.createNormalRequest(
78+
new SlotRequestId(),
79+
newGrainfinedResourceProfile(slotIndex),
80+
new DefaultLoadingWeight(loading),
81+
Collections.emptyList());
82+
}
83+
84+
@Param({"NONE", "TASKS"})
85+
private TaskManagerLoadBalanceMode taskManagerLoadBalanceMode;
86+
87+
private RequestSlotMatchingStrategy requestSlotMatchingStrategy;
88+
89+
public static void main(String[] args) throws RunnerException {
90+
runBenchmark(RequestSlotMatchingStrategyBenchmarkExecutor.class);
91+
}
92+
93+
@Setup(Level.Trial)
94+
public void setup() throws Exception {
95+
requestSlotMatchingStrategy = getRequestSlotMatchingStrategy();
96+
}
97+
98+
@Benchmark
99+
@BenchmarkMode(Mode.SingleShotTime)
100+
public void runSlotsMatching(Blackhole blackhole) {
101+
blackhole.consume(
102+
requestSlotMatchingStrategy.matchRequestsAndSlots(
103+
slots, slotRequests, new HashMap<>()));
104+
}
105+
106+
private RequestSlotMatchingStrategy getRequestSlotMatchingStrategy() {
107+
switch (taskManagerLoadBalanceMode) {
108+
case TASKS:
109+
this.requestSlotMatchingStrategy =
110+
TasksBalancedRequestSlotMatchingStrategy.INSTANCE;
111+
break;
112+
case NONE:
113+
this.requestSlotMatchingStrategy = SimpleRequestSlotMatchingStrategy.INSTANCE;
114+
break;
115+
default:
116+
throw new UnsupportedOperationException(
117+
String.format(
118+
"Unsupported task manager load balance mode '%s' in %s",
119+
taskManagerLoadBalanceMode,
120+
getClass().getName()));
121+
}
122+
return requestSlotMatchingStrategy;
123+
}
124+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package org.apache.flink.scheduler.benchmark.slot.sharing.resolver;
2+
3+
import org.apache.flink.configuration.SchedulerExecutionMode;
4+
import org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
5+
import org.apache.flink.runtime.JobException;
6+
import org.apache.flink.runtime.client.JobExecutionException;
7+
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
8+
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
9+
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
10+
import org.apache.flink.runtime.jobgraph.JobGraph;
11+
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
12+
import org.apache.flink.runtime.jobgraph.JobVertex;
13+
import org.apache.flink.runtime.scheduler.SchedulerBase;
14+
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
15+
import org.apache.flink.runtime.scheduler.adaptive.JobGraphJobInformation;
16+
import org.apache.flink.runtime.scheduler.adaptive.allocator.*;
17+
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
18+
19+
import java.util.Collection;
20+
import java.util.stream.Collectors;
21+
22+
import static org.apache.flink.runtime.scheduler.SchedulerBase.computeVertexParallelismStore;
23+
24+
/** The benchmark of initializing {@link org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingResolver}. */
25+
public class SlotSharingResolverBenchmark {
26+
27+
private final JobInformation jobInformation;
28+
private final VertexParallelism vertexParallelism;
29+
private final TaskManagerLoadBalanceMode taskManagerLoadBalanceMode;
30+
31+
public SlotSharingResolverBenchmark(
32+
TaskManagerLoadBalanceMode taskManagerLoadBalanceMode, Collection<JobVertex> vertices) {
33+
this.taskManagerLoadBalanceMode = taskManagerLoadBalanceMode;
34+
final JobGraph jobGraph =
35+
JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertices(vertices).build();
36+
try {
37+
ExecutionGraph executionGraph =
38+
TestingDefaultExecutionGraphBuilder.newBuilder()
39+
.setJobGraph(jobGraph)
40+
.build(new DirectScheduledExecutorService());
41+
VertexParallelismStore vertexParallelismStore = computeVertexParallelismStore(jobGraph);
42+
this.jobInformation = new JobGraphJobInformation(jobGraph, vertexParallelismStore);
43+
this.vertexParallelism = new VertexParallelism(
44+
executionGraph.getAllVertices().values().stream()
45+
.collect(
46+
Collectors.toMap(
47+
AccessExecutionJobVertex::getJobVertexId,
48+
AccessExecutionJobVertex::getParallelism)));
49+
} catch (JobException | JobExecutionException e) {
50+
throw new RuntimeException(e);
51+
}
52+
}
53+
54+
public Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroup> invokeSlotSharingResolver() {
55+
SlotSharingResolver slotSharingResolver = createSlotSharingResolver();
56+
return slotSharingResolver.getExecutionSlotSharingGroups(jobInformation, vertexParallelism);
57+
}
58+
59+
private SlotSharingResolver createSlotSharingResolver() {
60+
switch (taskManagerLoadBalanceMode) {
61+
case NONE:
62+
return DefaultSlotSharingResolver.INSTANCE;
63+
case TASKS:
64+
return TaskBalancedSlotSharingResolver.INSTANCE;
65+
default:
66+
throw new UnsupportedOperationException(
67+
String.format(
68+
"Unsupported task manager load balance mode '%s' in %s",
69+
taskManagerLoadBalanceMode,
70+
getClass().getName()));
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)