java中的OpenAI流响应



OpenAIchat/completionsAPI通过传递stream=true支持流响应。这使得响应以数据块的形式出现,而不是一个完整的响应。这将类似于ChatGPT输出答案的方式。

我看到不同语言或技术堆栈的类似问题(如这里)。但我找不到一个完整的例子,如何实现在java(或kotlin)与OkHttp客户端。

如何以流的形式获得OpenAI的API响应?谢谢!

我修改了代码,现在ChatGPT完全可以通过Java在控制台实现流对话。

import com.theokanning.openai.completion.chat.*;
import com.theokanning.openai.service.OpenAiService;
import io.reactivex.Flowable;
import java.util.*;
public class Starter {
// Create a new OpenAiService instance with the given API key
public static OpenAiService service = new OpenAiService("Your API Token");
public static void main(String[] args) {
System.out.println("Streaming chat completion...");
Scanner scanner = new Scanner(System.in);
String userInput = scanner.nextLine();
// Create a list of ChatMessage objects
List<ChatMessage> message = new ArrayList<ChatMessage>();
message.add(new ChatMessage(ChatMessageRole.USER.value(), userInput));
// Create a ChatCompletionRequest object
ChatCompletionRequest chatCompletionRequest;
boolean running = true;
// Run the loop until the user enters "exit"
while (running) {
chatCompletionRequest = ChatCompletionRequest
.builder()
.model("gpt-3.5-turbo")
.messages(message)
.n(1)
.maxTokens(500)
.logitBias(Collections.emptyMap())
.build();
// Create a Flowable object to stream the chat completion
Flowable<ChatCompletionChunk> flowableResult = service.streamChatCompletion(chatCompletionRequest);
// Create a StringBuilder object to store the result
StringBuilder buffer = new StringBuilder();
// Subscribe to the Flowable object and print the result
flowableResult.subscribe(chunk -> {
chunk.getChoices().forEach(choice -> {
String result = choice.getMessage().getContent();
if (result != null) {
buffer.append(result);
System.out.print(choice.getMessage().getContent());
}
});
}, Throwable::printStackTrace, () -> System.out.println());
// Get the user input
userInput = scanner.nextLine();
// Add the user input to the list of ChatMessage objects
message.add(new ChatMessage(ChatMessageRole.SYSTEM.value(), buffer.toString()));
message.add(new ChatMessage(ChatMessageRole.USER.value(), userInput));
// Exit the loop if the user enters "exit"
if (userInput.equals("exit")) {
running = false;
}
}
scanner.close();
service.shutdownExecutor();
}
}

这是使用java接收来自ChatGPT的流响应的okhttp:

public static void okHttpEvent(SseEmitter emitter, String prompt, String openAiKey) {
Request request = new Request.Builder()
.url("https://{open-api-address}/v1/chat/stream/completions?q=" + prompt + "&eid=" + RequestContext.getRequestId())
.build();
OkHttpClient okHttpClient = new OkHttpClient.Builder().addInterceptor(new Interceptor() {
@NonNull
@Override
public Response intercept(@NonNull Interceptor.Chain chain) throws IOException {
Request original = chain.request();
Request request = original.newBuilder()
.header("Authorization", "Bearer " + openAiKey)
.header("Accept", MediaType.TEXT_EVENT_STREAM.toString())
.method(original.method(), original.body())
.build();
return chain.proceed(request);
}
})
.connectTimeout(3, TimeUnit.MINUTES)
.readTimeout(3, TimeUnit.MINUTES)
.build();
RealEventSource realEventSource = new RealEventSource(request, new EventSourceListener() {
@Override
public void onOpen(EventSource eventSource, Response response) {
}
@Override
public void onEvent(EventSource eventSource, String id, String type, String data) {
try {
HttpUrl url = eventSource.request().url();
String reqId = url.queryParameter("eid");
SseChatService.getInstance().appendMsg(data, reqId);
emitter.send(data);
} catch (IOException e) {
emitter.completeWithError(e);
log.error("send sse to client error", e);
}
}
@Override
public void onClosed(EventSource eventSource) {
emitter.complete();
}
@Override
public void onFailure(EventSource eventSource, Throwable t, Response response) {
emitter.completeWithError(t);
log.error("event source failure", t);
}
});
realEventSource.connect(okHttpClient);
}

我有一个使用Flux模块的工作示例,它类似于OkHttp,所以希望它可以帮助。

示例使用Open AI聊天流并将其发送回客户端。确保将openAIAPIKey变量设置为您的键。

import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.core.io.buffer.DataBuffer;
import reactor.core.publisher.Flux;
import org.springframework.http.*;
// ... class code
public Flux<Object> chatStream(OpenAIChatBody requestBody) {
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.setBearerAuth(openAIAPIKey);
WebClient client = WebClient.create("https://api.openai.com/v1");
// Send the request to openAI
return client.post()
.uri("/chat/completions")
.headers(httpHeaders -> httpHeaders.addAll(headers))
.bodyValue(requestBody)
.retrieve()
.bodyToFlux(DataBuffer.class)
.map(dataBuffer -> dataBuffer.toString(StandardCharsets.UTF_8))
.concatMap(chunk -> {
String[] lines = chunk.split("n");
return Flux.fromArray(lines)
.filter(line -> !line.trim().isEmpty())
.map(line -> line.replace("data:", "")
.replace("[DONE]", "")
.replace("data: [DONE]", "")
.trim());
})
.filter(data -> !data.isEmpty())
.concatMap(data -> {
try {
Map<String, Object> resultObject = new ObjectMapper().readValue(data, Map.class);
// return data - you can parse the resultObject to whatever it is that you need
return Flux.just(resultObject);
} catch (JsonProcessingException e) {
return null;
}
});
} catch (Exception e) {
return null;
}
}

您需要在pom.xml文件中包含以下依赖项:

<dependencies>
<!-- ... other dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- This is for flux to stop displaying a potential DNS warning for MACs -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver-dns-native-macos</artifactId>
<version>4.1.73.Final</version>
<classifier>osx-aarch_64</classifier>
</dependency>
</dependencies>

所有这些代码都可以在下面的示例项目的工作文件中找到。

最新更新