有没有办法在一个线程中异步运行两个任务



我正在开发一款在主线程上运行密集操作的软件产品。设计不支持在单独的线程上运行它们,也不会更改。

同时,我们需要处理来自UI的鼠标移动。在一种情况下,鼠标光标冻结是因为主线程正忙于计算。

引入异步操作似乎是一个很好的例子:在一个单独的线程中异步运行计算,而主线程仍在处理鼠标移动。但正如我之前所说,目前的设计不支持它。

最近,我发现了一个在一个线程中异步运行两个任务的想法。这意味着线程上下文在两个任务之间切换,每个任务都会部分执行一段时间,直到每个任务都完成为止。

这在C++中可能吗?语言的版本(11或14(并不重要
该软件使用WinApi和标准消息队列来接收鼠标事件。

尝试查看Microsoft PPL,但据我所知,lib在这种情况下没有帮助
感谢大家的帮助。

您想要的是协作多任务。这在单个线程上是可能的。您可以查看协程,例如在boost或标准库中(从C++20开始(。

你也可以推出自己的精简版。关键因素是:

  • 每个任务都需要存储其上下文(例如参数(本身
  • 每个任务都需要一种暂停和恢复操作的方法。它自己决定何时暂停
  • 您可能需要某种形式的调度器来跟踪所有任务并经常运行它们。您可能希望以这样的方式设计它,即GUI主循环调用您的调度器,通过将可用时间预算传递给它跟踪的每个任务,调度器最多运行约30-50ms

如果线程根本不是一个选项,那么这是非常可行的。

Boost.Corutine、Boost.Context和Boost.Asio都支持某种级别的单线程并发。推论是协作的、可重入的、可中断的、可恢复的函数。上下文是用户-土地上下文切换。Asio执行器可以安排许多不同的任务在一个线程上运行。对于您的情况,我认为您可以自行选择在应用程序中放入哪些内容。

编辑

Fiber在上下文库的顶部实现了类似于迷你线程的"fibers"。

以下是我如何实现自己的从运行到完成的协作多任务处理:

enum class eStep
{
START,
STEP1,
STEP2,
DONE
};
struct sLongFuncContext
{
//whatver is meaning full to go from one step to the next
};
eStep long_func_split_in_steps(eStep aStep,sLongFuncContext &aContext)
{
eStep next;
switch (aStep)
{
case eStep::START:
// execute first part of func, save context
next = eStep::STEP1;
break;
case eStep::STEP1:
// execute 2nd part of func, save context
next = eStep::STEP2;
break;
case eStep::STEP2:
next = eStep::DONE;
break;
// repeat 
};
return (next);
}
int main()
{
eStep step = eStep::START;
sLongFuncContext context;
while (step != eStep::DONE)
{
// do a part of the long function
step = long_func_split_in_steps(step,context);
// handle mouse events
// ...
}

return 0;
}

由于您的目标是windows,但无法访问c++20协程(使用旧编译器(,因此可以使用winapiFibers,它就像重型协程一样。

这里记录了它:光纤Win32应用程序

这是一个使用它的例子:

#include <windows.h>
#include <tchar.h>
#include <stdio.h>
VOID
__stdcall
ReadFiberFunc(LPVOID lpParameter);
VOID
__stdcall
WriteFiberFunc(LPVOID lpParameter);
void DisplayFiberInfo(void);
typedef struct
{
DWORD dwParameter;          // DWORD parameter to fiber (unused)
DWORD dwFiberResultCode;    // GetLastError() result code
HANDLE hFile;               // handle to operate on
DWORD dwBytesProcessed;     // number of bytes processed
} FIBERDATASTRUCT, *PFIBERDATASTRUCT, *LPFIBERDATASTRUCT;
#define RTN_OK 0
#define RTN_USAGE 1
#define RTN_ERROR 13
#define BUFFER_SIZE 32768   // read/write buffer size
#define FIBER_COUNT 3       // max fibers (including primary)
#define PRIMARY_FIBER 0 // array index to primary fiber
#define READ_FIBER 1    // array index to read fiber
#define WRITE_FIBER 2   // array index to write fiber
LPVOID g_lpFiber[FIBER_COUNT];
LPBYTE g_lpBuffer;
DWORD g_dwBytesRead;
int __cdecl _tmain(int argc, TCHAR *argv[])
{
LPFIBERDATASTRUCT fs;
if (argc != 3)
{
printf("Usage: %s <SourceFile> <DestinationFile>n", argv[0]);
return RTN_USAGE;
}
//
// Allocate storage for our fiber data structures
//
fs = (LPFIBERDATASTRUCT) HeapAlloc(
GetProcessHeap(), 0,
sizeof(FIBERDATASTRUCT) * FIBER_COUNT);
if (fs == NULL)
{
printf("HeapAlloc error (%d)n", GetLastError());
return RTN_ERROR;
}
//
// Allocate storage for the read/write buffer
//
g_lpBuffer = (LPBYTE)HeapAlloc(GetProcessHeap(), 0, BUFFER_SIZE);
if (g_lpBuffer == NULL)
{
printf("HeapAlloc error (%d)n", GetLastError());
return RTN_ERROR;
}
//
// Open the source file
//
fs[READ_FIBER].hFile = CreateFile(
argv[1],
GENERIC_READ,
FILE_SHARE_READ,
NULL,
OPEN_EXISTING,
FILE_FLAG_SEQUENTIAL_SCAN,
NULL
);
if (fs[READ_FIBER].hFile == INVALID_HANDLE_VALUE)
{
printf("CreateFile error (%d)n", GetLastError());
return RTN_ERROR;
}
//
// Open the destination file
//
fs[WRITE_FIBER].hFile = CreateFile(
argv[2],
GENERIC_WRITE,
0,
NULL,
CREATE_NEW,
FILE_FLAG_SEQUENTIAL_SCAN,
NULL
);
if (fs[WRITE_FIBER].hFile == INVALID_HANDLE_VALUE)
{
printf("CreateFile error (%d)n", GetLastError());
return RTN_ERROR;
}
//
// Convert thread to a fiber, to allow scheduling other fibers
//
g_lpFiber[PRIMARY_FIBER]=ConvertThreadToFiber(&fs[PRIMARY_FIBER]);
if (g_lpFiber[PRIMARY_FIBER] == NULL)
{
printf("ConvertThreadToFiber error (%d)n", GetLastError());
return RTN_ERROR;
}
//
// Initialize the primary fiber data structure.  We don't use
// the primary fiber data structure for anything in this sample.
//
fs[PRIMARY_FIBER].dwParameter = 0;
fs[PRIMARY_FIBER].dwFiberResultCode = 0;
fs[PRIMARY_FIBER].hFile = INVALID_HANDLE_VALUE;
//
// Create the Read fiber
//
g_lpFiber[READ_FIBER]=CreateFiber(0,ReadFiberFunc,&fs[READ_FIBER]);
if (g_lpFiber[READ_FIBER] == NULL)
{
printf("CreateFiber error (%d)n", GetLastError());
return RTN_ERROR;
}
fs[READ_FIBER].dwParameter = 0x12345678;
//
// Create the Write fiber
//
g_lpFiber[WRITE_FIBER]=CreateFiber(0,WriteFiberFunc,&fs[WRITE_FIBER]);
if (g_lpFiber[WRITE_FIBER] == NULL)
{
printf("CreateFiber error (%d)n", GetLastError());
return RTN_ERROR;
}
fs[WRITE_FIBER].dwParameter = 0x54545454;
//
// Switch to the read fiber
//
SwitchToFiber(g_lpFiber[READ_FIBER]);
//
// We have been scheduled again. Display results from the 
// read/write fibers
//
printf("ReadFiber: result code is %lu, %lu bytes processedn",
fs[READ_FIBER].dwFiberResultCode, fs[READ_FIBER].dwBytesProcessed);
printf("WriteFiber: result code is %lu, %lu bytes processedn",
fs[WRITE_FIBER].dwFiberResultCode, fs[WRITE_FIBER].dwBytesProcessed);
//
// Delete the fibers
//
DeleteFiber(g_lpFiber[READ_FIBER]);
DeleteFiber(g_lpFiber[WRITE_FIBER]);
//
// Close handles
//
CloseHandle(fs[READ_FIBER].hFile);
CloseHandle(fs[WRITE_FIBER].hFile);
//
// Free allocated memory
//
HeapFree(GetProcessHeap(), 0, g_lpBuffer);
HeapFree(GetProcessHeap(), 0, fs);
return RTN_OK;
}
VOID
__stdcall
ReadFiberFunc(
LPVOID lpParameter
)
{
LPFIBERDATASTRUCT fds = (LPFIBERDATASTRUCT)lpParameter;
//
// If this fiber was passed NULL for fiber data, just return,
// causing the current thread to exit
//
if (fds == NULL)
{
printf("Passed NULL fiber data; exiting current thread.n");
return;
}
//
// Display some information pertaining to the current fiber
//
DisplayFiberInfo();
fds->dwBytesProcessed = 0;
while (1)
{
//
// Read data from file specified in the READ_FIBER structure
//
if (!ReadFile(fds->hFile, g_lpBuffer, BUFFER_SIZE, 
&g_dwBytesRead, NULL))
{
break;
}
//
// if we reached EOF, break
//
if (g_dwBytesRead == 0) break;
//
// Update number of bytes processed in the fiber data structure
//
fds->dwBytesProcessed += g_dwBytesRead;
//
// Switch to the write fiber
//
SwitchToFiber(g_lpFiber[WRITE_FIBER]);
} // while
//
// Update the fiber result code
//
fds->dwFiberResultCode = GetLastError();
//
// Switch back to the primary fiber
//
SwitchToFiber(g_lpFiber[PRIMARY_FIBER]);
}
VOID
__stdcall
WriteFiberFunc(
LPVOID lpParameter
)
{
LPFIBERDATASTRUCT fds = (LPFIBERDATASTRUCT)lpParameter;
DWORD dwBytesWritten;
//
// If this fiber was passed NULL for fiber data, just return,
// causing the current thread to exit
//
if (fds == NULL)
{
printf("Passed NULL fiber data; exiting current thread.n");
return;
}
//
// Display some information pertaining to the current fiber
//
DisplayFiberInfo();
//
// Assume all writes succeeded.  If a write fails, the fiber
// result code will be updated to reflect the reason for failure
//
fds->dwBytesProcessed = 0;
fds->dwFiberResultCode = ERROR_SUCCESS;
while (1)
{
//
// Write data to the file specified in the WRITE_FIBER structure
//
if (!WriteFile(fds->hFile, g_lpBuffer, g_dwBytesRead, 
&dwBytesWritten, NULL))
{
//
// If an error occurred writing, break
//
break;
}
//
// Update number of bytes processed in the fiber data structure
//
fds->dwBytesProcessed += dwBytesWritten;
//
// Switch back to the read fiber
//
SwitchToFiber(g_lpFiber[READ_FIBER]);
}  // while
//
// If an error occurred, update the fiber result code...
//
fds->dwFiberResultCode = GetLastError();
//
// ...and switch to the primary fiber
//
SwitchToFiber(g_lpFiber[PRIMARY_FIBER]);
}
void
DisplayFiberInfo(
void
)
{
LPFIBERDATASTRUCT fds = (LPFIBERDATASTRUCT)GetFiberData();
LPVOID lpCurrentFiber = GetCurrentFiber();
//
// Determine which fiber is executing, based on the fiber address
//
if (lpCurrentFiber == g_lpFiber[READ_FIBER])
printf("Read fiber entered");
else
{
if (lpCurrentFiber == g_lpFiber[WRITE_FIBER])
printf("Write fiber entered");
else
{
if (lpCurrentFiber == g_lpFiber[PRIMARY_FIBER])
printf("Primary fiber entered");
else
printf("Unknown fiber entered");
}
}
//
// Display dwParameter from the current fiber data structure
//
printf(" (dwParameter is 0x%lx)n", fds->dwParameter);
}

鉴于您正在使用winapi和UI,因此您已经有了消息处理,我建议您将有问题的操作分解为更多步骤,并使用自定义消息。让有问题的操作中的每一步都发布触发下一步的消息。由于这是windows已经处理(处理消息(的事情,因此与尝试使用协同程序或windows光纤相比,它应该更适合您已经拥有的内容。

这将在一定程度上减慢有问题操作的整体处理速度,但将保持UI的响应性。

然而,我也会认真考虑放弃单线程方法。如果有问题的操作只是接受输入并产生输出,那么将该操作推到一个单独的线程上并在结果出现时处理(再次通过发布的消息(通常是非常合理的解决方案。