

我在SCons FAQ上找到了这个建议:




我们在scons discord中讨论了这个问题,并提出了一个示例,该示例将设置同步测试运行程序,以确保在运行测试时没有其他任务正在运行。


import SCons
# A bound map of stream (as in stream of work) name to side-effect
# file. Since SCons will not allow tasks with a shared side-effect
# to execute concurrently, this gives us a way to limit link jobs
# independently of overall SCons concurrency.
node_map = dict()
# A list of nodes that have to be run synchronously.
# sync node ensures the test runners are syncrhonous amongst
# themselves.
sync_nodes = list()
# this emitter will make a phony sideeffect per target
# the test builders will share all the other sideeffects making
# sure the tests only run when nothing else is running.
def sync_se_emitter(target, source, env):
name = str(target[0])
se_name = "#unique_node_" + str(hash(name))
se_node = node_map.get(se_name, None)
if not se_node:
se_node = env.Entry(se_name)
# This may not be necessary, but why chance it
node_map[se_name] = se_node
for sync_node in sync_nodes:
env.SideEffect(se_name, sync_node)
env.SideEffect(se_node, target)
return (target, source)
# here we force all builders to use the emitter, so all
# targets will respect the shared sideeffect when being built.
# NOTE: that the builders which should be synchronous must be listed
# by name, as SynchronousTestRunner is in this example
original_create_nodes = SCons.Builder.BuilderBase._create_nodes
def always_emitter_create_nodes(self, env, target = None, source = None):
if self.get_name(env) != "SynchronousTestRunner":
if self.emitter:
self.emitter = SCons.Builder.ListEmitter([self.emitter, sync_se_emitter])
self.emitter = SCons.Builder.ListEmitter([sync_se_emitter])
return original_create_nodes(self, env, target, source)
SCons.Builder.BuilderBase._create_nodes = always_emitter_create_nodes

env = Environment()
nodes = []
# this is a fake test runner which acts like its running a test
env['BUILDERS']["SynchronousTestRunner"] = SCons.Builder.Builder(
"sleep 1",
"echo Starting test $TARGET",
"sleep 5",
"echo Finished test $TARGET",
'echo done > $TARGET'],
# this emitter connects the test runners with the shared sideeffect
def sync_test_emitter(target, source, env):
for name in node_map:
env.SideEffect(name, target)
return (target, source)
env['BUILDERS']["SynchronousTestRunner"].emitter = SCons.Builder.ListEmitter([sync_test_emitter])
# in this test we create two test runners and make them depend on various source files
# being generated. This is just to force the tests to be run in the middle of
# the build. This will allow the example to demonstrate that all other jobs
# have paused so the test can be performed.
env.SynchronousTestRunner("test.out", "source10.c")
env.SynchronousTestRunner("test2.out", "source62.c")
for i in range(50):
nodes.append(env.Textfile(f"source{i}.c", f"int func{i}(){{return {i};}}"))
for i in range(50, 76):
node = env.Textfile(f"source{i}.c", f"int func{i}(){{return {i};}}")
env.Depends(node, "test.out")
for i in range(76, 100):
node = env.Textfile(f"source{i}.c", f"int func{i}(){{return {i};}}")
env.Depends(node, "test2.out")
nodes.append(env.Textfile('main.c', 'int main(){return 0;}'))
env.Program('out', nodes)



# Allows using functions `SyncBuilder` and `Environment.SyncCommand`.
from SyncBuild import SyncBuilder


from SCons.Builder import Builder, BuilderBase, ListEmitter
from SCons.Environment import Base as BaseEnvironment
# This code allows to build some targets synchronously, which means there won't
# be anything else built at the same time even if SCons is run with flag `-j`.
# This is achieved by adding a different dummy values as side effect of each
# target. (These files won't be created. They are only a way of enforcing
# constraints on SCons.)
# Then the files that need to be built synchronously have added every dummy
# value from the entire configuration as a side effect, which effectively
# prevents it from being built along with any other file.
# To create a synchronous target use `SyncBuilder`.
__processed_targets = set()
__lock_values = []
__synchronous_nodes = []
def __add_emiter_to_builder(builder, emitter):
if builder.emitter:
if isinstance(builder.emitter, ListEmitter):
if not any(x is emitter for x in builder.emitter):
builder.emitter = ListEmitter([builder.emitter, emitter])
builder.emitter = ListEmitter([emitter])
def __individual_sync_locks_emiter(target, source, env):
if not target or target[0] not in __processed_targets:
lock_value = env.Value(f'.#sync_lock_{len(__lock_values)}#')
env.SideEffect(lock_value, target + __synchronous_nodes)
return target, source
__original_create_nodes = BuilderBase._create_nodes
def __create_nodes_adding_emiter(self, *args, **kwargs):
__add_emiter_to_builder(self, __individual_sync_locks_emiter)
return __original_create_nodes(self, *args, **kwargs)
BuilderBase._create_nodes = __create_nodes_adding_emiter
def _all_sync_locks_emitter(target, source, env):
env.SideEffect(__lock_values, target)
return (target, source)
def SyncBuilder(*args, **kwargs):
"""It works like the normal `Builder` except it prevents the targets from
being built at the same time as any other target."""
target = Builder(*args, **kwargs)
__add_emiter_to_builder(target, _all_sync_locks_emitter)
return target
def __SyncBuilder(self, *args, **kwargs):
"""It works like the normal `Builder` except it prevents the targets from
being built at the same time as any other target."""
target = self.Builder(*args, **kwargs)
__add_emiter_to_builder(target, _all_sync_locks_emitter)
return target
BaseEnvironment.SyncBuilder = __SyncBuilder
def __SyncCommand(self, *args, **kwargs):
"""It works like the normal `Command` except it prevents the targets from
being built at the same time as any other target."""
target = self.Command(*args, **kwargs)
_all_sync_locks_emitter(target, [], self)
return target
BaseEnvironment.SyncCommand = __SyncCommand


env = Environment()
nodes = []
# this is a fake test runner which acts like its running a test
env['BUILDERS']["SynchronousTestRunner"] = SyncBuilder(
"sleep 1",
"echo Starting test $TARGET",
"sleep 5",
"echo Finished test $TARGET",
'echo done > $TARGET'],
# in this test we create two test runners and make them depend on various source files
# being generated. This is just to force the tests to be run in the middle of
# the build. This will allow the example to demonstrate that all other jobs
# have paused so the test can be performed.
env.SynchronousTestRunner("test.out", "source10.c")
env.SynchronousTestRunner("test2.out", "source62.c")
for i in range(50):
nodes.append(env.Textfile(f"source{i}.c", f"int func{i}(){{return {i};}}"))
for i in range(50, 76):
node = env.Textfile(f"source{i}.c", f"int func{i}(){{return {i};}}")
env.Depends(node, "test.out")
for i in range(76, 100):
node = env.Textfile(f"source{i}.c", f"int func{i}(){{return {i};}}")
env.Depends(node, "test2.out")
nodes.append(env.Textfile('main.c', 'int main(){return 0;}'))
env.Program('out', nodes)

