logo

Едновременно сортиране чрез сливане в споделена памет

Дадено е число 'n' и n числа, сортирайте числата, като използвате Обединяване на сортиране. (Съвет: Опитайте се да използвате системни извиквания shmget shmat).
Част 1: Алгоритъмът (КАК?)  
Рекурсивно направете два дъщерни процеса, един за лявата половина, един за дясната половина. Ако броят на елементите в масива за даден процес е по-малък от 5, изпълнете a Сортиране на вмъкване . След това родителят на двете деца обединява резултата и се връща обратно към родителя и т.н. Но как да го направите едновременно?
Част 2: Логичното (ЗАЩО?)  
Важната част от решението на този проблем не е алгоритмично, а да обясни концепциите за операционна система и ядро. 
За да постигнем едновременно сортиране, имаме нужда от начин да накараме два процеса да работят върху един и същи масив едновременно. За да улесни нещата, Linux предоставя много системни извиквания чрез прости крайни точки на API. Два от тях са shmget() (за разпределяне на споделена памет) и shmat() (за операции със споделена памет). Създаваме споделено пространство в паметта между дъщерния процес, който разклоняваме. Всеки сегмент е разделен на ляво и дясно дете, което е сортирано, като интересната част е, че работят едновременно! shmget() изисква от ядрото да разпредели a споделена страница и за двата процеса.
Защо традиционният fork() не работи?  
Отговорът се крие в това какво всъщност прави fork(). От документацията „fork() създава нов процес чрез дублиране на извикващия процес“. Дъщерният процес и родителският процес се изпълняват в отделни пространства на паметта. По време на fork() и двете пространства на паметта имат едно и също съдържание. Паметта записва файловия дескриптор (fd), промените и т.н., извършени от един от процесите, не засягат другия. Следователно имаме нужда от споделен сегмент на паметта.
 

CPP
#include    #include  #include  #include  #include  #include  #include  #include  void insertionSort(int arr[] int n); void merge(int a[] int l1 int h1 int h2); void mergeSort(int a[] int l int h) {  int i len = (h - l + 1);  // Using insertion sort for small sized array  if (len <= 5)  {  insertionSort(a + l len);  return;  }  pid_t lpid rpid;  lpid = fork();  if (lpid < 0)  {  // Lchild proc not created  perror('Left Child Proc. not createdn');  _exit(-1);  }  else if (lpid == 0)  {  mergeSort(a l l + len / 2 - 1);  _exit(0);  }  else  {  rpid = fork();  if (rpid < 0)  {  // Rchild proc not created  perror('Right Child Proc. not createdn');  _exit(-1);  }  else if (rpid == 0)  {  mergeSort(a l + len / 2 h);  _exit(0);  }  }  int status;  // Wait for child processes to finish  waitpid(lpid &status 0);  waitpid(rpid &status 0);  // Merge the sorted subarrays  merge(a l l + len / 2 - 1 h); } /* Function to sort an array using insertion sort*/ void insertionSort(int arr[] int n) {  int i key j;  for (i = 1; i < n; i++)  {  key = arr[i];  j = i - 1;  /* Move elements of arr[0..i-1] that are  greater than key to one position ahead  of their current position */  while (j >= 0 && arr[j] > key)  {  arr[j + 1] = arr[j];  j = j - 1;  }  arr[j + 1] = key;  } } // Method to merge sorted subarrays void merge(int a[] int l1 int h1 int h2) {  // We can directly copy the sorted elements  // in the final array no need for a temporary  // sorted array.  int count = h2 - l1 + 1;  int sorted[count];  int i = l1 k = h1 + 1 m = 0;  while (i <= h1 && k <= h2)  {  if (a[i] < a[k])  sorted[m++] = a[i++];  else if (a[k] < a[i])  sorted[m++] = a[k++];  else if (a[i] == a[k])  {  sorted[m++] = a[i++];  sorted[m++] = a[k++];  }  }  while (i <= h1)  sorted[m++] = a[i++];  while (k <= h2)  sorted[m++] = a[k++];  int arr_count = l1;  for (i = 0; i < count; i++ l1++)  a[l1] = sorted[i]; } // To check if array is actually sorted or not void isSorted(int arr[] int len) {  if (len == 1)  {  std::cout << 'Sorting Done Successfully' << std::endl;  return;  }  int i;  for (i = 1; i < len; i++)  {  if (arr[i] < arr[i - 1])  {  std::cout << 'Sorting Not Done' << std::endl;  return;  }  }  std::cout << 'Sorting Done Successfully' << std::endl;  return; } // To fill random values in array for testing // purpose void fillData(int a[] int len) {  // Create random arrays  int i;  for (i = 0; i < len; i++)  a[i] = rand();  return; } // Driver code int main() {  int shmid;  key_t key = IPC_PRIVATE;  int *shm_array;  int length = 128;  // Calculate segment length  size_t SHM_SIZE = sizeof(int) * length;  // Create the segment.  if ((shmid = shmget(key SHM_SIZE IPC_CREAT | 0666)) < 0)  {  perror('shmget');  _exit(1);  }  // Now we attach the segment to our data space.  if ((shm_array = (int *)shmat(shmid NULL 0)) == (int *)-1)  {  perror('shmat');  _exit(1);  }  // Create a random array of given length  srand(time(NULL));  fillData(shm_array length);  // Sort the created array  mergeSort(shm_array 0 length - 1);  // Check if array is sorted or not  isSorted(shm_array length);  /* Detach from the shared memory now that we are  done using it. */  if (shmdt(shm_array) == -1)  {  perror('shmdt');  _exit(1);  }  /* Delete the shared memory segment. */  if (shmctl(shmid IPC_RMID NULL) == -1)  {  perror('shmctl');  _exit(1);  }  return 0; } 
Java
import java.util.Arrays; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; public class ConcurrentMergeSort {  // Method to merge sorted subarrays  private static void merge(int[] a int low int mid int high) {  int[] temp = new int[high - low + 1];  int i = low j = mid + 1 k = 0;  while (i <= mid && j <= high) {  if (a[i] <= a[j]) {  temp[k++] = a[i++];  } else {  temp[k++] = a[j++];  }  }  while (i <= mid) {  temp[k++] = a[i++];  }  while (j <= high) {  temp[k++] = a[j++];  }  System.arraycopy(temp 0 a low temp.length);  }  // RecursiveAction for fork/join framework  static class SortTask extends RecursiveAction {  private final int[] a;  private final int low high;  SortTask(int[] a int low int high) {  this.a = a;  this.low = low;  this.high = high;  }  @Override  protected void compute() {  if (high - low <= 5) {  Arrays.sort(a low high + 1);  } else {  int mid = low + (high - low) / 2;  invokeAll(new SortTask(a low mid) new SortTask(a mid + 1 high));  merge(a low mid high);  }  }  }  // Method to check if array is sorted  private static boolean isSorted(int[] a) {  for (int i = 0; i < a.length - 1; i++) {  if (a[i] > a[i + 1]) {  return false;  }  }  return true;  }  // Method to fill array with random numbers  private static void fillData(int[] a) {  Random rand = new Random();  for (int i = 0; i < a.length; i++) {  a[i] = rand.nextInt();  }  }  public static void main(String[] args) {  int length = 128;  int[] a = new int[length];  fillData(a);  ForkJoinPool pool = new ForkJoinPool();  pool.invoke(new SortTask(a 0 a.length - 1));  if (isSorted(a)) {  System.out.println('Sorting Done Successfully');  } else {  System.out.println('Sorting Not Done');  }  } } 
Python3
import numpy as np import multiprocessing as mp import time def insertion_sort(arr): n = len(arr) for i in range(1 n): key = arr[i] j = i - 1 while j >= 0 and arr[j] > key: arr[j + 1] = arr[j] j -= 1 arr[j + 1] = key def merge(arr l mid r): n1 = mid - l + 1 n2 = r - mid L = arr[l:l + n1].copy() R = arr[mid + 1:mid + 1 + n2].copy() i = j = 0 k = l while i < n1 and j < n2: if L[i] <= R[j]: arr[k] = L[i] i += 1 else: arr[k] = R[j] j += 1 k += 1 while i < n1: arr[k] = L[i] i += 1 k += 1 while j < n2: arr[k] = R[j] j += 1 k += 1 def merge_sort(arr l r): if l < r: if r - l + 1 <= 5: insertion_sort(arr) else: mid = (l + r) // 2 p1 = mp.Process(target=merge_sort args=(arr l mid)) p2 = mp.Process(target=merge_sort args=(arr mid + 1 r)) p1.start() p2.start() p1.join() p2.join() merge(arr l mid r) def is_sorted(arr): for i in range(1 len(arr)): if arr[i] < arr[i - 1]: return False return True def fill_data(arr): np.random.seed(0) arr[:] = np.random.randint(0 1000 size=len(arr)) if __name__ == '__main__': length = 128 shm_array = mp.Array('i' length) fill_data(shm_array) start_time = time.time() merge_sort(shm_array 0 length - 1) end_time = time.time() if is_sorted(shm_array): print('Sorting Done Successfully') else: print('Sorting Not Done') print('Time taken:' end_time - start_time) 
JavaScript
// Importing required modules const { Worker isMainThread parentPort workerData } = require('worker_threads'); // Function to merge sorted subarrays function merge(a low mid high) {  let temp = new Array(high - low + 1);  let i = low j = mid + 1 k = 0;  while (i <= mid && j <= high) {  if (a[i] <= a[j]) {  temp[k++] = a[i++];  } else {  temp[k++] = a[j++];  }  }  while (i <= mid) {  temp[k++] = a[i++];  }  while (j <= high) {  temp[k++] = a[j++];  }  for (let p = 0; p < temp.length; p++) {  a[low + p] = temp[p];  } } // Function to check if array is sorted function isSorted(a) {  for (let i = 0; i < a.length - 1; i++) {  if (a[i] > a[i + 1]) {  return false;  }  }  return true; } // Function to fill array with random numbers function fillData(a) {  for (let i = 0; i < a.length; i++) {  a[i] = Math.floor(Math.random() * 1000);  } } // Function to sort the array using merge sort function sortArray(a low high) {  if (high - low <= 5) {  a.sort((a b) => a - b);  } else {  let mid = low + Math.floor((high - low) / 2);  sortArray(a low mid);  sortArray(a mid + 1 high);  merge(a low mid high);  } } // Main function function main() {  let length = 128;  let a = new Array(length);  fillData(a);  sortArray(a 0 a.length - 1);  if (isSorted(a)) {  console.log('Sorting Done Successfully');  } else {  console.log('Sorting Not Done');  } } main(); 

Изход: 
 



Sorting Done Successfully  

Времева сложност: O(N log N)

Помощно пространство: O(N)


Подобрения в производителността?  
Опитайте се да определите времето на кода и сравнете ефективността му с традиционния последователен код. Ще се изненадате да разберете, че производителността на последователното сортиране е по-добра! 
Когато да речем ляво дете има достъп до левия масив, масивът се зарежда в кеша на процесора. Сега, когато се осъществи достъп до десния масив (поради едновременни достъпи), има пропуск в кеша, тъй като кешът се запълва с левия сегмент и след това десният сегмент се копира в кеш паметта. Този процес напред-назад продължава и влошава производителността до такова ниво, че се представя по-лошо от последователния код.
Има начини за намаляване на пропуските в кеша чрез контролиране на работния процес на кода. Но те не могат да бъдат избегнати напълно!