"Scikit-learn is a powerful tool for classifying exoplanets. By leveraging machine learning algorithms like Random Forest, Support Vector Machines, and Logistic Regression, researchers can analyze vast datasets of stellar observations and identify potential planetary candidates with high accuracy. This approach not only automates the classification process but also enables the discovery of exoplanets that might otherwise go unnoticed."- Gemini 2024
In this project, you'll build a machine learning model to classify celestial objects as either exoplanets or false positives using data from NASA's Kepler Space Telescope.
Use the Kepler Exoplanet Search Results dataset (the cumulative KOI table) from the NASA Exoplanet Archive: https://exoplanetarchive.ipac.caltech.edu/cgi-bin/TblView/nph-tblView?app=ExoTbls&config=cumulative
This dataset contains information about thousands of Kepler objects of interest (KOIs), including confirmed exoplanets and false positives.
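Before running the full script below, it helps to load the downloaded table and look at the label column directly. Here is a minimal sketch, assuming the table was exported as exoplanets.csv (the filename the script expects) and that koi_disposition holds the usual CONFIRMED, CANDIDATE, and FALSE POSITIVE values; the binary filtering at the end is one optional way to frame the exoplanet-vs-false-positive task and is not part of the script itself.

import pandas as pd

# Load the KOI table downloaded from the archive (comment lines start with '#')
df = pd.read_csv('exoplanets.csv', comment='#', index_col='kepid')

# Inspect the label distribution: CONFIRMED, CANDIDATE, FALSE POSITIVE
print(df['koi_disposition'].value_counts())

# Optional: keep only the two definitive classes for a strictly binary problem
binary = df[df['koi_disposition'].isin(['CONFIRMED', 'FALSE POSITIVE'])]
print('Binary subset shape:', binary.shape)

The complete script below keeps all three dispositions and treats the task as multiclass classification, which is why compare_classifiers uses macro-averaged precision, recall, and F1 scores.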
# Data manipulation
import pandas as pd
# Data visualization
import matplotlib.pyplot as plt
# Feature selection
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import VarianceThreshold
from sklearn.feature_selection import SelectKBest
# Training and evaluating classifiers
from sklearn.model_selection import cross_validate
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
# Saving model
import os
import joblib


def get_data():
    # Download the .csv from the NASA Exoplanet Archive and save it as exoplanets.csv
    # https://exoplanetarchive.ipac.caltech.edu/cgi-bin/TblView/nph-tblView?app=ExoTbls&config=cumulative
    # Read data, skipping comment lines
    df = pd.read_csv('exoplanets.csv', comment='#', index_col='kepid')
    return df


def explore_data():
    df = get_data()
    print('Shape of dataframe (rows, columns):\t', df.shape)

    # Describe numeric columns
    stats = df.describe()
    print(stats)

    # Describe non-numeric columns
    cols = df.columns.difference(stats.columns)
    info = df[cols].describe()
    print(info)

    # Visualize the distribution of the disposition column
    # (koi_disposition is categorical, so plot class counts as a bar chart)
    df['koi_disposition'].value_counts().plot(kind='bar')
    plt.title('Distribution of koi_disposition')
    plt.xlabel('koi_disposition')
    plt.ylabel('Frequency')
    plt.show()


def clean_df(df):
    # Remove empty columns
    df.dropna(axis=1, how='all', inplace=True)
    # Remove empty rows
    df.dropna(how='all', inplace=True)
    # Remove duplicate rows
    df.drop_duplicates(inplace=True)
    # Remove sparse rows (fewer than half of the columns populated)
    df.dropna(thresh=int(df.shape[1] * 0.5), inplace=True)


def select_features(df):
    # Split feature (X) and target (y) values
    target = 'koi_disposition'
    y = df[target]
    X = df.drop(target, axis=1)

    # Drop non-numeric columns (not features)
    count0 = X.shape[1]
    X = X.drop(columns=X.select_dtypes(exclude='number').columns)
    print(f'Removed {count0 - X.shape[1]} non-numeric columns')

    # Drop sparse columns (more than 10% missing values)
    count0 = X.shape[1]
    X = X.dropna(axis=1, thresh=int(0.9 * len(X)))
    print(f'Removed {count0 - X.shape[1]} sparse columns')

    # Unsupervised feature selection methods
    # Remove features with low variance
    count0 = X.shape[1]
    selector = VarianceThreshold(threshold=0.01)
    selector.fit(X)
    # get_support() returns a boolean mask of the selected features
    X = X[X.columns[selector.get_support()]]
    print(f'Removed {count0 - X.shape[1]} low variance features')

    # Analyze pairwise correlations between remaining features
    # corr() calculates the Pearson correlation coefficient by default;
    # remove redundant variables based on the correlation matrix
    matrix = X.corr()
    # Set a threshold for correlation (e.g., 0.9)
    threshold = 0.9
    # Find highly correlated features
    cols = set()
    for i in range(len(matrix.columns)):
        for j in range(i + 1, len(matrix.columns)):
            if abs(matrix.iloc[i, j]) > threshold:
                cols.add(matrix.columns[i])
    X = X.drop(cols, axis=1)
    print(f'Removed {len(cols)} correlated features')

    # Supervised feature selection method
    # Impute missing values and perform
    # ANOVA feature selection for numeric features and a categorical target
    # TIP: Try different values for strategy and k to improve performance
    pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='mean')),
        ('feature_selection', SelectKBest(k=10))
    ])
    X = pipeline.fit_transform(X, y)
    return X, y


def compare_classifiers(X, y, classifiers):
    scoring = {
        'accuracy': 'accuracy',
        'precision': 'precision_macro',
        'recall': 'recall_macro',
        'f1': 'f1_macro'
    }
    results = {}
    for name, clf in classifiers.items():
        pipeline = make_pipeline(StandardScaler(), clf)
        cv = cross_validate(pipeline, X, y, scoring=scoring, return_train_score=False)
        results[name] = {metric: cv[f'test_{metric}'] for metric in scoring.keys()}

    mean_scores = pd.DataFrame({
        name: {metric: scores.mean() for metric, scores in clf_results.items()}
        for name, clf_results in results.items()
    }).T
    std_scores = pd.DataFrame({
        name: {metric: scores.std() for metric, scores in clf_results.items()}
        for name, clf_results in results.items()
    }).T
    summary = pd.concat([mean_scores, std_scores], keys=['Mean', 'Std'], axis=1)
    return summary


def save_best_mean(X, y, scores, classifiers, save_dir):
    # Find the best classifier based on mean accuracy
    score = scores['Mean']['accuracy'].max()
    name = scores['Mean']['accuracy'].idxmax()
    clf = classifiers[name]

    # Train the best classifier on the entire dataset
    pipeline = make_pipeline(StandardScaler(), clf)
    pipeline.fit(X, y)

    # Save the best model with its name and score for reference
    os.makedirs(save_dir, exist_ok=True)
    save_path = f'{save_dir}/{name}_{score:.2f}.joblib'
    joblib.dump(pipeline, save_path)
    return save_path


def train_model(X, y):
    # Classifier selection
    # TIP: fine-tune classifier parameters to improve performance
    classifiers = {
        'logistic_regression': LogisticRegression(),
        'random_forest': RandomForestClassifier(),
        'svm': SVC()
    }

    # Evaluating classifiers
    performance = compare_classifiers(X, y, classifiers)
    print(performance)

    # Training & saving model
    model_path = save_best_mean(X, y, performance, classifiers, 'models')
    print(f'Best model saved to {model_path}')
    return model_path


def serve_model(model_path, X_new):
    model = joblib.load(model_path)
    predictions = model.predict(X_new)
    return predictions


def main():
    # Data collection & pre-processing
    df = get_data()
    clean_df(df)
    print('Shape of dataframe after cleaning:\t', df.shape)

    # Feature selection
    # Returns X: a numpy.ndarray of features
    #         y: a pandas Series of labels
    X, y = select_features(df)
    print('Shape of selected features:\t', X.shape)

    # The train_model function will save the best model and return its path
    model_path = train_model(X, y)
    # In subsequent runs, you can set the path manually, e.g.
    # model_path = 'models/random_forest_0.83.joblib'

    # Example - serving a single prediction
    sample = X[0].reshape(1, -1)
    prediction = serve_model(model_path, sample)
    print(f'Prediction = {prediction[0]}')
    print(f'Actual = {y.iloc[0]}')

    # Example - serving multiple predictions
    samples = X[:3]
    predictions = serve_model(model_path, samples)
    print(f'Predictions = {predictions}')
    print(f'Actual = {y.values[:3]}')

    # When making predictions with the loaded model:
    # - new data must have the same number of features as the training data;
    # - the input must be a 2D array, even for a single sample,
    #   hence the reshape(1, -1) in the example above.


if __name__ == '__main__':
    # Uncomment this to optionally explore the data prior to training
    # explore_data()
    main()
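The TIP comments above leave the imputation strategy, the number of selected features k, and the classifier hyperparameters at their defaults. As a starting point for tuning, here is a hedged sketch using GridSearchCV on the X and y returned by select_features; the random forest parameter grid shown is illustrative only, and the same pattern applies to LogisticRegression or SVC.

# Sketch: grid search over a few RandomForest settings.
# Assumes X, y come from select_features(df) as in main(); the grid values are illustrative.
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier

pipeline = make_pipeline(StandardScaler(), RandomForestClassifier(random_state=42))
param_grid = {
    'randomforestclassifier__n_estimators': [100, 300],
    'randomforestclassifier__max_depth': [None, 10, 20],
}
search = GridSearchCV(pipeline, param_grid, scoring='accuracy', cv=5, n_jobs=-1)
search.fit(X, y)
print('Best parameters:', search.best_params_)
print('Best CV accuracy:', round(search.best_score_, 3))

Because GridSearchCV refits the best configuration on all of X and y by default, search.best_estimator_ is a fitted pipeline and can be saved with joblib.dump in the same way save_best_mean saves its model.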