我试图在控制器中流式传输特定方法的响应,但它没有给我任何数据。有没有更好的方法来实现这一点?
方法是streamService.getLocationData((;
预期输出-
{uncertaintyRadiusInMeters=[64], latitude=[12.345678], longitude=[-12.345678], dateTimeCaptured=[1623431888949]}
SpringBoot控制器-
@RequestMapping(value = "/request-location", method= {RequestMethod.GET, RequestMethod.POST}, produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<StreamingResponseBody> requestLocation(@RequestParam MultiValueMap<String,String> form) {
streamService.addDataToStream(form);
streamService.buildShardIterator();
StreamingResponseBody responseBody = outputStream -> streamService.getLocationData();
return ResponseEntity.ok()
.contentType(MediaType.valueOf(MediaType.APPLICATION_JSON_VALUE))
.body(responseBody);
//return streamService.getLocationData((;}
服务-
public ResponseEntity getLocationData() {
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey);
AmazonKinesis kinesisClient = AmazonKinesisClient.builder()
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
.withRegion(awsRegion)
.build();
GetRecordsRequest recordsRequest = new GetRecordsRequest();
recordsRequest.setShardIterator(shardIterator.getShardIterator());
recordsRequest.setLimit(1000);
GetRecordsResult recordsResult = kinesisClient.getRecords(recordsRequest);
while (!recordsResult.getRecords().isEmpty()) {
recordsResult.getRecords().stream()
.map(record -> new String(record.getData().array()))
.forEach(System.out::println);
recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
kinesisClient.getRecords(recordsRequest);
try {
Thread.sleep(1000);
}
catch (InterruptedException exception) {
throw new RuntimeException(exception);
}
}
return ResponseEntity.ok(recordsResult);
}
您正在为该服务使用旧的V1 API。亚马逊非常建议转移到V2:
Java2.x的AWS SDK是对1.x版本代码库的主要重写。它构建在Java8+之上,并添加了一些经常需要的功能。其中包括对非阻塞I/O的支持,以及在运行时插入不同HTTP实现的能力
Kinesis V2的例子是有效的。要将记录放入数据流,请使用以下Java代码:
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
//snippet-end:[kinesis.java2.putrecord.import]
/**
* To run this Java V2 code example, ensure that you have setup your development environment, including your credentials.
*
* For information, see this documentation topic:
*
* https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
*/
public class StockTradesWriter {
public static void main(String[] args) {
final String USAGE = "n" +
"Usage:n" +
" StockTradesWriter <streamName>nn" +
"Where:n" +
" streamName - The Amazon Kinesis data stream to which records are written (for example, StockTradeStream)nn" +
"Example:n" +
" StockTradesWriter streamNamen";
if (args.length != 1) {
System.out.println(USAGE);
System.exit(1);
}
String streamName = args[0];
Region region = Region.US_EAST_1;
KinesisClient kinesisClient = KinesisClient.builder()
.region(region)
.build();
// Ensure that the Kinesis Stream is valid
validateStream(kinesisClient, streamName);
setStockData( kinesisClient, streamName);
kinesisClient.close();
}
// snippet-start:[kinesis.java2.putrecord.main]
public static void setStockData( KinesisClient kinesisClient, String streamName) {
try {
// Repeatedly send stock trades with a 100 milliseconds wait in between
StockTradeGenerator stockTradeGenerator = new StockTradeGenerator();
// Put in 50 Records for this example
int index = 50;
for (int x=0; x<index; x++){
StockTrade trade = stockTradeGenerator.getRandomTrade();
sendStockTrade(trade, kinesisClient, streamName);
Thread.sleep(100);
}
} catch (KinesisException | InterruptedException e) {
System.err.println(e.getMessage());
System.exit(1);
}
System.out.println("Done");
}
private static void sendStockTrade(StockTrade trade, KinesisClient kinesisClient,
String streamName) {
byte[] bytes = trade.toJsonAsBytes();
// The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library.
if (bytes == null) {
System.out.println("Could not get JSON bytes for stock trade");
return;
}
System.out.println("Putting trade: " + trade.toString());
PutRecordRequest request = PutRecordRequest.builder()
.partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below.
.streamName(streamName)
.data(SdkBytes.fromByteArray(bytes))
.build();
try {
kinesisClient.putRecord(request);
} catch (KinesisException e) {
e.getMessage();
}
}
private static void validateStream(KinesisClient kinesisClient, String streamName) {
try {
DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
.streamName(streamName)
.build();
DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest);
if(!describeStreamResponse.streamDescription().streamStatus().toString().equals("ACTIVE")) {
System.err.println("Stream " + streamName + " is not active. Please wait a few moments and try again.");
System.exit(1);
}
}catch (KinesisException e) {
System.err.println("Error found while describing the stream " + streamName);
System.err.println(e);
System.exit(1);
}
// snippet-end:[kinesis.java2.putrecord.main]
}
}
要从数据流中检索数据,请使用以下代码:
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import java.util.ArrayList;
import java.util.List;
//snippet-end:[kinesis.java2.getrecord.import]
/**
* Demonstrates how to read data from a Amazon Kinesis Data Stream. Before running this Java code example, populate a Data Stream
* by running the StockTradesWriter example. That example populates a Data Stream that you can then use for this example.
* Also, ensure that you have setup your development environment, including your credentials.
*
* For information, see this documentation topic:
*
* https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html
*/
public class GetRecords {
public static void main(String[] args) {
final String USAGE = "n" +
"Usage:n" +
" GetRecords <streamName>nn" +
"Where:n" +
" streamName - The Amazon Kinesis data stream to read from (for example, StockTradeStream).nn" +
"Example:n" +
" GetRecords streamNamen";
if (args.length != 1) {
System.out.println(USAGE);
System.exit(1);
}
String streamName = args[0];
Region region = Region.US_EAST_1;
KinesisClient kinesisClient = KinesisClient.builder()
.region(region)
.build();
getStockTrades(kinesisClient,streamName);
kinesisClient.close();
}
// snippet-start:[kinesis.java2.getrecord.main]
public static void getStockTrades(KinesisClient kinesisClient, String streamName) {
String shardIterator;
String lastShardId = null;
// Retrieve the Shards from a Stream
DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
.streamName(streamName)
.build();
List<Shard> shards = new ArrayList<>();
DescribeStreamResponse streamRes;
do {
streamRes = kinesisClient.describeStream(describeStreamRequest);
shards.addAll(streamRes.streamDescription().shards());
if (shards.size() > 0) {
lastShardId = shards.get(shards.size() - 1).shardId();
}
} while (streamRes.streamDescription().hasMoreShards());
GetShardIteratorRequest itReq = GetShardIteratorRequest.builder()
.streamName(streamName)
.shardIteratorType("TRIM_HORIZON")
.shardId(shards.get(0).shardId())
.build();
GetShardIteratorResponse shardIteratorResult = kinesisClient.getShardIterator(itReq);
shardIterator = shardIteratorResult.shardIterator();
// Continuously read data records from shard.
List<Record> records;
// Create new GetRecordsRequest with existing shardIterator.
// Set maximum records to return to 1000.
GetRecordsRequest recordsRequest = GetRecordsRequest.builder()
.shardIterator(shardIterator)
.limit(1000)
.build();
GetRecordsResponse result = kinesisClient.getRecords(recordsRequest);
// Put result into record list. Result may be empty.
records = result.records();
// Print records
for (Record record : records) {
SdkBytes byteBuffer = record.data();
System.out.println(String.format("Seq No: %s - %s", record.sequenceNumber(),
new String(byteBuffer.asByteArray())));
}
}
// snippet-end:[kinesis.java2.getrecord.main]
}
你可以在这里找到Kinesis的所有V2示例:
https://github.com/awsdocs/aws-doc-sdk-examples/tree/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis
如果你从未使用过V2 API,那么我建议从这里开始:
开始使用AWS SDK for Java 2.x