python - Error when training the model for sensor data using Kafka and RDL index 4096 is out of bounds for axis 0 with size 4096

This is my code using python, RDL Reinforcement Deep Learning and Kafka, both zookeeper and and kafka s

This is my code using python, RDL Reinforcement Deep Learning and Kafka, both zookeeper and and kafka server are working with no issue, when I run my following code using jupyter:

import json
import threading
import time
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3mon.vec_env import DummyVecEnv
from sklearn.preprocessing import StandardScaler
from confluent_kafka import Producer, Consumer

# ✅ Data Preparation
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# ✅ (Optional) Scaling - Uncomment if needed
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

USE_SCALED_DATA = False  # Change to True if you want to use scaled data
X_train_final = X_train_scaled if USE_SCALED_DATA else X_train
X_test_final = X_test_scaled if USE_SCALED_DATA else X_test

# ✅ Kafka Configuration
KAFKA_BROKER = 'localhost:9092'
KAFKA_TOPIC = 'sensordata'
KAFKA_ACTION_TOPIC = 'sensorfeedback'

# ✅ Kafka Producer Setup
producer = Producer({'bootstrap.servers': KAFKA_BROKER})

# ✅ Custom Gym Environment
class CustomEnv(gym.Env):
    def __init__(self, dataset):
        super(CustomEnv, self).__init__()
        self.dataset = dataset
        self.current_idx = 0
        self.observation_space = spaces.Box(low=-1, high=1, shape=(8,), dtype=np.float32)
        self.action_space = spaces.Discrete(2)  # Binary actions (0 or 1)
    def reset(self, seed=None, options=None):
        if seed is not None:
            np.random.seed(seed)
        self.current_idx = 0
        self.state = self.dataset[self.current_idx]

        # ✅ Ensure correct shape
        return np.array(self.state, dtype=np.float32).reshape(self.observation_space.shape), {}  


    def step(self, action):
        self.current_idx += 1
        done = self.current_idx >= len(self.dataset)
        truncated = False  # No early stopping condition

        if done:
            self.current_idx = 0  # Loop back for continuous learning

        self.state = self.dataset[self.current_idx]
        reward = 1 if action == 1 else 0  

        return np.array(self.state, dtype=np.float32), reward, done, truncated, {}

# ✅ RL Environment
env = DummyVecEnv([lambda: CustomEnv(X_train_final)])

# ✅ PPO Model Hyperparameters
model = PPO(
    policy="MlpPolicy",
    env=env,
    learning_rate=0.0005,
    n_steps=4096,
    batch_size=2048,
    n_epochs=5,
    gamma=0.99,
    gae_lambda=0.95,
    ent_coef=0.01,
    verbose=1,
    clip_range=0.2,
)

# ✅ Kafka Consumer Setup
consumer = Consumer({
    'bootstrap.servers': KAFKA_BROKER,
    'group.id': 'my-consumer-group',
    'auto.offset.reset': 'earliest'
})

# ✅ Kafka Message Handling
def on_message(msg):
    try:
        #sensor_data = np.array(json.loads(msg.value().decode())).reshape(1, -1)
        #sensor_data = sensor_data[:8]
        sensor_data = np.array(json.loads(msg.value().decode())).reshape(1, 8)  # Enforce shape
        env.envs[0].state = sensor_data[0]  # Update environment state

        # ✅ RL Model makes a decision
        action, _ = model.predict(sensor_data, deterministic=True)
        print(f" Predicted Action: {action}")

        # ✅ Send action feedback to Kafka
        producer.produce(KAFKA_ACTION_TOPIC, json.dumps({'action': int(action[0])}))
        producer.flush()
    except Exception as e:
        print(f"❌ Error processing Kafka message: {e}")

# ✅ Kafka Consumer Function
def consume_sensor_data():
    consumer.subscribe([KAFKA_TOPIC])
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                print(f"⚠️ Kafka error: {msg.error()}")
                continue
            on_message(msg)
    except Exception as e:
        print(f"❌ Kafka Consumer Error: {e}")
    finally:
        consumer.close()

# ✅ Kafka Producer Function (Simulated Sensor Data)
def produce_sensor_data():
    try:
        for i in range(len(X_test_final)):  # ✅ Send actual test data
            sensor_data = X_test_final[i].tolist()
            producer.produce(KAFKA_TOPIC, json.dumps(sensor_data))
            producer.flush()
            time.sleep(1)
    except Exception as e:
        print(f"❌ Kafka Producer Error: {e}")

# ✅ Start Kafka Producer & Consumer in Threads
producer_thread = threading.Thread(target=produce_sensor_data, daemon=True)
consumer_thread = threading.Thread(target=consume_sensor_data, daemon=True)

producer_thread.start()
consumer_thread.start()

# ✅ Train RL Model in a Separate Thread
def train_model():
    model.learn(total_timesteps=50000)

train_thread = threading.Thread(target=train_model, daemon=True)
train_thread.start()

# ✅ Model Evaluation
def evaluate_model(model, X_test, y_test):
    try:
        predictions = []
        for i in range(len(X_test)):
            state = X_test[i].reshape(1, -1)
            action, _ = model.predict(state, deterministic=True)
            predictions.append(action[0])

        accuracy = accuracy_score(y_test, predictions)
        precision = precision_score(y_test, predictions, zero_division=1)
        recall = recall_score(y_test, predictions, zero_division=1)
        f1 = f1_score(y_test, predictions, zero_division=1)

        print(f"\n Accuracy: {accuracy:.4f}")
        print(f" Precision: {precision:.4f}")
        print(f" Recall: {recall:.4f}")
        print(f" F1 Score: {f1:.4f}")

        return accuracy, precision, recall, f1
    except Exception as e:
        print(f"❌ Model Evaluation Error: {e}")

# ✅ Evaluate model over multiple epochs
metrics_history = []
for epoch in range(5):
    train_model(total_timesteps=50000)
    accuracy, precision, recall, f1 = evaluate_model(model, X_test_final, y_test)
    metrics_history.append((accuracy, precision, recall, f1))

print("✅ Training & Evaluation Completed Successfully!")

The error I am facing is:

IndexError                                Traceback (most recent call last)
Cell In[145], line 173
    171 metrics_history = []
    172 for epoch in range(5):
--> 173     train_model()
    174     accuracy, precision, recall, f1 = evaluate_model(model, X_test_final, y_test)
    175     metrics_history.append((accuracy, precision, recall, f1))

Cell In[145], line 142, in train_model()
    141 def train_model():
--> 142     model.learn(total_timesteps=50000)

File ~/.local/lib/python3.8/site-packages/stable_baselines3/ppo/ppo.py:311, in PPO.learn(self, total_timesteps, callback, log_interval, tb_log_name, reset_num_timesteps, progress_bar)
    302 def learn(
    303     self: SelfPPO,
    304     total_timesteps: int,
   (...)
    309     progress_bar: bool = False,
    310 ) -> SelfPPO:
--> 311     return super().learn(
    312         total_timesteps=total_timesteps,
    313         callback=callback,
    314         log_interval=log_interval,
    315         tb_log_name=tb_log_name,
    316         reset_num_timesteps=reset_num_timesteps,
    317         progress_bar=progress_bar,
    318     )

File ~/.local/lib/python3.8/site-packages/stable_baselines3/common/on_policy_algorithm.py:323, in OnPolicyAlgorithm.learn(self, total_timesteps, callback, log_interval, tb_log_name, reset_num_timesteps, progress_bar)
    320 assert self.env is not None
    322 while self.num_timesteps < total_timesteps:
--> 323     continue_training = self.collect_rollouts(self.env, callback, self.rollout_buffer, n_rollout_steps=self.n_steps)
    325     if not continue_training:
    326         break

File ~/.local/lib/python3.8/site-packages/stable_baselines3/common/on_policy_algorithm.py:247, in OnPolicyAlgorithm.collect_rollouts(self, env, callback, rollout_buffer, n_rollout_steps)
    244             terminal_value = self.policy.predict_values(terminal_obs)[0]  # type: ignore[arg-type]
    245         rewards[idx] += self.gamma * terminal_value
--> 247 rollout_buffer.add(
    248     self._last_obs,  # type: ignore[arg-type]
    249     actions,
    250     rewards,
    251     self._last_episode_starts,  # type: ignore[arg-type]
    252     values,
    253     log_probs,
    254 )
    255 self._last_obs = new_obs  # type: ignore[assignment]
    256 self._last_episode_starts = dones

File ~/.local/lib/python3.8/site-packages/stable_baselines3/common/buffers.py:470, in RolloutBuffer.add(self, obs, action, reward, episode_start, value, log_prob)
    467 # Reshape to handle multi-dim and discrete action spaces, see GH #970 #1392
    468 action = action.reshape((self.n_envs, self.action_dim))
--> 470 self.observations[self.pos] = np.array(obs)
    471 self.actions[self.pos] = np.array(action)
    472 self.rewards[self.pos] = np.array(reward)

IndexError: index 4096 is out of bounds for axis 0 with size 4096
Predicted Action: [0]
 Predicted Action: [1]
 Predicted Action: [1]
❌ Error processing Kafka message: cannot reshape array of size 9 into shape (1,8)
 Predicted Action: [1]
 Predicted Action: [1]
 Predicted Action: [1]
 Predicted Action: [1]
 Predicted Action: [1]
❌ Error processing Kafka message: cannot reshape array of size 9 into shape (1,8)
❌ Error processing Kafka message: cannot reshape array of size 9 into shape (1,8)
 Predicted Action: [1]

and it goes without stop

I tried reseting the 4096 to 2048 , 1024 , 256 , 128 and I get same error same thing with total_timesteps I lowered the number but same error I get.

How to resolve the issue?

发布者:admin,转转请注明出处:http://www.yc00.com/questions/1744324300a4568566.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信