我有 3 个线程:
一个从 Internet 下载音频和音频时间表(Schedule)对象的线程;一个根据音频时间表播放音频的线程;以及一个 WebSocket"通知"监听器线程,它监听的消息表示我们必须下载新的音频和时间表,以替换当前的音频和时间表。
程序流程如下:应用程序启动时,ScheduleDownloader 启动并下载音频和时间表文件。下载完成后,它需要告诉音频播放器"嘿,文件已经准备就绪,这是时间表",此后它暂时不需要再做任何事情。
AudioPlayer 启动后持续循环,没有退出条件。WebSocket 监听器启动后,每当收到消息,就应该告诉 ScheduleDownloader"你需要重新启动,因为有新文件需要下载";它不需要向 ScheduleDownloader 发送任何数据,只需让它重新运行即可。在此期间音乐应该保持播放。下载完成后,应该用新的时间表重新启动音频播放器线程。
这是我到目前为止写的代码。我不确定如何让 ScheduleDownloader 告诉 AudioPlayer"文件已准备就绪,你需要启动,这是时间表"或"你需要用新的时间表重新启动,这是新的时间表",也不确定如何让监听器通知"ScheduleDownloader,你需要重新开始"。
/**
 * Worker that fetches today's schedule from the schedule API and then downloads every music
 * and advertisement file referenced by that schedule from S3 into the working directory.
 *
 * <p>The class extends {@link Thread} for backward compatibility, but the work runs on an
 * internal thread created by {@link #start()}. Because a fresh internal thread is created
 * whenever the previous one has finished, the downloader can be started again later.
 */
public class ScheduleDownloader extends Thread {
    /** Pause between attempts so a failing server or download is not retried in a tight loop. */
    private static final long RETRY_DELAY_MS = 1000L;
    /** Size of the chunk used when copying a downloaded object to disk. */
    private static final int COPY_BUFFER_SIZE = 8192;

    private Thread t;
    private String threadName;
    String username;
    String password;

    /**
     * @param username   user name sent to the schedule API as basic credentials
     * @param password   password sent to the schedule API as basic credentials
     * @param threadName name given to the internal worker thread
     */
    public ScheduleDownloader(String username, String password, String threadName) {
        this.username = username;
        this.password = password;
        this.threadName = threadName;
    }

    /**
     * Starts the download on an internal thread. Does nothing if a download is still running;
     * starts a new run if no run was started yet or the previous run has finished.
     */
    @Override
    public synchronized void start() {
        System.out.println("Starting " + threadName);
        // A finished Thread cannot be restarted, so a new one is created for every run.
        if (t == null || !t.isAlive()) {
            t = new Thread(this, threadName);
            t.start();
        }
    }

    /**
     * Polls for today's schedule until one is returned, then downloads its files until every
     * download succeeds. An {@link IOException} from the schedule request ends the run; an
     * interrupt ends the run and leaves the interrupt flag set.
     */
    @Override
    public void run() {
        try {
            Schedule schedule = null;
            while (schedule == null) {
                System.out.println("Searching for schedule");
                schedule = getTodaysSchedule();
                if (schedule == null) {
                    Thread.sleep(RETRY_DELAY_MS);
                }
            }
            System.out.println("Schedule Found");
            while (!downloadFiles(schedule)) {
                Thread.sleep(RETRY_DELAY_MS);
            }
            System.out.println("Files Downloaded");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the flag so whoever owns this thread can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Requests today's schedule from the schedule API and parses the JSON response.
     *
     * @return the parsed schedule, or {@code null} if the server did not answer with status 200
     *         (or the body parsed to nothing)
     * @throws IOException if the request or reading the response fails
     */
    public Schedule getTodaysSchedule() throws IOException {
        CredentialsProvider provider = new BasicCredentialsProvider();
        UsernamePasswordCredentials credentials =
                new UsernamePasswordCredentials(username, password);
        provider.setCredentials(AuthScope.ANY, credentials);
        String url = "http://localhost:5000/api/schedule/today";
        // NOTE(review): a new client is built on every call and never closed; consider sharing
        // one client or closing it once the response has been consumed.
        HttpClient httpClient =
                HttpClientBuilder.create().setDefaultCredentialsProvider(provider).build();
        HttpResponse response = httpClient.execute(new HttpGet(url));

        int statusCode = response.getStatusLine().getStatusCode();
        if (statusCode != 200) {
            System.out.println("sorry error:" + statusCode);
            return null;
        }

        // Read the response body; the reader is closed even if reading fails.
        StringBuilder body = new StringBuilder();
        try (BufferedReader rd = new BufferedReader(new InputStreamReader(
                response.getEntity().getContent(),
                // JSON is UTF-8; do not depend on the platform default charset.
                java.nio.charset.StandardCharsets.UTF_8))) {
            String line;
            while ((line = rd.readLine()) != null) {
                body.append(line);
            }
        }
        // Change the JSON response into Java objects.
        return new Gson().fromJson(body.toString(), Schedule.class);
    }

    /**
     * Downloads every music and advertisement file referenced by the schedule. All files are
     * attempted even if one of them fails.
     *
     * @param schedule schedule whose music and advertisement items are downloaded
     * @return {@code true} only if every file is present locally afterwards
     */
    public static boolean downloadFiles(Schedule schedule) {
        boolean allDownloaded = true;
        // get the music
        for (int i = 0; i < schedule.getMusicScheduleItems().size(); i++) {
            allDownloaded &= downloadOneFile("shoutloudaudio", "music/"
                    + schedule.getMusicScheduleItems().get(i).getMusic().getId()
                    + "-music.wav");
        }
        // get the advertisements
        for (int i = 0; i < schedule.getAdvertisementScheduleItems().size(); i++) {
            allDownloaded &= downloadOneFile("shoutloudaudio", "advertisements/"
                    + schedule.getAdvertisementScheduleItems().get(i).getAdvertisement().getId()
                    + "-advertisement.wav");
        }
        return allDownloaded;
    }

    /**
     * Downloads one S3 object into the working directory unless a file with the target name
     * already exists.
     *
     * @param bucketName S3 bucket to read from
     * @param key        object key; the local file is named after the part following the slash
     * @return {@code true} if the file already existed or was downloaded completely,
     *         {@code false} if writing or reading failed
     */
    public static boolean downloadOneFile(String bucketName, String key) {
        File file = new File(localFileName(key));
        if (file.isFile()) {
            // check if we have it already and dont need to download it
            System.out.println(key + " alraeady exits");
            return true;
        }
        AWSCredentials awsCredentials = new BasicAWSCredentials(
                "removed",
                "removed"
        );
        AmazonS3 s3client = AmazonS3ClientBuilder
                .standard()
                .withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
                .withRegion(Regions.EU_WEST_1)
                .build();
        S3Object s3object = s3client.getObject(bucketName, key);

        // Both streams are closed on every path, including a failure in the middle of the copy.
        try (InputStream reader = new BufferedInputStream(s3object.getObjectContent());
             OutputStream writer = new BufferedOutputStream(new FileOutputStream(file))) {
            byte[] buffer = new byte[COPY_BUFFER_SIZE];
            int read;
            while ((read = reader.read(buffer)) != -1) {
                writer.write(buffer, 0, read);
            }
            writer.flush();
        } catch (IOException e) {
            e.printStackTrace();
            // Remove a partial file; otherwise the next attempt would treat it as complete.
            if (file.isFile()) {
                file.delete();
            }
            return false;
        }
        return true;
    }

    /** Returns the path segment after the first slash of the key, or the key itself if none. */
    private static String localFileName(String key) {
        String[] parts = key.split("/");
        return parts.length > 1 ? parts[1] : key;
    }
}
AudioPlayer 类:
/**
 * Plays the schedule's music files as one continuously looping clip and interrupts the music
 * to play advertisements at the times listed in the schedule.
 *
 * <p>The class extends {@link Thread}, but {@link #start()} runs the work on an internal thread.
 */
public class AudioPlayer extends Thread {
    /** Pause between two checks of the advertisement schedule; times have minute granularity. */
    private static final long ADVERTISEMENT_POLL_INTERVAL_MS = 1000L;

    // Saved playback positions in microseconds (despite the "frame" in the names).
    Long currentFrameMusic;
    Long currentFrameAdvertisement;
    Clip clipMusic;
    Clip clipAdvertisement;
    private Thread t;
    // current status of each clip: "play" or "paused"
    String statusMusic;
    String statusAdvertisement;
    static AudioInputStream musicInputStream;
    static AudioInputStream advertisementInputStream;
    static String filePath;
    Schedule schedule;

    /**
     * Combines the schedule's music files into one stream and opens a looping clip on it.
     * Playback does not begin until {@link #playMusic()} is called.
     *
     * @param schedule schedule providing the music and advertisement items
     */
    public AudioPlayer(Schedule schedule)
            throws UnsupportedAudioFileException, IOException, LineUnavailableException {
        this.schedule = schedule;
        appendMusicFiles(schedule);
        clipMusic = AudioSystem.getClip();
        clipMusic.open(musicInputStream);
        clipMusic.loop(Clip.LOOP_CONTINUOUSLY);
    }

    /** Starts the music and then polls the advertisement schedule until failure or interrupt. */
    @Override
    public void run() {
        playMusic();
        try {
            checkShouldWePlayAnAdvertisement();
        } catch (IOException | UnsupportedAudioFileException | LineUnavailableException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the flag so the owner of this thread can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /** Runs this player on a new internal thread named "AudioPlayerThread". */
    @Override
    public void start() {
        t = new Thread(this, "AudioPlayerThread");
        t.start();
    }

    /** Runs the player on the calling thread instead of a new one. */
    public void start2() throws IOException, UnsupportedAudioFileException,
            LineUnavailableException, InterruptedException {
        playMusic();
        checkShouldWePlayAnAdvertisement();
    }

    /** Starts (or continues) the music clip. */
    public void playMusic() {
        clipMusic.start();
        statusMusic = "play";
    }

    /** Pauses the music and remembers where to resume. */
    public void pauseMusic() {
        if ("paused".equals(statusMusic)) {
            System.out.println("audio is already paused");
            return;
        }
        // The line position counts everything rendered since the clip was opened, so on a
        // looping clip it can exceed the clip length; wrap it to a position inside the clip.
        long position = clipMusic.getMicrosecondPosition();
        long length = clipMusic.getMicrosecondLength();
        this.currentFrameMusic = length > 0 ? position % length : position;
        clipMusic.stop();
        statusMusic = "paused";
        System.out.println("pausing music");
    }

    /** Re-opens the music clip and continues playback from the position saved on pause. */
    public void resumeAudioMusic() throws UnsupportedAudioFileException,
            IOException, LineUnavailableException {
        if ("play".equals(statusMusic)) {
            System.out.println("Audio is already " + "being played");
            return;
        }
        clipMusic.close();
        resetAudioStreamMusic();
        clipMusic.setMicrosecondPosition(savedPositionOrStart(currentFrameMusic));
        System.out.println("resuming music");
        this.playMusic();
    }

    /** Restarts the music from the beginning. */
    public void restartMusic() throws IOException, LineUnavailableException,
            UnsupportedAudioFileException {
        clipMusic.stop();
        clipMusic.close();
        resetAudioStreamMusic();
        currentFrameMusic = 0L;
        clipMusic.setMicrosecondPosition(0);
        this.playMusic();
    }

    /** Stops the music and releases the clip. */
    public void stopMusic() throws UnsupportedAudioFileException,
            IOException, LineUnavailableException {
        currentFrameMusic = 0L;
        clipMusic.stop();
        clipMusic.close();
    }

    /** Rebuilds the combined music stream and opens a new looping clip on it. */
    public void resetAudioStreamMusic() throws UnsupportedAudioFileException, IOException,
            LineUnavailableException {
        clipMusic = AudioSystem.getClip();
        appendMusicFiles(schedule);
        clipMusic.open(musicInputStream);
        clipMusic.loop(Clip.LOOP_CONTINUOUSLY);
    }

    /**
     * Concatenates the schedule's music files, in schedule order, into {@code musicInputStream}.
     * Files are looked up in the working directory as {@code <id>-music.wav}; the format of the
     * first file is used for the combined stream.
     */
    public static void appendMusicFiles(Schedule schedule)
            throws IOException, UnsupportedAudioFileException {
        AudioInputStream combined = AudioSystem.getAudioInputStream(
                new File(schedule.getMusicScheduleItems().get(0).getMusic()
                        .getId() + "-music.wav"));
        for (int i = 1; i < schedule.getMusicScheduleItems().size(); i++) {
            File next = new File(schedule.getMusicScheduleItems().get(i).getMusic()
                    .getId() + "-music.wav");
            combined = concatenate(combined, AudioSystem.getAudioInputStream(next));
        }
        musicInputStream = combined;
    }

    // advertisement methods

    /**
     * Plays the stream prepared by {@link #appendAdvertisementFiles(List)} on a new clip and
     * blocks the calling thread for the length of the clip.
     */
    public void playAdvertisements()
            throws LineUnavailableException, IOException, InterruptedException {
        clipAdvertisement = AudioSystem.getClip();
        clipAdvertisement.open(advertisementInputStream);
        System.out.println(clipAdvertisement.getMicrosecondLength());
        clipAdvertisement.start();
        // Mark the clip as playing while it plays, not after playback has already ended.
        statusAdvertisement = "play";
        System.out.println("playing advertisements");
        Thread.sleep(clipAdvertisement.getMicrosecondLength() / 1000);
    }

    /** Pauses the advertisement clip and remembers its position. */
    public void pauseAdvertisements() {
        if ("paused".equals(statusAdvertisement)) {
            System.out.println("audio is already paused");
            return;
        }
        this.currentFrameAdvertisement =
                this.clipAdvertisement.getMicrosecondPosition();
        clipAdvertisement.stop();
        statusAdvertisement = "paused";
    }

    /**
     * Resumes a paused advertisement.
     *
     * <p>NOTE(review): {@link #playAdvertisements()} obtains a fresh clip, so the position set
     * here on the previous clip does not appear to carry over — confirm the intended flow.
     */
    public void resumeAudioAdvertisement() throws UnsupportedAudioFileException,
            IOException, LineUnavailableException, InterruptedException {
        if ("play".equals(statusAdvertisement)) {
            System.out.println("Audio is already " + "being played");
            return;
        }
        clipAdvertisement.close();
        resetAudioStreamAdvertisement();
        // Seek with the advertisement's own saved position, not the music's.
        clipAdvertisement.setMicrosecondPosition(
                savedPositionOrStart(currentFrameAdvertisement));
        this.playAdvertisements();
    }

    /** Restarts the advertisement from the beginning. */
    public void restartAdvertisement() throws IOException, LineUnavailableException,
            UnsupportedAudioFileException, InterruptedException {
        clipAdvertisement.stop();
        clipAdvertisement.close();
        resetAudioStreamAdvertisement();
        currentFrameAdvertisement = 0L;
        clipAdvertisement.setMicrosecondPosition(0);
        this.playAdvertisements();
    }

    /** Stops the advertisement and releases its clip. */
    public void stopAdvertisement() throws UnsupportedAudioFileException,
            IOException, LineUnavailableException, InterruptedException {
        currentFrameAdvertisement = 0L;
        clipAdvertisement.stop();
        clipAdvertisement.close();
        System.out.println("stopping advertisement");
    }

    /**
     * Reloads the advertisement stream from {@code filePath} and opens it on the advertisement
     * clip.
     *
     * <p>NOTE(review): {@code filePath} is never assigned in this class, and looping an
     * advertisement continuously looks copied from the music path — confirm both.
     */
    public void resetAudioStreamAdvertisement() throws UnsupportedAudioFileException,
            IOException, LineUnavailableException {
        advertisementInputStream = AudioSystem.getAudioInputStream(
                new File(filePath).getAbsoluteFile());
        // Open the advertisement stream (not the music stream) on the advertisement clip.
        clipAdvertisement.open(advertisementInputStream);
        clipAdvertisement.loop(Clip.LOOP_CONTINUOUSLY);
    }

    /**
     * Concatenates the given advertisements, in list order, into
     * {@code advertisementInputStream}. Files are looked up in the working directory as
     * {@code <id>-advertisement.wav}. The list must not be empty.
     */
    public static void appendAdvertisementFiles(List<Advertisement> advertisementItems)
            throws IOException, UnsupportedAudioFileException {
        AudioInputStream combined = AudioSystem.getAudioInputStream(
                new File(advertisementItems.get(0)
                        .getId() + "-advertisement.wav"));
        for (int i = 1; i < advertisementItems.size(); i++) {
            File next = new File(advertisementItems.get(i)
                    .getId() + "-advertisement.wav");
            combined = concatenate(combined, AudioSystem.getAudioInputStream(next));
        }
        advertisementInputStream = combined;
    }

    /**
     * Polls the clock forever and, whenever the current HH:MM matches an advertisement item's
     * times, pauses the music, plays all advertisements due at that time, then resumes the
     * music. Only returns by throwing (for example when interrupted while sleeping).
     */
    public void checkShouldWePlayAnAdvertisement() throws IOException,
            UnsupportedAudioFileException, LineUnavailableException, InterruptedException {
        // Times at which advertisements were already played, so that a second pass within the
        // same minute does not play them again.
        ArrayList<String> playedAtTimes = new ArrayList<>();
        while (true) {
            String timeHHMM = ZonedDateTime.now().toString().substring(11, 16);
            if (!playedAtTimes.contains(timeHHMM)) {
                // Collected per pass, so advertisements from earlier breaks are not replayed.
                List<Advertisement> advertisementsToBePlayed = new ArrayList<>();
                for (int i = 0; i < schedule.getAdvertisementScheduleItems().size(); i++) {
                    if (schedule.getAdvertisementScheduleItems().get(i).getTimes()
                            .contains(timeHHMM)) {
                        advertisementsToBePlayed.add(
                                schedule.getAdvertisementScheduleItems().get(i)
                                        .getAdvertisement());
                    }
                }
                if (!advertisementsToBePlayed.isEmpty()) {
                    playedAtTimes.add(timeHHMM);
                    appendAdvertisementFiles(advertisementsToBePlayed);
                    pauseMusic();
                    playAdvertisements();
                    stopAdvertisement();
                    resumeAudioMusic();
                }
            }
            // Sleep between checks instead of spinning on the clock at full CPU.
            Thread.sleep(ADVERTISEMENT_POLL_INTERVAL_MS);
        }
    }

    /** Joins two audio streams into one, keeping the format of the first. */
    private static AudioInputStream concatenate(AudioInputStream head, AudioInputStream tail) {
        return new AudioInputStream(
                new SequenceInputStream(head, tail),
                head.getFormat(),
                head.getFrameLength() + tail.getFrameLength());
    }

    /** Returns the saved position, or the start of the clip if nothing was saved yet. */
    private static long savedPositionOrStart(Long savedPosition) {
        return savedPosition == null ? 0L : savedPosition;
    }
}
IotClient(监听器的一部分):
public class IotClient extends Thread {
Thread t;
String username;
public IotClient(String username) {
this.username = username;
}
public void run(){
String clientEndpoint = "removve"; // replace <prefix> and <region> with your own
String clientId = "1"; // replace with your own client ID. Use unique client IDs for concurrent connections.
// AWS IAM credentials could be retrieved from AWS Cognito, STS, or other secure sources
AWSIotMqttClient client = new AWSIotMqttClient(clientEndpoint, clientId, "remove", "remove");
// optional parameters can be set before connect()
try {
client.connect();
} catch (AWSIotException e) {
e.printStackTrace();
}
AWSIotQos qos = AWSIotQos.QOS0;
AWSIotTopic topic = new MyTopic("schedule/"+ username, qos);
try {
client.subscribe(topic, true);
} catch (AWSIotException e) {
e.printStackTrace();
}
while(true){
}
}
public void start(){
if (t == null) {
t = new Thread (this, "IotClientThread");
t.start ();
}
}
MyTopic(监听器的一部分):
/**
 * Topic subscription that prints every message delivered on it.
 */
public class MyTopic extends AWSIotTopic {

    /**
     * Creates the subscription by passing both arguments straight to the superclass.
     *
     * @param topic topic name to subscribe to
     * @param qos   quality-of-service level for the subscription
     */
    public MyTopic(String topic, AWSIotQos qos) {
        super(topic, qos);
    }

    /**
     * Prints the string payload of the received message to standard output.
     *
     * @param message message delivered on this topic
     */
    @Override
    public void onMessage(AWSIotMessage message) {
        final String payload = message.getStringPayload();
        System.out.println("Message recieved from topic: " + payload);
    }
}
线程之间通过共享引用进行通信,这些引用指向内存中存放消息的"容器"对象。它可以很简单,比如某个类的共享实例上的一个可变字段;更典型的做法是使用集合,例如列表、映射(Map),尤其是队列。
ArrayBlockingQueue 是一个很好的共享引用。从一个线程到另一个线程,每个消息方向都需要一个队列。如果您有 3 个确实需要互相交谈的线程,那么就有 3 对线程,因此需要 6 个队列(每对 2 个)。但是,消息通常只朝一个方向流动,所以您可以省掉其中几个。
这些通信的核心是一种机制:一方等待消息(读者/消费者),另一方在推入消息时发出通知(作者/生产者)。
当然,您可以从底层开始,先学习原始的 wait/notify(这方面的教程很多),也可以直接使用 ArrayBlockingQueue 这样的类,它把这些细节抽象成了 take()/put()。我建议从底层开始,因为这样以后遇到 java.util.concurrent.* 中的其他类时,上手会更快。
我无法直接给您代码:如果不先学会线程间通信(ITC,inter-thread communication)的基础知识,以您目前的水平是无法理解这些代码的。
良好的学习!
PS:一路上有许多陷阱,例如线程安全、写操作的原子性、锁算法、死锁、活锁、饥饿。仅仅是上面的多队列示例,就可能在消息到达时导致循环依赖,尤其是当队列填满并发生阻塞时。这是一门科学!