Java线程:3个Java线程之间的混合



所以我有3个线程,

一个从互联网下载音频和音频时间表对象的线程;一个根据音频时间表播放音频的线程;以及一个WebSocket"通知"监听器线程,它监听消息,这些消息告诉我们必须下载新的音频并重新安排当前的音频。

程序流程如下:应用程序启动时,ScheduleDownloader启动,下载音频和时间表文件。完成后,它需要告诉音频播放器"嘿,文件已经准备就绪,这是时间表",之后它就不需要再做任何事情。

音频播放器启动后持续循环,没有退出条件。WebSocket监听器启动;当它收到消息时,应该告诉时间表下载器"你需要重新启动,因为需要下载新文件"。它不需要向时间表下载器发送任何数据,只需让它重新启动即可。在此期间音乐应该保持播放。下载完成后,应该使用新的时间表重新启动音频播放器线程。

这是我到目前为止所写的代码。我不确定如何让ScheduleDownloader告诉AudioPlayer"文件已准备就绪,你需要启动,这是时间表"或"你需要用新的时间表重新启动,这是新的时间表",也不确定如何让监听器说"ScheduleDownloader,你需要重新开始"。

/**
 * Worker that looks up today's schedule over HTTP and then downloads the audio
 * files that the schedule refers to from S3 into the working directory.
 *
 * <p>{@link #start()} launches the work on a separate thread named after the
 * {@code threadName} given to the constructor; {@link #run()} contains the work itself.
 */
public class ScheduleDownloader extends Thread {
    /** Pause between failed attempts so the retry loops do not spin at full speed. */
    private static final long RETRY_DELAY_MS = 5000L;

    private Thread t;
    private String threadName;
    String username;
    String password;

    /**
     * @param username   user name sent as HTTP credentials when fetching the schedule
     * @param password   password sent as HTTP credentials when fetching the schedule
     * @param threadName name given to the worker thread created by {@link #start()}
     */
    public ScheduleDownloader(String username, String password, String threadName) {
        this.username = username;
        this.password = password;
        this.threadName = threadName;
    }

    /**
     * Starts the worker thread the first time it is called; later calls only log.
     */
    @Override
    public void start() {
        System.out.println("Starting " + threadName);
        if (t == null) {
            t = new Thread(this, threadName);
            t.start();
        }
    }

    /**
     * Polls for today's schedule until one is returned, then downloads its files,
     * retrying until every download succeeded. An {@link IOException} from the
     * schedule request ends the run; an interrupt ends it as well.
     */
    @Override
    public void run() {
        try {
            Schedule schedule = null;
            while (schedule == null) {
                System.out.println("Searching for schedule");
                schedule = getTodaysSchedule();
                if (schedule == null) {
                    // Back off instead of hammering the server in a tight loop.
                    Thread.sleep(RETRY_DELAY_MS);
                }
            }
            System.out.println("Schedule Found");
            while (!downloadFiles(schedule)) {
                Thread.sleep(RETRY_DELAY_MS);
            }
            System.out.println("Files Downloaded");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Requests today's schedule and converts the JSON response body to a {@code Schedule}.
     *
     * @return the parsed schedule, or {@code null} when the response status is not 200
     * @throws IOException if the request or reading the response body fails
     */
    public Schedule getTodaysSchedule() throws IOException {
        CredentialsProvider provider = new BasicCredentialsProvider();
        UsernamePasswordCredentials credentials
                = new UsernamePasswordCredentials(username, password);
        provider.setCredentials(AuthScope.ANY, credentials);

        String url = "http://localhost:5000/api/schedule/today";
        HttpClient httpClient = HttpClientBuilder.create().setDefaultCredentialsProvider(provider).build();
        HttpGet request = new HttpGet(url);
        HttpResponse response = httpClient.execute(request);

        int statusCode = response.getStatusLine().getStatusCode();
        if (statusCode != 200) {
            System.out.println("sorry error:" + statusCode);
            return null;
        }

        // Read the body with an explicit charset and always close the reader.
        StringBuilder result = new StringBuilder();
        try (BufferedReader rd = new BufferedReader(new InputStreamReader(
                response.getEntity().getContent(), java.nio.charset.StandardCharsets.UTF_8))) {
            String line;
            while ((line = rd.readLine()) != null) {
                result.append(line);
            }
        }

        // Change the JSON response to Java objects.
        Gson gson = new Gson();
        return gson.fromJson(result.toString(), Schedule.class);
    }

    /**
     * Downloads every music and advertisement file named by the schedule.
     * All files are attempted even if one fails, so a later retry only has to
     * fetch the ones that are still missing.
     *
     * @param schedule schedule whose music and advertisement items are downloaded
     * @return {@code true} only if every single file download reported success
     */
    public static boolean downloadFiles(Schedule schedule) {
        boolean allSucceeded = true;
        // Get the music.
        for (int i = 0; i < schedule.getMusicScheduleItems().size(); i++) {
            allSucceeded &= downloadOneFile("shoutloudaudio", "music/"
                    + schedule.getMusicScheduleItems().get(i).getMusic().getId()
                    + "-music.wav");
        }
        // Get the advertisements.
        for (int i = 0; i < schedule.getAdvertisementScheduleItems().size(); i++) {
            allSucceeded &= downloadOneFile("shoutloudaudio", "advertisements/"
                    + schedule.getAdvertisementScheduleItems().get(i).getAdvertisement().getId()
                    + "-advertisement.wav");
        }
        return allSucceeded;
    }

    /**
     * Downloads one S3 object to a local file unless that file already exists.
     * The local file is named after the second {@code /}-separated segment of
     * {@code key}, or after the whole key when there is no such segment.
     *
     * @param bucketName S3 bucket to read from
     * @param key        object key inside the bucket
     * @return {@code true} if the file already existed or was written completely,
     *         {@code false} if writing it failed
     */
    public static boolean downloadOneFile(String bucketName, String key) {
        File file = localFileFor(key);
        if (file.isFile()) {
            // We have it already and don't need to download it.
            System.out.println(key + " alraeady exits");
            return true;
        }
        AWSCredentials awsCredentials = new BasicAWSCredentials(
                "removed",
                "removed"
        );
        AmazonS3 s3client = AmazonS3ClientBuilder
                .standard()
                .withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
                .withRegion(Regions.EU_WEST_1)
                .build();

        // try-with-resources closes the S3 object and both streams on every path.
        try (S3Object s3object = s3client.getObject(bucketName, key);
             InputStream reader = new BufferedInputStream(s3object.getObjectContent());
             OutputStream writer = new BufferedOutputStream(new FileOutputStream(file))) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = reader.read(buffer)) != -1) {
                writer.write(buffer, 0, read);
            }
            writer.flush();
            return true;
        } catch (IOException e) {
            e.printStackTrace();
            // Remove a partial file; otherwise the isFile() check above would
            // treat the truncated download as complete on the next attempt.
            if (file.isFile() && !file.delete()) {
                System.out.println("could not remove partial file " + file);
            }
            return false;
        }
    }

    /** Maps an object key to the local file it is stored in. */
    private static File localFileFor(String key) {
        String[] segments = key.split("/");
        return new File(segments.length > 1 ? segments[1] : key);
    }
}

AudioPlayer

/**
 * Plays the schedule's music files as one looping clip and interrupts the music
 * with advertisements at the times listed in the schedule.
 *
 * <p>The music and advertisement input streams are static fields, so they are
 * shared by every instance of this class.
 */
public class AudioPlayer extends Thread {
    /** Delay between two looks at the clock in {@link #checkShouldWePlayAnAdvertisement()}. */
    private static final long POLL_INTERVAL_MS = 1000L;
    /** Formats the current time as {@code HH:mm}, the form compared against schedule times. */
    private static final java.time.format.DateTimeFormatter HH_MM =
            java.time.format.DateTimeFormatter.ofPattern("HH:mm");

    // Position (in microseconds) at which each clip was last paused.
    Long currentFrameMusic;
    Long currentFrameAdvertisement;
    Clip clipMusic;
    Clip clipAdvertisement;
    private Thread t;
    // Current status of each clip: "play" or "paused" (null until first set).
    String statusMusic;
    String statusAdvertisement;
    static AudioInputStream musicInputStream;
    static AudioInputStream advertisementInputStream;
    // NOTE(review): nothing in this class assigns filePath; confirm who sets it.
    static String filePath;
    Schedule schedule;

    /**
     * Builds the combined music stream for {@code schedule}, opens it in a clip
     * and sets the clip to loop continuously.
     *
     * @param schedule schedule providing the music and advertisement items
     */
    public AudioPlayer(Schedule schedule)
            throws UnsupportedAudioFileException,
            IOException, LineUnavailableException {
        this.schedule = schedule;
        appendMusicFiles(schedule);
        clipMusic = AudioSystem.getClip();
        clipMusic.open(musicInputStream);
        clipMusic.loop(Clip.LOOP_CONTINUOUSLY);
    }

    /**
     * Starts the music and then watches the clock for advertisement times until
     * an error occurs or the thread is interrupted.
     */
    @Override
    public void run() {
        playMusic();
        try {
            checkShouldWePlayAnAdvertisement();
        } catch (IOException | UnsupportedAudioFileException | LineUnavailableException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Keep the interrupt visible instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Starts the player thread the first time it is called; later calls do nothing,
     * so two loops can never play the same advertisements side by side.
     */
    @Override
    public void start() {
        if (t == null) {
            t = new Thread(this, "AudioPlayerThread");
            t.start();
        }
    }

    /** Same work as {@link #run()}, but on the calling thread and without catching errors. */
    public void start2() throws IOException, UnsupportedAudioFileException, LineUnavailableException, InterruptedException {
        playMusic();
        checkShouldWePlayAnAdvertisement();
    }

    /** Starts the music clip and records its status as "play". */
    public void playMusic() {
        clipMusic.start();
        statusMusic = "play";
    }

    /** Pauses the music and remembers where it stopped. */
    public void pauseMusic() {
        // Constant-first equals: the status is null until playMusic() has run.
        if ("paused".equals(statusMusic)) {
            System.out.println("audio is already paused");
            return;
        }
        this.currentFrameMusic = positionWithinClip(this.clipMusic);
        clipMusic.stop();
        statusMusic = "paused";
        System.out.println("pausing music");
    }

    /** Rebuilds the music clip and continues from the position saved by {@link #pauseMusic()}. */
    public void resumeAudioMusic() throws UnsupportedAudioFileException,
            IOException, LineUnavailableException {
        if ("play".equals(statusMusic)) {
            System.out.println("Audio is already "
                    + "being played");
            return;
        }
        clipMusic.close();
        resetAudioStreamMusic();
        // No saved position means the music was never paused: start from the beginning.
        clipMusic.setMicrosecondPosition(currentFrameMusic != null ? currentFrameMusic : 0L);
        System.out.println("resuming music");
        this.playMusic();
    }

    /** Rebuilds the music clip and plays it from the beginning. */
    public void restartMusic() throws IOException, LineUnavailableException,
            UnsupportedAudioFileException {
        clipMusic.stop();
        clipMusic.close();
        resetAudioStreamMusic();
        currentFrameMusic = 0L;
        clipMusic.setMicrosecondPosition(0);
        this.playMusic();
    }

    /** Stops and closes the music clip and resets the saved position. */
    public void stopMusic() throws UnsupportedAudioFileException,
            IOException, LineUnavailableException {
        currentFrameMusic = 0L;
        clipMusic.stop();
        clipMusic.close();
    }

    /** Creates a fresh music clip from the schedule's music files and sets it looping. */
    public void resetAudioStreamMusic() throws UnsupportedAudioFileException, IOException,
            LineUnavailableException {
        clipMusic = AudioSystem.getClip();
        appendMusicFiles(schedule);
        clipMusic.open(musicInputStream);
        clipMusic.loop(Clip.LOOP_CONTINUOUSLY);
    }

    /**
     * Concatenates the schedule's {@code <id>-music.wav} files, in schedule order,
     * into {@link #musicInputStream}. The first file's format is used for the result.
     *
     * @param schedule schedule whose music items name the files; must contain at least one
     */
    public static void appendMusicFiles(Schedule schedule) throws IOException, UnsupportedAudioFileException {
        // Add the first audio file to the stream.
        AudioInputStream appendedFiles = AudioSystem.getAudioInputStream(
                new File(schedule.getMusicScheduleItems().get(0).getMusic()
                        .getId() + "-music.wav"));
        // Loop through the rest and combine.
        for (int i = 1; i < schedule.getMusicScheduleItems().size(); i++) {
            File file = new File(schedule.getMusicScheduleItems().get(i).getMusic()
                    .getId() + "-music.wav");
            AudioInputStream toBeAppended = AudioSystem.getAudioInputStream(file);
            appendedFiles =
                    new AudioInputStream(
                            new SequenceInputStream(appendedFiles, toBeAppended),
                            appendedFiles.getFormat(),
                            appendedFiles.getFrameLength() + toBeAppended.getFrameLength());
        }
        musicInputStream = appendedFiles;
    }

    // Advertisement methods.

    /**
     * Opens {@link #advertisementInputStream} in a new clip, starts it and blocks
     * for the clip's length.
     */
    public void playAdvertisements() throws LineUnavailableException, IOException, InterruptedException {
        clipAdvertisement = AudioSystem.getClip();
        clipAdvertisement.open(advertisementInputStream);
        System.out.println(clipAdvertisement.getMicrosecondLength());
        clipAdvertisement.start();
        // Record the status before sleeping so it is accurate while the clip plays.
        statusAdvertisement = "play";
        System.out.println("playing advertisements");
        Thread.sleep(clipAdvertisement.getMicrosecondLength() / 1000);
    }

    /** Pauses the advertisement clip and remembers where it stopped. */
    public void pauseAdvertisements() {
        if ("paused".equals(statusAdvertisement)) {
            System.out.println("audio is already paused");
            return;
        }
        this.currentFrameAdvertisement = positionWithinClip(this.clipAdvertisement);
        clipAdvertisement.stop();
        statusAdvertisement = "paused";
    }

    /** Re-opens the advertisement clip and continues from the saved advertisement position. */
    public void resumeAudioAdvertisement() throws UnsupportedAudioFileException,
            IOException, LineUnavailableException, InterruptedException {
        if ("play".equals(statusAdvertisement)) {
            System.out.println("Audio is already "
                    + "being played");
            return;
        }
        clipAdvertisement.close();
        resetAudioStreamAdvertisement();
        // Seek with the advertisement's own saved position, not the music's.
        clipAdvertisement.setMicrosecondPosition(
                currentFrameAdvertisement != null ? currentFrameAdvertisement : 0L);
        // NOTE(review): playAdvertisements() replaces clipAdvertisement with a new
        // clip, so the position set above may not carry over - confirm intent.
        this.playAdvertisements();
    }

    /** Re-opens the advertisement clip and plays it from the beginning. */
    public void restartAdvertisement() throws IOException, LineUnavailableException,
            UnsupportedAudioFileException, InterruptedException {
        clipAdvertisement.stop();
        clipAdvertisement.close();
        resetAudioStreamAdvertisement();
        currentFrameAdvertisement = 0L;
        clipAdvertisement.setMicrosecondPosition(0);
        this.playAdvertisements();
    }

    /** Stops and closes the advertisement clip and resets the saved position. */
    public void stopAdvertisement() throws UnsupportedAudioFileException,
            IOException, LineUnavailableException, InterruptedException {
        currentFrameAdvertisement = 0L;
        clipAdvertisement.stop();
        clipAdvertisement.close();
        System.out.println("stopping advertisement");
    }

    /** Reloads the advertisement stream from {@link #filePath} and opens it in the advertisement clip. */
    public void resetAudioStreamAdvertisement() throws UnsupportedAudioFileException, IOException,
            LineUnavailableException {
        advertisementInputStream = AudioSystem.getAudioInputStream(
                new File(filePath).getAbsoluteFile());
        // Open the advertisement stream that was just loaded, not the music stream.
        clipAdvertisement.open(advertisementInputStream);
        clipAdvertisement.loop(Clip.LOOP_CONTINUOUSLY);
    }

    /**
     * Concatenates the {@code <id>-advertisement.wav} files of the given
     * advertisements, in list order, into {@link #advertisementInputStream}.
     *
     * @param advertisementItems advertisements to combine; must contain at least one
     */
    public static void appendAdvertisementFiles(List<Advertisement> advertisementItems) throws IOException, UnsupportedAudioFileException {
        // Add the first audio file to the stream.
        AudioInputStream appendedFiles = AudioSystem.getAudioInputStream(
                new File(advertisementItems.get(0)
                        .getId() + "-advertisement.wav"));
        // Loop through the rest and combine.
        for (int i = 1; i < advertisementItems.size(); i++) {
            File file = new File(advertisementItems.get(i)
                    .getId() + "-advertisement.wav");
            AudioInputStream toBeAppended = AudioSystem.getAudioInputStream(file);
            appendedFiles =
                    new AudioInputStream(
                            new SequenceInputStream(appendedFiles, toBeAppended),
                            appendedFiles.getFormat(),
                            appendedFiles.getFrameLength() + toBeAppended.getFrameLength());
        }
        advertisementInputStream = appendedFiles;
    }

    /**
     * Checks the clock about once per {@link #POLL_INTERVAL_MS}. When the current
     * {@code HH:mm} is listed by one or more advertisement schedule items, pauses
     * the music, plays those advertisements once, and resumes the music.
     *
     * <p>Never returns normally; it ends by throwing, e.g. {@link InterruptedException}
     * when the thread is interrupted.
     */
    public void checkShouldWePlayAnAdvertisement() throws IOException, UnsupportedAudioFileException, LineUnavailableException, InterruptedException {
        // Date + HH:mm of the last minute that was played. Including the date lets
        // the same time of day play again on the following day.
        String lastPlayedMinute = null;
        List<Advertisement> advertisementsToBePlayed = new ArrayList<>();
        while (true) {
            ZonedDateTime zdt = ZonedDateTime.now();
            String timeHHMM = zdt.format(HH_MM);
            String minuteKey = zdt.toLocalDate() + " " + timeHHMM;
            // Skip the minute we already served so the loop does not replay it.
            if (!minuteKey.equals(lastPlayedMinute)) {
                // Start from an empty batch; earlier advertisements must not replay.
                advertisementsToBePlayed.clear();
                for (int i = 0; i < schedule.getAdvertisementScheduleItems().size(); i++) {
                    if (schedule.getAdvertisementScheduleItems().get(i).getTimes()
                            .contains(timeHHMM)) {
                        // This item should be played now.
                        advertisementsToBePlayed.add(
                                schedule.getAdvertisementScheduleItems().get(i).getAdvertisement());
                    }
                }
                if (!advertisementsToBePlayed.isEmpty()) {
                    lastPlayedMinute = minuteKey;
                    appendAdvertisementFiles(advertisementsToBePlayed);
                    pauseMusic();
                    playAdvertisements();
                    stopAdvertisement();
                    resumeAudioMusic();
                }
            }
            // Yield the CPU between checks; also the point where an interrupt surfaces.
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }

    /**
     * Returns the clip's current position reduced to the clip's own length, so a
     * position accumulated over several loop passes still addresses a point inside
     * the clip. Falls back to the raw position when the length is not positive.
     */
    private static long positionWithinClip(Clip clip) {
        long position = clip.getMicrosecondPosition();
        long length = clip.getMicrosecondLength();
        return length > 0 ? position % length : position;
    }
}

IotClient(监听器的一部分)

public class IotClient extends Thread {
 Thread t;
 String username;
public IotClient(String username)  {
    this.username = username;
}
 public void run(){
 String clientEndpoint = "removve";       // replace <prefix> and <region> with your own
 String clientId = "1";                              // replace with your own client ID. Use unique client IDs for concurrent connections.
// AWS IAM credentials could be retrieved from AWS Cognito, STS, or other secure sources
AWSIotMqttClient client = new AWSIotMqttClient(clientEndpoint, clientId, "remove", "remove");
    // optional parameters can be set before connect()
try {
    client.connect();
} catch (AWSIotException e) {
    e.printStackTrace();
}
AWSIotQos qos = AWSIotQos.QOS0;
AWSIotTopic topic = new MyTopic("schedule/"+ username, qos);
try {
    client.subscribe(topic, true);
} catch (AWSIotException e) {
    e.printStackTrace();
}
while(true){
}
}
public void start(){
if (t == null) {
    t = new Thread (this, "IotClientThread");
    t.start ();
}
   }

MyTopic(监听器的一部分)

/**
 * Topic subscription that prints the payload of each message it receives.
 */
public class MyTopic extends AWSIotTopic {

    /**
     * @param topic name of the topic to subscribe to
     * @param qos   quality-of-service level passed on to the superclass
     */
    public MyTopic(String topic, AWSIotQos qos) {
        super(topic, qos);
    }

    /**
     * Prints the message's string payload to standard output.
     *
     * @param message the message delivered on this topic
     */
    @Override
    public void onMessage(AWSIotMessage message) {
        final String payload = message.getStringPayload();
        System.out.println("Message recieved from topic: " + payload);
    }
}

线程通过共享引用来通信,这些引用指向内存中充当消息"容器"的对象。它可以简单到某个类的共享实例上的一个可变字段,也可以是更典型的集合,例如列表(List)、映射(Map),尤其是队列(Queue)。

ArrayBlockingQueue是一个很好的共享引用选择。从一个线程到另一个线程的每个消息方向都需要一个队列。如果您有3个真正需要相互交谈的线程,那么就有3对线程,因此需要6个队列(每对2个)。不过,消息通常只沿一个方向流动,所以可以省掉其中一些。

这些通信的核心机制是:一方等待消息(读取者/消费者),另一方在推送消息时发出通知(写入者/生产者)。

当然,您可以从底层开始,先学习原始的wait()/notify()(这方面的教程有很多),也可以直接使用ArrayBlockingQueue的take()/put()这类现成的实现。我建议从底层开始,因为这样当您遇到java.util.concurrent.*中的其他类时,上手会更快。

我不能直接给您代码:如果没有先学会ITC(线程间通信)的基础知识,以您目前的水平是无法理解这些代码的。

良好的学习!

PS:一路上有许多陷阱,例如线程安全、写操作的原子性、锁算法、死锁、活锁、饥饿。仅仅上面的多队列示例就可能在消息到达时造成循环依赖,尤其是当队列填满并阻塞时。这是一门学问!

最新更新